coding……
但行好事 莫问前程

Java编程拾遗『并发容器——ConcurrentHashMap』

在之前的文章中已经介绍过的哈希表数据结构有HashMap、TreeMap、LinkedHashMap、HashTable以及Collections类生成的各种同步Hash容器。但是我们也了解到,HashMap是非线程安全的,在多线程环境下会出现数据安全性问题,比如多线程同时rehash过程中产生的死循环(Java7)及元素丢失问题。HashTable和Collections类生成的同步容器倒是可以在保证线程安全,但是也有弊端,比如HashTable每个方法都使用syncronized同步,所有的方法都是互斥的,并发度较低,在竞争比较激烈的时候,往往存在性能问题。Collections生成的同步Hash容器也存在同样并发度低的问题,另外像putIfAbsent复合操作,HashTable和Collections生成的同步容器都是无法支持的。

而ConcurrentHashMap的出现就是为了解决现有并发Hash容器的上述问题,是java.util.concurrent包中重要的并发容器,底层通过数组 + 链表 + 红黑树实现。相比于HashTable(锁整张表),ConcurrentHashMap支持的并发度更高(锁数组中的一个元素)。另外ConcurrentHashMap在Java6、Java7及Java8中的实现有很大变化,本文就Java7和Java8的源码,对ConcurrentHashMap做简单的讲解。

1. Java7

Java7中ConcurrentHashMap采用数组+链表的数据结构,ConcurrentHashMap整体上是一个Segment的数组,而每个分段Segment又是一个HashEntry的数组,每个HashEntry是一个链表。简单的讲Java7中,每个Segment就是一个小的HashMap,每个Segment持有一把锁,对某个Segment的锁操作并不影响其它Segment,这样相比与HashTable并发度有很大提升

ConcurrentHashMap内部Segment数组声明如下:

/**
 * The segments, each of which is a specialized hash table
 */
final Segment<K,V>[] segments;

Segment类声明如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable { 
}

Segment继承了ReentrantLock,所以ConcurrentHashMap的每个Segment都是一把锁,不同的Segment之间的读写不构成竞争,大大降低了锁的竞争。需要注意的是,Segment数组一旦确定大小,就不能再扩容了,但是单个Segment里面的HashEntry数组(HashMap)是可以扩容的。

下面来看一下ConcurrentHashMap中Segment数组之外的成员变量,主要包含以下三个:

  • initialCapacity:初始总容量,默认16(ConcurrentHashMap中总KV数)
  • loadFactor:负载因子,默认0.75(单个Segment段,HashMap负载因子)
  • concurrencyLevel:并发级别,默认16(Segment数组大小)

针对上述三个成员变量,我们需要了解的是:

  • Segment数组长度ssize:取大于等于并发级别concurrencyLevel的最小的2的幂次,如concurrencyLevel=16,那么sszie=16,如concurrencyLevel=20,那么ssize=32
  • 单个Segment的初始容量cap:c = initialCapacity/ssize或initialCapacity/ssize + 1,是否加1取决于是否整除,比如15/7=2,那么c要取3,如16/8=2,那么c取2。cap的值为大于等于c的的最小2的幂次,最小值为2
  • 单个Segment的阈值threshold:cap*loadFactor(超过threadshold,单个Segment对应的HashMap就要进行扩容)

1.1 构造函数

在介绍构造函数之前,先来看一下ConcurrentHashMap中常量定义:

// 默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认segment层级
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// segment最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 一个segment最大容量
static final int MAX_SEGMENTS = 1 << 16;
// 锁之前重试次数
static final int RETRIES_BEFORE_LOCK = 2;
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                          float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 
    int sshift = 0;
    // segment数组的长度是由concurrentLevel计算来的,segment数组的长度是2的N次方,

    // 默认concurrencyLevel = 16, 所以ssize在默认情况下也是16,此时 sshift = 4

    // sshift相当于ssize从1向左移的次数

    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift; 
        ssize <<= 1;
    }
    // 段偏移量,默认值情况下此时segmentShift = 28
    this.segmentShift = 32 - sshift;
    // 散列算法的掩码,默认值情况下segmentMask = 15
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                          (HashEntry<K,V>[])new HashEntry[cap]);
    // 创建ssize长度的Segment数组
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

