通过 Cypher 查询写入

本页中的所有示例均假设 SparkSession 已使用适当的身份验证选项进行了初始化。有关更多详细信息,请参阅 快速入门示例

如果需要更灵活的方式,可以使用 query 选项来运行带有 CREATEMERGE 子句的自定义 Cypher® 查询。

连接器使用提供的查询持久化整个 DataFrame。节点按照 batch.size 属性定义的行批次发送到 Neo4j,并且查询被包装在 UNWIND $events AS event 语句中。

示例
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()
等效的 Cypher 查询
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})

events 是从数据集创建的一个批次。

注意事项

  • 必须始终指定 保存模式

  • 也可以在 WITH 语句中使用 events 列表。例如,您可以将前面示例中的查询替换为以下内容

    WITH event.name + ' ' + toUpper(event.surname) AS fullName
    CREATE (n:Person {fullName: fullName})
  • 支持在 CALL 中引用 events 列表的子查询

    CALL {
      WITH event
      RETURN event.name + ' ' + toUpper(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • 如果已安装 APOC,则可以使用 APOC 过程和函数

    CALL {
      WITH event
      RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • 虽然 RETURN 子句并未被禁止,但添加它对查询结果没有任何影响。

script 选项

script 选项允许在执行读取操作之前运行一系列 Cypher 查询。

脚本的结果可以在后续查询中使用,例如注入查询参数。

示例
val df = Seq(Person("John", "Doe", 42)).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
等效的 Cypher 查询
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

scriptResultscript 中最后一个 Cypher 查询的结果,在本例中为 RETURN 36 AS age

events 是从数据集创建的一个批次。