1. 并行执行

Agent 需要同时调用多个工具时(如查询天气+股票+新闻),异步模式可以显著减少用户等待时间,提升用户体验。

1.1 大模型支持

核心概念:

概念 定义 实现方式
并行工具调用(Parallel Tool Calling) 模型一次推理就能输出多个工具调用请求,这些工具可同时执行,无需等待前一个完成 模型原生能力 + 框架支持异步执行
顺序工具调用(Sequential Tool Calling) 模型一次只能输出一个工具调用,必须等前一个工具执行完,模型才能决定下一个工具 所有支持工具调用的模型都能实现

提示:即使模型不支持原生并行调用,也可通过 Agent 框架(如 Spring AI Alibaba)规划后并行执行多个工具 |依赖 Agent 的规划能力,模型只需支持基本工具调用。

主流商业模型原生支持并行工具调用,开源 / 轻量模型大多不原生支持(未考证),但可通过 Agent 框架或提示词工程实现并行执行,在智谱 AI 开发文档中,可以看到 tool_calls 返回的是集合类型,说明是支持并行工具调用的:

在这里插入图片描述

1.2 定义异步工具

定义一个异步工具,需要以下几个步骤:

  • 实现 AsyncToolCallback 接口
  • 实现 getToolDefinition 方法,工具元数据定义,告诉 Agent 这个工具叫什么、能干什么、需要什么参数
  • 实现 callAsync 核心异步执行方法,提交任务到公共线程池
  • 实现 call 方法,提供同步调用的兼容能力
  • 实现 getTimeout 方法,设置工具执行超时时间,如果工具执行超过时间,框架会自动终止任务,避免卡死

ReactAgent 判断需要调用工具时:

  • Agent 读取工具名称 + 描述 + 参数
  • 调用 callAsync() 启动异步任务
  • 主线程继续执行,不等待当前工具结果,可以执行下一个工具
  • 工具在独立线程中执行,执行结束后返回结果
  • Agent 接收所有异步工具执行结果,继续后续对话 / 推理

异步新闻工具 - 模拟 2.5API 调用延迟:

public class AsyncNewsTool implements AsyncToolCallback {

    @Override
    public ToolDefinition getToolDefinition() {
        return DefaultToolDefinition.builder()
            .name("news")
            .description("查询新闻资讯")
            .inputSchema("{\"type\":\"object\",\"properties\":{\"topic\":{\"type\":\"string\"}}}")
            .build();
    }

    @Override
    public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
        // 异步执行:提交任务到公共线程池
        return CompletableFuture.supplyAsync(() -> {
            // 打印执行日志:时间 + 线程名(验证异步)
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] news 开始执行");
            try {
                Thread.sleep(2500); // 模拟调用新闻API的网络延迟(2.5秒)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断标志
                throw new RuntimeException("News query interrupted", e);
            }
            // 执行完成日志
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] news 完成");
            // 返回模拟的新闻结果(JSON格式,Agent可解析)
            return "{\"topic\":\"AI\",\"articles\":10,\"summary\":\"最新AI动态\"}";
        });
    }

    @Override
    public Duration getTimeout() {
        return Duration.ofSeconds(30);
    }

    @Override
    public String call(String toolInput) {
        return callAsync(toolInput, new ToolContext(Map.of())).join();
    }
}

异步股票工具 - 模拟 3API 调用延迟:

public class AsyncStockTool implements AsyncToolCallback {

    @Override
    public ToolDefinition getToolDefinition() {
        return DefaultToolDefinition.builder()
            .name("stock")
            .description("查询股票价格")
            .inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
            .build();
    }

    @Override
    public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 开始执行");
            try {
                Thread.sleep(3000); // 模拟 API 调用延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Stock query interrupted", e);
            }
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 完成");
            return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\"}";
        }); 
    }

    @Override
    public Duration getTimeout() {
        return Duration.ofSeconds(30);
    }

    @Override
    public String call(String toolInput) {
        return callAsync(toolInput, new ToolContext(Map.of())).join();
    }
}

异步天气工具 - 模拟 2API 调用延迟:

public class AsyncWeatherTool implements AsyncToolCallback {

