目录监听器
apoc.load.directory.async 过程用于管理触发器。每个触发器由监听一个或多个文件夹的监听器组成,当触发时会执行自定义 Cypher 查询。以下过程可用于添加、删除和列出触发器:
| 限定名称 | 类型 | 版本 |
|---|---|---|
|
|
|
|
apoc.load.directory.async.remove
|
|
|
apoc.load.directory.async.removeAll
|
|
|
apoc.load.directory.async.list
|
|
|
用法示例
添加文件夹监听器
结合 apoc.load.csv 或 apoc.load.json 等其他加载过程,可以异步加载文件。
此示例根据触发器提供的参数创建节点,仅在 csvFolder 中的文件创建和修改时触发。
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
其中 $fileName 是创建/修改的文件名,$filePath 是文件的相对路径,即 $IMPORT_DIR/csvFolder/[FILENAME.csv],$fileDirectory 是目录的相对路径,即 $IMPORT_DIR/csvFolder,而 $listenEventType 是触发的事件,即 CREATE 或 MODIFY。
假设我们的 IMPORT_DIR 设置为 import,且以下文件被上传到 import/csvFolder 文件夹
name,age
Selma,8
Rana,11
Selina,18
然后执行 MATCH (n:CsvToNode) RETURN properties(n) as props
| props |
|---|
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] } |
如果我们按如下方式修改 test.csv
name,age
Selma,80
Rana,110
Selina,180
我们获得了 3 个带有以下属性的新节点
| props |
|---|
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] } |
列表和删除过程
我们可以通过执行以下命令查看监听器列表
CALL apoc.load.directory.async.list();
例如,如果我们执行
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
结果将是
| 名称 (name) | 状态 (status) | pattern | cypher | urlDir | config | 错误 (error) |
|---|---|---|---|---|---|---|
"csvImport" |
"RUNNING" |
"*.csv" |
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})" |
<importUrlDir>/csvFolder |
{"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 } |
"" |
我们可以通过执行删除过程并将特定名称作为参数来移除某个监听器
CALL apoc.load.directory.async.remove('csvImport')
结果将是剩余监听器的列表。
此外,我们可以通过以下过程删除所有内容(该过程将返回空结果)
CALL apoc.load.directory.async.removeAll()
错误处理
当监听器因某种原因失败时,其 status 字段会从 RUNNING 变为 ERROR,并输出相关错误。如果我们执行 call apoc.load.directory.async.list,例如我们将得到
| 名称 (name) | 状态 (status) | pattern | cypher | urlDir | config | 错误 (error) |
|---|---|---|---|---|---|---|
|
|
|
|
|
|
|
配置
请注意,要使用 apoc.load.directory.async.* 过程,需要启用以下配置
apoc.import.file.enabled=true
以下设置允许您更改导入文件夹
dbms.directories.import=import
可以将 apoc.import.file.use_neo4j_config=false 设置为在绝对路径中搜索文件。
CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');