写在前面

在现代高并发系统中,异步编程已经成为标配。Netty作为高性能网络框架,其核心设计理念就是"一切皆异步"——连接建立是异步的、数据读写是异步的、甚至关闭连接也是异步的。

这篇文章将深入剖析Netty的异步编程模型,从JDK Future的局限性说起,到ChannelFuture和Promise的实现原理,再到异步回调的链式编程,全面揭示Netty异步编程的精髓。

一、为什么需要异步编程

1.1 同步编程的困境

传统的同步网络编程模型:

// 同步阻塞模式
public class SyncServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            Socket socket = serverSocket.accept();  // 阻塞等待连接
            
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[1024];
            int len = in.read(buffer);  // 阻塞等待数据
            
            // 处理请求
            byte[] response = process(buffer, len);
            
            OutputStream out = socket.getOutputStream();
            out.write(response);  // 阻塞发送响应
        }
    }
}

同步模式的问题:

  • 线程阻塞:每个操作都会阻塞当前线程
  • 资源浪费:阻塞期间线程无法执行其他任务
  • 扩展性差:并发量受限于线程数量

1.2 异步编程的优势

异步模式下,操作发起后立即返回,结果通过回调通知:

// Netty异步模式
public class AsyncClient {
    public void connect(String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new MyHandler());
                    }
                });
        
        // 异步连接,立即返回
        ChannelFuture future = bootstrap.connect(host, port);
        
        // 添加回调,连接成功后执行
        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                System.out.println("连接成功");
                // 发送数据
                f.channel().writeAndFlush("Hello");
            } else {
                System.err.println("连接失败: " + f.cause());
            }
        });
        
        // 主线程可以继续执行其他任务
        System.out.println("连接请求已发起");
    }
}

异步模式的优势:

  • 非阻塞:操作立即返回,线程可以执行其他任务
  • 高吞吐:少量线程即可处理大量并发
  • 资源高效:线程利用率高,减少上下文切换

二、JDK Future的局限性

2.1 JDK Future接口

JDK 5引入了Future接口,代表一个异步计算的结果:

public interface Future<V> {
    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 是否已取消
    boolean isCancelled();
    
    // 是否已完成
    boolean isDone();
    
    // 获取结果(阻塞)
    V get() throws InterruptedException, ExecutionException;
    
    // 获取结果(超时阻塞)
    V get(long timeout, TimeUnit unit) 
            throws InterruptedException, ExecutionException, TimeoutException;
}

2.2 使用示例

ExecutorService executor = Executors.newFixedThreadPool(10);

// 提交任务,立即返回Future
Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);  // 模拟耗时操作
    return "Hello, Future!";
});

// 做其他事情
System.out.println("任务已提交");

// 获取结果(阻塞)
String result = future.get();  // 阻塞等待结果
System.out.println(result);

2.3 JDK Future的问题

问题一:获取结果必须阻塞

// 只能通过get()获取结果,而get()是阻塞的
String result = future.get();  // 阻塞当前线程

无法在任务完成后自动收到通知,必须主动轮询或阻塞等待。

问题二:无法设置结果

Future是只读的,只有任务执行者可以设置结果,外部无法手动设置:

// 无法手动设置Future的结果
future.setResult("手动设置");  // 编译错误!没有这个方法

问题三:不支持链式调用

// 想要在第一个任务完成后执行第二个任务,需要手动编排
Future<String> future1 = executor.submit(task1);
String result1 = future1.get();  // 阻塞
Future<String> future2 = executor.submit(() -> task2(result1));
String result2 = future2.get();  // 再次阻塞

问题四:异常处理不便

try {
    String result = future.get();
} catch (ExecutionException e) {
    // 异常被包装在ExecutionException中
    Throwable cause = e.getCause();  // 需要解包获取真实异常
}

2.4 JDK 8 CompletableFuture的改进

JDK 8引入了CompletableFuture,提供了更丰富的异步编程能力:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

