agentic_huge_data_base / wiki
页面 Onyx · 7.2 Redis 协作模式·DeepWiki 中文全文译文

7.2 · Redis 协作模式(Redis Coordination Patterns)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节7.2 状态全文译文 模块测试、发布与运维、工作流与编排、存储与持久化、检索、召回与索引
源码线索
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/celery_redis.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
模块标签
  • 测试、发布与运维
  • 工作流与编排
  • 存储与持久化
  • 检索、召回与索引
  • 接口与服务契约

章节正文

Redis 协作模式

Redis 协调模式

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/celery_redis.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/document_sync.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/redis/redis_connector_delete.py
  • backend/onyx/redis/redis_connector_doc_perm_sync.py
  • backend/onyx/redis/redis_connector_ext_group_sync.py
  • backend/onyx/redis/redis_connector_prune.py
  • backend/onyx/redis/redis_document_set.py
  • backend/onyx/redis/redis_pool.py
  • backend/onyx/redis/redis_usergroup.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py

本文档描述了 Onyx 如何使用 Redis 在 Celery 工作器之间进行分布式协调、任务调度以及后台进程管理。内容涵盖栅栏模式(fence pattern)、任务集(taskset)追踪的实现细节,以及管理 Redis 连接和多租户的架构组件。

目的与范围

Redis 在 Onyx 中承担三个主要的协调角色:

  1. 任务代理和结果后端:用于 Celery 分布式任务处理。
  2. 分布式锁:用于单例执行和互斥访问。
  3. 任务集和栅栏:用于追踪分布式操作的进度和同步。

本文档重点介绍基于 Redis 构建的协调模式和原语,特别是 TenantRedis 自动前缀、RedisPool 逻辑以及后台任务的状态管理。

Redis 基础设施与连接管理

Onyx 通过一个集中式工具和一个处理多租户隔离的自定义客户端来管理 Redis 连接。

Redis 客户端管理

Redis 客户端通过 get_redis_client()get_redis_replica_client() 获取。这些函数处理底层连接细节,包括认证(标准认证或基于 IAM 的认证)和 SSL 配置。

Onyx · Redis 客户端管理 · 图 1
Onyx · Redis 客户端管理 · 图 1

来源: backend/onyx/redis/redis_pool.py:56-59, backend/onyx/redis/redis_pool.py:207-224

TenantRedis 与自动前缀

在多租户部署(MULTI_TENANT=true)中,Onyx 使用 TenantRedis 提供透明的隔离。这个 redis.Redis 的子类会拦截对 Redis 方法的调用,并在键名前添加 tenant_id 前缀。

特性实现
前缀逻辑如果键名尚未包含前缀,则在其前添加 {tenant_id}:backend/onyx/redis/redis_pool.py:70-91
包装的方法getsetlockdeleteexistssaddsremblpopeval 等。backend/onyx/redis/redis_pool.py:178-204
迭代器支持scan_itersscan_iter 会自动为 match 模式添加前缀,并从结果中去除前缀。backend/onyx/redis/redis_pool.py:104-126
Lua 脚本evalevalsha 被包装,以便为传递给脚本的键名添加前缀。backend/onyx/redis/redis_pool.py:160-174

来源: backend/onyx/redis/redis_pool.py:65-204, shared_configs/configs.py:20-20

栅栏模式

栅栏是同步屏障,用于防止冲突操作(例如,在之前的同步仍在验证时执行文档同步)。

ACTIVE_FENCES 注册表

Onyx 在名为 OnyxRedisConstants.ACTIVE_FENCES 的 Redis 集合中维护一个当前所有活跃栅栏的中央注册表。

Onyx · ACTIVE_FENCES 注册表 · 图 2
Onyx · ACTIVE_FENCES 注册表 · 图 2

来源: backend/onyx/background/celery/tasks/vespa/tasks.py:163-172, backend/onyx/background/celery/apps/primary.py:156-156

