2024年8月

作者:来自 vivo 互联网服务器团队- Li Gang

本文介绍了一次 MySQL 数据迁移的流程,通过方案选型、业务改造、双写迁移最终实现了亿级数据的迁移。

一、背景

预约业务是 vivo 游戏中心的重要业务之一。由于历史原因,预约业务数据表与其他业务数据表存储在同一个数据库中。当其他业务出现慢 SQL 等异常情况时,可能会直接影响到预约业务,从而降低系统整体的可靠性和稳定性。为了尽可能提高系统的稳定性和数据隔离性,我们迫切需要将预约相关数据表从原来的数据库中迁移出来,单独建立一个预约业务的数据库。

二、方案选型

常见的迁移方案大致可以分为以下几类:

image

而预约业务有以下特点:

  • 读写场景多,频率高,在用户预约/取消预约/福利发放等场景均涉及到大量的读写。
  • 不可接受停机,停机不可避免的会造成经济损失,在有其他方案的情况下不适合选择此方案。
  • 大部分的场景能接受秒级的数据不一致,少部分不能。

结合这些特点,我们再评估下上面的方案:

image

停机迁移方案需要停机,不适用于预约场景。预约场景存在不活跃的用户数据,如果用渐进式迁移方案的话很难迁移干净,可能还需要再写一个迁移任务来辅助完成迁移。而双写方案最大的优势在于每一步操作都可向上回滚,能尽可能的保证业务不出问题。

因此,最终选择的是双写方案。预约业务涉及到的读写场景多,每一个场景单独进行改造的成本大,采用 Mybatis 插件来实现迁移所需的双写等功能,可以有效降低改造成本。

三、前期准备

3.1 全量同步&增量同步&一致性校验

这几步使用了公司提供的数据同步工具。全量同步基于 MySQLDump 实现;增量同步基于 binlog 实现;一致性校验通过在新老库各选一个分块,然后聚合列数据计算并对比其特征值实现。

3.2 代码改造

引入了新库,那自然就需要在项目里新建数据源,并创建表对应的 Mybatis Mapper 类。这里有一个小细节需要注意,Mybatis 默认的 BeanNameGenerator 是 AnnotationBeanNameGenerator,它会使用类名作为 BeanName 注册到 Spring 的 ioc 容器中,Spring 启动时如果发现有了两个重名 Bean 就会启动失败,笔者这里给 Mybatis 设置了一个新的 BeanNameGenerator ,使用类的全路径名作为 BeanName 解决了问题。

public class FullPathBeanNameGenerator implements BeanNameGenerator {
    @Override
    public String generateBeanName(BeanDefinition definition, BeanDefinitionRegistry registry) {
        return definition.getBeanClassName();
    }
}

还有一点是主键 id,本次预约迁移需要保证新老库主键 id 一致,预约业务没做分库分表,id 都是直接用 MySQL 的自增 id,没有用 id 生成器之类的中间件。因此插入新表时只需要使用插入老表后 Mybatis 自动设置好的 id 即可,这次迁移前先检查了一遍业务代码,确保插入语句都用了 Mybatis 的 useGeneratedKeys 功能来自动设置 id。

3.3 插件实现

Mybatis 插件可以拦截 SQL 语句执行过程中的某一点进行干预和处理,而 Executor 是 Mybatis 中负责执行 SQL 语句的核心组件。我们可以对 Executor 的 update 和 query 方法进行代理以实现迁移所需的功能。

插件需要为读写场景分别实现以下功能:

image

考虑到开关切换部分的代码逻辑较为简单,因此在下文中,笔者将不再过多介绍该部分的具体实现,而是着重介绍如何在插件中使用老库的执行语句来访问新的数据库。此外,代码里会涉及到 Mybatis 相关的一些概念,由于网上已经有较多详尽的资料,这里就不再赘述。

迁移插件代理了 Executor 的 query 和 update 方法,首先在插件里获取到当前执行的 SQL 语句所在的 Mapper 路径。

@Intercepts(
        {
                @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
                @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
                @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
        }
)
public class AppointMigrateInterceptor implements Interceptor {
 
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
 
        Object[] args = invocation.getArgs();
        // Mybatis插件代理的Executor的update或者query方法,第一个参数就是MappedStatement
        MappedStatement ms = (MappedStatement) args[0];
        SqlCommandType sqlCommandType = ms.getSqlCommandType();
        String id = ms.getId();
        // 从MappedStatement id中获取对应的Mapper接口文件全路径
        String sourceMapper = id.substring(0, id.lastIndexOf("."));
 
        // ...
    }
     
    // ...
}

得到老库 Mapper 路径后,将其转换为新库 Mapper 路径,再使用 Class.forName 获取到新库 Mapper 类,然后用新库的 sqlSessionFactory 开启 sqlSession,再获取反射调用所需的方法、对象、参数,在新库上执行语句。

protected Object invoke(Invocation invocation, TableConfiguration tableConfiguration) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
    // 获取 MappedStatement
    MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
 
    // 获取 Mybatis 封装好的入参,封装函数 MapperMethod.convertArgsToSqlCommandParam(Object[] args)
    Object parameter = invocation.getArgs()[1];
 
    // 使用 Class.forName 获取到的新库 Mapper
    Class<?> targetMapperClass = tableConfiguration.getTargetMapperClazz();
 
    // 使用新库的 sqlSessionFactory 创建 sqlSession
    SqlSession sqlSession = sqlSessionFactory.openSession();
    Object result = null;
    try{
        // 使用新库的 Mapper 路径获取对应的 MapperProxy 对象
        Object mapper = sqlSession.getMapper(targetMapperClass);
 
        // 将 Mybatis 封装好的参数转换为原始参数
        Object[] paramValues = getParamValue(parameter);
 
        // 使用 mappedStatement Id 从新库对应的 Mapper 里获取对应的方法
        Method method = getMethod(ms.getId(), targetMapperClass, paramValues);
        paramValues = fixNullParam(method, paramValues);
 
        // 反射调用新库 Mapper 的方法,本质上执行的是 MapperProxy.invoke
        result = method.invoke(mapper, paramValues);
    } finally {
        sqlSession.close();
    }
    return result;
}
 
private Object[] fixNullParam(Method method, Object[] paramValues) {
    if (method.getParameterTypes().length > 0 && paramValues.length == 0) {
        return new Object[]{null};
    }
    return paramValues;
}

上述代码里,getMethod 方法负责从新库 Mapper 类里找到对应的方法,以用于后续的反射调用。

private Method getMethod(String id, Class mapperClass) throws NoSuchMethodException {
    //获取参数对应的 class
    String methodName = id.substring(id.lastIndexOf(".") + 1);
    String key = id;
    // methodCache 用来缓存 MappedStatement 和对应的 Method,避免每次都从 Mapper 里查找
    Method method = methodCache.get(key);
    if (method == null){
        method = findMethodByMethodSignature(mapperClass, methodName);
        if (method == null){
            throw new NoSuchMethodException("No such method " + methodName + " in class " + mapperClass.getName());
        }
        methodCache.put(key,method);
    }
    return method;
}
 
private Method findMethodByMethodSignature(Class mapperClass,String methodName) throws NoSuchMethodException {
    // mybatis 的 Mapper 内的方法不支持重载,所以这里只要方法名匹配到了就行,不用进行参数的匹配
    Method method = null;
    for (Method m : mapperClass.getMethods()) {
        if (m.getName().equals(methodName)) {
            method = m;
            break;
        }
    }
    return method;
}

得到方法后,还需要得到反射调用所需的参数。Mybatis 执行到 Executor.update/query 方法时,参数已经经过   MapperMethod.convertArgsToSqlCommandParam(Object[] args) 方法封装,不能直接用来执行  MapperProxy.invoke ,需要转换后才可用。下图是MapperMethod.convertArgsToSqlCommandParam(Object[] args) 的封装过程,而下面的 getParamValue 是这个函数的逆过程。

image

