性能建议

始终指定目标数据库

在所有查询中指定目标数据库,无论是在 Driver.ExecutableQuery() 调用中,还是在 创建新会话时。如果不提供数据库,驱动程序必须向服务器发送额外的请求来确定默认数据库。对于单个查询,这种开销微不足道,但随着查询次数达到数百次,开销就会变得显著。

最佳实践

await driver.ExecutableQuery("<QUERY>")
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));

不良实践

await driver.ExecutableQuery("<QUERY>")
    .ExecuteAsync();
using var session = driver.AsyncSession();

了解事务的成本

通过 .ExecutableQuery().ExecuteRead/WriteAsync() 提交查询时,驱动程序会将它们包装在 事务 中。此行为确保数据库始终处于一致状态,无论事务执行期间发生什么(断电、软件崩溃等)。作为进一步的稳健性层,驱动程序还会以指数退避算法重试失败的事务。

围绕查询创建安全的执行上下文会产生一定的开销,虽然很小,但随着事务数量的增加会累积。当每个查询都作为单独的事务发送时,如果一个事务失败并需要回滚,所有其他事务都不会受到影响。就故障而言,这是最安全的执行模式,但由于事务开销随查询数量线性增加,这也是最慢的模式。

每个查询作为一个单独的事务(低吞吐量)
for (int i=0; i<1000; i++) {
    await driver.ExecutableQuery("<QUERY>")
        .WithConfig(new QueryConfig(database: "<database-name>"))
        .ExecuteAsync();
    // or session.ExecuteRead/WriteAsync() calls
}

一种性能更高的做法是将所有查询组合到一个事务中。这样,整个事务与其他事务隔离,但事务中的单个查询并不相互隔离,其中一个查询失败会导致所有查询回滚。

将查询组合为一个事务(高吞吐量)
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.ExecuteWriteAsync(async tx => {
    for (int i=0; i<1000; i++) {
        var result = await tx.RunAsync("<QUERY>");
    }
});

一种更快的方法是跳过 .ExecuteRead/WriteAsync(),直接在会话上调用 .RunAsync()。这些查询作为自动提交事务运行,并且仍然与其他并发查询隔离,但如果其中任何一个失败,它们将不会被重试。使用此方法,你以牺牲一定的稳健性为代价换取了更高的吞吐量,因为查询会以服务器所能处理的最快速度发送。客户端大小的一个上限由连接池的大小决定:每次调用 .RunAsync() 都会借用一个连接,因此并行工作的数量受可用连接数的限制。

查询作为自动提交事务(最高吞吐量)
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
for (int i=0; i<1000; i++) {
    await session.RunAsync("<QUERY>");
}

不要一次性获取大量结果集

提交可能导致大量记录的查询时,不要一次性全部检索。Neo4j 服务器可以分批检索记录,并在记录可用时将其流式传输给驱动程序。对结果进行延迟加载可以分散网络流量和内存使用(客户端和服务器端)。

为方便起见,.ExecutableQuery() 总是立即检索所有结果记录(预加载)。若要延迟加载结果,请使用 .ExecuteRead/WriteAsync()(或其他手动处理 事务 的形式),并在处理结果时不要IResultCursor 对象上调用 .ToListAsync();改用迭代方式处理。

示例 1. 预加载与延迟加载的比较
预加载 (Eager loading) 延迟加载 (Lazy loading)
  • 服务器必须先从存储中读取全部 250 条记录,然后才能将第一条记录发送给驱动程序(即客户端接收第一条记录所需的时间更长)。

  • 在任何记录可供应用程序使用之前,驱动程序必须先接收所有 250 条记录。

  • 客户端必须将所有 250 条记录保存在内存中。

  • 服务器读取第一条记录并将其发送给驱动程序。

  • 一旦传输了第一条记录,应用程序就可以处理记录。

  • 剩余记录的等待时间和资源消耗推迟到应用程序请求更多记录时。

  • 服务器的获取时间可以用于客户端处理。

  • 资源消耗受驱动程序获取大小 (fetch size) 的限制。

预加载与延迟加载的时间和内存比较
using Neo4j.Driver;

const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";

// Returns 250 records, each with properties
// - `output` (an expensive computation, to slow down retrieval)
// - `dummyData` (a list of 10000 ints, about 8 KB).
string slowQuery = @"
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
       range(1, 10000) AS dummyData
";
// Delay for each processed record (ms)
int sleepTime = 500;

await using var driver = GraphDatabase.Driver(dbUri, AuthTokens.Basic(dbUser, dbPassword));
await driver.VerifyConnectivityAsync();

var watch = System.Diagnostics.Stopwatch.StartNew();
log("LAZY LOADING (ExecuteReadAsync)");
await lazyLoading(driver);
log($"--- {watch.ElapsedMilliseconds/1000} seconds ---");
Console.WriteLine();

watch.Restart();
log("EAGER LOADING (ExecutableQuery)");
await eagerLoading(driver);
log($"--- {watch.ElapsedMilliseconds/1000} seconds ---");

