目录

12 - Workflow:流水线式任务编排

一句话总结:Workflow 让你把多个 Agent 串成流水线,每一步的输出自动成为下一步的输入,执行顺序明确、数据流清晰。


Workflow 和 Team 的区别

上一篇讲了 Team,几个 Agent 凑在一起协作。但 Team 的本质是”动态的”——有个 leader 决定谁来干活、怎么干。这很灵活,但有时候你不需要灵活,你需要的是可预测。

打个比方:

  • Team 像一场头脑风暴会议。大家坐在一起,主持人看情况让不同的人发言,讨论方向可能随时变化。
  • Workflow 像一条装配流水线。零件从第一个工位传到最后一个工位,每一步做什么是提前定好的,谁也别想插队。

什么时候该用 Workflow?当你的任务可以拆成明确的阶段,前一步的产出就是后一步的原料。比如:调研 -> 分析 -> 写报告,或者采集数据 -> 清洗 -> 可视化。


最基本的顺序 Workflow

先来个最简单的例子:三个 Agent 组成一条流水线,依次完成数据收集、分析、报告撰写。

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow

# 第一步:数据收集
data_agent = Agent(
    name="数据收集员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责收集和整理原始信息。只收集数据,不做分析。",
)
data_step = Step(name="数据收集", agent=data_agent)

# 第二步:数据分析
analyst_agent = Agent(
    name="分析师",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责分析数据,找出关键趋势和洞察。",
)
analysis_step = Step(name="数据分析", agent=analyst_agent)

# 第三步:撰写报告
writer_agent = Agent(
    name="报告撰写员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责把分析结果写成简洁的报告,不超过200字。",
    markdown=True,
)
report_step = Step(name="撰写报告", agent=writer_agent)

# 组装流水线
workflow = Workflow(
    name="研究报告流水线",
    steps=[data_step, analysis_step, report_step],
)

workflow.print_response("分析一下2024年AI行业的发展趋势", stream=True)

三个 Step,按顺序排列,传给 Workflow。就这么简单。


数据是怎么在步骤之间流动的

你可能会问:每一步的输出是怎么传到下一步的?

答案是:自动的。Workflow 内部维护了一个 StepInput 对象,它会把前面所有步骤的输出(previous_step_outputs)带给下一步。对于 Agent 类型的 Step,Workflow 会自动把上一步的输出内容作为当前 Agent 的输入消息。

简单说就是:

用户输入 -> Step 1 (收集数据) -> 输出 A
                                   |
                          输出 A -> Step 2 (分析) -> 输出 B
                                                      |
                                             输出 B -> Step 3 (写报告) -> 最终结果

你不需要手动传递数据,Workflow 帮你搞定了。


并行步骤:同时干几件事

有些步骤之间互不依赖,没必要排队等。比如你想同时从多个渠道收集信息,然后汇总。Agno 提供了 Parallel 来实现这个。

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Parallel, Workflow

# 不同渠道的信息采集 Agent
tech_agent = Agent(
    name="技术资讯采集",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责收集技术领域的最新动态和趋势。",
)

market_agent = Agent(
    name="市场数据采集",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责收集市场规模、融资、竞争格局等商业数据。",
)

# 汇总分析 Agent
summary_agent = Agent(
    name="汇总分析师",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="你负责把多个渠道的信息整合成一份完整的分析报告。",
    markdown=True,
)

workflow = Workflow(
    name="多渠道调研流水线",
    steps=[
        # 两个采集任务并行执行
        Parallel(
            Step(name="技术调研", agent=tech_agent),
            Step(name="市场调研", agent=market_agent),
            name="信息采集阶段",
        ),
        # 汇总是串行的,等两个并行任务都完成后才开始
        Step(name="汇总分析", agent=summary_agent),
    ],
)

workflow.print_response("调研大语言模型在企业中的应用现状", stream=True)

Parallel 里的步骤会同时执行,全部完成后,它们的输出会一起传给下一个步骤。汇总 Agent 能看到所有并行步骤的结果。


条件执行:根据情况决定要不要跑

有时候某个步骤不是每次都需要的。比如只有当研究内容包含数据和统计数字时,才需要额外做一轮事实核查。Condition 就是干这个的。

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Condition, Workflow
from agno.workflow.types import StepInput

# 核心 Agent
researcher = Agent(
    name="研究员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="研究给定主题,提供详细的发现和数据。",
)

fact_checker = Agent(
    name="事实核查员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="仔细核查内容中的数据和事实是否准确。",
)

writer = Agent(
    name="撰稿人",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="根据所有可用的研究和核查结果,写一篇简洁的文章。",
    markdown=True,
)

# 条件判断函数:上一步的输出里有没有数据性内容
def needs_fact_checking(step_input: StepInput) -> bool:
    content = step_input.previous_step_content or ""
    indicators = ["percent", "%", "million", "billion", "数据显示", "研究表明", "统计"]
    return any(word in content.lower() for word in indicators)

workflow = Workflow(
    name="智能调研流水线",
    steps=[
        Step(name="研究", agent=researcher),
        Condition(
            name="需要事实核查吗",
            evaluator=needs_fact_checking,
            steps=[Step(name="事实核查", agent=fact_checker)],
            # else_steps 是可选的,不提供就直接跳过
        ),
        Step(name="撰稿", agent=writer),
    ],
)

