apoc.periodic.iterate

事务可以通过 Cypher 命令 CALL {…​} IN CONCURRENT TRANSACTIONS 进行并行处理。更多信息,请参阅 事务中的 CALL 子查询 → 并发事务
详细信息

语法

apoc.periodic.iterate(cypherIterate, cypherAction, config) :: (batches, total, timeTaken, committedOperations, failedOperations, failedBatches, retries, errorMessages, batch, operations, wasTerminated, failedParams, updateStatistics)

描述

为第一个语句返回的每一项运行第二个语句。此过程返回批次数和处理的总行数。

输入参数

名称

类型

描述

cypherIterate

STRING

要运行的第一个 Cypher 语句。

cypherAction

STRING

针对初始 Cypher 语句返回的每一项要运行的 Cypher 语句。

config

MAP

{ batchSize = 10000 :: INTEGER, parallel = false :: BOOLEAN, retries = 0 :: INTEGER, batchMode = "BATCH" :: STRING, params = {} :: MAP, concurrency :: INTEGER, failedParams = -1 :: INTEGER, planner = "DEFAULT" :: ["DEFAULT", "COST", "IDP", "DP"] }

返回参数

名称

类型

描述

batches

INTEGER(整数)

总批次数。

总计

INTEGER(整数)

处理的输入行数。

耗时

INTEGER(整数)

所耗时长(秒)。

已提交的操作

INTEGER(整数)

成功的内部查询(操作)数量。

failedOperations

INTEGER(整数)

失败的内部查询(操作)数量。

failedBatches

INTEGER(整数)

失败的批次数。

retries

INTEGER(整数)

重试次数。

errorMessages

MAP

批次错误消息及其对应错误计数的映射表。

batch

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

操作

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

wasTerminated

布尔值 (BOOLEAN)

如果事务在完成前被终止。

failedParams

MAP

失败批次的参数。键是以字符串表示的批次号,值是批次参数的列表。

updateStatistics

MAP

{ nodesCreated :: INTEGER, nodesDeleted :: INTEGER, relationshipsCreated :: INTEGER, relationshipsDeleted :: INTEGER, propertiesSet :: INTEGER, labelsAdded :: INTEGER, labelsRemoved :: INTEGER }

配置参数

该过程支持以下配置参数

配置参数
名称 类型 默认 描述

batchSize

INTEGER(整数)

10000

在单个事务中运行指定数量的操作语句 - 参数:{_count, _batch}

parallel(并行)

布尔值 (BOOLEAN)

false

并行运行操作语句(注意:如果存在冲突,语句可能会死锁)。
请注意,在 parallel: false 的情况下,APOC 被设计为复用同一个最大池大小为 1 的 java.util.concurrent.ThreadPoolExecutor,以防止并行;这意味着如果你想要执行多个 apoc.periodic.iterate,每个执行都将在上一个执行完成后开始。相反,使用 parallel: true 时,APOC 将使用一个可以通过 apoc.jobs.pool.num_threads 配置或默认使用可用处理器数 * 2 来配置最大池大小的 ThreadPoolExecutor。因此,如果我们执行多个 apoc.periodic.iterate,只要队列池大小可以接受新任务,它们就会并行执行。此外,需要注意的是,并行运行会影响所有数据库,而不仅仅是你正在使用的单个数据库。例如,如果有两个数据库 db1db2,我们在 db1 上执行 apoc.periodic.iterate 会在我们在 db2 上执行 apoc.periodic.iterate 时影响性能。

retries

INTEGER(整数)

0

如果操作语句因错误而失败,则休眠 100 毫秒并重试,直到达到重试次数 - 参数 {_retry}

batchMode(批处理模式)

STRING

"BATCH"

操作语句应如何处理数据驱动的语句。有效值包括

  • "BATCH" - 每 batchSize 执行一次操作语句。操作语句会添加前缀,从 $_batch 参数中提取数据驱动语句返回的每个字段。

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - 每次执行一个操作语句。

  • "BATCH_SINGLE" - 每 batchSize 执行一次操作语句,但将批次的解包留给操作语句处理。操作查询可以通过 $_batch 参数访问批处理的值。

params

MAP

{}

外部传入参数映射。

concurrency

INTEGER(整数)

可用处理器的数量

使用 parallel:true 时生成的并发任务数量

failedParams

INTEGER(整数)

-1

如果设置为非负值,则每个失败批次最多 failedParams 个参数集将在 yield failedParams 中返回。

