netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 netty实现简易服务端和客户端

1.1 交互流程

  •  服务端-实例化boss线程组
  • 服务端-实例化work线程组
  • 服务端-给两个线程组都配置一些参数
  • 服务端-绑定端口,启动服务
  • 客户端-启动客户端,实例化一个线程组,客户端不需要boss和work
  • 客户端-给线程组配置参数
  • 客户端-像服务端发起连接
  • 客户端-调用SimpleClientHandle#channelActive
  • 服务端-接收客户端传输数据SimpleServerHandler#channelRead, 并向客户端发送数据
  • 客户端-接收服务端传输数据SimpleClientHandle#channelRead

  

1.2 pom依赖

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--常用工具类 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.27.1</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.63.Final</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    </dependency>
  </dependencies>

1.3 服务端代码

package com.bonnie.netty.str.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * netty服务端, 这快用的是String
 */
public class NettyServer {

    public static void main(String[] args) {
        // 创建Boss线程
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 创建work线程
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // 实例化serverBootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // bossGroup保存AbstractBootStrap的group属性中
        // workGroup保存serverBootstrap的childGroup属性中
        serverBootstrap.group(bossGroup, workGroup)
                // 把NioServerSocketChannel.class保存到ReflectiveChannelFactory工厂对象中,将来要通过实例化把NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                // Boss线程要处理的业务 LoggingHandler会保存到AbstractBootStrap中的handler中
                .handler(new LoggingHandler(LogLevel.INFO))
                // work线程要处理的业务,把ChannelInitializer保存到ServerBootstrap中的childHandler中
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new SimpleServerHandler());
                    }
                });
        try {
            // 监听8080端口,并同步返回
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            // 阻止进入finally语句块,只有其他的线程调用 channelFuture.channel().close()的时候,然后进入finally代码块关闭连接池
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}
package com.bonnie.netty.str.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.UUID;


/**
 * 服务端handle
 */
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 接收客户端数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String record = (String) msg;
        System.out.println("服务端接收消息:" + record);

        // 写回数据
        ctx.writeAndFlush(UUID.randomUUID().toString());
    }

}

1.4 客户端代码

package com.bonnie.netty.str.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * netty客户端
 */
public class NettyClient {

    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new SimpleClientHandle());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

}
package com.bonnie.netty.str.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

// 客户端接收数据
public class SimpleClientHandle extends ChannelInboundHandlerAdapter {

    private int count;

    /**
     * 客户端连接成功后,就会调用此方法,然后给服务器去发送数据
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接成功");

        for (int i=0; i<10; i++) {
            ctx.writeAndFlush("客户端消息" + i);
        }

        super.channelActive(ctx);
    }

    /**
     * 读取服务端发送的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String record = (String) msg;
        System.out.println("收到服务端消息:" + record);
        // 写回数据
        ctx.writeAndFlush(UUID.randomUUID());
    }

}

1.5 运行结果

在使用Netty进行数据传输,就会使用到ByteBuf,在传输的过程中我们都会希望把数据先进行编码,也就是 序列化,把对象或者字符串数据转成二进制在网络中进行传输,接收方然后通过解码把二进制又转成对象接 收数据,那么在传输数据的过程中就会出现拆包和粘包的现象,我们来看下粘包和拆包,粘包就是把多个小的 数据包封装成一个打包发送,拆包就是把一个大的数据包拆成多个小数据包发送。

2 拆包和粘包

粘包:数据从客户端发送到服务端,可以把多个小的数据包封装成一个大的数据包发送。

拆包:数据从客户端发送发到服务端,可以一个大的数据包拆分成多个小的数据包发送。

上面的案例中就发生了粘包,多个小的数据包合并成了一个大的数据包

服务端接收消息:客户端消息0客户端消息1客户端消息2客户端消息3客户端消息4客户端消息5客户端消息6客户端消息7客户端消息8客户端消息9
 

Netty中有4种解码器都是继承了ByteToMessageDecoder类,我们把这个类称之为一次解码器,他的作用是 通过ByteBuf去传递数据,我们把这种把ByteBuf(原始数据流) -> ByteBuf(用户数据) 称之为一次解码器, Netty中对于这个解码器提供了4种

3 netty四种解码器

3.1 FixedLengthFrameDecoder 固定长度解码器

客户端和服务端都是使用,然后设置每次传输数据的长度,所以这种方式就规定我们的数据长度是固定的,所以自然这种解码器不太好用

  // 服务端
  .addLast(new FixedLengthFrameDecoder(19))
 // 客户端
 .addLast(new FixedLengthFrameDecoder(37))

3.2 LineBasedFrameDecoder 发送消息加上换行符\n

这种是不固定长度,只需要告诉服务端什么时候可以结束即可,我们可以使用换行符表示数据的结束,在发送数据的末尾添加"\n"换行符

 //第二种:换行符 我每次\n结束 \n用的不多
.addLast(new LineBasedFrameDecoder(1024))
// 客户端:换行符
ctx.writeAndFlush("客户端消息" + i + "\n");

3.3 DelimiterBasedFrameDecoder 自定义分隔符解码器

使用其他方式作为结束标志的解码器,比如使用& ;等;只需要在发送数据的时候在数据后面加上分隔的符号就行了,只是分隔的符号需要包装成一个ByteBuf

// 第三种:分隔符
// maxFrameLength :报文的最大长度
// stripDelimiter:判断解码的时候是否要把分隔符去掉
// failFast:跟maxFrameLength合起来用,如果为true,如果大于消息的长度,直接抛异常,如果为false,
解码之后抛异常
// Delimiter:分隔符
ByteBuf buf = Unpooled.copiedBuffer("&".getBytes());
.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,true,true,buf))

3.4 LengthFieldBasedFrameDecoder 长度域解码器

长度域解码器是一种自定义的解码器,可以按照规定的方式去灵活的定义报文,下面会介绍下基本的解码器。

3.4.1 配置代码

 /**
   * maxFrameLength:防止内存溢出的保护机制,超过此长度的帧会被丢弃
  * lengthFieldOffset:长度字段在消息中的起始位置(从 0 开始)
  * lengthFieldLength:长度字段本身占用的字节数(1, 2, 3, 4, 8)
  * lengthAdjustment: 长度字段的值与实际内容长度的差值(可正可负)
  * initialBytesToStrip:解码后需要从帧头部跳过的字节数(通常用于跳过长度字段)
  */
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4, 0, 4))

