agentic_huge_data_base / wiki
页面 Graphiti · 5.1 Episode 处理工作流·DeepWiki 中文全文译文

5.1 · Episode 处理工作流(Episode Processing Workflow)

时序知识图谱与动态事实记忆 · 聚焦本章的模块关系、源码依据与实现要点。

项目Graphiti 章节5.1 状态全文译文 模块图谱与关系、测试、发布与运维、工作流与编排、记忆与上下文
源码线索
  • graphiti_core/edges.py
  • graphiti_core/graphiti.py
  • graphiti_core/nodes.py
  • graphiti_core/prompts/extract_nodes_and_edges.py
  • graphiti_core/prompts/lib.py
  • graphiti_core/utils/maintenance/combined_extraction.py
  • graphiti_core/utils/maintenance/community_operations.py
  • graphiti_core/utils/maintenance/graph_data_operations.py
  • graphiti_core/utils/text_utils.py
  • tests/utils/test_concatenate_episodes.py
模块标签
  • 图谱与关系
  • 测试、发布与运维
  • 工作流与编排
  • 记忆与上下文
  • 界面与交互

章节正文

Episode 处理工作流

片段处理工作流

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • graphiti_core/edges.py
  • graphiti_core/graphiti.py
  • graphiti_core/nodes.py
  • graphiti_core/prompts/extract_nodes_and_edges.py
  • graphiti_core/prompts/lib.py
  • graphiti_core/utils/maintenance/combined_extraction.py
  • graphiti_core/utils/maintenance/community_operations.py
  • graphiti_core/utils/maintenance/graph_data_operations.py
  • graphiti_core/utils/text_utils.py
  • tests/utils/test_concatenate_episodes.py

片段处理工作流是将新信息添加到知识图谱的主要入库管线。它会把非结构化的片段内容(消息、文本文档或 JSON)转换为由实体节点、关系边和时间元数据组成的结构化图谱表示。

目的与范围

该工作流由 Graphiti.add_episode() 方法编排 graphiti_core/graphiti.py:510-610,涉及以下步骤:

  • 检索相关历史上下文。
  • 提取并解析实体节点。
  • 提取并解析关系边。
  • 使用时间元数据持久化所有数据。
  • 可选地更新社区结构。

工作流总览

下图展示了从 add_episode 方法调用开始,经过所有处理阶段,最终完成持久化的完整端到端流程。

图表:端到端片段处理流程

Graphiti · 工作流总览 · 图 1
Graphiti · 工作流总览 · 图 1

来源:graphiti_core/graphiti.py:510-610graphiti_core/utils/maintenance/combined_extraction.py:41-50graphiti_core/utils/maintenance/node_operations.py:101-105graphiti_core/utils/maintenance/edge_operations.py:91-96

入口点与配置

add_episode 方法是片段处理的主要入口点,定义在 Graphiti 类中 graphiti_core/graphiti.py:510-534

参数分类
类别参数描述
片段元数据nameepisode_bodysource_descriptionreference_timesource核心片段信息和内容。graphiti_core/graphiti.py:511-516
图谱分区group_iduuid数据库命名空间和可选的特定片段 UUID。graphiti_core/graphiti.py:517-518
实体配置entity_typesexcluded_entity_types自定义实体类型模式和提取排除项。graphiti_core/graphiti.py:520-521
边配置edge_typesedge_type_map自定义关系模式和类型间的允许签名。graphiti_core/graphiti.py:523-524
上下文控制previous_episode_uuidscustom_extraction_instructions覆盖默认上下文检索或提供特定大语言模型(LLM)指导。graphiti_core/graphiti.py:522-525
叙事管理sagasaga_previous_episode_uuid通过 SagaNode 将片段链接到叙事序列中。graphiti_core/graphiti.py:526-527

来源:graphiti_core/graphiti.py:510-550

阶段 1:上下文检索

在处理之前,系统会检索最近的历史片段,为实体提取和消歧提供上下文。

图表:上下文检索流程

Graphiti · 阶段 1:上下文检索 · 图 2
Graphiti · 阶段 1:上下文检索 · 图 2

来源:graphiti_core/graphiti.py:560-580graphiti_core/utils/maintenance/graph_data_operations.py:67-74

上下文检索逻辑

系统使用 EPISODE_WINDOW_LEN(默认值为 3)来确定在未指定时检索多少个之前的片段 graphiti_core/utils/maintenance/graph_data_operations.py:29-29