    @Override
    public ToolDefinition getToolDefinition() {
        return DefaultToolDefinition.builder()
            .name("weather")
            .description("查询天气信息")
            .inputSchema("{\"type\":\"object\",\"properties\":{\"city\":{\"type\":\"string\"}}}")
            .build();
    }

    @Override
    public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] weather 开始执行");
            try {
                Thread.sleep(2000); // 模拟 API 调用延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Weather query interrupted", e);
            }
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] weather 完成");
            return "{\"city\":\"北京\",\"temperature\":25,\"condition\":\"晴\"}";
        }); 
    }

    @Override
    public Duration getTimeout() {
        return Duration.ofSeconds(30);
    }

    @Override
    public String call(String toolInput) {
        return callAsync(toolInput, new ToolContext(Map.of())).join();
    }
}

1.3 并行配置参数

ReactAgent 中的并行配置参数有以下 4 个:

// Async tool execution configuration
protected boolean parallelToolExecution = false;       // 是否启用并行执行
protected int maxParallelTools = 5;                    // 最大并行工具数
protected Duration toolExecutionTimeout = Duration.ofMinutes(5);  // 工具超时时间
protected boolean wrapSyncToolsAsAsync = false;        // 是否将同步工具包装为异步

详细说明:

配置项 默认值 配置说明
parallelToolExecution false 启用/禁用并行工具执行。开启后多个工具调用可并发执行
maxParallelTools 5 最大并行执行的工具数量,用于限制并发量,防止系统资源耗尽
toolExecutionTimeout 5分钟 工具执行的超时时间,工具超时后会自动取消执行并返回错误信息,如果某个工具自己配置了超时,则取两个中的最小值
wrapSyncToolsAsAsync false 自动将同步工具包装为异步执行,使同步工具也具备并行执行的能力

使用示例:

ReactAgent agent = ReactAgent.builder()
    .name("parallel_agent")
    .model(chatModel)
    .tools(weatherTool, stockTool, newsTool)
    .parallelToolExecution(true)           // 启用并行
    .maxParallelTools(10)                  // 最大10个并行
    .toolExecutionTimeout(Duration.ofSeconds(30))  // 30秒超时
    .wrapSyncToolsAsAsync(true)            // 包装同步工具
    .build();

1.4 线程池配置

并行执行模式下,AgentToolNode 会使用以下方式获取线程池:

	/**
	 * Get the executor for tool execution from config or use default.
	 */
	private Executor getToolExecutor(RunnableConfig config) {
		return ParallelNode.getExecutor(config, AGENT_TOOL_NAME);
	}

ParallelNode.getExecutor() 中三层优先级查找:

// ParallelNode.getExecutor() 逻辑
Executor getExecutor(RunnableConfig config, String nodeId) {
    // 优先级1: 节点特定 executor(config.metadata(nodeId))
    // 优先级2: 默认 parallel executor(config.metadata("_DEFAULT_PARALLEL_EXECUTOR_"))
    // 优先级3: 内置 DEFAULT_EXECUTOR
}

方式 1 ,使用默认线程池(不配置时使用):

// ParallelNode 内置默认线程池
ThreadPoolExecutor DEFAULT_EXECUTOR = new ThreadPoolExecutor(
    corePoolSize:    Runtime.getRuntime().availableProcessors() * 2  // 最小4
    maxPoolSize:     Runtime.getRuntime().availableProcessors() * 4  // 最大200
    keepAliveTime:   60秒
    queueCapacity:   1000
    threadFactory:   "parallel-node-action-thread-{n}"
    rejectionPolicy: CallerRunsPolicy  // 拒绝时调用者线程执行
)

方式 2 ,通过 Builder 配置自定义线程池:

// 自定义线程池
Executor myExecutor = Executors.newFixedThreadPool(10);

ReactAgent agent = ReactAgent.builder()
    .executor(myExecutor)              // 设置自定义 executor
    .parallelToolExecution(true)       // 启用工具并行
    .maxParallelTools(5)
    .build();

// executor 会同时用于:
// 1. 工具并行执行(parallelToolExecution=true 时)
// 2. 作为 ParallelAgent 子图时的并行执行

