周期性执行
APOC 库包含一些过程,可用于在执行大规模写入操作时对事务进行批处理。
从 Neo4j 5.21 版本开始,可以使用 Cypher 命令 CALL {…} IN CONCURRENT TRANSACTIONS 并行处理事务。有关更多信息,请参阅 事务中的 CALL 子查询 → 并发事务。 |
批处理事务函数
| 限定名称 | 类型 |
|---|---|
|
过程 |
|
过程 |
周期性迭代 (Periodic Iterate)
当您需要处理大量数据进行导入、重构以及其他需要大型事务的场景时,apoc.periodic.iterate 过程非常有用。它通过将工作负载分为两部分,提供了一种对数据进行批处理的方法:
- 数据驱动语句
-
这定义了如何选择需要处理的数据。您可以提供一个 Cypher 语句从现有图数据中选择,从文件或 API 读取外部数据,或从其他数据存储中检索数据。
- 操作语句
-
这定义了您想要对所选数据执行的操作。您可以执行诸如更新、创建/删除数据的 Cypher 语句,或者在加载前运行其他过程来操作和转换值。
数据驱动语句作为第一个语句提供,其结果是一个待处理的值流。操作语句作为第二个语句提供,用于一次处理一个元素,或者(使用 batchMode: "BATCH" 时)一次处理一批元素。数据驱动语句的结果将作为参数传递给操作语句,因此它们会自动以其名称可用。
| 名称 (name) | type | 默认 | description(描述) |
|---|---|---|---|
batchSize |
INTEGER(整数) |
10000 |
在单个事务中运行指定数量的操作语句 - 参数:{_count, _batch} |
parallel(并行) |
布尔值 (BOOLEAN) |
false |
并行运行操作语句(注意,如果语句存在冲突,可能会发生死锁) |
retries |
INTEGER(整数) |
0 |
如果操作语句因错误而失败,则休眠 100 毫秒并重试,直到达到重试次数 - 参数 {_retry} |
batchMode(批处理模式) |
STRING |
"BATCH" |
操作语句应如何处理数据驱动的语句。有效值包括
UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2
|
params |
MAP |
{} |
外部传入参数映射 |
concurrency |
INTEGER(整数) |
可用处理器的数量 |
使用 |
failedParams (失败参数) |
INTEGER(整数) |
-1 |
如果设置为非负值,则每个失败批次的最多 |
|
在 APOC 4.0.0.11 及更早版本中,
|
| 参数 | 默认 | description(描述) |
|---|---|---|
iterateList |
true |
每 batchSize 执行一次操作语句(整个 batchSize 列表作为参数 {_batch} 传递)
|
周期性迭代示例
让我们来看一些示例。
如果您要为数百万个 :Person 节点添加 :Actor 标签,可以运行以下代码:
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
让我们分解传递给该过程的参数:
-
我们的第一个 Cypher 语句选择了所有与另一个节点有
ACTED_IN关系的Person节点并返回这些人员。这是数据驱动部分,我们在这里选择想要更改的数据。 -
我们的第二个 Cypher 语句在选定的每个
Person节点上设置:Actor标签。这是操作部分,我们在这里对第一个语句选择的数据应用更改。 -
最后,我们指定了希望该过程使用的任何配置。我们定义了 10,000 的
batchSize并设置并行运行这些语句。
执行此过程将获取我们在第一个 Cypher 语句中收集的所有 Person 节点,并用第二个 Cypher 语句更新它们中的每一个。它将工作分为批次——从流中取出 10,000 个 Person 节点并在单个事务中更新它们。如果我们的图中有 30,000 个具有 ACTED_IN 关系的 Person 节点,那么它会将其拆分为 3 个批次。
最后,它会并行运行这些批次,因为更新节点标签或属性不会冲突。
|
对于更复杂的操作(如更新或删除关系),请不要使用 parallel: true,或者确保您以这样一种方式对工作进行批处理:即每个数据子图在一次操作中更新,例如通过传输根对象。如果您尝试复杂操作,还要启用失败操作重试,例如使用 |
现在让我们看一个更复杂的例子。
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
"MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
让我们分解传递给该过程的参数:
-
我们的第一个 Cypher 语句选择了所有订单日期大于
2016年10月13日的Order节点(第一个 Cypher 语句)。 -
我们的第二个 Cypher 语句获取这些组,查找与其它节点有
HAS_ITEM关系的节点,然后对这些项目的价值求和,并将该总和设置为订单总价值属性 (o.value)。 -
我们的配置会将这些节点分组成 100 个一批 (
batchSize:100),并为第二个语句并行运行这些批次以进行处理。
批处理模式:BATCH_SINGLE
如果我们的操作语句调用了一个接收批处理值的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取传递给该过程的批处理值。当我们使用 BATCH_SINGLE 时,操作语句将能够访问 $_batch 参数,该参数将包含数据驱动语句返回的字段列表。
例如,如果数据驱动语句是:
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
传递给操作语句的 $_batch 变量的内容将是:
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
让我们看看这在实践中的示例。我们将从创建一些节点开始。
Person 标签和 id 属性的节点:UNWIND range(1,100000) as id create (:Person {id: id})
我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅 删除数据。
此过程接收一个节点列表,我们可以从 $_batch 参数中提取该列表。
Person 节点并以 100 个为一批删除它们:CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN p",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
在操作语句中使用的 $_batch 参数的内容如下:
[
{p: Node<1>},
{p: Node<2>},
...
]
我们可以使用 列表推导式 (list comprehension) 从列表中的每个项中提取 p 变量。
如果我们运行此查询,将看到以下输出
| batch | 操作 |
|---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} |
{total: 100000, committed: 100000, failed: 0, errors: {}} |
周期性提交 (Periodic Commit)
特别是在图处理中,反复在单独的事务中运行查询直到它不再处理并生成任何结果非常有用。因此,您可以分批迭代那些不满足条件的元素,并对它们进行更新,以便它们随后能够满足条件。
作为安全保障,在 apoc.periodic.commit 中使用的语句必须包含 LIMIT 子句。 |
该查询会在单独的事务中反复执行,直到它返回 0。
call apoc.periodic.commit(
"match (user:User) WHERE user.city IS NOT NULL
with user limit $limit
MERGE (city:City {name:user.city})
MERGE (user)-[:LIVES_IN]->(city)
REMOVE user.city
RETURN count(*)",
{limit:10000})
| 更新 | 执行 |
|---|---|
2000000 |
200 |
进度日志
要可视化 apoc.periodic.iterate 或 apoc.periodic.commit 的详细进度日志,请在 neo4j.conf 文件中将 dbms.logs.debug.level 设置为 DEBUG。
在下面的查询中,dbms.logs.debug.level 已被设置为 DEBUG。
UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});
将返回以下日志:
2020-11-27 09:03:44.279+0000 INFO Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions