2024年8月

最近在看一个系统代码时,发现系统里面在使用到了 ThreadLocal,乍一看,
好像很高级的样子
。我再仔细一看,这个场景并不会存在线程安全问题,完全只是在一个方法中传参使用的啊!(震惊)

难道是我水平太低,看不懂这个高级用法?经过和架构师请教和确认,这
完全就是一个 ThreadLocal 滥用的典型案例啊
!甚至,日常的业务系统中,
90%以上的 ThreadLocal 都在滥用或错用
!快来看看说的是不是你~

ThreadLocal 简介

ThreadLocal 也叫线程局部变量,是 Java 提供的一个工具类,它为每个线程提供一个独立的变量副本,从而实现线程间的
数据隔离

ThreadLocal 中的关键方法如下:

方法定义 方法用途
public T get() 返回当前线程所对应线程局部变量
public void set(T value) 设置当前线程的线程局部变量的值
public void remove() 删除当前线程局部变量的值

滥用:无伤大雅

在一些没有必要进行线程隔离的场景中使用“
好像高级
”的 ThreadLocal,看起来是挺唬人的,但这其实就是“
纸老虎
”。

滥用的典型案例是:在一个方法的内部,将入参信息写入 ThreadLocal 进行保存,在后续需要时从 ThreadLocal 中取出使用。一段简单的示例代码,可以参考:

public class TestService {

    private static final String COMMON = "1";

    private ThreadLocal<Map<String, Object>> commonThreadLocal = new ThreadLocal<>();

    public void testThreadLocal(String commonId, String activityId) {

        setCommonThreadLocal(commonId, activityId);

        // 省略业务代码①

        doSomething();

        // 省略业务代码②
    }

    /**
     * 将入参写入 ThreadLocal
     *
     * @param commonId
     * @param activityId
     */
    private void setCommonThreadLocal(String commonId, String activityId) {
        Map<String, Object> params = new HashMap<>();
        params.put("commonId", commonId);
        params.put("activityId", activityId);
        this.commonThreadLocal.set(params);
    }

    /**
     * 从 ThreadLocal 取出参数,进行业务处理
     */
    private void doSomething() {
        Map<String, Object> params = this.commonThreadLocal.get();
        String commonId = (String) params.get("commonId");
        if (StringUtils.equals(commonId, COMMON)) {
            // 省略业务代码
        }
    }
}

为什么说无伤大雅呢?因为这段代码的写入 ThreadLocal 和读取 ThreadLocal 都是在同一个线程中进行的,代码可以正常运行,并且运行结果正确。

但是,还是这段代码,也埋了
一个“坑”
,稍有不慎,将可能导致错误的结果。如果在处理业务逻辑中(①或者②处)使用了多线程技术,创建了其他线程,在其他线程中去获取ThreadLocal中写入的值,根据获取到的值进行相关业务逻辑处理,很可能得到预期之外的结果,从而演化为一个
错误案例

错用:血泪教训

错误案例

以一个常见的 Web 应用为例,方便起见,我在本机 Idea 使用 Spring Boot 创建一个工程,在 Controller 中使用 ThreadLocal 来保存线程中的用户信息,初识为 null。业务逻辑很简单,先从 ThreadLocal 获取一次值,然后把入参中的 uid 设置到 ThreadLocal 中,随后再获取一次值,最后返回两次获得的 uid。代码如下:

private static final ThreadLocal<String> USER_INFO_THREAD_LOCAL = ThreadLocal.withInitial(() -> null);

@RequestMapping("user")
public String user(@RequestParam("uid") String uid) {
    //查询 ThreadLocal 中的用户信息
    String before = USER_INFO_THREAD_LOCAL.get();
    //设置用户信息
    USER_INFO_THREAD_LOCAL.set(uid);
    //再查询一次 ThreadLocal 中的用户信息
    String after = USER_INFO_THREAD_LOCAL.get();

    return before + ";" + after;
}

启动工程,使用 uid=1,uid=2 ……作为入参进行测试,结果如下:

http://localhost:8080/user?uid=1

没有问题!

http://localhost:8080/user?uid=2

