2024年2月

第五章:事件驱动架构

近年来,事件驱动架构风格显著增长并广泛应用,我们对其理解方式也发生了改变。这种高采用率并不令人意外,因为事件驱动架构能够解决复杂的非确定性工作流和高度反应和响应的系统等难题。此外,新技术、工具、框架和基于云的服务使得事件驱动架构比以往更易访问和可行,并且许多团队正在转向事件驱动架构来解决他们复杂的业务问题。

拓扑结构

事件驱动架构是一种基于异步处理的架构风格,通过高度解耦的事件处理器来触发和响应系统中发生的事件。大多数事件驱动架构由以下组件组成:一个事件处理器、一个主动事件、一个处理事件和一个事件通道。这些组件及其关系如图
5-1
所示。

Figure 5-1. The main components of event-driven architecture

事件处理器(通常称为服务)是事件驱动架构中的主要部署单元。它可以以不同的粒度存在,从一个单一目的函数(例如订单验证)到一个庞大而复杂的流程(例如金融交易执行或结算)。事件处理器能够触发异步事件,并对被触发的异步事件作出响应。在大多数情况下,事件处理器同时具备这两个功能。

初始事件通常来自于主系统外部,并启动某种异步工作流程或过程。举例来说,下订单、购买苹果股票、在拍卖中对特定物品进行竞标、提出保险索赔等都属于初始事件。在大多数情况下,只有一个服务接收到初始事件,然后开始一系列处理该初始事件相关联的其他事件链条,但并非必须如此。例如,在在线拍卖中对物品进行竞标(即初始事件),可能会被
Bid Capture
服务和
Bid Tracker
服务同时捕捉到。

当某个服务的状态发生变化并且该服务向系统中其他部分广播了这个状态改变时,就会生成一个处理事件(今天通常称为派生事件)。触发事件和处理事件之间是一对多的关系
-
一个触发事件通常会产生许多不同的内部处理事件。例如,在工作流程中,下订单的触发事件可能会导致订单已下达的处理事件、应用付款的处理事件、库存更新的处理时间等等。请注意,触发事件通常以名词
-
动词格式表示,而处理时间通常以动词
-
名词格式表示。

事件通道是物理消息传递工具(如队列或主题),用于存储触发的事件并将这些触发的时间交付给响应这些时间的服务。在大多数情况下,触发时间使用点对点通道使用队列或消息传递服务,而处理时间则通常使用发布
-
订阅通道使用主题或通知服务。

示例

为了观察所有这些组件如何在一个完整的事件驱动架构中协同工作,考虑一下图
5-2
所示的例子:一个顾客希望订购
Mark Richards

Neal Ford

O'Reilly
)合著的《软件架构基础》。在这种情况下,触发事件将是

下订单

。该触发事件被订单放置服务接收,然后该服务为书籍进行订购。订单放置服务通过处理

已下订单

事件向系统中其他部分广播其所执行的操作。

请注意,在此示例中,当订单放置服务触发

已下订单

事件时,并不知道哪些其他服务(如果有)会对此事件做出响应。这说明了事件驱动架构具有高度解耦、非确定性的特点。

继续上述例子,请注意图
5-2
中有三个不同的服务对

已下订单

事件作出响应:支付服务、库存管理服务和通知服务。这些服务执行相应的业务功能,并通过其他处理事件向系统中其他部分广播它们所执行的操作。

Figure 5-2. Processing a book order using event-driven architecture

在这个例子中,需要特别注意的是,通知服务通过生成一个
NotifiedCustomer
处理事件来宣传自己所做的事情,但其他服务并不关心或响应该事件。那么为什么要触发一个没有人关心的事件呢?答案是出于架构可扩展性考虑。通过触发该事件,通知服务提供了未来可能有其他服务响应的钩子(例如通知跟踪服务),而无需对系统进行任何其他修改。因此,在事件驱动架构中,一个重要原则是始终让服务广告其状态变化(即采取了什么行动),无论其他服务是否对该事件作出响应。如果没有其他服务关心该事件,则该事件将从主题中消失(或根据使用的消息技术保存以供将来处理)。

事件驱动 vs 消息驱动

事件驱动系统和消息驱动系统之间存在差异吗?实际上确实存在微妙但重要的区别,需要进行了解和理解。事件驱动系统处理事件,而消息驱动系统处理消息。

