相关文章链接

位运算详解

waken方法详解

ThreadPerTaskExecutor与线程创建详解

processSelectedKeys() vs runAllTasks()

NioServerSocketChannel-Unsafe初始化详解

NioEventLoop的run方法详解

NioEventLoopGroup深度解析

inEventLoop方法详解

executionMask详解

Netty源码分析–认真系列(一)

Netty源码分析–认真系列(二)

Netty源码分析–认真系列(二)

📚 本文详细讲解 Netty 中入站事件和出站事件的传播机制,以及编解码器的工作原理


📖 目录


一、入站事件处理

在上面我们说完了 Channel、ChannelPipeline、也介绍了一下 ChannelHandler 是什么,我们接下来说说接收到一条消息会发生什么,也就是入站消息处理。

1.1 什么是入站事件?

入站事件(Inbound Event) 是指从网络底层向应用层传播的事件,代表数据从外部进入到应用程序的过程。

📋 常见的入站事件
事件方法 触发时机 说明
channelRegistered() Channel 注册到 EventLoop 连接初始化阶段
channelActive() Channel 激活 连接建立成功
channelRead() 读取到数据 接收到网络数据
channelReadComplete() 数据读取完成 一次读取操作结束
channelInactive() Channel 失活 连接断开

1.2 入站事件的传播方向

💡 核心规则:入站事件从 head 节点开始,向 tail 节点传播

head → Handler1(Inbound) → Handler2(Inbound) → Handler3(Inbound) → tail

⚠️ 注意:这与出站事件相反(出站事件从 tail 到 head)

1.3 入站事件触发的完整流程

我们以 OP_ACCEPT 事件(接收新客户端连接)为例,看看接收消息之后会发生什么。

步骤 1️⃣:NioEventLoop 检测事件

服务端启动后,ServerSocketChannel 会注册到 bossGroup 的 EventLoop,EventLooprun() 方法会不断循环,检测网络事件。

// NioEventLoop.java
protected void run() {
    for (;;) {  // 无限循环
        try {
            // 阻塞等待事件(或定时等待)
            select(wakenUp.getAndSet(false));
            
            if (ioRatio == 100) {
                try {
                    // 处理所有就绪的 I/O 事件
                    processSelectedKeys();
                } finally {
                    // 执行任务队列中的任务
                    runAllTasks();
                }
            } 
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

📌 关键点select() 会阻塞等待,直到有事件就绪或超时。当客户端发起连接时,Selector 会检测到 OP_ACCEPT 事件。

步骤 2️⃣:处理就绪的事件
// NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 获取 Channel 的 Unsafe 对象
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    
    try {
        // 🔥 这是入站事件的关键触发点!
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 调用 Unsafe 的 read 方法
            // 对于 ServerSocketChannel,这会接受新连接
            // 对于 SocketChannel,这会读取数据
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

📖 扩展阅读:关于 unsafe 实例的详细初始化过程,请参考 NioServerSocketChannel-Unsafe初始化详解.md 文档

步骤 3️⃣:触发 channelRead 事件
// NioMessageUnsafe
public void read() {
    // 接受新连接,返回接受的连接数
    // doReadMessages 会调用 ServerSocketChannel.accept()
    // 并将新的 SocketChannel 添加到 readBuf 中
    int localRead = doReadMessages(readBuf);
    
    int size = readBuf.size();
    for (int i = 0; i < size; i++) {
        readPending = false;
        // 🔥 关键:触发 channelRead 事件!
        // readBuf.get(i) 是新接受的 NioSocketChannel
        pipeline.fireChannelRead(readBuf.get(i));
    }
    
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
}
步骤 4️⃣:从 head 开始传播事件
// DefaultChannelPipeline.java
public final ChannelPipeline fireChannelRead(Object msg) {
    //注意看这里将head传入
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// AbstractChannelHandlerContext.java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    
    // 判断是否在 EventLoop 线程中
    if (executor.inEventLoop()) {
        // 在 EventLoop 线程中,直接调用
        next.invokeChannelRead(m);
    } else {
        // 不在 EventLoop 线程中,提交任务到 EventLoop
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

// AbstractChannelHandlerContext.java
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // 调用当前 Handler 的 channelRead 方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        // Handler 还未添加完成,继续传播
        fireChannelRead(msg);
    }
}


步骤 5️⃣:查找下一个入站 Handler

findContextInbound() 方法负责查找下一个入站 Handler:

// AbstractChannelHandlerContext.java
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        // 向后遍历链表
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}
🔍 executionMask 机制详解

executionMaskChannelHandlerContext 的一个整型字段,用位掩码的方式记录当前 Handler 实现了哪些方法。

位掩码定义:

static final int MASK_EXCEPTION_CAUGHT = 1;              // 0000 0001
static final int MASK_CHANNEL_REGISTERED = 1 << 1;       // 0000 0010
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;     // 0000 0100
static final int MASK_CHANNEL_ACTIVE = 1 << 3;           // 0000 1000
static final int MASK_CHANNEL_INACTIVE = 1 << 4;         // 0001 0000
static final int MASK_CHANNEL_READ = 1 << 5;             // 0010 0000
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;    // 0100 0000
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;     // 1000 0000

工作原理:

  • 例如 headHandler 实现 channelRead,那 executionMask = 0010 0000
  • 若还实现了 channelRegistered 方法,executionMask = 0010 0010
  • 使用位与运算 & 判断:两个都是 1,结果才是 1
  • 传进来的 mask = 1 << 5 = 0010 0000,找到下一个实现了 channelRead 方法的 handler

📖 扩展阅读:关于 executionMask 的详细解释请参考 executionMask详解.md 文档
📖 扩展阅读:关于位运算相关知识请参考 位运算详解.md 文档

1.4 Pipeline 配置示例

在讲后续流程之前,我们先看一个简单的 pipeline 配置:

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) {
        // 在pipeline中添加自定义的业务handler
        channel.pipeline().addLast(new MyServerHandler());
    }
}

// 自定义的业务handler
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 处理接收到的数据
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        System.out.println("收到消息:" + new String(data, "UTF-8"));
        
        // 继续向后传播(如果还有其他handler的话)
        ctx.fireChannelRead(msg);
    }
}

