agentic_huge_data_base / wiki
页面 Cognee · 3.1 数据入库 (cognee.add)·DeepWiki 中文全文译文

3.1 · 数据入库 (cognee.add)(Data Ingestion (cognee.add))

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节3.1 状态全文译文 模块界面与交互、文档对象与元数据、存储与持久化、入库与解析
源码线索
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/eval_framework/modal_eval_dashboard.py
  • cognee/infrastructure/databases/vector/embeddings/EmbeddingEngine.py
  • cognee/infrastructure/files/storage/LocalFileStorage.py
  • cognee/infrastructure/files/storage/S3FileStorage.py
  • cognee/infrastructure/files/storage/s3_config.py
  • cognee/infrastructure/files/utils/extract_text_from_file.py
  • cognee/infrastructure/files/utils/get_file_metadata.py
  • cognee/infrastructure/files/utils/guess_file_type.py
模块标签
  • 界面与交互
  • 文档对象与元数据
  • 存储与持久化
  • 入库与解析
  • 图谱与关系

章节正文

数据入库 (cognee.add)

数据入库(cognee.add)

相关源文件

本章引用的主要源码文件:

  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/eval_framework/modal_eval_dashboard.py
  • cognee/infrastructure/databases/vector/embeddings/EmbeddingEngine.py
  • cognee/infrastructure/files/storage/LocalFileStorage.py
  • cognee/infrastructure/files/storage/S3FileStorage.py
  • cognee/infrastructure/files/storage/s3_config.py
  • cognee/infrastructure/files/utils/extract_text_from_file.py
  • cognee/infrastructure/files/utils/get_file_metadata.py
  • cognee/infrastructure/files/utils/guess_file_type.py
  • cognee/infrastructure/files/utils/is_text_content.py
  • cognee/infrastructure/files/utils/open_data_file.py
  • cognee/infrastructure/loaders/LoaderEngine.py
  • cognee/infrastructure/loaders/LoaderInterface.py
  • cognee/infrastructure/loaders/__init__.py
  • cognee/infrastructure/loaders/core/__init__.py
  • cognee/infrastructure/loaders/create_loader_engine.py
  • cognee/infrastructure/loaders/external/__init__.py
  • cognee/infrastructure/loaders/get_loader_engine.py
  • cognee/infrastructure/loaders/supported_loaders.py
  • cognee/modules/chunking/CsvChunker.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/data/processing/document_types/CsvDocument.py
  • cognee/modules/data/processing/document_types/DltRowDocument.py
  • cognee/modules/data/processing/document_types/__init__.py
  • cognee/modules/graph/methods/delete_data_related_edges.py
  • cognee/modules/graph/methods/delete_data_related_nodes.py
  • cognee/modules/graph/methods/get_dataset_related_edges.py
  • cognee/modules/graph/methods/get_dataset_related_nodes.py
  • cognee/modules/ingestion/classify.py
  • cognee/modules/ingestion/data_types/BinaryData.py
  • cognee/modules/ingestion/data_types/IngestionData.py
  • cognee/modules/ingestion/data_types/S3BinaryData.py
  • cognee/modules/ingestion/data_types/TextData.py
  • cognee/modules/ingestion/data_types/__init__.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_data_item.py
  • cognee/tasks/ingestion/config.py
  • cognee/tasks/ingestion/data_item_to_text_file.py
  • cognee/tasks/ingestion/dlt_utils.py
  • cognee/tasks/ingestion/exceptions/__init__.py
  • cognee/tasks/ingestion/exceptions/exceptions.py
  • cognee/tasks/ingestion/extract_dlt_fk_edges.py
  • cognee/tasks/ingestion/get_dlt_destination.py
  • cognee/tasks/ingestion/ingest_data.py
  • cognee/tasks/ingestion/ingest_dlt_source.py
  • cognee/tasks/ingestion/resolve_data_directories.py
  • cognee/tasks/ingestion/resolve_dlt_sources.py
  • cognee/tasks/ingestion/save_data_item_to_storage.py
  • cognee/tests/test_advanced_pdf_loader.py
  • cognee/tests/test_s3_file_storage.py
  • examples/demos/dlt_ingestion_example.py
  • examples/demos/test_data/employees_ontology.owl
  • poetry.lock
  • pyproject.toml
  • uv.lock

