开发者中心 » 语言 » C# » 教程 » 探索 Neo4j .NET 驱动程序的高级功能

探索 Neo4j .NET 驱动程序的高级功能

简介

在第一个教程中,您迈出了 Neo4j 和 .NET 的第一步。您设置了一个项目,安装了 Neo4j 驱动程序包,并针对示例数据库运行了查询。您还学习了驱动程序如何将结果映射回您的应用程序对象,以及开发更稳健的企业级应用程序时的一些最佳实践。

所有这些代码都使用了 ExecutableQuery API 接口,这是与驱动程序交互的最直接方式。在本教程中,我们将探讨驱动程序更高级的功能和 API,在这些功能中,简洁性和自动化功能可以换取对事务生命周期、执行和结果处理的更大控制权。

更高级的驱动程序用法

使用 ExecutableQuery 接口时,驱动程序会将您希望运行的查询封装在事务中。这允许在出错时提交或回滚查询。但是,此接口确实将您限制为仅运行单个查询。

为了支持更复杂的用例(您可能希望在单个事务中运行多个查询,并可能在其中穿插一些客户端逻辑),驱动程序提供了用于控制事务生命周期的 API。

会话(Session)对象

驱动程序有一个称为“会话”的概念。会话为您提供了运行多个查询事务的接口,无论是通过托管事务还是手动管理事务。会话还在您在其中运行的查询之间强制执行因果一致性,这意味着您可以读取自己的写入内容。会话对象是轻量级的,旨在短时间使用;但是,它们不是线程安全的。不要缓存或保留它们,否则会占用它们利用的资源(例如与数据库的连接)。

让我们创建一个驱动程序实例并连接到“Goodreads”示例数据库。如果您已经完成了其他 .NET 材料,这看起来会很熟悉。

using Neo4j.Driver;

var driverInstance = GraphDatabase.Driver(new Uri("neo4j+s://demo.neo4jlabs.com"),
        AuthTokens.Basic("goodreads", "goodreads"));

现在,我们不再使用 ExecutableQuery 接口连接到 Neo4j,而是创建一个 Session

await using var session = driverInstance
        .AsyncSession(conf => conf.WithDatabase("goodreads"));

我们遵循了另一个最佳实践,即在配置中指定数据库名称。这有助于提高性能并减少与服务器的往返次数。

与驱动程序对象不同,会话对象旨在轻量级且易于实例化。它们不应被长期持有。如果会话对象不允许关闭,那么它正在使用的资源(如数据库连接)将不会返回到可供其他会话使用的状态。这会占用资源并随着时间的推移影响驱动程序的效率。

在会话中,我们可以在单个事务中运行任意数量的查询。有两种方法可以做到这一点:使用托管事务或使用手动事务。

使用事务函数的托管事务

API 的这一部分包含两个方法——ExecuteRead 和 ExecuteWrite。这两个方法都接受一个回调函数(通常作为 Lambda 提供),该函数接收一个事务作为参数。然后,您可以在回调中使用它来运行所需的任何查询,以及任何其他应用程序代码。该 API 会为您处理事务创建以及提交和回滚逻辑。它还实现了重试机制,我们将在下文进一步讨论。

让我们看一个使用带事务函数的托管事务的示例。

string cypher = @"MATCH (r:Review)-[:WRITTEN_FOR]->(b:Book)<-[:AUTHORED]-(a:Author)
   WHERE a.name = $name
   RETURN b.title AS Title, r.text AS ReviewText, r.rating As Rating
   ORDER BY r.rating DESC
   LIMIT 20";

//Call the entry point for running the transaction supplying a transaction function
//callback.
var queryResult = await session.ExecuteReadAsync(async tx =>	  ①
{
	//Use the transaction to run the query
        var resultCursor = await tx
        	.RunAsync(cypher, new {name = "Stephen King"})  ②
               .ConfigureAwait(false);

	//Stream the records from the server and extract the data
        var reviews = new List<object>();
        while (await resultCursor.FetchAsync())  ③
        {
        reviews.Add( new { Title: resultCursor.Current.Get<string>("Title"),
       	      	ReviewText: resultCursor.Current.Get<string>("ReviewText"),
           	Rating: resultCursor.Current.Get<int>("Rating")});
        }
        return reviews;
})
.ConfigureAwait(false);
  1. 这是运行查询的入口点。有两个可用方法:ExecuteReadExecuteWrite。您的选择取决于您是希望从数据库检索数据还是修改它。这些方法接受对事务函数的回调,该函数接收一个事务作为参数。
  2. 在这里,调用事务上的 RunAsync 方法,并传入查询字符串和任何参数(可选)。返回一个 IResultCursor,它表示将从中检索记录的流。不过,此时尚未从服务器接收到任何内容。
  3. 现在我们获取记录,这将启动驱动程序从服务器流式传输结果的过程。因为您可以在接收记录时对其进行操作,所以您可以更有效地利用内存。

