agentic_huge_data_base / wiki
页面 Mem0 · 3.5 异步操作·DeepWiki 中文全文译文

3.5 · 异步操作(Asynchronous Operations)

长期记忆与上下文管理 · 聚焦本章的模块关系、源码依据与实现要点。

项目Mem0 章节3.5 状态全文译文 模块测试、发布与运维、界面与交互、记忆与上下文、接口与服务契约
源码线索
  • docs/api-reference/entities/delete-user.mdx
  • docs/api-reference/entities/get-users.mdx
  • docs/api-reference/events/get-event.mdx
  • docs/api-reference/events/get-events.mdx
  • docs/openapi.json
  • docs/platform/quickstart.mdx
  • mem0/__init__.py
  • mem0/client/main.py
  • mem0/configs/prompts.py
  • mem0/memory/main.py
模块标签
  • 测试、发布与运维
  • 界面与交互
  • 记忆与上下文
  • 接口与服务契约
  • 配置治理

章节正文

异步操作

异步操作

相关源文件

本章引用的主要源码文件:

  • docs/api-reference/entities/delete-user.mdx
  • docs/api-reference/entities/get-users.mdx
  • docs/api-reference/events/get-event.mdx
  • docs/api-reference/events/get-events.mdx
  • docs/openapi.json
  • docs/platform/quickstart.mdx
  • mem0/__init__.py
  • mem0/client/main.py
  • mem0/configs/prompts.py
  • mem0/memory/main.py
  • mem0/memory/storage.py
  • mem0/memory/utils.py
  • tests/configs/test_prompts.py
  • tests/memory/test_main.py
  • tests/test_chatty_llm_parsing.py
  • tests/test_main.py
  • tests/test_memory.py
  • tests/test_proxy.py

本文档介绍 Mem0 在 Python 和 TypeScript SDK 中对非阻塞异步操作的支持。异步操作支持并发内存管理,可提升 I/O 密集型应用的吞吐量和资源利用率。

概述

Mem0 提供异步 API,允许应用在不阻塞主执行线程的情况下执行内存操作。这在以下场景中尤为有价值:

  • Web 服务器和 API:并发处理多个内存操作。
  • 批量处理:并行处理大量内存操作。
  • 实时应用:管理内存的同时保持响应能力。
  • 高吞吐系统:通过并发请求最大化 I/O 利用率。

异步支持在不同平台上的实现方式不同:

  • Python:提供 AsyncMemoryAsyncMemoryClient 类,使用 async/await 语法。
  • TypeScript:所有客户端方法默认使用 Promise 实现异步。

来源:mem0/__init__.py:5-7, mem0/memory/main.py:1-41, mem0/client/main.py:1-25

Python 异步架构

下图将自然语言层面的"内存操作"映射到实现异步操作的代码实体。

内存类到代码实体的映射
Mem0 · 内存类到代码实体的映射 · 图 1
Mem0 · 内存类到代码实体的映射 · 图 1

来源:mem0/memory/main.py:24-41, mem0/client/main.py:10-135, mem0/memory/storage.py:11-19, mem0/__init__.py:5-7

AsyncMemory 类(自托管)

AsyncMemory 类为自托管部署提供了所有核心内存操作的 async/await 版本。它继承自 MemoryBase,并镜像了 Memory 类的 API,但使用协程实现。

初始化
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

# 使用配置初始化
config = MemoryConfig(
    vector_store={
        "provider": "qdrant",
        "config": {
            "host": "localhost",
            "port": 6333,
        }
    }
)

memory = AsyncMemory(config=config)
关键方法

同步 Memory 类的所有方法在 AsyncMemory 中都有对应的异步版本:

同步方法异步方法描述
add()await add()从消息中添加记忆(使用大语言模型进行提取)
search()await search()按查询搜索记忆(语义 + 关键词)
get()await get()按 ID 检索记忆
get_all()await get_all()列出所有记忆并支持过滤
update()await update()更新现有记忆
delete()await delete()按 ID 删除记忆
delete_all()await delete_all()基于过滤条件批量删除
history()await history()检索指定记忆 ID 的历史记录

来源:mem0/memory/main.py:118-124, tests/memory/test_main.py:120-124, tests/test_memory.py:25-31

AsyncMemoryClient 类(平台)

AsyncMemoryClient 类为托管的 Mem0 平台 API 提供异步操作。

初始化
from mem0 import AsyncMemoryClient

client = AsyncMemoryClient(
    api_key="your-api-key"
)
并发操作示例

使用 asyncio.gather 可以实现高并发的内存入库。

import asyncio
from mem0 import AsyncMemory

async def batch_operations():
    memory = AsyncMemory()

    tasks = [
        memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}"
        )
        for i in range(5)
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

来源:mem0/client/main.py:108-135, mem0/client/main.py:164-185

平台异步模式

async_mode 参数控制内存操作在平台端是同步执行还是异步执行。

平台执行流程
Mem0 · 平台执行流程 · 图 2
Mem0 · 平台执行流程 · 图 2
  • async_mode=True(默认):平台异步处理请求,并立即返回事件 ID。
  • async_mode=False:平台同步处理请求,等待处理完成后返回最终结果。

来源:mem0/client/main.py:164-186, docs/openapi.json:123-144

性能考量

并发 vs 阻塞

异步操作通过重叠 I/O 等待时间,显著减少批量任务的总耗时。

Mem0 · 并发 vs 阻塞 · 图 3
Mem0 · 并发 vs 阻塞 · 图 3
内部实现细节

同步 Memory 类在内部使用 concurrent.futures.ThreadPoolExecutor 来并行化某些操作,同时保持阻塞的外部 API。例如,在 add 操作期间,事实提取和图更新可以并行触发。

相比之下,AsyncMemory 对这些任务使用原生 asyncio 模式,以避免阻塞事件循环。

来源:mem0/memory/main.py:1-11, tests/memory/test_main.py:126-153

最佳实践

  1. 生命周期管理:将 AsyncMemory 包装在异步上下文管理器或 FastAPI 启动/关闭钩子中,以确保正确管理向量存储连接和 SQLiteManager 中的数据库句柄。
  2. 弹性:在使用 AsyncMemoryClient 时,为网络绑定的异步操作实现重试机制。
  3. 事件循环感知:切勿在 async def 函数之外调用 AsyncMemory 方法,以避免出现 RuntimeError: no running event loop 错误。
  4. 元数据保留:使用 update() 时,确保正确传递元数据以保留上下文信息。
# 健壮的异步更新示例
# 来自 tests/memory/test_main.py:141-153
async def robust_update(memory_instance, mem_id, text, metadata):
    result = await memory_instance.update(
        memory_id=mem_id,
        data=text,
        metadata=metadata
    )
    return result

来源:mem0/memory/storage.py:11-19, tests/memory/test_main.py:141-153, mem0/client/main.py:164-186