第一个区别与您向整个系统发送的内容上下文有关。事件通知其他人状态变化或您所做的某些事情,例如

我刚刚下了订单



我刚刚提交了对某个物品的竞价

。另一方面,消息是针对特定服务发出的命令或请求,例如

将此付款应用于此订单



将该物品运送到此地址



给我客户的电子邮件地址

。请注意这里的区别
-
通过触发事件来响应事件的服务不知道哪些服务(或多少个)会作出响应,而消息通常指向已知单个服务(如支付)。

事件和消息之间另一个区别在于对于事件通道所有权问题。在事件中,发送者拥有该事件通道;而在消息中,则接收者拥有该通道。当考虑到时间或信息合同时,所有权变得更加重要。请考虑图
5-3

Order Placement
服务发送
Order Placed event
并由
Payment service
回复
的示例
。在这种情况下
,
发送者
(Order Placement)
拥有
event channel

contract.
换句话说
, contract
变更将由
Order Placement
服务启动,并且
Payment
服务以及所有其他响应该
event

service
都必须遵守并适应这些变化。

Figure 5-3. With events, the sender owns the event channel and
contract

然而,在消息驱动的系统中,情况正好相反
-
接收者拥有消息通道。如图
5-4
所示,订单放置服务以命令的形式告知支付服务应用付款。在这种情况下,支付服务不仅具备消息通道(队列),还具备消息合同。请注意,在基于消息的处理中,订单放置服务需要遵守由支付服务发起的合同更改。

Figure 5-4. With messages, the receiver owns the message channel and
contract

事件通道工件的类型也是区分事件驱动系统和消息驱动系统的一个关键因素。一般情况下,当触发事件时,事件驱动系统采用基于主题或通知服务的发布
-
订阅式消息传递方式,而消息驱动系统则常使用基于队列或消息服务的点对点消息传递方式。然而,并不意味着事件驱动系统不能使用点对点消息传递
——
在某些情况下,为了从另一个服务中检索特定信息或控制系统中事件顺序或时间,可能需要采用点对点消息传递。

斟酌与分析

由于事件驱动架构的异步和解耦特性,它在容错性、可扩展性和高性能方面表现卓越。在添加额外功能时,它还提供了出色的可扩展性。然而,尽管这些特点非常吸引人,尤其是对于当今复杂系统而言,在不采用事件驱动架构时也存在许多理由。以下两个部分概述了考虑使用事件驱动架构的原因,并更重要地指出了何时需要谨慎使用。

什么时候考虑这种风格

简而言之,事件驱动架构是在需要高性能、高可扩展性和高容错性的系统选择中具备重要地位的一种架构。然而,除了这些超级功能外,还有其他原因可以考虑采用这种架构风格。

如果您的业务处理方式是对系统内外发生的事件做出反应(而不仅仅是响应用户请求),那么这就是一个值得考虑的优秀架构风格。请听取您业务利益相关者的意见
-
他们是否使用

事件



触发器



对某事做出反应

的词汇?如果确实如此,那么很可能您面临的业务问题与该架构风格相匹配。此外,请自问
-
我是在响应用户请求还是对用户所作操作作出反应?这些都是用来确定业务问题是否与该架构风格相匹配的重要问题。

当您面临难以建模复杂、非确定性工作流程时,事件驱动架构也将成为一个不错的选择。几十年来,开发人员一直试图通过建立复杂决策树来概述复杂工作流程中每个可能结果,并屡次失败于此繁琐任务。像这样的系统有时被归类为
CEP
(复杂事件处理),并在事件驱动体系结构中进行本地管理。

什么时候不要考虑这种风格

如果大部分的处理都是基于请求的,那么你不应该考虑这种架构风格。基于请求的处理是指用户从数据库中获取数据(例如客户资料)或对系统中的实体进行基本
CRUD
操作(创建、读取、更新、删除)。此外,如果大部分的处理需要同步执行,即用户必须等待特定请求完成后才能继续,那么事件驱动架构很可能不适合你。

由于事件驱动架构中的所有处理最终都是一致的,因此对于需要高水平数据一致性的业务问题来说,这并非一个理想的架构风格。在事件驱动架构中,几乎没有或者很少保证处理何时发生,因此如果您期望某个特定时间点存在某些数据,请寻找其他能够确保数据一致性的架构风格,例如基于服务的架构。

