文档路由与管线编排
文档路由与管线编排
相关源文件
本章引用的主要源码文件:
docs/LightRAG-API-Server-zh.mddocs/LightRAG-API-Server.mdlightrag/api/gunicorn_config.pylightrag/api/routers/document_routes.pylightrag/api/run_with_gunicorn.pylightrag/kg/shared_storage.pylightrag/parser_routing.pylightrag/pipeline.pylightrag/utils_pipeline.pytests/test_api_config_bedrock.pytests/test_document_routes_docx_archive.pytests/test_pipeline_release_closure.py
本文详细介绍了基于 FastAPI 的文档管理路由以及底层多阶段入库管线。系统采用级联工作队列架构来管理文档从原始上传到知识图谱(KG)集成的状态转换。
API 端点与文档管理
document_routes.py 模块定义了文档操作的 RESTful 接口。这些路由与 LightRAG 实例及其 DocStatusStorage 交互,以跟踪每个文件的生命周期。
关键路由
| 端点 | 方法 | 函数 | 描述 |
|---|---|---|---|
/documents/upload | POST | upload_documents | 接收多部分文件上传,对文件名进行消毒处理,并将其移动到工作区输入目录。 |
/documents/scan | POST | scan_documents | 触发对输入目录的后台扫描,以检测新增、修改或删除的文件。 |
/documents/insert_text | POST | insert_text | 直接将原始文本内容插入管线,无需物理文件。 |
/documents/delete | DELETE | delete_documents | 从存储中移除文档,并触发对关联的知识图谱实体和向量的清理。 |
/documents/status | GET | get_document_status | 返回文档的当前处理状态和元数据。 |
来源:lightrag/api/routers/document_routes.py:17-47、lightrag/api/routers/document_routes.py:465-485、lightrag/api/routers/document_routes.py:734-754
文件名消毒与提示
为防止路径遍历攻击,服务器会通过移除分隔符和控制字符来对文件名进行消毒 lightrag/api/routers/document_routes.py:107-150。LightRAG 支持在文件名中嵌入"解析器提示"(例如 doc.[native-iet].pdf),允许用户在上传时指定解析器引擎和处理选项(如 i 表示图片,t 表示表格)lightrag/parser_routing.py:47-51、lightrag/parser_routing.py:155-162。
管线编排
编排逻辑实现在 _PipelineMixin 中,该混入类被主 LightRAG 类继承。它通过多个异步队列来管理文档的流转。
级联工作队列
管线使用一系列 asyncio.Queue 对象将文档传递给专门的处理器。这种架构允许不同阶段并行处理(例如,在解析一个文档的同时从另一个文档中提取实体)。
数据流图:文档入库管线
来源:lightrag/pipeline.py:173-191、lightrag/pipeline.py:350-410、lightrag/pipeline.py:535-590
处理器阶段
- 解析:根据所选解析器引擎,文档被路由到
q_native、q_mineru或q_docling。处理器将文件转换为通用的中间格式(Sidecar)lightrag/pipeline.py:640-670。 - 分析:
_pipeline_analyze_worker处理多模态内容(图片、表格),如果启用,会使用视觉语言模型(VLM)lightrag/pipeline.py:730-760。 - 提取:
_pipeline_process_worker执行文本片段切分以及由大语言模型(LLM)驱动的实体/关系提取lightrag/pipeline.py:810-850。
锁定策略与并发
为确保跨多个进程(尤其是在使用 Gunicorn 运行时)的数据一致性,LightRAG 通过 shared_storage.py 采用了一套复杂的锁定策略。
管线状态标志
系统在共享内存中维护一个 pipeline_status 字典,包含几个关键标志:
busy:表示管线当前正在处理文档lightrag/api/routers/document_routes.py:157-159。scanning:在目录扫描进行中时设置,以防止并发扫描lightrag/api/routers/document_routes.py:1360-1370。destructive_busy:在修改模式或删除大型数据集等操作期间设置,会阻止所有其他管线活动lightrag/api/routers/document_routes.py:1410-1420。
锁定原语
LightRAG 使用 KeyedUnifiedLock 来提供进程级(multiprocessing.Lock)和协程级(asyncio.Lock)的同步。
代码实体映射:锁定与状态
来源:lightrag/kg/shared_storage.py:137-179、lightrag/kg/shared_storage.py:430-460、lightrag/pipeline.py:43-44
数据一致性与恢复
管线设计为可恢复的。如果服务器崩溃或处理器失败,文档会在 DocStatusStorage 中保持 FAILED 或 PENDING 状态。
- 状态跟踪:每次文档状态转换都会被记录。状态包括
PARSING、ANALYZING、PROCESSING、PROCESSED和FAILEDlightrag/base.py:32-41。 - 扫描恢复:当触发
run_scanning_process时,它会将输入目录中的物理文件与DocStatusStorage进行比对。它会自动重新入队处于FAILED状态或知识图谱中缺失的文档lightrag/api/routers/document_routes.py:1380-1450。 - 内容哈希:文档通过其内容的哈希值(
compute_text_content_hash)进行标识。这可以防止同一文件以不同名称上传时被重复处理lightrag/pipeline.py:71-74。
来源:lightrag/pipeline.py:94-100、lightrag/utils_pipeline.py:142-155、lightrag/api/routers/document_routes.py:1500-1550