运行您自己的事务

使用 .ExecutableQuery() 查询数据库 时,驱动程序会自动创建一个事务。事务是一个工作单元,要么全部提交,要么在失败时回滚。你可以在单个查询中包含多个 Cypher 语句,例如在 更新数据库 时连续使用 MATCHCREATE,但不能在多个查询之间插入客户端逻辑。

对于这些更高级的用例,驱动程序提供了手动控制事务的函数。最常见的形式是托管事务,你可以将其视为一种解构 ExecutableQuery() 流程的方式,并能够在更多地方指定其所需行为。

创建会话

在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间的查询通道,并确保强制执行因果一致性

会话是通过 IDriver.AsyncSession() 方法创建的。使用可选参数来更改会话配置,例如 目标数据库。有关更多配置参数,请参阅 会话配置

using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));

创建会话是一个轻量级操作,因此创建和销毁会话的开销不大。完成工作后,请务必 关闭会话

会话不是线程安全的:你可以在不同线程间共享主要的 IDriver 对象,但每个线程都应该创建自己的会话。

运行托管事务

事务可以包含多个查询。由于 Neo4j 符合 ACID 标准,事务内的查询要么作为一个整体执行,要么完全不执行:您不会遇到事务一部分成功而另一部分失败的情况。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。

你可以使用 IAsyncSession.ExecuteRead()IAsyncSession.ExecuteWrite() 方法创建托管事务,具体取决于你是想从数据库检索数据还是修改数据。这两个方法都接收一个 事务函数 回调,该函数负责实际执行查询和处理结果。

检索名字以 Al 开头的人员。
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));  (1)
var people = await session.ExecuteReadAsync(  (2)
    async tx => {
        var result = await tx.RunAsync(@"
            MATCH (p:Person) WHERE p.name STARTS WITH $filter  (3)
            RETURN p.name AS name, 3 as age ORDER BY name
            ", new { filter = "Al" }
        );

        // Loop through the records asynchronously
        // `.Current` holds the record read by `.FetchAsync()`
        var people = new List<string>();
        while (await result.FetchAsync()) {  (4)
            people.Add(result.Current.Get<string>("name"));
        }
        // or `await result.ToListAsync()` to retrieve all records as a list.

        // further tx.RunAsync() calls will execute within the same transaction

        return people;
    }
);

foreach (var person in people) {
    Console.WriteLine(person);
}
1 创建一个会话。单个会话可以作为多个查询的容器。除非将其作为资源与 using 关键字一起使用,否则请记得在完成后将其关闭。
2 .ExecuteReadAsync()(或 .ExecuteWriteAsync())方法是进入事务的入口点。它接收一个事务函数的回调,该函数负责运行查询。
3 使用 tx.RunAsync() 方法执行查询。你可以提供一个查询参数映射作为第二个参数。每个查询运行都会返回一个 IResultCursor 对象。
4 处理结果 使用 IResultCursor 上的任何方法。.FetchAsync() 方法检索队列中的下一条记录并将其存储在 .Current 属性中;.ToListAsync() 方法将所有记录检索到列表中。

不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数

事务函数不应直接返回结果对象。相反,始终以某种方式 处理结果。在事务函数内部,return 语句会导致事务提交,而如果抛出异常,事务将自动回滚。

包含多个查询、客户端逻辑和潜在回滚的事务
using Neo4j.Driver;

const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";

await using var driver = GraphDatabase.Driver(dbUri, AuthTokens.Basic(dbUser, dbPassword));
await driver.VerifyConnectivityAsync();

// Create & employ 100 people to 10 different organizations
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
for (int i=0; i<100; i++) {
    var name = $"Thor{i}";
    try {
        string orgId = await session.ExecuteWriteAsync(async tx => {
            return await employPersonTx(tx, name);
        });
        Console.WriteLine($"User {name} added to organization {orgId}.");
    } catch (Neo4jException e) {
        Console.WriteLine(e);
    }
}

