工作流引擎与节点执行
工作流引擎与节点执行
相关源文件
以下文件为本 Wiki 页面的上下文来源:
api/.importlinterapi/core/app/file_access/__init__.pyapi/core/app/file_access/controller.pyapi/core/app/file_access/scope.pyapi/core/workflow/node_factory.pyapi/core/workflow/node_runtime.pyapi/core/workflow/workflow_entry.pyapi/services/plugin/plugin_migration.pyapi/services/plugin/plugin_service.pyapi/services/rag_pipeline/rag_pipeline.pyapi/services/rag_pipeline/rag_pipeline_transform_service.pyapi/services/workflow_draft_variable_service.pyapi/services/workflow_service.pyapi/tests/integration_tests/workflow/nodes/test_code.pyapi/tests/integration_tests/workflow/nodes/test_http.pyapi/tests/integration_tests/workflow/nodes/test_llm.pyapi/tests/integration_tests/workflow/nodes/test_parameter_extractor.pyapi/tests/integration_tests/workflow/nodes/test_template_transform.pyapi/tests/integration_tests/workflow/nodes/test_tool.pyapi/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.pyapi/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.pyapi/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node.pyapi/tests/unit_tests/core/rag/datasource/test_retrieval_attachment_access.pyapi/tests/unit_tests/core/workflow/graph_engine/test_mock_factory.pyapi/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.pyapi/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.pyapi/tests/unit_tests/core/workflow/nodes/answer/test_answer.pyapi/tests/unit_tests/core/workflow/nodes/code/__init__.pyapi/tests/unit_tests/core/workflow/nodes/code/code_node_spec.pyapi/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.pyapi/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.pyapi/tests/unit_tests/core/workflow/nodes/template_transform/__init__.pyapi/tests/unit_tests/core/workflow/nodes/template_transform/template_transform_node_spec.pyapi/tests/unit_tests/core/workflow/nodes/test_if_else.pyapi/tests/unit_tests/core/workflow/nodes/test_list_operator.pyapi/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.pyapi/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_file_conversion.pyapi/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.pyapi/tests/unit_tests/core/workflow/test_node_factory.pyapi/tests/unit_tests/core/workflow/test_node_runtime.pyapi/tests/unit_tests/core/workflow/test_workflow_entry.pyapi/tests/unit_tests/services/plugin/__init__.pyapi/tests/unit_tests/services/plugin/conftest.pyapi/tests/unit_tests/services/plugin/test_dependencies_analysis.pyapi/tests/unit_tests/services/plugin/test_endpoint_service.pyapi/tests/unit_tests/services/plugin/test_plugin_migration.pyapi/tests/unit_tests/services/plugin/test_plugin_service.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.pyapi/tests/unit_tests/services/workflow/test_draft_var_loader_simple.pyapi/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py
目的与范围
本文档描述了 Dify 中的工作流执行引擎,该引擎通过基于节点的执行模型来编排基于图的应用程序逻辑。引擎处理定义为有向无环图(DAG)的工作流,其中节点代表操作(大语言模型调用、HTTP 请求、代码执行等),边定义了数据流。
本页面涵盖以下内容:
- 工作流定义与生命周期管理
- 节点类型及其执行语义
- 变量管理与节点间的数据传递
- 执行追踪与状态持久化
- 节点工厂模式与依赖注入
有关底层执行逻辑的详细信息,请参见工作流定义与执行模型。
架构总览
工作流执行架构
来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/services/rag_pipeline/rag_pipeline.py:44-55, api/core/workflow/node_factory.py:15-18
工作流引擎采用分层架构:
- 入口层 -
WorkflowEntry初始化GraphEngine并管理用于子图的_WorkflowChildEngineBuilderapi/core/workflow/workflow_entry.py:48-82。 - 模型层 -
Workflow存储图定义,WorkflowRun追踪执行过程,WorkflowNodeExecutionModel记录节点级别的结果api/models/workflow.py:76-85,api/services/workflow_service.py:114-131。 - 图引擎 - 编排逻辑位于
graphon包中,通过GraphEngine和GraphRuntimeState管理节点调度与状态api/core/workflow/workflow_entry.py:33-41。 - 节点层 - 具体的节点实现执行业务逻辑,通过
DifyNodeFactory实例化,该工厂使用resolve_workflow_node_class解析类api/core/workflow/node_factory.py:123-134。
工作流定义与生命周期
工作流模型结构
Workflow 模型存储完整的图定义。关键字段包括版本控制系统,其中 VERSION_DRAFT 用于编辑器环境 api/services/workflow_service.py:134-161。
图配置结构
图定义包含节点和边的数组。DifyNodeFactory 使用 GraphInitParams 在执行期间保存此配置 api/core/workflow/node_factory.py:72-93。节点通过 get_node_type_classes_mapping 映射到类,该函数会执行所有节点模块的副作用导入 api/core/workflow/node_factory.py:105-120。
工作流生命周期状态
工作流存在草稿版本或已发布版本。WorkflowService 管理生命周期,包括检查工作流是否存在以及获取相应版本 api/services/workflow_service.py:133-172。
节点类型与执行
节点工厂模式
DifyNodeFactory 负责节点实例化,注入执行所需的 GraphInitParams 和 GraphRuntimeState。它还提供 DifyPreparedLLM 和 DifyToolNodeRuntime 以满足节点特定需求 api/core/workflow/node_factory.py:24-32。
来源: api/core/workflow/node_factory.py:24-54, api/core/workflow/workflow_entry.py:89-92
核心节点类型
| 节点类型 | 类 | 用途 | 关键依赖 |
|---|---|---|---|
| 大语言模型 | LLMNode | 模型推理 | ModelInstance, LLMFileSaver api/tests/integration_tests/workflow/nodes/test_llm.py:78-89 |
| HTTP 请求 | HttpRequestNode | 外部 API 调用 | ssrf_proxy, HttpRequestNodeConfig api/tests/integration_tests/workflow/nodes/test_http.py:77-87 |
| 代码 | CodeNode | Python/JS 执行 | CodeExecutor, CodeNodeLimits api/core/workflow/node_factory.py:14-17 |
| 知识库 | KnowledgeRetrievalNode | 检索增强生成(RAG)查询 | RagPipelineService api/services/rag_pipeline/rag_pipeline.py:96-104 |
| 人工输入 | HumanInputNode | 用户交互 | DifyHumanInputNodeRuntime api/core/workflow/node_factory.py:26-34 |
有关节点实例化的详细信息,请参见节点工厂与依赖注入。
节点执行流程
节点在 GraphEngine 内执行。像 RAG 这样的专用管线使用 RagPipelineTransformService 将标准数据集转换为基于工作流的执行图 api/services/rag_pipeline/rag_pipeline_transform_service.py:31-81。
- 大语言模型节点实现: 使用
DifyPromptMessageSerializer为不同的模型模式格式化消息api/tests/integration_tests/workflow/nodes/test_llm.py:72-75。请参见大语言模型节点实现。 - HTTP 和代码节点:
HttpRequestNode使用Executor处理复杂的请求体数据,包括嵌套的对象变量api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py:59-77。请参见HTTP 请求与代码执行节点。 - 知识库检索:
KnowledgeRetrievalNode(通常是rag-pipeline工作流类型的一部分)管理检索设置和索引技术api/services/rag_pipeline/rag_pipeline_transform_service.py:70-77。请参见知识库检索节点。 - 人工输入: 通过
DifyHumanInputNodeRuntime处理,支持交付渠道配置api/services/workflow_service.py:21-25。请参见人工输入节点与暂停-恢复机制。
变量管理与数据流
VariablePool 架构
VariablePool 是执行数据的中央仓库。在工作流入口过程中,它会使用系统变量和用户输入进行初始化 api/services/rag_pipeline/rag_pipeline.py:90-93。
变量解析与加载
VariableLoader 的实现(例如 DraftVarLoader)负责从数据库检索变量值。它支持多线程加载,用于存储在外部存储中的大型变量 api/services/workflow_draft_variable_service.py:80-163。
来源: api/services/workflow_draft_variable_service.py:110-163, api/core/workflow/variable_pool_initializer.py:25-38
执行追踪与状态
WorkflowRun 和 NodeExecution
- WorkflowRun: 追踪工作流实例的整体执行过程
api/services/rag_pipeline/rag_pipeline.py:104。 - WorkflowNodeExecutionModel: 记录节点的具体执行情况,包括其状态、输入和输出
api/services/workflow_service.py:114-131。
WorkflowService 提供 get_node_last_run 方法来检索历史执行数据,这对于工作流编辑器中的"从节点运行"功能至关重要 api/services/workflow_service.py:114-131。
测试与模拟系统
Dify 结合使用集成测试和模拟工厂来验证工作流逻辑。MockNodeFactory(在子页面中引用)允许对复杂图进行表驱动测试,而无需实际的大语言模型或 HTTP 调用。
有关详细信息,请参见工作流测试与模拟系统。
子系统总结
- 工作流定义与执行模型 — 核心引擎和图逻辑。
- 节点工厂与依赖注入 — 如何通过
DifyNodeFactory构建和配置节点。 - 大语言模型节点实现 — 模型推理、使用追踪和提示词处理。
- HTTP 请求与代码执行节点 — 通过 SSRF 代理和沙箱代码执行实现外部集成。
- 知识库检索节点 — 工作流中的 RAG 集成与管线转换。
- 人工输入节点与暂停-恢复机制 — 人机交互与表单交付。
- 工作流测试与模拟系统 — 自动化工作流测试和变量模拟的基础设施。
来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/core/workflow/node_factory.py:105-134