本期内容为自己总结归档,共分6章,本人遇到过的面试问题会重点标记。

第一章:线程基础

第二章:线程安全

第三章:锁机制

第四章:线程池与任务框架

第五章:线程协作工具和并发容器工具

第六章:底层原理

(若有任何疑问,可在评论区告诉我,看到就回复)

第五章:线程协作和并发容器工具

5.1 线程协作工具原理与应用

5.1.1 Wait/Notify机制实现原理

        Wait/Notify是Java中最基础的线程协作机制,它基于对象监视器锁实现,通过Object.wait()Object.notify()/notifyAll()方法实现线程间的等待与唤醒 。

Wait/Notify的底层实现

        Wait/Notify机制依赖于Java对象的监视器(Monitor),每个对象都有一个WaitSet(等待队列)。当线程调用wait()方法时,它会释放当前持有的监视器锁,并将自己加入WaitSet中等待 。当其他线程调用notify()时,会随机唤醒WaitSet中的一个线程;调用notifyAll()时,会唤醒WaitSet中所有等待线程 。

WaitSet的源码实现

// Monitor类中的WaitSet实现(JDK8)
public class Monitor {
    // WaitSet是一个双向链表
    private WaitSet waitSet = new WaitSet();

    // wait方法实现
    public void wait() throws InterruptedException {
        // 检查是否持有锁
        if (!is Owner()) {
            throw new IllegalMonitorStateException();
        }

        // 释放锁
        releaseLock();
        // 将线程加入WaitSet
        waitSet.addCurrentThread();
        // 线程挂起
        parkThread();
    }

    // notify方法实现
    public void notify() {
        // 检查是否持有锁
        if (!is Owner()) {
            throw new IllegalMonitorStateException();
        }

        // 随机选择WaitSet中的一个线程
        Thread t = waitSet.removeRandomThread();
        if (t != null) {
            // 唤醒线程
            unparkThread(t);
        }
    }

    // notifyAll方法实现
    public void notifyAll() {
        // 检查是否持有锁
        if (!is Owner()) {
            throw new IllegalMonitorStateException();
        }

        // 移除WaitSet中的所有线程
        List<Thread> threads = waitSet.removeALLThreads();
        for (Thread t : threads) {
            // 唤醒线程
            unparkThread(t);
        }
    }
}

Wait/Notify的使用规则

Wait/Notify必须与synchronized关键字配合使用,否则会抛出IllegalMonitorStateException异常 。具体规则如下:

  1. 调用wait()/notify()/notifyAll()前必须获取对象的监视器锁。
  2. 调用wait()后,线程会释放锁并进入WaitSet等待。
  3. 调用notify()/notifyAll()后,被唤醒的线程需要重新竞争锁才能继续执行。
  4. wait()调用必须放在try-catch块中,防止被中断后未重新获取锁。

Wait/Notify的典型应用场景

// 生产者-消费者模式
public classPCModel {
    private final List<String> buffer = new ArrayList<>();
    private final Object lock = new Object();

    // 生产者
    public void produce(String item) throws InterruptedException {
        synchronized (lock) {
            while (buffer.size() >= MAX_SIZE) {
                // 缓冲区满时,生产者等待
                lock.wait();
            }
            buffer.add(item);
            // 生产完成后,通知消费者
            lock.notifyAll();
        }
    }

    // 消费者
    public String consume() throws InterruptedException {
        synchronized (lock) {
            while (buffer.isEmpty()) {
                // 缓冲区空时,消费者等待
                lock.wait();
            }
            String item = buffer.remove(0);
            // 消费完成后,通知生产者
            lock.notifyAll();
            return item;
        }
    }
}

5.1.2 ⭐CountDownLatch实现原理

CountDownLatch是Java并发包中的一种同步工具,允许一个或多个线程等待一系列操作完成 。

JDK8中的实现原理

        CountDownLatch基于AQS(AbstractQueuedSynchronizer)的共享模式实现,通过state变量表示剩余计数 。初始时state设置为计数器值,每次调用countDown()会通过CAS操作将state减1 。当state减到0时,所有等待的线程会被唤醒 。

CountDownLatch的源码结构

// CountDownLatch的AQS实现(JDK8)
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int getCount() { return CompareAndSwap; }

        protected int tryAcquireShared(int acquires) {
            return (getCount() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = CompareAndSwap;
                if (c == 0) return false;
                int nextc = c - 1;
                if (nextc < 0) throw new Error("Countdown can't be negative");
                if (CAS(CompareAndSwap, c, nextc)) return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync尝试获取共享中断超时(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public int getCount() {
        return sync比获取();
    }
}

CountDownLatch的典型应用场景

  1. 多线程初始化后同步启动:如多个线程完成各自的数据加载后,主线程再开始处理业务。
  2. 并行计算后的结果汇总:如多个线程并行计算不同部分的结果,主线程等待所有子线程完成后汇总结果。
  3. 资源就绪等待:如等待数据库连接池就绪后再开始业务处理。

5.1.3 CyclicBarrier实现原理

CyclicBarrier允许一组线程相互等待,直到所有线程都到达屏障点,然后继续执行

JDK8中的实现原理

CyclicBarrier同样基于AQS实现,但与CountDownLatch不同的是,它使用parties变量表示需要等待的线程数量 。每个线程调用await()时,会将计数器减1,当计数器减到0时,所有线程被唤醒,并且计数器会被重置为初始值 。

CyclicBarrier的源码结构

// CyclicBarrier的AQS实现(JDK8)
public class CyclicBarrier {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final int parties;
        private final Runnable barrierAction;

        Sync(int parties, Runnable barrierAction) {
            this.parties = parties;
            this.barrierAction = barrierAction;
            setState(parties);
        }

        int getParties() { return parties; }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int c = CompareAndSwap;
                if (c == 1) {
                    // 所有线程已到达,尝试唤醒
                    if (acquires == 1) {
                        return 1;
                    }
                    // 释放锁
                    releaseShared(1);
                    return 1;
                }
                if (c == 0) {
                    // 已被重置,不能获取
                    return -1;
                }
                // 尝试减1
                if (CAS(CompareAndSwap, c, c - 1)) {
                    return (c == 1) ? 1 : 0;
                }
            }
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = CompareAndSwap;
                if (c == 0) {
                    // 已被重置,不能释放
                    return false;
                }
                if (c < releases) {
                    // 超过需要释放的数量
                    throw new IllegalArgumentException();
                }
                // 唤醒屏障动作线程
                if (c == releases) {
                    if (barrierAction != null) {
                        // 最后一个到达的线程执行屏障动作
                        new Thread(() -> barrierAction.run()).start();
                    }
                    // 重置计数器
                    setState(parties);
                    return true;
                }
                // 其他线程释放
                return CAS(CompareAndSwap, c, c - releases);
            }
        }
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        sync = new Sync(parties, barrierAction);
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        return sync尝试获取共享中断();
    }

    public void reset() {
        sync releaseShared(1);
    }
}

CyclicBarrier的典型应用场景

  1. 分阶段并行任务:如分布式计算中,多个线程并行处理不同阶段的任务,每个阶段完成后需要等待所有线程就绪再进入下一阶段。
  2. 游戏匹配:如等待所有玩家准备好后再开始游戏。
  3. 多线程数据处理:如多个线程并行处理数据,完成后需要汇总结果。

5.1.4 Exchanger:线程间数据交换

public class ExchangerAdvanced {
    
