一文带你了解Java多线程

目录

  1. 概述
  2. 多线程架构
  3. 核心概念
  4. JDK源码实例
  5. 面试高频点
  6. 重难点分析
  7. 应用场景
  8. 最佳实践
  9. 总结

概述

Java多线程是Java语言的重要特性,允许程序同时执行多个任务。多线程可以提高程序的执行效率,充分利用多核CPU资源,是现代Java应用程序开发的核心技术。

多线程优势

优势 说明
提高性能 充分利用多核CPU,提高程序执行效率
响应性 避免阻塞主线程,提高用户界面响应速度
资源共享 多个线程共享进程资源,减少内存开销
经济性 比多进程更轻量级,创建和切换成本更低

多线程挑战

挑战 说明
线程安全 共享数据访问的同步问题,需要额外的同步机制
死锁 多个线程相互等待对方释放资源,导致程序无法继续执行
性能开销 线程创建、切换、销毁的开销,过度使用可能适得其反
复杂性 并发编程的复杂性增加,调试和排错更加困难

多线程架构

JVM线程模型

Java应用程序
JVM
用户线程
系统线程
主线程
工作线程1
工作线程2
工作线程N
GC线程
编译线程
信号处理线程
操作系统线程
CPU核心1
CPU核心2
CPU核心N

线程生命周期

start()
获得CPU时间片
时间片用完
wait(), sleep(), join()
被唤醒
执行完成/异常
异常
New
Runnable
Running
Blocked
Terminated

线程池架构

任务提交
线程池
核心线程池
任务队列
非核心线程池
线程1
线程2
线程N
临时线程1
临时线程2
LinkedBlockingQueue
ArrayBlockingQueue
SynchronousQueue

核心概念

1. 线程创建方式

// 方式1: 继承Thread类
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread running: " + Thread.currentThread().getName());
    }
}

// 方式2: 实现Runnable接口
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("MyRunnable running: " + Thread.currentThread().getName());
    }
}

// 方式3: 使用Lambda表达式
Runnable lambdaRunnable = () -> {
    System.out.println("Lambda running: " + Thread.currentThread().getName());
};

// 方式4: 使用Callable接口
public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Callable result: " + Thread.currentThread().getName();
    }
}

2. 线程同步机制

// synchronized关键字
public class SynchronizedExample {
    private int count = 0;
  
    // 同步方法
    public synchronized void increment() {
        count++;
    }
  
    // 同步代码块
    public void incrementBlock() {
        synchronized (this) {
            count++;
        }
    }
}

// ReentrantLock
public class LockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;
  
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

// volatile关键字
public class VolatileExample {
    private volatile boolean flag = false;
  
    public void setFlag() {
        flag = true;
    }
  
    public boolean getFlag() {
        return flag;
    }
}

3. 线程通信

// wait/notify机制
public class WaitNotifyExample {
    private final Object lock = new Object();
    private boolean ready = false;
  
    public void waitForReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {
                lock.wait();
            }
        }
    }
  
    public void setReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }
}

// CountDownLatch
public class CountDownLatchExample {
    private final CountDownLatch latch = new CountDownLatch(3);
  
    public void await() throws InterruptedException {
        latch.await();
    }
  
    public void countDown() {
        latch.countDown();
    }
}

// CyclicBarrier
public class CyclicBarrierExample {
    private final CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println("所有线程到达屏障点");
    });
  
    public void await() throws InterruptedException, BrokenBarrierException {
        barrier.await();
    }
}

JDK源码实例

1. Thread类核心源码分析

// Thread类的核心字段
public class Thread implements Runnable {
    // 线程名称
    private volatile String name;
  
    // 线程优先级
    private int priority;
  
    // 是否为守护线程
    private boolean daemon = false;
  
    // 线程要执行的任务
    private Runnable target;
  
    // 线程组
    private ThreadGroup group;
  
    // 线程ID
    private long tid;
  
    // 线程状态
    private volatile int threadStatus;
  
    // 线程本地变量
    ThreadLocal.ThreadLocalMap threadLocals = null;
  
    // 继承的线程本地变量
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

// start()方法实现
public synchronized void start() {
    // 检查线程状态
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
  
    // 添加到线程组
    group.add(this);
  
    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            // 忽略异常
        }
    }
}

