agentic_huge_data_base / wiki
页面 Graphiti · 4.1 Graphiti 核心客户端·DeepWiki 中文全文译文

4.1 · Graphiti 核心客户端(Graphiti Core Client)

时序知识图谱与动态事实记忆 · 聚焦本章的模块关系、源码依据与实现要点。

项目Graphiti 章节4.1 状态全文译文 模块模型调用与提供方适配、界面与交互、图谱与关系、测试、发布与运维
源码线索
  • graphiti_core/driver/falkordb/operations/graph_ops.py
  • graphiti_core/driver/neo4j/operations/graph_ops.py
  • graphiti_core/graphiti.py
  • graphiti_core/namespaces/edges.py
  • graphiti_core/namespaces/nodes.py
模块标签
  • 模型调用与提供方适配
  • 界面与交互
  • 图谱与关系
  • 测试、发布与运维
  • 检索、召回与索引

章节正文

Graphiti 核心客户端

Graphiti 核心客户端

相关源文件

本章引用的主要源码文件:

  • graphiti_core/driver/falkordb/operations/graph_ops.py
  • graphiti_core/driver/neo4j/operations/graph_ops.py
  • graphiti_core/graphiti.py
  • graphiti_core/namespaces/edges.py
  • graphiti_core/namespaces/nodes.py

本文档记录了 Graphiti 类——所有 graphiti-core 操作的主要入口点——包括其构造函数参数、公开方法、结果类型以及它所暴露的 NodeNamespace/EdgeNamespace 访问器 API。

关于 Graphiti 在更广泛系统中的架构角色,请参见系统架构。关于可插拔提供者系统和 GraphitiClients 容器的文档,请参见多提供者插件架构。关于 search 所委托的搜索子系统,请参见搜索与检索系统。关于完整的事件入库管线,请参见数据处理管线

概述

Graphiti(定义于 graphiti_core/graphiti.py:137-1051)是核心编排类。它持有所有提供者客户端的引用,暴露用于入库事件和搜索知识图谱的方法,并协调完整的提取-解析-持久化管线。

类关系图:

Graphiti · 概述 · 图 1
Graphiti · 概述 · 图 1

来源:graphiti_core/graphiti.py:137-246graphiti_core/namespaces/nodes.py:29-234graphiti_core/namespaces/edges.py:34-230

构造函数

graphiti_core/graphiti.py:138-246

Graphiti(
    uri, user, password,
    llm_client, embedder, cross_encoder,
    store_raw_episode_content, graph_driver,
    max_coroutines, tracer, trace_span_prefix
)
参数
参数类型默认值描述
uristr | NoneNoneNeo4j 数据库的 URI。当未提供 graph_driver 时必填。
userstr | NoneNone数据库用户名。
passwordstr | NoneNone数据库密码。
llm_clientLLMClient | NoneNone大语言模型(LLM)提供者客户端。默认为 OpenAIClient()
embedderEmbedderClient | NoneNone嵌入向量提供者客户端。默认为 OpenAIEmbedder()
cross_encoderCrossEncoderClient | NoneNone重排序器客户端。默认为 OpenAIRerankerClient()
store_raw_episode_contentboolTrue如果为 False,则在持久化前清除事件内容以节省存储空间。
graph_driverGraphDriver | NoneNone完全构造的驱动程序。会覆盖 uri/user/password
max_coroutinesint | NoneNone并发限制;会覆盖 SEMAPHORE_LIMIT 环境变量。
tracerTracer | NoneNoneOpenTelemetry 追踪器。如果为 None,则追踪为空操作。
trace_span_prefixstr'graphiti'所有跨度名称的前缀。
默认提供者解析

当提供者参数为 None 时,构造函数会回退到该角色的默认提供者:

Graphiti · 默认提供者解析 · 图 2
Graphiti · 默认提供者解析 · 图 2

来源:graphiti_core/graphiti.py:203-238

关键属性

构造完成后,以下公开属性可用:

属性类型描述
driverGraphDriver活跃的图数据库驱动程序。
llm_clientLLMClient用于提取和去重的大语言模型(LLM)客户端。
embedderEmbedderClient用于节点/边嵌入向量生成的嵌入向量客户端。
cross_encoderCrossEncoderClient搜索期间使用的重排序器。
tracerTracer追踪包装器;如果未配置则为空操作。
clientsGraphitiClients打包所有提供者客户端的便捷容器。
nodesNodeNamespace用于低级节点操作的命名空间。
edgesEdgeNamespace用于低级边操作的命名空间。
store_raw_episode_contentbool是否持久化原始事件文本。
max_coroutinesint | Nonesemaphore_gather 的并发上限。

来源:graphiti_core/graphiti.py:203-246

GraphitiClients 容器

GraphitiClients(从 graphiti_core/graphiti_types.py 导入)是一个数据类,将所有提供者引用分组到一个对象中。子系统函数(例如 utils/maintenance/ 中的函数)接受 GraphitiClients 参数而不是单独的参数,从而避免了长参数列表。

Graphiti · GraphitiClients 容器 · 图 3
Graphiti · GraphitiClients 容器 · 图 3

self.clients 在构造时组装(graphiti_core/graphiti.py:232-238),并传递给内部管线函数,如 extract_nodesresolve_extracted_nodesextract_edgessearch

来源:graphiti_core/graphiti.py:232-238

NodeNamespaceEdgeNamespace

NodeNamespaceEdgeNamespacegraphiti_core/namespaces/ 导入,并分别实例化为 self.nodesself.edgesgraphiti_core/graphiti.py:241-242)。

它们提供了一个结构化的 API,用于对图中的单个节点和边执行直接的 CRUD 风格操作,而无需运行完整的事件入库管线。两者在构造时都接受 GraphDriverEmbedderClient

节点访问器

通过 graphiti.nodes 访问。

  • entityEntityNodeNamespacegraphiti_core/namespaces/nodes.py:29-105
  • episodeEpisodeNodeNamespacegraphiti_core/namespaces/nodes.py:107-184
  • communityCommunityNodeNamespacegraphiti_core/namespaces/nodes.py:186-234
  • sagaSagaNodeNamespacegraphiti_core/namespaces/nodes.py:236-281
边访问器

通过 graphiti.edges 访问。

  • entityEntityEdgeNamespacegraphiti_core/namespaces/edges.py:34-111
  • episodicEpisodicEdgeNamespacegraphiti_core/namespaces/edges.py:113-163
  • communityCommunityEdgeNamespacegraphiti_core/namespaces/edges.py:165-207
  • has_episodeHasEpisodeEdgeNamespacegraphiti_core/namespaces/edges.py:209-230
  • next_episodeNextEpisodeEdgeNamespacegraphiti_core/namespaces/edges.py:232-253

来源:graphiti_core/graphiti.py:241-242graphiti_core/namespaces/nodes.py:1-281graphiti_core/namespaces/edges.py:1-253

token_tracker 属性

graphiti_core/graphiti.py:268-278

@property
def token_tracker(self):
    return self.llm_client.token_tracker

返回大语言模型(LLM)客户端的 TokenUsageTracker,该追踪器会跟踪通过该实例进行的所有大语言模型(LLM)调用的累计 Token 使用量。追踪器上可用的方法包括 get_usage()get_total_usage()reset()

来源:graphiti_core/graphiti.py:268-278

核心方法

build_indices_and_constraints

graphiti_core/graphiti.py:393-425

async def build_indices_and_constraints(delete_existing: bool = False)

委托给 self.driver.build_indices_and_constraints(delete_existing)。在首次使用前必须调用一次,以初始化数据库模式和索引。

close

graphiti_core/graphiti.py:311-341

async def close()

关闭底层驱动程序连接。当实例不再需要时应调用此方法。

retrieve_episodes

graphiti_core/graphiti.py:734-786

