agentic_huge_data_base / wiki
页面 Cognee · 9.3 数据溯源与血缘·DeepWiki 中文全文译文

9.3 · 数据溯源与血缘(Data Provenance and Lineage)

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节9.3 状态全文译文 模块文档对象与元数据、图谱与关系、工作流与编排、测试、发布与运维
源码线索
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
模块标签
  • 文档对象与元数据
  • 图谱与关系
  • 工作流与编排
  • 测试、发布与运维
  • 认证、权限与安全

章节正文

数据溯源与血缘

数据溯源与血缘

相关源文件

本章引用的主要源码文件:

  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/pipelines/operations/run_tasks_with_telemetry.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
  • cognee/modules/visualization/cognee_network_visualization.py
  • cognee/shared/CodeGraphEntities.py
  • cognee/tasks/summarization/models.py
  • cognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.py
  • cognee/tests/unit/modules/visualization/visualization_test.py
  • distributed/entrypoint.py
  • distributed/tasks/queued_add_data_points.py
  • distributed/tasks/queued_add_edges.py
  • distributed/tasks/queued_add_nodes.py
  • distributed/workers/data_point_saving_worker.py
  • distributed/workers/graph_saving_worker.py

目的与范围

本文档描述了 Cognee 的数据溯源与血缘追踪系统,该系统记录了数据点在处理管线中流转时的来源和转换历史。溯源元数据使用户能够追踪数据的来源、创建或修改数据的任务和管线,以及数据所属的用户或数据集。

关于基于 OpenTelemetry 的执行追踪(追踪*代码如何执行*而非*数据来自何处*),请参阅 9.2 OpenTelemetry 追踪

关于核心数据模型结构,请参阅 8.1 核心数据模型

关于管线执行,请参阅 3.2 管线任务与执行

DataPoint 模型中的溯源字段

DataPoint 类是 Cognee 中所有数据实体的基础模型,包括 EntityDocumentChunkTextSummaryCodeFile。它包含在整个系统中追踪数据血缘的核心溯源字段。

字段定义
Cognee · 字段定义 · 图 1
Cognee · 字段定义 · 图 1

溯源字段说明:

字段类型用途
source_pipelinestr | None创建或最后修改此数据点的管线名称 cognee/infrastructure/engine/models/DataPoint.py:57
source_taskstr | None创建或最后修改此数据点的任务函数名称 cognee/infrastructure/engine/models/DataPoint.py:58
source_node_setstr | None此数据点所属的逻辑节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59
source_userstr | None拥有此数据点的用户的电子邮件或 ID cognee/infrastructure/engine/models/DataPoint.py:60
source_content_hashstr | None用于派生此数据点的原始内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61
versionint版本号,在更新时递增 cognee/infrastructure/engine/models/DataPoint.py:52

所有溯源字段都是可选的,默认值为 Noneversion 字段默认值为 1 cognee/infrastructure/engine/models/DataPoint.py:52

来源: cognee/infrastructure/engine/models/DataPoint.py:44-64cognee/modules/engine/models/Entity.py:7cognee/modules/chunking/models/DocumentChunk.py:10cognee/tasks/summarization/models.py:9cognee/shared/CodeGraphEntities.py:35

管线执行期间的溯源标记

DataPoint 实例被管线任务创建或转换时,溯源元数据会自动标记到这些实例上。此标记操作发生在管线执行系统中的 _stamp_provenance 函数中。

标记流程
Cognee · 标记流程 · 图 2
Cognee · 标记流程 · 图 2
标记实现

_stamp_provenance 函数 cognee/modules/pipelines/operations/run_tasks_base.py:33-85 执行以下操作:

  1. 维护一个已访问集合 cognee/modules/pipelines/operations/run_tasks_base.py:46-49,以防止循环引用导致的无限递归。该集合通过 PipelineContext 在任务调用之间持久化 cognee/modules/pipelines/operations/run_tasks_base.py:38-41
  2. 仅在字段当前为 None 时进行标记 cognee/modules/pipelines/operations/run_tasks_base.py:51-56——通常保留现有的溯源信息。
  3. 从父 DataPoint 传播 node_setcontent_hash cognee/modules/pipelines/operations/run_tasks_base.py:58-71
  4. 递归处理嵌套的 DataPoint cognee/modules/pipelines/operations/run_tasks_base.py:72-84,通过遍历所有 model_fields 实现。

