摘要:在构建复杂的 AI Agent 时,线性的 Chain 结构往往力不从心。Spring AI Alibaba Graph 引入了状态机与图论思想,让智能体拥有了“循环”与“记忆”。本文将深入剖析 Graph 的执行核心——GraphRunnerContextStreamingOutputNodeOutput 以及 OverAllState 的流转机制,并手把手带你用 Spring Boot 搭建一个具备自我修正能力的 AI 写作助手。

一、 核心概念解析

在深入代码之前,我们需要理解 Spring AI Alibaba Graph 引擎内部运作的四个核心齿轮。

1. OverAllState (全局状态/共享内存)

这是 Graph 的“大脑”。它是一个贯穿整个执行周期的共享对象(通常是一个 Map 或自定义 POJO)。

  • 作用:所有的节点(Node)都从这里读取数据,处理完后将结果写回这里。

  • 意义:实现了节点间的解耦。Node A 不需要知道 Node B 的存在,它们只需要针对 State 编程。

2. GraphRunnerContext (执行上下文)

这是 Graph 的“总线”。

  • 作用:它承载了当前的 OverAllState,同时管理着执行线程、回调函数以及流式输出的通道。

  • 生命周期:从 graph.run() 开始创建,直到图执行结束(End 节点)销毁。

3. NodeOutput (节点输出)

这是节点的“决策信号”。

  • 不仅仅是数据:一个节点执行完,不能只返回字符串。它需要返回:

    1. New State:更新后的部分状态。

    2. Route Signal:下一步去哪?(例如:suspend 暂停, end 结束, 或者自定义的路由 key)。

4. StreamingOutput (流式输出)

这是 Graph 的“实时广播”。

  • 痛点:在复杂的图执行中(比如 Node A -> Node B),用户不想等所有节点跑完才看到字。

  • 机制:Graph 引擎允许节点在执行过程中,通过 StreamingOutput 接口实时将 Token 推送给最外层的订阅者 (Client),实现类似 ChatGPT 的打字机效果,即使逻辑在后端跳跃了多个节点。

二、 Graph 完整执行过程图解

当一个 Graph 启动时,内部发生了什么

三、 实战:搭建“AI 写作与润色”智能体

我们将构建一个简单的 Agent:

  1. Writer 节点:负责根据主题写短文。

  2. Critic 节点:负责评审。如果字数不够,打回重写(Loop);如果合格,输出。

1. 项目搭建与依赖

环境:JDK 17+, Spring Boot 3.2+

Maven 依赖 (pom.xml):

<dependencies>
    <!-- Web 支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring AI Alibaba 核心与 Graph -->
    <!-- 注意:请使用最新的 Milestone 或 Release 版本 -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-graph-core</artifactId>
    </dependency>
</dependencies>

配置 (application.yml):

spring:
  ai:
    dashscope:
      api-key: ${AI_API_KEY} # 填入你的通义千问 Key

2. 定义 OverAllState (全局状态)

这是一个 POJO,用来存储整个流程的数据。

import java.util.ArrayList;
import java.util.List;

public class WritingState {
    private String topic;          // 输入的主题
    private String content;        // 生成的内容
    private String critique;       // 评审意见
    private int iterationCount;    // 迭代次数(防止死循环)

    // Getters, Setters, Constructor
    public WritingState() { this.iterationCount = 0; }
}

3. 构建节点逻辑 (Node Implementation)

这里展示如何操作 GraphRunnerContext 和 NodeOutput。

A. Writer 节点 (写手)
import com.alibaba.cloud.ai.graph.GraphRunnerContext;
import com.alibaba.cloud.ai.graph.NodeOutput;
import org.springframework.ai.chat.client.ChatClient;

import java.util.function.BiFunction;

public class WriterNode implements BiFunction<WritingState, GraphRunnerContext, NodeOutput<WritingState>> {

    private final ChatClient chatClient;

    public WriterNode(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public NodeOutput<WritingState> apply(WritingState state, GraphRunnerContext context) {
        System.out.println("✍️ Writer 正在写作... 第 " + (state.getIterationCount() + 1) + " 版");

        String prompt = "请写一篇关于 " + state.getTopic() + " 的短文,100字左右。";
        if (state.getCritique() != null) {
            prompt += "\n修改意见:" + state.getCritique();
        }

        // 1. 调用大模型 (这里演示非流式,流式需调用 .stream())
        String content = chatClient.prompt().user(prompt).call().content();

        // 2. 更新 State (注意:尽量返回新的 State 对象或更新字段)
        state.setContent(content);
        state.setIterationCount(state.getIterationCount() + 1);

        // 3. 设置 StreamingOutput (模拟流式输出到 Context,供前端消费)
        // 在真实场景中,这里是将 ChatModel 的 Flux<String> 桥接到 context.stream()
        context.publishStreamUpdate("Writing Update: " + content.substring(0, Math.min(10, content.length())) + "...");

        // 4. 返回 NodeOutput
        // null 表示继续默认路径,或者在这个 Simple Graph 中只负责更新状态
        return NodeOutput.of(state);
    }
}
B. Critic 节点 (评审与路由)

这个节点充当 Router,决定是结束还是重写。

public class CriticNode implements BiFunction<WritingState, GraphRunnerContext, NodeOutput<WritingState>> {