很稳!

多来几次,结果还是很稳的。

结果符合预期,这真的没有问题吗?

问到这里,你是不是也有点怀疑了?是不是我要翻车了?写到这里就被迫结束了。
NO!NO!NO!继续看!

我调整 application.properties 参数,方便复现问题:

server.tomcat.max-threads=1

继续执行上面的测试:

http://localhost:8080/user?uid=1

没有问题!

http://localhost:8080/user?uid=2

什么?uid2 读取到了 uid1 的信息!!!

http://localhost:8080/user?uid=1

什么?uid1 也读取到了 uid2 的信息!!!

这岂不是乱套了,全乱了,整个晋西北都乱成了一锅粥!

问题原因

为什么数据会错乱呢?

数据错乱,究竟是怎么回事呢?按理说,在设置用户信息之前第一次获取的值始终应该是 null,然后设置之后再去读取,读到的应该是设置之后的值才对啊。

真相是这样的,程序运行在 Tomcat 中,
Tomcat 的工作线程是基于线程池的,线程池其实是复用了一些固定的线程的

如果线程被复用,那么很可能从 ThreadLocal 获取的值是之前其他用户的遗留下的值

为什么调整线程池参数,就测试出问题了呢?

Spring Boot 内嵌的 Tomcat 服务器的默认线程池最大线程数是 200,但通过修改
application.properties

application.yml
文件来调整。关键参数如下:

  • 最大工作线程数 (server.tomcat.max-threads):默认值为 200,Tomcat 可以同时处理的最大线程数。
  • 最小工作线程数 (server.tomcat.min-spare-threads):默认值为 10,Tomcat 在启动时初始化的线程数。
  • 最大连接数 (server.tomcat.max-connections):默认值为 10000,Tomcat 在任何时候可以接受的最大连接数。
  • 等待队列长度 (server.tomcat.accept-count):默认值为 100,当所有线程都在使用时,等待队列的最大长度。

我调整参数(server.tomcat.max-threads=1)之后,很容易复用到之前的线程,复用线程情况下,触发了代码中隐藏的 Bug

如果不调整的话,在较大流量的场景下也会触发这个 Bug

解决办法

那应该如何修改呢?其实方案很简单,在 finally 代码块中
显式清除 ThreadLocal 中的数据
。这样,即使复用了之前的线程,也不会获取到错误的用户信息。修正后的代码如下:

private static final ThreadLocal<String> USER_INFO_THREAD_LOCAL = ThreadLocal.withInitial(() -> null);

@RequestMapping("right")
public String right(@RequestParam("uid") String uid) {
    String before = USER_INFO_THREAD_LOCAL.get();
    USER_INFO_THREAD_LOCAL.set(uid);
    try {
        String after = USER_INFO_THREAD_LOCAL.get();
        return before + ";" + after;
    } finally {
        USER_INFO_THREAD_LOCAL.remove();
    }
}

正确使用

前面是滥用和错用的例子,那应该如何正确使用 ThreadLocal 呢? 正确的使用场景包括:

  1. 在网关场景下,使用 ThreadLocal 来存储追踪请求的 ID、请求来源等信息;
  2. RPC 等框架中使用 ThreadLocal 保存请求上下文信息;
  3. ……

最常见的案例是
用户登录拦截
,从 HttpServletRequest 获取到用户信息,并保存到 ThreadLocal 中,方便后续随时取用,代码如下:

public class ContextHttpInterceptor implements HandlerInterceptor {

    private static final ThreadLocal<Context> contextThreadLocal = new ThreadLocal<Context>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
        try {
            Context context = new Context();
            String pin = request.getParameter("pin");
            if (StringUtils.isNotBlank(pin)) {
                context.setPin(pin);
            }
            contextThreadLocal.set(context);
        } catch (Exception e) {
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse resposne, Object o,
                           ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse resposne,
                                Object o, Exception e) throws Exception {
        contextThreadLocal.remove();
    }
}


public class Context {
    private String pin;

    public String getPin() {
        return pin;
    }

    public void setPin(String pin) {
        this.pin = pin;
    }
}

