注:以下所提的“文档”,是指LangGraph官方指南(https://langchain-ai.github.io/langgraph/guides/),参考版本是0.6.8。

以下代码的开发环境

[project]
name = "my-langgraph"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "dotenv>=0.9.9",
    "langchain>=0.3.27",
    "langchain-openai>=0.3.33",
    "langfuse>=3.5.0",
    "langgraph>=0.6.7",
    "langgraph-checkpoint-sqlite>=2.0.11",
    "numpy>=2.3.3",
]

一、文档的主线 & 核心主题对照

文档

主要内容 / 聚焦点

角色

典型 API / 模式

侧重点

概念篇

多代理的架构、设计决策、通信模式、handoffs 模式等

理论 / 架构层

架构(Network / Supervisor / Hierarchical / Handoffs)

理解为什么要多代理、如何组织多个 agent

代理篇

提供多代理的 预构建(prebuilt)实现,例如 Supervisor、Swarm

库 / API 层

create_supervisor、create_swarm、handoff 工具

快速上手用已有构造好的多代理模块

How-To 操作篇

手把手教你如何在 LangGraph 中写多代理:创建 handoffs、控制 agent 输入、构建系统流程

实践层

Command、handoff_tool、节点返回 Command、工具返回 Command

从零实现一个多代理系统的步骤与技巧

这三者结合起来,从理论 → 库支持 → 实现细节,覆盖了 LangGraph 在多代理这个模块的整体能力和使用路径。

二. 概念篇:Multi-Agent Systems Overview(架构与设计)

文档这里这部分是理解Multi-Agent在 LangGraph 背后的设计思路、模式、权衡点的关键文档。下面是对其结构与重点的总结。

1. 多代理为何重要 / 背景动机

  • 随着任务复杂度上升,单个 agent 往往不够清晰/高效:它可能有太多工具、不得不兼顾多个领域、对上下文管理变得复杂。
  • Multi-Agent可 模块化专精可控:将不同子任务或专业能力拆成多个 agent,各自处理一部分,最后协同整合。
  • 利于维护与演进:你可以更容易地替换 /升级某个 agent,而不破坏整个系统。

2. 多代理架构 / 模式分类

文档列出了几种主要架构方式,每种有其特性与适用场景:

架构

描述

特点 / 权衡

Network

每个 agent 都可以与其它 agent 通信;agent 自主决定下一个 agent 路径。

高自由度,路由灵活;但决策复杂,可能难以控制路径。

Supervisor

有一个中心的 supervisor agent 负责协调整个流程;其他 agent 各司其职,接收调度。

易于集中控制、监控,但 supervisor 成为瓶颈或单点决策中心。

Supervisor (tool-calling)

supervisor 以 “工具” 的形式调用各个 agent,把 agent 当作工具;路由通过工具调用实现。

更符合 tool-based 模式;agent 被抽象成工具,方便集成与调用。

Hierarchical

多层级的代理结构:如 supervisor-of-supervisors,或子团队 agent。

更复杂的组织结构,适合大规模结构化系统。

Custom / 混合

对于部分流程使用 deterministic 分支、部分使用 agent 路由、通信只在部分 agent 间开放。

灵活组合,适应复杂业务需求。

3. 通信 / 状态管理 / Handoffs

  • Handoffs 是Multi-Agent系统的基本通信方式:一个 agent 在结束时 “移交 (hand off)” 控制权给另一个 agent,同时可以传递状态更新 / payload。
  • 在 LangGraph 中,可以通过返回 Command(...) 对象从一个 agent 节点触发 handoff(指定下一个 agent 名称 / 更新状态 /是否跨图导航 (graph=Command.PARENT))
  • 通信方式可选:
    • 共享完整 “思考 /日志 /消息 history” 给所有 agent
    • 只共享最终结果 /部分摘要
    • 在 message history 中以 agent 名称标识不同 agent 输出。
  • 状态管理 / schema:不同 agent 可能有不同状态 schema(TypedDict)或共享部分状态。这要求设计清晰的边界 /映射规则。

4. Hand-offs 与工具融合

  • Hand-offs 可以包装为工具:即你定义一个特殊工具(@tool),其返回 Command(...),让 agent 用工具调用发起 handoff。这样 handoff 行为在工具层就内嵌在 agent 的行动集合里。
  • 这种方式有一个好处:agent 的决策(prompt /工具调用)和 handoff 路由统一在工具调用层,而不必在节点里硬写跳转逻辑。

