agentic_huge_data_base / wiki
页面 RAGFlow · 3.3 任务执行与队列系统·DeepWiki 中文全文译文

3.3 · 任务执行与队列系统(Task Execution and Queue System)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节3.3 状态全文译文 模块入库与解析、测试、发布与运维、系统架构、工作流与编排
源码线索
  • admin/server/admin_server.py
  • api/apps/__init__.py
  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/init_data.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
模块标签
  • 入库与解析
  • 测试、发布与运维
  • 系统架构
  • 工作流与编排
  • 配置治理

章节正文

任务执行与队列系统

任务执行与队列系统

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • admin/server/admin_server.py
  • api/apps/__init__.py
  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/init_data.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
  • api/db/services/task_service.py
  • api/db/services/user_service.py
  • api/ragflow_server.py
  • api/settings.py
  • api/utils/api_utils.py
  • conf/service_conf.yaml
  • docker/service_conf.yaml.template
  • rag/nlp/search.py
  • rag/raptor.py
  • rag/svr/task_executor.py
  • rag/utils/redis_conn.py
  • test/testcases/test_web_api/test_system_app/test_apps_init_unit.py

目的与范围

本文档描述了 RAGFlow 的异步任务执行系统,该系统在 HTTP 请求-响应周期之外处理长时间运行的文档处理操作。系统使用 Redis Streams 作为分布式任务队列,多个工作进程并发消费和执行任务。

涵盖的主题包括:

  • 基于 Redis Streams 的队列架构
  • 工作进程生命周期与配置
  • 任务类型及其执行管线
  • 并发控制机制
  • 进度跟踪与取消
  • 错误处理与重试策略

来源:rag/svr/task_executor.py:1-134, api/db/services/task_service.py:1-40

架构总览

任务执行系统采用生产者-消费者模式,其中 API 端点作为生产者,Redis Streams 作为消息代理,专用工作进程作为消费者。

任务系统数据流

下图展示了从 API 请求到后台执行的完整流程,将高层概念映射到具体代码实体。

图:请求到执行流程

RAGFlow · 任务系统数据流 · 图 1
RAGFlow · 任务系统数据流 · 图 1

来源:rag/svr/task_executor.py:177-210, api/db/services/document_service.py:41-72, api/db/services/task_service.py:57-142, rag/utils/redis_conn.py:37-59, api/apps/__init__.py:1-15

Redis Streams 队列系统

RAGFlow 使用带有消费者组的 Redis Streams 来实现可靠的任务分发。每种任务类型都有专用的队列(流名称),工作进程属于一个消费者组,确保每条消息只被处理一次。

队列配置
队列名称任务类型代码映射 (TASK_TYPE_TO_PIPELINE_TASK_TYPE)用途
ragflow_dataflowdataflowPipelineTaskType.PARSE文档解析、片段切分和嵌入向量生成
ragflow_raptorraptorPipelineTaskType.RAPTOR构建层级摘要(RAPTOR)
ragflow_graphraggraphragPipelineTaskType.GRAPH_RAG提取实体和关系以构建知识图谱
ragflow_mindmapmindmapPipelineTaskType.MINDMAP从文档内容生成思维导图
ragflow_memorymemoryPipelineTaskType.MEMORY处理并将会话消息保存到记忆

来源:rag/svr/task_executor.py:122-128, common/constants.py:62-62, rag/svr/task_executor.py:181-185

消息收集与确认

rag/svr/task_executor.py 中的 collect() 函数管理消息的生命周期。它使用 REDIS_CONN(一个 RedisDB 单例)与 Redis 交互。

  1. 未确认消息检查:首先使用 REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME) 检查未确认的消息。rag/svr/task_executor.py:184-185
  2. 新消息:如果没有未确认的消息,则使用 REDIS_CONN.queue_consumer() 监听新消息。rag/svr/task_executor.py:189-192
  3. 任务检索:使用 TaskService.get_task(task_id) 从数据库获取任务元数据。rag/svr/task_executor.py:196-197
  4. 确认:一旦任务成功路由到处理例程,通过 RedisMsg.ack() 确认消息。rag/utils/redis_conn.py:45-51

来源:rag/svr/task_executor.py:177-210, rag/utils/redis_conn.py:61-145

工作进程架构

启动与配置

每个工作进程由 CONSUMER_NO(作为命令行参数传入)标识,并注册为 task_executor_{N}rag/svr/task_executor.py:114-115

变量默认值来源描述
MAX_CONCURRENT_TASKS5os.environ每个工作进程的最大并行任务数
MAX_CONCURRENT_CHUNK_BUILDERS1os.environ并行片段切分操作数
MAX_CONCURRENT_MINIO10os.environ并行 MinIO 上传数
WORKER_HEARTBEAT_TIMEOUT120os.environ工作进程心跳超时时间(秒)

来源:rag/svr/task_executor.py:142-149

并发控制

系统使用 asyncio.Semaphore 对象来限制并发操作,防止资源耗尽。

图:资源限制器关联

RAGFlow · 并发控制 · 图 2
RAGFlow · 并发控制 · 图 2

来源:rag/svr/task_executor.py:145-149

任务类型与执行管线

数据流(文档解析)管线

核心入库逻辑位于数据流管线中。它根据文档的 parser_idFACTORY 中选择解析器。rag/svr/task_executor.py:103-120

  • 解析器选择:将 ParserType(例如 NAIVEPAPERBOOKQATABLE)映射到特定模块,如 rag.app.naiverag.app.qarag/svr/task_executor.py:103-120
  • 进度跟踪:调用 set_progress() 更新数据库中的 Task 表。rag/svr/task_executor.py:161-187
  • 嵌入向量生成:在处理过程中使用 LLMBundle 为片段生成向量。rag/svr/task_executor.py:75-75, api/db/services/llm_service.py:85-134
专用管线
  • RAPTORrun_raptor_for_kb 使用 RAPTOR_TREE_BUILDER 类构建层级摘要。rag/svr/task_executor.py:86-86
  • GraphRAGrun_graphrag_for_kb 提取实体和关系,用于复杂的多跳检索。rag/svr/task_executor.py:55-56
  • 思维导图run_mindmap_for_kb 从文档内容生成思维导图。rag/svr/task_executor.py:290-290
  • 记忆handle_save_to_memory_task 处理并索引对话历史。rag/svr/task_executor.py:39-39

来源:rag/svr/task_executor.py:39-128

任务生命周期管理

进度与取消

取消操作通过数据库中的状态检查来处理。TaskService 中的 has_canceled() 函数检查任务是否已被标记为已取消。api/db/services/task_service.py:76-76

task_executor.py 中的 set_progress() 函数会检查此标志:

cancel = has_canceled(task_id)
if cancel:
    msg += " [已取消]"
    prog = -1
# ... 通过 TaskService.update_progress 更新数据库 ...
if cancel:
    raise TaskCanceledException(msg)

来源:rag/svr/task_executor.py:161-187

错误处理与重试逻辑

任务失败时会进行重试,由 Task 模型状态和 TaskService 管理。

  • 重试次数TaskService.get_task() 会递增 retry_count。任务在尝试 3 次后被放弃。api/db/services/task_service.py:99-100, api/db/services/task_service.py:131-132
  • 数据库韧性:操作使用 retry_deadlock_operation 来处理瞬态数据库争用。api/db/services/document_service.py:27-27
  • 清理:系统通过 close_connection() 确保更新后关闭连接。rag/svr/task_executor.py:183-183

来源:api/db/services/task_service.py:75-142, api/db/db_models.py:152-163, rag/svr/task_executor.py:161-187