# 来自 graphiti.py:560-569
previous_episodes = (
    await retrieve_episodes(
        self.driver,
        reference_time,
        last_n=EPISODE_WINDOW_LEN,
        group_ids=[group_id],
        source=source,
    )
    if previous_episode_uuids is None
    else await EpisodicNode.get_by_uuids(self.driver, previous_episode_uuids)
)

来源:graphiti_core/graphiti.py:560-569graphiti_core/utils/maintenance/graph_data_operations.py:67-74

阶段 2 和 3:组合提取

Graphiti 采用组合提取策略,在单次大语言模型(LLM)调用中同时提取节点和边 graphiti_core/utils/maintenance/combined_extraction.py:51-56。这通过让模型同时看到实体与事实之间的关系,确保了更高的提取质量。

组合提取逻辑

extract_nodes_and_edges 函数将当前片段与之前的片段拼接起来作为上下文 graphiti_core/utils/maintenance/combined_extraction.py:114-121。它使用了 extract_nodes_and_edges.extract_message 提示 graphiti_core/prompts/extract_nodes_and_edges.py:91-159

特性描述
实体提取识别说话者、命名实体和所有格实体(例如,"James 的笔记本")graphiti_core/prompts/extract_nodes_and_edges.py:101-113
事实提取提取关系事实,包含 source_entity_nametarget_entity_namefact 描述 graphiti_core/prompts/extract_nodes_and_edges.py:35-59
自引用事实当没有第二个实体适合时,捕获例程或状态(例如,"Sam 觉得自己缺乏动力")graphiti_core/prompts/extract_nodes_and_edges.py:124-129

来源:graphiti_core/utils/maintenance/combined_extraction.py:41-135graphiti_core/prompts/extract_nodes_and_edges.py:25-66

阶段 4:解析与去重

提取完成后,必须将实体和关系与现有数据进行解析,以防止图谱膨胀和矛盾。

节点解析

系统调用 resolve_extracted_nodes graphiti_core/utils/maintenance/node_operations.py:104-104,该函数执行以下操作:

  1. 精确匹配:对名称进行归一化,合并明显的重复项 graphiti_core/utils/maintenance/combined_extraction.py:174-176
  2. 模糊/大语言模型(LLM)解析:(详见 5.2 节)解析可能具有不同命名约定的相似实体。
边解析

resolve_extracted_edges 函数 graphiti_core/utils/maintenance/edge_operations.py:95-95 会识别新事实是否与现有事实矛盾。如果发现矛盾,较旧的边会被标记上 invalid_at 时间戳。

阶段 5:持久化与叙事管理

批量持久化

add_nodes_and_edges_bulk() 函数在单个数据库事务中持久化 EpisodicNodeEntityNodeEntityEdge graphiti_core/utils/bulk_utils.py:78-78

# 来自 graphiti.py:603-610
await add_nodes_and_edges_bulk(
    self.driver,
    [episode],
    episodic_edges,
    nodes,
    edges,
    self.embedder,
)

来源:graphiti_core/graphiti.py:603-610

叙事管理

如果提供了 saga 名称,Graphiti 会将片段链接到时间顺序链中:

  1. SagaNode:表示叙事线索 graphiti_core/nodes.py:57-57
  2. HasEpisodeEdge:将 SagaNode 链接到当前的 EpisodicNode graphiti_core/edges.py:36-37
  3. NextEpisodeEdge:将该叙事中的前一个片段链接到新片段 graphiti_core/edges.py:38-39

来源:graphiti_core/graphiti.py:632-660graphiti_core/edges.py:32-43

返回结果与遥测

工作流最终返回一个 AddEpisodeResults 对象 graphiti_core/graphiti.py:114-121,并捕获遥测事件 graphiti_core/graphiti.py:615-620

结果结构

graphiti_core/graphiti.py:114-121

class AddEpisodeResults(BaseModel):
    episode: EpisodicNode
    episodic_edges: list[EpisodicEdge]
    nodes: list[EntityNode]
    edges: list[EntityEdge]
    communities: list[CommunityNode]
    community_edges: list[CommunityEdge]
遥测

使用 capture_event() 捕获事件,以跟踪实体和边计数等处理元数据 graphiti_core/graphiti.py:74

来源:graphiti_core/graphiti.py:615-620graphiti_core/telemetry.py:74-74