agentic_huge_data_base / wiki
页面 RAGFlow · 6 文档处理管线·DeepWiki 中文全文译文

6 · 文档处理管线(Document Processing Pipeline)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节6 状态全文译文 模块入库与解析、系统架构、工作流与编排、界面与交互
源码线索
  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
  • api/db/services/task_service.py
  • api/db/services/user_service.py
  • deepdoc/parser/excel_parser.py
模块标签
  • 入库与解析
  • 系统架构
  • 工作流与编排
  • 界面与交互
  • 模型调用与提供方适配

章节正文

文档处理管线

文档处理管线

相关源文件

本章引用的主要源码文件:

  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
  • api/db/services/task_service.py
  • api/db/services/user_service.py
  • deepdoc/parser/excel_parser.py
  • rag/app/book.py
  • rag/app/laws.py
  • rag/app/manual.py
  • rag/app/naive.py
  • rag/app/one.py
  • rag/app/paper.py
  • rag/app/presentation.py
  • rag/app/qa.py
  • rag/app/table.py
  • rag/nlp/__init__.py
  • rag/nlp/search.py
  • rag/raptor.py
  • rag/svr/task_executor.py

目的与范围

文档处理管线是 RAGFlow 的核心引擎,负责将上传的原始文档转换为语义丰富、经过片段切分、嵌入向量化并建立索引的单元,以供检索使用。其流程涵盖从文档上传和入库,到解析、片段切分、语义增强、嵌入向量化,最终在文档存储中进行索引的全过程。

本文档是对端到端管线结构和流程的高层概述,并链接到每个核心阶段的详细子页面:解析、片段切分、增强、视觉处理(OCR/版面分析)、高级功能(GraphRAG、RAPTOR)以及外部连接器集成。

有关详细的机制和实现,请参见以下链接的子页面:

管线架构总览

高层管线流程

文档处理管线以异步方式运行,使用任务队列和后台工作者。当通过 API 上传文档时,会创建一个处理任务并将其加入 Redis Streams 队列。后台任务执行器工作者从队列中获取任务,选择合适的解析器,对文档进行片段切分,通过大语言模型(LLM)应用语义增强,将片段转换为嵌入向量,并在文档存储中建立索引。

RAGFlow · 高层管线流程 · 图 1
RAGFlow · 高层管线流程 · 图 1

来源rag/svr/task_executor.py:85-110, rag/svr/task_executor.py:177-240, api/db/services/task_service.py:75-142, api/db/services/document_service.py:44-72, rag/nlp/search.py:37-51

任务执行器与队列系统

任务收集与分发

处理任务由从 task_executor.py 实例化的后台工作者管理。这些工作者从 Redis Streams 队列中消费文档处理任务,该队列使用由 SVR_CONSUMER_GROUP_NAMEtask_executor_* 消费者名称标识的消费者组。通过 asyncio 信号量控制并发度,限制最大并行任务数和片段构建数。

RAGFlow · 任务收集与分发 · 图 2
RAGFlow · 任务收集与分发 · 图 2

多个任务执行器并行运行,各自消费任务,通过信号量控制以避免过载。

任务类型与路由

管线支持多种任务类型,路由到不同的逻辑处理器,定义在 TASK_TYPE_TO_PIPELINE_TASK_TYPE 中:

任务类型管线任务类型处理逻辑描述
dataflowPARSE标准解析流程(run_dataflow()通过解析器模块进行典型文档解析
raptorRAPTORRAPTOR 层次聚类用于检索的层次化片段聚类
graphragGRAPH_RAGGraphRAG 知识图谱提取实体-关系图谱提取
mindmapMINDMAP思维导图生成(MindMapExtractor)提取思维导图结构
memoryMEMORY智能体记忆持久化(handle_save_to_memory_task持久化智能体记忆管理

这种路由支持高级文档处理工作流的可扩展性。

来源rag/svr/task_executor.py:104-110, rag/svr/task_executor.py:33-46, rag/raptor.py:1-50

解析器选择与工厂模式

解析器工厂映射

解析器模块为各种文档类型实现了片段切分策略。FACTORY 字典将解析器类型常量(ParserType)映射到对应的解析器模块,从而在运行时动态选择解析逻辑。

RAGFlow · 解析器工厂映射 · 图 3
RAGFlow · 解析器工厂映射 · 图 3

这种工厂模式支持独立扩展或定制解析策略。

解析器选择逻辑

在任务执行时,系统通过工厂字典根据文档的 parser_id 选择解析器模块,然后调用解析器的 chunk() 函数。该函数通常处理文档二进制数据、指定的页码范围、语言以及各种解析器配置。

chunk 函数的参数包括:

  • filename:上传文件的名称
  • binary:原始文件字节(可选)
  • from_pageto_page:要解析的页码范围
  • lang:文档语言
  • callback:用于进度报告
  • parser_config:解析器特定的配置(例如,版面、片段大小)

有关详细的解析器实现和支持的文件格式(如 PDF、DOCX、Excel、Markdown、EPUB、PPT),请参见文档解析策略

来源rag/svr/task_executor.py:85-168, rag/app/naive.py:86-112, rag/app/manual.py:137-175

片段切分过程

片段构建工作流

片段构建函数(build_chunks())负责编排以下操作:

  • 从存储(MinIO)加载文件二进制数据
  • FACTORY 中选择并调用片段切分器
  • 处理图像并将提取的 ID 存储回 MinIO
  • 可选地调用基于大语言模型(LLM)的内容增强(关键词、问答对)
  • 最终准备用于嵌入向量化和索引的片段

该函数通过信号量应用并发控制,并使用超时守卫确保健壮性。

RAGFlow · 片段构建工作流 · 图 4
RAGFlow · 片段构建工作流 · 图 4

有关不同片段切分器的策略变体和用例,请参见片段切分方法

片段数据结构

解析器生成的每个片段都包含必要的元数据,以支持语义搜索:

字段类型描述
id字符串唯一片段 ID(内容哈希 + doc_id)
doc_id字符串父文档 ID
kb_id字符串知识库(数据集)ID
content_with_weight字符串包含标题权重的文本内容
content_ltks列表分词后的内容
img_id字符串图像引用(MinIO 对象 ID)
page_num_int整数源文档中的页码

这些片段被传递给嵌入向量化处理,并与元数据和向量一起存储在文档存储中。

来源rag/svr/task_executor.py:294-342, rag/nlp/search.py:149-153

内容增强与嵌入向量

基于大语言模型(LLM)的增强

如果解析器配置中启用了该功能,基于大语言模型(LLM)的增强会为片段生成元数据丰富的内容。这包括:

  • 关键词提取 — 检测片段中的重要关键词或短语。
  • 问题建议 — 生成片段内容可能回答的潜在问题。
  • 元数据提取 — 使用 JSON 模式提示提取结构化数据字段。
  • 目录生成 — 总结文档大纲条目以方便导航。

这些调用使用 rag.prompts.generator 下的提示生成工具。

有关增强和嵌入向量集成的详细信息,请参见内容增强与嵌入向量

来源rag/svr/task_executor.py:343-452, rag/prompts/generator.py:44-49

嵌入向量过程

增强后,片段通过调用大语言模型(LLM)抽象层(LLMBundle.encode)编码为向量嵌入向量。这支持多个提供商和使用跟踪。

嵌入向量以批处理方式进行,可能会在内容前添加文档或片段标题以保持语义连贯性。

生成的向量在文档存储中建立索引以供检索。

来源rag/svr/task_executor.py:575-626, api/db/services/llm_service.py:95-118

数据流管线执行

自定义管线处理

RAGFlow 通过智能体/画布系统支持用户定义的复杂文档处理。run_dataflow() 函数执行这些管线,可以加载文档、解析、片段切分、增强和嵌入向量化,由自定义工作流编排。

RAGFlow · 自定义管线处理 · 图 5
RAGFlow · 自定义管线处理 · 图 5

这种灵活的管线机制允许用复杂的处理流程集成或替换默认解析器。

来源rag/svr/task_executor.py:629-768, api/db/db_models.py:25

视觉与高级功能

视觉处理

RAGFlow 集成了高级视觉分析功能,包括 OCR、版面识别、表格检测、乱码文本检测和图像自动旋转。这些功能允许精确解析扫描的 PDF 和富含图像的文档。

视觉任务集成在解析器实现中,例如 rag.app.naiverag.app.manual

有关完整详细信息,请参见视觉处理:OCR 与版面识别

来源rag/app/naive.py:35-40, rag/app/manual.py:33-67

GraphRAG 与 RAPTOR

高级功能包括:

  • RAPTOR:一种层次聚类机制(RAPTOR_TREE_BUILDER),将片段组织成树状结构,以优化检索和摘要。
  • GraphRAG:通过实体和关系提取来提取知识图谱,用于复杂信息表示。

这些功能由任务执行器中的专门任务类型(raptorgraphrag)调用。

有关更多信息,请参见高级功能:GraphRAG 与 RAPTOR

来源rag/svr/task_executor.py:43-44, rag/raptor.py:1-75

数据源连接器

RAGFlow 支持超过 30 种外部数据源连接器(Confluence、Jira、Google Drive、GitHub、Slack、Notion 等)。这些连接器同步外部内容并将文档输入处理管线,从而实现统一的索引和检索。

连接器系统与数据集和文档导入管线紧密集成。

有关完整文档,请参见数据源连接器

来源api/db/services/knowledgebase_service.py:105-117

进度跟踪与错误处理

进度更新

处理进度更新通过 set_progress() 函数在所有管线阶段报告,并记录在任务数据库中,以实现前端实时 UI 反馈。

典型的进度范围映射到各阶段:

  • 0.0 - 0.6:OCR 和版面识别
  • 0.6 - 0.7:片段切分
  • 0.7 - 0.9:语义增强(关键词、问题、元数据)
  • 0.9 - 1.0:嵌入向量化和索引
  • -1:检测到错误或取消
错误处理与取消

管线包含多项健壮性功能:

  • 任务取消检查(has_canceled(task_id))允许提前中止耗时操作。
  • 任务在被放弃前有重试限制(最多 3 次)。
  • 文档和任务状态通过 TaskStatusStatusEnum 跟踪。

这些功能确保即使在失败或用户中止请求的情况下,也能保持稳定和响应式的处理。

来源rag/svr/task_executor.py:143-175, api/db/services/task_service.py:129-142, api/db/services/document_service.py:32

附加资源

本父页面提供了一个概念性地图,将底层代码制品与文档处理中涉及的自然语言系统组件连接起来。

总结图:从自然语言组件到代码实体

RAGFlow · 总结图:从自然语言组件到代码实体 · 图 6
RAGFlow · 总结图:从自然语言组件到代码实体 · 图 6

总结图:任务执行器与解析器工厂映射

RAGFlow · 总结图:任务执行器与解析器工厂映射 · 图 7
RAGFlow · 总结图:任务执行器与解析器工厂映射 · 图 7

参考

  • rag/svr/task_executor.py:1-200 — 任务执行器、解析器选择、片段构建逻辑。
  • rag/app/naive.py:1-112 — 朴素文本解析和片段切分实现。
  • rag/app/manual.py:1-75 — 手册解析实现。
  • rag/nlp/search.py:30-170 — 文档存储和嵌入向量搜索抽象。
  • api/db/services/task_service.py:50-150 — 任务数据库管理、重试、进度。
  • api/db/services/document_service.py:40-150 — 文档元数据和文件关联。
  • rag/raptor.py:1-75 — RAPTOR 层次聚类。
  • api/db/services/knowledgebase_service.py:100-120 — 数据集和源连接器概述。
  • api/db/services/llm_service.py:80-140 — 大语言模型(LLM)抽象和嵌入向量接口。

本文档结束了对文档处理管线的概述。请深入子页面了解每个核心阶段的详细设计和实现。