一、ConcurrentHashMap出现的原因

  我们之前学过HashMap,也知道HashMap不是线程安全的,在多线程环境下,HashMap的put方法有可能引起死循环。于是HashTable这个类出现,它在大量的方法前都加了内置锁Synchronized,这就保证了它的线程安全性,但是这种方法太极端,导致效率低下。当一个线程访问了HashTable的同步方法时,其它线程就只能等待该线程释放锁。在这种情况,针对多线程的情况,ConcurrentHashMap应运而生。

二、ConcurrentHashMap的数据结构

  ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
这里写图片描述
(图片来自:http://www.cnblogs.com/dolphin0520/p/3920373.html
  和HashMap不同之处是,HashMap是使用一个数组来连接各个Entry链,而ConcurrentHashMap则是使用了Segment数组(继承ReentrantLock)来链接各个HashEntry数组,然后各个HashEntry数组中连接各自的HashEntry链。(这个要很注意)每一个Segment都有一个锁,所以这可以达到并发得访问各个Segment的Entry。
  这里写图片描述
  

三、ConcurrentHashMap的定义

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

  ConcurrentHashMap继承自AbstractMap,实现了ConcurrentMap接口,使得它具有Map的属性,同时又有多线程相关的属性

ConcurrentHashMap的成员变量:

    //初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的并发等级(下面会叙述作用) static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment数量 static final int MAX_SEGMENTS = 1 << 16;  // static final int RETRIES_BEFORE_LOCK = 2;

四、ConcurrentHashMap的构造函数

   //通过指定的容量,加载因子和并发等级创建一个新的ConcurrentHashMap 
   public ConcurrentHashMap(int initialCapacity, 
                             float loadFactor, int concurrencyLevel) { 
         //对容量,加载因子和并发等级做限制 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //限制并发等级不可以大于最大等级 if (concurrencyLevel > MAX_SEGMENTS) 
            concurrencyLevel = MAX_SEGMENTS; // 下面即通过并发等级来确定Segment的大小 //sshift用来记录向左按位移动的次数 int sshift = 0; //ssize用来记录Segment数组的大小 int ssize = 1; //Segment的大小为大于等于concurrencyLevel的第一个2的n次方的数 while (ssize < concurrencyLevel) { 
            ++sshift; 
            ssize <<= 1; 
        } 
 this.segmentShift = 32 - sshift; //segmentMask的值等于ssize - 1(这个值很重要) this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) 
            initialCapacity = MAXIMUM_CAPACITY; //c记录每个Segment上要放置多少个元素 int c = initialCapacity / ssize; //假如有余数,则Segment数量加1 if (c * ssize < initialCapacity) 
            ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) 
            cap <<= 1; 
 //创建第一个Segment,并放入Segment[]数组中,作为第一个Segment 
        Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 
                             (HashEntry<K,V>[])new HashEntry[cap]); 
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; 
    }

  对于容量和加载因子:我在HashMap那篇文章已经讲解得很清楚: Java容器(四):HashMap(Java 7)的实现原理
  并发等级(concurrencyLevel):用来确定Segment的数量,Segment的个数为大于等于concurrencyLevel的 第一个 2的n次方的数,例如当concurrencyLevel为12,13,14,15,16时,此时的Segment的数量为16
  segmentMask:这个为什么要为Segment数组的长度 -1, 这个也在HashMap中讲过了,主要是为了让低位为1,这样在做&运算确定Segment的索引时能够更加分散

五、ConcurrentHashMap的put操作

    public V put(K key, V value) { 
        Segment<K,V> s; //ConcurrentHashMap的key和value都不能为null if (value == null) throw new NullPointerException(); 
 //这里对key求hash值,并确定应该放到segment数组的索引位置 int hash = hash(key); //j为索引位置,思路和HashMap的思路一样,这里不再多说 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck 
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment 
            s = ensureSegment(j); //这里很关键,找到了对应的Segment,则把元素放到Segment中去 return s.put(key, hash, value, false); 
    }

  得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
  0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。
  其实大体和HashMap相似

  下面看看具体如何插入到Segment中的

