之前我们讲过CountDownLatch和CyclicBarrier。

前者是一次性的栅栏,通过countDown()减数,await()等待计数值归零。

后者是可重用的栅栏,所有线程await()到达之后,重置栅栏,进入下一代。

两者都有一个共同的限制,那就是参与的线程数量在创建的时候就固定好了。

在CountDownLatch里面,我们通过new CountDownLatch(10)进行了设置,就必须countDown()10次。

在CyclicBarrier里面,我们通过new CyclicBarrier(10)进行了设置,就必须有10个线程await()。

但是在很多复杂的并发场景里面,我们没办法在开始的时候就确定到底有多少参与方。

比如,一个主任务可能会动态地派生出N个子任务,主任务需要等所有子任务(数量未知)都完成之后才能继续。

为了解决这个问题,J.U.C又给我们提供了一个工具--Phaser。

一、核心概念

Phaser继承了CyclicBarrier的可重用特性,在这个基础上还提供了更加灵活的功能。

Phaser可以在运行期间动态增加(register)或减少(deregister)参与方的数量。

Phaser中文直译就是阶段。所有参与方到达当前阶段之后,Phaser会自动进入下一个阶段(phase编号+1)。

Phaser还可以构建成一棵树。一个Phaser可以有父Phaser和子Phaser。主要是为了降低大规模并发时的竞争,提高吞吐量。

Phaser还可以被终止。终止之后,await()方法会马上返回,不会阻塞。

二、核心API

我们主要看一下Phaser里面跟到达和注册相关的API。

3.1 构造方法


public Phaser() {
        this(null, 0);
    }

public Phaser(int parties) {
        this(null, parties);
    }

public Phaser(Phaser parent) {
        this(parent, 0);
    }

public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            this.evenQ = new AtomicReference<QNode>();
            this.oddQ = new AtomicReference<QNode>();
        }
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }

前三个构造方法都是为了使用方便,把某些参数设置了默认值。

第四个构造是主构造方法,主要设置的是parent和parties。前者是父Phaser,后者是参与方的数量。

3.2 核心方法

int register()

 

调用这个方法会注册一个新的参与方,parties+1。

int bulkRegister(int parties)

 

给定指定数量的参与方进行注册。

int arrive()

 

当前线程到达栅栏,但不阻塞。unarrived(未到达数)- 1。

int arriveAndAwaitAdvance()

 

当前线程到达栅栏,并阻塞等待,直到本阶段所有其他参与方都到达。这里跟CyclicBarrier.await()是等价的。

int arriveAndDeregister()

 

当前线程到达栅栏,并注销自己。parties - 1。之后,Phaser在下一阶段就不会再等待这个线程了。

int getPhase()

 

获取当前阶段号,是从0开始的。

boolean onAdvance(int phase, int registeredParties)

 

这是一个可以覆写的方法。当一个阶段完成,进入下一个阶段的时候,Phaser会自动调用他。

如果这个方法返回true,Phaser就会进入终止状态。

3.3 Phaser的基础使用

了解了一些基础概念和核心的API,我们通过一个小案例,来使用一下Phaser。

package com.lazy.snail.day56;

import java.util.concurrent.Phaser;

/**
 * @ClassName PhaserDemo1
 * @Description TODO
 * @Author lazysnail
 * @Date 2025/11/3 10:53
 * @Version 1.0
 */
