当前位置: 首页 > news >正文

一个 pg_try_advisory_lock,搞定 CQRS 投影选主

给 Pico-CRM 上事件溯源的时候,订单、排班、服务需求三个核心聚合的事件流跑得挺顺畅。事件写进去了,但一个问题马上冒出来——谁负责把事件投成读模型?

多台服务器部署的时候,投影不能每台都跑,否则订单投影写三遍、排班投影写三遍,读模型的写入压力直接翻三倍,还可能出现竞态写入。但你又不能赌某台机器罢工了整个投影就停了。

最后用了 PostgreSQL 的 advisory lock,一个函数就实现了投影的 leader 选举,整个选主逻辑不到 20 行 Rust 代码,零外部依赖。

一、为什么投影不能每台都跑

先理清概念。

事件溯源的写入链路通常是:命令 → 事件存储(append only) → 投影监听器 → 读模型

事件存储(EventStore)没毛病,所有实例都能写。但投影监听器是个后台常驻任务,它不断轮询事件流的尾部,把新事件投成读模型的行。

举个例子:订单创建事件发生后,投影监听器在orders表里 INSERT 一条订单行。如果你有三个实例同时跑订单投影,同一个事件会被 INSERT 三次——要么报 duplicate key,要么出现三行一样的订单。

所以需要leader 选举:多个实例中只选出一个来跑投影,其他实例不跑,等 leader 挂了再换人。

说到这里,你会发现这个场景有几个特点:

  • 选主逻辑得简单——我不想为了选主再部署一个 Zookeeper
  • 锁必须和连接生命周期绑定——进程挂了锁自动释放,不用处理脑裂
  • 最好用现有的基础设施——咱已经有一个 Postgres 了

pg_try_advisory_lock完美满足这三点。

二、pg_try_advisory_lock 是什么

PostgreSQL 的 advisory lock(建议锁)是一种应用层锁,跟行锁、表锁没关系,完全由应用自己定义锁的语义。

关键区别:

锁类型作用范围和事务的关系释放时机
行锁/表锁表/行事务内事务结束自动释放
advisory lock应用自定义事务无关连接断开或显式释放

注意第三列和第四列:advisory lock不在事务内,锁的生命周期跟着连接走。连接断开,锁自动释放。

这对选主来说太友好了:

  • 你开一个连接,获取 advisory lock
  • 持有连接的进程只要不挂,锁就一直有效
  • 进程挂了 → 连接断 → 锁自动释放 → 另一个实例捡起来

没有任何过期 key 清理、心跳续约、脑裂修复的代码。Postgres 帮你兜底。

三、Rust 里怎么用

先定义一个锁的 key,保证全局唯一:

constPROJECTION_LEADER_LOCK_KEY:i64=0x5049_434f_4351_5253;

这个十六进制转成 ASCII 是PICOCQRS,纯属防碰撞,没别的意义。

然后是获取锁的函数:

// backend/src/infrastructure/event_store/mod.rspubasyncfnhold_projection_leader_lock()->Result<bool,String>{letpool=event_store_pool().await?;letmutconn=pool.acquire().await.map_err(|e|format!("acquire projection leader lock connection error: {}",e))?;// 关键:用 pg_try_advisory_lock,不加锁直接返回 false,不阻塞letacquired:bool=sqlx::query_scalar("SELECT pg_try_advisory_lock($1)").bind(PROJECTION_LEADER_LOCK_KEY).fetch_one(&mut*conn).await.map_err(|e|format!("acquire projection leader lock error: {}",e))?;if!acquired{returnOk(false);// 别人已经是 leader,直接退出}// 关键:把连接 spawn 到后台永久挂起,保持锁不释放tokio::spawn(asyncmove{let_projection_lock_conn=conn;pending::<()>().await;// 永不返回});Ok(true)}

两个关键细节:

用 try 而不是直接 lock

pg_try_advisory_lockpg_advisory_lock的区别是:前者拿不到立刻返回false,后者拿不到就阻塞等待。

选主场景你要的是"要么拿下当 leader,要么算了当 standby",不是排队等,所以用 try 版本。

必须把连接挂起

advisory lock 释放的唯一途径是连接断开。如果你拿到锁后把连接还回连接池,锁就丢了——下一个从池里拿到同一连接的请求可能随时把锁断开。

所以我拿到锁后,直接把连接 spawn 到一个 tokio 任务里,然后pending::<()>().await——这是一个永不完成的 future,连接活着,锁就一直持有。进程挂了 tokio 任务也就没了,连接自然断开,锁释放。

这个写法是一个很经典的 pattern:锁 = 连接 = 进程存活,三者生命周期完全耦合,简单却可靠。

四、完整的启动流程

项目代码是这样串起来的:

// backend/src/infrastructure.rspubasyncfnbootstrap_cqrs(read_model_db:DatabaseConnection)->Result<(),String>{// 1. 初始化事件存储的 schemaevent_store::initialize().await?;// 2. 竞选 leaderif!event_store::hold_projection_leader_lock().await?{eprintln!("projection leader lock is already held by another process; \ skipping listener startup");returnOk(());// 没选上,直接返回,不启动监听器}// 3. 是 leader,启动所有投影监听器projections::spawn_all_listeners(read_model_db).await?;Ok(())}

服务入口在server/src/main.rs里调用:

bootstrap_cqrs(db.connection.clone()).await.unwrap_or_else(|err|panic!("启动 CQRS 基础设施失败: {}",err));

设计上,多实例部署时的行为是:

  • 最先起来的拿锁 → 当 leader → 启动投影监听器
  • 后起来的pg_try_advisory_lock返回 false → 打印一行日志跳过 → 正常启动 HTTP 服务,只是不跑投影

