本示例展示了在集成 Pipedream MCP 服务器时如何添加授权以与 Agno Agents 连接。
"""
🔒 使用 Pipedream MCP 服务器进行身份验证
这是一个关于如何使用 Pipedream MCP 服务器进行身份验证的示例。
如果您的应用程序代表用户与 MCP 服务器进行交互,这将非常有用。
1. 获取您的访问令牌。您可以在 Pipedream 的文档中查看如何操作:https://pipedream.com/docs/connect/mcp/developers/
2. 获取 MCP 服务器的 URL。它看起来会像这样:https://remote.mcp.pipedream.net/<External user id>/<MCP app slug>
3. 设置环境变量:
- MCP_SERVER_URL: 您之前获取的 MCP 服务器的 URL
- MCP_ACCESS_TOKEN: 您之前获取的访问令牌
- PIPEDREAM_PROJECT_ID: 您想使用的 Pipedream 项目的项目 ID
- PIPEDREAM_ENVIRONMENT: 您想使用的 Pipedream 环境的环境
3. 安装依赖项:pip install agno mcp-sdk
"""
import asyncio
from os import getenv
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, StreamableHTTPClientParams
from agno.utils.log import log_exception
mcp_server_url = getenv("MCP_SERVER_URL")
mcp_access_token = getenv("MCP_ACCESS_TOKEN")
pipedream_project_id = getenv("PIPEDREAM_PROJECT_ID")
pipedream_environment = getenv("PIPEDREAM_ENVIRONMENT")
server_params = StreamableHTTPClientParams(
url=mcp_server_url,
headers={
"Authorization": f"Bearer {mcp_access_token}",
"x-pd-project-id": pipedream_project_id,
"x-pd-environment": pipedream_environment,
},
)
async def run_agent(task: str) -> None:
try:
async with MCPTools(
server_params=server_params, transport="streamable-http", timeout_seconds=20
) as mcp:
agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[mcp],
markdown=True,
)
await agent.aprint_response(message=task, stream=True)
except Exception as e:
log_exception(f"Unexpected error: {e}")
if __name__ == "__main__":
# Agent 可以读取 channels, users, messages 等
asyncio.run(run_agent("Show me the latest message in the channel #general"))