async Task lazyLoading(IDriver driver) {
    using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
    await session.ExecuteReadAsync(async tx => {
        log("Submit query");
        var result = await tx.RunAsync(slowQuery);
        while (await result.FetchAsync()) {
            int recordN = result.Current.Get<int>("output");
            log($"Processing record {recordN}");
            System.Threading.Thread.Sleep(sleepTime);  // proxy for some expensive operation
        }
    });
}

async Task eagerLoading(IDriver driver) {
    log("Submit query");
    var result = await driver.ExecutableQuery(slowQuery)
        .WithConfig(new QueryConfig(database: "<database-name>"))
        .ExecuteAsync();
    foreach (var record in result.Result) {
        int recordN = record.Get<int>("output");
        log($"Processing record {recordN}");
        System.Threading.Thread.Sleep(sleepTime);  // proxy for some expensive operation
    }
}

void log(string msg) {
    Console.WriteLine($"[{DateTimeOffset.UtcNow.ToUnixTimeSeconds()}] {msg}");
}
输出
[1734336457] LAZY LOADING (ExecuteReadAsync)
[1734336457] Submit query
[1734336457] Processing record 1  (1)
[1734336457] Processing record 2
[1734336458] Processing record 3
...
[1734336581] Processing record 249
[1734336581] Processing record 250
[1734336582] --- 125 seconds ---

[1734336582] EAGER LOADING (ExecutableQuery)
[1734336582] Submit query
[1734336594] Processing record 1   (2)
[1734336594] Processing record 2
[1734336595] Processing record 3
...
[1734336718] Processing record 249
[1734336718] Processing record 250
[1734336719] --- 136 seconds ---  (3)
1 使用延迟加载,第一条记录可以更快地获取。
2 使用预加载,第一条记录在结果被消费时(即服务器检索完所有 250 条记录后)才可用。
3 使用预加载时总运行时间较长,因为客户端需要等待直到收到最后一条记录;而使用延迟加载时,客户端可以在服务器获取后续记录的同时处理当前记录。通过延迟加载,客户端还可以在满足某个条件后停止请求记录(通过在 Result 对象上调用 .ConsumeAsync()),从而节省时间和资源。

使用 .ExecutableQuery() 时,可以使用 .WithStreamProcessor() 方法在记录到达时对其进行处理。这不会等到所有记录加载完毕才开始处理,从而加快响应速度;但这并不会改善内存占用,因为你无法控制记录的流动。你可以使用 .WithMap().WithReduce() 来减少 EagerResult 的内存占用。

驱动程序的 获取大小 (fetch size) 会影响延迟加载的行为。它指示服务器流式传输等于获取大小的记录数量,然后在检索和发送更多记录之前等待客户端处理完毕。

获取大小允许限制客户端的内存消耗。但它并不总是能限制服务器端的内存消耗:这取决于查询本身。例如,带有 ORDER BY 的查询需要在排序前将整个结果集加载到内存中,然后才能将记录流式传输给客户端。

获取大小越小,客户端和服务器之间需要交换的消息就越多。特别是如果服务器延迟很高,较小的获取大小可能会导致性能下降。

将读取查询路由到集群读取器

在集群中,将读取查询路由到任何读取器节点。您可以执行以下操作:

最佳实践

await driver.ExecutableQuery("MATCH (p:Person) RETURN p")
    .WithConfig(new QueryConfig(
        database: "<database-name>",
        routing: RoutingControl.Readers
    ))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf
    .WithDatabase("<database-name>")
    .WithDefaultAccessMode(AccessMode.Read)
);

不良实践

// defaults to routing = writers
await driver.ExecutableQuery("MATCH (p:Person) RETURN p")
    .WithConfig(new QueryConfig(
        database: "<database-name>"
    ))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));

// don't ask to write on a read-only operation
await session.ExecuteWriteAsync(async tx => {
    var result = await tx.RunAsync("MATCH (p:Person) RETURN p");
    return result.ToListAsync();
});

创建索引

为经常进行过滤的属性创建索引。例如,如果您经常通过 name 属性查找 Person 节点,那么在 Person.name 上创建索引将非常有益。您可以使用 CREATE INDEX Cypher 子句为节点和关系创建索引。

Person.name 上创建索引
await driver.ExecutableQuery("CREATE INDEX person_name FOR (n:Person) ON (n.name)").ExecuteAsync();

有关更多信息,请参阅 搜索性能索引

分析查询 (Profile queries)

分析你的查询,以定位可以提高性能的查询。你可以通过在查询前添加 PROFILE 来分析它们。服务器输出可在 IResultSummary 对象的 Profile 属性中找到。

