Agno 支持使用 MongoDbStorage 类将 MongoDB 作为工作流的存储后端。

用法

运行 MongoDB

安装 docker desktop,然后使用以下命令在端口 27017 上运行 MongoDB

docker run --name mongodb -d -p 27017:27017 mongodb/mongodb-community-server:latest
mongodb_storage_for_workflow.py
import json
from typing import Iterator

import httpx
from agno.agent import Agent
from agno.run.response import RunResponse
from agno.storage.mongodb import MongoDbStorage
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import Workflow

db_url = "mongodb://localhost:27017"


class HackerNewsReporter(Workflow):
    description: str = (
        "获取 Hacker News 的热门新闻并撰写报告。"
    )

    hn_agent: Agent = Agent(
        description="获取 hacker news 的热门新闻。"
        "提供所有可能的信息,包括 url、分数、标题和摘要(如果可用)。",
        show_tool_calls=True,
    )

    writer: Agent = Agent(
        tools=[Newspaper4kTools()],
        description="撰写一份关于 hacker news 热门新闻的引人入胜的报告。",
        instructions=[
            "你将获得热门新闻及其链接。",
            "仔细阅读每篇文章并思考内容",
            "然后生成一篇最终的《纽约时报》级别的文章",
            "将文章分成几个部分并在结尾提供要点",
            "确保标题吸引人且引人入胜",
            "分享每篇文章的分数、标题、url 和摘要",
            "为每个部分提供相关的标题,并在每个部分中提供详细信息/事实/流程",
            "忽略你无法阅读或理解的文章。",
            "请记住:你是在为《纽约时报》写作,文章质量很重要。",
        ],
    )

    def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
        """使用此函数从 Hacker News 获取热门新闻。

        Args:
            num_stories (int): 要返回的故事数量。默认为 10。

        Returns:
            str: 热门新闻的 JSON 字符串。
        """

        # 获取热门故事 ID
        response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json")
        story_ids = response.json()

        # 获取故事详情
        stories = []
        for story_id in story_ids[:num_stories]:
            story_response = httpx.get(
                f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
            )
            story = story_response.json()
            story["username"] = story["by"]
            stories.append(story)
        return json.dumps(stories)

    def run(self, num_stories: int = 5) -> Iterator[RunResponse]:
        # 在此处设置 hn_agent 的工具以避免循环引用
        self.hn_agent.tools = [self.get_top_hackernews_stories]

        logger.info(f"正在从 HackerNews 获取前 {num_stories} 条新闻。")
        top_stories: RunResponse = self.hn_agent.run(num_stories=num_stories)
        if top_stories is None or not top_stories.content:
            yield RunResponse(
                run_id=self.run_id, content="抱歉,无法获取热门新闻。"
            )
            return

        logger.info("正在阅读每篇新闻并撰写报告。")
        yield from self.writer.run(top_stories.content, stream=True)


if __name__ == "__main__":
    # 运行工作流
    storage = MongoDbStorage(
        collection_name="agent_sessions", db_url=db_url, db_name="agno"
    )
    storage.drop()
    report: Iterator[RunResponse] = HackerNewsReporter(
        storage=storage, debug_mode=False
    ).run(num_stories=5)
    # 打印报告
    pprint_run_response(report, markdown=True, show_time=True)

参数

参数类型默认描述
collection_namestr-用于存储 Workflow 会话的集合名称。
db_urlOptional[str]NoneMongoDB 连接 URL。
db_namestr"agno"数据库名称。
clientOptional[MongoClient]None可选的现有 MongoDB 客户端用于连接。

开发者资源