Java 异步编程之 Thread、Runnable、Callable、CompletableFuture 与线程池实战
一、为什么需要异步编程?
在后端开发中,一个接口里经常会做很多事情。
比如用户下单:
1. 创建订单
2. 扣减库存
3. 扣减余额
4. 发送短信
5. 写操作日志
6. 通知第三方系统
其中:创建订单、扣库存、扣余额 属于核心流程,通常需要同步执行。
但是:发送短信、写日志、通知第三方 不一定要阻塞主流程,可以考虑异步执行。
异步编程的价值是:
提升接口响应速度
提高系统吞吐量
减少主线程等待时间
更好地利用 CPU 和 IO 资源
但是异步也不是万能的,它会带来:
异常处理复杂
事务边界复杂
线程安全问题
线程池资源管理问题
所以学习异步编程,不能只会写代码,还要理解它的底层逻辑和适用场景。
二、Java 线程基础:Thread
Thread是 Java 中表示线程的类。
最基础的创建线程方式是继承Thread,重写run()方法。
1. 继承 Thread 创建线程
public class MyThread extends Thread { @Override public void run() { System.out.println("子线程执行:" + Thread.currentThread().getName()); } public static void main(String[] args) { MyThread thread = new MyThread(); thread.start(); System.out.println("主线程执行:" + Thread.currentThread().getName()); } }可能输出:
主线程执行:main
子线程执行:Thread-0
线程执行顺序不是固定的,具体由操作系统调度决定。
2. start() 和 run() 的区别
thread.start(); 表示真正启动一个新线程。
thread.run(); 只是普通方法调用,不会创建新线程。
3. 为什么不推荐直接继承 Thread?
虽然继承Thread可以创建线程,但实际开发中不推荐。
原因有三个:
1. Java 是单继承,继承 Thread 后就不能继承其他类。
2. 线程和任务耦合,不利于代码复用。
3. 生产环境更推荐使用线程池,而不是频繁 new Thread。
三、任务模型一:Runnable
Runnable是一个任务接口,表示一个没有返回值的任务。
@FunctionalInterface public interface Runnable { void run(); }它只有一个run()方法。
1. 实现 Runnable
public class SendSmsTask implements Runnable { @Override public void run() { System.out.println("发送短信:" + Thread.currentThread().getName()); } public static void main(String[] args) { SendSmsTask task = new SendSmsTask(); Thread thread = new Thread(task); thread.start(); } }这里的结构比继承Thread更清晰:
SendSmsTask 负责描述任务
Thread 负责执行任务。
2. 匿名内部类写法
public class RunnableDemo { public static void main(String[] args) { Runnable task = new Runnable() { @Override public void run() { System.out.println("异步写日志:" + Thread.currentThread().getName()); } }; new Thread(task).start(); } }3. Lambda 写法
因为Runnable是函数式接口,所以可以用 Lambda 简化:
public class RunnableLambdaDemo { public static void main(String[] args) { new Thread(() -> { System.out.println("异步任务执行:" + Thread.currentThread().getName()); }).start(); } }Lambda 只是简化写法,本质还是实现Runnable的run()方法。
4. Runnable 的特点
没有返回值
不能直接抛出受检异常
可以交给 Thread 执行
可以交给线程池执行
四、任务模型二:Callable
Runnable没有返回值,如果我们希望异步任务执行完成后返回一个结果,就可以使用Callable。
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }Callable有两个特点:
有返回值
可以抛出异常
1. Callable 示例
import java.util.concurrent.Callable; public class QueryUserTask implements Callable<String> { @Override public String call() throws Exception { System.out.println("查询用户信息:" + Thread.currentThread().getName()); Thread.sleep(1000); return "用户信息:张三"; } }但是注意,Thread不能直接执行Callable。
下面这种写法是错误的:
FutureTask<String> task = new FutureTask(new QueryUserTask()); Thread thread = new Thread(task); // 编译错误原因是 thread 的构造方法接收 Runnable,不接收 Callable。
所以Callable如果想交给Thread执行,需要借助FutureTask。
FutureTask<String> futureTask = new QueryUserTask<>(); Thread thread = new Thread(futureTask); thread.start();六、FutureTask:连接 Callable 和 Thread 的桥梁
FutureTask很重要。
它既是Runnable,又是Future。
源码关系大概是:
public class FutureTask<V> implements RunnableFuture<V> { }而:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }所以:
FutureTask 可以被 Thread 执行。 FutureTask 也可以通过 get() 获取结果。1. Callable + FutureTask + Thread
import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) throws Exception { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { System.out.println("子线程开始查询用户:" + Thread.currentThread().getName()); Thread.sleep(2000); return "用户信息:张三"; } }; FutureTask<String> futureTask = new FutureTask<>(callable); Thread thread = new Thread(futureTask); thread.start(); System.out.println("主线程继续执行:" + Thread.currentThread().getName()); String result = futureTask.get(); //同步阻塞,等待获取异步结果 System.out.println("获取异步结果:" + result); } }2. FutureTask 的缺点
FutureTask适合简单异步任务,但缺点也明显:
get() 会阻塞
不方便做回调
不方便组合多个任务
异常处理不够优雅
任务编排能力弱
七、CompletableFuture:更强大的异步编排工具
FutureTask能解决简单异步任务,但是它不适合复杂任务编排。
例如商品详情页需要同时查:
商品信息
库存信息
价格信息
优惠券信息
评价信息
如果串行查:
商品 200ms
库存 300ms
价格 200ms
优惠券 400ms
评价 300ms
总耗时约 1400ms
如果并行查:
多个任务同时执行
总耗时约等于最慢的任务,也就是 400ms 左右
这类场景就适合使用CompletableFuture。
1. CompletableFuture 的定位
CompletableFuture可以理解为:
Future 的增强版 + 异步任务编排工具。
它支持:
异步执行
任务回调
任务串联
任务合并
多个任务并行
异常兜底
八、CompletableFuture 常用方法详解
1. runAsync:执行无返回值任务
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("异步发送短信:" + Thread.currentThread().getName()); } }, executor); future.join();适合:
发送短信
写日志
清理缓存
异步通知
因为没有返回值,所以类型是:CompletableFuture<Void>
2. supplyAsync:执行有返回值任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("查询用户:" + Thread.currentThread().getName()); return "用户信息:张三"; }, executor); String result = future.join(); System.out.println(result);supplyAsync适合有返回结果的任务。
3. thenApply:接收结果,并返回新结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "用户1001"; }, executor).thenApply(userInfo -> { return userInfo + " 的订单信息"; }); String result = future.join(); System.out.println(result);输出:用户1001 的订单信息
特点:接收上一步结果 返回一个新结果
4. thenAccept:接收结果,不返回新结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "订单创建成功"; }, executor).thenAccept(result -> { System.out.println("发送通知:" + result); }); future.join();特点:接收上一步结果 没有返回值
5. thenRun:不接收结果,也不返回结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "订单结果"; }, executor).thenRun(() -> { System.out.println("订单流程结束,记录日志"); }); future.join();特点:
不关心上一步结果
只是在上一步完成后继续执行
6. thenApply 和 thenApplyAsync 的区别
这是重点。
thenApply() 不一定开启新线程,通常由上一步任务完成的线程继续执行。
thenApplyAsync() 会把下一步重新提交到线程池执行。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("第一步:" + Thread.currentThread().getName()); return "用户信息"; }, executor).thenApplyAsync(result -> { System.out.println("第二步:" + Thread.currentThread().getName()); return result + " + 订单信息"; }, executor);选择建议:
7. thenCombine:合并两个任务结果
如果两个任务互不依赖,可以并行执行,最后合并结果。
CompletableFuture<UserDTO> userFuture = CompletableFuture.supplyAsync(() -> { return userService.queryUserInfo(1001L); }, executor); CompletableFuture<OrderDTO> orderFuture = CompletableFuture.supplyAsync(() -> { return orderService.queryOrderInfo(1001L); }, executor); CompletableFuture<UserOrderDTO> resultFuture = userFuture.thenCombine(orderFuture, (UserDTO user, OrderDTO order) -> { UserOrderDTO dto = new UserOrderDTO(); dto.setUser(user); dto.setOrder(order); return dto; }); UserOrderDTO result = resultFuture.join();thenCombine的意思是:
等两个任务都完成后,拿到两个任务结果,再合并成一个新结果。
8. allOf:等待多个任务全部完成
CompletableFuture<UserDTO> userFuture = CompletableFuture.supplyAsync(() -> userService.queryUserInfo(1001L), executor); CompletableFuture<OrderDTO> orderFuture = CompletableFuture.supplyAsync(() -> orderService.queryOrderInfo(1001L), executor); CompletableFuture<CouponDTO> couponFuture = CompletableFuture.supplyAsync(() -> couponService.queryCoupon(1001L), executor); CompletableFuture.allOf(userFuture, orderFuture, couponFuture).join(); UserDTO user = userFuture.join(); OrderDTO order = orderFuture.join(); CouponDTO coupon = couponFuture.join();注意:
CompletableFuture.allOf(...)
返回的是:
CompletableFuture<Void>
每个任务结果还需要分别:
userFuture.join();
orderFuture.join();
couponFuture.join();
9. anyOf:任意一个任务完成即可
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(3000); return "服务A返回结果"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "服务B返回结果"; }, executor); Object result = CompletableFuture.anyOf(future1, future2).join(); System.out.println(result);输出大概率是: 服务B返回结果
适合场景:
多个服务查同一份数据
谁先返回就用谁
容灾查询
九、CompletableFuture 异常处理
异步任务也可能失败。
如果不处理异常:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return "success"; }, executor); String result = future.join();join()会抛出:CompletionException
1. exceptionally:异常兜底
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return "success"; }, executor).exceptionally(ex -> { System.out.println("异步任务异常:" + ex.getMessage()); return "默认结果"; }); String result = future.join(); System.out.println(result);输出: 默认结果
exceptionally只在异常时执行。
2. handle:成功失败都处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "正常结果"; }, executor).handle((result, ex) -> { if (ex != null) { return "失败兜底结果"; } return result; }); String result = future.join(); System.out.println(result);handle的特点:
成功也执行 失败也执行 可以返回新的结果
3. whenComplete:只做观察,不改变结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "订单创建成功"; }, executor).whenComplete((result, ex) -> { if (ex != null) { System.out.println("任务失败:" + ex.getMessage()); } else { System.out.println("任务成功:" + result); } }); String result = future.join();whenComplete适合:
打印日志
监控埋点
记录任务状态
十、线程池:生产环境不建议直接 new Thread
前面的代码里,我们经常写:new Thread(task).start();
但是生产环境不建议频繁这么写。
原因是:
线程创建销毁有成本
线程数量不可控
高并发下可能创建大量线程,压垮服务器
不方便统一管理线程
所以实际开发中更推荐使用线程池。
1. ExecutorService 执行 Runnable
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorRunnableDemo { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); pool.execute(new Runnable() { @Override public void run() { System.out.println("执行 Runnable 任务:" + Thread.currentThread().getName()); } }); pool.shutdown(); } }execute()用于执行没有返回值的任务。
2. ExecutorService 执行 Callable
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorCallableDemo { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("执行 Callable 任务:" + Thread.currentThread().getName()); Thread.sleep(1000); return "订单查询结果"; } }); String result = future.get(); System.out.println("获取结果:" + result); pool.shutdown(); } }这里:pool.submit(callable);
返回的是: Future<String>
然后通过:future.get(); 获取结果。
3. execute 和 submit 的区别
异常表现也不同:
execute() 执行任务异常,通常会直接打印异常。
submit() 执行任务异常,异常会封装进 Future,调用 get() 时才抛出。
4. ThreadPoolExecutor:推荐的线程池创建方式
虽然下面写法简单:
ExecutorService pool = Executors.newFixedThreadPool(10);
但是生产环境更推荐手动创建ThreadPoolExecutor。
原因是Executors一些方法底层使用无界队列,任务过多时可能造成内存压力。
推荐写法
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolConfigDemo { public static ThreadPoolExecutor buildExecutor() { return new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }核心参数说明
new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler );线程池执行流程
假设:
corePoolSize = 10
maximumPoolSize = 20
queue = 100
执行流程:
常见拒绝策略
十一、CompletableFuture + 线程池实战:商品详情页聚合查
1. 业务场景
商品详情页需要查询:
商品基本信息
库存信息
价格信息
优惠券信息
评价信息
这些查询之间没有强依赖,可以并行执行。
2. 线程池配置
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class AsyncExecutorConfig { public static ThreadPoolExecutor buildExecutor() { return new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }3. DTO 示例
public class ProductDetailDTO { private ProductDTO product; private StockDTO stock; private PriceDTO price; private CouponDTO coupon; private CommentDTO comment; public ProductDTO getProduct() { return product; } public void setProduct(ProductDTO product) { this.product = product; } public StockDTO getStock() { return stock; } public void setStock(StockDTO stock) { this.stock = stock; } public PriceDTO getPrice() { return price; } public void setPrice(PriceDTO price) { this.price = price; } public CouponDTO getCoupon() { return coupon; } public void setCoupon(CouponDTO coupon) { this.coupon = coupon; } public CommentDTO getComment() { return comment; } public void setComment(CommentDTO comment) { this.comment = comment; } }简单 DTO:
public class ProductDTO { private Long productId; private String productName; public ProductDTO(Long productId, String productName) { this.productId = productId; this.productName = productName; } }public class StockDTO { private Integer stock; public StockDTO(Integer stock) { this.stock = stock; } }public class StockDTO { private Integer stock; public StockDTO(Integer stock) { this.stock = stock; } }public class CouponDTO { private String couponName; public CouponDTO(String couponName) { this.couponName = couponName; } }public class CommentDTO { private Integer commentCount; public CommentDTO(Integer commentCount) { this.commentCount = commentCount; } }4. 模拟 Service
public class ProductService { public ProductDTO queryProduct(Long productId) { sleep(200); return new ProductDTO(productId, "iPhone 15"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }public class StockService { public StockDTO queryStock(Long productId) { sleep(300); return new StockDTO(100); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }public class PriceService { public PriceDTO queryPrice(Long productId) { sleep(200); return new PriceDTO(5999); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }public class CouponService { public CouponDTO queryCoupon(Long productId) { sleep(400); return new CouponDTO("满5000减300"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }public class CouponService { public CouponDTO queryCoupon(Long productId) { sleep(400); return new CouponDTO("满5000减300"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }5. 聚合查询实现
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; public class ProductDetailService { private final ProductService productService = new ProductService(); private final StockService stockService = new StockService(); private final PriceService priceService = new PriceService(); private final CouponService couponService = new CouponService(); private final CommentService commentService = new CommentService(); private final ThreadPoolExecutor executor = AsyncExecutorConfig.buildExecutor(); public ProductDetailDTO queryProductDetail(Long productId) { CompletableFuture<ProductDTO> productFuture = CompletableFuture.supplyAsync(() -> { return productService.queryProduct(productId); }, executor); CompletableFuture<StockDTO> stockFuture = CompletableFuture.supplyAsync(() -> { return stockService.queryStock(productId); }, executor); CompletableFuture<PriceDTO> priceFuture = CompletableFuture.supplyAsync(() -> { return priceService.queryPrice(productId); }, executor); CompletableFuture<CouponDTO> couponFuture = CompletableFuture.supplyAsync(() -> { return couponService.queryCoupon(productId); }, executor).exceptionally(ex -> { System.out.println("优惠券查询失败:" + ex.getMessage()); return null; }); CompletableFuture<CommentDTO> commentFuture = CompletableFuture.supplyAsync(() -> { return commentService.queryComment(productId); }, executor).exceptionally(ex -> { System.out.println("评价查询失败:" + ex.getMessage()); return new CommentDTO(0); }); CompletableFuture.allOf( productFuture, stockFuture, priceFuture, couponFuture, commentFuture ).join(); ProductDetailDTO detailDTO = new ProductDetailDTO(); detailDTO.setProduct(productFuture.join()); detailDTO.setStock(stockFuture.join()); detailDTO.setPrice(priceFuture.join()); detailDTO.setCoupon(couponFuture.join()); detailDTO.setComment(commentFuture.join()); return detailDTO; } }6. 测试代码
public class ProductDetailTest { public static void main(String[] args) { ProductDetailService productDetailService = new ProductDetailService(); long start = System.currentTimeMillis(); ProductDetailDTO detailDTO = productDetailService.queryProductDetail(1001L); long end = System.currentTimeMillis(); System.out.println("查询完成,耗时:" + (end - start) + "ms"); System.out.println(detailDTO); } }串行查询大约需要:200 + 300 + 200 + 400 + 300 = 1400ms
并行查询大约需要:取最慢任务耗时,大约 400ms
这就是CompletableFuture + 线程池的典型应用场景。
十二、异步编程中的事务问题
异步编程一定要注意事务。
错误示例:
@Transactional public void createOrder(Long userId, Long productId) { orderMapper.insertOrder(userId, productId); CompletableFuture.runAsync(() -> { stockMapper.deductStock(productId); }, executor); accountMapper.deductBalance(userId); }很多人以为:
createOrder() 方法加了 @Transactional,
所以里面所有数据库操作都在一个事务里。
但这是错误的。
因为:
Spring 事务默认绑定当前线程。
CompletableFuture 开启的是新线程。
新线程不会自动加入外层 @Transactional 事务。
也就是说:stockMapper.deductStock(productId); 通常不在外层事务中
如果外层事务回滚,异步线程里的库存扣减不一定回滚,可能造成数据不一致。
核心写流程建议同步事务执行
@Transactional public void createOrder(Long userId, Long productId) { orderMapper.insertOrder(userId, productId); stockMapper.deductStock(productId); accountMapper.deductBalance(userId); }如果是跨服务场景,要考虑:
可靠消息
本地消息表
TCC
Saga
补偿机制
幂等控制
最终一致性
十三、异步编程最佳实践
1. 不要所有任务都异步
适合异步的任务:
耗时任务
非核心任务
可以延迟完成的任务
失败后可以补偿的任务
多个互不依赖的查询任务
2. 一定要使用自定义线程池
不要大量使用:CompletableFuture.supplyAsync(() -> queryData());
因为默认使用的是:ForkJoinPool.commonPool()
这是公共线程池,不适合承载大量业务任务。
推荐:
ompletableFuture.supplyAsync(() -> queryData(), executor);
3. 一定要处理异常
不要写成:
CompletableFuture.supplyAsync(() -> { return remoteService.query(); }, executor);推荐加异常处理:
CompletableFuture.supplyAsync(() -> { return remoteService.query(); }, executor).exceptionally(ex -> { System.out.println("查询失败:" + ex.getMessage()); return defaultValue; });4. join 也会阻塞
虽然CompletableFuture是异步的,但是future.join();
仍然会阻塞当前线程。
如果任务还没有完成,当前线程会等待。
所以:异步的是任务执行。 阻塞的是最终等待结果的 join。
5. 注意线程池隔离
不同业务最好使用不同线程池。
比如:
订单线程池 短信线程池 报表线程池 文件处理线程池不要所有异步任务共用一个线程池。
否则某个慢任务堆积,可能拖垮其他业务。
十四、核心知识点总结
