分类 其它 下的文章

名称网址
Crontab 验证http://tool.lu/crontab/
Ping 服务器http://ping.chinaz.com/
时间戳转换http://tool.chinaz.com/Tools/unixtime.aspx
UrlEncode 编码/解码http://tool.chinaz.com/tools/urlencode.aspx
Base64 编码/解码http://tool.chinaz.com/Tools/Base64.aspx
base64 图片转换http://tool.chinaz.com/tools/imgtobase/
PHP Formathttps://tool.lu/php
JSON 格式化https://yusure.cn/tools/json/
JSON 压缩转义https://www.sojson.com/yasuo.html
JSON/PHP Array Converterhttps://wow.techbrood.com/fiddle/17793
XML 格式化https://tool.lu/xml/
16进制转文本http://www.bejson.com/convert/ox2str/  https://www.sojson.com/hexadecimal.html
ASCII码在线转换计算器https://www.mokuge.com/tool/asciito16/
ASCII码对照表http://c.biancheng.net/c/ascii/  https://tool.oschina.net/commons?type=4
进制转换https://tool.lu/hexconvert/
RGB颜色值与十六进制https://www.sioe.cn/yingyong/yanse-rgb-16/
RGB HSV 转换https://c.runoob.com/front-end/868
颜色互转http://colorizer.org/
UUID在线生成http://www.kjson.com/encrypt/uuid/
生成随机密码https://suijimimashengcheng.51240.com/
MD5 加密https://www.cmd5.com/hash.aspx?s=123456
MD5 解密https://www.cmd5.com/
SHA1 加密http://www.metools.info/code/c22.html
JWT Decoderhttp://jwt.calebb.net/
AES加密/解密https://the-x.cn/cryptography/Aes.aspx  http://tool.chacuo.net/cryptaes
DES加密/解密https://the-x.cn/cryptography/Des.aspx
3DES加密/解密https://the-x.cn/cryptography/TripleDes.aspx
二维码生成器https://cli.im/
二维码解码https://cli.im/deqr/
WebSocket 在线测试http://www.easyswoole.com/wstool.html  http://www.websocket-test.com/
太平洋与本地时间(China Time)换算表http://www.timebie.com/cn/stdpacific.php
UserAgent 生成http://www.gjw123.com/tools-createuseragent  https://gongjux.com/userAgentGenerator/

新装的机器,如果安装时候没有调整分区,装好之后应该都在/home 下的,所以需要调整到根目录。

目标:将VolGroup-lv_home缩小到20G,并将剩余的空间添加给VolGroup-lv_root


1.首先查看磁盘使用情况

[root@localhost ~]# df -h

文件系统 容量 已用 可用 已用% 挂载点

Filesystem Size Used Avail Use% Mounted on

/dev/mapper/VolGroup-lv_root 154G 7.9G 139G 6% /

tmpfs 1.9G 100K 1.9G 1% /dev/shm

/dev/sda1 485M 69M 391M 15% /boot

/dev/mapper/VolGroup-lv_home 299G 984M 283G 1% /home


2、卸载/home

[root@localhost ~]# umount /home

umount /home 如果提示无法卸载,则是有进程占用/home,使用如下命令来终止占用进程:

[root@localhost ~]# fuser -m /home


3、调整分区大小

[root@localhost ~]# resize2fs -p /dev/mapper/VolGroup-lv_home 20G

如果提示运行 e2fsck -f /dev/mapper/VolGroup-lv_home ,则执行相关命令:

[root@localhost ~]# e2fsck -f /dev/mapper/VolGroup-lv_home 然后重新执行命令:

[root@localhost ~]# resize2fs -p /dev/mapper/VolGroup-lv_home 20G

注:resize2fs 为重新设定磁盘大小,只是重新指定一下大小,并不对结果有影响,需要下面lvreduce的配合


4、挂载上/home,查看磁盘使用情况

[root@localhost ~]# mount /home

[root@localhost ~]# df -h


5、设置空闲空间

使用lvreduce指令用于减少LVM逻辑卷占用的空间大校可能会删除逻辑卷上已有的数据,所以在操作前必须进行确认。记得输入 y

[root@localhost ~]# lvreduce -L 20G /dev/mapper/VolGroup-lv_home

注:lvreduce -L 20G的意思为设置当前文件系统为20G,如果lvreduce -l 20G是指从当前文件系统上减少20G

使用lvreduce减小逻辑卷的大校注意:减小后的大小不能小于文件的大小,否则会丢失数据。

可以使用vgdisplay命令等查看一下可以操作的大校也可以是用fdisk -l命令查看详细信息。

[root@localhost ~]# vgdisplay

注:vgdisplay为显示LVM卷组的元数据信息


6.把闲置空间挂在到根目录下

[root@localhost ~]# lvextend -L +283G /dev/mapper/VolGroup-lv_root

注:lvextend -L +283G为在文件系统上增加283G

[root@localhost ~]# resize2fs -p /dev/mapper/VolGroup-lv_root


7、检查调整结果

[root@localhost ~]# df -h


今天我们聊一下高并发下的网络 IO 模型

高并发即我们所说的 C10K(一个 server 服务 1w 个 client),C10M,写出高并发的程序相信是每个后端程序员的追求,高并发架构其实有一些很通用的架构设计,如无锁化,缓存等,今天我们主要研究下高并发下的网络 IO 模型设计,我们知道不管是 Nginx,还是 Redis,Kafka,RocketMQ 等中间件,都能轻松支持非常高的 QPS,其实它们背后的网络 IO 模型设计理念都是一致的,所以了解这一块对我们了解设计出高并发的网络 IO 框架具体重要意义,本文将会从以下几个方面来循序渐近地向大家介绍如何设计出一个高并发的网络 IO 框架

  • 传统网络 IO 模型的缺陷

  • 针对传统网络 IO 模型缺陷的改进

  • 多线程/多进程

  • 阻塞改为非阻塞

  • IO 多路复用

  • Reactor 的几种模型介绍

传统网络 IO 模型的缺陷

我们首先来看下传统网络 IO 模型有哪些缺陷,主要看它们的阻塞点有哪些。我们用一张图来看下客户端和服务端的基于 TCP 的通信流程

服务端的伪代码如下

listenSocket = socket(); //调用socket系统调用创建一个主动套接字bind(listenSocket);  //绑定地址和端口listen(listenSocket); //将默认的主动套接字转换为服务器使用的被动套接字,也就是监听套接字while (1) { //循环监听是否有客户端连接请求到来   connSocket = accept(listenSocket); //接受客户端连接   recv(connsocket); //从客户端读取数据,只能同时处理一个客户端   send(connsocket); //给客户端返回数据,只能同时处理一个客户端}

可以看到,主要的通信流程如下

  1. server 创建监听 socket 后,执行 bind() 绑定 IP 和端口,然后调用 listen() 监听,代表 server 已经准备好接收请求了,listen 的主要作用其实是初始化半连接和全连接队列大小

  2. server 准备好后,client 也创建 socket ,然后执行 connect 向 server 发起连接请求,这一步会被阻塞,需要等待三次握手完成,第一次握手完成,服务端会创建 socket(这个 socket 是连接 socket,注意不要和第一步的监听 socket 搞混了),将其放入半连接队列中,第三次握手完成,系统会把 socket 从半连接队列摘下放入全连接队列中,然后 accept 会将其从全连接队列中摘下,之后此 socket 就可以与客户端 socket 正常通信了,默认情况下如果全连接队列里没有 socket,则 accept 会阻塞等待三次握手完成

