Day56 | J.U.C工具-Phaser详解
Phaser是J.U.C提供的并发工具,兼具CountDownLatch和CyclicBarrier特性,支持动态调整参与方数量。核心特点包括:1)可重用;2)运行时可增减参与方(register/deregister);3)支持分阶段(phase)自动推进;4)可构建分层结构降低竞争;5)提供终止机制。通过arriveAndAwaitAdvance()实现同步,onAdvance()控制阶段行为
之前我们讲过CountDownLatch和CyclicBarrier。
前者是一次性的栅栏,通过countDown()减数,await()等待计数值归零。
后者是可重用的栅栏,所有线程await()到达之后,重置栅栏,进入下一代。
两者都有一个共同的限制,那就是参与的线程数量在创建的时候就固定好了。
在CountDownLatch里面,我们通过new CountDownLatch(10)进行了设置,就必须countDown()10次。
在CyclicBarrier里面,我们通过new CyclicBarrier(10)进行了设置,就必须有10个线程await()。
但是在很多复杂的并发场景里面,我们没办法在开始的时候就确定到底有多少参与方。
比如,一个主任务可能会动态地派生出N个子任务,主任务需要等所有子任务(数量未知)都完成之后才能继续。
为了解决这个问题,J.U.C又给我们提供了一个工具--Phaser。
一、核心概念
Phaser继承了CyclicBarrier的可重用特性,在这个基础上还提供了更加灵活的功能。
Phaser可以在运行期间动态增加(register)或减少(deregister)参与方的数量。
Phaser中文直译就是阶段。所有参与方到达当前阶段之后,Phaser会自动进入下一个阶段(phase编号+1)。
Phaser还可以构建成一棵树。一个Phaser可以有父Phaser和子Phaser。主要是为了降低大规模并发时的竞争,提高吞吐量。
Phaser还可以被终止。终止之后,await()方法会马上返回,不会阻塞。
二、核心API
我们主要看一下Phaser里面跟到达和注册相关的API。
3.1 构造方法
public Phaser() {
this(null, 0);
}
public Phaser(int parties) {
this(null, parties);
}
public Phaser(Phaser parent) {
this(parent, 0);
}
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
前三个构造方法都是为了使用方便,把某些参数设置了默认值。
第四个构造是主构造方法,主要设置的是parent和parties。前者是父Phaser,后者是参与方的数量。
3.2 核心方法
int register()

调用这个方法会注册一个新的参与方,parties+1。
int bulkRegister(int parties)

给定指定数量的参与方进行注册。
int arrive()

当前线程到达栅栏,但不阻塞。unarrived(未到达数)- 1。
int arriveAndAwaitAdvance()

当前线程到达栅栏,并阻塞等待,直到本阶段所有其他参与方都到达。这里跟CyclicBarrier.await()是等价的。
int arriveAndDeregister()

当前线程到达栅栏,并注销自己。parties - 1。之后,Phaser在下一阶段就不会再等待这个线程了。
int getPhase()

获取当前阶段号,是从0开始的。
boolean onAdvance(int phase, int registeredParties)

