agentic_huge_data_base / wiki
页面 Onyx · 4.2 消息处理流程·DeepWiki 中文全文译文

4.2 · 消息处理流程(Message Processing Flow)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节4.2 状态全文译文 模块智能体运行时、测试、发布与运维、记忆与上下文、检索、召回与索引
源码线索
  • backend/alembic/versions/f3c9e59c3b07_seed_coding_agent_tool.py
  • backend/onyx/chat/chat_state.py
  • backend/onyx/chat/chat_utils.py
  • backend/onyx/chat/llm_loop.py
  • backend/onyx/chat/llm_step.py
  • backend/onyx/chat/models.py
  • backend/onyx/chat/process_message.py
  • backend/onyx/chat/prompt_utils.py
  • backend/onyx/context/search/models.py
  • backend/onyx/context/search/pipeline.py
模块标签
  • 智能体运行时
  • 测试、发布与运维
  • 记忆与上下文
  • 检索、召回与索引
  • 接口与服务契约

章节正文

消息处理流程

消息处理流程

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • backend/alembic/versions/f3c9e59c3b07_seed_coding_agent_tool.py
  • backend/onyx/chat/chat_state.py
  • backend/onyx/chat/chat_utils.py
  • backend/onyx/chat/llm_loop.py
  • backend/onyx/chat/llm_step.py
  • backend/onyx/chat/models.py
  • backend/onyx/chat/process_message.py
  • backend/onyx/chat/prompt_utils.py
  • backend/onyx/context/search/models.py
  • backend/onyx/context/search/pipeline.py
  • backend/onyx/db/chat.py
  • backend/onyx/deep_research/dr_loop.py
  • backend/onyx/prompts/chat_prompts.py
  • backend/onyx/prompts/deep_research/__init__.py
  • backend/onyx/prompts/deep_research/dr_tool_prompts.py
  • backend/onyx/prompts/deep_research/orchestration_layer.py
  • backend/onyx/prompts/deep_research/research_agent.py
  • backend/onyx/prompts/prompt_utils.py
  • backend/onyx/prompts/tool_prompts.py
  • backend/onyx/server/query_and_chat/chat_backend.py
  • backend/onyx/server/query_and_chat/models.py
  • backend/onyx/server/query_and_chat/session_loading.py
  • backend/onyx/tools/built_in_tools.py
  • backend/onyx/tools/constants.py
  • backend/onyx/tools/fake_tools/research_agent.py
  • backend/onyx/tools/models.py
  • backend/onyx/tools/tool_constructor.py
  • backend/onyx/tools/tool_implementations/coding_agent/__init__.py
  • backend/onyx/tools/tool_implementations/coding_agent/coding_agent_tool.py
  • backend/onyx/tools/tool_implementations/search/search_tool.py
  • backend/onyx/tools/tool_runner.py
  • backend/tests/integration/common_utils/managers/chat.py
  • backend/tests/integration/common_utils/managers/tool.py
  • backend/tests/integration/common_utils/test_models.py
  • backend/tests/integration/tests/chat/test_chat_deletion.py
  • backend/tests/integration/tests/migrations/test_tool_seeding.py
  • backend/tests/integration/tests/streaming_endpoints/test_chat_stream.py
  • backend/tests/unit/onyx/chat/test_llm_loop.py
  • backend/tests/unit/onyx/chat/test_llm_step.py
  • web/src/app/app/components/tools/constants.ts
  • web/src/app/app/services/actionUtils.ts

目的与范围

本文档描述了 Onyx 聊天系统中完整的消息处理管线,从用户发送消息开始,到 AI 响应流式返回并持久化存储结束。内容涵盖通过 handle_stream_message_objects 的核心流程、通过 run_llm_looprun_llm_step 的大语言模型(LLM)编排、通过 run_tool_calls 的工具执行、通过 DynamicCitationProcessor 的引用处理,以及 ChatStateContainer 中的状态累积。

