agentic_huge_data_base / wiki
页面 Dify · 4.7 RAG 管线工作流·DeepWiki 中文全文译文

4.7 · RAG 管线工作流(RAG Pipeline Workflow)

应用编排与外部知识接入 · 聚焦本章的模块关系、源码依据与实现要点。

项目Dify 章节4.7 状态全文译文 模块图谱与关系、工作流与编排、测试、发布与运维、配置治理
源码线索
  • api/controllers/console/app/app_import.py
  • api/controllers/console/app/workflow_draft_variable.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_datasets.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py
  • api/controllers/inner_api/app/dsl.py
  • api/core/db/__init__.py
  • api/core/db/session_factory.py
  • api/events/event_handlers/sync_workflow_schedule_when_app_published.py
模块标签
  • 图谱与关系
  • 工作流与编排
  • 测试、发布与运维
  • 配置治理
  • 系统架构

章节正文

RAG 管线工作流

检索增强生成(RAG)管线工作流

相关源文件

本章引用的主要源码文件:

  • api/controllers/console/app/app_import.py
  • api/controllers/console/app/workflow_draft_variable.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_datasets.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py
  • api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py
  • api/controllers/inner_api/app/dsl.py
  • api/core/db/__init__.py
  • api/core/db/session_factory.py
  • api/events/event_handlers/sync_workflow_schedule_when_app_published.py
  • api/events/event_handlers/update_app_triggers_when_app_published_workflow_updated.py
  • api/extensions/ext_session_factory.py
  • api/services/entities/dsl_entities.py
  • api/services/plugin/plugin_migration.py
  • api/services/plugin/plugin_service.py
  • api/services/rag_pipeline/rag_pipeline.py
  • api/services/rag_pipeline/rag_pipeline_transform_service.py
  • api/services/workflow_draft_variable_service.py
  • api/services/workflow_service.py
  • api/tests/test_containers_integration_tests/controllers/console/app/test_app_import_api.py
  • api/tests/test_containers_integration_tests/services/test_schedule_service.py
  • api/tests/unit_tests/controllers/console/app/test_app_import_api.py
  • api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py
  • api/tests/unit_tests/controllers/inner_api/app/test_dsl.py
  • api/tests/unit_tests/services/plugin/__init__.py
  • api/tests/unit_tests/services/plugin/conftest.py
  • api/tests/unit_tests/services/plugin/test_dependencies_analysis.py
  • api/tests/unit_tests/services/plugin/test_endpoint_service.py
  • api/tests/unit_tests/services/plugin/test_plugin_migration.py
  • api/tests/unit_tests/services/plugin/test_plugin_service.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.py
  • api/tests/unit_tests/services/test_schedule_service.py
  • api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py
  • api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py

Dify 中的检索增强生成(RAG)管线是一种专门的工作流类型,旨在将数据索引和检索逻辑与标准应用流程解耦。它允许开发者使用基于图的领域特定语言(DSL)定义复杂的 ETL(提取、转换、加载)和检索策略。该管线通过 Celery 任务异步执行,并可作为数据集生命周期中的独立实体进行管理。

1. 作为独立工作流的 RAG 管线

与标准的聊天或补全应用不同,RAG 管线直接与 Dataset 关联,并在专用的 DatasetRuntimeMode.RAG_PIPELINE api/models/enums.py:256 模式下运行。它使用 Workflow 模型,但由特定的 RAG 事件(如文档导入或手动调试)触发 api/models/enums.py:25-26

核心组件
  • 管线模型Pipeline 实体存储 RAG 工作流的配置和元数据 api/models/dataset.py:62
  • 工作流集成:每个 RAG 管线都有一个对应的 Workflow 记录,用于定义执行图 api/models/workflow.py:72
  • 变量管理:管线使用 WorkflowDraftVariable 来持久化草稿工作流的状态和配置,允许用户测试不同的索引参数 api/models/workflow.py:101
  • 触发类型:执行被归类为 WorkflowRunTriggeredFrom.RAG_PIPELINE_RUNRAG_PIPELINE_DEBUGGING api/models/enums.py:25-26
数据实体关系

下表展示了 RAG 管线如何桥接知识库实体与工作流引擎。

