适用于 AuraDB 的 Aura 图分析
1. 前置条件
此笔记本要求您已有可用的 AuraDB 实例,并在项目中启用了 Aura Graph Analytics 功能。
您还需要安装 1.15 或更高版本的 graphdatascience Python 库；下面的单元格还会一并安装用于从 `.env` 文件加载凭据的 python-dotenv，以及用于图可视化的 neo4j_viz。
%pip install "graphdatascience>=1.15" python-dotenv "neo4j_viz[gds]"
from dotenv import load_dotenv

# Pull secrets (Aura API client credentials and database credentials) from a
# `.env` file in the current working directory into environment variables.
# When the file is missing, nothing is loaded and no error is raised.
load_dotenv(dotenv_path=".env")
2. Aura API 凭据
管理 GDS 会话的入口点是 GdsSessions 对象，创建该对象需要提供 Aura API 凭据。
import os

from graphdatascience.session import AuraAPICredentials, GdsSessions

# Aura API client credentials, read from environment variables.
# (AuraAPICredentials.from_env() is an alternative that does this lookup itself.)
# PROJECT_ID may be left unset; it is only required when the account is a member
# of several projects and the project to use must be chosen explicitly.
api_credentials = AuraAPICredentials(
    client_id=os.environ["CLIENT_ID"],
    client_secret=os.environ["CLIENT_SECRET"],
    project_id=os.environ.get("PROJECT_ID"),
)

# Entry point used below to estimate, create, list and delete GDS sessions.
sessions = GdsSessions(api_credentials=api_credentials)
3. 创建新会话
通过调用 sessions.get_or_create() 并传入以下参数来创建新会话：
- 会话名称：之后再次使用同一名称调用 get_or_create 即可重新连接到现有会话。
- DbmsConnectionInfo：包含 AuraDB 实例标识（或地址）、用户名和密码。
- 会话内存大小。
- 云区域位置（可选；本示例中未传入）。
- 生存时间 (TTL)：会话在设定时间内未被使用时将被自动删除，以避免产生额外费用。

有关参数的更多详细信息，请参阅 API 参考文档或手册。
from graphdatascience.session import AlgorithmCategory, SessionMemory

# Ask for a memory estimate sized for a small graph (20 nodes, 50 relationships)
# running centrality and node-embedding algorithms; it is printed for reference.
estimated_memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
)
print(f"Estimated memory for the session: {estimated_memory}")

# The session size actually used when creating the session is fixed explicitly.
memory = SessionMemory.m_2GB
from datetime import timedelta

from graphdatascience.session import DbmsConnectionInfo

# Connection details of the AuraDB instance the session is attached to.
# (DbmsConnectionInfo.from_env() can read these from environment variables instead.)
db_connection = DbmsConnectionInfo(
    username=os.environ["NEO4J_USERNAME"],
    password=os.environ["NEO4J_PASSWORD"],
    aura_instance_id=os.environ["AURA_INSTANCEID"],
)

# Auto-delete the session after 30 minutes without use to avoid extra cost.
session_ttl = timedelta(minutes=30)

# Create the GDS session, or reconnect to it if one with this name already exists.
gds = sessions.get_or_create(
    session_name="people_and_fruits",
    memory=memory,
    db_connection=db_connection,
    ttl=session_ttl,
)

# A failure here right after get_or_create usually points to TLS or firewall issues.
gds.verify_connectivity()
4. 列出会话
您可以使用 sessions.list() 查看每个已创建会话的详细信息。
from pandas import DataFrame

