1 概述

  • Semaphore 是基于 AQS + CAS 实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore 默认使用非公平锁;

    在这里插入图片描述

2 构造函数

// AQS的实现
private final Sync sync;

// 默认使用非公平锁
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 根据fair布尔值选择使用公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
  • syncSemaphore 内部用于实现同步控制的核心组件,它基于 **AQS(AbstractQueuedSynchronizer,抽象队列同步器)**实现。AQS 是 Java 并发包中同步组件(如锁、信号量)的“基石”,通过维护同步状态和等待队列来实现线程的同步与协作;

  • 构造函数 public Semaphore(int permits)

    • 入参 permits 表示信号量的许可数量(即同时允许多少个线程访问共享资源);
    • 该构造函数默认创建 NonfairSync 实例(非公平锁实现);
    • 非公平锁的特点:线程获取许可时不会严格遵循“先到先得”,新线程可能直接抢占许可,导致等待队列中的线程长时间阻塞,但吞吐量通常更高;
  • 构造函数 public Semaphore(int permits, boolean fair)

    • 入参 fair 是布尔值,用于指定是否使用公平锁
    • fairtrue,则创建 FairSync 实例(公平锁实现);若为 false,则创建 NonfairSync 实例(非公平锁);
    • 公平锁的特点:线程会严格按照“等待时间先后”获取许可,保证了等待队列中线程的公平性,但由于需要维护队列顺序,吞吐量可能略低。

3 公平锁与非公平锁

  • Semaphore 中公平锁与非公平锁的实现,可以在tryAcquireShared()方法中找到两种锁的区别;

    在这里插入图片描述

3.1 NonfairSync

  • Semaphore#NonfairSync#tryAcquireShared(int acquires)

    // 非公平锁,获取信号量
    protected int tryAcquireShared(int acquires) {
         return nonfairTryAcquireShared(acquires);
    }
    
    • 该方法是 NonfairSync(非公平锁实现类)中对 AQS 共享式获取逻辑的实现;
    • 它直接调用 nonfairTryAcquireShared(acquires) 方法,把“非公平获取信号量”的核心逻辑委托给该方法处理;
  • Semaphore#Sync#nonfairTryAcquireShared(int acquires)

    // 非公平锁,获取信号量
    final int nonfairTryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            // 获取Semaphore中可用的信号量数
            int available = getState();
            // 当前可用信号量数 - acquires
            int remaining = available - acquires;
            // 可用信号量数不足 或 CAS操作获取信号量失败,返回  当前可用信号量数 - acquires
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    
    • 这个方法通过自旋 + CAS 实现非公平的信号量获取,步骤如下:

      • 自旋(for(;;) 循环):不断尝试获取信号量,直到成功或确定无法获取;

      • 获取当前可用信号量:通过 getState() 方法获取 Semaphore 中当前可用的信号量数量(available)。getState() 是 AQS 提供的方法,用于维护同步状态(这里同步状态代表可用信号量的数量);

      • 计算剩余信号量remaining = available - acquires,其中 acquires 是线程要获取的信号量数量;

      • CAS 尝试更新状态

        • remaining < 0,说明可用信号量不足,直接返回 remaining(表示获取失败);

        • remaining ≥ 0,则通过 compareAndSetState(available, remaining) 尝试原子性地将“可用信号量”从 available 更新为 remaining。若 CAS 成功,返回 remaining(表示获取成功);若 CAS 失败,说明有其他线程同时修改了信号量状态,继续自旋重试。

