Episode 处理工作流
片段处理工作流
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
graphiti_core/edges.pygraphiti_core/graphiti.pygraphiti_core/nodes.pygraphiti_core/prompts/extract_nodes_and_edges.pygraphiti_core/prompts/lib.pygraphiti_core/utils/maintenance/combined_extraction.pygraphiti_core/utils/maintenance/community_operations.pygraphiti_core/utils/maintenance/graph_data_operations.pygraphiti_core/utils/text_utils.pytests/utils/test_concatenate_episodes.py
片段处理工作流是将新信息添加到知识图谱的主要入库管线。它会把非结构化的片段内容(消息、文本文档或 JSON)转换为由实体节点、关系边和时间元数据组成的结构化图谱表示。
目的与范围
该工作流由 Graphiti.add_episode() 方法编排 graphiti_core/graphiti.py:510-610,涉及以下步骤:
- 检索相关历史上下文。
- 提取并解析实体节点。
- 提取并解析关系边。
- 使用时间元数据持久化所有数据。
- 可选地更新社区结构。
工作流总览
下图展示了从 add_episode 方法调用开始,经过所有处理阶段,最终完成持久化的完整端到端流程。
图表:端到端片段处理流程
来源:graphiti_core/graphiti.py:510-610、graphiti_core/utils/maintenance/combined_extraction.py:41-50、graphiti_core/utils/maintenance/node_operations.py:101-105、graphiti_core/utils/maintenance/edge_operations.py:91-96
入口点与配置
add_episode 方法是片段处理的主要入口点,定义在 Graphiti 类中 graphiti_core/graphiti.py:510-534。
参数分类
| 类别 | 参数 | 描述 |
|---|---|---|
| 片段元数据 | name、episode_body、source_description、reference_time、source | 核心片段信息和内容。graphiti_core/graphiti.py:511-516 |
| 图谱分区 | group_id、uuid | 数据库命名空间和可选的特定片段 UUID。graphiti_core/graphiti.py:517-518 |
| 实体配置 | entity_types、excluded_entity_types | 自定义实体类型模式和提取排除项。graphiti_core/graphiti.py:520-521 |
| 边配置 | edge_types、edge_type_map | 自定义关系模式和类型间的允许签名。graphiti_core/graphiti.py:523-524 |
| 上下文控制 | previous_episode_uuids、custom_extraction_instructions | 覆盖默认上下文检索或提供特定大语言模型(LLM)指导。graphiti_core/graphiti.py:522-525 |
| 叙事管理 | saga、saga_previous_episode_uuid | 通过 SagaNode 将片段链接到叙事序列中。graphiti_core/graphiti.py:526-527 |
来源:graphiti_core/graphiti.py:510-550
阶段 1:上下文检索
在处理之前,系统会检索最近的历史片段,为实体提取和消歧提供上下文。
图表:上下文检索流程
来源:graphiti_core/graphiti.py:560-580、graphiti_core/utils/maintenance/graph_data_operations.py:67-74
上下文检索逻辑
系统使用 EPISODE_WINDOW_LEN(默认值为 3)来确定在未指定时检索多少个之前的片段 graphiti_core/utils/maintenance/graph_data_operations.py:29-29。
# 来自 graphiti.py:560-569
previous_episodes = (
await retrieve_episodes(
self.driver,
reference_time,
last_n=EPISODE_WINDOW_LEN,
group_ids=[group_id],
source=source,
)
if previous_episode_uuids is None
else await EpisodicNode.get_by_uuids(self.driver, previous_episode_uuids)
)
来源:graphiti_core/graphiti.py:560-569、graphiti_core/utils/maintenance/graph_data_operations.py:67-74
阶段 2 和 3:组合提取
Graphiti 采用组合提取策略,在单次大语言模型(LLM)调用中同时提取节点和边 graphiti_core/utils/maintenance/combined_extraction.py:51-56。这通过让模型同时看到实体与事实之间的关系,确保了更高的提取质量。
组合提取逻辑
extract_nodes_and_edges 函数将当前片段与之前的片段拼接起来作为上下文 graphiti_core/utils/maintenance/combined_extraction.py:114-121。它使用了 extract_nodes_and_edges.extract_message 提示 graphiti_core/prompts/extract_nodes_and_edges.py:91-159。
| 特性 | 描述 |
|---|---|
| 实体提取 | 识别说话者、命名实体和所有格实体(例如,"James 的笔记本")graphiti_core/prompts/extract_nodes_and_edges.py:101-113。 |
| 事实提取 | 提取关系事实,包含 source_entity_name、target_entity_name 和 fact 描述 graphiti_core/prompts/extract_nodes_and_edges.py:35-59。 |
| 自引用事实 | 当没有第二个实体适合时,捕获例程或状态(例如,"Sam 觉得自己缺乏动力")graphiti_core/prompts/extract_nodes_and_edges.py:124-129。 |
来源:graphiti_core/utils/maintenance/combined_extraction.py:41-135、graphiti_core/prompts/extract_nodes_and_edges.py:25-66
阶段 4:解析与去重
提取完成后,必须将实体和关系与现有数据进行解析,以防止图谱膨胀和矛盾。
节点解析
系统调用 resolve_extracted_nodes graphiti_core/utils/maintenance/node_operations.py:104-104,该函数执行以下操作:
- 精确匹配:对名称进行归一化,合并明显的重复项
graphiti_core/utils/maintenance/combined_extraction.py:174-176。 - 模糊/大语言模型(LLM)解析:(详见 5.2 节)解析可能具有不同命名约定的相似实体。
边解析
resolve_extracted_edges 函数 graphiti_core/utils/maintenance/edge_operations.py:95-95 会识别新事实是否与现有事实矛盾。如果发现矛盾,较旧的边会被标记上 invalid_at 时间戳。
阶段 5:持久化与叙事管理
批量持久化
add_nodes_and_edges_bulk() 函数在单个数据库事务中持久化 EpisodicNode、EntityNode 和 EntityEdge graphiti_core/utils/bulk_utils.py:78-78。
# 来自 graphiti.py:603-610
await add_nodes_and_edges_bulk(
self.driver,
[episode],
episodic_edges,
nodes,
edges,
self.embedder,
)
来源:graphiti_core/graphiti.py:603-610
叙事管理
如果提供了 saga 名称,Graphiti 会将片段链接到时间顺序链中:
- SagaNode:表示叙事线索
graphiti_core/nodes.py:57-57。 - HasEpisodeEdge:将
SagaNode链接到当前的EpisodicNodegraphiti_core/edges.py:36-37。 - NextEpisodeEdge:将该叙事中的前一个片段链接到新片段
graphiti_core/edges.py:38-39。
来源:graphiti_core/graphiti.py:632-660、graphiti_core/edges.py:32-43
返回结果与遥测
工作流最终返回一个 AddEpisodeResults 对象 graphiti_core/graphiti.py:114-121,并捕获遥测事件 graphiti_core/graphiti.py:615-620。
结果结构
graphiti_core/graphiti.py:114-121
class AddEpisodeResults(BaseModel):
episode: EpisodicNode
episodic_edges: list[EpisodicEdge]
nodes: list[EntityNode]
edges: list[EntityEdge]
communities: list[CommunityNode]
community_edges: list[CommunityEdge]
遥测
使用 capture_event() 捕获事件,以跟踪实体和边计数等处理元数据 graphiti_core/graphiti.py:74。
来源:graphiti_core/graphiti.py:615-620、graphiti_core/telemetry.py:74-74