方式 3 ,通过 RunnableConfig 配置自定义线程池:

RunnableConfig config = RunnableConfig.builder()
    .defaultParallelExecutor(myExecutor)  // 设置默认并行 executor
    .build();

agent.call("question", config);

方式 4 ,通过节点特定配置自定义线程池:

RunnableConfig config = RunnableConfig.builder()
    .addMetadata("__PARALLEL__(my_node)", nodeSpecificExecutor)
    .build();

1.5 构建 ReactAgent

构建 ReactAgent 时配置相关并行参数:

        // 启用深度思考
        ZhiPuAiChatOptions chatOptions = ZhiPuAiChatOptions.builder()
                .thinking(ZhiPuAiApi.ChatCompletionRequest.Thinking.enabled())
                .build();

        ReactAgent agent = ReactAgent.builder()
                .name("my_agent")
                .chatOptions(chatOptions)
                .model(zhiPuAiChatModel)
                .tools(new AsyncWeatherTool(), new AsyncStockTool(), new AsyncNewsTool())
                .returnReasoningContents(true) // 开启子图思考过程
                .parallelToolExecution(true)  // 并行支持
                .maxParallelTools(3) // 最大并行工具数
                .toolExecutionTimeout(Duration.ofMinutes(5)) // 工具执行超时时间
                .enableLogging(true)
                .systemPrompt("你是一个信息查询助手。")// 当用户问多个问题时,同时使用所有相关工具查询。
                .build();

        // 提问需要 LLM 决定调用多个工具
        String query = "请帮我查询:1)北京天气 2)阿里巴巴股票价格 3)最新AI新闻";
        System.out.println("用户提问: " + query);
        System.out.println();

        Instant start = Instant.now();
        Optional<OverAllState> result = null;
        try {
            result = agent.invoke(query);
        } catch (GraphRunnerException e) {
            throw new RuntimeException(e);
        }

        Instant end = Instant.now();
        long totalMs = Duration.between(start, end).toMillis();

        System.out.println("-".repeat(80));
        System.out.println("【并行执行结果】");
        System.out.println("  实际耗时: " + totalMs + " ms (" + (totalMs / 1000.0) + "秒)");

        if (result.isPresent()) {
            System.out.println("  执行成功: " + result.get().value("messages").isPresent());
        }

1.6 单元测试

运行后查看返回的 OverAllState 完整执行状态:

在这里插入图片描述

思考内容如下:

用户要求查询三个不同的信息:
1. 北京天气 - 需要使用weather函数,参数city为"北京"
2. 阿里巴巴股票价格 - 需要使用stock函数,阿里巴巴的股票代码通常是"BABA"
3. 最新AI新闻 - 需要使用news函数,topic为"AI"

我需要调用这三个函数来获取信息。

大模型同时返回三个工具调用请求:

在这里插入图片描述
工具执行完成后,同时返回三个工具执行结果给大模型:

在这里插入图片描述

大模型最终返回的响应:

在这里插入图片描述
查看控制台,可以看到这三个工具是同一时间执行的:

    [2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-2] news 开始执行
    [2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-3] weather 开始执行
    [2026-04-17T02:06:19.884136800Z] [线程-ForkJoinPool.commonPool-worker-1] stock 开始执行

2. 超时自动取消

【协作式取消的异步工具回调接口】CancellableAsyncToolCallback 继承 AsyncToolCallback,在异步执行基础上,支持新增了协作式取消的支持,专门用于优雅终止异步工具执行。

注意事项: 工具需要定期检查 CancellationToken 取消令牌,当收到取消请求时,优雅地停止执行。

2.1 取消令牌

2.1.1 CancellationToken

工具执行的【协作式取消】令牌接口,用于优雅终止异步工具/Agent的执行,工具需要定期检查取消状态并自行停止,协作式取消(非强制中断线程),工具主动检查取消信号,可安全清理资源后终止。

定义了以下方法:

  • isCancelled():判断是否已经收到取消请求
  • throwIfCancelled():【默认方法】检查取消状态,若已取消则抛出工具取消异常
  • onCancel(Runnable callback):注册【取消触发时的回调函数】,取消时执行资源释放、日志打印、状态重置等收尾操作

