并发与速率限制管理
并发与速率限制管理
相关源文件
以下文件为本维基页面的生成提供了上下文:
graphiti_core/driver/driver.pygraphiti_core/driver/falkordb_driver.pygraphiti_core/driver/neo4j_driver.pygraphiti_core/graphiti.pygraphiti_core/helpers.pygraphiti_core/utils/bulk_utils.py
本文档说明了 Graphiti 如何管理并发操作和大语言模型(LLM)提供商的速率限制。该系统采用基于信号量的并发控制机制,在吞吐量与 API 配额约束之间取得平衡,在防止 429 速率限制错误的同时最大化入库性能。
目的与范围
Graphiti 的入库管线本质上是并行的:每个片段都需要多次调用大语言模型(LLM)来完成实体提取、去重、属性提取和摘要生成。如果没有并发控制,即使处理少量片段批次也可能超出大语言模型(LLM)提供商的速率限制。本文档涵盖以下内容:
SEMAPHORE_LIMIT环境变量及其在代码库中的使用方式。semaphore_gather辅助函数及其在限制异步操作中的作用。- 批量工具中并发处理的实现细节。
- 提供商特定的速率限制错误及错误处理。
信号量控制系统
Graphiti 实现了集中式并发控制机制,以防止资源耗尽和速率限制。
配置
并发限制主要由环境变量驱动,并在辅助模块中进行解析。
| 变量 | 默认值 | 用途 |
|---|---|---|
SEMAPHORE_LIMIT | 20 | 全局并发异步任务限制。 |
USE_PARALLEL_RUNTIME | False | 并行执行模式的开关。 |
SEMAPHORE_LIMIT 在 graphiti_core/helpers.py graphiti_core/helpers.py:38-38 中初始化。它也可以在 Graphiti 实例级别通过构造函数中的 max_coroutines 参数进行覆盖 graphiti_core/graphiti.py:181-182。
semaphore_gather 工具函数
Graphiti 中所有高并发操作都通过 semaphore_gather graphiti_core/helpers.py:123-133 进行汇聚。该工具函数封装了 asyncio.gather,并使用以配置限制或提供的覆盖值初始化的 asyncio.Semaphore graphiti_core/helpers.py:127-133。
代码实体映射:信号量实现
来源:graphiti_core/helpers.py:37-38, graphiti_core/helpers.py:123-133, graphiti_core/graphiti.py:181-182
数据操作中的并发
批量片段检索
在处理片段批次时,Graphiti 使用 retrieve_previous_episodes_bulk graphiti_core/utils/bulk_utils.py:110-125 同时检索多个节点的上下文。该函数使用 semaphore_gather 来限制片段窗口的并发数据库查询数量 graphiti_core/utils/bulk_utils.py:113-120。
持久化与嵌入向量
在批量持久化过程中,Graphiti 会为实体和边生成嵌入向量。这些操作通常在事务函数内的循环中执行。例如,add_nodes_and_edges_bulk_tx 会遍历实体节点和边,使用 EmbedderClient 生成嵌入向量 graphiti_core/utils/bulk_utils.py:167-169, graphiti_core/utils/bulk_utils.py:193-194。
数据流:批量并发编排
来源:graphiti_core/utils/bulk_utils.py:113-125, graphiti_core/utils/maintenance/graph_data_operations.py:67-74
大语言模型(LLM)提供商速率限制
错误处理
Graphiti 定义了一个特定的 RateLimitError 来处理提供商侧的限流 graphiti_core/llm_client/errors.py:18-23。大语言模型(LLM)实现(例如 AnthropicClient)被设计为捕获提供商特定的异常(如 anthropic.RateLimitError)并抛出 Graphiti 内部的 RateLimitError tests/llm_client/test_anthropic_client.py:159-176。
提供商调优指南
基于常见的速率限制,建议采用以下 SEMAPHORE_LIMIT 设置:
| 提供商 | 推荐 SEMAPHORE_LIMIT | 理由 |
|---|---|---|
| OpenAI(Tier 3+) | 20(默认值) | 高 RPM 限制允许大量并行化。 |
| Anthropic | 10-15 | 由于标准层级 RPM 较低,采取保守策略。 |
| Groq | 5-10 | 推理速度快但 RPM 限制严格。 |
| 本地大语言模型(LLM) | 1-3 | 受本地硬件显存和计算能力限制。 |
来源:graphiti_core/helpers.py:38-38, graphiti_core/llm_client/errors.py:18-23
实现细节
限制协程
semaphore_gather 函数确保活动协程数量不超过限制。它创建一个包装器,在执行目标协程之前获取信号量。
# graphiti_core/helpers.py:123-133
async def semaphore_gather(
*coroutines: Coroutine,
max_coroutines: int | None = None,
) -> list[Any]:
semaphore = asyncio.Semaphore(max_coroutines or SEMAPHORE_LIMIT)
async def _wrap_coroutine(coroutine):
async with semaphore:
return await coroutine
return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
数据库驱动程序使用
像 Neo4jDriver 这样的驱动程序在初始化期间使用 semaphore_gather 来跨多个标签构建索引和约束,而不会压垮数据库连接池 graphiti_core/driver/neo4j_driver.py:91-101。类似地,FalkorDriver 也使用 semaphore_gather 来构建索引和约束 graphiti_core/driver/falkordb_driver.py:200-201。
来源:graphiti_core/helpers.py:123-133, graphiti_core/driver/neo4j_driver.py:91-101, graphiti_core/driver/falkordb_driver.py:200-201
内容片段切分与密度
除了请求并发之外,Graphiti 还通过基于密度的片段切分来管理实体提取的"内部"并发。如果某个片段的内容过于密集(由 CHUNK_DENSITY_THRESHOLD 决定),则会被拆分为多个片段,以避免大语言模型(LLM)提取失败 graphiti_core/helpers.py:45-55。
| 设置项 | 值 | 描述 |
|---|---|---|
CHUNK_TOKEN_SIZE | 3000 | 高密度内容片段的目标大小。 |
CHUNK_MIN_TOKENS | 1000 | 考虑进行片段切分前所需的最小 Token 数。 |
CHUNK_DENSITY_THRESHOLD | 0.15 | 每 1000 个 Token 的元素数量阈值,超过此值将触发片段切分。 |
来源:graphiti_core/helpers.py:41-55