wenmo8 发布的文章

沃土云创开源开发者专项计划是华为给开源开发者提供
专属激励资源
,鼓励开发者积极参与
开源 for Huawei
适配,践行“让优秀开发者支持更优秀开发者”的理念。

之前我们介绍了
fake2db
的适配过程,本文主要分享一位开源开发者参与Beam适配GaussDB的实践经验。

Beam是一个由ASF社区孵化的开源统一编程模型,适用于复杂的数据处理,提供了一个可移动(兼容性好)的 API 层。这层 API 的核心概念基于 Beam 模型(以前被称为 Dataflow 模型),并在 Beam 这个计算引擎上执行。

随着业务数据量不断增加,业界出现了越来越多的分布式数据处理框架,从最早的hadoop Mapreduce到Apache Spark、Apache Storm、以及Apache Flink。新的分布式处理框架带来了更高性能,更强大功能,更低的延迟等,但是当用户把程序切换到新的分布式处理框架的代价也非常大,需要重写对应的业务逻辑。

解决这个问题需要注意两部分,首先,
需要一个编程范式
,能够统一规范分布式数据处理,例如采用批处理还是流处理;其次数据处理程序需要可以
在各个分布式引擎上正常执行,
用户可以根据实际情况切换计算引擎和执行环境。Apache Beam 正好可以解决以上问题。

了解和分析开源项目

以Beam为例,官网文档详尽地说明了怎么使用。开发者需实现对应的接口,尤其是其中连接数据库的方法,这通常要求使用不同的连接方式,
主要是实现Apply方法
。完成前期准备工作,有利于开发者在动手之前就能对项目的适配目标和功能有较清晰的认识,从而避免开发者在后续开发中因直接面对大量代码无从下手,节省了大量寻找适配路径的宝贵时间。

此外,在正式开发之前,建议在本地环境或华为云环境中先运行项目。这样不仅能通过实际操作来直观感受项目的功能,而且提前部署好环境还能为后续的测试和验证工作带来便利。

如何开发开源项目