workflow.print_response("分析中国新能源汽车出口数据和趋势", stream=True)

evaluator 接收一个 StepInput,返回 True 就执行 steps 里的内容,返回 False 就跳过(或者执行 else_steps,如果你设了的话)。


自定义函数步骤:不只是 Agent

Step 的执行器不一定非得是 Agent。你也可以传一个自定义函数,适合做一些不需要 LLM 的中间处理,比如数据转换、格式化、调 API 等。

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow
from agno.workflow.types import StepInput, StepOutput

researcher = Agent(
    name="研究员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="研究给定主题并提供关键发现。",
)

writer = Agent(
    name="撰稿人",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="根据提供的大纲写一篇文章。",
    markdown=True,
)

# 自定义函数:把研究结果格式化成大纲
def format_as_outline(step_input: StepInput) -> StepOutput:
    content = step_input.previous_step_content or "无内容"
    outline = f"""请根据以下研究材料撰写文章:

## 研究材料
{content}

## 写作要求
- 分3个部分,每部分有小标题
- 总字数控制在300字以内
- 语言简洁明了
"""
    return StepOutput(content=outline)

workflow = Workflow(
    name="文章生产线",
    steps=[
        Step(name="调研", agent=researcher),
        Step(name="生成大纲", executor=format_as_outline),
        Step(name="写作", agent=writer),
    ],
)

workflow.print_response("远程办公对团队协作的影响", stream=True)

自定义函数接收 StepInput,返回 StepOutput。中间那一步完全不需要调用 LLM,速度快、成本低。


什么时候用 Workflow,什么时候用 Team

场景 推荐
步骤必须按顺序执行 Workflow
需要动态协作和讨论 Team
可预测、可重复的流水线 Workflow
需要多个视角审视同一问题 Team
每一步角色明确、职责不同 Workflow
不确定需要几步才能完成 Team

一个经验法则:如果你能画出流程图,用 Workflow;如果你画不出来,用 Team。

当然两者也可以混着用。Step 的执行器可以是一个 Team,这样你就能在流水线的某一步里引入动态协作。


实战:内容创作流水线

来个更完整的例子,模拟一个内容创作团队的工作流程:

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Step, Workflow

researcher = Agent(
    name="调研员",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "你是一个专业的内容调研员。",
        "根据主题搜索相关资料,整理出关键事实和数据。",
        "输出格式:列出5-8个关键发现,每个发现一句话。",
    ],
)

outliner = Agent(
    name="大纲策划",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "你是一个内容策划专家。",
        "根据调研资料制定文章大纲。",
        "大纲包括:标题、3-4个主要章节、每章的核心论点。",
    ],
)

writer = Agent(
    name="写手",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "你是一个优秀的中文写手。",
        "根据大纲写一篇完整的文章。",
        "风格:专业但不枯燥,有观点有深度。",
        "字数:500字左右。",
    ],
)

editor = Agent(
    name="编辑",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "你是一个资深编辑。",
        "润色文章,修正语法错误,优化表达。",
        "确保逻辑通顺、观点清晰。",
        "输出最终稿,不需要标注修改痕迹。",
    ],
    markdown=True,
)

content_pipeline = Workflow(
    name="内容创作流水线",
    steps=[
        Step(name="调研", agent=researcher),
        Step(name="拟大纲", agent=outliner),
        Step(name="写作", agent=writer),
        Step(name="编辑", agent=editor),
    ],
)

content_pipeline.print_response(
    "写一篇关于AI编程助手如何改变软件开发流程的文章",
    stream=True,
)

四个 Agent,四个明确的角色,一条清晰的流水线。每一步都有专注的任务,最终产出一篇经过调研、规划、撰写、编辑四道工序的文章。


Workflow 还有什么

除了上面讲的,Workflow 还支持几个进阶特性:

  • Loop – 循环执行一组步骤,直到满足结束条件或达到最大迭代次数。适合需要反复优化的场景。
  • Router – 根据输入内容动态选择走哪条路径,类似于代码里的 switch/case。
  • Steps – 把多个步骤打包成一个子流程,方便复用和组织。
  • session_state – Workflow 级别的状态管理,步骤之间可以通过 session_state 共享结构化数据。
  • 数据库持久化 – 通过 db 参数接入 SQLite 或 PostgreSQL,保存 Workflow 的会话记录。

这些我们就不在入门教程里展开了,等你把基础用法跑通,再去看 cookbook 里的高级示例也不迟。


核心要点

  1. Workflow 是顺序流水线 – 步骤按定义顺序依次执行,前一步的输出自动传给下一步
  2. Step 是最小执行单元 – 每个 Step 绑定一个 Agent、一个 Team 或一个自定义函数
  3. Parallel 处理并行 – 互不依赖的步骤可以同时执行
  4. Condition 处理分支 – 根据条件决定某些步骤是否执行
  5. 自定义函数省钱省时 – 不需要 LLM 的中间处理用函数就够了
  6. Workflow 和 Team 互补 – 可预测选 Workflow,要灵活选 Team,也可以嵌套使用

下一篇预告

下一篇我们来聊推理能力 – 让 Agent 不只是”回答问题”,而是能够一步步思考、推理,像个真正会动脑子的助手。