// 本地方法start0()
private native void start0();

2. ThreadPoolExecutor源码分析

// 核心字段
public class ThreadPoolExecutor extends AbstractExecutorService {
    // 控制线程池状态和线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  
    // 工作队列
    private final BlockingQueue<Runnable> workQueue;
  
    // 线程工厂
    private volatile ThreadFactory threadFactory;
  
    // 拒绝策略
    private volatile RejectedExecutionHandler handler;
  
    // 核心线程数
    private volatile int corePoolSize;
  
    // 最大线程数
    private volatile int maximumPoolSize;
  
    // 线程空闲时间
    private volatile long keepAliveTime;
}

// execute方法实现
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  
    int c = ctl.get();
  
    // 如果运行的线程少于corePoolSize,尝试添加核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
  
    // 如果线程池正在运行,尝试将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  
    // 如果无法加入队列,尝试添加非核心线程
    else if (!addWorker(command, false))
        reject(command);
}

// addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
      
        // 检查线程池状态
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
      
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
          
            // 尝试增加工作线程数量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
  
    // 创建Worker对象
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3. ConcurrentHashMap源码分析

// 核心字段
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
  
    // 节点数组
    transient volatile Node<K,V>[] table;
  
    // 下一个要使用的表
    private transient volatile Node<K,V>[] nextTable;
  
    // 基础计数器
    private transient volatile long baseCount;
  
    // 表初始化和调整控制
    private transient volatile int sizeCtl;
  
    // 调整大小时的线程数
    private transient volatile int transferIndex;
  
    // 自旋锁
    private transient volatile int cellsBusy;
}

// put方法实现
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
  
    int hash = spread(key.hashCode());
    int binCount = 0;
  
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
      
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

面试高频点

1. 线程基础

Q1: 线程和进程的区别?

A:

  • 进程: 操作系统分配资源的基本单位,有独立的内存空间
  • 线程: CPU调度的基本单位,共享进程的内存空间
  • 关系: 一个进程可以包含多个线程,线程是进程的执行单元
Q2: 线程的生命周期?

A:

  • New: 新建状态,线程被创建但未启动
  • Runnable: 可运行状态,线程可能正在运行或等待CPU时间片
  • Running: 运行状态,线程正在执行
  • Blocked: 阻塞状态,线程等待获取锁
  • Waiting: 等待状态,线程等待其他线程通知
  • Timed_Waiting: 计时等待状态,线程等待指定时间
  • Terminated: 终止状态,线程执行完成或异常退出
Q3: 如何创建线程?

A:

// 方式1: 继承Thread类
class MyThread extends Thread {
    public void run() { /* 线程逻辑 */ }
}
MyThread t1 = new MyThread();
t1.start();

// 方式2: 实现Runnable接口
class MyRunnable implements Runnable {
    public void run() { /* 线程逻辑 */ }
}
Thread t2 = new Thread(new MyRunnable());
t2.start();

// 方式3: 使用Lambda表达式
Thread t3 = new Thread(() -> { /* 线程逻辑 */ });
t3.start();

// 方式4: 使用线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> { /* 线程逻辑 */ });

2. 线程同步

Q4: synchronized和volatile的区别?

A:

  • synchronized:

    • 保证原子性、可见性、有序性
    • 可以修饰方法和代码块
    • 重量级锁,性能开销大
  • volatile:

    • 只保证可见性和有序性,不保证原子性
    • 只能修饰变量
    • 轻量级,性能开销小
Q5: 什么是线程安全?如何保证?

A:

  • 线程安全: 多线程环境下,程序能够正确执行,数据一致
  • 保证方式:
    • 使用synchronized关键字
    • 使用Lock接口实现类
    • 使用原子类(AtomicInteger等)
    • 使用线程安全的集合类
    • 使用ThreadLocal
Q6: 什么是死锁?如何避免?

A:

  • 死锁: 多个线程相互等待对方释放资源,导致程序无法继续执行
  • 避免方法:
    • 避免嵌套锁
    • 使用锁的顺序性
    • 使用tryLock()方法
    • 设置锁超时时间