private Object[] getParamValue(Object parameter) {
    List<Object> paramValues = new ArrayList<>();
 
    if (parameter instanceof Map) {
        Map<String, Object> paramMap = (Map<String, Object>) parameter;
        if (paramMap.containsKey("collection")) {
            paramValues.add(paramMap.get("collection"));
        } else if (paramMap.containsKey("array")) {
            paramValues.add(paramMap.get("array"));
        } else {
            int count = 1;
            while (count <= paramMap.size() / 2){
                try {
                    paramValues.add(paramMap.get("param"+(count++)));
                }catch (BindingException e){
                    break;
                }
            }
        }
    } else if (parameter != null){
        paramValues.add(parameter);
    }
    return paramValues.toArray();
}

通过上述流程,我们就能使用 Mybatis 插件拦截老库的执行过程,实现迁移所需的读写数据源切换/新老库查询结果对比/先写老库再异步写新库等功能。

四、双写流程

4.1 上线双写改造后的业务代码,上线时只读写老库

  1. 读开关:只读老库
  2. 写开关:只写老库
  3. 新老库查询结果对比开关:关

此时业务仍只读写老库。

4.2 使用公司中间件平台提供的数据工具同步老库数据到新库

  1. 读开关:只读老库
  2. 写开关:只写老库
  3. 新老库查询结果对比开关:关

第1步和第2步并没有严格的顺序要求,只要在切换为双写前做完第1步和第2步就好。
条件允许的情况下,全量+增量同步时应选择不对外提供服务的离线从库作为数据源,避免主从延迟等问题对线上业务造成影响。

4.3 停止同步程序,然后开启双写

  1. 读开关:只读老库(开启查询结果对比开关)
  2. 写开关:双写
  3. 新老库查询结果对比开关:开

老库追上新库后,对数据做一次全量校验,避免出现数据不一致的情况。此外还需要开启新老库查询结果对比开关,通过日志监控观察新老库的查询结果是否一致。

停止数据同步和切换双写之间必然有时间差,如果先开启双写再停止数据同步,则可能出现插入重复数据或数据被覆盖的情况。因此需要对数据同步工具和迁移插件进行改造,以处理数据异常的情况,但是这样改造需要处理的情况较多,改造成本较高。所以这里选择先停止同步,再切换到双写,中间丢失的数据使用对比&补偿任务恢复,由于此时仍然全量读老库,所以对业务不会有影响。需要注意的是,双写阶段的时间不应太长,只要确保新老库数据一致就应该前进到下一步。

这一步在实际操作过程中需要注意以下情况:

4.3.1 自增主键

预约业务新库的主键 id 需要和老库保持一致,因此在迁移前检查了一遍业务代码,确保插入语句都用了 Mybatis 的 useGeneratedKeys 功能来返回 id ,这样插入新库时可以直接用设置好 id 的对象。但是这里有一个问题,批量插入时 Mybatis 自动设置的 id 和数据库生成的自增主键不一定完全一致,比如批量 insert ignore 和 on duplicate key update 语句。

这个问题和 useGeneratedKeys 的实现有关,代码可参考com.mysql.jdbc.StatementImpl#getGeneratedKeysInternal(long) 函数,以下是其执行逻辑:

  1. Mybatis 执行完插入语句后,MySQL 会返回这次插入影响的数据行数,注意,使用 insert ignore 插入时,忽略的那部分数据不会加到影响的行数上。
  2. Mybatis 使用 SELECT LAST_INSERT_ID() 查询这次插入的最小 id 。
  3. Mybatis 循环遍历插入时用的对象列表,循环的最大次数为第1步里获取的这次插入影响的行数,使用 n 代表当前的循环次数,列表中的每个对象的 id 被赋值为 LAST_INSERT_ID() + n*AUTO_INCREMENT 。

举例来说,假设老库的某张表里有数据 b ,其 id=1,此时往该表使用 insert ignore 批量插入三条数据 a,b,c,其在表内的 id 为 a:2、b:1、c:3,返回的影响行数为2,SELECT LAST_INSERT_ID() 返回的是2,因此 Mybatis 往对象里设置的主键分别为 a:2、b:3、c:null,再使用这个设置好 id 的对象列表插入新库时会导致新老库 id 不一致。

解决方案:由于直接删除 ignore 会改变这条 SQL 的语义,无法通过修改语句来解决问题。所以我们只能在迁移插件里跳过这条语句,使其固定写入老库。然后在业务层单独对其进行迁移改造,将插入新库的流程修改为先使用 id 以外的唯一键查询一次老库的数据,获取到 id 以后设置到对象列表里,再插入新库。

4.3.2 事务

预约业务有部分逻辑用到了事务,但这部分逻辑在双写期间均可以暂停功能,因此迁移插件没有实现事务的支持。如果需要支持业务的话可以不依赖插件,在业务层单独对那部分代码进行改造。

4.3.3 异步写入新库引起的问题

双写过程中是异步写新库,需要重点关注是否会有线程安全问题。举例来说,假设有个业务需要往表里插入一个列表,插入完列表后又对列表进行了修改,比如执行了 List.clear() 函数或者其中的对象发生了变更,由于是异步写新库,所以实际的执行流程可能如下:

  1. 老库 insert(list)
  2. list.clear()
  3. 新库 insert(list)

这会导致新库执行操作时,传入的对象和老库执行操作时不一样,导致新老库数据不一致。建议在迁移前人为的确认业务逻辑,避免异步写入导致新老库数据不一致。

4.4 开启对比和补偿程序,补偿切换开关的过程中遗失的数据

  1. 读开关:只读老库(对比开关开启)
  2. 写开关:双写
  3. 新老库查询结果对比开关:开
  4. 对比&补偿任务:开启

image

该对比&补偿任务有一个缺陷,其不能处理数据被删除的情况,如果老库里的数据被删除但是新库的数据删除失败,那使用更新时间区间就无法从老库查出这条数据,自然也无法进行对比&补偿。

双写期间,如果出现删老库成功但是删新库失败的情况会有日志告警,所以不会有问题。但是停止数据同步工具 → 开启双写开关这一过程中删除的数据无法补偿。不过大部分业务用的都是逻辑删除,只有一处用了物理删除,笔者在这一处添加了日志,如果切换过程中出现删除数据的日志,就需要手动进行补偿操作。实际操作过程中,开关的切换的耗时较短,只花了30秒左右,在这过程中没有打印删除数据的日志。

4.5 逐步切量请求到新库上

  1. 读开关:部分读新库 → 只读新库
  2. 写开关:双写
  3. 新老库查询结果对比开关:开
  4. 对比&补偿任务:开启

双写时,由于数据先写入老库再异步写入新库,因此新库的数据肯定会滞后于老库。如果将一部分读流量切换到新库上,就可能会在一些对延迟要求较高的业务场景中出现问题。对于这种场景,我们不能采用逐步切量的策略,只能同时切换读写开关,将其修改为只写老库+只读新库。

4.6 停止对比补偿程序,关闭双写,读写都切换到新库,开启反向补偿任务

  1. 读开关:只读新库
  2. 写开关:只写新库
  3. 新老库查询结果对比开关:关
  4. 对比&补偿任务:开启反向补偿

