Azure Synapse Analytics

Azure Synapse Analytics(前身为 SQL 数据仓库)是一种基于云的企业级数据仓库,利用大规模并行处理(MPP)技术能够在数拍字节的数据上快速运行复杂查询。

先决条件

您需要一个已启动的 Azure Synapse Analytics 实例。如果您还没有,可以从此处创建。

依赖

Azure Synapse Analytics 仅通过 Databricks Runtime 中的 Spark 工作,因为所需的连接器尚未公开发布。

Authentication(身份验证)

Azure Synapse 连接器使用三种网络连接类型

  • Spark 驱动程序到 Azure Synapse

  • Spark 驱动程序和执行程序到 Azure 存储账户

  • Azure Synapse 到 Azure 存储账户

为了选择最适合您使用场景的身份验证方法,建议查看官方Azure Synapse 文档

从 Azure Synapse Analytics 到 Neo4j

根据您选择的身份验证方法,以下示例展示如何将 Azure Synapse Analytics 表中的数据摄入为 Neo4j 节点。

// Step (1)
// Load a table into a Spark DataFrame
val azureDF: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .load()

// Step (2)
// Save the `azureDF` as nodes with labels `Person` and `Customer` into Neo4j
azureDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()
# Step (1)
# Load a table into a Spark DataFrame
azureDF = (spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .load())

# Step (2)
# Save the `azureDF` as nodes with labels `Person` and `Customer` into Neo4j
(azureDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())

从 Neo4j 到 Azure Synapse Analytics

根据您选择的身份验证方法,以下示例展示如何将 Neo4j 中的数据摄入到 Azure Synapse Analytics 表中。

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Azure Synapse Analytics
neo4jDF.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .save()
# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Azure Synapse Analytics
(neo4jDF.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .save())