2023年3月

etcd原理详解

etcd概述

etcd的特点

etcd是一个Go言编写的分布式、高可用的一致性键值存储系统,用于提供可靠的分布式键值存储、配置共享和服务发现等功能

etcd具有以下特点:

  • 简单:


    • 易使用:


      • etcdv2基于HTTP+JSON的API用curl就可以轻松使用(v2)

      • etcdv3的客户端使用gRPC与server进行通信,通信的消息协议使用protobuf进行约定,代替了v2版本的HTTP+JSON格式(v3)

    • 易部署:使用Go语言编写,跨平台,部署和维护简单。

  • 可靠:


    • 强一致:使用Raft算法充分保证了分布式系统数据的强一致性;

    • 高可用:具有容错能力,假设集群有n个节点,当有(n-1)/2节点发送故障,依然能提供服务;

    • 持久化:数据更新后,会通过WAL格式数据持久化到磁盘,支持Snapshot快照。

  • 快速:每个实例每秒支持一千次写操作,极限写性能可达10K QPS。

  • 安全:可选SSL客户认证机制。

  • v3版本使用HTTP/2协议,同一个连接可以同时处理多个请求,摒弃多个请求需要建立多个连接的方式。

etcd的应用场景

  • 服务发现(service discovery)

  • 消息发布与订阅

  • 负载均衡

  • 分布式通知与协调

  • 分布式锁、分布式队列

  • 集群监控与Leader竞选

etcd架构

etcd主要分为四个部分:

  1. HTTP Server:用于处理用户发送的API请求以及其它etcd节点的同步与心跳信息请求。

  2. Store:用于处理etcd支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是etcd对用户提供的大多数API功能的具体实现。

  3. Raft:Raft强一致性算法的具体实现,是etcd的核心。

  4. WAL:Write Ahead Log(预写式日志),是etcd的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd就通过WAL进行持久化存储。WAL中,所有的数据提交前都会事先记录日志。Snapshot是为了防止数据过多而进行的状态快照;Entry表示存储的具体日志内容。

Raft协议

Raft的基本概念

每个etcd节点都维护了一个状态机,并且任何时刻最多存在一个有效的主节点,主节点处理所有来自客户端的写操作,通过Raft协议保证写操作对状态机的改动会可靠的同步到其他节点。

Raft协议一共包含如下3类角色:

  • Leader(领袖):领袖由群众投票选举得出,每次选举,只能选出一名领袖;

  • Candidate(候选人):当没有领袖时,某些群众可以成为候选人,然后去竞争领袖的位置;

  • Follower(群众):没有竞选资格

在进行选举过程中,有几个重要的概念:

  • Leader Election(领导人选举):简称选举,就是从候选人中选出领袖;

  • Term(任期):它其实是个单独递增的连续数字,每一次任期就会重新发起一次领导人选举;

  • Election Timeout(选举超时):就是一个超时时间,当群众超时未收到领袖的心跳时,会重新进行选举。

选主

角色转换

  • 群众 -> 候选人:当开始选举,或者“选举超时”时

  • 候选人 -> 候选人:当“选举超时”,或者开始新的“任期”

  • 候选人 -> 领袖:获取大多数投票时

  • 候选人 -> 群众:其它节点成为领袖,或者开始新的“任期”

  • 领袖 -> 群众:发现自己的任期ID比其它节点分任期ID小时,会自动放弃领袖位置

选举的特点

Raft协议是用于维护一组服务节点数据一致性的协议。一组服务节点构成一个集群,并且有一个节点来对外提供服务。当集群初始化,或者主节点挂掉以后,面临选主的问题。集群中每个节点,任意时刻都处于Leader、Follower、Candidate这三个角色之一。

  1. 当集群初始化时,每个节点都是Follower角色

  2. 集群中存在至多1个有效的主节点,通过心跳与其他节点同步数据

  3. 当Follower在一定时间内没有收到来自主节点的心跳,会将自己的角色改变为Candidate,并发起一次选主投票


    • 当收到包括自己在内超过半数节点赞成后,选举成功

    • 当收到票数不足半数选举失败,或者选举超时

    • 若本轮未选出主节点,将进行下一轮选举(出现这种情况,是由于多个节点同时选举,所有节点均为获得超过半数选票)

  4. Candidate节点收到来自主节点的信息后,会立即终止选举过程,进入Follower角色。


    • 为了避免陷入选主失败循环,每个节点未收到心跳发起选举的时间是一定范围内的随机值,这样能够避免2个节点同时发起选主。

