agentic_huge_data_base / wiki
页面 RAGFlow · 9.1 文档存储抽象·DeepWiki 中文全文译文

9.1 · 文档存储抽象(Document Store Abstraction)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节9.1 状态全文译文 模块检索、召回与索引、系统架构、文档对象与元数据、存储与持久化
源码线索
  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • conf/infinity_mapping.json
  • conf/mapping.json
模块标签
  • 检索、召回与索引
  • 系统架构
  • 文档对象与元数据
  • 存储与持久化
  • 界面与交互

章节正文

文档存储抽象

文档存储抽象层

相关源文件

本章引用的主要源码文件:

  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • conf/infinity_mapping.json
  • conf/mapping.json
  • memory/services/messages.py
  • memory/utils/aggregation_utils.py
  • memory/utils/es_conn.py
  • memory/utils/highlight_utils.py
  • memory/utils/infinity_conn.py
  • memory/utils/ob_conn.py
  • rag/utils/es_conn.py
  • rag/utils/infinity_conn.py
  • rag/utils/ob_conn.py
  • rag/utils/opensearch_conn.py
  • test/unit_test/memory/utils/test_ob_conn_aggregation.py
  • test/unit_test/memory/utils/test_ob_conn_highlight.py
  • test/unit_test/rag/utils/test_opensearch_doc_meta.py

目的与范围

本文档描述了 RAGFlow 的文档存储抽象层,该层提供了一个统一接口,用于与多种搜索引擎后端进行交互。该抽象层使 RAGFlow 能够支持 Elasticsearch、Infinity、OceanBase 和 OpenSearch 作为可插拔的存储引擎,而无需修改应用程序代码。该层负责处理片段存储、向量索引、全文搜索和混合检索操作。

该抽象层确保检索增强生成(RAG)管线与底层存储技术无关,无论底层是传统搜索引擎(Elasticsearch)、高性能 AI 原生数据库(Infinity),还是具备向量能力的分布式关系型数据库(OceanBase)。

架构总览

文档存储抽象层采用三层架构:抽象接口、后端特定基类,以及通过环境配置在运行时选择的具体实现。

图:文档存储抽象层

RAGFlow · 架构总览 · 图 1
RAGFlow · 架构总览 · 图 1

来源:common/doc_store/doc_store_base.py:34-34, common/settings.py:85-91, rag/utils/infinity_conn.py:29-30, rag/utils/es_conn.py:61-62, rag/utils/ob_conn.py:33-34, rag/utils/opensearch_conn.py:65-65

DocStoreConnection 接口

DocStoreConnection 抽象基类定义了所有后端实现必须满足的契约。它按功能类别组织,包括索引管理、增删改查(CRUD)和检索操作。

