agentic_huge_data_base / wiki
页面 Open WebUI · 15.3 多节点同步·DeepWiki 中文全文译文

15.3 · 多节点同步(Multi-node Synchronization)

多模型对话工作台与知识应用入口 · 本章是 Open WebUI DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Open WebUI 章节15.3 状态全文译文 模块频道、笔记与协作、系统架构、检索、召回与知识系统、接口与服务契约
源码线索
  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • backend/open_webui/env.py
模块标签
  • 频道、笔记与协作
  • 系统架构
  • 检索、召回与知识系统
  • 接口与服务契约
  • 界面与交互

中文译文

多节点同步(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/15.3-multi-node-synchronization
翻译时间:2026-06-09T16:11:47.479Z
翻译模型:deepseek-chat
原文字符数:13173
项目:Open WebUI (open-webui)

---

多节点同步

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py

Open WebUI 中的多节点同步通过使用 Redis 作为分布式协调层,协调多个后端实例之间的状态,从而实现水平扩展。主要的同步机制是 Socket.IO 的 AsyncRedisManager,它负责在多个服务器实例间广播实时事件。

---

概述与架构

Open WebUI 的多节点架构由三个基于 Redis 的同步层组成:

graph TB
    subgraph "前端客户端"
        browser1["浏览器 1"]
        browser2["浏览器 2"]
        browser3["浏览器 3"]
    end

    subgraph "后端实例 1"
        sio1["socketio.AsyncServer"]
        mgr1["AsyncRedisManager"]
        session1["SESSION_POOL<br/>(RedisDict)"]
        usage1["USAGE_POOL<br/>(RedisDict)"]
        ydoc1["YDOC_MANAGER<br/>(YdocManager)"]
        task_listener1["redis_task_command_listener"]
    end

    subgraph "后端实例 2"
        sio2["socketio.AsyncServer"]
        mgr2["AsyncRedisManager"]
        session2["SESSION_POOL<br/>(RedisDict)"]
        usage2["USAGE_POOL<br/>(RedisDict)"]
        ydoc2["YDOC_MANAGER<br/>(YdocManager)"]
        task_listener2["redis_task_command_listener"]
    end

    subgraph "Redis"
        pubsub["发布/订阅频道"]
        dict_keys["哈希键<br/>(SESSION_POOL, USAGE_POOL)"]
        ydoc_keys["文档键<br/>(YDOC_MANAGER)"]
        task_keys["任务键<br/>(REDIS_TASKS_KEY)"]
    end

    browser1 -->|WebSocket| sio1
    browser2 -->|WebSocket| sio1
    browser3 -->|WebSocket| sio2

    sio1 --> mgr1
    sio2 --> mgr2

    mgr1 <-->|发布/订阅| pubsub
    mgr2 <-->|发布/订阅| pubsub

    session1 <--> dict_keys
    session2 <--> dict_keys

    usage1 <--> dict_keys
    usage2 <--> dict_keys

    ydoc1 <--> ydoc_keys
    ydoc2 <--> ydoc_keys

    task_listener1 <--> task_keys
    task_listener2 <--> task_keys
    task_listener1 <-->|发布/订阅| pubsub

基于 Redis 协调的多节点架构

系统使用四种不同的 Redis 模式进行同步:

  1. AsyncRedisManager (Socket.IO) - 基于发布/订阅的事件广播,用于实时通信 backend/open_webui/socket/main.py:65-84
  2. RedisDict - 基于哈希的共享字典,用于 SESSION_POOLUSAGE_POOL backend/open_webui/socket/utils.py:44-52
  3. YdocManager - 基于列表/集合的文档状态管理,用于协同编辑 backend/open_webui/socket/utils.py:124-135
  4. 分布式任务管理 - 基于发布/订阅和哈希的协调机制,用于跨节点取消后台任务 backend/open_webui/tasks.py:25-41

来源:backend/open_webui/socket/main.py:65-176backend/open_webui/socket/utils.py:44-228backend/open_webui/tasks.py:20-22

---

Socket.IO AsyncRedisManager

管理器初始化

核心的多节点同步由 Socket.IO 的 AsyncRedisManager 处理,该管理器根据 WEBSOCKET_MANAGER 环境变量进行条件配置 backend/open_webui/socket/main.py:65

graph TB
    env_check{"WEBSOCKET_MANAGER<br/>== 'redis'?"}

    env_check -->|是| sentinel_check{"WEBSOCKET_SENTINEL_HOSTS<br/>已设置?"}
    env_check -->|否| local["socketio.AsyncServer<br/>(内存管理器)"]

    sentinel_check -->|是| sentinel_mgr["AsyncRedisManager<br/>(哨兵 URL)"]
    sentinel_check -->|否| redis_mgr["AsyncRedisManager<br/>(直接 URL)"]

    sentinel_mgr --> sio_server["socketio.AsyncServer<br/>(client_manager=mgr)"]
    redis_mgr --> sio_server
    local --> sio_local["socketio.AsyncServer<br/>(默认管理器)"]

    sio_server --> async_mode["async_mode='asgi'"]
    sio_local --> async_mode

Socket.IO 管理器配置流程

管理器在 backend/open_webui/socket/main.py 中初始化 backend/open_webui/socket/main.py:65-84

if WEBSOCKET_MANAGER == 'redis':
    if WEBSOCKET_SENTINEL_HOSTS:
        mgr = socketio.AsyncRedisManager(
            get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT),
            redis_options=WEBSOCKET_REDIS_OPTIONS,
        )
    else:
        mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL, redis_options=WEBSOCKET_REDIS_OPTIONS)
    sio = socketio.AsyncServer(
        # ...
        client_manager=mgr,
        # ...
    )

