Java 并发编程详解与示例

1. 并发基础概念

1.1 进程与线程

  • 进程:操作系统资源分配的基本单位

  • 线程:CPU调度的基本单位,共享进程资源

1.2 Java 内存模型 (JMM)

public class MemoryModelExample {
    private static boolean flag = false;
    private static int number = 0;
    
    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> {
            number = 42;
            flag = true;  // 可能发生指令重排序
        });
        
        Thread reader = new Thread(() -> {
            while (!flag) {
                // 循环等待
            }
            System.out.println("Number: " + number);  // 可能看到0
        });
        
        reader.start();
        Thread.sleep(100);
        writer.start();
    }
}

2. 线程创建与管理

2.1 继承 Thread 类


public class MyThread extends Thread {
    private final String name;
    
    public MyThread(String name) {
        this.name = name;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(name + " - Count: " + i);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                System.out.println(name + " interrupted");
                return;
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        MyThread t1 = new MyThread("Thread-1");
        MyThread t2 = new MyThread("Thread-2");
        
        t1.start();
        t2.start();
        
        t1.join();  // 等待线程结束
        t2.join();
        
        System.out.println("All threads completed");
    }
}

2.2 实现 Runnable 接口


public class MyRunnable implements Runnable {
    private final String name;
    private volatile boolean running = true;
    
    public MyRunnable(String name) {
        this.name = name;
    }
    
    public void stop() {
        running = false;
    }
    
    @Override
    public void run() {
        int count = 0;
        while (running && count < 10) {
            System.out.println(name + " - Working: " + count++);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(name + " interrupted");
                break;
            }
        }
        System.out.println(name + " finished");
    }
    
    public static void main(String[] args) throws InterruptedException {
        MyRunnable r1 = new MyRunnable("Worker-1");
        MyRunnable r2 = new MyRunnable("Worker-2");
        
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        
        t1.start();
        t2.start();
        
        Thread.sleep(3000);
        r1.stop();  // 优雅停止线程
        
        t1.join();
        t2.join();
    }
}

2.3 使用 Callable 和 Future


import java.util.concurrent.*;

public class CallableExample {
    
    static class ComputationTask implements Callable<Long> {
        private final int number;
        
        public ComputationTask(int number) {
            this.number = number;
        }
        
        @Override
        public Long call() throws Exception {
            System.out.println("Computing factorial of " + number);
            long result = factorial(number);
            System.out.println("Completed factorial of " + number);
            return result;
        }
        
        private long factorial(int n) {
            if (n <= 1) return 1;
            long result = 1;
            for (int i = 2; i <= n; i++) {
                result *= i;
                try {
                    Thread.sleep(100); // 模拟计算时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return result;
        }
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        Future<Long> future1 = executor.submit(new ComputationTask(10));
        Future<Long> future2 = executor.submit(new ComputationTask(15));
        
        System.out.println("Tasks submitted, waiting for results...");
        
        // 非阻塞方式检查结果
        while (!future1.isDone() || !future2.isDone()) {
            System.out.println("Still calculating...");
            Thread.sleep(500);
        }
        
        Long result1 = future1.get();
        Long result2 = future2.get();
        
        System.out.println("Factorial of 10: " + result1);
        System.out.println("Factorial of 15: " + result2);
        
        executor.shutdown();
    }
}

3. 线程同步

3.1 synchronized 关键字


public class SynchronizedExample {
    
    static class Counter {
        private int count = 0;
        
        // 同步方法
        public synchronized void increment() {
            count++;
        }
        
        // 同步代码块
        public void decrement() {
            synchronized (this) {
                count--;
            }
        }
        
        public synchronized int getCount() {
            return count;
        }
    }
    
    static class Worker implements Runnable {
        private final Counter counter;
        private final boolean increment;
        
        public Worker(Counter counter, boolean increment) {
            this.counter = counter;
            this.increment = increment;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                if (increment) {
                    counter.increment();
                } else {
                    counter.decrement();
                }
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        
        Thread t1 = new Thread(new Worker(counter, true));
        Thread t2 = new Thread(new Worker(counter, true));
        Thread t3 = new Thread(new Worker(counter, false));
        
        t1.start();
        t2.start();
        t3.start();
        
        t1.join();
        t2.join();
        t3.join();
        
        System.out.println("Final count: " + counter.getCount()); // 应该是1000
    }
}

3.2 Lock 接口


import java.util.concurrent.locks.*;

public class LockExample {
    
    static class SharedResource {
        private final ReentrantLock lock = new ReentrantLock();
        private int value = 0;
        
        public void updateValue(int newValue) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " updating value");
                Thread.sleep(1000); // 模拟耗时操作
                value = newValue;
                System.out.println(Thread.currentThread().getName() + " updated value to: " + value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
        
        // 使用 tryLock 避免死锁
        public boolean tryUpdateValue(int newValue) {
            if (lock.tryLock()) {
                try {
                    value = newValue;
                    return true;
                } finally {
                    lock.unlock();
                }
            }
            return false;
        }
    }
    
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        
        Runnable task = () -> {
            for (int i = 0; i < 3; i++) {
                resource.updateValue(i);
            }
        };
        
        Thread t1 = new Thread(task, "Thread-1");
        Thread t2 = new Thread(task, "Thread-2");
        
        t1.start();
        t2.start();
    }
}

3.3 ReadWriteLock


import java.util.concurrent.locks.*;

public class ReadWriteLockExample {
    
    static class DataStore {
        private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final Lock readLock = rwLock.readLock();
        private final Lock writeLock = rwLock.writeLock();
        private String data = "Initial Data";
        
        public String readData() {
            readLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " reading data");
                Thread.sleep(100); // 模拟读取耗时
                return data;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } finally {
                readLock.unlock();
            }
        }
        
        public void writeData(String newData) {
            writeLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " writing data");
                Thread.sleep(500); // 模拟写入耗时
                data = newData;
                System.out.println(Thread.currentThread().getName() + " wrote: " + newData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                writeLock.unlock();
            }
        }
    }
    
    public static void main(String[] args) {
        DataStore dataStore = new DataStore();
        
        // 创建多个读取线程
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    System.out.println("Read: " + dataStore.readData());
                }
            }, "Reader-" + i).start();
        }
        
        // 创建写入线程
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                dataStore.writeData("Data " + i);
            }
        }, "Writer").start();
    }
}

4. 线程间通信

4.1 wait() 和 notify()


public class WaitNotifyExample {
    
    static class Message {
        private String message;
        private boolean empty = true;
        
        public synchronized String read() {
            while (empty) {
                try {
                    wait();  // 等待消息可用
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            empty = true;
            notifyAll();  // 通知写入线程
            return message;
        }
        
        public synchronized void write(String message) {
            while (!empty) {
                try {
                    wait();  // 等待消息被消费
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            empty = false;
            this.message = message;
            notifyAll();  // 通知读取线程
        }
    }
    
    static class Writer implements Runnable {
        private final Message message;
        
        public Writer(Message message) {
            this.message = message;
        }
        
        @Override
        public void run() {
            String[] messages = {"Message 1", "Message 2", "Message 3", "Message 4"};
            for (String msg : messages) {
                message.write(msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            message.write("FINISH");
        }
    }
    
    static class Reader implements Runnable {
        private final Message message;
        
        public Reader(Message message) {
            this.message = message;
        }
        
        @Override
        public void run() {
            for (String latestMessage = message.read(); 
                 !"FINISH".equals(latestMessage);
                 latestMessage = message.read()) {
                System.out.println("Received: " + latestMessage);
            }
            System.out.println("All messages received");
        }
    }
    
    public static void main(String[] args) {
        Message message = new Message();
        
        Thread writerThread = new Thread(new Writer(message), "Writer");
        Thread readerThread = new Thread(new Reader(message), "Reader");
        
        readerThread.start();
        writerThread.start();
    }
}

4.2 BlockingQueue


import java.util.concurrent.*;

public class BlockingQueueExample {
    
    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int count;
        
        public Producer(BlockingQueue<Integer> queue, int count) {
            this.queue = queue;
            this.count = count;
        }
        
        @Override
        public void run() {
            try {
                for (int i = 0; i < count; i++) {
                    System.out.println("Producing: " + i);
                    queue.put(i);  // 阻塞如果队列满
                    Thread.sleep(100);
                }
                queue.put(-1);  // 结束信号
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final String name;
        
        public Consumer(BlockingQueue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }
        
        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();  // 阻塞如果队列空
                    if (item == -1) {
                        queue.put(-1);  // 放回结束信号给其他消费者
                        break;
                    }
                    System.out.println(name + " consuming: " + item);
                    Thread.sleep(200);
                }
                System.out.println(name + " finished");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        
        Thread producer = new Thread(new Producer(queue, 20), "Producer");
        Thread consumer1 = new Thread(new Consumer(queue, "Consumer-1"), "Consumer-1");
        Thread consumer2 = new Thread(new Consumer(queue, "Consumer-2"), "Consumer-2");
        
        producer.start();
        consumer1.start();
        consumer2.start();
    }
}

5. 并发工具类

5.1 CountDownLatch

 5.1.1什么是 CountDownLatch?

CountDownLatch 是 Java 并发包 (java.util.concurrent) 中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作后再继续执行。

核心机制:通过一个计数器来实现同步,计数器的初始值在创建时设定,每次调用 countDown() 方法计数器减1,当计数器值为0时,所有等待的线程将被释放。

 5.1.2基本用法

import java.util.concurrent.*;

public class CountDownLatchExample {
    
    static class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;
        private final int workerId;
        
        public Worker(int workerId, CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.workerId = workerId;
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        
        @Override
        public void run() {
            try {
                System.out.println("Worker " + workerId + " waiting to start");
                startSignal.await();  // 等待开始信号
                
                System.out.println("Worker " + workerId + " started working");
                Thread.sleep(1000 + workerId * 100);  // 模拟工作
                
                System.out.println("Worker " + workerId + " completed work");
                doneSignal.countDown();  // 通知完成
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        int workerCount = 5;
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(workerCount);
        
        // 创建工作线程
        for (int i = 0; i < workerCount; i++) {
            new Thread(new Worker(i, startSignal, doneSignal)).start();
        }
        
        System.out.println("Main thread preparing resources...");
        Thread.sleep(2000);  // 模拟资源准备
        
        System.out.println("Main thread starting all workers");
        startSignal.countDown();  // 释放所有工作线程
        
        doneSignal.await();  // 等待所有工作线程完成
        System.out.println("All workers completed, main thread continuing");
    }
}
5.1.3为什么要使用 CountDownLatch?
1. 线程协调与同步

// 场景:多个服务启动完成后,主服务才能启动
public class ServiceInitializer {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        
        // 启动多个服务
        new Thread(new DatabaseService(latch)).start();
        new Thread(new CacheService(latch)).start();
        new Thread(new ConfigService(latch)).start();
        
        // 等待所有服务启动完成
        latch.await();
        System.out.println("所有服务已就绪,启动主应用...");
    }
}
2. 并行任务处理

// 场景:并行处理多个任务,等待所有任务完成
public class ParallelProcessor {
    public void processTasks(List<Task> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (Task task : tasks) {
            executor.submit(() -> {
                try {
                    task.process();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        // 等待所有任务完成
        latch.await();
        executor.shutdown();
        System.out.println("所有任务处理完成");
    }
}
3. 性能测试和基准测试

// 场景:模拟并发用户同时执行操作
public class ConcurrentTest {
    public static void main(String[] args) throws InterruptedException {
        int userCount = 100;
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(userCount);
        
        for (int i = 0; i < userCount; i++) {
            new Thread(new User(startSignal, doneSignal, "User-" + i)).start();
        }
        
        System.out.println("所有用户准备就绪...");
        Thread.sleep(1000); // 准备时间
        
        // 同时释放所有用户线程
        startSignal.countDown();
        
        // 等待所有用户完成操作
        doneSignal.await();
        System.out.println("所有用户操作完成");
    }
}

class User implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final String name;
    
    public User(CountDownLatch startSignal, CountDownLatch doneSignal, String name) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
        this.name = name;
    }
    
    @Override
    public void run() {
        try {
            startSignal.await(); // 等待开始信号
            System.out.println(name + " 开始执行操作");
            // 执行具体操作...
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            doneSignal.countDown();
        }
    }
}
主要方法
方法 描述
CountDownLatch(int count) 构造函数,指定计数器的初始值
void await() 使当前线程等待,直到计数器为0
boolean await(long timeout, TimeUnit unit) 带超时的等待
void countDown() 计数器减1
long getCount() 返回当前计数
使用场景总结
  1. 启动顺序控制:确保某些服务在其他服务启动完成后再启动

  2. 并行计算:等待所有子任务完成后进行结果汇总

  3. 测试框架:协调多个测试线程的同步执行

  4. 资源初始化:等待所有资源加载完成后再继续

  5. 游戏开发:等待所有玩家准备就绪

注意事项
  • 一次性使用:CountDownLatch 的计数器不能被重置,如果需要重复使用,考虑使用 CyclicBarrier

  • 异常处理:确保在 finally 块中调用 countDown(),避免线程异常导致计数器无法减少

  • 避免死锁:合理设置超时时间,防止无限期等待

CountDownLatch 提供了一种简单而强大的线程同步机制,特别适用于"一个线程等待多个线程"或"多个线程互相等待"的场景。

5.2 CyclicBarrier

5.2.1什么是 CyclicBarrier?

CyclicBarrier 是 Java 并发包 (java.util.concurrent) 中的另一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点(barrier point)后才能继续执行。

核心机制:通过一个计数器来实现循环同步,当指定数量的线程都调用 await() 方法后,这些线程才会继续执行,同时屏障会重置以便下次使用。

5.2.2基本用法

import java.util.concurrent.*;

public class CyclicBarrierExample {
    
    static class MatrixProcessor {
        private final int[][] matrix;
        private final int[] results;
        private final CyclicBarrier barrier;
        
        public MatrixProcessor(int[][] matrix) {
            this.matrix = matrix;
            this.results = new int[matrix.length];
            this.barrier = new CyclicBarrier(matrix.length, () -> {
                // 所有线程到达屏障后执行
                int finalResult = 0;
                for (int result : results) {
                    finalResult += result;
                }
                System.out.println("All rows processed. Final sum: " + finalResult);
            });
        }
        
        public void process() {
            for (int i = 0; i < matrix.length; i++) {
                final int row = i;
                new Thread(() -> {
                    try {
                        // 处理行
                        int sum = 0;
                        for (int value : matrix[row]) {
                            sum += value;
                            Thread.sleep(10);  // 模拟处理时间
                        }
                        results[row] = sum;
                        System.out.println("Row " + row + " processed, sum: " + sum);
                        
                        // 等待其他行处理完成
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        Thread.currentThread().interrupt();
                    }
                }).start();
            }
        }
    }
    
    public static void main(String[] args) {
        int[][] matrix = {
            {1, 2, 3},
            {4, 5, 6},
            {7, 8, 9},
            {10, 11, 12}
        };
        
        MatrixProcessor processor = new MatrixProcessor(matrix);
        processor.process();
    }
}
5.2.3为什么要使用 CyclicBarrier?
1. 多阶段并行计算
// 场景:并行计算任务,需要多个阶段同步
public class ParallelComputation {
    public static void main(String[] args) {
        int threadCount = 4;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, 
            () -> System.out.println("=== 阶段完成,开始下一阶段 ==="));
        
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            executor.execute(new ComputationTask(barrier, i));
        }
        
        executor.shutdown();
    }
}

class ComputationTask implements Runnable {
    private final CyclicBarrier barrier;
    private final int taskId;
    
    public ComputationTask(CyclicBarrier barrier, int taskId) {
        this.barrier = barrier;
        this.taskId = taskId;
    }
    
    @Override
    public void run() {
        try {
            // 第一阶段计算
            System.out.println("任务 " + taskId + " 执行第一阶段计算");
            Thread.sleep(1000 + taskId * 100);
            barrier.await();
            
            // 第二阶段计算
            System.out.println("任务 " + taskId + " 执行第二阶段计算");
            Thread.sleep(800 + taskId * 100);
            barrier.await();
            
            // 第三阶段计算
            System.out.println("任务 " + taskId + " 执行第三阶段计算");
            Thread.sleep(600 + taskId * 100);
            barrier.await();
            
            System.out.println("任务 " + taskId + " 全部完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. 复杂数据处理流水线

// 场景:数据分片处理,每个阶段需要同步
public class DataProcessingPipeline {
    public static void main(String[] args) {
        int dataPartitions = 3;
        CyclicBarrier barrier = new CyclicBarrier(dataPartitions, 
            () -> System.out.println("所有分区数据处理完成,进入下一阶段"));
        
        for (int i = 0; i < dataPartitions; i++) {
            new Thread(new DataProcessor(barrier, i)).start();
        }
    }
}

class DataProcessor implements Runnable {
    private final CyclicBarrier barrier;
    private final int partitionId;
    
    public DataProcessor(CyclicBarrier barrier, int partitionId) {
        this.barrier = barrier;
        this.partitionId = partitionId;
    }
    
    @Override
    public void run() {
        try {
            // 数据加载阶段
            System.out.println("分区 " + partitionId + " 加载数据");
            loadData();
            barrier.await();
            
            // 数据清洗阶段
            System.out.println("分区 " + partitionId + " 清洗数据");
            cleanData();
            barrier.await();
            
            // 数据分析阶段
            System.out.println("分区 " + partitionId + " 分析数据");
            analyzeData();
            barrier.await();
            
            // 结果输出阶段
            System.out.println("分区 " + partitionId + " 输出结果");
            outputResult();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void loadData() throws InterruptedException {
        Thread.sleep(500 + (long) (Math.random() * 1000));
    }
    
    private void cleanData() throws InterruptedException {
        Thread.sleep(300 + (long) (Math.random() * 800));
    }
    
    private void analyzeData() throws InterruptedException {
        Thread.sleep(700 + (long) (Math.random() * 1200));
    }
    
    private void outputResult() throws InterruptedException {
        Thread.sleep(200 + (long) (Math.random() * 500));
    }
}
游戏开发中的多玩家同步

// 场景:多玩家游戏,每个回合需要等待所有玩家完成操作
public class MultiplayerGame {
    public static void main(String[] args) {
        int playerCount = 4;
        CyclicBarrier roundBarrier = new CyclicBarrier(playerCount, 
            () -> System.out.println("=== 所有玩家完成操作,开始新回合 ==="));
        
        for (int i = 0; i < playerCount; i++) {
            new Thread(new Player(roundBarrier, "Player-" + (i + 1))).start();
        }
    }
}

class Player implements Runnable {
    private final CyclicBarrier barrier;
    private final String name;
    private int score = 0;
    
    public Player(CyclicBarrier barrier, String name) {
        this.barrier = barrier;
        this.name = name;
    }
    
    @Override
    public void run() {
        try {
            // 模拟多个游戏回合
            for (int round = 1; round <= 3; round++) {
                System.out.println(name + " 在第 " + round + " 回合中操作...");
                playRound();
                System.out.println(name + " 完成第 " + round + " 回合,等待其他玩家");
                barrier.await(); // 等待所有玩家完成当前回合
                
                // 屏障打开后,所有线程同时继续
                System.out.println(name + " 开始第 " + (round + 1) + " 回合准备");
            }
            
            System.out.println(name + " 游戏结束,最终得分: " + score);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void playRound() throws InterruptedException {
        Thread.sleep(1000 + (long) (Math.random() * 2000));
        score += (int) (Math.random() * 100);
    }
}
主要方法
方法 描述
CyclicBarrier(int parties) 创建屏障,指定参与线程数
CyclicBarrier(int parties, Runnable barrierAction) 创建屏障,指定参与线程数和屏障动作
int await() 等待所有线程到达屏障
int await(long timeout, TimeUnit unit) 带超时的等待
int getParties() 返回需要的线程数
int getNumberWaiting() 返回当前在屏障处等待的线程数
boolean isBroken() 查询屏障是否处于损坏状态
void reset() 重置屏障到初始状态
CyclicBarrier vs CountDownLatch
特性 CyclicBarrier CountDownLatch
重用性 ✅ 可重复使用 ❌ 一次性使用
计数器 自动重置 手动重置
等待机制 线程互相等待 一个或多个线程等待其他线程
使用场景 多阶段任务同步 一次性任务完成等待
使用场景总结
  1. 多阶段并行计算:将复杂计算分解为多个阶段,每个阶段需要同步

  2. 数据分片处理:处理分片数据,确保所有分片完成当前阶段后再进入下一阶段

  3. 游戏开发:多玩家游戏回合同步

  4. 模拟测试:多个测试线程需要同步执行不同阶段

  5. 迭代算法:需要多次迭代的并行算法,每次迭代后同步

注意事项
  • 可重复使用:CyclicBarrier 可以重复使用,每次所有线程到达屏障后会自动重置

  • 屏障动作:可以指定一个 Runnable 作为屏障动作,在所有线程到达屏障后、释放线程前执行

  • 异常处理:如果一个线程在等待时被中断或超时,其他等待线程会收到 BrokenBarrierException

  • 重置操作:可以通过 reset() 方法手动重置屏障,但会中断所有等待的线程

高级用法示例

// 复杂的分阶段并行处理
public class AdvancedCyclicBarrierExample {
    public static void main(String[] args) {
        int workerCount = 3;
        AtomicInteger phase = new AtomicInteger(1);
        
        CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
            int currentPhase = phase.getAndIncrement();
            System.out.println("=== 阶段 " + currentPhase + " 完成 ===");
            
            if (currentPhase >= 3) {
                System.out.println("=== 所有处理阶段完成 ===");
            }
        });
        
        for (int i = 0; i < workerCount; i++) {
            new Thread(new AdvancedWorker(barrier, i)).start();
        }
    }
}

class AdvancedWorker implements Runnable {
    private final CyclicBarrier barrier;
    private final int workerId;
    
    public AdvancedWorker(CyclicBarrier barrier, int workerId) {
        this.barrier = barrier;
        this.workerId = workerId;
    }
    
    @Override
    public void run() {
        try {
            // 阶段1:数据准备
            prepareData();
            barrier.await();
            
            // 阶段2:数据处理
            processData();
            barrier.await();
            
            // 阶段3:结果汇总
            aggregateResults();
            barrier.await();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void prepareData() throws InterruptedException {
        System.out.println("Worker " + workerId + " 准备数据");
        Thread.sleep(1000);
    }
    
    private void processData() throws InterruptedException {
        System.out.println("Worker " + workerId + " 处理数据");
        Thread.sleep(1500);
    }
    
    private void aggregateResults() throws InterruptedException {
        System.out.println("Worker " + workerId + " 汇总结果");
        Thread.sleep(800);
    }
}

CyclicBarrier 提供了一种强大的多线程同步机制,特别适用于需要多个线程在多阶段任务中保持同步的场景,其可重复使用的特性使其在循环或迭代处理中非常有用。

5.3 Semaphore

5.3.1什么是 Semaphore?

Semaphore(信号量)是 Java 并发包 (java.util.concurrent) 中的同步工具类,用于控制同时访问特定资源的线程数量。它通过维护一组"许可证"(permits)来协调多线程对共享资源的访问。

核心机制:Semaphore 内部维护一个计数器,线程通过 acquire() 获取许可证(计数器减1),通过 release() 释放许可证(计数器加1)。当计数器为0时,尝试获取许可证的线程会被阻塞,直到有其他线程释放许可证。

5.3.2 基本用法

import java.util.concurrent.*;

public class SemaphoreExample {
    
    static class ConnectionPool {
        private final Semaphore semaphore;
        private final boolean[] connections;
        
        public ConnectionPool(int poolSize) {
            this.semaphore = new Semaphore(poolSize, true);  // 公平信号量
            this.connections = new boolean[poolSize];
        }
        
        public Integer getConnection() throws InterruptedException {
            semaphore.acquire();  // 获取许可
            return getAvailableConnection();
        }
        
        public void releaseConnection(int index) {
            if (releaseConnectionInternal(index)) {
                semaphore.release();  // 释放许可
            }
        }
        
        private synchronized Integer getAvailableConnection() {
            for (int i = 0; i < connections.length; i++) {
                if (!connections[i]) {
                    connections[i] = true;
                    System.out.println(Thread.currentThread().getName() + " acquired connection " + i);
                    return i;
                }
            }
            return null;  // 不应该发生
        }
        
        private synchronized boolean releaseConnectionInternal(int index) {
            if (connections[index]) {
                connections[index] = false;
                System.out.println(Thread.currentThread().getName() + " released connection " + index);
                return true;
            }
            return false;
        }
    }
    
    static class DatabaseUser implements Runnable {
        private final ConnectionPool pool;
        private final int userId;
        
        public DatabaseUser(ConnectionPool pool, int userId) {
            this.pool = pool;
            this.userId = userId;
        }
        
        @Override
        public void run() {
            try {
                Integer connection = pool.getConnection();
                if (connection != null) {
                    System.out.println("User " + userId + " using connection " + connection);
                    Thread.sleep(2000 + userId * 100);  // 模拟数据库操作
                    pool.releaseConnection(connection);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool(3);  // 只有3个连接
        
        // 创建10个用户线程竞争连接
        for (int i = 0; i < 10; i++) {
            new Thread(new DatabaseUser(pool, i), "User-" + i).start();
        }
    }
}
5.3.3为什么要使用 Semaphore?
1. 资源池管理

// 场景:数据库连接池
public class ConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> connections;
    
    public ConnectionPool(int poolSize) {
        this.semaphore = new Semaphore(poolSize);
        this.connections = new ArrayList<>();
        
        // 初始化连接池
        for (int i = 0; i < poolSize; i++) {
            connections.add(createConnection());
        }
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取访问权限
        return getAvailableConnection();
    }
    
    public void releaseConnection(Connection connection) {
        returnConnection(connection);
        semaphore.release(); // 释放访问权限
    }
    
    private Connection createConnection() {
        // 创建数据库连接
        return new MockConnection();
    }
    
    private synchronized Connection getAvailableConnection() {
        // 从池中获取可用连接
        return connections.remove(0);
    }
    
    private synchronized void returnConnection(Connection connection) {
        // 将连接返回池中
        connections.add(connection);
    }
}

// 使用连接池
public class ConnectionPoolExample {
    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool(5);
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.execute(() -> {
                try {
                    Connection conn = pool.getConnection();
                    System.out.println("任务 " + taskId + " 获取连接,执行查询...");
                    Thread.sleep(1000);
                    pool.releaseConnection(conn);
                    System.out.println("任务 " + taskId + " 释放连接");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

2. 限流和流量控制

// 场景:API 限流
public class RateLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;
    
    public RateLimiter(int permitsPerSecond) {
        this.semaphore = new Semaphore(permitsPerSecond);
        this.scheduler = Executors.newScheduledThreadPool(1);
        
        // 每秒重置许可证
        scheduler.scheduleAtFixedRate(() -> {
            int availablePermits = semaphore.availablePermits();
            int drainCount = permitsPerSecond - availablePermits;
            if (drainCount > 0) {
                semaphore.release(drainCount);
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
    
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }
    
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}

// 使用限流器
public class APIRateLimitExample {
    public static void main(String[] args) {
        // 限制每秒最多 10 个请求
        RateLimiter rateLimiter = new RateLimiter(10);
        
        // 模拟大量请求
        for (int i = 0; i < 50; i++) {
            final int requestId = i;
            new Thread(() -> {
                if (rateLimiter.tryAcquire()) {
                    System.out.println("请求 " + requestId + " 被处理");
                    // 处理请求...
                } else {
                    System.out.println("请求 " + requestId + " 被限流");
                }
            }).start();
            
            try {
                Thread.sleep(100); // 模拟请求间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        rateLimiter.shutdown();
    }
}
3. 生产者-消费者模式

// 场景:有界缓冲区
public class BoundedBuffer<E> {
    private final Semaphore availableItems;     // 可消费的项目数
    private final Semaphore availableSpaces;    // 可用的空间数
    private final E[] buffer;
    private int putPosition = 0;
    private int takePosition = 0;
    
    @SuppressWarnings("unchecked")
    public BoundedBuffer(int capacity) {
        this.availableItems = new Semaphore(0);
        this.availableSpaces = new Semaphore(capacity);
        this.buffer = (E[]) new Object[capacity];
    }
    
    public void put(E item) throws InterruptedException {
        availableSpaces.acquire();  // 等待可用空间
        synchronized (this) {
            buffer[putPosition] = item;
            putPosition = (putPosition + 1) % buffer.length;
        }
        availableItems.release();   // 增加可消费项目
    }
    
    public E take() throws InterruptedException {
        availableItems.acquire();   // 等待可消费项目
        E item;
        synchronized (this) {
            item = buffer[takePosition];
            buffer[takePosition] = null;
            takePosition = (takePosition + 1) % buffer.length;
        }
        availableSpaces.release();  // 增加可用空间
        return item;
    }
}

// 测试生产者-消费者
public class ProducerConsumerExample {
    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    buffer.put(i);
                    System.out.println("生产: " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    Integer item = buffer.take();
                    System.out.println("消费: " + item);
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
    }
}
主要方法
方法 描述
Semaphore(int permits) 创建指定许可证数量的信号量
Semaphore(int permits, boolean fair) 创建信号量,指定是否公平模式
void acquire() 获取一个许可证,阻塞直到可用
void acquire(int permits) 获取指定数量的许可证
boolean tryAcquire() 尝试获取许可证,立即返回结果
boolean tryAcquire(long timeout, TimeUnit unit) 带超时的尝试获取
void release() 释放一个许可证
void release(int permits) 释放指定数量的许可证
int availablePermits() 返回当前可用许可证数量
boolean isFair() 检查是否为公平模式
公平模式 vs 非公平模式

public class FairVsNonFairExample {
    public static void main(String[] args) {
        // 公平模式 - 按照请求顺序分配许可证
        Semaphore fairSemaphore = new Semaphore(1, true);
        
        // 非公平模式 - 可能插队
        Semaphore nonFairSemaphore = new Semaphore(1, false);
        
        testSemaphore("公平模式", fairSemaphore);
        // testSemaphore("非公平模式", nonFairSemaphore);
    }
    
    private static void testSemaphore(String mode, Semaphore semaphore) {
        System.out.println("=== " + mode + " ===");
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadId + " 尝试获取许可证");
                    semaphore.acquire();
                    System.out.println("线程 " + threadId + " 获得许可证");
                    Thread.sleep(1000);
                    System.out.println("线程 " + threadId + " 释放许可证");
                    semaphore.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
            
            // 稍微延迟启动,使线程按顺序创建
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
使用场景总结
1. 资源访问控制

// 控制对昂贵资源(如数据库连接、网络连接)的并发访问
public class ExpensiveResourceManager {
    private final Semaphore semaphore;
    private final Set<ExpensiveResource> resources;
    
    public ExpensiveResourceManager(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
        this.resources = Collections.synchronizedSet(new HashSet<>());
    }
    
    public void useResource() throws InterruptedException {
        semaphore.acquire();
        try {
            ExpensiveResource resource = acquireResource();
            // 使用资源...
            resource.doSomething();
        } finally {
            releaseResource();
            semaphore.release();
        }
    }
}
2. 系统负载保护

// 防止系统过载,确保同时处理的请求不超过系统承载能力
public class LoadProtection {
    private final Semaphore loadSemaphore;
    private final int maxLoad;
    
    public LoadProtection(int maxLoad) {
        this.maxLoad = maxLoad;
        this.loadSemaphore = new Semaphore(maxLoad);
    }
    
    public boolean processRequest(Request request) {
        if (loadSemaphore.tryAcquire()) {
            try {
                // 处理请求
                return handleRequest(request);
            } finally {
                loadSemaphore.release();
            }
        } else {
            // 系统过载,拒绝请求
            return false;
        }
    }
    
    public double getCurrentLoad() {
        return (double) (maxLoad - loadSemaphore.availablePermits()) / maxLoad;
    }
}
3. 对象池实现

// 通用对象池实现
public class ObjectPool<T> {
    private final Semaphore semaphore;
    private final BlockingQueue<T> pool;
    private final Supplier<T> objectFactory;
    
    public ObjectPool(int size, Supplier<T> objectFactory) {
        this.semaphore = new Semaphore(size);
        this.pool = new LinkedBlockingQueue<>(size);
        this.objectFactory = objectFactory;
        
        // 初始化对象池
        for (int i = 0; i < size; i++) {
            pool.offer(objectFactory.get());
        }
    }
    
    public T borrowObject() throws InterruptedException {
        semaphore.acquire();
        return pool.take();
    }
    
    public void returnObject(T object) {
        if (object != null) {
            pool.offer(object);
            semaphore.release();
        }
    }
    
    public int getAvailableCount() {
        return semaphore.availablePermits();
    }
}
高级用法
1. 多资源类型管理

// 管理不同类型的资源,每种资源有不同的并发限制
public class MultiResourceManager {
    private final Map<String, Semaphore> resourceSemaphores;
    
    public MultiResourceManager(Map<String, Integer> resourceLimits) {
        this.resourceSemaphores = new ConcurrentHashMap<>();
        resourceLimits.forEach((resource, limit) -> 
            resourceSemaphores.put(resource, new Semaphore(limit))
        );
    }
    
    public boolean acquireResource(String resourceType) {
        Semaphore semaphore = resourceSemaphores.get(resourceType);
        if (semaphore != null) {
            return semaphore.tryAcquire();
        }
        return false;
    }
    
    public void releaseResource(String resourceType) {
        Semaphore semaphore = resourceSemaphores.get(resourceType);
        if (semaphore != null) {
            semaphore.release();
        }
    }
}
2. 动态调整许可证数量

// 支持运行时动态调整并发限制
public class DynamicSemaphore {
    private Semaphore semaphore;
    private int currentPermits;
    
    public DynamicSemaphore(int initialPermits) {
        this.currentPermits = initialPermits;
        this.semaphore = new Semaphore(initialPermits);
    }
    
    public synchronized void setPermits(int newPermits) {
        if (newPermits > currentPermits) {
            // 增加许可证
            int additional = newPermits - currentPermits;
            semaphore.release(additional);
        } else if (newPermits < currentPermits) {
            // 减少许可证 - 这个比较复杂,需要特殊处理
            adjustPermitsDown(newPermits);
        }
        currentPermits = newPermits;
    }
    
    private void adjustPermitsDown(int newPermits) {
        // 简化的实现:创建新的 Semaphore
        Semaphore oldSemaphore = semaphore;
        semaphore = new Semaphore(newPermits);
        // 在实际应用中,需要更复杂的逻辑来处理正在等待的线程
    }
    
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
    
    public void release() {
        semaphore.release();
    }
}
注意事项
  1. 异常安全:始终在 finally 块中释放许可证,避免许可证泄露

  2. 公平性选择:根据场景选择公平或非公平模式,非公平模式通常有更好的吞吐量

  3. 资源清理:确保在不再需要时正确关闭资源

  4. 死锁预防:避免多个信号量之间的循环等待

  5. 性能考虑:在高并发场景下,考虑使用 tryAcquire() 而非阻塞的 acquire()

Semaphore 是一个强大的并发控制工具,特别适用于资源池管理、流量控制、系统保护等场景。正确使用 Semaphore 可以显著提高系统的稳定性和性能。

5.4 Phaser

5.4.1什么是 Phaser?

Phaser 是 Java 7 引入的一个强大的、可重用的同步屏障,它结合了 CountDownLatch 和 CyclicBarrier 的功能,并提供了更灵活的同步控制。Phaser 支持动态调整参与线程的数量,并且可以分多个阶段(phase)进行同步。

核心机制:Phaser 维护一个相位(phase)计数器,线程可以在每个阶段注册、到达、等待其他线程,并且可以动态地增加或减少参与同步的线程数量。

5.4.2基本用法

import java.util.concurrent.*;

public class PhaserExample {
    
    static class MultiPhaseTask implements Runnable {
        private final Phaser phaser;
        private final String name;
        
        public MultiPhaseTask(Phaser phaser, String name) {
            this.phaser = phaser;
            this.name = name;
            phaser.register();  // 注册到Phaser
        }
        
        @Override
        public void run() {
            // 阶段1
            System.out.println(name + " starting phase 1");
            doWork(1000);
            System.out.println(name + " completed phase 1");
            phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段1
            
            // 阶段2
            System.out.println(name + " starting phase 2");
            doWork(1500);
            System.out.println(name + " completed phase 2");
            phaser.arriveAndAwaitAdvance();  // 等待其他线程完成阶段2
            
            // 阶段3
            System.out.println(name + " starting phase 3");
            doWork(500);
            System.out.println(name + " completed phase 3");
            phaser.arriveAndDeregister();  // 完成并注销
        }
        
        private void doWork(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1);  // 主线程注册
        
        // 创建任务
        for (int i = 0; i < 3; i++) {
            new Thread(new MultiPhaseTask(phaser, "Task-" + i)).start();
        }
        
        // 主线程等待所有阶段完成
        System.out.println("Main thread waiting for phase completion");
        phaser.arriveAndAwaitAdvance();  // 阶段1完成
        System.out.println("Phase 1 completed");
        
        phaser.arriveAndAwaitAdvance();  // 阶段2完成
        System.out.println("Phase 2 completed");
        
        phaser.arriveAndAwaitAdvance();  // 阶段3完成
        System.out.println("Phase 3 completed");
        
        System.out.println("All phases completed");
    }
}
5.4.3为什么要使用 Phaser?
1. 动态线程管理
// 场景:任务执行过程中可以动态增减参与线程
public class DynamicPhaserExample {
    public static void main(String[] args) {
        // 创建 Phaser,初始参与线程数为 1(主线程)
        Phaser phaser = new Phaser(1);
        
        // 启动 3 个工作线程
        for (int i = 0; i < 3; i++) {
            phaser.register(); // 注册新线程
            new Thread(new DynamicWorker(phaser, "Worker-" + i)).start();
        }
        
        // 主线程也参与同步
        System.out.println("主线程开始等待...");
        phaser.arriveAndAwaitAdvance();
        System.out.println("第一阶段完成,当前相位: " + phaser.getPhase());
        
        // 动态添加新线程
        phaser.register();
        new Thread(new DynamicWorker(phaser, "New-Worker")).start();
        
        phaser.arriveAndAwaitAdvance();
        System.out.println("第二阶段完成,当前相位: " + phaser.getPhase());
        
        // 主线程退出,减少参与线程数
        phaser.arriveAndDeregister();
    }
}

class DynamicWorker implements Runnable {
    private final Phaser phaser;
    private final String name;
    
    public DynamicWorker(Phaser phaser, String name) {
        this.phaser = phaser;
        this.name = name;
    }
    
    @Override
    public void run() {
        System.out.println(name + " 开始工作,注册到 Phaser");
        
        // 参与多个阶段
        for (int phase = 0; phase < 3; phase++) {
            System.out.println(name + " 执行阶段 " + phase);
            doWork(500 + (long) (Math.random() * 1000));
            System.out.println(name + " 完成阶段 " + phase + ",等待其他线程");
            
            // 到达屏障并等待其他线程
            phaser.arriveAndAwaitAdvance();
            
            // 模拟某些线程提前退出
            if (name.equals("Worker-1") && phase == 1) {
                System.out.println(name + " 提前退出");
                phaser.arriveAndDeregister();
                return;
            }
        }
        
        // 正常完成所有阶段后注销
        phaser.arriveAndDeregister();
        System.out.println(name + " 完成所有工作并注销");
    }
    
    private void doWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
2. 复杂多阶段任务协调

// 场景:复杂的多阶段数据处理流程
public class MultiStageProcessing {
    public static void main(String[] args) {
        int initialParties = 4;
        Phaser phaser = new Phaser(initialParties) {
            // 重写 onAdvance 方法,在每个阶段完成后执行特定操作
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=== 阶段 " + phase + " 完成 ===");
                System.out.println("注册线程数: " + registeredParties);
                
                // 如果阶段达到3或者没有注册的线程,则终止
                return phase >= 2 || registeredParties == 0;
            }
        };
        
        // 创建处理任务
        for (int i = 0; i < initialParties; i++) {
            new Thread(new DataProcessor(phaser, i)).start();
        }
        
        // 等待所有线程完成
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance();
        }
        System.out.println("所有处理完成,Phaser 已终止");
    }
}

class DataProcessor implements Runnable {
    private final Phaser phaser;
    private final int processorId;
    
    public DataProcessor(Phaser phaser, int processorId) {
        this.phaser = phaser;
        this.processorId = processorId;
    }
    
    @Override
    public void run() {
        try {
            // 阶段 0: 数据加载
            System.out.println("处理器 " + processorId + " 加载数据");
            loadData();
            phaser.arriveAndAwaitAdvance();
            
            // 阶段 1: 数据处理
            System.out.println("处理器 " + processorId + " 处理数据");
            processData();
            phaser.arriveAndAwaitAdvance();
            
            // 阶段 2: 结果保存
            System.out.println("处理器 " + processorId + " 保存结果");
            saveResults();
            phaser.arriveAndDeregister(); // 完成所有阶段,注销
            
        } catch (Exception e) {
            e.printStackTrace();
            phaser.arriveAndDeregister(); // 发生异常时也要注销
        }
    }
    
    private void loadData() throws InterruptedException {
        Thread.sleep(800 + (long) (Math.random() * 700));
    }
    
    private void processData() throws InterruptedException {
        Thread.sleep(1200 + (long) (Math.random() * 1000));
    }
    
    private void saveResults() throws InterruptedException {
        Thread.sleep(500 + (long) (Math.random() * 500));
    }
}
3. 分层任务执行

// 场景:主任务和子任务的层次化同步
public class HierarchicalPhaserExample {
    public static void main(String[] args) {
        // 主 Phaser,协调整个任务流程
        Phaser mainPhaser = new Phaser(1);
        
        // 执行多个复杂任务,每个任务有自己的子 Phaser
        for (int taskId = 0; taskId < 3; taskId++) {
            System.out.println("开始执行任务 " + taskId);
            executeComplexTask(mainPhaser, taskId);
            mainPhaser.arriveAndAwaitAdvance();
            System.out.println("任务 " + taskId + " 完成");
        }
        
        mainPhaser.arriveAndDeregister();
        System.out.println("所有任务执行完成");
    }
    
    private static void executeComplexTask(Phaser parentPhaser, int taskId) {
        // 为每个任务创建子 Phaser
        Phaser taskPhaser = new Phaser(parentPhaser, 0) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("任务 " + taskId + " 阶段 " + phase + " 完成");
                return phase >= 2; // 执行3个阶段后终止
            }
        };
        
        // 启动子任务
        int subtaskCount = 2 + taskId; // 不同任务有不同数量的子任务
        for (int i = 0; i < subtaskCount; i++) {
            taskPhaser.register();
            new Thread(new Subtask(taskPhaser, taskId, i)).start();
        }
        
        // 等待子任务完成
        while (!taskPhaser.isTerminated()) {
            taskPhaser.arriveAndAwaitAdvance();
        }
    }
}

class Subtask implements Runnable {
    private final Phaser phaser;
    private final int taskId;
    private final int subtaskId;
    
    public Subtask(Phaser phaser, int taskId, int subtaskId) {
        this.phaser = phaser;
        this.taskId = taskId;
        this.subtaskId = subtaskId;
    }
    
    @Override
    public void run() {
        try {
            System.out.printf("子任务 [任务%d-%d] 开始%n", taskId, subtaskId);
            
            // 阶段 0
            phaseWork("准备", 300);
            phaser.arriveAndAwaitAdvance();
            
            // 阶段 1
            phaseWork("计算", 600);
            phaser.arriveAndAwaitAdvance();
            
            // 阶段 2
            phaseWork("清理", 200);
            phaser.arriveAndDeregister();
            
            System.out.printf("子任务 [任务%d-%d] 完成%n", taskId, subtaskId);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            phaser.arriveAndDeregister();
        }
    }
    
    private void phaseWork(String phaseName, long baseTime) throws InterruptedException {
        long workTime = baseTime + (long) (Math.random() * 500);
        System.out.printf("子任务 [任务%d-%d] 执行 %s 阶段 (%d ms)%n", 
                         taskId, subtaskId, phaseName, workTime);
        Thread.sleep(workTime);
    }
}
主要方法
方法 描述
Phaser() 创建 Phaser,初始参与数为 0
Phaser(int parties) 创建指定初始参与数的 Phaser
Phaser(Phaser parent) 创建父 Phaser 的子 Phaser
Phaser(Phaser parent, int parties) 创建指定父 Phaser 和初始参与数的 Phaser
int register() 注册一个新参与者,返回当前相位
int arrive() 到达屏障,但不等待,返回当前相位
int arriveAndAwaitAdvance() 到达屏障并等待其他参与者
int arriveAndDeregister() 到达屏障并注销自己
int awaitAdvance(int phase) 等待相位前进到指定值
int getPhase() 返回当前相位
int getRegisteredParties() 返回注册的参与者数量
boolean isTerminated() 检查 Phaser 是否已终止
高级特性
1. 分层 Phaser

// 创建 Phaser 树,减少竞争
public class PhaserTreeExample {
    public static void main(String[] args) {
        Phaser rootPhaser = new Phaser(1);
        
        // 创建多个子 Phaser
        for (int i = 0; i < 3; i++) {
            Phaser childPhaser = new Phaser(rootPhaser, 0);
            startChildTasks(childPhaser, i, 2);
        }
        
        // 根 Phaser 等待所有子 Phaser 完成
        rootPhaser.arriveAndAwaitAdvance();
        System.out.println("所有子任务完成");
        rootPhaser.arriveAndDeregister();
    }
    
    private static void startChildTasks(Phaser phaser, int groupId, int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            phaser.register();
            new Thread(new TreeTask(phaser, groupId, i)).start();
        }
    }
}

class TreeTask implements Runnable {
    private final Phaser phaser;
    private final int groupId;
    private final int taskId;
    
    public TreeTask(Phaser phaser, int groupId, int taskId) {
        this.phaser = phaser;
        this.groupId = groupId;
        this.taskId = taskId;
    }
    
    @Override
    public void run() {
        System.out.printf("任务 [组%d-%d] 开始%n", groupId, taskId);
        
        for (int phase = 0; phase < 2; phase++) {
            doWork(phase);
            phaser.arriveAndAwaitAdvance();
        }
        
        phaser.arriveAndDeregister();
        System.out.printf("任务 [组%d-%d] 完成%n", groupId, taskId);
    }
    
    private void doWork(int phase) {
        try {
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.printf("任务 [组%d-%d] 完成阶段 %d%n", groupId, taskId, phase);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
2. 自定义终止条件

// 通过重写 onAdvance 方法自定义终止条件
public class CustomTerminationPhaser {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3) {
            private int successCount = 0;
            
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("阶段 " + phase + " 完成,成功计数: " + successCount);
                
                // 自定义终止条件:达到3个阶段或者成功次数超过5次
                if (phase >= 2 || successCount >= 5) {
                    System.out.println("Phaser 终止");
                    return true; // 终止 Phaser
                }
                return false; // 继续下一个阶段
            }
            
            public void incrementSuccess() {
                successCount++;
            }
        };
        
        for (int i = 0; i < 3; i++) {
            new Thread(new CustomTask(phaser, i)).start();
        }
    }
}

class CustomTask implements Runnable {
    private final CustomTerminationPhaser phaser;
    private final int taskId;
    
    public CustomTask(Phaser phaser, int taskId) {
        this.phaser = (CustomTerminationPhaser) phaser;
        this.taskId = taskId;
    }
    
    @Override
    public void run() {
        Random random = new Random();
        
        while (!phaser.isTerminated()) {
            System.out.println("任务 " + taskId + " 执行工作");
            
            // 模拟工作,可能成功或失败
            boolean success = random.nextBoolean();
            if (success) {
                phaser.incrementSuccess();
                System.out.println("任务 " + taskId + " 成功");
            }
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            
            phaser.arriveAndAwaitAdvance();
        }
        
        System.out.println("任务 " + taskId + " 退出");
    }
}
Phaser vs CountDownLatch vs CyclicBarrier
特性 Phaser CountDownLatch CyclicBarrier
重用性 ✅ 可重复使用 ❌ 一次性使用 ✅ 可重复使用
动态调整 ✅ 支持动态注册/注销 ❌ 固定数量 ❌ 固定数量
分层结构 ✅ 支持父子 Phaser ❌ 不支持 ❌ 不支持
阶段数量 🔄 无限阶段 ❌ 单阶段 🔄 固定阶段循环
灵活性 ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐
使用场景总结
  1. 动态任务执行:任务执行过程中可以动态增减参与线程

  2. 复杂工作流:多阶段、多层次的任务协调

  3. 游戏开发:多玩家游戏的回合制同步

  4. 科学计算:分阶段并行算法

  5. 数据处理:复杂的数据处理流水线

注意事项

  1. 性能考虑:Phaser 比 CountDownLatch 和 CyclicBarrier 更重量级,在简单场景下可能过度复杂

  2. 异常处理:确保在异常情况下正确调用 arriveAndDeregister() 避免线程泄露

  3. 终止检测:使用 isTerminated() 检查 Phaser 是否已终止

  4. 分层优化:对于大量线程,使用分层 Phaser 可以减少竞争

Phaser 提供了最灵活的同步控制,特别适用于复杂的、动态的多阶段同步场景。虽然学习曲线较陡峭,但在合适的场景下能提供强大的同步能力。

6. 线程池

6.1 ExecutorService


import java.util.concurrent.*;
import java.util.*;

public class ThreadPoolExample {
    
    static class ComputationTask implements Callable<Double> {
        private final int taskId;
        
        public ComputationTask(int taskId) {
            this.taskId = taskId;
        }
        
        @Override
        public Double call() throws Exception {
            System.out.println("Task " + taskId + " started by " + Thread.currentThread().getName());
            
            // 模拟计算密集型任务
            double result = 0;
            for (int i = 0; i < 1000000; i++) {
                result += Math.sin(i) * Math.cos(i);
            }
            
            System.out.println("Task " + taskId + " completed");
            return result;
        }
    }
    
    public static void main(String[] args) throws Exception {
        // 创建固定大小线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(4);
        
        // 创建任务列表
        List<ComputationTask> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(new ComputationTask(i));
        }
        
        // 提交所有任务并获取Future列表
        System.out.println("Submitting tasks...");
        List<Future<Double>> futures = fixedPool.invokeAll(tasks);
        
        // 处理结果
        System.out.println("Processing results...");
        for (int i = 0; i < futures.size(); i++) {
            try {
                Double result = futures.get(i).get();
                System.out.println("Task " + i + " result: " + result);
            } catch (ExecutionException e) {
                System.out.println("Task " + i + " failed: " + e.getCause());
            }
        }
        
        // 关闭线程池
        fixedPool.shutdown();
        
        // 等待线程池终止
        if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
            System.out.println("Forcing shutdown...");
            fixedPool.shutdownNow();
        }
        
        System.out.println("All tasks completed");
    }
}

6.2 ScheduledExecutorService


import java.util.concurrent.*;
import java.time.*;

public class ScheduledExecutorExample {
    
    static class ScheduledTask implements Runnable {
        private final String name;
        private int count = 0;
        
        public ScheduledTask(String name) {
            this.name = name;
        }
        
        @Override
        public void run() {
            count++;
            System.out.println(LocalTime.now() + " - " + name + " executed " + count + " times");
            
            // 模拟任务执行时间
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        System.out.println("Starting scheduled tasks at: " + LocalTime.now());
        
        // 固定延迟执行(每次执行结束后延迟固定时间再执行)
        ScheduledTask fixedDelayTask = new ScheduledTask("FixedDelayTask");
        scheduler.scheduleWithFixedDelay(fixedDelayTask, 0, 3, TimeUnit.SECONDS);
        
        // 固定频率执行(按照固定频率执行,不考虑任务执行时间)
        ScheduledTask fixedRateTask = new ScheduledTask("FixedRateTask");
        scheduler.scheduleAtFixedRate(fixedRateTask, 0, 3, TimeUnit.SECONDS);
        
        // 单次延迟执行
        scheduler.schedule(() -> {
            System.out.println(LocalTime.now() + " - One-time task executed");
        }, 10, TimeUnit.SECONDS);
        
        // 运行一段时间后关闭
        Thread.sleep(20000);
        System.out.println("Shutting down scheduler...");
        scheduler.shutdown();
        
        if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
            scheduler.shutdownNow();
        }
    }
}

7. 并发集合

7.1 ConcurrentHashMap


import java.util.concurrent.*;
import java.util.*;

public class ConcurrentHashMapExample {
    
    static class CacheManager {
        private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, AtomicInteger> accessCount = new ConcurrentHashMap<>();
        
        public void put(String key, String value) {
            cache.put(key, value);
            accessCount.putIfAbsent(key, new AtomicInteger(0));
        }
        
        public String get(String key) {
            String value = cache.get(key);
            if (value != null) {
                accessCount.get(key).incrementAndGet();
            }
            return value;
        }
        
        public void printStatistics() {
            System.out.println("Cache Statistics:");
            cache.forEach((key, value) -> {
                int count = accessCount.get(key).get();
                System.out.println("Key: " + key + ", Access Count: " + count);
            });
        }
    }
    
    static class CacheUser implements Runnable {
        private final CacheManager cache;
        private final String[] keys;
        
        public CacheUser(CacheManager cache, String[] keys) {
            this.cache = cache;
            this.keys = keys;
        }
        
        @Override
        public void run() {
            Random random = new Random();
            for (int i = 0; i < 100; i++) {
                String key = keys[random.nextInt(keys.length)];
                String value = cache.get(key);
                if (value != null) {
                    // 模拟使用缓存值
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        CacheManager cache = new CacheManager();
        
        // 初始化缓存
        String[] keys = {"user:1", "user:2", "user:3", "config:1", "config:2"};
        for (String key : keys) {
            cache.put(key, "Value for " + key);
        }
        
        // 创建多个线程访问缓存
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new CacheUser(cache, keys), "CacheUser-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        // 打印统计信息
        cache.printStatistics();
    }
}

7.2 CopyOnWriteArrayList


import java.util.concurrent.*;
import java.util.*;

public class CopyOnWriteExample {
    
    static class EventManager {
        private final CopyOnWriteArrayList<EventListener> listeners = 
            new CopyOnWriteArrayList<>();
        
        public void addListener(EventListener listener) {
            listeners.add(listener);
            System.out.println("Listener added. Total listeners: " + listeners.size());
        }
        
        public void removeListener(EventListener listener) {
            listeners.remove(listener);
            System.out.println("Listener removed. Total listeners: " + listeners.size());
        }
        
        public void fireEvent(String event) {
            // 遍历时对原列表的修改不会影响当前迭代
            for (EventListener listener : listeners) {
                listener.onEvent(event);
            }
        }
    }
    
    interface EventListener {
        void onEvent(String event);
    }
    
    static class LoggingListener implements EventListener {
        private final String name;
        
        public LoggingListener(String name) {
            this.name = name;
        }
        
        @Override
        public void onEvent(String event) {
            System.out.println(name + " received: " + event);
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        EventManager manager = new EventManager();
        
        // 创建监听器
        LoggingListener listener1 = new LoggingListener("Listener-1");
        LoggingListener listener2 = new LoggingListener("Listener-2");
        LoggingListener listener3 = new LoggingListener("Listener-3");
        
        // 在事件触发过程中动态添加/移除监听器
        Thread eventThread = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                manager.fireEvent("Event " + i);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        
        Thread modificationThread = new Thread(() -> {
            try {
                Thread.sleep(800);
                manager.addListener(listener2);
                
                Thread.sleep(800);
                manager.addListener(listener3);
                
                Thread.sleep(800);
                manager.removeListener(listener1);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 初始添加一个监听器
        manager.addListener(listener1);
        
        eventThread.start();
        modificationThread.start();
        
        eventThread.join();
        modificationThread.join();
    }
}

8. 原子变量


import java.util.concurrent.atomic.*;
import java.util.*;

public class AtomicExample {
    
    static class AtomicCounter {
        private final AtomicInteger count = new AtomicInteger(0);
        private final AtomicLong total = new AtomicLong(0);
        private final AtomicReference<String> status = new AtomicReference<>("STOPPED");
        
        public void increment() {
            count.incrementAndGet();
        }
        
        public void add(int value) {
            total.addAndGet(value);
        }
        
        public boolean start() {
            return status.compareAndSet("STOPPED", "RUNNING");
        }
        
        public boolean stop() {
            return status.compareAndSet("RUNNING", "STOPPED");
        }
        
        public int getCount() {
            return count.get();
        }
        
        public long getTotal() {
            return total.get();
        }
        
        public String getStatus() {
            return status.get();
        }
    }
    
    static class CounterUser implements Runnable {
        private final AtomicCounter counter;
        private final Random random = new Random();
        
        public CounterUser(AtomicCounter counter) {
            this.counter = counter;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
                counter.add(random.nextInt(100));
                
                // 偶尔尝试改变状态
                if (i % 200 == 0) {
                    if (counter.getStatus().equals("STOPPED")) {
                        counter.start();
                    } else {
                        counter.stop();
                    }
                }
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        AtomicCounter counter = new AtomicCounter();
        
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new CounterUser(counter), "Worker-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("Final count: " + counter.getCount());
        System.out.println("Final total: " + counter.getTotal());
        System.out.println("Final status: " + counter.getStatus());
        
        // 验证原子性
        int expectedCount = 10 * 1000;  // 10个线程,每个1000次
        System.out.println("Count is correct: " + (counter.getCount() == expectedCount));
    }
}

9. 最佳实践和注意事项

9.1 避免死锁


public class DeadlockPrevention {
    
    static class Account {
        private final String name;
        private int balance;
        
        public Account(String name, int balance) {
            this.name = name;
            this.balance = balance;
        }
        
        public void debit(int amount) {
            balance -= amount;
        }
        
        public void credit(int amount) {
            balance += amount;
        }
        
        public String getName() {
            return name;
        }
    }
    
    static class AccountService {
        // 按固定顺序获取锁来避免死锁
        public boolean transfer(Account from, Account to, int amount) {
            // 确定锁的顺序
            Account firstLock = from.getName().compareTo(to.getName()) < 0 ? from : to;
            Account secondLock = from.getName().compareTo(to.getName()) < 0 ? to : from;
            
            synchronized (firstLock) {
                synchronized (secondLock) {
                    if (from.balance >= amount) {
                        from.debit(amount);
                        to.credit(amount);
                        System.out.println("Transferred " + amount + " from " + 
                                         from.getName() + " to " + to.getName());
                        return true;
                    }
                    return false;
                }
            }
        }
        
        // 使用tryLock避免死锁
        public boolean transferWithTryLock(Account from, Account to, int amount) 
                throws InterruptedException {
            
            // 尝试在有限时间内获取两个锁
            long stopTime = System.nanoTime() + 5000000000L; // 5秒超时
            
            while (System.nanoTime() < stopTime) {
                if (Thread.currentThread().getName().equals(from.getName())) {
                    synchronized (from) {
                        if (Thread.holdsLock(to)) {
                            // 执行转账
                            if (from.balance >= amount) {
                                from.debit(amount);
                                to.credit(amount);
                                System.out.println("Transferred " + amount + " from " + 
                                                 from.getName() + " to " + to.getName());
                                return true;
                            }
                            return false;
                        }
                    }
                } else {
                    synchronized (to) {
                        if (Thread.holdsLock(from)) {
                            // 执行转账
                            if (from.balance >= amount) {
                                from.debit(amount);
                                to.credit(amount);
                                System.out.println("Transferred " + amount + " from " + 
                                                 from.getName() + " to " + to.getName());
                                return true;
                            }
                            return false;
                        }
                    }
                }
                Thread.sleep(10); // 短暂休眠后重试
            }
            throw new RuntimeException("Transfer timeout");
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Account account1 = new Account("Account-A", 1000);
        Account account2 = new Account("Account-B", 1000);
        AccountService service = new AccountService();
        
        // 模拟并发转账
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                service.transfer(account1, account2, 10);
            }
        }, "Account-A");
        
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                service.transfer(account2, account1, 5);
            }
        }, "Account-B");
        
        t1.start();
        t2.start();
        
        t1.join();
        t2.join();
        
        System.out.println("Final balance - " + account1.getName() + ": " + 
                          account1.balance + ", " + account2.getName() + ": " + account2.balance);
    }
}

9.2 性能考虑


import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class PerformanceComparison {
    private static final int ITERATIONS = 1000000;
    private static final int THREAD_COUNT = 4;
    
    static class SynchronizedCounter {
        private long count = 0;
        
        public synchronized void increment() {
            count++;
        }
        
        public synchronized long getCount() {
            return count;
        }
    }
    
    static class LockCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private long count = 0;
        
        public void increment() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
        
        public long getCount() {
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    }
    
    static class AtomicCounter {
        private final AtomicLong count = new AtomicLong(0);
        
        public void increment() {
            count.incrementAndGet();
        }
        
        public long getCount() {
            return count.get();
        }
    }
    
    static class CounterTask implements Runnable {
        private final Runnable incrementAction;
        private final int iterations;
        
        public CounterTask(Runnable incrementAction, int iterations) {
            this.incrementAction = incrementAction;
            this.iterations = iterations;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < iterations; i++) {
                incrementAction.run();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 测试synchronized
        SynchronizedCounter syncCounter = new SynchronizedCounter();
        runBenchmark("Synchronized", () -> syncCounter.increment());
        
        // 测试Lock
        LockCounter lockCounter = new LockCounter();
        runBenchmark("Lock", () -> lockCounter.increment());
        
        // 测试Atomic
        AtomicCounter atomicCounter = new AtomicCounter();
        runBenchmark("Atomic", () -> atomicCounter.increment());
    }
    
    private static void runBenchmark(String name, Runnable incrementAction) 
            throws InterruptedException {
        
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(() -> {
                for (int j = 0; j < ITERATIONS / THREAD_COUNT; j++) {
                    incrementAction.run();
                }
                latch.countDown();
            });
        }
        
        latch.await();
        long endTime = System.currentTimeMillis();
        
        executor.shutdown();
        
        System.out.println(name + " counter time: " + (endTime - startTime) + "ms");
    }
}

总结

Java并发编程涉及多个重要概念和工具:

  1. 线程基础:Thread、Runnable、Callable

  2. 同步机制:synchronized、Lock、ReadWriteLock

  3. 线程通信:wait/notify、BlockingQueue

  4. 并发工具:CountDownLatch、CyclicBarrier、Semaphore、Phaser

  5. 线程池:ExecutorService、ScheduledExecutorService

  6. 并发集合:ConcurrentHashMap、CopyOnWriteArrayList

  7. 原子变量:AtomicInteger、AtomicLong、AtomicReference

在实际开发中,应根据具体场景选择合适的并发工具,并注意线程安全、性能和死锁预防等问题。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