另一个放弃事件驱动架构并考虑其他架构风格的原因是在需要对事件的工作流程和时间控制时,管理异步事件处理变得非常困难。例如,想象一下协调以下场景的复杂情况:在触发事件
C
之前,必须完成事件
A
和事件
B
的处理,并且在等待
Event C
完成之后,
Event D

Event E
必须等待
Event C
开始处理之前启动。要成功应对这种混乱局面最好使用编排式服务导向架构或编排式微服务进行复杂协调。

错误处理也是使团队远离事件驱动架构的复杂性的另一个原因。由于通常没有中央工作流程编排器或控制器,在出现错误时,该服务尝试修复错误。此外,在该事件的工作流程中可能已经发生了其他操作,因为所有操作都是异步进行的。例如,假设订单放置服务为客户订购书籍触发了
OrderPlaced

。通知服务、支付服务和库存服务同时响应该

。然而,假设通知和支付服务都响应并完成其处理
,但当接收到该

时库存服
务抛出错误
,因为没有更多可用
的书籍
。现在如何解决?客户已经被通知并且他们
的信用卡已经被扣款
,但没有更多可供发送给
客户
的书籍了
。是否应撤销付款?是否应向客
户发送另一条通知?还是只需等待有更多库存?
哪个服

执行所有这些
错误
处理逻辑呢?
错误处
理确实
是事情
驱动








面之一。

架构特征


5-5
中的图表综合评估了事件驱动架构在体系结构特征方面的整体能力(架构特性)。星级评定表示一颗星代表该架构特性支持较弱,而五颗星则表示它非常适用于该特定的架构特性。

Figure 5-5. Architecture characteristics star ratings for event-driven
architecture

我们在做web应用的时候通常会遇到前端提交按钮重复点击的场景,在某些新增操作上就需要做幂等性限制来保证数据的可靠性。下面来用java aop实现幂等性校验。

一:首先我们需要一个自定义注解

packagecom.yuku.yuku_erp.annotation;import java.lang.annotation.*;/***@author名一
* @ClassName IdempotentAnnotation
* @description: 用来标记需要校验幂等性的接口
* @datetime 2024年 02月 03日 14:48
*
@version: 1.0*/@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interfaceIdempotentAnnotation {
String idempotentType();
}

二:创建一个幂等校验的切面类

packagecom.yuku.yuku_erp.aop;importcom.yuku.yuku_erp.annotation.IdempotentAnnotation;importcom.yuku.yuku_erp.constant.RedisKeyConstant;importcom.yuku.yuku_erp.exception.MyException;importcom.yuku.yuku_erp.utils.RedisShardedPoolUtil;importcom.yuku.yuku_erp.utils.TokenUtil;importlombok.extern.slf4j.Slf4j;importorg.aspectj.lang.JoinPoint;importorg.aspectj.lang.annotation.Aspect;importorg.aspectj.lang.annotation.Before;importorg.aspectj.lang.annotation.Pointcut;importorg.aspectj.lang.reflect.MethodSignature;importorg.springframework.stereotype.Component;importjava.lang.reflect.Method;/***@author名一
* @ClassName CheckIdempotentAop
* @description: 幂等性校验
* @datetime 2024年 02月 03日 14:59
*
@version: 1.0*/@Slf4j
@Aspect
@Component
public classCheckIdempotentAop {

@Pointcut(
"execution(* com.yuku.yuku_erp.controller..*.*(..))")public voidcheckCut(){
}

@Before(
"checkCut()")public voidcheckIdempotent(JoinPoint joinPoint){
MethodSignature signature
=(MethodSignature) joinPoint.getSignature();
Method method
=signature.getMethod();if (method.isAnnotationPresent(IdempotentAnnotation.class)){
IdempotentAnnotation annotation
= method.getAnnotation(IdempotentAnnotation.class);
String idempotentType
=annotation.idempotentType();
String idempotentToken
= TokenUtil.getRequest().getHeader("idempotentToken");
String idemToken
= idempotentType +idempotentToken;
log.info(
"checkIdempotent idempotentType:{}, idempotentToken:{}", idempotentType, idempotentToken);

Boolean flag
=RedisShardedPoolUtil.sismember(RedisKeyConstant.IDEMPOTENT_TOKEN_LIST, idemToken);if (!flag){
log.error(
"checkIdempotent error idempotentType:{}, idempotentToken:{}, flag:{}", idempotentType, idempotentToken, flag);throw new MyException("该接口已提交过,请不要重复提交");
}
RedisShardedPoolUtil.delSetByValue(RedisKeyConstant.IDEMPOTENT_TOKEN_LIST, idemToken);
log.info(
"checkIdempotent idempotentType:{}, idempotentToken:{}, flag:{}", idempotentType, idempotentToken, flag);
}
}
}

