2024年2月

一、什么是观察者模式

观察者模式
又叫做
发布-订阅模式
或者
源-监视器模式

结合它的各种别名大概就可以明白这种模式是做什么的。

其实就是观察与被观察,一个对象(被观察者)的状态改变会被通知到观察者,并根据通知产生各自的不同的行为。

以下为《设计模式的艺术》中给出的定义:

观察者模式(Observer Pattern):
定义对象之间的一种
一对多
依赖关系,使得每当一个对象
状态发生改变
时,其相关依赖对象皆
得到通知并被自动更新

二、观察者模式的4个角色

Subject(目标)

Subject是被观察的对象。

Subject可以是接口、抽象类或者具体类。

它一般有
4个要素

• 一个观察者集合,一般用Vector。

• 增加观察者的方法。

• 删除观察者的方法。

• 通知方法notify()。

ConcreteSubject(具体目标)

是Subject的子类,没什么特殊的,如果有抽象方法需要实现就实现,没有的话这个类不写也行。

Observer(观察者)

一般是个接口,声明一个update()方法。

ConcreteObserver(具体观察者)

刚开始学的话先会最简单的形式就可以了,就直接实现Observer接口,实现update()方法就完了。

三、观察者模式2个代码实现案例

通过2个案例,基本就可以知道观察者模式是怎么回事了,对照观察者模式的4个角色,敲一遍。

单纯熟悉4个角色的案例

先写一个被观察者Subject

public abstract class Subject {
    // 存放观察者的集合
    private Vector<Observer> obs = new Vector<>();

    public void addObserver(Observer obs) {
        this.obs.add(obs);
    }
    public void delObserver(Observer obs) {
        this.obs.remove(obs);
    }

    // 通知方法,写成抽象方法让ConcreteSubject实现也一样的
    protected void notifyObserver() {
        for (Observer ob : obs) {
            ob.update();
        }
    }
    // 这里也是可写可不写,根据业务需求
    public abstract void doSomething();

}

再写一个ConcreteSubject

public class ConcreteSubject extends Subject {
    @Override
    public void doSomething() {
        System.out.println("被观察者事件发生改变");
        this.notifyObserver();
    }
}

观察者接口Observer

public interface Observer {
    public void update();
}

观察者实现类ConcreteObserver,我们这里给出2个观察者

public class ConcreteObserver01 implements Observer {
    @Override
    public void update() {
        System.out.println("观察者01收到状态变化信息,并进行处理...");
    }
}
public class ConcreteObserver02 implements Observer {
    @Override
    public void update() {
        System.out.println("观察者02收到状态变化信息,并进行处理...");
    }
}

最后给出一个测试类,测试运行一个看看效果,看不明白代码就debug一下捋一捋。

以下代码的意思就是:

你得有东西被观察被监视吧?

所以先创建一个被观察者,比如我的账户余额。

你得设置哪些对象在观察、监视我的账户吧?

那就添加2个,我和我老婆。

然后就坐等账户余额有变化。

一旦发工资,状态变化!!!

dosomething()!

notifyObserver()!

遍历观察者列表!

update()!

这个时候我和我老婆分别对应各自实现的update()方法。

我马上去买了游戏。

我老婆马上去买了化妆品。

(不过这个例子好像不太合适,因为update方法里又会导致账户余额的变化,循环起来了,不过大概明白咋回事就行了。)

    public class Client {
        public static void main(String[] args) {
            ConcreteSubject subject = new ConcreteSubject();
            subject.addObserver(new ConcreteObserver01());
            subject.addObserver(new ConcreteObserver02());
            subject.doSomething();
        }
    }

盟友受攻击发通知,其他盟友做出响应

这个例子也是《设计模式的艺术》给出的案例。

我们的需求如下:

联盟成员收到攻击→发送通知给盟友→盟友做出响应。

如果按照上述思路设计,则每个成员必须持有其他所有成员的状态信息,导致系统开销过大。所以我们引入一个
战队控制中心
来统一维护所有战队成员信息。

依然是我们四步走:

先创建一个Subject被观察者,这里是AllyControlCenter控制中心

    public abstract class AllyControlCenter {

        /**
         * 战队名称
         */
        protected String allyName;
        /**
         * 定义一个集合,用来存储具体观察者,也就是战队成员
         */
        protected Vector<Observer> players = new Vector<Observer>();

        /**
         * 加入战队
         */
        public void join(Observer observer) {
            System.out.println(observer.getName() + "加入" + this.allyName + "战队!");
            players.add(observer);
        }

        /**
         * 退出战队
         */
        public void quit(Observer observer) {
            System.out.println(observer.getName() + "退出" + this.allyName + "战队!");
            players.remove(observer);
        }

        /**
         * 声明抽象通知方法
         * @param name
         */
        public abstract void notifyObserver(String name);

        /**
         * 设置成员变量方法
         * @param allyName
         */
        public void setAllyName(String allyName) {
            this.allyName = allyName;
        }

        /**
         * 获取成员变量方法
         * @return
         */
        public String getAllyName() {
            return this.allyName;
        }
    }

再创建一个ConcreteSubject具体被观察者ConcreteAllyControlCenter

    public class ConcreteAllayControlCenter extends AllyControlCenter {
        public ConcreteAllayControlCenter(String name) {
            System.out.println(name + "战队组建成功!");
            System.out.println("-------------");
            this.allyName = name;
        }

        /**
         * 实现通知方法
         * @param name
         */
        @Override
        public void notifyObserver(String name) {
            System.out.println(this.allyName + "战队紧急通知,盟友" + name + "遭受敌人攻击");
            // 遍历观察者集合,调用每一个盟友(除了自己)的支援方法
            for (Observer obs : players) {
                if (!obs.getName().equalsIgnoreCase(name)) {
                    obs.help();
                }
            }
        }
    }

创建一个抽象观察者Observer

    public interface Observer {
        public String getName();

        public void setName(String name);

        /**
         * 声明支援盟友的方法
         */
        public void help();

        /**
         * 声明遭受攻击的方法
         */
        public void beAttacked(AllyControlCenter acc);
    }

在创建具体观察者Player

public class Player implements Observer {
    private String name;

    public Player(String name) {
        this.name = name;
    }
    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public void setName(String name) {
        this.name = name;
    }

    /**
     * 支援盟友的方法实现
     */
    @Override
    public void help() {
        System.out.println("坚持住,"+this.name +"来救你了!");
    }

    /**
     * 遭受攻击的方法实现
     * 当遭受攻击时,调用战队控制中心类的通知方法notifyObserver()来通知盟友
     * @param acc
     */
    @Override
    public void beAttacked(AllyControlCenter acc) {
        System.out.println(this.name + "被攻击!");
        acc.notifyObserver(name);
    }
}

测试运行

public class Client {
    public static void main(String[] args) {
        // 定义被观察者
        AllyControlCenter acc = new ConcreteAllayControlCenter("金庸群侠");

        // 定义4个观察者
        Observer player1 = new Player("杨过");
        Observer player2 = new Player("令狐冲");
        Observer player3 = new Player("张无忌");
        Observer player4 = new Player("段誉");
        acc.join(player1);
        acc.join(player2);
        acc.join(player3);
        acc.join(player4);

        // 某成员遭受攻击
        player1.beAttacked(acc);

    }
}

四、我什么时候用观察者模式?

  1. 事件处理系统
    :例如,用户界面框架中,当用户进行某些操作(如点击按钮、移动鼠标等)时,可以使用观察者模式来通知相关的处理程序。

  2. 数据订阅与发布系统
    :在需要向多个客户端发布数据更新的场景中,例如股票行情显示、新闻更新等,可以使用观察者模式。

  3. 跨系统的消息交换
    :例如,在微服务架构中,服务间的事件可以通过观察者模式进行通信,确保各服务间的解耦。

  4. 状态监控和警报系统
    :在需要监控某些状态并在特定条件下发送警报的系统中,观察者模式可以用来实现监控对象和警报系统之间的通信。

  5. 配置管理
    :当系统配置信息发生变更时,使用观察者模式可以实时通知各个使用配置的组件进行相应的调整。


往期推荐:


