知识库

使用子查询控制聚合的作用域

诸如 collect() 和 count() 之类的聚合在查询计划中显示为 EagerAggregation(积极聚合)算子(带有深蓝色标题)。

它们类似于 Eager(积极)算子,会形成一个壁垒,所有行必须执行到此处并在此停顿,以便完成聚合处理。但除此以外,它不会改变查询的流式处理行为(前提是计划中没有实际的 Eager 算子)。

这比 Eager 算子更节省内存,但聚合结果(包括用于分组键的变量)必须保存在内存中,直到聚合处理完所有输入,之后才能从生成的行恢复流式传输。

因此,如果聚合生成的行数足够多,可能会增加堆内存压力,导致高频次的 GC 暂停,最坏的情况下甚至会导致堆内存溢出。

本文展示了如何利用 Neo4j 4.1 引入的子查询来缩小聚合的作用域。这可以减轻堆内存负担,从而实现更高效的查询。

话虽如此,正确性始终是第一位的;有时对所有输入行进行积极聚合对于计算正确结果是必要的。但是,当你只想让积极聚合应用于特定的扩展(或输入段)时,可以通过子查询来控制它。

即使在 4.1 之前,你也可以通过模式推导(Pattern comprehensions)实现类似的效果(尽管规模较小),其作用是将 collect() 的作用域限制在推导中展开的模式内。

某些 APOC 过程也可以用作子查询的替代品。

聚合行为示例

让我们使用一个简单的例子,即 Neo4j Browser 中 :play movies 指南里的电影图谱。

从指南中创建好该图谱后,我们来看一个简单的查询,返回每个电影节点及其演员节点列表。

MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
RETURN movie, collect(p) as actors

它将如何执行?

首先会找到第一个路径匹配(从两个节点中的一个开始进行标签扫描,然后扩展模式,并根据节点标签进行过滤),接着开始聚合处理。

接着会找到下一个路径匹配,聚合会处理该行,依此类推。聚合会不断接收数据行,并相应地积累聚合结果。

最终,所有 172 条路径都由聚合处理完毕。聚合积累了 38 个结果行(每个电影对应一个,这是分组键),每一行包含电影节点和该电影的演员列表。现在所有输入行都已处理完毕,聚合完成,流式传输从 38 个结果行开始。

堆内存中的这种中间状态不包含节点或关系的属性,除非这些属性被作为变量投影出来。

稍大的数据集

这是一个小数据集。但如果图谱中的数据集远大呢?据估计,历史上已经制作了 50 万部电影。一次性在堆中保持 50 万行仍然不是问题,我们需要更大的数字才能构成挑战,或者至少在执行时产生明显的耗时差异。

让我们创建一个包含 100 万部电影、100 万名演员以及每部电影有 10 名演员的电影图谱。

我们将使用 APOC 过程中的 apoc.periodic.iterate() 来创建我们的图谱。

首先,让我们为电影和人物各创建 100 万个节点。

