|| apoc.periodic.iterate - APOC 核心文档 - Neo4j 文档

apoc.periodic.iterate

可以使用 Cypher 命令 CALL {…​} IN CONCURRENT TRANSACTIONS 并行处理事务。更多信息请参阅事务中的 CALL 子查询 → 并发事务
详情

语法

apoc.periodic.iterate(cypherIterate, cypherAction, config) :: (batches, total, timeTaken, committedOperations, failedOperations, failedBatches, retries, errorMessages, batch, operations, wasTerminated, failedParams, updateStatistics)

描述

为第一个语句返回的每个项目运行第二个语句。此过程返回批次数量和已处理的总行数。

输入参数

名称

类型

描述

cypherIterate

STRING

要运行的第一个 Cypher 语句。

cypherAction

STRING

为初始 Cypher 语句返回的每个项目运行的 Cypher 语句。

config

MAP

{ batchSize = 10000 :: INTEGER, parallel = false :: BOOLEAN, retries = 0 :: INTEGER, batchMode = "BATCH" :: STRING, params = {} :: MAP, concurrency :: INTEGER, failedParams = -1 :: INTEGER, planner = "DEFAULT" :: ["DEFAULT", "COST", "IDP", "DP"] }

返回参数

名称

类型

描述

batches

INTEGER

批次总数。

total

INTEGER

已处理的输入行数。

timeTaken

INTEGER

以秒为单位的持续时间。

committedOperations

INTEGER

成功内部查询(操作)的数量。

failedOperations

INTEGER

失败内部查询(操作)的数量。

failedBatches

INTEGER

失败批次的数量。

retries

INTEGER

重试次数。

errorMessages

MAP

批处理错误消息及其对应错误计数的映射。

batch

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

operations

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

wasTerminated

BOOLEAN

事务是否在完成前被终止。

failedParams

MAP

失败批次的参数。键是作为 STRING 的批次号,值是批次参数列表。

updateStatistics

MAP

{ nodesCreated :: INTEGER, nodesDeleted :: INTEGER, relationshipsCreated :: INTEGER, relationshipsDeleted :: INTEGER, propertiesSet :: INTEGER, labelsAdded :: INTEGER, labelsRemoved :: INTEGER }

配置参数

该过程支持以下配置参数

配置参数
名称 类型 默认值 描述

batchSize

INTEGER

10000

在单个事务中运行指定数量的操作语句 - 参数: {_count, _batch}

parallel

BOOLEAN

false

并行运行操作语句(请注意,如果发生冲突,语句可能会死锁)。
请注意,在 parallel: false 的情况下,APOC 旨在重用同一个 java.util.concurrent.ThreadPoolExecutor,其最大线程池大小等于 1,以防止并行执行;这意味着如果您想执行多个 apoc.periodic.iterate,每个都将在前一个完成后执行。相反,使用 parallel: true 时,APOC 将使用一个 ThreadPoolExecutor,其最大线程池大小可通过 apoc.jobs.pool.num_threads 配置或默认设置为可用处理器数量的两倍。因此,如果我们执行多个 apoc.periodic.iterate,如果队列池大小可以接受新任务,每个都将并行执行。此外,需要注意的是,并行运行会影响所有数据库,而不仅仅是您正在使用的单个数据库。因此,如果存在 db1db2 两个数据库,db1 上的 apoc.periodic.iterate 将会影响 db2 上的 apoc.periodic.iterate 的性能。

retries

INTEGER

0

如果操作语句失败并出现错误,暂停 100 毫秒并重试,直到达到重试计数 - 参数 {_retry}

batchMode

STRING

"BATCH"

操作语句应如何处理数据驱动语句。有效值包括

  • "BATCH" - 每批次执行一次操作语句。操作语句前缀如下,它从 $_batch 参数中提取数据驱动语句返回的每个字段。

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - 一次执行一条操作语句。

  • "BATCH_SINGLE" - 每批次执行一次操作语句,但批次的解包留给操作语句处理。操作查询可以通过 $_batch 参数访问批处理值。

params

MAP

{}

外部传入的参数映射

concurrency

INTEGER

可用处理器数量

使用 parallel:true 时生成的并发任务数量

failedParams

INTEGER

-1

如果设置为非负值,则每个失败的批次(最多 failedParams 参数集)都将在 yield failedParams 中返回。

planner

枚举[DEFAULT, COST, IDP, DP]

