2023年3月

V8和J2V8

V8

V8是Google开源的JavaScript和WebAssembly引擎,被用于Chrome浏览器和Node.js等。和其它JavaScript引擎把JavaScript转换成字节码或解释执行不同的是,V8在运行JavaScript之前,会将JavaScript编译成原生机器码,并且使用内联缓存等方法来提高性能,V8引擎执行JavaScript的速度可以媲美二进制程序。V8采用 C++ 编写,可以独立运行,也可以嵌入到任何 C++ 程序中。

J2V8

J2V8
是对V8引擎的一层Java封装,即J2V8借助JNI来实现Java层对C++层的访问,通过Java将V8的一些关键API暴露出来供外部程序使用。J2V8旨在为Java世界带来更加高效的JavaScript运行时环境,同时J2V8也可以在Windows、Linux、Mac OS等平台上运行。

J2V8的使用

我们在Android工程中演示J2V8如何具体使用。

创建Android工程

首先,我们通过Android Studio创建一个Android工程。

依赖J2V8

创建好Android工程之后,需要依赖J2V8库,如下:

// j2v8
implementation "com.eclipsesource.j2v8:j2v8:6.2.1@aar"

创建V8运行时实例

要使用J2V8,我们首先需要创建一个V8运行时实例,通过这个V8运行时实例,我们可以执行js脚本、注入js变量、注入原生方法等。
com.eclipsesource.v8.V8
提供了一系列创建V8运行时实例的静态方法,如下:

public static V8 createV8Runtime() {
    return createV8Runtime((String)null, (String)null);
}

public static V8 createV8Runtime(String globalAlias) {
    return createV8Runtime(globalAlias, (String)null);
}

public static V8 createV8Runtime(String globalAlias, String tempDirectory) {
    if (!nativeLibraryLoaded) {
        synchronized(lock) {
            if (!nativeLibraryLoaded) {
                load(tempDirectory);
            }
        }
    }

    checkNativeLibraryLoaded();
    if (!initialized) {
        _setFlags(v8Flags);
        initialized = true;
    }

    V8 runtime = new V8(globalAlias);
    synchronized(lock) {
        ++runtimeCounter;
        return runtime;
    }
}

我们用最简单的方法创建一个V8运行时实例,如下:

val v8 = V8.createV8Runtime()

执行js脚本

com.eclipsesource.v8.V8
提供了一系列执行js脚本的方法,如下:
J2V8执行js脚本的所有方法.png
这些方法涵盖了多种不同的使用场景,例如返回不同的数据类型、执行某些具体的js函数等等,我们可以根据需要选择合适的方法来执行js脚本。

注入变量

通过
com.eclipsesource.v8.V8Object
提供的一系列
add
方法,可以给
V8Object
实例注入js变量,如下:

v8.add("key1", "value1")
v8.add("key2", "value2")
v8.add("key3", "value3")

变量注入之后,我们可以在js中直接使用这些变量。

注入原生对象

J2V8支持注入原生对象,向js注入原生对象之后,在js中可以访问原生对象以及原生对象内部的方法和属性。
如何注入原生对象呢?我们以在js中调用原生代码输出日志的场景为例。
首先,在原生代码中定义一个
Console
类,如下:

/**
 * 输出日志的类
 */
class Console {

    fun log(tag: String, message: V8Array) {
        Log.d(tag, message.toString())
    }
}

Console
类实现了一个
log
方法,用于打印日志内容。
然后,创建
Console
类对象,如下:

val console = Console()

接下来,我们创建一个
V8Object
对象,并向V8运行时对象注入这个对象,对象名命名为
console
,如下:

val consoleObject = V8Object(v8)
v8.add("console", consoleObject)

再接下来,我们通过
consoleObject
注册原生方法。
这里,我们选择通过如下方法实现原生方法的注册:

public V8Object registerJavaMethod(Object object, String methodName, String jsFunctionName, Class<?>[] parameterTypes) {
    return this.registerJavaMethod(object, methodName, jsFunctionName, parameterTypes, false);
}

