性能建议

使用 Rust 扩展

Python 驱动程序的 Rust 扩展是一个替代的驱动程序包,与常规驱动程序相比,可带来 3 倍到 10 倍的速度提升。您可以通过 pip install neo4j-rust-ext 安装它,既可以与 neo4j 包并存,也可以替代它。在使用方面,两个驱动程序是完全一致的:本指南中的所有内容均适用于这两个包。

始终指定目标数据库

在所有查询中指定目标数据库,可以通过 Driver.execute_query() 中的 database_ 参数,或者在 创建新会话时使用 database 参数来实现。如果没有提供数据库,驱动程序必须向服务器发送额外的请求以确定默认数据库是什么。对于单个查询,这种开销微乎其微,但在数百个查询中会变得非常显著。

最佳实践

driver.execute_query("<QUERY>", database_="<database-name>")
driver.session(database="<database-name>")

不良实践

driver.execute_query("<QUERY>")
driver.session()

了解事务的成本

当通过 .execute_query().execute_read/write() 提交查询时,驱动程序会将它们包装在 事务 中。这种行为确保了无论事务执行过程中发生什么(断电、软件崩溃等),数据库始终处于一致的状态。作为进一步的稳健性层,驱动程序还会以指数退避策略重试失败的事务。

围绕查询创建安全的执行上下文会产生一定的开销,虽然很小,但随着事务数量的增加会累积。当每个查询都作为单独的事务发送时,如果一个事务失败并需要回滚,所有其他事务都不会受到影响。就故障而言,这是最安全的执行模式,但由于事务开销随查询数量线性增加,这也是最慢的模式。

每个查询作为一个单独的事务(低吞吐量)
for i in range(1000):
    driver.execute_query("<QUERY>", database_="<database-name>")
    # or session.execute_read/write() calls

一种性能更高的做法是将所有查询组合到一个事务中。这样,整个事务与其他事务隔离,但事务中的单个查询并不相互隔离,其中一个查询失败会导致所有查询回滚。

将查询组合为一个事务(高吞吐量)
def query(tx):
    for i in range(1000):
        tx.run("<QUERY>")

with driver.session(database="<database-name>") as session:
    people = session.execute_read(query)

一种更快的方法是跳过 .execute_read/write(),直接在会话上调用 .run()。这些查询作为自动提交事务运行,并且仍与其他并发查询隔离,但如果其中任何一个失败,它们将不会被重试。使用这种方法,您以牺牲一定的稳健性为代价换取了更高的吞吐量,因为查询会以服务器能够处理的最快速度发送过去。客户端规模的一个上限由连接池的大小决定:每次调用 .run() 都会借用一个连接,因此并行工作的数量受到可用连接数的限制。

查询作为自动提交事务(最高吞吐量)
with driver.session(database="<database-name>") as session:
    for i in range(1000):
        session.run("<QUERY>")

不要一次性获取大量结果集

提交可能导致大量记录的查询时,不要一次性检索它们。Neo4j 服务器可以分批检索记录,并在记录可用时将其流式传输到驱动程序。延迟加载结果可以分散网络流量和内存使用(客户端和服务器端均是如此)。

为了方便起见,.execute_query() 总是会一次性检索所有结果记录(这就是 EagerResultEager 的含义)。要延迟加载结果,您必须使用 .execute_read/write()(或其他手动处理 事务 的形式),并且在处理结果时不要Result 对象强制转换为 list;请改用迭代方式处理。

有关结果处理的更多信息,请参阅 事务 → 处理查询结果

示例 1. 预加载与延迟加载的比较
预加载 (Eager loading) 延迟加载 (Lazy loading)
  • 服务器必须从存储中读取所有记录,然后才能将第一条记录发送给驱动程序(即客户端接收第一条记录所需的时间更长)。

  • 在任何记录可供应用程序使用之前,驱动程序必须接收所有记录。

  • 客户端必须在内存中保存所有记录。

  • 服务器读取第一条记录并将其发送给驱动程序。

  • 一旦传输了第一条记录,应用程序就可以处理记录。

  • 剩余记录的等待时间和资源消耗推迟到应用程序请求更多记录时。

  • 服务器的获取时间可以用于客户端处理。

  • 资源消耗受驱动程序获取大小 (fetch size) 的限制。