经过三次握手后 client 和 server 就可以基于 socket 进行正常的进程通信了(即调用 write 发送写请求,调用 read 执行读请求),但需要注意的是 read,write 也很可能会被阻塞,需要满足一定的条件读写才会成功返回,在 LInux 中一切皆文件,socket 也不例外,每个打开的文件都有读写缓冲区,如下图所示

对文件执行 read(),write() 的具体流程如下

  1. 当执行 read() 时,会从内核读缓冲区中读取数据,如果缓冲区中没有数据,则会阻塞等待,等数据到达后,会通过 DMA 拷贝将数据拷贝到内核读缓冲区中,然后会唤醒用户线程将数据从内核读缓冲区拷贝到应用缓冲区中

  2. 当执行 write() 时,会将数据从应用缓冲区拷贝到内核写缓冲区,然后再通过 DMA 拷贝将数据从写缓冲区发送到设备上传输出去,如果写缓冲区满,则 write 会阻塞等待写缓冲区可写

经过以上分析,我们可以看到传统的 socket 通信会阻塞在 connect,accept,read/write 这几个操作上,这样的话如果 server 是单进程/线程的话,只要 server 阻塞,就不能再接收其他 client 的处理了,由此可知传统的 socket 无法支持 C10K

针对传统网络 IO 模型缺陷的改进

接下来我们来看看针对传统 IO 模型缺陷的改进,主要有两种

  1. 多进程/线程模型

  2. IO 多路程复用

多进程/线程模型

如果 server 是单进程,阻塞显然会导致 server 无法再处理其他 client 请求了,那我们试试把 server 改成多进程的?只要父进程 accept 了 socket ,就 fork 一个子进程,把这个 socket 交给子进程处理,这样就算子进程阻塞了,也不影响父进程继续监听和其他子进程处理连接

程序伪代码如下

