agentic_huge_data_base / wiki
页面 Dify · 4.2 文档索引管线·DeepWiki 中文全文译文

4.2 · 文档索引管线(Document Indexing Pipeline)

应用编排与外部知识接入 · 聚焦本章的模块关系、源码依据与实现要点。

项目Dify 章节4.2 状态全文译文 模块系统架构、工作流与编排、文档对象与元数据、智能体运行时
源码线索
  • api/core/indexing_runner.py
  • api/core/rag/extractor/extract_processor.py
  • api/core/rag/extractor/helpers.py
  • api/core/rag/extractor/text_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_epub_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_ppt_extractor.py
模块标签
  • 系统架构
  • 工作流与编排
  • 文档对象与元数据
  • 智能体运行时
  • 测试、发布与运维

章节正文

文档索引管线

文档索引管线

相关源文件

本章引用的主要源码文件:

  • api/core/indexing_runner.py
  • api/core/rag/extractor/extract_processor.py
  • api/core/rag/extractor/helpers.py
  • api/core/rag/extractor/text_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_epub_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_ppt_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_pptx_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_xml_extractor.py
  • api/core/rag/index_processor/index_processor_base.py
  • api/core/rag/index_processor/processor/paragraph_index_processor.py
  • api/core/rag/index_processor/processor/parent_child_index_processor.py
  • api/core/rag/index_processor/processor/qa_index_processor.py
  • api/core/tools/utils/web_reader_tool.py
  • api/events/event_handlers/clean_when_dataset_deleted.py
  • api/events/event_handlers/clean_when_document_deleted.py
  • api/services/summary_index_service.py
  • api/tasks/add_document_to_index_task.py
  • api/tasks/batch_clean_document_task.py
  • api/tasks/batch_create_segment_to_index_task.py
  • api/tasks/clean_dataset_task.py
  • api/tasks/clean_document_task.py
  • api/tasks/clean_notion_document_task.py
  • api/tasks/create_segment_to_index_task.py
  • api/tasks/delete_account_task.py
  • api/tasks/delete_segment_from_index_task.py
  • api/tasks/disable_segments_from_index_task.py
  • api/tasks/document_indexing_sync_task.py
  • api/tasks/document_indexing_task.py
  • api/tasks/document_indexing_update_task.py
  • api/tasks/duplicate_document_indexing_task.py
  • api/tasks/enable_segments_to_index_task.py
  • api/tasks/retry_document_indexing_task.py
  • api/tasks/sync_website_document_indexing_task.py
  • api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
  • api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_paragraph_index_processor.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_parent_child_index_processor.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_qa_index_processor.py
  • api/tests/unit_tests/core/tools/utils/test_web_reader_tool.py
  • api/tests/unit_tests/services/document_service_validation.py
  • api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py
  • api/tests/unit_tests/services/test_summary_index_service.py
  • api/tests/unit_tests/services/test_vector_service.py
  • api/tests/unit_tests/tasks/test_clean_dataset_task.py
  • api/tests/unit_tests/tasks/test_clean_document_task.py
  • api/tests/unit_tests/tasks/test_delete_account_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_update_task.py

目的与范围

本文档描述文档索引管线,该管线负责将原始文档处理为可搜索的知识库片段。管线遵循提取-转换-加载(ETL)模式,将来自不同来源(上传文件、Notion 页面、网站)的文档转换为存储在向量数据库和关键词索引中的索引片段。

此过程的主要入口点是 IndexingRunner 类,它负责协调文档在各个状态之间的转换,直到文档变为可搜索状态。

来源api/core/indexing_runner.py:50-54

管线总览

文档索引管线由 IndexingRunner 类执行,包含三个顺序阶段:

阶段目的关键操作
提取从数据源解析原始内容文件解析、Notion API 调用、网页抓取
转换清洗、拆分和结构化内容文本清洗、分段、嵌入向量准备
加载将处理后的内容存储到索引中向量数据库写入、关键词索引创建、元数据存储

管线通过 Celery 任务以同步或异步方式启动,任务被分发到 dataset 队列。

来源api/core/indexing_runner.py:94-119api/tasks/document_indexing_task.py:22-22

核心架构组件

IndexingRunner 类层次结构

下图将高层 ETL 概念与 Dify 后端中使用的具体代码实体联系起来。

Dify · IndexingRunner 类层次结构 · 图 1
Dify · IndexingRunner 类层次结构 · 图 1

来源api/core/indexing_runner.py:93-119api/core/rag/extractor/extract_processor.py:93-130api/core/rag/index_processor/index_processor_factory.py:9-26

ETL 管线阶段

提取阶段

