Future和CompletableFuture详解

Future是Java异步编程的核心,CompletableFuture是Java 8引入的强大异步编程工具。

Future接口详解

什么是Future

Future:表示一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果的方法。

核心思想

同步调用:
main线程 ──────→ 调用方法 ──────→ 等待返回 ──────→ 继续执行
                    |_____耗时操作_____|

异步调用(Future):
main线程 ──────→ 提交任务 ──────→ 继续执行其他逻辑 ──────→ get()获取结果
                      ↓
              工作线程执行任务(异步)

Future接口定义

public interface Future<V> {
    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 任务是否被取消
    boolean isCancelled();
    
    // 任务是否完成
    boolean isDone();
    
    // 获取结果(阻塞)
    V get() throws InterruptedException, ExecutionException;
    
    // 获取结果(超时)
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future的使用

public class FutureDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        
        // 提交Callable任务,返回Future
        Future<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("执行任务中...");
                Thread.sleep(2000);
                return "任务结果";
            }
        });
        
        System.out.println("继续执行main线程的其他逻辑...");
        
        // 检查任务状态
        System.out.println("任务是否完成: " + future.isDone());  // false
        System.out.println("任务是否取消: " + future.isCancelled());  // false
        
        // 阻塞获取结果
        System.out.println("获取结果: " + future.get());  // 阻塞2秒
        
        System.out.println("任务是否完成: " + future.isDone());  // true
        
        pool.shutdown();
    }
}

输出

继续执行main线程的其他逻辑...
任务是否完成: false
任务是否取消: false
执行任务中...
获取结果: 任务结果
任务是否完成: true

FutureTask深度解析

FutureTask类图

FutureTask继承关系:
    
         ┌────────────┐
         │  Runnable  │  ← 可以作为Thread的target
         └────────────┘
                ↑
                │
         ┌──────┴──────┐
         │RunnableFuture│
         └──────┬──────┘
                ↑
                │ 同时实现
         ┌──────┴──────┐
         │   Future    │  ← 提供Future的所有方法
         └─────────────┘
                ↑
                │
         ┌──────┴──────┐
         │ FutureTask  │  ← 包装Callable/Runnable
         └─────────────┘

FutureTask核心成员

public class FutureTask<V> implements RunnableFuture<V> {
    
    // ===== 任务状态(7种) =====
    private volatile int state;
    private static final int NEW          = 0;  // 新建
    private static final int COMPLETING   = 1;  // 完成中
    private static final int NORMAL       = 2;  // 正常完成
    private static final int EXCEPTIONAL  = 3;  // 异常
    private static final int CANCELLED    = 4;  // 取消
    private static final int INTERRUPTING = 5;  // 中断中
    private static final int INTERRUPTED  = 6;  // 已中断
    
    // ===== 核心成员变量 =====
    private Callable<V> callable;        // 任务对象
    private Object outcome;              // 返回结果或异常
    private volatile Thread runner;      // 执行任务的线程
    private volatile WaitNode waiters;   // get()阻塞线程队列(栈结构)
    
    // ===== 等待节点 =====
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
}

状态转换图

┌─────────────────────────────────────────────────────────────┐
│              FutureTask状态转换详细图                        │
└─────────────────────────────────────────────────────────────┘

                        ┌─────────┐
                        │   NEW   │ ← 初始状态
                        │ (state=0)│
                        └────┬────┘
                             │
                ┌────────────┼────────────┐
                │            │            │
                ▼            ▼            ▼
         ┌──────────┐  ┌──────────┐  ┌──────────┐
         │ 正常执行  │  │ 异常执行  │  │  取消    │
         └──────────┘  └──────────┘  └──────────┘
                │            │            │
                ▼            ▼            ├─────────┐
        ┌────────────┐ ┌────────────┐    │         │
        │ COMPLETING │ │ COMPLETING │    │         │
        │ (state=1)  │ │ (state=1)  │    ▼         ▼
        └─────┬──────┘ └─────┬──────┘ ┌─────┐  ┌───────────┐
              │              │        │CANCE│  │INTERRUPTING│
              ▼              ▼        │LLED │  │(state=3)   │
        ┌──────────┐   ┌──────────┐  │(2)  │  └──────┬────┘
        │  NORMAL  │   │EXCEPTION-│  └─────┘         │
        │(state=2) │   │AL(state=3)│                 ▼
        └──────────┘   └──────────┘         ┌────────────┐
            ✅              ❌                │INTERRUPTED │
         正常完成        异常完成             │ (state=4)  │
                                            └────────────┘
                                                 ❌
                                             取消并中断

