事务管理

本页面仅描述在使用 Neo4j Java API 时事务管理的某些特定方面,并提供了一些如何避免死锁、如何为特定数据库注册事务事件监听器,以及如何针对事务变更集执行基本操作的示例。

因此,强烈建议您在继续阅读本页之前,先阅读 操作手册 → 数据库内部机制与事务行为

概述

访问图、索引或模式的数据库操作均在事务中执行,以确保 ACID 特性。事务是单线程的、受限的且相互独立。在单个线程中可以启动多个事务,且它们彼此独立。

事务的交互周期遵循以下步骤:

  1. 开启事务。

  2. 执行数据库操作。

  3. 提交或回滚事务。

完成每个事务至关重要,因为事务获取的锁或内存只有在完成后才会释放。有关锁和死锁的更多信息,请参阅 操作手册 → 锁与死锁

Neo4j 中事务的惯用用法是使用 try-with-resources 语句,并将 transaction 声明为资源之一。然后启动事务并尝试执行图操作。try 块中的最后一个操作应根据业务逻辑提交或回滚事务。在这种场景下,try-with-resources 用作防止意外异常的保护措施,并作为一种额外的安全机制,确保无论语句块内部发生什么,事务都能得到关闭。所有未提交的事务都将在语句结束时作为资源清理的一部分进行回滚。对于已显式提交或回滚的事务,无需进行额外的资源清理,此时关闭事务是一个空操作。

事务中执行的所有修改都会保留在内存中。这意味着必须将非常大的更新拆分为多个事务,以避免内存耗尽。

死锁处理示例

由使用除 Neo4j 管理的锁之外的其他同步方式引起的死锁仍然可能发生。由于 Neo4j API 中的所有操作除非另有说明,否则都是线程安全的,因此无需外部同步。其他需要同步的代码应以这样一种方式进行同步:即在同步块中绝不执行任何 Neo4j 操作。

以下是一个如何在过程(Procedures)、服务器扩展或使用 Neo4j 嵌入式模式时处理死锁的示例。

代码片段中使用的完整源代码可以在 DeadlockDocTest.java 中找到。

在代码中处理死锁时,您可能需要解决以下几个问题:

  • 仅进行有限次数的重试,并在达到阈值时失败。

  • 在每次尝试之间暂停,以便在再次尝试之前让其他事务完成。

  • 重试循环不仅对死锁有用,对其他类型的瞬态错误也很有用。

下面是一个展示如何实现此功能的示例。

示例 1. 使用重试循环处理死锁

此示例展示了如何使用重试循环来处理死锁

Throwable txEx = null;
int RETRIES = 5;
int BACKOFF = 3000;
for ( int i = 0; i < RETRIES; i++ )
{
    try ( Transaction tx = databaseService.beginTx() )
    {
        Object result = doStuff(tx);
        tx.commit();
        return result;
    }
    catch ( Throwable ex )
    {
        txEx = ex;

        // Add whatever exceptions to retry on here
        if ( !(ex instanceof DeadlockDetectedException) )
        {
            break;
        }
    }

    // Wait so that we don't immediately get into the same deadlock
    if ( i < RETRIES - 1 )
    {
        try
        {
            Thread.sleep( BACKOFF );
        }
        catch ( InterruptedException e )
        {
            throw new TransactionFailureException( "Interrupted", e );
        }
    }
}

if ( txEx instanceof TransactionFailureException )
{
    throw ((TransactionFailureException) txEx);
}
else if ( txEx instanceof Error )
{
    throw ((Error) txEx);
}
else
{
    throw ((RuntimeException) txEx);
}

事务事件

可以注册一个 neo4j.org.graphdb.event.TransactionEventListener 来接收 Neo4j 数据库事务事件。一旦在 org.neo4j.dbms.api.DatabaseManagementService 实例上注册,它就会接收该数据库的事务事件。监听器会收到关于已执行任何写入操作并即将提交的事务的通知。如果未调用 Transaction#commit(),或者事务已通过 Transaction#rollback() 回滚,则事务将被回滚,且不会向监听器发送任何事件。

在事务提交之前,会调用监听器的 beforeCommit 方法,并传入事务中所做修改的完整差异(diff)。此时事务仍在运行,因此仍可进行更改。该方法还可以抛出异常,从而阻止事务提交。如果事务被回滚,随后将调用监听器的 afterRollback 方法。

监听器的执行顺序是不确定的——不能保证一个监听器所做的更改会被其他监听器看到。

如果 beforeCommit 在所有已注册的监听器中均成功执行,则事务提交,并调用传入相同事务数据的 afterCommit 方法。此调用还包括从 beforeCommit 返回的对象。

afterCommit 中,事务已关闭,访问 org.neo4j.graphdb.event.TransactionData 之外的任何内容都需要开启一个新的事务。一个 neo4j.org.graphdb.event.TransactionEventListener 会收到关于可以通过 org.neo4j.graphdb.event.TransactionData 访问到任何变更的事务的通知。某些索引和模式更改不会触发这些事件。

以下示例展示了如何为特定数据库注册监听器,并针对事务变更集执行基本操作。

代码片段中使用的完整源代码可以在 TransactionEventListenerExample.java 中找到。

示例 2. TransactionEventListener

注册事务事件监听器并检查变更集

public static void main( String[] args ) throws IOException
{
    FileUtils.deleteDirectory( HOME_DIRECTORY );
    var managementService = new DatabaseManagementServiceBuilder( HOME_DIRECTORY ).build();
    var database = managementService.database( DEFAULT_DATABASE_NAME );

    var countingListener = new CountingTransactionEventListener();
    managementService.registerTransactionEventListener( DEFAULT_DATABASE_NAME, countingListener );

    var connectionType = RelationshipType.withName( "CONNECTS" );
    try ( var transaction = database.beginTx() )
    {
        var startNode = transaction.createNode();
        var endNode = transaction.createNode();
        startNode.createRelationshipTo( endNode, connectionType );
        transaction.commit();
    }
}

private static class CountingTransactionEventListener implements TransactionEventListener<CreatedEntitiesCounter>
{
    @Override
    public CreatedEntitiesCounter beforeCommit( TransactionData data, Transaction transaction, GraphDatabaseService databaseService ) throws Exception
    {
        return new CreatedEntitiesCounter( size( data.createdNodes() ), size( data.createdRelationships() ) );
    }

    @Override
    public void afterCommit( TransactionData data, CreatedEntitiesCounter entitiesCounter, GraphDatabaseService databaseService )
    {
        System.out.println( "Number of created nodes: " + entitiesCounter.getCreatedNodes() );
        System.out.println( "Number of created relationships: " + entitiesCounter.getCreatedRelationships() );
    }

    @Override
    public void afterRollback( TransactionData data, CreatedEntitiesCounter state, GraphDatabaseService databaseService )
    {
    }
}

private static class CreatedEntitiesCounter
{
    private final long createdNodes;
    private final long createdRelationships;

    public CreatedEntitiesCounter( long createdNodes, long createdRelationships )
    {
        this.createdNodes = createdNodes;
        this.createdRelationships = createdRelationships;
    }

    public long getCreatedNodes()
    {
        return createdNodes;
    }

    public long getCreatedRelationships()
    {
        return createdRelationships;
    }
}