Sink 配置设置

连接器设置

名称 描述

connector.class 必填

org.neo4j.connectors.kafka.sink.Neo4jConnector

key.converter 必填

兼容的 Kafka Connect 转换器。

可以是以下之一:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

key.converter.schema.registry.url

用于消息键的 Schema Registry URL。当配置的 key.converter 需要 Schema Registry 时为必需。

key.converter.optional.for.nullables

key.converterio.confluent.connect.protobuf.ProtobufConverter 时,应设置为 true

value.converter 必填

兼容的 Kafka Connect 转换器。

可以是以下之一:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

value.converter.schema.registry.url

用于消息值的 Schema Registry URL。当配置的 value.converter 需要 Schema Registry 时为必需。

value.converter.optional.for.nullables

key.converterio.confluent.connect.protobuf.ProtobufConverter 时,应设置为 true

Neo4j 连接设置

名称 描述

neo4j.uri 必填

要连接的 Neo4j URI。可以指定多个 URI,并用 , 分隔。

neo4j.database

要连接的 Neo4j 数据库名称。建议明确指定。

neo4j.authentication.type 必填

使用的身份验证类型。可选值:NONE, BASIC, KERBEROS, BEARER, CUSTOM

默认值:BASIC

neo4j.authentication.basic.username

用于身份验证的用户名。当 neo4j.authentication.typeBASIC 时为必需。

neo4j.authentication.basic.password

用于身份验证的密码。当 neo4j.authentication.typeBASIC 时为必需。

neo4j.authentication.basic.realm

用于身份验证的域(Realm),留空则使用默认值。

neo4j.authentication.kerberos.ticket

用于建立连接的 Kerberos 票据。当 neo4j.authentication.typeKERBEROS 时为必需。

neo4j.authentication.bearer.token

用于建立连接的 Bearer Token。当 neo4j.authentication.typeBEARER 时为必需。

neo4j.authentication.custom.scheme

用于建立连接的自定义身份验证方案。当 neo4j.authentication.typeCUSTOM 时为必需。

neo4j.authentication.custom.principal

用于建立连接的自定义主体(Principal)。当 neo4j.authentication.typeCUSTOM 时为必需。

neo4j.authentication.custom.credentials

用于建立连接的自定义凭据。当 neo4j.authentication.typeCUSTOM 时为必需。

neo4j.authentication.custom.realm

用于身份验证的自定义域,根据自定义身份验证提供程序的要求进行设置。

neo4j.connection-timeout

TCP 连接超时(有效单位:ms, s, m, hd;默认单位为 s)。

默认值:30s(驱动程序默认值)

neo4j.pool.max-connection-pool-size

连接池中保持的最大连接数。

默认值:100(驱动程序默认值)

neo4j.pool.connection-acquisition-timeout

从连接池获取连接的最大等待时间(有效单位:ms, s, m, hd;默认单位为 s)。

默认值:60s(驱动程序默认值)

neo4j.pool.idle-time-before-connection-test

空闲连接在进行活性测试前的持续时间(有效单位:ms, s, m, hd;默认单位为 s)。

默认值:no test(驱动程序默认值)

neo4j.pool.max-connection-lifetime

连接从连接池中丢弃前的持续时间(有效单位:ms, s, m, hd;默认单位为 s)。

默认值:1h(驱动程序默认值)

neo4j.max-retry-time

重试事务的最大持续时间。

默认值:30s

neo4j.security.encrypted

是否启用加密。仅当 neo4j.uri 中使用 boltneo4j 方案时适用。可选值:true, false

默认值:false

neo4j.security.trust-strategy

用于 TLS 连接的信任策略。可选值:TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES。当 neo4j.security.encryptedtrue 时为必需。

默认值:TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

neo4j.security.cert-files

包含要信任的 CA 的 X509 证书的文件列表。当 neo4j.security.trust-strategyTRUST_CUSTOM_CA_SIGNED_CERTIFICATES 时为必需。

neo4j.security.hostname-verification-enabled

在 TLS 握手期间是否启用主机名验证。可选值:true, false

默认值:true

