2024年3月

面向对象编程(OOP)
是一个将现实世界抽象为一系列对象的编程范式,这些对象通过消息传递机制来互相交流和协作。

OOP 的主要特性包括四个基本概念:
封装
(Encapsulation)、
继承
(Inheritance)、
多态
(Polymorphism)以及
抽象
(Abstraction)。

封装(Encapsulation)

封装是一种将数据(
属性
)和行为(
方法
)绑定在一起的方法。

通过封装,可以隐藏对象的具体实现细节,仅暴露出有限的接口供外界访问。

优势

封装的优势:

  • 增强安全性
    :隐藏内部实现细节,防止外部直接访问对象内部的数据,减少因误用导致的错误

这里我编写了一个
UserCredentials
类,来进行演示一下
增强安全性
,分别体现在什么地方,代码如下:

/**
 * @author BNTang
 * @description 用户凭证类
 */
public class UserCredentials {
    // 私有属性,外部无法直接访问
    private String username;
    private String password;

    // 公有的构造函数,用于初始化用户名和密码
    public UserCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    // 公有的方法,用于验证密码是否正确
    public boolean authenticate(String inputPassword) {
        return inputPassword != null && inputPassword.equals(this.password);
    }

    // 获得用户名的公有方法
    public String getUsername() {
        return this.username;
    }

    // 重置密码的方法,增加安全性校验
    public void resetPassword(String oldPassword, String newPassword) {
        if (authenticate(oldPassword)) {
            this.password = newPassword;
            System.out.println("密码重置成功。");
        } else {
            System.out.println("旧密码不正确,密码重置失败。");
        }
    }

    // 私有的设置密码方法,外部无法访问
    private void setPassword(String password) {
        this.password = password;
    }
}

在我提供的
UserCredentials
类的代码中,隐藏内部实现细节、防止外部直接访问对象内部的数据以及减少因误用导致的错误的概念都得到了实现。

  1. 隐藏内部实现细节
private void setPassword(String password) {
    this.password = password;
}

setPassword
方法是私有的 (
private
),意味着它只能在类内部被调用。外部代码不能直接调用此方法来设置密码,这正是隐藏内部实现细节的体现。

  1. 防止外部直接访问对象内部的数据
private String username;
private String password;

这段代码中,用户名 (
username
) 和密码 (
password
) 被声明为私有变量 (
private
),这意味着它们不能从类的外部直接访问,只能通过类提供的公有方法(如构造方法、
getUsername

authenticate

resetPassword
方法等)来间接访问或修改。这种机制有效地保护了类的内部数据。

  1. 减少因误用导致的错误
public void resetPassword(String oldPassword, String newPassword) {
    if (authenticate(oldPassword)) {
        this.password = newPassword;
        System.out.println("密码重置成功。");
    } else {
        System.out.println("旧密码不正确,密码重置失败。");
    }
}


resetPassword
方法中,通过
authenticate
方法校验旧密码是否正确,只有在旧密码正确的情况下才允许用户设置新密码。这样的设计减少了因为外部代码错误使用(如直接设置密码而不进行旧密码验证)导致的安全问题,同时也确保了类内部数据的完整性和安全性。

  • 提高复用性
    :封装后的对象可以作为一个黑盒被重复使用,无需关心对象内部的复杂逻辑
  1. 封装后的对象作为一个黑盒被重复使用体现在:
UserCredentials adminCredentials = new UserCredentials("admin", "adminPass");
UserCredentials userCredentials = new UserCredentials("user", "userPass");

// 在不同场景中重复使用对象:
if (adminCredentials.authenticate("adminPass")) {
    // 执行管理员操作
}

if (userCredentials.authenticate("userPass")) {
    // 执行用户操作
}

adminCredentials

userCredentials

UserCredentials
的实例,在创建它们之后可以多次使用其
authenticate
方法来验证密码,这里的实例就像是提供认证功能的黑盒,使用者不必关心里面的逻辑是怎样的。

  1. 无需关心对象内部的复杂逻辑体现在
private String username;
private String password;

private void setPassword(String password) {
    this.password = password;
}

由于
username

password
属性被声明为私有的,外部代码不能直接访问或修改它们。设置密码的逻辑被隐藏在
setPassword
方法中,而这个方法也是私有的。外部代码需要通过公有方法如构造函数或
resetPassword
这些公有接口进行操作,因此外部代码不必关心如何存储或验证密码的内部逻辑,只需调用这些公有方法即可实现功能。

  • 易于维护
    :封装的代码更易理解与修改,修改内部实现时不会影响到使用该对象的代码
  1. 封装的代码更易理解与修改体现在
public boolean authenticate(String inputPassword) {
    return inputPassword != null && inputPassword.equals(this.password);
}

public void resetPassword(String oldPassword, String newPassword) {
    if (authenticate(oldPassword)) {
        this.password = newPassword;
        System.out.println("密码重置成功。");
    } else {
        System.out.println("旧密码不正确,密码重置失败。");
    }
}


authenticate

resetPassword
这两个公有方法中,封装的代码很易于理解:一个用于验证密码,一个用于重新设置密码。如果我们需要修改密码的存储逻辑,只需修改这些方法的内部逻辑,而无需修改方法的签名或其他使用这些方法的代码。

  1. 修改内部实现时不会影响到使用该对象的代码体现在
private String username;
private String password;

因为
username

password
是私有属性,所以它们对外部代码是不可见和不可访问的。我们可以在不改变任何使用
UserCredentials
对象的代码的情况下,自由改变这些属性的内部表示方法(比如对密码进行加密存储)。因为任何这样的改变都会被
UserCredentials
类的公共接口所封装和抽象化,从而不会泄露出去或者影响到依赖于这些公共接口的代码。

  • 接口与实现分离
    :提供清晰的接口,使得对象之间的依赖关系只基于接口,降低了耦合度
  1. 提供清晰的接口体现在
public boolean authenticate(String inputPassword);
public void resetPassword(String oldPassword, String newPassword);
public String getUsername();

这些公共方法形成了
UserCredentials
类的接口,它为外部代码提供了清晰的通信协议,明确了可以进行的操作。使用这个类的代码只需要知道这些方法的声明和预期行为,不需要了解它们背后的具体实现。

  1. 使得对象之间的依赖关系只基于接口体现在
UserCredentials credentials = new UserCredentials("username", "password");
boolean valid = credentials.authenticate("password");

只要
authenticate
方法的接口保持不变,外部代码就可以正常工作,完全无须关心
UserCredentials
内部是如何处理认证逻辑的。

  1. 降低了耦合度体现在
private void setPassword(String password) {
    // 假设这里改用了一种新的加密方式来设置密码
    this.password = encryptPassword(password);
}

即使改变了
setPassword
方法的内部实现(如加密),由于这个方法是私有的,外部代码不会受到影响。这种隔离提高了系统的模块化,使得各个部分可以独立变化而不互相干扰,从而降低了耦合度。

  • 隐藏实现细节,简化接口
    :用户只需知道对象公开的方法,不必了解其内部的复杂过程

应用场景

封装的应用场景:

  • 类的设计
    :在类定义时,通常将属性私有化(private),通过公共的方法(public methods)来访问和修改这些属性
  • 模块化组件
    :在设计模块化的系统时,每个组件都通过封装来定义自己的行为和接口,使得系统更易于组合和扩展
  • 库和框架的开发
    :开发者提供库和框架时,会通过封装隐藏复杂逻辑,只暴露简洁的 API 接口给其他开发者使用
  • 隔离变化
    :将可能变化的部分封装起来,变化发生时,只需修改封装层内部,不影响外部使用

通过封装,能够构建出结构清晰、易于管理和维护的代码。

完整代码可在此查阅:
GitHub

继承(Inheritance)

继承是一种能够让新创建的类(子类或派生类)接收另一个类(父类或基类)的属性和方法的机制。

在 Java 中,继承是通过使用
extends
关键字来实现的。从理论上解释一下,然后再通过代码示例来加深理解。

IS-A 关系

IS-A 是一种表达类之间关系的方式,主要用来表明一个实体(子类)是另一个实体(父类)的一种特殊类型。例如,Cat(猫)是 Animal(动物)的一种特殊类型。因此,可以说 Cat IS-A Animal。

里氏替换原则(Liskov Substitution Principle)

这是一个面向对象设计的原则,它表明如果 S 是 T 的一个子类型(在 Java 中意味着 S 类继承自 T 类),那么任何期望 T 类的对象的地方都可以用 S 类的对象来替换,而不会影响程序的行为。

向上转型(Upcasting)

向上转型是指子类类型的引用自动转换成父类类型。向上转型在多态中是常见的,它允许将子类的对象赋值给父类的引用。例如,可以将 Cat 类型的对象赋值给 Animal 类型的引用。

以代码形式展示上述概念:

/**
 * 动物
 *
 * @author BNTang
 * @date 2024/03/10 09:36:41
 * @description 创建一个表示动物的基类(父类)
 */
class Animal {
    // 动物类有一个叫的方法
    public void makeSound() {
        System.out.println("动物发出声音");
    }
}

// 创建一个 Cat 类(子类),继承自 Animal 类
class Cat extends Animal {
    // 重写父类的 makeSound 方法
    @Override
    public void makeSound() {
        // 这里的调用体现了多态性,即 Cat 的叫声不同于一般 Animal
        System.out.println("猫咪喵喵叫");
    }
}

public class InheritanceExample {
    public static void main(String[] args) {
        // Upcasting: 将 Cat 对象向上转型为 Animal 类型
        Animal myAnimal = new Cat();
        // 虽然 myAnimal 在编译时是 Animal 类型,但实际执行的是 Cat 的 makeSound 方法
        myAnimal.makeSound();

        // 创建一个 Animal 类型的对象,调用 makeSound 方法
        Animal anotherAnimal = new Animal();
        anotherAnimal.makeSound();

        // 这里可以看到,Cat 对象(myAnimal)能够替换 Animal 对象(anotherAnimal)的位置,
        // 并且程序的行为没有发生错误,体现了里氏替换原则
    }
}

定义了两个类:Animal 和 Cat。