    @Override
    public NodeOutput<WritingState> apply(WritingState state, GraphRunnerContext context) {
        System.out.println("🧐 Critic 正在评审...");

        // 简单逻辑:如果字数少于 50 且重试次数少于 3,重写
        if (state.getContent().length() < 50 && state.getIterationCount() < 3) {
            state.setCritique("字数太少,请扩写!");
            System.out.println("❌ 评审不通过: 字数太少");
            // 路由到 "rewrite" 分支
            return NodeOutput.of(state).withRoute("rewrite");
        }

        System.out.println("✅ 评审通过");
        // 路由到 "end" 分支
        return NodeOutput.of(state).withRoute("end");
    }
}

4. 组装 Graph (Configuration)

@Configuration
public class GraphConfig {

    @Bean
    public StateGraph<WritingState> writingGraph(ChatClient.Builder builder) {
        ChatClient chatClient = builder.build();

        // 1. 创建图构建器
        StateGraph<WritingState> graph = new StateGraph<>(WritingState.class);

        // 2. 注册节点
        graph.addNode("writer", new WriterNode(chatClient));
        graph.addNode("critic", new CriticNode());

        // 3. 定义边 (Edges)
        // Start -> Writer
        graph.setEntryPoint("writer");
        
        // Writer -> Critic (写完就去审)
        graph.addEdge("writer", "critic");

        // 4. 定义条件边 (Conditional Edges)
        // Critic -> (Router Logic) -> Writer or End
        graph.addConditionalEdges(
            "critic",
            // 路由逻辑已经在 CriticNode 的 NodeOutput.withRoute() 中定义了
            // 这里只需要映射 key 到 节点名
            Map.of(
                "rewrite", "writer", // 如果 Critic 返回 "rewrite",跳回 writer
                "end", StateGraph.END // 如果返回 "end",结束
            )
        );

        return graph;
    }
}

5. 暴露接口 (Controller)

这里展示如何初始化 State 并运行 Graph。

@RestController
@RequestMapping("/agent")
public class AgentController {

    @Autowired
    private StateGraph<WritingState> writingGraph;

    @GetMapping("/write")
    public WritingState runAgent(@RequestParam String topic) {
        // 1. 初始化 OverAllState
        WritingState initialState = new WritingState();
        initialState.setTopic(topic);

        // 2. 编译并运行
        // CompiledGraph 内部会创建 GraphRunnerContext
        WritingState finalState = writingGraph.compile().invoke(initialState);

        return finalState;
    }
}

四、 深度总结:GraphRunnerContext 与 State 的舞步

在上面的 Demo 中,我们观察到了完整的生命周期:

  1. 设置 OverAllState: 在 Controller 中,我们 new WritingState() 并传入初始 Topic。这是整个图的输入参数

  2. GraphRunnerContext 启动: 当调用 .invoke(initialState) 时,引擎内部创建了 GraphRunnerContext,它像一个容器,包裹住 WritingState。

  3. 节点交互:

    • WriterNode 从参数中拿到 State,读取 Topic,写入 Content。

    • 关键点:WriterNode 返回的 NodeOutput 包含修改后的 State。Context 会捕获这个 Output,并用它刷新全局的 OverAllState。

  4. 路由决策:

    • CriticNode 不仅修改了 State(写入 critique),还通过 NodeOutput.withRoute("rewrite") 发出了控制信号。

    • Context 读取到这个信号,查阅路由表,将执行指针拨回 WriterNode。

  5. 流式输出 (Streaming):

    • 虽然 Demo 中 Controller 返回的是最终对象,但如果在 SSE (Server-Sent Events) 场景下,我们可以监听 context 的 Stream 通道,前端就能看到 AI 思考和重写的全过程,而不是干等几秒钟。

我们通过 Spring AI Alibaba Graph搭建出来的AI应用,不再是编写僵化的代码,而是在编排思维。这正是构建下一代 AI Native 应用的核心能力。

欢迎关注,一起沟通、一起进步~

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