2024年11月

分布式锁

概述

分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。

分布式锁是可以跨越多个实例,多个进程的锁

分布式锁具备的条件:

  • 互斥性:任意时刻,只能有一个客户端持有锁
  • 锁超时释放:持有锁超时,可以释放,防止死锁
  • 可重入性:一个线程获取了锁之后,可以再次对其请求加锁
  • 高可用、高性能:加锁和解锁开销要尽可能低,同时保证高可用
  • 安全性:锁只能被持有该锁的服务(或应用)释放。
  • 容错性:在持有锁的服务崩溃时,锁仍能得到释放,避免死锁。

分布式锁实现方案

分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:

  1. 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
  2. Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
  3. Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同

Redis实现分布式锁

SETNX

基本方案:Redis提供了setXX指令来实现分布式锁

格式: setnx key value
将key 的值设为value ,当且仅当key不存在。
若给定的 key已经存在,则SETNX不做任何动作。

设置分布式锁后,能保证并发安全,但上述代码还存在问题,如果执行过程中出现异常,程序就直接抛出异常退出,导致锁没有释放造成最终死锁的问题。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁还是没有被成功的释放掉,依然会出现死锁现象)

设置超时时间

SET lock_key unique_value NX PX 10000

但是,即使设置了超时时间后,还存在问题。

假设有多个线程,假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长超过十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出问题。

让线程只删除自己的锁

解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:

但上述红框中由于判定和释放锁不是原子的,极端情况下,可能判定可以释放锁,在执行删除锁操作前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或
lua脚本实现原子操作判断+删除

Redis的单条命令操作是原子性的,但是多条命令操作并不是原子性的,因此Lua脚本实现的就是令Redis的多条命令也实现原子操作

redis事务不是原子操作的,详情请看
Redis的事务

但是,可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态

    @RequestMapping(" /deduct_stock")
    public String deductStock() {
        String REDIS_LOCK = "good_lock";
        // 每个人进来先要进行加锁,key值为"good_lock"
        String value = UUID.randomUUID().toString().replace("-","");
        try{
            // 为key加一个过期时间
            Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);

            // 加锁失败
            if(!flag){
                return "抢锁失败!";
            }
            System.out.println( value+ " 抢锁成功");
            String result = template.opsForValue().get("goods:001");
            int total = result == null ? 0 : Integer.parseInt(result);
            if (total > 0) {
                // 如果在此处需要调用其他微服务,处理时间较长。。。
                int realTotal = total - 1;
                template.opsForValue().set("goods:001", String.valueOf(realTotal));
                System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");
                return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";
            } else {
                System.out.println("购买商品失败,服务端口为8002");
            }
            return "购买商品失败,服务端口为8002";
        }finally {
            // 谁加的锁,谁才能删除
            // 也可以使用redis事务
            // https://redis.io/commands/set
            // 使用Lua脚本,进行锁的删除

            Jedis jedis = null;
            try{
                jedis = RedisUtils.getJedis();

                String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
                        "then " +
                        "return redis.call('del',KEYS[1]) " +
                        "else " +
                        "   return 0 " +
                        "end";

                Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
                if("1".equals(eval.toString())){
                    System.out.println("-----del redis lock ok....");
                }else{
                    System.out.println("-----del redis lock error ....");
                }
            }catch (Exception e){

            }finally {

                if(null != jedis){
                    jedis.close();
                }
            }

            // redis事务
//            while(true){
//                template.watch(REDIS_LOCK);
//                if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
//                    template.setEnableTransactionSupport(true);
//                    template.multi();
//                    template.delete(REDIS_LOCK);
//                    List<Object> list = template.exec();
//                    if(list == null){
//                        continue;
//                    }
//                }
//                template.unwatch();
//                break;
//            }
        }
        
    }
}

尽管这样,还是会有问题,锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,但其实此时业务还在执行中,还是应该将业务执行结束之后再释放锁。

续时

因此可以设定,任务不完成,锁就不释放。

可以维护一个定时线程池
ScheduledExecutorService
,每隔 2s 去扫描加入队列中的 Task,判断失效时间是否快到了,如果快到了,则给锁续上时间。

那如何判断是否快到失效时间了呢?可以用以下公式:【失效时间】<= 【当前时间】+【失效间隔(三分之一超时)】

// 扫描的任务队列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/**
 * 线程池,维护keyAliveTime
 */
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
    // 两秒执行一次「续时」操作
    SCHEDULER.scheduleAtFixedRate(() -> {
        // 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=
        Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
        while (iterator.hasNext()) {
            RedisLockDefinitionHolder holder = iterator.next();
            // 判空
            if (holder == null) {
                iterator.remove();
                continue;
            }
            // 判断 key 是否还有效,无效的话进行移除
            if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
                iterator.remove();
                continue;
            }
            // 超时重试次数,超过时给线程设定中断
            if (holder.getCurrentCount() > holder.getTryCount()) {
                holder.getCurrentTread().interrupt();
                iterator.remove();
                continue;
            }
            // 判断是否进入最后三分之一时间
            long curTime = System.currentTimeMillis();
            boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
            if (shouldExtend) {
                holder.setLastModifyTime(curTime);
                redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
                log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
                holder.setCurrentCount(holder.getCurrentCount() + 1);
            }
        }
    }, 0, 2, TimeUnit.SECONDS);
}

Redisson

使用Redis + lua方式可能存在的问题

  1. 不可重入性。同一个线程无法多次获取同一把锁
  2. 不可重试。获取锁只尝试一次就返回false,没有重试机制
  3. 超时释放。锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,存在安全隐患
  4. 主从一致性。如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现线程安全问题。

RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

RLock如何加锁解锁,实现可重入性?

从RLock进入,找到RedissonLock类,找到tryLock 方法再继续找到tryAcquireOnceAsync 方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)

// waitTime 等待时间,多久时间内都会在这尝试获取锁
// leaseTime 加锁时是否设置过期时间
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
 }

此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分

evalWriteAsync方法是eval命令执行lua的入口

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

eval命令执行Lua脚本的地方,此处将Lua脚本展开

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then 
  -- 新增该锁并且hash中该线程id对应的count置1
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  -- 设置过期时间
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 

-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  -- 线程重入次数++
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
return redis.call('pttl', KEYS[1]);
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)

