Reactive Programming with Reactor in Practice: From Basics to Advanced Applications
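1. Overview of Reactive Programming
1.1 What Is Reactive Programming
Reactive programming is a programming paradigm built on asynchronous data streams and the propagation of change. Its core characteristics are:
- Asynchronous and non-blocking: threads are not parked while waiting, which improves resource utilization
- Data-stream driven: the data stream is the central abstraction, and operators transform the data flowing through it
- Backpressure: the consumer can control the rate at which the producer emits
- Composability: complex logic is built by combining operators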
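To make the first two characteristics concrete before any dependency is introduced, here is a minimal sketch that uses the JDK's own `java.util.concurrent.SubmissionPublisher` (Java 9+) rather than Reactor. The class name `PushStreamSketch` and the method `doubled` are made up for illustration. The only point is that elements are pushed to a callback, typically on another thread, and the caller learns about completion through a future instead of blocking on each element.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class: a JDK-only illustration, not Reactor code.
final class PushStreamSketch {

    // Pushes every input element through an asynchronous stream and
    // returns the doubled values in the order they were delivered.
    static List<Integer> doubled(List<Integer> input) {
        List<Integer> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // consume() subscribes a callback that runs on the publisher's executor,
        // not inside the submit() call below.
        CompletableFuture<Void> done = publisher.consume(n -> out.add(n * 2));

        input.forEach(publisher::submit); // push data into the stream
        publisher.close();                // signals onComplete to the subscriber
        done.join();                      // block here only so the demo can return a result
        return out;
    }

    public static void main(String[] args) {
        System.out.println(doubled(List.of(1, 2, 3)));
    }
}
```

In a real reactive application the final `join()` would be replaced by further callbacks; it is there only so that the sketch can hand back a plain list.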
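1.2 Core Reactor Concepts
| Concept | Description |
|---|---|
| Mono | An asynchronous sequence of 0 or 1 element |
| Flux | An asynchronous sequence of 0 to N elements |
| Operator | Transforms and processes a data stream |
| Backpressure | Lets the consumer control the rate of the data flow |
| Scheduler | Controls which threads the work executes on |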
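1.3 Advantages of Reactive Programming

Traditional synchronous programming:

      ┌──────────┐    ┌──────────┐    ┌──────────┐
      │ Request  │───▶│ Process  │───▶│ Response │
      └──────────┘    └──────────┘    └──────────┘
           │               │               │
           ▼               ▼               ▼
     blocking wait   blocking wait   blocking wait

Reactive programming:

      ┌──────────┐    ┌──────────┐    ┌──────────┐
      │ Request  │───▶│ Process  │───▶│ Response │
      └──────────┘    └──────────┘    └──────────┘
           │               │               │
           ▼               ▼               ▼
       immediate         async         callback
        return        processing     notification

2. Reactor Basics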
2.1 Dependency Configuration

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.5.10</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.5.10</version>
        <scope>test</scope>
    </dependency>

2.2 Creating Mono and Flux

    import java.time.Duration;
    import java.util.List;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    // Creating a Mono
    Mono<String> emptyMono = Mono.empty();
    Mono<String> justMono = Mono.just("Hello");
    Mono<String> fromSupplier = Mono.fromSupplier(() -> "World");
    Mono<String> errorMono = Mono.error(new RuntimeException("Error"));

    // Creating a Flux
    Flux<Integer> emptyFlux = Flux.empty();
    Flux<Integer> justFlux = Flux.just(1, 2, 3, 4, 5);
    Flux<Integer> rangeFlux = Flux.range(1, 10);          // 10 elements starting at 1
    Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
    Flux<String> fromIterable = Flux.fromIterable(List.of("A", "B", "C"));

2.3 Subscribing and Consuming

    // Subscribing to a Mono: nothing runs until subscribe() is called
    justMono.subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Complete")
    );

    // Subscribing to a Flux
    rangeFlux.subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Complete")
    );

3. Core Operators
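3.1 Transformation Operators

    // map: transform each element synchronously
    Flux<Integer> mapped = Flux.range(1, 5)
        .map(n -> n * 2);

    // flatMap: map each element to a publisher and merge the results
    // (inner publishers may interleave, so ordering is not guaranteed in general)
    Flux<String> flatMapped = Flux.just("a", "b", "c")
        .flatMap(letter -> Flux.just(letter.toUpperCase(), letter.toLowerCase()));

    // concatMap: like flatMap, but inner publishers are consumed one after another, preserving order
    Flux<String> concatMapped = Flux.just("a", "b", "c")
        .concatMap(letter -> Flux.just(letter.toUpperCase(), letter.toLowerCase()));

    // transform: apply a reusable chain of operators (needs java.util.function.Function)
    Function<Flux<String>, Flux<String>> filterAndMap = flux ->
        flux.filter(s -> s.length() > 2)
            .map(String::toUpperCase);

    Flux<String> transformed = Flux.just("a", "bb", "ccc", "dddd")
        .transform(filterAndMap);

3.2 Filtering Operators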

    // filter: keep only the elements that match the predicate
    Flux<Integer> filtered = Flux.range(1, 10)
        .filter(n -> n % 2 == 0);

    // take: keep the first N elements, then cancel the source
    Flux<Integer> taken = Flux.range(1, 100)
        .take(5);

    // skip: drop the first N elements
    Flux<Integer> skipped = Flux.range(1, 10)
        .skip(3);

    // distinct: remove duplicates
    Flux<Integer> distinct = Flux.just(1, 2, 2, 3, 3, 3, 4)
        .distinct();

    // takeWhile: keep elements while the predicate holds, then complete
    Flux<Integer> takeWhile = Flux.range(1, 10)
        .takeWhile(n -> n < 5);

3.3 Combining Operators

    // merge: subscribe to all sources eagerly and emit elements as they arrive
    Flux<Integer> flux1 = Flux.range(1, 3);
    Flux<Integer> flux2 = Flux.range(4, 3);
    Flux<Integer> merged = Flux.merge(flux1, flux2);

    // concat: subscribe to the sources one after another
    Flux<Integer> concatenated = Flux.concat(flux1, flux2);

    // zip: pair up elements by position
    Flux<String> zipped = Flux.zip(
        Flux.just("A", "B", "C"),
        Flux.just("1", "2", "3"),
        (letter, number) -> letter + number
    );

    // combineLatest: whenever either source emits, combine it with the latest value of the other
    Flux<String> combined = Flux.combineLatest(
        Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100)),
        Flux.just("1", "2").delayElements(Duration.ofMillis(150)),
        (letter, number) -> letter + number
    );

3.4 Error Handling
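
    // The explicit type witness (Mono.<String>error) is needed so the chain is typed as Mono<String>.
    // CustomException is an application-defined exception that is not shown here.
    // Retry comes from reactor.util.retry.Retry.

    // onErrorReturn: emit a default value when an error occurs
    Mono<String> errorHandled = Mono.<String>error(new RuntimeException("Oops"))
        .onErrorReturn("Default Value");

    // onErrorResume: switch to a fallback publisher when an error occurs
    Mono<String> errorResumed = Mono.<String>error(new RuntimeException("Oops"))
        .onErrorResume(error -> Mono.just("Recovered from: " + error.getMessage()));

    // onErrorMap: translate the error into another type
    Mono<String> errorMapped = Mono.<String>error(new RuntimeException("Oops"))
        .onErrorMap(error -> new CustomException("Custom error", error));

    // retry: resubscribe to the source up to 2 more times after an error
    Flux<Integer> retried = Flux.range(1, 3)
        .flatMap(n -> {
            if (n == 2) return Mono.<Integer>error(new RuntimeException("Fail"));
            return Mono.just(n);
        })
        .retry(2);

    // retryWhen: retry according to a policy, here up to 3 times with exponential backoff
    Flux<Integer> retryWhenFlux = Flux.range(1, 3)
        .flatMap(n -> {
            if (n == 2) return Mono.<Integer>error(new RuntimeException("Fail"));
            return Mono.just(n);
        })
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

4. Backpressure Handling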
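4.1 Backpressure Strategies

    // Note: Flux.range already honors downstream demand. These operators matter mostly for
    // sources that cannot slow down, such as Flux.interval.

    // BUFFER: hold up to 100 elements for a slow consumer; overflowing the buffer signals an error
    Flux.range(1, 1000)
        .onBackpressureBuffer(100)
        .subscribe();

    // DROP: discard elements that arrive while the consumer has no outstanding demand
    Flux.range(1, 1000)
        .onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
        .subscribe();

    // LATEST: keep only the most recent element until the consumer requests again
    Flux.range(1, 1000)
        .onBackpressureLatest()
        .subscribe();

    // ERROR: signal an error as soon as the consumer cannot keep up
    Flux.range(1, 1000)
        .onBackpressureError()
        .subscribe();

4.2 Backpressure Control Example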
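As a dependency-free warm-up, the "request one element, process it, request the next" pattern can be sketched with the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. The class names `OneByOneDemand` and `OneAtATime` are hypothetical, and `SubmissionPublisher` stands in for a real data source. This is only an illustration of subscriber-driven demand, not of how Reactor implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class: subscriber-driven demand with the JDK Flow API.
final class OneByOneDemand {

    // A subscriber that never has more than one outstanding request.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        int requests = 0;                       // how many times request(1) was called
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requests++;
            s.request(1);                       // ask for the first element only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);                 // "process" the element
            requests++;
            subscription.request(1);            // then ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    // Publishes 1..count and waits until the subscriber has seen the terminal signal.
    static OneAtATime run(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);            // buffered until the subscriber requests it
            }
        }                                       // close() signals onComplete after buffered items
        try {
            subscriber.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }
}
```

Calling `OneByOneDemand.run(5)` delivers the five elements in order, with one `request(1)` call per element plus the initial one. The Reactor version of the same idea uses `BaseSubscriber`: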
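
    // The consumer controls the flow rate
    // (BaseSubscriber is in reactor.core.publisher, Subscription in org.reactivestreams)
    Flux.range(1, 100)
        .doOnRequest(n -> System.out.println("Requested: " + n))
        .subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Request the first element
                request(1);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("Received: " + value);
                // Request the next element only after this one has been processed
                request(1);
            }
        });

5. Schedulers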
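5.1 Scheduler Types

    // Bounded elastic scheduler: a capped pool of worker threads created on demand,
    // intended for blocking work. (Schedulers.elastic() no longer exists in Reactor 3.5.)
    Flux.range(1, 5)
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

    // Parallel scheduler: a fixed pool sized to the number of CPU cores
    Flux.range(1, 5)
        .subscribeOn(Schedulers.parallel())
        .subscribe();

    // Single scheduler: one shared, reusable thread
    Flux.range(1, 5)
        .subscribeOn(Schedulers.single())
        .subscribe();

    // Immediate scheduler: runs on the current thread
    Flux.range(1, 5)
        .subscribeOn(Schedulers.immediate())
        .subscribe();

    // Custom scheduler: at most 10 threads and 100 queued tasks, threads named "custom-..."
    Scheduler customScheduler = Schedulers.newBoundedElastic(10, 100, "custom");
    Flux.range(1, 5)
        .subscribeOn(customScheduler)
        .subscribe();

5.2 subscribeOn vs publishOn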
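
    // subscribeOn: selects the thread on which the subscription, and therefore the source
    //              emission, happens; its position in the chain does not matter
    // publishOn:   switches the thread for the operators downstream of it
    Flux.range(1, 3)
        .doOnNext(n -> System.out.println("First: " + Thread.currentThread().getName()))   // parallel thread
        .subscribeOn(Schedulers.parallel())
        .doOnNext(n -> System.out.println("Second: " + Thread.currentThread().getName()))  // still the parallel thread
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(n -> System.out.println("Third: " + Thread.currentThread().getName()))   // boundedElastic thread
        .subscribe();

6. The Reactive Streams Specification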
6.1 Core Interfaces

    // Publisher: produces elements for subscribers
    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }

    // Subscriber: receives the signals
    public interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }

    // Subscription: the link between one publisher and one subscriber
    public interface Subscription {
        void request(long n);
        void cancel();
    }

    // Processor: both a subscriber and a publisher
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

6.2 Implementing a Custom Publisher
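
    // Simplified for teaching: single-threaded use only, and request() recurses when the
    // subscriber calls request() from inside onNext().
    public class CustomPublisher<T> implements Publisher<T> {
        private final List<T> items;

        public CustomPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new CustomSubscription<>(subscriber, items));
        }

        private static class CustomSubscription<T> implements Subscription {
            private final Subscriber<? super T> subscriber;
            private final List<T> items;
            private int index = 0;
            private volatile boolean cancelled = false;
            private boolean completed = false;

            public CustomSubscription(Subscriber<? super T> subscriber, List<T> items) {
                this.subscriber = subscriber;
                this.items = items;
            }

            @Override
            public void request(long n) {
                if (cancelled || completed) return;
                if (n <= 0) {
                    // The specification requires non-positive demand to be reported as an error
                    cancelled = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Emit at most n elements, stopping early if the subscriber cancels
                for (long i = 0; i < n && index < items.size() && !cancelled; i++) {
                    subscriber.onNext(items.get(index++));
                }
                // Signal completion exactly once
                if (index >= items.size() && !cancelled && !completed) {
                    completed = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        }
    }

7. Spring WebFlux Integration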
7.1 WebFlux Controller

    @RestController
    @RequestMapping("/api")
    public class UserController {

        @Autowired
        private UserService userService;

        @GetMapping("/users")
        public Flux<User> getAllUsers() {
            return userService.findAll();
        }

        @GetMapping("/users/{id}")
        public Mono<User> getUserById(@PathVariable String id) {
            return userService.findById(id);
        }

        @PostMapping("/users")
        public Mono<User> createUser(@RequestBody Mono<User> userMono) {
            return userMono.flatMap(userService::create);
        }

        @PutMapping("/users/{id}")
        public Mono<User> updateUser(@PathVariable String id, @RequestBody Mono<User> userMono) {
            return userService.update(id, userMono);
        }

        @DeleteMapping("/users/{id}")
        public Mono<Void> deleteUser(@PathVariable String id) {
            return userService.deleteById(id);
        }
    }

7.2 WebClient

    @Configuration
    public class WebClientConfig {

        @Bean
        public WebClient webClient() {
            return WebClient.builder()
                .baseUrl("https://api.example.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                // Allow response bodies of up to 16 MB to be buffered in memory
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024))
                .build();
        }
    }

    @Service
    public class ApiClient {

        @Autowired
        private WebClient webClient;

        public Mono<User> fetchUser(String userId) {
            return webClient.get()
                .uri("/users/{id}", userId)
                .retrieve()
                // Spring Framework 6 (the line that pairs with Reactor 3.5) expects a predicate on
                // HttpStatusCode; on Spring Framework 5.x use HttpStatus::isError instead.
                .onStatus(HttpStatusCode::isError, response ->
                    response.bodyToMono(String.class)
                        .flatMap(error -> Mono.error(new RuntimeException(error)))
                )
                .bodyToMono(User.class);
        }

        public Flux<User> fetchUsers() {
            return webClient.get()
                .uri("/users")
                .retrieve()
                .bodyToFlux(User.class);
        }
    }

7.3 Reactive Data Access

    @Repository
    public interface UserRepository extends ReactiveMongoRepository<User, String> {

        Flux<User> findByLastName(String lastName);

        Mono<User> findByEmail(String email);

        Flux<User> findByAgeBetween(int min, int max);

        @Query("{ 'username': { $regex: ?0 } }")
        Flux<User> findByUsernameContaining(String pattern);
    }

    @Service
    public class UserService {

        @Autowired
        private UserRepository userRepository;

        public Flux<User> findAll() {
            return userRepository.findAll();
        }

        public Mono<User> findById(String id) {
            return userRepository.findById(id);
        }

        public Mono<User> create(User user) {
            user.setId(UUID.randomUUID().toString());
            user.setCreatedAt(LocalDateTime.now());
            return userRepository.save(user);
        }

        public Mono<User> update(String id, Mono<User> userMono) {
            return userRepository.findById(id)
                // Copy the submitted fields onto the stored entity, then save it
                .flatMap(existing -> userMono.map(user -> {
                    existing.setUsername(user.getUsername());
                    existing.setEmail(user.getEmail());
                    existing.setUpdatedAt(LocalDateTime.now());
                    return existing;
                }))
                .flatMap(userRepository::save);
        }

        public Mono<Void> deleteById(String id) {
            return userRepository.deleteById(id);
        }
    }

8. Testing and Debugging
8.1 Unit Tests

    @ExtendWith(MockitoExtension.class)
    class UserServiceTest {

        @Mock
        private UserRepository userRepository;

        @InjectMocks
        private UserService userService;

        @Test
        void testFindById() {
            User expectedUser = new User("1", "test", "test@example.com");
            when(userRepository.findById("1"))
                .thenReturn(Mono.just(expectedUser));

            // StepVerifier subscribes and asserts on each signal in order
            StepVerifier.create(userService.findById("1"))
                .expectNext(expectedUser)
                .verifyComplete();

            verify(userRepository).findById("1");
        }

        @Test
        void testCreateUser() {
            User user = new User(null, "test", "test@example.com");
            User savedUser = new User("1", "test", "test@example.com");
            when(userRepository.save(any(User.class)))
                .thenReturn(Mono.just(savedUser));

            StepVerifier.create(userService.create(user))
                .expectNextMatches(u -> u.getId() != null && u.getUsername().equals("test"))
                .verifyComplete();
        }

        @Test
        void testFindByIdNotFound() {
            when(userRepository.findById("999"))
                .thenReturn(Mono.empty());

            // An empty Mono completes without emitting any element
            StepVerifier.create(userService.findById("999"))
                .verifyComplete();
        }
    }

8.2 Debugging Techniques

    // Side-effect operators for ad hoc logging
    Flux.range(1, 5)
        .doOnSubscribe(sub -> System.out.println("Subscribed"))
        .doOnRequest(n -> System.out.println("Requested: " + n))
        .doOnNext(value -> System.out.println("Next: " + value))
        .doOnError(error -> System.err.println("Error: " + error))
        .doOnComplete(() -> System.out.println("Completed"))
        .doOnCancel(() -> System.out.println("Cancelled"))
        .subscribe();

    // log(): log every Reactive Streams signal under the given category
    // (Level is java.util.logging.Level)
    Flux.range(1, 5)
        .log("Flux", Level.INFO)
        .subscribe();

    // checkpoint(): tag positions in the chain so they appear in the error's traceback
    Flux.range(1, 5)
        .checkpoint("After range")
        .map(n -> n * 2)
        .checkpoint("After map")
        .subscribe();

9. Performance Optimization
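9.1 Avoid Blocking Operations

    // Wrong: calling a blocking method inside a reactive pipeline
    Flux.range(1, 10)
        .map(n -> blockingDatabaseCall(n)) // blocks the emitting thread
        .subscribe();

    // Right: wrap the blocking call in a Mono and run it on a scheduler meant for blocking work
    Flux.range(1, 10)
        .flatMap(n -> Mono.fromCallable(() -> blockingDatabaseCall(n))
            .subscribeOn(Schedulers.boundedElastic()))
        .subscribe();

9.2 Use Parallel Processing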
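
    // Parallel processing: split into rails, run each rail on the parallel scheduler,
    // then merge back into a single Flux
    Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(n -> processItem(n))
        .sequential()
        .subscribe();

9.3 Batch Operations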
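The batching idea in this section rests on grouping a stream into lists of at most N elements, where the last list may be shorter. For a finite input, that grouping can be illustrated in plain Java. The sketch below is a hypothetical helper (`BatchingSketch.batches`) written for this article; it does not use Reactor and works eagerly on a list rather than on elements as they arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: eager, list-based illustration of fixed-size batching.
final class BatchingSketch {

    // Splits the input into consecutive batches of at most `size` elements.
    static <T> List<List<T>> batches(List<T> input, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < input.size(); start += size) {
            int end = Math.min(start + size, input.size());
            // Copy the slice so each batch is independent of the input list
            result.add(new ArrayList<>(input.subList(start, end)));
        }
        return result;
    }
}
```

For 250 elements and a batch size of 100 this yields three batches of 100, 100, and 50 elements. With Reactor, `buffer(100)` produces the same grouping lazily, so each batch can be handed to a bulk operation as soon as it is full: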

    // Batch processing: collect 100 elements at a time and process each batch in one call
    Flux.range(1, 1000)
        .buffer(100)
        .flatMap(batch -> batchService.processBatch(batch))
        .subscribe();

10. Common Patterns
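10.1 Caching Pattern

    @Service
    public class CachedService {

        // One cached Mono per id. Entries are never removed from the map, so this
        // simple version suits a bounded set of keys.
        private final Map<String, Mono<User>> cache = new ConcurrentHashMap<>();

        @Autowired
        private UserRepository userRepository;

        public Mono<User> getUser(String id) {
            return cache.computeIfAbsent(id, key ->
                userRepository.findById(key)
                    // Replay the result to later subscribers for 5 minutes, then query again
                    .cache(Duration.ofMinutes(5))
            );
        }
    }

10.2 Retry Pattern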
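Before the Reactor code, it may help to see what "exponential backoff" means numerically. The sketch below computes the nominal delays for a policy with a given number of attempts and a minimum backoff, doubling the delay each time. `BackoffSchedule` is a hypothetical class for illustration only. Reactor's `Retry.backoff` additionally applies random jitter and can cap the delay with a maximum backoff, both of which this sketch deliberately leaves out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: nominal exponential backoff delays, without jitter or a cap.
final class BackoffSchedule {

    // Returns the delay before each retry: minBackoff, 2 * minBackoff, 4 * minBackoff, ...
    static List<Duration> nominalDelays(int maxAttempts, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration next = minBackoff;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            delays.add(next);
            next = next.multipliedBy(2); // double the wait for the following attempt
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(nominalDelays(3, Duration.ofSeconds(1)));
    }
}
```

For 3 attempts and a 1-second minimum, the nominal waits are 1, 2, and 4 seconds, so a call that keeps failing gives up after roughly 7 seconds of waiting plus the time spent on the requests themselves. The Reactor version of the policy looks like this: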

    public Mono<User> fetchWithRetry(String userId) {
        return webClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class)
            // Up to 3 retries with exponential backoff starting at 1 second
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                // Retry only on HTTP error responses
                .filter(error -> error instanceof WebClientResponseException)
                // Once retries are exhausted, surface the last failure as the cause
                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
                    new RuntimeException("Failed after retries", retrySignal.failure())
                )
            );
    }

10.3 Timeout Pattern
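
    public Mono<User> fetchWithTimeout(String userId) {
        return webClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class)
            // Signal a java.util.concurrent.TimeoutException if no response arrives within 5 seconds
            .timeout(Duration.ofSeconds(5))
            // Fall back to a default user on timeout only; other errors still propagate
            .onErrorResume(TimeoutException.class, e ->
                Mono.just(User.getDefaultUser())
            );
    }

11. Summary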
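Reactive programming is a powerful tool for asynchronous, high-concurrency workloads, and Reactor provides a rich set of operators and utilities for building complex asynchronous data streams. After working through this article, you should be able to:
- Core concepts: understand Mono, Flux, operators, and the other building blocks
- Operators: use the transformation, filtering, combining, and error-handling operators
- Backpressure: understand and apply backpressure strategies
- Schedulers: use schedulers sensibly to control the threading model
- Spring WebFlux: build reactive web applications
- Testing and debugging: write tests for reactive code
- Performance: avoid blocking calls and optimize data-stream processing
Reactive programming requires a shift in thinking, but once that shift is made it becomes much easier to write efficient, scalable asynchronous applications.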