选举流程示例

1)选举人选举

现在有A、B、C三个节点

  1. 成为候选人:


    • 每个节点都有自己的“超时时间”,且它是随机的,区间值为150~300ms(所以出现相同随机时间的概率比较小)

    • 假设节点 B 最先超时,这时它就成为候选人。

  2. 选举领导人:


    • 候选人 B 开始发起投票,群众 A 和 C 返回投票,当候选人 B 获取大部分选票后,选举成功,候选人 B 成为领袖。

  3. 心跳探测:


    • 为了时刻宣誓自己的领导人地位,领袖 B 需要时刻向群众发起心跳

    • 当群众 A 和 C 收到领袖B的心跳后,群众 A 和 C 的“超时时间”会重置为0,然后重新计数,依次反复。

  • 注意:领袖广播心跳的周期必须要短于“选举定时器”的超时时间,否则群众会频繁成为候选者,也就会出现频繁发生选举,切换Leader的情况。

2)领袖挂掉

  1. 当领袖 B 挂掉,群众 A 和 C 的“选举定时器”会一直运行,当群众 A 先超时时,会成为候选人。

  2. 后续流程和“领导人选举”流程一样,即通知投票 -> 接收投票 -> 成为领袖 -> 心跳探测。

3)出现多个候选者情况

  1. 当出现多个候选者 A 和 D 时,两个候选者会同时发起投票:


    • 如果票数不同,最先得到大部分投票的节点会成为领袖。

    • 如果获取的票数相同,会重新发起新一轮的投票。

  2. 当 C 成为新的候选者,此时的任期Term为5,发起新一轮的投票,其它节点发起投票后,会更新自己的任期值,最后选择新的领袖为C节点。

脑裂情况

当网络问题导致脑裂,出现双Leader情况时,每个网络可以理解为一个独立的网络,因为原先的Leader独自在一个区,所以向他提交的数据不可能被复制到大多数节点上,所以数据永远都不会提交

当网络恢复之后,旧的Leader发现集群中的新Leader的Term比自己大,则自动降级为Follower,并从新Leader处同步数据达成集群数据一致

脑裂情况其实只是异常情况的一种,当Leader通知Follower更新日志、Leader提交更新时,都存在各种异常情况导致的问题

日志复制

日志复制,是指主节点将每次操作形成日志条目,并持久化到本地磁盘,然后通过网络IO发送给其他节点。其他节点根据日志的 逻辑时钟(TERM) 和 日志编号(INDEX) 来判断是否将该日志记录持久化到本地。
当主节点收到包括自己在内超过半数节点成功返回,那么认为该日志是可提交的(committed),并将日志输入到状态机,将结果返回给客户端。

每次选主都会形成一个唯一的TERM编号,相当于逻辑时钟。每一条日志都有全局唯一的编号。

主节点通过网络IO向其他节点追加日志:

  1. 若某节点收到日志追加的消息,首先判断该日志的TERM是否过期,以及该日志条目的INDEX是否比当前以及提交的日志的INDEX更早

  2. 若已过期,或者比提交的日志更早,那么就拒绝追加,并返回该节点当前的已提交的日志的编号。否则,将日志追加,并返回成功

  3. 当主节点收到其他节点关于日志追加的回复后,若发现有拒绝,则根据该节点返回的已提交日志编号,发生其编号下一条日志。

主节点像其他节点同步日志,还作了拥塞控制:主节点发现日志复制的目标节点拒绝了某次日志追加消息,将进入日志探测阶段,一条一条发送日志,直到目标节点接受日志,然后进入快速复制阶段,可进行批量日志追加。