总共3个参数完成了一段逻辑:

  1. 判断该锁是否已经有对应hash表存在,
    • 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
    • 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
  2. 最后返回这把锁的ttl剩余时间

再看看RLock如何解锁?

看unlock方法,同样查找方法名,一路到unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}

将lua脚本展开

-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end;
-- 存在,计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  -- 过期时间重设
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0; 
else
  -- 删除并发布解锁消息
  redis.call('del', KEYS[1]); 
  redis.call('publish', KEYS[2], ARGV[1]); 
  return 1;
end; 
return nil;

该Lua KEYS有2个Arrays.asList(getName(), getChannelName())

name 锁名称
channelName,用于pubSub发布消息的channel名称

ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值

具体执行步骤如下:

  1. 如果该锁不存在则返回nil;
  2. 如果该锁存在则将其线程的hash key计数器-1,
  3. 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

加锁解锁流程总结如下:

总的来说就是通过Hash类型来存储锁的次数:

RLock的锁重试问题

需要分析的是锁重试的,所以,在使用lock.tryLock()方法的时候,不能用无参的。

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return this.tryLock(waitTime, -1L, unit);
}

在调用tryAcquire方法后,返回了一个Long的ttl

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    } else {
        time -= System.currentTimeMillis() - current;
        if (time <= 0L) {
            this.acquireFailed(waitTime, unit, threadId);
            return false;
        } else {
		//省略

继续跟着代码进去查看,最后会发现,调用tryLockInnerAsync方法。这个方法就是获取锁的Lua脚本的。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

这个lua脚本上面提到了。就是 判断,如果获取到锁,返回一个nil.也就是null。如果没有获取到,就调用 pttl,name。其实就是获取当前name锁的剩余有效期。

获取到ttl。如果返回null说获取锁成功,直接返回true.如果返回的不是null,说明需要进行重试操作了。主要是根据时间进行判断的。经过一系列判断后,do,while是真正执行重试相关逻辑的。如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
	//如果返回null,说明获取到了锁,直接返回
    if (ttl == null) {
        return true;
    } else {
    	//当前时间与进入方法时的时间进行比较
		//System.currentTimeMillis() - current表示前面获取锁消耗时间
        time -= System.currentTimeMillis() - current;////time是重试锁的等待时间,
        if (time <= 0L) {//剩余等待时间,如果剩余等待时间<=0,设置获取锁失败。
            this.acquireFailed(waitTime, unit, threadId);
            return false;
        } else {
			//再次获取当前时间
            current = System.currentTimeMillis();
			//刚刚尝试完获取锁失败,如果继续立即尝试一般是获取不到锁的,因此这里选择订阅的方式
			//订阅当前锁,在unlock释放锁的时候有个:redis.call('publish', KEYS[2], ARGV[1]); 所以这里就订阅了
            RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
			//进行等待RFuture的结果,等多久?等time的时间
            if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
				//time时间过完了还没有等到锁释放的通知
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
							//如果等待超时,就取消订阅
                            this.unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }

                this.acquireFailed(waitTime, unit, threadId);
				//返回获取锁失败
                return false;
            } else {//到这里表示在tme时间内获得了释放锁的通知
                boolean var16;
                try {
					//检查之前订阅等待的消耗时间
                    time -= System.currentTimeMillis() - current;
                    if (time <= 0L) {//当前的剩余等待时间
                        this.acquireFailed(waitTime, unit, threadId);
                        boolean var20 = false;
                        return var20;
                    }
					//这里开始进行重试相关逻辑。主要就是当前时间和进入方法时候的时间进行比较
                    do {
                        long currentTime = System.currentTimeMillis();
						//这里就是第一次重试
                        ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                        if (ttl == null) {//null表示获取锁失败
                            var16 = true;
                            return var16;
                        }
						
						//再试一次
                        time -= System.currentTimeMillis() - currentTime;
                        if (time <= 0L) {
                            this.acquireFailed(waitTime, unit, threadId);
                            var16 = false;
                            return var16;
                        }

                        currentTime = System.currentTimeMillis();
                        if (ttl >= 0L && ttl < time) { //也不是一直试,等别人释放
                           ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } else {
                            ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                        }

                        time -= System.currentTimeMillis() - currentTime;
                    } while(time > 0L);//时间还充足,继续等待
					//时间到期了,还没获取到锁,返回失败
                    this.acquireFailed(waitTime, unit, threadId);
                    var16 = false;
                } finally {
                    this.unsubscribe(subscribeFuture, threadId);
                }

                return var16;
            }
        }
    }
}

主要是do while机制进行锁重试的,while会检查时间是否还充足会继续循环。当然这个循环不是直接while(true)的盲等机制,而是利用信号量和订阅的方式实现的,会等别人释放锁,再进行尝试,这种方式对cpu友好

Redisson的超时续约

跟随tryLock代码,在RedissonLock类中的tryAcquireOnceAsync方法中,会看到如下代码:

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {//设置了锁过期时间
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
	//leaseTime = -1时,即没有设置了锁过期时间
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//,默认30秒
		TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
		//ttlRemainingFuture完成以后
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {//没有抛异常
                if (ttlRemaining) {//获取锁成功
                    this.scheduleExpirationRenewal(threadId);//自动更新续期时间的任务调度
                }

            }
        });
        return ttlRemainingFuture;
    }
}
  1. 在使用trylock的时候,如果设置了锁过期时间,就不会执行续命相关逻辑了。
  2. 其中默认的watchdogTimeout时间是30秒。
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
	//获取一个entry,将entry放到map里,getEntryName()就是当前锁名称。
	//放到map里,即一个锁对应一个entry
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {//表示重入的,第二次放
        oldEntry.addThreadId(threadId);
    } else {//表示第一次放
        entry.addThreadId(threadId);
        this.renewExpiration();//第一次放,进行续约
    }

}

看门狗机制:在获取锁成功以后,开启一个定时任务,每隔一段时间就会去重置锁的超时时间,以
确保锁是在程序执行完unlock手动释放的,不会发生因为业务阻塞,key超时而自动释放的情况

到期续约方法:

private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {       //Timeout定时任务,或者叫周期任务
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
						//执行续命的操作
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();//再次调用
                                }

                            }
                        });
                    }
                }
            }
			//刷新周期, this.internalLockLeaseTime / 3L, 默认释放时间是30秒,除以3就是每10秒更新一次
		//续命时间为1/3的过期时间,设置续命单位是秒
		},this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); 
		ee.setTimeout(task);
	}
}

