12 - Workflow:流水线式任务编排
一句话总结:Workflow 让你把多个 Agent 串成流水线,每一步的输出自动成为下一步的输入,执行顺序明确、数据流清晰。
Workflow 和 Team 的区别
上一篇讲了 Team,几个 Agent 凑在一起协作。但 Team 的本质是”动态的”——有个 leader 决定谁来干活、怎么干。这很灵活,但有时候你不需要灵活,你需要的是可预测。
打个比方:
- Team 像一场头脑风暴会议。大家坐在一起,主持人看情况让不同的人发言,讨论方向可能随时变化。
- Workflow 像一条装配流水线。零件从第一个工位传到最后一个工位,每一步做什么是提前定好的,谁也别想插队。
什么时候该用 Workflow?当你的任务可以拆成明确的阶段,前一步的产出就是后一步的原料。比如:调研 -> 分析 -> 写报告,或者采集数据 -> 清洗 -> 可视化。
最基本的顺序 Workflow
先来个最简单的例子:三个 Agent 组成一条流水线,依次完成数据收集、分析、报告撰写。
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow
# 第一步:数据收集
data_agent = Agent(
name="数据收集员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责收集和整理原始信息。只收集数据,不做分析。",
)
data_step = Step(name="数据收集", agent=data_agent)
# 第二步:数据分析
analyst_agent = Agent(
name="分析师",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责分析数据,找出关键趋势和洞察。",
)
analysis_step = Step(name="数据分析", agent=analyst_agent)
# 第三步:撰写报告
writer_agent = Agent(
name="报告撰写员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责把分析结果写成简洁的报告,不超过200字。",
markdown=True,
)
report_step = Step(name="撰写报告", agent=writer_agent)
# 组装流水线
workflow = Workflow(
name="研究报告流水线",
steps=[data_step, analysis_step, report_step],
)
workflow.print_response("分析一下2024年AI行业的发展趋势", stream=True)
三个 Step,按顺序排列,传给 Workflow。就这么简单。
数据是怎么在步骤之间流动的
你可能会问:每一步的输出是怎么传到下一步的?
答案是:自动的。Workflow 内部维护了一个 StepInput 对象,它会把前面所有步骤的输出(previous_step_outputs)带给下一步。对于 Agent 类型的 Step,Workflow 会自动把上一步的输出内容作为当前 Agent 的输入消息。
简单说就是:
用户输入 -> Step 1 (收集数据) -> 输出 A
|
输出 A -> Step 2 (分析) -> 输出 B
|
输出 B -> Step 3 (写报告) -> 最终结果
你不需要手动传递数据,Workflow 帮你搞定了。
并行步骤:同时干几件事
有些步骤之间互不依赖,没必要排队等。比如你想同时从多个渠道收集信息,然后汇总。Agno 提供了 Parallel 来实现这个。
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Parallel, Workflow
# 不同渠道的信息采集 Agent
tech_agent = Agent(
name="技术资讯采集",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责收集技术领域的最新动态和趋势。",
)
market_agent = Agent(
name="市场数据采集",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责收集市场规模、融资、竞争格局等商业数据。",
)
# 汇总分析 Agent
summary_agent = Agent(
name="汇总分析师",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="你负责把多个渠道的信息整合成一份完整的分析报告。",
markdown=True,
)
workflow = Workflow(
name="多渠道调研流水线",
steps=[
# 两个采集任务并行执行
Parallel(
Step(name="技术调研", agent=tech_agent),
Step(name="市场调研", agent=market_agent),
name="信息采集阶段",
),
# 汇总是串行的,等两个并行任务都完成后才开始
Step(name="汇总分析", agent=summary_agent),
],
)
workflow.print_response("调研大语言模型在企业中的应用现状", stream=True)
Parallel 里的步骤会同时执行,全部完成后,它们的输出会一起传给下一个步骤。汇总 Agent 能看到所有并行步骤的结果。
条件执行:根据情况决定要不要跑
有时候某个步骤不是每次都需要的。比如只有当研究内容包含数据和统计数字时,才需要额外做一轮事实核查。Condition 就是干这个的。
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Condition, Workflow
from agno.workflow.types import StepInput
# 核心 Agent
researcher = Agent(
name="研究员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="研究给定主题,提供详细的发现和数据。",
)
fact_checker = Agent(
name="事实核查员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="仔细核查内容中的数据和事实是否准确。",
)
writer = Agent(
name="撰稿人",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="根据所有可用的研究和核查结果,写一篇简洁的文章。",
markdown=True,
)
# 条件判断函数:上一步的输出里有没有数据性内容
def needs_fact_checking(step_input: StepInput) -> bool:
content = step_input.previous_step_content or ""
indicators = ["percent", "%", "million", "billion", "数据显示", "研究表明", "统计"]
return any(word in content.lower() for word in indicators)
workflow = Workflow(
name="智能调研流水线",
steps=[
Step(name="研究", agent=researcher),
Condition(
name="需要事实核查吗",
evaluator=needs_fact_checking,
steps=[Step(name="事实核查", agent=fact_checker)],
# else_steps 是可选的,不提供就直接跳过
),
Step(name="撰稿", agent=writer),
],
)
workflow.print_response("分析中国新能源汽车出口数据和趋势", stream=True)
evaluator 接收一个 StepInput,返回 True 就执行 steps 里的内容,返回 False 就跳过(或者执行 else_steps,如果你设了的话)。
自定义函数步骤:不只是 Agent
Step 的执行器不一定非得是 Agent。你也可以传一个自定义函数,适合做一些不需要 LLM 的中间处理,比如数据转换、格式化、调 API 等。
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow
from agno.workflow.types import StepInput, StepOutput
researcher = Agent(
name="研究员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="研究给定主题并提供关键发现。",
)
writer = Agent(
name="撰稿人",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="根据提供的大纲写一篇文章。",
markdown=True,
)
# 自定义函数:把研究结果格式化成大纲
def format_as_outline(step_input: StepInput) -> StepOutput:
content = step_input.previous_step_content or "无内容"
outline = f"""请根据以下研究材料撰写文章:
## 研究材料
{content}
## 写作要求
- 分3个部分,每部分有小标题
- 总字数控制在300字以内
- 语言简洁明了
"""
return StepOutput(content=outline)
workflow = Workflow(
name="文章生产线",
steps=[
Step(name="调研", agent=researcher),
Step(name="生成大纲", executor=format_as_outline),
Step(name="写作", agent=writer),
],
)
workflow.print_response("远程办公对团队协作的影响", stream=True)
自定义函数接收 StepInput,返回 StepOutput。中间那一步完全不需要调用 LLM,速度快、成本低。
什么时候用 Workflow,什么时候用 Team
| 场景 | 推荐 |
|---|---|
| 步骤必须按顺序执行 | Workflow |
| 需要动态协作和讨论 | Team |
| 可预测、可重复的流水线 | Workflow |
| 需要多个视角审视同一问题 | Team |
| 每一步角色明确、职责不同 | Workflow |
| 不确定需要几步才能完成 | Team |
一个经验法则:如果你能画出流程图,用 Workflow;如果你画不出来,用 Team。
当然两者也可以混着用。Step 的执行器可以是一个 Team,这样你就能在流水线的某一步里引入动态协作。
实战:内容创作流水线
来个更完整的例子,模拟一个内容创作团队的工作流程:
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow
researcher = Agent(
name="调研员",
model=OpenAIChat(id="gpt-4o-mini"),
instructions=[
"你是一个专业的内容调研员。",
"根据主题搜索相关资料,整理出关键事实和数据。",
"输出格式:列出5-8个关键发现,每个发现一句话。",
],
)
outliner = Agent(
name="大纲策划",
model=OpenAIChat(id="gpt-4o-mini"),
instructions=[
"你是一个内容策划专家。",
"根据调研资料制定文章大纲。",
"大纲包括:标题、3-4个主要章节、每章的核心论点。",
],
)
writer = Agent(
name="写手",
model=OpenAIChat(id="gpt-4o-mini"),
instructions=[
"你是一个优秀的中文写手。",
"根据大纲写一篇完整的文章。",
"风格:专业但不枯燥,有观点有深度。",
"字数:500字左右。",
],
)
editor = Agent(
name="编辑",
model=OpenAIChat(id="gpt-4o-mini"),
instructions=[
"你是一个资深编辑。",
"润色文章,修正语法错误,优化表达。",
"确保逻辑通顺、观点清晰。",
"输出最终稿,不需要标注修改痕迹。",
],
markdown=True,
)
content_pipeline = Workflow(
name="内容创作流水线",
steps=[
Step(name="调研", agent=researcher),
Step(name="拟大纲", agent=outliner),
Step(name="写作", agent=writer),
Step(name="编辑", agent=editor),
],
)
content_pipeline.print_response(
"写一篇关于AI编程助手如何改变软件开发流程的文章",
stream=True,
)
四个 Agent,四个明确的角色,一条清晰的流水线。每一步都有专注的任务,最终产出一篇经过调研、规划、撰写、编辑四道工序的文章。
Workflow 还有什么
除了上面讲的,Workflow 还支持几个进阶特性:
- Loop – 循环执行一组步骤,直到满足结束条件或达到最大迭代次数。适合需要反复优化的场景。
- Router – 根据输入内容动态选择走哪条路径,类似于代码里的 switch/case。
- Steps – 把多个步骤打包成一个子流程,方便复用和组织。
- session_state – Workflow 级别的状态管理,步骤之间可以通过 session_state 共享结构化数据。
- 数据库持久化 – 通过
db参数接入 SQLite 或 PostgreSQL,保存 Workflow 的会话记录。
这些我们就不在入门教程里展开了,等你把基础用法跑通,再去看 cookbook 里的高级示例也不迟。
核心要点
- Workflow 是顺序流水线 – 步骤按定义顺序依次执行,前一步的输出自动传给下一步
- Step 是最小执行单元 – 每个 Step 绑定一个 Agent、一个 Team 或一个自定义函数
- Parallel 处理并行 – 互不依赖的步骤可以同时执行
- Condition 处理分支 – 根据条件决定某些步骤是否执行
- 自定义函数省钱省时 – 不需要 LLM 的中间处理用函数就够了
- Workflow 和 Team 互补 – 可预测选 Workflow,要灵活选 Team,也可以嵌套使用
下一篇预告
下一篇我们来聊推理能力 – 让 Agent 不只是”回答问题”,而是能够一步步思考、推理,像个真正会动脑子的助手。