ConcurrentHashMap作为Java中高性能的并发哈希表,其扩容机制是保证线程安全与高效性能的核心设计。本文将深入解析Java 8版本ConcurrentHashMap的扩容全过程,从触发条件到多线程协作,再到数据迁移和最终收尾,全面展现这一复杂机制的实现原理与源码逻辑。

一、扩容触发条件与判断机制

ConcurrentHashMap的扩容主要由三种场景触发:

  1. 元素数量超过扩容阈值:当通过putVal()方法插入新元素后,会调用addCount()方法更新计数器并检查是否需要扩容。当元素数量s超过sizectl(即扩容阈值)时,触发扩容 。
  2. 链表长度超过阈值且数组长度小于64:当某个桶中的链表长度超过8时,会调用treeifyBin()方法尝试将链表转为红黑树。但若数组长度小于64,则会优先触发扩容,而非树化 。
  3. putAll()方法调用:当调用putAll()方法批量插入元素时,会通过tryPresize()方法预计算所需容量,主动触发扩容 。

扩容的判断主要依赖于sizectl变量的状态:

  • sizectl > 0:表示初始容量或当前扩容阈值
  • sizectl = 0:表示未初始化
  • sizectl = -1:表示正在初始化
  • sizectl < -1:表示正在扩容,其中最高位表示扩容标识,低16位表示当前参与扩容的线程数

二、新数组创建与初始参数设置

当触发扩容时,ConcurrentHashMap会创建一个两倍于原数组的新数组,并设置相关参数:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 计算迁移步长,最小为16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN Transfer STRIDE)
        stride = MIN Transfer STRIDE; // subdivide range
    // 如果是第一个扩容线程,创建新数组
    if (nextTab == null) {
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizectl = Integer.MAX_VALUE; // OOM
            return;
        }
        nextTable = nextTab; // 设置新表
        transferIndex = n;    // 初始化迁移索引为原数组长度
    }
    // 计算新数组长度
    int nextn = nextTab.length;
    // 创建ForwardingNode,用于标记已迁移的桶
    ForwardingNode<K,V>前进节点= new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // 是否为最后一个线程
    for (int i = 0, bound = 0;;) {
        // 确定迁移区间
        Node<K,V> f;
        int fh;
        while (advance) {
            int nexti = bound + stride;
            if (nexti <= 0 || (n >= NCPU && bound == 0)) {
                advance = false;
            } else {
                advance = (bound = nexti) > 0;
            }
        }
        // 处理当前桶
        if ((f = tabAt(tab, i)) != null) {
            int fh = f.hash;
            if (fh >= 0) {
                // 正常节点,进行迁移
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 迁移链表或红黑树
                        Node<K,V> oldFirst = f;
                        tabAt(tab, i) =前进节点; // 替换为ForwardingNode
                        // 迁移过程
                        Node<K,V> newFirst = null, e = oldFirst.next;
                        int nodeCount = 0;
                        oldFirst.next = null; // 断开原链表
                        Node<K,V>bf = null;
                        do {
                            int h = oldFirst.hash;
                            Node<K,V>ln =bf, c = newFirst;
                            if ((h & n) == 0) {
                               bf = oldFirst; // 旧位置
                                oldFirst.next = ln == null ? c : ln;
                            } else {
                                // 新位置
                                oldFirst.next = c;
                                newFirst = oldFirst;
                            }
                            nodeCount++;
                        } while ((oldFirst = e) != null);
                        // 更新桶数据
                        if (nodeCount > 0) {
                            setTabAt(nextTab, i, newFirst);
                            if (bf != null) {
                                setTabAt(nextTab, i + n, bf);
                            }
                        }
                    }
                }
            }
            // 处理其他情况
        }
        // 更新迁移索引
        if (i == 0 || finishing) {
            advance = finishing = (tabAt(nextTab, (n << 1) - 1) != null);
        }
        // 检查是否所有线程已完成迁移
        if (finishing) {
            // 最后一个线程完成迁移
            if (U.compareAndSwapInt(this, SIZECTL, sc,
                    sc = (sc < 0) ? sc + 1 : sc)) {
                if ((sc &CAP 31) == 0) {
                    // 更新阈值并设置新表
                    nextTable = null;
                    table = nextTab;
                    sizectl = (n << 1) - ((n << 1) >>> 2); // 新阈值
                    return;
                }
            }
        }
        // 其他线程协助迁移
        else if (U.getAndAddInt(this, SIZECTL, -1) <= 0) {
            // 该线程成为最后一个协助线程
            finishing = advance = true;
            i = n; // 从原数组末尾开始
        }
        // 后续处理
    }
}

