Shared 存储与多流程并发
共享存储与多进程并发
相关源文件
本章引用的主要源码文件:
docs/LightRAG-API-Server-zh.mddocs/LightRAG-API-Server.mdlightrag/api/gunicorn_config.pylightrag/api/run_with_gunicorn.pylightrag/kg/shared_storage.pytests/README_WORKSPACE_ISOLATION_TESTS.mdtests/test_api_config_bedrock.pytests/test_dimension_mismatch.pytests/test_graph_storage.pytests/test_neo4j_fulltext_index.pytests/test_no_model_suffix_safety.pytests/test_postgres_index_name.pytests/test_postgres_migration.pytests/test_postgres_retry_integration.pytests/test_qdrant_migration.pytests/test_unified_lock_safety.pytests/test_workspace_isolation.pytests/test_workspace_migration_isolation.pytests/test_write_json_optimization.py
LightRAG 提供了一个健壮的并发控制与共享存储层,旨在支持多进程环境,例如使用 Gunicorn 多工作进程的生产部署。该系统通过专门的锁机制,确保不同工作空间和命名空间之间的数据一致性,同时防止死锁。
共享数据基础设施
共享存储层结合使用 Python 的 multiprocessing.Manager 和 asyncio 原语,在进程边界之间同步状态。
初始化与生命周期
共享数据必须在任何存储操作之前完成初始化。这通常由服务器入口点或在 LightRAG 类实例化时处理。
initialize_share_data(workers: int = 1):配置全局状态。如果workers > 1,则会初始化一个multiprocessing.Manager,为字典和锁创建共享代理lightrag/kg/shared_storage.py:530-545。finalize_share_data():清理管理器并释放全局资源lightrag/kg/shared_storage.py:598-614。- Gunicorn 集成:在 Gunicorn 模式下,初始化在主进程中通过
on_starting进行,以便派生的工作进程可以继承共享的管理器代理lightrag/api/run_with_gunicorn.py:108-113,lightrag/api/gunicorn_config.py:95-123。
进程同步与协程同步
为了处理运行异步代码的多进程工作进程的复杂性,LightRAG 使用了一种"双锁"策略:
- 进程锁:一个
multiprocessing.Lock(通过 Manager)用于跨操作系统进程同步。 - 异步锁:一个
asyncio.Lock用于在单个进程内同步协程,防止当多个任务尝试访问同一资源时,单个工作进程阻塞自身lightrag/kg/shared_storage.py:158-164。
统一锁系统
LightRAG 将进程锁与线程锁的复杂性抽象为一个统一接口。
UnifiedLock
UnifiedLock 类提供了一个一致的 async with 接口,无论底层锁是标准的 asyncio.Lock 还是多进程的 ProcessLock lightrag/kg/shared_storage.py:137-147。它管理内部异步锁和跨进程锁的获取,以确保安全性 lightrag/kg/shared_storage.py:155-179。
KeyedUnifiedLock
一个基于注册表的锁系统,允许通过特定的字符串键(例如,文档 ID 或特定的命名空间)进行锁定。它包含一个自动清理机制,以防止未使用的锁对象导致内存泄漏 lightrag/kg/shared_storage.py:236-250。
- 清理逻辑:当键的数量超过
CLEANUP_THRESHOLD(500)且距离上次清理至少过去了MIN_CLEANUP_INTERVAL_SECONDS(30 秒)时触发lightrag/kg/shared_storage.py:68-71。 - 引用计数:跟踪有多少进程正在等待特定键,以确保锁仅在空闲时被删除
lightrag/kg/shared_storage.py:273-285。
NamespaceLock
代码库中最常用的锁,NamespaceLock 为特定的存储组件(KV、向量、图)提供工作空间隔离的锁定 lightrag/kg/shared_storage.py:445-455。
| 函数 | 用途 | 来源 |
|---|---|---|
get_namespace_lock | 返回工作空间内特定命名空间的 UnifiedLock。 | lightrag/kg/shared_storage.py:504-515 |
get_data_init_lock | 一个全局锁,专门用于初始化存储后端(例如,创建 SQL 表)。 | lightrag/kg/shared_storage.py:488-501 |
管线状态与标志
共享状态维护在 _shared_dicts 中,这使得所有进程都能看到 RAG 管线的当前状态。
管线状态标志
pipeline_status 命名空间跟踪工作空间的运行状态 lightrag/kg/shared_storage.py:650-655:
busy:在标准插入或查询操作期间设置为True。destructive_busy:在修改模式或执行批量删除的操作期间设置。request_pending:表示有一个高优先级操作正在等待当前任务完成。
更新标志
用于跨工作进程发出缓存失效或需要从磁盘重新加载数据的信号。
set_all_update_flags(workspace):将所有命名空间标记为"已更新"lightrag/kg/shared_storage.py:768-775。get_update_flag(namespace, workspace):检查特定存储组件是否需要刷新其本地内存缓存lightrag/kg/shared_storage.py:797-805。
架构图
并发控制流程
此图说明了来自 Gunicorn 工作进程的请求如何与 SharedStorage 层交互以获取锁。
标题:多进程锁获取流程
来源:lightrag/kg/shared_storage.py:261-300,lightrag/kg/shared_storage.py:155-179
工作空间与命名空间隔离
此图将"命名空间"的逻辑概念桥接到底层共享数据结构。
标题:共享数据结构映射
来源:lightrag/kg/shared_storage.py:99-112,lightrag/kg/shared_storage.py:650-660,lightrag/kg/shared_storage.py:716-725
工作空间隔离
LightRAG 使用 final_namespace 模式在工作空间之间强制执行严格隔离:workspace:namespace lightrag/kg/shared_storage.py:111-112。
- 数据隔离:
get_namespace_data使用带前缀的键从_shared_dicts中检索值,确保工作空间 A 的pipeline_status不能被工作空间 B 修改lightrag/kg/shared_storage.py:716-725。 - 锁隔离:
get_namespace_lock使用相同的前缀,允许工作空间 A 执行插入操作(锁定kv_storage),同时工作空间 B 执行自己的插入操作而不会产生争用lightrag/kg/shared_storage.py:504-515。
关键实现细节
锁清理
为了防止 _lock_registry 在长时间运行的服务器中无限增长,KeyedUnifiedLock 会对引用计数为零且锁空闲时间超过 CLEANUP_KEYED_LOCKS_AFTER_SECONDS(300 秒)的键执行清理 lightrag/kg/shared_storage.py:67,lightrag/kg/shared_storage.py:383-405。
错误处理
如果在调用 initialize_share_data 之前尝试执行操作,系统会抛出 PipelineNotInitializedError lightrag/kg/shared_storage.py:12,lightrag/kg/shared_storage.py:518-520。
日志记录
由于标准日志记录在多进程分叉中可能会被缓冲或丢失,因此提供了 direct_log,用于将关键的同步事件直接写入 sys.stderr 并启用刷新 lightrag/kg/shared_storage.py:18-51。
来源:
lightrag/kg/shared_storage.py:1-805lightrag/api/run_with_gunicorn.py:108-113lightrag/api/gunicorn_config.py:95-138tests/test_workspace_isolation.py:151-195