把业务逻辑写成纯函数之后,我再也不想写 Service 层了
一、先说 disintegrate:这件事不是一个"框架",是一套 trait 约束
我用的事件溯源库叫 disintegrate,它和传统 ORM 框架的思路完全不一样——它不给你 BaseService、不给你继承、不包办 SQL。它只给了三个 trait:
Decision:一个业务操作的"意图"。核心方法是process(&self, state: &StateQuery) -> Result<Vec<Event>, Error>。输入当前聚合的投影状态,输出零个或多个事件,或者返回错误。StateMutate:定义"事件怎么折叠回状态"。核心方法是mutate(&mut self, event: Event),一个 match 把所有事件变体对号入座,更新对应字段。StateQuery:通过 derive 宏自动生成事件流查询——标注#[id]字段,框架就知道这个聚合的事件流该怎么加载。
这三个 trait 拼在一起形成了一个闭环:加载事件 → StateMutate 折叠成当前状态 → Decision 根据状态做判断产出新事件 → 新事件追加到事件流 → 下一轮 StateMutate 消费。框架帮你做加载、折叠和持久化,你只需要填空——定义事件长什么样、折叠逻辑怎么写、每个操作该产出什么事件。
提前声明:本文分享的是我在 Pico-CRM 项目里的实践,架构选择有上下文依赖,仅供参考。对事件溯源不感兴趣也没关系,其中纯函数化的思路也适用于传统三层架构。
二、缘起:Service 层越写越心虚
事情是这样的。
给 CRM 上事件溯源之前,订单模块的逻辑散落在 Service 层里:校验放一段、状态切换放一段、拼 SQL 放一段、打日志再放一段。一个"取消订单"的操作,代码从第 40 行写到第 120 行,中间还夹着两处if的边界条件——过了两周我自己都不敢改。
上了事件溯源之后,用 disintegrate 的 Decision trait 重构成纯函数。现在取消订单的逻辑就一个 struct:
// backend/src/domain/crm/order/es/decisions.rspubstructCancelOrderDecision{merchant_id:String,order_uuid:String,reason:String,updated_at:DateTime<Utc>,operator_uuid:Option<String>,}implDecisionforCancelOrderDecision{typeEvent=OrderEventEnvelope;// 这个操作会产出什么类型的事件typeStateQuery=OrderState;// 执行前需要读取哪个聚合的当前状态typeError=String;// 校验失败返回的错误类型fnstate_query(&self)->Self::StateQuery{OrderState::new(&self.merchant_id,&self.order_uuid)}fnprocess(&self,state:&Self::StateQuery)->Result<Vec<Self::Event>,Self::Error>{letmutorder=state.to_domain()?;// 从扁平状态重建领域对象order.cancel(self.reason.clone())?;// 调领域方法做校验Ok(vec![OrderEventEnvelope::OrderCancelled{...}])// 校验通过,产出事件}}就一个process()方法:从状态重建领域对象 → 调用领域方法校验 → 返回事件。没了。看 struct 的字段就知道这个操作需要什么参数,完全自文档化。改业务逻辑的时候,看一眼关联类型就知道它会碰哪个聚合的状态(StateQuery)、会产出什么类型的事件(Event)、可能抛出什么错误(Error)。
三、StateMutate:事件怎么变成状态
Decision 产出事件之后,事件得变回状态,下次别的操作才能用。这件事归StateMutate管:
// backend/src/domain/crm/order/es/state.rsimplStateMutateforOrderState{fnmutate(&mutself,event:Self::Event){matchevent{OrderEvent::OrderCreated{merchant_id,order_uuid,status,...}=>{self.exists=true;self.merchant_id=merchant_id;self.order_uuid=order_uuid;self.status=Some(status);// ... 初始化所有字段}OrderEvent::OrderStatusChanged{status,completed_at,updated_at,..}=>{self.status=Some(status);// 只改状态self.completed_at=completed_at;// 完成时间self.updated_at=Some(updated_at);}OrderEvent::OrderCancelled{cancellation_reason,updated_at,..}=>{self.status=Some(OrderStatus::Cancelled.as_str().to_string());self.cancellation_reason=Some(cancellation_reason);self.completed_at=None;// 取消后清空完成时间self.updated_at=Some(updated_at);}// ... 其他事件类型}}}每个事件变体只更新自己关心的字段。OrderCreated全量初始化,OrderStatusChanged只改 status 和 completed_at,OrderCancelled只改 status 和取消原因。没有 I/O、没有副作用、没有async——就是从一个结构体的字段搬到另一个结构体的字段上。
一个很容易忽略的细节是,OrderState所有字段都是Option<T>,初始值是Default::default()(全是 None)。事件一条一条喂进去,状态一层一层叠加。因为没有任何副作用,重放事件一定回到同一状态——这正是事件溯源最底层的保障。
重建领域对象:to_domain()
你可能会问:Decision 里不是用state.to_domain()重建领域对象吗?这个是怎么做的?其实就是把扁平的状态字段拼回完整的Order:
// backend/src/domain/crm/order/es/state.rspubfnto_domain(&self)->Result<Order,String>{if!self.exists{returnErr(format!("order {} not found",self.order_uuid));}Ok(Order{uuid:self.order_uuid.clone(),status:OrderStatus::parse(self.status.as_deref().unwrap_or("pending"))?,amount_cents:self.amount_cents,// ... 所有字段})}重建出来的Order就是有完整方法的领域对象,可以直接调cancel()、update_details()。而且Order根本不感知事件溯源的存在——它对order.cancel("客户改期".to_string())来说,自己就是一个普通的领域对象,该做的校验照做,该抛的错照抛。
四、核心体验:Given/When/Then 测试不需要数据库
这套模式最大的爽点是测试。process()是纯函数,mutate()是纯函数,所以测试链路变成了:
- Given:用一条或多条历史事件喂给
StateMutate,构造出"历史上的这一刻" - When:创建 Decision 调
process(),传入构造好的状态 - Then:断言产出的事件列表,或校验失败的错误信息
没有数据库。没有 mock。没有 Docker。
disintegrate 内置了TestHarness,把上面的三步封装成链式调用:
// backend/src/domain/crm/order/es/decisions.rs#[test]fnit_rejects_invalid_status_transition(){TestHarness::given([seed_created_event(&sample_order())]).when(UpdateOrderStatusDecision::new("...","order-1",OrderStatus::Completed,ts(2),None,)).then_err("invalid order status transition: pending -> completed".to_string());}想看已完成的订单不能改排班?
#[test]fnit_rejects_assignment_updates_for_completed_orders(){TestHarness::given([seed_created_event(&sample_order()),/* OrderStatusChanged → Completed 事件 */,]).when(UpdateOrderAssignmentDecision::new(/* ... */)).then_err("schedule assignment can only be updated in planned status (current: done)".to_string(),);}正向用例也一样简洁:
#[test]fnit_cancels_order_with_reason(){TestHarness::given([seed_created_event(&sample_order())]).when(CancelOrderDecision::new("...","order-1","客户改期".into(),ts(2),None)).then([OrderEventEnvelope::OrderCancelled{cancellation_reason:"客户改期".to_string(),// ...}]);}TestHarness内部做的事情一目了然:把 given 事件列表喂给StateMutate构造当前状态 → 调Decision::process()→ 对比产出事件和 then 里的期望值。Order 聚合 6 个 Decision、7 个测试用例,全部跑完不到 10 毫秒。对比之前需要docker-compose up等 15 秒再跑测试的日子,这个体验差距真的回不去。
五、Repository 层只做一件事:构造 Decision 然后丢给框架
Decision 写好了,Repository 层就剩一行:
// backend/src/infrastructure/repositories/crm/order_repository_impl.rsasyncfncancel_order(&self,uuid:String,reason:String,...)->Result<Order,String>{letdecision_maker=disintegrate_postgres::decision_maker(event_store,NoSnapshot);decision_maker.make(CancelOrderDecision::new(merchant_id,uuid,reason,Utc::now(),operator_uuid)).await?;Self::load_order_from_events(&merchant_id,&uuid)}decision_maker.make()一行干了四件事:
- 根据
state_query()加载历史事件 - 通过
StateMutate折叠出当前状态 - 调
Decision::process()执行业务逻辑 - 原子追加新事件到事件存储
Repository 只负责构造 Decision 并传进去,一行都不碰业务逻辑。对比传统 Service 层:校验参数、查数据库、状态检查、UPDATE、写日志——全塞一个方法里。Decision 模式把脏活交给框架,把业务判断还给process()纯函数。
六、三个聚合,统一模式
Pico-CRM 里的 Order、Schedule、ServiceRequest 三个聚合,全部同一套 Decision + StateMutate:
| 聚合 | Decision 数 | 典型操作 |
|---|---|---|
| Order | 6 个 | Create / UpdateDetails / StatusChanged / Cancel / Assignment / Settlement |
| Schedule | 4 个 | Create / UpdateAssignment / StatusChanged / Delete |
| ServiceRequest | 3 个 | Create / UpdateDetails / StatusChanged |
三个聚合加起来 13 个 Decision、50+ 个测试用例,每个测试都是 Given/When/Then 三行代码。新增一个业务操作的标准流程只有四步:
- 定义事件变体(如果是新事件类型)
- 实现
Decisiontrait - 补一行
StateMutate::mutate()分支 - 写 Given/When/Then 测试
四步,每一步都是纯函数,每一步都编译期检查。不存在"忘了改某处导致运行时崩"的问题。
七、总结
Decision + StateMutate 这套模式解决了一个非常实际的问题:业务逻辑到底写在哪?
它的答案就三个字:纯函数。
process()给定相同的 state,一定产出相同的事件。不会偷偷查库、不会静默发 HTTP、不会改全局变量。mutate()给定相同的事件序列,一定折叠出相同的状态。没有副作用,可以任意次重放。TestHarness把测试变成 Given/When/Then 三连,50 个用例不到 10 毫秒跑完,提交前跑一遍眼睛都不眨。
回头想,这算不上什么高深设计——就是把副作用推到最外层(decision_maker.make()),让核心逻辑变成可推理的纯数据变换。但就这一层隔离,让开发体验从"改代码要连着数据库一起测"变成了"写完 Decision 直接跑 Given/When/Then,零点几秒见结果"。
你现在的项目里业务逻辑和数据库耦合在一起吗?有没有试过用类似的方式把逻辑抽成纯函数?欢迎在评论区聊聊。
项目开源在 GitHub,搜 Pico-CRM 就能看到完整代码。
