Confluent Platform 快速入门

概述

我们将使用 Docker Compose 创建一个包含 Confluent Platform 组件和运行在 Docker 中的 Neo4j 的环境。

Neo4j Kafka 连接器将首先配置一个源(source)实例。该源将检索与节点模式 (:TestSource) 匹配的 CREATE、UPDATE 和 DELETE 操作所产生的变更。接收到的变更随后将根据操作类型分别发布到 creates、updates 和 deletes 主题中。

接下来,我们将创建一个接收器(sink)实例,它将监听 creates、updates 和 deletes 主题中的消息,并在收到消息时执行 Cypher 语句,以在 Neo4j 中应用相应的变更。

以下指南使用 Confluent Platform 的 Docker 镜像。

使用 Docker Compose 运行

将以下 Docker Compose 文件复制到目标目录中。

下面提供的示例 docker-compose.yml 文件利用了 Docker Compose 的最新功能,因此需要较新版本的 Docker Compose。请确保您拥有的工具版本至少为 v2.20.3。
docker-compose.yml
---
services:
  # Neo4j Enterprise instance that the connectors read changes from and write to.
  neo4j:
    container_name: neo4j
    hostname: neo4j
    image: neo4j:2026-enterprise
    # Re-pull on every start so the floating tag resolves to the newest 2026.x
    # release of the database.
    pull_policy: always
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'
      NEO4J_AUTH: 'neo4j/password'
      NEO4J_server_memory_heap_max__size: '4G'
    ports:
      - '7474:7474'
      - '7687:7687'
    healthcheck:
      # Healthy once cypher-shell can authenticate and run a trivial query.
      test:
        - CMD
        - cypher-shell
        - '-u'
        - neo4j
        - '-p'
        - password
        - 'RETURN 1'
      interval: 30s
      retries: 5
      start_interval: 10s
      start_period: 2m
      timeout: 10s

  # ZooKeeper node that the broker points at through KAFKA_ZOOKEEPER_CONNECT.
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:7.5.2
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'
    ports:
      - '2181:2181'
    healthcheck:
      # TCP probe of the client port from inside the container.
      test:
        - CMD
        - nc
        - '-z'
        - localhost
        - '2181'
      interval: 1m
      retries: 5
      start_interval: 10s
      start_period: 5m
      timeout: 10s

  # Single Kafka broker (Confluent Server image) backed by the zookeeper service.
  broker:
    image: confluentinc/cp-server:7.5.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Two advertised listeners: PLAINTEXT (broker:29092) is the address the
      # other containers use; PLAINTEXT_HOST (localhost:9092) is the address
      # handed to clients on the Docker host, which reach it through the
      # published 9092 port. The host part must be explicit so that clients are
      # told a name they can actually connect back to.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Only one broker is defined in this file, so every internal topic is
      # created with replication factor / min ISR of 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # JMX is exposed on the published 9101 port.
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    healthcheck:
      # TCP probe of the host-facing listener port from inside the container.
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  # Schema Registry used by the converters configured on the connect service
  # and in the connector configurations.
  schema-registry:
    container_name: schema-registry
    hostname: schema-registry
    image: confluentinc/cp-schema-registry:7.5.2
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    ports:
      - '8081:8081'
    healthcheck:
      # TCP probe of the REST port from inside the container.
      test:
        - CMD
        - nc
        - '-z'
        - localhost
        - '8081'
      interval: 1m
      retries: 5
      start_interval: 10s
      start_period: 5m
      timeout: 10s

  # Kafka Connect worker; connector plugins are picked up from the ./plugins
  # bind mount via CONNECT_PLUGIN_PATH.
  connect:
    container_name: connect
    hostname: connect
    image: confluentinc/cp-server-connect:7.5.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - '8083:8083'
    volumes:
      - './plugins:/tmp/connect-plugins'
    environment:
      # Worker identity and cluster membership.
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_GROUP_ID: 'compose-connect-group'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      # Internal storage topics, all with replication factor 1.
      CONNECT_CONFIG_STORAGE_TOPIC: 'docker-connect-configs'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_TOPIC: 'docker-connect-offsets'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: '10000'
      CONNECT_STATUS_STORAGE_TOPIC: 'docker-connect-status'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      # Worker-level default converters.
      CONNECT_KEY_CONVERTER: 'org.apache.kafka.connect.storage.StringConverter'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      # CLASSPATH required due to CC-2422
      CLASSPATH: '/usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.2.jar'
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor'
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor'
      # The last entry is the container side of the ./plugins volume above.
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins'
      CONNECT_LOG4J_LOGGERS: 'org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR'
    healthcheck:
      # TCP probe of the REST port from inside the container.
      test:
        - CMD
        - nc
        - '-z'
        - localhost
        - '8083'
      interval: 1m
      retries: 5
      start_interval: 10s
      start_period: 5m
      timeout: 10s

  # Confluent Control Center web UI, published on host port 9021.
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      # Registers the connect service under the cluster name "connect-default".
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      # Single-broker setup: replication and partition counts are all 1.
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    healthcheck:
      # Probe the UI over plain HTTP on the container's own loopback address.
      # No TLS is configured in this file, and the URL needs an explicit host.
      test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

将以下 Neo4j Kafka 连接器工件复制到与您的 docker-compose.yml 文件位于同一目录下的 plugins 目录中。目录结构应如下所示:

quickstart/
├─ plugins/
│  ├─ neo4j-kafka-connect-5.3.0.jar
├─ docker-compose.yml

打开终端,进入 Docker Compose 文件的目录并运行

docker compose up -d

