type
status
date
slug
summary
tags
category
icon
password

1、概述

上一篇介绍了 AQS 的原理,JCU 包里面几乎所有的有关锁、多线程并发以及线程同步器等重要组件的实现都是基于 AQS 这个组件。常用的并发组件如下
  • ReentrantLockLock 接口最常用的实现类,可重入的互斥锁,功能类似于 synchronized,但是会比 synchronized 更强大灵活。
  • ReentrantReadWriteLock :读写锁,允许多个读线程同时访问,但只允许一个写线程访问,其会阻塞所有的读写线程。适用于读多写少的场景。
  • CountDownLatch :让一个或多个线程等待,直到其他线程执行的操作完成后再执行。
  • CyclicBarrier :让一组线程等待至某个状态之后再全部同时执行。功能和 CountDownLatch 类似,但是CountDownLatch只能使用一次,而CyclicBarrier可以重复使用。
  • Semaphore:信号量,用于限制同时访问资源的线程数。
  • Phaser(简单了解):功能上类似于 CyclicBarrierCountDownLatch ,适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)
  • Exchanger(简单了解):允许两个线程在某个同步点交换数据,类似于 SynchronousQueue的双向形式,适用于管道设计。

2、ReentrantLock

ReentrantLock 是 JUC 提供的一种与 synchronized 关键字作用类似的锁,与 synchronized一样都支持重入锁。相比起 synchronizedReentrantLock 提供以下功能增强:
  • 等待可中断ReentrantLock 中一个线程可以通过 interrupt 方法取消另一个线程的锁等待。
  • 可实现公平锁ReentrantLock 提供了公平锁和非公平锁两种实现,公平锁可以保证线程先来先得。而 synchronized 依赖于 CPU 调度,多个线程同时竞争锁,不能保证先到先得的公平性。
  • 可绑定多个条件ReentrantLock 可以绑定多个 Condition ,每个 Condition 对应一个等待队列;而 synchronized竞争失败的线程都只会进入该对象锁的 Monitor 的阻塞队列中,也就是没有条件可以选择。
  • 可设置竞争锁资源的超时时间ReentrantLock 可以设置竞争锁资源的超时时间,而 synchronized 只能无限等待。
在介绍 ReentrantLock 之前先介绍下 Lock 接口。

2.1 Lock 接口

notion image
Lock 接口位于 java.util.concurrent.locks 包下。LockReadWriteLock 是两大锁的根接口,
  • Lock 接口支持那些语义不同(重入、公平等)的锁规则,代表实现类是ReentrantLock
  • ReadWriteLock 接口定义了读写锁,读锁可以共享,写锁只能独占。此包只提供了一个实现,即 ReentrantReadWriteLock
Lock 接口对外提供的 api 接口有:
lock()tryLock()tryLock(long time, TimeUnit unit)lockInterruptibly() 都是用来获取锁的。unLock() 方法是用来释放锁的。 newCondition() 返回绑定到此 Lock 的新的 Condition 实例 ,用于线程间的协作。
synchronizedLock 之间的关联:
  • synchronized配合 waitnotify 可以实现线程在条件不满足时等待,条件满足时唤醒。
  • Lock 接口使用Condition 对象来实现 waitnotify 的功能。
    • Condition 接口表示与锁有关联的条件变量,一个 Lock 可以与多个 Condition 对象关联。Condition 可以看做是 Obejct 类的 wait()notify()notifyAll() 方法的替代品,但是提供了更强大的功能。
    • Condition 接口实现是 ConditionObject。因为Condition 的操作需要获取相关联的锁,所以ConditionObject 设置为 AQS 里面的内部类。

2.1.1 lock()

lock() 方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。使用 Lock 必须在 try…catch…块中进行,并且将释放锁的操作放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生。

2.1.2 tryLock() & tryLock(long time, TimeUnit unit)