public class PhaserDemo1 {
    public static void main(String[] args) {
        int parties = 5;
        Phaser phaser = new Phaser(parties);

        for (int i = 0; i < parties; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + finalI + " 正在执行任务...");
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("线程 " + finalI + " 完成任务,等待栅栏...");

                    phaser.arriveAndAwaitAdvance();

                    System.out.println("线程 " + finalI + " 越过栅栏,继续执行...");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

这个案例比较简单,只是用Phaser实现了多线程同步。

创建了5个线程,每个线程先执行自己的任务,然后通过phaser.arriveAndAwaitAdvance()在栅栏处等待。

只有当所有5个线程都到达栅栏后,才会一起继续执行后续代码。

Phaser在这里只是充当了一个可重用的同步屏障,确保所有线程完成当前阶段后再进入下一阶段。

类似CyclicBarrier。

四、核心应用

下面我们通过两个示例来深入的了解下Phaser。

4.1 动态参与方特性

动态的调整参与方应该是Phaser最强大的功能了。


package com.lazy.snail.day56;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName PhaserDemo2
 * @Description TODO
 * @Author lazysnail
 * @Date 2025/11/3 11:20
 * @Version 1.0
 */
public class PhaserDemo2 {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(1);
        System.out.println("主线程(Phase " + phaser.getPhase() + "):启动 " + phaser.getRegisteredParties() + " 个参与方");

        for (int i = 0; i < 3; i++) {
            new Task(phaser, i).start();
        }

        TimeUnit.SECONDS.sleep(1);
        System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方到达...");
        phaser.arriveAndAwaitAdvance();
        System.out.println("主线程(Phase " + phaser.getPhase() + "):所有任务启动完毕!");

        System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方完成...");
        phaser.arriveAndAwaitAdvance();
        System.out.println("主线程(Phase " + phaser.getPhase() + "):所有存活任务已完成!");

        System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方退出...");
        phaser.arriveAndAwaitAdvance();
        System.out.println("主线程(Phase " + phaser.getPhase() + "):所有任务均已退出!");

        System.out.println("Phaser 终止状态:" + phaser.isTerminated());
    }

    static class Task extends Thread {
        private final Phaser phaser;
        private final int id;

        Task(Phaser phaser, int id) {
            this.phaser = phaser;
            this.id = id;
            this.phaser.register();
            System.out.println("任务" + id + ":已注册,当前参与方 " + phaser.getRegisteredParties());
        }

        @Override
        public void run() {
            try {
                System.out.println("任务" + id + ":已启动,等待...");
                phaser.arriveAndAwaitAdvance(); 
                System.out.println("任务" + id + ":(Phase " + phaser.getPhase() + ") 开始工作...");

                Thread.sleep(1000 + id * 500);

                if (id == 0 || id == 1) {
                    System.out.println("任务" + id + ":工作完成,退出!");
                    phaser.arriveAndDeregister();
                } else {
                    System.out.println("任务" + id + ":工作完成,等待...");
                    phaser.arriveAndAwaitAdvance();
                }

                if (id == 2) {
                    System.out.println("任务" + id + ":(Phase " + phaser.getPhase() + ") 开始第二阶段工作...");
                    Thread.sleep(1000);
                    System.out.println("任务" + id + ":第二阶段完成,退出!");
                    phaser.arriveAndDeregister();
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

我们在代码中先通过new Phaser(1)注册了第一个参与方。

Task在构造的时候,就会进行register。for循环结束的时候,我们就有了四个参与方,一个主线程和3个任务。

主线程里通过arriveAndAwaitAdvance等待所有任务到达。

每个Task线程通过arriveAndAwaitAdvance通知已启动完成。

阶段1的时候,任务0和1执行arriveAndDeregister()后退出,然后减少参与方计数。

任务2执行arriveAndAwaitAdvance()继续参与下一阶段。

阶段2的时候,任务2开始第二阶段的工作。

阶段3主线程检测到所有任务都已退出。

来看一下时序图:

 

4.2 使用onAdvance()终止Phaser

Phaser可以通过覆写onAdvance()来控制阶段行为,在特定条件,比如达到某个阶段或参与方为空时终止。


package com.lazy.snail.day56;

import java.util.concurrent.Phaser;

/**
 * @ClassName PhaserDemo3
 * @Description TODO
 * @Author lazysnail
 * @Date 2025/11/3 11:43
 * @Version 1.0
 */
public class PhaserDemo3 {
    public static void main(String[] args) {
        int parties = 3;
        int phases = 4;

        Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("--- 阶段 " + phase + " 完成,当前参与方 " + registeredParties + " ---");
                if (phase == phases - 1 || registeredParties == 0) {
                    System.out.println(">>> Phaser 终止!");
                    return true;
                }
                return false;
            }
        };

        for (int i = 0; i < parties; i++) {
            new Worker(phaser, "Worker-" + i).start();
        }
    }

    static class Worker extends Thread {
        private final Phaser phaser;

        Worker(Phaser phaser, String name) {
            super(name);
            this.phaser = phaser;
        }

        @Override
        public void run() {
            while (!phaser.isTerminated()) {
                try {
                    System.out.println(getName() + ":(Phase " + phaser.getPhase() + ") 正在工作...");
                    Thread.sleep(1000 + (long)(Math.random() * 1000));
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(getName() + ":退出。");
        }
    }
}

初始化阶段,我们在代码里通过new Phaser(parties) 创建了一个有3个参与方的Phaser,而且重写了 onAdvance方法用于阶段完成时的回调和控制终止。

for循环创建了3个Worker线程,每个Worker在构造的时候就已经作为参与方注册到Phaser里面。

阶段0的时候,所有3个Worker线程同时开始工作。

每个Worker执行完工作后调用phaser.arriveAndAwaitAdvance()等待其他线程。

当最后一个Worker到达的时候,触发onAdvance(0, 3)回调。

由于phase=0不等于phases-1=3,返回false,Phaser继续运行。

阶段1的时候,所有Worker被唤醒,进入阶段1的工作,还是执行工作后等待同步。

最后一个到达时触发onAdvance(1, 3) 回调,返回false,继续下一阶段。

阶段2的时候,Worker进入阶段2的工作循环,任然完成工作后同步等待。

触发onAdvance(2, 3)回调,返回false,继续运行。

阶段3的时候,Worker进入阶段3,也是最后一个阶段了,同样完成工作后同步等待。

触发onAdvance(3, 3)回调,这个时候phase=3等于phases-1=3,返回true,Phaser就终止了。

最后所有Worker线程在while循环里检测到phaser.isTerminated()为true。

跳出循环,所有线程正常结束。

还是看一下直观的时序图:

 

五、分层Phaser

分层Phaser是Phaser的独特设计,如果你有很多的线程需要同步,这些线程都去竞争同一个Phaser对象的state 变量,CAS冲突会非常严重。

Phaser可以构建一颗树,我们可以创建N个子Phaser,假如每个Phaser管理100个线程。

创建一个父Phaser就可以管理这N个子线程。

当一个子 Phaser上的100个线程都arrive了,子Phaser不会马上唤醒这100个线程。

他会作为一个参与方,去调用父Phaser的arrive()方法。

只有所有子Phaser都跟父Phaser报道之后,父Phaser的阶段才算完成。

父Phaser阶段完成之后,他会释放所有子Phaser,子Phaser再去释放他们管理的100个线程。

比如下面这段代码(不能运行):


package com.lazy.snail.day56;

import java.util.concurrent.Phaser;

/**
 * @ClassName PhaserDemo4
 * @Description TODO
 * @Author lazysnail
 * @Date 2025/11/3 13:22
 * @Version 1.0
 */
public class PhaserDemo4 {
    public static void main(String[] args) {
        Phaser rootPhaser = new Phaser(1);

        Phaser group1 = new Phaser(rootPhaser, 0);
        Phaser group2 = new Phaser(rootPhaser, 0);

        for (int i = 0; i < 3; i++) {
            new Worker(group1, "Group1-Worker" + i).start();
            new Worker(group2, "Group2-Worker" + i).start();
        }
        
        rootPhaser.arriveAndAwaitAdvance();
        System.out.println("所有组都完成了!");
    }
}

Group1的3个线程在group1 Phaser里面同步。

Group2的3个线程在group2 Phaser里面同步。

当group1所有线程到达,group1会跟rootPhaser说我完成了。

当group2所有线程到达,group2会跟rootPhaser说我完成了。

rootPhaser收到所有子组的报告之后才算根同步完成。

如果没有分层,6个线程会竞争1个Phaser,进行6次CAS操作。

如果像Phaser一样进行了分层,Group1内进行3次CAS,Group2内进行3次CAS,然后在根同步的时候再进行两次CAS。

虽然多了两次CAS操作,但是竞争却被分散了。

六、核心源码解析

6.1 核心属性

Phaser里面一个比较核心的变量就是state。

这个64位的long被分成了四部分:

 

63 (1 bit)

62-32 (31 bits)

31-16 (16 bits)

15-0 (16 bits)

terminated (终止标记)

phase (当前阶段号)

parties (当前参与方总数)

unarrived (尚未到达的参与方)

terminated(1位)是标记位。如果是1 (负数),Phaser就终止。

phase(31位)是当前阶段号。最大为Integer.MAX_VALUE,超过之后环绕回0。

parties(16位)是当前总共注册了多少个参与方。最大支持0xffff (65535) 个参与方。

unarrived(16位)是在当前阶段还有多少参与方没到。

这个state把所有的状态都压缩在了里面,通过CAS进行原子更新。

Phaser里还有两个关键的AtomicReference<QNode> 变量:

 

evenQ叫偶数队列,oddQ叫奇数队列。

当一个线程调用arriveAndAwaitAdvance()他又不是最后一个到达者的时候,需要阻塞。

internalAwaitAdvance会检查当前phase是奇数还是偶数。

把自己包装成一个QNode。

通过CAS将自己压栈到对应的evenQ或oddQ。

当最后一个到达者调用releaseWaiters(phase)的时候,会去对应的队列里,遍历栈,然后用LockSupport.unpark()唤醒所有等待的线程。

这种奇偶分离,是为了减少出队和入队的并发冲突。

6.2 核心方法

int doArrive(int adjust)

arrive()和arriveAndDeregister()都会调用这个方法。

方法中主要是三条关键的路径(不是最后一个到达者、是最后到达者又分为根Phaser和子Phaser):

 

还是看直观的流程图(部分简化):

 

结语

Phaser其实在实际的开发过程中用得不多,只是有些框架底层或者性能比较敏感的场景下有使用。

我们学习主要理解他的实现思路。

下一篇预告

待定

如果你觉得这系列文章对你有帮助,欢迎关注专栏,我们一起坚持下去!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