2022年8月

今天我们聊一下高并发下的网络 IO 模型

高并发即我们所说的 C10K(一个 server 服务 1w 个 client),C10M,写出高并发的程序相信是每个后端程序员的追求,高并发架构其实有一些很通用的架构设计,如无锁化,缓存等,今天我们主要研究下高并发下的网络 IO 模型设计,我们知道不管是 Nginx,还是 Redis,Kafka,RocketMQ 等中间件,都能轻松支持非常高的 QPS,其实它们背后的网络 IO 模型设计理念都是一致的,所以了解这一块对我们了解设计出高并发的网络 IO 框架具体重要意义,本文将会从以下几个方面来循序渐近地向大家介绍如何设计出一个高并发的网络 IO 框架

  • 传统网络 IO 模型的缺陷

  • 针对传统网络 IO 模型缺陷的改进

  • 多线程/多进程

  • 阻塞改为非阻塞

  • IO 多路复用

  • Reactor 的几种模型介绍

传统网络 IO 模型的缺陷

我们首先来看下传统网络 IO 模型有哪些缺陷,主要看它们的阻塞点有哪些。我们用一张图来看下客户端和服务端的基于 TCP 的通信流程

服务端的伪代码如下

listenSocket = socket(); //调用socket系统调用创建一个主动套接字bind(listenSocket);  //绑定地址和端口listen(listenSocket); //将默认的主动套接字转换为服务器使用的被动套接字,也就是监听套接字while (1) { //循环监听是否有客户端连接请求到来   connSocket = accept(listenSocket); //接受客户端连接   recv(connsocket); //从客户端读取数据,只能同时处理一个客户端   send(connsocket); //给客户端返回数据,只能同时处理一个客户端}

可以看到,主要的通信流程如下

  1. server 创建监听 socket 后,执行 bind() 绑定 IP 和端口,然后调用 listen() 监听,代表 server 已经准备好接收请求了,listen 的主要作用其实是初始化半连接和全连接队列大小

  2. server 准备好后,client 也创建 socket ,然后执行 connect 向 server 发起连接请求,这一步会被阻塞,需要等待三次握手完成,第一次握手完成,服务端会创建 socket(这个 socket 是连接 socket,注意不要和第一步的监听 socket 搞混了),将其放入半连接队列中,第三次握手完成,系统会把 socket 从半连接队列摘下放入全连接队列中,然后 accept 会将其从全连接队列中摘下,之后此 socket 就可以与客户端 socket 正常通信了,默认情况下如果全连接队列里没有 socket,则 accept 会阻塞等待三次握手完成

经过三次握手后 client 和 server 就可以基于 socket 进行正常的进程通信了(即调用 write 发送写请求,调用 read 执行读请求),但需要注意的是 read,write 也很可能会被阻塞,需要满足一定的条件读写才会成功返回,在 LInux 中一切皆文件,socket 也不例外,每个打开的文件都有读写缓冲区,如下图所示

对文件执行 read(),write() 的具体流程如下

  1. 当执行 read() 时,会从内核读缓冲区中读取数据,如果缓冲区中没有数据,则会阻塞等待,等数据到达后,会通过 DMA 拷贝将数据拷贝到内核读缓冲区中,然后会唤醒用户线程将数据从内核读缓冲区拷贝到应用缓冲区中

  2. 当执行 write() 时,会将数据从应用缓冲区拷贝到内核写缓冲区,然后再通过 DMA 拷贝将数据从写缓冲区发送到设备上传输出去,如果写缓冲区满,则 write 会阻塞等待写缓冲区可写

经过以上分析,我们可以看到传统的 socket 通信会阻塞在 connect,accept,read/write 这几个操作上,这样的话如果 server 是单进程/线程的话,只要 server 阻塞,就不能再接收其他 client 的处理了,由此可知传统的 socket 无法支持 C10K

针对传统网络 IO 模型缺陷的改进

接下来我们来看看针对传统 IO 模型缺陷的改进,主要有两种

  1. 多进程/线程模型

  2. IO 多路程复用

多进程/线程模型

如果 server 是单进程,阻塞显然会导致 server 无法再处理其他 client 请求了,那我们试试把 server 改成多进程的?只要父进程 accept 了 socket ,就 fork 一个子进程,把这个 socket 交给子进程处理,这样就算子进程阻塞了,也不影响父进程继续监听和其他子进程处理连接

程序伪代码如下

