当前位置: 首页 > news >正文

docker快速部署kafka

前言

记录docker部署kafka

部署kafka

# Create the host directory that will be bind-mounted into the container
sudo mkdir -p /zero/kafka
# Grant write permissions (the bitnami image runs as a non-root user and must be able to write here).
# FIX: the original used `chown -R 777 ...` — 777 is a permission mode, not an owner; `chmod` is intended.
sudo chmod -R 777 /zero/kafka/
# Create a Docker network for inter-container communication (skip if it already exists)
docker network create app-tier

启动命令

docker run -d \ --name kafka \ --network app-tier \ --restart always \ --ulimit nofile=65536:65536 \ -p 9092:9092 \ -p 9094:9094 \ -v /zero/kafka:/bitnami/kafka \ -e TZ=Asia/Shanghai \ -e KAFKA_CFG_NODE_ID=0 \ -e KAFKA_CFG_PROCESS_ROLES=controller,broker \ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://172.16.10.40:9094 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT \ -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \ bitnami/kafka:3.6.2

如果docker pull bitnami/kafka:3.6.2 拉取不了

vi /etc/docker/daemon.json

{
"registry-mirrors": ["https://jhacxx1q.mirror.aliyuncs.com",
"https://docker.1ms.run",
"https://docker.1panel.live",
"https://docker.ketches.cn",
"https://docker.m.daocloud.io/",
"https://hub-mirror.c.163.com",
"https://dockerproxy.com/",
"https://mirror.baidubce.com/",
"https://docker.nju.edu.cn/",
"https://docker.mirrors.sjtug.sjtu.edu.cn/",
"https://mirror.ccs.tencentyun.com",
"https://docker-0.unsee.tech",
"https://register.liberx.info/",
"https://docker.registry.cyou/",
"https://docker-cf.registry.cyou/",
"https://dockercf.jsdelivr.fyi/",
"https://docker.jsdelivr.fyi/",
"https://dockertest.jsdelivr.fyi/",
"https://mirror.iscas.ac.cn/",
"https://docker.rainbond.cc/",
"https://mirror.aliyuncs.com",
"https://docker.mirrors.ustc.edu.cn/"
]
}

#重新加载 systemd 配置

sudo systemctl daemon-reload

#重启 Docker 使配置生效

sudo systemctl restart docker

记录下kafka在java项目消费时如何处理:

pom引入依赖

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency>

yml配置

# FIX: the original YAML was flattened onto two run-on lines and is not valid YAML;
# indentation reconstructed below. NOTE(review): nesting is inferred from key names —
# in a Spring Boot app this `kafka:` section normally lives under `spring:`; confirm
# against the actual application.yml.
store:
  auto-startup-kafka: true      # toggles @KafkaListener autoStartup
kafka:
  topics:
    - name: lst_export_excel_business
      partitions: 2
      replicas: 1
  bootstrap-servers: 172.16.10.40:9094
  producer:
    acks: -1                    # wait for all in-sync replicas
    retries: 32
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    batch-size: 32768
    compression-type: snappy
    buffer-memory: 67108864
    properties:
      'linger.ms': 5
  consumer:
    group-id: ${KAFKA_CONSUMER_GROUP_ID:storeServerGroup}
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: latest
    enable-auto-commit: false   # offsets committed manually via Acknowledgment
    auto-commit-interval: 3
    heartbeat-interval: 5000
  listener:
    ack-mode: manual_immediate

封装类

