写在前面

前两篇文章我们分析了Netty的线程模型和核心组件Channel、EventLoop。但有了Channel和EventLoop还不够——网络数据是"生"的,需要经过解析、处理、转换才能被业务使用。这个数据处理的过程,就是由ChannelPipeline和ChannelHandler完成的。

Pipeline是Netty责任链模式的核心实现,Handler是链上的处理节点。理解Pipeline和Handler的工作原理,是掌握Netty编程的关键。

一、责任链模式回顾

1.1 什么是责任链模式

责任链模式是一种行为设计模式,允许将请求沿着处理链传递,直到有一个处理者能够处理它。

经典场景

  • Web框架的过滤器链
  • 日志框架的日志级别过滤
  • 审批流程的多级审批

传统实现

// 传统责任链实现
abstract class Handler {
    protected Handler next;
    
    public Handler setNext(Handler next) {
        this.next = next;
        return next;
    }
    
    public abstract void handle(Request request);
}

class HandlerA extends Handler {
    @Override
    public void handle(Request request) {
        // 处理逻辑
        if (next != null) {
            next.handle(request);
        }
    }
}

// 使用
Handler chain = new HandlerA();
chain.setNext(new HandlerB()).setNext(new HandlerC());
chain.handle(request);

1.2 Netty责任链的特点

Netty的责任链实现更加复杂和强大:

特性

传统责任链

Netty Pipeline

方向

单向

双向(入站/出站)

节点类型

单一类型

两种类型

上下文

ChannelHandlerContext

动态修改

困难

支持运行时增删Handler

事件传播

手动调用next

提供便捷方法

二、ChannelPipeline架构

2.1 Pipeline的结构

ChannelPipeline是一个双向链表,节点是ChannelHandlerContext:

┌─────────────────────────────────────────────────────────────────────────┐
│                        ChannelPipeline                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Head ──▶ Context1 ──▶ Context2 ──▶ Context3 ──▶ ... ──▶ Tail         │
│    │          │            │            │                    │         │
│    │          ▼            ▼            ▼                    ▼         │
│    │      Handler1     Handler2     Handler3              (end)        │
│    │          │            │            │                              │
│    ▼          ▼            ▼            ▼                              │
│  HeadHandler                                                            │
│                                                                         │
│   入站事件流向(从左到右):Head → Handler1 → Handler2 → Handler3 → Tail │
│   出站事件流向(从右到左):Tail → Handler3 → Handler2 → Handler1 → Head│
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

2.2 DefaultChannelPipeline源码

public class DefaultChannelPipeline implements ChannelPipeline {
    
    // 头节点(固定)
    final AbstractChannelHandlerContext head;
    
    // 尾节点(固定)
    final AbstractChannelHandlerContext tail;
    
    // 所属Channel
    protected final Channel channel;
    
    // 初始化
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        
        // 创建头尾节点
        tail = new TailContext(this);
        head = new HeadContext(this);
        
        // 建立双向链表
        head.next = tail;
        tail.prev = head;
    }
    
    // 添加Handler到链尾
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, 
            String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            
            // 创建Context
            newCtx = newContext(group, filterName(name, handler), handler);
            
            // 添加到链表
            addLast0(newCtx);
            
            // 如果已注册,触发handlerAdded事件
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            
            callHandlerAdded0(newCtx);
        }
        return this;
    }
    
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
}

2.3 HeadContext与TailContext

Pipeline有两个特殊的节点:Head和Tail。

HeadContext

final class HeadContext extends AbstractChannelHandlerContext 
        implements ChannelOutboundHandler, ChannelInboundHandler {
    
    private final Unsafe unsafe;
    
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        unsafe = pipeline.channel().unsafe();
    }
    
    // 入站事件处理
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);  // 继续传播
    }
    
    // 出站事件处理(最终执行I/O操作)
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        unsafe.write(msg, promise);  // 调用Unsafe进行实际写操作
    }
    
    @Override
    public void flush(ChannelHandlerContext ctx) {
        unsafe.flush();  // 调用Unsafe进行实际刷新
    }
    
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        unsafe.close(promise);  // 调用Unsafe关闭连接
    }
}