Cat 类继承自 Animal 类,并重写了 makeSound 方法。在 main 方法中,创建了一个 Cat 对象,并将其向上转型为 Animal 类型的引用 myAnimal。调用 myAnimal 的 makeSound 方法时,会执行 Cat 类的重写方法而不是 Animal 类的方法,这就体现了多态性和里氏替换原则。同时,Cat 对象(向上转型后的 myAnimal)可以在任何需要 Animal 对象的地方使用,这也满足了 IS-A 关系的定义

完整代码可在此查阅:
GitHub

多态(Polymorphism)

多态可以允许使用一个统一的接口来操作不同的底层数据类型或对象。多态分为
编译时
多态和
运行时
多态两种类型。
编译时多态(方法的重载),也被称为静态多态,主要是通过
方法重载
(Method Overloading)来实现的。方法重载指的是在同一个类中存在多个同名的方法,但这些方法的参数列表不同(参数数量或类型不同)。

编译器根据方法被调用时传入的参数类型和数量,来决定具体调用哪个方法。这种决策是在编译时做出的,因此称为编译时多态。

方法重载

代码示例:

/**
 * 打印机类
 * 用于演示方法重载
 *
 * @author BNTang
 */
class Printer {
    /**
     * 打印字符串
     *
     * @param content 要打印的字符串
     */
    public void print(String content) {
        System.out.println("打印字符串: " + content);
    }

    /**
     * 重载 print 方法,参数类型为 int,与打印字符串的方法区分开来
     *
     * @param number 要打印的数字
     */
    public void print(int number) {
        System.out.println("打印数字: " + number);
    }
}

public class OverloadingExample {
    public static void main(String[] args) {
        Printer printer = new Printer();

        // 调用 print 方法打印字符串
        printer.print("Hello, World!");

        // 调用重载的 print 方法打印数字
        printer.print(12345);

        // 编译器根据参数类型来决定调用哪个方法
    }
}

运行时多态,也被称为动态多态或动态绑定,是通过
方法覆盖
(Method Overriding)实现的。

运行时多态是在继承的基础上工作的,所以只要其中子类覆盖父类的方法。

运行时多态的决策是在程序执行期间进行的,即虚拟机在运行时刻根据对象的实际类型来确定调用哪个类中的方法。

方法覆盖

代码示例:

/**
 * 动物
 * 创建一个表示动物的基类(父类)
 *
 * @author BNTang
 */
class Animal {
    public void makeSound() {
        System.out.println("动物发出声音");
    }
}

class Dog extends Animal {
    @Override
    public void makeSound() {
        System.out.println("汪汪汪");
    }
}

public class PolymorphismExample {
    public static void main(String[] args) {
        // 向上转型
        Animal animal = new Dog();

        // 运行时多态,调用的是 Dog 类的 makeSound 方法
        animal.makeSound();
    }
}

虽然在编译时
animal
的类型是
Animal
,但是在运行时 JVM 会调用实际对象类型(也就是
Dog
)的
makeSound
方法,因此输出的将是 "汪汪汪",而不是 "动物发出声音"。这就是运行时多态的体现。

运行时多态的三个条件

  1. 继承
    :子类需要继承父类
  2. 方法覆盖
    :子类需要提供一个具体的实现,这个实现覆盖了父类的方法
  3. 向上转型
    :你可以将子类类型的引用转换为父类类型的引用(即将子类对象赋值给父类引用),之后通过这个父类引用来调用方法时,执行的将是子类的覆盖实现

利用多态写出可扩展性和可维护性更佳的代码,能够应对不断变化的需求。使得可以通过相同的接口来调用不同类的实现,提供了软件设计的灵活性。

完整代码可在此查阅:
GitHub

抽象(Abstraction)

导航

  • 引言
  • tablestore简介
  • 火线告警:500错误频发
  • 真相大白:单表数据超2亿,tablestore连接超时
  • 紧急发版:快速关闭查询功能
  • 数据清理:仅保留半年内的数据
  • 收紧入口:只同步一条到tablestore
  • 双保险:增加功能开关
  • 结语

本文首发
《记一次群聊消息查询优化的实践》

引言

我们在成长,代码也要成长。

一晃,做群聊业务两年多了。

随着业务的增长,群数量不断增长,聊天消息也在不断增长。

群聊的全局搜索的性能问题愈发凸显。

设计之初,考虑群消息的急剧增长,选择了使用阿里云的tablestore,这是一个类似ElasticSearch,拥有强大的搜索能力。

现在来看,还是过于乐观了。

当单表数据达到2亿+的时候,查询变得异常艰难,甚至频繁超时,被客户疯狂吐槽。

当然,遇到这些问题,或许是我们使用的方式不对,欢迎大家帮忙斧正。本篇仅用于问题记录和经验交流分享。

tablestore简介

表格存储(Tablestore)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控、推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。

基本概念

在使用表格存储前,您需要了解以下基本概念。

术语

说明

地域

地域(Region)物理的数据中心,
表格存储
服务会部署在多个阿里云地域中,您可以根据自身的业务需求选择不同地域的
表格存储
服务。更多信息,请参见
表格存储已经开通的Region

读写吞吐量

读吞吐量和写吞吐量的单位为读服务能力单元和写服务能力单元,服务能力单元(Capacity Unit,简称CU)是数据读写操作的最小计费单位。更多信息,请参见
读写吞吐量

实例

实例(Instance)是使用和管理
表格存储
服务的实体,每个实例相当于一个数据库。
表格存储
对应用程序的访问控制和资源计量都在实例级别完成。更多信息,请参见
实例

服务地址

每个实例对应一个服务地址(EndPoint),应用程序在进行表和数据操作时需要指定服务地址。更多信息,请参见
服务地址

数据生命周期

数据生命周期(Time To Live,简称TTL)是数据表的一个属性,即数据的存活时间,单位为秒。
表格存储
会在后台对超过存活时间的数据进行清理,以减少您的数据存储空间,降低存储成本。更多信息,请参见
数据版本和生命周期

应用场景

表格存储有互联网应用架构(包括数据库分层架构和分布式结构化数据存储架构)、数据湖架构和物联网架构三种典型应用架构。

  • 互联网应用
    • 历史订单数据场景
    • IM场景
    • Feed流场景
  • 大数据
    • 推荐系统
    • 舆情&风控分析(数据爬虫)场景
  • 物联网

基于我们的实际场景,选择了IM场景。


java sdk

更加详细的介绍请查看:
《java sdk》

火线告警:500错误频发



从日志记录可以看出,异常的主要集中在去年9月~12月之间,基本上超时请求。

除此以外,还有大量的慢查询。


这是其中一个包含搜索群消息逻辑的接口,确实很慢。

特别是在去年年底,几乎每天技术群都会有几个报警。

真相大白:单表数据超2亿,tablestore连接超时

经过排查,存储群聊消息的宽表超过接近3亿条。


因为是群聊场景,每条消息发出都会投递给其他群成员,我们是按照接收人的方式存储的,所以消息数量会激增。

消息存储数量已经过亿,这个就导致ts查询性能急剧下降,不知道Elastic Search的在这种数量下的性能如何,请有这方面经验的朋友指点一下。

如何应对呢?

能想到的就是删除TS中历史数据,保留一定时段内的数据,控制数据量在一定范围内。

但是保留多久的数据,产品、运营都无法给出一个合理的时段。

于是陷入僵局。

紧急发版:快速关闭查询功能

每一个群报警,都像是敌人发起的冲锋号角,我不能坐视不管。

短时间内没有更好地解决方案,和运营沟通后,选择暂时关闭群聊查询功能。


紧急给这个功能增加了开关,下班就发版了。报警群安静了,安稳过年。

数据清理:仅保留半年内的数据

选择妥协,别为难自己。

程序员朋友爱较真,有些人甚至到了丧心病狂的程度。

我有个写代码的同事,每次有人找他改问题都像是干架。

我呢,有时候也会钻进牛角尖。

经过一顿操作,我发现暂时没有很好的解决方案,于是我妥协了。

既然,没有很好的优化办法,那就把历史数据删除了,只保留半年的数据。
为此,转为写了一个定时任务,每天执行。

短时间内,我们将数据降低到2亿条以内。

收紧入口:只同步一条到tablestore

真正的猛士敢于直面困难,有些问题必须解决。

本以为定时删除数据的方案已经稳了,没想到增长的比删除的还快。

没到一个月,数据量又上来了。

我再次选择了关闭了群聊查询功能。

痛定思痛, 我决定优化方案——从源头控制写入TS的数量。

我们一起来看看之前的存储方案:



这样设计是考虑到可能会按照接收人来控制查看消息的权限。
比如尽管在一个聊天群,但是进群的人是有先后的,不同时间进群的人可能看到的消息不一样。

弊端就是同一份消息会存储多份。

为什么要这样做呢?

因为我们的IM并没有使用本地数据库的方式,所有消息都是从服务端拉取的。

上面的分析我们知道,数据量太大是导致tablestore查询性能差的根本原因。

那我们考虑对现有方案进行改进:

  • 放开消息查询的限制,不在限制接收人
  • 消息按照发送人存储,仅存储一条



这样存储,我们的数据将会大幅度下降。

双保险:增加功能开关

防患未然,功能可开关。

我们总是喋喋不休的强调程序的稳定性、健壮性、可扩展性...

但是,我们总是无法一次性写出完美的程序。

业务需求的变化、数据量的变化、三方接口的升级等都不得不让我们经常考虑到对程序的重构,总之有各种不确定性的因素出现。


《记一次加锁导致ECS服务器CPU飙高分析》
一文中,我们提到为了防患未然,对一些复杂功能增加开关是一个比较好的处理办法。

通过两个主要措施:

  • 数据清理:仅保留半年内的数据
  • 收紧入口:只同步一条到tablestore

几天之后,我们再次查看tablestore中单表的数据量已经大幅降低。


随着时间的推移,我们的策略会持续执行,那么数据量将会降低到千万级别。

当然,这个数据量,我们可以比较放心的打开这个群消息查询功能。

结语

善战者无赫赫之功。

哪有什么岁月静好,我们总是在打怪升级中成长。

或许对于大众而言所谓的"技术好",不是单纯的卖弄技术,而是能够针对灵活多变的场景,恰到好处的运用技术。

活到老,学到老。

这里笔者只根据个人的工作经验,一点点思考和分享,抛砖引玉,欢迎大家怕批评和斧正。

2024.03.11

成都