import java.util.Map; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties.AckMode; @Configuration @EnableKafka @Log4j2 public class KafkaConsumerConfig { /** * 消费者批量工程 */ @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory(KafkaProperties kafkaProperties) { Map<String, Object> properties = kafkaProperties.getConsumer().buildProperties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); // boostrap server 配置 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 一次poll操作最大获取的记录数量 properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者 properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者 log.info(" kafka batchFactory properties = {}",properties); ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties)); factory.setBatchListener(true); // 设置批量消费 factory.getContainerProperties().setPollTimeout(1000); factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // 手动提交 return factory; } }
import cloud.jiuwei.store.constant.StoreConstants; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopicConfig { @Value("${spring.profiles.active:uat}") private String profile; private static String env; private static final String SUFFIX = "_uat"; @PostConstruct public void init() { env = profile; } private static String getTopicName(final String topic) { if (StoreConstants.UAT.equals(env)) { return topic + SUFFIX; } else { return topic; } } /** * 业务异步导出 excel */ private static final String STO_EXPORT_EXCEL_BUSINESS = "lst_export_excel_business"; @Bean(name = STO_EXPORT_EXCEL_BUSINESS) public static String getStoExportExcelBusiness() { return getTopicName(STO_EXPORT_EXCEL_BUSINESS); } }
import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; /** * 事务管理工具 */ @Component @Slf4j public class TransactionUtil { private static ThreadPoolTaskExecutor threadPoolTaskExecutor; public TransactionUtil(ThreadPoolTaskExecutor transactionPool) { threadPoolTaskExecutor = transactionPool; } /** * 在事务提交后同步执行 * * @param runnable */ public static void syncAfterCommit(Runnable runnable) { log.info("在事务提交后同步执行"); if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { runnable.run(); } }); } else { runnable.run(); } } /** * 在事务提交后异步执行 * * @param executor * @param runnable */ public static void asyncAfterCommit(Executor executor, Runnable runnable) { log.info("在事务提交后异步执行"); if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { executor.execute(runnable); } }); } else { executor.execute(runnable); } } /** * 在事务提交后异步执行(默认线程池) * * @param runnable */ public static void asyncAfterCommit(Runnable runnable) { log.info("在事务提交后异步执行(默认线程池)"); if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { threadPoolTaskExecutor.execute(runnable); } }); } else { threadPoolTaskExecutor.execute(runnable); } } }

生产者-生产消息

TransactionUtil.syncAfterCommit(() -> KafkaUtils.sendAsynMsg(KafkaTopicConfig.getStoExportExcelBusiness(), exportRecordVO, null));

消费者-消费消息

/**
 * Kafka listener for business Excel export.
 *
 * NOTE(review): this snippet is incomplete — exportRecordBusinessService,
 * export(...), BusinessExportStatusEnum, etc. are defined elsewhere in the
 * project and not shown here.
 */
@Component
@Slf4j
public class ExportExcelBusinessListener{

    // autoStartup is driven by the `store.auto-startup-kafka` property;
    // the topic name is resolved from the String bean declared in KafkaTopicConfig.
    @KafkaListener(autoStartup = "${store.auto-startup-kafka}", topics = "#{@lst_export_excel_business}")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("【excel业务导出监听】接收:{}", record.toString());
        ExportRecordBusinessPO queryExportRecordBusinessPO = null;
        try {
            ExportRecordBusinessVO exportRecordBusinessVO = JSON.parseObject(record.value(), ExportRecordBusinessVO.class);
            queryExportRecordBusinessPO = exportRecordBusinessService.getById(exportRecordBusinessVO.getId());
            if (queryExportRecordBusinessPO == null) {
                // Record no longer exists: ack so the message is not redelivered.
                log.warn("【excel业务导出监听】记录不存在,忽略消息");
                ack.acknowledge();
                return;
            }
            // Mark the record as "exporting" before doing the work.
            LambdaUpdateWrapper<ExportRecordBusinessPO> updateWrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>()
                    .eq(ExportRecordBusinessPO::getId, queryExportRecordBusinessPO.getId())
                    .set(ExportRecordBusinessPO::getStatus, BusinessExportStatusEnum.EXPORTING.getCode())
                    .set(ExportRecordBusinessPO::getUpdateTime, new Date());
            exportRecordBusinessService.update(null, updateWrapper);
            // Perform the actual export, then persist the result status.
            ExportRecordBusinessDTO exportRecordBusinessDTO = export(exportRecordBusinessVO);
            log.info("【excel业务导出监听】导出文件结果:{}",exportRecordBusinessDTO);
            ExportRecordBusinessPO exportRecordBusinessPO = BeanUtil.toBean(exportRecordBusinessDTO, ExportRecordBusinessPO.class);
            exportRecordBusinessService.updateById(exportRecordBusinessPO);
            // Ack only after the result has been stored (manual_immediate mode).
            ack.acknowledge();
        }catch (BizException bizException) {
            log.error("【excel业务导出监听】业务异常", bizException);
            updateStatusToFailed(queryExportRecordBusinessPO, ack);
        } catch (Exception e) {
            log.error("【excel业务导出监听】系统异常", e);
            updateStatusToFailed(queryExportRecordBusinessPO, ack);
        }
    }

    /**
     * Marks the record as export-failed, then acks. Both failure paths ack the
     * message — failed exports are not retried via redelivery (compensation is
     * handled by the scheduled job described in the article's notes).
     */
    private void updateStatusToFailed(ExportRecordBusinessPO po, Acknowledgment ack) {
        if (po == null || po.getId() == null) {
            ack.acknowledge();
            return;
        }
        LambdaUpdateWrapper<ExportRecordBusinessPO> wrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>()
                .eq(ExportRecordBusinessPO::getId, po.getId())
                .set(ExportRecordBusinessPO::getStatus,
                        BusinessExportStatusEnum.EXPORT_FAILED.getCode())
                .set(ExportRecordBusinessPO::getUpdateTime, new Date());
        exportRecordBusinessService.update(null, wrapper);
        ack.acknowledge();
    }

}