当配置了 AsyncRedisManager 后,所有 sio.emit() 调用都会自动通过 Redis 发布/订阅广播到所有已连接的服务端实例。

来源:backend/open_webui/socket/main.py:65-84backend/open_webui/env.py:28-41

---

RedisDict:分布式状态字典

RedisDict 实现

RedisDictbackend/open_webui/socket/utils.py 中的一个自定义抽象,它提供了一个由 Redis 哈希支持的、类似 Python 字典的接口 backend/open_webui/socket/utils.py:44-52

classDiagram
    class RedisDict {
        +str name
        +Redis redis
        +__setitem__(key, value)
        +__getitem__(key)
        +__delitem__(key)
        +__contains__(key)
        +keys()
        +values()
        +items()
        +get(key, default)
        +clear()
    }

    class Redis {
        +hset(name, key, value)
        +hget(name, key)
        +hdel(name, key)
        +hexists(name, key)
        +hgetall(name)
    }

    RedisDict --> Redis : "使用 get_redis_connection"

    note for RedisDict "将值序列化为 JSON<br/>提供类似字典的 API<br/>所有操作都是原子的"

RedisDict 类结构

该实现使用 Redis 哈希命令(HSETHGETHDEL)配合 JSON 序列化 backend/open_webui/socket/utils.py:54-62

class RedisDict:
    def __setitem__(self, key, value):
        serialized_value = json.dumps(value)
        self.redis.hset(self.name, key, serialized_value)

    def __getitem__(self, key):
        value = self.redis.hget(self.name, key)
        if value is None:
            raise KeyError(key)
        return json.loads(value)

来源:backend/open_webui/socket/utils.py:44-123

---

分布式任务管理

Open WebUI 使用 Redis 在多个节点间协调后台任务(如长时间运行的 AI 生成任务)。这使得连接到节点 A 的用户能够取消可能在节点 B 上运行的任务 backend/open_webui/tasks.py:145-163

任务同步流程
sequenceDiagram
    participant User as "用户(客户端)"
    participant N1 as "节点 1 (API)"
    participant R as "Redis (发布/订阅 + 哈希)"
    participant N2 as "节点 2 (工作节点)"

    Note over N2: 任务 T1 启动
    N2->>R: redis_save_task(T1, item_id)
    R-->>N2: HSET REDIS_TASKS_KEY T1

    User->>N1: DELETE /tasks/T1 (停止任务)
    N1->>R: redis_send_command({action: 'stop', task_id: T1})
    R-->>N1: PUBLISH REDIS_PUBSUB_CHANNEL

    R->>N2: 任务命令监听器收到 'stop'
    N2->>N2: tasks.get(T1).cancel()
    N2->>R: redis_cleanup_task(T1)

分布式任务取消

  1. 任务注册:当启用 Redis 时调用 create_task,任务 ID 和关联的项 ID(例如 chat_id)会存储在 Redis 哈希 REDIS_TASKS_KEYbackend/open_webui/tasks.py:49-54
  2. 命令广播stop_task 通过 Redis 发布/订阅使用 REDIS_PUBSUB_CHANNEL 发送 JSON 命令 backend/open_webui/tasks.py:153-159
  3. 本地执行:每个节点都运行 redis_task_command_listener,监听 "stop" 动作。如果 task_id 存在于本地 tasks 字典中,则对 asyncio.Task 调用 .cancel() backend/open_webui/tasks.py:25-41

来源:backend/open_webui/tasks.py:20-41backend/open_webui/tasks.py:49-86backend/open_webui/tasks.py:145-179

---

YdocManager:协同文档状态

YdocManager 管理基于 CRDT 的文档状态,用于跨多个实例的实时协同编辑。它使用 Redis 列表存储更新,使用集合存储活跃用户 backend/open_webui/socket/utils.py:124-135