3.4.2 参数详细解释

maxFrameLength:防止内存溢出的保护机制,超过此长度的帧会被丢弃
含义:允许的最大帧长度(字节数)
作用:防止内存溢出,保护系统安全
行为:当解码出的帧长度超过此值时,会抛出 TooLongFrameException,并关闭连接
建议值:根据业务最大消息大小设置,如 1024*1024(1MB)


lengthFieldOffset:长度字段在消息中的起始位置(从 0 开始)
含义:长度字段在整个消息中的起始偏移量(从 0 开始计数)
作用:定位长度字段的位置
示例:如果消息格式是 [标志位(1字节)][长度(2字节)][内容],则此值为 1


lengthFieldLength:长度字段本身占用的字节数(1, 2, 3, 4, 8)
含义:长度字段本身占用的字节数
允许值:1, 2, 3, 4, 8(分别对应不同范围的数值)
1 字节:0-255
2 字节:0-65535
4 字节:0-4294967295
注意:选择的字节数需能容纳业务最大消息长度


lengthAdjustment:长度字段的值与实际内容长度的差值(可正可负)
含义:长度调整值,用于修正长度字段的值
作用:当长度字段的值不正好等于内容长度时,通过此值进行调整
计算公式:实际内容长度 = 长度字段值 + lengthAdjustment
常见场景:
当长度字段包含自身长度时,使用负值(如 -2 表示减去长度字段自身的 2 字节)
当长度字段后还有其他固定字段时,使用正值(如 +1 表示加 1 字节的版本号字段)


initialBytesToStrip:解码后需要从帧头部跳过的字节数(通常用于跳过长度字段)
含义:解码后从帧的起始位置跳过的字节数
作用:移除不需要的头部信息(通常是长度字段本身)
常见设置:
0:保留所有字节(包括长度字段)
lengthFieldLength:跳过长度字段
其他值:跳过长度字段及前面 / 后面的额外字段

3.4.3 报文格式Length+Content

 // 服务端,获取数据的时候只获取内容,不过去填充的长度,所以最后一个参数为4
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4, 0, 4))
.addLast(new LengthFieldPrepender(4, 0, false))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new SimpleServerHandler());
// inbound执行顺序:LengthFieldBasedFrameDecoder->StringDecoder->SimpleServerHandler
// outbound执行顺序:StringEncoder->LengthFieldPrepende
// 客户端 服务端,我们获取数据的时候只获取内容,不过去填充的长度,所以最后一个参数为4
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4, 0, 4))
.addLast(new LengthFieldPrepender(4, 0, false))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new SimpleClientHandle());
// inbound执行顺序:LengthFieldBasedFrameDecoder->StringDecoder->SimpleClientHandle
// outbound执行顺序:StringEncoder->LengthFieldPrepende

3.4.4 报文格式Type+Length+Content

3.4.5 报文格式Length+Type+Content

Length从位置0开始,偏移量是4
Length的值为12(Type和Content看成是一个整体1+11),Content的长度和Length的值的差值是1
接收到的实际值从Type+Content,所以是4

