工作流的关键在于控制和灵活性。
您的工作流逻辑就是一个 python 函数,因此您对工作流逻辑拥有完全的控制权。您可以:
- 在处理前验证输入
- 根据输入,创建代理并并行运行它们
- 根据需要缓存结果
- 纠正任何中间错误
- 流式传输输出
- 返回单个或多个输出
这种程度的控制对于可靠性至关重要。
流式传输
理解工作流的关键在于,您编写的是一个 python 函数,这意味着您可以决定该函数是否流式传输输出。要流式传输输出,请从工作流的 run()
方法中 yield 一个 Iterator[RunResponse]
。
# 定义工作流
class GenerateNewsReport(Workflow):
agent_1: Agent = ...
agent_2: Agent = ...
agent_3: Agent = ...
def run(self, ...) -> Iterator[RunResponse]:
# 运行代理并收集响应
# 这些可以是批量响应,您也可以根据需要流式传输中间结果
final_agent_input = ...
# 从 writer 代理生成最终响应
agent_3_response_stream: Iterator[RunResponse] = self.agent_3.run(final_agent_input, stream=True)
# Yield 响应
yield agent_3_response_stream
# 实例化工作流
generate_news_report = GenerateNewsReport()
# 运行工作流并获取 RunResponse 对象迭代器形式的响应
report_stream: Iterator[RunResponse] = generate_news_report.run(...)
# 打印响应
pprint_run_response(report_stream, markdown=True)
只需从工作流的 run()
方法返回一个 RunResponse
对象即可返回单个输出。
# 定义工作流
class GenerateNewsReport(Workflow):
agent_1: Agent = ...
agent_2: Agent = ...
agent_3: Agent = ...
def run(self, ...) -> RunResponse:
# 运行代理并收集响应
final_agent_input = ...
# 从 writer 代理生成最终响应
agent_3_response: RunResponse = self.agent_3.run(final_agent_input)
# 返回响应
return agent_3_response
# 实例化工作流
generate_news_report = GenerateNewsReport()
# 运行工作流并获取 RunResponse 对象形式的响应
report: RunResponse = generate_news_report.run(...)
# 打印响应
pprint_run_response(report, markdown=True)
Responses are generated using AI and may contain mistakes.