Databricks 快速入门
|
本页面包含有关第三方平台使用说明的内容,该平台可能会发生我们无法控制的更改。如有疑问,请参阅第三方平台文档。 |
设置计算集群
-
创建一个计算集群,使用
Single user访问模式、Unrestricted策略,以及您偏好的 Scala 运行时。当前不支持共享访问模式。
-
集群可用后,打开其页面并选择 Libraries(库)选项卡。
-
选择 Install new(安装新库),并将库来源选为 Maven。
-
选择 Search Packages(搜索包),搜索
neo4j-spark-connector(来自neo4j组织的 Spark Packages)或neo4j-connector-apache-spark(Maven Central,来自org.neo4j组 ID),随后 Select(选择)最新发布的版本。请确保通过匹配 Scala 版本与集群运行时来选择正确的连接器版本。
-
点击 Install(安装)。
Unity Catalog(统一目录)
Neo4j 仅在 Single user(单用户)访问模式下支持 Unity Catalog。有关详细信息,请参阅 Databricks 文档。
会话配置
您可以按以下步骤在运行笔记本的集群上设置 Spark 配置:
-
打开集群配置页面。
-
在 Configuration(配置)下,选择 Advanced Options(高级选项)切换开关。
-
选择 Spark 选项卡。
例如,您可以在文本区域中添加 Neo4j Bearer 认证配置,如下所示:
neo4j.url neo4j://<host>:<port>
neo4j.authentication.type bearer
neo4j.authentication.bearer.token <token>
|
Databricks 建议不要以明文形式存储密码和令牌等密钥。更安全的做法是使用 密钥。 |
认证方法
支持 Neo4j Java Driver(4.4 及更高版本)所支持的所有认证方法。
有关认证配置的更多细节,请参阅 Neo4j 驱动选项。
设置密钥
您可以通过 Databricks CLI 使用 Secrets API 添加密钥到您的环境中。如果您使用 15.0 或以上版本的 Databricks Runtime,也可以直接 从笔记本终端添加密钥。
设置密钥后,您可以在 Databricks 笔记本中使用 Databricks Utilities (dbutils) 访问它们。例如,给定一个 neo4j 范围以及用于基本认证的 username 和 password 密钥,您可以在 Python 笔记本中这样做:
from pyspark.sql import SparkSession
url = "neo4j+s://xxxxxxxx.databases.neo4j.io"
username = dbutils.secrets.get(scope="neo4j", key="username")
password = dbutils.secrets.get(scope="neo4j", key="password")
dbname = "neo4j"
spark = (
SparkSession.builder.config("neo4j.url", url)
.config("neo4j.authentication.basic.username", username)
.config("neo4j.authentication.basic.password", password)
.config("neo4j.database", dbname)
.getOrCreate()
)
Delta 表
您可以使用 Spark 连接器在 Databricks 笔记本中读取和写入 Delta 表,无需任何额外设置。
基本往返
下面的示例展示了如何读取 Delta 表、将其写入 Neo4j 作为节点和节点属性、再从 Neo4j 读取相应的节点和属性,并将它们写入新的 Delta 表。
Delta 表内容
该示例假设已有一个名为 users_example 的 Delta 表,且表中包含以下数据:
| 名称 (name) | surname | age |
|---|---|---|
John |
Doe |
42 |
Jane |
Doe |
40 |
# Read the Delta table
tableDF = spark.read.table("users_example")
# Write the DataFrame to Neo4j as nodes
(
tableDF
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("labels", ":User")
.save()
)
# Read the nodes with `:User` label from Neo4j
neoDF = (
spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":User")
.load()
)
# Write the DataFrame to another Delta table,
# which will contain the additional columns
# `<id>` and `<labels>`
neoDF.write.saveAsTable("users_new_example")
Delta 表 → Neo4j 节点和关系
|
为避免死锁,在将关系写入 Neo4j 之前请始终使用单分区(使用 |
下面的示例展示了如何读取 Delta 表并将其数据同时写入 Neo4j 作为节点和关系。有关使用 Overwrite 模式以及仅写入节点的详细信息,请参阅 写入 页面。
Delta 表内容
该示例假设已有一个名为 customers_products_example 的 Delta 表,且表中包含以下数据:
| 名称 (name) | surname | customerID | product | quantity | order |
|---|---|---|---|---|---|
John |
Doe |
1 |
Product 1 |
200 |
ABC100 |
Jane |
Doe |
2 |
Product 2 |
100 |
ABC200 |
# Read the Delta table into a DataFrame
relDF = spark.read.table("customers_products_example")
# Write the table to Neo4j using the
# `relationship` write option
(
relDF
# Use a single partition
.coalesce(1)
.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Map DataFrame columns to source node properties
.option("relationship.source.node.properties", "name,surname,customerID:id")
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
# Map DataFrame columns to target node properties
.option("relationship.target.node.properties", "product:name")
# Map DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Neo4j 节点 → Delta 表
下面的示例展示了如何从 Neo4j 读取节点并将其写入 Delta 表。有关读取关系的详细信息,请参阅 读取 页面。
# Read the nodes with `:Customer` label from Neo4j
df = (
spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Customer")
.load()
)
# Write the DataFrame to another Delta table
df.write.saveAsTable("customers_status_example")