# One row per session, rendered as a table for easier reading.
gds_sessions = sessions.list()
DataFrame(data=gds_sessions)
5. 添加数据集
我们假设已配置的 AuraDB 实例是空的，并将使用标准 Cypher 添加数据集；如果数据库中已有节点，下面的代码会中止执行。
在更真实的场景中，数据通常已经存在，我们只需连接到现有数据库即可。
data_query = """
CREATE
(dan:Person {name: 'Dan', age: 18, experience: 63, hipster: 0}),
(annie:Person {name: 'Annie', age: 12, experience: 5, hipster: 0}),
(matt:Person {name: 'Matt', age: 22, experience: 42, hipster: 0}),
(jeff:Person {name: 'Jeff', age: 51, experience: 12, hipster: 0}),
(brie:Person {name: 'Brie', age: 31, experience: 6, hipster: 0}),
(elsa:Person {name: 'Elsa', age: 65, experience: 23, hipster: 1}),
(john:Person {name: 'John', age: 4, experience: 100, hipster: 0}),
(apple:Fruit {name: 'Apple', tropical: 0, sourness: 0.3, sweetness: 0.6}),
(banana:Fruit {name: 'Banana', tropical: 1, sourness: 0.1, sweetness: 0.9}),
(mango:Fruit {name: 'Mango', tropical: 1, sourness: 0.3, sweetness: 1.0}),
(plum:Fruit {name: 'Plum', tropical: 0, sourness: 0.5, sweetness: 0.8})
CREATE
(dan)-[:LIKES]->(apple),
(annie)-[:LIKES]->(banana),
(matt)-[:LIKES]->(mango),
(jeff)-[:LIKES]->(mango),
(brie)-[:LIKES]->(banana),
(elsa)-[:LIKES]->(plum),
(john)-[:LIKES]->(plum),
(dan)-[:KNOWS]->(annie),
(dan)-[:KNOWS]->(matt),
(annie)-[:KNOWS]->(matt),
(annie)-[:KNOWS]->(jeff),
(annie)-[:KNOWS]->(brie),
(matt)-[:KNOWS]->(brie),
(brie)-[:KNOWS]->(elsa),
(brie)-[:KNOWS]->(jeff),
(john)-[:KNOWS]->(jeff);
"""
# making sure the database is actually empty
assert gds.run_cypher("MATCH (n) RETURN count(n)").squeeze() == 0, "Database is not empty!"
# let's now write our graph!
gds.run_cypher(data_query)
gds.run_cypher("MATCH (n) RETURN count(n) AS nodeCount")
6. 投影图
现在我们已经将图导入数据库,可以将其投影到我们的 GDS 会话中。我们通过使用 gds.graph.project() 接口来实现。
我们使用的远程投影查询会选择所有 Person 节点及其 KNOWS 关系，以及所有 Fruit 节点和指向它们的 LIKES 关系。此外，为了演示，我们还投影了节点属性。这些节点属性可以作为算法的输入，尽管本笔记本并未使用它们。
# Remote projection: the Cypher below runs against AuraDB, and each returned row is
# sent to the GDS session as (source, relationship, target) plus per-node property maps.
# The UNION branches inside the subquery produce:
#   1. every Person with its outgoing KNOWS relationships (target is null for people
#      who know nobody, so they are still projected as unconnected nodes);
#   2. every LIKES relationship from a Person to a Fruit;
#   3. every Fruit that nobody likes, projected as an unconnected *source* node.
# Branch 3 replaces an `OPTIONAL MATCH` on fruits that returned the (null) person as
# `source` for an unliked fruit. Per the GDS Cypher-projection docs, the source node
# must not be null, so such a fruit would break the projection.
projection_query = """
CALL () {
    MATCH (p1:Person)
    OPTIONAL MATCH (p1)-[r:KNOWS]->(p2:Person)
    RETURN
        p1 AS source, r AS rel, p2 AS target,
        p1 {.age, .experience, .hipster } AS sourceNodeProperties,
        p2 {.age, .experience, .hipster } AS targetNodeProperties
    UNION
    MATCH (p:Person)-[r:LIKES]->(f:Fruit)
    RETURN
        p AS source, r AS rel, f AS target,
        p {.age, .experience, .hipster } AS sourceNodeProperties,
        f { .tropical, .sourness, .sweetness } AS targetNodeProperties
    UNION
    MATCH (f:Fruit)
    WHERE NOT EXISTS { (f)<-[:LIKES]-(:Person) }
    RETURN
        f AS source, null AS rel, null AS target,
        f { .tropical, .sourness, .sweetness } AS sourceNodeProperties,
        null AS targetNodeProperties
}
RETURN gds.graph.project.remote(source, target, {
    sourceNodeProperties: sourceNodeProperties,
    targetNodeProperties: targetNodeProperties,
    sourceNodeLabels: labels(source),
    targetNodeLabels: labels(target),
    relationshipType: type(rel)
})
"""

