Databricks 快速入门

本页面包含有关第三方平台使用说明的内容,该平台可能会发生我们无法控制的更改。如有疑问,请参阅第三方平台文档。

先决条件

  • Databricks 工作区必须可在类似以下 URL 上使用 https://dbc-xxxxxxxx-yyyy.cloud.databricks.com

设置计算集群

  1. 创建一个计算集群,使用 Single user 访问模式、Unrestricted 策略,以及您偏好的 Scala 运行时。

    当前不支持共享访问模式。

  2. 集群可用后,打开其页面并选择 Libraries(库)选项卡。

  3. 选择 Install new(安装新库),并将库来源选为 Maven

  4. 选择 Search Packages(搜索包),搜索 neo4j-spark-connector(来自 neo4j 组织的 Spark Packages)或 neo4j-connector-apache-spark(Maven Central,来自 org.neo4j 组 ID),随后 Select(选择)最新发布的版本。

    请确保通过匹配 Scala 版本与集群运行时来选择正确的连接器版本。

  5. 点击 Install(安装)。

Unity Catalog(统一目录)

Neo4j 仅在 Single user(单用户)访问模式下支持 Unity Catalog。有关详细信息,请参阅 Databricks 文档

会话配置

您可以按以下步骤在运行笔记本的集群上设置 Spark 配置:

  1. 打开集群配置页面。

  2. Configuration(配置)下,选择 Advanced Options(高级选项)切换开关。

  3. 选择 Spark 选项卡。

例如,您可以在文本区域中添加 Neo4j Bearer 认证配置,如下所示:

Bearer 认证示例
neo4j.url neo4j://<host>:<port>
neo4j.authentication.type bearer
neo4j.authentication.bearer.token <token>

Databricks 建议不要以明文形式存储密码和令牌等密钥。更安全的做法是使用 密钥

认证方法

支持 Neo4j Java Driver(4.4 及更高版本)所支持的所有认证方法。

有关认证配置的更多细节,请参阅 Neo4j 驱动选项

设置密钥

您可以通过 Databricks CLI 使用 Secrets API 添加密钥到您的环境中。如果您使用 15.0 或以上版本的 Databricks Runtime,也可以直接 从笔记本终端添加密钥。

设置密钥后,您可以在 Databricks 笔记本中使用 Databricks Utilities (dbutils) 访问它们。例如,给定一个 neo4j 范围以及用于基本认证的 usernamepassword 密钥,您可以在 Python 笔记本中这样做:

from pyspark.sql import SparkSession

url = "neo4j+s://xxxxxxxx.databases.neo4j.io"
username = dbutils.secrets.get(scope="neo4j", key="username")
password = dbutils.secrets.get(scope="neo4j", key="password")
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

Delta 表

您可以使用 Spark 连接器在 Databricks 笔记本中读取和写入 Delta 表,无需任何额外设置。

基本往返

下面的示例展示了如何读取 Delta 表、将其写入 Neo4j 作为节点和节点属性、再从 Neo4j 读取相应的节点和属性,并将它们写入新的 Delta 表。

Delta 表内容

该示例假设已有一个名为 users_example 的 Delta 表,且表中包含以下数据:

表 1. users_example
名称 (name) surname age

John

Doe

42

Jane

Doe

40

# Read the Delta table
tableDF = spark.read.table("users_example")

# Write the DataFrame to Neo4j as nodes
(
    tableDF
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":User")
    .save()
)

# Read the nodes with `:User` label from Neo4j
neoDF = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":User")
    .load()
)

# Write the DataFrame to another Delta table,
# which will contain the additional columns
# `<id>` and `<labels>`
neoDF.write.saveAsTable("users_new_example")

Delta 表 → Neo4j 节点和关系

为避免死锁,在将关系写入 Neo4j 之前请始终使用单分区(使用 coalesce(1)repartition(1))。

下面的示例展示了如何读取 Delta 表并将其数据同时写入 Neo4j 作为节点和关系。有关使用 Overwrite 模式以及仅写入节点的详细信息,请参阅 写入 页面。

Delta 表内容

该示例假设已有一个名为 customers_products_example 的 Delta 表,且表中包含以下数据:

表 2. customers_products_example
名称 (name) surname customerID product quantity order

John

Doe

1

Product 1

200

ABC100

Jane

Doe

2

Product 2

100

ABC200

# Read the Delta table into a DataFrame
relDF = spark.read.table("customers_products_example")

# Write the table to Neo4j using the
# `relationship` write option
(
    relDF
    # Use a single partition
    .coalesce(1)
    .write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map DataFrame columns to source node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map DataFrame columns to target node properties
    .option("relationship.target.node.properties", "product:name")
    # Map DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)

Neo4j 节点 → Delta 表

下面的示例展示了如何从 Neo4j 读取节点并将其写入 Delta 表。有关读取关系的详细信息,请参阅 读取 页面。

# Read the nodes with `:Customer` label from Neo4j
df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Customer")
    .load()
)

# Write the DataFrame to another Delta table
df.write.saveAsTable("customers_status_example")