public V8Object registerJavaMethod(Object object, String methodName, String jsFunctionName, Class<?>[] parameterTypes, boolean includeReceiver) {
    this.v8.checkThread();
    this.checkReleased();

    try {
        Method method = object.getClass().getMethod(methodName, parameterTypes);
        method.setAccessible(true);
        this.v8.registerCallback(object, method, this.getHandle(), jsFunctionName, includeReceiver);
        return this;
    } catch (NoSuchMethodException var7) {
        throw new IllegalStateException(var7);
    } catch (SecurityException var8) {
        throw new IllegalStateException(var8);
    }
}

这是一种基于反射的方法,其中,第一个参数传原生对象即
console
,第二个参数传原生方法名称即
log
,第三个参数传js方法的名称,这里和原生方法名称保持一致(当然也可以不一致),最后一个参数传
Class
类型的数组,表示方法参数的类型,数组元素和方法参数类型必须一一对应。如下:

consoleObject.registerJavaMethod(
    console, "log", "log", arrayOf(
        String::class.java, V8Array::class.java
    )
)

最后,一定要记得关闭手动创建的
V8Object
对象,释放native层的内存,否则会报内存泄露方面的错误。如下:

consoleObject.close()

以上步骤完成之后,我们就可以在js中愉快地打印日志了,我们使用V8运行时对象执行打印日志的js脚本,将此前注入的js变量打印出来,如下:

v8.executeScript("console.log('myConsole', [key1, key2, key3]);")

在Android Studio的Logcat中将会输出如下信息:

2022-12-25 13:21:48.932  4556-4556  myConsole               com.xy.j2v8                          D  value1,value2,value3

注入原生方法

上面在讲注入原生对象的过程当中,其实也包含了原生方法的注入,在注入原生方法的时候,除了上面讲到的方法,还有更简单的方法。
com.eclipsesource.v8.V8Object
还提供了如下方法:

public V8Object registerJavaMethod(JavaCallback callback, String jsFunctionName) {
    this.v8.checkThread();
    this.checkReleased();
    this.v8.registerCallback(callback, this.getHandle(), jsFunctionName);
    return this;
}

public V8Object registerJavaMethod(JavaVoidCallback callback, String jsFunctionName) {
    this.v8.checkThread();
    this.checkReleased();
    this.v8.registerVoidCallback(callback, this.getHandle(), jsFunctionName);
    return this;
}

两个方法只有第一个参数不一样,
com.eclipsesource.v8.JavaVoidCallback
表示原生方法无返回值,
com.eclipsesource.v8.JavaCallback
表示原生方法有返回值,返回类型是
java.lang.Object
类型的。
两个方法的第二个参数表示要注入的方法的名称。
我们使用第一个方法,来注入一个在js中弹Toast的方法,代码如下:

v8.registerJavaMethod({ v8Object, v8Array ->
    Toast.makeText(this, "$v8Object, $v8Array", Toast.LENGTH_SHORT).show()
}, "toast")

接下来,我们执行调用toast的js脚本:

v8.executeJSFunction("toast", "Hello, I am a toast!")

脚本执行之后,屏幕中将弹出文本内容为"Hello, I am a toast!"的toast。

线程模型

JavaScript本身是单线程的,J2V8也严格遵循这一点,即对单个V8运行时的所有访问必须来自同一线程,换句话说就是:V8运行时实例在哪个线程被创建,就只能在哪个线程被使用。
如果在一个线程创建了V8运行时实例,在另外一个线程中直接访问这个实例,会抛异常,如下:

Process: com.xy.j2v8, PID: 27274
java.lang.Error: Invalid V8 thread access: current thread is Thread[Thread-3,5,main] while the locker has thread Thread[main,5,main]
	at com.eclipsesource.v8.V8Locker.checkThread(V8Locker.java:99)
	at com.eclipsesource.v8.V8.checkThread(V8.java:840)
	at com.eclipsesource.v8.V8.executeScript(V8.java:715)
	at com.eclipsesource.v8.V8.executeScript(V8.java:685)
	at com.xy.j2v8.MainActivity.onCreate$lambda$1$lambda$0(MainActivity.kt:27)
	at com.xy.j2v8.MainActivity.$r8$lambda$jWdwVCxaGZnvyZS9q138fD71tFk(Unknown Source:0)
	at com.xy.j2v8.MainActivity$$ExternalSyntheticLambda2.run(Unknown Source:2)
	at java.lang.Thread.run(Thread.java:929)

J2V8单线程模型确保了在使用单个V8运行时时不存在多线程问题,例如线程之间资源竞争,出现死锁等。

GitHub

XyJ2V8


尊重原创,转载请注明出处
初学J2V8

背景

最近一位朋友找到我,让我帮看他们的一个aspnet core service无端cpu高的问题。从描述上看,这个service之前没有出现过cpu高的情况,最近也没有改过实际的什么code。很奇怪了,会有什么变化导致cpu上去了呢?

分析

由于比较容易复现 (据说一启动service,cpu就上去了),我便让那位朋友在cpu高的时候直接手动把.net进程dump了一下。于是就开始用windbg分析了

先看一下案发时进程中的线程情况,毕竟它们是让进程动起来的源泉哈。大部分线程都运行到如下类似位置(下面的callstack是虚拟化的,因为为了朋友的隐私,code已经虚拟化):

这里可以看出有约
38/2=19
个线程运行到
Services.CronJob+d__1.MoveNext
这个位置。我又问了那位朋友,当时的运行环境是大约20个cpu core。真巧哈,几乎所有cpu core都很有可能跑到了这个地方了。

注:上面如何知道有38/2个线程,而不是38个线程呢?这是因为一般来说,当某个函数正在被调用时,callstack中会显示出两次,如图哈:

看到没,在
"current frame"
下面显示的上一层调用关系中会也显示这个方法,此时它是
callee
哈。



那么这个
Services.CronJob+d__1.MoveNext
是何方神圣呢,名字叫cpu killer更贴切吧?

原创:扣钉日记(微信公众号ID:codelogs),欢迎分享,非公众号转载保留此声明。

问题发生

这周正在写代码,突然,旁边小哥问我个问题...

  • 小哥:我这有个接口,自己调用没有问题,但别人调用就不行,这种问题该如何排查?
  • 我:抓下包看看呢...
  • 小哥:是这样使用tcpdump吗?
  • 我:是的

待小哥抓到包后,使用wireshark打开,并找到了相应的请求,类似如下:
fail

然后我让小哥将这个请求,使用curl发一个同样的请求,看能不能复现这个错误,如下:

$ curl -X POST localhost:80/api \
      -H 'Content-Type: application/x-www-form-urlencoded' \
      -d 'eyJvcmRlcl9pZCI6MTIzNDU2Nzg5MDIxNDN9Cg=='

命令执行之后,重现了调用方一样的接口报错。

然后抓包小哥自己的正确请求是这样的:
ok


这里很容易发现,别人调不通接口,小哥能调通,原因是别人的请求体里面缺失
data=
这一段

学习操作系统原理最好的方法是自己写一个简单的操作系统。


在QEMU中会默认输出一些字符,有时候会干扰我们自己输出的字符。一个比较好的办法是向将屏幕清空,再输出我们想要输出的字符。下面就来学习如何清空屏幕。

一、清空屏幕

其实清空屏幕的原理很简单,就是将屏幕写满空格就行了。
下面来实战。mbr8.asm代码如下:

;定义常量(作用和C语言中的#define一样)
VIDEO_CHAR_MAX_COUNT equ 2000 ;默认屏幕最多显示字符数。

org 0x7c00

;初始化段寄存器
mov ax,0xb800
mov es,ax ;本程序中es专用于指向显存段

;清屏
call func_clear_screen

stop:
hlt
jmp stop 

;清屏函数(将屏幕写满空格就实现了清屏)
;输入参数:无。
;输出参数:无。
func_clear_screen:
mov ah,0x00 ;黑底黑字
mov al,' '  ;空格
mov cx,VIDEO_CHAR_MAX_COUNT ;循环控制
.start_blank:
mov bx,cx ;以下3行表示bx=(cx-1)*2 
dec bx
shl bx,1
mov [es:bx],ax ;[es:bx]表示字符对应的显存地址(从屏幕右下角往前清屏)
loop .start_blank
ret

times 510-($-$$) db 0
db 0x55,0xaa

编译运行截图如下:

从上面QEMU截图可以看到,之前QEMU默认输出的字符已经没有了,屏幕清空的很干净。

二、在清空的屏幕上输出字符串

一般我们都是先清空屏幕,再输出自己想要输出的字符串。
下面我们来看mbr9.asm的代码:

;定义常量(作用和C语言中的#define一样)
VIDEO_CHAR_MAX_COUNT equ 2000 ;默认屏幕最多显示字符数。

org 0x7c00 ;如果没有该行将无法正确打印要显示的字符串

;初始化段寄存器
mov ax,cs
mov ds,ax ;ds指向与cs相同的段
mov ax,0xb800
mov es,ax ;本程序中es专用于指向显存段

;清屏
call func_clear_screen

;打印字符串
mov si,string1
mov di,0 ;在屏幕第1行显示
call func_print_string

stop:
hlt
jmp stop 

;清屏函数(将屏幕写满空格就实现了清屏)
;输入参数:无。
;输出参数:无。
func_clear_screen:
mov ah,0x00 ;黑底黑字
mov al,' '  ;空格
mov cx,VIDEO_CHAR_MAX_COUNT ;循环控制
.start_blank:
mov bx,cx ;以下3行表示bx=(cx-1)*2 
dec bx
shl bx,1
mov [es:bx],ax ;[es:bx]表示字符对应的显存地址(从屏幕右下角往前清屏)
loop .start_blank
ret

;打印字符串函数。
;输入参数:ds:si,di。
;输出参数:无。
;ds:si 表示字符串起始地址,以0为结束符。
;di 表示字符串在屏幕上显示的起始位置(0~1999)。
func_print_string:
mov ah,0x07 ;ah表示字符属性,0x07表示黑底白字。
shl di,1 ;乘2(屏幕上每个字符对应2个显存字节)。
.start_char: ;以点开头的标号为局部标号,完整形式是 func_print_string.start_char,但在同一个全局标号func_print_string内部不需要写完整形式。
mov al,[si]
cmp al,0
jz .end_print
mov [es:di],ax ;将字符和属性放到对应的显存中。
inc si
add di,2
jmp .start_char
.end_print:
ret

string1:db "Hello GrapeOS!",0

times 510-($-$$) db 0
db 0x55,0xaa

mbr9.asm其实就是将mbr8.asm和mbr7.asm合并了一下。
下面来看编译运行截图:

从上面的QEMU截图中可以看到,我们在清空的屏幕上第一行显示了字符串“Hello GrapeOS!”,这就是我们想要的效果。


本讲视频版地址:
https://www.bilibili.com/video/BV1DD4y137ET/
本教程代码和资料:
https://gitee.com/jackchengyujia/grapeos-course
GrapeOS操作系统QQ群:643474045

kafka原理详解

消息队列概述

消息队列分类

点对点

  • 组成:消息队列(Queue)、发送者(Sender)、接收者(Receiver)

  • 特点:一个生产者生产的消息只能被一个接受者接收,消息一旦被消费,消息就不在消息队列中了

发布/订阅

  • 组成:消息队列(Queue)、发布者(Publisher)、订阅者(Subscriber)、主题(Topic)

  • 特点:每个消息可以有多个消费者,彼此互不影响,即发布到消息队列的消息能被多个接受者(订阅者)接收

常见的消息系统

  • ActiveMQ: 历史悠久,支持性较好,性能相对不高

  • RabbitMQ: 可靠性高、安全

  • Kafka: 分布式、高性能、高吞吐量、跨语言

  • RocketMQ: 阿里开源的消息中间件,纯Java实现

kafka架构

kafka介绍

Kafka是一个分布式的发布/订阅消息系统,最初由LinkedIn(领英)公司发布,使用Scala语言编写,后成为Apache的顶级项目。

kafka主要用于处理活跃的数据,如登录、浏览、点击、分享等用户行为产生的数据。

kafka架构组成

Broker

  • broker表示kafka的节点,kafka集群包含多个kafka服务节点,每个kafka服务节点就称为一个broker

Topic

  • 主题,用来存储不同类别的消息(kafka的消息数据是分主题存储在硬盘上的)

  • 存储消息时,需要指定存储在哪个主题下面,如发帖,发哪种类型的

Partition

  • 分区,每个topic包含一个或多个partition,在创建topic时指定包含的partition数据(目的是为了进行分布式存储)

  • 分区可以提高负载(每个分区是不同的磁盘,所以会提高负载)

Replication

  • 副本,每个partition分区可以有多个副本,分布在不同的Broker上

  • kafka会选出一个副本作为Leader,
    所有的读写请求都会通过Leader完成,Follower只负责备份数据

  • 所有Follower会自动从Leader中复制数据,当Leader宕机后,会从Follower中选出一个新的Leader继续提供服务,实现故障自动转移

Message

  • 消息,是通信数据的基本单位,每个消息都属于一个Partition,消息都是放在Partition里面的

Producer

  • 消息的生产者,向kafka的一个topic发布消息,发布消息时,需要指定发布到哪个topic主题

Consumer

  • 消息的消费者,订阅Topic并读取其发布的消息,消费或订阅哪个topic主题里的消息,可以订阅多个主题的消息(类似订阅多个微信公众号)

Consumer Group

  • 消费者组,
    每个Consumer属于一个特定的Consumer Group
    ,多个Consumer可以属于同一个Consumer Group

  • 各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。

ZooKeeper

  • 协调Kafka的正常运行,
    kafka将元数据信息保存在ZooKeeper中,但发送给Topic本身的消息数据并不存储在ZK中
    ,而是存储在磁盘文件中

  • 元数据信息包括:kafka有多少个节点、有哪些主题,主题叫什么,有哪些分区的等(消息自身的数据不在ZK中,而是在磁盘上对应的分区中)

kafka的工作流程

生产者向kafka发送数据的流程(六步)

一共六步:

  1. 生产者查询Leader:producer先从zookeeper的“/brokers/.../state”节点找到该partition的leader

  2. 找到Leader之后往Leader写数据:producer将消息发送给该leader

  3. Leader落盘:leader将消息写入本地log

  4. Leader通知Follower

  5. Follower从Leader中拉取数据
    :replication写入到Follower的本地log后,follower向leader发送ack

  6. Kafka向生产者回应ACK:leader收到所有的replication的ack之后,向producer发送ack

Kafka选择分区的模式(三种)

  1. 直接指定往哪个分区写

  2. 指定key,然后kafka根据key做hash后决定写哪个分区

  3. 各个分区轮询

生产者往kafka发送数据的模式(三种)

  1. 把数据发送给Leader就认为成功,效率最高,安全性低

  2. 把数据发送给Leader,等待Leader回复Ack后则认为发送成功

  3. 把数据发送给Leader,确保Follower从Leader拉取数据回复Ack给Leader,Leader再向生产者回复Ack才认为发送成功,安全性最高

数据消费

多个消费者可以组成一个消费者组,并用一个标签来标识这个消费者组(一个消费者实例可以运行在不同的进程甚至不同的服务器上)

  • 如果所有的消费者实例都在同一个消费者组中,那么消息记录会被很好的均衡发送到每个消费者实例

  • 如果所有的消费者实例都在不同的消费者组,那么每一条消息记录会被广播到每一个消费者实例

各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组

注意:每个消费者实例可以消费多个分区,但是每一个分区最多只能被消费者组中的一个实例消费

kafka的文件存储机制

topic、partition和segment

1)在kafka文件存储中,同一个topic下有多个不同的partition:

  • 每个partition就是一个目录,partition的命名规则为:topic名称+有序序号

  • 第一个partition序号从0开始,序号最大值为partition数量减一

2)每个partition的目录下面会有多组segment文件:

  • 每个partition(目录)相当于一个巨型大文件被平均分配到多个大小都相等的segment数据文件中(但每个segment file消息数量不一定相等,这种特性方便old segment file快速被删除)

  • 每组segment文件包含:.index文件、.log文件、.timeindex文件(.log文件就是实际存储message的地方,.index和.timeindex文件为索引文件,用于检索消息)

  • 每个partition只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定

  • 这样做能快速删除无用文件,有效提高磁盘利用率

3)segment文件

  • segment文件由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件

  • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充

存储和查找message的过程

1)数据写入过程

每个Partition都是一个有序并且不可改变的消息记录集合(每个partition都是一个有序队列),当新的数据写入时,就被追加到partition的末尾。

在每个partition中,
每条消息都会被分配一个顺序的唯一标识,这个标识被称为Offset(偏移量),用于partition唯一标识一条消息

2)数据查找过程

在partition中通过offset查找message:

  1. 查找segment file:每一个segment文件名都包含了上一个segment最后一条消息的offset值,所以只要根据offset二分查找文件列表,就能定位到具体segment文件

  2. 通过segment file查找message:当定位到segment文件后,可以通过对应的.index元数据文件,在对应的.log文件中顺序查找对应的offset,然后即可拿到数据

3)说明:

  • kafka只能保证在同一个partition内部消息是有序的,在不同的partition之间,并不能保证消息有序

  • 为什么kafka快:因为它把对磁盘的随机读变成了顺序读

kafka安装部署及操作

kafka单机部署

安装ZooKeeper

kafka需要依赖ZooKeeper,所以需要先安装并启动ZooKeeper,kafka使用zk有两种方式:

  1. 使用kafka自带的ZooKeeper(一般不推荐使用内置的ZooKeeper)

  2. 单独搭建ZooKeeper