总结

本文给大家介绍了 ThreadLocal 的无伤大雅的滥用案例、血泪教训的错误案例,分析问题原因和解决方法,也给出了正确的案例,希望对大家理解和使用 ThreadLocal 有帮助。

真正的高手往往使用最朴实无华的招数,写出无可挑剔的代码
;有时候炫技式的代码可能会出错。

大师级程序员把系统当作故事来讲,而不是当作程序来写
。把故事讲好,即方便自己阅读,也方便别人阅读,共勉。

一起学习

欢迎各位在评论区或者私信我一起交流讨论,或者加我主页 weixin,备注技术渠道(如博客园),进入技术交流群,我们一起讨论和交流,共同进步!

也欢迎大家
关注我的博客园、公众号(码上暴富)
,点赞、留言、转发。
你的支持,是我更文的最大动力!

前言:

我们在写vue项目时,弹框是非常常用的组件,并且在同一个项目中,弹框大多类似。所以我们可以抽离封装出一个通用的弹框;

因为vue3可向下兼容,所以作者这边会使用vue2的写法,vue3写法大同小异。

第一步:新建相关文件

一般来说是在
src/components/dialog
下新建如下两个文件:

  1. index.vue:该文件是组件内容相关的,用来书写弹框组件的结构、样式、和动态逻辑;
  2. index.js:该文件使用虚拟节点创建组件内容,并且注册组件。

第二步:书写组件内容

index.vue组件内容如下:

  1. 结构 + js 代码
<template>
  <div class="default-message" :id="boxId">
    <div class="default-message-content">
      <div class="default-message-title">{{ title }}</div>
      <div class="default-message-value" v-html="message"></div>
      <div class="default-message-btns">
        <div
          class="default-message-cancle default-message-btn"
          v-if="cancelBtnHtml"
          @click.prevent.stop="handleCancel"
        >
          {{ cancelBtnHtml }}
        </div>
        <div
          class="default-message-submit default-message-btn"
          @click.prevent.stop="handleOk"
        >
          {{ okBtnHtml }}
        </div>
      </div>
    </div>
  </div>
</template>

<script>
import i18n from "@/i18n";
import { defineComponent } from "vue";
export default defineComponent({
  name: "Dialog",
  data() {
    return {
      i18nTitle: '',
      i18nOkBtn: '',
    };
  },
  props: {
    boxId: {
      type: String,
      default: "",
    },
    // 标题
    title: {
      type: String,
      default: "",
    },
    // 内容
    message: {
      type: String,
      default: "",
    },
    // 确定按钮文字
    okBtnHtml: {
      type: String,
      default: '',
    },
    // 取消按钮文字
    cancelBtnHtml: {
      type: String,
      default: "",
    },
    // 成功回调
    ok_function: {
      type: Function,
    },
    // 失败回调
    cancel_function: {
      type: Function,
    },
  },
  methods: {
    handleCancel() {
      this.removeModal();
      this.cancel_function && this.cancel_function();
    },
    handleOk() {
      this.removeModal();
      this.ok_function && this.ok_function();
    },
    removeModal() {
      const modelDom = document.getElementById(
        "__default__container__content__"
      );
      if (modelDom) {
        document.body.removeChild(modelDom);
      }
    },
  },
  created() {
    this.i18nTitle = i18n.global.t('modal_warm_tip_title');
    this.i18nOkBtn = i18n.global.t('activity_ok');
  },
});
</script>

结构说明:

  • .default-message
    使我们整个弹框的容器,一般宽高都设置为100%,这个部分会有一个半透明的背景色(覆盖页面内容,防止弹框了还能操作页面);
  • .default-message-content
    为整个弹框的内容区域,包括标题、提示信息、取消按钮、确定按钮;
  • 取消按钮和确定按钮支持执行传入的事件
    ,方便我们在弹框弹出后点击按钮执行相应操作;
  1. 样式
