用户指南:管道

本页提供有关如何创建管道的信息。

注意

管道以异步方式运行,见下例。

创建组件

组件是执行简单任务的异步工作单元,例如对文档进行分块或将结果保存到 Neo4j。此软件包包含一些默认组件,但开发者可以按照以下步骤创建自己的组件。

  1. 创建 Pydantic neo4j_graphrag.experimental.pipeline.DataModel 的子类,以表示组件返回的数据。

  2. 创建 neo4j_graphrag.experimental.pipeline.Component 的子类。

  3. 在新类中创建一个 run 方法,并使用刚创建的 DataModel 指定必需的输入和输出模型。

  4. 实现 run 方法:它是一个 async 方法,允许在该方法内部并行化任务并使用 await 等待它们完成。

下面给出一个示例,在该示例中创建了 ComponentAdd,用于将两个数字相加并返回结果和。

from neo4j_graphrag.experimental.pipeline import Component, DataModel

class IntResultModel(DataModel):
    result: int

class ComponentAdd(Component):
    async def run(self, number1: int, number2: int = 1) -> IntResultModel:
        return IntResultModel(result = number1 + number2)

在 API 文档中阅读更多关于 组件 的信息。

在管道中连接组件

创建组件的最终目标是将它们组装成一个用于特定目的的复杂管道,例如从文本数据构建知识图谱。

下面演示如何创建一个简单管道,并将结果从一个组件传播到另一个组件(详细解释如下)。

import asyncio
from neo4j_graphrag.experimental.pipeline import Pipeline

pipe = Pipeline()
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")

pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4}}))
# result: 10+1+4 = 15
  1. 首先,创建一个管道,并向其中添加两个名为 “a” 与 “b” 的组件。

  2. 接下来,将这两个组件连接,使得 “b” 在 “a” 之后运行,并且组件 “b” 的 “number2” 参数取自组件 “a” 的结果。

  3. 最后,使用 10 和 1 作为 “a” 的输入参数运行管道。组件 “b” 将收到 11(10 + 1,即 “a” 的结果)作为 “number1”,以及 4 作为 “number2”(这在 pipeline.run 参数中指定)。

数据流在下图中示意:

10 ---\
        Component "a" -> 11
1 ----/                   \
                           \
                             Component "b" -> 15
4 -------------------------/

警告

循环图

管道中不允许出现循环。

警告

忽略的用户输入

如果用户在 pipeline.run 方法中提供了输入,同时在 connect 方法中通过 input_config 也提供了输入,则用户输入会被忽略。以下管道是对前面示例的改编。

pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4, "number2": 42}}))

结果仍然是 15,因为用户输入 “number2”: 42 被忽略了。

可视化管道

可以使用 draw 方法对管道进行可视化。

from neo4j_graphrag.experimental.pipeline import Pipeline

pipe = Pipeline()
# ... define components and connections

pipe.draw("pipeline.html")

下面是一个管道的示例渲染,展示为交互式 HTML 可视化。

# To view the visualization in a browser
import webbrowser
webbrowser.open("pipeline.html")

默认情况下,未映射到任何组件的输出字段会被隐藏。通过将 hide_unused_outputs 设置为 False,可以将它们添加到可视化中。

pipe.draw("pipeline_full.html", hide_unused_outputs=False)

# To view the full visualization in a browser
import webbrowser
webbrowser.open("pipeline_full.html")

添加事件回调

可以添加回调函数,以接收管道进度的通知。

  • PIPELINE_STARTED,当管道启动时

  • PIPELINE_FINISHED,当管道结束时

  • TASK_STARTED,当任务开始时

  • TASK_PROGRESS,由每个组件发送(取决于组件实现,见下文)

  • TASK_FINISHED,当任务结束时

请参阅 PipelineEventTaskEvent 了解每种事件类型中发送的内容。

import asyncio
import logging

from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline.notification import Event

logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.WARNING)


async def event_handler(event: Event) -> None:
    """Function can do anything about the event,
    here we're just logging it if it's a pipeline-level event.
    """
    if event.event_type.is_pipeline_event:
        logger.warning(event)

pipeline = Pipeline(
    callback=event_handler,
)
# ... add components, connect them as usual

await pipeline.run(...)

从组件发送事件

组件可以通过实现 run_from_context 方法,使用 context_ 中的 notify 函数发送进度通知。

from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext

class IntResultModel(DataModel):
    result: int

class ComponentAdd(Component):
    async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
        for fake_iteration in range(10):
            await context_.notify(
                message=f"Starting iteration {fake_iteration} out of 10",
                data={"iteration": fake_iteration, "total": 10}
            )
        return IntResultModel(result = number1 + number2)

这将在管道回调中发送一次 TASK_PROGRESS 事件。

注意

在未来的版本中,context_ 参数将被添加到 run 方法中。

© . This site is unofficial and not affiliated with Neo4j, Inc.