使用子查询控制聚合的作用域
诸如 collect() 和 count() 之类的聚合在查询计划中显示为 EagerAggregation(积极聚合)算子(带有深蓝色标题)。
它们类似于 Eager(积极)算子,会形成一个壁垒,所有行必须执行到此处并在此停顿,以便完成聚合处理。但除此以外,它不会改变查询的流式处理行为(前提是计划中没有实际的 Eager 算子)。
这比 Eager 算子更节省内存,但聚合结果(包括用于分组键的变量)必须保存在内存中,直到聚合处理完所有输入,之后才能从生成的行恢复流式传输。
因此,如果聚合生成的行数足够多,可能会增加堆内存压力,导致高频次的 GC 暂停,最坏的情况下甚至会导致堆内存溢出。
本文展示了如何利用 Neo4j 4.1 引入的子查询来缩小聚合的作用域。这可以减轻堆内存负担,从而实现更高效的查询。
话虽如此,正确性始终是第一位的;有时对所有输入行进行积极聚合对于计算正确结果是必要的。但是,当你只想让积极聚合应用于特定的扩展(或输入段)时,可以通过子查询来控制它。
即使在 4.1 之前,你也可以通过模式推导(Pattern comprehensions)实现类似的效果(尽管规模较小),其作用是将 collect() 的作用域限制在推导中展开的模式内。
某些 APOC 过程也可以用作子查询的替代品。
聚合行为示例
让我们使用一个简单的例子,即 Neo4j Browser 中 :play movies 指南里的电影图谱。
从指南中创建好该图谱后,我们来看一个简单的查询,返回每个电影节点及其演员节点列表。
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
RETURN movie, collect(p) as actors
它将如何执行?
首先会找到第一个路径匹配(从两个节点中的一个开始进行标签扫描,然后扩展模式,并根据节点标签进行过滤),接着开始聚合处理。
接着会找到下一个路径匹配,聚合会处理该行,依此类推。聚合会不断接收数据行,并相应地积累聚合结果。
最终,所有 172 条路径都由聚合处理完毕。聚合积累了 38 个结果行(每个电影对应一个,这是分组键),每一行包含电影节点和该电影的演员列表。现在所有输入行都已处理完毕,聚合完成,流式传输从 38 个结果行开始。
| 堆内存中的这种中间状态不包含节点或关系的属性,除非这些属性被作为变量投影出来。 |
稍大的数据集
这是一个小数据集。但如果图谱中的数据集远大呢?据估计,历史上已经制作了 50 万部电影。一次性在堆中保持 50 万行仍然不是问题,我们需要更大的数字才能构成挑战,或者至少在执行时产生明显的耗时差异。
让我们创建一个包含 100 万部电影、100 万名演员以及每部电影有 10 名演员的电影图谱。
我们将使用 APOC 过程中的 apoc.periodic.iterate() 来创建我们的图谱。
首先,让我们为电影和人物各创建 100 万个节点。
CALL apoc.periodic.iterate("
UNWIND range(1,1000000) as id
RETURN id
",
"
CREATE (m:Movie {id:id})
CREATE (p:Person {id:id})
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
目前我们不需要额外的属性。我们正在进行的匹配和聚合不依赖于节点属性,而且在将它们投影出来之前,我们也不会因使用它们而付出代价。
现在确保我们已经创建了索引。
CREATE INDEX ON :Person(id);
CREATE INDEX ON :Movie(id);
有了索引后,我们可以随机为每部电影分配 10 名演员。
CALL apoc.periodic.iterate("
MATCH (m:Movie)
RETURN m
",
"
UNWIND range(0,10) as i
WITH m, toInteger(rand() * 1000000) as id
MATCH (p:Person {id:id})
CREATE (m)<-[:ACTED_IN]-(p)
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
对于我的笔记本电脑,我配置了 4GB 的堆内存。实际的服务器部署通常会使用至少两倍于此的内存,建议最大值可达 31GB。
让我们使用原查询的稍作修改版本来看看效果。我想去掉结果返回部分(其中包含对所有这些节点的属性访问),因此我们只以 count() 聚合结尾,这通常开销更小(毕竟它只是在最后对每行输入增加计数)。
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
WITH movie, collect(p) as actors
RETURN count(*)
分组键仍然是电影,所以我们知道在聚合构建过程中,必须在内存中保持多达 100 万行,以及每部电影的演员列表。
取决于堆内存的大小(以及同时执行的其他查询),这可能会给堆内存带来压力,导致内存耗尽且无法回收,从而引起高频次的 GC 暂停。我们甚至可能完全耗尽所有堆内存。
让我们尝试一下。首先是 EXPLAIN 计划。

我们可以看到 collect(开销较大)和 count(便宜)的聚合。让我们试着运行它。
Started streaming 1 records after 1 ms and completed after 14907 ms.
它返回了 100 万的计数(由于不重要已省略),但更有趣的是执行时间,或者说一旦我们有对比查询,它就会变得有趣。对我来说,这大约花费了 15 秒。
这个查询中最有趣的部分其实是在调试日志中。
2020-10-01 04:06:31.703+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=178, gcTime=248, gcCount=1}
2020-10-01 04:06:32.893+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=254, gcTime=269, gcCount=1}
2020-10-01 04:06:34.620+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=277, gcTime=295, gcCount=1}
2020-10-01 04:06:36.506+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=328, gcTime=383, gcCount=1}
2020-10-01 04:06:38.847+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=542, gcTime=628, gcCount=1}
2020-10-01 04:06:40.937+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=346, gcTime=384, gcCount=1}
2020-10-01 04:06:42.994+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=314, gcTime=348, gcCount=1}
2020-10-01 04:06:44.965+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=241, gcTime=271, gcCount=1}
2020-10-01 04:07:04.570+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=242, gcTime=256, gcCount=1}
2020-10-01 04:08:42.469+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=169, gcTime=198, gcCount=1}
这些 GC 虽然单次并不高,但这表明此类聚合确实会导致 GC 暂停。对于更复杂的查询或更复杂的数据集,这些暂停可能会变得非常显著。
子查询缩小聚合的作用域
如果我们选择合适的位置使用子查询并在子查询内进行聚合,就可以缩小聚合的作用域,并避免需要同时在内存中体现所有这些行。
MATCH (movie:Movie)
CALL {
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors
}
RETURN count(*)
这种方式应该更节省内存。
请记住,子查询是按行执行的。由于子查询之前有 MATCH,我们每一行对应一部电影。
MATCH 和聚合发生在子查询内部,所以对于每个 collect(),它一次只考虑单个电影的路径。这意味着每个 collect() 只应用于 10 个输入行(因为每部电影 10 个演员),因此单行的结果会非常快地得到。
请注意,这是一种权衡:我们不是将单个 collect() 聚合应用于 100 万行,而是使用子查询在电影层面分解工作。因为有 100 万部电影,我们最终会进行 100 万次子查询调用,每个调用都执行自己的扩展和 collect(),所以总计 collect() 被调用了 100 万次,但每次只需要在极少量数据上运行。
我们可以为每个输入行执行子查询,在有限的作用域内进行聚合,输出结果,然后继续下一行。在该行执行期间使用的内存都有资格进行垃圾回收,不需要在处理后续行时一直保存在堆中。
首先让我们检查这个查询的计划。

