人工输入节点与暂停恢复机制
人工输入节点与暂停-恢复机制
相关源文件
本章引用的主要源码文件:
api/controllers/console/app/agent.pyapi/controllers/console/app/ops_trace.pyapi/controllers/console/app/workflow_trigger.pyapi/controllers/console/datasets/rag_pipeline/datasource_content_preview.pyapi/core/app/apps/advanced_chat/app_generator.pyapi/core/app/apps/advanced_chat/app_runner.pyapi/core/app/apps/advanced_chat/generate_task_pipeline.pyapi/core/app/apps/agent_chat/app_generator.pyapi/core/app/apps/chat/app_generator.pyapi/core/app/apps/completion/app_generator.pyapi/core/app/apps/message_based_app_generator.pyapi/core/app/apps/pipeline/pipeline_runner.pyapi/core/app/apps/workflow/app_generator.pyapi/core/app/apps/workflow/app_runner.pyapi/core/app/apps/workflow/generate_task_pipeline.pyapi/core/app/apps/workflow_app_runner.pyapi/core/app/entities/app_invoke_entities.pyapi/core/app/entities/queue_entities.pyapi/core/app/entities/task_entities.pyapi/core/app/layers/pause_state_persist_layer.pyapi/core/app/layers/trigger_post_layer.pyapi/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.pyapi/core/app/task_pipeline/message_cycle_manager.pyapi/models/types.pyapi/tests/test_containers_integration_tests/controllers/console/app/test_app_apis.pyapi/tests/test_containers_integration_tests/controllers/console/app/test_chat_conversation_status_count_api.pyapi/tests/test_containers_integration_tests/controllers/console/helpers.pyapi/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/test_containers_integration_tests/models/test_types_enum_text.pyapi/tests/test_containers_integration_tests/services/test_dataset_service_update_dataset.pyapi/tests/test_containers_integration_tests/test_workflow_pause_integration.pyapi/tests/unit_tests/controllers/console/app/test_workflow_app_log_api.pyapi/tests/unit_tests/controllers/console/app/test_workflow_trigger_api.pyapi/tests/unit_tests/core/app/apps/chat/test_base_app_runner_multimodal.pyapi/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.pyapi/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/unit_tests/core/app/task_pipeline/test_easy_ui_based_generate_task_pipeline.pyapi/tests/unit_tests/core/app/task_pipeline/test_message_cycle_manager_optimization.pyapi/tests/unit_tests/services/test_workflow_service.pyweb/app/components/workflow/constants.tsweb/app/components/workflow/hooks/use-checklist.tsweb/app/components/workflow/hooks/use-nodes-available-var-list.tsweb/app/components/workflow/nodes/human-input/__tests__/human-input.spec.tsxweb/app/components/workflow/nodes/human-input/__tests__/panel.spec.tsxweb/app/components/workflow/nodes/human-input/components/__tests__/user-action.spec.tsxweb/app/components/workflow/nodes/human-input/components/delivery-method/email-configure-modal.tsxweb/app/components/workflow/nodes/human-input/components/delivery-method/method-item.tsxweb/app/components/workflow/nodes/human-input/components/user-action.tsxweb/app/components/workflow/nodes/human-input/panel.tsxweb/i18n/en-US/workflow.json
目的与范围
本文档描述了 Dify 工作流中的人工输入节点实现,以及底层用于暂停工作流执行并等待外部用户输入的暂停-恢复机制。人工输入节点允许工作流通过动态生成的表单从用户处收集结构化数据,并支持多种投递渠道和可靠的超时处理。
该机制依赖于 GraphEngine 在运行时产生 GraphRunPausedEvent 时暂停执行的能力,并在满足所需外部条件(表单提交)后恢复执行。此状态通过聊天应用和工作流应用中的专用任务管线进行管理。
架构总览
人工输入系统由多个组件协同工作,以实现工作流的暂停和恢复。
系统组件与代码实体
| 自然语言概念 | 代码实体空间 | 文件路径 |
|---|---|---|
| 节点类 | HumanInputNode | api/graphon/nodes/builtin_node_types.py:82-82 |
| 暂停原因 | HumanInputRequired | api/graphon/entities/pause_reason.py:78-78 |
| 工作流状态 | GraphRuntimeState | api/graphon/runtime.py:83-83 |
| 持久化层 | PauseStatePersistenceLayer | api/core/app/layers/pause_state_persist_layer.py:42-42 |
| 任务管线 | WorkflowAppGenerateTaskPipeline | api/core/app/apps/workflow/generate_task_pipeline.py:75-75 |
| 响应类型 | WorkflowPauseStreamResponse | api/core/app/entities/task_entities.py:252-252 |
自然语言到代码实体空间的桥接
来源: api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/entities/pause_reason.py:78-78 api/graphon/graph_engine/command_channels.py:41-41 api/models/workflow.py:70-70 api/graphon/nodes/builtin_node_types.py:82-82
HumanInputNode 实现
HumanInputNode 是一种专用节点类型,它与 GraphEngine 交互以请求暂停当前执行线程。
节点数据结构
节点配置包含表单定义(FormInputConfig)和输入字段。当执行到达该节点时,它会准备表单数据并产生一个暂停事件。
执行与暂停流程
- 暂停信号:节点产生一个
GraphRunPausedEventapi/graphon/graph_events.py:61-61,其中包含HumanInputRequired暂停原因api/graphon/entities/pause_reason.py:78-78。 - 引擎挂起:
GraphEngine捕获此事件,任务管线(例如WorkflowAppGenerateTaskPipeline)捕获QueueWorkflowPausedEventapi/core/app/apps/workflow/generate_task_pipeline.py:38-38。 - 状态持久化:
PauseStatePersistenceLayerapi/core/app/layers/pause_state_persist_layer.py:42-42负责保存当前状态。 - 工作流状态:
WorkflowExecutionStatus更新为PAUSEDapi/graphon/enums.py:79-79。
来源: api/graphon/entities/pause_reason.py:78-78 api/graphon/graph_events.py:61-61 api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/runtime.py:83-83 api/core/app/apps/workflow/generate_task_pipeline.py:153-171
暂停与恢复机制
暂停-恢复机制允许 GraphEngine 被序列化并在之后重建。该机制由 WorkflowAppRunner api/core/app/apps/workflow/app_runner.py:24-24 和 AdvancedChatAppRunner api/core/app/apps/advanced_chat/app_runner.py:54-54 使用。
工作流恢复流程
恢复操作通过使用 graph_runtime_state api/core/app/apps/advanced_chat/app_runner.py:91-91 重新调用应用运行器来触发。
命令通道
Dify 使用 RedisChannel api/graphon/graph_engine/command_channels.py:41-41 与运行中或已暂停的工作流进行通信。当人工输入表单被填写时,会通过此通道发送命令以通知引擎继续执行。此操作由 WorkflowBasedAppRunner api/core/app/apps/workflow_app_runner.py:91-91 处理。
来源: api/core/app/apps/advanced_chat/app_runner.py:122-133 api/graphon/graph_engine/command_channels.py:41-41 api/core/app/apps/workflow_app_runner.py:100-103
状态持久化层
PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责将工作流的"快照"保存到数据库中。
持久化细节
- 配置:使用
PauseStateLayerConfig定义存储行为api/core/app/layers/pause_state_persist_layer.py:42-42。 - 状态快照:
GraphRuntimeStateapi/graphon/runtime.py:83-83被序列化。 - 变量池:所有当前变量都保存在
VariablePoolapi/graphon/runtime.py:83-83中。 - 任务管线:
AdvancedChatAppGenerateTaskPipelineapi/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141和WorkflowAppGenerateTaskPipelineapi/core/app/apps/workflow/generate_task_pipeline.py:75-75都通过GraphRuntimeStateSupport混入类支持GraphRuntimeState。
来源: api/core/app/layers/pause_state_persist_layer.py:42-42 api/graphon/runtime.py:83-83 api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141 api/core/app/apps/workflow/generate_task_pipeline.py:11-11
表单投递与通知
当工作流因等待人工输入而暂停时,Dify 可以触发通知,以确保用户知晓需要其输入。
邮件投递
Dify 包含一个专门用于人工输入邮件投递的任务:dispatch_human_input_email_task api/core/app/apps/workflow_app_runner.py:86-86。该任务在 WorkflowBasedAppRunner 中被导入和使用。
表单过期
系统通过事件处理表单超时:
QueueHumanInputFormTimeoutEvent:在定义的超时时间到期时触发api/core/app/apps/workflow/generate_task_pipeline.py:21-21。NodeRunHumanInputFormTimeoutEvent:引擎级别的事件,指示特定节点已超时api/core/app/apps/workflow_app_runner.py:68-68。QueueHumanInputFormFilledEvent:在用户成功提交表单时触发api/core/app/apps/workflow/generate_task_pipeline.py:20-20。
来源:
api/core/app/apps/workflow/generate_task_pipeline.py:20-21api/core/app/apps/workflow_app_runner.py:68-68api/core/app/apps/workflow_app_runner.py:86-86api/core/app/entities/task_entities.py:91-91