使用kafka自带的ZooKeeper:

 # kafka的bin目录中,有自带的zk的启动命令
 /usr/local/kafka/bin/zookeeper-server-start.sh
 
 # kafka的config目录中,有自带的zk的配置文件
 /usr/local/kafka/bin/zookeeper.properties

如果要使用kafka内置的ZooKeeper,修改好配置文件
./config/zookeeper.properties
(主要修改zk的data位置和端口),直接启动即可

 # 后台启动,并指定配置文件
 zookeeper-server-start.sh -daemon ../config/zookeeper.properties

安装kafka

kafka需要java环境,需要安装jdk

 # 1.安装jdk
 yum install -y java-1.8.0-openjdk
 
 # 2.准备kafka安装包
 tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/
 ln -s /usr/local/kafka_2.11-2.2.0 /usr/local/kafka
 mkdir -pv /data/kafka/data/   # 创建kafka数据存储目录
 # 配置环境变量
 sed -i '$aPATH="/usr/local/kafka/bin:$PATH"' /etc/profile
 source /etc/profile
 
 # 3.修改kafka配置文件
 vim /usr/local/kafka/config/server.properties
 listeners=PLAINTEXT://10.0.0.80:9092    # kafka默认监听端口号为9092,
 log.dirs=/data/kafka/data               # 指定kafka数据存放目录
 zookeeper.connect=localhost:2181        # 指定ZooKeeper地址,kafka要将元数据存放到zk中,这里会在本机启动一个zk
 
 # 4.启动kafka
 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
 
 # 5.查看进程及端口
 ps -ef | grep kafka
 ss -tnl | grep 9092                     # kafka监听在9092端口