CALL apoc.periodic.iterate("
UNWIND range(1,1000000) as id
RETURN id
",
"
CREATE (m:Movie {id:id})
CREATE (p:Person {id:id})
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages

目前我们不需要额外的属性。我们正在进行的匹配和聚合不依赖于节点属性,而且在将它们投影出来之前,我们也不会因使用它们而付出代价。

现在确保我们已经创建了索引。

CREATE INDEX ON :Person(id);
CREATE INDEX ON :Movie(id);

有了索引后,我们可以随机为每部电影分配 10 名演员。

CALL apoc.periodic.iterate("
MATCH (m:Movie)
RETURN m
",
"
UNWIND range(0,10) as i
WITH m, toInteger(rand() * 1000000) as id
MATCH (p:Person {id:id})
CREATE (m)<-[:ACTED_IN]-(p)
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages

对于我的笔记本电脑,我配置了 4GB 的堆内存。实际的服务器部署通常会使用至少两倍于此的内存,建议最大值可达 31GB。

让我们使用原查询的稍作修改版本来看看效果。我想去掉结果返回部分(其中包含对所有这些节点的属性访问),因此我们只以 count() 聚合结尾,这通常开销更小(毕竟它只是在最后对每行输入增加计数)。

MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
WITH movie, collect(p) as actors
RETURN count(*)

分组键仍然是电影,所以我们知道在聚合构建过程中,必须在内存中保持多达 100 万行,以及每部电影的演员列表。

取决于堆内存的大小(以及同时执行的其他查询),这可能会给堆内存带来压力,导致内存耗尽且无法回收,从而引起高频次的 GC 暂停。我们甚至可能完全耗尽所有堆内存。

让我们尝试一下。首先是 EXPLAIN 计划。

subqueries for aggregations plan1

我们可以看到 collect(开销较大)和 count(便宜)的聚合。让我们试着运行它。

Started streaming 1 records after 1 ms and completed after 14907 ms.

它返回了 100 万的计数(由于不重要已省略),但更有趣的是执行时间,或者说一旦我们有对比查询,它就会变得有趣。对我来说,这大约花费了 15 秒。

这个查询中最有趣的部分其实是在调试日志中。

2020-10-01 04:06:31.703+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=178, gcTime=248, gcCount=1}
2020-10-01 04:06:32.893+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=254, gcTime=269, gcCount=1}
2020-10-01 04:06:34.620+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=277, gcTime=295, gcCount=1}
2020-10-01 04:06:36.506+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=328, gcTime=383, gcCount=1}
2020-10-01 04:06:38.847+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=542, gcTime=628, gcCount=1}
2020-10-01 04:06:40.937+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=346, gcTime=384, gcCount=1}
2020-10-01 04:06:42.994+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=314, gcTime=348, gcCount=1}
2020-10-01 04:06:44.965+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=241, gcTime=271, gcCount=1}
2020-10-01 04:07:04.570+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=242, gcTime=256, gcCount=1}
2020-10-01 04:08:42.469+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=169, gcTime=198, gcCount=1}

这些 GC 虽然单次并不高,但这表明此类聚合确实会导致 GC 暂停。对于更复杂的查询或更复杂的数据集,这些暂停可能会变得非常显著。

子查询缩小聚合的作用域

如果我们选择合适的位置使用子查询并在子查询内进行聚合,就可以缩小聚合的作用域,并避免需要同时在内存中体现所有这些行。

MATCH (movie:Movie)
CALL {
    WITH movie
    MATCH (movie)<-[:ACTED_IN]-(p:Person)
    RETURN collect(p) as actors
}
RETURN count(*)

这种方式应该更节省内存。

请记住,子查询是按行执行的。由于子查询之前有 MATCH,我们每一行对应一部电影。

MATCH 和聚合发生在子查询内部,所以对于每个 collect(),它一次只考虑单个电影的路径。这意味着每个 collect() 只应用于 10 个输入行(因为每部电影 10 个演员),因此单行的结果会非常快地得到。

请注意,这是一种权衡:我们不是将单个 collect() 聚合应用于 100 万行,而是使用子查询在电影层面分解工作。因为有 100 万部电影,我们最终会进行 100 万次子查询调用,每个调用都执行自己的扩展和 collect(),所以总计 collect() 被调用了 100 万次,但每次只需要在极少量数据上运行。

我们可以为每个输入行执行子查询,在有限的作用域内进行聚合,输出结果,然后继续下一行。在该行执行期间使用的内存都有资格进行垃圾回收,不需要在处理后续行时一直保存在堆中。

首先让我们检查这个查询的计划。

subqueries for aggregations plan2

请注意,我们仍然看到 collect() 的积极聚合,但它输入到一个 Apply 操作中。这表明聚合的作用域仅限于它所应用的项目,即每个电影节点。

让我们试着运行它。我将省略实际的查询结果,因为我们知道那仍然是 100 万,但让我们看看耗时。

Started streaming 1 records after 1 ms and completed after 5542 ms.

重复运行会有一些差异,但通常在 4 到 6 秒之间。相比原始查询的 15 秒,这是一个不错的改进。

