当前位置: 首页 > news >正文

简单梳理梳理java应用

### **序**

本文主要简单梳理梳理java应用中生产/消费kafka消息的一些使用选择。

#### **可用类库**

* kafka client
* spring for apache kafka
* spring integration kafka
* spring cloud stream binder kafka

基于java版的kafka client与spring进行集成

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>

#### **与springboot的集成**

https://www.zhihu.com/zvideo/1994207677006489215/
https://www.zhihu.com/zvideo/1994207676800988202/
https://www.zhihu.com/zvideo/1994207675509133884/
https://www.zhihu.com/zvideo/1994207675374921419/
https://www.zhihu.com/zvideo/1994207675790160971/
https://www.zhihu.com/zvideo/1994207674745758972/
https://www.zhihu.com/zvideo/1994207675072943678/
https://www.zhihu.com/zvideo/1994207674812888358/
https://www.zhihu.com/zvideo/1994207673290338491/
https://www.zhihu.com/zvideo/1994207673063859997/
https://www.zhihu.com/zvideo/1994207672652830570/

对于springboot 1.5版本之前的话,需要自己去配置java configuration,而1.5版本以后则提供了auto config,具体详见org.springframework.boot.autoconfigure.kafka这个包,主要有

* KafkaAutoConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

private final KafkaProperties properties;

public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}

@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<, > kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}

@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<Object, Object>();
}

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<, > kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<Object, Object>(
this.properties.buildConsumerProperties());
}

@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<, > kafkaProducerFactory() {
return new DefaultKafkaProducerFactory<Object, Object>(
this.properties.buildProducerProperties());
}

}

* KafkaAnnotationDrivenConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

private final KafkaProperties properties;

KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
this.properties = properties;
}

@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
return configurer;
}

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<, > kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}

@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA\_LISTENER\_ANNOTATION\_PROCESSOR\_BEAN\_NAME)
protected static class EnableKafkaConfiguration {

}
}

* ConcurrentKafkaListenerContainerFactoryConfigurer spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

public class ConcurrentKafkaListenerContainerFactoryConfigurer {

private KafkaProperties properties;

/\*\*
\* Set the {@link KafkaProperties} to use.
\* @param properties the properties
\*/
void setKafkaProperties(KafkaProperties properties) {
this.properties = properties;
}

/\*\*
\* Configure the specified Kafka listener container factory. The factory can be
\* further tuned and default settings can be overridden.
\* @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
\* instance to configure
\* @param consumerFactory the {@link ConsumerFactory} to use
\*/
public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
if (container.getAckMode() != null) {
containerProperties.setAckMode(container.getAckMode());
}
if (container.getAckCount() != null) {
containerProperties.setAckCount(container.getAckCount());
}
if (container.getAckTime() != null) {
containerProperties.setAckTime(container.getAckTime());
}
if (container.getPollTimeout() != null) {
containerProperties.setPollTimeout(container.getPollTimeout());
}
if (container.getConcurrency() != null) {
listenerContainerFactory.setConcurrency(container.getConcurrency());
}
}

}

> 创建并发的多个KafkaMessageListenerContainer,相当于一个应用实例创建多个consumer 如果是1.5版本及以上的springboot,使用起来就比较简单了,注入kafkaTemplate直接发消息,然后简单配置一下就可以消费消息

### **spring integration kafka**

> spring integration是spring关于Enterprise Integration Patterns的实现,而spring integration kafka则基于spring for apache kafka提供了inbound以及outbound channel的适配器 Starting from version 2.0 version this project is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x

这个的话,没有自动配置,又引入了integration相关的概念,整体来讲,相对复杂一些。

#### **consumer配置**