while(1) {
  connfd = accept(listenfd);  // 阻塞建立连接  // fork 创建一个新进程  if (fork() == 0) {
    // accept 后子进程开始工作    doWork(connfd);
  }
}void doWork(connfd) {
  int n = read(connfd, buf);  // 阻塞读数据  doSomeThing(buf);  // 利用读到的数据做些什么  close(connfd);     // 关闭连接,循环等待下一个连接}

通过这种方式确实解决了单进程 server 阻塞无法处理其他 client 请求的问题,但众所周知 fork 创建子进程是非常耗时的,包括页表的复制,进程切换时页表的切换等都非常耗时,每来一个请求就创建一个进程显然是无法接受的

为了节省进程创建的开销,于是有人提出把多进程改成多线程,创建线程(使用 pthread_create)的开销确实小了很多,但同样的,线程与进程一样,都需要占用堆栈等资源,而且碰到阻塞,唤醒等都涉及到用户态,内核态的切换,这些都极大地消耗了性能

由此可知采用多进程/线程的方式并不可取

画外音: 在 Linux 下进程和线程都是用统一的 task_struct 表示,区别不大,所以下文描述不管是进程还是线程区别都不大

阻塞改为非阻塞

既然多进程/多线程的方式并不可取,那能否将进程的阻塞操作(connect,accept,read/write)改为非阻塞呢,这样只要调用这些操作,如果相应的事件未准备好,就立马返回 EWOULDBLOCK 或 EAGAIN 错误,此时进程就不会被阻塞了,使用 fcntl 可以可以将 socket 设置为非阻塞,以 read 为例伪代码如下

connfd = accept(listenfd);
fcntl(connfd, F_SETFL, O_NONBLOCK);// 此时 connfd 变为非阻塞,如果数据未就绪,read 会立即返回int n = read(connfd, buffer) != SUCCESS;

read 的非阻塞操作流程图如下

非阻塞read非阻塞read

这样的话调用 read 就不会阻塞等待而会马上返回了,也就实现了非阻塞的效果,不过需要注意的,我们这里说的非阻塞并非严格意义上的非阻塞,这里的非阻塞只是针对网卡数据拷贝到内核缓冲区这一段,如果数据就绪后,再执行 read 此时依然是阻塞的,此时用户进程会占用 CPU 去把数据从内核缓冲区拷贝到用户缓冲区中,可以看到这种模式是同步非阻塞的,这里我们简单解释下阻塞/非阻塞,同步/非同步的概念

  • 阻塞/非阻塞指的是在数据从网卡拷贝到内核缓冲区期间,进程能不能动,如果能动,就是非阻塞的,不能动就是阻塞的

  • 同步/非同步指的是数据就绪后是否需要用户进程亲自调用 read 来搬运数据(将数据从内核空间拷贝到用户空间),如果需要,则是同步,如果不需要则是非同步(即异步),异步 I/O 示意图如下:

    异步 IO异步 IO

异步 IO 执行流程如下:进程发起 I/O 请求后,让内核在整个操作处理完后再通知进程,这整个操作包括网卡拷贝数据到内核缓冲区,将数据从内核缓冲区拷贝到用户缓冲区这两个阶段,内核在处理数据期间(从无数据到拷贝完成),应用进程是可以继续执行其他逻辑的,异步编程需要操作系统支持,目前只有 windows 完美支持,Linux 暂不支持。可以看出异步 I/O 才是真正意义上的非阻塞操作,因为数据从内核缓冲区拷贝到用户缓冲区这一步不需要用户进程来操作,而是由内核代劳了

我们以一个案例来总结下阻塞/非阻塞,同步/异步:当你去餐馆点餐时,如果在厨师做菜期间,你啥也不能干,那就是阻塞,如果在此期间你可以玩手机,喝喝茶,能动,那就是非阻塞,如果厨师做好了菜,你需要亲自去拿,那就是同步,如果厨师做好了,菜由服务员直接送到你的餐桌,那就是非同步(异步)

现在回过头来看将阻塞转成非阻塞是否满足了我们的需求呢?看起来进程确实可以动了,但进程需要不断地以轮询数据的形式调用 accept,read/write 这此操作来询问内核数据是否就绪了,这些都是系统调用,对性能的消耗很大,而且会持续占用 CPU,导致 CPU 负载很高,远不如等数据就绪好了再通知进程去取更高效。这就好比,厨师做菜期间,你不断地去问菜做好了没有,显然没有意义,更高效的方式无疑是等厨师菜做好了主动通知你去取

IO 多路复用

经过前面的分析我们可以得出两个结论

  1. 使用多进程/多线程 IO 模型是不可行的,这一步可以优化为单线程

  2. 应该等数据就绪好了之后再通知用户进程去读取数据,而不是做毫无意义的轮询,注意这里的数据就绪不光是指前文所述的 read 的数据已就绪,而是泛指 accept,read/write 这三个事件的数据都已就绪

于是 IO 多路复用模型诞生了,它是指用一个进程来监听 listen socket(监听 socket) 的连接建立事件,connect socket(已连接 socket) 的读写事件(读写),一旦数据就绪,内核就会唤醒用户进程去处理这些 socket 的相应的事件

IO多路复用IO多路复用

这里简单介绍一下 fd(文件描述符),以便大家更好地了解之后 IO 多路复用中出现的 fd 集合等概念

文件系统简介

我们知道在 Linux 中无论是文件,socket,还是管道,设备等,一切皆文件,Linux 抽象出了一个 VFS(virtual file system) 层,屏蔽了所有的具体的文件,VFS 提供了统一的接口给上层调用,这样应用层只与 VFS 打交道,极大地方便了用户的开发,仔细对比你会发现,这和 Java 中的面向接口编程很类似

通过 open(),socket() 创建文件后,都有一个 fd(文件描述符) 与之对应,对于每一个进程,都有有一个文件描述符列表(File Discriptor Table) 来记录其打开的文件,这个列表的每一项都指向其背后的具体文件,而每一项对应的数组下标就是 fd,对于用户而言,可以认为 fd 代表其背后指向的文件

fd 的值从 0 开始,其中 0,1,2 是固定的,分别指向标准输入(指向键盘),标准输出/标准错误(指向显示器),之后每打开一个文件,fd 都会从 3 开始递增,但需要注意的是 fd 并不一定都是递增的,如果关闭了文件,之前的 fd 是可以被回收利用的

IO 多路复用其实也就是用一个进程来监听多个 fd 的数据事件,用户可以把自己感兴趣的 fd 及对应感兴趣的事件(accept,read/write)传给内核,然后内核就会检测 fd ,一旦某个 socket 有事件了,内核可以唤醒用户进程来处理

那么怎样才能知道某个 fd 是否有事件呢,一种很容易想到的做法是搞个轮询,每次调用一下 read(fd),让内核告知是否数据已就绪,但是这样的话如果有 n 个感兴趣的 fd 就会有 n 次 read 系统调用,开销很大,显然不可接受

所以使用 IO 多路复用监听 fd 的事件可行,但必须解决以下三个涉及到性能瓶颈的点

  1. 如何高效将用户感兴趣的 fd 和事件传给内核

  2. 某个 socket 数据就绪后,内核如何高效通知用户进程进行处理

  3. 用户进程如何高效处理事件

前面两步的处理目前有 select,poll,epoll 三种 IO 多路事件模型,我们一起来看看,看完你就会知道为啥 epoll 的性能是如此高效了

select

我们先来看下 select 函数的定义

返回:若有就绪描述符则为其数目,若超时则为 0,若出错则为-1int select(int maxfd, 
                     fd_set *readset, 
                     fd_set *writeset, 
                     fd_set *exceptset, 
                     const struct timeval *timeout);

maxfd 是待测试的描述符基数,为待测试的最大描述符加 1,readset,writeset,exceptset 分别为读描述符集合,写描述符集合,异常描述符集合,这三个分别通知内核,在哪些描述描述符上检测数据可以读,可写,有异常事件发生,timeout 可以设置阻塞时间,如果为 null 代表一直阻塞

这里需要说明一下,为啥 maxfd 为待测试的描述符加 1 呢,主要是因为数组的下标是从 0 开始的,假设进程新建了一个 listenfd,它的 fd 为 3,那么代表它有 4 个 感兴趣的 fd(每个进程有固定的 fd = 0,1,2 这三个描述符),由此可知 maxfd = 3 + 1 = 4

接下来我们来看看读,写,异常集合是怎么回事,如何设置针对 fd 的感兴趣事件呢,其实事件集合是采用了一种位结构(bitset)的方式,比如现在假设我们对标准输入(fd = 0),listenfd(fd = 3)的读事件感兴趣,那么就可以在 readset 对应的位上置 1

画外音: 使用 FD_SET 可将相应位置置1,如 FD_SET(listenfd, &readset)

如下

即 readset 为 {1, 0,0, 1},在调用 select 后会将 readset 传给内核,如果内核发现 listenfd 有连接已就绪的事件,则内核也会在将相应位置置 1(其他无就绪事件的 fd 对应的位置为 0)然后会回传给用户线程,此时的 readset 如下,即 {1,0,0,0}

于是进程就可以根据 readset 相应位置是否是 1(用 FD_ISSET(i, &read_set) 来判断)来判断读事件是否就绪了

需要注意的是由于 accept 的 socket 会越来越多,maxfd 和事件 set 都需要及时调整(比如新 accept 一个已连接的 socket,maxfd 可能会变,另外也需要将其加入到读写描述符集合中以让内核监听其读写事件)

可以看到 select 将感兴趣的事件集合存在一个数组里,然后一次性将数组拷贝给了内核,这样 n 次系统调用就转化为了一次,然后用户进程会阻塞在 select 系统调用上,由内核不断循环遍历,如果遍历后发现某些 socket 有事件(accept 或 read/write 准备好了),就会唤醒进程,并且会把数据已就绪的 socket 数量传给用户进程,动图如下

select 图解,图片来自《低并发编程》select 图解,图片来自《低并发编程》

select 的伪代码如下

int listen_fd,conn_fd; //监听套接字和已连接套接字的变量listen_fd = socket() //创建套接字bind(listen_fd)   //绑定套接字listen(listen_fd) //在套接字上进行监听,将套接字转为监听套接字fd_set master_rset;  //被监听的描述符集合,关注描述符上的读事件int max_fd = listen_fd//初始化 master_rset 数组,使用 FD_ZERO 宏设置每个元素为 0 FD_ZERO(&master_rset);//使用 FD_SET 宏设置 master_rset 数组中位置为 listen_fd 的文件描述符为 1,表示需要监听该文件描述符FD_SET(listen_fd,&master_rset);

fd_set working_set;while(1) {

   // 每次都要将 master_set copy 给 working_set,因为 select 返回后 working_set 会被内核修改     working_set = master_set
   /**
    * 调用 select 函数,检测 master_rset 数组保存的文件描述符是否已有读事件就绪,
    * 返回就绪的文件描述符个数,我们只关心读事件,所以其它参数都设置为 null 了
    */   nready = select(max_fd+1, &working_set, NULL, NULL, NULL);

   // 依次检查已连接套接字的文件描述符   for (i = 0; i < max_fd && nready > 0; i++) {
      // 说明 fd = i 的事件已就绪      if (FD_ISSET(i, &working_set)) {
          nready -= 1;
          //调用 FD_ISSET 宏,在 working_set 数组中检测 listen_fd 对应的文件描述符是否就绪         if (FD_ISSET(listen_fd, &working_set)) {
             //如果 listen_fd 已经就绪,表明已有客户端连接;调用 accept 函数建立连接             conn_fd = accept();
             //设置 master_rset 数组中 conn_fd 对应位置的文件描述符为 1,表示需要监听该文件描述符             FD_SET(conn_fd, &master_rset);
             if (conn_fd > max_fd) {
                max_fd = conn_fd;
             }
         } else {
            //有数据可读,进行读数据处理           read(i, xxx)
        }
      }
    }
}

看起来 select 很好,但在生产上用处依然不多,主要是因为 select 有以下劣势:

  1. 每次调用 select,都需要把 fdset 从用户态拷贝到内核态,在高并发下是个巨大的性能开销(可优化为不拷贝)

  2. 调用 select 阻塞后,用户进程虽然没有轮询,但在内核还是通过遍历的方式来检查 fd 的就绪状态(可通过异步 IO 唤醒的方式)

  3. select 只返回已就绪 fd 的数量,用户线程还得再遍历所有的 fd 查看哪些 fd 已准备好了事件(可优化为直接返回给用户进程数据已就绪的 fd 列表)

正在由于 1,2 两个缺点,所以 select 限制了 maxfd 的大小为 1024,表示只能监听 1024 个 fd 的事件,这离 C10k 显然还是有距离的

poll

