2024年2月

为什么使用消息队列?

以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务,传统的调用方式是同步调用,这会存在一定的性能问题

使用消息队列可以实现异步的通信方式,相比于同步的通信方式,异步的方式可以让上游快速成功,极大提高系统的吞吐量

消息队列的使用场景有如下:

  • 异步处理:以上述用户下单购买商品为例,将多个不关联的任务放进消息队列,提高系统性能
  • 应用解耦:以上述用户下单购买商品为例,订单系统通知库存系统减库存,传统的做法是订单系统调用库存系统的接口,订单系统和库存系统高耦合,当库存系统出现故障时,订单就会失败。使用消息队列,用户下单后,订单系统完成持久化,将消息写入消息队列,返回用户下单成功。库存系统订阅下单消息,获取下单消息,进行减库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致系统异常
  • 流量削峰:举行秒杀活动时,为防止流量过大导致应用挂掉,服务器收到用户请求后,先写入消息队列,如果超过了消息队列长度的最大值,则直接抛弃或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理,缓解短时间的流量高峰


RabbitMQ 安装

以 ubuntu 22.04.3 为例,参考 RabbitMQ 官网提供的安装脚本

#!/bin/sh

## 要想安装最新版本的 rabbitmq,可以选择 Cloudsmith 存储库下载,为此我们必须安装 apt-transport https 包
sudo apt-get install curl gnupg apt-transport-https -y

## 获取 Cloudsmith 存储库提供的签名密钥并添加到系统中,这样这样才能使用 Cloudsmith 仓库下载包
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

## 将描述 RabbitMQ 和 Erlang 包存储库的文件放在 /etc/apt/sources.list.d/ 目录下
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

## 更新存储库索引
sudo apt-get update -y

## 安装 erlang
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## 安装 rabbitmq
sudo apt-get install rabbitmq-server -y --fix-missing

使用以下命令启动、关闭和查看 rabbitmq 状态

sudo systemctl stop rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server

要想访问 rabbitmq 的 web 管理界面,需要执行以下命令,启动 rabbitmq 的插件管理

rabbitmq-plugins enable rabbitmq_management

访问:
http://127.0.0.1:15672/
可查看 rabbitmq 的 web 管理界面,但首先要创建用户,这里创建一个管理员用户,使用该用户登录

# rabbitmqctl add_user {username} {password}
# 设置账号和密码
rabbitmqctl add_user admin 123

# rabbitmqctl set_user_tags {username} {role}
# 设置角色,administrator 是管理员角色
rabbitmqctl set_user_tags admin

# rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
# 设置权限:
# {vhost} 表示待授权用户访问的 vhost 名称,默认为 "/"
# {user} 表示待授权访问特定 vhost 的用户名称
# {conf} 表示待授权用户的配置权限,是一个匹配资源名称的正则表达式
# {write} 表示待授权用户的写权限,是一个匹配资源名称的正则表达式
# {read} 表示待授权用户的读权限,是一个资源名称的正则表达式
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

更多 rabbitmqctl 命令可参考官网:
https://www.rabbitmq.com/rabbitmqctl.8.html


RabbitMQ 组成与消息模型

RabbitMQ 是使用 Erlang 语言开发的开源的消息队列,基于 AMQP(高级消息队列协议)实现

RabbitMQ 的组成部分如下:

  • Message:消息,又可分为消息头和消息体,消息头由一系列可选属性组成
  • Producer:消息生产者,是向交换器发布消息的客户端应用程序
  • Consumer:消息消费者,从消息队列获取消息的客户端应用程序
  • Exchange:交换器,接收生产者发送的消息并路由到相应的队列,常用的交换器类型有 direct、fanout、topic
  • Binding:绑定,用于关联消息队列和交换器
  • Queue:消息队列,保存消息直到放送给消费者
  • Rounting-key:路由键,决定信息该投递到哪个队列的规则
  • Connection:链接,指应用与 rabbit 服务器建立的 TCP 链接
  • Channel:信道,TCP 里面的虚拟链接,一条 TCP 链接上可以创建多条信道,可以避免频繁创建和销毁 TCP 连接所带来的开销
  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象,是共享相同身份认证和加密环境的独立服务器域,可以理解为一个 mini 版的 RabbitMQ 服务器
  • Broker:消息队列服务器实体

组件协同工作的执行流程如下:

  • 消息生产者连接到 RabbitMQ Broker,创建 connection,开启 channel
  • 生产者声明交换机类型、名称、是否持久化等
  • 生产者发送消息,并指定消息是否持久化等属性,指定 routing key
  • exchange 收到消息后,根据 routing key 将消息路由到跟当前交换机绑定的相匹配的队列
  • 消费者监听消息队列,接收到消息后开始业务处理

从上述流程我们可以看到,消息首先要经过 Exchange 路由才能找到对应的 Queue。Exchange 有四种类型,分别是 Direct、Fanout、Topic、Headers

1. Direct Exchange

直连交换机,是一种带路由功能的交换机,需要绑定一个队列,绑定时要指定一个 RoutingKey(路由键)。生产者把消息发送到交换机时,也必须指定消息的 RoutingKey(路由键)。 Exchange 根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 与消息的 RoutingKey 一致,才会接收到消息

2. Fanout Exchange

扇形交换机,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机。生产者把消息发送到交换机,交换机把消息发给绑定过的所有队列,队列的消费者都能拿到消息,实现一条消息被多个消费者消费

