文档索引管线
文档索引管线
相关源文件
本章引用的主要源码文件:
api/core/indexing_runner.pyapi/core/rag/extractor/extract_processor.pyapi/core/rag/extractor/helpers.pyapi/core/rag/extractor/text_extractor.pyapi/core/rag/extractor/unstructured/unstructured_doc_extractor.pyapi/core/rag/extractor/unstructured/unstructured_eml_extractor.pyapi/core/rag/extractor/unstructured/unstructured_epub_extractor.pyapi/core/rag/extractor/unstructured/unstructured_markdown_extractor.pyapi/core/rag/extractor/unstructured/unstructured_msg_extractor.pyapi/core/rag/extractor/unstructured/unstructured_ppt_extractor.pyapi/core/rag/extractor/unstructured/unstructured_pptx_extractor.pyapi/core/rag/extractor/unstructured/unstructured_xml_extractor.pyapi/core/rag/index_processor/index_processor_base.pyapi/core/rag/index_processor/processor/paragraph_index_processor.pyapi/core/rag/index_processor/processor/parent_child_index_processor.pyapi/core/rag/index_processor/processor/qa_index_processor.pyapi/core/tools/utils/web_reader_tool.pyapi/events/event_handlers/clean_when_dataset_deleted.pyapi/events/event_handlers/clean_when_document_deleted.pyapi/services/summary_index_service.pyapi/tasks/add_document_to_index_task.pyapi/tasks/batch_clean_document_task.pyapi/tasks/batch_create_segment_to_index_task.pyapi/tasks/clean_dataset_task.pyapi/tasks/clean_document_task.pyapi/tasks/clean_notion_document_task.pyapi/tasks/create_segment_to_index_task.pyapi/tasks/delete_account_task.pyapi/tasks/delete_segment_from_index_task.pyapi/tasks/disable_segments_from_index_task.pyapi/tasks/document_indexing_sync_task.pyapi/tasks/document_indexing_task.pyapi/tasks/document_indexing_update_task.pyapi/tasks/duplicate_document_indexing_task.pyapi/tasks/enable_segments_to_index_task.pyapi/tasks/retry_document_indexing_task.pyapi/tasks/sync_website_document_indexing_task.pyapi/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.pyapi/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.pyapi/tests/unit_tests/core/rag/indexing/processor/test_paragraph_index_processor.pyapi/tests/unit_tests/core/rag/indexing/processor/test_parent_child_index_processor.pyapi/tests/unit_tests/core/rag/indexing/processor/test_qa_index_processor.pyapi/tests/unit_tests/core/tools/utils/test_web_reader_tool.pyapi/tests/unit_tests/services/document_service_validation.pyapi/tests/unit_tests/services/test_dataset_service_lock_not_owned.pyapi/tests/unit_tests/services/test_summary_index_service.pyapi/tests/unit_tests/services/test_vector_service.pyapi/tests/unit_tests/tasks/test_clean_dataset_task.pyapi/tests/unit_tests/tasks/test_clean_document_task.pyapi/tests/unit_tests/tasks/test_delete_account_task.pyapi/tests/unit_tests/tasks/test_document_indexing_sync_task.pyapi/tests/unit_tests/tasks/test_document_indexing_update_task.py
目的与范围
本文档描述文档索引管线,该管线负责将原始文档处理为可搜索的知识库片段。管线遵循提取-转换-加载(ETL)模式,将来自不同来源(上传文件、Notion 页面、网站)的文档转换为存储在向量数据库和关键词索引中的索引片段。
此过程的主要入口点是 IndexingRunner 类,它负责协调文档在各个状态之间的转换,直到文档变为可搜索状态。
来源:api/core/indexing_runner.py:50-54
管线总览
文档索引管线由 IndexingRunner 类执行,包含三个顺序阶段:
| 阶段 | 目的 | 关键操作 |
|---|---|---|
| 提取 | 从数据源解析原始内容 | 文件解析、Notion API 调用、网页抓取 |
| 转换 | 清洗、拆分和结构化内容 | 文本清洗、分段、嵌入向量准备 |
| 加载 | 将处理后的内容存储到索引中 | 向量数据库写入、关键词索引创建、元数据存储 |
管线通过 Celery 任务以同步或异步方式启动,任务被分发到 dataset 队列。
来源:api/core/indexing_runner.py:94-119,api/tasks/document_indexing_task.py:22-22
核心架构组件
IndexingRunner 类层次结构
下图将高层 ETL 概念与 Dify 后端中使用的具体代码实体联系起来。
来源:api/core/indexing_runner.py:93-119,api/core/rag/extractor/extract_processor.py:93-130,api/core/rag/index_processor/index_processor_factory.py:9-26
ETL 管线阶段
提取阶段
提取阶段从数据源获取原始内容,并将其转换为 core.rag.models.document.Document 对象列表。
提取阶段实现:
IndexingRunner._extract方法通过调用索引处理器的extract方法来编排此阶段。ExtractProcessor.extract根据文件扩展名或数据源类型选择合适的提取器。- 文件提取:支持 PDF、Word、Excel、CSV、Markdown 和 HTML。如果
ETL_TYPE设置为Unstructured,则对.doc、.ppt和.eml等特定格式使用 Unstructured.io 提取器。 - 网页提取:使用
FirecrawlWebExtractor、JinaReaderWebExtractor或WaterCrawlWebExtractor来获取和解析网站内容。
来源:api/core/indexing_runner.py:95-95,api/core/rag/extractor/extract_processor.py:93-130,api/core/rag/extractor/extract_processor.py:111-135
转换阶段
转换阶段对文本进行清洗和分段。处理逻辑取决于 doc_form(例如,段落、父子或问答)。
转换步骤:
- 清洗:
CleanProcessor.clean移除不需要的字符并应用预处理规则(例如,移除多余空格)。 - 拆分:
BaseIndexProcessor._get_splitter根据规则创建一个TextSplitter。FixedRecursiveCharacterTextSplitter用于custom或hierarchical规则。EnhanceRecursiveCharacterTextSplitter用于automatic规则。
- 结构化:
ParagraphIndexProcessor:标准的扁平化分段。它还使用_get_content_files提取 Markdown 图片。ParentChildIndexProcessor:创建大的父级片段(例如,整个文档或段落)和较小的子级块(ChildChunk模型),以提高检索精度。QAIndexProcessor:使用LLMGenerator从文本片段生成问答对。
来源:api/core/rag/index_processor/index_processor_base.py:114-151,api/core/rag/index_processor/processor/paragraph_index_processor.py:74-121,api/core/rag/index_processor/processor/parent_child_index_processor.py:57-130,api/core/rag/index_processor/processor/qa_index_processor.py:55-121
加载阶段
加载阶段将片段持久化到数据库,并将其索引以便检索。
实现细节:
_load_segments:将元数据和内容保存到DocumentSegment表。对于父子索引,还会保存到ChildChunk表。_load:根据indexing_technique处理实际的索引操作。- 高质量:使用
ModelInstance生成嵌入向量,并通过Vector.create()将其存储到向量数据库中。 - 经济模式:使用
Keyword.add_texts()构建关键词索引。
- 高质量:使用
- 多模态支持:如果数据集是多模态的,则调用
Vector.create_multimodal()来索引图片附件。
来源:api/core/indexing_runner.py:111-119,api/core/rag/index_processor/processor/paragraph_index_processor.py:123-144,api/core/rag/index_processor/processor/parent_child_index_processor.py:132-155
文档状态生命周期
文档会经历 models.enums.IndexingStatus 中定义的多个状态。
| 状态 | 描述 |
|---|---|
waiting | 文档已排队等待处理。 |
parsing | 正在从源中提取原始内容。 |
splitting | 正在清洗和分段文本。 |
indexing | 正在生成嵌入向量并保存到索引。 |
completed | 管线成功完成。 |
error | 管线失败;错误详情存储在 document.error 中。 |
来源:api/core/indexing_runner.py:58-67,api/models/enums.py:43-43
异步任务处理
大规模索引操作由 Celery 任务管理,以确保系统响应性。
- 索引:
add_document_to_index_task负责将已处理的片段添加到向量索引中。 - 同步:
document_indexing_sync_task检查更新(例如,Notion 中的更新),并在内容发生变化时重新索引。 - 清理:
clean_dataset_task和clean_document_task在删除数据集或文档时,负责从数据库和向量存储中删除片段。 - 批量操作:
batch_create_segment_to_index_task允许从 CSV 模板批量创建片段。
来源:api/tasks/add_document_to_index_task.py:23-23,api/tasks/document_indexing_sync_task.py:22-22,api/tasks/clean_dataset_task.py:33-33,api/tasks/batch_create_segment_to_index_task.py:30-30
索引预估
在开始完整的索引运行之前,用户可以请求预估 Token 和片段数量。
IndexingRunner.indexing_estimate()模拟提取和转换阶段。- 它会计算片段总数和嵌入向量所需的 Token 总数。
- 返回一个包含片段数和 Token 数的
IndexingEstimate实体。
来源:api/core/indexing_runner.py:264-275,api/core/entities/knowledge_entities.py:16-16