通用 Sink 设置

名称 描述

neo4j.batch-size

每个主题每个事务处理的最大消息数。

默认值:1000

neo4j.batch-timeout

允许处理一批数据的最长时间(有效单位:ms, s, m, hd;默认单位为 s)。

默认值:0s

neo4j.max-batched-queries 5.3.0

对于 Sink 处理程序,在原生批量处理策略生成的批量 Cypher 语句中包含的最大子查询数。

默认值:50

此设置目前仅适用于 CDC Sink 策略,对于非 CDC 策略以及使用基于 APOC Core 的批量策略时会被忽略。

neo4j.eos-offset-label 5.3.0

用于附加到代表最后成功处理消息偏移量的节点上的标签名称。指定后,此设置通过在数据库中跟踪最后成功处理的消息偏移量,来实现 Sink 策略的精确一次处理保证。

此设置目前仅适用于 CDC Sink 策略,对于非 CDC 策略会被忽略。

Cypher 策略设置

名称 描述

neo4j.cypher.topic.{NAME}

针对指定主题运行的 Cypher 语句。示例:"neo4j.cypher.topic.my-topic": "MERGE (n:Person {name: __value.name})"

neo4j.cypher.bind-timestamp-as

在用户提供的 Cypher 语句中,消息时间戳将以什么名称绑定。设置为空字符串可禁用消息时间戳绑定。

默认值:__timestamp

neo4j.cypher.bind-header-as

在用户提供的 Cypher 语句中,消息头将以什么名称绑定。消息头将作为头名称到对应值的映射进行绑定。设置为空字符串可禁用消息头绑定。

默认值:__header

neo4j.cypher.bind-key-as

在用户提供的 Cypher 语句中,消息键将以什么名称绑定。设置为空字符串可禁用消息键绑定。

默认值:__key

neo4j.cypher.bind-value-as

在用户提供的 Cypher 语句中,消息值将以什么名称绑定。设置为空字符串可禁用消息值绑定。

默认值:__value

neo4j.cypher.bind-value-as-event

为了向后兼容,是否将消息值在用户提供的 Cypher 语句中绑定为 'event'。

默认值:true

模式策略设置

名称 描述

neo4j.pattern.topic.{NAME}

应用于从指定主题接收到的消息的节点或关系模式。示例:"neo4j.pattern.topic.user": "(:User\{!userId})" 示例:"neo4j.pattern.topic.user": "(:User\{!userId})-[:KNOWS]→(:User\{!otherUserId})"

neo4j.pattern.merge-node-properties

是否将传入属性与节点的现有属性合并。

默认值:false

neo4j.pattern.merge-relationship-properties

是否将传入属性与关系的现有属性合并。

默认值:false

neo4j.pattern.bind-timestamp-as

在模式中,消息时间戳将以什么名称绑定。设置为空字符串可禁用消息时间戳绑定。

默认值:__timestamp

neo4j.pattern.bind-header-as

在模式中,消息头将以什么名称绑定。消息头将作为头名称到对应值的映射进行绑定。设置为空字符串可禁用消息头绑定。

默认值:__header

neo4j.pattern.bind-key-as

在模式中,消息键将以什么名称绑定。设置为空字符串可禁用消息键绑定。

默认值:__key

neo4j.pattern.bind-value-as

在模式中,消息值将以什么名称绑定。设置为空字符串可禁用消息值绑定。

默认值:__value

CDC 策略设置

名称 描述

neo4j.cdc.schema.topics

包含由该连接器的源实例使用 CDC 源策略生成的 CDC 事件的主题。

neo4j.cdc.source-id.topics

包含由该连接器的源实例使用 CDC 源策略生成的 CDC 事件的主题。

neo4j.cdc.source-id.label-name

附加到由 CDC Source Id 策略管理的节点上的标签名称。

默认值:SourceEvent

neo4j.cdc.source-id.property-name

附加到由 CDC Source Id 策略管理的节点上的 ID 属性名称。

默认值:sourceId

CUD 策略设置

名称 描述

neo4j.cud.topics

包含 CUD 事件的主题。