在上述构造函数中我们指定了concurrencyLevel,也即是多少把锁。这个数量不能超过上限:MAX_SEGMENTS(1 << 16),锁的个数必须是2的幂次方,如果我们指定的concurrencyLevel不是2的幂次方,构造函数会找到最接近的一个不小于我们指定的值的一个2的幂次方数作为segment数组长度。假如两个线程同时向表中插入元素,线程1插入的第0个segment,线程2插入的是第1个segment,线程1和线程2互不影响,能够同时并行,但是HashTable就做不到这一点。

上面构造函数中,segmentShift和segmentMask是用来通过位运算得到segment的索引位置的,在后面put方法中再介绍。

1.2 put方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask; // 通过位运算得到segment的索引位置
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

来看一下segment索引方式是: (hash >>> segmentShift) & segmentMask; 而不是hash % segment数组长度。这儿是一个优化:因为取模”%”操作相对位运算来说是很慢的,因此这里是用位运算来得到segment索引。而当segment数组长度是2的幂次方记为segmentSize时:hash % segmentSize == hash & (segmentSize - 1),这点在之前HashMap的文章中介绍过。下面来看一下Segment类中的put方法:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :  // 获取segment的锁,这里会有一个优化:获取锁的时候首先会通过tryLock()尝试若干次
        scanAndLockForPut(key, hash, value);  // 如果若干次之后还没有获取锁,则用lock()方法阻塞等待,直到获取锁
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash; // 得到segment的table的索引,也是通过位运算
        HashEntry<K,V> first = entryAt(tab, index); // table中index位置的first节点
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) { // 对应的key已经有了value
                    oldValue = e.value;
                    if (!onlyIfAbsent) { // 是否覆盖原来的value
                        e.value = value; // 覆盖原来的value
                        ++modCount;
                    }
                    break;
                }
                e = e.next;  // 遍历
            }
            else {
                if (node != null)
                    node.setNext(first); // 如果node已经在scanAndLockForPut()方法中初始化过
                else
                    node = new HashEntry<K,V>(hash, key, value, first); // 如果node为null,则初始化
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 如果超过阈值,则扩容
                else
                    setEntryAt(tab, index, node); // 通过UNSAFE设置table数组的index位置的元素为node
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

put操作首先会获当前取Segment的锁,然后确定带添加的KV在HashEntry数组中的索引,并遍历链表确认key是否已存在,如果存在则进行替换,put操作结束并解锁。如果key不存在,则需要在当前HashEntry数组添加一个节点(头插法),并判断是否需要扩容(这里的扩容是HashEntry数组的扩容,而不是Segment数组的扩容,Segment数组大小一旦确定就不能再改变了)。

1.3 get方法

ConcurrentHashMap中get操作是不需要加锁的,虽然HashEntry类中value是volatile的,Segment中的HashEntry[] table数组也是volatile,但ConcurrentHashMap的读操作并不是强一致性的,put方法写入的元素,并不能保证get方法一定能读到

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 计算key应该落在segments数组的哪个segment中
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); // 计算key应该落在table的哪个位置
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) // 如果key和当前节点的key指向同一块内存地址或者当前节点的hash
                return e.value;                                       // 等于key的hash并且key"equals"当前节点的key则说明当前节点就是目标节点
        }
    }
    return null; // key不在当前哈希表中,返回null
}

这里我们简单分析一下get方法的弱一致性,上面put方法插入一个节点并调用setEntryAt方法,设置table数组的index位置的元素为node。setEntryAt方法底层依赖的是UNSAFE类的putOrderedObject方法,该方法并不具有volatile语义,不能保证get方法中通过getObjectVolatile读操作一定能读取到put方法写入的内容,所以get方法是弱一致的。这点其实就是很容易通过happens-before分析得到,关于happens-before,会在下篇讲解Java内存模型的文章中详细介绍,不了解的,可以先去看一下这篇文章用happen-before规则重新审视DCL

