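Change Data Capture (CDC) strategy

This strategy ingests CDC events from another Neo4j instance. The events are produced either by a source connector instance configured with the Change Data Capture strategy or by the deprecated Neo4j Streams plugin.

Change data capture events must be generated by the matching version of the source connector, which must be configured with a value converter that supports schemas.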

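Two sub-strategies are available:

  • The Schema strategy merges nodes and relationships based on the constraints defined in the source database (node key, relationship key, and/or property uniqueness + existence).

  • The Source ID strategy merges nodes and relationships based on the elementId field of the CDC event, or the id field for Neo4j Streams events (the Neo4j internal entity identifier).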

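Schema sub-strategy

The Schema strategy merges nodes and relationships using the constraints declared in the change events, which preserves the structure of the source schema.

To configure this strategy, declare the list of topics from which change events are read.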

"neo4j.cdc.schema.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

Example

Assume the topics that the sink connector subscribes to are configured as follows:

"topics": "topic.1,topic.2"

To use the cdc.schema strategy, declare the list of topics from which change events should be consumed.

"neo4j.cdc.schema.topics": "topic.1,topic.2"

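Each change event is then projected into a graph entity.

Consider the following node creation event, shown first in the format produced by the source connector and then in the format produced by the deprecated Neo4j Streams plugin.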

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

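The node is persisted as follows. The sink connector uses the keys field (or the schema field for Neo4j Streams events) to insert or update the node, without adding any extra properties or labels.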

(:Person {first_name: "John", last_name: "Doe", email: "john.doe@example.com"})
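
The merge-by-key behaviour described above can be illustrated with a minimal Python sketch. It is not the connector's implementation: the function name merge_node_statement, the choice of the first declared key set, and the exact shape of the generated Cypher are all assumptions made for illustration. It only shows how the keys declared in a CDC node event can identify the node to merge, while the remaining properties are set afterwards.

```python
from typing import Any


def merge_node_statement(event: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Build an illustrative MERGE statement from the "event" part of a CDC node event.

    Hypothetical helper: the real connector generates its own Cypher.
    Label and property names are assumed to be plain identifiers.
    """
    labels = event["labels"]
    # Pick the first key set declared for any of the node's labels.
    keys: dict[str, Any] = {}
    for label in labels:
        key_sets = event["keys"].get(label, [])
        if key_sets:
            keys = key_sets[0]
            break
    if not keys:
        raise ValueError("no keys in event: this sketch needs a constraint on the source")

    label_part = "".join(f":{label}" for label in labels)
    key_part = ", ".join(f"{name}: $keys.{name}" for name in sorted(keys))
    # Match on the key properties only, then apply the full property map.
    query = f"MERGE (n{label_part} {{{key_part}}}) SET n += $properties"
    params = {"keys": keys, "properties": event["state"]["after"]["properties"]}
    return query, params


# The "event" part of the node creation event shown above, trimmed.
node_event = {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]},
    "labels": ["Person"],
    "state": {
        "before": None,
        "after": {
            "labels": ["Person"],
            "properties": {
                "email": "john.doe@example.com",
                "first_name": "John",
                "last_name": "Doe",
            },
        },
    },
}

query, params = merge_node_statement(node_event)
print(query)
```

Because the match uses only the key properties, replaying the same event updates the existing node rather than creating a second one.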

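Consider the following relationship creation event, again shown first in the source connector format and then in the Neo4j Streams format.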

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

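The relationship is persisted as follows. The sink connector uses the keys fields of the start and end nodes in the change event to create or update the relationship, again without adding any extra properties or labels.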

(:Person {last_name: "Doe", first_name: "John"})-[:KNOWS {since: "2012-01-01"}]->(:Person {last_name: "Doe", first_name: "Mary"})

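Creating the sink instance

Based on the example above, you can use one of the following configurations. Pick the example for one of the message serialization formats and save it to a local directory as a file named sink.cdc.schema.neo4j.json.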

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST https://:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.schema.neo4j.json

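You can now access your Confluent Control Center instance at https://:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

The CDC Source connector exposes JMX metrics for monitoring the state of data ingestion. For more information, see Source Monitoring.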

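Source ID sub-strategy

The Source ID strategy merges nodes and relationships by storing the elementId value of the source entity (or the id value for Neo4j Streams events) as an explicit property on the target nodes and relationships, and by marking nodes with an explicit label.

To configure this strategy, declare the list of topics from which change events are read. You can optionally specify the label name used as the marker and the property name under which the source entity's elementId (or id) value is stored.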

"neo4j.cdc.source-id.topics": "<comma-separated list of topics>"
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>"
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"

Example

Assume the topics that the sink connector subscribes to are configured as follows:

"topics": "topic.1,topic.2"

To use the cdc.source-id strategy, declare the list of topics from which change events should be consumed.

"neo4j.cdc.source-id.topics": "topic.1,topic.2"

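Each change event is then projected into a graph entity.

Consider the following node creation event, shown first in the format produced by the source connector and then in the format produced by the deprecated Neo4j Streams plugin.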

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

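The node is persisted as follows. The sink connector uses the elementId field of the node change event (or the id field for Neo4j Streams events) to create or update the node. The first result corresponds to the source connector event and the second to the Neo4j Streams event.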

(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})
(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "1004"})
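
As a rough illustration of the difference from the Schema strategy, the following Python sketch builds a statement that matches on the marker label and the stored source identifier instead of on constraint keys. The function name merge_by_source_id and the generated Cypher are assumptions for illustration only. The defaults SourceEvent and sourceId are the documented defaults of neo4j.cdc.source-id.label-name and neo4j.cdc.source-id.property-name.

```python
from typing import Any


def merge_by_source_id(
    event: dict[str, Any],
    label_name: str = "SourceEvent",
    property_name: str = "sourceId",
) -> tuple[str, dict[str, Any]]:
    """Build an illustrative MERGE statement keyed on the source elementId.

    Hypothetical helper: the real connector generates its own Cypher.
    Label and property names are assumed to be plain identifiers.
    """
    after = event["state"]["after"]
    # Match only on the marker label and the stored source identifier.
    query = f"MERGE (n:{label_name} {{{property_name}: $sourceId}}) SET n += $properties"
    if after["labels"]:
        # Re-apply the labels carried by the event on top of the marker label.
        query += " SET n" + "".join(f":{label}" for label in after["labels"])
    params = {"sourceId": event["elementId"], "properties": after["properties"]}
    return query, params


# The "event" part of the node creation event shown above, trimmed.
source_id_event = {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "labels": ["Person"],
    "state": {
        "before": None,
        "after": {
            "labels": ["Person"],
            "properties": {
                "email": "john.doe@example.com",
                "first_name": "John",
                "last_name": "Doe",
            },
        },
    },
}

query, params = merge_by_source_id(source_id_event)
print(query)
print(params["sourceId"])
```

Since the match does not depend on any constraint in the source database, this approach also works for entities that have no keys, at the cost of an extra label and property on the target.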

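Consider the following relationship creation event, again shown first in the source connector format and then in the Neo4j Streams format.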

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

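The relationship is persisted as follows. The sink connector uses the elementId fields of the start and end nodes in the change event (or their id fields for Neo4j Streams events) to create or update the relationship. The first result corresponds to the source connector event and the second to the Neo4j Streams event.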

(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})-[:KNOWS {since: "2012-01-01", sourceId: "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"})
(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "123"})-[:KNOWS {since: "2012-01-01", sourceId: "123"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "456"})

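Creating the sink instance

Based on the example above, you can use one of the following configurations. Pick the example for one of the message serialization formats and save it to a local directory as a file named sink.cdc.source-id.neo4j.json.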

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST https://:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.source-id.neo4j.json

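You can now access your Confluent Control Center instance at https://:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

The CDC Source connector exposes JMX metrics for monitoring the state of data ingestion. For more information, see Source Monitoring.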

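Batch processing of CDC events

Starting with version 5.2.0, new batched CDC sink handlers are available that provide better throughput when processing CDC events. These handlers process messages in batches and apply them to the database more efficiently.

The CDC sink handlers currently use two batching strategies: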

  1. APOC Core based strategy: uses the apoc.cypher.doIt procedure to execute batches of Cypher statements generated from CDC events.

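  2. Native strategy: generates a single Cypher statement that contains one subquery for each message in the batch, and runs it with the standard Cypher execution engine. With the native strategy, the maximum number of subqueries in a single batch is controlled by the neo4j.max-batched-queries setting, which defaults to 50.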

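The connector prefers the batching strategy based on APOC Core's apoc.cypher.doIt when it is available, and falls back to the native strategy when APOC Core is not present in the target database. The native strategy is only available from version 5.3.0 onwards.

The batch size, which applies to both batching strategies, is controlled by the neo4j.batch-size setting, which defaults to 1000.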
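
To give a feel for what the two limits mean in numbers, here is a minimal Python sketch of fixed-size chunking. How the connector combines neo4j.batch-size and neo4j.max-batched-queries internally is not described here, so the second split (one batch divided into groups of at most 50 subqueries) is purely an assumption for illustration. The helper chunked is hypothetical as well.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


BATCH_SIZE = 1000           # documented default of neo4j.batch-size
MAX_BATCHED_QUERIES = 50    # documented default of neo4j.max-batched-queries

messages = list(range(2500))  # stand-ins for 2500 CDC messages

# Split the incoming messages into batches.
batches = list(chunked(messages, BATCH_SIZE))
print([len(batch) for batch in batches])

# Assumption: with the native strategy, one batch is further split so that
# no generated statement holds more than MAX_BATCHED_QUERIES subqueries.
statement_groups = list(chunked(batches[0], MAX_BATCHED_QUERIES))
print(len(statement_groups))
```

Under these assumptions, 2500 messages produce batches of 1000, 1000, and 500 messages, and a full batch corresponds to 20 groups of 50 subqueries.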

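Exactly-once semantics

Starting with version 5.3.0, the CDC sink handlers can be configured to provide exactly-once processing guarantees by tracking the offset of the last successfully processed message in the database. This is done by storing a dedicated node that represents that offset. Use the neo4j.eos-offset-label setting to specify the label attached to the offset node. The setting is empty by default, which disables exactly-once semantics: the connector does not track processed message offsets in the target database and provides only at-least-once processing guarantees.

Make sure that a NODE KEY constraint is defined for the configured label on the 'strategy', 'topic', and 'partition' properties.

Assuming neo4j.eos-offset-label is set to __KafkaOffset, create the following constraint on the target database: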

CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS FOR (n:__KafkaOffset) REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY
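
The idea behind offset tracking can be modelled in a few lines of Python. This is an in-memory sketch only, with a hypothetical OffsetStore class: the connector keeps the offset in a node in the target database rather than in a dictionary, and the way it applies changes and updates the offset together is not shown here. The sketch illustrates why one offset per (strategy, topic, partition) combination is enough to skip redelivered messages, which is also why the node key covers exactly those three properties.

```python
from typing import Callable


class OffsetStore:
    """In-memory model of one stored offset per (strategy, topic, partition)."""

    def __init__(self) -> None:
        self._offsets: dict[tuple[str, str, int], int] = {}

    def apply(
        self,
        strategy: str,
        topic: str,
        partition: int,
        offset: int,
        action: Callable[[], None],
    ) -> bool:
        """Run `action` only if this offset has not been processed yet."""
        key = (strategy, topic, partition)
        if offset <= self._offsets.get(key, -1):
            return False  # already processed: a redelivery is skipped
        action()
        self._offsets[key] = offset
        return True


applied: list[int] = []
store = OffsetStore()

# Offset 1 is delivered twice, as can happen with at-least-once delivery.
for offset in [0, 1, 1, 2]:
    store.apply("cdc.schema", "topic.1", 0, offset, lambda o=offset: applied.append(o))

print(applied)
```

In this model the duplicate delivery of offset 1 is ignored, so each change is applied once.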