动态任务调度
动态任务调度
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
backend/ee/onyx/background/celery/apps/primary.pybackend/ee/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/apps/app_base.pybackend/onyx/background/celery/apps/beat.pybackend/onyx/background/celery/apps/heavy.pybackend/onyx/background/celery/apps/light.pybackend/onyx/background/celery/apps/monitoring.pybackend/onyx/background/celery/apps/primary.pybackend/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/tasks/shared/tasks.pybackend/onyx/background/celery/tasks/vespa/tasks.pybackend/onyx/configs/app_configs.pybackend/onyx/configs/constants.pybackend/onyx/document_index/factory.pybackend/onyx/redis/redis_pool.pybackend/scripts/dev_run_background_jobs.pybackend/supervisord.confbackend/tests/external_dependency_unit/redis/test_tenant_redis.py
本文档介绍 Onyx 中的动态任务调度系统,重点说明 Celery Beat 如何配置以生成和管理周期性后台任务。关于任务本身及其执行的信息,请参见后台处理与协调。关于工作进程类型和队列分配的详细信息,请参见Celery 工作进程架构。
目的与架构
Onyx 后台处理系统使用自定义的 DynamicTenantScheduler,该类扩展了 Celery 的 PersistentScheduler,支持运行时配置的任务调度。该调度器同时支持单租户(自托管)和多租户(云)部署,可根据租户可用性和可调节的速率限制动态生成任务调度。
关键能力:
- 无需重启进程即可动态重新生成调度
backend/onyx/background/celery/apps/beat.py:155-156 - 多租户模式下按租户生成任务
backend/onyx/background/celery/apps/beat.py:123-151 - 通过 Redis 中存储的
beat_multiplier实现运行时可调节的任务生成速率backend/onyx/background/celery/apps/beat.py:169-171 - 每 60 秒自动发现租户并更新调度
backend/onyx/background/celery/apps/beat.py:31-41
来源:backend/onyx/background/celery/apps/beat.py:27-41, backend/onyx/background/celery/apps/beat.py:123-151, shared_configs/configs.py:56-56
DynamicTenantScheduler 类
DynamicTenantScheduler 定义在 backend/onyx/background/celery/apps/beat.py:27-235 中,提供了核心调度逻辑。它重写了 tick() 方法,以定期检查调度更新。
自然语言到代码实体的调度器架构
下图将高级调度概念映射到实现它们的特定代码实体。
来源:backend/onyx/background/celery/apps/beat.py:27-145, backend/onyx/background/celery/tasks/beat_schedule.py:37-179, backend/onyx/db/engine/tenant_utils.py:15-15
关键方法:
tick():由 Celery Beat 定期调用;每RELOAD_INTERVAL(60 秒)检查调度是否需要更新backend/onyx/background/celery/apps/beat.py:61-80_try_updating_schedule():获取当前租户和 beat 乘数,如果检测到任何变化则重新生成调度backend/onyx/background/celery/apps/beat.py:155-190_generate_schedule():根据MULTI_TENANT标志将任务模板转换为按租户或系统范围的任务条目backend/onyx/background/celery/apps/beat.py:82-153
来源:backend/onyx/background/celery/apps/beat.py:27-235
Beat 任务模板
Beat 任务模板作为周期性任务的蓝图。每个模板指定了任务名称、Celery 任务标识符、调度间隔和执行选项。
模板结构
| 字段 | 类型 | 描述 |
|---|---|---|
name | str | 人类可读的任务标识符 backend/onyx/background/celery/tasks/beat_schedule.py:39-39 |
task | str | Celery 任务名称(来自 OnyxCeleryTask 枚举) backend/onyx/background/celery/tasks/beat_schedule.py:40-40 |
schedule | timedelta | 执行频率 backend/onyx/background/celery/tasks/beat_schedule.py:41-41 |
options | dict | 优先级、过期时间和队列分配 backend/onyx/background/celery/tasks/beat_schedule.py:42-45 |
来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-46, backend/onyx/configs/constants.py:18-18
核心任务模板
以下核心模板配置在 backend/onyx/background/celery/tasks/beat_schedule.py:37-179 中:
| 任务名称 | 任务枚举 | 频率 | 优先级 | 描述 |
|---|---|---|---|---|
check-for-user-file-processing | CHECK_FOR_USER_FILE_PROCESSING | 20 秒 | 中 | 处理用户上传的文件 backend/onyx/background/celery/tasks/beat_schedule.py:39-46 |
check-for-indexing | CHECK_FOR_INDEXING | 15 秒 | 中 | 触发连接器索引 backend/onyx/background/celery/tasks/beat_schedule.py:66-74 |
check-for-connector-deletion | CHECK_FOR_CONNECTOR_DELETION | 20 秒 | 中 | 处理后台清理 backend/onyx/background/celery/tasks/beat_schedule.py:100-110 |
check-for-vespa-sync | CHECK_FOR_VESPA_SYNC_TASK | 20 秒 | 中 | 将元数据同步到文档索引 backend/onyx/background/celery/tasks/beat_schedule.py:112-120 |
check-for-pruning | CHECK_FOR_PRUNING | 20 秒 | 中 | 移除过期文档 backend/onyx/background/celery/tasks/beat_schedule.py:122-130 |
monitor-background-processes | MONITOR_BACKGROUND_PROCESSES | 5 分钟 | 低 | 工作进程健康检查 backend/onyx/background/celery/tasks/beat_schedule.py:141-149 |
来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-179
多租户任务生成
在多租户部署(MULTI_TENANT=true)中,调度器使用两层方法来管理跨多个租户的负载。
系统范围的云任务
云任务生成一次,处理跨所有租户的系统级操作。它们通过 get_cloud_tasks_to_schedule(beat_multiplier) 获取,并为整个集群仅调度一次 backend/onyx/background/celery/apps/beat.py:91-111。
按租户的任务扩展
对于通过 get_all_tenant_ids() 发现的每个租户,调度器会生成单独的任务实例。这样可以实现按租户的细粒度控制和后台工作隔离。
来源:backend/onyx/background/celery/apps/beat.py:123-151, backend/onyx/db/engine/tenant_utils.py:15-15
每个生成的任务在其 kwargs 中包含 tenant_id,该值通过 TenantAwareTask 基类传播到任务执行中 backend/onyx/background/celery/apps/app_base.py:80-99。
Beat 乘数配置
beat_multiplier 是一个运行时可调节的浮点值,用于缩放任务生成速率。这在云部署中至关重要,可以防止工作进程过载。
| 配置项 | 默认值 | 用途 |
|---|---|---|
CLOUD_BEAT_MULTIPLIER_DEFAULT | 8.0 | 缩放任务间隔(例如,15 秒变为 120 秒) backend/onyx/background/celery/tasks/beat_schedule.py:33-33 |
RELOAD_INTERVAL | 60 | 调度器检查乘数变化的频率(秒) backend/onyx/background/celery/apps/beat.py:31-31 |
乘数通过 OnyxRuntime.get_beat_multiplier() 获取 backend/onyx/background/celery/apps/beat.py:169-169。如果乘数或租户列表发生变化,DynamicTenantScheduler 会重建其内部调度 backend/onyx/background/celery/apps/beat.py:177-189。
来源:backend/onyx/background/celery/apps/beat.py:36-41, backend/onyx/background/celery/tasks/beat_schedule.py:33-33
存活探针与监控
调度器实现了健康监控机制:
存活探针文件
每次重新加载间隔完成时,调度器会更新一个存活探针文件,该文件路径由 make_probe_path("liveness", "beat@hostname") 生成 backend/onyx/background/celery/apps/beat.py:50-69。外部监控系统可以检查该文件的修改时间。
Redis 协调与栅栏
后台任务使用"栅栏"模式防止重叠执行。例如,check_for_vespa_sync_task 会获取一个超时时间为 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT(120 秒)的 OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK 锁 backend/onyx/background/celery/tasks/vespa/tasks.py:95-102。这确保了即使有多个 beat 实例运行,每个租户也只有一个实例会启动同步逻辑。
来源:backend/onyx/background/celery/apps/beat.py:50-69, backend/onyx/background/celery/tasks/vespa/tasks.py:95-102, backend/onyx/configs/constants.py:130-130
初始化与启动
Beat 调度器通过 Celery 的 beat_init 信号进行初始化。
来源:backend/onyx/background/celery/apps/beat.py:237-253, backend/onyx/configs/constants.py:85-85
beat_init 信号处理器使用较小的连接池大小(2)执行数据库初始化,因为调度器只读取租户元数据,不执行繁重的数据库操作 backend/onyx/background/celery/apps/beat.py:238-241。
来源:backend/onyx/background/celery/apps/beat.py:237-253