一文带你了解Java多线程
Java多线程是Java语言的核心特性,允许程序同时执行多个任务,提高程序执行效率并充分利用多核CPU资源。本文系统介绍了Java多线程的架构、核心概念和实现方式,包括线程创建(继承Thread类、实现Runnable接口、Callable接口)、同步机制(synchronized、ReentrantLock、volatile)和线程通信(wait/notify、CountDownLatch、Cy
·
一文带你了解Java多线程
目录
概述
Java多线程是Java语言的重要特性,允许程序同时执行多个任务。多线程可以提高程序的执行效率,充分利用多核CPU资源,是现代Java应用程序开发的核心技术。
多线程优势
优势 | 说明 |
---|---|
提高性能 | 充分利用多核CPU,提高程序执行效率 |
响应性 | 避免阻塞主线程,提高用户界面响应速度 |
资源共享 | 多个线程共享进程资源,减少内存开销 |
经济性 | 比多进程更轻量级,创建和切换成本更低 |
多线程挑战
挑战 | 说明 |
---|---|
线程安全 | 共享数据访问的同步问题,需要额外的同步机制 |
死锁 | 多个线程相互等待对方释放资源,导致程序无法继续执行 |
性能开销 | 线程创建、切换、销毁的开销,过度使用可能适得其反 |
复杂性 | 并发编程的复杂性增加,调试和排错更加困难 |
多线程架构
JVM线程模型
线程生命周期
线程池架构
核心概念
1. 线程创建方式
// 方式1: 继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread running: " + Thread.currentThread().getName());
}
}
// 方式2: 实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("MyRunnable running: " + Thread.currentThread().getName());
}
}
// 方式3: 使用Lambda表达式
Runnable lambdaRunnable = () -> {
System.out.println("Lambda running: " + Thread.currentThread().getName());
};
// 方式4: 使用Callable接口
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "Callable result: " + Thread.currentThread().getName();
}
}
2. 线程同步机制
// synchronized关键字
public class SynchronizedExample {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
// 同步代码块
public void incrementBlock() {
synchronized (this) {
count++;
}
}
}
// ReentrantLock
public class LockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
// volatile关键字
public class VolatileExample {
private volatile boolean flag = false;
public void setFlag() {
flag = true;
}
public boolean getFlag() {
return flag;
}
}
3. 线程通信
// wait/notify机制
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
synchronized (lock) {
while (!ready) {
lock.wait();
}
}
}
public void setReady() {
synchronized (lock) {
ready = true;
lock.notifyAll();
}
}
}
// CountDownLatch
public class CountDownLatchExample {
private final CountDownLatch latch = new CountDownLatch(3);
public void await() throws InterruptedException {
latch.await();
}
public void countDown() {
latch.countDown();
}
}
// CyclicBarrier
public class CyclicBarrierExample {
private final CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障点");
});
public void await() throws InterruptedException, BrokenBarrierException {
barrier.await();
}
}
JDK源码实例
1. Thread类核心源码分析
// Thread类的核心字段
public class Thread implements Runnable {
// 线程名称
private volatile String name;
// 线程优先级
private int priority;
// 是否为守护线程
private boolean daemon = false;
// 线程要执行的任务
private Runnable target;
// 线程组
private ThreadGroup group;
// 线程ID
private long tid;
// 线程状态
private volatile int threadStatus;
// 线程本地变量
ThreadLocal.ThreadLocalMap threadLocals = null;
// 继承的线程本地变量
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
// start()方法实现
public synchronized void start() {
// 检查线程状态
if (threadStatus != 0)
throw new IllegalThreadStateException();
// 添加到线程组
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
// 忽略异常
}
}
}
// 本地方法start0()
private native void start0();
2. ThreadPoolExecutor源码分析
// 核心字段
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制线程池状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 工作队列
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 线程空闲时间
private volatile long keepAliveTime;
}
// execute方法实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果运行的线程少于corePoolSize,尝试添加核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池正在运行,尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果无法加入队列,尝试添加非核心线程
else if (!addWorker(command, false))
reject(command);
}
// addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加工作线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 创建Worker对象
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3. ConcurrentHashMap源码分析
// 核心字段
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// 节点数组
transient volatile Node<K,V>[] table;
// 下一个要使用的表
private transient volatile Node<K,V>[] nextTable;
// 基础计数器
private transient volatile long baseCount;
// 表初始化和调整控制
private transient volatile int sizeCtl;
// 调整大小时的线程数
private transient volatile int transferIndex;
// 自旋锁
private transient volatile int cellsBusy;
}
// put方法实现
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
面试高频点
1. 线程基础
Q1: 线程和进程的区别?
A:
- 进程: 操作系统分配资源的基本单位,有独立的内存空间
- 线程: CPU调度的基本单位,共享进程的内存空间
- 关系: 一个进程可以包含多个线程,线程是进程的执行单元
Q2: 线程的生命周期?
A:
- New: 新建状态,线程被创建但未启动
- Runnable: 可运行状态,线程可能正在运行或等待CPU时间片
- Running: 运行状态,线程正在执行
- Blocked: 阻塞状态,线程等待获取锁
- Waiting: 等待状态,线程等待其他线程通知
- Timed_Waiting: 计时等待状态,线程等待指定时间
- Terminated: 终止状态,线程执行完成或异常退出
Q3: 如何创建线程?
A:
// 方式1: 继承Thread类
class MyThread extends Thread {
public void run() { /* 线程逻辑 */ }
}
MyThread t1 = new MyThread();
t1.start();
// 方式2: 实现Runnable接口
class MyRunnable implements Runnable {
public void run() { /* 线程逻辑 */ }
}
Thread t2 = new Thread(new MyRunnable());
t2.start();
// 方式3: 使用Lambda表达式
Thread t3 = new Thread(() -> { /* 线程逻辑 */ });
t3.start();
// 方式4: 使用线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> { /* 线程逻辑 */ });
2. 线程同步
Q4: synchronized和volatile的区别?
A:
-
synchronized:
- 保证原子性、可见性、有序性
- 可以修饰方法和代码块
- 重量级锁,性能开销大
-
volatile:
- 只保证可见性和有序性,不保证原子性
- 只能修饰变量
- 轻量级,性能开销小
Q5: 什么是线程安全?如何保证?
A:
- 线程安全: 多线程环境下,程序能够正确执行,数据一致
- 保证方式:
- 使用synchronized关键字
- 使用Lock接口实现类
- 使用原子类(AtomicInteger等)
- 使用线程安全的集合类
- 使用ThreadLocal
Q6: 什么是死锁?如何避免?
A:
- 死锁: 多个线程相互等待对方释放资源,导致程序无法继续执行
- 避免方法:
- 避免嵌套锁
- 使用锁的顺序性
- 使用tryLock()方法
- 设置锁超时时间
3. 线程池
Q7: 线程池的核心参数有哪些?
A:
- corePoolSize: 核心线程数
- maximumPoolSize: 最大线程数
- keepAliveTime: 线程空闲时间
- workQueue: 工作队列
- threadFactory: 线程工厂
- handler: 拒绝策略
Q8: 线程池的拒绝策略有哪些?
A:
- AbortPolicy: 直接抛出异常(默认)
- CallerRunsPolicy: 由调用线程执行任务
- DiscardPolicy: 丢弃任务,不抛出异常
- DiscardOldestPolicy: 丢弃队列中最老的任务
Q9: 如何选择合适的线程池?
A:
- CPU密集型: 线程数 = CPU核心数 + 1
- IO密集型: 线程数 = CPU核心数 * 2
- 混合型: 根据实际情况调整
4. 并发工具类
Q10: CountDownLatch和CyclicBarrier的区别?
A:
-
CountDownLatch:
- 一次性使用,计数到0后不能重置
- 等待其他线程完成
-
CyclicBarrier:
- 可重复使用,计数到0后自动重置
- 等待所有线程到达同步点
Q11: ThreadLocal的原理和内存泄漏?
A:
- 原理: 每个线程维护一个ThreadLocalMap,存储线程本地变量
- 内存泄漏: ThreadLocalMap的key是弱引用,value是强引用,可能导致内存泄漏
- 解决: 使用完ThreadLocal后调用remove()方法
重难点分析
1. 内存模型和可见性
问题描述
Java内存模型(JMM)定义了线程如何与内存交互,多线程环境下的可见性问题是最常见的并发问题。
核心概念
// 内存可见性问题示例
public class VisibilityProblem {
private boolean flag = false; // 没有volatile修饰
public void setFlag() {
flag = true; // 线程A修改
}
public boolean getFlag() {
return flag; // 线程B读取,可能看不到线程A的修改
}
}
// 解决方案1: 使用volatile
public class VisibilitySolution1 {
private volatile boolean flag = false;
public void setFlag() {
flag = true;
}
public boolean getFlag() {
return flag;
}
}
// 解决方案2: 使用synchronized
public class VisibilitySolution2 {
private boolean flag = false;
public synchronized void setFlag() {
flag = true;
}
public synchronized boolean getFlag() {
return flag;
}
}
重难点分析
- happens-before原则: 理解Java内存模型的核心原则
- 内存屏障: 理解volatile和synchronized如何插入内存屏障
- 指令重排序: 理解编译器和处理器的指令重排序优化
2. 锁的优化和升级
问题描述
Java中的锁有多种状态,从轻量级到重量级,理解锁的升级过程对于性能优化很重要。
锁的状态
// 偏向锁、轻量级锁、重量级锁的示例
public class LockUpgradeExample {
private Object lock = new Object();
public void method1() {
synchronized (lock) {
// 第一次获取锁,偏向锁
System.out.println("偏向锁");
}
}
public void method2() {
synchronized (lock) {
// 有竞争,升级为轻量级锁
System.out.println("轻量级锁");
}
}
public void method3() {
synchronized (lock) {
// 竞争激烈,升级为重量级锁
System.out.println("重量级锁");
}
}
}
重难点分析
- 偏向锁: 减少无竞争情况下的锁开销
- 轻量级锁: 使用CAS操作避免重量级锁
- 重量级锁: 使用操作系统互斥量
- 锁升级过程: 理解锁从偏向锁到重量级锁的升级过程
3. 线程池的调优
问题描述
线程池参数配置不当可能导致性能问题,如线程饥饿、内存溢出等。
调优策略
// 线程池调优示例
public class ThreadPoolTuningExample {
// CPU密集型任务
public ExecutorService createCpuIntensivePool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cpuCores + 1, // 核心线程数
cpuCores + 1, // 最大线程数
0L, // 空闲时间
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), // 无界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
// IO密集型任务
public ExecutorService createIoIntensivePool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cpuCores * 2, // 核心线程数
cpuCores * 4, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
}
// 混合型任务
public ExecutorService createMixedPool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cpuCores, // 核心线程数
cpuCores * 2, // 最大线程数
30L, // 空闲时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
}
重难点分析
- 队列选择: 无界队列vs有界队列
- 线程数计算: 根据任务类型计算合适的线程数
- 拒绝策略: 选择合适的拒绝策略
- 监控和调优: 实时监控线程池状态,动态调整参数
4. 并发集合的实现原理
问题描述
理解并发集合的实现原理有助于正确使用和性能优化。
核心原理
// ConcurrentHashMap分段锁原理
public class SegmentLockExample {
// 模拟分段锁
private final Object[] segments = new Object[16];
public SegmentLockExample() {
for (int i = 0; i < segments.length; i++) {
segments[i] = new Object();
}
}
public void put(String key, String value) {
int hash = key.hashCode();
int segmentIndex = hash % segments.length;
synchronized (segments[segmentIndex]) {
// 只锁定对应的段,提高并发性
System.out.println("Put: " + key + " -> " + value + " in segment " + segmentIndex);
}
}
public String get(String key) {
int hash = key.hashCode();
int segmentIndex = hash % segments.length;
synchronized (segments[segmentIndex]) {
// 只锁定对应的段
System.out.println("Get: " + key + " from segment " + segmentIndex);
return "value"; // 模拟返回值
}
}
}
重难点分析
- 分段锁: 理解ConcurrentHashMap的分段锁机制
- CAS操作: 理解无锁算法的实现
- 扩容机制: 理解并发环境下的扩容过程
- 性能对比: 理解不同并发集合的性能特点
应用场景
1. Web服务器
场景描述
Web服务器需要同时处理多个客户端请求,多线程可以提高并发处理能力。
实现示例
// 简单的多线程Web服务器
public class MultiThreadedWebServer {
private final ServerSocket serverSocket;
private final ExecutorService threadPool;
public MultiThreadedWebServer(int port, int threadPoolSize) throws IOException {
this.serverSocket = new ServerSocket(port);
this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
}
public void start() {
System.out.println("Web服务器启动,监听端口: " + serverSocket.getLocalPort());
while (true) {
try {
Socket clientSocket = serverSocket.accept();
threadPool.submit(new ClientHandler(clientSocket));
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static class ClientHandler implements Runnable {
private final Socket clientSocket;
public ClientHandler(Socket socket) {
this.clientSocket = socket;
}
@Override
public void run() {
try {
// 处理客户端请求
handleRequest(clientSocket);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleRequest(Socket socket) throws IOException {
// 模拟处理HTTP请求
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String request = in.readLine();
System.out.println("处理请求: " + request + " 线程: " + Thread.currentThread().getName());
// 模拟处理时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 返回响应
out.println("HTTP/1.1 200 OK");
out.println("Content-Type: text/html");
out.println();
out.println("<html><body><h1>Hello from " + Thread.currentThread().getName() + "</h1></body></html>");
}
}
public static void main(String[] args) throws IOException {
MultiThreadedWebServer server = new MultiThreadedWebServer(8080, 10);
server.start();
}
}
2. 数据处理
场景描述
大数据处理场景下,多线程可以并行处理数据,提高处理效率。
实现示例
// 多线程数据处理示例
public class MultiThreadedDataProcessor {
public static void main(String[] args) throws InterruptedException {
// 模拟大量数据
List<String> data = generateData(1000000);
// 单线程处理
long startTime = System.currentTimeMillis();
processDataSingleThread(data);
long singleThreadTime = System.currentTimeMillis() - startTime;
// 多线程处理
startTime = System.currentTimeMillis();
processDataMultiThread(data, 8);
long multiThreadTime = System.currentTimeMillis() - startTime;
System.out.println("单线程处理时间: " + singleThreadTime + "ms");
System.out.println("多线程处理时间: " + multiThreadTime + "ms");
System.out.println("性能提升: " + (double)singleThreadTime / multiThreadTime + "倍");
}
private static List<String> generateData(int size) {
List<String> data = new ArrayList<>();
for (int i = 0; i < size; i++) {
data.add("Data-" + i);
}
return data;
}
private static void processDataSingleThread(List<String> data) {
for (String item : data) {
processItem(item);
}
}
private static void processDataMultiThread(List<String> data, int threadCount) throws InterruptedException {
int batchSize = data.size() / threadCount;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int startIndex = i * batchSize;
final int endIndex = (i == threadCount - 1) ? data.size() : (i + 1) * batchSize;
new Thread(() -> {
try {
for (int j = startIndex; j < endIndex; j++) {
processItem(data.get(j));
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
}
private static void processItem(String item) {
// 模拟数据处理
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
3. 异步任务处理
场景描述
异步任务处理可以提高系统响应性,避免阻塞主线程。
实现示例
// 异步任务处理示例
public class AsyncTaskProcessor {
private final ExecutorService executorService;
private final BlockingQueue<Future<?>> taskQueue;
public AsyncTaskProcessor(int threadPoolSize) {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.taskQueue = new LinkedBlockingQueue<>();
}
public Future<String> submitTask(Callable<String> task) {
Future<String> future = executorService.submit(task);
taskQueue.offer(future);
return future;
}
public void processCompletedTasks() {
List<Future<?>> completedTasks = new ArrayList<>();
// 收集已完成的任务
taskQueue.drainTo(completedTasks);
for (Future<?> future : completedTasks) {
if (future.isDone()) {
try {
String result = (String) future.get();
System.out.println("任务完成,结果: " + result);
} catch (Exception e) {
System.err.println("任务执行异常: " + e.getMessage());
}
} else {
// 任务未完成,重新放回队列
taskQueue.offer(future);
}
}
}
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
AsyncTaskProcessor processor = new AsyncTaskProcessor(4);
// 提交多个异步任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
processor.submitTask(() -> {
// 模拟任务执行
Thread.sleep(1000 + new Random().nextInt(2000));
return "Task-" + taskId + " completed";
});
}
// 定期检查完成的任务
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
processor.processCompletedTasks();
}, 0, 500, TimeUnit.MILLISECONDS);
// 运行一段时间后关闭
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
processor.shutdown();
}
}
4. 生产者-消费者模式
场景描述
生产者-消费者模式是多线程编程的经典模式,适用于任务队列、消息队列等场景。
实现示例
// 生产者-消费者模式示例
public class ProducerConsumerExample {
private final BlockingQueue<Integer> queue;
private final int maxSize;
private volatile boolean running = true;
public ProducerConsumerExample(int maxSize) {
this.queue = new ArrayBlockingQueue<>(maxSize);
this.maxSize = maxSize;
}
// 生产者
public class Producer implements Runnable {
private final String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
try {
int item = 0;
while (running) {
queue.put(item);
System.out.println(name + " 生产: " + item);
item++;
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
public class Consumer implements Runnable {
private final String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
try {
while (running) {
Integer item = queue.take();
System.out.println(name + " 消费: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void start() {
// 启动生产者
Thread producer1 = new Thread(new Producer("Producer-1"));
Thread producer2 = new Thread(new Producer("Producer-2"));
// 启动消费者
Thread consumer1 = new Thread(new Consumer("Consumer-1"));
Thread consumer2 = new Thread(new Consumer("Consumer-2"));
Thread consumer3 = new Thread(new Consumer("Consumer-3"));
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
// 运行一段时间后停止
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
running = false;
// 等待线程结束
try {
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample(10);
example.start();
}
}
最佳实践
1. 线程安全编程
// 线程安全的最佳实践
public class ThreadSafetyBestPractices {
// 1. 使用不可变对象
public static final class ImmutablePoint {
private final int x;
private final int y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() { return x; }
public int getY() { return y; }
public ImmutablePoint move(int dx, int dy) {
return new ImmutablePoint(x + dx, y + dy);
}
}
// 2. 使用线程安全的集合
private final Map<String, String> threadSafeMap = new ConcurrentHashMap<>();
private final List<String> threadSafeList = new CopyOnWriteArrayList<>();
// 3. 使用原子类
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicReference<String> reference = new AtomicReference<>();
// 4. 使用ThreadLocal
private static final ThreadLocal<SimpleDateFormat> dateFormatHolder =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
// 5. 正确的同步方式
private final Object lock = new Object();
private int sharedCounter = 0;
public void incrementCounter() {
synchronized (lock) {
sharedCounter++;
}
}
public int getCounter() {
synchronized (lock) {
return sharedCounter;
}
}
}
2. 性能优化
// 多线程性能优化示例
public class PerformanceOptimizationExample {
// 1. 使用合适的线程池大小
public ExecutorService createOptimizedThreadPool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cpuCores, // 核心线程数
cpuCores * 2, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
// 2. 使用Fork/Join框架处理分治任务
public class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 10000;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
int mid = (start + end) >>> 1;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join();
}
}
// 3. 使用CompletableFuture进行异步编程
public CompletableFuture<String> processAsync(String input) {
return CompletableFuture
.supplyAsync(() -> processInput(input))
.thenApply(this::transformResult)
.thenApply(this::formatOutput);
}
private String processInput(String input) {
// 模拟处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + input;
}
private String transformResult(String result) {
return result.toUpperCase();
}
private String formatOutput(String output) {
return "[" + output + "]";
}
}
3. 错误处理
// 多线程错误处理最佳实践
public class ErrorHandlingBestPractices {
// 1. 正确处理InterruptedException
public void handleInterruptedException() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
// 清理资源
cleanup();
// 退出方法
return;
}
}
// 2. 使用UncaughtExceptionHandler
public void setUncaughtExceptionHandler() {
Thread thread = new Thread(() -> {
throw new RuntimeException("测试异常");
});
thread.setUncaughtExceptionHandler((t, e) -> {
System.err.println("线程 " + t.getName() + " 发生异常: " + e.getMessage());
// 记录日志
logError(t, e);
// 发送告警
sendAlert(t, e);
});
thread.start();
}
// 3. 线程池异常处理
public ExecutorService createExceptionHandlingThreadPool() {
ThreadFactory threadFactory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("线程池线程异常: " + throwable.getMessage());
logError(thread, throwable);
});
return t;
};
return Executors.newFixedThreadPool(10, threadFactory);
}
// 4. 使用Future处理异常
public void handleFutureException() {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Future<String> future = executor.submit(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
});
try {
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("结果: " + result);
} catch (TimeoutException e) {
System.err.println("任务超时");
future.cancel(true);
} catch (ExecutionException e) {
System.err.println("任务执行异常: " + e.getCause().getMessage());
}
} finally {
executor.shutdown();
}
}
private void cleanup() {
// 清理资源
}
private void logError(Thread thread, Throwable error) {
// 记录错误日志
}
private void sendAlert(Thread thread, Throwable error) {
// 发送告警
}
}
总结
1. 学习路径图
2. 技能掌握层次图
3. 学习重点分布图
4. 技能树结构图
5. 学习收益关系图
6. 实践建议流程图
7. 关键要点总结
要点 | 说明 | 重要程度 |
---|---|---|
多线程基础 | 理解线程创建、生命周期、同步机制 | ⭐⭐⭐⭐⭐ |
线程安全 | 掌握synchronized、volatile、Lock等同步工具 | ⭐⭐⭐⭐⭐ |
线程池 | 理解线程池原理、参数配置、性能调优 | ⭐⭐⭐⭐ |
并发工具 | 熟练使用CountDownLatch、CyclicBarrier、Semaphore等 | ⭐⭐⭐⭐ |
性能优化 | 掌握锁优化、线程池调优、异步编程等技巧 | ⭐⭐⭐ |
8. 学习建议
建议 | 说明 | 执行难度 |
---|---|---|
理论结合实践 | 理解概念后多写代码验证 | 中等 |
阅读源码 | 深入理解JDK并发类的实现原理 | 困难 |
性能测试 | 通过基准测试验证性能优化效果 | 中等 |
问题排查 | 学会使用工具诊断多线程问题 | 困难 |
9. 发展方向
方向 | 说明 | 发展前景 |
---|---|---|
响应式编程 | 学习Reactor、RxJava等响应式框架 | ⭐⭐⭐⭐⭐ |
异步编程 | 掌握CompletableFuture、异步IO等 | ⭐⭐⭐⭐ |
分布式并发 | 学习分布式锁、分布式事务等 | ⭐⭐⭐⭐⭐ |
性能调优 | 深入JVM调优、系统性能优化 | ⭐⭐⭐⭐ |
10. 最终建议
通过系统学习Java多线程,可以:
- 提高程序性能和响应性 - 充分利用多核CPU资源
- 解决复杂的并发编程问题 - 为高并发系统开发打下基础
- 提升技术深度和广度 - 在职业发展中获得竞争优势
实践建议: 在实际项目中多应用多线程技术,通过实践加深理解,逐步掌握并发编程的精髓。记住:理论指导实践,实践验证理论。
更多推荐
所有评论(0)