agentic_huge_data_base / wiki
页面 Onyx · 7 后台处理与协作·DeepWiki 中文全文译文

7 · 后台处理与协作(Background Processing and Coordination)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节7 状态全文译文 模块文档对象与元数据、工作流与编排、系统架构、测试、发布与运维
源码线索
  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
模块标签
  • 文档对象与元数据
  • 工作流与编排
  • 系统架构
  • 测试、发布与运维
  • 存储与持久化

章节正文

后台处理与协作

后台处理与协调

相关源文件

本章引用的主要源码文件:

  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/docprocessing/tasks.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/background/indexing/run_docfetching.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/db/file_record.py
  • backend/onyx/db/index_attempt_metrics.py
  • backend/onyx/db/index_attempt_metrics_models.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/file_store/staging.py
  • backend/onyx/redis/redis_pool.py
  • backend/onyx/utils/postgres_sanitization.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/celery/test_docprocessing_priority.py
  • backend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.py
  • backend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.py
  • backend/tests/external_dependency_unit/indexing/__init__.py
  • backend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.py
  • backend/tests/external_dependency_unit/indexing_helpers.py
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py
  • backend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.py
  • backend/tests/unit/onyx/db/test_tools.py

本文档描述了 Onyx 基于 Celery 的后台处理基础设施,包括任务编排、基于 Redis 的协调模式、工作线程池架构以及动态任务调度。该系统负责处理异步操作,例如文档索引、连接器清理、权限同步以及向量数据库中的元数据更新。

关于文档索引管线的详细信息,请参阅文档索引管线。关于连接器配置和管理,请参阅连接器框架概述

系统概述

Onyx 使用 Celery 作为分布式任务队列系统,并以 Redis 作为消息代理和协调层。该架构将关注点分离到专门的工作线程池中,每个池针对不同的工作负载特性(I/O 密集型获取、CPU 密集型处理、轻量级元数据更新)进行了优化。

系统组件图

标题:后台处理架构

Onyx · 系统组件图 · 图 1
Onyx · 系统组件图 · 图 1

来源: backend/onyx/background/celery/apps/beat.py:27-58, backend/onyx/configs/constants.py:83-93, backend/onyx/background/celery/apps/primary.py:115-130

Celery 工作线程架构

Onyx 采用多池工作线程策略,以防止长时间运行的任务(如深度索引)阻塞关键维护任务(如权限同步或文档删除)。配置通过 supervisord.confdev_run_background_jobs.py 进行管理。

工作线程池配置

系统定义了专门的工作线程池,每个池分配给特定的 OnyxCeleryQueues

工作线程池应用名称主要队列用途
主工作线程celery_worker_primarycelery高层协调和单例清理任务。
轻量工作线程celery_worker_lightvespa_metadata_sync, connector_deletion, doc_permissions_upsert快速元数据更新和数据库维护。
重量工作线程celery_worker_heavyconnector_pruning, connector_doc_permissions_sync资源密集型同步任务。
文档获取工作线程celery_worker_docfetchingconnector_doc_fetchingI/O 密集型任务,从外部 API 拉取数据。
文档处理工作线程celery_worker_docprocessingdocprocessingCPU 密集型解析和嵌入向量生成。

并发与生命周期:

  • 工作线程在初始化时分配特定的队列,通常通过 supervisord 进行 backend/supervisord.conf:30-116
  • 主工作线程 执行单例初始化,例如在启动时清除 OnyxRedisLocks.PRIMARY_WORKERbackend/onyx/background/celery/apps/primary.py:156-170
  • 在多租户环境中,TenantAwareTask 基类确保在执行前正确设置 CURRENT_TENANT_ID_CONTEXTVAR backend/onyx/background/celery/apps/app_base.py:80-99
  • 详细信息请参阅 Celery 工作线程架构

来源: backend/onyx/configs/constants.py:83-96, backend/onyx/background/celery/apps/primary.py:115-130, backend/onyx/background/celery/apps/app_base.py:80-99, backend/supervisord.conf:30-116

Redis 协调模式

栅栏与任务集

Onyx 使用"栅栏"模式来管理分布式操作的生命周期。这可以防止多个工作线程执行相同的任务,并提供一种机制来跟踪一组子任务(称为"任务集")何时完成。