经过对Beam的了解,对适配该项目开发工作有了初步认识。在开始开发Beam项目时候,第一步将华为云
OpenSourceForHuaweiDemoJava 仓库
代码fork至个人仓库(地址:https://gitcode.com/HuaweiCloudDeveloper/OpenSourceForHuaweiDemoJava/overview),然后克隆到本地开发环境中。

鉴于Beam项目主要采用Java语言开发,接下来我们将华为云提供的Java SDK添加为项目依赖,并借助该SDK来实现Beam组件中的Apply关键方法。在适配GaussDB时主要是重写Apply方法实现数据库的连接。

在开发Beam过程中,我们参考了华为云官方接口文档(https://support.huaweicloud.com/intl/zh-cn/sdkreference-iothub/iot_10_10002.html),以全面掌握其SDK的各项功能。成功实现接口后,我们进行了单元测试,确保软件功能正常运行。

案例验证流程

若Beam计算引擎可以从GaussDB中读取和写入数据,则可说明Beam这个计算引擎能在GaussDB中正常使用,demo案例验证过程如下:

首先需要在GaussDB数据库中创建一张表beam_t,其中表的主键字段是id,在表中输入几笔测试数据;

然后使用Beam这个计算引擎框架编写代码,在华为云CCE中部署运行项目,这样可以亲身体验项目的功能,从而获得更直观的感受。

代码实现如下逻辑:

  1. 先重写Beam计算框架中的Apply方法,目的是创建与GaussDB数据库连接

  2. 从GaussDB数据库中读取 beam_t 表中字段id的最大值,并记录为max1

  3. 将max1加上1得到max2值

  4. 将max2值回写到GaussDB数据库对应的 beam_t表中

  5. 再次查询 GaussDB数据库中表 beam_t 字段id最大值,并关闭数据库连接

  6. 对比max1和max2发现前后最大值有变化,可以验证Beam计算引擎是可以连接GaussDB并读取数据以及向GaussDB写入数据的

测试流程如下:

  • 先在客户端查表中最大值是9:

  • 执行运行程序(接口返回运行程序前是9,运行程序后是10,符合预期),CCE部署:

  • 接口测试:

  • 用客户端再查对应表的最大id值是10:

至此,通过以上测试证明Beam可以从GaussDB读取数据以及写入数据。

本案例使用的数据库驱动是GaussDB官网推荐的驱动gsjdbc4.jar(
主类名为“ org.postgresql.Driver ”,数据库连接的url前缀为“ jdbc:postgresql ”)。基于GaussDB是从PostgreSQL演变而来,Beam能够支持
postgresql数据库,理论 GaussDB也能够支持 gsjdbc4.jar,果不其然,通过使用 gsjdbc4.jar替换
postgresql的驱动jar,也同样可以连接操作GaussDB 。

如何提交开源贡献

当你完成了开源项目Beam的适配工作,并且新增的代码已经经过严格测试确认无误后,你可以通过提交一个Pull Request(PR)来请求官方社区将你的功能代码合并到项目中。在此之前,请务必仔细阅读项目的贡献指南和代码编写规范,以确保你的代码能够顺利地被项目接纳并合并。

诚邀各位技术达人加入到使用GaussDB序列的Beam社区,您有何疑问,都可在此推文评论区留言,或前往开源开发者专属的
问答板块
提问,热切希望能与您深入交流,共同探讨。

点击关注,第一时间了解华为云新鲜技术~

开心一刻

今早,女朋友给我发微信
她:宝贝,你要记住
她:我可是你女朋友,你相亲就亲,想抱就抱
她:不要老是问我,男生要主动一些
我:可是你上次报警可不是这么说的

开心一刻

基础回顾

Spring Boot 集成 Kafka 非常简单,我相信你们都会,但我还是想带你们回顾下;只需要进行以下几步即可完成 Spring Boot 与 Kafka 的集成

  1. 引入依赖

    如果只是单纯的集成,不考虑其他功能,那么添加如下依赖即可

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    

    Spring Boot 并没有提供
    starter
    的方式来集成 Kafka,不要一根筋的去找 starter


    如果还需要
    web
    功能,则可以像如下一样添加依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    

    依赖就是如此简单;扯个题外问题


    spring-kafka
    的版本在哪指定的?

  2. 添加配置

    如果 Kafka 未开启认证,那配置可以非常简单

    spring:
      kafka:
        bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
    

    但实际使用中,往往会开启认证,并对
    consumer
    做定制化配置,配置往往类似如下

    spring:
      kafka:
        bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
        consumer:
          # 自动提交消费位移
          enable-auto-commit: false
          # 偏移量初始位置
          auto-offset-reset: latest
          # 一次拉取记录最大数
          max-poll-records: 5
          properties:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
            #sasl.mechanism: SCRAM-SHA-256
            #username、password需要调整成实际值
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
            #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
        listener:
          ack-mode: manual
        producer:
          properties:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
            #sasl.mechanism: SCRAM-SHA-256
            #username、password需要调整成实际值
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
            #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";  
    

    也不复杂,相信你们都能看懂

  3. 进行使用

    分两点:
    消费消息

    发送消息

    消费消息
    实现很简单

    /**
     * @author: 青石路
     */
    @Component
    public class KafkaConsumer {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel")
        public void listenOrder(String message, Acknowledgment acknowledgment) {
            try {
                log.info("收到kafka message: {}", message);
                // TODO 业务处理
            } finally {
                acknowledgment.acknowledge();
            }
        }
    }
    

    监听的
    topic

    tp_qsl_order_cancel
    ,消费者组指定为
    gid_qsl_order_cancel
    ;这样,消费监听就算完成了

    发送消息
    实现同样简单,注入
    KafkaTemplate
    ,然后调用其
    send
    方法即可

    /**
     * @author: 青石路
     */
    @Component
    public class KafkaSender {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(String topic, String msg) {
            kafkaTemplate.send(topic, msg).addCallback(
                    success -> {
                        if (success != null) {
                            log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                    success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                        }
                    },
                    failure -> {
                        log.error("消息发送失败:", failure.getCause());
                    }
            );
        }
    }
    

    KafkaTemplate 提供了多个 send 方法


    KafkaTemplate_send方法

    我们可以按需选择

上面 3 步都完成后,即可启动应用进行测试了

  1. 消费消息

    这个测试很简单,直接往
    tp_qsl_order_cancel
    topic 中发送一条消息即可


    发送消息

    点击
    发送消息
    后,控制台输出


    消费者日志

    消息正常消费,没有任何毛病

  2. 发送消息

    我加了一个
    OrderController

    /**
     * @author: 青石路
     */
    @RestController
    @RequestMapping("order")
    public class OrderController {
    
        @Resource
        private KafkaSender kafkaSender;
    
        @GetMapping("add")
        public String add(String orderInfo) {
            // TODO 订单业务处理
            // 下发消息到库存
            kafkaSender.send("tp_qsl_inventory_order_add", orderInfo);
            return "下单成功";
        }
    }
    

    便于测试消息发送;直接发起
    http
    请求

    http://localhost:8080/order/add?orderInfo=订单完整信息
    

    然后就可以去
    tp_qsl_inventory_order_add
    topic 中看消息是否发送成功


    消息发送成功

    消息正常发送,也没有任何毛病

至此,Spring Boot 集成 Kafka 就算大功告成了;如此简单,相信你们都能轻松拿捏

拿捏

Kafka 多源

上述只讲了单 Kafka 源的情况,也就是
消费消息

发送消息
针对的是同个 Kafka 源;但实际工作中,同个项目连接多个 Kafka 源的情况是非常常见的,我们就以 2 个 Kafka 源为例,从其中一个源消费消息、向另一个源发送消息,该如何实现?其实也不难,按以下几步调整即可

  1. 配置文件调整

    既然有 2 个 Kafka 源,那么我们的配置文件就需要配置 2 个,类似如下

    spring:
      kafka:
        first:
          bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
          consumer:
            # 自动提交消费位移
            enable-auto-commit: false
            # 偏移量初始位置
            auto-offset-reset: latest
            # 一次拉取记录最大数
            max-poll-records: 5
            # properties:
              # security.protocol: SASL_PLAINTEXT
              # sasl.mechanism: PLAIN
              #sasl.mechanism: SCRAM-SHA-256
              #username、password需要调整成实际值
              # sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
              #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
          listener:
            ack-mode: manual
        second:
          bootstrap-servers: 192.168.0.90:9092
          #producer:
            #properties:
              #security.protocol: SASL_PLAINTEXT
              #sasl.mechanism: PLAIN
              #sasl.mechanism: SCRAM-SHA-256
              #username、password需要调整成实际值
              #sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
              #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
    

    这里的
    first

    second
    不是固定的,你们想怎么命名就怎么命名;既然这么灵活,那 Spring Boot 肯定是不支持的,那么如上配置,Spring Boot 是识别不了的,相当于没配,此时去启动应用,Spring Boot 会启用默认配置去连接
    localhost:9092


    默认连接localhost的kafka

    所以我们需要自定义配置 Kafka,而一旦我们进行了自定义,那么 Spring Boot 则不会启用默认配置

  2. 自定义配置 Kafka

    针对每个 Kafka 源单独配置,配置内容比较固定


    1. FirstKafkaConfig

      /**
       * 第一个Kafka配置
       * @author: 青石路
       */
      @Configuration
      public class FirstKafkaConfig {
      
          @ConfigurationProperties(prefix = "spring.kafka.first")
          @Bean("firstKafkaProperties")
          public KafkaProperties firstKafkaProperties() {
              return new KafkaProperties();
          }
      
          @Bean("firstKafkaTemplate")
          public KafkaTemplate<String, String> firstKafkaTemplate() {
              return new KafkaTemplate<>(firstProducerFactory());
          }
      
          @Bean("firstKafkaListenerContainerFactory")
          public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> fisrtKafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(firstConsumerFactory());
              factory.getContainerProperties().setAckMode(firstKafkaProperties().getListener().getAckMode());
              return factory;
          }
      
          @Bean("firstConsumerFactory")
          public ConsumerFactory<String, String> firstConsumerFactory() {
              return new DefaultKafkaConsumerFactory<>(firstKafkaProperties().buildConsumerProperties());
          }
      
          @Bean("firstProducerFactory")
          public DefaultKafkaProducerFactory<String, String> firstProducerFactory() {
              return new DefaultKafkaProducerFactory<>(firstKafkaProperties().buildProducerProperties());
          }
      }
      
    2. SecondKafkaConfig

      /**
       * 第二个Kafka配置
       * @author: 青石路
       */
      @Configuration
      public class SecondKafkaConfig {
      
          @ConfigurationProperties(prefix = "spring.kafka.second")
          @Bean("secondKafkaProperties")
          public KafkaProperties secondKafkaProperties() {
              return new KafkaProperties();
          }
      
          @Bean("secondKafkaTemplate")
          public KafkaTemplate<String, String> secondKafkaTemplate() {
              return new KafkaTemplate<>(secondProducerFactory());
          }
      
          @Bean("secondKafkaListenerContainerFactory")
          public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> fisrtKafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(secondConsumerFactory());
              return factory;
          }
      
          @Bean("secondConsumerFactory")
          public ConsumerFactory<String, String> secondConsumerFactory() {
              return new DefaultKafkaConsumerFactory<>(secondKafkaProperties().buildConsumerProperties());
          }
      
          @Bean("secondProducerFactory")
          public DefaultKafkaProducerFactory<String, String> secondProducerFactory() {
              return new DefaultKafkaProducerFactory<>(secondKafkaProperties().buildProducerProperties());
          }
      }
      

    重点在


    @ConfigurationProperties(prefix = "spring.kafka.first")

    @ConfigurationProperties(prefix = "spring.kafka.second")


    多源之间不要配重、不要配混

  3. 调整消息监听与消息发送

    因为配置了多源,那么
    KafkaListenerContainerFactory
    也对应配置了多个,所以我们要指定用哪个 KafkaListenerContainerFactory 来创建消息监听器

    @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel")
    
    // 调整成
    
    @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel", containerFactory = "firstKafkaListenerContainerFactory")
    

    消费消息
    端就算调整完成;同理,
    KafkaTemplate
    也配置了多个,那么发送消息的时候也需要指定用哪个 KafkaTemplate 来发送

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // 调整成
    
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    

    发送消息
    端也就调整完成

