【Netty】四.Netty异步编程模型深度解析
摘要:本文深入解析Netty异步编程模型,对比同步/异步编程优劣,剖析JDK Future的局限性及Netty的改进方案。重点讲解ChannelFuture和Promise的实现原理,包括状态管理、监听器机制和链式调用。通过源码分析DefaultPromise的核心逻辑,演示异步编程实践案例,并指出sync与await的区别及死锁规避方法。最后给出可靠异步客户端的完整实现方案,涵盖连接重试、心跳检
写在前面
在现代高并发系统中,异步编程已经成为标配。Netty作为高性能网络框架,其核心设计理念就是"一切皆异步"——连接建立是异步的、数据读写是异步的、甚至关闭连接也是异步的。
这篇文章将深入剖析Netty的异步编程模型,从JDK Future的局限性说起,到ChannelFuture和Promise的实现原理,再到异步回调的链式编程,全面揭示Netty异步编程的精髓。
一、为什么需要异步编程
1.1 同步编程的困境
传统的同步网络编程模型:
// 同步阻塞模式
public class SyncServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
Socket socket = serverSocket.accept(); // 阻塞等待连接
InputStream in = socket.getInputStream();
byte[] buffer = new byte[1024];
int len = in.read(buffer); // 阻塞等待数据
// 处理请求
byte[] response = process(buffer, len);
OutputStream out = socket.getOutputStream();
out.write(response); // 阻塞发送响应
}
}
}
同步模式的问题:
- 线程阻塞:每个操作都会阻塞当前线程
- 资源浪费:阻塞期间线程无法执行其他任务
- 扩展性差:并发量受限于线程数量
1.2 异步编程的优势
异步模式下,操作发起后立即返回,结果通过回调通知:
// Netty异步模式
public class AsyncClient {
public void connect(String host, int port) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyHandler());
}
});
// 异步连接,立即返回
ChannelFuture future = bootstrap.connect(host, port);
// 添加回调,连接成功后执行
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
System.out.println("连接成功");
// 发送数据
f.channel().writeAndFlush("Hello");
} else {
System.err.println("连接失败: " + f.cause());
}
});
// 主线程可以继续执行其他任务
System.out.println("连接请求已发起");
}
}
异步模式的优势:
- 非阻塞:操作立即返回,线程可以执行其他任务
- 高吞吐:少量线程即可处理大量并发
- 资源高效:线程利用率高,减少上下文切换
二、JDK Future的局限性
2.1 JDK Future接口
JDK 5引入了Future接口,代表一个异步计算的结果:
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 是否已取消
boolean isCancelled();
// 是否已完成
boolean isDone();
// 获取结果(阻塞)
V get() throws InterruptedException, ExecutionException;
// 获取结果(超时阻塞)
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
2.2 使用示例
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任务,立即返回Future
Future<String> future = executor.submit(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Hello, Future!";
});
// 做其他事情
System.out.println("任务已提交");
// 获取结果(阻塞)
String result = future.get(); // 阻塞等待结果
System.out.println(result);
2.3 JDK Future的问题
问题一:获取结果必须阻塞
// 只能通过get()获取结果,而get()是阻塞的
String result = future.get(); // 阻塞当前线程
无法在任务完成后自动收到通知,必须主动轮询或阻塞等待。
问题二:无法设置结果
Future是只读的,只有任务执行者可以设置结果,外部无法手动设置:
// 无法手动设置Future的结果
future.setResult("手动设置"); // 编译错误!没有这个方法
问题三:不支持链式调用
// 想要在第一个任务完成后执行第二个任务,需要手动编排
Future<String> future1 = executor.submit(task1);
String result1 = future1.get(); // 阻塞
Future<String> future2 = executor.submit(() -> task2(result1));
String result2 = future2.get(); // 再次阻塞
问题四:异常处理不便
try {
String result = future.get();
} catch (ExecutionException e) {
// 异常被包装在ExecutionException中
Throwable cause = e.getCause(); // 需要解包获取真实异常
}
2.4 JDK 8 CompletableFuture的改进
JDK 8引入了CompletableFuture,提供了更丰富的异步编程能力:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 链式调用
future.thenApply(s -> s + " World")
.thenAccept(System.out::println); // 输出: Hello World
// 异常处理
future.exceptionally(ex -> "Error: " + ex.getMessage());
但CompletableFuture是为通用异步计算设计的,没有针对网络I/O场景优化。Netty的ChannelFuture和Promise则专门为网络编程量身定制。
三、ChannelFuture源码分析
3.1 ChannelFuture接口
ChannelFuture是Netty对JDK Future的扩展:
public interface ChannelFuture extends Future<Void> {
// 获取关联的Channel
Channel channel();
// 是否成功(非阻塞)
@Override
boolean isSuccess();
// 是否可取消
@Override
boolean isCancellable();
// 获取异常原因
@Override
Throwable cause();
// 添加监听器
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
// 添加多个监听器
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
// 移除监听器
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
// 等待完成(阻塞)
@Override
ChannelFuture await() throws InterruptedException;
// 等待完成(不可中断)
@Override
ChannelFuture awaitUninterruptibly();
// 同步等待(阻塞,可抛出异常)
@Override
ChannelFuture sync() throws InterruptedException;
// 同步等待(不可中断)
@Override
ChannelFuture syncUninterruptibly();
}
3.2 核心方法对比
|
方法 |
JDK Future |
ChannelFuture |
说明 |
|
获取结果 |
get() 阻塞 |
await() 阻塞 |
ChannelFuture不返回结果值 |
|
检查完成 |
isDone() |
isDone() |
相同 |
|
检查成功 |
无 |
isSuccess() |
非阻塞检查 |
|
获取异常 |
无 |
cause() |
非阻塞获取 |
|
回调通知 |
无 |
addListener() |
核心优势 |
|
链式调用 |
无 |
返回this |
支持链式 |
3.3 DefaultChannelPromise源码
DefaultChannelPromise是ChannelFuture的核心实现:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise {
private final Channel channel;
public DefaultChannelPromise(Channel channel) {
this.channel = channel;
}
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
@Override
public Channel channel() {
return channel;
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
}
return e;
}
}
可以看到,ChannelPromise继承自DefaultPromise,核心逻辑在DefaultPromise中。
3.4 DefaultPromise源码分析
DefaultPromise是Netty异步模型的核心:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 状态常量
private static final int SUCCESS = 0;
private static final int FAILURE = 1;
private static final int CANCELLED = 2;
private static final int NOT_SIGNALLED = 3;
// 结果(可能是值或异常)
private volatile Object result;
// 监听器链表
private volatile Object listeners;
// 等待线程计数
private short waiters;
// 执行器(可选)
private final EventExecutor executor;
/**
* 判断是否成功
*/
@Override
public boolean isSuccess() {
Object result = this.result;
return result != null && !(result instanceof CauseHolder) && !(result instanceof Failure);
}
/**
* 获取异常
*/
@Override
public Throwable cause() {
Object result = this.result;
if (result instanceof CauseHolder) {
return ((CauseHolder) result).cause;
}
if (result instanceof Failure) {
return ((Failure) result).cause;
}
return null;
}
/**
* 设置成功结果
*/
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners(); // 通知所有监听器
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters(); // 唤醒等待线程
return true;
}
return false;
}
/**
* 设置失败结果
*/
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(cause));
}
/**
* 添加监听器
*/
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
Object listeners = this.listeners;
if (listeners == null) {
this.listeners = listener;
} else {
// 添加到链表
this.listeners = new DefaultFutureListeners((DefaultFutureListeners) listeners, listener);
}
// 如果已完成,立即通知
if (isDone()) {
notifyListeners();
}
return this;
}
/**
* 通知监听器
*/
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
notifyListenersNow();
} else {
// 在EventLoop中执行通知
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
}
private void notifyListenersNow() {
Object listeners = this.listeners;
while (listeners != null) {
this.listeners = null;
if (listeners instanceof DefaultFutureListeners) {
// 多个监听器
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 单个监听器
notifyListener0(this, (GenericFutureListener) listeners);
}
listeners = this.listeners; // 检查是否有新添加的监听器
}
}
private static void notifyListener0(Future<?> future, GenericFutureListener<?> l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by {}.operationComplete()", l.getClass().getName(), t);
}
}
/**
* 等待完成
*/
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters(); // waiters++
try {
wait(); // 等待通知
} finally {
decWaiters(); // waiters--
}
}
}
return this;
}
/**
* 死锁检测
*/
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(this);
}
}
}
3.5 状态流转图
┌─────────────────────────────────────────────────────────────────────────┐
│ Promise状态流转 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ INITIAL │ │
│ │ (初始状态) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ SUCCESS │ │ FAILURE │ │ CANCELLED │ │
│ │ (成功状态) │ │ (失败状态) │ │ (取消状态) │ │
│ │ │ │ │ │ │ │
│ │ result = value │ │ result = cause │ │ result = null │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ 状态一旦设置,不可更改 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3.6 监听器通知机制
┌─────────────────────────────────────────────────────────────────────────┐
│ 监听器通知流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ addListener(listener) │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 添加到监听器链 │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ 是 ┌─────────────────┐ │
│ │ isDone()? │───────────▶│ 立即通知监听器 │ │
│ └────────┬────────┘ └─────────────────┘ │
│ │ 否 │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 等待完成... │ │
│ └────────┬────────┘ │
│ │ │
│ │ setSuccess() / setFailure() │
│ ▼ │
│ ┌─────────────────┐ │
│ │ notifyListeners│ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 遍历监听器链,依次调用 │ │
│ │ listener1.operationComplete() → listener2... → listenerN│ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
四、Promise:可写的Future
4.1 Promise接口
Promise继承自Future,增加了设置结果的能力:
public interface Promise<V> extends Future<V> {
// 设置成功结果
Promise<V> setSuccess(V result);
// 尝试设置成功结果(不抛异常)
boolean trySuccess(V result);
// 设置失败结果
Promise<V> setFailure(Throwable cause);
// 尝试设置失败结果(不抛异常)
boolean tryFailure(Throwable cause);
// 设置为不可取消
boolean setUncancellable();
}
4.2 Promise vs ChannelFuture
|
特性 |
ChannelFuture |
Promise |
|
可读 |
是 |
是 |
|
可写 |
否 |
是 |
|
创建者 |
Netty框架 |
开发者 |
|
使用场景 |
监听操作结果 |
手动设置结果 |
关系图:
┌─────────────────────────────────────────────────────────────────────────┐
│ Future继承体系 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ │
│ │ Future<V> │ (JDK,只读) │
│ └───────┬───────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ ChannelFuture │ │ Promise<V> │ (可写) │
│ │ (只读) │ │ │ │
│ └───────┬───────┘ └───────┬───────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ChannelPromise │◀──────────────│DefaultPromise │ │
│ │ (读写) │ │ │ │
│ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
4.3 Promise使用场景
场景一:异步操作结果回调
public class AsyncOperation {
/**
* 执行异步操作,返回Promise
*/
public Promise<String> doAsync() {
DefaultPromise<String> promise = new DefaultPromise<>();
// 在另一个线程中执行操作
new Thread(() -> {
try {
String result = heavyOperation();
promise.setSuccess(result); // 设置成功结果
} catch (Exception e) {
promise.setFailure(e); // 设置失败结果
}
}).start();
return promise;
}
/**
* 使用示例
*/
public void example() {
Promise<String> promise = doAsync();
promise.addListener((Future<String> future) -> {
if (future.isSuccess()) {
System.out.println("成功: " + future.getNow());
} else {
System.err.println("失败: " + future.cause());
}
});
}
}
场景二:RPC调用结果
public class RpcClient {
private final Map<Long, Promise<RpcResponse>> pendingRequests =
new ConcurrentHashMap<>();
/**
* 发送RPC请求
*/
public Promise<RpcResponse> sendRequest(RpcRequest request) {
DefaultPromise<RpcResponse> promise = new DefaultPromise<>();
// 保存待处理请求
pendingRequests.put(request.getRequestId(), promise);
// 发送请求
channel.writeAndFlush(request);
// 设置超时
eventLoop.schedule(() -> {
if (!promise.isDone()) {
promise.tryFailure(new TimeoutException("Request timeout"));
pendingRequests.remove(request.getRequestId());
}
}, 5, TimeUnit.SECONDS);
return promise;
}
/**
* 处理响应
*/
public void handleResponse(RpcResponse response) {
Promise<RpcResponse> promise = pendingRequests.remove(response.getRequestId());
if (promise != null) {
promise.setSuccess(response);
}
}
}
场景三:连接状态管理
public class ConnectionManager {
private Channel channel;
private Promise<Void> connectPromise;
/**
* 连接服务器
*/
public Promise<Void> connect(String host, int port) {
if (connectPromise != null && !connectPromise.isDone()) {
return connectPromise; // 连接进行中
}
connectPromise = new DefaultPromise<>(eventLoop);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ConnectionHandler());
}
});
bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
channel = future.channel();
connectPromise.setSuccess(null);
} else {
connectPromise.setFailure(future.cause());
}
});
return connectPromise;
}
/**
* 连接处理器
*/
private class ConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 连接断开,重置连接状态
channel = null;
connectPromise = null;
}
}
}
五、异步回调链式编程
5.1 GenericFutureListener
Netty通过GenericFutureListener接口实现回调:
public interface GenericFutureListener<F extends Future<?>> {
/**
* 操作完成时调用
*/
void operationComplete(F future) throws Exception;
}
使用示例:
ChannelFuture future = channel.writeAndFlush(message);
future.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("发送成功");
} else {
System.err.println("发送失败: " + future.cause());
}
}
});
// Lambda简化写法
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
System.out.println("发送成功");
}
});
5.2 ChannelFutureListener
Netty提供了常用的ChannelFutureListener实现:
// 关闭连接
future.addListener(ChannelFutureListener.CLOSE);
// 关闭连接失败时记录日志
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// 触发异常传播
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
源码:
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// 关闭连接
ChannelFutureListener CLOSE = future -> {
future.channel().close();
};
// 失败时关闭
ChannelFutureListener CLOSE_ON_FAILURE = future -> {
if (!future.isSuccess()) {
future.channel().close();
}
};
// 失败时触发异常
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = future -> {
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
};
}
5.3 链式异步调用
通过addListener可以实现链式异步调用:
/**
* 链式异步调用示例
*/
public class ChainAsyncExample {
public void connectAndSend(String host, int port, Object message) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class);
// 1. 异步连接
bootstrap.connect(host, port).addListener((ChannelFutureListener) connectFuture -> {
if (!connectFuture.isSuccess()) {
System.err.println("连接失败: " + connectFuture.cause());
return;
}
Channel channel = connectFuture.channel();
// 2. 连接成功后发送消息
channel.writeAndFlush(message).addListener((ChannelFutureListener) sendFuture -> {
if (!sendFuture.isSuccess()) {
System.err.println("发送失败: " + sendFuture.cause());
return;
}
System.out.println("消息发送成功");
// 3. 发送成功后关闭连接
channel.close().addListener((ChannelFutureListener) closeFuture -> {
System.out.println("连接已关闭");
});
});
});
}
}
5.4 封装链式调用工具
为了简化链式调用,可以封装工具类:
/**
* 异步操作工具类
*/
public class AsyncUtils {
/**
* 顺序执行多个异步操作
*/
public static Promise<Void> sequence(EventExecutor executor,
AsyncOperation... operations) {
DefaultPromise<Void> result = new DefaultPromise<>(executor);
executeSequence(executor, operations, 0, result);
return result;
}
private static void executeSequence(EventExecutor executor,
AsyncOperation[] operations, int index, Promise<Void> result) {
if (index >= operations.length) {
result.setSuccess(null);
return;
}
Promise<Void> promise = operations[index].execute();
promise.addListener((GenericFutureListener<Future<Void>>) future -> {
if (future.isSuccess()) {
executeSequence(executor, operations, index + 1, result);
} else {
result.setFailure(future.cause());
}
});
}
/**
* 并行执行多个异步操作
*/
public static Promise<Void> parallel(EventExecutor executor,
Promise<?>... promises) {
DefaultPromise<Void> result = new DefaultPromise<>(executor);
AtomicInteger counter = new AtomicInteger(promises.length);
AtomicReference<Throwable> firstError = new AtomicReference<>();
for (Promise<?> promise : promises) {
promise.addListener((GenericFutureListener<Future<?>>) future -> {
if (!future.isSuccess()) {
firstError.compareAndSet(null, future.cause());
}
if (counter.decrementAndGet() == 0) {
Throwable error = firstError.get();
if (error != null) {
result.setFailure(error);
} else {
result.setSuccess(null);
}
}
});
}
return result;
}
/**
* 异步操作接口
*/
@FunctionalInterface
public interface AsyncOperation {
Promise<Void> execute();
}
}
使用示例:
// 顺序执行
AsyncUtils.sequence(eventLoop,
() -> connect(host, port),
() -> login(username, password),
() -> sendMessage(message),
() -> disconnect()
).addListener(future -> {
if (future.isSuccess()) {
System.out.println("所有操作完成");
} else {
System.err.println("操作失败: " + future.cause());
}
});
// 并行执行
AsyncUtils.parallel(eventLoop,
sendRequest1(),
sendRequest2(),
sendRequest3()
).addListener(future -> {
System.out.println("所有请求完成");
});
六、sync vs await的区别
6.1 方法对比
// sync() - 同步等待,失败时抛出异常
ChannelFuture future = channel.writeAndFlush(message);
future.sync(); // 阻塞等待,如果操作失败会抛出异常
// await() - 等待完成,不抛出异常
ChannelFuture future = channel.writeAndFlush(message);
future.await(); // 阻塞等待,需要手动检查结果
if (!future.isSuccess()) {
System.err.println("失败: " + future.cause());
}
6.2 源码分析
// DefaultPromise.sync()
@Override
public Promise<V> sync() throws InterruptedException {
await();
if (!isSuccess()) {
throw new ExecutionException(this, cause()); // 失败时抛异常
}
return this;
}
// DefaultPromise.await()
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
// ... 等待完成,不抛异常
return this;
}
6.3 使用场景
|
场景 |
推荐方法 |
原因 |
|
启动时等待服务就绪 |
sync() |
启动失败应抛异常终止 |
|
关闭时等待资源释放 |
sync() |
关闭失败应抛异常 |
|
业务消息发送 |
await() |
需要自定义错误处理 |
|
RPC调用等待响应 |
await() |
需要区分超时和失败 |
七、死锁问题与规避
7.1 死锁场景
在EventLoop线程中调用sync()或await()会导致死锁:
// 错误示例:在EventLoop中调用sync()
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 在EventLoop线程中调用sync()会抛异常
ChannelFuture future = ctx.writeAndFlush(response);
future.sync(); // BlockingOperationException!
}
}
7.2 Netty的死锁检测
// DefaultPromise.checkDeadLock()
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(this);
}
}
Netty会在await()时检测是否在EventLoop线程中调用,如果是则抛出BlockingOperationException。
7.3 正确做法
// 正确做法:使用addListener
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.writeAndFlush(response);
// 使用回调,不阻塞EventLoop
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
System.out.println("发送成功");
} else {
System.err.println("发送失败: " + f.cause());
}
});
}
}
八、实战:构建可靠的异步客户端
8.1 完整示例
/**
* 可靠的异步客户端
*/
public class ReliableClient {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private volatile Channel channel;
private final Promise<Void> closePromise;
// 重连参数
private final String host;
private final int port;
private final int maxRetries;
private final long retryInterval;
private int retryCount = 0;
public ReliableClient(String host, int port, int maxRetries, long retryInterval) {
this.host = host;
this.port = port;
this.maxRetries = maxRetries;
this.retryInterval = retryInterval;
this.group = new NioEventLoopGroup();
this.closePromise = new DefaultPromise<>(group.next());
this.bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new ClientHandler());
}
});
}
/**
* 连接服务器
*/
public Promise<Void> connect() {
DefaultPromise<Void> connectPromise = new DefaultPromise<>(group.next());
doConnect(connectPromise);
return connectPromise;
}
private void doConnect(Promise<Void> connectPromise) {
bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
channel = future.channel();
retryCount = 0;
System.out.println("连接成功: " + channel.remoteAddress());
connectPromise.setSuccess(null);
} else {
if (retryCount < maxRetries) {
retryCount++;
System.out.println("连接失败,第" + retryCount + "次重试...");
// 延迟重试
group.schedule(() -> doConnect(connectPromise),
retryInterval, TimeUnit.MILLISECONDS);
} else {
connectPromise.setFailure(future.cause());
}
}
});
}
/**
* 发送消息
*/
public Promise<Void> send(Object message) {
DefaultPromise<Void> promise = new DefaultPromise<>(group.next());
if (channel == null || !channel.isActive()) {
promise.setFailure(new IllegalStateException("Channel is not active"));
return promise;
}
channel.writeAndFlush(message).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(future.cause());
}
});
return promise;
}
/**
* 关闭客户端
*/
public Promise<Void> close() {
if (channel != null && channel.isActive()) {
channel.close().addListener((ChannelFutureListener) future -> {
group.shutdownGracefully();
closePromise.setSuccess(null);
});
} else {
group.shutdownGracefully();
closePromise.setSuccess(null);
}
return closePromise;
}
/**
* 客户端处理器
*/
private class ClientHandler extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
// 处理收到的消息
System.out.println("收到消息: " + msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
// 发送心跳
ctx.writeAndFlush(new HeartbeatMessage());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("连接断开,尝试重连...");
channel = null;
// 自动重连
if (retryCount < maxRetries) {
retryCount++;
group.schedule(() -> {
connect().addListener((GenericFutureListener<Future<Void>>) future -> {
if (future.isSuccess()) {
System.out.println("重连成功");
}
});
}, retryInterval, TimeUnit.MILLISECONDS);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("异常: " + cause.getMessage());
ctx.close();
}
}
}
8.2 使用示例
public static void main(String[] args) {
ReliableClient client = new ReliableClient("localhost", 8080, 5, 3000);
// 连接
client.connect().addListener((GenericFutureListener<Future<Void>>) future -> {
if (future.isSuccess()) {
System.out.println("客户端就绪");
// 发送消息
client.send(new Message("Hello")).addListener(f -> {
if (f.isSuccess()) {
System.out.println("消息发送成功");
}
});
} else {
System.err.println("连接失败: " + future.cause());
}
});
// 关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
client.close().awaitUninterruptibly();
}));
}
九、总结
Netty的异步编程模型是其高性能的核心:
- ChannelFuture:只读的异步结果,支持监听器回调
- Promise:可写的异步结果,可以手动设置成功或失败
- 监听器机制:非阻塞获取结果,避免线程阻塞
- 链式调用:通过addListener实现异步操作串联
- 死锁检测:自动检测在EventLoop中的阻塞调用
- sync vs await:sync失败抛异常,await需要手动检查
下一篇,我们将深入剖析ByteBuf的内存管理机制,揭示Netty高性能内存管理的秘密。
更多推荐


所有评论(0)