当前位置: 首页 > news >正文

kafka 集群部署

Kafka 集群安装说明

Kafka版本:3.8.0

Zookeeper版本:3.8.4

集群服务器:192.168.1.66 192.168.1.162

Kafka集群安装说明

为什么要用集群?

单机服务下,Kafka已经具备了非常高的性能。TPS能够达到百万级别。但是,在实际工作中使用时,单机搭建的Kafka会有很大的局限性。

一方面:消息太多,需要分开保存。Kafka是面向海量消息设计的,一个Topic下的消息会非常多,单机服务很难存得下来。这些消息就需要分成不同的Partition,分布到多个不同的Broker上。这样每个Broker就只需要保存一部分数据。这些分区的个数就称为分区数。

另一方面:服务不稳定,数据容易丢失。单机服务下,如果服务崩溃,数据就丢失了。为了保证数据安全,就需要给每个Partition配置一个或多个备份,保证数据不丢失。Kafka的集群模式下,每个Partition都有一个或多个备份。Kafka会通过一个统一的Zookeeper集群作为选举中心,给每个Partition选举出一个主节点Leader,其他节点就是从节点Follower。主节点负责响应客户端的具体业务请求,并保存消息。而从节点则负责同步主节点的数据。当主节点发生故障时,Kafka会选举出一个从节点成为新的主节点。

Kafka集群由多个Kafka服务节点组成,每个节点称为一个Broker。在Kafka集群中,没有“中心主节点”的概念,集群中的所有节点都是对等的。每个Broker就是一个Kafka服务实例,多个Broker构成一个Kafka集群。生产者发布的消息将保存在Broker中,消费者将从Broker中拉取消息进行消费。

Kafka集群具有以下特点:

高吞吐量、低延迟 :Kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。

横向扩展 :Kafka集群支持热扩展,可以方便地增加或减少节点。

分区与副本 :每个topic可以分多个partition,且每个partition都有多个副本,副本用于实现数据冗余和故障转移。

持久性、可靠性 :消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。

容错性 :允许集群中节点失败,若副本数量为n,则允许n-1个节点失败。

高并发 :支持数千个客户端同时读写。

此外,Kafka还广泛应用于日志收集、消息系统、运营指标等场

Zookeeper集群安装说明

在Kafka集群中,Zookeeper扮演了关键的角色,主要体现在以下几个方面:

  1. 维护集群元数据:Zookeeper负责维护Kafka集群的元数据,这包括broker的状态、topic的分区配置以及consumer group的消费状态等。这些元数据的维护和管理对于Kafka集群的正常运行至关重要。

  2. Broker注册:由于Broker是分布式部署且相互独立的,需要有一个注册系统来管理整个集群中的Broker。Zookeeper就扮演了这个角色。每个Broker在启动时都会在Zookeeper上进行注册,创建属于自己的节点,并将自己的IP地址和端口信息记录到该节点中。这样,Kafka集群就能够通过Zookeeper来追踪和管理所有的Broker。

  3. 选举Leader:在Kafka集群中,每个分区都有一个leader和多个follower。当leader出现故障或不可用时,Zookeeper会进行leader选举,重新选举一个新的leader来处理该分区的数据。这是确保Kafka集群高可用性的关键机制之一。

  4. 集群配置管理:Kafka还通过Zookeeper来管理集群的配置,例如broker的配置信息、topic的配置信息等。这些配置信息对于Kafka集群的运行和性能优化都非常重要。

总的来说,Zookeeper在Kafka集群中扮演了元数据存储、Broker管理、Leader选举以及集群配置管理等关键角色,是确保Kafka集群正常运行和高可用性的重要组成部分。

Kafka+Zookeeper架构图

Zookeeper集群部署

下载地址:Zookeeper:https://zookeeper.apache.org/releases.html

