agentic_huge_data_base / wiki
页面 Cognee · 3.2 管线任务与执行·DeepWiki 中文全文译文

3.2 · 管线任务与执行(Pipeline Tasks and Execution)

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节3.2 状态全文译文 模块工作流与编排、图谱与关系、测试、发布与运维、入库与解析
源码线索
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
模块标签
  • 工作流与编排
  • 图谱与关系
  • 测试、发布与运维
  • 入库与解析
  • 认证、权限与安全

章节正文

管线任务与执行

管线任务与执行

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/pipelines/operations/run_tasks_with_telemetry.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
  • cognee/modules/visualization/cognee_network_visualization.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/shared/CodeGraphEntities.py
  • cognee/tasks/ingestion/ingest_data.py
  • cognee/tasks/ingestion/save_data_item_to_storage.py
  • cognee/tasks/summarization/models.py
  • cognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.py
  • cognee/tests/unit/modules/visualization/visualization_test.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • distributed/entrypoint.py
  • distributed/tasks/queued_add_data_points.py
  • distributed/tasks/queued_add_edges.py
  • distributed/tasks/queued_add_nodes.py
  • distributed/workers/data_point_saving_worker.py
  • distributed/workers/graph_saving_worker.py
  • poetry.lock
  • pyproject.toml
  • uv.lock

本文档解释了驱动 Cognee 数据处理管线的任务执行系统。任务是工作流中的基本单元,负责在各个阶段(分类、片段切分、提取、图谱构建)对数据进行转换。本文涵盖任务的定义方式、如何组装成管线、如何通过批处理和错误处理执行,以及如何进行监控。

有关具体处理阶段(文档分类、实体提取等)的信息,请参见知识图谱生成(3.3)。有关任务执行前的数据入库详情,请参见数据入库(3.1)

任务定义与结构

Cognee 中的任务是围绕处理函数的包装对象,用于定义管线中的离散操作。Task 类(定义在 cognee/modules/pipelines/tasks/task.py 中)封装了一个可调用函数及其配置,并提供了一个 execute() 方法,该方法以异步方式生成结果。

Cognee · 任务定义与结构 · 图 1
Cognee · 任务定义与结构 · 图 1

来源: cognee/modules/pipelines/tasks/task.py:1-25cognee/api/v1/cognify/cognify.py:13cognee/api/v1/cognify/cognify.py:22-33

任务组件

一个 Task 对象包含以下部分:

  • executable:实际的处理函数(例如 classify_documentsextract_graph_and_summarize)。cognee/modules/pipelines/tasks/task.py:10
  • task_config:配置字典,其中 batch_size 控制输出批处理。cognee/modules/pipelines/tasks/task.py:11
  • execute():异步生成器方法,以批次方式生成处理后的结果。cognee/modules/pipelines/tasks/task.py:16-25
  • accepts_ctx:在构造时确定的标志,指示 executable 是否接受 PipelineContext 参数。cognee/modules/pipelines/operations/run_tasks_base.py:148-149

cognify 管线中使用的示例任务函数:

任务函数用途文件引用
classify_documents识别文档类型和结构cognee/tasks/documents/__init__.py
extract_chunks_from_documents将内容拆分为语义片段cognee/tasks/documents/__init__.py
extract_graph_and_summarize提取实体、关系并创建摘要cognee/tasks/graph/extract_graph_and_summarize.py
add_data_points将嵌入向量和图数据存储到数据库中cognee/tasks/storage/add_data_points.py
extract_events_and_timestamps时间实体和事件提取cognee/tasks/temporal_graph/extract_events_and_entities.py

来源: cognee/api/v1/cognify/cognify.py:22-33cognee/modules/pipelines/operations/run_tasks_base.py:145-151

管线组装

管线通过创建按顺序执行的 Task 对象列表来组装。cognify 函数是主要的编排器,负责根据用户配置构建这些任务列表。

Cognee · 管线组装 · 图 2
Cognee · 管线组装 · 图 2

来源: cognee/api/v1/cognify/cognify.py:248-307cognee/api/v1/cognify/cognify.py:310-347

管线执行流程

管线执行遵循层次化流程:run_pipeline()run_pipeline_per_dataset()run_tasks()run_tasks_data_item()run_tasks_base()handle_task()

Cognee · 管线执行流程 · 图 3
Cognee · 管线执行流程 · 图 3

来源: cognee/modules/pipelines/operations/pipeline.py:33-63cognee/modules/pipelines/operations/run_tasks.py:56-122cognee/modules/pipelines/operations/run_tasks_base.py:123-183

执行层次

第 1 层:run_pipeline() 顶层入口点。它负责校验任务、设置数据库环境,并解析用户已授权的数据集。cognee/modules/pipelines/operations/pipeline.py:33-49

