agentic_huge_data_base / wiki
页面 Dify · 5.6 人工输入节点与暂停恢复机制·DeepWiki 中文全文译文

5.6 · 人工输入节点与暂停恢复机制(Human Input Node and Pause-Resume Mechanism)

应用编排与外部知识接入 · 聚焦本章的模块关系、源码依据与实现要点。

项目Dify 章节5.6 状态全文译文 模块工作流与编排、系统架构、界面与交互、评测、反馈与人工复核
源码线索
  • api/controllers/console/app/agent.py
  • api/controllers/console/app/ops_trace.py
  • api/controllers/console/app/workflow_trigger.py
  • api/controllers/console/datasets/rag_pipeline/datasource_content_preview.py
  • api/core/app/apps/advanced_chat/app_generator.py
  • api/core/app/apps/advanced_chat/app_runner.py
  • api/core/app/apps/advanced_chat/generate_task_pipeline.py
  • api/core/app/apps/agent_chat/app_generator.py
  • api/core/app/apps/chat/app_generator.py
  • api/core/app/apps/completion/app_generator.py
模块标签
  • 工作流与编排
  • 系统架构
  • 界面与交互
  • 评测、反馈与人工复核
  • 图谱与关系

章节正文

人工输入节点与暂停恢复机制

人工输入节点与暂停-恢复机制

相关源文件

本章引用的主要源码文件:

  • api/controllers/console/app/agent.py
  • api/controllers/console/app/ops_trace.py
  • api/controllers/console/app/workflow_trigger.py
  • api/controllers/console/datasets/rag_pipeline/datasource_content_preview.py
  • api/core/app/apps/advanced_chat/app_generator.py
  • api/core/app/apps/advanced_chat/app_runner.py
  • api/core/app/apps/advanced_chat/generate_task_pipeline.py
  • api/core/app/apps/agent_chat/app_generator.py
  • api/core/app/apps/chat/app_generator.py
  • api/core/app/apps/completion/app_generator.py
  • api/core/app/apps/message_based_app_generator.py
  • api/core/app/apps/pipeline/pipeline_runner.py
  • api/core/app/apps/workflow/app_generator.py
  • api/core/app/apps/workflow/app_runner.py
  • api/core/app/apps/workflow/generate_task_pipeline.py
  • api/core/app/apps/workflow_app_runner.py
  • api/core/app/entities/app_invoke_entities.py
  • api/core/app/entities/queue_entities.py
  • api/core/app/entities/task_entities.py
  • api/core/app/layers/pause_state_persist_layer.py
  • api/core/app/layers/trigger_post_layer.py
  • api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
  • api/core/app/task_pipeline/message_cycle_manager.py
  • api/models/types.py
  • api/tests/test_containers_integration_tests/controllers/console/app/test_app_apis.py
  • api/tests/test_containers_integration_tests/controllers/console/app/test_chat_conversation_status_count_api.py
  • api/tests/test_containers_integration_tests/controllers/console/helpers.py
  • api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py
  • api/tests/test_containers_integration_tests/models/test_types_enum_text.py
  • api/tests/test_containers_integration_tests/services/test_dataset_service_update_dataset.py
  • api/tests/test_containers_integration_tests/test_workflow_pause_integration.py
  • api/tests/unit_tests/controllers/console/app/test_workflow_app_log_api.py
  • api/tests/unit_tests/controllers/console/app/test_workflow_trigger_api.py
  • api/tests/unit_tests/core/app/apps/chat/test_base_app_runner_multimodal.py
  • api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py
  • api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py
  • api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_based_generate_task_pipeline.py
  • api/tests/unit_tests/core/app/task_pipeline/test_message_cycle_manager_optimization.py
  • api/tests/unit_tests/services/test_workflow_service.py
  • web/app/components/workflow/constants.ts
  • web/app/components/workflow/hooks/use-checklist.ts
  • web/app/components/workflow/hooks/use-nodes-available-var-list.ts
  • web/app/components/workflow/nodes/human-input/__tests__/human-input.spec.tsx
  • web/app/components/workflow/nodes/human-input/__tests__/panel.spec.tsx
  • web/app/components/workflow/nodes/human-input/components/__tests__/user-action.spec.tsx
  • web/app/components/workflow/nodes/human-input/components/delivery-method/email-configure-modal.tsx
  • web/app/components/workflow/nodes/human-input/components/delivery-method/method-item.tsx
  • web/app/components/workflow/nodes/human-input/components/user-action.tsx
  • web/app/components/workflow/nodes/human-input/panel.tsx
  • web/i18n/en-US/workflow.json

目的与范围

本文档描述了 Dify 工作流中的人工输入节点实现,以及底层用于暂停工作流执行并等待外部用户输入的暂停-恢复机制。人工输入节点允许工作流通过动态生成的表单从用户处收集结构化数据,并支持多种投递渠道和可靠的超时处理。

该机制依赖于 GraphEngine 在运行时产生 GraphRunPausedEvent 时暂停执行的能力,并在满足所需外部条件(表单提交)后恢复执行。此状态通过聊天应用和工作流应用中的专用任务管线进行管理。

架构总览

人工输入系统由多个组件协同工作,以实现工作流的暂停和恢复。

