播客处理示例
播客处理示例
相关源文件
本章引用的主要源码文件:
examples/podcast/podcast_runner.pyexamples/podcast/podcast_transcript.txtexamples/podcast/transcript_parser.pygraphiti_core/search/search.pygraphiti_core/search/search_config.pygraphiti_core/search/search_config_recipes.pygraphiti_core/search/search_utils.pytests/test_graphiti_int.py
本文档将逐步讲解位于 examples/podcast/podcast_runner.py 的播客处理示例,该示例演示了如何使用 Graphiti 从播客转录数据构建知识图谱。它具体涵盖了自定义实体和边类型定义、边类型映射,以及顺序入库和批量入库两种模式。
关于 add_episode 和 add_episode_bulk API 的通用说明,请参见 4.1。关于自定义实体和边类型功能的背景知识,请参见 10.3。关于通用批量入库机制,请参见 5.4。
示例涵盖的内容
该播客示例将一份 Freakonomics Radio 转录文本(examples/podcast/podcast_transcript.txt)导入到 Graphiti 知识图谱中。它演示了以下内容:
- 定义自定义实体类型(
Person、City)为带有结构化属性的 Pydantic 模型。 - 定义自定义边类型(
IsPresidentOf、InterpersonalRelationship、LocatedIn)以约束关系语义。 - 使用边类型映射来限制哪些边类型可以出现在哪些节点类型对之间。
- 以顺序模式(
add_episode)或批量模式(add_episode_bulk)运行入库。 - 将所有剧集关联到一个命名的传奇故事。
- 在入库后报告Token 使用量。
- 对生成的图谱执行混合搜索。
文件结构
| 文件 | 作用 |
|---|---|
examples/podcast/podcast_runner.py | 主运行程序:解析转录文本、配置类型、运行入库并执行搜索。 |
examples/podcast/podcast_transcript.txt | 原始播客转录文本,包含说话者 ID 和时间戳。 |
examples/podcast/transcript_parser.py | 将转录文件解析为结构化的 ParsedMessage 对象。 |
自定义实体类型
自定义实体类型是带有描述实体类别文档字符串的 Pydantic 模型。大语言模型(LLM)会同时使用类名和文档字符串来对提取的实体进行分类。
examples/podcast/podcast_runner.py:61-73
| 类 | 文档字符串 | 额外属性 |
|---|---|---|
Person | "一个人类个体,虚构或非虚构。" | first_name、last_name、occupation |
City | "一个城市" | country |
这些类通过 entity_types 参数以 dict[str, type[BaseModel]] 的形式传递给 add_episode examples/podcast/podcast_runner.py:150-161 或 add_episode_bulk examples/podcast/podcast_runner.py:135-142。
由大语言模型(LLM)提取并归类为 Person 或 City 的实体将填充其结构化属性。任何不匹配自定义类型的实体将回退到内置的 Entity 类型。
来源:examples/podcast/podcast_runner.py:61-73、examples/podcast/podcast_runner.py:156
自定义边类型
自定义边类型是描述关系语义的 Pydantic 模型。没有字段的模型(如 IsPresidentOf)仅约束关系命名而不添加结构化数据;有字段的模型则会向边添加类型化属性。
examples/podcast/podcast_runner.py:75-85
| 类 | 文档字符串 |
|---|---|
IsPresidentOf | "一个人与其担任主席的实体之间的关系" |
InterpersonalRelationship | "两个人之间的关系(例如,认识、共事、采访)" |
LocatedIn | "表示某物位于或关联于某个地点的关系" |
这些作为 edge_types 传递:
edge_types = {
'IS_PRESIDENT_OF': IsPresidentOf,
'INTERPERSONAL_RELATIONSHIP': InterpersonalRelationship,
'LOCATED_IN': LocatedIn,
}
来源:examples/podcast/podcast_runner.py:75-85、examples/podcast/podcast_runner.py:118-122
边类型映射
edge_type_map 控制哪些边类型在特定的节点类型对之间是有效的。它是一个 dict[tuple[str, str], list[str]],其中键是 (source_label, target_label),值是被允许的边类型名称列表。
examples/podcast/podcast_runner.py:127-132
edge_type_map = {
('Person', 'Entity'): ['IS_PRESIDENT_OF', 'INTERPERSONAL_RELATIONSHIP'],
('Person', 'Person'): ['INTERPERSONAL_RELATIONSHIP'],
('Person', 'City'): ['LOCATED_IN'],
('Entity', 'City'): ['LOCATED_IN'],
}
此示例在 (Person, Entity) 和 (Person, Person) 之间复用了 INTERPERSONAL_RELATIONSHIP,并在 (Person, City) 和 (Entity, City) 之间复用了 LOCATED_IN。Graphiti 的提取逻辑会构建一个反向映射,以保留每个边类型名称的所有有效节点对签名。
边类型映射——节点对示意图:
来源:examples/podcast/podcast_runner.py:127-132
转录文本解析
转录文件(examples/podcast/podcast_transcript.txt)以说话者编号的轮次和时间戳存储播客对话:
0 (3s):
So let's talk a little bit about what you see as the purpose of college...
1 (23s):
Well, part of the ethos of Jesuit institutions...
说话者 ID 映射到参与者:0 是主持人(Stephen Dubner),1 是嘉宾(Tania Tetlow,福特汉姆大学校长)。parse_podcast_messages() 函数 examples/podcast/transcript_parser.py:106-124 读取此文件并返回一个 ParsedMessage 对象列表。
| 属性 | 类型 | 描述 |
|---|---|---|
speaker_name | str | 说话者姓名 |
role | str | 角色标签(主持人、嘉宾等) |
content | str | 此轮次的原始转录文本 |
actual_timestamp | datetime | 根据开始时间和相对偏移量计算的绝对时间戳 |
来源:examples/podcast/podcast_transcript.txt:1-17、examples/podcast/transcript_parser.py:14-20、examples/podcast/transcript_parser.py:106-124
剧集构建
每个解析后的消息成为一个 RawEpisode(用于批量模式),或者直接传递给 add_episode(用于顺序模式)。
使用的 RawEpisode 字段:
examples/podcast/podcast_runner.py:105-115
| 字段 | 示例中的值 |
|---|---|
name | f'Message {i}' |
content | f'{message.speaker_name} ({message.role}): {message.content}' |
reference_time | message.actual_timestamp |
source | EpisodeType.message |
source_description | 'Podcast Transcript' |
来源:examples/podcast/podcast_runner.py:105-115
入库模式
main() 函数 examples/podcast/podcast_runner.py:87 根据 use_bulk 的值在两种入库路径之间进行选择。
顺序入库与批量入库对比:
顺序模式 examples/podcast/podcast_runner.py:144-161:
- 使用
retrieve_episodes获取上下文(最近 3 个剧集)。 - 迭代调用
add_episode。 - 显式管理
previous_episode_uuids以实现时间链式连接。
批量模式 examples/podcast/podcast_runner.py:135-142:
- 将
RawEpisode列表传递给add_episode_bulk。 - Graphiti 处理并发提取和批量解析。
来源:examples/podcast/podcast_runner.py:135-161
传奇故事关联
所有剧集都关联到一个名为 'Freakonomics Podcast' 的传奇故事:
saga='Freakonomics Podcast'
这会创建一个 SagaNode,作为剧集链的容器。有关 HasEpisodeEdge 和 NextEpisodeEdge 的详细信息,请参见 10.5。
来源:examples/podcast/podcast_runner.py:141、examples/podcast/podcast_runner.py:160
搜索与检索
数据填充后,该示例使用两种方法执行搜索查询:
- 默认搜索:调用
client.searchexamples/podcast/podcast_runner.py:176,默认使用基于边的检索。 - 混合节点搜索:调用
client.search_examples/podcast/podcast_runner.py:182,使用NODE_HYBRID_SEARCH_RRFgraphiti_core/search/search_config_recipes.py:156-161。
node_results = await client.search_(
query,
group_ids=[group_id],
config=NODE_HYBRID_SEARCH_RRF.model_copy(update={'limit': 5}),
)
搜索数据流:
来源:examples/podcast/podcast_runner.py:168-191、graphiti_core/search/search.py:98-108、graphiti_core/search/search_config_recipes.py:156-161、graphiti_core/search/search_utils.py:230-260、graphiti_core/search/search_utils.py:300-330、graphiti_core/search/search_utils.py:600-609、graphiti_core/search/search_config.py:121-129
Token 使用量报告
该示例按提示名称排序打印 Token 消耗摘要:
examples/podcast/podcast_runner.py:164-165
client.token_tracker.print_summary(sort_by='prompt_name')
来源:examples/podcast/podcast_runner.py:164-165
运行示例
该脚本默认使用嵌入式 AsyncFalkorDB,无需安装外部数据库。
examples/podcast/podcast_runner.py:94-97
falkor_db_path = os.path.join(tempfile.gettempdir(), 'podcast_runner_falkordb.db')
falkor_db = AsyncFalkorDB(dbfilename=falkor_db_path)
falkor_driver = FalkorDriver(falkor_db=falkor_db)
运行步骤:
- 确保已设置
OPENAI_API_KEY环境变量。 - 运行
python examples/podcast/podcast_runner.py。
来源:examples/podcast/podcast_runner.py:94-97、examples/podcast/podcast_runner.py:193