三、多线程并发迁移数据实现

ConcurrentHashMap的扩容最显著的特点是支持多线程并发迁移数据,其核心机制如下:

1. 任务分片机制

ConcurrentHashMap采用反向任务分片的方式,将数组的迁移任务分配给多个线程并行处理:

// 确定迁移步长
int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n;
if (stride < MIN Transfer STRIDE) stride = MIN Transfer STRIDE;

// 首个线程创建新数组后,设置迁移索引为n
transferIndex = n;

每个线程通过以下逻辑领取任务区间:

// 从后往前领取任务区间
int nexti = bound + stride;
if (nexti <= 0 || (n >= NCPU && bound == 0)) {
    advance = false;
} else {
    advance = (bound = nexti) > 0;
}

2. ForwardingNode标记机制

ForwardingNode是ConcurrentHashMap扩容中的关键标记,当一个桶开始迁移时,会通过CAS操作将桶头节点替换为ForwardingNode:

// 创建ForwardingNode
ForwardingNode<K,V>前进节点= new ForwardingNode<K,V>(nextTab);

// 迁移时替换桶头
if (tabAt(tab, i) == f) {
    Node<K,V> oldFirst = f;
    tabAt(tab, i) =前进节点; // 替换为ForwardingNode
    // ...后续迁移操作
}

其他线程在访问该桶时,发现ForwardingNode后会调用helpTransfer()方法协助迁移:

// get()方法中遇到ForwardingNode
else if (eh < 0) {
    return (p = e.find(h, key)) != null ? p.val : null;
}

// find()方法中处理ForwardingNode
Node<K,V> find(int h, Object k) {
    Node<K,V> e = this;
    if (k != null) {
        do {
            int ehash = e.hash;
            if (ehash == h) {
                Object ek = e.key;
                if ((ek == null && k == null) || (ek != null && ek.equals(k)))
                    return e;
            }
            // 遇到ForwardingNode则协助迁移
            else if (ehash < 0) {
                if (e != head || !CAS) tabAt(c, i) ==前进节点)
                    return null;
                // 协助迁移
                Node<K,V> n = helpTransfer(c, i);
                if (n == null)
                    return null;
                e = n;
            }
            else {
                e = e.next;
            }
        } while (!节点为空);
    }
    return null;
}

3. 锁粒度控制

ConcurrentHashMap在迁移过程中采用细粒度锁,仅对当前处理的桶头节点加锁,避免全局锁带来的性能瓶颈:

// 对当前桶头节点加锁
synchronized (f) {
    if (tabAt(tab, i) == f) {
        // 迁移操作
    }
}

4. 红黑树处理

当桶中的链表已转为红黑树时,迁移过程会有所不同:

// 处理红黑树节点
if (f instanceof TreeBin) {
    TreeBin<K,V> t = (TreeBin<K,V>)f;
    // 拆分红黑树节点到新数组的两个桶中
    TabAt(nextTab, i, t.split(nextn));
}

红黑树节点会被拆分到新数组的i和i+n两个位置,保证扩容后红黑树的结构完整性。

四、扩容完成后的收尾工作

当所有线程完成数据迁移后,ConcurrentHashMap需要进行以下收尾工作:

if ((sc &CAP 31) == 0) {
    // 更新阈值并设置新表
    nextTable = null;
    table = nextTab;
    sizectl = (n << 1) - ((n << 1) >>> 2); // 新阈值
    return;
}

1. 状态更新

扩容完成后,ConcurrentHashMap需要更新以下状态

  • 将sizectl设置为新数组长度的扩容阈值(0.75倍新容量)
  • 将table指向新数组nextTab
  • 清空nextTable和transferIndex,释放资源

2. 线程计数与完成判断

ConcurrentHashMap通过sizectl的负值编码管理扩容线程

  • 扩容开始时,sizectl被设置为(rs << RESIZE_STAMP_SHIFT) + 2,其中rs为扩容标识,初始线程数为2
  • 每个参与迁移的线程通过CAS操作将sizectl加1,表示增加一个迁移线程
  • 线程完成迁移任务后,通过CAS操作将sizectl减1
  • 当sizectl减至0时,表示所有线程已完成迁移,最后一个线程执行postTransfer()完成收尾工作