按照日志复制的逻辑,集群中慢节点不影响整个集群的性能。且数据只从主节点复制到Follower节点,这样大大简化了逻辑流程。

安全性

截止此刻,选主以及日志复制并不能保证节点间数据一致。当一个某个节点挂掉了,一段时间后再次重启,并当选为主节点。而在其挂掉这段时间内,集群若有超过半数节点存活,集群会正常工作,那么会有日志提交。这些提交的日志无法传递给挂掉的节点。当挂掉的节点再次当选主节点,它将缺失部分已提交的日志。在这样场景下,按Raft协议,它将自己日志复制给其他节点,会将集群已经提交的日志给覆盖掉。这显然是不可接受的。

其他协议解决这个问题的办法是,新当选的主节点会询问其他节点,和自己数据对比,确定出集群已提交数据,然后将缺失的数据同步过来。这个方案有明显缺陷,增加了集群恢复服务的时间(集群在选举阶段不可服务),并且增加了协议的复杂度。

Raft解决的办法是,在选主逻辑中,对能够成为主的节点加以限制,确保选出的节点已定包含了集群已经提交的所有日志。如果新选出的主节点已经包含了集群所有提交的日志,那就不需要从和其他节点比对数据了。简化了流程,缩短了集群恢复服务的时间。

这里存在一个问题,加以这样限制之后,还能否选出主呢?答案是:只要仍然有超过半数节点存活,这样的主一定能够选出。因为已经提交的日志必然被集群中超过半数节点持久化,显然前一个主节点提交的最后一条日志也被集群中大部分节点持久化。当主节点挂掉后,集群中仍有大部分节点存活,那这存活的节点中一定存在一个节点包含了已经提交的日志了。

etcd集群部署

etcd安装

环境信息

node IP OS etcd版本
etcd0 10.0.0.80 CentOS7.9 etcd-v3.5.4
etcd1 10.0.0.81 CentOS7.9 etcd-v3.5.4
etcd2 10.0.0.82 CentOS7.9 etcd-v3.5.4

etcd二进制安装

etcd下载:
https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz

 wget -c https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz
 tar zxvf etcd-v3.5.4-linux-amd64.tar.gz -C /usr/local/
 ln -s /usr/local/etcd-v3.5.4-linux-amd64/ /usr/local/etcd
 sed -i '$aPATH="/usr/local/etcd/:$PATH"' /etc/profile
 source /etc/profile
 
 mkdir -pv /data/etcd/{log,data}  # data为节点数据存储目录,log为日志目录

etcd集群初始化

在每台etcd节点上进行集群初始化

 # 在每台etcd节点上进行集群初始化
 # 注意:在etcd2和etcd3上操作时注意修改--name并且将对应的IP地址改成对应节点的IP
 #!/bin/sh
 etcd  --name etcd0 \
 --data-dir /data/etcd/data \
 --advertise-client-urls http://10.0.0.80:2379,http://10.0.0.80:4001 \
 --listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
 --initial-advertise-peer-urls http://10.0.0.80:2380 \
 --listen-peer-urls http://0.0.0.0:2380 \
 --initial-cluster-token etcd-cluster-1 \
 --initial-cluster etcd0=http://10.0.0.80:2380,etcd1=http://10.0.0.81:2380,etcd2=http://10.0.0.82:2380 \
 --initial-cluster-state new > /data/etcd/log/etcd.log 2>&1
 
 # 以后台进程方式运行在后台
 # 将以上命令保存到 ./etcd0.sh
 chmod +x etcd0.sh
 nohup ./etcd0.sh &
 
 # 注意:初始化只是在集群初始化时运行一次,后续服务重启的话,需要去掉所有的initial参数,否则会报错
 # 集群初始化完毕后的启动命令:
 etcd  --name etcd0 \
 --data-dir /data/etcd/data \
 --advertise-client-urls http://10.0.0.80:2379,http://10.0.0.80:4001 \
 --listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
 --listen-peer-urls http://0.0.0.0:2380