async def retrieve_episodes(
    reference_time: datetime,
    last_n: int = EPISODE_WINDOW_LEN,
    group_ids: list[str] | None = None,
    source: EpisodeType | None = None,
    driver: GraphDriver | None = None,
    saga: str | None = None,
) -> list[EpisodicNode]

检索在 reference_time 之前创建的最近 last_nEpisodicNode 对象。使用 @handle_multiple_group_ids 装饰器。

来源:graphiti_core/graphiti.py:734-786

add_episode

graphiti_core/graphiti.py:788-1000

async def add_episode(
    name, episode_body, source_description, reference_time,
    source, group_id, uuid, update_communities,
    entity_types, excluded_entity_types,
    previous_episode_uuids, edge_types, edge_type_map,
    custom_extraction_instructions, saga, saga_previous_episode_uuid
) -> AddEpisodeResults

主要的入库方法。对单个事件运行完整的提取-解析-持久化管线。

处理流程:

Graphiti · add_episode · 图 4
Graphiti · add_episode · 图 4

来源:graphiti_core/graphiti.py:788-1000graphiti_core/utils/maintenance/edge_operations.py:88-175

add_episode_bulk

graphiti_core/graphiti.py:1002-1051

批量处理 RawEpisode 对象列表。这比在循环中调用 add_episode 效率高得多,因为它通过 extract_nodes_and_edges_bulk 执行批量去重和并行提取。

返回 AddBulkEpisodeResults

来源:graphiti_core/graphiti.py:1002-1051graphiti_core/utils/bulk_utils.py:128-149

build_communities

graphiti_core/graphiti.py:448-472

async def build_communities(group_ids: list[str] | None = None)

触发社区检测和摘要管线。它会移除现有社区,并使用标签传播和大语言模型(LLM)对实体集群进行摘要来重新创建社区。

来源:graphiti_core/graphiti.py:448-472graphiti_core/utils/maintenance/community_operations.py:87

检索

graphiti_core/graphiti.py:657-712

search 方法委托给 graphiti_core/search/search.py 中的 search 函数,传递 GraphitiClients 容器和 SearchConfig。它返回一个 SearchResults 对象,其中包含匹配的 EntityEdgeEntityNodeEpisodicNodeCommunityNode 列表。

来源:graphiti_core/graphiti.py:657-712graphiti_core/search/search.py:27-28

结果类型

AddEpisodeResults

graphiti_core/graphiti.py:114-121

add_episode 返回。包含处理过程中创建或更新的所有图对象。

字段类型描述
episodeEpisodicNode持久化的事件节点
episodic_edgeslist[EpisodicEdge]将事件链接到实体节点的边
nodeslist[EntityNode]从事件中提取/解析的实体节点
edgeslist[EntityEdge]提取/解析的实体边
communitieslist[CommunityNode]更新的社区节点(如果请求)
community_edgeslist[CommunityEdge]更新的社区成员边
AddBulkEpisodeResults

graphiti_core/graphiti.py:123-130

AddEpisodeResults 具有相同的字段结构,但使用 episodes: list[EpisodicNode] 而不是单个 episode

方法-代码映射图

此图将公开方法名称映射到它们调用的内部管线函数:

Graphiti · 方法-代码映射图 · 图 5
Graphiti · 方法-代码映射图 · 图 5

来源:graphiti_core/graphiti.py:788-1051graphiti_core/utils/bulk_utils.py:128-149graphiti_core/utils/maintenance/edge_operations.py:88-175

并发与遥测

  • 并发max_coroutines 通过 semaphore_gather 辅助函数(graphiti_core/graphiti.py:148-151)设置并行异步操作的上限。如果未提供,则从 SEMAPHORE_LIMIT 环境变量读取该上限。
  • 遥测:在初始化时,Graphiti 会调用 _capture_initialization_telemetry()graphiti_core/graphiti.py:247-266),该函数会发出一个包含提供者类型名称的匿名 graphiti_initialized 事件。可以通过 GRAPHITI_TELEMETRY_ENABLED 环境变量禁用此功能。

来源:graphiti_core/graphiti.py:247-266graphiti_core/helpers.py:45