预加载与延迟加载的时间和内存比较
import neo4j
from time import sleep, time
import tracemalloc



URI = "<database-uri>"
AUTH = ("<username>", "<password>")

# Returns 250 records, each with properties
# - `output` (an expensive computation, to slow down retrieval)
# - `dummyData` (a list of 10000 ints, about 8 KB).
slow_query = '''
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
       range(1, 10000) AS dummyData
'''
# Delay for each processed record
sleep_time = 0.5


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()

        start_time = time()
        log('LAZY LOADING (execute_read)')
        tracemalloc.start()
        lazy_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))

        start_time = time()
        log('EAGER LOADING (execute_query)')
        tracemalloc.start()
        eager_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))


def lazy_loading(driver):

    def process_records(tx):
        log('Submit query')
        result = tx.run(slow_query)

        for record in result:
            log(f'Processing record {int(record.get("output"))}')
            sleep(sleep_time)  # proxy for some expensive operation

    with driver.session(database='<database-name>') as session:
        processed_result = session.execute_read(process_records)


def eager_loading(driver):
    log('Submit query')
    records, _, _ = driver.execute_query(slow_query, database_='<database-name>')

    for record in records:
        log(f'Processing record {int(record.get("output"))}')
        sleep(sleep_time)  # proxy for some expensive operation


def log(msg):
    print(f'[{round(time(), 2)}] {msg}')


if __name__ == '__main__':
    main()
输出
[1718014256.98] LAZY LOADING (execute_read)
[1718014256.98] Submit query
[1718014256.21] Processing record 0  (1)
[1718014256.71] Processing record 1
[1718014257.21] Processing record 2
...
[1718014395.42] Processing record 249
[1718014395.92] Peak memory usage: 786254 bytes
[1718014395.92] --- 135.9284942150116 seconds ---

[1718014395.92] EAGER LOADING (execute_query)
[1718014395.92] Submit query
[1718014419.82] Processing record 0  (2)
[1718014420.33] Processing record 1
[1718014420.83] Processing record 2
...
[1718014544.52] Processing record 249
[1718014545.02] Peak memory usage: 89587150 bytes  (3)
[1718014545.02] --- 149.70468592643738 seconds ---  (4)
1 使用延迟加载,第一条记录可以更快地获取。
2 使用预加载,第一条记录在结果被消费时(即服务器检索完所有 250 条记录后)才可用。
3 使用预加载时内存使用量更大,因为应用程序会具体化一个包含 250 条记录的列表。
4 使用预加载时总运行时间更长,因为客户端必须等待直到收到最后一条记录;而使用延迟加载时,客户端可以在服务器获取后续记录的同时处理已有的记录。使用延迟加载,客户端还可以在满足某些条件后停止请求记录(通过在 Result 对象上调用 .consume()),从而节省时间和资源。

驱动程序的 获取大小 (fetch size) 会影响延迟加载的行为。它指示服务器流式传输等于获取大小数量的记录,然后等待客户端跟上,再检索并发送更多数据。

获取大小通常有助于限制客户端的内存消耗,特别是对于记录大小差异较小的结果集。如果单条记录非常大,驱动程序仍然需要为整个对象分配空间,因此即使获取大小较小,内存使用量也可能很大。

另一方面,获取大小并不总能限制服务器端的内存消耗:这取决于具体的查询。例如,包含 ORDER BY 的查询需要在记录流式传输到客户端之前将整个结果集加载到内存中进行排序。

获取大小越小,客户端和服务器之间需要交换的消息就越多。特别是如果服务器延迟较高,较小的获取大小可能会降低性能。

将读取查询路由到集群读取器

在集群中,将读取查询路由到任何读取器节点。您可以执行以下操作:

最佳实践

driver.execute_query("MATCH (p:Person) RETURN p", routing_="r")
session.execute_read(lambda tx: tx.run("MATCH (p:Person) RETURN p"))

不良实践

