准备工作
搭建一个3节点的kafka集群环境,机器规划如下:
序号 | ip | hostname |
1 | 192.168.153.100 | s1 |
2 | 192.168.153.101 | s2 |
3 | 192.168.153.102 | s3 |
下载最新版本的kafka安装包:
下载后上传kafka_2.13-3.7.0.tgz文件到s1节点并解压,运行如下命令解压:
进入到/opt目录:
cd /opt
解压:
tar -zxvf kafka_2.13-3.7.0.tgz
重命名:
mv kafka_2.13-3.7.0 kafka
创建kafka存储日期目录:
mkdir -p /opt/kafka/kafka-logs
修改kafka配置
进入s1节点的/opt/kafka/config目录:
cd /opt/kafka/config
修改server.properties配置:
配置服务监听:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://s1:9092
配置kafka存储日志目录:
log.dirs=/opt/kafka/kafka-logs
配置zookeeper连接:
zookeeper.connect=s1:2181,s2:2181,s3:2181/kafka
拷贝/opt/kafka目录到s2,s3节点,这里采用scp命令进行拷贝:
拷贝到s2:
scp -r /opt/kafka root@s2:/opt
拷贝到s3:
scp -r /opt/kafka root@s3:/opt
在s2,s3节点上修改kafka的server.properties配置文件:
s2节点修改如下:
advertised.listeners=PLAINTEXT://s2:9092
s3节点修改如下:
advertised.listeners=PLAINTEXT://s3:9092
启动kafka集群
进入s1的/opt/kafka目录:
cd /opt/kafka
启动kafka服务:
bin/kafka-server-start.sh -daemon config/server.properties
在s2,s3节点上执行上面的命令
测试集群是否启动成功
创建topic进行测试:
bin/kafka-topics.sh --bootstrap-server=s1:9092 --create --topic topic-demo --partitions=3 --replication-factor=3
查看是否创建topic成功:
bin/kafka-topics.sh --bootstrap-server=s1:9092 --topic topic-demo --list
在s2,s2 启动消费者:
bin/kafka-console-consumer.sh --bootstrap-server s1:9092 --topic topic-demo
在s1启动生产者发送消息:
bin/kafka-console-producer.sh --bootstrap-server=s1:9092 --topic topic-demo
在s2,s3节点查看收到的消息:
查看topic详情:
bin/kafka-topics.sh --bootstrap-server=s1:9092 --topic topic-demo --describe