ElasticSearch

限定名称 类型 版本

apoc.es.stats

apoc.es.stats(host-or-key,$config) - elastic search 统计信息

过程

Apoc Extended

apoc.es.get

apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 GET 操作

过程

Apoc Extended

apoc.es.query

apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 SEARCH 操作

过程

Apoc Extended

apoc.es.getRaw

apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - 在 elastic search 上执行原生 GET 操作

过程

Apoc Extended

apoc.es.postRaw

apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - 在 elastic search 上执行原生 POST 操作

过程

Apoc Extended

apoc.es.post

apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 POST 操作

过程

Apoc Extended

apoc.es.put

apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 PUT 操作

过程

Apoc Extended

apoc.es.delete

apoc.es.delete(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,$config) yield value - 在 elastic search 上执行 DELETE 操作

过程

Apoc Extended

目前无法通过证书查询 Elastic 8,只能通过配置 "xpack.security.http.ssl.enabled=false" 禁用 SSL,使用 header 配置中的基本身份验证(请参阅下文的 config 参数),或者(不推荐)通过 xpack.security.enabled=false 禁用安全性。

示例

call apoc.es.post("localhost","tweets","users",null,{name:"Chris"})
call apoc.es.put("localhost","tweets","users","1",null,{name:"Chris"})
call apoc.es.get("localhost","tweets","users","1",null,null)
call apoc.es.stats("localhost")
call apoc.es.delete("localhost","indexName","typeName","idName")
apoc.es.get

分页

要使用 Elasticsearch 的分页功能,必须遵循以下步骤:

  1. 调用 apoc.es.query 获取第一块数据,并同时获取 scroll_id(以启用分页)。

  2. 使用前 N 条记录执行你的 merge/create 等操作。

  3. 使用 range(start,end,step) 函数重复调用,以获取剩余的所有数据块直到结束。例如,如果你有 1000 个文档,且每次请求获取 10 个文档,你可以执行 range(11,1000,10)。从 11 开始是因为前 10 个文档已经处理过了。如果你不知道准确的上限(文档总数),可以设置一个大于实际总数的数字。

  4. 重复调用的第二个函数是 apoc.es.get。记得将 scroll_id 设置为参数。

  5. 然后像处理第一块数据一样处理后续每一块数据的结果。

以下是一个示例:

// It's important to create an index to improve performance
CREATE INDEX FOR (n:Document) ON (n.id)
// First query: get first chunk of data + the scroll_id for pagination
CALL apoc.es.query('localhost','test-index','test-type','name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits
// Do something with hits
UNWIND hits as hit
// Here we simply create a document and a relation to a company
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company)
// Then call for the other docs and use the scrollId value from previous query
// Use a range to count our chunk of data (i.e. i want to get chunks from 2 to 10)
WITH range(2,10,1) as list, scrollId
UNWIND list as count
CALL apoc.es.get("localhost","_search","scroll",null,{scroll:"5m",scroll_id:scrollId},null) yield value with value._scoll_id as scrollId, value.hits.hits as nextHits
// Again, do something with hits
UNWIND nextHits as hit
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company) return scrollId, doc, company

该示例在拥有 16GB 内存的 MacBook Pro 上进行了测试。将 20000 个文档从 ES 加载到 Neo4j(每次请求 100 个文档)耗时 1 分钟。

常规结构与参数

