机器学习管道:节点分类

Open In Colab

此 Jupyter 笔记本托管在 Neo4j Graph Data Science Client Github 仓库中。

本笔记本展示了如何结合 Python 客户端和著名的 Cora 数据集使用 GDS 机器学习管道。

我们在此涵盖的任务是图机器学习中的一个典型用例:在给定图和一些节点特征的情况下对节点进行分类。

1. 设置

我们需要一个可使用 Neo4j 和 GDS 的专用环境,例如全新的 AuraDS 实例(预装 GDS)或带有专用数据库的 Neo4j Desktop。

请注意,我们将执行向 Neo4j 写入和删除数据的操作。

一旦获得了访问该环境的凭据,我们就可以安装 graphdatascience 包并导入客户端类。

%pip install graphdatascience
import os

from graphdatascience import GraphDataScience

使用本地 Neo4j 设置时,默认连接 URI 为 bolt://:7687;而使用 AuraDS 时,连接 URI 略有不同,因为它使用 neo4j+s 协议。在这种情况下,客户端还应包含 aura_ds=True 标志以启用 AuraDS 推荐的设置。有关更多详细信息,请查看 Neo4j GDS 客户端文档

# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
    NEO4J_AUTH = (
        os.environ.get("NEO4J_USER"),
        os.environ.get("NEO4J_PASSWORD"),
    )
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)

# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)

我们还需要检查 GDS 库的版本是否为 2.5.0 或更新版本。

from graphdatascience import ServerVersion

assert gds.server_version() >= ServerVersion(2, 5, 0)

最后,我们导入 json 以辅助编写用于加载数据的 Cypher 查询,并导入 numpypandas 以进行后续的数据处理。

import json

import numpy as np
import pandas as pd

2. 加载 Cora 数据集

首先,我们需要将 Cora 数据集加载到 Neo4j 中。最新版本的 GDS 客户端将 Cora 数据集作为可直接使用的图包含在内(例如参见 PyG 示例笔记本);或者,图构建笔记本展示了如何在不写入 Neo4j 的情况下将 Cora 图投影到内存中。无论如何,在本教程中,我们使用 CSV 文件中的数据和一些 Cypher 代码来运行一个端到端的示例,从将源数据加载到 Neo4j 到训练模型并使用它进行预测。

请注意,如果您在 AuraDS 实例上使用 Cora 图加载器或图构建方法,则无法将数据写入 Neo4j 数据库。

CSV 文件可在以下 URI 中找到

CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"

加载后,我们需要执行一个额外的预处理步骤,将 subject 字段(数据集中的字符串)转换为整数,因为节点属性必须是数值才能投影到图中;虽然我们可以分配连续的 ID,但我们将第一个主题的 ID 指定为非 0 的值,以便稍后展示模型中类标签是如何表示的。

我们还选择了一些节点作为保留集,以便在模型训练完成后对其进行测试。注意:这与算法的测试/分割比例无关。

SUBJECT_TO_ID = {
    "Neural_Networks": 100,
    "Rule_Learning": 1,
    "Reinforcement_Learning": 2,
    "Probabilistic_Methods": 3,
    "Theory": 4,
    "Genetic_Algorithms": 5,
    "Case_Based": 6,
}

HOLDOUT_NODES = 10

现在,我们可以使用 LOAD CSV Cypher 语句和一些基础的数据转换来加载 CSV 文件

# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")

# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
    LOAD CSV FROM "{CORA_CONTENT}" AS row
    WITH
      {subject_map} AS subject_to_id,
      toInteger(row[0]) AS extId,
      row[1] AS subject,
      toIntegerList(row[2..]) AS features
    MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
    WITH p LIMIT {HOLDOUT_NODES}
    REMOVE p:Paper
    SET p:UnclassifiedPaper
"""

# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
    LOAD CSV FROM "{CORA_CITES}" AS row
    MATCH (n), (m)
    WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
    MERGE (n)-[:CITES]->(m)
"""

# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)

数据加载到 Neo4j 后,我们现在可以投影一个包含所有节点和 CITES 关系的图,将其作为无向图(并使用 SINGLE 聚合来跳过因添加反向方向而导致的重复关系)。

# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
    "cora-graph",
    {"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
    {"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)

最后,我们可以检查新投影图中节点和关系的数量,以确保它已正确创建

assert G.node_count() == 2708
assert G.relationship_count() == 10556

3. 管道目录基础知识

数据集加载完成后,我们可以定义一个节点分类机器学习管道。

# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")

我们可以使用 list 方法检查管道是否确实已创建

# List all pipelines
gds.pipeline.list()

# Alternatively, get the details of a specific pipeline object
gds.pipeline.list(node_pipeline)

4. 配置管道

我们现在可以配置管道。提醒一下,我们需要:

  1. 选择一部分可用的节点属性作为机器学习模型的特征

  2. 配置训练/测试集划分和 k 折交叉验证的折数 (可选)

  3. 配置用于训练的候选模型

  4. 配置自动调优 (可选) 在此示例中,我们使用逻辑回归作为训练的候选模型,但也提供其他算法(如随机森林)。我们还设置了一些合理的起始参数,这些参数可以根据所需的指标进行进一步调整。

某些超参数(如 penalty)可以是单个值或范围。如果它们表示为范围,则使用自动调优来搜索其最佳值。

configureAutoTuning 方法可用于设置要尝试的模型候选数量。在这里我们选择 5,以保持较短的训练时间。

# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])

# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)

# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))

# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)

5. 训练管道

配置好的管道现在可以进行模型选择和训练了。我们还运行一次训练评估,以确保有足够的资源来运行后续的实际训练。

节点分类模型支持多种评估指标。在这里,我们使用全局指标 F1_WEIGHTED

注意:为了演示目的,concurrency 参数被明确设置为 4(默认值)。对于 Neo4j 社区版,库中的最大并发数限制为 4。

# Estimate the resources needed for training the model
node_pipeline.train_estimate(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

我们可以检查训练结果,例如打印已训练模型的评估指标。

# Uncomment to print all stats
# print(stats.to_json(indent=2))

# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]

6. 使用模型进行预测

训练完成后,模型即可用于对未分类的数据进行分类。

使用 predict 模式的一种简单方法是直接流式传输预测结果。当图非常大时,这可能不切实际,因此应仅用于实验目的。

predicted = model.predict_stream(
    G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)

预测的结果是一个 Pandas DataFrame,其中包含每个节点的预测类以及所有类的预测概率。

predicted

predictedProbabilities 字段中类的顺序在模型信息中给出,可用于检索预测类的预测概率。

请注意,类出现在 predictedProbabilities 字段中的顺序在某种程度上是随意的,因此获取每个概率的正确方法是通过从模型获得的类索引,而不是其位置。

# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)

# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
    lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)

predicted

7. 添加数据预处理步骤

通过添加更多特征或完全使用不同的特征,可以提高模型质量。一种方法是使用 FastRP 等算法,这些算法基于节点属性和图特征创建嵌入,可以通过 addNodeProperty 管道方法添加。此类属性是“瞬态”的,因为它们由管道本身自动创建和删除。

在此示例中,我们还使用 contextNodeLabels 参数明确设置我们要为其计算嵌入的节点类型,并包括已分类和未分类的节点。这很有用,因为使用的节点越多,生成的嵌入就越好。虽然这看起来可能违反直觉,但未分类的节点在训练期间不需要完全不可见(因此,例如,可以保留其邻居的信息)。更多信息可以在图机器学习出版物中找到,例如《图表示学习书》。

node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")

# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
    "fastRP",
    mutateProperty="embedding",
    embeddingDimension=512,
    propertyRatio=1.0,
    randomSeed=42,
    featureProperties=["features"],
    contextNodeLabels=["Paper", "UnclassifiedPaper"],
)

# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])

# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)

训练过程与上一节相同

# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model-fastrp",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

F1_WEIGHTED 指标在使用嵌入时表现更好

print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])

使用 predict_stream 进行的分类可以以同样的方式运行

predicted_fastrp = model_fastrp.predict_stream(
    G,
    modelName="cora-pipeline-model-fastrp",
    includePredictedProbabilities=True,
    targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))

为了提高性能,尤其是当预测值被多次使用时,预测可以以 mutate 模式运行,而不是流式传输结果。可以使用带有 UnclassifiedPaper 类的 nodeProperty.stream 方法检索预测节点。

model_fastrp.predict_mutate(
    G,
    mutateProperty="predictedClass",
    modelName="cora-pipeline-model-fastrp",
    predictedProbabilityProperty="predictedProbabilities",
    targetNodeLabels=["UnclassifiedPaper"],
)

predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp

这对于将分类结果与测试节点的原始 subject 值进行比较非常有用,原始值必须从 Neo4j 数据库中检索,因为它已从投影图中排除。

# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())

# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])

# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")

正如我们所见,所有测试节点的预测都是准确的。

8. 将结果写回 Neo4j

将预测的类写回图后,我们现在可以将它们写回 Neo4j 数据库。

请注意,如果您在 AuraDS 上运行此笔记本,则此步骤不适用。

gds.graph.nodeProperties.write(
    G,
    node_properties=["predictedClass"],
    node_labels=["UnclassifiedPaper"],
)

9. 清理

当不再需要图、模型和管道时,应将其删除以释放内存。只有在不重启 Neo4j 或 AuraDS 实例的情况下才需要这样做,因为重启会清理所有内存中的内容。

model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()

G.drop()

如果 Neo4j 数据库不再有用,则需要显式清理

gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")

关闭客户端也是一种良好的做法

gds.close()