从 REST API 导入 JSON 数据到 Neo4j
本文展示了一些将基于 JSON 的 REST API 数据加载到 Neo4j 中的技术。
将 JSON 数据导入 Neo4j
有大量的基于 JSON 的 Web API 可以导入到 Neo4j 中,我们可以使用 Load JSON 过程之一从这些 API 中检索数据,并将其转换为 Cypher® 可以使用的映射值 (map values)。
APOC 用户指南提供了一个示例,展示了如何将 StackOverflow 数据导入 Neo4j。
Strava API
Strava 是跑步者和骑行者用来记录活动并与朋友分享的应用程序。用户可以通过基于 JSON 的 REST API 获取这些数据。
在开始调用 API 之前,我们需要创建一个应用程序。随后我们将获得一个访问令牌,在所有对 API 的请求中都需要使用该令牌。
我们可以通过运行以下命令在 Neo4j Browser 或 Cypher Shell 中创建参数:
:params {stravaToken: "Bearer <insert-strava-token>"}
|
别忘了将 |
使用分页端点
我们有兴趣导入已登录运动员的活动。该端点采用以下参数:
我们感兴趣的是 per_page(在此我们可以定义每次调用端点时返回的活动数量)和 after(在此我们可以告诉 API 仅返回在提供的纪元时间戳之后的结果)。
假设我们的活动数量超过了单次 API 请求所能返回的数量。我们需要进行分页以检索所有活动并将其导入 Neo4j。
在对 API 进行分页之前,我们先学习如何将一页的活动导入到 Neo4j 中。以下查询将返回从最早时间戳开始的活动:
WITH 0 AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们为每个活动创建一个标签为 Run 的节点,并设置一些属性。对于本例,最有趣的是 startDate,稍后我们将把它传递给 after 参数。
此查询将加载前 30 个活动,但如果我们想获取接下来的 30 个怎么办?我们可以修改查询的第一行,找到我们任何 Run 节点中的最新时间戳,然后将其传递给 API。如果没有 Run 节点,我们可以像下面的查询那样使用 0 值。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们可以继续手动运行此查询,但现在是时候将其自动化了。
API 自动分页
一种方法是使用脚本语言并创建一个循环,在循环中重复调用该端点,直到没有活动可以检索为止。如果我们稍微发挥一下创造力,可以使用 apoc.periodic.commit 过程实现相同的结果。
根据 APOC 文档,这是定期迭代过程的描述:
在我们的例子中,退出条件是当我们从 API 收到的活动少于 30 个时。让我们首先更新我们的查询,如果返回的活动少于 30 个,则返回 0;如果是 30 个,则返回实际数量。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
现在剩下的就是用定期提交 (periodic commit) 包装整个过程。我们使用两个参数调用 apoc.periodic.commit 方法:
-
第一个是 Cypher 语句,运行该语句直到
RETURN子句返回 0; -
第二个是传递给 Cypher 语句的参数。
call apoc.periodic.commit("
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})
此查询会向 API 发送多个提交,直到我们加载了所有的活动。