这是一个可以覆写的方法。当一个阶段完成,进入下一个阶段的时候,Phaser会自动调用他。
如果这个方法返回true,Phaser就会进入终止状态。
3.3 Phaser的基础使用
了解了一些基础概念和核心的API,我们通过一个小案例,来使用一下Phaser。
package com.lazy.snail.day56;
import java.util.concurrent.Phaser;
/**
* @ClassName PhaserDemo1
* @Description TODO
* @Author lazysnail
* @Date 2025/11/3 10:53
* @Version 1.0
*/
public class PhaserDemo1 {
public static void main(String[] args) {
int parties = 5;
Phaser phaser = new Phaser(parties);
for (int i = 0; i < parties; i++) {
int finalI = i;
new Thread(() -> {
try {
System.out.println("线程 " + finalI + " 正在执行任务...");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + finalI + " 完成任务,等待栅栏...");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + finalI + " 越过栅栏,继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
这个案例比较简单,只是用Phaser实现了多线程同步。
创建了5个线程,每个线程先执行自己的任务,然后通过phaser.arriveAndAwaitAdvance()在栅栏处等待。
只有当所有5个线程都到达栅栏后,才会一起继续执行后续代码。
Phaser在这里只是充当了一个可重用的同步屏障,确保所有线程完成当前阶段后再进入下一阶段。
类似CyclicBarrier。
四、核心应用
下面我们通过两个示例来深入的了解下Phaser。
4.1 动态参与方特性
动态的调整参与方应该是Phaser最强大的功能了。
package com.lazy.snail.day56;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
* @ClassName PhaserDemo2
* @Description TODO
* @Author lazysnail
* @Date 2025/11/3 11:20
* @Version 1.0
*/
public class PhaserDemo2 {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(1);
System.out.println("主线程(Phase " + phaser.getPhase() + "):启动 " + phaser.getRegisteredParties() + " 个参与方");
for (int i = 0; i < 3; i++) {
new Task(phaser, i).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方到达...");
phaser.arriveAndAwaitAdvance();
System.out.println("主线程(Phase " + phaser.getPhase() + "):所有任务启动完毕!");
System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方完成...");
phaser.arriveAndAwaitAdvance();
System.out.println("主线程(Phase " + phaser.getPhase() + "):所有存活任务已完成!");
System.out.println("主线程(Phase " + phaser.getPhase() + "):等待 " + phaser.getRegisteredParties() + " 个参与方退出...");
phaser.arriveAndAwaitAdvance();
System.out.println("主线程(Phase " + phaser.getPhase() + "):所有任务均已退出!");
System.out.println("Phaser 终止状态:" + phaser.isTerminated());
}
static class Task extends Thread {
private final Phaser phaser;
private final int id;
Task(Phaser phaser, int id) {
this.phaser = phaser;
this.id = id;
this.phaser.register();
System.out.println("任务" + id + ":已注册,当前参与方 " + phaser.getRegisteredParties());
}
@Override
public void run() {
try {
System.out.println("任务" + id + ":已启动,等待...");
phaser.arriveAndAwaitAdvance();
System.out.println("任务" + id + ":(Phase " + phaser.getPhase() + ") 开始工作...");
Thread.sleep(1000 + id * 500);
if (id == 0 || id == 1) {
System.out.println("任务" + id + ":工作完成,退出!");
phaser.arriveAndDeregister();
} else {
System.out.println("任务" + id + ":工作完成,等待...");
phaser.arriveAndAwaitAdvance();
}
if (id == 2) {
System.out.println("任务" + id + ":(Phase " + phaser.getPhase() + ") 开始第二阶段工作...");
Thread.sleep(1000);
System.out.println("任务" + id + ":第二阶段完成,退出!");
phaser.arriveAndDeregister();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
我们在代码中先通过new Phaser(1)注册了第一个参与方。
Task在构造的时候,就会进行register。for循环结束的时候,我们就有了四个参与方,一个主线程和3个任务。
主线程里通过arriveAndAwaitAdvance等待所有任务到达。
每个Task线程通过arriveAndAwaitAdvance通知已启动完成。
阶段1的时候,任务0和1执行arriveAndDeregister()后退出,然后减少参与方计数。
任务2执行arriveAndAwaitAdvance()继续参与下一阶段。
阶段2的时候,任务2开始第二阶段的工作。
阶段3主线程检测到所有任务都已退出。
来看一下时序图:

4.2 使用onAdvance()终止Phaser
Phaser可以通过覆写onAdvance()来控制阶段行为,在特定条件,比如达到某个阶段或参与方为空时终止。
package com.lazy.snail.day56;
import java.util.concurrent.Phaser;
/**
* @ClassName PhaserDemo3
* @Description TODO
* @Author lazysnail
* @Date 2025/11/3 11:43
* @Version 1.0
*/
public class PhaserDemo3 {
public static void main(String[] args) {
int parties = 3;
int phases = 4;
Phaser phaser = new Phaser(parties) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("--- 阶段 " + phase + " 完成,当前参与方 " + registeredParties + " ---");
if (phase == phases - 1 || registeredParties == 0) {
System.out.println(">>> Phaser 终止!");
return true;
}
return false;
}
};
for (int i = 0; i < parties; i++) {
new Worker(phaser, "Worker-" + i).start();
}
}
static class Worker extends Thread {
private final Phaser phaser;
Worker(Phaser phaser, String name) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
while (!phaser.isTerminated()) {
try {
System.out.println(getName() + ":(Phase " + phaser.getPhase() + ") 正在工作...");
Thread.sleep(1000 + (long)(Math.random() * 1000));
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(getName() + ":退出。");
}
}
}
初始化阶段,我们在代码里通过new Phaser(parties) 创建了一个有3个参与方的Phaser,而且重写了 onAdvance方法用于阶段完成时的回调和控制终止。
for循环创建了3个Worker线程,每个Worker在构造的时候就已经作为参与方注册到Phaser里面。
阶段0的时候,所有3个Worker线程同时开始工作。
每个Worker执行完工作后调用phaser.arriveAndAwaitAdvance()等待其他线程。
当最后一个Worker到达的时候,触发onAdvance(0, 3)回调。
由于phase=0不等于phases-1=3,返回false,Phaser继续运行。
阶段1的时候,所有Worker被唤醒,进入阶段1的工作,还是执行工作后等待同步。
最后一个到达时触发onAdvance(1, 3) 回调,返回false,继续下一阶段。
阶段2的时候,Worker进入阶段2的工作循环,任然完成工作后同步等待。
触发onAdvance(2, 3)回调,返回false,继续运行。
阶段3的时候,Worker进入阶段3,也是最后一个阶段了,同样完成工作后同步等待。
触发onAdvance(3, 3)回调,这个时候phase=3等于phases-1=3,返回true,Phaser就终止了。
最后所有Worker线程在while循环里检测到phaser.isTerminated()为true。
跳出循环,所有线程正常结束。
还是看一下直观的时序图:

五、分层Phaser
分层Phaser是Phaser的独特设计,如果你有很多的线程需要同步,这些线程都去竞争同一个Phaser对象的state 变量,CAS冲突会非常严重。
Phaser可以构建一颗树,我们可以创建N个子Phaser,假如每个Phaser管理100个线程。
创建一个父Phaser就可以管理这N个子线程。
当一个子 Phaser上的100个线程都arrive了,子Phaser不会马上唤醒这100个线程。
他会作为一个参与方,去调用父Phaser的arrive()方法。
只有所有子Phaser都跟父Phaser报道之后,父Phaser的阶段才算完成。
父Phaser阶段完成之后,他会释放所有子Phaser,子Phaser再去释放他们管理的100个线程。
比如下面这段代码(不能运行):
package com.lazy.snail.day56;
import java.util.concurrent.Phaser;
/**
* @ClassName PhaserDemo4
* @Description TODO
* @Author lazysnail
* @Date 2025/11/3 13:22
* @Version 1.0
*/
public class PhaserDemo4 {
public static void main(String[] args) {
Phaser rootPhaser = new Phaser(1);
Phaser group1 = new Phaser(rootPhaser, 0);
Phaser group2 = new Phaser(rootPhaser, 0);
for (int i = 0; i < 3; i++) {
new Worker(group1, "Group1-Worker" + i).start();
new Worker(group2, "Group2-Worker" + i).start();
}
rootPhaser.arriveAndAwaitAdvance();
System.out.println("所有组都完成了!");
}
}
Group1的3个线程在group1 Phaser里面同步。
Group2的3个线程在group2 Phaser里面同步。
当group1所有线程到达,group1会跟rootPhaser说我完成了。
当group2所有线程到达,group2会跟rootPhaser说我完成了。
rootPhaser收到所有子组的报告之后才算根同步完成。
如果没有分层,6个线程会竞争1个Phaser,进行6次CAS操作。
如果像Phaser一样进行了分层,Group1内进行3次CAS,Group2内进行3次CAS,然后在根同步的时候再进行两次CAS。
虽然多了两次CAS操作,但是竞争却被分散了。
六、核心源码解析
6.1 核心属性
Phaser里面一个比较核心的变量就是state。
这个64位的long被分成了四部分:

|
63 (1 bit) |
62-32 (31 bits) |
31-16 (16 bits) |
15-0 (16 bits) |
|
terminated (终止标记) |
phase (当前阶段号) |
parties (当前参与方总数) |
unarrived (尚未到达的参与方) |
terminated(1位)是标记位。如果是1 (负数),Phaser就终止。
phase(31位)是当前阶段号。最大为Integer.MAX_VALUE,超过之后环绕回0。
parties(16位)是当前总共注册了多少个参与方。最大支持0xffff (65535) 个参与方。
unarrived(16位)是在当前阶段还有多少参与方没到。
这个state把所有的状态都压缩在了里面,通过CAS进行原子更新。
Phaser里还有两个关键的AtomicReference<QNode> 变量:

evenQ叫偶数队列,oddQ叫奇数队列。
当一个线程调用arriveAndAwaitAdvance()他又不是最后一个到达者的时候,需要阻塞。
internalAwaitAdvance会检查当前phase是奇数还是偶数。
把自己包装成一个QNode。
通过CAS将自己压栈到对应的evenQ或oddQ。
当最后一个到达者调用releaseWaiters(phase)的时候,会去对应的队列里,遍历栈,然后用LockSupport.unpark()唤醒所有等待的线程。
这种奇偶分离,是为了减少出队和入队的并发冲突。
6.2 核心方法
int doArrive(int adjust)
arrive()和arriveAndDeregister()都会调用这个方法。
方法中主要是三条关键的路径(不是最后一个到达者、是最后到达者又分为根Phaser和子Phaser):

还是看直观的流程图(部分简化):

结语
Phaser其实在实际的开发过程中用得不多,只是有些框架底层或者性能比较敏感的场景下有使用。
我们学习主要理解他的实现思路。
下一篇预告
待定
如果你觉得这系列文章对你有帮助,欢迎关注专栏,我们一起坚持下去!
更多推荐



所有评论(0)