agentic_huge_data_base / wiki
页面 Cognee · 3 核心数据管线·DeepWiki 中文全文译文

3 · 核心数据管线(Core Data Pipeline)

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节3 状态全文译文 模块测试、发布与运维、系统架构、工作流与编排、入库与解析
源码线索
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
模块标签
  • 测试、发布与运维
  • 系统架构
  • 工作流与编排
  • 入库与解析
  • 图谱与关系

章节正文

核心数据管线

核心数据管线

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
  • cognee/modules/pipelines/operations/run_tasks_with_telemetry.py
  • cognee/modules/visualization/cognee_network_visualization.py
  • cognee/shared/CodeGraphEntities.py
  • cognee/tasks/ingestion/ingest_data.py
  • cognee/tasks/ingestion/save_data_item_to_storage.py
  • cognee/tasks/summarization/models.py
  • cognee/tests/unit/modules/visualization/visualization_test.py
  • poetry.lock
  • pyproject.toml
  • uv.lock

核心数据管线是 Cognee 的主要数据处理系统,负责将原始输入数据转换为可查询的知识图谱。它基于任务驱动的架构,协调从初始数据入库到知识提取和存储的完整流程,支持增量加载、批量处理以及全面的溯源追踪。

本文档涵盖整体管线架构和执行模型。关于各管线阶段的详细信息,请参见:

管线架构总览

核心数据管线采用两阶段架构,以 addcognify 操作为核心,两者均委托给统一的 run_pipeline 函数 cognee/modules/pipelines/operations/pipeline.py:33-44

管线编排层级

标题:管线执行流程

Cognee · 管线编排层级 · 图 1
Cognee · 管线编排层级 · 图 1

来源:cognee/api/v1/add/add.py:92-114cognee/api/v1/cognify/cognify.py:43-59cognee/modules/pipelines/operations/pipeline.py:33-44cognee/modules/pipelines/operations/run_tasks.py:56-65cognee/modules/pipelines/operations/run_tasks_base.py:123-130

管线中的数据流

管线通过不同阶段处理数据,每个阶段生成 DataPoint 对象并流入下一阶段。run_pipeline 函数协调跨数据集执行 cognee/modules/pipelines/operations/pipeline.py:52-63,而 run_tasks 使用 asyncio.Semaphore 控制并发,处理每个数据集的执行 cognee/modules/pipelines/operations/run_tasks.py:97-122

自然语言到代码实体空间

标题:数据转换管线

Cognee · 自然语言到代码实体空间 · 图 2
Cognee · 自然语言到代码实体空间 · 图 2

来源:cognee/tasks/ingestion/ingest_data.py:26-34cognee/api/v1/cognify/cognify.py:82-89cognee/infrastructure/engine/models/DataPoint.py:27-63cognee/modules/data/models/Data.py:1-40

基于任务的执行模型

管线使用 Task 抽象来封装处理步骤。每个 Task 包装一个 executable 函数 cognee/modules/pipelines/tasks/task.py:1-10run_tasks_base 函数按顺序执行任务,而 handle_task 管理工作流,包括日志记录、遥测和错误处理 cognee/modules/pipelines/operations/run_tasks_base.py:123-143

溯源标记系统

管线生成的每个 DataPoint 都会自动标记溯源元数据。_stamp_provenance 函数递归遍历数据结构,标记所有尚未访问的 DataPoint 实例 cognee/modules/pipelines/operations/run_tasks_base.py:33-41

溯源字段类型描述
source_pipelinestr管线名称(例如 cognify_pipelinecognee/infrastructure/engine/models/DataPoint.py:57
source_taskstr任务函数名称(例如 extract_graph_and_summarizecognee/infrastructure/engine/models/DataPoint.py:58
source_userstr触发管线的用户邮箱或 ID cognee/infrastructure/engine/models/DataPoint.py:60
source_node_setstr用于分组相关数据的节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59
source_content_hashstr原始源内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61

来源:cognee/infrastructure/engine/models/DataPoint.py:57-61cognee/modules/pipelines/operations/run_tasks_base.py:33-85

增量加载与状态追踪

incremental_loading=True 时,管线会通过 check_pipeline_run_qualification 检查数据集或特定数据项是否基于其现有状态符合处理条件 cognee/modules/pipelines/operations/pipeline.py:79-86

内容变更检测

如果数据被重新添加,ingest_data 会将 new_content_hash 与现有哈希值进行比较。如果内容发生变化,元数据会被更新,并且 pipeline_status 通常会在关系数据库中重置 cognee/tasks/ingestion/ingest_data.py:149-165

# 来自 cognee/tasks/ingestion/ingest_data.py:149-165 的逻辑
new_content_hash = original_file_metadata["content_hash"]
content_changed = str(data_point.content_hash) != str(new_content_hash)

if content_changed:
    data_point.content_hash = new_content_hash
    # 更新位置和元数据...

来源:cognee/tasks/ingestion/ingest_data.py:149-165cognee/modules/pipelines/operations/pipeline.py:79-86

可视化与监控

Cognee 提供网络可视化工具,用于检查生成的知识图谱。cognee_network_visualization 函数会生成图谱数据的交互式 HTML 表示 cognee/modules/visualization/cognee_network_visualization.py:22-24

溯源可视化

可视化工具会为不同的溯源值(任务、管线、用户)生成确定性颜色,帮助用户理解数据血缘关系 cognee/modules/visualization/cognee_network_visualization.py:11-19

可视化模式代码参考
节点类型颜色cognee_network_visualization 中的 color_map cognee/modules/visualization/cognee_network_visualization.py:27-39
溯源颜色_generate_provenance_colors cognee/modules/visualization/cognee_network_visualization.py:11-19
多用户聚合aggregate_multi_user_graphs cognee/modules/visualization/cognee_network_visualization.py:115-125

来源:cognee/modules/visualization/cognee_network_visualization.py:11-112cognee/modules/visualization/cognee_network_visualization.py:115-157