Aura 图分析

Aura 图分析是一个按需分配的临时计算环境,用于运行 GDS 工作负载。每个计算单元称为一个 GDS 会话 (GDS Session)。它是 Neo4j Aura 的一部分,Aura 是一个快速、可扩展、始终在线且完全自动化的云端图平台。

GDS 会话共有三种类型

  • Attached(已附加):数据源为 Neo4j AuraDB 实例。

  • Self-managed(自托管):数据源为自托管的 Neo4j DBMS。

  • Standalone(独立):数据源不基于 Neo4j。

将数据填充到会话的过程称为 远程投影 (remote projection)。一旦填充完毕,GDS 会话即可运行 GDS 工作负载,例如算法和机器学习模型。对于“已附加”和“自托管”类型,计算结果可以使用 远程回写 (remote write-back) 写回原始数据源。

如需直接运行的笔记本示例,请查看我们关于 GDS 会话的教程,适用于 AuraDB自托管数据库 以及 任何其他数据源

1. GDS 会话管理

GdsSessions 对象是执行以下操作的 API 入口点

  • get_or_create:创建新的 GDS 会话,或连接到现有会话。

  • list:列出当前所有处于活动状态的 GDS 会话。

  • delete:删除一个 GDS 会话。

创建 GdsSessions 对象需要 Neo4j Aura API 凭据(CLIENT_IDCLIENT_SECRET)。请参阅 Aura 文档,了解如何从您的 Neo4j Aura 账户创建 API 凭据。如果您的 Aura 用户属于多个项目,则还必须提供目标项目 ID。

创建 GdsSessions 对象
from graphdatascience.session import GdsSessions, AuraAPICredentials

CLIENT_ID = "my-aura-api-client-id"
CLIENT_SECRET = "my-aura-api-client-secret"
PROJECT_ID = None

# Create a new GdsSessions object
sessions = GdsSessions(api_credentials=AuraAPICredentials(CLIENT_ID, CLIENT_SECRET, PROJECT_ID))

所有可用的方法和参数均在 API 参考文档 中列出。

1.1. 创建 GDS 会话

要创建 GDS 会话,请使用 get_or_create() 方法。如果会话不存在,它将创建一个新会话;如果已存在,则会连接到现有会话。如果提供的会话选项与现有会话不符,则会报错。

get_or_create() 的返回值是一个 AuraGraphDataScience 对象。它提供了与 GraphDataScience 对象类似的 API,但配置为在 GDS 会话上运行。按照惯例,请始终使用变量名 gds 来接收 get_or_create() 的返回值。

1.1.1. 会话过期与删除

创建会话时,可以配置一个可选的 ttl(生存时间)参数,用于设置会话在多久不活动后过期。ttl 的默认值为 1 小时,最大允许值为 7 天。已过期的会话无法运行工作负载,不产生费用,并将在 7 天后自动删除。它也可以通过 Aura 控制台 UI 进行删除。

1.1.2. 最大生命周期

会话保持活动的时间最长不得超过 7 天。即使会话因活动而未过期,它也会在创建 7 天后强制过期。这是一个硬限制,无法更改。

1.1.3. 语法

sessions.get_or_create(
    session_name: str,
    memory: SessionMemory | SessionMemoryValue | str,
    db_connection: Optional[DbmsConnectionInfo] = None,
    ttl: Optional[timedelta] = None,
    cloud_location: Optional[CloudLocation] = None,
    aura_instance_id: Optional[str] = None,
    timeout: Optional[int] = None,
    neo4j_driver_options: Optional[dict[str, Any]] = None,
    arrow_client_options: Optional[dict[str, Any]] = None,
): AuraGraphDataScience
表 1. 参数
名称 类型 可选 默认 描述

session_name

str

-

会话名称。在项目内必须唯一。

memory

SessionMemory

-

分配给会话的内存量。

db_connection

DbmsConnectionInfo

None

Neo4j DBMS 的 Aura 实例 ID、用户名和密码。对于“已附加”和“自托管”类型是必需的。对于自托管类型,请提供 URI 而非实例 ID。除用户名和密码外,您也可以提供一个 neo4j.Auth 对象

TTL (存活时间)

datetime.timedelta

1h

会话的生存时间 (TTL)。

cloud_location

CloudLocation

None

GDS 会话运行所在的 Aura 支持的云提供商和区域。对于“自托管”和“独立”类型是必需的。

timeout

int

None

等待会话进入“就绪 (Ready)”状态的秒数。如果超过该时间,将返回错误。

neo4j_driver_options

dict[str, any]

None

