agentic_huge_data_base / wiki
页面 RAGFlow · 8.4 工作流执行与流式·DeepWiki 中文全文译文

8.4 · 工作流执行与流式(Workflow Execution and Streaming)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节8.4 状态全文译文 模块界面与交互、检索、召回与索引、入库与解析、系统架构
源码线索
  • agent/canvas.py
  • agent/component/agent_with_tools.py
  • agent/component/base.py
  • agent/component/categorize.py
  • agent/component/llm.py
  • agent/tools/base.py
  • api/apps/restful_apis/agent_api.py
  • api/db/services/canvas_service.py
  • api/db/services/conversation_service.py
  • common/mcp_tool_call_conn.py
模块标签
  • 界面与交互
  • 检索、召回与索引
  • 入库与解析
  • 系统架构
  • 智能体运行时

章节正文

工作流执行与流式

工作流执行与流式传输

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • agent/canvas.py
  • agent/component/agent_with_tools.py
  • agent/component/base.py
  • agent/component/categorize.py
  • agent/component/llm.py
  • agent/tools/base.py
  • api/apps/restful_apis/agent_api.py
  • api/db/services/canvas_service.py
  • api/db/services/conversation_service.py
  • common/mcp_tool_call_conn.py
  • deepdoc/parser/mineru_parser.py
  • rag/prompts/generator.py
  • test/testcases/test_http_api/test_session_management/test_agent_completions.py
  • test/testcases/test_http_api/test_session_management/test_agent_sessions.py
  • test/testcases/test_http_api/test_session_management/test_chat_completions.py
  • test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py
  • test/testcases/test_web_api/test_agent_app/test_agents_webhook_unit.py
  • web/src/components/embed-container.tsx
  • web/src/components/embed-dialog/use-show-embed-dialog.ts
  • web/src/components/home-card.tsx
  • web/src/components/message-input/next.tsx
  • web/src/components/next-message-item/index.tsx
  • web/src/components/ui/accordion.tsx
  • web/src/components/ui/tree-view.tsx
  • web/src/hooks/use-agent-request.ts
  • web/src/hooks/use-send-message.ts
  • web/src/interfaces/database/agent.ts
  • web/src/pages/agent/chat/box.tsx
  • web/src/pages/agent/chat/chat-sheet.tsx
  • web/src/pages/agent/chat/use-send-agent-message.ts
  • web/src/pages/agent/constant/chat.ts
  • web/src/pages/agent/hooks/use-cache-chat-log.ts
  • web/src/pages/agent/hooks/use-chat-logic.ts
  • web/src/pages/agent/share/index.tsx
  • web/src/pages/agent/utils/chat.ts
  • web/src/pages/agents/agent-card.tsx
  • web/src/pages/agents/agent-dropdown.tsx
  • web/src/pages/next-chats/hooks/use-send-shared-message.ts
  • web/src/pages/next-chats/share/index.tsx
  • web/src/services/agent-service.ts

本文档介绍了 Canvas 工作流执行引擎,该引擎负责编排异步组件执行,并通过服务器发送事件(SSE)流式传输实时事件。agent/canvas.py 中的 Graph 类实现了一个基于图的执行模型,支持取消操作、变量解析和动态路径规划。

工作流执行总览

图执行逻辑

Graph 类是代理工作流的主要执行引擎。它处理定义了组件及其连接关系的领域特定语言(DSL)。

执行模型:

RAGFlow · 图执行逻辑 · 图 1
RAGFlow · 图执行逻辑 · 图 1

关键特性:

  • 异步生成器: 生成 SSE 事件,用于向前端提供实时更新。
  • 路径驱动: 执行 self.path 中定义的组件 agent/canvas.py:111
  • 变量解析: 在执行过程中,使用正则表达式模式动态解析 {component_id@variable}{sys.query} 等变量 agent/canvas.py:168-193

来源: agent/canvas.py:43-82agent/canvas.py:134-143agent/canvas.py:168-193api/db/services/task_service.py:35

初始化与状态设置

在执行之前,Graph 会加载 DSL 并初始化组件对象:

agent/canvas.py:96-112

初始化步骤:

步骤代码实体用途
组件加载component_class(name)从注册表中动态加载组件和参数类 agent/canvas.py:101-109
参数校验param.check()校验组件参数(例如,大语言模型(LLM)ID、阈值)agent/canvas.py:105
线程池ThreadPoolExecutor(max_workers=5)管理同步组件方法的执行 agent/canvas.py:93
变量映射self.dsl["globals"]存储会话的 sys.*env.* 变量 agent/canvas.py:75-80

来源: agent/canvas.py:84-112agent/component/base.py:57-58

SSE 事件流式传输

事件类型与结构

RAGFlow 以 JSON 对象的形式生成 SSE 事件。在前端,这些事件由 useSendMessageBySSE 等钩子处理 web/src/pages/agent/chat/use-send-agent-message.ts:244

MessageEventType 中定义的事件类型:

事件类型源文件数据载荷
Messageweb/src/pages/agent/chat/use-send-agent-message.ts:47content 片段、audio_binarystart_to_think 标志
WorkflowFinishedweb/src/pages/agent/chat/use-send-agent-message.ts:84最终的聚合输出、附件和下载项
UserInputsweb/src/pages/agent/chat/use-send-agent-message.ts:97在工作流执行期间触发用于手动用户输入的 UI 表单
MessageEndweb/src/pages/agent/chat/use-send-agent-message.ts:150reference(检索到的片段/文档)

来源: web/src/pages/agent/chat/use-send-agent-message.ts:45-108web/src/pages/agent/chat/use-send-agent-message.ts:148-163

前端事件处理

前端将原始的 SSE 流解析为结构化的 JSON 事件,以更新 UI 状态。

SSE 数据流:

RAGFlow · 前端事件处理 · 图 2
RAGFlow · 前端事件处理 · 图 2

关键前端工具函数:

  • findMessageFromList:将多个 Message 事件聚合为单个内容字符串,专门处理 <think> 标签以支持 DeepSeek 等推理模型 web/src/pages/agent/chat/use-send-agent-message.ts:45-93
  • getLatestError:从组件输出的 _ERROR 字段中提取特定的错误消息 web/src/pages/agent/chat/use-send-agent-message.ts:110-118

来源: web/src/pages/agent/chat/use-send-agent-message.ts:45-118web/src/pages/agent/chat/box.tsx:24-39

组件执行详解

大语言模型(LLM)与消息流式传输

LLMMessage 组件负责内容生成和输出格式化。

  • LLM 组件: 通过 LLMBundle 管理聊天补全 agent/component/llm.py:89。它通过 LLMParam.gen_conf() 生成 temperaturetop_p 等模型参数 agent/component/llm.py:62-80
  • Message 组件: 实现了一个专门的 _stream 方法,在解析嵌入变量的同时生成内容片段。它还处理从组件输出中提取 downloads(文件对象)。

来源: agent/component/llm.py:62-93agent/component/llm.py:118-127

带工具的代理(ReAct 循环)

Agent 组件实现了一个多步骤的推理循环,并支持工具调用:

  • 工具绑定: self._param.tools 中列出的组件会被实例化并绑定到 LLMBundle agent/component/agent_with_tools.py:107-113
  • MCP 集成: 连接到模型上下文协议(MCP)服务器以获取工具元数据,并通过 MCPToolCallSession 执行调用 agent/component/agent_with_tools.py:101-109
  • 回调机制: 使用 _canvas.tool_use_callback 在推理过程中发出中间日志 agent/component/agent_with_tools.py:110

来源: agent/component/agent_with_tools.py:73-113agent/component/agent_with_tools.py:101-109

检索与检索增强生成(RAG)搜索

Retrieval 组件在数据集上执行语义搜索:

  • 引用格式化: 检索到的片段通过 chunks_format 格式化为标准结构,供大语言模型(LLM)使用 rag/prompts/generator.py:40-65
  • 提示构建: kb_prompt 函数将检索到的片段转换为结构化的文本块,包含 ID、标题和内容,用于大语言模型(LLM)上下文 rag/prompts/generator.py:127-161

来源: rag/prompts/generator.py:40-65rag/prompts/generator.py:127-161

分类组件

Categorize 组件在工作流中充当路由器,通过分类用户查询来实现:

  • 提示生成: 根据类别描述和示例动态构建系统提示。
  • 分类: 调用聊天模型来确定类别。
  • 路由:_next 输出设置为与识别类别关联的组件 ID。

来源: agent/component/llm.py:118-127agent/canvas.py:168-193

变量与上下文管理

变量解析

Graph 使用基于正则表达式的替换系统来解析变量 agent/canvas.py:168-193

解析层级:

  1. 系统变量: sys.querysys.user_idsys.conversation_turnssys.files agent/canvas.py:75-80
  2. 组件输出: component_id@variable_name(例如,retrieval_0@chunksagent/canvas.py:199-204
  3. 环境变量: env.VARIABLE_NAME agent/canvas.py:169

来源: agent/canvas.py:168-204

上下文适配

为了保持在大语言模型(LLM)上下文限制内,message_fit_in 会修剪历史记录,同时确保系统提示和最新查询被优先保留 rag/prompts/generator.py:68-125

来源: rag/prompts/generator.py:68-125

取消机制

工作流执行可以通过基于 Redis 的信号异步中止。

  • 触发: 取消逻辑在 Redis 中设置一个取消标志:REDIS_CONN.set(f"{task_id}-cancel", "x") agent/canvas.py:139-140
  • 检查: 执行引擎在组件执行开始时检查 has_canceled(task_id) api/db/services/task_service.py:35
  • 重置: Graph.reset() 清除 Redis 中的取消标志和日志 agent/canvas.py:134-143

来源: agent/canvas.py:134-143api/db/services/task_service.py:35