目录监听器

apoc.load.directory.async 过程用于管理触发器。每个触发器由监听一个或多个文件夹的监听器组成,当触发时会执行自定义 Cypher 查询。以下过程可用于添加、删除和列出触发器:

限定名称 类型 版本

apoc.load.directory.async.add

apoc.load.directory.async.add(name, cypher, pattern, urlDir, {}) YIELD name, status, pattern, cypher, urlDir, config, error - 添加或替换一个指定名称的文件夹监听器。该监听器针对所有符合给定模式(pattern)的文件触发,并在触发时执行指定的 Cypher 查询。返回所有监听器的列表。可以在 config 参数中指定事件类型。

过程

Apoc Extended

apoc.load.directory.async.remove

apoc.load.directory.async.remove(name) YIELD name, status, pattern, cypher, urlDir, config, error - 按名称删除文件夹监听器,并返回所有剩余的监听器(如果有的话)。

过程

Apoc Extended

apoc.load.directory.async.removeAll

apoc.load.directory.async.removeAll() - 删除所有文件夹监听器。

过程

Apoc Extended

apoc.load.directory.async.list

apoc.load.directory.async.list() YIELD name, status, pattern, cypher, urlDir, config, error - 列出所有文件夹监听器。

过程

Apoc Extended

用法示例

添加文件夹监听器

结合 apoc.load.csvapoc.load.json 等其他加载过程,可以异步加载文件。

此示例根据触发器提供的参数创建节点,仅在 csvFolder 中的文件创建和修改时触发。

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

其中 $fileName 是创建/修改的文件名,$filePath 是文件的相对路径,即 $IMPORT_DIR/csvFolder/[FILENAME.csv]$fileDirectory 是目录的相对路径,即 $IMPORT_DIR/csvFolder,而 $listenEventType 是触发的事件,即 CREATEMODIFY

假设我们的 IMPORT_DIR 设置为 import,且以下文件被上传到 import/csvFolder 文件夹

test.csv
name,age
Selma,8
Rana,11
Selina,18

然后执行 MATCH (n:CsvToNode) RETURN properties(n) as props

表 1. 结果
props

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] }

如果我们按如下方式修改 test.csv

test.csv
name,age
Selma,80
Rana,110
Selina,180

我们获得了 3 个带有以下属性的新节点

表 2. 结果
props

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] }

列表和删除过程

我们可以通过执行以下命令查看监听器列表

CALL apoc.load.directory.async.list();

例如,如果我们执行

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

结果将是

表 3. 结果
名称 (name) 状态 (status) pattern cypher urlDir config 错误 (error)

"csvImport"

"RUNNING"

"*.csv"

"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})"

<importUrlDir>/csvFolder

{"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 }

""

我们可以通过执行删除过程并将特定名称作为参数来移除某个监听器

CALL apoc.load.directory.async.remove('csvImport')

结果将是剩余监听器的列表。

此外,我们可以通过以下过程删除所有内容(该过程将返回空结果)

CALL apoc.load.directory.async.removeAll()

错误处理

当监听器因某种原因失败时,其 status 字段会从 RUNNING 变为 ERROR,并输出相关错误。如果我们执行 call apoc.load.directory.async.list,例如我们将得到

名称 (name) 状态 (status) pattern cypher urlDir config 错误 (error)

listenerName

ERROR

*.csv

'create (n:Node)'

'path'

{}

"org.neo4j.graphdb.QueryExecutionException: Failed to invoke procedure apoc.load.csv: Caused by: java.io.FileNotFoundException …​.

配置

请注意,要使用 apoc.load.directory.async.* 过程,需要启用以下配置

apoc.conf
apoc.import.file.enabled=true

以下设置允许您更改导入文件夹

dbms.directories.import=import

可以将 apoc.import.file.use_neo4j_config=false 设置为在绝对路径中搜索文件。

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');
© . This site is unofficial and not affiliated with Neo4j, Inc.