【Netty】三.ChannelPipeline与ChannelHandler责任链深度解析
本文深入解析了Netty中ChannelPipeline和ChannelHandler的核心机制。Pipeline采用双向链表结构实现责任链模式,包含固定的Head和Tail节点,分别处理I/O操作和资源释放。Handler分为入站(Inbound)和出站(Outbound)两种类型,通过ChannelHandlerContext与Pipeline交互。文章详细阐述了事件传播流程、动态修改Pipe
写在前面
前两篇文章我们分析了Netty的线程模型和核心组件Channel、EventLoop。但有了Channel和EventLoop还不够——网络数据是"生"的,需要经过解析、处理、转换才能被业务使用。这个数据处理的过程,就是由ChannelPipeline和ChannelHandler完成的。
Pipeline是Netty责任链模式的核心实现,Handler是链上的处理节点。理解Pipeline和Handler的工作原理,是掌握Netty编程的关键。
一、责任链模式回顾
1.1 什么是责任链模式
责任链模式是一种行为设计模式,允许将请求沿着处理链传递,直到有一个处理者能够处理它。
经典场景:
- Web框架的过滤器链
- 日志框架的日志级别过滤
- 审批流程的多级审批
传统实现:
// 传统责任链实现
abstract class Handler {
protected Handler next;
public Handler setNext(Handler next) {
this.next = next;
return next;
}
public abstract void handle(Request request);
}
class HandlerA extends Handler {
@Override
public void handle(Request request) {
// 处理逻辑
if (next != null) {
next.handle(request);
}
}
}
// 使用
Handler chain = new HandlerA();
chain.setNext(new HandlerB()).setNext(new HandlerC());
chain.handle(request);
1.2 Netty责任链的特点
Netty的责任链实现更加复杂和强大:
|
特性 |
传统责任链 |
Netty Pipeline |
|
方向 |
单向 |
双向(入站/出站) |
|
节点类型 |
单一类型 |
两种类型 |
|
上下文 |
无 |
ChannelHandlerContext |
|
动态修改 |
困难 |
支持运行时增删Handler |
|
事件传播 |
手动调用next |
提供便捷方法 |
二、ChannelPipeline架构
2.1 Pipeline的结构
ChannelPipeline是一个双向链表,节点是ChannelHandlerContext:
┌─────────────────────────────────────────────────────────────────────────┐
│ ChannelPipeline │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Head ──▶ Context1 ──▶ Context2 ──▶ Context3 ──▶ ... ──▶ Tail │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ │
│ │ Handler1 Handler2 Handler3 (end) │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ HeadHandler │
│ │
│ 入站事件流向(从左到右):Head → Handler1 → Handler2 → Handler3 → Tail │
│ 出站事件流向(从右到左):Tail → Handler3 → Handler2 → Handler1 → Head│
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 DefaultChannelPipeline源码
public class DefaultChannelPipeline implements ChannelPipeline {
// 头节点(固定)
final AbstractChannelHandlerContext head;
// 尾节点(固定)
final AbstractChannelHandlerContext tail;
// 所属Channel
protected final Channel channel;
// 初始化
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// 创建头尾节点
tail = new TailContext(this);
head = new HeadContext(this);
// 建立双向链表
head.next = tail;
tail.prev = head;
}
// 添加Handler到链尾
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group,
String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 创建Context
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到链表
addLast0(newCtx);
// 如果已注册,触发handlerAdded事件
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
callHandlerAdded0(newCtx);
}
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
}
2.3 HeadContext与TailContext
Pipeline有两个特殊的节点:Head和Tail。
HeadContext:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
}
// 入站事件处理
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // 继续传播
}
// 出站事件处理(最终执行I/O操作)
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise); // 调用Unsafe进行实际写操作
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush(); // 调用Unsafe进行实际刷新
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
unsafe.close(promise); // 调用Unsafe关闭连接
}
}
HeadContext的特点:
- 既是入站Handler,也是出站Handler
- 入站事件从这里开始传播
- 出站事件在这里结束,调用Unsafe执行实际I/O操作
TailContext:
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
}
// 入站事件的终点
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 如果消息到达Tail还没被处理,释放资源
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 异常到达Tail,记录日志
onUnhandledInboundException(cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
// 用户事件到达Tail,释放资源
ReferenceCountUtil.release(evt);
}
}
TailContext的特点:
- 只是入站Handler
- 入站事件在这里结束
- 负责资源释放和异常兜底
2.4 完整的事件传播图
┌─────────────────────────────────────────────────────────────────────────────┐
│ 事件传播完整流程 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 【入站事件】(从左到右) │
│ │
│ Channel.read() │
│ │ │
│ ▼ │
│ ┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ Head │───▶│ Decoder │───▶│ Handler │───▶│ Encoder │───▶│ Tail │ │
│ │ │ │ (Inbound)│ │(Inbound) │ │(Inbound) │ │ │ │
│ └────────┘ └──────────┘ └──────────┘ └──────────┘ └────────┘ │
│ │ │ │
│ │ ▼ │
│ │ release(msg) │
│ │ │
│ │
│ 【出站事件】(从右到左) │
│ │
│ ctx.writeAndFlush(msg) │
│ │ │
│ ▼ │
│ ┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ Head │◀───│ Encoder │◀───│ Handler │◀───│ Decoder │◀───│ Tail │ │
│ │ │ │(Outbound)│ │(Outbound)│ │(Outbound)│ │ │ │
│ └────────┘ └──────────┘ └──────────┘ └──────────┘ └────────┘ │
│ │ │
│ ▼ │
│ unsafe.write(msg) │
│ │ │
│ ▼ │
│ 网络发送 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
三、ChannelHandler详解
3.1 Handler的两种类型
Netty将Handler分为两类:入站Handler和出站Handler。
入站Handler:处理入站数据,如读取数据、连接建立、连接关闭等事件。
public interface ChannelInboundHandler extends ChannelHandler {
// Channel注册到EventLoop
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
// Channel从EventLoop注销
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
// Channel活跃(连接建立)
void channelActive(ChannelHandlerContext ctx) throws Exception;
// Channel非活跃(连接断开)
void channelInactive(ChannelHandlerContext ctx) throws Exception;
// 读取数据
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
// 读取完成
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
// 用户事件触发
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
// Channel可写状态变化
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
// 异常捕获
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
出站Handler:处理出站数据,如写数据、关闭连接、绑定端口等操作。
public interface ChannelOutboundHandler extends ChannelHandler {
// 绑定地址
void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception;
// 连接远程地址
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
// 断开连接
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 关闭Channel
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 从EventLoop注销
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 读取数据(触发读操作)
void read(ChannelHandlerContext ctx) throws Exception;
// 写数据
void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception;
// 刷新数据
void flush(ChannelHandlerContext ctx) throws Exception;
}
3.2 适配器模式
直接实现接口需要实现所有方法,很繁琐。Netty提供了适配器类:
// 入站适配器
public class ChannelInboundHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered(); // 默认传播事件
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg); // 默认传播事件
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause); // 默认传播异常
}
// 其他方法默认也是传播事件...
}
// 出站适配器
public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
ctx.write(msg, promise); // 默认传播事件
}
// 其他方法默认也是传播事件...
}
使用示例:
// 只需要重写关心的方法
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 只处理读事件
System.out.println("Received: " + msg);
ctx.fireChannelRead(msg); // 继续传播
}
}
3.3 SimpleChannelInboundHandler
对于消息处理,Netty提供了更便捷的SimpleChannelInboundHandler:
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType,
boolean autoRelease) {
this.matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
// 类型匹配
if (matcher.match(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg); // 调用子类实现
} else {
release = false;
ctx.fireChannelRead(msg); // 不匹配则传播
}
} finally {
// 自动释放消息
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
// 子类实现
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
使用示例:
public class StringHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 只处理String类型消息
// 消息会自动释放,无需手动调用ReferenceCountUtil.release()
System.out.println("Received string: " + msg);
}
}
public class MessageHandler extends SimpleChannelInboundHandler<MessageInfo> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageInfo msg) {
// 只处理MessageInfo类型消息
processMessage(msg);
}
}
四、ChannelHandlerContext详解
4.1 Context的作用
ChannelHandlerContext是Handler与Pipeline交互的桥梁:
┌─────────────────────────────────────────────────────────────────────┐
│ ChannelHandlerContext │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 核心引用 │ │
│ │ - pipeline: 所属的Pipeline │ │
│ │ - handler: 包装的Handler │ │
│ │ - prev: 上一个Context │ │
│ │ - next: 下一个Context │ │
│ │ - executor: 专属的EventExecutor(可选) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 核心能力 │ │
│ │ - 事件传播:fireChannelRead()、fireChannelActive()... │ │
│ │ - 出站操作:write()、flush()、close()... │ │
│ │ - 属性存储:attr()、hasAttr() │ │
│ │ - 线程判断:executor().inEventLoop() │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
4.2 AbstractChannelHandlerContext源码
abstract class AbstractChannelHandlerContext
implements ChannelHandlerContext, ResourceLeakHint {
// 双向链表指针
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
// 所属Pipeline
private final DefaultChannelPipeline pipeline;
// Handler名称
private final String name;
// 是否为入站/出站Handler
private final boolean inbound;
private final boolean outbound;
// 有序执行器(可选,用于指定Handler的执行线程)
private final EventExecutor executor;
// 初始化
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.pipeline = pipeline;
this.name = ObjectUtil.checkNotNull(name, "name");
this.executor = executor;
// 根据Handler类型设置标记
this.inbound = ChannelHandlerMask.isInbound(handlerClass);
this.outbound = ChannelHandlerMask.isOutbound(handlerClass);
}
// 获取Channel
@Override
public Channel channel() {
return pipeline.channel();
}
// 获取Pipeline
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
// 获取EventLoop
@Override
public EventLoop eventLoop() {
if (executor == null) {
return channel().eventLoop();
} else {
return (EventLoop) executor;
}
}
}
4.3 事件传播方法
Context提供了丰富的事件传播方法:
// 入站事件传播
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
// 出站事件传播
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
write(msg, false, promise);
return promise;
}
private void invokeChannelRead(AbstractChannelHandlerContext next, Object msg) {
final Object m = pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
4.4 findContextInbound/Outbound
查找下一个能处理该事件的Context:
// 查找下一个入站Context
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor;
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
// 查找下一个出站Context
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor;
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
// 判断是否跳过该Context
private static boolean skipContext(AbstractChannelHandlerContext ctx,
EventExecutor currentExecutor, int mask, int onlyMask) {
// 检查是否为入站/出站Handler
if ((ctx.executionMask & onlyMask) == 0) {
return true;
}
// 检查是否支持该事件
if ((ctx.executionMask & mask) == 0) {
return true;
}
// 检查Executor是否相同
if (ctx.executor == currentExecutor) {
return false;
}
return false;
}
查找逻辑图:
入站事件查找(向右找):
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐
│ Head │───▶│ Context1 │───▶│ Context2 │───▶│ Context3 │───▶│ Tail │
│(Inbound)│ │(Outbound)│ │(Inbound) │ │(Outbound)│ │(Inbound)│
└────────┘ └──────────┘ └──────────┘ └──────────┘ └────────┘
│ │ │ │ │
│ 跳过 找到 跳过 │
│ │ │ │ │
└──────────────┴───────────────┴───────────────┴──────────────┘
入站事件传播路径
出站事件查找(向左找):
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐
│ Head │◀───│ Context1 │◀───│ Context2 │◀───│ Context3 │◀───│ Tail │
│(Outbound)│ │(Outbound)│ │(Inbound) │ │(Outbound)│ │ │
└────────┘ └──────────┘ └──────────┘ └──────────┘ └────────┘
│ │ │ │ │
找到 找到 跳过 找到 │
│ │ │ │ │
└──────────────┴───────────────┴───────────────┴──────────────┘
出站事件传播路径
五、Pipeline动态修改
5.1 运行时添加Handler
// 在Pipeline末尾添加
pipeline.addLast("handler", new MyHandler());
// 在指定位置添加
pipeline.addBefore("handler", "newHandler", new NewHandler());
pipeline.addAfter("handler", "newHandler", new NewHandler());
// 在Pipeline头部添加
pipeline.addFirst("firstHandler", new FirstHandler());
5.2 运行时移除Handler
// 根据名称移除
pipeline.remove("handler");
// 根据Handler实例移除
pipeline.remove(myHandler);
// 移除所有Handler
pipeline.removeFirst();
pipeline.removeLast();
5.3 动态修改的应用场景
场景一:协议升级
public class HttpUpgradeHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
if ("websocket".equalsIgnoreCase(request.headers().get("Upgrade"))) {
// 移除HTTP处理器
pipeline.remove("httpHandler");
// 添加WebSocket处理器
pipeline.addLast("websocketHandler", new WebSocketHandler());
// 握手响应
handleWebSocketHandshake(ctx, request);
} else {
// 正常HTTP处理
ctx.fireChannelRead(request);
}
}
}
场景二:SSL动态开启
public class SslDetectionHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 检测是否为SSL/TLS握手
if (in.readableBytes() >= 5) {
int firstByte = in.getByte(in.readerIndex()) & 0xFF;
if (firstByte == 0x16) { // SSL/TLS握手
// 添加SSL处理器
SslHandler sslHandler = sslContext.newHandler(ctx.alloc());
pipeline.addBefore("decoder", "ssl", sslHandler);
}
// 移除自己
pipeline.remove(this);
}
}
}
场景三:登录后添加业务Handler
public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequest request) {
if (authenticate(request)) {
// 登录成功,添加业务Handler
pipeline.addLast("messageHandler", new MessageHandler());
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
// 移除登录Handler
pipeline.remove(this);
// 响应登录成功
ctx.writeAndFlush(new LoginResponse(true));
} else {
ctx.writeAndFlush(new LoginResponse(false));
}
}
}
六、编解码器位置与设计
6.1 编解码器的正确位置
编解码器是特殊的Handler,位置至关重要:
正确的Pipeline配置:
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐
│ Head │───▶│ Decoder │───▶│ Business │───▶│ Encoder │───▶│ Tail │
│ │ │ (Inbound) │ │ Handler │ │(Outbound)│ │ │
└────────┘ └──────────┘ └──────────┘ └──────────┘ └────────┘
入站数据流:网络字节流 → Decoder解码 → 业务对象 → Handler处理
出站数据流:业务对象 → Encoder编码 → 网络字节流 → 网络发送
6.2 MessageToMessageCodec
Netty提供了便捷的编解码基类:
/**
* 消息到消息的编解码器
* @param <INBOUND> 入站消息类型
* @param <OUTBOUND> 出站消息类型
*/
public abstract class MessageToMessageCodec<INBOUND, OUTBOUND>
extends ChannelDuplexHandler {
private final MessageToMessageEncoder<OUTBOUND> encoder = new Encoder();
private final MessageToMessageDecoder<INBOUND> decoder = new Decoder();
// 解码方法(子类实现)
protected abstract void decode(ChannelHandlerContext ctx, INBOUND msg,
List<Object> out) throws Exception;
// 编码方法(子类实现)
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND msg,
List<Object> out) throws Exception;
}
使用示例:
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
private static final int HEADER_LENGTH = 8;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
// 读取消息头
long length = msg.readLong();
int type = msg.readInt();
// 读取消息体
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
// 解析为消息对象
Message message = parseMessage(type, bytes);
out.add(message);
}
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) {
ByteBuf buffer = ctx.alloc().buffer();
// 写入消息头
byte[] body = serializeMessage(msg);
buffer.writeLong(HEADER_LENGTH + body.length);
buffer.writeInt(msg.getType());
// 写入消息体
buffer.writeBytes(body);
out.add(buffer);
}
}
6.3 ByteToMessageDecoder
处理粘包拆包的利器:
/**
* 字节到消息的解码器
* 自动处理粘包拆包问题
*/
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 累积缓冲区
ByteBuf cumulation;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 累积数据
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 调用解码方法
callDecode(ctx, cumulation, out);
} finally {
// ...
}
} else {
ctx.fireChannelRead(msg);
}
}
// 解码方法(子类实现)
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception;
}
自定义协议解码器:
public class CustomProtocolDecoder extends ByteToMessageDecoder {
private static final int MIN_FRAME_LENGTH = 12; // 最小帧长度
private static final int MAX_FRAME_LENGTH = 65535; // 最大帧长度
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 检查是否有足够的数据读取帧头
if (in.readableBytes() < MIN_FRAME_LENGTH) {
return; // 等待更多数据
}
// 标记读取位置
in.markReaderIndex();
// 读取帧长度
int frameLength = in.readInt();
// 校验帧长度
if (frameLength < MIN_FRAME_LENGTH || frameLength > MAX_FRAME_LENGTH) {
in.skipBytes(in.readableBytes()); // 丢弃非法数据
throw new CorruptedFrameException("Invalid frame length: " + frameLength);
}
// 检查是否有完整的帧
if (in.readableBytes() < frameLength - 4) {
in.resetReaderIndex(); // 重置读取位置,等待更多数据
return;
}
// 读取帧内容
int type = in.readInt();
byte[] body = new byte[frameLength - 8];
in.readBytes(body);
// 创建消息对象
Message message = new Message(type, body);
out.add(message);
}
}
七、异常处理
7.1 异常传播机制
异常会沿着Pipeline传播,直到被处理:
// 异常传播
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause); // 传播到下一个Handler
}
// 如果没有Handler处理,最终到达TailContext
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录警告日志
logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
}
}
7.2 统一异常处理
建议在Pipeline末尾添加统一的异常处理器:
public class ExceptionHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 根据异常类型处理
if (cause instanceof IOException) {
// I/O异常,关闭连接
logger.warn("I/O exception: {}", cause.getMessage());
ctx.close();
} else if (cause instanceof CorruptedFrameException) {
// 协议异常,记录并关闭
logger.error("Corrupted frame: {}", cause.getMessage());
ctx.close();
} else if (cause instanceof TimeoutException) {
// 超时异常
logger.warn("Timeout: {}", cause.getMessage());
ctx.writeAndFlush(new ErrorMessage("Request timeout"));
} else {
// 未知异常
logger.error("Unexpected exception", cause);
ctx.writeAndFlush(new ErrorMessage("Internal server error"));
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 监听写操作异常
promise.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
exceptionCaught(ctx, future.cause());
}
});
ctx.write(msg, promise);
}
}
// Pipeline配置
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new BusinessHandler());
pipeline.addLast("exceptionHandler", new ExceptionHandler()); // 放在最后
八、实战:构建完整的Pipeline
8.1 IM系统Pipeline配置
/**
* 即时通讯系统Pipeline配置
*/
public class IMChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 1. 空闲检测(60秒读超时)
pipeline.addLast("idle", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
// 2. 消息解码器
pipeline.addLast("decoder", new MessageDecoder());
// 3. 消息编码器
pipeline.addLast("encoder", new MessageEncoder());
// 4. 心跳处理器
pipeline.addLast("heartbeat", new HeartbeatHandler());
// 5. 登录处理器
pipeline.addLast("login", new LoginHandler());
// 6. 消息处理器
pipeline.addLast("message", new MessageHandler());
// 7. 异常处理器
pipeline.addLast("exception", new ExceptionHandler());
}
}
8.2 消息处理流程
客户端发送消息:
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Idle │───▶│ Decoder │───▶│ Heartbeat│───▶│ Login │───▶│ Message │
│ │ │ │ │ │ │ │ │ │
└────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │ │
│ 解码消息 心跳检测? 登录验证? 处理消息 │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
空闲检测? MessageInfo 心跳响应? 登录响应? 业务处理
│
▼
超时关闭
九、总结
ChannelPipeline和ChannelHandler是Netty责任链模式的核心实现:
- Pipeline是双向链表:Head和Tail是固定节点,中间是用户添加的Handler
- Handler分两类:Inbound处理入站事件,Outbound处理出站事件
- Context是桥梁:连接Handler和Pipeline,提供事件传播能力
- 动态修改:Pipeline支持运行时增删Handler,实现协议升级等高级功能
- 编解码器:特殊的Handler,处理消息的序列化和反序列化
- 异常处理:异常沿Pipeline传播,需要统一处理
下一篇,我们将深入分析Netty的异步编程模型,揭示ChannelFuture和Promise的实现原理。
更多推荐


所有评论(0)