当前位置: 首页 > news >正文

消息队列篇

一. kafka篇

优点:是具有高性能、高吞吐量,分布式,可扩展消息队列。主要用于日志收集,流式处理,解耦。

缺点:1. 运维成本比较高 2. 默认是pull类型的,当消费者消费速率低于生产速率,会出现消息积压。 3. 单个分区有序跨分区不能保证有序

1. kafak默认单个分区顺序消费,如何保证全局有序?

1. 将整个Topic划分成一个分区

2. 应用层排序策略

1. 每条消息加上业务时间戳或者业务序列号

2. 消费者拉取消息后,先按时间戳进行排序再消费

3. 采用幂等性保证消息消费的顺序性

3. 按业务维度设置Key分区(局部有序)

局部业务有序,通过设置key,让相同维度消息划分到该区域。

2. 主要的概念

producer: 消息的生产者,向kafka发送消息。

consumer: 消息的消费者,从kafka读取消息。

topic:消息分类和管理的基本单位,生产者和消费者的桥梁。

partition:是topic的物理分片单位,保证消息的并发处理能力和顺序消费。

broker: kafka的服务器节点,存储消息。

offset: 消息的唯一标识符

consumer group:多个消费者的集合,同一个消费者组中的多个消费者,可以协同消费一个topic信息。同一个分区,只能被同一个消费者组中的一个消费者进行消费。(保证不重复消费)不同消费者组中的不同消费者可以消费同一个分区的消息。

消费者组的作用:1. 实现消息的并行消费 2. 负载均衡 3. 容错能力 4. 消息只被消费一次

kafka如何保证消息不被重复的进行消费?

生产者端考虑:

1.启用幂等性发送,通过设置enable.idempotence=true,生产者会对发送的消息设置唯一性的标识,Broker端在写入消息时候,会对标识进行判断,从而消息保证不会重复的写入。

2.事务的支持:对于跨分区的原子性写入,生产者端开启事务,生产者端发送完一批消息之后,提交事务。保证这批消息要么全部写入成功,要么全部失败,避免部分重复。

3.通过设置唯一性的标识。

4. 开启ack机制,保证所有副本都同步完成以后才进行消费。

消费者端考虑:

1. 通过手动提交offset,而非自动的提交,确保消息处理完成以后再进行提交。

2. 幂等性处理:消费者处理消息时候,先检查该消费是否已经进行消费,如果没有消费才进行消费。

3. 如何提高kafka消费速率?

提升消费速率的重点是增加分区 + 提升单个组内消费者数量。

下面说法不正确:

多个消费者组消费同一个 Topic,属于广播模式,每个组都会消费所有消息一次,并不会提高消费速率,反而可能增加系统负载;

4. kafka如何保证消息不丢失?

生产者端考虑:

1. 生产者端开启ACK机制,ACK=all。

2. 生产者端开启重试策略

消费者端考虑:

1. 设置手动提交offset,保证消费成功以后再进行提交。(enable.auto.commit=false)

2. 设置异常处理机制,保证消息消费失败之后,不会跳过,而是会重试或者日志记录。

broker端考虑:

1. 副本机制:Topic默认有多个副本,一个leader,其余的都是Follower。同步复制保障高可用。

2. ISR机制: kafka仅在所有的副本都写入成功以后,才确认写入成功。

3. 数据持久化机制

5. kafka为啥工作的这么快:

1. 磁盘顺序读写

2. 零拷贝

数据从磁盘到网卡直接传输,不经过用户空间,避免了用户态和内核态之间的多次拷贝和上下文切换。

kafka的ACK机制:

1. ACK=0:生产者端向broker端发送消息,不需要broker端确认,就认为消息发送成功。

2. ACK=1:生产者端向broker端发送消息,需要等待leader节点写入成功以后,才认为消息发送成功。

3. ACK=all:生产者端向broker端发送消费,需要等待leader副本和follower副本都成功写入以后,才认为消息发送成功。(broker端发送消息写入成功的消息给生产者端,生产者端认为消息写入成功)。

ISR(In-Sync Replicas)同步副本集合的缩写。ISR指的是与leader节点保持完全同步的副本集合。只有ISR内的follower节点才参与数据的同步和容灾切换。

6. 如何避免kafka的消息积压问题?