师爷,翻译翻译什么叫AOP


翻译,师爷师爷什么叫事务


纪念JDBC


SpringBoot实现动态数据源配置


聚簇索引、回表与覆盖索引


Java锁到底是个什么东西

大家即使没用过Redis,也应该都听说过Redis的威名。

Redis是一种Nosql类型的数据存储,全称Remote Dictionary Server,也就是远程字典服务器,用过Dictionary的应该都知道它是一种键值对(Key-Value)的数据结构,所以Redis也称为KV存储。

Redis的用途十分广泛,包括帮助网页快速加载,管理登录状态,更新社交动态、游戏积分排名、电商抢购秒杀,等等,有点规模的应用后边都有它的身影。

Redis之所以这么流行,首先是因为它的处理速度特别快,它主要在内存中处理数据;其次它提供了多种数据结构,使用起来比较方便,而且这些数据结构的操作时间复杂度都很优秀;最后Redis会将数据保存到磁盘中,提供一定的持久性。

但是很多同学常常对Redis的数据安全有所担忧,大家经常问:Redis能保证数据不丢失吗?怎么做到的?本文会简单说说Redis的数据保护机制,它的好处和局限,以及我们应该怎样设置、有哪些高级技巧。

Redis面临的数据丢失风险

Redis中的数据是有可能丢失的。

首先,咱们搞清楚一个概念——数据持久性。简单来说,数据持久性就是确保你的数据在遇到各种意外情况时,比如断电、系统崩溃等,之后还能安然无恙的存在。就像是手机,即使没电了,充上电之后,里面的照片和信息都还在,没有丢失。

那么,Redis面临的数据丢失风险有哪些呢?

上文说到Redis主要在内存中操作数据,内存是一种临时存储,一旦断电(或者硬件故障、软件错误等),内存中的数据就会烟消云散。有的同学会说,数据不是会保存到硬盘吗?是的,但是还是可能会有一些数据来不及写入硬盘,这是Redis的持久化机制导致的,下边会进行详细说明。

而且,即使Redis将全部数据都及时保存到了硬盘,硬盘出现问题也可能会导致Redis的数据丢失。

另外有的同学会说,我只是在Redis中缓存数据,所有的数据在数据库中都有完整的记录。这个问题虽然有点超纲,但是这里还是简单交代下。这种情况下如果要恢复的数据量比较大,从数据库恢复数据的时间会比较长,这会延长故障的恢复时间。而且如果系统访问量比较大,还可能导致缓存穿透的问题,击垮数据库。

所以,尽管Redis给我们的应用带来了极速体验,但是如果不采取措施,数据丢失的风险是实实在在的。下面,我们将探讨Redis如何通过各种持久化策略来应对这些风险,尽量保证数据的安全。

基础策略

保证数据不丢失的基础策略就是使用Redis自带的持久化机制,Redis提供了两种主要的数据持久化方法:RDB(快照)和AOF(追加文件)。这两种方法各有千秋,让我们来详细了解一下。

RDB机制

RDB持久化是通过创建数据集的快照来工作的,在指定的时间间隔内,Redis会自动将内存中的数据集写入硬盘的一个文件(通常是dump.rdb)。这就像是给数据拍了一张快照,当需要的时候可以随时从这个快照恢复。

优点:

  • 性能高
    :快照生成时,用到了写时拷贝技术,此时Redis主进程只负责写入数据,实际保存工作由子进程完成,因此对性能影响较小。
  • 恢复快
    :与AOF相比,使用RDB文件恢复数据通常更快。

缺点

  • 数据可能丢失
    :如果Redis异常停止,那么最后一次快照之后的所有数据更改都会丢失。
  • 大数据集恢复时间长
    :虽然比AOF快,但是如果数据集非常大,恢复过程仍然可能需要较长时间。

AOF机制

AOF持久化通过记录每个写操作到一个日志文件中,实现数据的持久化。这就像是把每次数据变动都先记录下来,然后再更新到内存中,需要恢复时,按照这个操作日志一步步来就行了。

需要注意AOF记录也很难做到每个写操作都先持久化到硬盘中,这是因为硬盘的读写速度一般都很慢,比内存操作低几个数量级,如果每次都先写到硬盘,Redis也做不到目前的低延迟高并发。所以写操作一般都是先缓存一段时间,然后再批量flush到硬盘。

优点:

  • 数据安全性高
    :AOF持久化可以配置不同的同步频率,例如每秒同步,这样可以在保证性能的同时,减少数据丢失的风险。
  • 可读的日志文件
    :AOF文件是一个纯文本文件,可以被人读懂,便于理解和问题排查。

缺点

  • 文件体积大
    :由于记录了所有写操作,AOF文件的体积通常会大于RDB文件。
  • 恢复速度慢
    :与RDB相比,AOF在恢复大量数据时通常更慢,因为需要重新执行所有操作。

配置建议

RDB配置建议

大部分情况下都建议开启RDB,因为RDB需要的资源相对AOF小很多。如果对数据完整性的要求不高,或者能很快的从其它渠道恢复数据,一般只需要开启RDB就可够了。

合理设置快照间隔

Redis的RDB持久化允许我们配置多个不同的快照条件,以适应不同的数据更新频率和保证数据安全。我们可以在 redis.conf 配置文件中设置多个快照规则。以下是一个示例配置,展示了如何根据数据变化的频繁程度来设置快照的条件:

# 在900秒内如果至少有1个键被改变,则进行一次快照
save 900 1
# 在300秒内如果至少有10个键被改变,则进行一次快照
save 300 10
# 在60秒内如果至少有1000个键被改变,则进行一次快照
save 60 1000

这样的配置意味着:

  • 如果数据变化不是很频繁,我们不需要那么频繁地进行快照保存,以避免不必要的性能开销。
  • 当数据变化变得更加频繁时,我们通过更紧密的快照来减少数据丢失的风险。

动态调整快照规则

除了在配置文件中静态设置快照规则外,Redis还提供了命令让我们可以在运行时动态调整快照规则。使用CONFIG SET命令,我们可以根据应用的当前状态和需求,动态地调整快照条件:

# 动态设置快照规则
redis-cli CONFIG SET save "60 1000 300 10 900 1"

注意事项

  • 性能考量:虽然频繁的快照可以减少数据丢失的风险,但也可能会对性能产生影响,特别是在数据集很大的情况下。因此,需要根据实际情况权衡快照频率和性能。
  • 监控与调整:建议监控Redis的性能指标,并根据实际运行情况调整快照规则。随着业务的发展,可能需要定期回顾和调整这些设置。

AOF配置建议

当数据仅保存在Redis中,或对数据的丢失难以容忍时,建议开启AOF。

考虑到性能和数据安全,建议设置为每秒同步一次。这样既可以保证数据的及时性,又不会对性能影响太大。

以下是一个示例配置:

appendfsync everysec

定期重写AOF

随着时间的推移,AOF文件可能会变得很大,不仅会占用更多的磁盘空间,而且重启后或从故障恢复时处理的会比较慢。缓解这个问题,可以使用Redis提供的定期重写机制。

在AOF重写过程中,Redis会创建一个新的AOF文件,这个新文件仅包含重建当前数据集所需的最小命令集合。例如,如果一系列的INCR命令将某个键的值从0递增到了100,那么在重写后的AOF文件中可能只会记录一条SET命令来直接设置这个键为100,从而大大减小文件。

通过 auto-aof-rewrite-percentage 和 auto-aof-rewrite-min-size 参数可以配置自动重写的条件。

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
  • auto-aof-rewrite-percentage:重写时机:当前AOF文件大小相对于上一次重写后的文件大小的增长百分比。例如,若设置为100,则表示每当AOF文件大小翻倍时,Redis将自动触发AOF重写。
  • auto-aof-rewrite-min-size:即使满足了增长百分比条件,Redis也不会立即进行重写,还需要AOF文件达到一个最小尺寸。只有当文件大小超过这个设定值时,才会真正触发重写。

通过以上配置,在保证Redis性能的同时,数据安全性也有了基础的保证。

高级策略