自然语言空间代码实体空间(SQLAlchemy 模型)
RAG 管线models.dataset.Pipeline api/models/dataset.py:62
执行图models.workflow.Workflow api/models/workflow.py:72
索引模式models.enums.DatasetRuntimeMode api/models/enums.py:252
运行历史models.workflow.WorkflowRun api/models/workflow.py:130
草稿变量models.workflow.WorkflowDraftVariable api/models/workflow.py:101

来源:api/models/enums.py:25-26, 252-256, api/models/dataset.py:62, api/models/workflow.py:72, 101, 130

2. 管线 DSL 与转换服务

RagPipelineTransformService 负责通过生成表示索引逻辑的 DSL(YAML)将传统的"通用"数据集转换为"RAG 管线"模式。

转换逻辑

当通过 transform_dataset() 转换数据集时,该服务会识别 doc_form(例如 PARAGRAPH_INDEXPARENT_CHILD_INDEX),并选择对应的 YAML 模板 api/services/rag_pipeline/rag_pipeline_transform_service.py:31-53

  • 模板选择:像 file-general-high-quality.ymlnotion-parentchild.yml 这样的模板定义了初始图结构 api/services/rag_pipeline/rag_pipeline_transform_service.py:113, 143
  • 节点注入:该服务将特定配置注入到节点中,例如 knowledge-index 设置和 datasource 凭证 api/services/rag_pipeline/rag_pipeline_transform_service.py:70-76
  • 数据集迁移:将 Dataset.runtime_mode 更新为 RAG_PIPELINE,并将其链接到新创建的 Pipeline ID api/services/rag_pipeline/rag_pipeline_transform_service.py:93-94
DSL 管理

RagPipelineDslService 负责这些管线配置的导入和导出 api/services/rag_pipeline/rag_pipeline_dsl_service.py:23

  • 导入import_rag_pipeline() 解析 YAML 内容,并创建关联的 PipelineWorkflow 记录 api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:75-82
  • 依赖检查check_dependencies() 确保在执行管线之前,所需的插件或模型提供者可用 api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:129

来源:api/services/rag_pipeline/rag_pipeline_transform_service.py:31-94, 113, 143, api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:75-82, 129

3. 管线服务与变量加载

RagPipelineServiceWorkflowDraftVariableService 协同工作,管理 RAG 工作流的执行状态和参数。

草稿变量服务

WorkflowDraftVariableService 管理设计阶段使用的变量的持久化。如果变量超过大小限制,它支持将大变量卸载到存储后端 api/services/workflow_draft_variable_service.py:165-170

  • DraftVarLoader:实现 VariableLoader 接口,从数据库获取变量以供 VariablePool 使用 api/services/workflow_draft_variable_service.py:80-110
  • 变量持久化:使用 WorkflowDraftVariableWorkflowDraftVariableFile 存储值及关联的文件附件 api/services/workflow_draft_variable_service.py:48
变量解析流程

下图展示了服务层与图运行时变量之间的交互。

Dify · 变量解析流程 · 图 1
Dify · 变量解析流程 · 图 1

来源:api/services/workflow_draft_variable_service.py:80-110, 165-170, api/services/rag_pipeline/rag_pipeline.py:96-104

4. 通过 Celery 执行管线

RAG 管线被异步执行,以处理大规模数据处理而不阻塞 API。

任务架构

执行通过专用的 Celery 任务进行管理,这些任务使用 PipelineGenerator 将 DSL 转换为可执行图 api/core/app/apps/pipeline/pipeline_generator.py:17

  • 工作流入点RagPipelineService 使用 WorkflowEntry 来启动图执行 api/services/rag_pipeline/rag_pipeline.py:44
  • 系统变量:管线会注入系统变量(如 RAG_PIPELINE_VARIABLE_NODE_ID)以为节点提供上下文 api/services/workflow_draft_variable_service.py:24
执行工作流

下图展示了从 API 触发到 Celery 工作器执行逻辑的映射。

Dify · 执行工作流 · 图 2
Dify · 执行工作流 · 图 2
关键执行方法
  • 状态跟踪DocumentPipelineExecutionLog 模型跟踪每个文档在管线中的处理进度 api/models/dataset.py:61
  • 节点执行:使用标准工作流节点,但通常会配置 RAG 特定的数据,例如为 knowledge-index 节点配置 KnowledgeConfiguration api/services/rag_pipeline/rag_pipeline_transform_service.py:71

来源:api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py:82, api/services/rag_pipeline/rag_pipeline.py:44, api/models/dataset.py:61, api/services/workflow_draft_variable_service.py:24