@Bean
public KafkaMessageListenerContainer<String, String> container(
ConsumerFactory<String, String> kafkaConsumerFactory) {
return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
new ContainerProperties(new TopicPartitionInitialOffset(topic, 0)));
}
@Bean
public ConsumerFactory<, > kafkaConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP\_SERVERS\_CONFIG, brokerAddress);
props.put(ConsumerConfig.GROUP\_ID\_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE\_AUTO\_COMMIT\_CONFIG, true);
props.put(ConsumerConfig.AUTO\_COMMIT\_INTERVAL\_MS\_CONFIG, 100);
props.put(ConsumerConfig.SESSION\_TIMEOUT\_MS\_CONFIG, 15000);
props.put(ConsumerConfig.AUTO\_OFFSET\_RESET\_CONFIG,"earliest");
props.put(ConsumerConfig.KEY\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public PollableChannel fromKafka() {
return new QueueChannel();
}

#### **producer配置**

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(topic));
handler.setMessageKeyExpression(new LiteralExpression(messageKey));
return handler;
}
@Bean
public ProducerFactory<String, String> kafkaProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP\_SERVERS\_CONFIG, brokerAddress);
props.put(ProducerConfig.RETRIES\_CONFIG, 0);
props.put(ProducerConfig.BATCH\_SIZE\_CONFIG, 16384);
props.put(ProducerConfig.LINGER\_MS\_CONFIG, 1);
props.put(ProducerConfig.BUFFER\_MEMORY\_CONFIG, 33554432);
props.put(ProducerConfig.KEY\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(kafkaProducerFactory());
}

#### **收发信息**

@Autowired
@Qualifier("fromKafka")
private PollableChannel fromKafka;

@Autowired
@Qualifier("toKafka")
MessageChannel toKafka;

Message msg = fromKafka.receive(10000l);
toKafka.send(new GenericMessage<Object>(UUID.randomUUID().toString()));

### **spring cloud stream**

> 基于Spring Integration构建,在spring cloud环境中又稍作加工,也稍微有点封装了. 具体详见spring cloud stream kafka实例以及spring-cloud-stream-binder-kafka属性配置

### **doc**

* spring-kafka
* spring-integration
* spring-integration-kafka
* spring-integration-samples-kafka
* spring-cloud-stream
* spring boot与kafka集成
* 总结kafka的consumer消费能力很低的情况下的处理方案

http://www.jsqmd.com/news/235653/

相关文章:

  • React Native搭建环境通俗解释:初学者看懂两大方案
  • PCB布线规则设计中地平面分割的实战案例分析
  • screen指令入门必看:终端多路复用基础操作指南
  • CANFD差分信号传输机制图解说明
  • 利用Multisim进行带宽扩展放大器仿真的完整示例
  • LeetCode 1266.访问所有点的最小时间:贪心(数学)+python一行版
  • 快速理解HAL_UART_RxCpltCallback在工业协议解析中的角色
  • 全面讲解Elasticsearch向量类型(dense_vector)用法
  • 软著撰写要点
  • Elasticsearch日志分析系统架构设计全面讲解
  • 基于KRR核岭回归(Kernel Ridge Regression)多变量回归预测 (多输入单输出) Matlab回归
  • Multisim14.2安装教程:防病毒软件冲突解决方法
  • 视觉与惯导融合定位技术:自动驾驶手把手教程
  • W5500以太网模块PCB布局布线操作指南
  • I2C时序噪声干扰识别:一文说清信号完整性诊断方法
  • Linux 内核学习(16) --- linux x86-64 虚拟地址空间和区域
  • 基于Java+SpringBoot+SSM办公管理系统(源码+LW+调试文档+讲解等)/办公系统/管理系统/办公自动化系统/企业办公管理系统/智能办公管理系统/协同办公管理系统
  • 学霸同款2026继续教育AI论文写作软件TOP10:选对工具轻松过关
  • 手把手教你用Keil C51开发继电器控制系统
  • IGBT——原理和分类
  • Hive与Kylin整合:构建企业级OLAP解决方案
  • 【欠驱动AUV】欠驱动自主水下航行器(AUV)的轨迹跟踪和路径跟随算法的不同分析方法进行仿真研究(Matlab代码、Simulink仿真)
  • Altium Designer工业EMC设计核心要点
  • 基于Java+SpringBoot+SSM动漫分享系统(源码+LW+调试文档+讲解等)/动漫交流平台/动漫资源分享/动漫社区系统/动漫分享网站/动漫共享平台
  • 《创业之路》-829-一个组织中,最复杂、最难处理的其实不是技术、不是产品设计和业务流程,其实是“人”本身。
  • 常见的垃圾回收器
  • 015-MD5极志愿
  • I2S协议PCB布线关键点:零基础掌握走线规则
  • 【叶片单元动量理论】分析给定螺旋桨几何形状在不同前进比下恒定转速下的性能研究(Matlab代码实现)
  • JVM中的类加载Minor GC与Full GC