其他查询机制
隐式(或自动提交)事务
隐式事务是唯一可用于 CALL { … } IN TRANSACTIONS 查询的事务类型。 |
隐式事务是事务最基本且受限的形式。驱动程序不会自动重试隐式事务,这与使用 .execute_query() 和 托管事务 运行的查询不同。仅当其他查询接口不适用或用于快速原型设计时,才应使用隐式事务。
您可以使用 Session.run() 方法运行隐式事务。它会返回一个需要 相应处理 的 Result 对象。
with driver.session(database="<database-name>") as session:
res = session.run("CREATE (a:Person {name: $name})", name="Licia")
res.consume()
为确保隐式事务被提交,请消耗所有记录(通过对结果调用 .consume() 或迭代所有记录)。如果不消耗所有记录,可能会导致意想不到的行为:无法保证隐式事务在会话生命周期内的确切提交时间。
|
不要依赖关闭会话来提交隐式事务。虽然挂起的查询会在关闭会话之前提交到数据库,但无法保证它们一定会成功,并且除非以某种方式显式处理了结果,否则不会引发异常。以下示例包含一个可能不会引发异常的错误查询
|
由于驱动程序无法判断 session.run() 调用中的查询需要读取还是写入会话,因此默认选择写入。如果您的隐式事务仅包含只读查询,通过在创建会话时设置关键字参数 default_access_mode=neo4j.READ_ACCESS 来告知驱动程序,可以提升性能。
导入 CSV 文件
使用 Session.run() 最常见的用例是结合 Cypher 的 LOAD CSV 子句将大型 CSV 文件导入数据库,并防止因事务过大而导致的超时错误。
with driver.session(database="<database-name>") as session:
result = session.run("""
LOAD CSV FROM 'https://data.neo4j.com/bands/artists.csv' AS line
CALL {
WITH line
MERGE (:Artist {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS OF 2 ROWS
""")
print(result.consume().counters)
虽然 LOAD CSV 很方便,但将 CSV 文件的解析推迟到 Python 应用程序中处理并避开 LOAD CSV 并没有什么不妥。事实上,将解析逻辑移至应用程序可以使您更好地控制导入过程。有关高效批量数据插入的信息,请参阅 性能 → 批量数据创建。 |
更多信息,请参阅 Cypher → 子句 → Load CSV。
事务配置
Query 对象允许指定查询超时时间并为事务附加元数据。该元数据可在服务器日志中查看(如 unit_of_work 装饰器所述)。
from neo4j import Query
with driver.session(database="<database-name>") as session:
query = Query("CREATE (a:Person {name: $name})",
timeout=1.0,
metadata={"app_name": "people"})
result = session.run(query, name="John")
属性键、关系类型和标签中的动态值
通常,你不应该直接将参数拼接到查询字符串中,而应该使用 查询参数。然而,在某些情况下,查询结构可能导致无法在所有地方使用参数。实际上,尽管参数适用于字面量、表达式以及 节点标签和关系类型,但它们不能用于属性键,因此 MATCH (n) WHERE n.$param = 'something' 是无效的。
使用字符串拼接时,请将动态值用反引号括起来,并自行进行转义以防止 Cypher 注入。请注意,Cypher 处理 Unicode,因此也要留意 Unicode 字面量 \u0060。
key = "nam\\u0060e"
# convert \u0060 to literal backtick and then escape backticks
escaped_key = key.replace("\\u0060", "`").replace("`", "``")
driver.execute_query(
f"MATCH (p:Person {{`{escaped_key}`: $name}}) RETURN p.name",
name="Alice",
database_="<database-name>"
)
# rewritten to
# MATCH (p:Person {`nam``e`: $name}) RETURN p.name
避免字符串拼接的另一种变通方法是使用 APOC 过程,例如 apoc.merge.node,它支持动态标签和属性键。
apoc.merge.node 创建带有动态标签/属性键的节点。property_key = "name"
label = "Person"
driver.execute_query(
"CALL apoc.merge.node($labels, $properties)",
labels=[label], properties={property_key: "Alice"},
database_="<database-name>"
)
| 如果您在 Docker 中运行 Neo4j,则需要在启动容器时启用 APOC。请参阅 APOC → 安装 → Docker。 |
日志记录
驱动程序通过原生的 logging 库将消息记录到名为 neo4j 的日志记录器中。要将日志消息重定向到标准输出,请使用 watch 函数
import sys
from neo4j.debug import watch
watch("neo4j", out=sys.stdout)
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> created, routing address IPv4Address(('localhost', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> acquire routing connection, access_mode='WRITE', database='neo4j'
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <ROUTING> checking table freshness (readonly=False): table expired=True, has_server_for_mode=False, table routers={IPv4Address(('localhost', 7687))} => False
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <POOL> attempting to update routing table from IPv4Address(('localhost', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,616 [#0000] _: <RESOLVE> in: localhost:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <RESOLVE> dns resolver out: 127.0.0.1:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <POOL> _acquire router connection, database='neo4j', address=ResolvedIPv4Address(('127.0.0.1', 7687))
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] _: <POOL> trying to hand out new connection
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,617 [#0000] C: <OPEN> 127.0.0.1:7687
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,619 [#AF18] C: <MAGIC> 0x6060B017
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,619 [#AF18] C: <HANDSHAKE> 0x00000005 0x00020404 0x00000104 0x00000003
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,620 [#AF18] S: <HANDSHAKE> 0x00000005
[DEBUG ] [Thread 139807941394432] [Task None ] 2023-03-31 09:31:39,620 [#AF18] C: HELLO {'user_agent': 'neo4j-python/5.6.0 Python/3.10.6-final-0 (linux)', 'routing': {'address': 'localhost:7687'}, 'scheme': 'basic', 'principal': 'neo4j', 'credentials': '*******'}
术语表
- LTS (长期支持版)
-
长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。
- Bolt
-
Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。
- 最终一致性
-
如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。
- 因果一致性
-
如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用
null。 - 事务
-
事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。
- 背压
-
背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。
- 书签
-
书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。
- 事务函数
-
事务函数是由
execute_read或execute_write调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。 - 驱动程序 (Driver)
-
一个
Driver对象保存了与 Neo4j 数据库建立连接所需的详细信息。