kafka脚本程序及配置文件

几个kafka的操作脚本

  • kafka-server-start.sh kafka启动程序

  • kafka-server-stop.sh kafka停止程序

  • kafka-topics.sh 创建topic程序

  • kafka-console-producer.sh 命令行模拟生产者生产消息数据程序

  • kafka-console-consumer.sh 命令行模拟消费者消费消息数据程序

kafka的配置文件

vim /usr/local/kafka/config/server.properties

 ############################# Server Basics #############################
 # broker的id,值为整数,且必须唯一,在一个集群中不能重复,默认为0
 broker.id=0
 
 ############################# Socket Server Settings #############################
 # kafka默认监听的端口为9092
 #listeners=PLAINTEXT://:9092
 
 # 处理网络请求的线程数量,默认为3个
 num.network.threads=3
 
 # 执行磁盘IO操作的线程数量,默认为8个
 num.io.threads=8
 
 # socket服务发送数据的缓冲区大小,默认100KB
 socket.send.buffer.bytes=102400
 # socket服务接受数据的缓冲区大小,默认100KB
 socket.receive.buffer.bytes=102400
 
 # socket服务所能接受的一个请求的最大大小,默认为100M
 socket.request.max.bytes=104857600
 
 
 ############################# Log Basics #############################
 # kafka存储消息数据的目录
 log.dirs=../data
 
 # 每个topic默认的partition数量
 num.partitions=1
 
 # 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
 num.recovery.threads.per.data.dir=1
 
 
 ############################# Log Flush Policy #############################
 # 消息刷新到磁盘中的消息条数阈值
 #log.flush.interval.messages=10000
 # 消息刷新到磁盘中的最大时间间隔
 #log.flush.interval.ms=1000
 
 ############################# Log Retention Policy #############################
 # 日志保留小时数,超时会自动删除,默认为7天
 log.retention.hours=168
 
 # 日志保留大小,超出大小会自动删除,默认为1G,log.retention.bytes这是指定 Broker 为消息保存的总磁盘容量大小
 #log.retention.bytes=1073741824
 
 # 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
 log.segment.bytes=1073741824
 
 # 每隔多长时间检测数据是否达到删除条件
 log.retention.check.interval.ms=300000
 
 
 ############################# Zookeeper #############################
 # Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
 zookeeper.connect=localhost:2181
 # 连接zookeeper的超时时间
 zookeeper.connection.timeout.ms=6000
 # 是否可以删除topic,默认为false
 delete.topic.enable=true

kafka集群部署

环境信息

节点 IP ZK Port Kafka Port OS
node01 10.0.0.80 2181 9092 CentOS7.9
node02 10.0.0.81 2181 9092 CentOS7.9
node03 10.0.0.82 2181 9092 CentOS7.9

部署ZooKeeper集群

kakfa依赖ZooKeeper,可以用以下两种方式使用ZooKeeper:

  1. 使用kafka自带的ZooKeeper(一般不推荐使用内置的ZooKeeper)

  2. 单独搭建ZooKeeper

搭建ZooKeeper集群见ZooKeeper文档。

部署kafka集群

所有节点(node01、node02、node03)上操作:

 # 1.安装jdk
 yum install -y java-1.8.0-openjdk
 
 # 2.准备kafka安装包
 tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/
 ln -s /usr/local/kafka_2.11-2.2.0 /usr/local/kafka
 mkdir -pv /data/kafka/data/   # 创建kafka数据存储目录
 # 配置环境变量
 sed -i '$aPATH="/usr/local/kafka/bin:$PATH"' /etc/profile
 source /etc/profile
 
 # 3.修改kafka配置文件
 broker.id=1                              # 各自节点的id号,每个节点都有自己的id,值为整数,且必须唯一,在一个集群中不能重复,默认为0
 listeners=PLAINTEXT://10.0.0.80:9092                            # kafka默认监听的端口号为9092,指定各自节点的地址和端口
 log.dirs=/data/kafka/data                                       # 指定kafka数据的存放目录
 zookeeper.connect=10.0.0.80:2181,10.0.0.81:2181,10.0.0.82:2181  # zookeeper的连接信息,kafka要将元数据信息存放到zk中
 zookeeper.connection.timeout.ms=600000                          #连接zk超时时间调大,否则可能起不来,默认: 6000
 
 # 4.启动kafka
 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
 
 # 5.查看进程及端口
 ps -ef | grep kafka
 ss -tnl | grep 9092                     # kafka监听在9092端口

生产和消费消息测试

  • kafka-server-start.sh kafka启动程序

  • kafka-server-stop.sh kafka停止程序

  • kafka-topics.sh 创建topic程序

  • kafka-console-producer.sh 命令行模拟生产者生产消息数据程序

  • kafka-console-consumer.sh 命令行模拟消费者消费消息数据程序

topic相关操作

操作topic使用
kafka-topic.sh
脚本

 # 查看主题topic列表,需指定zk的地址
 kafka-topics.sh --list --zookeeper 10.0.0.80:2181  
 
 # 创建topic hello
 kafka-topics.sh --create --zookeeper 10.0.0.80:2181 --replication-factor 1 --partitions 3 --topic hello    
 # --create                     是创建主题topic
 # --zookeeper localhost:2181   主题topic信息是存储在zk中,需要指定zk服务的地址
 # --replication-factor 1       主题topic信息的副本数,因为现在只要一个节点,所以只能是1,有多个节点时候,可以指定副本数多个
 # --partitions 3               主题topic有多少个分区
 # --topic test-topic           指定主题topic的名字
                                                                            
 # 查看某个具体的主题topic消息
 kafka-topics.sh --describe --zookeeper 10.0.0.80:2181 --topic hello                                                    
 
 # 修改主题topic信息,增加到5个分区
 kafka-topics.sh --alter --zookeeper 10.0.0.80:2181 --topic hello --partitions 5    
 
 # 删除主题topic hello
 kafka-topics.sh --delete --zookeeper 10.0.0.80:2181 --topic hello                                                      

生产和消费命令

  • 生产消息:
    kafka-console-producer.sh

  • 消费消息:
    kafka-console-consumer.sh

1)生产消息

使用kafka自带的生产者命令生产消息 (可开一个窗口,模拟生产者)

 # 生产者生产消息,是往topic里发送消息的,需要指明kafka地址和topic的名字
 kafka-console-producer.sh --broker-list 10.0.0.80:9092 --topic test-topic
 >hello
 >test1
 >test2
 >

2)消费消息

使用kafka自带的消费者命令消费消息 (可开多个窗口,模拟消费者)

 # 消费者消费消息,也是从指定topic里取出的,需要指明kafka地址和topic的名字,加--from-beginning是从头开始收,不加就从当前状态开始收
 kafka-console-consumer.sh --bootstrap-server 10.0.0.80:9092 --topic test-topic --from-beginning

查看消息本体及相关数据

查看kafka存放的消息

 # 来到kafka的数据目录,查看kafka存放的消息
 cd /data/kafka/data/  
 ls -d ./test-topic*         # kafka存放的消息会被分布存储在各个分区,这里目录名test-topic就表示对应的topic名称,后缀-0就表示对应的分区
 ./test-topic-0              # 有几个分区就会有几个这样的目录,消息被分布存储在各个目录(目录名称格式: topic名称-分区编号)
 
 # 查看对应分区下的文件(每个分区中存放的消息内容都不一样)
 ls ./test-topic-0/
 00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex leader-epoch-checkpoint
 
 # 查看消息本体
 cat ./test-topic-0/00000000000000000000.log
 =CͰ