都调整完成后,我们启动应用,会发现启动失败,并提示如下信息

多源启动报错

这特喵的,跟预想的不一样吖

成龙_what

遇到问题先不要慌,我们仔细看下提示信息,我给你们翻译一下

KafkaAnnotationDrivenConfiguration 构造方法需要一个 KafkaProperties 实例,但在 Spring 容器中找到了 3 个,它懵圈了,不知道该使用哪一个

可以通过 @Primary 提高实例的优先级,或者使用 @Qualifier 明确指定使用哪个实例

所以处理方式就来了,使用 @Primary 来提高某个 KafkaProperties 实例的优先级,KafkaAnnotationDrivenConfiguration 就不会懵圈了,会使用优先级高的 KafkaProperties 实例

因为 KafkaAnnotationDrivenConfiguration 不是我们写的,没法通过 @Qualifier 明确指定

我们直接提高
firstKafkaProperties
的优先级

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
@Primary
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

再启动应用,发现正常启动了;你们就可以进行
消费消息

发送消息
测试了,我就不演示了,反正我测试都是通过的,不信?不信就不信,你能把我怎么样嘛

来打我呀

启停配置化

Kafka 不管是单源还是多源,应用进行集成,都是非常合理的需求,我们开发做对应的实现也是应该的;但如下这个需求我多少是有点抵触的

客户方的环境有诸多约束、限制,权限管控非常严,还有各种防火墙,需要各种申请流程,非常耗时;项目分多个应用,应用之间存在交互(Kafka 是方式之一),每个应用的交付时间不一样,自然在客户环境的演示时间也不一样,所以为了演示不受 Kafka 的限制,需要给每个 Kafka 源增加一个开关配置,通过开关来分别控制每个 Kafka 源的启停

这里的启停指的是
启用

停用
;演示的时候,哪些 Kafka 源能正常使用就启用这些,哪些还不能使用就停用哪些,同时业务代码中也需要做适配调整。面对这个需求,你们说是不是不合理?所以你们能理解我的抵触了吧。但为了更好的演示,给甲方爸爸留下专业的印象,增加开关貌似是当下最合适的无奈之选,极不情愿的开启改造之旅

极不情愿
  1. 增加开关配置

    在配置文件中增加开关配置,每个 Kafka 源有其独立的配置,有几个源就配置几个开关

    spring:
      kafka:
        first:
          enabled: true
          ...
        second:
          enabled: true
          ...
    

    enabled
    配置成
    true
    表示启用,
    false
    表示停用

  2. 自定义配置适配开关

    需要根据开关值来决定是否启用
    FirstKafkaConfig

    SecondKafkaConfig
    ,Spring Boot 正好提供了一个具有该功能的注解:
    ConditionalOnProperty
    ,直接安排上

    /**
     * 第一个Kafka配置
     * @author: 青石路
     */
    @Configuration
    @ConditionalOnProperty(name = "spring.kafka.first.enabled", havingValue = "true")
    public class FirstKafkaConfig {
        ...
    
    /**
     * 第二个Kafka配置
     * @author: 青石路
     */
    @Configuration
    @ConditionalOnProperty(name = "spring.kafka.second.enabled", havingValue = "true")
    public class SecondKafkaConfig {
        ...
    

    这样就实现了通过开关来
    启停
    Kafka 源

  3. 消费消息与发送消息适配开关

    消费端适配很简单

    /**
     * @author: 青石路
     */
    @Component
    @ConditionalOnProperty(name = "spring.kafka.first.enabled", havingValue = "true")
    public class KafkaConsumer {
        ...
    

    发送端适配则有点不一样,方式有多种,我提供一种;修改 KafkaSender,改 2 处即可

    KafkaTemplate 调整成非强制依赖,将
    @Autowired

    required
    设置成 false

    @Autowired
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    // 调整成
    
    @Autowired(required = false)
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    

    使用 KafkaTemplate 时做 null 判断

    public void send(String topic, String msg) {
        kafkaTemplate.send(topic, msg).addCallback(
                success -> {
                    if (success != null) {
                        log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                    }
                },
                failure -> {
                    log.error("消息发送失败:", failure.getCause());
                }
        );
    }
    
    // 调整成
    
    public void send(String topic, String msg) {
        if (kafkaTemplate == null) {
            log.warn("未启用secondKafka,不发送消息");
            return;
        }
        kafkaTemplate.send(topic, msg).addCallback(
                success -> {
                    if (success != null) {
                        log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                    }
                },
                failure -> {
                    log.error("消息发送失败:", failure.getCause());
                }
        );
    }
    

至此改造就算完成;开关都为
true
的情况下,效果与未加开关前的多源是一致的,也就是正常的,我已经测过了,你们不放心的话自己再去测试一下;开关都为
false
时,相当于没注册消费监听器,也就相当于没有消费者,那么往
tp_qsl_order_cancel
topic 中发消息,是没有消费者消费消息的,那么控制台就不会有任何输出,同理,此时的
KafkaTemplate
是没有注册成功的(也就是 null),发起 http 请求

http://localhost:8080/order/add?orderInfo=大订单

控制台输出如下

启停配置化_都停用_发送消息日志

正是我们想要的效果,说明都为
false
的情况也是正确的;接下来我们看下
false、true
的情况

启停配置化_启动失败

好家伙,直接启动失败!但这个问题我们前面碰到过,那么如何处理呢?用
@Primary
标记
secondKafkaProperties
?假设我们这么做了,那开关都为
true
的情况下,
KafkaProperties
实例岂不是有多个
Primary
,Spring Boot 又会懵圈,不知道该使用哪个 KafkaProperties 实例,显然这种方式行不通;我们把问题拓展下,多个 KafkaProperties 实例存在的情况下,需要动态指定一个 Primary,但不能是 Spring Boot 自动配置的那个,即

spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties

除了这个,随便给哪个 KafkaProperties 实例指定成 Primary 都是没问题的,因为我们的业务代码中都明确指定了使用的是我们自定义的 kafka,所以我们需要在 Bean 实例化之前修改某个 KafkaProperties 的
BeanDefinition
,设置其 Primary 为 true;实现方式有很多,我这里提供一种:
BeanFactoryPostProcessor

/**
 * @author: 青石路
 */
@Component
public class KafkaPrimaryProcessor implements BeanFactoryPostProcessor {

    private static final Logger log = LoggerFactory.getLogger(KafkaPrimaryProcessor.class);

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        String[] beanNames = beanFactory.getBeanNamesForType(KafkaProperties.class);
        if (beanNames.length <= 1) {
            return;
        }
        for (String beanName : beanNames) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
            // springboot的自动配置
            if (beanName.contains(KafkaProperties.class.getName())) {
                continue;
            }
            log.info("多KafkaProperties,指定primary[{}]", beanName);
            beanDefinition.setPrimary(true);
            return;
        }
    }
}

这个代码相信你们都能看懂,会从多个 KafkaProperties BeanDefinition 中取第一个(除了自动配置的),设置其 Primary 为 true,所以我们还需要调整下
firstKafkaProperties
,拿掉其
@Primary

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
@Primary
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

// 调整成

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

这么调整之后,无论是有几个 Kafka 源,以及如何启停这些源,都能正常运转,是不是很优秀,值得鼓掌!

愣着干啥,鼓掌

话说,需求至此已经算完美实现了,可以完结了,但作为一个开发,尤其是一个有追求的开发,还有一个疑点未得到解决,心里始终不舒坦,是什么疑点呢,我们继续往下看

排除自动配置

既然是我们自定义配置 Kafka,不再依赖 Spring Boot 的自动配置,我们是不是可以排除掉 Spring Boot 的 Kafka 自动配置?理论上来说是可行的,那就干呗;直接排除掉
KafkaAutoConfiguration.class

/**
 * @author: 青石路
 */
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

既然排除了自动配置,那么也就不需要指定 KafkaProperties 的 Primary 了,
KafkaPrimaryProcessor
直接删掉,其他不用调整;将开关都设置成
true
,我们启动应用后测试下

  1. 发送消息

    直接 http 请求

    http://localhost:8080/order/add?orderInfo=排除自动配置
    

    日志显示发送成功


    排除自动配置_发送消息日志

    我们在看下 Topic
    tp_qsl_inventory_order_add
    中消息


    排除自动配置_发送消息_topic中消息

    发送消息是没问题的

  2. 消费消息

    往 Topic
    tp_qsl_order_cancel
    中发送消息


    排除自动配置_消费消息_topic中写消息

    点击
    发送消息
    后,发现控制台并没有任何输出!!!


    芭比Q了

    先别慌,我们冷静分析下,控制台没有任何输出说明消费者没注册成功,也就是
    @KafkaListener
    没生效,为什么没生效,肯定是没有被解析,谁解析它呢,KafkaListener 中应该有说明


    KafkaListener 描述

    已经描述的很清楚


    通过注册 KafkaListenerAnnotationBeanPostProcessor 来处理 KafkaListener 注解

    可以手动注册 KafkaListenerAnnotationBeanPostProcessor,也可以通过 EnableKafka 注解来注册


    EnableKafka
    方便点,我们使用它

    /**
     * @author: 青石路
     */
    @SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
    @EnableKafka
    public class KafkaApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
    

    重新启动应用,会发现控制台有如下输出


    EnableKafka

    消费消息也正常了;因为重启应用了,保险起见,发送消息最好再测一次,记得测!!!

至此,心中疑点得以解决,如此才算完美解决!

一脸得意

Tips:

跟进 KafkaAutoConfiguration,它有如下代码

@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })

跟进 KafkaAnnotationDrivenConfiguration,其最下面有如下代码

@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {

}

这里使用了
@EnableKafka
,这也是为什么自动配置(
KafkaAutoConfiguration
)能解析
@KafkaListener
的答案!

总结

  1. Kafka 多源实现,大家需要掌握,至于启停配置化,大家就当看个热闹

    但是启停配置化的实现(
    @ConditionalOnProperty
    ),还是值得大家掌握的

    Spring Boot 的条件注解非常多,在 Spring Boot 内部被广泛使用,感兴趣的可以查看:
    spring-boot-2.0.3源码篇 - @Configuration、Condition与@Conditional

  2. 如果不使用Spring Boot的自动配置,建议把对应的自动配置类排除掉

    自动配置

    手动配置
    同时存在的话可能会产生冲突,就像文中的
    KafkaProperties
    多实例;直接排除可能会导致缺少某些功能,肯定是没有启用这些功能的依赖,细心去寻找依赖并启用即可

  3. 完整代码:
    spring-boot-kafka

HybridCache是什么

在 .NET 9 中,Microsoft 将 HybridCache 带入了框架体系。

HybridCache 是一种新的缓存模型,设计用于封装本地缓存和分布式缓存,使用者无需担心选择缓存类型,从而优化性能和维护效率。

实际上,HybridCache 基于 IDistributedCache 提供的接口和操作,但增加了一些其他的特性,如封装两类不同缓存库(本地和分布),支持标签删除(Tag-based Cache Eviction)和约束选项。

需要注意
的是,HybridCache仍处于preview阶段。


HybridCache 与 IDistributedCache 的区别

IDistributedCache:

  1. 仅支持分布式缓存,如 Redis、SQL Server、MemoryCache。
  2. 选择依赖于目标缓存和管理设备。
  3. 不支持标签删除,只能基于键值操作。

HybridCache:

  1. 支持封装本地和分布式缓存,在读取时优先读取本地缓存,如本地不存在,再读取分布式。
  2. 支持标签删除,通过指定标签管理缓存内容。
  3. 选项更加精简,支持自动化操作和选项约束。


HybridCache 的好处

  1. 性能优化:
    本地缓存速度超过分布式,使用 HybridCache 可以减少读取分布缓存库时的延迟。
  2. 精简化工程:
    使用者不需再自行核实选择哪个缓存,增加了工程效率。
  3. 标签管理:
    缓存标签记录不同类型数据,便于分类管理和删除缓存。
  4. 安全性:
    支持选项约束,使缓存操作更严格,防止错误使用和内容亏失。


代码示例

以下代码展示如何使用 HybridCache:

1. 添加缓存服务
var builder = WebApplication.CreateBuilder(args);

// 注册 HybridCache 服务
builder.Services.AddHybridCache();

// 注册 Redis 缓存服务,为 HybridCache 提供分布式缓存
builder.Services.AddStackExchangeRedisCache(options =>
{
    options.Configuration = builder.Configuration.GetConnectionString("RedisConnectionString");
});