driver.execute_query("MATCH (p:Person) RETURN p")
# defaults to routing = writers
session.execute_write(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
# don't ask to write on a read-only operation

创建索引

为经常进行过滤的属性创建索引。例如,如果您经常通过 name 属性查找 Person 节点,那么在 Person.name 上创建索引将非常有益。您可以使用 CREATE INDEX Cypher 子句为节点和关系创建索引。

# Create an index on Person.name
driver.execute_query("CREATE INDEX person_name FOR (n:Person) ON (n.name)")

有关更多信息,请参阅 搜索性能索引

分析查询 (Profile queries)

分析您的查询以定位性能可以优化的查询。您可以通过在查询前加上 PROFILE 来分析它们。服务器输出可在 ResultSummary 对象的 profile 属性中获得。

_, summary, _ = driver.execute_query("PROFILE MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.profile['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator        | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p              |              1 |    1 |       3 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter         | p.name = $name |              1 |    1 |       4 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +AllNodesScan   | p              |             10 |    4 |       5 |            120 |                 9160/0 |   108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+

Total database accesses: 12, total allocated memory: 184
"""

如果某些查询太慢,以至于无法在合理的时间内运行,您可以将 PROFILE 替换为 EXPLAIN。这将返回服务器运行查询所使用的计划,但不会实际执行它。服务器输出可在 ResultSummary 对象的 plan 属性中获得。

_, summary, _ = driver.execute_query("EXPLAIN MATCH (p {name: $name}) RETURN p", name="Alice")
print(summary.plan['args']['string-representation'])
"""
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+---------------------+
| Operator        | Details        | Estimated Rows | Pipeline            |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p              |              1 |                     |
| |               +----------------+----------------+                     |
| +Filter         | p.name = $name |              1 |                     |
| |               +----------------+----------------+                     |
| +AllNodesScan   | p              |             10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+

Total database accesses: ?
"""

指定节点标签

在所有查询中指定节点标签。这使得查询规划器能够更高效地工作,并在可用时利用索引。要了解如何组合标签,请参阅 Cypher → 标签表达式

最佳实践

driver.execute_query("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")
with driver.session(database="<database-name>") as session:
    session.run("MATCH (p:Person|Animal {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p {name: $name}) RETURN p", name="Alice")
with driver.session(database="<database-name>") as session:
    session.run("MATCH (p {name: $name}) RETURN p", name="Alice")

批量创建数据

使用 WITHUNWIND Cypher 子句,在创建大量记录时对查询进行批处理

最佳实践

提交包含所有值的单个查询
numbers = [{"value": random()} for _ in range(10000)]
driver.execute_query("""
    UNWIND $numbers AS node
    MERGE (:Number {value: node.value})
    """, numbers=numbers,
)

不良实践

提交许多单个查询,每个值对应一个查询
for _ in range(10000):
    driver.execute_query("MERGE (:Number {value: $value})", value=random())
将大量数据首次导入新数据库的最有效方式是使用 neo4j-admin database import 命令。

使用查询参数

始终使用 查询参数,而不是将值硬编码或拼接进查询字符串中。除了防止 Cypher 注入外,这还能更好地利用数据库查询缓存。

最佳实践

driver.execute_query("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
with driver.session(database="<database-name>") as session:
    session.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")

不良实践

driver.execute_query("MATCH (p:Person {name: 'Alice'}) RETURN p")
# or
name = "Alice"
driver.execute_query("MATCH (p:Person {name: '" + name + "'}) RETURN p")
with driver.session(database="<database-name>") as session:
    session.run("MATCH (p:Person {name: 'Alice'}) RETURN p")
    # or
    name = "Alice"
    session.run("MATCH (p:Person {name: '" + name + "'}) RETURN p")

并发

使用 并发,无论是采用多线程形式,还是使用驱动程序的异步版本。如果您在应用程序中并行化复杂且耗时的查询,这对性能的影响可能会更大,但如果您运行许多简单的查询,效果则不那么明显。

仅在需要时使用 MERGE 进行创建

Cypher 子句 MERGE 非常适合创建数据,因为它可以在存在给定模式的精确克隆时避免重复数据。然而,它要求数据库运行两个查询:首先需要 MATCH 该模式,然后(如果需要)才能 CREATE 它。

如果您已经知道插入的数据是新的,请避免使用 MERGE,直接使用 CREATE——这实际上将数据库查询的数量减半了。

过滤通知

过滤服务器应引发的通知的类别和/或严重级别。

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 execute_readexecute_write 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

一个 Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。