通过 Cypher 查询写入
|
本页中的所有示例均假设 |
如果需要更灵活的方式,可以使用 query 选项来运行带有 CREATE 和 MERGE 子句的自定义 Cypher® 查询。
连接器使用提供的查询持久化整个 DataFrame。节点按照 batch.size 属性定义的行批次发送到 Neo4j,并且查询被包装在 UNWIND $events AS event 语句中。
示例
case class Person(name: String, surname: String, age: Int)
// Create an example DataFrame
val df = Seq(
Person("John", "Doe", 42),
Person("Jane", "Doe", 40)
).toDF()
// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"
df.write
.format("org.neo4j.spark.DataSource")
.option("query", query)
.mode(SaveMode.Overwrite)
.save()
等效的 Cypher 查询
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})
events 是从数据集创建的一个批次。
注意事项
-
必须始终指定 保存模式。
-
也可以在
WITH语句中使用events列表。例如,您可以将前面示例中的查询替换为以下内容WITH event.name + ' ' + toUpper(event.surname) AS fullName CREATE (n:Person {fullName: fullName}) -
支持在
CALL中引用events列表的子查询CALL { WITH event RETURN event.name + ' ' + toUpper(event.surname) AS fullName } CREATE (n:Person {fullName: fullName}) -
如果已安装 APOC,则可以使用 APOC 过程和函数
CALL { WITH event RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName } CREATE (n:Person {fullName: fullName}) -
虽然
RETURN子句并未被禁止,但添加它对查询结果没有任何影响。
script 选项
script 选项允许在执行读取操作之前运行一系列 Cypher 查询。
脚本的结果可以在后续查询中使用,例如注入查询参数。
示例
val df = Seq(Person("John", "Doe", 42)).toDF()
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Append)
.option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
.option("script",
"""CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
|CREATE CONSTRAINT product_name_sku FOR (p:Product)
| REQUIRE (p.name, p.sku)
| IS NODE KEY;
|RETURN 36 AS age;
|""".stripMargin)
.save()
等效的 Cypher 查询
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
scriptResult 是 script 中最后一个 Cypher 查询的结果,在本例中为 RETURN 36 AS age。
events 是从数据集创建的一个批次。