
Docker-Based Kafka Service

Contents

Note:

1. Overview

2. Server Planning

3. docker-compose Files

kafka{i}.yaml

kafka-ui.yaml

4. Configuring Cluster Monitoring in kafka-ui

5. Parameter Reference

6. Test Scripts

Producer (async): AsyncKafkaProducer1.py

Consumer (async): AsyncKafkaConsumer1.py

7. References


Note:

When recently reinstalling the bitnami/kafka-based service, I found that the image is no longer open-source and free, so apache/kafka is now used as the container image instead.

A single-node configuration is attached for reference:

```yaml
services:
  kafka-single:
    image: apache/kafka:latest
    container_name: kafka-single
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://kafka-single:9092,CONTROLLER://kafka-single:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-single:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-single:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
```

1. Overview

  • Build the Kafka cluster needed for a local development environment
  • Spread across 3 virtual machines, interconnected as Docker containers

2. Server Planning

| Host | Ports | Notes |
| --- | --- | --- |
| host001.dev.sb | 9092, 9093, 9081 | kafka0 node; 9081 serves kafka-ui access |
| host002.dev.sb | 9092, 9093 | kafka1 node |
| host003.dev.sb | 9092, 9093 | kafka2 node |
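The three planned hosts form the client bootstrap list used later by the test scripts. A trivial sketch (hostnames and the external port 9092 are taken from the plan above):

```python
# Build a client bootstrap.servers string from the planned hosts.
hosts = ["host001.dev.sb", "host002.dev.sb", "host003.dev.sb"]
bootstrap_servers = ",".join(f"{h}:9092" for h in hosts)
print(bootstrap_servers)
```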

3. docker-compose Files

kafka{i}.yaml

- where {i} is one of 0, 1, 2

- usernames and passwords are configured directly in the file
```yaml
services:
  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka{i}
    hostname: kafka{i}
    restart: always
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID={i}
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka0:9094,CLIENT://:9095,EXTERNAL://kafka0:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # Log
      - KAFKA_CFG_LOG_RETENTION_HOURS=72
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CONTROLLER_USER=kfkuser
      - KAFKA_CONTROLLER_PASSWORD=youknow
      - KAFKA_INTER_BROKER_USER=kfkuser
      - KAFKA_INTER_BROKER_PASSWORD=youknow
      - KAFKA_CLIENT_USERS=kfkuser
      - KAFKA_CLIENT_PASSWORDS=youknow
      # Others
      - TZ=Asia/Shanghai
    volumes:
      - '/data0/Server/Db/kafka0:/bitnami/kafka'
    extra_hosts:
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"
```
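For concreteness, here is how the {i} placeholders would presumably expand for node 1 (kafka1.yaml); only the keys that differ per node are shown, the advertised listeners and data directory tracking the node name (an assumption, since the file above shows the kafka0 values):

```yaml
services:
  kafka:
    container_name: kafka1
    hostname: kafka1
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka1:9094,CLIENT://:9095,EXTERNAL://kafka1:9092
    volumes:
      - '/data0/Server/Db/kafka1:/bitnami/kafka'
```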
kafka-ui.yaml
```yaml
services:
  kafka-ui:
    image: 'provectuslabs/kafka-ui:master'
    container_name: kafka-ui
    restart: always
    ports:
      - 9081:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=youknow
    extra_hosts:
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"
```

4. Configuring Cluster Monitoring in kafka-ui
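With DYNAMIC_CONFIG_ENABLED=true, the cluster can be added interactively through the UI's configuration wizard after logging in. Alternatively, the cluster can be declared statically in kafka-ui's compose file via its `KAFKA_CLUSTERS_0_*` environment-variable convention; a sketch, with the SASL values mirroring the broker configuration above:

```yaml
environment:
  - KAFKA_CLUSTERS_0_NAME=local
  - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka0:9092,kafka1:9092,kafka2:9092
  - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
  - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
  - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="kfkuser" password="youknow";
```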

5. Parameter Reference

| Parameter | Description |
| --- | --- |
| KAFKA_CFG_PROCESS_ROLES | Kafka roles (broker, controller), e.g. `KAFKA_CFG_PROCESS_ROLES=controller,broker` |
| KAFKA_KRAFT_CLUSTER_ID | Cluster ID; must be identical for all nodes in the same cluster |
| KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | Controller quorum voter list |
| KAFKA_CFG_CONTROLLER_LISTENER_NAMES | Controller listener name |
| KAFKA_CFG_NUM_PARTITIONS | Default number of partitions |
| KAFKA_CFG_LISTENERS | Listener addresses and ports |
| KAFKA_CFG_ADVERTISED_LISTENERS | Advertised listener addresses and ports |
| KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | Security protocol per listener; SASL_PLAINTEXT here means authentication only, the transport itself is not encrypted |
| KAFKA_CLIENT_USERS | SASL client username(s) |
| KAFKA_CLIENT_PASSWORDS | SASL client password(s) |
| *Clustering* | |
| KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Replication factor of Kafka's internal `__consumer_offsets` topic, which stores consumer offsets |
| KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | Replication factor of the internal `__transaction_state` topic, which stores the transaction log |
| KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | Minimum number of in-sync replicas (ISR) for `__transaction_state`; the ISR is the set of replicas kept in sync with the leader |
| *Log* | |
| KAFKA_CFG_LOG_DIRS | Log (data) directories |
| KAFKA_CFG_LOG_RETENTION_HOURS | Maximum retention time; older data is handled according to `log.cleanup.policy`; defaults to 168 hours (one week) |
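On the client side, the SASL_PLAINTEXT settings above translate into a small, fixed set of confluent-kafka properties. A minimal sketch (`client_config` is a hypothetical helper, not part of any library):

```python
def client_config(bootstrap, username, password):
    """Build a confluent-kafka client config matching the broker's
    SASL_PLAINTEXT + PLAIN listener settings above."""
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_PLAINTEXT",  # authentication only, no TLS
        "sasl.mechanisms": "PLAIN",
        "sasl.username": username,
        "sasl.password": password,
    }
```

The same dictionary can be passed to both `Producer()` and `Consumer()` (the consumer additionally needs `group.id`), which is exactly how the test scripts below are configured.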

6. Test Scripts

Producer (async): AsyncKafkaProducer1.py
```python
from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)
        # Asynchronously produce a message; the delivery report callback
        # will be triggered from poll() above, or flush() below, when the
        # message has been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "client.id": "PythonProducer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    produce_messages(async_producer, "zx001.msg.user", messages_to_send)
```
Consumer (async): AsyncKafkaConsumer1.py
```python
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime

# Log format: the '%(...)' placeholders are logging parameters
log_format = "%(message)s"
logging.basicConfig(
    filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)


async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)
        while True:
            # Poll for messages
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                # print(f"Raw message: {raw_message}")
                str_msg = raw_message.decode("utf-8")
                parsed_message = json.loads(str_msg)
                parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
                json_data = json.dumps(parsed_message, ensure_ascii=False)
                logging.info("{}".format(json_data))
            await asyncio.sleep(0.01)  # Sleep briefly to yield control
    finally:
        # Close the consumer
        consumer.close()


async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "group.id": "MsgGroup2",
        "auto.offset.reset": "earliest",
        "client.id": "PythonConsumer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }
    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["zx001.msg.user"])


if __name__ == "__main__":
    asyncio.run(consume())
```
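The consumer appends one JSON object per line to logs/kafka_messages1.log. A small stdlib-only sketch for reading such a log back (`read_messages` is a hypothetical helper, not part of the scripts above):

```python
import json


def read_messages(path):
    """Parse the JSON-per-line log produced by AsyncKafkaConsumer1.py."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```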

7. References

- https://hub.docker.com/r/apache/kafka

- Apache Kafka® Quick Start - Local Install With Docker

- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub

- https://juejin.cn/post/7187301063832109112

