type
status
date
slug
summary
tags
category
icon
password
1、概述
并发集合类都放在
java.util.concurrent
包下,常用的有以下类:ConcurrentHashMap
:线程安全的 HashMap
ConcurrentLinkedQueue
:线程安全非阻塞的 LinkedList
CopyOnWriteArrayList
:线程安全且读操作无锁的 ArrayList
BlockingQueue
:JDK5 新增的线程安全的阻塞队列类,相比起传统队列,增加支持阻塞的插入和移除方法ArrayBlockingQueue
:数组实现的有界阻塞队列LinkedBlockingQueue
:链表实现的有界阻塞队列PriorityBlockingQueue
:支持优先级的无界阻塞队列DelayQueue
:支持延时获取元素的无界阻塞队列SynchronousQueue
:不存储元素的阻塞队列LinkedTransferQueue
:链表实现的无界阻塞队列。相对于其他阻塞队列,多了tryTransfer
和transfer
方法LinkedBlockingDeque
:链表实现的双向阻塞队列
2、ConcurrentHashMap
ConcurrentHashMap
是线程安全的 map 容器,其中利用了锁分段的思想提高了并发度。线程安全的 map 其实还有其他实现类(例如 ConcurrentSkipListMap
),但 ConcurrentHashMap
是最常用的 map 实现类,也是面试常问的并发集合类,应该重点掌握。2.1 ConcurrentHashMap实现原理
ConcurrentHashMap 在 jdk1.7 以前和 jdk1.8 以后的实现方法大大不同。
2.1.1 jdk1.7 版本的实现
jdk1.7 版本采用
数组+链表
的数据结构,采用了分段锁的思想,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。整体包含
Segment
+ HashEntry
两种结构:- 一个 ConcurrentHashMap 里包含一个 Segment 数组,每个 Segment 存储一段数据。Segment 继承了 ReentrantLock 充当锁的角色,只有获取到 Segment 锁才能操作其中的数据。
- Segment 内部类似于 HashMap 的结构,每个 Segment包含一个 HashEntry 数组,每个 HashEntry 元素又是一个链表结构。
这个版本下 ConcurrentHashMap 定位一个元素的过程需要进行两次 Hash 操作。第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表的头部。缺点就是实现较为臃肿复杂。
2.1.2 jdk1.8 版本的实现
jdk1.8 版本放弃了
Segment
的概念,直接用 数组+链表+红黑树
的数据结构来实现,并发控制使用 CAS + synchronized
来保证并发更新的安全,整个看起来就像是优化过且线程安全的 HashMap。虽然在 JDK1.8 中还能看到Segment
的数据结构,但是已经简化了属性,只是为了兼容旧版本。
这个版本下
ConcurrentHashMap
的结构已经跟 HashMap
差不多了,当链表的长度超过 8 时,链表就会转换成红黑树。ConcurrentHashMap 内部的关键属性和方法如下:
核心是
Node
类以及继承 Node
类的三个子类 Treebin
、TreeNode
和 ForwardingNode
。他们的继承结构如下所示:那什么时候使用
Node
,什么时候使用 TreeBin
,什么时候使用 ForwardingNode
呢?其实在绝大部分场景中,使用的依然是 Node,从 Node 数据结构中,不难看出,Node 其实是一个链表,也就是说,一个正常的 Map 可能是长这样的:
上图中,绿色部分表示Node数组,里面的元素是 Node,也就是链表的头部,当两个元素在数据中的位置发生冲突时,就将它们通过链表的形式,放在一个槽位中,数组槽位对应的是一个链表。
当链表的长度大于等于8时,会将链表树状化,也就是变成一颗红黑树。如下图所示,其中一个槽位就变成了一颗树,Node 就变成了
TreeBin
( TreeBin
内部使用 TreeNode
构建红黑树)。当数组容量快满时,即超过 75% 的容量时,数组还需要进行扩容,在扩容过程中,如果老的数组已经完成了复制,那么就会将老数组中的元素使用
ForwardingNode
对象替代,表示当前槽位的数据已经处理了,不需要再处理了,这样,当有多个线程同时参与扩容时,就不会冲突。2.2 put()方法的实现
put() 方法是 ConcurrentHashMap 中最重要的方法,负责将给定的 key 和 value 对存入 HashMap
主要包括以下几个步骤:
- 对 key 的 hashcode 进行一次 hash 计算
- 如果没有初始化数组,则尝试初始化数组
- 将给定的 key-value 放入对应的槽位
- 如果当前正在扩容,则参与帮助扩容
- 检查是否需要进行扩容操作
2.2.1 hash计算
这个方法传入的参数是 key 的 hashcode
(h ^ (h >>> 16))
目的是让 h 的高 16 位也参与寻址计算,使得到的 hash 值更分散,减少 hash 冲突产生
(h ^ (h >>> 16))
得到的结果再& HASH_BITS
,目的是为了让得到的hash值结果始终是一个正数
HASH_BITS
的 值是0x7fffffff
,即0111 1111 1111 1111 1111 1111 1111 1111
。
假如 h 的值是
1100 0011 1010 0101 0001 1100 0001 1110
(h >>> 16)
0000 0000 0000 0000 1100 0011 1010 0101
(h ^ (h >>> 16))
1100 0011 1010 0101 1101 1111 1011 1011
(h ^ (h >>> 16)) & HASH_BITS
0100 0011 1010 0101 1101 1111 1011 1011
2.2.2 初始化数组
这一步主要是判断数组是否初始化,如果没有初始化就初始化数组,默认 table 长度是 16。如果用户传入参数设置 table 大小,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方。然后会设置负载因子为 0.75。
2.2.3 设置 key-value 对
当数组初始化完成后,这一步采用 CAS + synchronized 实现并发插入或更新操作。这一步的核心是
tabAt()
和 casTabAt()
两个方法。(f = tabAt(tab, i = (n - 1) & hash)) == null
,说明 table 中这个位置第一次插入元素,利用 Unsafe.compareAndSwapObject 方法插入 Node 节点。- 如果 CAS 成功,说明 Node 节点已经插入,随后
addCount(1L, binCount)
方法会检查当前容量是否需要进行扩容。 - 如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
- 其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发。
- 如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。
- 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。
- 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。
2.2.4 参与帮助扩容
如果一个节点的 hash 是 MOVE,则表示这是一个 ForwardingNode,也就是当前正在扩容中,为了尽快完成扩容,当前线程就会调用
helpTransfer()
参与到扩容的工作中。2.2.5 检查是否需要进行扩容操作
最后调用
addCount(1L, binCount)
方法会检查当前容量是否需要进行扩容。当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl
,调用 transfer()
方法对 table 进行扩容。整个扩容分为两部分:
- 构建一个 nextTable,大小为 table 的两倍。这个过程是单线程进行的
- 把 table 的数据复制到 nextTable 中。这个过程中并没有完全打乱当前已有的元素位置,而是在数组扩容 2 倍后,将一半的元素移动到新的空间中。新的位置总是在老位置的后面 n 个槽位( n 为原数组大小)
2.3 get()操作
与
put()
方法相比,get()
方法就比较简单了。- 根据hash值 得到对应的槽位
(n - 1) & h
- 如果当前槽位第一个元素key就和请求的一样,直接返回
- 否则调用 Node 的
find()
方法查找 - 对于 ForwardingNode 使用的是
ForwardingNode.find()
- 对于红黑树使用的是
TreeBin.find()
- 对于链表型的槽位,依次顺序查找对应的key
3、ConcurrentLinkedQueue
ConcurrentLinkedQueue 是线程安全的 linkedlist 容器,使用 CAS 原子指令来处理对数据的并发访问,这是一种非阻塞的实现方式。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。
4、CopyOnWriteArrayList
CopyOnWriteArrayList 是线程安全的 arraylist 容器。利用 “写时复制” 思想,保证读/写同时进行不会冲突。只有当多个线程同时进行写操作的时候,才需要同步,适用于“读多写少”的场景。
CopyOnWriteArrayList内部:
- 通过一个Object[]数组来保存数据。
- 通过一个Object对象lock来对写操作进行加锁,使用的是synchronized (lock)方式。
- CopyOnWriteArrayList 的所有读方法都不会加锁,所有增、删、改方法都会加锁。
CopyOnWriteArrayList 总结如下:
- 由于使用了 “写时复制” 的思想,适用于 “读多写少” 的场景;
- 由于在进行写操作的时候会复制原数组,对内存的占用会比较大,不适用于大数据量的场景;
- 只能保证最终的数据一致性,不能保证实时的数据一致性——读操作只能从旧数组中读取数据,而此时可能已经复制了一个新数组,并且正在修改新数组的数据。
- 迭代是对快照进行的,不会抛出 ConcurrentModificationException,且迭代过程中不支持修改操作。
5、BlockingQueue
阻塞队列(BlockingQueue)是 JDK5 新增的线程安全的高效队列类,基于生产者-消费者模式。相比起传统队列,增加支持阻塞的插入和移除方法:
- 当线程向队列中插入元素时,如果队列已满,则阻塞线程,直到队列有空闲位置(非满)。
- 当线程从队列中取元素(删除队列元素)时,如果队列为空,则阻塞线程,直到队列有元素。
BlockingQueue 支持的插入/移除方法如下:
方法 / 处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法(从队列头取回但不移除) | element() | peek() | / | / |
可以看到,对于每种基本方法,“抛出异常”和“返回特殊值”的方法定义和Queue是完全一样的。BlockingQueue只是增加了两类和阻塞相关的方法
put(e)
和take()
方法会一直阻塞调用线程,直到线程被中断或队列状态可用;
offer(e, time, unit)
和poll(time, unit)
方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。
注意事项:
- BlockingQueue 不接受 null 元素,会抛出 NullPointerException,null 用于标记poll操作的失败。
- BlockingQueue接口的实现类都必须是线程安全的,实现类一般通过“锁”保证线程安全;
- BlockingQueue 可以是限定容量的。对于没有容量限制的 BlockingQueue 实现,该方法总是返回
Integer.MAX_VALUE
。
5.1 ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。在创建 ArrayBlockingQueue 对象时必须制定容量大小。
ArrayBlockingQueue 内部的阻塞队列是通过重入锁 ReenterLock 和 Condition 条件队列实现的,所以 ArrayBlockingQueue 中的元素存在公平访问与非公平访问的区别。
- 对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。
- 而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。
5.2 LinkedBlockingQueue
LinkedBlockingQueue 是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。在创建LinkedBlockingQueue 对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
LinkedBlockingQueue 的实现有点类似于 ArrayBlockingQueue,内部也是通过 ReenterLock 来加锁。但是 ArrayBlockingQueue中插入元素和移除元素时都需要获取同一个锁,即这两个动作不能同时进行;LinkedBlockingQueue中插入元素和从链表头移除元素是两把锁,两个操作同时进行效率,效率更高。
5.3 PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueu e时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
5.4 DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景:
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的
5.5 SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个take操作,否则不能继续添加元素
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的 SynchronousQueue,如果设置为 true,则等待的线程会采用先进先出的顺序访问队列。
SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
5.6 LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
- transfer方法:如果当前有消费者正在等待接收元素(消费者使用take0方法或带时间限制的poll0方法时),transfer方法可以把生产者传人的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
- tryTransfer方法:用来试探生产者传人的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传人的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
5.7 LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时人队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取( peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。
在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。
- Author:mcbilla
- URL:http://mcbilla.com/article/c0cbe779-e999-4769-92fc-d347ff7c8bfd
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts