模式策略
此策略允许您通过提取模式从 Kafka 消息中提取节点和关系。
配置
要为目标主题配置模式策略,必须遵循以下约定
"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"
例如,给定发布到 user 和 lives_in 主题的以下消息
{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}
您可以通过提供以下配置将其转换为节点
"neo4j.pattern.topic.user": "(:User{!id: userId})"
您还可以通过提供以下配置来提供关系模式,将其转换为路径,例如 (n)-[r]→(m)
"neo4j.pattern.topic.lives_in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
| 关系键属性仅适用于 Neo4j Enterprise Edition 5.7 及更高版本以及 AuraDB 5。 |
创建 Sink 实例
基于上述示例,您可以使用以下配置之一。选择一个消息序列化格式示例,并将其保存为名为 sink.pattern.neo4j.json 的文件到本地目录中。
通过此 REST 调用将配置加载到 Kafka Connect 中
curl -X POST https://:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.pattern.neo4j.json
现在,您可以访问位于 https://:9021/clusters 的 Confluent Control Center 实例。验证配置的连接器实例是否在 connect-default 下的 Connect 选项卡中运行。
|
CDC Source 连接器提供 JMX 指标来监控数据摄取的状态。有关更多信息,请参阅 Source Monitoring。 |
模式
节点模式
节点模式的定义类似于 Cypher 节点模式。
-
以
(开头。 -
定义可选的标签列表,用
:分隔,例如:Person或:Person:Employee。 -
用
{打开属性部分。 -
定义要视为键的属性,每个属性前加上
!,至少需要提供一个键属性。可以显式引用单个消息字段并将其分配给用户定义的属性,格式为userId: __key.user.id。默认情况下,消息字段可以引用为__timestamp、__headers、__key和__value。 -
二选一:
-
无内容或
*,意味着将消息中的所有属性分配给节点。 -
要从消息分配给节点的属性名称列表。可以显式引用单个消息字段并将其分配给用户定义的属性,格式为
userName: __value.user.name。默认情况下,消息字段可以引用为__timestamp、__headers、__key和__value。 -
不分配给节点的属性名称列表,每个名称前加上
-,消息中的所有其他属性都将分配给节点。
-
-
用
}关闭属性部分。 -
以
)结尾。
| 模式中不能混合使用包含和排除,即您的模式必须仅包含排除属性或仅包含包含属性。 |
示例
-
对
User标签执行MERGE操作,将userId视为键,并将传入消息值中的所有属性分配给节点(:User{!userId})或
(:User{!userId, *}) -
对
User标签执行MERGE操作,将userId视为键,并**仅**将传入消息中的surname属性分配给节点(:User{!userId, surname}) -
对
User标签执行MERGE操作,将userId视为键,并**仅**将传入消息中的surname和address.city属性分配给节点(:User{!userId, surname, city: address.city}) -
对
User标签执行MERGE操作,将userId视为键,并将除address属性外的所有传入消息属性分配给节点(:User{!userId, -address}) -
对
User标签执行MERGE操作,将userId视为从消息键部分获取的键,并**仅**将传入消息值部分中的name和surname属性分配给节点(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName}) -
对
User标签执行MERGE操作,将userId视为从传入消息的键部分获取的键,并**仅**将值部分的name、surname属性,标头中的createdBy属性,以及作为当前时间戳的createdAt属性分配给节点(:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
关系模式
关系模式的定义类似于 Cypher 关系模式。
-
起始节点的节点模式。
-
-[ -
定义关系类型,前缀为
:,例如:BOUGHT或:KNOWS。 -
用
{打开属性部分。 -
[可选] 定义要视为键的属性,每个属性前加上
!,至少需要提供一个键属性。可以显式引用单个消息字段并将其分配给用户定义的属性,格式为relationshipId: __key.relationship.id。默认情况下,消息字段可以引用为__timestamp、__headers、__key和__value。 -
二选一:
-
无内容或
*,意味着将消息中的所有属性分配给关系。 -
要从消息分配给关系的属性名称列表。可以显式引用单个消息字段并将其分配给用户定义的属性,格式为
relationshipType: __value.relationship.type。默认情况下,消息字段可以引用为__timestamp、__headers、__key和__value。 -
不分配给关系的属性名称列表,每个名称前加上
-,消息中的所有其他属性都将分配给关系。
-
-
用
}关闭属性部分。 -
]-> -
结束节点的节点模式。
| 模式中不能混合使用包含和排除,即您的模式必须包含所有排除属性或所有包含属性。 |
示例
-
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,并在它们之间MERGE一个BOUGHT关系,并将所有其他消息属性分配给该关系(:User{!userId})-[:BOUGHT]->(:Product{!productId}) -
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,并在它们之间MERGE一个BOUGHT关系,**仅**将传入消息中的price和currency属性分配给该关系(:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId}) -
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,并在它们之间MERGE一个BOUGHT关系,**仅**将传入消息中的price、currency和shippingAddress.city属性分配给该关系(:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId}) -
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,并在它们之间MERGE一个BOUGHT关系,分配除shippingAddress外的**所有**传入消息属性给该关系(:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId}) -
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,将消息中的userFirstName和userLastName属性分配给User节点,并在它们之间MERGE一个BOUGHT关系,**仅**将消息中的price和currency属性分配给该关系(:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId}) -
对
User和Product标签执行MERGE操作,分别将userId和productId视为键,并在它们之间MERGE一个BOUGHT关系,**仅**将消息键部分中的transactionId属性视为键属性,并将消息值部分中的date分配给该关系(:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})
墓碑记录 (Tombstone records)
模式策略支持 墓碑记录。要使用此功能,消息键应至少包含所提供模式中存在的键属性,且消息值应设置为 null。
| 无法为单个主题定义多个模式,例如无法从单条消息中提取多个节点或关系类型。要实现此目的,必须为每个模式使用不同的主题。 |