kafka-在centos7上集群部署
目录
1.环境准备
1.1环境概述
1.2配置hosts(kafka01、kafka02、kafka03)
1.3关闭防火墙(kafka01、kafka02、kafka03)
1.4设置时钟同步(kafka01、kafka02、kafka03)
1.5配置免密登录(kafka01)
1.6安装JDK(kafka01、kafka02、kafka03)
2.zookeeper集群环境部署
2.1下载zookeeper(kafka01、kafka02、kafka03)
2.2解压zookeeper(kafka01、kafka02、kafka03)
2.3配置环境变量(kafka01、kafka02、kafka03)
2.4修改配置(kafka01、kafka02、kafka03)
2.5标识节点(kafka01、kafka02、kafka03)
2.6启动集群(kafka01、kafka02、kafka03)
3.kafka集群环境部署
3.1下载kafka(kafka01、kafka02、kafka03)
3.2解压kafka(kafka01、kafka02、kafka03)
3.3配置环境变量(kafka01、kafka02、kafka03)
3.4修改配置(kafka01、kafka02、kafka03)
3.5启动集群(kafka01、kafka02、kafka03)
3.6验证群集(kafka01)
3.6.1创建主题
3.6.2查看主题
3.6.3删除主题
3.6.4生产消息
3.6.5消费消息
1.环境准备
1.1环境概述
序号 | 主机名称 | IP地址 | 角色 | 配置 |
1 | kafka01 | 10.9.254.41 | zookeeper01、kafka01 | 4C4G60G |
2 | kafka02 | 10.9.254.42 | zookeeper02、kafka02 | 4C4G60G |
3 | kafka03 | 10.9.254.43 | zookeeper03、kafka03 | 4C4G60G |
版本信息
序号 | 信息 | 备注 |
1 | 系统版本 | CentOS 7.6 |
2 | JDK版本 | 8u201 |
3 | zookeeper版本 | 3.7.0 |
4 | kafka版本 | 2.8.0 |
1.2配置hosts(kafka01、kafka02、kafka03)
cat >/etc/hosts
10.9.254.41 kafka01 zookeeper01
10.9.254.42 kafka02 zookeeper02
10.9.254.43 kafka03 zookeeper03
EOF
[root@localhost ~]# hostnamectl set-hostname kafka01 [root@localhost ~]# su - root [root@kafka01 ~]# cat >/etc/hosts > 10.9.254.41 kafka01 zookeeper01 > 10.9.254.42 kafka02 zookeeper02 > 10.9.254.43 kafka03 zookeeper03 > EOF [root@kafka01 ~]# cat /etc/hosts |
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 10.9.254.41 kafka01 zookeeper01 10.9.254.42 kafka02 zookeeper02 10.9.254.43 kafka03 zookeeper03 |
1.3关闭防火墙(kafka01、kafka02、kafka03)
[root@kafka01 ~]# systemctl disable --now firewalld [root@kafka01 ~]# setenforce 0 |
setenforce: SELinux is disabled |
[root@kafka01 ~]# sed -i 's#SELINUX=enforcing#SELINUX=disabled#g' /etc/sysconfig/selinux [root@kafka01 ~]# sed -i 's#SELINUX=enforcing#SELINUX=disabled#g' /etc/selinux/config [root@kafka01 ~]# cat /etc/selinux/config |
# This file controls the state of SELinux on the system. # SELINUX= can take one of these three values: # enforcing - SELinux security policy is enforced. # permissive - SELinux prints warnings instead of enforcing. # disabled - No SELinux policy is loaded. SELINUX=disabled # SELINUXTYPE= can take one of three values: # targeted - Targeted processes are protected, # minimum - Modification of targeted policy. Only selected processes are protected. # mls - Multi Level Security protection. SELINUXTYPE=targeted |
1.4设置时钟同步(kafka01、kafka02、kafka03)
[root@kafka01 ~]# dnf -y install chrony [root@kafka01 ~]# ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime [root@kafka01 ~]# echo 'Asia/Shanghai' >/etc/timezone [root@kafka01 ~]# systemctl start chronyd [root@kafka01 ~]# systemctl enable chronyd [root@kafka01 ~]# vi /etc/chrony.conf |
#pool 2.pool.ntp.org iburst server ntp.aliyun.com iburst server cn.ntp.org.cn iburst |
[root@kafka01 ~]# systemctl restart chronyd [root@kafka01 ~]# chronyc sources -v |
.-- Source mode '^' = server, '=' = peer, '#' = local clock. / .- Source state '*' = current best, '+' = combined, '-' = not combined, | / 'x' = may be in error, '~' = too variable, '?' = unusable. || .- xxxx [ yyyy ] +/- zzzz || Reachability register (octal) -. | xxxx = adjusted offset, || Log2(Polling interval) --. | | yyyy = measured offset, || \ | | zzzz = estimated error. || | | \ MS Name/IP address Stratum Poll Reach LastRx Last sample =============================================================================== ^* 203.107.6.88 2 6 17 3 +1563us[+5585us] +/- 23ms ^? 36.154.179.82 0 7 0 - +0ns[ +0ns] +/- 0ns |
1.5配置免密登录(kafka01)
[root@kafka01 ~]# yum -y install openssh-clients [root@kafka01 ~]# ssh-keygen -t rsa |
Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:jxd+TBGMR+DsLZA0LAupLJF2MD0i7x+8ac9qGlyY0SY root@kafka01 The key's randomart image is: +---[RSA 2048]----+ | +o . .o .=o | |+o.=o ...=. o. | |o+E.+. oo o.. | |. +* . o . . | | oo.. S + o | | ...o + = | | o. + . + o | | .*. . . | | .+.oo | +----[SHA256]-----+ |
[root@kafka01 ~]# ssh-copy-id -p 43215 -i .ssh/id_rsa.pub kafka01 [root@kafka01 ~]# ssh-copy-id -p 43215 -i .ssh/id_rsa.pub kafka02 [root@kafka01 ~]# ssh-copy-id -p 43215 -i .ssh/id_rsa.pub kafka03 |
1.6安装JDK(kafka01、kafka02、kafka03)
[root@kafka01 ~]# rpm -qa | grep java [root@kafka01 ~]# ll /usr/local/src/ |
total 172080 -rw-r--r-- 1 root root 176209195 Aug 11 22:01 jdk-8u201-linux-x64.rpm |
[root@kafka01 ~]# rpm -ivh /usr/local/src/jdk-8u201-linux-x64.rpm |
warning: /usr/local/src/jdk-8u201-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY Preparing... ################################# [100%] Updating / installing... 1:jdk1.8-2000:1.8.0_201-fcs ################################# [100%] Unpacking JAR files... tools.jar... plugin.jar... javaws.jar... deploy.jar... rt.jar... jsse.jar... charsets.jar... localedata.jar... |
[root@kafka01 ~]# java -version |
java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode) |
[root@kafka01 ~]# vim /etc/profile |
JAVA_HOME=/usr/java/jdk1.8.0_201-amd64 JRE_HOME=/usr/java/jdk1.8.0_201-amd64/jre PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib export JAVA_HOME JRE_HOME PATH CLASSPATH |
[root@kafka01 ~]# source /etc/profile [root@kafka01 ~]# echo $PATH |
/usr/local/sbin:/usr/bin:/bin:/usr/sbin:/sbin:/root/bin:/usr/java/jdk1.8.0_201-amd64/bin:/usr/java/jdk1.8.0_201-amd64/jre/bin |
2.zookeeper集群环境部署
2.1下载zookeeper(kafka01、kafka02、kafka03)
https://archive.apache.org/dist/zookeeper/
https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
[root@kafka01 ~]# wget -P /usr/local/src/ https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz [root@kafka01 ~]# ll /usr/local/src/ |
total 184180 -rw-r--r-- 1 root root 12387614 Mar 27 18:12 apache-zookeeper-3.7.0-bin.tar.gz -rw-r--r-- 1 root root 176209195 Aug 11 22:01 jdk-8u201-linux-x64.rpm |
2.2解压zookeeper(kafka01、kafka02、kafka03)
[root@kafka01 ~]# mkdir -p /opt/software [root@kafka01 ~]# tar -zxvf /usr/local/src/apache-zookeeper-3.7.0-bin.tar.gz -C /opt/software/ [root@kafka01 ~]# mv /opt/software/apache-zookeeper-3.7.0-bin/ /opt/software/zookeeper-3.7.0 [root@kafka01 ~]# ll /opt/software/zookeeper-3.7.0/ |
total 36 drwxr-xr-x 2 kaifa kaifa 4096 Mar 17 17:45 bin drwxr-xr-x 2 kaifa kaifa 77 Mar 17 17:45 conf drwxr-xr-x 5 kaifa kaifa 4096 Mar 17 17:45 docs drwxr-xr-x 2 root root 4096 Aug 11 22:31 lib -rw-r--r-- 1 kaifa kaifa 11358 Mar 17 17:45 LICENSE.txt -rw-r--r-- 1 kaifa kaifa 432 Mar 17 17:45 NOTICE.txt -rw-r--r-- 1 kaifa kaifa 2214 Mar 17 17:45 README.md -rw-r--r-- 1 kaifa kaifa 3570 Mar 17 17:45 README_packaging.md |
2.3配置环境变量(kafka01、kafka02、kafka03)
[root@kafka01 ~]# vim /etc/profile |
export ZOOKEEPER_HOME=/opt/software/zookeeper-3.7.0 export PATH=${ZOOKEEPER_HOME}/bin:$PATH |
[root@kafka01 ~]# source /etc/profile [root@kafka01 ~]# echo $ZOOKEEPER_HOME |
/opt/software/zookeeper-3.7.0 |
2.4修改配置(kafka01、kafka02、kafka03)
[root@kafka01 ~]# cd /opt/software/zookeeper-3.7.0/conf/ [root@kafka01 conf]# cp zoo_sample.cfg zoo.cfg [root@kafka01 conf]# vim zoo.cfg |
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/software/zookeeper-3.7.0/zoo_data dataLogDir=/opt/software/zookeeper-3.7.0/zoo_logs clientPort=2181 server.1=kafka01:2888:3888 server.2=kafka02:2888:3888 server.3=kafka03:2888:3888 |
[root@kafka01 conf]# mkdir /opt/software/zookeeper-3.7.0/zoo_data [root@kafka01 conf]# mkdir /opt/software/zookeeper-3.7.0/zoo_logs |
2.5标识节点(kafka01、kafka02、kafka03)
[root@kafka01 conf]# echo "1" > /opt/software/zookeeper-3.7.0/zoo_data/myid [root@kafka01 conf]# cat /opt/software/zookeeper-3.7.0/zoo_data/myid |
1 |
[root@kafka02 conf]# echo "2" > /opt/software/zookeeper-3.7.0/zoo_data/myid [root@kafka02 conf]# cat /opt/software/zookeeper-3.7.0/zoo_data/myid |
2 |
[root@kafka03 conf]# echo "3" > /opt/software/zookeeper-3.7.0/zoo_data/myid [root@kafka03 conf]# cat /opt/software/zookeeper-3.7.0/zoo_data/myid |
3 |
2.6启动集群(kafka01、kafka02、kafka03)
[root@kafka01 ~]# zkServer.sh start |
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Starting zookeeper ... STARTED |
[root@kafka01 ~]# jps |
27591 Jps 27326 QuorumPeerMain |
[root@kafka01 ~]# zkServer.sh status |
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower |
[root@kafka01 ~]# zkServer.sh stop |
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED |
编写群起脚本(kafka01)
[root@kafka01 ~]# vim /opt/software/zookeeper-3.7.0/bin/zookeeper |
#!/bin/bash red='\e[91m' green='\e[92m' yellow='\e[93m' magenta='\e[95m' cyan='\e[96m' none='\e[0m' zk_home=$ZOOKEEPER_HOME cluster_array=(kafka01 kafka02 kafka03) # 1. 获取输入参数个数,如果没有参数,直接退出 if (($# == 0)); then echo -e ------------------- ${yellow}no args${none} ------------------- exit fi # 2. 根据指令循环操作集群 case $1 in "start") { for host in ${cluster_array[@]}; do echo -e ------------------- $yellow$host$none ------------------- ssh -p 43215 $host "$zk_home/bin/zkServer.sh start" done } ;; "stop") { for host in ${cluster_array[@]}; do echo -e ------------------- $yellow$host$none ------------------- ssh -p 43215 $host "$zk_home/bin/zkServer.sh stop" done } ;; "status") { for host in ${cluster_array[@]}; do echo -e ------------------- $yellow$host$none ------------------- ssh -p 43215 $host "$zk_home/bin/zkServer.sh status" done } ;; esac |
[root@kafka01 ~]# chmod 755 /opt/software/zookeeper-3.7.0/bin/zookeeper [root@kafka01 ~]# zookeeper start |
------------------- kafka01 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Starting zookeeper ... STARTED ------------------- kafka02 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Starting zookeeper ... STARTED ------------------- kafka03 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Starting zookeeper ... STARTED |
[root@kafka01 ~]# zookeeper status |
------------------- kafka01 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower ------------------- kafka02 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: leader ------------------- kafka03 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower |
[root@kafka01 ~]# zookeeper stop |
------------------- kafka01 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED ------------------- kafka02 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED ------------------- kafka03 ------------------- /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper-3.7.0/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED |
3.kafka集群环境部署
3.1下载kafka(kafka01、kafka02、kafka03)
http://kafka.apache.org/downloads
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
[root@kafka01 ~]# wget -P /usr/local/src/ https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz [root@kafka01 ~]# ll /usr/local/src |
total 253912 -rw-r--r-- 1 root root 12387614 Mar 27 18:12 apache-zookeeper-3.7.0-bin.tar.gz -rw-r--r-- 1 root root 176209195 Aug 11 22:01 jdk-8u201-linux-x64.rpm -rw-r--r-- 1 root root 71403603 Apr 19 12:44 kafka_2.13-2.8.0.tgz |
3.2解压kafka(kafka01、kafka02、kafka03)
[root@kafka01 ~]# tar -zxvf /usr/local/src/kafka_2.13-2.8.0.tgz -C /opt/software/ [root@kafka01 ~]# mv /opt/software/kafka_2.13-2.8.0/ /opt/software/kafka-2.8.0 [root@kafka01 ~]# ll /opt/software/kafka-2.8.0/ |
total 40 drwxr-xr-x 3 root root 4096 Apr 14 22:34 bin drwxr-xr-x 3 root root 4096 Apr 14 22:34 config drwxr-xr-x 2 root root 8192 Aug 11 23:37 libs -rw-r--r-- 1 root root 14515 Apr 14 22:28 LICENSE drwxr-xr-x 2 root root 236 Apr 14 22:34 licenses -rw-r--r-- 1 root root 953 Apr 14 22:28 NOTICE drwxr-xr-x 2 root root 44 Apr 14 22:34 site-docs |
3.3配置环境变量(kafka01、kafka02、kafka03)
[root@kafka01 ~]# vim /etc/profile |
export KAFKA_HOME=/opt/software/kafka-2.8.0 export PATH=${KAFKA_HOME}/bin:$PATH |
[root@kafka01 ~]# source /etc/profile [root@kafka01 ~]# echo $KAFKA_HOME |
/opt/software/kafka-2.8.0 |
3.4修改配置(kafka01、kafka02、kafka03)
[root@kafka01 ~]# mkdir /opt/software/kafka-2.8.0/kafka-logs [root@kafka01 ~]# vim /opt/software/kafka-2.8.0/config/server.properties |
broker.id=1 log.dirs=/opt/software/kafka-2.8.0/kafka-logs zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181 |
[root@kafka02 ~]# vim /opt/software/kafka-2.8.0/config/server.properties |
broker.id=2 log.dirs=/opt/software/kafka-2.8.0/kafka-logs zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181 |
[root@kafka03 ~]# vim /opt/software/kafka-2.8.0/config/server.properties |
broker.id=3 log.dirs=/opt/software/kafka-2.8.0/kafka-logs zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181 |
[root@kafka01 ~]# vim /opt/software/kafka-2.8.0/config/producer.properties |
bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092 |
[root@kafka01 ~]# vim /opt/software/kafka-2.8.0/config/consumer.properties |
bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092 |
3.5启动集群(kafka01、kafka02、kafka03)
[root@kafka01 ~]# kafka-server-start.sh -daemon /opt/software/kafka-2.8.0/config/server.properties |
[root@kafka01 ~]# kafka-server-stop.sh |
编写群起脚本
[root@kafka01 ~]# vim /opt/software/kafka-2.8.0/bin/kafka |
#! /bin/bash case $1 in "start"){ for i in kafka01 kafka02 kafka03 do echo " --------启动 $i Kafka-------" ssh -p 43215 $i "/opt/software/kafka-2.8.0/bin/kafka-server-start.sh -daemon /opt/software/kafka-2.8.0/config/server.properties " done };; "stop"){ for i in kafka01 kafka02 kafka03 do echo " --------停止 $i Kafka-------" ssh -p 43215 $i "/opt/software/kafka-2.8.0/bin/kafka-server-stop.sh stop" done };; esac |
[root@kafka01 ~]# chmod 755 /opt/software/kafka-2.8.0/bin/kafka [root@kafka01 ~]# kafka stop |
--------停止 kafka01 Kafka------- No kafka server to stop --------停止 kafka02 Kafka------- No kafka server to stop --------停止 kafka03 Kafka------- No kafka server to stop |
[root@kafka01 ~]# kafka start |
--------启动 kafka01 Kafka------- --------启动 kafka02 Kafka------- --------启动 kafka03 Kafka------- |
[root@kafka01 ~]# jps |
31122 Kafka 31277 Jps 27326 QuorumPeerMain |
3.6验证群集(kafka01)
3.6.1创建主题
创建一个副本数为 1、分区数为 3、名为 test 的主题
[root@kafka01 ~]# kafka-topics.sh --zookeeper kafka01:2181 --create --replication-factor 1 --partitions 3 --topic test |
Created topic test. |
3.6.2查看主题
列出所有主题
[root@kafka01 ~]# kafka-topics.sh --zookeeper kafka01:2181 --list |
test |
查看某个主题的详情
[root@kafka01 ~]# kafka-topics.sh --zookeeper kafka01:2181 --describe --topic test |
Topic: test TopicId: 7_2bA04STq2UGGFMX30RQQ PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: test Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: test Partition: 2 Leader: 2 Replicas: 2 Isr: 2 |
[root@kafka01 ~]# ll /opt/software/kafka-2.8.0/kafka-logs/ |
total 16 -rw-r--r-- 1 root root 0 Aug 12 00:01 cleaner-offset-checkpoint -rw-r--r-- 1 root root 4 Aug 12 00:21 log-start-offset-checkpoint -rw-r--r-- 1 root root 88 Aug 12 00:12 meta.properties -rw-r--r-- 1 root root 13 Aug 12 00:21 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 13 Aug 12 00:21 replication-offset-checkpoint drwxr-xr-x 2 root root 141 Aug 12 00:18 test-1 |
[root@kafka02 ~]# ll /opt/software/kafka-2.8.0/kafka-logs/ |
total 16 -rw-r--r-- 1 root root 0 Aug 12 00:12 cleaner-offset-checkpoint -rw-r--r-- 1 root root 4 Aug 12 00:22 log-start-offset-checkpoint -rw-r--r-- 1 root root 88 Aug 12 00:12 meta.properties -rw-r--r-- 1 root root 13 Aug 12 00:22 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 13 Aug 12 00:22 replication-offset-checkpoint drwxr-xr-x 2 root root 141 Aug 12 00:18 test-2 |
[root@kafka03 ~]# ll /opt/software/kafka-2.8.0/kafka-logs/ |
total 16 -rw-r--r-- 1 root root 0 Aug 12 00:12 cleaner-offset-checkpoint -rw-r--r-- 1 root root 4 Aug 12 00:22 log-start-offset-checkpoint -rw-r--r-- 1 root root 88 Aug 12 00:12 meta.properties -rw-r--r-- 1 root root 13 Aug 12 00:22 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 13 Aug 12 00:23 replication-offset-checkpoint drwxr-xr-x 2 root root 141 Aug 12 00:18 test-0 |
3.6.3删除主题
[root@kafka01 ~]# kafka-topics.sh --zookeeper kafka01:2181 --delete --topic test |
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. |
3.6.4生产消息
在kafka01中生产消息
[root@kafka01 ~]# kafka-console-producer.sh --broker-list kafka01:9092 --topic test |
>hello >world > |
3.6.5消费消息
在kafka02中消费消息
[root@kafka02 ~]# kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test |
hello world ^CProcessed a total of 2 messages |
