Netty 消息处理流程详解

概述

Netty 的消息处理流程是其核心功能之一,包括消息的接收、解码、业务处理、编码和发送等环节。理解消息处理流程对于构建高性能的网络应用至关重要。

消息处理整体架构

事件传播
消息发送流程
消息接收流程
Inbound事件
ChannelPipeline
Outbound事件
Head → Tail
Tail → Head
编码器处理
业务数据
ByteBuf写入
写缓冲区
网络发送
操作系统发送
操作系统缓冲区
网络数据
ByteBuf分配
数据读取
解码器处理
业务处理器

消息接收流程

整体接收序列

远程端 操作系统 Channel EventLoop ByteBuf Pipeline Decoder Handler 发送网络数据 接收并缓存数据 检测到OP_READ事件 分配缓冲区 读取数据到ByteBuf 返回读取字节数 fireChannelRead(byteBuf) 传递ByteBuf 解码数据 传递解码后对象 处理业务逻辑 释放缓冲区 远程端 操作系统 Channel EventLoop ByteBuf Pipeline Decoder Handler

详细接收步骤

1. 数据读取阶段
Selector检测到OP_READ
调用channel.read()
分配ByteBuf
调用doReadBytes()
从SocketChannel读取
数据存入ByteBuf
触发channelRead事件
ByteBuf分配策略
直接内存vs堆内存
内存池vs非内存池
自适应分配器

数据读取源码分析:

// AbstractNioByteChannel.read()方法
@Override
public void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    
    ByteBuf byteBuf = null;
    boolean close = false;
    
    try {
        do {
            // 1. 分配ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            
            // 2. 读取数据
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            
            if (allocHandle.lastBytesRead() <= 0) {
                // 没有数据可读
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }
            
            allocHandle.incMessagesRead(1);
            readPending = false;
            
            // 3. 触发channelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
            
        } while (allocHandle.continueReading());
        
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    }
}
2. 解码处理阶段
ByteBuf到达解码器
检查可读字节
足够解码?
执行解码逻辑
等待更多数据
创建解码后对象
传递解码对象
释放原始ByteBuf
返回等待状态
累积数据

解码器处理流程:

// ByteToMessageDecoder.channelRead()方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }
            
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
3. 业务处理阶段
解码器 业务处理器 业务服务 响应处理器 传递解码后对象 参数验证 调用业务逻辑 返回业务结果 创建响应对象 编码响应 返回编码后数据 ctx.writeAndFlush(response) 处理完成 alt [需要响应] [无需响应] 解码器 业务处理器 业务服务 响应处理器

消息发送流程

整体发送序列

业务处理器 编码器 Channel ChannelOutboundBuffer EventLoop 操作系统 远程端 write(response对象) 编码为ByteBuf 传递ByteBuf 添加消息到缓冲区 检查是否在EventLoop线程 立即刷新 提交flush任务 alt [在EventLoop线程] [不在EventLoop线程] 移除并获取消息 写入SocketChannel 发送数据 网络传输 业务处理器 编码器 Channel ChannelOutboundBuffer EventLoop 操作系统 远程端

详细发送步骤

1. 编码处理阶段
业务对象到达编码器
检查对象类型
是否支持编码?
执行编码逻辑
传递给下一个Handler
创建ByteBuf
序列化对象数据
写入ByteBuf
传递ByteBuf

编码器处理流程:

// MessageToByteEncoder.write()方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }
            
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
2. 写缓冲区管理
消息到达Channel
检查写缓冲区
缓冲区是否满?
设置不可写标志
添加消息到队列
触发channelWritabilityChanged
等待可写事件
更新写状态
准备发送

写缓冲区管理源码:

// AbstractChannelHandlerContext.write()方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}

// AbstractChannel.write()方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