数据库与索引操作
方法参数用途
db_type()返回后端类型标识符("elasticsearch""infinity""oceanbase""opensearch"
health()返回集群健康状态,包括可用性和资源指标
create_idx()indexName, knowledgebaseId, vectorSize, parser_id创建具有指定向量维度模式的索引/表
delete_idx()indexName, knowledgebaseId删除索引/表
index_exist()indexName, knowledgebaseId检查索引/表是否存在
create_doc_meta_idx()index_name创建用于文档元数据的专用索引

来源:common/doc_store/doc_store_base.py:143-185, rag/utils/opensearch_conn.py:107-157

增删改查(CRUD)与搜索操作

search() 方法是检索的主要入口点,支持混合搜索、过滤和聚合。

@abstractmethod
def search(
    self,
    select_fields: list[str],
    highlight_fields: list[str],
    condition: dict,
    match_expressions: list[MatchExpr],
    order_by: OrderByExpr,
    offset: int,
    limit: int,
    index_names: str | list[str],
    knowledgebase_ids: list[str],
    agg_fields: list[str] | None = None,
    rank_feature: dict | None = None
):

关键参数:

  • condition:过滤条件的字典(例如 {"available_int": 1})。
  • match_expressions:要执行的 MatchTextExpr(BM25)、MatchDenseExpr(向量)或 FusionExpr 列表。
  • knowledgebase_ids:用于多租户分区或表选择。

来源:common/doc_store/doc_store_base.py:191-236, rag/utils/infinity_conn.py:94-107

后端实现

Infinity 实现

Infinity 对片段存储采用每个知识库一张表的策略。它执行大量的字段映射,以将 RAGFlow 的逻辑字段与物理数据库列对齐。

字段转换逻辑: Infinity 将 RAGFlow 的逻辑字段映射到 conf/infinity_mapping.json 中定义的特定列和全文索引。

RAGFlow 字段Infinity 列全文索引
docnm_kwddocnmft_docnm_rag_coarse
content_ltkscontentft_content_rag_coarse
important_kwdimportant_keywordsft_important_keywords_rag_coarse

来源:rag/utils/infinity_conn.py:61-88, conf/infinity_mapping.json:9-17

实现细节:

  • 表分区: 片段存储在名为 {index_name}_{kb_id} 的表中 rag/utils/infinity_conn.py:164-165
  • 关键字处理: Infinity 将 *_kwd 字段视为关键字列表,但 knowledge_graph_kwd 等特定排除项除外 rag/utils/infinity_conn.py:38-42
  • 元数据争用: 实现了 _retry_on_meta_contention,采用指数退避和抖动策略,以处理索引创建或删除期间 RocksDB 的"资源繁忙"错误(代码 9003)common/doc_store/infinity_conn_base.py:106-145
Elasticsearch 实现

Elasticsearch 对每个租户使用单个索引,使用 kb_id 作为过滤字段进行隔离。

关键特性:

  • 混合搜索: 结合 query_string 进行文本匹配,使用 knn 进行向量相似度搜索 rag/utils/es_conn.py:183-205
  • Search After: 实现了 _search_with_search_after,用于在 MAX_RESULT_WINDOW(10,000)之外进行高效深度分页 rag/utils/es_conn.py:76-140
  • PageRank 调整: 使用自定义脚本 _PAGERANK_FEA_ADJUST_SCRIPT,在片段反馈期间对 pagerank_fea 进行原子更新 rag/utils/es_conn.py:36-58

来源:rag/utils/es_conn.py:31-154

OceanBase 实现

OceanBase 实现使用 pyobvector 和 SQLAlchemy,在关系型后端之上提供向量搜索能力。

模式定义: OceanBase 实现在 rag/utils/ob_conn.py 中定义了严格的模式,包含片段元数据、内容和向量数据的列。

  • 向量存储: 通过 pyobvector 使用 ARRAY 类型 rag/utils/ob_conn.py:26-30
  • 全文列: content_with_weightcontent_ltkstitle_tks 用于 BM25 搜索 rag/utils/ob_conn.py:116-131
  • 列定义: 诸如 available_intknowledge_graph_kwdpagerank_fea 等特定列使用 SQLAlchemy 的 Column 对象显式定义 rag/utils/ob_conn.py:52-99

来源:rag/utils/ob_conn.py:52-99, rag/utils/ob_conn.py:116-131

OpenSearch 实现

OpenSearch 实现为 OpenSearch 2.x 后端提供了统一接口,镜像了 Elasticsearch 的抽象,但使用了 opensearchpy 客户端。

关键特性:

  • 元数据索引: 支持使用 conf/doc_meta_es_mapping.json 中的专用映射创建 create_doc_meta_idx rag/utils/opensearch_conn.py:129-156
  • 工具函数: 提供 refresh_idxcount_idx 用于索引管理和监控 rag/utils/opensearch_conn.py:158-191

来源:rag/utils/opensearch_conn.py:65-102, rag/utils/opensearch_conn.py:129-191

匹配表达式系统

该系统使用统一的表达式语言来描述跨不同后端的搜索意图。

图:自然语言到代码实体空间

RAGFlow · 匹配表达式系统 · 图 2
RAGFlow · 匹配表达式系统 · 图 2

来源:common/doc_store/doc_store_base.py:56-127

MatchTextExpr

用于关键字和语义文本匹配。它包含 matching_textfieldsextra_options 等参数,用于控制 minimum_should_match common/doc_store/doc_store_base.py:56-68, memory/utils/es_conn.py:170-172

MatchDenseExpr

用于向量相似度搜索。它指定目标向量列(例如 q_768_vec)和查询嵌入向量 common/doc_store/doc_store_base.py:70-86

FusionExpr

启用混合搜索。Infinity 和 Elasticsearch 等后端使用 FusionExpr 中的参数来确定如何合并来自多个搜索路径的结果 rag/utils/infinity_conn.py:128-138, rag/utils/es_conn.py:185-190

文档与记忆存储

RAGFlow 在文档存储中维护文档级元数据和对话记忆。

文档元数据

DocMetadataService 管理,文档元数据存储在名为 ragflow_doc_meta_{tenant_id} 的索引中 api/db/services/doc_metadata_service.py:40-50。该存储作为文档级属性的单一事实来源。

  • 搜索迭代: DocMetadataService._iter_search_results 处理不同后端的不同输出格式(DataFrame、ES 字典、OceanBase SearchResult)api/db/services/doc_metadata_service.py:103-158
记忆系统

对话记忆使用每个用户的索引存储在文档存储中。

  • 索引名称: memory_{uid}index_name(uid) 生成 memory/services/messages.py:24-24
  • 消息类型: 支持 RAW 对话存储和提取的语义记忆 api/db/joint_services/memory_message_service.py:64-90
  • MsgStoreConn: 文档存储抽象的专用实例(settings.msgStoreConn)用于记忆操作 common/settings.py:91-91
  • 字段映射: map_message_to_es_fields 将消息字典转换为存储模式,包括名为 q_{dim}_vec 的向量嵌入 memory/utils/es_conn.py:53-78

来源:memory/services/messages.py:24-24, api/db/joint_services/memory_message_service.py:64-90, common/settings.py:91-91, memory/utils/es_conn.py:53-78

引擎选择与初始化

存储引擎的选择在 common/settings.py 的系统初始化期间确定。

图:系统初始化流程

RAGFlow · 引擎选择与初始化 · 图 3
RAGFlow · 引擎选择与初始化 · 图 3

来源:common/settings.py:85-91, common/settings.py:246-260

存储实现映射
引擎类名文件路径
ElasticsearchESConnectionrag/utils/es_conn.py
InfinityInfinityConnectionrag/utils/infinity_conn.py
OceanBaseOBConnectionrag/utils/ob_conn.py
OpenSearchOSConnectionrag/utils/opensearch_conn.py

来源:common/settings.py:29-32, common/settings.py:246-260, rag/utils/opensearch_conn.py:65-65