提取阶段从数据源获取原始内容,并将其转换为 core.rag.models.document.Document 对象列表。

Dify · 提取阶段 · 图 2
Dify · 提取阶段 · 图 2

提取阶段实现

  • IndexingRunner._extract 方法通过调用索引处理器的 extract 方法来编排此阶段。
  • ExtractProcessor.extract 根据文件扩展名或数据源类型选择合适的提取器。
  • 文件提取:支持 PDF、Word、Excel、CSV、Markdown 和 HTML。如果 ETL_TYPE 设置为 Unstructured,则对 .doc.ppt.eml 等特定格式使用 Unstructured.io 提取器。
  • 网页提取:使用 FirecrawlWebExtractorJinaReaderWebExtractorWaterCrawlWebExtractor 来获取和解析网站内容。

来源api/core/indexing_runner.py:95-95api/core/rag/extractor/extract_processor.py:93-130api/core/rag/extractor/extract_processor.py:111-135

转换阶段

转换阶段对文本进行清洗和分段。处理逻辑取决于 doc_form(例如,段落、父子或问答)。

转换步骤

  1. 清洗CleanProcessor.clean 移除不需要的字符并应用预处理规则(例如,移除多余空格)。
  2. 拆分BaseIndexProcessor._get_splitter 根据规则创建一个 TextSplitter
    • FixedRecursiveCharacterTextSplitter 用于 customhierarchical 规则。
    • EnhanceRecursiveCharacterTextSplitter 用于 automatic 规则。
  3. 结构化
    • ParagraphIndexProcessor:标准的扁平化分段。它还使用 _get_content_files 提取 Markdown 图片。
    • ParentChildIndexProcessor:创建大的父级片段(例如,整个文档或段落)和较小的子级块(ChildChunk 模型),以提高检索精度。
    • QAIndexProcessor:使用 LLMGenerator 从文本片段生成问答对。

来源api/core/rag/index_processor/index_processor_base.py:114-151api/core/rag/index_processor/processor/paragraph_index_processor.py:74-121api/core/rag/index_processor/processor/parent_child_index_processor.py:57-130api/core/rag/index_processor/processor/qa_index_processor.py:55-121

加载阶段

加载阶段将片段持久化到数据库,并将其索引以便检索。

Dify · 加载阶段 · 图 3
Dify · 加载阶段 · 图 3

实现细节

  • _load_segments:将元数据和内容保存到 DocumentSegment 表。对于父子索引,还会保存到 ChildChunk 表。
  • _load:根据 indexing_technique 处理实际的索引操作。
    • 高质量:使用 ModelInstance 生成嵌入向量,并通过 Vector.create() 将其存储到向量数据库中。
    • 经济模式:使用 Keyword.add_texts() 构建关键词索引。
  • 多模态支持:如果数据集是多模态的,则调用 Vector.create_multimodal() 来索引图片附件。

来源api/core/indexing_runner.py:111-119api/core/rag/index_processor/processor/paragraph_index_processor.py:123-144api/core/rag/index_processor/processor/parent_child_index_processor.py:132-155

文档状态生命周期

文档会经历 models.enums.IndexingStatus 中定义的多个状态。

状态描述
waiting文档已排队等待处理。
parsing正在从源中提取原始内容。
splitting正在清洗和分段文本。
indexing正在生成嵌入向量并保存到索引。
completed管线成功完成。
error管线失败;错误详情存储在 document.error 中。

来源api/core/indexing_runner.py:58-67api/models/enums.py:43-43

异步任务处理

大规模索引操作由 Celery 任务管理,以确保系统响应性。

  • 索引add_document_to_index_task 负责将已处理的片段添加到向量索引中。
  • 同步document_indexing_sync_task 检查更新(例如,Notion 中的更新),并在内容发生变化时重新索引。
  • 清理clean_dataset_taskclean_document_task 在删除数据集或文档时,负责从数据库和向量存储中删除片段。
  • 批量操作batch_create_segment_to_index_task 允许从 CSV 模板批量创建片段。

来源api/tasks/add_document_to_index_task.py:23-23api/tasks/document_indexing_sync_task.py:22-22api/tasks/clean_dataset_task.py:33-33api/tasks/batch_create_segment_to_index_task.py:30-30

索引预估

在开始完整的索引运行之前,用户可以请求预估 Token 和片段数量。

  • IndexingRunner.indexing_estimate() 模拟提取和转换阶段。
  • 它会计算片段总数和嵌入向量所需的 Token 总数。
  • 返回一个包含片段数和 Token 数的 IndexingEstimate 实体。

来源api/core/indexing_runner.py:264-275api/core/entities/knowledge_entities.py:16-16