调试日志中的 GC 暂停情况如何?你的情况可能有所不同,但即使在多次重复查询执行后,我也没有看到记录任何 GC。

这表明对大量行同时进行聚合可能会非常消耗内存,而通过巧妙地应用子查询来缩小聚合作用域,往往可以避免这种情况及随之而来的 GC 暂停(前提是这样做对你的用例是正确的)。

模式推导类似于在子查询中调用的 collect()

模式推导 可以产生类似的效果,并且自 Neo4j 3.1 起就已提供。

MATCH (movie:Movie)
WITH movie, [(movie)<-[:ACTED_IN]-(p:Person) | p] as actors
RETURN count(*)

模式推导最类似于 OPTIONAL MATCH 后接 collect(),但与子查询类似,它们是按行执行的。甚至 EXPLAIN 计划也很相似。

subqueries for aggregations plan3

请注意,这次带有 collect() 积极聚合的执行线输入到了 ConditionalApply 中,它是 Apply 的变体,意味着右侧是在嵌套循环中执行的,这也是这些操作的作用域。

它的性能如何?

Started streaming 1 records after 1 ms and completed after 4539 ms.

重复运行结果在 4 到 6 秒之间,与子查询版本大致相同。同样,我们在调试日志中没有看到 GC。

因此,就效率而言,无论是在耗时还是内存方面,模式推导都与使用子查询大致相同。

虽然这比使用子查询更简洁,并且通常更通用(你可以在单个 WITH 子句中使用多个模式推导),但它们仅用于收集结果。虽然你可以获取结果列表的 size() 作为 count() 的等价物,但不能将其用于其他任何类型的聚合。

此外,模式推导目前不允许对列表结果进行排序、跳过(skip)或限制(limit),如果使用子查询,这些功能都可以自由使用。

APOC 过程可以替代子查询

如果你使用的不是 Neo4j 4.1.x 或更高版本,APOC 中有一些过程可以作为子查询达到同样的效果。

MATCH (movie:Movie)
CALL apoc.cypher.run("
    WITH movie
    MATCH (movie)<-[:ACTED_IN]-(p:Person)
    RETURN collect(p) as actors", {movie:movie}) YIELD value
WITH movie, value.actors as actors
RETURN count(*)

与子查询一样,过程是按行执行的,因此 collect() 聚合类似地仅作用于在特定调用中匹配到的行。

由于有 100 万部电影,总共会有 100 万次 apoc.cypher.run() 调用,每一次都执行自己的 MATCH 和小范围的 collect()。

我们将省略这个的计划,因为它不会显示任何有趣的内容。我们会看到一个过程调用操作,但由于查询的主要部分是查询字符串的形式,计划器无法对其进行评估,因此它不会显示在计划中。

我们可以单独运行复制/粘贴的查询字符串的 EXPLAIN(进行少量修改以使其能编译),但我们已经看过这样的计划(带有 collect() 聚合)。唯一的区别是,这个计划将被作为一个完全独立的事务进行计划和执行,其结果将产生给当前查询的事务。让我们看看它的表现。

Started streaming 1 records after 1 ms and completed after 136441 ms.

哇,这里发生了什么?耗时飙升到了 2 分钟左右。为什么会这样?

这个 APOC 过程会将查询创建并作为新事务执行,而原生子查询仍然在同一个事务内执行。这意味着我们实际上通过 APOC 执行了 100 万个独立的事务,这在设置和执行方面是有代价的。

如果这种方法在处理大量行时耗时如此昂贵,为什么我们还要考虑它?因为我们仍然在调试日志中没有看到 GC 暂停。

如果你的查询因为此类聚合而导致 GC 和堆内存耗尽问题,且你运行的版本过低无法使用原生子查询,且用例不允许使用模式推导,那么这种使用特定 APOC 过程的方法可能会让你避开 GC 和堆压力,但可能需要付出时间的代价。

像往常一样,请在自己的数据上进行计时测试。

© . This site is unofficial and not affiliated with Neo4j, Inc.