Setting Up a Kafka Cluster with JDK + ZooKeeper + Kafka
Environment preparation
Environment packages:
jdk-8u341-linux-x64.tar.gz
kafka_2.12-2.2.0.tgz
zookeeper-3.4.14.tar.gz
| server-id | ip | role |
|---|---|---|
| server1 | 10.206.120.10 | leader |
| server2 | 10.206.120.2 | follower |
| server3 | 10.206.120.3 | follower |
1. Install the JDK
Kafka needs a Java runtime, so set up the JDK first; if a Java environment is already configured, skip this step.
```
[root@VM-120-2-centos ~]# tar -xvf jdk-8u341-linux-x64.tar.gz
[root@VM-120-2-centos ~]# mv jdk1.8.0_341/ /usr/local/
# Append the following lines to the end of the file
[root@VM-120-2-centos ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_341
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib
[root@VM-120-2-centos ~]# source /etc/profile
[root@VM-120-2-centos ~]# java -version
java version "1.8.0_341"
Java(TM) SE Runtime Environment (build 1.8.0_341-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.341-b10, mixed mode)
```
The JDK environment is now configured.
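A common slip is editing `/etc/profile` but forgetting that new shells only pick it up after `source` or re-login. The sanity check below is only a sketch; it assumes the JDK path used above:

```shell
# Check that JAVA_HOME points at a real JDK and that java is reachable on PATH
JAVA_HOME="${JAVA_HOME:-/usr/local/jdk1.8.0_341}"
if [ -x "$JAVA_HOME/bin/java" ]; then
    echo "JAVA_HOME looks good: $JAVA_HOME"
else
    echo "no java binary under $JAVA_HOME - re-check /etc/profile" >&2
fi
resolved="$(command -v java || true)"
echo "java on PATH: ${resolved:-none}"
```

Run this on each of the three servers before moving on.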
2. Install the ZooKeeper cluster
Perform these steps on all three servers; only the `myid` value differs per host.
```
[root@VM-120-2-centos ~]# cd
[root@VM-120-2-centos ~]# tar -xvf zookeeper-3.4.14.tar.gz
[root@VM-120-2-centos ~]# mv zookeeper-3.4.14 /usr/local/zookeeper
[root@VM-120-2-centos ~]# cd /usr/local/zookeeper/conf/
[root@VM-120-2-centos conf]# cp zoo_sample.cfg zoo.cfg
[root@VM-120-2-centos conf]# vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataLogDir=/usr/local/zookeeper/logs
dataDir=/usr/local/zookeeper/data
clientPort=2181
autopurge.snapRetainCount=500
autopurge.purgeInterval=24
server.1=10.206.120.10:2888:3888
server.2=10.206.120.2:2888:3888
server.3=10.206.120.3:2888:3888
[root@VM-120-2-centos ~]# mkdir -p /usr/local/zookeeper/data /usr/local/zookeeper/logs
# On 10.206.120.10 run:
[root@VM-120-2-centos ~]# echo "1" > /usr/local/zookeeper/data/myid
# On 10.206.120.2 run:
[root@VM-120-2-centos ~]# echo "2" > /usr/local/zookeeper/data/myid
# On 10.206.120.3 run:
[root@VM-120-2-centos ~]# echo "3" > /usr/local/zookeeper/data/myid
[root@VM-120-2-centos ~]# cd ../bin/
[root@VM-120-2-centos bin]# ./zkServer.sh start
[root@VM-120-2-centos bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
# Check that the ports started up normally
[root@VM-120-2-centos bin]# netstat -ntlp
```
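To check quorum health from any machine, ZooKeeper's four-letter-word commands can be queried over the client port. This is a sketch that assumes `nc` (netcat) is installed and uses the three IPs from the table above; expect one `leader` and two `follower` responses:

```shell
# zk_mode <host> - print the node's role via the "srvr" four-letter word
zk_mode() {
    echo srvr | nc -w 2 "$1" 2181 | sed -n 's/^Mode: //p'
}

for host in 10.206.120.10 10.206.120.2 10.206.120.3; do
    echo "$host -> $(zk_mode "$host" || echo 'no response')"
done
```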
3. Install the Kafka cluster
```
[root@VM-120-2-centos ~]# cd
[root@VM-120-2-centos ~]# tar -xvf kafka_2.12-2.2.0.tgz
[root@VM-120-2-centos ~]# mv kafka_2.12-2.2.0 /usr/local/kafka
[root@VM-120-2-centos ~]# cd /usr/local/kafka/config/
[root@VM-120-2-centos config]# cp server.properties server.properties.bak
# On 10.206.120.2, write the following into the file
[root@VM-120-2-centos config]# vim server.properties
broker.id=2
listeners=PLAINTEXT://10.206.120.2:9092
advertised.listeners=PLAINTEXT://43.137.8.225:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=204857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=1680
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=10
max.poll.interval.ms=800000
max.poll.records=50
```
The other two servers use the same file; only three lines differ:
```
# On 10.206.120.10:
broker.id=1
listeners=PLAINTEXT://10.206.120.10:9092
advertised.listeners=PLAINTEXT://118.195.137.101:9092
# On 10.206.120.3:
broker.id=3
listeners=PLAINTEXT://10.206.120.3:9092
advertised.listeners=PLAINTEXT://175.27.146.204:9092
```
Then start the broker on each server:
```
[root@VM-120-2-centos config]# cd ../bin/
[root@VM-120-2-centos bin]# nohup ./kafka-server-start.sh ../config/server.properties &
# Check whether port 9092 is listening; if so, the broker started normally
[root@VM-120-10-centos bin]# netstat -ntlp
```
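Beyond the port check, you can confirm that all three brokers registered themselves in ZooKeeper. This is a sketch using the `zkCli.sh` shipped with ZooKeeper; brokers create ephemeral znodes under the standard `/brokers/ids` path, so with the `broker.id` values above the expected output is `[1, 2, 3]`:

```shell
# List the broker ids currently registered in ZooKeeper
/usr/local/zookeeper/bin/zkCli.sh -server 10.206.120.10:2181 ls /brokers/ids
```

If a broker id is missing from the list, check that broker's `nohup.out` for startup errors.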
All of the commands below are run from Kafka's bin directory (here /usr/local/kafka/bin). Note that the console producer and consumer talk to the brokers on port 9092, while the `--zookeeper` forms of kafka-topics.sh use the ZooKeeper port 2181.
```
# Create a topic
./kafka-topics.sh --create --bootstrap-server 10.206.120.10:9092,10.206.120.2:9092,10.206.120.3:9092 --replication-factor 3 --partitions 3 --topic test
# Start a console producer
./kafka-console-producer.sh --broker-list 10.206.120.10:9092,10.206.120.2:9092,10.206.120.3:9092 --topic test
# Start a console consumer
./kafka-console-consumer.sh --bootstrap-server 10.206.120.10:9092,10.206.120.2:9092,10.206.120.3:9092 --topic test --from-beginning
# List all topics
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --list
# Delete a topic
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --delete --topic test
# Describe a topic in detail
./kafka-topics.sh --zookeeper 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --describe --topic test
```
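Producing in one terminal and consuming in another is fiddly for a quick check. The commands above can be combined into a one-shot round-trip smoke test; this is a sketch, using the `--max-messages` and `--timeout-ms` options of the Kafka 2.2 console consumer:

```shell
# Send one uniquely-tagged message, then read one message back and stop
msg="smoke-$(date +%s)"
echo "$msg" | ./kafka-console-producer.sh \
    --broker-list 10.206.120.10:9092,10.206.120.2:9092,10.206.120.3:9092 --topic test
./kafka-console-consumer.sh \
    --bootstrap-server 10.206.120.10:9092,10.206.120.2:9092,10.206.120.3:9092 \
    --topic test --from-beginning --max-messages 1 --timeout-ms 10000
```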


The cluster is now up and ready for use.
4. Testing
Testing over the public network
# 1. Upload the test package and extract it, as shown in Figure 1
# 2. Edit application.yml, changing the IPs to the cluster hosts' public IPs
```yaml
spring:
  application:
    name: kafka-tester
  kafka:
    bootstrap-servers:
      - 10.206.120.3:9092
      - 10.206.120.2:9092
      - 10.206.120.10:9092
    producer:
      topic: test
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      topic: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
```
# 3. Start the Kafka test package in the background
nohup ./test.sh &
# 4. Start a consumer and check what the jar has produced (Figure 2):
#    kafka-test.jar started normally and the Kafka cluster is working
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 175.27.146.204:9092,118.195.137.101:9092,43.137.8.225:9092 --topic test --from-beginning
# 5. Note that __consumer_offsets has only one replica (Figure 3)
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe --topic __consumer_offsets
```



Shut down one of the followers in the cluster to simulate a server outage.
The message queue then failed: the Kafka cluster became unavailable.

Only after the server was powered back on and ZooKeeper and Kafka were restarted did the cluster return to normal.

So with the __consumer_offsets replica count set to 1, a single server outage leaves the Kafka cluster unusable until that server, its ZooKeeper process, and its Kafka broker are all brought back.
Solution
1. Raise the __consumer_offsets replica count to 3
Edit Kafka's main configuration file, server.properties:
change num.partitions (default 1) to 3,
set offsets.topic.replication.factor=3 (default 1),
and additionally add auto.create.topics.enable=true.
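As an alternative to deleting the topic from ZooKeeper, the replica list can also be grown in place with the stock kafka-reassign-partitions.sh tool, avoiding the loss of stored offsets. This is only a sketch: __consumer_offsets has 50 partitions by default, so a real reassignment file needs one entry per partition; the broker ids [1,2,3] match the broker.id values configured earlier.

```shell
# increase-rf.json: pin partition 0 of __consumer_offsets to all three brokers
cat > increase-rf.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}
]}
EOF
# Apply the reassignment (run from Kafka's bin directory)
./kafka-reassign-partitions.sh --zookeeper 10.206.120.10:2181 \
    --reassignment-json-file increase-rf.json --execute
```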

Because __consumer_offsets is a built-in Kafka topic, it cannot be deleted through Kafka itself, but we can remove it from ZooKeeper instead.
Go to zookeeper/bin and run ./zkCli.sh, then:
```
ls /brokers/topics
rmr /brokers/topics/__consumer_offsets
ls /brokers/topics
```

Stop the whole cluster first,
then restart ZooKeeper and Kafka,
and describe __consumer_offsets again: the replica count is now 3.

Shut down one of the followers again to simulate a server outage:
this time the message queue keeps working.

```
cd /usr/local/zookeeper/bin/
./zkServer.sh start
cd /usr/local/kafka/bin/
nohup ./kafka-server-start.sh ../config/server.properties &
./kafka-topics.sh --zookeeper 175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe --topic __consumer_offsets
```

