运行您自己的事务

使用 executableQuery() 查询数据库时,驱动程序会自动创建一个事务。事务是一个工作单元,要么被提交(全部完成),要么在失败时被回滚。您可以在单个查询中包含多个 Cypher 语句,例如按顺序使用 MATCHCREATE更新数据库,但您不能在多个查询之间交替执行客户端逻辑。

对于这些更高级的用例,驱动程序提供了手动控制事务的函数。最常见的形式是托管事务,您可以将其视为一种拆解 executableQuery() 流程的方式,从而能够在更多地方指定所需的行为。

创建会话

在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间的查询通道,并确保强制执行因果一致性

会话通过 Driver.session() 方法创建。使用可选参数来更改会话的配置,例如目标数据库。有关更多配置参数,请参阅会话配置

// import org.neo4j.driver.SessionConfig

try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {
    // session usage
}

创建会话是一个轻量级操作,因此创建和销毁会话的成本不高。完成工作后,请务必关闭会话

会话不是线程安全的:您可以在线程之间共享主要的 Driver 对象,但每个线程都应该创建自己的会话。

运行托管事务

事务可以包含多个查询。由于 Neo4j 符合 ACID 标准,事务内的查询要么作为一个整体执行,要么完全不执行:您不会遇到事务一部分成功而另一部分失败的情况。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。

您可以使用 Session.executeRead()Session.executeWrite() 方法创建托管事务,具体取决于您是要从数据库检索数据还是更改数据。这两个方法都接收一个事务函数回调,负责实际执行查询并处理结果。

检索名字以 Al 开头的人员。
// import java.util.Map
// import org.neo4j.driver.SessionConfig

try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {  (1)
    var people = session.executeRead(tx -> {  (2)
        var result = tx.run("""
        MATCH (p:Person) WHERE p.name STARTS WITH $filter  (3)
        RETURN p.name AS name ORDER BY name
        """, Map.of("filter", "Al"));
        return result.list();  // return a list of Record objects (4)
    });
    people.forEach(person -> {
        System.out.println(person);
    });
    // further tx.run() calls will execute within the same transaction
}
1 创建一个会话。单个会话可以是多个查询的容器。除非将其作为资源使用 try 结构创建,否则请记住在完成后关闭它。
2 .executeRead()(或 .executeWrite())方法是事务的入口点。它接收一个事务函数的回调,该函数负责运行查询。
3 使用 tx.run() 方法执行查询。您可以提供一个查询参数映射作为第二个参数。每次查询运行都会返回一个 Result 对象。
4 处理结果,使用 Result 上的任何方法。.list() 方法将所有记录检索到一个列表中。

不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数

事务函数永远不应直接返回 Result 对象。相反,请始终以某种方式处理结果。在事务函数内,return 语句会导致事务被提交,而如果引发异常,事务则会自动回滚。

包含多个查询、客户端逻辑和潜在回滚的事务
package demo;

import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.Record;
import org.neo4j.driver.RoutingControl;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionContext;
import org.neo4j.driver.exceptions.NoSuchRecordException;

public class App {

    // Create & employ 100 people to 10 different organizations
    public static void main(String... args) {

        final String dbUri = "<database-uri>";
        final String dbUser = "<username>";
        final String dbPassword = "<password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {
                for (int i=0; i<100; i++) {
                    String name = String.format("Thor%d", i);

                    try {
                        String orgId = session.executeWrite(tx -> employPersonTx(tx, name));
                        System.out.printf("User %s added to organization %s.%n", name, orgId);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
            }
        }
    }

    static String employPersonTx(TransactionContext tx, String name) {
        final int employeeThreshold = 10;

        // Create new Person node with given name, if not exists already
        tx.run("MERGE (p:Person {name: $name})", Map.of("name", name));

        // Obtain most recent organization ID and the number of people linked to it
        var result = tx.run("""
            MATCH (o:Organization)
            RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
            ORDER BY o.createdDate DESC
            LIMIT 1
            """);

        Record org = null;
        String orgId = null;
        int employeesN = 0;
        try {
            org = result.single();
            orgId = org.get("id").asString();
            employeesN = org.get("employeesN").asInt();
        } catch (NoSuchRecordException e) {
            // The query is guaranteed to return <= 1 results, so if.single() throws, it means there's none.
            // If no organization exists, create one and add Person to it
            orgId = createOrganization(tx);
            System.out.printf("No orgs available, created %s.%n", orgId);
        }

        // If org does not have too many employees, add this Person to it
        if (employeesN < employeeThreshold) {
            addPersonToOrganization(tx, name, orgId);
            // If the above throws, the transaction will roll back
            // -> not even Person is created!

        // Otherwise, create a new Organization and link Person to it
        } else {
            orgId = createOrganization(tx);
            System.out.printf("Latest org is full, created %s.%n", orgId);
            addPersonToOrganization(tx, name, orgId);
            // If any of the above throws, the transaction will roll back
            // -> not even Person is created!
        }

        return orgId;  // Organization ID to which the new Person ends up in
    }

    static String createOrganization(TransactionContext tx) {
        var result = tx.run("""
            CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
            RETURN o.id AS id
        """);
        var org = result.single();
        var orgId = org.get("id").asString();
        return orgId;
    }

    static void addPersonToOrganization(TransactionContext tx, String personName, String orgId) {
        tx.run("""
            MATCH (o:Organization {id: $orgId})
            MATCH (p:Person {name: $name})
            MERGE (p)-[:WORKS_FOR]->(o)
            """, Map.of("orgId", orgId, "name", personName)
        );
    }
}

如果失败被视为瞬态的(例如由于临时服务器不可用),驱动程序会自动重试运行失败的事务。如果操作在配置的最大重试时间后仍然失败,则会引发异常。

由于事务可能会被重新运行,事务函数在多次运行时应产生相同的影响(幂等性,因为您无法预先知道它们会被执行多少次。实际上,这意味着例如您不应该编辑或依赖全局变量。请注意,尽管事务函数可能会被执行多次,但其中的数据库查询将始终只运行一次。

一个会话可以链接多个事务,但在任何给定时间,一个会话内只能激活一个事务。要维护多个并发事务,请使用多个并发会话。

传递给 .executeRead().executeWrite() 的事务函数回调可以返回任何内容,因为返回类型是 Java 泛型。这也意味着它们不能返回 void,因为那不是 Object 的实例。
如果您对从事务函数中返回任何内容不感兴趣,您可以:

  1. 使用 .executeWriteWithoutResult(),它支持返回 void(且仅限 void);

  2. 使用 .executeRead()/.executeWrite() 并在事务函数中返回 null

运行显式事务

您可以通过使用 Session.beginTransaction() 方法手动开启事务来获得对事务的完全控制,该方法返回一个 Transaction 对象。然后,您可以使用 Transaction.run() 方法在显式事务内运行查询。

try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {
    try (Transaction tx = session.beginTransaction()) {
        // use tx.run() to run queries
        //     tx.commit() to commit the transaction
        //     tx.rollback() to rollback the transaction
    }
}

显式事务可以通过 Transaction.commit() 提交,或通过 Transaction.rollback() 回滚。如果没有采取显式操作,驱动程序将在其生命周期结束时自动回滚事务。

因瞬态服务器错误而失败的 tx.run() 查询可以在无需更改原始请求的情况下进行重试。实现 RetryableException 的异常意味着再次尝试导致该异常的操作可能会成功。

显式事务对于需要将 Cypher 执行分布在同一个事务的多个函数中,或者需要在一个事务内运行多个查询但不需要托管事务提供的自动重试的应用程序最有用。

涉及外部 API 的显式事务示例
package demo;

import java.util.Map;
import java.util.List;
import java.util.Arrays;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Transaction;

public class App {

    public static void main(String... args) {
        final String dbUri = "<database-uri>";
        final String dbUser = "<username>";
        final String dbPassword = "<password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            String customerId = createCustomer(driver);
            int otherBankId = 42;
            transferToOtherBank(driver, customerId, otherBankId, 999);
        }
    }

    static String createCustomer(Driver driver) {
        var result = driver.executableQuery("""
            MERGE (c:Customer {id: randomUUID(), balance: 1000})
            RETURN c.id AS id
            """)
            .withConfig(QueryConfig.builder().withDatabase("<database-name>").build())
            .execute();
        return result.records().get(0).get("id").asString();
    }

    static void transferToOtherBank(Driver driver, String customerId, int otherBankId, float amount) {
        try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {
            try (var tx = session.beginTransaction()) {
                if (! customerBalanceCheck(tx, customerId, amount)) {
                    System.out.printf("Customer %s doesn't have enough funds.%n", customerId);
                    return;  // give up
                }

                otherBankTransferApi(customerId, otherBankId, amount);
                // Now the money has been transferred => can't rollback anymore
                // (cannot rollback external services interactions)

                try {
                    decreaseCustomerBalance(tx, customerId, amount);
                    tx.commit();
                    System.out.printf("Transferred %f to %s.%n", amount, customerId);
                } catch (Exception e) {
                    requestInspection(customerId, otherBankId, amount, e);
                    throw new RuntimeException(e.getMessage());
                }
            }
        }
    }

    static boolean customerBalanceCheck(Transaction tx, String customerId, float amount) {
        var result = tx.run("""
            MATCH (c:Customer {id: $id})
            RETURN c.balance >= $amount AS sufficient
            """, Map.of("id", customerId, "amount", amount));
        var record = result.single();
        return record.get("sufficient").asBoolean();
    }

    static void otherBankTransferApi(String customerId, int otherBankId, float amount) {
        // make some API call to other bank
    }

    static void decreaseCustomerBalance(Transaction tx, String customerId, float amount) {
        tx.run("""
            MATCH (c:Customer {id: $id})
            SET c.balance = c.balance - $amount
            """, Map.of("id", customerId, "amount", amount));
    }

    static void requestInspection(String customerId, int otherBankId, float amount, Exception e) {
        // manual cleanup required; log this or similar
        System.out.printf("WARNING: transaction rolled back due to exception: %s.%n", e.getMessage());
        System.out.printf("customerId: %s, otherBankId: %d, amount: %f.%n", customerId, otherBankId, amount);
    }
}

处理查询结果

驱动程序的查询输出是一个 Result 对象,它将 Cypher 结果封装在一种丰富的数据结构中,该结构需要在客户端进行一些解析。有两个主要点需要注意:

  • 结果记录不是立即且全部由服务器获取并返回的。相反,结果以延迟流的形式出现。具体来说,当驱动程序从服务器接收到一些记录时,它们最初在后台队列中进行缓冲。记录会保留在缓冲区中,直到被应用程序消费,此时它们会从缓冲区中移除。当没有更多可用记录时,结果即耗尽

  • 结果充当游标。这意味着除非您将其保存在辅助数据结构中,否则无法从流中检索先前的记录。

下方的动画演示了单个查询的路径:它展示了驱动程序如何处理结果记录,以及应用程序应如何处理结果。

处理结果最简单的方法是调用 .list(),它会生成一个 Record 对象列表。否则,Result 对象实现了许多处理记录的方法。下面列出了最常用的方法。

方法 描述

list() List<Record>

以列表形式返回结果的剩余部分。

single() Record

返回下一条且仅剩的一条记录。调用此方法总是会耗尽结果。如果可用记录多于(或少于)一条,则会引发 NoSuchRecordException

next() Record

返回结果中的下一条记录。如果没有更多可用记录,则抛出 NoSuchRecordException

hasNext() boolean

结果迭代器是否有下一条记录可移动到。

peek() Record

返回结果中的下一条记录而不消费它。这会将记录留在缓冲区中以便进一步处理。

consume() ResultSummary

返回查询结果摘要。它会耗尽结果,因此应仅在数据处理结束后调用。

有关 Result 方法的完整列表,请参阅 API 文档 → Result

