协调并行事务

在使用 Neo4j 集群时,在大多数情况下默认强制执行因果一致性 (causal consistency),从而保证查询能够读取之前查询所做的更改。然而,对于并行运行的多个事务,默认情况下并不会发生这种情况。在这种情况下,你可以使用书签 (bookmarks) 让一个事务等待另一个事务的结果在集群中传播完毕,然后再执行自己的工作。

这不是强制要求,并且只有当你需要在不同事务之间保持因果一致性时,才应使用书签,因为等待书签可能会对性能产生负面影响。

使用 .ExecutableQuery() 时的书签

使用 .ExecutableQuery() 查询数据库时,驱动程序会为你管理书签。在这种情况下,你可以保证后续查询能够读取之前的更改,而无需进行任何额外操作。

await driver.ExecutableQuery("<QUERY 1>").ExecuteAsync();

// subsequent .ExecutableQuery() calls will be causally chained

await driver.ExecutableQuery("<QUERY 2>").ExecuteAsync();  // can read result of <QUERY 1>
await driver.ExecutableQuery("<QUERY 3>").ExecuteAsync();  // can read result of <QUERY 2>

若要禁用书签管理和因果一致性,请在查询配置中使用 enableBookmarkManager: false

await driver.ExecutableQuery("<QUERY>")
    .WithConfig(new QueryConfig(enableBookmarkManager: false))
    .ExecuteAsync();

单个会话内的书签

书签管理对于在单个会话中运行的查询是自动进行的:同一会话内的查询在因果上是链接在一起的。

using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.ExecuteWriteAsync(async tx => await tx.RunAsync("<QUERY 1>"));
await session.ExecuteWriteAsync(async tx => await tx.RunAsync("<QUERY 2>"));  // can read QUERY 1
await session.ExecuteWriteAsync(async tx => await tx.RunAsync("<QUERY 3>"));  // can read QUERY 1,2

跨多个会话的书签

如果你的应用程序使用多个会话,你可能需要确保一个会话在另一个会话运行其查询之前已完成其所有事务。

在下面的示例中,允许 sessionAsessionB 并发运行,而 sessionC 会等待直到它们的结果完成传播。这保证了 sessionC 想要操作的 Person 节点确实存在。

使用书签协调多个会话
using Neo4j.Driver;

const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";

await using var driver = GraphDatabase.Driver(dbUri, AuthTokens.Basic(dbUser, dbPassword));
await driver.VerifyConnectivityAsync();

await createSomeFriends(driver);

async Task createSomeFriends(IDriver driver) {
    Bookmarks savedBookmarks = Bookmarks.From(new List<string>());  // to collect the sessions' bookmarks

    // Create the first person and employment relationship
    using var sessionA = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
    await sessionA.ExecuteWriteAsync(tx => createPerson(tx, "Alice"));
    await sessionA.ExecuteWriteAsync(tx => employ(tx, "Alice", "Wayne Enterprises"));
    savedBookmarks += sessionA.LastBookmarks;  (1)

    // Create the second person and employment relationship
    using var sessionB = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
    await sessionB.ExecuteWriteAsync(tx => createPerson(tx, "Bob"));
    await sessionB.ExecuteWriteAsync(tx => employ(tx, "Bob", "LexCorp"));
    savedBookmarks += sessionB.LastBookmarks;  (1)

    // Create a friendship between the two people created above
    using var sessionC = driver.AsyncSession(conf => conf
        .WithDatabase("<database-name>")
        .WithBookmarks(savedBookmarks)  (2)
    );
    await sessionC.ExecuteWriteAsync(tx => createFriendship(tx, "Alice", "Bob"));
    await sessionC.ExecuteWriteAsync(tx => printFriendships(tx));
}

// Create a person node
async Task createPerson(IAsyncQueryRunner tx, string name) {
    await tx.RunAsync("MERGE (:Person {name: $name})", new { name = name });
}

// Create an employment relationship to a pre-existing company node
// This relies on the person first having been created.
async Task employ(IAsyncQueryRunner tx, string personName, string companyName) {
    await tx.RunAsync(@"
        MATCH (person:Person {name: $personName})
        MATCH (company:Company {name: $companyName})
        CREATE (person)-[:WORKS_FOR]->(company)
        ", new { personName = personName, companyName = companyName }
    );
}

// Create a friendship between two people
async Task createFriendship(IAsyncQueryRunner tx, string nameA, string nameB) {
    await tx.RunAsync(@"
        MATCH (a:Person {name: $nameA})
        MATCH (b:Person {name: $nameB})
        MERGE (a)-[:KNOWS]->(b)
        ", new { nameA = nameA, nameB = nameB }
    );
}

// Retrieve and display all friendships
async Task printFriendships(IAsyncQueryRunner tx) {
    var result = await tx.RunAsync("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name");
    while (await result.FetchAsync()) {
        var record = result.Current;
        Console.WriteLine(record.Get<string>("a.name") + " knows " + record.Get<string>("b.name"));
    }
}
1 使用 AsyncSession.LastBookmarks 从不同会话中收集并合并书签,并将它们存储在 Bookmarks 对象中。
2 使用 .WithBookmarks() 配置方法将这些书签用于初始化另一个会话。

driver passing bookmarks

使用书签可能会对性能产生负面影响,因为所有查询都被迫等待最新的更改在集群中传播。如果可能,请将查询组合在单个事务或单个会话中执行。

混合使用 .ExecutableQuery() 和会话

为了确保部分使用 .ExecutableQuery() 执行、部分使用会话执行的事务之间的因果一致性,你可以通过 driver.GetExecutableQueryBookmarkManager() 获取 ExecutableQuery 接口的默认书签管理器,并通过 SessionConfigBuilder.WithBookmarkManager() 配置方法将其传递给新会话。这将确保所有工作都在同一个书签管理器下执行,从而保持因果一致。

await driver.ExecutableQuery("<QUERY 1>").ExecuteAsync();

using var session = driver.AsyncSession(conf => conf.
    WithDatabase("<database-name>")
    .WithBookmarkManager(driver.GetExecutableQueryBookmarkManager()));

// every query inside this session will be causally chained
// (i.e., can read what was written by <QUERY 1>)
await session.ExecuteWriteAsync(async tx => await tx.RunAsync("<QUERY 2>"));


// subsequent ExecutableQuery calls will also be causally chained
// (i.e., can read what was written by <QUERY 2>)
await driver.ExecutableQuery("<QUERY 3>").ExecuteAsync();

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 .ExecuteReadAsync().ExecuteWriteAsync() 调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

IDriver

IDriver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。