Agno 的 MCP 集成支持 SSE 传输。此传输支持服务器到客户端的流式传输,在处理受限网络时可能比 stdio 更有用。
要使用它,请初始化 MCPTools
,传入 MCP 服务器的 URL 并将传输设置为 sse
:
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools
server_url = "http://localhost:8000/sse"
async with MCPTools(url=server_url, transport="sse") as mcp_tools:
agent = Agent(model=OpenAIChat(id="gpt-4o"), tools=[mcp_tools])
await agent.aprint_response("What is the license for this project?", stream=True)
您还可以使用 server_params
参数来定义 MCP 连接。这样,您可以指定每次请求发送到 MCP 服务器的标头以及超时值:
from agno.tools.mcp import MCPTools, SSEClientParams
server_params = SSEClientParams(
url=...,
headers=...,
timeout=...,
sse_read_timeout=...,
)
async with MCPTools(server_params=server_params) as mcp_tools:
...
设置一个简单的本地服务器并使用 SSE 传输连接到它:
设置服务器
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("calendar_assistant")
@mcp.tool()
def get_events(day: str) -> str:
return f"There are no events scheduled for {day}."
@mcp.tool()
def get_birthdays_this_week() -> str:
return "It is your mom's birthday tomorrow"
if __name__ == "__main__":
mcp.run(transport="sse")
设置客户端
import asyncio
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, MultiMCPTools
# 这是我们要使用的 MCP 服务器的 URL。
server_url = "http://localhost:8000/sse"
async def run_agent(message: str) -> None:
async with MCPTools(transport="sse", url=server_url) as mcp_tools:
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[mcp_tools],
markdown=True,
)
await agent.aprint_response(message=message, stream=True, markdown=True)
# 使用 MultiMCPTools,我们可以同时连接到多个 MCP 服务器,即使它们使用不同的传输。
# 在此示例中,我们同时连接到示例服务器(SSE 传输)和另一个服务器(stdio 传输)。
async def run_agent_with_multimcp(message: str) -> None:
async with MultiMCPTools(
commands=["npx -y @openbnb/mcp-server-airbnb --ignore-robots-txt"],
urls=[server_url],
) as mcp_tools:
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[mcp_tools],
markdown=True,
)
await agent.aprint_response(message=message, stream=True, markdown=True)
if __name__ == "__main__":
asyncio.run(run_agent("Do I have any birthdays this week?"))
asyncio.run(
run_agent_with_multimcp(
"Can you check when is my mom's birthday, and if there are any AirBnb listings in SF for two people for that day?"
)
)
运行服务器
python sse_server.py
运行客户端
python sse_client.py