ProductManager 从会议记录中生成任务,在 Linear 中创建相应的议题,并通过 Slack 通知团队任务详情。

创建一个名为 product_manager.py 的文件,并写入以下代码:

product_manager.py
import os
from datetime import datetime
from typing import Dict, List, Optional

from agno.agent.agent import Agent
from agno.run.response import RunEvent, RunResponse
from agno.storage.postgres import PostgresStorage
from agno.tools.linear import LinearTools
from agno.tools.slack import SlackTools
from agno.utils.log import logger
from agno.workflow.workflow import Workflow
from pydantic import BaseModel, Field


class Task(BaseModel):
    # One action item extracted from meeting notes: a required title plus an
    # optional description and assignee. Documented with comments (not a
    # docstring) so the model's generated schema stays exactly as before.
    task_title: str = Field(default=..., description="The title of the task")
    task_description: Optional[str] = Field(
        default=None,
        description="The description of the task",
    )
    task_assignee: Optional[str] = Field(
        default=None,
        description="The assignee of the task",
    )


class LinearIssue(BaseModel):
    # One issue as reported back by the Linear agent: a required title plus
    # optional description, assignee and link. Documented with comments (not a
    # docstring) so the model's generated schema stays exactly as before.
    issue_title: str = Field(default=..., description="The title of the issue")
    issue_description: Optional[str] = Field(
        default=None,
        description="The description of the issue",
    )
    issue_assignee: Optional[str] = Field(
        default=None,
        description="The assignee of the issue",
    )
    issue_link: Optional[str] = Field(
        default=None,
        description="The link to the issue",
    )


class LinearIssueList(BaseModel):
    # Container used as the Linear agent's structured response: a required
    # list of LinearIssue entries.
    issues: List[LinearIssue] = Field(
        default=...,
        description="A list of issues",
    )


class TaskList(BaseModel):
    # Container used as the task agent's structured response: a required
    # list of Task entries.
    tasks: List[Task] = Field(
        default=...,
        description="A list of tasks",
    )


class ProductManagerWorkflow(Workflow):
    """Turn meeting notes into Linear issues and announce them on Slack.

    Three agents are chained: ``task_agent`` extracts a ``TaskList`` from the
    notes, ``linear_agent`` creates issues for those tasks, and ``slack_agent``
    posts the created issues. Extracted tasks are remembered per date in
    ``session_state["meeting_notes"]`` as JSON strings.
    """

    description: str = "Generate linear tasks and send slack notifications to the team from meeting notes."

    task_agent: Agent = Agent(
        name="Task Agent",
        instructions=[
            "Given a meeting note, generate a list of tasks with titles, descriptions and assignees."
        ],
        response_model=TaskList,
    )

    linear_agent: Agent = Agent(
        name="Linear Agent",
        instructions=["Given a list of tasks, create issues in Linear."],
        tools=[LinearTools()],
        response_model=LinearIssueList,
    )

    slack_agent: Agent = Agent(
        name="Slack Agent",
        instructions=[
            "Send a slack notification to the #test channel with a heading (bold text) including the current date and tasks in the following format: ",
            "*Title*: <issue_title>",
            "*Description*: <issue_description>",
            "*Assignee*: <issue_assignee>",
            "*Issue Link*: <issue_link>",
        ],
        tools=[SlackTools()],
    )

    def get_tasks_from_cache(self, current_date: str) -> Optional[TaskList]:
        """Return the tasks cached for ``current_date``, or ``None``.

        ``run`` stores tasks as a JSON string, so the cached value is parsed
        back into a ``TaskList`` here instead of being returned raw. Entries
        that fail validation are logged and skipped.

        Args:
            current_date: Date key in ``%Y-%m-%d`` format, as written by ``run``.

        Returns:
            The first valid cached ``TaskList`` for that date, else ``None``.
        """
        for cached_tasks in self.session_state.get("meeting_notes") or []:
            if cached_tasks["date"] != current_date:
                continue
            raw_tasks = cached_tasks["tasks"]
            if isinstance(raw_tasks, TaskList):
                return raw_tasks
            try:
                if isinstance(raw_tasks, str):
                    return TaskList.model_validate_json(raw_tasks)
                return TaskList.model_validate(raw_tasks)
            except ValueError as e:
                # pydantic's ValidationError derives from ValueError.
                logger.warning(f"Ignoring invalid cached tasks for {current_date}: {e}")
        return None

    def get_tasks_from_meeting_notes(self, meeting_notes: str) -> Optional[TaskList]:
        """Ask the task agent to extract tasks, retrying up to three times.

        Args:
            meeting_notes: Raw meeting notes passed to ``task_agent``.

        Returns:
            The first response content that is a ``TaskList``, or ``None`` if
            every attempt raised or produced a different kind of content.
        """
        for _ in range(3):
            try:
                response: RunResponse = self.task_agent.run(meeting_notes)
            except Exception as e:
                # Best effort: a failed attempt is logged and retried.
                logger.warning(f"Error generating tasks: {e}")
                continue
            if (
                response
                and response.content
                and isinstance(response.content, TaskList)
            ):
                return response.content
            logger.warning("Invalid response from task agent, trying again...")
        return None

    def create_linear_issues(
        self, tasks: TaskList, linear_users: Dict[str, str]
    ) -> Optional[LinearIssueList]:
        """Have the Linear agent create one issue per task.

        Args:
            tasks: Tasks to turn into issues.
            linear_users: Mapping of user name to Linear user uuid, passed to
                the agent inside the prompt.

        Returns:
            The agent's ``LinearIssueList``, or ``None`` when the agent gave no
            response or its content is not a ``LinearIssueList``.

        Raises:
            Exception: If ``LINEAR_PROJECT_ID`` or ``LINEAR_TEAM_ID`` is unset.
        """
        project_id = os.getenv("LINEAR_PROJECT_ID")
        team_id = os.getenv("LINEAR_TEAM_ID")
        if project_id is None:
            raise Exception("LINEAR_PROJECT_ID is not set")
        if team_id is None:
            raise Exception("LINEAR_TEAM_ID is not set")

        # Create issues in Linear
        tasks_json = tasks.model_dump_json()
        logger.info(f"Creating issues in Linear: {tasks_json}")
        linear_response: RunResponse = self.linear_agent.run(
            f"Create issues in Linear for project {project_id} and team {team_id}: {tasks_json} and here is the dictionary of users and their uuid: {linear_users}. If you fail to create an issue, try again."
        )
        if not linear_response:
            return None

        logger.info(f"Linear response: {linear_response}")
        if not isinstance(linear_response.content, LinearIssueList):
            # Callers serialize the result with model_dump_json(); refuse
            # anything that is not the structured model.
            logger.warning("Invalid response from linear agent, no issues returned.")
            return None
        return linear_response.content

    def run(
        self, meeting_notes: str, linear_users: Dict[str, str], use_cache: bool = False
    ) -> RunResponse:
        """Run the full notes -> tasks -> Linear issues -> Slack pipeline.

        Args:
            meeting_notes: Raw meeting notes to extract tasks from.
            linear_users: Mapping of user name to Linear user uuid.
            use_cache: When true, reuse tasks cached for today's date if
                present; on a cache miss the tasks are generated as usual.

        Returns:
            The Slack agent's response on success, otherwise a
            ``workflow_completed`` response whose content explains which step
            produced nothing.
        """
        logger.info(f"Generating tasks from meeting notes: {meeting_notes}")
        current_date = datetime.now().strftime("%Y-%m-%d")

        tasks: Optional[TaskList] = None
        if use_cache:
            tasks = self.get_tasks_from_cache(current_date)
        loaded_from_cache = tasks is not None
        if tasks is None:
            tasks = self.get_tasks_from_meeting_notes(meeting_notes)

        if tasks is None or len(tasks.tasks) == 0:
            return RunResponse(
                run_id=self.run_id,
                event=RunEvent.workflow_completed,
                content="Sorry, could not generate tasks from meeting notes.",
            )

        # Only freshly generated tasks are recorded; re-appending a cache hit
        # would duplicate the entry on every cached run.
        if not loaded_from_cache:
            self.session_state.setdefault("meeting_notes", []).append(
                {"date": current_date, "tasks": tasks.model_dump_json()}
            )

        linear_issues = self.create_linear_issues(tasks, linear_users)
        if linear_issues is None:
            # Without issues there is nothing to announce; return an explicit
            # response instead of falling through to an unbound variable.
            return RunResponse(
                run_id=self.run_id,
                event=RunEvent.workflow_completed,
                content="Sorry, could not create Linear issues from the generated tasks.",
            )

        # Send slack notification with tasks
        issues_json = linear_issues.model_dump_json()
        logger.info(f"Sending slack notification with tasks: {issues_json}")
        slack_response: RunResponse = self.slack_agent.run(issues_json)
        logger.info(f"Slack response: {slack_response}")

        return slack_response


# Create the workflow; session data is persisted through PostgresStorage
product_manager = ProductManagerWorkflow(
    session_id="product-manager",
    storage=PostgresStorage(
        table_name="product_manager_workflows",
        db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    ),
)

# Read the notes inside a context manager so the file handle is closed right
# away, and decode as UTF-8 explicitly instead of relying on the locale default.
with open(
    "cookbook/workflows/product_manager/meeting_notes.txt", "r", encoding="utf-8"
) as notes_file:
    meeting_notes = notes_file.read()

# Attendee name -> Linear user uuid, handed to the Linear agent for assignment
users_uuid = {
    "Sarah": "8d4e1c9a-b5f2-4e3d-9a76-f12d8e3b4c5a",
    "Mike": "2f9b7d6c-e4a3-42f1-b890-1c5d4e8f9a3b",
    "Emma": "7a1b3c5d-9e8f-4d2c-a6b7-8c9d0e1f2a3b",
    "Alex": "4c5d6e7f-8a9b-0c1d-2e3f-4a5b6c7d8e9f",
    "James": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
}

# Run workflow
product_manager.run(meeting_notes=meeting_notes, linear_users=users_uuid)

会议记录

meeting_notes.txt
每日站会 - 技术团队
日期:2024-01-15
时间:9:30 AM - 9:45 AM

与会者:
- Sarah (技术主管)
- Mike (后端开发工程师)
- Emma (前端开发工程师)
- Alex (DevOps 工程师)
- James (QA 工程师)

Sarah (技术主管):
“大家早上好!我们来过一下今天的更新和新任务。Mike,你先开始好吗?”

Mike (后端开发工程师):
“好的。我将着手实现我们上周讨论的新的认证服务。主要任务包括设置 JWT 令牌管理和与用户服务集成。预计完成时间约为 3-4 天。”

Emma (前端开发工程师):
“我今天将处理用户仪表盘的重新设计。这包括实现新的分析小部件和提高移动端响应性。我应该能在周四之前准备好一个初步版本供审查。”

Alex (DevOps 工程师):
“我将专注于设置新的监控系统。将配置 Prometheus 和 Grafana 以获得更好的可观测性。还需要更新我们的 CI/CD 管道以包含新的安全扫描工具。”

James (QA 工程师):
“一旦 Mike 的认证服务准备好,我将为其创建自动化测试用例。同时,我正在更新我们的端到端测试套件,并记录仪表盘功能的新测试流程。”

Sarah (技术主管):
“大家更新得很棒。请记住,我们明天下午 2 点有架构评审会议。请准备好你们的组件文档。如果有人需要任何帮助或遇到障碍,请告诉我。祝大家一天都有成效!”

会议于上午 9:45 结束

使用方法

1

创建虚拟环境

打开 Terminal 并创建一个 python 虚拟环境。

python3 -m venv .venv
source .venv/bin/activate
2

安装库

pip install -U agno openai sqlalchemy "psycopg[binary]" slack-sdk
3

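运行工作流

运行前请先设置环境变量 LINEAR_PROJECT_ID 和 LINEAR_TEAM_ID(脚本会读取这两个变量,未设置时会抛出异常),并确保 meeting_notes.txt 位于 cookbook/workflows/product_manager/ 目录下(脚本按该相对路径读取会议记录)。

python product_manager.py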