3. Topic Exchange

主题交换机和直连交换机类似,也可以根据 RoutingKey 把消息路由到不同的队列,只不过 Topic 类型的交换机可以让队列在绑定 RoutingKey 时使用通配符。这种模型的 RoutingKey 一般由一个或多个单词组成,多个单词以
.
符号分割

通配符有两种:

  • *
    符号:匹配一个词,比如
    a.*
    可以匹配
    a.b

    a.c
    ,但是匹配不了
    a.b.c
  • #
    符号:匹配一个或多个词,比如
    rabbit.#
    可以匹配
    rabbit.a.b

    rabbit.a
    ,也可以匹配
    rabbit.a.b.c

4. Headers Exchange

头部交换机不是用 RoutingKey 进行路由匹配,而是匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列


SpringBoot 整合 RabbitMQ

生产者端引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者端添加配置

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: 123

生产者端配置队列、交换机

@Configuration
public class RabbitMqConfig {

  @Bean
  public Queue rabbitmqTestDirectQueue() {
      // Direct 队列
      // name:队列名称
      // durable:是否持久化
      // exclusive:是否独享,如果设置 true,则只有创建者可以使用此队列
      // autoDelete: 是否自动删除,也就是临时队列,当最后一个消费者断开连接就会自动删除
      return new Queue("test_direct_queue", true, false, false);
  }

  @Bean
  public DirectExchange rabbitmqTestDirectExchange() {
      // Direct 交换机
      return new DirectExchange("test_direct_exchange", true, false);
  }

  @Bean
  public Binding bindDirect() {
      return BindingBuilder
              // // 绑定 Direct 队列
              .bind(rabbitmqTestDirectQueue())
              //到 Direct 交换机
              .to(rabbitmqTestDirectExchange())
              // 并设置匹配键
              .with("test_direct_routing");
  }

  @Bean
  public Queue rabbitmqTestFanoutQueueA() {
      // fanout 队列 a
      return new Queue("test_fanout_queue_a", true, false, false);
  }

  @Bean
  public Queue rabbitmqTestFanoutQueueB() {
      // fanout 队列 b
      return new Queue("test_fanout_queue_b", true, false, false);
  }

  @Bean
  public FanoutExchange rabbitmqTestFanoutExchange() {
      // Fanout 交换机
      return new FanoutExchange("test_fanout_exchange", true, false);
  }

  @Bean
  public Binding bindFanoutA() {
      return BindingBuilder
              // 绑定 Fanout 队列 a
              .bind(rabbitmqTestFanoutQueueA())
              //到 Fanout 交换机
              .to(rabbitmqTestFanoutExchange());
  }

  @Bean
  public Binding bindFanoutB() {
      return BindingBuilder
              // 绑定 Fanout 队列 b
              .bind(rabbitmqTestFanoutQueueB())
              //到 Fanout 交换机
              .to(rabbitmqTestFanoutExchange());
  }

  @Bean
  public Queue rabbitmqTestTopicQueueA() {
      // topic 队列 a
      return new Queue("test_topic_queue_a", true, false, false);
  }

  @Bean
  public Queue rabbitmqTestTopicQueueB() {
      // topic 队列 b
      return new Queue("test_topic_queue_b", true, false, false);
  }

  @Bean
  public TopicExchange rabbitmqTestTopicExchange() {
      // Topic 交换机
      return new TopicExchange("test_topic_exchange", true, false);
  }

  @Bean
  public Binding bindTopicA() {
      return BindingBuilder
              // 绑定 Topic 队列 a
              .bind(rabbitmqTestTopicQueueA())
              //到 Topic 交换机
              .to(rabbitmqTestTopicExchange())
              // 并设置匹配键
              .with("a.*");
  }

  @Bean
  public Binding bindTopicB() {
      return BindingBuilder
              // 绑定 Topic 队列 b
              .bind(rabbitmqTestTopicQueueB())
              //到 Topic 交换机
              .to(rabbitmqTestTopicExchange())
              // 并设置匹配键
              .with("b.*");
  }

  @Bean
  public Queue rabbitmqTestHeadersQueueA() {
      // headers 队列 a
      return new Queue("test_headers_queue_a", true, false, false);
  }

  @Bean
  public Queue rabbitmqTestHeadersQueueB() {
      // headers 队列 b
      return new Queue("test_headers_queue_b", true, false, false);
  }

  @Bean
  public HeadersExchange rabbitmqTestHeadersExchange() {
      // Headers 交换机
      return new HeadersExchange("test_headers_exchange", true, false);
  }

  @Bean
  public Binding bindHeadersA() {
      Map<String, Object> map = new HashMap<>();
      map.put("key_a1", "a1");
      map.put("key_a2", "a2");
      return BindingBuilder
              // 绑定 Headers 队列 a
              .bind(rabbitmqTestHeadersQueueA())
              //到 Headers 交换机
              .to(rabbitmqTestHeadersExchange())
              // 全部匹配
              .whereAll(map).match();
  }

  @Bean
  public Binding bindHeadersB() {
      Map<String, Object> map = new HashMap<>();
      map.put("key_b1", "b1");
      map.put("key_b2", "b2");
      return BindingBuilder
              // 绑定 Headers 队列 b
              .bind(rabbitmqTestHeadersQueueB())
              //到 Headers 交换机
              .to(rabbitmqTestHeadersExchange())
              // 部分匹配
              .whereAny(map).match();
  }
}

生产者端发送消息

@Controller
public class RabbitMQController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendDirectMsg")
    public String sendDirectMsg() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "test_send_direct_msg");
        rabbitTemplate.convertAndSend("test_direct_exchange", "test_direct_routing", map);
        return "OK";
    }

    @GetMapping("sendFanoutMsg")
    public String sendFanoutMsg() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "test_send_fanout_msg");
        rabbitTemplate.convertAndSend("test_fanout_exchange", "", map);
        return "OK";
    }

    @GetMapping("sendTopicMsgA")
    public String sendTopicMsgA() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "test_send_topic_msg_a");
        rabbitTemplate.convertAndSend("test_topic_exchange", "a.c", map);
        return "OK";
    }

    @GetMapping("sendTopicMsgB")
    public String sendTopicMsgB() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "test_send_topic_msg_b");
        rabbitTemplate.convertAndSend("test_topic_exchange", "b.c", map);
        return "OK";
    }

    @GetMapping("sendHeadersMsgA")
    public String sendHeadersMsgA() {

        MessageProperties msgp = new MessageProperties();
        msgp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        msgp.setContentType("UTF-8");

        Map<String, Object> map = new HashMap<>();
        map.put("key_a1", "a1");
        map.put("key_a2", "a2");
        msgp.getHeaders().putAll(map);

        Message msg = new Message("test_send_headers_msg_a".getBytes(), msgp);
        rabbitTemplate.convertAndSend("test_headers_exchange", null, msg);
        return "OK";
    }

    @GetMapping("sendHeadersMsgB")
    public String sendHeadersMsgB() {

        MessageProperties msgp = new MessageProperties();
        msgp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        msgp.setContentType("UTF-8");

        Map<String, Object> map = new HashMap<>();
        map.put("key_b1", "a1");
        map.put("key_b3", "b3");
        msgp.getHeaders().putAll(map);

        Message msg = new Message("test_send_headers_msg_b".getBytes(), msgp);
        rabbitTemplate.convertAndSend("test_headers_exchange", null, msg);
        return "OK";
    }
}

消费端引入的依赖和配置与生产端一样,接收消息

/**
 * Direct 队列消费者
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_direct_queue"})
public class RabbitMqDirectReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Fanout 队列消费者 a
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_fanout_queue_a"})
public class RabbitMqFanoutAReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Fanout 队列消费者 b
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_fanout_queue_b"})
public class RabbitMqFanoutBReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Topic 队列消费者 a
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_topic_queue_a"})
public class RabbitMqTopicAReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Topic 队列消费者 b
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_topic_queue_b"})
public class RabbitMqTopicBReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Topic 队列消费者 a
 */
@Slf4j
@Component
@RabbitListener(queues = {"test_topic_queue_a"})
public class RabbitMqTopicAReceiver {
  
  @RabbitHandler
  public void process(Map<String, Object> map) {
    log.info("消费者接收到消息: {}", map.toString());
  }
}
/**
 * Headers 队列消费者 a
 */
@Slf4j
@Component
public class RabbitMqHeadersAReceiver {
  
  @RabbitListener(queuesToDeclare = @Queue("test_headers_queue_a"))
  public void process(Message msg) throws Exception {
    MessageProperties msgp = message.getMessageProperties();
    String contentType = msgp.getContentType();
    log.info("消费者接收到消息: {}", new String(message.getBody(), contentType));
  }
}
/**
 * Headers 队列消费者 b
 */
@Slf4j
@Component
public class RabbitMqHeadersBReceiver {
  
  @RabbitListener(queuesToDeclare = @Queue("test_headers_queue_b"))
  public void process(Message msg) throws Exception {
    MessageProperties msgp = message.getMessageProperties();
    String contentType = msgp.getContentType();
    log.info("消费者接收到消息: {}", new String(message.getBody(), contentType));
  }
}


RabbitMQ 消息确认机制

消费者从队列中获取到消息之后,在处理消息时出现异常,那这条正在处理的消息就没有完成消息消费,数据就会丢失。生产者同样如此,生产者发消息给交换机,也不能保证消息准确发送过去了

RabbitMQ 的消息确认分为两部分:

  • 消息发送确认:用来确认生产者是否成功将消息投递到队列
  • 消息接收确认:用来确认消费者是否成功接收到消息

1. 生产端确认

RabbitMQ 提供了两种机制,用于告知生产端是否发送消息成功:

  • publisher-confirm:消息投递交换机,返回成功/失败信息
  • publisher-return:消息投递交换机成功,但路由到队列失败,返回失败信息

生产端配置如下:

spring:
  rabbitmq:
    # 开启 publisher-confirm
    publisher-confirm-type: correlated
    # 开启 publisher-return
    publisher-returns: true

publish-confirm-type
有三个值:

  • none:禁用发布确认模式,默认值
  • simple:同步等待 confirm 结果,直到超时
  • correlated:异步通知 confirm 结果,需要定义回调函数 ConfirmCallback

生产端配置 ConfirmCallback 函数和 ReturnCallback 函数

@Slf4j
@Configuration
public class ProviderCallBackConfig {
 
    @Autowired
    private CachingConnectionFactory factory;
 
