agentic_huge_data_base / wiki
页面 Graphiti · 11.3 播客处理示例·DeepWiki 中文全文译文

11.3 · 播客处理示例(Podcast Processing Example)

时序知识图谱与动态事实记忆 · 聚焦本章的模块关系、源码依据与实现要点。

项目Graphiti 章节11.3 状态全文译文 模块图谱与关系、入库与解析、测试、发布与运维、检索、召回与索引
源码线索
  • examples/podcast/podcast_runner.py
  • examples/podcast/podcast_transcript.txt
  • examples/podcast/transcript_parser.py
  • graphiti_core/search/search.py
  • graphiti_core/search/search_config.py
  • graphiti_core/search/search_config_recipes.py
  • graphiti_core/search/search_utils.py
  • tests/test_graphiti_int.py
模块标签
  • 图谱与关系
  • 入库与解析
  • 测试、发布与运维
  • 检索、召回与索引
  • 文档对象与元数据

章节正文

播客处理示例

播客处理示例

相关源文件

本章引用的主要源码文件:

  • examples/podcast/podcast_runner.py
  • examples/podcast/podcast_transcript.txt
  • examples/podcast/transcript_parser.py
  • graphiti_core/search/search.py
  • graphiti_core/search/search_config.py
  • graphiti_core/search/search_config_recipes.py
  • graphiti_core/search/search_utils.py
  • tests/test_graphiti_int.py

本文档将逐步讲解位于 examples/podcast/podcast_runner.py 的播客处理示例,该示例演示了如何使用 Graphiti 从播客转录数据构建知识图谱。它具体涵盖了自定义实体和边类型定义、边类型映射,以及顺序入库和批量入库两种模式。

关于 add_episodeadd_episode_bulk API 的通用说明,请参见 4.1。关于自定义实体和边类型功能的背景知识,请参见 10.3。关于通用批量入库机制,请参见 5.4

示例涵盖的内容

该播客示例将一份 Freakonomics Radio 转录文本(examples/podcast/podcast_transcript.txt)导入到 Graphiti 知识图谱中。它演示了以下内容:

  • 定义自定义实体类型PersonCity)为带有结构化属性的 Pydantic 模型。
  • 定义自定义边类型IsPresidentOfInterpersonalRelationshipLocatedIn)以约束关系语义。
  • 使用边类型映射来限制哪些边类型可以出现在哪些节点类型对之间。
  • 顺序模式add_episode)或批量模式add_episode_bulk)运行入库。
  • 将所有剧集关联到一个命名的传奇故事
  • 在入库后报告Token 使用量
  • 对生成的图谱执行混合搜索

文件结构

文件作用
examples/podcast/podcast_runner.py主运行程序:解析转录文本、配置类型、运行入库并执行搜索。
examples/podcast/podcast_transcript.txt原始播客转录文本,包含说话者 ID 和时间戳。
examples/podcast/transcript_parser.py将转录文件解析为结构化的 ParsedMessage 对象。

自定义实体类型

自定义实体类型是带有描述实体类别文档字符串的 Pydantic 模型。大语言模型(LLM)会同时使用类名和文档字符串来对提取的实体进行分类。

examples/podcast/podcast_runner.py:61-73

文档字符串额外属性
Person"一个人类个体,虚构或非虚构。"first_namelast_nameoccupation
City"一个城市"country

这些类通过 entity_types 参数以 dict[str, type[BaseModel]] 的形式传递给 add_episode examples/podcast/podcast_runner.py:150-161add_episode_bulk examples/podcast/podcast_runner.py:135-142

由大语言模型(LLM)提取并归类为 PersonCity 的实体将填充其结构化属性。任何不匹配自定义类型的实体将回退到内置的 Entity 类型。

来源:examples/podcast/podcast_runner.py:61-73examples/podcast/podcast_runner.py:156

自定义边类型

自定义边类型是描述关系语义的 Pydantic 模型。没有字段的模型(如 IsPresidentOf)仅约束关系命名而不添加结构化数据;有字段的模型则会向边添加类型化属性。

examples/podcast/podcast_runner.py:75-85

文档字符串
IsPresidentOf"一个人与其担任主席的实体之间的关系"
InterpersonalRelationship"两个人之间的关系(例如,认识、共事、采访)"
LocatedIn"表示某物位于或关联于某个地点的关系"

这些作为 edge_types 传递:

edge_types = {
    'IS_PRESIDENT_OF': IsPresidentOf,
    'INTERPERSONAL_RELATIONSHIP': InterpersonalRelationship,
    'LOCATED_IN': LocatedIn,
}

来源:examples/podcast/podcast_runner.py:75-85examples/podcast/podcast_runner.py:118-122

边类型映射

edge_type_map 控制哪些边类型在特定的节点类型对之间是有效的。它是一个 dict[tuple[str, str], list[str]],其中键是 (source_label, target_label),值是被允许的边类型名称列表。

examples/podcast/podcast_runner.py:127-132

edge_type_map = {
    ('Person', 'Entity'): ['IS_PRESIDENT_OF', 'INTERPERSONAL_RELATIONSHIP'],
    ('Person', 'Person'): ['INTERPERSONAL_RELATIONSHIP'],
    ('Person', 'City'):   ['LOCATED_IN'],
    ('Entity', 'City'):   ['LOCATED_IN'],
}

