Graphiti 核心客户端
Graphiti 核心客户端
相关源文件
本章引用的主要源码文件:
graphiti_core/driver/falkordb/operations/graph_ops.pygraphiti_core/driver/neo4j/operations/graph_ops.pygraphiti_core/graphiti.pygraphiti_core/namespaces/edges.pygraphiti_core/namespaces/nodes.py
本文档记录了 Graphiti 类——所有 graphiti-core 操作的主要入口点——包括其构造函数参数、公开方法、结果类型以及它所暴露的 NodeNamespace/EdgeNamespace 访问器 API。
关于 Graphiti 在更广泛系统中的架构角色,请参见系统架构。关于可插拔提供者系统和 GraphitiClients 容器的文档,请参见多提供者插件架构。关于 search 所委托的搜索子系统,请参见搜索与检索系统。关于完整的事件入库管线,请参见数据处理管线。
概述
Graphiti(定义于 graphiti_core/graphiti.py:137-1051)是核心编排类。它持有所有提供者客户端的引用,暴露用于入库事件和搜索知识图谱的方法,并协调完整的提取-解析-持久化管线。
类关系图:
来源:graphiti_core/graphiti.py:137-246,graphiti_core/namespaces/nodes.py:29-234,graphiti_core/namespaces/edges.py:34-230
构造函数
graphiti_core/graphiti.py:138-246
Graphiti(
uri, user, password,
llm_client, embedder, cross_encoder,
store_raw_episode_content, graph_driver,
max_coroutines, tracer, trace_span_prefix
)
参数
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
uri | str | None | None | Neo4j 数据库的 URI。当未提供 graph_driver 时必填。 |
user | str | None | None | 数据库用户名。 |
password | str | None | None | 数据库密码。 |
llm_client | LLMClient | None | None | 大语言模型(LLM)提供者客户端。默认为 OpenAIClient()。 |
embedder | EmbedderClient | None | None | 嵌入向量提供者客户端。默认为 OpenAIEmbedder()。 |
cross_encoder | CrossEncoderClient | None | None | 重排序器客户端。默认为 OpenAIRerankerClient()。 |
store_raw_episode_content | bool | True | 如果为 False,则在持久化前清除事件内容以节省存储空间。 |
graph_driver | GraphDriver | None | None | 完全构造的驱动程序。会覆盖 uri/user/password。 |
max_coroutines | int | None | None | 并发限制;会覆盖 SEMAPHORE_LIMIT 环境变量。 |
tracer | Tracer | None | None | OpenTelemetry 追踪器。如果为 None,则追踪为空操作。 |
trace_span_prefix | str | 'graphiti' | 所有跨度名称的前缀。 |
默认提供者解析
当提供者参数为 None 时,构造函数会回退到该角色的默认提供者:
来源:graphiti_core/graphiti.py:203-238
关键属性
构造完成后,以下公开属性可用:
| 属性 | 类型 | 描述 |
|---|---|---|
driver | GraphDriver | 活跃的图数据库驱动程序。 |
llm_client | LLMClient | 用于提取和去重的大语言模型(LLM)客户端。 |
embedder | EmbedderClient | 用于节点/边嵌入向量生成的嵌入向量客户端。 |
cross_encoder | CrossEncoderClient | 搜索期间使用的重排序器。 |
tracer | Tracer | 追踪包装器;如果未配置则为空操作。 |
clients | GraphitiClients | 打包所有提供者客户端的便捷容器。 |
nodes | NodeNamespace | 用于低级节点操作的命名空间。 |
edges | EdgeNamespace | 用于低级边操作的命名空间。 |
store_raw_episode_content | bool | 是否持久化原始事件文本。 |
max_coroutines | int | None | semaphore_gather 的并发上限。 |
来源:graphiti_core/graphiti.py:203-246
GraphitiClients 容器
GraphitiClients(从 graphiti_core/graphiti_types.py 导入)是一个数据类,将所有提供者引用分组到一个对象中。子系统函数(例如 utils/maintenance/ 中的函数)接受 GraphitiClients 参数而不是单独的参数,从而避免了长参数列表。
self.clients 在构造时组装(graphiti_core/graphiti.py:232-238),并传递给内部管线函数,如 extract_nodes、resolve_extracted_nodes、extract_edges 和 search。
来源:graphiti_core/graphiti.py:232-238
NodeNamespace 和 EdgeNamespace
NodeNamespace 和 EdgeNamespace 从 graphiti_core/namespaces/ 导入,并分别实例化为 self.nodes 和 self.edges(graphiti_core/graphiti.py:241-242)。
它们提供了一个结构化的 API,用于对图中的单个节点和边执行直接的 CRUD 风格操作,而无需运行完整的事件入库管线。两者在构造时都接受 GraphDriver 和 EmbedderClient。
节点访问器
通过 graphiti.nodes 访问。
entity:EntityNodeNamespace(graphiti_core/namespaces/nodes.py:29-105)episode:EpisodeNodeNamespace(graphiti_core/namespaces/nodes.py:107-184)community:CommunityNodeNamespace(graphiti_core/namespaces/nodes.py:186-234)saga:SagaNodeNamespace(graphiti_core/namespaces/nodes.py:236-281)
边访问器
通过 graphiti.edges 访问。
entity:EntityEdgeNamespace(graphiti_core/namespaces/edges.py:34-111)episodic:EpisodicEdgeNamespace(graphiti_core/namespaces/edges.py:113-163)community:CommunityEdgeNamespace(graphiti_core/namespaces/edges.py:165-207)has_episode:HasEpisodeEdgeNamespace(graphiti_core/namespaces/edges.py:209-230)next_episode:NextEpisodeEdgeNamespace(graphiti_core/namespaces/edges.py:232-253)
来源:graphiti_core/graphiti.py:241-242,graphiti_core/namespaces/nodes.py:1-281,graphiti_core/namespaces/edges.py:1-253
token_tracker 属性
graphiti_core/graphiti.py:268-278
@property
def token_tracker(self):
return self.llm_client.token_tracker
返回大语言模型(LLM)客户端的 TokenUsageTracker,该追踪器会跟踪通过该实例进行的所有大语言模型(LLM)调用的累计 Token 使用量。追踪器上可用的方法包括 get_usage()、get_total_usage() 和 reset()。
来源:graphiti_core/graphiti.py:268-278
核心方法
build_indices_and_constraints
graphiti_core/graphiti.py:393-425
async def build_indices_and_constraints(delete_existing: bool = False)
委托给 self.driver.build_indices_and_constraints(delete_existing)。在首次使用前必须调用一次,以初始化数据库模式和索引。
close
graphiti_core/graphiti.py:311-341
async def close()
关闭底层驱动程序连接。当实例不再需要时应调用此方法。
retrieve_episodes
graphiti_core/graphiti.py:734-786
async def retrieve_episodes(
reference_time: datetime,
last_n: int = EPISODE_WINDOW_LEN,
group_ids: list[str] | None = None,
source: EpisodeType | None = None,
driver: GraphDriver | None = None,
saga: str | None = None,
) -> list[EpisodicNode]
检索在 reference_time 之前创建的最近 last_n 个 EpisodicNode 对象。使用 @handle_multiple_group_ids 装饰器。
来源:graphiti_core/graphiti.py:734-786
add_episode
graphiti_core/graphiti.py:788-1000
async def add_episode(
name, episode_body, source_description, reference_time,
source, group_id, uuid, update_communities,
entity_types, excluded_entity_types,
previous_episode_uuids, edge_types, edge_type_map,
custom_extraction_instructions, saga, saga_previous_episode_uuid
) -> AddEpisodeResults
主要的入库方法。对单个事件运行完整的提取-解析-持久化管线。
处理流程:
来源:graphiti_core/graphiti.py:788-1000,graphiti_core/utils/maintenance/edge_operations.py:88-175
add_episode_bulk
graphiti_core/graphiti.py:1002-1051
批量处理 RawEpisode 对象列表。这比在循环中调用 add_episode 效率高得多,因为它通过 extract_nodes_and_edges_bulk 执行批量去重和并行提取。
返回 AddBulkEpisodeResults。
来源:graphiti_core/graphiti.py:1002-1051,graphiti_core/utils/bulk_utils.py:128-149
build_communities
graphiti_core/graphiti.py:448-472
async def build_communities(group_ids: list[str] | None = None)
触发社区检测和摘要管线。它会移除现有社区,并使用标签传播和大语言模型(LLM)对实体集群进行摘要来重新创建社区。
来源:graphiti_core/graphiti.py:448-472,graphiti_core/utils/maintenance/community_operations.py:87
检索
graphiti_core/graphiti.py:657-712
search 方法委托给 graphiti_core/search/search.py 中的 search 函数,传递 GraphitiClients 容器和 SearchConfig。它返回一个 SearchResults 对象,其中包含匹配的 EntityEdge、EntityNode、EpisodicNode 和 CommunityNode 列表。
来源:graphiti_core/graphiti.py:657-712,graphiti_core/search/search.py:27-28
结果类型
AddEpisodeResults
graphiti_core/graphiti.py:114-121
由 add_episode 返回。包含处理过程中创建或更新的所有图对象。
| 字段 | 类型 | 描述 |
|---|---|---|
episode | EpisodicNode | 持久化的事件节点 |
episodic_edges | list[EpisodicEdge] | 将事件链接到实体节点的边 |
nodes | list[EntityNode] | 从事件中提取/解析的实体节点 |
edges | list[EntityEdge] | 提取/解析的实体边 |
communities | list[CommunityNode] | 更新的社区节点(如果请求) |
community_edges | list[CommunityEdge] | 更新的社区成员边 |
AddBulkEpisodeResults
graphiti_core/graphiti.py:123-130
与 AddEpisodeResults 具有相同的字段结构,但使用 episodes: list[EpisodicNode] 而不是单个 episode。
方法-代码映射图
此图将公开方法名称映射到它们调用的内部管线函数:
来源:graphiti_core/graphiti.py:788-1051,graphiti_core/utils/bulk_utils.py:128-149,graphiti_core/utils/maintenance/edge_operations.py:88-175
并发与遥测
- 并发:
max_coroutines通过semaphore_gather辅助函数(graphiti_core/graphiti.py:148-151)设置并行异步操作的上限。如果未提供,则从SEMAPHORE_LIMIT环境变量读取该上限。 - 遥测:在初始化时,
Graphiti会调用_capture_initialization_telemetry()(graphiti_core/graphiti.py:247-266),该函数会发出一个包含提供者类型名称的匿名graphiti_initialized事件。可以通过GRAPHITI_TELEMETRY_ENABLED环境变量禁用此功能。
来源:graphiti_core/graphiti.py:247-266,graphiti_core/helpers.py:45