总结(概念篇结论)

  • Multi-Agent是将复杂任务拆成若干 agent 协作执行的方式,是 LangGraph 强功能之一。
  • 架构设计(Network / Supervisor / Hierarchical / Custom)决定了代理间通信、控制流和协调责任。
  • Handoffs 是 agent 路由的核心机制;状态传递、通信日志设计、schema 设计是构建健壮多-agent 系统的关键要素。

三. Agents篇(Prebuilt  Multi-Agent实现)

指南文档这部分讲的是 LangGraph 本身或相关扩展提供的 “开箱即用” Multi-Agent系统支持:Supervisor / Swarm / Handoffs 等。它让你不用从头写路由逻辑,就能快速搭建一个Multi-Agent 系统。

下面是该文档的主要内容与要点。

1. Supervisor 预构建实现

  • 提供 langgraph-supervisor 库(可安装)来支持 supervisor 架构。
  • 在 supervisor 模式中,你定义多个 agent(例如 flight_assistant、hotel_assistant),再用 create_supervisor(...) 构造一个 supervisor graph,把各 agent 作为工具 /子节点交给 supervisor 管理。
  • 示例代码片段:
flight_assistant = create_react_agent(...)
hotel_assistant = create_react_agent(...)
supervisor = create_supervisor(
    agents=[flight_assistant, hotel_assistant],
    model=ChatOpenAI(...),
    prompt="You manage a hotel booking assistant and a flight booking assistant..."
).compile()
  • 在这个架构里,用户对话进入 supervisor,supervisor 决定要调用哪个 agent,并把任务 /消息路由给它。 agent 完成后返回,其结果回到 supervisor /主图。

2. Swarm 预构建实现

  • 提供 langgraph-swarm 库用来构建 Swarm 模式 多代理系统。 Swarm 模式的特点是 agent 间可以动态交接控制权(handoff),而不是固定 supervisor 指挥。
  • 在 swarm 模式中,agent 被赋予 handoff 工具(transfer_to_X)以主动转交控制权给另一个 agent。系统还记住最后活跃 agent,以便后续继续那个 agent 处理对话。
  • 示例代码片段:

transfer_to_hotel = create_handoff_tool(agent_name="hotel_assistant")

transfer_to_flight = create_handoff_tool(agent_name="flight_assistant")

flight_agent = create_react_agent(..., tools=[book_flight, transfer_to_hotel])

hotel_agent = create_react_agent(..., tools=[book_hotel, transfer_to_flight])

swarm = create_swarm(

    agents=[flight_agent, hotel_agent],

    default_active_agent="flight_assistant"

).compile()

  • 在 swarm 模式下,对话过程中 agent 可以根据上下文主动调用 handoff 工具,把控制权移给另一个 agent。

3. Handoffs 支持 &共通机制

  • 无论是 supervisor 或 swarm,核心通信都是通过 handoff 实现:agent 返回 Command(...) 指定下一个 agent 及传递的数据。
  • Handoff 工具可通过 @tool 装饰器封装,使 handoff 操作对 agent 来说更自然、可作为工具调用。
  • 多代理预构建层还会处理细节的 Agent-to-agent message routing、状态传递、工具整合、流式处理等,使用户能更方便使用。

总结(代理篇结论)

  • LangGraph 提供 Supervisor / Swarm 模式的预构建多代理框架,降低用户自己写路由逻辑的难度。
  • Supervisor 模式适合集中调度型系统;Swarm 模式适合 agent 间自由交互 /任务切换型系统。
  • Handoff 是所有预构建实现的基础机制,它把 agent 间的控制流和状态传递标准化。

四. 操作篇:How-To Multi-Agent(实现细节指南)

这是教你如何从零构建多代理系统的实操指南。它补充了概念篇的架构设计与代理篇的预构建 API,主要聚焦在 handoffs 与控制 agent 输入 /构造系统图的细节。

以下是它的结构与关键要点。

1. Handoffs — 基本机制

  • 创建 handoff 工具:用 @tool 装饰函数,使其返回 Command(goto=..., update=..., graph=Command.PARENT),表示切换到另一个 agent 节点、更新父图 state、在父图层级进行跳转。
  • 控制 agent 输入:不仅可以 hand off 给另一个 agent,还可以透过 Send()(或类似 primitive)把构造好的子任务 / payload 直接发送给某个 agent,使其接收定制输入。
  • 示例中 create_handoff_tool(...) 的代码说明了如何从一个 agent 调用 handoff 工具,将 state /消息注入另一个 agent 的输入。

2. 构建多代理系统(手写 /组合)

  • 你可以把多个 agent(例如 flight_assistant、hotel_assistant)当作 graph 节点加入一个父图。父图节点之间通过 handoffs 路径连接。
  • 父图起始节点通常指向一个 agent(比如 flight_assistant);agent 决策流程可能 hand off 给其他 agent 或结束。
  • 在父图中也可以写 wrapper 节点 / 路由节点,以在 agent 之间做转换 / 中继 /条件跳转。

3. 多轮 / 连续对话 & agent 状态

  • 多代理系统常用于多轮交互 /对话场景,每轮 agent 可能接到用户输入 /工具结果 /其他 agent 输出后决定接下来的 agent 路径。How-To 文档中提到 “Multi-turn conversation” 部分。
  • agent 状态 (messages、memory) 一般共享给父图,再由父图给每个 agent 作为输入。也可以在 handoff 时只传部分 state。
  • 若 agent 内部有多节点 /子图,也可以在子节点内部发出 Command(graph=Command.PARENT) 跳出子图到另一个 agent 节点。文档对此有提醒:为了可视化 /监控,需要在父图中 wrap 子图调用函数为 Command 型节点 (即节点返回 Command) 才能跟踪跨图跳转。

4. 使用预构建模块的衔接

  • 操作篇指出建议使用预构建的 create_react_agent / ToolNode / handoff_tool 等工具,因为它们天然支持将 Command 类型工具 / agent 嵌入。
  • 例如,用 create_handoff_tool(...) 为 agent 提供路由能力;用 create_react_agent 构造 agent;把 agent 节点添加到父图;再把父图 .compile() 得到多 agent 系统。
  • 操作指南也演示如何组合 agent 与 handoff,以及如何控制 agent 接收的消息/输入。

五. 使用建议 /注意事项 /挑战

基于这三篇指南文档 + 实践经验,这里给一些建议与提醒,帮助你在实际用 LangGraph 构建多代理系统时规避坑、写得更健壮。

  1. 合理设计 agent 间的通信边界 /输入输出接口
    不要让 agent 直接读写过多除自己以外的 state 字段。使用 handoff payload /明确更新字段会更安全。
  2. Command 路由要明确 / 有限状态
    agent 返回的 Command(goto=...) 路径尽量限定在已知 agent 节点,以避免无限跳转或死循环。
  3. 子图跨 agent 路由注意 graph=Command.PARENT
    当你在子图内部想跳转到另一个 agent(而非同一子图内节点)时,要指定 graph=Command.PARENT,并在父图里有对应 agent 节点。否则跳转可能无效或不可视化。
  4. 日志 / tracing /调试要加标识 agent 名称
    在 message history /日志里加上 agent 名(或前缀)能帮助追踪哪个 agent 的输出 /思路。
  5. 并发 /异步 /速率限制
    多 agent 常伴有并发调用、工具并行、速率限制冲突等问题。要注意给 agent 或模型加 rate limiter /timeout /失败重试策略。
  6. 慎用预构建模块与自定义模块
    预构建(Supervisor / Swarm)适合中等复杂场景、减少样板代码。但对于非常定制 /复杂路由逻辑,可能需要自己写 handoff + routing 节点。
  7. 保持状态兼容 /升级可扩展性
    多代理系统中,随着 agent 数量 /需求升级,状态 schema 可能会变(增 /删字段)。要考虑兼容旧 checkpoint /历史。
  8. 避免单点瓶颈 /监控与可插拔性
    在 Supervisor 架构中,supervisor 是中心协调者,可能成为瓶颈。要监控其响应性能。Swarm 架构可以缓解这个集中瓶颈问题。
  9. 版本兼容 /API 变动
    多代理功能在 LangGraph 生态中仍在进化,某些 API(Command 参数、子图跨 agent 跳转、handoff 工具)可能在未来版本调整。编写时要预留适配空间。

六、例子代码


在代码中使用两个节点flight_agent和hotel_agent来模拟agent间的network模式机制。

import os, argparse, time, json
import sqlite3
from typing import TypedDict, Annotated, Dict, Any, List
from operator import add

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command

