数据类型映射

Neo4j 和 Cypher® 提供了一个类型系统,用于描述值在数据库中的存储方式,但这些类型并不总是与 Spark 提供的类型完全匹配。

在某些情况下,Neo4j 提供的数据类型在 Spark 中没有对应项,反之亦然。

数据类型映射

表 1. Spark 到 Neo4j 类型映射参考
Neo4j 类型 Spark 类型 说明

字符串

string

示例:"Hello"

整数

long, short, byte

示例:12345

浮点数

double

示例:3.141592

字符串

decimal

5.4.0 新增 示例:"16725.77423461" 此映射仅适用于写入 Neo4j。如果您需要读取 Spark 的 decimal,请根据您的用例以适当的方式解析其字符串表示形式。

布尔值

boolean

示例:true

Point

struct { type: string, srid: integer, x: double, y: double, z: double }

有关 Neo4j 空间类型的更多信息,请参阅空间值 (Spatial values)

Date

date

示例:2020-09-11

Time

struct { type: string, value: string }

示例:[offset-time, 12:14:08.209Z]

LocalTime

struct { type: string, value: string }

示例:[local-time, 12:18:11.628]

ZonedDateTime

时间戳

5.4.0 新增 示例:2020-09-11 12:17:39.192+02:00 Spark 时间戳 (TIMESTAMP) 被写入为 UTC 格式的 ZonedDateTime

LocalDateTime

时间戳

5.4.0 起已弃用data.conversion 设置为 legacy 以继续使用它。

LocalDateTime

timestamp_ntz

5.4.0 新增 示例:2020-09-11 12:14:49.081 Spark 本地时间戳 (TIMESTAMP_NTZ) 被写入为 LocalDateTime

整数

timestamp_ntz

5.4.0 起已弃用data.conversion 设置为 legacy 以继续使用它。

Duration

struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

参见时间函数:duration

Duration

durationperiod 对象

5.4.0 新增 示例:java.time.Duration.ofDays(42)java.time.Period.ofMonths(5)

Long

durationperiod 对象

5.4.0 起已弃用data.conversion 设置为 legacy 以继续使用它。

Duration

Spark SQL INTERVAL 类型

5.4.0 新增 示例:INTERVAL '10 05:30' DAY TO MINUTE, INTERVAL '4-5' YEAR TO MONTHtimestamp('2025-01-02 18:30:00.454') - timestamp('2024-01-01 00:00:00')

Long

Spark SQL INTERVAL 类型

5.4.0 起已弃用data.conversion 设置为 legacy 以继续使用它。

节点

struct { <id>: long, <labels>: array[string], (PROPERTIES) }

Neo4j 中的节点表示为属性容器;也就是说,它们显示为带有属性的结构体,这些属性对应于节点中的任何属性。为了易用性,通常最好从查询中返回各个属性,而不是返回整个节点。

关系

struct { <rel.id>: long, <rel.type>: string, <source.id>: long, <target.id>: long, (PROPERTIES) }

关系以 Map 形式返回,标识关系的源节点和目标节点、其类型以及关系的属性(如果有)。为了易用性,通常最好从查询中返回各个属性,而不是返回整个关系。

路径

string

示例:path[(322)←[20280:AIRLINE]-(33510)]为了易用性,建议使用 路径函数 (path functions) 从查询中返回路径的各个属性/方面。

ByteArray

binary, array[byte]

5.4.0 新增 二进制写入被专门视为 Neo4j 的 ByteArray 类型。

List<Integer>

binary, array[byte]

5.4.0 起已弃用data.conversion 设置为 legacy 以继续使用它。

[相同类型的数组]

array[element]

在 Neo4j 中,数组必须具有一致的类型(例如,数组只能包含 Float 值)。内部 Spark 类型匹配上述类型映射。

复杂数据类型

Spark 不原生支持所有 Neo4j 数据类型(例如 Point, Time, Duration)。此类类型被转换为包含所有有用数据的 Struct 类型。

表 2. 复杂数据类型转换
Neo4j 类型 Spark Struct

Duration

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("months", DataTypes.LongType, false),
    ("days", DataTypes.LongType, false),
    ("seconds", DataTypes.LongType, false),
    ("nanoseconds", DataTypes.IntegerType, false),
    ("value", DataTypes.StringType, false)
  ))

Point

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("srid", DataTypes.IntegerType, false),
    ("x", DataTypes.DoubleType, false),
    ("y", DataTypes.DoubleType, false),
    ("z", DataTypes.DoubleType, true),
  ))

Time

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("value", DataTypes.StringType, false)
  ))

Map 类型

当列为 Map 时,连接器会尝试将其展平。例如,考虑以下数据集

id 名称 (name) lives_in

1

Andrea Santurbano

{address: 'Times Square, 1', city: 'NY', state: 'NY'}

2

Davide Fantuzzi

{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

连接器将 lives_in 列展平为三列:lives_in.address, lives_in.citylives_in.state

id 名称 (name) lives_in.address lives_in.city lives_in.state

1

Andrea Santurbano

Times Square, 1

NY

NY

2

Davide Fantuzzi

Statue of Liberty, 10

NY

NY

当 DataFrame 的一列是 Map 时,我们在内部执行的操作是展平该 Map,因为 Neo4j 不支持这种图实体属性类型;因此,对于像这样的 Spark 作业:

val data = Seq(
  ("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
  ("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")

data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

在 Neo4j 中,对于标签为 MyNodeWithFlattenedMap 的节点,您将发现此信息被存储为:

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.inner.key`: 'innerValue'
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.inner.key`: 'innerValue1'
}

现在,您可能会遇到如下棘手情况:

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

由于产生的展平键是重复的,Neo4j Spark 将以非确定性的方式选取关联值中的一个。

因为我们将要存入 Neo4j 的信息将是这样(注意,顺序无法保证):

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed
}

将重复键归组为值的数组

您可以使用选项 schema.map.group.duplicate.keys 来避免此问题。连接器会将具有相同键的所有值归组为一个数组。该选项的默认值为 false。在如下场景中:

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .option("schema.map.group.duplicate.keys", true)
  .save()

输出将是:

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed
}

约束类型映射

表 3. Spark 到 Cypher 约束类型映射
Spark 类型 Neo4j 类型

BooleanType

布尔值 (BOOLEAN)

StringType

STRING

IntegerType

INTEGER(整数)

LongType

INTEGER(整数)

FloatType

FLOAT

DoubleType

FLOAT

DateType

DATE

TimestampType

LOCAL DATETIME

自定义 pointType 为:Struct { type: string, srid: integer, x: double, y: double, z: double }

POINT

自定义 durationType 为:Struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

DURATION(持续时间)

DataTypes.createArrayType(BooleanType, false)

LIST<BOOLEAN NOT NULL>

DataTypes.createArrayType(StringType, false)

LIST<STRING NOT NULL>

DataTypes.createArrayType(IntegerType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(LongType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(FloatType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DoubleType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DateType, false)

LIST<DATE NOT NULL>

DataTypes.createArrayType(TimestampType, false)

LIST<LOCAL DATETIME NOT NULL>

DataTypes.createArrayType(pointType, false)

LIST<POINT NOT NULL>

DataTypes.createArrayType(durationType, false)

LIST<DURATION NOT NULL>

特别是对于数组,我们使用不含 null 元素的版本,因为 Neo4j 不允许数组中存在 null。