当前位置: 首页 > news >正文

Java连接Kafka示例

1、引入依赖

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.0</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency>

2、生产者

import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class KafkaProducerTest implements Runnable { private final KafkaProducer<String, String> producer; private final String topic; private String clientid; public KafkaProducerTest(String topicName,String clientid) { Properties props = new Properties(); props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); // ======================== 你要的 SASL 认证配置 ======================== props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + "username=\"admin\" " + "password=\"Hc@Cloud01\";"); this.producer = new KafkaProducer<String, String>(props); this.topic = topicName; this.clientid = clientid; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr= "你好,这是第"+messageNo+"条数据 clientid=" + clientid; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("发送的信息:" + messageStr); } //生产1000条就退出 if(messageNo == 1000){ System.out.println("成功发送了"+messageNo+"条"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test1 = new KafkaProducerTest("logstash-08-04", "clientid1"); Thread thread1 = new Thread(test1); thread1.start(); } }

2、消费者

package com.example.demo; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private String clientid; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName,String clientid) { Properties props = new Properties(); props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); // ======================== 你要的 SASL 认证配置 ======================== props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + "username=\"admin\" " + "password=\"Hc@Cloud01\";"); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); this.clientid = clientid; } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { //消费100条就打印 ,但打印的数据不一定是这个规律的 if(messageNo%100==0){ System.out.println(messageNo+"=======成功消费:receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } //当消费了1000条就退出 if(messageNo == 1000){ break; } messageNo++; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("logstash-08-04", "clientid1"); Thread thread1 = new Thread(test1); thread1.start(); } }

4、logback.xml(可选)

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="on" />
http://www.jsqmd.com/news/622358/

相关文章:

  • 2026年停车场照明哪家性价比高?多维度分析与选择参考 - 品牌排行榜
  • Qwen3-Embedding-4B惊艳案例:用128维向量实现高效语义搜索
  • 2026停车场照明品牌发展观察:智能节能技术引领行业升级 - 品牌排行榜
  • Poppler for Windows:让PDF处理变得简单高效的开源工具
  • Ant Media Server性能优化:10个提升流媒体质量的关键技巧
  • 重0到1基于langchain框架搭建一个智能体(chapter 1)
  • 雪女-斗罗大陆-造相Z-Turbo在元宇宙中的应用:为用户虚拟化身生成个性化动漫形象
  • 5分钟学会TurboDiffusion:Wan2.1快速生成产品演示视频教程
  • 奥运排行榜背后的数据博弈:如何为不同国家定制最佳排名策略
  • 2026停车场照明哪家好?智慧节能方案对比参考 - 品牌排行榜
  • C编码小技巧(代码注释,日志开启/关闭,#pragma once)
  • SmolVLA企业级应用:基于.NET框架的智能业务系统集成
  • TitanHide核心原理:SSDT Hook技术深度解析
  • Pixel Dream Workshop 控制生成技术:Depth与Canny控制网实战
  • SDMatte在嵌入式设备上的轻量化部署探索:基于STM32的启发
  • 终极ink运行时引擎解析:容器、控制命令与故事状态管理全指南
  • 2026年专业的论文降重网站助力学术写作高效完成 - 品牌排行榜
  • Sentinel-1 Burst数据处理避坑实录:从aria2c报错到wget脚本救场
  • HsMod:炉石传说终极个性化定制方案,实现游戏体验8倍效率提升
  • 2022.12四级听力真题解析:高效备考策略与实战技巧
  • Claude参数曝光,AI模型竞争格局再掀波澜
  • Klib入门指南:5分钟掌握C语言高性能通用库
  • 基于伏羲大模型的全球气象可视化:JavaScript与Vue前端交互实现
  • 量化交易自学指南其七——策略编写
  • 如何防止别人恶意刷接口?
  • HsMod终极指南:深入解析炉石传说BepInEx插件架构与高级定制
  • Gemma-3-12b-it部署成本优化:INT4量化后显存降低60%且精度损失<2%实测
  • RVC效果展示:AI翻唱作品集,听听我的声音有多像
  • 2026年论文降重效果好的网站选择与实用参考 - 品牌排行榜
  • Vue前端集成StructBERT零样本分类模型的实战教程