用户指南:管道¶
本页提供有关如何创建管道的信息。
注意
管道以异步方式运行,见下例。
创建组件¶
组件是执行简单任务的异步工作单元,例如对文档进行分块或将结果保存到 Neo4j。此软件包包含一些默认组件,但开发者可以按照以下步骤创建自己的组件。
创建 Pydantic neo4j_graphrag.experimental.pipeline.DataModel 的子类,以表示组件返回的数据。
创建 neo4j_graphrag.experimental.pipeline.Component 的子类。
在新类中创建一个 run 方法,并使用刚创建的 DataModel 指定必需的输入和输出模型。
实现 run 方法:它是一个 async 方法,允许在该方法内部并行化任务并使用 await 等待它们完成。
下面给出一个示例,在该示例中创建了 ComponentAdd,用于将两个数字相加并返回结果和。
from neo4j_graphrag.experimental.pipeline import Component, DataModel
class IntResultModel(DataModel):
result: int
class ComponentAdd(Component):
async def run(self, number1: int, number2: int = 1) -> IntResultModel:
return IntResultModel(result = number1 + number2)
在 API 文档中阅读更多关于 组件 的信息。
在管道中连接组件¶
创建组件的最终目标是将它们组装成一个用于特定目的的复杂管道,例如从文本数据构建知识图谱。
下面演示如何创建一个简单管道,并将结果从一个组件传播到另一个组件(详细解释如下)。
import asyncio
from neo4j_graphrag.experimental.pipeline import Pipeline
pipe = Pipeline()
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")
pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4}}))
# result: 10+1+4 = 15
首先,创建一个管道,并向其中添加两个名为 “a” 与 “b” 的组件。
接下来,将这两个组件连接,使得 “b” 在 “a” 之后运行,并且组件 “b” 的 “number2” 参数取自组件 “a” 的结果。
最后,使用 10 和 1 作为 “a” 的输入参数运行管道。组件 “b” 将收到 11(10 + 1,即 “a” 的结果)作为 “number1”,以及 4 作为 “number2”(这在 pipeline.run 参数中指定)。
数据流在下图中示意:
10 ---\
Component "a" -> 11
1 ----/ \
\
Component "b" -> 15
4 -------------------------/
警告
循环图
管道中不允许出现循环。
警告
忽略的用户输入
如果用户在 pipeline.run 方法中提供了输入,同时在 connect 方法中通过 input_config 也提供了输入,则用户输入会被忽略。以下管道是对前面示例的改编。
pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4, "number2": 42}}))
结果仍然是 15,因为用户输入 “number2”: 42 被忽略了。
可视化管道¶
可以使用 draw 方法对管道进行可视化。
from neo4j_graphrag.experimental.pipeline import Pipeline
pipe = Pipeline()
# ... define components and connections
pipe.draw("pipeline.html")
下面是一个管道的示例渲染,展示为交互式 HTML 可视化。
# To view the visualization in a browser
import webbrowser
webbrowser.open("pipeline.html")
默认情况下,未映射到任何组件的输出字段会被隐藏。通过将 hide_unused_outputs 设置为 False,可以将它们添加到可视化中。
pipe.draw("pipeline_full.html", hide_unused_outputs=False)
# To view the full visualization in a browser
import webbrowser
webbrowser.open("pipeline_full.html")
添加事件回调¶
可以添加回调函数,以接收管道进度的通知。
PIPELINE_STARTED,当管道启动时
PIPELINE_FINISHED,当管道结束时
TASK_STARTED,当任务开始时
TASK_PROGRESS,由每个组件发送(取决于组件实现,见下文)
TASK_FINISHED,当任务结束时
请参阅 PipelineEvent 和 TaskEvent 了解每种事件类型中发送的内容。
import asyncio
import logging
from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline.notification import Event
logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.WARNING)
async def event_handler(event: Event) -> None:
"""Function can do anything about the event,
here we're just logging it if it's a pipeline-level event.
"""
if event.event_type.is_pipeline_event:
logger.warning(event)
pipeline = Pipeline(
callback=event_handler,
)
# ... add components, connect them as usual
await pipeline.run(...)
从组件发送事件¶
组件可以通过实现 run_from_context 方法,使用 context_ 中的 notify 函数发送进度通知。
from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext
class IntResultModel(DataModel):
result: int
class ComponentAdd(Component):
async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
for fake_iteration in range(10):
await context_.notify(
message=f"Starting iteration {fake_iteration} out of 10",
data={"iteration": fake_iteration, "total": 10}
)
return IntResultModel(result = number1 + number2)
这将在管道回调中发送一次 TASK_PROGRESS 事件。
注意
在未来的版本中,context_ 参数将被添加到 run 方法中。