运行并发事务

使用 Goroutines 和 channels 来运行并发查询,或者将查询结果的处理委托给多个线程。下面的示例还使用了 Go 的 sync来协调不同的例程。如果您不熟悉 Go 中的并发,请查看 Go 编程语言 → Go 并发模式:管道和取消

如果您需要在不同的事务之间实现因果一致性,请使用 书签 (bookmarks)

并发处理查询结果集(使用会话)

以下示例展示了如何将查询结果流式传输到通道中,并由多个消费者并发处理其中的记录。

package main

import (
    "fmt"
    "context"
    "time"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v6/neo4j"
)

type item struct {
    record *neo4j.Record
    err    error
}

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<database-uri>"
    dbUser := "<username>"
    dbPassword := "<password>"
    driver, err := neo4j.NewDriver(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    // Run a query and get results in a channel
    recordsC := queryToChannel(ctx, driver)  (1)

    // Spawn some consumers that will process records
    // They communicate back on the log channel
    // WaitGroup allows to keep track of progress and close channel when all are done
    log := make(chan string)  (4)
    wg := &sync.WaitGroup{}  (5)
    for i := 1; i < 10; i++ {  // i starts from 1 because 0th receiver would process too fast
        wg.Add(1)
        go consumer(wg, recordsC, log, i)  (6)
    }
    // When all consumers are done, close log channel
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log as it comes
    for v := range log {
        fmt.Println(v)
    }
}

func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan item {
    recordsC := make(chan item, 10)  (2)
    go func() {
        defer close(recordsC)
        session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
        defer session.Close(ctx)
        // Cypher query to create and retrieve some nodes
        result, err := session.Run(ctx, `
            UNWIND range(1,25) AS id
            MERGE (p:Person {id: id})
            RETURN p
            `, nil)
        if err != nil {
            var it item
            it.record, it.err = nil, err
            recordsC <- it
        } else {
            // Stream results to channel as they come from the server
            for result.Next(ctx) { (3)
                record := result.Record()
                var it item
                it.record, it.err = record, nil
                recordsC <- it
            }
            // If the stream interrupts, send the error through the channel
            if result.Err() != nil {
                var it item
                it.record, it.err = nil, err
                recordsC <- it
            }
        }
    }()
    return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan item, log chan string, n int) {
    defer wg.Done()  // will communicate that routine is done
    for it := range records {
        if it.err != nil {
            fmt.Println("ERROR:", it.err)
        } else {
            log <- fmt.Sprintf("Receiver %v processed %v", n, it.record)
            time.Sleep(time.Duration(n) * time.Second)  // proxy for a time-consuming processing
        }
    }
}
1 一个 Goroutine 使用 隐式事务 向 Neo4j 服务器运行查询。请注意,驱动程序的 会话 (session) 是在例程 内部 创建的,因为会话不是线程安全的。
2 通道 recordsC 是查询结果记录流向的地方,各种 consumer 从中读取数据。它是带缓冲的,这样驱动程序获取记录的速度就不会超过消费者的处理速度。
3 来自服务器的每一条结果记录都会通过 recordsC 通道发送。只要还有待处理的记录,流式传输就会持续,之后通道会关闭且例程退出。请注意,流式传输过程中可能会发生错误,该错误也必须通过通道发送。
4 通道 log 是消费者进行通信的地方。
5 需要一个 sync.WaitGroup 来确定所有消费者何时完成工作,从而可以关闭 log 通道。
6 多个 consumer 在单独的 Goroutines 中启动。每个消费者从 recordsC 通道读取并处理记录。每个消费者都通过睡眠定时器模拟一个耗时的操作。

上述示例使用了 隐式事务,而非 托管事务。托管事务要求事务函数是幂等的——但在使用 Goroutines 和通道时,这是无法实现的。在记录 v1, v2, v3 已经被流式传输并发送到通道中,而此时发生了可重试错误(导致驱动程序重新运行事务函数)的情况下,通道将总共流式传输 v1, v2, v3, err, v1, v2, v3, …。为了避免这种行为,如果您仍然希望流式传输结果而不对其进行缓冲,请使用其他形式的事务(隐式显式)。

并发运行多个查询(使用 ExecuteQuery()

以下示例展示了如何并发执行多个 ExecuteQuery() 调用。

package main

import (
    "fmt"
    "context"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v6/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<database-uri>"
    dbUser := "<username>"
    dbPassword := "<password>"
    driver, err := neo4j.NewDriver(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    log := make(chan string)  (1)
    wg := &sync.WaitGroup{}  (2)
    // Spawn 10 concurrent queries
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go runQuery(wg, ctx, driver, log)  (3)
    }
    // Wait for all runner routines to be done before closing log
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log
    for msg := range log {
        fmt.Println(msg)
    }
}

// Run Neo4j query with random sleep time, returning the sleep time in ms
func runQuery(wg *sync.WaitGroup, ctx context.Context, driver neo4j.Driver, log chan string) {
    defer wg.Done()  // will communicate that routine is done
    result, err := neo4j.ExecuteQuery(ctx, driver, `
        WITH round(rand()*2000) AS waitTime
        CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
        `, nil, neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("<database-name>"))
    if err != nil {
        log <- fmt.Sprintf("ERROR: %v", err)
    } else {
        neo, _ := result.Records[0].Get("time")
        log <- fmt.Sprintf("Query returned %v", neo)
    }
}
1 log 通道是所有查询例程进行通信的地方。
2 需要一个 sync.WaitGroup 来确定所有查询例程何时完成工作,从而可以关闭 log 通道。
3 运行了十个不同的查询,每个都在自己的 Go 例程中。它们独立并并发地运行,并向共享的 log 通道报告。

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 ExecuteReadExecuteWrite 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

Driver 对象包含与 Neo4j 数据库建立连接所需的详细信息。