agentic_huge_data_base / wiki
页面 Onyx · 7.3 文档索引管线·DeepWiki 中文全文译文

7.3 · 文档索引管线(Document Indexing Pipeline)

企业连接器与统一搜索 · 聚焦本章的模块关系、源码依据与实现要点。

项目Onyx 章节7.3 状态全文译文 模块文档对象与元数据、工作流与编排、检索、召回与索引、入库与解析
源码线索
  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/onyx/background/celery/tasks/docfetching/tasks.py
  • backend/onyx/background/celery/tasks/docprocessing/tasks.py
  • backend/onyx/background/celery/tasks/models.py
  • backend/onyx/background/indexing/job_client.py
  • backend/onyx/background/indexing/run_docfetching.py
  • backend/onyx/connectors/models.py
  • backend/onyx/context/search/retrieval/search_runner.py
  • backend/onyx/db/document.py
  • backend/onyx/db/file_record.py
模块标签
  • 文档对象与元数据
  • 工作流与编排
  • 检索、召回与索引
  • 入库与解析
  • 存储与持久化

章节正文

文档索引管线

文档索引管线

相关源文件

本章引用的主要源码文件:

  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/onyx/background/celery/tasks/docfetching/tasks.py
  • backend/onyx/background/celery/tasks/docprocessing/tasks.py
  • backend/onyx/background/celery/tasks/models.py
  • backend/onyx/background/indexing/job_client.py
  • backend/onyx/background/indexing/run_docfetching.py
  • backend/onyx/connectors/models.py
  • backend/onyx/context/search/retrieval/search_runner.py
  • backend/onyx/db/document.py
  • backend/onyx/db/file_record.py
  • backend/onyx/db/index_attempt_metrics.py
  • backend/onyx/db/index_attempt_metrics_models.py
  • backend/onyx/db/sync_record.py
  • backend/onyx/document_index/interfaces.py
  • backend/onyx/document_index/interfaces_new.py
  • backend/onyx/document_index/vespa/index.py
  • backend/onyx/document_index/vespa/indexing_utils.py
  • backend/onyx/document_index/vespa/vespa_document_index.py
  • backend/onyx/document_index/vespa_constants.py
  • backend/onyx/file_store/staging.py
  • backend/onyx/indexing/adapters/document_indexing_adapter.py
  • backend/onyx/indexing/adapters/user_file_indexing_adapter.py
  • backend/onyx/indexing/chunker.py
  • backend/onyx/indexing/indexing_pipeline.py
  • backend/onyx/indexing/models.py
  • backend/onyx/redis/redis_connector.py
  • backend/onyx/redis/redis_connector_index.py
  • backend/onyx/server/onyx_api/ingestion.py
  • backend/onyx/utils/postgres_sanitization.py
  • backend/scripts/query_time_check/seed_dummy_docs.py
  • backend/tests/external_dependency_unit/celery/test_docprocessing_priority.py
  • backend/tests/external_dependency_unit/celery/test_persona_file_sync.py
  • backend/tests/external_dependency_unit/celery/test_user_file_indexing_adapter.py
  • backend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.py
  • backend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.py
  • backend/tests/external_dependency_unit/indexing/__init__.py
  • backend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.py
  • backend/tests/external_dependency_unit/indexing_helpers.py
  • backend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.py
  • backend/tests/unit/onyx/background/indexing/test_simple_job_terminate.py
  • backend/tests/unit/onyx/db/test_tools.py
  • backend/tests/unit/onyx/indexing/test_embedder.py
  • backend/tests/unit/onyx/indexing/test_indexing_pipeline.py
  • backend/tests/unit/onyx/indexing/test_personas_in_chunks.py

目的与范围

本文档介绍 Onyx 中的文档索引管线——这是一个后台处理系统,负责从连接器(Connector)获取文档,将其处理为可搜索的片段(Chunk),通过 indexing_model_server 生成嵌入向量(Embedding),并最终存储到 Vespa、PostgreSQL 和 MinIO 中。该管线使用 Celery 进行编排,并通过 Redis 进行协调,以支持跨多个并发连接器的大规模数据入库。