查看renewExpirationAsync方法源码,其调用了Lua脚本执行续命操作的。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

pexpire重置锁的有效期。

总体逻辑如下:

  1. 开启一个任务,10秒钟后执行
  2. 开始的这个任务中重置有效期。假设设置的是默认30秒,则重置为30秒
  3. 更新后又重复步骤1、2

那么什么时候取消这个续约的任务呢?在释放锁unlock时

 public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise();
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
		//取消这个任务
        this.cancelExpirationRenewal(threadId);
        if (e != null) {
            result.tryFailure(e);
        } else if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
            result.tryFailure(cause);
        } else {
            result.trySuccess((Object)null);
        }
    });
    return result;
}

multilock解决主从一致性问题

如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现并发安全问题。

因此redisson提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

使用multilock()方法。必须在所有的节点都获取锁成功,才算成功。 缺点是运维成本高,实现复杂。

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient2 redissonClient2;
@Resource
private RedissonClient3 redissonClient3;

RLock lock = redissonClient.getMultilock(lock1,lock2,lock3)

总结Redisson

Redisson分布式锁解决前三个问题原理

总结Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能来实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,开启一个定时任务,每隔一段时间(releaseTime/3),重置超时时间。
  • 使用multilock: 多个独立的redis节点,必须在所有节点都获取重入锁,才算获取成功;

redLock

不管是redLock,还是redissonLock,两者底层都是通过相同的lua脚本来加锁、释放锁的,所以,两者只是外部形态的不同,底层是一样的。redLock是继承了redissonMultiLock,大部分的逻辑,都是在redissonMultiLock中去实现的,所以源码部分,大部分都是RedissonMultiLock

原理

  • redLock的使用,需要有奇数台独立部署的Redis节点
  • 在加锁的时候,会分别去N台节点上加锁,如果半数以上的节点加锁成功,就认为当前线程加锁成功

面试题专栏

Java面试题专栏
已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;

那么可以私信我,我会尽我所能帮助你。

在平时编写Markdown文档上传博客园时,图片都是个大问题。由于本地图片无法直接上传博客园,所以就希望制作一个图床,把图片自动上传到云端,这样只需要有个正确的下载链接,就能在任何地方浏览图片。

长话短说,下面是简单的个人图床制作过程:

第一步:Gitee注册、创建仓库、获取私人令牌

  • 打开
    Gitee网址
    ,如果没有账号就自行注册、登录,之所以选择Gitee是因为它在国内免费、网络速度快。
  • 创建Picture图床仓库,登录后在Gitee左侧中间部分找到
    “我活跃的仓库”
    点击
    “新建”
    按钮。

image-20241126074616806

  • 关于新创建仓库的设置

    image-20241126075609522

  • 只要按照上面步骤,相信仓库大家都已经创建完成,下面是关于私人令牌的获取。

    在Gitee首页右上角点击个人头像有个弹窗,点击设置

    image-20241126080123137

进入设置后,在左面选项栏下拉点击
私人令牌

image-20241126080309351

进入后点击
生成新令牌
,并
设置令牌权限
,点击提交

image-20241126080600779

image-20241126080804993

验证密码,生成令牌后复制保存,等会要用

image-20241126080935537

到这Gitee工作就已经全部准备完成。

第二步Picgo应用的获取、设置

  • 下载Picgo软件,官网地址https://github.com/Molunerfinn/PicGo/releases

    如果这个地址打不开,就先用下载好的:
    https://wwgn.lanzoul.com/igBWe2gakd1i
    密码:gtgq

  • 软件下载完成后,桌面就会多一个Picgo图标

    image-20241126082438653

    点击后你会发现电脑并没有什么变化,这是因为Picgo软件默认状态为
    “静默启动”
    ,启动成功后就把图标挂在状态栏,并不会打开一个新窗口。点击这个图标打开软件窗口、设置参数(是不是太啰嗦了?)

    image-20241126082817964

在Picgo软件中安装Gitee插件并完成配置

image-20241126083506531

这里不方便截图,直接找个视频,把别人制作好的步骤截图:

image-20241126083925169

image-20241126084027825

最后点击确定,Picgo就和Gitee建立链接了,首页选择Gitee图床

image-20241126084438511

设置Picgo软件,打开时间戳重命名防止图片名称重复

image-20241126084706023

到这里第二步就完成了,可以上传一张图片看看,上传成功后在相册可以查看。

第三部配置Typora软件,实现Markdown文档图片自由。

image-20241126085343736

1.概述

在rpc请求里,有了请求req就必然有回复resp。本文就来解析发送req的节点收到resp该怎么处理。

2.handle_peer_resp源码解析

void raft_server::handle_peer_resp(ptr<resp_msg>& resp, const ptr<rpc_exception>& err)
{
    if (err)
    {
        l_->info(sstrfmt("peer response error: %s").fmt(err->what()));
        return;
    }

    // update peer last response time
    {
        read_lock(peers_lock_);
        auto peer = peers_.find(resp->get_src());
        if (peer != peers_.end())
        {
            peer->second->set_last_resp(system_clock::now());
        }
        else
        {
            l_->info(sstrfmt("Peer %d not found, ignore the message").fmt(resp->get_src()));
            return;
        }
    }

    l_->debug(lstrfmt("Receive a %s message from peer %d with Result=%d, Term=%llu, NextIndex=%llu")
                  .fmt(
                      __msg_type_str[resp->get_type()],
                      resp->get_src(),
                      resp->get_accepted() ? 1 : 0,
                      resp->get_term(),
                      resp->get_next_idx()));

    {
        recur_lock(lock_);
        // if term is updated, no more action is required
        if (update_term(resp->get_term()))
        {
            return;
        }

        // ignore the response that with lower term for safety
        switch (resp->get_type())
        {
            case msg_type::vote_response:
                handle_voting_resp(*resp);
                break;
            case msg_type::append_entries_response:
                handle_append_entries_resp(*resp);
                break;
            case msg_type::install_snapshot_response:
                handle_install_snapshot_resp(*resp);
                break;
            default:
                l_->err(sstrfmt("Received an unexpected message %s for response, system exits.")
                            .fmt(__msg_type_str[resp->get_type()]));
                ctx_->state_mgr_->system_exit(-1);
                ::exit(-1);
                break;
        }
    }
}
  • 1.与rep_handlers类似,resp_handlers同样有一个总的处理resp的函数,通过switch-case来分流。
  • 2.在交给具体的resp_handlers之前,handle_peer_resp还更新了peer的last_response_time。
  • 3.如果这个resp可以更新节点的term,说明节点已经落后了,无需进行任何操作。