关于整体聊天架构和数据模型,请参见聊天架构与数据模型。关于文档检索和引用的详细信息,请参见检索与引用。关于深度研究模式的详细信息,请参见深度研究模式。关于大语言模型(LLM)提供商的配置,请参见提供商配置

高层消息处理流程

消息处理流程从用户通过聊天界面提交 SendMessageRequest backend/onyx/server/query_and_chat/models.py:115-115 开始,到通过 save_chat_turn backend/onyx/chat/save_chat.py:54-120 将完整的 AI 响应流式返回并持久化到数据库结束。

标题:消息执行序列

Onyx · 高层消息处理流程 · 图 1
Onyx · 高层消息处理流程 · 图 1

来源: backend/onyx/chat/process_message.py:413-564 backend/onyx/chat/llm_loop.py:513-650 backend/onyx/chat/llm_step.py:15-61 backend/onyx/chat/save_chat.py:54-120

入口点:handle_stream_message_objects

process_message.py 中的 handle_stream_message_objects 函数是单个聊天轮次的主要编排器。它由 chat_backend.py 中的 FastAPI 路由器调用 backend/onyx/server/query_and_chat/chat_backend.py:33-33

函数签名:

def handle_stream_message_objects(
    new_msg_req: SendMessageRequest,
    user: User,
    db_session: Session,
    ...
) -> AnswerStream:
主要职责:
  1. 会话校验:使用 get_chat_session_by_id 检索 ChatSession 及其关联的 Persona backend/onyx/chat/process_message.py:456-465
  2. 消息树插入:使用 _initialize_chat_session 创建用户消息,并使用 reserve_message_id 预分配助手消息 ID backend/onyx/chat/process_message.py:535-564
  3. 上下文准备
  • 通过 load_all_chat_files 加载文件内容 backend/onyx/chat/process_message.py:615-620
  • 通过 get_memories 检索用户记忆 backend/onyx/db/memory.py:75-75
  • 使用 construct_tools 构建可用工具 backend/onyx/tools/tool_constructor.py:114-146
  1. 循环编排:将迭代的大语言模型(LLM)-工具交互委托给 run_llm_loop backend/onyx/chat/llm_loop.py:513-513,如果启用了深度研究模式,则委托给 run_deep_research_llm_loop backend/onyx/chat/process_message.py:774-793

来源: backend/onyx/chat/process_message.py:413-896 backend/onyx/server/query_and_chat/chat_backend.py:33-33 backend/onyx/tools/tool_constructor.py:114-146

