Netty从0到1系列之拆包、粘包【5】
本文介绍了Netty自定义通信协议的设计与实现。首先阐述了协议设计原则,包括消息边界、元数据、扩展性等要点。接着详细设计了一个包含魔数、版本号、序列化算法、消息类型等字段的协议结构,并提供了Mermaid格式的示意图。最后给出了Java实现示例,包括消息实体类定义,展示了如何封装协议字段(如魔数、版本号、序列化算法等)。该设计方案可有效解决粘包拆包问题,支持协议升级,适用于高性能网络通信场景。
文章目录
推荐阅读:
【01】Netty从0到1系列之I/O模型
【02】Netty从0到1系列之NIO
【03】Netty从0到1系列之Selector
【04】Netty从0到1系列之Channel
【05】Netty从0到1系列之Buffer(上)
【06】Netty从0到1系列之Buffer(下)
【07】Netty从0到1系列之零拷贝技术
【08】Netty从0到1系列之整体架构、入门程序
【09】Netty从0到1系列之EventLoop
【10】Netty从0到1系列之EventLoopGroup
【11】Netty从0到1系列之Future
【12】Netty从0到1系列之Promise
【13】Netty从0到1系列之Netty Channel
【14】Netty从0到1系列之ChannelFuture
【15】Netty从0到1系列之CloseFuture
【16】Netty从0到1系列之Netty Handler
【17】Netty从0到1系列之Netty Pipeline【上】
【18】Netty从0到1系列之Netty Pipeline【下】
【19】Netty从0到1系列之Netty ByteBuf【上】
【20】Netty从0到1系列之Netty ByteBuf【中】
【21】Netty从0到1系列之Netty ByteBuf【下】
【22】Netty从0到1系列之Netty 逻辑架构【上】
【23】Netty从0到1系列之Netty 逻辑架构【下】
【24】Netty从0到1系列之Netty 启动细节分析
【25】Netty从0到1系列之Netty 线程模型【上】
【26】Netty从0到1系列之Netty 线程模型【下】
【27】Netty从0到1系列之Netty ChannelPipeline
【28】Netty从0到1系列之Netty ChannelHandler
【29】Netty从0到1系列之Netty拆包、粘包【1】
【30】Netty从0到1系列之Netty拆包、粘包【2】
【31】Netty从0到1系列之Netty拆包、粘包【3】
【32】Netty从0到1系列之Netty拆包、粘包【4】
Netty拆包、粘包【5】
3.6 Netty自定义通信协议实现
在实际项目开发中,基于 Netty 实现自定义通信协议是非常常见的需求。自定义协议可以根据业务场景进行优化,提供更高的性能和安全性。下面我们将详细讲解如何设计和实现一个自定义通信协议。
3.6.1 自定义协议设计原则
一个设计良好的通信协议应考虑以下几点:
- 明确的消息边界(解决粘包拆包问题)
- 必要的元数据(版本、类型、长度等)
- 可扩展性(方便后续协议升级)
- 安全性(可选,如校验机制)
- 高效性(减少冗余数据)
3.6.2 协议结构设计
我们设计一个包含以下字段的自定义协议:
各个字段说明:
- 魔数:用于快速识别协议类型,防止非法数据
- 版本号:支持协议升级
- 序列化算法:标识数据的序列化方式(如 JSON、Protobuf 等)
- 消息类型:区分不同业务消息(如登录、心跳、业务请求等)
- 状态:标识请求的处理状态(仅响应消息使用)
- 消息 ID:用于消息标识和异步通信
- 数据长度:数据内容的长度
- 数据内容:实际的业务数据
- 校验和:可选,用于数据完整性校验
3.6.3 协议实现示例
✅ 1. 定义消息实体类
package cn.tcmeta.customeprotocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Data;
/**
* @author: laoren
* @date: 2025/9/15 14:30
* @description: 自定义通信协议实体类
* @version: 1.0.0
*/
@Data
public class CustomProtocol {
// 魔数,用于标识协议类型
public static final int MAGIC_NUMBER = 0xCAFE;
// 版本号
private byte version;
// 序列化算法:0-JSON, 1-Protobuf, 2-Java
private byte serializeAlgorithm;
// 消息类型:0-请求, 1-响应, 2-心跳
private byte messageType;
// 状态:0-成功, 1-失败, 2-超时 (仅响应消息使用)
private byte status;
// 消息ID
private long messageId;
// 数据内容
private byte[] data;
// 构造方法
public CustomProtocol() {}
public CustomProtocol(byte version, byte serializeAlgorithm, byte messageType,
byte status, long messageId, byte[] data) {
this.version = version;
this.serializeAlgorithm = serializeAlgorithm;
this.messageType = messageType;
this.status = status;
this.messageId = messageId;
this.data = data;
}
/**
* 编码方法:将协议对象转换为ByteBuf
* @return ByteBuf 数据对象
*/
public ByteBuf encode() {
// 计算总长度:魔数(4) + 版本(1) + 序列化算法(1) + 消息类型(1) + 状态(1) + 消息ID(8) + 数据长度(4) + 数据长度 + 校验和(4)
int dataLength = data == null ? 0 : data.length;
int totalLength = 4 + 1 + 1 + 1 + 1 + 8 + 4 + dataLength + 4;
// 创建ByteBuf
ByteBuf byteBuf = Unpooled.buffer(totalLength);
// 写入魔数
byteBuf.writeInt(MAGIC_NUMBER);
// 写入版本号
byteBuf.writeByte(version);
// 写入序列化算法
byteBuf.writeByte(serializeAlgorithm);
// 写入消息类型
byteBuf.writeByte(messageType);
// 写入状态
byteBuf.writeByte(status);
// 写入消息ID
byteBuf.writeLong(messageId);
// 写入数据长度
byteBuf.writeInt(dataLength);
// 写入数据内容
if (dataLength > 0) {
byteBuf.writeBytes(data);
}
// 计算并写入校验和
int checksum = calculateChecksum();
byteBuf.writeInt(checksum);
return byteBuf;
}
// 解码方法:从ByteBuf转换为协议对象
public static CustomProtocol decode(ByteBuf byteBuf) {
// 读取魔数
int magicNumber = byteBuf.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IllegalArgumentException("Invalid magic number, not a custom protocol message");
}
// 读取版本号
byte version = byteBuf.readByte();
// 读取序列化算法
byte serializeAlgorithm = byteBuf.readByte();
// 读取消息类型
byte messageType = byteBuf.readByte();
// 读取状态
byte status = byteBuf.readByte();
// 读取消息ID
long messageId = byteBuf.readLong();
// 读取数据长度
int dataLength = byteBuf.readInt();
// 读取数据内容
byte[] data = new byte[dataLength];
if (dataLength > 0) {
byteBuf.readBytes(data);
}
// 读取校验和并验证
int checksum = byteBuf.readInt();
CustomProtocol protocol = new CustomProtocol(version, serializeAlgorithm, messageType,
status, messageId, data);
if (protocol.calculateChecksum() != checksum) {
throw new IllegalArgumentException("Checksum verification failed, data may be corrupted");
}
return protocol;
}
// 计算校验和
private int calculateChecksum() {
int checksum = MAGIC_NUMBER;
checksum ^= version & 0xFF;
checksum ^= serializeAlgorithm & 0xFF;
checksum ^= messageType & 0xFF;
checksum ^= status & 0xFF;
// 消息ID分4次加入校验和计算
checksum ^= (int)(messageId >> 32) & 0xFFFFFFFF;
checksum ^= (int)messageId & 0xFFFFFFFF;
// 数据内容加入校验和计算
if (data != null) {
for (byte b : data) {
checksum ^= b & 0xFF;
}
}
return checksum;
}
}
✅ 2. 自定义协议解码器
package cn.tcmeta.customeprotocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author: laoren
* @date: 2025/9/15 14:56
* @description: 实现解码器
* @version: 1.0.0
*/
public class CustomProtocolDecoder extends ByteToMessageDecoder {
// 协议头部长度:魔数(4) + 版本(1) + 序列化算法(1) + 消息类型(1) + 状态(1) + 消息ID(8) + 数据长度(4) + 校验和(4)
private static final int HEADER_LENGTH = 4 + 1 + 1 + 1 + 1 + 8 + 4 + 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 确保至少有头部长度的数据
if (in.readableBytes() < HEADER_LENGTH) {
return; // 数据不中,等待更多数据
}
// 标记当前读取位置,以便重置
in.markReaderIndex();
// 读取魔数,判断是否是我们的协议
int magicNumber = in.readInt(); // 2字节
if (magicNumber != CustomProtocol.MAGIC_NUMBER) {
ctx.close(); // 关闭连接
return; // 不是我们的协议,忽略
}
// 回退到初始的位置,因为decode方法需要完整的读取整个协议包
in.resetReaderIndex();
// 读取数据长度: 头部长度 + 数据长度
// 先跳过前面16个字节(魔数4+ 版本1+序列化算法1+消息类型1+状态1+消息ID 8)
in.skipBytes(16);
int dataLength = in.readInt();
int totalLength = HEADER_LENGTH + dataLength;
in.resetReaderIndex(); // 重置一下,从头开始读取整个数据的长度
// 检查是否有足够的数据
if (in.readableBytes() < totalLength) {
in.resetReaderIndex(); // 重置读取位置
return;
}
// 读取完整的协议包
ByteBuf frame = in.readBytes(totalLength);
// 解码并添加到输出列表
CustomProtocol customProtocol = CustomProtocol.decode(frame);
out.add(customProtocol);
}
}
✅ 3. 自定义协议编码器
package cn.tcmeta.customeprotocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author: laoren
* @date: 2025/9/15 14:52
* @description: 实现编码器
* @version: 1.0.0
*/
public class CustomProtocolEncoder extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
ByteBuf encode = msg.encode();
out.writeBytes(encode);
}
}
✅ 4. 服务器端处理消息的Handler
package cn.tcmeta.customeprotocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author: laoren
* @date: 2025/9/15 15:20
* @description: ServerHandler
* @version: 1.0.0
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
// 消息ID生成器
private static final AtomicLong MESSAGE_ID_GENERATOR = new AtomicLong(1);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
if (msg instanceof CustomProtocol protocol) {
System.out.println("Received from client: " + protocol);
// 根据消息类型处理
switch (protocol.getMessageType()) {
case 0: // 请求消息
handleRequest(ctx, protocol);
break;
case 2: // 心跳消息
handleHeartbeat(ctx, protocol);
break;
default:
System.out.println("Unknown message type: " + protocol.getMessageType());
}
}
}
// 处理请求消息
private void handleRequest(ChannelHandlerContext ctx, CustomProtocol request) throws IOException {
// 从请求中解析数据
String requestData = SerializationUtil.bytesToString(request.getData());
System.out.println("Request data: " + requestData);
// 构建响应数据
String responseData = "Server received: " + requestData;
byte[] responseBytes = SerializationUtil.stringToBytes(responseData);
// 创建响应协议对象
CustomProtocol response = new CustomProtocol(
request.getVersion(), // 保持版本一致
request.getSerializeAlgorithm(), // 保持序列化算法一致
(byte) 1, // 响应消息类型
(byte) 0, // 成功状态
MESSAGE_ID_GENERATOR.getAndIncrement(), // 新的消息ID
responseBytes
);
// 发送响应
ctx.writeAndFlush(response);
}
// 处理心跳消息
private void handleHeartbeat(ChannelHandlerContext ctx, CustomProtocol heartbeat) {
System.out.println("Received heartbeat from client");
// 创建心跳响应
CustomProtocol response = new CustomProtocol(
heartbeat.getVersion(),
heartbeat.getSerializeAlgorithm(),
(byte) 2, // 心跳消息类型
(byte) 0, // 成功状态
heartbeat.getMessageId(), // 使用相同的消息ID
new byte[0] // 心跳响应不需要数据
);
// 发送心跳响应
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
✅ 5. 客户端处理Handler
package cn.tcmeta.customeprotocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 客户端处理器
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected to server");
// 连接成功后发送一条测试消息
try {
String message = "Hello, this is a custom protocol message!";
byte[] data = SerializationUtil.stringToBytes(message);
CustomProtocol protocol = new CustomProtocol(
(byte) 1, // 版本号
SerializationUtil.JSON, // 使用JSON序列化
(byte) 0, // 请求消息类型
(byte) 0, // 状态(未使用)
System.currentTimeMillis(), // 消息ID
data
);
ctx.writeAndFlush(protocol);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof CustomProtocol protocol) {
System.out.println("Received from server: " + protocol);
// 根据消息类型处理
switch (protocol.getMessageType()) {
case 1: // 响应消息
handleResponse(protocol);
break;
case 2: // 心跳响应
handleHeartbeatResponse(protocol);
break;
default:
System.out.println("Unknown message type: " + protocol.getMessageType());
}
}
}
// 处理响应消息
private void handleResponse(CustomProtocol response) {
String responseData = SerializationUtil.bytesToString(response.getData());
System.out.println("Server response: " + responseData);
}
// 处理心跳响应
private void handleHeartbeatResponse(CustomProtocol response) {
System.out.println("Received heartbeat response from server");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
✅ 6. 序列化与反序列化工具类封装
引入相关依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.19.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.28.2</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.7.2</version>
</dependency>
package cn.tcmeta.customeprotocol;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
/**
* 序列化工具类,支持多种序列化方式
*/
public class SerializationUtil {
// 序列化算法常量
public static final byte JSON = 0;
public static final byte PROTOBUF = 1;
public static final byte JAVA = 2;
// JSON序列化器
private static final ObjectMapper objectMapper = new ObjectMapper();
/**
* 根据指定算法序列化对象
*/
public static byte[] serialize(Object obj, byte algorithm) throws IOException {
return switch (algorithm) {
case JSON -> jsonSerialize(obj);
case PROTOBUF -> protobufSerialize(obj);
case JAVA -> javaSerialize(obj);
default -> throw new IllegalArgumentException("Unsupported serialization algorithm: " + algorithm);
};
}
/**
* 根据指定算法反序列化对象
*/
public static <T> T deserialize(byte[] data, Class<T> clazz, byte algorithm) throws IOException, ClassNotFoundException {
if (data == null || data.length == 0) {
return null;
}
return switch (algorithm) {
case JSON -> jsonDeserialize(data, clazz);
case PROTOBUF -> protobufDeserialize(data, clazz);
case JAVA -> javaDeserialize(data);
default -> throw new IllegalArgumentException("Unsupported serialization algorithm: " + algorithm);
};
}
/**
* JSON序列化
*/
private static byte[] jsonSerialize(Object obj) throws JsonProcessingException {
return objectMapper.writeValueAsBytes(obj);
}
/**
* JSON反序列化
*/
private static <T> T jsonDeserialize(byte[] data, Class<T> clazz) throws IOException {
return objectMapper.readValue(data, clazz);
}
/**
* Protobuf序列化
*/
private static byte[] protobufSerialize(Object obj) {
@SuppressWarnings("unchecked")
Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(obj.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
}
/**
* Protobuf反序列化
*/
private static <T> T protobufDeserialize(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = RuntimeSchema.getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Protobuf deserialization failed", e);
}
}
/**
* Java原生序列化
*/
private static byte[] javaSerialize(Object obj) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(obj);
return bos.toByteArray();
}
}
/**
* Java原生反序列化
*/
@SuppressWarnings("unchecked")
private static <T> T javaDeserialize(byte[] data) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (T) ois.readObject();
}
}
/**
* 将字符串转换为字节数组(用于简单文本消息)
*/
public static byte[] stringToBytes(String str) {
return str.getBytes(StandardCharsets.UTF_8);
}
/**
* 将字节数组转换为字符串(用于简单文本消息)
*/
public static String bytesToString(byte[] data) {
if (data == null) {
return null;
}
return new String(data, StandardCharsets.UTF_8);
}
}
✅ 7. 服务器
package cn.tcmeta.customeprotocol;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 基于自定义协议的服务器
*/
public class ProtocolServer {
private final int port;
public ProtocolServer(int port) {
this.port = port;
}
public void start() throws Exception {
// 配置服务器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(2);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加自定义协议的编解码器
p.addLast(new CustomProtocolDecoder());
p.addLast(new CustomProtocolEncoder());
// 添加业务处理器
p.addLast(new ServerHandler());
}
});
// 启动服务器
ChannelFuture f = b.bind(port).sync();
System.out.println("Custom protocol server started on port " + port);
// 等待服务器关闭
f.channel().closeFuture().sync();
} finally {
// 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new ProtocolServer(port).start();
}
}
✅ 客户端
package cn.tcmeta.customeprotocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 基于自定义协议的客户端
*/
public class ProtocolClient {
private final String host;
private final int port;
private Channel channel;
private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
public ProtocolClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加自定义协议的编解码器
p.addLast(new CustomProtocolDecoder());
p.addLast(new CustomProtocolEncoder());
// 添加客户端处理器
p.addLast(new ClientHandler());
}
});
// 连接服务器
ChannelFuture f = b.connect(host, port).sync();
this.channel = f.channel();
System.out.println("Connected to server " + host + ":" + port);
// 启动心跳定时器
startHeartbeat();
// 等待连接关闭
f.channel().closeFuture().sync();
} finally {
// 关闭资源
heartbeatExecutor.shutdown();
group.shutdownGracefully();
}
}
// 启动心跳机制
private void startHeartbeat() {
heartbeatExecutor.scheduleAtFixedRate(() -> {
if (channel != null && channel.isActive()) {
try {
sendHeartbeat();
} catch (IOException e) {
e.printStackTrace();
}
}
}, 5, 5, TimeUnit.SECONDS); // 每5秒发送一次心跳
}
// 发送心跳消息
public void sendHeartbeat() throws IOException {
CustomProtocol heartbeat = new CustomProtocol(
(byte) 1, // 版本号
SerializationUtil.JSON, // 使用JSON序列化
(byte) 2, // 心跳消息类型
(byte) 0, // 状态(未使用)
System.currentTimeMillis(), // 消息ID使用时间戳
new byte[0] // 心跳消息没有数据
);
channel.writeAndFlush(heartbeat);
System.out.println("Sent heartbeat");
}
// 发送请求消息
public void sendRequest(String message) throws IOException {
if (channel == null || !channel.isActive()) {
throw new IllegalStateException("Not connected to server");
}
// 序列化消息
byte[] data = SerializationUtil.stringToBytes(message);
// 创建协议对象
CustomProtocol protocol = new CustomProtocol(
(byte) 1, // 版本号
SerializationUtil.JSON, // 使用JSON序列化
(byte) 0, // 请求消息类型
(byte) 0, // 状态(未使用)
System.currentTimeMillis(), // 消息ID使用时间戳
data
);
// 发送消息
channel.writeAndFlush(protocol);
System.out.println("Sent request: " + message);
}
public static void main(String[] args) throws Exception {
ProtocolClient client = new ProtocolClient("localhost", 8080);
client.start();
// 在实际应用中,这里可以根据业务逻辑发送请求
// client.sendRequest("Hello, this is a custom protocol message!");
}
}
✅ 8. 日志输出
- 服务器
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=0, status=0, messageId=1757924189726, data=[72, 101, 108, 108, 111, 44, 32, 116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 99, 117, 115, 116, 111, 109, 32, 112, 114, 111, 116, 111, 99, 111, 108, 32, 109, 101, 115, 115, 97, 103, 101, 33])
Request data: Hello, this is a custom protocol message!
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924194606, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924199607, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924204607, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924209606, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924214606, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924219606, data=[])
Received heartbeat from client
Received from client: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924224606, data=[])
Received heartbeat from client
- 客户端日志输出
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=1, status=0, messageId=1, data=[83, 101, 114, 118, 101, 114, 32, 114, 101, 99, 101, 105, 118, 101, 100, 58, 32, 72, 101, 108, 108, 111, 44, 32, 116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 99, 117, 115, 116, 111, 109, 32, 112, 114, 111, 116, 111, 99, 111, 108, 32, 109, 101, 115, 115, 97, 103, 101, 33])
Server response: Server received: Hello, this is a custom protocol message!
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924194606, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924199607, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924204607, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924209606, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924214606, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924219606, data=[])
Received heartbeat response from server
Sent heartbeat
Received from server: CustomProtocol(version=1, serializeAlgorithm=0, messageType=2, status=0, messageId=1757924224606, data=[])
Received heartbeat response from server
3.6.4 协议工作流程
3.6.5 自定义协议的优势、注意事项
- 优势
-
灵活性高:可以根据业务需求定制协议字段
-
性能优化:相比通用协议(如 HTTP)更精简,减少冗余数据
-
安全性:可以添加加密和校验机制
-
可扩展性:通过版本号支持协议升级
- 注意事项
-
兼容性:协议升级时要考虑向后兼容
-
错误处理:要有完善的异常处理机制,处理非法数据
-
性能考量:序列化算法的选择对性能影响很大,Protobuf 通常优于 JSON
-
调试工具:自定义协议需要配套的调试工具,方便开发和问题排查
-
文档:必须有清晰的协议文档,说明各字段含义和使用方式
- 实践经验总结
-
魔数设计:选择一个独特的魔数,避免与其他协议冲突
-
版本控制:从一开始就设计版本字段,为后续升级做准备
-
序列化选择:根据场景选择合适的序列化方式,兼顾性能和可读性
-
心跳机制:对于长连接,必须实现心跳机制检测连接状态
-
消息 ID:用于消息追踪和异步通信,建议使用全局唯一 ID
-
最大长度限制:防止超大消息导致的内存问题
-
校验机制:关键场景下添加校验和,确保数据完整性
更多推荐
所有评论(0)