反向补偿是从新库补偿数据到老库,由于该任务是定时执行,开启后,新库和老库的数据会有 1~2 分钟的延迟,万一写新库的逻辑有问题,可以切回老库。至于为什么用反向补偿任务而不是使用先写新库再异步写老库的策略,是因为双写是用 MyBatis 插件实现的,插件代理的是 excutor 的 update 和 query 方法,如果异步写入老库,有可能会发生以下情况:

  1. 假设有两个线程,业务线程 A 需要写入一条数据,迁移插件拦截后,先同步写入新库,写完新库后提交任务给线程 B 中异步写入老库,提交完任务后插件立刻返回。
  2. 由于插件已返回结果,executor 上层的 sqlsession 调用 close() 方法关闭 executor (见 org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor#invoke ),此时线程 B 可能还没执行完写老库的操作。
  3. 线程 B 执行过程中,由于 executor 已经关闭,导致其写老库失败。

因此无法使用 Mybatis 插件来实现异步写老库。

4.7 停止反向补偿任务,删除表迁移相关代码

停止反向补偿前,需要关注是否还有业务在读老库。观察一段时间,确认老库没有补偿任务以外的读写流量后,可以关闭补偿任务,清理迁移过程中产生的代码,清理老库数据。

五、总结

在进行数据表迁移的过程中,虽然遇到了一些问题,但是制定的方案中每一步都有回退措施,即使出现问题也不会影响业务的正常运行。此外,笔者在迁移过程中对各种异常情况进行了监控,能及时发现并解决问题。如果其他业务需要进行类似的迁移,需要关注以下几个方面:

  1. 迁移插件实现:在对迁移过程进行反思后,笔者人为通过代理或重写 MapperProxy 的方式来实现迁移插件可能是更加合理的方案。这种方案有两个优点:一方面,可以避免处理 Mybatis 复杂的参数转换流程,从而减少潜在的错误和异常;另一方面,可以实现先写新库再异步写老库的操作。但是这个方案没有经过实践,还不能确定是否有可行性。
  2. 自增主键:需要确定业务是否需要保证新老库的 id 一致。
  3. 事务:双写过程中应该结合业务考虑是否需要实现事务支持。本次迁移过程中,我们暂停了部分需要事务支持的业务。
  4. 异步写入:先写老库再异步写入新库的方式可能导致新老库数据不一致,迁移插件自身无法解决这个问题,只能人工提前排查可能存在的隐患。

Dapr是一套开源、可移植的事件驱动型运行时,允许开发人员轻松立足云端与边缘位置运行弹性、微服务、无状态以及有状态等应用程序类型。Dapr能够确保开发人员专注于编写业务逻辑,而不必分神于解决分布式系统难题,由此显著提高生产力并缩短开发时长。
Dapr
是用于构建云原生应用程序的开发人员框架,可以更轻松帮助开发人员在 Kubernetes 上构建运行多个微服务,并与外部状态存储/数据库、机密存储、发布/订阅代理以及其他云服务和自托管解决方案进行交互。

2024年8月14日正式发布了1.14版本,Dapr v1.14的发布标志着分布式应用程序运行时的重要更新。这个版本引入了许多新特性和改进,旨在帮助开发者构建更安全、可靠的微服务。以下是Dapr v1.14的一些主要亮点:

  1. 作业API和调度服务(预览) :工作API是一个用于调度未来任务的编排器,可以在特定时间或特定间隔内执行。这适用于多种场景,如自动数据库备份、定期数据处理和ETL、电子邮件通知、维护任务和系统更新以及批处理等。Dapr的工作API确保这些场景中的任务能够一致且可靠地执行,提高效率并降低错误风险。此外,Dapr还引入了一个新的调度服务,这是一个控制平面服务,用于调度actor提醒。

  2. API更新 :Dapr v1.14对API进行了更新,以支持新的特性和组件。这些更新包括对作业API的引入,该API用于调度未来任务,无论是特定时间还是特定间隔。 使用Actor和Workflow时,性能改进可提高吞吐量并降低延迟,Actor多租户使用命名空间来隔离相同的Actor类型,防止名称冲突。流式订阅,用于动态主题订阅,无需重新启动sidecar。改进的HTTP指标过滤,通过路径匹配防止高基数导致过度的CPU和内存使用。出站消息投影,支持跨多个发布/订阅代理和状态存储的事务提交。

  3. Dapr Shared(预览): 默认情况下,Dapr 作为 sidecar 注入,以便为您的应用程序启用 Dapr API,以实现最佳的可用性和可靠性。Dapr Shared 支持两种替代部署策略来创建 Dapr 应用程序,使用 Kubernetes Daemonset 进行每节点部署或 Deployment 进行每集群部署。
  4. Dapr的构建块 :Dapr提供了一系列分布式系统的构建块,用于以标准方式构建微服务应用程序并部署到任何环境。这些构建块API是独立的,意味着可以在应用程序中使用任意数量的它们。

  5. 平台无关性 :Dapr是平台无关的,可以在本地、任何Kubernetes集群、虚拟或物理机器以及其他Dapr集成的托管环境中运行应用程序。这使得可以在云和边缘运行微服务应用程序。

  6. 升级注意事项 :需要注意的是,这个版本包含一些破坏性变化。有关升级到Dapr v1.14的信息,请参考官方文档中的相关部分。

总的来说,Dapr v1.14的发布为开发者提供了更加强大和灵活的工具,以构建和部署分布式应用程序。这些更新不仅增强了Dapr的功能性,还提高了其适用性和效率,使其成为构建现代微服务架构的首选平台之一,关于Dapr v1.14的更详细内容参看官方博客文章: https://blog.dapr.io/posts/2024/08/14/dapr-v1.14-is-now-available/


相关链接:

导航

  • 业务背景
  • 问题分析与定位
  • 探索可行的解决方案
    • 数据库层面处理——唯一索引
    • 应用程序层面处理——分布式锁
  • 分布式锁概述
    • 分布式锁需要具备哪些特性?
    • 分布式锁有哪些实现方式?
      • 基于数据库的实现方式
      • 基于Redisson实现方式
  • Redission介绍
    • 概述
    • 可重入锁
  • 基于Redisson解决方案
    • 方案梳理
    • Springboot集成Redisson
  • 结语
  • 参考

本文首发
《使用分布式锁解决IM聊天数据重复插入的问题》

业务背景和问题

在IM聊天业务中,除了自建聊天服务器,构架闭环的咨询聊天,往往还需要接入三方的平台的IM流量。

这个就不得不去适配各种平台的推流方式。

在我们自建的IM聊天服务解决方案中,IM会话创建和消息的接收是两个独立模块(接口)。
这种设计方式从客户端层面就将两个流程分开且保证了顺序性,有效避免了一些不可预知的问题。

但是,三方流量平台的是通过消息推流的方式将流量投递给我们,我们必须在接收流量的过程中完成客户、会话、消息的创建。



如果所有消息是排队,一个一个执行,那么这个流程是没有问题的。

但是,我们发现三方推送消息的时候偶尔会发生推送同一客户的多条消息的情况,这种并发写入,导致数据重复写入。

这种情况下,就可能会导致新客户创建多次,对应的会话也会创建多个。



而且还会带来数据查询中偶尔出现
selectOne
的异常。

desc":"org.mybatis.spring.MyBatisSystemException: nested exception is org.apache.ibatis.exceptions.TooManyResultsException: Expected one result (or null) to be returned by selectOne(), but found: 2

在没有查明具体问题之前,我们在特定查询的时候增加了
limit 1
限制,原则上取最新的那一条。

问题分析与定位

对于聊天场景来说,这种脏数据的产生是不能容忍的。

为了找到问题的根本解决办法,我们开始专项排查。

我把代码走读了一遍,发现代码层面没有明显的bug。但是,从数据上来看大概率是消息并发投递导致。

为了证明这种猜想,我编写了一个测试用例来验证。

具体做法,就是写一个python脚本程序,模拟10个线程,每个线程都会调用消息接收的业务接口,并且每个消息的
fromUser

toUser
都是一样的。

核心思想就是同时推送给一个人多条消息。

经过验证,数据重复写入的问题复现了。并发请求原因已经实锤。

这里给个简单示意图,解释一下并发请求的流程。



可行性方案探索

我们自己也思考了一下,大致的解决方案有两种:

  • 数据层面解决
  • 应用程序层面解决

数据层面解决

这个很好理解,利用Mysql字段唯一索引阻止重复插入,这是数据库自己的机制。
但是,因为
user
表中
tenantUserId
字段最初就为设计唯一索引。

ALTER TABLE user ADD UNIQUE uk_tenant_user_id( tenantUserId );

一旦为
tenantUserId
列加上唯一索引后,当上述并发情况发生时,请求1和请求2中必然有一者会优先完成数据的插入操作,而另一者则会得到类似错误。因此,最终保证
user
表中只有一条
tenantUserId
=xxx的记录存在。

 Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'xxx' for key 'tenantUserId'\n##

经过评估,目前单表已经仅仅2000w数据。短时间内升级不太现实。
而且历史数据的修复也不是一个小工程。

应用程序层面解决

另一种解决的思路是我们不依赖底层的数据库来为我们提供唯一性的保障,而是靠应用程序自身的代码逻辑来避免并发冲突。

之所以我们会遇到重复插入数据的问题,是因为“检测数据是否已经存在”和“插入数据”两个动作被分割开来。由于这两个步骤不具备原子性,才导致两个不同的请求可以同时通过第一步的检测。如果我们能够把这两个动作合并为一个原子操作,就可以避免数据冲突了。这时候我们就需要通过加锁,来实现这个代码块的原子性。


考虑到我们的应用程序API是多机部署的,我们决定采用业界比较成熟的分布式锁方案。

分布式锁概述

分布式锁需要具备哪些特性?

  • 在分布式系统环境下,同一时间只有一台机器的一个线程可以获取到锁
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性
  • 具备锁失效机制,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁实现主要有如下三种:

  • 基于数据库实现分布式锁
  • 基于Zookeeper实现分布式锁
  • 基于Redis实现分布式锁

每种的具体实现可以参考
《什么是分布式锁?实现分布式锁的三种方式》

除了以上三种分布式锁实现以外,还有一种是基于
Redission
实现方式。
因为我们业务接口是基于Springboot框架,所以查阅了相关资料我们选择一种
Redission
实现。

Redission介绍

概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

Redisson作为独立节点 可以用于独立执行其他节点发布到分布式执行服务 和 分布式调度任务服务 里的远程任务。



可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改
Config.lockWatchdogTimeout
来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

另外Redisson还通过加锁的方法提供了
leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

Redisson同时还为分布式锁提供了异步执行的相关方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.

关于
Redisson
的更多介绍请移步
Redisson 中文文档

基于Redisson解决方案

在本案例中,我们采用了基于Redisson实现分布式锁的方式。

方案梳理

技术方案确定了,但是还是需要结合实际场景合理应用。
那么,我们在哪些环节加锁呢?



我们再次对消息接收处理流程进行梳理,在原来的基础上增加了分布式锁。

Springboot集成Redisson

pom.xml中引入redisson

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.34.1</version>
</dependency>  

yml文件中redis配置

  redis:
    enabled: true
    host: xxxx
    port: 6371
    password: xxx
    database: 2
    timeout: 10000
    connectionPoolSize: 15
    connectionMinimumIdleSize: 5

redissonConfig.java

@Configuration
@ConditionalOnExpression("${spring.redis.enabled}")
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.timeout}")
    private String timeout;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.database}")
    private int database;

    @Value("${spring.redis.connectionPoolSize}")
    private int connectionPoolSize;

    @Value("${spring.redis.connectionMinimumIdleSize}")
    private int connectionMinimumIdleSize;

    @Bean(name = "redissonClient")
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.setCodec(new StringCodec());
        SingleServerConfig singleServerConfig =
                config.useSingleServer()
                        .setAddress("redis://" + host + ":" + port)
                        .setDatabase(database)
                        .setConnectionPoolSize(connectionPoolSize)
                        .setConnectionMinimumIdleSize(connectionMinimumIdleSize)
                        .setTimeout(Integer.parseInt(timeout));
        if (StringUtils.isNotBlank(password)) {
            singleServerConfig.setPassword(password);
        }
        return Redisson.create(config);
    }
}

