有一天小白兔学编程,问老程序员:“大佬,Agent 系统怎么才能记住以前聊过什么?”老程序员摸着胡子笑道:“孩子,你得给它一个大脑,也叫做状态(State)。”如果说 LangGraph 是智能代理的身体,那么状态就是它的记忆和灵魂。
本章我们将从为什么要有状态讲起,再到如何使用 Python 的 TypedDict、Annotated 定义类型安全且灵活的状态,最后聊聊常见的设计误区。

为什么 LangGraph 把 State 放在第一位

只有记忆的 Agent 才是真正的 Agent

还记得你第一次使用 ChatGPT 时的惊喜吗?输入一句话,它秒回一个答案。但当你问第二个问题,它会根据第一轮的上下文回答,这正是记忆的力量。在早期的链式编排框架中,我们往往只传递输入和输出,缺乏系统性的状态管理。这种无状态模式对于简单任务还够用,但一旦涉及多轮对话、工具调用、条件分支,问题就来了:缺乏共享的上下文会让 Agent 忘乎所以,重复犯错。

LangGraph 的官方文档明确指出,状态是一份共享的、集中式的数据结构,代表工作流在某一时刻的快照 。每个节点读取这份状态并返回增量更新,框架通过 reducer 合并这些更新。这就像是大家共用一本“旅客日志”——每个人只在自己的页脚写下变化,最后由管理员整理成一本完整的日记。另一篇文章强调,状态不仅可以保存简单的键值对,还能包含嵌套字典或自定义类,保证信息的一致性和连续性 。没有状态,Agent 只能“短忆如鱼”;有了状态,它可以“前事不忘,后事之师”。

TypedDict + Annotated 的设计哲学

为什么不用普通字典?

Python 的字典非常灵活,用字典做状态简单粗暴。但在多人协作或复杂系统中,裸字典就像一盘杂乱的散沙,很难协调。LangGraph 建议用 TypedDict、Pydantic 或 dataclass 来定义状态 schema。这样做有三个优势:

  1. 类型安全:你可以清楚知道每个字段存储什么类型,如果误用,IDE 甚至 Pydantic 会直接报错。文档指出,State 是定义在 Graph 初始化时传入的 schema,可以是 TypedDict 或 Pydantic 。
  2. 可读性:状态字段一目了然;比起“随手往字典里塞数据”要清爽得多。
  3. 自动合并:配合 Annotated 可以指定 reducer,LangGraph 在并发情况下自动合并,防止数据丢失 。

来看一个简单例子,我们先定义一个有字段 messages 和 count 的 ChatState,并使用自定义 reducer 累加整数:

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
from operator import add

# 自定义一个整数累加函数
def add_ints(a, b):
    return a + b

class ChatState(TypedDict):
    # Annotated 标记告诉 LangGraph 在并发场景下如何合并
    messages: Annotated[list[str], add]
    count: Annotated[int, add_ints]

def append_msg(state: ChatState) -> dict:
    last = state['messages'][-1] if state['messages'] else ""
    return {"messages": [last + " world!"], "count": 1}

builder = StateGraph(ChatState)
builder.add_node("append_msg", append_msg)
builder.set_entry_point("append_msg")
graph = builder.compile()
print(graph.invoke({"messages": ["Hello"], "count": 0}))

执行结果类似:{‘messages’: [‘Hello world!’], ‘count’: 1}。这里 Annotated 指定了 add 和 add_ints 作为 reducer,告诉框架在合并时要累加。这比手动拼接列表或累加整数要优雅。

Pydantic 模型的优缺点

虽然 TypedDict 简单高效,某些情况下我们需要更严格的验证,这时可以使用 Pydantic。StateGraph 接受任何类型作为 state_schema,包括 Pydantic BaseModel 。使用 Pydantic 可以在运行时检查输入类型,避免传入错误数据。例如:

from langgraph.graph import StateGraph
from pydantic import BaseModel

class MyState(BaseModel):
    a: str

def my_node(state: MyState):
    return {"a": state.a.upper()}

builder = StateGraph(MyState)
builder.add_node("my_node", my_node)
builder.set_entry_point("my_node")
graph = builder.compile()
print(graph.invoke({"a": "hello"}))  # 正常运行

但是,Pydantic 也有局限:1)验证只在输入时进行,后续节点不再自动验证;2)Graph 输出的不是 Pydantic 对象,需要手动实例化;3)递归验证较慢 。因此,如果你追求性能且状态结构简单,TypedDict 更合适;如需复杂验证,再用 Pydantic。

