Java 8 ConcurrentHashMap 扩容机制深度解析【源码、全流程 mermaid】
本文深入解析了Java 8中ConcurrentHashMap的扩容机制,重点分析了其多线程协作的扩容过程。扩容主要由元素数量超阈值、链表过长且数组小于64、批量插入三种场景触发,通过sizectl变量状态进行判断。扩容时创建2倍新数组,采用任务分片机制将迁移工作分配给多个线程并行处理。关键设计包括:通过ForwardingNode标记已迁移桶,使用反向任务分片避免竞争,以及多线程协同完成数据迁移
ConcurrentHashMap作为Java中高性能的并发哈希表,其扩容机制是保证线程安全与高效性能的核心设计。本文将深入解析Java 8版本ConcurrentHashMap的扩容全过程,从触发条件到多线程协作,再到数据迁移和最终收尾,全面展现这一复杂机制的实现原理与源码逻辑。
一、扩容触发条件与判断机制
ConcurrentHashMap的扩容主要由三种场景触发:
- 元素数量超过扩容阈值:当通过putVal()方法插入新元素后,会调用addCount()方法更新计数器并检查是否需要扩容。当元素数量s超过sizectl(即扩容阈值)时,触发扩容 。
- 链表长度超过阈值且数组长度小于64:当某个桶中的链表长度超过8时,会调用treeifyBin()方法尝试将链表转为红黑树。但若数组长度小于64,则会优先触发扩容,而非树化 。
- putAll()方法调用:当调用putAll()方法批量插入元素时,会通过tryPresize()方法预计算所需容量,主动触发扩容 。
扩容的判断主要依赖于sizectl变量的状态:
- sizectl > 0:表示初始容量或当前扩容阈值
- sizectl = 0:表示未初始化
- sizectl = -1:表示正在初始化
- sizectl < -1:表示正在扩容,其中最高位表示扩容标识,低16位表示当前参与扩容的线程数
二、新数组创建与初始参数设置
当触发扩容时,ConcurrentHashMap会创建一个两倍于原数组的新数组,并设置相关参数:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算迁移步长,最小为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN Transfer STRIDE)
stride = MIN Transfer STRIDE; // subdivide range
// 如果是第一个扩容线程,创建新数组
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizectl = Integer.MAX_VALUE; // OOM
return;
}
nextTable = nextTab; // 设置新表
transferIndex = n; // 初始化迁移索引为原数组长度
}
// 计算新数组长度
int nextn = nextTab.length;
// 创建ForwardingNode,用于标记已迁移的桶
ForwardingNode<K,V>前进节点= new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // 是否为最后一个线程
for (int i = 0, bound = 0;;) {
// 确定迁移区间
Node<K,V> f;
int fh;
while (advance) {
int nexti = bound + stride;
if (nexti <= 0 || (n >= NCPU && bound == 0)) {
advance = false;
} else {
advance = (bound = nexti) > 0;
}
}
// 处理当前桶
if ((f = tabAt(tab, i)) != null) {
int fh = f.hash;
if (fh >= 0) {
// 正常节点,进行迁移
synchronized (f) {
if (tabAt(tab, i) == f) {
// 迁移链表或红黑树
Node<K,V> oldFirst = f;
tabAt(tab, i) =前进节点; // 替换为ForwardingNode
// 迁移过程
Node<K,V> newFirst = null, e = oldFirst.next;
int nodeCount = 0;
oldFirst.next = null; // 断开原链表
Node<K,V>bf = null;
do {
int h = oldFirst.hash;
Node<K,V>ln =bf, c = newFirst;
if ((h & n) == 0) {
bf = oldFirst; // 旧位置
oldFirst.next = ln == null ? c : ln;
} else {
// 新位置
oldFirst.next = c;
newFirst = oldFirst;
}
nodeCount++;
} while ((oldFirst = e) != null);
// 更新桶数据
if (nodeCount > 0) {
setTabAt(nextTab, i, newFirst);
if (bf != null) {
setTabAt(nextTab, i + n, bf);
}
}
}
}
}
// 处理其他情况
}
// 更新迁移索引
if (i == 0 || finishing) {
advance = finishing = (tabAt(nextTab, (n << 1) - 1) != null);
}
// 检查是否所有线程已完成迁移
if (finishing) {
// 最后一个线程完成迁移
if (U.compareAndSwapInt(this, SIZECTL, sc,
sc = (sc < 0) ? sc + 1 : sc)) {
if ((sc & ~CAP 31) == 0) {
// 更新阈值并设置新表
nextTable = null;
table = nextTab;
sizectl = (n << 1) - ((n << 1) >>> 2); // 新阈值
return;
}
}
}
// 其他线程协助迁移
else if (U.getAndAddInt(this, SIZECTL, -1) <= 0) {
// 该线程成为最后一个协助线程
finishing = advance = true;
i = n; // 从原数组末尾开始
}
// 后续处理
}
}
三、多线程并发迁移数据实现
ConcurrentHashMap的扩容最显著的特点是支持多线程并发迁移数据,其核心机制如下:
1. 任务分片机制
ConcurrentHashMap采用反向任务分片的方式,将数组的迁移任务分配给多个线程并行处理:
// 确定迁移步长
int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n;
if (stride < MIN Transfer STRIDE) stride = MIN Transfer STRIDE;
// 首个线程创建新数组后,设置迁移索引为n
transferIndex = n;
每个线程通过以下逻辑领取任务区间:
// 从后往前领取任务区间
int nexti = bound + stride;
if (nexti <= 0 || (n >= NCPU && bound == 0)) {
advance = false;
} else {
advance = (bound = nexti) > 0;
}
2. ForwardingNode标记机制
ForwardingNode是ConcurrentHashMap扩容中的关键标记,当一个桶开始迁移时,会通过CAS操作将桶头节点替换为ForwardingNode:
// 创建ForwardingNode
ForwardingNode<K,V>前进节点= new ForwardingNode<K,V>(nextTab);
// 迁移时替换桶头
if (tabAt(tab, i) == f) {
Node<K,V> oldFirst = f;
tabAt(tab, i) =前进节点; // 替换为ForwardingNode
// ...后续迁移操作
}
其他线程在访问该桶时,发现ForwardingNode后会调用helpTransfer()方法协助迁移:
// get()方法中遇到ForwardingNode
else if (eh < 0) {
return (p = e.find(h, key)) != null ? p.val : null;
}
// find()方法中处理ForwardingNode
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
int ehash = e.hash;
if (ehash == h) {
Object ek = e.key;
if ((ek == null && k == null) || (ek != null && ek.equals(k)))
return e;
}
// 遇到ForwardingNode则协助迁移
else if (ehash < 0) {
if (e != head || !CAS) tabAt(c, i) ==前进节点)
return null;
// 协助迁移
Node<K,V> n = helpTransfer(c, i);
if (n == null)
return null;
e = n;
}
else {
e = e.next;
}
} while (!节点为空);
}
return null;
}
3. 锁粒度控制
ConcurrentHashMap在迁移过程中采用细粒度锁,仅对当前处理的桶头节点加锁,避免全局锁带来的性能瓶颈:
// 对当前桶头节点加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
// 迁移操作
}
}
4. 红黑树处理
当桶中的链表已转为红黑树时,迁移过程会有所不同:
// 处理红黑树节点
if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 拆分红黑树节点到新数组的两个桶中
TabAt(nextTab, i, t.split(nextn));
}
红黑树节点会被拆分到新数组的i和i+n两个位置,保证扩容后红黑树的结构完整性。
四、扩容完成后的收尾工作
当所有线程完成数据迁移后,ConcurrentHashMap需要进行以下收尾工作:
if ((sc & ~CAP 31) == 0) {
// 更新阈值并设置新表
nextTable = null;
table = nextTab;
sizectl = (n << 1) - ((n << 1) >>> 2); // 新阈值
return;
}
1. 状态更新
扩容完成后,ConcurrentHashMap需要更新以下状态:
- 将sizectl设置为新数组长度的扩容阈值(0.75倍新容量)
- 将table指向新数组nextTab
- 清空nextTable和transferIndex,释放资源
2. 线程计数与完成判断
ConcurrentHashMap通过sizectl的负值编码管理扩容线程:
- 扩容开始时,sizectl被设置为
(rs << RESIZE_STAMP_SHIFT) + 2
,其中rs为扩容标识,初始线程数为2 - 每个参与迁移的线程通过CAS操作将sizectl加1,表示增加一个迁移线程
- 线程完成迁移任务后,通过CAS操作将sizectl减1
- 当sizectl减至0时,表示所有线程已完成迁移,最后一个线程执行postTransfer()完成收尾工作
3. 异常处理
若扩容过程中发生OOM(内存溢出),ConcurrentHashMap会进行特殊处理:
// 尝试创建新数组时捕获异常
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizectl = Integer.MAX_VALUE; // 标记OOM
return;
}
当sizectl被设置为Integer.MAX_VALUE时,后续操作将不再尝试扩容。
五、扩容全过程Mermaid流程图
六、扩容机制的核心优势
ConcurrentHashMap的扩容机制具有以下核心优势:
- 渐进式迁移:通过多线程并行处理,将原本需要串行完成的迁移任务分散到多个线程中,大大提高了扩容效率。
- 低阻塞设计:仅在必要时对单个桶加锁,避免了全局锁带来的性能问题,确保扩容过程中大部分数据仍可正常访问。
- 无锁CAS操作:使用CAS操作协调线程间的协作,减少了线程间的显式同步,提高了并发性能。
- 动态负载均衡:根据CPU核心数动态计算迁移步长,平衡了线程间的负载,避免了某些线程过载而其他线程闲置的情况。
- 智能树化处理:在扩容时自动处理红黑树节点,保证了树化结构的完整性和性能优势。
七、扩容过程中的线程协作
ConcurrentHashMap的扩容过程中,线程间的协作主要通过以下机制实现:
- 任务分片:每个线程负责迁移一部分桶,通过transferIndex和stride确定迁移区间,避免线程间任务重叠。
- ForwardingNode标记:当一个桶开始迁移时,插入ForwardingNode标记,其他线程遇到此标记会自动跳转到新表并协助迁移。
- 动态调整:线程在迁移过程中会根据剩余任务动态调整区间,确保所有桶都被正确迁移。
- 最终检查:最后一个完成迁移的线程会执行最终检查,确保所有数据都已正确迁移,并更新相关状态。
八、扩容性能分析
ConcurrentHashMap的扩容机制在性能上具有以下特点:
- 时间复杂度:单次扩容的时间复杂度为O(n),与数组长度成线性关系,但通过多线程并行处理,实际执行时间可大大缩短。
- 空间复杂度:扩容时需要额外的空间存储新数组,空间复杂度为O(2n),但旧数组在扩容完成后会被回收。
- 并发性能:相比传统HashMap的单线程扩容,ConcurrentHashMap的多线程扩容在高并发环境下性能优势明显。
- 负载均衡:通过动态计算迁移步长,确保了不同线程间的负载均衡,提高了整体迁移效率。
九、总结
Java 8版本的ConcurrentHashMap通过一种创新的多线程并发扩容机制,实现了线程安全与高性能的完美平衡。其核心在于任务分片、ForwardingNode标记和细粒度锁的巧妙结合,使得扩容过程可以由多个线程并行完成,同时保证了扩容期间的正常读写操作。这种设计避免了传统扩容方法中的全局锁问题,大大提高了在高并发环境下的性能表现。
ConcurrentHashMap的扩容机制充分体现了现代并发编程中"无锁"或"少锁"的设计理念,通过CAS操作和状态标记实现线程间的协作,而非依赖传统的锁机制。这种设计不仅保证了线程安全,还最大限度地减少了线程阻塞,提高了并发性能。
更多推荐
所有评论(0)