简介
本文介绍Java中的ConcurrentHashMap的原理。
JDK7与JDK8区别
项 | JDK1.7 | JDK1.8 |
实现方式 | HashMap(数组 + 链表) | HashMap(数组 + 链表 + 红黑树) |
保证线程安全的方式 | 使用ReentrantLock 对 Segment同步 | 读操作:volatile; 写操作:synchronized + CAS |
粒度 | 对需要进行数据操作的Segment加锁 | 对每个桶(数组项)加锁 |
JDK8原理概述
概述
JDK8中ConcurrentHashMap结构基本上和HashMap一样,采用了HashMap(数组 + 链表 + 红黑树) + synchronized + CAS 的实现方式来设计。读操作使用volatile,写操作使用synchronized 和CAS。
CAS:在判断数组中当前位置为null的时候,使用CAS把这个新的Node写入数组中对应的位置。
synchronized :当数组中的指定位置不为空时,通过加锁来添加这个节点(链表或者红黑树)。
JDK8中采用的是Node(放弃了Segment)。Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。
class Node<K,V>implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; //... 省略部分代码 }
JDK8源码分析
put
线程访问哈希表的bucket时,使用 sychronized关键字,防止多个线程同时操作同一个 bucket(即锁住bucket)。如果该结点的 hash值不少于0,则遍历链表更新节点或插入新节点;如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点;更新了节点数量,还要考虑扩容和链表转红黑树。
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //1. 计算key的hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //4. 当前正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //5. 当前为链表,在链表中插入新的键值对 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 6.当前为红黑树,将新的键值对插入到红黑树中 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 addCount(1L, binCount); return null; }
在判断数组中当前位置为null的时候,使用CAS把这个新的Node写入数组中对应的位置。对应的源码是这个:
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
get
get操作可以无锁(不加锁)是由于 Node 元素 的val 指针 和 next指针 用volatile修饰的,在多线程环境下,线程A修改结点的 val 或者新增结点的时候,对线程B都是可见的。
size
//略
JDK7原理概述
在JDK1.7中ConcurrentHashMap采用了HashMap + ReentrantLock + Segment的方式实现。
ConcurrentHashMap中的分段锁称为Segment,它类似于HashMap的结构,即:内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
ConcurrentHashMap的内部结构图
put过程
- 对key进行第1次hash,通过hash值确定Segment的位置
- 获取当前Segment的HashEntry数组后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置(头部)
- 通过继承ReentrantLock的tryLock方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock方法去获取锁,超过指定次数就挂起,等待唤醒
- 然后对当前索引的HashEntry链进行遍历,如果有重复的key,则替换;如果没有重复的,则插入到链头
- 释放锁
get操作
和put操作类似,也是要两次hash。但是get操作的Concurrenthashmap不需要加锁,原因是存储元素都标记了volatile。
size操作
size操作就是遍历两次所有的Segments,每次记录Segment的modCount值,然后将两次的modCount进行比较
- 如果相同,则表示期间没有发生过写入操作,就将原先遍历的结果返回。
- 如果经判断发现两次统计出的modCount并不一致,要全部Segment加锁来进行count的获取和统计。在此期间每个Segement都被锁住,无法进行其他操作,统计出的count自然很准确。
此结构优缺点
优点
- 写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment。
缺点
- Hash的过程要比普通的HashMap要长
请先
!