从对基础策略的分析中我们了解到即使采用AOF日志,因为写日志的延迟,数据仍存在丢失的可能性。而且即使数据都写入到了硬盘,也无法处理单机硬盘故障导致数据丢失的问题。

这一小节就让我们来看下处理这个问题的一些高级策略,包括主从架构、哨兵系统和集群架构。这些策略可以提高数据的安全性和可用性。

主从架构实现多副本保存

在Redis的主从架构中,数据会从一个主节点复制到一个或多个从节点。这样做的好处是,即使主节点出现问题,我们也可以从从节点中恢复数据,而且从节点可以继续提供查询服务。

工作原理
:主节点负责处理所有的写操作,并将这些操作记录同步到从节点。从节点则可以处理读请求,分担主节点的读负载。

优点

  • 数据冗余
    :通过在多个从节点上保存数据副本,提高了数据的可靠性。
  • 读负载均衡
    :从节点可以处理读请求,帮助分担主节点的读负载。

配置示例:

# 从节点配置 
slaveof <masterip> <masterport>

主节点无需特别配置,只需正常启动。从节点的配置文件中增加slaveof配置,masterip、masterport是主节点的IP和端口。

哨兵系统实现故障转移

哨兵系统(Sentinel)是一种用于监控Redis主从节点状态的系统,能够在主节点故障时自动进行故障转移。

工作原理
:哨兵通过发送命令,检查主从节点的健康状态。如果主节点不可达,哨兵会自动将其中一个从节点提升为新的主节点,并更新其他从节点以指向新的主节点。

优点:

  • 自动故障转移
    :提高了系统的可用性,当主节点出现故障时,能够快速恢复。
  • 监控
    :哨兵还负责监控Redis节点的运行状态,提供了一定程度的自动管理。

配置示例:

# 哨兵配置文件 sentinel.conf
sentinel monitor mymaster <masterip> <masterport> 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
  • sentinel monitor mymaster:这条命令让哨兵监控一个名为 mymaster 的主节点,其IP和端口分别为 masterip 和 masterport 。数字2表示当至少有两个哨兵认为主节点不可达时,才会进行故障转移。这是为了避免因为网络闪断导致的误判。这也告诉我们如果需要更高的可用性,哨兵进程也要部署多个,一般3个或5个就够了。
  • sentinel down-after-milliseconds mymaster 5000:设置哨兵判断主节点为“下线”的时间。例如,这里设置为5000毫秒(5秒),如果哨兵在这段时间内无法达到主节点,则认为主节点下线。因为各种原因,哨兵可能会出现误判的问题,多等一会说不定又能访问主节点。
  • sentinel failover-timeout mymaster 60000
    :设置故障转移的超时时间,单位是毫秒。在这个例子中,设置为60000毫秒(60秒)。如果故障转移操作在这段时间内没有完成,则会被取消。
  • sentinel parallel-syncs mymaster 1:设置在故障转移后,同时可以有多少个从节点同时对新的主节点进行同步。这里设置为1,意味着一次只有一个从节点可以同步。在故障转移后,所有从服务器都需要与新的主服务器进行全量同步以保证数据一致性。由于全量同步会阻塞从节点,并且可能会消耗较大的网络带宽和CPU资源,所以通过限制并发同步的从节点数量,可以避免过多从节点同时进行同步带来的资源压力过大问题。

集群架构实现数据冗余

Redis集群通过分片的方式来存储数据,每个分片存储不同的数据。通过多个节点的协作,实现数据的冗余和分布式存储。

工作原理
:Redis集群将所有的数据分为16384个哈希槽,每个节点负责一部分哈希槽。客户端根据特定的哈希规则,将数据存储到相应的节点上。

优点

  • 数据分片
    :实现了数据的自动分片,便于管理大规模数据。
  • 高可用性
    :集群中的节点可以相互备份,即使部分节点失败,也不会影响整个集群的可用性。

配置示例: 配置Redis集群涉及到启动多个Redis实例,可使用redis-cli工具创建集群:

# 启动Redis实例(假设启动6个实例作为示例)
redis-server --port 7000 --cluster-enabled yes --cluster-config-file nodes-7000.conf --cluster-node-timeout 5000 --appendonly yes --appendfilename appendonly-7000.aof --dbfilename dump-7000.rdb --logfile 7000.log
# 重复上述命令,修改端口为7001-7005

# 使用redis-cli创建集群
redis-cli --cluster create <ip1>:7000 <ip2>:7001 <ip3>:7002 <ip4>:7003 <ip5>:7004 <ip6>:7005 --cluster-replicas 1
  • --cluster-enabled yes:启用Redis集群模式。
  • --cluster-config-file nodes-7000.conf
    :指定集群的配置文件。这个文件由Redis自动维护,记录了集群中所有节点的信息。
  • --cluster-node-timeout 5000
    :设置节点超时时间,单位是毫秒。如果一个节点在这段时间内没有响应,集群会认为该节点已经下线。
  • --appendonly yes
    :启用AOF持久化模式。在集群模式下,推荐使用AOF持久化来保证数据安全。
  • --appendfilename appendonly-7000.aof
    :指定AOF文件的名字。这里根据不同的端口号,为每个实例指定了不同的AOF文件名,以避免冲突。
  • --dbfilename dump-7000.rdb
    :指定RDB文件的名字。同样地,根据不同的端口号为每个实例指定了不同的RDB文件名。
  • --logfile 7000.log:指定日志文件的名字。这有助于在出现问题时进行故障排查。

通过主从架构、哨兵系统和集群架构,可以有效地实现数据的多副本保存、故障转移和数据冗余,提高系统的可靠性和可用性,基本上可以避免单机系统的数据丢失问题。

跨机房部署

服务器所在的机房也可能出现问题,比如光缆被挖断了、空调系统坏了、机房着火了等实际出现过的状况,为了解决这些问题,我们还可以通过跨机房的方法来提升Redis的数据可靠性和可用性。

  • 在不同机房间部署主从复制架构。在一个数据中心内设置主节点,在另一个或多个数据中心设置从节点。
  • Sentinel(哨兵)集群也应跨机房部署,以避免单点故障。
  • 使用Redis Cluster进行跨机房部署,每个机房都可以有多个分片(shard),并且每个分片的主节点和从节点分别位于不同的地理位置,这样即使一个机房完全不可用,其他机房的副本仍然能够提供服务。

跨机房部署时需要自行解决网络的通信问题,让各个节点之间可以无障碍的相互访问,机房间最好使用低延迟、高带宽的专线连接,以加速数据同步过程并降低网络问题导致的数据不一致风险。

还有面试中经常提及的两地三中心的
多活架构
,也可以安排上。每个机房都部署一套完整的、独立处理读写请求的Redis集群,并通过分布式锁或者数据同步中间件等技术保证各个集群间数据的一致性。

  • 分布式锁可以采用ZooKeeper、etcd、redis等,确保在多个数据中心进行同步更新时,只有一个数据中心的Redis集群在给定时间内对某个资源拥有写权限。
  • 数据同步中间件主要用于实时或近实时地将一个数据中心的写入操作同步到另一个数据中心。可以使用消息队列、专业的数据同步工具(比如阿里巴巴开源的Canal)等。
  • 多活架构还要设计数据分片策略、数据路由机制以及事务处理方式,比如根据地域或者一致性Hash分片来区分用户,然后使用客户端驱动路由或者网关路由来把用户导向不同的机房,最后使用分布式事务提交数据。

多活架构比较复杂,我也没有实际搞过,这里就不多说了。

其他运维措施

日常运维中的定期检查和文件备份,虽然平时看起来不起眼,但在关键时刻却能发挥巨大作用。

运维工具检测

就像我们用手表监测心率一样,使用专业的运维工具可以帮助我们实时监控Redis服务器的状态,包括性能指标、资源使用情况、可能的瓶颈等。

具体做法:

  • 选择合适的工具
    :市面上有许多优秀的运维监控工具,如Prometheus结合Grafana、Zabbix等,可以根据自己的需求和环境选择。
  • 定制监控项
    :根据你的具体需求,定制监控项。比如,内存使用率、磁盘使用率、命令执行延迟、连接数等,这些都是常见的监控指标。
  • 设置告警:设置阈值,一旦监控到的数据超过这个阈值,就会触发告警。这就像是你的手表在你心率异常时提醒你,帮助你及时发现并处理问题。