// 链式调用
future.thenApply(s -> s + " World")
      .thenAccept(System.out::println);  // 输出: Hello World

// 异常处理
future.exceptionally(ex -> "Error: " + ex.getMessage());

但CompletableFuture是为通用异步计算设计的,没有针对网络I/O场景优化。Netty的ChannelFuture和Promise则专门为网络编程量身定制。

三、ChannelFuture源码分析

3.1 ChannelFuture接口

ChannelFuture是Netty对JDK Future的扩展:

public interface ChannelFuture extends Future<Void> {
    
    // 获取关联的Channel
    Channel channel();
    
    // 是否成功(非阻塞)
    @Override
    boolean isSuccess();
    
    // 是否可取消
    @Override
    boolean isCancellable();
    
    // 获取异常原因
    @Override
    Throwable cause();
    
    // 添加监听器
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    
    // 添加多个监听器
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    
    // 移除监听器
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    
    // 等待完成(阻塞)
    @Override
    ChannelFuture await() throws InterruptedException;
    
    // 等待完成(不可中断)
    @Override
    ChannelFuture awaitUninterruptibly();
    
    // 同步等待(阻塞,可抛出异常)
    @Override
    ChannelFuture sync() throws InterruptedException;
    
    // 同步等待(不可中断)
    @Override
    ChannelFuture syncUninterruptibly();
}

3.2 核心方法对比

方法

JDK Future

ChannelFuture

说明

获取结果

get() 阻塞

await() 阻塞

ChannelFuture不返回结果值

检查完成

isDone()

isDone()

相同

检查成功

isSuccess()

非阻塞检查

获取异常

cause()

非阻塞获取

回调通知

addListener()

核心优势

链式调用

返回this

支持链式

3.3 DefaultChannelPromise源码

DefaultChannelPromise是ChannelFuture的核心实现:

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise {
    
    private final Channel channel;
    
    public DefaultChannelPromise(Channel channel) {
        this.channel = channel;
    }
    
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
    
    @Override
    public Channel channel() {
        return channel;
    }
    
    @Override
    protected EventExecutor executor() {
        EventExecutor e = super.executor();
        if (e == null) {
            return channel().eventLoop();
        }
        return e;
    }
}

可以看到,ChannelPromise继承自DefaultPromise,核心逻辑在DefaultPromise中。

3.4 DefaultPromise源码分析

DefaultPromise是Netty异步模型的核心:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    
    // 状态常量
    private static final int SUCCESS = 0;
    private static final int FAILURE = 1;
    private static final int CANCELLED = 2;
    private static final int NOT_SIGNALLED = 3;
    
    // 结果(可能是值或异常)
    private volatile Object result;
    
    // 监听器链表
    private volatile Object listeners;
    
    // 等待线程计数
    private short waiters;
    
    // 执行器(可选)
    private final EventExecutor executor;
    
    /**
     * 判断是否成功
     */
    @Override
    public boolean isSuccess() {
        Object result = this.result;
        return result != null && !(result instanceof CauseHolder) && !(result instanceof Failure);
    }
    
    /**
     * 获取异常
     */
    @Override
    public Throwable cause() {
        Object result = this.result;
        if (result instanceof CauseHolder) {
            return ((CauseHolder) result).cause;
        }
        if (result instanceof Failure) {
            return ((Failure) result).cause;
        }
        return null;
    }
    
    /**
     * 设置成功结果
     */
    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();  // 通知所有监听器
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }
    
    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();  // 唤醒等待线程
            return true;
        }
        return false;
    }
    
    /**
     * 设置失败结果
     */
    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setFailure0(Throwable cause) {
        return setValue0(new CauseHolder(cause));
    }
    
    /**
     * 添加监听器
     */
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        Object listeners = this.listeners;
        if (listeners == null) {
            this.listeners = listener;
        } else {
            // 添加到链表
            this.listeners = new DefaultFutureListeners((DefaultFutureListeners) listeners, listener);
        }
        
        // 如果已完成,立即通知
        if (isDone()) {
            notifyListeners();
        }
        
        return this;
    }
    
    /**
     * 通知监听器
     */
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            notifyListenersNow();
        } else {
            // 在EventLoop中执行通知
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    notifyListenersNow();
                }
            });
        }
    }
    
    private void notifyListenersNow() {
        Object listeners = this.listeners;
        while (listeners != null) {
            this.listeners = null;
            
            if (listeners instanceof DefaultFutureListeners) {
                // 多个监听器
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                // 单个监听器
                notifyListener0(this, (GenericFutureListener) listeners);
            }
            
            listeners = this.listeners;  // 检查是否有新添加的监听器
        }
    }
    
    private static void notifyListener0(Future<?> future, GenericFutureListener<?> l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by {}.operationComplete()", l.getClass().getName(), t);
        }
    }
    
    /**
     * 等待完成
     */
    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }
        
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        
        // 死锁检测
        checkDeadLock();
        
        synchronized (this) {
            while (!isDone()) {
                incWaiters();  // waiters++
                try {
                    wait();  // 等待通知
                } finally {
                    decWaiters();  // waiters--
                }
            }
        }
        return this;
    }
    
    /**
     * 死锁检测
     */
    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(this);
        }
    }
}

