UDP Socket编程实战(三):多线程聊天室与线程安全

💬 开篇:前两篇实现了Echo Server和网络字典,都是单播模式——一个客户端发消息,服务器只回给他。但聊天室不是这样的:一个人发消息,所有人都要收到。这就要引入路由转发机制。同时,多个客户端并发访问,服务器要用线程池处理,这就涉及线程安全问题。这一篇会把UDP编程的几个核心技术点串起来:在线用户管理、消息路由、线程池、互斥锁,以及UDP全双工特性。

👍 点赞、收藏与分享:这篇会讲清楚从单播到多播的演进、线程安全的实践、以及inet_ntoa的多线程陷阱。如果对你有帮助,请点赞收藏!

🚀 循序渐进:从V2到V3的改造,从单线程到多线程,从简单回显到复杂路由,一步步理解网络服务的进阶设计。


一、聊天室的需求分析

1.1 单播和多播的区别

之前的Echo和Dict都是单播:客户端A发消息,服务器只回给A。

聊天室是多播:客户端A发消息,服务器要转发给所有在线的客户端(包括A自己)。


单播模式:
客户端A → 服务器

多播模式:
客户端A → 服务器 → 客户端A
→ 客户端B
→ 客户端C
→ ...

1.2 需要解决的问题

要实现聊天室,服务器要做这几件事:

  1. 维护在线用户列表:知道哪些客户端在线,他们的IP和端口是什么
  2. 消息路由:收到一个客户端的消息后,转发给所有在线用户
  3. 并发处理:多个客户端同时发消息,服务器要能同时处理
  4. 线程安全:多个线程同时访问在线用户列表,要加锁保护

二、在线用户管理

2.1 用什么数据结构存储在线用户

最简单的选择是vector<InetAddr>,把每个客户端的地址存进去。

class UdpServer : public nocopy
{
private:
    std::vector<InetAddr> _online_user;    // 在线用户列表
    pthread_mutex_t _user_mutex;           // 保护列表的互斥锁
};

为什么不用unordered_set?因为我们需要遍历所有用户来转发消息,vector遍历更快。而且用户数量通常不大,线性查找的性能损失可以接受。

2.2 添加用户到在线列表

void AddOnlineUser(InetAddr addr)
{
    LockGuard lockguard(&_user_mutex);    // 加锁,退出时自动解锁
    
    for (auto &user : _online_user)
    {
        if (addr == user)
            return;    // 已经在列表里了,不重复添加
    }
    
    _online_user.push_back(addr);
    lg.LogMessage(Debug, "%s:%d is add to onlineuser list...\n", 
                  addr.Ip().c_str(), addr.Port());
}
2.2.1 为什么要判断用户是否已存在

客户端每次发消息,服务器都会调用AddOnlineUser。如果不判断,同一个用户会被添加多次,导致转发消息时重复发送。

所以先遍历一遍列表,如果发现已经存在就直接返回。这里用到了InetAddroperator==重载。

2.2.2 InetAddr的==重载
bool operator == (const InetAddr& addr)
{
    return this->_ip == addr._ip && this->_port == addr._port;
}

两个地址相等的条件:IP相同且端口相同。这样vector的查找就能正确工作了。

2.2.3 为什么要加锁

_online_user会被多个线程同时访问:

  • 主线程收到消息后调用AddOnlineUser
  • 线程池里的工作线程调用Route遍历用户列表

如果不加锁,可能出现这种情况:线程A正在遍历用户列表,线程B同时在往列表里添加用户,导致迭代器失效或者数据不一致。

所以用pthread_mutex_t加锁,保证同一时刻只有一个线程能访问列表。

2.2.4 LockGuard是什么

这是我们之前写的RAII风格的锁封装类

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex) : _mutex(mutex)
    {
        pthread_mutex_lock(_mutex);    // 构造时加锁
    }
    
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);  // 析构时解锁
    }
    
private:
    pthread_mutex_t* _mutex;
};

好处是:不管函数怎么返回(正常返回、提前return、抛异常),锁都会被自动释放。避免了忘记解锁导致的死锁问题。


三、消息路由机制

3.1 Route()函数的实现

void Route(int sock, const std::string& message)
{
    std::vector<InetAddr> snapshot;
    {
        LockGuard lockguard(&_user_mutex);
        snapshot = _online_user;  // 拷贝一份快照
    }

    for (auto& user : snapshot) {
        const sockaddr_in& sa = user.GetAddr();
        sendto(sock, message.data(), message.size(), 0,
               (const sockaddr*)&sa, sizeof(sa));
    }
}

