type
status
date
slug
summary
tags
category
icon
password

1、概述

并发集合类都放在 java.util.concurrent 包下,常用的有以下类:
  • ConcurrentHashMap:线程安全的 HashMap
  • ConcurrentLinkedQueue :线程安全非阻塞的 LinkedList
  • CopyOnWriteArrayList:线程安全且读操作无锁的 ArrayList
  • BlockingQueue :JDK5 新增的线程安全的阻塞队列类,相比起传统队列,增加支持阻塞的插入和移除方法
    • ArrayBlockingQueue :数组实现的有界阻塞队列
    • LinkedBlockingQueue :链表实现的有界阻塞队列
    • PriorityBlockingQueue :支持优先级的无界阻塞队列
    • DelayQueue :支持延时获取元素的无界阻塞队列
    • SynchronousQueue :不存储元素的阻塞队列
    • LinkedTransferQueue :链表实现的无界阻塞队列。相对于其他阻塞队列,多了 tryTransfertransfer 方法
    • LinkedBlockingDeque :链表实现的双向阻塞队列

2、ConcurrentHashMap

ConcurrentHashMap 是线程安全的 map 容器,其中利用了锁分段的思想提高了并发度。线程安全的 map 其实还有其他实现类(例如 ConcurrentSkipListMap),但 ConcurrentHashMap 是最常用的 map 实现类,也是面试常问的并发集合类,应该重点掌握。

2.1 ConcurrentHashMap实现原理

ConcurrentHashMap 在 jdk1.7 以前和 jdk1.8 以后的实现方法大大不同。

2.1.1 jdk1.7 版本的实现

jdk1.7 版本采用 数组+链表 的数据结构,采用了分段锁的思想,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
整体包含 Segment + HashEntry 两种结构:
  • 一个 ConcurrentHashMap 里包含一个 Segment 数组,每个 Segment 存储一段数据。Segment 继承了 ReentrantLock 充当锁的角色,只有获取到 Segment 锁才能操作其中的数据。
  • Segment 内部类似于 HashMap 的结构,每个 Segment包含一个 HashEntry 数组,每个 HashEntry 元素又是一个链表结构。
notion image
这个版本下 ConcurrentHashMap 定位一个元素的过程需要进行两次 Hash 操作。第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表的头部。缺点就是实现较为臃肿复杂。

2.1.2 jdk1.8 版本的实现

jdk1.8 版本放弃了 Segment 的概念,直接用 数组+链表+红黑树 的数据结构来实现,并发控制使用 CAS + synchronized 来保证并发更新的安全,整个看起来就像是优化过且线程安全的 HashMap。
虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。
notion image
这个版本下 ConcurrentHashMap 的结构已经跟 HashMap 差不多了,当链表的长度超过 8 时,链表就会转换成红黑树。
ConcurrentHashMap 内部的关键属性和方法如下:
核心是 Node 类以及继承 Node 类的三个子类 TreebinTreeNodeForwardingNode。他们的继承结构如下所示:
notion image
那什么时候使用 Node,什么时候使用 TreeBin,什么时候使用 ForwardingNode呢?
其实在绝大部分场景中,使用的依然是 Node,从 Node 数据结构中,不难看出,Node 其实是一个链表,也就是说,一个正常的 Map 可能是长这样的:
notion image
上图中,绿色部分表示Node数组,里面的元素是 Node,也就是链表的头部,当两个元素在数据中的位置发生冲突时,就将它们通过链表的形式,放在一个槽位中,数组槽位对应的是一个链表。
当链表的长度大于等于8时,会将链表树状化,也就是变成一颗红黑树。如下图所示,其中一个槽位就变成了一颗树,Node 就变成了 TreeBinTreeBin 内部使用 TreeNode 构建红黑树)。
notion image
当数组容量快满时,即超过 75% 的容量时,数组还需要进行扩容,在扩容过程中,如果老的数组已经完成了复制,那么就会将老数组中的元素使用 ForwardingNode 对象替代,表示当前槽位的数据已经处理了,不需要再处理了,这样,当有多个线程同时参与扩容时,就不会冲突。

2.2 put()方法的实现

put() 方法是 ConcurrentHashMap 中最重要的方法,负责将给定的 key 和 value 对存入 HashMap
主要包括以下几个步骤:
  1. 对 key 的 hashcode 进行一次 hash 计算
  1. 如果没有初始化数组,则尝试初始化数组
  1. 将给定的 key-value 放入对应的槽位
  1. 如果当前正在扩容,则参与帮助扩容
  1. 检查是否需要进行扩容操作

2.2.1 hash计算

这个方法传入的参数是 key 的 hashcode
  • (h ^ (h >>> 16)) 目的是让 h 的高 16 位也参与寻址计算,使得到的 hash 值更分散,减少 hash 冲突产生
  • (h ^ (h >>> 16)) 得到的结果再 & HASH_BITS,目的是为了让得到的hash值结果始终是一个正数
  • HASH_BITS 的 值是 0x7fffffff ,即 0111 1111 1111 1111 1111 1111 1111 1111
假如 h 的值是 1100 0011 1010 0101 0001 1100 0001 1110
  1. (h >>> 16) 0000 0000 0000 0000 1100 0011 1010 0101
  1. (h ^ (h >>> 16)) 1100 0011 1010 0101 1101 1111 1011 1011
  1. (h ^ (h >>> 16)) & HASH_BITS 0100 0011 1010 0101 1101 1111 1011 1011

2.2.2 初始化数组

这一步主要是判断数组是否初始化,如果没有初始化就初始化数组,默认 table 长度是 16。如果用户传入参数设置 table 大小,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方。然后会设置负载因子为 0.75。

2.2.3 设置 key-value 对

