使用响应式流控制结果流
在响应式流中,消费者决定从查询中消费记录的速率,驱动程序则相应地管理向服务器请求记录的速率。
一个示例用例是应用程序从 Neo4j 服务器获取记录,并对每条记录进行非常耗时的后处理。如果服务器被允许在记录可用时立即推送给客户端,客户端可能会因大量条目而被淹没,而其处理仍落后。Reactive API 确保接收端不必被迫缓冲任意数量的数据。
|
建议在已经采用响应式编程风格的应用程序中使用 Reactive API,并且这些应用有只有响应式工作流才能满足的需求。 |
安装依赖项
要使用响应式特性,首先需要将相关依赖项添加到项目中。驱动的响应式实现位于 Neo4j.Driver.Reactive NuGet 包中(尽管符号定义在常规的 Neo4j.Driver 命名空间中)。
dotnet add package Neo4j.Driver.Reactive
响应式查询示例
基本驱动的概念与同步情况相同,只是查询通过 RxSession 执行,并且与查询相关的对象拥有响应式对应物和前缀。
使用响应式会话的受管事务
.ExecuteRead() 示例using Neo4j.Driver;
const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";
await using var driver = GraphDatabase.Driver(
dbUri,
AuthTokens.Basic(dbUser, dbPassword));
var rxSession = driver.RxSession(SessionConfigBuilder.ForDatabase("<database-name>"));
var observable = rxSession.ExecuteRead(
tx => {
return tx
.Run("UNWIND range (1, 5) AS x RETURN x")
.Records();
}
);
var tcs = new TaskCompletionSource();
observable.Subscribe(
i => {
Console.WriteLine(i.Get<string>("x"));
},
ex => {
Console.WriteLine($"Error Occurred: {ex.Message}");
tcs.SetResult();
},
() => {
Console.WriteLine("Finished receiving records");
tcs.SetResult();
}
);
await tcs.Task;
使用响应式会话的隐式事务
以下示例与前一个非常相似,只是它使用了 隐式事务。
.Run() 示例using Neo4j.Driver;
const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";
await using var driver = GraphDatabase.Driver(
dbUri,
AuthTokens.Basic(dbUser, dbPassword));
var rxSession = driver.RxSession(SessionConfigBuilder.ForDatabase("<database-name>"));
var observable = rxSession
.Run("UNWIND range (1, 5) AS x RETURN x")
.Records();
var tcs = new TaskCompletionSource();
observable.Subscribe(
i => {
Console.WriteLine(i.Get<string>("x"));
},
ex => {
Console.WriteLine($"Error Occurred: {ex.Message}");
tcs.SetResult();
},
() => {
Console.WriteLine("Finished receiving records");
tcs.SetResult();
}
);
await tcs.Task;
始终延迟会话创建
请记住,在响应式编程中,Publisher 在订阅者附加之前不会真正启动。Publisher 只是对异步过程的抽象描述,只有订阅行为才会触发整个链条中的数据流动。
因此,请始终注意将会话创建/销毁纳入此链,而不是在查询 Publisher 链之外单独创建会话。这样做可能导致大量打开的会话却不执行任何工作,且都在等待 Publisher 使用,进而耗尽应用可用的会话数量。
术语表
- LTS (长期支持版)
-
长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。
- 最终一致性
-
如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。
- 因果一致性
-
如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用
null。 - 事务
-
事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。
- 背压
-
背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。
- 书签
-
书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。
- 事务函数
-
事务函数是由
.ExecuteReadAsync()或.ExecuteWriteAsync()调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。 - IDriver
-
IDriver对象保存了与 Neo4j 数据库建立连接所需的详细信息。