上面准备好之后,就可以在使用了。

核心代码实现

        //新创建增加分布式锁
        String mutex = StrUtil.format("im:lock:user:{}", createUserDto.getTenantUserId());
        RLock lock = redissonClient.getLock(mutex);
        boolean successLock = lock.tryLock();
        if (!successLock) {
            // 获取分布式锁失败
            log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【getOrCreateUser】", JsonUtil.toJson(createUserDto)));
            throw new BizException("该顾客已经在创建中了", ResponseCodeEnum.GET_R_LOCK_FAIL.getCode());
        }
        //创建用户
        User visitor = new User();
        visitor.setUserName(createUserDto.getTenantUserId());
        //...
        //消息创建过程中,首次创建顾客、会话,
        //在获取锁失败的情况下,增加重试机制
        try {
            receiveMessage(inputDto);
        }catch (BizException ex)
        {
            log.error(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage】", ex));
            if(ex.getCode().equals(ResponseCodeEnum.GET_R_LOCK_FAIL.getCode())) {
                //重试一次
                Thread.sleep(1000);
                log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage.retry】", JsonUtil.toJson(inputDto)));
                receiveMessage(inputDto);
            }
        }

Notes: 关于Springboot中如何使用Redisson,更加具体实现代码请移步
《Spring Boot 实战纪实》
,项目源码中可以查阅。

测试用例

确保写的代码是可调式的。-
《对几次通宵加班发版的复盘和思考》

在这么多年的职业生涯中,我逐渐摸索出一个确保代码质量的笨方法——单步调试。

这里我们也写一个测试用例。具体是思路前面也提过,这里不再赘述。

import json
import requests
import time
import uuid
import threading

def receive_xhs_msg():

    try:
          #请求url
          url = """http://localhost:7071/api/message/receive"""

          # 增加请求头
          headers = {
            "Content-Type": "application/json; charset=UTF-8"
          }
          message_id=str(uuid.uuid4())
          print('message_id:'+message_id)
          userInfo={
                "header_image":"xxx.jpg",
                "nickname":"- ",
                "user_id":"63038d28000000001200d311"
          }
   
          payload={
                    "content":"6ED5KduMqTDJZ1ztw+ZPgw==~split~OMo7DD2gqsJqBafx9WKsZlnNNkcEYD4hLLPczczIFmr+YMtTB9Wz4ZI0MYCM4cF28kG7rfqnXdR9cRmamEJzHmKLfTmVxv5jzGUFVQOU00iimtunMAEJ4x76oJDrdAVUc4bJfV5zFLotz/Bm0WM9TADvD2cLhpHsVmaZRXaiJ96wMQgqx+K727l5S15jmMa5PiLqZqBO2q/G+WEkJSbfLQ==",
                    "from_user_id":"63038d28000000001200d311",
                    "intentCommentId":"",
                    "message_id":message_id,
                    "message_source":2,
                    "message_type":"HINT",
                    "timestamp":"1723268668573",
                    "to_user_id":"575d2c135e87e733f0162b88",
                    "user_info":[userInfo]
            }

          #转换成json
          getJson=json.dumps(payload)
          #构造发送请求
          response=requests.post(url=url,data=getJson,headers=headers)
          #打印响应数据
          print(response.text)
          time.sleep(1)
    except Exception as e:
          print('Error:',e)
    finally:
          print('执行完成')

if __name__ == '__main__':
      threads = []
      for _ in range(10):  # 循环创建10个线程
            t = threading.Thread(target=receive_xhs_msg)
            threads.append(t)
      for t in threads:  # 循环启动10个线程
            t.start()
            t.join()

结语

分布式锁在日常工作中应用广泛,比如接口防抖(防重复提交),并发处理等。

在近期的IM消息处理中,正好有了一次生动的实践。

一点浅浅的经验,分享给大家,希望能起到抛砖引玉的作用。

参考

本文书接上回《
一种很变态但有效的DDD建模沟通方式
》,关注公众号(老肖想当外语大佬)获取信息:

  1. 最新文章更新;

  2. DDD框架源码(.NET、Java双平台);

  3. 加群畅聊,建模分析、技术交流;

  4. 视频和直播在B站。

终于到了写代码的环节

如果你已经阅读过本系列前面的所有文章,我相信你对需求分析和建模设计有了更深刻的理解,那么就可以实现“需求-模型-代码”三者一致性的前半部分,如下图所示:

图片

那么接下来,我们来分析一下如何实现“模型-代码”的一致性,尝试通过一篇文章的篇幅,展示符合DDD价值判断的代码组织方式的关键部分,初步窥探一下DDD实践的代码样貌:

图片

领域模型与充血模型

现在假设我们通过需求分析,完成了对模型的设计,并推演确认模型满足提出的所有需求,既然模型满足需求,那么意味着我们设计的模型具备下面特征:

  1. 每个模型有自己明确的职责,这些职责分别对应这着不同的需求点;

  2. 每个模型都包含自己履行职责所需要的所有属性信息;

  3. 每个模型都包含履行职责行为能力,并可以发出对应行为产生的事件;

那么提炼下来,我们会发现模型必须是“
充血模型
”,即同时包含属性和行为,模型与代码的对应关系如下:

图片

我们可以类图来表达模型,即一个聚合根,也可以称之为一个领域,当然一个聚合根可以包含一些复杂类型属性或集合属性,下图示意了一个简单的用户聚合:

图片

下面展示了该模型的示例代码:

Java代码:

package com.yourcompany.domain.aggregates;

import com.yourcompany.domain.aggregates.events.*;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
import org.hibernate.annotations.Fetch;
import org.hibernate.annotations.FetchMode;
import org.hibernate.annotations.SQLDelete;
import org.hibernate.annotations.Where;
import org.netcorepal.cap4j.ddd.domain.event.annotation.DomainEvent;
import org.netcorepal.cap4j.ddd.domain.event.impl.DefaultDomainEventSupervisor;

import javax.persistence.*;

/**
 * 用户
 * <p>
 * 本文件由[cap4j-ddd-codegen-maven-plugin]生成
 * 警告:请勿手工修改该文件的字段声明,重新生成会覆盖字段声明
 */
/* @AggregateRoot */
@Entity
@Table(name = "`user`")
@DynamicInsert
@DynamicUpdate

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Getter
public class User {

    // 【行为方法开始】

    public void init() {
        DefaultDomainEventSupervisor.instance.attach(UserCreatedDomainEvent.builder()
                .user(this)
                .build(), this);
    }

    public void changeEmail(String email) {
        this.email = email;
        DefaultDomainEventSupervisor.instance.attach(UserEmailChangedDomainEvent.builder()
                .user(this)
                .build(), this);
    }

    // 【行为方法结束】


    // 【字段映射开始】本段落由[cap4j-ddd-codegen-maven-plugin]维护,请不要手工改动

    @Id
    @GeneratedValue(generator = "org.netcorepal.cap4j.ddd.application.distributed.SnowflakeIdentifierGenerator")
    @GenericGenerator(name = "org.netcorepal.cap4j.ddd.application.distributed.SnowflakeIdentifierGenerator", strategy = "org.netcorepal.cap4j.ddd.application.distributed.SnowflakeIdentifierGenerator")
    @Column(name = "`id`")
    Long id;


    /**
     * varchar(100)
     */
    @Column(name = "`name`")
    String name;

    /**
     * varchar(100)
     */
    @Column(name = "`email`")
    String email;

    // 【字段映射结束】本段落由[cap4j-ddd-codegen-maven-plugin]维护,请不要手工改动
}

C#代码:

图片

领域事件的定义如下:

Java代码:

package com.yourcompany.domain.aggregates.events;

import com.yourcompany.domain.aggregates.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.netcorepal.cap4j.ddd.domain.event.annotation.DomainEvent;

/**
 * 用户创建事件
 */
@DomainEvent
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserCreatedDomainEvent {
    User user;
}
package com.yourcompany.domain.aggregates.events;

import com.yourcompany.domain.aggregates.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.netcorepal.cap4j.ddd.domain.event.annotation.DomainEvent;
/**
 * 用户邮箱变更事件
 */
@DomainEvent
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserEmailChangedDomainEvent {
    User user;
}

C#代码:

//定义领域事件
using NetCorePal.Extensions.Domain;
namespace YourNamespace;

public record UserCreatedDomainEvent(User user) : IDomainEvent;

public record UserEmailChangedDomainEvent(User user) : IDomainEvent;

至此,我们的一个领域模型的代码就完成了。

领域模型之外的关键要素

让我们再回到“模型拟人化”的类比上,想象一下在企业里一个任务是怎么被完成的,下图展示了一个典型流程:

图片

如果我们将这个过程对应到软件系统,可以得到如下流程:

图片

根据上面的对应我可以知道除了领域模型之外,其他的关键要素:

  1. Controller

  2. Command与CommandHandler

  3. DomainEventHandler

接下来,我们分别对这些部分进行说明

Controller

有过web项目开发经验的开发者,对Controller并不陌生,它是web服务与前端交互的入口,在这里Controller的主要职责是:

  1. 接收外部输入

  2. 将请求输入及当前用户会话等信息组装成命令

  3. 发出/执行命令

  4. 响应命令执行结果

Java代码:

package com.yourcompany.adapter.portal.api;

import com.yourcompany.adapter.portal.api._share.ResponseData;
import com.yourcompany.application.commands.CreateUserCommand;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;

/**
 * 用户控制器
 */
@Tag(name = "用户")
@RestController
@RequestMapping(value = "/api/user")
@Slf4j
public class UserController {

    @Autowired
    CreateUserCommand.Handler createUserCommandHandler;

    @PostMapping("/")
    public ResponseData<Long> createUserCommand(@RequestBody @Valid CreateUserCommand cmd) {
        Long result = createUserCommandHandler.exec(cmd);
        return ResponseData.success(result);
    }
}

C#代码:

[Route("api/[controller]")]
[ApiController]
public class UserController(IMediator mediator) : ControllerBase
{
    [HttpPost]
    public async Task<ResponseData<UserId>> Post([FromBody] CreateUserRequest request)
    {
        var cmd = new CreateUserCommand(request.Name, request.Email);
        var id = await mediator.Send(cmd);
        return id.AsResponseData();
    }
}

===

===

Command与CommandHandler

基于前面的对应关系,Command对应任务,那么我们可以这样理解:

  1. Command是执行任务所需要的信息

  2. CommandHandler负责将命令信息传递给领域模型

  3. CommandHandler最后要将领域模型持久化

下面是一个简单的示例:

Java代码:

package com.yourcompany.application.commands;

import com.yourcompany.domain.aggregates.User;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.netcorepal.cap4j.ddd.application.command.Command;
import org.netcorepal.cap4j.ddd.domain.repo.AggregateRepository;
import org.netcorepal.cap4j.ddd.domain.repo.UnitOfWork;
import org.springframework.stereotype.Service;


/**
 * 创建用户命令
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CreateUserCommand {
    String name;
    String email;

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public static class Handler implements Command<CreateUserCommand, Long> {
        private final AggregateRepository<User, Long> repo;
        private final UnitOfWork unitOfWork;

        @Override
        public Long exec(CreateUserCommand cmd) {
            User user = User.builder()
                    .name(cmd.name)
                    .email(cmd.email)
                    .build();
            user.init();
            unitOfWork.persist(user);
            unitOfWork.save();
            return user.getId();
        }
    }
}

C#代码:

public record CreateUserCommand(string Name, string Email) : ICommand<UserId>;

public class CreateUserCommandHandler(IUserRepository userRepository) 
    : ICommandHandler<CreateUserCommand, UserId>
{
    public async Task<UserId> Handle(CreateUserCommand request, CancellationToken cancellationToken)
    {
        var user = new User(request.Name, request.Email);
        user = await userRepository.AddAsync(user, cancellationToken);
        return user.Id;
    }
}

===

===

DomainEventHandler

当我们的命令执行完成,领域模型会产生领域事件,那么关心领域事件,期望在领域事件发生时执行一些操作,就可以使用DomainEventHandler来完成:

  1. DomainEventHandler根据事件信息产生新的命令并发出

  2. 每个DomainEventHandler只做一件事,即只发出一个命令

Java代码:

package com.yourcompany.application.subscribers;

import com.yourcompany.application.commands.DoSomethingCommand;
import com.yourcompany.domain.aggregates.events.UserCreatedDomainEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

/**
 * 用户创建领域事件
 */
@Service
@RequiredArgsConstructor
public class UserCreatedDomainEventHandler {
    private final DoSomethingCommand.Handler handler;

    @EventListener(UserCreatedDomainEvent.class)
    public void handle(UserCreatedDomainEvent event) {
        handler.exec(DoSomethingCommand.builder()
                .param(event.getUser().getId())
                .build());
    }
}

C#代码:

public class UserCreatedDomainEventHandler(IMediator mediator) 
           : IDomainEventHandler<UserCreatedDomainEvent>
{
    public Task Handle(UserCreatedDomainEvent notification, CancellationToken cancellationToken)
    {
        return mediator.Send(new DoSomethingCommand(notification.User.Id), cancellationToken);
    }
}

===

===

模型的持久化

在前文,我们一直强调一个观点,“在设计模型时忘掉数据库”,那么当我们完成模型设计之后,如何将模型存储进数据库呢?通常我们会使用仓储模式在负责模型的“存取”操作,下面代码示意了一个仓储具备的基本能力以及仓储的定义,略微不同的是,我们实现了
工作单元模式
(UnitOfWork),以屏蔽数据库的“增删改查”语义,我们只需要从仓储中“取出模型”、“操作模型”、“保存模型”即可。

Java代码:

package com.yourcompany.adapter.domain.repositories;

import com.yourcompany.domain.aggregates.User;

/**
 * 本文件由[cap4j-ddd-codegen-maven-plugin]生成
 */
public interface UserRepository extends org.netcorepal.cap4j.ddd.domain.repo.AggregateRepository<User, Long> {
    // 【自定义代码开始】本段落之外代码由[cap4j-ddd-codegen-maven-plugin]维护,请不要手工改动

    @org.springframework.stereotype.Component
    public static class UserJpaRepositoryAdapter extends org.netcorepal.cap4j.ddd.domain.repo.AbstractJpaRepository<User, Long>
{
        public UserJpaRepositoryAdapter(org.springframework.data.jpa.repository.JpaSpecificationExecutor<User> jpaSpecificationExecutor, org.springframework.data.jpa.repository.JpaRepository<User, Long> jpaRepository) {
            super(jpaSpecificationExecutor, jpaRepository);
        }
    }

    // 【自定义代码结束】本段落之外代码由[cap4j-ddd-codegen-maven-plugin]维护,请不要手工改动
}

C#代码:

public interface IRepository<TEntity, TKey> : IRepository<TEntity>
  where TEntity : notnull, Entity<TKey>, IAggregateRoot
  where TKey : notnull
{
  IUnitOfWork UnitOfWork { get; }
  TEntity Add(TEntity entity);
  Task<TEntity> AddAsync(TEntity entity, CancellationToken cancellationToken = default (CancellationToken));
  int DeleteById(TKey id);
  Task<int> DeleteByIdAsync(TKey id, CancellationToken cancellationToken = default (CancellationToken));
  TEntity? Get(TKey id);
  Task<TEntity?> GetAsync(TKey id, CancellationToken cancellationToken = default (CancellationToken));
}


public interface IUserRepository : IRepository<User, UserId>
{
}

public class UserRepository(ApplicationDbContext context) 
    : RepositoryBase<User, UserId, ApplicationDbContext>(context), IUserRepository
{
}

===

===

查询的处理

下面展示了一个简单的查询的代码

Java代码:

package com.yourcompany.application.queries;

import com.yourcompany._share.exception.KnownException;
import com.yourcompany.domain.aggregates.User;
import com.yourcompany.domain.aggregates.schemas.UserSchema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.netcorepal.cap4j.ddd.application.query.Query;
import org.netcorepal.cap4j.ddd.domain.repo.AggregateRepository;
import org.springframework.stereotype.Service;


/**
 * 查询用户
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserQuery {
    private Long id;

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public static class Handler implements Query<UserQuery, UserQueryDto> {
        private final AggregateRepository<User, Long> repo;

        @Override
        public UserQueryDto exec(UserQuery param) {
            User entity = repo.findOne(UserSchema.specify(
                    root -> root.id().eq(param.id)
            )).orElseThrow(() -> new KnownException("不存在"));

            return UserQueryDto.builder()
                    .id(entity.getId())
                    .name(entity.getName())
                    .email(entity.getEmail())
                    .build();
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public static class UserQueryDto {
        private Long id;
        private String name;
        private String email;
    }
}

C#代码:

public class UserQuery(ApplicationDbContext applicationDbContext)
{
    public async Task<UserDto?> QueryOrder(UserId userId, CancellationToken cancellationToken)
    {
        return await applicationDbContext.Users.Where(p => p.Id == userId)
            .Select(p => new UserDto(p.Id, p.Name)).SingleOrDefault();
    }
}

===

===

CQRS似乎是唯一正解

我们在实际的软件系统中,查询往往是场景复杂的,不同的查询需求,可能打破模型的整体性,显然使用领域模型本身来满足这些需求是不现实的,那么就需要针对需求场景,组织对应的数据结构作为输出结果,这就与“CQRS”模式不谋而合,或者说“CQRS”就是为了解决这个问题而被提出的,并且这个模式与“命令-事件”的思维浑然一体,前面的代码示例也印证了这一点,因此我们认为DDD的实践落地,需要借助CQRS的模式。

图片

源码资料

本文示例分别使用了cap4j(Java)和netcorepal-cloud-framework(dotnet),欢迎参与项目讨论和贡献,项目地址如下:

https://github.com/netcorepal/cap4j

https://github.com/netcorepal/netcorepal-cloud-framework

开心一刻

今天心情不好,想约哥们喝点

我:心情不好,给你女朋友说一声,来我家,过来喝点

哥们:行!我给她说一声

我:你想吃啥?我点外卖

哥们:你俩定吧,我已经让她过去了

我:???我踏马让你过来!和她说一声

哥们:哈哈哈,我踏马寻思让她过去呢

成功给我逗笑了

前情回顾

SpringBoot2.7 霸王硬上弓 Logback1.3 → 不甜但解渴
实现了
spring-boot 2.x.x

logback 1.3.x
的集成,分两步

  1. 关闭 Spring Boot 的 LoggingSystem
  2. 配置文件用 logback.xml

从示例看,集成是成功的;但有些问题是没有分析的,比如

  1. System.setProperty("org.springframework.boot.logging.LoggingSystem", "none") 是如何生效的
  2. Spring Boot 的 LoggingSystem 是如何与日志组件绑定的
  3. Spring Boot 默认依赖 3 个日志组件:logback、log4j、jul,为什么默认启用的是 logback,而非其它两个?

基于如上 3 个问题,我们一起去翻一翻 Spring Boot 的源码;在看源码之前,我先带大家回顾一些内容,方便下文的源码分析

  1. 设计模式之观察者模式 → 事件机制的底层原理

    讲了观察者模式的实现,以及在 JDK 中的应用(JDK 事件模型)、Spring 中的应用(事件机制);大家可以重点看下 Spring 的那个案例,使用非常简单,总结一句就是


    SpringBoot 启动过程中发送的事件,所有 ApplicationListener 都会收到(即 onApplicationEvent 方法会被调用)

  2. spring-boot-2.0.3启动源码篇一 - SpringApplication构造方法

    大家不要通篇去读,重点看
    getSpringFactoriesInstances
    ,与本文息息相关的归纳成一句


    查找类路径下全部的 META-INF/spring.factories 的文件路径,并加载所有 spring.factories 中的内容到 SpringFactoriesLoader 的 cache 中,然后从缓存中获取 ApplicationListener 类型的类并进行实例化

下文是基于 Spring Boot 默认情况下的源码分析,而非集成 logback 1.3.x 的源码分析,大家注意下

集成 logback 1.3.x 需要关闭 Spring Boot 的 LoggingSystem,那还分析个毛

源码分析

问题来了,从哪开始跟?我就不绕圈子了,从
LoggingApplicationListener
开始跟,首先它在
META-INF/spring.factories

spring.factories_LoggingApplicationListener

其次它实现了
ApplicationListener

LoggingApplicationListener类图

那么 Spring Boot 在启动过程中会实例化
LoggingApplicationListener
,Spring Boot 启动过程中发送的事件都会来到
LoggingApplicationListener

onApplicationEvent
方法

@Override
public void onApplicationEvent(ApplicationEvent event) {
	if (event instanceof ApplicationStartingEvent) {
		onApplicationStartingEvent((ApplicationStartingEvent) event);
	}
	else if (event instanceof ApplicationEnvironmentPreparedEvent) {
		onApplicationEnvironmentPreparedEvent((ApplicationEnvironmentPreparedEvent) event);
	}
	else if (event instanceof ApplicationPreparedEvent) {
		onApplicationPreparedEvent((ApplicationPreparedEvent) event);
	}
	else if (event instanceof ContextClosedEvent) {
		onContextClosedEvent((ContextClosedEvent) event);
	}
	else if (event instanceof ApplicationFailedEvent) {
		onApplicationFailedEvent();
	}
}

Spring Boot 启动过程分不同的阶段,在每个阶段都会发送对应阶段的事件,
LoggingApplicationListener
针对这些事件会有不同的处理,我们暂且只需要关注以下事件

ApplicationStartingEvent,对应的处理方法:onApplicationStartingEvent

ApplicationEnvironmentPreparedEvent,对应的处理方法:onApplicationEnvironmentPreparedEvent

ApplicationPreparedEvent,对应的处理方法:onApplicationPreparedEvent

onApplicationStartingEvent

private void onApplicationStartingEvent(ApplicationStartingEvent event) {
	this.loggingSystem = LoggingSystem.get(event.getSpringApplication().getClassLoader());
	this.loggingSystem.beforeInitialize();
}

方法很简单,获取日志系统,然后调用其
beforeInitialize
方法,我们跟进
LoggingSystem.get

public static LoggingSystem get(ClassLoader classLoader) {
	String loggingSystemClassName = System.getProperty(SYSTEM_PROPERTY);
	if (StringUtils.hasLength(loggingSystemClassName)) {
		if (NONE.equals(loggingSystemClassName)) {
			return new NoOpLoggingSystem();
		}
		return get(classLoader, loggingSystemClassName);
	}
	LoggingSystem loggingSystem = SYSTEM_FACTORY.getLoggingSystem(classLoader);
	Assert.state(loggingSystem != null, "No suitable logging system located");
	return loggingSystem;
}

打个断点调试下,你们就会发现
SYSTEM_PROPERTY
的值是
org.springframework.boot.logging.LoggingSystem

system_property

从系统属性中获取
org.springframework.boot.logging.LoggingSystem
,是不是和

System.setProperty("org.springframework.boot.logging.LoggingSystem", "none") 是如何生效的

对应上了?如果获取的值是
none
,直接返回
NoOpLoggingSystem
实例

/**
 * {@link LoggingSystem} that does nothing.
 */
static class NoOpLoggingSystem extends LoggingSystem {

	@Override
	public void beforeInitialize() {

	}

	@Override
	public void setLogLevel(String loggerName, LogLevel level) {

	}

	@Override
	public List<LoggerConfiguration> getLoggerConfigurations() {
		return Collections.emptyList();
	}

	@Override
	public LoggerConfiguration getLoggerConfiguration(String loggerName) {
		return null;
	}

}

全是空实现,相当于关闭了 Spring Boot 的 LoggingSystem;
org.springframework.boot.logging.LoggingSystem
还可以设置成其他值,但需要有对应的实现。默认情况下
loggingSystemClassName
的值是
null
,会跳过 if 来到
SYSTEM_FACTORY.getLoggingSystem(classLoader);

@Override
public LoggingSystem getLoggingSystem(ClassLoader classLoader) {
	List<LoggingSystemFactory> delegates = (this.delegates != null) ? this.delegates.apply(classLoader) : null;
	if (delegates != null) {
		for (LoggingSystemFactory delegate : delegates) {
			LoggingSystem loggingSystem = delegate.getLoggingSystem(classLoader);
			if (loggingSystem != null) {
				return loggingSystem;
			}
		}
	}
	return null;
}

这里推荐用断点调试去跟源码,按
F7
之后会来到
LoggingSystemFactory#fromSpringFactories

/**
 * Return a {@link LoggingSystemFactory} backed by {@code spring.factories}.
 * @return a {@link LoggingSystemFactory} instance
 */
static LoggingSystemFactory fromSpringFactories() {
	return new DelegatingLoggingSystemFactory(
			(classLoader) -> SpringFactoriesLoader.loadFactories(LoggingSystemFactory.class, classLoader));
}

SpringFactoriesLoader.loadFactories
是不是很眼熟?(不眼熟的去看:
spring-boot-2.0.3启动源码篇一 - SpringApplication构造方法
)此时它会做三件事

  1. 从 SpringFactoriesLoader#cache 中获取 LoggingSystemFactory 类型的工厂类的类名列表


    spring.factories_LoggingSystemFactory

    之前已经加载到 SpringFactoriesLoader#cache 中,所以此时从缓存中获取;注意看三个实现类的顺序,
    LogbackLoggingSystem.Factory
    在最前面

  2. 实例化这些工厂类

  3. 对这些工厂类实例按 @Order 升序排序

    这三个工厂类的 @Order 值是一样的,都是
    @Order(Ordered.LOWEST_PRECEDENCE)
    ,所以顺序不变,
    LogbackLoggingSystem.Factory
    仍在最前面


    LoggingSystemFactory列表

回到
DelegatingLoggingSystemFactory#getLoggingSystem
,对这些工厂类实例逐个遍历,得到
LoggingSystem
立即返回,不再遍历后面的工厂实例;第一个遍历的的是
LogbackLoggingSystem.Factory
,调用其
getLoggingSystem
方法

private static final boolean PRESENT = ClassUtils.isPresent("ch.qos.logback.classic.LoggerContext",
		Factory.class.getClassLoader());

@Override
public LoggingSystem getLoggingSystem(ClassLoader classLoader) {
	if (PRESENT) {
		return new LogbackLoggingSystem(classLoader);
	}
	return null;
}

ch.qos.logback.classic.LoggerContext
存在(即存在logback依赖),直接创建
LogbackLoggingSystem
实例并返回;至此 Spring Boot 的 LoggingSystem 确定将基于
logback
,而非
log4j
,也非
jul
,问题

Spring Boot 的 LoggingSystem 是如何与日志组件绑定的

Spring Boot 默认依赖 3 个日志组件:logback、log4j、jul,为什么默认启用的是 logback,而非其它两个?

是不是清楚了?LoggingSystem 确定为 LogbackLoggingSystem 后回到
LoggingApplicationListener#onApplicationStartingEvent
方法的第二行,即调用
LogbackLoggingSystem#beforeInitialize
方法

@Override
public void beforeInitialize() {
	LoggerContext loggerContext = getLoggerContext();
	if (isAlreadyInitialized(loggerContext)) {
		return;
	}
	super.beforeInitialize();
	loggerContext.getTurboFilterList().add(FILTER);
}

主要初始化
LoggerContext
,跟进
getLoggerContext()

private LoggerContext getLoggerContext() {
	ILoggerFactory factory = StaticLoggerBinder.getSingleton().getLoggerFactory();
	Assert.isInstanceOf(LoggerContext.class, factory,
			() -> String.format(
					"LoggerFactory is not a Logback LoggerContext but Logback is on "
							+ "the classpath. Either remove Logback or the competing "
							+ "implementation (%s loaded from %s). If you are using "
							+ "WebLogic you will need to add 'org.slf4j' to "
							+ "prefer-application-packages in WEB-INF/weblogic.xml",
					factory.getClass(), getLocation(factory)));
	return (LoggerContext) factory;
}

StaticLoggerBinder
有没有很熟悉?看下它的全类名:
org.slf4j.impl.StaticLoggerBinder
,在
logback-classic-1.2.12.jar
下 ,而
logback 1.3.x
没有这个类

所以 spring-boot 2.x.x 默认不支持 logback 1.3.x

总结下,
onApplicationStartingEvent
方法确定了日志系统是
LogbackLoggingSystem

onApplicationEnvironmentPreparedEvent

private void onApplicationEnvironmentPreparedEvent(ApplicationEnvironmentPreparedEvent event) {
	SpringApplication springApplication = event.getSpringApplication();
	if (this.loggingSystem == null) {
		this.loggingSystem = LoggingSystem.get(springApplication.getClassLoader());
	}
	initialize(event.getEnvironment(), springApplication.getClassLoader());
}

很显然
loggingSystem
不为
null
,我们直接跟
initialize
方法

protected void initialize(ConfigurableEnvironment environment, ClassLoader classLoader) {
	getLoggingSystemProperties(environment).apply();
	this.logFile = LogFile.get(environment);
	if (this.logFile != null) {
		this.logFile.applyToSystemProperties();
	}
	// 日志分组,暂不关注
	this.loggerGroups = new LoggerGroups(DEFAULT_GROUP_LOGGERS);
	// 设置早期日志级别,主要debug和trace之间的抉择
	initializeEarlyLoggingLevel(environment);
	// 初始化日志系统
	initializeSystem(environment, this.loggingSystem, this.logFile);
	// 设置最终日志级别
	initializeFinalLoggingLevels(environment, this.loggingSystem);
	registerShutdownHookIfNecessary(environment, this.loggingSystem);
}

我们暂时只关注
initializeSystem
方法

initializeSystempng

继续往下跟,来到
LogbackLoggingSystem#initialize

LogbackLoggingSystem

继续往下跟,来到
AbstractLoggingSystem#initialize

AbstractLoggingSystem

继续往下跟,来到
AbstractLoggingSystem#initializeWithConventions

private void initializeWithConventions(LoggingInitializationContext initializationContext, LogFile logFile) {
	String config = getSelfInitializationConfig();
	if (config != null && logFile == null) {
		// self initialization has occurred, reinitialize in case of property changes
		reinitialize(initializationContext);
		return;
	}
	if (config == null) {
		config = getSpringInitializationConfig();
	}
	if (config != null) {
		loadConfiguration(initializationContext, config, logFile);
		return;
	}
	loadDefaults(initializationContext, logFile);
}

其中
getSelfInitializationConfig()
就是从
classpath
下逐个寻找

logback-test.groovy, logback-test.xml, logback.groovy, logback.xml

这四个文件,一旦找到则直接返回;因为找到了
logback.xml
,所以来到第一个 if

initializeWithConventions-reinitialize

继续跟进,来到
LogbackLoggingSystem#reinitialize

LogbackLoggingSystem


logback.xml
中的配置进行加载;至此,Spring Boot 的 LoggingSystem 与 Logback 的绑定就完成了,你们清楚了吗?

我们重新回到
AbstractLoggingSystem#initializeWithConventions
,如果
classpath

logback-test.groovy, logback-test.xml, logback.groovy, logback.xml

这四个文件都没有,会来到
config = getSpringInitializationConfig();
,逐步跟下去会来到
AbstractLoggingSystem#getSpringConfigLocations

protected String[] getSpringConfigLocations() {
	String[] locations = getStandardConfigLocations();
	for (int i = 0; i < locations.length; i++) {
		String extension = StringUtils.getFilenameExtension(locations[i]);
		locations[i] = locations[i].substring(0, locations[i].length() - extension.length() - 1) + "-spring."
				+ extension;
	}
	return locations;
}

这个方法大家都能看懂吧,
locations
的值

logback-test.groovy, logback-test.xml, logback.groovy, logback.xml

逐个遍历,然后进行拼接,最终得到

logback-test-spring.groovy, logback-test-spring.xml, logback-spring.groovy, logback-spring.xml

getSpringConfigLocations

同样从
classpath
下逐个寻找,一旦找到直接返回;这也是为什么我们的日志配置文件是
logback-spring.xml
也能生效的原因。我们可以给 Spring Boot 的日志配置文件排个优先级

logback-test.groovy > logback-test.xml > logback.groovy > logback.xml > logback-test-spring.groovy > logback-test-spring.xml > logback-spring.groovy > logback-spring.xml

总结下,
onApplicationEnvironmentPreparedEvent
完成了日志系统的初始化(日志配置文件的加载)

onApplicationPreparedEvent

private void onApplicationPreparedEvent(ApplicationPreparedEvent event) {
	ConfigurableApplicationContext applicationContext = event.getApplicationContext();
	ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
	if (!beanFactory.containsBean(LOGGING_SYSTEM_BEAN_NAME)) {
		beanFactory.registerSingleton(LOGGING_SYSTEM_BEAN_NAME, this.loggingSystem);
	}
	if (this.logFile != null && !beanFactory.containsBean(LOG_FILE_BEAN_NAME)) {
		beanFactory.registerSingleton(LOG_FILE_BEAN_NAME, this.logFile);
	}
	if (this.loggerGroups != null && !beanFactory.containsBean(LOGGER_GROUPS_BEAN_NAME)) {
		beanFactory.registerSingleton(LOGGER_GROUPS_BEAN_NAME, this.loggerGroups);
	}
	if (!beanFactory.containsBean(LOGGING_LIFECYCLE_BEAN_NAME) && applicationContext.getParent() == null) {
		beanFactory.registerSingleton(LOGGING_LIFECYCLE_BEAN_NAME, new Lifecycle());
	}
}

代码不复杂,就是注册了几个
Bean
到 Spring 容器,其中的
loggingSystem
是我们暂时比较关注的,默认情况下其类型是:
LogbackLoggingSystem

日志打印

Spring Boot 的 LoggingSystem 完成与 Logback 的绑定后,它是如何使用然后打印日志的呢?是不是也像

业务日志打印样例

这样来使用的?那绝对不可能的!

绝对不可能

这么使用的话,跟 Spring Boot 的 LoggingSystem 有鸡毛的关系?我们来看下 Spring Boot 中日志的使用,
SpringApplication
179 行就用到了

SpringApplicaton179行

我们会发现
Log

LogFactory

spring-jcl-5.3.31.jar
包下

spring-jcl_LogFactory

spring-jcl 类似 slf4j,也是一个日志门面,本文不展开

跟进
LogFactory.getLog
,一路跟下去会来到
LogAdapter#createLog

public static Log createLog(String name) {
	switch (logApi) {
		case LOG4J:
			return Log4jAdapter.createLog(name);
		case SLF4J_LAL:
			return Slf4jAdapter.createLocationAwareLog(name);
		case SLF4J:
			return Slf4jAdapter.createLog(name);
		default:
			// Defensively use lazy-initializing adapter class here as well since the
			// java.logging module is not present by default on JDK 9. We are requiring
			// its presence if neither Log4j nor SLF4J is available; however, in the
			// case of Log4j or SLF4J, we are trying to prevent early initialization
			// of the JavaUtilLog adapter - e.g. by a JVM in debug mode - when eagerly
			// trying to parse the bytecode for all the cases of this switch clause.
			return JavaUtilAdapter.createLog(name);
	}
}

logApi
的值获取如下

private static final String LOG4J_SPI = "org.apache.logging.log4j.spi.ExtendedLogger";

private static final String LOG4J_SLF4J_PROVIDER = "org.apache.logging.slf4j.SLF4JProvider";

private static final String SLF4J_SPI = "org.slf4j.spi.LocationAwareLogger";

private static final String SLF4J_API = "org.slf4j.Logger";

private static final LogApi logApi;

static {
	if (isPresent(LOG4J_SPI)) {
		if (isPresent(LOG4J_SLF4J_PROVIDER) && isPresent(SLF4J_SPI)) {
			// log4j-to-slf4j bridge -> we'll rather go with the SLF4J SPI;
			// however, we still prefer Log4j over the plain SLF4J API since
			// the latter does not have location awareness support.
			logApi = LogApi.SLF4J_LAL;
		}
		else {
			// Use Log4j 2.x directly, including location awareness support
			logApi = LogApi.LOG4J;
		}
	}
	else if (isPresent(SLF4J_SPI)) {
		// Full SLF4J SPI including location awareness support
		logApi = LogApi.SLF4J_LAL;
	}
	else if (isPresent(SLF4J_API)) {
		// Minimal SLF4J API without location awareness support
		logApi = LogApi.SLF4J;
	}
	else {
		// java.util.logging as default
		logApi = LogApi.JUL;
	}
}

根据优先级逐个去类路径下寻找类,找到了直接返回;Spring Boot 默认情况下用的是 SLF4J + Logback,所以
logApi
的值是
SLF4J_SPI
,那么
LogAdapter#createLog
的返回值的类型是
LogAdapter$Slf4jLocationAwareLog

LogAdapter$Slf4jLocationAwareLog

相当于完成了
spring-jcl

slf4j
的适配;这么说来,Spring Boot 日志还是走的 SLF4J + Logback ?跟 Spring Boot 的 LoggingSystem 有什么关系呢?敬请期待下篇

未完待续

总结

  1. onApplicationStartingEvent

    确定日志系统类型并创建对应的
    LoggingSystem
    ,默认情况下是
    LogbackLoggingSystem

  2. onApplicationEnvironmentPreparedEvent

    完成日志配置文件的加载以及
    LoggingSystem
    的初始化

  3. Spring Boot 的日志打印貌似与 LoggingSystem 没有关系?下篇分析