文章目录
- Pre
- 概述
- 场景
- 问题复现
- 现象
- 问题分析
- 解决方案:加锁版本
- Fix
- 效果
- 改进与更优方案:使用原子操作避免加锁
- 使用 `computeIfAbsent` 等原子方法
- 单线程执行填充操作
- 总结
Pre
J.U.C Review - 并发容器集合解析
Java Review - 并发组件ConcurrentHashMap使用时的注意事项及源码分析
概述
JDK 1.5 后推出的 ConcurrentHashMap
,是一个高性能的线程安全的哈希表容器。“线程安全”这四个字特别容易让人误解,因为 ConcurrentHashMap
只能保证提供的原子性读写操作是线程安全的.
我们通常误以为使用了 ConcurrentHashMap
后,就不需要担心线程安全问题。然而ConcurrentHashMap
只能保证单个操作(如 put
、get
)的线程安全,无法确保多个操作(如 size
和 putAll
)之间的一致性。
场景
有一个含 900 个元素的 Map,现在再补充 100 个元素进去,这个补充操作由 10 个线程并发进行。
问题复现
java">private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new
));
}
@GetMapping("wrongVersion")
public String wrongVersion() throws InterruptedException {
// 初始化900个元素
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size: {}", concurrentHashMap.size());
// 使用 ForkJoinPool 进行并发操作
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
// 查询缺少的元素数量
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size: {}", gap);
// 补充缺少的元素
concurrentHashMap.putAll(getData(gap));
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
// 输出最终大小
log.info("finish size: {}", concurrentHashMap.size());
return "OK";
}
开发人员误以为使用了 ConcurrentHashMap
就不会有线程安全问题,于是不加思索地写出了下面的代码:在每一个线程的代码逻辑中先通过 size
方法拿到当前元素数量,计算 ConcurrentHashMap
目前还需要补充多少元素,并在日志中输出了这个值,然后通过
putAll
方法把缺少的元素添加进去。
现象
- 初始大小:900
- 多个线程执行时计算到的差值(
gap
)不一致,有的线程甚至发现了负值。 - 最终
ConcurrentHashMap
的大小不是预期的 1000。
问题分析
-
ConcurrentHashMap 的误用:
虽然ConcurrentHashMap
能保证基本的线程安全,但无法假设多个方法调用之间的一致性。例如,当一个线程通过size()
获取元素个数时,另一个线程可能已经向Map
中插入了新元素。这会导致不同线程看到的数据不一致。 -
非原子性操作的问题:
size()
返回的是ConcurrentHashMap
的瞬时快照,但在并发情况下,它可能只是一个中间状态,不能用于精确控制逻辑。putAll()
并不是原子操作,如果在它执行的过程中其他线程也在修改Map
,就会出现预期外的结果。
解决方案:加锁版本
为了解决上述问题,我们可以在 补充元素的逻辑 上加锁,确保这部分逻辑的原子性。
Fix
java">@GetMapping("rightVersion")
public String rightVersion() throws InterruptedException {
// 初始化900个元素
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size: {}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
synchronized (concurrentHashMap) {
// 查询缺少的元素数量
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size: {}", gap);
// 补充缺少的元素
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size: {}", concurrentHashMap.size());
return "OK";
}
效果
- 只有一个线程会发现需要补充 100 个元素,其他线程看到的
gap
为 0。 - 最终
ConcurrentHashMap
的大小为 1000,符合预期。
改进与更优方案:使用原子操作避免加锁
虽然加锁能解决问题,但这样会牺牲并发性能。我们可以通过一些更优的方案来避免锁的使用:
使用 computeIfAbsent
等原子方法
ConcurrentHashMap
提供了如 computeIfAbsent
、merge
等原子操作,我们可以利用这些方法简化并发逻辑。
单线程执行填充操作
由于填充操作涉及多个步骤,不易实现原子性。可以考虑将所有填充操作交给一个线程来完成,从而避免并发问题。
java">@GetMapping("betterVersion")
public String betterVersion() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size: {}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
// 使用单线程来执行填充操作,避免并发问题
forkJoinPool.submit(() -> {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size: {}", gap);
concurrentHashMap.putAll(getData(gap));
}).get(); // 等待任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size: {}", concurrentHashMap.size());
return "OK";
}
总结
使用并发工具类的注意事项
-
多操作间状态一致性问题:
ConcurrentHashMap
能保证单次操作的线程安全,但无法保证多个操作之间的一致性。 -
避免使用快照方法控制流程:
在并发情况下,size()
等方法只能用作参考,不能用于控制流程逻辑。 -
尽量使用原子操作:
使用computeIfAbsent
、merge
等方法,可以在一定程度上避免显式加锁。 -
不要滥用锁:
在高并发场景下,频繁加锁会严重影响性能,合理设计并发逻辑才能充分发挥ConcurrentHashMap
的性能优势。
教训: 误以为使用了并发工具就可以解决一切线程安全问题,期望通过把线程不安全的类替换为线程安全的类来一键解决问题。比如,认为使用了
ConcurrentHashMap
就可以解决线程安全问题,没对复合逻辑加锁导致业务逻辑错误。如果希望在一整段业务逻辑中,对容器的操作都保持整体一致性的话,需要加锁处理