当进程完成时,所有模块都应处于运行状态。您可以按如下方式检查所有服务的状态

docker compose ps

这应该会返回一个表格,显示每个服务都在正常运行。

NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect             "bash -c '# confluen…"   connect             running             0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running             0.0.0.0:9021->9021/tcp
neo4j               "tini -g -- /startup…"   neo4j               running             0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running             0.0.0.0:8081->8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

现在,您可以通过 http://localhost:7474 访问您的 Neo4j 实例,使用 neo4j 作为用户名、password 作为密码登录(更新 Docker Compose 文件中的 NEO4J_AUTH 环境变量可更改密码)。确认您可以访问 http://localhost:9021/clusters 处的 Confluent Control Center 实例,并确认集群状态正常(这可能需要 90-120 秒)。在 Control Center 中,您应该会看到一个 Broker、多个主题以及一个 Connect 集群。

启用 CDC

通过执行以下 Cypher 命令在源数据库上启用变更数据捕获 (CDC)。有关变更数据捕获及其启用的更多信息,请参阅本地安装的 Change Data Capture > Enable CDC > Neo4j DBMS 以及 Aura 的 Change Data Capture > Enable CDC > Aura

ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';

使用 CDC 的源

首先,我们需要将 Neo4j 设置为向主题提供消息的源数据库。请从以下消息序列化格式(依次为 AVRO、JSON Schema 和 Protobuf)中选择一种,并将对应配置的内容保存为本地目录中名为 source.neo4j.json 的文件。

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}

现在,我们将通过以下 REST 调用来创建源实例

# Register the source connector through the Kafka Connect REST API, which the
# Compose file publishes on host port 8083 over plain HTTP.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.neo4j.json

这将创建一个 Kafka Connect 源实例,该实例将使用您首选的序列化格式将变更事件消息发送到名为 creates、updates 和 deletes 的主题。在 Control Center 中,确认源连接器已在 Connect 选项卡下的 connect-default 中创建。

如您所见,您可以配置多个模式以读取变更,并将它们发布到您选择的主题中。因此,根据上述配置,连接器将读取标签为 TestSource 的节点上发生的变更,消息结构将基于 Change Data Capture > Change event schema,并根据配置的消息格式进行序列化。根据操作类型,预期的变更事件将具有以下结构(依次为创建、更新和删除操作的示例)。

{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": null,
      "after": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "c",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<old-name>",
          "surname": "<old-surname>"
        },
        "labels": ["TestSource"]
      },
      "after": {
        "properties": {
          "name": "<new-name>",
          "surname": "<new-surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "u",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      },
      "after": null
    },
    "operation": "d",
    "labels": ["TestSource"]
  }
}

现在您已经拥有了一个正在运行的源实例,您可以在 Neo4j 中创建以下节点

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

这将导致新消息发布到名为 creates 的主题中。

使用 Cypher 的接收器

设置好源连接器后,下一步是配置一个接收器连接器,用于消费发布到 creates、updates 和 deletes 主题的消息。

首先,从以下配置(依次为 AVRO、JSON Schema 和 Protobuf,需与源连接器所选的序列化格式一致)中选择一种,并将其内容保存为本地目录中名为 sink.neo4j.json 的文件。

{
  "name": "Neo4jSinkConnectorCypherAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

现在,我们将通过调用以下 REST 请求来创建接收器实例

# Register the sink connector through the Kafka Connect REST API, which the
# Compose file publishes on host port 8083 over plain HTTP.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @sink.neo4j.json

这将配置接收器实例以使用您首选的序列化格式消费数据。Cypher 策略将根据属性 neo4j.cypher.topic.creates、neo4j.cypher.topic.updates 和 neo4j.cypher.topic.deletes 中定义的 Cypher 查询模板构建 Cypher 查询。

测试

现在,您可以访问 http://localhost:9021/clusters 处的 Confluent Control Center 实例,并验证至少 creates 主题已按照连接器配置中的说明创建,以及源和接收器连接器实例正在 Connect (connect-default) 下运行。

当源连接器和接收器连接器都运行时,之前创建的 :TestSource 节点将导致源实例将消息发布到 creates 主题中。这些消息随后将被接收器实例消费,并在 Neo4j 中创建相应的 :Person 和 :Family 节点。当您创建、更新和删除标记为 TestSource 的节点时,updates 和 deletes 主题也将被创建。

通过在 http://localhost:7474/browser/ 的 Neo4j 浏览器中执行以下查询,检查是否如此

MATCH (n:(Person | Family)) RETURN n

现在,您可以执行更多类似以下语句的操作来创建、更新或删除 Person 和 Family 节点

创建一个新人
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

验证是否创建了一个新的 Person 和一个新的 Family 节点并已关联。

更新现有人员
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

验证现有的 Person 节点是否已更新姓氏为 smith 并已链接到新的 Family 节点。

删除现有人员
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

验证现有的 Person 节点是否已被删除。

总结

在本快速入门中,我们展示了如何配置 Neo4j 数据库,使其既作为 Kafka 主题的消息源,又作为同一主题的接收器,以便在数据库中创建、更新或删除节点和关系。通常,我们的连接器要么在通过 Apache Kafka 或 Confluent 从其他数据源提取数据时用作接收器,要么在 Apache Kafka 或 Confluent 将数据推送到其他数据库时用作源。

故障排除

如果您没有看到任何消息发布到 creates、updates 和 deletes 主题,或者没有看到任何 :Family 和 :Person 节点被创建,请通过执行以下命令检查 Kafka Connect 日志并解决报告的任何问题。

docker compose logs connect