目的与范围

本文档描述了 Cognee 中的数据入库子系统,主要通过 cognee.add() 函数实现。数据入库是 Cognee 工作流的第一个阶段,负责接收来自各种来源的原始数据,并为其后续的知识图谱处理做好准备。

范围:本页面涵盖入库机制、支持的数据格式、数据集管理、增量加载和批处理。关于将入库数据加工为知识图谱的信息,请参见知识图谱生成(cognee.cognify)(3.3)

概述

cognee.add() 函数(位于 cognee/api/v1/add/add.py:92-236)是数据入库的入口点。它将多种输入格式(包括本地文件、URL、二进制流和 DLT 源)归一化为一致的存储表示形式。它在关系数据库中创建 Data 记录,并将处理后的文本文件存储起来,供下游的 cognify() 处理使用。

关键操作

  1. 数据解析:通过 resolve_data_directoriessave_data_item_to_storage 解析文件路径、目录结构,并验证可访问性。
  2. 内容提取:使用 LoaderEngine 从各种文件格式中提取文本内容。
  3. 数据集存储:使用 Data 模型将处理后的内容和元数据存储在关系数据库中。
  4. 元数据追踪:记录文件哈希值、时间戳和用户权限,用于增量加载和变更检测。

图表:add() 执行流程及代码实体引用

Cognee · 概述 · 图 1
Cognee · 概述 · 图 1

来源:cognee/api/v1/add/add.py:92-236, cognee/modules/pipelines/operations/pipeline.py:33-108, cognee/modules/pipelines/operations/run_tasks.py:55-184, cognee/tasks/ingestion/ingest_data.py:26-235, cognee/tasks/ingestion/save_data_item_to_storage.py:27-98

函数签名与参数

add() 函数定义于 cognee/api/v1/add/add.py:92-114

async def add(
    data: Union[
        BinaryIO,
        list[BinaryIO],
        str,
        list[str],
        DataItem,
        list[DataItem],
        Any,  # DltResource、SourceFactory 或其他 dlt 类型
    ],
    dataset_name: str = "main_dataset",
    user: User = None,
    node_set: Optional[List[str]] = None,
    vector_db_config: dict = None,
    graph_db_config: dict = None,
    dataset_id: Optional[UUID] = None,
    preferred_loaders: Optional[List[Union[str, dict[str, dict[str, Any]]]]] = None,
    incremental_loading: bool = True,
    data_per_batch: Optional[int] = 20,
    importance_weight: Optional[float] = 0.5,
    run_in_background: bool = False,
    **kwargs,
):
参数类型默认值描述
dataUnion[...]必填输入数据:文本字符串、文件路径、URL、BinaryIO 流、DataItem 包装器或 DLT 源。
dataset_namestr"main_dataset"用于组织数据的目标数据集名称。
userUserNone用于认证和数据访问的用户上下文。
node_setOptional[List[str]]None存储在 Data.node_set 中的访问控制标签。
dataset_idOptional[UUID]None用于替代 dataset_name 的特定数据集 UUID。
preferred_loadersOptional[List]None首选加载器名称或配置的列表。
incremental_loadingboolTrue如果为 True,则跳过已基于 pipeline_status 完成处理的数据项。
data_per_batchint20并行任务执行的批次大小。

来源:cognee/api/v1/add/add.py:92-171

数据类型解析与归一化

输入归一化决策树

save_data_item_to_storage() 函数(位于 cognee/tasks/ingestion/save_data_item_to_storage.py:27-98)实现了一个归一化决策树,用于处理各种输入格式:

Cognee · 输入归一化决策树 · 图 2
Cognee · 输入归一化决策树 · 图 2

来源:cognee/tasks/ingestion/save_data_item_to_storage.py:27-98

