这个示例展示了一个由多个代理组成的团队如何协同管理和更新共享的会话状态。

代码

cookbook/teams/team_with_shared_state.py
from agno.agent.agent import Agent
from agno.models.openai.chat import OpenAIChat
from agno.team import Team


# 定义用于管理购物列表的工具
def add_item(agent: Agent, item: str) -> str:
    """将商品添加到购物列表并返回确认信息。

    Args:
        item (str): 要添加到购物列表的商品。
    """
    # 如果商品尚未在列表中,则添加该商品
    if item.lower() not in [
        i.lower() for i in agent.team_session_state["shopping_list"]
    ]:
        agent.team_session_state["shopping_list"].append(item)
        return f"已将 '{item}' 添加到购物列表"
    else:
        return f"'{item}' 已在购物列表中"


def remove_item(agent: Agent, item: str) -> str:
    """按名称从购物列表中移除商品。

    Args:
        item (str): 要从购物列表中移除的商品。
    """
    # 忽略大小写进行搜索
    for i, list_item in enumerate(agent.team_session_state["shopping_list"]):
        if list_item.lower() == item.lower():
            agent.team_session_state["shopping_list"].pop(i)
            return f"已从购物列表中移除 '{list_item}'"

    return f"在购物列表中未找到 '{item}'。当前购物列表:{agent.team_session_state['shopping_list']}"


def remove_all_items(agent: Agent) -> str:
    """移除购物列表中的所有商品。"""
    agent.team_session_state["shopping_list"] = []
    return "购物列表中的所有商品均已移除"


shopping_list_agent = Agent(
    name="Shopping List Agent",
    role="管理购物列表",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[add_item, remove_item, remove_all_items],
    instructions=[
        "查找维基百科中关于公司的信息",
    ],
)


def list_items(team: Team) -> str:
    """列出购物列表中的所有商品。"""
    shopping_list = team.session_state["shopping_list"]

    if not shopping_list:
        return "购物列表为空。"

    items_text = "\n".join([f"- {item}" for item in shopping_list])
    return f"当前购物列表:\n{items_text}"


shopping_team = Team(
    name="Shopping List Team",
    mode="coordinate",
    model=OpenAIChat(id="gpt-4o-mini"),
    session_state={"shopping_list": []},
    tools=[list_items],
    members=[
        shopping_list_agent,
    ],
    show_tool_calls=True,
    markdown=True,
    instructions=[
        "你们是一个管理购物列表的团队。",
        "如果需要添加或删除购物列表中的商品,请将完整的请求转发给购物列表代理(不要将其拆分为多个请求)。",
        "如果需要列出购物列表中的商品,请使用 list_items 工具。",
        "如果用户从购物列表中获取了某件商品,意味着可以将其从购物列表中移除。",
    ],
    show_members_responses=True,
)

# 示例用法
shopping_team.print_response(
    "将牛奶、鸡蛋和面包添加到购物列表", stream=True
)
print(f"Session state: {shopping_team.session_state}")

shopping_team.print_response("我拿到了面包", stream=True)
print(f"Session state: {shopping_team.session_state}")

shopping_team.print_response("我需要苹果和橘子", stream=True)
print(f"Session state: {shopping_team.session_state}")

shopping_team.print_response("我的列表里有什么?", stream=True)
print(f"Session state: {shopping_team.session_state}")

shopping_team.print_response(
    "将我列表中的所有东西都清空,然后重新开始,只添加香蕉和酸奶",
    stream=True,
)
print(f"Session state: {shopping_team.session_state}")

用法

1

创建虚拟环境

打开 Terminal 并创建一个 python 虚拟环境。

python3 -m venv .venv
source .venv/bin/activate
2

设置你的 API 密钥

export OPENAI_API_KEY=xxx
3

安装库

pip install -U openai agno
4

运行示例

python cookbook/teams/team_with_shared_state.py