运行您自己的事务

使用 execute_query() 查询数据库时,驱动程序会自动创建一个事务。事务是一个工作单元,要么全部提交,要么在失败时回滚。您可以在单个查询中包含多个 Cypher 语句,例如在 更新数据库 时顺序使用 MATCHCREATE,但不能在多个查询之间穿插客户端逻辑。

对于这些更高级的用例,驱动程序提供了手动控制事务的函数。最常见的形式是托管事务,您可以将其视为一种拆解 .execute_query() 流程并能够在更多地方指定其预期行为的方式。

创建会话

在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间的查询通道,并确保强制执行因果一致性

会话通过 Driver.session() 方法创建,其中关键字参数 database 允许指定目标数据库。有关更多参数,请参阅会话配置

with driver.session(database="<database-name>") as session:
    ...

创建会话是一个轻量级操作,因此创建和销毁会话的成本不高。完成工作后,请务必关闭会话

会话不是线程安全的:您可以在线程之间共享主要的 Driver 对象,但每个线程都应该创建自己的会话。

运行托管事务

事务可以包含多个查询。由于 Neo4j 符合 ACID 标准,事务内的查询要么作为一个整体执行,要么完全不执行:您不会遇到事务一部分成功而另一部分失败的情况。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。

您可以使用 Session.execute_read()Session.execute_write() 方法创建托管事务,具体取决于您是要从数据库检索数据还是更改数据。这两个方法都采用事务函数回调,该回调负责执行查询并处理结果。

检索名字以 Al 开头的人员。
def match_person_nodes(tx, name_filter): (3)
    result = tx.run(""" (4)
        MATCH (p:Person) WHERE p.name STARTS WITH $filter
        RETURN p.name AS name ORDER BY name
        """, filter=name_filter)
    return list(result)  # a list of Record objects (5)

with driver.session(database="<database-name>") as session:  (1)
    people = session.execute_read(  (2)
        match_person_nodes,
        "Al",
    )
    for person in people:
        print(person.data())  # obtain dict representation
1 创建一个会话。单个会话可以作为多个查询的容器。除非使用 with 结构创建,否则请记得在使用完毕后将其关闭。
2 .execute_read()(或 .execute_write())方法是进入事务的入口点。它接收一个事务函数回调,以及任意数量的传递给该事务函数的位置参数和关键字参数。
3 事务函数回调负责运行查询。
4 使用 Transaction.run() 方法运行查询。每次运行查询都会返回一个 Result 对象。
5 使用 Result 上的任何方法处理结果

不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数

事务函数绝不应直接返回 Result 对象。相反,请始终以某种方式处理结果;至少将其转换为列表。在事务函数内,return 语句会导致事务提交,而如果引发异常,事务则会自动回滚。

包含多个查询、客户端逻辑和潜在回滚的事务
from neo4j import GraphDatabase


URI = "<database-uri>"
AUTH = ("<username>", "<password>")
employee_threshold=10


def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        with driver.session(database="<database-name>") as session:
            for i in range(100):
                name = f"Thor{i}"
                org_id = session.execute_write(employ_person_tx, name)
                print(f"User {name} added to organization {org_id}")


def employ_person_tx(tx, name):
    # Create new Person node with given name, if not exists already
    result = tx.run("""
        MERGE (p:Person {name: $name})
        RETURN p.name AS name
        """, name=name
    )

    # Obtain most recent organization ID and the number of people linked to it
    result = tx.run("""
        MATCH (o:Organization)
        RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employees_n
        ORDER BY o.created_date DESC
        LIMIT 1
    """)
    org = result.single()

    if org is not None and org["employees_n"] == 0:
        raise Exception("Most recent organization is empty.")
        # Transaction will roll back -> not even Person is created!

    # If org does not have too many employees, add this Person to that
    if org is not None and org.get("employees_n") < employee_threshold:
        result = tx.run("""
            MATCH (o:Organization {id: $org_id})
            MATCH (p:Person {name: $name})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN $org_id AS id
            """, org_id=org["id"], name=name
        )

    # Otherwise, create a new Organization and link Person to it
    else:
        result = tx.run("""
            MATCH (p:Person {name: $name})
            CREATE (o:Organization {id: randomuuid(), created_date: datetime()})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN o.id AS id
            """, name=name
        )

    # Return the Organization ID to which the new Person ends up in
    return result.single()["id"]