3.handle_voting_resp源码解析

void raft_server::handle_voting_resp(resp_msg& resp)
{
    if (resp.get_term() != state_->get_term())
    {
        l_->info(sstrfmt("Received an outdated vote response at term %llu v.s. current term %llu")
                     .fmt(resp.get_term(), state_->get_term()));
        return;
    }

    if (election_completed_)
    {
        l_->info("Election completed, will ignore the voting result from this server");
        return;
    }

    if (voted_servers_.find(resp.get_src()) != voted_servers_.end())
    {
        l_->info(sstrfmt("Duplicate vote from %d for term %lld").fmt(resp.get_src(), state_->get_term()));
        return;
    }

    {
        read_lock(peers_lock_);
        voted_servers_.insert(resp.get_src());
        if (resp.get_accepted())
        {
            votes_granted_ += 1;
        }

        if (voted_servers_.size() >= (peers_.size() + 1))
        {
            election_completed_ = true;
        }

        if (votes_granted_ > (int32)((peers_.size() + 1) / 2))
        {
            l_->info(sstrfmt("Server is elected as leader for term %llu").fmt(state_->get_term()));
            election_completed_ = true;
            become_leader();
        }
    }
}
  • 1.if (resp.get_term() != state_->get_term())判断term是否相同,相同继续。
  • 2.判断if (election_completed_)选举是否完成,因为candidate只需要一半以上就会成功,所以可能出现election结束了但还收到了resp的情况。
  • 3.判断发送resp的节点在不在candidate的voted_servers_里面,在的话说明收到了同一个节点的两票,出错。
  • 4.如果 (voted_servers_.size() >= (peers_.size() + 1))说明选举已经结束。
  • 5.通过resp.get_accepted()来统计自己的得票,如果超过了一半说明成功了,调用become_leader();

4.handle_append_entries_resp源码解析

void raft_server::handle_append_entries_resp(resp_msg& resp)
{
    read_lock(peers_lock_);
    peer_itor it = peers_.find(resp.get_src());
    if (it == peers_.end())
    {
        l_->info(sstrfmt("the response is from an unkonw peer %d").fmt(resp.get_src()));
        return;
    }

    // if there are pending logs to be synced or commit index need to be advanced, continue to send appendEntries to
    // this peer
    bool need_to_catchup = true;
    ptr<peer> p = it->second;
    if (resp.get_accepted())
    {
        {
            auto_lock(p->get_lock());
            p->set_next_log_idx(resp.get_next_idx());
            p->set_matched_idx(resp.get_next_idx() - 1);
        }

        // try to commit with this response
        std::vector<ulong> matched_indexes(peers_.size() + 1);
        matched_indexes[0] = log_store_->next_slot() - 1;
        int i = 1;
        for (it = peers_.begin(); it != peers_.end(); ++it, i++)
        {
            matched_indexes[i] = it->second->get_matched_idx();
        }

        std::sort(matched_indexes.begin(), matched_indexes.end(), std::greater<ulong>());
        commit(matched_indexes[(peers_.size() + 1) / 2]);
        need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
    }
    else
    {
        std::lock_guard<std::mutex> guard(p->get_lock());
        if (resp.get_next_idx() > 0 && p->get_next_log_idx() > resp.get_next_idx())
        {
            // fast move for the peer to catch up
            p->set_next_log_idx(resp.get_next_idx());
        }
        else if (p->get_next_log_idx() > 0)
        {
            p->set_next_log_idx(p->get_next_log_idx() - 1);
        }
    }

    // This may not be a leader anymore, such as the response was sent out long time ago
    // and the role was updated by UpdateTerm call
    // Try to match up the logs for this peer
    if (role_ == srv_role::leader && need_to_catchup)
    {
        request_append_entries(*p);
    }
}
  • 1.首先判断发送resp的节点是不是还在peers_列表里面,不在报错。
  • 2.如果resp的accepted = 1,说明append-entry成功了,设置match_idx与next_idx。
  • 3.提取peers_列表里面所有的match_idx,然后sort排序。
  • 4.排序后取中位数mid_idx,说明至少有一半以上的follower都应用到了mid_idx,将mid应用到自己(也就是leader)的状态机。
  • 5.如果没接受(accepted = 0),那么就调整next_idx继续逼近。(具体可看
    cornerstone中msg类型解析
    )
  • 6.如果该节点还需要catch_up,再发送一遍。(可能是该leader在很久之前给他发的req,现在节点才回复,导致节点依然落后。)

知识点:
log_entry是先让follower应用到状态机,只有超过半数以上的都应用了,leader才会应用到自己的状态机。具体到实现,可以将所有节点的match_idx排序然后取中位数。

5.handle_install_snapshot_resp源码解析

void raft_server::handle_install_snapshot_resp(resp_msg& resp)
{
    read_lock(peers_lock_);
    peer_itor it = peers_.find(resp.get_src());
    if (it == peers_.end())
    {
        l_->info(sstrfmt("the response is from an unkonw peer %d").fmt(resp.get_src()));
        return;
    }

    // if there are pending logs to be synced or commit index need to be advanced, continue to send appendEntries to
    // this peer
    bool need_to_catchup = true;
    ptr<peer> p = it->second;
    if (resp.get_accepted())
    {
        std::lock_guard<std::mutex> guard(p->get_lock());
        ptr<snapshot_sync_ctx> sync_ctx = p->get_snapshot_sync_ctx();
        if (sync_ctx == nilptr)
        {
            l_->info("no snapshot sync context for this peer, drop the response");
            need_to_catchup = false;
        }
        else
        {
            if (resp.get_next_idx() >= sync_ctx->get_snapshot()->size())
            {
                l_->debug("snapshot sync is done");
                ptr<snapshot> nil_snp;
                p->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
                p->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
                p->set_snapshot_in_sync(nil_snp);
                need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
            }
            else
            {
                l_->debug(sstrfmt("continue to sync snapshot at offset %llu").fmt(resp.get_next_idx()));
                sync_ctx->set_offset(resp.get_next_idx());
            }
        }
    }
    else
    {
        l_->info("peer declines to install the snapshot, will retry");
    }

    // This may not be a leader anymore, such as the response was sent out long time ago
    // and the role was updated by UpdateTerm call
    // Try to match up the logs for this peer
    if (role_ == srv_role::leader && need_to_catchup)
    {
        request_append_entries(*p);
    }
}
  • 核心代码是,其他与上面一致。