使用示例:

public CompletableFuture<String> callAsync(String args, ToolContext ctx, CancellationToken token) {
      return CompletableFuture.supplyAsync(() -> {
          for (int i = 0; i < 100; i++) {
              // 检查是否触发取消,触发则抛出异常终止任务
              token.throwIfCancelled();
              // 执行业务逻辑
          }
          return "result";
      });
  }

}

定义了一个空令牌(永不取消):

	/**
	 * 【空实现取消令牌】:永不触发取消
	 * <p>
	 * 适用场景:不需要取消功能的工具,作为默认参数使用,避免空指针
	 * <p>
	 * 特性:
	 * 1. isCancelled() 永远返回 false
	 * 2. onCancel() 注册的回调永远不会执行
	 * 3. 无任何取消能力,需要真实取消功能请使用 DefaultCancellationToken
	 */
	CancellationToken NONE = new CancellationToken() {

		/**
		 * 固定返回false,永远不取消
		 */
		@Override
		public boolean isCancelled() {
			return false;
		}

		/**
		 * 空实现:永不执行回调
		 */
		@Override
		public void onCancel(Runnable callback) {
			// 空实现,该令牌永远不会取消,因此无需处理回调
		}
	};

源码如下:

/**
 * 工具执行的【协作式取消】令牌接口
 *
 * @author disaster
 * @since 1.0.0
 * @see DefaultCancellationToken  默认的取消令牌实现类
 * @see CancellableAsyncToolCallback 可取消异步工具回调接口
 */
public interface CancellationToken {

	/**
	 * 判断是否已经收到取消请求
	 * @return true=已触发取消,false=未触发取消
	 */
	boolean isCancelled();

	/**
	 * 【默认方法】检查取消状态,若已取消则抛出工具取消异常
	 * <p>
	 * 开发中最常用的方法,一行代码实现取消检查+任务终止
	 * @throws ToolCancelledException 当触发取消时抛出该异常
	 */
	default void throwIfCancelled() throws ToolCancelledException {
		// 判断是否取消,是则抛出异常
		if (isCancelled()) {
			throw new ToolCancelledException("Tool execution was cancelled");
		}
	}

	/**
	 * 注册【取消触发时的回调函数】
	 * <p>
	 * 用途:取消时执行资源释放、日志打印、状态重置等收尾操作
	 * <p>
	 * 特性:如果已经触发取消,回调会立即执行
	 * @param callback 取消触发后执行的回调任务
	 */
	void onCancel(Runnable callback);

	/**
	 * 【空实现取消令牌】:永不触发取消
	 * <p>
	 * 适用场景:不需要取消功能的工具,作为默认参数使用,避免空指针
	 * <p>
	 * 特性:
	 * 1. isCancelled() 永远返回 false
	 * 2. onCancel() 注册的回调永远不会执行
	 * 3. 无任何取消能力,需要真实取消功能请使用 DefaultCancellationToken
	 */
	CancellationToken NONE = new CancellationToken() {

		/**
		 * 固定返回false,永远不取消
		 */
		@Override
		public boolean isCancelled() {
			return false;
		}

		/**
		 * 空实现:永不执行回调
		 */
		@Override
		public void onCancel(Runnable callback) {
			// 空实现,该令牌永远不会取消,因此无需处理回调
		}
	};

}

2.2.2 DefaultCancellationToken

CancellationToken 取消令牌的【默认线程安全实现类】。

核心特性:

  • 线程安全,支持多线程环境下的取消操作
  • 支持注册多个取消回调函数
  • 若注册回调时已取消,会立即执行回调
  • 支持与 CompletableFuture 绑定,实现取消信号自动传递

使用示例:

 DefaultCancellationToken token = new DefaultCancellationToken();

 // 注册资源清理回调
 token.onCancel(() -> cleanupResources());

 // 启动异步工具任务
 CompletableFuture<String> future = tool.callAsync(args, ctx, token);

 // 主动触发取消
 token.cancel();
 }