三:在需要切面的接口上使用幂等校验注解

@IdempotentAnnotation(idempotentType = "checkIdempotentToken")
@GetMapping(
"/checkIdempotentToken")
@ApiOperation(value
= "校验幂等性示例")public CommonResult<String>checkIdempotentToken(){returnCommonResult.success();
}

到此幂等校验就完成了

前言

近日心血来潮想做一个开源项目,目标是做一款可以适配多端、功能完备的模板工程,包含后台管理系统和前台系统,开发者基于此项目进行裁剪和扩展来完成自己的功能开发。本项目为前后端分离开发,后端基于
Java21

SpringBoot3
开发,后端使用
Spring Security

JWT

Spring Data JPA
等技术栈,前端提供了
vue

angular

react

uniapp

微信小程序
等多种脚手架工程。

项目地址:
https://gitee.com/breezefaith/fast-alden

在使用
Spring Security
时,笔者定义了一个
ThreadPoolTaskExecutor
Bean用于创建子线程,但是却遇到了子线程中无法获取到认证信息的问题,本文主要介绍该问题的解决方案。

原因分析


Spring Security
中想要获取登录用户信息,只能在当前线程中获取,不能在子线程中获取,其中一个重要的原因就是
SecurityContextHolder
默认将用户信息保存在
ThreadLocal
中。

ThreadLocal
叫做
本地线程变量
,意思是说,
ThreadLocal
中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,
ThreadLocal
为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。

解决方案

方案1:手动设置线程中的认证信息

在子线程的业务逻辑代码之前先手动设置认证信息,后续就可以通过
SecurityContextHolder
直接获取。

// 获取当前线程认证信息
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

// 创建新线程
Runnable runnable = new Runnable() {
	public void run() {
		// 手动设置线程中的认证信息
		SecurityContextHolder.getContext().setAuthentication(authentication);
		
		// 线程处理逻辑(后续就能获取到认证信息)
		// ...
	}
};
new Thread(runnable).start();

方案2:使用
DelegatingSecurityContextRunnable
创建线程

Spring Security
考虑到了新线程需要访问认证信息的情况,提供了
DelegatingSecurityContextRunnable
类,通过该类构建新线程(返回一个
Runnable
对象),线程内部自然能获取认证信息。有兴趣的读者可以阅读一下
DelegatingSecurityContextRunnable
的源码,其思路与方法1是一致的,都是先获取到当前线程的认证信息,然后传递给新线程。

// 使用DelegatingSecurityContextRunnable创建线程
Runnable runnable = new DelegatingSecurityContextRunnable(() -> {
  // 线程处理逻辑
  // ...
});
new Thread(runnable).start();

方案3:修改
Spring Security
安全策略

默认情况下,
Spring Security
使用
ThreadLocal
存储认证信息,但实际上它也支持通过设置安全策略来修改认证信息的存储位置,它支持三种安全策略,有
MODE_THREADLOCAL

MODE_INHERITABLETHREADLOCAL

MODE_GLOBAL
。如果没有指定,则会默认使用
MODE_THREADLOCAL
策略。

  • MODE_THREALOCAL
    表示用户信息只能由当前线程访问。
  • MODE_INHERITABLETHREADLOCAL
    表示用户信息可以由当前线程及其子线程访问.。
  • MODE_GLOBAL
    表示用户信息没有线程限制,全局都可以访问,一般用于gui的开发中。

因此,将安全策略修改为
MODE_INHERITABLETHREADLOCAL
就可以在子线程中获取到认证信息。

Spring Security
还提供了两种修改安全策略的方式,一种是通过设置JVM参数
spring.security.strategy
,一种是调用
SecurityContextHolder

