适用于自托管 Neo4j 数据库的 Aura 图分析

Open In Colab

此 Jupyter 笔记本托管在 此处,位于 Neo4j Graph Data Science Client 的 Github 仓库中。

该笔记本展示了如何使用 graphdatascience Python 库来创建、管理和使用 GDS 会话。

我们以人物和水果的图为例,演示如何将自托管的 Neo4j 数据库连接到 GDS 会话,运行算法,最终将分析结果写回 Neo4j 数据库。我们将覆盖所有管理操作:创建、列出和删除。

如果您使用 AuraDB,请参考 此示例

1. 前置条件

此笔记本需要可用的 Neo4j 实例,并且需要为您的 Neo4j Aura 项目启用 Aura 图分析 功能

你还需要安装 graphdatascience Python 库,版本为 1.15 或更高。

%pip install "graphdatascience>=1.15" python-dotenv "neo4j_viz[gds]"
from dotenv import load_dotenv

# This allows to load required secrets from `.env` file in local directory
# This can include Aura API Credentials. If file does not exist this is a noop.
load_dotenv(".env")

2. Aura API 凭据

管理 GDS 会话的入口点是 GdsSessions 对象,该对象需要创建 Aura API 凭据

import os

from graphdatascience.session import AuraAPICredentials, GdsSessions

# you can also use AuraAPICredentials.from_env() to load credentials from environment variables
api_credentials = AuraAPICredentials(
    client_id=os.environ["CLIENT_ID"],
    client_secret=os.environ["CLIENT_SECRET"],
    # If your account is a member of several project, you must also specify the project ID to use
    project_id=os.environ.get("PROJECT_ID", None),
)

sessions = GdsSessions(api_credentials=api_credentials)

3. 创建新会话

通过调用 sessions.get_or_create() 并传入以下参数来创建新会话:

  • 会话名称,允许您通过再次调用 get_or_create 重新连接到现有会话。

  • 包含地址、用户名和密码的 DbmsConnectionInfo,用于访问自托管的 Neo4j DBMS 实例

  • 会话内存大小。

  • 云区域位置。

  • 生存时间 (TTL),确保会话在设定的时间内未使用后自动删除,以避免产生额外费用。

有关参数的更多详细信息,请参阅 API 参考文档或手册。

from graphdatascience.session import AlgorithmCategory, CloudLocation, SessionMemory

# Estimate the memory needed for the GDS session
memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
)

# Explicitly define the size of the session
memory = SessionMemory.m_2GB

print(f"Estimated memory: {memory}")

# Specify your cloud location
cloud_location = CloudLocation("gcp", "europe-west1")

# You can find available cloud locations by calling
cloud_locations = sessions.available_cloud_locations()
print(f"Available locations: {cloud_locations}")
import os
from datetime import timedelta

from graphdatascience.session import DbmsConnectionInfo

# Identify the AuraDB instance
# you can also use DbmsConnectionInfo.from_env() to load credentials from environment variables
db_connection = DbmsConnectionInfo(
    uri=os.environ["NEO4J_URI"],
    username=os.environ["NEO4J_USERNAME"],
    password=os.environ["NEO4J_PASSWORD"],
)

# Create a GDS session!
gds = sessions.get_or_create(
    # give it a representative name
    session_name="people-and-fruits-sm",
    memory=memory,
    db_connection=db_connection,
    ttl=timedelta(minutes=30),
    cloud_location=cloud_location,
)
# Verify the connectivity. Hints towards TLS or firewall issues if this fails directly after get_or_create
gds.verify_connectivity()

4. 列出会话

你可以使用 sessions.list() 查看每个已创建会话的详细信息。

from pandas import DataFrame

gds_sessions = sessions.list()

# for better visualization
DataFrame(gds_sessions)

5. 添加数据集

我们假设已配置的 Neo4j 数据库实例为空。您将使用标准 Cypher 添加数据集。

在更实际的场景中,此步骤已经完成,您只需连接到已有的数据库即可。

data_query = """
  CREATE
    (dan:Person {name: 'Dan',     age: 18, experience: 63, hipster: 0}),
    (annie:Person {name: 'Annie', age: 12, experience: 5, hipster: 0}),
    (matt:Person {name: 'Matt',   age: 22, experience: 42, hipster: 0}),
    (jeff:Person {name: 'Jeff',   age: 51, experience: 12, hipster: 0}),
    (brie:Person {name: 'Brie',   age: 31, experience: 6, hipster: 0}),
    (elsa:Person {name: 'Elsa',   age: 65, experience: 23, hipster: 1}),
    (john:Person {name: 'John',   age: 4, experience: 100, hipster: 0}),

    (apple:Fruit {name: 'Apple',   tropical: 0, sourness: 0.3, sweetness: 0.6}),
    (banana:Fruit {name: 'Banana', tropical: 1, sourness: 0.1, sweetness: 0.9}),
    (mango:Fruit {name: 'Mango',   tropical: 1, sourness: 0.3, sweetness: 1.0}),
    (plum:Fruit {name: 'Plum',     tropical: 0, sourness: 0.5, sweetness: 0.8})

  CREATE
    (dan)-[:LIKES]->(apple),
    (annie)-[:LIKES]->(banana),
    (matt)-[:LIKES]->(mango),
    (jeff)-[:LIKES]->(mango),
    (brie)-[:LIKES]->(banana),
    (elsa)-[:LIKES]->(plum),
    (john)-[:LIKES]->(plum),

    (dan)-[:KNOWS]->(annie),
    (dan)-[:KNOWS]->(matt),
    (annie)-[:KNOWS]->(matt),
    (annie)-[:KNOWS]->(jeff),
    (annie)-[:KNOWS]->(brie),
    (matt)-[:KNOWS]->(brie),
    (brie)-[:KNOWS]->(elsa),
    (brie)-[:KNOWS]->(jeff),
    (john)-[:KNOWS]->(jeff);
"""

