agentic_huge_data_base / wiki
页面 RAGFlow · 11.4 迁移与实用工具·DeepWiki 中文全文译文

11.4 · 迁移与实用工具(Migration and Utility Tools)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节11.4 状态全文译文 模块测试、发布与运维、配置治理、检索、召回与索引、接口与服务契约
源码线索
  • api/db/services/evaluation_service.py
  • api/db/services/langfuse_service.py
  • api/db/services/pipeline_operation_log_service.py
  • api/db/services/search_service.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • memory/utils/aggregation_utils.py
  • memory/utils/highlight_utils.py
  • memory/utils/ob_conn.py
模块标签
  • 测试、发布与运维
  • 配置治理
  • 检索、召回与索引
  • 接口与服务契约
  • 界面与交互

章节正文

迁移与实用工具

迁移与实用工具

相关源文件

本章引用的主要源码文件:

  • api/db/services/evaluation_service.py
  • api/db/services/langfuse_service.py
  • api/db/services/pipeline_operation_log_service.py
  • api/db/services/search_service.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • memory/utils/aggregation_utils.py
  • memory/utils/highlight_utils.py
  • memory/utils/ob_conn.py
  • rag/utils/ob_conn.py
  • test/unit_test/memory/utils/test_ob_conn_aggregation.py
  • test/unit_test/memory/utils/test_ob_conn_highlight.py

本文档记录了 RAGFlow 代码库中提供的操作实用工具、数据同步工具和数据库迁移框架。这些工具有助于系统维护、存储引擎之间的数据迁移(特别是从 Elasticsearch 迁移到 OceanBase/Infinity)以及管理操作。

数据存储连接与迁移

RAGFlow 实现了多语言持久化策略,抽象了对多个文档引擎(Elasticsearch、Infinity、OpenSearch 和 OceanBase)的连接。系统提供了专门的连接类,用于处理模式映射、全文搜索(FTS)和向量操作。

OceanBase(OB)集成与模式

OceanBase 连接器(rag/utils/ob_conn.py)为 RAG 片段提供了高性能实现。它定义了一个结构化的模式,包含 JSON 元数据、用于全文搜索的 Token 化文本以及向量列。连接通过单例池(common/doc_store/ob_conn_pool.py)进行管理,该池确保 OceanBase 集群版本至少为 4.3.5.1 common/doc_store/ob_conn_pool.py:114-117

代码实体角色文件引用
OBConnection管理 OceanBase 生命周期和 CRUD 操作的单例。rag/utils/ob_conn.py:173-173
column_definitions定义 RAG 片段表模式的 SQLAlchemy Column 对象列表。rag/utils/ob_conn.py:52-99
EXTRA_COLUMNS为迁移支持和扩展解析而添加的列(_order_idgroup_idmom_idchunk_data)。rag/utils/ob_conn.py:134-141
FTS_COLUMNS_TKS基于权重的 Token 化全文搜索配置(例如 title_tks^10)。rag/utils/ob_conn.py:124-131
OceanBaseConnectionPool使用连接池管理 ObVecClientHybridSearch 实例。common/doc_store/ob_conn_pool.py:35-37

OceanBase 连接架构

RAGFlow · OceanBase(OB)集成与模式 · 图 1
RAGFlow · OceanBase(OB)集成与模式 · 图 1

来源:rag/utils/ob_conn.py:36-110rag/utils/ob_conn.py:149-160common/doc_store/ob_conn_pool.py:35-100common/doc_store/ob_conn_base.py:97-118

Infinity 与 Elasticsearch 连接

对于高速检索,RAGFlow 使用 InfinityConnection。与使用统一索引的 Elasticsearch 不同,Infinity 通常按 kb_id 进行表分离。

  • Infinity 映射:定义在 conf/infinity_mapping.json 中,它将 content 等字段映射到多个分析器(rag-coarserag-fine),并为 kb_id 等字段定义 secondary 等索引类型。
  • 字段转换InfinityConnection 将 RAGFlow 内部字段名(例如 docnm_kwd)转换为 Infinity 特定的全文索引(例如 docnm@ft_docnm_rag_coarse)。
  • ES Search AfterESConnection 实现了 _search_with_search_after,通过使用 search_after 令牌迭代批次来处理超出 MAX_RESULT_WINDOW(10,000 条命中)的深度分页。
  • OpenSearch 支持OSConnection 为 OpenSearch 提供了类似的抽象,要求版本 2.x 或更高,并使用 conf/os_mapping.json 中定义的自定义映射。

