LangGraph-多智能体(Multi-agent)
注:以下所提的“文档”,是指LangGraph官方指南(),参考版本是0.6.8。以下代码的概念篇多代理的架构、设计决策、通信模式、handoffs 模式等理论 / 架构层架构(Network / Supervisor / Hierarchical / Handoffs)理解为什么要多代理、如何组织多个 agent代理篇提供多代理的,例如 Supervisor、Swarm库 / API 层crea
注:以下所提的“文档”,是指LangGraph官方指南(https://langchain-ai.github.io/langgraph/guides/),参考版本是0.6.8。
以下代码的开发环境:
[project]
name = "my-langgraph"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"dotenv>=0.9.9",
"langchain>=0.3.27",
"langchain-openai>=0.3.33",
"langfuse>=3.5.0",
"langgraph>=0.6.7",
"langgraph-checkpoint-sqlite>=2.0.11",
"numpy>=2.3.3",
]
一、文档的主线 & 核心主题对照
|
文档 |
主要内容 / 聚焦点 |
角色 |
典型 API / 模式 |
侧重点 |
|
概念篇 |
多代理的架构、设计决策、通信模式、handoffs 模式等 |
理论 / 架构层 |
架构(Network / Supervisor / Hierarchical / Handoffs) |
理解为什么要多代理、如何组织多个 agent |
|
代理篇 |
提供多代理的 预构建(prebuilt)实现,例如 Supervisor、Swarm |
库 / API 层 |
create_supervisor、create_swarm、handoff 工具 |
快速上手用已有构造好的多代理模块 |
|
How-To 操作篇 |
手把手教你如何在 LangGraph 中写多代理:创建 handoffs、控制 agent 输入、构建系统流程 |
实践层 |
Command、handoff_tool、节点返回 Command、工具返回 Command |
从零实现一个多代理系统的步骤与技巧 |
这三者结合起来,从理论 → 库支持 → 实现细节,覆盖了 LangGraph 在多代理这个模块的整体能力和使用路径。
二. 概念篇:Multi-Agent Systems Overview(架构与设计)
文档这里这部分是理解Multi-Agent在 LangGraph 背后的设计思路、模式、权衡点的关键文档。下面是对其结构与重点的总结。
1. 多代理为何重要 / 背景动机
- 随着任务复杂度上升,单个 agent 往往不够清晰/高效:它可能有太多工具、不得不兼顾多个领域、对上下文管理变得复杂。
- Multi-Agent可 模块化、专精、可控:将不同子任务或专业能力拆成多个 agent,各自处理一部分,最后协同整合。
- 利于维护与演进:你可以更容易地替换 /升级某个 agent,而不破坏整个系统。
2. 多代理架构 / 模式分类
文档列出了几种主要架构方式,每种有其特性与适用场景:
|
架构 |
描述 |
特点 / 权衡 |
|
Network |
每个 agent 都可以与其它 agent 通信;agent 自主决定下一个 agent 路径。 |
高自由度,路由灵活;但决策复杂,可能难以控制路径。 |
|
Supervisor |
有一个中心的 supervisor agent 负责协调整个流程;其他 agent 各司其职,接收调度。 |
易于集中控制、监控,但 supervisor 成为瓶颈或单点决策中心。 |
|
Supervisor (tool-calling) |
supervisor 以 “工具” 的形式调用各个 agent,把 agent 当作工具;路由通过工具调用实现。 |
更符合 tool-based 模式;agent 被抽象成工具,方便集成与调用。 |
|
Hierarchical |
多层级的代理结构:如 supervisor-of-supervisors,或子团队 agent。 |
更复杂的组织结构,适合大规模结构化系统。 |
|
Custom / 混合 |
对于部分流程使用 deterministic 分支、部分使用 agent 路由、通信只在部分 agent 间开放。 |
灵活组合,适应复杂业务需求。 |
3. 通信 / 状态管理 / Handoffs
- Handoffs 是Multi-Agent系统的基本通信方式:一个 agent 在结束时 “移交 (hand off)” 控制权给另一个 agent,同时可以传递状态更新 / payload。
- 在 LangGraph 中,可以通过返回 Command(...) 对象从一个 agent 节点触发 handoff(指定下一个 agent 名称 / 更新状态 /是否跨图导航 (graph=Command.PARENT))
- 通信方式可选:
- 共享完整 “思考 /日志 /消息 history” 给所有 agent
- 只共享最终结果 /部分摘要
- 在 message history 中以 agent 名称标识不同 agent 输出。
- 状态管理 / schema:不同 agent 可能有不同状态 schema(TypedDict)或共享部分状态。这要求设计清晰的边界 /映射规则。
4. Hand-offs 与工具融合
- Hand-offs 可以包装为工具:即你定义一个特殊工具(@tool),其返回 Command(...),让 agent 用工具调用发起 handoff。这样 handoff 行为在工具层就内嵌在 agent 的行动集合里。
- 这种方式有一个好处:agent 的决策(prompt /工具调用)和 handoff 路由统一在工具调用层,而不必在节点里硬写跳转逻辑。
总结(概念篇结论)
- Multi-Agent是将复杂任务拆成若干 agent 协作执行的方式,是 LangGraph 强功能之一。
- 架构设计(Network / Supervisor / Hierarchical / Custom)决定了代理间通信、控制流和协调责任。
- Handoffs 是 agent 路由的核心机制;状态传递、通信日志设计、schema 设计是构建健壮多-agent 系统的关键要素。
三. Agents篇(Prebuilt Multi-Agent实现)
指南文档这部分讲的是 LangGraph 本身或相关扩展提供的 “开箱即用” Multi-Agent系统支持:Supervisor / Swarm / Handoffs 等。它让你不用从头写路由逻辑,就能快速搭建一个Multi-Agent 系统。
下面是该文档的主要内容与要点。
1. Supervisor 预构建实现
- 提供 langgraph-supervisor 库(可安装)来支持 supervisor 架构。
- 在 supervisor 模式中,你定义多个 agent(例如 flight_assistant、hotel_assistant),再用 create_supervisor(...) 构造一个 supervisor graph,把各 agent 作为工具 /子节点交给 supervisor 管理。
- 示例代码片段:
flight_assistant = create_react_agent(...)
hotel_assistant = create_react_agent(...)
supervisor = create_supervisor(
agents=[flight_assistant, hotel_assistant],
model=ChatOpenAI(...),
prompt="You manage a hotel booking assistant and a flight booking assistant..."
).compile()
- 在这个架构里,用户对话进入 supervisor,supervisor 决定要调用哪个 agent,并把任务 /消息路由给它。 agent 完成后返回,其结果回到 supervisor /主图。
2. Swarm 预构建实现
- 提供 langgraph-swarm 库用来构建 Swarm 模式 多代理系统。 Swarm 模式的特点是 agent 间可以动态交接控制权(handoff),而不是固定 supervisor 指挥。
- 在 swarm 模式中,agent 被赋予 handoff 工具(transfer_to_X)以主动转交控制权给另一个 agent。系统还记住最后活跃 agent,以便后续继续那个 agent 处理对话。
- 示例代码片段:
|
transfer_to_hotel = create_handoff_tool(agent_name="hotel_assistant") transfer_to_flight = create_handoff_tool(agent_name="flight_assistant") flight_agent = create_react_agent(..., tools=[book_flight, transfer_to_hotel]) hotel_agent = create_react_agent(..., tools=[book_hotel, transfer_to_flight]) swarm = create_swarm( agents=[flight_agent, hotel_agent], default_active_agent="flight_assistant" ).compile() |
- 在 swarm 模式下,对话过程中 agent 可以根据上下文主动调用 handoff 工具,把控制权移给另一个 agent。
3. Handoffs 支持 &共通机制
- 无论是 supervisor 或 swarm,核心通信都是通过 handoff 实现:agent 返回 Command(...) 指定下一个 agent 及传递的数据。
- Handoff 工具可通过 @tool 装饰器封装,使 handoff 操作对 agent 来说更自然、可作为工具调用。
- 多代理预构建层还会处理细节的 Agent-to-agent message routing、状态传递、工具整合、流式处理等,使用户能更方便使用。
总结(代理篇结论)
- LangGraph 提供 Supervisor / Swarm 模式的预构建多代理框架,降低用户自己写路由逻辑的难度。
- Supervisor 模式适合集中调度型系统;Swarm 模式适合 agent 间自由交互 /任务切换型系统。
- Handoff 是所有预构建实现的基础机制,它把 agent 间的控制流和状态传递标准化。
四. 操作篇:How-To Multi-Agent(实现细节指南)
这是教你如何从零构建多代理系统的实操指南。它补充了概念篇的架构设计与代理篇的预构建 API,主要聚焦在 handoffs 与控制 agent 输入 /构造系统图的细节。
以下是它的结构与关键要点。
1. Handoffs — 基本机制
- 创建 handoff 工具:用 @tool 装饰函数,使其返回 Command(goto=..., update=..., graph=Command.PARENT),表示切换到另一个 agent 节点、更新父图 state、在父图层级进行跳转。
- 控制 agent 输入:不仅可以 hand off 给另一个 agent,还可以透过 Send()(或类似 primitive)把构造好的子任务 / payload 直接发送给某个 agent,使其接收定制输入。
- 示例中 create_handoff_tool(...) 的代码说明了如何从一个 agent 调用 handoff 工具,将 state /消息注入另一个 agent 的输入。
2. 构建多代理系统(手写 /组合)
- 你可以把多个 agent(例如 flight_assistant、hotel_assistant)当作 graph 节点加入一个父图。父图节点之间通过 handoffs 路径连接。
- 父图起始节点通常指向一个 agent(比如 flight_assistant);agent 决策流程可能 hand off 给其他 agent 或结束。
- 在父图中也可以写 wrapper 节点 / 路由节点,以在 agent 之间做转换 / 中继 /条件跳转。
3. 多轮 / 连续对话 & agent 状态
- 多代理系统常用于多轮交互 /对话场景,每轮 agent 可能接到用户输入 /工具结果 /其他 agent 输出后决定接下来的 agent 路径。How-To 文档中提到 “Multi-turn conversation” 部分。
- agent 状态 (messages、memory) 一般共享给父图,再由父图给每个 agent 作为输入。也可以在 handoff 时只传部分 state。
- 若 agent 内部有多节点 /子图,也可以在子节点内部发出 Command(graph=Command.PARENT) 跳出子图到另一个 agent 节点。文档对此有提醒:为了可视化 /监控,需要在父图中 wrap 子图调用函数为 Command 型节点 (即节点返回 Command) 才能跟踪跨图跳转。
4. 使用预构建模块的衔接
- 操作篇指出建议使用预构建的 create_react_agent / ToolNode / handoff_tool 等工具,因为它们天然支持将 Command 类型工具 / agent 嵌入。
- 例如,用 create_handoff_tool(...) 为 agent 提供路由能力;用 create_react_agent 构造 agent;把 agent 节点添加到父图;再把父图 .compile() 得到多 agent 系统。
- 操作指南也演示如何组合 agent 与 handoff,以及如何控制 agent 接收的消息/输入。
五. 使用建议 /注意事项 /挑战
基于这三篇指南文档 + 实践经验,这里给一些建议与提醒,帮助你在实际用 LangGraph 构建多代理系统时规避坑、写得更健壮。
- 合理设计 agent 间的通信边界 /输入输出接口
不要让 agent 直接读写过多除自己以外的 state 字段。使用 handoff payload /明确更新字段会更安全。 - Command 路由要明确 / 有限状态
agent 返回的 Command(goto=...) 路径尽量限定在已知 agent 节点,以避免无限跳转或死循环。 - 子图跨 agent 路由注意 graph=Command.PARENT
当你在子图内部想跳转到另一个 agent(而非同一子图内节点)时,要指定 graph=Command.PARENT,并在父图里有对应 agent 节点。否则跳转可能无效或不可视化。 - 日志 / tracing /调试要加标识 agent 名称
在 message history /日志里加上 agent 名(或前缀)能帮助追踪哪个 agent 的输出 /思路。 - 并发 /异步 /速率限制
多 agent 常伴有并发调用、工具并行、速率限制冲突等问题。要注意给 agent 或模型加 rate limiter /timeout /失败重试策略。 - 慎用预构建模块与自定义模块
预构建(Supervisor / Swarm)适合中等复杂场景、减少样板代码。但对于非常定制 /复杂路由逻辑,可能需要自己写 handoff + routing 节点。 - 保持状态兼容 /升级可扩展性
多代理系统中,随着 agent 数量 /需求升级,状态 schema 可能会变(增 /删字段)。要考虑兼容旧 checkpoint /历史。 - 避免单点瓶颈 /监控与可插拔性
在 Supervisor 架构中,supervisor 是中心协调者,可能成为瓶颈。要监控其响应性能。Swarm 架构可以缓解这个集中瓶颈问题。 - 版本兼容 /API 变动
多代理功能在 LangGraph 生态中仍在进化,某些 API(Command 参数、子图跨 agent 跳转、handoff 工具)可能在未来版本调整。编写时要预留适配空间。
六、例子代码
在代码中使用两个节点flight_agent和hotel_agent来模拟agent间的network模式机制。
import os, argparse, time, json
import sqlite3
from typing import TypedDict, Annotated, Dict, Any, List
from operator import add
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command
"""
工作流(StateGraph) 实现,不用预构建 Agent;
两个代理节点:flight_agent / hotel_agent;
Handoff(移交):代理之间通过 Command 跳转;
工具调用(简化版):book_flight / book_hotel 更新行程;
SQLite 持久化 / 检查点;
真实 LLM(OpenAI)可选,没配 Key 会自动本地回退;
控制台流式:updates/custom/values 观察执行;
一句话用法:你把一句话“发给系统”,它会路由给合适的代理;代理可完成预订或把会话 handoff 给另一个代理。
图结构:START → flight_agent → finalize,hotel_agent → finalize,finalize → END
"""
# ============== 可选 LLM ==============
def _get_llm():
"""若设置了 OPENAI_API_KEY 且安装了 langchain-openai,则返回 ChatOpenAI;否则返回 None(走回退规则)。"""
llm = ChatOpenAI(model="qwen", api_key="empty", base_url="http://192.168.0.12:1120/v1", temperature=0.3)
return llm
# ============== 全局状态 ==============
class MState(TypedDict, total=False):
user_input: str # 本轮用户输入
active_agent: str # 当前活跃代理名:flight_agent / hotel_agent
messages: Annotated[List[str], add] # 对话轨迹(简化文本)
itinerary: Dict[str, Any] # 行程(flight / hotel)
last_action: Annotated[List[str], add] # 代理最新动作(说明 / 结果)
log: Annotated[List[str], add] # 运行日志(流式可视)
# ============== 工具(简化版) ==============
def book_flight_tool(state: MState, *, route_hint: str = "") -> Dict[str, Any]:
"""模拟订机票:写入 itinerary.flight"""
w = get_stream_writer()
time.sleep(0.05)
flight = {
"from": "SFO",
"to": "JFK",
"date": "2025-10-01",
"carrier": "LG-Air",
"route_hint": route_hint or "auto"
}
w({"tool": "book_flight", "flight": flight})
new_it = dict(state.get("itinerary") or {})
new_it["flight"] = flight
return {"itinerary": new_it, "last_action": [f"Booked flight {flight['from']}→{flight['to']} on {flight['date']}"]}
def book_hotel_tool(state: MState, *, near: str = "") -> Dict[str, Any]:
"""模拟订酒店:写入 itinerary.hotel"""
w = get_stream_writer()
time.sleep(0.05)
hotel = {
"city": "New York",
"name": "LG Plaza",
"nights": 2,
"near": near or "Times Square"
}
w({"tool": "book_hotel", "hotel": hotel})
new_it = dict(state.get("itinerary") or {})
new_it["hotel"] = hotel
return {"itinerary": new_it, "last_action": [f"Booked hotel {hotel['name']} ({hotel['nights']} nights)"]}
# ============== 代理实现(两个节点) ==============
def _simple_llm_plan(agent: str, user_msg: str) -> Dict[str, Any]:
"""
小型“策略器”:优先用 LLM 读意图;否则用关键词回退。
返回结构:
{"handoff": None|"flight_agent"|"hotel_agent", "action": "book_flight"/"book_hotel"/"chat", "note": str}
"""
llm = _get_llm()
if llm:
from langchain_core.messages import SystemMessage, HumanMessage
sys = (
f"You are a routing and action planner for a {agent.replace('_',' ')}.\n"
"Decide ONE of actions: book_flight, book_hotel, chat.\n"
"Optionally decide a handoff target: flight_agent or hotel_agent (only if needed).\n"
"Respond in strict JSON: {\"handoff\": null|\"flight_agent\"|\"hotel_agent\", "
"\"action\": \"book_flight\"|\"book_hotel\"|\"chat\", \"note\": \"...\"}."
)
user = f"User says: {user_msg}"
try:
resp = llm.invoke([SystemMessage(content=sys), HumanMessage(content=user)])
txt = resp.content.strip()
data = json.loads(txt)
return {
"handoff": data.get("handoff"),
"action": data.get("action", "chat"),
"note": data.get("note", "")
}
except Exception:
pass
# ---- 回退规则(无 LLM 时)----
s = user_msg.lower()
handoff = None
if "handoff:hotel" in s:
handoff = "hotel_agent"
elif "handoff:flight" in s:
handoff = "flight_agent"
action = "chat"
if "book" in s and ("flight" in s or "ticket" in s or "plane" in s):
action = "book_flight"
elif "book" in s and ("hotel" in s or "room" in s):
action = "book_hotel"
return {"handoff": handoff, "action": action, "note": "(fallback rule)"}
def flight_agent(state: MState) -> Any:
"""
航班代理:
- 读取 user_input,规划(LLM/回退)
- 可触发 book_flight_tool
- 若需要,把控制权 handoff 给 hotel_agent
"""
w = get_stream_writer()
user_msg = state.get("user_input", "")
w({"agent": "flight", "event": "enter", "user_input": user_msg})
plan = _simple_llm_plan("flight_agent", user_msg)
w({"agent": "flight", "event": "plan", **plan})
updates: Dict[str, Any] = {"messages": [f"[flight] plan: {plan}"], "active_agent": "flight_agent"}
# 执行动作
if plan["action"] == "book_flight":
updates.update(book_flight_tool(state, route_hint="direct"))
elif plan["action"] == "book_hotel":
# 在航班代理也能代订酒店(现实中可限制),这里演示动作与 handoff 解耦
updates.update(book_hotel_tool(state, near="JFK"))
else:
updates["last_action"] = ["Chatted (no booking)"]
updates.setdefault("log", []).append("flight_agent handled")
# 是否需要 handoff
if plan["handoff"] == "hotel_agent":
# 返回 Command:跳转到 hotel_agent,并带着增量更新
return Command(goto="hotel_agent", update=updates)
# 否则正常继续
return updates
def hotel_agent(state: MState) -> Any:
"""
酒店代理:
- 读取 user_input,规划(LLM/回退)
- 可触发 book_hotel_tool
- 若需要,把控制权 handoff 给 flight_agent
"""
w = get_stream_writer()
user_msg = state.get("user_input", "")
w({"agent": "hotel", "event": "enter", "user_input": user_msg})
plan = _simple_llm_plan("hotel_agent", user_msg)
w({"agent": "hotel", "event": "plan", **plan})
updates: Dict[str, Any] = {"messages": [f"[hotel] plan: {plan}"], "active_agent": "hotel_agent"}
if plan["action"] == "book_hotel":
updates.update(book_hotel_tool(state, near="city center"))
elif plan["action"] == "book_flight":
updates.update(book_flight_tool(state, route_hint="cheap"))
else:
updates["last_action"] = ["Chatted (no booking)"]
updates.setdefault("log", []).append("hotel_agent handled")
if plan["handoff"] == "flight_agent":
return Command(goto="flight_agent", update=updates)
return updates
def finalize(state: MState) -> Dict[str, Any]:
"""收尾:生成简报文本(不结束会话,仅做示例)。"""
w = get_stream_writer()
it = state.get("itinerary") or {}
flight = it.get("flight")
hotel = it.get("hotel")
brief = ["[SUMMARY]"]
if flight:
brief.append(f"Flight: {flight['from']}→{flight['to']} on {flight['date']} ({flight['carrier']})")
if hotel:
brief.append(f"Hotel: {hotel['name']} in {hotel['city']} ({hotel['nights']} nights)")
if not (flight or hotel):
brief.append("(no bookings yet)")
text = "\n".join(brief)
w({"node": "finalize", "event": "preview", "preview": text})
return {"messages": [text], "last_action": ["summarized"]}
# ============== 图装配(含 SQLite 持久化) ==============
def build_app(db_path="multi_agent.sqlite"):
conn = sqlite3.connect(db_path, check_same_thread=False)
saver = SqliteSaver(conn)
g = StateGraph(MState)
g.add_node("flight_agent", flight_agent)
g.add_node("hotel_agent", hotel_agent)
g.add_node("finalize", finalize)
# 简单主干:START → flight_agent → finalize
# (handoff 会在两个代理间来回跳,直到某个代理不再返回 Command)
g.add_edge(START, "flight_agent")
# 让两个代理都能流向 finalize(若未 handoff)
g.add_edge("flight_agent", "finalize")
g.add_edge("hotel_agent", "finalize")
g.add_edge("finalize", END)
return g.compile(checkpointer=saver)
# ============== CLI ==============
def _stream(app, init, config):
for mode, chunk in app.stream(
init,
config=config,
stream_mode=["updates", "custom", "values"],
durability="sync",
):
if mode == "custom":
print("🟣 [custom ]", chunk)
elif mode == "updates":
print("🔸 [updates]", chunk)
elif mode == "values":
# 打点:只展示关键字段,避免太长
view = {k: chunk.get(k) for k in ["active_agent","last_action","itinerary"] if k in chunk}
print("🟢 [values ]", view)
def cmd_chat(args):
"""
发送一条用户输入,系统会:
- 进入 flight_agent(默认起点)
- 代理可能 handoff 到另一个代理
- 最终到 finalize 给出总结
所有步骤持久化到 SQLite(按 thread_id 分叉)。
"""
app = build_app(args.db)
cfg = {"configurable": {"thread_id": args.thread}}
print(f"\n=== CHAT (thread={args.thread}) ===")
user_msg = args.text
_stream(app, {"user_input": user_msg}, cfg)
st = app.get_state(cfg)
print("\n--- STATE (snapshot) ---")
view = {k: st.values.get(k) for k in ["active_agent","last_action","itinerary","messages"]}
print(json.dumps(view, indent=2, ensure_ascii=False))
def cmd_history(args):
app = build_app(args.db)
cfg = {"configurable": {"thread_id": args.thread}}
hist = list(app.get_state_history(cfg))
print(f"\n=== HISTORY (thread={args.thread}) ===")
if not hist:
print("(empty)")
return
for i, st in enumerate(hist):
cp = (st.config or {}).get("configurable", {}).get("checkpoint_id")
nxt = st.next
vals = st.values or {}
preview = {k: vals.get(k) for k in ["active_agent","last_action"]}
print(f"[{i}] next={nxt!r} checkpoint_id={cp!r} {preview}")
def cmd_travel(args):
"""从指定 checkpoint_id 恢复一次(回溯),不修改状态。"""
app = build_app(args.db)
cfg = {"configurable": {"thread_id": args.thread, "checkpoint_id": args.checkpoint}}
print(f"\n=== TRAVEL (thread={args.thread}, checkpoint={args.checkpoint}) ===")
_stream(app, None, cfg)
st = app.get_state(cfg)
print("\n--- AFTER TRAVEL ---")
print({k: st.values.get(k) for k in ["active_agent","last_action"]})
def cmd_patch(args):
"""在某个 checkpoint 上修改部分 state,然后从该点恢复执行(形成新分支)。"""
app = build_app(args.db)
base_cfg = {"configurable": {"thread_id": args.thread, "checkpoint_id": args.checkpoint, "checkpoint_ns": ""}}
try:
patch_vals = json.loads(args.values)
assert isinstance(patch_vals, dict)
except Exception as e:
raise SystemExit(f"--values 必须是 JSON 对象,如 '{{\"user_input\":\"book a hotel near JFK\"}}' // {e}")
print(f"\n=== PATCH (thread={args.thread}, checkpoint={args.checkpoint}) ===")
print("patch values:", patch_vals)
new_cfg = app.update_state(base_cfg, values=patch_vals, )
_stream(app, None, new_cfg)
st = app.get_state({"configurable": {"thread_id": args.thread}})
print("\n--- AFTER PATCH ---")
print({k: st.values.get(k) for k in ["active_agent","last_action","itinerary"]})
"""
怎么玩
# 1) 发一条消息(含“handoff:hotel”将触发移交到 hotel_agent)
python multi_agent_workflow.py chat --thread t-1 \
--text "Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square."
# 2) 查看历史检查点(拿 checkpoint_id)
python multi_agent_workflow.py history --thread t-1
# 3a) 回溯恢复(不改状态)
python multi_agent_workflow.py travel --thread t-1 --checkpoint <ID_FROM_HISTORY>
# 3b) 在检查点上改 state 再恢复(形成分支)
python multi_agent_workflow.py patch --thread t-1 \
--checkpoint <ID_FROM_HISTORY> \
--values '{"user_input":"Please book a hotel for 2 nights near JFK."}'
你能看到的点
- Handoff(Command):flight_agent / hotel_agent 节点根据规划决定是否 return Command(goto="...") 把控制权交给对方;
- 工具调用:根据规划触发 book_flight_tool / book_hotel_tool,把结果写入 state.itinerary;
- 真实 LLM(可选):若配了 OPENAI_API_KEY,代理会用 LLM 产出严格 JSON的路由/动作计划;否则走回退规则;
- 持久化 + 时间旅行:每一步都进 SQLite checkpoint;支持 history / travel / patch;
- 流式可观测:updates/custom/values 实时打印节点/工具事件与状态增量。
"""
class _Args:
# 一个简单的“命名空间”对象,用来模拟 argparse.Namespace
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def chat_via_code(thread: str="t-1",
text: str="Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square.",
db: str="multi_agent_workflow_example.sqlite"):
"""
等价于命令:
python multi_agent_workflow.py chat --thread t-1 \
--text "Please book a flight SFO→JFK tomorrow, then handoff:hotel to book a hotel near Times Square."
请预订明天SFO→JFK的航班,然后handoff:hotel预订时代广场附近的酒店。
"""
args = _Args(thread=thread, text=text, db=db)
cmd_chat(args)
def history_via_code(thread: str="t-1",
db: str="multi_agent_workflow_example.sqlite"):
"""
等价于命令:
python multi_agent_workflow.py history --thread t-1
"""
args = _Args(thread=thread, db=db)
cmd_history(args)
def travel_via_code(thread: str, checkpoint: str,
db: str="multi_agent_workflow_example.sqlite"):
"""
等价于命令:
python multi_agent_workflow.py travel --thread t-1 --checkpoint <ID_FROM_HISTORY>
"""
args = _Args(thread=thread, checkpoint=checkpoint, db=db)
cmd_travel(args)
def patch_via_code(thread: str, checkpoint: str, values:str,
db: str="multi_agent_workflow_example.sqlite"):
"""
等价于命令:
python multi_agent_workflow.py patch --thread t-1 \
--checkpoint <ID_FROM_HISTORY> \
--values '{"user_input":"Please book a hotel for 2 nights near JFK."}'
"""
args = _Args(thread=thread, checkpoint=checkpoint, values=values, db=db)
cmd_patch(args)
def main():
# chat_via_code()
# history_via_code()
# travel_via_code(thread="t-1", checkpoint="1f09dd2c-dc49-6e96-8000-8c1a5fd72750")
patch_via_code(thread="t-1", checkpoint="1f09dd2c-dc49-6e96-8000-8c1a5fd72750",
values='{"user_input":"Please book a hotel for 2 nights near JFK."}')
def run_console():
p = argparse.ArgumentParser(description="Multi-Agent Workflow (handoff + tools + persistence + optional LLM)")
sub = p.add_subparsers(dest="cmd", required=True)
pc = sub.add_parser("chat", help="send one user message through the multi-agent workflow")
pc.add_argument("--thread", default="t-9001")
pc.add_argument("--text", default="Please book a flight tomorrow SFO to JFK, then handoff:hotel to arrange hotel near Times Square.")
pc.add_argument("--db", default="multi_agent.sqlite")
pc.set_defaults(func=cmd_chat)
ph = sub.add_parser("history", help="list checkpoints of a thread")
ph.add_argument("--thread", default="t-9001")
ph.add_argument("--db", default="multi_agent.sqlite")
ph.set_defaults(func=cmd_history)
pt = sub.add_parser("travel", help="resume from a checkpoint_id")
pt.add_argument("--thread", default="t-9001")
pt.add_argument("--checkpoint", required=True)
pt.add_argument("--db", default="multi_agent.sqlite")
pt.set_defaults(func=cmd_travel)
pp = sub.add_parser("patch", help="patch state at checkpoint_id and resume")
pp.add_argument("--thread", default="t-9001")
pp.add_argument("--checkpoint", required=True)
pp.add_argument("--values", required=True,
help='JSON dict, e.g. \'{"user_input":"book a hotel near JFK"}\'')
pp.add_argument("--db", default="multi_agent.sqlite")
pp.set_defaults(func=cmd_patch)
args = p.parse_args()
if not os.getenv("OPENAI_API_KEY"):
print("WARN: OPENAI_API_KEY not set. Will use fallback routing rules instead of LLM planning.")
args.func(args)
def multi_agent_workflow_example():
main()
if __name__ == "__main__":
multi_agent_workflow_example()
更多推荐


所有评论(0)