选项与配置

使用连接器时,任何有效的 Neo4j 驱动程序选项都可以通过 Spark 中的 option 方法进行设置,如下所示

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://:7687")
        .option("authentication.type", "basic")
        .option("authentication.basic.username", "myuser")
        .option("authentication.basic.password", "neo4jpassword")
        .option("labels", "Person")
        .load()

或者,您可以在 Spark Session 中指定全局配置,以避免每次都重复输入连接选项。您可以设置任何 Neo4j 连接器选项,只需在前面加上 neo4j. 前缀即可。

例如,如果您想在会话中设置 authentication.type 选项,则必须输入 neo4j.authentication.type。以下是一个完整示例

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
    .config("neo4j.url", "neo4j://:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "myuser")
    .config("neo4j.authentication.basic.password", "neo4jpassword")
    .getOrCreate()

val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()

val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Product")
        .load()

自定义身份验证提供程序

从 5.3.9 版本开始,连接器提供了一个选项,除了支持 none(无)、basic(基本)、kerberoscustom(自定义)和 bearer(令牌)等现有身份验证类型外,还可以使用自定义身份验证提供程序。

身份验证提供程序需要实现 org.neo4j.connectors.authn.AuthenticationTokenSupplierFactory 接口(来自 org.neo4j.connectors:commons-authn-spi 库)。该接口要求实现两个方法

  • String getName() - 返回提供程序的名称。必须是唯一的,且不能与现有的身份验证类型冲突。

  • Supplier<AuthenticationToken> create(String, String, Map<String, String>) - 接收用户名、密码和附加参数,并返回一个 AuthenticationToken 的提供程序。

AuthenticationToken 实例是对提供给 Neo4j 服务器的任意身份验证的抽象表示,它可以包含主体及其凭据(用户名和密码)、令牌或其他确认身份的方式。

该接口本身提供了以下工厂方法

  • AuthenticationToken#bearer

  • AuthenticationToken#kerberos

  • AuthenticationToken#none

  • AuthenticationToken#usernameAndPassword

  • AuthenticationToken#custom

身份验证提供程序的典型用例是支持由 OAuth 2.0 或 OIDC 提供程序颁发的过期令牌。

自定义身份验证提供程序可以通过服务加载器机制使用,并将配置参数 authentication.type 设置为提供程序的名称。附加参数可以指定为带有 authentication.$name. 前缀的配置选项。

Keycloak 身份验证提供程序示例

此提供程序基于 Keycloak 客户端,需要 Java 11 或更高版本。

Keycloak 的身份验证提供程序作为 org.neo4j.connectors:commons-authn-keycloak 库的一部分提供。

它包含

  • 一个用于创建身份验证令牌提供程序实例的工厂类

    package org.neo4j.connectors.authn.keycloak;
    
    import java.util.Map;
    import java.util.Objects;
    import java.util.function.Supplier;
    import org.apache.http.impl.client.HttpClients;
    import org.keycloak.authorization.client.Configuration;
    import org.neo4j.connectors.authn.AuthenticationToken;
    import org.neo4j.connectors.authn.AuthenticationTokenSupplierFactory;
    
    
    public class KeycloakOIDCAuthenticationSupplierFactory implements AuthenticationTokenSupplierFactory {
    
        @Override
        public String getName() {
            return "keycloak";
        }
    
        @Override
        public Supplier<AuthenticationToken> create(String username, String password, Map<String, String> parameters) {
            var url = Objects.requireNonNull(parameters.get("authServerUrl"));
            var realm = Objects.requireNonNull(parameters.get("realm"));
            var clientId = Objects.requireNonNull(parameters.get("clientId"));
            var clientSecret = Objects.requireNonNull(parameters.get("clientSecret"));
    
            return new KeycloakOIDCAuthenticationSupplier(
                    username,
                    password,
                    new Configuration(url, realm, clientId, Map.of("secret", clientSecret), HttpClients.createMinimal()));
        }
    
    }
  • 一个从 Keycloak 服务器收集访问令牌并在过期时进行刷新的提供程序

    import com.fasterxml.jackson.jr.ob.JSON;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.time.Instant;
    import java.util.Base64;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;
    import org.keycloak.authorization.client.AuthzClient;
    import org.keycloak.authorization.client.Configuration;
    import org.keycloak.authorization.client.util.Http;
    import org.keycloak.representations.AccessTokenResponse;
    import org.neo4j.connectors.authn.AuthenticationToken;
    
    public class KeycloakOIDCAuthenticationSupplier implements Supplier<AuthenticationToken> {
        private final String username;
        private final String password;
        private final Configuration config;
        private final AuthzClient client;
        private final Http http;
        private final String url;
        private final AtomicReference<AuthenticationTokenAndTime> token = new AtomicReference<>();
    
        KeycloakOIDCAuthenticationSupplier(String username, String password, Configuration config) {
            this.username = username;
            this.password = password;
            this.config = config;
            this.client = AuthzClient.create(config);
            this.url = constructUrl(config);
            this.http = new Http(config, config.getClientCredentialsProvider());
        }
    
        public static Supplier<AuthenticationToken> of(String username, String password, Configuration config) {
            return new KeycloakOIDCAuthenticationSupplier(username, password, config);
        }
    
        private String constructUrl(Configuration config) {
            return config.getAuthServerUrl() + "/realms/" + config.getRealm() + "/protocol/openid-connect/token";
        }
    
        public boolean currentTokenIsExpired() {
            return token.get() == null || token.get().expireAt.isBefore(Instant.now());
        }
    
        @Override
        public AuthenticationToken get() {
            AuthenticationTokenAndTime freshToken = this.token.updateAndGet(this::get0);
            return freshToken.toAuthenticationToken();
        }
    
        private AuthenticationTokenAndTime get0(AuthenticationTokenAndTime previous) {
            if (previous == null) {
                return fetch();
            } else {
                return refresh(previous.refreshToken);
            }
        }
    
        private AuthenticationTokenAndTime fetch() {
            try {
                AccessTokenResponse response = this.client.obtainAccessToken(this.username, this.password);
                return AuthenticationTokenAndTime.of(response);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        private AuthenticationTokenAndTime refresh(String refreshToken) {
            try {
                AccessTokenResponse response = this.http
                        .<AccessTokenResponse>post(this.url)
                        .authentication()
                        .client()
                        .form()
                        .param("grant_type", "refresh_token")
                        .param("refresh_token", refreshToken)
                        .param("client_id", this.config.getResource())
                        .param("client_secret", (String)
                                this.config.getCredentials().get("secret"))
                        .response()
                        .json(AccessTokenResponse.class)
                        .execute();
                return AuthenticationTokenAndTime.of(response);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        private static final class AuthenticationTokenAndTime {
            static final Base64.Decoder DECODER = Base64.getDecoder();
            private final String token;
            private final Instant expireAt;
            private final String refreshToken;
    
            public AuthenticationTokenAndTime(String token, Instant expireAt, String refreshToken) {
                this.token = token;
                this.expireAt = expireAt;
                this.refreshToken = refreshToken;
            }
    
            static AuthenticationTokenAndTime of(AccessTokenResponse accessTokenResponse) throws IOException {
                String token = accessTokenResponse.getToken();
                String[] chunks = token.split("\\.");
                Map<String, Object> payload = JSON.std.mapFrom(DECODER.decode(chunks[1]));
                long epoch = ((Number) payload.get("exp")).longValue();
                Instant expireAt = Instant.ofEpochSecond(epoch);
    
                return new AuthenticationTokenAndTime(token, expireAt, accessTokenResponse.getRefreshToken());
            }
    
            AuthenticationToken toAuthenticationToken() {
                return AuthenticationToken.bearer(token, expireAt);
            }
        }
    }

您可以按如下方式配置 Spark 以使用 Keycloak 身份验证

val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", s"$NEO4J_URL")
  .option("authentication.type", "keycloak")
  .option("authentication.keycloak.username", s"$KEYCLOAK_USERNAME")
  .option("authentication.keycloak.password", s"$KEYCLOAK_PASSWORD")
  .option("authentication.keycloak.authServerUrl", s"$KEYCLOAK_URL")
  .option("authentication.keycloak.realm", s"$KEYCLOAK_REALM")
  .option("authentication.keycloak.clientId", s"$KEYCLOAK_CLIENT_ID")
  .option("authentication.keycloak.clientSecret", s"$KEYCLOAK_CLIENT_SECRET")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.age as age")
  .load()

Neo4j 驱动程序选项

在底层,Spark 连接器使用了官方 Neo4j Java 驱动程序。在许多情况下,您需要控制驱动程序选项,以适应您的 Neo4j 生产部署及其通信方式。您可以使用上述 options 示例来完成此操作。

下表列出了与 Neo4j 驱动程序一起使用的最常见配置设置。有关 Neo4j 驱动程序所有可能配置选项的完整文档,请参阅 Neo4j Java 驱动程序手册

表 1. 可用配置设置列表
设置名称 描述 默认值 必填

驱动程序选项

url

要连接的 Neo4j 实例的 URL。

当提供以逗号分隔的 URI 列表时,将激活驱动程序的解析器函数功能。第一个 URI 将用作原始主机,其余的将被视为解析器函数的输出。

(无)

authentication.type

要使用的身份验证方法

有关更多信息,请参阅 身份验证

basic

authentication.basic.username

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.basic.password

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.kerberos.ticket

Kerberos 身份验证票据

(Neo4j 驱动程序默认值)

authentication.custom.principal

用于标识此令牌代表谁

(Neo4j 驱动程序默认值)

authentication.custom.credentials

这是验证主体的凭据

(Neo4j 驱动程序默认值)

authentication.custom.realm

这是指定身份验证提供程序的“领域 (realm)”字符串

(Neo4j 驱动程序默认值)

authentication.bearer.token

这是为 bearer 身份验证方案提供的令牌

(Neo4j 驱动程序默认值)

encryption.enabled

指定是否应启用加密。如果您使用带有 +s+ssc 的 URI 方案,此设置将被忽略

false

encryption.trust.strategy

设置证书信任策略,如果连接 URI 使用 +s+ssc 作为后缀,则忽略此设置。可用值为

  • TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

  • TRUST_CUSTOM_CA_SIGNED_CERTIFICATES

  • TRUST_ALL_CERTIFICATES.

(Neo4j 驱动程序默认值)

encryption.ca.certificate.path

TRUST_CUSTOM_CA_SIGNED_CERTIFICATES 信任策略设置证书路径

(Neo4j 驱动程序默认值)

connection.max.lifetime.msecs

连接生存期(毫秒)

(Neo4j 驱动程序默认值)

connection.liveness.timeout.msecs

活性检查超时(毫秒)

(Neo4j 驱动程序默认值)

connection.acquisition.timeout.msecs

连接获取超时(毫秒)

(Neo4j 驱动程序默认值)

connection.timeout.msecs

连接超时(毫秒)

(Neo4j 驱动程序默认值)

db.transaction.timeout

事务超时(毫秒)

(Neo4j 驱动程序默认值)

会话选项

database

要连接的数据库名称。驱动程序允许在 URL 中定义数据库,但如果您设置此选项,它将优先于 URL 中定义的数据库。

(Neo4j 驱动程序默认值)

access.mode

可能的值为

  • write

仅在从 Neo4j 拉取数据时使用。如果是 read,则集群环境中的连接器会将请求路由到从节点(follower),否则路由到主节点(leader)。

多重连接

适用于 Apache Spark 的 Neo4j 连接器允许您在单个 Spark Session 中使用多个连接。例如,您可以在同一个会话中从一个数据库读取数据并将其写入另一个数据库。

从一个数据库读取并写入另一个数据库
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .save()

使用多重连接的另一个场景是当您想要合并两个数据源时。

合并两个数据库的数据
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val dfOne = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .load()

val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))