运行非阻塞异步查询

查询数据库中的示例使用了 async/await 语法,这会强制驱动程序同步工作。当在查询中使用 await 时,您的应用程序会等待服务器检索所有查询结果并将其传输到驱动程序。对于大多数用例来说,这不是问题,但对于处理时间较长或结果集较大的查询,异步处理可能会加速您的应用程序。

有几种运行异步查询的方法

  • 异步迭代 (Asynchronous iteration) — 查询结果会尽可能快地(迭代地)被您的应用程序处理。驱动程序会相应地调节服务器传输的记录数量。

  • Promise API — 查询结果作为 Promise 返回。只有当完整的结果集可供驱动程序使用时,该 promise 才会解析。最适合服务器处理时间较长,但您希望一次性处理结果的查询。您的应用程序会批量接收结果,以便进行立即消费。

  • 流式 API (Streaming API) — 查询结果作为流返回,以便每个结果记录在可用时立即被处理。最适合记录需要单独处理的查询。您的应用程序会按片段接收结果,以便进行延迟消费。

  • 响应式 API (Reactive API) — 适用于响应式应用程序。

在事务函数中使用 await tx.run() 时,您可以将查询结果按原样从事务函数中返回以供进一步处理。另一方面,对于异步查询,您必须在事务函数内部处理结果(Promise API 除外)。

异步迭代

Result 对象支持 异步迭代。这允许您的应用程序按自己的节奏处理数据,驱动程序会相应地调节从服务器流式传输记录的速度,从而应用 背压 (backpressure)。使用异步迭代器,您可以保证应用程序接收数据的速度不会超过其处理数据的速度。

const session = driver.session()
try {
  const peopleNames = await session.executeWrite(async tx => {
    const result = tx.run(  (1)
      'MERGE (p:Person {name: $name}) RETURN p.name AS name',
      { name: 'Alice' }
    )
    let names = []
    for await (const record of result) {  (2)
      console.log(`Processing ${record.get('name')}`)
      names.push(record.get('name'))
    }
    return names  (3)
  })
} finally {
  await session.close()
}
1 运行查询
2 通过异步迭代处理记录
3 返回已处理的结果(而不是原始查询结果)

异步迭代器的使用有两点重要事项

  • 每个查询结果只能进行一次异步迭代。一旦结果游标 (cursor) 到达流的末尾,它就不会回退,因此您无法对同一个结果进行多次迭代。如果您的应用程序需要多次处理该数据,则必须手动将其存储在辅助数据结构(如上文中的列表)中。

  • 结果的处理必须在事务函数内部进行。您不应该将原始结果返回到事务函数之外然后再对其进行迭代。该工作流仅适用于 Promise API

Promise API

Promise API 允许运行查询并以 Promise 的形式接收结果。您可以将此查询方法视为允许您指定 Cypher 查询以及多个根据查询结果异步执行的回调函数。

const session = driver.session({database: '<database-name>'})
const result = session.executeWrite(async tx => {  (1)
  return tx.run(
    'MERGE (p:Person {name: $name}) RETURN p.name AS name',
    { name: 'Alice' }
  )
})
result.then(result => {  (2)
  result.records.forEach(record => {
    console.log(record.get('name'))
  })
  return result
})
.catch(error => {  (3)
  console.log(error)
})
.then(() => session.close())  (4)
1 运行查询
2 指定成功运行后的回调,并将查询结果作为输入
3 指定失败运行后的回调,并将驱动程序错误作为输入
4 指定无论查询结果如何都会运行的回调

Promise API 也适用于 Driver.executeQuery()。如果您在 .executeQuery() 调用前加上 await(如 查询数据库 中所示),您实际上强制应用程序等待结果准备就绪,并获得上述示例中 resulterror 的对应结果。

组合多个事务

要在同一事务中运行多个查询,请使用 Promise.all()。它会并发运行异步操作,因此您可以同时提交多个查询并等待它们全部完成。