1.4 rehash扩容操作

在上面put方法中,在添加一个HashEntry节点后,需要判断当前Segment段KV的数目是否大于threshold,如果大于threshold则需要调用rehash方法进行扩容:

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1; // 每次扩容成原来capacity的2倍,这样元素在新的table中的索引要么不变要么是原来的索引加上2的一个倍数
    threshold = (int)(newCapacity * loadFactor); // 新的扩容阈值
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的segment table数组
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;  // 在拷贝原来链表的元素到新的table中时有个优化:通过遍历找到原先链表中的lastRun节点,这个节点以及它的后续节点都不需要重新拷贝,直接放到新的table中就行
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun; // lastRun节点以及lastRun后续节点都不需要重新拷贝,直接赋值引用
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { // 循环拷贝原先链表lastRun之前的节点到新的table链表中
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]); // rehash之后,执行添加新的节点
    newTable[nodeIndex] = node;
    table = newTable;
}

rehash过程中是加排它锁的,这样当前Segment其他的写入请求将被阻塞等待。而对于读请求,如果读请求在rehash之前,此时segment中的table数组指针还是指向原先旧的数组,此时新的KV还没有插入,读取的还是旧的内容;如果读请求在rehash之后,因为table数组和HashEntry的value都是volatile,所以读线程能及时读取到更新的值(这点跟不需要扩容直接调用setEntryAt有区别,直接调用时,作用的对象时tab引用,tab引用是局部变量,不具有volatile语义,后续对该引用通过UNSAFE类调用putOrderedObject方法也不是volatile写,所以有可能对读线程不可见)。综上,rehash不会影响到读。

1.5 remove方法

public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash); // key落在哪个segment中
    return s == null ? null : s.remove(key, hash, null); // 如果segment为null,则说明哈希表中没有key,直接返回null,否则调用Segment的remove
}
final V remove(Object key, int hash, Object value) { // Segment的remove方法
    if (!tryLock()) // 获取Segment的锁,套路还是一样的首先进行若干次 `tryLock()`, 如果失败了则通过 `lock()` 方法阻塞等待直到获取锁
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index); // 找到key具体在table的哪个链表中,e代表链表当前节点
        HashEntry<K,V> pred = null; // pred代表e节点的前置节点
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) { // 找到了这个key对应的HashEntry
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null) // 如果当前节点的前置节点为空,说明要删除的节点是当前链表的头节点,直接将当前链表的头节点指向当前节点的next就可以了
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next); // 否则修改前置节点的next指针,指向当前节点的next节点,这样当前节点将不再"可达",可以被GC回收
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock(); // 解锁
    }
    return oldValue;
}

remove操作时,首先会找到这个key落在哪个Segment中,如果key没有落在任何一个Segment中,说明key不存在,直接返回null。找到具体的Segment后,调用Segment的remove方法来进行删除:找到key落在Segment的table数组中的哪个链表中,遍历链表,如果要删除的节点是当前链表的头节点,则直接修改链表的头指针为当前节点的next节点;如果要删除的节点不是头节点,继续遍历找到目标节点,修改目标节点的前置节点的next指针指向目标节点的next节点完成操作。

remove时首先会加排它锁,这样当前Segment其他的写入请求将被阻塞等待。对于读请求,如果读取的key不是当前要删除的key不会有任何问题。如果读取的key恰巧是当前需要删除key:读请求在remove之前,这时可以读取到;如果读请求在remove操作之后,注意这里的setEntryAt,这里跟put方法类似,不具有volatile语义,读线程不一定能读取到remove变更,如果带删除的节点不是数组的头节点,那么通过setNext方法设置next指针,由于next是volatile修饰的,读线程可以读取到remove的变更。

1.6 size方法

