实时通信架构(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/2.5-real-time-communication-architecture
翻译时间:2026-06-09T16:07:41.330Z
翻译模型:deepseek-chat
原文字符数:10255
项目:Open WebUI (open-webui)
---
实时通信架构
相关源文件
本 wiki 页面基于以下文件生成:
backend/open_webui/socket/main.pybackend/open_webui/socket/utils.pybackend/open_webui/tasks.pybackend/open_webui/test/util/test_redis.pybackend/open_webui/utils/rate_limit.pybackend/open_webui/utils/redis.pysrc/lib/apis/index.tssrc/lib/stores/index.tssrc/routes/+layout.svelte
目的与范围
本文档描述了 Open WebUI 中的实时通信基础设施,该设施支持跨分布式服务器部署的实时更新、协作编辑和多用户交互。系统基于 Socket.IO 构建,并采用 Redis 进行状态管理。
有关更广泛的后端架构信息,请参阅后端架构。有关数据存储层的详细信息,请参阅数据与存储层。
概述
Open WebUI 实现了一个双向实时通信系统,支持以下功能:
- 流式 AI 响应:在聊天补全过程中实时输出。
- 实时频道消息:包含输入状态指示和已读回执。
- 协作文档编辑:基于 CRDT(Yjs)实现。
- 多实例同步:通过 Redis pub/sub 实现。
- 分布式会话管理:跨服务器副本管理会话。
该架构由三个主要层组成:
- 传输层:Socket.IO 服务器,支持 WebSocket 和 HTTP 长轮询回退。
- 状态层:基于 Redis 的分布式数据结构。
- 事件层:针对不同功能的类型化事件处理器。
核心组件架构
下图展示了前端组件与后端实体及存储机制之间的关联。
graph TB
subgraph "前端 (SvelteKit)"
LayoutSvelte["src/routes/+layout.svelte<br/>setupSocket()"]
SocketStore["src/lib/stores/index.ts<br/>socket: Writable<Socket>"]
end
subgraph "后端 Socket.IO 服务器"
SocketMain["backend/open_webui/socket/main.py<br/>sio: AsyncServer"]
EventHandlers["事件处理器<br/>on('connect')<br/>on('user-join')<br/>on('heartbeat')"]
end
subgraph "Redis 状态管理"
SessionPool["SESSION_POOL<br/>RedisDict<br/>sid → 用户数据"]
UsagePool["USAGE_POOL<br/>RedisDict<br/>model_id → 连接数"]
YdocManager["YDOC_MANAGER<br/>YdocManager<br/>文档状态"]
AsyncRedisManager["socketio.AsyncRedisManager<br/>跨实例 pub/sub"]
end
subgraph "持久化 (SQL/Redis)"
ChatsDB["open_webui.models.chats.Chats"]
NotesDB["open_webui.models.notes.Notes"]
ChannelsDB["open_webui.models.channels.Channels"]
end
LayoutSvelte -->|"io() 连接"| SocketMain
LayoutSvelte -->|"存储 socket"| SocketStore
SocketMain -->|"管理"| EventHandlers
EventHandlers -->|"读写"| SessionPool
EventHandlers -->|"读写"| UsagePool
EventHandlers -->|"读写"| YdocManager
SocketMain -->|"使用"| AsyncRedisManager
AsyncRedisManager -->|"pub/sub"| SessionPool
AsyncRedisManager -->|"pub/sub"| UsagePool
AsyncRedisManager -->|"pub/sub"| YdocManager
EventHandlers -.->|"更新"| ChatsDB
EventHandlers -.->|"更新"| NotesDB
EventHandlers -.->|"更新"| ChannelsDB
来源:backend/open_webui/socket/main.py:65-170、src/lib/stores/index.ts:31-34、src/routes/+layout.svelte:115-125、backend/open_webui/models/chats.py:14-14、backend/open_webui/models/notes.py:15-15、backend/open_webui/models/channels.py:13-13
Socket.IO 服务器配置
Socket.IO 服务器在 backend/open_webui/socket/main.py backend/open_webui/socket/main.py:73-96 中初始化,并根据 WEBSOCKET_MANAGER 环境变量 backend/open_webui/env.py:28-28 支持两种部署模式。
单机模式
对于单实例部署,服务器使用内存管理器 backend/open_webui/socket/main.py:86-96。
Redis 支持模式
对于分布式部署,使用 socketio.AsyncRedisManager 在多个节点间同步事件 backend/open_webui/socket/main.py:65-84。此模式支持 Redis Sentinel 和 Redis Cluster 配置 backend/open_webui/socket/main.py:66-72。
| 环境变量 | 用途 | 默认值 |
|---|---|---|
WEBSOCKET_MANAGER | 设置为 "redis" 以启用分布式模式 | None |
ENABLE_WEBSOCKET_SUPPORT | 启用 WebSocket 传输 | True |
WEBSOCKET_SERVER_PING_INTERVAL | 心跳间隔(秒) | 25 |
WEBSOCKET_SERVER_PING_TIMEOUT | 连接超时(秒) | 60 |
WEBSOCKET_REDIS_URL | Redis 连接 URL | redis://localhost:6379/0 |
来源:backend/open_webui/socket/main.py:65-96、backend/open_webui/env.py:27-41
连接管理
后端连接处理
当客户端连接时,服务器执行身份验证并填充 SESSION_POOL backend/open_webui/socket/main.py:314-370。前端通过根布局中的 setupSocket 发起连接,并在 auth 负载中发送 JWT 令牌 src/routes/+layout.svelte:115-124。
sequenceDiagram
participant Client as "浏览器客户端"
participant SocketIO as "sio: AsyncServer"
participant Auth as "decode_token()"
participant SessionPool as "SESSION_POOL (RedisDict)"
participant DB as "用户模型"
Client->>SocketIO: connect(auth={token})
SocketIO->>Auth: decode_token(token)
Auth-->>SocketIO: user_id
SocketIO->>DB: Users.get_user_by_id(user_id)
DB-->>SocketIO: user_model
SocketIO->>SessionPool: __setitem__(sid, user_data)
SocketIO->>SocketIO: enter_room(sid, f"user:{user_id}")
Note over Client, SocketIO: 每 30 秒心跳
Client->>SocketIO: emit('heartbeat')
来源:backend/open_webui/socket/main.py:314-370、src/routes/+layout.svelte:158-164、backend/open_webui/socket/utils.py:54-56、backend/open_webui/models/users.py:12-12
分布式会话池
SESSION_POOL 是 RedisDict 的一个实例,它封装了 Redis 哈希操作(hset、hget),为分布式状态提供了类似字典的接口 backend/open_webui/socket/utils.py:44-52。
过期的会话由 periodic_session_pool_cleanup 清理,该函数使用 RedisLock 确保只有一个集群节点执行清理操作 backend/open_webui/socket/main.py:173-194。SESSION_POOL_TIMEOUT 设置为 120 秒 backend/open_webui/socket/main.py:101-101。
事件系统架构
系统针对不同的功能集使用特定的频道。
事件类型与路由
| 频道 | 处理器(后端) | 用途 |
|---|---|---|
events | __event_emitter__ | 聊天补全和状态更新 backend/open_webui/socket/main.py:710-724。 |
events:channel | channel_events | 实时消息、输入状态指示 backend/open_webui/socket/main.py:438-471。 |
ydoc:document:* | ydoc_document_join | 使用 Yjs 进行协作笔记编辑 backend/open_webui/socket/main.py:473-500。 |
usage | usage | 跟踪活跃的模型连接 backend/open_webui/socket/main.py:300-312。 |
协作编辑(Yjs)
YdocManager 负责在 Redis 中持久化二进制 CRDT 更新 backend/open_webui/socket/utils.py:124-136。它包含一个滚动压缩机制,当更新数量超过 COMPACTION_THRESHOLD(500)时,会压缩更新 backend/open_webui/socket/utils.py:125-125、backend/open_webui/socket/utils.py:152-166。
graph LR
subgraph "Yjs 事件流"
UpdateEvent["'ydoc:document:update'"]
YdocMgr["YdocManager.append_to_updates()"]
RedisList["Redis: rpush updates"]
Compaction["_compact_updates_redis()"]
end
UpdateEvent --> YdocMgr
YdocMgr --> RedisList
RedisList -->|达到阈值| Compaction
来源:backend/open_webui/socket/utils.py:124-166、backend/open_webui/socket/main.py:616-660
多节点同步
分布式任务协调
Open WebUI 使用 Redis Pub/Sub 频道 REDIS_PUBSUB_CHANNEL 在节点间管理异步任务(如长时间运行的 AI 生成)backend/open_webui/tasks.py:22-22。
redis_task_command_listener 监听 "stop" 命令,如果 task_id 存在于本地节点,则取消本地的 asyncio.Task 实例 backend/open_webui/tasks.py:25-42。
Redis Sentinel 支持
对于高可用环境,SentinelRedisProxy 提供了对 Redis Sentinel 的透明封装,在同步和异步模式下均实现了自动故障转移和重试 backend/open_webui/utils/redis.py:31-147。它会拦截对主节点的调用,并最多重试 REDIS_SENTINEL_MAX_RETRY_COUNT 次 backend/open_webui/utils/redis.py:89-115。
来源:backend/open_webui/utils/redis.py:31-147、backend/open_webui/tasks.py:25-42
性能与扩展
速率限制
RateLimiter 使用 Redis 桶实现了滚动窗口策略 backend/open_webui/utils/rate_limit.py:6-10。它在桶键上使用 incr 和 expire backend/open_webui/utils/rate_limit.py:82-84,并使用 mget 聚合窗口内的计数 backend/open_webui/utils/rate_limit.py:89-90。
清理机制
- 使用池:
periodic_usage_pool_cleanup定期运行,从USAGE_POOL中移除不活跃的模型连接backend/open_webui/socket/main.py:196-215。 - 会话池:
periodic_session_pool_cleanup在SESSION_POOL_TIMEOUT后清理孤立的会话backend/open_webui/socket/main.py:173-194。
来源:backend/open_webui/socket/main.py:101-101、backend/open_webui/socket/main.py:173-215、backend/open_webui/utils/rate_limit.py:6-92