毕业半年多了,回顾从大学到现在搞过的很有意思的开源项目

回想当年,在高考结束后我的分数并不高,然后被调剂到了工业设计,再到后来感觉对计算机更感兴趣,于是对了很久的线努力转专业到了计算机,之后废了九牛二虎之力在大二一年修完了计算机专业大一大二两年的课程,到了大三开始搞事就开始做了一些项目。我发现项目做多了就很容易将项目串联起来,这点非常有意思。

本着分享与开源的精神,我与大家分享我的项目经验,我也就是凭借着这些项目成功在
23
届的秋招混进了字节,老大后来跟我说我是
23
届唯一一个双非进入我们大团队的,虽然我可能是当时秋招最菜的一个,但是最终还是混进去了,所以也算是希望分享一下为之付出的努力,与君共勉。计算机还是非常有意思的,我也希望能够保持对于技术的好奇心,同样也希望能做出一些有意思的项目来获得一丝成就感。

实际上我也只是昨天突发奇想想总结分享一下我做过的项目,因为写过的东西还是比较多的,所以也总结了好久。其实这其中我觉得最重要的问题是为什么要做这些项目,有些是想解决平时遇到的问题,比如学校
App
不好用就做个好用的、网页不让复制我就做个解除复制限制的,也有些是想提升技术水平做的,比如我写过的
Webpack
相关的、用
Canvas
实现的简历编辑器,还有一些是希望深入研究的,比如在线文档协同系列、富文本编辑器相关的项目。下面介绍的都是我的开源项目,内容基本也是按照时间线来的,也欢迎关注我的
GitHub:WindrunnerMax

此外需要声明的是,这很像是一篇论文的综述,文章的
信息密度非常高
,特别是很多与项目相关的链接,如果从零开始接受相关的内容还是需要一些时间的。

山科小站小程序

GitHub:SHST

首先说说为什么要做这个小程序,这其中有个背景是当时学校有个查各类信息的软件叫智校园,但是这个
App
很不好用,经常崩溃,在某一天我上午第一节课下课后这个
App
就打不开了,导致我想查个自习室都查不了,当时我在
J7
教学楼爬了四层楼都未找到自习室,当时就觉得很烦,于是就想自己做一个,不用这个了。

当时我还算是个计算机小白,毕竟在课上老师们大都还是照本宣科,书本上的东西也都比较死板,我就只能自己研究。回去之后我大概研究了两种方案,分别是搞
App
逆向看是否有什么逻辑问题,抓包看是否是数据问题。首先我是逆向了这个包没看出什么逻辑的毛病,不过也不是没有收获,这时候我拿到了接口请求的地址,然后我就测试请求是否有问题,这一测我就发现了问题所在,仅仅是一个拿初始化数据的接口失败了而已,而这个数据在我这里完全可以跳过,毕竟我只需要手动在我自己的服务器上维护这部分数据即可,于是配合之前逆向拿到的接口,一个全新的小程序就上线了,并且还集成了很多数据源扩展了很多功能。

最开始的时候只是我自己在用,第一版本是我在上课期间不务正业一个上午写完的,做的也比较粗糙,在更新几版之后,几乎全校的同学都用上了,数据大家可以看下。

image.png

image.png

再到后期因为很多原因比如多场景、小程序认证等问题,我又更新了很多个小程序以及
App
版本
GitHub:SHST-SDUST
。我个人觉得如果能够做一个大家都用的项目,并且能够不断将学习到的新知识应用到其中,比如我曾经尝试过迁移
TS
山科小站小程序
,以及做过组件库的封装
微信小程序校历组件
GitHub:Campus
,以及后期我从
UniApp
框架迁移到了
Taro
,同样也是从
Vue
迁移到了
React
实现。这样的项目会比纯技术的项目比如各种学习写
mini
框架会更有业务价值一些,不过这是两个方向上的价值,如果能结合技术和业务价值那是最好的。此外我觉得面试官同样也希望能够希望候选人能够在生活中运用自己的技术解决问题,能够保持对于技术的热爱与好奇心,这样更容易跟面试官聊到一起。也可以参考下当时我秋招时简历上对于项目的描述,
PS
: 数据并不是最新的。

image.png

实际上从本质来说这个小程序就是单纯的爬虫项目,在我对爬虫比较了解之后,我还做了一些有意思的事情,大家可能或多或少都受到过学校在各种平台/
App
上刷任务的通知,为了减少我自己的工作量,在当时我还做了个爬虫来帮助我们全班同学刷评论
GitHub:Younger
,帮了团支书一个大忙。还有一个是一个小游戏的辅助
GitHub:EliminVirus
,当时在朋友的带领下玩了消灭病毒这个小游戏,这个小游戏真的有意思,但是凭借我风骚的走位打了十几次硬是过不去
230
关,于是便有了这个辅助脚本,这个脚本不仅仅是一个爬虫,这其中还涉及到逆向微信小程序、微信
ROOT
抓包等知识,并且还写了文章
手机抓包HTTPS (Fiddler & Packet Capture)
Recover刷机简介
,此外还有一些诸如校园网认证的自动登录
GitHub:GIWIFI
之类的就不过多介绍了。

强智教务系统验证码识别

GitHub:SWVerifyCode

在实现山科小站小程序的时候,其中还有一个很大的问题是验证码需要识别,当时对于验证码识别的方法不是很懂,于是边学边查,最开始的时候就是用的最基本的
OpenCV
来处理像素,即使这个验证码很简单,但是同样是因为我对像素处理的不好导致识别率很低,经常要刷新几次才能成功识别。

到了后来不断完善了识别的能力,于是将其抽离出来独立出了验证码识别的仓库。在做上边介绍的刷评论
GitHub:Younger
脚本时,验证码变得很复杂,此时通过像素来处理就变的很吃力,于是学习了通过
CNN
来处理验证码
GitHub:Example
,并且到了后期搞了脚本的脚本用来实现自动训练的能力,并且也将其应用到了强智的验证码识别当中。

再到后来,因为在多种语言上的需要,将其封装了
Js
脚本的实现,帮助我在网页上登录强智的时候可以自动识别验证码而不需要我再手动输入了,再到后来因为学校的教务系统开启了公网访问,并且部署了
HTTPS
证书,国内的公网网站通常都是需要备案的,所以在此基础上我通过小程序
GitHub:SHST-ULTRA
实现了直接请求教务系统的方案,这意味着只要网络环境允许,小程序可以一直运行而不需要我的服务器介入。

image.png

飞机大战

GitHub:AirplaneWar

这是之前跟
@FubinSama
一起做的小学期作业,
C++(MFC)
实现的飞机大战,虽然代码写的很烂但是我觉得还是很有意思的,也算是在前端领域之外的技术扩展。

welcome.bmp

文本选中复制

GitHub:TKScript

这个项目应该很多同学都用过,在
GreasyFork
上的下载安装量已经超过
260
万了,这其中还不包括其他下载渠道比如
GitHub
、脚本猫等等。最开始做这个项目的原因是老师让我们写某个报告,然后我就准备先“参考”着看看网上的内容,发现很多类似的文库网站都不让复制,所以便有了这个脚本
文本选中复制

虽然这仅仅是一个解除网页复制限制的脚本,但是其中却涉及到很多技术细节,比如在调试的时候我应该怎么确定我剪贴板里有什么数据
Canvas图形编辑器-我的剪贴板里究竟有什么数据
、我应该怎么写脚本
深入浅出脚本管理器与油猴脚本
。如果遇到的都是基本的
DOM
文本还好说,毕竟无论如何文本都是下载到本地并且在
DOM
树上,无论我怎么处理都可以,但是如果是
Canvas
绘制的文本应该如何处理、如何设计更加通用的方案、如何适配的各个网站并且管理各个模块、如何将数据写到剪贴板等等都是需要考虑的,当然最难的还是调试这个网站为什么不能复制。之前我遇到的一个最离谱的是,浏览器在选中文本时焦点需要在文本上,而某网站有个按钮一直在抢焦点,这样的话用户就不能复制了,非常的离谱。在这里也可以参考下当时我秋招时简历上对于项目的描述,
PS
: 数据并不是最新的。

image.png

在后期我发现有人做了个浏览器扩展,其中嵌入了我的代码而且还是我打包过后的
GPL
协议的代码,最离谱的是其中还加了广告,此外再加上我也想学习一下浏览器拓展,于是我也就做了一个浏览器拓展,首先从零使用
Rspack
搭建了
Chrome
扩展的开发环境
从零实现的Chrome扩展
,并且研究了脚本管理器的实现
从油猴脚本管理器的角度审视Chrome扩展
,毕竟我是相当于从脚本管理器的基础上再做一套浏览器拓展的实现
GitHub:ForceCopy
。再之后因为
Chrome
主推
V3
版本的浏览器扩展实现,而
FireFox
主推
V2
版本的浏览器扩展实现,我还实现了一套兼容方案
初探webpack之单应用多端构建
。实际上我的浏览器插件同样也在这个仓库里,这个仓库是个
MonoRepo
,所有写过的脚本都在其中。

image.png

博客

GitHub:EveryDay

我的博客已经写了非常久了,在博客分支中已有
455
篇文章,仓库已有
1.5k

star
,这是切切实实从前端小白开始慢慢积攒写起来的文章,仓库的简介是“前端基础 个人博客 学习笔记”,所以如果按照目录一点点开始看的话,最开始的时候能够感受到特别简单,都是基础的
HTML CSS JavaScript
,然后越往后会越发的上强度,因为这实际上就是我的学习历程,都是这几年来由浅入深慢慢学习的过程。

写到这里我想起来之前我在群里说的暴论,把我的博客学明白了进字节应该问题不大,当然在这里也仅供参考,还有个例子是有位同学把我所有的文章都打印出来学习,最后也是双非进了字节,当然能进字节跟他自己的努力是分不开的,但是我的博客肯定也是起到了一点作用,要不谁会闲的没事画上百大洋去打印文章,毕竟一百大洋还是能吃不少好东西的。此外很多同学去买一些这种课那种课,有我免费的博客不看而非得去花钱买课是何必呢,我写的内容也都是由浅入深,特别是我的行文风格就是把问题写的特别详细,如果从头开始看基本不会出现理解不了的情况,因为我写文章同样也还是要给自己看的,时间太长我怕我自己都会忘所以对于问题的描述以及相关思考都写的非常详细,基本都会配有
DEMO
可以帮助理解。

