前景引入

互斥锁作为保障临界区资源互斥访问的核心机制,成功解决了多线程 / 多进程环境下单一资源的竞争问题,确保同一时间仅有一个执行流能够操作共享资源。但在实际的并发场景中,资源的管控需求往往超出 “独占” 范畴:有时需要限制同时访问某类资源的执行流数量(如仅允许 3 个线程同时读写文件),有时需要协调多个执行流的执行顺序(如生产者生产后唤醒消费者),这些场景下互斥锁 “非 0 即 1” 的二元管控模式已无法满足需求。正是为了突破互斥锁的这种局限性,信号量(Semaphore)作为一种更灵活的同步原语被提出,它通过一个计数器实现了对多份共享资源的精细化管控,成为并发编程中解决复杂同步与互斥问题的关键工具。

互斥锁可以理解为独占整个电影院:同一时间只允许一个人占用全场,其他人都必须等待,它解决的是单一资源的独占访问问题。

信号量更像是对电影院里的座位进行预订:影院一共有 N 个座位,就允许最多 N 个人同时入场,每进来一个人占用一个座位,满座后其他人必须等待;有人离场释放座位,下一个人才能进入。

从核心逻辑来看,信号量可简化理解为一个支持原子操作的计数器:它将互斥锁所保护的 “独占式整体资源”,拆解为计数器大小对应的若干个 “可并发访问的资源小单元”。当线程成功申请信号量资源时,计数器原子性自减(保证多线程修改不冲突);当线程执行完临界区代码、释放信号量资源时,计数器原子性自增。正因如此,若将信号量的计数器初始值设为 1(即二元信号量),其 “同一时间仅允许一个线程占用资源” 的特性,完全等价于互斥锁的独占逻辑 —— 这也是互斥锁本质上是特殊的二元信号量的原因。

接口认识

变量类型名称

sem_t 是 POSIX 标准定义的信号量专用数据类型(本质是一个结构体)。

信号量资源初始化

sem_init() 是 POSIX 标准中用于初始化信号量对象的核心接口,也是使用信号量的第一步 —— 未通过该接口初始化的 sem_t 类型信号量变量无法被 sem_wait()/sem_post() 等接口正常操作,其作用是为信号量设定 “作用域” 和 “初始资源数量”,奠定信号量的核心行为基础。

参数名 数据类型 核心含义 关键说明
sem sem_t * 信号量指针 指向待初始化的 sem_t 类型变量,是信号量操作的核心载体
pshared int 进程共享标识 0:信号量仅在当前进程的线程间共享(90% 以上日常场景);非 0:信号量可跨多个进程共享(需配合共享内存)
value unsigned int 初始计数器值 信号量对应的初始可用资源数量(如影院初始座位数),也是信号量计数器的初始值

信号量资源申请

该接口用于线程 / 进程申请信号量资源,只有成功申请到信号量的线程 / 进程,才能继续执行后续代码块。类比电影院场景:信号量代表影院内的所有座位,信号量申请操作就是预定影院中的一个座位,预定成功即可入场(执行后续逻辑),预定失败则需等待他人释放座位(信号量)。

1. sem_wait:信号量的 “阻塞式申请”

sem_wait() 是信号量最基础的资源申请接口,核心作用是以阻塞方式抢占信号量对应的共享资源单元。其行为逻辑为:调用该接口时,会先原子性检查信号量计数器值 —— 若计数器大于 0(仍有可用资源),则计数器原子性减 1,接口立即返回,当前线程可继续执行后续临界区代码;若计数器等于 0(无可用资源),当前线程会被挂起并加入信号量的等待队列,进入无限阻塞状态,直到其他线程调用 sem_post() 释放资源(计数器自增),该线程被唤醒后重新竞争资源。

2. sem_trywait:信号量的 “非阻塞式申请”

sem_trywait() 是信号量的非阻塞资源申请接口,核心作用是尝试抢占资源,失败后立即返回,绝不阻塞。其行为逻辑为:调用该接口时,同样原子性检查信号量计数器 —— 若计数器大于 0,计数器原子性减 1,接口返回 0(申请成功),线程可执行后续代码;若计数器等于 0,接口不会阻塞线程,而是直接返回非 0 错误码(通常为 EAGAIN),告知 “资源申请失败”,线程可立即执行降级逻辑(如放弃申请、重试或处理其他任务)。

3. sem_timedwait:信号量的 “限时阻塞式申请”

sem_timedwait() 是信号量的限时阻塞申请接口,核心作用是在指定时间内阻塞等待资源,超时后自动放弃,是 “无限阻塞” 和 “非阻塞” 的折中方案。其行为逻辑为:调用该接口时需传入一个绝对超时时间点(如当前时间 + 10 秒),首先原子性检查计数器 —— 若有可用资源则立即申请成功;若无可用资源,线程进入阻塞状态,但仅等待至指定超时时间:若超时前有其他线程释放资源,线程被唤醒并申请资源;若超时时间到达仍无可用资源,接口返回 ETIMEDOUT 错误码,线程结束等待并执行超时逻辑。

信号量资源释放

sem_post() 是 POSIX 标准中用于释放信号量资源的核心接口,与信号量资源申请接口形成 “申请 - 释放” 的完整闭环 —— 其核心作用是将信号量计数器原子性递增,归还已占用的资源单元,并唤醒等待队列中因申请资源失败而阻塞的线程。

信号量资源销毁