async Task<string> employPersonTx(IAsyncQueryRunner tx, string name) {
    var employeeThreshold = 10;

    // Create new Person node with given name, if not exists already
    await tx.RunAsync("MERGE (p:Person {name: $name})", new { name = name });

    // Obtain most recent organization ID and the number of people linked to it
    var result = await tx.RunAsync(@"
        MATCH (o:Organization)
        RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
        ORDER BY o.createdDate DESC
        LIMIT 1
        ");

    string orgId = "";
    int employeesN = 0;
    bool org = await result.FetchAsync();
    if (org == false) {
        // If no organization exists, create one and add Person to it
        orgId = await createOrganization(tx);
        Console.WriteLine($"No orgs available, created {orgId}.");
    } else {
        orgId = result.Current.Get<string>("id");
        employeesN = result.Current.Get<int>("employeesN");
    }

    // If org does not have too many employees, add this Person to it
    if (employeesN < employeeThreshold) {
        await addPersonToOrganization(tx, name, orgId);
        // If the above throws, the transaction will roll back
        // -> not even Person is created!

    // Otherwise, create a new Organization and link Person to it
    } else {
        orgId = await createOrganization(tx);
        Console.WriteLine($"Latest org is full, created {orgId}.");
        await addPersonToOrganization(tx, name, orgId);
        // If any of the above throws, the transaction will roll back
        // -> not even Person is created!
    }

    return orgId;  // Organization ID to which the new Person ends up in
}

async Task<string> createOrganization(IAsyncQueryRunner tx) {
    var result = await tx.RunAsync(@"
        CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
        RETURN o.id AS id
    ");
    bool org = await result.FetchAsync();  // can't be false
    string orgId = result.Current.Get<string>("id");
    return orgId;
}

async Task addPersonToOrganization(IAsyncQueryRunner tx, string personName, string orgId) {
    await tx.RunAsync(@"
        MATCH (o:Organization {id: $orgId})
        MATCH (p:Person {name: $name})
        MERGE (p)-[:WORKS_FOR]->(o)
        ", new { orgId = orgId, name = personName }
    );
}

如果失败被视为瞬态(例如由于服务器暂时不可用),驱动程序会自动重试运行失败的事务。如果操作在配置的 最大重试时间 后仍然失败,将抛出异常。

由于事务可能会被重新运行,事务函数在多次运行时应该产生相同的效果(幂等性,因为你无法预先知道它们会被执行多少次。请注意,虽然事务函数可能会执行多次,但其中的数据库查询始终只会运行一次。

一个会话可以链接多个事务,但在任何给定时间,一个会话内只能激活一个事务。要维护多个并发事务,请使用多个并发会话。

运行显式事务

你可以通过使用 IAsyncSession.BeginTransactionAsync() 方法手动开启事务来完全控制事务,该方法返回一个 IAsyncTransaction 对象。然后,你可以使用 .RunAsync() 方法在显式事务中运行查询。

using var session = driver.AsyncSession(SessionConfigBuilder.ForDatabase("<database-name>"));
using var tx = await session.BeginTransactionAsync();
// use tx.RunAsync()      to run queries
//     tx.CommitAsync()   to commit the transaction
//     tx.RollbackAsync() to roll the transaction back

显式事务可以使用 .CommitAsync() 提交,或使用 .RollbackAsync() 回滚。如果没有采取显式操作,驱动程序将在其生命周期结束时自动回滚事务。

由于瞬态服务器错误导致 tx.Run() 运行失败的查询可以在无需更改原始请求的情况下进行重试。你可以通过 IsRetriable 属性判断错误是否为瞬态,该属性提供了关于进一步尝试是否可能成功的见解。

显式事务对于需要将 Cypher 执行分布在同一个事务的多个函数中,或者需要在一个事务内运行多个查询但不需要托管事务提供的自动重试的应用程序最有用。

涉及外部 API 的显式事务示例
using Neo4j.Driver;

const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";

await using var driver = GraphDatabase.Driver(dbUri, AuthTokens.Basic(dbUser, dbPassword));
await driver.VerifyConnectivityAsync();

string customerId = await createCustomer(driver);
int otherBankId = 42;
await transferToOtherBank(driver, customerId, otherBankId, 999);

async Task<string> createCustomer(IDriver driver) {
    var result = await driver.ExecutableQuery(@"
        MERGE (c:Customer {id: randomUUID(), balance: 1000})
        RETURN c.id AS id
        ")
        .WithConfig(new QueryConfig(database: "<database-name>"))
        .ExecuteAsync();
    return result.Result[0].Get<string>("id");
}

async Task transferToOtherBank(IDriver driver, string customerId, int otherBankId, float amount) {
    using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
    using var tx = await session.BeginTransactionAsync();

    if (! await customerBalanceCheck(tx, customerId, amount)) {
        Console.WriteLine($"Customer {customerId} doesn't have enough funds.");
        return;  // give up
    }

    otherBankTransferApi(customerId, otherBankId, amount);
    // Now the money has been transferred => can't rollback anymore
    // (cannot rollback external services interactions)

    try {
        await decreaseCustomerBalance(tx, customerId, amount);
        await tx.CommitAsync();
        Console.WriteLine($"Transferred {amount} to {customerId}.");
    } catch (Neo4jException e) {
        requestInspection(customerId, otherBankId, amount, e);
        throw new InvalidOperationException(e.Message);
    }
}

async Task<bool> customerBalanceCheck(IAsyncTransaction tx, string customerId, float amount) {
    var result = await tx.RunAsync(@"
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
        ", new { id = customerId, amount = amount }
    );
    bool isRecord = await result.FetchAsync();
    if (isRecord == false) {
        return false;
    }
    var record = result.Current;
    return record.Get<bool>("sufficient");
}

void otherBankTransferApi(string customerId, int otherBankId, float amount) {
    // make some API call to other bank
}

async Task decreaseCustomerBalance(IAsyncTransaction tx, string customerId, float amount) {
    await tx.RunAsync(@"
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
        ", new { id = customerId, amount = amount }
    );
}

void requestInspection(string customerId, int otherBankId, float amount, Exception e) {
    // manual cleanup required; log this or similar
    Console.WriteLine($"WARNING: transaction rolled back due to exception: {e.Message}.");
    Console.WriteLine($"customerId: {customerId}, otherBankId: {otherBankId}, amount: {amount}.");
}

处理查询结果

驱动程序的查询输出是一个 IResultCursor 对象,它将 Cypher 结果封装在一个丰富的数据结构中,需要客户端进行一些解析。需要注意以下两个要点:

  • 结果记录不是立即且全部由服务器获取并返回的。相反,结果以延迟流的形式出现。具体来说,当驱动程序从服务器接收到一些记录时,它们最初在后台队列中进行缓冲。记录会保留在缓冲区中,直到被应用程序消费,此时它们会从缓冲区中移除。当没有更多可用记录时,结果即耗尽

  • 结果充当游标。这意味着除非您将其保存在辅助数据结构中,否则无法从流中检索先前的记录。

下方的动画演示了单个查询的路径:它展示了驱动程序如何处理结果记录,以及应用程序应如何处理结果。

处理结果最简单的方法是在其上调用 .ToListAsync(),这将产生一个 IRecord 对象列表。否则,IResultCursor 对象实现了许多用于处理记录的属性和方法。下面列出了最常用的那些。

属性/方法 描述

Current IRecord

持有游标当前指向的记录。

FetchAsync() Task<bool>

将游标推进到结果中的下一条记录。如果没有更多记录可用,则返回 false

ToListAsync() Task<List<IRecord>>

将结果的剩余部分作为列表返回。

PeekAsync() Task<IRecord>

返回结果中的下一条记录而不消耗它。这会将该记录留在缓冲区中以供进一步处理。

ConsumeAsync() Task<IResultSummary>

返回查询 结果摘要。它会耗尽结果,因此仅应在数据处理结束后调用。

有关 IResultCursor 方法的完整列表,请参阅 API 文档 → IResultCursor

会话配置

你可以通过创建会话时的可选 SessionConfig 参数自定义会话的行为。使用 SessionConfigBuilder 创建会话配置对象。

数据库选择

始终通过 .WithDatabase("<dbName>") 会话配置方法显式指定数据库,即使是在单数据库实例上也是如此。这允许驱动程序更高效地工作,因为它节省了到服务器以解析主数据库的网络往返。如果没有指定数据库,则使用 Neo4j 实例设置中设定的 默认数据库

using var session = driver.AsyncSession(conf => conf
    .WithDatabase("<database-name>")
);
建议通过配置方法指定数据库,而不是使用 USE Cypher 子句。如果服务器在集群上运行,带有 USE 的查询需要启用 服务器端路由。查询的执行时间也可能更长,因为它们第一次尝试时可能无法到达正确的集群成员,需要被路由到包含所请求数据库的集群成员。

请求路由

在集群环境中,所有会话都以写入模式打开,并路由到领导者(Leader)。你可以通过 .WithDefaultAccessMode(AccessMode.Read) 方法更改此设置。请注意,.ExecuteReadAsync().ExecuteWriteAsync() 会自动覆盖会话的默认访问模式。

using var session = driver.AsyncSession(conf => conf
    .WithDatabase("<database-name>")
    .WithDefaultAccessMode(AccessMode.Read)
);

虽然在读取模式下执行写入查询会导致运行时错误,但你不应依赖此来进行访问控制。这两种模式之间的区别在于,读取事务被路由到集群的任何节点,而写入事务被定向到主节点(primaries)。不能保证以读取模式提交的写入查询会被拒绝。

关于 .ExecuteReadAsync().ExecuteWriteAsync() 方法也有类似的说明。

以其他用户身份运行查询

你可以通过向 .WithAuthToken() 配置方法提供 IAuthToken,以不同用户身份执行查询。在会话级别切换用户比创建新的 IDriver 对象成本更低。查询将在给定用户的安全上下文中运行(例如,主数据库、权限等)。

using var session = driver.AsyncSession(conf => conf
    .WithDatabase("<database-name>")
    .WithAuthToken(AuthTokens.Basic("<username>", "<password>"))
);

.WithImpersonatedUser() 方法提供了类似的功能:区别在于你不需要知道用户的密码即可模拟他们,但创建 IDriver 所使用的用户需要具有 相应的权限

using var session = driver.AsyncSession(conf => conf
    .WithDatabase("<database-name>")
    .WithImpersonatedUser("<username>")
);

事务配置

你可以通过提供一个 TransactionConfig 对象作为 .ExecuteReadAsync().ExecuteWriteAsync().BeginTransactionAsync() 的(可选)最后一个参数来进一步控制事务。你可以通过 TransactionConfigBuilder 创建一个。使用它来指定:

  • 事务超时。运行时间更长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒。

  • 附加到事务的元数据映射。这些元数据会记录在服务器的 query.log 中,并且在 SHOW TRANSACTIONS Cypher 命令的输出中可见。使用此功能标记事务。

using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
var people = await session.ExecuteReadAsync(
    async tx => {
        var result = await tx.RunAsync("MATCH (p:Person) RETURN p");
        return await result.ToListAsync();
    },
    conf => conf
        .WithTimeout(new TimeSpan(5))
        .WithMetadata(new Dictionary<string, object>{ { "appName", "peopleTracker" } })
);
foreach (var person in people) {
    Console.WriteLine(person);
}

关闭会话

每个连接池都有有限数量的会话,因此如果你打开会话而不关闭它们,应用程序可能会耗尽会话。因此,建议使用 using 关键字创建会话,当作用域结束时它会自动关闭会话。当会话关闭时,它会被返回到连接池中以便稍后重用。

如果你没有使用 using 打开会话,请记得在使用完毕后调用 .DisposeAsync() 方法。

var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));

// session usage

await session.DisposeAsync();

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 .ExecuteReadAsync().ExecuteWriteAsync() 调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

IDriver

IDriver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。