    @Bean
    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        // mandatory 设置为 true,若 exchange 无法找到合适的 queue 存储消息就会调用 basic.return 方法将消息返还给生产者
        //  mandatory 设置为 false 时,出现上述情况 broker 会直接将消息丢弃
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
          log.info("发送消息至 exchange, 消息唯一标识: {}, 确认状态: {}, 造成原因: {}",correlationData, ack, cause);
        });
 
        rabbitTemplate.setReturnsCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("发送消息至 queue 失败, 消息:{}, 回应码:{}, 回应信息:{}, 交换机:{}, 路由键:{}", message, replyCode, replyText, exchange, routingKey);
        });

        return rabbitTemplate;
    }
}

生产者发送消息设置消息唯一标识

@GetMapping("sendDirectMsg")
public String sendDirectMsg() {

    CorrelationData data = new CorrelationData();
    data.setId("666666")

    Map<String, Object> map = new HashMap<>();
    map.put("msg", "test_send_direct_msg");

    rabbitTemplate.convertAndSend("test_direct_exchange", "test_direct_routing", map, data);
    return "OK";
}

2. 消费者确认

RabbitMQ 支持消息确定 ACK 机制,消费者从 RabbitMQ 收到消息并处理完成后,返回给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列删除

RabbitMQ 的消息确认方式有两种:自动确认和手动确认

RabbitMQ 默认是自动确认,即消息推送给消费者后,马上确认并销毁,但假如消费消息的过程中发生了异常,由于消息已经销毁,这样就会造成消息丢失

手动确认又分为肯定确认和否定确认

肯定确认:

// 第一个参数表示当前的投递标签号,相当于当前消息的 Id
// 第二个参数表示是否批量确认,true 表示批量确认当前及之前的所有消息,false表示只确认当前消息
channel.basicAck(envelope.getDeliveryTag(), false);

否定确认:

// 第一个参数表示当前的投递标签号,相当于当前消息的 Id
// 第二个参数表示是否批量拒绝,true 表示所有投递标签小于当前消息且未确认的消息都将被拒绝,false 表示仅拒绝当前消息
// 第三个参数表示被拒绝的消息是否重新放回队列,true 表示消息重新放回队列投递,false 表示丢弃该消息
channel.basicNack(envelope.getDeliveryTag(), false, true);
# rejeck 与 nack 作用相同,但不支持批量操作
channel.basicReject(envelope.getDeliveryTag(), true);

Springboot 提供了三种确认模式,配置如下:

# none:默认所有消息消费成功
# auto:根据消息处理逻辑是否抛出异常自动发送 ack(正常)或 nack(异常)
# manual:消费者手动调用 ack、nack、reject 几种方法进行确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

整合 SpringBoot 代码如下:

@Slf4j
@Component
public class MsgConfirmController {

    @RabbitListener(queues = "test_confirm_queue")
    public void consumerConfirm(Message message, Channel channel) throws Exception {
        if(message.getBody().equals("2")) {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
            log.info("接收的消息为: {}", message);
        } else {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            log.info("未消费数据");
        }
    }
}


RabbitMQ 推拉模型

RabbitMQ 有两种消息处理模式:推模式和拉模式

推模式下,生产者发布消息到队列时,会立即将这条消息发送给所有订阅该队列的消费者,优点:实现实时通信,缺点:如果消费者的处理能力跟不上生产者的速度,就会在消费者处造成消息堆积,因此需要根据消费能力做流控(比如 RabbitMQ 用 QOS 来限制),RabbitMQ 默认使用推消息

拉模式下,生产者发布消息到队列时,不会立即发送消息给消费者,而是等待消费者请求消息后才发送,优点:消费端可以按照自己的处理速度来消费,缺点:消息传递存在延迟,当处理速度小于发布速度时,容易造成消息堆积在队列

SpringBoot 实现拉消息代码如下:

@Slf4j
@Component
public class RabbitMQPullConsumer {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  public void process() {
    rabbitTemplate.execute(new ChannelCallback<Object>() {
      Object result;
      GetResponse response;
      try {
        response = channel.basicGet("test_pull_queue", false);
        result = new String(response.getBody(),  "UTF-8");
        log.info("消费者接收到消息: {}", result);
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
      } catch(Exception e) {
        log.info("消费者接收消息失败", e);
        if(response != null) {
          try {
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false, true);
          } catch(Exception e) {
            log.info("消费者拒绝消息失败", e);
          }
        }
      }
    }
  }
}

RabbitMQ 的 Channel 提供了 basicGet 方法用于拉取消息,第二个参数为是否自动 ack。这里我们需要手动调用 process 方法来拉取消息,一般来说会让一个线程负责循环拉取消息,存入一个长度有限的阻塞队列,另一个线程从阻塞队列取出消息,处理完一条则手动 Ack 一条。如果想批量拉取消息,可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性 ACK

当我们想要封装一些自定义功能给别人使用的时候,创建Spring Boot Starter的形式是最好的实现方式。如果您还不会构建自己的Spring Boot Starter的话,本文将带你一起创建一个自己的Spring Boot Starter。

快速入门

  1. 创建一个新的 Maven 项目。第三方封装的命名格式是
    xxx-spring-boot-starter
    ,例如:
    didispace-spring-boot-starter

  2. 编辑
    pom.xml
    ,添加
    spring-boot-autoconfigure

    spring-boot-starter
    依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
</dependencies>
  1. 创建一个用
    @Configuration
    注释的配置类,在这里您可以使用
    @Bean
    来创建使用
    @ConditionalOnClass

    @ConditionalOnMissingBean
    等条件注释来控制何时应用配置。