栅栏生命周期与验证
  1. 创建:当协调操作开始时,它会创建一个唯一的栅栏键(例如 DOCUMENT_SYNC_FENCE_KEY),并将其添加到 ACTIVE_FENCES 集合中。backend/onyx/background/celery/tasks/vespa/document_sync.py:23-23
  2. 等待机制:依赖任务会检查这些栅栏,并等待最多 CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT(5 分钟)的时间,直到栅栏被清除。backend/onyx/configs/constants.py:156-156
  3. 验证check_for_vespa_sync_task 会定期扫描 ACTIVE_FENCES 集合。如果某个栅栏键存在于集合中,但对应的键在 Redis 中已过期或被删除,则会将其从注册表中移除,以防止过时的注册。backend/onyx/background/celery/tasks/vespa/tasks.py:164-169

来源: backend/onyx/background/celery/tasks/vespa/tasks.py:162-180, backend/onyx/configs/constants.py:156-156

任务集协调模式

任务集允许 Onyx 跨多个工作器追踪一组分布式子任务(例如,将数千个文档同步到 Vespa)的进度。

通过任务后执行实现

Onyx 使用 Celery 的 task_postrun 信号来递减任务集。这确保了即使工作器崩溃或任务失败,协调状态也会得到更新。

Onyx · 通过任务后执行实现 · 图 3
Onyx · 通过任务后执行实现 · 图 3

来源: backend/onyx/background/celery/apps/app_base.py:130-202, backend/onyx/background/celery/tasks/vespa/document_sync.py:30-30

协调实体

专门的 Redis 类管理特定的任务集和协调状态:

用途键模式
RedisDocumentSet追踪文档集的 Vespa 同步document_set_{id}:taskset
RedisUserGroup追踪用户组的 Vespa 同步usergroup_{id}:taskset
RedisConnectorPrune管理剪枝子任务connector_pruning_{id}:subtasks
RedisConnectorDelete协调连接器删除connector_deletion_{id}:taskset

来源: backend/onyx/background/celery/apps/app_base.py:182-196, backend/onyx/redis/redis_document_set.py, backend/onyx/redis/redis_connector_prune.py

主工作器初始化

"主"工作器充当系统编排器。在启动时,它会执行 Redis 协调状态的"全面清理",以处理之前崩溃或中断部署留下的残留数据。

重置序列

当主工作器初始化时(on_worker_init):

  1. 获取全局锁:获取 OnyxRedisLocks.PRIMARY_WORKER 锁,以确保单例行为。backend/onyx/background/celery/apps/primary.py:165-172
  2. 清除栅栏:删除 ACTIVE_FENCES 注册表,以清除所有过时的锁。backend/onyx/background/celery/apps/primary.py:156-156
  3. 重置同步状态:调用 reset_document_sync(r) 以清除全局文档同步任务集。backend/onyx/background/celery/apps/primary.py:180-180
  4. 实体重置:遍历协调类(RedisDocumentSetRedisUserGroup 等),并调用 reset_all(r) 以清除所有特定租户的任务集。backend/onyx/background/celery/apps/primary.py:182-196

来源: backend/onyx/background/celery/apps/primary.py:115-196, backend/onyx/configs/constants.py:133-133

Redis 协调键汇总

常量 / 类Redis 键 / 前缀用途
OnyxRedisConstants.ACTIVE_FENCESda_active_fences所有活跃同步/索引栅栏的注册表
DOCUMENT_SYNC_TASKSET_KEYdocument_sync_taskset活跃 Vespa 同步任务 ID 的全局集合
OnyxRedisLocks.PRIMARY_WORKERda_lock:primary_worker主编排器的单例锁
DOCUMENT_SYNC_FENCE_KEYdocsync_fence文档同步操作的特定栅栏键
OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCKda_lock:check_vespa_sync_beat防止重叠的同步检查

来源: backend/onyx/configs/constants.py:134-134, backend/onyx/background/celery/tasks/vespa/document_sync.py:23-23, backend/onyx/background/celery/tasks/vespa/document_sync.py:30-30, backend/onyx/configs/constants.py:130-130