首先,kafka的消息积压问题是因为消费者的速率低于生产者的速率导致的。在进行问题排查时候,可以从三个层面考虑。是否生产者写入的速率过快,是否是broker端的分区数过少,是否是消费者端的消费者数量过少。

解决方法:

增加Topic的分区数:

1. 增加Topic的分区数,让多个消费者进行并发的消费。

提高消费者的能力:

1. 增加消费者组内的消费者的数量

2. 采用线程池异步消费

3. 批量进行拉取,批量消费,而非单条拉取,单条消费。

kafka哪些参数调优:

1. 生产者端: ACK = all避免消息丢失。开启重试机制+幂等性机制保证消息不会丢失。

2. broker端:一个Topic分多个分区,一个分区设置多个副本。避免消息积压,避免消息丢失。

3. 消费者端:手动提交偏移量,设置幂等性效验。防止消息丢失,以及消息重复消费。

批量进行拉取,批量进行消费解决消息积压问题。

7. kafka的使用场景

1. 日志收集系统(ELK)

2. 实时数据分析(spark,flink)

3. 用户行为追踪(埋点)

4.消息解耦与异步通信(微服务架构)

8. kafka+ZK进行部署

方式1:使用 ZooKeeper(传统方式)
方式2:KRaft 模式(新方式,不需要 ZooKeeper)
kafka:kafka_2.12-3.9.1 ZooKeeper:3.8.4

| 元数据类型 | 说明 | 存储位置(传统 Kafka+ZooKeeper) |
| --------------------- | ------------------------------------- | ------------------------ |
| **Broker 列表** | 所有 Kafka 节点的信息(id、host、port) | ZooKeeper |
| **Topic 信息** | 包括 Topic 名称、分区数量、副本数量 | ZooKeeper |
| **Partition 信息** | 每个分区对应的 Leader、Follower、ISR 列表 | ZooKeeper |
| **Controller Broker** | 当前集群的 Controller 节点(负责选举 Leader、分区变更) | ZooKeeper |
| **Consumer Group 信息** | 每个消费组的 offset、成员信息(谁在消费哪个分区) | ZooKeeper |
| **ACL & 配置** | 访问控制和 Topic 配置 | ZooKeeper |

所以在 Kafka 2.x + ZooKeeper 架构下,几乎所有集群级别的控制信息都在 ZooKeeper 里,
而实际消息数据(生产的消息)是存在 每个 Broker 的磁盘日志文件里,而不是 ZooKeeper

9. Kafka 和 ZooKeeper 的关系

Kafka broker 启动时会从 ZooKeeper 获取元数据(Broker 列表、Topic 信息等)
Kafka broker 会向 ZooKeeper 注册自己
Kafka controller 通过 ZooKeeper 选举产生
消费组 offset 在旧版 Kafka(2.x)是保存在 ZooKeeper 的,现在大多数版本 offset 可以保存在 Kafka 内部 Topic(__consumer_offsets)

10. 消费者端的消费原理

长轮询机制:
消费者会不断向 Kafka broker 发送 poll 请求
如果没有新消息,会阻塞等待(默认有超时时间)
一旦有消息到达,立即被拉取并触发 onMessage() 方法
自动消费:
不需要手动调用任何方法
Spring Kafka 框架会自动管理消费者的生命周期
消息到达后自动反序列化并调用你的处理方法
并发消费:
可以通过配置增加并发消费者数量
例如:设置 concurrency = 3 会启动 3 个消费者线程同时消费

