一、说明

为什么要设置心跳?

心跳机制是定时发送一个自定义的 结构体 (心跳包),让对方知道自己还活着,以确保连接的有效性的机制。 网络中的接收和发送数据都是使用操作系统中的 SOCKET 进行实现。 但是如果此 套接字 已经断开,那发送数据和接收数据的时候就一定会有问题。 可是如何判断这个套接字是否还可以使用呢? 这个就需要在系统中创建心跳机制。 其实TCP中已经为我们实现了一个叫做心跳的机制。

但是该机制受限于操作系统,而且很容易误报。所以很少被大家使用。

大家使用最多的,就是自己设计数据包,然后预留心跳格式,当对方收到心跳包时,直接返回响应包即可。

那么,按这个思路,让我们使用TouchSocket优雅的实现吧。


二、程序集源码

2.1 源码位置
2.2 说明文档

文档首页

三、安装

Nuget安装TouchSocket即可,具体步骤详看链接博客。

VS、Unity安装和使用Nuget包

四、数据格式

4.1 设计数据格式

使用心跳之前,必须要明确数据格式,绝对不能混淆业务数据。一般在适配Plc等现成模块时,他们是有固定的数据格式,这时候你可以参阅数据处理适配器,快速的解析数据。

但是在本文中,并没有规定的格式,所以我们需要先设计一种简单高效的数据格式。

如下:

数据长度 数据类型 载荷数据
2字节(Ushort) 1字节(Byte) n字节(<65535)
4.2 解析数据格式

下列代码主要实现对上述数据格式的解析

internal class MyFixedHeaderDataHandlingAdapter : CustomFixedHeaderDataHandlingAdapter<MyRequestInfo>
{
    public override int HeaderLength => 3;

    public override bool CanSendRequestInfo => false;

    protected override MyRequestInfo GetInstance()
    {
        return new MyRequestInfo();
    }

    protected override void PreviewSend(IRequestInfo requestInfo)
    {
        throw new NotImplementedException();
    }
}

internal class MyRequestInfo : IFixedHeaderRequestInfo
{
    public DataType DataType { get; set; }
    public byte[] Data { get; set; }

    public int BodyLength { get; private set; }

    public bool OnParsingBody(byte[] body)
    {
        if (body.Length == this.BodyLength)
        {
            this.Data = body;
            return true;
        }
        return false;
    }

    public bool OnParsingHeader(byte[] header)
    {
        if (header.Length == 3)
        {
            this.BodyLength = TouchSocketBitConverter.Default.ToUInt16(header, 0) - 1;
            this.DataType = (DataType)header[2];
            return true;
        }
        return false;
    }

    public void Package(ByteBlock byteBlock)
    {
        byteBlock.Write((ushort)((this.Data == null ? 0 : this.Data.Length) + 1));
        byteBlock.Write((byte)this.DataType);
        if (this.Data != null)
        {
            byteBlock.Write(this.Data);
        }
    }

    public byte[] PackageAsBytes()
    {
        using var byteBlock = new ByteBlock();
        this.Package(byteBlock);
        return byteBlock.ToArray();
    }

    public override string ToString()
    {
        return $"数据类型={this.DataType},数据={(this.Data == null ? "null" : Encoding.UTF8.GetString(this.Data))}";
    }
}

internal enum DataType : byte
{
    Ping,
    Pong,
    Data
}

五、创建扩展类

下列代码可选,主要实现对Client增加Ping的扩展方法。方便调用。

/// <summary>
/// 一个心跳计数器扩展。
/// </summary>
internal static class DependencyExtensions
{
    public static readonly DependencyProperty<Timer> HeartbeatTimerProperty =
        DependencyProperty<Timer>.Register("HeartbeatTimer", null);

    public static bool Ping<TClient>(this TClient client) where TClient : ITcpClientBase
    {
        try
        {
            client.Send(new MyRequestInfo() { DataType = DataType.Ping }.PackageAsBytes());
            return true;
        }
        catch (Exception ex)
        {
            client.Logger.Exception(ex);
        }

        return false;
    }

    public static bool Pong<TClient>(this TClient client) where TClient : ITcpClientBase
    {
        try
        {
            client.Send(new MyRequestInfo() { DataType = DataType.Pong }.PackageAsBytes());
            return true;
        }
        catch (Exception ex)
        {
            client.Logger.Exception(ex);
        }

        return false;
    }
}

