管线任务与执行
管线任务与执行
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
cognee/api/v1/add/add.pycognee/api/v1/cognify/cognify.pycognee/infrastructure/engine/models/DataPoint.pycognee/infrastructure/utils/run_sync.pycognee/modules/chunking/models/DocumentChunk.pycognee/modules/data/methods/get_authorized_existing_datasets.pycognee/modules/data/methods/load_or_create_datasets.pycognee/modules/data/models/Data.pycognee/modules/engine/models/Entity.pycognee/modules/engine/models/EntityType.pycognee/modules/pipelines/layers/pipeline_execution_mode.pycognee/modules/pipelines/methods/__init__.pycognee/modules/pipelines/models/PipelineContext.pycognee/modules/pipelines/operations/pipeline.pycognee/modules/pipelines/operations/run_pipeline.pycognee/modules/pipelines/operations/run_tasks.pycognee/modules/pipelines/operations/run_tasks_base.pycognee/modules/pipelines/operations/run_tasks_distributed.pycognee/modules/pipelines/operations/run_tasks_with_telemetry.pycognee/modules/pipelines/tasks/task.pycognee/modules/users/permissions/methods/give_permission_on_dataset.pycognee/modules/visualization/cognee_network_visualization.pycognee/pipelines/__init__.pycognee/pipelines/types.pycognee/shared/CodeGraphEntities.pycognee/tasks/ingestion/ingest_data.pycognee/tasks/ingestion/save_data_item_to_storage.pycognee/tasks/summarization/models.pycognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.pycognee/tests/unit/modules/visualization/visualization_test.pycognee/tests/unit/pipelines/test_simplified_pipelines.pydistributed/entrypoint.pydistributed/tasks/queued_add_data_points.pydistributed/tasks/queued_add_edges.pydistributed/tasks/queued_add_nodes.pydistributed/workers/data_point_saving_worker.pydistributed/workers/graph_saving_worker.pypoetry.lockpyproject.tomluv.lock
本文档解释了驱动 Cognee 数据处理管线的任务执行系统。任务是工作流中的基本单元,负责在各个阶段(分类、片段切分、提取、图谱构建)对数据进行转换。本文涵盖任务的定义方式、如何组装成管线、如何通过批处理和错误处理执行,以及如何进行监控。
有关具体处理阶段(文档分类、实体提取等)的信息,请参见知识图谱生成(3.3)。有关任务执行前的数据入库详情,请参见数据入库(3.1)。
任务定义与结构
Cognee 中的任务是围绕处理函数的包装对象,用于定义管线中的离散操作。Task 类(定义在 cognee/modules/pipelines/tasks/task.py 中)封装了一个可调用函数及其配置,并提供了一个 execute() 方法,该方法以异步方式生成结果。
来源: cognee/modules/pipelines/tasks/task.py:1-25、cognee/api/v1/cognify/cognify.py:13、cognee/api/v1/cognify/cognify.py:22-33
任务组件
一个 Task 对象包含以下部分:
executable:实际的处理函数(例如classify_documents、extract_graph_and_summarize)。cognee/modules/pipelines/tasks/task.py:10task_config:配置字典,其中batch_size控制输出批处理。cognee/modules/pipelines/tasks/task.py:11execute():异步生成器方法,以批次方式生成处理后的结果。cognee/modules/pipelines/tasks/task.py:16-25accepts_ctx:在构造时确定的标志,指示executable是否接受PipelineContext参数。cognee/modules/pipelines/operations/run_tasks_base.py:148-149
cognify 管线中使用的示例任务函数:
| 任务函数 | 用途 | 文件引用 |
|---|---|---|
classify_documents | 识别文档类型和结构 | cognee/tasks/documents/__init__.py |
extract_chunks_from_documents | 将内容拆分为语义片段 | cognee/tasks/documents/__init__.py |
extract_graph_and_summarize | 提取实体、关系并创建摘要 | cognee/tasks/graph/extract_graph_and_summarize.py |
add_data_points | 将嵌入向量和图数据存储到数据库中 | cognee/tasks/storage/add_data_points.py |
extract_events_and_timestamps | 时间实体和事件提取 | cognee/tasks/temporal_graph/extract_events_and_entities.py |
来源: cognee/api/v1/cognify/cognify.py:22-33、cognee/modules/pipelines/operations/run_tasks_base.py:145-151
管线组装
管线通过创建按顺序执行的 Task 对象列表来组装。cognify 函数是主要的编排器,负责根据用户配置构建这些任务列表。
来源: cognee/api/v1/cognify/cognify.py:248-307、cognee/api/v1/cognify/cognify.py:310-347
管线执行流程
管线执行遵循层次化流程:run_pipeline() → run_pipeline_per_dataset() → run_tasks() → run_tasks_data_item() → run_tasks_base() → handle_task()。
来源: cognee/modules/pipelines/operations/pipeline.py:33-63、cognee/modules/pipelines/operations/run_tasks.py:56-122、cognee/modules/pipelines/operations/run_tasks_base.py:123-183
执行层次
第 1 层:run_pipeline() 顶层入口点。它负责校验任务、设置数据库环境,并解析用户已授权的数据集。cognee/modules/pipelines/operations/pipeline.py:33-49
第 2 层:run_pipeline_per_dataset() 针对特定 Dataset 执行管线。它会检查管线是否已经运行过(资格校验逻辑),然后调用 run_tasks()。cognee/modules/pipelines/operations/pipeline.py:66-107
第 3 层:run_tasks() 编排数据集中所有数据项的执行。它会生成一个 pipeline_id,记录开始日志,并使用 asyncio.Semaphore 根据 data_per_batch 控制并发度。cognee/modules/pipelines/operations/run_tasks.py:74-117
第 4 层:run_tasks_base() 驱动顺序任务执行的递归引擎。它决定下一个任务的批次大小,并调用 handle_task()。cognee/modules/pipelines/operations/run_tasks_base.py:182-197
第 5 层:handle_task() 为任务执行添加可观测性(OpenTelemetry 跨度)、遥测和来源标记。它执行 running_task,然后递归调用 run_tasks_base 处理管线中的剩余任务。cognee/modules/pipelines/operations/run_tasks_base.py:123-183
批处理与并行化
Cognee 使用两种类型的批处理来优化性能:
- 数据级并发(
data_per_batch):在run_tasks()中,使用信号量限制同时处理的Data项数量(默认值为 20)。cognee/modules/pipelines/operations/run_tasks.py:97-117 - 任务级批处理(
batch_size):在Task内部,来自上一阶段的多个结果会先被合并成一个批次,然后再传递给当前任务的executable。这对于基于大语言模型(LLM)的提取至关重要,可以减少 API 开销。cognee/modules/pipelines/operations/run_tasks_base.py:166、cognee/modules/pipelines/tasks/task.py:11
来源: cognee/modules/pipelines/operations/run_tasks.py:97-117、cognee/modules/pipelines/operations/run_tasks_base.py:166
数据来源与血缘
管线生成的每个 DataPoint 都会被打上元数据标记,以追踪其来源。该逻辑在 _stamp_provenance() 中实现。cognee/modules/pipelines/operations/run_tasks_base.py:33-91
来源标记逻辑
当任务生成数据时,handle_task() 会调用 _stamp_provenance(),该方法会递归遍历 DataPoint 及其字段:
source_pipeline:管线的名称(例如 "cognify")。cognee/modules/pipelines/operations/run_tasks_base.py:51-52source_task:生成数据的executable函数名称。cognee/modules/pipelines/operations/run_tasks_base.py:53-54source_user:运行管线的用户的邮箱或 ID。cognee/modules/pipelines/operations/run_tasks_base.py:55-56source_node_set:从输入数据传播而来,用于对相关图元素进行分组。cognee/modules/pipelines/operations/run_tasks_base.py:59-63source_content_hash:追踪原始数据内容的哈希值。cognee/modules/pipelines/operations/run_tasks_base.py:65-70
来源可视化
cognee/modules/visualization/cognee_network_visualization.py 中的可视化系统使用这些字段生成确定性的颜色映射,使用户能够直观地区分哪些任务或用户创建了知识图谱的特定部分。cognee/modules/visualization/cognee_network_visualization.py:11-19、cognee/modules/visualization/cognee_network_visualization.py:85-88
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-91、cognee/modules/visualization/cognee_network_visualization.py:85-96
错误处理
Cognee 在多个层级处理错误,以确保管线的健壮性:
- 数据项级错误:在
run_tasks()中,asyncio.gather调用时设置了return_exceptions=True。异常会被捕获、记录,并转换为针对该特定数据项的PipelineRunErrored对象。cognee/modules/pipelines/operations/run_tasks.py:119-141 - 管线级错误:如果任何数据项失败,会抛出
PipelineRunFailedError以指示整个管线运行失败。cognee/modules/pipelines/operations/run_tasks.py:146-150 - 日志记录:所有开始、完成和错误信息都会通过
log_pipeline_run_start、log_pipeline_run_complete和log_pipeline_run_error持久化记录。cognee/modules/pipelines/operations/run_tasks.py:75、cognee/modules/pipelines/operations/run_tasks.py:151-153、cognee/modules/pipelines/operations/run_tasks.py:171-173
来源: cognee/modules/pipelines/operations/run_tasks.py:119-184
分布式执行
Cognee 支持使用 Modal 进行分布式执行。这是通过在 run_tasks 函数上添加 @override_run_tasks(run_tasks_distributed) 装饰器来实现的。cognee/modules/pipelines/operations/run_tasks.py:55
如果环境变量 COGNEE_DISTRIBUTED 设置为 True,执行流程会路由到 run_tasks_distributed(),该函数通过 graph_saving_worker 或 data_point_saving_worker 等工作进程处理任务的远程执行。cognee/modules/pipelines/operations/run_tasks.py:40-48
来源: cognee/modules/pipelines/operations/run_tasks.py:36-55、cognee/modules/pipelines/operations/run_tasks_distributed.py