任务执行与队列系统
任务执行与队列系统
相关源文件
以下文件为本维基页面的生成提供了上下文:
admin/server/admin_server.pyapi/apps/__init__.pyapi/db/__init__.pyapi/db/db_models.pyapi/db/init_data.pyapi/db/services/dialog_service.pyapi/db/services/document_service.pyapi/db/services/file_service.pyapi/db/services/knowledgebase_service.pyapi/db/services/llm_service.pyapi/db/services/task_service.pyapi/db/services/user_service.pyapi/ragflow_server.pyapi/settings.pyapi/utils/api_utils.pyconf/service_conf.yamldocker/service_conf.yaml.templaterag/nlp/search.pyrag/raptor.pyrag/svr/task_executor.pyrag/utils/redis_conn.pytest/testcases/test_web_api/test_system_app/test_apps_init_unit.py
目的与范围
本文档描述了 RAGFlow 的异步任务执行系统,该系统在 HTTP 请求-响应周期之外处理长时间运行的文档处理操作。系统使用 Redis Streams 作为分布式任务队列,多个工作进程并发消费和执行任务。
涵盖的主题包括:
- 基于 Redis Streams 的队列架构
- 工作进程生命周期与配置
- 任务类型及其执行管线
- 并发控制机制
- 进度跟踪与取消
- 错误处理与重试策略
来源:rag/svr/task_executor.py:1-134, api/db/services/task_service.py:1-40
架构总览
任务执行系统采用生产者-消费者模式,其中 API 端点作为生产者,Redis Streams 作为消息代理,专用工作进程作为消费者。
任务系统数据流
下图展示了从 API 请求到后台执行的完整流程,将高层概念映射到具体代码实体。
图:请求到执行流程
来源:rag/svr/task_executor.py:177-210, api/db/services/document_service.py:41-72, api/db/services/task_service.py:57-142, rag/utils/redis_conn.py:37-59, api/apps/__init__.py:1-15
Redis Streams 队列系统
RAGFlow 使用带有消费者组的 Redis Streams 来实现可靠的任务分发。每种任务类型都有专用的队列(流名称),工作进程属于一个消费者组,确保每条消息只被处理一次。
队列配置
| 队列名称 | 任务类型 | 代码映射 (TASK_TYPE_TO_PIPELINE_TASK_TYPE) | 用途 |
|---|---|---|---|
ragflow_dataflow | dataflow | PipelineTaskType.PARSE | 文档解析、片段切分和嵌入向量生成 |
ragflow_raptor | raptor | PipelineTaskType.RAPTOR | 构建层级摘要(RAPTOR) |
ragflow_graphrag | graphrag | PipelineTaskType.GRAPH_RAG | 提取实体和关系以构建知识图谱 |
ragflow_mindmap | mindmap | PipelineTaskType.MINDMAP | 从文档内容生成思维导图 |
ragflow_memory | memory | PipelineTaskType.MEMORY | 处理并将会话消息保存到记忆 |
来源:rag/svr/task_executor.py:122-128, common/constants.py:62-62, rag/svr/task_executor.py:181-185
消息收集与确认
rag/svr/task_executor.py 中的 collect() 函数管理消息的生命周期。它使用 REDIS_CONN(一个 RedisDB 单例)与 Redis 交互。
- 未确认消息检查:首先使用
REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)检查未确认的消息。rag/svr/task_executor.py:184-185 - 新消息:如果没有未确认的消息,则使用
REDIS_CONN.queue_consumer()监听新消息。rag/svr/task_executor.py:189-192 - 任务检索:使用
TaskService.get_task(task_id)从数据库获取任务元数据。rag/svr/task_executor.py:196-197 - 确认:一旦任务成功路由到处理例程,通过
RedisMsg.ack()确认消息。rag/utils/redis_conn.py:45-51
来源:rag/svr/task_executor.py:177-210, rag/utils/redis_conn.py:61-145
工作进程架构
启动与配置
每个工作进程由 CONSUMER_NO(作为命令行参数传入)标识,并注册为 task_executor_{N}。rag/svr/task_executor.py:114-115
| 变量 | 默认值 | 来源 | 描述 |
|---|---|---|---|
MAX_CONCURRENT_TASKS | 5 | os.environ | 每个工作进程的最大并行任务数 |
MAX_CONCURRENT_CHUNK_BUILDERS | 1 | os.environ | 并行片段切分操作数 |
MAX_CONCURRENT_MINIO | 10 | os.environ | 并行 MinIO 上传数 |
WORKER_HEARTBEAT_TIMEOUT | 120 | os.environ | 工作进程心跳超时时间(秒) |
来源:rag/svr/task_executor.py:142-149
并发控制
系统使用 asyncio.Semaphore 对象来限制并发操作,防止资源耗尽。
图:资源限制器关联
来源:rag/svr/task_executor.py:145-149
任务类型与执行管线
数据流(文档解析)管线
核心入库逻辑位于数据流管线中。它根据文档的 parser_id 从 FACTORY 中选择解析器。rag/svr/task_executor.py:103-120
- 解析器选择:将
ParserType(例如NAIVE、PAPER、BOOK、QA、TABLE)映射到特定模块,如rag.app.naive或rag.app.qa。rag/svr/task_executor.py:103-120 - 进度跟踪:调用
set_progress()更新数据库中的Task表。rag/svr/task_executor.py:161-187 - 嵌入向量生成:在处理过程中使用
LLMBundle为片段生成向量。rag/svr/task_executor.py:75-75,api/db/services/llm_service.py:85-134
专用管线
- RAPTOR:
run_raptor_for_kb使用RAPTOR_TREE_BUILDER类构建层级摘要。rag/svr/task_executor.py:86-86 - GraphRAG:
run_graphrag_for_kb提取实体和关系,用于复杂的多跳检索。rag/svr/task_executor.py:55-56 - 思维导图:
run_mindmap_for_kb从文档内容生成思维导图。rag/svr/task_executor.py:290-290 - 记忆:
handle_save_to_memory_task处理并索引对话历史。rag/svr/task_executor.py:39-39
来源:rag/svr/task_executor.py:39-128
任务生命周期管理
进度与取消
取消操作通过数据库中的状态检查来处理。TaskService 中的 has_canceled() 函数检查任务是否已被标记为已取消。api/db/services/task_service.py:76-76
task_executor.py 中的 set_progress() 函数会检查此标志:
cancel = has_canceled(task_id)
if cancel:
msg += " [已取消]"
prog = -1
# ... 通过 TaskService.update_progress 更新数据库 ...
if cancel:
raise TaskCanceledException(msg)
来源:rag/svr/task_executor.py:161-187
错误处理与重试逻辑
任务失败时会进行重试,由 Task 模型状态和 TaskService 管理。
- 重试次数:
TaskService.get_task()会递增retry_count。任务在尝试 3 次后被放弃。api/db/services/task_service.py:99-100,api/db/services/task_service.py:131-132 - 数据库韧性:操作使用
retry_deadlock_operation来处理瞬态数据库争用。api/db/services/document_service.py:27-27 - 清理:系统通过
close_connection()确保更新后关闭连接。rag/svr/task_executor.py:183-183
来源:api/db/services/task_service.py:75-142, api/db/db_models.py:152-163, rag/svr/task_executor.py:161-187