这里还要引用一下我
README
的内容,这是一个前端小白的学习历程,如果只学习而不记录点什么那基本就等于白学了。这个版本库的名字
EveryDay
就是希望激励我能够每天学习,下面的文章就是从
2020.02.25
开始积累的文章,都是参考众多文章归纳整理学习而写的,文章包括了
HTML
基础、
CSS
基础、
JavaScript
基础与拓展、
Browser
浏览器相关、
Vue
使用与分析、
React
使用与分析、
Plugin
插件相关、
Patterns
设计模式、
Linux
命令、
LeetCode
题解等类别,内容都是比较基础的,毕竟我也还是个小白,此外基本上每个示例都是本着能够即时运行为目标的,新建一个
html
文件复制之后即可在浏览器运行或者直接可以在
console
中运行。如果想按照我写的顺序进行阅读的话可以
查看目录
,另外如果想更条理地查看的话可以访问
我的博客
,博客同时也是本版本库的
gh-pages
分支,是作为纯静态页面搭建在
Git Pages
上的,使用
jsdelivr
以及
cloudflare
作为缓存缓解国内访问速度问题。后期还在
gh-pages-ssg
分支上部署了
SSG
版本的
新版博客
,并且借助
ChatGPT
提供了英文翻译版本,分支是部署在
Vercel
上来缓解国内访问速度问题。在博客中的内容就相对比较多了,除了学习笔记之外还有一些做项目时的记录以及遇到的坑等。

image.png

文档编辑器

GitHub:DocEditor

初探富文本之富文本概述
初探富文本之编辑器引擎
初探富文本之OT协同算法
初探富文本之CRDT协同算法
初探富文本之OT协同实例
初探富文本之CRDT协同实例
初探富文本之React组件实时预览
初探富文本之富文本diff算法与文档对比视图的实现

在实习的时候,在偶然的机会上我接触到了富文本编辑器,并且感觉对在线文档产生了比较大的兴趣,本来我想着在公司里实践一下,但是因为业务线不同并且本身不赚钱的或者说收益比较低的变更,在商业排序上是比较靠后的,所以我就想自己做一个并且实践一下。在这其中学习了很多富文本的知识
基于slate构建文档编辑器
,对于
Rollup
打包工具也有更深的了解
Rollup的基本使用
,学习并发布了
NPM
包以便复用富文本能力,同样也有部署在
GitPages

在线DEMO

在现在看来,当时很多设计都是有问题的,插件化这部分设计不是很完善,
Core
模块也没有真正完整抽离出来,并且代码写的也并没有那么完善,但是这个编辑器为我以后做的工作打下了基础,并且不知道大家是否看到在文中第一段我说的“我发现项目做多了就很容易将项目串联起来,这点非常有意思”,这个文档编辑器就在我很多项目中集成。

同样引用一下
README
的内容,基于
slate.js
构建的文档编辑器,
slate
提供了控制富文本的
core
,简单来说他本身并不提供各种富文本编辑功能,所有的富文本功能都需要自己来通过其提供的
API
来实现,甚至他的插件机制也需要通过自己来拓展,所以在插件的实现方面就需要自己制定一些策略。在交互与
ui
方面对于飞书文档的参考比较多,整体来说坑也是比较多的,尤其是在做交互策略方面,不过做好兜底以后实现基本的文档编辑器功能是没有问题的。

image.png

简历编辑器

GitHub:ResumeEditor

依旧先说说为什么要做这个项目,在这里主要是遇到了一个问题,当时正值秋招时期,我也为怎么更新简历发愁,于是我借了学长的某简历平台账号想做来个简历,但是模版嘛你懂的,很多细节都不满意,限制太多,经常要么就是某个模块的内容必须要按照固定格式写,还有诸如行间距等内容都不能调,要么就是写的内容比较多,本来预览的时候还挺好,导出的时候
PDF
就超过了一页。

在某个夜晚我正在洗澡,然后灵光一闪我为什么不自己做一个简历编辑器呢,我设想的简历编辑器是什么样子的,那一定是自由拖拽调整位置、自由调整大小、导出的简历保证是一页,那么这不就是一个很好的低代码产品嘛!在专有领域上低代码还是有价值的,并且还能够串联我上边的文档编辑器项目!简直是一举多得,于是这个项目便被确立了起来
基于NoCode构建简历编辑器

实际上我秋招的简历就是用我的这个简历编辑器实现的
在线DEMO
,前文中项目介绍的简历截图也就是用这个项目生成的,这同样也是我说的项目串联中的一环,我自己的简历是用我自己的项目实现的,并且这个项目还串联了我的富文本项目,这可以称作项目的自举。

image.png

Webpack相关

GitHub:Webpack

初探webpack之编写plugin
初探webpack之编写loader
初探webpack之从零搭建Vue开发环境
初探webpack之单应用多端构建

作为专业搞前端的,不理解
Webpack
的打包实现显然是不合适的,所以我也学习过一些
Webpack
的实践,并且写了相关博客和
DEMO
。目前看起来在工作中学会这些暂时是足够的,如果有再深入的使用的话肯定还需要继续学习并输出文章。

image.png

此外,我还是觉得我们学习了东西最重要的还是解决问题,例如我学习了
Webpack

plugin

loader
之后,确确实实帮我解决了实现浏览器扩展的时候兼容
Chrome

FireFox
的问题
GitHub:Script

image.png

React在线预览

GitHub:ReactLive

这个项目是与工作内容有关系的,在实现某个需求的时候,我们需要实现一种类似于动态编译的能力,当时敲定的实现是基于
MarkDown
的方案,后边恰逢周末我就趁着这两天时间调研并开始写了这个博客
初探富文本之React组件实时预览(React Playground)
,后边也成功说服了大家,项目的方案也更换为基于这个方案的类似实现。

大家可能也注意到了上边我引用的博客还是与富文本相关的,那么也就是说这个项目又成功串联了我的富文本项目,实际上在
ReactLive

在线DEMO
以及文档编辑器的
在线DEMO
中都有实现,在
ReactLive
中是与博客内容完整相关的
DEMO
,其中还涉及到了多个编译器性能测试的相关部分。

实际上这个项目在文档站中的应用场景会比较契合一些,在一些场景中比如组件库的文档编写时,我们希望能够有实时预览的能力,也就是用户可以在文档中直接编写代码,然后在页面中实时预览,这样可以让用户更加直观的了解组件的使用方式,这也是很多组件库文档中都会有的一个功能。

image.png

在线文档协同

GitHub:Collab

初探富文本之OT协同算法
初探富文本之CRDT协同算法
初探富文本之OT协同实例
初探富文本之CRDT协同实例

实际上我们聊了很多文档编辑器的内容,但是大都是本地实现的独立的编辑器,涉及到多人协作的时候,独立的编辑器就会出现问题,例如用户
A

B
同时在写文档,很明显在两位同学进行保存的时候就会出现文档覆盖的问题,在这种情况下我们就需要文档协同来调度。

当然协同的成本还是比较高的,如果成本不能接受的情况下使用悲观锁的方式也是可以接受的,顾名思义是基于一种以悲观的态度类来防止一切数据冲突的方式,其以一种预防的姿态在修改数据之前把数据锁住,然后再对数据进行读写,在其释放锁之前其他的任何人都不能对数据进行操作。

但是有个场景非常特殊,那就是划词评论的能力,设想一个场景,我们的在线文档系统是由线上状态和草稿状态组成的,当然这也是常见的解决方案,此时我线上的文档文本是
xxxx
,草稿被添加过文字了此时的状态为
yyyyxxxx
,如果此时用户在线上的
xxxx
四个字也就是索引为
0-4
上添加了评论,那么此时在草稿的
0-4

yyyy
,这样的话添加评论的位置就产生了错误,导致下次发版本到线上之后评论的位置就错了。

为了解决上述的问题实际上就必须要引入协同算法来解决,协同本质上就是引入了合并与冲突的解决算法,所以在离线状态引入协同算法来解决实际问题也是有很大价值的,是做在线文档功能必不可少的一环能力。

所以我研究了大概得一个月的协同方案,并且又用了一个月的时间写了上述的四篇文章,当时是在实习的过程中写的,只能靠周末的时候学习,所以还是比较费劲的,尤其是我希望本着简单的原则提供
DEMO
出来,这就更耗费了时间,所以这几篇文档真的是很有质量的内容。

image.png

流程图编辑器

GitHub:FlowChartEditor

在之前看到了某个项目,其流程图编辑器的
UI
跟我印象中很久之前用的
ProcessOn
特别像,然后我就比较好奇这件事,后来慢慢了解到
ProcessOn
可能是基于
mxGraph
实现的(现在可能是自研
SVG+Canvas
方案),又因此重新看到了
DrawIO
这个项目,于是我就想既然
ProcessOn
能够做出集成方案,那整体来说方案应该是可行的,于是开始调研这个项目。

因为我的目标是纯前端的
NPM
包的集成方案,在研究了一段时间后,我将整个项目迁移了过来,并且完成了
在线DEMO
,并且将能力归类整理出文章
基于drawio构建流程图编辑器
,并且在这里我依然尝试着将项目串联起来,将流程图编辑器这个项目作为插件嵌入到了我的文档编辑器
在线DEMO
当中。实际上实现整个嵌入功能的时候并不简单,特别是将其作为独立的
NPM
包发布的这部分,我提交了
60+
次来做项目的兼容与
BUG
处理,
mxGraph
是比较老的项目了,代码都是纯
Js
并且都是基于原型链的修改,或者我这么说吧,即使是在我精简之后,
Graph.js
这一个文件的代码就有
10637
行。

image.png

基于WebRTC的局域网文件

GitHub:FileTransfer