定期备份数据

备份就是我们给文件买了一份保险,无论是误操作还是系统故障,都能够确保数据不会丢失,可以快速恢复到备份时的状态。

具体做法

  • 定期执行
    :设定一个合理的备份频率,比如每天凌晨进行一次。频率的选择取决于你的业务需求和数据变化的频繁程度。
  • 自动化:利用crontab等工具自动化备份流程,让备份工作自动化进行,减少人为遗忘的风险。
  • 远程存储
    :将备份文件存储在远程服务器或云存储服务上。这样做的好处是,即使本地发生灾难性事件,数据仍然是安全的。

通过实施这些常规措施,我们可以大大提高数据的安全性和系统的稳定性。

总结

说了这么多,让我们做一个总结。

如果你的业务对数据的完整性要求非常高,建议开启AOF持久化,并设置合理的fsync策略(如每秒同步一次)。同时,配合使用主从复制和哨兵系统,以确保数据的高可用性和安全性。

对于读写性能有极高要求的场景,可以考虑只使用RDB持久化或者RDB与AOF结合的方式(数据完整性要求高,AOF用于故障恢复,RDB用于重启加速)。同时,通过增加从节点和合理分配读写负载,可以进一步提升性能、提高数据安全性。

如果业务数据量巨大,单个Redis实例难以满足存储需求,那么Redis集群是一个不错的选择。它通过分片来实现数据的分布式存储,同时保持高可用性。

日常的监控和备份也要搞起来,如果服务和数据及其重要,跨机房部署可以提供极大的数据安全性和系统稳定性。至于传说中的多活架构,不到万不得已不要轻易尝试,极为复杂,成本很高。

最后不要忘了定期演练,搞了这么多的机制到底能不能发挥作用?有没有被不小心搞坏,定期演练可以提前发现问题,及时解决,避免更大的损失。


以上就是本文的主要内容。

关注萤火架构,加速技术提升!

我们是
袋鼠云数栈 UED 团队
,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

本文作者:佳岚

Suspense

Suspense
组件我们并不陌生,中文名可以理解为
暂停or悬停
, 在 React16 中我们通常在路由懒加载中配合 Lazy 组件一起使用 ,当然这也是官方早起版本推荐的唯一用法。

那它暂停了什么? 进行异步网络请求,然后再拿到请求后的数据进行渲染是很常见的需求,但这不可避免的需要先渲染一次没有数据的页面,数据返回后再去重新渲染。so , 我们想要暂停的就是第一次的无数据渲染。

通常我们在没有使用
Suspense
时一般采用下面这种写法, 通过一个
isLoading
状态来显示加载中或数据。这样代码是不会有任何问题,但我们需要手动去维护一个
isLoading
状态的值。

const [data, isLoading] = fetchData("/api");
if (isLoading) {
  return <Spinner />;
}
return <MyComponent data={data} />;

当我们使用
Suspense
后,使用方法会变为如下, 我们只需将进行异步数据获取的组件进行包裹,并将加载中组件通过
fallback
传入

return (
  <Suspense fallback={<Spinner />}>
    <MyComponent />
  </Suspense>
);

那 React 是如何知道该显示
MyComponent
还是
Spinner
的?

答案就在于
MyComponent
内部进行
fetch
远程数据时做了一些手脚。

export const App = () => {
  return (
    <div>
      <Suspense fallback={<Spining />}>
        <MyComponent />
      </Suspense>
    </div>
  );
};

function Spining() {
  return <p>loading...</p>;
}

let data = null;

function MyComponent() {
  if (!data) {
    throw new Promise((resolve) => {
      setTimeout(() => {
        data = 'kunkun';
        resolve(true);
      }, 2000);
    });
  }
  return (
    <p>
      My Component, data is {data}
    </p>
  );
}

Suspense
是根据捕获子组件内的异常来实现决定展示哪个组件的。这有点类似于
ErrorBoundary
,不过
ErrorBoundary
是捕获 Error 时就展示回退组件,而
Suspense
捕获到的 Error 需要是一个
Promise
对象(并非必须是 Promise 类型,thenable 的都可以)。

我们知道 Promise 有三个状态,
pending

fullfilled

rejected
,当我们进行远程数据获取时,会创建一个
Promise
,我们需要直接将这个
Promise
作为
Error
进行抛出,由 Suspense 进行捕获,捕获后对该
thenable
对象的
then
方法进行回调注册
thenable.then(retry)
, 而 retry 方法就会开始一个调度任务进行更新,后面会详细讲。
file

知道了大致原理,这时还需要对我们的
fetcher
进行一层包裹才能实际运用。

// MyComponent.tsx
const getList = wrapPromise(fetcher('http://api/getList'));

export function MyComponent() {
  const data = getList.read();

  return (
    <ul>
      {data?.map((item) => (
        <li>{item.name}</li>
      ))}
    </ul>
  );
}

function fetcher(url) {
  return new Promise((resove, reject) => {
    setTimeout(() => {
      resove([{ name: 'This is Item1' }, { name: 'This is Item2' }]);
    }, 1000);
  });
}

// Promise包裹函数,用来满足Suspense的要求,在初始化时默认就会throw出去
function wrapPromise(promise) {
  let status = 'pending';
  let response;

  const suspend = promise.then(
    (res) => {
      status = 'success';
      response = res;
    },
    (err) => {
      status = 'error';
      response = err;
    }
  );
  const read = () => {
    switch (status) {
      case 'pending':
        throw suspend;
      default:
        return response;
    }
  };

  return { read };

从上述代码我们可以注意到,通过
const data = getList.read()
这种同步的方式我们就能拿到数据了。
注意: 上面这种写法并非一种范式,目前官方也没有给出推荐的写法
为了与
Suspense
配合,则我们的请求可能会变得很
不优雅
,官方推荐是直接让我们使用第三方框架提供的能力使用
Suspense
请求数据,如
useSWR

下面时
useSWR
的示例,简明了很多,并且对于
Profile
组件,数据获取的写法可以看成是同步的了。

import { Suspense } from 'react'
import useSWR from 'swr'
 
function Profile () {
  const { data } = useSWR('/api/user', fetcher, { suspense: true })
  return <div>hello, {data.name}</div>
}
 
function App () {
  return (
    <Suspense fallback={<div>loading...</div>}>
      <Profile/>
    </Suspense>
  )
}

Suspense
的另一种用法就是与
懒加载lazy组件
配合使用,在完成加载前展示Loading

<Suspense fallback={<GlobalLoading />}>
   {lazy(() => import('xxx/xxx.tsx'))}
</Suspense>

由此得出,通过
lazy
返回的组件也应该包裹一层类似如上的 Promise,我们看看 lazy 内部是如何实现的。
其中
ctor
就是我们传入的
() => import('xxx/xxx.tsx')
, 执行
lazy
也只是帮我们封装了层数据结构。
ReactLazy.js

export function lazy<T>(
  ctor: () => Thenable<{default: T, ...}>,
): LazyComponent<T, Payload<T>> {
  const payload: Payload<T> = {
    // We use these fields to store the result.
    _status: Uninitialized,
    _result: ctor,
  };
  const lazyType: LazyComponent<T, Payload<T>> = {
    $$typeof: REACT_LAZY_TYPE,
    _payload: payload,
    _init: lazyInitializer,
  };
  return lazyType;
}

React 会在
Reconciler
过程中去实际执行,在协调的
render
阶段
beginWork
中可以看到对lazy单独处理的逻辑。
ReactFiberBeginWork.js

function mountLazyComponent(
  _current,
  workInProgress,
  elementType,
  renderLanes,
) {
  const props = workInProgress.pendingProps;
  const lazyComponent: LazyComponentType<any, any> = elementType;
  const payload = lazyComponent._payload;
  const init = lazyComponent._init;
	// 在此处初始化lazy
  let Component = init(payload);
	// 下略
}

那我们再来看看
init
干了啥,也就是封装前的
lazyInitializer
方法,整体跟我们之前实现的 fetch 封装是一样的。
ReactLazy.js

function lazyInitializer<T>(payload: Payload<T>): T {
  if (payload._status === Uninitialized) {
    const ctor = payload._result;
	// 这时候开始进行远程模块的导入
    const thenable = ctor();
    thenable.then(
      moduleObject => {
        if (payload._status === Pending || payload._status === Uninitialized) {
          // Transition to the next state.
          const resolved: ResolvedPayload<T> = (payload: any);
          resolved._status = Resolved;
          resolved._result = moduleObject;
        }
      },
      error => {
        if (payload._status === Pending || payload._status === Uninitialized) {
          // Transition to the next state.
          const rejected: RejectedPayload = (payload: any);
          rejected._status = Rejected;
          rejected._result = error;
        }
      },
    );
  }
  if (payload._status === Resolved) {
    const moduleObject = payload._result;
    }
    return moduleObject.default;
  } else {
    // 第一次执行肯定会先抛出异常
    throw payload._result;
  }
}