@Configuration
@ConditionalOnClass(MyFeature.class)
@ConditionalOnProperty(prefix = "myfeature", name = "enabled", matchIfMissing = true)
public class MyFeatureAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public MyFeature myFeature() {
        return new MyFeature();
    }
}

  1. src/main/resources/META-INF
    目录下创建
    spring.factories
    文件,并在
    org.springframework.boot.autoconfigure.EnableAutoConfiguration
    关键字下列出您的自动配置类,比如:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.didispace.myfeature.MyFeatureAutoConfiguration

该配置的作用是让Spring Boot应用在引入您自定义Starter的时候可以自动这里的配置类。

注意:Spring Boot 2.7开始,不再推荐使用
spring.factories
,而是改用
/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
,文件内容直接放需要自动加载配置类路径即可。这个变更具体可见之前的这篇文章:
《Spring Boot 2.7开始spring.factories不推荐使用了》

验证测试

在制作Spring Boot Starter的时候,一定记得使用单元测试来验证和确保自动化配置类在任何条件逻辑在启动器下能够按照正确的预期运行。

创建单元测试

使用
@SpringBootTest
加载完整的应用程序上下文,并验证启动程序是否正确配置了 Bean 和属性。

@SpringBootTest(classes = TestApplication.class)
public class MyStarterAutoConfigurationTest {

    @Autowired(required = false)
    private MyService myService;

    @Test
    public void testMyServiceAutoConfigured() {
        assertNotNull(myService, "MyService should be auto-configured");
    }
}

覆盖不同的配置

如果有不同的配置方案,那么还需要使用
@TestPropertySource

@DynamicPropertySource
覆盖属性以测试不同配置下的情况。

或者也可以直接简单的通过
@SpringBootTest
中的属性来配置,比如下面这样:

@SpringBootTest(properties = "my.starter.custom-property=customValue")
public class MyStarterPropertiesTest {

    @Value("${my.starter.custom-property}")
    private String customProperty;

    @Test
    public void testPropertyOverride() {
        assertEquals("customValue", customProperty, "Custom property should be overridden by @SpringBootTest");
    }
}

覆盖
@Conditional
的不同分支

如果您的启动器包含条件配置,比如:
@ConditionalOnProperty

@ConditionalOnClass
等注解,那么就必须编写测试来覆盖所有条件以验证是否已正确。

比如下面这样:

@SpringBootTest(classes = {TestApplication.class, MyConditionalConfiguration.class})
@ConditionalOnProperty(name = "my.starter.enable", havingValue = "true")
public class MyStarterConditionalTest {

    @Autowired
    private ApplicationContext context;

    @Test
    public void conditionalBeanNotLoadedWhenPropertyIsFalse() {
        assertFalse(
            context.containsBean("conditionalBean"),
            "Conditional bean should not be loaded when 'my.starter.enable' is false"
        );
    }
}

为了覆盖不同的条件分支,我们通常还需要使用
@TestConfiguration
注解来有选择地启用或禁用某些自动配置。

小结

本文介绍了两个Spring Boot的进阶内容:

  1. 如何创建 Spring Boot Starter
  2. 如何为 Spring Boot Starter 提供单元测试

掌握这项技能可以帮你更好的为Spring Boot提供模块划的功能封装。如果您学习过程中如遇困难?可以加入我们超高质量的
Spring技术交流群
,参与交流与讨论,更好的学习与进步!更多
Spring Boot教程可以点击直达!
,欢迎收藏与转发支持!

最后再给大家推荐一些有关Spring Boot Starter和自动化配置的扩展阅读:

欢迎关注我的公众号:程序猿DD。第一时间了解前沿行业消息、分享深度技术干货、获取优质学习资源

一、摘要

在上篇文章中,我们讲到
ReentrantLock
可以保证了只有一个线程能执行加锁的代码。

但是有些时候,这种保护显的有点过头,比如下面这个方法,它仅仅就是只读取数据,不修改数据,它实际上允许多个线程同时调用的。

public class Counter {

    private final Lock lock = new ReentrantLock();

    private int count;

    public int get() {
        // 加锁
        lock.lock();
        try {
            return count;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
}

站在程序性能的角度,实际上我们想要的是这样的效果。

  • 1.读和读之间不互斥,因为只读操作不会有数据安全问题
  • 2.写和写之间互斥,避免一个写操作影响另外一个写操作,引发数据计算错误问题
  • 3.读和写之间互斥,避免读操作的时候写操作修改了内容,引发数据脏读问题

总结起来就是,允许多个线程同时读,但只要有一个线程在写,其他线程就必须排队等待。

在 JDK 中有一个读写锁
ReadWriteLock
,使用它就可以解决这个问题,它可以保证以下两点:

  • 1.只允许一个线程写入,其他线程既不能写入也不能读取
  • 2.没有写入时,多个线程允许同时读,可以提高程序并发性能

实际上,读写锁
ReadWriteLock
里面有两个锁实现,一个是读操作相关的锁,称为共享锁,当多个线程同时操作时,不会让多个线程进行排队等待,大大的提升了程序并发读的执行效率;另一个是写操作相关的锁,称为排他锁,当多个线程同时操作时,只允许一个线程写入,其他线程进入排队等待;两者进行组合操作,就可以实现上面的预期效果。

下面我们一起来看看它的基本用法!

二、ReadWriteLock 基本用法

2.1、读和读共享

读和读之间不互斥,当多个线程进行读的时候,不会让多个线程进行排队等待。

我们可以看一个简单的例子!

public class Counter {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private int count;

    public void read() {
        // 加读锁
        lock.readLock().lock();
        try {
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date());
            System.out.println(time + " 当前线程:" + Thread.currentThread().getName() + "获得了读锁,count:" + count);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放读锁
            lock.readLock().unlock();
        }
    }
}
public class MyThreadTest {