planner

枚举[DEFAULT, COST, IDP, DP]

DEFAULT

除了 DEFAULT 之外的任何规划器都将作为 cypher planner=[VALUE_OF_CONFIG](或者在现有查询选项中插入 planner=[VALUE_OF_CONFIG])预置到第二个语句中。该规划器值(除了 DEFAULT)的优先级高于查询中定义的规划器(如果有)。

重新绑定节点和关系

自 Neo4j 4.0 起,节点和关系保留对其创建所在事务的引用。因此,在 Neo4j 4.0 及更高版本中,必须重新绑定从不同事务中检索到的节点和关系,以确保它们可以被安全引用。这可以通过在新的事务中调用 MATCH 来完成

MATCH (n) WHERE id(n) = id(myKnownNode)

此更改会影响 apoc.periodic.iterate,因为此过程在其自己的内部事务中运行。

在 Neo4j 3.5.x 及更早版本中使用 apoc.periodic.iterate
CALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN r',
'CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
在 Neo4j 4.0 及更高版本中使用 apoc.periodic.iterate
CALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN id(r) as id',
'MATCH ()-[r]->() WHERE id(r)=id CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});

使用示例

让我们看一些例子。

如果你想为数百万个 :Person 节点添加 :Actor 标签,可以运行以下代码

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

让我们拆解一下传递给该过程的参数

  • 我们的第一个 Cypher 语句选择所有与另一个节点有 ACTED_IN 关系的 Person 节点并返回这些人员。这是数据驱动部分,我们在此选择要更改的数据。

  • 我们的第二个 Cypher 语句为所选的每个 Person 节点设置 :Actor 标签。这是操作部分,我们在此将更改应用到第一个语句提供的数据上。

  • 最后,我们指定过程要使用的任何配置。我们定义了 10,000 的 batchSize 并设置并行运行语句。

执行此过程将获取我们在第一个 Cypher 语句中收集的所有 Person 节点,并用第二个 Cypher 语句更新它们中的每一个。它将工作划分为批次 - 从流中获取 10,000 个 Person 节点并在单个事务中更新它们。如果我们的图中共有 30,000 个带 ACTED_IN 关系的 Person 节点,它会将此工作拆分为 3 个批次。

最后,由于更新节点标签或属性不会产生冲突,它会并行运行这些批次。

对于更新或删除关系等更复杂的操作,请不要使用 parallel: true,或者确保以每个数据子图在一次操作中更新的方式对工作进行批处理,例如通过传输根对象。如果你尝试复杂操作,也请启用失败操作重试,例如设置 retries:3

现在让我们看一个更复杂的例子。

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
  "MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

让我们拆解一下传递给该过程的参数

  • 我们的第一个 Cypher 语句选择所有订单日期大于 2016年10月13日Order 节点(第一个 Cypher 语句)。

  • 我们的第二个 Cypher 语句获取这些组,查找与其它节点有 HAS_ITEM 关系的节点,然后计算这些项的总价值,并将该总和设置为总订单价值的属性 (o.value)。

  • 我们的配置将把这些节点分组成 100 个一批 (batchSize:100),并为第二个语句并行运行批次处理。

批处理模式:BATCH_SINGLE

如果我们的操作语句调用了一个接收值批次的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取值批次并传递给该过程。当我们使用 BATCH_SINGLE 时,操作语句将可以访问 $_batch 参数,其中包含数据驱动语句中返回的字段列表。

例如,如果数据驱动语句是

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

传递给操作语句的 $_batch 变量的内容将是

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

让我们看看它的实际应用。首先,我们创建一些节点

以下查询创建了 100,000 个带有 Person 标签和 id 属性的节点
UNWIND range(1,100000) as id create (:Person {id: id})

我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅 删除数据

此过程接收一个节点列表,我们可以从 $_batch 参数中提取该列表。

以下查询流式处理所有 Person 节点并以 100 个为一批删除它们。请注意,为第一个参数使用节点而不是节点 ID(例如 MATCH (p:Person) RETURN p)会导致父事务跟踪所有已删除的节点,从而导致更高的总体内存使用量。请考虑使用 elementId 函数在事务之间传递节点信息。

CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN id(p) as personId",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

在操作语句中使用的 $_batch 参数内容如下

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

我们可以使用 列表推导式 从列表的每一项中提取 p 变量。

如果我们运行此查询,将看到以下输出

结果
batch 操作

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}