《Java并发编程研读》第五章:线程协作和并发容器工具
本文系统总结了Java并发编程中的线程协作工具与并发容器,涵盖Wait/Notify、CountDownLatch、CyclicBarrier等协作机制,以及ConcurrentHashMap、CopyOnWriteArrayList等并发容器。
本期内容为自己总结归档,共分6章,本人遇到过的面试问题会重点标记。
(若有任何疑问,可在评论区告诉我,看到就回复)
第五章:线程协作和并发容器工具
5.1 线程协作工具原理与应用
5.1.1 Wait/Notify机制实现原理
Wait/Notify是Java中最基础的线程协作机制,它基于对象监视器锁实现,通过Object.wait()和Object.notify()/notifyAll()方法实现线程间的等待与唤醒 。
Wait/Notify的底层实现:
Wait/Notify机制依赖于Java对象的监视器(Monitor),每个对象都有一个WaitSet(等待队列)。当线程调用wait()方法时,它会释放当前持有的监视器锁,并将自己加入WaitSet中等待 。当其他线程调用notify()时,会随机唤醒WaitSet中的一个线程;调用notifyAll()时,会唤醒WaitSet中所有等待线程 。
WaitSet的源码实现:
// Monitor类中的WaitSet实现(JDK8)
public class Monitor {
// WaitSet是一个双向链表
private WaitSet waitSet = new WaitSet();
// wait方法实现
public void wait() throws InterruptedException {
// 检查是否持有锁
if (!is Owner()) {
throw new IllegalMonitorStateException();
}
// 释放锁
releaseLock();
// 将线程加入WaitSet
waitSet.addCurrentThread();
// 线程挂起
parkThread();
}
// notify方法实现
public void notify() {
// 检查是否持有锁
if (!is Owner()) {
throw new IllegalMonitorStateException();
}
// 随机选择WaitSet中的一个线程
Thread t = waitSet.removeRandomThread();
if (t != null) {
// 唤醒线程
unparkThread(t);
}
}
// notifyAll方法实现
public void notifyAll() {
// 检查是否持有锁
if (!is Owner()) {
throw new IllegalMonitorStateException();
}
// 移除WaitSet中的所有线程
List<Thread> threads = waitSet.removeALLThreads();
for (Thread t : threads) {
// 唤醒线程
unparkThread(t);
}
}
}
Wait/Notify的使用规则:
Wait/Notify必须与synchronized关键字配合使用,否则会抛出IllegalMonitorStateException异常 。具体规则如下:
- 调用
wait()/notify()/notifyAll()前必须获取对象的监视器锁。 - 调用
wait()后,线程会释放锁并进入WaitSet等待。 - 调用
notify()/notifyAll()后,被唤醒的线程需要重新竞争锁才能继续执行。 wait()调用必须放在try-catch块中,防止被中断后未重新获取锁。
Wait/Notify的典型应用场景:
// 生产者-消费者模式
public classPCModel {
private final List<String> buffer = new ArrayList<>();
private final Object lock = new Object();
// 生产者
public void produce(String item) throws InterruptedException {
synchronized (lock) {
while (buffer.size() >= MAX_SIZE) {
// 缓冲区满时,生产者等待
lock.wait();
}
buffer.add(item);
// 生产完成后,通知消费者
lock.notifyAll();
}
}
// 消费者
public String consume() throws InterruptedException {
synchronized (lock) {
while (buffer.isEmpty()) {
// 缓冲区空时,消费者等待
lock.wait();
}
String item = buffer.remove(0);
// 消费完成后,通知生产者
lock.notifyAll();
return item;
}
}
}
5.1.2 ⭐CountDownLatch实现原理
CountDownLatch是Java并发包中的一种同步工具,允许一个或多个线程等待一系列操作完成 。

