批量操作与优化
批量操作与优化
相关源文件
本章引用的主要源码文件:
graphiti_core/graphiti.pygraphiti_core/utils/bulk_utils.pytests/utils/maintenance/test_bulk_utils.py
本文档记录了批量剧集入库系统:包括 RawEpisode 输入类型、add_episode_bulk 方法、graphiti_core/utils/bulk_utils.py 的内部实现、批处理与顺序单剧集处理的区别,以及相关的性能权衡。
关于单剧集处理的文档,请参见剧集处理工作流。关于两个流程共用的节点和边提取内部机制,请参见节点提取与解析和边提取与解析。
概述
Graphiti 的主要入库路径是逐条处理剧集:每次调用 add_episode 都是完全顺序执行的,在持久化之前,它会根据实时图谱解析实体和边。对于大规模入库(例如加载转录文本、文档语料库或历史数据集),这种方式会很慢,因为每个剧集都需要多次往返调用大语言模型(LLM)和图数据库。
add_episode_bulk 通过在一个剧集窗口内批量执行提取和解析工作、在内存中进行跨批次去重,并将所有结果以每个块(chunk)一个事务的方式写入图谱,来解决这个问题。
关键约束:批处理以牺牲跨剧集时间失效的完整性来换取吞吐量。边的失效和去重在批次内按剧集进行,但在解析过程中,同一批次内的剧集无法看到彼此新添加的边。
来源:graphiti_core/graphiti.py:123-130,graphiti_core/utils/bulk_utils.py:1-66
RawEpisode 输入类型
RawEpisode 是一个轻量级的 Pydantic 模型,定义在 bulk_utils.py 中,它表示一个尚未转换为图节点的剧集。它是 add_episode_bulk 的输入元素。
| 字段 | 类型 | 描述 |
|---|---|---|
name | str | 人类可读的剧集名称 |
uuid | str | None | 可选的稳定 UUID;如果省略则自动生成 |
content | str | 原始剧集文本或结构化内容 |
source_description | str | 描述内容来源 |
source | EpisodeType | message、text 或 json |
reference_time | datetime | 所描述事件的发生时间(valid_at) |
来源:graphiti_core/utils/bulk_utils.py:101-107
add_episode_bulk 接口
add_episode_bulk 是 Graphiti 类的一个方法。其返回类型是 AddBulkEpisodeResults,它聚合了批次中所有剧集的结果。
AddBulkEpisodeResults 字段:
| 字段 | 类型 |
|---|---|
episodes | list[EpisodicNode] |
episodic_edges | list[EpisodicEdge] |
nodes | list[EntityNode] |
edges | list[EntityEdge] |
communities | list[CommunityNode] |
community_edges | list[CommunityEdge] |
add_episode_bulk 接受的关键参数:
| 参数 | 类型 | 说明 |
|---|---|---|
episodes | list[RawEpisode] | 输入剧集 |
group_id | str | None | 图谱分区标识符 |
entity_types | dict[str, type[BaseModel]] | None | 自定义实体类型 |
excluded_entity_types | list[str] | None | 要跳过的实体类型 |
edge_types | dict[str, type[BaseModel]] | None | 自定义边类型 |
edge_type_map | dict[tuple[str, str], list[str]] | None | 每对节点允许的边类型 |
custom_extraction_instructions | str | None | 给大语言模型(LLM)提取的额外指令 |
saga | str | SagaNode | None | 可选的 Saga 关联 |
完整的剧集列表会按 CHUNK_SIZE = 10 的块大小进行处理,该常量定义在 bulk_utils.py 中。
来源:graphiti_core/graphiti.py:123-130,graphiti_core/utils/bulk_utils.py:66
批量处理管线
图表:“add_episode_bulk” 高级管线
来源:graphiti_core/graphiti.py:591-732,graphiti_core/utils/bulk_utils.py:110-293
bulk_utils.py 内部机制
retrieve_previous_episodes_bulk
graphiti_core/utils/bulk_utils.py:110-125
使用 semaphore_gather 并发地为批次中的每个剧集调用 retrieve_episodes。返回一个 (EpisodicNode, list[EpisodicNode]) 元组列表,每个元组将一个剧集与其前序剧集配对,以提供上下文。上下文窗口由 EPISODE_WINDOW_LEN = 3 定义。
retrieve_previous_episodes_bulk(driver, episodes)
-> list[tuple[EpisodicNode, list[EpisodicNode]]]
来源:graphiti_core/utils/bulk_utils.py:110-125,graphiti_core/utils/maintenance/graph_data_operations.py:29
extract_nodes_and_edges_bulk
graphiti_core/utils/bulk_utils.py:254-293
运行两轮 semaphore_gather:
- 对所有
(episode, previous_episodes)元组并行调用extract_nodes。 - 使用第一步中提取的对应节点作为节点上下文,并行调用
extract_edges。
返回 (list[list[EntityNode]], list[list[EntityEdge]]) —— 每个剧集对应一个内部列表。
来源:graphiti_core/utils/bulk_utils.py:254-293
dedupe_nodes_bulk
graphiti_core/utils/bulk_utils.py:296-408
这是批量管线中最复杂的函数。它执行两轮去重策略:
第一轮 —— 图谱协调(并行): 为每个剧集并发调用 resolve_extracted_nodes。这与单剧集流程相同,使用嵌入向量相似度和 LLM 解析将提取的节点与现有图谱进行匹配。
第二轮 —— 跨批次内存去重: 遍历所有剧集中所有已解析的节点,构建一个 canonical_nodes 池。对于每个新节点:
- 使用
_normalize_string_exact检查精确字符串匹配。 - 如果没有精确匹配,则通过
_build_candidate_indexes使用基于 MinHash 的 Shingle 相似度,对所有当前规范节点运行_resolve_with_similarity。 - 记录重复对。
最后,使用 _build_directed_uuid_map 将所有每个剧集的 UUID 映射和跨批次重复对合并到一个 compressed_map 中。
图表:“dedupe_nodes_bulk” 两轮策略
来源:graphiti_core/utils/bulk_utils.py:296-408,graphiti_core/utils/maintenance/dedup_helpers.py:47-50
dedupe_edges_bulk
graphiti_core/utils/bulk_utils.py:411-503
使用内存相似度而非图谱查找来去重跨所有剧集提取的边。算法如下:
- 为所有边生成嵌入向量:使用
create_entity_edge_embeddings(按剧集组并行)。 - 查找候选边:对于每条边,从所有剧集中收集具有相同
source_node_uuid和target_node_uuid的边,然后通过单词重叠或余弦相似度(≥0.6)进行过滤。 - 解析:为每个
(episode, edge, candidates)三元组并行调用resolve_extracted_edge。 - 压缩:收集重复对,并使用
UnionFind通过compress_uuid_map进行处理,该函数为每个 UUID 组分配字典序最小的规范 UUID。
返回 edges_by_episode: dict[str, list[EntityEdge]]。
来源:graphiti_core/utils/bulk_utils.py:411-503
add_nodes_and_edges_bulk
graphiti_core/utils/bulk_utils.py:128-251
通过 driver.session().execute_write(add_nodes_and_edges_bulk_tx, ...) 在单个写入事务中将所有节点和边写入图谱。
事务函数 add_nodes_and_edges_bulk_tx 序列化所有数据,并运行特定于提供商的批量查询:
| 提供商 | 策略 |
|---|---|
| Neo4j / FalkorDB | 通过 get_entity_node_save_bulk_query 和 get_entity_edge_save_bulk_query 使用基于 UNWIND 的批量 MERGE 查询 |
| Kuzu | 单行插入(Kuzu 的 UNWIND 不支持 STRUCT[]) |
实现了 graph_operations_interface 的驱动程序 | 委托给接口方法 |
嵌入向量的生成在事务内部处理:没有 name_embedding 的节点会即时生成;没有 fact_embedding 的边也同样处理。
来源:graphiti_core/utils/bulk_utils.py:128-251,graphiti_core/models/nodes/node_db_queries.py:40-41,graphiti_core/models/edges/edge_db_queries.py:35-36
辅助工具
resolve_edge_pointers graphiti_core/utils/bulk_utils.py:549-556
使用 UUID 映射(由节点去重产生)更新边上的 source_node_uuid 和 target_node_uuid。这确保了在去重后,边能正确指向规范节点。
_build_directed_uuid_map graphiti_core/utils/bulk_utils.py:69-98
带迭代路径压缩的有向并查集。折叠别名 → 规范链,使得 alias_a → alias_b → canonical 能正确地将 alias_a 解析为 canonical。
compress_uuid_map graphiti_core/utils/bulk_utils.py:528-543
用于边去重。使用 UnionFind,并选择每个连通分量中字典序最小的 UUID 作为规范 UUID。
来源:graphiti_core/utils/bulk_utils.py:549-556,graphiti_core/utils/bulk_utils.py:69-98,graphiti_core/utils/bulk_utils.py:528-543
_extract_and_dedupe_nodes_bulk 和 _resolve_nodes_and_edges_bulk
这两个 Graphiti 的私有方法编排了批量管线,并从 add_episode_bulk 中调用。
_extract_and_dedupe_nodes_bulk graphiti_core/graphiti.py:591-621
封装了 extract_nodes_and_edges_bulk 和 dedupe_nodes_bulk。返回:
nodes_by_episode: dict[str, list[EntityNode]]uuid_map: dict[str, str]extracted_edges_bulk: list[list[EntityEdge]]
_resolve_nodes_and_edges_bulk graphiti_core/graphiti.py:623-732
- 为所有已解析节点构建
nodes_by_uuid查找表。 - 使用压缩后的 UUID 映射对每个剧集的节点进行去重。
- 为所有剧集并行调用
resolve_extracted_nodes。 - 为所有剧集并行调用
extract_attributes_from_nodes(填充/摘要)。 - 对每个剧集的边应用
resolve_edge_pointers。 - 为所有剧集并行调用
resolve_extracted_edges。
返回 (final_hydrated_nodes, resolved_edges, invalidated_edges, uuid_map)。
来源:graphiti_core/graphiti.py:591-621,graphiti_core/graphiti.py:623-732
单剧集处理与批处理对比
图表:代码实体映射 —— 单剧集 vs. 批量
| 方面 | add_episode | add_episode_bulk |
|---|---|---|
| 剧集上下文 | 来自图谱的前 N 个剧集 | 每个剧集的前 N 个剧集(并行查询) |
| 节点去重 | 仅针对图谱 | 针对图谱 + 跨批次内存去重(2 轮) |
| 边去重 | 通过嵌入向量 + BM25 搜索针对图谱 | 内存中单词重叠 + 余弦相似度(≥ 0.6) |
| 属性提取 | 边解析之后 | 节点解析之后,边解析之前 |
| 图谱写入 | 每个剧集一个事务 | 每个块一个事务(10 个剧集) |
| 时间失效 | 完整,具有跨剧集感知能力 | 每个剧集,批次内无跨可见性 |
| 块大小 | 不适用 | CHUNK_SIZE = 10 |
来源:graphiti_core/graphiti.py:788-1100,graphiti_core/graphiti.py:591-732,graphiti_core/utils/bulk_utils.py:66
性能考量
通过 semaphore_gather 实现并行
所有按剧集执行的操作(提取、节点解析、属性提取、边解析)都使用 semaphore_gather 并带有可配置的并发限制。这允许同时进行更多的大语言模型(LLM)和嵌入向量 API 调用,但代价是更高的速率限制压力。
来源:graphiti_core/utils/bulk_utils.py:113-120,graphiti_core/utils/bulk_utils.py:260-272
块大小
剧集按 10 个一组(CHUNK_SIZE)进行处理。每个块产生一个写入事务。更大的块可以提高内存去重质量(找到更多跨批次匹配),但会增加内存使用量和批次内冲突的风险。
来源:graphiti_core/utils/bulk_utils.py:66
内存边去重准确性
dedupe_edges_bulk 使用 0.6 的余弦相似度阈值和单词重叠作为候选过滤器,这比单剧集处理中 resolve_extracted_edges 使用的完整 BM25 + 嵌入向量搜索要粗略。这意味着一些跨越多个批处理块的近似重复边可能不会被捕获,直到它们稍后针对图谱进行解析。
来源:graphiti_core/utils/bulk_utils.py:411-503
事务效率
add_nodes_and_edges_bulk_tx 对 Neo4j 和 FalkorDB 使用基于 UNWIND 的批量 MERGE,这比为每个节点/边发出单独的 Cypher 写入要快得多。对于 Kuzu,由于驱动程序在 UNWIND 中不支持 STRUCT[] 的限制,因此使用单行插入。
来源:graphiti_core/utils/bulk_utils.py:128-251,graphiti_core/models/nodes/node_db_queries.py:40-41
嵌入向量生成
在 add_nodes_and_edges_bulk_tx 中,如果嵌入向量尚不存在,则会在事务函数内部惰性生成。对于大型批次,如果嵌入器吞吐量较低,这可能会成为瓶颈。
来源:graphiti_core/utils/bulk_utils.py:168-169,graphiti_core/utils/bulk_utils.py:193-194