agentic_huge_data_base / wiki
页面 Dify · 3.4 后台任务处理与 Celery·DeepWiki 中文全文译文

3.4 · 后台任务处理与 Celery(Background Task Processing with Celery)

应用编排与外部知识接入 · 聚焦本章的模块关系、源码依据与实现要点。

项目Dify 章节3.4 状态全文译文 模块工作流与编排、存储与持久化、系统架构、测试、发布与运维
源码线索
  • api/commands/plugin.py
  • api/commands/storage.py
  • api/commands/system.py
  • api/commands/vector.py
  • api/configs/middleware/cache/redis_config.py
  • api/core/helper/encrypter.py
  • api/events/document_index_event.py
  • api/events/event_handlers/create_document_index.py
  • api/events/event_handlers/create_installed_app_when_app_created.py
  • api/events/event_handlers/create_site_record_when_app_created.py
模块标签
  • 工作流与编排
  • 存储与持久化
  • 系统架构
  • 测试、发布与运维
  • 检索、召回与索引

章节正文

后台任务处理与 Celery

使用 Celery 进行后台任务处理

相关源文件

本章引用的主要源码文件:

  • api/commands/plugin.py
  • api/commands/storage.py
  • api/commands/system.py
  • api/commands/vector.py
  • api/configs/middleware/cache/redis_config.py
  • api/core/helper/encrypter.py
  • api/events/document_index_event.py
  • api/events/event_handlers/create_document_index.py
  • api/events/event_handlers/create_installed_app_when_app_created.py
  • api/events/event_handlers/create_site_record_when_app_created.py
  • api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py
  • api/events/event_handlers/update_app_dataset_join_when_app_published_workflow_updated.py
  • api/extensions/ext_celery.py
  • api/extensions/ext_redis.py
  • api/fields/workflow_fields.py
  • api/migrations/versions/6e957a32015b_add_embedding_cache_created_at_index.py
  • api/models/enums.py
  • api/pyrefly-local-excludes.txt
  • api/schedule/clean_embedding_cache_task.py
  • api/schedule/clean_messages.py
  • api/schedule/clean_unused_datasets_task.py
  • api/schedule/clean_workflow_runs_task.py
  • api/schedule/queue_monitor_task.py
  • api/schedule/workflow_schedule_task.py
  • api/tasks/add_document_to_index_task.py
  • api/tasks/batch_clean_document_task.py
  • api/tasks/batch_create_segment_to_index_task.py
  • api/tasks/clean_notion_document_task.py
  • api/tasks/create_segment_to_index_task.py
  • api/tasks/delete_account_task.py
  • api/tasks/delete_segment_from_index_task.py
  • api/tasks/disable_segments_from_index_task.py
  • api/tasks/document_indexing_sync_task.py
  • api/tasks/document_indexing_task.py
  • api/tasks/document_indexing_update_task.py
  • api/tasks/duplicate_document_indexing_task.py
  • api/tasks/enable_segments_to_index_task.py
  • api/tasks/retry_document_indexing_task.py
  • api/tasks/sync_website_document_indexing_task.py
  • api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
  • api/tests/test_containers_integration_tests/trigger/__init__.py
  • api/tests/test_containers_integration_tests/trigger/conftest.py
  • api/tests/test_containers_integration_tests/trigger/test_trigger_e2e.py
  • api/tests/unit_tests/commands/test_reset_encrypt_key_pair.py
  • api/tests/unit_tests/core/app/apps/pipeline/test_pipeline_generator.py
  • api/tests/unit_tests/core/app/apps/pipeline/test_pipeline_runner.py
  • api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
  • api/tests/unit_tests/extensions/test_celery_ssl.py
  • api/tests/unit_tests/extensions/test_redis.py
  • api/tests/unit_tests/models/test_enums_creator_user_role.py
  • api/tests/unit_tests/tasks/test_clean_document_task.py
  • api/tests/unit_tests/tasks/test_delete_account_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_update_task.py

目的与范围

本文档描述了 Dify 中的后台任务处理基础设施,该系统使用 Celery 进行异步任务执行。系统在 API 服务的请求-响应周期之外处理长时间运行的操作,例如数据集索引、工作流执行、数据清理和应用删除。

有关工作流执行引擎的信息,请参见 5.1。有关文档索引管线的详细信息,请参见 4.2

架构总览

服务拓扑

Dify 的后台处理基础设施由三个主要组件组成,它们共享同一个容器镜像(langgenius/dify-api),但以不同的模式运行。系统使用 Redis 作为中央消息代理来管理跨工作节点的任务分发。

Dify · 服务拓扑 · 图 1
Dify · 服务拓扑 · 图 1

来源: api/extensions/ext_celery.py:105-110api/tasks/batch_create_segment_to_index_task.py:177-180api/tasks/document_indexing_task.py:33

基于模式的服务区分

同一个 Docker 镜像支持多种运行模式,通过 MODE 环境变量控制。这使得 Dify 可以独立扩展 API 处理和后台处理能力。

模式服务用途进程
apiAPI 服务器处理 HTTP 请求Gunicorn + Flask
workerCelery 工作节点执行后台任务Celery worker 进程
beatCelery Beat调度周期性任务Celery beat 调度器

