从 REST API 导入 JSON 数据到 Neo4j

本文展示了一些将基于 JSON 的 REST API 数据加载到 Neo4j 中的技术。

将 JSON 数据导入 Neo4j

有大量的基于 JSON 的 Web API 可以导入到 Neo4j 中,我们可以使用 Load JSON 过程之一从这些 API 中检索数据,并将其转换为 Cypher® 可以使用的映射值 (map values)。

APOC 用户指南提供了一个示例,展示了如何将 StackOverflow 数据导入 Neo4j

Strava API

Strava 是跑步者和骑行者用来记录活动并与朋友分享的应用程序。用户可以通过基于 JSON 的 REST API 获取这些数据。

在开始调用 API 之前,我们需要创建一个应用程序。随后我们将获得一个访问令牌,在所有对 API 的请求中都需要使用该令牌。

我们可以通过运行以下命令在 Neo4j Browser 或 Cypher Shell 中创建参数:

:params {stravaToken: "Bearer <insert-strava-token>"}

别忘了将 <insert-strava-token> 替换为您 Strava 应用程序的令牌。

使用分页端点

我们有兴趣导入已登录运动员的活动。该端点采用以下参数:

我们感兴趣的是 per_page(在此我们可以定义每次调用端点时返回的活动数量)和 after(在此我们可以告诉 API 仅返回在提供的纪元时间戳之后的结果)。

假设我们的活动数量超过了单次 API 请求所能返回的数量。我们需要进行分页以检索所有活动并将其导入 Neo4j。

在对 API 进行分页之前,我们先学习如何将一页的活动导入到 Neo4j 中。以下查询将返回从最早时间戳开始的活动:

WITH 0 AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

我们为每个活动创建一个标签为 Run 的节点,并设置一些属性。对于本例,最有趣的是 startDate,稍后我们将把它传递给 after 参数。

此查询将加载前 30 个活动,但如果我们想获取接下来的 30 个怎么办?我们可以修改查询的第一行,找到我们任何 Run 节点中的最新时间戳,然后将其传递给 API。如果没有 Run 节点,我们可以像下面的查询那样使用 0 值。

OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

我们可以继续手动运行此查询,但现在是时候将其自动化了。

API 自动分页

一种方法是使用脚本语言并创建一个循环,在循环中重复调用该端点,直到没有活动可以检索为止。如果我们稍微发挥一下创造力,可以使用 apoc.periodic.commit 过程实现相同的结果。

根据 APOC 文档,这是定期迭代过程的描述:

它非常适合在单独的事务中重复运行查询,直到不再处理或生成任何结果。因此,您可以批量迭代那些不满足条件的元素,并对它们进行更新,以便之后满足条件。

在我们的例子中,退出条件是当我们从 API 收到的活动少于 30 个时。让我们首先更新我们的查询,如果返回的活动少于 30 个,则返回 0;如果是 30 个,则返回实际数量。

OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count

现在剩下的就是用定期提交 (periodic commit) 包装整个过程。我们使用两个参数调用 apoc.periodic.commit 方法:

  • 第一个是 Cypher 语句,运行该语句直到 RETURN 子句返回 0;

  • 第二个是传递给 Cypher 语句的参数。

call apoc.periodic.commit("
  OPTIONAL MATCH (run:Run)
  WITH run ORDER BY run.startDate DESC LIMIT 1
  WITH coalesce(run.startDate.epochSeconds, 0) AS after

  WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
  CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
  YIELD value

  CREATE (run:Run {id: value.id})
  SET run.distance = toFloat(value.distance),
      run.startDate = datetime(value.start_date_local),
      run.elapsedTime = duration({seconds: value.elapsed_time})

  RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})

此查询会向 API 发送多个提交,直到我们加载了所有的活动。