此示例演示了如何实现流式的用户确认流程,从而支持实时交互和流式响应。
import json
import httpx
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools import tool
from agno.utils import pprint
from rich.console import Console
from rich.prompt import Prompt
# Shared Rich console used to print the tool-confirmation notice.
console = Console()
@tool(requires_confirmation=True)
def get_top_hackernews_stories(num_stories: int) -> str:
    """Fetch top stories from Hacker News.

    Args:
        num_stories (int): Number of stories to retrieve. Values below
            zero are treated as zero.

    Returns:
        str: JSON string containing story details, with each story's
            ``text`` field removed.

    Raises:
        httpx.HTTPStatusError: If a Hacker News request answers with a
            4xx/5xx status.
    """
    base_url = "https://hacker-news.firebaseio.com/v0"
    # A negative slice bound would count from the end of the list, which is
    # never what "number of stories" means.
    limit = max(num_stories, 0)

    all_stories = []
    # One client for every request so the underlying connection is reused.
    with httpx.Client() as client:
        # Fetch top story IDs
        response = client.get(f"{base_url}/topstories.json")
        response.raise_for_status()
        story_ids = response.json()

        # Collect the details of each requested story
        for story_id in story_ids[:limit]:
            story_response = client.get(f"{base_url}/item/{story_id}.json")
            story_response.raise_for_status()
            story = story_response.json()
            # The payload may decode to a non-dict (e.g. JSON null for an
            # item that is unavailable); skip it instead of crashing.
            if not isinstance(story, dict):
                continue
            # Drop the free-text body; pop with a default is a no-op when
            # the key is absent.
            story.pop("text", None)
            all_stories.append(story)
    return json.dumps(all_stories)
# Agent wired with the confirmation-gated Hacker News tool.
agent = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[get_top_hackernews_stories],
    markdown=True,
)

# Stream the run. Whenever an event reports that the run is paused, ask the
# user to approve or reject each pending tool call, then resume the run as a
# stream and print it.
for run_event in agent.run("Fetch the top 2 hackernews stories", stream=True):
    if not run_event.is_paused:
        continue

    # Named `pending_tool` (not `tool`) so the `tool` decorator imported from
    # agno.tools is not rebound at module level.
    for pending_tool in run_event.tools_requiring_confirmation:
        # Ask for confirmation
        console.print(
            f"工具 {pending_tool.tool_name}({pending_tool.tool_args}) 需要确认。"
        )
        message = (
            Prompt.ask("是否继续?", choices=["y", "n"], default="y")
            .strip()
            .lower()
        )
        # Anything other than an explicit "n" approves the call; the tool
        # object is updated in place.
        pending_tool.confirmed = message != "n"

    run_response = agent.continue_run(run_response=run_event, stream=True)
    pprint.pprint_run_response(run_response)
创建虚拟环境
打开 Terminal 并创建一个 Python 虚拟环境。
python3 -m venv .venv
source .venv/bin/activate
设置你的 API 密钥
export OPENAI_API_KEY=xxx
安装库
pip install -U agno httpx rich openai
运行示例
python cookbook/agent_concepts/user_control_flows/confirmation_required_stream.py
使用 agent.run(stream=True) 进行流式响应
使用 agent.continue_run(stream=True) 实现流式续接