备注:

1.kafka消费者的 max.poll.interval.ms 默认是5分钟:单次 poll 的消息处理超过5分钟后,消费者会被判定离开消费组并触发 rebalance,导致 ack 提交失败

2.如果业务代码执行时间较长:生产者发送kafka前先落库记录消息json;消费时先ack、再异步执行业务,并用定时任务监控长时间停留在"处理中"状态的数据、重新发送kafka执行,以此实现预防与补偿机制

http://www.jsqmd.com/news/471918/

相关文章:

  • 2026年安徽马鞍山小程序制作靠谱的公司推荐 - 工业设备
  • JavaScript-原型链结构图
  • 盘点2026年兰州热门装修公司,楚邦装饰客户评价好不好,价格贵吗? - mypinpai
  • Fish Speech 1.5开源模型价值解析:免费替代ElevenLabs/Polly方案
  • mPLUG-Owl3-2B与Yi-VL对比:轻量级中文多模态模型在图文检索任务中的表现
  • 2026-03-13 npm install -g yarn后不管怎么配置都无法查看yarn版本,即便配置了环境变量==》使用corepack重新安装yarn
  • 总结潜水搅拌机专业服务厂家选购要点,南京维克环保靠谱吗? - 工业品牌热点
  • 2026年贵州手表回收哪家靠谱 优质厂家详解 兼顾专业与便捷适配个人企业 - 深度智识库
  • 云容笔谈效果展示:1024×1024艺术边框装裱人像——东方红颜超清细节实拍级呈现
  • AIGlasses_for_navigation基础教程:盲道分割模型yolo-seg.pt加载与推理优化
  • 服务不错的商铺装修企业怎么收费,哪家比较好? - 工业推荐榜
  • coze-loop快速上手:粘贴即优化,5分钟掌握AI重构与思路解释
  • 避坑指南|西安酒店装修厂家排名,告别报价混乱、保修无保障 - 朴素的承诺
  • Z-Image-Turbo孙珍妮LoRA模型实战:从CSDN文档到真实图片生成的端到端复现
  • 千层架制造厂哪家售后好,如何挑选到满意的? - myqiye
  • AIGlasses_for_navigation惊艳效果:盲道像素级分割+中心线拟合动态轨迹生成
  • LiuJuan20260223Zimage多平台部署:Docker/Kubernetes环境下Xinference集群化实践
  • Nanbeige4.1-3B Chainlit高级功能:多会话标签管理+跨对话上下文引用
  • 解锁数据库极速引擎:索引底层机制、聚簇与非聚簇之争及性能避坑指南
  • translategemma-27b-it环境部署:无需conda/pip,Ollama镜像开箱即用
  • 开源人脸检测模型选型:cv_resnet101_face-detection_cvpr22papermogface在边缘设备可行性分析
  • InstructPix2Pix调参指南:Image Guidance对效果影响
  • OpenClaw等智能体帮助我们梦想落地,拜托机械劳动的困扰
  • Java多线程神器——ThreadForge ,让多线程从此简单
  • Qwen3-4B Instruct-2507实战教程:用temperature=0.3生成稳定技术文档
  • 开源可部署+多场景落地:AnythingtoRealCharacters2511在文化数字化工程中的实践
  • StructBERT-中文-generic-large部署指南:从零开始搭建语义搜索服务
  • GTE文本向量模型实战:智能合同审查系统(条款实体+风险关系抽取)
  • cv_unet_image-colorization企业级部署:Docker容器化上色服务搭建教程
  • SDXL-Turbo多场景落地:游戏原画草稿生成、广告视觉快速迭代实操