Java多线程与共享变量
System.out.println("转账成功: " + amount + " 从 " + this.name + " 到 " + to.name);System.out.println("任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());System.out.println("总次数: " + (successCo
Java多线程与共享变量
1. 引言
2. 多线程基础与共享变量概念
2.1 线程与进程的区别
public static void main(String[] args) {
// 获取当前线程信息
Thread mainThread = Thread.currentThread();
System.out.println("当前线程: " + mainThread.getName());
System.out.println("线程ID: " + mainThread.getId());
System.out.println("线程状态: " + mainThread.getState());
System.out.println("线程优先级: " + mainThread.getPriority());
}
}
2.2 共享变量的定义与风险
private int sharedCounter = 0; // 共享变量
public void increment() {
sharedCounter++; // 非原子操作
}
public int getCounter() {
return sharedCounter;
}
}
3. 多线程环境的核心问题
3.1 可见性问题(Visibility)
private static boolean ready = false;
private static int number = 0;
public static void main(String[] args) throws InterruptedException {
Thread readerThread = new Thread(() -> {
while (!ready) {
// 空循环,等待ready变为true
}
System.out.println("Number: " + number);
});
Thread writerThread = new Thread(() -> {
number = 42;
ready = true; // 可能重排序,导致readerThread看到ready为true但number仍为0
});
readerThread.start();
Thread.sleep(100); // 确保readerThread先运行
writerThread.start();
readerThread.join();
writerThread.join();
}
}
3.2 原子性问题(Atomicity)
private int count = 0;
public void increment() {
count++; // 实际上包含读取-修改-写入三个步骤
}
public static void main(String[] args) throws InterruptedException {
AtomicityProblem problem = new AtomicityProblem();
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
problem.increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
// 预期结果应该是1000000,但实际运行结果通常小于这个值
System.out.println("最终计数: " + problem.count);
}
}
3.3 有序性问题(Ordering)
4. 解决方案与最佳实践
4.1 synchronized关键字
private int counter = 0;
private final Object lock = new Object();
// 同步方法
public synchronized void incrementMethod() {
counter++;
}
// 同步块
public void incrementBlock() {
synchronized (lock) {
counter++;
}
}
// 静态同步方法
public static synchronized void staticIncrement() {
// 静态方法使用类对象作为锁
}
public int getCounter() {
return counter;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedSolution solution = new SynchronizedSolution();
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
solution.incrementBlock();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("同步后的计数: " + solution.getCounter()); // 总是1000000
}
}
4.2 volatile关键字
private volatile boolean shutdownRequested = false;
public void shutdown() {
shutdownRequested = true;
}
public void doWork() {
while (!shutdownRequested) {
// 执行工作任务
System.out.println("Working...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("Worker thread stopped.");
}
public static void main(String[] args) throws InterruptedException {
VolatileExample example = new VolatileExample();
Thread workerThread = new Thread(example::doWork);
workerThread.start();
// 让工作线程运行一段时间
Thread.sleep(1000);
// 请求关闭,由于volatile的可见性保证,工作线程会立即看到这个变化
example.shutdown();
workerThread.join();
System.out.println("Main thread finished.");
}
}
4.3 原子类(Atomic Classes)
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicClassExample {
private AtomicInteger atomicCounter = new AtomicInteger(0);
private AtomicLong atomicLong = new AtomicLong(0L);
private AtomicReference<String> atomicReference = new AtomicReference<>("initial");
public void increment() {
atomicCounter.incrementAndGet(); // 原子自增
}
public void updateLong() {
atomicLong.accumulateAndGet(10, (x, y) -> x + y); // 原子累加
}
public boolean updateReference(String expected, String newValue) {
return atomicReference.compareAndSet(expected, newValue); // CAS操作
}
public int getCounter() {
return atomicCounter.get();
}
public static void main(String[] args) throws InterruptedException {
AtomicClassExample example = new AtomicClassExample();
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
example.increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("原子计数器: " + example.getCounter()); // 总是1000000
// 测试AtomicReference
boolean success = example.updateReference("initial", "updated");
System.out.println("更新引用结果: " + success);
System.out.println("当前引用值: " + example.atomicReference.get());
}
}
4.4 Lock接口与实现
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockExample {
private int counter = 0;
private final Lock lock = new ReentrantLock();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
public void incrementWithLock() {
lock.lock();
try {
counter++;
} finally {
lock.unlock(); // 确保在finally块中释放锁
}
}
public int getCounterWithReadLock() {
readLock.lock();
try {
return counter;
} finally {
readLock.unlock();
}
}
public void incrementWithWriteLock() {
writeLock.lock();
try {
counter++;
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
LockExample example = new LockExample();
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
example.incrementWithLock();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("使用Lock的计数: " + example.getCounterWithReadLock());
}
}
4.5 ThreadLocal变量
private static ThreadLocal<Integer> threadLocalCounter = ThreadLocal.withInitial(() -> 0);
private static ThreadLocal<String> userContext = new ThreadLocal<>();
public static void increment() {
threadLocalCounter.set(threadLocalCounter.get() + 1);
}
public static int get() {
return threadLocalCounter.get();
}
public static void setUser(String user) {
userContext.set(user);
}
public static String getUser() {
return userContext.get();
}
public static void clear() {
threadLocalCounter.remove();
userContext.remove();
}
public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
setUser(Thread.currentThread().getName());
for (int i = 0; i < 5; i++) {
increment();
System.out.println(Thread.currentThread().getName() +
" - 用户: " + getUser() +
", 计数: " + get());
}
clear(); // 清理ThreadLocal,防止内存泄漏
};
Thread thread1 = new Thread(task, "线程-1");
Thread thread2 = new Thread(task, "线程-2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
}
5. 高级并发模式
5.1 生产者-消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerExample {
private static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int itemsToProduce;
public Producer(BlockingQueue<Integer> queue, int itemsToProduce) {
this.queue = queue;
this.itemsToProduce = itemsToProduce;
}
@Override
public void run() {
try {
for (int i = 0; i < itemsToProduce; i++) {
System.out.println("生产: " + i);
queue.put(i); // 阻塞如果队列满
Thread.sleep(100); // 模拟生产时间
}
// 发送结束信号
queue.put(-1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
public Consumer(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 阻塞如果队列空
if (item == -1) {
// 把结束信号放回,让其他消费者也能看到
queue.put(-1);
break;
}
System.out.println(name + " 消费: " + item);
Thread.sleep(200); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(new Producer(queue, 20));
Thread consumer1 = new Thread(new Consumer(queue, "消费者1"));
Thread consumer2 = new Thread(new Consumer(queue, "消费者2"));
producer.start();
consumer1.start();
consumer2.start();
producer.join();
consumer1.join();
consumer2.join();
System.out.println("生产消费模式完成");
}
}
5.2 线程池与Executor框架
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExample {
private static class CountingTask implements Callable<Integer> {
private final int taskId;
private final AtomicInteger sharedCounter;
public CountingTask(int taskId, AtomicInteger sharedCounter) {
this.taskId = taskId;
this.sharedCounter = sharedCounter;
}
@Override
public Integer call() throws Exception {
System.out.println("任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());
int localCount = 0;
for (int i = 0; i < 1000; i++) {
sharedCounter.incrementAndGet();
localCount++;
// 模拟任务处理时间
if (i % 100 == 0) {
Thread.sleep(10);
}
}
System.out.println("任务 " + taskId + " 完成,本地计数: " + localCount);
return localCount;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
AtomicInteger sharedCounter = new AtomicInteger(0);
// 提交任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> future = executor.submit(new CountingTask(i, sharedCounter));
futures.add(future);
}
// 获取结果
int totalLocalCount = 0;
for (Future<Integer> future : futures) {
totalLocalCount += future.get();
}
System.out.println("所有任务本地计数总和: " + totalLocalCount);
System.out.println("共享计数器最终值: " + sharedCounter.get());
// 关闭线程池
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}
}
6. 性能优化与最佳实践
6.1 减少锁竞争
// 不好的设计 - 所有操作使用同一把锁
private static class PoorDesign {
private final Object lock = new Object();
private int counter1 = 0;
private int counter2 = 0;
public void increment1() {
synchronized (lock) {
counter1++;
}
}
public void increment2() {
synchronized (lock) {
counter2++;
}
}
}
// 好的设计 - 使用分离的锁
private static class GoodDesign {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
private int counter1 = 0;
private int counter2 = 0;
public void increment1() {
synchronized (lock1) {
counter1++;
}
}
public void increment2() {
synchronized (lock2) {
counter2++;
}
}
}
// 使用原子变量进一步优化
private static class OptimizedDesign {
private AtomicInteger counter1 = new AtomicInteger(0);
private AtomicInteger counter2 = new AtomicInteger(0);
public void increment1() {
counter1.incrementAndGet();
}
public void increment2() {
counter2.incrementAndGet();
}
}
}
6.2 避免死锁
private static class Account {
private final String name;
private int balance;
public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}
public void transfer(Account to, int amount) {
// 确定锁的获取顺序来避免死锁
Account first = this.name.compareTo(to.name) < 0 ? this : to;
Account second = this.name.compareTo(to.name) < 0 ? to : this;
synchronized (first) {
synchronized (second) {
if (this.balance >= amount) {
this.balance -= amount;
to.balance += amount;
System.out.println("转账成功: " + amount + " 从 " + this.name + " 到 " + to.name);
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Account accountA = new Account("A", 1000);
Account accountB = new Account("B", 1000);
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
accountA.transfer(accountB, 10);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
accountB.transfer(accountA, 10);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("账户A余额: " + accountA.balance);
System.out.println("账户B余额: " + accountB.balance);
}
}
7. 测试与调试技巧
7.1 多线程测试
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentTest {
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger failureCount = new AtomicInteger(0);
public boolean process(int value) {
// 模拟业务处理
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return value % 2 == 0; // 简单判断逻辑
}
public void concurrentTest() throws InterruptedException {
int threadCount = 10;
int iterations = 100;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executor.submit(() -> {
try {
startLatch.await(); // 等待开始信号
for (int j = 0; j < iterations; j++) {
int value = threadId * iterations + j;
boolean result = process(value);
if (result) {
successCount.incrementAndGet();
} else {
failureCount.incrementAndGet();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
// 同时启动所有线程
startLatch.countDown();
// 等待所有线程完成
endLatch.await();
executor.shutdown();
System.out.println("测试完成:");
System.out.println("成功次数: " + successCount.get());
System.out.println("失败次数: " + failureCount.get());
System.out.println("总次数: " + (successCount.get() + failureCount.get()));
}
public static void main(String[] args) throws InterruptedException {
ConcurrentTest test = new ConcurrentTest();
test.concurrentTest();
}
}
8. 总结
- 三大核心问题:可见性、原子性和有序性是多线程编程的主要挑战
- 解决方案:synchronized、volatile、原子类和Lock提供了不同层次的线程安全保证
- 最佳实践:合理使用ThreadLocal、减少锁竞争、避免死锁等
- 高级模式:生产者-消费者、线程池等模式提供了高效的并发处理方案
- Java并发编程实战
- Java内存模型规范
- JUC (java.util.concurrent) 包源码
更多推荐
所有评论(0)