当数组初始化完成后,这一步采用 CAS + synchronized 实现并发插入或更新操作。这一步的核心是 tabAt()casTabAt() 两个方法。
  1. (f = tabAt(tab, i = (n - 1) & hash)) == null ,说明 table 中这个位置第一次插入元素,利用 Unsafe.compareAndSwapObject 方法插入 Node 节点。
    1. 如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。
    2. 如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
  1. 其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发。
    1. 如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。
    2. 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。
    3. 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

2.2.4 参与帮助扩容

如果一个节点的 hash 是 MOVE,则表示这是一个 ForwardingNode,也就是当前正在扩容中,为了尽快完成扩容,当前线程就会调用 helpTransfer() 参与到扩容的工作中。

2.2.5 检查是否需要进行扩容操作

最后调用addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,调用 transfer() 方法对 table 进行扩容。
整个扩容分为两部分:
  1. 构建一个 nextTable,大小为 table 的两倍。这个过程是单线程进行的
  1. 把 table 的数据复制到 nextTable 中。这个过程中并没有完全打乱当前已有的元素位置,而是在数组扩容 2 倍后,将一半的元素移动到新的空间中。新的位置总是在老位置的后面 n 个槽位( n 为原数组大小)
notion image

2.3 get()操作

put()方法相比,get()方法就比较简单了。
  1. 根据hash值 得到对应的槽位 (n - 1) & h
  1. 如果当前槽位第一个元素key就和请求的一样,直接返回
  1. 否则调用 Node 的 find() 方法查找
    1. 对于 ForwardingNode 使用的是 ForwardingNode.find()
    2. 对于红黑树使用的是 TreeBin.find()
    3. 对于链表型的槽位,依次顺序查找对应的key

3、ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的 linkedlist 容器,使用 CAS 原子指令来处理对数据的并发访问,这是一种非阻塞的实现方式。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。

4、CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 arraylist 容器。利用 “写时复制” 思想,保证读/写同时进行不会冲突。只有当多个线程同时进行写操作的时候,才需要同步,适用于“读多写少”的场景。
CopyOnWriteArrayList内部:
  • 通过一个Object[]数组来保存数据。
  • 通过一个Object对象lock来对写操作进行加锁,使用的是synchronized (lock)方式。
  • CopyOnWriteArrayList 的所有读方法都不会加锁,所有增、删、改方法都会加锁。
CopyOnWriteArrayList 总结如下:
  • 由于使用了 “写时复制” 的思想,适用于 “读多写少” 的场景;
  • 由于在进行写操作的时候会复制原数组,对内存的占用会比较大,不适用于大数据量的场景;
  • 只能保证最终的数据一致性,不能保证实时的数据一致性——读操作只能从旧数组中读取数据,而此时可能已经复制了一个新数组,并且正在修改新数组的数据。
  • 迭代是对快照进行的,不会抛出 ConcurrentModificationException,且迭代过程中不支持修改操作。

5、BlockingQueue

阻塞队列(BlockingQueue)是 JDK5 新增的线程安全的高效队列类,基于生产者-消费者模式。相比起传统队列,增加支持阻塞的插入和移除方法:
  • 当线程向队列中插入元素时,如果队列已满,则阻塞线程,直到队列有空闲位置(非满)。
  • 当线程从队列中取元素(删除队列元素)时,如果队列为空,则阻塞线程,直到队列有元素。
BlockingQueue 支持的插入/移除方法如下:
方法 / 处理方式
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
remove()
poll()
take()
poll(time,unit)
检查方法(从队列头取回但不移除)
element()
peek()
/
/
可以看到,对于每种基本方法,“抛出异常”和“返回特殊值”的方法定义和Queue是完全一样的。BlockingQueue只是增加了两类和阻塞相关的方法
  • put(e)take() 方法会一直阻塞调用线程,直到线程被中断或队列状态可用;
  • offer(e, time, unit)poll(time, unit) 方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。
注意事项:
  • BlockingQueue 不接受 null 元素,会抛出 NullPointerException,null 用于标记poll操作的失败。
  • BlockingQueue接口的实现类都必须是线程安全的,实现类一般通过“锁”保证线程安全;
  • BlockingQueue 可以是限定容量的。对于没有容量限制的 BlockingQueue 实现,该方法总是返回 Integer.MAX_VALUE

5.1 ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。在创建 ArrayBlockingQueue 对象时必须制定容量大小。
ArrayBlockingQueue 内部的阻塞队列是通过重入锁 ReenterLock 和 Condition 条件队列实现的,所以 ArrayBlockingQueue 中的元素存在公平访问与非公平访问的区别。
  • 对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。
  • 而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。

5.2 LinkedBlockingQueue

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。在创建LinkedBlockingQueue 对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
LinkedBlockingQueue 的实现有点类似于 ArrayBlockingQueue,内部也是通过 ReenterLock 来加锁。但是 ArrayBlockingQueue中插入元素和移除元素时都需要获取同一个锁,即这两个动作不能同时进行;LinkedBlockingQueue中插入元素和从链表头移除元素是两把锁,两个操作同时进行效率,效率更高。

5.3 PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueu e时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

5.4 DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景:
  • 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了
  • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的

5.5 SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个take操作,否则不能继续添加元素
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的 SynchronousQueue,如果设置为 true,则等待的线程会采用先进先出的顺序访问队列。
SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

5.6 LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
  • transfer方法:如果当前有消费者正在等待接收元素(消费者使用take0方法或带时间限制的poll0方法时),transfer方法可以把生产者传人的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
  • tryTransfer方法:用来试探生产者传人的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传人的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

5.7 LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时人队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取( peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。
在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。
Java并发系列(五):线程和线程池Java并发系列(三):并发工具
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