agentic_huge_data_base / wiki
页面 Onyx · 7.5 动态任务调度·DeepWiki 中文全文译文

7.5 · 动态任务调度(Dynamic Task Scheduling)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节7.5 状态全文译文 模块测试、发布与运维、工作流与编排、配置治理、文档对象与元数据
源码线索
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
模块标签
  • 测试、发布与运维
  • 工作流与编排
  • 配置治理
  • 文档对象与元数据
  • 系统架构

章节正文

动态任务调度

动态任务调度

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/redis/redis_pool.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py

本文档介绍 Onyx 中的动态任务调度系统,重点说明 Celery Beat 如何配置以生成和管理周期性后台任务。关于任务本身及其执行的信息,请参见后台处理与协调。关于工作进程类型和队列分配的详细信息,请参见Celery 工作进程架构

目的与架构

Onyx 后台处理系统使用自定义的 DynamicTenantScheduler,该类扩展了 Celery 的 PersistentScheduler,支持运行时配置的任务调度。该调度器同时支持单租户(自托管)和多租户(云)部署,可根据租户可用性和可调节的速率限制动态生成任务调度。

关键能力:

  • 无需重启进程即可动态重新生成调度 backend/onyx/background/celery/apps/beat.py:155-156
  • 多租户模式下按租户生成任务 backend/onyx/background/celery/apps/beat.py:123-151
  • 通过 Redis 中存储的 beat_multiplier 实现运行时可调节的任务生成速率 backend/onyx/background/celery/apps/beat.py:169-171
  • 每 60 秒自动发现租户并更新调度 backend/onyx/background/celery/apps/beat.py:31-41

来源:backend/onyx/background/celery/apps/beat.py:27-41, backend/onyx/background/celery/apps/beat.py:123-151, shared_configs/configs.py:56-56

DynamicTenantScheduler 类

DynamicTenantScheduler 定义在 backend/onyx/background/celery/apps/beat.py:27-235 中,提供了核心调度逻辑。它重写了 tick() 方法,以定期检查调度更新。

自然语言到代码实体的调度器架构

下图将高级调度概念映射到实现它们的特定代码实体。

Onyx · 自然语言到代码实体的调度器架构 · 图 1
Onyx · 自然语言到代码实体的调度器架构 · 图 1

来源:backend/onyx/background/celery/apps/beat.py:27-145, backend/onyx/background/celery/tasks/beat_schedule.py:37-179, backend/onyx/db/engine/tenant_utils.py:15-15

关键方法:

  • tick():由 Celery Beat 定期调用;每 RELOAD_INTERVAL(60 秒)检查调度是否需要更新 backend/onyx/background/celery/apps/beat.py:61-80
  • _try_updating_schedule():获取当前租户和 beat 乘数,如果检测到任何变化则重新生成调度 backend/onyx/background/celery/apps/beat.py:155-190
  • _generate_schedule():根据 MULTI_TENANT 标志将任务模板转换为按租户或系统范围的任务条目 backend/onyx/background/celery/apps/beat.py:82-153

来源:backend/onyx/background/celery/apps/beat.py:27-235

Beat 任务模板

Beat 任务模板作为周期性任务的蓝图。每个模板指定了任务名称、Celery 任务标识符、调度间隔和执行选项。

模板结构
字段类型描述
namestr人类可读的任务标识符 backend/onyx/background/celery/tasks/beat_schedule.py:39-39
taskstrCelery 任务名称(来自 OnyxCeleryTask 枚举) backend/onyx/background/celery/tasks/beat_schedule.py:40-40
scheduletimedelta执行频率 backend/onyx/background/celery/tasks/beat_schedule.py:41-41
optionsdict优先级、过期时间和队列分配 backend/onyx/background/celery/tasks/beat_schedule.py:42-45

来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-46, backend/onyx/configs/constants.py:18-18

核心任务模板

以下核心模板配置在 backend/onyx/background/celery/tasks/beat_schedule.py:37-179 中:

任务名称任务枚举频率优先级描述
check-for-user-file-processingCHECK_FOR_USER_FILE_PROCESSING20 秒处理用户上传的文件 backend/onyx/background/celery/tasks/beat_schedule.py:39-46
check-for-indexingCHECK_FOR_INDEXING15 秒触发连接器索引 backend/onyx/background/celery/tasks/beat_schedule.py:66-74
check-for-connector-deletionCHECK_FOR_CONNECTOR_DELETION20 秒处理后台清理 backend/onyx/background/celery/tasks/beat_schedule.py:100-110
check-for-vespa-syncCHECK_FOR_VESPA_SYNC_TASK20 秒将元数据同步到文档索引 backend/onyx/background/celery/tasks/beat_schedule.py:112-120
check-for-pruningCHECK_FOR_PRUNING20 秒移除过期文档 backend/onyx/background/celery/tasks/beat_schedule.py:122-130
monitor-background-processesMONITOR_BACKGROUND_PROCESSES5 分钟工作进程健康检查 backend/onyx/background/celery/tasks/beat_schedule.py:141-149

来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-179

多租户任务生成

在多租户部署(MULTI_TENANT=true)中,调度器使用两层方法来管理跨多个租户的负载。

系统范围的云任务

云任务生成一次,处理跨所有租户的系统级操作。它们通过 get_cloud_tasks_to_schedule(beat_multiplier) 获取,并为整个集群仅调度一次 backend/onyx/background/celery/apps/beat.py:91-111

按租户的任务扩展

对于通过 get_all_tenant_ids() 发现的每个租户,调度器会生成单独的任务实例。这样可以实现按租户的细粒度控制和后台工作隔离。

Onyx · 按租户的任务扩展 · 图 2
Onyx · 按租户的任务扩展 · 图 2

来源:backend/onyx/background/celery/apps/beat.py:123-151, backend/onyx/db/engine/tenant_utils.py:15-15

每个生成的任务在其 kwargs 中包含 tenant_id,该值通过 TenantAwareTask 基类传播到任务执行中 backend/onyx/background/celery/apps/app_base.py:80-99

Beat 乘数配置

beat_multiplier 是一个运行时可调节的浮点值,用于缩放任务生成速率。这在云部署中至关重要,可以防止工作进程过载。

配置项默认值用途
CLOUD_BEAT_MULTIPLIER_DEFAULT8.0缩放任务间隔(例如,15 秒变为 120 秒) backend/onyx/background/celery/tasks/beat_schedule.py:33-33
RELOAD_INTERVAL60调度器检查乘数变化的频率(秒) backend/onyx/background/celery/apps/beat.py:31-31

乘数通过 OnyxRuntime.get_beat_multiplier() 获取 backend/onyx/background/celery/apps/beat.py:169-169。如果乘数或租户列表发生变化,DynamicTenantScheduler 会重建其内部调度 backend/onyx/background/celery/apps/beat.py:177-189

来源:backend/onyx/background/celery/apps/beat.py:36-41, backend/onyx/background/celery/tasks/beat_schedule.py:33-33

存活探针与监控

调度器实现了健康监控机制:

存活探针文件

每次重新加载间隔完成时,调度器会更新一个存活探针文件,该文件路径由 make_probe_path("liveness", "beat@hostname") 生成 backend/onyx/background/celery/apps/beat.py:50-69。外部监控系统可以检查该文件的修改时间。

Redis 协调与栅栏

后台任务使用"栅栏"模式防止重叠执行。例如,check_for_vespa_sync_task 会获取一个超时时间为 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT(120 秒)的 OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCKbackend/onyx/background/celery/tasks/vespa/tasks.py:95-102。这确保了即使有多个 beat 实例运行,每个租户也只有一个实例会启动同步逻辑。

来源:backend/onyx/background/celery/apps/beat.py:50-69, backend/onyx/background/celery/tasks/vespa/tasks.py:95-102, backend/onyx/configs/constants.py:130-130

初始化与启动

Beat 调度器通过 Celery 的 beat_init 信号进行初始化。

Onyx · 初始化与启动 · 图 3
Onyx · 初始化与启动 · 图 3

来源:backend/onyx/background/celery/apps/beat.py:237-253, backend/onyx/configs/constants.py:85-85

beat_init 信号处理器使用较小的连接池大小(2)执行数据库初始化,因为调度器只读取租户元数据,不执行繁重的数据库操作 backend/onyx/background/celery/apps/beat.py:238-241

来源:backend/onyx/background/celery/apps/beat.py:237-253