<style lang="less" scoped>
.default-message {
  position: fixed;
  right: 0;
  top: 0;
  bottom: 0;
  left: 0;
  width: 100%;
  height: 100%;
  z-index: 1000;
  background: rgba(0, 0, 0, 0.7);

  .default-message-title {
    color: #333;
    margin: 0;
    line-height: 1.5;
    font-size: 18px;
    min-height: 18px;
    padding-top: 20px;
    text-overflow: ellipsis;
    font-weight: bold;
    cursor: move;
    text-align: center;
  }

  .default-message-content {
    width: 85%;
    position: absolute;
    top: 50%;
    left: 50%;
    transform: translate3d(-50%, -50%, 0);
    background-color: #fff;
    border-radius: 6px;
    transition: all 0.2s ease-in;
    color: #999;
    font-size: 18px;
  }

  .default-message-value {
    padding: 28px 18px;
    text-align: center;
    position: relative;
    color: #999;
    text-align: center;
    font-size: 14px;
    color: rgba(102, 102, 102, 1);
  }
  .default-message-btns {
    // border-top: 1px solid #ddd;
    display: flex;
    height: 60px;
    position: relative;
    &:after {
      position: absolute;
      content: "";
      display: inline-block;
      left: 0;
      right: 0;
      top: 0;
      height: 1px;
      transform: scaleY(0.5);
      background: #ddd;
    }
    .default-message-btn {
      flex: 1;
      display: flex;
      align-items: center;
      justify-content: center;
      font-size: 16px;
      padding: 0 3px;
    }
    .default-message-submit {
      color: #26a2ff;
    }
    .default-message-cancle {
      color: #999;
      position: relative;
      &:after {
        position: absolute;
        content: "";
        display: inline-block;
        top: 0;
        right: 0;
        bottom: 0;
        width: 1px;
        transform: scaleX(0.5);
        background: #ddd;
      }
    }
  }
  @keyframes fadeIn {
    from {
      opacity: 0;
    }
    to {
      opacity: 1;
    }
  }
}
</style>

第三步:注册成全局组件

import {createVNode, render} from 'vue';
import MessageConstructor from './index.vue';

const $dialog = function (options) {
  // 已存在一个弹窗则不重复渲染
  if (!document.getElementById ('__default__container__content__')) {
    // 创建div
    const container = document.createElement ('div');
    // container.className = `__default__container__message__`;
    container.id = '__default__container__content__';
    //创建虚拟节点
    const vm = createVNode (MessageConstructor, options);
    //渲染虚拟节点
    render (vm, container);
    document.body.appendChild (container);
  }
};

export default {
  //组件注册
  install (app) {
    app.config.globalProperties.$dialog = $dialog;
  },
};

到这里,我们的弹框组件就完成了,接下来我们来使用看看。

项目中使用弹框

使用的方法也非常简单,所见即所得。

app.config.globalProperties.$dialog({
  title: "弹框标题",
  message: "弹框提示信息文案",
  okBtnHtml: "确定",
  cancelBtnHtml: "取消",
  ok_function: () => {
    console.log("点击弹框确定按钮处理函数");
  },
  cancel_function: () => {
    console.log("点击弹框取消按钮处理函数");
  }
});

说明:

  1. 标题、提示文案、以及取消和确定按钮文案,我们这边直接传入,
    ok_function
    是确定按钮的回调,我们可以在这里做任何点击确定后想做的事,包括发送请求和异步操作,
    cancel_function
    同理。
  2. 弹框图片示例:

image.png

写在后面

这是一个比较基础的弹框组件,这边示例的代码是比较全的,对细节要求不大的小伙伴可以直接用;

背景颜色、字体、布局等这些细节,因为每个业务场景不同,大家可以根据自己的需要适当调整;

弹框是固定单位的,如果小伙伴的项目需要使用响应式大小,直接对应替换大小单位即可;

对你有帮助的话给作者点点关注吧,你的支持是我不断更新的动力!Peace and love~~

本文主要是介绍 wiz 为知笔记服务器 docker 从旧服务器迁移到新服务器的步骤以及问题排查。

旧服务器升级 wiz docker

目的:保持和新服务器拉取的镜像版本一致。

官方只留了 wiz docker 镜像最新版,拉取不了旧版本镜像,所以先升级旧服务器上的 wiz docker。

升级方法

docker stop wiz
docker rm wiz
docker pull wiznote/wizserver:latest
docker run --name wiz -it -d -v /home/ubuntu/wizdata:/wiz/storage -v /etc/localtime:/etc/localtime -p 9000:80 -p 9269:9269/udp wiznote/wizserver

注:第 4 行的挂载目录、绑定的端口请根据你自己的部署参数修改

迁移旧服务器挂载的 wiz 数据目录

我的目录是
/home/ubuntu/wizdata
,将
wizdata
目录想方法传到 新服务器,我是先用 7z 压缩目录然后通过 ftp 传到新服务器后再解压。

7z 相关命令

# Ubuntu/Debian 安装命令
sudo apt-get install p7zip-full
# 压缩整个目录
7z a wizdata.7z wizdata/*
# 测试压缩文件的完整性
7z t wizdata.7z
# 解压到指定目录
7z x wizdata.7z -o./wizdata

注:传到新服务器后一定要先测试压缩文件完整性,不然挂载后会出错

如果测试没有错误,会显示
Everything is Ok

image

目录权限设置

解压
wizdata
后,最好将目录权限设置成和旧服务器一致。

image

image

# 全部子目录及文件权限改为 777
chmod 777 * -R 

新服务器部署 wiz docker

# 部署为知笔记服务器 Docker 镜像
docker run --name wiz -it -d -v /home/zoyo/wizdata:/wiz/storage -v /etc/localtime:/etc/localtime -p 3180:80 -p 39269:9269/udp wiznote/wizserver

注:挂载目录、绑定的端口请根据你自己的部署参数修改

官方指南:
为知笔记服务器Docker镜像部署介绍

完美迁移

image

迁移问题排查指北

  • 检查新旧服务器 wiz docker 镜像版本是否一致

  • 检查迁移的 wizdata 目录是否数据损坏

  • 检查新旧服务器挂载的 wizdata 目录权限是否一致

  • 检查挂载的 wizdata 目录路径是否正确

    我这里就犯过低级错误,解压后 wizdata 目录路径为
    /home/zoyo/wizdata/wizdata
    ,而挂载的目录路径为
    /home/zoyo/wizdata

  • 查看容器日志排查问题
    docker logs -f 7e455ab9c988

  • 进入容器查看 mysql 日志排查问题

    # 查看 wiz docker 镜像 ID
    sudo docker ps
    # 进入 wiz docker 终端
    sudo docker exec -it 3bceba9f92df bash
    # 查看 mysql 错误日志
    cat /var/lib/mysql/mysql_error.log
    

我前几天给新电脑装上了Windows 10系统,想要美化一下,遇到了很多问题,就出了这篇博客,帮大家踩踩坑。

在开始之前,先提醒大家一句:
美化有风险,玩机需谨慎。为以防万一,请大家在进行任何操作前创建一个系统还原点。


首先给大家避避坑,千万别用Steam,不挂梯子根本下载不了,美化资源基本上都要付费。WallPaper Engine可以使用自己的壁纸替代,省钱的同时还能节省内存资源。

话不多说,上截图!!

显示截图
这次的教程基于MydockFinder和Mac OS图标包,所有文件会打包在一个压缩包内上传到123pan。
下载资源包
第一步,下载好文件夹后解压,打开主题文件夹,先安装UltraUXThemePatcher,再运行剩下的两个exe,可能会很慢,在完成前千万不要重启电脑,否则有可能导致系统损坏。
安装完,重启系统,可以看到系统的图标已经变成了苹果样式的。
按Win+i快捷键打开设置 > 个性化 > 主题,选择BigSur主题,壁纸和图标就搞定了


接下来,我们要安装MydockFinder。打开mydockfinder文件夹,安装两个vc运行库,然后把Mydock文件夹移动到C:\Program Files
打开Mydock文件夹,运行Dock_64.exe。
至此,基本配置已完成。
接下来,右键桌面底部的dock,点击偏好设置,根据自己屏幕的分辨率调整全局界面显示大小。然后勾选开机自启和自动隐藏任务栏。重启电脑,美化就完成了。


Kafka 从 2.6.0 开始,默认使用 Java 11 , 3.0.0 开始,不再支持 Java 8,详见:
https://kafka.apache.org/downloads

image

  • Producer:消息生产者,就是向 kafka broker 发消息的客户端:
  • Consumer:消息消费者,向 kafka broker 取消息的客户端;
  • ConsumerGroup:消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    不同的组可以消费同一个消息,且只能被消费组内的一个消费者消费
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
  • Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  • Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个 Leader 和 若干个 follower
  • leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  • follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

安装 JDK 11

[root@kafka1host ~]# mkdir /usr/local/java
# 解压JDK
[root@kafka1host ~]# tar -zxvf jdk-11.0.17_linux-x64_bin.tar.gz -C /usr/local/java
# 注意 由于jdk1.8版本之后无 jre. 需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

# 配置环境变量
[root@localhost~]# vi /etc/profile
export JAVA_HOME=/usr/local/java/jdk-11.0.17
export JRE_HOME=${JAVA_HOME}
export PATH=$PATH:${JAVA_HOME}/bin
export CLASSPATH=./:${JAVA_HOME}/lib:${JAVA_HOME}/lib

# 让环境变更生效
[root@localhost~]# source /etc/profile
#  此时 java -version 仍然显示旧版本 【1. 删除旧版本、2. 切换Java版本】
[root@kafka1host kafka]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@kafka1host kafka]#

切换 JAVA 版本,(
如需要卸载旧版本,点击此处
)


# 查看已安装的Java版本及其路径,/etc/alternatives/java 是当前Java版本的符号链接。
[root@kafka1host ~]# ls -l /usr/bin/java
lrwxrwxrwx. 1 root root 22 Apr 27  2021 /usr/bin/java -> /etc/alternatives/java
# 查看当前Java版本的可执行文件路径: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java 是当前Java版本的可执行文件路径。
[root@kafka1host ~]# ls -l /etc/alternatives/java
lrwxrwxrwx. 1 root root 71 Apr 27  2021 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java
# alternatives 注册 Java 版本信息
[root@kafka1host ~]# alternatives --install /usr/bin/java java /usr/local/java/jdk-11.0.17/bin/java 1
	# alternatives --install <link> <name> <path> <priority>
	# install 表示安装
	# link 是符号链接
	# name 则是标识符
	# path 是执行文件的路径
	# priority 则表示优先级
# 选择Java配置版本
[root@kafka1host ~]# sudo alternatives --config java

There are 3 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
*+ 1           java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java)
   2           java-1.7.0-openjdk.x86_64 (/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64/jre/bin/java)
   3           /usr/local/java/jdk-11.0.17/bin/java
# 输入对应版本的编号,然后按Enter键
Enter to keep the current selection[+], or type selection number: 3
# 查看Java版本
[root@kafka1host ~]# java -version
java version "11.0.17" 2022-10-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode)
[root@kafka1host ~]# 

注意: 由于jdk1.8版本之后无 jre. 如果需要运行 tomcat ,需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre

安装 Kafka

下载 Kafka 2.8.2

https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz

image

防火墙

配置防火墙规则

# 查看防火墙状态
[root@kafka1host ~]# firewall-cmd --state 
running
# 查看默认作用域 -- 一般不需要查看
[root@kafka1host ~]# firewall-cmd --get-default-zone
public
# 添加防火墙规则,允许访问 2181、9092 端口
[root@kafka1host ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp
# –permanent : 表示使设置永久生效,不加的话机器重启之后失效,
# –add-port=2181/tcp : 表示添加一个端口和协议的规则,
# --zone=public: 作用域(默认为 public 可不加)
[root@kafka1host ~]# firewall-cmd --permanent --add-port=9092/tcp
# 更新防火墙规则
[root@kafka1host ~]# firewall-cmd --reload
# 查看所有打开的端口
[root@kafka1host ~]# firewall-cmd --list-port
2181/tcp 9092/tcp

修改配置

修改配置
server.properties
注意配置中的“=”前后不能有空格

# kafka broker 实际监听的地址和端口,集群间配置使用,如果不配置会使用 hostname 导致程序无法访问,如报:无法连接:kafka1host:9092
listeners=PLAINTEXT://172.16.30.100:9092

#允许删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 消息日志,默认日志会存放在 /tmp 目录下,有人喜欢放程序目录下
log.dirs=/tmp/kafka-logs

# 以下可不配置-------更多详细配置百度
# 对外提供的地址,它会注册到 zookeeper上,如果不配置,会使用上面 listeners 的值
# advertised.listeners=PLAINTEXT://10.100.25.230:9092

运行测试

一般 zookeeper 单独部署。这边单节点部署,为了省事,直接使用 kafka 包中内置的 Zookeeper。

启动 zookeeper
窗口A

# 先启动 zookeeper
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties

启动 Kafka , 新开一个命令行
窗口B

# 再启动 Kafka
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties

创建 topic 再开一个命令行
窗口C
测试

# 创建 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --create --topic kafka-vipsoft --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic kafka-vipsoft.

# 查看 topic
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-vipsoft
# 删除 topic,如果 delete.topic.enable=true 没设的话,在kafka重启后才会生效
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --delete --topic kafka-vipsoft --zookeeper localhost:2181
Topic kafka-vipsoft is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

启动生产者
窗口C

# 启动生产者, 不能使用 localhost:9092 需要和 配置中的 listeners 保持一致
[root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-console-producer.sh --broker-list 172.0.30.100:9092 --topic kafka-vipsoft
> Hello 123

启动消费者,再开一个命令行
窗口D

#启动消费者
[root@kafka1host ~]# /usr/local/kafka-console-consumer.sh --bootstrap-server 172.16.3.203:9092 --topic kafka-vipsoft --from-beginning
Hello 123

自启动

创建启动命令
vi /usr/local/kafka_2.13-2.8.2/kafka_start.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties &

sleep 5 #先启动 zookeeper,等5秒后启动 kafka 

#启动kafka
/usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &

创建停止命令
vi /usr/local/kafka_2.13-2.8.2/kafka_stop.sh

#!/bin/sh

/usr/local/kafka_2.13-2.8.2/bin/kafka-server-stop.sh &

sleep 3 #先停Kafka 再停 zookeeper 否则 kafka 停不掉

/usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-stop.sh

修改脚本执行权限

chmod 775 kafka_start.sh
chmod 775 kafka_stop.sh

验证脚本

sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh

设置开机启动
vi /etc/rc.d/rc.local


# 添加如下脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh &

验证端口

查看多端口状态

netstat -ntpl | grep ':2181\|:9020'
[root@kafka1host ~]# netstat -ntpl | grep ':2181\|:9092'
tcp6       0      0 172.16.3.203:9092       :::*                    LISTEN      5419/java
tcp6       0      0 :::2181                 :::*                    LISTEN      4182/java

Kafka 管理(CMAK)

需要JDK 11

下载

CMAK(Kafka 图形界面管理工具)
https://github.com/yahoo/CMAK/releases
image

安装

[root@kafka1host ~]# unzip cmak-3.0.0.6.zip
[root@kafka1host ~]# cd cmak-3.0.0.6/
# 修改配置
[root@kafka1host cmak-3.0.0.6]# cp ./conf/application.conf ./conf/application.conf.bak
kafka-manager.zkhosts="localhost:2181"
cmak.zkhosts="localhost:2181"
#翻到最下面,修改密码
basicAuthentication.password="123456"

# 启动 cmak
[root@kafka1host cmak-3.0.0.6]# nohup bin/cmak 1>nohup.out 2>&1 &
[3] 17625
[root@kafka1host cmak-3.0.0.6]# netstat -ntpl | grep '9000'
tcp6       0      0 :::9000                 :::*                    LISTEN      17625/java
# 配置防火墙
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --permanent --zone=public --add-port=9000/tcp
# 更新防火墙规则
[root@kafka1host cmak-3.0.0.6]# firewall-cmd --reload

添加 Kafka Coluster

image
image
image
image