源码如下:

public class DefaultCancellationToken implements CancellationToken {

	/**
	 * 【内部工具类】取消回调包装器
	 * 作用:保证每个取消回调**只执行一次**,防止重复执行
	 */
	private static final class CancellationCallback {

		/** 真实的业务回调任务 */
		private final Runnable delegate;

		/** 原子标记:标记回调是否已执行,保证线程安全 */
		private final AtomicBoolean executed = new AtomicBoolean(false);

		/**
		 * 构造方法:包装业务回调
		 * @param delegate 业务回调逻辑
		 */
		private CancellationCallback(Runnable delegate) {
			this.delegate = delegate;
		}

		/**
		 * 执行回调,**保证仅执行一次**
		 * 通过CAS原子操作确保多线程下只执行一次
		 */
		private void runOnce() {
			// CAS:仅当未执行时,才执行回调并标记为已执行
			if (executed.compareAndSet(false, true)) {
				delegate.run();
			}
		}

	}

	/**
	 * 取消状态标记(原子布尔值)
	 * true=已取消,false=未取消
	 * 保证多线程下的状态可见性与原子性
	 */
	private final AtomicBoolean cancelled = new AtomicBoolean(false);

	/**
	 * 取消回调列表
	 * 使用 CopyOnWriteArrayList 保证线程安全,支持高并发读写
	 */
	private final List<CancellationCallback> callbacks = new CopyOnWriteArrayList<>();

	/**
	 * 判断当前令牌是否已触发取消
	 * @return 已取消返回true,未取消返回false
	 */
	@Override
	public boolean isCancelled() {
		return cancelled.get();
	}

	/**
	 * 注册取消触发后的回调任务
	 * <p>
	 * 逻辑:
	 * 1. 包装回调并加入列表
	 * 2. 如果当前已经取消,立即执行回调
	 * @param callback 取消时执行的任务
	 */
	@Override
	public void onCancel(Runnable callback) {
		CancellationCallback cancellationCallback = new CancellationCallback(callback);
		// 添加回调到集合
		callbacks.add(cancellationCallback);
		// 如果已经取消,直接执行一次回调
		if (cancelled.get()) {
			cancellationCallback.runOnce();
		}
	}

	/**
	 * 【核心方法】主动触发取消
	 * <p>
	 * 特性:**幂等性**,多次调用效果和一次一致,回调仅执行一次
	 * 逻辑:
	 * 1. CAS原子修改状态为已取消
	 * 2. 遍历执行所有注册的回调
	 */
	public void cancel() {
		// 仅当状态从 false -> true 时,才执行后续逻辑
		if (cancelled.compareAndSet(false, true)) {
			// 执行所有取消回调,每个仅执行一次
			callbacks.forEach(CancellationCallback::runOnce);
		}
	}

	/**
	 * 创建【绑定CompletableFuture】的取消令牌
	 * <p>
	 * 功能:当异步Future被取消时,自动触发当前令牌的取消
	 * 注意:单向绑定,取消令牌不会反向取消Future
	 * @param future 要绑定的异步任务
	 * @return 绑定后的取消令牌实例
	 */
	public static DefaultCancellationToken linkedTo(CompletableFuture<?> future) {
		DefaultCancellationToken token = new DefaultCancellationToken();
		// 监听Future完成/取消事件
		future.whenComplete((result, error) -> {
			// 如果Future被取消,自动触发令牌取消
			if (future.isCancelled()) {
				token.cancel();
			}
		});
		return token;
	}
}

2.2 异步执行中的超时和取消

核心处理逻辑:

  1. CancellableAsyncToolCallback 创建取消令牌
  2. 发生超时异常,调用令牌 cancel() 方法,主线程返回工具异常信息(不抛出异常),子线程仍在执行
  3. 工具中检测到令牌处于取消状态时,执行取消回调用于资源清理

