C# 优雅的为Tcp客户端设计心跳数据包
心跳机制是定时发送一个自定义的 结构体 (心跳包),让对方知道自己还活着,以确保连接的有效性的机制。 网络中的接收和发送数据都是使用操作系统中的 SOCKET 进行实现。 但是如果此 套接字 已经断开,那发送数据和接收数据的时候就一定会有问题。 可是如何判断这个套接字是否还可以使用呢? 这个就需要在系统中创建心跳机制。 其实TCP中已经为我们实现了一个叫做心跳的机制。但是该机制受限于操作系统,而且
文章目录
一、说明
为什么要设置心跳?
心跳机制是定时发送一个自定义的 结构体 (心跳包),让对方知道自己还活着,以确保连接的有效性的机制。 网络中的接收和发送数据都是使用操作系统中的 SOCKET 进行实现。 但是如果此 套接字 已经断开,那发送数据和接收数据的时候就一定会有问题。 可是如何判断这个套接字是否还可以使用呢? 这个就需要在系统中创建心跳机制。 其实TCP中已经为我们实现了一个叫做心跳的机制。
但是该机制受限于操作系统,而且很容易误报。所以很少被大家使用。
大家使用最多的,就是自己设计数据包,然后预留心跳格式,当对方收到心跳包时,直接返回响应包即可。
那么,按这个思路,让我们使用TouchSocket优雅的实现吧。
二、程序集源码
2.1 源码位置
2.2 说明文档
三、安装
Nuget安装TouchSocket即可,具体步骤详看链接博客。
四、数据格式
4.1 设计数据格式
使用心跳之前,必须要明确数据格式,绝对不能混淆业务数据。一般在适配Plc等现成模块时,他们是有固定的数据格式,这时候你可以参阅数据处理适配器,快速的解析数据。
但是在本文中,并没有规定的格式,所以我们需要先设计一种简单高效的数据格式。
如下:
| 数据长度 | 数据类型 | 载荷数据 |
|---|---|---|
| 2字节(Ushort) | 1字节(Byte) | n字节(<65535) |
4.2 解析数据格式
下列代码主要实现对上述数据格式的解析
internal class MyFixedHeaderDataHandlingAdapter : CustomFixedHeaderDataHandlingAdapter<MyRequestInfo>
{
public override int HeaderLength => 3;
public override bool CanSendRequestInfo => false;
protected override MyRequestInfo GetInstance()
{
return new MyRequestInfo();
}
protected override void PreviewSend(IRequestInfo requestInfo)
{
throw new NotImplementedException();
}
}
internal class MyRequestInfo : IFixedHeaderRequestInfo
{
public DataType DataType { get; set; }
public byte[] Data { get; set; }
public int BodyLength { get; private set; }
public bool OnParsingBody(byte[] body)
{
if (body.Length == this.BodyLength)
{
this.Data = body;
return true;
}
return false;
}
public bool OnParsingHeader(byte[] header)
{
if (header.Length == 3)
{
this.BodyLength = TouchSocketBitConverter.Default.ToUInt16(header, 0) - 1;
this.DataType = (DataType)header[2];
return true;
}
return false;
}
public void Package(ByteBlock byteBlock)
{
byteBlock.Write((ushort)((this.Data == null ? 0 : this.Data.Length) + 1));
byteBlock.Write((byte)this.DataType);
if (this.Data != null)
{
byteBlock.Write(this.Data);
}
}
public byte[] PackageAsBytes()
{
using var byteBlock = new ByteBlock();
this.Package(byteBlock);
return byteBlock.ToArray();
}
public override string ToString()
{
return $"数据类型={this.DataType},数据={(this.Data == null ? "null" : Encoding.UTF8.GetString(this.Data))}";
}
}
internal enum DataType : byte
{
Ping,
Pong,
Data
}
五、创建扩展类
下列代码可选,主要实现对Client增加Ping的扩展方法。方便调用。
/// <summary>
/// 一个心跳计数器扩展。
/// </summary>
internal static class DependencyExtensions
{
public static readonly DependencyProperty<Timer> HeartbeatTimerProperty =
DependencyProperty<Timer>.Register("HeartbeatTimer", null);
public static bool Ping<TClient>(this TClient client) where TClient : ITcpClientBase
{
try
{
client.Send(new MyRequestInfo() { DataType = DataType.Ping }.PackageAsBytes());
return true;
}
catch (Exception ex)
{
client.Logger.Exception(ex);
}
return false;
}
public static bool Pong<TClient>(this TClient client) where TClient : ITcpClientBase
{
try
{
client.Send(new MyRequestInfo() { DataType = DataType.Pong }.PackageAsBytes());
return true;
}
catch (Exception ex)
{
client.Logger.Exception(ex);
}
return false;
}
}
六、创建心跳插件类
下列代码主要实现心跳插件的功能。默认每五秒自动触发一次。且接收方收到Ping后,直接会回复Pong。
internal class HeartbeatAndReceivePlugin : PluginBase, ITcpConnectedPlugin<ITcpClientBase>, ITcpDisconnectedPlugin<ITcpClientBase>, ITcpReceivedPlugin<ITcpClientBase>
{
private readonly int m_timeTick;
private readonly ILog logger;
[DependencyInject]
public HeartbeatAndReceivePlugin( ILog logger,int timeTick= 1000 * 5)
{
this.m_timeTick = timeTick;
this.logger = logger;
}
public async Task OnTcpConnected(ITcpClientBase client, ConnectedEventArgs e)
{
if (client is ISocketClient)
{
return;//此处可判断,如果为服务器,则不用使用心跳。
}
if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
{
timer.Dispose();
}
client.SetValue(DependencyExtensions.HeartbeatTimerProperty, new Timer((o) =>
{
client.Ping();
}, null, 0, this.m_timeTick));
await e.InvokeNext();
}
public async Task OnTcpDisconnected(ITcpClientBase client, DisconnectEventArgs e)
{
if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
{
timer.Dispose();
client.SetValue(DependencyExtensions.HeartbeatTimerProperty, null);
}
await e.InvokeNext();
}
public async Task OnTcpReceived(ITcpClientBase client, ReceivedDataEventArgs e)
{
if (e.RequestInfo is MyRequestInfo myRequest)
{
this.logger.Info(myRequest.ToString());
if (myRequest.DataType == DataType.Ping)
{
client.Pong();
}
}
await e.InvokeNext();
}
}
七、测试、启动
/// <summary>
/// 示例心跳。
/// 博客地址<see href="https://blog.csdn.net/qq_40374647/article/details/125598921"/>
/// </summary>
/// <param name="args"></param>
private static void Main(string[] args)
{
var consoleAction = new ConsoleAction();
//服务器
var service = new TcpService();
service.Setup(new TouchSocketConfig()//载入配置
.SetListenIPHosts(new IPHost[] { new IPHost("127.0.0.1:7789"), new IPHost(7790) })//同时监听两个地址
.SetTcpDataHandlingAdapter(() => new MyFixedHeaderDataHandlingAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<HeartbeatAndReceivePlugin>();
}))
.Start();//启动
service.Logger.Info("服务器成功启动");
//客户端
var tcpClient = new TcpClient();
tcpClient.Setup(new TouchSocketConfig()
.SetRemoteIPHost(new IPHost("127.0.0.1:7789"))
.SetTcpDataHandlingAdapter(() => new MyFixedHeaderDataHandlingAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<HeartbeatAndReceivePlugin>();
}));
tcpClient.Connect();
tcpClient.Logger.Info("客户端成功连接");
consoleAction.OnException += ConsoleAction_OnException;
consoleAction.Add("1", "发送心跳", () =>
{
tcpClient.Ping();
});
consoleAction.Add("2", "发送数据", () =>
{
tcpClient.Send(new MyRequestInfo()
{
DataType = DataType.Data,
Data = Encoding.UTF8.GetBytes(Console.ReadLine())
}
.PackageAsBytes());
});
consoleAction.ShowAll();
while (true)
{
consoleAction.Run(Console.ReadLine());
}
}
private static void ConsoleAction_OnException(Exception obj)
{
Console.WriteLine(obj);
}
八、效果

本文示例demo
更多推荐


所有评论(0)