CUD 文件格式策略
CUD 文件格式是一种 JSON 文件,用于表示对节点或关系执行的操作(Create 创建、Update 更新、Delete 删除)。
配置
您可以按以下方式配置主题
"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"
例如,如果您有两个主题 topic.1 和 topic.2,并以 CUD 格式的消息发布,您可以按如下方式配置 Sink 实例。
"neo4j.cud.topics": "topic.1,topic.2"
创建 Sink 实例
基于上述示例,您可以使用以下配置之一。选择其中一个消息序列化格式示例,并将其保存为名为 sink.cud.neo4j.json 的文件放入本地目录中。
通过此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST https://:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cud.neo4j.json
现在,您可以访问位于 https://:9021/clusters 的 Confluent Control Center 实例。验证配置的连接器实例是否在 connect-default 下的 Connect 选项卡中运行。
|
CDC Source 连接器提供 JMX 指标来监控数据摄取的状态。有关更多信息,请参阅 Source Monitoring。 |
CUD 格式规范
我们有两种格式
-
节点 -
关系
Node
节点 (Node) 格式定义了一组描述操作(create、update、merge 和 delete)、标签以及节点实体属性的字段。
| 字段 | 必填 | 描述 | ||
|---|---|---|---|---|
op |
必填 |
操作类型。可以是
|
||
属性 |
当操作为 |
附加到节点的属性。 |
||
ID (ids) |
当操作为 |
包含用于查找实体的主键/唯一键属性。如果您使用
|
||
标签 |
可选 |
附加到节点的标签。
|
||
type |
必填 |
实体类型: |
||
detach |
可选 |
当操作为
|
示例
-
CREATE操作示例;{ "type": "node", "op": "create", "labels": ["Foo", "Bar"], "properties": { "id": 1, "foo": "foo-value" } }它将被转换为以下 Cypher 查询
CREATE (n:Foo:Bar) SET n = $properties -
UPDATE操作示例;{ "type": "node", "op": "update", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }它将被转换为以下 Cypher 查询
MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties -
MERGE操作示例;{ "type": "node", "op": "merge", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }它将被转换为以下 Cypher 查询
MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties -
DELETE操作示例;{ "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 } }它将被转换为以下 Cypher 查询
MATCH (n:Foo:Bar {id: $ids.id}) DELETE n -
detach为true的DELETE操作示例;{ "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "detach": true }它将被转换为以下 Cypher 查询
MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
Relationship
关系 (Relationship) 格式定义了一组描述操作(create、update、merge 和 delete)、关系类型、源节点和目标节点引用以及属性的字段。
| 字段 | 必填 | 描述 | ||
|---|---|---|---|---|
op |
必填 |
操作类型。可以是 |
||
属性 |
当操作为 |
附加到关系的属性。 |
||
rel_type |
必填 |
关系类型。 |
||
ID (ids) |
可选 |
包含用于查找关系的主键/唯一键属性。 |
||
从 (from) |
必填 |
包含有关关系源节点的信息。
|
||
转换为 |
必填 |
包含有关关系目标节点的信息。
|
||
type |
必填 |
实体类型: |
示例
-
CREATE操作示例;{ "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end CREATE (start)-[r:RELATED_TO]->(end) SET r = $properties -
带有源节点
merge操作的CREATE示例;{ "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 }, "op": "merge" }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "match" }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MERGE (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end CREATE (start)-[r:RELATED_TO]->(end) SET r = $properties -
UPDATE操作示例;{ "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO]->(end) SET r += $properties -
带有
relationship ids的UPDATE操作示例;{ "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end) SET r += $properties -
MERGE操作示例;{ "type": "relationship", "op": "merge", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MERGE (start)-[r:RELATED_TO]->(end) SET r += $properties -
带有
relationship ids的MERGE操作示例;{ "type": "relationship", "op": "MERGE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: to.ids.id}) WITH start, end MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end) SET r += $properties -
DELETE操作示例;{ "type": "relationship", "op": "delete", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO]->(end) DELETE r -
带有
relationship ids的DELETE操作示例;{ "type": "relationship", "op": "DELETE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "ids": { "id": 5 } }它将被转换为以下 Cypher 查询
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end) DELETE r