RAG 管线工作流
检索增强生成(RAG)管线工作流
相关源文件
本章引用的主要源码文件:
api/controllers/console/app/app_import.pyapi/controllers/console/app/workflow_draft_variable.pyapi/controllers/console/datasets/rag_pipeline/rag_pipeline.pyapi/controllers/console/datasets/rag_pipeline/rag_pipeline_datasets.pyapi/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.pyapi/controllers/console/datasets/rag_pipeline/rag_pipeline_import.pyapi/controllers/inner_api/app/dsl.pyapi/core/db/__init__.pyapi/core/db/session_factory.pyapi/events/event_handlers/sync_workflow_schedule_when_app_published.pyapi/events/event_handlers/update_app_triggers_when_app_published_workflow_updated.pyapi/extensions/ext_session_factory.pyapi/services/entities/dsl_entities.pyapi/services/plugin/plugin_migration.pyapi/services/plugin/plugin_service.pyapi/services/rag_pipeline/rag_pipeline.pyapi/services/rag_pipeline/rag_pipeline_transform_service.pyapi/services/workflow_draft_variable_service.pyapi/services/workflow_service.pyapi/tests/test_containers_integration_tests/controllers/console/app/test_app_import_api.pyapi/tests/test_containers_integration_tests/services/test_schedule_service.pyapi/tests/unit_tests/controllers/console/app/test_app_import_api.pyapi/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.pyapi/tests/unit_tests/controllers/inner_api/app/test_dsl.pyapi/tests/unit_tests/services/plugin/__init__.pyapi/tests/unit_tests/services/plugin/conftest.pyapi/tests/unit_tests/services/plugin/test_dependencies_analysis.pyapi/tests/unit_tests/services/plugin/test_endpoint_service.pyapi/tests/unit_tests/services/plugin/test_plugin_migration.pyapi/tests/unit_tests/services/plugin/test_plugin_service.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.pyapi/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.pyapi/tests/unit_tests/services/test_schedule_service.pyapi/tests/unit_tests/services/workflow/test_draft_var_loader_simple.pyapi/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py
Dify 中的检索增强生成(RAG)管线是一种专门的工作流类型,旨在将数据索引和检索逻辑与标准应用流程解耦。它允许开发者使用基于图的领域特定语言(DSL)定义复杂的 ETL(提取、转换、加载)和检索策略。该管线通过 Celery 任务异步执行,并可作为数据集生命周期中的独立实体进行管理。
1. 作为独立工作流的 RAG 管线
与标准的聊天或补全应用不同,RAG 管线直接与 Dataset 关联,并在专用的 DatasetRuntimeMode.RAG_PIPELINE api/models/enums.py:256 模式下运行。它使用 Workflow 模型,但由特定的 RAG 事件(如文档导入或手动调试)触发 api/models/enums.py:25-26。
核心组件
- 管线模型:
Pipeline实体存储 RAG 工作流的配置和元数据api/models/dataset.py:62。 - 工作流集成:每个 RAG 管线都有一个对应的
Workflow记录,用于定义执行图api/models/workflow.py:72。 - 变量管理:管线使用
WorkflowDraftVariable来持久化草稿工作流的状态和配置,允许用户测试不同的索引参数api/models/workflow.py:101。 - 触发类型:执行被归类为
WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN或RAG_PIPELINE_DEBUGGINGapi/models/enums.py:25-26。
数据实体关系
下表展示了 RAG 管线如何桥接知识库实体与工作流引擎。
| 自然语言空间 | 代码实体空间(SQLAlchemy 模型) |
|---|---|
| RAG 管线 | models.dataset.Pipeline api/models/dataset.py:62 |
| 执行图 | models.workflow.Workflow api/models/workflow.py:72 |
| 索引模式 | models.enums.DatasetRuntimeMode api/models/enums.py:252 |
| 运行历史 | models.workflow.WorkflowRun api/models/workflow.py:130 |
| 草稿变量 | models.workflow.WorkflowDraftVariable api/models/workflow.py:101 |
来源:api/models/enums.py:25-26, 252-256, api/models/dataset.py:62, api/models/workflow.py:72, 101, 130。
2. 管线 DSL 与转换服务
RagPipelineTransformService 负责通过生成表示索引逻辑的 DSL(YAML)将传统的"通用"数据集转换为"RAG 管线"模式。
转换逻辑
当通过 transform_dataset() 转换数据集时,该服务会识别 doc_form(例如 PARAGRAPH_INDEX 或 PARENT_CHILD_INDEX),并选择对应的 YAML 模板 api/services/rag_pipeline/rag_pipeline_transform_service.py:31-53。
- 模板选择:像
file-general-high-quality.yml或notion-parentchild.yml这样的模板定义了初始图结构api/services/rag_pipeline/rag_pipeline_transform_service.py:113, 143。 - 节点注入:该服务将特定配置注入到节点中,例如
knowledge-index设置和datasource凭证api/services/rag_pipeline/rag_pipeline_transform_service.py:70-76。 - 数据集迁移:将
Dataset.runtime_mode更新为RAG_PIPELINE,并将其链接到新创建的PipelineIDapi/services/rag_pipeline/rag_pipeline_transform_service.py:93-94。
DSL 管理
RagPipelineDslService 负责这些管线配置的导入和导出 api/services/rag_pipeline/rag_pipeline_dsl_service.py:23。
- 导入:
import_rag_pipeline()解析 YAML 内容,并创建关联的Pipeline和Workflow记录api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:75-82。 - 依赖检查:
check_dependencies()确保在执行管线之前,所需的插件或模型提供者可用api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:129。
来源:api/services/rag_pipeline/rag_pipeline_transform_service.py:31-94, 113, 143, api/controllers/console/datasets/rag_pipeline/rag_pipeline_import.py:75-82, 129。
3. 管线服务与变量加载
RagPipelineService 和 WorkflowDraftVariableService 协同工作,管理 RAG 工作流的执行状态和参数。
草稿变量服务
WorkflowDraftVariableService 管理设计阶段使用的变量的持久化。如果变量超过大小限制,它支持将大变量卸载到存储后端 api/services/workflow_draft_variable_service.py:165-170。
- DraftVarLoader:实现
VariableLoader接口,从数据库获取变量以供VariablePool使用api/services/workflow_draft_variable_service.py:80-110。 - 变量持久化:使用
WorkflowDraftVariable和WorkflowDraftVariableFile存储值及关联的文件附件api/services/workflow_draft_variable_service.py:48。
变量解析流程
下图展示了服务层与图运行时变量之间的交互。
来源:api/services/workflow_draft_variable_service.py:80-110, 165-170, api/services/rag_pipeline/rag_pipeline.py:96-104。
4. 通过 Celery 执行管线
RAG 管线被异步执行,以处理大规模数据处理而不阻塞 API。
任务架构
执行通过专用的 Celery 任务进行管理,这些任务使用 PipelineGenerator 将 DSL 转换为可执行图 api/core/app/apps/pipeline/pipeline_generator.py:17。
- 工作流入点:
RagPipelineService使用WorkflowEntry来启动图执行api/services/rag_pipeline/rag_pipeline.py:44。 - 系统变量:管线会注入系统变量(如
RAG_PIPELINE_VARIABLE_NODE_ID)以为节点提供上下文api/services/workflow_draft_variable_service.py:24。
执行工作流
下图展示了从 API 触发到 Celery 工作器执行逻辑的映射。
关键执行方法
- 状态跟踪:
DocumentPipelineExecutionLog模型跟踪每个文档在管线中的处理进度api/models/dataset.py:61。 - 节点执行:使用标准工作流节点,但通常会配置 RAG 特定的数据,例如为
knowledge-index节点配置KnowledgeConfigurationapi/services/rag_pipeline/rag_pipeline_transform_service.py:71。
来源:api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py:82, api/services/rag_pipeline/rag_pipeline.py:44, api/models/dataset.py:61, api/services/workflow_draft_variable_service.py:24。