G, result = gds.graph.project("people-and-fruits", projection_query)
str(G)
# Visualize the projected graph, fetching each node's "name" from AuraDB to use as its caption.
from neo4j_viz.gds import from_gds

VG = from_gds(gds, G, db_node_properties=["name"])
for viz_node in VG.nodes:
    viz_node.caption = viz_node.properties.get("name")
VG.render(initial_zoom=1.2)
7. 运行算法
您可以使用标准的 GDS Python 客户端 API 在已投影的图上运行算法。更多示例请参阅其他教程。
print("Running PageRank ...")
pr_result = gds.pageRank.mutate(G, mutateProperty="pagerank")
print(f"Compute millis: {pr_result['computeMillis']}")
print(f"Node properties written: {pr_result['nodePropertiesWritten']}")
print(f"Centrality distribution: {pr_result['centralityDistribution']}")
print("Running FastRP ...")
frp_result = gds.fastRP.mutate(
G,
mutateProperty="fastRP",
embeddingDimension=8,
featureProperties=["pagerank"],
propertyRatio=0.2,
nodeSelfInfluence=0.2,
)
print(f"Compute millis: {frp_result['computeMillis']}")
# stream back the results
gds.graph.nodeProperties.stream(G, ["pagerank", "fastRP"], separate_property_columns=True, db_node_properties=["name"])
8. 写回 AuraDB
GDS 会话的内存图是从指定的 AuraDB 实例中的数据投影而来。因此,写回操作会将数据持久化回同一 AuraDB。让我们将 PageRank 和 FastRP 算法的结果写回到 AuraDB 实例中。
# Persist the mutated "pagerank" and "fastRP" properties of G to the AuraDB nodes.
# If this fails once with an error like "unable to retrieve routing table",
# just run it again: that is a transient error caused by a stale server cache.
properties_to_write = ["pagerank", "fastRP"]
gds.graph.nodeProperties.write(G, properties_to_write)
当然,我们也可以使用 .write 模式。让我们以写入模式运行 Louvain 来演示。
# Write mode: compute Louvain communities and store them on the AuraDB nodes as "louvain".
louvain_result = gds.louvain.write(G, writeProperty="louvain")
louvain_result
现在我们可以使用 gds.run_cypher() 方法查询已更新的图。请注意,run_cypher() 方法将在 AuraDB 实例上执行查询。
# Read the written results back from AuraDB for every Person, highest PageRank first.
ranking_query = """
MATCH (p:Person)
RETURN p.name, p.pagerank AS rank, p.louvain
ORDER BY rank DESC
"""
gds.run_cypher(ranking_query)
9. 删除会话
既然我们已经完成分析，就可以删除会话了。我们产生的结果已经写回到 AuraDB 实例，不会丢失；但任何尚未写回的计算结果都会随会话一起丢失。此外，下面的代码还会从 AuraDB 中删除本示例添加的 Person 和 Fruit 节点，使数据库恢复原状。
删除会话将释放其所有关联资源,并停止产生费用。
# Clean up the example data first, while the `gds` client is certainly still usable.
# `gds.delete()` (the alternative below) presumably also closes the `gds` client
# (verify against the graphdatascience docs). A `gds.run_cypher` call placed after
# it could then fail. Running the cleanup first works with either deletion method.
gds.run_cypher("MATCH (n:Person|Fruit) DETACH DELETE n")

# Delete the session to release its resources and stop incurring costs.
sessions.delete(session_name="people_and_fruits")
# or gds.delete()

# Let's also make sure the deleted session is truly gone.
sessions.list()