Suspense 底层是如何实现的?

其底层细节非常之多,在开始之前,我们先回顾下 React 的大致架构

Scheduler
: 用于调度任务,我们每次
setState
可以看成是往其中塞入一个
Task
,由
Scheduler
内部的优先级策略进行判断何时调度运行该
Task

Reconciler
: 协调器,进行 diff 算法,构建 fiber 树

Renderer
: 渲染器,将 fiber 渲染成 dom 节点

Fiber 树的结构, 在 reconciler 阶段,采用
深度优先
的方式进行遍历,往下递即调用
beginWork
的过程,往上回溯即调用
ComplteWork
的过程
file
我们先直接进入
Reconciler
中分析下
Suspense

fiber
节点是如何被创建的
beginWork

function beginWork(
  current: Fiber | null,
  workInProgress: Fiber,
  renderLanes: Lanes,
): Fiber | null {
	switch (workInProgress.tag) {
		case HostText:
      return updateHostText(current, workInProgress);
    case SuspenseComponent:
      return updateSuspenseComponent(current, workInProgress, renderLanes);
		// 省略其他类型
	}
}

  • beginWork
    中会根据
    **不同的组件类型**
    执行不同的创建方法, 而
    Suspense
    对应的会进入到
    updateSuspenseComponent

updateSuspenseComponent

function updateSuspenseComponent(current, workInProgress, renderLanes) {
  const nextProps = workInProgress.pendingProps;

  let showFallback = false;
  // 标识该Suspense是否已经捕获过子组件的异常了
  const didSuspend = (workInProgress.flags & DidCapture) !== NoFlags;

  if (
    didSuspend
  ) {
    showFallback = true;
    workInProgress.flags &= ~DidCapture;
  } 

  // 第一次组件加载
  if (current === null) {

    const nextPrimaryChildren = nextProps.children;
    const nextFallbackChildren = nextProps.fallback;
   
    // 第一次默认不展示fallback,因为要先走到children后才会产生异常
    if (showFallback) {
      const fallbackFragment = mountSuspenseFallbackChildren(
        workInProgress,
        nextPrimaryChildren,
        nextFallbackChildren,
        renderLanes,
      );
      const primaryChildFragment: Fiber = (workInProgress.child: any);
      primaryChildFragment.memoizedState = mountSuspenseOffscreenState(
        renderLanes,
      );

      return fallbackFragment;
    } 
     else {
      return mountSuspensePrimaryChildren(
        workInProgress,
        nextPrimaryChildren,
        renderLanes,
      );
    }
  } else {
    // 如果是更新,操作差不多,此处略
  }
}
  • 第一次
    updateSuspenseComponent
    时 ,我们会把
    mountSuspensePrimaryChildren
    的结果作为下一个需要创建的
    fiber
    , 因为需要先去触发异常。
  • 实际上
    mountSuspensePrimaryChildren
    会为我们的
    PrimaryChildren
    在包上一层
    OffscreenFiber
function mountSuspensePrimaryChildren(
  workInProgress,
  primaryChildren,
  renderLanes,
) {
  const mode = workInProgress.mode;
  const primaryChildProps: OffscreenProps = {
    mode: 'visible',
    children: primaryChildren,
  };
  const primaryChildFragment = mountWorkInProgressOffscreenFiber(
    primaryChildProps,
    mode,
    renderLanes,
  );
  primaryChildFragment.return = workInProgress;
  workInProgress.child = primaryChildFragment;
  return primaryChildFragment;
}

什么是
OffscreenFiber/Component

通过其需要的 mode 参数值,我们可以大胆的猜测,应该是一个能控制是否显示子组件的组件,如果
hidden
,则会通过 CSS 样式隐藏子元素。
file
在这之后的 Fiber 树结构
file
当我们向下执行到
MyComponent
时,由于抛出了错误,当前的
reconciler
阶段会被暂停
让我们再回到 Reconciler 阶段的起始点可以看到有
Catch
语句。
renderRootConcurrent

function renderRootConcurrent(root: FiberRoot, lanes: Lanes) {
 // 省略..
  do {
    try {
      workLoopConcurrent();
      break;
    } catch (thrownValue) {
      handleError(root, thrownValue);
    }
  } while (true);
 // 省略..
}

performConcurrentWorkOnRoot(root, didTimeout) {
	// 省略..
	let exitStatus = shouldTimeSlice
    ? renderRootConcurrent(root, lanes)
    : renderRootSync(root, lanes);
  // 省略..
}

我们再看看错误处理函数
handleError
中做了些什么
handleError

function handleError(root, thrownValue): void {
	// 这时的workInProgress指向MyComponent
  let erroredWork = workInProgress;
  try {
    throwException(
      root,
      erroredWork.return,
      erroredWork,
      thrownValue,
      workInProgressRootRenderLanes,
    );
    completeUnitOfWork(erroredWork);
}

function throwException(root: FiberRoot, returnFiber: Fiber, sourceFiber: Fiber, value: mixed, rootRenderLanes: Lanes) 
{
  // 给MyComponent打上未完成标识
  sourceFiber.flags |= Incomplete;

  if (
    value !== null &&
    typeof value === 'object' &&
    typeof value.then === 'function'
  ) {
    // wakeable就是我们抛出的Promise
    const wakeable: Wakeable = (value: any);

    // 向上找到第一个Suspense边界
    const suspenseBoundary = getNearestSuspenseBoundaryToCapture(returnFiber);
    if (suspenseBoundary !== null) {
      // 打上标识
      suspenseBoundary.flags &= ~ForceClientRender;
      suspenseBoundary.flags |= ShouldCapture;
      // 注册监听器
			attachRetryListener(suspenseBoundary, root, wakeable, rootRenderLanes);
			return;
  }
}

主要做了三件事

  • 给抛出错误的组件打上
    Incomplete
    标识
  • 如果捕获的错误是 thenable 类型,则认定为是 Suspense 的子组件,向上找到最接近的一个
    Suspense
    边界,并打上
    ShouldCapture
    标识
  • 执行
    attachRetryListener
    对 Promise 错误监听,当状态改变后开启一个调度任务重新渲染 Suspense

在错误处理的事情做完后,就不应该再往下递了,开始调用
completeUnitOfWork
往上归, 这时由于我们给 MyComponent 组件打上了
Incomplete
标识,这个标识表示由于异常等原因渲染被搁置,那我们是不是就要开始往上找能够处理这个异常的组件?

我们再看看
completeUnitOfWork
干了啥

function completeUnitOfWork(unitOfWork: Fiber): void {
 // 大致逻辑
  let completedWork = unitOfWork;
  if ((completedWork.flags & Incomplete) !== NoFlags) {
      const next = unwindWork(current, completedWork, subtreeRenderLanes);
			if (next) {
					workInProgress = next;
					return
			}
			// 给父节点打上Incomplete标记
			if (returnFiber !== null) {
		      returnFiber.flags |= Incomplete;
		      returnFiber.subtreeFlags = NoFlags;
		      returnFiber.deletions = null;
			}
	}
}

可以看到最终打上
Incomplete
标识的组件都会进入
unwindWork
流程 , 并一直将祖先节点打上
Incomplete
标识,直到
unwindWork
中找到一个能处理异常的边界组件,也就
ClassComponent
,
SuspenseComponent
, 会去掉
ShouldCapture
标识,加上
DidCapture
标识

这时,对于
Suspense
来说需要的
DidCapture
已经拿到了,下面就是重新从
Suspense
开始走一遍
beginWork
流程

再次回到 Suspense 组件, 这时由于有了
DidCapture
标识,则展示
fallback
对于
fallback
组件的
fiber
节点是通过
mountSuspenseFallbackChildren
生成的

function mountSuspenseFallbackChildren(
  workInProgress,
  primaryChildren,
  fallbackChildren,
  renderLanes,
) {
  const primaryChildProps: OffscreenProps = {
    mode: 'hidden',
    children: primaryChildren,
  };

  let primaryChildFragment = mountWorkInProgressOffscreenFiber(
      primaryChildProps,
      mode,
      NoLanes,
    );
  let fallbackChildFragment = createFiberFromFragment(
      fallbackChildren,
      mode,
      renderLanes,
      null,
    );

  primaryChildFragment.return = workInProgress;
  fallbackChildFragment.return = workInProgress;
  primaryChildFragment.sibling = fallbackChildFragment;
  workInProgress.child = primaryChildFragment;
  return fallbackChildFragment;
}

它主要做了三件事


  • PrimaryChild

    Offscreen
    组件通过css隐藏

  • fallback
    组件又包了层
    Fragment
    返回

  • fallbackChild
    作为
    sibling
    链接至
    PrimaryChild

file
到这时渲染 fallback 的 fiber 树已经基本构建完了,之后进入
commit
阶段从根节点
rootFiber
开始深度遍历该
fiber树
进行 render。

等待一段时间后,
primary
组件数据返回,我们之前在
handleError
中添加的监听器
attachRetryListener
被触发,开始新的一轮任务调度。注:源码中调度回调实际在 Commit 阶段才添加的。

这时由于
Suspense
节点已经存在,则走的是
updateSuspensePrimaryChildren
中的逻辑,与之前首次加载时
monutSuspensePrimaryChildren
不同的是多了删除的操作, 在 commit 阶段时则会删除
fallback
组件, 展示
primary
组件。
updateSuspensePrimaryChildren

if (currentFallbackChildFragment !== null) {
    // Delete the fallback child fragment
    const deletions = workInProgress.deletions;
    if (deletions === null) {
      workInProgress.deletions = [currentFallbackChildFragment];
      workInProgress.flags |= ChildDeletion;
    } else {
      deletions.push(currentFallbackChildFragment);
    }
  }

至此,Suspense 的一生我们粗略的过完了,在源码中对 Suspense 的处理非常多,涉及到优先级相关的本篇都略过。
Suspense 中使用了
Offscreen
组件来渲染子组件,这个组件的特性是能根据传入 mode 来控制子组件样式的显隐,这有一个好处,就是能保存组件的状态,有些许类似于 Vue 的
keep-alive
。其次,它拥有着最低的调度优先级,比空闲时优先级还要低,这也意味着当 mode 切换时,它会被任何其他调度任务插队打断掉。
file

useTransition

useTransition
可以让我们在不阻塞 UI 渲染的情况下更新状态。
useTransition

startTransition
允许将某些更新标记为
低优先级更新
。默认情况下,其他更新被视为
紧急更新
。React 将允许更紧急的更新(例如更新文本输入)来中断不太紧急的更新(例如展示搜索结果列表)。
其核心原理其实就是将
startTransition
内调用的状态变更方法都标识为
低优先级的lane
(
lane优先级参考
)去更新。

const [isPending, startTransition] = useTransition()

startTransition(() => {
	setData(xxx)
})

一个输入框的例子

function Demo() {
  const [value, setValue] = useState();
  const [isPending, startTransition] = useTransition();

  return (
    <div>
      <h1>useTramsotopm Demo</h1>
      <input
        onChange={(e) => {
          startTransition(() => {
            setValue(e.target.value);
          });
        }}
      />
      <hr />
      {isPending ? <p>加载中。。</p> : <List value={value} />}
    </div>
  );
}

function List({ value }) {
  const items = new Array(5000).fill(1).map((_, index) => {
    return (
      <li>
        <ListItem index={index} value={value} />
      </li>
    );
  });
  return <ul>{items}</ul>;
}

function ListItem({ index, value }) {
  return (
    <div>
      <span>index: </span>
      <span>{index}</span>
      <span>value: </span>
      <span>{value}</span>
    </div>
  );
}

当我每次进行输入时,会触发 List 进行大量更新,但由于我使用了
startTransition

List
的更新进行
延后
,所以
Input
输入框不会出现明显卡顿现象
演示地址
https://stackblitz.com/edit/stackblitz-starters-kmkcjs?file=src%2Ftransition%2FList.tsx
file

由于更新被滞后了,所以我们怎么知道当前有没有被更新呢?
这时候第一个返回参数
isPending
就是用来告诉我们当前是否还在等待中。
但我们可以看到,
input
组件目前是
非受控组件
,如果改为
受控组件
,即使使用了
startTransition
一样会出现卡顿,因为 input 响应输入事件进行状态更新应该是要同步的。
所以这时候下面介绍的
useDeferredValue
作用就来了。

useDeferredValue

useDeferredValue
可让您推迟更新部分 UI, 它与
useTransition
做的事差不多,不过
useTransition
是在状态更新层,推迟状态更新来实现非阻塞,而
useDeferredValue
则是在状态已经更新后,先使用状态更新前的值进行渲染,来延迟因状态变化而导致的组件重新渲染。

它的基本用法

function Page() {
  const [value, setValue] = useState('');
  const deferredValue = useDeferredValue(setValue);
}

我们再用
useDeferredValue
去实现上面输入框的例子

function Demo() {
  const [value, setValue] = useState('');
  const deferredValue = useDeferredValue(value);

  return (
    <div>
      <h1>useDeferedValue Demo</h1>
      <input
        value={value}
        onChange={(e) => {
          setValue(e.target.value)
        }}
      />
      <hr />
      <List value={deferredValue} />
    </div>
  );
}

我们将
input
作为
受控组件
,对于会因输入框值而造成
大量渲染

List
,我们使用
deferredValue

其变化过程如下

  1. 当输入变化时,
    deferredValue
    首先会是变化前的旧值进行重新渲染,由于值没有变,所以 List 没有重新渲染,也就没有出现阻塞情况,这时,input 的值能够实时响应到页面上。
  2. 在这次旧值渲染完成后,deferredValue 变更为新的值,React 会在后台开始对新值进行重新渲染,
    List
    组件开始 rerender,且此次 rerender 会被标识为
    低优先级渲染
    ,能够被
    中断
  3. 如果此时又有输入框输入,则中断此次后台的重新渲染,重新走1,2的流程

我们可以打印下
deferredValue
的值看下
初始情况输入框为1,打印了两次1
file

输入2时,再次打印了两次1,随后打印了两次2
file

参考

最后

欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star

前言

分布式锁在日常开发中,用处非常的多。包括但不限于抢红包,秒杀,支付下单,幂等,等等场景。
分布式锁的实现方式有多种,包括redis实现,mysql实现,zookeeper实现等等。而其中redis非常适合作为分布式锁使用,并且在各个公司都大规模的使用。

本文将
由浅入深的
探究Redis分布式锁的实现,最终实现一个可工业使用的Redis分布式锁。欢迎大家一步一步跟读,一起学习一起进步。

什么是分布式锁

分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

举一个最简单的例子。有一个数据库字段status=0,表示初始状态。
只有在status=0初始状态
。才能修改这个值。现在有两个人,张三和李四。

  • 张三,发起请求将status=0 修改为 status=1
  • 李四,发起请求将status=0 修改为 status=2

因为只有status=0才会修改,代码在修改之前都会去查询status的值,并且判断是否为0。如果为0才会去更新,不为0,则拒绝更新。这其实就是一个幂等的实现。

  • 假如没有分布式锁。短时间内请求两次,
    此时两次都获取status=0
    ,一个修改成了1,一个修改成了2。破坏代码逻辑,有问题
  • 假如加上分布式锁。短时间内请求两次,
    只有第一笔请求结束之后,第二笔才会执行
    。也就是第二笔获取status,只能获取到最新的值,比如status=1,则不修改。

Redis分布式锁方案一:SETNX (不推荐)

public String lockA(String key) {
	String val = UUID.randomUUID().toString();
	// set k  v  nx  如果不存在则设置成功,如果存在则设置失败
	boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, val);
	if (success) {
		log.info("lock success");
		try {
			// do something
		} finally {
			stringRedisTemplate.delete(key);
		}

	} else {
		log.info("lock fail");
	}

	return "lockA";
}

这个方案有一个最大的问题就是,如果线程A获取锁成功,并没有设置过期时间。那么如果此时doSomething里面是一个死循环或者程序在期间重启了,就会导致这个锁就不会被释放,那么
别的线程永远获取不到锁啦
。这个问题非常严重,对业务影响极大。不推荐使用。

Redis分布式锁方案二:SETNX + expire (不推荐)

那既然没有过去时间,我就设置一个过期时间不就行了,代码如下。

public String lockB(String key) {
	String val = UUID.randomUUID().toString();

	// set k  v  nx  如果不存在则设置成功,如果存在则设置失败
	boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, val);
	stringRedisTemplate.expire(key, 60, TimeUnit.SECONDS);
	if (success) {
		log.info("lock success");
		try {
			// do something
		} finally {
			stringRedisTemplate.delete(key);
		}

	} else {
		log.info("lock fail");
	}

	return "lockB";
}