检索人员姓名并将他们分配给 Neo4j 公司
const companyName = 'Neo4j'
const session = driver.session({database: '<database-name>'})
try {
  const names = await session.executeRead(async tx => {
    const result = await tx.run('MATCH (p:Person) RETURN p.name AS name')
    return result.records.map(record => record.get('name'))
  })

  const relationshipsCreated = await session.executeWrite(tx =>
    Promise.all(  // group together all Promises
      names.map(name =>
        tx.run(`
          MATCH (emp:Person {name: $personName})
          MERGE (com:Company {name: $companyName})
          MERGE (emp)-[:WORKS_FOR]->(com)
          `, { personName: name, companyName: companyName }
        )
        .then(result => result.summary.counters.updates().relationshipsCreated)
      )
    ).then(values => values.reduce((a, b) => a + b))  // aggregate results
  )

  console.log(`Created ${relationshipsCreated} employees relationships.`)
} finally {
  await session.close()
}

流式 API

流式 API 允许运行查询并在服务器准备好结果时立即单独接收它们。您可以指定一个回调来处理每条记录。此 API 特别适用于服务器检索不同记录所需时间可能不同的情况,但您希望在每条记录可用时立即处理它们。其行为类似于 异步迭代器,但编程风格有所不同。

const session = driver.session({database: '<database-name>'})
let peopleNames = []

session
  .run('MERGE (p:Person {name: $name}) RETURN p.name AS name', {  (1)
    name: 'Alice'
  })
  .subscribe({  (2)
    onKeys: keys => {  (3)
      console.log('Result columns are:')
      console.log(keys)
    },
    onNext: record => {  (4)
      console.log(`Processing ${record.get('name')}`)
      peopleNames.push(record.get('name'))
    },
    onCompleted: () => {  (5)
      session.close() // returns a Promise
    },
    onError: error => {  (6)
      console.log(error)
    }
  })
1 运行查询
2 将处理程序附加到结果流
3 onKeys 回调接收结果列列表
4 每次接收到记录时都会调用 onNext 回调
5 当事务结束时会调用 onCompleted 回调
6 如果发生错误,则触发 onError

响应式 API

响应式编程的典型特征是,在响应式流中,消费者控制其从查询中消费记录的速率,而驱动程序则相应地管理从服务器请求记录的速率。响应式 API 推荐用于已经面向响应式风格的应用程序。

const rxjs = require('rxjs');

const rxSession = driver.rxSession()  (1)
const rxResult = await rxSession.executeWrite(tx => {
  return tx
    .run('MERGE (p:Person {name: $name}) RETURN p.name AS name', {  (2)
      name: 'Alice'
    })
    .records()  (3)
    .pipe(  (4)
      rxjs.map(record => record.get('name')),
      //rxjs.materialize(),  // optional, turns outputs into Notifications
      rxjs.toArray()
    )
  })
const people = await rxResult.toPromise()
console.log(people)
1 获取响应式会话
2 运行查询
3 获取结果记录的 observable
4 响应式处理
响应式 API 需要一个 RxSession 并返回一个 RxResult
响应式 API 在驱动程序的精简版 (lite version) 中不可用。

术语表

LTS (长期支持版)

长期支持 (Long Term Support) 版本是保证在若干年内得到支持的版本。Neo4j 4.4 和 5.26 是 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它就像 SQL,但专用于图数据库。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数在 Cypher 本身中难以轻松实现。

Bolt

Bolt 是用于 Neo4j 实例和驱动程序之间交互的协议。默认监听 7687 端口。

ACID

原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability) (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保即使发生故障,数据库中的数据也能保持准确和一致。

最终一致性

如果一个数据库能保证所有集群成员在某个时间点都存储了数据的最新版本,则该数据库具有最终一致性。

因果一致性

如果读写查询被集群中的每个成员以相同的顺序看到,则数据库具有因果一致性。这比最终一致性更强。

NULL

空标记不是一种类型,而是缺失值的占位符。更多信息,请参阅 Cypher → 使用 null

事务

事务是一个工作单元,要么被提交,要么在失败时被回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或全部撤销,以避免钱从一个账户扣除却未存入另一个账户的情况。

背压

背压是对数据流的抵抗力。它确保客户端不会被过快发送的数据压垮,从而超出其处理能力。

书签

书签是代表数据库某种状态的标记。通过将一个或多个书签与查询一起传递,服务器将确保在所表示的状态建立之前,该查询不会被执行。

事务函数

事务函数是由 executeReadexecuteWrite 调用执行的回调。如果发生服务器故障,驱动程序会自动重新执行该回调。

驱动程序 (Driver)

Driver 对象保存了与 Neo4j 数据库建立连接所需的详细信息。