工作流定义与执行模型
工作流定义与执行模型
相关源文件
以下文件为本维基页面的生成提供了上下文:
api/core/workflow/workflow_entry.pyapi/models/account.pyapi/models/api_based_extension.pyapi/models/dataset.pyapi/models/model.pyapi/models/provider.pyapi/models/source.pyapi/models/task.pyapi/models/tools.pyapi/models/web.pyapi/models/workflow.pyapi/services/plugin/plugin_migration.pyapi/services/plugin/plugin_service.pyapi/services/rag_pipeline/rag_pipeline.pyapi/services/rag_pipeline/rag_pipeline_transform_service.pyapi/services/workflow_draft_variable_service.pyapi/services/workflow_service.pyapi/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.pyapi/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.pyapi/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node.pyapi/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_file_conversion.pyapi/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.pyapi/tests/unit_tests/core/workflow/test_workflow_entry.pyapi/tests/unit_tests/services/plugin/__init__.pyapi/tests/unit_tests/services/plugin/conftest.pyapi/tests/unit_tests/services/plugin/test_dependencies_analysis.pyapi/tests/unit_tests/services/plugin/test_endpoint_service.pyapi/tests/unit_tests/services/plugin/test_plugin_migration.pyapi/tests/unit_tests/services/plugin/test_plugin_service.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.pyapi/tests/unit_tests/services/workflow/test_draft_var_loader_simple.pyapi/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py
目的与范围
本文档描述了 Dify 中的工作流定义与执行模型,涵盖图的静态定义、引擎的运行时编排以及执行状态的持久化。
该架构遵循严格的责任分离原则:
- 静态定义层:
Workflow模型存储图蓝图(节点、边和配置)api/models/workflow.py:147-204。 - 执行引擎层:
GraphEngine编排执行流程,管理节点转换和并行执行api/core/workflow/workflow_entry.py:185-200。 - 运行时状态管理:
VariablePool和GraphRuntimeState在运行期间处理数据流和变量作用域api/core/workflow/workflow_entry.py:84-88。 - 执行记录层:
WorkflowRun和WorkflowNodeExecutionModel持久化执行的结果、指标和日志api/models/workflow.py:581-647、api/models/workflow.py:753-891。
工作流模型:静态定义
Workflow 模型是 SQLAlchemy 实体,代表自动化流程的"蓝图"。它包含图结构和配置,但不持有执行状态。
工作流表结构
| 字段 | 类型 | 描述 |
|---|---|---|
id | UUID | 主键 api/models/workflow.py:186 |
tenant_id | UUID | 多租户的工作区标识符 api/models/workflow.py:187 |
app_id | UUID | 该工作流所属的应用 api/models/workflow.py:188 |
type | WorkflowType | 类型:workflow、chat 或 rag-pipeline api/models/workflow.py:107-114 |
version | String | draft 表示工作副本,或版本号/时间戳 api/models/workflow.py:190 |
graph | LongText | 可视化和逻辑结构(节点/边)api/models/workflow.py:193 |
_features | LongText | 功能特性(如开场白)的 JSON 配置 api/models/workflow.py:194 |
_environment_variables | LongText | 工作流范围内可用的加密变量 api/models/workflow.py:205 |
来源: api/models/workflow.py:147-210
工作流到执行实体的映射
下图将自然语言概念桥接到执行期间使用的具体代码实体。
来源: api/core/workflow/workflow_entry.py:139-192、api/models/workflow.py:147-210、api/models/workflow.py:581-647
GraphEngine 架构
GraphEngine(由 graphon 包提供,在 WorkflowEntry 中初始化)是核心运行时组件,负责遍历图并调用节点。
初始化与执行生命周期
- 实例化:
WorkflowEntry使用Graph对象、GraphRuntimeState和CommandChannel(默认为InMemoryChannel)初始化GraphEngineapi/core/workflow/workflow_entry.py:177-190。 - 状态设置:
VariablePool会填充系统变量、用户输入和环境变量api/core/workflow/workflow_entry.py:151-152。 - 节点创建:
DifyNodeFactory根据图配置和运行时状态实例化节点类api/core/workflow/workflow_entry.py:89-92。 - 执行:引擎根据节点依赖关系处理节点。它支持执行层(如用于速率限制的
LLMQuotaLayer)api/core/workflow/workflow_entry.py:115。
VariablePool 与变量加载
VariablePool 是执行期间所有数据的中央仓库。节点从中读取数据,并将结果写回其中。
- 选择器:通过选择器(如
['node_id', 'output_key'])访问变量。 - VariableLoader:
VariableLoader接口(由DraftVarLoader实现)负责从持久化存储中检索变量api/services/workflow_draft_variable_service.py:80-105。 - 卸载支持:大型变量会存储在外部,并使用
ThreadPoolExecutor加载以减少延迟api/services/workflow_draft_variable_service.py:158-163。
来源: api/core/workflow/workflow_entry.py:139-192、api/services/workflow_draft_variable_service.py:80-163
执行记录与持久化
Dify 通过 WorkflowRun 和 WorkflowNodeExecutionModel 实体跟踪每次工作流执行。
WorkflowRun:执行记录
WorkflowRun 模型存储单次执行的高层状态 api/models/workflow.py:581-647。
- 状态:跟踪运行是
running(运行中)、succeeded(成功)、failed(失败)还是stopped(已停止)api/models/workflow.py:636。 - 图快照:存储执行时图的 JSON 快照
api/models/workflow.py:606-608。 - 指标:记录
total_tokens(总 Token 数)、total_steps(总步骤数)和elapsed_time(耗时)api/models/workflow.py:639-642。
WorkflowNodeExecutionModel:细粒度追踪
运行中执行的每个节点都会创建一个 WorkflowNodeExecutionModel 记录 api/models/workflow.py:753-891。
- 输入/输出:捕获传递给节点和节点返回的数据。大型数据会为数据库截断并卸载到存储中
api/models/workflow.py:794-796。 - 状态:跟踪节点级别的成功或失败
api/models/workflow.py:814。 - 元数据:存储节点特定的元数据,如提供商信息或模型使用情况
api/models/workflow.py:821。
来源: api/models/workflow.py:581-891
数据流图:执行到持久化
下图展示了引擎如何与模型交互以持久化数据。
来源: api/core/workflow/workflow_entry.py:185-200、api/models/workflow.py:753-891、api/services/workflow_draft_variable_service.py:165-180
检索增强生成(RAG)管线集成
工作流也可以表示检索增强生成(RAG)管线。
- RagPipelineService:管理检索增强生成(RAG)管线的生命周期,这些管线是用于文档索引和检索的专门工作流
api/services/rag_pipeline/rag_pipeline.py:96-104。 - 转换:
RagPipelineTransformService可以将传统数据集索引配置转换为基于图的管线api/services/rag_pipeline/rag_pipeline_transform_service.py:31-104。 - 节点类型:这些管线中使用专门的节点(如
knowledge-index)来处理向量存储和检索api/services/rag_pipeline/rag_pipeline_transform_service.py:70-76。
来源: api/services/rag_pipeline/rag_pipeline.py:96-104、api/services/rag_pipeline/rag_pipeline_transform_service.py:30-104