变更数据捕获 (CDC) 策略
变更数据捕获 (CDC) 策略利用了 Neo4j 和 Aura Enterprise 5 提供的变更数据捕获 (CDC) 功能。这是源连接器实例的首选策略,因为它不需要进行任何模式更改,并且能够可靠地捕获删除操作。在配置此策略的源实例之前,请务必执行变更数据捕获 > 入门中描述的必要步骤。
要配置此策略,您需要定义模式 (patterns) 和选择器,以描述您要跟踪哪些节点或关系的变更,并将它们分配给主题 (topics)。
| 当在本地安装中从备份恢复数据库,从快照恢复,或者在 Neo4j Aura 中暂停并恢复数据库时,现有的变更标识符将不再起作用,您需要从头开始重新配置您的源实例。有关更多信息,请参阅变更数据捕获 > 恢复备份和快照。 |
配置
首先,您需要为连接器实例选择 CDC 策略;
"neo4j.source-strategy": "CDC"
其次,您需要定义您的模式并将它们映射到您的主题;
"neo4j.cdc.topic.my-topic.patterns": "(:Person),(:Person)-[:KNOWS]-(:Person)"
虽然上述配置是为了方便起见而提供的,但如果您需要为变更事件定义其他过滤器(例如操作、已更改的属性名称或元数据字段),则需要使用如下所示的索引配置方法;
"neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)", (1)
"neo4j.cdc.topic.my-topic.patterns.0.operation": "create", (2)
"neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname", (3)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j", (4)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j", (5)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales", (6)
"neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
"neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
"neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
| 1 | 一种用于识别要监控变更的图实体的单一模式。 |
| 2 | 我们感兴趣的单一操作,可以是 create(创建)、update(更新)或 delete(删除)。 |
| 3 | 需要更新才能返回变更消息的属性列表。属性的添加和删除也算作更新。 |
| 4 | 执行变更的已认证用户。 |
| 5 | 执行变更的执行用户。通常与已认证用户相同,但如果使用了模拟 (impersonation),则可能不同。 |
| 6 | 需要与执行变更的事务的事务元数据进行匹配的键值对。 |
在上述示例中,只有 pattern 设置是强制性的,其他均为可选,可以根据您的要求添加。 |
精确一次语义 (Exactly Once Semantics)
从 5.2.0 版本开始,CDC 源连接器声明自己支持“精确一次”语义。对于支持 KIP-618 的环境,CDC 源连接器将利用此功能,无需任何额外配置。
|
“精确一次”语义保证每个变更事件仅准确地传递到目标 Kafka 主题一次,消除了在连接器重启或故障期间可能发生的重复记录。此功能需要支持 KIP-618 的 Kafka Connect 集群(在 Apache Kafka 3.3.0 及更高版本中提供)。 |
创建源实例
基于上述示例,您可以使用以下配置之一。选择一种消息序列化格式示例,并将其保存为名为 source.cdc.neo4j.json 的文件到本地目录。
现在,我们将通过调用以下 REST 调用来创建源实例
curl -X POST https://:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.cdc.neo4j.json
这将创建一个 Kafka Connect 源实例,该实例将使用您首选的序列化格式,将与提供的选择器匹配的变更事件消息发送到 my-topic 主题。在控制中心 (Control Center) 中,确认源连接器已在 Connect 选项卡的 connect-default 下创建。
|
CDC 源连接器提供 JMX 指标来监控数据摄取的状态。有关更多信息,请参阅 源监控。 |
模式 (Patterns)
节点模式
节点模式的定义类似于 Cypher 节点模式。
-
以
(开头。 -
[可选] 定义可选的标签列表,用
:分隔,例如:Person或:Person:Employee。 -
[可选] 用
{打开属性部分。-
[可选] 定义要用作键过滤器的属性及其值,格式为
key: value。可以定义多个属性,并且必须用,分隔。这些属性必须对应于 NODE KEY 约束属性。 -
可以是;
-
为空或
*,意味着将 JSON 对象中的所有属性分配给节点。 -
属性名称列表,用
,分隔,用于从 JSON 对象分配给节点。 -
不分配给节点的属性名称列表,每个名称前加上
-并用,分隔,JSON 对象中的所有其他属性都将分配给节点。
-
-
用
}关闭属性部分。
-
-
以
)结尾。
| 您不能混合使用包含和排除属性,因此您的模式必须仅包含所有排除属性或仅包含所有包含属性。 |
示例
-
选择任意节点上的所有变更。
() -
选择带有
:User标签的节点上的所有变更。(:User) -
选择同时带有
:User和:Employee标签的节点上的所有变更。(:User:Employee) -
选择带有
:User标签的节点上的所有变更,并且仅在变更事件中包含name和surname属性。(:User{name, surname}) -
选择带有
:User标签的节点上的所有变更,并在变更事件中排除adress和dob属性。(:User{-address, -dob}) -
选择带有
:User标签且键属性userId等于1001的节点上的所有变更,并在变更事件中包含name和surname属性。(:User{userId: 1001, name, surname})此示例要求 :User标签的userId属性上存在 NODE KEY 约束。 -
选择同时带有
:User和Employee标签,且键属性name等于john且surname等于doe的节点上的所有变更。(:User:Employee{name: 'john', surname: 'doe'})此示例要求 :User或:Employee标签(或两者)的name和surname属性上存在 NODE KEY 约束。
关系模式
关系模式的定义类似于 Cypher 关系模式。
-
起始节点的节点模式,不包含任何属性包含或排除列表。
-
-[. -
定义关系类型,前面加上
:,例如:BOUGHT或:KNOWS。 -
[可选] 用
{打开属性部分。-
[可选] 定义要用作键过滤器的属性及其值,格式为
key: value。 -
可以是;
-
为空或
*,意味着将 JSON 对象中的所有属性分配给节点。 -
属性名称列表,用
,分隔,用于从 JSON 对象分配给节点。 -
不分配给节点的属性名称列表,每个名称前加上
-并用,分隔,JSON 对象中的所有其他属性都将分配给节点。
-
-
用
}关闭属性部分。
-
-
]->. -
结束节点的节点模式,不包含任何属性包含或排除列表。
| 您不能混合使用包含和排除,因此您的模式必须包含所有排除或包含属性。 |
示例
-
选择
:BOUGHT关系上的所有变更。()-[:BOUGHT]->() -
选择
:BOUGHT关系上的所有变更,且起始节点标签为:User,结束节点标签为:Product。(:User)-[:BOUGHT]->(:Product) -
选择
:BOUGHT关系上的所有变更,且起始节点标签为:User和:Employee,结束节点标签为:Product。(:User:Employee)-[:BOUGHT]->(:Product) -
选择
:BOUGHT关系上的所有变更,起始节点标签为:User,结束节点标签为:Product,并且仅在变更事件中包含price和currency属性。(:User)-[:BOUGHT{price, currency}]->(:Product) -
选择
:BOUGHT关系上的所有变更,起始节点标签为:User,结束节点标签为:Product,并在变更事件中排除card属性。(:User)-[:BOUGHT{-card}]->(:Product) -
选择由键属性
contractId等于5910标识的:WORKS_FOR关系上的所有变更。()-[:WORKS_FOR{contractId: 5910}]->()此示例要求 :WORKS_FOR关系类型的contractId属性上存在 RELATIONSHIP KEY 约束。 -
选择由键属性
contractId等于5910标识的:WORKS_FOR关系上的所有变更,并在变更事件中排除salary属性。()-[:WORKS_FOR{contractId: 5910,-salary}]->() -
选择从标签为
:User且键属性userId等于1001的节点开始的关系上的所有变更。(:User{userId: 1001})-[]->()