请注意,我们仍然看到 collect() 的积极聚合,但它输入到一个 Apply 操作中。这表明聚合的作用域仅限于它所应用的项目,即每个电影节点。
让我们试着运行它。我将省略实际的查询结果,因为我们知道那仍然是 100 万,但让我们看看耗时。
Started streaming 1 records after 1 ms and completed after 5542 ms.
重复运行会有一些差异,但通常在 4 到 6 秒之间。相比原始查询的 15 秒,这是一个不错的改进。
调试日志中的 GC 暂停情况如何?你的情况可能有所不同,但即使在多次重复查询执行后,我也没有看到记录任何 GC。
这表明对大量行同时进行聚合可能会非常消耗内存,而通过巧妙地应用子查询来缩小聚合作用域,往往可以避免这种情况及随之而来的 GC 暂停(前提是这样做对你的用例是正确的)。
模式推导类似于在子查询中调用的 collect()
模式推导 可以产生类似的效果,并且自 Neo4j 3.1 起就已提供。
MATCH (movie:Movie)
WITH movie, [(movie)<-[:ACTED_IN]-(p:Person) | p] as actors
RETURN count(*)
模式推导最类似于 OPTIONAL MATCH 后接 collect(),但与子查询类似,它们是按行执行的。甚至 EXPLAIN 计划也很相似。

