Java - ConcurrentHashMap

类型和变量说明

Node类及其子类,用于存储键值对,存在以下几个实现:

  1. Node. 最基本的实现,用于与其他子类实例区分的特征是其中hash字段的值为所有 冲突的Key的共同Hash值,因此一定大于0
  2. ForwardingNode. 表示该槽中所有的节点已经被迁移至nextTable中,特征是hash 字段的值为常量MOVED,当前实现中MOVED的值为-1
  3. TreeNode. 表示红黑树的节点,与Node一样hash中存储的内容也是Key的Hash值
  4. TreeBin. 表示当前槽中的所有节点由链表转为了红黑树,TreeBin中包含了这棵 红黑树的根节点,即一个TreeNode实例。TreeBin的特征是hash字段为TREEBIN常量, 当前实现中TREEBIN的值为-2,但实际源码中使用的是instanceof进行判断

table成员变量,表示当前正在使用的Hash表

transferIndex成员变量,表示迁移时[0, transferIndex)这个范围内的table的索引 的迁移权还未被线程所占用

sizeCtl成员变量,用于Hash表的初始化和控制,它的取值为负数时,表示Hash表正在初始化 或者迁移。实际取负值并非任意,注释上说明的是-1表示正在初始化,其他情况下为扩容, 取值为-(1 + number of resize threads),但实际却会出现类似-2145714174这样很小的 负数,综上,sizeCtl的赋值逻辑暂时还未弄明白。

get方法

ConcurrentHashMap中的get方法并未使用任何同步手段,因此总体而言并不复杂。下面给出 get方法的源码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算hash值
    int h = spread(key.hashCode());
    // 通过hash值获取table中的对应槽
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断头节点是否就是想要找的节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 红黑树和已被迁移的情况下的判断
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 链表遍历并进行查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法的逻辑十分明确,就是根据tabAt()方法的到的Node实例进行不同的查询操作, 没有使用synchronize进行同步。因此,我们需要关注的是会不会由于其他操作的非原子性 而导致找不到某个原本存在的值。

  1. 由于其他线程正在操作链表,导致某些已存在的节点未被查询到
  2. 由于其他线程正在操作红黑树,但由于该红黑树的旋转而导致某些节点未被查询到

针对链表的情况,由于添加的节点都是向链表尾部添加,而删除的情况只要不修改被删除的 节点的next的字段就不会产生问题。针对红黑树则应该更为复杂,但这部分我还未阅读相 关的源码,因此没有相关的了解。总而言之,针对并发数据结构,读的操作需要达成的目标 是得到某个保有整体一致性的版本。

put方法

ConcurrentHashMap中的put方法是由putVal实现的,putVal可能产生的几种情况:

  • Hash数组尚未被初始化
  • Key在Hash数组中对应的槽为空,即尚未有任何元素的Hash值与该Key相同
  • Key在Hash数组中对应的槽不为空,且头节点的Key与给定Key相等
  • Key在Hash数组中对应的槽不为空,但头节点的Key与给定Key不想等,此时需要对该槽中所 存储的链表或者红黑树进行遍历
  • Key在Hash数组中对应的槽为空,但被标记为已经迁移了

table初始化

