当前位置: 首页 > news >正文

Kafka基础篇

Kafaka安装和使用以及整和

  • 一、 安装(docker)
    • 1)创建docker-compose.yml文件
    • 2)测试
  • 二、 kafaka基础知识
    • 1)kafaka核心架构
    • 2) 工作流程
  • 三、Spring Boot 整合Kafka
    • 1. 导入依赖 ,配置yml文件
    • 2. API讲解
      • 2.1) `@KafkaListener`
      • 2.2) `KafkaTemplate`
      • 2.3) 实现手动提交偏移量

一、 安装(docker)

1)创建docker-compose.yml文件

mkdirkafka-democdkafka-demotouchdocker-compose.ymldockercompose up-d

docker-compose.yml

version:'3'services:kafka:image:apache/kafka:latest# 镜像container_name:kafka# 容器名ports:# 映射端口-"9092:9092"environment:KAFKA_NODE_ID:1# 当钱节点# KRaft 模式KAFKA_PROCESS_ROLES:broker,controllerKAFKA_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093# 监听端口# controller Kafka集群节点之间通信# plaintext 普通客户端端口KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092# 客户端如访问kafakaKAFKA_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_LISTENER_SECURITY_PROTOCOL_MAP:CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTKAFKA_CONTROLLER_QUORUM_VOTERS:1@kafka:9093# nodeId@host:portKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:0

2)测试

dockerexec-itkafkabash# 进入容器# 创建topic/opt/kafka/bin/kafka-topics.sh\--create\--topictest-topic\--bootstrap-server localhost:9092# 查看topic/opt/kafka/bin/kafka-topics.sh\--list\--bootstrap-server localhost:9092# 启动生产者/opt/kafka/bin/kafka-console-producer.sh\--topictest-topic\--bootstrap-server localhost:9092# 输入hello kafka


启动一个新终端

dockerexec-itkafkabash/opt/kafka/bin/kafka-console-consumer.sh\--topictest-topic\--from-beginning\--bootstrap-server localhost:9092

可以看到

二、 kafaka基础知识

1)kafaka核心架构

  1. Producer(生产者)
    • 负责发送消息
    • 可以是订单系统 、 日志系统 、webApp
  2. Consumer(消费者)
    • 从主题订阅新消息的Kafka 客户端。消费者通过检查消息偏移量来区分消息是否已读。
  3. Topic(主题)
    • Kafka 消息通过主题进行分类。
    • 类似 数据库的表
      例如:order-topic log-topic user-topic
  4. Partition(分区)
    • 一个 Topic 可以拆成多个 Partition。
    • 分区后可以可以并行写、并行读、横向扩展
    • 本质就是多个日志文件
  5. Offset(偏移量)
    Kafka 中消息长这样:
Partition-00->hello1->world2->kafka

这个编号就是offset:

0、1、2

作用:

  • 标识消息位置
  • 记录消费进度
  1. Broker
    Kafka 集群中的一台服务器。

    • 组成 Kafka 集群
    • 提供高可用
    • 提供分布式能力
Broker-1 Broker-2 Broker-3
  1. Consumer Group(消费者组)

    • 多个消费者构成的消费者组,同时消费多个分区以实现高并发。
    • 每一个消费者属于一个特定的消费者组。
    • 消费者组中,一个消费者可以消费多个分区。
    • 一个分区只能被指定给一个消费者。
  2. Replica(副本)

    • Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

2) 工作流程

# 1.生产者发送消息Producer ↓ Topic# 2.Kafka写入PartitionTopic ├── P0 ├── P1 └── P2# 3.Consumer 拉取消息Consumer ->主动拉取

三、Spring Boot 整合Kafka