while(1) {
  connfd = accept(listenfd);  // 阻塞建立连接  // fork 创建一个新进程  if (fork() == 0) {
    // accept 后子进程开始工作    doWork(connfd);
  }
}void doWork(connfd) {
  int n = read(connfd, buf);  // 阻塞读数据  doSomeThing(buf);  // 利用读到的数据做些什么  close(connfd);     // 关闭连接,循环等待下一个连接}

通过这种方式确实解决了单进程 server 阻塞无法处理其他 client 请求的问题,但众所周知 fork 创建子进程是非常耗时的,包括页表的复制,进程切换时页表的切换等都非常耗时,每来一个请求就创建一个进程显然是无法接受的

为了节省进程创建的开销,于是有人提出把多进程改成多线程,创建线程(使用 pthread_create)的开销确实小了很多,但同样的,线程与进程一样,都需要占用堆栈等资源,而且碰到阻塞,唤醒等都涉及到用户态,内核态的切换,这些都极大地消耗了性能

由此可知采用多进程/线程的方式并不可取

画外音: 在 Linux 下进程和线程都是用统一的 task_struct 表示,区别不大,所以下文描述不管是进程还是线程区别都不大

阻塞改为非阻塞

既然多进程/多线程的方式并不可取,那能否将进程的阻塞操作(connect,accept,read/write)改为非阻塞呢,这样只要调用这些操作,如果相应的事件未准备好,就立马返回 EWOULDBLOCK 或 EAGAIN 错误,此时进程就不会被阻塞了,使用 fcntl 可以可以将 socket 设置为非阻塞,以 read 为例伪代码如下

connfd = accept(listenfd);
fcntl(connfd, F_SETFL, O_NONBLOCK);// 此时 connfd 变为非阻塞,如果数据未就绪,read 会立即返回int n = read(connfd, buffer) != SUCCESS;

read 的非阻塞操作流程图如下

非阻塞read非阻塞read

这样的话调用 read 就不会阻塞等待而会马上返回了,也就实现了非阻塞的效果,不过需要注意的,我们这里说的非阻塞并非严格意义上的非阻塞,这里的非阻塞只是针对网卡数据拷贝到内核缓冲区这一段,如果数据就绪后,再执行 read 此时依然是阻塞的,此时用户进程会占用 CPU 去把数据从内核缓冲区拷贝到用户缓冲区中,可以看到这种模式是同步非阻塞的,这里我们简单解释下阻塞/非阻塞,同步/非同步的概念

  • 阻塞/非阻塞指的是在数据从网卡拷贝到内核缓冲区期间,进程能不能动,如果能动,就是非阻塞的,不能动就是阻塞的

  • 同步/非同步指的是数据就绪后是否需要用户进程亲自调用 read 来搬运数据(将数据从内核空间拷贝到用户空间),如果需要,则是同步,如果不需要则是非同步(即异步),异步 I/O 示意图如下:

    异步 IO异步 IO

异步 IO 执行流程如下:进程发起 I/O 请求后,让内核在整个操作处理完后再通知进程,这整个操作包括网卡拷贝数据到内核缓冲区,将数据从内核缓冲区拷贝到用户缓冲区这两个阶段,内核在处理数据期间(从无数据到拷贝完成),应用进程是可以继续执行其他逻辑的,异步编程需要操作系统支持,目前只有 windows 完美支持,Linux 暂不支持。可以看出异步 I/O 才是真正意义上的非阻塞操作,因为数据从内核缓冲区拷贝到用户缓冲区这一步不需要用户进程来操作,而是由内核代劳了

我们以一个案例来总结下阻塞/非阻塞,同步/异步:当你去餐馆点餐时,如果在厨师做菜期间,你啥也不能干,那就是阻塞,如果在此期间你可以玩手机,喝喝茶,能动,那就是非阻塞,如果厨师做好了菜,你需要亲自去拿,那就是同步,如果厨师做好了,菜由服务员直接送到你的餐桌,那就是非同步(异步)

现在回过头来看将阻塞转成非阻塞是否满足了我们的需求呢?看起来进程确实可以动了,但进程需要不断地以轮询数据的形式调用 accept,read/write 这此操作来询问内核数据是否就绪了,这些都是系统调用,对性能的消耗很大,而且会持续占用 CPU,导致 CPU 负载很高,远不如等数据就绪好了再通知进程去取更高效。这就好比,厨师做菜期间,你不断地去问菜做好了没有,显然没有意义,更高效的方式无疑是等厨师菜做好了主动通知你去取

IO 多路复用

经过前面的分析我们可以得出两个结论

  1. 使用多进程/多线程 IO 模型是不可行的,这一步可以优化为单线程

  2. 应该等数据就绪好了之后再通知用户进程去读取数据,而不是做毫无意义的轮询,注意这里的数据就绪不光是指前文所述的 read 的数据已就绪,而是泛指 accept,read/write 这三个事件的数据都已就绪

于是 IO 多路复用模型诞生了,它是指用一个进程来监听 listen socket(监听 socket) 的连接建立事件,connect socket(已连接 socket) 的读写事件(读写),一旦数据就绪,内核就会唤醒用户进程去处理这些 socket 的相应的事件

IO多路复用IO多路复用

这里简单介绍一下 fd(文件描述符),以便大家更好地了解之后 IO 多路复用中出现的 fd 集合等概念

文件系统简介

我们知道在 Linux 中无论是文件,socket,还是管道,设备等,一切皆文件,Linux 抽象出了一个 VFS(virtual file system) 层,屏蔽了所有的具体的文件,VFS 提供了统一的接口给上层调用,这样应用层只与 VFS 打交道,极大地方便了用户的开发,仔细对比你会发现,这和 Java 中的面向接口编程很类似

通过 open(),socket() 创建文件后,都有一个 fd(文件描述符) 与之对应,对于每一个进程,都有有一个文件描述符列表(File Discriptor Table) 来记录其打开的文件,这个列表的每一项都指向其背后的具体文件,而每一项对应的数组下标就是 fd,对于用户而言,可以认为 fd 代表其背后指向的文件

fd 的值从 0 开始,其中 0,1,2 是固定的,分别指向标准输入(指向键盘),标准输出/标准错误(指向显示器),之后每打开一个文件,fd 都会从 3 开始递增,但需要注意的是 fd 并不一定都是递增的,如果关闭了文件,之前的 fd 是可以被回收利用的

IO 多路复用其实也就是用一个进程来监听多个 fd 的数据事件,用户可以把自己感兴趣的 fd 及对应感兴趣的事件(accept,read/write)传给内核,然后内核就会检测 fd ,一旦某个 socket 有事件了,内核可以唤醒用户进程来处理

那么怎样才能知道某个 fd 是否有事件呢,一种很容易想到的做法是搞个轮询,每次调用一下 read(fd),让内核告知是否数据已就绪,但是这样的话如果有 n 个感兴趣的 fd 就会有 n 次 read 系统调用,开销很大,显然不可接受

所以使用 IO 多路复用监听 fd 的事件可行,但必须解决以下三个涉及到性能瓶颈的点

  1. 如何高效将用户感兴趣的 fd 和事件传给内核

  2. 某个 socket 数据就绪后,内核如何高效通知用户进程进行处理

  3. 用户进程如何高效处理事件

前面两步的处理目前有 select,poll,epoll 三种 IO 多路事件模型,我们一起来看看,看完你就会知道为啥 epoll 的性能是如此高效了

select

我们先来看下 select 函数的定义

返回:若有就绪描述符则为其数目,若超时则为 0,若出错则为-1int select(int maxfd, 
                     fd_set *readset, 
                     fd_set *writeset, 
                     fd_set *exceptset, 
                     const struct timeval *timeout);

maxfd 是待测试的描述符基数,为待测试的最大描述符加 1,readset,writeset,exceptset 分别为读描述符集合,写描述符集合,异常描述符集合,这三个分别通知内核,在哪些描述描述符上检测数据可以读,可写,有异常事件发生,timeout 可以设置阻塞时间,如果为 null 代表一直阻塞

这里需要说明一下,为啥 maxfd 为待测试的描述符加 1 呢,主要是因为数组的下标是从 0 开始的,假设进程新建了一个 listenfd,它的 fd 为 3,那么代表它有 4 个 感兴趣的 fd(每个进程有固定的 fd = 0,1,2 这三个描述符),由此可知 maxfd = 3 + 1 = 4

接下来我们来看看读,写,异常集合是怎么回事,如何设置针对 fd 的感兴趣事件呢,其实事件集合是采用了一种位结构(bitset)的方式,比如现在假设我们对标准输入(fd = 0),listenfd(fd = 3)的读事件感兴趣,那么就可以在 readset 对应的位上置 1

画外音: 使用 FD_SET 可将相应位置置1,如 FD_SET(listenfd, &readset)

如下

即 readset 为 {1, 0,0, 1},在调用 select 后会将 readset 传给内核,如果内核发现 listenfd 有连接已就绪的事件,则内核也会在将相应位置置 1(其他无就绪事件的 fd 对应的位置为 0)然后会回传给用户线程,此时的 readset 如下,即 {1,0,0,0}

于是进程就可以根据 readset 相应位置是否是 1(用 FD_ISSET(i, &read_set) 来判断)来判断读事件是否就绪了

需要注意的是由于 accept 的 socket 会越来越多,maxfd 和事件 set 都需要及时调整(比如新 accept 一个已连接的 socket,maxfd 可能会变,另外也需要将其加入到读写描述符集合中以让内核监听其读写事件)

可以看到 select 将感兴趣的事件集合存在一个数组里,然后一次性将数组拷贝给了内核,这样 n 次系统调用就转化为了一次,然后用户进程会阻塞在 select 系统调用上,由内核不断循环遍历,如果遍历后发现某些 socket 有事件(accept 或 read/write 准备好了),就会唤醒进程,并且会把数据已就绪的 socket 数量传给用户进程,动图如下

select 图解,图片来自《低并发编程》select 图解,图片来自《低并发编程》

select 的伪代码如下

int listen_fd,conn_fd; //监听套接字和已连接套接字的变量listen_fd = socket() //创建套接字bind(listen_fd)   //绑定套接字listen(listen_fd) //在套接字上进行监听,将套接字转为监听套接字fd_set master_rset;  //被监听的描述符集合,关注描述符上的读事件int max_fd = listen_fd//初始化 master_rset 数组,使用 FD_ZERO 宏设置每个元素为 0 FD_ZERO(&master_rset);//使用 FD_SET 宏设置 master_rset 数组中位置为 listen_fd 的文件描述符为 1,表示需要监听该文件描述符FD_SET(listen_fd,&master_rset);

fd_set working_set;while(1) {

   // 每次都要将 master_set copy 给 working_set,因为 select 返回后 working_set 会被内核修改     working_set = master_set
   /**
    * 调用 select 函数,检测 master_rset 数组保存的文件描述符是否已有读事件就绪,
    * 返回就绪的文件描述符个数,我们只关心读事件,所以其它参数都设置为 null 了
    */   nready = select(max_fd+1, &working_set, NULL, NULL, NULL);

   // 依次检查已连接套接字的文件描述符   for (i = 0; i < max_fd && nready > 0; i++) {
      // 说明 fd = i 的事件已就绪      if (FD_ISSET(i, &working_set)) {
          nready -= 1;
          //调用 FD_ISSET 宏,在 working_set 数组中检测 listen_fd 对应的文件描述符是否就绪         if (FD_ISSET(listen_fd, &working_set)) {
             //如果 listen_fd 已经就绪,表明已有客户端连接;调用 accept 函数建立连接             conn_fd = accept();
             //设置 master_rset 数组中 conn_fd 对应位置的文件描述符为 1,表示需要监听该文件描述符             FD_SET(conn_fd, &master_rset);
             if (conn_fd > max_fd) {
                max_fd = conn_fd;
             }
         } else {
            //有数据可读,进行读数据处理           read(i, xxx)
        }
      }
    }
}

看起来 select 很好,但在生产上用处依然不多,主要是因为 select 有以下劣势:

  1. 每次调用 select,都需要把 fdset 从用户态拷贝到内核态,在高并发下是个巨大的性能开销(可优化为不拷贝)

  2. 调用 select 阻塞后,用户进程虽然没有轮询,但在内核还是通过遍历的方式来检查 fd 的就绪状态(可通过异步 IO 唤醒的方式)

  3. select 只返回已就绪 fd 的数量,用户线程还得再遍历所有的 fd 查看哪些 fd 已准备好了事件(可优化为直接返回给用户进程数据已就绪的 fd 列表)

正在由于 1,2 两个缺点,所以 select 限制了 maxfd 的大小为 1024,表示只能监听 1024 个 fd 的事件,这离 C10k 显然还是有距离的