这部分的逻辑对应的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 当table为空时不断自旋
    while ((tab = table) == null || tab.length == 0) {
        // 已完成初始化权限的竞争,当前线程无初始化权限,放弃CPU的使用权限
        if ((sc = sizeCtl) < 0)
            Thread.yield();
        // 尝试竞争table的初始化权限
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                // 双重检测,防止重复初始化
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                // 确保sizeCtl的还原
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

首先,需要明确的是仅会有1个线程对table进行初始化。为了达到这个目的,源码中的实现是通过 设置一个公共变量sizeCtl,线程通过自旋和对sizeCtl进行CAS竞争table的初始化权限。

自旋的条件为(tab = table) == null || tab.length == 0,即table为空。假设现在有A和B两个 线程,那么执行情况可能有以下几种:

并发竞争初始化权限

  1. A和B都检测table为空
  2. A和B都检测(sc = sizeCtl) < 0不成立
  3. A和B同时会执行U.compareAndSetInt(this, SIZECTL, sc, -1)的话
  4. 由成功的一方进行table初始化,而失败的一方则进入下一次循环

顺序竞争初始化权限

  1. A和B都检测table为空
  2. A和B都检测(sc = sizeCtl) < 0不成立
  3. B由于CPU的调度被暂停了,A执行U.compareAndSetInt(this, SIZECTL, sc, -1)成功
  4. A完成了table的初始化并还原的sizeCtl的值
  5. B执行U.compareAndSetInt(this, SIZECTL, sc, -1)成功
  6. B进行第2次检测发现table已经被初始化了,则还原sizeCtl并返回

不竞争初始化权限

  1. A检测table为空
  2. A检测(sc = sizeCtl) < 0不成立
  3. A执行U.compareAndSetInt(this, SIZECTL, sc, -1)成功并开始初始化
  4. B检测table为空
  5. B检测(sc = sizeCtl) < 0成立,B让渡出CPU的使用权等待A初始化完成
  6. A完成初始化,还原sizeCtl并返回

头节点检测

头节点检测的代码片段如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
for (Node<K,V>[] tab = table;;) {
    ...
    // key的hash所在槽没有任何节点
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // 通过cas将竞争头节点
        if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
            break;                   // no lock when adding to empty bin
    }
    // 当前冲突链表/红黑树已被迁移至新的hash表
    else if ((fh = f.hash) == MOVED)
        // 帮助完成迁移工作,之后进入下一次循环
        tab = helpTransfer(tab, f);
    // 头节点的key值与给定key相同且不允许同一个key重复插入
    else if (onlyIfAbsent // check first node without acquiring lock
             && fh == hash
             && ((fk = f.key) == key || (fk != null && key.equals(fk)))
             && (fv = f.val) != null)
        return fv;
    else {
        ...
    }
}

不去遍历冲突链表/红黑树,仅使用头节点的情况有2种:

  1. 头节点为空,这意味着不存在任何冲突,因此只需要通过CAS保证头节点变更的原子性即可, 在并发下成功的线程会直接返回,而失败则意味着进入下次循环再次根据实际情况判断。
  2. 头节点的Key与所传参数一致的情况下,如果允许同Key的覆盖,那么就需要考虑同时有多个 线程会同时操作这个头节点的情况,这里的操作包括可能会发生移除操作,因此不能简单的将 头节点的Value替换为参数传递的内容。但当不允许同Key的覆盖则不存在这一限制,因为我们 不需要修改头节点的内容,只需要得到它然后返回即可。

还有一种情况是由于扩容的发生,导致当前槽内的所有节点已被迁移值新的Hash表,这时如果 插入值到旧的Hash表就会发生数据丢失,而直接插入新表则需要考虑锁住新表中相应槽的头节点, 这会导致迁移的工作变得更加复杂以及耗时。因此,源码中的实现采用了辅助迁移的方式加速 扩容。

冲突链表/红黑树遍历

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
for (Node<K,V>[] tab = table;;) {
    ...
    else {
        V oldVal = null;
        // 锁住头节点
        synchronized (f) {
            // 链表遍历
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key, value);
                            break;
                        }
                    }
                }
                // 红黑数遍历
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                   value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
                // 异常情况
                else if (f instanceof ReservationNode)
                    throw new IllegalStateException("Recursive update");
            }
        }
        // 已解除头节点锁
        if (binCount != 0) {
            // 判断冲突节点个数是否超过门限
            if (binCount >= TREEIFY_THRESHOLD)
                // 尝试将链表转为红黑树
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
}

无论是遍历链表还是二叉树,其本质都是查找是否存在Key一致的节点,如果存在就根据 onlyIfAbsent决定是否替换节点中的值。

比较值得注意的是使用synchronize锁住头节点之后进行的tabAt(tab, i) == f判断, 这里的检测是为了保证在等待锁的过程中,其他线程的操作是否导致了头节点的变化,而 实际线程操作可能产生头节点变化的情况有以下几种:

  1. 由于扩容导致头节点由NodeTreeBin变为ForwardingNode
  2. 由于冲突节点个数超过TREEIFY_THRESHOLD而导致节点由Node变为TreeBin
  3. 由于冲突节点个数减少而导致节点由TreeBin还原成Node
  4. 由于头节点的移除而导致的头节点变更