setStrategyName
方法。

通过设置JVM参数修改安全策略

-Dspring.security.strategy=MODE_INHERITABLETHREADLOCAL

image

通过
SecurityContextHolder
修改安全策略

可以借助
@PostConstruct
注解在程序启动后修改安全策略。

@Configuration
public class CommonSecurityConfig {
    @PostConstruct
    public void setStrategyName() {
        // 程序启动后修改认证信息上下文存储策略,支持子线程中获取认证信息
        SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
    }
}

@PostConstruct
注解是由Java提供的,它用来修饰一个非静态的void方法。它会在服务器加载Servlet的时候运行,并且只运行一次。
image

总结

本文主要介绍
SpringBoot3
使用
Spring Security
时如何在子线程中获取到认证信息。如有错误,还望批评指正。

在后续实践中我也是及时更新自己的学习心得和经验总结,希望与诸位看官一起进步。

写在开头

作为ArrayList的同门师兄弟,LinkedList的师门地位逊色不少,除了在做算法题的时候我们会用到它之外,在实际的开发工作中我们极少使用它,就连它的创造者都说:“I wrote it,and I never use it”,想想颇有点好笑,但这并不影响我们去学习它,个人认为它底层的链表逻辑对于我们代码思想的培养还是挺有帮助的。

源码解析

看过build哥文章的同学应该都知道,俺喜欢通过源码去学习和分析对象或代码逻辑,因此,话不多说,直接上源码!

public class LinkedList<E>
    extends AbstractSequentialList<E>
    implements List<E>, Deque<E>, Cloneable, java.io.Serializable
{
  //...
}

如上为JDK8中LinkedList的继承实现关系,通过这些关系我们可以大致分析出它所具备的特性:

  1. 实现List接口 表明它是一个列表,支持添加、删除、查找等操作,并且可以通过下标进行访问;
  2. Deque继承自 Queue 接口,具有双端队列的特性,支持从两端插入和删除元素,方便实现栈和队列等数据结构;
  3. Cloneable :表明它具有拷贝能力,可以进行深拷贝或浅拷贝操作;
  4. Serializable : 表明它可以进行序列化操作,也就是可以将对象转换为字节流进行持久化存储或网络传输,非常方便。

LinkedList提供了非常多的方法供我们使用,继续阅读源码可以看到

// 在链表尾部插入元素
public boolean add(E e) {
    linkLast(e);
    return true;
}

// 在链表指定位置插入元素
public void add(int index, E element) {
    // 下标越界检查
    checkPositionIndex(index);

    // 判断 index 是不是链表尾部位置
    if (index == size)
        // 如果是就直接调用 linkLast 方法将元素节点插入链表尾部即可
        linkLast(element);
    else
        // 如果不是则调用 linkBefore 方法将其插入指定元素之前
        linkBefore(element, node(index));
}

// 将元素节点插入到链表尾部
void linkLast(E e) {
    // 将最后一个元素赋值(引用传递)给节点 l
    final Node<E> l = last;
    // 创建节点,并指定节点前驱为链表尾节点 last,后继引用为空
    final Node<E> newNode = new Node<>(l, e, null);
    // 将 last 引用指向新节点
    last = newNode;
    // 判断尾节点是否为空
    // 如果 l 是null 意味着这是第一次添加元素
    if (l == null)
        // 如果是第一次添加,将first赋值为新节点,此时链表只有一个元素
        first = newNode;
    else
        // 如果不是第一次添加,将新节点赋值给l(添加前的最后一个元素)的next
        l.next = newNode;
    size++;
    modCount++;
}

// 在指定元素之前插入元素
void linkBefore(E e, Node<E> succ) {
    // assert succ != null;断言 succ不为 null
    // 定义一个节点元素保存 succ 的 prev 引用,也就是它的前一节点信息
    final Node<E> pred = succ.prev;
    // 初始化节点,并指明前驱和后继节点
    final Node<E> newNode = new Node<>(pred, e, succ);
    // 将 succ 节点前驱引用 prev 指向新节点
    succ.prev = newNode;
    // 判断尾节点是否为空,为空表示当前链表还没有节点
    if (pred == null)
        first = newNode;
    else
        // succ 节点前驱的后继引用指向新节点
        pred.next = newNode;
    size++;
    modCount++;
}
// 获取链表的第一个元素
public E getFirst() {
    final Node<E> f = first;
    if (f == null)
        throw new NoSuchElementException();
    return f.item;
}

