Kafka基础篇
Kafaka安装和使用以及整和
- 一、 安装(docker)
- 1)创建docker-compose.yml文件
- 2)测试
- 二、 kafaka基础知识
- 1)kafaka核心架构
- 2) 工作流程
- 三、Spring Boot 整合Kafka
- 1. 导入依赖 ,配置yml文件
- 2. API讲解
- 2.1) `@KafkaListener`
- 2.2) `KafkaTemplate`
- 2.3) 实现手动提交偏移量
一、 安装(docker)
1)创建docker-compose.yml文件
mkdirkafka-democdkafka-demotouchdocker-compose.ymldockercompose up-ddocker-compose.yml
version:'3'services:kafka:image:apache/kafka:latest# 镜像container_name:kafka# 容器名ports:# 映射端口-"9092:9092"environment:KAFKA_NODE_ID:1# 当钱节点# KRaft 模式KAFKA_PROCESS_ROLES:broker,controllerKAFKA_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093# 监听端口# controller Kafka集群节点之间通信# plaintext 普通客户端端口KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092# 客户端如访问kafakaKAFKA_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_LISTENER_SECURITY_PROTOCOL_MAP:CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTKAFKA_CONTROLLER_QUORUM_VOTERS:1@kafka:9093# nodeId@host:portKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:02)测试
dockerexec-itkafkabash# 进入容器# 创建topic/opt/kafka/bin/kafka-topics.sh\--create\--topictest-topic\--bootstrap-server localhost:9092# 查看topic/opt/kafka/bin/kafka-topics.sh\--list\--bootstrap-server localhost:9092# 启动生产者/opt/kafka/bin/kafka-console-producer.sh\--topictest-topic\--bootstrap-server localhost:9092# 输入hello kafka启动一个新终端
dockerexec-itkafkabash/opt/kafka/bin/kafka-console-consumer.sh\--topictest-topic\--from-beginning\--bootstrap-server localhost:9092可以看到
二、 kafaka基础知识
1)kafaka核心架构
- Producer(生产者)
- 负责发送消息
- 可以是订单系统 、 日志系统 、webApp
- Consumer(消费者)
- 从主题订阅新消息的Kafka 客户端。消费者通过检查消息偏移量来区分消息是否已读。
- Topic(主题)
- Kafka 消息通过主题进行分类。
- 类似 数据库的表
例如:order-topic log-topic user-topic
- Partition(分区)
- 一个 Topic 可以拆成多个 Partition。
- 分区后可以可以并行写、并行读、横向扩展
- 本质就是多个日志文件
- Offset(偏移量)
Kafka 中消息长这样:
Partition-00->hello1->world2->kafka这个编号就是offset:
0、1、2作用:
- 标识消息位置
- 记录消费进度
Broker
Kafka 集群中的一台服务器。- 组成 Kafka 集群
- 提供高可用
- 提供分布式能力
Broker-1 Broker-2 Broker-3Consumer Group(消费者组)
- 多个消费者构成的消费者组,同时消费多个分区以实现高并发。
- 每一个消费者属于一个特定的消费者组。
- 消费者组中,一个消费者可以消费多个分区。
- 一个分区只能被指定给一个消费者。
Replica(副本)
- Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
2) 工作流程
# 1.生产者发送消息Producer ↓ Topic# 2.Kafka写入PartitionTopic ├── P0 ├── P1 └── P2# 3.Consumer 拉取消息Consumer ->主动拉取三、Spring Boot 整合Kafka
1. 导入依赖 ,配置yml文件
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>spring:kafka:bootstrap-servers:192.168.59.128:9092# kafka集群地址producer:#生产者配置retries:3acks:all# 要求所有副本都确认收到消息之后才算发送成功batch-size:16384# 批量发送大小 (16KB),累积到该大小后批零发送,提升吞吐量buffer-memory:33554432# 生产者缓冲区总大小 32MB,用于缓存待发送消息key-serializer:org.apache.kafka.common.serialization.StringSerializer# KEY的序列化器value-serializer:org.apache.kafka.common.serialization.StringSerializer# 消息的序列化器properties:linger.ms:1# 消息在发送前最多等待1ms,配和batch-size 实现微批量consumer:# 消费者配置group-id:pet-life-consumer-group# 消费者组ID,同一组的消费者回分摊消费分区auto-offset-reset:earliest# 无初始偏移量时,从最早的消息开始消费(另一个常用值是 latest)enable-auto-commit:true# 自动提交消费位移,简化消费端逻辑auto-commit-interval:1000# 自动提交的间隔为 1000ms(即 1 秒)#消息 Key 和 Value 的反序列化器,与生产者对应key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records:500# 单次 poll() 调用最多拉取 500 条消息listener:missing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误,避免启动失败concurrency:3# 消费监听器并发线程数为 3,即同时启动 3 个消费者实例消费分区2. API讲解
2.1)@KafkaListener
public@interfaceKafkaListener{Stringid()default"";//Listener 唯一 ID。StringcontainerFactory()default"";//指定监听容器工厂String[]topics()default{};//指定监听的topicStringtopicPattern()default"";//正则匹配topicTopicPartition[]topicPartitions()default{};//精确撇脂topic及其分区StringcontainerGroup()default"";StringerrorHandler()default"";//异常处理器StringgroupId()default"";//指定消费者组booleanidIsGroup()defaulttrue;// 只id 和groupId是否相同StringclientIdPrefix()default"";//Kafka Client ID 前缀StringbeanRef()default"__listener";Stringconcurrency()default"";// 并发消费线程数量StringautoStartup()default"";// 是否自动消费String[]properties()default{};//额外 Kafka 配置。booleansplitIterables()defaulttrue;StringcontentTypeConverter()default"";Stringbatch()default"";Stringfilter()default"";Stringinfo()default"";StringcontainerPostProcessor()default"";}示例:
@KafkaListener(topics="test")publicvoidlisten(Stringmessage){System.out.println("Received message: "+message);}2.2)KafkaTemplate
KafkaTemplate本质上就是对 Kafka Producer的高级封装
发送消息的API如下:
| 方法 | 作用 |
|---|---|
| send(topic, data) | 普通发送 |
| send(topic, key, data) | 带 key |
| send(topic, partition, key, data) | 指定分区 |
| send(record) | 完整 ProducerRecord |
| send(message) | Spring Message |
| sendDefault() | 默认 Topic |
| executeInTransaction() | 事务消息 |
其中带 key 和普通发送的区别 :
Kafka会对key进行哈希运算,对于同一个key会进入同一分区,能够保证**局部顺序性**完整
ProducerRecord
publicclassProducerRecord<K,V>{privatefinalStringtopic;// 主题privatefinalIntegerpartition;//分区privatefinalHeadersheaders;// 元数据头部,用于在消息体之外传递额外信息privatefinalKkey;// keyprivatefinalVvalue;//消息privatefinalLongtimestamp;// 时间戳}| 场景 | 示例 |
|---|---|
| 链路追踪 | 传入 traceId、spanId,实现分布式追踪 |
| 消息去重 | 传入唯一 messageId,消费端幂等判断 |
| 内容标识 | 标记消息的 contentType、encoding |
| 路由标记 | 标记消息来源 source、目标 target |
| 自定义标记 | 任意业务标识,如 priority、retryCount |
接收header的方法 :
// 方式1:用 @Header 注解取单个值@KafkaListener(topics="test-topic")publicvoidlisten(Stringmessage,@Header("traceId")StringtraceId,@Header("source")Stringsource,Acknowledgmentack){System.out.println("traceId="+traceId+", source="+source+", body="+message);ack.acknowledge();}// 方式2:接收完整 Headers 对象@KafkaListener(topics="test-topic")publicvoidlisten(Stringmessage,Headersheaders,Acknowledgmentack){headers.forEach(h->System.out.println("key="+h.key()+", value="+newString(h.value())));ack.acknowledge();}// 方式3:接收 ConsumerRecord(最完整,包含分区、偏移量等所有信息)@KafkaListener(topics="test-topic")publicvoidlisten(ConsumerRecord<String,String>record,Acknowledgmentack){System.out.println("topic="+record.topic()+", partition="+record.partition()+", offset="+record.offset()+", headers="+record.headers()+", value="+record.value());ack.acknowledge();}2.3) 实现手动提交偏移量
避免业务还没有跑完,就提交的偏移量。如果执行过程中出现故障,但是这条消息已经消费过了,造成数据丢失。
修改yml文件
spring:kafka:consumer:# 消费者配置enable-auto-commit:false# 关闭自动提交,配合 ack-mode: manual 手动提交偏移量listener:ack-mode:manualmissing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误,避免启动失败@KafkaListener(topicPattern="test-.*",groupId="test-group")publicvoidlisten(Stringmessage,Acknowledgmentack){try{System.out.println("Received message: "+message);// 业务逻辑处理...}finally{ack.acknowledge();// 手动提交偏移量}}