Kafka Source Connector:负载模式配置

Neo4j 的 Kafka Source Connector 支持三种负载模式,以控制序列化并发布到 Kafka 主题的数据格式:EXTENDEDCOMPACTRAW_JSON_STRING。该功能可通过 neo4j.payload-mode 属性进行配置,用户可以根据数据需求选择首选的序列化格式。

负载模式

neo4j.payload-mode 配置提供以下选项

  • EXTENDED(默认):为每个属性提供详细的结构,支持模式兼容性和一致性。当存在模式更改(例如属性类型更改)或时间类型时,此格式尤为有用,能够确保数据在变化过程中的一致性。

  • COMPACT:生成仅包含必要字段的简化格式。该格式更轻量,若不需要模式兼容性或复杂数据类型,则可能更合适。

  • RAW_JSON_STRING:生成数据的原始 JSON 字符串表示。当使用查询策略生成复杂数据结构且难以用 EXTENDEDCOMPACT 模式表示时,此模式非常有用。仅在使用查询策略时可用。

COMPACT 模式的限制

  • 属性类型更改COMPACT 模式不支持属性类型的更改。如果 Neo4j 中的属性类型发生变化(例如从整数变为字符串),可能会导致模式破坏。

  • Protobuf 兼容性COMPACT 模式不支持 Protobuf。它也不支持对时间类型(例如 LocalDateLocalDateTime)的序列化。

配置

可以在源连接器的设置中按如下方式配置负载模式

"neo4j.payload-mode": "EXTENDED" // Or "COMPACT" based on requirements

示例数据格式

以下示例展示了在每种负载模式下数据的发布方式。

COMPACT 模式示例

COMPACT 模式生成仅包含必要字段的极简负载

{
  "name": "mary",
  "surname": "doe",
  "timestamp": 1729779296311
}

当性能和简洁性是优先考虑时,此模式很有用,且适用于对模式演进和时间一致性不是主要关注点的场景。

EXTENDED 模式示例

EXTENDED 模式包含额外的结构和元数据,以支持复杂类型和模式一致性,防止属性类型随时间变化时出现问题

{
  "name": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "mary",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "surname": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "doe",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "timestamp": {
    "type": "I64",
    "B": null,
    "I64": 1729779365447,
    "F64": null,
    "S": null,
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  }
}

对于具有复杂模式需求的数据,此模式尤为有利,因为即使 Neo4j 端的属性类型发生变化,也能确保兼容性。

了解 EXTENDED 负载结构

EXTENDED 模式下,每个属性都包含针对所有受支持 Neo4j 类型的字段。只有对应实际属性类型的字段会包含非空值,其他字段均为 null。此结构确保属性类型的任何更改都不会在源连接器或汇连接器产生模式强制错误。

字段 描述

type

指示属性的类型。可能的取值包括:BI64F64SBATLDTLDTTLTTZDTTOTTDSP,或它们的列表等价形式(例如 LBLI64LF64LSLTLD 等)。

B

布尔类型(true 或 false)

I64

64 位整数

F64

64 位浮点数

S

字符串

BA

字节数组

TLD

时间本地日期

TLDT

时间本地日期时间

TLT

时间本地时间

TZDT

时间带时区日期时间

TOT

时间偏移时间

TD

时间持续时间

SP

空间点

LB、LI64、LF64、LS、LTLD 等

对应类型的列表

例如,字符串字段将表示为

{
  "type": "S",
  "B": null,
  "I64": null,
  "F64": null,
  "S": "actual_value",
  ...
}

配置建议

COMPACT 模式更易使用且更实用,当生成的消息被其他连接器或应用程序消费时,同时可以在目标主题上放宽模式兼容性。如果您的环境需要模式兼容性、时间数据类型,或对不同转换器(AVROJSON SchemaPROTOBUFJSON Embedded)有强类型安全要求,则应优先选择 EXTENDED 模式。

与汇连接器的兼容性

在连接器 5.1.0 版本中引入了 EXTENDED 格式,以确保所有发布到 Kafka 主题的数据遵循一致的模式。这可防止 Neo4j 端属性类型更改时(例如,名称属性从整数变为字符串)出现问题,从而实现连接器和 Kafka 消费者之间的平稳数据处理。当 Neo4j 汇连接器由 Neo4j 源连接器提供数据时,请使用 EXTENDED 模式,因为 Neo4j 汇连接器能够无缝处理 EXTENDED 数据类型。