1. 导入依赖 ,配置yml文件

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
spring:kafka:bootstrap-servers:192.168.59.128:9092# kafka集群地址producer:#生产者配置retries:3acks:all# 要求所有副本都确认收到消息之后才算发送成功batch-size:16384# 批量发送大小 (16KB),累积到该大小后批零发送,提升吞吐量buffer-memory:33554432# 生产者缓冲区总大小 32MB,用于缓存待发送消息key-serializer:org.apache.kafka.common.serialization.StringSerializer# KEY的序列化器value-serializer:org.apache.kafka.common.serialization.StringSerializer# 消息的序列化器properties:linger.ms:1# 消息在发送前最多等待1ms,配和batch-size 实现微批量consumer:# 消费者配置group-id:pet-life-consumer-group# 消费者组ID,同一组的消费者回分摊消费分区auto-offset-reset:earliest# 无初始偏移量时,从最早的消息开始消费(另一个常用值是 latest)enable-auto-commit:true# 自动提交消费位移,简化消费端逻辑auto-commit-interval:1000# 自动提交的间隔为 1000ms(即 1 秒)#消息 Key 和 Value 的反序列化器,与生产者对应key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records:500# 单次 poll() 调用最多拉取 500 条消息listener:missing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误,避免启动失败concurrency:3# 消费监听器并发线程数为 3,即同时启动 3 个消费者实例消费分区

2. API讲解

2.1)@KafkaListener

public@interfaceKafkaListener{Stringid()default"";//Listener 唯一 ID。StringcontainerFactory()default"";//指定监听容器工厂String[]topics()default{};//指定监听的topicStringtopicPattern()default"";//正则匹配topicTopicPartition[]topicPartitions()default{};//精确撇脂topic及其分区StringcontainerGroup()default"";StringerrorHandler()default"";//异常处理器StringgroupId()default"";//指定消费者组booleanidIsGroup()defaulttrue;// 只id 和groupId是否相同StringclientIdPrefix()default"";//Kafka Client ID 前缀StringbeanRef()default"__listener";Stringconcurrency()default"";// 并发消费线程数量StringautoStartup()default"";// 是否自动消费String[]properties()default{};//额外 Kafka 配置。booleansplitIterables()defaulttrue;StringcontentTypeConverter()default"";Stringbatch()default"";Stringfilter()default"";Stringinfo()default"";StringcontainerPostProcessor()default"";}

示例:

@KafkaListener(topics="test")publicvoidlisten(Stringmessage){System.out.println("Received message: "+message);}

2.2)KafkaTemplate

KafkaTemplate本质上就是对 Kafka Producer的高级封装

发送消息的API如下:

方法作用
send(topic, data)普通发送
send(topic, key, data)带 key
send(topic, partition, key, data)指定分区
send(record)完整 ProducerRecord
send(message)Spring Message
sendDefault()默认 Topic
executeInTransaction()事务消息
  • 其中带 key 和普通发送的区别 :

    Kafka会对key进行哈希运算,对于同一个key会进入同一分区,能够保证**局部顺序性**
  • 完整ProducerRecord

publicclassProducerRecord<K,V>{privatefinalStringtopic;// 主题privatefinalIntegerpartition;//分区privatefinalHeadersheaders;// 元数据头部,用于在消息体之外传递额外信息privatefinalKkey;// keyprivatefinalVvalue;//消息privatefinalLongtimestamp;// 时间戳}
场景示例
链路追踪传入 traceId、spanId,实现分布式追踪
消息去重传入唯一 messageId,消费端幂等判断
内容标识标记消息的 contentType、encoding
路由标记标记消息来源 source、目标 target
自定义标记任意业务标识,如 priority、retryCount

接收header的方法 :

// 方式1:用 @Header 注解取单个值@KafkaListener(topics="test-topic")publicvoidlisten(Stringmessage,@Header("traceId")StringtraceId,@Header("source")Stringsource,Acknowledgmentack){System.out.println("traceId="+traceId+", source="+source+", body="+message);ack.acknowledge();}// 方式2:接收完整 Headers 对象@KafkaListener(topics="test-topic")publicvoidlisten(Stringmessage,Headersheaders,Acknowledgmentack){headers.forEach(h->System.out.println("key="+h.key()+", value="+newString(h.value())));ack.acknowledge();}// 方式3:接收 ConsumerRecord(最完整,包含分区、偏移量等所有信息)@KafkaListener(topics="test-topic")publicvoidlisten(ConsumerRecord<String,String>record,Acknowledgmentack){System.out.println("topic="+record.topic()+", partition="+record.partition()+", offset="+record.offset()+", headers="+record.headers()+", value="+record.value());ack.acknowledge();}

2.3) 实现手动提交偏移量

