当前位置: 首页 > news >正文

Java CompletableFuture 接口与原理详解

CompletableFuture是 Java 8 引入的一个强大且灵活的异步编程工具类,位于java.util.concurrent包中。它同时实现了Future<T>CompletionStage<T>两个接口,不仅支持获取异步计算结果,还提供了丰富的链式调用、组合、异常处理等能力,是构建高性能、非阻塞、响应式应用的核心组件之一。

CompletableFuture 常见接口

CompletableFuture核心接口包括:创建、链式调用、组合、异常处理、聚合、执行策略控制等。

场景方法
异步生成值supplyAsync(Supplier<T>)
异步执行无返回runAsync(Runnable)
转换结果thenApply, thenApplyAsync
消费结果thenAccept, thenAcceptAsync
无参动作thenRun, thenRunAsync
顺序依赖thenCompose
合并两个结果thenCombine, thenAcceptBoth
任一完成applyToEither, acceptEither
多任务全完成allOf
多任务任一完成anyOf
异常恢复exceptionally
统一处理handle
最终清理whenComplete
手动完成complete(T), completeExceptionally(Throwable)

创建 CompletableFuture

手动完成(无异步)

CompletableFuture<String> future = new CompletableFuture<>(); future.complete("Manual result"); System.out.println(future.join()); // Manual result

异步执行(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Async result"; }); System.out.println(future.join()); // Async result

异步执行(无返回值)

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Running async task"); }); future.join(); // 等待完成

可传入自定义线程池

ExecutorService executor = Executors.newFixedThreadPool(2); CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Custom pool", executor);

链式转换与消费(单阶段)

thenApply– 转换结果(同步)

CompletableFuture<Integer> f = CompletableFuture .supplyAsync(() -> "100") .thenApply(Integer::parseInt) .thenApply(x -> x * 2); System.out.println(f.join()); // 200

thenApplyAsync– 异步转换(新线程)

CompletableFuture<String> f = CompletableFuture .completedFuture("hello") .thenApplyAsync(s -> { System.out.println("Thread: " + Thread.currentThread().getName()); return s.toUpperCase(); }); System.out.println(f.join()); // HELLO(在 ForkJoinPool 线程中执行)

thenAccept– 消费结果(无返回)

CompletableFuture .supplyAsync(() -> "World") .thenAccept(s -> System.out.println("Hello " + s)); // Hello World

thenRun– 无输入

CompletableFuture .supplyAsync(() -> "ignored") .thenRun(() -> System.out.println("Task done!"));

扁平化嵌套(依赖另一个 CompletableFuture)

thenCompose– 顺序依赖(类似 flatMap)

thenCompose适用于第二个异步操作依赖第一个的结果,并且第二个异步操作也希望继续返回CompletableFuture<T>的场景。

用 thenCompose 避免 CompletableFuture<CompletableFuture<T>> 嵌套:

CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice"); CompletableFuture<Integer> getLength = getUser.thenCompose(name -> CompletableFuture.supplyAsync(() -> name.length()) ); // 1、可以直接通过 getLength.join() 得到5 // System.out.println(getLength.join()); // 2、也可以继续更多的链式调用: CompletableFuture<String> userLevel = getLength.thenCompose(len -> CompletableFuture.supplyAsync(() -> { if (len >= 5) return "VIP"; else return "Normal"; }) ); System.out.println(userLevel.join()); // 直接通过 userLevel.join() 得到: VIP

对比错误写法(产生嵌套 future,难以处理):

CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice"); CompletableFuture<CompletableFuture<Integer>> getLength = getUser.thenApply(name -> CompletableFuture.supplyAsync(() -> name.length()) // CompletableFuture<Integer> ); // 最终返回类型是 CompletableFuture<CompletableFuture<Integer>> // 无法直接 .join() 得到 5,也难以增加更多的链式调用

当然,如果第二个异步操作不返回CompletableFuture,而是返回 String 等普通类型,那么使用thenApplyAsync就可以:

// 转换为普通值,可以不必要使用thenCompose: CompletableFuture<String> f1 = getUserIdAsync() .thenApplyAsync(id -> "User-" + id); // 第二个异步操作里直接返回 String System.out.println(f1.join());

组合两个 CompletableFuture

thenCombine– 合并两个结果(AND)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " " + b); System.out.println(combined.join()); // Hello World