#将apache-zookeeper-3.8.4-bin.tar.gz 上传到两台服务器/data/app/pats_opts目录下解压 Cd /data/app/pats_opts tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz mv apache-zookeeper-3.8.4-bin zookeeper 修改配置文件 Cd zookeeper/conf cp zoo_sample.cfg zoo.cfg mkdir data logs vim zoo.cfg #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 tickTime=2000 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s initLimit=10 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认 为Follwer死掉,并从服务器列表中删除Follwer syncLimit=5 #修改,指定保存Zookeeper中的数据的目录,目录需要单独创建 dataDir=/data/app/pats_opts/zookeeper/conf/data #添加, 指定存放日志的目录,目录需要单独创建 dataLogDir=/data/app/pats_opts/zookeeper/conf/logs #客户端连接端口 clientPort=2181 #添加集群信息 server.1=192.168.1.66:3188:3288 server.2=192.168.1.162:3188:3288 --------------------------------------------------------------- server.A=B:C:D #●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件#myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server #●B是这个服务器的地址 #●c是这个服务器Follower与集群中的Leader服务器交换信息的端口 #●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是#用来执行选举时服务器相互通信的端口 #如果指定节点不参加选举,在末尾加observer server.3=192.168.19.102:3188:3288:observer #在每个节点的dataDir指定的目录下创建一个myid的文件 #.66 echo 1 > /data/app/pats_opts/zookeeper/conf/data/myid #.162 echo 2 > /data/app/pats_opts/zookeeper/conf/data/myid

启动脚本

more server.sh #!/bin/bash #chkconfig:2345 20 90 #description: Zookeeper Service Control Script ZK_HOME='/data/app/pats_opts/zookeeper' case $1 in start) echo "-----zookeeper启动-----" $ZK_HOME/bin/zkServer.sh start ;; stop) echo "----zookeeper停止-------" $ZK_HOME/bin/zkServer.sh stop ;; restart) echo "----zookeeper重启-------" $ZK_HOME/bin/zkServer.sh restart ;; status) echo "-----zookeeper状态------" $ZK_HOME/bin/zkServer.sh status ;; *) echo "Usage: $0 {start|stop|restart|status}" esac #使用命令 ./server.sh start|stop|restart|status

Kafka集群部署

下载地址:Kafka:https://kafka.apache.org/downloads

#1:将kafka_2.12-3.8.0.tgz 上传到两台服务器/data/app/pats_opts目录下解压 cd /data/app/pats_opts/ tar -zxvf kafka_2.12-3.8.0.tgz mv kafka_2.12-3.8.0 kafka cd kafka && mkdir logs #2:修改配置文件 vim server.properties broker.id=0 #此处brokerid 要是唯一的 delete.topic.enable=true #默认没有这个配置也是false listeners=PLAINTEXT://192.168.1.66:9092 num.partitions=1 log.dirs=/data/app/pats_opts/kafka/logs zookeeper.connect=192.168.1.66:2181,192.168.1.162:2181

配置文件参数查考如下截图

kafka启停脚本

more server.sh #!/bin/bash #chkconfig:2345 22 88 #description:Kafka Service Control Script KAFKA_HOME='/data/app/pats_opts/kafka' case $1 in start) echo "---------- Kafka 启动 ------------" ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties ;; stop) echo "---------- Kafka 停止 ------------" ${KAFKA_HOME}/bin/kafka-server-stop.sh ;; restart) $0 stop $0 start ;; status) echo "---------- Kafka 状态 ------------" count=$(ps -ef | grep kafka | egrep -cv "grep|$$") if [ "$count" -eq 0 ];then echo "kafka is not running" else echo "kafka is running" fi ;; *) echo "Usage: $0 {start|stop|restart|status}" esac #启停脚本使用 ./server.sh start|stop|restart|status

Kafdrop 下载安装部署

官方下载地址:https://github.com/obsidiandynamics/kafdrop/releases

kafdrop 下载下来就是一个jar包,可以直接使用,将下载的jar包上传到指定目录下

kafdrop 启停脚本