tryLock() 方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回 true;如果获取失败(即锁已被其他线程获取),则返回 false,也就是说,这个方法无论如何都会立即返回,拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit) 方法和 tryLock() 方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回 false,同时可以响应中断。如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

2.1.3 lockInterruptibly()

lockInterruptibly() 方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。例如,当两个线程同时通过 lock.lockInterruptibly() 想获取某个锁时,假若此时线程 A 获取到了锁,而线程 B 开始等待,那么对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。
由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException,但推荐使用后者。

2.1.4 newCondition()

newCondition() 必须搭配 lock 一起使用,注意:
  • 调用 condition 的 await() 或者 signal() 方法之前必须先获取锁。
  • 如果 lock 绑定了多个 condition,已经获取到 lock 的线程在任意一个 condition 上执行 await() 方法都会立刻退出放弃 lock。
  • 被唤醒线程从 await()返回后不是立刻获取到 lock,而是加入竞争锁的同步队列,和其他线程一起竞争锁。

2.2 ReentrantLock使用

2.2.1 锁重入

结果打印如下:

2.2.2 可中断

可中断的特性就是为了避免线程“死等”的问题,让线程的等待受开发者控制。
因为这个中断线程等待的操作是需要其他线程来调用 interrput方法,所以这种特性是一种被动的防止死锁的方式,开发者要考虑在什么情况下,何时来打断等待的线程,所以这种方式在使用上会有一些额外的考虑因素影响实际的代码编写。
结果打印如下:

2.2.3 绑定多条件

结果打印如下

2.3 ReentrantLock原理

notion image
ReentrantLock 的实现原理是:
  • ReentrantLock 实现了 Lock 接口。
  • ReentrantLock 中有一个 Sync 类型类型的成员变量 syncSync 是一个抽象类继承于 AQS
  • Sync 有两个子类 NonfairSync 和 FairSync,分别对应非公平锁和公平锁。
  • ReentrantLock 中的 AQS 使用 state 表示占有线程对锁的持有数量,为 0 表示锁未被持有,为 1 表示锁被某个线程持有,> 1 表示锁被某个线程持有多次(即重入)。
这里公平锁和非公平锁的区别是:
  • 非公平锁:当前线程释放锁之后(时间片用完或者被中断退出),由于当前线程的业务逻辑没有完成,当前线程继续参与下一轮锁竞争的时候极有可能会继续获取锁成功(因为可以减少线程切换开销,线程调度器更倾向于当前线程),并没有遵循 CLH 队列先到先得得原则,所以才叫非公平锁,容易会造成其他线程饥饿。
  • 公平锁:竞争锁的时候按照先到先得得原则,FairSync 需要检查当前线程是否是 CLH 队列的第一个,如果不是的话就获取锁失败,然后把当前线程线程放到 CLH 队列的队尾。
非公平锁的性能高于公平锁,因为节省很多线程切换时间,吞吐量自然就上去了。公平锁适用于对性能要求不高,追求公平的场景。ReentrantLock 默认使用非公平锁。
公平锁和非公平锁在代码上的实现上逻辑大体一致,只有下面地方有差异。
notion image
在获取锁的时候,公平锁比非公平锁多了 !hasQueuedPredecessors() 一个判断,hasQueuedPredecessors() 这个方法在当前线程是同步队列的第一个元素或者同步队列为空的时候才会返回 false,!hasQueuedPredecessors() 才为 true,才会进行后面的逻辑。Sync 保证了没有线程等待时间超过当前线程,保证了 FIFO 先进先出的公平性。
详细可参考

3、ReentrantReadWriteLock

有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
ReentrantReadWriteLock 读写锁就是针对这种场景设计的。ReentrantReadWriteLock 包含读锁和写锁两把锁,可以保证读写、写写互斥,而读读却是可以并发执行的。在使用中,我们只需根据需要操作读锁或写锁就可以了。

3.1 ReentrantReadWriteLock使用

结果打印如下,可以看到写线程会阻塞两个读线程,但两个读线程互相不会阻塞,可以同时读到数据。