JDK8中的实现原理:
CountDownLatch基于AQS(AbstractQueuedSynchronizer)的共享模式实现,通过state变量表示剩余计数 。初始时state设置为计数器值,每次调用countDown()会通过CAS操作将state减1 。当state减到0时,所有等待的线程会被唤醒 。
CountDownLatch的源码结构:
// CountDownLatch的AQS实现(JDK8)
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); }
int getCount() { return CompareAndSwap; }
protected int tryAcquireShared(int acquires) {
return (getCount() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = CompareAndSwap;
if (c == 0) return false;
int nextc = c - 1;
if (nextc < 0) throw new Error("Countdown can't be negative");
if (CAS(CompareAndSwap, c, nextc)) return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync尝试获取共享中断超时(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public int getCount() {
return sync比获取();
}
}
CountDownLatch的典型应用场景:
- 多线程初始化后同步启动:如多个线程完成各自的数据加载后,主线程再开始处理业务。
- 并行计算后的结果汇总:如多个线程并行计算不同部分的结果,主线程等待所有子线程完成后汇总结果。
- 资源就绪等待:如等待数据库连接池就绪后再开始业务处理。
5.1.3 CyclicBarrier实现原理
CyclicBarrier允许一组线程相互等待,直到所有线程都到达屏障点,然后继续执行

JDK8中的实现原理:
CyclicBarrier同样基于AQS实现,但与CountDownLatch不同的是,它使用parties变量表示需要等待的线程数量 。每个线程调用await()时,会将计数器减1,当计数器减到0时,所有线程被唤醒,并且计数器会被重置为初始值 。
CyclicBarrier的源码结构:
// CyclicBarrier的AQS实现(JDK8)
public class CyclicBarrier {
private static final class Sync extends AbstractQueuedSynchronizer {
private final int parties;
private final Runnable barrierAction;
Sync(int parties, Runnable barrierAction) {
this.parties = parties;
this.barrierAction = barrierAction;
setState(parties);
}
int getParties() { return parties; }
protected int tryAcquireShared(int acquires) {
for (;;) {
int c = CompareAndSwap;
if (c == 1) {
// 所有线程已到达,尝试唤醒
if (acquires == 1) {
return 1;
}
// 释放锁
releaseShared(1);
return 1;
}
if (c == 0) {
// 已被重置,不能获取
return -1;
}
// 尝试减1
if (CAS(CompareAndSwap, c, c - 1)) {
return (c == 1) ? 1 : 0;
}
}
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = CompareAndSwap;
if (c == 0) {
// 已被重置,不能释放
return false;
}
if (c < releases) {
// 超过需要释放的数量
throw new IllegalArgumentException();
}
// 唤醒屏障动作线程
if (c == releases) {
if (barrierAction != null) {
// 最后一个到达的线程执行屏障动作
new Thread(() -> barrierAction.run()).start();
}
// 重置计数器
setState(parties);
return true;
}
// 其他线程释放
return CAS(CompareAndSwap, c, c - releases);
}
}
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
sync = new Sync(parties, barrierAction);
}
public int await() throws InterruptedException, BrokenBarrierException {
return sync尝试获取共享中断();
}
public void reset() {
sync releaseShared(1);
}
}
CyclicBarrier的典型应用场景:
- 分阶段并行任务:如分布式计算中,多个线程并行处理不同阶段的任务,每个阶段完成后需要等待所有线程就绪再进入下一阶段。
- 游戏匹配:如等待所有玩家准备好后再开始游戏。
- 多线程数据处理:如多个线程并行处理数据,完成后需要汇总结果。
5.1.4 Exchanger:线程间数据交换
public class ExchangerAdvanced {
// 生产者-消费者交换模式
static class ProducerConsumerExchanger {
public void demonstrate() {
Exchanger<DataBuffer> exchanger = new Exchanger<>();
// 生产者
Thread producer = new Thread(() -> {
DataBuffer emptyBuffer = new DataBuffer();
try {
for (int i = 0; i < 5; i++) {
// 填充数据到空缓冲区
DataBuffer fullBuffer = fillBuffer(emptyBuffer, i);
System.out.println("生产者填充缓冲区 " + i);
// 交换缓冲区
emptyBuffer = exchanger.exchange(fullBuffer);
System.out.println("生产者获得空缓冲区");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
DataBuffer buffer = new DataBuffer();
try {
for (int i = 0; i < 5; i++) {
// 交换并获取填充的缓冲区
buffer = exchanger.exchange(buffer);
System.out.println("消费者获得填充缓冲区 " + i);
// 处理数据
processBuffer(buffer);
System.out.println("消费者处理完成缓冲区 " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
private DataBuffer fillBuffer(DataBuffer buffer, int data) {
buffer.setData("Data-" + data);
return buffer;
}
private void processBuffer(DataBuffer buffer) {
System.out.println("处理数据: " + buffer.getData());
buffer.clear();
}
static class DataBuffer {
private String data;
public void setData(String data) {
this.data = data;
}
public String getData() {
return data;
}
public void clear() {
this.data = null;
}
}
}
// 多线程数据交换网络
static class DataExchangeNetwork {
public void demonstrate() throws InterruptedException {
int threadCount = 4;
List<Exchanger<String>> exchangers = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
exchangers.add(new Exchanger<>());
}
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
final Exchanger<String> leftExchanger = exchangers.get(i);
final Exchanger<String> rightExchanger = exchangers.get((i + 1) % threadCount);
futures.add(executor.submit(() -> {
try {
String myData = "Thread-" + threadId;
for (int round = 0; round < 3; round++) {
System.out.println("线程" + threadId + " 第" + round + "轮开始");
// 向右邻居发送数据,从左邻居接收数据
String receivedData = rightExchanger.exchange(myData);
System.out.println("线程" + threadId + " 接收到: " + receivedData);
// 更新自己的数据
myData = receivedData;
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
// 等待所有线程完成
for (Future<?> future : futures) {
future.get();
}
executor.shutdown();
}
}
// Exchanger实现原理(简化版)
static class CustomExchanger<V> {
private static final int EMPTY = 0;
private static final int WAITING = 1;
private static final int BUSY = 2;
private final Object lock = new Object();
private V slot; // 交换槽
private int state = EMPTY; // 状态
private Thread waitingThread; // 等待线程
public V exchange(V value) throws InterruptedException {
synchronized (lock) {
// 检查中断状态
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 情况1:交换槽为空,当前线程成为等待者
if (state == EMPTY) {
slot = value;
state = WAITING;
waitingThread = Thread.currentThread();
try {
while (state == WAITING) {
lock.wait();
}
} catch (InterruptedException e) {
if (state == BUSY) {
// 已经被配对,不能取消
Thread.currentThread().interrupt();
return slot;
} else {
// 取消等待
state = EMPTY;
slot = null;
waitingThread = null;
throw e;
}
}
// 被唤醒,返回配对线程的数据
V result = slot;
slot = null;
state = EMPTY;
waitingThread = null;
return result;
}
// 情况2:已有线程在等待,进行配对
if (state == WAITING) {
V result = slot; // 保存等待线程的数据
slot = value; // 放入当前线程的数据
state = BUSY;
// 唤醒等待线程
waitingThread.interrupt();
lock.notifyAll();
return result;
}
// 情况3:交换槽繁忙(理论上不应该发生)
throw new IllegalStateException("Exchanger is busy");
}
}
}
}
5.2 并发容器工具原理与应用
5.2.1 ConcurrentHashMap实现原理
ConcurrentHashMap是Java中最常用的线程安全哈希表,它在不同版本中采用了不同的实现策略。

实现原理:
JDK8的ConcurrentHashMap摒弃了JDK7的分段锁(Segment)机制,改用CAS + synchronized的细粒度锁策略 。具体实现如下:
- 数据结构:采用数组+链表+红黑树的结构,与HashMap类似。
- 锁机制:对每个桶(链表或红黑树)的头节点加锁,锁粒度更细 。
- CAS操作:用于无锁初始化和状态更新,如
table数组的初始化和扩容 。 - 红黑树优化:当链表长度超过8时,自动转换为红黑树,查询复杂度从O(n)降至O(logn) 。
ConcurrentHashMap的源码关键点:
// ConcurrentHashMap的putVal方法(JDK8)
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null)
throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化table数组
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶为空,尝试CAS插入新节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // 无锁插入成功
}
else if ((fh = f.hash) == MOVED) {
// 其他线程正在扩容,帮助迁移
tab = helpTransfer(tab, f);
}
else {
// 桶不为空且不是ForwardingNode,对头节点加锁
synchronized (f) {
if (tabAt(tab, i) == f) { // 再次确认头节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
return oldVal;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 插入新节点
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树操作
Node<K,V> p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value);
if (p != null) {
V oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
return oldVal;
}
}
}
}
}
}
if (binCount != 0) {
if (binCount >=树化阈值)
treeifyBin(tab, i); // 链表转红黑树
if (oldVal != null)
return oldVal;
}
addCount(1L, binCount); // 更新计数器
return null;
}
ConcurrentHashMap的典型应用场景:
- 缓存系统:如配置缓存、用户信息缓存等读多写少的场景 。
- 分布式计算:如任务分发、结果收集等需要高并发访问的场景。
- 事件处理:如事件监听器注册表,需要线程安全的哈希表存储事件与监听器的映射关系。
5.2.2 CopyOnWriteArrayList实现原理与JDK差异
CopyOnWriteArrayList是Java中一种基于"写时复制"(Copy-On-Write)策略的线程安全列表

JDK8中的实现原理:
CopyOnWriteArrayList通过以下机制保证线程安全:
- 数据结构:内部使用一个
volatile修饰的数组存储元素 。 - 读操作:直接访问当前数组,无需锁控制。
- 写操作:通过
ReentrantLock保证原子性,具体流程如下 :- 获取锁
- 复制当前数组到新数组
- 在新数组上执行修改
- 将新数组设置为当前数组(通过
setArray()方法) - 释放锁
- 迭代器:基于创建时的数组快照,不会抛出
ConcurrentModificationException
CopyOnWriteArrayList的源码关键点:
// CopyOnWriteArrayList的add方法(JDK8)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWriteArrayList的典型应用场景:
- 事件监听器列表:如GUI框架中的事件监听器,注册后极少修改,但事件触发频繁读取 。
- 配置管理:如白名单、黑名单等配置信息,加载后极少更新,但高频读取 。
- 缓存快照:如缓存数据的只读视图,允许短暂的数据延迟
5.2.3 阻塞队列

应用场景选择指南:
| 队列类型 | 特点 | 适用场景 | 容量 |
|---|---|---|---|
| ArrayBlockingQueue | 数组,固定容量,可选公平锁 | 固定资源池,流量控制 | 有界 |
| LinkedBlockingQueue | 链表,可选容量,双锁分离 | 高吞吐生产者-消费者 | 可选有界 |
| PriorityBlockingQueue | 优先级,无界,自然排序 | 任务调度,优先级处理 | 无界 |
| DelayQueue | 延迟元素,时间排序 | 定时任务,缓存过期 | 无界 |
| SynchronousQueue | 直接传递,无存储 | 线程间直接交换 | 0 |
| LinkedTransferQueue | 传输模式,无界 | 高效的消息传递 | 无界 |
5.3 ⭐面试高频考点
5.3.1 CountDownLatch vs CyclicBarrier区别
| 维度 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 重用性 | 一次性,计数到0后失效 | 可循环使用,自动重置 |
| 参与者 | 一个线程等待多个线程 | 多个线程互相等待 |
| 计数方向 | 递减计数 | 递增到指定值 |
| 核心方法 | countDown(), await() | await(), reset() |
| 应用场景 | 启动准备,资源初始化 | 分阶段并行计算 |
代码对比:
// CountDownLatch:主线程等待工作线程
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
doWork();
latch.countDown(); // 完成后减1
});
}
latch.await(); // 主线程等待
// CyclicBarrier:工作线程互相等待
CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("所有线程到达"));
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
doWork();
barrier.await(); // 互相等待
nextPhase();
});
}
5.3.2 ConcurrentHashMap并发度优化
面试要点:(详细见5.2.1)
-
JDK7分段锁:默认16个Segment,每个Segment独立加锁
-
JDK8优化:synchronized+CAS+红黑树
-
扩容策略:多线程协助扩容,避免单点瓶颈
-
统计大小:使用CounterCell分散计数
5.5.3 CopyOnWriteArrayList适用场景
三大特性:
-
写时复制:写操作复制整个数组,读操作无锁
-
快照迭代:迭代器使用创建时的数组快照
-
最终一致性:读操作可能看不到最新的写入
适用场景:
-
监听器列表管理
-
读多写少的配置数据
-
不要求强一致性的场景
不适用场景:
-
频繁写入的大集合
-
要求强一致性的场景
-
内存敏感的应用
更多推荐



所有评论(0)