迁移与实用工具
迁移与实用工具
相关源文件
本章引用的主要源码文件:
api/db/services/evaluation_service.pyapi/db/services/langfuse_service.pyapi/db/services/pipeline_operation_log_service.pyapi/db/services/search_service.pycommon/doc_store/ob_conn_base.pycommon/doc_store/ob_conn_pool.pycommon/settings.pymemory/utils/aggregation_utils.pymemory/utils/highlight_utils.pymemory/utils/ob_conn.pyrag/utils/ob_conn.pytest/unit_test/memory/utils/test_ob_conn_aggregation.pytest/unit_test/memory/utils/test_ob_conn_highlight.py
本文档记录了 RAGFlow 代码库中提供的操作实用工具、数据同步工具和数据库迁移框架。这些工具有助于系统维护、存储引擎之间的数据迁移(特别是从 Elasticsearch 迁移到 OceanBase/Infinity)以及管理操作。
数据存储连接与迁移
RAGFlow 实现了多语言持久化策略,抽象了对多个文档引擎(Elasticsearch、Infinity、OpenSearch 和 OceanBase)的连接。系统提供了专门的连接类,用于处理模式映射、全文搜索(FTS)和向量操作。
OceanBase(OB)集成与模式
OceanBase 连接器(rag/utils/ob_conn.py)为 RAG 片段提供了高性能实现。它定义了一个结构化的模式,包含 JSON 元数据、用于全文搜索的 Token 化文本以及向量列。连接通过单例池(common/doc_store/ob_conn_pool.py)进行管理,该池确保 OceanBase 集群版本至少为 4.3.5.1 common/doc_store/ob_conn_pool.py:114-117。
| 代码实体 | 角色 | 文件引用 |
|---|---|---|
OBConnection | 管理 OceanBase 生命周期和 CRUD 操作的单例。 | rag/utils/ob_conn.py:173-173 |
column_definitions | 定义 RAG 片段表模式的 SQLAlchemy Column 对象列表。 | rag/utils/ob_conn.py:52-99 |
EXTRA_COLUMNS | 为迁移支持和扩展解析而添加的列(_order_id、group_id、mom_id、chunk_data)。 | rag/utils/ob_conn.py:134-141 |
FTS_COLUMNS_TKS | 基于权重的 Token 化全文搜索配置(例如 title_tks^10)。 | rag/utils/ob_conn.py:124-131 |
OceanBaseConnectionPool | 使用连接池管理 ObVecClient 和 HybridSearch 实例。 | common/doc_store/ob_conn_pool.py:35-37 |
OceanBase 连接架构
来源:rag/utils/ob_conn.py:36-110、rag/utils/ob_conn.py:149-160、common/doc_store/ob_conn_pool.py:35-100、common/doc_store/ob_conn_base.py:97-118
Infinity 与 Elasticsearch 连接
对于高速检索,RAGFlow 使用 InfinityConnection。与使用统一索引的 Elasticsearch 不同,Infinity 通常按 kb_id 进行表分离。
- Infinity 映射:定义在
conf/infinity_mapping.json中,它将content等字段映射到多个分析器(rag-coarse、rag-fine),并为kb_id等字段定义secondary等索引类型。 - 字段转换:
InfinityConnection将 RAGFlow 内部字段名(例如docnm_kwd)转换为 Infinity 特定的全文索引(例如docnm@ft_docnm_rag_coarse)。 - ES Search After:
ESConnection实现了_search_with_search_after,通过使用search_after令牌迭代批次来处理超出MAX_RESULT_WINDOW(10,000 条命中)的深度分页。 - OpenSearch 支持:
OSConnection为 OpenSearch 提供了类似的抽象,要求版本 2.x 或更高,并使用conf/os_mapping.json中定义的自定义映射。
来源:common/settings.py:85-91、common/doc_store/ob_conn_base.py:124-128
内存系统实用工具
内存系统使用专门的连接包装器,用于在不同文档引擎中存储代理对话历史和提取的语义内存。它通过 memory/utils/ob_conn.py 支持 OceanBase,该文件实现了专门的高亮显示和聚合逻辑。
内存存储映射
内存系统使用的映射模式与标准 RAG 片段不同。它使用 map_message_to_ob_fields memory/utils/ob_conn.py:138-161 将消息字段映射到引擎特定的存储字段。
| 内存字段 | 存储字段 | 描述 |
|---|---|---|
message_type | message_type_kwd | 内存类型(RAW、语义等) |
status | status_int | 布尔可用性标志(1=活跃,0=不活跃)memory/utils/ob_conn.py:49-49 |
content | content_ltks | 实际文本内容 |
content_embed | q_{dim}_vec | 向量嵌入字段 memory/utils/ob_conn.py:160-160 |
内存服务数据流
来源:memory/utils/ob_conn.py:36-52、memory/utils/ob_conn.py:138-161、memory/utils/highlight_utils.py:68-89、memory/utils/aggregation_utils.py:20-56
管线与评估实用工具
操作日志
PipelineOperationLogService 管理与文档处理和基于图的任务相关的日志持久化。它跟踪各种任务类型的进度、持续时间和状态,包括 GRAPH_RAG、RAPTOR 和 MINDMAP api/db/services/pipeline_operation_log_service.py:140-142。
- 日志创建:
create方法从Document和Task实体中提取元数据,以构建全面的操作日志api/db/services/pipeline_operation_log_service.py:115-123。 - 进度跟踪:对于异步任务,它查询
TaskService以将进度信息同步到日志中api/db/services/pipeline_operation_log_service.py:142-151。
评估框架
EvaluationService 提供 RAG 性能基准测试的后端逻辑 api/db/services/evaluation_service.py:18-26。
- 数据集管理:支持创建和列出与特定知识库关联的评估数据集
api/db/services/evaluation_service.py:52-85。 - 测试用例管理:允许添加结构化的测试用例,包括问题、参考答案和相关片段 ID
api/db/services/evaluation_service.py:150-185。
来源:api/db/services/pipeline_operation_log_service.py:35-170、api/db/services/evaluation_service.py:44-190
存储与系统初始化
多引擎存储连接
RAGFlow 通过 StorageFactory 为多个对象存储后端提供统一接口。这种抽象允许系统根据 STORAGE_IMPL 环境变量支持多种云和本地存储解决方案 common/settings.py:133-134。
| 提供商 | 实现类 | 配置来源 | 文件引用 |
|---|---|---|---|
| MinIO | RAGFlowMinio | settings.MINIO | common/settings.py:183-183 |
| AWS S3 | RAGFlowS3 | settings.S3 | common/settings.py:186-186 |
| 阿里云 OSS | RAGFlowOSS | settings.OSS | common/settings.py:187-187 |
| Google Cloud | RAGFlowGCS | settings.GCS | common/settings.py:189-189 |
| Azure (SPN) | RAGFlowAzureSpnBlob | settings.AZURE | common/settings.py:184-184 |
| OpenDAL | OpenDALStorage | - | common/settings.py:188-188 |
来源:common/settings.py:181-195
操作与构建实用工具
配置实用工具
系统使用 common/settings.py 来初始化全局状态。它解密数据库配置 common/settings.py:74-74,通过 Redis 管理分布式 SECRET_KEY common/settings.py:176-176,并从 conf/llm_factories.json 加载大语言模型(LLM)工厂信息 common/settings.py:228-229。
监控与可观测性
- Langfuse 集成:
TenantLangfuseService在租户级别管理用于 Langfuse 追踪的 API 密钥(公钥/密钥)和主机配置api/db/services/langfuse_service.py:26-40。 - 搜索分析:
SearchService跟踪搜索配置和描述,允许租户管理其检索历史和配置api/db/services/search_service.py:26-53。
来源:common/settings.py:144-180、common/settings.py:197-231、api/db/services/langfuse_service.py:34-63、api/db/services/search_service.py:57-79