poll 的机制其实和 select 一样,唯一比较大的区别其实是把 1024 这个限制给放开了,虽然通过放开限制可以使内核监听上万 socket,但由于以上说的两点劣势,它的性能依然不高,所以生产上也不怎么使用

epoll

接下来我们再来介绍下生产上用得最多的 epoll,epoll 其实和 select,poll 这两个系统调用不一样,它本来其实是个内核的数据结构,这个数据结构允许进程监听多个 socket 的 事件,一般我们通过 epoll_create 来创建这个实例

然后我们再调用 epoll_ctl 把 感兴趣的 fd 加入到 epoll 实例中的 interest_list,然后再调用 epoll_wait 即可将控制权交给内核,这样内核就会检测此 interest_list,如果发现 socket 已就绪就会把已就绪的 fd 加入到一个 ready_list(简称 rdlist) 中,然后再唤醒用户进程,之后用户进程只要遍历此 rdlist 即可

为了方便快速对 fd 进行增删改查,必须设计好 interest list 的数据结构,经综合考虑,内核使用了红黑树,而 rdlist 则采用了链表的形式,这样一旦在红黑树上发现了就绪的 socket ,就会把它放到 rdlist 中

epoll 的伪代码如下

int sock_fd,conn_fd; //监听套接字和已连接套接字的变量sock_fd = socket() //创建套接字bind(sock_fd)   //绑定套接字listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字epfd = epoll_create(); //创建epoll实例,//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);//创建epoll_event变量struct epoll_event ee//监听读事件ee.events = EPOLLIN;//监听的文件描述符是刚创建的监听套接字ee.data.fd = sock_fd;//将监听套接字加入到监听列表中    epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); while (1) {
   //等待返回已经就绪的描述符    n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
   //遍历所有就绪的描述符        for (int i = 0; i < n; i++) {
       //如果是监听套接字描述符就绪,表明有一个新客户端连接到来        if (ep_events[i].data.fd == sock_fd) { 
          conn_fd = accept(sock_fd); //调用accept()建立连接          ee.events = EPOLLIN;  
          ee.data.fd = conn_fd;
          //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件                epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 

       } else { //如果是已连接套接字描述符就绪,则可以读数据           ...//读取数据并处理           read(ep_events[i].data.fd, ..)
       }
   }
}

epoll 的动图如下

epoll 图解,图片来自《低并发编程》epoll 图解,图片来自《低并发编程》

可以看到 epoll 很好地解决了 select 的痛点

  1. 「每次调用 select 都把 fd 集合拷贝给内核」优化为「只有第一次调用 epoll_ctl 添加感兴趣的 fd 到内核的 epoll 实例中,之后只要调用 epoll_wait 即可,数据集合不再需要拷贝」

  2. 「用户进程调用 select 阻塞后,内核会通过遍历的方式来同步检查 fd 的就绪状态」优化为「内核使用异步事件通知」

  3. 「select 仅返回已就绪 fd 的数量,用户线程还得再遍历一下所有的 fd 来挨个检查哪个 fd 的事件已就绪了」优化为「内核直接返回已就绪的 fd 集合」

除了以上针对 select 痛点进行的改进之外,epoll 还引入了一种边缘触发(edge trigger,ET)的模式,这种模式也会让 epoll 在高并发下的表现更加优秀,而 select/poll 则只有水平触发模式(level trigger,LT),首先我们来了解一下什么是水平触发和边缘触发

  • 水平触发:只要读缓冲区可读(或可写缓冲区可写),就会一直触发可读(或可写)信号

  • 边缘触发:当套接字的缓冲状态发生变化时才会触发读写信号。 对于读缓冲,有新到达的数据被添加到读缓冲时才触发

对于水平触发而言,只要缓冲区里还有数据,内核就会不停地触发读事件,也就意味着如果收到了大量的数据而应用程序每次只会读取一小部分数据时就会不停地从内核态切换到用户态,浪费大量的内核资源,而对于边缘触发而言,只有在套接字的缓冲状态发生变化(即新收到数据或刚好发出数据)时才会触发读写信号,也就意味着内核只通知唤醒用户进程一次,这在高并发下无疑是更佳选择,当然了也正是由于边缘触发模式下内核只会触发一次的原因,read 要尽可能地将数据全部读走(一般是在一个循环里不断地 read ,直到没有数据),否则一旦没有新的数据进来,缓冲区中剩余的数据就无法读取了

既然 epoll 这么好,那么它的性能到底比 select,poll 强多少呢,关于这一点,我们最好做对其进行做下压测,我们来看下 libevent 框架对 select,poll,Epoll,Kqueue(可以认为是 mac 下的 epoll)的压测数据

640640

可以看到,在 100 活跃连接(所谓活跃连接就是读写比较频繁),每个连接发生 1000 次读写操作的情况下,随着句柄数量的增加,epoll 和 Kqueue 的响应时间几乎不变,而 select 和 poll 的响应时间则是急遽增加,所以 epoll 非常适合应对大量网络连接,少量活跃连接的情况

不过需要注意一下这里的限制条件:epoll 在应对大量网络连接时,只有在活跃连接数较少的情况下性能才表现优异,如果图中 15000 的网络连接都是活跃连接,那么 epoll 和 select 的表现是差不多的,甚至有可能 epoll 还不如 select,为什么会这样呢?

  1. select/poll 的开销主要是因为无论就绪的 fd 有多少,都要遍历一遍全部的 fd 来找到就绪的 fd 再处理,如果活跃连接数很少,那么很多时间都浪费在遍历上了,但如有很多活跃连接,那遍历的开销就可忽略不计

  2. 为什么活跃连接多,epoll 表现反而不佳呢,其实主要是因为在唤醒过程中 epoll 实现较为复杂,比如为了保证就绪队列的写入安全,使用了自旋锁,如下

    static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key){
       int pwake = 0;
       unsigned long flags;
       struct epitem *epi = ep_item_from_wait(wait);
       struct eventpoll *ep = epi->ep;
       int ewake = 0;
    
       /* 获得自旋锁 ep->lock来保护就绪队列
        * 自旋锁ep->lock在 epoll_wait() 调用的 ep_poll() 里被释放
        * /
       spin_lock_irqsave(&ep->lock, flags);
    
       /* If this file is already in the ready list we exit soon */   /* 在这里将就绪事件添加到 rdllist */   if (!ep_is_linked(&epi->rdllink)) {
           list_add_tail(&epi->rdllink, &ep->rdllist);
           ep_pm_stay_awake_rcu(epi);
       }
       ...
    }

    这样的话如果活跃连接多的话,锁的开销就比较大了

IO 多路复用+非阻塞

那么当 IO 多路程复用检查到数据就绪后(select(),poll(),epoll_wait() 返回后),该怎么处理呢,有人说直接 accept,read/write 不就完了,话是没错,但之前我们也说了,这些操作其实默认是阻塞的,我们需要将其改成非阻塞,为什么呢,数据不是已经就绪了吗,说明 accept,read/write 这些操作可以正常获取数据啊?

其实数据就绪只是说在调用 select(),poll(),epoll_wait() 返回时的数据是就绪的,但当你再去调用 accept,read/write 时可能就会变成未就绪了,举个例子:当某个 socket 接收缓冲区有新数据分节到达,然后 select 报告这个 socket 描述符可读,但随后,协议栈检查到这个新分节有错误,然后丢弃这个分节,这时候调用 read 则无数据可读,这样的话就会产生一个严重的后果:由于线程阻塞在了 read 上,便再也不能调用 select 来监听 socket 事件了,所以 IO 多路程复用一定要和非阻塞配合使用,也就是说要把 listenfd 和 connectfd 设置为非阻塞才行

Reactor 模式

经过以上介绍相信大家对 IO 多路复用的原理有了比较深刻的理解,我们知道 IO 多路程复用是用一个进程来管理多个 socket 的, 那么是否还有优化的空间呢,我们以 select 为例来拆解一下 IO 多路复用的流程

主要流程如下:调用 select 来监听连接,读写事件,收到事件后判断是否是监听 socket 上的事件,是的话调用 accept(),否则判断是否是已连接 socket 上的读写事件,是的话调用 read(),write()

单进程/线程 Reactor

目前这样的写法没有问题,不过所有的逻辑都藕合在一起,可扩展性不是很好,我们可以将相近的功能划分到同一个模块(以类的形式)中如下