@KafkaListener( topics = SECKILL_ORDER_TOPIC, groupId = "seckill-order-service", concurrency = "3" ) public void onMessage(ConsumerRecord<String, String> record) { String key = record.key(); String value = record.value(); log.info("收到秒杀订单消息, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), key, value); try { // 1. 反序列化消息体 SeckillOrderMessage message = objectMapper.readValue(value, SeckillOrderMessage.class); // 2. 调用领域服务创建订单(内部已包含幂等控制) SeckillOrder order = orderService.createSeckillOrder(message); log.info("处理秒杀订单消息成功, orderId={}, userId={}, productId={}", order.getId(), order.getUserId(), order.getProductId()); } catch (Exception ex) { // 这里简单打印日志,生产环境中应结合重试机制与死信队列处理异常消息。 log.error("处理秒杀订单消息失败, key={}, value={}", key, value, ex); // 可以根据需要抛出异常交给 KafkaListener 容错机制处理,这里选择吞掉避免阻塞后续消息。 } }

工作流程:
UserServiceApplication.start()

Spring 容器初始化

扫描到 @KafkaListener

创建 Kafka 监听容器

启动后台线程开始 poll 消息

【等待消息...】

生产者发送消息到 seckill-order topic

Kafka broker 收到消息

消费者 poll 到消息

触发 onMessage() 方法 ← 这里处理你的业务逻辑

11. Topic和消费者组再理解

同一个消费者组: 一条消息->只会被一个消费者消费

目的:实现负载均衡

不同消费者组: 一条消息->每个消费者组都能消费

Kafka 的 Consumer Group 主要解决两个问题:
1️.消费扩展(并行处理)
2️.业务解耦(不同系统消费同一数据)
一个 Consumer Group = 一个业务逻辑 / 一个系统

为什么不用一个 Consumer Group 处理所有业务?
1️. 数据只能消费一次,订单系统消费了,库存系统就消费不到
2. 业务耦合严重
订单 库存 风控 日志
生产环境的数据设计: Topic = 数据流 Consumer Group = 业务系统 Consumer = 并发处理能力

典型的业务场景:

同一个 Topic:
order-topic
不同系统消费:
订单系统 -> 处理订单
库存系统 -> 扣库存
日志系统 -> 记录日志
风控系统 -> 风控检测
他们使用不同 Consumer Group:
order-group stock-group log-group risk-group
每个系统都会消费同一条消息。
在Kafka中,消息是存储在Topic的Partition中的,每个消费者组都会维护自己的offset。同一个消费者组内,一条消息只会被其中一个消费者消费,用于实现负载均衡。
但不同消费者组之间互不影响,因此同一条消息可以被多个消费者组分别消费。

12. Topic的Partition数量的设定?

Partition>=Consumer 但是不能无限增加
Kafka官方建议:
单个Broker
最好 < 4000 Partition
更常见生产环境:
1000 ~ 2000 Partition / Broker
每个Partition会有:
索引文件,日志文件,网络连接,缓存
Kafka分区并不是越多越好。增加分区可以提升并发消费能力,因为Kafka的并发度主要由Partition数量决定。
但是不能无限的增加:因为分区过多,会增加Broker内存压力,原数据管理的开销,文件句柄数量,并且会导致Consumer Group Rebalance时间变长。因此在生产环境中需要根据吞吐量
和消费者数量合理规划分区数量。
Partition和Consumer关系的设定?原因?
官方推荐Partition一般是Consumer的2-3倍。
原因:
1. 方便扩容
Partition = 12 Consumer = 6
未来方便把消费者扩容到10
2. 容错能力更强
Partition > Consumer 有一个 Consumer 挂掉,Rebalance分区分布会更均衡。
3️.提高负载均衡
Partition = 10 Consumer = 3
C1 → P0 P1 P2 P3
C2 → P4 P5 P6
C3 → P7 P8 P9
Kafka会自动做均衡分配。
面试可以这样回答:
Partition 和 Consumer 并不一定需要 1:1。虽然 1:1 时并发利用率最高,但在生产环境通常会让
Partition 数量大于 Consumer 数量。这样可以支持未来扩容、提高负载均衡能力,并避免消费者宕机带来的压力集中问题。
因此一般会设计Partition数量为Consumer 的 2~3 倍。

13. kafka如何解决消息积压的问题

生产环境需要先判断是否是消息积压问题,可以通过脚本或者监控工具:Prometheus,Grafana。
监控工具如果出现异常就会告警,Lag报警,例如:Lag > 10000。
主要说一下通过脚本:
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-group

| TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG |
| ----------- | --------- | -------------- | -------------- | --- |
| order-topic | 0 | 1200 | 1300 | 100 |
当前消费位置 = 1200
最新消息位置 = 1300
Lag = 100

解决方法:
1. 增加消费者数量 (前提要小于partion数量) 2. 增加分区数量(提高吞吐量)
3. 修改消费者的逻辑(1. 批量拉取消息进行消费 2. 异步处理,消息拉取线程只负责拉取消息,
消息处理线程处理任务)

真实场景:
Producer:5000 msg/s Consumer: 3000 msg/s
每秒就会产生:2000 Lag
如果持续 10 分钟:Lag = 2000 × 600 = 1,200,000
就会出现 严重消息积压。

14. 如何保证kafka顺序消费?

单机环境:topic只划分一个分区,最简单。
缺点:吞吐量低。
分布式环境 :
1. 局部有序性: 根据业务key划分,相同业务划分到同一个分区。
订单 orderId 用户 userId 账户 accountId
2. 全局有序性:
多个Partition消费
->收集消息
->根据时间戳或序列号排序
消费者:
先缓存 再排序
缺点:复杂度高 延迟高 内存消耗大

二. rabbitmq篇

优点: 高可用性,低延迟,支持多种路由协议(MQTT,AMQP),轻量级的消息队列。

缺点:1. 吞吐量不高 2. 消息积压的时候性能明显的下降 3. 自身不保证顺序性

单机吞吐量几万QPS

使用场景:1. 微服务异步解耦 2. 实时订单的处理 3. 短信消息通知

三. RocketMQ篇

优点:高吞吐量,低延迟,高可用性。单机吞吐量可以达到10万QPS。原生支持全局顺序和分区顺序。

缺点:1. 社区活跃度不如 Kafka 和 RabbitMQ;2. 生态不如 Kafka 成熟;

使用场景:大型的电商下单,金融交易系统

http://www.jsqmd.com/news/489374/

相关文章:

  • sql2o配置与实战:5分钟上手的数据库结果映射工具
  • 基于深度置信网络(DBN)与模糊神经网络(FNN)分类附Matlab代码
  • 猜数字小游戏来了~(冲冲冲!)
  • 基于决策树RGB图像分类附Matlab代码
  • SAP Fiori 图标体系实战:用 Icon Explorer、Virtual Element 与 Fiori Elements 提升业务识别效率
  • Nginx常见问题解决
  • PHing vs Make:PHP开发者必知的构建工具对比分析
  • Microsoft Agent Framework 测试豆包的根据图片生成矢量图的能力
  • 从0到1掌握PyNaCl:开发者必须了解的10个核心API
  • 2026年评价高的宽幅涂层机品牌推荐:辊式涂层机/立式玻纤涂层机实力厂家推荐 - 行业平台推荐
  • SAP Fiori 基础复合角色的设计逻辑、项目实践与 Clean Core 思维
  • phaser3-project-template完全指南:快速搭建专业HTML5游戏开发环境
  • 别把 SUM 2.0 当成转换按钮:一篇讲透 SAP S/4HANA System Conversion Tasks 的技术全景图
  • 2026年评价高的实验涂层机公司推荐:辊式涂层机实力品牌厂家推荐 - 行业平台推荐
  • 2026年比较好的心理测评大数据中心品牌推荐:心理测评大数据中心软件/心理测评大数据中心定制设备/心理测评大数据中心解决方案实力公司推荐 - 行业平台推荐
  • Matic Network存款与提款机制详解:ExitNFT与WithdrawManager工作原理解密
  • 把 SAP S/4HANA 系统转换做成一场可控工程:从预转换整改到 SUM 落地的任务全景图
  • YoloSide源代码探秘:PySide6界面与YOLOv8推理的完美结合
  • curriculum项目源码分析:深入理解Elixir模块设计与实现
  • 解决C++模板膨胀问题:ClangBuildAnalyzer高级分析功能实战
  • 看懂 SAP Readiness Check Functional Report:把 S/4HANA 转型风险前移到项目启动阶段
  • @react-native-menu/menu深入剖析:从源码看跨平台菜单组件的实现原理
  • DiscordBotClient与Vencord深度集成:打造个性化机器人管理体验
  • 掌握ScalaTest Matchers:让断言代码更简洁、更可读
  • Open UI5 源代码解析之614:Factory.js
  • MobileCoin交易流程全解析:从创建账户到完成匿名转账的每个步骤
  • curriculum项目最佳实践:提升Elixir代码质量的10个技巧
  • DeepGTAV v2:将GTA V转变为视觉自动驾驶研究环境的终极指南
  • 从0到1掌握RootlessKit:开发者必备的无特权容器工具详解
  • DC-TTS与Tacotron性能对比:为什么卷积网络训练速度更快?