Pipeline 结构:

head → MyServerHandler → tail

1.5 从 head 到业务 handler 的传播

回到主线,我们刚才说到 head 处理完成后会调用 ctx.fireChannelRead(msg) 继续传播事件,这个方法会调用 findContextInbound(MASK_CHANNEL_READ) 从 head 开始向后遍历,找到第一个实现了 channelRead 方法的 handler,也就是我们的 MyServerHandler。找到之后,又会调用 invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    
    if (executor.inEventLoop()) {
        // 在EventLoop线程中,直接调用
        next.invokeChannelRead(m);
    } else {
        // 不在EventLoop线程中,提交任务
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // 调用MyServerHandler的channelRead方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

这里会调用到我们自定义的 MyServerHandler 的 channelRead 方法,我们在这个方法里处理业务逻辑,比如解析数据、处理请求等。在 MyServerHandler 中,我们可以处理接收到的数据,如果需要继续传播给后面的 handler,就调用 ctx.fireChannelRead(msg),事件会继续向后传播,重复前面的流程:findContextInbound() -> invokeChannelRead() -> 下一个 handler 的 channelRead()。如果我们没有调用 ctx.fireChannelRead(msg),事件传播就在这里终止了。

1.6 最终到达 tail

如果事件一直传播,最终会到达 tail 节点:

// TailContext.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // tail会释放msg(如果是ByteBuf的话)
    onUnhandledInboundMessage(msg);
}

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        // 释放资源
        ReferenceCountUtil.release(msg);
    }
}

⚠️ 重要:tail 会检查 msg 是否被处理过,如果没有被处理(比如 ByteBuf 没有被读取),会打印警告日志,然后释放资源。这就是为什么我们在 handler 中要么消费掉 msg,要么继续传播给下一个 handler。

1.7 流程总结

完整调用链
NioEventLoop.run()
    ↓
processSelectedKey()
    ↓
unsafe.read()
    ↓
pipeline.fireChannelRead()
    ↓
head.channelRead()
    ↓
findContextInbound()
    ↓
业务Handler.channelRead()
    ↓
tail.channelRead()
核心要点
  • ✅ 入站事件从 headtail 传播
  • ✅ 通过 findContextInbound() 查找下一个处理器
  • ✅ 使用 executionMask 位掩码匹配方法
  • ✅ tail 负责释放未处理的资源
  • ✅ 每个 handler 都有机会处理事件

二、出站事件处理

上面我们说了入站事件,现在来说说出站事件。

💡 核心概念

  • 入站事件:数据从外部进入到应用程序的过程
  • 出站事件:数据从应用程序发送到外部的过程

下面我们以发送消息为案例聊聊出站事件。

2.1 Pipeline 配置示例

2.2 出站事件的传播方向

💡 核心规则:出站事件从当前位置head 节点传播(往前传播)

当前位置示意:

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
                                                  ↑
                                              当前位置

2.3 出站事件传播流程

步骤 1️⃣:调用 write 方法
pipeline.addLast("decoder", new StringDecoder());      // InboundHandler
pipeline.addLast("encoder", new StringEncoder());      // OutboundHandler
pipeline.addLast("logger", new LoggingHandler());      // InboundHandler
pipeline.addLast("auth", new AuthHandler());           // OutboundHandler
pipeline.addLast("business", new BusinessHandler());   // InboundHandler

这里的 AuthHandler 是一个自定义的出站 handler,用来给消息添加认证信息:

public class AuthHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 给消息添加认证头
        String authMsg = "[AUTH]" + msg;
        // 继续向前传播
        ctx.write(authMsg, promise);
    }
}

结构图:

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
(Out)    (In)       (Out)      (In)     (Out)    (In)
  ↑                                                ↑
头部                                              尾部

出站事件的开始就是往外发送消息,也就是在 handler 中调用 write 方法发送消息,例如在 BusinessHandler 中调用 write 方法:

public class BusinessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write("Hello");  // 从这里开始
    }
}

当前位置

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
                                                  ↑
                                              当前位置

从 write 方法开始,我们来看看 write 方法:

ctx.write(str);

//AbstractChannelHandlerContext
public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);

        return promise;
    }

private void write(Object msg, boolean flush, ChannelPromise promise) {
    	//检查 msg 不能为 null
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            // 2. 检查 promise 是否有效
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);  // 释放消息
                return;  // 直接返回,不继续处理
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);  // 出错也要释放消息
            throw e;
        }
		// 3. 查找下一个出站处理器
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
   		// 6. 判断当前线程是否是 EventLoop 线程
        if (executor.inEventLoop()) {
             // 情况1:在 EventLoop 线程中,直接调用
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // 情况2:不在 EventLoop 线程中,提交任务
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                task.cancel();
            }
        }
    }
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
       (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);