第 2 层:run_pipeline_per_dataset() 针对特定 Dataset 执行管线。它会检查管线是否已经运行过(资格校验逻辑),然后调用 run_tasks()cognee/modules/pipelines/operations/pipeline.py:66-107

第 3 层:run_tasks() 编排数据集中所有数据项的执行。它会生成一个 pipeline_id,记录开始日志,并使用 asyncio.Semaphore 根据 data_per_batch 控制并发度。cognee/modules/pipelines/operations/run_tasks.py:74-117

第 4 层:run_tasks_base() 驱动顺序任务执行的递归引擎。它决定下一个任务的批次大小,并调用 handle_task()cognee/modules/pipelines/operations/run_tasks_base.py:182-197

第 5 层:handle_task() 为任务执行添加可观测性(OpenTelemetry 跨度)、遥测和来源标记。它执行 running_task,然后递归调用 run_tasks_base 处理管线中的剩余任务。cognee/modules/pipelines/operations/run_tasks_base.py:123-183

批处理与并行化

Cognee 使用两种类型的批处理来优化性能:

  1. 数据级并发(data_per_batch:在 run_tasks() 中,使用信号量限制同时处理的 Data 项数量(默认值为 20)。cognee/modules/pipelines/operations/run_tasks.py:97-117
  2. 任务级批处理(batch_size:在 Task 内部,来自上一阶段的多个结果会先被合并成一个批次,然后再传递给当前任务的 executable。这对于基于大语言模型(LLM)的提取至关重要,可以减少 API 开销。cognee/modules/pipelines/operations/run_tasks_base.py:166cognee/modules/pipelines/tasks/task.py:11

来源: cognee/modules/pipelines/operations/run_tasks.py:97-117cognee/modules/pipelines/operations/run_tasks_base.py:166

数据来源与血缘

管线生成的每个 DataPoint 都会被打上元数据标记,以追踪其来源。该逻辑在 _stamp_provenance() 中实现。cognee/modules/pipelines/operations/run_tasks_base.py:33-91

来源标记逻辑

当任务生成数据时,handle_task() 会调用 _stamp_provenance(),该方法会递归遍历 DataPoint 及其字段:

  • source_pipeline:管线的名称(例如 "cognify")。cognee/modules/pipelines/operations/run_tasks_base.py:51-52
  • source_task:生成数据的 executable 函数名称。cognee/modules/pipelines/operations/run_tasks_base.py:53-54
  • source_user:运行管线的用户的邮箱或 ID。cognee/modules/pipelines/operations/run_tasks_base.py:55-56
  • source_node_set:从输入数据传播而来,用于对相关图元素进行分组。cognee/modules/pipelines/operations/run_tasks_base.py:59-63
  • source_content_hash:追踪原始数据内容的哈希值。cognee/modules/pipelines/operations/run_tasks_base.py:65-70
来源可视化

cognee/modules/visualization/cognee_network_visualization.py 中的可视化系统使用这些字段生成确定性的颜色映射,使用户能够直观地区分哪些任务或用户创建了知识图谱的特定部分。cognee/modules/visualization/cognee_network_visualization.py:11-19cognee/modules/visualization/cognee_network_visualization.py:85-88

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-91cognee/modules/visualization/cognee_network_visualization.py:85-96

错误处理

Cognee 在多个层级处理错误,以确保管线的健壮性:

  • 数据项级错误:在 run_tasks() 中,asyncio.gather 调用时设置了 return_exceptions=True。异常会被捕获、记录,并转换为针对该特定数据项的 PipelineRunErrored 对象。cognee/modules/pipelines/operations/run_tasks.py:119-141
  • 管线级错误:如果任何数据项失败,会抛出 PipelineRunFailedError 以指示整个管线运行失败。cognee/modules/pipelines/operations/run_tasks.py:146-150
  • 日志记录:所有开始、完成和错误信息都会通过 log_pipeline_run_startlog_pipeline_run_completelog_pipeline_run_error 持久化记录。cognee/modules/pipelines/operations/run_tasks.py:75cognee/modules/pipelines/operations/run_tasks.py:151-153cognee/modules/pipelines/operations/run_tasks.py:171-173

来源: cognee/modules/pipelines/operations/run_tasks.py:119-184

分布式执行

Cognee 支持使用 Modal 进行分布式执行。这是通过在 run_tasks 函数上添加 @override_run_tasks(run_tasks_distributed) 装饰器来实现的。cognee/modules/pipelines/operations/run_tasks.py:55

如果环境变量 COGNEE_DISTRIBUTED 设置为 True,执行流程会路由到 run_tasks_distributed(),该函数通过 graph_saving_workerdata_point_saving_worker 等工作进程处理任务的远程执行。cognee/modules/pipelines/operations/run_tasks.py:40-48

来源: cognee/modules/pipelines/operations/run_tasks.py:36-55cognee/modules/pipelines/operations/run_tasks_distributed.py