2023年4月

原理简述

本篇主要学习
keepalived
配合
nginx
实现nginx的高可用, 也就是需要keepalived检测到nginx宕机时停用keepalived, 备用keepalived会自动接收过来.

简单的原理(如下图), 主备服务器会配置相同的vip(虚拟ip), 谁的优先级高谁来接收vip的请求, 然后nginx和keepalived部署在同一个服务器上面, keeplived控制机器接收到vip的请求, 交给了nginx来处理请求. nginx的功能主要是负责负载均衡, nginx的安装配置在此不再赘述, 可以参考这个:
ngix安装与使用

keepalived功能有很多, 此篇只是最简单的配合ngxin实现高可用的demo.

安装

  • 安装常用的的指令包:
    yum install -y curl gcc openssl-devel libnl3-devel net-snmp-devel

  • 安装:
    yum install -y keepalived

  • 启动:
    systemctl start keepalived

  • 重启:
    systemctl restart keepalived

  • 关闭:
    systemctl stop keepalived

  • 开机自启:
    systemctl enable keepalived

  • 修改配置文件:
    vim /etc/keepalived/keepalived.conf

    ! Configuration File for keepalived
    
    # 定义虚拟路由, 必须叫VI_1
    vrrp_instance VI_1 {
        state MASTER #设置为主服务器, 备份服务器设置为BACKUP
        interface enp0s3 #监控的网络接口(ifconfig或者ip addr指令找出网卡)
        priority 100 #(优先级, 主机大一点, 备份机小一点)
        virtual_router_id 99 #同一个vrrp_instance下routerId必须是一致的
    
        authentication {
            auth_type PASS #vrrp认证方式主备必须一致
            auth_pass 12345 #密码
        }
    
        virtual_ipaddress {
            192.168.0.99 #虚拟ip, 主从一致, 可配置多个
        }
    }
    
  • 另外一台机相同方法, 相同配置(state改成
    BACKUP
    , priority调整调一下, 此例中是80)

vrrp 的主从并不是通过stat配置的 MASTER BACKUP 决定的, 是通过优先级决定的

参考1:
Linux下Keepalived安装与配置

参考2:
Keepalived原理介绍和配置实践

参考3:
keepalived介绍、安装及配置详解

参考4:
https://codor.lanzoue.com/b012qnsvc
密码:1i77

检查是否脑裂

  • 使用
    tcpdump -i enp0s3 -nn host 224.0.0.18
    或者
    tcpdump -i enp0s3 | grep VRRP
    进行查看, 默认的广播通道为
    224.0.0.18
    (我把时间删除了, 内容是我改的)

    192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    192.168.0.117 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 80, authtype simple, intvl 1s, length 20
    192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    192.168.0.117 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 80, authtype simple, intvl 1s, length 20
    192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    192.168.0.117 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 80, authtype simple, intvl 1s, length 20
    
  • 如果结果如上, 说明出现了脑裂(主备都向外宣誓我是老大),

    出现这种情况的原因是防火墙或者iptables拦截了vrrp请求, 进行放行即可.

    防火墙(推荐):

    firewall-cmd --add-rich-rule='rule protocol value="vrrp" accept' --permanent
    firewall-cmd --reload
    

    iptables:

    iptables -A INPUT -s 192.168.1.0/24 -d 224.0.0.18 -j ACCEPT
    iptables -A INPUT -s 192.168.1.0/24 -p vrrp -j ACCEPT
    

    不存在可以进行安装,
    yum install iptables-services


  • 最后附上正常运行结果, 即只有100或者80优先级的机器来广播自己是老大

    09:26:55.782258 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    09:26:56.782910 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    09:26:57.783787 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    09:26:58.784709 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    09:26:59.784792 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    09:27:00.785171 IP 192.168.0.666666 > 224.0.0.18: VRRPv2, Advertisement, vrid 99, prio 100, authtype simple, intvl 1s, length 20
    

测试ip漂移

ip漂移
: 就是主备切换过程成, vip漂到真实ip上的过程. 也称为
主备切换
.

测试过程就是停用master机器上面的keepalived或者关机master机器, 查看backup机器是否正常接过来, 一般1s左右可以切换过去. 当出现脑裂情况的时候切换过程也能实现, 只是很慢大约7s左右. 具体原因未深究.

漂移过程可以通过抓包实现, 也可以通过两给ngxin转发到不同tomcat中的项目或网页, 或者修改ngxin的默认网页进行测试都可.

至此位置简单使用就完成了, 下面介绍几个功能配置

VRRP脚本

  • 签到keepalived的配置文件夹:
    cd /etc/keepalived/

  • 创建一个脚本文件:
    vim nginx_check.sh

    #!/bin/bash
    count=`ps -C nginx --no-header |wc -l`
    if [ $count -eq 0 ];then
    		killall keepalived
    fi
    
  • 赋予执行权限:
    chmode +x nginx_check.sh

  • 引入脚本:
    vim keepalived.conf


    vrrp_instance
    同级, 其中


    • chk_nginx
      : 脚本名称, 自定义的

    • script
      : 脚本位置

    • interval
      : 执行间隔

    • weight
      : 权重, 如果是负数, 当执行失败时候会影响vrrp_instance中的优先级priority, 因为主备切换是通过优先级的高低的进行切换的, 所以也可以通过这个优先级来进行主动控制主备切换. 而脚本中的内容可以很灵活地实现很多功能. 此个demo中只是简单实现检测到ngxin关闭后自动关闭keepalived, 也可以实现检测启动后进行开启, 然后延迟2s后查看是否启动成功, 未成功再进行关闭keepalived或者降低优先级(配合右键通知).

      vrrp_script chk_nginx {
      	script "/etc/keepalived/nginx_check.sh"
      	interval 2
      	#weight -30
      }
      
  • 配置到vrrp_instance中, 与authentication和virtual_ipaddress同级

    track_script {
    	chk_nginx
    }
    
  • 修改后的配置文件

    ! Configuration File for keepalived
    
    vrrp_script chk_nginx {
        script "/etc/keepalived/nginx_check.sh"
        interval 2
        #weight -30
    }
    
    vrrp_instance VI_1 {
        state MASTER
        interface enp0s3
        priority 100
        advert_int 1
        virtual_router_id 99
        authentication {
            auth_type PASS
            auth_pass 221531
        }
    
        track_script {
            chk_nginx
        }
    
        virtual_ipaddress {
            192.168.0.99
        }
    }
    
  • 测试

    正常启动时候, 手动关闭nginx, 查看keepalived的状态.

  • 参考:


邮件配置