var result = await driver.ExecutableQuery("PROFILE MATCH (p {name: $name}) RETURN p")
    .WithParameters(new { name = "Alice" })
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
// Note: `result.Result` is non-empty
var queryPlan = result.Summary.Profile;
Console.WriteLine(queryPlan.Arguments["string-representation"]);
/*
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator        | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults | p              |              1 |    1 |       3 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter         | p.name = $name |              1 |    1 |       4 |                |                        |           |                     |
| |               +----------------+----------------+------+---------+----------------+                        |           |                     |
| +AllNodesScan   | p              |             10 |    4 |       5 |            120 |                 9160/0 |   108.923 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+

Total database accesses: 12, total allocated memory: 184
*/

如果某些查询太慢,以至于无法在合理的时间内运行,你可以在它们前面添加 EXPLAIN 而不是 PROFILE。这将返回服务器用于运行该查询的计划,而无需实际执行它。服务器输出可在 IResultSummary 对象的 Plan 属性中找到。

var result = await driver.ExecutableQuery("EXPLAIN MATCH (p {name: $name}) RETURN p")
    .WithParameters(new { name = "Alice" })
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
var queryPlan = result.Summary.Plan;
Console.WriteLine(queryPlan.Arguments["string-representation"]);
/*
Planner COST
Runtime PIPELINED
Runtime version 5.0
Batch size 128

+-----------------+----------------+----------------+---------------------+
| Operator        | Details        | Estimated Rows | Pipeline            |
+-----------------+----------------+----------------+---------------------+
| +ProduceResults | p              |              1 |                     |
| |               +----------------+----------------+                     |
| +Filter         | p.name = $name |              1 |                     |
| |               +----------------+----------------+                     |
| +AllNodesScan   | p              |             10 | Fused in Pipeline 0 |
+-----------------+----------------+----------------+---------------------+

Total database accesses: ?
*/

指定节点标签

指定节点标签(node labels)。这使得查询规划器可以更高效地工作,并利用可用的索引。要了解如何组合标签,请参阅 Cypher → 标签表达式

最佳实践

var result = await driver.ExecutableQuery("MATCH (p:Person|Animal {name: $name}) RETURN p")
    .WithParameters(new { name = "Alice" })
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.RunAsync("MATCH (p:Person|Animal {name: $name}) RETURN p", new { name = "Alice" });

不良实践

var result = await driver.ExecutableQuery("MATCH (p {name: $name}) RETURN p")
    .WithParameters(new { name = "Alice" })
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.RunAsync("MATCH (p {name: $name}) RETURN p", new { name = "Alice" });

批量创建数据

使用 WITHUNWIND Cypher 子句,在创建大量记录时对查询进行批处理

最佳实践

提交包含所有值的单个查询
// Generate a sequence of numbers
int start = 1;
int end = 10000;
var numbers = new List<Dictionary<string, dynamic>>();
for (int i=start; i<=end; i++) {
    numbers.Add(new Dictionary<string, dynamic>() {
        {"value", i},
    });
}

await driver.ExecutableQuery(@"
    UNWIND $numbers AS node
    MERGE (:Number {value: node.value})
    ")
    .WithParameters(new { numbers = numbers })
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();

不良实践

提交许多单个查询,每个值对应一个查询
for (int i=1; i<=10000; i++) {
    await driver.ExecutableQuery("MERGE (:Number {value: $value})")
        .WithParameters(new { value = i })
        .WithConfig(new QueryConfig(database: "<database-name>"))
        .ExecuteAsync();
}
将大量数据首次导入新数据库的最有效方式是使用 neo4j-admin database import 命令。

使用查询参数

始终使用 查询参数,而不是将值硬编码或拼接进查询字符串中。除了防止 Cypher 注入外,这还能更好地利用数据库查询缓存。

最佳实践

await driver.ExecutableQuery("MATCH (p:Person {name: $name}) RETURN p")
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .WithParameters(new { name = "Alice" })
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.RunAsync("MATCH (p:Person {name: $name}) RETURN p", new { name = "Alice" });

不良实践

await driver.ExecutableQuery("MATCH (p:Person {name: 'Alice'}) RETURN p")
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
// or
string name = "Alice";
await driver.ExecutableQuery("MATCH (p:Person {name: '" + name + "'}) RETURN p")
    .WithConfig(new QueryConfig(database: "<database-name>"))
    .ExecuteAsync();
using var session = driver.AsyncSession(conf => conf.WithDatabase("<database-name>"));
await session.RunAsync("MATCH (p:Person {name: 'Alice'}) RETURN p");
// or
string name = "Alice";
session.RunAsync("MATCH (p:Person {name: '" + name + "'}) RETURN p");

仅在需要时使用 MERGE 进行创建

Cypher 子句 MERGE 非常适合创建数据,因为它可以在存在给定模式的精确克隆时避免重复数据。然而,它要求数据库运行两个查询:首先需要 MATCH 该模式,然后(如果需要)才能 CREATE 它。

如果您已经知道插入的数据是新的,请避免使用 MERGE,直接使用 CREATE——这实际上将数据库查询的数量减半了。

过滤通知

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 .ExecuteReadAsync().ExecuteWriteAsync() 调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。

IDriver

IDriver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。