一个异步的 Hacker News 报告生成器,使用工作流获取最新新闻
AsyncHackerNewsReporter 是一个异步获取 Hacker News 热门新闻并生成综合报告的工作流。该工作流通过 arun 方法高效地处理多个异步任务,保证运行流畅且不阻塞。
import asyncio
import json
from typing import AsyncIterator
import httpx
from agno.agent import Agent, RunResponse
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunEvent, Workflow
class AsyncHackerNewsReporter(Workflow):
    """Workflow that fetches top Hacker News stories and streams a written report.

    ``hn_agent`` retrieves the stories through the
    ``get_top_hackernews_stories`` tool; ``writer`` turns the result into an
    article. ``arun`` wires the two together and yields the writer's output
    chunk by chunk.
    """

    description: str = (
        "获取 Hacker News 的热门新闻并撰写一篇报告。"
    )

    # Agent that gathers the stories. Its tool is attached inside ``arun``.
    hn_agent: Agent = Agent(
        description="获取 hackernews 的热门新闻。 "
        "分享所有可能的信息,包括 url、得分、标题和摘要(如果可用)。",
        show_tool_calls=True,
    )

    # Agent that reads the linked articles and writes the final report.
    writer: Agent = Agent(
        tools=[Newspaper4kTools()],
        description="就 hacker news 的热门新闻撰写一篇引人入胜的报告。",
        instructions=[
            "你将获得热门新闻及其链接。",
            "仔细阅读每篇文章并思考其内容。",
            "然后生成一篇最终的、值得《纽约时报》发布的文章。",
            "将文章分成几个部分,并在结尾提供要点总结。",
            "确保标题吸引人且引人入胜。",
            "分享每篇文章的得分、标题、URL 和摘要。",
            "为每个部分提供相关的标题,并在每个部分中提供详细信息/事实/过程。",
            "忽略你无法阅读或理解的文章。",
            "请记住:你正在为《纽约时报》写作,因此文章的质量很重要。",
        ],
    )

    async def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
        """Use this function to get top stories from Hacker News.

        Items that cannot be retrieved (HTTP error, or a ``null`` / non-object
        payload) are skipped instead of aborting the whole call. When an item
        carries a ``by`` field, its value is copied to ``username``.

        Args:
            num_stories (int): Number of stories to return. Defaults to 10.

        Returns:
            str: JSON string containing the list of story objects.

        Raises:
            httpx.HTTPStatusError: If the top-stories ID list request fails.
        """
        # NOTE(review): this docstring is presumably surfaced to the model as
        # the tool description — confirm before rewording it further.
        base_url = "https://hacker-news.firebaseio.com/v0"

        async with httpx.AsyncClient() as client:
            # Fetch the IDs of the current top stories.
            ids_response = await client.get(f"{base_url}/topstories.json")
            ids_response.raise_for_status()
            story_ids = ids_response.json() or []

            # Fetch the story details concurrently.
            tasks = [
                client.get(f"{base_url}/item/{story_id}.json")
                for story_id in story_ids[:num_stories]
            ]
            responses = await asyncio.gather(*tasks)

            stories = []
            for response in responses:
                if response.is_error:
                    logger.warning(
                        f"Skipping Hacker News item {response.url}: "
                        f"HTTP {response.status_code}"
                    )
                    continue
                story = response.json()
                # The payload can be null rather than an object; there is
                # nothing to report for such an item.
                if not isinstance(story, dict):
                    continue
                # "by" may be absent, so only alias it when it exists.
                if "by" in story:
                    story["username"] = story["by"]
                stories.append(story)

        return json.dumps(stories)

    async def arun(self, num_stories: int = 5) -> AsyncIterator[RunResponse]:
        """Fetch the top stories, then stream the writer agent's report.

        Args:
            num_stories (int): Number of stories requested from the
                Hacker News agent. Defaults to 5.

        Yields:
            RunResponse: One response per non-empty chunk produced by the
            writer, tagged with this workflow's ``run_id``. If no stories
            could be obtained, a single ``workflow_completed`` response
            carrying an apology message is yielded instead.
        """
        # Attach the tool here rather than in the class body to avoid a
        # circular reference.
        self.hn_agent.tools = [self.get_top_hackernews_stories]

        logger.info(f"正在从 HackerNews 获取 {num_stories} 条热门故事。")
        # NOTE(review): the agent is invoked without a prompt, only the
        # num_stories keyword — confirm Agent.arun forwards it as intended.
        top_stories: RunResponse = await self.hn_agent.arun(num_stories=num_stories)
        if top_stories is None or not top_stories.content:
            yield RunResponse(
                run_id=self.run_id,
                content="抱歉,无法获取热门故事。",
                event=RunEvent.workflow_completed,
            )
            return

        logger.info("正在阅读每篇故事并撰写报告。")
        # With stream=True the awaited call is iterated below as an async stream.
        writer_response = await self.writer.arun(top_stories.content, stream=True)

        # Relay the writer's chunks directly, dropping empty ones.
        async for response in writer_response:
            if response.content:
                yield RunResponse(
                    content=response.content, event=response.event, run_id=self.run_id
                )
if __name__ == "__main__":

    async def main():
        """Run the workflow once and pretty-print the combined report."""
        reporter = AsyncHackerNewsReporter(debug_mode=False)

        # Drain the stream, keeping only the chunks that carry content.
        try:
            chunks = [
                chunk.content
                async for chunk in reporter.arun(num_stories=5)
                if chunk.content
            ]
        except Exception as e:
            logger.error(f"运行工作流时出错: {e}")
            return

        # Nothing was produced, so there is nothing to print.
        if not chunks:
            return

        # Merge the chunks into one final response and pretty-print it.
        pprint_run_response(
            RunResponse(content="".join(chunks), event=RunEvent.workflow_completed),
            markdown=True,
            show_time=True,
        )

    # Drive the async entry point.
    asyncio.run(main())
创建虚拟环境
打开终端(Terminal)并创建一个 Python 虚拟环境。
python3 -m venv .venv
source .venv/bin/activate
安装库
pip install agno newspaper4k lxml_html_clean openai httpx
运行工作流
python async_workflow.py