thenAcceptBoth– 消费两个结果

f1.thenAcceptBoth(f2, (a, b) -> System.out.println(a + " " + b));

runAfterBoth– 两者都完成后执行

f1.runAfterBoth(f2, () -> System.out.println("Both done"));

任一完成即响应(OR)

applyToEither– 返回第一个完成的结果(可转换)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { sleep(2000); return "Slow"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "Fast"; }); String result = f1.applyToEither(f2, s -> s + " wins!"); System.out.println(result); // Fast wins!

acceptEither– 消费第一个结果

f1.acceptEither(f2, System.out::println); // Fast

runAfterEither– 任一完成后执行

f1.runAfterEither(f2, () -> System.out.println("One finished"));

多任务聚合

allOf– 所有完成(无返回值)

CompletableFuture<Void> all = CompletableFuture.allOf( CompletableFuture.runAsync(() -> sleep(1000)), CompletableFuture.runAsync(() -> sleep(1500)), CompletableFuture.runAsync(() -> sleep(800)) ); all.join(); // 等待全部完成(约 1500ms) System.out.println("All tasks done");

若需获取所有结果:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A"); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B"); CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2); all.join(); System.out.println(f1.join() + f2.join()); // AB

anyOf– 任一完成即返回(返回 Object)

CompletableFuture<Object> any = CompletableFuture.anyOf( CompletableFuture.supplyAsync(() -> "First"), CompletableFuture.supplyAsync(() -> { sleep(1000); return "Second"; }) ); System.out.println(any.join()); // First(类型为 Object,需强转)

异常处理

exceptionally– 仅处理异常(类似 catch)

CompletableFuture<String> f = CompletableFuture .supplyAsync(() -> { throw new RuntimeException("Error!"); }) .exceptionally(ex -> "Fallback: " + ex.getMessage()); System.out.println(f.join()); // Fallback: java.lang.RuntimeException: Error!

handle– 统一处理正常/异常结果

