1 概述 APOC扩展包中的查询任务管理提供相关过程帮助将复杂查询分解成较小的批次迭代执行,从而减少事务处理的开销、提高内存使用效率。 Cypher的LOAD CSV语句允许通过USING PERIODIC COMMIT 指定批次大小,以减小更新事务的规模、提高性能并降低对内存(主要是Java堆内存/Heap Memory)的需求。对于其他Cypher语句,例如CREATE、MERGE等更新数据库操作,可以在APOC的任务管理相关过程中执行、以达到同样的效果。 通过APOC任务管理提交的批处理任务多数在后台以异步方式运行,默认情况下线程池的大小 = 处理器内核数 x 2,可以通过以下设置修改线程池大小: neo4j.conf: apoc.jobs.pool.num_threads=10
计划任务依赖调度执行器来执行。调度执行器的线程池也有默认固定大小 = 处理器内核数 / 4 (至少为1)。可以使用以下配置属性配置池大小: neo4j.conf: apoc.jobs.scheduled.num_threads=10
APOC提供以下任务管理过程,它们均包含在apoc.periodic.*子库中。 过程名 接口 功能 apoc.periodic.commit CALL apoc.periodic.commit( statement, params ) 从给定节点出发寻找扩展路径,支持基本配置。 apoc.periodic.iterate CALL apoc.periodic.iterate( statement, itemStatement, { batchSize:1000, iterateList:true, parallel:false, params:{}, concurrency:50, retries:0 } ) YIELD batches, total 从给定节点出发寻找扩展路径,支持完整的配置。 apoc.periodic.list CALL apoc.periodic.list() 列出所有在后台运行的查询任务。 apoc.periodic.submit CALL apoc.periodic.submit( name, statement ) 提交statement作为在后台/异步运行的查询任务。 apoc.custom.asFunction CALL apoc.custom.asFunction( name, statement, mode, outputs, inputs, forceSingle, description ) 创建自定义Cypher功能函数。 apoc.custom.asProcedure CALL apoc.custom.asProcedure( name, statement, mode, outputs, inputs, description ) 创建自定义Cypher过程。
APOC也提供更加灵活、强大的运行Cypher查询的过程。 过程/函数名 接口 功能 apoc.cypher.run CALL apoc.cypher.run( fragment, params ) YIELD value 使用给定的参数执行数据库读取查询。 apoc.cypher. runFirstColumnSingle apoc.cypher.runFirstColumnSingle (statement, params) 使用给定的参数执行数据库读取查询并仅返回第一列,将返回第一行/单行或返回null。 apoc.cypher. runFirstColumnMany apoc.cypher.runFirstColumnMany (statement, params) 使用给定的参数执行数据库读取查询并仅返回第一列,将返回包含所有行的列表。 apoc.cypher.runFile CALL apoc.cypher.runFile( file or url, {config} ) YIELD row, result 运行文件中的每个语句,语句以分号隔开。目前不支持数据库模式操作。 apoc.cypher. runFiles CALL apoc.cypher.runFiles( [files or urls], {config} ) YIELD row, result 运行多个文件中的查询语句。 apoc.cypher. runSchemaFile CALL apoc.cypher.runSchemaFile (file or url,{config}) 运行文件中的每个数据库模式操作语句,所有语句以分号分开。 apoc.cypher.runMany CALL apoc.cypher.runMany( ‘cypher;\nstatements;’, {params}, {config} ) 运行每个分号分隔的语句并返回摘要。当前不支持数据库模式操作。 apoc.cypher. mapParallel CALL apoc.cypher.mapParallel( fragment, params, list-to-parallelize ) YIELD value 以并行批处理执行查询。 apoc.cypher.doIt CALL apoc.cypher.doIt( fragment, params ) YIELD value 使用给定的参数执行数据库写入查询。 apoc.cypher. runTimeboxed CALL apoc.cypher.runTimeboxed( ‘cypherStatement’, {params}, timeout ) 以预定义时间窗口运行Cypher查询,如果超时仍没有完成则中止语句。 apoc.when CALL apoc.when( condition, ifQuery, elseQuery:’’, params:{} ) YIELD value 基于条件,执行带有给定参数的只读ifQuery或elseQuery。 apoc.do.when CALL apoc.do.when( condition, ifQuery, elseQuery:’’, params:{} ) YIELD value 基于条件,执行使用给定参数编写ifQuery或elseQuery。 apoc.case CALL apoc.case( [condition,query, condition,query, …], elseQuery:’’, params:{} ) YIELD value 给定条件/只读查询对的列表,执行与第一个条件相关联的查询使用给定参数计算为true(或者如果没有则为else查询)。 apoc.do.case CALL apoc.do.case( [condition,query, condition,query,…], elseQuery:’’, params:{} ) YIELD value 给定条件/写入查询对的列表,执行与第一个条件相关联的查询使用给定参数计算为true(或者如果没有则为else查询)。
---- 待续 ---- (下一章:循环执行-apoc.periodic.commit)