邮件功能是linux上面的
mail
指令.

  • 安装
    mail
    :
    yum -y install mailx

  • 编辑配置文件(设置发送人信息):
    vim /etc/mail.rc
    , 在末尾处添加

    set from=xxx@163.com
    set smtp=smtp.163.com
    set smtp-auth-user=xxx@qq.com
    set smtp-auth-password=KJFHTOSXZQPNFAIU  #邮箱需要开启POP3/SMTP服务并设置密钥
    set smtp-auth=login
    set ssl-verify=ignore
    
  • 测试mail功能:
    echo test mail | mail -s testa 收件人id@qq.com


    • -s
      后面是主题的意思

    • echo test maill
      中的test mail 是邮件正文.

    • 最后跟着收件人

  • 配置到keepalived中, 方法1


    • 创建脚本
      vim mail_send.sh
      (记得赋予执行权限)

      可以使用
      ./mail_send.sh master
      进行测试

      #!/bin/bash
      contact='收件人邮箱@qq.com'
      notify() {
        	  mailsubject="$(hostname) to be $1, vip  转移"
        	  mailbody="$(date +'%F %T'): vrrp transition, $(hostname) changed to be $1"
        	  echo "$mailbody" | mail -s "$mailsubject" $contact
      }
      case $1 in
        	  master)
        			  notify master
        			  ;;
        	  backup)
        			  notify backup
        			  ;;
        	  fault)
        			  notify fault
        			  ;;
        	  *)
        			  echo "Usage: $(basename $0) {master|backup|fault}"
        			  exit 1
        			  ;;
      esac
      
    • 修改配置文件:
      vim keepalived.conf

      vrrp_instance下与authentication同级处

      notify_master "/etc/keepalived/mail_send.sh master"
      notify_backup "/etc/keepalived/mail_send.sh backup"
      notify_fault "/etc/keepalived/mail_send.sh fault"
      
    • 整体配置文件

      ! Configuration File for keepalived
      
      vrrp_script chk_nginx {
          script "/etc/keepalived/nginx_check.sh"
          interval 2
          #weight -30
      }
      
      vrrp_instance VI_1 {
          state MASTER
          interface enp0s3
          priority 100
          advert_int 1
          virtual_router_id 99
          # 当进入master/backup/fault状态时触发脚本, 可携带参数
          notify_master "/etc/keepalived/mail_send.sh master"
          notify_backup "/etc/keepalived/mail_send.sh backup"
          notify_fault "/etc/keepalived/mail_send.sh fault"
          authentication {
              auth_type PASS
              auth_pass 221531
          }
      
          track_script {
              chk_nginx
          }
      
          virtual_ipaddress {
              192.168.0.99
          }
      }
      
  • 配置到keepalived中, 方法2


    • 脚本内容, 下面这个是漂移到master时, 另外创建backup和fault

      #!/bin/bash
      contacts='收件人邮箱1, 收件人邮箱2'
      ip a > ipa_temp.txt
      
      echo "$(date +'%F %T'): Keepalived instance I became MASTER on $(hostname).    --- from master" | mail -s "Master Keepalived notification" -a ipa_temp.txt "$contacts"
      
    • 修改配置文件:
      vim keepalived.conf

      vrrp_instance下与authentication同级处, 后面的
      root
      是执行人和所在组

      notify_master /etc/keepalived/mail_send_master.sh root root
      notify_backup /etc/keepalived/mail_send_backup.sh root root
      notify_fault /etc/keepalived/mail_send_fault.sh root root
      
  • 测试状态转移时有没有邮箱接收到即可, 通过重启, 停用

  • 参考:



学习链接

一丶IO模型&Java IO

Unix为程序员提供了以下5种基本的io模型:

  • blocking io: 阻塞io
  • nonblocking io: 非阻塞io
  • I/O multiplexing: io多路复用
  • signal driven I/O:信号驱动io
  • asynchronous I/O:异步io

但我们平时工作中说的最多是,
阻塞

非阻塞

同步

异步

1.阻塞非阻塞,同步异步

  • 阻塞
    调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
    非阻塞
    调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

    image-20230405114445720

    可用把上面的阻塞队列看作是外卖柜

    put
    方法就是外卖员放外卖,如果容量不够那么一直等待其他用户拿走外卖,这是阻塞。

    offer
    方法也是外卖员放外卖,但是他发现容量不够的时候,返回false,然后采取其他行动,比如打电话喊你下来拿外卖。

  • 同步与异步关注的是
    消息通信机制
    。同步是发起调用在没有得到结果之前,该调用就不返回。异步是发起调用后,这个调用就直接返回了。

    消息队列中间件的作用之一就是异步,发送方将消息发送就立马返回了,不需要等待这个消息被消费者处理。

    同步就是你打电话问外卖员外卖到哪里了,外卖员告知你之前你不挂断电话。

    异步就是你外卖app上发消息问外卖员,发完消息你立马可用做其他的事情。

    异步情况下你怎么知道外卖到哪里了昵?


    • 通知

      外卖员通过平台回复你

    • 回调

      你给外卖员注册了一个回调事件——收到消息后,请回电告知,然后你调用结束,继续处理你的事情,但是外卖员收到消息后,会回调进行电话。

2.Unix的io模型

io操作分为两步:

  • 等待数据就绪

    例如读文件的过程中需要等待磁盘扫描所需数据,等待数据到达内核缓冲区

  • 将数据从内核空间拷贝到用户空间

    对于一次读取IO的操作,数据并不会直接拷贝到应用程序的缓冲区(用户空间),它首先会被拷贝到操作系统内核的缓冲区(内核空间)中,然后才会从操作系统内核的缓冲区拷贝到应用程序的缓冲区。

2.1 blocking io阻塞io

img

首先是我们用户进行进行系统调用,产生中断,操作系统切换到内核态,随后是内核完成数据准备和数据从内核空间复制到用户空间,然后应用进程继续运行。

这里说的阻塞,是系统调用不会立即返回,而是需要阻塞知道数据准备完成,并拷贝到用户空间。

2.2 nonblocking io 非阻塞io

img

可看到,和阻塞io的区别在于,准备数据的这个过程,是应用程序不断进行系统调用,询问操作系统内核是否完成了数据准备,此系统调用不会阻塞直到数据准备完成,而是立马返回。

但是第二阶段,数据从内核空间复制到用户空间是阻塞的,这个过程通常是比较快速的,因为这时候已经有DMA控制器完成了数据从磁盘搬运到内存,只需要拷贝到用户态空间中即可。

2.3 I/O multiplexing io多路复用

img

可以看到IO多路复用的流程和
blocking io阻塞io
类似,甚至还会多一次系统调用。那么IO多路复用存在的意义是什么昵?

假设我们当前的进程是一个服务端程序,存在多个网络io需要处理,我们需要多个线程取处理多个网络io,并且多个线程都是阻塞在系统调用上的,这是对线程资源的浪费。

io多路复用的优点就是:可以使用一个线程监听多路io,这个线程阻塞与
select
系统调用上,当多路io存在任何一个io可读的时候,线程将被唤醒,然后进行数据的拷贝,并进行处理,从而节省线程资源。

2.4 signal driven I/O信号驱动io

img

可以看到,信号驱动的io在数据准备阶段是非阻塞的,当操作系统完成数据准备后将发送信号来通知用户进程发生了某事件,用户进程需要编写对应的信号处理函数,在信号处理函数中阻塞与内核数据拷贝,待拷贝完成后对数据进行处理。

2.5 asynchronous I/O 异步io

img

上面四种模型都会在
数据从内核空间,拷贝到用户空间
这一步发生阻塞,也就是说至少第二步是需要同步等待操作系统完成拷贝的。

异步io模型则解决了这个问题,应用程序只要通知内核要读取的套接字对象, 以及数据的接收地址, 则整个过程都是由内核独立来完成, 包括数据从内核空间向用户空间的拷贝,拷贝完成后再通过信号来通知用户进程。

2.java中的io模型


阻塞

非阻塞

同步

异步
进行组合

  • 阻塞同步io

    这就是java中的BIO

  • 非阻塞同步io

    这就是java中的NIO,java中的nio是通过io多路复用实现的

  • 非阻塞异步io

    这就是java中的AIO,java中的AIO也是通过io多路复用实现,呈现出异步的表象

二丶Java BIO

下面探讨下java中BIO实现Socket编程方面的不足

public static void main(String[] args) throws IOException {

    ExecutorService threadPool 
            = new ThreadPoolExecutor(10,10,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));

    // 1 创建一个socket server监听tcp 6666661端口
    ServerSocket serverSocket = new ServerSocket(6666661);
    // 2 阻塞式接受来自客户端的连接
    while (true) {
        //这一步是阻塞的  阻塞直到有客户端连接上来
        Socket socket = serverSocket.accept();
        System.out.println(socket.getRemoteSocketAddress() + "连接到服务端");
        // 3 为了不影响后续连接进来处理,使用多线程来处理连接
        threadPool.execute(() -> process(socket));
    }
}

private static void process(Socket socket) {
    try (OutputStream out = socket.getOutputStream()) {
        byte[] buffer = new byte[1024];
        int len;
        while ((len = socket.getInputStream().read(buffer)) > 0) {
            System.out.println(socket.getRemoteSocketAddress() + "发送数据:" + new String(buffer, 0, len));
            out.write(buffer, 0, len);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

上面代码实现了,如果客户端请求过来讲客户端请求原封不动的写回,可以看到为了实现实现服务端支持多个客户端的连接,我们使用的线程池。

首先
Socket socket = serverSocket.accept()
,这一步会阻塞直到有客户端连接上来(这一步无所谓,甚至避免了主线程无休止的自旋)

其次
process
方法中拿到输入输出流写回的操作也是阻塞的,这一步需要使用操作系统提供的系统调用,将数据从网卡或者硬盘读入内核空间,然后从内核空间拷贝到用户空间,我们的java程序才可以进行读操作,写则反之。

由于
read,write
这两个方法是阻塞式的,它需要阻塞直到系统调用完成,我们的程序傻傻阻塞等待,因此我们使用了线程池,希望一个线程处理一个客户端请求,阻塞也只阻塞线程池中的线程。但是
process
方法中阻塞的这部分,会体现在我们线程池的线程,也就是说,线程池中存在一些线程阻塞于
read
,
write
函数。

这种模型的优点:

  • 简单直接,可以让开发人员专注于编写process的业务逻辑
  • 不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。
  • 使用多线程利用多核心cpu的能力,当线程阻塞的时候,cpu可以切换时间片给其他线程

这种模型的缺点:

  • 非常依赖于线程,线程是宝贵的资源,虽然使用线程池进行了复用,当前当大量请求到来的时候,我们无法无限制的开辟线程。众多的线程被挂起,被唤醒还会导致上下文切换频繁,cpu利用率降低
  • 线程本身占用较大内存,过多的线程导致jvm内存岌岌可危

那么怎么解决上述的问题昵,能不能解放线程不让他们阻塞在read和write中,能读那就读,不能读那就继续处理其他socket?

三丶Java NIO

image-20230405164149306

回顾这张图,我们上面说的
解放线程不让他们阻塞在read和write中,能读那就读,不能读那就继续处理其他socket
,不正是上面非阻塞的方式,希望系统调用可以立即返回,而不是阻塞。

Java中的nio基于io多路复用实现了同步非阻塞的处理方式

public static void main(String[] args) throws IOException, InterruptedException {
    // 1 创建selector用来侦听多路IO消息 '文件描述符'
    // selector 担任了重要的通知角色,可以将任意IO注册到selector上,通过非阻塞轮巡selector来得知哪些路IO有消息了
    // 底层是epoll(linux下)
    // 后续会把server端注册上来,有服务端被客户端连接上来的IO消息
    // 也会把每个客户端连接注册上来,有客户端发送过来的数据
    Selector selector = Selector.open();
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    // 2 把server端注册上去
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 6666661));
    //配置为非阻塞
    serverSocketChannel.configureBlocking(false);
    //关心accept事件,
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
        // 3 这一步是阻塞的,基于io多路复用中的select poll,epoll
        // 这里可以设置等待事件
        if (selector.select() == 0) {
            continue;
        }

        // 4 如果有至少一路IO有消息,那么set不为空
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        for (SelectionKey key : selectionKeys) {
            if (key.isAcceptable()) {
                System.out.println("客户端连接");
                // 因为我们只注册了serverSocketChannel这一个可以accept的所以这里用强转即可
                SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
                socketChannel.configureBlocking(false);
                // 5 当第一次客户端连接时,就将这个连接也作为channel注册上,他是可读型的
                //当前只是有客户端连接上来了,但是并不代表可读,还需要DMA将网卡数据搬运到内存
                socketChannel.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                // 6 因为步骤5把客户端连接也注册上来了,并且是可读上面的数据的,如果该channel被选出来说明有客户端数据来了
                SocketChannel socketChannel = (SocketChannel) key.channel();
                // 7 必须借助ByteBuffer接受和发送数据
                byteBuffer.clear();
                if (socketChannel.read(byteBuffer) <= 0) {
                    continue;
                }
                byteBuffer.flip();
                byte[] b = new byte[byteBuffer.limit()];
                byteBuffer.get(b);
                System.out.println(key + " 数据来了: " + new String(b));
                byteBuffer.clear();
                byteBuffer.put(b);
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
            }
        }
        // 8 非常重要一定要清理掉每个channel的key,来表示已经处理过了,不然下一次还会被select
        selectionKeys.clear();
    }
}

select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的,它还支持超时阻塞模式。这是一个线程监听多路io的体现,只要有一个事件就绪那么select就会返回。

socketChannel.configureBlocking(false)
将 socketChannel设置为非阻塞其读写操作都是非阻塞的,也就说如果无法读,那么read函数返回-1,将会让当前线程去遍历其他就绪的事件,而不是傻傻等待,这是
非阻塞io的体现