【状态码】:
0 = NEW          初始状态
1 = COMPLETING   执行中(过渡态)
2 = NORMAL       正常完成
3 = EXCEPTIONAL  异常完成
4 = CANCELLED    被取消(无中断)
5 = INTERRUPTING 中断中(过渡态)
6 = INTERRUPTED  已中断

构造方法

// 1. 传入Callable
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;  // 初始状态为NEW
}

// 2. 传入Runnable + 返回值
public FutureTask(Runnable runnable, V result) {
    // 使用适配器模式将Runnable转换为Callable
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

// Runnable适配器
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    
    public T call() {
        task.run();  // 执行Runnable的run方法
        return result;  // 返回指定的结果
    }
}

run()方法源码分析

public void run() {
    // 条件1:state != NEW说明任务已被执行或取消
    // 条件2:CAS设置runner为当前线程,失败说明被其他线程抢占
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 【核心】执行call()方法,获取结果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 设置异常
                setException(ex);
            }
            if (ran)
                // 设置结果
                set(result);
        }
    } finally {
        runner = null;  // help GC
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

run()方法详细流程图

┌──────────────────────────────────────────────┐
│          FutureTask#run()                    │
│          任务执行流程                         │
└────────────────────┬─────────────────────────┘
                     │
                     ▼
        ┌─────────────────────────┐
        │ state != NEW?           │ ← 任务已执行/取消?
        └──────┬─────────┬────────┘
              YES       NO
               │         │
               ▼         ▼
          ┌────────┐  ┌──────────────────────┐
          │ return │  │ CAS设置runner        │
          │  任务  │  │ runner: null → 当前线程│
          │  已结束│  └──────┬──────┬────────┘
          └────────┘        失败   成功
                             │     │
                             ▼     ▼
                        ┌────────┐ ┌──────────────┐
                        │ return │ │ double-check │
                        │  被抢占│ │ c != null && │
                        └────────┘ │ state == NEW │
                                   └──────┬───────┘
                                          │
                                          ▼
                                   ┌──────────────┐
                                   │ result =     │
                                   │ c.call()     │ ← 核心执行
                                   └───┬──────┬───┘
                                     成功   异常
                                      │     │
                                      ▼     ▼
                          ┌──────────┐  ┌──────────────┐
                          │ran = true│  │ran = false   │
                          │          │  │result = null │
                          └────┬─────┘  └──────┬───────┘
                               │               │
                               ▼               ▼
                     ┌──────────────┐  ┌──────────────────┐
                     │ set(result)  │  │ setException(ex) │
                     ├──────────────┤  ├──────────────────┤
                     │ 1. CAS:      │  │ 1. CAS:          │
                     │  NEW→        │  │  NEW→            │
                     │  COMPLETING  │  │  COMPLETING      │
                     │ 2. outcome=v │  │ 2. outcome=ex    │
                     │ 3. state=    │  │ 3. state=        │
                     │  NORMAL      │  │  EXCEPTIONAL     │
                     │ 4. finish-   │  │ 4. finish-       │
                     │  Completion()│  │  Completion()    │
                     └──────┬───────┘  └──────┬───────────┘
                            │                 │
                            └────────┬────────┘
                                     ▼
                         ┌───────────────────────┐
                         │ finishCompletion()    │
                         │ 唤醒所有get()阻塞线程  │
                         ├───────────────────────┤
                         │ 1. 遍历waiters链表     │
                         │ 2. LockSupport.unpark│
                         │    每个等待线程        │
                         └───────────────────────┘

【关键点】:
✅ CAS保证只有一个线程执行
✅ COMPLETING是过渡状态
✅ outcome保存结果或异常
✅ finishCompletion唤醒所有等待线程

set()方法:设置正常结果

protected void set(V v) {
    // CAS设置状态:NEW → COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;  // 【保存结果】
        // 设置最终状态:COMPLETING → NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // 唤醒所有等待的线程
        finishCompletion();
    }
}

setException()方法:设置异常结果

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;  // 保存异常
        // 设置最终状态:COMPLETING → EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
        finishCompletion();
    }
}

finishCompletion()方法:唤醒所有get()阻塞的线程

private void finishCompletion() {
    // 遍历waiters链表(栈)
    for (WaitNode q; (q = waiters) != null;) {
        // CAS设置waiters为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 唤醒等待线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;  // help GC
                q = next;
            }
            break;
        }
    }
    done();  // 钩子方法
    callable = null;  // help GC
}

get()方法源码分析

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果任务未完成,阻塞等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 返回结果
    return report(s);
}

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

awaitDone()方法:阻塞等待(核心方法)

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    
    // 【三次自旋】
    for (;;) {
        // 检查中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        
        int s = state;
        // 任务已完成,返回状态
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 任务即将完成,让出CPU
        else if (s == COMPLETING)
            Thread.yield();
        // 【第1次自旋】创建WaitNode
        else if (q == null)
            q = new WaitNode();
        // 【第2次自旋】入队(头插法)
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 【第3次自旋】阻塞
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 【阻塞当前线程】
            LockSupport.park(this);
    }
}

WaitNode队列图示

waiters(栈结构,头插头取):

初始状态:
waiters → null

线程1调用get():
waiters → [thread1] → null

线程2调用get():
waiters → [thread2] → [thread1] → null

线程3调用get():
waiters → [thread3] → [thread2] → [thread1] → null

任务完成后,finishCompletion()依次唤醒:
thread3 → thread2 → thread1

report()方法:返回结果或抛出异常

private V report(int s) throws ExecutionException {
    Object x = outcome;  // 获取结果
    if (s == NORMAL)
        return (V)x;  // 正常返回
    if (s >= CANCELLED)
        throw new CancellationException();  // 任务被取消
    throw new ExecutionException((Throwable)x);  // 抛出异常
}

cancel()方法源码分析

public boolean cancel(boolean mayInterruptIfRunning) {
    // CAS修改状态:NEW → INTERRUPTING/CANCELLED
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    // 中断执行任务的线程
                    t.interrupt();
            } finally {
                // 设置最终状态:INTERRUPTING → INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒所有get()阻塞的线程
        finishCompletion();
    }
    return true;
}

FutureTask完整示例

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // 创建FutureTask
        FutureTask<Integer> task = new FutureTask<>(() -> {
            System.out.println("任务开始执行...");
            Thread.sleep(2000);
            return 100;
        });
        
        // 启动线程执行任务
        new Thread(task, "t1").start();
        
        // 多个线程可以同时get
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + 
                                 " 获取结果: " + task.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();
        
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + 
                                 " 获取结果: " + task.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t3").start();
        
        // main线程获取结果
        System.out.println("main 获取结果: " + task.get());
    }
}

输出

任务开始执行...
main 获取结果: 100
t2 获取结果: 100
t3 获取结果: 100

submit vs execute

方法对比

public interface ExecutorService {
    // 1. execute:只能提交Runnable,无返回值
    void execute(Runnable command);
    
    // 2. submit:可以提交Runnable/Callable,返回Future
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);
}

submit源码分析

// AbstractExecutorService
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 包装成FutureTask,返回值为null
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);  // 调用execute执行
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    // 包装成FutureTask,返回值为指定的result
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 包装成FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

异常处理对比

public class ExecuteVsSubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        
        // ===== execute:直接抛出异常 =====
        pool.execute(() -> {
            System.out.println("execute 执行");
            int i = 1 / 0;  // 异常会被打印到控制台
        });
        
        Thread.sleep(100);
        
        // ===== submit:吞掉异常 =====
        pool.submit(() -> {
            System.out.println("submit 执行");
            int i = 1 / 0;  // 异常被吞掉,不会打印
        });
        
        Thread.sleep(100);
        
        // ===== submit:通过get()获取异常 =====
        Future<?> future = pool.submit(() -> {
            System.out.println("submit 执行(get)");
            int i = 1 / 0;
        });
        
        try {
            future.get();  // 调用get()时才抛出异常
        } catch (ExecutionException e) {
            System.out.println("捕获异常: " + e.getCause());
        }
        
        pool.shutdown();
    }
}

输出

execute 执行
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
submit 执行
submit 执行(get)
捕获异常: java.lang.ArithmeticException: / by zero

对比总结

特性 execute submit
参数 只能Runnable Runnable/Callable
返回值 void Future
异常处理 直接抛出 吞掉异常,需要get()获取
底层 直接执行 包装成FutureTask再execute

CompletableFuture入门

为什么需要CompletableFuture

Future的局限性

Future的缺点:
1. get()阻塞:无法实现真正的异步回调
2. 不能组合:无法链式调用,无法组合多个Future
3. 不能手动完成:无法主动设置结果
4. 异常处理困难:需要try-catch
5. 无法取消:cancel()不够灵活

CompletableFuture的优势

CompletableFuture(JDK 8):
1. ✅ 支持异步回调(thenApply、thenAccept等)
2. ✅ 支持链式调用
3. ✅ 支持组合多个异步任务(thenCompose、thenCombine等)
4. ✅ 支持手动完成(complete)
5. ✅ 更好的异常处理(exceptionally、handle)

