一 Langgraph orientation(出发点)

1 Latency in the seconds vs ms(通过并行/流式输出减少延迟以及感官延迟)

  • Parallelization – to save actual latency
  • Streaming – to save perceived latency

2 Long-Running Agents can fail, which is expensive and time consuming(通过记忆解决工程上的每一次从头开始)

  • Checkpointing – to reduce the cost of each retry

3 The non-deterministic nature of AI requires checkpoints, approvals, and testing(通过interupt 解决人机交互)

  • Human-in-the-loop - to collaborate with the user
  • Tracing, Observation and Evaluation (LangSmith)
    在这里插入图片描述

二 Langgraph Components(基础组建)

1. State&Nodes:

在这里插入图片描述
State seems like the thread data that I’ve been familiar with,can be persisted across time and in particular across failures of nodes,是一个流动的状态数据。

We get 3 ways to define the state:

  • TypeDict

  • dataclass

  • Pydantic Model

Nodes:The specific function that we define in graph, which is used for solving certain problems.
下面是基础用法:

from dataclasses import dataclass, field
from pydantic import BaseModel, Field
from typing import Annotated, List, Literal, TypedDict
from langgraph.graph import END, START, StataGraph
from langgraph.types import Command, interrupt

from typing import TypedDict, List

class State(TypedDict):
 messages: List[str]
 todo: List[str]
 approved: bool

@dataclass
class State:
 messages: List[str] = field(default_factory=list)
 todo: List[str] = field(default_factory=list)
 approved: bool = False
 
class State(BaseModel):
 messages: List[str] = Field(default_factory=list)
 todo: List[str] = Field(default_factory=list)
 approved: bool = False

 
def plan_node(state: State):
 # 假设根据用户输入做一个计划
 return {
     "todo": ["search docs", "summarize", "draft blog"],
     "messages": state["messages"] + ["[plan] generated todo list"]
 }

def work_node(state: State):
 done = f"[work] did: {state['todo'][0]}"
 return {
     "todo": state["todo"][1:],
     "messages": state["messages"] + [done]
 }

2. Edges: Control Flow

核心概念:Super Steps

Super Step 是 LangGraph 保证「并行安全、状态一致、流程可控」的核心执行语义。

Super Step = 一轮「节点并行执行 → 状态统一合并 → 再推进流程」的原子步骤

它不是一个函数,也不是一个类,而是 LangGraph 在内部推进 Graph 时的“节拍器, 同一个 Super Step 内,所有节点看到的是“同一份旧 State”,

在这里插入图片描述

  1. 顺序边
from langgraph.graph import StateGraph, START, END

builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("work", work_node)

builder.add_edge(START, "plan")
builder.add_edge("plan", "work")
builder.add_edge("work", END)
  1. Parallel

可以控制并行执行加速

def research_a(state: State):
    return {"messages": state["messages"] + ["[A] research result"]}

def research_b(state: State):
    return {"messages": state["messages"] + ["[B] research result"]}

def merge(state: State):
    return {"messages": state["messages"] + ["[merge] merged A+B"]}

builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("research_a", research_a)
builder.add_node("research_b", research_b)
builder.add_node("merge", merge)

builder.add_edge(START, "plan")

# 并行分叉
builder.add_edge("plan", "research_a")
builder.add_edge("plan", "research_b")

# 汇合(两个都跑完后进入 merge)
builder.add_edge("research_a", "merge")
builder.add_edge("research_b", "merge")
builder.add_edge("merge", END)

进一步的,可以使用map-reduce
在这里插入图片描述

langgraph用 Annotated 定义 reducer

from typing import Annotated, List, TypedDict
from langgraph.graph import StateGraph, START, END
from operator import add

class State(TypedDict):
    # 多个并行节点都可以往 results 里写
    # 最终用 add(list concat)做 reduce
    results: Annotated[List[str], add]

   
builder = StateGraph(State)

builder.add_node("map_a", map_a)
builder.add_node("map_b", map_b)
builder.add_node("map_c", map_c)

# fan-out
builder.add_edge(START, "map_a")
builder.add_edge(START, "map_b")
builder.add_edge(START, "map_c")

# fan-in(隐式)
builder.add_edge("map_a", END)
builder.add_edge("map_b", END)
builder.add_edge("map_c", END)

Let get into what’s happened under the hook:
1) 开始state

{"results": []}

2)Super Step 1(并行 Map)

  • map_a → {"results": ["result from A"]}
  • map_b → {"results": ["result from B"]}
  • map_c → {"results": ["result from C"]}

3) 最后:

{
  "results": [
    "result from A",
    "result from B",
    "result from C"
  ]
}

问题:什么时候该用 Map-Reduce,而不是 merge node?

✅ 用 State reducer(Map-Reduce)

  • 并行节点彼此独立
  • 只是“收集结果”
  • 合并逻辑简单(list / sum / dict merge)

✅ 用显式 merge node

  • 合并逻辑复杂
  • 需要排序、过滤、再推理
  • 合并本身是一个“智能步骤”
  1. condition controll
def route_after_work(state: State):
    # todo 还有就继续 work,否则结束
    return "work" if state["todo"] else END

builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("work", work_node)

builder.add_edge(START, "plan")
builder.add_edge("plan", "work")
builder.add_conditional_edges("work", route_after_work, ["work", END])

3 Memory(checkpoint)

在这里插入图片描述
执行中间结果的缓存。一个非常关键的观点: A thread is a collection of checkpoints

一个 thread 本质上是一组 checkpoints。

Checkpoint 的意义不只是失败恢复,还包括:

  • 调试(复盘每一步 State)
  • 评估(分析哪个阶段出问题)
  • 治理(知道 Agent 实际做过什么)

这一步,决定了 Agent 是否能进入生产环境。

from langgraph.checkpoint.memory import InMemorySaver

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "thread-1"}}

initial_state = {"messages": ["hi"], "todo": [], "approved": False}
final_state = graph.invoke(initial_state, config=config)
print(final_state["messages"])

4. Human in the Loop

LangGraph 并不假设 AI 可以全自动完成任务。

相反,它默认:

  • 某些节点必须人工确认
  • 某些决策应该暂停等待人类
  • 某些路径需要人工选择

这让 Agent 更像一个:

人机协作流程系统,而不是 autonomous bot。
在这里插入图片描述

from langgraph.types import interrupt

def approval_node(state: State):
   # 中断,把当前关键信息抛给人
   decision = interrupt({
       "question": "Approve this plan?",
       "todo": state["todo"],
       "messages_tail": state["messages"][-3:]
   })
   # decision 是人工返回的内容(比如 {"approved": True} 或 {"approved": False, "fix": "..."})
   return {
       "approved": bool(decision.get("approved", False)),
       "messages": state["messages"] + [f"[human] decision={decision}"]
   }
   
   
builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("approval", approval_node)
builder.add_node("work", work_node)

builder.add_edge(START, "plan")
builder.add_edge("plan", "approval")

def route_after_approval(state: State):
   return "work" if state["approved"] else END

builder.add_conditional_edges("approval", route_after_approval, ["work", END])
builder.add_conditional_edges("work", lambda s: "work" if s["todo"] else END, ["work", END])

graph = builder.compile(checkpointer=InMemorySaver())

运行到 interrupt 时,LangGraph 会暂停等待外部输入(UI/接口/你的控制台逻辑),这就是 HITL 的核心。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