数据入库 (cognee.add)
数据入库(cognee.add)
相关源文件
本章引用的主要源码文件:
cognee/api/v1/add/add.pycognee/api/v1/cognify/cognify.pycognee/eval_framework/modal_eval_dashboard.pycognee/infrastructure/databases/vector/embeddings/EmbeddingEngine.pycognee/infrastructure/files/storage/LocalFileStorage.pycognee/infrastructure/files/storage/S3FileStorage.pycognee/infrastructure/files/storage/s3_config.pycognee/infrastructure/files/utils/extract_text_from_file.pycognee/infrastructure/files/utils/get_file_metadata.pycognee/infrastructure/files/utils/guess_file_type.pycognee/infrastructure/files/utils/is_text_content.pycognee/infrastructure/files/utils/open_data_file.pycognee/infrastructure/loaders/LoaderEngine.pycognee/infrastructure/loaders/LoaderInterface.pycognee/infrastructure/loaders/__init__.pycognee/infrastructure/loaders/core/__init__.pycognee/infrastructure/loaders/create_loader_engine.pycognee/infrastructure/loaders/external/__init__.pycognee/infrastructure/loaders/get_loader_engine.pycognee/infrastructure/loaders/supported_loaders.pycognee/modules/chunking/CsvChunker.pycognee/modules/data/models/Data.pycognee/modules/data/processing/document_types/CsvDocument.pycognee/modules/data/processing/document_types/DltRowDocument.pycognee/modules/data/processing/document_types/__init__.pycognee/modules/graph/methods/delete_data_related_edges.pycognee/modules/graph/methods/delete_data_related_nodes.pycognee/modules/graph/methods/get_dataset_related_edges.pycognee/modules/graph/methods/get_dataset_related_nodes.pycognee/modules/ingestion/classify.pycognee/modules/ingestion/data_types/BinaryData.pycognee/modules/ingestion/data_types/IngestionData.pycognee/modules/ingestion/data_types/S3BinaryData.pycognee/modules/ingestion/data_types/TextData.pycognee/modules/ingestion/data_types/__init__.pycognee/modules/pipelines/operations/pipeline.pycognee/modules/pipelines/operations/run_tasks.pycognee/modules/pipelines/operations/run_tasks_data_item.pycognee/tasks/ingestion/config.pycognee/tasks/ingestion/data_item_to_text_file.pycognee/tasks/ingestion/dlt_utils.pycognee/tasks/ingestion/exceptions/__init__.pycognee/tasks/ingestion/exceptions/exceptions.pycognee/tasks/ingestion/extract_dlt_fk_edges.pycognee/tasks/ingestion/get_dlt_destination.pycognee/tasks/ingestion/ingest_data.pycognee/tasks/ingestion/ingest_dlt_source.pycognee/tasks/ingestion/resolve_data_directories.pycognee/tasks/ingestion/resolve_dlt_sources.pycognee/tasks/ingestion/save_data_item_to_storage.pycognee/tests/test_advanced_pdf_loader.pycognee/tests/test_s3_file_storage.pyexamples/demos/dlt_ingestion_example.pyexamples/demos/test_data/employees_ontology.owlpoetry.lockpyproject.tomluv.lock
目的与范围
本文档描述了 Cognee 中的数据入库子系统,主要通过 cognee.add() 函数实现。数据入库是 Cognee 工作流的第一个阶段,负责接收来自各种来源的原始数据,并为其后续的知识图谱处理做好准备。
范围:本页面涵盖入库机制、支持的数据格式、数据集管理、增量加载和批处理。关于将入库数据加工为知识图谱的信息,请参见知识图谱生成(cognee.cognify)(3.3)。
概述
cognee.add() 函数(位于 cognee/api/v1/add/add.py:92-236)是数据入库的入口点。它将多种输入格式(包括本地文件、URL、二进制流和 DLT 源)归一化为一致的存储表示形式。它在关系数据库中创建 Data 记录,并将处理后的文本文件存储起来,供下游的 cognify() 处理使用。
关键操作:
- 数据解析:通过
resolve_data_directories和save_data_item_to_storage解析文件路径、目录结构,并验证可访问性。 - 内容提取:使用
LoaderEngine从各种文件格式中提取文本内容。 - 数据集存储:使用
Data模型将处理后的内容和元数据存储在关系数据库中。 - 元数据追踪:记录文件哈希值、时间戳和用户权限,用于增量加载和变更检测。
图表:add() 执行流程及代码实体引用
来源:cognee/api/v1/add/add.py:92-236, cognee/modules/pipelines/operations/pipeline.py:33-108, cognee/modules/pipelines/operations/run_tasks.py:55-184, cognee/tasks/ingestion/ingest_data.py:26-235, cognee/tasks/ingestion/save_data_item_to_storage.py:27-98
函数签名与参数
add() 函数定义于 cognee/api/v1/add/add.py:92-114:
async def add(
data: Union[
BinaryIO,
list[BinaryIO],
str,
list[str],
DataItem,
list[DataItem],
Any, # DltResource、SourceFactory 或其他 dlt 类型
],
dataset_name: str = "main_dataset",
user: User = None,
node_set: Optional[List[str]] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
dataset_id: Optional[UUID] = None,
preferred_loaders: Optional[List[Union[str, dict[str, dict[str, Any]]]]] = None,
incremental_loading: bool = True,
data_per_batch: Optional[int] = 20,
importance_weight: Optional[float] = 0.5,
run_in_background: bool = False,
**kwargs,
):
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
data | Union[...] | 必填 | 输入数据:文本字符串、文件路径、URL、BinaryIO 流、DataItem 包装器或 DLT 源。 |
dataset_name | str | "main_dataset" | 用于组织数据的目标数据集名称。 |
user | User | None | 用于认证和数据访问的用户上下文。 |
node_set | Optional[List[str]] | None | 存储在 Data.node_set 中的访问控制标签。 |
dataset_id | Optional[UUID] | None | 用于替代 dataset_name 的特定数据集 UUID。 |
preferred_loaders | Optional[List] | None | 首选加载器名称或配置的列表。 |
incremental_loading | bool | True | 如果为 True,则跳过已基于 pipeline_status 完成处理的数据项。 |
data_per_batch | int | 20 | 并行任务执行的批次大小。 |
来源:cognee/api/v1/add/add.py:92-171
数据类型解析与归一化
输入归一化决策树
save_data_item_to_storage() 函数(位于 cognee/tasks/ingestion/save_data_item_to_storage.py:27-98)实现了一个归一化决策树,用于处理各种输入格式:
来源:cognee/tasks/ingestion/save_data_item_to_storage.py:27-98
支持的数据类型
文件格式与加载器
Cognee 使用 LoaderEngine 来管理文件加载器。加载器根据优先级和文件类型(MIME/扩展名)进行选择。
| 类别 | 支持的格式 | 示例加载器 |
|---|---|---|
| 文本 | .txt、.md、.csv | text_loader、csv_loader |
| 文档 | .pdf、.docx、.pptx | pypdf_loader、docling_loader |
| 多媒体 | .png、.jpg、.mp3、.wav | image_loader、audio_loader |
| 代码 | .py、.js、.ts 等 | text_loader |
data_item_to_text_file 函数(位于 cognee/tasks/ingestion/data_item_to_text_file.py:36-79)负责协调将归一化路径转换为 Cognee 存储中的文本文件,通过注册的加载器调用 loader.load_file() 实现。
来源:cognee/tasks/ingestion/data_item_to_text_file.py:36-79, cognee/api/v1/add/add.py:136-142
DLT(数据加载工具)集成
Cognee 通过 resolve_dlt_sources(位于 cognee/tasks/ingestion/resolve_dlt_sources.py)支持从 DLT 源进行入库。
- 解析:DLT 源和资源在处理前会被展开为单个数据项
cognee/api/v1/add/add.py:208-213。 - 标识:对于 DLT 数据项,优先使用源提供的稳定
data_id,而非基于内容哈希的 IDcognee/tasks/ingestion/ingest_data.py:125-128。
来源:cognee/api/v1/add/add.py:208-213, cognee/tasks/ingestion/ingest_data.py:125-128
入库管线架构
入库过程由 run_tasks(位于 cognee/modules/pipelines/operations/run_tasks.py:55-184)进行编排。
批处理与并发
Cognee 使用基于信号量的并发模型来处理大量数据项。
- 批处理:数据按照
data_per_batch(默认 20)定义的批次大小进行处理。 - 并发:
asyncio.Semaphore(data_per_batch)限制并发执行run_tasks_data_item的数量,以防止资源耗尽cognee/modules/pipelines/operations/run_tasks.py:97-118。 - 分布式模式:如果启用了
COGNEE_DISTRIBUTED,则通过@override_run_tasks装饰器将任务交给run_tasks_distributed处理cognee/modules/pipelines/operations/run_tasks.py:55。
增量加载
增量加载通过检查存储在 Data 模型中的 pipeline_status 进行管理 cognee/modules/data/models/Data.py:34。
- 标识:
ingestion.identify(classified_data, user)基于内容哈希和所有者 ID 生成data_idcognee/tasks/ingestion/ingest_data.py:117。 - 状态检查:
ingest_data检查具有该 ID 的Data记录是否已存在于关系数据库中cognee/tasks/ingestion/ingest_data.py:133-136。 - 变更检测:如果记录存在,Cognee 会比较当前的
content_hash与存储的哈希值cognee/tasks/ingestion/ingest_data.py:150-151。 - 更新:如果内容未更改,则更新元数据,但避免冗余处理
cognee/tasks/ingestion/ingest_data.py:153-165。
来源:cognee/modules/pipelines/operations/run_tasks.py:97-118, cognee/tasks/ingestion/ingest_data.py:113-165, cognee/modules/data/models/Data.py:34
数据集管理
数据集是 Cognee 中的主要组织单元。
- 解析:
resolve_authorized_user_dataset确保用户对目标数据集具有"写入"权限cognee/api/v1/add/add.py:199-206。 - 关系模式:
Data模型cognee/modules/data/models/Data.py:12-48追踪文件位置(raw_data_location、original_data_location)、MIME 类型、哈希值和 Token 数量。 - 关联:
Data通过DatasetData关联表与Dataset关联,支持多对多关系cognee/modules/data/models/Data.py:42-48。
图表:Data 与 Dataset 的关系关联
来源:cognee/modules/data/models/Data.py:12-48, cognee/tasks/ingestion/ingest_data.py:60-80