使用响应式流控制结果流

推荐已经在采用响应式编程风格,且有特定需求必须通过响应式工作流来解决的应用程序使用响应式 API。对于所有其他情况,建议使用 同步 (sync)异步 (async) API。异步 API 在 Java 21+ 的虚拟线程上表现良好。

在响应式流中,消费者决定其从查询中获取记录的速率,而驱动程序则相应地管理从服务器请求记录的速率。

一个示例用例是:应用程序从 Neo4j 服务器获取记录,并对每条记录进行一些非常耗时的后处理。如果允许服务器在有记录可用时立即将其推送到客户端,客户端可能会在处理滞后时被大量条目淹没。响应式 API 可确保接收方不会被迫缓冲任意数量的数据。

驱动程序提供了两种响应式功能实现

本页示例使用 org.neo4j.driver.reactivestreams

安装依赖项

要使用响应式功能,您需要先将相关依赖项添加到项目中(请参阅 Reactor → 参考 → 获取 Reactor)。

  1. 将 Reactor 的 BOM 添加到 pom.xmldependencyManagement 部分。请注意,这是在常规 dependencies 部分之外的补充。如果您的 pom 中已经存在 dependencyManagement 部分,则只需添加其中的内容。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2023.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
  2. reactor-core 依赖项添加到 dependencies 部分。请注意,这里省略了版本标签(它将从 Reactor 的 BOM 中获取)。

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>

响应式查询示例

驱动程序的基本概念与同步情况相同,但查询是通过 ReactiveSession 运行的,且与查询相关的对象都有对应的响应式版本及前缀。

使用响应式会话的托管事务

托管事务 .executeRead() 示例
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<database-uri>";
        final String dbUser = "<username>";
        final String dbPassword = "<password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(  (1)
                Mono.just(driver.session(  (2)
                    ReactiveSession.class,  (3)
                    SessionConfig.builder().withDatabase("<database-name>").build()
                )),
                rxSession -> Mono.fromDirect(rxSession.executeRead(  (4)
                    tx -> Mono
                        .fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x"))  (5)
                        .flatMapMany(ReactiveResult::records)  (6)
                )),
                ReactiveSession::close  (7)
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();  (8)
            System.out.println(values);
        }
    }
}
1 Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) 用于创建新会话、使用该会话运行查询,并最终将其关闭。它确保资源在需要的时间内保持活动状态,并允许指定最后要执行的清理操作。有关此模式的更多信息,请阅读 始终延迟创建会话
2 .usingWhen() 接收一个 Publisher 形式的资源提供程序,这就是为什么会话创建被包装在 Mono.just() 调用中的原因,该调用会从任何值生成一个 Mono
3 会话创建与异步情况类似,且 相同的配置方法 适用。不同之处在于第一个参数必须是 ReactiveSession.class,并且返回值是一个 ReactiveSession 对象。
4 方法 ReactiveSession.executeRead() 运行只读事务并返回包含被调用者返回值的 PublisherMono.fromDirect() 将其转换为 Mono
5 方法 tx.run() 返回一个 Publisher<ReactiveResult>Mono.fromDirect() 将其转换为 Mono
6 在返回最终结果之前,Mono.flatMapMany() 从结果中检索记录并将其作为新的 Flux 返回。
7 最终的清理工作将关闭会话。
8 为了展示响应式工作流的结果,.block() 会等待流完成,以便打印出值。在实际应用程序中,您不会阻塞,而是将记录发布者转发给您选择的框架,该框架会以有意义的方式处理它们。
您可以在 workerClosure 中通过多次调用 executeRead/Write() 在同一个响应式会话中运行多个查询。

使用响应式会话的隐式事务

以下示例与前一个示例非常相似,区别在于它使用了 隐式事务

隐式事务 .run() 示例
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<database-uri>";
        final String dbUser = "<username>";
        final String dbPassword = "<password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(
                Mono.just(driver.session(
                    ReactiveSession.class,
                    SessionConfig.builder().withDatabase("<database-name>").build()
                )),
                rxSession -> Mono
                    .fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
                    .flatMapMany(ReactiveResult::records),
                ReactiveSession::close
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();
            System.out.println(values);
        }
    }
}

始终延迟创建会话

在响应式编程中,除非有订阅者附加到 Publisher,否则它不会激活:Publisher 只是异步过程的抽象描述,只有订阅动作才会触发整个链条中的数据流。

因此,请注意将会话的创建/销毁作为此链的一部分,不要将会话的创建与查询 Publisher 链分开。否则可能会导致大量会话处于打开状态,它们既没有在执行工作,又在等待 Publisher 使用它们,这可能会耗尽应用程序可用的会话数。前面的示例使用 Flux.usingWhen() 来解决此问题。

不良做法 — 创建了会话但无人使用
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 executeReadexecuteWrite 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。