文档索引管线
文档索引管线
相关源文件
本章引用的主要源码文件:
backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.pybackend/onyx/background/celery/tasks/docfetching/tasks.pybackend/onyx/background/celery/tasks/docprocessing/tasks.pybackend/onyx/background/celery/tasks/models.pybackend/onyx/background/indexing/job_client.pybackend/onyx/background/indexing/run_docfetching.pybackend/onyx/connectors/models.pybackend/onyx/context/search/retrieval/search_runner.pybackend/onyx/db/document.pybackend/onyx/db/file_record.pybackend/onyx/db/index_attempt_metrics.pybackend/onyx/db/index_attempt_metrics_models.pybackend/onyx/db/sync_record.pybackend/onyx/document_index/interfaces.pybackend/onyx/document_index/interfaces_new.pybackend/onyx/document_index/vespa/index.pybackend/onyx/document_index/vespa/indexing_utils.pybackend/onyx/document_index/vespa/vespa_document_index.pybackend/onyx/document_index/vespa_constants.pybackend/onyx/file_store/staging.pybackend/onyx/indexing/adapters/document_indexing_adapter.pybackend/onyx/indexing/adapters/user_file_indexing_adapter.pybackend/onyx/indexing/chunker.pybackend/onyx/indexing/indexing_pipeline.pybackend/onyx/indexing/models.pybackend/onyx/redis/redis_connector.pybackend/onyx/redis/redis_connector_index.pybackend/onyx/server/onyx_api/ingestion.pybackend/onyx/utils/postgres_sanitization.pybackend/scripts/query_time_check/seed_dummy_docs.pybackend/tests/external_dependency_unit/celery/test_docprocessing_priority.pybackend/tests/external_dependency_unit/celery/test_persona_file_sync.pybackend/tests/external_dependency_unit/celery/test_user_file_indexing_adapter.pybackend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.pybackend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.pybackend/tests/external_dependency_unit/indexing/__init__.pybackend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.pybackend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.pybackend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.pybackend/tests/external_dependency_unit/indexing_helpers.pybackend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.pybackend/tests/unit/onyx/background/indexing/test_simple_job_terminate.pybackend/tests/unit/onyx/db/test_tools.pybackend/tests/unit/onyx/indexing/test_embedder.pybackend/tests/unit/onyx/indexing/test_indexing_pipeline.pybackend/tests/unit/onyx/indexing/test_personas_in_chunks.py
目的与范围
本文档介绍 Onyx 中的文档索引管线——这是一个后台处理系统,负责从连接器(Connector)获取文档,将其处理为可搜索的片段(Chunk),通过 indexing_model_server 生成嵌入向量(Embedding),并最终存储到 Vespa、PostgreSQL 和 MinIO 中。该管线使用 Celery 进行编排,并通过 Redis 进行协调,以支持跨多个并发连接器的大规模数据入库。
核心逻辑位于 connector_indexing_generator_task 和 run_indexing_pipeline 函数中,它们管理从原始数据到向量搜索就绪状态的整个流程。
管线架构
索引管线实现为一个分布式任务工作流。它将文档获取(I/O 密集型)与文档处理和嵌入(CPU/GPU 密集型)的关注点分离。
高层系统流程
来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111,backend/onyx/indexing/indexing_pipeline.py:111-137,backend/onyx/background/indexing/run_docfetching.py:95-136
文档获取阶段
获取阶段由 connector_indexing_generator_task 处理,该任务使用 run_docfetching.py 中的逻辑。它通过 ConnectorRunner 遍历外部数据源,并生成 Document 对象批次。
关键实体与函数
instantiate_connector:工厂函数,根据DocumentSource返回特定的连接器类(例如 Slack、Google Drive)。backend/onyx/background/indexing/run_docfetching.py:117-124ConnectorRunner:管理迭代逻辑,处理基于POLL、LOAD_STATE或EVENT的InputType。backend/onyx/background/indexing/run_docfetching.py:30IndexAttempt:数据库模型,用于跟踪单次索引运行的生命周期,包括状态和开始时间。backend/onyx/db/models.py:94
检查点机制
为支持大数据集和容错,管线实现了检查点机制。继承自 CheckpointedConnector 的连接器可以通过 save_checkpoint 将其状态(例如分页游标)保存到 MinIO。如果任务被中断,它会从 get_latest_valid_checkpoint 恢复执行。backend/onyx/background/indexing/run_docfetching.py:16-18,backend/onyx/connectors/interfaces.py:34
来源: backend/onyx/background/indexing/run_docfetching.py:95-136,backend/onyx/connectors/models.py:23-27,backend/onyx/db/models.py:94
文档处理管线
获取到一批 Document 对象后,它们会被传递给 run_indexing_pipeline 函数。该函数是转换和插入的主要编排器。backend/onyx/indexing/indexing_pipeline.py:111
1. 提取与解析
对于基于文件的连接器,文本提取通过专门的逻辑处理。它支持多种格式:
- 章节:文档被拆分为
TextSection、ImageSection或TabularSection。backend/onyx/connectors/models.py:38-87 - 图像:如果启用了视觉功能,图像会通过
summarize_image_with_error_handling进行处理。backend/onyx/indexing/indexing_pipeline.py:52 - 上下文检索增强生成(Contextual RAG):如果启用了
ENABLE_CONTEXTUAL_RAG,管线会使用DOCUMENT_SUMMARY_PROMPT生成摘要。backend/onyx/indexing/indexing_pipeline.py:14-19,backend/onyx/indexing/indexing_pipeline.py:83
2. 片段切分
Chunker 类将章节拆分为更小的片段,称为 DocAwareChunk。backend/onyx/indexing/chunker.py:63,backend/onyx/indexing/models.py:49
- 它使用
BaseTokenizer确保片段大小不超过嵌入模型的上下文窗口。backend/onyx/indexing/indexing_pipeline.py:78-79 - 片段增强:片段会附加元数据后缀和标题前缀,以提升语义搜索性能。
backend/onyx/indexing/models.py:56-62
3. 嵌入向量生成
IndexingEmbedder(通常是 DefaultIndexingEmbedder)与 indexing_model_server 通信。backend/onyx/indexing/embedder.py:65,backend/onyx/indexing/indexing_pipeline.py:110
- 函数:
embed_chunks_with_failure_handling管理对模型服务器的 API 调用。backend/onyx/indexing/indexing_pipeline.py:64 - 模型详情:
EmbeddingModelDetail类封装了模型名称、提供方和 API 配置。backend/onyx/indexing/models.py:150-184
来源: backend/onyx/indexing/indexing_pipeline.py:111-148,backend/onyx/indexing/chunker.py:63,backend/onyx/indexing/models.py:49-93
存储与索引
Onyx 执行"双写"操作,以确保语义搜索(Vespa/OpenSearch)和关系型元数据(PostgreSQL)保持同步。
Vespa 集成
VespaDocumentIndex 负责将片段插入向量数据库。backend/onyx/document_index/vespa/vespa_document_index.py:29
index函数:接收DocMetadataAwareIndexChunk对象,并将其转换为 Vespa 兼容的 JSON 格式。backend/onyx/document_index/vespa/indexing_utils.py:139-181- ID 生成:使用
get_uuid_from_chunk为每个片段生成确定性的 UUID。backend/onyx/document_index/vespa/indexing_utils.py:152 - 张量:嵌入向量以张量形式存储在 Vespa 中(例如
FULL_CHUNK_EMBEDDING_KEY)。backend/onyx/document_index/vespa/indexing_utils.py:156
PostgreSQL 元数据
_upsert_documents_in_db 更新关系型存储:
Document表:存储基本信息,如semantic_identifier和doc_metadata。backend/onyx/indexing/indexing_pipeline.py:151-168Tag表:upsert_document_tags存储用于过滤的元数据键/值对。backend/onyx/indexing/indexing_pipeline.py:46HierarchyNode:通过link_hierarchy_nodes_to_documents将文档链接到父结构。backend/onyx/indexing/indexing_pipeline.py:39
来源: backend/onyx/document_index/vespa/indexing_utils.py:139-181,backend/onyx/indexing/indexing_pipeline.py:151-168,backend/onyx/db/document.py:35-50
数据流图:从自然语言到代码实体
此图将概念上的"自然语言空间"(例如 PDF 文件)映射到"代码实体空间"(具体的类和数据库模型)。
来源: backend/onyx/indexing/indexing_pipeline.py:111-148,backend/onyx/indexing/chunker.py:63,backend/onyx/document_index/vespa/indexing_utils.py:139-144
索引协调与 Redis 模式
为防止竞态条件并管理资源,Onyx 使用基于 Redis 的协调机制。
索引栅栏
"栅栏"用于标记一个 ConnectorCredentialPair(CCPair)正在被索引。
RedisDocprocessing:提供方法检查某个尝试的栅栏是否处于激活状态。backend/onyx/background/celery/tasks/docprocessing/tasks.py:115,backend/onyx/redis/redis_docprocessing.py:15IndexingCoordination:一个数据库和逻辑层,确保每个 CCPair 只有一个活跃的IndexAttempt。backend/onyx/db/indexing_coordination.py:92-93
心跳机制
索引任务会更新 Redis 中的心跳信号,以表明其仍在运行。
start_heartbeat/stop_heartbeat:这些函数管理一个后台线程,在长时间运行的索引任务期间定期更新 Redis。backend/onyx/background/celery/tasks/docprocessing/tasks.py:34-35
来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:34-42,backend/onyx/db/indexing_coordination.py:92-93
关键数据模型总结
| 模型 | 代码实体 | 存储位置 | 作用 |
|---|---|---|---|
| Document | onyx.connectors.models.Document | 内存 | 获取数据的中间表示 |
| Chunk | onyx.indexing.models.DocAwareChunk | 内存 | 带有相关元数据和源链接的文本片段 |
| Index Record | onyx.db.models.Document | Postgres | 用于访问控制和 UI 展示的关系型元数据 |
| Vespa Doc | VespaDocumentFields | Vespa | 混合搜索查询(向量 + 关键词)的主要目标 |
| Index Attempt | onyx.db.models.IndexAttempt | Postgres | 跟踪索引作业的状态、错误和进度 |
来源: backend/onyx/connectors/models.py:185,backend/onyx/indexing/models.py:49,backend/onyx/db/models.py:41,backend/onyx/db/models.py:94,backend/onyx/document_index/interfaces.py:44