Java 并发编程详解与示例
本文详细介绍了Java并发编程的核心概念与实践。内容涵盖:1)线程基础(Thread/Runnable/Callable创建方式);2)同步机制(synchronized/Lock/ReadWriteLock);3)线程通信(wait-notify/BlockingQueue);4)并发工具类(CountDownLatch/CyclicBarrier/Semaphore/Phaser);5)线程池
Java 并发编程详解与示例
1. 并发基础概念
1.1 进程与线程
-
进程:操作系统资源分配的基本单位
-
线程:CPU调度的基本单位,共享进程资源
1.2 Java 内存模型 (JMM)
public class MemoryModelExample {
private static boolean flag = false;
private static int number = 0;
public static void main(String[] args) throws InterruptedException {
Thread writer = new Thread(() -> {
number = 42;
flag = true; // 可能发生指令重排序
});
Thread reader = new Thread(() -> {
while (!flag) {
// 循环等待
}
System.out.println("Number: " + number); // 可能看到0
});
reader.start();
Thread.sleep(100);
writer.start();
}
}
2. 线程创建与管理
2.1 继承 Thread 类
public class MyThread extends Thread {
private final String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + " - Count: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(name + " interrupted");
return;
}
}
}
public static void main(String[] args) throws InterruptedException {
MyThread t1 = new MyThread("Thread-1");
MyThread t2 = new MyThread("Thread-2");
t1.start();
t2.start();
t1.join(); // 等待线程结束
t2.join();
System.out.println("All threads completed");
}
}
2.2 实现 Runnable 接口
public class MyRunnable implements Runnable {
private final String name;
private volatile boolean running = true;
public MyRunnable(String name) {
this.name = name;
}
public void stop() {
running = false;
}
@Override
public void run() {
int count = 0;
while (running && count < 10) {
System.out.println(name + " - Working: " + count++);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(name + " interrupted");
break;
}
}
System.out.println(name + " finished");
}
public static void main(String[] args) throws InterruptedException {
MyRunnable r1 = new MyRunnable("Worker-1");
MyRunnable r2 = new MyRunnable("Worker-2");
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
Thread.sleep(3000);
r1.stop(); // 优雅停止线程
t1.join();
t2.join();
}
}
2.3 使用 Callable 和 Future
import java.util.concurrent.*;
public class CallableExample {
static class ComputationTask implements Callable<Long> {
private final int number;
public ComputationTask(int number) {
this.number = number;
}
@Override
public Long call() throws Exception {
System.out.println("Computing factorial of " + number);
long result = factorial(number);
System.out.println("Completed factorial of " + number);
return result;
}
private long factorial(int n) {
if (n <= 1) return 1;
long result = 1;
for (int i = 2; i <= n; i++) {
result *= i;
try {
Thread.sleep(100); // 模拟计算时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return result;
}
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Long> future1 = executor.submit(new ComputationTask(10));
Future<Long> future2 = executor.submit(new ComputationTask(15));
System.out.println("Tasks submitted, waiting for results...");
// 非阻塞方式检查结果
while (!future1.isDone() || !future2.isDone()) {
System.out.println("Still calculating...");
Thread.sleep(500);
}
Long result1 = future1.get();
Long result2 = future2.get();
System.out.println("Factorial of 10: " + result1);
System.out.println("Factorial of 15: " + result2);
executor.shutdown();
}
}
3. 线程同步
3.1 synchronized 关键字
public class SynchronizedExample {
static class Counter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
// 同步代码块
public void decrement() {
synchronized (this) {
count--;
}
}
public synchronized int getCount() {
return count;
}
}
static class Worker implements Runnable {
private final Counter counter;
private final boolean increment;
public Worker(Counter counter, boolean increment) {
this.counter = counter;
this.increment = increment;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
if (increment) {
counter.increment();
} else {
counter.decrement();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(new Worker(counter, true));
Thread t2 = new Thread(new Worker(counter, true));
Thread t3 = new Thread(new Worker(counter, false));
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
System.out.println("Final count: " + counter.getCount()); // 应该是1000
}
}
3.2 Lock 接口
import java.util.concurrent.locks.*;
public class LockExample {
static class SharedResource {
private final ReentrantLock lock = new ReentrantLock();
private int value = 0;
public void updateValue(int newValue) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " updating value");
Thread.sleep(1000); // 模拟耗时操作
value = newValue;
System.out.println(Thread.currentThread().getName() + " updated value to: " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
// 使用 tryLock 避免死锁
public boolean tryUpdateValue(int newValue) {
if (lock.tryLock()) {
try {
value = newValue;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Runnable task = () -> {
for (int i = 0; i < 3; i++) {
resource.updateValue(i);
}
};
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
t1.start();
t2.start();
}
}
3.3 ReadWriteLock
import java.util.concurrent.locks.*;
public class ReadWriteLockExample {
static class DataStore {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String data = "Initial Data";
public String readData() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " reading data");
Thread.sleep(100); // 模拟读取耗时
return data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
readLock.unlock();
}
}
public void writeData(String newData) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " writing data");
Thread.sleep(500); // 模拟写入耗时
data = newData;
System.out.println(Thread.currentThread().getName() + " wrote: " + newData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
}
public static void main(String[] args) {
DataStore dataStore = new DataStore();
// 创建多个读取线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (int j = 0; j < 3; j++) {
System.out.println("Read: " + dataStore.readData());
}
}, "Reader-" + i).start();
}
// 创建写入线程
new Thread(() -> {
for (int i = 0; i < 3; i++) {
dataStore.writeData("Data " + i);
}
}, "Writer").start();
}
}
4. 线程间通信
4.1 wait() 和 notify()
public class WaitNotifyExample {
static class Message {
private String message;
private boolean empty = true;
public synchronized String read() {
while (empty) {
try {
wait(); // 等待消息可用
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
empty = true;
notifyAll(); // 通知写入线程
return message;
}
public synchronized void write(String message) {
while (!empty) {
try {
wait(); // 等待消息被消费
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
empty = false;
this.message = message;
notifyAll(); // 通知读取线程
}
}
static class Writer implements Runnable {
private final Message message;
public Writer(Message message) {
this.message = message;
}
@Override
public void run() {
String[] messages = {"Message 1", "Message 2", "Message 3", "Message 4"};
for (String msg : messages) {
message.write(msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
message.write("FINISH");
}
}
static class Reader implements Runnable {
private final Message message;
public Reader(Message message) {
this.message = message;
}
@Override
public void run() {
for (String latestMessage = message.read();
!"FINISH".equals(latestMessage);
latestMessage = message.read()) {
System.out.println("Received: " + latestMessage);
}
System.out.println("All messages received");
}
}
public static void main(String[] args) {
Message message = new Message();
Thread writerThread = new Thread(new Writer(message), "Writer");
Thread readerThread = new Thread(new Reader(message), "Reader");
readerThread.start();
writerThread.start();
}
}
4.2 BlockingQueue
import java.util.concurrent.*;
public class BlockingQueueExample {
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int count;
public Producer(BlockingQueue<Integer> queue, int count) {
this.queue = queue;
this.count = count;
}
@Override
public void run() {
try {
for (int i = 0; i < count; i++) {
System.out.println("Producing: " + i);
queue.put(i); // 阻塞如果队列满
Thread.sleep(100);
}
queue.put(-1); // 结束信号
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
public Consumer(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 阻塞如果队列空
if (item == -1) {
queue.put(-1); // 放回结束信号给其他消费者
break;
}
System.out.println(name + " consuming: " + item);
Thread.sleep(200);
}
System.out.println(name + " finished");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producer = new Thread(new Producer(queue, 20), "Producer");
Thread consumer1 = new Thread(new Consumer(queue, "Consumer-1"), "Consumer-1");
Thread consumer2 = new Thread(new Consumer(queue, "Consumer-2"), "Consumer-2");
producer.start();
consumer1.start();
consumer2.start();
}
}
5. 并发工具类
5.1 CountDownLatch
5.1.1什么是 CountDownLatch?
CountDownLatch 是 Java 并发包 (java.util.concurrent
) 中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作后再继续执行。
核心机制:通过一个计数器来实现同步,计数器的初始值在创建时设定,每次调用 countDown()
方法计数器减1,当计数器值为0时,所有等待的线程将被释放。
5.1.2基本用法
import java.util.concurrent.*;
public class CountDownLatchExample {
static class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private final int workerId;
public Worker(int workerId, CountDownLatch startSignal, CountDownLatch doneSignal) {
this.workerId = workerId;
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
System.out.println("Worker " + workerId + " waiting to start");
startSignal.await(); // 等待开始信号
System.out.println("Worker " + workerId + " started working");
Thread.sleep(1000 + workerId * 100); // 模拟工作
System.out.println("Worker " + workerId + " completed work");
doneSignal.countDown(); // 通知完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);
// 创建工作线程
for (int i = 0; i < workerCount; i++) {
new Thread(new Worker(i, startSignal, doneSignal)).start();
}
System.out.println("Main thread preparing resources...");
Thread.sleep(2000); // 模拟资源准备
System.out.println("Main thread starting all workers");
startSignal.countDown(); // 释放所有工作线程
doneSignal.await(); // 等待所有工作线程完成
System.out.println("All workers completed, main thread continuing");
}
}
5.1.3为什么要使用 CountDownLatch?
1. 线程协调与同步
// 场景:多个服务启动完成后,主服务才能启动
public class ServiceInitializer {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
// 启动多个服务
new Thread(new DatabaseService(latch)).start();
new Thread(new CacheService(latch)).start();
new Thread(new ConfigService(latch)).start();
// 等待所有服务启动完成
latch.await();
System.out.println("所有服务已就绪,启动主应用...");
}
}
2. 并行任务处理
// 场景:并行处理多个任务,等待所有任务完成
public class ParallelProcessor {
public void processTasks(List<Task> tasks) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(tasks.size());
ExecutorService executor = Executors.newFixedThreadPool(5);
for (Task task : tasks) {
executor.submit(() -> {
try {
task.process();
} finally {
latch.countDown();
}
});
}
// 等待所有任务完成
latch.await();
executor.shutdown();
System.out.println("所有任务处理完成");
}
}
3. 性能测试和基准测试
// 场景:模拟并发用户同时执行操作
public class ConcurrentTest {
public static void main(String[] args) throws InterruptedException {
int userCount = 100;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(userCount);
for (int i = 0; i < userCount; i++) {
new Thread(new User(startSignal, doneSignal, "User-" + i)).start();
}
System.out.println("所有用户准备就绪...");
Thread.sleep(1000); // 准备时间
// 同时释放所有用户线程
startSignal.countDown();
// 等待所有用户完成操作
doneSignal.await();
System.out.println("所有用户操作完成");
}
}
class User implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private final String name;
public User(CountDownLatch startSignal, CountDownLatch doneSignal, String name) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.name = name;
}
@Override
public void run() {
try {
startSignal.await(); // 等待开始信号
System.out.println(name + " 开始执行操作");
// 执行具体操作...
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown();
}
}
}
主要方法
方法 | 描述 |
---|---|
CountDownLatch(int count) |
构造函数,指定计数器的初始值 |
void await() |
使当前线程等待,直到计数器为0 |
boolean await(long timeout, TimeUnit unit) |
带超时的等待 |
void countDown() |
计数器减1 |
long getCount() |
返回当前计数 |
使用场景总结
-
启动顺序控制:确保某些服务在其他服务启动完成后再启动
-
并行计算:等待所有子任务完成后进行结果汇总
-
测试框架:协调多个测试线程的同步执行
-
资源初始化:等待所有资源加载完成后再继续
-
游戏开发:等待所有玩家准备就绪
注意事项
-
一次性使用:CountDownLatch 的计数器不能被重置,如果需要重复使用,考虑使用
CyclicBarrier
-
异常处理:确保在
finally
块中调用countDown()
,避免线程异常导致计数器无法减少 -
避免死锁:合理设置超时时间,防止无限期等待
CountDownLatch 提供了一种简单而强大的线程同步机制,特别适用于"一个线程等待多个线程"或"多个线程互相等待"的场景。
5.2 CyclicBarrier
5.2.1什么是 CyclicBarrier?
CyclicBarrier 是 Java 并发包 (java.util.concurrent
) 中的另一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点(barrier point)后才能继续执行。
核心机制:通过一个计数器来实现循环同步,当指定数量的线程都调用 await()
方法后,这些线程才会继续执行,同时屏障会重置以便下次使用。
5.2.2基本用法
import java.util.concurrent.*;
public class CyclicBarrierExample {
static class MatrixProcessor {
private final int[][] matrix;
private final int[] results;
private final CyclicBarrier barrier;
public MatrixProcessor(int[][] matrix) {
this.matrix = matrix;
this.results = new int[matrix.length];
this.barrier = new CyclicBarrier(matrix.length, () -> {
// 所有线程到达屏障后执行
int finalResult = 0;
for (int result : results) {
finalResult += result;
}
System.out.println("All rows processed. Final sum: " + finalResult);
});
}
public void process() {
for (int i = 0; i < matrix.length; i++) {
final int row = i;
new Thread(() -> {
try {
// 处理行
int sum = 0;
for (int value : matrix[row]) {
sum += value;
Thread.sleep(10); // 模拟处理时间
}
results[row] = sum;
System.out.println("Row " + row + " processed, sum: " + sum);
// 等待其他行处理完成
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
public static void main(String[] args) {
int[][] matrix = {
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
{10, 11, 12}
};
MatrixProcessor processor = new MatrixProcessor(matrix);
processor.process();
}
}
5.2.3为什么要使用 CyclicBarrier?
1. 多阶段并行计算
// 场景:并行计算任务,需要多个阶段同步
public class ParallelComputation {
public static void main(String[] args) {
int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount,
() -> System.out.println("=== 阶段完成,开始下一阶段 ==="));
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
executor.execute(new ComputationTask(barrier, i));
}
executor.shutdown();
}
}
class ComputationTask implements Runnable {
private final CyclicBarrier barrier;
private final int taskId;
public ComputationTask(CyclicBarrier barrier, int taskId) {
this.barrier = barrier;
this.taskId = taskId;
}
@Override
public void run() {
try {
// 第一阶段计算
System.out.println("任务 " + taskId + " 执行第一阶段计算");
Thread.sleep(1000 + taskId * 100);
barrier.await();
// 第二阶段计算
System.out.println("任务 " + taskId + " 执行第二阶段计算");
Thread.sleep(800 + taskId * 100);
barrier.await();
// 第三阶段计算
System.out.println("任务 " + taskId + " 执行第三阶段计算");
Thread.sleep(600 + taskId * 100);
barrier.await();
System.out.println("任务 " + taskId + " 全部完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. 复杂数据处理流水线
// 场景:数据分片处理,每个阶段需要同步
public class DataProcessingPipeline {
public static void main(String[] args) {
int dataPartitions = 3;
CyclicBarrier barrier = new CyclicBarrier(dataPartitions,
() -> System.out.println("所有分区数据处理完成,进入下一阶段"));
for (int i = 0; i < dataPartitions; i++) {
new Thread(new DataProcessor(barrier, i)).start();
}
}
}
class DataProcessor implements Runnable {
private final CyclicBarrier barrier;
private final int partitionId;
public DataProcessor(CyclicBarrier barrier, int partitionId) {
this.barrier = barrier;
this.partitionId = partitionId;
}
@Override
public void run() {
try {
// 数据加载阶段
System.out.println("分区 " + partitionId + " 加载数据");
loadData();
barrier.await();
// 数据清洗阶段
System.out.println("分区 " + partitionId + " 清洗数据");
cleanData();
barrier.await();
// 数据分析阶段
System.out.println("分区 " + partitionId + " 分析数据");
analyzeData();
barrier.await();
// 结果输出阶段
System.out.println("分区 " + partitionId + " 输出结果");
outputResult();
} catch (Exception e) {
e.printStackTrace();
}
}
private void loadData() throws InterruptedException {
Thread.sleep(500 + (long) (Math.random() * 1000));
}
private void cleanData() throws InterruptedException {
Thread.sleep(300 + (long) (Math.random() * 800));
}
private void analyzeData() throws InterruptedException {
Thread.sleep(700 + (long) (Math.random() * 1200));
}
private void outputResult() throws InterruptedException {
Thread.sleep(200 + (long) (Math.random() * 500));
}
}
3 游戏开发中的多玩家同步
// 场景:多玩家游戏,每个回合需要等待所有玩家完成操作
public class MultiplayerGame {
public static void main(String[] args) {
int playerCount = 4;
CyclicBarrier roundBarrier = new CyclicBarrier(playerCount,
() -> System.out.println("=== 所有玩家完成操作,开始新回合 ==="));
for (int i = 0; i < playerCount; i++) {
new Thread(new Player(roundBarrier, "Player-" + (i + 1))).start();
}
}
}
class Player implements Runnable {
private final CyclicBarrier barrier;
private final String name;
private int score = 0;
public Player(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
// 模拟多个游戏回合
for (int round = 1; round <= 3; round++) {
System.out.println(name + " 在第 " + round + " 回合中操作...");
playRound();
System.out.println(name + " 完成第 " + round + " 回合,等待其他玩家");
barrier.await(); // 等待所有玩家完成当前回合
// 屏障打开后,所有线程同时继续
System.out.println(name + " 开始第 " + (round + 1) + " 回合准备");
}
System.out.println(name + " 游戏结束,最终得分: " + score);
} catch (Exception e) {
e.printStackTrace();
}
}
private void playRound() throws InterruptedException {
Thread.sleep(1000 + (long) (Math.random() * 2000));
score += (int) (Math.random() * 100);
}
}
主要方法
方法 | 描述 |
---|---|
CyclicBarrier(int parties) |
创建屏障,指定参与线程数 |
CyclicBarrier(int parties, Runnable barrierAction) |
创建屏障,指定参与线程数和屏障动作 |
int await() |
等待所有线程到达屏障 |
int await(long timeout, TimeUnit unit) |
带超时的等待 |
int getParties() |
返回需要的线程数 |
int getNumberWaiting() |
返回当前在屏障处等待的线程数 |
boolean isBroken() |
查询屏障是否处于损坏状态 |
void reset() |
重置屏障到初始状态 |
CyclicBarrier vs CountDownLatch
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
重用性 | ✅ 可重复使用 | ❌ 一次性使用 |
计数器 | 自动重置 | 手动重置 |
等待机制 | 线程互相等待 | 一个或多个线程等待其他线程 |
使用场景 | 多阶段任务同步 | 一次性任务完成等待 |
使用场景总结
-
多阶段并行计算:将复杂计算分解为多个阶段,每个阶段需要同步
-
数据分片处理:处理分片数据,确保所有分片完成当前阶段后再进入下一阶段
-
游戏开发:多玩家游戏回合同步
-
模拟测试:多个测试线程需要同步执行不同阶段
-
迭代算法:需要多次迭代的并行算法,每次迭代后同步
注意事项
-
可重复使用:CyclicBarrier 可以重复使用,每次所有线程到达屏障后会自动重置
-
屏障动作:可以指定一个 Runnable 作为屏障动作,在所有线程到达屏障后、释放线程前执行
-
异常处理:如果一个线程在等待时被中断或超时,其他等待线程会收到
BrokenBarrierException
-
重置操作:可以通过
reset()
方法手动重置屏障,但会中断所有等待的线程
高级用法示例
// 复杂的分阶段并行处理
public class AdvancedCyclicBarrierExample {
public static void main(String[] args) {
int workerCount = 3;
AtomicInteger phase = new AtomicInteger(1);
CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
int currentPhase = phase.getAndIncrement();
System.out.println("=== 阶段 " + currentPhase + " 完成 ===");
if (currentPhase >= 3) {
System.out.println("=== 所有处理阶段完成 ===");
}
});
for (int i = 0; i < workerCount; i++) {
new Thread(new AdvancedWorker(barrier, i)).start();
}
}
}
class AdvancedWorker implements Runnable {
private final CyclicBarrier barrier;
private final int workerId;
public AdvancedWorker(CyclicBarrier barrier, int workerId) {
this.barrier = barrier;
this.workerId = workerId;
}
@Override
public void run() {
try {
// 阶段1:数据准备
prepareData();
barrier.await();
// 阶段2:数据处理
processData();
barrier.await();
// 阶段3:结果汇总
aggregateResults();
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
private void prepareData() throws InterruptedException {
System.out.println("Worker " + workerId + " 准备数据");
Thread.sleep(1000);
}
private void processData() throws InterruptedException {
System.out.println("Worker " + workerId + " 处理数据");
Thread.sleep(1500);
}
private void aggregateResults() throws InterruptedException {
System.out.println("Worker " + workerId + " 汇总结果");
Thread.sleep(800);
}
}
CyclicBarrier 提供了一种强大的多线程同步机制,特别适用于需要多个线程在多阶段任务中保持同步的场景,其可重复使用的特性使其在循环或迭代处理中非常有用。
5.3 Semaphore
5.3.1什么是 Semaphore?
Semaphore(信号量)是 Java 并发包 (java.util.concurrent
) 中的同步工具类,用于控制同时访问特定资源的线程数量。它通过维护一组"许可证"(permits)来协调多线程对共享资源的访问。
核心机制:Semaphore 内部维护一个计数器,线程通过 acquire()
获取许可证(计数器减1),通过 release()
释放许可证(计数器加1)。当计数器为0时,尝试获取许可证的线程会被阻塞,直到有其他线程释放许可证。
5.3.2 基本用法
import java.util.concurrent.*;
public class SemaphoreExample {
static class ConnectionPool {
private final Semaphore semaphore;
private final boolean[] connections;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize, true); // 公平信号量
this.connections = new boolean[poolSize];
}
public Integer getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可
return getAvailableConnection();
}
public void releaseConnection(int index) {
if (releaseConnectionInternal(index)) {
semaphore.release(); // 释放许可
}
}
private synchronized Integer getAvailableConnection() {
for (int i = 0; i < connections.length; i++) {
if (!connections[i]) {
connections[i] = true;
System.out.println(Thread.currentThread().getName() + " acquired connection " + i);
return i;
}
}
return null; // 不应该发生
}
private synchronized boolean releaseConnectionInternal(int index) {
if (connections[index]) {
connections[index] = false;
System.out.println(Thread.currentThread().getName() + " released connection " + index);
return true;
}
return false;
}
}
static class DatabaseUser implements Runnable {
private final ConnectionPool pool;
private final int userId;
public DatabaseUser(ConnectionPool pool, int userId) {
this.pool = pool;
this.userId = userId;
}
@Override
public void run() {
try {
Integer connection = pool.getConnection();
if (connection != null) {
System.out.println("User " + userId + " using connection " + connection);
Thread.sleep(2000 + userId * 100); // 模拟数据库操作
pool.releaseConnection(connection);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(3); // 只有3个连接
// 创建10个用户线程竞争连接
for (int i = 0; i < 10; i++) {
new Thread(new DatabaseUser(pool, i), "User-" + i).start();
}
}
}
5.3.3为什么要使用 Semaphore?
1. 资源池管理
// 场景:数据库连接池
public class ConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
this.connections = new ArrayList<>();
// 初始化连接池
for (int i = 0; i < poolSize; i++) {
connections.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取访问权限
return getAvailableConnection();
}
public void releaseConnection(Connection connection) {
returnConnection(connection);
semaphore.release(); // 释放访问权限
}
private Connection createConnection() {
// 创建数据库连接
return new MockConnection();
}
private synchronized Connection getAvailableConnection() {
// 从池中获取可用连接
return connections.remove(0);
}
private synchronized void returnConnection(Connection connection) {
// 将连接返回池中
connections.add(connection);
}
}
// 使用连接池
public class ConnectionPoolExample {
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(5);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
try {
Connection conn = pool.getConnection();
System.out.println("任务 " + taskId + " 获取连接,执行查询...");
Thread.sleep(1000);
pool.releaseConnection(conn);
System.out.println("任务 " + taskId + " 释放连接");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
2. 限流和流量控制
// 场景:API 限流
public class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
this.semaphore = new Semaphore(permitsPerSecond);
this.scheduler = Executors.newScheduledThreadPool(1);
// 每秒重置许可证
scheduler.scheduleAtFixedRate(() -> {
int availablePermits = semaphore.availablePermits();
int drainCount = permitsPerSecond - availablePermits;
if (drainCount > 0) {
semaphore.release(drainCount);
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
public void shutdown() {
scheduler.shutdown();
}
}
// 使用限流器
public class APIRateLimitExample {
public static void main(String[] args) {
// 限制每秒最多 10 个请求
RateLimiter rateLimiter = new RateLimiter(10);
// 模拟大量请求
for (int i = 0; i < 50; i++) {
final int requestId = i;
new Thread(() -> {
if (rateLimiter.tryAcquire()) {
System.out.println("请求 " + requestId + " 被处理");
// 处理请求...
} else {
System.out.println("请求 " + requestId + " 被限流");
}
}).start();
try {
Thread.sleep(100); // 模拟请求间隔
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
rateLimiter.shutdown();
}
}
3. 生产者-消费者模式
// 场景:有界缓冲区
public class BoundedBuffer<E> {
private final Semaphore availableItems; // 可消费的项目数
private final Semaphore availableSpaces; // 可用的空间数
private final E[] buffer;
private int putPosition = 0;
private int takePosition = 0;
@SuppressWarnings("unchecked")
public BoundedBuffer(int capacity) {
this.availableItems = new Semaphore(0);
this.availableSpaces = new Semaphore(capacity);
this.buffer = (E[]) new Object[capacity];
}
public void put(E item) throws InterruptedException {
availableSpaces.acquire(); // 等待可用空间
synchronized (this) {
buffer[putPosition] = item;
putPosition = (putPosition + 1) % buffer.length;
}
availableItems.release(); // 增加可消费项目
}
public E take() throws InterruptedException {
availableItems.acquire(); // 等待可消费项目
E item;
synchronized (this) {
item = buffer[takePosition];
buffer[takePosition] = null;
takePosition = (takePosition + 1) % buffer.length;
}
availableSpaces.release(); // 增加可用空间
return item;
}
}
// 测试生产者-消费者
public class ProducerConsumerExample {
public static void main(String[] args) {
BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
buffer.put(i);
System.out.println("生产: " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
Integer item = buffer.take();
System.out.println("消费: " + item);
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
主要方法
方法 | 描述 |
---|---|
Semaphore(int permits) |
创建指定许可证数量的信号量 |
Semaphore(int permits, boolean fair) |
创建信号量,指定是否公平模式 |
void acquire() |
获取一个许可证,阻塞直到可用 |
void acquire(int permits) |
获取指定数量的许可证 |
boolean tryAcquire() |
尝试获取许可证,立即返回结果 |
boolean tryAcquire(long timeout, TimeUnit unit) |
带超时的尝试获取 |
void release() |
释放一个许可证 |
void release(int permits) |
释放指定数量的许可证 |
int availablePermits() |
返回当前可用许可证数量 |
boolean isFair() |
检查是否为公平模式 |
公平模式 vs 非公平模式
public class FairVsNonFairExample {
public static void main(String[] args) {
// 公平模式 - 按照请求顺序分配许可证
Semaphore fairSemaphore = new Semaphore(1, true);
// 非公平模式 - 可能插队
Semaphore nonFairSemaphore = new Semaphore(1, false);
testSemaphore("公平模式", fairSemaphore);
// testSemaphore("非公平模式", nonFairSemaphore);
}
private static void testSemaphore(String mode, Semaphore semaphore) {
System.out.println("=== " + mode + " ===");
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 尝试获取许可证");
semaphore.acquire();
System.out.println("线程 " + threadId + " 获得许可证");
Thread.sleep(1000);
System.out.println("线程 " + threadId + " 释放许可证");
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 稍微延迟启动,使线程按顺序创建
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
使用场景总结
1. 资源访问控制
// 控制对昂贵资源(如数据库连接、网络连接)的并发访问
public class ExpensiveResourceManager {
private final Semaphore semaphore;
private final Set<ExpensiveResource> resources;
public ExpensiveResourceManager(int maxConcurrent) {
this.semaphore = new Semaphore(maxConcurrent);
this.resources = Collections.synchronizedSet(new HashSet<>());
}
public void useResource() throws InterruptedException {
semaphore.acquire();
try {
ExpensiveResource resource = acquireResource();
// 使用资源...
resource.doSomething();
} finally {
releaseResource();
semaphore.release();
}
}
}
2. 系统负载保护
// 防止系统过载,确保同时处理的请求不超过系统承载能力
public class LoadProtection {
private final Semaphore loadSemaphore;
private final int maxLoad;
public LoadProtection(int maxLoad) {
this.maxLoad = maxLoad;
this.loadSemaphore = new Semaphore(maxLoad);
}
public boolean processRequest(Request request) {
if (loadSemaphore.tryAcquire()) {
try {
// 处理请求
return handleRequest(request);
} finally {
loadSemaphore.release();
}
} else {
// 系统过载,拒绝请求
return false;
}
}
public double getCurrentLoad() {
return (double) (maxLoad - loadSemaphore.availablePermits()) / maxLoad;
}
}
3. 对象池实现
// 通用对象池实现
public class ObjectPool<T> {
private final Semaphore semaphore;
private final BlockingQueue<T> pool;
private final Supplier<T> objectFactory;
public ObjectPool(int size, Supplier<T> objectFactory) {
this.semaphore = new Semaphore(size);
this.pool = new LinkedBlockingQueue<>(size);
this.objectFactory = objectFactory;
// 初始化对象池
for (int i = 0; i < size; i++) {
pool.offer(objectFactory.get());
}
}
public T borrowObject() throws InterruptedException {
semaphore.acquire();
return pool.take();
}
public void returnObject(T object) {
if (object != null) {
pool.offer(object);
semaphore.release();
}
}
public int getAvailableCount() {
return semaphore.availablePermits();
}
}
高级用法
1. 多资源类型管理
// 管理不同类型的资源,每种资源有不同的并发限制
public class MultiResourceManager {
private final Map<String, Semaphore> resourceSemaphores;
public MultiResourceManager(Map<String, Integer> resourceLimits) {
this.resourceSemaphores = new ConcurrentHashMap<>();
resourceLimits.forEach((resource, limit) ->
resourceSemaphores.put(resource, new Semaphore(limit))
);
}
public boolean acquireResource(String resourceType) {
Semaphore semaphore = resourceSemaphores.get(resourceType);
if (semaphore != null) {
return semaphore.tryAcquire();
}
return false;
}
public void releaseResource(String resourceType) {
Semaphore semaphore = resourceSemaphores.get(resourceType);
if (semaphore != null) {
semaphore.release();
}
}
}
2. 动态调整许可证数量
// 支持运行时动态调整并发限制
public class DynamicSemaphore {
private Semaphore semaphore;
private int currentPermits;
public DynamicSemaphore(int initialPermits) {
this.currentPermits = initialPermits;
this.semaphore = new Semaphore(initialPermits);
}
public synchronized void setPermits(int newPermits) {
if (newPermits > currentPermits) {
// 增加许可证
int additional = newPermits - currentPermits;
semaphore.release(additional);
} else if (newPermits < currentPermits) {
// 减少许可证 - 这个比较复杂,需要特殊处理
adjustPermitsDown(newPermits);
}
currentPermits = newPermits;
}
private void adjustPermitsDown(int newPermits) {
// 简化的实现:创建新的 Semaphore
Semaphore oldSemaphore = semaphore;
semaphore = new Semaphore(newPermits);
// 在实际应用中,需要更复杂的逻辑来处理正在等待的线程
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
public void release() {
semaphore.release();
}
}
注意事项
-
异常安全:始终在
finally
块中释放许可证,避免许可证泄露 -
公平性选择:根据场景选择公平或非公平模式,非公平模式通常有更好的吞吐量
-
资源清理:确保在不再需要时正确关闭资源
-
死锁预防:避免多个信号量之间的循环等待
-
性能考虑:在高并发场景下,考虑使用
tryAcquire()
而非阻塞的acquire()
Semaphore 是一个强大的并发控制工具,特别适用于资源池管理、流量控制、系统保护等场景。正确使用 Semaphore 可以显著提高系统的稳定性和性能。
5.4 Phaser
5.4.1什么是 Phaser?
Phaser 是 Java 7 引入的一个强大的、可重用的同步屏障,它结合了 CountDownLatch 和 CyclicBarrier 的功能,并提供了更灵活的同步控制。Phaser 支持动态调整参与线程的数量,并且可以分多个阶段(phase)进行同步。
核心机制:Phaser 维护一个相位(phase)计数器,线程可以在每个阶段注册、到达、等待其他线程,并且可以动态地增加或减少参与同步的线程数量。
5.4.2基本用法
import java.util.concurrent.*;
public class PhaserExample {
static class MultiPhaseTask implements Runnable {
private final Phaser phaser;
private final String name;
public MultiPhaseTask(Phaser phaser, String name) {
this.phaser = phaser;
this.name = name;
phaser.register(); // 注册到Phaser
}
@Override
public void run() {
// 阶段1
System.out.println(name + " starting phase 1");
doWork(1000);
System.out.println(name + " completed phase 1");
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段1
// 阶段2
System.out.println(name + " starting phase 2");
doWork(1500);
System.out.println(name + " completed phase 2");
phaser.arriveAndAwaitAdvance(); // 等待其他线程完成阶段2
// 阶段3
System.out.println(name + " starting phase 3");
doWork(500);
System.out.println(name + " completed phase 3");
phaser.arriveAndDeregister(); // 完成并注销
}
private void doWork(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // 主线程注册
// 创建任务
for (int i = 0; i < 3; i++) {
new Thread(new MultiPhaseTask(phaser, "Task-" + i)).start();
}
// 主线程等待所有阶段完成
System.out.println("Main thread waiting for phase completion");
phaser.arriveAndAwaitAdvance(); // 阶段1完成
System.out.println("Phase 1 completed");
phaser.arriveAndAwaitAdvance(); // 阶段2完成
System.out.println("Phase 2 completed");
phaser.arriveAndAwaitAdvance(); // 阶段3完成
System.out.println("Phase 3 completed");
System.out.println("All phases completed");
}
}
5.4.3为什么要使用 Phaser?
1. 动态线程管理
// 场景:任务执行过程中可以动态增减参与线程
public class DynamicPhaserExample {
public static void main(String[] args) {
// 创建 Phaser,初始参与线程数为 1(主线程)
Phaser phaser = new Phaser(1);
// 启动 3 个工作线程
for (int i = 0; i < 3; i++) {
phaser.register(); // 注册新线程
new Thread(new DynamicWorker(phaser, "Worker-" + i)).start();
}
// 主线程也参与同步
System.out.println("主线程开始等待...");
phaser.arriveAndAwaitAdvance();
System.out.println("第一阶段完成,当前相位: " + phaser.getPhase());
// 动态添加新线程
phaser.register();
new Thread(new DynamicWorker(phaser, "New-Worker")).start();
phaser.arriveAndAwaitAdvance();
System.out.println("第二阶段完成,当前相位: " + phaser.getPhase());
// 主线程退出,减少参与线程数
phaser.arriveAndDeregister();
}
}
class DynamicWorker implements Runnable {
private final Phaser phaser;
private final String name;
public DynamicWorker(Phaser phaser, String name) {
this.phaser = phaser;
this.name = name;
}
@Override
public void run() {
System.out.println(name + " 开始工作,注册到 Phaser");
// 参与多个阶段
for (int phase = 0; phase < 3; phase++) {
System.out.println(name + " 执行阶段 " + phase);
doWork(500 + (long) (Math.random() * 1000));
System.out.println(name + " 完成阶段 " + phase + ",等待其他线程");
// 到达屏障并等待其他线程
phaser.arriveAndAwaitAdvance();
// 模拟某些线程提前退出
if (name.equals("Worker-1") && phase == 1) {
System.out.println(name + " 提前退出");
phaser.arriveAndDeregister();
return;
}
}
// 正常完成所有阶段后注销
phaser.arriveAndDeregister();
System.out.println(name + " 完成所有工作并注销");
}
private void doWork(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
2. 复杂多阶段任务协调
// 场景:复杂的多阶段数据处理流程
public class MultiStageProcessing {
public static void main(String[] args) {
int initialParties = 4;
Phaser phaser = new Phaser(initialParties) {
// 重写 onAdvance 方法,在每个阶段完成后执行特定操作
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("=== 阶段 " + phase + " 完成 ===");
System.out.println("注册线程数: " + registeredParties);
// 如果阶段达到3或者没有注册的线程,则终止
return phase >= 2 || registeredParties == 0;
}
};
// 创建处理任务
for (int i = 0; i < initialParties; i++) {
new Thread(new DataProcessor(phaser, i)).start();
}
// 等待所有线程完成
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
}
System.out.println("所有处理完成,Phaser 已终止");
}
}
class DataProcessor implements Runnable {
private final Phaser phaser;
private final int processorId;
public DataProcessor(Phaser phaser, int processorId) {
this.phaser = phaser;
this.processorId = processorId;
}
@Override
public void run() {
try {
// 阶段 0: 数据加载
System.out.println("处理器 " + processorId + " 加载数据");
loadData();
phaser.arriveAndAwaitAdvance();
// 阶段 1: 数据处理
System.out.println("处理器 " + processorId + " 处理数据");
processData();
phaser.arriveAndAwaitAdvance();
// 阶段 2: 结果保存
System.out.println("处理器 " + processorId + " 保存结果");
saveResults();
phaser.arriveAndDeregister(); // 完成所有阶段,注销
} catch (Exception e) {
e.printStackTrace();
phaser.arriveAndDeregister(); // 发生异常时也要注销
}
}
private void loadData() throws InterruptedException {
Thread.sleep(800 + (long) (Math.random() * 700));
}
private void processData() throws InterruptedException {
Thread.sleep(1200 + (long) (Math.random() * 1000));
}
private void saveResults() throws InterruptedException {
Thread.sleep(500 + (long) (Math.random() * 500));
}
}
3. 分层任务执行
// 场景:主任务和子任务的层次化同步
public class HierarchicalPhaserExample {
public static void main(String[] args) {
// 主 Phaser,协调整个任务流程
Phaser mainPhaser = new Phaser(1);
// 执行多个复杂任务,每个任务有自己的子 Phaser
for (int taskId = 0; taskId < 3; taskId++) {
System.out.println("开始执行任务 " + taskId);
executeComplexTask(mainPhaser, taskId);
mainPhaser.arriveAndAwaitAdvance();
System.out.println("任务 " + taskId + " 完成");
}
mainPhaser.arriveAndDeregister();
System.out.println("所有任务执行完成");
}
private static void executeComplexTask(Phaser parentPhaser, int taskId) {
// 为每个任务创建子 Phaser
Phaser taskPhaser = new Phaser(parentPhaser, 0) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("任务 " + taskId + " 阶段 " + phase + " 完成");
return phase >= 2; // 执行3个阶段后终止
}
};
// 启动子任务
int subtaskCount = 2 + taskId; // 不同任务有不同数量的子任务
for (int i = 0; i < subtaskCount; i++) {
taskPhaser.register();
new Thread(new Subtask(taskPhaser, taskId, i)).start();
}
// 等待子任务完成
while (!taskPhaser.isTerminated()) {
taskPhaser.arriveAndAwaitAdvance();
}
}
}
class Subtask implements Runnable {
private final Phaser phaser;
private final int taskId;
private final int subtaskId;
public Subtask(Phaser phaser, int taskId, int subtaskId) {
this.phaser = phaser;
this.taskId = taskId;
this.subtaskId = subtaskId;
}
@Override
public void run() {
try {
System.out.printf("子任务 [任务%d-%d] 开始%n", taskId, subtaskId);
// 阶段 0
phaseWork("准备", 300);
phaser.arriveAndAwaitAdvance();
// 阶段 1
phaseWork("计算", 600);
phaser.arriveAndAwaitAdvance();
// 阶段 2
phaseWork("清理", 200);
phaser.arriveAndDeregister();
System.out.printf("子任务 [任务%d-%d] 完成%n", taskId, subtaskId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
phaser.arriveAndDeregister();
}
}
private void phaseWork(String phaseName, long baseTime) throws InterruptedException {
long workTime = baseTime + (long) (Math.random() * 500);
System.out.printf("子任务 [任务%d-%d] 执行 %s 阶段 (%d ms)%n",
taskId, subtaskId, phaseName, workTime);
Thread.sleep(workTime);
}
}
主要方法
方法 | 描述 |
---|---|
Phaser() |
创建 Phaser,初始参与数为 0 |
Phaser(int parties) |
创建指定初始参与数的 Phaser |
Phaser(Phaser parent) |
创建父 Phaser 的子 Phaser |
Phaser(Phaser parent, int parties) |
创建指定父 Phaser 和初始参与数的 Phaser |
int register() |
注册一个新参与者,返回当前相位 |
int arrive() |
到达屏障,但不等待,返回当前相位 |
int arriveAndAwaitAdvance() |
到达屏障并等待其他参与者 |
int arriveAndDeregister() |
到达屏障并注销自己 |
int awaitAdvance(int phase) |
等待相位前进到指定值 |
int getPhase() |
返回当前相位 |
int getRegisteredParties() |
返回注册的参与者数量 |
boolean isTerminated() |
检查 Phaser 是否已终止 |
高级特性
1. 分层 Phaser
// 创建 Phaser 树,减少竞争
public class PhaserTreeExample {
public static void main(String[] args) {
Phaser rootPhaser = new Phaser(1);
// 创建多个子 Phaser
for (int i = 0; i < 3; i++) {
Phaser childPhaser = new Phaser(rootPhaser, 0);
startChildTasks(childPhaser, i, 2);
}
// 根 Phaser 等待所有子 Phaser 完成
rootPhaser.arriveAndAwaitAdvance();
System.out.println("所有子任务完成");
rootPhaser.arriveAndDeregister();
}
private static void startChildTasks(Phaser phaser, int groupId, int taskCount) {
for (int i = 0; i < taskCount; i++) {
phaser.register();
new Thread(new TreeTask(phaser, groupId, i)).start();
}
}
}
class TreeTask implements Runnable {
private final Phaser phaser;
private final int groupId;
private final int taskId;
public TreeTask(Phaser phaser, int groupId, int taskId) {
this.phaser = phaser;
this.groupId = groupId;
this.taskId = taskId;
}
@Override
public void run() {
System.out.printf("任务 [组%d-%d] 开始%n", groupId, taskId);
for (int phase = 0; phase < 2; phase++) {
doWork(phase);
phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
System.out.printf("任务 [组%d-%d] 完成%n", groupId, taskId);
}
private void doWork(int phase) {
try {
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.printf("任务 [组%d-%d] 完成阶段 %d%n", groupId, taskId, phase);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
2. 自定义终止条件
// 通过重写 onAdvance 方法自定义终止条件
public class CustomTerminationPhaser {
public static void main(String[] args) {
Phaser phaser = new Phaser(3) {
private int successCount = 0;
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("阶段 " + phase + " 完成,成功计数: " + successCount);
// 自定义终止条件:达到3个阶段或者成功次数超过5次
if (phase >= 2 || successCount >= 5) {
System.out.println("Phaser 终止");
return true; // 终止 Phaser
}
return false; // 继续下一个阶段
}
public void incrementSuccess() {
successCount++;
}
};
for (int i = 0; i < 3; i++) {
new Thread(new CustomTask(phaser, i)).start();
}
}
}
class CustomTask implements Runnable {
private final CustomTerminationPhaser phaser;
private final int taskId;
public CustomTask(Phaser phaser, int taskId) {
this.phaser = (CustomTerminationPhaser) phaser;
this.taskId = taskId;
}
@Override
public void run() {
Random random = new Random();
while (!phaser.isTerminated()) {
System.out.println("任务 " + taskId + " 执行工作");
// 模拟工作,可能成功或失败
boolean success = random.nextBoolean();
if (success) {
phaser.incrementSuccess();
System.out.println("任务 " + taskId + " 成功");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
phaser.arriveAndAwaitAdvance();
}
System.out.println("任务 " + taskId + " 退出");
}
}
Phaser vs CountDownLatch vs CyclicBarrier
特性 | Phaser | CountDownLatch | CyclicBarrier |
---|---|---|---|
重用性 | ✅ 可重复使用 | ❌ 一次性使用 | ✅ 可重复使用 |
动态调整 | ✅ 支持动态注册/注销 | ❌ 固定数量 | ❌ 固定数量 |
分层结构 | ✅ 支持父子 Phaser | ❌ 不支持 | ❌ 不支持 |
阶段数量 | 🔄 无限阶段 | ❌ 单阶段 | 🔄 固定阶段循环 |
灵活性 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
使用场景总结
-
动态任务执行:任务执行过程中可以动态增减参与线程
-
复杂工作流:多阶段、多层次的任务协调
-
游戏开发:多玩家游戏的回合制同步
-
科学计算:分阶段并行算法
-
数据处理:复杂的数据处理流水线
注意事项
-
性能考虑:Phaser 比 CountDownLatch 和 CyclicBarrier 更重量级,在简单场景下可能过度复杂
-
异常处理:确保在异常情况下正确调用
arriveAndDeregister()
避免线程泄露 -
终止检测:使用
isTerminated()
检查 Phaser 是否已终止 -
分层优化:对于大量线程,使用分层 Phaser 可以减少竞争
Phaser 提供了最灵活的同步控制,特别适用于复杂的、动态的多阶段同步场景。虽然学习曲线较陡峭,但在合适的场景下能提供强大的同步能力。
6. 线程池
6.1 ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class ThreadPoolExample {
static class ComputationTask implements Callable<Double> {
private final int taskId;
public ComputationTask(int taskId) {
this.taskId = taskId;
}
@Override
public Double call() throws Exception {
System.out.println("Task " + taskId + " started by " + Thread.currentThread().getName());
// 模拟计算密集型任务
double result = 0;
for (int i = 0; i < 1000000; i++) {
result += Math.sin(i) * Math.cos(i);
}
System.out.println("Task " + taskId + " completed");
return result;
}
}
public static void main(String[] args) throws Exception {
// 创建固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 创建任务列表
List<ComputationTask> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new ComputationTask(i));
}
// 提交所有任务并获取Future列表
System.out.println("Submitting tasks...");
List<Future<Double>> futures = fixedPool.invokeAll(tasks);
// 处理结果
System.out.println("Processing results...");
for (int i = 0; i < futures.size(); i++) {
try {
Double result = futures.get(i).get();
System.out.println("Task " + i + " result: " + result);
} catch (ExecutionException e) {
System.out.println("Task " + i + " failed: " + e.getCause());
}
}
// 关闭线程池
fixedPool.shutdown();
// 等待线程池终止
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("Forcing shutdown...");
fixedPool.shutdownNow();
}
System.out.println("All tasks completed");
}
}
6.2 ScheduledExecutorService
import java.util.concurrent.*;
import java.time.*;
public class ScheduledExecutorExample {
static class ScheduledTask implements Runnable {
private final String name;
private int count = 0;
public ScheduledTask(String name) {
this.name = name;
}
@Override
public void run() {
count++;
System.out.println(LocalTime.now() + " - " + name + " executed " + count + " times");
// 模拟任务执行时间
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
System.out.println("Starting scheduled tasks at: " + LocalTime.now());
// 固定延迟执行(每次执行结束后延迟固定时间再执行)
ScheduledTask fixedDelayTask = new ScheduledTask("FixedDelayTask");
scheduler.scheduleWithFixedDelay(fixedDelayTask, 0, 3, TimeUnit.SECONDS);
// 固定频率执行(按照固定频率执行,不考虑任务执行时间)
ScheduledTask fixedRateTask = new ScheduledTask("FixedRateTask");
scheduler.scheduleAtFixedRate(fixedRateTask, 0, 3, TimeUnit.SECONDS);
// 单次延迟执行
scheduler.schedule(() -> {
System.out.println(LocalTime.now() + " - One-time task executed");
}, 10, TimeUnit.SECONDS);
// 运行一段时间后关闭
Thread.sleep(20000);
System.out.println("Shutting down scheduler...");
scheduler.shutdown();
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
}
}
7. 并发集合
7.1 ConcurrentHashMap
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentHashMapExample {
static class CacheManager {
private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, AtomicInteger> accessCount = new ConcurrentHashMap<>();
public void put(String key, String value) {
cache.put(key, value);
accessCount.putIfAbsent(key, new AtomicInteger(0));
}
public String get(String key) {
String value = cache.get(key);
if (value != null) {
accessCount.get(key).incrementAndGet();
}
return value;
}
public void printStatistics() {
System.out.println("Cache Statistics:");
cache.forEach((key, value) -> {
int count = accessCount.get(key).get();
System.out.println("Key: " + key + ", Access Count: " + count);
});
}
}
static class CacheUser implements Runnable {
private final CacheManager cache;
private final String[] keys;
public CacheUser(CacheManager cache, String[] keys) {
this.cache = cache;
this.keys = keys;
}
@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 100; i++) {
String key = keys[random.nextInt(keys.length)];
String value = cache.get(key);
if (value != null) {
// 模拟使用缓存值
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
CacheManager cache = new CacheManager();
// 初始化缓存
String[] keys = {"user:1", "user:2", "user:3", "config:1", "config:2"};
for (String key : keys) {
cache.put(key, "Value for " + key);
}
// 创建多个线程访问缓存
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new CacheUser(cache, keys), "CacheUser-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
// 打印统计信息
cache.printStatistics();
}
}
7.2 CopyOnWriteArrayList
import java.util.concurrent.*;
import java.util.*;
public class CopyOnWriteExample {
static class EventManager {
private final CopyOnWriteArrayList<EventListener> listeners =
new CopyOnWriteArrayList<>();
public void addListener(EventListener listener) {
listeners.add(listener);
System.out.println("Listener added. Total listeners: " + listeners.size());
}
public void removeListener(EventListener listener) {
listeners.remove(listener);
System.out.println("Listener removed. Total listeners: " + listeners.size());
}
public void fireEvent(String event) {
// 遍历时对原列表的修改不会影响当前迭代
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}
interface EventListener {
void onEvent(String event);
}
static class LoggingListener implements EventListener {
private final String name;
public LoggingListener(String name) {
this.name = name;
}
@Override
public void onEvent(String event) {
System.out.println(name + " received: " + event);
}
}
public static void main(String[] args) throws InterruptedException {
EventManager manager = new EventManager();
// 创建监听器
LoggingListener listener1 = new LoggingListener("Listener-1");
LoggingListener listener2 = new LoggingListener("Listener-2");
LoggingListener listener3 = new LoggingListener("Listener-3");
// 在事件触发过程中动态添加/移除监听器
Thread eventThread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
manager.fireEvent("Event " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
Thread modificationThread = new Thread(() -> {
try {
Thread.sleep(800);
manager.addListener(listener2);
Thread.sleep(800);
manager.addListener(listener3);
Thread.sleep(800);
manager.removeListener(listener1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 初始添加一个监听器
manager.addListener(listener1);
eventThread.start();
modificationThread.start();
eventThread.join();
modificationThread.join();
}
}
8. 原子变量
import java.util.concurrent.atomic.*;
import java.util.*;
public class AtomicExample {
static class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicLong total = new AtomicLong(0);
private final AtomicReference<String> status = new AtomicReference<>("STOPPED");
public void increment() {
count.incrementAndGet();
}
public void add(int value) {
total.addAndGet(value);
}
public boolean start() {
return status.compareAndSet("STOPPED", "RUNNING");
}
public boolean stop() {
return status.compareAndSet("RUNNING", "STOPPED");
}
public int getCount() {
return count.get();
}
public long getTotal() {
return total.get();
}
public String getStatus() {
return status.get();
}
}
static class CounterUser implements Runnable {
private final AtomicCounter counter;
private final Random random = new Random();
public CounterUser(AtomicCounter counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
counter.increment();
counter.add(random.nextInt(100));
// 偶尔尝试改变状态
if (i % 200 == 0) {
if (counter.getStatus().equals("STOPPED")) {
counter.start();
} else {
counter.stop();
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
AtomicCounter counter = new AtomicCounter();
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new CounterUser(counter), "Worker-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("Final count: " + counter.getCount());
System.out.println("Final total: " + counter.getTotal());
System.out.println("Final status: " + counter.getStatus());
// 验证原子性
int expectedCount = 10 * 1000; // 10个线程,每个1000次
System.out.println("Count is correct: " + (counter.getCount() == expectedCount));
}
}
9. 最佳实践和注意事项
9.1 避免死锁
public class DeadlockPrevention {
static class Account {
private final String name;
private int balance;
public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}
public void debit(int amount) {
balance -= amount;
}
public void credit(int amount) {
balance += amount;
}
public String getName() {
return name;
}
}
static class AccountService {
// 按固定顺序获取锁来避免死锁
public boolean transfer(Account from, Account to, int amount) {
// 确定锁的顺序
Account firstLock = from.getName().compareTo(to.getName()) < 0 ? from : to;
Account secondLock = from.getName().compareTo(to.getName()) < 0 ? to : from;
synchronized (firstLock) {
synchronized (secondLock) {
if (from.balance >= amount) {
from.debit(amount);
to.credit(amount);
System.out.println("Transferred " + amount + " from " +
from.getName() + " to " + to.getName());
return true;
}
return false;
}
}
}
// 使用tryLock避免死锁
public boolean transferWithTryLock(Account from, Account to, int amount)
throws InterruptedException {
// 尝试在有限时间内获取两个锁
long stopTime = System.nanoTime() + 5000000000L; // 5秒超时
while (System.nanoTime() < stopTime) {
if (Thread.currentThread().getName().equals(from.getName())) {
synchronized (from) {
if (Thread.holdsLock(to)) {
// 执行转账
if (from.balance >= amount) {
from.debit(amount);
to.credit(amount);
System.out.println("Transferred " + amount + " from " +
from.getName() + " to " + to.getName());
return true;
}
return false;
}
}
} else {
synchronized (to) {
if (Thread.holdsLock(from)) {
// 执行转账
if (from.balance >= amount) {
from.debit(amount);
to.credit(amount);
System.out.println("Transferred " + amount + " from " +
from.getName() + " to " + to.getName());
return true;
}
return false;
}
}
}
Thread.sleep(10); // 短暂休眠后重试
}
throw new RuntimeException("Transfer timeout");
}
}
public static void main(String[] args) throws InterruptedException {
Account account1 = new Account("Account-A", 1000);
Account account2 = new Account("Account-B", 1000);
AccountService service = new AccountService();
// 模拟并发转账
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
service.transfer(account1, account2, 10);
}
}, "Account-A");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
service.transfer(account2, account1, 5);
}
}, "Account-B");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final balance - " + account1.getName() + ": " +
account1.balance + ", " + account2.getName() + ": " + account2.balance);
}
}
9.2 性能考虑
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class PerformanceComparison {
private static final int ITERATIONS = 1000000;
private static final int THREAD_COUNT = 4;
static class SynchronizedCounter {
private long count = 0;
public synchronized void increment() {
count++;
}
public synchronized long getCount() {
return count;
}
}
static class LockCounter {
private final ReentrantLock lock = new ReentrantLock();
private long count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public long getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
static class AtomicCounter {
private final AtomicLong count = new AtomicLong(0);
public void increment() {
count.incrementAndGet();
}
public long getCount() {
return count.get();
}
}
static class CounterTask implements Runnable {
private final Runnable incrementAction;
private final int iterations;
public CounterTask(Runnable incrementAction, int iterations) {
this.incrementAction = incrementAction;
this.iterations = iterations;
}
@Override
public void run() {
for (int i = 0; i < iterations; i++) {
incrementAction.run();
}
}
}
public static void main(String[] args) throws InterruptedException {
// 测试synchronized
SynchronizedCounter syncCounter = new SynchronizedCounter();
runBenchmark("Synchronized", () -> syncCounter.increment());
// 测试Lock
LockCounter lockCounter = new LockCounter();
runBenchmark("Lock", () -> lockCounter.increment());
// 测试Atomic
AtomicCounter atomicCounter = new AtomicCounter();
runBenchmark("Atomic", () -> atomicCounter.increment());
}
private static void runBenchmark(String name, Runnable incrementAction)
throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
executor.submit(() -> {
for (int j = 0; j < ITERATIONS / THREAD_COUNT; j++) {
incrementAction.run();
}
latch.countDown();
});
}
latch.await();
long endTime = System.currentTimeMillis();
executor.shutdown();
System.out.println(name + " counter time: " + (endTime - startTime) + "ms");
}
}
总结
Java并发编程涉及多个重要概念和工具:
-
线程基础:Thread、Runnable、Callable
-
同步机制:synchronized、Lock、ReadWriteLock
-
线程通信:wait/notify、BlockingQueue
-
并发工具:CountDownLatch、CyclicBarrier、Semaphore、Phaser
-
线程池:ExecutorService、ScheduledExecutorService
-
并发集合:ConcurrentHashMap、CopyOnWriteArrayList
-
原子变量:AtomicInteger、AtomicLong、AtomicReference
在实际开发中,应根据具体场景选择合适的并发工具,并注意线程安全、性能和死锁预防等问题。
更多推荐
所有评论(0)