传递给 Neo4j DBMS 驱动程序的附加选项。仅在指定了 db_connection 时相关。

arrow_client_options

dict[str, any]

None

传递给用于连接会话的 Arrow Flight 客户端的附加选项。

1.1.4. 示例

创建一个附加到 AuraDB 实例的 GDS 会话
from graphdatascience.session import DbmsConnectionInfo, SessionMemory

gds = sessions.get_or_create(
    session_name="my-attached-session", # Must be unique within the project
    memory=SessionMemory.m_4GB,
    db_connection=DbmsConnectionInfo(
        aura_instance_id="mydbid",
        username="my-user",
        password="my-password"
    ),
)
为自托管的 Neo4j DBMS 创建一个 GDS 会话
from graphdatascience.session import DbmsConnectionInfo, CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-self-managed-session", # Must be unique within the project
    memory=SessionMemory.m_4GB,
    db_connection=DbmsConnectionInfo(
        uri="neo4j://",
        username="my-user",
        password="my-password"
    ),
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
创建一个没有任何 Neo4j 数据库的 GDS 会话
from graphdatascience.session import CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-standalone-session", # Must be unique within the project
    memory=SessionMemory.m_4GB,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

1.2. 验证连接

要检查客户端与 Aura 图分析会话之间的连接设置,可以使用

gds.verify_connectivity()

1.2.1. 提供自定义 TLS 证书

如果连接失败,可能是由于 TLS 证书问题。首先,验证连接问题是否确实与 TLS 相关。

在 TLS 期间暂时跳过服务器验证
from graphdatascience.arrow_client.arrow_client_options_util import disable_server_verification

arrow_client_options = {}
disable_server_verification(arrow_client_options) # ONLY FOR TESTING
gds = sessions.get_or_create(
    ...,
    arrow_client_options=arrow_client_options
)
gds.verify_connectivity()

如果此设置有效,您现在应该重新启用服务器验证,并传入自定义 TLS 证书

显式传递 TLS 证书
from graphdatascience.arrow_client.arrow_client_options_util import set_tls_root_certs
import certifi

arrow_client_options = {}
custom_tls = certifi.contents() # example certificates to use
set_tls_root_certs(arrow_client_options, custom_tls)
gds = sessions.get_or_create(
    ...,
    arrow_client_options=arrow_client_options
)
gds.verify_connectivity()

1.3. 列出 GDS 会话

list() 方法返回当前所有处于活动状态的 GDS 会话的名称和内存大小。

列出 GDS 会话
sessions.list()

1.4. 删除 GDS 会话

删除 GDS 会话将终止该会话,并停止任何进一步的累计费用。删除会话不会影响配置的 Neo4j 数据源。但是,任何未写回 Neo4j 实例的数据将会丢失。

如果您与会话有打开的连接

通过打开的客户端连接删除 GDS 会话
gds.delete()

使用 delete() 方法删除 GDS 会话。

通过 GdsSessions 对象删除 GDS 会话
sessions.delete(session_name="my-new-session")

1.5. 估算会话内存

为了帮助确定给定工作负载所需的会话大小,提供了 estimate() 函数。通过提供预期的图大小和打算使用的算法类别,它将返回估算的会话大小。

通过 GdsSessions 对象估算 GDS 会话的大小
from graphdatascience.session import AlgorithmCategory

memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
    node_label_count=1,
    node_property_count=1,
    relationship_property_count=1
)

有关允许参数的详细说明,请参阅 API 参考。

2. 将图投影到 GDS 会话中

一旦拥有了 GDS 会话,就可以将图投影到其中。此操作称为 远程投影 (remote projection),因为数据源不是共存的数据库,而是远程数据库。

您可以使用 gds.graph.project() 端点创建一个远程投影,并指定图名称、Cypher 查询以及其他可选参数。Cypher 查询必须包含 gds.graph.project.remote() 函数,以便将图投影到 GDS 会话中。这仅适用于“已附加”和“自托管”会话。独立会话必须使用 graph.construct

2.1. 语法

远程投影
gds.graph.project(
    graph_name: str,
    query: str,
    job_id: Optional[str] = None,
    concurrency: int = 4,
    undirected_relationship_types: Optional[list[str]] = None,
    inverse_indexed_relationship_types: Optional[list[str]] = None,
    batch_size: Optional[int] = None,
): (Graph, Series[Any])
表 2. 参数
名称 类型 可选 默认 描述

graph_name

str

-

图的名称。

query

str

-

投影查询。

job_id

str

None

会话中该进程的关联 ID。如果未提供,将使用自动生成的 ID。

concurrency

int

4

在会话中构建图时使用的并发度。