// AbstractUnsafe.write()方法
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, newClosedChannelException(initialCloseCause));
        ReferenceCountUtil.release(msg);
        return;
    }
    
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    
    outboundBuffer.addMessage(msg, size, promise);
}
3. 刷新和发送阶段
完全发送
部分发送
调用flush()
检查写缓冲区
是否有待发送数据?
获取可写数据
直接返回
调用doWrite()
写入SocketChannel
更新发送进度
检查是否完全发送
从队列移除
继续下次发送
触发写成功事件
注册OP_WRITE事件

刷新发送源码分析:

// AbstractChannelHandlerContext.flush()方法
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        executor.execute(task);
    }
    
    return this;
}

// NioSocketChannel.doWrite()方法
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
        
        switch (nioBufferCnt) {
            case 0:
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);
    
    incompleteWrite(writeSpinCount < 0);
}

事件传播机制

Inbound 事件传播

Channel
HeadContext
Decoder1
Decoder2
BusinessHandler
TailContext
channelRegistered
channelActive
channelRead
channelReadComplete
channelInactive
channelUnregistered

Outbound 事件传播

BusinessHandler
Encoder2
Encoder1
HeadContext
Channel
bind
connect
disconnect
close
write
flush

零拷贝机制

FileRegion 零拷贝

File FileRegion Channel 操作系统 远程端 创建文件区域 记录文件描述符和范围 transferTo(targetChannel) sendfile系统调用 内核空间直接传输 网络发送数据 数据不经过用户空间 File FileRegion Channel 操作系统 远程端

FileRegion 使用示例:

// 文件传输零拷贝
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FileRequest) {
        FileRequest request = (FileRequest) msg;
        File file = new File(request.getPath());
        
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        FileRegion region = new DefaultFileRegion(
            raf.getChannel(), 0, file.length());
        
        ctx.writeAndFlush(region).addListener(future -> {
            raf.close();
        });
    }
}

CompositeByteBuf 零拷贝

CompositeByteBuf
多个ByteBuf组合
避免数据拷贝
虚拟合并视图
使用场景
HTTP协议组合
协议头部拼接
消息分片重组

性能优化机制

批量处理优化

批量读取
配置读取次数
连续读取数据
减少系统调用
批量写入
聚集写入
多个缓冲区
单次系统调用

自适应缓冲区

AdaptiveRecvByteBufAllocator
动态调整缓冲区大小
根据历史数据量
预测下次数据量
调整策略
指数增加
逐步减少
范围限制

异常处理机制

读取异常处理

读取异常
IOException
DecoderException
OutOfMemoryError
连接重置
连接关闭
解码失败
数据格式错误
内存不足
缓冲区分配失败
异常处理
关闭连接
记录日志
释放资源

写入异常处理

写入异常
IOException
EncoderException
WriteBufferWaterMark
连接断开
缓冲区满
编码失败
对象序列化错误
写缓冲区溢出
触发高水位
异常处理
设置Channel不可写
等待可写事件
恢复写入

内存管理

ByteBuf 生命周期

分配ByteBuf
使用ByteBuf
引用计数管理
释放ByteBuf
引用计数
retain增加计数
release减少计数
计数是否为0?
回收内存
继续使用

内存泄漏检测

内存泄漏检测
采样频率
记录分配栈
监控释放情况
检测级别
DISABLED
SIMPLE
ADVANCED
PARANOID
处理策略
记录日志
生成报告
定位泄漏源

总结

Netty 的消息处理流程体现了以下特点:

  1. 异步非阻塞:整个消息处理过程都是异步的
  2. 零拷贝优化:通过 FileRegion 和 CompositeByteBuf 减少数据拷贝
  3. 批量处理:支持批量读写,减少系统调用
  4. 内存管理:完善的内存分配和回收机制
  5. 异常处理:完善的异常处理和资源释放机制
  6. 性能优化:自适应缓冲区、批量处理等优化措施

理解消息处理流程有助于:

  • 编写高效的 Handler
  • 优化消息处理性能
  • 诊断消息处理问题
  • 设计合理的协议编解码器
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