agentic_huge_data_base / wiki
页面 Onyx · 7.1 Celery 工作器架构·DeepWiki 中文全文译文

7.1 · Celery 工作器架构(Celery Worker Architecture)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节7.1 状态全文译文 模块文档对象与元数据、工作流与编排、系统架构、测试、发布与运维
源码线索
  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
模块标签
  • 文档对象与元数据
  • 工作流与编排
  • 系统架构
  • 测试、发布与运维
  • 存储与持久化

章节正文

Celery 工作器架构

Celery 工作器架构

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/docprocessing/tasks.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/background/indexing/run_docfetching.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/db/file_record.py
  • backend/onyx/db/index_attempt_metrics.py
  • backend/onyx/db/index_attempt_metrics_models.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/file_store/staging.py
  • backend/onyx/redis/redis_pool.py
  • backend/onyx/utils/postgres_sanitization.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/celery/test_docprocessing_priority.py
  • backend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.py
  • backend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.py
  • backend/tests/external_dependency_unit/indexing/__init__.py
  • backend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.py
  • backend/tests/external_dependency_unit/indexing_helpers.py
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py
  • backend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.py
  • backend/tests/unit/onyx/db/test_tools.py

目的与范围

本文档描述了 Onyx 中 Celery Worker 的架构,这些 Worker 负责处理所有异步后台任务,包括文档索引、连接器操作、权限同步和系统维护任务。Worker 架构使用 supervisord 进行进程管理,并支持专门的 Worker 池来隔离不同类型的工作负载。

Worker 类型概览

Onyx 实现了一种专门的 Worker 架构,针对不同类型的工作负载使用不同的 Worker 进程。每种 Worker 类型处理特定的队列,并具有优化的并发设置,以防止长时间运行的任务(如文档获取)阻塞快速的元数据更新。

代码实体到系统名称的映射

下图展示了代码中定义的 Celery 应用实例与其各自角色和队列分配之间的关系。

标题:Celery 应用到 Worker 的映射

Onyx · 代码实体到系统名称的映射 · 图 1
Onyx · 代码实体到系统名称的映射 · 图 1

来源:backend/supervisord.conf:30-115backend/onyx/background/celery/tasks/beat_schedule.py:37-179backend/onyx/background/celery/apps/primary.py:53-55

Worker 类型汇总

Worker 程序应用模块默认并发数主要队列用途
celery_worker_primary...primary6(开发环境)celery系统初始化、单例任务、协调
celery_worker_light...light16(开发环境)vespa_metadata_syncconnector_deletioncheckpoint_cleanup仅元数据的快速同步操作
celery_worker_docprocessing...docprocessing6(开发环境)docprocessing计算密集型的文本片段切分和嵌入向量生成
celery_worker_docfetching...docfetching1connector_doc_fetchingIO 密集型的外部 API 调用(速率限制安全)
celery_worker_heavy...heavy4(开发环境)connector_pruningsandboxconnector_external_group_sync长时间运行的修剪和权限同步
celery_worker_user_file_processing...user_file_processing2(开发环境)user_file_processing处理本地文件上传

来源:backend/supervisord.conf:30-115backend/scripts/dev_run_background_jobs.py:19-113backend/onyx/configs/constants.py:82-96

队列架构与任务路由

任务会根据其优先级和资源需求被路由到特定的队列。这实现了细粒度的伸缩能力(例如,在大型重新索引期间增加 docprocessing Worker 的数量,而不会影响 light 元数据更新)。

标题:任务优先级与路由逻辑

Onyx · 队列架构与任务路由 · 图 2
Onyx · 队列架构与任务路由 · 图 2

来源:backend/onyx/background/celery/tasks/beat_schedule.py:43-146backend/onyx/configs/constants.py:58-63

Worker 生命周期与初始化

所有 Worker 都使用标准化的初始化序列,以确保在开始消费任务之前,数据存储(Postgres、Redis、Vespa)是可访问的。