if (resp.get_next_idx() >= sync_ctx->get_snapshot()->size())
            {
                l_->debug("snapshot sync is done");
                ptr<snapshot> nil_snp;
                p->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
                p->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
                p->set_snapshot_in_sync(nil_snp);
                need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
            }
            else
            {
                l_->debug(sstrfmt("continue to sync snapshot at offset %llu").fmt(resp.get_next_idx()));
                sync_ctx->set_offset(resp.get_next_idx());
            }
  • 因为这是snapshot,不需要发idx,所以resp.get_next_idx()实际上是follower已经接受snapshot的offset。如果接受的offset >= sync_ctx->get_snapshot()->size(),说明已经完成了,设置next_idx与match_idx。否则继续从已经接受的offset位置继续发送。

6.额外的ext_resp处理源码解析

void raft_server::handle_ext_resp(ptr<resp_msg>& resp, const ptr<rpc_exception>& err)
{
    recur_lock(lock_);
    if (err)
    {
        handle_ext_resp_err(*err);
        return;
    }

    l_->debug(lstrfmt("Receive an extended %s message from peer %d with Result=%d, Term=%llu, NextIndex=%llu")
                  .fmt(
                      __msg_type_str[resp->get_type()],
                      resp->get_src(),
                      resp->get_accepted() ? 1 : 0,
                      resp->get_term(),
                      resp->get_next_idx()));

    switch (resp->get_type())
    {
        case msg_type::sync_log_response:
            if (srv_to_join_)
            {
                // we are reusing heartbeat interval value to indicate when to stop retry
                srv_to_join_->resume_hb_speed();
                srv_to_join_->set_next_log_idx(resp->get_next_idx());
                srv_to_join_->set_matched_idx(resp->get_next_idx() - 1);
                sync_log_to_new_srv(resp->get_next_idx());
            }
            break;
        case msg_type::join_cluster_response:
            if (srv_to_join_)
            {
                if (resp->get_accepted())
                {
                    l_->debug("new server confirms it will join, start syncing logs to it");
                    sync_log_to_new_srv(resp->get_next_idx());
                }
                else
                {
                    l_->debug("new server cannot accept the invitation, give up");
                }
            }
            else
            {
                l_->debug("no server to join, drop the message");
            }
            break;
        case msg_type::leave_cluster_response:
            if (!resp->get_accepted())
            {
                l_->debug("peer doesn't accept to stepping down, stop proceeding");
                return;
            }

            l_->debug("peer accepted to stepping down, removing this server from cluster");
            rm_srv_from_cluster(resp->get_src());
            break;
        case msg_type::install_snapshot_response:
        {
            if (!srv_to_join_)
            {
                l_->info("no server to join, the response must be very old.");
                return;
            }

            if (!resp->get_accepted())
            {
                l_->info("peer doesn't accept the snapshot installation request");
                return;
            }

            ptr<snapshot_sync_ctx> sync_ctx = srv_to_join_->get_snapshot_sync_ctx();
            if (sync_ctx == nilptr)
            {
                l_->err("Bug! SnapshotSyncContext must not be null");
                ctx_->state_mgr_->system_exit(-1);
                ::exit(-1);
                return;
            }

            if (resp->get_next_idx() >= sync_ctx->get_snapshot()->size())
            {
                // snapshot is done
                ptr<snapshot> nil_snap;
                l_->debug("snapshot has been copied and applied to new server, continue to sync logs after snapshot");
                srv_to_join_->set_snapshot_in_sync(nil_snap);
                srv_to_join_->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
                srv_to_join_->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
            }
            else
            {
                sync_ctx->set_offset(resp->get_next_idx());
                l_->debug(sstrfmt("continue to send snapshot to new server at offset %llu").fmt(resp->get_next_idx()));
            }

            sync_log_to_new_srv(srv_to_join_->get_next_log_idx());
        }
        break;
        case msg_type::prevote_response:
            handle_prevote_resp(*resp);
            break;
        default:
            l_->err(lstrfmt("received an unexpected response message type %s, for safety, stepping down")
                        .fmt(__msg_type_str[resp->get_type()]));
            ctx_->state_mgr_->system_exit(-1);
            ::exit(-1);
            break;
    }
}

在解析前我们先梳理一下调用顺序:
1.leader向新节点发送invite_srv_to_join_cluster,新节点收到invite_srv_to_join_cluster请求后更新自己的role_,leader_等状态,并调用reconfigure重置cluster的config。更新完后发送join_cluster_response给leader。
2.leader收到该response后调用switch-case里面的msg_type::join_cluster_response分支来处理。处理完join_cluster_response会调用ync_log_to_new_srv向新节点发送sync_log_req。
3.新节点收到sync_log_req后发送sync_log_response给leader,leader收到后调用switch-case里面的msg_type::sync_log_response分支。