如果 leader 挂了,锁随着连接断开自动释放,下一次谁先起来谁就是新 leader。

五、投影监听器的配置也是一起考虑的

leader 选出来后,剩下的就是每个投影 listener 的具体配置了。三个投影(订单、排班、服务需求)结构一样,举个订单的例子:

// backend/src/infrastructure/projections/crm/order_projection.rsPgEventListener::builder(listener_event_store).uninitialized().register_listener(projection,PgEventListenerConfig::poller(Duration::from_millis(250))// 250ms 轮询.with_notifier()// 同时监听 PG NOTIFY,有事件立刻拉.with_retry(|err,attempts|{super::projection_listener_retry("order",err,attempts)}),).start().await

250ms 轮询 + PG NOTIFY 双通道,有事件时 NOTIFY 通知立刻处理,没事件时 250ms 定期兜底,同时指数退避重试(最多 10 次后 abort)。

这套选主 + 轮询 + 通知的组合拳,是反复折腾几个版本后定下来的形态。

六、踩过的坑

第一个坑:忘记挂起连接,锁秒级丢失。最早写的时候,hold_projection_leader_lock拿完锁就把连接还回池了,结果锁当场没了,起第二个进程照样能拿到锁,两边同时开跑。原因是 advisory lock 的释放语义是"连接断开或连接回池",不是"函数作用域到才释放"。必须把连接一直持有。

第二个坑:用了pg_advisory_lock而非 try 版本。开发时只起了一个实例没发现,但本地起第二个进程测试时,第二个直接卡住不动了——pg_advisory_lock拿不到锁会阻塞等待。改成 try 版本后,拿不到直接返回 false,不影响服务启动。

第三个坑:连接池复用问题。如果你用事务级的连接获取锁,然后回池,下次同一个连接被另一个查询任务复用时,那个任务完全不知道连接上挂了一个锁。如果那个任务执行完还了连接,锁又没了。别问我怎么发现的,反正 debug 了一下午。

第四个坑:ES_DATABASE_URL 和业务库不是同一个。项目里事件存储(EventStore)和读模型用独立的数据库连接,所以选主锁必须在事件存储库里操作。如果业务库和事件库是同一个实例但不同 Database,锁的作用域仅限于同一个 Database。

总结

PostgreSQL 的 advisory lock 做分布式选主,胜在够省力。没有额外的组件依赖,没有心跳续约的代码,没有到期清理的麻烦。锁跟连接绑死,连接跟进程绑死,进程挂了锁自然释放。pg_try_advisory_lock一个函数拿锁,拿不到就当 standby,思路很干净。

如果你们的项目也是 Rust + Postgres 栈,或者任何语言 + Postgres 都用得到这个技巧。不一定非得是 CQRS 投影,任何"多个实例只能一个人干"的定时任务、后台清理、数据修复场景,都可以用这个套路。

你的项目里用的什么选主方案?是自己搓的 Redis 锁,还是 etcd/ZK,还是直接用 Postgres 的 advisory lock?欢迎评论区聊聊。


项目开源在 GitHub,搜Pico-CRM即可找到,欢迎 star 和交流。

http://www.jsqmd.com/news/820461/

相关文章:

  • 魔兽争霸3现代化改造:5步解锁高帧率与大分辨率终极方案
  • Windows平台Faiss安装与配置实战指南
  • 我是怎么用 AI 把自己的知识“榨”出来的:Skill的再实践
  • 无损精准查缆:鼎讯 G-340A 在铁路高速场景的应用
  • 5分钟实现本地知识库:AnythingLLM原生嵌入器的终极指南
  • 国产AI陪聊,洋AI干活?
  • ACM会议论文被误标为期刊?Perplexity元数据清洗实战:用Python+ACM REST API批量修正1372篇文献类型
  • 前端项目环境管理利器:打造轻量级上下文切换工具
  • 从零构建高质量Awesome技术资源库:ChatGPT生态实践指南
  • PlotAI:用自然语言生成Python图表,AI重塑数据可视化工作流
  • 告别CH554:手把手教你用STM32F070实现电容触摸屏的I2C转USB HID驱动
  • Driver Store Explorer终极指南:免费开源工具彻底清理Windows驱动存储
  • 2026-2032年全球铸造焦炭市场规模冲刺37亿美元
  • Arm架构ID_ISAR4_EL1寄存器解析与同步原语实践
  • 开源AI代理框架agenzaar:模块化设计构建智能体应用
  • 谁能定义云安全AI时代?——具有“安全原生”的聚合与防护平台
  • QuPath病理图像多通道智能流水线:从人工重复到算法赋能的范式跃迁
  • PostgreSQL游标:海量数据处理与高效分页的核心机制
  • 国产网络监控工具深度评测——对比博睿,乐维
  • MZmine:开源质谱数据分析平台的架构革命与技术突破
  • 别再用免费版硬扛交付!Pro计划中被低估的“商用素材合规审计工具”如何帮你规避97%版权风险?
  • 2026营销策划岗位怎么提升个人能力水平:从创意执行到策略操盘
  • 光标控制平面:提升开发者编辑效率的智能导航引擎
  • Vue响应式原理的核心逻辑与实践价值
  • 【独家逆向工程报告】Sora 2输出帧率/色彩空间/音频采样率硬指标对照表,匹配YouTube推荐算法的黄金参数组合
  • 研发本就是“工具“,所以注定会被更好的工具替代?
  • Python小红书数据采集终极指南:xhs库完整使用教程与实战案例
  • 开源安全告警自动化分诊工具OpenClaw-Triage架构解析与实战部署
  • Auxiliar-ai:AI辅助编程工具的设计、应用与集成实践
  • 深度拆解douyin-downloader:抖音批量下载工具的架构内幕与关键技术突破