3.2 FairSync

  • Semaphore#FairSync#tryAcquireShared():该方法是 FairSync(公平锁实现类)对 AQS 共享式获取逻辑的实现,核心是保证线程“先到先得”的公平性;

    protected int tryAcquireShared(int acquires) {
       // 自旋
        for (;;) {
            // 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中)
            if (hasQueuedPredecessors())
                return -1;
            // 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    
  • 第 5 行的 hasQueuedPredecessors() 是 AQS 提供的方法,用于判断当前线程是否有“前驱节点”(即等待队列中存在比当前线程更早等待的线程)

    • 若返回 true,说明等待队列中已有更早的线程在等待,当前线程直接返回 -1(表示获取失败,会被 AQS 加入等待队列);

    • 这一步是公平锁与非公平锁的核心区别:公平锁会严格检查等待队列的顺序,避免“插队”,而非公平锁则直接尝试抢占;

  • 在通过 hasQueuedPredecessors() 确认“可以尝试抢占”后,后续逻辑与非公平锁类似:

    • 自旋(for(;;) 循环):不断尝试获取信号量,直到成功或确定无法获取;

    • 获取并计算信号量:通过 getState() 获取当前可用信号量(available),再计算获取 acquires 个信号量后的剩余量(remaining = available - acquires);

    • CAS 原子更新:若 remaining ≥ 0,通过 compareAndSetState(available, remaining) 尝试原子性更新信号量状态;若成功则返回 remaining(获取成功),若失败则继续自旋重试;若 remaining < 0,则直接返回 remaining(获取失败)。

4 acquire()

  • Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;

  • Semaphore#acquire():入口方法

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    • 作用:尝试获取 1 个信号量,若信号量不足则阻塞线程;支持响应线程中断;

    • 实现:委托给 sync(基于 AQS 的同步组件)的 acquireSharedInterruptibly(1) 方法执行,1 表示要获取的信号量数量;

  • AQS#acquireSharedInterruptibly(int arg):共享式可中断获取逻辑

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    • 中断检查if (Thread.interrupted())先检查线程是否被中断,若已中断则抛出 InterruptedException

    • 尝试获取资源:调用 tryAcquireShared(arg)(由 Semaphore 的公平/非公平锁实现,如前所述的 FairSyncNonfairSync 的逻辑)。若返回值 < 0,说明获取失败,进入 doAcquireSharedInterruptibly(arg) 处理阻塞逻辑;

  • AQS#doAcquireSharedInterruptibly(int arg):共享式阻塞获取(可中断)

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列
        boolean failed = true;
        try {
            for (;;) { // 自旋
                final Node p = node.predecessor(); // 获取前驱节点
                if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 获取成功
                        setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)
                        p.next = null; // 断开原头节点引用,帮助GC
                        failed = false;
                        return;
                    }
                }
                // 若获取失败,判断是否需要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 挂起线程并检查中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求
        }
    }
    
    • 加入等待队列addWaiter(Node.SHARED) 将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部;

    • 自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试 tryAcquireShared(arg)

    • 线程挂起与中断:若获取失败,通过 shouldParkAfterFailedAcquire 判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt() 会返回 true,进而抛出 InterruptedException

  • AQS#setHeadAndPropagate(Node node, int propagate):设置头节点并传播唤醒(共享模式关键)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node); // 将当前节点设为新的头节点
        // 若剩余资源>0、原头节点状态异常(或为null),则传播唤醒后续共享节点
        if (propagate > 0 || h == null || h.waitStatus < 0 || 
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared()) // 若后续节点是共享模式,唤醒它
                doReleaseShared();
        }
    }
    
    • 设置头节点setHead(node) 将当前节点标记为新的头节点(头节点代表“已获取资源并执行完毕”的线程);

    • 传播唤醒逻辑:由于 Semaphore 是共享锁,获取资源的线程需要“传播”唤醒后续等待的共享节点。若满足 propagate > 0(剩余资源充足)或队列状态异常等条件,会调用 doReleaseShared() 唤醒后续节点,保证共享资源的并发获取效率。

5 release()

  • Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;

  • Semaphore#release():入口方法

    public void release() {
        sync.releaseShared(1);
    }
    
    • 作用:归还 1 个信号量

    • 实现:委托给 sync(基于 AQS 的同步组件)的 releaseShared(1) 方法执行,1 表示要归还的信号量数量;

  • AQS#releaseShared(int arg):共享式释放逻辑

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 尝试归还信号量
            doReleaseShared(); // 唤醒等待队列中的线程
            return true;
        }
        return false;
    }
    
    • 尝试归还资源:调用 tryReleaseShared(arg)(由 Semaphore 实现),若归还成功则进入下一步;

    • 唤醒等待线程:调用 doReleaseShared() 唤醒 AQS 等待队列中阻塞的线程,让它们有机会获取刚归还的信号量;

  • Semaphore#Sync#tryReleaseShared(int releases):信号量归还的核心逻辑

    protected final boolean tryReleaseShared(int releases) {
        for (;;) { // 自旋
            int current = getState(); // 获取当前可用信号量(AQS的同步状态)
            int next = current + releases; // 计算归还后的信号量总数
            if (next < current) // 防止int溢出(若next为负,说明超出int最大值)
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next)) // CAS原子更新信号量
                return true;
        }
    }
    
    • 自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,确保“归还信号量”的操作是原子的(避免多线程同时归还时的状态冲突);

    • 状态更新getState() 获取当前信号量数量,next = current + releases 计算归还后的数量,再通过 compareAndSetState 原子性更新状态;

  • AQS#doReleaseShared():唤醒等待队列的共享线程

    private void doReleaseShared() {
        for (;;) { // 自旋
            Node h = head; // 获取等待队列的头节点
            if (h != null && h != tail) { // 队列非空且有等待线程
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                        continue; // CAS更新状态失败,继续自旋
                    unparkSuccessor(h); // 唤醒头节点的后继线程
                } 
                // 处理JDK1.5的bug:将头节点状态设为PROPAGATE,保证共享模式下的唤醒传播
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head) // 头节点未变化,说明唤醒操作完成
                break;
        }
    }
    
    • 自旋保证唤醒可靠性:循环处理队列,确保唤醒操作能覆盖所有需要唤醒的线程。

    • 状态判断与唤醒:若头节点状态为 SIGNAL,则通过 unparkSuccessor(h) 唤醒其后继线程;同时处理共享模式下的状态传播(PROPAGATE),保证多个共享线程能依次被唤醒。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