if __name__ == "__main__":
    main()

如果失败被视为瞬态的(例如由于服务器暂时不可用),驱动程序会自动重试运行失败的事务。如果操作在配置的最大重试时间后仍然失败,则会引发错误。

由于事务可能会被重新运行,事务函数必须是幂等的(即它们在多次运行时应产生相同的影响),因为您无法预先知道它们将被执行多少次。实际上,这意味着您不应编辑或依赖全局变量。请注意,尽管事务函数可能会执行多次,但其中的数据库查询始终只会运行一次。

一个会话可以链接多个事务,但在任何给定时间,一个会话内只能激活一个事务。要维护多个并发事务,请使用多个并发会话。

事务函数配置

装饰器 unit_of_work() 提供了对事务函数的进一步控制。它允许指定

  • 事务超时时间(以秒为单位)。运行时间更长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒 (0.001)。

  • 附加到事务的元数据字典。这些元数据会被记录在服务器的 query.log 中,并显示在 SHOW TRANSACTIONS Cypher 命令的输出中。使用此功能来标记事务。

from neo4j import unit_of_work

@unit_of_work(timeout=5, metadata={"app_name": "people_tracker"})
def count_people(tx):
    result = tx.run("MATCH (a:Person) RETURN count(a) AS people")
    record = result.single()
    return record["people"]


with driver.session(database="<database-name>") as session:
    people_n = session.execute_read(count_people)

运行显式事务

您可以通过使用 Session.begin_transaction() 方法手动开启一个事务,从而实现对事务的完全控制。然后,您可以使用 Transaction.run() 方法在显式事务中运行查询。

with driver.session(database="<database-name>") as session:
    with session.begin_transaction() as tx:
        # use tx.run() to run queries and tx.commit() when done
        tx.run("<QUERY 1>")
        tx.run("<QUERY 2>")

        tx.commit()

显式事务可以使用 Transaction.commit() 提交,或使用 Transaction.rollback() 回滚。如果没有采取显式操作,驱动程序将在事务生命周期结束时自动回滚事务。

由于瞬态服务器错误而失败的 tx.run() 查询可以在无需更改原始请求的情况下重试。您可以通过 Neo4jError.is_retryable() 方法了解错误是否为瞬态,该方法可以洞察进一步尝试是否可能会成功。

显式事务对于需要将 Cypher 执行分布在同一个事务的多个函数中,或者需要在一个事务内运行多个查询但不需要托管事务提供的自动重试的应用程序最有用。

涉及外部 API 的显式事务示例
import neo4j


URI = "<database-uri>"
AUTH = ("<username>", "<password>")


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        customer_id = create_customer(driver)
        other_bank_id = 42
        transfer_to_other_bank(driver, customer_id, other_bank_id, 999)


def create_customer(driver):
    result, _, _ = driver.execute_query("""
        MERGE (c:Customer {id: rand()})
        RETURN c.id AS id
    """, database_ = "<database-name>")
    return result[0]["id"]


def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    with driver.session(database="<database-name>") as session:
        with session.begin_transaction() as tx:
            if not customer_balance_check(tx, customer_id, amount):
                # give up
                return

            other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred => can't rollback anymore
            # (cannot rollback external services interactions)

            try:
                decrease_customer_balance(tx, customer_id, amount)
                tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise  # roll back


