Spark 结构化流 (Structured Streaming)
让我们看看如何通过 Apache Spark 的 Neo4j 连接器使用 Spark Structured Streaming API。
尽管连接器相同,但 Spark 流处理与 Spark 批处理的工作方式不同。以下是一些了解 Spark 流处理方法的链接:
-
来自 Spark 官网的 Structured Streaming 编程指南。
-
由 Databricks 撰写的 深入了解 Apache Spark 流处理执行模型。
Neo4j 流处理选项
| 设置名称 | 描述 | 默认值 | 必填 |
|---|---|---|---|
Sink(接收器) |
|||
|
检查点文件位置(查看更多)。 |
(无) |
是 |
源 (Source) |
|||
|
用于批量读取的时间戳属性。点击此处了解更多。 |
(无) |
是 |
|
此选项用于告知连接器从何处向流发送数据。点击此处了解更多。
|
|
是 |
|
一个返回 long 值的有效 Cypher® READ_ONLY(只读)查询。 (例如: 用于获取给定查询在数据库中的最后一个时间戳。有关此内容的更多信息,请参阅此处。 |
(无) |
是,仅适用于 |
Sink(接收器)
将流写入 Neo4j 实例非常简单,可以使用三种写入策略中的任何一种来完成。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("kafka") \
.option("subscribe", "PeopleTopic") \
.load()
query = df.writeStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://:7687") \
.option("save.mode", "ErrorIfExists") \
.option("checkpointLocation", "/tmp/checkpoint/myCheckPoint") \
.option("labels", "Person") \
.option("node.keys", "value") \
.start()
如前所述,您可以使用任何写入策略:节点 (Node)、关系 (Relationship) 或 查询 (Query)。
唯一的区别是您必须设置 checkpointLocation 和 save.mode 选项。
使用 save.mode,您可以控制数据的写入方式。更多信息请查看此处。
检查点 (Checkpoint)
检查点是一个允许 Spark Structured Streaming 从故障中恢复的文件。Spark 使用进度信息更新此文件,并在发生故障或查询重启时从该点恢复。此检查点位置必须是 HDFS 兼容文件系统中的路径。
由于该主题广泛且复杂,您可以阅读官方 Spark 文档。
Source(源)
从 Neo4j 读取流需要一些额外的配置。
让我们先看代码,然后分析所有选项。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://:7687") \
.option("labels", "Person") \
.option("streaming.property.name", "timestamp") \
.option("streaming.from", "NOW") \
.load()
# Memory streaming format writes the streamed data to a SparkSQL table
# NOTE: make sure this code is executed in another block,
# or at least seconds later the previous one to allow the full initialization of the stream.
# The risk is that the query will return an empty result.
query = stream.writeStream \
.format("memory") \
.queryName("testReadStream") \
.start()
spark \
.sql("select * from testReadStream order by timestamp") \
.show()
流处理属性名称
为了使流处理正常工作,您需要每条记录都有一个 timestamp 类型的属性,以便在从 Neo4j 读取新数据并发送到流时利用该属性。
在底层,连接器会构建一个带有 WHERE 子句的查询,该子句检查记录的 [timestampProperty] 是否在根据检查点数据和数据库中可用最新偏移量计算出的时间戳范围内。
因此,要求每个节点都必须具有 Neo4j 类型 (Long) 的时间戳属性,并且它必须不为 null。
| 像 "2021-08-11" 这样的字符串类型属性是不行的。它必须是 Neo4j 的 Long 类型。 |
属性名称可以是任何名称,只需记得相应地设置 streaming.property.name 即可。
Streaming from 选项
您可以决定流式传输数据库中的所有数据,还是仅流式传输新数据。为此,您可以将 streaming.from 选项设置为以下两个值之一:
-
NOW:从当前时间戳开始读取。这是streaming.from选项的默认值。 -
ALL:先读取数据库中的所有数据,然后再读取新数据。
读取模式
与 Sink 模式一样,您可以使用任何读取策略:节点 (Node)、关系 (Relationship) 或 查询 (Query)。
关于 query(查询)模式的说明
当使用查询模式时,处理 streaming.from 和 streaming.property.name 的自动化程度较低。
让我们看一下示例,然后解释发生了什么。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://:7687") \
.option("streaming.from", "NOW") \
.option("streaming.property.name", "timestamp") \
.option("query", \
"""MATCH (p:Test3_Person)
WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
RETURN p.age AS age, p.timestamp AS timestamp""") \
.option("streaming.query.offset", \
"MATCH (p:Test3_Person) RETURN max(p.timestamp)") \
.load()
如您所见,无论如何都必须指定 streaming.from 和 streaming.property.name,但您需要自己处理 WHERE 子句。查询中提供了两个参数 $stream.to 和 $stream.from,它们描述了我们需要读取的更改范围。
|
尽管查询中仍支持现已弃用的查询参数 |
在这种情况下,streaming.query.offset 选项是强制性的;连接器使用此选项读取数据库中的最后一个时间戳,其结果用于计算要选择的范围。
其他示例
您可以在此存储库中找到流处理代码片段和许多其他包含 Zeppelin 笔记本的示例。
文章 From Kinesis via Spark to Neo4j 描述了一个使用 Spark、Neo4j 和 AWS Kinesis 的完整示例。