etcd详解
etcd概述
etcd的特点
etcd是一个Go言编写的分布式、高可用的一致性键值存储系统,用于提供可靠的分布式键值存储、配置共享和服务发现等功能
etcd具有以下特点:
简单:
易使用:
etcdv2基于HTTP+JSON的API用curl就可以轻松使用(v2)
etcdv3的客户端使用gRPC与server进行通信,通信的消息协议使用protobuf进行约定,代替了v2版本的HTTP+JSON格式(v3)
易部署:使用Go语言编写,跨平台,部署和维护简单。
可靠:
强一致:使用Raft算法充分保证了分布式系统数据的强一致性;
高可用:具有容错能力,假设集群有n个节点,当有(n-1)/2节点发送故障,依然能提供服务;
持久化:数据更新后,会通过WAL格式数据持久化到磁盘,支持Snapshot快照。
快速:每个实例每秒支持一千次写操作,极限写性能可达10K QPS。
安全:可选SSL客户认证机制。
v3版本使用HTTP/2协议,同一个连接可以同时处理多个请求,摒弃多个请求需要建立多个连接的方式。
etcd的应用场景
服务发现(service discovery)
消息发布与订阅
负载均衡
分布式通知与协调
分布式锁、分布式队列
集群监控与Leader竞选
etcd架构
etcd主要分为四个部分:
HTTP Server:用于处理用户发送的API请求以及其它etcd节点的同步与心跳信息请求。
Store:用于处理etcd支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是etcd对用户提供的大多数API功能的具体实现。
Raft:Raft强一致性算法的具体实现,是etcd的核心。
WAL:Write Ahead Log(预写式日志),是etcd的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd就通过WAL进行持久化存储。WAL中,所有的数据提交前都会事先记录日志。Snapshot是为了防止数据过多而进行的状态快照;Entry表示存储的具体日志内容。
Raft协议
Raft的基本概念
每个etcd节点都维护了一个状态机,并且任何时刻最多存在一个有效的主节点,主节点处理所有来自客户端的写操作,通过Raft协议保证写操作对状态机的改动会可靠的同步到其他节点。
Raft协议一共包含如下3类角色:
Leader(领袖):领袖由群众投票选举得出,每次选举,只能选出一名领袖;
Candidate(候选人):当没有领袖时,某些群众可以成为候选人,然后去竞争领袖的位置;
Follower(群众):没有竞选资格
在进行选举过程中,有几个重要的概念:
Leader Election(领导人选举):简称选举,就是从候选人中选出领袖;
Term(任期):它其实是个单独递增的连续数字,每一次任期就会重新发起一次领导人选举;
Election Timeout(选举超时):就是一个超时时间,当群众超时未收到领袖的心跳时,会重新进行选举。
选主
角色转换
群众 -> 候选人:当开始选举,或者“选举超时”时
候选人 -> 候选人:当“选举超时”,或者开始新的“任期”
候选人 -> 领袖:获取大多数投票时
候选人 -> 群众:其它节点成为领袖,或者开始新的“任期”
领袖 -> 群众:发现自己的任期ID比其它节点分任期ID小时,会自动放弃领袖位置
选举的特点
Raft协议是用于维护一组服务节点数据一致性的协议。一组服务节点构成一个集群,并且有一个节点来对外提供服务。当集群初始化,或者主节点挂掉以后,面临选主的问题。集群中每个节点,任意时刻都处于Leader、Follower、Candidate这三个角色之一。
当集群初始化时,每个节点都是Follower角色
集群中存在至多1个有效的主节点,通过心跳与其他节点同步数据
当Follower在一定时间内没有收到来自主节点的心跳,会将自己的角色改变为Candidate,并发起一次选主投票
当收到包括自己在内超过半数节点赞成后,选举成功
当收到票数不足半数选举失败,或者选举超时
若本轮未选出主节点,将进行下一轮选举(出现这种情况,是由于多个节点同时选举,所有节点均为获得超过半数选票)
Candidate节点收到来自主节点的信息后,会立即终止选举过程,进入Follower角色。
为了避免陷入选主失败循环,每个节点未收到心跳发起选举的时间是一定范围内的随机值,这样能够避免2个节点同时发起选主。
选举流程示例
1)选举人选举
现在有A、B、C三个节点
成为候选人:
每个节点都有自己的“超时时间”,且它是随机的,区间值为150~300ms(所以出现相同随机时间的概率比较小)
假设节点 B 最先超时,这时它就成为候选人。
选举领导人:
候选人 B 开始发起投票,群众 A 和 C 返回投票,当候选人 B 获取大部分选票后,选举成功,候选人 B 成为领袖。
心跳探测:
为了时刻宣誓自己的领导人地位,领袖 B 需要时刻向群众发起心跳
当群众 A 和 C 收到领袖B的心跳后,群众 A 和 C 的“超时时间”会重置为0,然后重新计数,依次反复。
注意:领袖广播心跳的周期必须要短于“选举定时器”的超时时间,否则群众会频繁成为候选者,也就会出现频繁发生选举,切换Leader的情况。
2)领袖挂掉
当领袖 B 挂掉,群众 A 和 C 的“选举定时器”会一直运行,当群众 A 先超时时,会成为候选人。
后续流程和“领导人选举”流程一样,即通知投票 -> 接收投票 -> 成为领袖 -> 心跳探测。
3)出现多个候选者情况
当出现多个候选者 A 和 D 时,两个候选者会同时发起投票:
如果票数不同,最先得到大部分投票的节点会成为领袖。
如果获取的票数相同,会重新发起新一轮的投票。
当 C 成为新的候选者,此时的任期Term为5,发起新一轮的投票,其它节点发起投票后,会更新自己的任期值,最后选择新的领袖为C节点。
脑裂情况
当网络问题导致脑裂,出现双Leader情况时,每个网络可以理解为一个独立的网络,因为原先的Leader独自在一个区,所以向他提交的数据不可能被复制到大多数节点上,所以数据永远都不会提交
当网络恢复之后,旧的Leader发现集群中的新Leader的Term比自己大,则自动降级为Follower,并从新Leader处同步数据达成集群数据一致
脑裂情况其实只是异常情况的一种,当Leader通知Follower更新日志、Leader提交更新时,都存在各种异常情况导致的问题
日志复制
日志复制,是指主节点将每次操作形成日志条目,并持久化到本地磁盘,然后通过网络IO发送给其他节点。其他节点根据日志的 逻辑时钟(TERM) 和 日志编号(INDEX) 来判断是否将该日志记录持久化到本地。
当主节点收到包括自己在内超过半数节点成功返回,那么认为该日志是可提交的(committed),并将日志输入到状态机,将结果返回给客户端。
每次选主都会形成一个唯一的TERM编号,相当于逻辑时钟。每一条日志都有全局唯一的编号。
主节点通过网络IO向其他节点追加日志:
若某节点收到日志追加的消息,首先判断该日志的TERM是否过期,以及该日志条目的INDEX是否比当前以及提交的日志的INDEX更早
若已过期,或者比提交的日志更早,那么就拒绝追加,并返回该节点当前的已提交的日志的编号。否则,将日志追加,并返回成功
当主节点收到其他节点关于日志追加的回复后,若发现有拒绝,则根据该节点返回的已提交日志编号,发生其编号下一条日志。
主节点像其他节点同步日志,还作了拥塞控制:主节点发现日志复制的目标节点拒绝了某次日志追加消息,将进入日志探测阶段,一条一条发送日志,直到目标节点接受日志,然后进入快速复制阶段,可进行批量日志追加。
按照日志复制的逻辑,集群中慢节点不影响整个集群的性能。且数据只从主节点复制到Follower节点,这样大大简化了逻辑流程。
安全性
截止此刻,选主以及日志复制并不能保证节点间数据一致。当一个某个节点挂掉了,一段时间后再次重启,并当选为主节点。而在其挂掉这段时间内,集群若有超过半数节点存活,集群会正常工作,那么会有日志提交。这些提交的日志无法传递给挂掉的节点。当挂掉的节点再次当选主节点,它将缺失部分已提交的日志。在这样场景下,按Raft协议,它将自己日志复制给其他节点,会将集群已经提交的日志给覆盖掉。这显然是不可接受的。
其他协议解决这个问题的办法是,新当选的主节点会询问其他节点,和自己数据对比,确定出集群已提交数据,然后将缺失的数据同步过来。这个方案有明显缺陷,增加了集群恢复服务的时间(集群在选举阶段不可服务),并且增加了协议的复杂度。
Raft解决的办法是,在选主逻辑中,对能够成为主的节点加以限制,确保选出的节点已定包含了集群已经提交的所有日志。如果新选出的主节点已经包含了集群所有提交的日志,那就不需要从和其他节点比对数据了。简化了流程,缩短了集群恢复服务的时间。
这里存在一个问题,加以这样限制之后,还能否选出主呢?答案是:只要仍然有超过半数节点存活,这样的主一定能够选出。因为已经提交的日志必然被集群中超过半数节点持久化,显然前一个主节点提交的最后一条日志也被集群中大部分节点持久化。当主节点挂掉后,集群中仍有大部分节点存活,那这存活的节点中一定存在一个节点包含了已经提交的日志了。
etcd集群部署
etcd安装
环境信息
node | IP | OS | etcd版本 |
---|---|---|---|
etcd0 | 10.0.0.80 | CentOS7.9 | etcd-v3.5.4 |
etcd1 | 10.0.0.81 | CentOS7.9 | etcd-v3.5.4 |
etcd2 | 10.0.0.82 | CentOS7.9 | etcd-v3.5.4 |
etcd二进制安装
etcd下载:
https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz
wget -c https://github.com/etcd-io/etcd/releases/download/v3.5.4/etcd-v3.5.4-linux-amd64.tar.gz
tar zxvf etcd-v3.5.4-linux-amd64.tar.gz -C /usr/local/
ln -s /usr/local/etcd-v3.5.4-linux-amd64/ /usr/local/etcd
sed -i '$aPATH="/usr/local/etcd/:$PATH"' /etc/profile
source /etc/profile
mkdir -pv /data/etcd/{log,data} # data为节点数据存储目录,log为日志目录
etcd集群初始化
在每台etcd节点上进行集群初始化
# 在每台etcd节点上进行集群初始化
# 注意:在etcd2和etcd3上操作时注意修改--name并且将对应的IP地址改成对应节点的IP
etcd参数说明
–name
:节点名称--data-dir
:指定节点的数据存储目录;若不指定,则默认是当前目录;这些数据包括节点ID,集群ID,集群初始化配置,Snapshot文件,若未指 定–wal-dir,还会存储WAL文件。–wal-dir
:指定节点的was文件存储目录,若指定了该参数,wal文件会和其他数据文件分开存储–advertise-client-urls
:告知客户端的URL, 也就是服务的URL,
tcp2379端口用于监听客户端请求–initial-advertise-peer-urls
:建议用于节点之间通信的url,节点间将以该值进行通信。告知集群其他节点的URL,
tcp2380端口用于集群通信–listen-peer-urls
:监听的用于节点之间通信的url,可监听多个,集群内部将通过这些url进行数据交互(如选举,数据同步等)–initial-cluster-token
:集群的ID–initial-cluster
:集群中所有节点–initial-cluster-state
:集群状态,
new为新创建集群,existing为已存在的集群
检查etcd集群状态
# 检查集群状态
[root@etcd0 ~]# etcdctl member list --write-out=table --endpoints=http://10.0.0.80:2379
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
| ID | STATUS | NAME | PEER ADDRS | CLIENT ADDRS | IS LEARNER |
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
| 31eb17a19c60d188 | started | etcd0 | http://10.0.0.80:2380 | http://10.0.0.80:2379,http://10.0.0.80:4001 | false |
| d23e97e2884bcaba | started | etcd1 | http://10.0.0.81:2380 | http://10.0.0.81:2379,http://10.0.0.81:4001 | false |
| f0a7d39c34766dd7 | started | etcd2 | http://10.0.0.82:2380 | http://10.0.0.82:2379,http://10.0.0.82:4001 | false |
+------------------+---------+-------+-----------------------+---------------------------------------------+------------+
# 检查节点健康状态
[root@etcd0 ~]# etcdctl endpoint health --cluster --endpoints="http://10.0.0.80:2379"
http://10.0.0.80:4001 is healthy: successfully committed proposal: took = 5.78219ms
http://10.0.0.82:2379 is healthy: successfully committed proposal: took = 7.975032ms
http://10.0.0.81:2379 is healthy: successfully committed proposal: took = 6.516737ms
http://10.0.0.80:2379 is healthy: successfully committed proposal: took = 7.419094ms
http://10.0.0.82:4001 is healthy: successfully committed proposal: took = 5.000158ms
http://10.0.0.81:4001 is healthy: successfully committed proposal: took = 8.251767ms
# 检查集群详细状态
[root@etcd0 ~]# etcdctl endpoint status --endpoints="http://10.0.0.80:2379,http://10.0.0.81:2379,http://10.0.0.82:2379" --write-out=table
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| http://10.0.0.80:2379 | 31eb17a19c60d188 | 3.5.4 | 20 kB | true | false | 2 | 23 | 23 | |
| http://10.0.0.81:2379 | d23e97e2884bcaba | 3.5.4 | 20 kB | false | false | 2 | 23 | 23 | |
| http://10.0.0.82:2379 | f0a7d39c34766dd7 | 3.5.4 | 20 kB | false | false | 2 | 23 | 23 | |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
etcd节点迁移
当遇到硬件故障发生的时候,需要快速恢复节点。ETCD集群可以做到在不丢失数据的,并且不改变节点ID的情况下,迁移节点:
停止待迁移节点上的etc进程;
将数据目录打包复制到新的节点;
更新该节点对应集群中peer url,让其指向新的节点;
使用相同的配置,在新的节点上启动etcd进程;
etcd操作
写入数据
# 使用put写入键值对
[root@etcd0 ~]# etcdctl put mytest "hello" --endpoints=http://10.0.0.80:2379
OK
[root@etcd0 ~]# etcdctl put mytest1 "hello 001" --endpoints=http://10.0.0.80:2379
OK
[root@etcd0 ~]# etcdctl put mytest2 "hello 002" --endpoints=http://10.0.0.80:2379
OK
获取数据
# 使用get来通过具体的键获取数据
[root@etcd0 ~]# etcdctl get mytest --endpoints=http://10.0.0.80:2379
mytest
hello
# 通过前缀来获取数据
[root@etcd0 ~]# etcdctl get mytest --prefix --endpoints=http://10.0.0.80:2379
mytest
hello
mytest1
hello 001
mytest2
hello 002
删除数据
[root@etcd0 ~]# etcdctl del mytest --endpoints=http://10.0.0.80:2379
1 # 删除了一条数据
[root@etcd0 ~]# etcdctl del mytest --prefix --endpoints=http://10.0.0.80:2379
2 # 删除了两条数据
监听数据
watch用于获取监听信息的更改,其可以持续监听
# 先开一个窗口监听
[root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
# 然后再开一个窗口写入
[root@redis01 ~]# etcdctl put mytest 001 --endpoints=http://10.0.0.80:2379
OK
# 先前的窗口会受到更新信息,并且会持续监听
[root@redis01 ~]# etcdctl watch mytest --endpoints=http://10.0.0.80:2379
PUT
mytest
001
go语言操作etcd
连接etcd
连接k8s中的etcd
# 操作k8s中的etcd需要指定对应的ca,以及对应的证书和私钥,这里使用了apiserver的证书和私钥来连接etcd
etcdctl --endpoints=https://10.0.0.10:2379 --cacert="/etc/kubernetes/pki/etcd/ca.crt" --cert="/etc/kubernetes/pki/apiserver-etcd-client.crt" --key="/etc/kubernetes/pki/apiserver-etcd-client.key"
go语言设置TLS调用etcd:
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"go.etcd.io/etcd/clientv3"
"io/ioutil"
)
func main() {
// 设置客户端证书和私钥
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
fmt.Println("Error loading client key pair:", err)
return
}
// 加载根证书
caCert, err := ioutil.ReadFile("ca.crt")
if err != nil {
fmt.Println("Error reading CA certificate:", err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// 创建 TLS 配置
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
// 创建 etcd 客户端连接
endpoint := "localhost:2379"
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{endpoint},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
fmt.Println("Error creating etcd client:", err)
return
}
// 在连接上设置上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 执行 etcd 操作
resp, err := client.Put(ctx, "example_key", "example_value")
if err != nil {
fmt.Println("Error putting value:", err)
return
}
fmt.Println(resp)
}
PUT和GET操作
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"
"go.etcd.io/etcd/clientv3"
)
// use etcd/clientv3
func main() {
// 连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"10.0.0.80:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put操作
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"./my.log","topic":"web_log"},{"path":"./xxx.log","topic":"xxx_conf"}]`
_, err = cli.Put(ctx, "/logagent/192.168.10.2/collect_config", value)
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get操作
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "/logagent/192.168.10.2/collect_config")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
watch操作
watch操作示例:
// watch传入的key是否有变化,如果有变化,将其获取后创建成配置对象发往taillog的新配置通道
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key) // watch对应的key,第一个参数需要传入一个context对象
for wresp := range ch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// key有变换,进行通知,先判断操作的类型
var newConf []*LogEntry
if ev.Type != clientv3.EventTypeDelete {
// 如果不是删除操作, 则创建一个newConf对象传入taillog的新配置通道
err := json.Unmarshal(ev.Kv.Value, &newConf)
if err != nil {
fmt.Println("json unmarshal failed , err: ", err)
continue
}
}
fmt.Printf("get new conf: %v\n", newConf)
newConfCh <- newConf
}
}
}