agentic_huge_data_base / wiki
页面 Dify · 5 工作流引擎与节点执行·DeepWiki 中文全文译文

5 · 工作流引擎与节点执行(Workflow Engine and Node Execution)

应用编排与外部知识接入 · 聚焦本章的模块关系、源码依据与实现要点。

项目Dify 章节5 状态全文译文 模块系统架构、测试、发布与运维、接口与服务契约、配置治理
源码线索
  • api/.importlinter
  • api/core/app/file_access/__init__.py
  • api/core/app/file_access/controller.py
  • api/core/app/file_access/scope.py
  • api/core/workflow/node_factory.py
  • api/core/workflow/node_runtime.py
  • api/core/workflow/workflow_entry.py
  • api/services/plugin/plugin_migration.py
  • api/services/plugin/plugin_service.py
  • api/services/rag_pipeline/rag_pipeline.py
模块标签
  • 系统架构
  • 测试、发布与运维
  • 接口与服务契约
  • 配置治理
  • 图谱与关系

章节正文

工作流引擎与节点执行

工作流引擎与节点执行

相关源文件

以下文件为本 Wiki 页面的上下文来源:

  • api/.importlinter
  • api/core/app/file_access/__init__.py
  • api/core/app/file_access/controller.py
  • api/core/app/file_access/scope.py
  • api/core/workflow/node_factory.py
  • api/core/workflow/node_runtime.py
  • api/core/workflow/workflow_entry.py
  • api/services/plugin/plugin_migration.py
  • api/services/plugin/plugin_service.py
  • api/services/rag_pipeline/rag_pipeline.py
  • api/services/rag_pipeline/rag_pipeline_transform_service.py
  • api/services/workflow_draft_variable_service.py
  • api/services/workflow_service.py
  • api/tests/integration_tests/workflow/nodes/test_code.py
  • api/tests/integration_tests/workflow/nodes/test_http.py
  • api/tests/integration_tests/workflow/nodes/test_llm.py
  • api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py
  • api/tests/integration_tests/workflow/nodes/test_template_transform.py
  • api/tests/integration_tests/workflow/nodes/test_tool.py
  • api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py
  • api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py
  • api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node.py
  • api/tests/unit_tests/core/rag/datasource/test_retrieval_attachment_access.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_mock_factory.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py
  • api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py
  • api/tests/unit_tests/core/workflow/nodes/code/__init__.py
  • api/tests/unit_tests/core/workflow/nodes/code/code_node_spec.py
  • api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py
  • api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py
  • api/tests/unit_tests/core/workflow/nodes/template_transform/__init__.py
  • api/tests/unit_tests/core/workflow/nodes/template_transform/template_transform_node_spec.py
  • api/tests/unit_tests/core/workflow/nodes/test_if_else.py
  • api/tests/unit_tests/core/workflow/nodes/test_list_operator.py
  • api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py
  • api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_file_conversion.py
  • api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.py
  • api/tests/unit_tests/core/workflow/test_node_factory.py
  • api/tests/unit_tests/core/workflow/test_node_runtime.py
  • api/tests/unit_tests/core/workflow/test_workflow_entry.py
  • api/tests/unit_tests/services/plugin/__init__.py
  • api/tests/unit_tests/services/plugin/conftest.py
  • api/tests/unit_tests/services/plugin/test_dependencies_analysis.py
  • api/tests/unit_tests/services/plugin/test_endpoint_service.py
  • api/tests/unit_tests/services/plugin/test_plugin_migration.py
  • api/tests/unit_tests/services/plugin/test_plugin_service.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.py
  • api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py
  • api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py

目的与范围

本文档描述了 Dify 中的工作流执行引擎,该引擎通过基于节点的执行模型来编排基于图的应用程序逻辑。引擎处理定义为有向无环图(DAG)的工作流,其中节点代表操作(大语言模型调用、HTTP 请求、代码执行等),边定义了数据流。

本页面涵盖以下内容:

  • 工作流定义与生命周期管理
  • 节点类型及其执行语义
  • 变量管理与节点间的数据传递
  • 执行追踪与状态持久化
  • 节点工厂模式与依赖注入

有关底层执行逻辑的详细信息,请参见工作流定义与执行模型

架构总览

工作流执行架构
Dify · 工作流执行架构 · 图 1
Dify · 工作流执行架构 · 图 1

来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/services/rag_pipeline/rag_pipeline.py:44-55, api/core/workflow/node_factory.py:15-18

工作流引擎采用分层架构:

  1. 入口层 - WorkflowEntry 初始化 GraphEngine 并管理用于子图的 _WorkflowChildEngineBuilder api/core/workflow/workflow_entry.py:48-82
  2. 模型层 - Workflow 存储图定义,WorkflowRun 追踪执行过程,WorkflowNodeExecutionModel 记录节点级别的结果 api/models/workflow.py:76-85, api/services/workflow_service.py:114-131
  3. 图引擎 - 编排逻辑位于 graphon 包中,通过 GraphEngineGraphRuntimeState 管理节点调度与状态 api/core/workflow/workflow_entry.py:33-41
  4. 节点层 - 具体的节点实现执行业务逻辑,通过 DifyNodeFactory 实例化,该工厂使用 resolve_workflow_node_class 解析类 api/core/workflow/node_factory.py:123-134

工作流定义与生命周期

工作流模型结构

Workflow 模型存储完整的图定义。关键字段包括版本控制系统,其中 VERSION_DRAFT 用于编辑器环境 api/services/workflow_service.py:134-161

图配置结构

图定义包含节点和边的数组。DifyNodeFactory 使用 GraphInitParams 在执行期间保存此配置 api/core/workflow/node_factory.py:72-93。节点通过 get_node_type_classes_mapping 映射到类,该函数会执行所有节点模块的副作用导入 api/core/workflow/node_factory.py:105-120

工作流生命周期状态

工作流存在草稿版本或已发布版本。WorkflowService 管理生命周期,包括检查工作流是否存在以及获取相应版本 api/services/workflow_service.py:133-172

节点类型与执行

节点工厂模式

DifyNodeFactory 负责节点实例化,注入执行所需的 GraphInitParamsGraphRuntimeState。它还提供 DifyPreparedLLMDifyToolNodeRuntime 以满足节点特定需求 api/core/workflow/node_factory.py:24-32

Dify · 节点工厂模式 · 图 2
Dify · 节点工厂模式 · 图 2

来源: api/core/workflow/node_factory.py:24-54, api/core/workflow/workflow_entry.py:89-92

核心节点类型
节点类型用途关键依赖
大语言模型LLMNode模型推理ModelInstance, LLMFileSaver api/tests/integration_tests/workflow/nodes/test_llm.py:78-89
HTTP 请求HttpRequestNode外部 API 调用ssrf_proxy, HttpRequestNodeConfig api/tests/integration_tests/workflow/nodes/test_http.py:77-87
代码CodeNodePython/JS 执行CodeExecutor, CodeNodeLimits api/core/workflow/node_factory.py:14-17
知识库KnowledgeRetrievalNode检索增强生成(RAG)查询RagPipelineService api/services/rag_pipeline/rag_pipeline.py:96-104
人工输入HumanInputNode用户交互DifyHumanInputNodeRuntime api/core/workflow/node_factory.py:26-34

有关节点实例化的详细信息,请参见节点工厂与依赖注入

节点执行流程

节点在 GraphEngine 内执行。像 RAG 这样的专用管线使用 RagPipelineTransformService 将标准数据集转换为基于工作流的执行图 api/services/rag_pipeline/rag_pipeline_transform_service.py:31-81

  • 大语言模型节点实现: 使用 DifyPromptMessageSerializer 为不同的模型模式格式化消息 api/tests/integration_tests/workflow/nodes/test_llm.py:72-75。请参见大语言模型节点实现
  • HTTP 和代码节点: HttpRequestNode 使用 Executor 处理复杂的请求体数据,包括嵌套的对象变量 api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py:59-77。请参见HTTP 请求与代码执行节点
  • 知识库检索: KnowledgeRetrievalNode(通常是 rag-pipeline 工作流类型的一部分)管理检索设置和索引技术 api/services/rag_pipeline/rag_pipeline_transform_service.py:70-77。请参见知识库检索节点
  • 人工输入: 通过 DifyHumanInputNodeRuntime 处理,支持交付渠道配置 api/services/workflow_service.py:21-25。请参见人工输入节点与暂停-恢复机制

变量管理与数据流

VariablePool 架构

VariablePool 是执行数据的中央仓库。在工作流入口过程中,它会使用系统变量和用户输入进行初始化 api/services/rag_pipeline/rag_pipeline.py:90-93

变量解析与加载

VariableLoader 的实现(例如 DraftVarLoader)负责从数据库检索变量值。它支持多线程加载,用于存储在外部存储中的大型变量 api/services/workflow_draft_variable_service.py:80-163

Dify · 变量解析与加载 · 图 3
Dify · 变量解析与加载 · 图 3

来源: api/services/workflow_draft_variable_service.py:110-163, api/core/workflow/variable_pool_initializer.py:25-38

执行追踪与状态

WorkflowRun 和 NodeExecution
  • WorkflowRun: 追踪工作流实例的整体执行过程 api/services/rag_pipeline/rag_pipeline.py:104
  • WorkflowNodeExecutionModel: 记录节点的具体执行情况,包括其状态、输入和输出 api/services/workflow_service.py:114-131

WorkflowService 提供 get_node_last_run 方法来检索历史执行数据,这对于工作流编辑器中的"从节点运行"功能至关重要 api/services/workflow_service.py:114-131

测试与模拟系统

Dify 结合使用集成测试和模拟工厂来验证工作流逻辑。MockNodeFactory(在子页面中引用)允许对复杂图进行表驱动测试,而无需实际的大语言模型或 HTTP 调用。

有关详细信息,请参见工作流测试与模拟系统

子系统总结

来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/core/workflow/node_factory.py:105-134