利用 Neo4j GDS 进行数据科学

Neo4j 图数据科学 (GDS) 让数据科学家能够利用强大的图算法。它提供了无监督机器学习 (ML) 方法和启发式算法,用于学习和描述图的拓扑结构。GDS 包含具有企业级功能的稳健图算法,例如确保结果一致性的确定性种子设定,以及可重现的机器学习工作流。

GDS 算法分为五组

  • 社区检测 (Community detection):用于检测组集群和分区选项。

  • 中心性 (Centrality):用于计算图中节点的重要性。

  • 拓扑链路预测 (Topological link prediction):用于估计节点间形成关系的概率。

  • 相似性 (Similarity):用于评估节点对之间的相似度。

  • 路径查找与搜索 (Path finding & search):用于查找最优路径、评估路由可用性等。

GDS 通过 Cypher 运行

所有 GDS 功能 均通过发出 Cypher® 查询来使用。因此,它可以轻松地通过 Spark 访问,因为 Neo4j Apache Spark 连接器可以发出 Cypher 查询并将结果读取回来。这种组合意味着您可以将 Neo4j 和 GDS 用作现有 Apache Spark 机器学习工作流中的图协处理器。

GDS 一等支持

Neo4j Spark 连接器为图数据科学库提供了“一等支持”,让我们看看它是如何工作的。

限制

我们不支持 mutatewrite 过程模式,因为它们不会在 dataframe 中返回任何可用的信息。您可以通过连接 dataframe 来实现同样的效果,然后使用 Neo4j Spark 连接器将数据写回您想要的任何 Neo4j 实例。

示例

想象一下,我们想要复现此处 GDS 手册中详述的 Page Rank 算法示例。相关的 Spark 示例看起来如下(我们假设数据已存在于 Neo4j 中)。

  1. 创建投影图:以下 Python 代码将使用原生投影方式投影一个图,并将其以 myGraph 的名称存储在图目录中。

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.graph.project")
  .option("gds.graphName", "myGraph")
  .option("gds.nodeProjection", "Page")
  .option("gds.relationshipProjection", "LINKS")
  .option("gds.configuration.relationshipProperties", "weight")
  .load()
  .show(truncate=False)
)

这将显示如下结果

+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection                            |relationshipProjection                                                                                                                                                                    |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph  |8        |14               |503          |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
  1. 估计算法成本:通过以下 Python 代码,我们将使用 estimate 过程来估计运行该算法的成本

spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream.estimate")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(false)
(
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
  .show(truncate=False)
)

这将显示如下结果

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView                                                                                                                                                                                                                                                                                                                                                                                                                 |mapView                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes     |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n    |-- this.instance: 88 Bytes\n    |-- vote bits: 104 Bytes\n    |-- compute steps: 208 Bytes\n        |-- this.instance: 104 Bytes\n    |-- node value: 120 Bytes\n        |-- pagerank (DOUBLE): 120 Bytes\n    |-- message arrays: 296 Bytes\n        |-- this.instance: 56 Bytes\n        |-- send array: 120 Bytes\n        |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816     |816     |8        |14               |0.1              |0.1              |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
  1. 计算算法:以下 Python 代码将返回 Page Rank 计算结果,而无需修改图

val pr_df = spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()

pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
  spark.read.format("org.neo4j.spark.DataSource")
  .option("gds", "gds.pageRank.stream")
  .option("gds.graphName", "myGraph")
  .option("gds.configuration.concurrency", "2")
  .load()
)

pr_df.show(truncate=False)

这将显示如下结果

+------+------------------+
|nodeId|score             |
+------+------------------+
|0     |3.215681999884452 |
|1     |1.0542700552146722|
|2     |1.0542700552146722|
|3     |1.0542700552146722|
|4     |0.3278578964488539|
|5     |0.3278578964488539|
|6     |0.3278578964488539|
|7     |0.3278578964488539|
+------+------------------+

正如您所见,我们现在只有 nodeIdscore 两列,让我们看看如何使用该分数来丰富您的节点。

  1. 使用分数丰富节点:以下 Python 代码将使用该分数来丰富节点

    # we'll assume that `spark` variable is already present
    # we create the `nodes_df`
    nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://:7687") \
      .option("labels", "Page") \
      .load()

    # we join `nodes_df` with `pr_df` created in the step before
    new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
    new_df.show(truncate=False)

这将显示如下结果

