周期性执行

APOC 库包含一些过程,可用于在执行大规模写入操作时对事务进行批处理。

从 Neo4j 5.21 版本开始,可以使用 Cypher 命令 CALL {…​} IN CONCURRENT TRANSACTIONS 并行处理事务。有关更多信息,请参阅 事务中的 CALL 子查询 → 并发事务

批处理事务函数

限定名称 类型

apoc.periodic.commit(statement STRING, params MAP<STRING, ANY>) - 在单独的批量事务中运行给定的语句。

过程

apoc.periodic.iterate(cypherIterate STRING, cypherAction STRING, config MAP<STRING, ANY>) - 对第一个语句返回的每一项运行第二个语句。此过程返回批次数和已处理行的总数。

过程

周期性迭代 (Periodic Iterate)

当您需要处理大量数据进行导入、重构以及其他需要大型事务的场景时,apoc.periodic.iterate 过程非常有用。它通过将工作负载分为两部分,提供了一种对数据进行批处理的方法:

数据驱动语句

这定义了如何选择需要处理的数据。您可以提供一个 Cypher 语句从现有图数据中选择,从文件或 API 读取外部数据,或从其他数据存储中检索数据。

操作语句

这定义了您想要对所选数据执行的操作。您可以执行诸如更新、创建/删除数据的 Cypher 语句,或者在加载前运行其他过程来操作和转换值。

数据驱动语句作为第一个语句提供,其结果是一个待处理的值流。操作语句作为第二个语句提供,用于一次处理一个元素,或者(使用 batchMode: "BATCH" 时)一次处理一批元素。数据驱动语句的结果将作为参数传递给操作语句,因此它们会自动以其名称可用。

表 1. 配置
名称 (name) type 默认 description(描述)

batchSize

INTEGER(整数)

10000

在单个事务中运行指定数量的操作语句 - 参数:{_count, _batch}

parallel(并行)

布尔值 (BOOLEAN)

false

并行运行操作语句(注意,如果语句存在冲突,可能会发生死锁)
请注意,当 parallel: false 时,APOC 设计为重用同一个 java.util.concurrent.ThreadPoolExecutor(最大池大小为 1),以防止并行;这意味着如果您想执行多个 apoc.periodic.iterate,每一个都将在前一个完成后执行。相反,使用 parallel: true 时,APOC 将使用一个 ThreadPoolExecutor,其最大池大小可通过 apoc.jobs.pool.num_threads 配置,或者默认为可用处理器数量 * 2。因此,如果我们执行多个 apoc.periodic.iterate,只要队列池大小可以接受新任务,它们就会并行执行。此外,需要注意的是,并行运行会影响所有数据库,而不是您正在使用的单个数据库。例如,如果有 2 个数据库 db1db2,如果我们在 db2 上执行 apoc.periodic.iterate,那么在 db1 上执行的 apoc.periodic.iterate 将会受到影响。

retries

INTEGER(整数)

0

如果操作语句因错误而失败,则休眠 100 毫秒并重试,直到达到重试次数 - 参数 {_retry}

batchMode(批处理模式)

STRING

"BATCH"

操作语句应如何处理数据驱动的语句。有效值包括

  • "BATCH" - 每 batchSize 执行一次操作语句。操作语句前缀会自动处理,它从 $_batch 参数中提取数据驱动语句返回的每个字段。

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - 每次处理一个元素来执行操作语句。

  • "BATCH_SINGLE" - 每 batchSize 执行一次操作语句,但将批处理的解包留给操作语句处理。操作查询可以通过 $_batch 参数访问批处理的值。

params

MAP

{}

外部传入参数映射

concurrency

INTEGER(整数)

可用处理器的数量

使用 parallel:true 时生成的并发任务数量

failedParams (失败参数)

INTEGER(整数)

-1

如果设置为非负值,则每个失败批次的最多 failedParams 参数集将在 yield failedParams 中返回。

在 APOC 4.0.0.11 及更早版本中,iterateList 配置键用于控制数据驱动语句返回值的批处理。这在 4.0.0.12 版本中被 batchMode 取代。这些配置键的处理方式如下:

  • 如果提供了 batchMode,其值优先于 iterateList

  • 如果未提供 batchMode 但提供了 iterateList,则 iterateList 的值将按如下表所述进行转换。

  • 如果既未提供 batchMode 也未提供 iterateList,则 batchMode 默认为 BATCH,这与 iterateList:true 相同。

表 2. 已弃用的配置
参数 默认 description(描述)