Record 对象内部的属性嵌入在 Value 对象中。要提取并将它们转换为相应的 Java 类型,请使用 .as<type>()(例如 .asString()asInt() 等)。例如,如果来自数据库的 name 属性是字符串,则 record.get("name").asString() 将把属性值作为 String 对象返回。

更多信息,请参阅 数据类型及其与 Cypher 类型的映射

会话配置

数据库选择

始终通过 .withDatabase("<database-name>") 方法明确指定数据库,即使在单数据库实例上也应如此。这允许驱动程序更高效地工作,因为它节省了到服务器以解析主数据库的网络往返。如果没有指定数据库,则使用 Neo4j 实例设置中配置的默认数据库

// import org.neo4j.driver.SessionConfig;

var session = driver.session(SessionConfig.builder()
    .withDatabase("<database-name>").build());
通过配置方法指定数据库比使用 USE Cypher 子句更受推荐。如果服务器在集群上运行,带有 USE 的查询需要启用服务端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含所请求数据库的成员。

请求路由

在集群环境中,所有会话默认以写模式打开,并将它们路由到主节点(Leader)。您可以调用 .withRouting(RoutingControl.READ) 方法来更改此设置。请注意,.executeRead().executeWrite() 会自动覆盖会话的默认访问模式。

// import org.neo4j.driver.SessionConfig;
// import org.neo4j.driver.AccessMode;

var session = driver.session(SessionConfig.builder()
    .withDatabase("<database-name>")
    .withDefaultAccessMode(AccessMode.READ)
    .build());

尽管在读模式下执行查询会导致运行时错误,但您不应依赖此进行访问控制。这两种模式的区别在于事务被路由到集群的任何节点,而事务则被定向到主节点。没有任何安全保证写查询在读模式下提交时会被拒绝。

对于 .executeRead().executeWrite() 方法,上述说明同样适用。

以其他用户身份运行查询

您可以通过在会话创建时提供 AuthToken 作为第三个参数来通过不同的用户执行查询。在会话级别切换用户比创建新的 Driver 对象成本更低。查询将在给定用户的安全上下文(即主数据库、权限等)中运行。

// import org.neo4j.driver.AuthTokens;
// import org.neo4j.driver.Session;
// import org.neo4j.driver.SessionConfig;

var authToken = AuthTokens.basic("<username>", "<password>");
var session = driver.session(
    Session.class,
    SessionConfig.builder()
        .withDatabase("<database-name>")
        .build(),
    authToken
);

.withImpersonatedUser() 方法提供了类似的功能。区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver 时所使用的用户需要具有适当的权限

// import org.neo4j.driver.SessionConfig;

var session = driver.session(SessionConfig.builder()
    .withDatabase("<database-name>")
    .withImpersonatedUser("<username>")
    .build());

事务配置

您可以通过提供 TransactionConfig 对象作为 .executeRead().executeWrite().beginTransaction() 的(可选)第二个参数来对事务进行进一步控制。使用它来指定:

  • 事务超时。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为 1 毫秒。

  • 附加到事务的元数据映射。这些元数据会记录在服务器的 query.log 中,并且在 SHOW TRANSACTIONS Cypher 命令的输出中可见。使用此功能标记事务。

// import java.time.Duration
// import org.neo4j.driver.SessionConfig
// import org.neo4j.driver.TransactionConfig

try (var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build())) {
    var people = session.executeRead(tx -> {
        var result = tx.run("MATCH (p:Person) RETURN p");
        return result.list();  // return a list of Record objects
    }, TransactionConfig.builder()
        .withTimeout(Duration.ofSeconds(5))
        .withMetadata(Map.of("appName", "peopleTracker"))
        .build()
    );
    people.forEach(person -> System.out.println(person));
}

关闭会话

每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会用尽会话。因此,建议使用 try-with-resources 语句创建会话,当应用程序使用完毕时,它会自动关闭会话。当会话关闭时,它会被归还到连接池以供稍后重用。

如果您没有使用 try 将会话作为资源打开,请记住在用完后调用 .close() 方法。

var session = driver.session(SessionConfig.builder().withDatabase("<database-name>").build());

// session usage

session.close();

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 executeReadexecuteWrite 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。