static final int MASK_BIND = 1 << 9;                  // 0010 0000 0000
static final int MASK_CONNECT = 1 << 10;              // 0100 0000 0000
static final int MASK_DISCONNECT = 1 << 11;           // 1000 0000 0000
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;                // 1000 0000 0000 0000
static final int MASK_FLUSH = 1 << 16;

我们来看到这行代码,这行代码的作用是查找下一个出站处理器,准确点来说是查找下一个实现 write() 方法的 handler:

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    
    do {
        ctx = ctx.prev;  // 往前移动
    } while ((ctx.executionMask & mask) == 0);  // 判断是否实现了指定方法
    
    return ctx;
}

我们这里点进去看 findContextOutbound 方法的源码,发现和上面说的 findContextInbound 方法是一样的,就不过多赘述了。

接下来看 invokeWrite 方法:

next.invokeWrite(m, promise);
void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        // 调用 Handler 的 write 方法
        invokeWrite0(msg, promise);
    } else {
        // Handler 还没准备好,继续传递
        write(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        // 调用 handler 的 write 方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

在 invokeWrite0 方法内部可以看见,会调用 handler 的 write 方法。根据我们的 pipeline 配置,从 business 位置开始,通过 findContextOutbound() 往前查找,会找到 auth 这个 OutboundHandler。

2.1 AuthHandler 处理

// AuthHandler.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // 给消息添加认证头
    String authMsg = "[AUTH]" + msg;
    // 继续向前传播
    ctx.write(authMsg, promise);
}

当前位置

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
                                         ↑
                                     当前位置

AuthHandler 处理完消息后,调用 ctx.write(authMsg, promise) 继续向前传播,又会重复前面的流程:findContextOutbound() -> invokeWrite() -> 下一个 handler 的 write()。这次会找到 encoder。

2.2 StringEncoder 处理

// StringEncoder.java (Netty内置)
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // 将String编码为ByteBuf
    ByteBuf buf = ...;  // 编码逻辑
    ctx.write(buf, promise);  // 继续传播
}

当前位置

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
                      ↑
                  当前位置

encoder 将 String 编码成 ByteBuf 后,继续调用 ctx.write() 向前传播。这时候再往前找,decoder 是 InboundHandler,不会被找到,会直接跳过,最终找到 head。

2.4 最终到达 head

// HeadContext.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

当前位置:

head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
 ↑
当前位置

📌 关键点:head 是出站事件的终点,它会调用 unsafe.write() 将数据写入到底层的 SocketChannel。但是注意,这里只是把数据写到了 Netty 的缓冲区,还没有真正发送到网络。

// AbstractUnsafe.java
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // 将消息添加到出站缓冲区
    outboundBuffer.addMessage(msg, promise);
}
🔥 writeAndFlush 的完整流程

如果我们调用的是 ctx.writeAndFlush(),那么在 write 完成后,还会触发 flush 事件:

// AbstractUnsafe.java
public final void flush() {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    outboundBuffer.addFlush();
    flush0();  // 真正的发送逻辑
}

protected void flush0() {
    doWrite(outboundBuffer);  // 调用JDK的SocketChannel.write()
}

2.5 流程总结

完整调用链
业务Handler.channelRead()
    ↓
ctx.write()
    ↓
findContextOutbound()
    ↓
auth.write() (认证处理)
    ↓
encoder.write() (编码)
    ↓
head.write()
    ↓
unsafe.write() (写入缓冲区)
    ↓
[如果是writeAndFlush]
    ↓
unsafe.flush() (真正发送)
核心要点
  • ✅ 出站事件从当前位置head 传播
  • ✅ 通过 findContextOutbound() 查找下一个出站处理器
  • ✅ head 调用 unsafe.write() 写入缓冲区
  • writeAndFlush() 会触发 flush 事件真正发送数据
  • ✅ 每个 OutboundHandler 都有机会处理事件

三、解码器详解

3.1 为什么需要解码器?

网络传输的本质

在网络通信中,所有数据都必须以字节流的形式传输:

应用层数据(Java对象、字符串等) 
    ↓ 编码
字节流(网络传输)
    ↓ 解码
应用层数据(Java对象、字符串等)
TCP 粘包/拆包问题

TCP 是面向流的协议,数据像水流一样连续传输,没有明确的消息边界:

发送端发送:
消息1: [Hello]
消息2: [World]

接收端可能收到:
情况1(粘包): [HelloWorld]
情况2(拆包): [Hel] [loWorld]
情况3(混合): [HelloWor] [ld]

🎯 解码器的核心职责:从连续的字节流中正确地识别和提取完整的消息


3.2 ByteToMessageDecoder 深度解析

📌 定义:ByteToMessageDecoder 是将字节(Byte)解码为消息对象(Message)的解码器
输入:ByteBuf
输出:自定义的消息对象

核心机制

ByteToMessageDecoder 是 Netty 中最基础的解码器,它的工作原理:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    // 累积缓冲区 - 这是关键!
    ByteBuf cumulation;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf data = (ByteBuf) msg;
            
            // 1. 将新数据追加到累积缓冲区
            cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            
            // 2. 尝试解码
            callDecode(ctx, cumulation, out);
            
            // 3. 如果解码成功,将结果传递给下一个Handler
            fireChannelRead(ctx, out);
        }
    }
    
    protected abstract void decode(ChannelHandlerContext ctx, 
                                   ByteBuf in, 
                                   List<Object> out) throws Exception;
}
累积缓冲区(Cumulation)机制

🔑 这是理解解码器的关键!

第一次接收: [0x01, 0x02]
cumulation: [0x01, 0x02]  → decode() 被调用,数据不够,不解码

第二次接收: [0x03, 0x04]
cumulation: [0x01, 0x02, 0x03, 0x04]  → decode() 被调用,数据够了,解码成功

解码后:
cumulation: []  → 清空或保留剩余数据
out: [解码后的对象]  → 传递给下一个Handler

关键点:

  • cumulation 会一直保存未解码的数据
  • ✅ 每次新数据到达,都会追加到 cumulation
  • decode() 方法会被反复调用,直到无法解码更多数据
decode() 方法的正确实现
public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 关键:检查可读字节数
        if (in.readableBytes() >= 4) {
            // 读取4个字节并转换为int
            out.add(in.readInt());
        }
        // 如果数据不够,什么都不做,等待下次调用
    }
}

重要理解:

  1. in 参数就是 cumulation,包含了所有累积的数据
  2. ✅ 如果数据不够,直接返回,不要抛异常
  3. ✅ 读取数据后,inreaderIndex 会自动前移
  4. ✅ 添加到 out 的对象会自动传递给下一个Handler
完整的数据流示例

假设我们要解码一个协议:[长度(4字节)][数据]

public class LengthFieldDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 第一步:检查是否有足够的字节读取长度字段
        if (in.readableBytes() < 4) {
            return; // 数据不够,等待更多数据
        }
        
        // 第二步:标记当前读位置(重要!)
        in.markReaderIndex();
        
        // 第三步:读取长度
        int length = in.readInt();
        
        // 第四步:检查是否有足够的数据
        if (in.readableBytes() < length) {
            in.resetReaderIndex(); // 重置读位置
            return; // 数据不够,等待更多数据
        }
        
        // 第五步:读取实际数据
        byte[] data = new byte[length];
        in.readBytes(data);
        
        // 第六步:添加到输出列表
        out.add(new String(data));
    }
}

数据流演示

接收数据1: [0x00, 0x00, 0x00, 0x05, 0x48]
  ↓
cumulation: [0x00, 0x00, 0x00, 0x05, 0x48]
            └─────长度字段─────┘ └─数据─┘
                  (值=5)        (1字节)
  ↓
decode() 执行过程:
  1. in.readableBytes() = 5 (总共5字节)
  2. 检查:5 >= 4 ✓ (可以读取长度字段)
  3. in.markReaderIndex() (标记位置0)
  4. length = in.readInt() = 5 (读取4字节,readerIndex移到4)
  5. 此时 in.readableBytes() = 1 (剩余1字节)
  6. 检查:1 < 5 ✗ (数据不够!需要5字节,只有1字节)
  7. in.resetReaderIndex() (重置到位置0)
  8. return (等待更多数据)

接收数据2: [0x65, 0x6C, 0x6C, 0x6F]
  ↓
cumulation: [0x00, 0x00, 0x00, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]
            └─────长度字段─────┘ └──────5字节数据──────┘
                  (值=5)              "Hello"
  ↓
decode() 执行过程:
  1. in.readableBytes() = 9 (总共9字节)
  2. 检查:9 >= 4 ✓
  3. in.markReaderIndex()
  4. length = in.readInt() = 5 (readerIndex移到4)
  5. 此时 in.readableBytes() = 5 (剩余5字节)
  6. 检查:5 >= 5 ✓ (数据够了!)
  7. 读取5字节:[0x48, 0x65, 0x6C, 0x6C, 0x6F]
  8. 转换为字符串:"Hello"
  9. out.add("Hello")
  ↓
cumulation: [] (数据已全部读取,清空)
out: ["Hello"] (传递给下一个Handler)

关键理解:

  • ❌ 第一次接收了5个字节:[长度字段4字节] + [数据1字节]
  • 📍 读取长度字段后,readerIndex 从 0 移动到 4
  • 📊 此时 readableBytes() = 总字节数(5) - readerIndex(4) = 1字节
  • ⚠️ 但长度字段的值是5,表示需要5字节的数据
  • ❌ 所以只有1字节数据,不够!

3.3 ReplayingDecoder 深度解析

📌 定义:ReplayingDecoder 简化了 ByteToMessageDecoder 的编码方式,自动处理数据不足的情况

为什么需要 ReplayingDecoder?

使用 ByteToMessageDecoder 时,我们需要不断检查数据是否足够:

// 繁琐的检查
if (in.readableBytes() < 4) return;
int length = in.readInt();
if (in.readableBytes() < length) return;

ReplayingDecoder 简化了这个过程:

public class SimpleDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 不需要检查!直接读取
        int length = in.readInt();
        byte[] data = new byte[length];
        in.readBytes(data);
        out.add(new String(data));
    }
}
ReplayingDecoder 的魔法原理

ReplayingDecoder 使用了一个特殊的 ByteBuf 包装器:

class ReplayingDecoderByteBuf extends ByteBuf {
    @Override
    public int readInt() {
        // 检查是否有足够的字节
        if (readerIndex + 4 > writerIndex) {
            // 抛出特殊的信号异常
            throw REPLAY;
        }
        return super.readInt();
    }
}

工作流程:

1. decode() 开始执行
2. 调用 in.readInt()
3. ReplayingDecoderByteBuf 检查:只有2字节,不够!
4. 抛出 REPLAY 异常
5. ReplayingDecoder 捕获异常
6. 重置 readerIndex,等待更多数据
7. 新数据到达,重新执行 decode()

3.4 MessageToMessageDecoder 深度解析