CompletableFuture类图

CompletableFuture继承关系:

    ┌──────────┐
    │  Future  │  ← 提供get()、cancel()等
    └──────────┘
         ↑
         │
    ┌────┴────────────┐
    │CompletionStage  │  ← 提供异步编排能力
    └─────────────────┘
         ↑
         │ 同时实现
    ┌────┴──────────────┐
    │CompletableFuture  │  ← 强大的异步编程工具
    └───────────────────┘

创建CompletableFuture

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // ===== 1. 手动创建(需要手动complete) =====
        CompletableFuture<String> future1 = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                future1.complete("手动完成");  // 手动设置结果
            } catch (Exception e) {
                future1.completeExceptionally(e);  // 手动设置异常
            }
        }).start();
        System.out.println(future1.get());  // 手动完成
        
        // ===== 2. runAsync:无返回值 =====
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync执行");
        });
        future2.get();  // null
        
        // ===== 3. supplyAsync:有返回值 =====
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync执行");
            return "返回结果";
        });
        System.out.println(future3.get());  // 返回结果
        
        // ===== 4. 指定线程池 =====
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("使用自定义线程池: " + 
                             Thread.currentThread().getName());
            return "自定义线程池";
        }, pool);
        System.out.println(future4.get());
        pool.shutdown();
    }
}

CompletableFuture核心API

回调方法

thenApply(转换结果)
public class ThenApplyDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("执行任务1");
                return 100;
            })
            .thenApply(result -> {
                System.out.println("执行任务2,上一步结果: " + result);
                return result * 2;
            })
            .thenApply(result -> {
                System.out.println("执行任务3,上一步结果: " + result);
                return "最终结果: " + result;
            });
        
        System.out.println(future.get());
    }
}

输出

执行任务1
执行任务2,上一步结果: 100
执行任务3,上一步结果: 200
最终结果: 200
thenAccept(消费结果)
CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(result -> {
        System.out.println("消费结果: " + result);  // 无返回值
    });
thenRun(不关心结果)
CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> {
        System.out.println("任务完成,不关心结果");  // 无参数,无返回值
    });

组合方法

thenCompose(串行组合)
public class ThenComposeDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("任务1");
                return "Hello";
            })
            .thenCompose(result -> CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2,上一步结果: " + result);
                return result + " World";
            }));
        
        System.out.println(future.get());  // Hello World
    }
}
thenCombine(并行组合)
public class ThenCombineDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1");
            try { Thread.sleep(1000); } catch (Exception e) {}
            return 100;
        });
        
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2");
            try { Thread.sleep(2000); } catch (Exception e) {}
            return 200;
        });
        
        CompletableFuture<Integer> result = future1.thenCombine(future2, (r1, r2) -> {
            System.out.println("合并结果: " + r1 + " + " + r2);
            return r1 + r2;
        });
        
        System.out.println("最终结果: " + result.get());  // 300
    }
}
allOf(等待所有完成)
public class AllOfDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (Exception e) {}
            return "任务1";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(2000); } catch (Exception e) {}
            return "任务2";
        });
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(3000); } catch (Exception e) {}
            return "任务3";
        });
        
        // 等待所有任务完成
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
            future1, future2, future3
        );
        
        allFuture.get();  // 阻塞3秒
        
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
    }
}
anyOf(等待任意完成)
public class AnyOfDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(3000); } catch (Exception e) {}
            return "任务1";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (Exception e) {}
            return "任务2";
        });
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(2000); } catch (Exception e) {}
            return "任务3";
        });
        
        // 等待任意一个完成
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
            future1, future2, future3
        );
        
        System.out.println("最快完成的任务: " + anyFuture.get());  // 任务2
    }
}

异常处理

exceptionally
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("出错了");
        return "正常结果";
    })
    .exceptionally(ex -> {
        System.out.println("异常处理: " + ex.getMessage());
        return "默认值";
    });

System.out.println(future.get());  // 默认值
handle(同时处理结果和异常)
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("出错了");
        return "正常结果";
    })
    .handle((result, ex) -> {
        if (ex != null) {
            System.out.println("异常: " + ex.getMessage());
            return "异常处理后的值";
        }
        return result;
    });

System.out.println(future.get());  // 异常处理后的值

方法后缀

方法命名规则:

1. 无后缀(thenApply):
   └─ 由上一步的线程执行

2. Async后缀(thenApplyAsync):
   └─ 由ForkJoinPool.commonPool()执行