执行异步工具调用核心方法:

	/**
	 * 执行异步工具调用:内置超时处理 + 外部取消令牌跟踪能力
	 * <p>
	 * 核心功能:
	 * <ul>
	 *   <li>支持 CancellableAsyncToolCallback 可取消异步工具,实现协作式取消</li>
	 *   <li>工具超时时,自动触发取消令牌,让工具优雅终止</li>
	 *   <li>支持并行工具执行:将取消令牌存入外部Map,供全局超时管理器统一控制</li>
	 * </ul>
	 * @param callback 异步工具回调实例(普通/可取消)
	 * @param request 工具调用请求对象(包含参数、工具ID、名称)
	 * @param toolContextMap 工具上下文参数集合
	 * @param config 运行时配置(线程ID、取消令牌等)
	 * @param extraStateFromToolCall 收集工具执行产生的状态更新
	 * @param cancellationTokens 并行执行专用:存储取消令牌的Map(可为null)
	 * @param toolIndex 并行工具索引(作为key存入取消令牌Map)
	 * @return 工具执行响应结果(成功/失败)
	 */
	private ToolCallResponse executeAsyncTool(AsyncToolCallback callback, ToolCallRequest request,
			Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
			Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {

		// 构建工具执行上下文
		ToolContext context = new ToolContext(toolContextMap);

		// 声明取消令牌:仅可取消异步工具会初始化
		DefaultCancellationToken cancellationToken = null;

		try {
			CompletableFuture<String> future;

			// ===================== 核心分支:区分工具类型 =====================
			// 1. 如果是【可取消异步工具】:创建取消令牌,并注册到并行管理器
			if (callback instanceof CancellableAsyncToolCallback cancellable) {
				// 初始化取消令牌实例
				cancellationToken = new DefaultCancellationToken();
				// 并行执行场景:将令牌存入外部Map,允许外层超时处理器统一取消
				if (cancellationTokens != null && toolIndex >= 0) {
					cancellationTokens.put(toolIndex, cancellationToken);
				}
				// 调用可取消工具的异步方法,传入取消令牌
				future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
			}
			// 2. 普通异步工具:直接执行,无取消能力
			else {
				future = callback.callAsync(request.getArguments(), context);
			}

			// 防御性判断:异步任务为空,直接返回错误
			if (future == null) {
				return ToolCallResponse.error(request.getToolCallId(), request.getToolName(),
						"Async tool returned null future");
			}

			// ===================== 执行异步任务:超时控制 =====================
			// 等待任务完成,使用工具自身配置的超时时间,超时抛出 TimeoutException
			String result = future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS).join();

			// 日志输出:工具执行完成
			if (enableActingLog) {
				logger.info("[ThreadId {}] Agent {} acting, async tool {} finished",
						config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
				if (logger.isDebugEnabled()) {
					logger.debug("Tool {} returned: {}", request.getToolName(), result);
				}
			}

			// 返回工具执行成功结果
			return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
		}
		// ===================== 异步执行异常捕获 =====================
		catch (CompletionException e) {
			// 获取真实异常原因
			Throwable cause = e.getCause() != null ? e.getCause() : e;

			// 异常1:工具执行超时 → 触发取消 + 清理状态
			if (cause instanceof TimeoutException) {
				// 超时后主动取消工具,释放资源
				if (cancellationToken != null) {
					cancellationToken.cancel();
				}
				// 清理脏数据:超时后丢弃所有未完成的状态更新
				extraStateFromToolCall.clear();
				logger.warn("Async tool {} timed out, discarding any state updates", request.getToolName());
				return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
			}
			// 异常2:工具业务执行异常 → 使用异常处理器处理
			else if (cause instanceof ToolExecutionException toolExecutionException) {
				logger.error("Async tool {} execution failed, handling with processor: {}", request.getToolName(),
						toolExecutionExceptionProcessor.getClass().getName(), toolExecutionException);
				String result = toolExecutionExceptionProcessor.process(toolExecutionException);
				return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
			}
			// 异常3:其他异步执行异常
			else {
				logger.error("Async tool {} execution failed: {}", request.getToolName(), cause.getMessage(), cause);
				return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
			}
		}
		// 异常4:任务被主动取消
		catch (CancellationException e) {
			logger.warn("Async tool {} execution was cancelled", request.getToolName(), e);
			return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
		}
		// 异常5:兜底捕获所有未知异常
		catch (Exception e) {
			logger.error("Async tool {} execution failed: {}", request.getToolName(), e.getMessage(), e);
			return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
		}
	}

2.2.1 CompletableFuture

CompletableFuture 是Java 8 引入了的一个强大的异步编程核心类,解决了传统 Future 的痛点,支持异步执行、超时控制、主动取消、链式回调、异常处理,是你 Spring AI Alibaba 中异步工具、并行工具调用、超时 / 取消功能的底层基石。

核心方法包括:

  • supplyAsync() → 开启异步任务
  • orTimeout() → 设置任务超时
  • join() → 等待异步任务完成
  • cancel() → 主动取消任务

核心异常:

  • TimeoutException:任务执行超时抛出
  • CancellationException:主动调用取消抛出
对比项 CancellationException 取消异常 TimeoutException 超时异常
触发原因 主动取消(人 / 系统叫停) 被动超时(任务太慢)
谁触发 token.cancel() / dispose() orTimeout() 时间到
性质 正常信号 执行异常
日志级别 WARN(警告) ERROR/WARN(错误)
你的场景 用户停止 AI、断开连接 API 调用超时卡死
处理目标 终止任务,释放资源 终止任务,防止阻塞

2.2.2 超时时间设置

单个异步工具执行时会设置超时时间:

// AgentToolNode.java 第 723 行(单个异步工具)
future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS)