在前一段时间,我想在手机上向电脑发送文件,因为要发送的文件比较多,所以我想直接通过
USB
连到电脑上传输,等我将手机连到电脑上之后,我发现手机竟然无法被电脑识别,能够充电但是并不能传文件,因为我的电脑是
Mac
而手机是
Android
,所以无法识别设备这件事就变得合理了起来。那么接着我想用
WeChat
去传文件,但是一想到传文件之后我还需要手动将文件删掉否则会占用我两份手机存储并且传输还很慢,我就又开始在网上寻找其他软件,这时候我突然想起来了
AirDrop
也就是隔空投送,就想着有没有类似的软件可以用,然后我就找到了
Snapdrop
这个项目,我觉得这个项目很神奇,不需要登录就可以在局域网内发现设备并且传输文件,于是在好奇心的驱使下我也学习了一下,并且基
于WebRTC/WebSocket
实现了类似的文件传输方案,并且在实现的过程中解决了如下问题:

  1. 局域网内可以互相发现,不需要手动输入对方
    IP
    地址等信息。
  2. 多个设备中的任意两个设备之间可以相互传输文本消息与文件数据。
  3. 设备间的数据传输采用基于
    WebRTC

    P2P
    方案,无需服务器中转数据。
  4. 跨局域网传输且
    NAT
    穿越受限的情况下,基于
    WebSocket
    服务器中转传输。

通过这种方式,任何拥有浏览器的设备都有传输数据的可能,不需要借助数据线传输文件,也不会受限于
Apple
全家桶才能使用的隔空投送,并且在实现的过程中我还拓展了多文件发送、文本消息、尝试公网连接等能力,总结起来通过这种方式我们可以获得如下的收益:

  1. 天然的跨平台优势,常用的家庭PC或者移动设备通常都会拥有浏览器,所以我们可以轻松应用于常见的
    IOS/Android/Mac
    设备向
    PC
    台式设备传输文件的场景等等。
  2. 无视家庭路由器的
    AP
    隔离功能,即使因为各种原因比如合租时房东对路由器设备开启了
    AP
    隔离,我们的服务依旧可以正常交换数据,这样避免了在路由器不受我们控制的情况下通过
    WIFI
    传输文件的掣肘,但是这通常不适用于大型公司的对称型
    NAT
    ,至于原因我们后边也会聊到。
  3. 公网的
    P2P
    数据交换,在现在
    IPv6
    的普及下,如果设备支持
    IPv6
    并且拥有公网
    IPv6
    地址的话,是可以直接进行数据传输的,点对点的数据交换不会受限于服务器的数据转发,并且隐私性会更高一些,当然因为国内网络环境的复杂性以及运营商对于
    UDP
    数据包的支持受限、
    IPv6
    地址只出不进等等限制,公网传输的实用性还是差一些的。

在耗费了两个周末的时间之后,将整个功能完善了出来
仿照AirDrop(隔空投送)优雅地在局域网中传输文件
,并且可以尝试使用
在线DEMO
,因为本身数据不会经由服务器转发,所以部署后大概占用
20MB
左右的内存,使用效果可以查看
README

视频

image.png

基于Canvas的简历编辑器

GitHub:CanvasEditor

掘金老给我推Canvas,于是我也学习Canvas做了个简历编辑器
Canvas图形编辑器-数据结构与History(undo/redo)
Canvas图形编辑器-我的剪贴板里究竟有什么数据

最开始本着学习的态度以及对技术的好奇心来做的,因为暂时找不到可以用
Canvas
实现的比较有意思的场景,所以才选择了继续做简历编辑器
在线DEMO
。因此除了一些工具类的包例如
ArcoDesign

ResizeObserve

Jest
等包之外,关于 数据结构
packages/delta
、插件化
packages/plugin
、核心模块
packages/core
等都是手动实现的,项目中的富文本内容还是用我的文档编辑器实现的,又将项目串了起来,并且整个项目还是用
Rspack
打包的基准环境,前边学习的
Webpack
又派上了用场帮我解决了一个打包问题。

实际上这也是本着 自己学习的项目能自己写就自己写,公司/商业化项目能有已有包就用已有包 的原则来的,在这里的目标是学习而不是做产品,自己学习肯定是希望能够更多地接触相对底层一些的能力,自己可以多踩一些坑会对相关能力有更深的理解,如果是公司的项目那肯定是成熟的产品优先,成熟的产品对于边界
case
的处理以及积攒的
issue
也不是轻易能够比拟的。

此外,除了想学习的理由之外,针对于市面上简历编辑器本身的问题与痛点,也做了相关的解决方案:

  1. 固定模版不好用,各种模版用起来细节上并不是很满意,要么是模块的位置固定,要么是页面边距不满意,而通过
    Canvas
    实现的简历编辑器都是图形,完全依靠画布绘制图形,在给定的基础图形上可以任意绘制,不会有排版问题。
  2. 数据安全不能保证,因为简历上通常会存在很多个人信息,例如电话、邮箱等等,这些简历网站通常都需要登录才能用,数据都存在服务端,虽然泄漏的可能性不大,但是保护隐私还是很重要的,此编辑器是纯前端项目,数据全部存储在本地,没有任何服务器上传行为,可以完全保证数据安全。
  3. 维持一页简历不易,之前使用某简历模版网站时,某一项写的字较多时导出就会出现多页的情况,而我们大家大概都听说过简历最好是一页,所以在实现此编辑器时是直接通过排版的方式生成
    PDF
    ,所以在设置页面大小后,导出的
    PDF
    总会是保持一页,看起来会更美观。

虽然这只是一个小小的简历编辑器,但是针对编辑器涉及的能力还有很多,整体来看会涉及到很多技术问题,例如数据结构、
History
模块、复制粘贴模块、画布分层、事件管理、无限画布、按需绘制、性能优化、焦点控制、参考线、富文本、快捷键、层级控制、渲染顺序、事件模拟、
PDF
排版等等,目前来说想要做好还是有很大难度的,也是一个可以深入研究的项目。

image.png

总结

感觉还是做了一些比较有意思的项目的,我个人觉得如果是搞技术的话,保持对技术的好奇心是一件很重要的事,能用技术来解决平时遇到的问题也是非常重要的一点,昨天听群友说他做了个项目用算法来下棋,面试官问他这个项目有什么用,我觉得这就是个很有意思的项目,过年回家的时候跟大爷下棋能跟大爷下的有来有回可不是有用吗,提供情绪价值也是很重要的。

再说回来我做的这些项目,最开始我的目的就非常简单,我觉得这件事有意思我就去做了,并且在做的过程中不断解决了很多问题,并且也不断提升了技术水平,还能写很多博客,何乐而不为。其实在这里我还思考过一个问题,什么是好项目,这个问题就很宽泛,我觉得在这其中很重要的是将做过的项目联系起来,或者真的将其用到了自己的生活中,这对于自己来说就是好项目,或许对于面试来说这个项目的价值并不大,但是如果能对自己有所帮助,无论是生活上还是技术上,那就很重要。

还有一个问题我觉得也是值得讨论的,内卷和努力的区别究竟是什么,我目前的理解是内卷一定是有多个人一起在恶性的竞争,比如两个人在比谁工作的时间长,谁加班加得多,这种就是纯纯的内卷,努力的话是一个人也可以做的事,当然多个人也可以,大家没有利益冲突,相互学习相互帮助,一个人的话就是做着自己感兴趣的事,比如每天下班后学习英语。但是话又说回来了,个人的努力实际上是在更大范围上来说还是竞争,大家肯定也体会到整个环境越来越卷了,从另一个角度上讲,这或许也是社会发展的动力所在吧。

那么搞了这么多项目,那么我的朋友,代价是什么呢?代价就是到现在都还没有对象,过年回家差点就要去相亲了,我很难绷。

本文将从leader处理器入手,详细分析node的增删改查流程及监听器原理。

回顾数据读写流程

leader

  1. ZookeeperServer.processPacket封装Request并提交给业务处理器
  2. LeaderRequestProcessor做本地事务升级
  3. PrepRequestProcessor做事务准备
  4. ProposalRequestProcessor事务操作发proposal给follower节点,持久化到log文件
  5. CommitProcessor读请求直接转发给下游处理器,事务操作等待到了quorum状态转发给下游处理器
  6. ToBeAppliedRequestProcessor清理toBeApplied集
  7. FinalRequestProcessor将事务写到ZKDatabase中,给客户端发响应

follower

  1. 处理PROPOSAL:使用SyncRequestProcessor处理器持久化,之后SendAckRequestProcessor给leader发ack
  2. 处理COMMIT:提交给CommitProcessor处理器,之后FinalRequestProcessor将事务写到ZKDatabase中

创建node

涉及create、create2、createContainer、createTTL等命令。

PrepRequestProcessor事务准备

反序列化请求参数

switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
    CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
    break;
case OpCode.createTTL:
    // 默认不支持ttl
    CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
    break;
// ...

CreateRequest封装创建node的参数:

public class CreateRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
}

CreateTTLRequest封装创建node加ttl的参数:

public class CreateTTLRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
  private long ttl;
}

事务准备

protected void pRequest2Txn(int type, long zxid, Request request, Record record)
        throws KeeperException, IOException, RequestProcessorException {
    // ...

    switch (type) {
      case OpCode.create:
      case OpCode.create2:
      case OpCode.createTTL:
      case OpCode.createContainer: {
        pRequest2TxnCreate(type, request, record);
        break;
      }
    // ...
    }
}

private void pRequest2TxnCreate(
        int type, Request request, Record record) throws IOException, KeeperException {
    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
        // 给flags等参数赋值
    } else {
        CreateRequest createRequest = (CreateRequest) record;
        // 给flags等参数赋值
        ttl = -1;
    }
    // CreateMode:
    // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
    // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
    CreateMode createMode = CreateMode.fromFlag(flags);
    // 验证临时节点、ttl参数、检查session
    // 默认不支持ttl
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId); // 父节点path

    List<ACL> listACL = fixupACL(path, request.authInfo, acl); // 请求携带的权限
    ChangeRecord parentRecord = getRecordForPath(parentPath); // 得到父节点
    // 验证CREATE权限
    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) { // 顺序节点
        // 例如/users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    // 略
    boolean ephemeralParent = 
        EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    // 父节点不可以是临时节点

    int newCversion = parentRecord.stat.getCversion() + 1; // 父节点的childVersion++
    // 检查字节限额
    zks.checkQuota(path, null, data, OpCode.create);
    // 不同类型创建对应的Txn对象
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }

    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId; // 临时节点使用sessionId
    }
    // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
    // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);

    // 父节点
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord);
    // 新增节点
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
}

protected void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