DEFAULT

DEFAULT 之外的任何规划器都将作为 cypher planner=[VALUE_OF_CONFIG] 附加到第二个语句前(或将 planner=[VALUE_OF_CONFIG] 插入任何现有查询选项中)。此规划器值(除了 DEFAULT)的优先级高于查询中定义的规划器(如果有)。

重新绑定节点和关系

自 Neo4j 4.0 起,节点和关系保留对其创建事务的引用。在 Neo4j 4.0 及更高版本中,从不同事务检索的节点和关系必须重新绑定,以确保它们可以安全引用。这可以通过在新事务中调用 MATCH 来完成。

MATCH (n) WHERE id(n) = id(myKnownNode)

此更改会影响 apoc.periodic.iterate,因为此过程在其自己的内部事务中运行。

在 Neo4j 3.5.x 及更早版本上使用 apoc.periodic.iterate
CALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN r',
'CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
在 Neo4j 4.0 及更高版本上使用 apoc.periodic.iterate
CALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN id(r) as id',
'MATCH ()-[r]->() WHERE id(r)=id CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});

使用示例

我们来看一些例子。

如果要将 :Actor 标签添加到数百万个 :Person 节点,可以运行以下代码

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

让我们分解一下传递给过程的参数

  • 我们的第一个 Cypher 语句选择所有具有 ACTED_IN 关系到另一个节点的 Person 节点并返回这些人员。这是数据驱动部分,我们选择要更改的数据。

  • 我们的第二个 Cypher 语句为每个选定的 Person 节点设置 :Actor 标签。这是操作部分,我们对来自第一个语句的数据应用更改。

  • 最后,我们指定希望过程使用的任何配置。我们定义了 batchSize 为 10,000,并并行运行语句。

执行此过程将获取第一个 Cypher 语句中收集的所有 Person 节点,并使用第二个 Cypher 语句更新它们。它将工作分为批次——从流中取出 10,000 个 Person 节点,并在单个事务中更新它们。如果我们的图中共有 30,000 个具有 ACTED_IN 关系的 Person 节点,那么它将把此工作分解为 3 个批次。

最后,它并行运行这些操作,因为更新节点标签或属性不会冲突。

对于更复杂的操作,如更新或删除关系,请**不要使用 parallel: true**,或者确保您以一种在一次操作中更新每个数据子图的方式批量处理工作,例如通过传输根对象。如果您尝试复杂操作,也请启用失败操作的重试,例如使用 retries:3

现在让我们看一个更复杂的例子。

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
  "MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

让我们分解一下传递给过程的参数

  • 我们的第一个 Cypher 语句选择所有订单日期晚于 2016 年 10 月 13 日Order 节点(第一个 Cypher 语句)。

  • 我们的第二个 Cypher 语句获取这些组,并查找具有 HAS_ITEM 关系到其他节点的节点,然后对这些项目的价值求和,并将该和设置为总订单值的属性(o.value)。

  • 我们的配置会将这些节点分组为 100 个(batchSize:100),并并行运行这些批次,供第二个语句处理。

批处理模式:BATCH_SINGLE

如果我们的操作语句调用了一个接受批量值的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取批量值以传递给该过程。当我们使用 BATCH_SINGLE 时,操作语句将能够访问 $_batch 参数,该参数将包含数据驱动语句中返回的字段列表。

例如,如果数据驱动语句是

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

传递给操作语句的 $_batch 变量的内容将是

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

让我们看一个实际的例子。我们首先创建一些节点。

以下查询创建 100,000 个带有标签 Person 和属性 id 的节点
UNWIND range(1,100000) as id create (:Person {id: id})

我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅删除数据

此过程接受节点列表,我们可以从 $_batch 参数中提取这些节点。

以下查询流式传输所有 Person 节点,并以 100 个为一批进行删除。请注意,如果第一个参数使用节点而不是节点 ID(例如 MATCH (p:Person) RETURN p),则父事务将跟踪所有已删除的节点,从而导致总体内存使用量更高。请考虑使用 elementId 函数在事务之间传递节点信息。

CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN id(p) as personId",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

操作语句中使用的 $_batch 参数内容如下:

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

我们可以使用列表推导式从列表中的每个项目中提取 p 变量。

如果我们运行此查询,我们将看到以下输出

结果
batch operations

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}

© . This site is unofficial and not affiliated with Neo4j, Inc.