这个方案2和方案1有同样的问题。setnx 和 expire不是一个原子执行。在获取锁成功之后,准备执行expire的时候,程序重启,也会导致同样方案1的问题,此处不再赘述。不推荐使用。

Redis分布式锁方案三:SET EX NX (不推荐)

那既然不是原子性,我们就用原子性就好了。从
redis 2.6.12
开始,set方法支持 set ex nx

image.png

public String lockB(String key) {
	String val = UUID.randomUUID().toString();

	// set k  v ex  nx  如果不存在则设置成功,如果存在则设置失败
	boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, val, 60, TimeUnit.SECONDS);
	if (success) {
		log.info("lock success");
		try {
			// do something
		} finally {
			stringRedisTemplate.delete(key);
		}

	} else {
		log.info("lock fail");
	}

	return "lockB";
}

从方案三开始,此代码就比较有健壮性了。有部分公司使用的就是方案三,但仍然存在两个问题

  • doSomething还没执行完,锁过期就被自动释放了。那么其他线程就可以获取此锁了。就会导致此代码块可能被多个线程执行。当然使用的时候可以把过期时间设置大一点,比如60分钟,3个小时等等,但总归不太好。
  • 线程A获取锁,没执行完成,锁过期了。此时线程B获取锁执行了。然后A执行完成去释放锁的时候,但他释放的是线程B获取的锁,此时是有问题的,并且问题还不小。同样不推荐使用。

Redis分布式锁方案四: (推荐)

既然时间太短,我就设置过期时间长一点。既然会被误删,我们就判断一下。代码如下

public String lockD(String key) {
	String val = UUID.randomUUID().toString();

	// set k  v  nx  如果不存在则设置成功,如果存在则设置失败
	boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, val, 60, TimeUnit.MINUTES);
	if (success) {
		log.info("lock success");
		try {
			// do something
		} finally {
			if (val.equals(stringRedisTemplate.opsForValue().get(key))) {
				stringRedisTemplate.delete(key);
			}

			// String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
			// DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
			// redisScript.setScriptText(script);
			// redisScript.setResultType(Long.class);
			// return stringRedisTemplate.execute(redisScript, Collections.singletonList(key));

		}

	} else {
		log.info("lock fail");
	}

	return "lockD";
}

大部分公司,我相信使用的都是方案四。方案四正常来说,在使用过程中极大概率不会出现任何问题,除非你们的量非常的大。但其仍有问题,finally删除锁的那块不是原子性。

比如线程A获取锁成功uuid=123, 释放成功。线程B获取锁,uuid=456,锁过期,自动释放。
此时A再次获取锁,uuid=456(恰巧是456,概率非常低)。那么A就会释放B的锁。因此为了更加严谨一点,我们使用lua脚本来保证,判断+删除的原子性。

方案四已经符合绝大多数公司的使用了,但其不好估计的过期时间,以及释放的原子性,仍 概率性的存在问题。所以社区为了解决此问题,有了以下方案。

Redis分布式锁方案五: Redission方案 (推荐)

Redisson官网介绍: Easy Redis Java client with features of an in-memory data grid(易于使用的 Redis Java 客户端,具备内存数据网格的特性)

Redisson 是一个基于 Java 的 Redis 客户端库,它提供了一系列的高级功能,使得在 Java 应用程序中使用 Redis 变得更加方便和强大。Redisson 的目标是充分利用 Redis 的各种特性,同时提供易于使用的 Java 接口。

RedissonClient
是 Java 中 Redisson 库提供的一个接口,它封装了对 Redis 数据库的各种操作,提供了丰富的方法来与 Redis 进行交互。Redisson 是一个在 Redis 的基础上实现的 Java 内存数据网格(In-Memory Data Grid)。它不仅提供了对基本数据结构的操作,还提供了分布式的 Java 对象和服务,例如
分布式锁
、集合、映射、发布/订阅、计数器等。

我们这次使用到的是redission的分布式锁。

// 获取锁  
public String lockE(String key) {

	// 获取锁
	RLock lock = redissonClient.getLock(key);
	try {
		// 获取锁。此处30s不是指执行30s,而是获取锁的超时时间
		if (lock.tryLock(30, TimeUnit.SECONDS)) {
			log.info("lock success");
		}
	} catch (Exception e) {
		
	} finally {
		if (lock != null && lock.isHeldByCurrentThread()) {
			lock.unlock();
		}
	}

	return "lockE";
}

此方案基本适用于99.99%的公司,当然可能会出现Redlock的问题,此处不过多讨论,感兴趣的同学可以网上自行搜索。

image.png

只要线程加锁成功,默认过期时间是30s。后台会自动启动一个
watch dog
看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用Redisson解决了
锁过期释放,业务没执行完
问题。

具体Redission常见问题,以及源码分析,可以详见:
Redis分布式锁实现Redisson 15问(面试常问)

最后

本文由浅入深的介绍了分布式锁。解释了为什么大部分公司用的都是方案四以及方案五的实现,而不是方案1,2,3。我们需要知道每个方案的优劣势,从而选出最适合我们业务的一种技术方案,这是每个架构师都应该具备的一种能力。

在 part 1 中,我们探讨了目前比较流行的四种 OpenAI 开源工具。在今天的 part 2 中我们将探究另外三种不同的 OpenAI 开源工具并介绍一些与 Appilot 相关的内容。

Kubectl-GPT

Kubectl-GPT 是一个 kubectl 插件,可
使用 GPT 模型从自然语言输入生成
kubectl
命令
。该插件引入了
kubectl GPT
命令,它的唯一使命就是在 Kubernetes 集群中实现您的请求。

安装

Homebrew

# Install Homebrew: https://brew.sh/ 
brew tap devinjeon/kubectl-gpt https://github.com/devinjeon/kubectl-gpt
brew install kubectl-gpt

Krew

# Install Krew: https://krew.sigs.k8s.io/docs/user-guide/setup/install/
kubectl krew index add devinjeon https://github.com/devinjeon/kubectl-gpt
kubectl krew install devinjeon/gpt

需要使用自然语言输入运行命令行工具,生成
kubectl
命令。

kubectl gpt "<WHAT-YOU-WANT-TO-DO>"

前提条件

开始之前,请确保将 OpenAI API 密钥设置为名为
OPENAI_API_KEY
的环境变量。

然后您可以在
.zshrc

.bashrc
文件中添加以下一行:

export OPENAI_API_KEY=<your-key>

当然,这取决于 OpenAI GPT API 所支持的语言,比如:

# English
kubectl gpt "Print the creation time and pod name of all pods in all namespaces."
kubectl gpt "Print the memory limit and request of all pods"
kubectl gpt "Increase the replica count of the coredns deployment to 2"
kubectl gpt "Switch context to the kube-system namespace"

Kube-Copilot

它是由 OpenAI 提供支持的 Kubernetes Copilot。主要功能为:

  • 使用 ChatGPT(GPT-4 或 GPT-3.5)
    自动执行 Kubernetes 集群操作
  • 诊断和分析 Kubernetes 工作负载的潜在问题。
  • 根据提供的提示说明生成 Kubernetes 清单。
  • 利用本地 kubectl 和 trivy 命令进行 Kubernetes 集群访问和安全漏洞扫描。
  • 无需离开终端即可访问网络并执行 Google 搜索

安装

在 Kubernetes 中运行时

Option 1:使用带 Helm 的 Web UI(推荐)

# Option 1: OpenAI
export OPENAI_API_KEY="<replace-this>"
helm install kube-copilot kube-copilot \
  --repo https://feisky.xyz/kube-copilot \
  --set openai.apiModel=gpt-4 \
  --set openai.apiKey=$OPENAI_API_KEY

# Option 2: Azure OpenAI Service
export OPENAI_API_KEY="<replace-this>"
export OPENAI_API_BASE="<replace-this>"
helm install kube-copilot kube-copilot \
  --repo https://feisky.xyz/kube-copilot \
  --set openai.apiModel=gpt-4 \
  --set openai.apiKey=$OPENAI_API_KEY \
  --set openai.apiBase=$OPENAI_API_BASE
# Forwarding requests to the service
kubectl port-forward service/kube-copilot 8080:80
echo "Visit http://127.0.0.1:8080 to use the copilot"

Option 2:使用带 CLI 的 kubectl

kubectl run -it --rm copilot \
  --env="OPENAI_API_KEY=$OPENAI_API_KEY" \
  --restart=Never \
  --image=ghcr.io/feiskyer/kube-copilot \
  -- execute --verbose 'What Pods are using max memory in the cluster'

在本地安装

使用下面的 pip 命令安装 copilot:

pip install kube-copilot

设置操作

  • 确保本地计算机上安装了
    kubectl
    ,并为 Kubernetes 集群访问配置了 kubeconfig 文件。
  • 安装
    trivy
    以评估容器映像安全问题(用于
    audit
    命令)。
  • 将 OpenAI API 密钥设置为
    OPENAI_API_KEY
    的环境变量,以启用 ChatGPT 功能。
  • 对于 Azure OpenAI 服务,还要设置
    OPENAI_API_TYPE=azure
    ,以及
    OPENAI_API_BASE=https://<replace-this>.openai.azure.com/
  • Google 搜索默认为禁用。要启用它,请设置
    GOOGLE_API_KEY

    GOOGLE_CSE_ID

使用 CLI的方法
:直接在终端运行。

Usage: kube-copilot [OPTIONS] COMMAND [ARGS]...
Kubernetes Copilot powered by OpenAI
Options:
  --version  Show the version and exit.
  --help     Show this message and exit.
Commands:
  analyze   analyze issues for a given resource
  audit     audit security issues for a Pod
  diagnose  diagnose problems for a Pod
  execute   execute operations based on prompt instructions
  generate  generate Kubernetes manifests

审核 Pod 的安全问题
:可以使用
kube-copilot audit POD [NAMESPACE]
来审核 Pod 的安全问题。

Usage: kube-copilot audit [OPTIONS] POD [NAMESPACE]
 audit security issues for a Pod
Options:
  --verbose      Enable verbose information of copilot execution steps
  --model MODEL  OpenAI model to use for copilot execution, default is gpt-4
  --help         Show this message and exit.

诊断 Pod 存在的问题
:利用
kube-copilot diagnose POD [NAMESPACE]
即能诊断 Pod 的问题。

Usage: kube-copilot diagnose [OPTIONS] POD [NAMESPACE]
 diagnose problems for a Pod
Options:
  --verbose      Enable verbose information of copilot execution steps
  --model MODEL  OpenAI model to use for copilot execution, default is gpt-4
  --help         Show this message and exit.

分析 K8s Object 的潜在问题
:运行
kube-copilot analyze RESOURCE NAME [NAMESPACE]
将分析给定资源对象的潜在问题。

Usage: kube-copilot analyze [OPTIONS] RESOURCE NAME [NAMESPACE]
  analyze issues for a given resource
Options:
  --verbose     Enable verbose information of copilot execution steps
  --model TEXT  OpenAI model to use for copilot execution, default is gpt-4
  --help        Show this message and exit.

根据提示指令执行操作

kube-copilot execute INSTRUCTIONS
能根据提示指令执行操作。它也可用于询问任何问题。

Usage: kube-copilot execute [OPTIONS] INSTRUCTIONS
execute operations based on prompt instructions
Options:
  --verbose      Enable verbose information of copilot execution steps
  --model MODEL  OpenAI model to use for copilot execution, default is gpt-4
  --help         Show this message and exit.

生成 Kubernetes 清单
:使用
kube-copilot generate
命令,根据提示说明创建 Kubernetes 清单。生成清单后,系统会提示您确认是否要应用它们。

Usage: kube-copilot generate [OPTIONS] INSTRUCTIONS
generate Kubernetes manifests
Options:
  --verbose     Enable verbose information of copilot execution steps
  --model TEXT  OpenAI model to use for copilot execution, default is gpt-4
  --help        Show this message and exit.

Kubernetes ChatGPT bot

这是用于 Kubernetes 问题的 ChatGPT1 bot。它能向 AI 询问如何解决 Prometheus 警报,并获得精炼的回复。

Prometheus 将通过 webhook 接收器将警报转发给 bot。随后 bot 会向 OpenAI 发送查询,询问如何修复警报,您只需要耐心等待结果即可。

这样的 bot 是通过 Robusta.dev 实现的,一个用于响应 Kubernetes 警报的开源平台。我们还有一个用于多集群 Kubernetes 可观察性的 SaaS 平台。

一个 Slack 工作区即为设置它的前提条件。

然后,您只需:

  • 使用 Helm 安装 Robusta
  • 加载 ChatGPT playbook。将以下内容添加到
    generated_values.yaml

playbookRepos:
  chatgpt_robusta_actions:
    url: "https://github.com/robusta-dev/kubernetes-chatgpt-bot.git"
customPlaybooks:
# Add the 'Ask ChatGPT' button to all Prometheus alerts
- triggers:
  - on_prometheus_alert: {}
  actions:
  - chat_gpt_enricher: {}


  • generated_values.yaml
    中添加 OpenAI API 密钥。确保编辑现有的
    globalConfig
    部分,不要添加重复的部分。

globalConfig:
  chat_gpt_token: YOUR KEY GOES HERE

  • 进行 Helm 升级以应用新值

helm upgrade robusta robusta/robusta 
 --values=generated_values.yaml 
 --set clusterName=<YOUR_CLUSTER_NAME>

  • 将 Prometheus 警报发送到 Robusta。或者,直接使用 Robusta 捆绑的 Prometheus 堆栈。

演示

先部署损坏的 pod,使该 pod 将停留在待处理状态:

kubectl apply -f https://raw.githubusercontent.com/robusta-dev/kubernetes-demos/main/pending_pods/pending_pod_node_selector.yaml

随即立即触发 Prometheus 警报,跳过正常延迟:

robusta playbooks trigger prometheus_alert alert_name=KubePodCrashLooping namespace=default pod_name=example-pod

Slack 中会出现一个带有按钮的警报。单击该按钮向 ChatGPT 询问有关该警报的信息即可。

Appilot

Appilot 是一款面向 DevOps 场景的开源 AI 助手,它可以充分利用 AI 大语言模型的能力让用户直接输入自然语言进一步简化 K8s 管理体验。

Appilot 基于大语言模型进行推理,并且可以运行在本地个人电脑上。用户可以根据自身的需求和使用习惯,将 Appilot 集成到任意平台,进而实现通过输入自然语言即可调用后端平台的能力,轻松完成应用管理、环境管理、K8s debug 等任务。

Appilot 项目地址
https://github.com/seal-io/appilot