Kafka 从 2.6.0 开始,默认使用 Java 11 , 3.0.0 开始,不再支持 Java 8,详见:
https://kafka.apache.org/downloads

image

  • Producer:消息生产者,就是向 kafka broker 发消息的客户端:
  • Consumer:消息消费者,向 kafka broker 取消息的客户端;
  • ConsumerGroup:消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    不同的组可以消费同一个消息,且只能被消费组内的一个消费者消费
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
  • Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  • Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个 Leader 和 若干个 follower
  • leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  • follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

安装 JDK 11

[root@kafka1host ~]# mkdir /usr/local/java
# 解压JDK
[root@kafka1host ~]# tar -zxvf jdk-11.0.17_linux-x64_bin.tar.gz -C /usr/local/java
# 注意 由于jdk1.8版本之后无 jre. 需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

# 配置环境变量
[root@localhost~]# vi /etc/profile
export JAVA_HOME=/usr/local/java/jdk-11.0.17
export JRE_HOME=${JAVA_HOME}
export PATH=$PATH:${JAVA_HOME}/bin
export CLASSPATH=./:${JAVA_HOME}/lib:${JAVA_HOME}/lib

# 让环境变更生效
[root@localhost~]# source /etc/profile
#  此时 java -version 仍然显示旧版本 【1. 删除旧版本、2. 切换Java版本】
[root@kafka1host kafka]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@kafka1host kafka]#

切换 JAVA 版本,(
如需要卸载旧版本,点击此处
)


# 查看已安装的Java版本及其路径,/etc/alternatives/java 是当前Java版本的符号链接。
[root@kafka1host ~]# ls -l /usr/bin/java
lrwxrwxrwx. 1 root root 22 Apr 27  2021 /usr/bin/java -> /etc/alternatives/java
# 查看当前Java版本的可执行文件路径: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java 是当前Java版本的可执行文件路径。
[root@kafka1host ~]# ls -l /etc/alternatives/java
lrwxrwxrwx. 1 root root 71 Apr 27  2021 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java
# alternatives 注册 Java 版本信息
[root@kafka1host ~]# alternatives --install /usr/bin/java java /usr/local/java/jdk-11.0.17/bin/java 1
	# alternatives --install <link> <name> <path> <priority>
	# install 表示安装
	# link 是符号链接
	# name 则是标识符
	# path 是执行文件的路径
	# priority 则表示优先级
# 选择Java配置版本
[root@kafka1host ~]# sudo alternatives --config java

There are 3 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
*+ 1           java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java)
   2           java-1.7.0-openjdk.x86_64 (/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64/jre/bin/java)
   3           /usr/local/java/jdk-11.0.17/bin/java
# 输入对应版本的编号,然后按Enter键
Enter to keep the current selection[+], or type selection number: 3
# 查看Java版本
[root@kafka1host ~]# java -version
java version "11.0.17" 2022-10-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode)
[root@kafka1host ~]# 

注意: 由于jdk1.8版本之后无 jre. 如果需要运行 tomcat ,需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

安装 Kafka

下载 Kafka 2.8.2

https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz

image

防火墙

配置防火墙规则

