agentic_huge_data_base / wiki
页面 Graphiti · 5 数据处理管线·DeepWiki 中文全文译文

5 · 数据处理管线(Data Processing Pipeline)

时序知识图谱与动态事实记忆 · 聚焦本章的模块关系、源码依据与实现要点。

项目Graphiti 章节5 状态全文译文 模块图谱与关系、测试、发布与运维、工作流与编排、记忆与上下文
源码线索
  • graphiti_core/graphiti.py
  • graphiti_core/utils/maintenance/edge_operations.py
  • graphiti_core/utils/maintenance/node_operations.py
  • tests/utils/maintenance/test_edge_operations.py
  • tests/utils/maintenance/test_node_operations.py
模块标签
  • 图谱与关系
  • 测试、发布与运维
  • 工作流与编排
  • 记忆与上下文
  • 入库与解析

章节正文

数据处理管线

数据处理管线

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • graphiti_core/graphiti.py
  • graphiti_core/utils/maintenance/edge_operations.py
  • graphiti_core/utils/maintenance/node_operations.py
  • tests/utils/maintenance/test_edge_operations.py
  • tests/utils/maintenance/test_node_operations.py

本文档记录了将剧集(Episode)通过提取、解析和持久化阶段进行多阶段处理的管线。有关高层级系统架构的上下文,请参见系统架构。有关提取过程中使用的具体大语言模型(LLM)提示词详情,请参见提示词库与模板

概述

数据处理管线通过三个主要阶段,将原始剧集内容(消息、文本或 JSON)转换为结构化的知识图谱:

  1. 节点提取与解析:提取实体,使用三层策略与现有节点进行去重,并为其补充属性和摘要。
  2. 边提取与解析:提取实体之间的关系,处理时间信息,并通过时间失效机制处理重复和矛盾。
  3. 持久化:以优化批次将节点、边和剧集元数据保存到图数据库。

该管线有两种实现变体:

  • 单剧集处理:通过 add_episode() graphiti_core/graphiti.py:788-1024 实现。顺序处理,检索完整上下文以确保高准确性。
  • 批量剧集处理:通过 add_episode_bulk() graphiti_core/graphiti.py:1026-1198 实现。采用优化的两遍去重策略,实现高吞吐量入库。

剧集处理工作流

单剧集流程

add_episode() 方法编排了完整的管线:

Graphiti · 单剧集流程 · 图 1
Graphiti · 单剧集流程 · 图 1

阶段分解

阶段操作函数大语言模型(LLM)调用次数
上下文检索获取之前的剧集retrieve_episodes() graphiti_core/utils/maintenance/graph_data_operations.py:67-1680
节点提取从内容中提取实体extract_nodes() graphiti_core/utils/maintenance/node_operations.py:69-1481
节点解析与图谱进行去重resolve_extracted_nodes() graphiti_core/utils/maintenance/node_operations.py:310-N(仅针对未解析的节点)
边提取提取关系extract_edges() graphiti_core/utils/maintenance/edge_operations.py:116-2481
边解析去重并检测矛盾resolve_extracted_edges() graphiti_core/utils/maintenance/edge_operations.py:251-314每条边 1 次
属性提取提取实体属性extract_attributes_from_nodes() graphiti_core/utils/maintenance/node_operations.py:30每种实体类型 1 次
摘要生成生成实体摘要_extract_entity_summaries_batch() graphiti_core/utils/maintenance/node_operations.py:28每批次 1 次(最多 30 个实体)
持久化保存到数据库add_nodes_and_edges_bulk() graphiti_core/utils/bulk_utils.py:128-1490

来源:graphiti_core/graphiti.py:788-1024graphiti_core/utils/maintenance/graph_data_operations.py:67-168graphiti_core/utils/maintenance/node_operations.py:69-148graphiti_core/utils/maintenance/edge_operations.py:116-248

上下文检索

在处理新剧集之前,系统会检索相对于 reference_time 最近的 last_n 个剧集(默认为 3 个)。此上下文用于消除代词指代歧义,并为实体提取提供连续性 graphiti_core/utils/maintenance/graph_data_operations.py:29-30

previous_episodes = await self.retrieve_episodes(
    reference_time,
    last_n=EPISODE_WINDOW_LEN,  # 3
    group_ids=[group_id],
)

来源:graphiti_core/graphiti.py:895-904graphiti_core/utils/maintenance/graph_data_operations.py:98graphiti_core/utils/maintenance/graph_data_operations.py:67-168

