JAVA 并发编程
1. 锁
1.1 锁的类别
公平锁与非公平锁
公平锁:线程严格申请锁的顺序争抢锁,这些线程会被加入一个等待队列中
非公平锁:线程并非严格按照先到先得的规则来争抢锁,即,有可能晚到的线程先拿到锁。
ReentrantLock
默认是非公平锁。synchronized
是非公平锁。可重入锁(递归锁)
这里要提到一点:如果一个对象存在多个
synchronized
修饰的方法,当一个线程访问其中一个同步方法时,其他线程访问不了这个对象的其他同步方法。根据synchronized
的原理很好理解。 可重入锁,某个对象存在多个同步方法,当线程已经获得锁的情况下,该线程可以继续访问其他同步方法,而不必等待,这样做是为了避免死锁。
ReentrantLock
和synchronized
都是可重入锁。自旋锁(spin lock)
如果锁被其他线程占有,这个请求锁的线程便会被加入等待队列,此时
CPU
会继续调度其他线程,由于频繁切换线程的开销比较大,而且争抢锁不是很频繁。自旋锁就是当没有获取到锁,CPU
不会挂起该线程,而是一直轮询(仍然占有cpu
),直到该线程争抢到锁。这样做减轻了频繁切换线程,但是轮询增加了CPU
负担。读写锁
相对于独占锁,每个锁只能被一个线程占有,读写锁对于读请求,读锁可以由多个线程共享;写操作,写锁,只允许一个线程独占,以此来提高并发性。即读读可以并存,读写、写写不可以共存。 能保证读写、写读和写写的过程是互斥,读读的时候是共享的。
java.util.concurrent
包中ReentrantReadWriteLock
就是读写锁。1
2
3
4
5
6
7
8
9//读的时候加锁,共享锁
lock.readLock().lock();
...
lock.readLock().unlock();
//写的时候加锁,独占锁
lock.writeLock().lock();
...
lock.writeLock().unlock();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59public class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteLock writeLock = lock.writeLock();
ReadLock readLock = lock.readLock();
public void put(String key, Object value) {
try {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + " 正在写入...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成,写入结果是 " + value);
} finally {
writeLock.unlock();
}
}
public void get(String key) {
try {
readLock.lock();
System.out.println(Thread.currentThread().getName() + " 正在读...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object res = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成,读取结果是 " + res);
} finally {
readLock.unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache cache = new MyCache();
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(() -> {
cache.put(temp + "", temp + "");
}).start();
}
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(() -> {
cache.get(temp + "");
}).start();
}
}
}
死锁的四个必要条件
互斥:资源在某一时刻只允许被一个线程所访问
不可抢占:进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。
循环等待:
占有且等待:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
本质原因:系统资源有限、进程推进顺不当。
1.2 synchronized 的原理
普通方法:锁是当前实例对象
静态方法:所示当前实例对象的Class对象
方法块:括号内的对象
monitor机制
每一个线程都有一个可用的mointor record
列表,每一个被锁住的对象都会和一个monitor record
关联(存放在对象头信息中)。当一个对象的monitor
被持有后,该对象即处于锁定状态。
同步代码块开始之前,monitor enter
指令插入,线程获取monitor
所有权,方法块结束,monitor exit
指令插入,释放锁。
Java对象在内存中的结构
1.3 volatile 关键字
Java Memory Model
在Java内存模型中,每个线程执行时,都会拷贝主内存中数据到自己的栈内存(工作内存)中,当程序结束后,再把数据写回主内存。由此,当多个线程共同访问这个值,各个线程是相互独立的,其中一个修改,其他的并不知情,当多个线程协作修改数据,就会造成数据不一致。
volatile
的作用就是,被其修饰的变量,一旦值发生改变,其他线程就会放弃自己栈内存中的值,重新向内存中取值,从而保证了数据的内存可见性。
特性:禁止指令重排、不保证原子性、内存可见性
1.4 synchronized 和 Lock 有什么区别?
- 原始结构
- synchronized 是关键字属于 JVM 层面,反应在字节码上是
monitorenter
和monitorexit
,其底层是通过monito
r 对象来完成,其实 wait/notify 等方法也是依赖 monitor 对象只有在同步快或方法中才能调用wait/notify
等方法。 - Lock 是具体类(
java.util.concurrent.locks.Lock
)是 api 层面的锁。
- synchronized 是关键字属于 JVM 层面,反应在字节码上是
- 使用方法
synchronized
不需要用户手动去释放锁,当synchronized
代码执行完后系统会自动让线程释放对锁的占用。ReentrantLock
则需要用户手动的释放锁,若没有主动释放锁,可能导致出现死锁的现象,lock() 和 unlock() 方法需要配合 try/finally 语句来完成。
- 等待是否可中断
synchronized
不可中断,除非抛出异常或者正常运行完成。ReentrantLock
可中断,设置超时方法tryLock(long timeout, TimeUnit unit),lockInterruptibly()
放代码块中,调用 interrupt() 方法可中断。
- 加锁是否公平
synchronized
非公平锁ReentrantLock
默认非公平锁,构造方法中可以传入 boolean 值,true 为公平锁,false 为非公平锁。
- 锁可以绑定多个 Condition
synchronized
没有Condition
。ReentrantLock
用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像synchronized
要么随机唤醒一个线程要么唤醒全部线程。
2. 线程池
为什么要用线程池?答:它预先创建好一部分线程,使用完后放回池中,避免了创建与销毁线程的昂贵开销,使得性能大大提升 。
主要特点为:
- 线程复用
- 控制最大并发数量
- 管理线程
2.1 线程的状态
在Java中,线程共有以下几种状态
创建。
Java中有三种方法创建线程。
(1). 继承
Thread
类。重写run
方法1
2
3
4
5
6class Mythreas extends Thread{
public void run(){
...
}
}(2). 实现
Runnable
接口中的run
方法。1
2
3
4
5
6class Mythread2 implements Runnable{
public void run(){
...
}
}(3). 实现
Callable
接口中的call
方法,注意,该方法是有返回值的,并且含有泛型。执行Callable
方式,需要FutureTask
实现类的支持,用于接受运算结果。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38class ThreadDemo implements Callable<Integer> {
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += 1;
}
return sum;
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
Mythread t1 = new Mythread();
//1.执行Callable方式,需要FutureTask实现类的支持,用于接受运算结果。
FutureTask<Integer> result = new FutureTask<Integer>(t1);
new Thread(result).start();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy());
// 通过线程池提交,Future类接受运算结果
Future<Integer> submit = poolExecutor.submit(t1);
Integer sum = null;//FutureTask也可用闭锁的操作
try {
sum = result.get();
Integer res = submit.get();
System.out.println(res);
System.out.println(sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}就绪。线程已经
start
,只是没有得到CPU
时间片。执行yield
方法。运行。线程得到时间片执行。
阻塞。线程执行
sleep/join
方法,或wait
方法。注意执行两个方法线程状态的区别,sleep
方法不会释放所持有的锁,时间结束后转到就绪
态;wait
方法会释放持有的锁,线程进入等待队列,直到有线程执行notify/notifyAll
方法,该线程被唤醒,进入锁池中,争抢锁。拿到锁就可以转为就绪态。销毁
2.2 线程池参数详解
首先给出java
中线程池的构造函数(其中之一),有几个参数,然后解释他的工作流程。
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
核心线程数maximumPoolSize 最大线程数
keepAliveTime
保活时间TimeUnit unit
时间单位BlockingQueue<Runnable> workQueue
RejectedExecutionHandler handler
拒绝策略ThreadPoolExecutor.AbortPolicy
: 丢弃任务并抛出RejectedExecutionException
异常。 (默认)ThreadPoolExecutor.DiscardPolicy
:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务工作流程
- 当有任务到达时,如果已经创建的线程数小于
corePoolSize
,那么创建一个线程来执行这个任务。 - 如果已经创建的线程数等于
corePoolSize
,并且存在空闲的线程,那么空闲的线程来执行这个任务。 - 如果没有空闲线程,那么那这个任务加入到阻塞队列中。
- 如果阻塞队列也满了,判断
corePoolSize < maximumPoolSize
,比如maximumPoolSize = 5,corePoolSize = 3
,那么创建一个线程来执行这个任务。否则,执行拒绝策略,或者存活线程数目达到最大线程数目,也会执行拒绝策略。(即:当最大线程数与队列均满了以后,才会执行拒绝策略。 ) - 当大于
corePoolSize
并且空闲的线程(余下线程),在超过keepAliveTime
后,会被回收。
- 当有任务到达时,如果已经创建的线程数小于
tips:阿里开发规范中强调:【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
2.3 阻塞队列
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
ArrayBlockingQueue
:一个由数组结构组成的有界阻塞队列。LinkedBlockingQueue
:一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue
:一个支持优先级排序的无界阻塞队列。DelayQueue
: 一个使用优先级队列实现的无界阻塞队列。SynchronousQueue
: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态 。LinkedTransferQueue
: 一个由链表结构组成的无界阻塞队列。LinkedBlockingDeque
: 一个由链表结构组成的双向阻塞队列。
- 核心方法API
抛出异常 | 当阻塞队列满时,再往队列里面add插入元素会抛IllegalStateException: Queue full 当阻塞队列空时,再往队列Remove元素时候回抛出 NoSuchElementException |
特殊值 | 插入方法,成功返回true 失败返回false 移除方法,成功返回元素,队列里面没有就返回 null |
一直阻塞 | 当阻塞队列满时,生产者继续往队列里面put 元素,队列会一直阻塞直到put数据or响应中断退出当阻塞队列空时,消费者试图从队列 take 元素,队列会一直阻塞消费者线程直到队列可用. |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程就会退出 |
2.4 线程池的种类
Executors
类在创建线程池时,底层还是采用ThreadPoolExecutor
的构造方法。
Executors.newFixedThreadPool
定长线程池1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
作用:创建一个可重用固定线程数量的线程池,以共享的无界队列方式来运行这些线程。
特征:
(1)线程池中的线程处于一定的量,可以很好的控制线程的并发量
(2)线程可以重复被使用,在显示关闭之前,都将一直存在
(3)超出一定量的线程被提交时候需在队列中等待
newCachedThreadPool
缓存线程池1
2
3
4
5
6
7
8public class Executors {
....
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}根据需要创建线程,可以看到,
corePoolSize = 0 maximumPoolSize = Integer.MAX_VALUE
即没有核心线程,当60s内没有任务时,将会回收存活的线程,60s内有任务时,他可以重用已有的线程 。就是来一个任务创建一个线程,最多创建21亿个线程。newScheduledThreadPool
定时线程池1
2
3
4
5
6
7
8public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}该线程池适合执行延时任务。
newSingleThreadExecutor
只有一个线程的线程池1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
总结:实际开发用哪个?哪个都不用,上面的阿里规范提到,用ThreadPoolExecutor
来初始化一个线程池,参数自己指定。
3. Compare and Swap
3.1 原理
compare and swap
简称CAS
,本质上一条原子指令,即要么执行,要么不执行,利用CPU
底层来实现并发过程中数据不一致问题。java.util.concurrent.atmoic.AtomicInteger
等类底层实现就采用了CAS
,来解决并发过程中i++
导致的数据不一致问题。实际上i++
在被虚拟机编译后,并非一条语句,而被划分为三条指令,因此在多线程环境下会造成数据不一致。
3.2 带来的问题
典型的ABA
问题。即CAS
关注了结果正确,而忽视了过程是否正确。
解决方案:原子引用。类似于MySQL
中的时间戳。
4. 线程不安全类
4.1 ArrayList
1 | public class ContainerDemo { |
会报: java.util.ConcurrentModificationException
- 解决方案
new Vector();
Collections.synchronizedList(new ArrayList<>());
new CopyOnWriteArrayList<>();
- 优化建议
- 在读多写少的时候推荐使用
CopeOnWriteArrayList
这个类。底层采用了ReentrantLock
,读不加锁,set/add
加锁处理。
- 在读多写少的时候推荐使用
1 | public class ContainerDemo2 { |