避免业务还没有跑完,就提交的偏移量。如果执行过程中出现故障,但是这条消息已经消费过了,造成数据丢失。

修改yml文件

spring:kafka:consumer:# 消费者配置enable-auto-commit:false# 关闭自动提交,配合 ack-mode: manual 手动提交偏移量listener:ack-mode:manualmissing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误,避免启动失败
@KafkaListener(topicPattern="test-.*",groupId="test-group")publicvoidlisten(Stringmessage,Acknowledgmentack){try{System.out.println("Received message: "+message);// 业务逻辑处理...}finally{ack.acknowledge();// 手动提交偏移量}}
http://www.jsqmd.com/news/849200/

相关文章:

  • 相位恢复技术:XY-Hamiltonian优化框架与应用
  • Ascend(昇腾)性能优化文章导航
  • 新大陆物联网竞赛经验谈
  • 别再混用网络了!手把手教你用华为VRF隔离生产网和办公网(附完整配置命令)
  • 微信小程序 Vue3基于springboot框架的社区团购自提系统
  • 服装拉链袋厂家/服装包装袋厂家哪家好?2026年优质复合包装袋定制厂家盘点|江西cpe膜厂家推荐:勤思塑业领衔 - 栗子测评
  • 用 BuildingAI 玩上了 Image2 和 Nano Banana!开源免费 + 一键兑换,效果戳→
  • 【软考中级备考日记|系统集成项目管理工程师Day2:信息系统基础精讲+新一代信息技术核心考点+20道专项必刷题】
  • 2026优选:靠谱超声波液位计/流量计/热量表源头厂家推荐解读 - 栗子测评
  • 今天开始学爬虫1
  • D1011UK,28V电压下10W输出500MHz频段实现50%漏极效率功率晶体管
  • 做数字人别踩坑!浩凯实业整理数字人麦克风推荐清单,USB摄像头及各类阵列、全向麦克风厂家深度解析 - 栗子测评
  • 激光全息防伪标签哪家好?2026二维码防伪标签公司推荐:宏鑫源防伪测评 - 栗子测评
  • JOIN、IN、EXISTS谁最快?实测三种写法性能差异与执行计划深度剖析
  • 2026年CRM系统综合实力排名
  • 5分钟搞定U盘验货!这款绿色工具真香到离谱
  • AsmDude2:如何在Visual Studio 2022中实现汇编开发效率提升300%
  • 一切命运皆可破,我命由我不由天
  • 2026再生橡胶厂家推荐:新疆橡胶管厂家+路锥厂家推荐清单 - 栗子测评
  • 糜子CRISPR转化 伯远生物
  • 2026年优选金属圆锯机厂家推荐:润泰机械领衔,实力强的金属圆锯机厂家/高速圆锯机厂家汇总 - 栗子测评
  • 影像技术实战15:图片偏黄、偏蓝、发灰?OpenCV 白平衡、CLAHE 与色彩校正工程方案
  • PotplayerPanVideo终极指南:告别网盘播放限制,享受本地播放器流畅体验
  • Day33-1: Serilog(日志中间件)VS OperLogHelper(操作日志帮助类)
  • MiniMax Agent 正式更名 Mavis 上线多智能体协作
  • BagelVLA:通过交错式视觉-语言-动作生成,增强机器人长时程操纵能力
  • 2026年4月行业内靠谱的铜大缸设计厂商推荐,铜大缸/铸铁铸铝雕塑/铜狮子铜大象/铜钟/铜香炉,铜大缸加工厂口碑推荐 - 品牌推荐师
  • 批量操作进阶:百万行级数据导入的性能极限
  • 采购必看:管路蒸汽成型设备厂家哪家好?2026管路成型隧道炉厂家推荐:领拓工业领衔|优质管路蒸汽成型设备厂家盘点 - 栗子测评
  • 影像技术实战16:视频抽帧重复太多?dHash + 时间窗口构建关键画面去重方案