节点提取与解析

节点提取通过特定格式的提取、分类和去重,将剧集内容转换为 EntityNode 对象。

详情请参见节点提取与解析

特定格式提取

extract_nodes() 函数根据 EpisodeType 路由到不同的提示词模板 graphiti_core/utils/maintenance/node_operations.py:69-148

Graphiti · 特定格式提取 · 图 2
Graphiti · 特定格式提取 · 图 2
  • 消息格式:使用对话上下文消除代词指代歧义,并将发言者提取为主要实体。
  • JSON 格式:从层级属性中提取实体,排除日期。
  • 文本格式:从散文中进行通用实体提取。

来源:graphiti_core/utils/maintenance/node_operations.py:69-148graphiti_core/nodes.py:54

三层去重策略

节点解析使用级联策略,以最大限度地减少昂贵的大语言模型(LLM)调用:

  1. 第一层:精确字符串匹配:规范化小写名称比较 graphiti_core/utils/maintenance/dedup_helpers.py:49
  2. 第二层:模糊相似度:使用 3 字符 n-gram 分片、MinHash 签名和局部敏感哈希(LSH)分带 graphiti_core/utils/maintenance/dedup_helpers.py:10-25
  3. 第三层:大语言模型(LLM)推理:对于未解析的节点,系统会将提取的实体与候选实体进行对比,并附带完整上下文 graphiti_core/utils/maintenance/node_operations.py:29

来源:graphiti_core/utils/maintenance/dedup_helpers.py:10-52graphiti_core/utils/maintenance/node_operations.py:104

边提取与解析

边提取将实体关系转换为带有时间语义和矛盾检测功能的 EntityEdge 对象。

详情请参见边提取与解析

关系提取

extract_edges() 函数调用大语言模型(LLM),并传入实体上下文和 reference_time graphiti_core/utils/maintenance/edge_operations.py:116-248

# 传递给大语言模型(LLM)用于边提取的上下文
context = {
    'episode_content': concatenate_episodes(episodes),
    'nodes': [{'name': node.name, 'entity_types': node.labels} for node in nodes],
    'reference_time': latest_episode.valid_at.isoformat(),
}

提取过程会指示大语言模型(LLM)以 SCREAMING_SNAKE_CASE 格式提取事实,并使用 reference_time 解析相对时间提及。

来源:graphiti_core/utils/maintenance/edge_operations.py:116-248

边解析与矛盾处理

边解析同时处理重复检测和时间矛盾 graphiti_core/utils/maintenance/edge_operations.py:251-314

  1. 快速路径:在提取的批次内对精确匹配进行去重 graphiti_core/utils/maintenance/edge_operations.py:317-360
  2. 大语言模型(LLM)解析:从图谱中已有的事实中识别重复和矛盾 graphiti_core/utils/maintenance/edge_operations.py:363-448
  3. 时间失效:如果新事实与现有事实矛盾,系统会更新旧边的 invalid_atexpired_at 字段 graphiti_core/utils/maintenance/edge_operations.py:438-444

来源:graphiti_core/utils/maintenance/edge_operations.py:251-448

批量处理与优化

add_episode_bulk() 方法使用两遍去重策略处理多个剧集,以处理批次内的重复项 graphiti_core/graphiti.py:1026-1198

详情请参见批量操作与优化

两遍节点去重
Graphiti · 两遍节点去重 · 图 3
Graphiti · 两遍节点去重 · 图 3
  • 第一遍:独立地将每个剧集的节点与图谱进行解析 graphiti_core/graphiti.py:1108-1114
  • 第二遍:使用内存相似度将已解析的节点相互比较,以查找批次内的重复项 graphiti_core/graphiti.py:1116-1120
  • UUID 映射压缩:使用带路径压缩的有向并查集(Union-Find)来折叠传递别名链 graphiti_core/utils/bulk_utils.py:69-98

来源:graphiti_core/graphiti.py:1108-1120graphiti_core/utils/bulk_utils.py:69-98

批量持久化

持久化通过 add_nodes_and_edges_bulk() 处理。此函数使用优化的数据库事务和批量保存查询,以最小化往返次数 graphiti_core/utils/bulk_utils.py:128-149

来源:graphiti_core/utils/bulk_utils.py:128-149graphiti_core/utils/bulk_utils.py:35-42