void raft_server::sync_log_to_new_srv(ulong start_idx)
{
    // only sync committed logs
    int32 gap = (int32)(quick_commit_idx_ - start_idx);
    if (gap < ctx_->params_->log_sync_stop_gap_)
    {
        l_->info(lstrfmt("LogSync is done for server %d with log gap %d, now put the server into cluster")
                     .fmt(srv_to_join_->get_id(), gap));
        ptr<cluster_config> new_conf = cs_new<cluster_config>(log_store_->next_slot(), config_->get_log_idx());
        new_conf->get_servers().insert(
            new_conf->get_servers().end(), config_->get_servers().begin(), config_->get_servers().end());
        new_conf->get_servers().push_back(conf_to_add_);
        bufptr new_conf_buf(new_conf->serialize());
        ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(), std::move(new_conf_buf), log_val_type::conf));
        log_store_->append(entry);
        config_changing_ = true;
        request_append_entries();
        return;
    }

    ptr<req_msg> req;
    if (start_idx > 0 && start_idx < log_store_->start_index())
    {
        req = create_sync_snapshot_req(*srv_to_join_, start_idx, state_->get_term(), quick_commit_idx_);
    }
    else
    {
        int32 size_to_sync = std::min(gap, ctx_->params_->log_sync_batch_size_);
        bufptr log_pack = log_store_->pack(start_idx, size_to_sync);
        req = cs_new<req_msg>(
            state_->get_term(),
            msg_type::sync_log_request,
            id_,
            srv_to_join_->get_id(),
            0L,
            start_idx - 1,
            quick_commit_idx_);
        req->log_entries().push_back(
            cs_new<log_entry>(state_->get_term(), std::move(log_pack), log_val_type::log_pack));
    }

    srv_to_join_->send_req(req, ex_resp_handler_);
}
  • 1.msg_type::sync_log_response类型:
                srv_to_join_->resume_hb_speed();
                srv_to_join_->set_next_log_idx(resp->get_next_idx());
                srv_to_join_->set_matched_idx(resp->get_next_idx() - 1);
                sync_log_to_new_srv(resp->get_next_idx());

收到了节点的resp后,那么给该节点添加hb任务,设置next_idx与match_idx。然后调用sync_log_to_new_srv再去同步一遍给该节点,直到两者数据一致,否则重复发送sync_log_request。(类似redis里面主从同步的时候,把主节点在主从同步时候的写入操作写入一个buffer,然后在最后再发给从节点再同步一遍。)
根据上面sync_log_to_new_srv源码我们可以看到,sync_log不是单纯采用request_append_entry去数据同步。因为新加入的节点落后很多,所以leader采用的机制是先发送snapshot,直到新节点的last_log_idx大于leader的start_idx,接着分情况讨论,如果两者idx的差(gap) < ctx_->params_->log_sync_stop_gap_,说明gap不足以打包成log_pack,则调用request_append_entry,否则打包成log_pack发送。

  • 2.case msg_type::join_cluster_response类型:
case msg_type::join_cluster_response:
            if (srv_to_join_)
            {
                if (resp->get_accepted())
                {
                    l_->debug("new server confirms it will join, start syncing logs to it");
                    sync_log_to_new_srv(resp->get_next_idx());
                }
                else
                {
                    l_->debug("new server cannot accept the invitation, give up");
                }
            }
            else
            {
                l_->debug("no server to join, drop the message");
            }
            break;

解析完上一个resp,这里就好理解了。如果srv_to_join存在则调用sync_log_to_new_srv来数据同步。

  • 3.case msg_type::leave_cluster_response类型:
case msg_type::leave_cluster_response:
            if (!resp->get_accepted())
            {
                l_->debug("peer doesn't accept to stepping down, stop proceeding");
                return;
            }

            l_->debug("peer accepted to stepping down, removing this server from cluster");
            rm_srv_from_cluster(resp->get_src());
            break;

重点是rm_srv_from_cluster。

void raft_server::rm_srv_from_cluster(int32 srv_id)
{
    ptr<cluster_config> new_conf = cs_new<cluster_config>(log_store_->next_slot(), config_->get_log_idx());
    for (cluster_config::const_srv_itor it = config_->get_servers().begin(); it != config_->get_servers().end(); ++it)
    {
        if ((*it)->get_id() != srv_id)
        {
            new_conf->get_servers().push_back(*it);
        }
    }

    l_->info(lstrfmt("removed a server from configuration and save the configuration to log store at %llu")
                 .fmt(new_conf->get_log_idx()));
    config_changing_ = true;
    bufptr new_conf_buf(new_conf->serialize());
    ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(), std::move(new_conf_buf), log_val_type::conf));
    log_store_->append(entry);
    request_append_entries();
}

先把要移除的srv从leader的config移除,然后把cluster的更改写入leader的log_store,调用request_append_entries广播给各个follower,达到所有节点更改的效果。

  • 4.install_snapshot_response类型:
 case msg_type::install_snapshot_response:
        {
            if (!srv_to_join_)
            {
                l_->info("no server to join, the response must be very old.");
                return;
            }

            if (!resp->get_accepted())
            {
                l_->info("peer doesn't accept the snapshot installation request");
                return;
            }

            ptr<snapshot_sync_ctx> sync_ctx = srv_to_join_->get_snapshot_sync_ctx();
            if (sync_ctx == nilptr)
            {
                l_->err("Bug! SnapshotSyncContext must not be null");
                ctx_->state_mgr_->system_exit(-1);
                ::exit(-1);
                return;
            }

            if (resp->get_next_idx() >= sync_ctx->get_snapshot()->size())
            {
                // snapshot is done
                ptr<snapshot> nil_snap;
                l_->debug("snapshot has been copied and applied to new server, continue to sync logs after snapshot");
                srv_to_join_->set_snapshot_in_sync(nil_snap);
                srv_to_join_->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
                srv_to_join_->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
            }
            else
            {
                sync_ctx->set_offset(resp->get_next_idx());
                l_->debug(sstrfmt("continue to send snapshot to new server at offset %llu").fmt(resp->get_next_idx()));
            }

            sync_log_to_new_srv(srv_to_join_->get_next_log_idx());
        }
        break;

(1)因为snapshot是分段传送的,如果没有srv_to_join_,则根本无法跟踪offset,因此报错。
(2)if (sync_ctx == nilptr)与(1)同理,必须要有sync_ctx,否则无法跟踪offset。
(3)在前面handle_install_snapshot_resp里面我们说过resp->get_next_idx()记录的其实是snapshot的offset,根据offset我们分两种情况,如果offset >= sync_ctx->get_snapshot()->size()说明snapshot已经完成了,更新next_idx与match_idx。否则从resp里面的offset继续同步。
(4)安装完snapshot之后还要调用更小粒度的sync_log_to_new_srv(srv_to_join_->get_next_log_idx())来进一步同步数据。(类似redis里面持久化先应用RDB快速同步再用AOF更细粒度同步)

  • 5.case msg_type::prevote_response类型:
case msg_type::prevote_response:
            handle_prevote_resp(*resp);
            break;

重点是handle_prevote_resp:

void raft_server::handle_prevote_resp(resp_msg& resp)
{
    if (!prevote_state_)
    {
        l_->info(sstrfmt("Prevote has completed, term received: %llu, current term %llu")
                     .fmt(resp.get_term(), state_->get_term()));
        return;
    }

    {
        read_lock(peers_lock_);
        bool vote_added = prevote_state_->add_voted_server(resp.get_src());
        if (!vote_added)
        {
            l_->info("Prevote has from %d has been processed.");
            return;
        }

        if (resp.get_accepted())
        {
            prevote_state_->inc_accepted_votes();
        }

        if (prevote_state_->get_accepted_votes() > (int32)((peers_.size() + 1) / 2))
        {
            l_->info(sstrfmt("Prevote passed for term %llu").fmt(state_->get_term()));
            become_candidate();
        }
        else if (prevote_state_->num_of_votes() >= (peers_.size() + 1))
        {
            l_->info(sstrfmt("Prevote failed for term %llu").fmt(state_->get_term()));
            prevote_state_->reset();  // still in prevote state, just reset the prevote state
            restart_election_timer(); // restart election timer for a new round of prevote
        }
    }
}

如果得到票数超过一半,成为candidate,否则再开始新一轮prevote。

7.总结

  • 1.log_entry是先让follower应用到状态机,只有超过半数以上的都应用了,leader才会应用到自己的状态机。具体到实现,可以将所有节点的match_idx排序然后取中位数。
  • 2.对于snapshot的req与resp,可以利用idx这一项来记录offset。
  • 3.对于新节点数据同步,采用snapshot,log_pack等方式加快数据同步。
  • 4.数据同步需要多次同步,直到粒度满足要求。
  • 5.因为snapshot是分段传送的,如果无法跟踪offset,说明resp错误。

前一篇:《用于自然语言处理的循环神经网络RNN》

序言:
本节主要讲解如何使用循环神经网络(RNN)创建一个文本分类器。RNN 是一类适合处理序列数据的神经网络的统称,而我们将在本节中使用 RNN 的一种常见变体——LSTM(长短期记忆网络)来实现这一文本分类器。

使用RNN创建文本分类器

在第六章中,你尝试使用嵌入层为讽刺数据集创建分类器。在那种情况下,单词会先被转换为向量,然后聚合后再输入全连接层进行分类。而如果使用RNN层(例如LSTM),则不需要聚合,可以直接将嵌入层的输出传递到循环层中。

关于循环层的维度,有一个常见的经验法则是:它的大小通常和嵌入维度相同。这并不是必须的,但可以作为一个不错的起点。注意,在第六章中我提到嵌入维度通常是词汇量的四次方根,但在使用RNN时,这个规则往往会被忽略,因为如果遵循这个规则,循环层的维度可能会太小。

例如,第六章中开发的讽刺分类器的简单模型架构,可以更新为如下形式,以使用双向LSTM:

model = tf.keras.Sequential([

tf.keras.layers.Embedding(vocab_size, embedding_dim),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim)),

tf.keras.layers.Dense(24, activation='relu'),

tf.keras.layers.Dense(1, activation='sigmoid')

])

损失函数和分类器可以设置为以下内容(注意学习率为0.00001或1e–5):

adam = tf.keras.optimizers.Adam(learning_rate=0.00001,

beta_1=0.9, beta_2=0.999, amsgrad=False)

model.compile(loss='binary_crossentropy',

optimizer=adam, metrics=['accuracy'])

当打印出模型架构的摘要时,你会看到类似以下的内容。注意,词汇量大小为20,000,嵌入维度为64。这会在嵌入层中产生1,280,000个参数,而双向层会有128个神经元(64个前向,64个后向):

Layer (type) Output Shape Param #

=================================================================

embedding_11 (Embedding) (None, None, 64) 1280000


bidirectional_7 (Bidirection) (None, 128) 66048


dense_18 (Dense) (None, 24) 3096


dense_19 (Dense) (None, 1) 25

=================================================================

Total params: 1,349,169

Trainable params: 1,349,169

Non-trainable params: 0


图7-9显示了经过30个epoch的训练结果。

正如你所见,网络在训练数据上的准确率迅速超过90%,但在验证数据上稳定在80%左右。这与我们之前得到的结果类似,但检查图7-10中的损失图表可以发现,尽管验证集的损失在15个epoch之后有所分歧,但它趋于平稳,且相比第六章中的损失图表,值更低,即使使用了20,000个单词,而不是2,000个。

                          图7-9:LSTM在30个epoch中的准确率

                                图7-10:LSTM在30个epoch中的损失

不过,这只是使用了单个LSTM层。在下一节中,你将看到如何使用堆叠的LSTM层,并探索其对该数据集分类准确率的影响。

堆叠 LSTM

在上一节中,你已经了解了如何在嵌入层后使用 LSTM 层来帮助对讽刺数据集进行分类。但实际上,LSTM 可以堆叠使用,这种方法在许多最先进的自然语言处理模型中被广泛采用。

在 TensorFlow 中堆叠 LSTM 非常简单。你可以像添加全连接层一样添加额外的 LSTM 层,但有一个例外:除最后一层外,所有层都需要将 return_sequences 属性设置为 True。以下是一个示例:

model = tf.keras.Sequential([

tf.keras.layers.Embedding(vocab_size, embedding_dim),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim, return_sequences=True)),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim)),

tf.keras.layers.Dense(24, activation='relu'),

tf.keras.layers.Dense(1, activation='sigmoid')

])

最后一层也可以将 return_sequences=True 设置为 True,这样它会返回值序列供全连接层分类,而不是单个值。在解析模型输出时,这种设置可能非常有用,我们稍后会讨论这一点。

模型架构将会如下所示:

Layer (type) Output Shape Param #

=================================================================

embedding_12 (Embedding) (None, None, 64) 1280000


bidirectional_8 (Bidirection) (None, None, 128) 66048


bidirectional_9 (Bidirection) (None, 128) 98816


dense_20 (Dense) (None, 24) 3096


dense_21 (Dense) (None, 1) 25

=================================================================

Total params: 1,447,985

Trainable params: 1,447,985

Non-trainable params: 0


添加额外的 LSTM 层将增加大约 100,000 个需要学习的参数,总量增加了约 8%。虽然可能会稍微减慢网络速度,但如果带来了合理的性能提升,这个代价还是可以接受的。

