当前位置: 首页 > news >正文

Kafka 和springboot 整合Logback日志

Spring Boot 整合 Kafka 进行日志处理是一个常见的任务,可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南,帮助你完成这个整合。

本文介绍了如何在SpringBoot应用中整合Kafka,利用Logback收集日志并发送到Kafka。详细步骤包括添加Kafka相关依赖,配置logback.xml,自定义KafkaAppender以及提供测试。

首先安装kafka.。之前已经安装过了,我们这里不再讲。,可以参考前面讲的类容:

https://blog.csdn.net/lchmyhua88/article/details/155640953?spm=1001.2014.3001.5502

接着我们按照下面的步骤开始:

  • 1.pom依赖
  • 2.logback.xml配置
  • 3.配置 kafka消费者
  • 4.监听消费消息&测试代码

1. 添加依赖

<!-- Spring Boot Starter Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Logback Kafka Appender --> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.2.0-RC2</version> </dependency> <!-- Kafka Clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>

2.配置 Logback

<configuration> <!-- 控制台输出appender --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- Kafka appender --> <appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> <!-- Kafka topic名称 --> <topic>log-topic</topic> <!-- Kafka bootstrap servers --> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <!-- Kafka producer配置 --> <producerConfig>bootstrap.servers=192.168.33.10:9092</producerConfig> <producerConfig>acks=0</producerConfig> <producerConfig>linger.ms=1000</producerConfig> <producerConfig>max.block.ms=0</producerConfig> <!-- 过滤器设置(可选) --> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <!-- 根logger配置 --> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="KAFKA"/> </root> </configuration>

3.配置 kafka消费者,为了方便测试我们把生产者消费者都配置上

spring.kafka.bootstrap-servers=192.168.33.10:9092 #spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonDeserializer

4.注入kafka bean ,方便后面调试生产者。

@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

5.监听消费消息

@Component public class LogConsumer { @KafkaListener(topics = "log-topic",groupId = "my-group") public void listen(String message) { System.out.println("收到日志消息: " + message); } }

6.添加日志打印

@RestController @RequestMapping("/kafka") public class LogController { @Autowired private KafkaProducerServiceImpl producerService; private static final Logger logger = (Logger) LoggerFactory.getLogger(LogController.class); @GetMapping("/log") public String log() { //producerService.sendLogMessage("This is a test log message"); logger.info("这是一条测试日志消息"); logger.warn("这是一条警告日志消息"); logger.error("这是一条错误日志消息"); return "Log message sent"; } }

下面我们开始测试,通过调用接口:

控制台可以看到我写3条日志,kafka消费3条消息

这样我们就通过logback把日志收集到kafka里面。方便数据分析。当然你也可以写入es进行分析画像。

我们可以通过kafka manger控制台来监控消息。

下载kafka-manager,下载地址: https://github.com/yahoo/kafka-manager

解压后配置application.conf kafka地址:

保存后,指定8888端口启动:

bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8888

启动后打开控制面板,可以看到kafka集群节点信息和分区,消费偏移等信息:

http://192.168.33.10:8888/

详情可以去官网看看介绍:

https://github.com/yahoo/CMAK

http://www.jsqmd.com/news/88019/

相关文章:

  • 2025精选:河北粘钉一体机供应商口碑前十强,可靠的粘钉一体机精选实力品牌 - 品牌推荐师
  • 2025年国内专业的多媒体讲台电教桌公司排行,联动多媒体讲台电教桌/机场多媒体讲台电教桌/多媒体讲台电教桌销售厂家排行榜 - 品牌推荐师
  • 黑马点评前125节课遇到的问题及解决方案(在看网课过程中会有很多老师运行成功但我们失败并且老师还不没有讲到的情况,本文致力于解决这个问题,记录了本人在做这个项目的时候遇到的所有问题)
  • 服务器内存条与工作站内存条区别
  • 【Qt】配置安卓创建环境
  • 探索多虚拟电厂联合调度优化模型:集中式算法的实践
  • (19)Bean的循环依赖问题
  • ADVANCE Day23
  • C++ 相对 C 的语法补充:解决痛点,让代码更简洁安全
  • (20)回顾反射机制
  • 21、Linux 系统中的文件归档、备份与正则表达式使用
  • 内存条电压
  • Flutter + OpenHarmony 架构演进:从单体到模块化、微前端与动态能力的现代化应用体系
  • 22、正则表达式全解析
  • Vue的Class绑定对象语法如何让动态类名切换变得直观高效?
  • 23、正则表达式与文本处理全解析
  • 如何快速构建行为面试中的领导力案例:面向求职者的完整指南
  • 18、Linux 网络工具使用指南
  • 数字电路模拟程序迭代及课堂测验总结 - 23207101
  • 直流微电网混合储能模型Simulink仿真探索
  • 39、高级Shell脚本编程技巧与概念
  • 基于 Rust 实现单向网闸环境下的 MQTT 消息透明传输
  • 25、文本处理工具全解析
  • 24、文本处理工具全解析:从排序到比较,掌握高效文本操作技巧
  • java-BlockingQueue、CountDownLatch讲解
  • 26、文本格式化工具全解析
  • QT6 windows 11 VS2022 发布后启动
  • 27、Unix 系统中的文档格式化与打印
  • # 深度解析:爬虫工艺获取淘宝商品详情并封装为API的全流程应用
  • 二叉树基本概念及遍历