"""
工作流(StateGraph) 实现,不用预构建 Agent;
两个代理节点:flight_agent / hotel_agent;
Handoff(移交):代理之间通过 Command 跳转;
工具调用(简化版):book_flight / book_hotel 更新行程;
SQLite 持久化 / 检查点;
真实 LLM(OpenAI)可选,没配 Key 会自动本地回退;
控制台流式:updates/custom/values 观察执行;

一句话用法:你把一句话“发给系统”,它会路由给合适的代理;代理可完成预订或把会话 handoff 给另一个代理。

图结构:START → flight_agent → finalize,hotel_agent → finalize,finalize → END
"""

# ============== 可选 LLM ==============
def _get_llm():
    """若设置了 OPENAI_API_KEY 且安装了 langchain-openai,则返回 ChatOpenAI;否则返回 None(走回退规则)。"""
    llm = ChatOpenAI(model="qwen", api_key="empty", base_url="http://192.168.0.12:1120/v1", temperature=0.3)
    return llm

# ============== 全局状态 ==============
class MState(TypedDict, total=False):
    user_input: str                        # 本轮用户输入
    active_agent: str                      # 当前活跃代理名:flight_agent / hotel_agent
    messages: Annotated[List[str], add]    # 对话轨迹(简化文本)
    itinerary: Dict[str, Any]              # 行程(flight / hotel)
    last_action: Annotated[List[str], add]                       # 代理最新动作(说明 / 结果)
    log: Annotated[List[str], add]         # 运行日志(流式可视)

# ============== 工具(简化版) ==============
def book_flight_tool(state: MState, *, route_hint: str = "") -> Dict[str, Any]:
    """模拟订机票:写入 itinerary.flight"""
    w = get_stream_writer()
    time.sleep(0.05)
    flight = {
        "from": "SFO",
        "to": "JFK",
        "date": "2025-10-01",
        "carrier": "LG-Air",
        "route_hint": route_hint or "auto"
    }
    w({"tool": "book_flight", "flight": flight})
    new_it = dict(state.get("itinerary") or {})
    new_it["flight"] = flight
    return {"itinerary": new_it, "last_action": [f"Booked flight {flight['from']}→{flight['to']} on {flight['date']}"]}

def book_hotel_tool(state: MState, *, near: str = "") -> Dict[str, Any]:
    """模拟订酒店:写入 itinerary.hotel"""
    w = get_stream_writer()
    time.sleep(0.05)
    hotel = {
        "city": "New York",
        "name": "LG Plaza",
        "nights": 2,
        "near": near or "Times Square"
    }
    w({"tool": "book_hotel", "hotel": hotel})
    new_it = dict(state.get("itinerary") or {})
    new_it["hotel"] = hotel
    return {"itinerary": new_it, "last_action": [f"Booked hotel {hotel['name']} ({hotel['nights']} nights)"]}

# ============== 代理实现(两个节点) ==============
def _simple_llm_plan(agent: str, user_msg: str) -> Dict[str, Any]:
    """
    小型“策略器”:优先用 LLM 读意图;否则用关键词回退。
    返回结构:
      {"handoff": None|"flight_agent"|"hotel_agent", "action": "book_flight"/"book_hotel"/"chat", "note": str}
    """
    llm = _get_llm()
    if llm:
        from langchain_core.messages import SystemMessage, HumanMessage
        sys = (
            f"You are a routing and action planner for a {agent.replace('_',' ')}.\n"
            "Decide ONE of actions: book_flight, book_hotel, chat.\n"
            "Optionally decide a handoff target: flight_agent or hotel_agent (only if needed).\n"
            "Respond in strict JSON: {\"handoff\": null|\"flight_agent\"|\"hotel_agent\", "
            "\"action\": \"book_flight\"|\"book_hotel\"|\"chat\", \"note\": \"...\"}."
        )
        user = f"User says: {user_msg}"
        try:
            resp = llm.invoke([SystemMessage(content=sys), HumanMessage(content=user)])
            txt = resp.content.strip()
            data = json.loads(txt)
            return {
                "handoff": data.get("handoff"),
                "action": data.get("action", "chat"),
                "note": data.get("note", "")
            }
        except Exception:
            pass

    # ---- 回退规则(无 LLM 时)----
    s = user_msg.lower()
    handoff = None
    if "handoff:hotel" in s:
        handoff = "hotel_agent"
    elif "handoff:flight" in s:
        handoff = "flight_agent"

    action = "chat"
    if "book" in s and ("flight" in s or "ticket" in s or "plane" in s):
        action = "book_flight"
    elif "book" in s and ("hotel" in s or "room" in s):
        action = "book_hotel"

    return {"handoff": handoff, "action": action, "note": "(fallback rule)"}