经过 30 个 epoch 的训练后,结果如图 7-11 所示。虽然验证集的准确率表现平稳,但查看损失(图 7-12)会揭示一个不同的故事。

                        图 7-11:堆叠 LSTM 架构的准确率

从图 7-12 可以看出,尽管训练和验证的准确率表现良好,但验证集的损失迅速上升,这是过拟合的明显迹象。

                              图 7-12:堆叠 LSTM 架构的损失

这种过拟合的表现为:训练准确率逐渐接近 100%,损失平稳下降,而验证准确率相对稳定,但验证损失急剧上升。这说明模型对训练集过于专注而产生了过拟合问题。正如第六章的例子所示,仅查看准确率指标可能会让人产生一种错误的安全感,因此必须结合损失图表分析。

优化堆叠 LSTM

在第六章中,你已经看到一个非常有效的减少过拟合的方法是降低学习率。可以探索一下这个方法对循环神经网络是否也有积极影响。

例如,以下代码将学习率从 0.00001 降低了 20%,变为 0.000008:

adam = tf.keras.optimizers.Adam(learning_rate=0.000008, beta_1=0.9, beta_2=0.999, amsgrad=False)

model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])

图 7-13 展示了这种变化对训练的影响。虽然差异不大,但曲线(尤其是验证集)变得更加平滑了。

                                    图 7-13:降低学习率对堆叠 LSTM 准确率的影响

类似地,查看图 7-14 也显示,虽然整体趋势类似,但降低学习率使得损失增长速度明显降低:在 30 个 epoch 后,损失约为 0.6,而更高学习率时接近 0.8。这表明调整学习率超参数是值得探索的。

                                图 7-14:降低学习率对堆叠 LSTM 损失的影响

使用 Dropout

除了调整学习率,还可以考虑在 LSTM 层中使用 Dropout。正如第三章所讨论的,Dropout 的作用是随机丢弃一些神经元,以避免由于邻近神经元的影响而产生的偏差。

在 LSTM 层中,可以通过一个参数直接实现 Dropout。例如:

model = tf.keras.Sequential([

tf.keras.layers.Embedding(vocab_size, embedding_dim),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim, return_sequences=True, dropout=0.2)),

tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim, dropout=0.2)),

tf.keras.layers.Dense(24, activation='relu'),

tf.keras.layers.Dense(1, activation='sigmoid')

])

需要注意的是,使用 Dropout 会显著降低训练速度。在我的实验中,在 Colab 上训练时间从每个 epoch 大约 10 秒增加到了 180 秒。

使用 Dropout 的准确率结果见图 7-15。从图中可以看到,Dropout 对网络的准确率几乎没有负面影响,这是一件好事!通常人们会担心丢弃神经元会让模型表现更差,但这里显然不是这样。

                                      图 7-15:使用 Dropout 的堆叠 LSTM 的准确率

此外,对损失也有积极影响,如图 7-16 所示。尽管曲线明显分离,但相比之前,它们更接近了,并且验证集的损失趋于平稳,约为 0.5,比之前的 0.8 要好得多。这个例子表明,Dropout 是一种能够改善基于 LSTM 的 RNN 性能的实用技术。

                                        图 7-16:启用 Dropout 的 LSTM 损失曲线

探索这些技术来避免数据过拟合是值得的,同时也要结合前几节中介绍的数据预处理技术。但还有一种方法我们尚未尝试——一种使用预训练词嵌入替代自学嵌入的迁移学习方法。我们将在下一节中探索这一内容。

总结:
本篇文章详细讲解了如何利用 LSTM 神经网络构建高效的文本分类器,并通过优化学习率、堆叠层数及应用 Dropout 等方法,提升模型性能,避免过拟合,为文本处理任务提供了实用的实现方案。

业务场景

image

teacher表中的tech_class字段存储的是每个老师所教授的课程,课程之间以英文逗号分隔。现在要用语句统计每个课程对应的教师数量。语句及效果如下:

image

语句其实很简单,各种博客或者gpt都有不错且可行的解决方案,我们主要来理解下这段语句的执行原理,更好的学习。

part1 REGEXP_SUBSTR

关于REGEXP_SUBSTR的官方文档

具体语法这里不再赘叙,我们从单个例子入手看看效果:

image

image

REGEXP_SUBSTR可以将字段字符串根据所给正则表达式匹配并拆分(注意不是分割,但效果上等同于分割)。

最后一个参数代表要取出第几个匹配的结果:

image

那为什么这里要使用LEVEL?LEVEL是什么?

关于LEVEL的官方解释
具体如图:

image

使用之前要注意,官方文档里有句话:

To define a hierarchical relationship in a query, you must use the CONNECT BY clause.

所以关于connect by,你可以先往后看。

使用LEVEL后的效果:

image

LEVEL是一个在CONNECT BY子句中使用的伪列,它代表当前递归层次的级别。在每次递归调用中,LEVEL的值会增加1。在这个例子中,LEVEL的值会从1开始,一直到tech_class中逗号分隔的子串的数量——3。

为什么这样会有81条?我们的预期结果其实是3条。让我们继续探究......

image

image

image

Football是字段里的第一个值,只有1条;Basketball是字段里的第二个值,有10条;PingPang是字段里的第三个值,有70条!貌似越往后数据重复越多,而且次数增长的可怕,但很难发现出有什么规律。检索后基本确定出现重复数据是因为在递归过程中,regexp_substr函数没有正确移动到下一个匹配项,而是重复移动到了Basketball或者PingPang,至于它底层是什么重复移动的,额我也没搞明白....。

对此我们需要添加
prior
确保每次递归时都能正确提取。

part3 prior

关于prior的简单介绍
connect by中加prior可以限定父子的对应关系,限定递归路径。这里对同条记录进行递归:

image

加sys_guid()是为了保证层次查询,存在循环时,不出现无限递归。它为每行生成一个唯一标识,从而避免无限循环。

Part3 connect by

CONNECT BY的官方文档--分级查询
connect by常常结合prior一起实现父级查询。因此connect by LEVEL prior一般都一起出现。

最后再次附上针对原始的业务需求的完整的语句及输出:

select
regexp_substr(tech_class, '[^,]+', 1, LEVEL) as class_name,
tech_name
from teacher
CONNECT BY LEVEL <= REGEXP_COUNT(tech_class, '[^,]+')
and prior tech_class = tech_class
and prior sys_guid() is not null
order by class_name

image