关键组件:

  • TenantRedis:一个自定义的 Redis 客户端,会自动为键添加 tenant_id 前缀,以确保缓存层的多租户隔离 backend/onyx/redis/redis_pool.py:65-92
  • 栅栏键:Redis 键(通常带有 TTL),用于指示特定实体的活动操作 backend/onyx/background/celery/tasks/vespa/document_sync.py:23-27
  • 任务集键:Redis Set 结构,包含由生成器任务产生的所有子任务的 UUID backend/onyx/background/celery/apps/app_base.py:140-166
  • ACTIVE_FENCES:一个全局的 Redis 集合,在 OnyxRedisConstants.ACTIVE_FENCES 中跟踪,由监控 beat 用于验证和清理过期的栅栏 backend/onyx/background/celery/tasks/vespa/tasks.py:163-172
锁注册表(OnyxRedisLocks
锁名称超时时间文件引用
CHECK_VESPA_SYNC_BEAT_LOCK120秒backend/onyx/configs/constants.py:130
PRIMARY_WORKER120秒backend/onyx/configs/constants.py:133
CELERY_INDEXING_LOCK3小时15分钟backend/onyx/configs/constants.py:150

详细信息请参阅 Redis 协调模式

来源: backend/onyx/configs/constants.py:128-168, backend/onyx/background/celery/apps/app_base.py:140-182, backend/onyx/redis/redis_pool.py:65-175

文档索引管线

索引管线是一个多阶段过程,由生成器任务进行编排。

  1. 文档获取:工作线程通过 instantiate_connector 从源中拉取 Document 对象 backend/onyx/background/indexing/run_docfetching.py:117-124
  2. 批处理:文档被分批存储到 DocumentBatchStoragebackend/onyx/file_store/document_batch_storage.py:103-104
  3. 文档处理:子任务获取批次,执行解析、片段切分和嵌入向量生成 backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111
  4. Vespa 更新:最终的向量和元数据通过 run_indexing_pipeline backend/onyx/background/celery/tasks/docprocessing/tasks.py:111VespaDocumentFields backend/onyx/document_index/interfaces.py:53 推送到文档索引。

详细信息请参阅 文档索引管线

来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111, backend/onyx/background/indexing/run_docfetching.py:117-124

Vespa 同步与维护任务

维护任务确保搜索索引与主 PostgreSQL 数据库保持一致。

  • 元数据同步check_for_vespa_sync_task 识别已更改的文档、DocumentSet 成员资格或 UserGroup 权限,并触发同步 backend/onyx/background/celery/tasks/vespa/tasks.py:75-109
  • 同步记录:系统使用 SyncRecord 模型和 SyncStatus 枚举来跟踪后台更新的进度 backend/onyx/db/enums.py:44-45

详细信息请参阅 Vespa 同步与维护任务

来源: backend/onyx/background/celery/tasks/vespa/tasks.py:75-158, backend/onyx/db/enums.py:44-45

动态任务调度

Onyx 使用自定义调度器来处理多租户任务生成和速率限制。

调度逻辑
  • DynamicTenantScheduler:一个自定义的 Celery 调度器,会定期重新加载其调度,并可以适应租户的添加/移除 backend/onyx/background/celery/apps/beat.py:27-41
  • Beat 乘数:使用 beat_multiplier(云环境默认值为 8.0)来缩放任务间隔,在工作线程过载时提供背压 backend/onyx/background/celery/tasks/beat_schedule.py:33
  • 任务模板beat_task_templates 列表定义了索引检查和清理等周期性任务的调度和优先级 backend/onyx/background/celery/tasks/beat_schedule.py:37-179

详细信息请参阅 动态任务调度

来源: backend/onyx/background/celery/tasks/beat_schedule.py:28-179, backend/onyx/background/celery/apps/beat.py:27-80

连接器生命周期操作

后台任务管理连接器生命周期的"清理"阶段:

  • 清理check_for_pruning 识别索引中不再存在于源中的文档并将其移除 backend/onyx/background/celery/tasks/beat_schedule.py:122-130
  • 删除check_for_connector_deletion 处理 ConnectorCredentialPair 的完全移除,包括撤销任何活动的索引或清理任务 backend/onyx/background/celery/tasks/beat_schedule.py:100-110
代码实体关联

标题:连接器生命周期任务映射

Onyx · 代码实体关联 · 图 2
Onyx · 代码实体关联 · 图 2

来源: backend/onyx/background/celery/tasks/beat_schedule.py:66-110, backend/onyx/configs/constants.py:130-133

详细信息请参阅 连接器生命周期操作