outstandingChanges保存未提交的事务变化,比如在生成顺序节点时需要使用cversion值,但是在事务提交到ZKDatabase之前,库里面的值是旧的,所以在上面的代码中,是从outstandingChanges查找节点,给cversion++后再生成顺序节点。

在事务提交之后,才会清理outstandingChanges集。

ProposalRequestProcessor发Proposal

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // sync命令流程,暂不分析
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // 提交给下游处理器
        }
        if (request.getHdr() != null) { // 事务操作需要发proposal并写磁盘
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // 把事务log写到文件中
            // 之后通过AckRequestProcessor处理器给leader ack
            syncProcessor.processRequest(request);
        }
    }
}

CommitProcessor提交事务

public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // 所有请求队列
    if (needCommit(request)) { // 需要提交的请求进入到写队列
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

run方法对比queuedRequests、queuedWriteRequests、committedRequests这几个队列,将提交成功的请求或读请求转发给下游的ToBeAppliedRequestProcessor处理器。

FinalRequestProcessor应用事务

该处理器位于处理器链的末尾,负责将事务应用到ZKDatabase、查询数据、返回响应。

applyRequest

该方法将事务应用到ZKDatabase中:

private ProcessTxnResult applyRequest(Request request) {
    // 应用事务
    ProcessTxnResult rc = zks.processTxn(request);

    // closeSession

    // metrics

    return rc;
}

zks.processTxn负责处理session、处理事务、清理outstandingChanges集。重点看一下处理事务的步骤。

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(
                create2Txn.getPath(),
                create2Txn.getData(),
                create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0,
                create2Txn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(
                createTtlTxn.getPath(),
                createTtlTxn.getData(),
                createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                createTtlTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(
                createContainerTxn.getPath(),
                createContainerTxn.getData(),
                createContainerTxn.getAcl(),
                EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        // ...
        }
    }
    // ...
}

createNode

public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner);
    DataNode parent = nodes.get(parentName); // 父节点必须存在
    if (parent == null) {
        throw new NoNodeException();
    }
    synchronized (parent) {
        Long acls = aclCache.convertAcls(acl);

        Set<String> children = parent.getChildren();
        if (children.contains(childName)) { // 节点不能存在
            throw new NodeExistsException();
        }

        nodes.preChange(parentName, parent);
        if (parentCVersion == -1) { // childVersion++
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }

        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
        }
        // 创建node
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName);
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // 维护NodeHashMap
        // 处理临时节点
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        // 返回节点stat
        if (outputStat != null) {
            child.copyStat(outputStat);
        }
    }
    // now check if its one of the zookeeper node child 略

    // 触发NodeCreated监听
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    // 触发父节点的NodeChildrenChanged监听
    childWatches.triggerWatch(
        parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

返回响应

case OpCode.create: {
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path); // 创建Response
    err = Code.get(rc.err); // processTxn的err
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
    lastOp = "CREA";
    rsp = new Create2Response(rc.path, rc.stat); // 创建Response
    err = Code.get(rc.err); // processTxn的err
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}

最后会使用cnxn把响应返回给客户端:

ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
cnxn.sendResponse(hdr, rsp, "response");

EphemeralType

  • VOID
  • NORMAL
  • CONTAINER
  • TTL

ephemeralOwner标识znode是临时的,以及哪个会话创建了该节点。通过zookeeper.extendedTypesEnabled属性可以启用ttl节点等扩展功能。ephemeralOwner的"特殊位"用于表示启用了哪个功能,而ephemeral Owner的剩余位是特定于功能的。

当zookeeper.extendedTypesEnabled为true时,将启用扩展类型。扩展ephemeralOwner填充高8位(0xff00000000000000L),高8位之后的两个字节用于表示ephemeralOwner扩展特征,剩余5个字节由该功能指定,可用于所需的任何目的。

目前,唯一扩展功能是TTL节点,扩展特征值为0。对于TTL节点,ephemeralOwner具有0xff的高8位,接下来2个字节是0,后面的5个字节是以毫秒为单位的ttl值。因此ttl值为1毫秒的ephemeralOwner是0xff00000000000001。

要添加新的扩展功能:

  • 向枚举添加新名称
  • 在ttl之后,定义常量extended_BIT_xxxx,即0x0001
  • 通过静态初始值设定项向extendedFeatureMap添加映射

注意:从技术上讲,容器节点也是扩展类型,但由于它是在该功能之前实现的,因此被特别表示。根据定义,只有高位集(0x8000000000000000L)的临时所有者是容器节点(无论是否启用扩展类型)。

ttl节点

  • 默认不开启,使用
  • Added in 3.5.3
  • 创建PERSISTENT或PERSISTENT_SEQUENTIAL节点时,可以设置以毫秒为单位的ttl。如果znode没有在ttl内修改,并且没有子节点,它将在将来的某个时候成为服务器删除的候选节点

container节点

  • Added in 3.5.3
  • 当container节点的最后一个子节点被删除时,该container节点将成为服务器在未来某个时候删除的候选节点

Stat类

封装节点属性,字段如下:

  • czxid
    The zxid of the change that caused this znode to be created.
  • mzxid
    The zxid of the change that last modified this znode.
  • pzxid
    The zxid of the change that last modified children of this znode.
  • ctime
    The time in milliseconds from epoch when this znode was created.
  • mtime
    The time in milliseconds from epoch when this znode was last modified.
  • version
    The number of changes to the data of this znode.
  • cversion
    The number of changes to the children of this znode.
  • aversion
    The number of changes to the ACL of this znode.
  • ephemeralOwner
    The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
  • dataLength
    The length of the data field of this znode.
  • numChildren
    The number of children of this znode.

删除node

涉及delete、deleteContainer等命令。

PrepRequestProcessor事务准备

反序列化请求参数

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        // ...
        case OpCode.deleteContainer:
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        }
        // ...
    }
}

DeleteContainerRequest类:

public class DeleteContainerRequest implements Record {
    private String path;
}

DeleteRequest类:

public class DeleteRequest implements Record {
  private String path;
  private int version;
}

事务准备

protected void pRequest2Txn(int type, long zxid, Request request,
             Record record) throws KeeperException, IOException, RequestProcessorException {
    // 略

    switch (type) {
    // 略
    case OpCode.deleteContainer: {
        DeleteContainerRequest txn = (DeleteContainerRequest) record;
        String path = txn.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord nodeRecord = getRecordForPath(path); // 获取待删除节点
        if (nodeRecord.childCount > 0) { // 有子节点不允许删除
            throw new KeeperException.NotEmptyException(path);
        }
        if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
            throw new KeeperException.BadVersionException(path);
        }
        ChangeRecord parentRecord = getRecordForPath(parentPath); // 获取父节点
        request.setTxn(new DeleteTxn(path));
        // addChangeRecord 略
        break;
    }
    case OpCode.delete:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        DeleteRequest deleteRequest = (DeleteRequest) record;
        String path = deleteRequest.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord parentRecord = getRecordForPath(parentPath); // 获取父节点
        // 检查DELETE权限
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
        ChangeRecord nodeRecord = getRecordForPath(path); // 获取待删除节点
        checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // 检查version
        if (nodeRecord.childCount > 0) { // 有子节点不允许删除
            throw new KeeperException.NotEmptyException(path);
        }
        request.setTxn(new DeleteTxn(path));
        // addChangeRecord 略
        break;
    }
}

FinalRequestProcessor应用事务

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        // ...
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid());
            break;
        }
        // ...
    }
}

deleteNode

public void deleteNode(String path, long zxid) throws NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);

    DataNode parent = nodes.get(parentName);
    if (parent == null) { // 获取父节点且必须存在
        throw new NoNodeException();
    }
    synchronized (parent) {
        nodes.preChange(parentName, parent);
        parent.removeChild(childName);
        if (zxid > parent.stat.getPzxid()) {
            parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
        }
        nodes.postChange(parentName, parent);
    }

    DataNode node = nodes.get(path); // 获取删除节点
    if (node == null) {
        throw new NoNodeException();
    }
    nodes.remove(path); // 从NodeHashMap删除
    synchronized (node) { // 移除权限
        aclCache.removeUsage(node.acl);
        nodeDataSize.addAndGet(-getNodeSize(path, node.data));
    }

    // 移除临时节点、container、ttl等缓存
    synchronized (parent) {
        long owner = node.stat.getEphemeralOwner();
        EphemeralType ephemeralType = EphemeralType.get(owner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.remove(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.remove(path);
        } else if (owner != 0) {
            Set<String> nodes = ephemerals.get(owner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
    }

    // 略

    // 触发NodeDeleted监听
    WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
    childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
    // 触发父节点的NodeChildrenChanged监听
    childWatches.triggerWatch(
        "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

设置node数据

PrepRequestProcessor事务准备

反序列化请求参数

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        // ...
        case OpCode.setData:
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        // other case
        }
    }
    // ...
}

SetDataRequest类:

public class SetDataRequest implements Record {
  private String path;
  private byte[] data;
  private int version;
}

事务准备

protected void pRequest2Txn(int type, long zxid, Request request,
          Record record) throws KeeperException, IOException, RequestProcessorException {
    // 略

    switch (type) {
    // ...
    case OpCode.setData:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        SetDataRequest setDataRequest = (SetDataRequest) record;
        path = setDataRequest.getPath();
        validatePath(path, request.sessionId);
        nodeRecord = getRecordForPath(path); // 获取节点对象
        // 检查权限
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
        // 检查字节限额
        zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
        // version++
        int newVersion = checkAndIncVersion(
            nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
        // 创建SetDataTxn
        request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
        // addChangeRecord
        break;
    // other case
    }
}

FinalRequestProcessor应用事务

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        // other case
        case OpCode.setData:
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(
                setDataTxn.getPath(),
                setDataTxn.getData(),
                setDataTxn.getVersion(),
                header.getZxid(),
                header.getTime());
            break;
        // other case
        }
    }
    // ...
}

setData

public Stat setData(String path, byte[] data, int version,
                    long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) { // 检查节点存在
        throw new NoNodeException();
    }
    byte[] lastData;
    synchronized (n) {
        lastData = n.data;
        nodes.preChange(path, n);
        n.data = data; // 节点数据
        n.stat.setMtime(time); // 修改时间
        n.stat.setMzxid(zxid); // 修改zxid
        n.stat.setVersion(version); // 版本
        n.copyStat(s);
        nodes.postChange(path, n);
    }

    // 略

    // 触发NodeDataChanged监听
    dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
    return s;
}