3. Async+Executor后缀(thenApplyAsync(fn, executor)):
   └─ 由自定义线程池执行

实战应用

应用1:商品详情页聚合

public class ProductDetailDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        
        long start = System.currentTimeMillis();
        
        // 并行查询多个接口
        CompletableFuture<String> baseInfoFuture = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "基本信息";
        }, pool);
        
        CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
            sleep(2000);
            return "价格信息";
        }, pool);
        
        CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return "库存信息";
        }, pool);
        
        CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> {
            sleep(1800);
            return "评论信息";
        }, pool);
        
        // 等待所有接口返回
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
            baseInfoFuture, priceFuture, inventoryFuture, reviewFuture
        );
        
        allFuture.get();
        
        // 聚合结果
        String result = String.format("商品详情:%s, %s, %s, %s",
                                    baseInfoFuture.get(),
                                    priceFuture.get(),
                                    inventoryFuture.get(),
                                    reviewFuture.get());
        
        System.out.println(result);
        System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");
        
        pool.shutdown();
    }
    
    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (Exception e) {}
    }
}

输出

商品详情:基本信息, 价格信息, 库存信息, 评论信息
总耗时: 2005ms  ← 并行执行,只需等待最长的2秒

应用2:异步回调链

public class AsyncCallbackDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture
            .supplyAsync(() -> {
                System.out.println("1. 查询用户ID");
                return "user123";
            })
            .thenApplyAsync(userId -> {
                System.out.println("2. 根据用户ID查询用户信息: " + userId);
                sleep(1000);
                return "User(id=" + userId + ", name=张三)";
            })
            .thenApplyAsync(userInfo -> {
                System.out.println("3. 根据用户信息查询订单: " + userInfo);
                sleep(1000);
                return "订单列表[10条]";
            })
            .thenAccept(orders -> {
                System.out.println("4. 显示订单: " + orders);
            })
            .exceptionally(ex -> {
                System.out.println("异常处理: " + ex.getMessage());
                return null;
            })
            .get();  // 阻塞等待完成
    }
    
    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (Exception e) {}
    }
}

应用3:超时控制

public class TimeoutDemo {
    public static void main(String[] args) {
        try {
            String result = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(3000);  // 模拟耗时操作
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "正常结果";
                })
                .completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS)  // JDK 9+
                .get();
            
            System.out.println(result);  // 超时默认值
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

🎯 知识点总结

Future vs CompletableFuture

特性 Future CompletableFuture
阻塞获取 get()阻塞 get()阻塞,但支持异步回调
回调 ✅ thenApply、thenAccept等
组合 ✅ thenCompose、thenCombine等
异常处理 try-catch exceptionally、handle
手动完成 ✅ complete
并行 ✅ allOf、anyOf

CompletableFuture核心方法

创建:
├─ supplyAsync(有返回值)
└─ runAsync(无返回值)

转换:
├─ thenApply(转换结果)
├─ thenAccept(消费结果)
└─ thenRun(不关心结果)

组合:
├─ thenCompose(串行)
├─ thenCombine(并行)
├─ allOf(等待所有)
└─ anyOf(等待任意)

异常:
├─ exceptionally
└─ handle

💡 常见面试题

Q1:FutureTask的run()方法返回值是void,为什么get()能获取到结果?

:因为FutureTask有一个成员变量outcome,run()方法执行完后会把结果保存到outcome中,get()方法直接返回outcome的值。

Q2:多个线程同时调用get()会怎样?

:多个线程调用get()会被封装成WaitNode加入到waiters链表(栈结构)中阻塞等待。任务完成后,finishCompletion()会遍历waiters链表,依次唤醒所有等待的线程。

Q3:submit()和execute()的区别?

  1. execute只能提交Runnable,submit可以提交Runnable/Callable
  2. execute无返回值,submit返回Future
  3. execute直接抛出异常,submit吞掉异常(需要通过get()获取)
  4. submit底层是将任务包装成FutureTask,然后调用execute执行

Q4:CompletableFuture比Future强在哪里?

  1. 支持异步回调(thenApply、thenAccept等),不需要阻塞等待
  2. 支持任务组合(thenCompose、thenCombine)
  3. 支持并行等待(allOf、anyOf)
  4. 更好的异常处理(exceptionally、handle)
  5. 支持手动完成(complete)

Q5:FutureTask的状态有哪些?

:7种状态:

  • NEW(新建)
  • COMPLETING(完成中)
  • NORMAL(正常完成)
  • EXCEPTIONAL(异常)
  • CANCELLED(取消)
  • INTERRUPTING(中断中)
  • INTERRUPTED(已中断)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