我们将其分成三个模块,Reactor, Acceptor,Handler,主要工作流程如下

  1. Reactor 对象首先调用 select 来监听 socket 事件,收到事件后会通过 dispatch 分发

  2. 如果是连接建立事件,则由 Acceptor 处理,Acceptor 通过调用 accept 接收连接,并且会创建一个 Handler 来处理后续的读写等事件

  3. 如果不是连接建立事件,则 Reactor 会调用连接对应的 Handler 进行响应,handler 会完成 read,业务处理,write 的完整业务流程

以上这些操作其实和之前的 IO 多路复用一样,所有的的都是由一个进程进行操作的,这里多了一个新名词 Reactor,它指的是对事件的响应,如果来了一个事件就把相应的事件 dispatch 给对应的 acceptor 或 handler 对象来处理,由于整个操作都在一个进程里处理,我们把这种模式称为单 Reactor 模型,单 Reactor 要求对事件的响应要快,比如对数据业务的处理要快,像 Redis 就很适合,因为它的数据都是基于内存操作的(当然像 bigKey 这种异常场景除外)

单 Reactor 多线程模型

如果在单进程 Reactor 模型中,业务处理耗时较长,那么线程就会被阻塞,就无法再处理其它事件了,可能会造成严重的性能问题,而且单进程 Reactor 还有一个劣势,那就是无法充分复用多核的优势,于是人们又提出了 单 Reactor 多线程模型,即把业务处理这一块放到一个线程池中处理

通过这种方式主进程的压力得到了释放,也充分复用了多核优势来提升并发度

但依然有如下两个瓶颈点

  1. 子线程处理好数据后需要将其传给 handler 进行发送处理,这涉及到共享数据的互斥和保护机制

  2. 主进程承担的所有事件的监听和响应,瞬时的高并发可能成为性能瓶颈

多 Reactor 多进程/线程模型

为以解决单 Reactor 多线程模型存在的两个问题,人们又提出了多 Reactor 多进程/线程模型模块,示意图如下

工作原理如下

  1. 主进程主要负责 accept 连接,接收后会将其传给 subReactor,subReactor 将其连接加入连接队列中来监控其事件

  2. 一旦子进程中有新的事件被监听到了,则 subReactor 会将其交给 Handler 进入处理

使用这种方式由于数据的 read,业务处理,write 都在一个线程中处理,所以避免了数据的同步加锁操作,父子进程职责很明确,父进程负责 accept,子进程则负责完成后续业务处理

以上介绍的只是标准的 Reactor 模型,但实际上生产上应用的 Reactor 不一定完全遵照这些标准,可能会有一些变化,比如 Nginx 的 Reactor 虽然也是多 Reactor 多进程模型,但它是一种变体:每个子进程都监听了同一个端口,内核接收到连接已建立的事件后会通过负载均衡的方式将其转给其中一个子进程,然后子进程会将其加入到连接队列中监控其事件,监控到事件后也不会转交给其他线程而是自己处理

总结

随着互联网的发展,server 面对的连接越来越多,传统的网络 IO 模型由于在 connect,accept,read/write 这三步中会有阻塞操作,显然无法满足我们 C10K 要求,于是人们提出了多进程/线程模型,每接收一个连接分配一个进程/线程来负责后续的交互,但创建进程/线程本身需要创建堆栈,复制页表等资源,而且进程/线程的上下文切换涉及到用户态,内核态的切换,会造成严重的性能瓶颈,那么把阻塞操作改成非阻塞呢,这样做进程/线程确实是不会被阻塞了,但也意味着 CPU 会做大量的无用功,承担不必要的高负载

综合考虑人们提出了 IO 多路复用这种先进的理念,即一个线程管理监听多个 socket 的事件,当然了其实将 socket 设置成非阻塞的,然后让一个线程不断地去轮询也是能达到一个线程监听多个 socket 的目的,但这样做需要对每个 socket 调用一个 read 系统调用,所以 IO 多路复用还有另一层更重要的意义:将多个系统调用转成一个系统调用再交给内核去监听事件。

IO 多路复用主要有三个模型,select,poll,epoll,每一个都是对前者的改进:

  1. select 是每次调用时都要把 fd 集合拷贝给内核,而且内核是通过不断遍历这种同步的方式来检查 fd 是否有事件就绪,并且一旦检测到有事件后也只是返回有就绪事件的 fd 的个数,用户线程也需要遍历 fd 集合来查看 fd 是否已就绪,select 限制了 fd 的集合只能有 1024 个

  2. poll 和 select 的原理差不多,只不过是放开了 1024 个 fd 的限制,fd 的个数可以任意设置,这样也就让支持 C10k 成为了可能,但由于它的实现机制与 select 类似,所以也存在和 select 一样的性能瓶颈

  3. 为了彻底解决 select,poll 的性能瓶颈,epoll 出现了,它把需要监听的 fd 传给内核 epoll 实例,epoll 以红黑树的形式来管理这些 fd,这样每次 epoll 只需调用 epoll_wait 即可将控制权转给内核来监听 fd 的事件,避免了无意义的 fd 集合的拷贝,同时由于红黑树的高效,一旦某个 socket 来事件了,可以迅速从红黑树中查找到相应的 socket,然后再唤醒用户进程,此过程是异步的,比起 select 的同步唤醒又是一大进步,此时唤醒用户进程后,内核会把数据已就绪的 fd 放到一个就绪列表里传给用户进程,用户进程只需要遍历此就绪列表即可,比起 select 需要全部遍历也是一大进步,除此之外 epoll 引入了边缘触发让其在高并发下的表现也更加优异,正是由于 epoll 对 select,poll 的这些改进,也让它成为了 IO 多路复用绝对的王者

IO 多路复用是指用一个进程/线程去监听多个 socket 的事件,也即意味着一旦事件就绪了进程需要快速地处理这些事件(不然其他 socket 事件的处理的会阻塞),这种对 Redis 非常合适,因为它是基于内存操作的,处理非常快,但对于其它的开源框架,只有一个进程/线程显然是不太满足业务需要的,比如业务处理,可能是 CPU 密集型的,如果只一个进程/线程的话,可能会阻塞在业务处理上,于是人们基于 IO 多路复用又提出了 Reactor 模型,Reactor 即对事件的反应,然后派发事件给相应的处理器,Reator 模型有多个变种,如单 Reactor,单 Reactor 多线程,多 Reacor 多进程/线程模型,这三种模型各有各的优势,主要是为了充分利用多线程/多核来提升性能或是为了避免瞬时的高并发让主线程崩溃

由此可见,网络 IO 模型经历了传统的网络 IO 模型 ---> IO 多路程复用(select,poll,epoll) --> Reactor 模型这三个阶段,主要是为了满足日益增长的 C10k 甚至 C100K 等超高连接数的要求。


从零开始自己动手写阻塞队列

前言

在我们平时编程的时候一个很重要的工具就是容器,在本篇文章当中主要给大家介绍阻塞队列的原理,并且在了解原理之后自己动手实现一个低配版的阻塞队列。

需求分析

在前面的两篇文章ArrayDeque(JDK双端队列)源码深度剖析深入剖析(JDK)ArrayQueue源码当中我们仔细介绍了队列的原理,如果大家感兴趣可以查看一下!

而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是阻塞队列在并发下情况是安全的。阻塞队列的主要的需求如下:

  • 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。

  • 所有的队列操作都要是并发安全的。

  • 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。

  • 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。

  • 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。

阻塞队列实现原理

线程阻塞和唤醒

在上面我们已经谈到了阻塞队列是并发安全的,而且我们还有将线程唤醒和阻塞的需求,因此我们可以选择可重入锁ReentrantLock保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition进行线程的唤醒和阻塞操作,在Condition当中我们将会使用到的,主要有以下两个函数:

  • signal用于唤醒线程,当一个线程调用Conditionsignal函数的时候就可以唤醒一个被await函数阻塞的线程。

  • await用于阻塞线程,当一个线程调用Conditionawait函数的时候这个线程就会阻塞。

数组循环使用

因为队列是一端进一端出,因此队列肯定有头有尾。

当我们往队列当中加入一些数据之后,队列的情况可能如下:

在上图的基础之上我们在进行四次出队操作,结果如下:

在上面的状态下,我们继续加入8个数据,那么布局情况如下:

我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。

代码实现

成员变量定义

根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:

// 用于保护临界区的锁private final ReentrantLock lock;// 用于唤醒取数据的时候被阻塞的线程private final Condition notEmpty;// 用于唤醒放数据的时候被阻塞的线程private final Condition notFull;// 用于记录从数组当中取数据的位置 也就是队列头部的位置private int takeIndex;// 用于记录从数组当中放数据的位置 也就是队列尾部的位置private int putIndex;// 记录队列当中有多少个数据private int count;// 用于存放具体数据的数组private Object[] items;