def flight_agent(state: MState) -> Any:
    """
    航班代理:
      - 读取 user_input,规划(LLM/回退)
      - 可触发 book_flight_tool
      - 若需要,把控制权 handoff 给 hotel_agent
    """
    w = get_stream_writer()
    user_msg = state.get("user_input", "")
    w({"agent": "flight", "event": "enter", "user_input": user_msg})

    plan = _simple_llm_plan("flight_agent", user_msg)
    w({"agent": "flight", "event": "plan", **plan})

    updates: Dict[str, Any] = {"messages": [f"[flight] plan: {plan}"], "active_agent": "flight_agent"}
    # 执行动作
    if plan["action"] == "book_flight":
        updates.update(book_flight_tool(state, route_hint="direct"))
    elif plan["action"] == "book_hotel":
        # 在航班代理也能代订酒店(现实中可限制),这里演示动作与 handoff 解耦
        updates.update(book_hotel_tool(state, near="JFK"))
    else:
        updates["last_action"] = ["Chatted (no booking)"]
    updates.setdefault("log", []).append("flight_agent handled")

    # 是否需要 handoff
    if plan["handoff"] == "hotel_agent":
        # 返回 Command:跳转到 hotel_agent,并带着增量更新
        return Command(goto="hotel_agent", update=updates)

    # 否则正常继续
    return updates

def hotel_agent(state: MState) -> Any:
    """
    酒店代理:
      - 读取 user_input,规划(LLM/回退)
      - 可触发 book_hotel_tool
      - 若需要,把控制权 handoff 给 flight_agent
    """
    w = get_stream_writer()
    user_msg = state.get("user_input", "")
    w({"agent": "hotel", "event": "enter", "user_input": user_msg})

    plan = _simple_llm_plan("hotel_agent", user_msg)
    w({"agent": "hotel", "event": "plan", **plan})

    updates: Dict[str, Any] = {"messages": [f"[hotel] plan: {plan}"], "active_agent": "hotel_agent"}
    if plan["action"] == "book_hotel":
        updates.update(book_hotel_tool(state, near="city center"))
    elif plan["action"] == "book_flight":
        updates.update(book_flight_tool(state, route_hint="cheap"))
    else:
        updates["last_action"] = ["Chatted (no booking)"]
    updates.setdefault("log", []).append("hotel_agent handled")

    if plan["handoff"] == "flight_agent":
        return Command(goto="flight_agent", update=updates)

    return updates

def finalize(state: MState) -> Dict[str, Any]:
    """收尾:生成简报文本(不结束会话,仅做示例)。"""
    w = get_stream_writer()
    it = state.get("itinerary") or {}
    flight = it.get("flight")
    hotel = it.get("hotel")
    brief = ["[SUMMARY]"]
    if flight:
        brief.append(f"Flight: {flight['from']}→{flight['to']} on {flight['date']} ({flight['carrier']})")
    if hotel:
        brief.append(f"Hotel:  {hotel['name']} in {hotel['city']} ({hotel['nights']} nights)")
    if not (flight or hotel):
        brief.append("(no bookings yet)")
    text = "\n".join(brief)
    w({"node": "finalize", "event": "preview", "preview": text})
    return {"messages": [text], "last_action": ["summarized"]}

# ============== 图装配(含 SQLite 持久化) ==============
def build_app(db_path="multi_agent.sqlite"):
    conn = sqlite3.connect(db_path, check_same_thread=False)
    saver = SqliteSaver(conn)

    g = StateGraph(MState)
    g.add_node("flight_agent", flight_agent)
    g.add_node("hotel_agent", hotel_agent)
    g.add_node("finalize", finalize)

    # 简单主干:START → flight_agent → finalize
    # (handoff 会在两个代理间来回跳,直到某个代理不再返回 Command)
    g.add_edge(START, "flight_agent")
    # 让两个代理都能流向 finalize(若未 handoff)
    g.add_edge("flight_agent", "finalize")
    g.add_edge("hotel_agent", "finalize")
    g.add_edge("finalize", END)

    return g.compile(checkpointer=saver)

