agentic_huge_data_base / wiki
页面 Open WebUI · 15 实时通信·DeepWiki 中文全文译文

15 · 实时通信(Real-time Communication)

多模型对话工作台与知识应用入口 · 本章是 Open WebUI DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Open WebUI 章节15 状态全文译文 模块系统架构、频道、笔记与协作、接口与服务契约、界面与交互
源码线索
  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/stores/index.ts
  • backend/open_webui/env.py
模块标签
  • 系统架构
  • 频道、笔记与协作
  • 接口与服务契约
  • 界面与交互
  • 测试、发布与运维

中文译文

实时通信(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/15-real-time-communication
翻译时间:2026-06-09T16:11:34.412Z
翻译模型:deepseek-chat
原文字符数:9676
项目:Open WebUI (open-webui)

---

实时通信

相关源文件

以下文件为本 wiki 页面的生成提供了上下文:

  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/stores/index.ts
  • src/routes/+layout.svelte

目的与范围

本文档描述了 Open WebUI 中的实时通信系统。该系统使用 Socket.IO 实现前后端双向通信,并可选配 Redis 以支持分布式部署。系统支持实时聊天流式传输、协作文档编辑、会话管理以及分布式任务协调。

关于聊天消息处理与流式传输,请参见聊天系统。关于后端配置与环境设置,请参见环境配置

架构概览

实时通信层通过 Socket.IO 将 SvelteKit 前端与 FastAPI 后端连接起来,Redis 则为多实例部署提供分布式状态管理。

系统组件
graph TB
    subgraph "前端(浏览器)"
        LayoutComponent["布局组件<br/>+layout.svelte"]
        SocketStore["Socket 存储<br/>stores/index.ts:socket"]
        EventHandlers["事件处理器<br/>chatEventHandler<br/>channelEventHandler"]
    end

    subgraph "后端 Socket.IO 服务器"
        SocketIOServer["Socket.IO 服务器<br/>socket/main.py:sio"]
        ConnectHandler["@sio.event<br/>connect()"]
        UserJoinHandler["@sio.on('user-join')<br/>user_join()"]
        HeartbeatHandler["@sio.on('heartbeat')<br/>heartbeat()"]
        EventEmitter["get_event_emitter()<br/>__event_emitter__"]
        EventCaller["get_event_call()<br/>__event_caller__"]
    end

    subgraph "状态管理"
        SessionPool["SESSION_POOL<br/>RedisDict 或 dict"]
        UsagePool["USAGE_POOL<br/>RedisDict 或 dict"]
        YdocManager["YDOC_MANAGER<br/>YdocManager"]
    end

    subgraph "Redis 后端"
        RedisServer["Redis 服务器<br/>Sentinel 或 Cluster"]
        RedisDict["RedisDict<br/>socket/utils.py"]
        RedisLock["RedisLock<br/>socket/utils.py"]
        RedisPubSub["Redis Pub/Sub<br/>tasks.py"]
    end

    LayoutComponent -->|"io.connect()"| SocketIOServer
    SocketIOServer -->|"emit events"| EventHandlers
    EventHandlers -->|"emit commands"| SocketIOServer

    ConnectHandler --> SessionPool
    UserJoinHandler --> SessionPool
    HeartbeatHandler --> SessionPool

    EventEmitter -->|"emit('events')"| SocketIOServer
    EventCaller -->|"call('events')"| SocketIOServer

    SessionPool -->|"若 WEBSOCKET_MANAGER='redis'"| RedisDict
    UsagePool -->|"若 WEBSOCKET_MANAGER='redis'"| RedisDict
    YdocManager -->|"若 REDIS"| RedisServer

    RedisDict --> RedisServer
    RedisLock --> RedisServer
    RedisPubSub --> RedisServer

来源: backend/open_webui/socket/main.py:63-171backend/open_webui/socket/utils.py:9-131backend/open_webui/tasks.py:25-42

部署模式

系统根据 WEBSOCKET_MANAGER 环境变量支持两种部署模式 backend/open_webui/env.py:28

模式管理器使用场景状态存储
单机None单服务器实例内存中的 Python 字典 backend/open_webui/socket/main.py:158-162
分布式redis多服务器实例Redis 支持的 RedisDictAsyncRedisManager backend/open_webui/socket/main.py:65-84

来源: backend/open_webui/socket/main.py:65-96backend/open_webui/socket/main.py:105-166

详细信息请参见 WebSocket 架构

WebSocket 连接生命周期

连接建立

后端的 connect 处理器通过 JWT 对用户进行身份验证,并在 SESSION_POOL 中初始化其会话 backend/open_webui/socket/main.py:302-359。前端在根布局中使用 socket.io-client 发起连接,并传递身份验证令牌 src/routes/+layout.svelte:116-124

sequenceDiagram
    participant Browser
    participant SocketIO as Socket.IO 服务器
    participant Auth as decode_token()
    participant DB as 用户模型
    participant Redis as SESSION_POOL

    Browser->>SocketIO: io.connect({auth: {token: ...}})
    activate SocketIO
    SocketIO->>SocketIO: @sio.event connect()
    SocketIO->>Auth: decode_token(token)
    Auth-->>SocketIO: {id: user_id}
    SocketIO->>DB: get_user_by_id(user_id)
    DB-->>SocketIO: 用户对象
    SocketIO->>Redis: SESSION_POOL[sid] = user
    SocketIO->>SocketIO: enter_room(sid, "user:{user_id}")

    SocketIO-->>Browser: 连接已建立
    deactivate SocketIO

来源: src/routes/+layout.svelte:116-124backend/open_webui/socket/main.py:302-359backend/open_webui/utils/auth.py:42-42

房间管理

客户端被组织到不同的房间中,以实现定向事件广播。这一过程在 connectuser-join 事件中处理 backend/open_webui/socket/main.py:315-349

房间模式用途成员
user:{user_id}用户特定事件属于特定用户的所有会话
channel:{channel_id}群组消息特定频道的所有成员
doc_{document_id}协同编辑当前正在编辑文档的所有用户

来源: backend/open_webui/socket/main.py:315-349

事件处理系统

系统使用统一的 eventsevents:channel 通道来路由各种实时更新。

后端事件发射

后端提供了 get_event_emitter 工厂函数,用于简化向特定用户发送更新的操作,常用于长时间运行的 LLM 生成过程中 backend/open_webui/socket/main.py:695-813

sequenceDiagram
    participant Logic as 后端逻辑
    participant Factory as get_event_emitter()
    participant Emitter as __event_emitter__()
    participant SocketIO as sio.emit()

    Logic->>Factory: get_event_emitter(request_info)
    Factory-->>Logic: __event_emitter__ 函数

    Logic->>Emitter: {type: "status", data: "处理中..."}
    Emitter->>SocketIO: emit('events', data, room="user:{user_id}")

来源: backend/open_webui/socket/main.py:695-813

事件类型

系统处理以下几类事件:

  • 聊天事件chat:completionchat:titlechat:tags
  • 执行事件execute:pythonexecute:tool
  • 频道事件typingmessagechannel:created
  • Yjs 事件ydoc:document:updateydoc:awareness:update

详细信息请参见事件处理系统

多节点同步

在分布式环境中,Redis 充当会话数据和任务协调的唯一真实来源。

分布式任务管理

当用户请求停止生成任务时,该命令通过 Redis Pub/Sub 广播到所有节点 backend/open_webui/tasks.py:22-42create_task 函数用于注册任务 backend/open_webui/tasks.py:104-124,而 stop_task 则使用 Redis Pub/Sub 跨实例发送取消信号 backend/open_webui/tasks.py:145-163

graph LR
    subgraph "节点 A"
        API["API: stop_task(task_id)"]
        LocalTasksA["本地 tasks{}"]
    end

    subgraph "Redis"
        PubSub["频道: tasks:commands"]
    end

    subgraph "节点 B"
        Listener["redis_task_command_listener()"]
        LocalTasksB["本地 tasks{task_id: Task}"]
    end

    API -->|"redis_send_command()"| PubSub
    PubSub --> Listener
    Listener -->|"task.cancel()"| LocalTasksB

来源: backend/open_webui/tasks.py:25-42backend/open_webui/tasks.py:145-174

使用 RedisDict 共享状态

RedisDict 工具允许不同的服务器实例像访问本地 Python 字典一样访问共享的 SESSION_POOLUSAGE_POOL backend/open_webui/socket/utils.py:44-122。这确保了分布式设置中所有节点状态的一致性。periodic_session_pool_cleanup 使用分布式锁定期清理孤立会话 backend/open_webui/socket/main.py:173-193

来源: backend/open_webui/socket/utils.py:44-122backend/open_webui/socket/main.py:173-193

详细信息请参见多节点同步

协同编辑(Yjs)

Open WebUI 使用 Yjs 实现笔记和文档的无冲突协同编辑。YdocManager 负责将这些文档更新持久化到 Redis 中 backend/open_webui/socket/utils.py:124-136

  • 存储:更新存储在 Redis 列表(RPUSH)中,并定期压缩为单个快照,以优化内存和加载时间 backend/open_webui/socket/utils.py:137-145
  • 压缩:当文档达到 COMPACTION_THRESHOLD(500 次更新)时,_compact_updates_redis 方法会压缩最旧的一半更新 backend/open_webui/socket/utils.py:152-166

来源: backend/open_webui/socket/utils.py:124-176backend/open_webui/socket/main.py:448-510