运行您自己的事务
当使用 executeQuery() 查询数据库时,驱动程序会自动创建一个事务。事务是一个工作单元,要么全部提交,要么在失败时全部回滚。您可以在单个查询中包含多个 Cypher 语句,例如在更新数据库时依次使用 MATCH 和 CREATE,但不能在多个查询之间穿插客户端逻辑。
对于这些更高级的用例,驱动程序提供了手动控制事务的函数。最常见的形式是托管事务,您可以将其视为一种拆解 executeQuery() 流程的方法,并能够在更多地方指定其所需行为。
创建会话
在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间的查询通道,并确保强制执行因果一致性。
会话(Session)通过 Driver.session() 方法创建。它接受一个(可选的)对象参数,其中的 database 属性允许指定目标数据库。关于更多参数,请参见会话配置。
session = driver.session({ database: '<database-name>' })
创建会话是一个轻量级操作,因此创建和销毁会话的成本不高。完成工作后,请务必关闭会话。
会话不是线程安全的:您可以在线程之间共享主要的 Driver 对象,但每个线程都应该创建自己的会话。
运行托管事务
事务可以包含多个查询。由于 Neo4j 符合 ACID 标准,事务内的查询要么作为一个整体执行,要么完全不执行:您不会遇到事务一部分成功而另一部分失败的情况。使用事务将协同工作以实现单个逻辑数据库操作的相关查询分组在一起。
您可以使用 Session.executeRead() 和 Session.executeWrite() 方法创建托管事务,具体取决于您是要从数据库检索数据还是更改数据。这两个方法都接受一个事务函数回调,该回调负责实际执行查询并处理结果。
Al 开头的人员。let session = driver.session({ database: '<database-name>' }) (1)
try {
let result = await session.executeRead(async tx => { (2)
return await tx.run(` (3)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
`, {filter: 'Al'}
)
})
for(let record in result.records) { (4)
console.log(record.get('name'))
}
console.log(
`The query \`${result.summary.query.text}\`` +
`returned ${result.records.length} nodes.\n`
)
} finally {
session.close()
}
| 1 | 创建一个会话。单个会话可以作为多个查询的容器。记得在完成后关闭它。 |
| 2 | .executeRead()(或 .executeWrite())方法是进入事务的入口点。 |
| 3 | 使用 Transaction.run() 方法运行查询,提供 Cypher 查询和查询参数对象。每次运行查询都会返回一个 Result 对象。 |
| 4 | 处理结果记录和查询摘要。 |
不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数绝不应直接返回 Result 对象。相反,始终以某种方式处理结果;至少应将其转换为列表。在事务函数内部,return 语句会导致事务被提交,而如果引发异常,事务则会自动回滚。
const neo4j = require('neo4j-driver');
(async () => {
const URI = '<database-uri>'
const USER = '<username>'
const PASSWORD = '<password>'
let driver, session
let employeeThreshold = 10
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
session = driver.session({ database: '<database-name>' })
for(let i=0; i<100; i++) {
const name = `Neo-${i.toString()}`
const orgId = await session.executeWrite(async tx => {
let result, orgInfo
// Create new Person node with given name, if not already existing
await tx.run(`
MERGE (p:Person {name: $name})
RETURN p.name AS name
`, { name: name }
)
// Obtain most recent organization ID and number of people linked to it
result = await tx.run(`
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
`)
if(result.records.length > 0) {
orgInfo = result.records[0]
}
if(orgInfo != undefined && orgInfo['employeesN'] == 0) {
throw new Error('Most recent organization is empty.')
// Transaction will roll back -> not even Person is created!
}
// If org does not have too many employees, add this Person to that
if(orgInfo != undefined && orgInfo['employeesN'] < employeeThreshold) {
result = await tx.run(`
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN $orgId AS id
`, { orgId: orgInfo['id'], name: name }
)
// Otherwise, create a new Organization and link Person to it
} else {
result = await tx.run(`
MATCH (p:Person {name: $name})
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN o.id AS id
`, { name: name }
)
}
// Return the Organization ID to which the new Person ends up in
return result.records[0].get('id')
})
console.log(`User ${name} added to organization ${orgId}`)
}
await session.close()
await driver.close()
})()
如果失败被认为是瞬时的(例如由于服务器暂时不可用),驱动程序会自动重试运行失败的查询。如果操作在配置的最大重试时间后仍然失败,则会引发错误。
由于事务可能会被重新运行,事务函数在多次运行产生相同效果(幂等性),因为您无法预先知道它们将被执行多少次。实际上,这意味着例如您不应该编辑或依赖全局变量。请注意,尽管事务函数可能会执行多次,但其中的数据库查询始终只会运行一次。
一个会话可以链接多个事务,但在任何给定时间,一个会话内只能激活一个事务。这意味着一个查询必须在下一个查询运行之前完成,这也是为什么前面的示例都使用 async/await 语法的原因。要维护多个并发事务,请参阅如何运行异步查询。
运行显式事务
您可以通过使用 Session.beginTransaction() 方法手动开启事务,从而实现对事务的完全控制。正如在事务函数中一样,您可以使用 Transaction.run() 方法在显式事务中运行查询。
let session = driver.session({ database: '<database-name>' })
let transaction = await session.beginTransaction()
// use tx.run() to run queries
// tx.commit() to commit the transaction
// tx.rollback() to rollback the transaction
await transaction.commit()
await session.close()
显式事务可以通过 Transaction.commit() 提交,或通过 Transaction.rollback() 回滚。如果没有采取显式操作,驱动程序将在事务生命周期结束时自动回滚事务。
|
因瞬时服务器错误而导致 |
显式事务对于需要将 Cypher 执行分布在同一个事务的多个函数中,或者需要在一个事务内运行多个查询但不需要托管事务提供的自动重试的应用程序最有用。
const neo4j = require('neo4j-driver');
const URI = '<database-uri>';
const USER = '<username>';
const PASSWORD = '<password>';
(async () => {
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
let customerId = await createCustomer(driver)
let otherBankId = 42
await transferToOtherBank(driver, customerId, otherBankId, 999)
await driver.close()
})()
async function createCustomer(driver) {
let { records } = await driver.executeQuery(`
MERGE (c:Customer {id: randomUUID()})
RETURN c.id AS id
`, {},
{ database: '<database-name>' }
)
return records[0].get("id")
}
async function transferToOtherBank(driver, customerId, otherBankId, amount) {
const session = driver.session({ database: '<database-name>' })
const tx = await session.beginTransaction()
try {
if(! checkCustomerBalance(tx, customerId, amount))
return
try {
decreaseCustomerBalance(tx, customerId, amount)
await tx.commit()
} catch (error) {
requestInspection(customerId, otherBankId, amount, e)
throw error // roll back
}
await otherBankTransferApi(customerId, otherBankId, amount)
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
} finally {
await session.close()
}
}
async function checkCustomerBalance(tx, customerId, amount) {
result = await tx.run(`
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
`, { id: customerId, amount: amount },
{ database: '<database-name>' }
)
return result.records[0].get('sufficient')
}
async function otherBankTransferApi(customerId, otherBankId, amount) {
// make some API call to other bank
}
async function decreaseCustomerBalance(tx, customerId, amount) {
await tx.run(`
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
`, { id: customerId, amount: amount }
)
}
async function requestInspection(customerId, otherBankId, amount, error) {
// manual cleanup required; log this or similar
console.log('WARNING: transaction rolled back due to exception:')
console.log(error)
}
会话配置
创建会话时,您可以提供一个 SessionConfig 类型的可选参数来指定会话配置值。
数据库选择
始终显式指定数据库,即使在单数据库实例上也应使用 database 参数。这使得驱动程序能够更高效地工作,因为它节省了向服务器解析主数据库的网络往返。如果没有指定数据库,则使用 Neo4j 实例设置中配置的用户主数据库。
const session = driver.session({
database: '<database-name>'
})
请求路由
在集群环境中,所有会话默认以写入模式打开,并将其路由到领导者(Leader)。您可以通过将 defaultAccessMode 参数显式设置为 neo4j.session.READ 来更改此设置。请注意,.executeRead() 和 .executeWrite() 会自动覆盖会话的默认访问模式。
const session = driver.session({
database: '<database-name>',
defaultAccessMode: neo4j.session.READ
})
|
尽管在读取模式下执行写入查询会导致运行时错误,但您不应依赖此功能进行访问控制。这两种模式的区别在于:读取事务将被路由到集群中的任何节点,而写入事务会被定向到主节点(primaries)。不能保证以读取模式提交的写入查询一定会遭到拒绝。 |
以其他用户身份运行查询
您可以通过配置参数 auth 使用不同的用户执行查询。在会话级别切换用户比创建新的 Driver 对象成本更低。查询随后将在给定用户的安全上下文中运行(即主数据库、权限等)。
const session = driver.session({
database: '<database-name>',
auth: neo4j.auth.basic('<username>', '<password>')
})
参数 impersonatedUser 提供了类似的功能。区别在于,您不需要知道用户的密码即可模拟他们,但创建 Driver 时所用的用户需要具有相应的权限。
const session = driver.session({
database: '<database-name>',
impersonatedUser: '<username>'
})
事务配置
您可以通过向 .executeRead()、.executeWrite() 和 .beginTransaction() 提供第二个 TransactionConfig 类型的可选参数,来对事务进行进一步控制。您可以指定:
-
事务超时时间(以毫秒为单位)。运行时间超过此限制的事务将由服务器终止。默认值在服务器端设置。最小值为 1 毫秒。
-
附加到事务的元数据对象。这些元数据会被记录在服务器的
query.log中,并且在SHOW TRANSACTIONS YIELD *Cypher 命令的输出中可见。使用此功能来标记事务。
let session = driver.session({ database: '<database-name>' })
const people_n = await session.executeRead(
async tx => { return await tx.run('MATCH (a:Person) RETURN count(a)') },
{ timeout: 5000, metadata: {'app_name': 'people'} } // TransactionConfig
)
关闭会话
每个连接池都有有限数量的会话,因此如果您打开会话而不关闭它们,应用程序可能会耗尽会话资源。因此,在完成工作后务必关闭会话非常重要,这样它们才能返回连接池以供稍后重用。最好的方法是将会话使用封装在 try/finally 块中,并在 finally 子句中调用 session.close()。
let session = driver.session({database: '<database-name>'})
try {
// use session to run queries
} finally {
await session.close()
}
术语表
- LTS (长期支持版)
-
长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。
- 最终一致性
-
如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。
- 因果一致性
-
如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用
null。 - 事务
-
事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。
- 背压
-
背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。
- 书签
-
书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。
- 事务函数
-
事务函数是由
executeRead或executeWrite调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。 - 驱动程序 (Driver)
-
Driver对象保存了与 Neo4j 数据库建立连接所需的详细信息。