apoc.periodic.iterate
可以使用 Cypher 命令 CALL {…} IN CONCURRENT TRANSACTIONS 并行处理事务。更多信息请参阅事务中的 CALL 子查询 → 并发事务。 |
语法 |
|
||
描述 |
为第一个语句返回的每个项目运行第二个语句。此过程返回批次数量和已处理的总行数。 |
||
输入参数 |
名称 |
类型 |
描述 |
|
|
要运行的第一个 Cypher 语句。 |
|
|
|
为初始 Cypher 语句返回的每个项目运行的 Cypher 语句。 |
|
|
|
|
|
返回参数 |
名称 |
类型 |
描述 |
|
|
批次总数。 |
|
|
|
已处理的输入行数。 |
|
|
|
以秒为单位的持续时间。 |
|
|
|
成功内部查询(操作)的数量。 |
|
|
|
失败内部查询(操作)的数量。 |
|
|
|
失败批次的数量。 |
|
|
|
重试次数。 |
|
|
|
批处理错误消息及其对应错误计数的映射。 |
|
|
|
|
|
|
|
|
|
|
|
事务是否在完成前被终止。 |
|
|
|
失败批次的参数。键是作为 STRING 的批次号,值是批次参数列表。 |
|
|
|
|
|
配置参数
该过程支持以下配置参数
| 名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
|
|
10000 |
在单个事务中运行指定数量的操作语句 - 参数: {_count, _batch} |
|
|
false |
并行运行操作语句(请注意,如果发生冲突,语句可能会死锁)。 |
|
|
0 |
如果操作语句失败并出现错误,暂停 100 毫秒并重试,直到达到重试计数 - 参数 {_retry} |
|
|
"BATCH" |
操作语句应如何处理数据驱动语句。有效值包括
UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2
|
|
|
{} |
外部传入的参数映射 |
|
|
可用处理器数量 |
使用 |
|
|
-1 |
如果设置为非负值,则每个失败的批次(最多 |
|
|
DEFAULT |
除 |
重新绑定节点和关系
自 Neo4j 4.0 起,节点和关系保留对其创建事务的引用。在 Neo4j 4.0 及更高版本中,从不同事务检索的节点和关系必须重新绑定,以确保它们可以安全引用。这可以通过在新事务中调用 MATCH 来完成。
MATCH (n) WHERE id(n) = id(myKnownNode)
此更改会影响 apoc.periodic.iterate,因为此过程在其自己的内部事务中运行。
apoc.periodic.iterateCALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN r',
'CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
apoc.periodic.iterateCALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN id(r) as id',
'MATCH ()-[r]->() WHERE id(r)=id CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
使用示例
我们来看一些例子。
如果要将 :Actor 标签添加到数百万个 :Person 节点,可以运行以下代码
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
让我们分解一下传递给过程的参数
-
我们的第一个 Cypher 语句选择所有具有
ACTED_IN关系到另一个节点的Person节点并返回这些人员。这是数据驱动部分,我们选择要更改的数据。 -
我们的第二个 Cypher 语句为每个选定的
Person节点设置:Actor标签。这是操作部分,我们对来自第一个语句的数据应用更改。 -
最后,我们指定希望过程使用的任何配置。我们定义了
batchSize为 10,000,并并行运行语句。
执行此过程将获取第一个 Cypher 语句中收集的所有 Person 节点,并使用第二个 Cypher 语句更新它们。它将工作分为批次——从流中取出 10,000 个 Person 节点,并在单个事务中更新它们。如果我们的图中共有 30,000 个具有 ACTED_IN 关系的 Person 节点,那么它将把此工作分解为 3 个批次。
最后,它并行运行这些操作,因为更新节点标签或属性不会冲突。
|
对于更复杂的操作,如更新或删除关系,请**不要使用 parallel: true**,或者确保您以一种在一次操作中更新每个数据子图的方式批量处理工作,例如通过传输根对象。如果您尝试复杂操作,也请启用失败操作的重试,例如使用 |
现在让我们看一个更复杂的例子。
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
"MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
让我们分解一下传递给过程的参数
-
我们的第一个 Cypher 语句选择所有订单日期晚于
2016 年 10 月 13 日的Order节点(第一个 Cypher 语句)。 -
我们的第二个 Cypher 语句获取这些组,并查找具有
HAS_ITEM关系到其他节点的节点,然后对这些项目的价值求和,并将该和设置为总订单值的属性(o.value)。 -
我们的配置会将这些节点分组为 100 个(
batchSize:100),并并行运行这些批次,供第二个语句处理。
批处理模式:BATCH_SINGLE
如果我们的操作语句调用了一个接受批量值的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取批量值以传递给该过程。当我们使用 BATCH_SINGLE 时,操作语句将能够访问 $_batch 参数,该参数将包含数据驱动语句中返回的字段列表。
例如,如果数据驱动语句是
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
传递给操作语句的 $_batch 变量的内容将是
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
让我们看一个实际的例子。我们首先创建一些节点。
Person 和属性 id 的节点UNWIND range(1,100000) as id create (:Person {id: id})
我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅删除数据。
此过程接受节点列表,我们可以从 $_batch 参数中提取这些节点。
以下查询流式传输所有 Person 节点,并以 100 个为一批进行删除。请注意,如果第一个参数使用节点而不是节点 ID(例如 MATCH (p:Person) RETURN p),则父事务将跟踪所有已删除的节点,从而导致总体内存使用量更高。请考虑使用 elementId 函数在事务之间传递节点信息。
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN id(p) as personId",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
操作语句中使用的 $_batch 参数内容如下:
[
{p: Node<1>},
{p: Node<2>},
...
]
我们可以使用列表推导式从列表中的每个项目中提取 p 变量。
如果我们运行此查询,我们将看到以下输出
| batch | operations |
|---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} |
{total: 100000, committed: 100000, failed: 0, errors: {}} |