此示例在 (Person, Entity)(Person, Person) 之间复用了 INTERPERSONAL_RELATIONSHIP,并在 (Person, City)(Entity, City) 之间复用了 LOCATED_INGraphiti 的提取逻辑会构建一个反向映射,以保留每个边类型名称的所有有效节点对签名。

边类型映射——节点对示意图:

Graphiti · 边类型映射 · 图 1
Graphiti · 边类型映射 · 图 1

来源:examples/podcast/podcast_runner.py:127-132

转录文本解析

转录文件(examples/podcast/podcast_transcript.txt)以说话者编号的轮次和时间戳存储播客对话:

0 (3s):
So let's talk a little bit about what you see as the purpose of college...

1 (23s):
Well, part of the ethos of Jesuit institutions...

说话者 ID 映射到参与者:0 是主持人(Stephen Dubner),1 是嘉宾(Tania Tetlow,福特汉姆大学校长)。parse_podcast_messages() 函数 examples/podcast/transcript_parser.py:106-124 读取此文件并返回一个 ParsedMessage 对象列表。

属性类型描述
speaker_namestr说话者姓名
rolestr角色标签(主持人、嘉宾等)
contentstr此轮次的原始转录文本
actual_timestampdatetime根据开始时间和相对偏移量计算的绝对时间戳

来源:examples/podcast/podcast_transcript.txt:1-17examples/podcast/transcript_parser.py:14-20examples/podcast/transcript_parser.py:106-124

剧集构建

每个解析后的消息成为一个 RawEpisode(用于批量模式),或者直接传递给 add_episode(用于顺序模式)。

使用的 RawEpisode 字段:

examples/podcast/podcast_runner.py:105-115

字段示例中的值
namef'Message {i}'
contentf'{message.speaker_name} ({message.role}): {message.content}'
reference_timemessage.actual_timestamp
sourceEpisodeType.message
source_description'Podcast Transcript'

来源:examples/podcast/podcast_runner.py:105-115

入库模式

main() 函数 examples/podcast/podcast_runner.py:87 根据 use_bulk 的值在两种入库路径之间进行选择。

顺序入库与批量入库对比:

Graphiti · 入库模式 · 图 2
Graphiti · 入库模式 · 图 2

顺序模式 examples/podcast/podcast_runner.py:144-161

  • 使用 retrieve_episodes 获取上下文(最近 3 个剧集)。
  • 迭代调用 add_episode
  • 显式管理 previous_episode_uuids 以实现时间链式连接。

批量模式 examples/podcast/podcast_runner.py:135-142

  • RawEpisode 列表传递给 add_episode_bulk
  • Graphiti 处理并发提取和批量解析。

来源:examples/podcast/podcast_runner.py:135-161

传奇故事关联

所有剧集都关联到一个名为 'Freakonomics Podcast' 的传奇故事:

saga='Freakonomics Podcast'

这会创建一个 SagaNode,作为剧集链的容器。有关 HasEpisodeEdgeNextEpisodeEdge 的详细信息,请参见 10.5

来源:examples/podcast/podcast_runner.py:141examples/podcast/podcast_runner.py:160

搜索与检索

数据填充后,该示例使用两种方法执行搜索查询:

  1. 默认搜索:调用 client.search examples/podcast/podcast_runner.py:176,默认使用基于边的检索。
  2. 混合节点搜索:调用 client.search_ examples/podcast/podcast_runner.py:182,使用 NODE_HYBRID_SEARCH_RRF graphiti_core/search/search_config_recipes.py:156-161
node_results = await client.search_(
    query,
    group_ids=[group_id],
    config=NODE_HYBRID_SEARCH_RRF.model_copy(update={'limit': 5}),
)

搜索数据流:

Graphiti · 搜索与检索 · 图 3
Graphiti · 搜索与检索 · 图 3

来源:examples/podcast/podcast_runner.py:168-191graphiti_core/search/search.py:98-108graphiti_core/search/search_config_recipes.py:156-161graphiti_core/search/search_utils.py:230-260graphiti_core/search/search_utils.py:300-330graphiti_core/search/search_utils.py:600-609graphiti_core/search/search_config.py:121-129

Token 使用量报告

该示例按提示名称排序打印 Token 消耗摘要:

examples/podcast/podcast_runner.py:164-165

client.token_tracker.print_summary(sort_by='prompt_name')

来源:examples/podcast/podcast_runner.py:164-165

运行示例

该脚本默认使用嵌入式 AsyncFalkorDB,无需安装外部数据库。

examples/podcast/podcast_runner.py:94-97

falkor_db_path = os.path.join(tempfile.gettempdir(), 'podcast_runner_falkordb.db')
falkor_db = AsyncFalkorDB(dbfilename=falkor_db_path)
falkor_driver = FalkorDriver(falkor_db=falkor_db)

运行步骤:

  1. 确保已设置 OPENAI_API_KEY 环境变量。
  2. 运行 python examples/podcast/podcast_runner.py

来源:examples/podcast/podcast_runner.py:94-97examples/podcast/podcast_runner.py:193