Workflows are deterministic, stateful, multi-agent programs built for production applications. They are battle-tested, powerful, and offer the following advantages:

  • Pure Python: Build your workflow logic using standard Python. Having built hundreds of agentic systems, no framework or step-based approach offers the flexibility and reliability of pure Python. Want loops? Use while/for. Want conditionals? Use if/else. Want exception handling? Use try/except (a minimal sketch follows this list).
  • Full control and flexibility: Because your workflow logic is a Python function, you have complete control over the process, for example validating input before processing, spawning agents and running them in parallel, caching results as needed, and correcting any intermediate errors. This level of control is critical for reliability.
  • Built-in storage and caching: Workflows ship with built-in storage and state management. Use session_state to cache intermediate results. A major advantage of this approach is that you can trigger a workflow in a separate process and poll for the result later, which means you avoid the request timeouts that are so common with long-running workflows.
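
The snippet below is a minimal sketch of what that control looks like in practice: input validation with if/else, retries with for and try/except, and streaming the final answer. The agents, prompts, and retry count are illustrative, not a prescribed API; the point is that the orchestration is plain Python.

from typing import Iterator

from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
from agno.workflow import Workflow


class ResearchWorkflow(Workflow):
    # Illustrative agents; any agents you already have will do
    searcher: Agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))
    writer: Agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))

    def run(self, topic: str, max_attempts: int = 3) -> Iterator[RunResponse]:
        # Conditionals: validate the input before doing any work
        if not topic.strip():
            yield RunResponse(run_id=self.run_id, content="Please provide a topic.")
            return

        # Loops + exception handling: retry the search a few times
        search_summary = None
        for attempt in range(max_attempts):
            try:
                response = self.searcher.run(f"Find recent sources on: {topic}")
                if response is not None and response.content:
                    search_summary = response.content
                    break
            except Exception:
                # Plain Python error handling, no framework constructs needed
                continue

        if search_summary is None:
            yield RunResponse(run_id=self.run_id, content="Search failed, please try again later.")
            return

        # Stream the final answer from the writer agent
        yield from self.writer.run(
            f"Write a short brief on {topic} using these notes:\n{search_summary}",
            stream=True,
        )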

Because workflow logic is a Python function, AI code editors can write workflows for you. Just add https://docs.agno.com as a documentation source.

The best part

Nothing new to learn! You already know Python, and you already know how to build Agents and Teams; now just combine them using regular Python code. There is no new DSL or syntax to learn.

Here is a simple workflow that caches its output. Notice how much control you have over the whole process: even the "store state" step happens after the response has been yielded.

simple_cache_workflow.py
from typing import Iterator

from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import Workflow


class CacheWorkflow(Workflow):
    # Purely descriptive; not used by the workflow
    description: str = "A workflow that caches previous outputs"

    # Add agents or teams as attributes on the workflow
    agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))

    # Write the logic in the `run()` method
    def run(self, message: str) -> Iterator[RunResponse]:
        logger.info(f"Checking cache for '{message}'")
        # Check if the output is already cached
        if self.session_state.get(message):
            logger.info(f"Cache hit for '{message}'")
            yield RunResponse(run_id=self.run_id, content=self.session_state.get(message))
            return

        logger.info(f"Cache miss for '{message}'")
        # Run the agent and yield the response
        yield from self.agent.run(message, stream=True)

        # Cache the output after the response has been yielded
        self.session_state[message] = self.agent.run_response.content


if __name__ == "__main__":
    workflow = CacheWorkflow()
    # Run the workflow (this takes ~1 second)
    response: Iterator[RunResponse] = workflow.run(message="Tell me a joke.")
    # Print the response
    pprint_run_response(response, markdown=True, show_time=True)
    # Run the workflow again (this completes instantly thanks to the cache)
    response: Iterator[RunResponse] = workflow.run(message="Tell me a joke.")
    # Print the response
    pprint_run_response(response, markdown=True, show_time=True)
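
Without a storage backend attached, the session_state above lives only in memory for the current workflow instance. To make the cache survive separate runs of the script, attach storage and a fixed session_id, just like the full example later on this page. A minimal sketch, reusing the CacheWorkflow and imports above (the session_id, table name, and db file are placeholders):

from agno.storage.sqlite import SqliteStorage

workflow = CacheWorkflow(
    # Reuse the same session_id to reuse the same cached session_state
    session_id="cache-workflow-jokes",
    storage=SqliteStorage(
        table_name="cache_workflows",
        db_file="tmp/agno_workflows.db",
    ),
)
response = workflow.run(message="Tell me a joke.")
pprint_run_response(response, markdown=True, show_time=True)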

How to build a workflow

  1. Define your workflow class by inheriting from the Workflow class.
  2. Add agents or teams as attributes on the workflow. This is not strictly required; it simply helps us map the agents' session IDs to the workflow's session ID.
  3. Implement your workflow logic in the run() method. This is the main function that is called when the workflow runs (the workflow entrypoint). It gives us full control over the whole process: some agents can stream, others can produce structured output, agents can run in parallel using asyncio.gather(), and some agents can run validation logic before returning a response.

You can also execute a workflow asynchronously using the arun method, which allows for more efficient, non-blocking operation when calling agents. See the Async Workflows examples for detailed examples, and the sketch below for the general shape.
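
Here is a minimal sketch of that pattern, loosely modeled on the Async Workflows examples: two research agents run concurrently with asyncio.gather(), and a writer combines their output. The class, agent names, and prompts are illustrative assumptions, not a prescribed API.

import asyncio
from typing import AsyncIterator

from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
from agno.workflow import Workflow


class ParallelResearchWorkflow(Workflow):
    # Illustrative agents, used only to show the async pattern
    tech_researcher: Agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))
    market_researcher: Agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))
    writer: Agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))

    async def arun(self, topic: str) -> AsyncIterator[RunResponse]:
        # Run both research agents concurrently with asyncio.gather()
        tech_response, market_response = await asyncio.gather(
            self.tech_researcher.arun(f"Give a technical overview of: {topic}"),
            self.market_researcher.arun(f"Give a market overview of: {topic}"),
        )

        # Hand both results to the writer and yield its response
        writer_response = await self.writer.arun(
            f"Write a short brief on {topic}.\n\n"
            f"Technical notes:\n{tech_response.content}\n\n"
            f"Market notes:\n{market_response.content}"
        )
        yield RunResponse(run_id=self.run_id, content=writer_response.content)


async def main():
    workflow = ParallelResearchWorkflow()
    async for response in workflow.arun(topic="AI agent frameworks"):
        print(response.content)


if __name__ == "__main__":
    asyncio.run(main())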

Full example: blog post generator

Let's create a blog post generator that can search the web, read the top links, and write a blog post for us. We'll cache intermediate results in the database to improve performance.

Create the workflow

  1. Define your workflow class by inheriting from the Workflow class.
blog_post_generator.py
from agno.workflow import Workflow

class BlogPostGenerator(Workflow):
    pass
  2. Add one or more agents to the workflow and implement your workflow logic in the run() method.
blog_post_generator.py
import json
from textwrap import dedent
from typing import Dict, Iterator, Optional

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.sqlite import SqliteStorage
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunEvent, RunResponse, Workflow
from pydantic import BaseModel, Field


class NewsArticle(BaseModel):
    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    summary: Optional[str] = Field(
        ..., description="Summary of the article if available."
    )


class SearchResults(BaseModel):
    articles: list[NewsArticle]


class ScrapedArticle(BaseModel):
    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    summary: Optional[str] = Field(
        ..., description="Summary of the article if available."
    )
    content: Optional[str] = Field(
        ...,
        description="Full article content in markdown format. None if content is unavailable.",
    )


class BlogPostGenerator(Workflow):
    """Advanced workflow for generating professional blog posts with proper research and citations."""

    description: str = dedent("""\
    An intelligent blog post generator that creates engaging, well-researched content.
    This workflow orchestrates multiple AI agents to research, analyze, and craft
    compelling blog posts that combine journalistic rigor with engaging storytelling.
    The system excels at creating content that is both informative and optimized for
    digital consumption.
    """)

    # Search Agent: Handles intelligent web searching and source gathering
    searcher: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[DuckDuckGoTools()],
        description=dedent("""\
        You are BlogResearch-X, an elite research assistant specializing in discovering
        high-quality sources for compelling blog content. Your expertise includes:

        - Finding authoritative and trending sources
        - Evaluating content credibility and relevance
        - Identifying diverse perspectives and expert opinions
        - Discovering unique angles and insights
        - Ensuring comprehensive topic coverage\
        """),
        instructions=dedent("""\
        1. Search Strategy 🔍
           - Find 10-15 relevant sources and select the 5-7 best ones
           - Prioritize recent, authoritative content
           - Look for unique angles and expert insights
        2. Source Evaluation 📊
           - Verify source credibility and expertise
           - Check publication dates for timeliness
           - Assess content depth and uniqueness
        3. Diversity of Perspectives 🌐
           - Include different viewpoints
           - Gather both mainstream and expert opinions
           - Find supporting data and statistics\
        """),
        response_model=SearchResults,
    )

    # Content Scraper: Extracts and processes article content
    article_scraper: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[Newspaper4kTools()],
        description=dedent("""\
        You are ContentBot-X, a specialist in extracting and processing digital content
        for blog creation. Your expertise includes:

        - Efficient content extraction
        - Smart formatting and structuring
        - Key information identification
        - Quote and statistic preservation
        - Maintaining source attribution\
        """),
        instructions=dedent("""\
        1. Content Extraction 📑
           - Extract content from the article
           - Preserve important quotes and statistics
           - Maintain proper attribution
           - Handle paywalls gracefully
        2. Content Processing 🔄
           - Format text in clean markdown
           - Preserve key information
           - Structure content logically
        3. Quality Control ✅
           - Verify content relevance
           - Ensure accurate extraction
           - Maintain readability\
        """),
        response_model=ScrapedArticle,
        structured_outputs=True,
    )

    # Content Writer Agent: Crafts engaging blog posts from research
    writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        description=dedent("""\
        You are BlogMaster-X, an elite content creator combining journalistic excellence
        with digital marketing expertise. Your strengths include:

        - Crafting viral-worthy headlines
        - Writing engaging introductions
        - Structuring content for digital consumption
        - Incorporating research seamlessly
        - Optimizing for SEO while maintaining quality
        - Creating shareable conclusions\
        """),
        instructions=dedent("""\
        1. Content Strategy 📝
           - Craft attention-grabbing headlines
           - Write compelling introductions
           - Structure content for engagement
           - Include relevant subheadings
        2. Writing Excellence ✍️
           - Balance expertise with accessibility
           - Use clear, engaging language
           - Include relevant examples
           - Incorporate statistics naturally
        3. Source Integration 🔍
           - Cite sources properly
           - Include expert quotes
           - Maintain factual accuracy
        4. Digital Optimization 💻
           - Structure for scanability
           - Include shareable takeaways
           - Optimize for SEO
           - Add engaging subheadings\
        """),
        expected_output=dedent("""\
        # {Viral-Worthy Headline}

        ## Introduction
        {Engaging hook and context}

        ## {Compelling Section 1}
        {Key insights and analysis}
        {Expert quotes and statistics}

        ## {Engaging Section 2}
        {Deeper exploration}
        {Real-world examples}

        ## {Practical Section 3}
        {Actionable insights}
        {Expert recommendations}

        ## Key Takeaways
        - {Shareable insight 1}
        - {Practical takeaway 2}
        - {Notable finding 3}

        ## Sources
        {Properly attributed sources with links}\
        """),
        markdown=True,
    )

    def run(
        self,
        topic: str,
        use_search_cache: bool = True,
        use_scrape_cache: bool = True,
        use_cached_report: bool = True,
    ) -> Iterator[RunResponse]:
        logger.info(f"Generating a blog post on: {topic}")

        # Use the cached blog post if use_cached_report is True
        if use_cached_report:
            cached_blog_post = self.get_cached_blog_post(topic)
            if cached_blog_post:
                yield RunResponse(
                    content=cached_blog_post, event=RunEvent.workflow_completed
                )
                return

        # Search the web for articles on the topic
        search_results: Optional[SearchResults] = self.get_search_results(
            topic, use_search_cache
        )
        # If no search_results are found for the topic, end the workflow
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not find any articles on the topic: {topic}",
            )
            return

        # Scrape the search results
        scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(
            topic, search_results, use_scrape_cache
        )

        # Prepare the input for the writer
        writer_input = {
            "topic": topic,
            "articles": [v.model_dump() for v in scraped_articles.values()],
        }

        # Run the writer and yield the response
        yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)

        # Save the blog post in the cache
        self.add_blog_post_to_cache(topic, self.writer.run_response.content)

    def get_cached_blog_post(self, topic: str) -> Optional[str]:
        logger.info("Checking if cached blog post exists")

        return self.session_state.get("blog_posts", {}).get(topic)

    def add_blog_post_to_cache(self, topic: str, blog_post: str):
        logger.info(f"Saving blog post for topic: {topic}")
        self.session_state.setdefault("blog_posts", {})
        self.session_state["blog_posts"][topic] = blog_post

    def get_cached_search_results(self, topic: str) -> Optional[SearchResults]:
        logger.info("Checking if cached search results exist")
        search_results = self.session_state.get("search_results", {}).get(topic)
        return (
            SearchResults.model_validate(search_results)
            if search_results and isinstance(search_results, dict)
            else search_results
        )

    def add_search_results_to_cache(self, topic: str, search_results: SearchResults):
        logger.info(f"Saving search results for topic: {topic}")
        self.session_state.setdefault("search_results", {})
        self.session_state["search_results"][topic] = search_results

    def get_cached_scraped_articles(
        self, topic: str
    ) -> Optional[Dict[str, ScrapedArticle]]:
        logger.info("Checking if cached scraped articles exist")
        scraped_articles = self.session_state.get("scraped_articles", {}).get(topic)
        # Validate each cached entry back into a ScrapedArticle
        # (entries may be plain dicts after a storage round-trip)
        return (
            {
                url: ScrapedArticle.model_validate(article)
                for url, article in scraped_articles.items()
            }
            if scraped_articles and isinstance(scraped_articles, dict)
            else scraped_articles
        )

    def add_scraped_articles_to_cache(
        self, topic: str, scraped_articles: Dict[str, ScrapedArticle]
    ):
        logger.info(f"Saving scraped articles for topic: {topic}")
        self.session_state.setdefault("scraped_articles", {})
        self.session_state["scraped_articles"][topic] = scraped_articles

    def get_search_results(
        self, topic: str, use_search_cache: bool, num_attempts: int = 3
    ) -> Optional[SearchResults]:
        # Get cached search_results from the session state if use_search_cache is True
        if use_search_cache:
            try:
                search_results_from_cache = self.get_cached_search_results(topic)
                if search_results_from_cache is not None:
                    search_results = SearchResults.model_validate(
                        search_results_from_cache
                    )
                    logger.info(
                        f"Found {len(search_results.articles)} articles in cache."
                    )
                    return search_results
            except Exception as e:
                logger.warning(f"Could not read search results from cache: {e}")

        # If there are no cached search_results, use the searcher to find the latest articles
        for attempt in range(num_attempts):
            try:
                searcher_response: RunResponse = self.searcher.run(topic)
                if (
                    searcher_response is not None
                    and searcher_response.content is not None
                    and isinstance(searcher_response.content, SearchResults)
                ):
                    article_count = len(searcher_response.content.articles)
                    logger.info(
                        f"Found {article_count} articles on attempt {attempt + 1}"
                    )
                    # Cache the search results
                    self.add_search_results_to_cache(topic, searcher_response.content)
                    return searcher_response.content
                else:
                    logger.warning(
                        f"Attempt {attempt + 1}/{num_attempts} failed: Invalid response type"
                    )
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")

        logger.error(f"Failed to get search results after {num_attempts} attempts")
        return None

    def scrape_articles(
        self, topic: str, search_results: SearchResults, use_scrape_cache: bool
    ) -> Dict[str, ScrapedArticle]:
        scraped_articles: Dict[str, ScrapedArticle] = {}

        # Get cached scraped_articles from the session state if use_scrape_cache is True
        if use_scrape_cache:
            try:
                scraped_articles_from_cache = self.get_cached_scraped_articles(topic)
                if scraped_articles_from_cache is not None:
                    scraped_articles = scraped_articles_from_cache
                    logger.info(
                        f"Found {len(scraped_articles)} scraped articles in cache."
                    )
                    return scraped_articles
            except Exception as e:
                logger.warning(f"Could not read scraped articles from cache: {e}")

        # Scrape the articles that are not in the cache
        for article in search_results.articles:
            if article.url in scraped_articles:
                logger.info(f"Found scraped article in cache: {article.url}")
                continue

            article_scraper_response: RunResponse = self.article_scraper.run(
                article.url
            )
            if (
                article_scraper_response is not None
                and article_scraper_response.content is not None
                and isinstance(article_scraper_response.content, ScrapedArticle)
            ):
                scraped_articles[article_scraper_response.content.url] = (
                    article_scraper_response.content
                )
                logger.info(f"Scraped article: {article_scraper_response.content.url}")

        # Save the scraped articles in the session state
        self.add_scraped_articles_to_cache(topic, scraped_articles)
        return scraped_articles


# Run the workflow if the script is executed directly
if __name__ == "__main__":
    import random

    from rich.prompt import Prompt

    # Fun example prompts to showcase the generator's versatility
    example_prompts = [
        "Why Cats Secretly Run the Internet",
        "The Science Behind Why Pizza Tastes Better at 2 AM",
        "Time Travelers' Guide to Modern Social Media",
        "How Rubber Ducks Revolutionized Software Development",
        "The Secret Society of Office Plants: A Survival Guide",
        "Why Dogs Think We're Bad at Smelling Things",
        "The Underground Economy of Coffee Shop WiFi Passwords",
        "A Historical Analysis of Dad Jokes Through the Ages",
    ]

    # Get the topic from the user
    topic = Prompt.ask(
        "[bold]Enter a blog post topic[/bold] (or press Enter for a random example)\n✨",
        default=random.choice(example_prompts),
    )

    # Convert the topic to a URL-safe string for use as the session_id
    url_safe_topic = topic.lower().replace(" ", "-")

    # Initialize the blog post generator workflow
    # - Creates a unique session ID based on the topic
    # - Sets up SQLite storage for caching results
    generate_blog_post = BlogPostGenerator(
        session_id=f"generate-blog-post-on-{url_safe_topic}",
        storage=SqliteStorage(
            table_name="generate_blog_post_workflows",
            db_file="tmp/agno_workflows.db",
        ),
        debug_mode=True,
    )

    # Execute the workflow with caching enabled
    # Returns an iterator of RunResponse objects containing the generated content
    blog_post: Iterator[RunResponse] = generate_blog_post.run(
        topic=topic,
        use_search_cache=True,
        use_scrape_cache=True,
        use_cached_report=True,
    )

    # Print the response
    pprint_run_response(blog_post, markdown=True)

Run the workflow

Install libraries

pip install agno openai duckduckgo-search sqlalchemy

Run the workflow

python blog_post_generator.py

The results are now cached in the database and can be reused on future runs. Run the workflow again to see the cached results.

python blog_post_generator.py

Check out more use-case examples of workflows.

Design decisions

Why do we recommend writing your workflow logic as a Python function instead of creating a custom abstraction such as a Graph, Chain, or Flow?

In our experience building AI products, workflow logic needs to be dynamic (i.e., determined at runtime) and requires fine-grained control over parallelization, caching, state management, error handling, and issue resolution.

A custom abstraction (Graph, Chain, Flow) plus a new DSL means learning new concepts and writing more code. We end up spending more time learning and fighting the DSL.

In every project we have worked on, a simple Python function has always done the job. We have also found that complex workflows can span multiple files and sometimes grow into a module of their own. What handles that well? Python.

We keep coming back to the Unix philosophy.

If our workflow cannot be written in standard Python, we should simplify and restructure the workflow, not the other way around.

Another major challenge with long-running workflows is managing request/response timeouts. We need workflows to be triggered asynchronously, acknowledge the start and return a response to the client, and then let the client poll for results later. Delivering this user experience requires running the workflow in a background task and managing state closely so the client can always get the latest update. The sketch below illustrates the shape of this pattern.
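
As an illustration, here is a minimal sketch of the trigger-then-poll pattern using FastAPI background tasks and an in-memory result store. It assumes the BlogPostGenerator class from the example above is importable; the routes, the RESULTS dict, and the way results are collected are illustrative assumptions rather than a prescribed Agno API, and the storage configuration from the example is omitted for brevity.

import uuid

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

# run_id -> {"status": ..., "content": ...}; swap for a real store (Redis, Postgres) in production
RESULTS: dict[str, dict] = {}


def run_blog_post_workflow(run_id: str, topic: str) -> None:
    RESULTS[run_id] = {"status": "running", "content": None}
    try:
        workflow = BlogPostGenerator(
            session_id=f"generate-blog-post-on-{topic.lower().replace(' ', '-')}",
        )
        # Consume the iterator and concatenate the streamed chunks into the final post
        content_parts = [r.content for r in workflow.run(topic=topic) if r.content]
        RESULTS[run_id] = {"status": "completed", "content": "".join(content_parts)}
    except Exception as e:
        RESULTS[run_id] = {"status": "failed", "content": str(e)}


@app.post("/blog-posts")
def start_blog_post(topic: str, background_tasks: BackgroundTasks):
    # Acknowledge immediately; the long-running work happens in the background
    run_id = str(uuid.uuid4())
    RESULTS[run_id] = {"status": "pending", "content": None}
    background_tasks.add_task(run_blog_post_workflow, run_id, topic)
    return {"run_id": run_id, "status": "pending"}


@app.get("/blog-posts/{run_id}")
def poll_blog_post(run_id: str):
    # Clients poll this endpoint until status is "completed" or "failed"
    return RESULTS.get(run_id, {"status": "unknown", "content": None})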

For these reasons, we recommend building workflows as standard Python functions; the control, flexibility, and reliability they provide are unmatched.