写入 Neo4j

该连接器提供了三种数据源选项来将数据写入 Neo4j 数据库。

表 1. 写入选项
选项 描述 默认

标签

如果您只需创建或更新节点及其属性,或作为添加关系之前的第一步,请使用此选项。

以冒号分隔的要创建或更新的节点标签列表。

(空)

关系 (relationship)

如果您需要创建或更新关系以及它们的源节点和目标节点,请使用此选项。

要创建或更新的关系类型。

(空)

query

如果您需要更大的灵活性并且了解如何编写 Cypher® 查询,请使用此选项。

包含 CREATEMERGE 子句的 Cypher 查询。

(空)

示例

本页的所有示例都假设已使用适当的身份验证选项初始化了 SparkSession。有关详细信息,请参阅 快速入门示例

您可以运行每个选项的读取示例,以在写入后检查数据。

labels 选项

写入 :Person 节点。

示例
case class Person(name: String, surname: String, age: Int)

val peopleDF = List(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

peopleDF.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Append)
    .option("labels", ":Person")
    .save()
示例
# Create example DataFrame
peopleDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

(
    peopleDF.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save()
)

有关更多信息和示例,请参阅 写入节点

relationship 选项

写入 :BOUGHT 关系及其源节点、目标节点和属性。

示例
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
示例
# Create example DataFrame
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)

有关更多信息和示例,请参阅 写入关系

query 选项

使用 Cypher 查询写入数据。

示例
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val queryDF = List(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val writeQuery =
    "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

queryDF.write
    .format("org.neo4j.spark.DataSource")
    .option("query", writeQuery)
    .mode(SaveMode.Overwrite)
    .save()
示例
# Create example DataFrame
queryDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Define the Cypher query to use in the write
write_query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

(
    queryDF.write.format("org.neo4j.spark.DataSource")
    .option("query", write_query)
    .mode("Overwrite")
    .save()
)

有关更多信息和示例,请参阅 使用 Cypher 查询写入

保存模式

无论使用哪种写入选项,连接器都支持数据源 mode() 方法的两种保存模式。

  • Append 模式通过构建 CREATE Cypher 查询来创建新节点或关系。

  • Overwrite 模式通过构建 MERGE Cypher 查询来创建或更新节点或关系。

    • 在与 labels 选项一起使用时,需要 node.keys 选项。

    • 在与 relationship 选项一起使用时,需要 relationship.source.node.keysrelationship.target.node.keys

类型映射

有关 Spark DataFrames 与 Neo4j 之间完整类型映射,请参阅 数据类型映射

性能注意事项

由于写入通常是一个昂贵的操作,请确保仅写入所需的 DataFrame 列。

例如,如果数据源的列为 namesurnameagelivesIn,但您只需要 namesurname,可以按以下方式操作

df.select(df("name"), df("surname"))
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("labels", ":Person")
  .save()