运行您自己的事务
当 使用 ExecuteQuery() 查询数据库 时,驱动程序会自动创建一个事务。事务是一个工作单元,要么全部提交 (commit),要么在失败时回滚 (roll back)。您可以在单个查询中包含多个 Cypher 语句,例如在 更新数据库 时顺序使用 MATCH 和 CREATE,但不能在多个查询之间插入客户端逻辑。
对于这些更高级的用例,驱动程序提供了手动控制事务的函数。最常见的形式是托管事务 (managed transactions),您可以将其视为一种拆解 ExecuteQuery() 流程并能够在更多地方指定其所需行为的方法。
创建会话 (Session)
运行托管事务
事务可以包含多个查询。由于 Neo4j 符合 ACID 标准,事务内的查询要么作为一个整体执行,要么完全不执行:您不会遇到事务一部分成功而另一部分失败的情况。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。
您可以使用 Session.ExecuteRead() 和 Session.ExecuteWrite() 方法创建托管事务,具体取决于您是要从数据库检索数据还是更改数据。这两个方法都采用一个 事务函数 回调,该回调负责实际执行查询和处理结果。
Al 开头的人员session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"}) (1)
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx, (2)
func(tx neo4j.ManagedTransaction) (any, error) { (3)
result, err := tx.Run(ctx, ` (4)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
`, map[string]any{
"filter": "Al",
})
if err != nil {
return nil, err
}
records, err := result.Collect(ctx) (5)
if err != nil {
return nil, err
}
return records, nil
})
for _, person := range people.([]*neo4j.Record) {
fmt.Println(person.AsMap())
}
| 1 | 创建一个会话。单个会话可以作为多个查询的容器。记得在完成后关闭它(在此,我们在打开会话后立即使用 defer 关闭它)。 |
| 2 | .ExecuteRead()(或 .ExecuteWrite())方法是进入事务的入口点。 |
| 3 | 事务函数回调负责运行查询。 |
| 4 | 使用 ManagedTransaction.Run() 方法来运行查询。每个查询运行都会返回一个 Result 对象。 |
| 5 | 使用 Result 上的任意方法 处理结果。.Collect() 方法将所有记录检索到列表中。 |
不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数不应直接返回结果对象。相反,请始终以某种方式 处理结果。在事务函数内,如果 error 为 nil,则 return 语句会导致事务被提交;如果返回的 error 值不为 nil,则事务会自动回滚。
package main
import (
"fmt"
"context"
"strconv"
"errors"
"github.com/neo4j/neo4j-go-driver/v6/neo4j"
)
func main() {
ctx := context.Background()
var employeeThreshold int64 = 10 // Neo4j's integer maps to Go's int64
// Connection to database
dbUri := "<database-uri>"
dbUser := "<username>"
dbPassword := "<password>"
driver, err := neo4j.NewDriver(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
// Create 100 people and assign them to various organizations
for i := 0; i < 100; i++ {
name := "Thor" + strconv.Itoa(i)
orgId, err := session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
var orgId string
// Create new Person node with given name, if not exists already
_, err := tx.Run(
ctx,
"MERGE (p:Person {name: $name})",
map[string]any{
"name": name,
})
if err != nil {
return nil, err
}
// Obtain most recent organization ID and the number of people linked to it
result, err := tx.Run(
ctx, `
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
`, nil)
if err != nil {
return nil, err
}
org, err := result.Single(ctx)
// If no organization exists, create one and add Person to it
if org == nil {
orgId, _ = createOrganization(ctx, tx)
fmt.Println("No orgs available, created", orgId)
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, errors.New("Failed to add person to new org")
// Transaction will roll back
// -> not even Person and/or Organization is created!
}
} else {
orgId = org.AsMap()["id"].(string)
if employeesN := org.AsMap()["employeesN"].(int64);
employeesN == 0 {
return nil, errors.New("Most recent organization is empty")
// Transaction will roll back
// -> not even Person is created!
}
// If org does not have too many employees, add this Person to it
if employeesN := org.AsMap()["employeesN"].(int64);
employeesN < employeeThreshold {
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person is created!
}
// Otherwise, create a new Organization and link Person to it
} else {
orgId, err = createOrganization(ctx, tx)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person is created!
}
fmt.Println("Latest org is full, created", orgId)
err = addPersonToOrganization(ctx, tx, name, orgId)
if err != nil {
return nil, err
// Transaction will roll back
// -> not even Person and/or Organization is created!
}
}
}
// Return the Organization ID to which the new Person ends up in
return orgId, nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println("User", name, "added to organization", orgId)
}
}
}
func createOrganization(ctx context.Context, tx neo4j.ManagedTransaction) (string, error) {
result, err := tx.Run(
ctx, `
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
RETURN o.id AS id
`, nil)
if err != nil {
return "", err
}
org, err := result.Single(ctx)
if err != nil {
return "", err
}
orgId, _ := org.AsMap()["id"]
return orgId.(string), err
}
func addPersonToOrganization(ctx context.Context, tx neo4j.ManagedTransaction, personName string, orgId string) (error) {
_, err := tx.Run(
ctx, `
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[:WORKS_FOR]->(o)
`, map[string]any{
"orgId": orgId,
"name": personName,
})
return err
}
如果失败被视为瞬态(例如由于服务器暂时不可用),驱动程序会自动重试运行失败的事务。如果操作在配置的 最大重试时间 后仍然失败,则会引发异常。
由于事务可能会被重新运行,事务函数在多次运行时应产生相同的效果(幂等性),因为您无法预先知道它们将被执行多少次。请注意,尽管事务函数可能会执行多次,但其中的数据库查询始终只会运行一次。
一个会话可以链接多个事务,但在任何给定时间,一个会话内只能激活一个事务。要维护多个并发事务,请使用多个并发会话。
运行显式事务
您可以通过 Session.BeginTransaction() 方法手动开始事务,从而实现对事务的完全控制。您可以使用 ExplicitTransaction.Run() 方法在显式事务中运行查询。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
tx, err := session.BeginTransaction(ctx)
if err != nil {
panic(err)
}
// use tx.Run() to run queries
// tx.Commit() to commit the transaction
// tx.Rollback() to rollback the transaction
显式事务可以使用 ExplicitTransaction.Commit() 提交,也可以使用 ExplicitTransaction.Rollback() 回滚。如果没有采取显式操作,驱动程序会在事务生命周期结束时自动回滚事务。
|
因瞬态服务器错误而失败的 |
显式事务对于需要将 Cypher 执行分布在同一个事务的多个函数中,或者需要在一个事务内运行多个查询但不需要托管事务提供的自动重试的应用程序最有用。
package main
import (
"fmt"
"context"
"github.com/neo4j/neo4j-go-driver/v6/neo4j"
)
func main() {
ctx := context.Background()
// Connection to database
dbUri := "<database-uri>"
dbUser := "<username>"
dbPassword := "<password>"
driver, err := neo4j.NewDriver(
dbUri,
neo4j.BasicAuth(dbUser, dbPassword, ""))
if err != nil {
panic(err)
}
defer driver.Close(ctx)
err = driver.VerifyConnectivity(ctx)
if err != nil {
panic(err)
}
customerId, err := createCustomer(ctx, driver)
if err != nil {
panic(err)
}
otherBankId := 42
transferToOtherBank(ctx, driver, customerId, otherBankId, 999)
}
func createCustomer(ctx context.Context, driver neo4j.Driver) (string, error) {
result, err := neo4j.ExecuteQuery(ctx, driver, `
MERGE (c:Customer {id: randomUUID()})
RETURN c.id AS id
`, nil,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase("<database-name>"))
if err != nil {
return "", err
}
customerId, _ := result.Records[0].Get("id")
return customerId.(string), err
}
func transferToOtherBank(ctx context.Context, driver neo4j.Driver, customerId string, otherBankId int, amount float32) {
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
tx, err := session.BeginTransaction(ctx)
if err != nil {
panic(err)
}
if ! customerBalanceCheck(ctx, tx, customerId, amount) {
// give up
return
}
otherBankTransferApi(ctx, customerId, otherBankId, amount)
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
err = decreaseCustomerBalance(ctx, tx, customerId, amount)
if err != nil {
requestInspection(ctx, customerId, otherBankId, amount, err)
}
err = tx.Commit(ctx)
if err != nil {
requestInspection(ctx, customerId, otherBankId, amount, err)
}
}
func customerBalanceCheck(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (bool) {
result, err := tx.Run(ctx, `
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
`, map[string]any{
"id": customerId,
"amount": amount,
})
if err == nil {
return false
}
record, err := result.Single(ctx)
if err == nil {
return false
}
sufficient := record.AsMap()["sufficient"]
return sufficient.(bool)
}
func otherBankTransferApi(ctx context.Context, customerId string, otherBankId int, amount float32) {
// make some API call to other bank
}
func decreaseCustomerBalance(ctx context.Context, tx neo4j.ExplicitTransaction, customerId string, amount float32) (error) {
_, err := tx.Run(ctx, `
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
`, map[string]any{
"id": customerId,
"amount": amount,
})
return err
}
func requestInspection(ctx context.Context, customerId string, otherBankId int, amount float32, err error) {
// manual cleanup required; log this or similar
fmt.Println("WARNING: transaction rolled back due to exception:", err)
fmt.Println("customerId:", customerId, "otherBankId:", otherBankId, "amount:", amount)
}
处理查询结果
驱动程序的查询输出是一个 Result 对象,它不直接包含结果记录。相反,它将 Cypher 结果封装在一个丰富的数据结构中,需要在客户端进行一些解析。需要注意以下两个主要点
-
结果记录不是立即且全部由服务器获取并返回的。相反,结果以延迟流的形式出现。具体来说,当驱动程序从服务器接收到一些记录时,它们最初在后台队列中进行缓冲。记录会保留在缓冲区中,直到被应用程序消费,此时它们会从缓冲区中移除。当没有更多可用记录时,结果即耗尽。
-
结果充当游标。这意味着除非您将其保存在辅助数据结构中,否则无法从流中检索先前的记录。
下方的动画演示了单个查询的路径:它展示了驱动程序如何处理结果记录,以及应用程序应如何处理结果。
处理结果的最简单方法是对其调用 .Collect(ctx),这将产生一个 Record 对象数组。否则,Result 对象实现了许多用于处理记录的方法。下面列出了最常用的方法。
| 名称 | 描述 |
|---|---|
|
以列表形式返回结果的剩余部分。 |
|
返回下一条也是最后一条剩余的记录,或返回 如果可用记录多于(或少于)一条,则会返回非 |
|
返回当前记录。 |
|
如果当前记录之后还有可处理的记录,则返回 |
|
返回查询结果摘要。它会耗尽结果,因此应仅在数据处理结束后调用。 |
有关 Result 方法的完整列表,请参阅 API 文档 → Result。
会话配置
数据库选择
在创建会话时,即使在单数据库实例上,也要通过配置参数 DatabaseName 显式指定数据库。这允许驱动程序更高效地工作,因为它节省了向服务器解析主数据库所需的网络往返。如果不指定数据库,将使用 Neo4j 实例设置中配置的 默认数据库。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "<database-name>",
})
请求路由
在集群环境中,所有会话都以写入模式打开,并路由到主节点 (Leader)。您可以通过将配置参数 AccessMode 显式设置为 neo4j.AccessModeRead 或 neo4j.AccessModeWrite 来更改此设置。请注意,.ExecuteRead() 和 .ExecuteWrite() 会自动覆盖会话的默认访问模式。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "<database-name>",
AccessMode: neo4j.AccessModeRead,
})
|
尽管在读取模式下执行写入查询会导致运行时错误,但您不应依赖此功能进行访问控制。这两种模式的区别在于:读取事务将被路由到集群中的任何节点,而写入事务会被定向到主节点(primaries)。不能保证以读取模式提交的写入查询一定会遭到拒绝。 对于 |
以其他用户身份运行查询
您可以通过配置选项 Auth 以不同用户身份执行查询。在会话级别切换用户比创建新的 Driver 对象成本更低。查询随后在给定用户的安全上下文(例如,主数据库、权限等)中运行。
sessionAuth := neo4j.BasicAuth("<username>", "<password>", "")
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "<database-name>",
Auth: &sessionAuth,
})
选项 ImpersonatedUser 提供了类似的功能:区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver 时使用的用户需要具备 适当的权限。
session := driver.NewSession(ctx, neo4j.SessionConfig{
DatabaseName: "<database-name>",
ImpersonatedUser: "<username>",
})
事务配置
您可以通过向 .ExecuteRead()、.ExecuteWrite() 和 .BeginTransaction() 提供配置回调来对事务进行进一步控制。使用它们来指定
-
事务超时(以秒为单位)。运行时间较长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒。
-
附加到事务的元数据映射。这些元数据会记录在服务器的
query.log中,并且在SHOW TRANSACTIONSCypher 命令的输出中可见。使用此功能标记事务。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
people, err := session.ExecuteRead(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
result, _ := tx.Run(ctx, "MATCH (:Person) RETURN count(*) AS n", nil)
return result.Collect(ctx)
},
neo4j.WithTxTimeout(5*time.Second), // remember to import `time`
neo4j.WithTxMetadata(map[string]any{"appName": "peopleTracker"}))
关闭会话
每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话资源。因此,建议在创建新会话后立即使用 defer 关键字调用 session.Close(),以确保在任何情况下都会关闭它。当会话关闭时,它会被返回到连接池以供以后重用。
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
// session usage
在某些极端情况下,会话关闭可能会返回错误,因此您可能也需要捕获这些情况。
术语表
- LTS (长期支持版)
-
长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。
- 最终一致性
-
如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。
- 因果一致性
-
如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用
null。 - 事务
-
事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。
- 背压
-
背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。
- 书签
-
书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。
- 事务函数
-
事务函数是由
ExecuteRead或ExecuteWrite调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。 - 驱动程序 (Driver)
-
Driver对象包含与 Neo4j 数据库建立连接所需的详细信息。