利用 Neo4j GDS 进行数据科学
Neo4j 图数据科学 (GDS) 让数据科学家能够利用强大的图算法。它提供了无监督机器学习 (ML) 方法和启发式算法,用于学习和描述图的拓扑结构。GDS 包含具有企业级功能的稳健图算法,例如确保结果一致性的确定性种子设定,以及可重现的机器学习工作流。
GDS 算法分为五组
-
社区检测 (Community detection):用于检测组集群和分区选项。
-
中心性 (Centrality):用于计算图中节点的重要性。
-
拓扑链路预测 (Topological link prediction):用于估计节点间形成关系的概率。
-
相似性 (Similarity):用于评估节点对之间的相似度。
-
路径查找与搜索 (Path finding & search):用于查找最优路径、评估路由可用性等。
GDS 通过 Cypher 运行
所有 GDS 功能 均通过发出 Cypher® 查询来使用。因此,它可以轻松地通过 Spark 访问,因为 Neo4j Apache Spark 连接器可以发出 Cypher 查询并将结果读取回来。这种组合意味着您可以将 Neo4j 和 GDS 用作现有 Apache Spark 机器学习工作流中的图协处理器。
GDS 一等支持5.1 版本引入
Neo4j Spark 连接器为图数据科学库提供了“一等支持”,让我们看看它是如何工作的。
限制
我们不支持 mutate 或 write 过程模式,因为它们不会在 dataframe 中返回任何可用的信息。您可以通过连接 dataframe 来实现同样的效果,然后使用 Neo4j Spark 连接器将数据写回您想要的任何 Neo4j 实例。
示例
想象一下,我们想要复现此处 GDS 手册中详述的 Page Rank 算法示例。相关的 Spark 示例看起来如下(我们假设数据已存在于 Neo4j 中)。
-
创建投影图:以下 Python 代码将使用原生投影方式投影一个图,并将其以
myGraph的名称存储在图目录中。
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.graph.project")
.option("gds.graphName", "myGraph")
.option("gds.nodeProjection", "Page")
.option("gds.relationshipProjection", "LINKS")
.option("gds.configuration.relationshipProperties", "weight")
.load()
.show(truncate=False)
)
这将显示如下结果
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection |relationshipProjection |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph |8 |14 |503 |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
-
估计算法成本:通过以下 Python 代码,我们将使用 estimate 过程来估计运行该算法的成本
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream.estimate")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(false)
(
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
.show(truncate=False)
)
这将显示如下结果
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView |mapView |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n |-- this.instance: 88 Bytes\n |-- vote bits: 104 Bytes\n |-- compute steps: 208 Bytes\n |-- this.instance: 104 Bytes\n |-- node value: 120 Bytes\n |-- pagerank (DOUBLE): 120 Bytes\n |-- message arrays: 296 Bytes\n |-- this.instance: 56 Bytes\n |-- send array: 120 Bytes\n |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816 |816 |8 |14 |0.1 |0.1 |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
-
计算算法:以下 Python 代码将返回 Page Rank 计算结果,而无需修改图
val pr_df = spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
pr_df.show(false)
# We save the dataframe in the `pr_df` variable as we'll reuse it later
pr_df = (
spark.read.format("org.neo4j.spark.DataSource")
.option("gds", "gds.pageRank.stream")
.option("gds.graphName", "myGraph")
.option("gds.configuration.concurrency", "2")
.load()
)
pr_df.show(truncate=False)
这将显示如下结果
+------+------------------+
|nodeId|score |
+------+------------------+
|0 |3.215681999884452 |
|1 |1.0542700552146722|
|2 |1.0542700552146722|
|3 |1.0542700552146722|
|4 |0.3278578964488539|
|5 |0.3278578964488539|
|6 |0.3278578964488539|
|7 |0.3278578964488539|
+------+------------------+
正如您所见,我们现在只有 nodeId 和 score 两列,让我们看看如何使用该分数来丰富您的节点。
-
使用分数丰富节点:以下 Python 代码将使用该分数来丰富节点
# we'll assume that `spark` variable is already present
# we create the `nodes_df`
nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://:7687") \
.option("labels", "Page") \
.load()
# we join `nodes_df` with `pr_df` created in the step before
new_df = nodes_df.join(pr_df, nodes_df.col("<id>").equalTo(pr_df.col("nodeId")))
new_df.show(truncate=False)
这将显示如下结果
+----+--------+-------+------+------------------+
|<id>|<labels>| name|nodeId| score|
+----+--------+-------+------+------------------+
| 0| [Page]| Home| 0| 3.215681999884452|
| 1| [Page]| About| 1|1.0542700552146722|
| 2| [Page]|Product| 2|1.0542700552146722|
| 3| [Page]| Links| 3|1.0542700552146722|
| 4| [Page]| Site A| 4|0.3278578964488539|
| 5| [Page]| Site B| 5|0.3278578964488539|
| 6| [Page]| Site C| 6|0.3278578964488539|
| 7| [Page]| Site D| 7|0.3278578964488539|
+----+--------+-------+------+------------------+
现在您可以将此数据集持久化到您想要的任何 Neo4j 实例中。
选项
正如您从上面的示例中所理解的,您可以使用 gds. 前缀并配合点符号支持嵌套映射来传递所有必需的选项。
| 设置名称 | 描述 | 默认值 | 必填 |
|---|---|---|---|
GDS 选项 |
|||
|
过程名称。您可以从 GDS 手册中的以下页面为您的用例选择最合适的算法 |
(无) |
是 |
|
设置名称仅仅是一个前缀,需要补充您所选过程的输入选项。 |
(无) |
是的,它与您选择的过程相关 |
如何在 Spark 作业中管理 gds. 前缀
例如,考虑您想要投影一个图。如下所示
---
CALL gds.graph.project(
'myGraph',
'Page',
'LINKS',
{
relationshipProperties: 'weight'
}
)
---
所以我们需要
-
来调用
gds.graph.project,这需要在我们的 Spark 作业中添加.option("gds", "gds.graph.project")。正如您在 ink:/docs/graph-data-science/current/management-ops/projections/graph-project/[此处^] 中看到的那样,project过程有 4 个输入参数-
graphName:我们想将图命名为myGraph;这需要添加.option("gds.graphName", "myGraph") -
nodeProjection:我们要投影Page节点;这需要添加.option("gds.nodeProjection", "Page") -
relationshipProjection:我们要投影LINKS关系;这需要添加.option("gds.relationshipProjection", "LINKS") -
configuration:我们想配置weight作为定义关系重要性的属性;configuration 是一个映射,我们需要在映射中添加一个relationshipProperties键,值为weight。我们可以通过点符号来实现,这需要添加.option("gds.configuration.relationshipProperties", "weight")
-
最终的 Spark 作业将如下所示
# we'll assume that `spark` variable is already present
spark.read.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://:7687") \
.option("gds", "gds.graph.project") \
.option("gds.graphName", "myGraph") \
.option("gds.nodeProjection", "Page") \
.option("gds.relationshipProjection", "LINKS") \
.option("gds.configuration.relationshipProperties", "weight") \
.load() \
.show(truncate=False)
通过 Cypher 查询支持 GDS
通过这种模式,您可以使用复杂的自定义查询来分析您的 GDS 数据。
示例
在 Zeppelin Notebook 示例仓库中,有一个可以针对 Neo4j Sandbox 运行的 GDS 示例,展示了如何将两者结合使用。
使用 Spark 在 GDS 中创建虚拟图
这是一个非常简单直观的代码;它构建了正确的 Cypher 语句以在 GDS 中创建虚拟图并返回结果。
%pyspark
query = """
CALL gds.graph.project('got-interactions', 'Person', {
INTERACTS: {
orientation: 'UNDIRECTED'
}
})
YIELD graphName, nodeCount, relationshipCount, projectMillis
RETURN graphName, nodeCount, relationshipCount, projectMillis
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
| 如果您收到 A graph with name [name] already exists(名称为 [name] 的图已存在)错误,请查看此 FAQ。 |
确保 partitions 选项设置为 1。您不需要并行执行此查询,它应该只执行一次。
使用存储过程时,必须包含 RETURN 子句。
运行 GDS 分析并流式传输返回结果
以下示例展示了如何运行分析并将结果作为另一个 Cypher 查询获取,该查询作为 Spark 对 Neo4j 的读取操作执行。
%pyspark
query = """
CALL gds.pageRank.stream('got-interactions')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
"""
df = spark.read.format("org.neo4j.spark.DataSource") \
.option("url", host) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", user) \
.option("authentication.basic.password", password) \
.option("query", query) \
.option("partitions", "1") \
.load()
df.show()
确保 partitions 选项设置为 1。该算法应该只执行一次。
|
流式传输与持久化 GDS 结果的对比
在运行 GDS 算法时,该库允许您选择是将算法结果流式传输回调用者,还是修改底层图。将 GDS 与 Spark 一起使用提供了一个额外的选择,即转换或以其他方式使用 GDS 结果。最终,这两种方式都适用于 Neo4j Apache Spark 连接器,您可以根据用例选择最合适的一种。
如果您拥有的架构中 GDS 算法是在只读副本 (Read Replica) 或独立的单机实例上运行的,那么流式传输返回结果可能很方便(因为您无法将结果写入只读副本),然后使用连接器的写入功能获取该结果流,并将其写回 不同的 Neo4j 连接,例如常规的 Causal Cluster。