# ============== CLI ==============
def _stream(app, init, config):
    for mode, chunk in app.stream(
        init,
        config=config,
        stream_mode=["updates", "custom", "values"],
        durability="sync",
    ):
        if mode == "custom":
            print("🟣 [custom ]", chunk)
        elif mode == "updates":
            print("🔸 [updates]", chunk)
        elif mode == "values":
            # 打点:只展示关键字段,避免太长
            view = {k: chunk.get(k) for k in ["active_agent","last_action","itinerary"] if k in chunk}
            print("🟢 [values ]", view)

def cmd_chat(args):
    """
    发送一条用户输入,系统会:
    - 进入 flight_agent(默认起点)
    - 代理可能 handoff 到另一个代理
    - 最终到 finalize 给出总结
    所有步骤持久化到 SQLite(按 thread_id 分叉)。
    """
    app = build_app(args.db)
    cfg = {"configurable": {"thread_id": args.thread}}
    print(f"\n=== CHAT (thread={args.thread}) ===")
    user_msg = args.text
    _stream(app, {"user_input": user_msg}, cfg)

    st = app.get_state(cfg)
    print("\n--- STATE (snapshot) ---")
    view = {k: st.values.get(k) for k in ["active_agent","last_action","itinerary","messages"]}
    print(json.dumps(view, indent=2, ensure_ascii=False))

def cmd_history(args):
    app = build_app(args.db)
    cfg = {"configurable": {"thread_id": args.thread}}
    hist = list(app.get_state_history(cfg))
    print(f"\n=== HISTORY (thread={args.thread}) ===")
    if not hist:
        print("(empty)")
        return
    for i, st in enumerate(hist):
        cp = (st.config or {}).get("configurable", {}).get("checkpoint_id")
        nxt = st.next
        vals = st.values or {}
        preview = {k: vals.get(k) for k in ["active_agent","last_action"]}
        print(f"[{i}] next={nxt!r} checkpoint_id={cp!r} {preview}")

def cmd_travel(args):
    """从指定 checkpoint_id 恢复一次(回溯),不修改状态。"""
    app = build_app(args.db)
    cfg = {"configurable": {"thread_id": args.thread, "checkpoint_id": args.checkpoint}}
    print(f"\n=== TRAVEL (thread={args.thread}, checkpoint={args.checkpoint}) ===")
    _stream(app, None, cfg)
    st = app.get_state(cfg)
    print("\n--- AFTER TRAVEL ---")
    print({k: st.values.get(k) for k in ["active_agent","last_action"]})

def cmd_patch(args):
    """在某个 checkpoint 上修改部分 state,然后从该点恢复执行(形成新分支)。"""
    app = build_app(args.db)
    base_cfg = {"configurable": {"thread_id": args.thread, "checkpoint_id": args.checkpoint, "checkpoint_ns": ""}}
    try:
        patch_vals = json.loads(args.values)
        assert isinstance(patch_vals, dict)
    except Exception as e:
        raise SystemExit(f"--values 必须是 JSON 对象,如 '{{\"user_input\":\"book a hotel near JFK\"}}'  // {e}")

    print(f"\n=== PATCH (thread={args.thread}, checkpoint={args.checkpoint}) ===")
    print("patch values:", patch_vals)
    new_cfg = app.update_state(base_cfg, values=patch_vals, )
    _stream(app, None, new_cfg)
    st = app.get_state({"configurable": {"thread_id": args.thread}})
    print("\n--- AFTER PATCH ---")
    print({k: st.values.get(k) for k in ["active_agent","last_action","itinerary"]})

"""
怎么玩
# 1) 发一条消息(含“handoff:hotel”将触发移交到 hotel_agent)
python multi_agent_workflow.py chat --thread t-1 \
  --text "Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square."

# 2) 查看历史检查点(拿 checkpoint_id)
python multi_agent_workflow.py history --thread t-1

# 3a) 回溯恢复(不改状态)
python multi_agent_workflow.py travel --thread t-1 --checkpoint <ID_FROM_HISTORY>

# 3b) 在检查点上改 state 再恢复(形成分支)
python multi_agent_workflow.py patch --thread t-1 \
  --checkpoint <ID_FROM_HISTORY> \
  --values '{"user_input":"Please book a hotel for 2 nights near JFK."}'
  
你能看到的点
- Handoff(Command):flight_agent / hotel_agent 节点根据规划决定是否 return Command(goto="...") 把控制权交给对方;
- 工具调用:根据规划触发 book_flight_tool / book_hotel_tool,把结果写入 state.itinerary;
- 真实 LLM(可选):若配了 OPENAI_API_KEY,代理会用 LLM 产出严格 JSON的路由/动作计划;否则走回退规则;
- 持久化 + 时间旅行:每一步都进 SQLite checkpoint;支持 history / travel / patch;
- 流式可观测:updates/custom/values 实时打印节点/工具事件与状态增量。
"""

