import asyncio
import time
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.zep import ZepAsyncTools
async def main():
# 初始化 ZepAsyncTools
zep_tools = ZepAsyncTools(
user_id="agno", session_id="agno-async-session", add_instructions=True
)
# 初始化 Agent
agent = Agent(
model=OpenAIChat(),
tools=[zep_tools],
context={
"memory": lambda: zep_tools.get_zep_memory(memory_type="context"),
},
add_context=True,
)
# 与 Agent 交互
await agent.aprint_response("My name is John Billings")
await agent.aprint_response("I live in NYC")
await agent.aprint_response("I'm going to a concert tomorrow")
# 允许内存同步到 Zep 数据库
time.sleep(10)
# 刷新上下文
agent.context["memory"] = await zep_tools.get_zep_memory(memory_type="context")
# 询问 Agent 关于用户的信息
await agent.aprint_response("What do you know about me?")
if __name__ == "__main__":
asyncio.run(main())
创建虚拟环境
打开 Terminal
并创建一个 python 虚拟环境。
python3 -m venv .venv
source .venv/bin/activate
设置您的 API 密钥
export OPENAI_API_KEY=xxx
export ZEP_API_KEY=xxx
安装库
pip install -U zep-cloud openai agno
运行 Agent
python cookbook/tools/zep_async_tools.py