HeadContext的特点:

  • 既是入站Handler,也是出站Handler
  • 入站事件从这里开始传播
  • 出站事件在这里结束,调用Unsafe执行实际I/O操作

TailContext

final class TailContext extends AbstractChannelHandlerContext 
        implements ChannelInboundHandler {
    
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, TailContext.class);
    }
    
    // 入站事件的终点
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 如果消息到达Tail还没被处理,释放资源
        ReferenceCountUtil.release(msg);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 异常到达Tail,记录日志
        onUnhandledInboundException(cause);
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        // 用户事件到达Tail,释放资源
        ReferenceCountUtil.release(evt);
    }
}

TailContext的特点:

  • 只是入站Handler
  • 入站事件在这里结束
  • 负责资源释放和异常兜底

2.4 完整的事件传播图

┌─────────────────────────────────────────────────────────────────────────────┐
│                           事件传播完整流程                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  【入站事件】(从左到右)                                                      │
│                                                                             │
│  Channel.read()                                                             │
│       │                                                                     │
│       ▼                                                                     │
│  ┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────┐  │
│  │  Head  │───▶│ Decoder  │───▶│ Handler  │───▶│ Encoder  │───▶│  Tail  │  │
│  │        │    │ (Inbound)│    │(Inbound) │    │(Inbound) │    │        │  │
│  └────────┘    └──────────┘    └──────────┘    └──────────┘    └────────┘  │
│       │                                                             │       │
│       │                                                             ▼       │
│       │                                                        release(msg) │
│       │                                                                     │
│                                                                             │
│  【出站事件】(从右到左)                                                      │
│                                                                             │
│  ctx.writeAndFlush(msg)                                                     │
│       │                                                                     │
│       ▼                                                                     │
│  ┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────┐  │
│  │  Head  │◀───│ Encoder  │◀───│ Handler  │◀───│ Decoder  │◀───│  Tail  │  │
│  │        │    │(Outbound)│    │(Outbound)│    │(Outbound)│    │        │  │
│  └────────┘    └──────────┘    └──────────┘    └──────────┘    └────────┘  │
│       │                                                                     │
│       ▼                                                                     │
│  unsafe.write(msg)                                                          │
│       │                                                                     │
│       ▼                                                                     │
│  网络发送                                                                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

三、ChannelHandler详解

3.1 Handler的两种类型

Netty将Handler分为两类:入站Handler和出站Handler。

入站Handler:处理入站数据,如读取数据、连接建立、连接关闭等事件。

public interface ChannelInboundHandler extends ChannelHandler {
    // Channel注册到EventLoop
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    
    // Channel从EventLoop注销
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    
    // Channel活跃(连接建立)
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    
    // Channel非活跃(连接断开)
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    
    // 读取数据
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    
    // 读取完成
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
    // 用户事件触发
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    
    // Channel可写状态变化
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    
    // 异常捕获
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

出站Handler:处理出站数据,如写数据、关闭连接、绑定端口等操作。

public interface ChannelOutboundHandler extends ChannelHandler {
    // 绑定地址
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, 
            ChannelPromise promise) throws Exception;
    
    // 连接远程地址
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
    // 断开连接
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
    // 关闭Channel
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
    // 从EventLoop注销
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
    // 读取数据(触发读操作)
    void read(ChannelHandlerContext ctx) throws Exception;
    
    // 写数据
    void write(ChannelHandlerContext ctx, Object msg, 
            ChannelPromise promise) throws Exception;
    
    // 刷新数据
    void flush(ChannelHandlerContext ctx) throws Exception;
}

3.2 适配器模式

直接实现接口需要实现所有方法,很繁琐。Netty提供了适配器类:

// 入站适配器
public class ChannelInboundHandlerAdapter implements ChannelInboundHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();  // 默认传播事件
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);  // 默认传播事件
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);  // 默认传播异常
    }
    
    // 其他方法默认也是传播事件...
}

// 出站适配器
public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
            throws Exception {
        ctx.write(msg, promise);  // 默认传播事件
    }
    
    // 其他方法默认也是传播事件...
}

使用示例

// 只需要重写关心的方法
public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 只处理读事件
        System.out.println("Received: " + msg);
        ctx.fireChannelRead(msg);  // 继续传播
    }
}

3.3 SimpleChannelInboundHandler

对于消息处理,Netty提供了更便捷的SimpleChannelInboundHandler:

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    
    private final TypeParameterMatcher matcher;
    private final boolean autoRelease;
    
    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
        this(inboundMessageType, true);
    }
    
    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, 
            boolean autoRelease) {
        this.matcher = TypeParameterMatcher.get(inboundMessageType);
        this.autoRelease = autoRelease;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            // 类型匹配
            if (matcher.match(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);  // 调用子类实现
            } else {
                release = false;
                ctx.fireChannelRead(msg);  // 不匹配则传播
            }
        } finally {
            // 自动释放消息
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
    
    // 子类实现
    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}

使用示例

public class StringHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // 只处理String类型消息
        // 消息会自动释放,无需手动调用ReferenceCountUtil.release()
        System.out.println("Received string: " + msg);
    }
}

public class MessageHandler extends SimpleChannelInboundHandler<MessageInfo> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageInfo msg) {
        // 只处理MessageInfo类型消息
        processMessage(msg);
    }
}

四、ChannelHandlerContext详解

4.1 Context的作用

ChannelHandlerContext是Handler与Pipeline交互的桥梁:

┌─────────────────────────────────────────────────────────────────────┐
│                    ChannelHandlerContext                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      核心引用                                 │   │
│  │  - pipeline: 所属的Pipeline                                  │   │
│  │  - handler: 包装的Handler                                    │   │
│  │  - prev: 上一个Context                                       │   │
│  │  - next: 下一个Context                                       │   │
│  │  - executor: 专属的EventExecutor(可选)                      │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      核心能力                                 │   │
│  │  - 事件传播:fireChannelRead()、fireChannelActive()...       │   │
│  │  - 出站操作:write()、flush()、close()...                    │   │
│  │  - 属性存储:attr()、hasAttr()                               │   │
│  │  - 线程判断:executor().inEventLoop()                        │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

4.2 AbstractChannelHandlerContext源码

abstract class AbstractChannelHandlerContext 
        implements ChannelHandlerContext, ResourceLeakHint {
    
    // 双向链表指针
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    
    // 所属Pipeline
    private final DefaultChannelPipeline pipeline;
    
    // Handler名称
    private final String name;
    
    // 是否为入站/出站Handler
    private final boolean inbound;
    private final boolean outbound;
    
    // 有序执行器(可选,用于指定Handler的执行线程)
    private final EventExecutor executor;
    
    // 初始化
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, 
            EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
        this.pipeline = pipeline;
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.executor = executor;
        // 根据Handler类型设置标记
        this.inbound = ChannelHandlerMask.isInbound(handlerClass);
        this.outbound = ChannelHandlerMask.isOutbound(handlerClass);
    }
    
    // 获取Channel
    @Override
    public Channel channel() {
        return pipeline.channel();
    }
    
    // 获取Pipeline
    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }
    
    // 获取EventLoop
    @Override
    public EventLoop eventLoop() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return (EventLoop) executor;
        }
    }
}

4.3 事件传播方法

Context提供了丰富的事件传播方法:

// 入站事件传播
@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

// 出站事件传播
@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    write(msg, false, promise);
    return promise;
}