undirected_relationship_types

list[str]

[]

应被视为无向关系的类型名称列表。

inverse_indexed_relationship_types

list[str]

[]

应进行反向索引的关系类型名称列表。

batch_size

int

10000

从 DBMS 传输到会话的批次大小。

表 3. 结果
名称 类型 描述

graph(图)

Graph

表示投影图的图对象。

结果

Series[Any]

关于投影的统计数据。

可以使用 concurrencybatch_size 配置参数来调整远程投影的性能。

远程投影查询的并发度由 DBMS 服务器上的 Cypher 运行时控制。使用 CYPHER runtime=parallel 作为查询前缀以最大化性能。实际使用的并发度取决于 DBMS 服务器可用的处理器和当前的运行负载。

2.1.1. 远程投影查询语法

远程投影查询支持与 Cypher 投影相同的语法,但有两个主要区别

  1. 图名称不是参数。图名称通过 gds.graph.project() 端点提供。

  2. 必须使用 gds.graph.project.remote() 函数,而不是 gds.graph.project() 函数。

有关编写 Cypher 投影查询的完整详细信息和示例,请参阅 GDS 手册中的 Cypher 投影文档

2.1.2. 关系类型无向性和反向索引

可选参数 undirectedRelationshipTypesinverseIndexedRelationshipTypes 用于配置关系的无向性和反向索引。其行为与 GDS 手册 中的说明一致。

2.2. 示例

此示例展示了如何将图投影到 GDS 会话中。示例图是异构的,模拟了用户和产品。用户可以互相认识,也可以购买产品。

“已附加”和“自托管”示例使用 Cypher 查询来填充数据库数据。独立示例则使用 pandas DataFrame。

创建会话时,会话名称在项目内必须唯一。

在 Neo4j DBMS 中创建一些数据并将其投影到“已附加”GDS 会话中
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials

sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))

# you can also use DbmsConnectionInfo.from_env() to load credentials from environment variables
db_connection = DbmsConnectionInfo(
    uri=os.environ["NEO4J_URI"],
    username=os.environ["NEO4J_USERNAME"],
    password=os.environ["NEO4J_PASSWORD"],
    aura_instance_id=os.environ["AURA_INSTANCEID"]
)

gds = sessions.get_or_create(
    session_name="my-new-session", # Must be unique within the project
    memory=SessionMemory.m_8GB,
    db_connection=db_connection,
)

gds.run_cypher(
    """
    CREATE
     (u1:User {name: 'Mats'}),
     (u2:User {name: 'Florentin'}),
     (p1:Product {name: 'ice cream', cost: 4.2}),
     (p2:Product {name: 'computer', cost: 13.37})

    CREATE
     (u1)-[:KNOWS {since: 2020}]->(u2),
     (u2)-[:BOUGHT {price: 7474}]->(p1),
     (u1)-[:BOUGHT {price: 1337}]->(p2)
    """
)

G, result = gds.graph.project(
    graph_name="my-graph",
    query="""
    CALL () {
        MATCH (u1:User)
        OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
        RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
        UNION
        MATCH (p:Product)
        OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
        RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel),
      relationshipProperties: properties(rel)
    })
    """,
)
在 Neo4j DBMS 中创建一些数据并将其投影到“自托管”GDS 会话中
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials, CloudLocation

sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))

db_connection = DbmsConnectionInfo(os.environ["DB_URI"], os.environ["DB_USER"], os.environ["DB_PASSWORD"])