etcd参数说明

  • –name
    :节点名称

  • --data-dir
    :指定节点的数据存储目录;若不指定,则默认是当前目录;这些数据包括节点ID,集群ID,集群初始化配置,Snapshot文件,若未指 定–wal-dir,还会存储WAL文件。

  • –wal-dir
    :指定节点的was文件存储目录,若指定了该参数,wal文件会和其他数据文件分开存储

  • –advertise-client-urls
    :告知客户端的URL, 也就是服务的URL,
    tcp2379端口用于监听客户端请求

  • –initial-advertise-peer-urls
    :建议用于节点之间通信的url,节点间将以该值进行通信。告知集群其他节点的URL,
    tcp2380端口用于集群通信

  • –listen-peer-urls
    :监听的用于节点之间通信的url,可监听多个,集群内部将通过这些url进行数据交互(如选举,数据同步等)

  • –initial-cluster-token
    :集群的ID

  • –initial-cluster
    :集群中所有节点

  • –initial-cluster-state
    :集群状态,
    new为新创建集群,existing为已存在的集群

检查etcd集群状态

 # 检查集群状态
 [root@etcd0 ~]# etcdctl member list --write-out=table --endpoints=http://10.0.0.80:2379
 +------------------+---------+-------+-----------------------+---------------------------------------------+------------+
 |       ID       | STATUS | NAME |     PEER ADDRS       |               CLIENT ADDRS                 | IS LEARNER |
 +------------------+---------+-------+-----------------------+---------------------------------------------+------------+
 | 31eb17a19c60d188 | started | etcd0 | http://10.0.0.80:2380 | http://10.0.0.80:2379,http://10.0.0.80:4001 |      false |
 | d23e97e2884bcaba | started | etcd1 | http://10.0.0.81:2380 | http://10.0.0.81:2379,http://10.0.0.81:4001 |      false |
 | f0a7d39c34766dd7 | started | etcd2 | http://10.0.0.82:2380 | http://10.0.0.82:2379,http://10.0.0.82:4001 |      false |
 +------------------+---------+-------+-----------------------+---------------------------------------------+------------+
 
 # 检查节点健康状态
 [root@etcd0 ~]# etcdctl endpoint health --cluster --endpoints="http://10.0.0.80:2379"
 http://10.0.0.80:4001 is healthy: successfully committed proposal: took = 5.78219ms
 http://10.0.0.82:2379 is healthy: successfully committed proposal: took = 7.975032ms
 http://10.0.0.81:2379 is healthy: successfully committed proposal: took = 6.516737ms
 http://10.0.0.80:2379 is healthy: successfully committed proposal: took = 7.419094ms
 http://10.0.0.82:4001 is healthy: successfully committed proposal: took = 5.000158ms
 http://10.0.0.81:4001 is healthy: successfully committed proposal: took = 8.251767ms
 
 # 检查集群详细状态
 [root@etcd0 ~]# etcdctl endpoint status --endpoints="http://10.0.0.80:2379,http://10.0.0.81:2379,http://10.0.0.82:2379" --write-out=table
 +-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
 |       ENDPOINT       |       ID       | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
 +-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
 | http://10.0.0.80:2379 | 31eb17a19c60d188 |   3.5.4 |   20 kB |      true |      false |         2 |         23 |                 23 |       |
 | http://10.0.0.81:2379 | d23e97e2884bcaba |   3.5.4 |   20 kB |     false |      false |         2 |         23 |                 23 |       |
 | http://10.0.0.82:2379 | f0a7d39c34766dd7 |   3.5.4 |   20 kB |     false |      false |         2 |         23 |                 23 |       |
 +-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
 

etcd节点迁移

当遇到硬件故障发生的时候,需要快速恢复节点。ETCD集群可以做到在不丢失数据的,并且不改变节点ID的情况下,迁移节点:

  1. 停止待迁移节点上的etc进程;

  2. 将数据目录打包复制到新的节点;

  3. 更新该节点对应集群中peer url,让其指向新的节点;

  4. 使用相同的配置,在新的节点上启动etcd进程;

etcd操作