很简单:遍历所有在线用户,把消息发给每个人。

锁内拷贝用户列表,锁外发送

3.2 GetAddr()方法

InetAddr类新增了一个方法:

const struct sockaddr_in& GetAddr()
{
    return _addr;
}

返回原始的sockaddr_in结构体,供sendto使用。注意返回的是引用,避免拷贝。

3.3 消息格式的设计

std::string message = "[";
message += addr.Ip();
message += ":";
message += std::to_string(addr.Port());
message += "]# ";
message += buffer;

消息格式是[IP:Port]# 内容,比如:

[192.168.1.100:54321]# Hello everyone!

这样客户端收到消息后,能看到是谁发的。


四、引入线程池

4.1 为什么需要线程池

如果服务器是单线程的,Route函数要遍历所有用户发送消息。假设有100个在线用户,每次发送耗时1ms,那么路由一次消息就要100ms。这期间服务器不能接收新消息,会阻塞。

用线程池的话,主线程只负责接收消息,把路由任务扔给线程池处理。主线程立刻可以继续接收下一个消息,不会被阻塞。

4.2 任务类型的定义

using task_t = std::function<void()>;

线程池的任务就是一个无参数无返回值的函数。

4.3 Init()中启动线程池

void Init()
{
    // ... socket创建和bind的代码 ...
    
    ThreadPool<task_t>::GetInstance()->Start();
}

ThreadPool是单例模式,调用GetInstance()获取实例,然后Start()启动工作线程。

4.4 Start()中提交任务

void Start()
{
    char buffer[defaultsize];
    for (;;)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        
        ssize_t n = recvfrom(_sockfd, buffer, sizeof(buffer) - 1,
                             0, (struct sockaddr *)&peer, &len);
        if (n > 0)
        {
            InetAddr addr(peer);
            AddOnlineUser(addr);    // 添加到在线列表
            
            buffer[n] = 0;
            
            // 构造消息
            std::string message = "[";
            message += addr.Ip();
            message += ":";
            message += std::to_string(addr.Port());
            message += "]# ";
            message += buffer;
            
            // 创建任务并提交给线程池
            task_t task = std::bind(&UdpServer::Route, this, _sockfd, message);
            ThreadPool<task_t>::GetInstance()->Push(task);
        }
    }
}
4.4.1 std::bind的用法
task_t task = std::bind(&UdpServer::Route, this, _sockfd, message);

std::bind把成员函数Route和参数绑定在一起,生成一个无参数的可调用对象。

  • &UdpServer::Route:成员函数指针
  • this:当前对象指针(成员函数需要一个对象才能调用)
  • _sockfd, message:函数参数

绑定后的task就是一个void()类型的函数对象,线程池可以直接调用。

4.4.2 为什么要拷贝message

注意message是按值传递给std::bind的,会发生拷贝。为什么不用引用?

因为message是局部变量,下一轮循环就被修改了。如果用引用,线程池里的任务执行的时候,message可能已经变成别的内容了。

所以必须拷贝一份,让任务持有自己的数据副本。


五、多线程客户端设计

5.1 UDP的全双工特性

UDP协议支持全双工:因为UDP 是无连接、按数据报收发,应用层可以使用不同线程并发 recvfrom/sendto。但输出(终端打印)可能交错,且并发使用同一 socket 仍应注意线程之间的协调与退出处理。

这和TCP不同。TCP虽然也是全双工,但底层有滑动窗口、流量控制等机制,读写会互相影响。UDP没有这些复杂机制,读写完全独立,应用层更直接。

5.2 收发分离的设计

基于全双工特性,客户端可以用两个线程:

  • 一个线程专门发送消息(读取用户输入 → 发送)
  • 一个线程专门接收消息(接收 → 打印)
void RecverRoutine(ThreadData &td)
{
    char buffer[4096];
    while (true)
    {
        struct sockaddr_in temp;
        socklen_t len = sizeof(temp);
        
        ssize_t n = recvfrom(td._sockfd, buffer, sizeof(buffer) - 1,
                             0, (struct sockaddr *)&temp, &len);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cerr << buffer << std::endl;    // 用cerr方便观察效果
        }
        else break;
    }
}