📌 定义:MessageToMessageDecoder 是将一种消息对象转换为另一种消息对象的解码器
输入:消息对象A
输出:消息对象B

与 ByteToMessageDecoder 的区别
ByteToMessageDecoder:     ByteBuf → 消息对象
MessageToMessageDecoder:  消息对象A → 消息对象B

💡 应用场景:通常用在第二层解码,对已经解码出来的消息对象做进一步的转换


3.5 解码器选择指南

场景 推荐解码器 原因
简单协议,性能要求高 ByteToMessageDecoder 最灵活,性能最好
复杂协议,代码简洁优先 ReplayingDecoder 代码简单,但性能稍差
消息类型转换 MessageToMessageDecoder 专门用于类型转换
已有成熟协议 Netty内置解码器 如 LineBasedFrameDecoder
核心要点
  • 累积缓冲区是解码器的核心机制
  • decode() 会被多次调用,不要保存状态
  • 正确移动 readerIndex,避免无限循环
  • 注意内存管理,必要时使用 retain()
  • 使用状态机处理复杂协议

四、编码器详解

场景1:协议转换

// 第一层:字节 → Integer
public class ByteToIntDecoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() >= 4) {
            out.add(in.readInt());
        }
    }
}

// 第二层:Integer → String
public class IntToStringDecoder extends MessageToMessageDecoder<Integer> {
    protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) {
        out.add("Number: " + msg);
    }
}

// Pipeline 配置
pipeline.addLast(new ByteToIntDecoder());
pipeline.addLast(new IntToStringDecoder());
pipeline.addLast(new MyBusinessHandler());  // 接收 String

场景2:消息过滤和转换

public class HttpToCustomDecoder extends MessageToMessageDecoder<HttpRequest> {
    @Override
    protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out) {
        // 将 HTTP 请求转换为自定义对象
        CustomRequest request = new CustomRequest();
        request.setMethod(msg.method().name());
        request.setUri(msg.uri());
        request.setHeaders(msg.headers());
        
        out.add(request);
    }
}

4.3 类型匹配机制详解

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
    private final TypeParameterMatcher matcher;
    
    protected MessageToMessageDecoder() {
        // 通过反射获取泛型类型
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }
    
    public boolean acceptInboundMessage(Object msg) {
        // 检查消息类型是否匹配
        return matcher.match(msg);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (acceptInboundMessage(msg)) {
            // 类型匹配,进行解码
            decode(ctx, (I) msg, out);
        } else {
            // 类型不匹配,直接传递给下一个Handler
            ctx.fireChannelRead(msg);
        }
    }
}

示例

public class IntegerDecoder extends MessageToMessageDecoder<Integer> {
    // 泛型 I = Integer
}

// 消息流
pipeline.fireChannelRead("Hello");   // 不匹配,直接传递
pipeline.fireChannelRead(123);       // 匹配,调用 decode()
pipeline.fireChannelRead(45.6);      // 不匹配,直接传递

五、解码器的内存管理

5.1 自动释放机制

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        if (acceptInboundMessage(msg)) {
            I cast = (I) msg;
            try {
                decode(ctx, cast, out);
            } finally {
                // 关键:自动释放消息
                ReferenceCountUtil.release(cast);
            }
        }
    } finally {
        // 传递解码后的消息
        for (Object decoded : out) {
            ctx.fireChannelRead(decoded);
        }
    }
}

5.2 何时需要 retain()?

public class MyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() >= 4) {
            // 场景1:直接读取数据(推荐)
            int value = in.readInt();
            out.add(value);  // ✅ 不需要 retain
            
            // 场景2:传递 ByteBuf 的切片
            ByteBuf slice = in.readSlice(4);
            out.add(slice);  // ❌ 错误!slice 会被自动释放
            
            // 正确做法
            ByteBuf slice = in.readSlice(4);
            out.add(slice.retain());  // ✅ 增加引用计数
        }
    }
}

六、实战:自定义协议解码器

6.1 协议定义

+--------+--------+------------+
| Length | Type   | Data       |
| 4 bytes| 1 byte | N bytes    |
+--------+--------+------------+

6.2 完整实现

public class CustomProtocolDecoder extends ByteToMessageDecoder {
    
    private static final int HEADER_SIZE = 5;  // 4 + 1
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 步骤1:检查头部是否完整
        if (in.readableBytes() < HEADER_SIZE) {
            return;
        }
        
        // 步骤2:标记读位置
        in.markReaderIndex();
        
        // 步骤3:读取长度和类型
        int length = in.readInt();
        byte type = in.readByte();
        
        // 步骤4:验证长度
        if (length < 0 || length > 1024 * 1024) {  // 最大1MB
            throw new DecoderException("Invalid length: " + length);
        }
        
        // 步骤5:检查数据是否完整
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        
        // 步骤6:读取数据
        byte[] data = new byte[length];
        in.readBytes(data);
        
        // 步骤7:创建消息对象
        CustomMessage message = new CustomMessage();
        message.setType(type);
        message.setData(data);
        
        // 步骤8:添加到输出列表
        out.add(message);
    }
}

6.3 使用状态机优化(ReplayingDecoder 的高级用法)

public class StatefulDecoder extends ReplayingDecoder<DecodeState> {
    
    private int length;
    private byte type;
    
    public StatefulDecoder() {
        super(DecodeState.READ_LENGTH);
    }
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        switch (state()) {
            case READ_LENGTH:
                length = in.readInt();
                checkpoint(DecodeState.READ_TYPE);
            case READ_TYPE:
                type = in.readByte();
                checkpoint(DecodeState.READ_DATA);
            case READ_DATA:
                byte[] data = new byte[length];
                in.readBytes(data);
                
                CustomMessage message = new CustomMessage();
                message.setType(type);
                message.setData(data);
                out.add(message);
                
                checkpoint(DecodeState.READ_LENGTH);
                break;
        }
    }
    
    enum DecodeState {
        READ_LENGTH,
        READ_TYPE,
        READ_DATA
    }
}

状态机的优势

  • 清晰的解码步骤
  • 每个状态只关注一件事
  • checkpoint() 保存进度,避免重复解码

八、总结

8.1 解码器选择指南

三种解码器的核心区别在于它们处理的数据类型不同:

  • ByteToMessageDecoder:字节流 → 消息对象,这是最基础的解码器,直接处理网络传输过来的字节数据
  • ReplayingDecoder:字节流 → 消息对象,和 ByteToMessageDecoder 作用一样,只是简化了编码方式,不需要手动检查数据是否足够
  • MessageToMessageDecoder:消息对象 → 消息对象,用于对已经解码的消息做二次转换,比如将 Integer 转成 String
场景 推荐解码器 原因
简单协议,性能要求高 ByteToMessageDecoder 最灵活,性能最好
复杂协议,代码简洁优先 ReplayingDecoder 代码简单,但性能稍差
消息类型转换 MessageToMessageDecoder 专门用于类型转换
已有成熟协议 Netty内置解码器 如 LineBasedFrameDecoder

8.2 核心要点

  1. 累积缓冲区是解码器的核心机制
  2. decode() 会被多次调用,不要保存状态
  3. 正确移动 readerIndex,避免无限循环
  4. 注意内存管理,必要时使用 retain()
  5. 使用状态机处理复杂协议

4.1 为什么需要编码器?

编码器和解码器是一对相反的操作:

解码器:字节流 → Java对象(入站)
编码器:Java对象 → 字节流(出站)

💡 核心作用:在网络通信中,所有数据最终都要以字节的形式传输,编码器就是负责这个转换工作的


4.2 MessageToByteEncoder 深度解析

📌 定义:MessageToByteEncoder 是将消息对象编码为字节的编码器
输入:自定义的消息对象
输出:ByteBuf

核心工作流程

当我们调用 ctx.write(msg) 发送消息时,编码器的工作流程是这样的:

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ByteBuf buf = null;
        try {
            // 1. 检查消息类型是否匹配
            if (acceptOutboundMessage(msg)) {
                I cast = (I) msg;
                
                // 2. 分配ByteBuf
                buf = allocateBuffer(ctx, cast, preferDirect);
                
                try {
                    // 3. 调用encode方法编码
                    encode(ctx, cast, buf);
                } finally {
                    // 4. 释放原始消息
                    ReferenceCountUtil.release(cast);
                }
                
                // 5. 传递编码后的ByteBuf
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 类型不匹配,直接传递
                ctx.write(msg, promise);
            }
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
    
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}

整个流程:

  1. ✅ 检查类型
  2. ✅ 分配缓冲区
  3. ✅ 编码
  4. ✅ 释放原消息
  5. ✅ 传递ByteBuf
ByteBuf 的分配

编码器会自动分配 ByteBuf,默认使用直接内存:

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) {
    return preferDirect ? 
           ctx.alloc().ioBuffer() :    // 直接内存
           ctx.alloc().heapBuffer();   // 堆内存
}

💡 为什么用直接内存?
直接内存适合网络传输,因为可以零拷贝直接发送到网络,而堆内存需要先拷贝到直接内存才能发送


4.3 MessageToMessageEncoder 深度解析

📌 定义:MessageToMessageEncoder 是将一种消息对象转换为另一种消息对象的编码器
输入:消息对象A
输出:消息对象B

与 MessageToByteEncoder 的区别
MessageToByteEncoder:     消息对象 → ByteBuf
MessageToMessageEncoder:  消息对象A → 消息对象B

💡 应用场景:通常用在第一层编码,对业务对象做预处理


4.4 编码器选择指南

场景 推荐编码器 原因
对象 → 字节 MessageToByteEncoder 直接编码为网络数据
对象 → 对象 MessageToMessageEncoder 协议转换、数据转换
字符串发送 StringEncoder Netty内置,开箱即用
自定义协议 MessageToByteEncoder 灵活控制字节格式
核心要点
  • 编码器处理出站数据,继承 ChannelOutboundHandlerAdapter
  • 自动释放原始消息,避免内存泄漏
  • preferDirect 控制内存类型,默认使用直接内存
  • MessageToMessageEncoder 可以一对多,一个输入产生多个输出
  • 注意 Pipeline 顺序,出站Handler从后往前执行

4.5 编解码器对比

特性 解码器 编码器
数据方向 入站(Inbound) 出站(Outbound)
基类 ChannelInboundHandlerAdapter ChannelOutboundHandlerAdapter
核心方法 channelRead() write()
累积机制 有(cumulation)
典型用途 字节 → 对象 对象 → 字节

🎯 核心理解:编码器将 Java 对象转换成字节流,和解码器正好相反。理解了解码器,编码器就很容易理解了。


📚 总结

本文详细讲解了 Netty 的事件传播机制和编解码器原理:

入站事件

  • ✅ 从 head 到 tail 传播
  • ✅ 使用 executionMask 位掩码匹配方法
  • ✅ tail 负责释放未处理的资源