下面是一个 Pydantic 验证的示例,注意最后手动将结果转回模型:

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class Nested(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: Nested

def process(state: ComplexState):
    return {"text": state.text.upper(), "count": state.count + 1}

builder = StateGraph(ComplexState)
builder.add_node("process", process)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

input_state = ComplexState(text="hello", count=0, nested=Nested(value="test"))
result = graph.invoke(input_state)
print(result)  # {'text': 'HELLO', 'count': 1, 'nested': {'value': 'test'}}
# 手动转换回 Pydantic
output_model = ComplexState(**result)
print(output_model)

reducers:不仅仅是加法

见识了简单的加法 reducer,我们再看看其他 reducer:
• add_messages:专门用于拼接消息(包括 LangChain 的 Message 对象),保证多轮对话的消息不会丢失。
• Overwrite:用于完全覆盖旧值,绕过 reducer 合并。例如某个状态字段需要重置为初始值,而不是累计。
• 自定义 reducer:可以自己定义函数,例如用集合去重、统计平均值等。只要函数能接受两个参数并返回合并结果,就能作为 reducer。

例如,假设我们要统计访问次数并存储唯一用户列表:

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
from operator import add

def union_sets(a: set, b: set):
    return a | b

class VisitState(TypedDict):
    users: Annotated[set[str], union_sets]
    visits: Annotated[int, add]

def log_visit(state: VisitState) -> dict:
    user = "user_" + str(state.get("visits", 0) + 1)
    return {"users": {user}, "visits": 1}

builder = StateGraph(VisitState)
builder.add_node("log", log_visit)
builder.set_entry_point("log")
graph = builder.compile()
state = {"users": set(), "visits": 0}
state = graph.invoke(state)
state = graph.invoke(state)
print(state)  # users 包含两个 user,visits 为 2

通过这种方式,我们无需担心集合覆盖的问题。

状态的不可变性与累积更新

为什么要不可变?

如果你用过 React,你一定听说过“不要直接修改 state”。这条原则在 LangGraph 中同样适用。mutable 结构会导致不可预期的副作用:当多个节点同时执行并共享同一份字典时,一个节点改了字段会影响另一个节点的输入,犹如几个人拉扯同一根毛线,最后缠成一团。官方建议每个节点都返回一个新的状态更新,而不是修改原状态 。这样可以保证每个更新是独立的,框架可以根据 reducer 正确合并。

来看一个小例子,比较错误与正确的写法:

# 错误示范
def wrong_increment(state):
    state["count"] += 1  # 修改原 state
    return state

# 正确示范
def right_increment(state):
    return {"count": state.get("count", 0) + 1}

第一个函数会直接改变输入字典的 count 字段,如果另外一个并行节点也尝试增加 count,就可能发生竞态;第二个函数仅返回增量,由 LangGraph 的 reducer 决定如何合并。

累积更新:就像滚雪球

当我们在图中循环调用某个节点时,每次更新都会追加到状态里,从而形成历史轨迹。例如在一个问答机器人中,你想记录所有对话消息,那么每轮对话需要把用户输入和模型回复都写入状态。使用 Annotated[list, add] 即可自动拼接消息。

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
from operator import add

class QnAState(TypedDict):
    messages: Annotated[list[str], add]

def user_prompt(state: QnAState) -> dict:
    question = input("问机器人什么?")
    return {"messages": [question]}

def llm_reply(state: QnAState) -> dict:
    question = state["messages"][-1]
    answer = "你问的是:" + question
    return {"messages": [answer]}

builder = StateGraph(QnAState)
builder.add_node("ask", user_prompt)
builder.add_node("answer", llm_reply)
builder.add_edge("ask", "answer")
builder.add_edge("answer", "ask")
builder.set_entry_point("ask")
graph = builder.compile()

state = {"messages": []}
state = graph.invoke(state)
state = graph.invoke(state)
print(state["messages"])  # 列表中有两轮问答

这就像滚雪球一样,每轮交互增加两条消息,形成对话记录。由于我们采用不可变设计,状态的历史始终清晰可见,便于调试和审计。

持久化与时间旅行

LangGraph 提供持久化机制,可在任意节点前打断点,实现“时间旅行”。你可以在图的执行过程中通过检查点保存状态,一旦出现错误或需要人类干预,可以回到之前的节点重新执行。官方文档提到,持久化依赖于外部存储,并需要状态不可变以确保每个快照都是独立的 。就像玩游戏时存档,如果你随意在存档里改数据,加载时就会出错。

使用持久化功能时,你只需要配置一个 checkpointer(如 Postgres、SQLite),并传入 thread_id。每次状态更新都会自动保存到数据库。一旦进程崩溃或需要暂停,重新启动时传入同样的 thread_id 即可从断点恢复。由于状态是不可变且累积更新的,我们不用担心历史状态丢失。

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
import os

class CounterState(TypedDict):
    count: Annotated[int, add]

def inc(state: CounterState) -> dict:
    return {"count": 1}

# 配置 SQLite 检查点
db_path = "/mnt/data/checkpoint.db"
if os.path.exists(db_path):
    os.remove(db_path)
checkpointer = SqliteSaver(db_path)

builder = StateGraph(CounterState)
builder.add_node("inc", inc)
builder.set_entry_point("inc")
graph = builder.compile(checkpointer=checkpointer)

state = {"count": 0}
state = graph.invoke(state, config={"configurable": {"thread_id": "demo"}})
state = graph.invoke(state, config={"configurable": {"thread_id": "demo"}})
print(state)  # 计数2
# 模拟崩溃或退出……随后重启
after_restart = graph.invoke(state, config={"configurable": {"thread_id": "demo"}})
print(after_restart)  # 计数3,自动恢复

多节点如何安全地共享状态

并行执行的挑战

在现实项目中,一个 Agent 往往要调用多个工具并分析结果。例如,你想读取邮件内容、提取关键词、同时向数据库写入日志。如果所有节点都修改同一个状态字段且没定义 reducer,那么结果可能互相覆盖;如果使用错误的 reducer 又可能导致重复或数据丢失。

LangGraph 通过 reducer 机制解决并行合并问题。每个节点返回的增量会通过 reducer 合并 。如果你需要组合多个 schema(输入状态、输出状态、私有状态),LangGraph 会将它们并集成一个整体 。这相当于让每个房间都有自己的抽屉,大家只能往自己的抽屉放东西,管理员则负责把这些抽屉组合成大衣柜。

来看一个例子:假设我们要对文本进行分词、情感分析和实体识别,并把结果分别写入状态中。我们可以定义以下状态:

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph

class NLPState(TypedDict):
    tokens: Annotated[list[str], add]
    sentiment: str
    entities: Annotated[list[str], add]

# 三个节点分别处理不同任务
def tokenize(state: NLPState):
    text = state["text"]
    return {"tokens": text.split()}

def analyze_sentiment(state: NLPState):
    sentiment = "positive" if "love" in state["text"] else "neutral"
    return {"sentiment": sentiment}

def extract_entities(state: NLPState):
    entities = [w for w in state.get("tokens", []) if w.istitle()]
    return {"entities": entities}

builder = StateGraph(NLPState)
builder.add_node("tokenize", tokenize)
builder.add_node("sentiment", analyze_sentiment)
builder.add_node("entities", extract_entities)
# 并行调用,各自更新不同字段
builder.add_edge("start", "tokenize")
builder.add_edge("start", "sentiment")
builder.add_edge("start", "entities")
builder.set_entry_point("start")
graph = builder.compile()

state = {"tokens": [], "sentiment": "", "entities": [], "text": "I love LangGraph and Python"}
result = graph.invoke(state)
print(result)

在这个示例中,tokens 和 entities 使用 add reducer 确保并行调用返回的列表可以合并;sentiment 使用默认覆盖模式,因为它只有一个值。值得注意的是,如果某个字段不需要在多个节点中合并,则可以省略 Annotated,默认使用最后一次写入覆盖前面的值。

巧用多 schema:输入、输出和私有状态

官方文档允许我们定义多个 schema:例如 InputState、ProcessingState、OutputState。这在多 agent 协作中非常有用。假设你的工作流分为前端用户输入、模型内部处理和后台日志记录三个阶段,可以将它们分开定义:

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph

class InputState(TypedDict):
    text: str

class ProcessingState(TypedDict):
    tokens: Annotated[list[str], add]

class OutputState(TypedDict):
    summary: str

# 处理节点读取输入并生成 tokens
def process(state: InputState) -> dict:
    return {"tokens": state["text"].split()}

# 总结节点读取 tokens 并生成 summary
def summarize(state: ProcessingState) -> dict:
    summary = f"{len(state['tokens'])} words"
    return {"summary": summary}

builder = StateGraph(state_schema=InputState | ProcessingState | OutputState)
builder.add_node("process", process)
builder.add_node("summarize", summarize)
builder.add_edge("process", "summarize")
builder.set_entry_point("process")
graph = builder.compile()

state = {"text": "LangGraph is awesome", "tokens": [], "summary": ""}
result = graph.invoke(state)
print(result)  # tokens 与 summary 均被填充

这样做的好处是不同节点只需关注自己负责的部分,减少耦合,防止误写其他字段。

并发下的工具调用

我们再深入一点。上一节提到的自定义工具节点可以并行执行多个外部调用,并通过 reducer 将结果合并。这里扩展一个 Map-Reduce 的例子:假设要检索多个网页并提取标题,然后汇总统计词频。LangGraph 提供一个 Send API 用于并行执行同一节点的多个实例,但是在本章我们只需理解其基本思想:一个“发送”节点根据状态中的输入拆分任务,多个子任务并行执行,再由一个“接收”节点收集结果。

为了简化,我们用线程池模拟并行抓取:

from typing import TypedDict, Annotated
from operator import add
from concurrent.futures import ThreadPoolExecutor
from langgraph.graph import StateGraph

class ScrapeState(TypedDict):
    urls: list[str]
    titles: Annotated[list[str], add]
    word_count: Annotated[int, add]

def scrape_page(url: str) -> str:
    # 简化:用 URL 长度作为标题
    return f"Title {len(url)}"

def scrape_node(state: ScrapeState) -> dict:
    with ThreadPoolExecutor(max_workers=4) as executor:
        titles = list(executor.map(scrape_page, state["urls"]))
    total_words = sum(len(t.split()) for t in titles)
    return {"titles": titles, "word_count": total_words}

builder = StateGraph(ScrapeState)
builder.add_node("scrape", scrape_node)
builder.set_entry_point("scrape")
graph = builder.compile()

state = {"urls": ["http://a.com", "http://b.com"], "titles": [], "word_count": 0}
result = graph.invoke(state)
print(result)

通过给 titles 设置 add reducer,可以让下一轮任务追加更多标题;word_count 使用 add 累计词数。这样的并发机制在真实场景中可以扩展到数百个子任务,避免单节点阻塞。

常见 State 设计反模式

无类型的杂物堆

初学者的常见错误是把所有值都塞进一个字典,字段名随意,类型随缘。结果就是同事看不懂,你自己三天后也看不懂。最佳实践文章提醒我们,状态应该简洁、显式且类型安全 。若要记录临时变量,可以用局部变量或函数返回值,不必全放进状态。

反例:

# 非推荐:状态杂乱无章
state = {"foo": 123, "bar": [1, 2, 3], "x": "maybe", "y": {"unknown": True}}

改进:

class CleanState(TypedDict):
    user_id: int
    cart: list[str]
    is_logged_in: bool

state = {"user_id": 42, "cart": [], "is_logged_in": False}

直接修改状态

我们反复强调,节点应返回新的状态增量,而非直接修改输入 。如果你不幸这样做了,恭喜,你会看到一堆难以复现的 bug。

反例:

def evil(state):
    state["scores"].append(100)
    return state

正确:

def good(state):
    return {"scores": state["scores"] + [100]}

隐式依赖与副作用

在节点中偷偷写数据库、调用外部 API,却不记录在状态里,是非常危险的。正如最佳实践指出,持久化应该交由 LangGraph 的检查点机制处理 。副作用应该显式地暴露给框架和调用方。例如,你可以返回一个标记 {“need_write_db”: True},由上层统一执行。

反例:

def process_order(state):
    db.save(state["order"])  # 悄悄写入数据库
    return {"status": "done"}

正确:

def process_order(state):
    return {"status": "done", "save_to_db": state["order"]}
# 调用者检查 save_to_db 字段决定是否写数据库

全局状态滥用

有些人恨不得把所有局部变量都写入状态,以便“下一步可能用到”。结果是状态臃肿且难以维护。正确做法是:只有那些需要跨节点共享的数据才放入状态,其他变量用局部变量处理。记住:状态是流程的公共记忆,不是临时笔记本。

忽视循环控制

循环是 Agent 必不可少的一部分,比如重试任务、持续对话等。但如果你忘记设置退出条件,就会出现“死循环地狱”。最佳实践建议,在状态中存储计数器或上一次执行结果,用于判断是否继续 。

例子:我们为一个处理节点设置最大尝试次数:

from typing import TypedDict

class RetryState(TypedDict):
    attempts: int
    max_attempts: int

def do_task(state: RetryState) -> dict:
    if state["attempts"] < state["max_attempts"]:
        return {"attempts": state["attempts"] + 1}
    return {}

def should_continue(state: RetryState) -> str:
    return "retry" if state["attempts"] < state["max_attempts"] else "stop"

通过在状态中存储 attempts 和 max_attempts,我们可以优雅地控制循环。

小结与展望

看到这里,如果你还在坚持阅读,相信你已经 get 到了 LangGraph 状态的精髓:集中式、类型安全、不可变、可持久化。我们从为什么需要状态出发,讲到了如何用 TypedDict 和 Annotated 定义 schema,解释了 reducer 的机制和 Pydantic 的优缺点,演示了并发场景下的状态合并和多 schema 的应用,并列举了常见的反模式。

在下一章,我们将把视线转向节点(Node)设计的艺术,继续带你深入 LangGraph 的世界。别忘了,在代码世界里,学习永远不能忘了带点开心和自嘲。记住我的话:编写 Agent 就像养猫,既要有耐心,又要懂它的脾气,而状态就是你家的猫食——储得好才不闹饥荒。我们下一章见!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