文档存储抽象
文档存储抽象层
相关源文件
本章引用的主要源码文件:
api/db/joint_services/memory_message_service.pyapi/db/joint_services/user_account_service.pyapi/db/services/doc_metadata_service.pycommon/doc_store/es_conn_base.pycommon/doc_store/infinity_conn_base.pycommon/doc_store/ob_conn_base.pycommon/doc_store/ob_conn_pool.pycommon/settings.pyconf/infinity_mapping.jsonconf/mapping.jsonmemory/services/messages.pymemory/utils/aggregation_utils.pymemory/utils/es_conn.pymemory/utils/highlight_utils.pymemory/utils/infinity_conn.pymemory/utils/ob_conn.pyrag/utils/es_conn.pyrag/utils/infinity_conn.pyrag/utils/ob_conn.pyrag/utils/opensearch_conn.pytest/unit_test/memory/utils/test_ob_conn_aggregation.pytest/unit_test/memory/utils/test_ob_conn_highlight.pytest/unit_test/rag/utils/test_opensearch_doc_meta.py
目的与范围
本文档描述了 RAGFlow 的文档存储抽象层,该层提供了一个统一接口,用于与多种搜索引擎后端进行交互。该抽象层使 RAGFlow 能够支持 Elasticsearch、Infinity、OceanBase 和 OpenSearch 作为可插拔的存储引擎,而无需修改应用程序代码。该层负责处理片段存储、向量索引、全文搜索和混合检索操作。
该抽象层确保检索增强生成(RAG)管线与底层存储技术无关,无论底层是传统搜索引擎(Elasticsearch)、高性能 AI 原生数据库(Infinity),还是具备向量能力的分布式关系型数据库(OceanBase)。
架构总览
文档存储抽象层采用三层架构:抽象接口、后端特定基类,以及通过环境配置在运行时选择的具体实现。
图:文档存储抽象层
来源:common/doc_store/doc_store_base.py:34-34, common/settings.py:85-91, rag/utils/infinity_conn.py:29-30, rag/utils/es_conn.py:61-62, rag/utils/ob_conn.py:33-34, rag/utils/opensearch_conn.py:65-65
DocStoreConnection 接口
DocStoreConnection 抽象基类定义了所有后端实现必须满足的契约。它按功能类别组织,包括索引管理、增删改查(CRUD)和检索操作。
数据库与索引操作
| 方法 | 参数 | 用途 |
|---|---|---|
db_type() | 无 | 返回后端类型标识符("elasticsearch"、"infinity"、"oceanbase"、"opensearch") |
health() | 无 | 返回集群健康状态,包括可用性和资源指标 |
create_idx() | indexName, knowledgebaseId, vectorSize, parser_id | 创建具有指定向量维度模式的索引/表 |
delete_idx() | indexName, knowledgebaseId | 删除索引/表 |
index_exist() | indexName, knowledgebaseId | 检查索引/表是否存在 |
create_doc_meta_idx() | index_name | 创建用于文档元数据的专用索引 |
来源:common/doc_store/doc_store_base.py:143-185, rag/utils/opensearch_conn.py:107-157
增删改查(CRUD)与搜索操作
search() 方法是检索的主要入口点,支持混合搜索、过滤和聚合。
@abstractmethod
def search(
self,
select_fields: list[str],
highlight_fields: list[str],
condition: dict,
match_expressions: list[MatchExpr],
order_by: OrderByExpr,
offset: int,
limit: int,
index_names: str | list[str],
knowledgebase_ids: list[str],
agg_fields: list[str] | None = None,
rank_feature: dict | None = None
):
关键参数:
condition:过滤条件的字典(例如{"available_int": 1})。match_expressions:要执行的MatchTextExpr(BM25)、MatchDenseExpr(向量)或FusionExpr列表。knowledgebase_ids:用于多租户分区或表选择。
来源:common/doc_store/doc_store_base.py:191-236, rag/utils/infinity_conn.py:94-107
后端实现
Infinity 实现
Infinity 对片段存储采用每个知识库一张表的策略。它执行大量的字段映射,以将 RAGFlow 的逻辑字段与物理数据库列对齐。
字段转换逻辑: Infinity 将 RAGFlow 的逻辑字段映射到 conf/infinity_mapping.json 中定义的特定列和全文索引。
| RAGFlow 字段 | Infinity 列 | 全文索引 |
|---|---|---|
docnm_kwd | docnm | ft_docnm_rag_coarse |
content_ltks | content | ft_content_rag_coarse |
important_kwd | important_keywords | ft_important_keywords_rag_coarse |
来源:rag/utils/infinity_conn.py:61-88, conf/infinity_mapping.json:9-17
实现细节:
- 表分区: 片段存储在名为
{index_name}_{kb_id}的表中rag/utils/infinity_conn.py:164-165。 - 关键字处理: Infinity 将
*_kwd字段视为关键字列表,但knowledge_graph_kwd等特定排除项除外rag/utils/infinity_conn.py:38-42。 - 元数据争用: 实现了
_retry_on_meta_contention,采用指数退避和抖动策略,以处理索引创建或删除期间 RocksDB 的"资源繁忙"错误(代码 9003)common/doc_store/infinity_conn_base.py:106-145。
Elasticsearch 实现
Elasticsearch 对每个租户使用单个索引,使用 kb_id 作为过滤字段进行隔离。
关键特性:
- 混合搜索: 结合
query_string进行文本匹配,使用knn进行向量相似度搜索rag/utils/es_conn.py:183-205。 - Search After: 实现了
_search_with_search_after,用于在MAX_RESULT_WINDOW(10,000)之外进行高效深度分页rag/utils/es_conn.py:76-140。 - PageRank 调整: 使用自定义脚本
_PAGERANK_FEA_ADJUST_SCRIPT,在片段反馈期间对pagerank_fea进行原子更新rag/utils/es_conn.py:36-58。
来源:rag/utils/es_conn.py:31-154
OceanBase 实现
OceanBase 实现使用 pyobvector 和 SQLAlchemy,在关系型后端之上提供向量搜索能力。
模式定义: OceanBase 实现在 rag/utils/ob_conn.py 中定义了严格的模式,包含片段元数据、内容和向量数据的列。
- 向量存储: 通过
pyobvector使用ARRAY类型rag/utils/ob_conn.py:26-30。 - 全文列:
content_with_weight、content_ltks和title_tks用于 BM25 搜索rag/utils/ob_conn.py:116-131。 - 列定义: 诸如
available_int、knowledge_graph_kwd和pagerank_fea等特定列使用 SQLAlchemy 的Column对象显式定义rag/utils/ob_conn.py:52-99。
来源:rag/utils/ob_conn.py:52-99, rag/utils/ob_conn.py:116-131
OpenSearch 实现
OpenSearch 实现为 OpenSearch 2.x 后端提供了统一接口,镜像了 Elasticsearch 的抽象,但使用了 opensearchpy 客户端。
关键特性:
- 元数据索引: 支持使用
conf/doc_meta_es_mapping.json中的专用映射创建create_doc_meta_idxrag/utils/opensearch_conn.py:129-156。 - 工具函数: 提供
refresh_idx和count_idx用于索引管理和监控rag/utils/opensearch_conn.py:158-191。
来源:rag/utils/opensearch_conn.py:65-102, rag/utils/opensearch_conn.py:129-191
匹配表达式系统
该系统使用统一的表达式语言来描述跨不同后端的搜索意图。
图:自然语言到代码实体空间
来源:common/doc_store/doc_store_base.py:56-127
MatchTextExpr
用于关键字和语义文本匹配。它包含 matching_text、fields 和 extra_options 等参数,用于控制 minimum_should_match common/doc_store/doc_store_base.py:56-68, memory/utils/es_conn.py:170-172。
MatchDenseExpr
用于向量相似度搜索。它指定目标向量列(例如 q_768_vec)和查询嵌入向量 common/doc_store/doc_store_base.py:70-86。
FusionExpr
启用混合搜索。Infinity 和 Elasticsearch 等后端使用 FusionExpr 中的参数来确定如何合并来自多个搜索路径的结果 rag/utils/infinity_conn.py:128-138, rag/utils/es_conn.py:185-190。
文档与记忆存储
RAGFlow 在文档存储中维护文档级元数据和对话记忆。
文档元数据
由 DocMetadataService 管理,文档元数据存储在名为 ragflow_doc_meta_{tenant_id} 的索引中 api/db/services/doc_metadata_service.py:40-50。该存储作为文档级属性的单一事实来源。
- 搜索迭代:
DocMetadataService._iter_search_results处理不同后端的不同输出格式(DataFrame、ES 字典、OceanBase SearchResult)api/db/services/doc_metadata_service.py:103-158。
记忆系统
对话记忆使用每个用户的索引存储在文档存储中。
- 索引名称:
memory_{uid}由index_name(uid)生成memory/services/messages.py:24-24。 - 消息类型: 支持
RAW对话存储和提取的语义记忆api/db/joint_services/memory_message_service.py:64-90。 - MsgStoreConn: 文档存储抽象的专用实例(
settings.msgStoreConn)用于记忆操作common/settings.py:91-91。 - 字段映射:
map_message_to_es_fields将消息字典转换为存储模式,包括名为q_{dim}_vec的向量嵌入memory/utils/es_conn.py:53-78。
来源:memory/services/messages.py:24-24, api/db/joint_services/memory_message_service.py:64-90, common/settings.py:91-91, memory/utils/es_conn.py:53-78
引擎选择与初始化
存储引擎的选择在 common/settings.py 的系统初始化期间确定。
图:系统初始化流程
来源:common/settings.py:85-91, common/settings.py:246-260
存储实现映射
| 引擎 | 类名 | 文件路径 |
|---|---|---|
| Elasticsearch | ESConnection | rag/utils/es_conn.py |
| Infinity | InfinityConnection | rag/utils/infinity_conn.py |
| OceanBase | OBConnection | rag/utils/ob_conn.py |
| OpenSearch | OSConnection | rag/utils/opensearch_conn.py |
来源:common/settings.py:29-32, common/settings.py:246-260, rag/utils/opensearch_conn.py:65-65