    public static void main(String[] args) {
        Counter counter = new Counter();
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.read();
            }
        });

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.read();
            }
        });

        threadA.start();
        threadB.start();
    }
}

看一下运行结果:

2023-10-23 16:12:28:119 当前线程:Thread-0获得了读锁,count:0
2023-10-23 16:12:28:119 当前线程:Thread-1获得了读锁,count:0

从日志时间上可以很清晰的看到,尽管加锁了,并且休眠了 5 秒,但是两个线程还是几乎同时执行
try()
方法里面的代码,证明了读和读之间是不互斥的,可以显著提高程序的运行效率。

2.2、写和写之间互斥

写和写之间互斥,当多个线程进行写的时候,只允许一个线程写入,其他线程进入排队等待。

我们可以看一个简单的例子!

public class Counter {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private int count;

    public void write() {
        // 加写锁
        lock.writeLock().lock();
        try {
            count++;
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date());
            System.out.println(time + " 当前线程:" + Thread.currentThread().getName() + "获得了写锁,count:" + count);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放写锁
            lock.writeLock().unlock();
        }
    }
}
public class MyThreadTest {

    public static void main(String[] args) {
        Counter counter = new Counter();
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.write();
            }
        });

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.write();
            }
        });

        threadA.start();
        threadB.start();
    }
}

看一下运行结果:

2023-10-23 16:29:59:103 当前线程:Thread-0获得了写锁,count:1
2023-10-23 16:30:04:108 当前线程:Thread-1获得了写锁,count:2

从日志时间上可以很清晰的看到,两个线程进行串行执行,证明了写和写之间是互斥的。

2.3、读和写之间互斥

读和写之间互斥,当多个线程交替进行读写的时候,操作上互斥,只有一个线程能进入,其他线程进入排队等待。

我们可以看一个简单的例子!

public class Counter {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private int count;

    public void read() {
        // 加读锁
        lock.readLock().lock();
        try {
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date());
            System.out.println(time + " 当前线程:" + Thread.currentThread().getName() + "获得了读锁,count:" + count);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放读锁
            lock.readLock().unlock();
        }
    }

    public void write() {
        // 加写锁
        lock.writeLock().lock();
        try {
            count++;
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date());
            System.out.println(time + " 当前线程:" + Thread.currentThread().getName() + "获得了写锁,count:" + count);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放写锁
            lock.writeLock().unlock();
        }
    }
}

public class MyThreadTest {

    public static void main(String[] args) {
        Counter counter = new Counter();
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.read();
            }
        });

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                counter.write();
            }
        });

        threadA.start();
        threadB.start();
    }
}

看一下运行结果:

2023-10-23 16:36:08:786 当前线程:Thread-0获得了读锁,count:0
2023-10-23 16:36:13:791 当前线程:Thread-1获得了写锁,count:1

从日志时间上可以很清晰的看到,两个线程进行串行执行,证明了读和写之间是互斥的。

三、小结

总结下来,
ReadWriteLock
有以下特点:

  • 允许多个线程在没有写入时同时读取,可以提高读取效率
  • 当存在写入情况时,只允许一个线程写入,其他线程进入排队等待
  • 适合读多写少的场景

对于同一个数据,有大量线程读取,但仅有少数线程修改,使用
ReadWriteLock
可以显著的提升程序并发执行效率。

例如,一个论坛的帖子,浏览可以看做读取操作,是非常频繁的,而回复可以看做写入操作,它是不频繁的,这种情况就可以使用
ReadWriteLock
来实现。

本文主要围绕
ReadWriteLock
的基本使用做了一次知识总结,如果有不正之处,请多多谅解,并欢迎批评指出。

四、参考

1、
https://www.cnblogs.com/xrq730/p/4855631.html

2、
https://www.liaoxuefeng.com/wiki/1252599548343744/1306581002092578

数据抽取平台pydatax实现过程中,有2个关键点:

1、是否能在python3中调用执行datax任务,自己测试了一下可以,代码如下:
这个str1就是配置的shell文件

try:
result
=os.popen(str1).read()exceptException as e:print(e)

2、是否能获取datax执行后的信息:用来捕获执行的情况和错误信息

上面执行后的result就包含了datax的执行信息,对信息进行筛选,就可以获得

pydatax的表设计

在上面的2个关键点解决后,其他问题就比较简单,设计相关的表:

datax_config   datax抽取表的模板配置(源表名,目标表名,模板id,抽取的字段,抽取条件(增量,全量,特殊),抽取时间,执行顺序等)
datax_config_repair   datax的出错修复表,结构和datax_config一样,用于datax出错后,修复数据用
datax_etl_error    datax的etl的报错信息(非异常字符的报错)
datax_json   datax的模板id配置(全量和增量2个模板文件名)
datax_log   datax运行抽取表的执行信息(是否执行完成,抽取行数,速度,读出行数,流量等)
datax_row_error  datax执行中,字段有异常字符的报错信息