重试和重要说明

我们应该在此处提及事务函数的一个重要功能,以及为什么 ExecutableQuery API 接口构建在它之上:可重试性和瞬态错误弹性。

提供给事务函数的回调被封装在一个事务中。这意味着如果在任何时候出现瞬态错误(可能是连接失败到服务器端问题的任何情况),驱动程序将回滚并重试该事务,例如重新运行事务函数及其中的任何内容。

需要注意的重要一点是,由于事务函数可能被多次运行,因此它必须是幂等的。这意味着无论执行多少次,事务函数的结果都应保持不变。例如,如果您要为节点的属性生成一个值(也许是增加计数器),那么如果事务成功提交,但发生了连接错误,并且驱动程序没有收到提交成功的消息,那么将发生重试,计数器将比预期大 1。这不是幂等的,因为结果可能会不同步。

这种幂等性也应适用于应用程序状态和查询本身。在事务函数内,您不应调用有状态且无法在不改变行为的情况下处理多次调用的代码。

单个事务函数中的多个查询

请注意,此示例将写入数据库。我们一直连接的 goodreads 数据库是只读的。如果您希望运行此示例代码和查询,可以使用 Neo4j Desktop 和此处找到的 Cypher 查询创建数据库的本地实例:https://github.com/JMHReif/graph-demo-datasets/blob/main/goodreadsUCSD/50k-books-ai/goodreadsai-50k-data-import.cypher

假设您有一个工作单元,需要多个相互依赖的查询才能完成。您可以使用事务函数来执行此操作。为了演示这一点,我们将创建一个重试安全的模式,用于使用两个查询更新书籍的聚合评分。

查询

string readCypher = @"MATCH (b:Book {title:$title)<-[:WRITTEN_FOR]-(r:Review)
        RETURN b.book_id AS bookId, avg(r.rating) 
        AS avgRating, count(r) AS ratingsCount";
string writeCypher = @"MATCH (b:Book {book_id: $bookId})
        SET b.average_rating = $avgRating, 
            b.ratings_count = $ratingsCount
        RETURN b"

代码

await using var session = driver
        .AsyncSession(conf => conf.WithDatabase("goodreads"));

await session.ExecuteWriteAsync(async tx => ①
{
    // Query 1: Compute fresh aggregates from reviews
    var result1 = await tx
        .RunAsync(readCypher, 
            new { title = "The Tommyknockers" })
        .ConfigureAwait(false);

    var record = await result1.SingleAsync();  ②
    var avgRating = record["avgRating"].As<double>();
    var ratingsCount = record["ratingsCount"].As<long>();
    var bookId = record["bookId"].As<string>();

    // Query 2: Update the book with recomputed values
    var result2 = await tx
        .RunAsync(writeCypher, ③
            new { bookId, avgRating, ratingsCount })
        .ConfiguerAwait(false);

    var resultSummary = await result2.ConsumeAsync().ConfigureAwait(false); ④
})
.ConfigureAwait(false);
  1. 我们正在使用 ExecuteWrite 事务函数,因为我们将在事务期间更新数据库。
  2. SingleAsync 是结果游标上的一个有用的扩展方法。如果您知道只会返回单个记录,这是一个获取它的便捷方法。
  3. 在这里,我们运行更新数据库的第二段 Cypher。它依赖于第一个查询,因为它使用了该第一个 Cypher 片段从数据库检索到的参数。这演示了一个涉及多个 Cypher 查询的简单工作单元,它将作为单个事务提交、回滚并可能被重试。
  4. 在这里,我们借此机会展示如何获取 IResultSummary。这包含有关已运行查询的有用信息。请注意,一旦调用此方法,结果游标正在获取的流中任何未使用的记录都将被丢弃。

此事务函数作为对幂等性至关重要性的简单演示。它是重试安全的,因为

  1. 我们不增加计数器,计数器不是幂等的。
  2. 我们每次都从源数据重新计算。
  3. 如果事务函数由于瞬态错误而重试,查询会重写相同的稳定聚合值。

我们避免了驱动程序重试和计数器方法带来的关键同步风险,例如

SET b.average_rating = ...

//or

b.ratings_count = b.ratings_count + 1