查询node数据

PrepRequestProcessor验证session

经过该处理器时,只做session验证。

之后的ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor都是直接通过,不做事务处理,直接交给FinalRequestProcessor处理器查询数据、发送响应。

FinalRequestProcessor查询数据

使用handleGetDataRequest方法查询数据:

private Record handleGetDataRequest(
        Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    // 检查权限
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    // 查询数据
    // 如果watcher参数不为null会给path添加一个监听器
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

GetDataRequest类:

public class GetDataRequest implements Record {
  private String path;
  private boolean watch;
}

节点监听

addWatch命令

case OpCode.addWatch: {
    lastOp = "ADDW";
    AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
    // 最终使用DataTree的addWatch方法注册监听器
    // cnxn是ServerCnxn对象,实现了Watcher接口
    zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
    rsp = new ErrorResponse(0);
    break;
}

DataTree的addWatch方法

public void addWatch(String basePath, Watcher watcher, int mode) {
    // PERSISTENT|PERSISTENT_RECURSIVE
    WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
    // dataWatches和childWatches是WatchManager类型对象
    dataWatches.addWatch(basePath, watcher, watcherMode);
    if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
        childWatches.addWatch(basePath, watcher, watcherMode);
    }
}

WatcherMode枚举

public enum WatcherMode {
    STANDARD(false, false),
    PERSISTENT(true, false), // persistent=0
    PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
    ;
}

PERSISTENT和PERSISTENT_RECURSIVE是3.6.0版本新增的特性。

Watcher接口

实现类需要实现process方法:

void process(WatchedEvent event);

WatchedEvent代表一个监听事件:

public class WatchedEvent {
    // 当前zk服务器的状态
    private final KeeperState keeperState;
    // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged等
    private final EventType eventType;
    private final String path;
    private final long zxid;
}

重要的实现类:

  • NIOServerCnxn
  • NettyServerCnxn

WatchManager类

This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.

核心字段:

// path -> Watcher集
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
// Watcher -> path->WatchStats(PERSISTENT|STANDARD + PERSISTENT|PERSISTENT_RECURSIVE等)
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

WatchStats类

public final class WatchStats {
    private static final WatchStats[] WATCH_STATS = new WatchStats[] {
            new WatchStats(0), // NONE
            new WatchStats(1), // STANDARD
            new WatchStats(2), // PERSISTENT
            new WatchStats(3), // STANDARD + PERSISTENT
            new WatchStats(4), // PERSISTENT_RECURSIVE
            new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
            new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
            new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
    };

    public static final WatchStats NONE = WATCH_STATS[0];

    private final int flags;

    private WatchStats(int flags) {
        this.flags = flags;
    }

    private static int modeToFlag(WatcherMode mode) {
        // mode = STANDARD; return 1 << 0 = 1(0001)
        // mode = PERSISTENT; return 1 << 1 = 2(0010)
        // mode = PERSISTENT_RECURSIVE; return 1 << 2 = (0100)
        return 1 << mode.ordinal();
    }

    public WatchStats addMode(WatcherMode mode) {
        int flags = this.flags | modeToFlag(mode); // |计算保留多种状态
        return WATCH_STATS[flags];
    }

    public WatchStats removeMode(WatcherMode mode) {
        int mask = ~modeToFlag(mode); // 取反
        int flags = this.flags & mask;
        if (flags == 0) {
            return NONE;
        }
        return WATCH_STATS[flags];
    }

    // ...
}

addWatch

public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<>(4);
        watchTable.put(path, list);
    }
    list.add(watcher); // 添加watchTable

    Map<String, WatchStats> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashMap<>();
        watch2Paths.put(watcher, paths);
    }

    WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
    WatchStats newStats = stats.addMode(watcherMode);

    if (newStats != stats) {
        paths.put(path, newStats);
        if (watcherMode.isRecursive()) {
            ++recursiveWatchQty;
        }
        return true;
    }

    return false;
}

triggerWatch

public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
    Set<Watcher> watchers = new HashSet<>();
    synchronized (this) {
        // path迭代器,从子节点path向前遍历
        // 例如/apps/app1/name
        // next = /apps/app1/name, next = /apps/app1, next = /apps ...
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        for (String localPath : pathParentIterator.asIterable()) {
            // 获取遍历Watcher集
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            Iterator<Watcher> iterator = thisWatchers.iterator(); 
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                // 获取watcher对应的WatchStats
                Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                WatchStats stats = paths.get(localPath); // if stats==null continue
                if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher); // 加入watchers中
                    WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                    if (newStats == WatchStats.NONE) { // STANDARD模式下会移除监听器
                        iterator.remove();
                        paths.remove(localPath);
                    } else if (newStats != stats) {
                        paths.put(localPath, newStats);
                    }
                } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                    // 递归模式下才将父节点加入watchers中
                    watchers.add(watcher);
                }
            }
            if (thisWatchers.isEmpty()) {
                watchTable.remove(localPath);
            }
        }
    }
    // 略

    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }

    // 略

    return new WatcherOrBitSet(watchers);
}

NIOServerCnxn

上面查找到watchers之后会触发process方法,看一下NIOServerCnxn的方法实现:

public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);

    // 转型成WatcherEvent才能通过网络传输
    WatcherEvent e = event.getWrapper();
    // 把事件推送给客户端
    int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
    ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
}

一、简介

在 Java 的
java.util.concurrent
包中,除了提供底层锁、并发同步等工具类以外,还提供了一组原子操作类,大多以
Atomic
开头,他们位于
java.util.concurrent.atomic
包下。

所谓原子类操作,顾名思义,就是这个操作要么全部执行成功,要么全部执行失败,是保证并发编程安全的重要一环。

相比通过
synchronized

lock
等方式实现的线程安全同步操作,原子类的实现机制则完全不同。它采用的是
通过无锁(lock-free)的方式来实现线程安全(thread-safe)访问,底层原理主要基于
CAS
操作来实现

某些业务场景下,通过原子类来操作,既可以实现线程安全的要求,又可以实现高效的并发性能,同时编程方面更加简单。

下面我们一起来看看它的具体玩法!

二、常用原子操作类


java.util.concurrent.atomic
包中,因为原子类众多,如果按照类型进行划分,可以分为五大类,每个类型下的原子类可以用如下图来概括(不同 JDK 版本,可能略有不同,本文主要基于 JDK 1.8 进行采样)。

虽然原子操作类很多,但是大体的用法基本类似,只是针对不同的数据类型进行了单独适配,这些原子类都可以保证多线程下数据的安全性,使用起来也比较简单。

2.1、基本类型

基本类型的原子类,也是最常用的原子操作类,
JDK
为开发者提供了三个基础类型的原子类,内容如下:

  • AtomicBoolean
    :布尔类型的原子操作类
  • AtomicInteger
    :整数类型的原子操作类
  • AtomicLong
    :长整数类型的原子操作类


AtomicInteger
为例,常用的操作方法如下:

方法 描述
int get() 获取当前值
void set(int newValue) 设置 value 值
int getAndIncrement() 先取得旧值,然后加1,最后返回旧值
int getAndDecrement() 先取得旧值,然后减1,最后返回旧值
int incrementAndGet() 加1,然后返回新值
int decrementAndGet() 减1,然后返回新值
int getAndAdd(int delta) 先取得旧值,然后增加指定值,最后返回旧值
int addAndGet(int delta) 增加指定值,然后返回新值
boolean compareAndSet(int expect, int update) 直接使用CAS方式,将【旧值】更新成【新值】,核心方法

AtomicInteger
的使用方式非常简单,使用示例如下:

AtomicInteger atomicInteger = new AtomicInteger();
// 先获取值,再自增,默认初始值为0
int v1 = atomicInteger.getAndIncrement();
System.out.println("v1:" + v1);

// 获取自增后的ID值
int v2 = atomicInteger.incrementAndGet();
System.out.println("v2:" + v2);

// 获取自减后的ID值
int v3 = atomicInteger.decrementAndGet();
System.out.println("v3:" + v3);

// 使用CAS方式,将就旧值更新成 10
boolean v4 = atomicInteger.compareAndSet(v3,10);
System.out.println("v4:" + v4);

// 获取最新值
int v5 = atomicInteger.get();
System.out.println("v5:" + v5);

输出结果:

v1:0
v2:2
v3:1
v4:true
v5:10

下面我们以对某个变量累加 10000 次为例,采用 10 个线程,每个线程累加 1000 次来实现,对比不同的实现方式执行结果的区别(预期结果值为 10000)。

方式一:线程不安全操作实现
public class Demo1 {

    /**
     * 初始化一个变量
     */
    private static volatile int a = 0;

    public static void main(String[] args) throws InterruptedException {
        final int threads = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        a++;
                    }
                    countDownLatch.countDown();
                }
            }).start();
        }

        // 阻塞等待10个线程执行完毕
        countDownLatch.await();
        // 输出结果值
        System.out.println("结果值:" + a);
    }
}

输出结果:

结果值:9527

从日志上可以很清晰的看到,实际结果值与预期不符,即使变量
a
加了
volatile
关键字,也无法保证累加结果的正确性。

针对
volatile
关键字,在之前的文章中我们有所介绍,它只能保证变量的可见性和程序的有序性,无法保证程序操作的原子性,导致运行结果与预期不符。

方式二:线程同步安全操作实现
public class Demo2 {

    /**
     * 初始化一个变量
     */
    private static int a = 0;

    public static void main(String[] args) throws InterruptedException {
        final int threads = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    synchronized (Demo2.class){
                        for (int j = 0; j < 1000; j++) {
                            a++;
                        }
                    }
                    countDownLatch.countDown();
                }
            }).start();
        }

        // 阻塞等待10个线程执行完毕
        countDownLatch.await();
        // 输出结果值
        System.out.println("结果值:" + a);
    }
}

输出结果:

结果值:10000

当多个线程操作同一个变量或者方法的时候,可以在方法上加
synchronized
关键字,可以同时实现变量的可见性、程序的有序性、操作的原子性,达到运行结果与预期一致的效果。

同时也可以采用
Lock
锁来实现多线程操作安全的效果,执行结果也会与预期一致。