标记操作从 handle_task cognee/modules/pipelines/operations/run_tasks_base.py:123-180 在任务执行期间调用:

# 提取上下文和输入哈希值
pipe_name = ctx.pipeline_name if ctx else None
input_node_set = _extract_node_set(args)
input_content_hash = _extract_content_hash(args)
user_label = getattr(user, "email", None) or (str(user.id) if user else None)

async for result_data in running_task.execute(args, kwargs, next_task_batch_size):
    _stamp_provenance(
        result_data,
        pipe_name,
        task_name,
        visited=provenance_visited,
        node_set=input_node_set,
        user_label=user_label,
        content_hash=input_content_hash,
    )

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180

溯源传播模式

溯源元数据通过以下几种方式在系统中传播:

跨任务传播
Cognee · 跨任务传播 · 图 3
Cognee · 跨任务传播 · 图 3

节点集传播: _extract_node_set 函数 cognee/modules/pipelines/operations/run_tasks_base.py:93-102 扫描输入参数以查找现有的 source_node_set 值。 内容哈希传播: _extract_content_hash 函数 cognee/modules/pipelines/operations/run_tasks_base.py:105-121 从输入的 DataDataPoint 对象中提取内容哈希,以确保派生实体与源材料保持关联。

分布式传播

在分布式执行环境(例如使用 Modal)中,溯源上下文通过 PipelineContext 传递 cognee/modules/pipelines/operations/run_tasks_distributed.py:71-76run_tasks_on_modal 包装器确保当任务在远程工作节点上执行时,它们会接收到正确的 userdatasetpipeline_name,以维护准确的血缘关系 cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81

来源: cognee/modules/pipelines/operations/run_tasks_base.py:93-121cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81

使用溯源数据进行可视化

图可视化系统利用溯源字段,使用户能够根据节点的来源对节点进行颜色编码和过滤。

基于溯源的颜色编码

可视化系统使用 _generate_provenance_colors 辅助函数 cognee/modules/visualization/cognee_network_visualization.py:11-19 为各种溯源维度生成确定性颜色映射:

维度生成函数描述
任务task_color_map根据创建节点的任务对节点着色 cognee/modules/visualization/cognee_network_visualization.py:85
管线pipeline_color_map根据管线名称对节点着色 cognee/modules/visualization/cognee_network_visualization.py:86
节点集node_set_color_map根据节点的逻辑分组对节点着色 cognee/modules/visualization/cognee_network_visualization.py:87
用户user_color_map根据所有者对节点着色 cognee/modules/visualization/cognee_network_visualization.py:88
多用户图聚合

aggregate_multi_user_graphs 函数 cognee/modules/visualization/cognee_network_visualization.py:115-157 在保留或注入溯源信息的同时,合并来自多个用户的图数据:

Cognee · 多用户图聚合 · 图 4
Cognee · 多用户图聚合 · 图 4

来源: cognee/modules/visualization/cognee_network_visualization.py:11-157cognee/tests/unit/modules/visualization/visualization_test.py:79-92

总结:溯源数据流

Cognee · 总结:溯源数据流 · 图 5
Cognee · 总结:溯源数据流 · 图 5

溯源系统确保了从初始入库 cognee/modules/pipelines/operations/run_tasks_distributed.py:84-110cognify 中的转换任务 cognee/modules/pipelines/operations/run_tasks_base.py:123-180,再到最终可视化 cognee/modules/visualization/cognee_network_visualization.py:22-112 的完整可追溯性。

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180cognee/modules/pipelines/operations/run_tasks_distributed.py:47-110cognee/modules/visualization/cognee_network_visualization.py:11-157