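Writing relationships

All the examples on this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

With the relationship option, the connector writes a Spark DataFrame to the Neo4j database by specifying the source nodes, the target nodes, and the relationship.

To avoid deadlocks, always use a single partition (with coalesce(1) or repartition(1)) before writing relationships to Neo4j.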
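As a minimal sketch of the single-partition advice, the helper below collapses a DataFrame to one partition before writing. The function name `write_relationships_single_partition` is hypothetical, the labels are borrowed from the examples on this page, and the DataFrame is assumed to follow the native strategy column naming.

```python
def write_relationships_single_partition(df, rel_type):
    """Write `df` as relationships of type `rel_type` from one partition.

    Assumes `df` is a Spark DataFrame whose columns use the
    "source.", "target." and "rel." prefixes (native strategy).
    """
    return (
        # A single partition means a single task writes to Neo4j at a time
        df.coalesce(1)
        .write.mode("Append")
        .format("org.neo4j.spark.DataSource")
        .option("relationship", rel_type)
        .option("relationship.source.save.mode", "Append")
        .option("relationship.source.labels", ":Customer")
        .option("relationship.target.save.mode", "Append")
        .option("relationship.target.labels", ":Product")
        .save()
    )
```

`coalesce(1)` merges existing partitions without a full shuffle, while `repartition(1)` always shuffles; either leaves a single partition for the write.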

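The connector builds a CREATE or MERGE Cypher® query (depending on the save mode) that uses the UNWIND clause to write a batch of rows (an events list whose size is defined by the batch.size option).

The rest of the query is built from a number of data source options.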
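The batching idea can be illustrated without Spark. The sketch below is not the connector's code; the `batches` helper is a hypothetical name that only shows how rows could be grouped into lists of at most `batch_size` items, each list playing the role of the events parameter of one UNWIND query.

```python
from itertools import islice


def batches(rows, batch_size):
    """Yield successive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        events = list(islice(iterator, batch_size))
        if not events:
            return
        yield events


# Five illustrative rows, grouped two at a time
rows = [{"order": f"ABC{i}00"} for i in range(1, 6)]
event_batches = list(batches(rows, batch_size=2))
```

With five rows and a batch size of two, three queries would be sent, the last one carrying a single row.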

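Table 1. Write options

Option: relationship.save.strategy
Description: Defines the save strategy to use.
  • native requires the DataFrame to use a specific schema.
  • keys is more flexible.
Default: native

Option: relationship.source.save.mode and relationship.target.save.mode
Description: Define the node save mode, which can be set independently for source and target nodes.
  • Match mode performs a MATCH.
  • Append mode performs a CREATE.
  • Overwrite mode performs a MERGE.
Default: Match

Option: relationship.source.labels and relationship.target.labels
Description: Required. Define the labels to assign to source and target nodes, as a colon-separated list of labels.
Default: (empty)

Option: relationship.source.node.keys and relationship.target.node.keys
Description: When the node save mode is Match or Overwrite, define the node keys that identify the nodes, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)

Option: relationship.source.node.properties and relationship.target.node.properties
Description: When the save strategy is keys, define which DataFrame columns to write as source/target node properties, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)

Option: relationship.properties
Description: When the save strategy is keys, defines which DataFrame columns to write as relationship properties, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)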
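The key:value list format used by the node keys and properties options can be sketched with a small parser. This is an illustration only, not the connector's parser; `parse_mapping` is a hypothetical name. Judging from the examples on this page (such as "customerID:id"), the key is read here as the DataFrame column and the value as the graph property.

```python
def parse_mapping(option_value):
    """Parse a comma-separated list of `column:property` pairs into a dict.

    An item without a colon maps the column to a property of the same name.
    """
    mapping = {}
    for item in option_value.split(","):
        item = item.strip()
        if not item:
            continue
        column, _, prop = item.partition(":")
        column = column.strip()
        mapping[column] = prop.strip() or column
    return mapping


source_properties = parse_mapping("name,surname,customerID:id")
```

Under this reading, "name" and "name:name" are equivalent, which matches the rule that one of the two can be omitted when they are the same.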

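Save strategies

The save strategy defines how the connector maps the DataFrame schema to Neo4j nodes and relationships.

native strategy

The native strategy requires the DataFrame to conform to the relationship read schema with the relationship.nodes.map option set to false. The DataFrame must include at least one of the rel.[property name], source.[property name], or target.[property name] columns.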
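To make the prefix convention concrete, the sketch below groups the columns of one row by their "source.", "target." and "rel." prefixes. It is a simplified illustration, not the connector's implementation, and `split_native_row` is a hypothetical name.

```python
def split_native_row(row):
    """Group a row's prefixed columns into source, target and rel properties."""
    event = {"source": {}, "target": {}, "rel": {}}
    for column, value in row.items():
        prefix, _, prop = column.partition(".")
        # Columns without one of the three known prefixes are ignored here
        if prefix in event and prop:
            event[prefix][prop] = value
    return event


row = {
    "source.name": "John",
    "source.surname": "Doe",
    "target.name": "Product 1",
    "rel.quantity": 200,
    "rel.order": "ABC100",
}
event = split_native_row(row)
```

Each group then corresponds to the properties of the source node, the target node, or the relationship.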

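A good use case for this strategy is transferring data from one database to another. When you use the connector to read relationships, the resulting DataFrame already has the correct schema.

If you use the connector to read data from one database and write it to a different one, you need to set the connection options on each DataFrame rather than on the Spark session.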
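A transfer between two databases might look like the following sketch, which sets the connection options on the reader and on the writer separately. The function name `copy_relationships`, the URLs and the credentials are all hypothetical, and the sketch assumes that the `url` and `authentication.basic.*` options are accepted per DataFrame and that the labels match those used elsewhere on this page.

```python
def copy_relationships(spark, source_conn, target_conn):
    """Read BOUGHT relationships from one database and write them to another.

    `source_conn` and `target_conn` are dicts of connection options that are
    applied to the reader and the writer, not to the Spark session.
    """
    reader = spark.read.format("org.neo4j.spark.DataSource")
    for key, value in source_conn.items():
        reader = reader.option(key, value)
    rel_df = (
        reader.option("relationship", "BOUGHT")
        # The native strategy expects the flat (non-map) read schema
        .option("relationship.nodes.map", "false")
        .option("relationship.source.labels", ":Customer")
        .option("relationship.target.labels", ":Product")
        .load()
    )

    writer = rel_df.coalesce(1).write.mode("Append").format(
        "org.neo4j.spark.DataSource"
    )
    for key, value in target_conn.items():
        writer = writer.option(key, value)
    (
        writer.option("relationship", "BOUGHT")
        .option("relationship.source.save.mode", "Append")
        .option("relationship.source.labels", ":Customer")
        .option("relationship.target.save.mode", "Append")
        .option("relationship.target.labels", ":Product")
        .save()
    )


# Hypothetical connection details for the two databases
source_conn = {
    "url": "neo4j://source.example.com:7687",
    "authentication.basic.username": "neo4j",
    "authentication.basic.password": "source-password",
}
target_conn = {
    "url": "neo4j://target.example.com:7687",
    "authentication.basic.username": "neo4j",
    "authentication.basic.password": "target-password",
}
# copy_relationships(spark, source_conn, target_conn)
```

Keeping the two sets of options in separate dictionaries makes it harder to write back to the source database by accident.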

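The following example shows how to use the native strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example (Scala)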
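// Needed for SaveMode and toDF; assumes a SparkSession named `spark`
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix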
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
    "source.name",
    "source.surname",
    "source.id",
    "target.name",
    "rel.quantity",
    "rel.order"
)

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
Example (Python)
# Columns representing node/relationships properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
    [
        {
            "source.name": "John",
            "source.surname": "Doe",
            "source.customerID": 1,
            "target.name": "Product 1",
            "rel.quantity": 200,
            "rel.order": "ABC100",
        },
        {
            "source.name": "Jane",
            "source.surname": "Doe",
            "source.customerID": 2,
            "target.name": "Product 2",
            "rel.quantity": 100,
            "rel.order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

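keys strategy

The keys strategy gives more control over how relationships and source/target nodes are written. It does not require any specific DataFrame schema, and you can specify which columns to write as node and relationship properties.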
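The column selection performed by the keys strategy can be pictured as building one event per row from explicit column-to-property mappings. The sketch below is an illustration under that assumption; `build_event` is a hypothetical name, and the event layout simply mirrors the `event.source.properties`, `event.target.properties` and `event.rel.properties` references in the equivalent Cypher queries on this page.

```python
def build_event(row, source_props, target_props, rel_props):
    """Pick DataFrame columns and rename them to graph property names.

    Each mapping is a dict of {DataFrame column: graph property}.
    """

    def pick(mapping):
        return {prop: row[column] for column, prop in mapping.items()}

    return {
        "source": {"properties": pick(source_props)},
        "target": {"properties": pick(target_props)},
        "rel": {"properties": pick(rel_props)},
    }


row = {
    "name": "John",
    "surname": "Doe",
    "customerID": 1,
    "product": "Product 1",
    "quantity": 200,
    "order": "ABC100",
}
event = build_event(
    row,
    source_props={"name": "name", "surname": "surname", "customerID": "id"},
    target_props={"product": "name"},
    rel_props={"quantity": "quantity", "order": "order"},
)
```

Note how the `name` column ends up on the customer while the `product` column becomes the `name` property of the product, which is not possible with prefixed column names alone.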

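The following example shows how to use the keys strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example (Scala)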
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

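Node save modes

The examples in the previous section use the Append mode for both relationships and nodes, which means that new relationships and new nodes are created every time the code runs.

Node save modes other than Append behave differently.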
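The relation between save modes and Cypher clauses can be sketched as a small query builder. This is only an illustration that reproduces the shape of the equivalent Cypher queries shown on this page; `node_clause` and `relationship_query` are hypothetical names, and the connector's actual query generation may differ in its details.

```python
NODE_KEYWORDS = {"Match": "MATCH", "Append": "CREATE", "Overwrite": "MERGE"}
REL_KEYWORDS = {"Append": "CREATE", "Overwrite": "MERGE"}


def node_clause(alias, label, save_mode, keys=()):
    """Return the Cypher lines for one node, depending on its save mode."""
    keyword = NODE_KEYWORDS[save_mode]
    if save_mode == "Append":
        pattern = f"({alias}:{label})"
    else:
        if not keys:
            raise ValueError(f"{save_mode} mode requires node keys")
        key_map = ", ".join(f"{k}: event.{alias}.keys.{k}" for k in keys)
        pattern = f"({alias}:{label} {{{key_map}}})"
    lines = [f"{keyword} {pattern}"]
    if save_mode != "Match":
        # Matched nodes are left untouched; created or merged ones get properties
        lines.append(f"SET {alias} += event.{alias}.properties")
    return lines


def relationship_query(rel_type, mode, source, target):
    """Assemble the whole query; `source` and `target` are (label, mode, keys)."""
    lines = ["UNWIND $events AS event"]
    lines += node_clause("source", *source)
    lines += node_clause("target", *target)
    lines.append(f"{REL_KEYWORDS[mode]} (source)-[rel:{rel_type}]->(target)")
    lines.append("SET rel += event.rel.properties")
    return "\n".join(lines)


match_query = relationship_query(
    "BOUGHT",
    "Append",
    source=("Customer", "Match", ["id"]),
    target=("Product", "Match", ["name"]),
)
print(match_query)
```

Changing only the mode arguments switches the generated clauses between MATCH, CREATE and MERGE, which is the whole difference between the examples in the following sections.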

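Match mode

The Match mode requires nodes with the selected labels and keys to already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

The following example does not create any relationships if there are no matching nodes.

Example (Scala)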
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for Match save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for Match save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

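Overwrite mode

The Overwrite mode creates nodes with the selected labels and keys if they do not already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

If run multiple times, the following example creates duplicate relationships but no duplicate nodes.

Example (Scala)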
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Because Spark jobs run concurrently, you should use property uniqueness constraints to guarantee node uniqueness when using the Overwrite mode.
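As a rough illustration, the helper below generates a Cypher statement that creates such a constraint for one label and property. The function name and the constraint naming scheme are hypothetical, and the statement uses the `FOR ... REQUIRE ... IS UNIQUE` form, so it assumes a Neo4j version that supports that syntax. The statement has to be run against the database separately, before the Spark job.

```python
def uniqueness_constraint(label, prop):
    """Return a Cypher statement creating a uniqueness constraint.

    The constraint name is derived from the label and property and is
    purely illustrative.
    """
    name = f"{label.lower()}_{prop}_unique"
    return (
        f"CREATE CONSTRAINT {name} IF NOT EXISTS "
        f"FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE"
    )


# One constraint per node key used in the examples on this page
statements = [
    uniqueness_constraint("Customer", "id"),
    uniqueness_constraint("Product", "name"),
]
for statement in statements:
    print(statement)
```

The constrained properties are the same ones used as node keys (`id` for Customer and `name` for Product), so that concurrent MERGE operations cannot create two nodes with the same key.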

Overwrite nodes and relationships

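If you need to upsert both nodes and relationships, you must use the Overwrite mode for all of the relationship.source.save.mode, relationship.target.save.mode, and mode options. The relationship.source.node.keys and relationship.target.node.keys options are still required.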

If run multiple times, the following example does not create any duplicate nodes or relationships.

Example (Scala)
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Overwrite relationships
    .mode(SaveMode.Overwrite)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Overwrite relationships
    .mode("Overwrite")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Because Spark jobs run concurrently, you should use property uniqueness constraints to guarantee node uniqueness when using the Overwrite mode.
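The effect of the different save mode combinations on repeated runs can be illustrated with a small in-memory simulation. It is a deliberately simplified model, not a description of how Neo4j works internally: nodes are identified only by label and key, and the names `run_once` and `run_twice` are hypothetical.

```python
def run_once(graph, rows, node_mode, rel_mode):
    """Apply one simulated write of `rows` to `graph`."""
    for row in rows:
        source = ("Customer", row["customerID"])
        target = ("Product", row["product"])
        if node_mode == "Append":
            # CREATE: always add new nodes
            graph["nodes"].extend([source, target])
        else:
            # Overwrite (MERGE): add a node only if it is not there yet
            for node in (source, target):
                if node not in graph["nodes"]:
                    graph["nodes"].append(node)
        rel = (source, "BOUGHT", target)
        if rel_mode == "Append" or rel not in graph["rels"]:
            graph["rels"].append(rel)


def run_twice(node_mode, rel_mode):
    """Run the same write twice and return (node count, relationship count)."""
    rows = [
        {"customerID": 1, "product": "Product 1"},
        {"customerID": 2, "product": "Product 2"},
    ]
    graph = {"nodes": [], "rels": []}
    for _ in range(2):
        run_once(graph, rows, node_mode, rel_mode)
    return len(graph["nodes"]), len(graph["rels"])


print(run_twice("Append", "Append"))        # (8, 4)
print(run_twice("Overwrite", "Append"))     # (4, 4)
print(run_twice("Overwrite", "Overwrite"))  # (4, 2)
```

Only the last combination leaves the counts unchanged on the second run, which corresponds to the upsert behaviour described in the final section.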