大语言模型(LLM)循环编排(run_llm_loop

run_llm_loop 函数实现了一个多周期的智能体模式,允许模型调用工具并优化其回答,最多可达由 MAX_LLM_CYCLES 定义的硬编码限制(通常为 6 次)backend/onyx/configs/chat_configs.py:27-27

循环逻辑:
  • 工具选择策略:在早期周期中,tool_choice 设置为 AUTO。如果提供了 forced_tool_id,则在第一个周期中将其设置为 REQUIRED backend/onyx/chat/llm_loop.py:547-564
  • 历史记录构建:每个周期调用 construct_message_history,将新的工具输出包含到上下文窗口中 backend/onyx/chat/llm_loop.py:573-584
  • 步骤执行:调用 run_llm_step 来处理实际的大语言模型(LLM)网络请求和流式传输 backend/onyx/chat/llm_step.py:15-16
  • 工具执行:如果 LlmStepResult 包含工具调用,则触发 run_tool_calls backend/onyx/tools/tool_runner.py:65-65
消息历史记录策略(construct_message_history

此函数 backend/onyx/chat/llm_loop.py:240-453 通过按特定优先级组装提示来管理 Token 预算:

  1. 系统提示:角色的基础指令 backend/onyx/chat/prompt_utils.py:24-24
  2. 自定义智能体提示:助手的特定指令 backend/onyx/chat/chat_utils.py:30-30
  3. 项目/上下文文件:通过 build_file_context 注入的文档内容 backend/onyx/chat/chat_utils.py:67-111
  4. 聊天历史记录:使用 compress_chat_history 从最早的消息开始截断,以适应 available_tokens backend/onyx/chat/compression.py:34-34

来源: backend/onyx/chat/llm_loop.py:513-831 backend/onyx/chat/llm_loop.py:240-453 backend/onyx/chat/compression.py:33-36

工具执行(run_tool_calls

工具使用 ThreadPoolExecutor 并行执行,以最小化延迟。

标题:工具执行逻辑

Onyx · 工具执行( run_tool_calls ) · 图 2
Onyx · 工具执行( run_tool_calls ) · 图 2
  • 合并:对同一工具的多次调用(例如,多次搜索查询)会合并到单个执行上下文中 backend/onyx/tools/tool_runner.py:180-200
  • 覆盖关键字参数:像 SearchTool 这样的工具会接收动态配置,例如 starting_citation_num,以确保跨周期的引用不重叠 backend/onyx/tools/models.py:169-186

来源: backend/onyx/tools/tool_runner.py:65-330 backend/onyx/tools/tool_implementations/search/search_tool.py:37-130 backend/onyx/tools/models.py:83-110

大语言模型(LLM)步骤与引用处理

run_llm_step backend/onyx/chat/llm_step.py:15-16 负责将大语言模型(LLM)流转换为 Onyx 特定的数据包。

动态引用处理

DynamicCitationProcessor backend/onyx/chat/citation_processor.py:14-14 监控流中是否存在像 [1][2, 3][[4]] 这样的模式。

  • 提取:它从文本流中提取文档引用 backend/onyx/chat/llm_step.py:633-660
  • 映射:它维护一个 citation_to_doc 映射,将数字链接到实际的 SearchDoc 元数据 backend/onyx/chat/citation_processor.py:45-60
  • 发送:对于每个检测到的引用,它会向前端生成一个 CitationInfo 数据包 backend/onyx/server/query_and_chat/streaming_models.py:119-119
Token 处理

如果使用了 think_tool(常见于推理模型),则 custom_token_processor 会将基于工具的推理转换为 ReasoningDelta 数据包 backend/onyx/chat/llm_step.py:52-52

来源: backend/onyx/chat/llm_step.py:531-871 backend/onyx/chat/citation_processor.py:45-350 backend/onyx/server/query_and_chat/streaming_models.py:52-54

完成与持久化

一旦循环终止(无论是通过最终回答还是达到 MAX_LLM_CYCLES),状态就会被持久化。

save_chat_turn 实现

此函数 backend/onyx/chat/save_chat.py:54-120 将内存中的 ChatStateContainer 与数据库同步:

  1. 助手消息:使用最终的 answer_tokensreasoning_tokens 更新预留的 ChatMessage backend/onyx/db/models.py:27-27 backend/onyx/chat/save_chat.py:128-150
  2. 引用:将引用映射作为 JSON 字段存储在消息记录中 backend/onyx/chat/save_chat.py:110-120
  3. 工具调用记录:为在该轮次中执行的每个工具创建 ToolCall 数据库条目 backend/onyx/db/models.py:33-33,并将它们链接到助手消息 backend/onyx/chat/save_chat.py:160-180

来源: backend/onyx/chat/save_chat.py:54-200 backend/onyx/db/chat.py:50-91 backend/onyx/db/models.py:27-33

状态管理(ChatStateContainer

ChatStateContainer backend/onyx/chat/chat_state.py:24-24 是一个线程安全对象,用于在异步流式处理过程中累积结果。

标题:聊天状态数据流

Onyx · 状态管理( ChatStateContainer ) · 图 3
Onyx · 状态管理( ChatStateContainer ) · 图 3

它确保即使流被中断,系统也能恢复部分推理或工具输出,以便保存到数据库 backend/onyx/chat/chat_state.py:24-150

来源:

  • backend/onyx/chat/chat_state.py:24-150
  • backend/onyx/chat/process_message.py:734-760
  • backend/onyx/tools/models.py:56-66