iterateList

true

每 batchSize 执行一次操作语句(整个 batchSize 列表作为参数 {_batch} 传递)

  • 值为 true 等同于 batchMode: "BATCH"

  • 值为 false 等同于 batchMode: "SINGLE"

周期性迭代示例

让我们来看一些示例。

如果您要为数百万个 :Person 节点添加 :Actor 标签,可以运行以下代码:

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

让我们分解传递给该过程的参数:

  • 我们的第一个 Cypher 语句选择了所有与另一个节点有 ACTED_IN 关系的 Person 节点并返回这些人员。这是数据驱动部分,我们在这里选择想要更改的数据。

  • 我们的第二个 Cypher 语句在选定的每个 Person 节点上设置 :Actor 标签。这是操作部分,我们在这里对第一个语句选择的数据应用更改。

  • 最后,我们指定了希望该过程使用的任何配置。我们定义了 10,000 的 batchSize 并设置并行运行这些语句。

执行此过程将获取我们在第一个 Cypher 语句中收集的所有 Person 节点,并用第二个 Cypher 语句更新它们中的每一个。它将工作分为批次——从流中取出 10,000 个 Person 节点并在单个事务中更新它们。如果我们的图中有 30,000 个具有 ACTED_IN 关系的 Person 节点,那么它会将其拆分为 3 个批次。

最后,它会并行运行这些批次,因为更新节点标签或属性不会冲突。

对于更复杂的操作(如更新或删除关系),请不要使用 parallel: true,或者确保您以这样一种方式对工作进行批处理:即每个数据子图在一次操作中更新,例如通过传输根对象。如果您尝试复杂操作,还要启用失败操作重试,例如使用 retries:3

现在让我们看一个更复杂的例子。

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
  "MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

让我们分解传递给该过程的参数:

  • 我们的第一个 Cypher 语句选择了所有订单日期大于 2016年10月13日Order 节点(第一个 Cypher 语句)。

  • 我们的第二个 Cypher 语句获取这些组,查找与其它节点有 HAS_ITEM 关系的节点,然后对这些项目的价值求和,并将该总和设置为订单总价值属性 (o.value)。

  • 我们的配置会将这些节点分组成 100 个一批 (batchSize:100),并为第二个语句并行运行这些批次以进行处理。

批处理模式:BATCH_SINGLE

如果我们的操作语句调用了一个接收批处理值的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取传递给该过程的批处理值。当我们使用 BATCH_SINGLE 时,操作语句将能够访问 $_batch 参数,该参数将包含数据驱动语句返回的字段列表。

例如,如果数据驱动语句是:

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

传递给操作语句的 $_batch 变量的内容将是:

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

让我们看看这在实践中的示例。我们将从创建一些节点开始。

以下查询创建了 100,000 个带有 Person 标签和 id 属性的节点:
UNWIND range(1,100000) as id create (:Person {id: id})

我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅 删除数据

此过程接收一个节点列表,我们可以从 $_batch 参数中提取该列表。

以下查询流式处理所有 Person 节点并以 100 个为一批删除它们:
CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN p",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

在操作语句中使用的 $_batch 参数的内容如下:

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

我们可以使用 列表推导式 (list comprehension) 从列表中的每个项中提取 p 变量。

如果我们运行此查询,将看到以下输出

表 3. 结果
batch 操作

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}

周期性提交 (Periodic Commit)

特别是在图处理中,反复在单独的事务中运行查询直到它不再处理并生成任何结果非常有用。因此,您可以分批迭代那些不满足条件的元素,并对它们进行更新,以便它们随后能够满足条件。

作为安全保障,在 apoc.periodic.commit 中使用的语句必须包含 LIMIT 子句。

该查询会在单独的事务中反复执行,直到它返回 0。

call apoc.periodic.commit(
  "match (user:User) WHERE user.city IS NOT NULL
   with user limit $limit
   MERGE (city:City {name:user.city})
   MERGE (user)-[:LIVES_IN]->(city)
   REMOVE user.city
   RETURN count(*)",
  {limit:10000})
表 4. 结果
更新 执行

2000000

200

进度日志

要可视化 apoc.periodic.iterateapoc.periodic.commit 的详细进度日志,请在 neo4j.conf 文件中将 dbms.logs.debug.level 设置为 DEBUG

在下面的查询中,dbms.logs.debug.level 已被设置为 DEBUG

UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});

将返回以下日志:

2020-11-27 09:03:44.279+0000 INFO  Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions