arun
方法高效处理多个异步任务,确保了流畅且非阻塞的运行。
async_workflow.py
Copy
import asyncio
import json
from typing import AsyncIterator
import httpx
from agno.agent import Agent, RunResponse
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunEvent, Workflow
class AsyncHackerNewsReporter(Workflow):
description: str = (
"获取 Hacker News 的热门新闻并撰写一篇报告。"
)
hn_agent: Agent = Agent(
description="获取 hackernews 的热门新闻。 "
"分享所有可能的信息,包括 url、得分、标题和摘要(如果可用)。",
show_tool_calls=True,
)
writer: Agent = Agent(
tools=[Newspaper4kTools()],
description="就 hacker news 的热门新闻撰写一篇引人入胜的报告。",
instructions=[
"你将获得热门新闻及其链接。",
"仔细阅读每篇文章并思考其内容。",
"然后生成一篇最终的、值得《纽约时报》发布的文章。",
"将文章分成几个部分,并在结尾提供要点总结。",
"确保标题吸引人且引人入胜。",
"分享每篇文章的得分、标题、URL 和摘要。",
"为每个部分提供相关的标题,并在每个部分中提供详细信息/事实/过程。",
"忽略你无法阅读或理解的文章。",
"请记住:你正在为《纽约时报》写作,因此文章的质量很重要。",
],
)
async def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
"""使用此函数从 Hacker News 获取热门新闻。
Args:
num_stories (int): 要返回的故事数量。默认为 10。
Returns:
str: 热门新闻的 JSON 字符串。
"""
async with httpx.AsyncClient() as client:
# 获取热门故事 ID
response = await client.get(
"https://hacker-news.firebaseio.com/v0/topstories.json"
)
story_ids = response.json()
# 并发获取故事详情
tasks = [
client.get(
f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
)
for story_id in story_ids[:num_stories]
]
responses = await asyncio.gather(*tasks)
stories = []
for response in responses:
story = response.json()
story["username"] = story["by"]
stories.append(story)
return json.dumps(stories)
async def arun(self, num_stories: int = 5) -> AsyncIterator[RunResponse]:
# 在此处设置 hn_agent 的工具,以避免循环引用
self.hn_agent.tools = [self.get_top_hackernews_stories]
logger.info(f"正在从 HackerNews 获取 {num_stories} 条热门故事。")
top_stories: RunResponse = await self.hn_agent.arun(num_stories=num_stories)
if top_stories is None or not top_stories.content:
yield RunResponse(
run_id=self.run_id,
content="抱歉,无法获取热门故事。",
event=RunEvent.workflow_completed,
)
return
logger.info("正在阅读每篇故事并撰写报告。")
# 从 writer.arun() 获取异步迭代器
writer_response = await self.writer.arun(top_stories.content, stream=True)
# 直接流式传输 writer 的响应
async for response in writer_response:
if response.content:
yield RunResponse(
content=response.content, event=response.event, run_id=self.run_id
)
if __name__ == "__main__":
import asyncio
async def main():
# 初始化工作流
workflow = AsyncHackerNewsReporter(debug_mode=False)
# 运行工作流并收集最终响应
final_content = []
try:
async for response in workflow.arun(num_stories=5):
if response.content:
final_content.append(response.content)
except Exception as e:
logger.error(f"运行工作流时出错: {e}")
return
# 创建包含合并内容的最终响应
if final_content:
final_response = RunResponse(
content="".join(final_content), event=RunEvent.workflow_completed
)
# 美化打印最终响应
pprint_run_response(final_response, markdown=True, show_time=True)
# 运行异步主函数
asyncio.run(main())
Usage
1
创建虚拟环境
打开
Terminal
并创建一个 python 虚拟环境。Copy
python3 -m venv .venv
source .venv/bin/activate
2
安装库
Copy
pip install agno newspaper4k lxml_html_clean openai httpx
3
运行代理
Copy
python async_workflow.py