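Query strategy

The query strategy lets you define your own Cypher query to extract changes. It requires suitable schema modifications, such as tracking changes through a dedicated change-tracking property (for example, a timestamp) on nodes or relationships, or using soft deletes to record entity deletions.

Configuration

First, select the QUERY strategy for your connector instance: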

"neo4j.source-strategy": "QUERY"

Second, define the query used to track changes and the topic to publish to.

"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
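1 The name of the topic that receives the messages.
2 A Cypher query that returns the entities changed since the previous iteration; the cursor value from that iteration is supplied through the $lastCheck parameter.
3 The property (field name) used as a cursor to track changes. This property must be included in the returned results.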
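
The cursor behaviour described in the callouts can be illustrated with a small simulation. This is only a sketch in Python, not the connector's implementation: the `poll` function and the in-memory `rows` list are hypothetical stand-ins for the Cypher query and the graph, and the sketch assumes that the cursor advances to the largest `timestamp` value returned.

```python
from __future__ import annotations

from typing import Any


def poll(rows: list[dict[str, Any]], last_check: int,
         streaming_property: str = "timestamp") -> tuple[list[dict[str, Any]], int]:
    """Return the rows newer than last_check and the advanced cursor value."""
    # Mirrors the filter in the example query: WHERE ts.timestamp > $lastCheck
    changed = [r for r in rows if r[streaming_property] > last_check]
    # Assumption: the cursor moves to the highest value seen, or stays put if nothing is new
    new_cursor = max((r[streaming_property] for r in changed), default=last_check)
    return changed, new_cursor


# Made-up sample data standing in for TestSource nodes
rows = [
    {"name": "Ada", "surname": "Lovelace", "timestamp": 100},
    {"name": "Alan", "surname": "Turing", "timestamp": 200},
]
first_batch, cursor = poll(rows, last_check=150)
second_batch, cursor_after = poll(rows, last_check=cursor)
```

In this model the comparison is strict, so a row written later with a timestamp equal to the current cursor would not be returned by the next poll.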

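For good source query performance, the property used for tracking (for example, the timestamp field) should have the correct type constraint and, where appropriate, be backed by an index or a key constraint.

If the connector filters or sorts by this property, a missing index forces a full scan, which slows polling significantly, especially as the dataset grows. Combining a correctly typed tracking property with an index (or a constraint that creates one) helps Neo4j locate new or updated records more efficiently.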
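
As an illustration, an index on the tracking property from the example query could be created with a Cypher statement like the one built below. The helper `index_statement` and the index naming convention are hypothetical, the `CREATE INDEX ... IF NOT EXISTS FOR ... ON ...` form assumes a recent Neo4j version, and the statement still has to be run against the database separately (for example through cypher-shell).

```python
def index_statement(label: str, prop: str) -> str:
    """Build a Cypher statement that indexes `prop` on nodes labelled `label`."""
    # Hypothetical naming convention: <label>_<property>, lower-cased
    name = f"{label}_{prop}".lower()
    # Label and property are interpolated as-is, so pass trusted identifiers only
    return f"CREATE INDEX {name} IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"


statement = index_statement("TestSource", "timestamp")
print(statement)
```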

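Creating the source instance

Based on the example above, you can use one of the following configurations. Pick the example for your message serialization format and save it to a local directory as a file named source.query.neo4j.json.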

Avro:

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

JSON Schema:

{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

Protobuf:

{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

JSON with embedded schema (schemas.enable set to true):

{
  "name": "Neo4jSourceConnectorJSONString",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

JSON without schema (schemas.enable set to false):

{
  "name": "Neo4jSourceConnectorJSONString",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}

Now create the source instance with the following REST call, supplying the host of your Kafka Connect worker in the URL:

curl -X POST https://:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.query.neo4j.json

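This creates a Kafka Connect source instance that sends the change event messages produced by the provided query to the test-source topic, using your preferred serialization format. In Control Center, confirm under the Connect tab that the source connector has been created in connect-default.

The change event messages generated in this case have the following structure, with each field wrapped in a dedicated, type-safe structure: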

{
  "name": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "<name>",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "surname": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "<surname>",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "timestamp": {
    "type": "I64",
    "B": null,
    "I64": <timestamp>,
    "F64": null,
    "S": null,
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  }
}
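
On the consumer side, the wrapped form can be reduced to plain values by reading the slot named in each field's `type` entry, as the example above suggests (`"type": "S"` points at the `S` slot). The sketch below assumes the message has already been deserialized into a Python dict. The `unwrap` function and the sample values are hypothetical, and values of temporal, spatial, or list types may need further conversion that is not shown here.

```python
from __future__ import annotations

from typing import Any


def unwrap(message: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Replace each typed wrapper with the value stored under its active slot."""
    return {field: wrapper[wrapper["type"]] for field, wrapper in message.items()}


# Abbreviated sample: most of the null slots from the full structure are omitted
sample = {
    "name": {"type": "S", "S": "Ada", "I64": None},
    "surname": {"type": "S", "S": "Lovelace", "I64": None},
    "timestamp": {"type": "I64", "S": None, "I64": 1700000000000},
}
plain = unwrap(sample)
```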

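Although the serialized form above is forward compatible with respect to property type changes, it can make deserialization logic on the consumer side rather complex, which may be undesirable. In that case, you can set neo4j.payload-mode to COMPACT so that change event messages take the following structure instead: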

{
  "name": "<name>",
  "surname": "<surname>",
  "timestamp": <timestamp>
}

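If your Cypher query produces complex data structures, you can also use the RAW_JSON_STRING payload mode, which generates a raw JSON string representation of the data without any schema compatibility guarantees. Although the resulting message looks exactly the same as in COMPACT mode, it is encoded as a STRING type.

See the payload mode page for more information about the neo4j.payload-mode setting and its limitations.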
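
Because the value arrives as a string in this mode, a consumer has to parse it itself. A minimal sketch, assuming the string value has already been read from the topic (the `raw_value` sample is made up for illustration):

```python
import json

# Hypothetical message value as it might arrive in RAW_JSON_STRING mode
raw_value = '{"name": "Ada", "surname": "Lovelace", "timestamp": 1700000000000}'

# Parse the string into a regular dict; no schema is enforced at this point
record = json.loads(raw_value)
print(record["timestamp"])
```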