agentic_huge_data_base / wiki
页面 Cognee · 5.7 子进程工作器与资源隔离·DeepWiki 中文全文译文

5.7 · 子进程工作器与资源隔离(Subprocess Workers and Resource Isolation)

记忆管道与知识图谱构建 · 聚焦本章的模块关系、源码依据与实现要点。

项目Cognee 章节5.7 状态全文译文 模块接口与服务契约、测试、发布与运维、存储与持久化、检索、召回与索引
源码线索
  • cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py
  • cognee/infrastructure/databases/utils/closing_lru_cache.py
  • cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py
  • cognee/tests/integration/infrastructure/graph/test_kuzu_adapter.py
  • cognee/tests/subprocesses/common.py
  • cognee/tests/subprocesses/reader.py
  • cognee/tests/subprocesses/simple_cognify_1.py
  • cognee/tests/subprocesses/simple_cognify_2.py
  • cognee/tests/subprocesses/writer.py
  • cognee/tests/test_concurrent_subprocess_access.py
模块标签
  • 接口与服务契约
  • 测试、发布与运维
  • 存储与持久化
  • 检索、召回与索引
  • 界面与交互

章节正文

子进程工作器与资源隔离

子进程工作器与资源隔离

相关源文件

本章引用的主要源码文件:

  • cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py
  • cognee/infrastructure/databases/utils/closing_lru_cache.py
  • cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py
  • cognee/tests/integration/infrastructure/graph/test_kuzu_adapter.py
  • cognee/tests/subprocesses/common.py
  • cognee/tests/subprocesses/reader.py
  • cognee/tests/subprocesses/simple_cognify_1.py
  • cognee/tests/subprocesses/simple_cognify_2.py
  • cognee/tests/subprocesses/writer.py
  • cognee/tests/test_concurrent_subprocess_access.py
  • cognee/tests/test_subprocess_rss.py
  • cognee/tests/unit/infrastructure/databases/graph/test_kuzu_subprocess_proxy.py
  • cognee/tests/unit/infrastructure/databases/test_closing_lru_cache.py
  • cognee/tests/unit/infrastructure/databases/test_subprocess_session_failures.py
  • cognee/tests/unit/infrastructure/databases/test_worker_import_hygiene.py
  • cognee_db_workers/harness.py
  • cognee_db_workers/kuzu_protocol.py
  • cognee_db_workers/kuzu_worker.py
  • cognee_db_workers/lancedb_protocol.py
  • cognee_db_workers/lancedb_worker.py

Cognee 实现了一套健壮的资源隔离与并发架构,旨在处理多进程环境中原生数据库库(如 Kùzu 和 LanceDB)的复杂性。该系统可以防止内存泄漏,处理原生崩溃而不影响主应用程序,并管理跨子进程边界的文件锁。

1. 子进程工作器框架(cognee_db_workers

cognee_db_workers 包提供了一个通用的、仅依赖标准库的框架,用于在隔离的子进程中运行数据库操作。这种设计确保工作器进程永远不会导入主 cognee 包,从而防止循环依赖并保持工作器内存占用最小化。

关键组件
  • SubprocessSession:主进程控制器,管理工作器的生命周期,包括生成、RPC 分发和崩溃恢复 cognee_db_workers/harness.py:301-315
  • Request / Response:通过 multiprocessing.Queue 进行进程间通信的数据类 cognee_db_workers/harness.py:84-98
  • HandleRegistry:工作器端的注册表,将整数 ID 映射到原生对象(如数据库连接)。这使得主进程可以引用复杂的原生对象,而无需对其进行序列化 cognee_db_workers/harness.py:127-133
  • ReplayStep:一种机制,用于在工作器进程崩溃并需要重新生成时重建工作器状态(例如,重新打开数据库或重新加载扩展)cognee_db_workers/harness.py:109-125
子进程数据流

下图展示了主进程与工作器进程之间的 RPC 流程。

图 1:子进程 RPC 架构

Cognee · 子进程数据流 · 图 1
Cognee · 子进程数据流 · 图 1

来源:cognee_db_workers/harness.py:84-156cognee_db_workers/kuzu_worker.py:159-165

2. 数据库子进程代理

Cognee 使用代理类来镜像 Kùzu 和 LanceDB 的 API,同时透明地将调用路由到隔离的工作器。

Kùzu(Ladybug)代理

KuzuSubprocessSessionRemoteKuzuDatabase 类允许 Cognee 与 Kùzu 交互,而无需将 Kùzu 库驻留在主进程内存中。

  • RemoteKuzuDatabase:代理 ladybug.Database 并处理 init_database 调用 cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:101-110
  • RemoteKuzuConnection:代理 ladybug.Connection 并管理查询执行 cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:199-205
  • _Materialized:查询结果的内存中副本,允许主进程迭代从工作器获取的行 cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:76-85
LanceDB 代理

LanceDB 代理处理 LanceDB 复杂的流式构建器 API(例如 table.vector_search().limit().to_list())。

  • _BuilderChain:在主端累积构建器调用,并在终端方法(如 .to_list().execute())上分发单个 RPC cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:78-89
  • RemoteLanceDBTable:管理工作器端的表句柄,并确保在代理被垃圾回收时释放它们,以防止工作器中的内存泄漏 cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:153-160

来源:cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:33-40cognee/infrastructure/databases/vector/lancedb/subprocess/proxy.py:39-41

3. 资源管理:ClosingLRUCache

为了在不超过文件描述符限制或内存边界的情况下管理跨多个数据集的数据库连接,Cognee 使用了一个专门的 ClosingLRUCache

与标准 LRU 缓存不同,ClosingLRUCache 确保当对象(如数据库适配器)被驱逐时,会调用其 close() 方法。然而,它使用了一种租约模式:如果主进程仍在持有该对象的代理,则 close() 调用会被延迟,直到代理也被释放 cognee/infrastructure/databases/utils/closing_lru_cache.py:1-10

缓存生命周期
  1. 创建get_or_create 返回一个 _LeasedValueProxy cognee/infrastructure/databases/utils/closing_lru_cache.py:51-57
  2. 驱逐:如果缓存超过 maxsize,最旧的条目会被标记为待关闭 cognee/infrastructure/databases/utils/closing_lru_cache.py:81-93
  3. 延迟关闭:如果代理仍被某个函数持有,则对象保持打开状态。一旦代理被垃圾回收,_LeasedCacheEntry 会触发实际的 close()(同步或异步)cognee/infrastructure/databases/utils/closing_lru_cache.py:120-130

图 2:ClosingLRUCache 生命周期

Cognee · 缓存生命周期 · 图 2
Cognee · 缓存生命周期 · 图 2

来源:cognee/infrastructure/databases/utils/closing_lru_cache.py:101-150cognee/infrastructure/databases/utils/closing_lru_cache.py:200-210

4. 并发与文件锁管理

Cognee 提供了机制来确保多个子进程(例如,独立的 cognify 任务)在访问相同的基于文件的数据库时不会发生冲突。

Kùzu 共享锁

Kùzu(Ladybug)需要对数据库文件进行独占访问。Cognee 利用基于 Redis 的共享锁来序列化访问,当多个子进程尝试读取或写入同一个 Kùzu 实例时 cognee/tests/test_concurrent_subprocess_access.py:12-19

子进程稳定性特性
  • 看门狗:轮询看门狗或 pdeathsig(在 Linux 上)确保如果父进程意外死亡,工作器进程会终止,从而防止容器化环境中出现"僵尸"工作器 cognee_db_workers/harness.py:193-204
  • 内存监控:系统可以跟踪工作器的常驻集大小(RSS),以检测原生库中的内存膨胀 cognee/infrastructure/databases/graph/kuzu/subprocess/proxy.py:70-73
  • 信号提示:当工作器死亡时,框架会将退出代码解码为人类可读的提示(例如,SIGKILL 对应"可能是 OOM 杀死"),以简化调试 cognee_db_workers/harness.py:162-170
并发访问模式

系统针对以下场景进行了测试:

  1. 写入器子进程和读取器子进程同时访问数据库 cognee/tests/test_concurrent_subprocess_access.py:33-37
  2. 多个 Cognify 任务在不同数据集上并行运行 cognee/tests/test_concurrent_subprocess_access.py:61-68

来源:cognee/tests/test_concurrent_subprocess_access.py:22-45cognee_db_workers/harness.py:725-740