从 Neo4j Streams 迁移
Neo4j Connector for Kafka 与 Neo4j Streams 插件 在架构上存在显著差异。
下表概述了两种解决方案之间的差异。
| Neo4j Streams 插件 | Neo4j Kafka 连接器 | |
|---|---|---|
部署 |
插件以 JAR 文件的形式部署在 Neo4j 上;自行管理 Kafka 连接 |
Kafka Connect 连接器运行在 Kafka Connect 上,独立于 Neo4j |
通信 |
面向底层 Neo4j 数据库的进程内客户端 |
使用 Neo4j Java 驱动通过 BOLT 建立数据库连接 |
配置 |
neo4j.conf 文件中的 Kafka 配置选项 |
Kafka 配置和 转换器 可以在 Kafka Connect 实例全局配置,或在连接器的配置中针对每个连接器单独配置 |
Sink/Source 角色 |
单个插件实例支持 sink 和 source 两种功能 |
启动时必须明确配置为 source 或 sink 连接器 |
Kafka 配置
已移除的设置
neo4j.conf 文件中供 Neo4j Streams 插件使用的 Kafka 设置已不再受支持,且不得包含在新连接器的配置中。
|
一般而言,以 |
streams.procedures.enabled
streams.sink.enabled
streams.source.enabled
kafka.auto.offset.reset
kafka.enable.auto.commit
kafka.group.id
kafka.max.poll.records
kafka.session.timeout.ms
kafka.topic.discovery.polling.interval
kafka.value.deserializer
kafka.value.deserializer
kafka.acks
kafka.batch.size
kafka.buffer.memory
kafka.linger.ms
kafka.retries
kafka.transactional.id
kafka.bootstrap.servers
kafka.connection.timeout.ms
kafka.security.protocol
kafka.ssl.endpoint.identification.algorithm
kafka.ssl.key.password
kafka.ssl.keystore.location
kafka.ssl.keystore.password
kafka.ssl.truststore.location
kafka.ssl.truststore.password
kafka.reindex.batch.size
kafka.replication
kafka.streams.log.compaction.strategy
Neo4j Streams 作为源
Neo4j Streams 插件的源功能已被 变更数据捕获(CDC)源连接器 取代。
|
CDC 需要以下任一 Neo4j 部署
如果您使用的 Neo4j 或 AuraDB 版本不支持 CDC,可以改用 基于查询的源连接器。 |
所需更改
有关新配置选项的详细信息,请参阅 源 → CDC。
Streams 插件使用的模式语法已更新,以更好地体现 Cypher 模式语法,示例如下:
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
"neo4j.source-strategy": "CDC",
"neo4j.cdc.topic.my-nodes-topic.patterns": "(:Person)"
"neo4j.cdc.topic.my-rels-topic.patterns": "()-[:BELONGS-TO]->()"
Neo4j Streams 作为汇
Neo4j Connector for Kafka 支持 Neo4j Streams 插件提供的所有 sink 策略。
Cypher 策略
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
"neo4j.cypher.topic.<YOUR_TOPIC>": "<CYPHER_QUERY>"
Neo4j Connector for Kafka 在最近的版本中增加了更多选项,以映射传入消息的头部、键和/或值,详情请参阅 汇 → Cypher。
CDC 策略
仍然支持 CDC 事件的 SourceId 与 Schema 两种策略。虽然 source CDC 连接器会使用新的 CDC 兼容模式发出事件,但 sink 连接器仍可读取由 Neo4j Streams 插件生成的 CDC 消息。
streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the ID name given to the CDC ID field, default=sourceId>
"neo4j.cdc.source-id.topics": "<comma-separated list of topics>",
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>",
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"
streams.sink.topic.cdc.schema=<list of topics separated by semicolon>
"neo4j.cdc.schema.topics": "<comma-separated list of topics>"
Pattern 策略
Neo4j Streams 配置中的模式语法在 Neo4j Connector for Kafka 中得到完整支持。虽然同时支持简单模式和类 Cypher 模式,但后者是定义模式的首选方式。
streams.sink.topic.pattern.node.user=(:User{!userId})
streams.sink.topic.pattern.node.product=(:Product{!productId})
streams.sink.topic.pattern.relationship.bought=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})
"neo4j.pattern.topic.user": "(:User{!userId})",
"neo4j.pattern.topic.product": "(:Product{!productId})",
"neo4j.pattern.topic.bought": "(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})"
请注意,在 Neo4j Connector for Kafka 中声明模式配置时,不会区分节点或关系。
您还可以为同一连接器添加带有不同模式的更多主题。此外,Neo4j Connector for Kafka 引入了额外功能,例如将传入消息属性映射到新名称,详见 模式章节。
CUD 文件格式策略
Neo4j Connector for Kafka 完全支持 CUD 文件格式策略。
streams.sink.topic.cud=<list of topics separated by semicolon>
"neo4j.cud.topics": "<comma-separated list of topics>"
错误处理
Kafka Connect 拥有自己的错误处理机制来处理 sink 消息。有关为 sink 连接器配置高级错误处理的更多信息,请参阅 汇 → 错误处理。