ConcurrentHashMap的size操作,首先会进行若干次尝试,每次对各个Segment的count求和,如果任意前后两次求和结果相同,则说明在这段时间之内各个Segment的元素个数没有改变,直接返回当前的求和结果就行了。如果超过一定重试次数之后,会采取悲观策略,直接锁定各个Segment,然后依次求和。当锁定所有Segment时,采取悲观策略,整个哈希表都不能有写入操作。

public int size() {
    // 首先不加锁,每次对各个Segment的count累加求和,如果任意两次的累加结果相同,则直接返回这个结果;超过一定的次数之后悲观锁定所有的Segment,再求和。锁定之后整个哈希表不能有任何的写入操作。
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) { // 最大乐观重试次数
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) { // 对各个Segment的count累加,不加锁
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last) // 如果本次累加结果和上次相同,说明这中间没有插入或者删除操作,直接返回这个结果
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size; // 如果溢出,返回最大整型作为结果,否则返回累加结果
}

2. Java8

Java7为了实现并行访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。Java8为进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。同时为了提高哈希碰撞下的寻址性能,Java8在链表长度超过一定阈值(8)时,将链表转换为红黑树。与HashMap一致,在Java8中,ConcurrentHashMap是通过数组 + 链表 + 红黑树实现的

ConcurrentHashMap中一些重要的成员属性声明如下:

/**
* 整个hash表的结构。也被称作hash桶数组
*/
transient volatile Node<K,V>[] table;

/**
* 扩容时用的nextTable,只有在扩容时才为非空
*/
private transient volatile Node<K,V>[] nextTable;

/**
* 主要用于表初始化和扩容时的控制,各种数值有不同的含义
* -1:      表正在初始化或者扩容
* 其他负数: 多线程扩容时出现,表示-(1+扩容线程数) ,也就是扩容线程数等于负数取正后减1
* 0和正数:  0为默认值。当table仍未null是,该值表示表初始化时的大小。否则在表示下次扩容时的阈值
*		     为当前容量*loadfactor,超过该值后进行扩容
*/
private transient volatile int sizeCtl;

/**
* 扩容时记录的索引 
*/
private transient volatile int transferIndex;
    
static final int MOVED     = -1; // forwarding Node的hash值
static final int TREEBIN   = -2; // TreeBin Node的hash值
static final int HASH_BITS = 0x7fffffff; // 用于hash取正,使其为正常结点

Node类声明如下(链表结构):

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    //methods
}

扩容后,红黑树节点TreeNode声明如下:

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    //methods
}

用于表示红黑树的类TreeBin声明如下:

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;  //红黑树根节点
    volatile TreeNode<K,V> first;   //链表头结点,TreeBin仍然保存了链表结构
    volatile Thread waiter;  //最近的一个设置 WAITER 标识位的线程 
    volatile int lockState;  //锁状态标志位

    static final int WRITER = 1; // 写锁标志位
    static final int WAITER = 2; // 等待锁标志位
    static final int READER = 4; // 读锁标志位

    //methods
}

用于扩容时,实现了扩容时旧表和新表的连接的结构ForwardingNode声明如下:

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;

    //methods
}

Node<K,V>:继承Map.Entry<K,V>,是其他三种Node的基类。ConcurrentHashMap中链表的普通结点就是Node<K,V>,整个表的结构也是Node<K,V>的数组。Node<K,V>与HashMap中的定义很相似,它对value和next属性设置了volatile,保证了其他线程的可见性。

TreeNode<K,V>:继承Node<K,V>,与1.8的HashMap一样。当链表长度过长时,链表会转换为红黑树,TreeNode<K,V>就是红黑树的结点。

TreeBin<K,V>:继承Node<K,V>,hash值固定为-2,是红黑树的包装结点。与1.8的HashMap不同的是,ConcurrentHashMap的数组槽中放入的不是TreeNode结点,而是将TreeNode的包装起来的TreeBin的对象中。此外可以从TreeNode包含的字段中发现除了包含树的根节点root,还有读写锁方面的状态变量。

ForwardingNode<K,V>:继承Node<K,V>,hash值固定为-1。只在扩容 transfer的时候出现。当数组槽为空或已经完成数组槽的扩容后插入数组槽中告知其他线程。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。