def customer_balance_check(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    record = result.single(strict=True)
    return record["sufficient"]


def other_bank_transfer_api(customer_id, other_bank_id, amount):
    # make some API call to other bank
    pass


def decrease_customer_balance(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    result.consume()


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)


if __name__ == "__main__":
    main()

处理查询结果

驱动程序的查询输出是一个 Result 对象,它将 Cypher 结果封装在一种需要在客户端进行解析的丰富数据结构中。有两个主要点需要注意

  • 结果记录不是立即且全部由服务器获取并返回的。相反,结果以延迟流的形式出现。具体来说,当驱动程序从服务器接收到一些记录时,它们最初在后台队列中进行缓冲。记录会保留在缓冲区中,直到被应用程序消费,此时它们会从缓冲区中移除。当没有更多可用记录时,结果即耗尽

  • 结果充当游标。这意味着除非您将其保存在辅助数据结构中,否则无法从流中检索先前的记录。

下方的动画演示了单个查询的路径:它展示了驱动程序如何处理结果记录,以及应用程序应如何处理结果。

处理结果最简单的方法是将其转换为列表,这会产生一个 Record 对象列表。否则,Result 对象实现了一些用于处理记录的方法。下面列出了最常用的方法。

名称 描述

value(key=0, default=None)

将结果的剩余部分作为列表返回。如果指定了 key,则仅包含给定的属性;default 允许为缺少该属性的节点指定值。

fetch(n)

从结果中返回最多 n 条记录。

single(strict=False)

返回下一条也是最后一条剩余的记录,或 None。调用此方法总是会耗尽结果。

如果可用记录多于(或少于)一条,

  • strict==False — 生成警告并返回其中第一条(如果有);

  • strict==True — 引发 ResultNotSingleError 错误。

peek()

在不消耗的情况下返回下一条记录。这会将记录留在缓冲区中以供进一步处理。

data(*keys)

返回一个类 JSON 的转储。仅用于调试/原型设计目的。

consume()

返回查询结果摘要。它会耗尽结果,因此应仅在数据处理结束后调用。

graph()

将结果转换为图形对象集合。请参阅转换为图形

to_df(expand, parse_dates)

将结果转换为 Pandas 数据框。请参阅转换为 Pandas 数据框

有关 Result 方法的完整列表,请参阅 API 文档 — Result

会话配置

数据库选择

始终通过 database 参数显式指定数据库,即使在单数据库实例中也是如此。这允许驱动程序更有效地工作,因为它节省了向服务器解析主数据库的网络往返。如果没有指定数据库,则使用用户的默认数据库

with driver.session(
    database="<database-name>"
) as session:
    ...
通过配置方法指定数据库比使用 USE Cypher 子句更受推荐。如果服务器在集群上运行,带有 USE 的查询需要启用服务端路由。查询执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含所请求数据库的成员。

请求路由

在集群环境中,所有会话都以写入模式打开,并将它们路由到主节点 (Leader)。您可以通过将 default_access_mode 参数显式设置为 neo4j.READ_ACCESS 来更改此设置。请注意,.execute_read().execute_write() 会自动覆盖会话的默认访问模式。

import neo4j

with driver.session(
    database="<database-name>",
    default_access_mode=neo4j.READ_ACCESS
) as session:
    ...

尽管在读取模式下执行写入查询会导致运行时错误,但您不应依赖此功能进行访问控制。这两种模式的区别在于:读取事务将被路由到集群中的任何节点,而写入事务会被定向到主节点(primaries)。不能保证以读取模式提交的写入查询一定会遭到拒绝。

以其他用户身份运行查询

您可以通过参数 auth 以不同用户身份执行查询。在会话级别切换用户比创建新的 Driver 对象成本更低。查询随后将在给定用户的安全上下文中运行(即默认数据库、权限等)。

with driver.session(
    database="<database-name>",
    auth=("<username>", "<password>")
) as session:
    ...

参数 impersonated_user 提供了类似的功能。区别在于您无需知道用户的密码即可模拟他们,但创建 Driver 时所用的用户需要具有相应的权限

with driver.session(
    database="<database-name>",
    impersonated_user="<username>"
) as session:
    ...

关闭会话

每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话。因此,建议使用 with 语句创建会话,该语句会在应用程序处理完毕后自动关闭它们。当会话关闭时,它会被返回到连接池以供稍后重用。

如果您不使用 with,请记得在完成会话使用后调用 .close() 方法。

session = driver.session(database="<database-name>")

# session usage

session.close()

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

一个 Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。