自定义管线教程
自定义管线教程
相关源文件
以下文件为本 Wiki 页面生成时使用的上下文:
cognee/modules/pipelines/models/PipelineContext.pycognee/modules/pipelines/operations/run_pipeline.pycognee/modules/pipelines/tasks/task.pycognee/pipelines/__init__.pycognee/pipelines/types.pycognee/tests/unit/infrastructure/engine/test_identity_fields.pycognee/tests/unit/pipelines/test_simplified_pipelines.pyexamples/custom_pipelines/agentic_reasoning_procurement_example.pyexamples/custom_pipelines/memify_coding_agent_rule_extraction_example.pyexamples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.pyexamples/demos/dynamic_multiple_weighted_edges_example.pyexamples/demos/web_url_content_ingestion_example.pyexamples/guides/consolidate_entity_descriptions_example.pyexamples/guides/custom_data_models.pyexamples/guides/custom_graph_model.pyexamples/guides/custom_prompts.pyexamples/guides/custom_tasks_and_pipelines.pyexamples/guides/graph_visualization.pyexamples/guides/importance_weight.pyexamples/guides/improve_quickstart.pyexamples/guides/ontology_quickstart.py
本教程演示如何在 Cognee 中通过组合 Task 对象并执行它们来创建自定义数据处理管线。自定义管线支持超越默认 add 和 cognify 操作的领域特定处理工作流,例如提取特定组织层级或执行自定义实体解析。
关于内置管线任务和执行框架的信息,请参见管线任务与执行。
理解任务抽象
Cognee 管线由包装 Python 函数的 Task 对象组成。每个任务按顺序处理数据,前一个任务的输出会传入下一个任务。
任务函数签名模式
任务函数要求:
- 必须是
async函数、生成器或协程。cognee/modules/pipelines/tasks/task.py:173-178 - 第一个参数接收来自前一个任务或初始管线数据的数据。
- 可以接受
ctx参数(一个PipelineContext实例),其中包含user、dataset_id和pipeline_name。cognee/modules/pipelines/models/PipelineContext.py:8-14 handle_task中的编排逻辑会在传递ctx之前检查函数签名是否接受该参数。cognee/modules/pipelines/operations/run_tasks_base.py:145-149
来源: cognee/modules/pipelines/tasks/task.py:173-206、cognee/modules/pipelines/operations/run_tasks_base.py:123-166、cognee/modules/pipelines/models/PipelineContext.py:8-14
任务包装器配置
Task 类提供了批处理和参数传递的配置选项。现代 Cognee 用法还支持 @task 装饰器和 TaskSpec 用于延迟执行。cognee/modules/pipelines/tasks/task.py:34-62
| 参数 | 类型 | 用途 | 示例 |
|---|---|---|---|
executable | 可调用对象 | 要执行的异步函数 | extract_people |
*args | 任意类型 | 传递给函数的位置参数 | max_chunk_size=512 |
task_config | 字典 | 配置项,如 batch_size | {"batch_size": 100} |
**kwargs | 任意类型 | 额外的关键字参数 | graph_model=CustomModel |
来源: cognee/modules/pipelines/tasks/task.py:187-206、cognee/modules/pipelines/operations/run_tasks_base.py:166-180
创建自定义任务函数
自定义任务函数根据其在管线中的角色遵循特定模式。
数据转换任务模式
转换数据的任务函数通常接收一个数据项列表或单个数据项,并返回转换后的数据。
来自自定义组织层级管线的示例: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:60-94
def ingest_files(data: List[Any]) -> List[Company]:
all_companies: List[Company] = []
# 将原始数据转换为 Company DataPoint 的逻辑
for data_item in data:
# ... 处理 ...
all_companies.append(Company(name=company["name"], ...))
return all_companies
存储任务模式
写入数据库的任务通常接收处理后的 DataPoint 对象并将其存储到配置的数据库中。add_data_points 任务通常用于自定义管线的末尾以持久化结果。
cognee/tasks/storage/__init__.py:1-5
from cognee.tasks.storage import add_data_points
# 这个内置任务接收 DataPoint 并将其保存到向量和图存储中
Task(add_data_points)
来源: examples/guides/custom_tasks_and_pipelines.py:76-79、cognee/tasks/storage/__init__.py:1-5
组装和运行自定义管线
管线执行与来源追踪
当管线运行时,Cognee 会自动为 DataPoint 对象打上来源信息,包括 source_pipeline、source_task 和 source_user。这由任务执行期间的 _stamp_provenance 函数处理。cognee/modules/pipelines/operations/run_tasks_base.py:33-92
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-92、cognee/modules/pipelines/operations/run_tasks_base.py:123-183
run_custom_pipeline 接口
用户可以使用高级 run_custom_pipeline API 运行自定义任务列表。
examples/guides/custom_tasks_and_pipelines.py:81-83
await cognee.run_custom_pipeline(
tasks=[
Task(extract_people),
Task(add_data_points),
],
data=build_lightweight_data_object(text_data),
dataset="people_demo"
)
代码到实体的映射
此图将高级管线概念桥接到实现中使用的具体代码实体。
来源: cognee/modules/pipelines/operations/run_tasks_base.py:182-194、cognee/modules/pipelines/tasks/task.py:172-186、cognee/infrastructure/engine/models/DataPoint.py:27-55、cognee/api/v1/visualize/visualize.py:12-20
实现示例:组织层级
要实现领域特定的管线,请定义您的 DataPoint 模型和转换任务。在 metadata 中使用 identity_fields 可以实现确定性 ID 生成和自动去重。cognee/tests/unit/infrastructure/engine/test_identity_fields.py:11-20
examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-28
class Person(DataPoint):
name: str
metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
class Department(DataPoint):
name: str
employees: list[Person]
metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
自定义管线的可视化
自定义管线通常会产生独特的图结构。您可以使用 visualize_graph 来可视化这些结构。可视化系统使用来源数据(source_task、source_pipeline)为节点着色。
cognee/modules/visualization/cognee_network_visualization.py:85-88
task_color_map = _generate_provenance_colors([n.get("source_task") for n in nodes_list])
pipeline_color_map = _generate_provenance_colors([n.get("source_pipeline") for n in nodes_list])
来源: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-94、cognee/modules/visualization/cognee_network_visualization.py:22-112、cognee/infrastructure/engine/models/DataPoint.py:105-131