核心逻辑位于 connector_indexing_generator_taskrun_indexing_pipeline 函数中,它们管理从原始数据到向量搜索就绪状态的整个流程。

管线架构

索引管线实现为一个分布式任务工作流。它将文档获取(I/O 密集型)与文档处理和嵌入(CPU/GPU 密集型)的关注点分离。

高层系统流程
Onyx · 高层系统流程 · 图 1
Onyx · 高层系统流程 · 图 1

来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111backend/onyx/indexing/indexing_pipeline.py:111-137backend/onyx/background/indexing/run_docfetching.py:95-136

文档获取阶段

获取阶段由 connector_indexing_generator_task 处理,该任务使用 run_docfetching.py 中的逻辑。它通过 ConnectorRunner 遍历外部数据源,并生成 Document 对象批次。

关键实体与函数
  • instantiate_connector:工厂函数,根据 DocumentSource 返回特定的连接器类(例如 Slack、Google Drive)。backend/onyx/background/indexing/run_docfetching.py:117-124
  • ConnectorRunner:管理迭代逻辑,处理基于 POLLLOAD_STATEEVENTInputTypebackend/onyx/background/indexing/run_docfetching.py:30
  • IndexAttempt:数据库模型,用于跟踪单次索引运行的生命周期,包括状态和开始时间。backend/onyx/db/models.py:94
检查点机制

为支持大数据集和容错,管线实现了检查点机制。继承自 CheckpointedConnector 的连接器可以通过 save_checkpoint 将其状态(例如分页游标)保存到 MinIO。如果任务被中断,它会从 get_latest_valid_checkpoint 恢复执行。backend/onyx/background/indexing/run_docfetching.py:16-18backend/onyx/connectors/interfaces.py:34

来源: backend/onyx/background/indexing/run_docfetching.py:95-136backend/onyx/connectors/models.py:23-27backend/onyx/db/models.py:94

文档处理管线

获取到一批 Document 对象后,它们会被传递给 run_indexing_pipeline 函数。该函数是转换和插入的主要编排器。backend/onyx/indexing/indexing_pipeline.py:111

1. 提取与解析

对于基于文件的连接器,文本提取通过专门的逻辑处理。它支持多种格式:

  • 章节:文档被拆分为 TextSectionImageSectionTabularSectionbackend/onyx/connectors/models.py:38-87
  • 图像:如果启用了视觉功能,图像会通过 summarize_image_with_error_handling 进行处理。backend/onyx/indexing/indexing_pipeline.py:52
  • 上下文检索增强生成(Contextual RAG):如果启用了 ENABLE_CONTEXTUAL_RAG,管线会使用 DOCUMENT_SUMMARY_PROMPT 生成摘要。backend/onyx/indexing/indexing_pipeline.py:14-19backend/onyx/indexing/indexing_pipeline.py:83
2. 片段切分

Chunker 类将章节拆分为更小的片段,称为 DocAwareChunkbackend/onyx/indexing/chunker.py:63backend/onyx/indexing/models.py:49

  • 它使用 BaseTokenizer 确保片段大小不超过嵌入模型的上下文窗口。backend/onyx/indexing/indexing_pipeline.py:78-79
  • 片段增强:片段会附加元数据后缀和标题前缀,以提升语义搜索性能。backend/onyx/indexing/models.py:56-62
3. 嵌入向量生成

IndexingEmbedder(通常是 DefaultIndexingEmbedder)与 indexing_model_server 通信。backend/onyx/indexing/embedder.py:65backend/onyx/indexing/indexing_pipeline.py:110

  • 函数embed_chunks_with_failure_handling 管理对模型服务器的 API 调用。backend/onyx/indexing/indexing_pipeline.py:64
  • 模型详情EmbeddingModelDetail 类封装了模型名称、提供方和 API 配置。backend/onyx/indexing/models.py:150-184

