异步操作
异步操作
相关源文件
本章引用的主要源码文件:
docs/api-reference/entities/delete-user.mdxdocs/api-reference/entities/get-users.mdxdocs/api-reference/events/get-event.mdxdocs/api-reference/events/get-events.mdxdocs/openapi.jsondocs/platform/quickstart.mdxmem0/__init__.pymem0/client/main.pymem0/configs/prompts.pymem0/memory/main.pymem0/memory/storage.pymem0/memory/utils.pytests/configs/test_prompts.pytests/memory/test_main.pytests/test_chatty_llm_parsing.pytests/test_main.pytests/test_memory.pytests/test_proxy.py
本文档介绍 Mem0 在 Python 和 TypeScript SDK 中对非阻塞异步操作的支持。异步操作支持并发内存管理,可提升 I/O 密集型应用的吞吐量和资源利用率。
概述
Mem0 提供异步 API,允许应用在不阻塞主执行线程的情况下执行内存操作。这在以下场景中尤为有价值:
- Web 服务器和 API:并发处理多个内存操作。
- 批量处理:并行处理大量内存操作。
- 实时应用:管理内存的同时保持响应能力。
- 高吞吐系统:通过并发请求最大化 I/O 利用率。
异步支持在不同平台上的实现方式不同:
- Python:提供
AsyncMemory和AsyncMemoryClient类,使用async/await语法。 - TypeScript:所有客户端方法默认使用 Promise 实现异步。
来源:mem0/__init__.py:5-7, mem0/memory/main.py:1-41, mem0/client/main.py:1-25
Python 异步架构
下图将自然语言层面的"内存操作"映射到实现异步操作的代码实体。
内存类到代码实体的映射
来源:mem0/memory/main.py:24-41, mem0/client/main.py:10-135, mem0/memory/storage.py:11-19, mem0/__init__.py:5-7
AsyncMemory 类(自托管)
AsyncMemory 类为自托管部署提供了所有核心内存操作的 async/await 版本。它继承自 MemoryBase,并镜像了 Memory 类的 API,但使用协程实现。
初始化
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig
# 使用配置初始化
config = MemoryConfig(
vector_store={
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
}
}
)
memory = AsyncMemory(config=config)
关键方法
同步 Memory 类的所有方法在 AsyncMemory 中都有对应的异步版本:
| 同步方法 | 异步方法 | 描述 |
|---|---|---|
add() | await add() | 从消息中添加记忆(使用大语言模型进行提取) |
search() | await search() | 按查询搜索记忆(语义 + 关键词) |
get() | await get() | 按 ID 检索记忆 |
get_all() | await get_all() | 列出所有记忆并支持过滤 |
update() | await update() | 更新现有记忆 |
delete() | await delete() | 按 ID 删除记忆 |
delete_all() | await delete_all() | 基于过滤条件批量删除 |
history() | await history() | 检索指定记忆 ID 的历史记录 |
来源:mem0/memory/main.py:118-124, tests/memory/test_main.py:120-124, tests/test_memory.py:25-31
AsyncMemoryClient 类(平台)
AsyncMemoryClient 类为托管的 Mem0 平台 API 提供异步操作。
初始化
from mem0 import AsyncMemoryClient
client = AsyncMemoryClient(
api_key="your-api-key"
)
并发操作示例
使用 asyncio.gather 可以实现高并发的内存入库。
import asyncio
from mem0 import AsyncMemory
async def batch_operations():
memory = AsyncMemory()
tasks = [
memory.add(
messages=[{"role": "user", "content": f"Message {i}"}],
user_id=f"user_{i}"
)
for i in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
来源:mem0/client/main.py:108-135, mem0/client/main.py:164-185
平台异步模式
async_mode 参数控制内存操作在平台端是同步执行还是异步执行。
平台执行流程
async_mode=True(默认):平台异步处理请求,并立即返回事件 ID。async_mode=False:平台同步处理请求,等待处理完成后返回最终结果。
来源:mem0/client/main.py:164-186, docs/openapi.json:123-144
性能考量
并发 vs 阻塞
异步操作通过重叠 I/O 等待时间,显著减少批量任务的总耗时。
内部实现细节
同步 Memory 类在内部使用 concurrent.futures.ThreadPoolExecutor 来并行化某些操作,同时保持阻塞的外部 API。例如,在 add 操作期间,事实提取和图更新可以并行触发。
相比之下,AsyncMemory 对这些任务使用原生 asyncio 模式,以避免阻塞事件循环。
来源:mem0/memory/main.py:1-11, tests/memory/test_main.py:126-153
最佳实践
- 生命周期管理:将
AsyncMemory包装在异步上下文管理器或 FastAPI 启动/关闭钩子中,以确保正确管理向量存储连接和SQLiteManager中的数据库句柄。 - 弹性:在使用
AsyncMemoryClient时,为网络绑定的异步操作实现重试机制。 - 事件循环感知:切勿在
async def函数之外调用AsyncMemory方法,以避免出现RuntimeError: no running event loop错误。 - 元数据保留:使用
update()时,确保正确传递元数据以保留上下文信息。
# 健壮的异步更新示例
# 来自 tests/memory/test_main.py:141-153
async def robust_update(memory_instance, mem_id, text, metadata):
result = await memory_instance.update(
memory_id=mem_id,
data=text,
metadata=metadata
)
return result
来源:mem0/memory/storage.py:11-19, tests/memory/test_main.py:141-153, mem0/client/main.py:164-186