构造函数

我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。

@SuppressWarnings("unchecked")public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.notFull = lock.newCondition();// 其实可以不用初始化 类会有默认初始化 默认初始化为0takeIndex = 0;putIndex = 0;count = 0;// 数组的长度肯定不能够小于0if (size <= 0)throw new RuntimeException("size can not be less than 1");items = (E[])new Object[size];}

put函数

这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。

public void put(E x){// put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程// 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作// 因此这里需要上锁lock.lock();try {// 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了// 这个时候需要将线程挂起while (count == items.length)notFull.await(); // 将调用 await的线程挂起// 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了// 这个时候需要将数组入队 // 调用入队函数将数据入队enqueue(x);} catch (InterruptedException e) {e.printStackTrace();} finally {// 解锁lock.unlock();}}// 将数据入队private void enqueue(E x) {this.items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒}

offer函数

offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false,而不是被阻塞。

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 如果数组满了 则直接返回false 而不是被阻塞if (count == items.length)return false;else {// 如果数组没有满则直接入队 并且返回 trueenqueue(e);return true;}} finally {lock.unlock();}}

add函数

这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。

public boolean add(E e) {if (offer(e))return true;elsethrow new RuntimeException("Queue full");}

take函数

这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:

public E take() throws InterruptedException {// 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据// 进行加锁操作lock.lock();try {// 当 count 等于0 说明队列为空// 需要将线程挂起等待while (count == 0)notEmpty.await();// 当被唤醒之后进行出队操作return dequeue();}finally {lock.unlock();}}private E  dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了if (++takeIndex == items.length)takeIndex = 0;count--; // 队列当中数据少一个了// 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程// 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒notFull.signal(); // 唤醒一个被 put 函数阻塞的线程return x;}

重写toString函数

因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString方法得到一个字符串,最后打印这个字符串。

@Overridepublic String toString() {StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("[");// 这里需要上锁 因为我们在打印的时候需要打印所有的数据// 打印所有的数据就需要对数组进行遍历操作 而在进行遍历// 操作的时候是不能进行插入和删除操作的 因为打印的是某// 个时刻的数据lock.lock();try {if (count == 0)stringBuilder.append("]");else {int cur = 0;// 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count// 个数据while (cur != count) {// 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");cur += 1;}// 删除掉最后一次没用的 ", "stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());stringBuilder.append(']');}}finally {lock.unlock();}return stringBuilder.toString();}

完整代码

整个我们自己完成的阻塞队列的代码如下:

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class MyArrayBlockingQueue<E> {// 用于保护临界区的锁private final ReentrantLock lock;// 用于唤醒取数据的时候被阻塞的线程private final Condition notEmpty;// 用于唤醒放数据的时候被阻塞的线程private final Condition notFull;// 用于记录从数组当中取数据的位置 也就是队列头部的位置private int takeIndex;// 用于记录从数组当中放数据的位置 也就是队列尾部的位置private int putIndex;// 记录队列当中有多少个数据private int count;// 用于存放具体数据的数组private Object[] items;@SuppressWarnings("unchecked")public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.notFull = lock.newCondition();// 其实可以不用初始化 类会有默认初始化 默认初始化为0takeIndex = 0;putIndex = 0;count = 0;if (size <= 0)throw new RuntimeException("size can not be less than 1");items = (E[])new Object[size];}public void put(E x){lock.lock();try {while (count == items.length)notFull.await();enqueue(x);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}private void enqueue(E x) {this.items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}private E  dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;notFull.signal();return x;}public boolean add(E e) {if (offer(e))return true;elsethrow new RuntimeException("Queue full");}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}public E take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await();return dequeue();}finally {lock.unlock();}}@Overridepublic String toString() {StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("[");lock.lock();try {if (count == 0)stringBuilder.append("]");else {int cur = 0;while (cur != count) {stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");cur += 1;}stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());stringBuilder.append(']');}}finally {lock.unlock();}return stringBuilder.toString();}}

现在对上面的代码进行测试:

我们现在使用阻塞队列模拟一个生产者消费者模型,设置阻塞队列的大小为5,生产者线程会往队列当中加入数据,数据为0-9的10个数字,消费者线程一共会消费10次。

import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);Thread thread = new Thread(() -> {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i);queue.put(i);}}, "生产者");Thread thread1 = new Thread(() -> {for (int i = 0; i < 10; i++) {try {System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take());System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue);} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者");thread.start();TimeUnit.SECONDS.sleep(3);thread1.start();}}

上面代码的输出如下所示:

生产者 往队列当中加入数据:0生产者 往队列当中加入数据:1生产者 往队列当中加入数据:2生产者 往队列当中加入数据:3生产者 往队列当中加入数据:4生产者 往队列当中加入数据:5消费者 从队列当中取出数据:0生产者 往队列当中加入数据:6消费者 当前队列当中的数据:[1, 2, 3, 4, 5]消费者 从队列当中取出数据:1消费者 当前队列当中的数据:[2, 3, 4, 5]消费者 从队列当中取出数据:2消费者 当前队列当中的数据:[3, 4, 5, 6]生产者 往队列当中加入数据:7消费者 从队列当中取出数据:3消费者 当前队列当中的数据:[4, 5, 6, 7]消费者 从队列当中取出数据:4消费者 当前队列当中的数据:[5, 6, 7]消费者 从队列当中取出数据:5消费者 当前队列当中的数据:[6, 7]生产者 往队列当中加入数据:8消费者 从队列当中取出数据:6消费者 当前队列当中的数据:[7, 8]消费者 从队列当中取出数据:7消费者 当前队列当中的数据:[8]消费者 从队列当中取出数据:8消费者 当前队列当中的数据:[]生产者 往队列当中加入数据:9消费者 从队列当中取出数据:9消费者 当前队列当中的数据:[]

从上面的输出结果我们知道,生产者线程打印5之后被挂起了,因为如果没有被挂起,生产者线程肯定可以一次性输出完成,因为消费者线程阻塞了3秒。但是他没有输出完成说明在打印5之后,因为阻塞队列满了,因而生产者线程被挂起了。然后消费者开始消费,这样阻塞队列当中就有空间了,生产者线程就可以继续生产了。

总结

在本篇文章当中,主要向大家介绍了阻塞队列的原理并且实现了一个低配版的数组阻塞队列,其实如果你了解数组队列和锁的话,这个代码实现起来还是相对比较简单的,我们只需要使用锁去保证我们的程序并发安全即可。

  • 我们在实现put函数的时候,如果当前队列已经满了,则当前线程需要调用await函数进行阻塞,当线程被唤醒或者队列没有满可以继续执行的时候,我们在往队列当中加入数据之后需要调用一次signal函数,因为这样可以唤醒在调用take函数的时候因为队列空而阻塞的线程。

  • 我们实现take函数的时候,如果当前队列已经空了,则当前线程也需要调用await函数进行阻塞,当线程被唤醒或者队列不为空线程可以继续执行,在出队之后需要调用一次signal函数,因为这样可以唤醒在调用put函数的时候因为队列满而阻塞的线程。


以上就是本篇文章的所有内容了,我是LeHung,我们下期再见!!!更多精彩内容合集可访问项目:https://github.com/Chang-LeHung/CSCore


前言

我们在写并发程序的时候,一个非常常见的需求就是保证在某一个时刻只有一个线程执行某段代码,像这种代码叫做临界区,而通常保证一个时刻只有一个线程执行临界区的代码的方法就是锁。在本篇文章当中我们将会仔细分析和学习自旋锁,所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环,这其实就是线程自己“旋”在while循环了,因而这种锁就叫做自旋锁。

自旋锁

原子性

在谈自旋锁之前就不得不谈原子性了。所谓原子性简单说来就是一个一个操作要么不做要么全做,全做的意思就是在操作的过程当中不能够被中断,比如说对变量data进行加一操作,有以下三个步骤:

  • data从内存加载到寄存器。

  • data这个值加一。

  • 将得到的结果写回内存。

原子性就表示一个线程在进行加一操作的时候,不能够被其他线程中断,只有这个线程执行完这三个过程的时候其他线程才能够操作数据data

我们现在用代码体验一下,在Java当中我们可以使用AtomicInteger进行对整型数据的原子操作:

import java.util.concurrent.atomic.AtomicInteger;public class AtomicDemo {public static void main(String[] args) throws InterruptedException {AtomicInteger data = new AtomicInteger();data.set(0); // 将数据初始化位0Thread t1 = new Thread(() -> {for (int i = 0; i < 100000; i++) {data.addAndGet(1); // 对数据 data 进行原子加1操作}});Thread t2 = new Thread(() -> {for (int i = 0; i < 100000; i++) {data.addAndGet(1);// 对数据 data 进行原子加1操作}});// 启动两个线程t1.start();t2.start();// 等待两个线程执行完成t1.join();t2.join();// 打印最终的结果System.out.println(data); // 200000}}

从上面的代码分析可以知道,如果是一般的整型变量如果两个线程同时进行操作的时候,最终的结果是会小于200000。

我们现在来模拟一下一般的整型变量出现问题的过程:

  • 主内存data的初始值等于0,两个线程得到的data初始值都等于0。

  • 现在线程一将data加一,然后线程一将data的值同步回主内存,整个内存的数据变化如下:

  • 现在线程二data加一,然后将data的值同步回主内存(将原来主内存的值覆盖掉了):

我们本来希望data的值在经过上面的变化之后变成2,但是线程二覆盖了我们的值,因此在多线程情况下,会使得我们最终的结果变小。

但是在上面的程序当中我们最终的输出结果是等于20000的,这是因为给data进行+1的操作是原子的不可分的,在操作的过程当中其他线程是不能对data进行操作的。这就是原子性带来的优势。

自己动手写自旋锁

AtomicInteger类

现在我们已经了解了原子性的作用了,我们现在来了解AtomicInteger类的另外一个原子性的操作——compareAndSet,这个操作叫做比较并交换(CAS),他具有原子性。

public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger();atomicInteger.set(0);atomicInteger.compareAndSet(0, 1);}

compareAndSet函数的意义:首先会比较第一个参数(对应上面的代码就是0)和atomicInteger的值,如果相等则进行交换,也就是将atomicInteger的值设置为第二个参数(对应上面的代码就是1),如果这些操作成功,那么compareAndSet函数就返回true,如果操作失败则返回false,操作失败可能是因为第一个参数的值(期望值)和atomicInteger不相等,如果相等也可能因为在更改atomicInteger的值的时候失败(因为可能有多个线程在操作,因为原子性的存在,只能有一个线程操作成功)。

自旋锁实现原理

我们可以使用AtomicInteger类实现自旋锁,我们可以用0这个值表示未上锁,1这个值表示已经上锁了。

  • AtomicInteger类的初始值为0。

  • 在上锁时,我们可以使用代码atomicInteger.compareAndSet(0, 1)进行实现,我们在前面已经提到了只能够有一个线程完成这个操作,也就是说只能有一个线程调用这行代码然后返回true其余线程都返回false,这些返回false的线程不能够进入临界区,因此我们需要这些线程停在atomicInteger.compareAndSet(0, 1)这行代码不能够往下执行,我们可以使用while循环让这些线程一直停在这里while (!value.compareAndSet(0, 1));,只有返回true的线程才能够跳出循环,其余线程都会一直在这里循环,我们称这种行为叫做自旋,这种锁因而也被叫做自旋锁

  • 线程在出临界区的时候需要重新将锁的状态调整为未上锁的上状态,我们使用代码value.compareAndSet(1, 0);就可以实现,将锁的状态还原为未上锁的状态,这样其他的自旋的线程就可以拿到锁,然后进入临界区了。

自旋锁代码实现

import java.util.concurrent.atomic.AtomicInteger;public class SpinLock {// 0 表示未上锁状态// 1 表示上锁状态protected AtomicInteger value;public SpinLock() {this.value = new AtomicInteger();// 设置 value 的初始值为0 表示未上锁的状态this.value.set(0);}public void lock() {// 进行自旋操作while (!value.compareAndSet(0, 1));}public void unlock() {// 将锁的状态设置为未上锁状态value.compareAndSet(1, 0);}}

上面就是我们自己实现的自旋锁的代码,这看起来实在太简单了,但是它确实帮助我们实现了一个锁,而且能够在真实场景进行使用的,我们现在用代码对上面我们写的锁进行测试。

测试程序:

public class SpinLockTest {public static int data;public static SpinLock lock = new SpinLock();public static void add() {for (int i = 0; i < 100000; i++) {// 上锁 只能有一个线程执行 data++ 操作 其余线程都只能进行while循环lock.lock();data++;lock.unlock();}}public static void main(String[] args) throws InterruptedException {Thread[] threads = new Thread[100];// 设置100个线程for (int i = 0; i < 100; i ++) {threads[i] = new Thread(SpinLockTest::add);}// 启动一百个线程for (int i = 0; i < 100; i++) {threads[i].start();}// 等待这100个线程执行完成for (int i = 0; i < 100; i++) {threads[i].join();}System.out.println(data); // 10000000}}

在上面的代码单中,我们使用100个线程,然后每个线程循环执行100000data++操作,上面的代码最后输出的结果是10000000,和我们期待的结果是相等的,这就说明我们实现的自旋锁是正确的。

自己动手写可重入自旋锁

可重入自旋锁

在上面实现的自旋锁当中已经可以满足一些我们的基本需求了,就是一个时刻只能够有一个线程执行临界区的代码。但是上面的的代码并不能够满足重入的需求,也就是说上面写的自旋锁并不是一个可重入的自旋锁,事实上在上面实现的自旋锁当中重入的话就会产生死锁。

我们通过一份代码来模拟上面重入产生死锁的情况:

public static void add(int state) throws InterruptedException {TimeUnit.SECONDS.sleep(1);if (state <= 3) {lock.lock();System.out.println(Thread.currentThread().getName() + "\t进入临界区 state = " + state);for (int i = 0; i < 10; i++)data++;add(state + 1); // 进行递归重入 重入之前锁状态已经是1了 因为这个线程进入了临界区lock.unlock();}}
  • 在上面的代码当中加入我们传入的参数state的值为1,那么在线程执行for循环之后再次递归调用add函数的话,那么state的值就变成了2。

  • if条件仍然满足,这个线程也需要重新获得锁,但是此时锁的状态是1,这个线程已经获得过一次锁了,但是自旋锁期待的锁的状态是0,因为只有这样他才能够再次获得锁,进入临界区,但是现在锁的状态是1,也就是说虽然这个线程获得过一次锁,但是它也会一直进行while循环而且永远都出不来了,这样就形成了死锁了。

可重入自旋锁思想

针对上面这种情况我们需要实现一个可重入的自旋锁,我们的思想大致如下:

  • 在我们实现的自旋锁当中,我们可以增加两个变量,owner一个用于存当前拥有锁的线程,count一个记录当前线程进入锁的次数。

  • 如果线程获得锁,owner = Thread.currentThread()并且count = 1

  • 当线程下次再想获取锁的时候,首先先看owner是不是指向自己,则一直进行循环操作,如果是则直接进行count++操作,然后就可以进入临界区了。

  • 我们在出临界区的时候,如果count大于一的话,说明这个线程重入了这把锁,因此不能够直接将锁设置为0也就是未上锁的状态,这种情况直接进行count--操作,如果count等于1的话,说明线程当前的状态不是重入状态(可能是重入之后递归返回了),因此在出临界区之前需要将锁的状态设置为0,也就是没上锁的状态,好让其他线程能够获取锁。

可重入锁代码实现:

实现的可重入锁代码如下:

public class ReentrantSpinLock extends SpinLock {private Thread owner;private int count;@Overridepublic void lock() {if (owner == null || owner != Thread.currentThread()) {while (!value.compareAndSet(0, 1));owner = Thread.currentThread();count = 1;}else {count++;}}@Overridepublic void unlock() {if (count == 1) {count = 0;value.compareAndSet(1, 0);}elsecount--;}}

下面我们通过一个递归程序去验证我们写的可重入的自旋锁是否能够成功工作。

测试程序:

import java.util.concurrent.TimeUnit;public class ReentrantSpinLockTest {public static int data;public static ReentrantSpinLock lock = new ReentrantSpinLock();public static void add(int state) throws InterruptedException {TimeUnit.SECONDS.sleep(1);if (state <= 3) {lock.lock();System.out.println(Thread.currentThread().getName() + "\t进入临界区 state = " + state);for (int i = 0; i < 10; i++)data++;add(state + 1);lock.unlock();}}public static void main(String[] args) throws InterruptedException {Thread[] threads = new Thread[10];for (int i = 0; i < 10; i++) {threads[i] = new Thread(new Thread(() -> {try {ReentrantSpinLockTest.add(1);} catch (InterruptedException e) {e.printStackTrace();}}, String.valueOf(i)));}for (int i = 0; i < 10; i++) {threads[i].start();}for (int i = 0; i < 10; i++) {threads[i].join();}System.out.println(data);}}

上面程序的输出:

Thread-3	进入临界区 state = 1Thread-3	进入临界区 state = 2Thread-3	进入临界区 state = 3Thread-0	进入临界区 state = 1Thread-0	进入临界区 state = 2Thread-0	进入临界区 state = 3Thread-9	进入临界区 state = 1Thread-9	进入临界区 state = 2Thread-9	进入临界区 state = 3Thread-4	进入临界区 state = 1Thread-4	进入临界区 state = 2Thread-4	进入临界区 state = 3Thread-7	进入临界区 state = 1Thread-7	进入临界区 state = 2Thread-7	进入临界区 state = 3Thread-8	进入临界区 state = 1Thread-8	进入临界区 state = 2Thread-8	进入临界区 state = 3Thread-5	进入临界区 state = 1Thread-5	进入临界区 state = 2Thread-5	进入临界区 state = 3Thread-2	进入临界区 state = 1Thread-2	进入临界区 state = 2Thread-2	进入临界区 state = 3Thread-6	进入临界区 state = 1Thread-6	进入临界区 state = 2Thread-6	进入临界区 state = 3Thread-1	进入临界区 state = 1Thread-1	进入临界区 state = 2Thread-1	进入临界区 state = 3300

从上面的输出结果我们就可以知道,当一个线程能够获取锁的时候他能够进行重入,而且最终输出的结果也是正确的,因此验证了我们写了可重入自旋锁是有效的!

总结

在本篇文章当中主要给大家介绍了自旋锁和可重入自旋锁的原理,并且实现了一遍,其实代码还是比较简单关键需要大家将这其中的逻辑理清楚:

  • 所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环。

  • 可重入的含义就是一个线程已经竞争到了一个锁,在竞争到这个锁之后又一次有重入临界区代码的需求,如果能够保证这个线程能够重新进入临界区,这就叫可重入。

  • 我们在实现自旋锁的时候使用的是AtomicInteger类,并且我们使用0和1这两个数值用于表示无锁和锁被占用两个状态,在获取锁的时候使用while循环不断进行CAS操作,直到操作成功返回true,在释放锁的时候使用CAS将锁的状态从1变成0。

  • 实现可重入锁最重要的一点就是需要记录是那个线程获得了锁,同时还需要记录获取了几次锁,因为我们在解锁的时候需要进行判断,之后count = 1的情况才能将锁的状态从1设置成0。


以上就是本篇文章的所有内容了,我是LeHung,我们下期再见!!!更多精彩内容合集可访问项目:https://github.com/Chang-LeHung/CSCore

一、面向对象与面向过程的区别

面向过程就是分析出解决问题所需要的步骤,然后用函数把这些步骤一步一步实现,使用的时候一个一个依次调用就可以了;

面向对象是把构成问题事务分解成各个对象,建立对象的目的不是为了完成一个步骤,而是为了描叙某个事物在整个解决问题的步骤中的行为。

可以拿生活中的实例来理解面向过程与面向对象,例如五子棋,面向过程的设计思路就是首先分析问题的步骤:

1、开始游戏,

2、黑子先走,

3、绘制画面,

4、判断输赢,

5、轮到白子,

6、绘制画面,

7、判断输赢,

8、返回步骤2,

9、输出最后结果。

把上面每个步骤用不同的方法来实现。

如果是面向对象的设计思想来解决问题 面向对象的设计则是从另外的思路来解决问题

整个五子棋可以分为

1、黑白双方,这两方的行为是一模一样的,

2、棋盘系统,负责绘制画面,

3、规则系统,负责判定诸如犯规、输赢等。

第一类对象(玩家对象)负责接受用户输入,并告知第二类对象(棋盘对象)棋子布局的变化,棋盘对象接收到了棋子的变化就要负责在屏幕上面显示出这种变化,同时利用第三类对象(规则系统)来对棋局进行判定。

可以明显地看出,面向对象是以功能来划分问题,而不是步骤。

同样是绘制棋局,这样的行为在面向过程的设计中分散在了多个步骤中,很可能出现不同的绘制版本,因为通常设计人员会考虑到实际情况进行各种各样的简化。

而面向对象的设计中,绘图只可能在棋盘对象中出现,从而保证了绘图的统一。

上述的内容是从网上查到的,觉得这个例子非常的生动形象,我就写了下来,现在就应该理解了他俩的区别了吧,其实就是两句话,面向对象就是高度实物抽象化、面向过程就是自顶向下的编程!

二、面向对象的特点

在了解其特点之前,咱们先谈谈对象,对象就是现实世界存在的任何事务都可以称之为对象,有着自己独特的个性

属性用来描述具体某个对象的特征。

比如小志身高180M,体重70KG,这里身高、体重都是属性。

面向对象的思想就是把一切都看成对象,而对象一般都由属性+方法组成!

属性属于对象静态的一面,用来形容对象的一些特性,方法属于对象动态的一面,咱们举一个例子,小明会跑,会说话,跑、说话这些行为就是对象的方法!所以为动态的一面, 我们把属性和方法称为这个对象的成员!

类:具有同种属性的对象称为类,是个抽象的概念。

比如“人”就是一类,期中有一些人名,比如小明、小红、小玲等等这些都是对象,类就相当于一个模具,他定义了它所包含的全体对象的公共特征和功能,对象就是类的一个实例化,小明就是人的一个实例化!

我们在做程序的时候,经常要将一个变量实例化,就是这个原理!

我们一般在做程序的时候一般都不用类名的,比如我们在叫小明的时候,不会喊“人,你干嘛呢!”而是说的是“小明,你在干嘛呢!”

面向对象有三大特性,分别是封装性、继承性和多态性,这里小编不给予太多的解释,因为在后边的博客会专门总结的!

三、面向过程与面向对象的优缺点

很多资料上全都是一群很难理解的理论知识,整的小编头都大了,后来发现了一个比较好的文章,写的真是太棒了,通俗易懂,想要不明白都难!

用面向过程的方法写出来的程序是一份蛋炒饭,而用面向对象写出来的程序是一份盖浇饭。

所谓盖浇饭,北京叫盖饭,东北叫烩饭,广东叫碟头饭,就是在一碗白米饭上面浇上一份盖菜,你喜欢什么菜,你就浇上什么菜。

我觉得这个比喻还是比较贴切的。

蛋炒饭制作的细节,我不太清楚,因为我没当过厨师,也不会做饭,但最后的一道工序肯定是把米饭和鸡蛋混在一起炒匀。

盖浇饭呢,则是把米饭和盖菜分别做好,你如果要一份红烧肉盖饭呢,就给你浇一份红烧肉;

如果要一份青椒土豆盖浇饭,就给浇一份青椒土豆丝。

蛋炒饭的好处就是入味均匀,吃起来香。

如果恰巧你不爱吃鸡蛋,只爱吃青菜的话,那么唯一的办法就是全部倒掉,重新做一份青菜炒饭了。

盖浇饭就没这么多麻烦,你只需要把上面的盖菜拨掉,更换一份盖菜就可以了。

盖浇饭的缺点是入味不均,可能没有蛋炒饭那么香。

到底是蛋炒饭好还是盖浇饭好呢?其实这类问题都很难回答,非要比个上下高低的话,就必须设定一个场景,否则只能说是各有所长。

如果大家都不是美食家,没那么多讲究,那么从饭馆角度来讲的话,做盖浇饭显然比蛋炒饭更有优势,他可以组合出来任意多的组合,而且不会浪费。

盖浇饭的好处就是"菜"“饭"分离,从而提高了制作盖浇饭的灵活性。

饭不满意就换饭,菜不满意换菜。用软件工程的专业术语就是"可维护性"比较好,“饭” 和"菜"的耦合度比较低。

蛋炒饭将"蛋”“饭"搅和在一起,想换"蛋”"饭"中任何一种都很困难,耦合度很高,以至于"可维护性"比较差。

软件工程追求的目标之一就是可维护性,可维护性主要表现在3个方面:可理解性、可测试性和可修改性。

面向对象的好处之一就是显著的改善了软件系统的可维护性。

看了这篇文章,简单的总结一下!

面向过程

优点:性能比面向对象高,因为类调用时需要实例化,开销比较大,比较消耗资源;比如单片机、嵌入式开发、 Linux/Unix等一般采用面向过程开发,性能是最重要的因素。

缺点:没有面向对象易维护、易复用、易扩展

面向对象

优点:易维护、易复用、易扩展,由于面向对象有封装、继承、多态性的特性,可以设计出低耦合的系统,使系统 更加灵活、更加易于维护

缺点:性能比面向过程低


图片

单例模式(Singleton Pattern) 是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在。当你希望在整个系统中,某个类只能出现一个实例时,单例对象就能派上用场。

比如,某个服务器程序的配置信息存放在一个文件中,客户端通过一个 AppConfig 的类来读取配置文件的信息。如果在程序运行期间,有很多地方都需要使用配置文件的内容,也就是说,很多地方都需要创建 AppConfig 对象的实例,这就导致系统中存在多个 AppConfig 的实例对象,而这样会严重浪费内存资源,尤其是在配置文件内容很多的情况下。

事实上,类似 AppConfig 这样的类,我们希望在程序运行期间只存在一个实例对象。
在 Python 中,我们可以用多种方法来实现单例模式:

  1. 使用模块
  2. 使用装饰器
  3. 使用类
  4. 基于 __new__ 方法实现
  5. 基于 metaclass 方式实现

下面来详细介绍:

使用模块
其实,Python 的模块就是天然的单例模式,因为模块在第一次导入时,会生成 .pyc 文件,当第二次导入时,就会直接加载 .pyc 文件,而不会再次执行模块代码。

因此,我们只需把相关的函数和数据定义在一个模块中,就可以获得一个单例对象了。

如果我们真的想要一个单例类,可以考虑这样做:

class Singleton(object):
    def foo(self):
        pass
singleton = Singleton()

将上面的代码保存在文件 mysingleton.py 中,要使用时,直接在其他文件中导入此文件中的对象,这个对象即是单例模式的对象

from mysingleton import singleton

使用装饰器

def Singleton(cls):
    _instance = {}

    def _singleton(*args, **kargs):
        if cls not in _instance:
            _instance[cls] = cls(*args, **kargs)
        return _instance[cls]

    return _singleton


@Singleton
class A(object):
    a = 1

    def __init__(self, x=0):
        self.x = x


a1 = A(2)
a2 = A(3)

使用类

class Singleton(object):

    def __init__(self):
        pass

    @classmethod
    def instance(cls, *args, **kwargs):
        if not hasattr(Singleton, "_instance"):
            Singleton._instance = Singleton(*args, **kwargs)
        return Singleton._instance

一般情况,大家以为这样就完成了单例模式,但是当使用多线程时会存在问题:

class Singleton(object):

    def __init__(self):
        pass

    @classmethod
    def instance(cls, *args, **kwargs):
        if not hasattr(Singleton, "_instance"):
            Singleton._instance = Singleton(*args, **kwargs)
        return Singleton._instance

import threading

def task(arg):
    obj = Singleton.instance()
    print(obj)

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

程序执行后,打印结果如下:

<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>
<__main__.Singleton object at 0x02C933D0>

看起来也没有问题,那是因为执行速度过快,如果在 __init__ 方法中有一些 IO 操作,就会发现问题了。

下面我们通过 time.sleep 模拟,我们在上面 __init__ 方法中加入以下代码:

def __init__(self):
    import time
    time.sleep(1)

重新执行程序后,结果如下:

<__main__.Singleton object at 0x034A3410>
<__main__.Singleton object at 0x034BB990>
<__main__.Singleton object at 0x034BB910>
<__main__.Singleton object at 0x034ADED0>
<__main__.Singleton object at 0x034E6BD0>
<__main__.Singleton object at 0x034E6C10>
<__main__.Singleton object at 0x034E6B90>
<__main__.Singleton object at 0x034BBA30>
<__main__.Singleton object at 0x034F6B90>
<__main__.Singleton object at 0x034E6A90>

问题出现了!按照以上方式创建的单例,无法支持多线程。

解决办法:加锁!未加锁部分并发执行,加锁部分串行执行,速度降低,但是保证了数据安全。

import time
import threading


class Singleton(object):
    _instance_lock = threading.Lock()

    def __init__(self):
        time.sleep(1)

    @classmethod
    def instance(cls, *args, **kwargs):
        with Singleton._instance_lock:
            if not hasattr(Singleton, "_instance"):
                Singleton._instance = Singleton(*args, **kwargs)
        return Singleton._instance


def task(arg):
    obj = Singleton.instance()
    print(obj)
    

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()


time.sleep(20)
obj = Singleton.instance()
print(obj)

打印结果如下:

<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>
<__main__.Singleton object at 0x02D6B110>

这样就差不多了,但是还是有一点小问题,就是当程序执行时,执行了 time.sleep(20) 后,下面实例化对象时,此时已经是单例模式了。

但我们还是加了锁,这样不太好,再进行一些优化,把 intance 方法,改成下面这样就行:

@classmethod
def instance(cls, *args, **kwargs):
    if not hasattr(Singleton, "_instance"):
        with Singleton._instance_lock:
            if not hasattr(Singleton, "_instance"):
                Singleton._instance = Singleton(*args, **kwargs)
    return Singleton._instance

这样,一个可以支持多线程的单例模式就完成了。+

import time
import threading


class Singleton(object):
    _instance_lock = threading.Lock()

    def __init__(self):
        time.sleep(1)

    @classmethod
    def instance(cls, *args, **kwargs):
        if not hasattr(Singleton, "_instance"):
            with Singleton._instance_lock:
                if not hasattr(Singleton, "_instance"):
                    Singleton._instance = Singleton(*args, **kwargs)
        return Singleton._instance


def task(arg):
    obj = Singleton.instance()
    print(obj)
    
    
for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()
    
    
time.sleep(20)
obj = Singleton.instance()
print(obj)

这种方式实现的单例模式,使用时会有限制,以后实例化必须通过 obj = Singleton.instance()

如果用 obj = Singleton(),这种方式得到的不是单例。

基于 __new__ 方法实现

通过上面例子,我们可以知道,当我们实现单例时,为了保证线程安全需要在内部加入锁。

我们知道,当我们实例化一个对象时,是先执行了类的 __new__ 方法(我们没写时,默认调用 object.__new__),实例化对象;然后再执行类的 __init__ 方法,对这个对象进行初始化,所有我们可以基于这个,实现单例模式。

import threading


class Singleton(object):
    _instance_lock = threading.Lock()

    def __init__(self):
        pass


    def __new__(cls, *args, **kwargs):
        if not hasattr(Singleton, "_instance"):
            with Singleton._instance_lock:
                if not hasattr(Singleton, "_instance"):
                    Singleton._instance = object.__new__(cls)  
        return Singleton._instance

obj1 = Singleton()
obj2 = Singleton()
print(obj1,obj2)

def task(arg):
    obj = Singleton()
    print(obj)

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

打印结果如下:

<__main__.Singleton object at 0x038B33D0> <__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>
<__main__.Singleton object at 0x038B33D0>

采用这种方式的单例模式,以后实例化对象时,和平时实例化对象的方法一样 obj = Singleton() 。

基于 metaclass 方式实现

相关知识:

  1. 类由 type 创建,创建类时,type 的 __init__ 方法自动执行,类() 执行 type 的 __call__ 方法(类的 __new__ 方法,类的 __init__ 方法)
  2. 对象由类创建,创建对象时,类的 __init__ 方法自动执行,对象()执行类的 __call__ 方法

例子:

class Foo:
    def __init__(self):
        pass

    def __call__(self, *args, **kwargs):
        pass

obj = Foo()
# 执行type的 __call__ 方法,调用 Foo类(是type的对象)的 __new__方法,用于创建对象,然后调用 Foo类(是type的对象)的 __init__方法,用于对对象初始化。

obj()    # 执行Foo的 __call__ 方法    

元类的使用:

class SingletonType(type):
    def __init__(self,*args,**kwargs):
        super(SingletonType,self).__init__(*args,**kwargs)

    def __call__(cls, *args, **kwargs): # 这里的cls,即Foo类
        print('cls',cls)
        obj = cls.__new__(cls,*args, **kwargs)
        cls.__init__(obj,*args, **kwargs) # Foo.__init__(obj)
        return obj

class Foo(metaclass=SingletonType): # 指定创建Foo的type为SingletonType
    def __init__(self,name):
        self.name = name
    def __new__(cls, *args, **kwargs):
        return object.__new__(cls)

obj = Foo('xx')

实现单例模式:

import threading

class SingletonType(type):
    _instance_lock = threading.Lock()
    def __call__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            with SingletonType._instance_lock:
                if not hasattr(cls, "_instance"):
                    cls._instance = super(SingletonType,cls).__call__(*args, **kwargs)
        return cls._instance

class Foo(metaclass=SingletonType):
    def __init__(self,name):
        self.name = name


obj1 = Foo('name')
obj2 = Foo('name')
print(obj1,obj2)

作者:听风

https://www.cnblogs.com/huchong/p/8244279.html