agentic_huge_data_base / wiki
页面 Cognee · 12.5 自定义管线教程·DeepWiki 中文全文译文

12.5 · 自定义管线教程(Custom Pipeline Tutorial)

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节12.5 状态全文译文 模块入库与解析、图谱与关系、文档对象与元数据、工作流与编排
源码线索
  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/tests/unit/infrastructure/engine/test_identity_fields.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • examples/custom_pipelines/agentic_reasoning_procurement_example.py
  • examples/custom_pipelines/memify_coding_agent_rule_extraction_example.py
  • examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py
模块标签
  • 入库与解析
  • 图谱与关系
  • 文档对象与元数据
  • 工作流与编排
  • 界面与交互

章节正文

自定义管线教程

自定义管线教程

相关源文件

以下文件为本 Wiki 页面生成时使用的上下文:

  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/tests/unit/infrastructure/engine/test_identity_fields.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • examples/custom_pipelines/agentic_reasoning_procurement_example.py
  • examples/custom_pipelines/memify_coding_agent_rule_extraction_example.py
  • examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py
  • examples/demos/dynamic_multiple_weighted_edges_example.py
  • examples/demos/web_url_content_ingestion_example.py
  • examples/guides/consolidate_entity_descriptions_example.py
  • examples/guides/custom_data_models.py
  • examples/guides/custom_graph_model.py
  • examples/guides/custom_prompts.py
  • examples/guides/custom_tasks_and_pipelines.py
  • examples/guides/graph_visualization.py
  • examples/guides/importance_weight.py
  • examples/guides/improve_quickstart.py
  • examples/guides/ontology_quickstart.py

本教程演示如何在 Cognee 中通过组合 Task 对象并执行它们来创建自定义数据处理管线。自定义管线支持超越默认 addcognify 操作的领域特定处理工作流,例如提取特定组织层级或执行自定义实体解析。

关于内置管线任务和执行框架的信息,请参见管线任务与执行

理解任务抽象

Cognee 管线由包装 Python 函数的 Task 对象组成。每个任务按顺序处理数据,前一个任务的输出会传入下一个任务。

任务函数签名模式
Cognee · 任务函数签名模式 · 图 1
Cognee · 任务函数签名模式 · 图 1

任务函数要求:

  • 必须是 async 函数、生成器或协程。cognee/modules/pipelines/tasks/task.py:173-178
  • 第一个参数接收来自前一个任务或初始管线数据的数据。
  • 可以接受 ctx 参数(一个 PipelineContext 实例),其中包含 userdataset_idpipeline_namecognee/modules/pipelines/models/PipelineContext.py:8-14
  • handle_task 中的编排逻辑会在传递 ctx 之前检查函数签名是否接受该参数。cognee/modules/pipelines/operations/run_tasks_base.py:145-149

来源: cognee/modules/pipelines/tasks/task.py:173-206cognee/modules/pipelines/operations/run_tasks_base.py:123-166cognee/modules/pipelines/models/PipelineContext.py:8-14

任务包装器配置

Task 类提供了批处理和参数传递的配置选项。现代 Cognee 用法还支持 @task 装饰器和 TaskSpec 用于延迟执行。cognee/modules/pipelines/tasks/task.py:34-62

参数类型用途示例
executable可调用对象要执行的异步函数extract_people
*args任意类型传递给函数的位置参数max_chunk_size=512
task_config字典配置项,如 batch_size{"batch_size": 100}
**kwargs任意类型额外的关键字参数graph_model=CustomModel
Cognee · 任务包装器配置 · 图 2
Cognee · 任务包装器配置 · 图 2

来源: cognee/modules/pipelines/tasks/task.py:187-206cognee/modules/pipelines/operations/run_tasks_base.py:166-180

创建自定义任务函数

自定义任务函数根据其在管线中的角色遵循特定模式。

数据转换任务模式

转换数据的任务函数通常接收一个数据项列表或单个数据项,并返回转换后的数据。

来自自定义组织层级管线的示例: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:60-94

def ingest_files(data: List[Any]) -> List[Company]:
    all_companies: List[Company] = []
    # 将原始数据转换为 Company DataPoint 的逻辑
    for data_item in data:
        # ... 处理 ...
        all_companies.append(Company(name=company["name"], ...))
    return all_companies
存储任务模式

写入数据库的任务通常接收处理后的 DataPoint 对象并将其存储到配置的数据库中。add_data_points 任务通常用于自定义管线的末尾以持久化结果。

cognee/tasks/storage/__init__.py:1-5

from cognee.tasks.storage import add_data_points

# 这个内置任务接收 DataPoint 并将其保存到向量和图存储中
Task(add_data_points)

来源: examples/guides/custom_tasks_and_pipelines.py:76-79cognee/tasks/storage/__init__.py:1-5

组装和运行自定义管线

管线执行与来源追踪

当管线运行时,Cognee 会自动为 DataPoint 对象打上来源信息,包括 source_pipelinesource_tasksource_user。这由任务执行期间的 _stamp_provenance 函数处理。cognee/modules/pipelines/operations/run_tasks_base.py:33-92

Cognee · 管线执行与来源追踪 · 图 3
Cognee · 管线执行与来源追踪 · 图 3

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-92cognee/modules/pipelines/operations/run_tasks_base.py:123-183

run_custom_pipeline 接口

用户可以使用高级 run_custom_pipeline API 运行自定义任务列表。

examples/guides/custom_tasks_and_pipelines.py:81-83

await cognee.run_custom_pipeline(
    tasks=[
        Task(extract_people),
        Task(add_data_points),
    ],
    data=build_lightweight_data_object(text_data),
    dataset="people_demo"
)
代码到实体的映射

此图将高级管线概念桥接到实现中使用的具体代码实体。

Cognee · 代码到实体的映射 · 图 4
Cognee · 代码到实体的映射 · 图 4

来源: cognee/modules/pipelines/operations/run_tasks_base.py:182-194cognee/modules/pipelines/tasks/task.py:172-186cognee/infrastructure/engine/models/DataPoint.py:27-55cognee/api/v1/visualize/visualize.py:12-20

实现示例:组织层级

要实现领域特定的管线,请定义您的 DataPoint 模型和转换任务。在 metadata 中使用 identity_fields 可以实现确定性 ID 生成和自动去重。cognee/tests/unit/infrastructure/engine/test_identity_fields.py:11-20

examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-28

class Person(DataPoint):
    name: str
    metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}

class Department(DataPoint):
    name: str
    employees: list[Person]
    metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
自定义管线的可视化

自定义管线通常会产生独特的图结构。您可以使用 visualize_graph 来可视化这些结构。可视化系统使用来源数据(source_tasksource_pipeline)为节点着色。

cognee/modules/visualization/cognee_network_visualization.py:85-88

task_color_map = _generate_provenance_colors([n.get("source_task") for n in nodes_list])
pipeline_color_map = _generate_provenance_colors([n.get("source_pipeline") for n in nodes_list])

来源: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-94cognee/modules/visualization/cognee_network_visualization.py:22-112cognee/infrastructure/engine/models/DataPoint.py:105-131