AsyncHackerNewsReporter 是一个工作流:它异步获取 Hacker News 的热门新闻,并据此生成一份综合报告。该工作流通过 arun 方法处理多个异步任务(例如并发获取各条新闻的详情、以流式方式输出报告),从而以非阻塞的方式运行。

async_workflow.py
import asyncio
import json
from typing import AsyncIterator

import httpx
from agno.agent import Agent, RunResponse
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunEvent, Workflow


class AsyncHackerNewsReporter(Workflow):
    """Workflow that fetches top Hacker News stories and streams a written report.

    Two agents cooperate: ``hn_agent`` fetches the stories through the
    ``get_top_hackernews_stories`` tool (attached in ``arun``), and ``writer``
    turns the fetched stories into a long-form article, with
    ``Newspaper4kTools`` available for reading the linked articles.
    """

    description: str = (
        "获取 Hacker News 的热门新闻并撰写一篇报告。"
    )

    # Agent that retrieves the stories; its tool list is assigned in arun().
    hn_agent: Agent = Agent(
        description="获取 hackernews 的热门新闻。 "
        "分享所有可能的信息,包括 url、得分、标题和摘要(如果可用)。",
        show_tool_calls=True,
    )

    # Agent that reads the linked articles and writes the final report.
    writer: Agent = Agent(
        tools=[Newspaper4kTools()],
        description="就 hacker news 的热门新闻撰写一篇引人入胜的报告。",
        instructions=[
            "你将获得热门新闻及其链接。",
            "仔细阅读每篇文章并思考其内容。",
            "然后生成一篇最终的、值得《纽约时报》发布的文章。",
            "将文章分成几个部分,并在结尾提供要点总结。",
            "确保标题吸引人且引人入胜。",
            "分享每篇文章的得分、标题、URL 和摘要。",
            "为每个部分提供相关的标题,并在每个部分中提供详细信息/事实/过程。",
            "忽略你无法阅读或理解的文章。",
            "请记住:你正在为《纽约时报》写作,因此文章的质量很重要。",
        ],
    )

    # NOTE: arun() registers this method as an hn_agent tool; agno presumably
    # derives the tool description from this docstring, so keep it accurate.
    async def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
        """Use this function to get top stories from Hacker News.

        Args:
            num_stories (int): Number of stories to return. Defaults to 10.

        Returns:
            str: JSON string of the top stories. Items that could not be
            fetched, or came back empty, are left out.
        """
        # A negative count would turn the slice below into story_ids[:-n] and
        # fetch nearly every top story at once, so clamp it to zero.
        num_stories = max(0, num_stories)

        async with httpx.AsyncClient() as client:
            # Fetch the ranked list of top-story ids; fail loudly on HTTP errors
            # instead of trying to decode an error body as the id list.
            response = await client.get(
                "https://hacker-news.firebaseio.com/v0/topstories.json"
            )
            response.raise_for_status()
            story_ids = response.json() or []

            # Fetch the details of the first num_stories items concurrently.
            responses = await asyncio.gather(
                *(
                    client.get(
                        f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
                    )
                    for story_id in story_ids[:num_stories]
                )
            )

            stories = []
            for item_response in responses:
                # Per the HN API docs an item lookup can yield null, and deleted
                # items may omit "by"; skip the former and tolerate the latter.
                story = item_response.json() if item_response.is_success else None
                if not story:
                    logger.warning(f"跳过无法获取的故事: {item_response.url}")
                    continue
                story["username"] = story.get("by")
                stories.append(story)

            return json.dumps(stories)

    async def arun(self, num_stories: int = 5) -> AsyncIterator[RunResponse]:
        """Fetch the top stories, then stream the writer agent's report.

        Args:
            num_stories: How many top stories to request from Hacker News.

        Yields:
            RunResponse: A single apology message (event
            ``workflow_completed``) when the story fetch produced no content;
            otherwise every non-empty chunk streamed by the writer agent,
            re-emitted with this workflow's ``run_id``.
        """
        # Set hn_agent's tools here to avoid a circular reference: the tool is
        # a bound method of this workflow instance.
        self.hn_agent.tools = [self.get_top_hackernews_stories]

        logger.info(f"正在从 HackerNews 获取 {num_stories} 条热门故事。")
        # NOTE(review): num_stories is forwarded as an extra keyword argument
        # to Agent.arun rather than as prompt text; confirm how agno uses it.
        top_stories: RunResponse = await self.hn_agent.arun(num_stories=num_stories)
        if top_stories is None or not top_stories.content:
            yield RunResponse(
                run_id=self.run_id,
                content="抱歉,无法获取热门故事。",
                event=RunEvent.workflow_completed,
            )
            return

        logger.info("正在阅读每篇故事并撰写报告。")
        # With stream=True, awaiting writer.arun() gives an async iterator of
        # response chunks.
        writer_response = await self.writer.arun(top_stories.content, stream=True)

        # Relay the writer's chunks as they arrive, dropping empty ones.
        async for response in writer_response:
            if response.content:
                yield RunResponse(
                    content=response.content, event=response.event, run_id=self.run_id
                )


if __name__ == "__main__":

    async def main() -> None:
        """Run the workflow, collect its streamed chunks, and print the report.

        Errors raised while streaming are logged (with traceback) and end the
        run without printing anything.
        """
        # Initialize the workflow.
        workflow = AsyncHackerNewsReporter(debug_mode=False)

        # Run the workflow and collect every non-empty chunk it yields.
        final_content = []
        try:
            async for response in workflow.arun(num_stories=5):
                if response.content:
                    final_content.append(response.content)
        except Exception as e:
            # Top-level boundary: keep the traceback instead of only str(e).
            logger.exception(f"运行工作流时出错: {e}")
            return

        # Merge the chunks into one response and pretty-print it as markdown.
        if final_content:
            final_response = RunResponse(
                content="".join(final_content), event=RunEvent.workflow_completed
            )
            pprint_run_response(final_response, markdown=True, show_time=True)

    # asyncio is already imported at the top of the file.
    asyncio.run(main())

## Usage

### 1. 创建虚拟环境

打开终端并创建一个 Python 虚拟环境:

```bash
python3 -m venv .venv
source .venv/bin/activate
```
### 2. 安装依赖库

```bash
pip install agno newspaper4k lxml_html_clean openai httpx
```
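### 3. 运行工作流

运行前请先设置模型提供方的 API 密钥。上面的代理没有指定模型,agno 默认使用 OpenAI 模型,例如需要执行 `export OPENAI_API_KEY=***`。然后运行:

```bash
python async_workflow.py
```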