CUD File Format Strategy

The CUD file format is a JSON format that represents operations (Create, Update, Delete) performed on nodes or relationships.

Configuration

You can configure the topics as follows:

"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

For example, if you have two topics, topic.1 and topic.2, whose messages are published in the CUD format, you can configure the sink instance as follows.

"neo4j.cud.topics": "topic.1,topic.2"
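As a small illustration, the comma-separated value can be assembled from a list of topic names. This is only a sketch: the helper name `cud_topics_setting` is hypothetical and is not part of the connector.

```python
def cud_topics_setting(topics):
    """Build the neo4j.cud.topics entry from a list of topic names (hypothetical helper)."""
    cleaned = [t.strip() for t in topics if t.strip()]
    if not cleaned:
        raise ValueError("at least one topic is required")
    for t in cleaned:
        # A comma inside a name would be read as a separator.
        if "," in t:
            raise ValueError(f"topic name must not contain a comma: {t!r}")
    return {"neo4j.cud.topics": ",".join(cleaned)}


print(cud_topics_setting(["topic.1", "topic.2"]))
# {'neo4j.cud.topics': 'topic.1,topic.2'}
```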

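Creating the Sink instance

Based on the example above, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named sink.cud.neo4j.json in a local directory.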

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST https://:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cud.neo4j.json
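The same request can be prepared from Python with the standard library. This is a minimal sketch, not an official client: the host in `CONNECT_URL` is an assumption (the address above does not name one), and `build_connector_request` is a hypothetical helper that only builds the request so it can be inspected before it is sent.

```python
import json
import urllib.request

# Assumption: Kafka Connect listens on this address. "localhost" is a placeholder;
# replace it with the host of your own Kafka Connect worker.
CONNECT_URL = "http://localhost:8083/connectors"


def build_connector_request(config_path, url=CONNECT_URL):
    """Prepare (but do not send) the POST request that registers the connector."""
    with open(config_path, "rb") as f:
        body = f.read()
    json.loads(body)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


# To actually submit the configuration:
# with urllib.request.urlopen(build_connector_request("sink.cud.neo4j.json")) as resp:
#     print(resp.status, resp.read().decode())
```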

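Now you can access your Confluent Control Center instance at https://:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

The CDC Source connector provides JMX metrics for monitoring the state of data ingestion. For more information, see Source Monitoring.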

CUD format specification

There are two formats:

  • Node

  • Relationship

Node

The node format defines a set of fields that describe the operation (create, update, merge, delete), the labels, and the properties of a node entity.

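Table 1. Node format fields

| Field | Mandatory | Description |
|---|---|---|
| op | Mandatory | The operation type. One of create, merge, update, delete. Note: the delete operation applies to individual nodes only. It is not intended for running generic Cypher queries from JSON. |
| properties | Optional when the operation is delete | The properties attached to the node. |
| ids | Optional when the operation is create | Contains the primary/unique key properties used to look up the entity. If you use _elementId (or _id) as the property name, the node lookup uses Neo4j's internal element ID. Note: if you use _elementId (or _id) in a merge operation, it behaves as a simple update, that is, if no node with the given element ID exists, it is not created. |
| labels | Optional | The labels attached to the node. Note: although Neo4j allows creating nodes without labels, this is not recommended from a performance perspective. |
| type | Mandatory | The entity type: node |
| detach | Optional | When the operation is delete, you can specify whether to perform a "detach" delete. Defaults to false if no value is provided. |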
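The field rules for node messages can be sketched as a small pre-flight check to run before publishing to Kafka. This is an illustration under assumptions, not the connector's own validation: `validate_node_message` is a hypothetical helper, and the case-insensitive comparison of `type` and `op` is an assumption based on the examples in this document, which use both lower-case and upper-case values.

```python
NODE_OPS = {"create", "merge", "update", "delete"}


def validate_node_message(msg):
    """Return a list of problems found in a node CUD message (empty if none)."""
    problems = []
    # Assumption: values are compared case-insensitively.
    if str(msg.get("type", "")).lower() != "node":
        problems.append("type must be 'node'")
    op = str(msg.get("op", "")).lower()
    if op not in NODE_OPS:
        problems.append("op must be one of create, merge, update, delete")
    # properties may be left out only for delete.
    if op != "delete" and not isinstance(msg.get("properties"), dict):
        problems.append("properties is required unless op is delete")
    # ids may be left out only for create.
    if op != "create" and not msg.get("ids"):
        problems.append("ids is required unless op is create")
    if "detach" in msg and not isinstance(msg["detach"], bool):
        problems.append("detach must be a boolean")
    return problems


print(validate_node_message({"type": "node", "op": "update", "properties": {"foo": "bar"}}))
# ['ids is required unless op is create']
```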

Examples

  • An example of a CREATE operation:

    {
      "type": "node",
      "op": "create",
      "labels": ["Foo", "Bar"],
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    It is translated into the following Cypher query:

    CREATE (n:Foo:Bar) SET n = $properties

  • An example of an UPDATE operation:

    {
      "type": "node",
      "op": "update",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    It is translated into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties

  • An example of a MERGE operation:

    {
      "type": "node",
      "op": "merge",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    It is translated into the following Cypher query:

    MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties

  • An example of a DELETE operation:

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      }
    }

    It is translated into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DELETE n

  • An example of a DELETE operation with detach set to true:

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "detach": true
    }

    It is translated into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
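The mapping from node messages to Cypher shown in the examples above can be mimicked with a short function. This is a sketch that reproduces only the query shapes listed here; it is not the connector's implementation, `node_to_cypher` and `_pattern` are hypothetical names, and the sketch neither escapes label names nor handles _elementId lookups.

```python
def _pattern(var, labels, ids, param):
    """Render e.g. (n:Foo:Bar {id: $ids.id}) from labels and key names."""
    label_part = "".join(f":{label}" for label in labels)
    if ids:
        keys = ", ".join(f"{k}: ${param}.{k}" for k in ids)
        return f"({var}{label_part} {{{keys}}})"
    return f"({var}{label_part})"


def node_to_cypher(msg):
    """Return the Cypher text matching the node examples (illustration only)."""
    op = msg["op"].lower()
    labels = msg.get("labels", [])
    if op == "create":
        return f"CREATE {_pattern('n', labels, None, 'ids')} SET n = $properties"
    match = _pattern("n", labels, msg["ids"], "ids")
    if op == "update":
        return f"MATCH {match} SET n += $properties"
    if op == "merge":
        return f"MERGE {match} SET n += $properties"
    if op == "delete":
        # detach defaults to false when it is not provided.
        keyword = "DETACH DELETE" if msg.get("detach", False) else "DELETE"
        return f"MATCH {match} {keyword} n"
    raise ValueError(f"unsupported op: {msg['op']}")


print(node_to_cypher({"type": "node", "op": "delete", "labels": ["Foo", "Bar"],
                      "ids": {"id": 0}, "detach": True}))
# MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
```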

Relationship

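The relationship format defines a set of fields that describe the operation (create, update, merge, delete), the relationship type, the source and target node references, and the properties.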

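Table 2. Relationship format fields

| Field | Mandatory | Description |
|---|---|---|
| op | Mandatory | The operation type. One of create, merge, update, delete. |
| properties | Optional when the operation is delete | The properties attached to the relationship. |
| rel_type | Mandatory | The relationship type. |
| ids | Optional | Contains the primary/unique key properties used to look up the relationship. |
| from | Mandatory | Contains information about the source node of the relationship. Its op field can only be merge or match, and defaults to match. For a description of the ids and labels fields, see the node fields described above. Note: if you use an _elementId (or _id) field reference in ids, you can omit the labels field. |
| to | Mandatory | Contains information about the target node of the relationship. Its op field can only be merge or match, and defaults to match. For a description of the ids and labels fields, see the node fields described above. Note: if you use an _elementId (or _id) field reference in ids, you can omit the labels field. |
| type | Mandatory | The entity type: relationship |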
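The relationship field rules can be checked in the same spirit before a message is published. This is a hedged sketch, not the connector's validation logic: `validate_relationship_message` is a hypothetical helper, values are compared case-insensitively as an assumption, and only the rules stated in the table are checked.

```python
REL_OPS = {"create", "merge", "update", "delete"}


def validate_relationship_message(msg):
    """Return a list of problems found in a relationship CUD message."""
    problems = []
    if str(msg.get("type", "")).lower() != "relationship":
        problems.append("type must be 'relationship'")
    op = str(msg.get("op", "")).lower()
    if op not in REL_OPS:
        problems.append("op must be one of create, merge, update, delete")
    if not msg.get("rel_type"):
        problems.append("rel_type is required")
    # properties may be left out only for delete.
    if op != "delete" and not isinstance(msg.get("properties"), dict):
        problems.append("properties is required unless op is delete")
    for side in ("from", "to"):
        ref = msg.get(side)
        if not isinstance(ref, dict):
            problems.append(f"{side} is required")
            continue
        # A missing op on the node reference defaults to match.
        if str(ref.get("op", "match")).lower() not in ("match", "merge"):
            problems.append(f"{side}.op must be merge or match")
    return problems


print(validate_relationship_message({
    "type": "relationship", "op": "delete", "rel_type": "RELATED_TO",
    "from": {"labels": ["Foo"], "ids": {"id": 0}, "op": "create"},
    "to": {"labels": ["Bar"], "ids": {"id": 1}},
}))
# ['from.op must be merge or match']
```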

Examples

  • An example of a CREATE operation:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties

  • An example of a CREATE operation with a merge operation on the source node:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        },
        "op": "merge"
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "match"
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MERGE (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties

  • An example of an UPDATE operation:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    SET r += $properties

  • An example of an UPDATE operation with relationship ids:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties

  • An example of a MERGE operation:

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO]->(end)
    SET r += $properties

  • An example of a MERGE operation with relationship ids:

    {
      "type": "relationship",
      "op": "MERGE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties

  • An example of a DELETE operation:

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    DELETE r

  • An example of a DELETE operation with relationship ids:

    {
      "type": "relationship",
      "op": "DELETE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "ids": {
        "id": 5
      }
    }

    It is translated into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    DELETE r
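The relationship examples above follow one pattern: a clause for the source node, a clause for the target node, then a clause for the relationship itself. The sketch below reproduces that pattern for illustration only. It is not the connector's implementation, `relationship_to_cypher` and `_node_clause` are hypothetical names, and because no example above combines a create operation with relationship ids, the sketch leaves ids out of the pattern in that case (an assumption).

```python
def _node_clause(var, ref, param):
    """Render the MATCH or MERGE clause for a from/to node reference."""
    keyword = "MERGE" if ref.get("op", "match").lower() == "merge" else "MATCH"
    labels = "".join(f":{label}" for label in ref.get("labels", []))
    keys = ", ".join(f"{k}: ${param}.ids.{k}" for k in ref["ids"])
    return f"{keyword} ({var}{labels} {{{keys}}})"


def relationship_to_cypher(msg):
    """Return the Cypher text matching the relationship examples (illustration only)."""
    op = msg["op"].lower()
    rel = f"r:{msg['rel_type']}"
    # Relationship ids narrow the pattern for update, merge and delete.
    if msg.get("ids") and op != "create":
        keys = ", ".join(f"{k}: $ids.{k}" for k in msg["ids"])
        rel += f" {{{keys}}}"
    keyword, tail = {
        "create": ("CREATE", "SET r = $properties"),
        "update": ("MATCH", "SET r += $properties"),
        "merge": ("MERGE", "SET r += $properties"),
        "delete": ("MATCH", "DELETE r"),
    }[op]
    return "\n".join([
        _node_clause("start", msg["from"], "from") + " WITH start",
        _node_clause("end", msg["to"], "to") + " WITH start, end",
        f"{keyword} (start)-[{rel}]->(end)",
        tail,
    ])


print(relationship_to_cypher({
    "type": "relationship", "op": "delete", "rel_type": "RELATED_TO",
    "from": {"labels": ["Foo"], "ids": {"id": 0}},
    "to": {"labels": ["Bar"], "ids": {"id": 1}},
    "ids": {"id": 5},
}))
```

Run against the last example message above, the function is meant to yield the same four lines as the query shown for it.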