agentic_huge_data_base / wiki
页面 Graphiti · 10.6 并发与速率限制管理·DeepWiki 中文全文译文

10.6 · 并发与速率限制管理

时序知识图谱与动态事实记忆 · 聚焦本章的模块关系、源码依据与实现要点。

项目Graphiti 章节10.6 状态全文译文 模块模型调用与提供方适配、图谱与关系、存储与持久化、检索、召回与索引
源码线索
  • graphiti_core/driver/driver.py
  • graphiti_core/driver/falkordb_driver.py
  • graphiti_core/driver/neo4j_driver.py
  • graphiti_core/graphiti.py
  • graphiti_core/helpers.py
  • graphiti_core/utils/bulk_utils.py
模块标签
  • 模型调用与提供方适配
  • 图谱与关系
  • 存储与持久化
  • 检索、召回与索引
  • 界面与交互

章节正文

并发与速率限制管理

并发与速率限制管理

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • graphiti_core/driver/driver.py
  • graphiti_core/driver/falkordb_driver.py
  • graphiti_core/driver/neo4j_driver.py
  • graphiti_core/graphiti.py
  • graphiti_core/helpers.py
  • graphiti_core/utils/bulk_utils.py

本文档说明了 Graphiti 如何管理并发操作和大语言模型(LLM)提供商的速率限制。该系统采用基于信号量的并发控制机制,在吞吐量与 API 配额约束之间取得平衡,在防止 429 速率限制错误的同时最大化入库性能。

目的与范围

Graphiti 的入库管线本质上是并行的:每个片段都需要多次调用大语言模型(LLM)来完成实体提取、去重、属性提取和摘要生成。如果没有并发控制,即使处理少量片段批次也可能超出大语言模型(LLM)提供商的速率限制。本文档涵盖以下内容:

  • SEMAPHORE_LIMIT 环境变量及其在代码库中的使用方式。
  • semaphore_gather 辅助函数及其在限制异步操作中的作用。
  • 批量工具中并发处理的实现细节。
  • 提供商特定的速率限制错误及错误处理。

信号量控制系统

Graphiti 实现了集中式并发控制机制,以防止资源耗尽和速率限制。

配置

并发限制主要由环境变量驱动,并在辅助模块中进行解析。

变量默认值用途
SEMAPHORE_LIMIT20全局并发异步任务限制。
USE_PARALLEL_RUNTIMEFalse并行执行模式的开关。

SEMAPHORE_LIMITgraphiti_core/helpers.py graphiti_core/helpers.py:38-38 中初始化。它也可以在 Graphiti 实例级别通过构造函数中的 max_coroutines 参数进行覆盖 graphiti_core/graphiti.py:181-182

semaphore_gather 工具函数

Graphiti 中所有高并发操作都通过 semaphore_gather graphiti_core/helpers.py:123-133 进行汇聚。该工具函数封装了 asyncio.gather,并使用以配置限制或提供的覆盖值初始化的 asyncio.Semaphore graphiti_core/helpers.py:127-133

代码实体映射:信号量实现

Graphiti · semaphore_gather 工具函数 · 图 1
Graphiti · semaphore_gather 工具函数 · 图 1

来源:graphiti_core/helpers.py:37-38, graphiti_core/helpers.py:123-133, graphiti_core/graphiti.py:181-182

数据操作中的并发

批量片段检索

在处理片段批次时,Graphiti 使用 retrieve_previous_episodes_bulk graphiti_core/utils/bulk_utils.py:110-125 同时检索多个节点的上下文。该函数使用 semaphore_gather 来限制片段窗口的并发数据库查询数量 graphiti_core/utils/bulk_utils.py:113-120

持久化与嵌入向量

在批量持久化过程中,Graphiti 会为实体和边生成嵌入向量。这些操作通常在事务函数内的循环中执行。例如,add_nodes_and_edges_bulk_tx 会遍历实体节点和边,使用 EmbedderClient 生成嵌入向量 graphiti_core/utils/bulk_utils.py:167-169, graphiti_core/utils/bulk_utils.py:193-194

数据流:批量并发编排

Graphiti · 持久化与嵌入向量 · 图 2
Graphiti · 持久化与嵌入向量 · 图 2

来源:graphiti_core/utils/bulk_utils.py:113-125, graphiti_core/utils/maintenance/graph_data_operations.py:67-74

大语言模型(LLM)提供商速率限制

错误处理

Graphiti 定义了一个特定的 RateLimitError 来处理提供商侧的限流 graphiti_core/llm_client/errors.py:18-23。大语言模型(LLM)实现(例如 AnthropicClient)被设计为捕获提供商特定的异常(如 anthropic.RateLimitError)并抛出 Graphiti 内部的 RateLimitError tests/llm_client/test_anthropic_client.py:159-176

提供商调优指南

基于常见的速率限制,建议采用以下 SEMAPHORE_LIMIT 设置:

提供商推荐 SEMAPHORE_LIMIT理由
OpenAI(Tier 3+)20(默认值)高 RPM 限制允许大量并行化。
Anthropic10-15由于标准层级 RPM 较低,采取保守策略。
Groq5-10推理速度快但 RPM 限制严格。
本地大语言模型(LLM)1-3受本地硬件显存和计算能力限制。

来源:graphiti_core/helpers.py:38-38, graphiti_core/llm_client/errors.py:18-23

实现细节

限制协程

semaphore_gather 函数确保活动协程数量不超过限制。它创建一个包装器,在执行目标协程之前获取信号量。

# graphiti_core/helpers.py:123-133
async def semaphore_gather(
    *coroutines: Coroutine,
    max_coroutines: int | None = None,
) -> list[Any]:
    semaphore = asyncio.Semaphore(max_coroutines or SEMAPHORE_LIMIT)

    async def _wrap_coroutine(coroutine):
        async with semaphore:
            return await coroutine

    return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
数据库驱动程序使用

Neo4jDriver 这样的驱动程序在初始化期间使用 semaphore_gather 来跨多个标签构建索引和约束,而不会压垮数据库连接池 graphiti_core/driver/neo4j_driver.py:91-101。类似地,FalkorDriver 也使用 semaphore_gather 来构建索引和约束 graphiti_core/driver/falkordb_driver.py:200-201

来源:graphiti_core/helpers.py:123-133, graphiti_core/driver/neo4j_driver.py:91-101, graphiti_core/driver/falkordb_driver.py:200-201

内容片段切分与密度

除了请求并发之外,Graphiti 还通过基于密度的片段切分来管理实体提取的"内部"并发。如果某个片段的内容过于密集(由 CHUNK_DENSITY_THRESHOLD 决定),则会被拆分为多个片段,以避免大语言模型(LLM)提取失败 graphiti_core/helpers.py:45-55

设置项描述
CHUNK_TOKEN_SIZE3000高密度内容片段的目标大小。
CHUNK_MIN_TOKENS1000考虑进行片段切分前所需的最小 Token 数。
CHUNK_DENSITY_THRESHOLD0.15每 1000 个 Token 的元素数量阈值,超过此值将触发片段切分。

来源:graphiti_core/helpers.py:41-55