poll

poll 的机制其实和 select 一样,唯一比较大的区别其实是把 1024 这个限制给放开了,虽然通过放开限制可以使内核监听上万 socket,但由于以上说的两点劣势,它的性能依然不高,所以生产上也不怎么使用

epoll

接下来我们再来介绍下生产上用得最多的 epoll,epoll 其实和 select,poll 这两个系统调用不一样,它本来其实是个内核的数据结构,这个数据结构允许进程监听多个 socket 的 事件,一般我们通过 epoll_create 来创建这个实例

然后我们再调用 epoll_ctl 把 感兴趣的 fd 加入到 epoll 实例中的 interest_list,然后再调用 epoll_wait 即可将控制权交给内核,这样内核就会检测此 interest_list,如果发现 socket 已就绪就会把已就绪的 fd 加入到一个 ready_list(简称 rdlist) 中,然后再唤醒用户进程,之后用户进程只要遍历此 rdlist 即可

为了方便快速对 fd 进行增删改查,必须设计好 interest list 的数据结构,经综合考虑,内核使用了红黑树,而 rdlist 则采用了链表的形式,这样一旦在红黑树上发现了就绪的 socket ,就会把它放到 rdlist 中

epoll 的伪代码如下

int sock_fd,conn_fd; //监听套接字和已连接套接字的变量sock_fd = socket() //创建套接字bind(sock_fd)   //绑定套接字listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字epfd = epoll_create(); //创建epoll实例,//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);//创建epoll_event变量struct epoll_event ee//监听读事件ee.events = EPOLLIN;//监听的文件描述符是刚创建的监听套接字ee.data.fd = sock_fd;//将监听套接字加入到监听列表中    epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); while (1) {
   //等待返回已经就绪的描述符    n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
   //遍历所有就绪的描述符        for (int i = 0; i < n; i++) {
       //如果是监听套接字描述符就绪,表明有一个新客户端连接到来        if (ep_events[i].data.fd == sock_fd) { 
          conn_fd = accept(sock_fd); //调用accept()建立连接          ee.events = EPOLLIN;  
          ee.data.fd = conn_fd;
          //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件                epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 

       } else { //如果是已连接套接字描述符就绪,则可以读数据           ...//读取数据并处理           read(ep_events[i].data.fd, ..)
       }
   }
}

epoll 的动图如下

epoll 图解,图片来自《低并发编程》epoll 图解,图片来自《低并发编程》

可以看到 epoll 很好地解决了 select 的痛点

  1. 「每次调用 select 都把 fd 集合拷贝给内核」优化为「只有第一次调用 epoll_ctl 添加感兴趣的 fd 到内核的 epoll 实例中,之后只要调用 epoll_wait 即可,数据集合不再需要拷贝」

  2. 「用户进程调用 select 阻塞后,内核会通过遍历的方式来同步检查 fd 的就绪状态」优化为「内核使用异步事件通知」

  3. 「select 仅返回已就绪 fd 的数量,用户线程还得再遍历一下所有的 fd 来挨个检查哪个 fd 的事件已就绪了」优化为「内核直接返回已就绪的 fd 集合」

除了以上针对 select 痛点进行的改进之外,epoll 还引入了一种边缘触发(edge trigger,ET)的模式,这种模式也会让 epoll 在高并发下的表现更加优秀,而 select/poll 则只有水平触发模式(level trigger,LT),首先我们来了解一下什么是水平触发和边缘触发

  • 水平触发:只要读缓冲区可读(或可写缓冲区可写),就会一直触发可读(或可写)信号

  • 边缘触发:当套接字的缓冲状态发生变化时才会触发读写信号。 对于读缓冲,有新到达的数据被添加到读缓冲时才触发

对于水平触发而言,只要缓冲区里还有数据,内核就会不停地触发读事件,也就意味着如果收到了大量的数据而应用程序每次只会读取一小部分数据时就会不停地从内核态切换到用户态,浪费大量的内核资源,而对于边缘触发而言,只有在套接字的缓冲状态发生变化(即新收到数据或刚好发出数据)时才会触发读写信号,也就意味着内核只通知唤醒用户进程一次,这在高并发下无疑是更佳选择,当然了也正是由于边缘触发模式下内核只会触发一次的原因,read 要尽可能地将数据全部读走(一般是在一个循环里不断地 read ,直到没有数据),否则一旦没有新的数据进来,缓冲区中剩余的数据就无法读取了

既然 epoll 这么好,那么它的性能到底比 select,poll 强多少呢,关于这一点,我们最好做对其进行做下压测,我们来看下 libevent 框架对 select,poll,Epoll,Kqueue(可以认为是 mac 下的 epoll)的压测数据

640640

可以看到,在 100 活跃连接(所谓活跃连接就是读写比较频繁),每个连接发生 1000 次读写操作的情况下,随着句柄数量的增加,epoll 和 Kqueue 的响应时间几乎不变,而 select 和 poll 的响应时间则是急遽增加,所以 epoll 非常适合应对大量网络连接,少量活跃连接的情况

不过需要注意一下这里的限制条件:epoll 在应对大量网络连接时,只有在活跃连接数较少的情况下性能才表现优异,如果图中 15000 的网络连接都是活跃连接,那么 epoll 和 select 的表现是差不多的,甚至有可能 epoll 还不如 select,为什么会这样呢?

  1. select/poll 的开销主要是因为无论就绪的 fd 有多少,都要遍历一遍全部的 fd 来找到就绪的 fd 再处理,如果活跃连接数很少,那么很多时间都浪费在遍历上了,但如有很多活跃连接,那遍历的开销就可忽略不计

  2. 为什么活跃连接多,epoll 表现反而不佳呢,其实主要是因为在唤醒过程中 epoll 实现较为复杂,比如为了保证就绪队列的写入安全,使用了自旋锁,如下

    static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key){
       int pwake = 0;
       unsigned long flags;
       struct epitem *epi = ep_item_from_wait(wait);
       struct eventpoll *ep = epi->ep;
       int ewake = 0;
    
       /* 获得自旋锁 ep->lock来保护就绪队列
        * 自旋锁ep->lock在 epoll_wait() 调用的 ep_poll() 里被释放
        * /
       spin_lock_irqsave(&ep->lock, flags);
    
       /* If this file is already in the ready list we exit soon */   /* 在这里将就绪事件添加到 rdllist */   if (!ep_is_linked(&epi->rdllink)) {
           list_add_tail(&epi->rdllink, &ep->rdllist);
           ep_pm_stay_awake_rcu(epi);
       }
       ...
    }

    这样的话如果活跃连接多的话,锁的开销就比较大了

IO 多路复用+非阻塞

那么当 IO 多路程复用检查到数据就绪后(select(),poll(),epoll_wait() 返回后),该怎么处理呢,有人说直接 accept,read/write 不就完了,话是没错,但之前我们也说了,这些操作其实默认是阻塞的,我们需要将其改成非阻塞,为什么呢,数据不是已经就绪了吗,说明 accept,read/write 这些操作可以正常获取数据啊?

其实数据就绪只是说在调用 select(),poll(),epoll_wait() 返回时的数据是就绪的,但当你再去调用 accept,read/write 时可能就会变成未就绪了,举个例子:当某个 socket 接收缓冲区有新数据分节到达,然后 select 报告这个 socket 描述符可读,但随后,协议栈检查到这个新分节有错误,然后丢弃这个分节,这时候调用 read 则无数据可读,这样的话就会产生一个严重的后果:由于线程阻塞在了 read 上,便再也不能调用 select 来监听 socket 事件了,所以 IO 多路程复用一定要和非阻塞配合使用,也就是说要把 listenfd 和 connectfd 设置为非阻塞才行

