在 Confluent 平台上迁移

本指南遵循 Confluent 平台的快速入门

步骤
  1. 打开位于 https://:9021/clusters 的 Confluent 控制中心实例,找到已注册的源或汇连接器。找到后,将其删除。

  2. 查看 connect 容器的日志(docker-compose logs -f connect)。迁移后的配置将在连接器删除/关闭时打印到日志中。示例

    connect  | [2024-09-04 09:18:40,066] INFO The migrated settings for 5.1 version of Neo4j Source Connector 'Neo4jSourceConnectorAVRO' is: `{
    connect  |   "connector.class" : "org.neo4j.connectors.kafka.source.Neo4jConnector",
    connect  |   "neo4j.authentication.basic.password" : "",
    connect  |   "neo4j.uri" : "bolt://neo4j:7687",
    connect  |   "neo4j.query" : "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    connect  |   "neo4j.query.streaming-property" : "timestamp",
    connect  |   "value.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "task.class" : "streams.kafka.connect.source.Neo4jSourceTask",
    connect  |   "neo4j.authentication.basic.username" : "neo4j",
    connect  |   "name" : "Neo4jSourceConnectorAVRO",
    connect  |   "neo4j.query.topic" : "my-topic",
    connect  |   "value.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter" : "io.confluent.connect.avro.AvroConverter",
    connect  |   "key.converter.schema.registry.url" : "http://schema-registry:8081",
    connect  |   "neo4j.query.poll-interval" : "5000ms",
    connect  |   "neo4j.start-from" : "USER_PROVIDED",
    connect  |   "neo4j.start-from.value" : 1725441505774
    connect  | }` (streams.kafka.connect.source.Neo4jSourceService)
  3. 创建一个新的 JSON 文件 (source/sink)_migrated.neo4j.json,并将迁移后的示例配置复制到此文件中,同时包括连接器名称。配置应包含 name 属性,迁移后的配置应嵌套在 config 键中。

  4. 验证新配置相较于先前版本的连接器配置,包含所有相关键。

    1. 参见 源配置设置汇配置设置,了解新配置选项的描述和示例。

    2. 如果迁移源组件,请注意已设置为最近检查偏移量的 neo4j.start-from.value

    3. 替换敏感值。

      原始配置中的敏感值不会在迁移后的配置中打印。需要为这些键填入相应的值。受影响的配置键包括 neo4j.authentication.basic.passwordneo4j.authentication.kerberos.ticket
  5. 按照 安装 指南下载新连接器版本,并移除原始的 5.0.x 连接器版本。确保将新插件复制到 Docker 设置期间创建的 ./plugins/ 文件夹中。

  6. 通过运行 docker-compose restart connect 重启 Kafka Connect Worker。这使 Kafka Connect 平台能够从 ./plugins/ 文件夹中加载新插件。

  7. 继续遵循 Confluent 平台的快速入门,使用新配置将插件部署到 Kafka Connect。

  8. 连接器启动运行后,验证其已成功运行。