一、Fork-join
1.并发与并行
- 并发 是逻辑上的“同时”,强调处理多个任务的能力。
- 并行 是物理上的“同时”,强调真正同时执行多个任务。
2.Fork-join编程介绍
这是创建了一个工作窃取线程池。
2.1.它创建了什么?
- 类型:
ForkJoinPool的实例 - 特点:并行级别 = 当前机器的 CPU 核心数(例如 8 核 CPU → 8 个并行线程)
- 工作模式:工作窃取(Work-Stealing)
2.2.什么是工作窃取?
传统线程池(如 FixedThreadPool)每个线程有自己的任务队列。如果线程 A 做完了自己的任务,而线程 B 还有很多任务,A 只能闲着。这会造成资源浪费。
工作窃取模式:
- 每个线程都有双端队列(Deque)
- 自己的线程从队头取任务执行
- 空闲线程会去偷其他线程队尾的任务来执行
// 线程B的队列:任务1 任务2 任务3 任务4 任务5
// 线程B自己从头取: 取出任务1(执行中)
// 空闲的线程A来偷: 从尾偷任务5(执行中)// 结果:两个线程同时工作,减少空闲,提高吞吐量
2.3.代码示例与演示
ExecutorService executor = Executors.newWorkStealingPool();// 提交任务
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 1; i <= 10; i++) {final int taskId = i;tasks.add(() -> {String threadName = Thread.currentThread().getName();System.out.println("任务" + taskId + " 被 " + threadName + " 执行");Thread.sleep(500); // 模拟工作return "结果" + taskId;});
}// 执行所有任务并获取结果
List<Future<String>> futures = executor.invokeAll(tasks);
for (Future<String> future : futures) {System.out.println(future.get());
}executor.shutdown();
输出:
任务3 被 ForkJoinPool-1-worker-3 执行
任务1 被 ForkJoinPool-1-worker-1 执行
任务5 被 ForkJoinPool-1-worker-3 执行 // worker-3 偷到了任务5
任务2 被 ForkJoinPool-1-worker-2 执行
...
2.4.核心特性对比
| 特性 | newWorkStealingPool | newFixedThreadPool(4) |
|---|---|---|
| 默认线程数 | CPU核心数 | 固定N个 |
| 队列类型 | 每个线程有自己的双端队列 | 所有线程共享一个阻塞队列 |
| 空闲线程行为 | 去偷别人的任务 | 等待(可能空闲) |
| 适用场景 | 计算密集、任务产生子任务 | 通用场景 |
| 是否保证顺序 | ❌ 不保证 | ✅ 保证提交顺序 |
2.5.重要细节:并行度
// 获取CPU核心数
int cores = Runtime.getRuntime().availableProcessors();
System.out.println("CPU核心数:" + cores); // 假设是8// 等价于:
ExecutorService es1 = Executors.newWorkStealingPool();
// 并行度 = 8// 也可以指定并行度:
ExecutorService es2 = Executors.newWorkStealingPool(4);
// 并行度 = 4(即使CPU有8核,也只使用4个并行线程)
2.6.适用场景
2.6.1.很适合的场景
- 递归/分治任务(典型如归并排序、斐波那契数列计算)
- 计算密集型:如图像处理、大数据统计
- 任务会动态产生子任务:
// 工作窃取线程池特别适合这种
public List<String> processNode(Node node) {List<SubTask> subTasks = node.getChildren();// 每个子任务可能又产生更多子任务...
}
2.6.2.不适合的场景
- IO密集型任务(大量等待、阻塞):工作窃取线程在等待IO时不会去偷任务
- 需要保证任务执行顺序
- 所有任务相互独立且耗时相同:工作窃取的优势不明显
2.7.newWorkStealingPool 默认使用守护线程
ExecutorService es = Executors.newWorkStealingPool();
es.execute(() -> {Thread.sleep(10000);System.out.println("任务完成");
});
// 没有调用 es.shutdown()
// 程序会立即退出,因为守护线程不会阻止JVM关闭
对比 newFixedThreadPool 使用非守护线程,主线程结束后程序还会等待任务执行完毕。
二、BlockingQueue
1.基本概念介绍
Queue队列,先进先出的数据结构。BlockingQueue支持阻塞操作的队列。当队列为空的时候,获取元素的线程会等待队列为非空。当队列满的时候,存储元素的线程会等待队列可用
2.JDK阻塞队列实战与原理分析
2.1.Queue
- ArrayBlockingQueue:基于数组结构的有界阻塞队列(长度不可变)
- LinkedBlockingQueue:基于链表结构的有界阻塞队列(默认容量Integer.MAX_VALUE)
- LinkedTransferQueue:基于链表结构的无界阻塞/传递队列
- LinkedBlockingDueue:基于链表结构的有界阻塞双端队列(默认容量Integer.MAX_VALUE)
- SynchoronousQueue:不存储元素的阻塞/传递队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:支持延时获取元素的无界阻塞队列
2.2.ArrayBlockingQueue实现
public class MyArrayBlockingQueue <E>{// 存储队列中的元素final Object[] items;// 下次向哪个位置放置元素int takeIndex;// 下次从那个位置获取元素int putIndex;// items 中的元素个数int count;final ReentrantLock lock;// 队列非空private final Condition notEmpty;// 队列非满private final Condition notFull;final E itemAt(int i) {return (E) items[i];}public MyArrayBlockingQueue(int capacity, boolean fair) {if (capacity < 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock();notEmpty = lock.newCondition();notFull = lock.newCondition();}public MyArrayBlockingQueue(int capacity) {this(capacity, false);}public boolean put(E e) throws InterruptedException{Assert.assertNotNull(e);lock.lock();try {// 队列满了就等待,使用while的原因可能存在抢锁失败while (count == items.length)notFull.await();items[putIndex] = e;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signalAll();return true;} finally {lock.unlock();}}public E take() throws InterruptedException {lock.lock();try {// 队列空了就等待,使用while的原因可能存在抢锁失败while (count == 0)notEmpty.await();E e = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count --;notFull.signalAll();return e;} finally {lock.unlock();}}
}