四丶Java AIO

 public static void main(String[] args) throws IOException, InterruptedException {
        AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(6666661));
        System.out.println(Thread.currentThread() + "开始监听6666661端口");
        serverChannel.accept(null, new CompletionHandler<>() {
            @SneakyThrows
            @Override
            public void completed(AsynchronousSocketChannel channel, Object attachment) {
                // 递归注册accept
                serverChannel.accept(attachment, this);
                System.out.println(Thread.currentThread() + "有客户端连接上来了" + channel.getRemoteAddress());
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                channel.read(buffer, null, new CompletionHandler<Integer, ByteBuffer>() {
                    @SneakyThrows
                    @Override
                    public void completed(Integer len, ByteBuffer attachment) {
                        // 递归注册read
                        channel.read(buffer, null, this);
                        buffer.flip();
                        System.out.println(channel.getRemoteAddress() + ":" + new String(buffer.array(), 0, len));
                        buffer.clear();
                        channel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {

                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }


AIO
中,所有创建的通道都会直接在
OS
上注册监听,当出现
IO
请求时,会先由操作系统接收、准备、拷贝好数据,然后再通知监听对应通道的程序处理数据。

客户端的连接到来后同样会先注册到选择器上,但客户端的
I/O
请求会先交由
OS
处理,当内核将数据拷贝完成后才会分配一条线程处理。这一点不同于BIO和NIO,NIO和BIO在内核拷贝数据到用户态的这一步任然是阻塞的。

LRU缓存替换策略

缓存是一种非常常见的设计,通过将数据缓存到访问速度更快的存储设备中,来提高数据的访问速度,如内存、CPU缓存、硬盘缓存等。

但与缓存的高速相对的是,缓存的成本较高,因此容量往往是有限的,当缓存满了之后,就需要一种策略来决定将哪些数据移除出缓存,以腾出空间来存储新的数据。

这样的策略被称为缓存替换策略(Cache Replacement Policy)。

常见的缓存替换策略有:FIFO(First In First Out)、LRU(Least Recently Used)、LFU(Least Frequently Used)等。

今天给大家介绍的是LRU算法。

核心思想

LRU算法基于这样一个假设:如果数据最近被访问过,那么将来被访问的几率也更高。

大部分情况下这个假设是成立的,因此LRU算法也是比较常用的缓存替换策略。

基于这个假设,我们在实现的时候,需要维护一个有序的数据结构,来记录数据的访问历史,当缓存满了之后,就可以根据这个数据结构来决定将哪些数据移除出缓存。

不适用场景

但如果数据的访问模式不符合LRU算法的假设,那么LRU算法就会失效。

例如:数据的访问模式是周期性的,那么LRU算法就会把周期性的数据淘汰掉,这样就会导致缓存命中率的下降。

换个说法比如,如果现在缓存的数据只在白天被访问,晚上访问的是另一批数据,那么在晚上,LRU算法就会把白天访问的数据淘汰掉,第二天白天又会把昨天晚上访问的数据淘汰掉,这样就会导致缓存命中率的下降。

后面有时间会给大家介绍LFU(Least Frequently Used)算法,以及LFU和LRU的结合LFRU(Least Frequently and Recently Used)算法,可以有效的解决这个问题。

算法基本实现

上文提到,LRU算法需要维护一个有序的数据结构,来记录数据的访问历史。通常我们会用双向链表来实现这个数据结构,因为双向链表可以在O(1)的时间复杂度内往链表的头部或者尾部插入数据,以及在O(1)的时间复杂度内删除数据。

我们将数据存储在双向链表中,每次访问数据的时候,就将数据移动到链表的尾部,这样就可以保证链表的尾部就是最近访问的数据,链表的头部就是最久没有被访问的数据。

当缓存满了之后,如果需要插入新的数据,因为链表的头部就是最久没有被访问的数据,所以我们就可以直接将链表的头部删除,然后将新的数据插入到链表的尾部。

如果我们要实现一个键值对的缓存,我们可以用一个哈希表来存储键值对,这样就可以在O(1)的时间复杂度内完成查找操作,.NET 中我们可以使用 Dictionary。

同时我们使用 LinkedList 来作为双向链表的实现,存储缓存的 key,以此记录数据的访问历史。

我们在每次操作 Dictionary 进行插入、删除、查找的时候,都需要将对应的 key 也插入、删除、移动到链表的尾部。

// 实现 IEnumerable 接口,方便遍历
public class LRUCache<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>
{
    private readonly LinkedList<TKey> _list;

    private readonly Dictionary<TKey, TValue> _dictionary;

    private readonly int _capacity;
    
    public LRUCache(int capacity)
    {
        _capacity = capacity;
        _list = new LinkedList<TKey>();
        _dictionary = new Dictionary<TKey, TValue>();
    }

    public TValue Get(TKey key)
    {
        if (_dictionary.TryGetValue(key, out var value))
        {
            // 在链表中删除 key,然后将 key 添加到链表的尾部
            // 这样就可以保证链表的尾部就是最近访问的数据,链表的头部就是最久没有被访问的数据
            // 但是在链表中删除 key 的时间复杂度是 O(n),所以这个算法的时间复杂度是 O(n)
            _list.Remove(key);
            _list.AddLast(key);
            return value;
        }

        return default;
    }

    public void Put(TKey key, TValue value)
    {
        if (_dictionary.TryGetValue(key, out _))
        {
            // 如果插入的 key 已经存在,将 key 对应的值更新,然后将 key 移动到链表的尾部
            _dictionary[key] = value;
            _list.Remove(key);
            _list.AddLast(key);
        }
        else
        {          
            if (_list.Count == _capacity)
            {
                // 缓存满了,删除链表的头部,也就是最久没有被访问的数据
                _dictionary.Remove(_list.First.Value);
                _list.RemoveFirst();
            }

            _list.AddLast(key);
            _dictionary.Add(key, value);
        }
    }

    public void Remove(TKey key)
    {
        if (_dictionary.TryGetValue(key, out _))
        {
            _dictionary.Remove(key);
            _list.Remove(key);
        }
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        foreach (var key in _list)
        {
            yield return new KeyValuePair<TKey, TValue>(key, _dictionary[key]);
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
var lruCache = new LRUCache<int, int>(4);

lruCache.Put(1, 1);
lruCache.Put(2, 2);
lruCache.Put(3, 3);
lruCache.Put(4, 4);

Console.WriteLine(string.Join(" ", lruCache));
Console.WriteLine(lruCache.Get(2));
Console.WriteLine(string.Join(" ", lruCache));
lruCache.Put(5, 5);
Console.WriteLine(string.Join(" ", lruCache));
lruCache.Remove(3);
Console.WriteLine(string.Join(" ", lruCache));

输出:

[1, 1] [2, 2] [3, 3] [4, 4] // 初始化
2                           // 访问 2
[1, 1] [3, 3] [4, 4] [2, 2] // 2 移动到链表尾部
[3, 3] [4, 4] [2, 2] [5, 5] // 插入 5
[4, 4] [2, 2] [5, 5]        // 删除 3

算法优化

上面的实现中,对缓存的查询、插入、删除都会涉及到链表中数据的删除(移动也是删除再插入)。

因为我们在 LinkedList 中存储的是 key,所以我们需要先通过 key 在链表中找到对应的节点,然后再进行删除操作,这就导致了链表的删除操作的时间复杂度是 O(n)。

虽然 Dictionary 的查找、插入、删除操作的时间复杂度都是 O(1),但因为链表操作的时间复杂度是 O(n),整个算法的最差时间复杂度是 O(n)。

算法优化的关键在于如何降低链表的删除操作的时间复杂度。

优化思路:

  1. 在 Dictionary 中存储 key 和 LinkedList 中节点的映射关系
  2. 在 LinkedList 的节点中存储 key-value

也就是说,我们让两个本来不相关的数据结构之间产生联系。

不管是在插入、删除、查找缓存的时候,都可以通过这种联系来将时间复杂度降低到 O(1)。

  1. 通过 key 在 Dictionary 中找到对应的节点,然后再从 LinkedList 节点中取出 value,时间复杂度是 O(1)
  2. LinkedList 删除数据之前,先通过 key 在 Dictionary 中找到对应的节点,然后再删除,这样就可以将链表的删除操作的时间复杂度降低到 O(1)
  3. LinkedList 删除头部节点时,因为节点中存储了 key,所以我们可以通过 key 在 Dictionary 中删除对应的节点,时间复杂度是 O(1)
public class LRUCache_V2<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>
{
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _list;
    
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> _dictionary;
    
    private readonly int _capacity;
    
    public LRUCache_V2(int capacity)
    {
        _capacity = capacity;
        _list = new LinkedList<KeyValuePair<TKey, TValue>>();
        _dictionary = new Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>();
    }
    
    public TValue Get(TKey key)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            _list.Remove(node);
            _list.AddLast(node);
            return node.Value.Value;
        }
        
        return default;
    }
    
    public void Put(TKey key, TValue value)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            node.Value = new KeyValuePair<TKey, TValue>(key, value);
            _list.Remove(node);
            _list.AddLast(node);
        }
        else
        {
            if (_list.Count == _capacity)
            {
                _dictionary.Remove(_list.First.Value.Key);
                _list.RemoveFirst();
            }
            
            var newNode = new LinkedListNode<KeyValuePair<TKey, TValue>>(new KeyValuePair<TKey, TValue>(key, value));
            _list.AddLast(newNode);
            _dictionary.Add(key, newNode);
        }
    }
    
    public void Remove(TKey key)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            _dictionary.Remove(key);
            _list.Remove(node);
        }
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return _list.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

进一步优化

因为我们对 双向链表 的存储需求是定制化的,要求节点中存储 key-value,直接使用 C# 的 LinkedList 我们就需要用 KeyValuePair 这样的结构来间接存储,会导致一些不必要的内存开销。

我们可以自己实现一个双向链表,这样就可以直接在节点中存储 key-value,从而减少内存开销。

public class LRUCache_V3<TKey, TValue>
{
    private readonly DoubleLinkedListNode<TKey, TValue> _head;

    private readonly DoubleLinkedListNode<TKey, TValue> _tail;

    private readonly Dictionary<TKey, DoubleLinkedListNode<TKey, TValue>> _dictionary;

    private readonly int _capacity;

    public LRUCache_V3(int capacity)
    {
        _capacity = capacity;
        _head = new DoubleLinkedListNode<TKey, TValue>();
        _tail = new DoubleLinkedListNode<TKey, TValue>();
        _head.Next = _tail;
        _tail.Previous = _head;
        _dictionary = new Dictionary<TKey, DoubleLinkedListNode<TKey, TValue>>();
    }

    public TValue Get(TKey key)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            RemoveNode(node);
            AddLastNode(node);
            return node.Value;
        }

        return default;
    }

    public void Put(TKey key, TValue value)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            RemoveNode(node);
            AddLastNode(node);
            node.Value = value;
        }
        else
        {
            if (_dictionary.Count == _capacity)
            {
                var firstNode = RemoveFirstNode();

                _dictionary.Remove(firstNode.Key);
            }

            var newNode = new DoubleLinkedListNode<TKey, TValue>(key, value);
            AddLastNode(newNode);
            _dictionary.Add(key, newNode);
        }
    }

    public void Remove(TKey key)
    {
        if (_dictionary.TryGetValue(key, out var node))
        {
            _dictionary.Remove(key);
            RemoveNode(node);
        }
    }

    private void AddLastNode(DoubleLinkedListNode<TKey, TValue> node)
    {
        node.Previous = _tail.Previous;
        node.Next = _tail;
        _tail.Previous.Next = node;
        _tail.Previous = node;
    }

    private DoubleLinkedListNode<TKey, TValue> RemoveFirstNode()
    {
        var firstNode = _head.Next;
        _head.Next = firstNode.Next;
        firstNode.Next.Previous = _head;
        firstNode.Next = null;
        firstNode.Previous = null;
        return firstNode;
    }

    private void RemoveNode(DoubleLinkedListNode<TKey, TValue> node)
    {
        node.Previous.Next = node.Next;
        node.Next.Previous = node.Previous;
        node.Next = null;
        node.Previous = null;
    }
    
    internal class DoubleLinkedListNode<TKey, TValue>
    {    
        public DoubleLinkedListNode()
        {
        }

        public DoubleLinkedListNode(TKey key, TValue value)
        {
            Key = key;
            Value = value;
        }

        public TKey Key { get; set; }
        
        public TValue Value { get; set; }

        public DoubleLinkedListNode<TKey, TValue> Previous { get; set; }

        public DoubleLinkedListNode<TKey, TValue> Next { get; set; }
    }
}

Benchmark

使用 BenchmarkDotNet 对3个版本进行性能测试对比。

[MemoryDiagnoser]
public class WriteBenchmarks
{
    // 保证写入的数据有一定的重复性,借此来测试LRU的最差时间复杂度
    private const int Capacity = 1000;
    private const int DataSize = 10_0000;
    
    private List<int> _data;

    [GlobalSetup]
    public void Setup()
    {
        _data = new List<int>();
        var shared = Random.Shared;
        for (int i = 0; i < DataSize; i++)
        {
            _data.Add(shared.Next(0, DataSize / 10));
        }
    }
    
    [Benchmark]
    public void LRUCache_V1()
    {
        var cache = new LRUCache<int, int>(Capacity);
        foreach (var item in _data)
        {
            cache.Put(item, item);
        }
    }
    
    [Benchmark]
    public void LRUCache_V2()
    {
        var cache = new LRUCache_V2<int, int>(Capacity);
        foreach (var item in _data)
        {
            cache.Put(item, item);
        }
    }
    
    [Benchmark]
    public void LRUCache_V3()
    {
        var cache = new LRUCache_V3<int, int>(Capacity);
        foreach (var item in _data)
        {
            cache.Put(item, item);
        }
    }
}

public class ReadBenchmarks
{
    // 保证写入的数据有一定的重复性,借此来测试LRU的最差时间复杂度
    private const int Capacity = 1000;
    private const int DataSize = 10_0000;
    
    private List<int> _data;
    private LRUCache<int, int> _cacheV1;
    private LRUCache_V2<int, int> _cacheV2;
    private LRUCache_V3<int, int> _cacheV3;

    [GlobalSetup]
    public void Setup()
    {
        _cacheV1 = new LRUCache<int, int>(Capacity);
        _cacheV2 = new LRUCache_V2<int, int>(Capacity);
        _cacheV3 = new LRUCache_V3<int, int>(Capacity);
        _data = new List<int>();
        var shared = Random.Shared;
        for (int i = 0; i < DataSize; i++)
        {
            int dataToPut  = shared.Next(0, DataSize / 10);
            int dataToGet = shared.Next(0, DataSize / 10);
            _data.Add(dataToGet);
            _cacheV1.Put(dataToPut, dataToPut);
            _cacheV2.Put(dataToPut, dataToPut);
            _cacheV3.Put(dataToPut, dataToPut);
        }
    }
    
    [Benchmark]
    public void LRUCache_V1()
    {
        foreach (var item in _data)
        {
            _cacheV1.Get(item);
        }
    }
    
    [Benchmark]
    public void LRUCache_V2()
    {
        foreach (var item in _data)
        {
            _cacheV2.Get(item);
        }
    }
    
    [Benchmark]
    public void LRUCache_V3()
    {
        foreach (var item in _data)
        {
            _cacheV3.Get(item);
        }
    }
}

写入性能测试结果:

|      Method |      Mean |     Error |    StdDev |    Median |     Gen0 |     Gen1 | Allocated |
|------------ |----------:|----------:|----------:|----------:|---------:|---------:|----------:|
| LRUCache_V1 | 16.890 ms | 0.3344 ms | 0.8012 ms | 16.751 ms | 750.0000 | 218.7500 |   4.65 MB |
| LRUCache_V2 |  7.193 ms | 0.1395 ms | 0.3958 ms |  7.063 ms | 703.1250 | 226.5625 |   4.22 MB |
| LRUCache_V3 |  5.761 ms | 0.1102 ms | 0.1132 ms |  5.742 ms | 585.9375 | 187.5000 |   3.53 MB |

查询性能测试结果:

|      Method |      Mean |     Error |    StdDev |    Gen0 | Allocated |
|------------ |----------:|----------:|----------:|--------:|----------:|
| LRUCache_V1 | 19.475 ms | 0.3824 ms | 0.3390 ms | 62.5000 |  474462 B |
| LRUCache_V2 |  1.994 ms | 0.0273 ms | 0.0242 ms |       - |       4 B |
| LRUCache_V3 |  1.595 ms | 0.0187 ms | 0.0175 ms |       - |       3 B |

本文将帮助您理解 DAO 的概念,并帮助您构建一个基本的 DAO。

什么是 DAO?

您可以将 DAO 视为基于互联网的实体(比如企业),由其股东(拥有代币和比例投票权的成员)共同拥有和管理。在 DAO 中,决策是通过提案做出的,DAO 的成员可以对这些提案进行投票,然后执行它们。

DAO 完全由可公开查看/可验证的代码管理,没有一个人(如 CEO)负责决策。

DAO 如何运作?

如前所述,DAO 由代码管理,但是如果运行代码的机器的人决定关闭机器或编辑代码怎么办?

所需要的是让相同的代码在由不同实体托管的一组机器上运行,这样即使其中一个关闭,另一个也可以接管。区块链帮助我们解决了上述问题,基于 EVM 的区块链(如 Ethereum 和 Polygon)允许我们在公共去中心化分类账上运行智能合约。部署在这些网络上的智能合约将传播到网络上所有可以查看和验证它的节点,并且没有任何一方控制网络。

具有代币成员资格的 DAO 向其成员发行代币,代币代表系统中的投票权。根据所设置的治理,任何人都可以创建 DAO 更改提案,并将其提交给具有法定人数(通过所需的最低百分比/票数)和投票持续时间的投票。成员可以对提案进行查看和投票,投票权与成员拥有的代币数量成正比。投票期结束后,我们检查提案是否通过,如果通过,则执行。

DAO 的一些示例是
MakerDAO

Aragon

下图显示了流程。

file

让我们开始建设

我们将在我们的代码库中使用
OpenZeppelin
合约,我还将使用 Patrick Collins 的
DAO 模板
中的一些代码。

先决条件

您将需要以下内容才能开始。

  1. Node.js :您可以从 Node.js
    网站
    下载最新版本。我在撰写本文时使用的版本是 16.14.2。
  2. Yarn
    :我们将使用 Y
    arn
    作为包管理器。
  3. Hardhat
    :我们将使用
    Hardhat
    作为本地开发环境。

资料库

我已经编写代码并将其推送到 GitHub,如果您想自己尝试,可以在
此处
获取代码,不过我建议您留下来,因为我会解释代码。

场景

我们将构建一个将执行以下操作的 DAO:

场景 1

  1. 添加初始成员。(让我们称他们为创始人)。
  2. 让创始人创建一个提案。(提出要在智能合约上执行的功能)。
  3. 让方正对上述提案进行投票,因为方正拥有 100% 的投票份额,因此它将通过。
  4. 执行提案。(以及智能合约中的功能)

场景 2

  1. 添加一个初始成员(我们称他们为 Founder)。
  2. 添加另一个成员并向他们发行价值 20% 的创始人份额的新代币。
  3. 让创始人创建一个提案(提出一个要在智能合约上执行的功能)。
  4. 让创始人和新成员对上述提案进行投票。法定人数设置为 90%。
  5. 执行提案(以及智能合约中的功能)。

合约

如前所述,我们将使用 OpenZeppelin 的治理合约。合同如下:

  1. Governor contract:
    Governor contract决定了一个quorum所需的票数/百分比(例如,如果quorum是4%,那么只需要4%的选民投票通过该提案),投票周期即多久投票是否开放,投票延迟,即提案创建后多长时间允许成员更改他们拥有的代币数量。总督还提供创建提案、投票和执行提案的功能。
  2. TimeLock:
    TimeLock 合约为不同意决定执行前退出系统的成员提供时间。
  3. Token:
    Token合约是一种特殊类型的ERC20合约,实现ERC20Votes扩展。这允许将投票权映射到过去余额的快照而不是当前余额,这有助于防止成员知道即将提出重要提案并试图通过购买更多代币然后在之后抛售来增加他们的投票权投票。
  4. 目标:
    这是合约,其代码将在提案通过投票后执行。

代码

让我们开始把这一切放在一起。使用 Hardhat 创建一个空的示例项目。在您的终端中运行以下命令。

yarn add — 开发安全帽

接下来,让我们使用 hardhat 创建我们的文件夹结构。

你应该看到这样的提示

file

单击“创建基本示例项目”。该过程完成后,您应该会看到类似这样的内容。

file

合约

让我们开始添加合约,首先让我们添加 GovernorContract。
我们可以从OpenZeppelin
获得相同的代码,或者您可以复制下面的代码或从我的仓库中复制代码。我的合约代码修复了 OpenZeppelin 版本中的一个
问题
,以及投票延迟、法定人数和投票周期的参数化,类似于 Patrick Collins 版本。

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.4;
import “@openzeppelin/contracts/governance/Governor.sol”;
import “@openzeppelin/contracts/governance/extensions/GovernorSettings.sol”;
import “@openzeppelin/contracts/governance/extensions/GovernorCountingSimple.sol”;
//import “@openzeppelin/contracts/governance/extensions/GovernorVotes.sol”;
import “@openzeppelin/contracts/governance/extensions/GovernorVotesQuorumFraction.sol”;
import “@openzeppelin/contracts/governance/extensions/GovernorTimelockControl.sol”;
contract GovernorContract is Governor, GovernorSettings, GovernorCountingSimple, GovernorVotes, GovernorVotesQuorumFraction, GovernorTimelockControl {
constructor(IVotes _token, TimelockController _timelock,uint256 _quorumPercentage,
uint256 _votingPeriod,
uint256 _votingDelay)
Governor(“GovernorContract”)
GovernorSettings(_votingDelay,_votingPeriod,0)
GovernorVotes(_token)
GovernorVotesQuorumFraction(_quorumPercentage)
GovernorTimelockControl(_timelock)
{}
// The following functions are overrides required by Solidity.
function votingDelay()
public
view
override(IGovernor, GovernorSettings)
returns (uint256)
{
return super.votingDelay();
}
function votingPeriod()
public
view
override(IGovernor, GovernorSettings)
returns (uint256)
{
return super.votingPeriod();
}
function quorum(uint256 blockNumber)
public
view
override(IGovernor, GovernorVotesQuorumFraction)
returns (uint256)
{
return super.quorum(blockNumber);
}
function getVotes(address account, uint256 blockNumber)
public
view
override(Governor, IGovernor)
returns (uint256)
{
return _getVotes(account, blockNumber, _defaultParams());
}
function state(uint256 proposalId)
public
view
override(Governor, GovernorTimelockControl)
returns (ProposalState)
{
return super.state(proposalId);
}
function propose(address[] memory targets, uint256[] memory values, bytes[] memory calldatas, string memory description)
public
override(Governor, IGovernor)
returns (uint256)
{
return super.propose(targets, values, calldatas, description);
}
function proposalThreshold()
public
view
override(Governor, GovernorSettings)
returns (uint256)
{
return super.proposalThreshold();
}
function _execute(uint256 proposalId, address[] memory targets, uint256[] memory values, bytes[] memory calldatas, bytes32 descriptionHash)
internal
override(Governor, GovernorTimelockControl)
{
super._execute(proposalId, targets, values, calldatas, descriptionHash);
}
function _cancel(address[] memory targets, uint256[] memory values, bytes[] memory calldatas, bytes32 descriptionHash)
internal
override(Governor, GovernorTimelockControl)
returns (uint256)
{
return super._cancel(targets, values, calldatas, descriptionHash);
}
function _executor()
internal
view
override(Governor, GovernorTimelockControl)
returns (address)
{
return super._executor();
}
function supportsInterface(bytes4 interfaceId)
public
view
override(Governor, GovernorTimelockControl)
returns (bool)
{
return super.supportsInterface(interfaceId);
}
}

接下来,让我们添加令牌合约,这在 OpenZeppelin 上也是可用的。我的代码有一个额外的“issueToken”函数(稍后会详细介绍)。

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.2;
import "@openzeppelin/contracts/token/ERC20/ERC20.sol";
import "@openzeppelin/contracts/token/ERC20/extensions/draft-ERC20Permit.sol";
import "@openzeppelin/contracts/token/ERC20/extensions/ERC20Votes.sol";
contract MyToken is ERC20, ERC20Permit, ERC20Votes {
constructor() ERC20("MyToken", "MTK") ERC20Permit("MyToken") {
_mint(msg.sender, 1000);
}
// The functions below are overrides required by Solidity.
function _afterTokenTransfer(address from, address to, uint256 amount)
internal
override(ERC20, ERC20Votes)
{
super._afterTokenTransfer(from, to, amount);
}
function _mint(address to, uint256 amount)
internal
override(ERC20, ERC20Votes)
{
super._mint(to, amount);
}
function _burn(address account, uint256 amount)
internal
override(ERC20, ERC20Votes)
{
super._burn(account, amount);
}
function issueToken(address to, uint256 amount) public{
_mint(to, amount);
}
}

最后,让我们检查一下 Target 合约,在我们的例子中,我们将使用 Patrick Collins 使用的相同 Box 合约。

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
import "@openzeppelin/contracts/governance/TimelockController.sol";
contract TimeLock is TimelockController {
// minDelay is how long you have to wait before executing
// proposers is the list of addresses that can propose
// executors is the list of addresses that can execute
constructor(
uint256 minDelay,
address[] memory proposers,
address[] memory executors
) TimelockController(minDelay, proposers, executors) {}
}

测试

现在我们有了合同,我们需要编写测试。在“test”文件夹下创建一个文件“sample-test.js”。让我们开始编写我们的测试。首先,让我们使用以下数据创建一个名为“helper.config.js”的配置文件。

module.exports=
{
      MIN_DELAY:3600,
      QUORUM_PERCENTAGE:90,
      VOTING_PERIOD:5,
      VOTING_DELAY:3,
      ADDRESS_ZERO :"0x0000000000000000000000000000000000000000"
}

法定人数为 90%,投票周期为 5 个区块,投票延迟为 3 个区块。TimeLock 的最小延迟为 3600 秒。

让我们编写代码将所有合约部署到本地网络(Hardhat 在内部管理它,我们不需要启动任何进程)

governanceToken = await ethers.getContractFactory("MyToken")
deployedToken=await governanceToken.deploy();
await deployedToken.deployed();
transactionResponse = await deployedToken.delegate(owner.address)
await transactionResponse.wait(1)
timeLock = await ethers.getContractFactory("TimeLock")
deployedTimeLock=await timeLock.deploy(MIN_DELAY,[],[]);
await deployedTimeLock.deployed();
governor = await ethers.getContractFactory("GovernorContract")
deployedGovernor=await governor.deploy(deployedToken.address,deployedTimeLock.address,QUORUM_PERCENTAGE,VOTING_PERIOD,VOTING_DELAY);
await deployedGovernor.deployed()
box = await ethers.getContractFactory("Box")
deployedBox=await box.deploy()
await deployedBox.deployed()

提案创建

接下来,创建提案。我们传递将在 Box 合约及其参数上调用的函数的编码值。

propose 函数的输出是一个包含
Proposal Id 的交易。
这用于跟踪提案。

const proposalDescription="propose this data"
let encodedFunctionCall = box.interface.encodeFunctionData("store", [77])
const proposeTx = await deployedGovernor.propose([deployedBox.address],[0],[encodedFunctionCall],proposalDescription);

提案是在值为 77 的 Box 合约上触发
存储功能。

表决

然后我们对该提案进行投票,投票“1”表示同意。

注意:
在这种情况下,我们只有一名成员(拥有 100% 的选票)在投票。

const voteWay = 1
const reason = "I vote yes"
let voteTx = await deployedGovernor.castVoteWithReason(proposalId, voteWay, reason)

队列和执行

接下来,来自 DAO 的任何成员都可以排队并执行该提案,如果提案通过投票,它将被执行,并且 Box 合约上的存储函数将被调用,值为 77。你可能已经注意到像
moveTime

moveBlocks ,
这些来自 Patrick Collins
DAO 模板
,在开发环境中可用于模拟时间的流逝和区块挖掘,它们帮助我们模拟投票期的完成、时间锁定延迟等。

const queueTx = await deployedGovernor.queue([deployedBox.address],[0],[encodedFunctionCall],descriptionHash)
await queueTx.wait(1)
await moveTime(MIN_DELAY + 1)
await moveBlocks(1)
console.log("Executing...")
const executeTx = await deployedGovernor.execute(
[deployedBox.address],
[0],
[encodedFunctionCall],
descriptionHash
)
await executeTx.wait(1)
const value=await deployedBox.retrieve();
console.log(value)

运行测试

我们现在可以使用以下命令运行测试

纱线安全帽测试

向新成员发行代币

我们上面看到的是场景 1 的流程。对于场景 2,我们需要向新成员发行新代币并让他们对提案进行投票。

发行代币的代码如下所示

[owner, addr1, addr2] = await ethers.getSigners();
const signer=await ethers.getSigner(addr1.address);
const deployedTokenUser2=await deployedToken.connect(signer)
await deployedTokenUser2.issueToken(addr1.address,200)

函数
getSigners()
返回 Hardhat 开发环境中所有帐户的列表,然后我们向该地址发行 200 个令牌。

新成员投票

现在我们有了另一个成员,我们可以用他来投票,但是新成员不能投票,除非他首先将自己添加为代币合约的代表,这样做是为了让拥有代币但不想参与决策的成员不需要花费额外的 gas 成本来维护他们在账本上的投票权快照。

自委托的代码如下。

[owner, addr1, addr2] = await ethers.getSigners();
const signer=await ethers.getSigner(addr1.address);
const deployedTokenUser2=await deployedToken.connect(signer)
await deployedTokenUser2.issueToken(addr1.address,200)

文章来源:
https://blog.blockmagnates.com/how-to-build-a-dao-decentralized-autonomous-organization-in-solidity-af1cf900d95d

通过Github 获取更多区块链学习资料!

https://github.com/Manuel-yang/BlockChainSelfLearning

承接上文

承接之前的【精华推荐 |【算法数据结构专题】「延时队列算法」史上非常详细分析和介绍如何通过时间轮(TimingWheel)实现延时队列的原理指南】,让我们基本上已经知道了「时间轮算法」原理和核心算法机制,接下来我们需要面向于实战开发以及落地角度进行分析如何实现时间轮的算法机制体系。


前言回顾

什么是时间轮

  • 调度模型
    :时间轮是为解决高效调度任务而产生的调度模型/算法思想。
  • 数据结构
    :通常由hash表和双向链表实现的数据结构。

为什么用时间轮?

对比传统队列的优势

相比传统的队列形式的调度器来说,时间轮能够批量高效的管理各种延时任务、周期任务、通知任务等等。例如延时队列/延时任务体系

延时任务/队列体系

**延时任务、周期性任务,应用场景主要在延迟大规模的延时任务、周期性的定时任务等。

案例-Kafka的延时操作系列

比如,对于耗时的网络请求(
比如Produce时等待ISR副本复制成功
)会被封装成DelayOperation进行延迟处理操作,防止阻塞Kafka请求处理线程,从而影响效率和性能。

传统队列带来的性能问题

Kafka没有使用传统的队列机制(JDK自带的Timer+DelayQueue实现)。因为
时间复杂度
上这两者插入和删除操作都是 O(logn),不能满足Kafka的高性能要求。

基于JDK自带的Timer+DelayQueue实现

JDK Timer和DelayQueue底层都是个优先队列,即采用了minHeap的数据结构,最快需要执行的任务排在队列第一个,不一样的是Timer中有个线程去拉取任务执行,DelayQueue其实就是个容器,需要配合其他线程工作。

ScheduledThreadPoolExecutor是JDK的定时任务实现的一种方式,其实也就是DelayQueue+池化线程的一个实现

基于时间轮TimeWheel的实现

时间轮算法的插入删除操作都是O(1)的时间复杂度,满足了Kafka对于性能的要求。除了Kafka以外,像 Netty 、ZooKeepr、Dubbo等开源项目、甚至Linux内核中都有使用到时间轮的实现。

当流量小时,使用DelayQueue效率更高。当流量大事,使用时间轮将大大提高效率。

时间轮算法是怎么样的,算法思想是什么?

时间轮的数据结构

数据结构模型主要由:时间轮环形队列和时间轮任务列表组成。

  • 时间轮(TimingWheel)
    是一个存储定时任务的
    环形队列(cycle array)
    ,底层采用环形数组实现,数组中的每个元素可以对应一个
    时间轮任务任务列表(TimeWheelTaskList)

  • 时间轮任务任务列表(TimeWheelTaskList)
    是一个环形的双向链表,其中的每个元素都是延时/定时任务项(TaskEntry),其中封装了任务基本信息和元数据(Metadata)。

可以看到图中的几个参数:

  • startMs:
    开始时间

  • tickMs:
    时间轮执行的最小单位

    时间轮
    由多个时间格组成,每个
    时间格
    代表当前时间轮的基本时间跨度就是tickMs。

  • wheelSize:
    时间轮中环形队列的数量

    时间轮
    的时间格数量是固定的,可用wheelSize来表示。

  • interval:
    时间轮的整体时间跨度
    =
    tickMs * wheelSize

根据上面这两个属性,我们就可以讲整个时间轮的总体时间跨度(interval)可以通过公式tickMs × wheelSize计算得出。例如果时间轮的
tickMs=1ms

wheelSize=20
,那么可以计算得出interval为
20ms

currentTime游标指针

此外,时间轮还有一个游标指针,我们称之为(currentTime),它用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。

整个时间轮的总体跨度是不变的,随着指针currentTime的不断流动,当前时间轮所能处理的时间段也在不断后移,整个时间轮的时间范围在currentTime和currentTime+interval之间。

currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimeWheelTaskList中的所有任务。

currentTime游标指针的运作流程
  1. 初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插入进来会存放到时间格为2的任务列表中。

  2. 随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的任务列表中的任务做相应的到期操作。

  3. 此时若又有一个定时为8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。

总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间

层次化时间轮机制

如果当提交了超过整体跨度(interval)的延时任务,如何解决呢?因此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到层次时间轮中。

层次化时间轮实现原理介绍

层次化时间轮任务升级跃迁

在任务插入时,如果第一层时间轮不满足条件,就尝试插入到高一层的时间轮,以此类推。

  • 第一层的时间轮tickMs=1ms, wheelSize=20, interval=20ms

  • 第二层的时间轮的tickMs为第一层时间轮的interval,即为20ms

第一层和第二层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。正好是第一层时间轮的20倍。
以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms

  • 第N层时间轮走了一圈,等于N+1层时间轮走一格。即高一层时间轮的时间跨度等于当前时间轮的整体跨度。

层次化时间轮任务降级跃迁

随着时间推进,也会有一个时间轮降级的操作,原本延时较长的任务会从高一层时间轮重新提交到时间轮中,然后会被放在合适的低层次的时间轮当中等待处理;

案例介绍

层次化时间轮任务升级跃迁

例如:350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格17所对应的TimeWheelTaskList中。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格1的TimeWheelTaskList中。

层次化时间轮任务降级跃迁

在 [400ms,800ms) 区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格1中,时间格1对应的TimerTaskList的超时时间为400ms

随着时间的流逝,当次TimeWheelTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。

这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。

再经历了40ms之后,此时这个任务又被“察觉”到,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历10ms后,此任务真正到期,最终执行相应的到期操作。

后续章节预告

接下来小编会出【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下),进行实战编码进行开发对应的层次化的时间轮算法落地。