2.1 构造函数

public ConcurrentHashMap() {
}

无参默认构造器,Node数组的长度为16

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                MAXIMUM_CAPACITY :
                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

指定初始表长的构造器,表长会利用tableSizeFor指定为大于等于输入参数的最接近的2的次幂。

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

指定ConcurrentHashMap初始容量和负载因子,调用的是下面的构造函数。

public ConcurrentHashMap(int initialCapacity,
                            float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

指定ConcurrentHashMap初始容量、负载因子和concurrencyLevel,这里的concurrencyLevel并不是真正的并发级别,Java8中并发级别是Node数组的长度,是根据initialCapacity决定的。

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

通过Map初始化ConcurrentHashMap,实际是通过调用putAll方法实现的。

从上述构造函数可以看出,构造函数主要是设定sizeCtl的值,并未对哈希表进行初始化。当哈希表未初始化的时候,sizeCtl的值其实指定的是表的长度。初始化的工作在put操作时实现。

2.2 put方法

public V put(K key, V value) {
        return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //如果key或value为null,直接抛NPE
    if (key == null || value == null) throw new NullPointerException();
    //获取hash值
    int hash = spread(key.hashCode());
    //用来记录所在hash数组槽中结点的个数,后面会用于判断是否链表过长需要转红黑树
    int binCount = 0;
    //循环,直到插入以后才会跳出
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //若表未初始化调用initTable()函数初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //定位到table的位置,用tabAt函数volatile读取table[i]
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果数组槽为空,则用CAS将结点挂上去
            if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果头结点hash值为-1,则为ForwardingNode结点,说明在扩容,调用hlepTransfer帮助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //如果是普通链表结点或树结点,则synchronized同步,同步的结点为数组槽的头结点
            //remove/replace也会尝试锁住头节点,保证锁住数组槽的头节点能够阻塞其他基本的写操作
            synchronized (f) {
                //再检查一下,避免加锁的空隙中其他线程进行操作使头结点发生变化,类似于单例模式的双重校验锁
                if (tabAt(tab, i) == f) {
                    //普通链表结点
                    if (fh >= 0) {
                        // 因为第一个节点处理了,这里赋值为1
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                                // 找到“相等”的节点,看看是否需要更新value的值  
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 遍历到链表末尾还没碰见“相等”,那么就添加新节点到链表的末尾
                            // 注意是尾插法
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    //TreeBin结点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //调用putTreeVal利用红黑树的方法进行添加
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //如果超过了树化的阈值,即链表长度太长,则调用treeifyBin将链表转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);  //调用addCount函数使计数器加1,可能会引发扩容
    return null;
}

put方法内,主要完成了以下几个操作:

  • 判断是否key和value是否为null,如果为为null,则抛出空指针异常
  • 判断哈希表是不是未初始化,如果尚未初始化,则调用initTable()函数初始化哈希表
  • 根据待插入key的hash值,定位到相应的数组槽,如果数组槽为空,则用CAS将结点挂上去
  • 判断是否是ForwardingNode结点,若是则调用hlepTransfer帮助扩容
  • 利用synchronized锁住链表头结点,如果是普通链表结点则用尾插法插入,如果是树结点调用putTreeVal函数插入红黑树
  • 判断链表长度是否超过阈值,进一步决定是否需要转化为红黑树

如果待插入key定位的数组槽为空,则不需要加锁,通过CAS将节点挂到数组槽中,否则会使用synchronized锁。

在这里,我们再单独来看一下put操作时hash函数spread和哈希表初始化函数initTable方法的实现:

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

通过key的hashCode高16位和低16位按位异或操作,最后还和HASH_BITS相与,HASH_BITS在上文中提到过值为 0x7fffffff。它的作用主要是使hash值为正数。在ConcurrentHashMap中,Hash值为负数有特别的意义,如-1表示ForwardingNode结点,-2表示TreeBin结点。当hash数组槽中的第一个结点hash值为负数时,会根据结点的类型利用多态调用相应的find函数。这点在下面讲get方法时再介绍。

/**
* 初始化hash表,利用SizeCtl记录的值作为初始化的容量
*/
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //循环,直到当表不为空且表长不为0
    while ((tab = table) == null || tab.length == 0) {
        //sizeCtl小于0,说明有其他线程正在执行表初始化
        //调用Thread.yield()让出CPU,若不成功则会一直在wihle循环中自旋,直至其他线程对表初始化完成
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //调用CAS函数将sizeCtl值设置为-1,表明当前线程要执行表初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //检查table数组是否已经被初始化,没初始化就真正初始化
                if ((tab = table) == null || tab.length == 0) {
                    //若sc(sizeCtl)的值为0,则为默认值16
                    //若sc>0,说明构造函数中有指定,则为sizeCtl的值
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //真正执行表的初始化操作
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    //table的复制
                    table = tab = nt;
                    //n - (n >>> 2) = n - n/4 = 0.75n,也就是说sc的值等于threshold
                    sc = n - (n >>> 2);
                }
            } finally {
                //最后要把sc的值赋给sizeCtl
                sizeCtl = sc;
            }
            //跳出循环
            break;
        }
    }
    return tab;
}

2.3 get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //获取hash值
    int h = spread(key.hashCode());
    //首先表不得为空,若不为空,则利用hash值定位,定位方法和HashMap一样
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //判断第一个结点是否为要找的结点,判断方法和HashMap一样
        //先比较hash值,若相等则需要地址相等或者equals为true中的一个成立,先比较地址可以节约时间
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果hash值小于0,有两种情况
        //-1是ForwardingNode,则用find函数转发到nextTable上查找
        //-2是TreeBin,调用TreeBin的find函数。根据自身读写锁情况,判断是用红黑树方式查找,还是用链表方式查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //为正常的链表结点,遍历查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法中主要完成了以下操作:

  • 根据key的hashCode,定位到具体的hash槽
  • 判断hash槽是否为空,若hash槽不为空,判断第一个结点是否是要查找的结点,如果是则直接返回
  • 如果hash槽第一个节点不是要查找的节点,则根据hash值是否为负数,将查找操作分派给相应的find函数。若是ForwardingNode,则用find函数转发到nextTable上查找;若是TreeBin结点,调用TreeBin的find函数,去红黑树中查找。否则遍历链表去查找

Java8 Node类中成员变量next和val都是volatile的,通过happens-before分析可以得出,put方法中的写操作是对get方法可见的,所以Java8中get方法不是弱一致性的。

这里我们顺便看一下ForwardingNode中的find方法,如下:

static final class ForwardingNode<K,V> extends Node<K,V> {  
    final Node<K,V>[] nextTable;  
    ForwardingNode(Node<K,V>[] tab) {  
        super(MOVED, null, null, null);  
        this.nextTable = tab;  
    }  
  
    //ForwardingNode的查找操作,直接在新数组nextTable上去进行查找  
    Node<K,V> find(int h, Object k) {  
        //使用循环,避免多次碰到ForwardingNode导致递归过深  
        outer: for (Node<K,V>[] tab = nextTable;;) {  
            Node<K,V> e; int n;  
            //最后一个条件定位了在新表中的位置,方法与HashMap一致
            if (k == null || tab == null || (n = tab.length) == 0 ||  (e = tabAt(tab, (n - 1) & h)) == null)   
                return null;  
            for (;;) {  
                int eh; K ek; 
                //第一个节点就是要找的节点,直接返回
                if ((eh = e.hash) == h &&  ((ek = e.key) == k || (ek != null && k.equals(ek)))  
                    return e;  
                if (eh < 0) {  
                    //继续碰见ForwardingNode的情况,这里相当于是递归调用一次本方法
                    if (e instanceof ForwardingNode) {   
                        tab = ((ForwardingNode<K,V>)e).nextTable;  
                        continue outer;  
                    }  
                    else  
                        //碰见特殊节点,调用其find方法进行查找
                        return e.find(h, k);   
                }  
                //普通节点直接循环遍历链表
                if ((e = e.next) == null)   
                    return null;  
            }  
        }  
    }  
}

扩容时当数组槽为空或已经完成数组槽的扩容后将ForwardingNode结点插入数组槽中,而find操作在新表中进行查询。利用ForwardingNode,巧妙地将旧表和新表连接起来,保证了其他线程扩容时也能对结点正常访问。

2.4 remove方法

public V remove(Object key) {
    return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果在哈希数组为null或长度为0或key对应的数组槽为null,则直接跳出循环
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果头结点hash值为-1,则为ForwardingNode结点,说明在扩容,调用hlepTransfer帮助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            //如果是普通链表结点或树结点,则synchronized同步,同步的结点为数组槽的头结点
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                //再检查一下,避免加锁的空隙中其他线程进行操作使头结点发生变化,类似于单例模式的双重校验锁
                if (tabAt(tab, i) == f) {
                    //普通链表结点,遍历链表删除
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //TreeBin节点
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

remove操作时,首先会找到这个key落在table数组的hash槽,table数组为null或length为0或未找到key对应的hash槽,说明key不存在,直接返回null。找到具体的hash槽后,如果hash槽的头节点hash值为-1,表示为ForwardingNode结点,说明在扩容,调用hlepTransfer帮助扩容。否则hash槽的引用为普通链表或红黑树。如果为链表,则遍历链表,寻找带删除的节点,并通过修改目标节点的前置节点的next指针指向目标节点的next节点完成操作。如果为红黑树,则调用removeTreeNode删除节点。最终通过addCount方法,将哈希表中的元素数目减1。

remove时会加排它锁,这样当前Segment其他的写入请求将被阻塞等待。对于读请求,如果读取的key不是当前要删除的key不会有任何问题。如果读取的key恰巧是当前需要删除key:读请求在remove之前,这时可以读取到;如果读请求在remove操作之后,注意这里的setTabAt,跟Java7的setEntryAt方法不同的是,setTabAt方法底层依赖的是Unsafe类的putObjectVolatile方法,是具有volatile语义的。另外Node的next属性也是volatile修饰的,所以请求在remove操作之后的场景,是可以读取到remove的变更的。也就是讲在Java8中,remove操作不是弱一致性的。

2.5 size方法

Java8 ConcurrentHashMap使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,如下:

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}

初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化。

如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,如下:

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象:

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break;

所以在Java8中的size方法实现比Java7简单的多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,size方法实现如下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

到这里,ConcurrentHashMap的实现就讲完了,对比Java7和Java8的实现,感觉Java7的实现相对好理解一下,Java8实现相对复杂一些。但是无论是Java7还是Java8,ConcurrentHashMap的目标都是一致的,都实现了并发安全的HashMap版本。具有较高的并发性,支持原子更新操作,不会抛出ConcurrentModificationException,并且提供了一些线程安全的原子复合操作(比如putIfAbsent)。

Java中没有并发版的HashSet,但可以通过Collections.newSetFromMap方法基于ConcurrentHashMap构建。HashMap/HashSet基于哈希,不能对元素排序,对应的可排序的容器类是TreeMap/TreeSet,并发包中可排序的对应版本不是基于树,而是基于Skip List(跳跃表),并发包中的类分别是ConcurrentSkipListMap和ConcurrentSkipListSet,这些并发容器会在接下来的文章介绍。

参考链接:

1. Java API

2. 探索ConcurrentHashMap高并发性的实现机制(Java6)

3. Java7 ConcurrentHashMap源码浅析

4. Java 8 ConcurrentHashMap源码分析

5. ConcurrentHashMap(Java1.8)源码阅读

6. 从ConcurrentHashMap的演进看Java多线程核心技术

7. jdk1.8的HashMap和ConcurrentHashMap

8. Jdk1.6和1.7版本中ConcurrentHashMap的弱一致性

赞(3) 打赏
Zhuoli's Blog » Java编程拾遗『并发容器——ConcurrentHashMap』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

zhuoli's blog

联系我关于我

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