3.4.6 报文格式Type1+Length+Type2+Content 


开始的1个字节是长度域,所以需要设置长度域偏移为1 长度域4个字节 我们需要把type2+body当做body处理,所以数据长度需要加1 接收数据不包括type1和长度域,所以需要跳过5个字节。

3.5 LengthFieldBasedFrameDecoder长度域的源码简单分析 

LengthFieldPrepender构造方法

public LengthFieldPrepender(ByteOrder byteOrder, int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) {
// 长度字段是用字节表示的,只能是1 2 3 4 8
        if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) {
            throw new IllegalArgumentException("lengthFieldLength must be either 1, 2, 3, 4, or 8: " + lengthFieldLength);
        } else {
// 传入参数
            this.byteOrder = (ByteOrder)ObjectUtil.checkNotNull(byteOrder, "byteOrder");
            this.lengthFieldLength = lengthFieldLength;
            this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
            this.lengthAdjustment = lengthAdjustment;
        }
    }

 编码方法

 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
 // 看看是否需要添加长度字段的补偿值
        int length = msg.readableBytes() + this.lengthAdjustment;
        // 为true时,则length的值 = length的长度+内容的长度
        if (this.lengthIncludesLengthFieldLength) {
            length += this.lengthFieldLength;
        }

        ObjectUtil.checkPositiveOrZero(length, "length");
        switch (this.lengthFieldLength) {
            case 1:
                if (length >= 256) {
                    throw new IllegalArgumentException("length does not fit into a byte: " + length);
                }

                out.add(ctx.alloc().buffer(1).order(this.byteOrder).writeByte((byte)length));
                break;
            case 2:
                if (length >= 65536) {
                    throw new IllegalArgumentException("length does not fit into a short integer: " + length);
                }

                out.add(ctx.alloc().buffer(2).order(this.byteOrder).writeShort((short)length));
                break;
            case 3:
                if (length >= 16777216) {
                    throw new IllegalArgumentException("length does not fit into a medium integer: " + length);
                }

                out.add(ctx.alloc().buffer(3).order(this.byteOrder).writeMedium(length));
                break;
            case 4:
            // 首先按照大端序列或者小端序列写入4个字节的长度字段
                out.add(ctx.alloc().buffer(4).order(this.byteOrder).writeInt(length));
                break;
            case 5:
            case 6:
            case 7:
            default:
                throw new Error("should not reach here");
            case 8:
                out.add(ctx.alloc().buffer(8).order(this.byteOrder).writeLong((long)length));
        }
        // 在Netty中ByteBuf的回收使用的是引用计数器,当你为他增加一个引用时,请使用retain()增加一个引
        //用,当引用释放时请调用release()方法减少一个引用,不然在回收的时候将会出问题。
        // 把byteBuf保存到list中
        out.add(msg.retain());
        // 那么此时out的list中就会有两个对象,一个是代表长度的对象,一个是代表内容的对象
        // 最后会通过NioSocketChannel去发送数据

    }

     解码:看服务端的长度域解码器是如何解码的

   protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 解码
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }

    }
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (this.discardingTooLongFrame) {
            this.discardingTooLongFrame(in);
        }

        if (in.readableBytes() < this.lengthFieldEndOffset) {
            return null;
        } else {
// lengthFieldOffset表示长度字段需不需要偏移,这里不需要,报文的第一个就是长度字段
            int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
// 获取到长度字段,并且讲字段表示的内容的值返回
            long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
// 如果内容小于0,直接报错
            if (frameLength < 0L) {
                failOnNegativeLengthField(in, frameLength, this.lengthFieldEndOffset);
            }
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength = 0 + 4 = 4
// frameLength = 内容 + 长度,其实这里就是获取整个报文的长度
            frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
// 如果整个报文的长度比lengthFieldEndOffset还小,就说明没有内容
            if (frameLength < (long)this.lengthFieldEndOffset) {
                failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, this.lengthFieldEndOffset);
            }
// 如果整个报文的长度比最大还要大,另行处理

            if (frameLength > (long)this.maxFrameLength) {
                this.exceededFrameLength(in, frameLength);
                return null;
            } else {
                int frameLengthInt = (int)frameLength;
                if (in.readableBytes() < frameLengthInt) {
                    return null;
                } else {
// 跳过长度字节,如果大于最大的内容长度,数据有问题
                    if (this.initialBytesToStrip > frameLengthInt) {
                        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, this.initialBytesToStrip);
                    }
// 开始跳过4个字节,直接获取内容
                    in.skipBytes(this.initialBytesToStrip);
                    int readerIndex = in.readerIndex();
                    int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
// 返回内容
                    ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
                    in.readerIndex(readerIndex + actualFrameLength);
                    return frame;
                }
            }
        }
    }


                
                               

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