3.5 状态流转图

┌─────────────────────────────────────────────────────────────────────────┐
│                        Promise状态流转                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│                         ┌─────────────────┐                             │
│                         │    INITIAL      │                             │
│                         │   (初始状态)     │                             │
│                         └────────┬────────┘                             │
│                                  │                                      │
│              ┌───────────────────┼───────────────────┐                  │
│              │                   │                   │                  │
│              ▼                   ▼                   ▼                  │
│   ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐        │
│   │    SUCCESS      │  │    FAILURE      │  │   CANCELLED     │        │
│   │   (成功状态)     │  │   (失败状态)     │  │   (取消状态)    │        │
│   │                 │  │                 │  │                 │        │
│   │  result = value │  │  result = cause │  │  result = null  │        │
│   └─────────────────┘  └─────────────────┘  └─────────────────┘        │
│                                                                         │
│   状态一旦设置,不可更改                                                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

3.6 监听器通知机制

┌─────────────────────────────────────────────────────────────────────────┐
│                        监听器通知流程                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   addListener(listener)                                                 │
│          │                                                              │
│          ▼                                                              │
│   ┌─────────────────┐                                                   │
│   │  添加到监听器链  │                                                   │
│   └────────┬────────┘                                                   │
│            │                                                            │
│            ▼                                                            │
│   ┌─────────────────┐     是      ┌─────────────────┐                   │
│   │   isDone()?     │───────────▶│  立即通知监听器  │                   │
│   └────────┬────────┘             └─────────────────┘                   │
│            │ 否                                                          │
│            ▼                                                            │
│   ┌─────────────────┐                                                   │
│   │   等待完成...    │                                                   │
│   └────────┬────────┘                                                   │
│            │                                                            │
│            │ setSuccess() / setFailure()                                │
│            ▼                                                            │
│   ┌─────────────────┐                                                   │
│   │  notifyListeners│                                                   │
│   └────────┬────────┘                                                   │
│            │                                                            │
│            ▼                                                            │
│   ┌─────────────────────────────────────────────────────────┐           │
│   │              遍历监听器链,依次调用                        │           │
│   │  listener1.operationComplete() → listener2... → listenerN│           │
│   └─────────────────────────────────────────────────────────┘           │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

四、Promise:可写的Future

4.1 Promise接口

Promise继承自Future,增加了设置结果的能力:

public interface Promise<V> extends Future<V> {
    
    // 设置成功结果
    Promise<V> setSuccess(V result);
    
    // 尝试设置成功结果(不抛异常)
    boolean trySuccess(V result);
    
