【架构实战】CQRS命令查询职责分离:读写分离的进阶实践
【架构实战】CQRS命令查询职责分离:读写分离的进阶实践
一、背景:一个报表查询拖垮了整个交易系统
2020年双十一,我们交易系统出现了一次P0事故。
事情的起因很简单:运营想在活动看板上实时查看"各渠道的GMV和转化率"。后端查询需要JOIN五张表(订单表、订单明细表、渠道表、用户表、支付流水表),做了三个聚合计算(SUM、COUNT DISTINCT、GROUP BY)。
上午10点,运营打开了看板页面。30秒后,交易系统响应时间从50ms飙升到8秒。数据库CPU从30%直接打到100%。所有下单请求开始超时。
根本原因:查询和命令共享了同一套数据模型。交易系统用的主库既要处理高并发写入(下单),又要承受复杂的分析查询(报表)。这在传统架构里是无解的——你不可能对着一张既要快速插入又要复杂查询的表,同时优化写性能和读性能。
这就是CQRS要解决的问题。
二、CQRS核心原理
2.1 什么是CQRS
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是Greg Young在2010年提出的一种架构模式。核心思想只有一句话:将系统的读操作和写操作分离到不同的模型。
【传统架构】 ┌──────────┐ │ Controller │ └──────┬───┘ │ ┌──────▼───┐ │ Service │ ← 读写混合,一个模型同时处理 └──────┬───┘ │ ┌────────────┼────────────┐ │ │ │ ┌─────▼─────┐ ┌────▼────┐ ┌─────▼─────┐ │ 查询1 │ │ 写入 │ │ 查询2 │ │ (报表) │ │ (下单) │ │ (详情) │ └───────────┘ └─────────┘ └───────────┘ 【CQRS架构】 ┌──────────┐ │ 应用层 │ └──────┬───┘ │ ┌────────────┼────────────┐ │ │ ┌─────▼─────┐ ┌────▼────┐ │ 命令模型 │ │ 查询模型 │ │(Command) │ │(Query) │ │ 写优化 │────事件─────▶│ 读优化 │ └─────┬─────┘ └────┬────┘ │ │ ┌─────▼─────┐ ┌────▼────┐ │ 写库 │ │ 读库 │ │ MySQL(主) │ │ ES/Redis │ └───────────┘ └──────────┘2.2 CQRS vs 传统读写分离
很多人以为CQRS就是数据库主从读写分离。这是一个常见的误解。
| 维度 | 数据库读写分离 | CQRS |
|---|---|---|
| 分离层级 | 数据库层 | 应用层(模型层) |
| 数据模型 | 同一个表结构 | 不同的数据模型(可以不同表、甚至不同数据库) |
| 一致性 | 主从复制延迟(秒级) | 事件异步同步(毫秒到秒级) |
| 优化方向 | 主库优化写入,从库分担读压力 | 写模型优化业务完整性,读模型优化查询性能 |
| 适用场景 | 读多写少 | 读写模型差异大、查询复杂 |
一句话总结:CQRS是模型级别的读写分离,传统方案是数据级别的读写分离。
2.3 何时该用CQRS
不是所有系统都需要CQRS。判断标准:
需要CQRS的标志: ✅ 查询需要的数据结构与写入的数据结构差异很大 ✅ 查询需要JOIN多张表,写入只需要单表 ✅ 查询量远大于写入量(100:1以上) ✅ 查询需要跨多个限界上下文聚合数据 不需要CQRS的标志: ❌ 读写模型几乎一致 ❌ 业务逻辑简单,CRUD即可 ❌ 团队规模小,引入CQRS增加维护成本 ❌ 查询和写入量都比较低三、实战:交易系统CQRS改造
3.1 命令模型(写模型)
// ===== 命令端:领域模型,保持业务完整性 =====// 命令对象@DatapublicclassCreateOrderCommand{@NotBlankprivateStringuserId;@NotEmptyprivateList<OrderItemCommand>items;privateStringcouponId;privateStringaddressId;}// 命令处理器@ServicepublicclassCreateOrderCommandHandler{@AutowiredprivateOrderRepositoryorderRepository;@AutowiredprivateInventoryServiceinventoryService;@AutowiredprivateEventBuseventBus;@TransactionalpublicOrderIdhandle(CreateOrderCommandcommand){// 1. 创建订单聚合(充血模型,包含业务规则)Orderorder=Order.create(command);// 2. 扣减库存command.getItems().forEach(item->inventoryService.deduct(item.getProductId(),item.getQuantity()));// 3. 持久化orderRepository.save(order);// 4. 发布领域事件 → 驱动读模型更新eventBus.publish(newOrderCreatedEvent(order));returnorder.getId();}}// 订单聚合(写模型:关注业务完整性)@Entity@Table(name="t_order")publicclassOrder{@IdprivateStringorderId;privateStringuserId;privateBigDecimaltotalAmount;privateStringstatus;// CREATED → PAID → SHIPPED → COMPLETEDprivateStringaddressId;privateStringcouponId;privateLocalDateTimecreatedAt;// 订单明细(一对多)@OneToMany(cascade=CascadeType.ALL,fetch=FetchType.LAZY)@JoinColumn(name="order_id")privateList<OrderItem>items;// ===== 业务方法 =====publicstaticOrdercreate(CreateOrderCommandcommand){Orderorder=newOrder();order.orderId=OrderIdGenerator.generate();order.userId=command.getUserId();order.status="CREATED";order.createdAt=LocalDateTime.now();// 计算订单金额order.items=command.getItems().stream().map(item->OrderItem.create(order.orderId,item)).collect(Collectors.toList());order.totalAmount=order.items.stream().map(OrderItem::getSubTotal).reduce(BigDecimal.ZERO,BigDecimal::add);returnorder;}publicvoidpay(StringpayMethod){if(!"CREATED".equals(this.status)){thrownewOrderException("只能支付创建状态的订单");}this.status="PAID";}}3.2 查询模型(读模型)
// ===== 查询端:扁平化、宽表、NoSQL,优化查询性能 =====// 读模型DTO:一张宽表,包含所有展示需要的数据@Document(indexName="order_view")@DatapublicclassOrderView{@IdprivateStringorderId;privateStringuserId;privateStringuserName;// 冗余用户名privateStringuserPhone;// 冗余用户手机privateBigDecimaltotalAmount;privateStringstatus;privateStringstatusDisplay;// 状态中文名:已创建/已支付/已发货privateStringaddressDetail;// 冗余地址详情privateStringcouponName;// 冗余优惠券名称privateList<OrderItemView>items;// 嵌套对象privateLocalDateTimecreatedAt;privateLocalDateTimepaidAt;}// 查询处理器@ServicepublicclassOrderQueryHandler{@AutowiredprivateElasticsearchTemplateesTemplate;@AutowiredprivateRedisTemplate<String,OrderView>redisTemplate;publicPageResult<OrderListDTO>listOrders(ListOrdersQueryquery){// 构建ES查询NativeSearchQuerysearchQuery=newNativeSearchQueryBuilder().withQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("userId",query.getUserId())).must(QueryBuilders.rangeQuery("createdAt").gte(query.getStartTime()).lte(query.getEndTime()))).withSort(SortBuilders.fieldSort("createdAt").order(SortOrder.DESC)).withPageable(PageRequest.of(query.getPage(),query.getSize())).build();// 从ES查询(毫秒级响应)returnesTemplate.search(searchQuery,OrderView.class);}publicOrderDetailDTOgetOrderDetail(StringorderId){// 先查缓存StringcacheKey="order:detail:"+orderId;OrderViewcached=redisTemplate.opsForValue().get(cacheKey);if(cached!=null){returnOrderDetailDTO.from(cached);}// 缓存未命中,查ESOrderViewview=esTemplate.get(orderId,OrderView.class);if(view!=null){redisTemplate.opsForValue().set(cacheKey,view,5,TimeUnit.MINUTES);}returnOrderDetailDTO.from(view);}// 运营看板:实时聚合查询(ClickHouse)publicDashboardDTOgetDashboard(){// 从ClickHouse查询聚合数据Stringsql=""" SELECT channel_id, COUNT(DISTINCT user_id) as uv, COUNT(*) as order_cnt, SUM(amount) as gmv, SUM(amount) / COUNT(*) as avg_order_amount FROM order_wide_table WHERE created_at >= today() GROUP BY channel_id """;returnclickhouseTemplate.query(sql,DashboardDTO.class);}}3.3 事件处理器:同步读写模型
// ===== 写模型变更 → 发布事件 → 读模型更新 =====@ComponentpublicclassOrderEventProjector{@AutowiredprivateElasticsearchTemplateesTemplate;@AutowiredprivateRedisTemplate<String,OrderView>redisTemplate;// 监听订单创建事件@EventListener@Async@TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT)publicvoidonOrderCreated(OrderCreatedEventevent){Orderorder=event.getOrder();// 构建读模型(聚合多源数据)OrderViewview=newOrderView();view.setOrderId(order.getOrderId());view.setUserId(order.getUserId());view.setUserName(userService.getName(order.getUserId()));// 冗余用户名view.setTotalAmount(order.getTotalAmount());view.setStatus("CREATED");view.setStatusDisplay("已创建");view.setAddressDetail(addressService.getDetail(order.getAddressId()));if(order.getCouponId()!=null){view.setCouponName(couponService.getName(order.getCouponId()));}view.setItems(order.getItems().stream().map(item->{OrderItemViewitemView=newOrderItemView();itemView.setProductId(item.getProductId());itemView.setProductName(productService.getName(item.getProductId()));itemView.setQuantity(item.getQuantity());itemView.setPrice(item.getPrice());returnitemView;}).collect(Collectors.toList()));// 写入ES(查询模型)esTemplate.save(view);// 清除列表缓存StringlistCacheKey="order:list:"+order.getUserId();redisTemplate.delete(listCacheKey);log.info("读模型已更新: orderId={}",order.getOrderId());}// 监听订单支付完成事件@EventListener@Async@TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT)publicvoidonOrderPaid(OrderPaidEventevent){// 只更新变化的状态字段(增量更新)UpdateRequestupdateRequest=newUpdateRequest().set("status","PAID").set("statusDisplay","已支付").set("paidAt",event.getPaidAt());esTemplate.update(updateRequest,OrderView.class,event.getOrderId());// 清除详情缓存StringdetailCacheKey="order:detail:"+event.getOrderId();redisTemplate.delete(detailCacheKey);}}四、CQRS的核心挑战与应对
4.1 最终一致性
问题:写模型更新了,但读模型还没更新,用户刷新后看到旧数据。
应对策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 命令端返回最新状态 | 写入成功后,直接在响应中返回最新数据 | 简单场景,下单后展示订单详情 |
| 前端轮询+loading | 前端显示"处理中",定时刷新直到数据一致 | 秒杀、支付结果查询 |
| 事件驱动+WebSocket推送 | 读模型更新后,主动推送给前端 | 实时要求高的场景 |
| 乐观UI更新 | 提交后立即在前端显示新状态,后台异步纠正 | 社交互动(点赞、评论) |
我们的方案:命令执行成功后,对于关键操作(支付、退款),使用WebSocket主动推送状态变更;对于非关键操作(浏览记录),接受秒级延迟。
4.2 读写模型差异大如何保证数据正确
问题:读模型从多个数据源聚合数据,如何保证聚合逻辑的正确性?
应对策略:投影(Projection)模式——每个事件驱动的读模型更新,都应该是一个可重放的、幂等的函数。
// 读模型重建:当发现数据不一致时,可以全量重建@ComponentpublicclassOrderViewRebuilder{@AutowiredprivateOrderRepositoryorderRepository;@AutowiredprivateElasticsearchTemplateesTemplate;/** 全量重建指定时间范围的读模型 */@Scheduled(cron="0 0 3 * * ?")// 每天凌晨3点publicvoidrebuildDailyViews(){LocalDateTimeyesterday=LocalDateTime.now().minusDays(1);LocalDateTimetoday=LocalDateTime.now();List<Order>orders=orderRepository.findByCreatedAtBetween(yesterday,today);for(Orderorder:orders){OrderViewview=buildOrderView(order);esTemplate.save(view);}log.info("读模型重建完成: 共{}条订单",orders.size());}}4.3 事件顺序问题
如果订单先发生"创建"事件,再发生"支付"事件,但如果"支付"事件先到达投影器,就会导致状态错误。
解决方案:
// 利用数据库的乐观锁保证顺序@Document(indexName="order_view")publicclassOrderView{@VersionprivateLongversion;// ES版本号,乐观锁// 只有高版本才能覆盖低版本}// 投影器:只接受更高版本的事件publicvoidonOrderPaid(OrderPaidEventevent){// 检查当前读模型的版本OrderViewcurrent=esTemplate.get(event.getOrderId(),OrderView.class);if(current!=null&¤t.getVersion()>=event.getVersion()){log.warn("忽略过期事件: orderId={}, currentVersion={}, eventVersion={}",event.getOrderId(),current.getVersion(),event.getVersion());return;}// 更新读模型...}五、CQRS配合Event Sourcing的进阶用法
当CQRS和Event Sourcing结合,可以构建出可审计、可回溯的终极架构:
// 事件存储(Event Store)CREATETABLEevent_store(event_idVARCHAR(64)PRIMARYKEY,aggregate_idVARCHAR(64)NOTNULL,aggregate_typeVARCHAR(50)NOTNULL,event_typeVARCHAR(100)NOTNULL,event_dataJSONNOTNULL,versionINTNOTNULL,occurred_atTIMESTAMPNOTNULL,INDEXidx_aggregate(aggregate_id,version));// 命令处理器:不修改状态,只追加事件@ServicepublicclassOrderCommandHandler{@TransactionalpublicvoidpayOrder(PayOrderCommandcommand){// 1. 加载事件流List<Event>events=eventStore.load(command.getOrderId());// 2. 重放事件,重建聚合状态Orderorder=Order.replay(events);// 3. 执行业务操作,产生新事件OrderPaidEventpaidEvent=order.pay(command.getPayMethod());// 4. 追加事件eventStore.append(command.getOrderId(),paidEvent,events.size()+1);// 5. 发布事件(异步更新读模型)eventBus.publish(paidEvent);}}六、技术选型建议
| 组件 | 推荐方案 | 备选方案 |
|---|---|---|
| 写数据库 | MySQL/PostgreSQL | MongoDB |
| 读数据库(搜索) | Elasticsearch | Solr |
| 读数据库(聚合) | ClickHouse | Druid、Doris |
| 缓存 | Redis Cluster | Caffeine(本地) |
| 事件总线 | RocketMQ/Kafka | RabbitMQ |
| 事件存储 | PostgreSQL/EventStoreDB | Axon Framework |
七、总结
CQRS不是银弹,它有明显的代价:系统复杂度翻倍,运维成本增加,最终一致性需要额外处理。
但如果你的系统面临以下情况,CQRS是值得投入的:
- 读写模型差异大:查询需要JOIN多张表,写入需要保持事务完整性
- 读写比例悬殊:大量复杂查询拖慢写入性能
- 团队足够成熟:能驾驭事件驱动、最终一致性、投影重建等概念
- 业务价值足够高:性能提升带来的业务收益 > 架构复杂度带来的成本
核心经验:
- 从最简单的CQRS开始:先分离读写Service,再分离数据模型,最后分离数据库
- 读模型可以有多个:搜索用ES、聚合用ClickHouse、列表用Redis,各司其职
- 不要追求即时一致性:接受秒级延迟,用"好体验"覆盖"小延迟"
- 建立读模型重建机制:比"保证永远正确"更重要的是"出错后能快速修复"
CQRS的本质是用空间换时间、用最终一致性换性能。当你接受这个权衡时,你离高可用的系统架构就不远了。
个人观点,仅供参考
