JUC高并发编程—Fork / Join
一、Fork/Join 解决什么问题?
1. 传统 ThreadPoolExecutor 缺点
普通线程池:任务提交到阻塞队列,线程从队列抢任务,任务粒度不能拆分。 如果一个超大耗时任务(比如遍历 100 万条数据求和)丢给单一线程,其他线程全部空闲,CPU 利用率极低,无法充分多核并行。
2. Fork/Join 核心思想:分而治之
类比归并排序:
- Fork(拆分):大任务拆成若干小任务,直到小任务足够简单不用再拆;
- Join(合并):等待所有子小任务执行完成,收集所有子结果汇总成最终结果。
一句话通俗理解: 大活拆成一堆小活分给多个人干,所有人干完把结果汇总,最大化利用多核 CPU。
3. 适用场景 vs 不适用场景
✅ 适合:CPU 密集型、可递归拆分、计算量大
- 大数据分段求和、数组归并排序、树形递归遍历、海量数据统计 ❌ 不适合:IO 密集型(数据库 / 网络等待),线程阻塞会严重降低效率
二、Fork/Join 底层核心原理(重中之重)
1. 核心类结构
plaintext
ExecutorService 线程池顶层接口 ↑ ForkJoinPool :Fork/Join专属线程池(替代ThreadPoolExecutor) ↓ ForkJoinTask<V> :任务抽象类(不能直接new) ├ RecursiveTask<V> :有返回值的拆分任务(最常用) └ RecursiveAction :无返回值,只执行逻辑不汇总结果2. 最关键机制:工作窃取 Work-Stealing(ForkJoinPool 灵魂)
普通线程池:全局共享 1 个队列,多线程竞争锁抢任务,空闲线程只能等待。 ForkJoinPool:
- 每个工作线程自带双端队列 Deque;
- 线程自己 fork 拆分出来的子任务,放到自己队列队尾;
- 线程优先从自己队列尾部取任务执行;
- 如果某个线程自己队列为空(干完了),会去其他忙碌线程的队列头部偷任务执行;
优势:
- 减少锁竞争,性能更高;
- 不会出现部分线程忙死、部分线程闲置,CPU 充分打满;
- 双端队列:自己取尾部,别人偷头部,互不冲突。
3. Fork、Join 底层流程
fork():把拆分后的子任务提交到当前线程的本地双端队列,异步执行;join():阻塞等待当前子任务完成,并获取任务返回结果;- 如果子任务还没执行,当前线程会去偷其他任务执行,不会空等浪费 CPU;
- 递归拆分阈值(重点):必须设置阈值,否则无限递归拆分栈溢出 例:数组长度小于 1000 就不拆,直接串行计算;大于 1000 继续二分拆分。
4. ForkJoinPool 线程池参数
java
运行
public ForkJoinPool( int parallelism, // 并行度,默认CPU核心数,代表最大并发线程数 ForkJoinWorkerThreadFactory factory, // 线程工厂 UncaughtExceptionHandler handler, // 异常处理器 boolean asyncMode // 是否异步模式:true只从队列头部取,不用于递归合并 )日常开发直接用默认构造new ForkJoinPool()即可,并行度等于 CPU 核心数。
三、两大任务类区分(RecursiveTask / RecursiveAction)
1. RecursiveTask<V> 有返回值
需要拆分计算后汇总结果:求和、求最大值、数据统计 重写compute()方法,返回泛型结果
2. RecursiveAction 无返回值
只做遍历、写入、打印,不需要合并结果:批量文件处理、批量数据更新 重写compute(),返回 void
四、完整代码示例 1:RecursiveTask 数组分段求和(最经典)
需求:超大数组 1~100000 求和,递归二分拆分,最后合并所有分段结果
java
运行
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; // 拆分求和任务:有返回值,继承RecursiveTask class SumTask extends RecursiveTask<Long> { // 数组、计算区间 [start, end] private final long[] arr; private final int start; private final int end; // 阈值:区间长度小于该值不再拆分,直接计算 private static final int THRESHOLD = 1000; public SumTask(long[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Long compute() { // 1. 区间长度小于阈值,直接串行计算,终止递归 if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i <= end; i++) { sum += arr[i]; } return sum; } // 2. 区间过大,二分拆分 int mid = (start + end) / 2; SumTask leftTask = new SumTask(arr, start, mid); SumTask rightTask = new SumTask(arr, mid + 1, end); // fork:异步提交子任务到本地队列 leftTask.fork(); rightTask.fork(); // join:阻塞获取两个子任务结果 Long leftSum = leftTask.join(); Long rightSum = rightTask.join(); // 合并结果返回 return leftSum + rightSum; } } public class ForkJoinSumDemo { public static void main(String[] args) { // 构造10万长度数组,存放1~100000 long[] arr = new long[100000]; for (int i = 0; i < arr.length; i++) { arr[i] = i + 1; } // 创建ForkJoin线程池 ForkJoinPool pool = new ForkJoinPool(); // 提交根任务 SumTask rootTask = new SumTask(arr, 0, arr.length - 1); Long total = pool.invoke(rootTask); System.out.println("1~100000总和 = " + total); // 校验结果:公式 n*(n+1)/2 System.out.println("公式计算校验 = " + 100000L * 100001 / 2); pool.shutdown(); } }执行流程拆解
- 根任务区间 0~99999,远超阈值 1000,拆左右两半;
- 两半各自继续 fork 拆分,直到所有子任务区间≤1000;
- 空闲线程自动窃取其他线程未执行的子任务;
- 底层子任务全部计算完成后,逐层 join 向上汇总;
- 最终得到全局总和。
五、代码示例 2:RecursiveAction 无返回值批量遍历打印
场景:只遍历处理数据,不需要汇总结果,继承 RecursiveAction
java
运行
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; class PrintTask extends RecursiveAction { private final int start; private final int end; private static final int THRESHOLD = 20; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 区间小,直接打印 if (end - start <= THRESHOLD) { for (int i = start; i <= end; i++) { System.out.print(Thread.currentThread().getName() + "-" + i + " "); } System.out.println(); return; } // 拆分 int mid = (start + end) / 2; PrintTask left = new PrintTask(start, mid); PrintTask right = new PrintTask(mid + 1, end); // 两个子任务异步执行 left.fork(); right.compute(); // 优化:一个fork,一个当前线程直接执行,减少队列压力 left.join(); } } public class ForkJoinActionDemo { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); PrintTask task = new PrintTask(1, 200); pool.invoke(task); pool.shutdown(); } }小优化技巧
不要全部都调用fork(),可以一个 fork、一个直接调用compute(): 减少队列任务数量,降低内存开销,性能更好。
六、关键 API 方法详解
1. 任务方法(ForkJoinTask)
fork()将子任务提交到当前工作线程的本地双端队列,异步执行,无返回值。join()阻塞等待任务完成,返回计算结果;线程阻塞时会自动窃取其他任务,不浪费 CPU。invoke()同步执行当前任务,内部自动完成 fork+join,主线程调用阻塞等待最终结果(最常用提交方式)。invokeAll(task1,task2)批量 fork 多个任务,等价于多个 fork+join 组合。
2. ForkJoinPool 提交任务三种方式
invoke(task):同步阻塞,直接返回任务结果(递归计算首选)submit(task):异步提交,返回 ForkJoinTask,后续可 join/get 获取结果execute(task):异步无返回,类似普通线程池 execute
3. get () 和 join () 核心区别
join():只能抛出未检查异常(RuntimeException),递归拆分场景专用;get():继承 Future 方法,会抛出受检异常InterruptedException、ExecutionException,需要 try-catch 捕获; 递归分治代码优先使用join(),代码更简洁。
七、工作窃取机制演示对比(通俗举例)
假设 CPU4 核,4 条线程 T1/T2/T3/T4
- T1 分到超大任务,不断 fork 拆分,本地队列堆满子任务;
- T2/T3/T4 很快干完自己的小任务,队列为空;
- T2 去偷 T1 队列头部的子任务执行,T3、T4 同理;
- 所有 CPU 核心同时干活,不会出现资源闲置。
普通线程池做不到:如果 T1 持有大任务,其他线程只能空等全局队列,无法拆分窃取。
八、Fork/Join 典型业务应用场景
场景 1:大规模数值计算(最主流)
数组求和、求最大 / 最小值、矩阵运算、统计学批量计算。
场景 2:分治类算法并行化
归并排序、快速排序、二分查找并行优化。
场景 3:树形结构递归遍历
文件夹递归遍历、树形菜单批量数据处理、树节点统计。
场景 4:海量内存数据批量处理
千万级内存集合过滤、转换、聚合(非数据库 IO)。
九、ForkJoinPool 与 ThreadPoolExecutor 对比
表格
| 特性 | ForkJoinPool | 普通 ThreadPoolExecutor |
|---|---|---|
| 任务模型 | 支持递归拆分(fork/join) | 任务不可拆分,只能整体执行 |
| 队列模型 | 每个线程独立双端队列,工作窃取 | 全局共享单阻塞队列,竞争锁 |
| 适用负载 | CPU 密集、可分治计算 | IO 密集、独立零散小任务 |
| 线程等待 | join 阻塞时自动偷任务,无空闲 | get 阻塞线程原地等待,CPU 空转 |
| 结果合并 | 原生支持子任务结果汇总 | 需要手动维护集合汇总 Future |
十、使用注意事项(避坑重点)
1. 必须设置合理阈值 THRESHOLD
阈值太小:拆分层级极深,创建大量任务对象,GC 压力大; 阈值太大:拆分太少,并发度不足,多核利用率低; 经验值:单任务串行执行 1~5ms 左右最合适。
2. 禁止在 ForkJoin 任务中执行 IO 阻塞
如果子任务出现数据库 / 网络阻塞,对应工作线程被占死,无法窃取任务,并行效率大幅下降。IO 场景使用普通线程池。
3. 避免无限递归拆分
区间划分逻辑出错会无限 fork,直接栈溢出 StackOverflowError。
4. 线程池尽量复用,不要频繁新建
ForkJoinPool 创建开销大,全局单例复用;不要在循环里 new ForkJoinPool。
5. JDK8 Stream 底层就是 ForkJoin
并行流list.parallelStream()底层使用 ForkJoinPool 实现分治并行,本质是对 Fork/Join 框架的封装。 示例:
// 并行流求和,底层ForkJoin long sum = LongStream.rangeClosed(1, 100000).parallel().sum();十一、整体核心总结
- 核心设计:分治算法 + 工作窃取双端队列,专门优化多核 CPU 密集计算;
- 两大任务:RecursiveTask(有返回合并)、RecursiveAction(无返回纯执行);
- 核心方法:fork 拆分任务、join 阻塞合并结果、invoke 同步执行;
- 优势:线程空闲时自动窃取任务,CPU 利用率拉满;
- 局限:只适合纯内存计算,IO 阻塞场景不推荐,优先普通线程池。
