Netty-09消息处理流程
Netty消息处理流程可分为接收和发送两大阶段。接收流程包括:1)Selector检测OP_READ事件后分配ByteBuf并读取数据;2)解码器处理ByteBuf数据,累积不足数据等待完整包;3)业务处理器处理解码后的对象并调用业务服务。发送流程则反向进行编码和网络发送。整个流程通过ChannelPipeline完成事件传播,接收消息从Head到Tail传递,发送消息从Tail到Head传递。关
·
Netty 消息处理流程详解
概述
Netty 的消息处理流程是其核心功能之一,包括消息的接收、解码、业务处理、编码和发送等环节。理解消息处理流程对于构建高性能的网络应用至关重要。
消息处理整体架构
消息接收流程
整体接收序列
详细接收步骤
1. 数据读取阶段
数据读取源码分析:
// AbstractNioByteChannel.read()方法
@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 1. 分配ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 2. 读取数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 没有数据可读
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 3. 触发channelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
}
2. 解码处理阶段
解码器处理流程:
// ByteToMessageDecoder.channelRead()方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
3. 业务处理阶段
消息发送流程
整体发送序列
详细发送步骤
1. 编码处理阶段
编码器处理流程:
// MessageToByteEncoder.write()方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
2. 写缓冲区管理
写缓冲区管理源码:
// AbstractChannelHandlerContext.write()方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}
// AbstractChannel.write()方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
// AbstractUnsafe.write()方法
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
3. 刷新和发送阶段
刷新发送源码分析:
// AbstractChannelHandlerContext.flush()方法
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
executor.execute(task);
}
return this;
}
// NioSocketChannel.doWrite()方法
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
clearOpWrite();
return;
}
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
writeSpinCount -= doWrite0(in);
break;
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
事件传播机制
Inbound 事件传播
Outbound 事件传播
零拷贝机制
FileRegion 零拷贝
FileRegion 使用示例:
// 文件传输零拷贝
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FileRequest) {
FileRequest request = (FileRequest) msg;
File file = new File(request.getPath());
RandomAccessFile raf = new RandomAccessFile(file, "r");
FileRegion region = new DefaultFileRegion(
raf.getChannel(), 0, file.length());
ctx.writeAndFlush(region).addListener(future -> {
raf.close();
});
}
}
CompositeByteBuf 零拷贝
性能优化机制
批量处理优化
自适应缓冲区
异常处理机制
读取异常处理
写入异常处理
内存管理
ByteBuf 生命周期
内存泄漏检测
总结
Netty 的消息处理流程体现了以下特点:
- 异步非阻塞:整个消息处理过程都是异步的
- 零拷贝优化:通过 FileRegion 和 CompositeByteBuf 减少数据拷贝
- 批量处理:支持批量读写,减少系统调用
- 内存管理:完善的内存分配和回收机制
- 异常处理:完善的异常处理和资源释放机制
- 性能优化:自适应缓冲区、批量处理等优化措施
理解消息处理流程有助于:
- 编写高效的 Handler
- 优化消息处理性能
- 诊断消息处理问题
- 设计合理的协议编解码器
更多推荐
所有评论(0)