sem_destroy() 是 POSIX 标准中用于销毁已初始化信号量的核心接口,也是信号量使用生命周期的最后一步 —— 其核心作用是释放 sem_t 类型信号量占用的系统内核资源,避免资源泄漏,必须与 sem_init() 接口配对使用。

基础代码演示

基于以上接口认识,我们可以简单的进行信号量从初始化->使用->销毁的demo:

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

sem_t sem;

void *thread_func(void *arg)
{
    char *thread_name = (char *)arg;

    printf("[%s] 尝试申请信号量...\n", thread_name);
    sem_wait(&sem);
    printf("[%s] 成功申请到信号量,执行临界区逻辑\n", thread_name);
    sleep(2);
    sem_post(&sem);
    printf("[%s] 释放信号量,临界区执行完毕\n", thread_name);

    pthread_exit(NULL);
}

int main()
{
    if (sem_init(&sem, 0, 2) == -1)
    {
        perror("sem_init 初始化失败");
        exit(EXIT_FAILURE);
    }
    printf("信号量初始化成功,初始计数器值=2\n");

    pthread_t t1, t2, t3;
    pthread_create(&t1, NULL, thread_func, (void *)"线程1");
    pthread_create(&t2, NULL, thread_func, (void *)"线程2");
    pthread_create(&t3, NULL, thread_func, (void *)"线程3");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);

    sem_destroy(&sem);
    printf("信号量已销毁,程序结束\n");

    return 0;
}

接口简单应用——基于生产者-消费者模型的封装

掌握信号量的核心概念后,我们可以将其与生产者 - 消费者模型结合,完成代码整合与接口封装,实现知识点的串联落地。为简化实现,本文采用数组(vector)模拟环形队列的方式来抽象生产 / 消费的核心过程:信号量用于管控队列中空闲位置和已填充数据的数量,互斥锁用于保护多生产者 / 多消费者对队列的并发操作,最终完整实现生产 - 消费的同步逻辑。

互斥锁头文件封装:

// Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>

namespace MutexModule
{
    class Mutex
    {
    public:
        Mutex()
        {
            pthread_mutex_init(&_mutex, nullptr);
        }
        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            (void)n;
        }
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            (void)n;
        }
        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

    private:
        pthread_mutex_t _mutex;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex):_mutex(mutex)
        {
            _mutex.Lock();
        }
        ~LockGuard()
        {
            _mutex.Unlock();
        }
    private:
        Mutex &_mutex;
    };
}

信号量头文件封装:

// Sem.hpp
#pragma once
#include <semaphore.h>
#define defaultvalue 1
namespace Sem
{
    class SemModule
    {
    public:
        SemModule(int _sem_value = defaultvalue)
        {
            sem_init(&_sem, 0, _sem_value);
        }
        void Wait()
        {
            sem_wait(&_sem);
        }
        void Post()
        {
            sem_post(&_sem);
        }
        ~SemModule()
        {
            sem_destroy(&_sem);
        }

    private:
        sem_t _sem;
    };
}

环形队列头文件封装:

// RingQueue.hpp
#pragma once
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"
#define defaultsize 5
using namespace std;
using namespace Sem;
using namespace MutexModule;
namespace RQModule
{
    template <typename T>
    class RQ
    {
    public:
        RQ(int cap = defaultsize)
            : _rq(cap),
              _rqsize(cap),
              _empty(cap),
              _full(0),
              _pstep(0),
              _cstep(0)
        {
        }
        void EQueue(const T &in)
        {
            _empty.Wait();
            _plock.Lock();
            _rq[_pstep++] = in;
            _pstep %= _rqsize;
            _full.Post();
            _plock.Unlock();
        }
        const T *Pop()
        {
            _full.Wait();
            _clock.Lock();
            const T *get = &_rq[_cstep++];
            _cstep %= _rqsize;
            _empty.Post();
            _clock.Unlock();
            return get;
        }
        ~RQ()
        {
        }

    private:
        vector<T> _rq;
        int _rqsize;
        // 生产者
        SemModule _empty;
        unsigned int _pstep;
        // 消费者
        SemModule _full;
        unsigned int _cstep;
        Mutex _plock;
        Mutex _clock;
    };
}

主程序函数:

// Main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <iostream>
#include <unistd.h>
using namespace RQModule;
RQ<int> *rq_ = new RQ<int>(5);
int Data = 1;
const int Datasize = 5;
void *consumer(void *args)
{
    while (1)
    {
        const char *name = static_cast<const char *>(args);
        const int *accept = rq_->Pop();
        std::cout << "我是" << name << std::endl
                  << "我得到了数据" << *accept;
        sleep(1);
    }
}
void *producer(void *args)
{
    while (1)
    {
        rq_->EQueue(Data++);
        Data %= Datasize;
    }
}
int main()
{
    pthread_t ptd1, ptd2, ctd1, ctd2, ctd3;
    pthread_create(&ptd1, nullptr, producer, (void *)"ptd-1");
    pthread_create(&ptd2, nullptr, producer, (void *)"ptd-2");
    pthread_create(&ctd1, nullptr, consumer, (void *)"ctd-1");
    pthread_create(&ctd2, nullptr, consumer, (void *)"ctd-2");
    pthread_create(&ctd3, nullptr, consumer, (void *)"ctd-3");
    pthread_join(ptd1, nullptr);
    pthread_join(ptd2, nullptr);
    pthread_join(ctd1, nullptr);
    pthread_join(ctd2, nullptr);
    pthread_join(ctd3, nullptr);
    delete rq_;
}

运行效果

当然这里环形队列使用了模版,可以在队列放入其他类型变量(函数指针、字符串......)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