当前位置: 首页 > news >正文

消息队列设计:从同步到异步的性能突破

前言

2024年初,我们的订单系统经常出现"超时"问题。用户下单后,系统需要同时调用库存服务、支付服务、通知服务,任何一个服务慢都会导致整个请求超时。

我们决定引入消息队列,将同步调用改为异步处理。这个改造带来了显著的性能提升。


一、问题:同步调用的瓶颈

原始的订单流程是这样的:

python

@app.route('/api/orders', methods=['POST']) def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 同步调用库存服务 inventory_response = requests.post( 'http://inventory-service/deduct', json={'product_id': order.product_id, 'quantity': order.quantity} ) if inventory_response.status_code != 200: return {"error": "库存不足"}, 400 # 3. 同步调用支付服务 payment_response = requests.post( 'http://payment-service/pay', json={'order_id': order.id, 'amount': order.amount} ) if payment_response.status_code != 200: return {"error": "支付失败"}, 400 # 4. 同步调用通知服务 notify_response = requests.post( 'http://notify-service/send', json={'order_id': order.id, 'type': 'order_created'} ) return {"order_id": order.id}, 201

问题

  • 任何一个服务慢都会导致整个请求慢;
  • 任何一个服务故障都会导致订单创建失败;
  • 耦合度太高,难以扩展。

性能数据

  • 库存服务:200ms
  • 支付服务:300ms
  • 通知服务:150ms
  • 总耗时:200 + 300 + 150 =650ms

二、解决方案:引入RabbitMQ

我们选择RabbitMQ作为消息队列。改造后的流程:

2.1 发布订单创建事件

python

import pika import json def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 发布事件到消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() # 声明交换机和队列 channel.exchange_declare(exchange='orders', exchange_type='topic') # 发布消息 message = { 'order_id': order.id, 'product_id': order.product_id, 'quantity': order.quantity, 'amount': order.amount } channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message) ) connection.close() # 立即返回响应 return {"order_id": order.id}, 201

耗时:仅需10ms(发布到队列)

2.2 消费者:库存服务

python

def inventory_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='inventory_queue', durable=True) queue_name = result.method.queue # 绑定队列到交换机 channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 扣减库存 deduct_inventory( message['product_id'], message['quantity'] ) # 确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: # 拒绝消息,重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('库存服务已启动,等待消息...') channel.start_consuming() if __name__ == '__main__': inventory_consumer()

2.3 消费者:支付服务

python

def payment_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='payment_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 处理支付 process_payment( message['order_id'], message['amount'] ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('支付服务已启动,等待消息...') channel.start_consuming()

2.4 消费者:通知服务

python

def notify_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='notify_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 发送通知 send_notification( message['order_id'], 'order_created' ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('通知服务已启动,等待消息...') channel.start_consuming()


三、可靠性保证

3.1 消息持久化

python

# 声明持久化队列 channel.queue_declare( queue='payment_queue', durable=True # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) )

3.2 消息确认机制

python

Copy code

# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=False # 手动确认 )

3.3 死信队列

python

# 声明死信交换机 channel.exchange_declare(exchange='dlx', exchange_type='direct') channel.queue_declare(queue='dead_letter_queue', durable=True) channel.queue_bind(exchange='dlx', queue='dead_letter_queue') # 声明普通队列,指定死信交换机 channel.queue_declare( queue='payment_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dead_letter' } )


四、监控和告警

python

import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def callback(ch, method, properties, body): start_time = time.time() try: process_message(body) duration = time.time() - start_time logger.info(f"消息处理成功, 耗时: {duration}ms") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"消息处理失败: {str(e)}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


五、国际化团队的挑战

在跨国团队中,消息队列的错误日志和告警需要支持多语言。我们使用同言翻译(Transync AI)来自动翻译消息队列的错误信息和监控告警,确保不同语言背景的团队成员能够快速理解问题并做出响应。


六、性能对比

指标同步调用异步消息队列提升
平均响应时间650ms10ms-98.5%
P99响应时间2000ms50ms-97.5%
系统吞吐量1000 req/s10000 req/s+900%
故障隔离-

七、最佳实践

  1. 幂等性设计:消费者应该能够安全地处理重复消息;
  2. 超时设置:为消息处理设置合理的超时时间;
  3. 监控队列深度:及时发现消费者处理不过来的情况;
  4. 分离关注点:生产者和消费者应该解耦;
  5. 定期审查:定期检查死信队列,找出问题消息。

八、结语

消息队列的引入,从根本上改变了我们的系统架构。从同步的紧耦合,到异步的松耦合,系统的可扩展性和可靠性都得到了显著提升。

如果你的系统也在经历性能瓶颈,消息队列可能是一个很好的解决方案。

http://www.jsqmd.com/news/79401/

相关文章:

  • 如何用AI优化fcitx5中文输入法的词库和预测
  • 传统调试vsAI辅助:解决Spring启动异常效率对比
  • 一个完全本地运行的视频转文字工具:Vid2X
  • 函数式编程学习(Java)
  • DB-GPT:AI如何革新数据库管理与查询
  • 浅析Spring中的PropertySource 的基本使用
  • 3小时打造6v电影网MVP原型实战
  • ZooKeeper 基本概述
  • 基于Springboot瑜伽馆管理系统【附源码+文档】
  • 当PDF遇上AI:MinerU如何用1.2B参数吊打千亿级大模型?
  • 微服务面试题:概览
  • LangGraph深度解析:从图基础到人机交互的AI工作流框架实践
  • 无需安装!在线体验n8n的5种创新方法
  • Java 开发最容易犯的 10 个错误
  • 意图识别深度原理解析:从向量空间到语义流形
  • RepoEval:定义仓库级代码补全评估的新基准
  • java Happens - before 原则到底是什么
  • 提升资源管理效率必备工具推荐
  • 2025 年 12 月雅安市汽车租赁服务权威推荐榜:轿车、豪车、越野车、婚车、大巴车、商务车、房车、旅游车、跑车、皮卡车一站式尊享服务 - 品牌企业推荐师(官方)
  • C++--
  • 2025年12月软件开发公司权威推荐榜:小程序开发、APP开发,专业定制与创新技术实力深度解析 - 品牌企业推荐师(官方)
  • 关于-根据-ISO8601-国际标准-计算一年中的周数-每年最少52周-每多53周
  • 《Ascend C:从“算子炼金术”到国产AI芯片生态的破局之战》
  • 刘洋洋新歌《梁祝之三世约》上线,唱尽轮回绝恋
  • 2025 年 12 月滚塑模具厂家权威推荐榜:滚塑钢模/铝模/铸铝模具/铝板模具/加工制品/产品/穿梭机/烘箱,匠心工艺与高效产能深度解析 - 品牌企业推荐师(官方)
  • ⭐解锁RAG与Spring AI的实战应用(万字详细教学与完整步骤流程实践)
  • MySQL 知识点复习- 6. ORDER BY, GROUP BY
  • Flink学习笔记:反压
  • 意图识别面试通关指南:从基础问答到场景落地
  • NOI范围下的背包DP模型