头节点的变化会导致2种情况的发生:

  1. 后续的线程获取锁住新的头节点而导致多个线程并发修改同一个冲突链表/红黑树
  2. 当前线程以旧的头节点进行后续操作是无效的

因此,在检测到头节点变化后,会立刻放弃本次循环。

注意到一点,在插入执行成功之后,线程会解锁头节点之后再调用treeifyBin,个人 认为这是由于treeifyBin可能调用tryPresize,如果持续锁住该头节点,则会使得 整个迁移过程该槽无法进行任何修改,减低并发能力。

冲突链表树化treeifyBin

首先,需要明确treeifyBin并非一定会将冲突链表转为红黑树,而是尝试采用扩容 的方式解决问题。这部分的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    if (tab != null) {
        // 判断是否尝试扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        // 获取key所在链表头节点,防止代码执行期间的节点变化
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 锁住头节点
            synchronized (b) {
                // 判断获取锁的期间头节点是否产生变化
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

根据源码,是否先进行扩容是依据常量MIN_TREEIFY_CAPACITY进行的。暂时不考虑扩容的 情况,冲突链表的树化与链表操作一样都需要获取头节点的锁,所使用的也是双重检测的方 法防止头节点的变化。不同的是,treeifyBin并不会进行重试,这是由于节点的变化一般 都是类型的变化,这些情况下要么是冲突链表已被其他线程转为红黑树,要么是已被迁移。

而当非类型变化时,也的确可能发生冲突节点个数大于MIN_TREEIFY_CAPACITY且由于并发 而不产生树化的情况:

  1. 线程A、B、C各添加1个节点,并释放头节点
  2. 线程D获取头节点锁
  3. 线程A、B、C执行treeifyBin,并锁住头节点失败阻塞
  4. 线程D移除头节点
  5. 现成A、B、C发现头节点变化,不执行树化

Hash迁移transfer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride为线程所负责的table迁移窗口大小,迁移窗口为[transferIndex - stride, transferIndex)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    // 初始化新的Hash表
    if (nextTab == null) {
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 更新迁移边界
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 更新迁移槽
        while (advance) {
            int nextIndex, nextBound;
            // 当前的迁移窗口中的槽还未迁移完毕
            if (--i >= bound || finishing)
                advance = false;
            // 已无新的迁移窗口可以获取
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 尝试获取新的迁移窗口
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 无法迁移
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n;
            }
        }
        // 当前迁移槽为空
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 当前迁移槽已被迁移
        else if ((fh = f.hash) == MOVED)
            advance = true;
        // 进行迁移
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 链表迁移
                    ...
                }
                else if (f instanceof TreeBin) {
                    // 红黑树迁移
                    ...
                }
                else if (f instanceof ReservationNode)
                    throw new IllegalStateException("Recursive update");
            }
        }
    }
}

在解释这部分代码前大致说明一下几个变量:

  1. transferIndex,当前可用迁移上界,初始为table的长度
  2. stride,步长,可以理解为负责迁移窗口的大小,大小由原始table的大小以及 CPU核数共同确定

举个例子说明一下,假定线程A发起了迁移工作,那么它会依据当前的transferIndex尝试 获取自己可以进行迁移的窗口,即在table[transferIndex - stride, transferIndex) 所包含的索引范围均由该线程迁移至nextTable。竞争的方式是将transferIndex变量通过 CAS更新为transferIndex - stride

有了上面的基础思路,代码逻辑就十分好理解了:

  1. 确定迁移窗口大小
  2. 检测nextTable是否初始化,如果没有则进行初始化
  3. 获取下一个迁移的槽,如果当前的窗口中还有能够使用的迁移槽则直接使用,否则尝试获取 新的窗口
  4. 根据槽中头节点的内容进行迁移

总结

ConcurrentHashMap使用了CAS和synchronized两个同步手段保证线程安全,相比与JDK1.7中 的分段锁实现,优化的部分有:

  1. JDK 1.7中的实现并发度在初始化的时候就已经确定,无法提高,而JDK 8中则能够通过扩容提 高并发能力
  2. 在解决冲突的方法上也做了优化,默认当冲突数大于8个时会将冲突链表转为红黑树以获得更高 的查询性能

在实现方面,ConcurrentHashMap大量采用了CAS与自旋的方案,避免了大量的加锁操作。

Built with Hugo
主题 StackJimmy 设计