private void invokeChannelRead(AbstractChannelHandlerContext next, Object msg) {
    final Object m = pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

4.4 findContextInbound/Outbound

查找下一个能处理该事件的Context:

// 查找下一个入站Context
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor;
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

// 查找下一个出站Context
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor;
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

// 判断是否跳过该Context
private static boolean skipContext(AbstractChannelHandlerContext ctx, 
        EventExecutor currentExecutor, int mask, int onlyMask) {
    // 检查是否为入站/出站Handler
    if ((ctx.executionMask & onlyMask) == 0) {
        return true;
    }
    // 检查是否支持该事件
    if ((ctx.executionMask & mask) == 0) {
        return true;
    }
    // 检查Executor是否相同
    if (ctx.executor == currentExecutor) {
        return false;
    }
    return false;
}

查找逻辑图

入站事件查找(向右找):
┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────┐
│  Head  │───▶│ Context1 │───▶│ Context2 │───▶│ Context3 │───▶│  Tail  │
│(Inbound)│   │(Outbound)│    │(Inbound) │    │(Outbound)│    │(Inbound)│
└────────┘    └──────────┘    └──────────┘    └──────────┘    └────────┘
     │              │               │               │              │
     │           跳过              找到            跳过            │
     │              │               │               │              │
     └──────────────┴───────────────┴───────────────┴──────────────┘
                              入站事件传播路径

出站事件查找(向左找):
┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────┐
│  Head  │◀───│ Context1 │◀───│ Context2 │◀───│ Context3 │◀───│  Tail  │
│(Outbound)│  │(Outbound)│    │(Inbound) │    │(Outbound)│    │        │
└────────┘    └──────────┘    └──────────┘    └──────────┘    └────────┘
     │              │               │               │              │
   找到           找到            跳过            找到             │
     │              │               │               │              │
     └──────────────┴───────────────┴───────────────┴──────────────┘
                              出站事件传播路径

五、Pipeline动态修改

5.1 运行时添加Handler

// 在Pipeline末尾添加
pipeline.addLast("handler", new MyHandler());

// 在指定位置添加
pipeline.addBefore("handler", "newHandler", new NewHandler());
pipeline.addAfter("handler", "newHandler", new NewHandler());

// 在Pipeline头部添加
pipeline.addFirst("firstHandler", new FirstHandler());

5.2 运行时移除Handler

// 根据名称移除
pipeline.remove("handler");

// 根据Handler实例移除
pipeline.remove(myHandler);

// 移除所有Handler
pipeline.removeFirst();
pipeline.removeLast();

5.3 动态修改的应用场景

场景一:协议升级

public class HttpUpgradeHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        if ("websocket".equalsIgnoreCase(request.headers().get("Upgrade"))) {
            // 移除HTTP处理器
            pipeline.remove("httpHandler");
            
            // 添加WebSocket处理器
            pipeline.addLast("websocketHandler", new WebSocketHandler());
            
            // 握手响应
            handleWebSocketHandshake(ctx, request);
        } else {
            // 正常HTTP处理
            ctx.fireChannelRead(request);
        }
    }
}

场景二:SSL动态开启

public class SslDetectionHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 检测是否为SSL/TLS握手
        if (in.readableBytes() >= 5) {
            int firstByte = in.getByte(in.readerIndex()) & 0xFF;
            
            if (firstByte == 0x16) {  // SSL/TLS握手
                // 添加SSL处理器
                SslHandler sslHandler = sslContext.newHandler(ctx.alloc());
                pipeline.addBefore("decoder", "ssl", sslHandler);
            }
            
            // 移除自己
            pipeline.remove(this);
        }
    }
}

场景三:登录后添加业务Handler

public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequest request) {
        if (authenticate(request)) {
            // 登录成功,添加业务Handler
            pipeline.addLast("messageHandler", new MessageHandler());
            pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
            
            // 移除登录Handler
            pipeline.remove(this);
            
            // 响应登录成功
            ctx.writeAndFlush(new LoginResponse(true));
        } else {
            ctx.writeAndFlush(new LoginResponse(false));
        }
    }
}

