apoc.periodic.iterate过程
事务可以通过 Cypher 命令 CALL {…} IN CONCURRENT TRANSACTIONS 进行并行处理。更多信息,请参阅 事务中的 CALL 子查询 → 并发事务。 |
语法 |
|
||
描述 |
为第一个语句返回的每一项运行第二个语句。此过程返回批次数和处理的总行数。 |
||
输入参数 |
名称 |
类型 |
描述 |
|
|
要运行的第一个 Cypher 语句。 |
|
|
|
针对初始 Cypher 语句返回的每一项要运行的 Cypher 语句。 |
|
|
|
|
|
返回参数 |
名称 |
类型 |
描述 |
|
|
总批次数。 |
|
|
|
处理的输入行数。 |
|
|
|
所耗时长(秒)。 |
|
|
|
成功的内部查询(操作)数量。 |
|
|
|
失败的内部查询(操作)数量。 |
|
|
|
失败的批次数。 |
|
|
|
重试次数。 |
|
|
|
批次错误消息及其对应错误计数的映射表。 |
|
|
|
|
|
|
|
|
|
|
|
如果事务在完成前被终止。 |
|
|
|
失败批次的参数。键是以字符串表示的批次号,值是批次参数的列表。 |
|
|
|
|
|
配置参数
该过程支持以下配置参数
| 名称 | 类型 | 默认 | 描述 |
|---|---|---|---|
|
|
10000 |
在单个事务中运行指定数量的操作语句 - 参数:{_count, _batch} |
|
|
false |
并行运行操作语句(注意:如果存在冲突,语句可能会死锁)。 |
|
|
0 |
如果操作语句因错误而失败,则休眠 100 毫秒并重试,直到达到重试次数 - 参数 {_retry} |
|
|
"BATCH" |
操作语句应如何处理数据驱动的语句。有效值包括
UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2
|
|
|
{} |
外部传入参数映射。 |
|
|
可用处理器的数量 |
使用 |
|
|
-1 |
如果设置为非负值,则每个失败批次最多 |
|
|
DEFAULT |
除了 |
重新绑定节点和关系
自 Neo4j 4.0 起,节点和关系保留对其创建所在事务的引用。因此,在 Neo4j 4.0 及更高版本中,必须重新绑定从不同事务中检索到的节点和关系,以确保它们可以被安全引用。这可以通过在新的事务中调用 MATCH 来完成
MATCH (n) WHERE id(n) = id(myKnownNode)
此更改会影响 apoc.periodic.iterate,因为此过程在其自己的内部事务中运行。
apoc.periodic.iterateCALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN r',
'CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
apoc.periodic.iterateCALL apoc.periodic.iterate('MATCH (:Account)-[r:ASSOCIATED_WITH]->() RETURN id(r) as id',
'MATCH ()-[r]->() WHERE id(r)=id CALL apoc.do.case(.....) YIELD value RETURN value',
{batchSize: 10000, parallel: false, iterateList: true});
使用示例
让我们看一些例子。
如果你想为数百万个 :Person 节点添加 :Actor 标签,可以运行以下代码
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
让我们拆解一下传递给该过程的参数
-
我们的第一个 Cypher 语句选择所有与另一个节点有
ACTED_IN关系的Person节点并返回这些人员。这是数据驱动部分,我们在此选择要更改的数据。 -
我们的第二个 Cypher 语句为所选的每个
Person节点设置:Actor标签。这是操作部分,我们在此将更改应用到第一个语句提供的数据上。 -
最后,我们指定过程要使用的任何配置。我们定义了 10,000 的
batchSize并设置并行运行语句。
执行此过程将获取我们在第一个 Cypher 语句中收集的所有 Person 节点,并用第二个 Cypher 语句更新它们中的每一个。它将工作划分为批次 - 从流中获取 10,000 个 Person 节点并在单个事务中更新它们。如果我们的图中共有 30,000 个带 ACTED_IN 关系的 Person 节点,它会将此工作拆分为 3 个批次。
最后,由于更新节点标签或属性不会产生冲突,它会并行运行这些批次。
|
对于更新或删除关系等更复杂的操作,请不要使用 parallel: true,或者确保以每个数据子图在一次操作中更新的方式对工作进行批处理,例如通过传输根对象。如果你尝试复杂操作,也请启用失败操作重试,例如设置 |
现在让我们看一个更复杂的例子。
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
"MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
让我们拆解一下传递给该过程的参数
-
我们的第一个 Cypher 语句选择所有订单日期大于
2016年10月13日的Order节点(第一个 Cypher 语句)。 -
我们的第二个 Cypher 语句获取这些组,查找与其它节点有
HAS_ITEM关系的节点,然后计算这些项的总价值,并将该总和设置为总订单价值的属性 (o.value)。 -
我们的配置将把这些节点分组成 100 个一批 (
batchSize:100),并为第二个语句并行运行批次处理。
批处理模式:BATCH_SINGLE
如果我们的操作语句调用了一个接收值批次的过程,我们可以使用 batchMode: "BATCH_SINGLE" 来获取值批次并传递给该过程。当我们使用 BATCH_SINGLE 时,操作语句将可以访问 $_batch 参数,其中包含数据驱动语句中返回的字段列表。
例如,如果数据驱动语句是
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
传递给操作语句的 $_batch 变量的内容将是
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
让我们看看它的实际应用。首先,我们创建一些节点
Person 标签和 id 属性的节点UNWIND range(1,100000) as id create (:Person {id: id})
我们可以使用 apoc.nodes.delete 过程删除这些节点。请参阅 删除数据。
此过程接收一个节点列表,我们可以从 $_batch 参数中提取该列表。
以下查询流式处理所有 Person 节点并以 100 个为一批删除它们。请注意,为第一个参数使用节点而不是节点 ID(例如 MATCH (p:Person) RETURN p)会导致父事务跟踪所有已删除的节点,从而导致更高的总体内存使用量。请考虑使用 elementId 函数在事务之间传递节点信息。
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN id(p) as personId",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
在操作语句中使用的 $_batch 参数内容如下
[
{p: Node<1>},
{p: Node<2>},
...
]
我们可以使用 列表推导式 从列表的每一项中提取 p 变量。
如果我们运行此查询,将看到以下输出
| batch | 操作 |
|---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} |
{total: 100000, committed: 100000, failed: 0, errors: {}} |