数据溯源与血缘
数据溯源与血缘
相关源文件
本章引用的主要源码文件:
cognee/infrastructure/engine/models/DataPoint.pycognee/infrastructure/utils/run_sync.pycognee/modules/chunking/models/DocumentChunk.pycognee/modules/data/methods/get_authorized_existing_datasets.pycognee/modules/data/methods/load_or_create_datasets.pycognee/modules/engine/models/Entity.pycognee/modules/engine/models/EntityType.pycognee/modules/pipelines/layers/pipeline_execution_mode.pycognee/modules/pipelines/methods/__init__.pycognee/modules/pipelines/operations/run_tasks_base.pycognee/modules/pipelines/operations/run_tasks_distributed.pycognee/modules/pipelines/operations/run_tasks_with_telemetry.pycognee/modules/users/permissions/methods/give_permission_on_dataset.pycognee/modules/visualization/cognee_network_visualization.pycognee/shared/CodeGraphEntities.pycognee/tasks/summarization/models.pycognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.pycognee/tests/unit/modules/visualization/visualization_test.pydistributed/entrypoint.pydistributed/tasks/queued_add_data_points.pydistributed/tasks/queued_add_edges.pydistributed/tasks/queued_add_nodes.pydistributed/workers/data_point_saving_worker.pydistributed/workers/graph_saving_worker.py
目的与范围
本文档描述了 Cognee 的数据溯源与血缘追踪系统,该系统记录了数据点在处理管线中流转时的来源和转换历史。溯源元数据使用户能够追踪数据的来源、创建或修改数据的任务和管线,以及数据所属的用户或数据集。
关于基于 OpenTelemetry 的执行追踪(追踪*代码如何执行*而非*数据来自何处*),请参阅 9.2 OpenTelemetry 追踪。
关于核心数据模型结构,请参阅 8.1 核心数据模型。
关于管线执行,请参阅 3.2 管线任务与执行。
DataPoint 模型中的溯源字段
DataPoint 类是 Cognee 中所有数据实体的基础模型,包括 Entity、DocumentChunk、TextSummary 和 CodeFile。它包含在整个系统中追踪数据血缘的核心溯源字段。
字段定义
溯源字段说明:
| 字段 | 类型 | 用途 |
|---|---|---|
source_pipeline | str | None | 创建或最后修改此数据点的管线名称 cognee/infrastructure/engine/models/DataPoint.py:57 |
source_task | str | None | 创建或最后修改此数据点的任务函数名称 cognee/infrastructure/engine/models/DataPoint.py:58 |
source_node_set | str | None | 此数据点所属的逻辑节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59 |
source_user | str | None | 拥有此数据点的用户的电子邮件或 ID cognee/infrastructure/engine/models/DataPoint.py:60 |
source_content_hash | str | None | 用于派生此数据点的原始内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61 |
version | int | 版本号,在更新时递增 cognee/infrastructure/engine/models/DataPoint.py:52 |
所有溯源字段都是可选的,默认值为 None。version 字段默认值为 1 cognee/infrastructure/engine/models/DataPoint.py:52。
来源: cognee/infrastructure/engine/models/DataPoint.py:44-64、cognee/modules/engine/models/Entity.py:7、cognee/modules/chunking/models/DocumentChunk.py:10、cognee/tasks/summarization/models.py:9、cognee/shared/CodeGraphEntities.py:35
管线执行期间的溯源标记
当 DataPoint 实例被管线任务创建或转换时,溯源元数据会自动标记到这些实例上。此标记操作发生在管线执行系统中的 _stamp_provenance 函数中。
标记流程
标记实现
_stamp_provenance 函数 cognee/modules/pipelines/operations/run_tasks_base.py:33-85 执行以下操作:
- 维护一个已访问集合
cognee/modules/pipelines/operations/run_tasks_base.py:46-49,以防止循环引用导致的无限递归。该集合通过PipelineContext在任务调用之间持久化cognee/modules/pipelines/operations/run_tasks_base.py:38-41。 - 仅在字段当前为
None时进行标记cognee/modules/pipelines/operations/run_tasks_base.py:51-56——通常保留现有的溯源信息。 - 从父 DataPoint 传播
node_set和content_hashcognee/modules/pipelines/operations/run_tasks_base.py:58-71。 - 递归处理嵌套的 DataPoint
cognee/modules/pipelines/operations/run_tasks_base.py:72-84,通过遍历所有model_fields实现。
标记操作从 handle_task cognee/modules/pipelines/operations/run_tasks_base.py:123-180 在任务执行期间调用:
# 提取上下文和输入哈希值
pipe_name = ctx.pipeline_name if ctx else None
input_node_set = _extract_node_set(args)
input_content_hash = _extract_content_hash(args)
user_label = getattr(user, "email", None) or (str(user.id) if user else None)
async for result_data in running_task.execute(args, kwargs, next_task_batch_size):
_stamp_provenance(
result_data,
pipe_name,
task_name,
visited=provenance_visited,
node_set=input_node_set,
user_label=user_label,
content_hash=input_content_hash,
)
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180
溯源传播模式
溯源元数据通过以下几种方式在系统中传播:
跨任务传播
节点集传播: _extract_node_set 函数 cognee/modules/pipelines/operations/run_tasks_base.py:93-102 扫描输入参数以查找现有的 source_node_set 值。 内容哈希传播: _extract_content_hash 函数 cognee/modules/pipelines/operations/run_tasks_base.py:105-121 从输入的 Data 或 DataPoint 对象中提取内容哈希,以确保派生实体与源材料保持关联。
分布式传播
在分布式执行环境(例如使用 Modal)中,溯源上下文通过 PipelineContext 传递 cognee/modules/pipelines/operations/run_tasks_distributed.py:71-76。run_tasks_on_modal 包装器确保当任务在远程工作节点上执行时,它们会接收到正确的 user、dataset 和 pipeline_name,以维护准确的血缘关系 cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81。
来源: cognee/modules/pipelines/operations/run_tasks_base.py:93-121、cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81
使用溯源数据进行可视化
图可视化系统利用溯源字段,使用户能够根据节点的来源对节点进行颜色编码和过滤。
基于溯源的颜色编码
可视化系统使用 _generate_provenance_colors 辅助函数 cognee/modules/visualization/cognee_network_visualization.py:11-19 为各种溯源维度生成确定性颜色映射:
| 维度 | 生成函数 | 描述 |
|---|---|---|
| 任务 | task_color_map | 根据创建节点的任务对节点着色 cognee/modules/visualization/cognee_network_visualization.py:85 |
| 管线 | pipeline_color_map | 根据管线名称对节点着色 cognee/modules/visualization/cognee_network_visualization.py:86 |
| 节点集 | node_set_color_map | 根据节点的逻辑分组对节点着色 cognee/modules/visualization/cognee_network_visualization.py:87 |
| 用户 | user_color_map | 根据所有者对节点着色 cognee/modules/visualization/cognee_network_visualization.py:88 |
多用户图聚合
aggregate_multi_user_graphs 函数 cognee/modules/visualization/cognee_network_visualization.py:115-157 在保留或注入溯源信息的同时,合并来自多个用户的图数据:
来源: cognee/modules/visualization/cognee_network_visualization.py:11-157、cognee/tests/unit/modules/visualization/visualization_test.py:79-92
总结:溯源数据流
溯源系统确保了从初始入库 cognee/modules/pipelines/operations/run_tasks_distributed.py:84-110 到 cognify 中的转换任务 cognee/modules/pipelines/operations/run_tasks_base.py:123-180,再到最终可视化 cognee/modules/visualization/cognee_network_visualization.py:22-112 的完整可追溯性。
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180、cognee/modules/pipelines/operations/run_tasks_distributed.py:47-110、cognee/modules/visualization/cognee_network_visualization.py:11-157