    // 设置失败结果
    Promise<V> setFailure(Throwable cause);
    
    // 尝试设置失败结果(不抛异常)
    boolean tryFailure(Throwable cause);
    
    // 设置为不可取消
    boolean setUncancellable();
}

4.2 Promise vs ChannelFuture

特性

ChannelFuture

Promise

可读

可写

创建者

Netty框架

开发者

使用场景

监听操作结果

手动设置结果

关系图

┌─────────────────────────────────────────────────────────────────────────┐
│                        Future继承体系                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│                      ┌───────────────┐                                  │
│                      │  Future<V>    │  (JDK,只读)                      │
│                      └───────┬───────┘                                  │
│                              │                                          │
│              ┌───────────────┴───────────────┐                          │
│              │                               │                          │
│              ▼                               ▼                          │
│      ┌───────────────┐               ┌───────────────┐                  │
│      │ ChannelFuture │               │  Promise<V>   │  (可写)           │
│      │   (只读)       │               │               │                  │
│      └───────┬───────┘               └───────┬───────┘                  │
│              │                               │                          │
│              │                               │                          │
│              ▼                               ▼                          │
│      ┌───────────────┐               ┌───────────────┐                  │
│      │ChannelPromise │◀──────────────│DefaultPromise │                  │
│      │ (读写)         │               │               │                  │
│      └───────────────┘               └───────────────┘                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

4.3 Promise使用场景

场景一:异步操作结果回调

public class AsyncOperation {
    
    /**
     * 执行异步操作,返回Promise
     */
    public Promise<String> doAsync() {
        DefaultPromise<String> promise = new DefaultPromise<>();
        
        // 在另一个线程中执行操作
        new Thread(() -> {
            try {
                String result = heavyOperation();
                promise.setSuccess(result);  // 设置成功结果
            } catch (Exception e) {
                promise.setFailure(e);  // 设置失败结果
            }
        }).start();
        
        return promise;
    }
    
    /**
     * 使用示例
     */
    public void example() {
        Promise<String> promise = doAsync();
        
        promise.addListener((Future<String> future) -> {
            if (future.isSuccess()) {
                System.out.println("成功: " + future.getNow());
            } else {
                System.err.println("失败: " + future.cause());
            }
        });
    }
}

场景二:RPC调用结果

public class RpcClient {
    
    private final Map<Long, Promise<RpcResponse>> pendingRequests = 
            new ConcurrentHashMap<>();
    
    /**
     * 发送RPC请求
     */
    public Promise<RpcResponse> sendRequest(RpcRequest request) {
        DefaultPromise<RpcResponse> promise = new DefaultPromise<>();
        
        // 保存待处理请求
        pendingRequests.put(request.getRequestId(), promise);
        
        // 发送请求
        channel.writeAndFlush(request);
        
        // 设置超时
        eventLoop.schedule(() -> {
            if (!promise.isDone()) {
                promise.tryFailure(new TimeoutException("Request timeout"));
                pendingRequests.remove(request.getRequestId());
            }
        }, 5, TimeUnit.SECONDS);
        
        return promise;
    }
    
    /**
     * 处理响应
     */
    public void handleResponse(RpcResponse response) {
        Promise<RpcResponse> promise = pendingRequests.remove(response.getRequestId());
        if (promise != null) {
            promise.setSuccess(response);
        }
    }
}

场景三:连接状态管理

public class ConnectionManager {
    
    private Channel channel;
    private Promise<Void> connectPromise;
    
    /**
     * 连接服务器
     */
    public Promise<Void> connect(String host, int port) {
        if (connectPromise != null && !connectPromise.isDone()) {
            return connectPromise;  // 连接进行中
        }
        
        connectPromise = new DefaultPromise<>(eventLoop);
        
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoop)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ConnectionHandler());
                    }
                });
        
        bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                channel = future.channel();
                connectPromise.setSuccess(null);
            } else {
                connectPromise.setFailure(future.cause());
            }
        });
        
        return connectPromise;
    }
    
    /**
     * 连接处理器
     */
    private class ConnectionHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            // 连接断开,重置连接状态
            channel = null;
            connectPromise = null;
        }
    }
}