并行执行时,会统一的超时时间:

// AgentToolNode.java 第 348 行(并行执行)
.orTimeout(toolExecutionTimeout.toMillis(), TimeUnit.MILLISECONDS)

2.2.3 异常处理

在执行异步工具调用 executeAsyncTool 方法中,可以找到任务取消的相关逻辑。

如果是【可取消异步工具】(实现了 CancellableAsyncToolCallback ):

  1. 初始化取消令牌实例 DefaultCancellationToken
  2. 如果是并行工具执行,将当前工具的取消令牌存入统一的外部 Map
  3. 调用 CancellableAsyncToolCallback#callAsync 并传入取消令牌
			if (callback instanceof CancellableAsyncToolCallback cancellable) {
				cancellationToken = new DefaultCancellationToken();
				// Store the token in the external map so outer timeout handler can cancel it
				if (cancellationTokens != null && toolIndex >= 0) {
					cancellationTokens.put(toolIndex, cancellationToken);
				}
				future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
			}

TimeoutException 处理:

  1. 捕获工具执行超时 TimeoutException 异常,设置取消令牌的状态为【取消】
  2. 返回 ToolCallResponse.error
				// Cancel the token to notify the tool to stop gracefully
				if (cancellationToken != null) {
					cancellationToken.cancel();
				}
				extraStateFromToolCall.clear();
				logger.warn("Async tool {} timed out, discarding any state updates", request.getToolName());
				return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));

CancellationException 处理:

		catch (CancellationException e) {
			logger.warn("Async tool {} execution was cancelled", request.getToolName(), e);
			return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
		}

注意:异步工具运行在独立子线程,TimeoutException 不会杀死正在运行的线程,线程会继续在后台执行任务,必须手动配合 CancellationToken.cancel() 才能让任务真正停止。

2.3 超时问题演示

修改异步工具,使用循环模拟一个需要多个步骤才能完成的任务(需要 2.5 秒),并设置超时时间为 1 秒:

public class AsyncStockTool implements AsyncToolCallback {

    @Override
    public ToolDefinition getToolDefinition() {
        return DefaultToolDefinition.builder()
                .name("stock")
                .description("查询股票价格")
                .inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
                .build();
    }

    @Override
    public CompletableFuture<String> callAsync(String arguments, ToolContext context) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 开始执行");
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.printf("查询股票价格第 %d 步%n", i + 1);
                    Thread.sleep(500); // 模拟 API 调用延迟
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Stock query interrupted", e);
            }
            System.out.println("    [" + Instant.now() + "] [线程-" + Thread.currentThread().getName() + "] stock 完成");
            return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\"}";
        }); 
    }

    @Override
    public Duration getTimeout() {
        return Duration.ofSeconds(1);
    }

    @Override
    public String call(String toolInput) {
        return callAsync(toolInput, new ToolContext(Map.of())).join();
    }
}