3.2 ReentrantReadWriteLock原理

notion image
ReentrantReadWriteLock 的内部结构如上所示:
  • ReentrantReadWriteLock 实现了 ReadWriteLock 接口,需要实现 readLock() 和 writeLock() 两个方法。不同于一般的 Lock 接口实现类只需要实现一个锁,ReadWriteLock 接口实现类需要实现读锁和写锁两个锁。
  • ReentrantReadWriteLock 的内部类 Sync 继承了 AQS,可以实现重入的功能。Sync 还有两个子类 NonfairSync 和 FairSync 分别实现了非公平锁和公平锁的功能,以上和 ReentrantLock 的实现思路相同。
  • ReentrantReadWriteLock 本身并没有实现 Lock 接口,所以 ReentrantReadWriteLock 本身并没有提供 lock()/tryLock() 这些方法,但 ReentrantReadWriteLock 提供一个全局的 Sync 类的属性 syncReentrantReadWriteLock 的内部类 ReadLock 和 WriteLock 实现了 Lock 接口,内部使用了同一个属性 sync,所以两者的实现方法 lock()/tryLock() 都基于同一个属性 sync,也就是基于同一个 sync 内部的 state 状态。
  • ReadLock 调用了 sync 的 acquireShared/releaseShared 方法,实现了共享锁功能;而 WriteLock 调用了 sync 的 acquire/release 方法,实现了互斥锁功能。
  • ReentrantReadWriteLock 本身也提供了公平锁和非公平锁,原理类似于 ReentrantLock
    看完上面你可能会好奇,一个 int 类型的 state 变量怎么可以同时被两个锁使用呢?
    原理是:int 类型的 state 变量的长度是 32 位,将 state 变量分割成高 16 位和低 16 位,分别表示读状态和写状态
    notion image
    假设当前同步状态值为 S:
    • 获取读锁状态:S >> 16,右移16位,相当于将低 16 位全部抹去
    • 读锁状态加1:S + (1 << 16)
    • 获取写锁状态:S & 0x0000FFFF,将高 16 位全部抹去。WriteLock 提供了一个 EXCLUSIVE_MASK 变量进行&运算,值等于 0x0000FFFF。
    • 写锁状态加1:S + 1
    WriteLock 可以降级为 ReadLock,顺序是:先获得 WriteLock 再获得 ReadLock,然后释放 WriteLock,这时候线程将保持 Readlock 的持有,相当于 WriteLock 降级为 Readlock

    4、CountDownLatch

    CountDownLatch 的作用是允许一个或多个线程等待,一直到其他线程执行的操作完成后再执行。使用过程一般是:
    • CountDownLatch 的构造函数接收一个 int 型参数 N 作为计数器。
    • 需要等待的线程会调用 CountDownLatchawait() 方法,该线程就会阻塞直到计数器的值减为 0。
    • 达到自己预期的线程会调用 CountDownLatchcountDown()方法,计数器值减 1。
    • 计数器 N 既可以代表 N 个线程,也可以是一个线程的 N 个执行步骤。特别的,当 N 的值为 1 时,退化为单一事件,即由一个线程来通知其他线程,效果等同于对象的 waitnotifyAll

    4.1 CountDownLatch使用

    第一个场景:让多个并发线程等待到同一个时间点一起执行,这种用法类似于 waitnotifyAll 的效果,用于模拟并发的场景。实现过程如下:
    1. 创建 CountDownLatch 并设置计数器值,值为 1。
    1. 启动多线程,每个线程调用 await() 方法,子线程会在这里阻塞住。
    1. 主线程调用 countDown() 方法,这样所有线程就可以一起往下执行。
    结果打印如下:
    第二个场景:让单个线程等待多个线程完成后再向下执行,这种用法类似于 join 的效果,用于汇总合并的场景。实现过程:
    1. 创建 CountDownLatch 并设置计数器值,值和线程数相同。
    1. 启动多线程,每个线程在执行完任务之后调用 countDown() 方法。
    1. 主线程调用 await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,count 值为0,主线程继续执行。
    结果打印如下:

    4.2 CountDownLatch原理

    CountDownLatch 的内部类 Sync 实现了 AQS 的共享模式,所以实现了 tryAcquireShared/tryReleaseShared 方法。
    然后 CountDownLatch 的实现方法 await()/countDown() 再分别调用 sync 里面的方法:
    因为初始化 state 值是大于 0 的整数,线程拿到锁后要把 state 的值减 1。这是和互斥锁最大的区别:互斥锁在获取到锁会把 state 的值加 1。
    • countDown() 是执行任务后达到预期的线程调用的,用于将 state 值减 1,这是一个释放锁的操作,所以调用了 sync 的 tryReleaseShared() 方法。
    • await() 是被阻塞线程调用的,用于等待 state 值变成 0。这是一个获取锁的操作,所以调用了 sync 的 tryAcquireShared() 方法。

    5、CyclicBarrier

    上面的 CountDownLatch 虽然可以实现多个线程等待一个执行条件满足后再继续往下执行,但是他在某些场景下不能很好的适用:
    • CountDownLatch 只能使用一次,如果需要多次使用只能创建多个 CountDownLatch 实例。
    • CountDownLatch 需要显式使用 countDown() 去唤醒被阻塞的线程。
    CyclicBarrier 的功能类似于 CountDownLatch ,他的作用是所有的线程必须到齐后才能一起通过这个CyclicBarrier。功能上比 CountDownLatch 更强大一点:
    • 每个线程在到达栅栏的时候都会调用 await() 方法将自己阻塞,当阻塞的线程数达到了 CyclicBarrier 初始的数量时,所有进入等待状态的线程被唤醒并继续。
    • CyclicBarrier 会自动重置屏障,也就是说 CyclicBarrier 可以多次使用。
    CyclicBarrier API 列表
    方法
    作用
    await()
    阻塞当前线程,直到阻塞线程数达到 CountDownLatch 的初始值
    await(long timeout, TimeUnit unit)
    await() ,增加超时时间
    getNumberWaiting()
    返回当前在屏障处等待的参与者数目
    getParties()
    返回要求启动此 barrier 的参与者数目
    isBroken()
    查询此屏障是否处于损坏状态
    void
    将屏障重置为其初始状态

    5.1 CyclicBarrier使用

    1. CyclicBarrier 初始值为 3, 表示3个线程到达屏障后,一起向下执行
    1. 3个线程都到达屏障后,屏障会自动重置,可以重复使用
    控制台输出:

    5.2 CyclicBarrier原理

    CyclicBarrier 的实现并没有使用内部类继承 AQS,而是直接使用 ReentrantLock + Condition 来实现整个同步逻辑。CyclicBarrier 的主要属性:
    Generation 是 CyclicBarrier 的内部类,表示代的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,Generation 就会被自动重置。下一代开始,依然需要相同数目的线程到达栅栏才会释放。
    CyclicBarrier的工作流程:
    1. 假如初始时 count = parties = 5,当第一个线程到达栅栏处,count减1,然后把它加入到 Condition 的等待队列中,第二个线程到达栅栏处也是如此,如此重复。
    1. 最后一个线程到达栅栏处,count 减为 0,调用 Condition 的 signalAll() 唤醒其他所有线程,所有线程从 Condition 的等待队列被加入 AQS 的同步队列。
    1. 等待当前线程运行完毕,调用 lock.unlock() 的时候依次从 AQS 的等待队列中唤醒一个线程继续运行,也就是说实际上线程先依次(排队)到达栅栏处,再依次往下运行。
    CyclicBarrierCountDownLatch 的区别:
    • CyclicBarrier 是最后一个线程到达时自动唤醒;CountDownLatch 需要通过显式地调用 countDown() 来唤醒。
    • CyclicBarrier 基于 ReentrantLock + Condition 来实现;CountDownLatch 基于 AQS 来实现。
    • CyclicBarrier 具有「代」的概念,可以重复使用;CountDownLatch 只能使用一次。
    • CyclicBarrier 只能实现多个线程到达栅栏处一起运行,但是可以携带一个在栅栏处执行的任务;CountDownLatch 不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立

    6、Semaphore

    Semaphore 通常被称为信号量,作用是限制指定数量的线程线程同时访问一个资源,可以用于流量控制和资源控制。
    比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

    6.1 Semaphore使用

    6.2 Semaphore原理

    notion image
    Semaphore 内部有 3 个类,继承了 AQS。一个公平锁,一个非公平锁,这点和 ReentrantLock 一摸一样。
    Semaphore 有两个重要的方法,acquire 用于获取令牌,release 用于释放令牌。
    如果你看过 ReentrantLock 的源码,Semaphore 的源码还是非常简单的。

    7、Phaser

    Phaser(移相器,一种电子元件)是JDK7中引入的新的并发工具辅助类,oralce 官网文档描述 Phaser 是一个可重复使用的同步栅栏,功能上与 CountDownLatch 和 CyclicBarrier 类似但支持的场景更加灵活,这个类可能是目前并发包里面实现最复杂的一个了。
    Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变。
    Phaser API 列表
    方法
    作用
    Phaser()
    默认的构造方法,初始化注册的线程数量为0
    Phaser(int parties)
    一个指定线程数量的构造方法
    register()
    添加一个新的注册者
    bulkRegister(int parties)
    添加指定数量的多个注册者
    arrive()
    到达栅栏点直接执行,无须等待其他的线程
    arriveAndAwaitAdvance()
    到达栅栏点,必须等待其他所有注册者到达
    arriveAndDeregister()
    到达栅栏点,注销自己无须等待其他的注册者到达
    onAdvance(int phase, int registeredParties)
    多个线程达到注册点之后,会调用该方法。

    7.1 Phaser使用

    一个简单的替代 CountDownLatch 实现一次性的共享锁例子
    1. 主线程先向 Phaser 注册自己,然后调用了 arriveAndDeregister() 方法注销了自己,主线程无需等待其他线程到达栅栏,即可向下执行。
    1. 起 5 个线程,每个线程到达栅栏的时间不一致,线程到达栅栏后,先调用 arriveAndAwaitAdvance() 阻塞当前线程直到所有的线程都到达栅栏。等 5 个线程同时到达栅栏处后,同时向下执行。
    控制台输出:

    7.2 Phaser原理

    Phaser 并没有采用 AQS 同步框架实现,而是单独定义了相关功能api,其中state采用64位的long类型表示,然后64bit又分成4个定义分别代表没有到达栅栏的数量(0-15bit),注册的数量(16-31bit),栅栏的代数量(32-62bit),最后一位(63bit)代表当前的Phaser是否是终止状态,这也意味着我们能够注册的最大数量不能超过65535,否则会抛出不合法参数异常,这一点在使用时需要注意。
    notion image

    8、Exchanger

    Exchanger 可以在两个线程之间交换数据(注意只能是2个线程,不支持更多的线程之间互换数据)。
    Exchanger提供一个同步点,在这个同步点两个线程建议交换彼此的数据。这两个线程通过 exchange 方法交换数据:第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,等两个线程都到达同步点,这两个线程就可以交换数据。
    Exchanger 可以用于校对工作,比如我们需要将纸质银行流水人工录入成电子银行流水,为了避免错误,采用 AB 岗两人录入,录入完再对比是否一致。

    8.1 Exchanger使用

    控制台输出
    Java并发系列(四):并发集合Java并发系列(二):AQS
    mcbilla
    mcbilla
    一个普通的干饭人🍚
    Announcement
    type
    status
    date
    slug
    summary
    tags
    category
    icon
    password
    🎉欢迎来到飙戈的博客🎉
    -- 感谢您的支持 ---
    👏欢迎学习交流👏