Reactor 模式

经过以上介绍相信大家对 IO 多路复用的原理有了比较深刻的理解,我们知道 IO 多路程复用是用一个进程来管理多个 socket 的, 那么是否还有优化的空间呢,我们以 select 为例来拆解一下 IO 多路复用的流程

主要流程如下:调用 select 来监听连接,读写事件,收到事件后判断是否是监听 socket 上的事件,是的话调用 accept(),否则判断是否是已连接 socket 上的读写事件,是的话调用 read(),write()

单进程/线程 Reactor

目前这样的写法没有问题,不过所有的逻辑都藕合在一起,可扩展性不是很好,我们可以将相近的功能划分到同一个模块(以类的形式)中如下

我们将其分成三个模块,Reactor, Acceptor,Handler,主要工作流程如下

  1. Reactor 对象首先调用 select 来监听 socket 事件,收到事件后会通过 dispatch 分发

  2. 如果是连接建立事件,则由 Acceptor 处理,Acceptor 通过调用 accept 接收连接,并且会创建一个 Handler 来处理后续的读写等事件

  3. 如果不是连接建立事件,则 Reactor 会调用连接对应的 Handler 进行响应,handler 会完成 read,业务处理,write 的完整业务流程

以上这些操作其实和之前的 IO 多路复用一样,所有的的都是由一个进程进行操作的,这里多了一个新名词 Reactor,它指的是对事件的响应,如果来了一个事件就把相应的事件 dispatch 给对应的 acceptor 或 handler 对象来处理,由于整个操作都在一个进程里处理,我们把这种模式称为单 Reactor 模型,单 Reactor 要求对事件的响应要快,比如对数据业务的处理要快,像 Redis 就很适合,因为它的数据都是基于内存操作的(当然像 bigKey 这种异常场景除外)

单 Reactor 多线程模型

如果在单进程 Reactor 模型中,业务处理耗时较长,那么线程就会被阻塞,就无法再处理其它事件了,可能会造成严重的性能问题,而且单进程 Reactor 还有一个劣势,那就是无法充分复用多核的优势,于是人们又提出了 单 Reactor 多线程模型,即把业务处理这一块放到一个线程池中处理

通过这种方式主进程的压力得到了释放,也充分复用了多核优势来提升并发度

但依然有如下两个瓶颈点

  1. 子线程处理好数据后需要将其传给 handler 进行发送处理,这涉及到共享数据的互斥和保护机制

  2. 主进程承担的所有事件的监听和响应,瞬时的高并发可能成为性能瓶颈

多 Reactor 多进程/线程模型

为以解决单 Reactor 多线程模型存在的两个问题,人们又提出了多 Reactor 多进程/线程模型模块,示意图如下

工作原理如下

  1. 主进程主要负责 accept 连接,接收后会将其传给 subReactor,subReactor 将其连接加入连接队列中来监控其事件

  2. 一旦子进程中有新的事件被监听到了,则 subReactor 会将其交给 Handler 进入处理

使用这种方式由于数据的 read,业务处理,write 都在一个线程中处理,所以避免了数据的同步加锁操作,父子进程职责很明确,父进程负责 accept,子进程则负责完成后续业务处理

以上介绍的只是标准的 Reactor 模型,但实际上生产上应用的 Reactor 不一定完全遵照这些标准,可能会有一些变化,比如 Nginx 的 Reactor 虽然也是多 Reactor 多进程模型,但它是一种变体:每个子进程都监听了同一个端口,内核接收到连接已建立的事件后会通过负载均衡的方式将其转给其中一个子进程,然后子进程会将其加入到连接队列中监控其事件,监控到事件后也不会转交给其他线程而是自己处理

总结

随着互联网的发展,server 面对的连接越来越多,传统的网络 IO 模型由于在 connect,accept,read/write 这三步中会有阻塞操作,显然无法满足我们 C10K 要求,于是人们提出了多进程/线程模型,每接收一个连接分配一个进程/线程来负责后续的交互,但创建进程/线程本身需要创建堆栈,复制页表等资源,而且进程/线程的上下文切换涉及到用户态,内核态的切换,会造成严重的性能瓶颈,那么把阻塞操作改成非阻塞呢,这样做进程/线程确实是不会被阻塞了,但也意味着 CPU 会做大量的无用功,承担不必要的高负载

综合考虑人们提出了 IO 多路复用这种先进的理念,即一个线程管理监听多个 socket 的事件,当然了其实将 socket 设置成非阻塞的,然后让一个线程不断地去轮询也是能达到一个线程监听多个 socket 的目的,但这样做需要对每个 socket 调用一个 read 系统调用,所以 IO 多路复用还有另一层更重要的意义:将多个系统调用转成一个系统调用再交给内核去监听事件。

IO 多路复用主要有三个模型,select,poll,epoll,每一个都是对前者的改进:

  1. select 是每次调用时都要把 fd 集合拷贝给内核,而且内核是通过不断遍历这种同步的方式来检查 fd 是否有事件就绪,并且一旦检测到有事件后也只是返回有就绪事件的 fd 的个数,用户线程也需要遍历 fd 集合来查看 fd 是否已就绪,select 限制了 fd 的集合只能有 1024 个

  2. poll 和 select 的原理差不多,只不过是放开了 1024 个 fd 的限制,fd 的个数可以任意设置,这样也就让支持 C10k 成为了可能,但由于它的实现机制与 select 类似,所以也存在和 select 一样的性能瓶颈

  3. 为了彻底解决 select,poll 的性能瓶颈,epoll 出现了,它把需要监听的 fd 传给内核 epoll 实例,epoll 以红黑树的形式来管理这些 fd,这样每次 epoll 只需调用 epoll_wait 即可将控制权转给内核来监听 fd 的事件,避免了无意义的 fd 集合的拷贝,同时由于红黑树的高效,一旦某个 socket 来事件了,可以迅速从红黑树中查找到相应的 socket,然后再唤醒用户进程,此过程是异步的,比起 select 的同步唤醒又是一大进步,此时唤醒用户进程后,内核会把数据已就绪的 fd 放到一个就绪列表里传给用户进程,用户进程只需要遍历此就绪列表即可,比起 select 需要全部遍历也是一大进步,除此之外 epoll 引入了边缘触发让其在高并发下的表现也更加优异,正是由于 epoll 对 select,poll 的这些改进,也让它成为了 IO 多路复用绝对的王者

IO 多路复用是指用一个进程/线程去监听多个 socket 的事件,也即意味着一旦事件就绪了进程需要快速地处理这些事件(不然其他 socket 事件的处理的会阻塞),这种对 Redis 非常合适,因为它是基于内存操作的,处理非常快,但对于其它的开源框架,只有一个进程/线程显然是不太满足业务需要的,比如业务处理,可能是 CPU 密集型的,如果只一个进程/线程的话,可能会阻塞在业务处理上,于是人们基于 IO 多路复用又提出了 Reactor 模型,Reactor 即对事件的反应,然后派发事件给相应的处理器,Reator 模型有多个变种,如单 Reactor,单 Reactor 多线程,多 Reacor 多进程/线程模型,这三种模型各有各的优势,主要是为了充分利用多线程/多核来提升性能或是为了避免瞬时的高并发让主线程崩溃

