创建作业规范文件
作业配置文件指导 Dataflow 如何运行导入(数据来源、如何映射到 Neo4j 等)。它由一个包含四个部分的 JSON 对象组成。
{
"version": "1",
"config": { ... }, (1)
"sources": [ (2)
{ ... }
],
"targets": [ (3)
{ ... }
],
"actions": [ (4)
{ ... }
]
}
| 1 | config — 影响导入执行方式的全局标志(可选) |
| 2 | sources — 数据源定义(关系型) |
| 3 | targets — 数据目标定义(图:节点/关系/Cypher 查询) |
| 4 | actions — 一次性操作(可选) |
从宏观上看,作业从 sources 获取数据,并将其转换/导入到 targets 中。
有效的规范文件必须至少包含一个源对象和一个目标对象。
完整示例
以下是一个可直接运行的作业规范文件示例,用于导入公开的 movies 数据集。
该数据集包含 Person 和 Movie 实体,通过 DIRECTED 和 ACTED_IN 关系进行连接。换句话说,每个 Person 可能 DIRECTED(导演了)和/或 ACTED_IN(参演了)某部 Movie。实体和关系都附加了额外的详细信息。数据源自以下文件: persons.csv, movies.csv, acted_in.csv, directed.csv。
接下来的章节将对其进行分解,并提供各部分的详细信息。我们建议结合作业规范示例阅读本指南。
{
"version": "1",
"config": {
"reset_db": true
},
"sources": [
{
"type": "bigquery",
"name": "persons",
"query": "SELECT person_tmdbId, name, bornIn, born, died FROM team-connectors-dev.movies.persons"
},
{
"type": "bigquery",
"name": "movies",
"query": "SELECT movieId, title, imdbRating, year FROM team-connectors-dev.movies.movies"
},
{
"type": "bigquery",
"name": "directed",
"query": "SELECT movieId, person_tmdbId FROM team-connectors-dev.movies.directed"
},
{
"type": "bigquery",
"name": "acted_in",
"query": "SELECT movieId, person_tmdbId, role FROM team-connectors-dev.movies.acted_in"
}
],
"targets": {
"nodes": [
{
"source": "persons",
"name": "Persons",
"write_mode": "merge",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
},
{
"source": "movies",
"name": "Movies",
"write_mode": "merge",
"labels": [ "Movie" ],
"properties": [
{
"source_field": "movieId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "title",
"target_property": "title",
"target_property_type": "string"
},
{
"source_field": "year",
"target_property": "releaseYear",
"target_property_type": "string"
},
{
"source_field": "imdbRating",
"target_property": "imdbRating",
"target_property_type": "float"
}
],
"schema": {
"key_constraints": [
{
"name": "movieIdKey",
"label": "Movie",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "movieTitleUnique",
"label": "Movie",
"properties": ["title"]
}
]
}
}
],
"relationships": [
{
"source": "directed",
"name": "Directed",
"write_mode": "merge",
"node_match_mode": "match",
"type": "DIRECTED",
"start_node_reference": "Persons",
"end_node_reference": "Movies"
},
{
"source": "acted_in",
"name": "Acted_in",
"write_mode": "merge",
"node_match_mode": "match",
"type": "ACTED_IN",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
]
}
}
配置
config 对象包含导入作业的全局配置。所有设置都有默认值,因此除非您希望更改它们,否则无需指定。
"config": {
"reset_db": false,
"index_all_properties": false,
"node_target_batch_size": 5000,
"relationship_target_batch_size": 1000,
"query_target_batch_size": 1000,
"node_target_parallelism": 10,
"relationship_target_parallelism": 1,
"query_target_parallelism": 1
}
-
reset_db(bool) — 是否在导入前清空目标数据库。会删除数据、索引和约束。 -
index_all_properties(bool) — 是否为所有属性创建索引。请参阅 Cypher® → 搜索性能索引。 -
node_target_batch_size(int) — 每个节点目标导入事务处理的行数。 -
relationship_target_batch_size(int) — 每个关系目标事务处理的行数。 -
query_target_batch_size(int) — 每个自定义查询事务处理的行数。 -
node_target_parallelism(int) — 每个工作节点(worker)节点目标的最大并发事务数。 -
relationship_target_parallelism(int) — 每个工作节点关系目标的最大并发事务数。设置大于1的值时应谨慎,因为它们可能会导致死锁。 -
query_target_parallelism(int) — 每个工作节点 Cypher 查询目标的最大并发事务数。设置大于1的值时应谨慎,因为它们可能会导致死锁。
源 (Sources)
sources 部分以列表形式包含数据源的定义。粗略来说,您可以理解为 一张表 <=> 一个源。导入程序将获取源呈现的数据并将其提供给目标,目标最终将其映射到 Neo4j。
{
"type": "bigquery",
"name": "<sourceName>",
"query": "<bigQuerySqlQuery>",
"query_temp_project": "<projectName>",
"query_temp_dataset": "<datasetName>"
}
-
type(string) —bigquery。 -
name(string) — 源的人性化标签(在所有名称中唯一;不能包含空格)。您将在规范文件的其他部分通过此名称引用该源。 -
query(string) — 要从 BigQuery 提取的数据集,以 SQL 查询形式表示。请注意:-
源表中的列数可以多于您在查询中选择的列数;
-
多个目标可以使用同一个源,并可能针对不同的列子集进行过滤。
-
-
query_temp_project(string, 可选) 用于存储临时查询结果的 Google Cloud 项目(默认为当前项目)。 -
query_temp_dataset(string, 可选) 用于存储临时查询结果的 BigQuery 数据集(默认为一个新的临时数据集)。
|
在您仅拥有源所在项目/数据集的“读取”权限时,指定临时项目和/或数据集非常有用。 如果您设置了 |
不支持 BIGNUMERIC, GEOGRAPHY, JSON, INTERVAL 和 STRUCT 类型的列。 |
目标 (Targets)
targets 部分包含导入后将生成的图实体的定义。
您必须指定 至少 一个目标对象。
Neo4j 使用 节点(例如 movies, people)表示对象,并使用 关系(例如 ACTED_IN, DIRECTED)连接它们。targets 部分中的每个对象都会从源中提取数据,并在 Neo4j 中生成相应的实体(节点或关系)。也可以运行自定义 Cypher 查询。
"targets": {
"nodes": [ ... ],
"relationships": [ ... ],
"queries": [ ... ]
}
默认情况下,您无需考虑节点和关系之间的依赖关系。关系目标始终在与其起始节点和结束节点对应的目标之后处理。不过,也可以将其他目标添加为依赖项。
节点对象
节点实体必须在 targets 对象内的 nodes 键下进行分组。
"targets": {
"nodes": [
{ <nodeSpec1> },
{ <nodeSpec2> },
...
]
}
必填字段
每个节点对象至少必须包含属性 source, name, labels, properties 和 write_mode。
{
"source": "<sourceName>",
"name": "<targetName>",
"labels": ["<label1>", "<label2>", ...],
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_field": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
],
"write_mode": "merge"
}
-
source(string) — 此目标应从中获取数据的源名称。应与sources对象中的名称之一匹配。 -
name(string) — 目标的人性化名称(在所有名称中唯一)。 -
labels(字符串列表) — 用于标记节点的 标签。 -
properties(对象列表) — 源列与节点属性之间的映射。target_property_type的有效值包括:boolean,byte_array(假定为 base64 编码),date,duration,float,integer,local_date,local_datetime,local_time,point,string,zoned_datetime,zoned_time。每种属性类型(byte_array 除外)也提供 "_array" 形式(即 date_array, string_array 等),用于 BigQuery 的 "repeated" 列类型。 -
write_mode(string) — 在 Neo4j 中的创建模式。可以是create或merge。有关 Cypher 子句行为的信息,请参阅CREATE和MERGE。
模式定义
如果全局配置 index_all_properties 设置为 true,则所有属性都将使用范围索引进行索引。 |
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
}
],
"text_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"labels": ["label1", "label2", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
其中每个对象的属性为
源数据中的 key_constraints 列不能包含空值,否则它们将与节点键约束冲突。如果源数据在此方面不干净,请考虑通过在相关的 source.query 字段中排除所有不符合约束的行(例如 WHERE person_tmbdId IS NOT NULL)来提前进行清理。或者,在 源转换 中使用 where 属性。 |
选项 key_constraints 和 existence_constraints 需要 Neo4j/Aura 企业版,在 Neo4j 社区版安装上运行时没有任何效果。 |
配置
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active(bool) — 是否应将该目标包含在导入中(默认:true)。 -
source_transformations(object) — 如果enable_grouping设置为true,导入程序将在key_constraints和properties中指定的所有字段上附加 SQLGROUP BY子句。如果设置为false,则源中的任何重复数据都将被推送到 Neo4j,这可能会导致约束错误或降低插入效率。该对象还可以包含聚合函数和其他字段,请参阅 源转换。 -
depends_on(字符串列表) — 应在当前目标 之前 执行的目标name。
示例
Person 节点的节点对象示例{
"source": "persons",
"name": "Persons",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_field": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_field": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_field": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_field": "bornDate",
"target_property_type": "local_date"
},
{
"source_field": "died",
"target_field": "diedDate",
"target_property_type": "local_date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
}
关系对象
关系实体必须在 targets 对象内的 relationships 键下进行分组。
"targets": {
...
"relationships": [
{ <relationshipSpec1> },
{ <relationshipSpec2> },
...
]
}
必填字段
每个关系对象至少必须包含属性 source, name, type, start_node_reference, end_node_reference, node_match_mode 和 write_mode。
{
"source": "<sourceName>",
"name": "<targetName>",
"type": "<relationshipType>",
"start_node_reference": "<nodeTarget>",
"end_node_reference": "<nodeTarget>",
"node_match_mode": "<match/merge>",
"write_mode": "<create/merge>"
}
-
source(string) — 此目标应从中获取数据的源名称。应与sources对象中的名称之一匹配。 -
name(string) — 目标的人性化名称(在所有名称中唯一)。 -
type(string) — 分配给关系的 类型。 -
node_match_mode(string) — 在创建关系之前用于获取源/结束节点的 Cypher 子句。有效值为match或merge,分别对应 Cypher 子句MATCH和MERGE。 -
write_mode(string) — 在 Neo4j 中的创建模式。可以是create或merge。有关 Cypher 子句行为的信息,请参阅CREATE和MERGE。
start/end_node_reference 属性包含有关关系链接哪些节点目标的信息。可以通过两种方式指定它们。
"start_node_reference": "<nodeTargetName>",
"end_node_reference": "<nodeTargetName>",
-
start_node_reference(string) — 作为关系 起点 的节点目标的名称。 -
end_node_reference(string) — 作为关系 终点 的节点目标的名称。
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"start_node_reference": {
"name": "<nodeTargetName>",
"key_mappings": [
{
"source_field": "<sourceMappingKey>",
"node_property": "<nodeTargetMappingKey>"
}
]
},
"end_node_reference": {
"name": "<nodeTargetName>",
"key_mappings": [
{
"source_field": "<sourceMappingKey>",
"node_property": "<nodeTargetMappingKey>"
}
]
},
-
start_node_reference(object) — 作为关系 起点 的节点目标名称,以及源中作为键的列名 (source_field) 和节点目标中作为键的导入属性 (node_property)。 -
end_node_reference(object) — 作为关系 终点 的节点目标名称,以及源中作为键的列名 (source_field) 和节点目标中作为键的导入属性 (node_property)。
"start_node_reference": {
"name": "Persons",
"key_mappings": [
{
"source_field": "person_tmdbId",
"node_property": "id"
}
]
},
"end_node_reference": {
"name": "Movies",
"key_mappings": [
{
"source_field": "movieId",
"node_property": "id"
}
]
},
您可以在 key_mappings 中列出多个对象(每个对象结构相同)以处理复合键。
属性
关系也可以将源列映射为属性。
{
...
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_field": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
]
}
-
properties(对象列表) — 源列与关系属性之间的映射。target_property_type的有效值包括:boolean,byte_array(假定为 base64 编码),date,duration,float,integer,local_date,local_datetime,local_time,point,string,zoned_datetime,zoned_time。每种属性类型(byte_array 除外)也提供 "_array" 形式(即 date_array, string_array 等),用于 BigQuery 的 "repeated" 列类型。
模式定义
如果全局配置 index_all_properties 设置为 true,则所有属性都将使用范围索引进行索引。 |
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
}
],
"text_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"types": ["<relationshipType1>", "<relationshipType2>", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
其中每个对象的属性为
源数据中的 key_constraints 列不能包含空值,否则它们将与关系键约束冲突。如果源数据在此方面不干净,请考虑通过在相关的 source.query 字段中排除所有不符合约束的行(例如 WHERE person_tmbdId IS NOT NULL)来提前进行清理。或者,在 源转换 中使用 where 属性。
|
选项 key_constraints 和 existence_constraints 需要 Neo4j/Aura 企业版,在 Neo4j 社区版安装上运行时没有任何效果。 |
配置
关系目标配置选项及其默认值
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active (bool) — 是否应将该目标包含在导入中。
-
source_transformations (object) — 如果 enable_grouping 设置为 true,导入程序将对 key_constraints 和 properties 中指定的所有字段执行 SQL GROUP BY。如果设置为 false,则源中的任何重复数据都将被推送到 Neo4j,这可能会导致约束错误或降低插入效率。该对象还可以包含聚合函数和其他字段,请参阅 源转换。
-
depends_on (字符串列表) — 应在当前目标 之前 执行的目标 name。
示例
用于导入 ACTED_IN 关系的关系对象示例
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_field": "role",
"target_property_type": "string"
}
]
}
自定义查询目标
当导入需要无法轻松融入节点/关系目标格式的复杂查询时,自定义查询目标非常有用。查询目标通过变量 $rows 接收行批次。
自定义查询必须在 targets 对象内的 queries 键下进行分组。
查询目标规范框架
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
不要使用自定义查询来运行不直接依赖于源的 Cypher;请改用 actions。一次性查询(特别是如果不是幂等的)不适合在自定义查询目标中使用。原因在于目标中的查询是分批运行的,因此自定义查询可能会根据从源中提取的 $rows 批次数量多次运行。
必填字段
每个查询目标至少必须包含属性 source, name 和 query。
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
-
source (string) — 此目标应从中获取数据的源名称。应与 sources 对象中的名称之一匹配。
-
name (string) — 目标的人性化名称(在所有名称中唯一)。
-
query (string) — 一个 Cypher 查询。来自源的数据可作为参数 $rows 中的列表使用。
配置
查询目标配置选项及其默认值
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active (bool) — 是否应将该目标包含在导入中。
-
depends_on (字符串列表) — 应在当前目标 之前 执行的目标 name。
示例
用于导入 Person 节点并设置创建日期的查询对象示例
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
源转换
每个节点和关系目标都可以选择性地包含一个包含聚合函数的 source_transformation 属性。这对于从更细粒度的源提取更高级别的维度非常有用。聚合会产生可用于属性映射的额外字段。
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
],
}
-
enable_grouping (bool) — 必须为 true 才能使 aggregations/where 工作。
-
aggregations (对象列表) — 聚合在 expression 属性中指定为 SQL 查询,结果以 field_name 中指定的名称作为源列提供。
-
limit (int) — 限制考虑用于导入的源行数(默认为无限制,编码为 -1)。
-
where (string) — 在导入前过滤掉源数据(使用 SQL WHERE 子句格式)。
-
order_by (对象列表) — 对源强制执行排序。
示例
虚拟数据集上的转换对象示例
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
默认情况下,源只被处理一次;其数据随后被分发到各个目标。但是,带有源转换的目标会触发新的数据获取,因为生成的源查询与默认查询不同。因此,对于没有转换的目标,源会被处理一次,对于定义了转换的目标,则会额外处理多次。因此,原始源查询必须是确定性的,否则不同的目标可能会接收到不同的数据。
操作 (Actions)
actions 部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个 stage。例如,您可以在步骤完成时提交 HTTP 请求、在源上执行 SQL 查询或在 Neo4j 目标实例上运行 Cypher 语句。
操作规范框架
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须包含属性 name, type 和 stage。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
method (string) — HTTP 方法;可以是 get 或 post。
-
url (string) — HTTP 请求应指向的 URL。
-
headers (object, 可选) — 请求头。
导入完成后发送 GET 请求的操作示例
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
query (string) — 要运行的 Cypher 查询。
-
execution_mode (string, 可选) — 查询应在何种模式下执行。有效值为 transaction, autocommit (默认: transaction)。
导入完成后创建 importJob 节点的操作示例
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
sql (string) — 要运行的 SQL 查询。
导入完成后发送 GET 请求的操作示例
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active(bool) — 是否应将该目标包含在导入中。 -
source_transformations(object) — 如果enable_grouping设置为true,导入程序将对key_constraints和properties中指定的所有字段执行 SQLGROUP BY。如果设置为false,则源中的任何重复数据都将被推送到 Neo4j,这可能会导致约束错误或降低插入效率。该对象还可以包含聚合函数和其他字段,请参阅 源转换。 -
depends_on(字符串列表) — 应在当前目标 之前 执行的目标name。
示例
用于导入 ACTED_IN 关系的关系对象示例
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_field": "role",
"target_property_type": "string"
}
]
}
ACTED_IN 关系的关系对象示例
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_field": "role",
"target_property_type": "string"
}
]
}
自定义查询目标
当导入需要无法轻松融入节点/关系目标格式的复杂查询时,自定义查询目标非常有用。查询目标通过变量 $rows 接收行批次。
自定义查询必须在 targets 对象内的 queries 键下进行分组。
查询目标规范框架
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
不要使用自定义查询来运行不直接依赖于源的 Cypher;请改用 actions。一次性查询(特别是如果不是幂等的)不适合在自定义查询目标中使用。原因在于目标中的查询是分批运行的,因此自定义查询可能会根据从源中提取的 $rows 批次数量多次运行。
必填字段
每个查询目标至少必须包含属性 source, name 和 query。
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
-
source (string) — 此目标应从中获取数据的源名称。应与 sources 对象中的名称之一匹配。
-
name (string) — 目标的人性化名称(在所有名称中唯一)。
-
query (string) — 一个 Cypher 查询。来自源的数据可作为参数 $rows 中的列表使用。
配置
查询目标配置选项及其默认值
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active (bool) — 是否应将该目标包含在导入中。
-
depends_on (字符串列表) — 应在当前目标 之前 执行的目标 name。
示例
用于导入 Person 节点并设置创建日期的查询对象示例
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
源转换
每个节点和关系目标都可以选择性地包含一个包含聚合函数的 source_transformation 属性。这对于从更细粒度的源提取更高级别的维度非常有用。聚合会产生可用于属性映射的额外字段。
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
],
}
-
enable_grouping (bool) — 必须为 true 才能使 aggregations/where 工作。
-
aggregations (对象列表) — 聚合在 expression 属性中指定为 SQL 查询,结果以 field_name 中指定的名称作为源列提供。
-
limit (int) — 限制考虑用于导入的源行数(默认为无限制,编码为 -1)。
-
where (string) — 在导入前过滤掉源数据(使用 SQL WHERE 子句格式)。
-
order_by (对象列表) — 对源强制执行排序。
示例
虚拟数据集上的转换对象示例
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
默认情况下,源只被处理一次;其数据随后被分发到各个目标。但是,带有源转换的目标会触发新的数据获取,因为生成的源查询与默认查询不同。因此,对于没有转换的目标,源会被处理一次,对于定义了转换的目标,则会额外处理多次。因此,原始源查询必须是确定性的,否则不同的目标可能会接收到不同的数据。
操作 (Actions)
actions 部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个 stage。例如,您可以在步骤完成时提交 HTTP 请求、在源上执行 SQL 查询或在 Neo4j 目标实例上运行 Cypher 语句。
操作规范框架
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须包含属性 name, type 和 stage。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
method (string) — HTTP 方法;可以是 get 或 post。
-
url (string) — HTTP 请求应指向的 URL。
-
headers (object, 可选) — 请求头。
导入完成后发送 GET 请求的操作示例
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
query (string) — 要运行的 Cypher 查询。
-
execution_mode (string, 可选) — 查询应在何种模式下执行。有效值为 transaction, autocommit (默认: transaction)。
导入完成后创建 importJob 节点的操作示例
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
sql (string) — 要运行的 SQL 查询。
导入完成后发送 GET 请求的操作示例
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
当导入需要无法轻松融入节点/关系目标格式的复杂查询时,自定义查询目标非常有用。查询目标通过变量 $rows 接收行批次。
自定义查询必须在 targets 对象内的 queries 键下进行分组。
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
不要使用自定义查询来运行不直接依赖于源的 Cypher;请改用 actions。一次性查询(特别是如果不是幂等的)不适合在自定义查询目标中使用。原因在于目标中的查询是分批运行的,因此自定义查询可能会根据从源中提取的 $rows 批次数量多次运行。
|
必填字段
每个查询目标至少必须包含属性 source, name 和 query。
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
-
source (string) — 此目标应从中获取数据的源名称。应与 sources 对象中的名称之一匹配。
-
name (string) — 目标的人性化名称(在所有名称中唯一)。
-
query (string) — 一个 Cypher 查询。来自源的数据可作为参数 $rows 中的列表使用。
每个查询目标至少必须包含属性 source, name 和 query。
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
-
source(string) — 此目标应从中获取数据的源名称。应与sources对象中的名称之一匹配。 -
name(string) — 目标的人性化名称(在所有名称中唯一)。 -
query(string) — 一个 Cypher 查询。来自源的数据可作为参数$rows中的列表使用。
配置
查询目标配置选项及其默认值
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active (bool) — 是否应将该目标包含在导入中。
-
depends_on (字符串列表) — 应在当前目标 之前 执行的目标 name。
示例
用于导入 Person 节点并设置创建日期的查询对象示例
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active(bool) — 是否应将该目标包含在导入中。 -
depends_on(字符串列表) — 应在当前目标 之前 执行的目标name。
示例
用于导入 Person 节点并设置创建日期的查询对象示例
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
Person 节点并设置创建日期的查询对象示例
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
源转换
每个节点和关系目标都可以选择性地包含一个包含聚合函数的 source_transformation 属性。这对于从更细粒度的源提取更高级别的维度非常有用。聚合会产生可用于属性映射的额外字段。
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
],
}
-
enable_grouping (bool) — 必须为 true 才能使 aggregations/where 工作。
-
aggregations (对象列表) — 聚合在 expression 属性中指定为 SQL 查询,结果以 field_name 中指定的名称作为源列提供。
-
limit (int) — 限制考虑用于导入的源行数(默认为无限制,编码为 -1)。
-
where (string) — 在导入前过滤掉源数据(使用 SQL WHERE 子句格式)。
-
order_by (对象列表) — 对源强制执行排序。
示例
虚拟数据集上的转换对象示例
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
默认情况下,源只被处理一次;其数据随后被分发到各个目标。但是,带有源转换的目标会触发新的数据获取,因为生成的源查询与默认查询不同。因此,对于没有转换的目标,源会被处理一次,对于定义了转换的目标,则会额外处理多次。因此,原始源查询必须是确定性的,否则不同的目标可能会接收到不同的数据。
操作 (Actions)
actions 部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个 stage。例如,您可以在步骤完成时提交 HTTP 请求、在源上执行 SQL 查询或在 Neo4j 目标实例上运行 Cypher 语句。
操作规范框架
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须包含属性 name, type 和 stage。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
method (string) — HTTP 方法;可以是 get 或 post。
-
url (string) — HTTP 请求应指向的 URL。
-
headers (object, 可选) — 请求头。
导入完成后发送 GET 请求的操作示例
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
query (string) — 要运行的 Cypher 查询。
-
execution_mode (string, 可选) — 查询应在何种模式下执行。有效值为 transaction, autocommit (默认: transaction)。
导入完成后创建 importJob 节点的操作示例
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
sql (string) — 要运行的 SQL 查询。
导入完成后发送 GET 请求的操作示例
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
每个节点和关系目标都可以选择性地包含一个包含聚合函数的 source_transformation 属性。这对于从更细粒度的源提取更高级别的维度非常有用。聚合会产生可用于属性映射的额外字段。
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
],
}
-
enable_grouping(bool) — 必须为true才能使aggregations/where工作。 -
aggregations(对象列表) — 聚合在expression属性中指定为 SQL 查询,结果以field_name中指定的名称作为源列提供。 -
limit(int) — 限制考虑用于导入的源行数(默认为无限制,编码为-1)。 -
where(string) — 在导入前过滤掉源数据(使用 SQLWHERE子句格式)。 -
order_by(对象列表) — 对源强制执行排序。
示例
虚拟数据集上的转换对象示例
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
默认情况下,源只被处理一次;其数据随后被分发到各个目标。但是,带有源转换的目标会触发新的数据获取,因为生成的源查询与默认查询不同。因此,对于没有转换的目标,源会被处理一次,对于定义了转换的目标,则会额外处理多次。因此,原始源查询必须是确定性的,否则不同的目标可能会接收到不同的数据。
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
| 默认情况下,源只被处理一次;其数据随后被分发到各个目标。但是,带有源转换的目标会触发新的数据获取,因为生成的源查询与默认查询不同。因此,对于没有转换的目标,源会被处理一次,对于定义了转换的目标,则会额外处理多次。因此,原始源查询必须是确定性的,否则不同的目标可能会接收到不同的数据。 |
操作 (Actions)
actions 部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个 stage。例如,您可以在步骤完成时提交 HTTP 请求、在源上执行 SQL 查询或在 Neo4j 目标实例上运行 Cypher 语句。
操作规范框架
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须包含属性 name, type 和 stage。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
method (string) — HTTP 方法;可以是 get 或 post。
-
url (string) — HTTP 请求应指向的 URL。
-
headers (object, 可选) — 请求头。
导入完成后发送 GET 请求的操作示例
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
query (string) — 要运行的 Cypher 查询。
-
execution_mode (string, 可选) — 查询应在何种模式下执行。有效值为 transaction, autocommit (默认: transaction)。
导入完成后创建 importJob 节点的操作示例
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type (string) — 操作类型。
-
name (string) — 操作的人性化名称(在所有名称中唯一)。
-
stage (string) — 操作应在导入的哪个时间点运行。有效值为:start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end。
-
sql (string) — 要运行的 SQL 查询。
导入完成后发送 GET 请求的操作示例
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
actions 部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个 stage。例如,您可以在步骤完成时提交 HTTP 请求、在源上执行 SQL 查询或在 Neo4j 目标实例上运行 Cypher 语句。
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须包含属性 name, type 和 stage。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type(string) — 操作类型。 -
name(string) — 操作的人性化名称(在所有名称中唯一)。 -
stage(string) — 操作应在导入的哪个时间点运行。有效值为:start,post_sources,pre_nodes,post_nodes,pre_relationships,post_relationships,pre_queries,post_queries,end。 -
method(string) — HTTP 方法;可以是get或post。 -
url(string) — HTTP 请求应指向的 URL。 -
headers(object, 可选) — 请求头。
GET 请求的操作示例
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type(string) — 操作类型。 -
name(string) — 操作的人性化名称(在所有名称中唯一)。 -
stage(string) — 操作应在导入的哪个时间点运行。有效值为:start,post_sources,pre_nodes,post_nodes,pre_relationships,post_relationships,pre_queries,post_queries,end。 -
query(string) — 要运行的 Cypher 查询。 -
execution_mode(string, 可选) — 查询应在何种模式下执行。有效值为transaction,autocommit(默认:transaction)。
importJob 节点的操作示例
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type(string) — 操作类型。 -
name(string) — 操作的人性化名称(在所有名称中唯一)。 -
stage(string) — 操作应在导入的哪个时间点运行。有效值为:start,post_sources,pre_nodes,post_nodes,pre_relationships,post_relationships,pre_queries,post_queries,end。 -
sql(string) — 要运行的 SQL 查询。
GET 请求的操作示例
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}