六、创建心跳插件类

下列代码主要实现心跳插件的功能。默认每五秒自动触发一次。且接收方收到Ping后,直接会回复Pong。

internal class HeartbeatAndReceivePlugin : PluginBase, ITcpConnectedPlugin<ITcpClientBase>, ITcpDisconnectedPlugin<ITcpClientBase>, ITcpReceivedPlugin<ITcpClientBase>
{
    private readonly int m_timeTick;
    private readonly ILog logger;

    [DependencyInject]
    public HeartbeatAndReceivePlugin( ILog logger,int timeTick= 1000 * 5)
    {
        this.m_timeTick = timeTick;
        this.logger = logger;
    }


    public async Task OnTcpConnected(ITcpClientBase client, ConnectedEventArgs e)
    {
        if (client is ISocketClient)
        {
            return;//此处可判断,如果为服务器,则不用使用心跳。
        }

        if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
        {
            timer.Dispose();
        }

        client.SetValue(DependencyExtensions.HeartbeatTimerProperty, new Timer((o) =>
        {
            client.Ping();
        }, null, 0, this.m_timeTick));
        await e.InvokeNext();
    }

    public async Task OnTcpDisconnected(ITcpClientBase client, DisconnectEventArgs e)
    {
        if (client.GetValue(DependencyExtensions.HeartbeatTimerProperty) is Timer timer)
        {
            timer.Dispose();
            client.SetValue(DependencyExtensions.HeartbeatTimerProperty, null);
        }

        await e.InvokeNext();
    }

    public async Task OnTcpReceived(ITcpClientBase client, ReceivedDataEventArgs e)
    {
        if (e.RequestInfo is MyRequestInfo myRequest)
        {
            this.logger.Info(myRequest.ToString());
            if (myRequest.DataType == DataType.Ping)
            {
                client.Pong();
            }
        }
        await e.InvokeNext();
    }
}

七、测试、启动

/// <summary>
/// 示例心跳。
/// 博客地址<see href="https://blog.csdn.net/qq_40374647/article/details/125598921"/>
/// </summary>
/// <param name="args"></param>
private static void Main(string[] args)
{
    var consoleAction = new ConsoleAction();

    //服务器
    var service = new TcpService();
    service.Setup(new TouchSocketConfig()//载入配置
            .SetListenIPHosts(new IPHost[] { new IPHost("127.0.0.1:7789"), new IPHost(7790) })//同时监听两个地址
            .SetTcpDataHandlingAdapter(() => new MyFixedHeaderDataHandlingAdapter())
            .ConfigureContainer(a =>
            {
                a.AddConsoleLogger();
            })
            .ConfigurePlugins(a =>
            {
                a.Add<HeartbeatAndReceivePlugin>();
            }))
            .Start();//启动
    service.Logger.Info("服务器成功启动");

    //客户端
    var tcpClient = new TcpClient();
    tcpClient.Setup(new TouchSocketConfig()
        .SetRemoteIPHost(new IPHost("127.0.0.1:7789"))
        .SetTcpDataHandlingAdapter(() => new MyFixedHeaderDataHandlingAdapter())
        .ConfigureContainer(a =>
        {
            a.AddConsoleLogger();
        })
        .ConfigurePlugins(a =>
        {
            a.Add<HeartbeatAndReceivePlugin>();
        }));
    tcpClient.Connect();
    tcpClient.Logger.Info("客户端成功连接");

    consoleAction.OnException += ConsoleAction_OnException;
    consoleAction.Add("1", "发送心跳", () =>
      {
          tcpClient.Ping();
      });
    consoleAction.Add("2", "发送数据", () =>
      {
          tcpClient.Send(new MyRequestInfo()
          {
              DataType = DataType.Data,
              Data = Encoding.UTF8.GetBytes(Console.ReadLine())
          }
          .PackageAsBytes());
      });
    consoleAction.ShowAll();
    while (true)
    {
        consoleAction.Run(Console.ReadLine());
    }
}

private static void ConsoleAction_OnException(Exception obj)
{
    Console.WriteLine(obj);
}

八、效果

在这里插入图片描述

本文示例demo

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