由此可见,网络 IO 模型经历了传统的网络 IO 模型 ---> IO 多路程复用(select,poll,epoll) --> Reactor 模型这三个阶段,主要是为了满足日益增长的 C10k 甚至 C100K 等超高连接数的要求。


从零开始自己动手写阻塞队列

前言

在我们平时编程的时候一个很重要的工具就是容器,在本篇文章当中主要给大家介绍阻塞队列的原理,并且在了解原理之后自己动手实现一个低配版的阻塞队列。

需求分析

在前面的两篇文章ArrayDeque(JDK双端队列)源码深度剖析深入剖析(JDK)ArrayQueue源码当中我们仔细介绍了队列的原理,如果大家感兴趣可以查看一下!

而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是阻塞队列在并发下情况是安全的。阻塞队列的主要的需求如下:

  • 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。

  • 所有的队列操作都要是并发安全的。

  • 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。

  • 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。

  • 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。

阻塞队列实现原理

线程阻塞和唤醒

在上面我们已经谈到了阻塞队列是并发安全的,而且我们还有将线程唤醒和阻塞的需求,因此我们可以选择可重入锁ReentrantLock保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition进行线程的唤醒和阻塞操作,在Condition当中我们将会使用到的,主要有以下两个函数:

  • signal用于唤醒线程,当一个线程调用Conditionsignal函数的时候就可以唤醒一个被await函数阻塞的线程。

  • await用于阻塞线程,当一个线程调用Conditionawait函数的时候这个线程就会阻塞。

数组循环使用

因为队列是一端进一端出,因此队列肯定有头有尾。

当我们往队列当中加入一些数据之后,队列的情况可能如下:

在上图的基础之上我们在进行四次出队操作,结果如下:

在上面的状态下,我们继续加入8个数据,那么布局情况如下:

我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。

代码实现

成员变量定义

根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:

// 用于保护临界区的锁private final ReentrantLock lock;// 用于唤醒取数据的时候被阻塞的线程private final Condition notEmpty;// 用于唤醒放数据的时候被阻塞的线程private final Condition notFull;// 用于记录从数组当中取数据的位置 也就是队列头部的位置private int takeIndex;// 用于记录从数组当中放数据的位置 也就是队列尾部的位置private int putIndex;// 记录队列当中有多少个数据private int count;// 用于存放具体数据的数组private Object[] items;

构造函数

我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。

@SuppressWarnings("unchecked")public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.notFull = lock.newCondition();// 其实可以不用初始化 类会有默认初始化 默认初始化为0takeIndex = 0;putIndex = 0;count = 0;// 数组的长度肯定不能够小于0if (size <= 0)throw new RuntimeException("size can not be less than 1");items = (E[])new Object[size];}

put函数

这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。