// 获取链表的最后一个元素
public E getLast() {
    final Node<E> l = last;
    if (l == null)
        throw new NoSuchElementException();
    return l.item;
}

// 获取链表指定位置的元素
public E get(int index) {
  // 下标越界检查,如果越界就抛异常
  checkElementIndex(index);
  // 返回链表中对应下标的元素
  return node(index).item;
}


更多的API方法可以参考:
LinkedList全量方法

使用LinkedList

在Java中我们写一个小测试代码来用一下LinkedList的增删改查

【代码示例1】

  // 创建LinkedList集合
  LinkedList link = new LinkedList();
  // 1、添加元素
  link.add("happy");
  link.add("new");
  link.offer("year"); // 向集合尾部追加元素
  link.push("javabuild"); // 向集合头部添加元素
  System.out.println(link); // 输出集合中的元素
  // 2、获取元素
  Object object = link.peek(); //获取集合第一个元素
  System.out.println(object); // 输出集合中的元素
  // 3、删除元素
  link.removeFirst(); // 删除集合第一个元素
  link.pollLast(); // 删除集合最后一个元素
  System.out.println(link);

输出:

[javabuild, happy, new, year]
javabuild
[happy, new]

对比ArrayList

  1. ArrayList 和 LinkedList 都是不同步的,也就是不保证线程安全;
  2. ArrayList 底层使用的是 Object 数组;LinkedList 底层使用的是双向链表数据结构;
  3. LinkedList 不支持高效的随机元素访问,而 ArrayList(实现了 RandomAccess 接口) 支持。
  4. ArrayList存在扩容问题,LinkedList不存在,直接放在集合尾部,修改指针即可;

提问:为什么LinkedList不支持高效的随机访问,或者说为什么不去实现RandomAccess 接口?

我们看过RandomAccess 接口的底层的同学知道,这个接口也是个标识性接口,只要实现了这个接口就意味着支持通过索引访问元素。由于 LinkedList 底层数据结构是链表,内存地址不连续,只能通过指针来定位,不支持随机快速访问,所以不能实现 RandomAccess 接口。
但是!
在LinkedList中依旧提供了get(int index):获取链表指定位置的元素。

// 获取链表指定位置的元素
public E get(int index) {
  // 下标越界检查,如果越界就抛异常
  checkElementIndex(index);
  // 返回链表中对应下标的元素
  return node(index).item;
}

源码中get方法实现通过位置获取元素的核心是node(index)方法,我们跟进去继续看一下!

// 返回指定下标的非空节点
Node<E> node(int index) {
    // 断言下标未越界
    // assert isElementIndex(index);
    // 如果index小于size的二分之一  从前开始查找(向后查找)  反之向前查找
    if (index < (size >> 1)) {
        Node<E> x = first;
        // 遍历,循环向后查找,直至 i == index
        for (int i = 0; i < index; i++)
            x = x.next;
        return x;
    } else {
        Node<E> x = last;
        for (int i = size - 1; i > index; i--)
            x = x.prev;
        return x;
    }
}

该方法中通过传入的index参数和size的1/2进行比较,小于则从链表头向后查找,否则从链表尾向前遍历查找,这与ArrayList中的get(index)方法还是有本质上的区别!

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得
留言+点赞+收藏
呀。原创不易,转载请联系Build哥!

开心一刻

今天坐在太阳下刷着手机

老妈走过来问我:这么好的天气,怎么没出去玩

我:我要是有钱,你都看不见我的影子

老妈:你就不知道带个碗,别要边玩?

我:......

优先级队列

说到队列,相信大家一定不陌生,是一种很基础的数据结构,它有一个很重要的特点:
先进先出

但说到优先级队列,可能有些小伙伴还不清楚,因为接触的不多嘛

示例基于:
RabbitMQ 3.9.11

业务场景

我手头上正好有一个项目,系统之间通过
RabbitMQ
通信,
调度系统
是消息生产者,
文件生成系统
是消息消费者

默认情况下,先下发的消息会先被消费,也就是先进队列的消息会先出队列