builder.Services.AddControllers();
2. 实现接口操作

读取缓存

[HttpGet("GetCache")]
public async Task<string[]> Get()
{
    return await _cache.GetOrCreateAsync(
        "a-1", async cancel => await Task.FromResult(Summaries)
    );
}

删除缓存

[HttpGet("DeleteCache")]
public async Task<bool> Delete()
{
    await _cache.RemoveAsync("a-1");
    return true;
}

通过标签读取缓存

[HttpGet("GetCacheByTag")]
public async Task<string[]> GetCacheByTag()
{
    var tags = new List<string> { "tag1", "tag2", "tag3" };
    var entryOptions = new HybridCacheEntryOptions
    {
        Expiration = TimeSpan.FromMinutes(1),
        LocalCacheExpiration = TimeSpan.FromMinutes(1)
    };
    return await _cache.GetOrCreateAsync(
        "a-1", async cancel => await Task.FromResult(Summaries),
        entryOptions, tags
    );
}

通过标签删除缓存

[HttpGet("DeleteCacheByTag")]
public async Task<bool> DeleteCacheByTag()
{
    var tags = new List<string> { "tag1" };
    await _cache.RemoveByTagAsync(tags);
    return true;
}


小结

.NET 9 的 HybridCache 提供了一种便捷且高效的缓存解决方案,将本地缓存和分布式缓存无缝结合,为开发者简化了缓存逻辑,同时提供了更多高级功能,如标签管理和选项约束。通过代码示例可以看出,HybridCache 的操作直观且易于实现,非常适合现代应用场景。

如果你正在使用 .NET 9,尝试将 HybridCache 应用于你的项目中,体验其高效与简洁!

推荐一个强大C#脚本引擎,方便我们在项目中,动态执行C#脚本。

01 项目简介

CS-Script是非常成熟的C#脚本引擎,自2004年起就发布了,即.NET发布后的两年。

支持托管和独立(CLI)执行模型,可以作为PowerShell的纯C#替代品,也可以通过托管脚本引擎在运行时扩展.NET应用程序。

允许在不影响代码库的情况下无缝切换底层编译技术,目前支持的编译器有dotnet.exe和csc.exe。

02 开发工具支持与平台兼容

与Visual Studio、VSCode、Sublime Text 3等开发工具有良好的集成,可以在Windows和Linux上运行,类库针对".NET Standard"进行编译,因此可以被任何托管应用程序托管。

03 使用方法

1、安装依赖库

Install-Package CS-Script

2、有返回值的的动态脚本

using CSScriptLib; 

