Code

"""
🔒 使用 Pipedream MCP 服务器进行身份验证

这是一个关于如何使用 Pipedream MCP 服务器进行身份验证的示例。
如果您的应用程序代表用户与 MCP 服务器进行交互,这将非常有用。

1. 获取您的访问令牌。您可以在 Pipedream 的文档中查看如何操作:https://pipedream.com/docs/connect/mcp/developers/
2. 获取 MCP 服务器的 URL。它看起来会像这样:https://remote.mcp.pipedream.net/<External user id>/<MCP app slug>
3. 设置环境变量:
    - MCP_SERVER_URL: 您之前获取的 MCP 服务器的 URL
    - MCP_ACCESS_TOKEN: 您之前获取的访问令牌
    - PIPEDREAM_PROJECT_ID: 您想使用的 Pipedream 项目的项目 ID
    - PIPEDREAM_ENVIRONMENT: 您想使用的 Pipedream 环境的环境
3. 安装依赖项:pip install agno mcp-sdk
"""

import asyncio
from os import getenv

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, StreamableHTTPClientParams
from agno.utils.log import log_exception

mcp_server_url = getenv("MCP_SERVER_URL")
mcp_access_token = getenv("MCP_ACCESS_TOKEN")
pipedream_project_id = getenv("PIPEDREAM_PROJECT_ID")
pipedream_environment = getenv("PIPEDREAM_ENVIRONMENT")


server_params = StreamableHTTPClientParams(
    url=mcp_server_url,
    headers={
        "Authorization": f"Bearer {mcp_access_token}",
        "x-pd-project-id": pipedream_project_id,
        "x-pd-environment": pipedream_environment,
    },
)


async def run_agent(task: str) -> None:
    try:
        async with MCPTools(
            server_params=server_params, transport="streamable-http", timeout_seconds=20
        ) as mcp:
            agent = Agent(
                model=OpenAIChat(id="gpt-4o-mini"),
                tools=[mcp],
                markdown=True,
            )
            await agent.aprint_response(message=task, stream=True)
    except Exception as e:
        log_exception(f"Unexpected error: {e}")


if __name__ == "__main__":
    # Agent 可以读取 channels, users, messages 等
    asyncio.run(run_agent("Show me the latest message in the channel #general"))