协调并行事务

在使用 Neo4j 集群时,因果一致性 (causal consistency) 在大多数情况下是默认强制执行的,这保证了查询能够读取到之前查询所做的更改。然而,对于并行运行的多个事务,默认情况下并不会自动实现这一点。在这种情况下,你可以使用书签 (bookmarks),让一个事务在运行自己的工作之前,等待另一个事务的结果在集群中传播完毕。这不是必需的,并且你仅应在需要跨不同事务实现因果一致性时才使用书签,因为等待书签可能会对性能产生负面影响。

使用 .execute_query() 的书签

使用 .execute_query() 查询数据库时,驱动程序会为你管理书签。在这种情况下,你可以保证后续查询能够读取到之前的更改,而无需采取进一步操作。

driver.execute_query("<QUERY 1>")

# subsequent .execute_query() calls will be causally chained

driver.execute_query("<QUERY 2>") # can read result of <QUERY 1>
driver.execute_query("<QUERY 3>") # can read result of <QUERY 2>

若要禁用书签管理和因果一致性,请在 .execute_query() 调用中设置 bookmark_manager_=None

driver.execute_query(
    "<QUERY>",
    bookmark_manager_=None,
)

单个会话内的书签

书签管理对于在单个会话中运行的查询是自动进行的:同一会话内的查询在因果上是链接在一起的。

with driver.session() as session:
    session.execute_write(lambda tx: tx.run("<QUERY 1>"))
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))  # can read QUERY 1
    session.execute_write(lambda tx: tx.run("<QUERY 3>"))  # can read QUERY 1,2

跨多个会话的书签

如果你的应用程序使用多个会话,你可能需要确保一个会话在另一个会话运行其查询之前已完成其所有事务。

在下面的示例中,session_asession_b 可以并发运行,而 session_c 会等待直到它们的结果传播完毕。这保证了 session_c 想要操作的 Person 节点确实存在。

使用书签协调多个会话
from neo4j import GraphDatabase, Bookmarks


URI = "<database-uri>"
AUTH = ("<username>", "<password>")

def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()
        create_some_friends(driver)


def create_some_friends(driver):
    saved_bookmarks = Bookmarks()  # To collect the sessions' bookmarks

    # Create the first person and employment relationship
    with driver.session(database="<database-name>") as session_a:
        session_a.execute_write(create_person, "Alice")
        session_a.execute_write(employ, "Alice", "Wayne Enterprises")
        saved_bookmarks += session_a.last_bookmarks()  (1)

    # Create the second person and employment relationship
    with driver.session(database="<database-name>") as session_b:
        session_b.execute_write(create_person, "Bob")
        session_b.execute_write(employ, "Bob", "LexCorp")
        saved_bookmarks += session_b.last_bookmarks()  (1)

    # Create a friendship between the two people created above
    with driver.session(
        database="<database-name>", bookmarks=saved_bookmarks
    ) as session_c:  (2)
        session_c.execute_write(create_friendship, "Alice", "Bob")
        session_c.execute_read(print_friendships)


# Create a person node
def create_person(tx, name):
    tx.run("MERGE (:Person {name: $name})", name=name)


# Create an employment relationship to a pre-existing company node
# This relies on the person first having been created.
def employ(tx, person_name, company_name):
    tx.run("""
        MATCH (person:Person {name: $person_name})
        MATCH (company:Company {name: $company_name})
        CREATE (person)-[:WORKS_FOR]->(company)
        """, person_name=person_name, company_name=company_name
    )


# Create a friendship between two people
def create_friendship(tx, name_a, name_b):
    tx.run("""
        MATCH (a:Person {name: $name_a})
        MATCH (b:Person {name: $name_b})
        MERGE (a)-[:KNOWS]->(b)
        """, name_a=name_a, name_b=name_b
    )


# Retrieve and display all friendships
def print_friendships(tx):
    result = tx.run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
    for record in result:
        print("{} knows {}".format(record["a.name"], record["b.name"]))


if __name__ == "__main__":
    main()
1 使用 Session.last_bookmarks() 从不同会话中收集并合并书签,并将它们存储在 Bookmarks 对象中。
2 使用 bookmarks 参数将它们用于初始化另一个会话。

driver passing bookmarks

使用书签可能会对性能产生负面影响,因为查询被迫等待最新的更改在集群中传播。如果可能,请尝试将查询组合在单个事务或单个会话中。

混合使用 .execute_query() 和会话

为了确保部分使用 .execute_query() 执行而部分使用会话执行的事务之间具有因果一致性,请使用 bookmark_manager=driver.execute_query_bookmark_manager 创建会话。由于这是 .execute_query() 调用的默认书签管理器,这将确保所有工作都在同一个书签管理器下执行,从而实现因果一致性。

driver.execute_query("<QUERY 1>")

with driver.session(
    bookmark_manager=driver.execute_query_bookmark_manager
) as session:
    # every query inside this session will be causally chained
    # (i.e., can read what was written by <QUERY 1>)
    session.execute_write(lambda tx: tx.run("<QUERY 2>"))

# subsequent execute_query calls will be causally chained
# (i.e., can read what was written by <QUERY 2>)
driver.execute_query("<QUERY 3>")

实现自定义 BookmarkManager

书签管理器 (bookmark manager) 是驱动程序使用的一个接口,用于跟踪书签并自动保持会话一致性。

你可以继承 BookmarkManager 接口来实现自定义书签管理器,或者使用驱动程序通过 GraphDatabase.bookmark_manager() 提供的默认实现。在实现书签管理器时,请记住所有方法都必须是并发安全的。

有关该接口的详细信息,请参阅 API 文档

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

一个 Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。