核心数据管线
核心数据管线
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
cognee/api/v1/add/add.pycognee/api/v1/cognify/cognify.pycognee/infrastructure/engine/models/DataPoint.pycognee/modules/chunking/models/DocumentChunk.pycognee/modules/data/models/Data.pycognee/modules/engine/models/Entity.pycognee/modules/engine/models/EntityType.pycognee/modules/pipelines/operations/pipeline.pycognee/modules/pipelines/operations/run_tasks.pycognee/modules/pipelines/operations/run_tasks_base.pycognee/modules/pipelines/operations/run_tasks_with_telemetry.pycognee/modules/visualization/cognee_network_visualization.pycognee/shared/CodeGraphEntities.pycognee/tasks/ingestion/ingest_data.pycognee/tasks/ingestion/save_data_item_to_storage.pycognee/tasks/summarization/models.pycognee/tests/unit/modules/visualization/visualization_test.pypoetry.lockpyproject.tomluv.lock
核心数据管线是 Cognee 的主要数据处理系统,负责将原始输入数据转换为可查询的知识图谱。它基于任务驱动的架构,协调从初始数据入库到知识提取和存储的完整流程,支持增量加载、批量处理以及全面的溯源追踪。
本文档涵盖整体管线架构和执行模型。关于各管线阶段的详细信息,请参见:
- 数据入库详情:数据入库(cognee.add)
- 任务配置与执行:管线任务与执行
- 知识图谱构建:知识图谱生成(cognee.cognify)
- 时序图谱:时序知识图谱
- 自定义图谱模型:自定义图谱模型
- 图谱增强:Memify 与图谱增强
- 网页抓取:网页抓取与 URL 入库
管线架构总览
核心数据管线采用两阶段架构,以 add 和 cognify 操作为核心,两者均委托给统一的 run_pipeline 函数 cognee/modules/pipelines/operations/pipeline.py:33-44。
管线编排层级
标题:管线执行流程
来源:cognee/api/v1/add/add.py:92-114、cognee/api/v1/cognify/cognify.py:43-59、cognee/modules/pipelines/operations/pipeline.py:33-44、cognee/modules/pipelines/operations/run_tasks.py:56-65、cognee/modules/pipelines/operations/run_tasks_base.py:123-130
管线中的数据流
管线通过不同阶段处理数据,每个阶段生成 DataPoint 对象并流入下一阶段。run_pipeline 函数协调跨数据集执行 cognee/modules/pipelines/operations/pipeline.py:52-63,而 run_tasks 使用 asyncio.Semaphore 控制并发,处理每个数据集的执行 cognee/modules/pipelines/operations/run_tasks.py:97-122。
自然语言到代码实体空间
标题:数据转换管线
来源:cognee/tasks/ingestion/ingest_data.py:26-34、cognee/api/v1/cognify/cognify.py:82-89、cognee/infrastructure/engine/models/DataPoint.py:27-63、cognee/modules/data/models/Data.py:1-40
基于任务的执行模型
管线使用 Task 抽象来封装处理步骤。每个 Task 包装一个 executable 函数 cognee/modules/pipelines/tasks/task.py:1-10。run_tasks_base 函数按顺序执行任务,而 handle_task 管理工作流,包括日志记录、遥测和错误处理 cognee/modules/pipelines/operations/run_tasks_base.py:123-143。
溯源标记系统
管线生成的每个 DataPoint 都会自动标记溯源元数据。_stamp_provenance 函数递归遍历数据结构,标记所有尚未访问的 DataPoint 实例 cognee/modules/pipelines/operations/run_tasks_base.py:33-41。
| 溯源字段 | 类型 | 描述 |
|---|---|---|
source_pipeline | str | 管线名称(例如 cognify_pipeline)cognee/infrastructure/engine/models/DataPoint.py:57 |
source_task | str | 任务函数名称(例如 extract_graph_and_summarize)cognee/infrastructure/engine/models/DataPoint.py:58 |
source_user | str | 触发管线的用户邮箱或 ID cognee/infrastructure/engine/models/DataPoint.py:60 |
source_node_set | str | 用于分组相关数据的节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59 |
source_content_hash | str | 原始源内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61 |
来源:cognee/infrastructure/engine/models/DataPoint.py:57-61、cognee/modules/pipelines/operations/run_tasks_base.py:33-85
增量加载与状态追踪
当 incremental_loading=True 时,管线会通过 check_pipeline_run_qualification 检查数据集或特定数据项是否基于其现有状态符合处理条件 cognee/modules/pipelines/operations/pipeline.py:79-86。
内容变更检测
如果数据被重新添加,ingest_data 会将 new_content_hash 与现有哈希值进行比较。如果内容发生变化,元数据会被更新,并且 pipeline_status 通常会在关系数据库中重置 cognee/tasks/ingestion/ingest_data.py:149-165。
# 来自 cognee/tasks/ingestion/ingest_data.py:149-165 的逻辑
new_content_hash = original_file_metadata["content_hash"]
content_changed = str(data_point.content_hash) != str(new_content_hash)
if content_changed:
data_point.content_hash = new_content_hash
# 更新位置和元数据...
来源:cognee/tasks/ingestion/ingest_data.py:149-165、cognee/modules/pipelines/operations/pipeline.py:79-86
可视化与监控
Cognee 提供网络可视化工具,用于检查生成的知识图谱。cognee_network_visualization 函数会生成图谱数据的交互式 HTML 表示 cognee/modules/visualization/cognee_network_visualization.py:22-24。
溯源可视化
可视化工具会为不同的溯源值(任务、管线、用户)生成确定性颜色,帮助用户理解数据血缘关系 cognee/modules/visualization/cognee_network_visualization.py:11-19。
| 可视化模式 | 代码参考 |
|---|---|
| 节点类型颜色 | cognee_network_visualization 中的 color_map cognee/modules/visualization/cognee_network_visualization.py:27-39 |
| 溯源颜色 | _generate_provenance_colors cognee/modules/visualization/cognee_network_visualization.py:11-19 |
| 多用户聚合 | aggregate_multi_user_graphs cognee/modules/visualization/cognee_network_visualization.py:115-125 |
来源:cognee/modules/visualization/cognee_network_visualization.py:11-112、cognee/modules/visualization/cognee_network_visualization.py:115-157