+----+--------+-------+------+------------------+
|<id>|<labels>|   name|nodeId|             score|
+----+--------+-------+------+------------------+
|   0|  [Page]|   Home|     0| 3.215681999884452|
|   1|  [Page]|  About|     1|1.0542700552146722|
|   2|  [Page]|Product|     2|1.0542700552146722|
|   3|  [Page]|  Links|     3|1.0542700552146722|
|   4|  [Page]| Site A|     4|0.3278578964488539|
|   5|  [Page]| Site B|     5|0.3278578964488539|
|   6|  [Page]| Site C|     6|0.3278578964488539|
|   7|  [Page]| Site D|     7|0.3278578964488539|
+----+--------+-------+------+------------------+

现在您可以将此数据集持久化到您想要的任何 Neo4j 实例中。

选项

正如您从上面的示例中所理解的,您可以使用 gds. 前缀并配合点符号支持嵌套映射来传递所有必需的选项。

表 1. 可用配置设置列表
设置名称 描述 默认值 必填

GDS 选项

gds

过程名称。您可以从 GDS 手册中的以下页面为您的用例选择最合适的算法

(无)

gds.

设置名称仅仅是一个前缀,需要补充您所选过程的输入选项。

(无)

是的,它与您选择的过程相关

如何在 Spark 作业中管理 gds. 前缀

例如,考虑您想要投影一个图。如下所示

---
CALL gds.graph.project(
  'myGraph',
  'Page',
  'LINKS',
  {
    relationshipProperties: 'weight'
  }
)
---

所以我们需要

  • 来调用 gds.graph.project,这需要在我们的 Spark 作业中添加 .option("gds", "gds.graph.project")。正如您在 ink:/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^] 中看到的那样,project 过程有 4 个输入参数

    • graphName:我们想将图命名为 myGraph;这需要添加 .option("gds.graphName", "myGraph")

    • nodeProjection:我们要投影 Page 节点;这需要添加 .option("gds.nodeProjection", "Page")

    • relationshipProjection:我们要投影 LINKS 关系;这需要添加 .option("gds.relationshipProjection", "LINKS")

    • configuration:我们想配置 weight 作为定义关系重要性的属性;configuration 是一个映射,我们需要在映射中添加一个 relationshipProperties 键,值为 weight。我们可以通过点符号来实现,这需要添加 .option("gds.configuration.relationshipProperties", "weight")

最终的 Spark 作业将如下所示

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://:7687") \
      .option("gds", "gds.graph.project") \
      .option("gds.graphName", "myGraph") \
      .option("gds.nodeProjection", "Page") \
      .option("gds.relationshipProjection", "LINKS") \
      .option("gds.configuration.relationshipProperties", "weight") \
      .load() \
      .show(truncate=False)

通过 Cypher 查询支持 GDS

通过这种模式,您可以使用复杂的自定义查询来分析您的 GDS 数据。

示例

Zeppelin Notebook 示例仓库中,有一个可以针对 Neo4j Sandbox 运行的 GDS 示例,展示了如何将两者结合使用。

使用 Spark 在 GDS 中创建虚拟图

这是一个非常简单直观的代码;它构建了正确的 Cypher 语句以在 GDS 中创建虚拟图并返回结果。

%pyspark
query = """
    CALL gds.graph.project('got-interactions', 'Person', {
      INTERACTS: {
        orientation: 'UNDIRECTED'
      }
    })
    YIELD graphName, nodeCount, relationshipCount, projectMillis
    RETURN graphName, nodeCount, relationshipCount, projectMillis
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()
如果您收到 A graph with name [name] already exists(名称为 [name] 的图已存在)错误,请查看此 FAQ

确保 partitions 选项设置为 1。您不需要并行执行此查询,它应该只执行一次。

使用存储过程时,必须包含 RETURN 子句。

运行 GDS 分析并流式传输返回结果

以下示例展示了如何运行分析并将结果作为另一个 Cypher 查询获取,该查询作为 Spark 对 Neo4j 的读取操作执行。

%pyspark

query = """
    CALL gds.pageRank.stream('got-interactions')
    YIELD nodeId, score
    RETURN gds.util.asNode(nodeId).name AS name, score
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()

df.show()
确保 partitions 选项设置为 1。该算法应该只执行一次。

流式传输与持久化 GDS 结果的对比

运行 GDS 算法时,该库允许您选择是将算法结果流式传输回调用者,还是修改底层图。将 GDS 与 Spark 一起使用提供了一个额外的选择,即转换或以其他方式使用 GDS 结果。最终,这两种方式都适用于 Neo4j Apache Spark 连接器,您可以根据用例选择最合适的一种。

如果您拥有的架构中 GDS 算法是在只读副本 (Read Replica) 或独立的单机实例上运行的,那么流式传输返回结果可能很方便(因为您无法将结果写入只读副本),然后使用连接器的写入功能获取该结果流,并将其写回 不同的 Neo4j 连接,例如常规的 Causal Cluster。