多节点同步(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/15.3-multi-node-synchronization
翻译时间:2026-06-09T16:11:47.479Z
翻译模型:deepseek-chat
原文字符数:13173
项目:Open WebUI (open-webui)
---
多节点同步
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
backend/open_webui/socket/main.pybackend/open_webui/socket/utils.pybackend/open_webui/tasks.pybackend/open_webui/test/util/test_redis.pybackend/open_webui/utils/rate_limit.pybackend/open_webui/utils/redis.py
Open WebUI 中的多节点同步通过使用 Redis 作为分布式协调层,协调多个后端实例之间的状态,从而实现水平扩展。主要的同步机制是 Socket.IO 的 AsyncRedisManager,它负责在多个服务器实例间广播实时事件。
---
概述与架构
Open WebUI 的多节点架构由三个基于 Redis 的同步层组成:
graph TB
subgraph "前端客户端"
browser1["浏览器 1"]
browser2["浏览器 2"]
browser3["浏览器 3"]
end
subgraph "后端实例 1"
sio1["socketio.AsyncServer"]
mgr1["AsyncRedisManager"]
session1["SESSION_POOL<br/>(RedisDict)"]
usage1["USAGE_POOL<br/>(RedisDict)"]
ydoc1["YDOC_MANAGER<br/>(YdocManager)"]
task_listener1["redis_task_command_listener"]
end
subgraph "后端实例 2"
sio2["socketio.AsyncServer"]
mgr2["AsyncRedisManager"]
session2["SESSION_POOL<br/>(RedisDict)"]
usage2["USAGE_POOL<br/>(RedisDict)"]
ydoc2["YDOC_MANAGER<br/>(YdocManager)"]
task_listener2["redis_task_command_listener"]
end
subgraph "Redis"
pubsub["发布/订阅频道"]
dict_keys["哈希键<br/>(SESSION_POOL, USAGE_POOL)"]
ydoc_keys["文档键<br/>(YDOC_MANAGER)"]
task_keys["任务键<br/>(REDIS_TASKS_KEY)"]
end
browser1 -->|WebSocket| sio1
browser2 -->|WebSocket| sio1
browser3 -->|WebSocket| sio2
sio1 --> mgr1
sio2 --> mgr2
mgr1 <-->|发布/订阅| pubsub
mgr2 <-->|发布/订阅| pubsub
session1 <--> dict_keys
session2 <--> dict_keys
usage1 <--> dict_keys
usage2 <--> dict_keys
ydoc1 <--> ydoc_keys
ydoc2 <--> ydoc_keys
task_listener1 <--> task_keys
task_listener2 <--> task_keys
task_listener1 <-->|发布/订阅| pubsub
基于 Redis 协调的多节点架构
系统使用四种不同的 Redis 模式进行同步:
- AsyncRedisManager (Socket.IO) - 基于发布/订阅的事件广播,用于实时通信
backend/open_webui/socket/main.py:65-84。 - RedisDict - 基于哈希的共享字典,用于
SESSION_POOL和USAGE_POOLbackend/open_webui/socket/utils.py:44-52。 - YdocManager - 基于列表/集合的文档状态管理,用于协同编辑
backend/open_webui/socket/utils.py:124-135。 - 分布式任务管理 - 基于发布/订阅和哈希的协调机制,用于跨节点取消后台任务
backend/open_webui/tasks.py:25-41。
来源:backend/open_webui/socket/main.py:65-176,backend/open_webui/socket/utils.py:44-228,backend/open_webui/tasks.py:20-22
---
Socket.IO AsyncRedisManager
管理器初始化
核心的多节点同步由 Socket.IO 的 AsyncRedisManager 处理,该管理器根据 WEBSOCKET_MANAGER 环境变量进行条件配置 backend/open_webui/socket/main.py:65:
graph TB
env_check{"WEBSOCKET_MANAGER<br/>== 'redis'?"}
env_check -->|是| sentinel_check{"WEBSOCKET_SENTINEL_HOSTS<br/>已设置?"}
env_check -->|否| local["socketio.AsyncServer<br/>(内存管理器)"]
sentinel_check -->|是| sentinel_mgr["AsyncRedisManager<br/>(哨兵 URL)"]
sentinel_check -->|否| redis_mgr["AsyncRedisManager<br/>(直接 URL)"]
sentinel_mgr --> sio_server["socketio.AsyncServer<br/>(client_manager=mgr)"]
redis_mgr --> sio_server
local --> sio_local["socketio.AsyncServer<br/>(默认管理器)"]
sio_server --> async_mode["async_mode='asgi'"]
sio_local --> async_mode
Socket.IO 管理器配置流程
管理器在 backend/open_webui/socket/main.py 中初始化 backend/open_webui/socket/main.py:65-84:
if WEBSOCKET_MANAGER == 'redis':
if WEBSOCKET_SENTINEL_HOSTS:
mgr = socketio.AsyncRedisManager(
get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT),
redis_options=WEBSOCKET_REDIS_OPTIONS,
)
else:
mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL, redis_options=WEBSOCKET_REDIS_OPTIONS)
sio = socketio.AsyncServer(
# ...
client_manager=mgr,
# ...
)
当配置了 AsyncRedisManager 后,所有 sio.emit() 调用都会自动通过 Redis 发布/订阅广播到所有已连接的服务端实例。
来源:backend/open_webui/socket/main.py:65-84,backend/open_webui/env.py:28-41
---
RedisDict:分布式状态字典
RedisDict 实现
RedisDict 是 backend/open_webui/socket/utils.py 中的一个自定义抽象,它提供了一个由 Redis 哈希支持的、类似 Python 字典的接口 backend/open_webui/socket/utils.py:44-52:
classDiagram
class RedisDict {
+str name
+Redis redis
+__setitem__(key, value)
+__getitem__(key)
+__delitem__(key)
+__contains__(key)
+keys()
+values()
+items()
+get(key, default)
+clear()
}
class Redis {
+hset(name, key, value)
+hget(name, key)
+hdel(name, key)
+hexists(name, key)
+hgetall(name)
}
RedisDict --> Redis : "使用 get_redis_connection"
note for RedisDict "将值序列化为 JSON<br/>提供类似字典的 API<br/>所有操作都是原子的"
RedisDict 类结构
该实现使用 Redis 哈希命令(HSET、HGET、HDEL)配合 JSON 序列化 backend/open_webui/socket/utils.py:54-62:
class RedisDict:
def __setitem__(self, key, value):
serialized_value = json.dumps(value)
self.redis.hset(self.name, key, serialized_value)
def __getitem__(self, key):
value = self.redis.hget(self.name, key)
if value is None:
raise KeyError(key)
return json.loads(value)
来源:backend/open_webui/socket/utils.py:44-123
---
分布式任务管理
Open WebUI 使用 Redis 在多个节点间协调后台任务(如长时间运行的 AI 生成任务)。这使得连接到节点 A 的用户能够取消可能在节点 B 上运行的任务 backend/open_webui/tasks.py:145-163。
任务同步流程
sequenceDiagram
participant User as "用户(客户端)"
participant N1 as "节点 1 (API)"
participant R as "Redis (发布/订阅 + 哈希)"
participant N2 as "节点 2 (工作节点)"
Note over N2: 任务 T1 启动
N2->>R: redis_save_task(T1, item_id)
R-->>N2: HSET REDIS_TASKS_KEY T1
User->>N1: DELETE /tasks/T1 (停止任务)
N1->>R: redis_send_command({action: 'stop', task_id: T1})
R-->>N1: PUBLISH REDIS_PUBSUB_CHANNEL
R->>N2: 任务命令监听器收到 'stop'
N2->>N2: tasks.get(T1).cancel()
N2->>R: redis_cleanup_task(T1)
分布式任务取消
- 任务注册:当启用 Redis 时调用
create_task,任务 ID 和关联的项 ID(例如chat_id)会存储在 Redis 哈希REDIS_TASKS_KEY中backend/open_webui/tasks.py:49-54。 - 命令广播:
stop_task通过 Redis 发布/订阅使用REDIS_PUBSUB_CHANNEL发送 JSON 命令backend/open_webui/tasks.py:153-159。 - 本地执行:每个节点都运行
redis_task_command_listener,监听 "stop" 动作。如果task_id存在于本地tasks字典中,则对asyncio.Task调用.cancel()backend/open_webui/tasks.py:25-41。
来源:backend/open_webui/tasks.py:20-41,backend/open_webui/tasks.py:49-86,backend/open_webui/tasks.py:145-179
---
YdocManager:协同文档状态
YdocManager 管理基于 CRDT 的文档状态,用于跨多个实例的实时协同编辑。它使用 Redis 列表存储更新,使用集合存储活跃用户 backend/open_webui/socket/utils.py:124-135。
压缩策略
为防止 Redis 列表无限增长,YdocManager 实现了一种滚动压缩机制 backend/open_webui/socket/utils.py:152-166:
async def _compact_updates_redis(self, document_id: str):
"""滚动压缩:将最旧的一半更新合并为一个快照。"""
redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
all_updates = await self._redis.lrange(redis_key, 0, -1)
if len(all_updates) <= 1:
return
mid = len(all_updates) // 2
ydoc = Y.Doc()
for raw in all_updates[:mid]:
ydoc.apply_update(bytes(json.loads(raw)))
snapshot = json.dumps(list(ydoc.get_update()))
pipe = self._redis.pipeline()
pipe.delete(redis_key)
pipe.rpush(redis_key, snapshot, *all_updates[mid:])
await pipe.execute()
当更新数量超过 COMPACTION_THRESHOLD(500)时,最旧的一半更新会被合并为一个 Yjs 快照(使用 pycrdt)backend/open_webui/socket/utils.py:125-144。
来源:backend/open_webui/socket/utils.py:124-187
---
使用 RedisLock 实现分布式锁
RedisLock 提供分布式互斥锁,用于协调跨实例的独占操作,例如会话池清理 backend/open_webui/socket/utils.py:9-27。
实现细节
该锁使用 Redis 的 SET 命令,配合 nx=True(仅当键不存在时设置)和 ex=timeout_secs(过期时间)来确保原子性 backend/open_webui/socket/utils.py:29-32:
def aquire_lock(self):
# nx=True 仅在此键尚未被设置时才会设置它
self.lock_obtained = self.redis.set(self.lock_name, self.lock_id, nx=True, ex=self.timeout_secs)
return self.lock_obtained
这在 backend/open_webui/socket/main.py 中被用于防止多个节点在 periodic_session_pool_cleanup 中同时回收孤儿会话 backend/open_webui/socket/main.py:173-194。
来源:backend/open_webui/socket/utils.py:9-42,backend/open_webui/socket/main.py:136-156,backend/open_webui/socket/main.py:173-194
---
SentinelRedisProxy:故障转移处理
SentinelRedisProxy 封装了 Redis Sentinel 连接,并带有针对故障转移场景的自动重试逻辑 backend/open_webui/utils/redis.py:33-35。
自动重试包装器
该代理拦截所有 Redis 方法调用,并为同步和异步模式都包装了重试逻辑 backend/open_webui/utils/redis.py:43-49。它专门针对 ConnectionError 和 ReadOnlyError(当主节点变为副本节点时发生)进行处理 backend/open_webui/utils/redis.py:98-101。
async def _wrapped(*args, **kwargs):
for i in range(REDIS_SENTINEL_MAX_RETRY_COUNT):
try:
method = getattr(self._master(), item)
result = method(*args, **kwargs)
if inspect.iscoroutine(result):
return await result
return result
except (
redis.exceptions.ConnectionError,
redis.exceptions.ReadOnlyError,
) as e:
if i < REDIS_SENTINEL_MAX_RETRY_COUNT - 1:
# 记录日志并在重试前休眠
await asyncio.sleep(REDIS_RECONNECT_DELAY / 1000)
continue
raise e
来源:backend/open_webui/utils/redis.py:33-150,backend/open_webui/test/util/test_redis.py:14-116
---
键命名空间策略
所有 Redis 键都使用可配置的前缀,以支持在单个 Redis 实例上共享多个 Open WebUI 部署。
| 键模式 | 用途 | 来源 |
|---|---|---|
{REDIS_KEY_PREFIX}:models | 全局模型可用性池 | backend/open_webui/socket/main.py:116-121 |
{REDIS_KEY_PREFIX}:session_pool | 活跃的 Socket.IO 会话数据 | backend/open_webui/socket/main.py:123-128 |
{REDIS_KEY_PREFIX}:tasks | 分布式任务跟踪 | backend/open_webui/tasks.py:20 |
{REDIS_KEY_PREFIX}:tasks:commands | 任务控制的发布/订阅频道 | backend/open_webui/tasks.py:22 |
{REDIS_KEY_PREFIX}:ydoc:documents | Yjs 文档状态存储 | backend/open_webui/socket/main.py:167-170 |
{REDIS_KEY_PREFIX}:ratelimit:{key} | 速率限制器桶存储 | backend/open_webui/utils/rate_limit.py:37-38 |
前缀由 REDIS_KEY_PREFIX 环境变量定义 backend/open_webui/env.py:34。
来源:backend/open_webui/env.py:25-41,backend/open_webui/tasks.py:20-22,backend/open_webui/socket/main.py:116-170