方式三:原子类操作实现
public class Demo3 {

    /**
     * 初始化一个原子操作类
     */
    private static AtomicInteger a = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        final int threads = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        // 采用原子性操作累加
                        a.incrementAndGet();
                    }
                    countDownLatch.countDown();
                }
            }).start();
        }
        // 阻塞等待10个线程执行完毕
        countDownLatch.await();
        // 输出结果值
        System.out.println("结果值:" + a.get());
    }
}

输出结果:

结果值:10000

从日志结果上可见,原子操作类也可以实现在多线程环境下执行结果与预期一致的效果,关于底层实现原理,我们等会在后文中进行介绍。


synchronized

Lock
等实现方式相比,原子操作类因为采用无锁的方式实现,因此某些场景下可以带来更高的执行效率。

2.2、对象引用类型

上文提到的基本类型的原子类,只能更新一个变量,如果需要原子性更新多个变量,这个时候可以采用对象引用类型的原子操作类,将多个变量封装到一个对象中,
JDK
为开发者提供了三个对象引用类型的原子类,内容如下:

  • AtomicReference
    :对象引用类型的原子操作类
  • AtomicStampedReference
    :带有版本号的对象引用类型的原子操作类,可以解决 ABA 问题
  • AtomicMarkableReference
    :带有标记的对象引用类型的原子操作类


AtomicReference
为例,构造一个对象引用,具体用法如下:

public class User {

    private String name;

    private Integer age;

    public User(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
AtomicReference<User> atomicReference = new AtomicReference<>();
// 设置原始值
User user1 = new User("张三", 20);
atomicReference.set(user1);

// 采用CAS方式,将user1更新成user2
User user2 = new User("李四", 21);
atomicReference.compareAndSet(user1, user2);
System.out.println("更新后的对象:" +  atomicReference.get().toString());

输出结果:

更新后的对象:User{name='李四', age=21}

2.3、对象属性类型

在某项场景下,可能你只想原子性更新对象中的某个属性值,此时可以采用对象属性类型的原子操作类,
JDK
为开发者提供了三个对象属性类型的原子类,内容如下:

  • AtomicIntegerFieldUpdater
    :属性为整数类型的原子操作类
  • AtomicLongFieldUpdater
    :属性为长整数类型的原子操作类
  • AtomicReferenceFieldUpdater
    :属性为对象类型的原子操作类

需要注意的是,这些原子操作类需要满足以下条件才可以使用。

  • 1.被操作的字段不能是 static 类型
  • 2.被操纵的字段不能是 final 类型
  • 3.被操作的字段必须是 volatile 修饰的
  • 4.属性必须对于当前的 Updater 所在区域是可见的,简单的说就是尽量使用
    public
    修饰字段


AtomicIntegerFieldUpdater
为例,构造一个整数类型的属性引用,具体用法如下:

public class User {

    private String name;

    /**
     * age 字段加上 volatile 关键字,并且改成 public 修饰
     */
    public volatile int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
User user = new User("张三", 20);
AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
// 将 age 的年龄原子性操作加 1
fieldUpdater.getAndIncrement(user);
System.out.println("更新后的属性值:" + fieldUpdater.get(user));

输出结果:

更新后的属性值:21

2.4、数组类型

数组类型的原子操作类,并不是指对数组本身的原子操作,而是对数组中的元素进行原子性操作,这一点需要特别注意,如果要针对整个数组进行更新,可以采用对象引入类型的原子操作类进行处理。

JDK
为开发者提供了三个数组类型的原子类,内容如下:

  • AtomicIntegerArray
    :数组为整数类型的原子操作类
  • AtomicLongArray
    :数组为长整数类型的原子操作类
  • AtomicReferenceArray
    :数组为对象类型的原子操作类


AtomicIntegerArray
为例,具体用法如下:

int[] value = new int[]{0, 3, 5};
AtomicIntegerArray array = new AtomicIntegerArray(value);
// 将下标为[0]的元素,原子性操作加 1
array.getAndIncrement(0);
System.out.println("下标为[0]的元素,更新后的值:" + array.get(0));

输出结果:

下标为[0]的元素,更新后的值:1

2.5、累加器类型

累加器类型的原子操作类,是从 jdk 1.8 开始加入的,专门用来执行数值类型的数据累加操作,性能更好。

它的实现原理与基本数据类型的原子类略有不同,当多线程竞争时采用分段累加的思路来实现目标值,在多线程环境中,它比
AtomicLong
性能要高出不少,特别是写多的场景。

JDK
为开发者提供了四个累加器类型的原子类,内容如下:

  • LongAdder
    :长整数类型的原子累加操作类
  • LongAccumulator

    LongAdder
    的功能增强版,它支持自定义的函数操作
  • DoubleAdder
    :浮点数类型的原子累加操作类
  • DoubleAccumulator
    :同样的,也是
    DoubleAdder
    的功能增强版,支持自定义的函数操作


LongAdder
为例,具体用法如下:

LongAdder adder = new LongAdder();
// 自增加 1,默认初始值为0
adder.increment();
adder.increment();
adder.increment();
System.out.println("最新值:" +  adder.longValue());

输出结果:

最新值:3

三、原理解析

在上文中,我们提到了原子类的底层主要基于
CAS
来实现,
CAS
的全称是:Compare and Swap,翻译过来就是:比较并替换。

CAS
是实现并发算法时常用的一种技术,它包含三个操作数:内存位置、预期原值及新值。在执行
CAS
操作的时候,会将内存位置的值与预期原值比较,如果一致,会将该位置的值更新为新值;否则,不做任何操作。

我们还是以上文介绍的
AtomicInteger
为例,部分源码内容如下:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 使用 Unsafe.compareAndSwapInt 方法进行 CAS 操作
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // 变量使用 volatile 保证可见性
    private volatile int value;

    /**
     * get 方法
     */
    public final int get() {
        return value;
    }

    /**
     * 原子性自增操作
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

从源码上可以清晰的看出,变量
value
使用了
volatile
关键字,保证数据可见性和程序的有序性;原子性自增操作
incrementAndGet()
方法,路由到
Unsafe.getAndAddInt()
方法上。

我们继续往下看
Unsafe.getAndAddInt()
这个方法,部分源码内容如下:

public final class Unsafe {

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        // 1.循环比较并替换,只有成功才返回
        do {
            // 2.调用底层方法得到 value 值
            var5 = this.getIntVolatile(var1, var2);
            // 3.通过var1和var2得到底层值,var5为当前值,如果底层值与当前值相同,则将值设为var5+var4
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        // 4.如果替换成功,返回当前值
        return var5;
    }

    /**
     * CAS 核心方法,由其他语言实现,不再分析
     */
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
}

从以上的源码可以清晰的看到,
incrementAndGet()
方法主要基于
Unsafe.compareAndSwapInt
方法来实现,同时进行了循环比较与替换的操作,只有替换成功才会返回,这个过程也被称为自旋操作,确保程序执行成功,进一步保证了操作的原子性。

其它的方法实现思路也类似。

如果我们自己通过
CAS
编写
incrementAndGet()
,大概长这样:

public int incrementAndGet(AtomicInteger var) {
    int prev, next;
    do {
        prev = var.get();
        next = prev + 1;
    } while ( !var.compareAndSet(prev, next));
    return next;
}

当并发数量比较低的时候,采用
CAS
这种方式可以实现更快的执行效率;当并发数量比较高的时候,因为存在循环比较与替换的逻辑,如果长时间循环,可能会更加消耗 CPU 资源,此时采用
synchronized

Lock
来实现线程同步,可能会更有优势。

四、ABA问题

从上文的分析中,我们知道 CAS 在操作的时候会检查预期原值是否发生变化,当预期原值没有发生变化才会更新值。

在实际业务中,可能会出现这么一个现象:线程 t1 正尝试将共享变量的值 A 进行修改,但还没修改;此时另一个线程 t2 获取到 CPU 时间片,将共享变量的值 A 修改成 B,然后又修改为 A,此时线程 t1 检查发现共享变量的值没有发生变化,就会主动去更新值,导致出现了错误更新,但是实际上原始值在这个过程中发生了好几次变化。这个现象我们称它为 ABA 问题。

ABA 问题的解决思路就是使用版本号,在变量前面追加上版本号,每次变量更新的时候把版本号加 1,原来的
A-B-A
就会变成
1A-2B-3A


java.util.concurrent.atomic
包下提供了
AtomicStampedReference
类,它支持指定版本号来更新,可以通过它来解决 ABA 问题。


AtomicStampedReference
类的
compareAndSet()
方法中,会检查当前引用是否等于预期引用,并且当前版本号是否等于预期版本号,如果全部相等,则以原子方式将该引用的值设置为给定的更新值,同时更新版本号。

具体示例如下:

// 初始化一个带版本号的原子操作类,原始值:a,原始版本号:1
AtomicStampedReference<String> reference = new AtomicStampedReference<>("a", 1);

// 将a更为b,同时将版本号加1,第一个参数:预期原值;第二个参数:更新后的新值;第三个参数:预期原版本号;第四个参数:更新后的版本号
boolean result1 = reference.compareAndSet("a", "b", reference.getStamp(), reference.getStamp() + 1);
System.out.println("第一次更新:" + result1);

// 将b更为a,因为预期原版本号不对,所以更新失败
boolean result2 = reference.compareAndSet("b", "a", 1, reference.getStamp());
System.out.println("第二次更新:" + result2);

输出结果:

第一次更新:true
第二次更新:false

五、小结

本文主要围绕
AtomicInteger
的用法和原理进行一次知识总结,
JUC
包下的原子操作类非常的多,但是大体用法基本相似,只是针对不同的数据类型做了细分处理。

在实际业务开发中,原子操作类通常用于计数器,累加器等场景,比如编写一个多线程安全的全局唯一 ID 生成器。

public class IdGenerator {

    private static AtomicLong atomic = new AtomicLong(0);

    public long getNextId() {
        return atomic.incrementAndGet();
    }
}

希望本篇的知识总结,能帮助到大家!

六、参考

1.
https://www.liaoxuefeng.com/wiki/1252599548343744/1306581083881506

2.
https://blog.csdn.net/zzti_erlie/article/details/123001758

3.
https://juejin.cn/post/7057032581165875231