执行后可以看到问题,子线程超时,直接返回了错误信息【阿里巴巴股票价格查询也超时了】,但是子线程还是继续执行,存在一定的资源浪费:

在这里插入图片描述

2.4 优雅的取消

定义可取消的异步股票工具:

/**
 * 可取消的异步股票工具 - 模拟 API 调用,支持超时/主动取消
 *
 * <p>改造要点:</p>
 * <ul>
 *   <li>实现 CancellableAsyncToolCallback 接口</li>
 *   <li>callAsync 方法增加 CancellationToken 参数</li>
 *   <li>在执行过程中定期检查取消状态</li>
 *   <li>注册 onCancel 回调进行资源清理</li>
 * </ul>
 *
 * @author disaster
 * @since 1.0.0
 */
public class AsyncStockTool implements CancellableAsyncToolCallback {

    @Override
    public ToolDefinition getToolDefinition() {
        return DefaultToolDefinition.builder()
                .name("stock")
                .description("查询股票价格,支持取消")
                .inputSchema("{\"type\":\"object\",\"properties\":{\"symbol\":{\"type\":\"string\"}}}")
                .build();
    }

    /**
     * 带取消令牌的异步执行方法
     *
     * <p>关键改动:</p>
     * <ul>
     *   <li>增加 CancellationToken 参数</li>
     *   <li>定期检查取消状态</li>
     *   <li>注册 onCancel 回调进行资源清理</li>
     * </ul>
     */
    @Override
    public CompletableFuture<String> callAsync(String arguments, ToolContext context,
                                               CancellationToken cancellationToken) {

        return CompletableFuture.supplyAsync(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("    [" + Instant.now() + "] [线程-" + threadName + "] stock 开始执行");

            // 【关键 1】注册取消回调 - 用于资源清理
            cancellationToken.onCancel(() -> {
                System.out.println("    [" + Instant.now() + "] [线程-" + threadName + "] stock 收到取消通知,清理资源...");
                // 这里可以关闭HTTP连接、释放数据库连接等
            });

            // 模拟分步执行股票查询(5步 * 500ms = 2.5秒)
            for (int i = 0; i < 5; i++) {
                // 【关键 2】每一步都检查取消状态,触发则抛出异常终止任务
                // 方式 1 :检查到取消时抛出 ToolCancelledException ,停止子线程
                cancellationToken.throwIfCancelled();
                // 方式 2 :检查到取消时自定义处理逻辑
/*
                if (cancellationToken.isCancelled()){
                    // 自定义处理逻辑
                    System.out.println("......................");
                }
*/

                // 打印查询进度
                System.out.printf("    [" + Instant.now() + "] [线程-" + threadName + "] 查询股票价格第 %d 步%n", i + 1);

                // 模拟 API 调用延迟
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Stock query interrupted", e);
                }
            }

            System.out.println("    [" + Instant.now() + "] [线程-" + threadName + "] stock 完成");
            return "{\"symbol\":\"BABA\",\"price\":85.5,\"change\":\"+2.3%\",\"status\":\"success\"}";

        });
    }

    /**
     * 默认超时时间:1秒
     */
    @Override
    public Duration getTimeout() {
        return Duration.ofSeconds(1);
    }

    /**
     * 同步调用(兼容非取消场景)
     *
     * <p>使用 CancellationToken.NONE 表示不支持取消</p>
     */
    @Override
    public String call(String toolInput) {
        return callAsync(toolInput, new ToolContext(Map.of()), CancellationToken.NONE).join();
    }
}

再次执行,可以看到工具执行到第二步后,就被优雅的停止了,资源清理也正常执行:

在这里插入图片描述

2.5 主动取消

问:ReactAgent 可以主动取消工具任务吗?
答:目前 ❌ 不支持

限制原因:

  • 框架当前设计:仅支持超时自动取消,未开放手动取消 API
  • 流式调用 / 会话中断的主动取消能力未在 1.x 版本落地
  • 取消令牌仅由框架内部管理,不对外暴露
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