Agno 支持使用 SingleStoreStorage
类将 Singlestore 作为工作流的存储后端。
从 此处 获取 Singlestore 的凭据。
import json
import os
from os import getenv
from typing import Iterator
import httpx
from agno.agent import Agent
from agno.run.response import RunResponse
from agno.storage.singlestore import SingleStoreStorage
from agno.tools.newspaper4k import Newspaper4kTools
from agno.utils.certs import download_cert
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import Workflow
from sqlalchemy.engine import create_engine
class HackerNewsReporter(Workflow):
description: str = (
"获取 Hacker News 的热门故事并撰写报告。"
)
hn_agent: Agent = Agent(
description="获取 hackernews 的热门故事。 "
"分享所有可能的信息,包括 url、分数、标题以及摘要(如果可用)。",
show_tool_calls=True,
)
writer: Agent = Agent(
tools=[Newspaper4kTools()],
description="撰写一篇引人入胜的 Hacker News 热门故事报告。",
instructions=[
"将提供热门故事及其链接。",
"仔细阅读每篇文章并思考内容",
"然后生成一篇最终的纽约时报级别的文章",
"将文章分成几个部分,并在结尾提供关键要点。",
"确保标题吸引人且有趣。",
"分享每篇文章的分数、标题、URL 和摘要。",
"为各个部分提供相关的标题,并在每个部分中提供细节/事实/流程。",
"忽略无法阅读或理解的文章。",
"请记住:您是在为纽约时报写作,因此文章的质量很重要。",
],
)
def get_top_hackernews_stories(self, num_stories: int = 10) -> str:
"""使用此函数从 Hacker News 获取热门故事。
Args:
num_stories (int): 要返回的故事数量。默认为 10。
Returns:
str: 热门故事的 JSON 字符串。
"""
# 获取热门故事 ID
response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json")
story_ids = response.json()
# 获取故事详情
stories = []
for story_id in story_ids[:num_stories]:
story_response = httpx.get(
f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
)
story = story_response.json()
story["username"] = story["by"]
stories.append(story)
return json.dumps(stories)
def run(self, num_stories: int = 5) -> Iterator[RunResponse]:
# 在此处设置 hn_agent 的工具,以避免循环引用
self.hn_agent.tools = [self.get_top_hackernews_stories]
logger.info(f"正在从 HackerNews 获取 {num_stories} 个热门故事。")
top_stories: RunResponse = self.hn_agent.run(num_stories=num_stories)
if top_stories is None or not top_stories.content:
yield RunResponse(
run_id=self.run_id, content="抱歉,无法获取热门故事。"
)
return
logger.info("正在阅读每个故事并撰写报告。")
yield from self.writer.run(top_stories.content, stream=True)
if __name__ == "__main__":
USERNAME = getenv("SINGLESTORE_USERNAME")
PASSWORD = getenv("SINGLESTORE_PASSWORD")
HOST = getenv("SINGLESTORE_HOST")
PORT = getenv("SINGLESTORE_PORT")
DATABASE = getenv("SINGLESTORE_DATABASE")
SSL_CERT = getenv("SINGLESTORE_SSL_CERT", None)
# 如果未提供 SSL_CERT,则下载证书
if not SSL_CERT:
SSL_CERT = download_cert(
cert_url="https://portal.singlestore.com/static/ca/singlestore_bundle.pem",
filename="singlestore_bundle.pem",
)
if SSL_CERT:
os.environ["SINGLESTORE_SSL_CERT"] = SSL_CERT
# SingleStore 数据库 URL
db_url = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?charset=utf8mb4"
if SSL_CERT:
db_url += f"&ssl_ca={SSL_CERT}&ssl_verify_cert=true"
# 创建数据库引擎
db_engine = create_engine(db_url)
# 运行工作流
report: Iterator[RunResponse] = HackerNewsReporter(
storage=SingleStoreStorage(
table_name="workflow_sessions",
mode="workflow",
db_engine=db_engine,
schema=DATABASE,
),
debug_mode=False,
).run(num_stories=5)
# 打印报告
pprint_run_response(report, markdown=True, show_time=True)
参数 | 类型 | 默认值 | 描述 |
---|---|---|---|
table_name | str | - | 要使用的表名。 |
schema | Optional[str] | "ai" | Schema 名称。 |
db_url | Optional[str] | None | 数据库 URL,如果提供了的话。 |
db_engine | Optional[Engine] | None | 要使用的数据库引擎。 |
schema_version | int | 1 | Schema 的版本号。 |
auto_upgrade_schema | bool | False | 如果为 true ,则在必要时自动升级 schema。 |