3. 线程池

Q7: 线程池的核心参数有哪些?

A:

  • corePoolSize: 核心线程数
  • maximumPoolSize: 最大线程数
  • keepAliveTime: 线程空闲时间
  • workQueue: 工作队列
  • threadFactory: 线程工厂
  • handler: 拒绝策略
Q8: 线程池的拒绝策略有哪些?

A:

  • AbortPolicy: 直接抛出异常(默认)
  • CallerRunsPolicy: 由调用线程执行任务
  • DiscardPolicy: 丢弃任务,不抛出异常
  • DiscardOldestPolicy: 丢弃队列中最老的任务
Q9: 如何选择合适的线程池?

A:

  • CPU密集型: 线程数 = CPU核心数 + 1
  • IO密集型: 线程数 = CPU核心数 * 2
  • 混合型: 根据实际情况调整

4. 并发工具类

Q10: CountDownLatch和CyclicBarrier的区别?

A:

  • CountDownLatch:

    • 一次性使用,计数到0后不能重置
    • 等待其他线程完成
  • CyclicBarrier:

    • 可重复使用,计数到0后自动重置
    • 等待所有线程到达同步点
Q11: ThreadLocal的原理和内存泄漏?

A:

  • 原理: 每个线程维护一个ThreadLocalMap,存储线程本地变量
  • 内存泄漏: ThreadLocalMap的key是弱引用,value是强引用,可能导致内存泄漏
  • 解决: 使用完ThreadLocal后调用remove()方法

重难点分析

1. 内存模型和可见性

问题描述

Java内存模型(JMM)定义了线程如何与内存交互,多线程环境下的可见性问题是最常见的并发问题。

核心概念
// 内存可见性问题示例
public class VisibilityProblem {
    private boolean flag = false; // 没有volatile修饰
  
    public void setFlag() {
        flag = true; // 线程A修改
    }
  
    public boolean getFlag() {
        return flag; // 线程B读取,可能看不到线程A的修改
    }
}

// 解决方案1: 使用volatile
public class VisibilitySolution1 {
    private volatile boolean flag = false;
  
    public void setFlag() {
        flag = true;
    }
  
    public boolean getFlag() {
        return flag;
    }
}

// 解决方案2: 使用synchronized
public class VisibilitySolution2 {
    private boolean flag = false;
  
    public synchronized void setFlag() {
        flag = true;
    }
  
    public synchronized boolean getFlag() {
        return flag;
    }
}
重难点分析
  • happens-before原则: 理解Java内存模型的核心原则
  • 内存屏障: 理解volatile和synchronized如何插入内存屏障
  • 指令重排序: 理解编译器和处理器的指令重排序优化

2. 锁的优化和升级

问题描述

Java中的锁有多种状态,从轻量级到重量级,理解锁的升级过程对于性能优化很重要。

锁的状态
// 偏向锁、轻量级锁、重量级锁的示例
public class LockUpgradeExample {
    private Object lock = new Object();
  
    public void method1() {
        synchronized (lock) {
            // 第一次获取锁,偏向锁
            System.out.println("偏向锁");
        }
    }
  
    public void method2() {
        synchronized (lock) {
            // 有竞争,升级为轻量级锁
            System.out.println("轻量级锁");
        }
    }
  
    public void method3() {
        synchronized (lock) {
            // 竞争激烈,升级为重量级锁
            System.out.println("重量级锁");
        }
    }
}
重难点分析
  • 偏向锁: 减少无竞争情况下的锁开销
  • 轻量级锁: 使用CAS操作避免重量级锁
  • 重量级锁: 使用操作系统互斥量
  • 锁升级过程: 理解锁从偏向锁到重量级锁的升级过程

3. 线程池的调优

问题描述

线程池参数配置不当可能导致性能问题,如线程饥饿、内存溢出等。

调优策略
// 线程池调优示例
public class ThreadPoolTuningExample {
  