call apoc.es.post(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value

// GET/PUT/POST url/index/type/id?query -d payload

host-or-key 参数

该参数可以是:

例如,使用 apoc.es.stats 时,我们可以执行:

CALL apoc.es.stats('http://username:password@host:port')

此外,它可以是在 apoc.conf 中查询的条目:

  • 查找 apoc.es.url

  • 查找 apoc.es.host

这比作为第一个参数的直接字符串主机或 URL 具有更高的优先级。

例如,当 apoc.conf 如下所示时:

apoc.es.url=http://username:password@host:port

或像这样:

apoc.es.host=username:password@host:port

我们可以通过将第一个参数设为 null 来连接到 elastic。

例如,使用 apoc.es.stats 时,我们可以执行:

CALL apoc.es.stats(null)

此外,它可以是在 apoc.conf 中查询的条目,其中 <key> 必须放在第一个参数中:

  • 通过键查找 apoc.es.<key>.url

  • 通过键查找 apoc.es.<key>.host

例如,当 apoc.conf 如下所示时:

apoc.es.custom.url=http://username:password@host:port

或者像这样:

apoc.es.custom.host=username:password@host:port

我们可以通过将第一个参数设为 null 来连接到 elastic。

例如,使用 apoc.es.stats 时,我们可以执行:

CALL apoc.es.stats('custom')

index 参数

主 ES 索引,将直接发送;如果为 null,则为 "_all";多个索引可在字符串中用逗号分隔。

type 参数

文档类型,将直接发送;如果为 null,则为 "_all";多个类型可在字符串中用逗号分隔。

id 参数

文档 ID,为 null 时将被忽略。

query 参数

Query 可以是一个转换为查询字符串的映射、直接字符串,或者为 null(此时会被忽略)。

payload 参数

Payload 可以是将被转换为 JSON 负载的 map,或者直接发送的字符串,或者 null。

config 参数

Config 可以是可选的 map,它可以包含以下条目:

表 1. 配置参数
名称 (name) type 默认 description(描述)

headers

Map

{content-type: "application/json", method, "<httpMethod>"}

包含一个用于添加(或替换)默认头的头部映射。APOC 需要 method: <httpMethod> 来在底层确定要使用的 HTTP 请求方法。即,默认情况下,apoc.es.put 使用 PUTapoc.es.postapoc.es.postRaw 使用 POST,其他情况使用 GET

version

字符串

DEFAULT

可以是 DEFAULTEIGHT,以便根据 Elastic 版本更改 RestAPI 端点。请参阅下方的 Endpoint 表格。

例如,使用 apoc.es.stats 时,我们可以执行:

CALL apoc.es.stats('custom', { headers: {Authorization: "Basic <Base64Token>"} })

用于使用 基本身份验证 (Basic authentication) 并创建以下 HTTP 头部:

Authorization: Basic <Base64Token>
method: GET
Content-Type: application/json

Elastic 8 中的某些 API 可以由过程直接调用,无需 {version: 'EIGHT'} 配置,例如 apoc.es.stats,但对于许多 API,必须设置它才能正确处理端点,例如 apoc.es.query

表 2. 端点 (Endpoint)
过程 使用版本: DEFAULT 使用版本: EIGHT

apoc.es.stats(host)

<host>/_stats

DEFAULT

apoc.es.query(host, index, type, query, payload, $conf)

<host>/<index param>/<type param>/_stats?<query param>

<host>/<index param>/_stats?<query param>

apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf)

<host>/<path param>

DEFAULT

其他 apoc.es.<name>(host, index, type, id, query, payload, $conf) 过程

<host>/<index param>/<type param>/<id param>_stats?<query param> 默认情况下,<index param><id param> 将被填充为 _all,而如果 <id param> 不存在,它将从端点中移除。

<host>/<index param>/<type param>/<id param>_stats?<query param>。注意,你只需要在 <index param>, <id param><type param> 三者中输入一个值,其余的将最终从端点中排除。

类型参数通常是一个下划线字符串,表示 API 的类型,例如 _doc_update(之前表示 映射类型)。这允许你调用例如 此 API

例如,使用 apoc.es.query,我们可以执行搜索 API。

CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })

通过 Update API 更新 Elastic 8 中的文档。

CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})

在 Elastic 8 中调用 创建索引 API

CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })

结果

结果是 value 中的映射流。

倒数排名融合 (RRF)

可以使用 Neo4j 通过 ES 执行 RRF。有关详细信息,请阅读 官方文档。请注意,此 API 自 Elastic 8.14.x 版本起受支持。

以下是 Neo4j 与 ES 结合使用的示例。

步骤 1 - 创建映射

CALL apoc.es.put($host, 'example-index', null, null, null,
{
              "mappings": {
                "properties": {
                  "text": {
                    "type": "text"
                  },
                  "vector": {
                    "type": "dense_vector",
                    "dims": 1,
                    "index": true,
                    "similarity": "l2_norm"
                  },
                  "integer": {
                    "type": "integer"
                  }
                }
              }
            }, $config)

结果

结果是 value 中的映射流。

步骤 2 - 存入文档

CALL apoc.es.put($host, 'example-index/_doc/1', null, null, null,
{
    "text" : "rrf",
    "vector" : [5],
    "integer": 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/2', null, null, null,
{
    "text" : "rrf rrf",
    "vector" : [4],
    "integer": 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/3', null, null, null,
{
    "text" : "rrf rrf rrf",
    "vector" : [3],
    "integer": 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/4', null, null, null,
{
    "text" : "rrf rrf rrf rrf",
    "integer": 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/5', null, null, null,
{
    "vector" : [0],
    "integer": 1
}, $config)

结果

结果是 value 中的映射流。

步骤 3 - 刷新索引

CALL apoc.es.post($host, 'example-index/_refresh', null, null, '', $config)

结果

结果是 value 中的映射流。

步骤 4 - 使用 RRF 检索器执行搜索

CALL apoc.es.getRaw($host,'example-index/_search',
{
    "retriever": {
        "rrf": {
            "retrievers": [
                {
                    "standard": {
                        "query": {
                            "term": {
                                "text": "rrf"
                            }
                        }
                    }
                },
                {
                    "knn": {
                        "field": "vector",
                        "query_vector": [3],
                        "k": 5,
                        "num_candidates": 5
                    }
                }
            ],
            "window_size": 5,
            "rank_constant": 1
        }
    },
    "size": 3,
    "aggs": {
        "int_count": {
            "terms": {
                "field": "integer"
            }
        }
    }
}
,$config) yield value

结果

结果是 value 中的映射流。

© . This site is unofficial and not affiliated with Neo4j, Inc.