五、异步回调链式编程

5.1 GenericFutureListener

Netty通过GenericFutureListener接口实现回调:

public interface GenericFutureListener<F extends Future<?>> {
    /**
     * 操作完成时调用
     */
    void operationComplete(F future) throws Exception;
}

使用示例

ChannelFuture future = channel.writeAndFlush(message);

future.addListener(new GenericFutureListener<ChannelFuture>() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            System.out.println("发送成功");
        } else {
            System.err.println("发送失败: " + future.cause());
        }
    }
});

// Lambda简化写法
future.addListener((ChannelFutureListener) f -> {
    if (f.isSuccess()) {
        System.out.println("发送成功");
    }
});

5.2 ChannelFutureListener

Netty提供了常用的ChannelFutureListener实现:

// 关闭连接
future.addListener(ChannelFutureListener.CLOSE);

// 关闭连接失败时记录日志
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

// 触发异常传播
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

源码

public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
    
    // 关闭连接
    ChannelFutureListener CLOSE = future -> {
        future.channel().close();
    };
    
    // 失败时关闭
    ChannelFutureListener CLOSE_ON_FAILURE = future -> {
        if (!future.isSuccess()) {
            future.channel().close();
        }
    };
    
    // 失败时触发异常
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = future -> {
        if (!future.isSuccess()) {
            future.channel().pipeline().fireExceptionCaught(future.cause());
        }
    };
}

5.3 链式异步调用

通过addListener可以实现链式异步调用:

/**
 * 链式异步调用示例
 */
public class ChainAsyncExample {
    
    public void connectAndSend(String host, int port, Object message) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class);
        
        // 1. 异步连接
        bootstrap.connect(host, port).addListener((ChannelFutureListener) connectFuture -> {
            if (!connectFuture.isSuccess()) {
                System.err.println("连接失败: " + connectFuture.cause());
                return;
            }
            
            Channel channel = connectFuture.channel();
            
            // 2. 连接成功后发送消息
            channel.writeAndFlush(message).addListener((ChannelFutureListener) sendFuture -> {
                if (!sendFuture.isSuccess()) {
                    System.err.println("发送失败: " + sendFuture.cause());
                    return;
                }
                
                System.out.println("消息发送成功");
                
                // 3. 发送成功后关闭连接
                channel.close().addListener((ChannelFutureListener) closeFuture -> {
                    System.out.println("连接已关闭");
                });
            });
        });
    }
}

5.4 封装链式调用工具

为了简化链式调用,可以封装工具类:

/**
 * 异步操作工具类
 */
public class AsyncUtils {
    
    /**
     * 顺序执行多个异步操作
     */
    public static Promise<Void> sequence(EventExecutor executor, 
            AsyncOperation... operations) {
        DefaultPromise<Void> result = new DefaultPromise<>(executor);
        
        executeSequence(executor, operations, 0, result);
        
        return result;
    }
    
    private static void executeSequence(EventExecutor executor, 
            AsyncOperation[] operations, int index, Promise<Void> result) {
        if (index >= operations.length) {
            result.setSuccess(null);
            return;
        }
        
        Promise<Void> promise = operations[index].execute();
        promise.addListener((GenericFutureListener<Future<Void>>) future -> {
            if (future.isSuccess()) {
                executeSequence(executor, operations, index + 1, result);
            } else {
                result.setFailure(future.cause());
            }
        });
    }
    
    /**
     * 并行执行多个异步操作
     */
    public static Promise<Void> parallel(EventExecutor executor, 
            Promise<?>... promises) {
        DefaultPromise<Void> result = new DefaultPromise<>(executor);
        
        AtomicInteger counter = new AtomicInteger(promises.length);
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        
        for (Promise<?> promise : promises) {
            promise.addListener((GenericFutureListener<Future<?>>) future -> {
                if (!future.isSuccess()) {
                    firstError.compareAndSet(null, future.cause());
                }
                
                if (counter.decrementAndGet() == 0) {
                    Throwable error = firstError.get();
                    if (error != null) {
                        result.setFailure(error);
                    } else {
                        result.setSuccess(null);
                    }
                }
            });
        }
        
        return result;
    }
    
    /**
     * 异步操作接口
     */
    @FunctionalInterface
    public interface AsyncOperation {
        Promise<Void> execute();
    }
}

使用示例

// 顺序执行
AsyncUtils.sequence(eventLoop,
    () -> connect(host, port),
    () -> login(username, password),
    () -> sendMessage(message),
    () -> disconnect()
).addListener(future -> {
    if (future.isSuccess()) {
        System.out.println("所有操作完成");
    } else {
        System.err.println("操作失败: " + future.cause());
    }
});

// 并行执行
AsyncUtils.parallel(eventLoop,
    sendRequest1(),
    sendRequest2(),
    sendRequest3()
).addListener(future -> {
    System.out.println("所有请求完成");
});

六、sync vs await的区别

6.1 方法对比

// sync() - 同步等待,失败时抛出异常
ChannelFuture future = channel.writeAndFlush(message);
future.sync();  // 阻塞等待,如果操作失败会抛出异常

// await() - 等待完成,不抛出异常
ChannelFuture future = channel.writeAndFlush(message);
future.await();  // 阻塞等待,需要手动检查结果
if (!future.isSuccess()) {
    System.err.println("失败: " + future.cause());
}

6.2 源码分析

// DefaultPromise.sync()
@Override
public Promise<V> sync() throws InterruptedException {
    await();
    if (!isSuccess()) {
        throw new ExecutionException(this, cause());  // 失败时抛异常
    }
    return this;
}

// DefaultPromise.await()
@Override
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }
    // ... 等待完成,不抛异常
    return this;
}

6.3 使用场景

场景

推荐方法

原因

启动时等待服务就绪

sync()

启动失败应抛异常终止

关闭时等待资源释放

sync()

关闭失败应抛异常

业务消息发送

await()

需要自定义错误处理

RPC调用等待响应

await()

需要区分超时和失败

七、死锁问题与规避

7.1 死锁场景

在EventLoop线程中调用sync()或await()会导致死锁:

// 错误示例:在EventLoop中调用sync()
public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 在EventLoop线程中调用sync()会抛异常
        ChannelFuture future = ctx.writeAndFlush(response);
        future.sync();  // BlockingOperationException!
    }
}

7.2 Netty的死锁检测

// DefaultPromise.checkDeadLock()
protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(this);
    }
}

Netty会在await()时检测是否在EventLoop线程中调用,如果是则抛出BlockingOperationException。

7.3 正确做法

// 正确做法:使用addListener
public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.writeAndFlush(response);
        
        // 使用回调,不阻塞EventLoop
        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                System.out.println("发送成功");
            } else {
                System.err.println("发送失败: " + f.cause());
            }
        });
    }
}

八、实战:构建可靠的异步客户端

8.1 完整示例

/**
 * 可靠的异步客户端
 */
public class ReliableClient {
    
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private volatile Channel channel;
    private final Promise<Void> closePromise;
    
    // 重连参数
    private final String host;
    private final int port;
    private final int maxRetries;
    private final long retryInterval;
    private int retryCount = 0;
    
    public ReliableClient(String host, int port, int maxRetries, long retryInterval) {
        this.host = host;
        this.port = port;
        this.maxRetries = maxRetries;
        this.retryInterval = retryInterval;
        
        this.group = new NioEventLoopGroup();
        this.closePromise = new DefaultPromise<>(group.next());
        
        this.bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));
                        pipeline.addLast(new MessageDecoder());
                        pipeline.addLast(new MessageEncoder());
                        pipeline.addLast(new ClientHandler());
                    }
                });
    }
    
    /**
     * 连接服务器
     */
    public Promise<Void> connect() {
        DefaultPromise<Void> connectPromise = new DefaultPromise<>(group.next());
        
        doConnect(connectPromise);
        
        return connectPromise;
    }
    
    private void doConnect(Promise<Void> connectPromise) {
        bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                channel = future.channel();
                retryCount = 0;
                System.out.println("连接成功: " + channel.remoteAddress());
                connectPromise.setSuccess(null);
            } else {
                if (retryCount < maxRetries) {
                    retryCount++;
                    System.out.println("连接失败,第" + retryCount + "次重试...");
                    
                    // 延迟重试
                    group.schedule(() -> doConnect(connectPromise), 
                            retryInterval, TimeUnit.MILLISECONDS);
                } else {
                    connectPromise.setFailure(future.cause());
                }
            }
        });
    }
    
    /**
     * 发送消息
     */
    public Promise<Void> send(Object message) {
        DefaultPromise<Void> promise = new DefaultPromise<>(group.next());
        
        if (channel == null || !channel.isActive()) {
            promise.setFailure(new IllegalStateException("Channel is not active"));
            return promise;
        }
        
        channel.writeAndFlush(message).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                promise.setSuccess(null);
            } else {
                promise.setFailure(future.cause());
            }
        });
        
        return promise;
    }
    
    /**
     * 关闭客户端
     */
    public Promise<Void> close() {
        if (channel != null && channel.isActive()) {
            channel.close().addListener((ChannelFutureListener) future -> {
                group.shutdownGracefully();
                closePromise.setSuccess(null);
            });
        } else {
            group.shutdownGracefully();
            closePromise.setSuccess(null);
        }
        
        return closePromise;
    }
    
    /**
     * 客户端处理器
     */
    private class ClientHandler extends SimpleChannelInboundHandler<Message> {
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
            // 处理收到的消息
            System.out.println("收到消息: " + msg);
        }
        
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                // 发送心跳
                ctx.writeAndFlush(new HeartbeatMessage());
            }
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("连接断开,尝试重连...");
            channel = null;
            
            // 自动重连
            if (retryCount < maxRetries) {
                retryCount++;
                group.schedule(() -> {
                    connect().addListener((GenericFutureListener<Future<Void>>) future -> {
                        if (future.isSuccess()) {
                            System.out.println("重连成功");
                        }
                    });
                }, retryInterval, TimeUnit.MILLISECONDS);
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("异常: " + cause.getMessage());
            ctx.close();
        }
    }
}

8.2 使用示例

public static void main(String[] args) {
    ReliableClient client = new ReliableClient("localhost", 8080, 5, 3000);
    
    // 连接
    client.connect().addListener((GenericFutureListener<Future<Void>>) future -> {
        if (future.isSuccess()) {
            System.out.println("客户端就绪");
            
            // 发送消息
            client.send(new Message("Hello")).addListener(f -> {
                if (f.isSuccess()) {
                    System.out.println("消息发送成功");
                }
            });
        } else {
            System.err.println("连接失败: " + future.cause());
        }
    });
    
    // 关闭钩子
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        client.close().awaitUninterruptibly();
    }));
}

九、总结

Netty的异步编程模型是其高性能的核心:

  1. ChannelFuture:只读的异步结果,支持监听器回调
  2. Promise:可写的异步结果,可以手动设置成功或失败
  3. 监听器机制:非阻塞获取结果,避免线程阻塞
  4. 链式调用:通过addListener实现异步操作串联
  5. 死锁检测:自动检测在EventLoop中的阻塞调用
  6. sync vs await:sync失败抛异常,await需要手动检查

下一篇,我们将深入剖析ByteBuf的内存管理机制,揭示Netty高性能内存管理的秘密。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