    // CPU密集型任务
    public ExecutorService createCpuIntensivePool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCores + 1,           // 核心线程数
            cpuCores + 1,           // 最大线程数
            0L,                     // 空闲时间
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), // 无界队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
    }
  
    // IO密集型任务
    public ExecutorService createIoIntensivePool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCores * 2,           // 核心线程数
            cpuCores * 4,           // 最大线程数
            60L,                    // 空闲时间
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000), // 有界队列
            new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );
    }
  
    // 混合型任务
    public ExecutorService createMixedPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCores,               // 核心线程数
            cpuCores * 2,           // 最大线程数
            30L,                    // 空闲时间
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500), // 有界队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
    }
}
重难点分析
  • 队列选择: 无界队列vs有界队列
  • 线程数计算: 根据任务类型计算合适的线程数
  • 拒绝策略: 选择合适的拒绝策略
  • 监控和调优: 实时监控线程池状态,动态调整参数

4. 并发集合的实现原理

问题描述

理解并发集合的实现原理有助于正确使用和性能优化。

核心原理
// ConcurrentHashMap分段锁原理
public class SegmentLockExample {
  
    // 模拟分段锁
    private final Object[] segments = new Object[16];
  
    public SegmentLockExample() {
        for (int i = 0; i < segments.length; i++) {
            segments[i] = new Object();
        }
    }
  
    public void put(String key, String value) {
        int hash = key.hashCode();
        int segmentIndex = hash % segments.length;
      
        synchronized (segments[segmentIndex]) {
            // 只锁定对应的段,提高并发性
            System.out.println("Put: " + key + " -> " + value + " in segment " + segmentIndex);
        }
    }
  
    public String get(String key) {
        int hash = key.hashCode();
        int segmentIndex = hash % segments.length;
      
        synchronized (segments[segmentIndex]) {
            // 只锁定对应的段
            System.out.println("Get: " + key + " from segment " + segmentIndex);
            return "value"; // 模拟返回值
        }
    }
}
重难点分析
  • 分段锁: 理解ConcurrentHashMap的分段锁机制
  • CAS操作: 理解无锁算法的实现
  • 扩容机制: 理解并发环境下的扩容过程
  • 性能对比: 理解不同并发集合的性能特点

应用场景

1. Web服务器

场景描述

Web服务器需要同时处理多个客户端请求,多线程可以提高并发处理能力。

实现示例
// 简单的多线程Web服务器
public class MultiThreadedWebServer {
    private final ServerSocket serverSocket;
    private final ExecutorService threadPool;
  
