新的 Streamable HTTP transport 取代了 2024-11-05 协议版本中的 HTTP+SSE transport。
此传输使 MCP 服务器能够处理多个客户端连接,同时还可以使用 SSE 进行服务器到客户端的流式传输。
要使用它,请初始化 MCPTools
,传入 MCP 服务器的 URL 并将 transport 设置为 streamable-http
:
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools
server_url = "http://localhost:8000/mcp"
async with MCPTools(url=server_url, transport="streamable-http") as mcp_tools:
agent = Agent(model=OpenAIChat(id="gpt-4o"), tools=[mcp_tools])
await agent.aprint_response("What is the license for this project?", stream=True)
您还可以使用 server_params
参数来定义 MCP 连接。这样,您可以指定每次请求发送到 MCP 服务器的标头和超时值:
from agno.tools.mcp import MCPTools, StreamableHTTPClientParams
server_params = StreamableHTTPClientParams(
url=...,
headers=...,
timeout=...,
sse_read_timeout=...,
terminate_on_close=...,
)
async with MCPTools(server_params=server_params) as mcp_tools:
...
让我们设置一个简单的本地服务器并使用 Streamable HTTP 传输进行连接:
设置服务器
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("calendar_assistant")
@mcp.tool()
def get_events(day: str) -> str:
return f"There are no events scheduled for {day}."
@mcp.tool()
def get_birthdays_this_week() -> str:
return "It is your mom's birthday tomorrow"
if __name__ == "__main__":
mcp.run(transport="streamable-http")
设置客户端
import asyncio
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, MultiMCPTools
# 这是我们想要使用的 MCP 服务器的 URL。
server_url = "http://localhost:8000/mcp"
async def run_agent(message: str) -> None:
async with MCPTools(transport="streamable-http", url=server_url) as mcp_tools:
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[mcp_tools],
markdown=True,
)
await agent.aprint_response(message=message, stream=True, markdown=True)
# 使用 MultiMCPTools,我们可以一次连接到多个 MCP 服务器,即使它们使用不同的传输。
# 在此示例中,我们连接到我们的示例服务器(Streamable HTTP 传输)和一个不同的服务器(stdio 传输)。
async def run_agent_with_multimcp(message: str) -> None:
async with MultiMCPTools(
commands=["npx -y @openbnb/mcp-server-airbnb --ignore-robots-txt"],
urls=[server_url],
urls_transports=["streamable-http"],
) as mcp_tools:
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[mcp_tools],
markdown=True,
)
await agent.aprint_response(message=message, stream=True, markdown=True)
if __name__ == "__main__":
asyncio.run(run_agent("Do I have any birthdays this week?"))
asyncio.run(
run_agent_with_multimcp(
"Can you check when is my mom's birthday, and if there are any AirBnb listings in SF for two people for that day?"
)
)
运行服务器
python streamable_http_server.py
运行客户端
python sse_client.py