本示例展示了如何构建一个复杂的研究工作流程,该流程结合了: 🔍 用于查找相关资源的网络搜索能力 📚 内容提取和处理 ✍️ 学术风格报告生成 💾 智能缓存以提高性能
我们使用了以下免费工具:
您可以尝试以下示例研究主题:
import json
from textwrap import dedent
from typing import Dict, Iterator, Optional
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.workflow.sqlite import SqliteWorkflowStorage
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunEvent, RunResponse, Workflow
from pydantic import BaseModel, Field
class Article(BaseModel):
title: str = Field(..., description="文章的标题。")
url: str = Field(..., description="文章的链接。")
summary: Optional[str] = Field(
..., description="文章的摘要(如果可用)。"
)
class SearchResults(BaseModel):
articles: list[Article]
class ScrapedArticle(BaseModel):
title: str = Field(..., description="文章的标题。")
url: str = Field(..., description="文章的链接。")
summary: Optional[str] = Field(
..., description="文章的摘要(如果可用)。"
)
content: Optional[str] = Field(
...,
description="文章的内容(如果可用,以 Markdown 格式)。如果内容不可用或无意义,则返回 None。",
)
class ResearchReportGenerator(Workflow):
description: str = dedent("""\
生成全面的研究报告,结合学术严谨性和引人入胜的叙事风格。
此工作流程协同多个 AI 代理来搜索、分析和综合来自不同来源的信息,并将其整合成结构良好的报告。
""")
web_searcher: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGoTools()],
description=dedent("""\
您是 ResearchBot-X,是发现和评估学术和科学来源的专家。\
"""),
instructions=dedent("""\
你是一个严谨的研究助理,精通来源评估!🔍
搜索 10-15 个资源,并确定其中 5-7 个最权威、最相关的资源。
优先考虑:
- 同行评审的文章和学术出版物
- 来自知名机构的最新进展
- 权威新闻来源和专家评论
- 来自公认专家的不同观点
避免观点性文章和非权威来源。\
"""),
response_model=SearchResults,
)
article_scraper: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[Newspaper4kTools()],
description=dedent("""\
您是 ContentBot-X,是提取和构建学术内容的专家。\
"""),
instructions=dedent("""\
你是一位注重学术细节的精准内容策展人!📚
处理内容时:
- 从文章中提取内容
- 保留学术引文和参考文献
- 在术语上保持技术准确性
- 以清晰的章节逻辑组织内容
- 提取关键发现和方法细节
- 优雅地处理付费墙内容
将所有内容格式化为干净的 Markdown,以获得最佳可读性。\
"""),
response_model=ScrapedArticle,
)
writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
description=dedent("""\
您是 Professor X-2000,一位杰出的 AI 研究科学家,结合了学术严谨性和引人入胜的叙事风格。\
"""),
instructions=dedent("""\
发挥世界级学术研究者的专业知识!
🎯 分析阶段:
- 评估来源的可信度和相关性
- 交叉引用来源间的发现
- 识别关键主题和突破
💡 综合阶段:
- 开发连贯的叙事框架
- 连接分散的发现
- 突出矛盾或差距
✍️ 写作阶段:
- 从引人入胜的执行摘要开始,吸引读者
- 清晰地呈现复杂思想
- 支持所有论点并附带引用
- 平衡深度与可读性
- 在保持学术语调的同时确保易读性
- 以影响和未来方向结尾。\
"""),
expected_output=dedent("""\
# {引人入胜的学术标题}
## 执行摘要
{关键发现和意义的简洁概述}
## 引言
{研究背景和上下文}
{该领域的当前状况}
## 方法
{搜索和分析方法}
{来源评估标准}
## 主要发现
{主要发现和进展}
{支持证据和分析}
{对比观点}
## 分析
{对发现的批判性评估}
{整合多方面观点}
{模式和趋势识别}
## 影响
{学术和实践意义}
{未来研究方向}
{潜在应用}
## 要点总结
- {关键发现 1}
- {关键发现 2}
- {关键发现 3}
## 参考文献
{格式正确的学术引用}
---
本报告由 Professor X-2000 生成
高级研究部
日期:{当前日期}\
"""),
markdown=True,
)
def run(
self,
topic: str,
use_search_cache: bool = True,
use_scrape_cache: bool = True,
use_cached_report: bool = True,
) -> Iterator[RunResponse]:
"""
针对给定主题生成全面的新闻报告。
此函数协调一个工作流程来搜索文章、抓取其内容并生成最终报告。
它利用缓存机制来优化性能。
Args:
topic (str): 要生成新闻报告的主题。
use_search_cache (bool, optional): 是否使用缓存的搜索结果。默认为 True。
use_scrape_cache (bool, optional): 是否使用缓存的文章抓取结果。默认为 True。
use_cached_report (bool, optional): 是否返回之前已生成的同一主题的报告。默认为 False。
Returns:
Iterator[RunResponse]: 一个包含生成的报告或状态信息的对象流。
步骤:
1. 如果 use_cached_report 为 True,则检查缓存报告。
2. 搜索关于该主题的文章:
- 如果缓存的搜索结果可用且 use_search_cache 为 True,则使用它们。
- 否则,执行新的网络搜索。
3. 抓取每篇文章的内容:
- 如果缓存的文章抓取结果可用且 use_scrape_cache 为 True,则使用它们。
- 抓取不在缓存中的新文章。
4. 使用抓取到的文章内容生成最终报告。
该函数利用 `session_state` 来存储和检索缓存数据。
"""
logger.info(f"正在生成关于以下主题的报告:{topic}")
# 如果 use_cached_report 为 True,则使用缓存的报告
if use_cached_report:
cached_report = self.get_cached_report(topic)
if cached_report:
yield RunResponse(
content=cached_report, event=RunEvent.workflow_completed
)
return
# 搜索关于该主题的文章
search_results: Optional[SearchResults] = self.get_search_results(
topic, use_search_cache
)
# 如果未找到该主题的 search_results,则结束工作流程
if search_results is None or len(search_results.articles) == 0:
yield RunResponse(
event=RunEvent.workflow_completed,
content=f"抱歉,未能找到关于主题 '{topic}' 的任何文章",
)
return
# 抓取搜索结果
scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(
search_results, use_scrape_cache
)
# 撰写研究报告
yield from self.write_research_report(topic, scraped_articles)
def get_cached_report(self, topic: str) -> Optional[str]:
logger.info("正在检查是否存在缓存的报告")
return self.session_state.get("reports", {}).get(topic)
def add_report_to_cache(self, topic: str, report: str):
logger.info(f"正在为主题保存报告:{topic}")
self.session_state.setdefault("reports", {})
self.session_state["reports"][topic] = report
# 将报告保存到存储
self.write_to_storage()
def get_cached_search_results(self, topic: str) -> Optional[SearchResults]:
logger.info("正在检查是否存在缓存的搜索结果")
return self.session_state.get("search_results", {}).get(topic)
def add_search_results_to_cache(self, topic: str, search_results: SearchResults):
logger.info(f"正在为主题保存搜索结果:{topic}")
self.session_state.setdefault("search_results", {})
self.session_state["search_results"][topic] = search_results.model_dump()
# 将搜索结果保存到存储
self.write_to_storage()
def get_cached_scraped_articles(
self, topic: str
) -> Optional[Dict[str, ScrapedArticle]]:
logger.info("正在检查是否存在缓存的文章抓取结果")
return self.session_state.get("scraped_articles", {}).get(topic)
def add_scraped_articles_to_cache(
self, topic: str, scraped_articles: Dict[str, ScrapedArticle]
):
logger.info(f"正在为主题保存文章抓取结果:{topic}")
self.session_state.setdefault("scraped_articles", {})
self.session_state["scraped_articles"][topic] = scraped_articles
# 将文章抓取结果保存到存储
self.write_to_storage()
def get_search_results(
self, topic: str, use_search_cache: bool, num_attempts: int = 3
) -> Optional[SearchResults]:
# 如果 use_search_cache 为 True,则从会话状态获取缓存的 search_results
if use_search_cache:
try:
search_results_from_cache = self.get_cached_search_results(topic)
if search_results_from_cache is not None:
search_results = SearchResults.model_validate(
search_results_from_cache
)
logger.info(
f"从缓存中找到 {len(search_results.articles)} 篇文章。"
)
return search_results
except Exception as e:
logger.warning(f"无法从缓存读取搜索结果:{e}")
# 如果没有缓存的 search_results,则使用 web_searcher 查找最新文章
for attempt in range(num_attempts):
try:
searcher_response: RunResponse = self.web_searcher.run(topic)
if (
searcher_response is not None
and searcher_response.content is not None
and isinstance(searcher_response.content, SearchResults)
):
article_count = len(searcher_response.content.articles)
logger.info(
f"第 {attempt + 1} 次尝试找到 {article_count} 篇文章"
)
# 缓存搜索结果
self.add_search_results_to_cache(topic, searcher_response.content)
return searcher_response.content
else:
logger.warning(
f"第 {attempt + 1}/{num_attempts} 次尝试失败:响应类型无效"
)
except Exception as e:
logger.warning(f"第 {attempt + 1}/{num_attempts} 次尝试失败:{str(e)}")
logger.error(f"在 {num_attempts} 次尝试后未能获取搜索结果")
return None
def scrape_articles(
self, search_results: SearchResults, use_scrape_cache: bool
) -> Dict[str, ScrapedArticle]:
scraped_articles: Dict[str, ScrapedArticle] = {}
# 如果 use_scrape_cache 为 True,则从会话状态获取缓存的 scraped_articles
if use_scrape_cache:
try:
scraped_articles_from_cache = self.get_cached_scraped_articles(topic)
if scraped_articles_from_cache is not None:
scraped_articles = scraped_articles_from_cache
logger.info(
f"从缓存中找到 {len(scraped_articles)} 篇抓取文章。"
)
return scraped_articles
except Exception as e:
logger.warning(f"无法从缓存读取抓取文章:{e}")
# 抓取不在缓存中的文章
for article in search_results.articles:
if article.url in scraped_articles:
logger.info(f"在缓存中找到抓取文章:{article.url}")
continue
article_scraper_response: RunResponse = self.article_scraper.run(
article.url
)
if (
article_scraper_response is not None
and article_scraper_response.content is not None
and isinstance(article_scraper_response.content, ScrapedArticle)
):
scraped_articles[article_scraper_response.content.url] = (
article_scraper_response.content
)
logger.info(f"抓取文章:{article_scraper_response.content.url}")
# 将抓取文章保存到会话状态
self.add_scraped_articles_to_cache(topic, scraped_articles)
return scraped_articles
def write_research_report(
self, topic: str, scraped_articles: Dict[str, ScrapedArticle]
) -> Iterator[RunResponse]:
logger.info("正在撰写研究报告")
# 准备写入器的输入
writer_input = {
"topic": topic,
"articles": [v.model_dump() for v in scraped_articles.values()],
}
# 运行写入器并生成响应
yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
# 将研究报告保存到缓存
self.add_report_to_cache(topic, self.writer.run_response.content)
# 如果脚本直接执行,则运行工作流程
if __name__ == "__main__":
from rich.prompt import Prompt
# 示例研究主题
example_topics = [
"2024年量子计算突破",
"人工智能意识研究",
"聚变能源发展",
"太空旅游对环境的影响",
"寿命研究进展",
]
topics_str = "\n".join(
f"{i + 1}. {topic}" for i, topic in enumerate(example_topics)
)
print(f"\n📚 示例研究主题:\n{topics_str}\n")
# 从用户那里获取主题
topic = Prompt.ask(
"[bold]请输入研究主题[/bold]\n✨",
default="2024年量子计算突破",
)
# 将主题转换为 URL 安全字符串,用于 session_id
url_safe_topic = topic.lower().replace(" ", "-")
# 初始化新闻报告生成工作流程
generate_research_report = ResearchReportGenerator(
session_id=f"generate-report-on-{url_safe_topic}",
storage=SqliteWorkflowStorage(
table_name="generate_research_report_workflow",
db_file="tmp/workflows.db",
),
)
# 执行启用缓存的工作流程
report_stream: Iterator[RunResponse] = generate_research_report.run(
topic=topic,
use_search_cache=True,
use_scrape_cache=True,
use_cached_report=True,
)
# 打印响应
pprint_run_response(report_stream, markdown=True)
创建虚拟环境
打开 Terminal
并创建一个 python 虚拟环境。
python3 -m venv .venv
source .venv/bin/activate
安装库
pip install openai duckduckgo-search newspaper4k lxml_html_clean sqlalchemy agno
运行工作流程
python research_workflow.py