分布式模式
分布式模式
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
.devcontainer/devcontainer.json.github/workflows/distributed_test.yml.github/workflows/temporal_graph_tests.ymlcognee/infrastructure/utils/run_sync.pycognee/modules/data/methods/get_authorized_existing_datasets.pycognee/modules/data/methods/load_or_create_datasets.pycognee/modules/pipelines/layers/pipeline_execution_mode.pycognee/modules/pipelines/methods/__init__.pycognee/modules/pipelines/operations/run_tasks_distributed.pycognee/modules/users/permissions/methods/give_permission_on_dataset.pycognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.pycognee/tests/test_temporal_graph.pydeployment/helm/Dockerfiledeployment/setup_ubuntu_instance.shdistributed/Dockerfiledistributed/deploy/README.mddistributed/deploy/daytona.yamldistributed/deploy/daytona_sandbox.pydistributed/deploy/devcontainer.jsondistributed/deploy/fly-deploy.shdistributed/deploy/fly.tomldistributed/deploy/modal-deploy.shdistributed/deploy/modal_app.pydistributed/deploy/railway-template.jsondistributed/deploy/railway.tomldistributed/deploy/render.yamldistributed/entrypoint.pydistributed/tasks/queued_add_data_points.pydistributed/tasks/queued_add_edges.pydistributed/tasks/queued_add_nodes.pydistributed/workers/data_point_saving_worker.pydistributed/workers/graph_saving_worker.pytools/check-lockfile.py
Cognee 的分布式模式通过利用 Modal 实现无服务器执行,从而支持高性能、并行化的数据处理。该架构使 Cognee 能够跨多个容器扩展知识图谱构建和嵌入向量任务,突破了单机处理大数据集的限制。
架构总览
分布式执行模型将管线任务(提取、片段切分和大语言模型操作)的重负载处理与最终的数据库持久化分离。这是通过使用 Modal 的分布式队列,采用生产者-消费者模式实现的。
核心组件
run_tasks_distributed:分布式管线执行的入口点。它负责将数据项映射到远程 Modal 函数cognee/modules/pipelines/operations/run_tasks_distributed.py:84-92。run_tasks_on_modal:一个 Modal 装饰的函数,在远程容器内执行run_tasks_data_itemcognee/modules/pipelines/operations/run_tasks_distributed.py:40-57。- 分布式队列:Modal 支持的队列(
add_nodes_and_edges_queue、add_data_points_queue),用于将处理后的图谱元素和向量数据从工作节点传输到专门的保存工作节点distributed/entrypoint.py:10-12。 - 保存工作节点:专门的消费者,负责批量处理并将数据持久化到图谱数据库和向量数据库
distributed/workers/graph_saving_worker.py:51-53,distributed/workers/data_point_saving_worker.py:53-55。
分布式数据流
下图展示了数据集如何从本地入库流转到分布式处理,最终到达数据库持久化。
图表:分布式管线执行流程
来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:133-137,distributed/entrypoint.py:41-48,distributed/workers/graph_saving_worker.py:69-71。
任务编排
run_tasks_distributed
该函数启动分布式流程。它会解析数据目录,通过 log_pipeline_run_start 记录管线启动,并使用 run_tasks_on_modal.map.aio 将工作分发到各个数据项 cognee/modules/pipelines/operations/run_tasks_distributed.py:101-134。它会使用 PipelineRunStarted、PipelineRunCompleted 和 PipelineRunErrored 模型生成状态更新 cognee/modules/pipelines/operations/run_tasks_distributed.py:105-175。
run_tasks_on_modal
这是在 Modal 上运行的执行包装器。其配置如下:
- 重试次数:3 次
cognee/modules/pipelines/operations/run_tasks_distributed.py:41。 - 最大容器数:最多 50 个并发容器
cognee/modules/pipelines/operations/run_tasks_distributed.py:44。 - 超时时间:24 小时(86400 秒)
cognee/modules/pipelines/operations/run_tasks_distributed.py:43。
来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:40-57。
工作节点架构
Cognee 使用专用工作节点来处理数据持久化,以应对数据库特定的约束,如死锁和连接开销。
图谱保存工作节点(graph_saving_worker)
该工作节点从 add_nodes_and_edges_queue 消费数据。它会将传入的数据批量处理(默认 BATCH_SIZE = 25),然后调用 graph_engine.add_nodes 和 graph_engine.add_edges distributed/workers/graph_saving_worker.py:51-70。
- 并发性:限制为
max_containers=1,以防止在 Neo4j 或 Kuzu 等图谱数据库中出现写入冲突和死锁distributed/workers/graph_saving_worker.py:48。 - 死锁处理:针对
GraphDatabaseDeadlockError实现了带有指数退避的重试机制distributed/workers/graph_saving_worker.py:22-38。
数据点保存工作节点(data_point_saving_worker)
该工作节点通过 add_data_points_queue 处理向量数据和元数据的持久化。
- 并发性:可以扩展到最多
max_containers=10distributed/workers/data_point_saving_worker.py:50。 - 批量处理:将请求收集到
batched_points字典中,按集合名称分组,以优化vector_engine.create_data_points的调用distributed/workers/data_point_saving_worker.py:71-106。
来源:distributed/workers/graph_saving_worker.py:44-55,distributed/workers/data_point_saving_worker.py:46-57。
代码实体映射
下图将高层分布式概念与代码库中的具体类和函数联系起来。
图表:分布式模式代码实体
来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:33-47,distributed/entrypoint.py:9-12,cognee/modules/pipelines/layers/pipeline_execution_mode.py:48-51。
部署与配置
环境变量
要启用分布式模式,需要设置以下环境变量:
COGNEE_DISTRIBUTED:设置为Truedistributed/entrypoint.py:18。MODAL_SECRET_NAME:包含数据库凭据的 Modal 密钥名称(默认为distributed_cognee)cognee/modules/pipelines/operations/run_tasks_distributed.py:38。RUN_MODE:在分布式 Dockerfile 中设置为modaldistributed/Dockerfile:5。
Daytona 集成
Cognee 可以使用 Daytona 部署在隔离的云沙箱中。distributed/deploy/daytona_sandbox.py 中的 deploy_cognee 函数会自动创建基于 Debian 的沙箱,通过 pip install 'cognee[api]' 安装 Cognee,并启动 Uvicorn 服务器 distributed/deploy/daytona_sandbox.py:57-92。
多云部署选项
Cognee 为多个平台提供了一键部署配置:
| 平台 | 部署机制 | 配置文件 |
|---|---|---|
| Modal | 无服务器 ASGI 应用 | distributed/deploy/modal_app.py distributed/deploy/modal_app.py:20-22 |
| Railway | 原生 Postgres + API | distributed/deploy/railway.toml distributed/deploy/README.md:43-46 |
| Fly.io | 带卷的边缘虚拟机 | distributed/deploy/fly.toml distributed/deploy/README.md:68-71 |
| Render | 蓝图(API + 数据库) | distributed/deploy/render.yaml distributed/deploy/README.md:90-95 |
来源:distributed/deploy/daytona_sandbox.py:57-118,distributed/deploy/modal_app.py:1-18,distributed/deploy/README.md:1-15。