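Cypher strategy

This strategy executes the corresponding Cypher statement for every message received.

To configure a Cypher strategy for a topic, use the following convention: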

"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

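Starting with version 5.1.0 of the Neo4j Kafka connector, the Cypher strategy binds the header, key and value of each message as __header, __key and __value respectively, and passes them to the user-provided Cypher query as predefined variables. For details on how to customize these variable names, see the Cypher strategy settings.

For backward compatibility, event can still be used, but it refers only to the value of the message.

Example

Suppose the sink connector is subscribed to the following topics in the sink configuration: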

  "topics": "creates,updates,deletes"

You then declare that the Cypher strategy is to be used by providing a Cypher statement for each topic, as follows:

  "topics": "creates,updates,deletes",
  "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"

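The configuration snippet above defines that:

  • Messages received from the creates topic are written to Neo4j by the sink connector with the following Cypher query: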

    WITH __value.event.state.after AS state
    MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
    MERGE (f:Family {name: state.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • Messages received from the updates topic are written to Neo4j by the sink connector with the following Cypher query:

    WITH __value.event.state.before AS before, __value.event.state.after AS after
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    MATCH (fPre:Family {name: before.properties.surname})
    OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre)
    DELETE b
    WITH after, p
    SET p.name = after.properties.name, p.surname = after.properties.surname
    MERGE (f:Family {name: after.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • Messages received from the deletes topic are applied to Neo4j by the sink connector with the following Cypher query:

    WITH __value.event.state.before AS before
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    DETACH DELETE p
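
To make the property paths in these queries concrete, the following Python sketch resolves them against a sample message value. The payload is hypothetical, inferred only from the paths the queries use (event.state.before, event.state.after and properties), and resolve is an illustrative helper that is not part of the connector.

```python
from functools import reduce
from typing import Any

# Hypothetical message value, shaped only after the paths used in the queries above.
sample_value: dict[str, Any] = {
    "event": {
        "state": {
            "before": None,
            "after": {"properties": {"name": "John", "surname": "Doe"}},
        }
    }
}


def resolve(value: Any, path: str) -> Any:
    """Follow a dotted path through nested dicts, yielding None once a step is missing.

    This loosely mirrors Cypher, where property access on null returns null.
    """
    def step(current: Any, key: str) -> Any:
        return current.get(key) if isinstance(current, dict) else None

    return reduce(step, path.split("."), value)


# The 'creates' query starts with: WITH __value.event.state.after AS state
state = resolve(sample_value, "event.state.after")
print(resolve(state, "properties.name"))
print(resolve(state, "properties.surname"))
```

For a message like this one, before is empty, so the updates and deletes queries would match nothing; they only make sense for messages that carry a before state.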

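Creating the sink instance

Based on the example above, you can use any one of the following configurations. Choose the example for your message serialization format and save it as a file named sink.cypher.neo4j.json in a local directory.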

Avro:
{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

JSON Schema:
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

Protobuf:
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
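
Each query has to be supplied as a single JSON string, which is awkward to maintain by hand. As a sketch, the Python snippet below collapses a multi-line query into one line and emits the neo4j.cypher.topic.<topic> entries with json.dumps, so that quoting is handled automatically. The helper names one_line and cypher_topic_entries are illustrative and not part of the connector.

```python
import json
import textwrap

# The 'creates' query from the example, kept readable across several lines.
CREATES = textwrap.dedent("""\
    WITH __value.event.state.after AS state
    MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
    MERGE (f:Family {name: state.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
""")


def one_line(query: str) -> str:
    """Collapse every run of whitespace, including newlines, into a single space.

    Caveat: this would also alter whitespace inside string literals in the query.
    """
    return " ".join(query.split())


def cypher_topic_entries(queries: dict[str, str]) -> dict[str, str]:
    """Map each topic name to its 'neo4j.cypher.topic.<topic>' property."""
    return {
        f"neo4j.cypher.topic.{topic}": one_line(query)
        for topic, query in queries.items()
    }


entries = cypher_topic_entries({"creates": CREATES})
print(json.dumps(entries, indent=2))
```

The printed entries can be merged into the "config" object of whichever configuration you chose above.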

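Load the configuration into Kafka Connect with the following REST call, replacing <KAFKA_CONNECT_HOST> with the address of your Kafka Connect worker:

curl -X POST https://<KAFKA_CONNECT_HOST>:8083/connectors \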
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cypher.neo4j.json

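You can now open the Confluent Control Center instance at https://<CONTROL_CENTER_HOST>:9021/clusters, replacing <CONTROL_CENTER_HOST> with the address of your Control Center. Verify that the configured connector instance is running in the Connect tab under connect-default.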
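
The same check can be scripted against the Kafka Connect REST API, whose GET /connectors/<name>/status endpoint reports the state of the connector and of each of its tasks. A minimal Python sketch follows; the base URL is a placeholder that you must supply, and fetch_status and is_running are illustrative helper names.

```python
import json
from urllib.request import urlopen


def fetch_status(base_url: str, connector: str) -> dict:
    """Call GET <base_url>/connectors/<connector>/status and decode the JSON body."""
    with urlopen(f"{base_url}/connectors/{connector}/status") as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING.

    A connector without any task is treated as not running in this sketch.
    """
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


# Example response, trimmed to the fields this sketch reads.
sample = {
    "name": "Neo4jSinkConnectorAVRO",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
print(is_running(sample))
```

Against a live worker, the two helpers combine as is_running(fetch_status(base_url, "Neo4jSinkConnectorAVRO")), where base_url points at port 8083 of your Kafka Connect worker.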

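The CDC Source connector provides JMX metrics for monitoring the state of data ingestion. For more information, see Source Monitoring.

Internally, the connector builds a batch of changes per topic and executes the query prefixed with an UNWIND clause. For the example above, the query executed for messages received from the creates topic looks like this: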

UNWIND $events AS message
WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value
WITH __value.event.state.after AS state
MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
MERGE (f:Family {name: state.properties.surname})
MERGE (p)-[:BELONGS_TO]->(f)

where $events is the batch of change events.
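
As a rough illustration of this wrapping, the Python sketch below assembles such a statement from the variable names and the user query. It is a simplified model for reading the statement above, not the connector's implementation: wrap_query and its parameters are hypothetical, and None is this sketch's own way of leaving a variable unbound.

```python
from typing import Optional


def wrap_query(
    query: str,
    header_as: Optional[str] = "__header",
    key_as: Optional[str] = "__key",
    value_as: Optional[str] = "__value",
    value_as_event: bool = True,
) -> str:
    """Prefix a user query with UNWIND and a WITH clause binding the message parts."""
    bindings = []
    if value_as_event:
        # Backward-compatible alias: 'event' refers to the message value only.
        bindings.append("message.value AS event")
    for field, alias in (("header", header_as), ("key", key_as), ("value", value_as)):
        if alias:
            bindings.append(f"message.{field} AS {alias}")
    if not bindings:
        raise ValueError("at least one variable must be bound")
    return "\n".join(
        ["UNWIND $events AS message", "WITH " + ", ".join(bindings), query]
    )


user_query = "\n".join([
    "WITH __value.event.state.after AS state",
    "MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})",
    "MERGE (f:Family {name: state.properties.surname})",
    "MERGE (p)-[:BELONGS_TO]->(f)",
])
print(wrap_query(user_query))
```

With the default arguments the sketch reproduces the statement shown above. Passing header_as=None, key_as=None and value_as_event=False leaves only message.value AS __value in the WITH clause.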