因此,我们不是增加计数,而是每次从所有 Review 节点重新计算聚合。这样,重试就是安全的——无论事务执行多少次,都会写入相同的确定性结果。

手动事务

有时,您确实需要对事务的生命周期进行手动控制。在这些情况下,您可能:

  • 希望有条件地提交或回滚。
  • 决定何时将查询发送到服务器。
  • 需要控制它是否重试。

为了允许这种级别的控制,我们在会话对象上提供了事务接口。

您可以显式打开一个事务,而不是将委托传递给 ExecuteReadAsyncExecuteWriteAsync

await using var session = driver.AsyncSession();

// Start a write transaction manually
var tx = await session.BeginTransactionAsync();

此时,您拥有一个 IAsyncTransaction 对象。此对象为您提供完全控制:您可以运行查询、提交或回滚。您可以将各种参数和配置传递给 BeginTransaction 方法,包括访问模式(读取或写入),具体取决于您是否会更新数据库。如果您不传递此参数,则将使用创建会话时配置的默认访问模式。

接下来,我们要运行查询并提交或回滚它们。使用上一个示例中的相同两个查询,我们得到:

await using var session = driver.AsyncSession();
var tx = await session.BeginTransactionAsync(); ①

try
{
        // Query 1: Compute fresh aggregates from reviews 
    	var result1 = await tx.RunAsync(readCypher, ②
  	   	      new { title = "The Tommyknockers" })
                .ConfigureAwait(false);

    	var record = await result1.SingleAsync();  
    	var avgRating = record["avgRating"].As<double>();
    	var ratingsCount = record["ratingsCount"].As<long>();
    	var bookId = record["bookId"].As<string>();

    	// Query 2: Update the book with recomputed values
    	var result2 = await tx.RunAsync(writeCypher, ③
   	              new { bookId, avgRating, ratingsCount })
                .ConfiguerAwait(false);

    	var resultSummary = await result2.ConsumeAsync().ConfigureAwait(false); 

    	await tx.CommitAsync();  // Commit all changes ④
}
catch (Exception e)
{
    	await tx.RollbackAsync(); // Undo everything ⑤
}
  1. 创建事务。在这里,我们可以覆盖会话的默认访问模式(如果需要)或指定一些特定于事务的配置。
  2. 以与使用事务函数相同的方式运行查询。
  3. 运行第二个查询。
  4. 一旦我们在事务中要完成的工作完成,我们就可以提交它。
  5. 如果发生错误,try-catch 语句将触发,我们可以回滚事务,以便不对数据库进行任何修改。

为什么要选择读取和写入?

您可能想知道为什么需要在 ExecuteReadExecuteWrite 事务函数或手动事务中的读取/写入访问模式之间进行选择,特别是当其中一个示例显示读取查询在 ExecuteWrite 函数中愉快地运行时。

我们不想在这里深入探讨,但高层解释很有用。在 Neo4j 集群服务器环境中,存在读取器和写入器(通常称为跟随者和领导者)。领导者既可以读取也可以写入,而跟随者只能读取。当您有读取繁重的工作负载时,最好让它在跟随者上执行,这样领导者就不会变得太忙,最终延迟对数据库的写入,成为整个系统的瓶颈。这两个事务函数使客户端代码能够以合适的方式将请求路由到集群的成员。

结果流 vs EagerResult

