数据处理管线
数据处理管线
相关源文件
以下文件为本维基页面的生成提供了上下文:
graphiti_core/graphiti.pygraphiti_core/utils/maintenance/edge_operations.pygraphiti_core/utils/maintenance/node_operations.pytests/utils/maintenance/test_edge_operations.pytests/utils/maintenance/test_node_operations.py
本文档记录了将剧集(Episode)通过提取、解析和持久化阶段进行多阶段处理的管线。有关高层级系统架构的上下文,请参见系统架构。有关提取过程中使用的具体大语言模型(LLM)提示词详情,请参见提示词库与模板。
概述
数据处理管线通过三个主要阶段,将原始剧集内容(消息、文本或 JSON)转换为结构化的知识图谱:
- 节点提取与解析:提取实体,使用三层策略与现有节点进行去重,并为其补充属性和摘要。
- 边提取与解析:提取实体之间的关系,处理时间信息,并通过时间失效机制处理重复和矛盾。
- 持久化:以优化批次将节点、边和剧集元数据保存到图数据库。
该管线有两种实现变体:
- 单剧集处理:通过
add_episode()graphiti_core/graphiti.py:788-1024实现。顺序处理,检索完整上下文以确保高准确性。 - 批量剧集处理:通过
add_episode_bulk()graphiti_core/graphiti.py:1026-1198实现。采用优化的两遍去重策略,实现高吞吐量入库。
剧集处理工作流
单剧集流程
add_episode() 方法编排了完整的管线:
阶段分解
| 阶段 | 操作 | 函数 | 大语言模型(LLM)调用次数 |
|---|---|---|---|
| 上下文检索 | 获取之前的剧集 | retrieve_episodes() graphiti_core/utils/maintenance/graph_data_operations.py:67-168 | 0 |
| 节点提取 | 从内容中提取实体 | extract_nodes() graphiti_core/utils/maintenance/node_operations.py:69-148 | 1 |
| 节点解析 | 与图谱进行去重 | resolve_extracted_nodes() graphiti_core/utils/maintenance/node_operations.py:31 | 0-N(仅针对未解析的节点) |
| 边提取 | 提取关系 | extract_edges() graphiti_core/utils/maintenance/edge_operations.py:116-248 | 1 |
| 边解析 | 去重并检测矛盾 | resolve_extracted_edges() graphiti_core/utils/maintenance/edge_operations.py:251-314 | 每条边 1 次 |
| 属性提取 | 提取实体属性 | extract_attributes_from_nodes() graphiti_core/utils/maintenance/node_operations.py:30 | 每种实体类型 1 次 |
| 摘要生成 | 生成实体摘要 | _extract_entity_summaries_batch() graphiti_core/utils/maintenance/node_operations.py:28 | 每批次 1 次(最多 30 个实体) |
| 持久化 | 保存到数据库 | add_nodes_and_edges_bulk() graphiti_core/utils/bulk_utils.py:128-149 | 0 |
来源:graphiti_core/graphiti.py:788-1024、graphiti_core/utils/maintenance/graph_data_operations.py:67-168、graphiti_core/utils/maintenance/node_operations.py:69-148、graphiti_core/utils/maintenance/edge_operations.py:116-248
上下文检索
在处理新剧集之前,系统会检索相对于 reference_time 最近的 last_n 个剧集(默认为 3 个)。此上下文用于消除代词指代歧义,并为实体提取提供连续性 graphiti_core/utils/maintenance/graph_data_operations.py:29-30。
previous_episodes = await self.retrieve_episodes(
reference_time,
last_n=EPISODE_WINDOW_LEN, # 3
group_ids=[group_id],
)
来源:graphiti_core/graphiti.py:895-904、graphiti_core/utils/maintenance/graph_data_operations.py:98、graphiti_core/utils/maintenance/graph_data_operations.py:67-168
节点提取与解析
节点提取通过特定格式的提取、分类和去重,将剧集内容转换为 EntityNode 对象。
详情请参见节点提取与解析。
特定格式提取
extract_nodes() 函数根据 EpisodeType 路由到不同的提示词模板 graphiti_core/utils/maintenance/node_operations.py:69-148:
- 消息格式:使用对话上下文消除代词指代歧义,并将发言者提取为主要实体。
- JSON 格式:从层级属性中提取实体,排除日期。
- 文本格式:从散文中进行通用实体提取。
来源:graphiti_core/utils/maintenance/node_operations.py:69-148、graphiti_core/nodes.py:54
三层去重策略
节点解析使用级联策略,以最大限度地减少昂贵的大语言模型(LLM)调用:
- 第一层:精确字符串匹配:规范化小写名称比较
graphiti_core/utils/maintenance/dedup_helpers.py:49。 - 第二层:模糊相似度:使用 3 字符 n-gram 分片、MinHash 签名和局部敏感哈希(LSH)分带
graphiti_core/utils/maintenance/dedup_helpers.py:10-25。 - 第三层:大语言模型(LLM)推理:对于未解析的节点,系统会将提取的实体与候选实体进行对比,并附带完整上下文
graphiti_core/utils/maintenance/node_operations.py:29。
来源:graphiti_core/utils/maintenance/dedup_helpers.py:10-52、graphiti_core/utils/maintenance/node_operations.py:104
边提取与解析
边提取将实体关系转换为带有时间语义和矛盾检测功能的 EntityEdge 对象。
详情请参见边提取与解析。
关系提取
extract_edges() 函数调用大语言模型(LLM),并传入实体上下文和 reference_time graphiti_core/utils/maintenance/edge_operations.py:116-248:
# 传递给大语言模型(LLM)用于边提取的上下文
context = {
'episode_content': concatenate_episodes(episodes),
'nodes': [{'name': node.name, 'entity_types': node.labels} for node in nodes],
'reference_time': latest_episode.valid_at.isoformat(),
}
提取过程会指示大语言模型(LLM)以 SCREAMING_SNAKE_CASE 格式提取事实,并使用 reference_time 解析相对时间提及。
来源:graphiti_core/utils/maintenance/edge_operations.py:116-248
边解析与矛盾处理
边解析同时处理重复检测和时间矛盾 graphiti_core/utils/maintenance/edge_operations.py:251-314:
- 快速路径:在提取的批次内对精确匹配进行去重
graphiti_core/utils/maintenance/edge_operations.py:317-360。 - 大语言模型(LLM)解析:从图谱中已有的事实中识别重复和矛盾
graphiti_core/utils/maintenance/edge_operations.py:363-448。 - 时间失效:如果新事实与现有事实矛盾,系统会更新旧边的
invalid_at和expired_at字段graphiti_core/utils/maintenance/edge_operations.py:438-444。
来源:graphiti_core/utils/maintenance/edge_operations.py:251-448
批量处理与优化
add_episode_bulk() 方法使用两遍去重策略处理多个剧集,以处理批次内的重复项 graphiti_core/graphiti.py:1026-1198。
详情请参见批量操作与优化。
两遍节点去重
- 第一遍:独立地将每个剧集的节点与图谱进行解析
graphiti_core/graphiti.py:1108-1114。 - 第二遍:使用内存相似度将已解析的节点相互比较,以查找批次内的重复项
graphiti_core/graphiti.py:1116-1120。 - UUID 映射压缩:使用带路径压缩的有向并查集(Union-Find)来折叠传递别名链
graphiti_core/utils/bulk_utils.py:69-98。
来源:graphiti_core/graphiti.py:1108-1120、graphiti_core/utils/bulk_utils.py:69-98
批量持久化
持久化通过 add_nodes_and_edges_bulk() 处理。此函数使用优化的数据库事务和批量保存查询,以最小化往返次数 graphiti_core/utils/bulk_utils.py:128-149。
来源:graphiti_core/utils/bulk_utils.py:128-149、graphiti_core/utils/bulk_utils.py:35-42