压缩策略

为防止 Redis 列表无限增长,YdocManager 实现了一种滚动压缩机制 backend/open_webui/socket/utils.py:152-166

async def _compact_updates_redis(self, document_id: str):
    """滚动压缩:将最旧的一半更新合并为一个快照。"""
    redis_key = f'{self._redis_key_prefix}:{document_id}:updates'
    all_updates = await self._redis.lrange(redis_key, 0, -1)
    if len(all_updates) <= 1:
        return
    mid = len(all_updates) // 2
    ydoc = Y.Doc()
    for raw in all_updates[:mid]:
        ydoc.apply_update(bytes(json.loads(raw)))
    snapshot = json.dumps(list(ydoc.get_update()))
    pipe = self._redis.pipeline()
    pipe.delete(redis_key)
    pipe.rpush(redis_key, snapshot, *all_updates[mid:])
    await pipe.execute()

当更新数量超过 COMPACTION_THRESHOLD(500)时,最旧的一半更新会被合并为一个 Yjs 快照(使用 pycrdtbackend/open_webui/socket/utils.py:125-144

来源:backend/open_webui/socket/utils.py:124-187

---

使用 RedisLock 实现分布式锁

RedisLock 提供分布式互斥锁,用于协调跨实例的独占操作,例如会话池清理 backend/open_webui/socket/utils.py:9-27

实现细节

该锁使用 Redis 的 SET 命令,配合 nx=True(仅当键不存在时设置)和 ex=timeout_secs(过期时间)来确保原子性 backend/open_webui/socket/utils.py:29-32

def aquire_lock(self):
    # nx=True 仅在此键尚未被设置时才会设置它
    self.lock_obtained = self.redis.set(self.lock_name, self.lock_id, nx=True, ex=self.timeout_secs)
    return self.lock_obtained

这在 backend/open_webui/socket/main.py 中被用于防止多个节点在 periodic_session_pool_cleanup 中同时回收孤儿会话 backend/open_webui/socket/main.py:173-194

来源:backend/open_webui/socket/utils.py:9-42backend/open_webui/socket/main.py:136-156backend/open_webui/socket/main.py:173-194

---

SentinelRedisProxy:故障转移处理

SentinelRedisProxy 封装了 Redis Sentinel 连接,并带有针对故障转移场景的自动重试逻辑 backend/open_webui/utils/redis.py:33-35

自动重试包装器

该代理拦截所有 Redis 方法调用,并为同步和异步模式都包装了重试逻辑 backend/open_webui/utils/redis.py:43-49。它专门针对 ConnectionErrorReadOnlyError(当主节点变为副本节点时发生)进行处理 backend/open_webui/utils/redis.py:98-101

async def _wrapped(*args, **kwargs):
    for i in range(REDIS_SENTINEL_MAX_RETRY_COUNT):
        try:
            method = getattr(self._master(), item)
            result = method(*args, **kwargs)
            if inspect.iscoroutine(result):
                return await result
            return result
        except (
            redis.exceptions.ConnectionError,
            redis.exceptions.ReadOnlyError,
        ) as e:
            if i < REDIS_SENTINEL_MAX_RETRY_COUNT - 1:
                # 记录日志并在重试前休眠
                await asyncio.sleep(REDIS_RECONNECT_DELAY / 1000)
                continue
            raise e

来源:backend/open_webui/utils/redis.py:33-150backend/open_webui/test/util/test_redis.py:14-116

---

键命名空间策略

所有 Redis 键都使用可配置的前缀,以支持在单个 Redis 实例上共享多个 Open WebUI 部署。

键模式用途来源
{REDIS_KEY_PREFIX}:models全局模型可用性池backend/open_webui/socket/main.py:116-121
{REDIS_KEY_PREFIX}:session_pool活跃的 Socket.IO 会话数据backend/open_webui/socket/main.py:123-128
{REDIS_KEY_PREFIX}:tasks分布式任务跟踪backend/open_webui/tasks.py:20
{REDIS_KEY_PREFIX}:tasks:commands任务控制的发布/订阅频道backend/open_webui/tasks.py:22
{REDIS_KEY_PREFIX}:ydoc:documentsYjs 文档状态存储backend/open_webui/socket/main.py:167-170
{REDIS_KEY_PREFIX}:ratelimit:{key}速率限制器桶存储backend/open_webui/utils/rate_limit.py:37-38

前缀由 REDIS_KEY_PREFIX 环境变量定义 backend/open_webui/env.py:34

来源:backend/open_webui/env.py:25-41backend/open_webui/tasks.py:20-22backend/open_webui/socket/main.py:116-170