Agno 的 MCP 集成支持 SSE 传输。此传输支持服务器到客户端的流式传输,在处理受限网络时可能比 stdio 更有用。

要使用它,请初始化 MCPTools,传入 MCP 服务器的 URL 并将传输设置为 sse

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools

server_url = "http://localhost:8000/sse"

async with MCPTools(url=server_url, transport="sse") as mcp_tools:
    agent = Agent(model=OpenAIChat(id="gpt-4o"), tools=[mcp_tools])
    await agent.aprint_response("What is the license for this project?", stream=True)

您还可以使用 server_params 参数来定义 MCP 连接。这样,您可以指定每次请求发送到 MCP 服务器的标头以及超时值:

from agno.tools.mcp import MCPTools, SSEClientParams

server_params = SSEClientParams(
    url=...,
    headers=...,
    timeout=...,
    sse_read_timeout=...,
)

async with MCPTools(server_params=server_params) as mcp_tools:
    ...

完整示例

设置一个简单的本地服务器并使用 SSE 传输连接到它:

1

设置服务器

sse_server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("calendar_assistant")


@mcp.tool()
def get_events(day: str) -> str:
    return f"There are no events scheduled for {day}."


@mcp.tool()
def get_birthdays_this_week() -> str:
    return "It is your mom's birthday tomorrow"


if __name__ == "__main__":
    mcp.run(transport="sse")
2

设置客户端

sse_client.py
import asyncio

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, MultiMCPTools

# 这是我们要使用的 MCP 服务器的 URL。
server_url = "http://localhost:8000/sse"


async def run_agent(message: str) -> None:
    async with MCPTools(transport="sse", url=server_url) as mcp_tools:
        agent = Agent(
            model=OpenAIChat(id="gpt-4o"),
            tools=[mcp_tools],
            markdown=True,
        )
        await agent.aprint_response(message=message, stream=True, markdown=True)


# 使用 MultiMCPTools,我们可以同时连接到多个 MCP 服务器,即使它们使用不同的传输。
# 在此示例中,我们同时连接到示例服务器(SSE 传输)和另一个服务器(stdio 传输)。
async def run_agent_with_multimcp(message: str) -> None:
    async with MultiMCPTools(
        commands=["npx -y @openbnb/mcp-server-airbnb --ignore-robots-txt"],
        urls=[server_url],
    ) as mcp_tools:
        agent = Agent(
            model=OpenAIChat(id="gpt-4o"),
            tools=[mcp_tools],
            markdown=True,
        )
        await agent.aprint_response(message=message, stream=True, markdown=True)


if __name__ == "__main__":
    asyncio.run(run_agent("Do I have any birthdays this week?"))
    asyncio.run(
        run_agent_with_multimcp(
            "Can you check when is my mom's birthday, and if there are any AirBnb listings in SF for two people for that day?"
        )
    )
3

运行服务器

python sse_server.py
4

运行客户端

python sse_client.py