工作流的关键在于控制和灵活性。

您的工作流逻辑就是一个 python 函数,因此您对工作流逻辑拥有完全的控制权。您可以:

  • 在处理前验证输入
  • 根据输入,创建代理并并行运行它们
  • 根据需要缓存结果
  • 纠正任何中间错误
  • 流式传输输出
  • 返回单个或多个输出

这种程度的控制对于可靠性至关重要。

流式传输

理解工作流的关键在于,您编写的是一个 python 函数,这意味着您可以决定该函数是否流式传输输出。要流式传输输出,请从工作流的 run() 方法中 yield 一个 Iterator[RunResponse]

news_report_generator.py
# 定义工作流
class GenerateNewsReport(Workflow):
    agent_1: Agent = ...

    agent_2: Agent = ...

    agent_3: Agent = ...

    def run(self, ...) -> Iterator[RunResponse]:
        # 运行代理并收集响应
        # 这些可以是批量响应,您也可以根据需要流式传输中间结果
        final_agent_input = ...

        # 从 writer 代理生成最终响应
        agent_3_response_stream: Iterator[RunResponse] = self.agent_3.run(final_agent_input, stream=True)

        # Yield 响应
        yield agent_3_response_stream

# 实例化工作流
generate_news_report = GenerateNewsReport()

# 运行工作流并获取 RunResponse 对象迭代器形式的响应
report_stream: Iterator[RunResponse] = generate_news_report.run(...)

# 打印响应
pprint_run_response(report_stream, markdown=True)

批量

只需从工作流的 run() 方法返回一个 RunResponse 对象即可返回单个输出。

news_report_generator.py
# 定义工作流
class GenerateNewsReport(Workflow):
    agent_1: Agent = ...

    agent_2: Agent = ...

    agent_3: Agent = ...

    def run(self, ...) -> RunResponse:
        # 运行代理并收集响应
        final_agent_input = ...

        # 从 writer 代理生成最终响应
        agent_3_response: RunResponse = self.agent_3.run(final_agent_input)

        # 返回响应
        return agent_3_response

# 实例化工作流
generate_news_report = GenerateNewsReport()

# 运行工作流并获取 RunResponse 对象形式的响应
report: RunResponse = generate_news_report.run(...)

# 打印响应
pprint_run_response(report, markdown=True)