后台任务处理与 Celery
使用 Celery 进行后台任务处理
相关源文件
本章引用的主要源码文件:
api/commands/plugin.pyapi/commands/storage.pyapi/commands/system.pyapi/commands/vector.pyapi/configs/middleware/cache/redis_config.pyapi/core/helper/encrypter.pyapi/events/document_index_event.pyapi/events/event_handlers/create_document_index.pyapi/events/event_handlers/create_installed_app_when_app_created.pyapi/events/event_handlers/create_site_record_when_app_created.pyapi/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.pyapi/events/event_handlers/update_app_dataset_join_when_app_published_workflow_updated.pyapi/extensions/ext_celery.pyapi/extensions/ext_redis.pyapi/fields/workflow_fields.pyapi/migrations/versions/6e957a32015b_add_embedding_cache_created_at_index.pyapi/models/enums.pyapi/pyrefly-local-excludes.txtapi/schedule/clean_embedding_cache_task.pyapi/schedule/clean_messages.pyapi/schedule/clean_unused_datasets_task.pyapi/schedule/clean_workflow_runs_task.pyapi/schedule/queue_monitor_task.pyapi/schedule/workflow_schedule_task.pyapi/tasks/add_document_to_index_task.pyapi/tasks/batch_clean_document_task.pyapi/tasks/batch_create_segment_to_index_task.pyapi/tasks/clean_notion_document_task.pyapi/tasks/create_segment_to_index_task.pyapi/tasks/delete_account_task.pyapi/tasks/delete_segment_from_index_task.pyapi/tasks/disable_segments_from_index_task.pyapi/tasks/document_indexing_sync_task.pyapi/tasks/document_indexing_task.pyapi/tasks/document_indexing_update_task.pyapi/tasks/duplicate_document_indexing_task.pyapi/tasks/enable_segments_to_index_task.pyapi/tasks/retry_document_indexing_task.pyapi/tasks/sync_website_document_indexing_task.pyapi/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.pyapi/tests/test_containers_integration_tests/trigger/__init__.pyapi/tests/test_containers_integration_tests/trigger/conftest.pyapi/tests/test_containers_integration_tests/trigger/test_trigger_e2e.pyapi/tests/unit_tests/commands/test_reset_encrypt_key_pair.pyapi/tests/unit_tests/core/app/apps/pipeline/test_pipeline_generator.pyapi/tests/unit_tests/core/app/apps/pipeline/test_pipeline_runner.pyapi/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.pyapi/tests/unit_tests/extensions/test_celery_ssl.pyapi/tests/unit_tests/extensions/test_redis.pyapi/tests/unit_tests/models/test_enums_creator_user_role.pyapi/tests/unit_tests/tasks/test_clean_document_task.pyapi/tests/unit_tests/tasks/test_delete_account_task.pyapi/tests/unit_tests/tasks/test_document_indexing_sync_task.pyapi/tests/unit_tests/tasks/test_document_indexing_update_task.py
目的与范围
本文档描述了 Dify 中的后台任务处理基础设施,该系统使用 Celery 进行异步任务执行。系统在 API 服务的请求-响应周期之外处理长时间运行的操作,例如数据集索引、工作流执行、数据清理和应用删除。
有关工作流执行引擎的信息,请参见 5.1。有关文档索引管线的详细信息,请参见 4.2。
架构总览
服务拓扑
Dify 的后台处理基础设施由三个主要组件组成,它们共享同一个容器镜像(langgenius/dify-api),但以不同的模式运行。系统使用 Redis 作为中央消息代理来管理跨工作节点的任务分发。
来源: api/extensions/ext_celery.py:105-110,api/tasks/batch_create_segment_to_index_task.py:177-180,api/tasks/document_indexing_task.py:33
基于模式的服务区分
同一个 Docker 镜像支持多种运行模式,通过 MODE 环境变量控制。这使得 Dify 可以独立扩展 API 处理和后台处理能力。
| 模式 | 服务 | 用途 | 进程 |
|---|---|---|---|
api | API 服务器 | 处理 HTTP 请求 | Gunicorn + Flask |
worker | Celery 工作节点 | 执行后台任务 | Celery worker 进程 |
beat | Celery Beat | 调度周期性任务 | Celery beat 调度器 |
队列拓扑
队列结构
Dify 中的 Celery 工作节点从多个专用队列消费任务。这种分离方式允许对不同任务类型进行优先级处理和资源分配。
| 队列 | 任务类型 | 关键操作 |
|---|---|---|
dataset | 文档处理 | 文件提取、片段切分、嵌入向量生成、向量索引 |
workflow / generation | 工作流执行 | 节点处理、大语言模型(LLM)调用、工具调用 |
app_deletion | 资源清理 | 删除应用、对话、消息以及关联的存储文件 |
mail | 邮件发送 | 密码重置、邀请、通知 |
来源: api/tasks/document_indexing_task.py:33,api/tasks/batch_create_segment_to_index_task.py:29,api/tasks/document_indexing_sync_task.py:21-22
工作节点配置
工作节点进程管理
Celery 应用在 api/extensions/ext_celery.py 中初始化。它使用自定义的 FlaskTask 类来确保每个后台任务都在 Flask 应用上下文中运行,从而提供对数据库会话和配置的访问。
# 用于 Flask 上下文的自定义 Task 类
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
from core.logging.context import init_request_context
with app.app_context():
# 初始化此任务的日志上下文
init_request_context()
return self.run(*args, **kwargs)
来源: api/extensions/ext_celery.py:94-101
消息代理和后端配置
Redis 集成
Dify 使用 Redis 作为主要消息代理和结果后端。系统支持高级 Redis 配置,包括 SSL/TLS 和 Sentinel。
SSL/TLS 支持
如果启用了 BROKER_USE_SSL,系统会将证书要求字符串(例如 CERT_REQUIRED)映射到 Python 的 ssl 常量,并将其应用于消息代理和 Redis 结果后端。
来源: api/extensions/ext_celery.py:33-61,api/extensions/ext_celery.py:129-136
Sentinel 支持
当 CELERY_USE_SENTINEL 为 true 时,系统会使用主节点名称和 Sentinel 凭证配置 broker_transport_options。
来源: api/extensions/ext_celery.py:64-82
使用 Beat 进行任务调度
Celery Beat 管理周期性的维护任务。调度表在初始化期间根据 dify_config 中的环境功能开关动态定义。
周期性任务调度表
| 任务名称 | 执行频率 | 用途 |
|---|---|---|
clean_embedding_cache_task | CELERY_BEAT_SCHEDULER_TIME | 从缓存中移除旧的嵌入向量结果 |
clean_unused_datasets_task | CELERY_BEAT_SCHEDULER_TIME | 根据计划清理标记为删除的数据集 |
clean_messages | CELERY_BEAT_SCHEDULER_TIME | 清理旧消息 |
create_tidb_serverless_task | 每小时 | TiDB Serverless 实例管理 |
update_tidb_serverless_status_task | 每 10 分钟 | TiDB 状态监控 |
来源: api/extensions/ext_celery.py:155-185,api/schedule/clean_unused_datasets_task.py:26-42
核心后台任务模式
1. 文档索引管线
索引管线是一个多阶段过程,由 IndexingRunner 管理。诸如 document_indexing_task 和 document_indexing_sync_task 之类的任务处理文档准备的异步生命周期。
来源: api/tasks/document_indexing_task.py:33-112,api/tasks/document_indexing_sync_task.py:134-147,api/models/enums.py:127-138
2. 片段批量处理
对于大批量导入,Dify 使用 batch_create_segment_to_index_task 来处理 CSV/Excel 文件处理、通过 ModelManager 生成嵌入向量以及通过 VectorService 进行向量存储。
来源: api/tasks/batch_create_segment_to_index_task.py:30-180
3. 数据集维护与清理
clean_unused_datasets_task 执行复杂的清理工作,根据数据集的 CloudPlan(例如 SANDBOX 与 PRO)和活动级别进行过滤。它使用 IndexProcessorFactory 从向量索引中移除数据。
来源: api/schedule/clean_unused_datasets_task.py:80-148,api/extensions/ext_celery.py:164-167
实现细节
数据库会话管理
任务使用 session_factory.create_session() 来确保长时间运行的后台进程中具有清晰的事务边界。这对于防止工作节点池中的连接泄漏至关重要。
来源: api/tasks/batch_create_segment_to_index_task.py:59,api/tasks/document_indexing_task.py:59,api/tasks/document_indexing_sync_task.py:34
多租户与隔离
TenantIsolatedTaskQueue 用于诸如 duplicate_document_indexing_task 之类的任务,以确保一个租户的重型索引操作不会阻塞其他租户。它根据 dify_config.TENANT_ISOLATED_TASK_CONCURRENCY 中定义的租户特定并发限制来拉取任务。
来源: api/tasks/duplicate_document_indexing_task.py:55-76,api/tasks/document_indexing_task.py:15
Redis 键前缀
为了支持共享 Redis 实例,如果配置了 REDIS_KEY_PREFIX,Dify 会将全局键前缀应用于所有 Celery 传输制品。
来源: api/extensions/ext_celery.py:85-90,api/configs/middleware/cache/redis_config.py:35-38