void SenderRoutine(ThreadData &td)
{
    while (true)
    {
        std::string inbuffer;
        std::cout << "Please Enter# ";
        std::getline(std::cin, inbuffer);
        
        auto server = td._serveraddr.GetAddr();
        ssize_t n = sendto(td._sockfd, inbuffer.c_str(), inbuffer.size(),
                           0, (struct sockaddr *)&server, sizeof(server));
        if (n <= 0)
            std::cout << "send error" << std::endl;
    }
}
5.2.1 为什么接收线程用cerr
std::cerr << buffer << std::endl;

cerr而不是cout,是为了和发送线程的cout区分开。cerr默认不带缓冲,立刻输出,避免和cout的输出交错。

在实际测试中,你可以看到接收的消息和输入提示符不会乱序。

5.2.2 ThreadData的作用
class ThreadData
{
public:
    ThreadData(int sock, struct sockaddr_in &server) 
        : _sockfd(sock), _serveraddr(server)
    {
    }
    
public:
    int _sockfd;
    InetAddr _serveraddr;
};

这是一个简单的数据结构,用来在主线程和工作线程之间传递共享数据。

因为两个线程都要用到socket fd和服务器地址,所以把它们打包到一起。

5.3 启动两个线程

ThreadData td(sock, server);

Thread<ThreadData> recver("recver", RecverRoutine, td);
Thread<ThreadData> sender("sender", SenderRoutine, td);

recver.Start();
sender.Start();

recver.Join();
sender.Join();

Thread是线程封装类

启动两个线程后,主线程等待它们结束。但实际上这两个线程是死循环,永远不会退出,除非用户强制终止程序。

六、本篇总结

6.1 核心要点

聊天室架构

  • 维护在线用户列表(vector<InetAddr>
  • 收到消息后添加发送方到列表,然后路由转发给所有在线用户
  • 消息格式包含发送方的IP和端口,方便客户端识别

线程安全

  • _online_user会被多个线程同时访问,必须加锁保护
  • 用RAII风格的LockGuard自动管理锁,避免忘记解锁
  • AddOnlineUserRoute都要加锁

线程池的使用

  • 主线程只负责接收消息,把路由任务提交给线程池
  • std::bind把成员函数和参数绑定成无参数任务
  • 任务要拷贝数据(不能用引用),避免局部变量失效

多线程客户端

  • UDP是全双工的,可以同时读写
  • 收发分离:一个线程发送,一个线程接收
  • cerr打印接收的消息,和cout的输入提示区分开

inet_ntoa的陷阱

  • 单线程:多次调用结果会互相覆盖,要马上复制
  • 多线程:共享静态缓冲区,结果可能乱掉
  • 解决方案:用inet_ntop,由调用者提供缓冲区,线程安全

6.2 容易混淆的点

  1. 为什么要判断用户是否已存在:客户端每次发消息都会调用AddOnlineUser,不判断的话同一个用户会被添加多次,导致重复发送。

  2. std::bind的message为什么要拷贝:因为是局部变量,下一轮循环就变了。如果用引用,线程池执行任务时数据已经错乱。

  3. UDP全双工和TCP全双工的区别:UDP没有流量控制和拥塞控制,读写完全独立。TCP虽然也是全双工,但底层机制复杂,读写会互相影响。

  4. 为什么接收线程用cerr而不是coutcerr无缓冲立刻输出,避免和cout的输出交错。在聊天室场景下,接收消息和输入提示不会乱序。

  5. inet_ntop的缓冲区大小怎么确定:IPv4用INET_ADDRSTRLEN(值是16),IPv6用INET6_ADDRSTRLEN(值是46)。这是标准定义的常量,不要硬编码数字。

  6. 为什么某些系统上inet_ntoa看起来线程安全:可能用了线程局部存储TLS,但不是标准保证的。不能依赖实现细节,应该用标准推荐的inet_ntop


💬 总结:这一篇把UDP编程的进阶技术都讲完了。从单播到多播,从单线程到多线程,从简单回显到复杂聊天室,整个演进过程涵盖了网络编程的核心技能:并发处理、线程安全、任务分发、全双工通信。下一篇(可选)会专门深入地址转换函数的所有细节,把inet_addrinet_ptoninet_ntoainet_ntop四个函数的使用场景和陷阱全部总结清楚。

👍 点赞、收藏与分享:如果这三篇UDP实战帮你把网络编程的基础打稳了,请点赞收藏!后面的TCP编程会更复杂,但有了这些基础,理解起来会容易很多!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