more server.sh #!/bin/bash source /etc/profile case "$1" in start) nohup java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=192.168.1.66:9092,192.168.1.162:9092 >> nohup.out 2>&1 & echo 'is running ...' sleep 2 KAFDROP="`ps -ef|grep kafdrop-4.0.2.jar|wc -l`" if [ $KAFDROP -eq 2 ]; then echo 'kafdrop-4.0.2.jar start success' else echo 'kafdrop-4.0.2.jar start fail' fi ;; stop) ps -ef|grep kafdrop-4.0.2.jar|grep -v grep|awk '{print $2}'|xargs kill -9 &>/dev/null sleep 2 if [ $? -ne 0 ]; then echo 'kafdrop-4.0.2.jar Stop the abnormality. Maybe the service is not started or there are other reasons' exit 1 else echo 'kafdrop-4.0.2.jar The service has been successfully stopped' fi ;; restart) $0 stop sleep 2 $0 start ;; check) KAFDROP=`ps -ef|grep kafdrop-4.0.2.jar|wc -l` if [ $KAFDROP -eq 2 ]; then echo 'kafdrop-4.0.2.jar service is running ...' else echo 'kafdrop-4.0.2.jar service is not running ...' fi ;; *) echo "Please input the correct parameters:$0 {start|stop|restart|check}" exit 1 esac exit 0 #启停脚本使用 ./server.sh start|stop|restart|check
http://www.jsqmd.com/news/778166/

相关文章:

  • 语言模型在沟通障碍场景下的性能优化实践
  • clawplay:基于Python的剧本化Web自动化与数据抓取框架实战
  • 中层管理者眼中的“A小姐”与“C先生”:绩效考核之外考验管理者的逆向领导力
  • SPG:扩散语言模型的稳定强化学习策略梯度方法
  • 祛痘泥膜哪个牌子好12天深度排浊净肌,告别脸蛋脏闷感 - 全网最美
  • 什么去黑头泥膜好用 7 天搞定顽固性黑头,亲测巨有效 - 全网最美
  • 2026年陕西及西安职高升学首选榜单及本科逆袭路径 - 深度智识库
  • AI辅助CTF解题:提示词工程与安全研究新范式
  • 免费开源矢量图形编辑器 Inkscape 1.4.4 发布:修复众多问题,提升性能还添新功能
  • 隐私计算框架Tensory:加密张量运算与机器学习安全实践
  • LLM增强扩散模型:提升文本到图像生成的语义理解
  • codebase-intel:为AI编程助手注入项目记忆与工程纪律的上下文智能层
  • 2026年上海瑜伽教培机构对比|亚太瑜伽TOP1,评分、价格、推荐率全解析 - 速递信息
  • 使用Taotoken CLI工具一键配置多开发环境下的模型调用参数
  • 2026 AI大会日程倒计时启动:3月锁定名额,6月关闭注册,8月关闭论文投稿(附各大会DDL对照表)
  • 2026年AI训练素材、图片、视频等数据集供应商推荐(附选型对比与避坑指南) - 品牌2025
  • EDA工具演进:从自动化到决策赋能,破解芯片设计生产力悖论
  • 素数筛-试除法 埃氏筛 线性筛
  • HookLaw:用React Hooks范式统一管理JavaScript副作用
  • FPGA与PC高速数据通道:基于FTDI同步FIFO的实战设计
  • 2026年设计师必备:十大电商主图、印刷行业图片与样机素材优质网站推荐 - 品牌2025
  • 2026年5月济南建设工程/股权/知识产权/租赁/合同纠纷处理指南:为何刘迅律师是您的优选专家? - 2026年企业推荐榜
  • Eclair:将Datalog逻辑程序编译为LLVM原生代码的实验性编译器
  • SAFE框架:提升LLM长文本生成质量的关键技术
  • 大语言模型逻辑键结构:原理、分析与优化实践
  • Docker容器化部署SoulseekQt:实现音乐共享服务的无头化与网页访问
  • 2026年GPON OLT厂家推荐:国内主流品牌实力解析,高性价比选型指南 - 速递信息
  • Claude Context:基于MCP与向量数据库的AI编程助手代码库语义搜索方案
  • Cursor设备ID修改脚本解析:原理、风险与合规替代方案
  • 分布式代理节点动作对齐检测与纠正技术解析