final V put(K key, int hash, V value, boolean onlyIfAbsent) { //这里是并发的关键,每一个Segment进行put时,都会加锁 
            HashEntry<K,V> node = tryLock() ? null : 
                scanAndLockForPut(key, hash, value); 
            V oldValue; try { //tab是当前segment所连接的HashEntry数组 
                HashEntry<K,V>[] tab = table; //确定key的hash值所在HashEntry数组的索引位置 int index = (tab.length - 1) & hash; //取得要放入的HashEntry链的链头 
                HashEntry<K,V> first = entryAt(tab, index); //遍历当前HashEntry链 for (HashEntry<K,V> e = first;;) { //如果链头不为null if (e != null) { 
                        K k; //如果在该链中找到相同的key,则用新值替换旧值,并退出循环 if ((k = e.key) == key || 
                            (e.hash == hash && key.equals(k))) { 
                            oldValue = e.value; if (!onlyIfAbsent) { 
                                e.value = value; 
                                ++modCount; 
                            } break; 
                        } //如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else 
                        e = e.next; 
                    } else {//如果没有找到key相同的,则把当前Entry插入到链头 
 if (node != null) 
                            node.setNext(first); else 
                            node = new HashEntry<K,V>(hash, key, value, first); //此时数量+1 int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果超出了限制,要进行扩容 
                            rehash(node); else 
                            setEntryAt(tab, index, node); 
                        ++modCount; count = c; 
                        oldValue = null; break; 
                    } 
                } 
            } finally { //最后释放锁 
                unlock(); 
            } return oldValue; 
        }

  我们来重新理一理思路:
  1. 首先对key进行第1次hash,通过hash值确定segment的位置
  2. 然后在segment内进行操作,获取锁
  3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
  4. 然后对当前索引的HashEntry链进行遍历,如果有重复的key,则替换;如果没有重复的,则插入到链头
  5. 关闭锁

  可见,在整个put过程中,进行了2次hash操作,才最终确定key的位置。

五、ConcurrentHashMap的remove操作

    public V remove(Object key) { //求key的hash int hash = hash(key); //求得hash对应的Segment 
        Segment<K,V> s = segmentForHash(hash); //在segment内进行删除 return s == null ? null : s.remove(key, hash, null); 
    }

     final V remove(Object key, int hash, Object value) { //获取锁 if (!tryLock()) 
                scanAndLock(key, hash); 
            V oldValue = null; try { //tab是当前segment所连接的HashEntry数组 
                HashEntry<K,V>[] tab = table; //确定key的hash值所在HashEntry数组的索引位置 int index = (tab.length - 1) & hash; //取得要放入的HashEntry链的链头 
                HashEntry<K,V> e = entryAt(tab, index); //pred用来记录待删除节点的前一个节点 
                HashEntry<K,V> pred = null; while (e != null) { 
                    K k; 
                    HashEntry<K,V> next = e.next; //当找到了待删除及节点的位置 if ((k = e.key) == key || 
                        (e.hash == hash && key.equals(k))) { 
                        V v = e.value; if (value == null || value == v || value.equals(v)) { //如果待删除节点的前节点为null,即待删除节点时链头节点,此时把该位置指向第2个结点就行了 if (pred == null) 
                                setEntryAt(tab, index, next); //如果有前节点,则待删除节点的前节点的next指向待删除节点的的下一个节点,删除成功 else 
                                pred.setNext(next); 
                            ++modCount; 
                            --count; 
                            oldValue = v; 
                        } break; 
                    } 
                    pred = e; 
                    e = next; 
                } 
            } finally { //最后关闭锁 
                unlock(); 
            } return oldValue; 
        }

  我们来理一理思路:
  1. 首先对key进行第1次hash,通过hash值确定segment的位置
  2. 然后在segment内进行操作,获取锁
  3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
  4. 用一个HashEntry引用来记录待删除节点的前一个节点,然后删除待删除节点
  5. 关闭锁

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

高并发秒杀系统分析详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。