系统组件与代码实体
自然语言概念代码实体空间文件路径
节点类HumanInputNodeapi/graphon/nodes/builtin_node_types.py:82-82
暂停原因HumanInputRequiredapi/graphon/entities/pause_reason.py:78-78
工作流状态GraphRuntimeStateapi/graphon/runtime.py:83-83
持久化层PauseStatePersistenceLayerapi/core/app/layers/pause_state_persist_layer.py:42-42
任务管线WorkflowAppGenerateTaskPipelineapi/core/app/apps/workflow/generate_task_pipeline.py:75-75
响应类型WorkflowPauseStreamResponseapi/core/app/entities/task_entities.py:252-252
自然语言到代码实体空间的桥接
Dify · 自然语言到代码实体空间的桥接 · 图 1
Dify · 自然语言到代码实体空间的桥接 · 图 1

来源: api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/entities/pause_reason.py:78-78 api/graphon/graph_engine/command_channels.py:41-41 api/models/workflow.py:70-70 api/graphon/nodes/builtin_node_types.py:82-82

HumanInputNode 实现

HumanInputNode 是一种专用节点类型,它与 GraphEngine 交互以请求暂停当前执行线程。

节点数据结构

节点配置包含表单定义(FormInputConfig)和输入字段。当执行到达该节点时,它会准备表单数据并产生一个暂停事件。

Dify · 节点数据结构 · 图 2
Dify · 节点数据结构 · 图 2
执行与暂停流程
  1. 暂停信号:节点产生一个 GraphRunPausedEvent api/graphon/graph_events.py:61-61,其中包含 HumanInputRequired 暂停原因 api/graphon/entities/pause_reason.py:78-78
  2. 引擎挂起GraphEngine 捕获此事件,任务管线(例如 WorkflowAppGenerateTaskPipeline)捕获 QueueWorkflowPausedEvent api/core/app/apps/workflow/generate_task_pipeline.py:38-38
  3. 状态持久化PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责保存当前状态。
  4. 工作流状态WorkflowExecutionStatus 更新为 PAUSED api/graphon/enums.py:79-79

来源: api/graphon/entities/pause_reason.py:78-78 api/graphon/graph_events.py:61-61 api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/runtime.py:83-83 api/core/app/apps/workflow/generate_task_pipeline.py:153-171

暂停与恢复机制

暂停-恢复机制允许 GraphEngine 被序列化并在之后重建。该机制由 WorkflowAppRunner api/core/app/apps/workflow/app_runner.py:24-24AdvancedChatAppRunner api/core/app/apps/advanced_chat/app_runner.py:54-54 使用。

工作流恢复流程

恢复操作通过使用 graph_runtime_state api/core/app/apps/advanced_chat/app_runner.py:91-91 重新调用应用运行器来触发。

Dify · 工作流恢复流程 · 图 3
Dify · 工作流恢复流程 · 图 3
命令通道

Dify 使用 RedisChannel api/graphon/graph_engine/command_channels.py:41-41 与运行中或已暂停的工作流进行通信。当人工输入表单被填写时,会通过此通道发送命令以通知引擎继续执行。此操作由 WorkflowBasedAppRunner api/core/app/apps/workflow_app_runner.py:91-91 处理。

来源: api/core/app/apps/advanced_chat/app_runner.py:122-133 api/graphon/graph_engine/command_channels.py:41-41 api/core/app/apps/workflow_app_runner.py:100-103

状态持久化层

PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责将工作流的"快照"保存到数据库中。

持久化细节
  • 配置:使用 PauseStateLayerConfig 定义存储行为 api/core/app/layers/pause_state_persist_layer.py:42-42
  • 状态快照GraphRuntimeState api/graphon/runtime.py:83-83 被序列化。
  • 变量池:所有当前变量都保存在 VariablePool api/graphon/runtime.py:83-83 中。
  • 任务管线AdvancedChatAppGenerateTaskPipeline api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141WorkflowAppGenerateTaskPipeline api/core/app/apps/workflow/generate_task_pipeline.py:75-75 都通过 GraphRuntimeStateSupport 混入类支持 GraphRuntimeState

来源: api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/runtime.py:83-83 api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141 api/core/app/apps/workflow/generate_task_pipeline.py:11-11

表单投递与通知

当工作流因等待人工输入而暂停时,Dify 可以触发通知,以确保用户知晓需要其输入。

邮件投递

Dify 包含一个专门用于人工输入邮件投递的任务:dispatch_human_input_email_task api/core/app/apps/workflow_app_runner.py:86-86。该任务在 WorkflowBasedAppRunner 中被导入和使用。

表单过期

系统通过事件处理表单超时:

  • QueueHumanInputFormTimeoutEvent:在定义的超时时间到期时触发 api/core/app/apps/workflow/generate_task_pipeline.py:21-21
  • NodeRunHumanInputFormTimeoutEvent:引擎级别的事件,指示特定节点已超时 api/core/app/apps/workflow_app_runner.py:68-68
  • QueueHumanInputFormFilledEvent:在用户成功提交表单时触发 api/core/app/apps/workflow/generate_task_pipeline.py:20-20

来源:

  • api/core/app/apps/workflow/generate_task_pipeline.py:20-21
  • api/core/app/apps/workflow_app_runner.py:68-68
  • api/core/app/apps/workflow_app_runner.py:86-86
  • api/core/app/entities/task_entities.py:91-91