Celery 工作器架构
Celery 工作器架构
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.pybackend/ee/onyx/background/celery/apps/primary.pybackend/ee/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/apps/app_base.pybackend/onyx/background/celery/apps/beat.pybackend/onyx/background/celery/apps/heavy.pybackend/onyx/background/celery/apps/light.pybackend/onyx/background/celery/apps/monitoring.pybackend/onyx/background/celery/apps/primary.pybackend/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/tasks/docprocessing/tasks.pybackend/onyx/background/celery/tasks/shared/tasks.pybackend/onyx/background/celery/tasks/vespa/tasks.pybackend/onyx/background/indexing/run_docfetching.pybackend/onyx/configs/app_configs.pybackend/onyx/configs/constants.pybackend/onyx/db/file_record.pybackend/onyx/db/index_attempt_metrics.pybackend/onyx/db/index_attempt_metrics_models.pybackend/onyx/document_index/factory.pybackend/onyx/file_store/staging.pybackend/onyx/redis/redis_pool.pybackend/onyx/utils/postgres_sanitization.pybackend/scripts/dev_run_background_jobs.pybackend/supervisord.confbackend/tests/external_dependency_unit/celery/test_docprocessing_priority.pybackend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.pybackend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.pybackend/tests/external_dependency_unit/indexing/__init__.pybackend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.pybackend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.pybackend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.pybackend/tests/external_dependency_unit/indexing_helpers.pybackend/tests/external_dependency_unit/redis/test_tenant_redis.pybackend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.pybackend/tests/unit/onyx/db/test_tools.py
目的与范围
本文档描述了 Onyx 中 Celery Worker 的架构,这些 Worker 负责处理所有异步后台任务,包括文档索引、连接器操作、权限同步和系统维护任务。Worker 架构使用 supervisord 进行进程管理,并支持专门的 Worker 池来隔离不同类型的工作负载。
Worker 类型概览
Onyx 实现了一种专门的 Worker 架构,针对不同类型的工作负载使用不同的 Worker 进程。每种 Worker 类型处理特定的队列,并具有优化的并发设置,以防止长时间运行的任务(如文档获取)阻塞快速的元数据更新。
代码实体到系统名称的映射
下图展示了代码中定义的 Celery 应用实例与其各自角色和队列分配之间的关系。
标题:Celery 应用到 Worker 的映射
来源:backend/supervisord.conf:30-115、backend/onyx/background/celery/tasks/beat_schedule.py:37-179、backend/onyx/background/celery/apps/primary.py:53-55
Worker 类型汇总
| Worker 程序 | 应用模块 | 默认并发数 | 主要队列 | 用途 |
|---|---|---|---|---|
celery_worker_primary | ...primary | 6(开发环境) | celery | 系统初始化、单例任务、协调 |
celery_worker_light | ...light | 16(开发环境) | vespa_metadata_sync、connector_deletion、checkpoint_cleanup | 仅元数据的快速同步操作 |
celery_worker_docprocessing | ...docprocessing | 6(开发环境) | docprocessing | 计算密集型的文本片段切分和嵌入向量生成 |
celery_worker_docfetching | ...docfetching | 1 | connector_doc_fetching | IO 密集型的外部 API 调用(速率限制安全) |
celery_worker_heavy | ...heavy | 4(开发环境) | connector_pruning、sandbox、connector_external_group_sync | 长时间运行的修剪和权限同步 |
celery_worker_user_file_processing | ...user_file_processing | 2(开发环境) | user_file_processing | 处理本地文件上传 |
来源:backend/supervisord.conf:30-115、backend/scripts/dev_run_background_jobs.py:19-113、backend/onyx/configs/constants.py:82-96
队列架构与任务路由
任务会根据其优先级和资源需求被路由到特定的队列。这实现了细粒度的伸缩能力(例如,在大型重新索引期间增加 docprocessing Worker 的数量,而不会影响 light 元数据更新)。
标题:任务优先级与路由逻辑
来源:backend/onyx/background/celery/tasks/beat_schedule.py:43-146、backend/onyx/configs/constants.py:58-63
Worker 生命周期与初始化
所有 Worker 都使用标准化的初始化序列,以确保在开始消费任务之前,数据存储(Postgres、Redis、Vespa)是可访问的。
初始化序列
- 进程启动:由
supervisord管理。backend/supervisord.conf:30-115 worker_init:设置 Postgres 的application_name(例如POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME),并使用与 Worker 并发数匹配的连接池大小初始化SqlEngine。backend/onyx/background/celery/apps/primary.py:116-123- 等待数据存储:调用
app_base.wait_for_redis()、app_base.wait_for_db()和app_base.wait_for_document_index_or_shutdown()。backend/onyx/background/celery/apps/primary.py:125-127 - 主单例锁:(仅主 Worker)获取
OnyxRedisLocks.PRIMARY_WORKER以执行清理任务,如reset_document_sync()。backend/onyx/background/celery/apps/primary.py:156-175 worker_ready:启动指标服务器并处理集群范围内的就绪信号。backend/onyx/background/celery/apps/primary.py:223-225
来源:backend/onyx/background/celery/apps/app_base.py:12-18、backend/onyx/background/celery/apps/primary.py:110-230
主 Worker 与单例管理
主 Worker 充当集群的协调器。在非多租户环境中,它会执行关键的启动维护任务:
- 加锁:获取 Redis 锁
OnyxRedisLocks.PRIMARY_WORKER,超时时间为CELERY_PRIMARY_WORKER_LOCK_TIMEOUT(120 秒)。backend/onyx/background/celery/apps/primary.py:165-172、backend/onyx/configs/constants.py:133 - Redis 清理:调用
reset_document_sync()清除 Redis 中过期的索引状态。backend/onyx/background/celery/apps/primary.py:177 - 孤儿任务管理:识别处于
STARTED或INITIALIZING状态的IndexAttempt记录(可能是由于之前的 Worker 崩溃导致),并将其标记为CANCELED。backend/onyx/background/celery/apps/primary.py:192-213
来源:backend/onyx/background/celery/apps/primary.py:137-213、backend/onyx/configs/constants.py:133
租户感知的任务执行
Onyx 后台任务使用 TenantAwareTask 来执行。这个自定义任务类确保在任务逻辑运行之前,数据库模式被正确切换。
- 上下文注入:
__call__方法拦截任务执行,从kwargs中提取tenant_id,并设置CURRENT_TENANT_ID_CONTEXTVAR。backend/onyx/background/celery/apps/app_base.py:85-90 - 隔离性:这确保了即使多个租户的任务在同一个 Worker 进程上顺序执行,它们的数据库会话(通过
get_session_with_current_tenant)也会指向正确的模式。backend/onyx/background/celery/apps/app_base.py:93-98 - Redis 前缀:
TenantRedis会自动为所有 Redis 键添加tenant_id前缀,以在共享的 Redis 实例中提供逻辑隔离。backend/onyx/redis/redis_pool.py:70-91
来源:backend/onyx/background/celery/apps/app_base.py:80-99、backend/onyx/redis/redis_pool.py:65-175
Supervisord 与并发设置
Onyx 使用 supervisord 来管理这些 Worker 的生命周期。一个关键的配置选择是使用 threads 池而不是默认的 prefork 池。
- 线程池:用于避免 Celery、SQLAlchemy 和某些 C 扩展之间交互导致的
SIGSEGV(信号 11)问题。backend/supervisord.conf:20-29 - Broker 连接池限制:连接池通过
get_redis_client进行管理,该函数配置了对BusyLoadingError、ConnectionError和TimeoutError的重试。backend/onyx/redis/redis_pool.py:51-62 - 日志聚合:supervisord 中的
log-redirect-handler程序会跟踪所有单个 Worker 的日志,并将其管道输出到stdout,以实现容器级别的日志记录。backend/supervisord.conf:168-185
来源:backend/supervisord.conf:20-185、backend/onyx/redis/redis_pool.py:51-62