CompletableFuture<String> f = CompletableFuture .supplyAsync(() -> { throw new RuntimeException("Oops!"); }) .handle((result, ex) -> { // 可以统一处理结果,常用于提供fallback、错误恢复、统一结果格式等 if (ex != null) { return "Default Value"; // 吞掉异常,返回默认值 } return result; // 此处还可以修改返回值 }); System.out.println(f.join()); // 输出: Default Value(无异常!)

whenComplete– 类似 finally(不改变结果)

CompletableFuture<String> f = CompletableFuture .supplyAsync(() -> { throw new RuntimeException("Oops!"); }) .whenComplete((result, ex) -> { // 不可干预结果,常用于记录日志、关闭资源、指标统计等副作用操作 if (ex != null) { System.out.println("Logged error: " + ex.getMessage()); } }); // 异常仍然会抛出! f.join(); // 抛出 CompletionException -> RuntimeException("Oops!")

whenComplete不改变返回值,即使抛异常也会传播原始异常。

完成状态检查与获取

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Done"); // 阻塞等待(推荐) String result = f.join(); // 不抛出受检异常 // 或使用 get(抛出 InterruptedException / ExecutionException) // String result = f.get(); // 检查状态 System.out.println(f.isDone()); // true System.out.println(f.isCompletedExceptionally()); // false System.out.println(f.isCancelled()); // false

推荐使用join()而非get(),避免处理受检异常。

执行策略控制(同步 vs 异步回调)

方法执行线程
thenApply前一个任务的完成线程(可能是同步或异步)
thenApplyAsync总是在另一个线程中执行(默认 ForkJoinPool.commonPool(),可指定 Executor)
CompletableFuture.supplyAsync(() -> { System.out.println("Stage1: " + Thread.currentThread().getName()); return "data"; }) .thenApply(s -> { System.out.println("thenApply (sync): " + Thread.currentThread().getName()); return s; }) .thenApplyAsync(s -> { System.out.println("thenApplyAsync: " + Thread.currentThread().getName()); return s; }) .join();

输出示例:

Stage1: ForkJoinPool.commonPool-worker-1 thenApply (sync): ForkJoinPool.commonPool-worker-1 thenApplyAsync: ForkJoinPool.commonPool-worker-2

为了避免阻塞主线程(因为可能不确定前一个任务是同步还是异步), I/O 或耗时操作一般建议都使用xxxAsync

完整实战示例:电商下单流程

ExecutorService ioPool = Executors.newFixedThreadPool(3); CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> { // 1. 创建订单 return "ORDER-1001"; }, ioPool); CompletableFuture<String> payment = order.thenCompose(ordId -> CompletableFuture.supplyAsync(() -> { // 2. 支付(依赖订单ID) return "PAID-" + ordId; }, ioPool) ); CompletableFuture<String> inventory = CompletableFuture.supplyAsync(() -> { // 3. 扣减库存(并行) return "INVENTORY-OK"; }, ioPool); CompletableFuture<String> shipping = payment.thenCombine(inventory, (pay, inv) -> { // 4. 发货(需支付成功 + 库存扣减) return "SHIPPED-" + pay; }); // 异常兜底 CompletableFuture<String> finalResult = shipping.exceptionally(ex -> { System.err.println("Order failed: " + ex.getMessage()); return "FAILED"; }); System.out.println(finalResult.join()); // SHIPPED-PAID-ORDER-1001 ioPool.shutdown();

CompletableFuture 实现原理分析

核心数据结构

CompletableFuture的核心是非阻塞式异步计算,通过注册回调函数(如thenApply,thenAccept等)在结果就绪时自动触发后续操作。

关键字段与结构:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { volatile Object result; // 存储结果或异常(BiRecord/AltResult) volatile Completion stack; // 指向依赖的 Completion 链表(栈结构)的头指针 }
  • result字段
    • 若为null:未完成。
    • 若为AltResult:表示异常或 null 值。
    • 若为普通对象:表示成功完成的结果。
  • stack字段
    • 类型为Completion(抽象类),是一个栈式单向链表(LIFO,后进先出),记录所有依赖当前CompletableFuture的后续操作(即“依赖图”),其注册顺序是后注册的靠前(栈顶),执行顺序是先执行栈顶(后注册的)。
    • 所有thenApplythenCompose等方法都会创建一个Completion子类实例(如UniApply,BiAccept,ThenCompose等),并压入此栈。

Completion(抽象类)是所有回调动作的基类,代表“当某个 future 完成后要做的事”。常见子类包括:

子类作用
UniApply对应 thenApply
UniAccept对应 thenAccept
BiApply对应 thenCombine
ThenCompose对应 thenCompose
AsyncRun对应 runAsync

核心流程源码分析

CompletableFuture生命周期的核心流程是:

  • 注册回调(构建依赖)
  • 完成任务(设置结果)
  • 触发依赖(传播完成)

一个典型流程如下(JDK 8):

CompletableFuture<Integer> future1 = new CompletableFuture<>(); // 注册回调(构建依赖) CompletableFuture<String> future2 = future1.thenApplyAsync(x -> "val=" + x); CompletableFuture<Void> future3 = future2.thenAccept(System.out::println); // 完成任务(设置结果),complete 内部的 postComplete 会触发依赖(传播完成) future1.complete(42); // 触发 complete() → completeValue() → postComplete() // postComplete() 会依次触发 f2(UniApply),然后 f3(UniAccept)

“注册回调”源码分析:

public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { // 当当前 CompletableFuture 完成后,在指定线程池(这里是 asyncPool)中异步执行 fn 函数, // 并返回一个新的 CompletableFuture<U> 来表示这个转换的结果。 return uniApplyStage(asyncPool, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); // 创建一个新的 CompletableFuture<V> 对象 d,作为最终返回值(即 thenApplyAsync 返回的那个 future) CompletableFuture<V> d = new CompletableFuture<V>(); // 只要需要异步执行(e != null),或者无法立即完成(当前 future 未完成),就进入后续的“注册回调”逻辑。 if (e != null || !d.uniApply(this, f, null)) { // 创建一个 UniApply 对象 c ,代表一个“单输入应用操作”(unary apply) // UniApply 是 Completion 的一种具体实现 CompletableFuture.UniApply<T,V> c = new CompletableFuture.UniApply<T,V>(e, d, this, f); // 将 c(即这个依赖任务)压入当前 CompletableFuture(this)的栈式依赖链表中, // 这样,当 this 完成时,会遍历所有注册的依赖任务(如 c)并触发它们 push(c); // 尝试立即触发这个依赖任务(一种优化,避免“刚注册就完成”时的延迟) c.tryFire(SYNC); } return d; }

“完成任务”与“触发依赖”源码分析:

public boolean complete(T value) { // 尝试以正常值完成 future:返回 true 表示本次 CAS 成功,即我们是第一个完成者。 boolean triggered = completeValue(value); // 无论是否由本线程完成,只要当前 CompletableFuture 已完成(包括刚被我们完成), // 就调用 postComplete(),触发所有已注册的依赖任务(如 thenApply, thenAccept 等)。 // postComplete() 是一个非递归的、线程安全的、广度+深度混合遍历器, // 用于在 future 完成时,可靠地唤醒整个依赖网络,且不会栈溢出、不会死锁、不会漏任务。 postComplete(); return triggered; } // 以非异常结果完成 future,除非它已经完成 final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); } // 当确定当前 future 已完成时,弹出并尝试触发所有可达的依赖任务 // 此方法应在 future 完成后调用(如 complete()、obtrudeValue()、内部完成逻辑等) final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. */ CompletableFuture<?> f = this; CompletableFuture.Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; CompletableFuture.Completion t; if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } }

注册 → 完成 → 传播的流程总结:

  • 当调用thenApplyAsync等方法时,会创建一个表示后续操作的Completion(如UniApply),若当前任务未完成,则将其压入自身的stack依赖栈中(注册
  • 当任务通过complete(value)被完成时,使用 CAS 原子地设置result字段(完成);
  • 随后立即调用postComplete(),从stack中逐个弹出并执行所有已注册的Completion,每个Completion在执行时会消费当前结果、计算新值,并完成其关联的下游CompletableFuture,从而递归触发整个依赖链的级联执行(传播)。
  • 整个过程无锁、非阻塞,依靠 volatile + CAS + 回调栈实现高效异步流水线。

任务依赖结构图解

CompletableFuture的依赖关系可从两个层面理解:

  • Future 层的依赖(逻辑关系):不同CompletableFuture实例之间的依赖关系。具体来说,当一个future完成后,它会触发另一个future的完成。这种依赖关系是由Completion对象来管理的。
  • Completion 链表层的依赖(存储关系):每个CompletableFuture内部维护了一个单向链表,用于存储所有依赖于该futureCompletion对象。这些Completion对象代表了“当这个 future 完成后要执行的操作”。
层级名称结构作用
第一层Future 依赖图DAG(有向无环图)描述“哪个 future 依赖哪个”的逻辑关系
第二层Completion 链表每个 future 内部的单向链表(栈)存储“当这个 future 完成后要执行哪些具体操作”

以下面代码为例:

CompletableFuture<String> f1 = new CompletableFuture<>(); // 第一层:f1 完成后,触发两个独立的后续 future CompletableFuture<Integer> f2 = f1.thenApply(s -> s.length()); // 分支 A CompletableFuture<String> f3 = f1.thenApply(s -> s.toUpperCase()); // 分支 B // 第二层:f2 和 f3 各自又有多个下游 CompletableFuture<Void> f4 = f2.thenAccept(x -> System.out.println("Len: " + x)); // f2 → f4 CompletableFuture<Void> f5 = f2.thenAccept(x -> System.out.println("Double: " + x * 2)); // f2 → f5 CompletableFuture<Void> f6 = f3.thenAccept(s -> System.out.println("Upper: " + s)); // f3 → f6

这个例子中,f1是源头;f2f3并行依赖于f1f2有两个下游f4,f5f3有一个下游f6

逻辑层面的依赖:Future 之间的依赖关系(DAG 图)

存储层面的依赖每个 Future 内部的 Completion 链表

  • Completion 链表是实现 DAG 依赖的底层机制:每个“依赖边”都对应一个Completion对象。
  • 整个系统是一个由 Completion 链表组成的网络,通过postComplete()动态传播完成状态。

每个Completion都持有:

  • src: 源CompletionFuture(当前这个completion所属的future
  • dep: 目标CompletionFuture(要被完成的那个future
  • fn: 要执行的函数

执行流程(当f1.complete("hello")被调用):

  1. f1完成,值为"hello"
  2. f1.postComplete()开始处理f1.stack
  • 先弹出c2f3的任务):
    • 执行toUpperCase("hello")"HELLO"
    • 完成f3(设置其 result)
    • 触发f3.postComplete()
      • 执行c5:打印"Upper: HELLO"
  • 再弹出c1f2的任务):
    • 执行length("hello")5
    • 完成f2
    • 触发f2.postComplete()
      • 先执行c4:打印"Double: 10"
      • 再执行c3:打印"Len: 5"

注意:虽然f2f3是并行分支,但在这个单线程完成场景下,它们是串行执行的(因为postComplete是循环处理)。但在异步或并发场景中,它们可能真正并行(如果用了不同线程池)。

http://www.jsqmd.com/news/199948/

相关文章:

  • 紧急警告:Dify凭证配置不当可能导致系统被入侵?立即检查这3项设置
  • Dify DOCX图片提取难题全攻克:从模糊识别到高保真导出的完整流程
  • JavaScript Promise封装GLM-4.6V-Flash-WEB异步调用
  • PyCharm模板代码提升GLM-4.6V-Flash-WEB开发效率
  • java中去掉字符串中的第一次出现的某个子字符串,以and为例
  • MyBatisPlus动态SQL结合GLM-4.6V-Flash-WEB日志分析模块
  • Arbess速成手册(9) - 集成GitLab实现Python项目自动化构建并主机部署
  • HTML拖拽上传图片至GLM-4.6V-Flash-WEB服务
  • 通达信妖股异动副图公式 源码
  • Markdown目录生成让GLM技术文档结构更清晰
  • 全网最全 Java 数据库优化硬核指南:架构、SQL、索引、监控一站搞定
  • 微PE官网工具箱集成GLM-4.6V-Flash-WEB进行故障界面识别
  • 深度拆解GEO优化的技术原理与AI搜索时代品牌破局之道
  • CSDN官网私信功能联系GLM技术博主获取帮助
  • Arbess速成手册(10) - 集成GitLab实现PHP项目自动化构建并主机部署
  • GitHub镜像网站HTTPS证书问题解决方案
  • Dify + React安全测试最佳实践(仅限高级开发者的5个内部方法)
  • Dify描述生成受限?揭秘3种绕过限制的实战方法
  • 国巨 PA 系列宽端子电流感测电阻:高适配性的同类产品优选替代方案
  • ChromeDriver执行JS脚本提取GLM网页动态内容
  • Dify凭证管理疑难杂症(8个真实案例+企业级解决方案)
  • 【Dify React安全测试实战指南】:从零构建高安全前端应用的5大核心策略
  • MyBatisPlus代码生成器快速构建GLM后台CRUD
  • sourcefare速成手册(6) - 集成soular,使用soular用户统一认证登录
  • 告别网络盲区
  • 2026年高口碑无框眼镜品牌推荐榜单:解决你的选择难题 - 睿易优选
  • 微PE官网网络工具检测GLM服务器连接状态
  • 【前端架构师亲授】:Dify集成Next.js必须掌握的7项性能优化技巧
  • HuggingFace镜像网站推荐:阿里云、清华源哪个更快?
  • 基于51单片机智能光照度计台灯恒照度PCF8591闭环控灯设计DIY18-996