出站事件

  • ✅ 从当前位置往 head 传播
  • ✅ head 调用 unsafe 写入缓冲区
  • ✅ writeAndFlush 触发真正的网络发送

解码器

  • ✅ 累积缓冲区是核心机制
  • ✅ decode() 会被多次调用
  • ✅ 正确移动 readerIndex

编码器

  • ✅ 自动释放原始消息
  • ✅ 默认使用直接内存
  • ✅ 支持一对多编码

📖 相关文档

编码器通过泛型来指定它能处理的消息类型:

public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
        out.writeInt(msg);
    }
}

// 使用时
ctx.write(123);        // ✓ 匹配,会调用encode()
ctx.write("Hello");    // ✗ 不匹配,直接传递给下一个Handler

这个机制和解码器的类型匹配是一样的,都是通过泛型来判断。

2.3 ByteBuf 的分配

编码器会自动分配 ByteBuf,默认使用直接内存:

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) {
    return preferDirect ? 
           ctx.alloc().ioBuffer() :    // 直接内存
           ctx.alloc().heapBuffer();   // 堆内存
}

直接内存适合网络传输,因为可以零拷贝直接发送到网络,而堆内存需要先拷贝到直接内存才能发送。所以默认使用直接内存是合理的。

2.4 简单示例

我们来看一个简单的例子,将 Integer 编码成 4 个字节:

public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
        out.writeInt(msg);
    }
}

数据流演示:

输入: Integer 值 = 258 (0x00000102)

encode() 执行:
  out.writeInt(258)
  
ByteBuf 状态:
  索引:     0     1     2     3
         ┌─────┬─────┬─────┬─────┐
  数据:  │ 00  │ 00  │ 01  │ 02  │
         └─────┴─────┴─────┴─────┘
         
输出: ByteBuf [0x00, 0x00, 0x01, 0x02]

2.5 自定义协议编码器

假设我们要实现这个协议:[长度(4字节)][类型(1字节)][数据(N字节)]

public class CustomProtocolEncoder extends MessageToByteEncoder<CustomMessage> {
    
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
        byte[] data = msg.getData();
        byte type = msg.getType();
        
        // 写入长度
        out.writeInt(data.length);
        
        // 写入类型
        out.writeByte(type);
        
        // 写入数据
        out.writeBytes(data);
    }
}

数据流演示:

输入: CustomMessage{type=1, data="Hello"}

encode() 执行过程:
  1. data.length = 5
  2. type = 0x01
  3. data = [0x48, 0x65, 0x6C, 0x6C, 0x6F]

编码后的 ByteBuf:
索引:     0     1     2     3     4     5     6     7     8     9
       ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
数据:  │ 00  │ 00  │ 00  │ 05  │ 01  │ 48  │ 65  │ 6C  │ 6C  │ 6F  │
       └─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘
        └────────长度=5────────┘ └类型┘ └──────"Hello"──────┘

三、MessageToMessageEncoder 深度解析

从名字可以看出,MessageToMessageEncoder 是将一种消息对象(Message)转换为另一种消息对象(Message)的编码器。它的输入和输出都是消息对象,而不是字节流。这个编码器通常用在第一层编码,也就是在 MessageToByteEncoder 之前,对业务对象做预处理。

3.1 与 MessageToByteEncoder 的区别

MessageToByteEncoder:  消息对象 → ByteBuf
MessageToMessageEncoder: 消息对象A → 消息对象B

可以看到,MessageToByteEncoder 的输出是字节流,而 MessageToMessageEncoder 的输出还是消息对象,需要后续的编码器继续处理。

3.2 典型应用场景

最常见的场景是多层编码,比如先把业务对象转成协议对象,再把协议对象转成字节流:

// 第一层:业务对象 → 协议对象
public class UserToProtocolEncoder extends MessageToMessageEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User user, List<Object> out) {
        ProtocolMessage msg = new ProtocolMessage();
        msg.setType(MessageType.USER_INFO);
        msg.setData(user.toBytes());
        out.add(msg);
    }
}

// 第二层:协议对象 → ByteBuf
public class ProtocolToByteEncoder extends MessageToByteEncoder<ProtocolMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) {
        out.writeInt(msg.getType());
        out.writeBytes(msg.getData());
    }
}

// Pipeline 配置
pipeline.addLast(new UserToProtocolEncoder());
pipeline.addLast(new ProtocolToByteEncoder());

数据流:

User对象
  ↓ UserToProtocolEncoder
ProtocolMessage对象
  ↓ ProtocolToByteEncoder
ByteBuf
  ↓
网络传输

3.3 一对多编码

MessageToMessageEncoder 还有一个特点,就是可以一对多编码,一个输入可以产生多个输出:

public class MessageSplitEncoder extends MessageToMessageEncoder<LargeMessage> {
    
    private static final int MAX_SIZE = 1024;
    
    @Override
    protected void encode(ChannelHandlerContext ctx, LargeMessage msg, List<Object> out) {
        byte[] data = msg.getData();
        
        // 将大消息拆分成多个小消息
        for (int i = 0; i < data.length; i += MAX_SIZE) {
            int length = Math.min(MAX_SIZE, data.length - i);
            byte[] chunk = new byte[length];
            System.arraycopy(data, i, chunk, 0, length);
            
            SmallMessage small = new SmallMessage();
            small.setData(chunk);
            small.setSequence(i / MAX_SIZE);
            
            out.add(small);  // 添加多个输出
        }
    }
}

数据流演示:

输入: LargeMessage (3000字节)

