用户定义的过程
用户定义的过程(User-defined procedure)是一种通过编写自定义代码来扩展 Neo4j 的机制,这些代码可以直接从 Cypher 中调用。过程可以接收参数、对数据库执行操作并返回结果。有关用户定义的过程、函数和聚合函数之间的比较,请参见 Neo4j 自定义代码。
|
需要在系统数据库上执行的用户定义过程必须包含 |
调用过程
要调用用户定义的过程,请使用 Cypher CALL 子句。过程名称必须是全限定名,因此定义在 org.neo4j.examples 包中名为 findDenseNodes 的过程可以按照以下方式调用:
CALL org.neo4j.examples.findDenseNodes(1000)
CALL 既可以是 Cypher 语句中唯一的子句,也可以与其他子句结合使用。参数可以直接在查询中提供,也可以从关联的参数集中获取。有关完整详细信息,请参阅 Cypher 手册 → CALL 过程 中的文档。
创建过程
请确保您已阅读并遵循 设置插件项目 中的准备工作设置说明。
|
下面讨论的示例可以在 GitHub 上的仓库 中找到。为了快速入门,您可以 fork 该仓库,并在按照下方指南操作时使用其中的代码。 |
首先,确定过程的功能,然后编写一个证明其功能正确的测试。最后,编写一个能通过该测试的过程。
集成测试
测试依赖项包括 Neo4j Harness 和 JUnit。这些可以用来为过程编写集成测试。测试应启动一个 Neo4j 实例,加载过程,并针对它执行查询。
package example;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Value;
import org.neo4j.harness.Neo4j;
import org.neo4j.harness.Neo4jBuilders;
import static org.assertj.core.api.Assertions.assertThat;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GetRelationshipTypesTests {
private Driver driver;
private Neo4j embeddedDatabaseServer;
@BeforeAll
void initializeNeo4j() {
this.embeddedDatabaseServer = Neo4jBuilders.newInProcessBuilder()
.withDisabledServer()
.withProcedure(GetRelationshipTypes.class)
.build();
this.driver = GraphDatabase.driver(embeddedDatabaseServer.boltURI());
}
@AfterAll
void closeDriver(){
this.driver.close();
this.embeddedDatabaseServer.close();
}
@AfterEach
void cleanDb(){
try(Session session = driver.session()) {
session.run("MATCH (n) DETACH DELETE n");
}
}
/**
* We should be getting the correct values when there is only one type in each direction
*/
@Test
public void shouldReturnTheTypesWhenThereIsOneEachWay() {
final String expectedIncoming = "INCOMING";
final String expectedOutgoing = "OUTGOING";
// In a try-block, to make sure we close the session after the test
try(Session session = driver.session()) {
//Create our data in the database.
session.run(String.format("CREATE (:Person)-[:%s]->(:Movie {id:1})-[:%s]->(:Person)", expectedIncoming, expectedOutgoing));
//Execute our procedure against it.
Record record = session.run("MATCH (u:Movie {id:1}) CALL example.getRelationshipTypes(u) YIELD outgoing, incoming RETURN outgoing, incoming").single();
//Get the incoming / outgoing relationships from the result
assertThat(record.get("incoming").asList(Value::asString)).containsOnly(expectedIncoming);
assertThat(record.get("outgoing").asList(Value::asString)).containsOnly(expectedOutgoing);
}
}
}
|
前一个示例使用了 JUnit 5,这需要使用 |
定义过程
在准备好测试后,编写一个满足测试预期的过程。完整示例可在 Neo4j 过程模板 仓库中找到。
特别注意事项
-
所有过程都必须使用
@Procedure注解。 -
过程注解可以带有三个可选参数:
name、mode和eager。-
name用于指定过程名称,而不是默认生成的class.path.nameOfMethod。如果指定了mode,则也必须指定name。 -
name不允许存在于保留命名空间中,并且在没有命名空间的情况下使用name的行为已被弃用。 -
如果注册的过程名称与弃用命名空间中的内置过程名称相同,则内置过程将被覆盖(shadowed)。
-
mode用于声明过程执行的交互类型。如果过程尝试执行违反其模式的数据库操作,则该过程会失败。默认的mode是READ。可用模式如下:-
READ— 该过程仅对图执行读取操作。 -
WRITE— 该过程对图执行读取和写入操作。 -
SCHEMA— 该过程对模式执行操作,例如创建和删除索引及约束。此模式的过程可以读取图数据,但不能写入。 -
DBMS— 该过程执行系统操作,如用户管理和查询管理。此模式的过程无法读取或写入图数据。
-
-
eager是一个布尔值设置,默认为false。如果设置为true,Cypher 规划器会在调用过程前后规划一个额外的eager操作。这在过程以可能影响过程前后操作的方式修改数据库时非常有用。例如:MATCH (n) WHERE n.key = 'value' WITH n CALL example.deleteNeighbours(n, 'FOLLOWS')此查询可能会删除 Cypher 查询匹配的部分节点,从而导致
n.key查找失败。将此过程标记为eager可防止其在 Cypher 代码中引发错误。然而,过程仍有可能通过尝试读取其之前已删除的实体而产生自我干扰。处理这种情况是过程编写者的责任。
-
-
过程的 上下文(与过程想要使用的每个资源相同)使用
@Context注解。
|
在过程中发出错误信号的正确方法是抛出 |
可注入资源
编写过程时,可以将某些资源从数据库注入到过程中。要注入这些资源,请使用 @Context 注解。可注入的类包括:
-
Log -
TerminationGuard -
GraphDatabaseService -
事务
上述所有类都被认为是安全且面向未来的,不会危及数据库的安全性。还有一些不受支持(受限)的类也可以被注入,但可能会在几乎没有通知的情况下发生更改。设计为使用这些受限 API 的过程默认不会被加载,您需要使用 dbms.security.procedures.unrestricted 来加载不安全的过程。有关此配置设置的更多信息,请阅读 操作手册 → 保护扩展。
内存资源跟踪
|
过程框架的内存资源跟踪 API 目前处于预览阶段。Neo4j 的未来版本可能会对此 API 进行破坏性更改。 |
如果您的过程或函数占用了大量堆内存,您可以注册分配以计入配置的事务限制,请参阅 操作手册 → 限制事务内存使用 以获取更多信息。这有助于避免导致数据库重启的 OutOfMemory 错误。内存分配也会显示在查询分析中。
为此,您需要在过程/函数类中将 org.neo4j.procedure.memory.ProcedureMemory 作为字段注入。ProcedureMemory 提供了多种方法允许您注册分配。例如(请参阅 javadoc 获取完整参考):
-
ProcedureMemoryTracker newTracker()创建一个绑定到当前事务的新内存资源跟踪器。 -
HeapEstimator heapEstimator()估算类和实例的堆大小。 -
HeapTrackingCollectionFactory collections()允许您创建内置了其内部结构内存跟踪功能的集合。
实现内存资源跟踪通常既困难又耗时。以下是一些值得记住的注意事项和警告:
-
限制内存管理的范围。仅关注内存占用可能显著增长的部分,忽略较小的偏差。
-
注意不要多次注册同一实例的分配,以免导致高估。如果担心此问题,可以添加引用计数或其他机制来避免高估。
-
通常在实例分配之前无法知道其大小,这可能导致您在分配完成后才注册分配。内存跟踪器实现尝试通过在内部内存池中预先注册一定量的内存来防止这种情况。
-
在 Java 中知道实例何时被垃圾回收是非常繁琐的。通常,您会在内存可以被垃圾回收时注册内存释放。为了解决这个问题,内存跟踪器可能会在内部选择不立即注册内存释放。
-
测试内存资源跟踪可能很困难。一种方法是使用第三方库(如 JAMM - Java Agent for Memory Measurements),并断言估算值对于给定的输入足够接近。
package org.example;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.memory.ProcedureMemory;
import java.util.Arrays;
import java.util.stream.Stream;
public class MyProcedures {
@Context
public ProcedureMemory memory;
record Output(Long value) {}
@Procedure("org.example.memoryHungryRange")
public Stream<Output> memoryHungryRange(@Name("size") int size) {
final var tracker = memory.newTracker();
// Register the allocation of the long array below
tracker.allocateHeap(memory.heapEstimator().sizeOfLongArray(size));
// The actual allocation
final var result = new long[size];
for (int i = 0; i < size; i++) result[i] = i;
return Arrays.stream(result)
.mapToObj(Output::new)
// Release all registered allocations when the stream is closed
.onClose(tracker::close);
}
}
保留和弃用的过程命名空间
请注意,弃用的过程命名空间将在下一个主要 Cypher 版本中移至保留命名空间。有关 Neo4j 和 Cypher 版本控制的更多信息,请参阅 操作手册 → 介绍。
| 保留 | 自 Neo4j 2025.11 起在 Cypher 25 中弃用 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|