文档处理管线
文档处理管线
相关源文件
本章引用的主要源码文件:
api/db/__init__.pyapi/db/db_models.pyapi/db/services/dialog_service.pyapi/db/services/document_service.pyapi/db/services/file_service.pyapi/db/services/knowledgebase_service.pyapi/db/services/llm_service.pyapi/db/services/task_service.pyapi/db/services/user_service.pydeepdoc/parser/excel_parser.pyrag/app/book.pyrag/app/laws.pyrag/app/manual.pyrag/app/naive.pyrag/app/one.pyrag/app/paper.pyrag/app/presentation.pyrag/app/qa.pyrag/app/table.pyrag/nlp/__init__.pyrag/nlp/search.pyrag/raptor.pyrag/svr/task_executor.py
目的与范围
文档处理管线是 RAGFlow 的核心引擎,负责将上传的原始文档转换为语义丰富、经过片段切分、嵌入向量化并建立索引的单元,以供检索使用。其流程涵盖从文档上传和入库,到解析、片段切分、语义增强、嵌入向量化,最终在文档存储中进行索引的全过程。
本文档是对端到端管线结构和流程的高层概述,并链接到每个核心阶段的详细子页面:解析、片段切分、增强、视觉处理(OCR/版面分析)、高级功能(GraphRAG、RAPTOR)以及外部连接器集成。
有关详细的机制和实现,请参见以下链接的子页面:
管线架构总览
高层管线流程
文档处理管线以异步方式运行,使用任务队列和后台工作者。当通过 API 上传文档时,会创建一个处理任务并将其加入 Redis Streams 队列。后台任务执行器工作者从队列中获取任务,选择合适的解析器,对文档进行片段切分,通过大语言模型(LLM)应用语义增强,将片段转换为嵌入向量,并在文档存储中建立索引。
来源:rag/svr/task_executor.py:85-110, rag/svr/task_executor.py:177-240, api/db/services/task_service.py:75-142, api/db/services/document_service.py:44-72, rag/nlp/search.py:37-51
任务执行器与队列系统
任务收集与分发
处理任务由从 task_executor.py 实例化的后台工作者管理。这些工作者从 Redis Streams 队列中消费文档处理任务,该队列使用由 SVR_CONSUMER_GROUP_NAME 和 task_executor_* 消费者名称标识的消费者组。通过 asyncio 信号量控制并发度,限制最大并行任务数和片段构建数。
多个任务执行器并行运行,各自消费任务,通过信号量控制以避免过载。
任务类型与路由
管线支持多种任务类型,路由到不同的逻辑处理器,定义在 TASK_TYPE_TO_PIPELINE_TASK_TYPE 中:
| 任务类型 | 管线任务类型 | 处理逻辑 | 描述 |
|---|---|---|---|
dataflow | PARSE | 标准解析流程(run_dataflow()) | 通过解析器模块进行典型文档解析 |
raptor | RAPTOR | RAPTOR 层次聚类 | 用于检索的层次化片段聚类 |
graphrag | GRAPH_RAG | GraphRAG 知识图谱提取 | 实体-关系图谱提取 |
mindmap | MINDMAP | 思维导图生成(MindMapExtractor) | 提取思维导图结构 |
memory | MEMORY | 智能体记忆持久化(handle_save_to_memory_task) | 持久化智能体记忆管理 |
这种路由支持高级文档处理工作流的可扩展性。
来源:rag/svr/task_executor.py:104-110, rag/svr/task_executor.py:33-46, rag/raptor.py:1-50
解析器选择与工厂模式
解析器工厂映射
解析器模块为各种文档类型实现了片段切分策略。FACTORY 字典将解析器类型常量(ParserType)映射到对应的解析器模块,从而在运行时动态选择解析逻辑。
这种工厂模式支持独立扩展或定制解析策略。
解析器选择逻辑
在任务执行时,系统通过工厂字典根据文档的 parser_id 选择解析器模块,然后调用解析器的 chunk() 函数。该函数通常处理文档二进制数据、指定的页码范围、语言以及各种解析器配置。
chunk 函数的参数包括:
filename:上传文件的名称binary:原始文件字节(可选)from_page、to_page:要解析的页码范围lang:文档语言callback:用于进度报告parser_config:解析器特定的配置(例如,版面、片段大小)
有关详细的解析器实现和支持的文件格式(如 PDF、DOCX、Excel、Markdown、EPUB、PPT),请参见文档解析策略。
来源:rag/svr/task_executor.py:85-168, rag/app/naive.py:86-112, rag/app/manual.py:137-175
片段切分过程
片段构建工作流
片段构建函数(build_chunks())负责编排以下操作:
- 从存储(MinIO)加载文件二进制数据
- 从
FACTORY中选择并调用片段切分器 - 处理图像并将提取的 ID 存储回 MinIO
- 可选地调用基于大语言模型(LLM)的内容增强(关键词、问答对)
- 最终准备用于嵌入向量化和索引的片段
该函数通过信号量应用并发控制,并使用超时守卫确保健壮性。
有关不同片段切分器的策略变体和用例,请参见片段切分方法。
片段数据结构
解析器生成的每个片段都包含必要的元数据,以支持语义搜索:
| 字段 | 类型 | 描述 |
|---|---|---|
id | 字符串 | 唯一片段 ID(内容哈希 + doc_id) |
doc_id | 字符串 | 父文档 ID |
kb_id | 字符串 | 知识库(数据集)ID |
content_with_weight | 字符串 | 包含标题权重的文本内容 |
content_ltks | 列表 | 分词后的内容 |
img_id | 字符串 | 图像引用(MinIO 对象 ID) |
page_num_int | 整数 | 源文档中的页码 |
这些片段被传递给嵌入向量化处理,并与元数据和向量一起存储在文档存储中。
来源:rag/svr/task_executor.py:294-342, rag/nlp/search.py:149-153
内容增强与嵌入向量
基于大语言模型(LLM)的增强
如果解析器配置中启用了该功能,基于大语言模型(LLM)的增强会为片段生成元数据丰富的内容。这包括:
- 关键词提取 — 检测片段中的重要关键词或短语。
- 问题建议 — 生成片段内容可能回答的潜在问题。
- 元数据提取 — 使用 JSON 模式提示提取结构化数据字段。
- 目录生成 — 总结文档大纲条目以方便导航。
这些调用使用 rag.prompts.generator 下的提示生成工具。
有关增强和嵌入向量集成的详细信息,请参见内容增强与嵌入向量。
来源:rag/svr/task_executor.py:343-452, rag/prompts/generator.py:44-49
嵌入向量过程
增强后,片段通过调用大语言模型(LLM)抽象层(LLMBundle.encode)编码为向量嵌入向量。这支持多个提供商和使用跟踪。
嵌入向量以批处理方式进行,可能会在内容前添加文档或片段标题以保持语义连贯性。
生成的向量在文档存储中建立索引以供检索。
来源:rag/svr/task_executor.py:575-626, api/db/services/llm_service.py:95-118
数据流管线执行
自定义管线处理
RAGFlow 通过智能体/画布系统支持用户定义的复杂文档处理。run_dataflow() 函数执行这些管线,可以加载文档、解析、片段切分、增强和嵌入向量化,由自定义工作流编排。
这种灵活的管线机制允许用复杂的处理流程集成或替换默认解析器。
来源:rag/svr/task_executor.py:629-768, api/db/db_models.py:25
视觉与高级功能
视觉处理
RAGFlow 集成了高级视觉分析功能,包括 OCR、版面识别、表格检测、乱码文本检测和图像自动旋转。这些功能允许精确解析扫描的 PDF 和富含图像的文档。
视觉任务集成在解析器实现中,例如 rag.app.naive 和 rag.app.manual。
有关完整详细信息,请参见视觉处理:OCR 与版面识别。
来源:rag/app/naive.py:35-40, rag/app/manual.py:33-67
GraphRAG 与 RAPTOR
高级功能包括:
- RAPTOR:一种层次聚类机制(
RAPTOR_TREE_BUILDER),将片段组织成树状结构,以优化检索和摘要。 - GraphRAG:通过实体和关系提取来提取知识图谱,用于复杂信息表示。
这些功能由任务执行器中的专门任务类型(raptor、graphrag)调用。
有关更多信息,请参见高级功能:GraphRAG 与 RAPTOR。
来源:rag/svr/task_executor.py:43-44, rag/raptor.py:1-75
数据源连接器
RAGFlow 支持超过 30 种外部数据源连接器(Confluence、Jira、Google Drive、GitHub、Slack、Notion 等)。这些连接器同步外部内容并将文档输入处理管线,从而实现统一的索引和检索。
连接器系统与数据集和文档导入管线紧密集成。
有关完整文档,请参见数据源连接器。
来源:api/db/services/knowledgebase_service.py:105-117
进度跟踪与错误处理
进度更新
处理进度更新通过 set_progress() 函数在所有管线阶段报告,并记录在任务数据库中,以实现前端实时 UI 反馈。
典型的进度范围映射到各阶段:
0.0 - 0.6:OCR 和版面识别0.6 - 0.7:片段切分0.7 - 0.9:语义增强(关键词、问题、元数据)0.9 - 1.0:嵌入向量化和索引-1:检测到错误或取消
错误处理与取消
管线包含多项健壮性功能:
- 任务取消检查(
has_canceled(task_id))允许提前中止耗时操作。 - 任务在被放弃前有重试限制(最多 3 次)。
- 文档和任务状态通过
TaskStatus和StatusEnum跟踪。
这些功能确保即使在失败或用户中止请求的情况下,也能保持稳定和响应式的处理。
来源:rag/svr/task_executor.py:143-175, api/db/services/task_service.py:129-142, api/db/services/document_service.py:32
附加资源
- 有关详细的解析器工厂和文件格式,请参见文档解析策略。
- 要了解各种片段切分样式(如 naive、QA、laws、paper 等),请参见片段切分方法。
- 有关基于大语言模型(LLM)的片段丰富,请参见内容增强与嵌入向量。
- 有关图像和版面解析,请参见视觉处理:OCR 与版面识别。
- 有关聚类和关系提取,请参见高级功能:GraphRAG 与 RAPTOR。
- 有关外部集成管线,请参见数据源连接器。
本父页面提供了一个概念性地图,将底层代码制品与文档处理中涉及的自然语言系统组件连接起来。
总结图:从自然语言组件到代码实体
总结图:任务执行器与解析器工厂映射
参考
rag/svr/task_executor.py:1-200— 任务执行器、解析器选择、片段构建逻辑。rag/app/naive.py:1-112— 朴素文本解析和片段切分实现。rag/app/manual.py:1-75— 手册解析实现。rag/nlp/search.py:30-170— 文档存储和嵌入向量搜索抽象。api/db/services/task_service.py:50-150— 任务数据库管理、重试、进度。api/db/services/document_service.py:40-150— 文档元数据和文件关联。rag/raptor.py:1-75— RAPTOR 层次聚类。api/db/services/knowledgebase_service.py:100-120— 数据集和源连接器概述。api/db/services/llm_service.py:80-140— 大语言模型(LLM)抽象和嵌入向量接口。
本文档结束了对文档处理管线的概述。请深入子页面了解每个核心阶段的详细设计和实现。