当前位置: 首页 > news >正文

sb-KafkaListener 20260425

sb-KafkaListener 20260425

1、pom.xml

<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>3.0.2</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>



2、application.yml


#spring:
# kafka:
# # ✅ 正确位置:公共配置,生产者+消费者都用这个
# bootstrap-servers: 192.168.91.165:9092
# consumer:
# group-id: console-consumer-95339
# # 下面这两个是固定必配的,我一起给你加上
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#properties:
# spring:
# json:
# trusted:
# packages: "*"

server:
port: 18084
spring:
kafka:
# ✅ 正确位置:公共配置,生产者+消费者都用这个
bootstrap-servers: 192.168.91.165:9092
consumer:
auto-offset-reset: earliest #给 group01 加上 auto-offset-reset: earliest,让它从开头读
group-id: console-consumer-62695
#group-id: group01
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

#server.port=18084
#spring.kafka.bootstrap-servers= 192.168.91.165:9092
#spring.kafka.consumer.group-id= console-consumer-95339
#spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringDeserializer
 

3、KafkaConsumerService 
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

private static final String TOPIC = "topic01";
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

/**
* Kafka 消费者监听
* 自动使用你配置文件里的:group-id + bootstrap-servers
*/
@KafkaListener(topics = {TOPIC}) // 只需要写主题名即可
public void listen(String message) {
// 收到消息就会进入这里
log.info("========================================");
log.info("topics【"+TOPIC+"】收到 Kafka 消息:" + message);
log.info("========================================");
}
}


4、

image

 

image

 

 

image