来源: backend/onyx/indexing/indexing_pipeline.py:111-148backend/onyx/indexing/chunker.py:63backend/onyx/indexing/models.py:49-93

存储与索引

Onyx 执行"双写"操作,以确保语义搜索(Vespa/OpenSearch)和关系型元数据(PostgreSQL)保持同步。

Vespa 集成

VespaDocumentIndex 负责将片段插入向量数据库。backend/onyx/document_index/vespa/vespa_document_index.py:29

  • index 函数:接收 DocMetadataAwareIndexChunk 对象,并将其转换为 Vespa 兼容的 JSON 格式。backend/onyx/document_index/vespa/indexing_utils.py:139-181
  • ID 生成:使用 get_uuid_from_chunk 为每个片段生成确定性的 UUID。backend/onyx/document_index/vespa/indexing_utils.py:152
  • 张量:嵌入向量以张量形式存储在 Vespa 中(例如 FULL_CHUNK_EMBEDDING_KEY)。backend/onyx/document_index/vespa/indexing_utils.py:156
PostgreSQL 元数据

_upsert_documents_in_db 更新关系型存储:

  • Document:存储基本信息,如 semantic_identifierdoc_metadatabackend/onyx/indexing/indexing_pipeline.py:151-168
  • Tagupsert_document_tags 存储用于过滤的元数据键/值对。backend/onyx/indexing/indexing_pipeline.py:46
  • HierarchyNode:通过 link_hierarchy_nodes_to_documents 将文档链接到父结构。backend/onyx/indexing/indexing_pipeline.py:39

来源: backend/onyx/document_index/vespa/indexing_utils.py:139-181backend/onyx/indexing/indexing_pipeline.py:151-168backend/onyx/db/document.py:35-50

数据流图:从自然语言到代码实体

此图将概念上的"自然语言空间"(例如 PDF 文件)映射到"代码实体空间"(具体的类和数据库模型)。

Onyx · 数据流图:从自然语言到代码实体 · 图 2
Onyx · 数据流图:从自然语言到代码实体 · 图 2

来源: backend/onyx/indexing/indexing_pipeline.py:111-148backend/onyx/indexing/chunker.py:63backend/onyx/document_index/vespa/indexing_utils.py:139-144

索引协调与 Redis 模式

为防止竞态条件并管理资源,Onyx 使用基于 Redis 的协调机制。

索引栅栏

"栅栏"用于标记一个 ConnectorCredentialPair(CCPair)正在被索引。

  • RedisDocprocessing:提供方法检查某个尝试的栅栏是否处于激活状态。backend/onyx/background/celery/tasks/docprocessing/tasks.py:115backend/onyx/redis/redis_docprocessing.py:15
  • IndexingCoordination:一个数据库和逻辑层,确保每个 CCPair 只有一个活跃的 IndexAttemptbackend/onyx/db/indexing_coordination.py:92-93
心跳机制

索引任务会更新 Redis 中的心跳信号,以表明其仍在运行。

  • start_heartbeat / stop_heartbeat:这些函数管理一个后台线程,在长时间运行的索引任务期间定期更新 Redis。backend/onyx/background/celery/tasks/docprocessing/tasks.py:34-35

来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:34-42backend/onyx/db/indexing_coordination.py:92-93

关键数据模型总结

模型代码实体存储位置作用
Documentonyx.connectors.models.Document内存获取数据的中间表示
Chunkonyx.indexing.models.DocAwareChunk内存带有相关元数据和源链接的文本片段
Index Recordonyx.db.models.DocumentPostgres用于访问控制和 UI 展示的关系型元数据
Vespa DocVespaDocumentFieldsVespa混合搜索查询(向量 + 关键词)的主要目标
Index Attemptonyx.db.models.IndexAttemptPostgres跟踪索引作业的状态、错误和进度

来源: backend/onyx/connectors/models.py:185backend/onyx/indexing/models.py:49backend/onyx/db/models.py:41backend/onyx/db/models.py:94backend/onyx/document_index/interfaces.py:44