队列拓扑

队列结构

Dify 中的 Celery 工作节点从多个专用队列消费任务。这种分离方式允许对不同任务类型进行优先级处理和资源分配。

队列任务类型关键操作
dataset文档处理文件提取、片段切分、嵌入向量生成、向量索引
workflow / generation工作流执行节点处理、大语言模型(LLM)调用、工具调用
app_deletion资源清理删除应用、对话、消息以及关联的存储文件
mail邮件发送密码重置、邀请、通知

来源: api/tasks/document_indexing_task.py:33api/tasks/batch_create_segment_to_index_task.py:29api/tasks/document_indexing_sync_task.py:21-22

工作节点配置

工作节点进程管理

Celery 应用在 api/extensions/ext_celery.py 中初始化。它使用自定义的 FlaskTask 类来确保每个后台任务都在 Flask 应用上下文中运行,从而提供对数据库会话和配置的访问。

# 用于 Flask 上下文的自定义 Task 类
class FlaskTask(Task):
    def __call__(self, *args: object, **kwargs: object) -> object:
        from core.logging.context import init_request_context

        with app.app_context():
            # 初始化此任务的日志上下文
            init_request_context()
            return self.run(*args, **kwargs)

来源: api/extensions/ext_celery.py:94-101

消息代理和后端配置

Redis 集成

Dify 使用 Redis 作为主要消息代理和结果后端。系统支持高级 Redis 配置,包括 SSL/TLS 和 Sentinel。

SSL/TLS 支持

如果启用了 BROKER_USE_SSL,系统会将证书要求字符串(例如 CERT_REQUIRED)映射到 Python 的 ssl 常量,并将其应用于消息代理和 Redis 结果后端。

来源: api/extensions/ext_celery.py:33-61api/extensions/ext_celery.py:129-136

Sentinel 支持

CELERY_USE_SENTINEL 为 true 时,系统会使用主节点名称和 Sentinel 凭证配置 broker_transport_options

来源: api/extensions/ext_celery.py:64-82

使用 Beat 进行任务调度

Celery Beat 管理周期性的维护任务。调度表在初始化期间根据 dify_config 中的环境功能开关动态定义。

周期性任务调度表
任务名称执行频率用途
clean_embedding_cache_taskCELERY_BEAT_SCHEDULER_TIME从缓存中移除旧的嵌入向量结果
clean_unused_datasets_taskCELERY_BEAT_SCHEDULER_TIME根据计划清理标记为删除的数据集
clean_messagesCELERY_BEAT_SCHEDULER_TIME清理旧消息
create_tidb_serverless_task每小时TiDB Serverless 实例管理
update_tidb_serverless_status_task每 10 分钟TiDB 状态监控

来源: api/extensions/ext_celery.py:155-185api/schedule/clean_unused_datasets_task.py:26-42

核心后台任务模式

1. 文档索引管线

索引管线是一个多阶段过程,由 IndexingRunner 管理。诸如 document_indexing_taskdocument_indexing_sync_task 之类的任务处理文档准备的异步生命周期。

Dify · 1. 文档索引管线 · 图 2
Dify · 1. 文档索引管线 · 图 2

来源: api/tasks/document_indexing_task.py:33-112api/tasks/document_indexing_sync_task.py:134-147api/models/enums.py:127-138

2. 片段批量处理

对于大批量导入,Dify 使用 batch_create_segment_to_index_task 来处理 CSV/Excel 文件处理、通过 ModelManager 生成嵌入向量以及通过 VectorService 进行向量存储。

来源: api/tasks/batch_create_segment_to_index_task.py:30-180

3. 数据集维护与清理

clean_unused_datasets_task 执行复杂的清理工作,根据数据集的 CloudPlan(例如 SANDBOX 与 PRO)和活动级别进行过滤。它使用 IndexProcessorFactory 从向量索引中移除数据。

Dify · 3. 数据集维护与清理 · 图 3
Dify · 3. 数据集维护与清理 · 图 3

来源: api/schedule/clean_unused_datasets_task.py:80-148api/extensions/ext_celery.py:164-167

实现细节

数据库会话管理

任务使用 session_factory.create_session() 来确保长时间运行的后台进程中具有清晰的事务边界。这对于防止工作节点池中的连接泄漏至关重要。

来源: api/tasks/batch_create_segment_to_index_task.py:59api/tasks/document_indexing_task.py:59api/tasks/document_indexing_sync_task.py:34

多租户与隔离

TenantIsolatedTaskQueue 用于诸如 duplicate_document_indexing_task 之类的任务,以确保一个租户的重型索引操作不会阻塞其他租户。它根据 dify_config.TENANT_ISOLATED_TASK_CONCURRENCY 中定义的租户特定并发限制来拉取任务。

来源: api/tasks/duplicate_document_indexing_task.py:55-76api/tasks/document_indexing_task.py:15

Redis 键前缀

为了支持共享 Redis 实例,如果配置了 REDIS_KEY_PREFIX,Dify 会将全局键前缀应用于所有 Celery 传输制品。

来源: api/extensions/ext_celery.py:85-90api/configs/middleware/cache/redis_config.py:35-38