Linux CentOS 7 安装 Kafka 2.8.2 - 单机版 & JDK 11 & 切换 JDK版本 & cmak
Kafka 从 2.6.0 开始,默认使用 Java 11 , 3.0.0 开始,不再支持 Java 8,详见:
https://kafka.apache.org/downloads
- Producer:消息生产者,就是向 kafka broker 发消息的客户端:
- Consumer:消息消费者,向 kafka broker 取消息的客户端;
- ConsumerGroup:消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
不同的组可以消费同一个消息,且只能被消费组内的一个消费者消费 - Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
- Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
- Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个 Leader 和 若干个 follower
- leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
- follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
安装 JDK 11
[root@kafka1host ~]# mkdir /usr/local/java
# 解压JDK
[root@kafka1host ~]# tar -zxvf jdk-11.0.17_linux-x64_bin.tar.gz -C /usr/local/java
# 注意 由于jdk1.8版本之后无 jre. 需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre
# 配置环境变量
[root@localhost~]# vi /etc/profile
export JAVA_HOME=/usr/local/java/jdk-11.0.17
export JRE_HOME=${JAVA_HOME}
export PATH=$PATH:${JAVA_HOME}/bin
export CLASSPATH=./:${JAVA_HOME}/lib:${JAVA_HOME}/lib
# 让环境变更生效
[root@localhost~]# source /etc/profile
# 此时 java -version 仍然显示旧版本 【1. 删除旧版本、2. 切换Java版本】
[root@kafka1host kafka]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@kafka1host kafka]#
切换 JAVA 版本,(
如需要卸载旧版本,点击此处
)
# 查看已安装的Java版本及其路径,/etc/alternatives/java 是当前Java版本的符号链接。
[root@kafka1host ~]# ls -l /usr/bin/java
lrwxrwxrwx. 1 root root 22 Apr 27 2021 /usr/bin/java -> /etc/alternatives/java
# 查看当前Java版本的可执行文件路径: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java 是当前Java版本的可执行文件路径。
[root@kafka1host ~]# ls -l /etc/alternatives/java
lrwxrwxrwx. 1 root root 71 Apr 27 2021 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java
# alternatives 注册 Java 版本信息
[root@kafka1host ~]# alternatives --install /usr/bin/java java /usr/local/java/jdk-11.0.17/bin/java 1
# alternatives --install <link> <name> <path> <priority>
# install 表示安装
# link 是符号链接
# name 则是标识符
# path 是执行文件的路径
# priority 则表示优先级
# 选择Java配置版本
[root@kafka1host ~]# sudo alternatives --config java
There are 3 programs which provide 'java'.
Selection Command
-----------------------------------------------
*+ 1 java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java)
2 java-1.7.0-openjdk.x86_64 (/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64/jre/bin/java)
3 /usr/local/java/jdk-11.0.17/bin/java
# 输入对应版本的编号,然后按Enter键
Enter to keep the current selection[+], or type selection number: 3
# 查看Java版本
[root@kafka1host ~]# java -version
java version "11.0.17" 2022-10-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode)
[root@kafka1host ~]#
注意: 由于jdk1.8版本之后无 jre. 如果需要运行 tomcat ,需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre
安装 Kafka
下载 Kafka 2.8.2
https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
防火墙
配置防火墙规则
# 查看防火墙状态
[root@kafka1host ~]# firewall-cmd --state
running
# 查看默认作用域 -- 一般不需要查看
[root@kafka1host ~]# firewall-cmd --get-default-zone
public
# 添加防火墙规则,允许访问 2181、9092 端口
[root@kafka1host ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp
# –permanent : 表示使设置永久生效,不加的话机器重启之后失效,
# –add-port=2181/tcp : 表示添加一个端口和协议的规则,
# --zone=public: 作用域(默认为 public 可不加)
[root@kafka1host ~]# firewall-cmd --permanent --add-port=9092/tcp
# 更新防火墙规则
[root@kafka1host ~]# firewall-cmd --reload
# 查看所有打开的端口
[root@kafka1host ~]# firewall-cmd --list-port
2181/tcp 9092/tcp
修改配置
修改配置
server.properties
注意配置中的“=”前后不能有空格
# kafka broker 实际监听的地址和端口,集群间配置使用,如果不配置会使用 hostname 导致程序无法访问,如报:无法连接:kafka1host:9092
listeners=PLAINTEXT://172.16.30.100:9092
#允许删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
# 消息日志,默认日志会存放在 /tmp 目录下,有人喜欢放程序目录下
log.dirs=/tmp/kafka-logs
# 以下可不配置-------更多详细配置百度
# 对外提供的地址,它会注册到 zookeeper上,如果不配置,会使用上面 listeners 的值
# advertised.listeners=PLAINTEXT://10.100.25.230:9092
运行测试
一般 zookeeper 单独部署。这边单节点部署,为了省事,直接使用 kafka 包中内置的 Zookeeper。
启动 zookeeper
窗口A
# 先启动 zookeeper
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties
启动 Kafka , 新开一个命令行
窗口B
# 再启动 Kafka
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties
创建 topic 再开一个命令行
窗口C
测试
# 创建 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --create --topic kafka-vipsoft --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic kafka-vipsoft.
# 查看 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-vipsoft
# 删除 topic,如果 delete.topic.enable=true 没设的话,在kafka重启后才会生效
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --delete --topic kafka-vipsoft --zookeeper localhost:2181
Topic kafka-vipsoft is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
启动生产者
窗口C
# 启动生产者, 不能使用 localhost:9092 需要和 配置中的 listeners 保持一致
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-console-producer.sh --broker-list 172.0.30.100:9092 --topic kafka-vipsoft
> Hello 123
启动消费者,再开一个命令行
窗口D
#启动消费者
[root@kafka1host ~]# /usr/local/kafka-console-consumer.sh --bootstrap-server 172.16.3.203:9092 --topic kafka-vipsoft --from-beginning
Hello 123
自启动
创建启动命令
vi /usr/local/kafka_2.13-2.8.2/kafka_start.sh
#!/bin/sh
/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties &
sleep 5 #先启动 zookeeper,等5秒后启动 kafka
#启动kafka
/usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &
创建停止命令
vi /usr/local/kafka_2.13-2.8.2/kafka_stop.sh
#!/bin/sh
/usr/local/kafka_2.13-2.8.2/bin/kafka-server-stop.sh &
sleep 3 #先停Kafka 再停 zookeeper 否则 kafka 停不掉
/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-stop.sh
修改脚本执行权限
chmod 775 kafka_start.sh
chmod 775 kafka_stop.sh
验证脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh
设置开机启动
vi /etc/rc.d/rc.local
# 添加如下脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh &
验证端口
查看多端口状态
netstat -ntpl | grep ':2181\|:9020'
[root@kafka1host ~]# netstat -ntpl | grep ':2181\|:9092'
tcp6 0 0 172.16.3.203:9092 :::* LISTEN 5419/java
tcp6 0 0 :::2181 :::* LISTEN 4182/java
Kafka 管理(CMAK)
需要JDK 11
下载
CMAK(Kafka 图形界面管理工具)
https://github.com/yahoo/CMAK/releases
安装
[root@kafka1host ~]# unzip cmak-3.0.0.6.zip
[root@kafka1host ~]# cd cmak-3.0.0.6/
# 修改配置
[root@kafka1host cmak-3.0.0.6]# cp ./conf/application.conf ./conf/application.conf.bak
kafka-manager.zkhosts="localhost:2181"
cmak.zkhosts="localhost:2181"
#翻到最下面,修改密码
basicAuthentication.password="123456"
# 启动 cmak
[root@kafka1host cmak-3.0.0.6]# nohup bin/cmak 1>nohup.out 2>&1 &
[3] 17625
[root@kafka1host cmak-3.0.0.6]# netstat -ntpl | grep '9000'
tcp6 0 0 :::9000 :::* LISTEN 17625/java
# 配置防火墙
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --permanent --zone=public --add-port=9000/tcp
# 更新防火墙规则
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --reload