Confluent Cloud 快速入门
|
本页面包含有关第三方平台使用说明的内容,该平台可能会发生我们无法控制的更改。如有疑问,请参阅第三方平台文档。 |
Confluent Cloud 提供了一个使用“用于 Confluent 的 Neo4j 连接器”的托管 Sink 连接器。对于希望将 Aura/Neo4j 用作源(Source)的客户,Neo4j 支持在 Confluent Cloud 中以自定义连接器(Custom Connector)的形式运行 Neo4j 连接器。
先决条件
-
需要一个已启用 CDC(变更数据捕获)且可通过公网访问的 AuraDB Enterprise 或本地部署的 Neo4j Enterprise。请按照 在 Neo4j Aura 上启用 CDC 或 在 Neo4j DBMS 上启用 CDC 中的说明进行操作。
-
必须在 Confluent Cloud 中拥有一个环境和一个正在运行的集群,类似于 图 1,“Confluent Cloud 中正在运行的集群” 中所示。
将源插件作为自定义连接器上传
在创建连接器实例之前,我们首先需要将用于 Kafka 的 Neo4j 连接器定义为自定义连接器。
-
选择要安装连接器的集群,打开
Connectors(连接器)部分,然后点击Add plugin(添加插件)。 -
点击
Add Plugin,填写如下所示的新自定义连接器的详细信息,然后接受条款并点击Submit(提交)。- 连接器插件名称
-
Neo4j Connector for Confluent Source
- 自定义插件描述
-
用于 Confluent 的 Neo4j 源连接器插件(作为自定义连接器)。
- 连接器类
-
org.neo4j.connectors.kafka.source.Neo4jConnector - 连接器类型
-
源 (Source) - 连接器归档文件
-
请按照 分发说明 获取最新的 Confluent Hub 组件归档包,并从本地计算机选择下载的
neo4j-kafka-connector-5.3.0.zip文件。 - 敏感属性
-
为了在连接器实例中保护敏感配置属性,您至少应将以下配置属性标记为敏感。
neo4j.authentication.basic.password neo4j.authentication.kerberos.ticket neo4j.authentication.bearer.token neo4j.authentication.custom.credentials
它将上传归档文件并创建源插件。
自定义连接器是由用户创建的 Kafka Connect 插件、修改后的开源连接器插件,或者是像“用于 Confluent 的 Neo4j 连接器”这样的第三方连接器插件。在 Confluent 文档 中了解更多关于自定义连接器的信息,并阅读我们的 博客文章,其中包含关于如何使用 Aura 进行设置的操作示例。
您可以按照 Neo4j 开发者博客 上的说明,创建从 Confluent Cloud 到 Neo4j 和 Neo4j AuraDB 的连接器。
创建源实例
在上一节中创建了源自定义连接器后,我们现在可以开始配置我们的源实例了。
-
在 Confluent Cloud 中,转到集群的连接器部分,搜索我们在 上面 创建的插件
Neo4j Connector for Confluent Source。 -
点击该连接器开始配置我们的源连接器实例。
-
配置用于访问 Kafka 集群的 API 密钥,然后点击 Continue(继续)。
-
首先点击
Auto Configure Schema Registry(自动配置 Schema Registry),根据您的偏好选择JSON Schema、Avro或Protobuf,然后点击Apply changes(应用更改)。这将生成几个用于支持 Schema 的配置选项。接下来,以单独的键值对形式或添加到现有的 JSON 中来配置连接器选项。为了快速入门,我们将配置源实例,使其在匹配模式(:TestSource)的节点上发送变更事件消息,并根据您选择的序列化格式,将其发送到名为creates、updates和deletes的主题中。验证所有配置选项是否正确,然后点击
Continue(继续)。 -
在下一个屏幕上,我们需要添加连接端点,以便我们的连接器可以访问 Neo4j 或 AuraDB。从 Neo4j 连接 URI 中提取主机名和端口,并将其作为端点添加。请记住,Neo4j 连接的默认端口号是
7687。例如,对于连接 URIneo4j+s://<redacted>.databases.neo4j.io,我们应该输入<redacted>.databases.neo4j.io:7687作为端点。 -
接下来,选择您的连接器应运行的任务数量,然后点击
Continue(继续)。源连接器始终以 1 个任务运行,因此默认值1就足够了。 -
最后,命名您的连接器实例,检查您的设置,然后点击
Continue(继续)。 -
源实例将被配置,并在几分钟内显示为
Running(运行中)。
现在您已经有了一个正在运行的源实例,您可以在 Neo4j 中创建以下节点
CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});
这将导致新消息发布到名为 creates 的主题中。
创建 Sink 实例
在上一节中创建了源实例后,我们现在可以开始配置我们的 Sink 实例,以便我们可以对源实例生成的消息执行操作。
-
在 Confluent Cloud 中,转到集群的连接器部分,搜索插件
Neo4j Sink,该插件由 Confluent 作为托管连接器提供。 -
点击该连接器开始配置我们的 Sink 连接器实例。
-
选择
creates、updates和deletes主题供此 Sink 实例消费,然后点击 Continue(继续)。 -
配置用于访问 Kafka 集群的 API 密钥,然后点击 Continue(继续)。
-
输入包括认证方法、用户名和密码在内的 Neo4j 连接详细信息,然后点击 Continue(继续)。
-
根据您的传入消息序列化格式,选择
JSON Schema、Avro或Protobuf作为记录值格式。接下来,根据您的偏好配置连接器选项。为了快速入门,我们将配置 Sink 实例,使其为从creates、updates和deletes主题收到的每条消息执行 Cypher 语句。配置以下选项
主题的 Cypher 语句{ "creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)", "updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)", "deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p", }清除
Bind Timestamp As、Bind Header As和Bind Key As选项,将Bind Value As设置为__value,并将Bind Value As Event设置为false。保持其他选项为默认值,验证所有配置选项是否正确,然后点击
Continue(继续)。 -
接下来,选择您的连接器应运行的任务数量,然后点击
Continue(继续)。 -
最后,命名您的连接器实例,检查您的设置,然后点击
Continue(继续)。 -
Sink 实例将被配置,并在几分钟内显示为
Running(运行中)。
测试
现在您可以访问您的 Confluent Cloud 集群,并验证至少 creates 主题已按照连接器配置中的说明创建。
随着源连接器和 Sink 连接器同时运行,先前创建的 :TestSource 节点将导致源实例向 creates 主题发布消息。这些消息随后将被 Sink 实例消费,并在 Neo4j 内部创建相应的 :Person 和 :Family 节点。当您创建、更新和删除 TestSource 标签的节点时,也会创建 updates 和 deletes 主题。
通过在 https://:7474/browser/ 的 Neo4j Browser 中执行以下查询,检查是否已实现:
MATCH (n:(Person | Family)) RETURN n
您现在可以通过执行更多类似以下的语句来创建、更新或删除 Person 和 Family 节点:
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});
验证是否创建了一个新的 Person 和一个新的 Family 节点,并将它们连接在一起。
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';
验证现有的 Person 节点现在是否已更新姓氏为 smith,并链接到一个新的 Family 节点。
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;
验证现有的 Person 节点现在是否已删除。