请注意,这次带有 collect() 积极聚合的执行线输入到了 ConditionalApply 中,它是 Apply 的变体,意味着右侧是在嵌套循环中执行的,这也是这些操作的作用域。
它的性能如何?
Started streaming 1 records after 1 ms and completed after 4539 ms.
重复运行结果在 4 到 6 秒之间,与子查询版本大致相同。同样,我们在调试日志中没有看到 GC。
因此,就效率而言,无论是在耗时还是内存方面,模式推导都与使用子查询大致相同。
虽然这比使用子查询更简洁,并且通常更通用(你可以在单个 WITH 子句中使用多个模式推导),但它们仅用于收集结果。虽然你可以获取结果列表的 size() 作为 count() 的等价物,但不能将其用于其他任何类型的聚合。
此外,模式推导目前不允许对列表结果进行排序、跳过(skip)或限制(limit),如果使用子查询,这些功能都可以自由使用。
APOC 过程可以替代子查询
如果你使用的不是 Neo4j 4.1.x 或更高版本,APOC 中有一些过程可以作为子查询达到同样的效果。
MATCH (movie:Movie)
CALL apoc.cypher.run("
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors", {movie:movie}) YIELD value
WITH movie, value.actors as actors
RETURN count(*)
与子查询一样,过程是按行执行的,因此 collect() 聚合类似地仅作用于在特定调用中匹配到的行。
由于有 100 万部电影,总共会有 100 万次 apoc.cypher.run() 调用,每一次都执行自己的 MATCH 和小范围的 collect()。
我们将省略这个的计划,因为它不会显示任何有趣的内容。我们会看到一个过程调用操作,但由于查询的主要部分是查询字符串的形式,计划器无法对其进行评估,因此它不会显示在计划中。
我们可以单独运行复制/粘贴的查询字符串的 EXPLAIN(进行少量修改以使其能编译),但我们已经看过这样的计划(带有 collect() 聚合)。唯一的区别是,这个计划将被作为一个完全独立的事务进行计划和执行,其结果将产生给当前查询的事务。让我们看看它的表现。
Started streaming 1 records after 1 ms and completed after 136441 ms.
哇,这里发生了什么?耗时飙升到了 2 分钟左右。为什么会这样?
这个 APOC 过程会将查询创建并作为新事务执行,而原生子查询仍然在同一个事务内执行。这意味着我们实际上通过 APOC 执行了 100 万个独立的事务,这在设置和执行方面是有代价的。
如果这种方法在处理大量行时耗时如此昂贵,为什么我们还要考虑它?因为我们仍然在调试日志中没有看到 GC 暂停。
如果你的查询因为此类聚合而导致 GC 和堆内存耗尽问题,且你运行的版本过低无法使用原生子查询,且用例不允许使用模式推导,那么这种使用特定 APOC 过程的方法可能会让你避开 GC 和堆压力,但可能需要付出时间的代价。
像往常一样,请在自己的数据上进行计时测试。
此页面有帮助吗?