业务高峰期,重要程度不同的文件都需要生成,那如何保证重要文件先生成了?

1、调整调度

1.1 将重要文件的调度提前,保证重要文件的消息先进入队列;但需要考虑调度能否提前,如果生成文件依赖的上游数据还未就绪了?

1.2 将普通文件的调度延后,有点围魏救赵的感觉,万一某一天不需要生成重要文件,那服务器岂不是有一段时间的空置期,而这段空置期本可以生成普通文件

总的来说就是不够灵活:有重要文件的时候先生成重要文件,没有重要文件的时候生成普通文件

2、提高服务器配置

这个就不用过多解释了把,加大
文件生成系统
的硬件配置,提高其文件生成能力

保证文件(不论重要还是普通)都能在调度的时间开始生成,也就无需区分重要与普通了

那么重要文件先生成这个命题就不成立了

想想都美,可实际情况,大家都懂的!

3、优先级队列

RabbitMQ

Priority Queue
非常契合这个业务场景,详情请往下看

队列优先级

相较于普通队列,优先级队列肯定有一个标志来标明它是一个优先级队列

这个标志就是参数:
x-max-priority
,定义优先级的最大值

我们先来看下
RabbitMQ
控制台如何配置

相关参数配置好之后,点击
Add queue
即创建出了一个
优先级队列

创建完成之后,你会发现队列上有一个
Pri
标志,说明这是一个优先级队列

实际开发工程中,一般不会在
RabbitMQ
控制台创建队列,往往是服务启动的时候,通过服务自动创建
exchange

queue

实现也非常简单


@Configurationpublic classRabbitConfig {

@Bean
publicDirectExchange directExchange() {return new DirectExchange(QSL_EXCHANGE, true, false);
}

@Bean
publicQueue queue() {
Map
<String, Object> args = new HashMap<>();
args.put(
"x-max-priority", 5);return new Queue(QSL_QUEUE, true, false, false, args);
}

@Bean
publicBinding bindingQueue() {return BindingBuilder.bind(queue()).to(directExchange()).with("com.qsl");
}
}

View Code

服务启动成功后,我们可以在
RabbitMQ
控制台看到队列:
com.qsl.queue
,其
x-max-priority
等于 5

消息优先级

消息属性
priority
可以指定消息的优先级

停止服务后,我们手动往队列
com.qsl.queue
中放一些带有优先级的消息

优先级分别是:
3,1,5,5,10,4
对应的消息体分别是:
3,1,5_1,5_2,10,4

此时队列中共有 6 个消息准备就绪

启动服务,进行消息消费,消费顺序如下

可以总结出一个规律:
优先级高的先出队列,优先级相同的,先进先出

那优先级是
10
的那个消息是什么情况,它为什么不是第一个出队?

因为队列
com.qsl.queue
的最大优先级是 5,即使消息的优先级设置成 10,其实际优先级也只有 5,这样是不是就理解了?

实际开发工程中,基本不会在
RabbitMQ
控制台手动发消息,肯定是由服务发送消息

我们模拟下带有优先级的消息发送

是不是
so easy !

x-max-priority

值支持范围是
1 ~ 255
,推荐使用
1 ~ 5
之间的值,如果需要更高的优先级则推荐
1 ~ 10

1 ~ 10
已经足够使用,不推荐使用更高的优先级,更高的优先级值需要更多的
CPU

内存资源

没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息

数据结构

底层数据结构:堆

具体请看:
数据结构之堆 → 不要局限于堆排序

ACK超时

之前一直不知道这一点,直到有一次碰到了如下异常

一查才知道ACK超时了

超时异常

从消费者获取到消息(消息投递成功)开始,在超时时间(默认30分钟)内未确认回复,则关闭通道,并抛出
PRECONDITION_FAILED
通道异常

并且消息会重新进入队列,等待再次被消费

ACK超时的配置项:
consumer_timeout
,默认值是
1800000
,单位是毫秒,也就是 30 分钟

可用命令
rabbitmqctl eval 'application:get_env(rabbit,consumer_timeout).'
查看

判断是否ACK超时的调度间隔是一分钟,所以
consumer_timeout
不支持低于一分钟的值,也不建议低于五分钟的值

我们将
consumer_timeout
调整成 2 分钟,看看超时异常

有 2 种调整方式

