type
status
date
slug
summary
tags
category
icon
password
1、概述
上一篇介绍了 AQS 的原理,JCU 包里面几乎所有的有关锁、多线程并发以及线程同步器等重要组件的实现都是基于 AQS 这个组件。常用的并发组件如下
ReentrantLock
:Lock
接口最常用的实现类,可重入的互斥锁,功能类似于synchronized
,但是会比synchronized
更强大灵活。
ReentrantReadWriteLock
:读写锁,允许多个读线程同时访问,但只允许一个写线程访问,其会阻塞所有的读写线程。适用于读多写少的场景。
CountDownLatch
:让一个或多个线程等待,直到其他线程执行的操作完成后再执行。
CyclicBarrier
:让一组线程等待至某个状态之后再全部同时执行。功能和CountDownLatch
类似,但是CountDownLatch
只能使用一次,而CyclicBarrier
可以重复使用。
Semaphore
:信号量,用于限制同时访问资源的线程数。
Phaser
(简单了解):功能上类似于CyclicBarrier
和CountDownLatch
,适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)
Exchanger
(简单了解):允许两个线程在某个同步点交换数据,类似于SynchronousQueue
的双向形式,适用于管道设计。
2、ReentrantLock
ReentrantLock
是 JUC 提供的一种与 synchronized
关键字作用类似的锁,与 synchronized
一样都支持重入锁。相比起 synchronized
,ReentrantLock
提供以下功能增强:- 等待可中断:
ReentrantLock
中一个线程可以通过interrupt
方法取消另一个线程的锁等待。
- 可实现公平锁:
ReentrantLock
提供了公平锁和非公平锁两种实现,公平锁可以保证线程先来先得。而synchronized
依赖于 CPU 调度,多个线程同时竞争锁,不能保证先到先得的公平性。
- 可绑定多个条件:
ReentrantLock
可以绑定多个Condition
,每个Condition
对应一个等待队列;而synchronized
竞争失败的线程都只会进入该对象锁的 Monitor 的阻塞队列中,也就是没有条件可以选择。
- 可设置竞争锁资源的超时时间:
ReentrantLock
可以设置竞争锁资源的超时时间,而synchronized
只能无限等待。
在介绍
ReentrantLock
之前先介绍下 Lock
接口。2.1 Lock 接口
Lock
接口位于 java.util.concurrent.locks
包下。Lock
和 ReadWriteLock
是两大锁的根接口,Lock
接口支持那些语义不同(重入、公平等)的锁规则,代表实现类是ReentrantLock
。
-
ReadWriteLock
接口定义了读写锁,读锁可以共享,写锁只能独占。此包只提供了一个实现,即ReentrantReadWriteLock
。
Lock
接口对外提供的 api 接口有:lock()
、tryLock()
、tryLock(long time, TimeUnit unit)
和 lockInterruptibly()
都是用来获取锁的。unLock()
方法是用来释放锁的。 newCondition()
返回绑定到此 Lock
的新的 Condition
实例 ,用于线程间的协作。synchronized
和Lock
之间的关联:
synchronized
配合wait
和notify
可以实现线程在条件不满足时等待,条件满足时唤醒。
Lock
接口使用Condition
对象来实现wait
和notify
的功能。
Condition
接口表示与锁有关联的条件变量,一个Lock
可以与多个Condition
对象关联。Condition
可以看做是 Obejct 类的wait()
、notify()
、notifyAll()
方法的替代品,但是提供了更强大的功能。Condition
接口实现是ConditionObject
。因为Condition
的操作需要获取相关联的锁,所以ConditionObject
设置为 AQS 里面的内部类。
2.1.1 lock()
lock()
方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。使用 Lock 必须在 try…catch…
块中进行,并且将释放锁的操作放在 finally
块中进行,以保证锁一定被被释放,防止死锁的发生。2.1.2 tryLock()
& tryLock(long time, TimeUnit unit)
tryLock()
方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回 true;如果获取失败(即锁已被其他线程获取),则返回 false,也就是说,这个方法无论如何都会立即返回,拿不到锁时不会一直在那等待。tryLock(long time, TimeUnit unit)
方法和 tryLock()
方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回 false,同时可以响应中断。如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。2.1.3 lockInterruptibly()
lockInterruptibly()
方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。例如,当两个线程同时通过 lock.lockInterruptibly()
想获取某个锁时,假若此时线程 A 获取到了锁,而线程 B 开始等待,那么对线程 B 调用 threadB.interrupt()
方法能够中断线程 B 的等待过程。由于
lockInterruptibly()
的声明中抛出了异常,所以 lock.lockInterruptibly()
必须放在 try
块中或者在调用 lockInterruptibly()
的方法外声明抛出 InterruptedException
,但推荐使用后者。2.1.4 newCondition()
newCondition()
必须搭配 lock 一起使用,注意:- 调用 condition 的
await()
或者signal()
方法之前必须先获取锁。
- 如果 lock 绑定了多个 condition,已经获取到 lock 的线程在任意一个 condition 上执行
await()
方法都会立刻退出放弃 lock。
- 被唤醒线程从
await()
返回后不是立刻获取到 lock,而是加入竞争锁的同步队列,和其他线程一起竞争锁。
2.2 ReentrantLock使用
2.2.1 锁重入
结果打印如下:
2.2.2 可中断
可中断的特性就是为了避免线程“死等”的问题,让线程的等待受开发者控制。
因为这个中断线程等待的操作是需要其他线程来调用
interrput
方法,所以这种特性是一种被动的防止死锁的方式,开发者要考虑在什么情况下,何时来打断等待的线程,所以这种方式在使用上会有一些额外的考虑因素影响实际的代码编写。结果打印如下:
2.2.3 绑定多条件
结果打印如下
2.3 ReentrantLock原理
ReentrantLock
的实现原理是:ReentrantLock
实现了 Lock 接口。
ReentrantLock
中有一个Sync
类型类型的成员变量sync
,Sync
是一个抽象类继承于AQS
。
Sync
有两个子类NonfairSync
和FairSync
,分别对应非公平锁和公平锁。
ReentrantLock
中的AQS
使用state
表示占有线程对锁的持有数量,为 0 表示锁未被持有,为 1 表示锁被某个线程持有,> 1 表示锁被某个线程持有多次(即重入)。
这里公平锁和非公平锁的区别是:
- 非公平锁:当前线程释放锁之后(时间片用完或者被中断退出),由于当前线程的业务逻辑没有完成,当前线程继续参与下一轮锁竞争的时候极有可能会继续获取锁成功(因为可以减少线程切换开销,线程调度器更倾向于当前线程),并没有遵循 CLH 队列先到先得得原则,所以才叫非公平锁,容易会造成其他线程饥饿。
- 公平锁:竞争锁的时候按照先到先得得原则,FairSync 需要检查当前线程是否是 CLH 队列的第一个,如果不是的话就获取锁失败,然后把当前线程线程放到 CLH 队列的队尾。
非公平锁的性能高于公平锁,因为节省很多线程切换时间,吞吐量自然就上去了。公平锁适用于对性能要求不高,追求公平的场景。
ReentrantLock
默认使用非公平锁。公平锁和非公平锁在代码上的实现上逻辑大体一致,只有下面地方有差异。
在获取锁的时候,公平锁比非公平锁多了
!hasQueuedPredecessors()
一个判断,hasQueuedPredecessors()
这个方法在当前线程是同步队列的第一个元素或者同步队列为空的时候才会返回 false,!hasQueuedPredecessors()
才为 true,才会进行后面的逻辑。Sync 保证了没有线程等待时间超过当前线程,保证了 FIFO 先进先出的公平性。详细可参考
3、ReentrantReadWriteLock
有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
ReentrantReadWriteLock
读写锁就是针对这种场景设计的。ReentrantReadWriteLock
包含读锁和写锁两把锁,可以保证读写、写写互斥,而读读却是可以并发执行的。在使用中,我们只需根据需要操作读锁或写锁就可以了。3.1 ReentrantReadWriteLock使用
结果打印如下,可以看到写线程会阻塞两个读线程,但两个读线程互相不会阻塞,可以同时读到数据。
3.2 ReentrantReadWriteLock原理
ReentrantReadWriteLock
的内部结构如上所示:ReentrantReadWriteLock
实现了ReadWriteLock
接口,需要实现readLock()
和writeLock()
两个方法。不同于一般的Lock
接口实现类只需要实现一个锁,ReadWriteLock
接口实现类需要实现读锁和写锁两个锁。
ReentrantReadWriteLock
的内部类Sync
继承了AQS
,可以实现重入的功能。Sync
还有两个子类NonfairSync
和FairSync
分别实现了非公平锁和公平锁的功能,以上和 ReentrantLock 的实现思路相同。
ReentrantReadWriteLock
本身并没有实现Lock
接口,所以ReentrantReadWriteLock
本身并没有提供lock()/tryLock()
这些方法,但ReentrantReadWriteLock
提供一个全局的Sync
类的属性sync
。ReentrantReadWriteLock
的内部类ReadLock
和WriteLock
实现了Lock
接口,内部使用了同一个属性sync
,所以两者的实现方法lock()/tryLock()
都基于同一个属性sync
,也就是基于同一个sync
内部的state
状态。
ReadLock
调用了sync
的acquireShared/releaseShared
方法,实现了共享锁功能;而WriteLock
调用了sync
的acquire/release
方法,实现了互斥锁功能。
ReentrantReadWriteLock
本身也提供了公平锁和非公平锁,原理类似于ReentrantLock
。
看完上面你可能会好奇,一个 int 类型的 state 变量怎么可以同时被两个锁使用呢?
原理是:int 类型的 state 变量的长度是 32 位,将 state 变量分割成高 16 位和低 16 位,分别表示读状态和写状态。
假设当前同步状态值为 S:
- 获取读锁状态:S >> 16,右移16位,相当于将低 16 位全部抹去
- 读锁状态加1:S + (1 << 16)
- 获取写锁状态:S & 0x0000FFFF,将高 16 位全部抹去。
WriteLock
提供了一个EXCLUSIVE_MASK
变量进行&运算,值等于 0x0000FFFF。
- 写锁状态加1:S + 1
WriteLock
可以降级为 ReadLock
,顺序是:先获得 WriteLock
再获得 ReadLock
,然后释放 WriteLock
,这时候线程将保持 Readlock
的持有,相当于 WriteLock
降级为 Readlock
。4、CountDownLatch
CountDownLatch
的作用是允许一个或多个线程等待,一直到其他线程执行的操作完成后再执行。使用过程一般是:CountDownLatch
的构造函数接收一个 int 型参数 N 作为计数器。
- 需要等待的线程会调用
CountDownLatch
的await()
方法,该线程就会阻塞直到计数器的值减为 0。
- 达到自己预期的线程会调用
CountDownLatch
的countDown()
方法,计数器值减 1。
- 计数器 N 既可以代表 N 个线程,也可以是一个线程的 N 个执行步骤。特别的,当 N 的值为 1 时,退化为单一事件,即由一个线程来通知其他线程,效果等同于对象的
wait
和notifyAll
。
4.1 CountDownLatch使用
第一个场景:让多个并发线程等待到同一个时间点一起执行,这种用法类似于
wait
和 notifyAll
的效果,用于模拟并发的场景。实现过程如下:- 创建 CountDownLatch 并设置计数器值,值为 1。
- 启动多线程,每个线程调用
await()
方法,子线程会在这里阻塞住。
- 主线程调用
countDown()
方法,这样所有线程就可以一起往下执行。
结果打印如下:
第二个场景:让单个线程等待多个线程完成后再向下执行,这种用法类似于
join
的效果,用于汇总合并的场景。实现过程:- 创建
CountDownLatch
并设置计数器值,值和线程数相同。
- 启动多线程,每个线程在执行完任务之后调用
countDown()
方法。
- 主线程调用
await()
方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,count
值为0,主线程继续执行。
结果打印如下:
4.2 CountDownLatch原理
CountDownLatch
的内部类 Sync 实现了 AQS 的共享模式,所以实现了 tryAcquireShared/tryReleaseShared
方法。然后
CountDownLatch
的实现方法 await()/countDown()
再分别调用 sync
里面的方法:因为初始化
state
值是大于 0 的整数,线程拿到锁后要把 state
的值减 1。这是和互斥锁最大的区别:互斥锁在获取到锁会把 state
的值加 1。countDown()
是执行任务后达到预期的线程调用的,用于将state
值减 1,这是一个释放锁的操作,所以调用了sync
的tryReleaseShared()
方法。
await()
是被阻塞线程调用的,用于等待state
值变成 0。这是一个获取锁的操作,所以调用了sync
的tryAcquireShared()
方法。
5、CyclicBarrier
上面的
CountDownLatch
虽然可以实现多个线程等待一个执行条件满足后再继续往下执行,但是他在某些场景下不能很好的适用:CountDownLatch
只能使用一次,如果需要多次使用只能创建多个CountDownLatch
实例。
CountDownLatch
需要显式使用countDown()
去唤醒被阻塞的线程。
CyclicBarrier
的功能类似于 CountDownLatch
,他的作用是所有的线程必须到齐后才能一起通过这个CyclicBarrier
。功能上比 CountDownLatch
更强大一点:- 每个线程在到达栅栏的时候都会调用
await()
方法将自己阻塞,当阻塞的线程数达到了CyclicBarrier
初始的数量时,所有进入等待状态的线程被唤醒并继续。
CyclicBarrier
会自动重置屏障,也就是说CyclicBarrier
可以多次使用。
CyclicBarrier API 列表
方法 | 作用 |
await() | 阻塞当前线程,直到阻塞线程数达到 CountDownLatch 的初始值 |
await(long timeout, TimeUnit unit) | 同 await() ,增加超时时间 |
getNumberWaiting() | 返回当前在屏障处等待的参与者数目 |
getParties() | 返回要求启动此 barrier 的参与者数目 |
isBroken() | 查询此屏障是否处于损坏状态 |
void | 将屏障重置为其初始状态 |
5.1 CyclicBarrier使用
CyclicBarrier
初始值为 3, 表示3个线程到达屏障后,一起向下执行
- 3个线程都到达屏障后,屏障会自动重置,可以重复使用
控制台输出:
5.2 CyclicBarrier原理
CyclicBarrier 的实现并没有使用内部类继承 AQS,而是直接使用
ReentrantLock + Condition
来实现整个同步逻辑。CyclicBarrier 的主要属性:Generation
是 CyclicBarrier 的内部类,表示代的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,Generation
就会被自动重置。下一代开始,依然需要相同数目的线程到达栅栏才会释放。CyclicBarrier的工作流程:
- 假如初始时
count = parties = 5
,当第一个线程到达栅栏处,count减1,然后把它加入到 Condition 的等待队列中,第二个线程到达栅栏处也是如此,如此重复。
- 最后一个线程到达栅栏处,count 减为 0,调用 Condition 的
signalAll()
唤醒其他所有线程,所有线程从 Condition 的等待队列被加入 AQS 的同步队列。
- 等待当前线程运行完毕,调用
lock.unlock()
的时候依次从 AQS 的等待队列中唤醒一个线程继续运行,也就是说实际上线程先依次(排队)到达栅栏处,再依次往下运行。
CyclicBarrier
和CountDownLatch
的区别:
- CyclicBarrier 是最后一个线程到达时自动唤醒;CountDownLatch 需要通过显式地调用 countDown() 来唤醒。
- CyclicBarrier 基于
ReentrantLock + Condition
来实现;CountDownLatch 基于 AQS 来实现。
- CyclicBarrier 具有「代」的概念,可以重复使用;CountDownLatch 只能使用一次。
- CyclicBarrier 只能实现多个线程到达栅栏处一起运行,但是可以携带一个在栅栏处执行的任务;CountDownLatch 不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立
6、Semaphore
Semaphore
通常被称为信号量,作用是限制指定数量的线程线程同时访问一个资源,可以用于流量控制和资源控制。比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
6.1 Semaphore使用
6.2 Semaphore原理
Semaphore 内部有 3 个类,继承了 AQS。一个公平锁,一个非公平锁,这点和 ReentrantLock 一摸一样。
Semaphore 有两个重要的方法,
acquire
用于获取令牌,release
用于释放令牌。如果你看过 ReentrantLock 的源码,Semaphore 的源码还是非常简单的。
7、Phaser
Phaser(移相器,一种电子元件)是JDK7中引入的新的并发工具辅助类,oralce 官网文档描述 Phaser 是一个可重复使用的同步栅栏,功能上与 CountDownLatch 和 CyclicBarrier 类似但支持的场景更加灵活,这个类可能是目前并发包里面实现最复杂的一个了。
Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变。
Phaser API 列表
方法 | 作用 |
Phaser() | 默认的构造方法,初始化注册的线程数量为0 |
Phaser(int parties) | 一个指定线程数量的构造方法 |
register() | 添加一个新的注册者 |
bulkRegister(int parties) | 添加指定数量的多个注册者 |
arrive() | 到达栅栏点直接执行,无须等待其他的线程 |
arriveAndAwaitAdvance() | 到达栅栏点,必须等待其他所有注册者到达 |
arriveAndDeregister() | 到达栅栏点,注销自己无须等待其他的注册者到达 |
onAdvance(int phase, int registeredParties) | 多个线程达到注册点之后,会调用该方法。 |
7.1 Phaser使用
一个简单的替代
CountDownLatch
实现一次性的共享锁例子- 主线程先向
Phaser
注册自己,然后调用了arriveAndDeregister()
方法注销了自己,主线程无需等待其他线程到达栅栏,即可向下执行。
- 起 5 个线程,每个线程到达栅栏的时间不一致,线程到达栅栏后,先调用
arriveAndAwaitAdvance()
阻塞当前线程直到所有的线程都到达栅栏。等 5 个线程同时到达栅栏处后,同时向下执行。
控制台输出:
7.2 Phaser原理
Phaser 并没有采用 AQS 同步框架实现,而是单独定义了相关功能api,其中state采用64位的long类型表示,然后64bit又分成4个定义分别代表没有到达栅栏的数量(0-15bit),注册的数量(16-31bit),栅栏的代数量(32-62bit),最后一位(63bit)代表当前的Phaser是否是终止状态,这也意味着我们能够注册的最大数量不能超过65535,否则会抛出不合法参数异常,这一点在使用时需要注意。
8、Exchanger
Exchanger
可以在两个线程之间交换数据(注意只能是2个线程,不支持更多的线程之间互换数据)。Exchanger
提供一个同步点,在这个同步点两个线程建议交换彼此的数据。这两个线程通过 exchange 方法交换数据:第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,等两个线程都到达同步点,这两个线程就可以交换数据。Exchanger
可以用于校对工作,比如我们需要将纸质银行流水人工录入成电子银行流水,为了避免错误,采用 AB 岗两人录入,录入完再对比是否一致。8.1 Exchanger使用
控制台输出
- Author:mcbilla
- URL:http://mcbilla.com/article/c3df6ef6-3b10-4623-94fc-2823e3f7ba9b
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts