Spring AI Alibaba 1.x 系列【22】Agent 并行工具执行与超时 / 协作式取消实战
当 Agent 需要同时调用多个工具时(如查询天气 + 股票 + 新闻),异步模式可以显著减少用户等待时间,提升用户体验。
文章目录
1. 并行执行
当 Agent 需要同时调用多个工具时(如查询天气+股票+新闻),异步模式可以显著减少用户等待时间,提升用户体验。
1.1 大模型支持
核心概念:
| 概念 | 定义 | 实现方式 |
|---|---|---|
| 并行工具调用(Parallel Tool Calling) | 模型一次推理就能输出多个工具调用请求,这些工具可同时执行,无需等待前一个完成 | 模型原生能力 + 框架支持异步执行 |
| 顺序工具调用(Sequential Tool Calling) | 模型一次只能输出一个工具调用,必须等前一个工具执行完,模型才能决定下一个工具 | 所有支持工具调用的模型都能实现 |
提示:即使模型不支持原生并行调用,也可通过 Agent 框架(如 Spring AI Alibaba)规划后并行执行多个工具 |依赖 Agent 的规划能力,模型只需支持基本工具调用。
主流商业模型原生支持并行工具调用,开源 / 轻量模型大多不原生支持(未考证),但可通过 Agent 框架或提示词工程实现并行执行,在智谱 AI 开发文档中,可以看到 tool_calls 返回的是集合类型,说明是支持并行工具调用的:

1.2 定义异步工具
定义一个异步工具,需要以下几个步骤:
- 实现
AsyncToolCallback接口 - 实现
getToolDefinition方法,工具元数据定义,告诉Agent这个工具叫什么、能干什么、需要什么参数 - 实现
callAsync核心异步执行方法,提交任务到公共线程池 - 实现
call方法,提供同步调用的兼容能力 - 实现
getTimeout方法,设置工具执行超时时间,如果工具执行超过时间,框架会自动终止任务,避免卡死
当 ReactAgent 判断需要调用工具时:
Agent读取工具名称 + 描述 + 参数- 调用
callAsync()启动异步任务 - 主线程继续执行,不等待当前工具结果,可以执行下一个工具
- 工具在独立线程中执行,执行结束后返回结果
Agent接收所有异步工具执行结果,继续后续对话 / 推理
异步新闻工具 - 模拟 2.5秒 API 调用延迟:
public class AsyncNewsTool implements AsyncToolCallback {
@Override
public ToolDefinition getToolDefinition() {
return DefaultToolDefinition.builder()
.name("news")
.description("查询新闻资讯")
.inputSchema("{\"type\":\"object\",\"properties\":{\"topic\":{\"type\":\"string\"}}}")
.build();
}
@Override
public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
// 异步执行:提交任务到公共线程池
return CompletableFuture.supplyAsync(() -> {
// 打印执行日志:时间 + 线程名(验证异步)
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] news 开始执行");
try {
Thread.sleep(2500); // 模拟调用新闻API的网络延迟(2.5秒)
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断标志
throw new RuntimeException("News query interrupted", e);
}
// 执行完成日志
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] news 完成");
// 返回模拟的新闻结果(JSON格式,Agent可解析)
return "{\"topic\":\"AI\",\"articles\":10,\"summary\":\"最新AI动态\"}";
});
}
@Override
public Duration getTimeout() {
return Duration.ofSeconds(30);
}
@Override
public String call(String toolInput) {
return callAsync(toolInput, new ToolContext(Map.of())).join();
}
}
异步股票工具 - 模拟 3秒 API 调用延迟:
public class AsyncStockTool implements AsyncToolCallback {
@Override
public ToolDefinition getToolDefinition() {
return DefaultToolDefinition.builder()
.name("stock")
.description("查询股票价格")
.inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
.build();
}
@Override
public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 开始执行");
try {
Thread.sleep(3000); // 模拟 API 调用延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Stock query interrupted", e);
}
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 完成");
return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\"}";
});
}
@Override
public Duration getTimeout() {
return Duration.ofSeconds(30);
}
@Override
public String call(String toolInput) {
return callAsync(toolInput, new ToolContext(Map.of())).join();
}
}
异步天气工具 - 模拟 2 秒 API 调用延迟:
public class AsyncWeatherTool implements AsyncToolCallback {
@Override
public ToolDefinition getToolDefinition() {
return DefaultToolDefinition.builder()
.name("weather")
.description("查询天气信息")
.inputSchema("{\"type\":\"object\",\"properties\":{\"city\":{\"type\":\"string\"}}}")
.build();
}
@Override
public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] weather 开始执行");
try {
Thread.sleep(2000); // 模拟 API 调用延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Weather query interrupted", e);
}
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] weather 完成");
return "{\"city\":\"北京\",\"temperature\":25,\"condition\":\"晴\"}";
});
}
@Override
public Duration getTimeout() {
return Duration.ofSeconds(30);
}
@Override
public String call(String toolInput) {
return callAsync(toolInput, new ToolContext(Map.of())).join();
}
}
1.3 并行配置参数
ReactAgent 中的并行配置参数有以下 4 个:
// Async tool execution configuration
protected boolean parallelToolExecution = false; // 是否启用并行执行
protected int maxParallelTools = 5; // 最大并行工具数
protected Duration toolExecutionTimeout = Duration.ofMinutes(5); // 工具超时时间
protected boolean wrapSyncToolsAsAsync = false; // 是否将同步工具包装为异步
详细说明:
| 配置项 | 默认值 | 配置说明 |
|---|---|---|
| parallelToolExecution | false | 启用/禁用并行工具执行。开启后多个工具调用可并发执行 |
| maxParallelTools | 5 | 最大并行执行的工具数量,用于限制并发量,防止系统资源耗尽 |
| toolExecutionTimeout | 5分钟 | 工具执行的超时时间,工具超时后会自动取消执行并返回错误信息,如果某个工具自己配置了超时,则取两个中的最小值 |
| wrapSyncToolsAsAsync | false | 自动将同步工具包装为异步执行,使同步工具也具备并行执行的能力 |
使用示例:
ReactAgent agent = ReactAgent.builder()
.name("parallel_agent")
.model(chatModel)
.tools(weatherTool, stockTool, newsTool)
.parallelToolExecution(true) // 启用并行
.maxParallelTools(10) // 最大10个并行
.toolExecutionTimeout(Duration.ofSeconds(30)) // 30秒超时
.wrapSyncToolsAsAsync(true) // 包装同步工具
.build();
1.4 线程池配置
并行执行模式下,AgentToolNode 会使用以下方式获取线程池:
/**
* Get the executor for tool execution from config or use default.
*/
private Executor getToolExecutor(RunnableConfig config) {
return ParallelNode.getExecutor(config, AGENT_TOOL_NAME);
}
ParallelNode.getExecutor() 中三层优先级查找:
// ParallelNode.getExecutor() 逻辑
Executor getExecutor(RunnableConfig config, String nodeId) {
// 优先级1: 节点特定 executor(config.metadata(nodeId))
// 优先级2: 默认 parallel executor(config.metadata("_DEFAULT_PARALLEL_EXECUTOR_"))
// 优先级3: 内置 DEFAULT_EXECUTOR
}
方式 1 ,使用默认线程池(不配置时使用):
// ParallelNode 内置默认线程池
ThreadPoolExecutor DEFAULT_EXECUTOR = new ThreadPoolExecutor(
corePoolSize: Runtime.getRuntime().availableProcessors() * 2 // 最小4
maxPoolSize: Runtime.getRuntime().availableProcessors() * 4 // 最大200
keepAliveTime: 60秒
queueCapacity: 1000
threadFactory: "parallel-node-action-thread-{n}"
rejectionPolicy: CallerRunsPolicy // 拒绝时调用者线程执行
)
方式 2 ,通过 Builder 配置自定义线程池:
// 自定义线程池
Executor myExecutor = Executors.newFixedThreadPool(10);
ReactAgent agent = ReactAgent.builder()
.executor(myExecutor) // 设置自定义 executor
.parallelToolExecution(true) // 启用工具并行
.maxParallelTools(5)
.build();
// executor 会同时用于:
// 1. 工具并行执行(parallelToolExecution=true 时)
// 2. 作为 ParallelAgent 子图时的并行执行
方式 3 ,通过 RunnableConfig 配置自定义线程池:
RunnableConfig config = RunnableConfig.builder()
.defaultParallelExecutor(myExecutor) // 设置默认并行 executor
.build();
agent.call("question", config);
方式 4 ,通过节点特定配置自定义线程池:
RunnableConfig config = RunnableConfig.builder()
.addMetadata("__PARALLEL__(my_node)", nodeSpecificExecutor)
.build();
1.5 构建 ReactAgent
构建 ReactAgent 时配置相关并行参数:
// 启用深度思考
ZhiPuAiChatOptions chatOptions = ZhiPuAiChatOptions.builder()
.thinking(ZhiPuAiApi.ChatCompletionRequest.Thinking.enabled())
.build();
ReactAgent agent = ReactAgent.builder()
.name("my_agent")
.chatOptions(chatOptions)
.model(zhiPuAiChatModel)
.tools(new AsyncWeatherTool(), new AsyncStockTool(), new AsyncNewsTool())
.returnReasoningContents(true) // 开启子图思考过程
.parallelToolExecution(true) // 并行支持
.maxParallelTools(3) // 最大并行工具数
.toolExecutionTimeout(Duration.ofMinutes(5)) // 工具执行超时时间
.enableLogging(true)
.systemPrompt("你是一个信息查询助手。")// 当用户问多个问题时,同时使用所有相关工具查询。
.build();
// 提问需要 LLM 决定调用多个工具
String query = "请帮我查询:1)北京天气 2)阿里巴巴股票价格 3)最新AI新闻";
System.out.println("用户提问: " + query);
System.out.println();
Instant start = Instant.now();
Optional<OverAllState> result = null;
try {
result = agent.invoke(query);
} catch (GraphRunnerException e) {
throw new RuntimeException(e);
}
Instant end = Instant.now();
long totalMs = Duration.between(start, end).toMillis();
System.out.println("-".repeat(80));
System.out.println("【并行执行结果】");
System.out.println(" 实际耗时: " + totalMs + " ms (" + (totalMs / 1000.0) + "秒)");
if (result.isPresent()) {
System.out.println(" 执行成功: " + result.get().value("messages").isPresent());
}
1.6 单元测试
运行后查看返回的 OverAllState 完整执行状态:

思考内容如下:
用户要求查询三个不同的信息:
1. 北京天气 - 需要使用weather函数,参数city为"北京"
2. 阿里巴巴股票价格 - 需要使用stock函数,阿里巴巴的股票代码通常是"BABA"
3. 最新AI新闻 - 需要使用news函数,topic为"AI"
我需要调用这三个函数来获取信息。
大模型同时返回三个工具调用请求:

工具执行完成后,同时返回三个工具执行结果给大模型:

大模型最终返回的响应:

查看控制台,可以看到这三个工具是同一时间执行的:
[2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-2] news 开始执行
[2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-3] weather 开始执行
[2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-1] stock 开始执行
2. 超时自动取消
【协作式取消的异步工具回调接口】CancellableAsyncToolCallback 继承 AsyncToolCallback,在异步执行基础上,支持新增了协作式取消的支持,专门用于优雅终止异步工具执行。
注意事项: 工具需要定期检查 CancellationToken 取消令牌,当收到取消请求时,优雅地停止执行。
2.1 取消令牌
2.1.1 CancellationToken
工具执行的【协作式取消】令牌接口,用于优雅终止异步工具/Agent的执行,工具需要定期检查取消状态并自行停止,协作式取消(非强制中断线程),工具主动检查取消信号,可安全清理资源后终止。
定义了以下方法:
isCancelled():判断是否已经收到取消请求throwIfCancelled():【默认方法】检查取消状态,若已取消则抛出工具取消异常onCancel(Runnable callback):注册【取消触发时的回调函数】,取消时执行资源释放、日志打印、状态重置等收尾操作
使用示例:
public CompletableFuture<String> callAsync(String args, ToolContext ctx, CancellationToken token) {
return CompletableFuture.supplyAsync(() -> {
for (int i = 0; i < 100; i++) {
// 检查是否触发取消,触发则抛出异常终止任务
token.throwIfCancelled();
// 执行业务逻辑
}
return "result";
});
}
}
定义了一个空令牌(永不取消):
/**
* 【空实现取消令牌】:永不触发取消
* <p>
* 适用场景:不需要取消功能的工具,作为默认参数使用,避免空指针
* <p>
* 特性:
* 1. isCancelled() 永远返回 false
* 2. onCancel() 注册的回调永远不会执行
* 3. 无任何取消能力,需要真实取消功能请使用 DefaultCancellationToken
*/
CancellationToken NONE = new CancellationToken() {
/**
* 固定返回false,永远不取消
*/
@Override
public boolean isCancelled() {
return false;
}
/**
* 空实现:永不执行回调
*/
@Override
public void onCancel(Runnable callback) {
// 空实现,该令牌永远不会取消,因此无需处理回调
}
};
源码如下:
/**
* 工具执行的【协作式取消】令牌接口
*
* @author disaster
* @since 1.0.0
* @see DefaultCancellationToken 默认的取消令牌实现类
* @see CancellableAsyncToolCallback 可取消异步工具回调接口
*/
public interface CancellationToken {
/**
* 判断是否已经收到取消请求
* @return true=已触发取消,false=未触发取消
*/
boolean isCancelled();
/**
* 【默认方法】检查取消状态,若已取消则抛出工具取消异常
* <p>
* 开发中最常用的方法,一行代码实现取消检查+任务终止
* @throws ToolCancelledException 当触发取消时抛出该异常
*/
default void throwIfCancelled() throws ToolCancelledException {
// 判断是否取消,是则抛出异常
if (isCancelled()) {
throw new ToolCancelledException("Tool execution was cancelled");
}
}
/**
* 注册【取消触发时的回调函数】
* <p>
* 用途:取消时执行资源释放、日志打印、状态重置等收尾操作
* <p>
* 特性:如果已经触发取消,回调会立即执行
* @param callback 取消触发后执行的回调任务
*/
void onCancel(Runnable callback);
/**
* 【空实现取消令牌】:永不触发取消
* <p>
* 适用场景:不需要取消功能的工具,作为默认参数使用,避免空指针
* <p>
* 特性:
* 1. isCancelled() 永远返回 false
* 2. onCancel() 注册的回调永远不会执行
* 3. 无任何取消能力,需要真实取消功能请使用 DefaultCancellationToken
*/
CancellationToken NONE = new CancellationToken() {
/**
* 固定返回false,永远不取消
*/
@Override
public boolean isCancelled() {
return false;
}
/**
* 空实现:永不执行回调
*/
@Override
public void onCancel(Runnable callback) {
// 空实现,该令牌永远不会取消,因此无需处理回调
}
};
}
2.2.2 DefaultCancellationToken
CancellationToken 取消令牌的【默认线程安全实现类】。
核心特性:
- 线程安全,支持多线程环境下的取消操作
- 支持注册多个取消回调函数
- 若注册回调时已取消,会立即执行回调
- 支持与
CompletableFuture绑定,实现取消信号自动传递
使用示例:
DefaultCancellationToken token = new DefaultCancellationToken();
// 注册资源清理回调
token.onCancel(() -> cleanupResources());
// 启动异步工具任务
CompletableFuture<String> future = tool.callAsync(args, ctx, token);
// 主动触发取消
token.cancel();
}
源码如下:
public class DefaultCancellationToken implements CancellationToken {
/**
* 【内部工具类】取消回调包装器
* 作用:保证每个取消回调**只执行一次**,防止重复执行
*/
private static final class CancellationCallback {
/** 真实的业务回调任务 */
private final Runnable delegate;
/** 原子标记:标记回调是否已执行,保证线程安全 */
private final AtomicBoolean executed = new AtomicBoolean(false);
/**
* 构造方法:包装业务回调
* @param delegate 业务回调逻辑
*/
private CancellationCallback(Runnable delegate) {
this.delegate = delegate;
}
/**
* 执行回调,**保证仅执行一次**
* 通过CAS原子操作确保多线程下只执行一次
*/
private void runOnce() {
// CAS:仅当未执行时,才执行回调并标记为已执行
if (executed.compareAndSet(false, true)) {
delegate.run();
}
}
}
/**
* 取消状态标记(原子布尔值)
* true=已取消,false=未取消
* 保证多线程下的状态可见性与原子性
*/
private final AtomicBoolean cancelled = new AtomicBoolean(false);
/**
* 取消回调列表
* 使用 CopyOnWriteArrayList 保证线程安全,支持高并发读写
*/
private final List<CancellationCallback> callbacks = new CopyOnWriteArrayList<>();
/**
* 判断当前令牌是否已触发取消
* @return 已取消返回true,未取消返回false
*/
@Override
public boolean isCancelled() {
return cancelled.get();
}
/**
* 注册取消触发后的回调任务
* <p>
* 逻辑:
* 1. 包装回调并加入列表
* 2. 如果当前已经取消,立即执行回调
* @param callback 取消时执行的任务
*/
@Override
public void onCancel(Runnable callback) {
CancellationCallback cancellationCallback = new CancellationCallback(callback);
// 添加回调到集合
callbacks.add(cancellationCallback);
// 如果已经取消,直接执行一次回调
if (cancelled.get()) {
cancellationCallback.runOnce();
}
}
/**
* 【核心方法】主动触发取消
* <p>
* 特性:**幂等性**,多次调用效果和一次一致,回调仅执行一次
* 逻辑:
* 1. CAS原子修改状态为已取消
* 2. 遍历执行所有注册的回调
*/
public void cancel() {
// 仅当状态从 false -> true 时,才执行后续逻辑
if (cancelled.compareAndSet(false, true)) {
// 执行所有取消回调,每个仅执行一次
callbacks.forEach(CancellationCallback::runOnce);
}
}
/**
* 创建【绑定CompletableFuture】的取消令牌
* <p>
* 功能:当异步Future被取消时,自动触发当前令牌的取消
* 注意:单向绑定,取消令牌不会反向取消Future
* @param future 要绑定的异步任务
* @return 绑定后的取消令牌实例
*/
public static DefaultCancellationToken linkedTo(CompletableFuture<?> future) {
DefaultCancellationToken token = new DefaultCancellationToken();
// 监听Future完成/取消事件
future.whenComplete((result, error) -> {
// 如果Future被取消,自动触发令牌取消
if (future.isCancelled()) {
token.cancel();
}
});
return token;
}
}
2.2 异步执行中的超时和取消
核心处理逻辑:
- 给
CancellableAsyncToolCallback创建取消令牌 - 发生超时异常,调用令牌
cancel()方法,主线程返回工具异常信息(不抛出异常),子线程仍在执行 - 工具中检测到令牌处于取消状态时,执行取消回调用于资源清理
执行异步工具调用核心方法:
/**
* 执行异步工具调用:内置超时处理 + 外部取消令牌跟踪能力
* <p>
* 核心功能:
* <ul>
* <li>支持 CancellableAsyncToolCallback 可取消异步工具,实现协作式取消</li>
* <li>工具超时时,自动触发取消令牌,让工具优雅终止</li>
* <li>支持并行工具执行:将取消令牌存入外部Map,供全局超时管理器统一控制</li>
* </ul>
* @param callback 异步工具回调实例(普通/可取消)
* @param request 工具调用请求对象(包含参数、工具ID、名称)
* @param toolContextMap 工具上下文参数集合
* @param config 运行时配置(线程ID、取消令牌等)
* @param extraStateFromToolCall 收集工具执行产生的状态更新
* @param cancellationTokens 并行执行专用:存储取消令牌的Map(可为null)
* @param toolIndex 并行工具索引(作为key存入取消令牌Map)
* @return 工具执行响应结果(成功/失败)
*/
private ToolCallResponse executeAsyncTool(AsyncToolCallback callback, ToolCallRequest request,
Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {
// 构建工具执行上下文
ToolContext context = new ToolContext(toolContextMap);
// 声明取消令牌:仅可取消异步工具会初始化
DefaultCancellationToken cancellationToken = null;
try {
CompletableFuture<String> future;
// ===================== 核心分支:区分工具类型 =====================
// 1. 如果是【可取消异步工具】:创建取消令牌,并注册到并行管理器
if (callback instanceof CancellableAsyncToolCallback cancellable) {
// 初始化取消令牌实例
cancellationToken = new DefaultCancellationToken();
// 并行执行场景:将令牌存入外部Map,允许外层超时处理器统一取消
if (cancellationTokens != null && toolIndex >= 0) {
cancellationTokens.put(toolIndex, cancellationToken);
}
// 调用可取消工具的异步方法,传入取消令牌
future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
}
// 2. 普通异步工具:直接执行,无取消能力
else {
future = callback.callAsync(request.getArguments(), context);
}
// 防御性判断:异步任务为空,直接返回错误
if (future == null) {
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(),
"Async tool returned null future");
}
// ===================== 执行异步任务:超时控制 =====================
// 等待任务完成,使用工具自身配置的超时时间,超时抛出 TimeoutException
String result = future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS).join();
// 日志输出:工具执行完成
if (enableActingLog) {
logger.info("[ThreadId {}] Agent {} acting, async tool {} finished",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
if (logger.isDebugEnabled()) {
logger.debug("Tool {} returned: {}", request.getToolName(), result);
}
}
// 返回工具执行成功结果
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
// ===================== 异步执行异常捕获 =====================
catch (CompletionException e) {
// 获取真实异常原因
Throwable cause = e.getCause() != null ? e.getCause() : e;
// 异常1:工具执行超时 → 触发取消 + 清理状态
if (cause instanceof TimeoutException) {
// 超时后主动取消工具,释放资源
if (cancellationToken != null) {
cancellationToken.cancel();
}
// 清理脏数据:超时后丢弃所有未完成的状态更新
extraStateFromToolCall.clear();
logger.warn("Async tool {} timed out, discarding any state updates", request.getToolName());
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
}
// 异常2:工具业务执行异常 → 使用异常处理器处理
else if (cause instanceof ToolExecutionException toolExecutionException) {
logger.error("Async tool {} execution failed, handling with processor: {}", request.getToolName(),
toolExecutionExceptionProcessor.getClass().getName(), toolExecutionException);
String result = toolExecutionExceptionProcessor.process(toolExecutionException);
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
// 异常3:其他异步执行异常
else {
logger.error("Async tool {} execution failed: {}", request.getToolName(), cause.getMessage(), cause);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
}
}
// 异常4:任务被主动取消
catch (CancellationException e) {
logger.warn("Async tool {} execution was cancelled", request.getToolName(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
}
// 异常5:兜底捕获所有未知异常
catch (Exception e) {
logger.error("Async tool {} execution failed: {}", request.getToolName(), e.getMessage(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
}
}
2.2.1 CompletableFuture
CompletableFuture 是Java 8 引入了的一个强大的异步编程核心类,解决了传统 Future 的痛点,支持异步执行、超时控制、主动取消、链式回调、异常处理,是你 Spring AI Alibaba 中异步工具、并行工具调用、超时 / 取消功能的底层基石。
核心方法包括:
supplyAsync()→ 开启异步任务orTimeout()→ 设置任务超时join()→ 等待异步任务完成cancel()→ 主动取消任务
核心异常:
TimeoutException:任务执行超时抛出CancellationException:主动调用取消抛出
| 对比项 | CancellationException 取消异常 | TimeoutException 超时异常 |
|---|---|---|
| 触发原因 | 主动取消(人 / 系统叫停) | 被动超时(任务太慢) |
| 谁触发 | token.cancel() / dispose() | orTimeout() 时间到 |
| 性质 | 正常信号 | 执行异常 |
| 日志级别 | WARN(警告) | ERROR/WARN(错误) |
| 你的场景 | 用户停止 AI、断开连接 | API 调用超时卡死 |
| 处理目标 | 终止任务,释放资源 | 终止任务,防止阻塞 |
2.2.2 超时时间设置
单个异步工具执行时会设置超时时间:
// AgentToolNode.java 第 723 行(单个异步工具)
future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS)
并行执行时,会统一的超时时间:
// AgentToolNode.java 第 348 行(并行执行)
.orTimeout(toolExecutionTimeout.toMillis(), TimeUnit.MILLISECONDS)
2.2.3 异常处理
在执行异步工具调用 executeAsyncTool 方法中,可以找到任务取消的相关逻辑。
如果是【可取消异步工具】(实现了 CancellableAsyncToolCallback ):
- 初始化取消令牌实例
DefaultCancellationToken - 如果是并行工具执行,将当前工具的取消令牌存入统一的外部
Map - 调用
CancellableAsyncToolCallback#callAsync并传入取消令牌
if (callback instanceof CancellableAsyncToolCallback cancellable) {
cancellationToken = new DefaultCancellationToken();
// Store the token in the external map so outer timeout handler can cancel it
if (cancellationTokens != null && toolIndex >= 0) {
cancellationTokens.put(toolIndex, cancellationToken);
}
future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
}
TimeoutException 处理:
- 捕获工具执行超时
TimeoutException异常,设置取消令牌的状态为【取消】 - 返回
ToolCallResponse.error
// Cancel the token to notify the tool to stop gracefully
if (cancellationToken != null) {
cancellationToken.cancel();
}
extraStateFromToolCall.clear();
logger.warn("Async tool {} timed out, discarding any state updates", request.getToolName());
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
CancellationException 处理:
catch (CancellationException e) {
logger.warn("Async tool {} execution was cancelled", request.getToolName(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
}
注意:异步工具运行在独立子线程,TimeoutException 不会杀死正在运行的线程,线程会继续在后台执行任务,必须手动配合 CancellationToken.cancel() 才能让任务真正停止。
2.3 超时问题演示
修改异步工具,使用循环模拟一个需要多个步骤才能完成的任务(需要 2.5 秒),并设置超时时间为 1 秒:
public class AsyncStockTool implements AsyncToolCallback {
@Override
public ToolDefinition getToolDefinition() {
return DefaultToolDefinition.builder()
.name("stock")
.description("查询股票价格")
.inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
.build();
}
@Override
public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 开始执行");
try {
for (int i = 0; i < 5; i++) {
System.out.printf("查询股票价格第 %d 步%n", i + 1);
Thread.sleep(500); // 模拟 API 调用延迟
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Stock query interrupted", e);
}
System.out.println(" [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 完成");
return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\"}";
});
}
@Override
public Duration getTimeout() {
return Duration.ofSeconds(1);
}
@Override
public String call(String toolInput) {
return callAsync(toolInput, new ToolContext(Map.of())).join();
}
}
执行后可以看到问题,子线程超时,直接返回了错误信息【阿里巴巴股票价格查询也超时了】,但是子线程还是继续执行,存在一定的资源浪费:

2.4 优雅的取消
定义可取消的异步股票工具:
/**
* 可取消的异步股票工具 - 模拟 API 调用,支持超时/主动取消
*
* <p>改造要点:</p>
* <ul>
* <li>实现 CancellableAsyncToolCallback 接口</li>
* <li>callAsync 方法增加 CancellationToken 参数</li>
* <li>在执行过程中定期检查取消状态</li>
* <li>注册 onCancel 回调进行资源清理</li>
* </ul>
*
* @author disaster
* @since 1.0.0
*/
public class AsyncStockTool implements CancellableAsyncToolCallback {
@Override
public ToolDefinition getToolDefinition() {
return DefaultToolDefinition.builder()
.name("stock")
.description("查询股票价格,支持取消")
.inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
.build();
}
/**
* 带取消令牌的异步执行方法
*
* <p>关键改动:</p>
* <ul>
* <li>增加 CancellationToken 参数</li>
* <li>定期检查取消状态</li>
* <li>注册 onCancel 回调进行资源清理</li>
* </ul>
*/
@Override
public CompletableFuture<String> callAsync(String arguments, ToolContext context,
CancellationToken cancellationToken) {
return CompletableFuture.supplyAsync(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(" [" + Instant.now() + "] [线程-" + threadName + "] stock 开始执行");
// 【关键 1】注册取消回调 - 用于资源清理
cancellationToken.onCancel(() -> {
System.out.println(" [" + Instant.now() + "] [线程-" + threadName + "] stock 收到取消通知,清理资源...");
// 这里可以关闭HTTP连接、释放数据库连接等
});
// 模拟分步执行股票查询(5步 * 500ms = 2.5秒)
for (int i = 0; i < 5; i++) {
// 【关键 2】每一步都检查取消状态,触发则抛出异常终止任务
// 方式 1 :检查到取消时抛出 ToolCancelledException ,停止子线程
cancellationToken.throwIfCancelled();
// 方式 2 :检查到取消时自定义处理逻辑
/*
if (cancellationToken.isCancelled()){
// 自定义处理逻辑
System.out.println("......................");
}
*/
// 打印查询进度
System.out.printf(" [" + Instant.now() + "] [线程-" + threadName + "] 查询股票价格第 %d 步%n", i + 1);
// 模拟 API 调用延迟
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Stock query interrupted", e);
}
}
System.out.println(" [" + Instant.now() + "] [线程-" + threadName + "] stock 完成");
return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\",\"status\":\"success\"}";
});
}
/**
* 默认超时时间:1秒
*/
@Override
public Duration getTimeout() {
return Duration.ofSeconds(1);
}
/**
* 同步调用(兼容非取消场景)
*
* <p>使用 CancellationToken.NONE 表示不支持取消</p>
*/
@Override
public String call(String toolInput) {
return callAsync(toolInput, new ToolContext(Map.of()), CancellationToken.NONE).join();
}
}
再次执行,可以看到工具执行到第二步后,就被优雅的停止了,资源清理也正常执行:

2.5 主动取消
问:ReactAgent 可以主动取消工具任务吗?
答:目前 ❌ 不支持
限制原因:
- 框架当前设计:仅支持超时自动取消,未开放手动取消
API - 流式调用 / 会话中断的主动取消能力未在
1.x版本落地 - 取消令牌仅由框架内部管理,不对外暴露
更多推荐


所有评论(0)