工作流执行与流式
工作流执行与流式传输
相关源文件
以下文件为本维基页面的生成提供了上下文:
agent/canvas.pyagent/component/agent_with_tools.pyagent/component/base.pyagent/component/categorize.pyagent/component/llm.pyagent/tools/base.pyapi/apps/restful_apis/agent_api.pyapi/db/services/canvas_service.pyapi/db/services/conversation_service.pycommon/mcp_tool_call_conn.pydeepdoc/parser/mineru_parser.pyrag/prompts/generator.pytest/testcases/test_http_api/test_session_management/test_agent_completions.pytest/testcases/test_http_api/test_session_management/test_agent_sessions.pytest/testcases/test_http_api/test_session_management/test_chat_completions.pytest/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.pytest/testcases/test_web_api/test_agent_app/test_agents_webhook_unit.pyweb/src/components/embed-container.tsxweb/src/components/embed-dialog/use-show-embed-dialog.tsweb/src/components/home-card.tsxweb/src/components/message-input/next.tsxweb/src/components/next-message-item/index.tsxweb/src/components/ui/accordion.tsxweb/src/components/ui/tree-view.tsxweb/src/hooks/use-agent-request.tsweb/src/hooks/use-send-message.tsweb/src/interfaces/database/agent.tsweb/src/pages/agent/chat/box.tsxweb/src/pages/agent/chat/chat-sheet.tsxweb/src/pages/agent/chat/use-send-agent-message.tsweb/src/pages/agent/constant/chat.tsweb/src/pages/agent/hooks/use-cache-chat-log.tsweb/src/pages/agent/hooks/use-chat-logic.tsweb/src/pages/agent/share/index.tsxweb/src/pages/agent/utils/chat.tsweb/src/pages/agents/agent-card.tsxweb/src/pages/agents/agent-dropdown.tsxweb/src/pages/next-chats/hooks/use-send-shared-message.tsweb/src/pages/next-chats/share/index.tsxweb/src/services/agent-service.ts
本文档介绍了 Canvas 工作流执行引擎,该引擎负责编排异步组件执行,并通过服务器发送事件(SSE)流式传输实时事件。agent/canvas.py 中的 Graph 类实现了一个基于图的执行模型,支持取消操作、变量解析和动态路径规划。
工作流执行总览
图执行逻辑
Graph 类是代理工作流的主要执行引擎。它处理定义了组件及其连接关系的领域特定语言(DSL)。
执行模型:
关键特性:
- 异步生成器: 生成 SSE 事件,用于向前端提供实时更新。
- 路径驱动: 执行
self.path中定义的组件agent/canvas.py:111。 - 变量解析: 在执行过程中,使用正则表达式模式动态解析
{component_id@variable}或{sys.query}等变量agent/canvas.py:168-193。
来源: agent/canvas.py:43-82、agent/canvas.py:134-143、agent/canvas.py:168-193、api/db/services/task_service.py:35
初始化与状态设置
在执行之前,Graph 会加载 DSL 并初始化组件对象:
agent/canvas.py:96-112
初始化步骤:
| 步骤 | 代码实体 | 用途 |
|---|---|---|
| 组件加载 | component_class(name) | 从注册表中动态加载组件和参数类 agent/canvas.py:101-109 |
| 参数校验 | param.check() | 校验组件参数(例如,大语言模型(LLM)ID、阈值)agent/canvas.py:105 |
| 线程池 | ThreadPoolExecutor(max_workers=5) | 管理同步组件方法的执行 agent/canvas.py:93 |
| 变量映射 | self.dsl["globals"] | 存储会话的 sys.* 和 env.* 变量 agent/canvas.py:75-80 |
来源: agent/canvas.py:84-112、agent/component/base.py:57-58
SSE 事件流式传输
事件类型与结构
RAGFlow 以 JSON 对象的形式生成 SSE 事件。在前端,这些事件由 useSendMessageBySSE 等钩子处理 web/src/pages/agent/chat/use-send-agent-message.ts:244。
MessageEventType 中定义的事件类型:
| 事件类型 | 源文件 | 数据载荷 |
|---|---|---|
Message | web/src/pages/agent/chat/use-send-agent-message.ts:47 | content 片段、audio_binary、start_to_think 标志 |
WorkflowFinished | web/src/pages/agent/chat/use-send-agent-message.ts:84 | 最终的聚合输出、附件和下载项 |
UserInputs | web/src/pages/agent/chat/use-send-agent-message.ts:97 | 在工作流执行期间触发用于手动用户输入的 UI 表单 |
MessageEnd | web/src/pages/agent/chat/use-send-agent-message.ts:150 | reference(检索到的片段/文档) |
来源: web/src/pages/agent/chat/use-send-agent-message.ts:45-108、web/src/pages/agent/chat/use-send-agent-message.ts:148-163
前端事件处理
前端将原始的 SSE 流解析为结构化的 JSON 事件,以更新 UI 状态。
SSE 数据流:
关键前端工具函数:
findMessageFromList:将多个Message事件聚合为单个内容字符串,专门处理<think>标签以支持 DeepSeek 等推理模型web/src/pages/agent/chat/use-send-agent-message.ts:45-93。getLatestError:从组件输出的_ERROR字段中提取特定的错误消息web/src/pages/agent/chat/use-send-agent-message.ts:110-118。
来源: web/src/pages/agent/chat/use-send-agent-message.ts:45-118、web/src/pages/agent/chat/box.tsx:24-39
组件执行详解
大语言模型(LLM)与消息流式传输
LLM 和 Message 组件负责内容生成和输出格式化。
- LLM 组件: 通过
LLMBundle管理聊天补全agent/component/llm.py:89。它通过LLMParam.gen_conf()生成temperature和top_p等模型参数agent/component/llm.py:62-80。 - Message 组件: 实现了一个专门的
_stream方法,在解析嵌入变量的同时生成内容片段。它还处理从组件输出中提取downloads(文件对象)。
来源: agent/component/llm.py:62-93、agent/component/llm.py:118-127
带工具的代理(ReAct 循环)
Agent 组件实现了一个多步骤的推理循环,并支持工具调用:
- 工具绑定:
self._param.tools中列出的组件会被实例化并绑定到LLMBundleagent/component/agent_with_tools.py:107-113。 - MCP 集成: 连接到模型上下文协议(MCP)服务器以获取工具元数据,并通过
MCPToolCallSession执行调用agent/component/agent_with_tools.py:101-109。 - 回调机制: 使用
_canvas.tool_use_callback在推理过程中发出中间日志agent/component/agent_with_tools.py:110。
来源: agent/component/agent_with_tools.py:73-113、agent/component/agent_with_tools.py:101-109
检索与检索增强生成(RAG)搜索
Retrieval 组件在数据集上执行语义搜索:
- 引用格式化: 检索到的片段通过
chunks_format格式化为标准结构,供大语言模型(LLM)使用rag/prompts/generator.py:40-65。 - 提示构建:
kb_prompt函数将检索到的片段转换为结构化的文本块,包含 ID、标题和内容,用于大语言模型(LLM)上下文rag/prompts/generator.py:127-161。
来源: rag/prompts/generator.py:40-65、rag/prompts/generator.py:127-161
分类组件
Categorize 组件在工作流中充当路由器,通过分类用户查询来实现:
- 提示生成: 根据类别描述和示例动态构建系统提示。
- 分类: 调用聊天模型来确定类别。
- 路由: 将
_next输出设置为与识别类别关联的组件 ID。
来源: agent/component/llm.py:118-127、agent/canvas.py:168-193
变量与上下文管理
变量解析
Graph 使用基于正则表达式的替换系统来解析变量 agent/canvas.py:168-193。
解析层级:
- 系统变量:
sys.query、sys.user_id、sys.conversation_turns、sys.filesagent/canvas.py:75-80。 - 组件输出:
component_id@variable_name(例如,retrieval_0@chunks)agent/canvas.py:199-204。 - 环境变量:
env.VARIABLE_NAMEagent/canvas.py:169。
来源: agent/canvas.py:168-204
上下文适配
为了保持在大语言模型(LLM)上下文限制内,message_fit_in 会修剪历史记录,同时确保系统提示和最新查询被优先保留 rag/prompts/generator.py:68-125。
来源: rag/prompts/generator.py:68-125
取消机制
工作流执行可以通过基于 Redis 的信号异步中止。
- 触发: 取消逻辑在 Redis 中设置一个取消标志:
REDIS_CONN.set(f"{task_id}-cancel", "x")agent/canvas.py:139-140。 - 检查: 执行引擎在组件执行开始时检查
has_canceled(task_id)api/db/services/task_service.py:35。 - 重置:
Graph.reset()清除 Redis 中的取消标志和日志agent/canvas.py:134-143。
来源: agent/canvas.py:134-143、api/db/services/task_service.py:35