六、编解码器位置与设计

6.1 编解码器的正确位置

编解码器是特殊的Handler,位置至关重要:

正确的Pipeline配置:

┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────┐
│  Head  │───▶│  Decoder │───▶│ Business │───▶│  Encoder │───▶│  Tail  │
│        │    │ (Inbound) │    │ Handler  │    │(Outbound)│    │        │
└────────┘    └──────────┘    └──────────┘    └──────────┘    └────────┘

入站数据流:网络字节流 → Decoder解码 → 业务对象 → Handler处理
出站数据流:业务对象 → Encoder编码 → 网络字节流 → 网络发送

6.2 MessageToMessageCodec

Netty提供了便捷的编解码基类:

/**
 * 消息到消息的编解码器
 * @param <INBOUND>  入站消息类型
 * @param <OUTBOUND> 出站消息类型
 */
public abstract class MessageToMessageCodec<INBOUND, OUTBOUND> 
        extends ChannelDuplexHandler {
    
    private final MessageToMessageEncoder<OUTBOUND> encoder = new Encoder();
    private final MessageToMessageDecoder<INBOUND> decoder = new Decoder();
    
    // 解码方法(子类实现)
    protected abstract void decode(ChannelHandlerContext ctx, INBOUND msg, 
            List<Object> out) throws Exception;
    
    // 编码方法(子类实现)
    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND msg, 
            List<Object> out) throws Exception;
}

使用示例

public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
    
    private static final int HEADER_LENGTH = 8;
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // 读取消息头
        long length = msg.readLong();
        int type = msg.readInt();
        
        // 读取消息体
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        
        // 解析为消息对象
        Message message = parseMessage(type, bytes);
        out.add(message);
    }
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) {
        ByteBuf buffer = ctx.alloc().buffer();
        
        // 写入消息头
        byte[] body = serializeMessage(msg);
        buffer.writeLong(HEADER_LENGTH + body.length);
        buffer.writeInt(msg.getType());
        
        // 写入消息体
        buffer.writeBytes(body);
        
        out.add(buffer);
    }
}

6.3 ByteToMessageDecoder

处理粘包拆包的利器:

/**
 * 字节到消息的解码器
 * 自动处理粘包拆包问题
 */
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    
    // 累积缓冲区
    ByteBuf cumulation;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 累积数据
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 调用解码方法
                callDecode(ctx, cumulation, out);
            } finally {
                // ...
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    // 解码方法(子类实现)
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, 
            List<Object> out) throws Exception;
}

自定义协议解码器

public class CustomProtocolDecoder extends ByteToMessageDecoder {
    
    private static final int MIN_FRAME_LENGTH = 12;  // 最小帧长度
    private static final int MAX_FRAME_LENGTH = 65535;  // 最大帧长度
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 检查是否有足够的数据读取帧头
        if (in.readableBytes() < MIN_FRAME_LENGTH) {
            return;  // 等待更多数据
        }
        
        // 标记读取位置
        in.markReaderIndex();
        
        // 读取帧长度
        int frameLength = in.readInt();
        
        // 校验帧长度
        if (frameLength < MIN_FRAME_LENGTH || frameLength > MAX_FRAME_LENGTH) {
            in.skipBytes(in.readableBytes());  // 丢弃非法数据
            throw new CorruptedFrameException("Invalid frame length: " + frameLength);
        }
        
        // 检查是否有完整的帧
        if (in.readableBytes() < frameLength - 4) {
            in.resetReaderIndex();  // 重置读取位置,等待更多数据
            return;
        }
        
        // 读取帧内容
        int type = in.readInt();
        byte[] body = new byte[frameLength - 8];
        in.readBytes(body);
        
        // 创建消息对象
        Message message = new Message(type, body);
        out.add(message);
    }
}

七、异常处理

7.1 异常传播机制

异常会沿着Pipeline传播,直到被处理:

// 异常传播
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    ctx.fireExceptionCaught(cause);  // 传播到下一个Handler
}

// 如果没有Handler处理,最终到达TailContext
final class TailContext extends AbstractChannelHandlerContext 
        implements ChannelInboundHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 记录警告日志
        logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                "It usually means the last handler in the pipeline did not handle the exception.", cause);
    }
}

7.2 统一异常处理

建议在Pipeline末尾添加统一的异常处理器:

public class ExceptionHandler extends ChannelDuplexHandler {
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 根据异常类型处理
        if (cause instanceof IOException) {
            // I/O异常,关闭连接
            logger.warn("I/O exception: {}", cause.getMessage());
            ctx.close();
        } else if (cause instanceof CorruptedFrameException) {
            // 协议异常,记录并关闭
            logger.error("Corrupted frame: {}", cause.getMessage());
            ctx.close();
        } else if (cause instanceof TimeoutException) {
            // 超时异常
            logger.warn("Timeout: {}", cause.getMessage());
            ctx.writeAndFlush(new ErrorMessage("Request timeout"));
        } else {
            // 未知异常
            logger.error("Unexpected exception", cause);
            ctx.writeAndFlush(new ErrorMessage("Internal server error"));
        }
    }
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 监听写操作异常
        promise.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                exceptionCaught(ctx, future.cause());
            }
        });
        ctx.write(msg, promise);
    }
}

// Pipeline配置
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new BusinessHandler());
pipeline.addLast("exceptionHandler", new ExceptionHandler());  // 放在最后

八、实战:构建完整的Pipeline

8.1 IM系统Pipeline配置

/**
 * 即时通讯系统Pipeline配置
 */
public class IMChannelInitializer extends ChannelInitializer<SocketChannel> {
    
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        // 1. 空闲检测(60秒读超时)
        pipeline.addLast("idle", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
        
        // 2. 消息解码器
        pipeline.addLast("decoder", new MessageDecoder());
        
        // 3. 消息编码器
        pipeline.addLast("encoder", new MessageEncoder());
        
        // 4. 心跳处理器
        pipeline.addLast("heartbeat", new HeartbeatHandler());
        
        // 5. 登录处理器
        pipeline.addLast("login", new LoginHandler());
        
        // 6. 消息处理器
        pipeline.addLast("message", new MessageHandler());
        
        // 7. 异常处理器
        pipeline.addLast("exception", new ExceptionHandler());
    }
}

8.2 消息处理流程

客户端发送消息:
┌────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  Idle  │───▶│  Decoder │───▶│ Heartbeat│───▶│  Login   │───▶│ Message  │
│        │    │          │    │          │    │          │    │          │
└────────┘    └──────────┘    └──────────┘    └──────────┘    └──────────┘
     │              │               │               │               │
     │         解码消息         心跳检测?        登录验证?        处理消息    │
     │              │               │               │               │
     ▼              ▼               ▼               ▼               ▼
  空闲检测?     MessageInfo     心跳响应?        登录响应?       业务处理
     │
     ▼
  超时关闭

九、总结

ChannelPipeline和ChannelHandler是Netty责任链模式的核心实现:

  1. Pipeline是双向链表:Head和Tail是固定节点,中间是用户添加的Handler
  2. Handler分两类:Inbound处理入站事件,Outbound处理出站事件
  3. Context是桥梁:连接Handler和Pipeline,提供事件传播能力
  4. 动态修改:Pipeline支持运行时增删Handler,实现协议升级等高级功能
  5. 编解码器:特殊的Handler,处理消息的序列化和反序列化
  6. 异常处理:异常沿Pipeline传播,需要统一处理

下一篇,我们将深入分析Netty的异步编程模型,揭示ChannelFuture和Promise的实现原理。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