3. 异常处理

若扩容过程中发生OOM(内存溢出),ConcurrentHashMap会进行特殊处理:

// 尝试创建新数组时捕获异常
try {
    @SuppressWarnings("unchecked")
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
    nextTab = nt;
} catch (Throwable ex) {
    sizectl = Integer.MAX_VALUE; // 标记OOM
    return;
}

当sizectl被设置为Integer.MAX_VALUE时,后续操作将不再尝试扩容。

五、扩容全过程Mermaid流程图

迁移策略
多线程协助迁移
满足条件
不满足条件
桶为空
桶已迁移
桶未迁移
链表结构?
拆分链表/树结构
高低位分组
按位与结果拆分链表
低位=0和高位=1
树结构迁移
拆分为两棵红黑树
迁移链表节点
迁移树节点
发现ForwardingNode?
其他线程访问表
加入迁移任务
正常访问
计算步长stride
分配迁移任务范围
开始扩容过程
检查扩容条件
初始化新哈希表
大小为原表2倍
退出
设置transferIndex
记录迁移进度
检查当前桶状态
设置ForwardingNode
标记已迁移
处理下一个桶
锁定桶头节点
迁移节点到新表
保持相对顺序
所有桶迁移完成?
完成迁移
更新表引用和阈值
扩容完成

六、扩容机制的核心优势

ConcurrentHashMap的扩容机制具有以下核心优势:

  1. 渐进式迁移通过多线程并行处理,将原本需要串行完成的迁移任务分散到多个线程中,大大提高了扩容效率。
  2. 低阻塞设计仅在必要时对单个桶加锁,避免了全局锁带来的性能问题,确保扩容过程中大部分数据仍可正常访问。
  3. 无锁CAS操作使用CAS操作协调线程间的协作,减少了线程间的显式同步,提高了并发性能。
  4. 动态负载均衡根据CPU核心数动态计算迁移步长,平衡了线程间的负载,避免了某些线程过载而其他线程闲置的情况。
  5. 智能树化处理在扩容时自动处理红黑树节点,保证了树化结构的完整性和性能优势。

七、扩容过程中的线程协作

ConcurrentHashMap的扩容过程中,线程间的协作主要通过以下机制实现:

  1. 任务分片每个线程负责迁移一部分桶,通过transferIndex和stride确定迁移区间,避免线程间任务重叠。
  2. ForwardingNode标记当一个桶开始迁移时,插入ForwardingNode标记,其他线程遇到此标记会自动跳转到新表并协助迁移。
  3. 动态调整线程在迁移过程中会根据剩余任务动态调整区间,确保所有桶都被正确迁移。
  4. 最终检查最后一个完成迁移的线程会执行最终检查,确保所有数据都已正确迁移,并更新相关状态。

八、扩容性能分析

ConcurrentHashMap的扩容机制在性能上具有以下特点:

  1. 时间复杂度单次扩容的时间复杂度为O(n),与数组长度成线性关系,但通过多线程并行处理,实际执行时间可大大缩短。
  2. 空间复杂度扩容时需要额外的空间存储新数组,空间复杂度为O(2n),但旧数组在扩容完成后会被回收。
  3. 并发性能相比传统HashMap的单线程扩容,ConcurrentHashMap的多线程扩容在高并发环境下性能优势明显。
  4. 负载均衡通过动态计算迁移步长,确保了不同线程间的负载均衡,提高了整体迁移效率。

九、总结

Java 8版本的ConcurrentHashMap通过一种创新的多线程并发扩容机制,实现了线程安全与高性能的完美平衡。其核心在于任务分片、ForwardingNode标记和细粒度锁的巧妙结合,使得扩容过程可以由多个线程并行完成,同时保证了扩容期间的正常读写操作。这种设计避免了传统扩容方法中的全局锁问题,大大提高了在高并发环境下的性能表现。

ConcurrentHashMap的扩容机制充分体现了现代并发编程中"无锁"或"少锁"的设计理念,通过CAS操作和状态标记实现线程间的协作,而非依赖传统的锁机制。这种设计不仅保证了线程安全,还最大限度地减少了线程阻塞,提高了并发性能。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