encode() 执行:
  拆分成3个 SmallMessage
  - SmallMessage[0]: 1024字节
  - SmallMessage[1]: 1024字节
  - SmallMessage[2]: 952字节

输出: List<SmallMessage> (3个对象)

Pipeline 后续处理:
  每个 SmallMessage 会被单独传递给下一个Handler

四、编码器的内存管理

4.1 自动释放机制

编码器会自动释放原始消息,避免内存泄漏:

try {
    encode(ctx, cast, buf);
} finally {
    // 自动释放原始消息
    ReferenceCountUtil.release(cast);
}

这个机制和解码器是一样的,编码完成后,原始消息就没用了,需要释放掉。

4.2 何时需要 retain()?

如果你在 encode 方法中需要异步处理消息,就需要手动 retain:

// ❌ 错误:异步处理会导致问题
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
    executor.submit(() -> {
        // 此时 msg 可能已经被释放!
        byte[] data = msg.getData();  // 可能抛出异常
        out.writeBytes(data);
    });
}

// ✅ 正确:同步处理,不需要 retain
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
    byte[] data = msg.getData();
    out.writeBytes(data);
}

一般情况下,encode 方法都是同步执行的,不需要考虑 retain 的问题。


五、编码器与解码器的配合使用

5.1 完整的编解码链

编码器和解码器通常成对出现,一个负责编码,一个负责解码:

// 服务端 Pipeline
pipeline.addLast("decoder", new CustomProtocolDecoder());  // 入站
pipeline.addLast("encoder", new CustomProtocolEncoder());  // 出站
pipeline.addLast("handler", new BusinessHandler());

// 客户端 Pipeline
pipeline.addLast("encoder", new CustomProtocolEncoder());  // 出站
pipeline.addLast("decoder", new CustomProtocolDecoder());  // 入站
pipeline.addLast("handler", new ClientHandler());

数据流向:

客户端发送:
  User对象
    ↓ CustomProtocolEncoder (出站)
  ByteBuf
    ↓ 网络传输
  ByteBuf
    ↓ CustomProtocolDecoder (入站)
  User对象
    ↓ BusinessHandler

服务端响应:
  Response对象
    ↓ CustomProtocolEncoder (出站)
  ByteBuf
    ↓ 网络传输
  ByteBuf
    ↓ CustomProtocolDecoder (入站)
  Response对象
    ↓ ClientHandler

5.2 对称的编解码器实现

编码器和解码器的实现应该是对称的,编码器怎么写,解码器就怎么读:

// 编码器
public class MessageEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) {
        byte[] data = msg.getData();
        out.writeInt(data.length);  // 写入长度
        out.writeByte(msg.getType());  // 写入类型
        out.writeBytes(data);  // 写入数据
    }
}

// 解码器(对称实现)
public class MessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 5) return;
        
        in.markReaderIndex();
        int length = in.readInt();  // 读取长度
        byte type = in.readByte();  // 读取类型
        
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        
        byte[] data = new byte[length];
        in.readBytes(data);  // 读取数据
        
        Message msg = new Message();
        msg.setType(type);
        msg.setData(data);
        out.add(msg);
    }
}

可以看到,编码器写什么,解码器就读什么,顺序完全一致。

七、Netty 内置编码器

Netty 提供了一些常用的编码器,可以直接使用:

// 1. 字符串编码器
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));

// 2. Base64 编码器
pipeline.addLast(new Base64Encoder());

// 3. 长度字段编码器(自动添加长度前缀)
pipeline.addLast(new LengthFieldPrepender(4));

// 4. 对象序列化编码器
pipeline.addLast(new ObjectEncoder());

LengthFieldPrepender 特别有用,它会自动在数据前面添加长度字段:

// 配置
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringEncoder());

// 发送
ctx.write("Hello");

// 编码过程:
// 1. StringEncoder: "Hello" → [0x48, 0x65, 0x6C, 0x6C, 0x6F]
// 2. LengthFieldPrepender: 添加长度 → [0x00, 0x00, 0x00, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]

八、总结

8.1 编码器选择指南

三种编码器的核心区别在于它们处理的数据类型不同:

  • MessageToByteEncoder:消息对象 → 字节流,这是最基础的编码器,直接将对象编码成网络可传输的字节数据
  • MessageToMessageEncoder:消息对象 → 消息对象,用于对业务对象做预处理,比如将 User 对象转成 ProtocolMessage 对象
场景 推荐编码器 原因
对象 → 字节 MessageToByteEncoder 直接编码为网络数据
对象 → 对象 MessageToMessageEncoder 协议转换、数据转换
字符串发送 StringEncoder Netty内置,开箱即用
自定义协议 MessageToByteEncoder 灵活控制字节格式

8.2 核心要点

  1. 编码器处理出站数据,继承 ChannelOutboundHandlerAdapter
  2. 自动释放原始消息,避免内存泄漏
  3. preferDirect 控制内存类型,默认使用直接内存
  4. MessageToMessageEncoder 可以一对多,一个输入产生多个输出
  5. 注意 Pipeline 顺序,出站Handler从后往前执行,但类型匹配机制会确保正确的处理顺序

8.3 与解码器的对比

特性 解码器 编码器
数据方向 入站(Inbound) 出站(Outbound)
基类 ChannelInboundHandlerAdapter ChannelOutboundHandlerAdapter
核心方法 channelRead() write()
累积机制 有(cumulation)
典型用途 字节 → 对象 对象 → 字节

整个编码器的核心就是将 Java 对象转换成字节流,和解码器正好相反。理解了解码器,编码器就很容易理解了。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