【java学习】多线程之高并发编程:线程&任务&启动任务
1,synchronized(同步块)(1)概念用synchronized 关键字来实现同步块,所有加上synchronized 的块语句,在多线程访问的时候,同一时刻只能有一个线程能够访问,另一个线程必须等到当前线程访问完毕才能访问此代码块,解决了多线程的安全问题。即:加锁。(2)缺陷多个线程判断锁,耗费资源,降低代码执行效率。(3)作用①实现操作的原子性什么是原子性?原子操作即:不可分割的单一
1,概念
1)同步和异步
同步和异步通常用来形容一次方法调用。
1>同步方法
调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
目的:都是为了解决多线程中的对同一资源的访问冲突;
a>场景
比如银行的转账系统,对数据库的保存操作等等,都会使用同步交互操作。
b>实现方式
- ThreadLocal
- synchronized( )
- wait()
- notify()
- volatile
2>异步方法
调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作。而异步方法通常会在另外一个线程中,“真实”地执行着。整个过程,不会阻碍调用者的工作。
2)缓存一致性(缓存连贯性、缓存同调)
在多核情况下,每条线程可能运行于不同的CPU 中,因此每个线程
运行时有自己的高速缓存(对单核CPU 来说,其实也会出现这种问题,只不过
是以线程调度的形式来分别执行的)。为了保留在这些高速缓存中的共享资源,保持数据一致性的机制,叫做缓存一致性。
为了解决缓存不一致性问题,通常来说有以下2 种解决方法:
1>通过在总线加LOCK#锁的方式(总线LOCK协议)
CPU 和其他部件进行通信都是通过总线来进行的,如果对总线加LOCK#锁的话,就阻塞了其他CPU 对其他部件访问(如内存),从而使得只能有一个CPU 能使用这个变量的内存。这样做的缺点是:在锁住总线期间,其他CPU 无法访问内存,导致效率低下。
2>通过缓存一致性协议(MESI协议)
该协议保证了每个缓存中使用的共享变量的副本是一致的。
核心思想:当CPU 向内存写入数据时,如果发现操作的变量是共享变量,即在其他CPU 中也存在该变量的副本,会发出信号通知其他CPU 将该变量的缓存行置为无效状态,因此当其他CPU 需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取。
3)单核与多核
单CPU的多进程其实是多任务的快速轮转调度,不是并发执行;
只有多CPU才能真正的并发运行。
4)并发的三大特性
指令重排导致了有序性;线程切换导致了原子性;缓存导致了可见性 。
- 有序性:即程序执行的顺序按照代码的先后顺序执行。
- 原子性:一个或多个操作,要么全部执行且在执行过程中不被任何因素打断,要么全部不执行。
- 可见性 :当一个线程修改了共享变量的值,其他线程能够看到修改的值。通过 总线LOCK协议、MESI协议来保障。
2,线程和进程
了解概念即可,实际应用场景中线程的使用均是通过线程池来实现的。
3,任务执行&调度:@Async和@Scheduled
1)任务执行器(TaskExecutor)
1>TaskExecutor
java5引入了java.util.concurrent.Executor
,spring Framework 2.0引入了interface TaskExecutor extends Executor
。
TaskExecutor 常用实现:
- Simple.AsyncTaskExecutor
每次创建一个新线程 - ConcurrentTaskExecutor
- ThreadPoolTaskExecutor(常用)
2>@Async
Spring 3.0之后提供了一个@Async注解。
原理:AOP,根据切入点创建代理,调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,将方法的执行提交给线程池,实现异步执行。
注解 | 场景说明 |
---|---|
@EnableAsync | 配置类中通过此注解开启对异步任务的支持; |
@Async | 在实际执行的bean方法使用该注解来声明其是一个异步任务 |
使用时要注意:
- 必须开启异步任务配置:
@EnableAsync
- 通过IOC方式调用@Async注解的方法,不可以直接调用方法。
- 定义线程池
默认使用SimpleAsyncTaskExecutor,而此实现每次执行一个提交的任务时候都会新建一个线程,没有线程的复用,一般使用ThreadPoolTaskExecutor 来代替。
建议显式定义AsyncTaskExecutor,并通过@Async("name")
指定线程池。 - 方法上或类上都可以注解,注解到类上时,表示所有的方法都将异步。
/**
* 线程池的配置
*/
@Configuration
@EnableAsync
public class AsyncConfig {
private static final int MAX_POOL_SIZE = 50;
private static final int CORE_POOL_SIZE = 20;
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
// 指定最大线程数
asyncTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
// 核心线程数目
asyncTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
// 队列中最大的数目
asyncTaskExecutor.setQueueCapacity(20);
// 线程空闲后的最大存活时间
asyncTaskExecutor.setKeepAliveSeconds(60);
// 线程名称前缀
asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool-");
// 等待任务在关机时完成-表明等待所有线程执行完
asyncTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间(默认为0,此时立即停止)
asyncTaskExecutor.setAwaitTerminationSeconds(60);
//拒绝策略
asyncTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
}
//其它类异步调用的时候,@Async会默认从线程池获取线程,当然也可以显式的指定
@Async("asyncTaskExecutor")
除了使用@Async注解来开启异步任务,也可以使用线程池对象,来开启异步任务:
@Autowired
AsyncTaskExecutor asyncTaskExecutor;//注入线程池对象
//通过线程池对象提交异步任务
asyncTaskExecutor.submit(() -> {
log.info("异步任务开始");
//省略异步任务业务逻辑...
log.info("异步任务结束");
});
3>Future异步回调
Future表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成、以及检索计算结果的方法。
fork-join框架就是实现了Future接口。核心方法:
方法 | 说明 |
---|---|
get() | 等待任务完成,获取执行结果,如果任务取消会抛出异常; |
get(long timeout, TimeUnit unit) | 指定等待任务完成的时间,等待超时会抛出异常; |
isDone() |
判断任务是否完成; |
isCancelled() |
判断任务是否被取消; |
cancel(boolean mayInterruptIfRunning) |
尝试取消此任务的执行,如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败; |
@Slf4j
public class FutureTask {
@Async
public Future<String> taskOne() throws Exception {
//执行内容同上,省略
return new AsyncResult<>("1完成");
}
}
Future<String> taskOne = futureTask.taskOne();
if (taskOne.isDone()) {
log.info("任务1返回结果={}", taskOne.get());
}
2)任务调度器(TaskScheduler)
Spring 3.0还引用了任务调度接口 TaskScheduler。
定时任务:
指定某个特定时间执行,或者多次重复执行的任务。
1>TaskScheduler 接口
public interface TaskScheduler {
//推荐!
//通过触发器(Trigger )来决定task是否执行(更加灵活)
ScheduledFuture schedule(Runnable task, Trigger trigger);
//在starttime的时候执行一次
ScheduledFuture schedule(Runnable task, Date startTime);
//从starttime开始每个period时间段执行一次task
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
//每隔period执行一次
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
//从startTime开始每隔delay长时间执行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
//每隔delay时间执行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
- schedule
单一任务调度,通过定义Trigger可实现重复调度。
1)PeriodicTrigger
用于定期执行的Trigger。它有两种模式:
模式 | 说明 | 备注 |
---|---|---|
fixedRate | 两次任务开始时间之间间隔指定时长 | 默认 单位:ms |
fixedDelay | 上一次任务的结束时间与下一次任务开始时间``间隔指定时长 |
2)CronTrigger
通过Cron表达式来生成调度计划。比如:
//工作日的9点到17点,每个小时的15分执行一次
scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));
- scheduleAtFixedRate
多次执行任务。
固定频率执行任务。 - scheduleWithFixedDelay
多次执行任务。
固定延迟执行任务。
2>相关注解:@Scheduled
注解 | 场景说明 | 备注 |
---|---|---|
@EnableScheduling | 在配置类上使用,开启计划任务的支持 | 类上 |
@Scheduled | 来申明这是一个任务,包括cron,fixDelay,fixRate等类型。方法上,需先开启计划任务的支持 | 参数说明: fixedDelay 方法执行的间隔时间;从上一次方法执行完开始计算间隔时间。 fixedRate 按照一定的速率执行 当上一次方法阻塞,会累计应该执行的次数;不阻塞时,一次性执行完毕,再按速率执行。 cron 搭配cron表达式 按顺序依次表示:秒( 0~59 )分时(0~23 )天(0~31 )月(0~11 );initialDelay 表示容器启动后,延迟多少ms后再开始执行,一般和fixedRate 一起使用 |
注意:
调度仅支持当前物理机调度,不适用分布式架构。
4,线程池
5,线程安全与同步
保证多个线程访问同一个共享资源时,数据是一致的。
其实就是满足程序访问共享资源时的原子性、有序性、可见性。
1)synchronized(同步块)
1>概念
- 互斥,悲观锁
java.util.concurrent.locks.Lock接口的实现类。 - 阻塞
其他线程没有获得锁时会一直阻塞。多个线程判断锁,耗费资源,降低代码执行效率。 - 可重入性
线程已经获得锁后可以反复进入锁,每次进入锁计数+1,每释放一次锁时计数-1,计数变为0时锁彻底释放,其他线程可获得锁。
2>原理
同步方法的常量池中会有一个ACC_SYNCHRONIZED标识,当其他线程访问某个方法时,如果有ACC_SYNCHRONIZED设置则需要先获得监视器(Monitor,每个对象都有一个Monitor,它用于保证同一时间只能有一个线程访问当前对象)锁才能执行方法,执行之后再释放监视器锁。如果方法发生异常,抛异常之前会释放监视器锁。
1.6之前,synchronized加锁时会调用对象的objectMonitor的enter方法,解锁时会调用exit方法。锁是重量级锁:java线程需要映射到os的原生线程之上,如果阻塞或者唤醒一个线程就需要os的帮忙,进行用户态到核心态的切换,花费很多cpu时间。
a)锁的升级过程
1.6之后优化了锁:自旋锁(1.4推出,默认关闭。1.6默认打开)
锁状态 | 对象头-是否偏向锁 | 锁标志位 | 其它 |
---|---|---|---|
无锁 | 0 | 01 | hashCode |
偏向锁 | 1 | 01 | ThreadId… |
轻量级锁 | 00 | 指向线程栈中锁记录的指针 | |
重量级锁 | 10 | 指向等待队列的指针 | |
GC标记 | 11 | 空 |
- 偏向锁
java线程第一次访问对象时,对象头里设置ThreadId、锁状态设置为偏向锁。
其他线程访问对象,如果对象ThreadId是自己,直接访问;如果不是自己,锁升级为轻量锁。ThreadId存储位变成指向线程栈中锁记录的指针。 - 轻量锁
其他线程访问对象时,如果是轻量锁,尝试将指向线程栈中锁记录的指针替换为自己的,替换成功说明锁释放。否则自旋15次,还没有获得锁则锁升级为重量锁。将对象头中指向线程栈中锁记录的指针替换为指向等待队列的指针。 - 重量级锁
如果一个线程想要获取该锁,需要把自己加入等待队列,jdk8中对于重量级锁的自旋默认不开启。锁释放后,jvm从等待队列选择一个唤醒进入线程就绪状态。
b)锁粗化
尽量减小锁的粒度。JIT会自动粗化锁:一系列操作反复加锁、解锁,会将加锁范围扩散(粗化)到整个操作序列的外部。
d)锁消除
JIT经过逃逸分析之后发现并无线程安全问题的话会自动做锁消除。
3>作用
①实现操作的原子性
什么是原子性?
原子操作即:不可分割的单一的操作。
如 i++可以分为3步原子操作:① 读取变量 i 当前值 ②拿 i 的当前值,和1做加法运算 ③将加法结果赋值给 i 变量。
非原子操作在cpu的时间片切换的影响下可能会受到其他线程的干扰。
synchronized关键字实现操作的原子性,本质是通过该关键字所包括的临界区的排他性保证在任何一个时刻只有一个线程能够执行临界区中的代码。
②保障内存的可见性
什么是内存的可见性?
CPU在执行代码时,为减少变量访问时间将其缓存到CPU的缓存区。如此再次访问变量,是从cache中读取而不是从内存读取。同样也未写入内存。每个CPU有自己的缓存区,其内容对其他CPU不可见。
synchronized在解锁之前会把数据重新写回内存。
③保证多线程代码的正确性
保证一个线程执行临界区中的代码时,所修改的变量值对于稍后执行该临界区的线程来说是可见的。
④保证指令有序性
synchronized保证了单线程的执行,单线程下指令重排会进行限制(实际上还是有重排)。
4>使用
同步常用的有三种:
①同步块
用的锁是obj。
demo
package Thread;
import javax.swing.plaf.SliderUI;
public class MyThread implements Runnable {
private int i = 4;
Object obj = new Object();
@Override
public void run() {
while (true) {
/*
* 对象如同锁,持有锁才可以在同步中执行。
* 没有锁的即使获取cpu的执行权,也无法进入同步块。
* 要等待当前进程释放锁,其他进程才能抢占对象锁。
*/
synchronized (obj) {
if (i > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "--this is MyThread:" + i--);
}
}
}
}
}
②同步方法
用的锁是this。
/**
*线程安全的计数器
*/
public class ThreadSafeCounter{
private int counter = 0;
public void increment(){
synchronized(this){
counter++;
}
}
public int get(){
synchronized(this){
return counter;
}
}
}
③静态同步方法
用的锁是该类。
因为静态方法中不能定义this。静态进内存时,内存中没有本类对象,但是一定有该类对应的字节码文件对象。静态同步方法,使用的锁是该方法所在类的字节码文件对象:通过【类名.class】,获取。
public synchronized static void doSth(){
}
相当于:
synchronized(A.class){
}
2)volatile
轻量级的synchronized,在多处理器编程中保证共享变量的统一性。
1>场景
双重校验锁DCL(double checked locking)–使用volatile 的场景之一。
由于有些时候对 volatile的操作,不会被保存,说明不会造成阻塞。不可用与多线程环境下的计数器。
2>特点
- 保证可见性;
对于volatile修饰的变量,进行写操作时,jvm会发送一条lock前缀的指令,将修改后的值刷入主存。其它线程由于缓存一致性协议,会把变量的值从主存加载到自己的缓存中。 - 保证有序性
通过内存屏障(一组CPU指令)禁止指令重排优化(Re-order)。 - 不保证原子性;
想要保证原子性采用AtomicLong和AtoicInteger等类型定义变量。 - 只能修饰变量;
3>CAS (compare and swap,比较并交换)
- CAS包括三个操作数:旧的预期值(A)、主内存的值(B)、要修改的值(C);当且仅当A==B的时候,A的值才会被修改成C。
为了避免来回修改的值一样导致线程错误,一般CAS的值是版本号,不会出现多次操作修改的值一样的情况。 - 这个操作是原子性
- 非阻塞性的 乐观锁。
多个线程操作CAS时,只有一个线程可以修改值。其它线程竞争失败后也不会挂起,而是自旋重试(CPU空转)。 - jdk1.5的java.util.concurrent(JUC)就是在CAS的基础上实现的。
4>内存屏障
a)概念
是一组处理器指令,解决禁止指令重排序和内存可见性的问题。
b)作用
1.先于这个内存屏障的指令必须先执行,后于这个内存屏障的指令必须后执行。
2.使得内存可见性。
所以,如果你的字段是volatile,在读指令前插入读屏障,可以让高速缓存中的数据失效,重新从主内存加载数据。在写指令之后插入写屏障,能让写入缓存的最新数据写回到主内存。
c)类型
- LoadLoad 屏障
对于这样的语句Load1; LoadLoad; Load2,在Load2 及后续读取操作要读取的数据被访问前,保证Load1 要读取的数据被读取完毕。 - StoreStore 屏障
对于这样的语句Store1; StoreStore; Store2,在Store2 及后续写入操作执行前,保证Store1 的写入操作对其它处理器可见。 - LoadStore 屏障
对于这样的语句Load1; LoadStore; Store2,在Store2 及后续写入操作被刷出前,保证Load1 要读取的数据被读取完毕。 - StoreLoad 屏障
对于这样的语句Store1; StoreLoad; Load2,在Load2 及后续所有读取操作执行前,保证Store1 的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。
5>volatile 和synchronized 区别
1.volatile 是变量修饰符,而synchronized 则作用于代码块或方法。
2.volatile 不会对变量加锁,不会造成线程的阻塞;synchronized 会对变量加锁,可能会造成线程的阻塞。
3.volatile 仅能实现变量的修改可见性,并不能保证原子性;而synchronized 则可以保证变量的修改可见性和原子性。
(synchronized 有两个重要含义:它确保了一次只有一个线程可以执行代码的受保护部分(互斥),而且它确保了一个线程更改的数据对于其它线程是可见的(更改的可见性),在释放锁之前会将对变量的修改刷新到主存中)。
4.volatile 标记的变量不会被编译器优化,禁止指令重排序;synchronized 标记的变量可以被编译器优化。
6>什么场景下可以使用volatile 替换synchronized?
只需要保证共享资源的可见性的时候可以使用volatile 替代,
synchronized 保证可操作的原子性、一致性和可见性。
3)java锁
4)数据类型的安全性
线程安全:
指当多线程访问时,采用了加锁的机制;即当一个线程访问该类的某个数据时,会对这个数据进行保护,其他线程不能对其访问,直到该线程读取完之后,其他线程才可以使用。防止出现数据不一致或者数据被污染等意外情况。
线程不安全:
就是不提供数据访问时的数据保护,多个线程能够同时操作某个数据,从而出现数据不一致或者数据污染等意外情况。
1>集合
常见的线程不安全的集合:
ArrayList(多个线程操作会抛异常:ConcurrentModificationException);
常见的线程安全的集合:
CopyOnWriteArrayList(写时复制)
Vector
2>AtomicLong
AtomicLong是作用是对长整形进行原子操作。
在32位操作系统中,64位的long 和 double 变量由于会被JVM当作两个分离的32位来进行操作,所以不具有原子性。而使用AtomicLong能让long的操作保持原子型。
常见用法:
//0、创建具有初始值 0 的新 AtomicLong。
AtomicLong atomicLong = new AtomicLong();
System.out.println("Value:" + atomicLong.get()); //0
//1、设置值
atomicLong.set(10);
System.out.println("Value:" + atomicLong.get()); //10
//2、创建具有给定初始值的新 AtomicLong。
AtomicLong atomicLong1 = new AtomicLong(10);
System.out.println("Value:" + atomicLong1.get()); //10
//3、以原子方式将给定值添加到当前值,先加上特定的值 相当于++5
System.out.println("Value:" + atomicLong1.addAndGet(5)); //15
//4、先获取当前值再加上特定的值 相当于5++
System.out.println("Value:" + atomicLong1.getAndAdd(5)); //15
System.out.println("Value:" + atomicLong1); //20
//5、如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。 compareAndSet(long expect, long update)
System.out.println("Value:" + atomicLong1.compareAndSet(20,15));//true
System.out.println("Value:" + atomicLong1);//15
//6、 以原子方式将当前值减 1,先减去1再获取值 atomicLong1.decrementAndGet()
//7、先获取当前值再减1 getAndDecrement()
//8、先获取当前值再加1 getAndIncrement()
//9、先加1再获取当前值 incrementAndGet()
//10、先获取当前值再设置新的值 getAndSet()
3>AtomicReference
AtomicReference类提供了对象引用的非阻塞原子性读写操作。
针对对象引用的修改包含了如下两个步骤:获取该引用和改变该引用(每一个步骤都是原子性的操作,但组合起来就无法保证原子性了)。即使用volatile修饰也不能保证原子性。
synchronized是一种阻塞式的解决方案,同一时刻只能有一个线程真正在工作,其他线程都将陷入阻塞,效率低。利用AtomicReference的非阻塞原子性解决方案能更加高效一点。
说明 | 说明 | 举例 | 备注 |
---|---|---|---|
无参构造函数 | AtomicReference() |
需要再次调用set()方法为AtomicReference内部的value指定初始值 | |
有参构造函数 | AtomicReference(V initialValue); |
private static AtomicReference<DebitCard> debitCardRef = new AtomicReference<>(new DebitCard("zhangsan", 0)); |
已经指定了初始值 |
原子性地更新AtomicReference内部的value值 | boolean compareAndSet(V expect, V update) |
expect:当前值,update:要更新的新值。当expect和AtomicReference的当前值不相等时,修改会失败,返回值为false | |
原子性地更新AtomicReference内部的value值,并且返回AtomicReference的旧值。 | getAndSet(V newValue) |
||
原子性地更新value值,并且返回AtomicReference的旧值,该方法需要传入一个Function接口。 | getAndUpdate(UnaryOperator<V> updateFunction) |
||
原子性地更新value值,并且返回AtomicReference更新后的新值,该方法需要传入一个Function接口。 | updateAndGet(UnaryOperator<V> updateFunction) |
||
原子性地更新value值,并且返回AtomicReference更新前的旧值。 | getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction) |
第一个参数是更新后的新值,第二个是BinaryOperator接口。 | |
原子性地更新value值,并且返回AtomicReference更新后的值。 | accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction) |
第一个参数是更新的新值,第二个是BinaryOperator接口。 | |
获取AtomicReference的当前对象引用值 | get() |
||
设置AtomicReference最新的对象引用值,该新值的更新对其他线程立即可见。 | set(V newValue) |
||
设置AtomicReference的对象引用值。 | lazySet(V newValue) |
6,线程间通信
7,类加载器
8,多线程之设计模式
9,java.util.concurrent.locks.Lock
1)概念
2)用法
使用Lock 必须在try-catch-finally 块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被释放,防止死锁的发生。通常使用Lock 来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
3)Lock 和synchronized 的区别
1)Lock 是一个接口,而synchronized 是Java 中的关键字,synchronized 是内置的语言实现;
2)synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock 在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock 时需要在finally 块中释放锁;
3)Lock 可以让等待锁的线程响应中断(可中断锁),而synchronized却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断(不可中断锁);
4)通过Lock 可以知道有没有成功获取锁(tryLock()方法:如果获取了锁,则返回true;否则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待),而synchronized 却无法办到。
5)Lock 可以提高多个线程进行读操作的效率(读写锁)。
6)Lock 可以实现公平锁,synchronized 不保证公平性。
在性能上来说,如果线程竞争资源不激烈时,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock 的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
10,RecursiveTask Fork/join
1)Fork/join框架
Java7 的一个用于并行执行的任务的框架(分治法)。
1>工作流程
- 声明一个ForkJoinPool,用于所有task任务执行。
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//参数:
//1. 最大线程数(默认值为cpu核数)
//2. 创建线程的工厂(默认:ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory)
//3. 异常处理器(默认为 null):在线程执行任务时对由于某些无法预料的错误而导致任务线程中断时,该处理器会进行一些处理。
//4. 表示工作线程内的任务队列是采用何种方式进行调度。true:FIFO;fasle: LIFO(默认)。
private final ForkJoinPool forkJoinPool = new ForkJoinPool(5, factory, null, false);
- 建立一个 ForkJoinTask并放入ForkJoinPool。
submit提交任务并开始执行。
//GetBatchTask extends RecursiveTask<List>
GetBatchTask getBatchTask = new GetBatchTask();
ForkJoinTask<List> submit = forkJoinPool.submit(getBatchTask);
- computer()方法执行
task任务的具体实现。
采用分治算法,递归拆分任务(根据阈值将任务拆分到最细,如果任务足够小就不拆分,单任务执行),调用task.fork();
将任务加入线程队列,并手动保存当前所有子任务;
通过循环task.join()
,手动获取所有子任务结果并归并所有结果。
返回task最终结果。 - submit.get()
主线程调用。
轮询监测每个任务状态(阻塞),如果任务都结束则返回最终结果;如果任务异常可能会抛出异常,最好通过异常处理器进行显式的异常处理。 - 异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常;
ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。
if(task.isCompletedAbnormally()) {
System.out.println(task.getException());
}
2>子类
RecurisiveTask(有返回结果);
RecursiveAction(没有返回结果)
3>原理(工作窃取算法)
- ForkJoinPool:线程池
java.util.concurrent.ForkJoinPool,是ExecutorService的一个实现,用于管理工作线程,可以监控关线程池状态和性能的信息。
与ThreadPoolExecutor区别:
ForkJoinPool每个线程都有自己的队列;ThreadPoolExecutor共用一个队列。 - ForkJoinWorkerThread:执行任务时具体的线程
工作窃取算法:每个工作线程从其自己的双端队列中获取任务。如果自己的双端队列中的任务已经执行完毕,双端队列为空,工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务。
fork一个新任务时,线程将任务放入队尾,从自己的双端队列拿任务消费时,从队尾拿数据;如果窃取其它线程的任务,从队头拿数据。
线程空闲后进入休眠状态,如果线程池销毁跟着一并销毁。 - ForkJoinTask:任务
实现了Future接口 - get方法是怎么阻塞并获取所有数据的?
task.join()方法和Thread的join()方法类似,都是通过获取对象锁来进行wait、notifyAll的。
2)实现
需求是:计算1+2+3+4的结果.
package cn.day8;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 阈值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// TODO Auto-generated method stub
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= TH`RES`HOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//创建分治任务线程池,可以追踪到线程名称。线程池一定要作为公共的进行管理,并且要限制初始化的大小和队列的长度,否则会一直吃资源。
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
// 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
11,网站的高并发,大流量访问怎么解决?
1)HTML 页面静态化
访问频率较高但内容变动较小,使用网站HTML 静态化方案来优化访问速度。将社区内的帖子、文章进行实时的静态化,有更新的时候再重新静态化也是大量使用的策略。
优势:
一、减轻服务器负担。
二、加快页面打开速度,静态页面无需访问数据库,打开速度较动态页面有明显提高;
三、很多搜索引擎都会优先收录静态页面,不仅被收录的快,还收录的全,容易被搜索引擎找到;
四、HTML 静态页面不会受程序相关漏洞的影响,减少攻击,提高安全性。
2)图片服务器和应用服务器相分离
现在很多的网站上都会用到大量的图片,而图片是网页传输中占主要的数据量,也是影响网站性能的主要因素。因此很多网站都会将图片存储从网站中分离出来,另外架构一个或多个服务器来存储图片,将图片放到一个虚拟目录中,而网页上的图片都用一个URL 地址来指向这些服务器上的图片的地址,这样的话网站的性能就明显提高了。
优势:
一、分担Web 服务器的I/O 负载-将耗费资源的图片服务分离出来,提高服务器的性能和稳定性。
二、能够专门对图片服务器进行优化-为图片服务设置有针对性的缓存方案,减少带宽成本,提高访问速度。
三、提高网站的可扩展性-通过增加图片服务器,提高图片吞吐能力。
3)缓存
4)镜像
镜像是冗余的一种类型,一个磁盘上的数据在另一个磁盘上存在一个完全相同的副本即为镜像。
5)负载均衡
6)并发控制
加锁,如乐观锁和悲观锁。
如:订票系统,某车次只有一张火车票,假定有1w 个人同时打开12306 网站来订票,如何解决并发问题?(可扩展
到任何高并发网站要考虑的并发读写问题)。
不但要保证1w 个人能同时看到有票(数据的可读性),还要保证最终只能由一个人买到票(数据的排他性)。使用数据库层面的并发访问控制机制。采用乐观锁即可解决此问题。乐观锁意思是不锁定表的情况下,利用业务的控制来解决并发问题,这样既保证数据的并发可读性,又保证保存数据的排他性,保证性能的同时解决了并发带来的脏数据问题。hibernate 中实现乐观锁。
7)消息队列
通过mq 一个一个排队方式,跟12306 一样。
8)一台客户端有三百个客户与三百个客户端有三百个客户对服务器施压,有什么区别?
300个用户在一个客户端上,会占用客户机更多的资源,而影响测试的结果。
线程之间可能发生干扰,而产生一些异常。
300个用户在一个客户端上,需要更大的带宽。
IP地址的问题,可能需要使用IP Spoof来绕过服务器对于单一IP地址最大连接数的限制。
所有用户在一个客户端上,不必考虑分布式管理的问题;而用户分布在不同的客户端上,需要考虑使用控制器来整体调配不同客户机上的用户。同时,还需要给予相应的权限配置和防火墙设置。
12,启动任务
springBoot框架项目,有时候有预加载数据需求——提前加载到缓存中或类的属性中,并且希望执行操作的时间是在容器启动末尾时间执行操作。针对这种场景,SpringBoot提供了两个接口,分别是CommandLineRunner和ApplicationRunner,两个接口除了实现不一样,其它都没什么区别。
依赖:两个接口都在spring-boot的jar包中(spring-boot的jar包依附关系:spring-boot<-spring-boot-starter<-spring-boot-starter-web),项目只需要依赖spring-boot-starter-web的jar便可使用。
1)ApplicationRunner
@FunctionalInterface
public interface ApplicationRunner {
void run(ApplicationArguments args) throws Exception;
}
- spring程序启动后,run方法会被自动调用。
- 执行顺序:
- 场景:初始化操作。
实例化Kafka客户端,异步加载缓存。
1>使用
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
public class StartRegisterSchedule implements ApplicationRunner {
@Autowired
RestTemplate restTemplate;
@Override
public void run(ApplicationArguments args) {
// 获取命令行参数
String[] sourceArgs = args.getSourceArgs();
List<String> nonOptionArgs = args.getNonOptionArgs();
Set<String> optionNames = args.getOptionNames();
}
}
2>问题
- ApplicationRunner没有调用?
可能其他启动任务非异步调用被阻塞了。
2)CommandLineRunner
public interface CommandLineRunner {
void run(String... args) throws Exception;
}
1>使用
实现CommandLineRuner接口,实现run
方法
更多推荐
所有评论(0)