public void put(E x){// put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程// 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作// 因此这里需要上锁lock.lock();try {// 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了// 这个时候需要将线程挂起while (count == items.length)notFull.await(); // 将调用 await的线程挂起// 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了// 这个时候需要将数组入队 // 调用入队函数将数据入队enqueue(x);} catch (InterruptedException e) {e.printStackTrace();} finally {// 解锁lock.unlock();}}// 将数据入队private void enqueue(E x) {this.items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒}

offer函数

offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false,而不是被阻塞。

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 如果数组满了 则直接返回false 而不是被阻塞if (count == items.length)return false;else {// 如果数组没有满则直接入队 并且返回 trueenqueue(e);return true;}} finally {lock.unlock();}}

add函数

这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。

public boolean add(E e) {if (offer(e))return true;elsethrow new RuntimeException("Queue full");}

take函数

这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:

public E take() throws InterruptedException {// 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据// 进行加锁操作lock.lock();try {// 当 count 等于0 说明队列为空// 需要将线程挂起等待while (count == 0)notEmpty.await();// 当被唤醒之后进行出队操作return dequeue();}finally {lock.unlock();}}private E  dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了if (++takeIndex == items.length)takeIndex = 0;count--; // 队列当中数据少一个了// 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程// 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒notFull.signal(); // 唤醒一个被 put 函数阻塞的线程return x;}

重写toString函数

因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString方法得到一个字符串,最后打印这个字符串。

@Overridepublic String toString() {StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("[");// 这里需要上锁 因为我们在打印的时候需要打印所有的数据// 打印所有的数据就需要对数组进行遍历操作 而在进行遍历// 操作的时候是不能进行插入和删除操作的 因为打印的是某// 个时刻的数据lock.lock();try {if (count == 0)stringBuilder.append("]");else {int cur = 0;// 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count// 个数据while (cur != count) {// 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");cur += 1;}// 删除掉最后一次没用的 ", "stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());stringBuilder.append(']');}}finally {lock.unlock();}return stringBuilder.toString();}

完整代码

整个我们自己完成的阻塞队列的代码如下:

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class MyArrayBlockingQueue<E> {// 用于保护临界区的锁private final ReentrantLock lock;// 用于唤醒取数据的时候被阻塞的线程private final Condition notEmpty;// 用于唤醒放数据的时候被阻塞的线程private final Condition notFull;// 用于记录从数组当中取数据的位置 也就是队列头部的位置private int takeIndex;// 用于记录从数组当中放数据的位置 也就是队列尾部的位置private int putIndex;// 记录队列当中有多少个数据private int count;// 用于存放具体数据的数组private Object[] items;@SuppressWarnings("unchecked")public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.notFull = lock.newCondition();// 其实可以不用初始化 类会有默认初始化 默认初始化为0takeIndex = 0;putIndex = 0;count = 0;if (size <= 0)throw new RuntimeException("size can not be less than 1");items = (E[])new Object[size];}public void put(E x){lock.lock();try {while (count == items.length)notFull.await();enqueue(x);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}private void enqueue(E x) {this.items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}private E  dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;notFull.signal();return x;}public boolean add(E e) {if (offer(e))return true;elsethrow new RuntimeException("Queue full");}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}public E take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await();return dequeue();}finally {lock.unlock();}}@Overridepublic String toString() {StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("[");lock.lock();try {if (count == 0)stringBuilder.append("]");else {int cur = 0;while (cur != count) {stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");cur += 1;}stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());stringBuilder.append(']');}}finally {lock.unlock();}return stringBuilder.toString();}}

现在对上面的代码进行测试:

我们现在使用阻塞队列模拟一个生产者消费者模型,设置阻塞队列的大小为5,生产者线程会往队列当中加入数据,数据为0-9的10个数字,消费者线程一共会消费10次。

import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);Thread thread = new Thread(() -> {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i);queue.put(i);}}, "生产者");Thread thread1 = new Thread(() -> {for (int i = 0; i < 10; i++) {try {System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take());System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue);} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者");thread.start();TimeUnit.SECONDS.sleep(3);thread1.start();}}

上面代码的输出如下所示:

生产者 往队列当中加入数据:0生产者 往队列当中加入数据:1生产者 往队列当中加入数据:2生产者 往队列当中加入数据:3生产者 往队列当中加入数据:4生产者 往队列当中加入数据:5消费者 从队列当中取出数据:0生产者 往队列当中加入数据:6消费者 当前队列当中的数据:[1, 2, 3, 4, 5]消费者 从队列当中取出数据:1消费者 当前队列当中的数据:[2, 3, 4, 5]消费者 从队列当中取出数据:2消费者 当前队列当中的数据:[3, 4, 5, 6]生产者 往队列当中加入数据:7消费者 从队列当中取出数据:3消费者 当前队列当中的数据:[4, 5, 6, 7]消费者 从队列当中取出数据:4消费者 当前队列当中的数据:[5, 6, 7]消费者 从队列当中取出数据:5消费者 当前队列当中的数据:[6, 7]生产者 往队列当中加入数据:8消费者 从队列当中取出数据:6消费者 当前队列当中的数据:[7, 8]消费者 从队列当中取出数据:7消费者 当前队列当中的数据:[8]消费者 从队列当中取出数据:8消费者 当前队列当中的数据:[]生产者 往队列当中加入数据:9消费者 从队列当中取出数据:9消费者 当前队列当中的数据:[]

从上面的输出结果我们知道,生产者线程打印5之后被挂起了,因为如果没有被挂起,生产者线程肯定可以一次性输出完成,因为消费者线程阻塞了3秒。但是他没有输出完成说明在打印5之后,因为阻塞队列满了,因而生产者线程被挂起了。然后消费者开始消费,这样阻塞队列当中就有空间了,生产者线程就可以继续生产了。

总结

在本篇文章当中,主要向大家介绍了阻塞队列的原理并且实现了一个低配版的数组阻塞队列,其实如果你了解数组队列和锁的话,这个代码实现起来还是相对比较简单的,我们只需要使用锁去保证我们的程序并发安全即可。

  • 我们在实现put函数的时候,如果当前队列已经满了,则当前线程需要调用await函数进行阻塞,当线程被唤醒或者队列没有满可以继续执行的时候,我们在往队列当中加入数据之后需要调用一次signal函数,因为这样可以唤醒在调用take函数的时候因为队列空而阻塞的线程。

  • 我们实现take函数的时候,如果当前队列已经空了,则当前线程也需要调用await函数进行阻塞,当线程被唤醒或者队列不为空线程可以继续执行,在出队之后需要调用一次signal函数,因为这样可以唤醒在调用put函数的时候因为队列满而阻塞的线程。


以上就是本篇文章的所有内容了,我是LeHung,我们下期再见!!!更多精彩内容合集可访问项目:https://github.com/Chang-LeHung/CSCore


前言

我们在写并发程序的时候,一个非常常见的需求就是保证在某一个时刻只有一个线程执行某段代码,像这种代码叫做临界区,而通常保证一个时刻只有一个线程执行临界区的代码的方法就是锁。在本篇文章当中我们将会仔细分析和学习自旋锁,所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环,这其实就是线程自己“旋”在while循环了,因而这种锁就叫做自旋锁。

自旋锁

原子性

在谈自旋锁之前就不得不谈原子性了。所谓原子性简单说来就是一个一个操作要么不做要么全做,全做的意思就是在操作的过程当中不能够被中断,比如说对变量data进行加一操作,有以下三个步骤:

  • data从内存加载到寄存器。

  • data这个值加一。

  • 将得到的结果写回内存。

原子性就表示一个线程在进行加一操作的时候,不能够被其他线程中断,只有这个线程执行完这三个过程的时候其他线程才能够操作数据data

我们现在用代码体验一下,在Java当中我们可以使用AtomicInteger进行对整型数据的原子操作:

import java.util.concurrent.atomic.AtomicInteger;public class AtomicDemo {public static void main(String[] args) throws InterruptedException {AtomicInteger data = new AtomicInteger();data.set(0); // 将数据初始化位0Thread t1 = new Thread(() -> {for (int i = 0; i < 100000; i++) {data.addAndGet(1); // 对数据 data 进行原子加1操作}});Thread t2 = new Thread(() -> {for (int i = 0; i < 100000; i++) {data.addAndGet(1);// 对数据 data 进行原子加1操作}});// 启动两个线程t1.start();t2.start();// 等待两个线程执行完成t1.join();t2.join();// 打印最终的结果System.out.println(data); // 200000}}

从上面的代码分析可以知道,如果是一般的整型变量如果两个线程同时进行操作的时候,最终的结果是会小于200000。

我们现在来模拟一下一般的整型变量出现问题的过程:

  • 主内存data的初始值等于0,两个线程得到的data初始值都等于0。

  • 现在线程一将data加一,然后线程一将data的值同步回主内存,整个内存的数据变化如下:

  • 现在线程二data加一,然后将data的值同步回主内存(将原来主内存的值覆盖掉了):

我们本来希望data的值在经过上面的变化之后变成2,但是线程二覆盖了我们的值,因此在多线程情况下,会使得我们最终的结果变小。

但是在上面的程序当中我们最终的输出结果是等于20000的,这是因为给data进行+1的操作是原子的不可分的,在操作的过程当中其他线程是不能对data进行操作的。这就是原子性带来的优势。

自己动手写自旋锁

AtomicInteger类

现在我们已经了解了原子性的作用了,我们现在来了解AtomicInteger类的另外一个原子性的操作——compareAndSet,这个操作叫做比较并交换(CAS),他具有原子性。

public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger();atomicInteger.set(0);atomicInteger.compareAndSet(0, 1);}

compareAndSet函数的意义:首先会比较第一个参数(对应上面的代码就是0)和atomicInteger的值,如果相等则进行交换,也就是将atomicInteger的值设置为第二个参数(对应上面的代码就是1),如果这些操作成功,那么compareAndSet函数就返回true,如果操作失败则返回false,操作失败可能是因为第一个参数的值(期望值)和atomicInteger不相等,如果相等也可能因为在更改atomicInteger的值的时候失败(因为可能有多个线程在操作,因为原子性的存在,只能有一个线程操作成功)。

自旋锁实现原理

我们可以使用AtomicInteger类实现自旋锁,我们可以用0这个值表示未上锁,1这个值表示已经上锁了。

  • AtomicInteger类的初始值为0。

  • 在上锁时,我们可以使用代码atomicInteger.compareAndSet(0, 1)进行实现,我们在前面已经提到了只能够有一个线程完成这个操作,也就是说只能有一个线程调用这行代码然后返回true其余线程都返回false,这些返回false的线程不能够进入临界区,因此我们需要这些线程停在atomicInteger.compareAndSet(0, 1)这行代码不能够往下执行,我们可以使用while循环让这些线程一直停在这里while (!value.compareAndSet(0, 1));,只有返回true的线程才能够跳出循环,其余线程都会一直在这里循环,我们称这种行为叫做自旋,这种锁因而也被叫做自旋锁

  • 线程在出临界区的时候需要重新将锁的状态调整为未上锁的上状态,我们使用代码value.compareAndSet(1, 0);就可以实现,将锁的状态还原为未上锁的状态,这样其他的自旋的线程就可以拿到锁,然后进入临界区了。

自旋锁代码实现

import java.util.concurrent.atomic.AtomicInteger;public class SpinLock {// 0 表示未上锁状态// 1 表示上锁状态protected AtomicInteger value;public SpinLock() {this.value = new AtomicInteger();// 设置 value 的初始值为0 表示未上锁的状态this.value.set(0);}public void lock() {// 进行自旋操作while (!value.compareAndSet(0, 1));}public void unlock() {// 将锁的状态设置为未上锁状态value.compareAndSet(1, 0);}}

上面就是我们自己实现的自旋锁的代码,这看起来实在太简单了,但是它确实帮助我们实现了一个锁,而且能够在真实场景进行使用的,我们现在用代码对上面我们写的锁进行测试。

测试程序:

public class SpinLockTest {public static int data;public static SpinLock lock = new SpinLock();public static void add() {for (int i = 0; i < 100000; i++) {// 上锁 只能有一个线程执行 data++ 操作 其余线程都只能进行while循环lock.lock();data++;lock.unlock();}}public static void main(String[] args) throws InterruptedException {Thread[] threads = new Thread[100];// 设置100个线程for (int i = 0; i < 100; i ++) {threads[i] = new Thread(SpinLockTest::add);}// 启动一百个线程for (int i = 0; i < 100; i++) {threads[i].start();}// 等待这100个线程执行完成for (int i = 0; i < 100; i++) {threads[i].join();}System.out.println(data); // 10000000}}

在上面的代码单中,我们使用100个线程,然后每个线程循环执行100000data++操作,上面的代码最后输出的结果是10000000,和我们期待的结果是相等的,这就说明我们实现的自旋锁是正确的。

自己动手写可重入自旋锁

可重入自旋锁

在上面实现的自旋锁当中已经可以满足一些我们的基本需求了,就是一个时刻只能够有一个线程执行临界区的代码。但是上面的的代码并不能够满足重入的需求,也就是说上面写的自旋锁并不是一个可重入的自旋锁,事实上在上面实现的自旋锁当中重入的话就会产生死锁。

我们通过一份代码来模拟上面重入产生死锁的情况:

public static void add(int state) throws InterruptedException {TimeUnit.SECONDS.sleep(1);if (state <= 3) {lock.lock();System.out.println(Thread.currentThread().getName() + "\t进入临界区 state = " + state);for (int i = 0; i < 10; i++)data++;add(state + 1); // 进行递归重入 重入之前锁状态已经是1了 因为这个线程进入了临界区lock.unlock();}}
  • 在上面的代码当中加入我们传入的参数state的值为1,那么在线程执行for循环之后再次递归调用add函数的话,那么state的值就变成了2。

  • if条件仍然满足,这个线程也需要重新获得锁,但是此时锁的状态是1,这个线程已经获得过一次锁了,但是自旋锁期待的锁的状态是0,因为只有这样他才能够再次获得锁,进入临界区,但是现在锁的状态是1,也就是说虽然这个线程获得过一次锁,但是它也会一直进行while循环而且永远都出不来了,这样就形成了死锁了。

可重入自旋锁思想

针对上面这种情况我们需要实现一个可重入的自旋锁,我们的思想大致如下:

  • 在我们实现的自旋锁当中,我们可以增加两个变量,owner一个用于存当前拥有锁的线程,count一个记录当前线程进入锁的次数。

  • 如果线程获得锁,owner = Thread.currentThread()并且count = 1

  • 当线程下次再想获取锁的时候,首先先看owner是不是指向自己,则一直进行循环操作,如果是则直接进行count++操作,然后就可以进入临界区了。

  • 我们在出临界区的时候,如果count大于一的话,说明这个线程重入了这把锁,因此不能够直接将锁设置为0也就是未上锁的状态,这种情况直接进行count--操作,如果count等于1的话,说明线程当前的状态不是重入状态(可能是重入之后递归返回了),因此在出临界区之前需要将锁的状态设置为0,也就是没上锁的状态,好让其他线程能够获取锁。

可重入锁代码实现:

实现的可重入锁代码如下:

public class ReentrantSpinLock extends SpinLock {private Thread owner;private int count;@Overridepublic void lock() {if (owner == null || owner != Thread.currentThread()) {while (!value.compareAndSet(0, 1));owner = Thread.currentThread();count = 1;}else {count++;}}@Overridepublic void unlock() {if (count == 1) {count = 0;value.compareAndSet(1, 0);}elsecount--;}}

下面我们通过一个递归程序去验证我们写的可重入的自旋锁是否能够成功工作。

测试程序:

import java.util.concurrent.TimeUnit;public class ReentrantSpinLockTest {public static int data;public static ReentrantSpinLock lock = new ReentrantSpinLock();public static void add(int state) throws InterruptedException {TimeUnit.SECONDS.sleep(1);if (state <= 3) {lock.lock();System.out.println(Thread.currentThread().getName() + "\t进入临界区 state = " + state);for (int i = 0; i < 10; i++)data++;add(state + 1);lock.unlock();}}public static void main(String[] args) throws InterruptedException {Thread[] threads = new Thread[10];for (int i = 0; i < 10; i++) {threads[i] = new Thread(new Thread(() -> {try {ReentrantSpinLockTest.add(1);} catch (InterruptedException e) {e.printStackTrace();}}, String.valueOf(i)));}for (int i = 0; i < 10; i++) {threads[i].start();}for (int i = 0; i < 10; i++) {threads[i].join();}System.out.println(data);}}

上面程序的输出:

Thread-3	进入临界区 state = 1Thread-3	进入临界区 state = 2Thread-3	进入临界区 state = 3Thread-0	进入临界区 state = 1Thread-0	进入临界区 state = 2Thread-0	进入临界区 state = 3Thread-9	进入临界区 state = 1Thread-9	进入临界区 state = 2Thread-9	进入临界区 state = 3Thread-4	进入临界区 state = 1Thread-4	进入临界区 state = 2Thread-4	进入临界区 state = 3Thread-7	进入临界区 state = 1Thread-7	进入临界区 state = 2Thread-7	进入临界区 state = 3Thread-8	进入临界区 state = 1Thread-8	进入临界区 state = 2Thread-8	进入临界区 state = 3Thread-5	进入临界区 state = 1Thread-5	进入临界区 state = 2Thread-5	进入临界区 state = 3Thread-2	进入临界区 state = 1Thread-2	进入临界区 state = 2Thread-2	进入临界区 state = 3Thread-6	进入临界区 state = 1Thread-6	进入临界区 state = 2Thread-6	进入临界区 state = 3Thread-1	进入临界区 state = 1Thread-1	进入临界区 state = 2Thread-1	进入临界区 state = 3300

从上面的输出结果我们就可以知道,当一个线程能够获取锁的时候他能够进行重入,而且最终输出的结果也是正确的,因此验证了我们写了可重入自旋锁是有效的!

总结

在本篇文章当中主要给大家介绍了自旋锁和可重入自旋锁的原理,并且实现了一遍,其实代码还是比较简单关键需要大家将这其中的逻辑理清楚:

  • 所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环。

  • 可重入的含义就是一个线程已经竞争到了一个锁,在竞争到这个锁之后又一次有重入临界区代码的需求,如果能够保证这个线程能够重新进入临界区,这就叫可重入。

  • 我们在实现自旋锁的时候使用的是AtomicInteger类,并且我们使用0和1这两个数值用于表示无锁和锁被占用两个状态,在获取锁的时候使用while循环不断进行CAS操作,直到操作成功返回true,在释放锁的时候使用CAS将锁的状态从1变成0。

  • 实现可重入锁最重要的一点就是需要记录是那个线程获得了锁,同时还需要记录获取了几次锁,因为我们在解锁的时候需要进行判断,之后count = 1的情况才能将锁的状态从1设置成0。


以上就是本篇文章的所有内容了,我是LeHung,我们下期再见!!!更多精彩内容合集可访问项目:https://github.com/Chang-LeHung/CSCore