pydatax在项目中使用

项目1: 直接配置datax的模板json,从oracle 11g抽取到postgresql中,

因postgresql中会对"0x"这些异常字符报错,如oracle中字段有这样字段,必须在抽取字段使用:

使用 replace(name,chr(0),'\'\'') as name 来代替 以前的字段 name

项目2: 客户有9个分公司,用的ERP有9套,有9个库,不同版本,抽取的同一个表字段长度有不一样,字段可能有多有少,客户ERP核心分公司ERP几个月后有大版本升级。

因项目2中:数据仓库使用的GreePlum,datax的驱动用的是gpdbwriter-v1.0.4-hashdata.jar,该驱动自动删除"0x"非法字符,就不存在该错误

不可能写9个抽取json模板,再抽取
,只能原有json模板上修改

字段长度不同:
取9个库的最大值,作为目标表字段的字段长度

字段个数不同:
取其一个核心分公司库表为基础建表,其他8个库表,如果有就保留,没有就字段数据为NULL,每次执行查询取出8个库的字段:

# 获取分公司库该表的字段,如对比核心库表字段的缺失,使用null as字段替换,如果多余则废弃,
# 字段对比以核心库为标准
def get_org_src_columns(src_columns,org_name,tab_name):
src_columns
=src_columns
# 分公司字段
org_cols
=get_org_cols(org_name,tab_name)
lst
=src_columns.split(",")
cols1
= (org_cols + ',')
src_columns1
= (src_columns + ',')for i inlst:
str1
=i.strip() + ',' # 去掉空格,对比使用,字段名+',',这样避免有重复前缀的字段名,导致误判if (cols1.find(str(','+str1)) == -1):
src_columns1
= src_columns1.replace(str(','+str1), ',NULL as' +str1)return src_columns1.rstrip(',')

# 获取分公司库的表的字段用
','合并成一个字符串
def get_org_cols(org_name,tab_name):
conn
=ora_conn()
cur
= conn.cursor()
cols
=""
sql
="select WM_CONCAT(COLUMN_NAME) cols from (SELECT COLUMN_NAME FROM all_tab_columns WHERE OWNER=upper('"+org_name+"') " \
"
and table_name =upper('"+tab_name+"') order by COLUMN_ID asc) t ";
cur.
execute(sql)
datas
=cur.fetchall()for row indatas:
cols
= str(row[0])return cols;

修改json模板支持同时抽取9个数据库,修改的
9个库同时抽取oracle数据到greeplum全量json模板
,见下载文件的:oracle_gp_table_df_job.json:

    src_table_columns=row.get("src_table_column")#其他8家分公司库
    src_table_columns_fz=get_org_src_columns(src_table_columns,"FZ",src_table_name)
src_table_columns_jcg
=get_org_src_columns(src_table_columns,"JCG",src_table_name)
src_table_columns_ks
=get_org_src_columns(src_table_columns,"KS",src_table_name)
src_table_columns_qzdf
=get_org_src_columns(src_table_columns,"QZDF",src_table_name)
src_table_columns_sdsht
=get_org_src_columns(src_table_columns,"SDSHT",src_table_name)
src_table_columns_wfjx
=get_org_src_columns(src_table_columns,"WFJX",src_table_name)
src_table_columns_wst
=get_org_src_columns(src_table_columns,"WST",src_table_name)
src_table_columns_std
=get_org_src_columns(src_table_columns,"STD",src_table_name)


str1
= "/usr/bin/python /opt/module/datax/bin/datax.py /opt/module/datax/job/json/"+etl_mode+"-p \""\"-Dsrc_table_name='"+src_table_name+"'"\"-Ddes_table_name='"+des_table_name+"'"\"-Dsplit_pk_field='"+split_pk_field+"'"\"-Drelation='"+relation+"'"\"-Dcondition='"+dcondition+"'"\"-Dsrc_table_columns='"+src_table_columns+"'"\"-Dsrc_table_columns_fz='" + src_table_columns_fz + "'"\"-Dsrc_table_columns_jcg='" + src_table_columns_jcg + "'"\"-Dsrc_table_columns_ks='" + src_table_columns_ks + "'"\"-Dsrc_table_columns_qzdf='" + src_table_columns_qzdf + "'"\"-Dsrc_table_columns_sdsht='" + src_table_columns_sdsht + "'"\"-Dsrc_table_columns_wfjx='" + src_table_columns_wfjx + "'"\"-Dsrc_table_columns_wst='" + src_table_columns_wst + "'"\"-Dsrc_table_columns_std='" + src_table_columns_std + "'"\"-Ddes_table_columns='"+des_table_columns+"' \""

这样修改后,就可以同时抽取9个库的数据,同时配置时,只需要配置核心库的相关字段等数据即可!

说明:
1,该平台没有可视化页面的后台管理系统,如果加上后台管理系统,就更完美,但目前是足够使用的!

DATAX的GreePlum驱动plugin下载:

https://files.cnblogs.com/files/zping/gpdbwriter.rar?t=1708999240&download=true

pydatax
源码下载地址:

https://files.cnblogs.com/files/zping/pydatax.rar?t=1708755764&download=true

前言

本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除!
公众号链接

目标网站

aHR0cHM6Ly93d3cueGlhb2hvbmdzaHUuY29tLw==

逆向分析

打开网站

