Aura 图分析
Aura 图分析是一个按需分配的临时计算环境,用于运行 GDS 工作负载。每个计算单元称为一个 GDS 会话 (GDS Session)。它是 Neo4j Aura 的一部分,Aura 是一个快速、可扩展、始终在线且完全自动化的云端图平台。
GDS 会话共有三种类型
-
Attached(已附加):数据源为 Neo4j AuraDB 实例。
-
Self-managed(自托管):数据源为自托管的 Neo4j DBMS。
-
Standalone(独立):数据源不基于 Neo4j。
将数据填充到会话的过程称为 远程投影 (remote projection)。一旦填充完毕,GDS 会话即可运行 GDS 工作负载,例如算法和机器学习模型。对于“已附加”和“自托管”类型,计算结果可以使用 远程回写 (remote write-back) 写回原始数据源。
1. GDS 会话管理
GdsSessions 对象是执行以下操作的 API 入口点
-
get_or_create:创建新的 GDS 会话,或连接到现有会话。 -
list:列出当前所有处于活动状态的 GDS 会话。 -
delete:删除一个 GDS 会话。
创建 GdsSessions 对象需要 Neo4j Aura API 凭据(CLIENT_ID 和 CLIENT_SECRET)。请参阅 Aura 文档,了解如何从您的 Neo4j Aura 账户创建 API 凭据。如果您的 Aura 用户属于多个项目,则还必须提供目标项目 ID。
from graphdatascience.session import GdsSessions, AuraAPICredentials
CLIENT_ID = "my-aura-api-client-id"
CLIENT_SECRET = "my-aura-api-client-secret"
PROJECT_ID = None
# Create a new GdsSessions object
sessions = GdsSessions(api_credentials=AuraAPICredentials(CLIENT_ID, CLIENT_SECRET, PROJECT_ID))
所有可用的方法和参数均在 API 参考文档 中列出。
1.1. 创建 GDS 会话
要创建 GDS 会话,请使用 get_or_create() 方法。如果会话不存在,它将创建一个新会话;如果已存在,则会连接到现有会话。如果提供的会话选项与现有会话不符,则会报错。
get_or_create() 的返回值是一个 AuraGraphDataScience 对象。它提供了与 GraphDataScience 对象类似的 API,但配置为在 GDS 会话上运行。按照惯例,请始终使用变量名 gds 来接收 get_or_create() 的返回值。
1.1.1. 会话过期与删除
创建会话时,可以配置一个可选的 ttl(生存时间)参数,用于设置会话在多久不活动后过期。ttl 的默认值为 1 小时,最大允许值为 7 天。已过期的会话无法运行工作负载,不产生费用,并将在 7 天后自动删除。它也可以通过 Aura 控制台 UI 进行删除。
1.1.3. 语法
sessions.get_or_create(
session_name: str,
memory: SessionMemory | SessionMemoryValue | str,
db_connection: Optional[DbmsConnectionInfo] = None,
ttl: Optional[timedelta] = None,
cloud_location: Optional[CloudLocation] = None,
aura_instance_id: Optional[str] = None,
timeout: Optional[int] = None,
neo4j_driver_options: Optional[dict[str, Any]] = None,
arrow_client_options: Optional[dict[str, Any]] = None,
): AuraGraphDataScience
| 名称 | 类型 | 可选 | 默认 | 描述 |
|---|---|---|---|---|
|
|
否 |
|
会话名称。在项目内必须唯一。 |
|
否 |
|
分配给会话的内存量。 |
|
|
是 |
|
Neo4j DBMS 的 Aura 实例 ID、用户名和密码。对于“已附加”和“自托管”类型是必需的。对于自托管类型,请提供 URI 而非实例 ID。除用户名和密码外,您也可以提供一个 |
|
|
|
是 |
|
会话的生存时间 (TTL)。 |
|
是 |
|
GDS 会话运行所在的 Aura 支持的云提供商和区域。对于“自托管”和“独立”类型是必需的。 |
|
|
|
是 |
|
等待会话进入“就绪 (Ready)”状态的秒数。如果超过该时间,将返回错误。 |
|
|
是 |
|
传递给 Neo4j DBMS 驱动程序的附加选项。仅在指定了 |
|
|
是 |
|
传递给用于连接会话的 Arrow Flight 客户端的附加选项。 |
1.1.4. 示例
from graphdatascience.session import DbmsConnectionInfo, SessionMemory
gds = sessions.get_or_create(
session_name="my-attached-session", # Must be unique within the project
memory=SessionMemory.m_4GB,
db_connection=DbmsConnectionInfo(
aura_instance_id="mydbid",
username="my-user",
password="my-password"
),
)
from graphdatascience.session import DbmsConnectionInfo, CloudLocation, SessionMemory
gds = sessions.get_or_create(
session_name="my-self-managed-session", # Must be unique within the project
memory=SessionMemory.m_4GB,
db_connection=DbmsConnectionInfo(
uri="neo4j://",
username="my-user",
password="my-password"
),
cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
from graphdatascience.session import CloudLocation, SessionMemory
gds = sessions.get_or_create(
session_name="my-standalone-session", # Must be unique within the project
memory=SessionMemory.m_4GB,
cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
1.2. 验证连接
要检查客户端与 Aura 图分析会话之间的连接设置,可以使用
gds.verify_connectivity()
1.2.1. 提供自定义 TLS 证书
如果连接失败,可能是由于 TLS 证书问题。首先,验证连接问题是否确实与 TLS 相关。
from graphdatascience.arrow_client.arrow_client_options_util import disable_server_verification
arrow_client_options = {}
disable_server_verification(arrow_client_options) # ONLY FOR TESTING
gds = sessions.get_or_create(
...,
arrow_client_options=arrow_client_options
)
gds.verify_connectivity()
如果此设置有效,您现在应该重新启用服务器验证,并传入自定义 TLS 证书
from graphdatascience.arrow_client.arrow_client_options_util import set_tls_root_certs
import certifi
arrow_client_options = {}
custom_tls = certifi.contents() # example certificates to use
set_tls_root_certs(arrow_client_options, custom_tls)
gds = sessions.get_or_create(
...,
arrow_client_options=arrow_client_options
)
gds.verify_connectivity()
1.4. 删除 GDS 会话
删除 GDS 会话将终止该会话,并停止任何进一步的累计费用。删除会话不会影响配置的 Neo4j 数据源。但是,任何未写回 Neo4j 实例的数据将会丢失。
如果您与会话有打开的连接
gds.delete()
使用 delete() 方法删除 GDS 会话。
sessions.delete(session_name="my-new-session")
1.5. 估算会话内存
为了帮助确定给定工作负载所需的会话大小,提供了 estimate() 函数。通过提供预期的图大小和打算使用的算法类别,它将返回估算的会话大小。
from graphdatascience.session import AlgorithmCategory
memory = sessions.estimate(
node_count=20,
relationship_count=50,
algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
node_label_count=1,
node_property_count=1,
relationship_property_count=1
)
有关允许参数的详细说明,请参阅 API 参考。
2. 将图投影到 GDS 会话中
一旦拥有了 GDS 会话,就可以将图投影到其中。此操作称为 远程投影 (remote projection),因为数据源不是共存的数据库,而是远程数据库。
您可以使用 gds.graph.project() 端点创建一个远程投影,并指定图名称、Cypher 查询以及其他可选参数。Cypher 查询必须包含 gds.graph.project.remote() 函数,以便将图投影到 GDS 会话中。这仅适用于“已附加”和“自托管”会话。独立会话必须使用 graph.construct。
2.1. 语法
gds.graph.project(
graph_name: str,
query: str,
job_id: Optional[str] = None,
concurrency: int = 4,
undirected_relationship_types: Optional[list[str]] = None,
inverse_indexed_relationship_types: Optional[list[str]] = None,
batch_size: Optional[int] = None,
): (Graph, Series[Any])
| 名称 | 类型 | 可选 | 默认 | 描述 |
|---|---|---|---|---|
|
|
否 |
|
图的名称。 |
|
|
否 |
|
投影查询。 |
|
|
是 |
|
会话中该进程的关联 ID。如果未提供,将使用自动生成的 ID。 |
|
|
是 |
|
在会话中构建图时使用的并发度。 |
|
|
是 |
|
应被视为无向关系的类型名称列表。 |
|
|
是 |
|
应进行反向索引的关系类型名称列表。 |
|
|
是 |
|
从 DBMS 传输到会话的批次大小。 |
| 名称 | 类型 | 描述 |
|---|---|---|
|
表示投影图的图对象。 |
|
|
|
关于投影的统计数据。 |
可以使用 concurrency 和 batch_size 配置参数来调整远程投影的性能。
远程投影查询的并发度由 DBMS 服务器上的 Cypher 运行时控制。使用 CYPHER runtime=parallel 作为查询前缀以最大化性能。实际使用的并发度取决于 DBMS 服务器可用的处理器和当前的运行负载。 |
2.1.1. 远程投影查询语法
远程投影查询支持与 Cypher 投影相同的语法,但有两个主要区别
-
图名称不是参数。图名称通过
gds.graph.project()端点提供。 -
必须使用
gds.graph.project.remote()函数,而不是gds.graph.project()函数。
有关编写 Cypher 投影查询的完整详细信息和示例,请参阅 GDS 手册中的 Cypher 投影文档。
2.1.2. 关系类型无向性和反向索引
可选参数 undirectedRelationshipTypes 和 inverseIndexedRelationshipTypes 用于配置关系的无向性和反向索引。其行为与 GDS 手册 中的说明一致。
2.2. 示例
此示例展示了如何将图投影到 GDS 会话中。示例图是异构的,模拟了用户和产品。用户可以互相认识,也可以购买产品。
“已附加”和“自托管”示例使用 Cypher 查询来填充数据库数据。独立示例则使用 pandas DataFrame。
|
创建会话时,会话名称在项目内必须唯一。 |
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials
sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))
# you can also use DbmsConnectionInfo.from_env() to load credentials from environment variables
db_connection = DbmsConnectionInfo(
uri=os.environ["NEO4J_URI"],
username=os.environ["NEO4J_USERNAME"],
password=os.environ["NEO4J_PASSWORD"],
aura_instance_id=os.environ["AURA_INSTANCEID"]
)
gds = sessions.get_or_create(
session_name="my-new-session", # Must be unique within the project
memory=SessionMemory.m_8GB,
db_connection=db_connection,
)
gds.run_cypher(
"""
CREATE
(u1:User {name: 'Mats'}),
(u2:User {name: 'Florentin'}),
(p1:Product {name: 'ice cream', cost: 4.2}),
(p2:Product {name: 'computer', cost: 13.37})
CREATE
(u1)-[:KNOWS {since: 2020}]->(u2),
(u2)-[:BOUGHT {price: 7474}]->(p1),
(u1)-[:BOUGHT {price: 1337}]->(p2)
"""
)
G, result = gds.graph.project(
graph_name="my-graph",
query="""
CALL () {
MATCH (u1:User)
OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
UNION
MATCH (p:Product)
OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
}
RETURN gds.graph.project.remote(source, target, {
sourceNodeProperties: sourceNodeProperties,
targetNodeProperties: targetNodeProperties,
sourceNodeLabels: labels(source),
targetNodeLabels: labels(target),
relationshipType: type(rel),
relationshipProperties: properties(rel)
})
""",
)
import os # for reading environment variables
from graphdatascience.session import SessionMemory, DbmsConnectionInfo, GdsSessions, AuraAPICredentials, CloudLocation
sessions = GdsSessions(api_credentials=AuraAPICredentials(os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"]))
db_connection = DbmsConnectionInfo(os.environ["DB_URI"], os.environ["DB_USER"], os.environ["DB_PASSWORD"])
gds = sessions.get_or_create(
session_name="my-new-session", # Must be unique within the project
memory=SessionMemory.m_8GB,
db_connection=db_connection,
cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
gds.run_cypher(
"""
CREATE
(u1:User {name: 'Mats'}),
(u2:User {name: 'Florentin'}),
(p1:Product {name: 'ice cream', cost: 4.2}),
(p2:Product {name: 'computer', cost: 13.37})
CREATE
(u1)-[:KNOWS {since: 2020}]->(u2),
(u2)-[:BOUGHT {price: 7474}]->(p1),
(u1)-[:BOUGHT {price: 1337}]->(p2)
"""
)
G, result = gds.graph.project(
graph_name="my-graph",
query="""
CALL () {
MATCH (u1:User)
OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
RETURN u1 AS source, r AS rel, u2 AS target, {} AS sourceNodeProperties, {} AS targetNodeProperties
UNION
MATCH (p:Product)
OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
}
RETURN gds.graph.project.remote(source, target, {
sourceNodeProperties: sourceNodeProperties,
targetNodeProperties: targetNodeProperties,
sourceNodeLabels: labels(source),
targetNodeLabels: labels(target),
relationshipType: type(rel),
relationshipProperties: properties(rel)
})
""",
)
from graphdatascience.session import CloudLocation, SessionMemory
gds = sessions.get_or_create(
session_name="my-standalone-session", # Must be unique within the project
memory=SessionMemory.m_4GB,
cloud_location=CloudLocation(provider="gcp", region="europe-west1"),
)
nodes = [pandas.DataFrame({
"nodeId": [0, 1],
"labels": ["Person", "Person"],
}), pandas.DataFrame({
"nodeId": [2, 3],
"labels": ["Product", "Product"],
"cost": [4.2, 13.37],
})
]
relationships = [pandas.DataFrame({
"sourceNodeId": [0],
"targetNodeId": [1],
"relationshipType": ["KNOWS"],
"since": [2020]
}), pandas.DataFrame({
"sourceNodeId": [0, 1],
"targetNodeId": [3, 2],
"relationshipType": ["BOUGHT", "BOUGHT"],
"price": [1337, 7474]
})
]
G = gds.graph.construct(
"my-graph",
nodes,
relationships
)
3. 运行算法
您可以像在任何投影图上一样,在远程投影图上运行算法。例如,您可以按以下方式在上一个示例的投影图上运行 PageRank 和 FastRP 算法
gds.pageRank.mutate(G, mutateProperty="pr")
gds.fastRP.mutate(G, featureProperties=["pr"], embeddingDimension=2, nodeSelfInfluence=0.1, mutateProperty="embedding")
# Stream the results back together with the `name` property fetched from the database
gds.graph.nodeProperties.stream(G, db_node_properties=["name"], node_properties=["pr", "embedding"])
有关可用算法的完整列表,请参阅 API 参考。
3.1. 限制
-
模型目录 (Model Catalog) 支持存在限制
-
训练好的模型只能在训练它们的同一个会话中使用。会话删除后,所有训练好的模型都将丢失。
-
不支持模型发布,包括
-
gds.model.publish
-
-
不支持模型持久化,包括
-
gds.model.store -
gds.model.load -
gds.model.delete
-
-
-
不支持拓扑链路预测算法,包括
-
gds.alpha.linkprediction.adamicAdar -
gds.alpha.linkprediction.commonNeighbors -
gds.alpha.linkprediction.preferentialAttachment -
gds.alpha.linkprediction.resourceAllocation -
gds.alpha.linkprediction.sameCommunity -
gds.alpha.linkprediction.totalNeighbors
-
4. 远程回写
在 GDS 会话中进行的计算结果的持久化取决于会话类型。“已附加”和“自托管”会话内置了将算法结果写回投影原始 Neo4j 数据库的支持。独立会话的用户必须将结果流式传输回客户端,并由用户自行将其持久化到目标系统中。本节将说明内置的远程回写功能。
默认情况下,回写将并发进行,每个批次一个事务。其行为受以下三个方面控制
-
数据集大小(例如节点数或关系数)
-
配置的批次大小
-
配置的并发度
4.1. 语法
“已附加”和“自托管”会话的远程回写语法相同。
gds.graph.<operation>.write(
graph_name: str,
# additional parameters,
**config: Any,
): Series[Any]
gds.<algo>.write(
graph_name: str,
**config: Any,
): Series[Any]
所有回写端点均支持以下附加配置
| 名称 | 可选 | 默认 | 描述 |
|---|---|---|---|
|
是 |
动态 [1] |
用于写回 DBMS 的并发度。 |
|
是 |
- |
包含从 DBMS 到 GDS Arrow 服务器连接的其他配置的字典。 |
1. DBMS 服务器处理器数量的两倍 |
|||
| 名称 | 可选 | 默认 | 描述 |
|---|---|---|---|
|
是 |
|
DBMS 从会话中检索的批次大小。 |
4.2. 示例
扩展上一个示例,我们可以按以下方式将 FastRP 嵌入写回 Neo4j 数据库
gds.graph.nodeProperties.write(G, "embedding")
如果我们想调整回写性能,可以配置 batchSize 和 concurrency。在此示例中,我们展示了如何配合算法 .write 模式执行此操作
gds.wcc.write(
G,
writeProperty="wcc",
concurrency=12,
arrowConfiguration={"batchSize": 25000}
)
5. 查询数据库
您可以使用 run_cypher() 方法在 Neo4j 数据库上运行 Cypher 查询。对可运行的查询类型没有限制,但请注意,查询是在 Neo4j 数据库上运行的,而不是在 GDS 会话上。
| 如果您想使用 Cypher 来操作 Aura 图分析,请使用 Cypher API。 |
gds.run_cypher("MATCH (n:User) RETURN n.name, n.embedding")