初始化序列
  1. 进程启动:由 supervisord 管理。backend/supervisord.conf:30-115
  2. worker_init:设置 Postgres 的 application_name(例如 POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME),并使用与 Worker 并发数匹配的连接池大小初始化 SqlEnginebackend/onyx/background/celery/apps/primary.py:116-123
  3. 等待数据存储:调用 app_base.wait_for_redis()app_base.wait_for_db()app_base.wait_for_document_index_or_shutdown()backend/onyx/background/celery/apps/primary.py:125-127
  4. 主单例锁:(仅主 Worker)获取 OnyxRedisLocks.PRIMARY_WORKER 以执行清理任务,如 reset_document_sync()backend/onyx/background/celery/apps/primary.py:156-175
  5. worker_ready:启动指标服务器并处理集群范围内的就绪信号。backend/onyx/background/celery/apps/primary.py:223-225

来源:backend/onyx/background/celery/apps/app_base.py:12-18backend/onyx/background/celery/apps/primary.py:110-230

主 Worker 与单例管理

主 Worker 充当集群的协调器。在非多租户环境中,它会执行关键的启动维护任务:

  • 加锁:获取 Redis 锁 OnyxRedisLocks.PRIMARY_WORKER,超时时间为 CELERY_PRIMARY_WORKER_LOCK_TIMEOUT(120 秒)。backend/onyx/background/celery/apps/primary.py:165-172backend/onyx/configs/constants.py:133
  • Redis 清理:调用 reset_document_sync() 清除 Redis 中过期的索引状态。backend/onyx/background/celery/apps/primary.py:177
  • 孤儿任务管理:识别处于 STARTEDINITIALIZING 状态的 IndexAttempt 记录(可能是由于之前的 Worker 崩溃导致),并将其标记为 CANCELEDbackend/onyx/background/celery/apps/primary.py:192-213

来源:backend/onyx/background/celery/apps/primary.py:137-213backend/onyx/configs/constants.py:133

租户感知的任务执行

Onyx 后台任务使用 TenantAwareTask 来执行。这个自定义任务类确保在任务逻辑运行之前,数据库模式被正确切换。

  • 上下文注入__call__ 方法拦截任务执行,从 kwargs 中提取 tenant_id,并设置 CURRENT_TENANT_ID_CONTEXTVARbackend/onyx/background/celery/apps/app_base.py:85-90
  • 隔离性:这确保了即使多个租户的任务在同一个 Worker 进程上顺序执行,它们的数据库会话(通过 get_session_with_current_tenant)也会指向正确的模式。backend/onyx/background/celery/apps/app_base.py:93-98
  • Redis 前缀TenantRedis 会自动为所有 Redis 键添加 tenant_id 前缀,以在共享的 Redis 实例中提供逻辑隔离。backend/onyx/redis/redis_pool.py:70-91

来源:backend/onyx/background/celery/apps/app_base.py:80-99backend/onyx/redis/redis_pool.py:65-175

Supervisord 与并发设置

Onyx 使用 supervisord 来管理这些 Worker 的生命周期。一个关键的配置选择是使用 threads 池而不是默认的 prefork 池。

  • 线程池:用于避免 Celery、SQLAlchemy 和某些 C 扩展之间交互导致的 SIGSEGV(信号 11)问题。backend/supervisord.conf:20-29
  • Broker 连接池限制:连接池通过 get_redis_client 进行管理,该函数配置了对 BusyLoadingErrorConnectionErrorTimeoutError 的重试。backend/onyx/redis/redis_pool.py:51-62
  • 日志聚合:supervisord 中的 log-redirect-handler 程序会跟踪所有单个 Worker 的日志,并将其管道输出到 stdout,以实现容器级别的日志记录。backend/supervisord.conf:168-185

来源:backend/supervisord.conf:20-185backend/onyx/redis/redis_pool.py:51-62