来源:common/settings.py:85-91common/doc_store/ob_conn_base.py:124-128

内存系统实用工具

内存系统使用专门的连接包装器,用于在不同文档引擎中存储代理对话历史和提取的语义内存。它通过 memory/utils/ob_conn.py 支持 OceanBase,该文件实现了专门的高亮显示和聚合逻辑。

内存存储映射

内存系统使用的映射模式与标准 RAG 片段不同。它使用 map_message_to_ob_fields memory/utils/ob_conn.py:138-161 将消息字段映射到引擎特定的存储字段。

内存字段存储字段描述
message_typemessage_type_kwd内存类型(RAW、语义等)
statusstatus_int布尔可用性标志(1=活跃,0=不活跃)memory/utils/ob_conn.py:49-49
contentcontent_ltks实际文本内容
content_embedq_{dim}_vec向量嵌入字段 memory/utils/ob_conn.py:160-160

内存服务数据流

RAGFlow · 内存存储映射 · 图 2
RAGFlow · 内存存储映射 · 图 2

来源:memory/utils/ob_conn.py:36-52memory/utils/ob_conn.py:138-161memory/utils/highlight_utils.py:68-89memory/utils/aggregation_utils.py:20-56

管线与评估实用工具

操作日志

PipelineOperationLogService 管理与文档处理和基于图的任务相关的日志持久化。它跟踪各种任务类型的进度、持续时间和状态,包括 GRAPH_RAGRAPTORMINDMAP api/db/services/pipeline_operation_log_service.py:140-142

  • 日志创建create 方法从 DocumentTask 实体中提取元数据,以构建全面的操作日志 api/db/services/pipeline_operation_log_service.py:115-123
  • 进度跟踪:对于异步任务,它查询 TaskService 以将进度信息同步到日志中 api/db/services/pipeline_operation_log_service.py:142-151
评估框架

EvaluationService 提供 RAG 性能基准测试的后端逻辑 api/db/services/evaluation_service.py:18-26

  • 数据集管理:支持创建和列出与特定知识库关联的评估数据集 api/db/services/evaluation_service.py:52-85
  • 测试用例管理:允许添加结构化的测试用例,包括问题、参考答案和相关片段 ID api/db/services/evaluation_service.py:150-185

来源:api/db/services/pipeline_operation_log_service.py:35-170api/db/services/evaluation_service.py:44-190

存储与系统初始化

多引擎存储连接

RAGFlow 通过 StorageFactory 为多个对象存储后端提供统一接口。这种抽象允许系统根据 STORAGE_IMPL 环境变量支持多种云和本地存储解决方案 common/settings.py:133-134

提供商实现类配置来源文件引用
MinIORAGFlowMiniosettings.MINIOcommon/settings.py:183-183
AWS S3RAGFlowS3settings.S3common/settings.py:186-186
阿里云 OSSRAGFlowOSSsettings.OSScommon/settings.py:187-187
Google CloudRAGFlowGCSsettings.GCScommon/settings.py:189-189
Azure (SPN)RAGFlowAzureSpnBlobsettings.AZUREcommon/settings.py:184-184
OpenDALOpenDALStorage-common/settings.py:188-188

来源:common/settings.py:181-195

操作与构建实用工具

配置实用工具

系统使用 common/settings.py 来初始化全局状态。它解密数据库配置 common/settings.py:74-74,通过 Redis 管理分布式 SECRET_KEY common/settings.py:176-176,并从 conf/llm_factories.json 加载大语言模型(LLM)工厂信息 common/settings.py:228-229

监控与可观测性
  • Langfuse 集成TenantLangfuseService 在租户级别管理用于 Langfuse 追踪的 API 密钥(公钥/密钥)和主机配置 api/db/services/langfuse_service.py:26-40
  • 搜索分析SearchService 跟踪搜索配置和描述,允许租户管理其检索历史和配置 api/db/services/search_service.py:26-53

来源:common/settings.py:144-180common/settings.py:197-231api/db/services/langfuse_service.py:34-63api/db/services/search_service.py:57-79