在本教程和第一个教程中,我们看到了从数据库检索数据的两种主要方式。

  1. Eager 结果(通过 ExecutableQuery
  2. 流式结果(通过事务函数中的 IResultCursor

它们都能为您获得查询结果,但在实践中有所不同。让我们回顾一下原因。

1. Eager 结果 — 一次性全部获取

使用 ExecutableQuery,驱动程序不会胡闹——它运行您的 Cypher,提取所有结果,并将它们以一个整洁的包交给您。例如

var result = await session
        .ExecutableQuery("MATCH (b:Book) RETURN b.title AS Title")
        .ExecuteAsync();

//Do something with the results
foreach (var record in result.Records)
{
        Console.WriteLine(record["Title"].As<string>());	
}

在这里,Neo4j 已经将整个结果集发送回客户端。您可以随心所欲地多次循环遍历它,传递它,无论做什么——它只是驻留在内存中等待您。

这使得 ExecutableQuery 对于中小型结果集非常方便。但是,它确实意味着如果您的查询匹配数百万个节点,您会在内存使用中注意到它。

2. 使用结果游标进行流式传输 — 每次一条记录

另一方面,IResultCursor 提供了一个流式 API。它不会预先缓冲所有内容,而是让您直接从服务器一条记录一条记录地拉取结果

await session.ExecuteReadAsync(async tx =>
{
        var cursor = await tx.RunAsync("MATCH (b:Book) RETURN b.title AS Title");

	//Fetch the results and do something with them
    	while (await cursor.FetchAsync())
    	{
         	var name = cursor.Current["name"].As<string>();
        	Console.WriteLine(name);
    	}
});

在这里,结果是惰性进入的——只有当您调用 FetchAsync() 时,驱动程序才会向服务器请求更多数据。如果您遇到以下情况,这非常完美:

  • 您正在处理巨大的结果集。
  • 您不想一次将所有内容放入内存。
  • 或者您已经在事务中,并希望根据当前结果运行更多查询。

权衡之处在于,您需要自己编写更多的样板代码,并且游标仅在事务打开时有效(不要将 IResultCursor 传递到事务函数之外)。

那么你应该使用哪一个?

  • 如果您只需要一个节点或值的快速列表,请使用 ExecutableQuery。它干净、简单,非常适合中小型查询。
  • 如果您正在流式传输数千或数百万个结果,或者需要在结果到达时做出反应,请坚持在事务中使用 IResultCursor

可以这样想:

  • Eager 结果 → “现在给我整个表。”
  • 流式游标 → “在我保持连接打开的同时,让我逐行读取。”

因果一致性和书签

在集群或云设置中使用 Neo4j 时,会出现一个很快的问题:“如果我在一个事务中写入一些数据,我如何确保我可以在下一个事务中立即读取它?” 

这就是因果一致性书签发挥作用的地方。因果一致性意味着保证您的读取能够“看到”它们之前发生的写入的影响。Neo4j 使用书签使这成为可能。

在 .NET 驱动程序中,您通常不会手动管理书签——但您可以这样做。

这是一个简单的示例

await using var session = driver.AsyncSession(o => o
        .WithDefaultAccessMode(AccessMode.Write));

var writeTx = await session.WriteTransactionAsync(async tx =>
{
    	await tx.RunAsync("CREATE (:Book {name: 'The Stand'})"); ①
    	return true;
});

// Grab the bookmark from the session
var bookmarks = session.LastBookmarks; ②

// Use the bookmark in a new session
await using var readSession = driver.AsyncSession(o => o
    	.WithDefaultAccessMode(AccessMode.Read)
    	.WithBookmarks(bookmarks)); ③

var readTx = await readSession.ReadTransactionAsync(async tx =>
{
    	var cursor = await tx.RunAsync("MATCH (b:Book) RETURN b.name"); ④
    	var record = await cursor.SingleAsync();
    	return record["name"].As<string>();
});

Console.WriteLine(readTx); // "The Stand"

这里

  1. 写入数据库以创建一本名为“The Stand”的书
  2. 从写入会话获取书签
  3. 将该书签传递到新的读取会话
  4. Neo4j 确保读取与写入在因果上是一致的

如果您在多个会话之间来回切换,手动管理书签可能会变得乏味。这就是 BookmarkManager 发挥作用的地方。它基本上是一个书签存储,驱动程序可以为您更新和重用它。截断前一个手动示例以显示相关更改将导致如下内容:

var bookmarkManager =
        GraphDatabase.BookmarkManagerFactory.NewBookmarkManager(); ①

// Session 1 writes Alice
await using var readSession = driver.AsyncSession(o => 
    o.WithDefaultAccessMode(AccessMode.Write)
     .WithBookmarkManager(bookmarkManager)); ②

// Run your write query using the ExecuteWrite transaction function

// Session 2 reads Alice, using the same bookmark manager
await using var readSession = driver.AsyncSession(o => 
    o.WithDefaultAccessMode(AccessMode.Read)
     .WithBookmarkManager(bookmarkManager)); ②

// Run your read query using the ExecuteRead transaction function
  1. 创建我们想要用于因果一致性的书签管理器。您还可以从驱动程序获取 IBookmarkManager,用于它在内部为 ExecutableQuery 使用的实例。通过这种方式,您可以将会话和事务函数与可执行查询混合使用,并仍然确保因果一致性。
  2. 将会话配置为使用书签管理器。

注意到我们没有手动传递任何书签吗?BookmarkManager 确保了两个会话都保持因果一致。

最后

我们希望这篇相当长的教程为您理解 Neo4j .NET 驱动程序的功能及其提供的接口奠定了坚实的基础。如需进一步阅读和了解更多信息,您可能会发现以下链接很有用:

分享文章