    public MultiThreadedWebServer(int port, int threadPoolSize) throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
    }
  
    public void start() {
        System.out.println("Web服务器启动,监听端口: " + serverSocket.getLocalPort());
      
        while (true) {
            try {
                Socket clientSocket = serverSocket.accept();
                threadPool.submit(new ClientHandler(clientSocket));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  
    private static class ClientHandler implements Runnable {
        private final Socket clientSocket;
      
        public ClientHandler(Socket socket) {
            this.clientSocket = socket;
        }
      
        @Override
        public void run() {
            try {
                // 处理客户端请求
                handleRequest(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
      
        private void handleRequest(Socket socket) throws IOException {
            // 模拟处理HTTP请求
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
          
            String request = in.readLine();
            System.out.println("处理请求: " + request + " 线程: " + Thread.currentThread().getName());
          
            // 模拟处理时间
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
          
            // 返回响应
            out.println("HTTP/1.1 200 OK");
            out.println("Content-Type: text/html");
            out.println();
            out.println("<html><body><h1>Hello from " + Thread.currentThread().getName() + "</h1></body></html>");
        }
    }
  
    public static void main(String[] args) throws IOException {
        MultiThreadedWebServer server = new MultiThreadedWebServer(8080, 10);
        server.start();
    }
}

2. 数据处理

场景描述

大数据处理场景下,多线程可以并行处理数据,提高处理效率。

实现示例
// 多线程数据处理示例
public class MultiThreadedDataProcessor {
  
    public static void main(String[] args) throws InterruptedException {
        // 模拟大量数据
        List<String> data = generateData(1000000);
      
        // 单线程处理
        long startTime = System.currentTimeMillis();
        processDataSingleThread(data);
        long singleThreadTime = System.currentTimeMillis() - startTime;
      
        // 多线程处理
        startTime = System.currentTimeMillis();
        processDataMultiThread(data, 8);
        long multiThreadTime = System.currentTimeMillis() - startTime;
      
        System.out.println("单线程处理时间: " + singleThreadTime + "ms");
        System.out.println("多线程处理时间: " + multiThreadTime + "ms");
        System.out.println("性能提升: " + (double)singleThreadTime / multiThreadTime + "倍");
    }
  
    private static List<String> generateData(int size) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            data.add("Data-" + i);
        }
        return data;
    }
  
    private static void processDataSingleThread(List<String> data) {
        for (String item : data) {
            processItem(item);
        }
    }
  
    private static void processDataMultiThread(List<String> data, int threadCount) throws InterruptedException {
        int batchSize = data.size() / threadCount;
        CountDownLatch latch = new CountDownLatch(threadCount);
      
        for (int i = 0; i < threadCount; i++) {
            final int startIndex = i * batchSize;
            final int endIndex = (i == threadCount - 1) ? data.size() : (i + 1) * batchSize;
          
            new Thread(() -> {
                try {
                    for (int j = startIndex; j < endIndex; j++) {
                        processItem(data.get(j));
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }
      
        latch.await();
    }
  
    private static void processItem(String item) {
        // 模拟数据处理
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3. 异步任务处理

场景描述

异步任务处理可以提高系统响应性,避免阻塞主线程。

实现示例
// 异步任务处理示例
public class AsyncTaskProcessor {
    private final ExecutorService executorService;
    private final BlockingQueue<Future<?>> taskQueue;
  
    public AsyncTaskProcessor(int threadPoolSize) {
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
        this.taskQueue = new LinkedBlockingQueue<>();
    }
  
    public Future<String> submitTask(Callable<String> task) {
        Future<String> future = executorService.submit(task);
        taskQueue.offer(future);
        return future;
    }
  
    public void processCompletedTasks() {
        List<Future<?>> completedTasks = new ArrayList<>();
      
        // 收集已完成的任务
        taskQueue.drainTo(completedTasks);
      
        for (Future<?> future : completedTasks) {
            if (future.isDone()) {
                try {
                    String result = (String) future.get();
                    System.out.println("任务完成,结果: " + result);
                } catch (Exception e) {
                    System.err.println("任务执行异常: " + e.getMessage());
                }
            } else {
                // 任务未完成,重新放回队列
                taskQueue.offer(future);
            }
        }
    }
  
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
  
    public static void main(String[] args) {
        AsyncTaskProcessor processor = new AsyncTaskProcessor(4);
      
        // 提交多个异步任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            processor.submitTask(() -> {
                // 模拟任务执行
                Thread.sleep(1000 + new Random().nextInt(2000));
                return "Task-" + taskId + " completed";
            });
        }
      
        // 定期检查完成的任务
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            processor.processCompletedTasks();
        }, 0, 500, TimeUnit.MILLISECONDS);
      
        // 运行一段时间后关闭
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
      
        scheduler.shutdown();
        processor.shutdown();
    }
}

4. 生产者-消费者模式

场景描述

生产者-消费者模式是多线程编程的经典模式,适用于任务队列、消息队列等场景。

实现示例
// 生产者-消费者模式示例
public class ProducerConsumerExample {
    private final BlockingQueue<Integer> queue;
    private final int maxSize;
    private volatile boolean running = true;
  
    public ProducerConsumerExample(int maxSize) {
        this.queue = new ArrayBlockingQueue<>(maxSize);
        this.maxSize = maxSize;
    }
  
    // 生产者
    public class Producer implements Runnable {
        private final String name;
      
        public Producer(String name) {
            this.name = name;
        }
      
        @Override
        public void run() {
            try {
                int item = 0;
                while (running) {
                    queue.put(item);
                    System.out.println(name + " 生产: " + item);
                    item++;
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
  
    // 消费者
    public class Consumer implements Runnable {
        private final String name;
      
        public Consumer(String name) {
            this.name = name;
        }
      
        @Override
        public void run() {
            try {
                while (running) {
                    Integer item = queue.take();
                    System.out.println(name + " 消费: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
  
    public void start() {
        // 启动生产者
        Thread producer1 = new Thread(new Producer("Producer-1"));
        Thread producer2 = new Thread(new Producer("Producer-2"));
      
        // 启动消费者
        Thread consumer1 = new Thread(new Consumer("Consumer-1"));
        Thread consumer2 = new Thread(new Consumer("Consumer-2"));
        Thread consumer3 = new Thread(new Consumer("Consumer-3"));
      
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
      
        // 运行一段时间后停止
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
      
        running = false;
      
        // 等待线程结束
        try {
            producer1.join();
            producer2.join();
            consumer1.join();
            consumer2.join();
            consumer3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
  
    public static void main(String[] args) {
        ProducerConsumerExample example = new ProducerConsumerExample(10);
        example.start();
    }
}

最佳实践

1. 线程安全编程

// 线程安全的最佳实践
public class ThreadSafetyBestPractices {
  
    // 1. 使用不可变对象
    public static final class ImmutablePoint {
        private final int x;
        private final int y;
      
        public ImmutablePoint(int x, int y) {
            this.x = x;
            this.y = y;
        }
      
        public int getX() { return x; }
        public int getY() { return y; }
      
        public ImmutablePoint move(int dx, int dy) {
            return new ImmutablePoint(x + dx, y + dy);
        }
    }
  
    // 2. 使用线程安全的集合
    private final Map<String, String> threadSafeMap = new ConcurrentHashMap<>();
    private final List<String> threadSafeList = new CopyOnWriteArrayList<>();
  
    // 3. 使用原子类
    private final AtomicInteger counter = new AtomicInteger(0);
    private final AtomicReference<String> reference = new AtomicReference<>();
  
    // 4. 使用ThreadLocal
    private static final ThreadLocal<SimpleDateFormat> dateFormatHolder = 
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
  
    // 5. 正确的同步方式
    private final Object lock = new Object();
    private int sharedCounter = 0;
  
    public void incrementCounter() {
        synchronized (lock) {
            sharedCounter++;
        }
    }
  
    public int getCounter() {
        synchronized (lock) {
            return sharedCounter;
        }
    }
}

2. 性能优化

// 多线程性能优化示例
public class PerformanceOptimizationExample {
  
    // 1. 使用合适的线程池大小
    public ExecutorService createOptimizedThreadPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
      
        return new ThreadPoolExecutor(
            cpuCores,                    // 核心线程数
            cpuCores * 2,               // 最大线程数
            60L,                        // 空闲时间
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000), // 有界队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
    }
  
    // 2. 使用Fork/Join框架处理分治任务
    public class SumTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 10000;
      
        public SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
      
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
          
            int mid = (start + end) >>> 1;
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);
          
            leftTask.fork();
            rightTask.fork();
          
            return leftTask.join() + rightTask.join();
        }
    }
  
    // 3. 使用CompletableFuture进行异步编程
    public CompletableFuture<String> processAsync(String input) {
        return CompletableFuture
            .supplyAsync(() -> processInput(input))
            .thenApply(this::transformResult)
            .thenApply(this::formatOutput);
    }
  
    private String processInput(String input) {
        // 模拟处理
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Processed: " + input;
    }
  
    private String transformResult(String result) {
        return result.toUpperCase();
    }
  
    private String formatOutput(String output) {
        return "[" + output + "]";
    }
}

3. 错误处理

// 多线程错误处理最佳实践
public class ErrorHandlingBestPractices {
  
    // 1. 正确处理InterruptedException
    public void handleInterruptedException() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // 恢复中断状态
            Thread.currentThread().interrupt();
            // 清理资源
            cleanup();
            // 退出方法
            return;
        }
    }
  
    // 2. 使用UncaughtExceptionHandler
    public void setUncaughtExceptionHandler() {
        Thread thread = new Thread(() -> {
            throw new RuntimeException("测试异常");
        });
      
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("线程 " + t.getName() + " 发生异常: " + e.getMessage());
            // 记录日志
            logError(t, e);
            // 发送告警
            sendAlert(t, e);
        });
      
        thread.start();
    }
  
    // 3. 线程池异常处理
    public ExecutorService createExceptionHandlingThreadPool() {
        ThreadFactory threadFactory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, throwable) -> {
                System.err.println("线程池线程异常: " + throwable.getMessage());
                logError(thread, throwable);
            });
            return t;
        };
      
        return Executors.newFixedThreadPool(10, threadFactory);
    }
  
    // 4. 使用Future处理异常
    public void handleFutureException() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
      
        try {
            Future<String> future = executor.submit(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("随机异常");
                }
                return "成功";
            });
          
            try {
                String result = future.get(5, TimeUnit.SECONDS);
                System.out.println("结果: " + result);
            } catch (TimeoutException e) {
                System.err.println("任务超时");
                future.cancel(true);
            } catch (ExecutionException e) {
                System.err.println("任务执行异常: " + e.getCause().getMessage());
            }
        } finally {
            executor.shutdown();
        }
    }
  
    private void cleanup() {
        // 清理资源
    }
  
    private void logError(Thread thread, Throwable error) {
        // 记录错误日志
    }
  
    private void sendAlert(Thread thread, Throwable error) {
        // 发送告警
    }
}

总结

1. 学习路径图

Java多线程学习
基础阶段
进阶阶段
高级阶段
实践应用
线程创建与生命周期
基本同步机制
线程通信基础
线程池深入
并发工具类
JDK源码分析
内存模型JMM
锁优化机制
性能调优
Web服务器
数据处理
异步编程
掌握程度: 80%
掌握程度: 60%
掌握程度: 40%
掌握程度: 70%

2. 技能掌握层次图

入门
熟练
精通
专家
理解基本概念
会写简单示例
掌握核心API
解决常见问题
深入原理机制
性能优化能力
架构设计能力
疑难问题解决

3. 学习重点分布图

30% 25% 20% 15% 10% 学习重点时间分配 基础概念 同步机制 线程池 源码分析 性能优化

4. 技能树结构图

Java多线程技能树
基础知识
核心技能
高级技能
应用技能
线程概念
生命周期
创建方式
synchronized
volatile
Lock接口
线程池
JMM模型
锁优化
性能调优
源码分析
Web开发
数据处理
异步编程
分布式

5. 学习收益关系图

学习投入
技能提升
项目应用
问题解决
技术成长
编程能力
系统设计
性能优化
高并发系统
响应式应用
分布式架构
性能问题
并发问题
架构问题
技术深度
技术广度
职业发展

6. 实践建议流程图

开始学习
理论学习
代码实践
项目应用
问题总结
技能提升
是否掌握?
继续深入
源码研究
性能优化
架构设计
成为专家

7. 关键要点总结

要点 说明 重要程度
多线程基础 理解线程创建、生命周期、同步机制 ⭐⭐⭐⭐⭐
线程安全 掌握synchronized、volatile、Lock等同步工具 ⭐⭐⭐⭐⭐
线程池 理解线程池原理、参数配置、性能调优 ⭐⭐⭐⭐
并发工具 熟练使用CountDownLatch、CyclicBarrier、Semaphore等 ⭐⭐⭐⭐
性能优化 掌握锁优化、线程池调优、异步编程等技巧 ⭐⭐⭐

8. 学习建议

建议 说明 执行难度
理论结合实践 理解概念后多写代码验证 中等
阅读源码 深入理解JDK并发类的实现原理 困难
性能测试 通过基准测试验证性能优化效果 中等
问题排查 学会使用工具诊断多线程问题 困难

9. 发展方向

方向 说明 发展前景
响应式编程 学习Reactor、RxJava等响应式框架 ⭐⭐⭐⭐⭐
异步编程 掌握CompletableFuture、异步IO等 ⭐⭐⭐⭐
分布式并发 学习分布式锁、分布式事务等 ⭐⭐⭐⭐⭐
性能调优 深入JVM调优、系统性能优化 ⭐⭐⭐⭐

10. 最终建议

通过系统学习Java多线程,可以:

  • 提高程序性能和响应性 - 充分利用多核CPU资源
  • 解决复杂的并发编程问题 - 为高并发系统开发打下基础
  • 提升技术深度和广度 - 在职业发展中获得竞争优势

实践建议: 在实际项目中多应用多线程技术,通过实践加深理解,逐步掌握并发编程的精髓。记住:理论指导实践,实践验证理论

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