Confluent Cloud 快速入门

本页面包含有关第三方平台使用说明的内容,该平台可能会发生我们无法控制的更改。如有疑问,请参阅第三方平台文档。

Confluent Cloud 提供了一个使用“用于 Confluent 的 Neo4j 连接器”的托管 Sink 连接器。对于希望将 Aura/Neo4j 用作源(Source)的客户,Neo4j 支持在 Confluent Cloud 中以自定义连接器(Custom Connector)的形式运行 Neo4j 连接器。

先决条件

cc cluster
图 1. Confluent Cloud 中正在运行的集群

将源插件作为自定义连接器上传

在创建连接器实例之前,我们首先需要将用于 Kafka 的 Neo4j 连接器定义为自定义连接器。

  1. 选择要安装连接器的集群,打开 Connectors(连接器)部分,然后点击 Add plugin(添加插件)。

  2. 点击 Add Plugin,填写如下所示的新自定义连接器的详细信息,然后接受条款并点击 Submit(提交)。

    连接器插件名称

    Neo4j Connector for Confluent Source

    自定义插件描述

    用于 Confluent 的 Neo4j 源连接器插件(作为自定义连接器)。

    连接器类

    org.neo4j.connectors.kafka.source.Neo4jConnector

    连接器类型

    源 (Source)

    连接器归档文件

    请按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机选择下载的 neo4j-kafka-connector-5.3.0.zip 文件。

    敏感属性

    为了在连接器实例中保护敏感配置属性,您至少应将以下配置属性标记为敏感。

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

它将上传归档文件并创建源插件。

自定义连接器是由用户创建的 Kafka Connect 插件、修改后的开源连接器插件,或者是像“用于 Confluent 的 Neo4j 连接器”这样的第三方连接器插件。在 Confluent 文档 中了解更多关于自定义连接器的信息,并阅读我们的 博客文章,其中包含关于如何使用 Aura 进行设置的操作示例。

您可以按照 Neo4j 开发者博客 上的说明,创建从 Confluent Cloud 到 Neo4j 和 Neo4j AuraDB 的连接器。

创建源实例

在上一节中创建了源自定义连接器后,我们现在可以开始配置我们的源实例了。

  1. 在 Confluent Cloud 中,转到集群的连接器部分,搜索我们在 上面 创建的插件 Neo4j Connector for Confluent Source

  2. 点击该连接器开始配置我们的源连接器实例。

  3. 配置用于访问 Kafka 集群的 API 密钥,然后点击 Continue(继续)。

  4. 首先点击 Auto Configure Schema Registry(自动配置 Schema Registry),根据您的偏好选择 JSON SchemaAvroProtobuf,然后点击 Apply changes(应用更改)。这将生成几个用于支持 Schema 的配置选项。接下来,以单独的键值对形式或添加到现有的 JSON 中来配置连接器选项。为了快速入门,我们将配置源实例,使其在匹配模式 (:TestSource) 的节点上发送变更事件消息,并根据您选择的序列化格式,将其发送到名为 createsupdatesdeletes 的主题中。

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }

    验证所有配置选项是否正确,然后点击 Continue(继续)。

  5. 在下一个屏幕上,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其作为端点添加。请记住,Neo4j 连接的默认端口号是 7687。例如,对于连接 URI neo4j+s://<redacted>.databases.neo4j.io,我们应该输入 <redacted>.databases.neo4j.io:7687 作为端点。

  6. 接下来,选择您的连接器应运行的任务数量,然后点击 Continue(继续)。源连接器始终以 1 个任务运行,因此默认值 1 就足够了。

  7. 最后,命名您的连接器实例,检查您的设置,然后点击 Continue(继续)。

  8. 源实例将被配置,并在几分钟内显示为 Running(运行中)。

现在您已经有了一个正在运行的源实例,您可以在 Neo4j 中创建以下节点

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

这将导致新消息发布到名为 creates 的主题中。

创建 Sink 实例

在上一节中创建了源实例后,我们现在可以开始配置我们的 Sink 实例,以便我们可以对源实例生成的消息执行操作。

  1. 在 Confluent Cloud 中,转到集群的连接器部分,搜索插件 Neo4j Sink,该插件由 Confluent 作为托管连接器提供。

  2. 点击该连接器开始配置我们的 Sink 连接器实例。

  3. 选择 createsupdatesdeletes 主题供此 Sink 实例消费,然后点击 Continue(继续)。

  4. 配置用于访问 Kafka 集群的 API 密钥,然后点击 Continue(继续)。

  5. 输入包括认证方法、用户名和密码在内的 Neo4j 连接详细信息,然后点击 Continue(继续)。

  6. 根据您的传入消息序列化格式,选择 JSON SchemaAvroProtobuf 作为记录值格式。接下来,根据您的偏好配置连接器选项。为了快速入门,我们将配置 Sink 实例,使其为从 createsupdatesdeletes 主题收到的每条消息执行 Cypher 语句。

    配置以下选项

    主题的 Cypher 语句
    {
      "creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    }

    清除 Bind Timestamp AsBind Header AsBind Key As 选项,将 Bind Value As 设置为 __value,并将 Bind Value As Event 设置为 false

    保持其他选项为默认值,验证所有配置选项是否正确,然后点击 Continue(继续)。

  7. 接下来,选择您的连接器应运行的任务数量,然后点击 Continue(继续)。

  8. 最后,命名您的连接器实例,检查您的设置,然后点击 Continue(继续)。

  9. Sink 实例将被配置,并在几分钟内显示为 Running(运行中)。

测试

现在您可以访问您的 Confluent Cloud 集群,并验证至少 creates 主题已按照连接器配置中的说明创建。

随着源连接器和 Sink 连接器同时运行,先前创建的 :TestSource 节点将导致源实例向 creates 主题发布消息。这些消息随后将被 Sink 实例消费,并在 Neo4j 内部创建相应的 :Person:Family 节点。当您创建、更新和删除 TestSource 标签的节点时,也会创建 updatesdeletes 主题。

通过在 https://:7474/browser/ 的 Neo4j Browser 中执行以下查询,检查是否已实现:

MATCH (n:(Person | Family)) RETURN n

您现在可以通过执行更多类似以下的语句来创建、更新或删除 Person 和 Family 节点:

创建一个新人员
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

验证是否创建了一个新的 Person 和一个新的 Family 节点,并将它们连接在一起。

更新现有人员
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

验证现有的 Person 节点现在是否已更新姓氏为 smith,并链接到一个新的 Family 节点。

删除现有人员
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

验证现有的 Person 节点现在是否已删除。

总结

在本快速入门中,我们展示了如何配置 AuraDB/Neo4j 数据库,使其既可以作为 Kafka 主题消息的源,也可以作为这些消息的 Sink,从而在数据库中创建、更新或删除节点和关系。通常,我们的连接器要么在通过 Confluent 从其他数据源提取数据时用作 Sink,要么用作 Confluent 将数据推送到其他数据库的源。

故障排除

  • 确保您的数据库已启用 CDC。

  • 确保按照上述说明正确设置了连接端点。

  • 检查连接器日志。

请注意,Confluent Cloud 中的自定义连接器日志可能不会立即显示。在测试连接器实例上的配置更改时,请记住这一点。