支持的数据类型

文件格式与加载器

Cognee 使用 LoaderEngine 来管理文件加载器。加载器根据优先级和文件类型(MIME/扩展名)进行选择。

类别支持的格式示例加载器
文本.txt.md.csvtext_loadercsv_loader
文档.pdf.docx.pptxpypdf_loaderdocling_loader
多媒体.png.jpg.mp3.wavimage_loaderaudio_loader
代码.py.js.tstext_loader

data_item_to_text_file 函数(位于 cognee/tasks/ingestion/data_item_to_text_file.py:36-79)负责协调将归一化路径转换为 Cognee 存储中的文本文件,通过注册的加载器调用 loader.load_file() 实现。

来源:cognee/tasks/ingestion/data_item_to_text_file.py:36-79, cognee/api/v1/add/add.py:136-142

DLT(数据加载工具)集成

Cognee 通过 resolve_dlt_sources(位于 cognee/tasks/ingestion/resolve_dlt_sources.py)支持从 DLT 源进行入库。

  • 解析:DLT 源和资源在处理前会被展开为单个数据项 cognee/api/v1/add/add.py:208-213
  • 标识:对于 DLT 数据项,优先使用源提供的稳定 data_id,而非基于内容哈希的 ID cognee/tasks/ingestion/ingest_data.py:125-128

来源:cognee/api/v1/add/add.py:208-213, cognee/tasks/ingestion/ingest_data.py:125-128

入库管线架构

入库过程由 run_tasks(位于 cognee/modules/pipelines/operations/run_tasks.py:55-184)进行编排。

批处理与并发

Cognee 使用基于信号量的并发模型来处理大量数据项。

  • 批处理:数据按照 data_per_batch(默认 20)定义的批次大小进行处理。
  • 并发asyncio.Semaphore(data_per_batch) 限制并发执行 run_tasks_data_item 的数量,以防止资源耗尽 cognee/modules/pipelines/operations/run_tasks.py:97-118
  • 分布式模式:如果启用了 COGNEE_DISTRIBUTED,则通过 @override_run_tasks 装饰器将任务交给 run_tasks_distributed 处理 cognee/modules/pipelines/operations/run_tasks.py:55
增量加载

增量加载通过检查存储在 Data 模型中的 pipeline_status 进行管理 cognee/modules/data/models/Data.py:34

  1. 标识ingestion.identify(classified_data, user) 基于内容哈希和所有者 ID 生成 data_id cognee/tasks/ingestion/ingest_data.py:117
  2. 状态检查ingest_data 检查具有该 ID 的 Data 记录是否已存在于关系数据库中 cognee/tasks/ingestion/ingest_data.py:133-136
  3. 变更检测:如果记录存在,Cognee 会比较当前的 content_hash 与存储的哈希值 cognee/tasks/ingestion/ingest_data.py:150-151
  4. 更新:如果内容未更改,则更新元数据,但避免冗余处理 cognee/tasks/ingestion/ingest_data.py:153-165

来源:cognee/modules/pipelines/operations/run_tasks.py:97-118, cognee/tasks/ingestion/ingest_data.py:113-165, cognee/modules/data/models/Data.py:34

数据集管理

数据集是 Cognee 中的主要组织单元。

  • 解析resolve_authorized_user_dataset 确保用户对目标数据集具有"写入"权限 cognee/api/v1/add/add.py:199-206
  • 关系模式Data 模型 cognee/modules/data/models/Data.py:12-48 追踪文件位置(raw_data_locationoriginal_data_location)、MIME 类型、哈希值和 Token 数量。
  • 关联Data 通过 DatasetData 关联表与 Dataset 关联,支持多对多关系 cognee/modules/data/models/Data.py:42-48

图表:Data 与 Dataset 的关系关联

Cognee · 数据集管理 · 图 3
Cognee · 数据集管理 · 图 3

来源:cognee/modules/data/models/Data.py:12-48, cognee/tasks/ingestion/ingest_data.py:60-80