# making sure the database is actually empty
assert gds.run_cypher("MATCH (n) RETURN count(n)").squeeze() == 0, "Database is not empty!"

# let's now write our graph!
gds.run_cypher(data_query)

gds.run_cypher("MATCH (n) RETURN count(n) AS nodeCount")

6. 投影图

现在您已经将图导入数据库,可以使用 gds.graph.project() 接口将其投影到 GDS 会话中。

您使用的远程投影查询会选取所有 Person 节点及其 LIKES 关系,以及所有 Fruit 节点及其 LIKES 关系。此外,还会投影节点属性以作示例。您可以将这些节点属性作为算法的输入,尽管本笔记本中并未使用它们。

G, result = gds.graph.project(
    "people-and-fruits",
    """
    CALL () {
        MATCH (p1:Person)
        OPTIONAL MATCH (p1)-[r:KNOWS]->(p2:Person)
        RETURN
          p1 AS source, r AS rel, p2 AS target,
          p1 {.age, .experience, .hipster } AS sourceNodeProperties,
          p2 {.age, .experience, .hipster } AS targetNodeProperties
        UNION
        MATCH (f:Fruit)
        OPTIONAL MATCH (f)<-[r:LIKES]-(p:Person)
        RETURN
          p AS source, r AS rel, f AS target,
          p {.age, .experience, .hipster } AS sourceNodeProperties,
          f { .tropical, .sourness, .sweetness } AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel)
    })
    """,
)

str(G)
# Let us visualize the projected graph
from neo4j_viz.gds import from_gds

VG = from_gds(gds, G, db_node_properties=["name"])
for node in VG.nodes:
    node.caption = node.properties.get("name")

VG.render(initial_zoom=1.2)

7. 运行算法

你可以使用标准的 GDS Python 客户端 API 在已构建的图上运行算法。更多示例请参阅其他教程。

print("Running PageRank ...")
pr_result = gds.pageRank.mutate(G, mutateProperty="pagerank")
print(f"Compute millis: {pr_result['computeMillis']}")
print(f"Node properties written: {pr_result['nodePropertiesWritten']}")
print(f"Centrality distribution: {pr_result['centralityDistribution']}")

print("Running FastRP ...")
frp_result = gds.fastRP.mutate(
    G,
    mutateProperty="fastRP",
    embeddingDimension=8,
    featureProperties=["pagerank"],
    propertyRatio=0.2,
    nodeSelfInfluence=0.2,
)
print(f"Compute millis: {frp_result['computeMillis']}")
# stream back the results
gds.graph.nodeProperties.stream(G, ["pagerank", "fastRP"], separate_property_columns=True, db_node_properties=["name"])

8. 写回 Neo4j

GDS 会话的内存图是从指定的 Neo4j 数据库中的数据投影而来。写回操作会将数据持久化回同一 Neo4j 数据库。现在让我们将 PageRank 和 FastRP 算法的结果写回 Neo4j 数据库。

# if this fails once with some error like "unable to retrieve routing table"
# then run it again. this is a transient error with a stale server cache.
gds.graph.nodeProperties.write(G, ["pagerank", "fastRP"])

当然,您也可以直接使用 .write 模式。下面演示在写入模式下运行 Louvain。

gds.louvain.write(G, writeProperty="louvain")

现在您可以使用 gds.run_cypher() 方法查询已更新的图。请注意,run_cypher() 方法会在 Neo4j 数据库上执行查询。

gds.run_cypher(
    """
    MATCH (p:Person)
    RETURN p.name, p.pagerank AS rank, p.louvain
     ORDER BY rank DESC
    """
)

9. 删除会话

分析完成后,您可以删除会话。您已写回 Neo4j 数据库的结果不会丢失。若有未写回的额外计算结果,则会丢失。

删除会话将释放其所有关联资源,并停止产生费用。

# or gds.delete()
sessions.delete(session_name="people-and-fruits-sm")
# let's also make sure the deleted session is truly gone:
sessions.list()
# Lastly, let's clean up the database
gds.run_cypher("MATCH (n:Person|Fruit) DETACH DELETE n")