// 使用CS-Script的Evaluator静态类加载一个动态方法
// 这个方法接受一个字符串形式的C#代码,并编译执行它
dynamic script = CSScript.Evaluator
    .LoadMethod(@"int Product(int a, int b)
                       {
                           return a * b; // 计算两个数的乘积
                       }");

// 调用加载的方法,并传入两个参数3和2
int result = script.Product(3, 2);

// 输出结果到控制台
Console.WriteLine(result); // 输出:6

3、无返回值的动态脚本

using CSScriptLib;

// 使用CS-Script的Evaluator静态类创建一个委托(Delegate)
// 这个委托指向一个接受一个字符串参数的方法,该方法将字符串输出到控制台
var log = CSScript.Evaluator
                  .CreateDelegate(@"void Log(string message)
                                    {
                                        Console.WriteLine(message); // 使用Console.WriteLine将传入的消息输出到控制台
                                    }");

// 调用创建的委托,并传入字符串"Test message"
log("Test message"); // 执行Log方法,输出"Test message"到控制台


4、简单表达式脚本

using CSScriptLib; 

// 使用CS-Script的Evaluator静态类的Eval方法直接评估一个C#表达式
// 这里评估的表达式是"6 + 3",即计算6和3的和
int sum = CSScript.Evaluator.Eval("6 + 3");

// 输出计算结果到控制台
Console.WriteLine(sum); // 打印计算结果,即9

5、实现了特定接口的C#脚本

using CSScriptLib; 

// 使用CS-Script的Evaluator静态类的LoadCode方法加载一个实现了ICalc接口的C#脚本
// 这个脚本定义了一个名为Script的类,它实现了ICalc接口中的Sum方法
ICalc calc = CSScript.Evaluator
    .LoadCode<ICalc>(@"using System;
                        public class Script: ICalc // 继承ICalc接口
                        {
                            public int Sum(int a, int b) // 实现Sum方法
                            {
                                return a + b; // 返回两个整数的和
                            }
                        }");

// 调用加载的脚本中的Sum方法,并传入参数1和2
int result = calc.Sum(1, 2);

// 输出结果到控制台
Console.WriteLine(result); // 打印Sum方法的执行结果,即3

04 项目地址

https://github.com/oleg-shilo/cs-script

更多开源项目:
https://github.com/bianchenglequ/NetCodeTop

- End -

推荐阅读

2个零基础入门框架教程!

一款可以替代Navicat的数据库管理工具

CSCore:一个.Net功能强大且灵活的开源音频处理库

Blazor开源UI简洁组件:10个热门.Net开源项目推荐!

ExcelDataReader:一个.Net高性能Excel开源读取器

C#类型系统

C# 是一种强类型语言。 每个变量和常量都有一个类型,每个求值的表达式也是如此。 每个方法声明都为每个输入参数和返回值指定名称、类型和种类(值、引用或输出)。 .NET 类库定义了内置数值类型和表示各种构造的复杂类型。 其中包括文件系统、网络连接、对象的集合和数组以及日期。 典型的 C# 程序使用类库中的类型,以及对程序问题域的专属概念进行建模的用户定义类型。

简单来说就是指:
类,参数,字段,属性,方法,模块,程序集
等元素

眼见为实

//命名空间(模块)=>类型系统
namespace Example_4_1
{
	//类属于类型系统
    class UserLogin
    {
		//字段,属性=>类型系统
        private string username;
        private string password;
		//构造方法=>类型系统
        public UserLogin(string username, string password)
        {
            this.username = username;//参数=>类型系统
            this.password = password;
        }
		//方法=>类型系统
        public bool Login(string inputUsername, string inputPassword)
        {
            if (inputUsername == username && inputPassword == password)
            {
                return true;
            }
            else
            {
                return false;
            }
        }
    }
}

编译器使用类型信息来确保在代码中执行的所有操作都是类型安全的,比如

int a = 5;
int b = a + 2; //OK

bool test = true;

// Error. Operator '+' cannot be applied to operands of type 'int' and 'bool'.
int c = a + test;

IL类型系统

编译器将类型信息作为元数据(metadata)嵌入到程序中。所以C#的类型系统,在IL层面叫
"metadata"

眼见为实

image

CLR类型系统

CLR在运行时使用metadata来进一步保证类型安全,避免出现类型转换错误。

眼见为实

C#中所有的类型元素,在 CLR 上都有对应的类与之承载
使用windbg来一探究竟

  1. AppDomain
    image
    在.net core中 出于跨平台需要,相对.net framework只剩下了两个,System Domain与Domain 1

  2. Assembly
    image

  3. Module
    image

  4. Class
    image
    我们可以用二级命令来显示,模块中定义的类型(class),以及模块引用的类型。

  5. Method
    image
    同时也显示了父类objcet的方法

  6. Field
    image
    dump方法表中的EEClass,得出类的字段

CLR类型系统与C++类型系统的映射

网络上有一种说法,C#中的这个#,实际上是++++。相当于C++的超集C++++。那么为什么这么说呢?从CLR的角度出发,CLR中所有类型,在C++都有一一对应。

https://github.com/dotnet/runtime/blob/main/src/coreclr/vm/appdomain.cpp
https://github.com/dotnet/runtime/blob/main/src/coreclr/vm/class.cpp
https://github.com/dotnet/runtime/blob/main/src/coreclr/vm/field.cpp

值类型与引用类型

在面试八股文中,有一个经常出现的问题:值类型与引用类型的区别?
而这个问题,一个高频次的答案就是:Class是引用类型,struct是值类型。值类型分配在栈用,引用类型分配在堆中。
这个说法说并不准确,为什么呢?因为它是从实现的角度对两个概念进行描述,相当于先射箭再画靶。而不是基于两种类型内在的真正差别

ECMA335对两种类型真正的定义

值类型:这种类型的实例直接包含其所有数据。值类型的值是自包含,自解释的
引用类型:这种类型的实例包含对其数据的引用。引用类型所描述的值是指示其他值的位置

值类型 引用类型
生存期 包含其所有数据,自包含,自解释。值类型包含的数据生存期与实例本身一样长 描述了其他值的位置,其他值的生存期并不取决于引用类型值本身
共享性 不可共享,如果我们想在其他地方使用它。默认使用“传值”语义,按字节复制一份,原始值不受影响。 可被共享,如果我们想在如果我们想在其他地方使用它。默认使用”传引用“语义。因此在传递之后,会多出一个指向同一个位置的引用类型实例。
相等性 仅当它们的值的二进制序列一样时才认为相同 当它们所指示的位置一样就认为相同

从定义中可以看出,没有地方说明,谁存储在栈中,谁存储在堆中。
实际上,
值类型分配在栈上,引用类型分配在堆上。只是微软在设计CLI标准时根据实际情况所作出的一个设计决策。
由于它确实是一个非常好的决策,因此微软在实现不同的CLI时,沿用了这个决策。但请记住,这并不是银弹,不同的硬件平台有不同的设计
事实上类型的存储实现,主要通过JIT编译的设计来体现。JIT编译器在x86/x64的硬件平台上,由于有栈,堆,寄存器存在。JIT可以随意使用,只要它愿意,它可以把值类型分配在堆中,分配在寄存器中都是可以的。只要不违反类型的定义,又有何不可呢?

值类型内存布局

如果仅从定义出发,将所有值类型保存在堆上是完全可行的,只是使用栈或者CPU寄存器实在太香了而已。
因此主要考虑生存期与共享这两个因素,放在栈空间中更加合适。

眼见为实

    internal class Program
    {
        static void Main(string[] args)
        {
            var myStruct = new MyStruct();
            myStruct.x = 100;
            myStruct.y = 102; 
        }

    }
    struct MyStruct
    {
        public int x;
        public int y;
    }

image

可以看到,值类型的内存布局没有任何额外开销,直接在线程栈中完成分配,并随线程栈释放。完美契合符合生存期/共享性的概念。

引用类型的内存布局

由于引用类型可以共享数据,因此它们的生存期并不确定。所以考虑引用类型存储到哪里要比值类型要简单得多。
通常来说,
引用类型存储在栈上不符合定义
,此时哪里能存储引用类型就很明显了。
image

眼见为实

    internal class Program
    {
        static void Main(string[] args)
        {
            var myClass = new MyClass();
            myClass.x = 100;
            myClass.y = 102;

            Debugger.Break();
        }

    }

    class MyClass
    {
        public int x;
        public int y;
    }

image
与值类型的汇编相比较,可以很明显的看出差异。

  1. 值类型的汇编,直接把100和102赋值给了rbp寄存器的偏移量上,从而实现跟随栈空间一起释放
  2. 引用类型的汇编,则是先在托管堆中创建一个内存空间,再将内存地址赋值给rax寄存器,再基于内存地址的偏移量进行赋值操作。不会随着栈空间一起释放

眼见为实

使用windbg也可以验证,MyClass对象分配在托管堆中,并结构为objHeader + methodtable + 对象本身
image

值类型一定存储在栈空间中吗?

值类型除了可以存储在“栈”上,也可以在“寄存器”,“托管堆”中

眼见为实:值类型在寄存器中

    internal class ExampleStruct
    {
        public int Main(int i)
        {
            var myStruct = new MyStruct();
            myStruct.vaule1 = i;
            return Helper(myStruct);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]//告诉编译器,让方法尽量内联
        private int Helper(MyStruct arg)
        {
            return arg.vaule1;
        }
    }

    struct MyStruct
    {
        public int vaule1;
        public int vaule2;
        public int vaule3;
        public int vaule4;
    }

在64位release模式下,因为Helper方法被内联。所以并不会调用Helper方法,因此省略了传递Struct数据给Help方法,JIT编译器将整个操作优化成只需要对CPU寄存器进行操作。

可能是环境问题,我的release版本始终没有内联,导致无法在windbg中复现。有兴趣的小伙伴可以参考<.NET内存管理宝典> 168页内容。

眼见为实:值类型在托管堆中

    internal class ExampleStruct2
    {
        public void Main()
        {
            var myStruct = new MyStruct2
            {
                vaule1 = 100,
                vaule2 = 102
            };
			
			//因为委托是引用类型,引用类型内部引用一个值类型,也会把值类型提升至托管堆
            var f = () =>
            {
                Console.WriteLine(t.vaule1);
                Console.WriteLine(t.vaule2);
            };  
            f();
            Debugger.Break();
        }
    }
    struct MyStruct2
    {
        public int vaule1;
        public int vaule2;
        public int vaule3;
        public int vaule4;
    }

image

可以看到,当值类型被引用类型所持有时,同样会分配在堆空间中。

引用类型一定存储在堆空间中吗?

在.NET9 之前,这句话是成立的,因为栈空间不符合引用类型的定义。

但在.NET9之后,这个概念发生了改变。我们先来思考一段代码

    public class ExampleClass
    {
        public void Main()
        {
            var myClass = new MyClass()
            {
                vaule1 = 100,
                vaule2 = 102
            };
            Console.WriteLine(myClass.vaule1);
            Console.WriteLine(myClass.vaule2);
        }
    }
    public class MyClass
    {
        public int vaule1;
        public int vaule2;
        public int vaule3;
        public int vaule4;
    }

虽然MyClass是一个引用类型,但在方法中,myClass实际上随着Main方法的执行完成而不再使用。因此把myClass放在堆空间中,会造成GC的负担以及内存浪费。能不能让JIT更智能一点?如果引用类型的生命周期与线程栈类似,是否可以放在栈空间中呢?

答案是肯定的,而且在JAVA中已经运用很多年了,这就是大名鼎鼎的逃逸分析(escape analysis)

.NET9 的逃逸分析(escape analysis)

.NET9 刚刚启用此特性,因此范围比较有限。不过在未来,相信会进一步扩大范围,实现更高性能的内存分配。

.NET 9内存分配:
image
.NET 8内存分配:
image

再引用一个官方的例子
https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-9/#object-stack-allocation

// dotnet run -c Release -f net8.0 --filter "*" --runtimes net8.0 net9.0

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

BenchmarkSwitcher.FromAssembly(typeof(Tests).Assembly).Run(args);

[MemoryDiagnoser(false)]
[DisassemblyDiagnoser]
[HideColumns("Job", "Error", "StdDev", "Median", "RatioSD")]
public class Tests
{
    [Benchmark]
    public int GetValue() => new MyObj(42).Value;

    private class MyObj
    {
        public MyObj(int value) => Value = value;
        public int Value { get; }
    }
}

在.net 8 中的汇编如下:

; Tests.GetValue()
       push      rax
       mov       rdi,offset MT_Tests+MyObj
       call      CORINFO_HELP_NEWSFAST
       mov       dword ptr [rax+8],2A
       mov       eax,[rax+8]
       add       rsp,8
       ret
; Total bytes of code 31

在.net 8中内存分配如下
image

在.net 9中的汇编如下

; Tests.GetValue()
       mov       eax,2A
       ret
; Total bytes of code 6

在.net 9中内存分配如下
image

可以看到,.NET9通过方法内联,直接将new MyObj(42).Value提升为 return 42 . 不会在堆中创建MyObj对象,而是直接在栈空间赋值。

总结

以上例子可以看到,值类型与引用类型其内核就是生命周期是否可控,是否被其他线程共享?无论什么类型,只要它生存期大于线程栈或者被其他线程所共享访问。那么它就会被分配在堆上。反之,则分配在栈或者寄存器上。
更简单来说, JIT如果不知道对象什么时候被释放,那么它一定会分配到堆空间中。如果知道什么时候被释放,那么它会尽量分配到栈空间中,甚至寄存器中。