Future和CompletableFuture详解
Future:表示一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果的方法。核心思想同步调用:main线程 ──────→ 调用方法 ──────→ 等待返回 ──────→ 继续执行|_____耗时操作_____|异步调用(Future):main线程 ──────→ 提交任务 ──────→ 继续执行其他逻辑 ──────→ get()获取结果↓工作线程执行任务(异步)//
·
Future和CompletableFuture详解
Future是Java异步编程的核心,CompletableFuture是Java 8引入的强大异步编程工具。
Future接口详解
什么是Future
Future:表示一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果的方法。
核心思想:
同步调用:
main线程 ──────→ 调用方法 ──────→ 等待返回 ──────→ 继续执行
|_____耗时操作_____|
异步调用(Future):
main线程 ──────→ 提交任务 ──────→ 继续执行其他逻辑 ──────→ get()获取结果
↓
工作线程执行任务(异步)
Future接口定义
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否被取消
boolean isCancelled();
// 任务是否完成
boolean isDone();
// 获取结果(阻塞)
V get() throws InterruptedException, ExecutionException;
// 获取结果(超时)
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future的使用
public class FutureDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交Callable任务,返回Future
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("执行任务中...");
Thread.sleep(2000);
return "任务结果";
}
});
System.out.println("继续执行main线程的其他逻辑...");
// 检查任务状态
System.out.println("任务是否完成: " + future.isDone()); // false
System.out.println("任务是否取消: " + future.isCancelled()); // false
// 阻塞获取结果
System.out.println("获取结果: " + future.get()); // 阻塞2秒
System.out.println("任务是否完成: " + future.isDone()); // true
pool.shutdown();
}
}
输出:
继续执行main线程的其他逻辑...
任务是否完成: false
任务是否取消: false
执行任务中...
获取结果: 任务结果
任务是否完成: true
FutureTask深度解析
FutureTask类图
FutureTask继承关系:
┌────────────┐
│ Runnable │ ← 可以作为Thread的target
└────────────┘
↑
│
┌──────┴──────┐
│RunnableFuture│
└──────┬──────┘
↑
│ 同时实现
┌──────┴──────┐
│ Future │ ← 提供Future的所有方法
└─────────────┘
↑
│
┌──────┴──────┐
│ FutureTask │ ← 包装Callable/Runnable
└─────────────┘
FutureTask核心成员
public class FutureTask<V> implements RunnableFuture<V> {
// ===== 任务状态(7种) =====
private volatile int state;
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 完成中
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 异常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED = 6; // 已中断
// ===== 核心成员变量 =====
private Callable<V> callable; // 任务对象
private Object outcome; // 返回结果或异常
private volatile Thread runner; // 执行任务的线程
private volatile WaitNode waiters; // get()阻塞线程队列(栈结构)
// ===== 等待节点 =====
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
状态转换图
┌─────────────────────────────────────────────────────────────┐
│ FutureTask状态转换详细图 │
└─────────────────────────────────────────────────────────────┘
┌─────────┐
│ NEW │ ← 初始状态
│ (state=0)│
└────┬────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 正常执行 │ │ 异常执行 │ │ 取消 │
└──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ├─────────┐
┌────────────┐ ┌────────────┐ │ │
│ COMPLETING │ │ COMPLETING │ │ │
│ (state=1) │ │ (state=1) │ ▼ ▼
└─────┬──────┘ └─────┬──────┘ ┌─────┐ ┌───────────┐
│ │ │CANCE│ │INTERRUPTING│
▼ ▼ │LLED │ │(state=3) │
┌──────────┐ ┌──────────┐ │(2) │ └──────┬────┘
│ NORMAL │ │EXCEPTION-│ └─────┘ │
│(state=2) │ │AL(state=3)│ ▼
└──────────┘ └──────────┘ ┌────────────┐
✅ ❌ │INTERRUPTED │
正常完成 异常完成 │ (state=4) │
└────────────┘
❌
取消并中断
【状态码】:
0 = NEW 初始状态
1 = COMPLETING 执行中(过渡态)
2 = NORMAL 正常完成
3 = EXCEPTIONAL 异常完成
4 = CANCELLED 被取消(无中断)
5 = INTERRUPTING 中断中(过渡态)
6 = INTERRUPTED 已中断
构造方法
// 1. 传入Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 初始状态为NEW
}
// 2. 传入Runnable + 返回值
public FutureTask(Runnable runnable, V result) {
// 使用适配器模式将Runnable转换为Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
// Runnable适配器
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run(); // 执行Runnable的run方法
return result; // 返回指定的结果
}
}
run()方法源码分析
public void run() {
// 条件1:state != NEW说明任务已被执行或取消
// 条件2:CAS设置runner为当前线程,失败说明被其他线程抢占
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 【核心】执行call()方法,获取结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常
setException(ex);
}
if (ran)
// 设置结果
set(result);
}
} finally {
runner = null; // help GC
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
run()方法详细流程图:
┌──────────────────────────────────────────────┐
│ FutureTask#run() │
│ 任务执行流程 │
└────────────────────┬─────────────────────────┘
│
▼
┌─────────────────────────┐
│ state != NEW? │ ← 任务已执行/取消?
└──────┬─────────┬────────┘
YES NO
│ │
▼ ▼
┌────────┐ ┌──────────────────────┐
│ return │ │ CAS设置runner │
│ 任务 │ │ runner: null → 当前线程│
│ 已结束│ └──────┬──────┬────────┘
└────────┘ 失败 成功
│ │
▼ ▼
┌────────┐ ┌──────────────┐
│ return │ │ double-check │
│ 被抢占│ │ c != null && │
└────────┘ │ state == NEW │
└──────┬───────┘
│
▼
┌──────────────┐
│ result = │
│ c.call() │ ← 核心执行
└───┬──────┬───┘
成功 异常
│ │
▼ ▼
┌──────────┐ ┌──────────────┐
│ran = true│ │ran = false │
│ │ │result = null │
└────┬─────┘ └──────┬───────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ set(result) │ │ setException(ex) │
├──────────────┤ ├──────────────────┤
│ 1. CAS: │ │ 1. CAS: │
│ NEW→ │ │ NEW→ │
│ COMPLETING │ │ COMPLETING │
│ 2. outcome=v │ │ 2. outcome=ex │
│ 3. state= │ │ 3. state= │
│ NORMAL │ │ EXCEPTIONAL │
│ 4. finish- │ │ 4. finish- │
│ Completion()│ │ Completion() │
└──────┬───────┘ └──────┬───────────┘
│ │
└────────┬────────┘
▼
┌───────────────────────┐
│ finishCompletion() │
│ 唤醒所有get()阻塞线程 │
├───────────────────────┤
│ 1. 遍历waiters链表 │
│ 2. LockSupport.unpark│
│ 每个等待线程 │
└───────────────────────┘
【关键点】:
✅ CAS保证只有一个线程执行
✅ COMPLETING是过渡状态
✅ outcome保存结果或异常
✅ finishCompletion唤醒所有等待线程
set()方法:设置正常结果
protected void set(V v) {
// CAS设置状态:NEW → COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 【保存结果】
// 设置最终状态:COMPLETING → NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 唤醒所有等待的线程
finishCompletion();
}
}
setException()方法:设置异常结果
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 保存异常
// 设置最终状态:COMPLETING → EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
finishCompletion()方法:唤醒所有get()阻塞的线程
private void finishCompletion() {
// 遍历waiters链表(栈)
for (WaitNode q; (q = waiters) != null;) {
// CAS设置waiters为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // help GC
q = next;
}
break;
}
}
done(); // 钩子方法
callable = null; // help GC
}
get()方法源码分析
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果任务未完成,阻塞等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回结果
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
awaitDone()方法:阻塞等待(核心方法)
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 【三次自旋】
for (;;) {
// 检查中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任务已完成,返回状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 任务即将完成,让出CPU
else if (s == COMPLETING)
Thread.yield();
// 【第1次自旋】创建WaitNode
else if (q == null)
q = new WaitNode();
// 【第2次自旋】入队(头插法)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 【第3次自旋】阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 【阻塞当前线程】
LockSupport.park(this);
}
}
WaitNode队列图示:
waiters(栈结构,头插头取):
初始状态:
waiters → null
线程1调用get():
waiters → [thread1] → null
线程2调用get():
waiters → [thread2] → [thread1] → null
线程3调用get():
waiters → [thread3] → [thread2] → [thread1] → null
任务完成后,finishCompletion()依次唤醒:
thread3 → thread2 → thread1
report()方法:返回结果或抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome; // 获取结果
if (s == NORMAL)
return (V)x; // 正常返回
if (s >= CANCELLED)
throw new CancellationException(); // 任务被取消
throw new ExecutionException((Throwable)x); // 抛出异常
}
cancel()方法源码分析
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS修改状态:NEW → INTERRUPTING/CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断执行任务的线程
t.interrupt();
} finally {
// 设置最终状态:INTERRUPTING → INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有get()阻塞的线程
finishCompletion();
}
return true;
}
FutureTask完整示例
public class FutureTaskDemo {
public static void main(String[] args) throws Exception {
// 创建FutureTask
FutureTask<Integer> task = new FutureTask<>(() -> {
System.out.println("任务开始执行...");
Thread.sleep(2000);
return 100;
});
// 启动线程执行任务
new Thread(task, "t1").start();
// 多个线程可以同时get
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" 获取结果: " + task.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" 获取结果: " + task.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "t3").start();
// main线程获取结果
System.out.println("main 获取结果: " + task.get());
}
}
输出:
任务开始执行...
main 获取结果: 100
t2 获取结果: 100
t3 获取结果: 100
submit vs execute
方法对比
public interface ExecutorService {
// 1. execute:只能提交Runnable,无返回值
void execute(Runnable command);
// 2. submit:可以提交Runnable/Callable,返回Future
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
}
submit源码分析
// AbstractExecutorService
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 包装成FutureTask,返回值为null
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask); // 调用execute执行
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 包装成FutureTask,返回值为指定的result
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 包装成FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
异常处理对比
public class ExecuteVsSubmitDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// ===== execute:直接抛出异常 =====
pool.execute(() -> {
System.out.println("execute 执行");
int i = 1 / 0; // 异常会被打印到控制台
});
Thread.sleep(100);
// ===== submit:吞掉异常 =====
pool.submit(() -> {
System.out.println("submit 执行");
int i = 1 / 0; // 异常被吞掉,不会打印
});
Thread.sleep(100);
// ===== submit:通过get()获取异常 =====
Future<?> future = pool.submit(() -> {
System.out.println("submit 执行(get)");
int i = 1 / 0;
});
try {
future.get(); // 调用get()时才抛出异常
} catch (ExecutionException e) {
System.out.println("捕获异常: " + e.getCause());
}
pool.shutdown();
}
}
输出:
execute 执行
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
submit 执行
submit 执行(get)
捕获异常: java.lang.ArithmeticException: / by zero
对比总结
| 特性 | execute | submit |
|---|---|---|
| 参数 | 只能Runnable | Runnable/Callable |
| 返回值 | void | Future |
| 异常处理 | 直接抛出 | 吞掉异常,需要get()获取 |
| 底层 | 直接执行 | 包装成FutureTask再execute |
CompletableFuture入门
为什么需要CompletableFuture
Future的局限性:
Future的缺点:
1. get()阻塞:无法实现真正的异步回调
2. 不能组合:无法链式调用,无法组合多个Future
3. 不能手动完成:无法主动设置结果
4. 异常处理困难:需要try-catch
5. 无法取消:cancel()不够灵活
CompletableFuture的优势:
CompletableFuture(JDK 8):
1. ✅ 支持异步回调(thenApply、thenAccept等)
2. ✅ 支持链式调用
3. ✅ 支持组合多个异步任务(thenCompose、thenCombine等)
4. ✅ 支持手动完成(complete)
5. ✅ 更好的异常处理(exceptionally、handle)
CompletableFuture类图
CompletableFuture继承关系:
┌──────────┐
│ Future │ ← 提供get()、cancel()等
└──────────┘
↑
│
┌────┴────────────┐
│CompletionStage │ ← 提供异步编排能力
└─────────────────┘
↑
│ 同时实现
┌────┴──────────────┐
│CompletableFuture │ ← 强大的异步编程工具
└───────────────────┘
创建CompletableFuture
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// ===== 1. 手动创建(需要手动complete) =====
CompletableFuture<String> future1 = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(1000);
future1.complete("手动完成"); // 手动设置结果
} catch (Exception e) {
future1.completeExceptionally(e); // 手动设置异常
}
}).start();
System.out.println(future1.get()); // 手动完成
// ===== 2. runAsync:无返回值 =====
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("runAsync执行");
});
future2.get(); // null
// ===== 3. supplyAsync:有返回值 =====
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync执行");
return "返回结果";
});
System.out.println(future3.get()); // 返回结果
// ===== 4. 指定线程池 =====
ExecutorService pool = Executors.newFixedThreadPool(2);
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
System.out.println("使用自定义线程池: " +
Thread.currentThread().getName());
return "自定义线程池";
}, pool);
System.out.println(future4.get());
pool.shutdown();
}
}
CompletableFuture核心API
回调方法
thenApply(转换结果)
public class ThenApplyDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("执行任务1");
return 100;
})
.thenApply(result -> {
System.out.println("执行任务2,上一步结果: " + result);
return result * 2;
})
.thenApply(result -> {
System.out.println("执行任务3,上一步结果: " + result);
return "最终结果: " + result;
});
System.out.println(future.get());
}
}
输出:
执行任务1
执行任务2,上一步结果: 100
执行任务3,上一步结果: 200
最终结果: 200
thenAccept(消费结果)
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(result -> {
System.out.println("消费结果: " + result); // 无返回值
});
thenRun(不关心结果)
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> {
System.out.println("任务完成,不关心结果"); // 无参数,无返回值
});
组合方法
thenCompose(串行组合)
public class ThenComposeDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("任务1");
return "Hello";
})
.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println("任务2,上一步结果: " + result);
return result + " World";
}));
System.out.println(future.get()); // Hello World
}
}
thenCombine(并行组合)
public class ThenCombineDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1");
try { Thread.sleep(1000); } catch (Exception e) {}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2");
try { Thread.sleep(2000); } catch (Exception e) {}
return 200;
});
CompletableFuture<Integer> result = future1.thenCombine(future2, (r1, r2) -> {
System.out.println("合并结果: " + r1 + " + " + r2);
return r1 + r2;
});
System.out.println("最终结果: " + result.get()); // 300
}
}
allOf(等待所有完成)
public class AllOfDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (Exception e) {}
return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (Exception e) {}
return "任务2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (Exception e) {}
return "任务3";
});
// 等待所有任务完成
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
future1, future2, future3
);
allFuture.get(); // 阻塞3秒
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
}
anyOf(等待任意完成)
public class AnyOfDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (Exception e) {}
return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (Exception e) {}
return "任务2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (Exception e) {}
return "任务3";
});
// 等待任意一个完成
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
future1, future2, future3
);
System.out.println("最快完成的任务: " + anyFuture.get()); // 任务2
}
}
异常处理
exceptionally
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "正常结果";
})
.exceptionally(ex -> {
System.out.println("异常处理: " + ex.getMessage());
return "默认值";
});
System.out.println(future.get()); // 默认值
handle(同时处理结果和异常)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "正常结果";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
return "异常处理后的值";
}
return result;
});
System.out.println(future.get()); // 异常处理后的值
方法后缀
方法命名规则:
1. 无后缀(thenApply):
└─ 由上一步的线程执行
2. Async后缀(thenApplyAsync):
└─ 由ForkJoinPool.commonPool()执行
3. Async+Executor后缀(thenApplyAsync(fn, executor)):
└─ 由自定义线程池执行
实战应用
应用1:商品详情页聚合
public class ProductDetailDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 并行查询多个接口
CompletableFuture<String> baseInfoFuture = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "基本信息";
}, pool);
CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "价格信息";
}, pool);
CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "库存信息";
}, pool);
CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> {
sleep(1800);
return "评论信息";
}, pool);
// 等待所有接口返回
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
baseInfoFuture, priceFuture, inventoryFuture, reviewFuture
);
allFuture.get();
// 聚合结果
String result = String.format("商品详情:%s, %s, %s, %s",
baseInfoFuture.get(),
priceFuture.get(),
inventoryFuture.get(),
reviewFuture.get());
System.out.println(result);
System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");
pool.shutdown();
}
static void sleep(long millis) {
try { Thread.sleep(millis); } catch (Exception e) {}
}
}
输出:
商品详情:基本信息, 价格信息, 库存信息, 评论信息
总耗时: 2005ms ← 并行执行,只需等待最长的2秒
应用2:异步回调链
public class AsyncCallbackDemo {
public static void main(String[] args) throws Exception {
CompletableFuture
.supplyAsync(() -> {
System.out.println("1. 查询用户ID");
return "user123";
})
.thenApplyAsync(userId -> {
System.out.println("2. 根据用户ID查询用户信息: " + userId);
sleep(1000);
return "User(id=" + userId + ", name=张三)";
})
.thenApplyAsync(userInfo -> {
System.out.println("3. 根据用户信息查询订单: " + userInfo);
sleep(1000);
return "订单列表[10条]";
})
.thenAccept(orders -> {
System.out.println("4. 显示订单: " + orders);
})
.exceptionally(ex -> {
System.out.println("异常处理: " + ex.getMessage());
return null;
})
.get(); // 阻塞等待完成
}
static void sleep(long millis) {
try { Thread.sleep(millis); } catch (Exception e) {}
}
}
应用3:超时控制
public class TimeoutDemo {
public static void main(String[] args) {
try {
String result = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(3000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常结果";
})
.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS) // JDK 9+
.get();
System.out.println(result); // 超时默认值
} catch (Exception e) {
e.printStackTrace();
}
}
}
🎯 知识点总结
Future vs CompletableFuture
| 特性 | Future | CompletableFuture |
|---|---|---|
| 阻塞获取 | get()阻塞 | get()阻塞,但支持异步回调 |
| 回调 | ❌ | ✅ thenApply、thenAccept等 |
| 组合 | ❌ | ✅ thenCompose、thenCombine等 |
| 异常处理 | try-catch | exceptionally、handle |
| 手动完成 | ❌ | ✅ complete |
| 并行 | ❌ | ✅ allOf、anyOf |
CompletableFuture核心方法
创建:
├─ supplyAsync(有返回值)
└─ runAsync(无返回值)
转换:
├─ thenApply(转换结果)
├─ thenAccept(消费结果)
└─ thenRun(不关心结果)
组合:
├─ thenCompose(串行)
├─ thenCombine(并行)
├─ allOf(等待所有)
└─ anyOf(等待任意)
异常:
├─ exceptionally
└─ handle
💡 常见面试题
Q1:FutureTask的run()方法返回值是void,为什么get()能获取到结果?
答:因为FutureTask有一个成员变量outcome,run()方法执行完后会把结果保存到outcome中,get()方法直接返回outcome的值。
Q2:多个线程同时调用get()会怎样?
答:多个线程调用get()会被封装成WaitNode加入到waiters链表(栈结构)中阻塞等待。任务完成后,finishCompletion()会遍历waiters链表,依次唤醒所有等待的线程。
Q3:submit()和execute()的区别?
答:
- execute只能提交Runnable,submit可以提交Runnable/Callable
- execute无返回值,submit返回Future
- execute直接抛出异常,submit吞掉异常(需要通过get()获取)
- submit底层是将任务包装成FutureTask,然后调用execute执行
Q4:CompletableFuture比Future强在哪里?
答:
- 支持异步回调(thenApply、thenAccept等),不需要阻塞等待
- 支持任务组合(thenCompose、thenCombine)
- 支持并行等待(allOf、anyOf)
- 更好的异常处理(exceptionally、handle)
- 支持手动完成(complete)
Q5:FutureTask的状态有哪些?
答:7种状态:
- NEW(新建)
- COMPLETING(完成中)
- NORMAL(正常完成)
- EXCEPTIONAL(异常)
- CANCELLED(取消)
- INTERRUPTING(中断中)
- INTERRUPTED(已中断)
更多推荐


所有评论(0)