写入数据

 # 使用put写入键值对
 [root@etcd0 ~]# etcdctl put mytest "hello" --endpoints=http://10.0.0.80:2379
 OK
 [root@etcd0 ~]# etcdctl put mytest1 "hello 001" --endpoints=http://10.0.0.80:2379
 OK
 [root@etcd0 ~]# etcdctl put mytest2 "hello 002" --endpoints=http://10.0.0.80:2379
 OK

获取数据

 # 使用get来通过具体的键获取数据
 [root@etcd0 ~]# etcdctl get mytest --endpoints=http://10.0.0.80:2379
 mytest
 hello
 
 # 通过前缀来获取数据
 [root@etcd0 ~]# etcdctl get mytest --prefix --endpoints=http://10.0.0.80:2379
 mytest
 hello
 mytest1
 hello 001
 mytest2
 hello 002

删除数据

 [root@etcd0 ~]# etcdctl del mytest --endpoints=http://10.0.0.80:2379
 1  # 删除了一条数据
 [root@etcd0 ~]# etcdctl del mytest --prefix --endpoints=http://10.0.0.80:2379
 2  # 删除了两条数据

监听数据

watch用于获取监听信息的更改,其可以持续监听

 # 先开一个窗口监听
 [root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
 
 
 # 然后再开一个窗口写入
 [root@redis01 ~]# etcdctl put mytest 001 --endpoints=http://10.0.0.80:2379
 OK
 
 # 先前的窗口会受到更新信息,并且会持续监听
 [root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
 PUT
 mytest
 001
 

go语言操作etcd

连接etcd

连接k8s中的etcd

 # 操作k8s中的etcd需要指定对应的ca,以及对应的证书和私钥,这里使用了apiserver的证书和私钥来连接etcd
 etcdctl --endpoints=https://10.0.0.10:2379  --cacert="/etc/kubernetes/pki/etcd/ca.crt" --cert="/etc/kubernetes/pki/apiserver-etcd-client.crt" --key="/etc/kubernetes/pki/apiserver-etcd-client.key"

go语言设置TLS调用etcd:

package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"go.etcd.io/etcd/clientv3"
"io/ioutil"
)

func main() {
// 设置客户端证书和私钥
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
fmt.Println("Error loading client key pair:", err)
return
}

// 加载根证书
caCert, err := ioutil.ReadFile("ca.crt")
if err != nil {
fmt.Println("Error reading CA certificate:", err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

// 创建 TLS 配置
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

// 创建 etcd 客户端连接
endpoint := "localhost:2379"
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
fmt.Println("Error creating etcd client:", err)
return
}

// 在连接上设置上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 执行 etcd 操作
resp, err := client.Put(ctx, "example_key", "example_value")
if err != nil {
fmt.Println("Error putting value:", err)
return
}
fmt.Println(resp)
}

PUT和GET操作

package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"

"go.etcd.io/etcd/clientv3"
)

// use etcd/clientv3

