Load JDBC (RDBMS)

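If you want to use DuckDB, download the JDBC driver from the Maven repository and import it.

Data integration is an important topic. Reading data from relational databases to create and augment data models is a very helpful exercise.

With apoc.load.jdbc you can access any database that provides a JDBC driver and execute queries whose results are turned into a stream of rows. Those rows can then be used to update or create graph structures.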

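To simplify the JDBC URL syntax and protect credentials, you can configure aliases in conf/apoc.conf:

apoc.jdbc.myDB.url=jdbc:derby:derbyDB

With this alias in place, the call

CALL apoc.load.jdbc('jdbc:derby:derbyDB','PERSON')

becomes

CALL apoc.load.jdbc('myDB','PERSON')

The third segment of the key apoc.jdbc.<alias>.url= effectively defines the alias to be used in apoc.load.jdbc('<alias>', …).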
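The alias lookup can be pictured with a small Python sketch. It is not APOC's implementation; the functions parse_aliases and resolve are hypothetical names introduced only to show how the third key segment maps to a URL, and how a value that is not a known alias is treated as a URL.

```python
import re

# Matches keys of the form apoc.jdbc.<alias>.url=<jdbc-url>
ALIAS_LINE = re.compile(r"^apoc\.jdbc\.(?P<alias>[^.=\s]+)\.url\s*=\s*(?P<url>.+)$")

def parse_aliases(conf_text):
    """Collect alias -> JDBC URL pairs from apoc.conf-style text (hypothetical helper)."""
    aliases = {}
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        match = ALIAS_LINE.match(line)
        if match:
            aliases[match.group("alias")] = match.group("url").strip()
    return aliases

def resolve(key_or_url, aliases):
    """Return the configured URL for an alias, or the argument itself."""
    return aliases.get(key_or_url, key_or_url)

conf = "apoc.jdbc.myDB.url=jdbc:derby:derbyDB\n"
aliases = parse_aliases(conf)
print(resolve("myDB", aliases))
print(resolve("jdbc:derby:derbyDB", aliases))
```

Both calls print the same URL, which mirrors the two equivalent apoc.load.jdbc calls shown above.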

MySQL Example

Northwind is a common example data set for relational databases. It is also covered in our import guides, e.g. :play northwind graph in the Neo4j Browser.

MySQL Northwind Data

select count(*) from products;
Table 1. Results
| count(*) |
|----------|
| 77       |

describe products;
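Table 2. Results
| Field           | Type          | Null | Key | Default | Extra          |
|-----------------|---------------|------|-----|---------|----------------|
| ProductID       | int(11)       | NO   | PRI | NULL    | auto_increment |
| ProductName     | varchar(40)   | NO   | MUL | NULL    |                |
| SupplierID      | int(11)       | YES  | MUL | NULL    |                |
| CategoryID      | int(11)       | YES  | MUL | NULL    |                |
| QuantityPerUnit | varchar(20)   | YES  |     | NULL    |                |
| UnitPrice       | decimal(10,4) | YES  |     | 0.0000  |                |
| UnitsInStock    | smallint(2)   | YES  |     | 0       |                |
| UnitsOnOrder    | smallint(2)   | YES  |     | 0       |                |
| ReorderLevel    | smallint(2)   | YES  |     | 0       |                |
| Discontinued    | bit(1)        | NO   |     | b'0'    |                |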

Load JDBC Examples

Load the JDBC driver
CALL apoc.load.driver("com.mysql.jdbc.Driver");
Count rows in the products table
WITH "jdbc:mysql://:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
Table 3. Results
| count(*) |
|----------|
| 77       |

Return a row from the products table
WITH "jdbc:mysql://:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row limit 1;
Table 4. Results
| row |
|-----|
| {UnitPrice → 18.0000, UnitsOnOrder → 0, CategoryID → 1, UnitsInStock → 39} |

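Load data in transactional batches

You can load data from JDBC and use the query results to create or update the graph in batches, optionally in parallel.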

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc("jdbc:mysql://:3306/northwind?user=root","company") YIELD row RETURN row',
  'CREATE (p:Person) SET p += row',
  { batchSize:10000, parallel:true})
YIELD batches, total
RETURN batches, total
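To picture what batchSize means, here is a minimal Python sketch of splitting a row stream into fixed-size batches. It is only a conceptual illustration, not how APOC is implemented; in_batches is a hypothetical helper and the generated rows stand in for the JDBC result stream.

```python
from itertools import islice

def in_batches(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the stream is exhausted
        yield batch

# Stand-in for the JDBC row stream: 25 rows, processed 10 at a time.
rows = ({"id": i} for i in range(25))
sizes = [len(batch) for batch in in_batches(rows, 10)]
print(sizes)  # [10, 10, 5]
```

Each batch would correspond to one unit of work; the last batch is simply smaller when the row count is not a multiple of the batch size.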

Cassandra Example

Set up the Song database as the initial data set:

curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/playlist.cql
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/artists.csv
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/songs.csv
$CASSANDRA_HOME/bin/cassandra
$CASSANDRA_HOME/bin/cqlsh -f playlist.cql

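Download the Cassandra JDBC Wrapper and put it into the $NEO4J_HOME/plugins directory. Add this configuration option to $NEO4J_HOME/conf/apoc.conf to make it easier to interact with the Cassandra instance.

Add to conf/apoc.conf
apoc.jdbc.cassandra_songs.url=jdbc:cassandra://:9042/playlist

Restart the server.

Now you can inspect the data in Cassandra with the following commands.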

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN count(*);
Table 5. Results
| count(*) |
|----------|
| 3605     |

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row.first_letter, row.artist
LIMIT 5;
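Table 6. Results
| row.first_letter | row.artist                      |
|------------------|---------------------------------|
| C                | C.W. Stoneking                  |
| C                | CH2K                            |
| C                | CHARLIE HUNTER WITH LEON PARKER |
| C                | Calvin Harris                   |
| C                | Camané                          |

Let's create some graph data. We will look at the track_by_artist table, which contains about 60,000 records.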

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN count(*);
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row
LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row.track_id, row.track_length_in_seconds, row.track, row.music_file, row.genre, row.artist, row.starred
LIMIT 2;
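Table 7. Results
| row.track_id                         | length | row.track      | row.music_file     | row.genre | row.artist                   | row.starred |
|--------------------------------------|--------|----------------|--------------------|-----------|------------------------------|-------------|
| c0693b1e-0eaa-4e81-b23f-b083db303842 | 219    | 1913 Massacre  | TRYKHMD128F934154C | folk      | Woody Guthrie & Jack Elliott | false       |
| 7d114937-0bc7-41c7-8e0c-94b5654ac77f | 178    | Alabammy Bound | TRMQLPV128F934152B | folk      | Woody Guthrie & Jack Elliott | false       |

Let's create some indexes and constraints. Note that this drops any other indexes and constraints.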

CALL apoc.schema.assert(
  {Track:['title','length']},
  {Artist:['name'],Track:['id'],Genre:['name']});
Table 8. Results
| label  | key    | unique | action  |
|--------|--------|--------|---------|
| Track  | title  | false  | CREATED |
| Track  | length | false  | CREATED |
| Artist | name   | true   | CREATED |
| Genre  | name   | true   | CREATED |
| Track  | id     | true   | CREATED |

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
MERGE (a:Artist {name:row.artist})
MERGE (g:Genre {name:row.genre})
CREATE (t:Track {id:toString(row.track_id), title:row.track, length:row.track_length_in_seconds})
CREATE (a)-[:PERFORMED]->(t)
CREATE (t)-[:GENRE]->(g);
Added 63213 labels, created 63213 nodes, set 182413 properties, created 119200 relationships, statement executed in 40076 ms.

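Support for Hive with Kerberos Auth

Support for Hive, especially with Kerberos, is more involved.

First, the required configuration is more detailed. Make sure you have the following information:

  • Kerberos user / password

  • Kerberos realm / KDC

  • Hive hostname + port (10000)

Create this login.conf file in a known location: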

login.conf
KerberosClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true debugNative=true;
};

Add these options to your conf/apoc.conf:

apoc.conf
dbms.jvm.additional=-Djava.security.auth.login.config=/path/to/login.conf
dbms.jvm.additional=-Djava.security.auth.login.config.client=KerberosClient
dbms.jvm.additional=-Djava.security.krb5.realm=KRB.REALM.COM
dbms.jvm.additional=-Djava.security.krb5.kdc=krb-kdc.host.com

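Unlike other JDBC drivers, Hive comes with a number of dependencies. You can download them from your Hadoop provider or obtain them from Maven Central.

Versions may vary; use the ones that ship with your Hive driver.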

  • hadoop-common-2.7.3.2.6.1.0-129.jar

  • hive-exec-1.2.1000.2.6.1.0-129.jar

  • hive-jdbc-1.2.1000.2.6.1.0-129.jar

  • hive-metastore-1.2.1000.2.6.1.0-129.jar

  • hive-service-1.2.1000.2.6.1.0-129.jar

  • httpclient-4.4.jar

  • httpcore-4.4.jar

  • libfb303-0.9.2.jar

  • libthrift-0.9.3.jar

Now you can use a JDBC URL like the following from APOC.

The URL contains no line breaks; it only wraps on screen because it is so long.

jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

Then call:

WITH 'jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject' AS url
CALL apoc.load.jdbc(url,'PRODUCTS')
YIELD row
RETURN row.name, row.price;
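The %40 in the URL above is the percent-encoded form of the @ that separates the Kerberos user from the realm. A minimal Python sketch of assembling such a URL follows; hive_kerberos_url and its parameters are hypothetical names, the password is inserted verbatim as in the example above, and only the URL shape shown in this section is assumed.

```python
from urllib.parse import quote

def hive_kerberos_url(user, realm, password, hive_host, principal_host,
                      port=10000, database="default"):
    """Assemble a Hive JDBC URL shaped like the one in this section."""
    # The @ between user and realm must not be confused with the @ that
    # separates credentials from the host, so it is encoded as %40.
    encoded_user = quote(f"{user}@{realm}", safe="")
    return (
        f"jdbc:hive2://{encoded_user}:{password}@{hive_host}:{port}/{database}"
        f";principal=hive/{principal_host}@{realm}"
        ";auth=kerberos;kerberosAuthType=fromSubject"
    )

url = hive_kerberos_url("username", "krb-realm", "password", "hive-hostname", "hostname")
print(url)
```

With these placeholder values the printed string is identical to the example URL used in the call above.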

You can also set it as a key in conf/apoc.conf:

apoc.conf
apoc.jdbc.my-hive.url=jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

Then use the more compact call:

CALL apoc.load.jdbc('my-hive','SELECT * FROM PRODUCTS');

Load JDBC - Resources

To use other JDBC drivers, use these download links and JDBC URLs. Put the JDBC driver into the $NEO4J_HOME/plugins directory and configure the JDBC URL in $NEO4J_HOME/conf/apoc.conf with apoc.jdbc.<alias>.url=<jdbc-url>.

Credentials can be passed in two ways:

  • In the URL:

CALL apoc.load.jdbc('jdbc:derby:derbyDB;user=apoc;password=Ap0c!#Db;create=true', 'PERSON')
  • Via config parameters:

CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})

Google BigQuery with the Simba driver requires the additional parameter 'autoCommit', e.g.:

CALL apoc.load.jdbc('BigQuery', 'SELECT action_type FROM `patents-public-data.ebi_chembl.action_type` LIMIT 10', [], {autoCommit:true})

| Database | JDBC URL | Driver source |
|----------|----------|---------------|
| MySQL | jdbc:mysql://<hostname>:<port/3306>/<database>?user=<user>&password=<pass> | |
| Postgres | jdbc:postgresql://<hostname>/<database>?user=<user>&password=<pass> | |
| Oracle | jdbc:oracle:thin:<user>/<pass>@<host>:<port>/<service_name> | |
| MS SQLServer | jdbc:sqlserver://;servername=<servername>;databaseName=<database>;user=<user>;password=<pass> | |
| IBM DB2 | jdbc:db2://<host>:<port/5021>/<database>:user=<user>;password=<pass>; | |
| Derby | jdbc:derby:derbyDB | Included in JDK6-8 |
| Cassandra | jdbc:cassandra://<host>:<port/9042>/<database> | |
| SAP Hana | jdbc:sap://<host>:<port/39015>/?user=<user>&password=<pass> | |
| Apache Hive (with Kerberos) | jdbc:hive2://username%40krb-realm:password@hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject | Apache Hive driver (Cloudera) (Hortonworks). There are several JARs (hadoop-common-xxx.jar hive-exec-xxx.jar hive-jdbc-xxx.jar hive-metastore-xxx.jar hive-service-xxx.jar httpclient-4.4.jar httpcore-4.4.jar libfb303-0.9.2.jar libthrift-0.9.3.jar) |
| Google BigQuery | jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<Project ID>;OAuthType=0;OAuthServiceAcctEmail=<Service Account ID>;OAuthPvtKeyPath=/path/to/<Private Key>.json | |

There are a number of blog posts and examples that describe the usage of apoc.load.jdbc in detail.

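Load JDBC - Update

jdbcUpdate updates a relational database by executing an SQL statement with optional parameters.

CALL apoc.load.jdbcUpdate(jdbc-url,statement, params, config)

With this data set, you can call the procedure in two different modes: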

MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000

You can call the procedure with parameters:

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[u.id, reco.id, score]);

You can call the procedure without parameters:

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)');
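The difference between the two modes is where the values come from: with parameters they are bound to the ? placeholders by position, without parameters the statement text has to contain them already. The sketch below shows the same idea in Python, using the standard sqlite3 module as a stand-in for a JDBC connection; the table layout and the values are made up for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the relational target.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE RECOMMENDATIONS (user_id INTEGER, reco_id INTEGER, score INTEGER)"
)

# With parameters: values are bound to the ? placeholders by position.
conn.execute("INSERT INTO RECOMMENDATIONS VALUES (?, ?, ?)", [1, 42, 1500])

# Without parameters: the statement already contains literal values.
conn.execute("INSERT INTO RECOMMENDATIONS VALUES (2, 43, 2000)")

rows = conn.execute(
    "SELECT user_id, reco_id, score FROM RECOMMENDATIONS ORDER BY user_id"
).fetchall()
print(rows)  # [(1, 42, 1500), (2, 43, 2000)]
```

Binding values through placeholders is generally the safer choice, because the values never become part of the SQL text.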

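Load JDBC Date Format

Starting with Neo4j 3.4, temporal values are supported.

If the load operation returns a JdbcType of TIMESTAMP or TIMESTAMP_WITH_TIMEZONE, you can provide the config parameter timezone, of type java.time.ZoneId.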

CALL apoc.load.jdbc('key or url','table or statement', [], config);

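Config

The config parameter is optional; its default value is an empty map.

| Name        | Default |
|-------------|---------|
| timezone    | null    |
| credentials | {}      |

Example

With timezone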
CALL apoc.load.jdbc(
  'jdbc:derby:derbyDB',
  'SELECT * FROM PERSON WHERE NAME = ?',['John'],
  {timezone: "Asia/Tokyo"})
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
With credentials
CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE NAME = ?',['John','John'],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
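The effect of the timezone setting in the example above can be checked with a little arithmetic. The following Python sketch assumes the timestamp was stored as the UTC instant 2018-10-30T16:32:25.012Z (an assumption, the source does not state the stored value) and uses a fixed +09:00 offset as a stand-in for the Asia/Tokyo zone.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the database stored this instant in UTC.
stored = datetime(2018, 10, 30, 16, 32, 25, 12000, tzinfo=timezone.utc)

# Fixed +09:00 offset standing in for the "Asia/Tokyo" zone.
tokyo = timezone(timedelta(hours=9), "Asia/Tokyo")

# Same instant, rendered in the target zone with millisecond precision.
local = stored.astimezone(tokyo)
rendered = local.isoformat(timespec="milliseconds")
print(rendered)  # 2018-10-31T01:32:25.012+09:00
```

The instant itself does not change; only its rendering moves nine hours ahead, which is why the date rolls over to the next day.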

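JDBC Analytics

You can use apoc.jdbc.analytics(<cypherQuery>, <jdbcUrl>, <sqlQueryOverTemporaryTable>, <paramsList>, $config) to create a temporary table from a Cypher query and delegate complex analytics to the database defined by the JDBC URL.

Note that the returned SQL column names must match the column names provided by the Cypher query.

In addition to the config of the apoc.load.jdbc procedure, apoc.jdbc.analytics provides the following config:

| name      | description | default |
|-----------|-------------|---------|
| tableName | Name of the temporary table | neo4j_tmp_table |
| provider  | SQL provider, used to handle data types accordingly. Possible values are "POSTGRES", "MYSQL" and "DEFAULT" | "DEFAULT" |
| batchSize | Batch size for inserting data into the SQL table | 10000 |
| writeMode | If 'CREATE', a new temporary table is created. If 'APPEND', the existing table is reused. | 'CREATE' |

The provider can be specified in the config parameter.

To reproduce the queries below, create some nodes: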

CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2000, population: 1005})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2010, population: 1065})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2020, population: 1158})
CREATE (:City {country: 'US', name: 'Seattle', year: 2000, population: 564})
CREATE (:City {country: 'US', name: 'Seattle', year:2010, population: 608})
CREATE (:City {country: 'US', name: 'Seattle', year: 2020, population: 738})
CREATE (:City {country: 'US', name: 'New York City', year: 2000, population: 8015})
CREATE (:City {country: 'US', name: 'New York City', year: 2010, population: 8175})
CREATE (:City {country: 'US', name: 'New York City', year: 2020, population: 8772})

DuckDB

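Example that gets the rank of the current row, with gaps. The fields of the SQL query should be consistent with those of the Cypher query. For details, see https://duckdb.org.cn/docs/sql/functions/window_functions.html#rank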

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "SELECT
     country,
     name,
     year,
     population,
     RANK() OVER (PARTITION BY country ORDER BY year DESC) AS rank
   FROM 'neo4j_tmp_table'
   ORDER BY rank, country, name;"
)

Another example, which uses a window function to obtain a pivot table:

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "WITH ranked_data AS (
        SELECT
            country,
            name,
            year,
            population,
            ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS rank
        FROM 'neo4j_tmp_table'
        ORDER BY rank, country, name
    )
    SELECT *
    FROM ranked_data
    PIVOT (
        sum(population)
        FOR country IN ('NL', 'US')
        GROUP BY year
    )"
)

Or use the PIVOT <table> ON <column> clause directly:

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "PIVOT 'neo4j_tmp_table'
    ON year
    USING sum(population)
    ORDER by name"
)
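To make the shape of the pivoted result easier to picture, here is a plain Python sketch that performs the same aggregation on the City data: one output row per (country, name), one column per year, each cell holding the summed population. It is only an illustration of the idea and does not call DuckDB; pivot_population is a hypothetical helper.

```python
from collections import defaultdict

# (country, name, year, population), as created by the Cypher statements above.
cities = [
    ("NL", "Amsterdam", 2000, 1005), ("NL", "Amsterdam", 2010, 1065),
    ("NL", "Amsterdam", 2020, 1158), ("US", "Seattle", 2000, 564),
    ("US", "Seattle", 2010, 608), ("US", "Seattle", 2020, 738),
    ("US", "New York City", 2000, 8015), ("US", "New York City", 2010, 8175),
    ("US", "New York City", 2020, 8772),
]

def pivot_population(rows):
    """Sum population per (country, name) row and per year column."""
    table = defaultdict(lambda: defaultdict(int))
    for country, name, year, population in rows:
        table[(country, name)][year] += population
    # Order output rows by city name, and columns by year.
    ordered = sorted(table.items(), key=lambda item: item[0][1])
    return {key: dict(sorted(years.items())) for key, years in ordered}

pivoted = pivot_population(cities)
for (country, name), years in pivoted.items():
    print(country, name, years)
```

Each printed line corresponds to one row of the pivot table, with the years 2000, 2010 and 2020 as its columns.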

With DuckDB we can also use an in-memory instance via the jdbc:duckdb: URL:

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  'jdbc:duckdb:',
  "PIVOT 'neo4j_tmp_table'
    ON year
    USING sum(population)
    ORDER by name"
)

MySQL

Returns the rank of the current row within its partition, with gaps. For details, see https://dev.mysqlserver.cn/doc/refman/8.4/en/window-function-descriptions.html#function_rank

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
     country,
     name,
     year,
     population,
     RANK() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
  FROM 'neo4j_tmp_table'
  ORDER BY country, name;",
 $params,
 { provider: "MYSQL" })

Here is an example of the ROW_NUMBER window function in MySQL:

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
      country,
      name,
      year,
      population,
      ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
  FROM 'neo4j_tmp_table'
  ORDER BY country, name;",
 $params,
 { provider: "MYSQL" })
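The practical difference between RANK and ROW_NUMBER in the two queries above shows up when rows tie on the ORDER BY column, as the US cities do on year. The sketch below runs both functions side by side on the City data, using Python's sqlite3 module as a stand-in database (this assumes an SQLite build with window function support, version 3.25 or later); the aliases rnk and row_num are chosen for this illustration only.

```python
import sqlite3

cities = [
    ("NL", "Amsterdam", 2000, 1005), ("NL", "Amsterdam", 2010, 1065),
    ("NL", "Amsterdam", 2020, 1158), ("US", "Seattle", 2000, 564),
    ("US", "Seattle", 2010, 608), ("US", "Seattle", 2020, 738),
    ("US", "New York City", 2000, 8015), ("US", "New York City", 2010, 8175),
    ("US", "New York City", 2020, 8772),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE neo4j_tmp_table (country TEXT, name TEXT, year INTEGER, population INTEGER)"
)
conn.executemany("INSERT INTO neo4j_tmp_table VALUES (?, ?, ?, ?)", cities)

query = """
SELECT country, name, year,
       RANK()       OVER (PARTITION BY country ORDER BY year DESC) AS rnk,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS row_num
FROM neo4j_tmp_table
ORDER BY country, row_num
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)

# Tied years share a rank and leave gaps; row numbers are always consecutive.
us_ranks = sorted(r[3] for r in rows if r[0] == "US")
us_row_numbers = sorted(r[4] for r in rows if r[0] == "US")
```

For the US partition the ranks come out as 1, 1, 3, 3, 5, 5, while the row numbers run from 1 to 6; for NL, where no years tie, both columns are 1, 2, 3.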

PostgreSQL

Here is an example that uses a window function.

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
      country,
      name,
      year,
      population,
      RANK() OVER (PARTITION BY country ORDER BY year DESC) rank
  FROM 'neo4j_tmp_table'
  ORDER BY rank, country, name;",
 $params,
 { provider: "POSTGRES" })