子进程工作器与资源隔离
子进程工作器与资源隔离
相关源文件
本章引用的主要源码文件:
cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.pycognee/infrastructure/databases/utils/closing_lru_cache.pycognee/infrastructure/databases/vector/lancedb/subprocess/proxy.pycognee/tests/integration/infrastructure/graph/test_kuzu_adapter.pycognee/tests/subprocesses/common.pycognee/tests/subprocesses/reader.pycognee/tests/subprocesses/simple_cognify_1.pycognee/tests/subprocesses/simple_cognify_2.pycognee/tests/subprocesses/writer.pycognee/tests/test_concurrent_subprocess_access.pycognee/tests/test_subprocess_rss.pycognee/tests/unit/infrastructure/databases/graph/test_kuzu_subprocess_proxy.pycognee/tests/unit/infrastructure/databases/test_closing_lru_cache.pycognee/tests/unit/infrastructure/databases/test_subprocess_session_failures.pycognee/tests/unit/infrastructure/databases/test_worker_import_hygiene.pycognee_db_workers/harness.pycognee_db_workers/kuzu_protocol.pycognee_db_workers/kuzu_worker.pycognee_db_workers/lancedb_protocol.pycognee_db_workers/lancedb_worker.py
Cognee 实现了一套健壮的资源隔离与并发架构,旨在处理多进程环境中原生数据库库(如 Kùzu 和 LanceDB)的复杂性。该系统可以防止内存泄漏,处理原生崩溃而不影响主应用程序,并管理跨子进程边界的文件锁。
1. 子进程工作器框架(cognee_db_workers)
cognee_db_workers 包提供了一个通用的、仅依赖标准库的框架,用于在隔离的子进程中运行数据库操作。这种设计确保工作器进程永远不会导入主 cognee 包,从而防止循环依赖并保持工作器内存占用最小化。
关键组件
SubprocessSession:主进程控制器,管理工作器的生命周期,包括生成、RPC 分发和崩溃恢复cognee_db_workers/harness.py:301-315。Request/Response:通过multiprocessing.Queue进行进程间通信的数据类cognee_db_workers/harness.py:84-98。HandleRegistry:工作器端的注册表,将整数 ID 映射到原生对象(如数据库连接)。这使得主进程可以引用复杂的原生对象,而无需对其进行序列化cognee_db_workers/harness.py:127-133。ReplayStep:一种机制,用于在工作器进程崩溃并需要重新生成时重建工作器状态(例如,重新打开数据库或重新加载扩展)cognee_db_workers/harness.py:109-125。
子进程数据流
下图展示了主进程与工作器进程之间的 RPC 流程。
图 1:子进程 RPC 架构
来源:cognee_db_workers/harness.py:84-156,cognee_db_workers/kuzu_worker.py:159-165
2. 数据库子进程代理
Cognee 使用代理类来镜像 Kùzu 和 LanceDB 的 API,同时透明地将调用路由到隔离的工作器。
Kùzu(Ladybug)代理
KuzuSubprocessSession 和 RemoteKuzuDatabase 类允许 Cognee 与 Kùzu 交互,而无需将 Kùzu 库驻留在主进程内存中。
RemoteKuzuDatabase:代理ladybug.Database并处理init_database调用cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:101-110。RemoteKuzuConnection:代理ladybug.Connection并管理查询执行cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:199-205。_Materialized:查询结果的内存中副本,允许主进程迭代从工作器获取的行cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:76-85。
LanceDB 代理
LanceDB 代理处理 LanceDB 复杂的流式构建器 API(例如 table.vector_search().limit().to_list())。
_BuilderChain:在主端累积构建器调用,并在终端方法(如.to_list()或.execute())上分发单个 RPCcognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:78-89。RemoteLanceDBTable:管理工作器端的表句柄,并确保在代理被垃圾回收时释放它们,以防止工作器中的内存泄漏cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:153-160。
来源:cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:33-40,cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:39-41
3. 资源管理:ClosingLRUCache
为了在不超过文件描述符限制或内存边界的情况下管理跨多个数据集的数据库连接,Cognee 使用了一个专门的 ClosingLRUCache。
与标准 LRU 缓存不同,ClosingLRUCache 确保当对象(如数据库适配器)被驱逐时,会调用其 close() 方法。然而,它使用了一种租约模式:如果主进程仍在持有该对象的代理,则 close() 调用会被延迟,直到代理也被释放 cognee/infrastructure/databases/utils/closing_lru_cache.py:1-10。
缓存生命周期
- 创建:
get_or_create返回一个_LeasedValueProxycognee/infrastructure/databases/utils/closing_lru_cache.py:51-57。 - 驱逐:如果缓存超过
maxsize,最旧的条目会被标记为待关闭cognee/infrastructure/databases/utils/closing_lru_cache.py:81-93。 - 延迟关闭:如果代理仍被某个函数持有,则对象保持打开状态。一旦代理被垃圾回收,
_LeasedCacheEntry会触发实际的close()(同步或异步)cognee/infrastructure/databases/utils/closing_lru_cache.py:120-130。
图 2:ClosingLRUCache 生命周期
来源:cognee/infrastructure/databases/utils/closing_lru_cache.py:101-150,cognee/infrastructure/databases/utils/closing_lru_cache.py:200-210
4. 并发与文件锁管理
Cognee 提供了机制来确保多个子进程(例如,独立的 cognify 任务)在访问相同的基于文件的数据库时不会发生冲突。
Kùzu 共享锁
Kùzu(Ladybug)需要对数据库文件进行独占访问。Cognee 利用基于 Redis 的共享锁来序列化访问,当多个子进程尝试读取或写入同一个 Kùzu 实例时 cognee/tests/test_concurrent_subprocess_access.py:12-19。
子进程稳定性特性
- 看门狗:轮询看门狗或
pdeathsig(在 Linux 上)确保如果父进程意外死亡,工作器进程会终止,从而防止容器化环境中出现"僵尸"工作器cognee_db_workers/harness.py:193-204。 - 内存监控:系统可以跟踪工作器的常驻集大小(RSS),以检测原生库中的内存膨胀
cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:70-73。 - 信号提示:当工作器死亡时,框架会将退出代码解码为人类可读的提示(例如,
SIGKILL对应"可能是 OOM 杀死"),以简化调试cognee_db_workers/harness.py:162-170。
并发访问模式
系统针对以下场景进行了测试:
- 写入器子进程和读取器子进程同时访问数据库
cognee/tests/test_concurrent_subprocess_access.py:33-37。 - 多个 Cognify 任务在不同数据集上并行运行
cognee/tests/test_concurrent_subprocess_access.py:61-68。
来源:cognee/tests/test_concurrent_subprocess_access.py:22-45,cognee_db_workers/harness.py:725-740