这里 eyj这种开头的,做过正向的应该很熟悉。这明显是{ 大括号的base64编码啊。
那这里我们尝试以下base64解码

如上图 得到json值。
通过hook json.Stringify() 定位加密点。

(function() {
    var stringify = JSON.stringify;
    JSON.stringify = function(params) {
        console.log("Hook JSON.stringify ——> ", params);
        debugger;
        return stringify(params);
    }
})();
//或者下一个//
(function () {
    var stringify = JSON.stringify;
    JSON.stringify = function () {
        if (arguments[0] && arguments[0]['payload']){
            debugger;
        }
        let temp = stringify.apply(this, arguments)
        return temp
    }
})();

然后我们需要注入到浏览器中去
这里可以通过油猴脚本。或者手动注入。
这里建议手动注入在发包之前注入。
这里直接走第一个堆栈 跳过两个栈既可。


然后往上找堆栈。


l = (a && void 0 !== window._webmsxyw ? window._webmsxyw : encrypt_sign)(s, i) || {};

这里可以看到这里有个三元表达式。
调用
window._webmsxyw
传值 s和 i
s: url切割的链接
i: 传值的参数(get的undefined,post为data)

然后继续往下面的栈走

发现到了这个地方 可以发现这个文件是个jsvmp。
那我们如何去解决这个JSVMP呢?
这里呢,可以通过两种方式去解决了。

  1. 补环境
  2. 纯算

补环境 分析

这里滑到最上面

可以看到。JSVMP是个自执行函数。那是不是代表。我们把这一整段代码。放到浏览器中可以正常运行呢?
我们试试。


可以看到可以正常运行。那后面就很简单了。
把代码放到js中运行即可。
这里直接运行。

定义window。

window = global;


ok 又少createElement。

document.createElement=function createElement(x) {
        return {}
    }

这里简单的补完之后。

好了又开始报缺少apply函数了。
那我们就知道了 原来这个地方就是JSVMP所有指令集执行的地方。
那这个点我们可以暂时记下来。虽然没什么用。但是在后续做纯算的时候可以做插桩。
其实到这里包括后续的getAttribute 找起来很麻烦需要自己手动浏览器打断点。非常的麻烦。
这里建议挂个proxy框架来补。
这里不带着大家补了。
最后补出来大概一百多行。
包含canvas。regex校验等。

纯算分析

这里对于纯算的手撸和还原 可以看看志远佬的视频。真是让人收获颇丰。
这里整体说一下完整算法呢,还是比较简单的、
在手撸xs算法之前 先来了解一下什么是插桩。

何为插桩

有些时候我们想获取某个变量的值,看其如何变化!但是又不想每次都是断点之后再在控制台打印,
那么有没有什么简便的办法呢?那就是插桩!
插桩的主要应用场景就是在面对一个复杂的控制流时,可以通过插桩实现环境自吐。
通过如下手段进行浏览器插桩。

插桩查找

这里在这个地方插桩。至于为什么上文补环境的时候已经说了。

_ace_1ae3c(_ace_8712.apply(typeof _ace_25a6._ace_936 == "undefined" ? _ace_4752e : _ace_25a6._ace_936, _ace_bdcc), _ace_be07c, _ace_be07c, 0);

这个地方插桩可以透露出很多的信息。因为这个地方是调用函数的地方。
所以我们可以打印出一些内容。

console.log(_ace_8712,_ace_25a6._ace_936,_ace_bdcc);false;

这里是一个条件表达式。最后返回值是false。这是什么意思呢?
意思是 在我们断住这个断点的时候需要执行一段JS。但是这个js不会因为什么情况而断住。
好 然后我们放开断点。

可以看到打印出了很多信息。这些信息大部分都是返回值。
我们可以通过返回值得到很多有用的信息。
这里我们从明文开始加密的地方开始断点。然后在刚刚打条件表达式的时候再打上一次断点。
明文密文都有了。就可以分析加密的过程了。

插桩分析


如上图所示。打印出来大概2w多行代码。
我们通过其中的逻辑(这里分析忽略)可知。
传参得到payload的函数的传参由x1,x2,x3,x4拼接而成。
x1为md5,
x2为环境检测函数后函数的返回值可以写死。
x3为cookie中的a1
x4为时间戳。

到这里。我们来捋一下整个的流程。
目标:

x-s 解密base64=
> 得到 一段Json
=> Json中Payload是我们需要加密的值。

结果:

由x1链接。x2定值。x3 cookie中的a1 以及x4时间戳拼接出来的字符串base64加密。再进行3des加密得到Payload。

这里需要注意的是。分析全靠经验而言。

  1. 分析x1是md5。
    1732584193、271733879、1732584194、271733878
    上面的内容是需要大家熟记的内容,在解决MD5加密时能够给你带来不少的好处,节约大量的时间。
  2. 分析base64加密。这个就更简单了。常见的屁股后面==号。或者代码中带着
    ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=
    这些。上述代码其实就是通过对于数组的移位生成的编码。
  3. 分析Des。不知道秘钥和IV没关系。尝试去插桩自吐出来的值扣除相应的iv或者key也能完成加密。

结语

这里江夏在星球也写了一份文章,底部有代码 感兴趣的可以看看

https://articles.zsxq.com/id_qe1og69f8wa9.html

无论是补环境还是硬扣算法。其实都阔以得到正常结果。不同的是运行速度以及花费的精力的成本不同。
选择什么样的方法最后还是靠自己。

最后运行。两者都能得到结果

公众号链接
星球链接