func main() {
// 连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"10.0.0.80:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()

// put操作
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"./my.log","topic":"web_log"},{"path":"./xxx.log","topic":"xxx_conf"}]`
_, err = cli.Put(ctx, "/logagent/192.168.10.2/collect_config", value)
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}

// get操作
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "/logagent/192.168.10.2/collect_config")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}

watch操作

watch操作示例:

// watch传入的key是否有变化,如果有变化,将其获取后创建成配置对象发往taillog的新配置通道
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key) // watch对应的key,第一个参数需要传入一个context对象
for wresp := range ch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// key有变换,进行通知,先判断操作的类型
var newConf []*LogEntry
if ev.Type != clientv3.EventTypeDelete {
// 如果不是删除操作, 则创建一个newConf对象传入taillog的新配置通道
err := json.Unmarshal(ev.Kv.Value, &newConf)
if err != nil {
fmt.Println("json unmarshal failed , err: ", err)
continue
}
}
fmt.Printf("get new conf: %v\n", newConf)
newConfCh <- newConf
}
}
}

每一周,我们的同事都会向社区的成员们发布一些关于 Hugging Face 相关的更新,包括我们的产品和平台更新、社区活动、学习资源和内容更新、开源库和模型更新等,我们将其称之为「Hugging News」,本期 Hugging News 有哪些有趣的消息,快来看看吧!

社区活动

全球社区的 Keras Dreambooth 活动发布

我们的全球 Hugging Face 社区正在举办一个使用 KerasCV 训练 Dreambooth 模型的活动,时间是 3 月 7 日到 4 月 1 日。欢迎大家参加!

同时不要忘记参加我们中国社区
与 PaddlePaddle 联合的举办的微调黑客松
,以及
与百姓 AI 联合举办的聊天机器人黑客松活动

开源库更新

Diffusers 现已支持使用 ControlNet

感谢社区贡献者 Takuma Mori (@takuma104) 的巨大贡献,帮助将 ControlNet 集成到 Diffusers 中,Diffusers 现已支持使用 ControlNet。类似 Diffusers 中的其他 Pipeline,Diffusers 同样为 ControlNet 提供了
StableDiffusionControlNetPipeline
供用户使用。
StableDiffusionControlNetPipeline
的核心是
controlnet
参数,它接收用户指定的训练过的
ControlNetModel
实例作为输入,同时保持扩散模型的预训练权重不变。


更多详细情况和使用说明,请查看我们的推文:

《使用

操作系统 :CentOS 7.6.1810_x64

Python 版本 : 3.9.12

一、背景描述

使用python开发过程中,会遇到需要使用缓存加速应用的情况,比如下面这些场景:

  • 数据转换加速

字符串时间转换成int时间戳;

字符串时间转换成datetime类型;

...

  • 数据解析加速

bytes数据转换为int(数据包解析场景的端口、序列号等);

bytes数据转换为string(数据包解析场景的ip地址等);

...

本文提供两种实现方式来加速应用,这里记录下,希望对你有帮助。

二、具体实现

1、使用python自带的OrderedDict实现LRU

实现思路:

1)使用OrderedDict作为缓存,并设置大小;

2)第一次解析时,将解析结果加入缓存;

3)缓存元素数量超过设定数量,执行pop操作;

示例代码如下 :

from collections importOrderedDictclassLRU:def __init__(self, func, maxsize=128):
self.func
=func
self.maxsize
=maxsize
self.cache
=OrderedDict()def __call__(self, *args):if args inself.cache:
value
=self.cache[args]
self.cache.move_to_end(args)
returnvalue
value
= self.func(*args)if len(self.cache) >=self.maxsize:
self.cache.popitem(False)
self.cache[args]
=valuereturn value

2、使用lru-dict库实现LRU

pypi地址:
https://pypi.org/project/lru-dict/

github地址:
https://github.com/amitdev/lru-dict

安装lru-dict库:

pip install lru-dict

示例代码:

from lru importLRU
l
= LRU(5) #Create an LRU container that can hold 5 items print l.peek_first_item(), l.peek_last_item() #return the MRU key and LRU key#Would print None None for i in range(5):
l[i]
=str(i)print l.items() #Prints items in MRU order#Would print [(4, '4'), (3, '3'), (2, '2'), (1, '1'), (0, '0')] print l.peek_first_item(), l.peek_last_item() #return the MRU key and LRU key#Would print (4, '4') (0, '0') l[5] = '5' #Inserting one more item should evict the old item printl.items()#Would print [(5, '5'), (4, '4'), (3, '3'), (2, '2'), (1, '1')]

由于lru-dict库是使用c实现的,使用源代码安装可能存在环境问题,可直接使用pypi上面提供的预编译whl文件:

说明:

1)源码包为 lru-dict-1.1.8.tar.gz;

2)本文使用的whl文件是 lru_dict-1.1.8-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl ;

3)可从以下途径获取上述文件:

关注微信公众号(聊聊博文,文末可扫码)后回复
2023031901
获取。

三、运行效果

这里演示下两种实现方式的具体效果,并做下对比。

1、测试用例1(lru库实现)

测试代码:

运行效果:

运行时间:15.046 秒

2、测试用例2( OrderedDict实现)

测试代码:

运行效果:

运行时间: 28.934秒

结论:

lru-dict库比较快。

说明:

1)使用OrderedDict实现LRU的优点在于不用安装额外的库;

2)lru-dict是使用c语言开发的第三方库,需要使用pip进行安装,性能比较好,但和平台相关性比较强;

四、资源下载

本文涉及示例代码及whl文件,可从百度网盘获取:

https://pan.baidu.com/s/1N6wWHhMkvXcyVI5mEhn1JA

关注微信公众号(聊聊博文,文末可扫码)后回复
2023031901
获取。






在 Go 中
for
用来循环和迭代,
Go 语言没有
while

do

until
这几个关键字,我们只能使用
for
。这也算是件好事!

让我们来为一个重复字符 5 次的函数编写测试。

先编写测试用例

packageiterationimport "testing"

func TestRepeat(t *testing.T) {
repeated :
= Repeat("a")
expected :
= "aaaaa" if repeated !=expected {
t.Errorf(
"expected '%q' but got '%q'", expected, repeated)
}
}

先使用最少的代码来让测试先跑起来

packageiterationfunc Repeat(character string) string{return ""}

把代码补充完整,使得它能够通过测试

func Repeat(character string) string{var repeated string
    for i := 0; i < 5; i++{
repeated
= repeated +character
}
returnrepeated
}

就像大多数类 C 的语言一样,
for
语法很不起眼。

与其它语言如 C,Java 或 JavaScript 不同,在 Go 中
for
语句前导条件部分并没有圆括号,而且大括号 { } 是必须的。你可能会好奇下面这行

    var repeated string







我们目前都是使用
:=

来声明和初始化变量。然后
:=

只是简写(简短模式定义看这里

一、限流

想要在
Ocelot
中设置限流,需要在
设置如下绿色所示:

{"GlobalConfiguration": {"RateLimitOptions": {
"DisableRateLimitHeaders": false,
"QuotaExceededMessage": "Customize Tips!",
"HttpStatusCode": 999,
"ClientIdHeader": "Test"
}
},
"Routes": [
{
"DownstreamPathTemplate": "/{everything}","DownstreamScheme": "https","DownstreamHostAndPorts": [
{
"Host": "localhost","Port": 5001}
],
"UpstreamPathTemplate": "/api/{everything}","UpstreamHttpMethod": [ "Get", "Post"],"RateLimitOptions": {
"ClientWhitelist": [],
"EnableRateLimiting": true,
"Period": "1s",
"PeriodTimespan": 1,
"Limit": 1
}
}
]
}


Route
中设置的参数解释如下:

ClientWhitelist
:白名单,白名单内的客户端请求不会被限流。

Period
:限流时间,如1s、5m、1h、1d等。如果在此时间段内发出的请求超过限制,则需要等待
PeriodTimespan
结束后再发出另一个请求。

PeriodTimespan
:触发限流后多久后可以再次发起请求。

Limit
:限流时间内限制的请求次数。


GlobalConfiguration
中设置的参数解释如下:

DisableRateLimitHeaders
:是否禁用X-Rate-Limit和Retry-After标头。

QuotaExceededMessage
:限流返回的错误提示。

HttpStatusCode
:设置限流发生后返回的HTTP状态码。

ClientIdHeader
:指定应用于标识客户端的标头,默认是
ClientId

然后看下效果:

二、熔断


Ocelot
中熔断采用了
Plloy
来实现,我们需要先安装:

Install-Package Ocelot.Provider.Polly

需要注入方法

builder.Services.AddOcelot().AddPolly();

然后在每个路由配置中添加以下配置,表示出现超过3次异常将会熔断1秒,请求超过5秒算超时。

"QoSOptions": {"ExceptionsAllowedBeforeBreaking":3,"DurationOfBreak":1000,"TimeoutValue":5000}

ExceptionsAllowedBeforeBreaking
必须大于0才能执行此规则,表示允许异常的次数。
DurationOfBreak
表示熔断时长,单位为毫秒。
TimeoutValue
表示超时时间,超过多久算超时,单位毫秒。

我们测试一下从下面的动图可以看到,我们三次超时后,熔断了的请求直接返回了503。

三、负载均衡

Ocelot
可以在每个路由的可用下游服务之间进行负载均衡,负载平衡有以下几种类型:

LeastConnection
:追踪那些服务正在请求中,并且将最新的请求发送给当前请求最少的服务。

RoundRobin
:轮询可用的服务并且发送请求,需要配合Consul才能对下线服务进行检测,否则还是直接报错的。

NoLoadBalancer
:从配置或者服务中发现第一个可用的服务发送请求。

CookieStickySessions
:使用cookie关联所有相关的请求到制定的服务,下面会细说。

使用负载均衡就是在一条路由中设置多个下游服务,然后选择一个负载均衡类型,如下:

{"DownstreamPathTemplate": "/api/posts/{postId}","DownstreamScheme": "https","DownstreamHostAndPorts": [
{
"Host": "10.0.1.10","Port": 5000,
},
{
"Host": "10.0.1.11","Port": 5000,
}
],
"UpstreamPathTemplate": "/posts/{postId}","LoadBalancerOptions": {"Type": "LeastConnection"},"UpstreamHttpMethod": [ "Put", "Delete"]
}

1、服务发现中使用负载均衡

如下设置完服务名后,会自动查找下游的主机和端口,并在任何可用的服务之间进行负载均衡请求。如果从
Consul
中添加和删除服务,那么
Ocelot
会停止调用删除的服务,并且调用添加的服务。

{"DownstreamPathTemplate": "/api/posts/{postId}","DownstreamScheme": "https","UpstreamPathTemplate": "/posts/{postId}","UpstreamHttpMethod": [ "Put"],"ServiceName": "product","LoadBalancerOptions": {"Type": "LeastConnection"},
}

2、CookieStickySessions

CookieStickySessions
类型的负载均衡就是为了实现在有很多下游服务的时候共享会话状态,只需要如下设置就行。

LoadBalancerOptions
就是需要设置为
CookieStickySessions

Key
是用于实现此功能的
cookie
的关键字,
Expiry
是希望执行会话的时间,以毫秒计算,每次请求都将刷新时间。

{"DownstreamPathTemplate": "/api/posts/{postId}","DownstreamScheme": "https","DownstreamHostAndPorts": [
{
"Host": "10.0.1.10","Port": 5000,
},
{
"Host": "10.0.1.11","Port": 5000,
}
],
"UpstreamPathTemplate": "/posts/{postId}","LoadBalancerOptions": {"Type": "CookieStickySessions","Key": "ASP.NET_SessionId","Expiry": 1800000},"UpstreamHttpMethod": [ "Put", "Delete"]
}

如果有多个主机服务或者使用了
Consul
,那么
CookieStickySessions
会使用轮询的方式来选择下个服务器,目前是硬编码的,但是可以改变。

3、自定义负载均衡策略

当我们启用自定义的负载均衡时,
Ocelot
会根据负载均衡器的名称去查找,如果找到了就会使用。如果负载均衡的类型没有与注册的负载均衡类的名称匹配,那么将会返回
500
错误。没有设置负载均衡策略将是不会进行负载均衡的。

我们创建一个自定义负载均衡策略类MyCustomLoadBalancer,继承
ILoadBalancer接口实现对应的接口

public classMyCustomLoadBalancer: ILoadBalancer
{
private readonly Func<Task<List<Service>>>_services;private readonly object _lock = new object();private int_last;public MyCustomLoadBalancer(Func<Task<List<Service>>>services)
{
_services
=services;
}
public async Task<Response<ServiceHostAndPort>>Lease(HttpContext httpContext)
{
var services = await_services();lock(_lock)
{
if (_last >=services.Count)
{
_last
= 0;
}
var next =services[_last];
_last
++;return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
}
}
public voidRelease(ServiceHostAndPort hostAndPort)
{
}
}

然后在注入该策略

s.AddOcelot().AddCustomLoadBalancer<MyCustomLoadBalancer>();

最后在配置文件中配置路由的策略类型名为我们定以的策略类的名称即可

//...
"LoadBalancerOptions": {"Type": "MyCustomLoadBalancer"}//...