后台处理与协作
后台处理与协调
相关源文件
本章引用的主要源码文件:
backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.pybackend/ee/onyx/background/celery/apps/primary.pybackend/ee/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/apps/app_base.pybackend/onyx/background/celery/apps/beat.pybackend/onyx/background/celery/apps/heavy.pybackend/onyx/background/celery/apps/light.pybackend/onyx/background/celery/apps/monitoring.pybackend/onyx/background/celery/apps/primary.pybackend/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/tasks/docprocessing/tasks.pybackend/onyx/background/celery/tasks/shared/tasks.pybackend/onyx/background/celery/tasks/vespa/tasks.pybackend/onyx/background/indexing/run_docfetching.pybackend/onyx/configs/app_configs.pybackend/onyx/configs/constants.pybackend/onyx/db/file_record.pybackend/onyx/db/index_attempt_metrics.pybackend/onyx/db/index_attempt_metrics_models.pybackend/onyx/document_index/factory.pybackend/onyx/file_store/staging.pybackend/onyx/redis/redis_pool.pybackend/onyx/utils/postgres_sanitization.pybackend/scripts/dev_run_background_jobs.pybackend/supervisord.confbackend/tests/external_dependency_unit/celery/test_docprocessing_priority.pybackend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.pybackend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.pybackend/tests/external_dependency_unit/indexing/__init__.pybackend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.pybackend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.pybackend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.pybackend/tests/external_dependency_unit/indexing_helpers.pybackend/tests/external_dependency_unit/redis/test_tenant_redis.pybackend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.pybackend/tests/unit/onyx/db/test_tools.py
本文档描述了 Onyx 基于 Celery 的后台处理基础设施,包括任务编排、基于 Redis 的协调模式、工作线程池架构以及动态任务调度。该系统负责处理异步操作,例如文档索引、连接器清理、权限同步以及向量数据库中的元数据更新。
关于文档索引管线的详细信息,请参阅文档索引管线。关于连接器配置和管理,请参阅连接器框架概述。
系统概述
Onyx 使用 Celery 作为分布式任务队列系统,并以 Redis 作为消息代理和协调层。该架构将关注点分离到专门的工作线程池中,每个池针对不同的工作负载特性(I/O 密集型获取、CPU 密集型处理、轻量级元数据更新)进行了优化。
系统组件图
标题:后台处理架构
来源: backend/onyx/background/celery/apps/beat.py:27-58, backend/onyx/configs/constants.py:83-93, backend/onyx/background/celery/apps/primary.py:115-130
Celery 工作线程架构
Onyx 采用多池工作线程策略,以防止长时间运行的任务(如深度索引)阻塞关键维护任务(如权限同步或文档删除)。配置通过 supervisord.conf 和 dev_run_background_jobs.py 进行管理。
工作线程池配置
系统定义了专门的工作线程池,每个池分配给特定的 OnyxCeleryQueues。
| 工作线程池 | 应用名称 | 主要队列 | 用途 |
|---|---|---|---|
| 主工作线程 | celery_worker_primary | celery | 高层协调和单例清理任务。 |
| 轻量工作线程 | celery_worker_light | vespa_metadata_sync, connector_deletion, doc_permissions_upsert | 快速元数据更新和数据库维护。 |
| 重量工作线程 | celery_worker_heavy | connector_pruning, connector_doc_permissions_sync | 资源密集型同步任务。 |
| 文档获取工作线程 | celery_worker_docfetching | connector_doc_fetching | I/O 密集型任务,从外部 API 拉取数据。 |
| 文档处理工作线程 | celery_worker_docprocessing | docprocessing | CPU 密集型解析和嵌入向量生成。 |
并发与生命周期:
- 工作线程在初始化时分配特定的队列,通常通过
supervisord进行backend/supervisord.conf:30-116。 主工作线程执行单例初始化,例如在启动时清除OnyxRedisLocks.PRIMARY_WORKER锁backend/onyx/background/celery/apps/primary.py:156-170。- 在多租户环境中,
TenantAwareTask基类确保在执行前正确设置CURRENT_TENANT_ID_CONTEXTVARbackend/onyx/background/celery/apps/app_base.py:80-99。 - 详细信息请参阅 Celery 工作线程架构。
来源: backend/onyx/configs/constants.py:83-96, backend/onyx/background/celery/apps/primary.py:115-130, backend/onyx/background/celery/apps/app_base.py:80-99, backend/supervisord.conf:30-116
Redis 协调模式
栅栏与任务集
Onyx 使用"栅栏"模式来管理分布式操作的生命周期。这可以防止多个工作线程执行相同的任务,并提供一种机制来跟踪一组子任务(称为"任务集")何时完成。
关键组件:
- TenantRedis:一个自定义的 Redis 客户端,会自动为键添加
tenant_id前缀,以确保缓存层的多租户隔离backend/onyx/redis/redis_pool.py:65-92。 - 栅栏键:Redis 键(通常带有 TTL),用于指示特定实体的活动操作
backend/onyx/background/celery/tasks/vespa/document_sync.py:23-27。 - 任务集键:Redis
Set结构,包含由生成器任务产生的所有子任务的 UUIDbackend/onyx/background/celery/apps/app_base.py:140-166。 - ACTIVE_FENCES:一个全局的 Redis 集合,在
OnyxRedisConstants.ACTIVE_FENCES中跟踪,由监控 beat 用于验证和清理过期的栅栏backend/onyx/background/celery/tasks/vespa/tasks.py:163-172。
锁注册表(OnyxRedisLocks)
| 锁名称 | 超时时间 | 文件引用 |
|---|---|---|
CHECK_VESPA_SYNC_BEAT_LOCK | 120秒 | backend/onyx/configs/constants.py:130 |
PRIMARY_WORKER | 120秒 | backend/onyx/configs/constants.py:133 |
CELERY_INDEXING_LOCK | 3小时15分钟 | backend/onyx/configs/constants.py:150 |
详细信息请参阅 Redis 协调模式。
来源: backend/onyx/configs/constants.py:128-168, backend/onyx/background/celery/apps/app_base.py:140-182, backend/onyx/redis/redis_pool.py:65-175
文档索引管线
索引管线是一个多阶段过程,由生成器任务进行编排。
- 文档获取:工作线程通过
instantiate_connector从源中拉取Document对象backend/onyx/background/indexing/run_docfetching.py:117-124。 - 批处理:文档被分批存储到
DocumentBatchStorage中backend/onyx/file_store/document_batch_storage.py:103-104。 - 文档处理:子任务获取批次,执行解析、片段切分和嵌入向量生成
backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111。 - Vespa 更新:最终的向量和元数据通过
run_indexing_pipelinebackend/onyx/background/celery/tasks/docprocessing/tasks.py:111和VespaDocumentFieldsbackend/onyx/document_index/interfaces.py:53推送到文档索引。
详细信息请参阅 文档索引管线。
来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111, backend/onyx/background/indexing/run_docfetching.py:117-124
Vespa 同步与维护任务
维护任务确保搜索索引与主 PostgreSQL 数据库保持一致。
- 元数据同步:
check_for_vespa_sync_task识别已更改的文档、DocumentSet成员资格或UserGroup权限,并触发同步backend/onyx/background/celery/tasks/vespa/tasks.py:75-109。 - 同步记录:系统使用
SyncRecord模型和SyncStatus枚举来跟踪后台更新的进度backend/onyx/db/enums.py:44-45。
详细信息请参阅 Vespa 同步与维护任务。
来源: backend/onyx/background/celery/tasks/vespa/tasks.py:75-158, backend/onyx/db/enums.py:44-45
动态任务调度
Onyx 使用自定义调度器来处理多租户任务生成和速率限制。
调度逻辑
- DynamicTenantScheduler:一个自定义的 Celery 调度器,会定期重新加载其调度,并可以适应租户的添加/移除
backend/onyx/background/celery/apps/beat.py:27-41。 - Beat 乘数:使用
beat_multiplier(云环境默认值为 8.0)来缩放任务间隔,在工作线程过载时提供背压backend/onyx/background/celery/tasks/beat_schedule.py:33。 - 任务模板:
beat_task_templates列表定义了索引检查和清理等周期性任务的调度和优先级backend/onyx/background/celery/tasks/beat_schedule.py:37-179。
详细信息请参阅 动态任务调度。
来源: backend/onyx/background/celery/tasks/beat_schedule.py:28-179, backend/onyx/background/celery/apps/beat.py:27-80
连接器生命周期操作
后台任务管理连接器生命周期的"清理"阶段:
- 清理:
check_for_pruning识别索引中不再存在于源中的文档并将其移除backend/onyx/background/celery/tasks/beat_schedule.py:122-130。 - 删除:
check_for_connector_deletion处理ConnectorCredentialPair的完全移除,包括撤销任何活动的索引或清理任务backend/onyx/background/celery/tasks/beat_schedule.py:100-110。
代码实体关联
标题:连接器生命周期任务映射
来源: backend/onyx/background/celery/tasks/beat_schedule.py:66-110, backend/onyx/configs/constants.py:130-133
详细信息请参阅 连接器生命周期操作。