状态机——SpringStateMachine并行区域状态流转
SpringStateMachine并行区域状态流转
- 1、问题概述
- 2、核心概念与状态设计
- 3、状态与事件枚举定义
- 4、并行区域核心配置
- 5、模拟调用流转过程
- 6、其他实现方案
- 6.1、修复版的原生 .withJoin() 方案(配置独立事件)
- 6.2、完全应用层驱动(把状态机当成“纯单线”状态机)
- 6.3、利用持久化拦截器(StateMachineInterceptor)在落盘时收网
- 6.4、基于流程引擎的动态微子状态机(Orchestration)
1、问题概述
在 Spring StateMachine 中,并行区域(Orthogonal Regions,或称正交区域) 用于处理同一个实体上多个相互独立、互不干扰的状态流转线。
在一个汽车电商履约系统中,当用户支付完成后,订单进入 “审核履约中(ON_AUDIT)” 状态。为了提高效率,系统需要并行(同时)开启两条相互独立的流水线:
- 风控审批线:初始为 RISK_CHECKING→ \rightarrow→审批通过到达终点 RISK_PASSED。
- 物流仓储线:初始为 PACKING→ \rightarrow→打包完成到达终点 PACKED。
核心痛点:
这两条线是由外部不同的微服务异步回调驱动的,订单系统根本不知道谁先完成、谁后完成。
- 如果原生的 .withJoin() 评估器在 3.2.0 中发生并发冲突或短路误判(比如风控刚过,物流还没动,就直接秒通关)。
- 外部系统只认自己的业务事件(RISK_APPROVE 或 PACKING_DONE),绝不可能贴心地帮订单系统判断何时该补发一个汇合信号。
终极解决方案:’
利用扩展状态(Extended State)标记开关 + Action 内部事件自投递。任何一条线到达终点时,状态机自己对自己来一发收网事件ALL_DONE。由守卫(Guard)卡死,少一条线都不开门,两条线全齐则自动融合流转到最终的 “待发货(PENDING_DELIVER)”。
2、核心概念与状态设计
要实现并行区域,状态必须具备父子嵌套结构(Hierarchical States)。
我们需要定义一个父状态(Parent State),在这个父状态内部划分出多个独立的区域(Regions),每个区域拥有自己独立的子状态(Substates)和流转逻辑。
假设我们要实现以下场景:
- 父状态:
ON_AUDIT(审核与准备中) - 并行区域 1(风控线):
RISK_CHECKING(风控检查中)→ \rightarrow→RISK_PASSED(风控通过) - 并行区域 2(物流线):
PACKING(打包中)→ \rightarrow→PACKED(打包完成) - 最终目标状态:
PENDING_DELIVER(等待发货),当且仅当风控通过且打包完成时进入。
3、状态与事件枚举定义
publicenumCarOrderState{PENDING_PAY,// 待付款// 父状态ON_AUDIT,// 审核准备中(包含并行区域)// 区域 1 的子状态(风控线)RISK_CHECKING,//风控检查中RISK_PASSED,//风控通过// 区域 2 的子状态(物流线)PACKING,//打包中PACKED,//打包完成// 最终状态PENDING_DELIVER// 等待发货}publicenumCarOrderEvent{USER_PAY,//用户支付RISK_APPROVE,//风控审批通过PACKING_DONE,//打包完成ALL_DONE// 内部隐式事件:全自动汇合收网(业务层无感知)}4、并行区域核心配置
在 Spring StateMachine 的 Fluent API 中,配置并行区域(Orthogonal Regions)的核心逻辑是:通过在同一个父状态(Parent)下,连续声明多个 withStates() 块来隐式划分不同的区域(Regions)。
当我们要在一个特定的父状态内部(比如 ON_AUDIT 状态内)创建多条并行的子状态线时,Spring 的标准设计范式是:
- 第一个
.withStates():声明它是 ON_AUDIT 的子状态,并指定这条线(Region 1)的初始状态。 - 通过
.and()连接。 - 第二个
.withStates():再次声明父状态是 ON_AUDIT。此时 Spring 侦测到你为同一个父状态声明了第二套独立的子状态,它就会在内部自动为你开辟出 Region 2。
官方标准的伪代码模型如下,只要 parent() 指向同一个状态,每多一个 withStates(),就代表多了一条正交线:
states.withStates().parent(OrderState.ON_AUDIT)// 绑定父状态.initial(OrderState.RISK_CHECKING)// 【区域 1】的起点.state(OrderState.RISK_PASSED).and()// 隔离线.withStates().parent(OrderState.ON_AUDIT)// 绑定相同的父状态.initial(OrderState.PACKING)// 【区域 2】的起点(Spring 会自动识别并创建新 Region).state(OrderState.PACKED);具体实现如下:
importorg.springframework.context.annotation.Configuration;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.statemachine.config.EnableStateMachineFactory;importorg.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;importorg.springframework.statemachine.config.builders.StateMachineStateConfigurer;importorg.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;importorg.springframework.statemachine.guard.Guard;importreactor.core.publisher.Mono;@Configuration@EnableStateMachineFactory(name="carOrderStateMachineFactory")publicclassCarOrderStateMachineConfigextendsEnumStateMachineConfigurerAdapter<CarOrderState,CarOrderEvent>{/** * 1. 声明状态节点树 */@Overridepublicvoidconfigure(StateMachineStateConfigurer<CarOrderState,CarOrderEvent>states)throwsException{states.withStates().initial(CarOrderState.PENDING_PAY).end(CarOrderState.PENDING_DELIVER)// 明确定义整张图纸的最终终态.state(CarOrderState.ON_AUDIT)// 注册正交并行的父状态.and()// 声明并行区域 1(风控线).withStates().parent(CarOrderState.ON_AUDIT).initial(CarOrderState.RISK_CHECKING).state(CarOrderState.RISK_PASSED).and()// 声明并行区域 2(物流线).withStates().parent(CarOrderState.ON_AUDIT).initial(CarOrderState.PACKING).state(CarOrderState.PACKED);}/** * 2. 声明流转路由与自动推进机制 */@Overridepublicvoidconfigure(StateMachineTransitionConfigurer<CarOrderState,CarOrderEvent>transitions)throwsException{transitions// 顶层流转:用户支付完成 -> 瞬间激活并行的两条子流水线.withExternal().source(CarOrderState.PENDING_PAY).target(CarOrderState.ON_AUDIT).event(CarOrderEvent.USER_PAY).and()// 区域 1 推进:接收风控通过,记录风控开关,并自发收网信号.withExternal().source(CarOrderState.RISK_CHECKING).target(CarOrderState.RISK_PASSED).event(CarOrderEvent.RISK_APPROVE).action(ctx->{ctx.getExtendedState().getVariables().put("risk_ok",true);// 3.2.0 标准响应式自投递,通知状态机尝试收网ctx.getStateMachine().sendEvent(Mono.just(MessageBuilder.withPayload(CarOrderEvent.ALL_DONE).build())).subscribe();}).and()// 区域 2 推进:接收物流打包,记录物流开关,并自发收网信号.withExternal().source(CarOrderState.PACKING).target(CarOrderState.PACKED).event(CarOrderEvent.PACKING_DONE).action(ctx->{ctx.getExtendedState().getVariables().put("pack_ok",true);// 3.2.0 标准响应式自投递,通知状态机尝试收网ctx.getStateMachine().sendEvent(Mono.just(MessageBuilder.withPayload(CarOrderEvent.ALL_DONE).build())).subscribe();}).and()// 3. 全自动收网大闸:顶替原生 withJoin() 的高稳方案// 使用 withLocal 确保父状态和所有子区域在任何时刻都能稳定接收、识别这个自发事件.withLocal().source(CarOrderState.ON_AUDIT).target(CarOrderState.PENDING_DELIVER).event(CarOrderEvent.ALL_DONE).guard(checkBothFinishedGuard());}/** * 4. 汇合双重合规守卫 */privateGuard<CarOrderState,CarOrderEvent>checkBothFinishedGuard(){returncontext->{// 从状态机实例的共享内存(ExtendedState)中读取开关BooleanriskOk=context.getExtendedState().get("risk_ok",Boolean.class);BooleanpackOk=context.getExtendedState().get("pack_ok",Boolean.class);riskOk=(riskOk!=null&&riskOk);packOk=(packOk!=null&&packOk);System.out.printf("【全自动收网拦截检查】风控线完工: %b, 物流线完工: %b\n",riskOk,packOk);// 当且仅当两条并行线都将自己的开关改为了 true,大闸才放行returnriskOk&&packOk;};}}5、模拟调用流转过程
外部业务场景里,风控回调和物流回调各走各的,它们只发自己的业务事件,不需要也不可能补发任何收网信号。
@ServicepublicclassCarOrderServiceImpl{@AutowiredprivateStateMachineFactory<CarOrderState,CarOrderEvent>factory;publicvoidtestParallel()throwsInterruptedException{// 1. 获取一台全新的状态机实例StateMachine<CarOrderState,CarOrderEvent>sm=factory.getStateMachine("order_100");// 2. 3.2.0 标准响应式启动,并强阻塞直到初始化完成sm.startReactively().block();System.out.println("--- 状态机初始化完毕 ---");printCurrentStates(sm);// 3. 外部触发:用户完成付款sm.sendEvent(Mono.just(MessageBuilder.withPayload(CarOrderEvent.USER_PAY).build())).blockLast();// 必须用 blockLast() 强行让当前的测试线程死等后台并行线流转完System.out.println("--- 发送 USER_PAY 后 ---");printCurrentStates(sm);// 预期并行激活:[ON_AUDIT, PACKING, RISK_CHECKING]// 4. 外部触发:【风控微服务】异步审核通过,回调订单sm.sendEvent(Mono.just(MessageBuilder.withPayload(CarOrderEvent.RISK_APPROVE).build())).blockLast();System.out.println("--- 发送 RISK_APPROVE 后 ---");printCurrentStates(sm);// 预期:物流没做,状态机自动触发检查后,依然死死卡在 [ON_AUDIT, PACKING, RISK_PASSED]// 5. 外部触发:【仓储微服务】商品打包完成,回调订单sm.sendEvent(Mono.just(MessageBuilder.withPayload(CarOrderEvent.PACKING_DONE).build())).blockLast();System.out.println("--- 发送 PACKING_DONE 后 ---");printCurrentStates(sm);// 预期:两条线全齐,状态机内部自发 ALL_DONE 冲破大闸,融合成最终终态 [PENDING_DELIVER]}privatevoidprintCurrentStates(StateMachine<CarOrderState,CarOrderEvent>sm){// 并行状态下,sm.getState().getIds() 会返回一个集合,包含所有激活的子状态System.out.println("当前内存激活的状态集合: "+sm.getState().getIds());}}输出如下:
---状态机初始化完毕---当前内存激活的状态集合:[PENDING_PAY]---发送USER_PAY后---当前内存激活的状态集合:[ON_AUDIT,RISK_CHECKING,PACKING]【全自动收网拦截检查】风控线完工:true,物流线完工:false---发送RISK_APPROVE后---当前内存激活的状态集合:[ON_AUDIT,RISK_PASSED,PACKING]【全自动收网拦截检查】风控线完工:true,物流线完工:true---发送PACKING_DONE后---当前内存激活的状态集合:[PENDING_DELIVER]这就是从头到尾最完整、最纯粹的现代 Spring StateMachine 并行区域落地指南。它既巧妙避开了 3.2.0 原生 withJoin 评估器的严重缺陷,又完美兼顾了分布式业务系统解耦调用的原则,是目前企业级开发中最成熟稳定的标准实现。
6、其他实现方案
6.1、修复版的原生 .withJoin() 方案(配置独立事件)
前面提过,原生 .withJoin() 容易发生“风控过了,物流直接被略过”的单线秒通关 Bug。这是因为我们在两条并行线上使用了不同的业务事件(RISK_APPROVE 和 PACKING_DONE),导致状态机引擎在评估天平时发生了短路。
另一种原生修复思路是:在两条并行线的终点,配置同一个触发汇合的事件(比如都叫 TRY_JOIN),或者在流转完后,外部系统不仅调用自己的完成事件,还要额外调用一次汇合事件。
// 必须注册一个 JOIN 类型的伪状态states.withChoice().and().withJoin().id(CarOrderState.JOIN_NODE);// 显式注册汇合节点transitions// 区域 1:风控完成到达终点.withExternal().source(CarOrderState.RISK_CHECKING).target(CarOrderState.RISK_PASSED).event(CarOrderEvent.RISK_APPROVE).and()// 区域 2:物流完成到达终点.withExternal().source(CarOrderState.PACKING).target(CarOrderState.PACKED).event(CarOrderEvent.PACKING_DONE).and()// 原生 Join 节点:当且仅当两个 source 都就绪,且受到外部某事件或进入该状态时,自动拉向目标.withJoin().source(CarOrderState.RISK_PASSED).source(CarOrderState.PACKED).target(CarOrderState.PENDING_DELIVER);- 优点:完全流式配置,代码看起来最符合 UML 状态机规范。
- 缺点:在 3.2.0 响应式并发队列下,由于多线程时序问题,此机制依然存在不稳定性,且对伪状态节点的声明要求极高。
6.2、完全应用层驱动(把状态机当成“纯单线”状态机)
这是很多互联网大厂(如美团、阿里)在实际落地复杂分布式订单时最喜欢用的架构:不在状态机内部配置任何“并行区域”或嵌套状态(把并行放到状态机外面去)。
架构设计:
- 状态机只定义纯单线主流程:PENDING_PAY→ \rightarrow→ON_AUDIT→ \rightarrow→PENDING_DELIVER。
- 并在数据库里建一张业务附属表(或者叫履约流水表),专门记录:risk_status(风控状态)和 pack_status(仓储状态)。
- 当风控或物流回调时,不直接调状态机,而是直接更新数据库里对应的状态字段。
- 每次更新完字段后,在 Java 代码里做一次 if (riskOk && packOk) 的判断。如果全齐了,再调用状态机发送一个纯单线的 sm.sendEvent(CarOrderEvent.AUDIT_SUCCESS),驱动订单走向 PENDING_DELIVER。
- 优点:极大地简化了状态机的配置,避开了所有框架层关于多线程和并行的坑,非常利于数据库事务(@Transactional)控制。
- 缺点:状态机失去了“表达复杂并行控制”的能力,并行的业务逻辑漏到了业务代码里。
6.3、利用持久化拦截器(StateMachineInterceptor)在落盘时收网
Spring StateMachine 提供了强大的拦截器机制(StateMachineInterceptor),它可以在状态机的任何一个状态准备写入数据库(持久化)之前执行。
我们不需要在配置类里写任何自触发 Action,而是挂载一个全局拦截器:
publicclassAutoJoinInterceptorextendsStateMachineInterceptorAdapter<CarOrderState,CarOrderEvent>{@OverridepublicvoidpreStateChange(State<CarOrderState,CarOrderEvent>state,Message<CarOrderEvent>message,Transition<CarOrderState,CarOrderEvent>transition,StateMachine<CarOrderState,CarOrderEvent>stateMachine,StateMachine<CarOrderState,CarOrderEvent>rootStateMachine){// 每次状态机准备变动状态落盘前,拦截器都会被触发if(stateMachine.getState()!=null){Collection<CarOrderState>currentIds=stateMachine.getState().getIds();// 拦截器直接从外部判定当前激活的状态集合if(currentIds.contains(CarOrderState.RISK_PASSED)&¤tIds.contains(CarOrderState.PACKED)){// 拦截器在落盘前,直接强行修改目标流转,或者异步补发事件// 注意:由于拦截器在生命周期更底层,这里发送事件需要确保线程安全}}}}- 优点:业务配置类(Transitions)极其干净,收网逻辑被抽离到了底层的架构基础设施(拦截器)中。
- 缺点:拦截器属于高级底层的“黑魔法”,调试极其困难,一旦写出死循环,很难排查。
6.4、基于流程引擎的动态微子状态机(Orchestration)
如果并行的逻辑非常动态(比如今天只有风控和物流,明天可能还要加上“法务审核”、“财务对账”,变成 4 条并行线),写死在状态机图纸里就会面临频繁改代码的灾难。
此时通常会采用状态机 + 任务编排引擎(如轻量级组件/线程池)的复合架构:
- 父状态单纯叫 ON_AUDIT。
- 进入 ON_AUDIT 后,通过状态机的 doAction 触发一个外部任务分配器,动态开启多个多线程异步任务。
- 状态机在 ON_AUDIT 保持静默。
- 外部的并发框架(如 Java 的 CompletableFuture.allOf() 或 Spring Cloud Data Flow)在外面死等所有任务完成。
- 外面全齐了之后,由外面的总控线程向状态机发送一个汇总事件,推向终点。
- 优点:支持动态并行,随时增加或减少并行分支数量。
- 缺点:开发量大,需要引入状态机之外的第二套并发控制框架。