class _Args:
    # 一个简单的“命名空间”对象,用来模拟 argparse.Namespace
    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)

def chat_via_code(thread: str="t-1",
                  text: str="Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square.",
                  db: str="multi_agent_workflow_example.sqlite"):
    """
    等价于命令:
    python multi_agent_workflow.py chat --thread t-1 \
        --text "Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square."

    请预订明天SFO→JFK的航班,然后handoff:hotel预订时代广场附近的酒店。
    """
    args = _Args(thread=thread, text=text, db=db)
    cmd_chat(args)

def history_via_code(thread: str="t-1",
                  db: str="multi_agent_workflow_example.sqlite"):
    """
    等价于命令:
    python multi_agent_workflow.py history --thread t-1
    """
    args = _Args(thread=thread, db=db)
    cmd_history(args)

def travel_via_code(thread: str, checkpoint: str,
                  db: str="multi_agent_workflow_example.sqlite"):
    """
    等价于命令:
    python multi_agent_workflow.py travel --thread t-1 --checkpoint <ID_FROM_HISTORY>
    """
    args = _Args(thread=thread, checkpoint=checkpoint, db=db)
    cmd_travel(args)

def patch_via_code(thread: str, checkpoint: str, values:str,
                  db: str="multi_agent_workflow_example.sqlite"):
    """
    等价于命令:
    python multi_agent_workflow.py patch --thread t-1 \
        --checkpoint <ID_FROM_HISTORY> \
        --values '{"user_input":"Please book a hotel for 2 nights near JFK."}'
    """
    args = _Args(thread=thread, checkpoint=checkpoint, values=values, db=db)
    cmd_patch(args)

def main():
    # chat_via_code()
    # history_via_code()
    # travel_via_code(thread="t-1", checkpoint="1f09dd2c-dc49-6e96-8000-8c1a5fd72750")
    patch_via_code(thread="t-1", checkpoint="1f09dd2c-dc49-6e96-8000-8c1a5fd72750",
                   values='{"user_input":"Please book a hotel for 2 nights near JFK."}')

def run_console():
    p = argparse.ArgumentParser(description="Multi-Agent Workflow (handoff + tools + persistence + optional LLM)")
    sub = p.add_subparsers(dest="cmd", required=True)

    pc = sub.add_parser("chat", help="send one user message through the multi-agent workflow")
    pc.add_argument("--thread", default="t-9001")
    pc.add_argument("--text",   default="Please book a flight tomorrow SFO to JFK, then handoff:hotel to arrange hotel near Times Square.")
    pc.add_argument("--db",     default="multi_agent.sqlite")
    pc.set_defaults(func=cmd_chat)

    ph = sub.add_parser("history", help="list checkpoints of a thread")
    ph.add_argument("--thread", default="t-9001")
    ph.add_argument("--db",     default="multi_agent.sqlite")
    ph.set_defaults(func=cmd_history)

    pt = sub.add_parser("travel", help="resume from a checkpoint_id")
    pt.add_argument("--thread",    default="t-9001")
    pt.add_argument("--checkpoint", required=True)
    pt.add_argument("--db",        default="multi_agent.sqlite")
    pt.set_defaults(func=cmd_travel)

    pp = sub.add_parser("patch", help="patch state at checkpoint_id and resume")
    pp.add_argument("--thread",    default="t-9001")
    pp.add_argument("--checkpoint", required=True)
    pp.add_argument("--values",     required=True,
                    help='JSON dict, e.g. \'{"user_input":"book a hotel near JFK"}\'')
    pp.add_argument("--db",        default="multi_agent.sqlite")
    pp.set_defaults(func=cmd_patch)

    args = p.parse_args()
    if not os.getenv("OPENAI_API_KEY"):
        print("WARN: OPENAI_API_KEY not set. Will use fallback routing rules instead of LLM planning.")
    args.func(args)

def multi_agent_workflow_example():
    main()

if __name__ == "__main__":
    multi_agent_workflow_example()

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