    // 生产者-消费者交换模式
    static class ProducerConsumerExchanger {
        public void demonstrate() {
            Exchanger<DataBuffer> exchanger = new Exchanger<>();
            
            // 生产者
            Thread producer = new Thread(() -> {
                DataBuffer emptyBuffer = new DataBuffer();
                try {
                    for (int i = 0; i < 5; i++) {
                        // 填充数据到空缓冲区
                        DataBuffer fullBuffer = fillBuffer(emptyBuffer, i);
                        System.out.println("生产者填充缓冲区 " + i);
                        
                        // 交换缓冲区
                        emptyBuffer = exchanger.exchange(fullBuffer);
                        System.out.println("生产者获得空缓冲区");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            
            // 消费者
            Thread consumer = new Thread(() -> {
                DataBuffer buffer = new DataBuffer();
                try {
                    for (int i = 0; i < 5; i++) {
                        // 交换并获取填充的缓冲区
                        buffer = exchanger.exchange(buffer);
                        System.out.println("消费者获得填充缓冲区 " + i);
                        
                        // 处理数据
                        processBuffer(buffer);
                        System.out.println("消费者处理完成缓冲区 " + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            
            producer.start();
            consumer.start();
        }
        
        private DataBuffer fillBuffer(DataBuffer buffer, int data) {
            buffer.setData("Data-" + data);
            return buffer;
        }
        
        private void processBuffer(DataBuffer buffer) {
            System.out.println("处理数据: " + buffer.getData());
            buffer.clear();
        }
        
        static class DataBuffer {
            private String data;
            
            public void setData(String data) {
                this.data = data;
            }
            
            public String getData() {
                return data;
            }
            
            public void clear() {
                this.data = null;
            }
        }
    }
    
    // 多线程数据交换网络
    static class DataExchangeNetwork {
        public void demonstrate() throws InterruptedException {
            int threadCount = 4;
            List<Exchanger<String>> exchangers = new ArrayList<>();
            
            for (int i = 0; i < threadCount; i++) {
                exchangers.add(new Exchanger<>());
            }
            
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            List<Future<?>> futures = new ArrayList<>();
            
            for (int i = 0; i < threadCount; i++) {
                final int threadId = i;
                final Exchanger<String> leftExchanger = exchangers.get(i);
                final Exchanger<String> rightExchanger = exchangers.get((i + 1) % threadCount);
                
                futures.add(executor.submit(() -> {
                    try {
                        String myData = "Thread-" + threadId;
                        
                        for (int round = 0; round < 3; round++) {
                            System.out.println("线程" + threadId + " 第" + round + "轮开始");
                            
                            // 向右邻居发送数据,从左邻居接收数据
                            String receivedData = rightExchanger.exchange(myData);
                            System.out.println("线程" + threadId + " 接收到: " + receivedData);
                            
                            // 更新自己的数据
                            myData = receivedData;
                            
                            Thread.sleep(100);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            
            // 等待所有线程完成
            for (Future<?> future : futures) {
                future.get();
            }
            
            executor.shutdown();
        }
    }
    
    // Exchanger实现原理(简化版)
    static class CustomExchanger<V> {
        private static final int EMPTY = 0;
        private static final int WAITING = 1;
        private static final int BUSY = 2;
        
        private final Object lock = new Object();
        private V slot;          // 交换槽
        private int state = EMPTY; // 状态
        private Thread waitingThread; // 等待线程
        
        public V exchange(V value) throws InterruptedException {
            synchronized (lock) {
                // 检查中断状态
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                
                // 情况1:交换槽为空,当前线程成为等待者
                if (state == EMPTY) {
                    slot = value;
                    state = WAITING;
                    waitingThread = Thread.currentThread();
                    
                    try {
                        while (state == WAITING) {
                            lock.wait();
                        }
                    } catch (InterruptedException e) {
                        if (state == BUSY) {
                            // 已经被配对,不能取消
                            Thread.currentThread().interrupt();
                            return slot;
                        } else {
                            // 取消等待
                            state = EMPTY;
                            slot = null;
                            waitingThread = null;
                            throw e;
                        }
                    }
                    
                    // 被唤醒,返回配对线程的数据
                    V result = slot;
                    slot = null;
                    state = EMPTY;
                    waitingThread = null;
                    return result;
                }
                
                // 情况2:已有线程在等待,进行配对
                if (state == WAITING) {
                    V result = slot; // 保存等待线程的数据
                    slot = value;    // 放入当前线程的数据
                    state = BUSY;
                    
                    // 唤醒等待线程
                    waitingThread.interrupt();
                    lock.notifyAll();
                    
                    return result;
                }
                
                // 情况3:交换槽繁忙(理论上不应该发生)
                throw new IllegalStateException("Exchanger is busy");
            }
        }
    }
}

5.2 并发容器工具原理与应用

5.2.1 ConcurrentHashMap实现原理

ConcurrentHashMap是Java中最常用的线程安全哈希表,它在不同版本中采用了不同的实现策略。

实现原理

JDK8的ConcurrentHashMap摒弃了JDK7的分段锁(Segment)机制,改用CAS + synchronized的细粒度锁策略 。具体实现如下:

  1. 数据结构:采用数组+链表+红黑树的结构,与HashMap类似。
  2. 锁机制:对每个桶(链表或红黑树)的头节点加锁,锁粒度更细  。
  3. CAS操作:用于无锁初始化和状态更新,如table数组的初始化和扩容 。
  4. 红黑树优化:当链表长度超过8时,自动转换为红黑树,查询复杂度从O(n)降至O(logn) 。

ConcurrentHashMap的源码关键点

// ConcurrentHashMap的putVal方法(JDK8)
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null)
        throw new NullPointerException();

    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化table数组

        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 桶为空,尝试CAS插入新节点
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // 无锁插入成功
        }

        else if ((fh = f.hash) == MOVED) {
            // 其他线程正在扩容,帮助迁移
            tab = helpTransfer(tab, f);
        }

        else {
            // 桶不为空且不是ForwardingNode,对头节点加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) { // 再次确认头节点
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                                V oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                return oldVal;
                            }

                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 插入新节点
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }

                    else if (f instanceof TreeBin) {
                        // 红黑树操作
                        Node<K,V> p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value);
                        if (p != null) {
                            V oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                            return oldVal;
                        }
                    }
                }
            }
        }
    }

    if (binCount != 0) {
        if (binCount >=树化阈值)
            treeifyBin(tab, i); // 链表转红黑树
        if (oldVal != null)
            return oldVal;
    }

    addCount(1L, binCount); // 更新计数器
    return null;
}

ConcurrentHashMap的典型应用场景

  1. 缓存系统:如配置缓存、用户信息缓存等读多写少的场景 。
  2. 分布式计算:如任务分发、结果收集等需要高并发访问的场景。
  3. 事件处理:如事件监听器注册表,需要线程安全的哈希表存储事件与监听器的映射关系。

5.2.2 CopyOnWriteArrayList实现原理与JDK差异

CopyOnWriteArrayList是Java中一种基于"写时复制"(Copy-On-Write)策略的线程安全列表

JDK8中的实现原理

CopyOnWriteArrayList通过以下机制保证线程安全:

  1. 数据结构:内部使用一个volatile修饰的数组存储元素 。
  2. 读操作:直接访问当前数组,无需锁控制。
  3. 写操作:通过ReentrantLock保证原子性,具体流程如下 : 
    1.   获取锁 
    2. 复制当前数组到新数组
    3. 在新数组上执行修改
    4. 将新数组设置为当前数组(通过setArray()方法)
    5. 释放锁
  4. 迭代器:基于创建时的数组快照,不会抛出ConcurrentModificationException 

CopyOnWriteArrayList的源码关键点

// CopyOnWriteArrayList的add方法(JDK8)
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

CopyOnWriteArrayList的典型应用场景

  1. 事件监听器列表:如GUI框架中的事件监听器,注册后极少修改,但事件触发频繁读取 。
  2. 配置管理:如白名单、黑名单等配置信息,加载后极少更新,但高频读取 。
  3. 缓存快照:如缓存数据的只读视图,允许短暂的数据延迟 

5.2.3 阻塞队列

应用场景选择指南

队列类型 特点 适用场景 容量
ArrayBlockingQueue 数组,固定容量,可选公平锁 固定资源池,流量控制 有界
LinkedBlockingQueue 链表,可选容量,双锁分离 高吞吐生产者-消费者 可选有界
PriorityBlockingQueue 优先级,无界,自然排序 任务调度,优先级处理 无界
DelayQueue 延迟元素,时间排序 定时任务,缓存过期 无界
SynchronousQueue 直接传递,无存储 线程间直接交换 0
LinkedTransferQueue 传输模式,无界 高效的消息传递 无界

5.3  ⭐面试高频考点

5.3.1 CountDownLatch vs CyclicBarrier区别

维度 CountDownLatch CyclicBarrier
重用性 一次性,计数到0后失效 可循环使用,自动重置
参与者 一个线程等待多个线程 多个线程互相等待
计数方向 递减计数 递增到指定值
核心方法 countDown(), await() await(), reset()
应用场景 启动准备,资源初始化 分阶段并行计算

代码对比

// CountDownLatch:主线程等待工作线程
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        doWork();
        latch.countDown(); // 完成后减1
    });
}
latch.await(); // 主线程等待

// CyclicBarrier:工作线程互相等待
CyclicBarrier barrier = new CyclicBarrier(3, 
    () -> System.out.println("所有线程到达"));
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        doWork();
        barrier.await(); // 互相等待
        nextPhase();
    });
}

5.3.2 ConcurrentHashMap并发度优化

面试要点:(详细见5.2.1)

  1. JDK7分段锁:默认16个Segment,每个Segment独立加锁

  2. JDK8优化:synchronized+CAS+红黑树

  3. 扩容策略:多线程协助扩容,避免单点瓶颈

  4. 统计大小:使用CounterCell分散计数

5.5.3 CopyOnWriteArrayList适用场景

三大特性

  1. 写时复制:写操作复制整个数组,读操作无锁

  2. 快照迭代:迭代器使用创建时的数组快照

  3. 最终一致性:读操作可能看不到最新的写入

适用场景

  • 监听器列表管理

  • 读多写少的配置数据

  • 不要求强一致性的场景

不适用场景

  • 频繁写入的大集合

  • 要求强一致性的场景

  • 内存敏感的应用

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