1、修改
/etc/rabbitmq.conf

配置文件没有则新建,然后在配置文件中将
consumer_timeout
设置成
120000
(没有该配置项则新增)

然后重启
rabbitmq

2、动态修改

执行命令
rabbitmqctl eval 'application:set_env(rabbit,consumer_timeout,120000).'
即可

不需要重启
rabbitmq

需要注意的是,这种修改不是永久生效,一旦
rabbitmq
重启,
consumer_timeout
将会恢复到默认值

我们用第 2 种方式进行调整

然后我们在消费端睡眠 3 分钟后进行ACK

最后在
rabbitmq
控制台手动发送一个消息,异常信息如下


2024-02-15 13:08:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6
2024-02-15 13:10:47|AMQP Connection 192.168.3.225:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 120000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|ERROR|33|消息确认异常:
java.lang.IllegalStateException: Channel closed; cannot ack
/nack
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:
1175)
at com.sun.proxy.$Proxy50.basicAck(Unknown Source)
at com.qsl.rabbit.listener.TestListener.onMessage(TestListener.java:
31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:
43)
at java.lang.reflect.Method.invoke(Method.java:
498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:
171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:
120)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:
53)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:
220)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:
148)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:
133)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:
1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:
1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:
1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:
1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:
1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:
975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:
921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$
1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:
1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:
1202)
at java.lang.Thread.run(Thread.java:
745)2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|1436|Restarting Consumer@2e6f610d: tags=[[amq.ctag-hE7fVqLNKO44ytMHalsf2A]], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.3.225:5672/,1), conn: Proxy@3b1ed14b Shared Rabbit Connection: SimpleConnection@13275d8 [delegate=amqp://admin@192.168.3.225:5672/, localPort= 55710], acknowledgeMode=MANUAL local queue size=0 2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6

View Code


RabbitMQ 3.12
开始,可以为每个队列配置过期时长,而之前只能为每个
Rabbit
节点配置过期时长

如何处理

如果碰到ACK超时,那么我们该如何处理

1、增加超时时长

这往往是最容易想到的,默认 30 分钟不行就改成 60 分钟嘛

但并不是无脑增加超时时长,默认值往往是综合情况下比较优的一个值,并不推荐加长

2、异步处理

用线程池处理异步处理消息,及时进行消息ACK

但需要考虑拒绝策略,如果用的是:
CallerRunsPolicy
,还是有可能触发ACK超时

3、幂等处理

消息消费做幂等处理,是规范,而不仅仅只是针对ACK超时

消息正在消费中,或者已经消费完成,这个消息就不应该再次被消费,可以打印日志然后直接ACK,而无需进行业务处理

4、自动ACK

虽然自动ACK可以简化消息确认的流程,但它也可能带来一些潜在的问题,例如:

消息丢失风险:自动ACK意味着一旦消费者接收到消息,
RabbitMQ
就会将其从队列中移除。如果消费者在处理消息时发生故障或崩溃,未处理的消息可能会丢失

限流作用减弱:ACK机制可以帮助限流,即通过控制ACK的发送速率来限制消费者处理消息的速度。如果使用自动ACK,这种限流作用会减弱,可能导致消费者过快地消费消息,超出其实际处理能力

缺乏灵活性
:自动ACK不允许消费者在处理完消息后再决定是否要确认消息,这限制了消费者的灵活性。例如,消费者可能需要根据消息内容或处理结果来决定是否重新入队或丢弃消息

等等

总之,自动ACK慎用

具体如何处理,需要结合具体业务,选择比较合适的方式

总结

优先级队列

通过配置
x-max-priority
参数标明队列是优先级队列

队列的优先级取值范围推荐
1 ~ 5
,不推荐超过 10

通过属性
priority
可以指定消息的优先级,没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息

优先级高的消息先出队列(先被处理),优先级低的消息后出队列(后被处理),优先级相同的则是先进先出

ACK超时

ACK超时是一种保护机制,其实可以类比
HTTP
请求超时、数据库连接查询超时

RabbitMQ
的ACK超时默认是 30 分钟,可以修改配置项
consumer_timeout
进行调整

至于如何避免ACK超时,需要结合具体的业务选择合适的方式

示例代码

spring-boot-rabbitmq

参考

Classic Queues Support Priorities

acknowledgement-timeout