# 查看防火墙状态
[root@kafka1host ~]# firewall-cmd --state 
running
# 查看默认作用域 -- 一般不需要查看
[root@kafka1host ~]# firewall-cmd --get-default-zone
public
# 添加防火墙规则,允许访问 2181、9092 端口
[root@kafka1host ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp
# –permanent : 表示使设置永久生效,不加的话机器重启之后失效,
# –add-port=2181/tcp : 表示添加一个端口和协议的规则,
# --zone=public: 作用域(默认为 public 可不加)
[root@kafka1host ~]# firewall-cmd --permanent --add-port=9092/tcp
# 更新防火墙规则
[root@kafka1host ~]# firewall-cmd --reload
# 查看所有打开的端口
[root@kafka1host ~]# firewall-cmd --list-port
2181/tcp 9092/tcp

修改配置

修改配置
server.properties
注意配置中的“=”前后不能有空格

# kafka broker 实际监听的地址和端口,集群间配置使用,如果不配置会使用 hostname 导致程序无法访问,如报:无法连接:kafka1host:9092
listeners=PLAINTEXT://172.16.30.100:9092

#允许删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 消息日志,默认日志会存放在 /tmp 目录下,有人喜欢放程序目录下
log.dirs=/tmp/kafka-logs

# 以下可不配置-------更多详细配置百度
# 对外提供的地址,它会注册到 zookeeper上,如果不配置,会使用上面 listeners 的值
# advertised.listeners=PLAINTEXT://10.100.25.230:9092

运行测试

一般 zookeeper 单独部署。这边单节点部署,为了省事,直接使用 kafka 包中内置的 Zookeeper。

启动 zookeeper
窗口A

# 先启动 zookeeper
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties

启动 Kafka , 新开一个命令行
窗口B

# 再启动 Kafka
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties

创建 topic 再开一个命令行
窗口C
测试

# 创建 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --create --topic kafka-vipsoft --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic kafka-vipsoft.

# 查看 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-vipsoft
# 删除 topic,如果 delete.topic.enable=true 没设的话,在kafka重启后才会生效
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --delete --topic kafka-vipsoft --zookeeper localhost:2181
Topic kafka-vipsoft is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

启动生产者
窗口C

# 启动生产者, 不能使用 localhost:9092 需要和 配置中的 listeners 保持一致
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-console-producer.sh --broker-list 172.0.30.100:9092 --topic kafka-vipsoft
> Hello 123

启动消费者,再开一个命令行
窗口D

#启动消费者
[root@kafka1host ~]# /usr/local/kafka-console-consumer.sh --bootstrap-server 172.16.3.203:9092 --topic kafka-vipsoft --from-beginning
Hello 123

自启动

创建启动命令
vi /usr/local/kafka_2.13-2.8.2/kafka_start.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties &

sleep 5 #先启动 zookeeper,等5秒后启动 kafka 

#启动kafka
/usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &

创建停止命令
vi /usr/local/kafka_2.13-2.8.2/kafka_stop.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/kafka-server-stop.sh &

sleep 3 #先停Kafka 再停 zookeeper 否则 kafka 停不掉

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-stop.sh

修改脚本执行权限

chmod 775 kafka_start.sh
chmod 775 kafka_stop.sh

验证脚本

sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh

设置开机启动
vi /etc/rc.d/rc.local


# 添加如下脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh &

验证端口

查看多端口状态

netstat -ntpl | grep ':2181\|:9020'
[root@kafka1host ~]# netstat -ntpl | grep ':2181\|:9092'
tcp6       0      0 172.16.3.203:9092       :::*                    LISTEN      5419/java
tcp6       0      0 :::2181                 :::*                    LISTEN      4182/java

Kafka 管理(CMAK)

需要JDK 11

下载

CMAK(Kafka 图形界面管理工具)
https://github.com/yahoo/CMAK/releases
image

安装

[root@kafka1host ~]# unzip cmak-3.0.0.6.zip
[root@kafka1host ~]# cd cmak-3.0.0.6/
# 修改配置
[root@kafka1host cmak-3.0.0.6]# cp ./conf/application.conf ./conf/application.conf.bak
kafka-manager.zkhosts="localhost:2181"
cmak.zkhosts="localhost:2181"
#翻到最下面,修改密码
basicAuthentication.password="123456"

# 启动 cmak
[root@kafka1host cmak-3.0.0.6]# nohup bin/cmak 1>nohup.out 2>&1 &
[3] 17625
[root@kafka1host cmak-3.0.0.6]# netstat -ntpl | grep '9000'
tcp6       0      0 :::9000                 :::*                    LISTEN      17625/java
# 配置防火墙
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --permanent --zone=public --add-port=9000/tcp
# 更新防火墙规则
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --reload

添加 Kafka Coluster

image
image
image
image

标签: none

添加新评论