Netty源码分析--认真系列(二)
本文深入分析Netty的事件处理机制,重点讲解入站事件和出站事件的传播流程。主要内容包括:1) 入站事件处理流程,从NioEventLoop检测事件开始,通过Unsafe对象触发channelRead事件,最终在Pipeline中从head向tail传播;2) 详细解析executionMask位掩码机制,该机制高效记录Handler实现的方法;3) 介绍常见入站事件类型及其触发时机,如chann
相关文章链接
processSelectedKeys() vs runAllTasks()
NioServerSocketChannel-Unsafe初始化详解
Netty源码分析–认真系列(二)
📚 本文详细讲解 Netty 中入站事件和出站事件的传播机制,以及编解码器的工作原理
📖 目录
一、入站事件处理
在上面我们说完了 Channel、ChannelPipeline、也介绍了一下 ChannelHandler 是什么,我们接下来说说接收到一条消息会发生什么,也就是入站消息处理。
1.1 什么是入站事件?
入站事件(Inbound Event) 是指从网络底层向应用层传播的事件,代表数据从外部进入到应用程序的过程。
📋 常见的入站事件
| 事件方法 | 触发时机 | 说明 |
|---|---|---|
channelRegistered() |
Channel 注册到 EventLoop | 连接初始化阶段 |
channelActive() |
Channel 激活 | 连接建立成功 |
channelRead() |
读取到数据 | 接收到网络数据 |
channelReadComplete() |
数据读取完成 | 一次读取操作结束 |
channelInactive() |
Channel 失活 | 连接断开 |
1.2 入站事件的传播方向
💡 核心规则:入站事件从 head 节点开始,向 tail 节点传播
head → Handler1(Inbound) → Handler2(Inbound) → Handler3(Inbound) → tail
⚠️ 注意:这与出站事件相反(出站事件从 tail 到 head)
1.3 入站事件触发的完整流程
我们以 OP_ACCEPT 事件(接收新客户端连接)为例,看看接收消息之后会发生什么。
步骤 1️⃣:NioEventLoop 检测事件
服务端启动后,ServerSocketChannel 会注册到 bossGroup 的 EventLoop,EventLoop 的 run() 方法会不断循环,检测网络事件。
// NioEventLoop.java
protected void run() {
for (;;) { // 无限循环
try {
// 阻塞等待事件(或定时等待)
select(wakenUp.getAndSet(false));
if (ioRatio == 100) {
try {
// 处理所有就绪的 I/O 事件
processSelectedKeys();
} finally {
// 执行任务队列中的任务
runAllTasks();
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
📌 关键点:
select()会阻塞等待,直到有事件就绪或超时。当客户端发起连接时,Selector会检测到OP_ACCEPT事件。
步骤 2️⃣:处理就绪的事件
// NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取 Channel 的 Unsafe 对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
// 🔥 这是入站事件的关键触发点!
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用 Unsafe 的 read 方法
// 对于 ServerSocketChannel,这会接受新连接
// 对于 SocketChannel,这会读取数据
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
📖 扩展阅读:关于
unsafe实例的详细初始化过程,请参考 NioServerSocketChannel-Unsafe初始化详解.md 文档
步骤 3️⃣:触发 channelRead 事件
// NioMessageUnsafe
public void read() {
// 接受新连接,返回接受的连接数
// doReadMessages 会调用 ServerSocketChannel.accept()
// 并将新的 SocketChannel 添加到 readBuf 中
int localRead = doReadMessages(readBuf);
int size = readBuf.size();
for (int i = 0; i < size; i++) {
readPending = false;
// 🔥 关键:触发 channelRead 事件!
// readBuf.get(i) 是新接受的 NioSocketChannel
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
步骤 4️⃣:从 head 开始传播事件
// DefaultChannelPipeline.java
public final ChannelPipeline fireChannelRead(Object msg) {
//注意看这里将head传入
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// AbstractChannelHandlerContext.java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 判断是否在 EventLoop 线程中
if (executor.inEventLoop()) {
// 在 EventLoop 线程中,直接调用
next.invokeChannelRead(m);
} else {
// 不在 EventLoop 线程中,提交任务到 EventLoop
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// AbstractChannelHandlerContext.java
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 调用当前 Handler 的 channelRead 方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// Handler 还未添加完成,继续传播
fireChannelRead(msg);
}
}
步骤 5️⃣:查找下一个入站 Handler
findContextInbound() 方法负责查找下一个入站 Handler:
// AbstractChannelHandlerContext.java
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
// 向后遍历链表
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
🔍 executionMask 机制详解
executionMask 是 ChannelHandlerContext 的一个整型字段,用位掩码的方式记录当前 Handler 实现了哪些方法。
位掩码定义:
static final int MASK_EXCEPTION_CAUGHT = 1; // 0000 0001
static final int MASK_CHANNEL_REGISTERED = 1 << 1; // 0000 0010
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; // 0000 0100
static final int MASK_CHANNEL_ACTIVE = 1 << 3; // 0000 1000
static final int MASK_CHANNEL_INACTIVE = 1 << 4; // 0001 0000
static final int MASK_CHANNEL_READ = 1 << 5; // 0010 0000
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; // 0100 0000
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; // 1000 0000
工作原理:
- 例如 headHandler 实现
channelRead,那executionMask = 0010 0000 - 若还实现了
channelRegistered方法,executionMask = 0010 0010 - 使用位与运算
&判断:两个都是 1,结果才是 1 - 传进来的
mask = 1 << 5 = 0010 0000,找到下一个实现了channelRead方法的 handler
📖 扩展阅读:关于 executionMask 的详细解释请参考 executionMask详解.md 文档
📖 扩展阅读:关于位运算相关知识请参考 位运算详解.md 文档
1.4 Pipeline 配置示例
在讲后续流程之前,我们先看一个简单的 pipeline 配置:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
// 在pipeline中添加自定义的业务handler
channel.pipeline().addLast(new MyServerHandler());
}
}
// 自定义的业务handler
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 处理接收到的数据
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
System.out.println("收到消息:" + new String(data, "UTF-8"));
// 继续向后传播(如果还有其他handler的话)
ctx.fireChannelRead(msg);
}
}
Pipeline 结构:
head → MyServerHandler → tail
1.5 从 head 到业务 handler 的传播
回到主线,我们刚才说到 head 处理完成后会调用 ctx.fireChannelRead(msg) 继续传播事件,这个方法会调用 findContextInbound(MASK_CHANNEL_READ) 从 head 开始向后遍历,找到第一个实现了 channelRead 方法的 handler,也就是我们的 MyServerHandler。找到之后,又会调用 invokeChannelRead():
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 在EventLoop线程中,直接调用
next.invokeChannelRead(m);
} else {
// 不在EventLoop线程中,提交任务
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 调用MyServerHandler的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
这里会调用到我们自定义的 MyServerHandler 的 channelRead 方法,我们在这个方法里处理业务逻辑,比如解析数据、处理请求等。在 MyServerHandler 中,我们可以处理接收到的数据,如果需要继续传播给后面的 handler,就调用 ctx.fireChannelRead(msg),事件会继续向后传播,重复前面的流程:findContextInbound() -> invokeChannelRead() -> 下一个 handler 的 channelRead()。如果我们没有调用 ctx.fireChannelRead(msg),事件传播就在这里终止了。
1.6 最终到达 tail
如果事件一直传播,最终会到达 tail 节点:
// TailContext.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// tail会释放msg(如果是ByteBuf的话)
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
// 释放资源
ReferenceCountUtil.release(msg);
}
}
⚠️ 重要:tail 会检查 msg 是否被处理过,如果没有被处理(比如 ByteBuf 没有被读取),会打印警告日志,然后释放资源。这就是为什么我们在 handler 中要么消费掉 msg,要么继续传播给下一个 handler。
1.7 流程总结
完整调用链
NioEventLoop.run()
↓
processSelectedKey()
↓
unsafe.read()
↓
pipeline.fireChannelRead()
↓
head.channelRead()
↓
findContextInbound()
↓
业务Handler.channelRead()
↓
tail.channelRead()
核心要点
- ✅ 入站事件从 head 到 tail 传播
- ✅ 通过
findContextInbound()查找下一个处理器 - ✅ 使用
executionMask位掩码匹配方法 - ✅ tail 负责释放未处理的资源
- ✅ 每个 handler 都有机会处理事件
二、出站事件处理
上面我们说了入站事件,现在来说说出站事件。
💡 核心概念:
- 入站事件:数据从外部进入到应用程序的过程
- 出站事件:数据从应用程序发送到外部的过程
下面我们以发送消息为案例聊聊出站事件。
2.1 Pipeline 配置示例
2.2 出站事件的传播方向
💡 核心规则:出站事件从当前位置向 head 节点传播(往前传播)
当前位置示意:
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
↑
当前位置
2.3 出站事件传播流程
步骤 1️⃣:调用 write 方法
pipeline.addLast("decoder", new StringDecoder()); // InboundHandler
pipeline.addLast("encoder", new StringEncoder()); // OutboundHandler
pipeline.addLast("logger", new LoggingHandler()); // InboundHandler
pipeline.addLast("auth", new AuthHandler()); // OutboundHandler
pipeline.addLast("business", new BusinessHandler()); // InboundHandler
这里的 AuthHandler 是一个自定义的出站 handler,用来给消息添加认证信息:
public class AuthHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 给消息添加认证头
String authMsg = "[AUTH]" + msg;
// 继续向前传播
ctx.write(authMsg, promise);
}
}
结构图:
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
(Out) (In) (Out) (In) (Out) (In)
↑ ↑
头部 尾部
出站事件的开始就是往外发送消息,也就是在 handler 中调用 write 方法发送消息,例如在 BusinessHandler 中调用 write 方法:
public class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write("Hello"); // 从这里开始
}
}
当前位置
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
↑
当前位置
从 write 方法开始,我们来看看 write 方法:
ctx.write(str);
//AbstractChannelHandlerContext
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//检查 msg 不能为 null
ObjectUtil.checkNotNull(msg, "msg");
try {
// 2. 检查 promise 是否有效
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg); // 释放消息
return; // 直接返回,不继续处理
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg); // 出错也要释放消息
throw e;
}
// 3. 查找下一个出站处理器
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// 6. 判断当前线程是否是 EventLoop 线程
if (executor.inEventLoop()) {
// 情况1:在 EventLoop 线程中,直接调用
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 情况2:不在 EventLoop 线程中,提交任务
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); static final int MASK_BIND = 1 << 9; // 0010 0000 0000 static final int MASK_CONNECT = 1 << 10; // 0100 0000 0000 static final int MASK_DISCONNECT = 1 << 11; // 1000 0000 0000 static final int MASK_CLOSE = 1 << 12; static final int MASK_DEREGISTER = 1 << 13; static final int MASK_READ = 1 << 14; static final int MASK_WRITE = 1 << 15; // 1000 0000 0000 0000 static final int MASK_FLUSH = 1 << 16;
我们来看到这行代码,这行代码的作用是查找下一个出站处理器,准确点来说是查找下一个实现 write() 方法的 handler:
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev; // 往前移动
} while ((ctx.executionMask & mask) == 0); // 判断是否实现了指定方法
return ctx;
}
我们这里点进去看 findContextOutbound 方法的源码,发现和上面说的 findContextInbound 方法是一样的,就不过多赘述了。
接下来看 invokeWrite 方法:
next.invokeWrite(m, promise);
void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 调用 Handler 的 write 方法
invokeWrite0(msg, promise);
} else {
// Handler 还没准备好,继续传递
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// 调用 handler 的 write 方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
在 invokeWrite0 方法内部可以看见,会调用 handler 的 write 方法。根据我们的 pipeline 配置,从 business 位置开始,通过 findContextOutbound() 往前查找,会找到 auth 这个 OutboundHandler。
2.1 AuthHandler 处理
// AuthHandler.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 给消息添加认证头
String authMsg = "[AUTH]" + msg;
// 继续向前传播
ctx.write(authMsg, promise);
}
当前位置
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
↑
当前位置
AuthHandler 处理完消息后,调用 ctx.write(authMsg, promise) 继续向前传播,又会重复前面的流程:findContextOutbound() -> invokeWrite() -> 下一个 handler 的 write()。这次会找到 encoder。
2.2 StringEncoder 处理
// StringEncoder.java (Netty内置)
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 将String编码为ByteBuf
ByteBuf buf = ...; // 编码逻辑
ctx.write(buf, promise); // 继续传播
}
当前位置
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
↑
当前位置
encoder 将 String 编码成 ByteBuf 后,继续调用 ctx.write() 向前传播。这时候再往前找,decoder 是 InboundHandler,不会被找到,会直接跳过,最终找到 head。
2.4 最终到达 head
// HeadContext.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
当前位置:
head ←→ decoder ←→ encoder ←→ logger ←→ auth ←→ business ←→ tail
↑
当前位置
📌 关键点:head 是出站事件的终点,它会调用
unsafe.write()将数据写入到底层的 SocketChannel。但是注意,这里只是把数据写到了 Netty 的缓冲区,还没有真正发送到网络。
// AbstractUnsafe.java
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 将消息添加到出站缓冲区
outboundBuffer.addMessage(msg, promise);
}
🔥 writeAndFlush 的完整流程
如果我们调用的是 ctx.writeAndFlush(),那么在 write 完成后,还会触发 flush 事件:
// AbstractUnsafe.java
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
outboundBuffer.addFlush();
flush0(); // 真正的发送逻辑
}
protected void flush0() {
doWrite(outboundBuffer); // 调用JDK的SocketChannel.write()
}
2.5 流程总结
完整调用链
业务Handler.channelRead()
↓
ctx.write()
↓
findContextOutbound()
↓
auth.write() (认证处理)
↓
encoder.write() (编码)
↓
head.write()
↓
unsafe.write() (写入缓冲区)
↓
[如果是writeAndFlush]
↓
unsafe.flush() (真正发送)
核心要点
- ✅ 出站事件从当前位置往 head 传播
- ✅ 通过
findContextOutbound()查找下一个出站处理器 - ✅ head 调用
unsafe.write()写入缓冲区 - ✅
writeAndFlush()会触发 flush 事件真正发送数据 - ✅ 每个 OutboundHandler 都有机会处理事件
三、解码器详解
3.1 为什么需要解码器?
网络传输的本质
在网络通信中,所有数据都必须以字节流的形式传输:
应用层数据(Java对象、字符串等)
↓ 编码
字节流(网络传输)
↓ 解码
应用层数据(Java对象、字符串等)
TCP 粘包/拆包问题
TCP 是面向流的协议,数据像水流一样连续传输,没有明确的消息边界:
发送端发送:
消息1: [Hello]
消息2: [World]
接收端可能收到:
情况1(粘包): [HelloWorld]
情况2(拆包): [Hel] [loWorld]
情况3(混合): [HelloWor] [ld]
🎯 解码器的核心职责:从连续的字节流中正确地识别和提取完整的消息
3.2 ByteToMessageDecoder 深度解析
📌 定义:ByteToMessageDecoder 是将字节(Byte)解码为消息对象(Message)的解码器
输入:ByteBuf
输出:自定义的消息对象
核心机制
ByteToMessageDecoder 是 Netty 中最基础的解码器,它的工作原理:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 累积缓冲区 - 这是关键!
ByteBuf cumulation;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf data = (ByteBuf) msg;
// 1. 将新数据追加到累积缓冲区
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
// 2. 尝试解码
callDecode(ctx, cumulation, out);
// 3. 如果解码成功,将结果传递给下一个Handler
fireChannelRead(ctx, out);
}
}
protected abstract void decode(ChannelHandlerContext ctx,
ByteBuf in,
List<Object> out) throws Exception;
}
累积缓冲区(Cumulation)机制
🔑 这是理解解码器的关键!
第一次接收: [0x01, 0x02]
cumulation: [0x01, 0x02] → decode() 被调用,数据不够,不解码
第二次接收: [0x03, 0x04]
cumulation: [0x01, 0x02, 0x03, 0x04] → decode() 被调用,数据够了,解码成功
解码后:
cumulation: [] → 清空或保留剩余数据
out: [解码后的对象] → 传递给下一个Handler
关键点:
- ✅
cumulation会一直保存未解码的数据 - ✅ 每次新数据到达,都会追加到
cumulation - ✅
decode()方法会被反复调用,直到无法解码更多数据
decode() 方法的正确实现
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 关键:检查可读字节数
if (in.readableBytes() >= 4) {
// 读取4个字节并转换为int
out.add(in.readInt());
}
// 如果数据不够,什么都不做,等待下次调用
}
}
重要理解:
- ✅
in参数就是cumulation,包含了所有累积的数据 - ✅ 如果数据不够,直接返回,不要抛异常
- ✅ 读取数据后,
in的readerIndex会自动前移 - ✅ 添加到
out的对象会自动传递给下一个Handler
完整的数据流示例
假设我们要解码一个协议:[长度(4字节)][数据]
public class LengthFieldDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 第一步:检查是否有足够的字节读取长度字段
if (in.readableBytes() < 4) {
return; // 数据不够,等待更多数据
}
// 第二步:标记当前读位置(重要!)
in.markReaderIndex();
// 第三步:读取长度
int length = in.readInt();
// 第四步:检查是否有足够的数据
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 重置读位置
return; // 数据不够,等待更多数据
}
// 第五步:读取实际数据
byte[] data = new byte[length];
in.readBytes(data);
// 第六步:添加到输出列表
out.add(new String(data));
}
}
数据流演示:
接收数据1: [0x00, 0x00, 0x00, 0x05, 0x48]
↓
cumulation: [0x00, 0x00, 0x00, 0x05, 0x48]
└─────长度字段─────┘ └─数据─┘
(值=5) (1字节)
↓
decode() 执行过程:
1. in.readableBytes() = 5 (总共5字节)
2. 检查:5 >= 4 ✓ (可以读取长度字段)
3. in.markReaderIndex() (标记位置0)
4. length = in.readInt() = 5 (读取4字节,readerIndex移到4)
5. 此时 in.readableBytes() = 1 (剩余1字节)
6. 检查:1 < 5 ✗ (数据不够!需要5字节,只有1字节)
7. in.resetReaderIndex() (重置到位置0)
8. return (等待更多数据)
接收数据2: [0x65, 0x6C, 0x6C, 0x6F]
↓
cumulation: [0x00, 0x00, 0x00, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]
└─────长度字段─────┘ └──────5字节数据──────┘
(值=5) "Hello"
↓
decode() 执行过程:
1. in.readableBytes() = 9 (总共9字节)
2. 检查:9 >= 4 ✓
3. in.markReaderIndex()
4. length = in.readInt() = 5 (readerIndex移到4)
5. 此时 in.readableBytes() = 5 (剩余5字节)
6. 检查:5 >= 5 ✓ (数据够了!)
7. 读取5字节:[0x48, 0x65, 0x6C, 0x6C, 0x6F]
8. 转换为字符串:"Hello"
9. out.add("Hello")
↓
cumulation: [] (数据已全部读取,清空)
out: ["Hello"] (传递给下一个Handler)
关键理解:
- ❌ 第一次接收了5个字节:
[长度字段4字节] + [数据1字节] - 📍 读取长度字段后,
readerIndex从 0 移动到 4 - 📊 此时
readableBytes()= 总字节数(5) - readerIndex(4) = 1字节 - ⚠️ 但长度字段的值是5,表示需要5字节的数据
- ❌ 所以只有1字节数据,不够!
3.3 ReplayingDecoder 深度解析
📌 定义:ReplayingDecoder 简化了 ByteToMessageDecoder 的编码方式,自动处理数据不足的情况
为什么需要 ReplayingDecoder?
使用 ByteToMessageDecoder 时,我们需要不断检查数据是否足够:
// 繁琐的检查
if (in.readableBytes() < 4) return;
int length = in.readInt();
if (in.readableBytes() < length) return;
ReplayingDecoder 简化了这个过程:
public class SimpleDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 不需要检查!直接读取
int length = in.readInt();
byte[] data = new byte[length];
in.readBytes(data);
out.add(new String(data));
}
}
ReplayingDecoder 的魔法原理
ReplayingDecoder 使用了一个特殊的 ByteBuf 包装器:
class ReplayingDecoderByteBuf extends ByteBuf {
@Override
public int readInt() {
// 检查是否有足够的字节
if (readerIndex + 4 > writerIndex) {
// 抛出特殊的信号异常
throw REPLAY;
}
return super.readInt();
}
}
工作流程:
1. decode() 开始执行
2. 调用 in.readInt()
3. ReplayingDecoderByteBuf 检查:只有2字节,不够!
4. 抛出 REPLAY 异常
5. ReplayingDecoder 捕获异常
6. 重置 readerIndex,等待更多数据
7. 新数据到达,重新执行 decode()
3.4 MessageToMessageDecoder 深度解析
📌 定义:MessageToMessageDecoder 是将一种消息对象转换为另一种消息对象的解码器
输入:消息对象A
输出:消息对象B
与 ByteToMessageDecoder 的区别
ByteToMessageDecoder: ByteBuf → 消息对象
MessageToMessageDecoder: 消息对象A → 消息对象B
💡 应用场景:通常用在第二层解码,对已经解码出来的消息对象做进一步的转换
3.5 解码器选择指南
| 场景 | 推荐解码器 | 原因 |
|---|---|---|
| 简单协议,性能要求高 | ByteToMessageDecoder | 最灵活,性能最好 |
| 复杂协议,代码简洁优先 | ReplayingDecoder | 代码简单,但性能稍差 |
| 消息类型转换 | MessageToMessageDecoder | 专门用于类型转换 |
| 已有成熟协议 | Netty内置解码器 | 如 LineBasedFrameDecoder |
核心要点
- ✅ 累积缓冲区是解码器的核心机制
- ✅ decode() 会被多次调用,不要保存状态
- ✅ 正确移动 readerIndex,避免无限循环
- ✅ 注意内存管理,必要时使用 retain()
- ✅ 使用状态机处理复杂协议
四、编码器详解
场景1:协议转换
// 第一层:字节 → Integer
public class ByteToIntDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
// 第二层:Integer → String
public class IntToStringDecoder extends MessageToMessageDecoder<Integer> {
protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) {
out.add("Number: " + msg);
}
}
// Pipeline 配置
pipeline.addLast(new ByteToIntDecoder());
pipeline.addLast(new IntToStringDecoder());
pipeline.addLast(new MyBusinessHandler()); // 接收 String
场景2:消息过滤和转换
public class HttpToCustomDecoder extends MessageToMessageDecoder<HttpRequest> {
@Override
protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out) {
// 将 HTTP 请求转换为自定义对象
CustomRequest request = new CustomRequest();
request.setMethod(msg.method().name());
request.setUri(msg.uri());
request.setHeaders(msg.headers());
out.add(request);
}
}
4.3 类型匹配机制详解
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
protected MessageToMessageDecoder() {
// 通过反射获取泛型类型
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
public boolean acceptInboundMessage(Object msg) {
// 检查消息类型是否匹配
return matcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (acceptInboundMessage(msg)) {
// 类型匹配,进行解码
decode(ctx, (I) msg, out);
} else {
// 类型不匹配,直接传递给下一个Handler
ctx.fireChannelRead(msg);
}
}
}
示例:
public class IntegerDecoder extends MessageToMessageDecoder<Integer> {
// 泛型 I = Integer
}
// 消息流
pipeline.fireChannelRead("Hello"); // 不匹配,直接传递
pipeline.fireChannelRead(123); // 匹配,调用 decode()
pipeline.fireChannelRead(45.6); // 不匹配,直接传递
五、解码器的内存管理
5.1 自动释放机制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (acceptInboundMessage(msg)) {
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
// 关键:自动释放消息
ReferenceCountUtil.release(cast);
}
}
} finally {
// 传递解码后的消息
for (Object decoded : out) {
ctx.fireChannelRead(decoded);
}
}
}
5.2 何时需要 retain()?
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() >= 4) {
// 场景1:直接读取数据(推荐)
int value = in.readInt();
out.add(value); // ✅ 不需要 retain
// 场景2:传递 ByteBuf 的切片
ByteBuf slice = in.readSlice(4);
out.add(slice); // ❌ 错误!slice 会被自动释放
// 正确做法
ByteBuf slice = in.readSlice(4);
out.add(slice.retain()); // ✅ 增加引用计数
}
}
}
六、实战:自定义协议解码器
6.1 协议定义
+--------+--------+------------+
| Length | Type | Data |
| 4 bytes| 1 byte | N bytes |
+--------+--------+------------+
6.2 完整实现
public class CustomProtocolDecoder extends ByteToMessageDecoder {
private static final int HEADER_SIZE = 5; // 4 + 1
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 步骤1:检查头部是否完整
if (in.readableBytes() < HEADER_SIZE) {
return;
}
// 步骤2:标记读位置
in.markReaderIndex();
// 步骤3:读取长度和类型
int length = in.readInt();
byte type = in.readByte();
// 步骤4:验证长度
if (length < 0 || length > 1024 * 1024) { // 最大1MB
throw new DecoderException("Invalid length: " + length);
}
// 步骤5:检查数据是否完整
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// 步骤6:读取数据
byte[] data = new byte[length];
in.readBytes(data);
// 步骤7:创建消息对象
CustomMessage message = new CustomMessage();
message.setType(type);
message.setData(data);
// 步骤8:添加到输出列表
out.add(message);
}
}
6.3 使用状态机优化(ReplayingDecoder 的高级用法)
public class StatefulDecoder extends ReplayingDecoder<DecodeState> {
private int length;
private byte type;
public StatefulDecoder() {
super(DecodeState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
switch (state()) {
case READ_LENGTH:
length = in.readInt();
checkpoint(DecodeState.READ_TYPE);
case READ_TYPE:
type = in.readByte();
checkpoint(DecodeState.READ_DATA);
case READ_DATA:
byte[] data = new byte[length];
in.readBytes(data);
CustomMessage message = new CustomMessage();
message.setType(type);
message.setData(data);
out.add(message);
checkpoint(DecodeState.READ_LENGTH);
break;
}
}
enum DecodeState {
READ_LENGTH,
READ_TYPE,
READ_DATA
}
}
状态机的优势:
- 清晰的解码步骤
- 每个状态只关注一件事
checkpoint()保存进度,避免重复解码
八、总结
8.1 解码器选择指南
三种解码器的核心区别在于它们处理的数据类型不同:
- ByteToMessageDecoder:字节流 → 消息对象,这是最基础的解码器,直接处理网络传输过来的字节数据
- ReplayingDecoder:字节流 → 消息对象,和 ByteToMessageDecoder 作用一样,只是简化了编码方式,不需要手动检查数据是否足够
- MessageToMessageDecoder:消息对象 → 消息对象,用于对已经解码的消息做二次转换,比如将 Integer 转成 String
| 场景 | 推荐解码器 | 原因 |
|---|---|---|
| 简单协议,性能要求高 | ByteToMessageDecoder | 最灵活,性能最好 |
| 复杂协议,代码简洁优先 | ReplayingDecoder | 代码简单,但性能稍差 |
| 消息类型转换 | MessageToMessageDecoder | 专门用于类型转换 |
| 已有成熟协议 | Netty内置解码器 | 如 LineBasedFrameDecoder |
8.2 核心要点
- 累积缓冲区是解码器的核心机制
- decode() 会被多次调用,不要保存状态
- 正确移动 readerIndex,避免无限循环
- 注意内存管理,必要时使用 retain()
- 使用状态机处理复杂协议
4.1 为什么需要编码器?
编码器和解码器是一对相反的操作:
解码器:字节流 → Java对象(入站)
编码器:Java对象 → 字节流(出站)
💡 核心作用:在网络通信中,所有数据最终都要以字节的形式传输,编码器就是负责这个转换工作的
4.2 MessageToByteEncoder 深度解析
📌 定义:MessageToByteEncoder 是将消息对象编码为字节的编码器
输入:自定义的消息对象
输出:ByteBuf
核心工作流程
当我们调用 ctx.write(msg) 发送消息时,编码器的工作流程是这样的:
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf buf = null;
try {
// 1. 检查消息类型是否匹配
if (acceptOutboundMessage(msg)) {
I cast = (I) msg;
// 2. 分配ByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 3. 调用encode方法编码
encode(ctx, cast, buf);
} finally {
// 4. 释放原始消息
ReferenceCountUtil.release(cast);
}
// 5. 传递编码后的ByteBuf
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 类型不匹配,直接传递
ctx.write(msg, promise);
}
} finally {
if (buf != null) {
buf.release();
}
}
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}
整个流程:
- ✅ 检查类型
- ✅ 分配缓冲区
- ✅ 编码
- ✅ 释放原消息
- ✅ 传递ByteBuf
ByteBuf 的分配
编码器会自动分配 ByteBuf,默认使用直接内存:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) {
return preferDirect ?
ctx.alloc().ioBuffer() : // 直接内存
ctx.alloc().heapBuffer(); // 堆内存
}
💡 为什么用直接内存?
直接内存适合网络传输,因为可以零拷贝直接发送到网络,而堆内存需要先拷贝到直接内存才能发送
4.3 MessageToMessageEncoder 深度解析
📌 定义:MessageToMessageEncoder 是将一种消息对象转换为另一种消息对象的编码器
输入:消息对象A
输出:消息对象B
与 MessageToByteEncoder 的区别
MessageToByteEncoder: 消息对象 → ByteBuf
MessageToMessageEncoder: 消息对象A → 消息对象B
💡 应用场景:通常用在第一层编码,对业务对象做预处理
4.4 编码器选择指南
| 场景 | 推荐编码器 | 原因 |
|---|---|---|
| 对象 → 字节 | MessageToByteEncoder | 直接编码为网络数据 |
| 对象 → 对象 | MessageToMessageEncoder | 协议转换、数据转换 |
| 字符串发送 | StringEncoder | Netty内置,开箱即用 |
| 自定义协议 | MessageToByteEncoder | 灵活控制字节格式 |
核心要点
- ✅ 编码器处理出站数据,继承
ChannelOutboundHandlerAdapter - ✅ 自动释放原始消息,避免内存泄漏
- ✅ preferDirect 控制内存类型,默认使用直接内存
- ✅ MessageToMessageEncoder 可以一对多,一个输入产生多个输出
- ✅ 注意 Pipeline 顺序,出站Handler从后往前执行
4.5 编解码器对比
| 特性 | 解码器 | 编码器 |
|---|---|---|
| 数据方向 | 入站(Inbound) | 出站(Outbound) |
| 基类 | ChannelInboundHandlerAdapter | ChannelOutboundHandlerAdapter |
| 核心方法 | channelRead() | write() |
| 累积机制 | 有(cumulation) | 无 |
| 典型用途 | 字节 → 对象 | 对象 → 字节 |
🎯 核心理解:编码器将 Java 对象转换成字节流,和解码器正好相反。理解了解码器,编码器就很容易理解了。
📚 总结
本文详细讲解了 Netty 的事件传播机制和编解码器原理:
入站事件
- ✅ 从 head 到 tail 传播
- ✅ 使用 executionMask 位掩码匹配方法
- ✅ tail 负责释放未处理的资源
出站事件
- ✅ 从当前位置往 head 传播
- ✅ head 调用 unsafe 写入缓冲区
- ✅ writeAndFlush 触发真正的网络发送
解码器
- ✅ 累积缓冲区是核心机制
- ✅ decode() 会被多次调用
- ✅ 正确移动 readerIndex
编码器
- ✅ 自动释放原始消息
- ✅ 默认使用直接内存
- ✅ 支持一对多编码
📖 相关文档:
编码器通过泛型来指定它能处理的消息类型:
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
out.writeInt(msg);
}
}
// 使用时
ctx.write(123); // ✓ 匹配,会调用encode()
ctx.write("Hello"); // ✗ 不匹配,直接传递给下一个Handler
这个机制和解码器的类型匹配是一样的,都是通过泛型来判断。
2.3 ByteBuf 的分配
编码器会自动分配 ByteBuf,默认使用直接内存:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) {
return preferDirect ?
ctx.alloc().ioBuffer() : // 直接内存
ctx.alloc().heapBuffer(); // 堆内存
}
直接内存适合网络传输,因为可以零拷贝直接发送到网络,而堆内存需要先拷贝到直接内存才能发送。所以默认使用直接内存是合理的。
2.4 简单示例
我们来看一个简单的例子,将 Integer 编码成 4 个字节:
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
out.writeInt(msg);
}
}
数据流演示:
输入: Integer 值 = 258 (0x00000102)
encode() 执行:
out.writeInt(258)
ByteBuf 状态:
索引: 0 1 2 3
┌─────┬─────┬─────┬─────┐
数据: │ 00 │ 00 │ 01 │ 02 │
└─────┴─────┴─────┴─────┘
输出: ByteBuf [0x00, 0x00, 0x01, 0x02]
2.5 自定义协议编码器
假设我们要实现这个协议:[长度(4字节)][类型(1字节)][数据(N字节)]
public class CustomProtocolEncoder extends MessageToByteEncoder<CustomMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
byte[] data = msg.getData();
byte type = msg.getType();
// 写入长度
out.writeInt(data.length);
// 写入类型
out.writeByte(type);
// 写入数据
out.writeBytes(data);
}
}
数据流演示:
输入: CustomMessage{type=1, data="Hello"}
encode() 执行过程:
1. data.length = 5
2. type = 0x01
3. data = [0x48, 0x65, 0x6C, 0x6C, 0x6F]
编码后的 ByteBuf:
索引: 0 1 2 3 4 5 6 7 8 9
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
数据: │ 00 │ 00 │ 00 │ 05 │ 01 │ 48 │ 65 │ 6C │ 6C │ 6F │
└─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘
└────────长度=5────────┘ └类型┘ └──────"Hello"──────┘
三、MessageToMessageEncoder 深度解析
从名字可以看出,MessageToMessageEncoder 是将一种消息对象(Message)转换为另一种消息对象(Message)的编码器。它的输入和输出都是消息对象,而不是字节流。这个编码器通常用在第一层编码,也就是在 MessageToByteEncoder 之前,对业务对象做预处理。
3.1 与 MessageToByteEncoder 的区别
MessageToByteEncoder: 消息对象 → ByteBuf
MessageToMessageEncoder: 消息对象A → 消息对象B
可以看到,MessageToByteEncoder 的输出是字节流,而 MessageToMessageEncoder 的输出还是消息对象,需要后续的编码器继续处理。
3.2 典型应用场景
最常见的场景是多层编码,比如先把业务对象转成协议对象,再把协议对象转成字节流:
// 第一层:业务对象 → 协议对象
public class UserToProtocolEncoder extends MessageToMessageEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, List<Object> out) {
ProtocolMessage msg = new ProtocolMessage();
msg.setType(MessageType.USER_INFO);
msg.setData(user.toBytes());
out.add(msg);
}
}
// 第二层:协议对象 → ByteBuf
public class ProtocolToByteEncoder extends MessageToByteEncoder<ProtocolMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) {
out.writeInt(msg.getType());
out.writeBytes(msg.getData());
}
}
// Pipeline 配置
pipeline.addLast(new UserToProtocolEncoder());
pipeline.addLast(new ProtocolToByteEncoder());
数据流:
User对象
↓ UserToProtocolEncoder
ProtocolMessage对象
↓ ProtocolToByteEncoder
ByteBuf
↓
网络传输
3.3 一对多编码
MessageToMessageEncoder 还有一个特点,就是可以一对多编码,一个输入可以产生多个输出:
public class MessageSplitEncoder extends MessageToMessageEncoder<LargeMessage> {
private static final int MAX_SIZE = 1024;
@Override
protected void encode(ChannelHandlerContext ctx, LargeMessage msg, List<Object> out) {
byte[] data = msg.getData();
// 将大消息拆分成多个小消息
for (int i = 0; i < data.length; i += MAX_SIZE) {
int length = Math.min(MAX_SIZE, data.length - i);
byte[] chunk = new byte[length];
System.arraycopy(data, i, chunk, 0, length);
SmallMessage small = new SmallMessage();
small.setData(chunk);
small.setSequence(i / MAX_SIZE);
out.add(small); // 添加多个输出
}
}
}
数据流演示:
输入: LargeMessage (3000字节)
encode() 执行:
拆分成3个 SmallMessage
- SmallMessage[0]: 1024字节
- SmallMessage[1]: 1024字节
- SmallMessage[2]: 952字节
输出: List<SmallMessage> (3个对象)
Pipeline 后续处理:
每个 SmallMessage 会被单独传递给下一个Handler
四、编码器的内存管理
4.1 自动释放机制
编码器会自动释放原始消息,避免内存泄漏:
try {
encode(ctx, cast, buf);
} finally {
// 自动释放原始消息
ReferenceCountUtil.release(cast);
}
这个机制和解码器是一样的,编码完成后,原始消息就没用了,需要释放掉。
4.2 何时需要 retain()?
如果你在 encode 方法中需要异步处理消息,就需要手动 retain:
// ❌ 错误:异步处理会导致问题
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
executor.submit(() -> {
// 此时 msg 可能已经被释放!
byte[] data = msg.getData(); // 可能抛出异常
out.writeBytes(data);
});
}
// ✅ 正确:同步处理,不需要 retain
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
byte[] data = msg.getData();
out.writeBytes(data);
}
一般情况下,encode 方法都是同步执行的,不需要考虑 retain 的问题。
五、编码器与解码器的配合使用
5.1 完整的编解码链
编码器和解码器通常成对出现,一个负责编码,一个负责解码:
// 服务端 Pipeline
pipeline.addLast("decoder", new CustomProtocolDecoder()); // 入站
pipeline.addLast("encoder", new CustomProtocolEncoder()); // 出站
pipeline.addLast("handler", new BusinessHandler());
// 客户端 Pipeline
pipeline.addLast("encoder", new CustomProtocolEncoder()); // 出站
pipeline.addLast("decoder", new CustomProtocolDecoder()); // 入站
pipeline.addLast("handler", new ClientHandler());
数据流向:
客户端发送:
User对象
↓ CustomProtocolEncoder (出站)
ByteBuf
↓ 网络传输
ByteBuf
↓ CustomProtocolDecoder (入站)
User对象
↓ BusinessHandler
服务端响应:
Response对象
↓ CustomProtocolEncoder (出站)
ByteBuf
↓ 网络传输
ByteBuf
↓ CustomProtocolDecoder (入站)
Response对象
↓ ClientHandler
5.2 对称的编解码器实现
编码器和解码器的实现应该是对称的,编码器怎么写,解码器就怎么读:
// 编码器
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) {
byte[] data = msg.getData();
out.writeInt(data.length); // 写入长度
out.writeByte(msg.getType()); // 写入类型
out.writeBytes(data); // 写入数据
}
}
// 解码器(对称实现)
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 5) return;
in.markReaderIndex();
int length = in.readInt(); // 读取长度
byte type = in.readByte(); // 读取类型
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[length];
in.readBytes(data); // 读取数据
Message msg = new Message();
msg.setType(type);
msg.setData(data);
out.add(msg);
}
}
可以看到,编码器写什么,解码器就读什么,顺序完全一致。
七、Netty 内置编码器
Netty 提供了一些常用的编码器,可以直接使用:
// 1. 字符串编码器
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
// 2. Base64 编码器
pipeline.addLast(new Base64Encoder());
// 3. 长度字段编码器(自动添加长度前缀)
pipeline.addLast(new LengthFieldPrepender(4));
// 4. 对象序列化编码器
pipeline.addLast(new ObjectEncoder());
LengthFieldPrepender 特别有用,它会自动在数据前面添加长度字段:
// 配置
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringEncoder());
// 发送
ctx.write("Hello");
// 编码过程:
// 1. StringEncoder: "Hello" → [0x48, 0x65, 0x6C, 0x6C, 0x6F]
// 2. LengthFieldPrepender: 添加长度 → [0x00, 0x00, 0x00, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]
八、总结
8.1 编码器选择指南
三种编码器的核心区别在于它们处理的数据类型不同:
- MessageToByteEncoder:消息对象 → 字节流,这是最基础的编码器,直接将对象编码成网络可传输的字节数据
- MessageToMessageEncoder:消息对象 → 消息对象,用于对业务对象做预处理,比如将 User 对象转成 ProtocolMessage 对象
| 场景 | 推荐编码器 | 原因 |
|---|---|---|
| 对象 → 字节 | MessageToByteEncoder | 直接编码为网络数据 |
| 对象 → 对象 | MessageToMessageEncoder | 协议转换、数据转换 |
| 字符串发送 | StringEncoder | Netty内置,开箱即用 |
| 自定义协议 | MessageToByteEncoder | 灵活控制字节格式 |
8.2 核心要点
- 编码器处理出站数据,继承
ChannelOutboundHandlerAdapter - 自动释放原始消息,避免内存泄漏
- preferDirect 控制内存类型,默认使用直接内存
- MessageToMessageEncoder 可以一对多,一个输入产生多个输出
- 注意 Pipeline 顺序,出站Handler从后往前执行,但类型匹配机制会确保正确的处理顺序
8.3 与解码器的对比
| 特性 | 解码器 | 编码器 |
|---|---|---|
| 数据方向 | 入站(Inbound) | 出站(Outbound) |
| 基类 | ChannelInboundHandlerAdapter | ChannelOutboundHandlerAdapter |
| 核心方法 | channelRead() | write() |
| 累积机制 | 有(cumulation) | 无 |
| 典型用途 | 字节 → 对象 | 对象 → 字节 |
整个编码器的核心就是将 Java 对象转换成字节流,和解码器正好相反。理解了解码器,编码器就很容易理解了。
更多推荐


所有评论(0)