gds = sessions.get_or_create(
    session_name="my-new-session", # Must be unique within the project
    memory=SessionMemory.m_8GB,
    db_connection=db_connection,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

gds.run_cypher(
    """
    CREATE
     (u1:User {name: 'Mats'}),
     (u2:User {name: 'Florentin'}),
     (p1:Product {name: 'ice cream', cost: 4.2}),
     (p2:Product {name: 'computer', cost: 13.37})

    CREATE
     (u1)-[:KNOWS {since: 2020}]->(u2),
     (u2)-[:BOUGHT {price: 7474}]->(p1),
     (u1)-[:BOUGHT {price: 1337}]->(p2)
    """
)

G, result = gds.graph.project(
    graph_name="my-graph",
    query="""
    CALL () {
        MATCH (u1:User)
        OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
        RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
        UNION
        MATCH (p:Product)
        OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
        RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel),
      relationshipProperties: properties(rel)
    })
    """,
)
将一些数据投影到“独立”GDS 会话中
from graphdatascience.session import CloudLocation, SessionMemory

gds = sessions.get_or_create(
    session_name="my-standalone-session", # Must be unique within the project
    memory=SessionMemory.m_4GB,
    cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)

nodes = [pandas.DataFrame({
        "nodeId": [0, 1],
        "labels":  ["Person", "Person"],
    }), pandas.DataFrame({
        "nodeId": [2, 3],
        "labels":  ["Product", "Product"],
        "cost": [4.2, 13.37],
    })
]

relationships = [pandas.DataFrame({
        "sourceNodeId": [0],
        "targetNodeId": [1],
        "relationshipType": ["KNOWS"],
        "since": [2020]
    }), pandas.DataFrame({
        "sourceNodeId": [0, 1],
        "targetNodeId": [3, 2],
        "relationshipType": ["BOUGHT", "BOUGHT"],
        "price": [1337, 7474]
    })
]

G = gds.graph.construct(
    "my-graph",
    nodes,
    relationships
)

3. 运行算法

您可以像在任何投影图上一样,在远程投影图上运行算法。例如,您可以按以下方式在上一个示例的投影图上运行 PageRank 和 FastRP 算法

运行算法并流式返回结果
gds.pageRank.mutate(G, mutateProperty="pr")
gds.fastRP.mutate(G, featureProperties=["pr"], embeddingDimension=2, nodeSelfInfluence=0.1, mutateProperty="embedding")

# Stream the results back together with the `name` property fetched from the database
gds.graph.nodeProperties.stream(G, db_node_properties=["name"], node_properties=["pr", "embedding"])

有关可用算法的完整列表,请参阅 API 参考

3.1. 限制

  • 模型目录 (Model Catalog) 支持存在限制

    • 训练好的模型只能在训练它们的同一个会话中使用。会话删除后,所有训练好的模型都将丢失。

    • 不支持模型发布,包括

      • gds.model.publish

    • 不支持模型持久化,包括

      • gds.model.store

      • gds.model.load

      • gds.model.delete

  • 不支持拓扑链路预测算法,包括

    • gds.alpha.linkprediction.adamicAdar

    • gds.alpha.linkprediction.commonNeighbors

    • gds.alpha.linkprediction.preferentialAttachment

    • gds.alpha.linkprediction.resourceAllocation

    • gds.alpha.linkprediction.sameCommunity

    • gds.alpha.linkprediction.totalNeighbors

4. 远程回写

在 GDS 会话中进行的计算结果的持久化取决于会话类型。“已附加”和“自托管”会话内置了将算法结果写回投影原始 Neo4j 数据库的支持。独立会话的用户必须将结果流式传输回客户端,并由用户自行将其持久化到目标系统中。本节将说明内置的远程回写功能。

默认情况下,回写将并发进行,每个批次一个事务。其行为受以下三个方面控制

  • 数据集大小(例如节点数或关系数)

  • 配置的批次大小

  • 配置的并发度

4.1. 语法

“已附加”和“自托管”会话的远程回写语法相同。

远程图回写
gds.graph.<operation>.write(
    graph_name: str,
    # additional parameters,
    **config: Any,
): Series[Any]
远程算法回写
gds.<algo>.write(
    graph_name: str,
    **config: Any,
): Series[Any]

所有回写端点均支持以下附加配置

表 4. 参数
名称 可选 默认 描述

concurrency

动态 [1]

用于写回 DBMS 的并发度。

arrowConfiguration

-

包含从 DBMS 到 GDS Arrow 服务器连接的其他配置的字典。

1. DBMS 服务器处理器数量的两倍

表 5. Arrow 配置
名称 可选 默认 描述

batchSize

10000

DBMS 从会话中检索的批次大小。

4.2. 示例

扩展上一个示例,我们可以按以下方式将 FastRP 嵌入写回 Neo4j 数据库

将修改后的 FastRP 嵌入写回数据库
gds.graph.nodeProperties.write(G, "embedding")

如果我们想调整回写性能,可以配置 batchSizeconcurrency。在此示例中,我们展示了如何配合算法 .write 模式执行此操作

计算 WCC 并以自定义并发配置将组件 ID 写回为节点属性
gds.wcc.write(
  G,
  writeProperty="wcc",
  concurrency=12,
  arrowConfiguration={"batchSize": 25000}
)

5. 查询数据库

您可以使用 run_cypher() 方法在 Neo4j 数据库上运行 Cypher 查询。对可运行的查询类型没有限制,但请注意,查询是在 Neo4j 数据库上运行的,而不是在 GDS 会话上。

如果您想使用 Cypher 来操作 Aura 图分析,请使用 Cypher API
运行 Cypher 查询以查找我们回写的嵌入
gds.run_cypher("MATCH (n:User) RETURN n.name, n.embedding")