2024年1月

做为公司产品经理以项目经理的角色跟了一个不大不小的项目,现在项目结束了期间也踩了不少坑觉得有必要总结一下经验,也希望能给第一次跟项目的小伙伴带来一些帮助

项目调研

主要了解项目情况,其实大部分项目都是为了解决甲方的某个或某一类问题。所以调研阶段一定要弄清楚甲方主要的问题或痛点在哪里,导致问题的主要原因是什么,迫切需要解决问题的角色或部门是哪些。这些都是为后续输出有效建设方案做铺垫,前期调研有没有足够充分可能决定输出的方案是否正是甲方想要的。

这里有个角色特别关键就是甲方项目对接人,这个角色从项目开始到结束都发挥至关重要的作用。除了一些官方的对接,前期可以通过这个角色深挖项目预算,后续过程中也可以通过这个角色大事化小小事化了解决很多问题。关系处理好了可以省掉很多麻烦事,即使没能大事化小项目过程中也可以避免特地给你小事化大。

项目建设方案

项目方案决定是否能顺利拿下项目合同。有些项目不会透露真实的项目预算,或者官方透露出来的也是虚拟的。但基本离不开一个原则,谁都愿意花更少的钱解决更大的问题。对预算不明确的情况可以多输出几个方案。我们当时的项目就有同行竞争,人家出了一个各种智能高科技的方案,我们从甲方的角度考虑一次给出了四个方案,最终甲方从我们这选了一个建设成本最低实施最简单的方案,甚至甲方自己在十年前就有人提出过的方案(当然这里面因为科技发展十年前可行性不高到现在可行性变高了)。说到底最终还是为了省钱。

上面也提到调研时需要弄清楚迫切需要解决问题的角色或部门是哪些,那么方案可以相对针对性的设计。比如要解决财务问题,财务就比较关键甚至在解决某一财务问题时可能会增加一些基层人员的工作或者牺牲一些体验,在问题的迫切性面前有时候也是可以接受的。不过方案中至于是否可以牺牲某一部分去解决迫切的问题,最好让甲方决定。比如: 方案一优缺点是什么,方案二优缺点是什么,推荐方案是什么,让甲方自己选择。

项目合同

合同也就是我跟这个项目踩坑最多的地方。合同中应尽量避免给自己后续带来各种麻烦,如果涉及风险的将风险转交出去(如:转给你的乙方)。合同中涉及一些不清晰的条例如果后期可能对己方不利的一定要先明确清楚,当然有时候合同也可以有不清晰反而对己方有利的。所以合同敲定前一定要仔细斟酌。

有些项目会涉及到招标投标,投标书也是具备一定法律约束的,有些人在投标时为了争取到项目,会在标书中各种吹有些东西根本没有也会吹的天花乱坠。到后期验收甲方纠着标书不放就有的受了。标书可以吹大但千万别吹没有的做不到的。

项目验收

有些正规单位对项目验收会比较严格,甚至会有第三方监理单位介入。所以从项目开始就一路将该准备的资料都一并准备好,否则到后期验收、尾款就可以拖到怀疑人生。

一般在合同里会明确项目验收交接需要提供哪些资料。如果有第三方监理单位介入,最好在前期就跟监理确认清楚需要准备哪些资料,资料有哪些规范。对于监理而言,资料提前准备还是后期补没多大区别,他们的目的是保证项目该做到的都做到了符合要求,该准备的资料都准备齐全了。但是如果后期项目都到验收阶段了再去补前面的资料,就会变得很被动,比如实施方案、实施报告(还要甲方相关人员签字的那种),有的甚至有初验、终验等等不同时间点有不同甲方人员确认签字的,如果从项目开始到实施落地这些顺带就准备好了,后期验收就不用了专门花时间去整资料。这里耽误的就是收尾款的时间。我们项目按要求整理资料来来回回就花了差不多两个月时间。

本篇是mygin的第七篇,参照gin框架,感兴趣的可以从
Mygin第一篇
开始看,Mygin从零开始完全手写,在实现的同时,带你一窥gin框架的核心原理实现。

目的

  • 中间件Middleware优化
  • 默认log日志中间件
    在上篇
    Mygin实现中间件Middleware
    中间件Middleware很生硬,完全依赖循环,如果某个中间件想要cover住全部中间件,比如我想记录,整个请求的耗时时间,以便针对优化的功能。因此需要把之前生硬的方式做一些修改。

修改

1.实例化上下文修改
//实例化一个下上文
	c := &Context{
		Request:  r,
		Writer:   w,
		Params:   params,
		handlers: handlers,
		index:    -1, //默认下标为-1
	}
2.修改上下文Next方法
// Next 执行链中的剩余处理程序。
func (c *Context) Next() {
	c.index++
	//遍历handlers
	for c.index < int8(len(c.handlers)) {
		//真正调用执行handler方法
		c.handlers[c.index](c)
		c.index++
	}
}

日志

参照(复制)gin中的写法,新建mygin/logger.go日志中间件文件。

  • mygin/logger.go
package mygin

import (
	"fmt"
	"net/http"
	"time"
)

const (
	green   = "\033[97;42m" // 绿色
	white   = "\033[90;47m" // 白色
	yellow  = "\033[90;43m" // 黄色
	red     = "\033[97;41m" // 红色
	blue    = "\033[97;44m" // 蓝色
	magenta = "\033[97;45m" // 洋红色
	cyan    = "\033[97;46m" // 青色
	reset   = "\033[0m"     // 重置颜色
)

type LogFormatterParams struct {
}

// MethodColor 方法颜色获取
func (l *LogFormatterParams) MethodColor(method string) string {
	switch method {
	case http.MethodGet:
		return blue
	case http.MethodPost:
		return cyan
	case http.MethodPut:
		return yellow
	case http.MethodDelete:
		return red
	case http.MethodPatch:
		return green
	case http.MethodHead:
		return magenta
	case http.MethodOptions:
		return white
	default:
		return reset
	}
}

// StatusCodeColor 状态颜色获取
func (l *LogFormatterParams) StatusCodeColor(code int) string {
	switch {
	case code >= http.StatusOK && code < http.StatusMultipleChoices:
		return green
	case code >= http.StatusMultipleChoices && code < http.StatusBadRequest:
		return white
	case code >= http.StatusBadRequest && code < http.StatusInternalServerError:
		return yellow
	default:
		return red
	}
}

// LoggerFunc 记录日志的方法
func (l *LogFormatterParams) LoggerFunc() HandlerFunc {
	return func(context *Context) {
		// 启动时间
		start := time.Now()

		// 后续处理请求
		context.Next()

        //后续处理请求 结束时间
		now := time.Now()

		str := fmt.Sprintf("[MyGIN] %v |%s %3d %s| %13v  |%s %-7s %s %#v\n",
			now.Format("2006/01/02 - 15:04:05"),
			l.StatusCodeColor(context.status), context.status, reset,
			now.Sub(start), //耗时
			l.MethodColor(context.Request.Method), context.Request.Method, reset,
			context.Request.URL.Path,
		)
		fmt.Println(str)
	}
}

日志中间件

测试

测试代码

package mygin

import (
	"net/http"
	"path"
	"testing"
)

func TestMyGin06(t *testing.T) {
	r := Default()
	r.Use()
	//测试需要登录
	group := r.Group("/api", func(context *Context) {
		//todo....
		context.String(http.StatusOK, "api Group 中间件失败了....\n")
		context.Abort()
	})

	group.Use()

	//这个回调不会执行
	group.GET("/hello/:name", func(context *Context) {
		name := context.Params.ByName("name")
		context.String(http.StatusOK, path.Join("hello ", name, "!"))
	})

	//测试没有发生Abort
	group2 := r.Group("/api2", func(context *Context) {
		//todo....
		context.String(http.StatusOK, "api Group 中间件成功了....\n")
	})

	//这个回调会执行
	group2.GET("/hello2/:name", func(context *Context) {
		name := context.Params.ByName("name")
		context.String(http.StatusOK, path.Join("hello2 ", name, "!\n"))
	})

	// 启动服务器并监听端口
	r.Run(":8088")
}
启动测试
go test
curl请求测试
curl -i http://localhost:8088/api2/hello2/scott
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Date: Tue, 30 Jan 2024 06:56:03 GMT
Content-Length: 49

api Group 中间件成功了....
hello2 /scott/!
➜  ~ curl -i http://localhost:8088/api/hello/scott
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Date: Tue, 30 Jan 2024 06:56:26 GMT
Content-Length: 33

api Group 中间件失败了....
查看控制台输出

本文给大家介绍一下在 Spring Boot 项目中如何集成消息队列 RabbitMQ,包含对 RibbitMQ 的架构介绍、应用场景、坑点解析以及代码实战。最后文末有免费领取龙年红包封面以及腾讯云社区答题领奖福利,欢迎大家领取。

我将使用 waynboot-mall 项目作为代码讲解,项目地址:
https://github.com/wayn666666/waynboot-mall。本文大纲如下,

image

RabbitMQ 架构介绍

image

RibbitMQ 是一个基于 AMQP 协议的开源消息队列系统,具有高性能、高可用、高扩展等特点。通常作为在系统间传递消息的中间件,它可以实现异步处理、应用解耦、流量削峰等功能。

image

RibbitMQ 的主要组件介绍如下,

  • producter:生产者,创建消息,然后将消息发布(发送)到 RabbitMQ。
  • channel: 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟链接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • broker: 标识消息队列服务器实体 rabbitmq-server。
  • 连接器:这是负责接收客户端连接请求和建立连接的组件。RabbitMQ 支持多种连接器,如 AMQP 0-9-1, AMQP 1.0, MQTT, STOMP 等。
  • v-host:虚拟主机,这是 RabbitMQ 的逻辑隔离单元,每个虚拟主机相当于一个独立的代理,拥有自己的交换器、队列、绑定、权限等。不同的虚拟主机之间是相互隔离的,不能共享资源。一个 RabbitMQ 实例可以创建多个虚拟主机,以满足不同的业务需求。
  • exchange:交换机,这是负责接收生产者发送的消息,并根据路由规则将消息分发到相应的队列或者其他交换器的组件。RabbitMQ 支持多种类型的交换器,如 fanout, direct, topic, headers 等。
  • binding:绑定,这是负责将交换器和队列之间建立关联关系的组件。绑定可以指定一个路由键或者模式匹配规则,以决定哪些消息可以被路由到哪些队列。
  • queue:队列,这是负责存储消费者需要消费的消息的组件。队列可以有多种属性和特性,如持久化、排他性、自动删除、死信队列、优先级队列等。队列可以绑定到一个或多个交换器上,并指定一个或多个路由键或者模式匹配规则。
  • consuemer:消费者,连接到 RabbitMQ 服务器,并订阅到队列上,接收来自队列的消息。

应用场景

image

RabbitMQ 是一个非常强大和灵活的消息中间件,它可以应用于多种场景和需求。以下是一些常见的 RabbitMQ 应用场景和实战经验:

  • 异步处理:当系统需要执行一些耗时或者不重要的任务时,可以使用 RabbitMQ 将任务封装成消息发送到队列中,然后由专门的消费者来异步地执行这些任务。这样可以提高系统的响应速度和用户体验,同时也可以避免因为任务失败或超时而影响主流程的执行。例如在 waynboot-mall 项目中,用户下单后需要发送邮件通知,这个任务就可以使用 RabbitMQ 异步处理。
  • 流量削峰:当系统面临突发的高并发请求时,如果直接让所有请求打到后端服务器上,可能会导致服务器崩溃或者响应缓慢。这时可以使用 RabbitMQ 作为一个缓冲层,将请求先发送到队列中,然后由后端服务器按照自己的处理能力从队列中拉取请求进行处理。这样可以平滑地分摊请求压力,避免系统崩溃或者服务降级。例如,在 waynboot-mall 项目中,每天晚上八点有秒杀活动,这时可以使用 RabbitMQ 来削峰限流,保证系统的稳定运行。
  • 消息广播:当系统需要将消息发送到多个接收方时,可以使用 RabbitMQ 的发布/订阅模式,将消息发送到一个 fanout 类型的交换器上,然后由多个队列绑定到这个交换器上,从而实现消息的广播功能。这样可以实现一对多的消息通信,同时也可以根据不同的业务需求,订阅不同的消息内容。例如,在 waynboot-mall 项目中,当商品信息发生变化时,需要通知搜索系统、推荐系统、缓存系统等多个系统,这时可以使用 RabbitMQ 的消息广播功能。
  • 消息路由:当系统需要根据不同的条件将消息发送到不同的接收方时,可以使用 RabbitMQ 的路由模式,将消息发送到一个 direct 或者 topic 类型的交换器上,然后由多个队列绑定到这个交换器上,并指定不同的路由键或者模式匹配规则,从而实现消息的路由功能。这样可以实现多对多的消息通信,同时也可以灵活地控制消息的分发和消费。例如,在 waynboot-mall 项目中,当订单状态发生变化时,需要通知不同的系统进行不同的处理,这时可以使用 RabbitMQ 的消息路由功能。

坑点分析

image

在使用 RabbitMQ 的过程中,有一些常见的问题需要注意:

  • 消息确认:消息确认是 RabbitMQ 保证消息可靠传递的机制。消息确认分为生产者确认和消费者确认。生产者确认是指生产者发送消息后,等待 RabbitMQ 返回一个确认消息,表明消息已经被正确接收和存储。消费者确认是指消费者接收消息后,向 RabbitMQ 发送一个确认消息,表明消息已经被正确处理和消费。在 waynboot-mall 项目中,消费者开启了手动消息确认。
  • 消息持久化:消息持久化是指将消息存储到磁盘上,以防止 RabbitMQ 重启或者崩溃时丢失消息。消息持久化需要满足以下三个条件:交换器、队列和消息都需要设置为持久化。持久化会影响 RabbitMQ 的性能,因为需要进行磁盘 IO 操作。建议根据业务需求选择是否需要持久化消息,并合理地配置磁盘空间和清理策略。在 waynboot-mall 项目中,交换器、队列设置了持久化,消息没有设置持久化(消息设置持久化会对 RabbitMQ 的性能造成较大影响)。
  • 死信队列:死信队列是指存储那些因为某些原因无法被正常消费的消息的队列。死信队列可以用来处理一些异常或者失败的情况,如消息过期、队列达到最大长度、消费者拒绝等。建议使用死信队列来监控和处理这些情况,并根据业务需求选择合适的重试或者补偿策略。在 waynboot-mall 项目中,当订单消费者处理消息失败重试三次后,会将订单消息发送到死信队列。
  • 集群和镜像:集群和镜像是 RabbitMQ 实现高可用和高扩展的两种方式。集群是指将多个 RabbitMQ 实例组成一个逻辑单元,共享元数据和负载均衡。镜像是指将同一个队列在多个节点上创建副本,实现数据冗余和容错。建议根据业务需求选择合适的集群模式和镜像类型,并注意集群中的网络分区、脑裂等问题。

代码实战

在 waynboot-mall 项目中,消息层包含两个模块 waynboot-message-core 以及 waynboot-message-consumer,目录结构如下,

|-- waynboot-message-core     // 核心消息配置,供其他服务集成使用
|   |-- config
|   |-- constant
|   |-- dto
|-- waynboot-message-consumer // 消息消费服务,订阅队列接收消息,调用其他服务执行一些具体的业务逻辑
|   |-- api
|   |-- config
|   |-- consumer

waynboot-message-core 包目录说明如下,

  • config:核心消息配置目录,包含业务上使用的订单消息、邮件消息、死信消息、延迟消息的交换机、队列、路由绑定配置以及 RabbitTemplate 配置。
  • constants:核心消息配置的相关常量目录,包含 MQ 的常量类,这里面会定义订单、邮件、死信、延迟消息的交换机名称、队列名称、路由键名称等。
  • dto:核心消息配置的数据转换实体目录,包含 OrderDTO 等。

waynboot-message-consumer 包目录说明如下,

  • api:消息消费服务调用其他服务定义的 api 包目录,包含 MobileApi 类用来调用 moibile-api。
  • config:消息消费服务的核心配置目录,包含 RestTemplate 配置类。
  • consumer:消息消费服务的消费者包目录,包含下单、发送邮件、未支付订单超时取消等消费者。

添加 POM 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

指定虚拟主机

image

在 waynboot-mall 项目中,通过 yml 文件的 spring.rabbitmq.virtual-host=“/” 属性来指定虚拟主机名称。

建议大家在使用 RabbitMQ 时都配置好自己项目的虚拟主机名称,来达到各系统资源隔离的目的。当然如果 RabbitMQ 服务只有一个项目在用,那就用默认的 / 作为虚拟主机名称也是可以的。

小知识:出于多租户和安全因素设计的,vhost 把 AMQP 的基本组件划分到一个虚拟的分组中。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个 RabbitMQ 服务器时,可以划分出多个虚拟主机。RabbitMQ 默认的虚拟主机路径是 /。

生产者发送消息

在 waynboot-mall 项目中,用订单消息来举例,生产者发送消息需要经过三个步骤

1. 创建订单消息的交换机、队列以及路由绑定

public class MQConstants {
    public static final String ORDER_DIRECT_QUEUE = "order_direct_queue";
    public static final String ORDER_DIRECT_EXCHANGE = "order_direct_exchange";
    public static final String ORDER_DIRECT_ROUTING = "order_direct_routing";
}

@Configuration
public class BusinessRabbitConfig {
    @Bean
    public Queue orderDirectQueue() {
        return new Queue(MQConstants.ORDER_DIRECT_QUEUE);
    }

    @Bean
    DirectExchange orderDirectExchange() {
        return new DirectExchange(MQConstants.ORDER_DIRECT_EXCHANGE);
    }

    @Bean
    Binding bindingOrderDirect() {
        return BindingBuilder.bind(orderDirectQueue()).to(orderDirectExchange()).with(MQConstants.ORDER_DIRECT_ROUTING);
    }

}

在 BusinessRabbitConfig 中,我们创建了订单交换机、队列以及路由绑定关系。在 Spring 项目中,项目启动时,就会自动在 RabbitMQ 服务器上创建好这些东西。

image

image

2. 生产者配置

image

生产者的消息发送确认主要包含两部分,

producter -> rabbitmq broker exchange -> queue

  • 消息从 producte( 生产者)发送到 rabbitmq broker(RabbitMQ 服务器)的交换机中,发送后会触发
    confirmCallBack
    回调
  • 消息从 exchange 发送到 queue,投递失败则会调用
    returnCallBack
    回调

waynboot-mall 项目的 yml 中关于 RabbitMQ 的相关配置如下,

spring:
  # 配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 消息确认配置项
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列(Queue)
    publisher-returns: true
    # 虚拟主机名称
    virtual-host: /
publisher-confirm-type 属性

可以看到,我们设置了 publisher-confirm-type 属性为 correlated,表示开启发布确认模式,用来确认消息已发送到交换机,publisher-confirm-type 有三个选项:

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回
    confirmCallBack
    回调方法
  • SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
publisher-returns 属性

在 RabbitMQ 中,消息发送到交换机中也不代表消费者一定能接收到消息,所以我们还需要设置 publisher-returns 为 true 来表示确认交换机中消息已经发送到队列里。true 表示开启失败回调,开启后当消息无法路由到指定队列时会触发
ReturnCallback
回调。

接着是 RabbitTemplateConfig 的代码,这里面会定义前面提到的
confirmCallBack

returnCallBack
相关代码,

@Slf4j
@Component
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        // 交换机收到消息回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        // 队列收到消息回调,如果失败的话会进行 returnCallback 的回调处理,反之成功就不会回调。
        rabbitTemplate.setReturnsCallback(returned -> {
            log.info("returnCallback:     " + "消息:" + returned.getMessage());
            log.info("returnCallback:     " + "回应码:" + returned.getReplyCode());
            log.info("returnCallback:     " + "回应信息:" + returned.getReplyText());
            log.info("returnCallback:     " + "交换机:" + returned.getExchange());
            log.info("returnCallback:     " + "路由键:" + returned.getRoutingKey());
        });

        return rabbitTemplate;
    }
}

在 RabbitTemplateConfig 类代码里,我们可以设置
confirmCallBack

returnCallBack
回调函数后,监控生产者发送消息是否被交换机接收、以及交换机是否把消息发送到队列中。

3. 使用 RabbitTemplate 发送消息

在 Spring Boot 项目中,集成了 spring-boot-starter-amqp 依赖后,就可以直接注入 RabbitTemplate 来发送消息。

这里用 waynboot-mall 项目中的异步下单流程举例,代码如下,

@Slf4j
@Service
@AllArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    private RabbitTemplate rabbitTemplate;

    @Override
    public R asyncSubmit(OrderVO orderVO) {
        OrderDTO orderDTO = new OrderDTO();
        ...

        // 开始异步下单
        String uid = IdUtil.getUid();
        // 1. 创建消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突
        CorrelationData correlationData = new CorrelationData(uid);
        // 2. 创建消息载体 Message ,AMQP 规范中定义的消息承载类,用来在生产者和消费者之前传递消息
        Map<String, Object> map = new HashMap<>();
        map.put("order", orderDTO);
        map.put("notifyUrl", WaynConfig.getMobileUrl() + "/callback/order/submit");
        try {
            Message message = MessageBuilder
                    .withBody(JSON.toJSONString(map).getBytes(Constants.UTF_ENCODING))
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
            // 3. 发送消息到 RabbitMQ 服务器,需要指定交换机、路由键、消息载体以及消息ID
            rabbitTemplate.convertAndSend(MQConstants.ORDER_DIRECT_EXCHANGE, MQConstants.ORDER_DIRECT_ROUTING, message, correlationData);
        } catch (UnsupportedEncodingException e) {
            log.error(e.getMessage(), e);
        }
        return R.success().add("actualPrice", actualPrice).add("orderSn", orderSn);
    }
}

waynboot-mall 项目中在使用 rabbitTemplate 发送消息时,按照如下步骤,大家可以参考

  1. 创建消息 ID,确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,消费者消费时出现 ack 冲突。
  2. 创建消息载体 Message ,AMQP 规范中定义的消息承载类,用来在生产者和消费者之前传递消息。
  3. 发送消息到 RabbitMQ 服务器,需要指定交换机、路由键、消息载体以及消息 ID。


以上就是生产者发送消息时所有相关代码了,接着我们看下消费者处理消息的相关代码。

消费者处理消息

在 waynboot-mall 项目中,还是用订单消息来举例,消费者 yml 配置如下,

1. 消费者配置

在 RabbitMQ 的消息消费环节,需要注意的一点就是,如果需要确保消费者不出现漏消费,则需要开启消费者的手动 ack 模式。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    ...
    listener:
      simple:
        # 消息确认方式,其有三种配置方式,分别是none、manual(手动ack) 和auto(自动ack) 默认auto
        acknowledge-mode: manual
        # 一个消费者最多可处理的nack(未确认)消息数量,默认是250
        prefetch: 250
        # 设置消费者数量
        concurrency: 1
acknowledge-mode 属性

在 yml 文件的消费者配置中,acknowledge-mode 属性用于指定消息确认模式,有三种模式:

  1. 手动确认 manual,在该模式下,消费者消费消息后需要根据消费情况给 Broker 返回一个回执,是确认 ack 使 Broker 删除该条已消费的消息,还是失败确认返回 nack,还是拒绝该消息。开启手动确认后,如果消费者接收到消息后还没有返回 ack 就宕机了,这种情况下消息也不会丢失,只有 RabbitMQ 接收到返回 ack 后,消息才会从队列中被删除。
  2. 自动确认 none,rabbitmq 默认消费者正确处理所有请求(不设置时的默认方式)。
  3. 根据请况确认 auto,主要分成以下几种情况:
    • 如果消费者在消费的过程中没有抛出异常,则自动确认。
    • 当消费者消费的过程中抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且该消息不会重回队列。
    • 当抛出 ImmediateAcknowledgeAmqpException 异常,消息会被确认。
    • 如果抛出其他的异常,则消息会被拒绝,但是与前两个不同的是,该消息会重回队列,如果此时只有一个消费者监听该队列,那么该消息重回队列后又会推送给该消费者,会造成死循环的情况。
prefetch 属性

消费者配置中,prefetch 属性用于指定消费者每次从队列获取的消息数量。

每个 customer 会在 MQ 预取一些消息放入内存的 LinkedBlockingQueue 中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果 ack 模式为 none,则忽略。

prefetch 默认值以前是 1,这可能会导致高效使用者的利用率不足。从 spring-amqp 2.0 版开始,默认的 prefetch 值是 250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch 的值应当设置为 1。

对于低容量消息和多个消费者的情况(也包括单 listener 容器的 concurrency 配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动 ack 下并设置 prefetch=1。

如果要保证消息的可靠不丢失,当 prefetch 大于 1 时,可能会出现因为服务宕机引起的数据丢失,故建议将 prefetch=1。

concurrency 属性

消费者配置中,concurrency 属性设置的是对每个 listener 在初始化的时候设置的并发消费者的个数。在上面的 yml 配置中,concurrency=1,即每个 Listener 容器将开启一个线程去处理消息。在 2.0 以后的版本中,可以在注解中配置该参数,实例代码如下,

@RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE, concurrency = "2")
public void process(Channel channel, Message message) throws IOException {
    String body = new String(message.getBody());
    log.info("OrderPayConsumer 消费者收到消息: {}", body);
    ...
}

2. 使用 RabbitListener 注解消费消息

在 waynboot-mall 项目中,消费者监听队列代码如下,

@Slf4j
@Component
public class OrderPayConsumer {
    @Resource
    private RedisCache redisCache;
    @Resource
    private MobileApi mobileApi;

    @RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE)
    public void process(Channel channel, Message message) throws IOException {
        // 1. 转换订单消息
        String body = new String(message.getBody());
        log.info("OrderPayConsumer 消费者收到消息: {}", body);
        // 2. 获取消息ID
        String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        // 3. 获取发送tag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 4. 消费消息幂等性处理
        if (redisCache.getCacheObject(ORDER_CONSUMER_MAP.getKey()) != null) {
            // redis中包含该 key,说明该消息已经被消费过
            log.error("msgId: {},消息已经被消费", msgId);
            channel.basicAck(deliveryTag, false);// 确认消息已消费
            return;
        }
        try {
            // 5. 下单处理
            mobileApi.submitOrder(body);
            // 6. 手动ack,消息成功确认
            channel.basicAck(deliveryTag, false);
            // 7. 设置消息已被消费标识
            redisCache.setCacheObject(ORDER_CONSUMER_MAP.getKey(), msgId, ORDER_CONSUMER_MAP.getExpireSecond());
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false);
            log.error(e.getMessage(), e);
        }
    }
}

waynboot-mall 项目中在使用 RabbitListener 注解消费消息时,按照如下步骤,大家可以参考

  1. 将 message 参数转换成订单消息。
  2. 从 message 参数中获取消息唯一 msgId。
  3. 从 message 参数中获取消息发送 tag。
  4. 幂等性处理,根据第二步获取的 msgId ,消费消息时需要先判断 msgId 是否已经被处理。
  5. 调用 mobile-api 服务,进行下单逻辑处理,在 mobileApi.submitOrder(body) 方法中使用 Spring-Retry 的 @Retryable 注解,进行自动重试。
  6. 手动 ack,basicAck(long deliveryTag, boolean multiple)。basicAck 方法表示成功确认,使用此方法后,消息就会被 rabbitmq 服务器删除。

其中参数 long deliveryTag 为消息的唯一序号也就是第三步获取的发送 tag,第二个 boolean multiple 参数表示是否一次消费多条消息,false 表示只确认该序列号对应的消息,true 则表示确认该序列号对应的消息以及比该序列号小的所有消息,比如我先发送 2 条消息,他们的序列号分别为 2,3,并且他们都没有被确认,还留在队列中,那么如果当前消息序列号为 4,那么当 multiple 为 true,则序列号为 2、3 的消息也会被一同确认。

  1. 幂等性处理,消息已经被成功消费后,根据第二步获取的 msgId 设置幂等标识。

总结一下

这篇文章给大家讲解了在 Spring Boot 项目中如何集成消息队列 RabbitMQ 用于业务逻辑解耦,有架构介绍、应用场景、坑点解析、代码实战 4 个部分,能带领大家比较全面的了解一波 RabbitMQ。大家在自己的项目中如果需要引入 RabbitMQ 时,都可以参考本文的代码实战配置,帮助大家快速集成、避免踩坑。

image

分类模型
评估中,通过各类损失(
loss
)函数的分析,可以衡量模型预测结果与真实值之间的差异。
不同的损失函数可用于不同类型的分类问题,以便更好地评估模型的性能。

本篇将介绍分类模型评估中常用的几种损失计算方法。

1. 汉明损失

Hamming loss

汉明损失
)是一种衡量分类模型预测错误率的指标。
它直接衡量了模型预测错误的样本比例,因此更直观地反映出模型的预测精度,
而且,它对不平衡数据比较敏感,也适用于多分类的问题,不仅限于二分类问题。

1.1. 计算公式

\(L(y, \hat{y}) = \frac{1}{n * m} \sum_{i=0}^{n-1} \sum_{j=0}^{m - 1} 1(\hat{y}_{i,j} \not= y_{i,j})\)
其中,
\(n\)
是样本数量,
\(m\)
是标签数量,
\(y_{i,j}\)
是样本
\(i\)
的第
\(j\)
个标签的真实值,
\(\hat{y}_{i,j}\)
是对应的预测值,
\(1(x)\)
是指示函数。

1.2. 使用示例

from sklearn.metrics import hamming_loss
import numpy as np

n = 100
y_true = np.random.randint(1, 10, n)
y_pred = np.random.randint(1, 10, n)

s = hamming_loss(y_true, y_pred)
print("hamming loss:{}".format(s))

# 运行结果
hamming loss:0.8

2. 铰链损失

Hinge loss

铰链损失
)常用于“最大间隔”分类,其最著名的应用是作为支持向量机(SVM)的目标函数。
Hinge loss
主要用于二分类问题,并且通常与特定的算法(如SVM)结合使用。

2.1. 计算公式

\(L(y, w) = \frac{1}{n} \sum_{i=0}^{n-1} \max\left\{1 - w_i y_i, 0\right\}\)
其中,
\(n\)
是样本数量,
\(y_i\)
是真实值,
\(w_i\)
是相应的预测决策(由
decision_function
方法输出)。

2.2. 使用示例

from sklearn.metrics import hinge_loss
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split
import numpy as np

n = 100
X = np.random.randint(0, 2, size=(n, 1))
y = np.random.randint(0, 2, n)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

reg = LinearSVC(dual="auto")
reg.fit(X_train, y_train)

y_pred_decision = reg.decision_function(X_test)

s = hinge_loss(y_test, y_pred_decision)
print("hinge loss:{}".format(s))

# 运行结果
hinge loss:1.0136184446302712

上面的示例中,首先构建一个支持向量机的训练模型和随机的样本数据。
最后在测试集上计算
hinge loss

3. 对数损失

对数损失

log loss
)通过考虑模型预测的概率与实际标签的
对数误差
来评估模型的性能。
它特别关注模型对于每个样本的预测概率的准确性,对于错误的分类,
Log loss
会给予较大的惩罚。

对数损失
的值越小,表示模型的预测概率越接近实际标签,模型的性能越好。

3.1. 计算公式

\(LL = - \frac{1}{N} \sum_{i=0}^{N-1} \sum_{k=0}^{K-1} y_{i,k} \log p_{i,k}\)
其中,
\(N\)
是样本数量,
\(K\)
是分类标签的数量,
\(y_{i,k}\)
是第
\(i\)
个样本在标签
\(k\)
上的真实值,
\(p_{i,k}\)
是对应的概率估计。

3.2. 使用示例

from sklearn.metrics import log_loss
import numpy as np

n = 100
k = 10
y_true = np.random.randint(0, k, n)
y_prob = np.random.rand(n, k)

# 这一步转换后,
# y_prob 每一行的和都为1
for i in range(len(y_prob)):
    y_prob[i, :] = y_prob[i, :] / np.sum(y_prob[i, :])


s = log_loss(y_true, y_prob)
print("log loss:{}".format(s))

# 运行结果
log loss:2.6982702715125466

上面的示例中,
\(n\)
是样本数量,
\(k\)
是标签数量。

4. 零一损失

零一损失

zero-one loss
)非常直观,直接对应着分类判断错误的个数,能很清晰地反映出模型预测错误的比例。
它计算简单,易于理解和实现,对于二分类问题特别直观,但是对于非凸性质不太适用。

4.1. 计算公式

\(L(y, \hat{y}) = \frac{1}{n} \sum_{i=0}^{n-1} 1(\hat{y}_i \not= y_i)\)
其中,
\(n\)
是样本数量,
\(y_i\)
是真实值,
\(\hat{y_i}\)
是预测值,
\(1(x)\)
是指示函数。

4.2. 使用示例

from sklearn.metrics import zero_one_loss
import numpy as np

n = 100
y_true = np.random.randint(1, 10, n)
y_pred = np.random.randint(1, 10, n)

s1 = zero_one_loss(y_true, y_pred)
s2 = zero_one_loss(y_true, y_pred, normalize=False)
print("zero-one loss比率:{}\nzero-one loss数量:{}".format(s1, s2))

# 运行结果
zero-one loss比率:0.89
zero-one loss数量:89

5. Brier 分数损失

Brier 分数损失

Brier score loss
)关注模型预测的概率与实际结果之间的差异。
与只关注预测类别的其他指标不同,它衡量了预测概率的可靠性;
与一些仅适用于二分类问题的评估指标相比,Brier score loss可以应用于多类别分类问题。

它的数值越小,表示模型的概率预测越准确,具有很好的解释性。

5.1. 计算公式

\(BS = \frac{1}{n} \sum_{i=0}^{n - 1}(y_i - p_i)^2\)
其中,
\(n\)
是样本数量,
\(y_i\)
是真实值,
\(p_i\)
是预测概率估计的均方误差。

5.2. 使用示例

from sklearn.metrics import brier_score_loss
import numpy as np

n = 100
y_true = np.random.randint(0, 2, n)
y_prob = np.random.rand(n)

s = brier_score_loss(y_true, y_prob)
print("brier score loss:{}".format(s))

# 运行结果
brier score loss:0.3141953858083935

示例中计算损失用的模拟数据中,
y_true
表示真实值,
y_prob
表示预测概率的均方误差。

6. 总结

本篇归纳总结了
分类模型
中关于
损失函数
的一些使用方式:

  • 汉明损失,Hamming loss
  • 铰链损失,Hinge loss
  • 对数损失,log loss
  • 零一损失,zero one loss
  • Brier 分数损失,Brier score loss

运维平台中有许多的周期/定时/异步任务,例如证书扫描、数据备份、日志清理、线上作业等等,这些任务的执行都是借助于Celery来完成的。任务多了之后就会遇到一系列的问题,例如我之前写过的
将任务分多队列来解决生产环境下的任务优先级问题
,除此之外还要经常关注队列的状态以及任务的运行情况,为了方便查看任务以及Worker的运行情况,我在后台中添加了队列状态这个功能

这个功能参考了Celery知名监控工具Flower的实现,之所以没有直接使用Flower,主要有几方面的考虑,其一是Flower的页面风格与我们本身的平台风格差异较大,其二是Flower要单独启动进程或者是去看代码深度集成,比较麻烦,其三也是想要更加深入的了解下Celery,毕竟项目用了它,了解不深遇到问题不好解决。基于以上几点考虑,就自己实现了Celery监控功能的开发。好在并不复杂,Celery本身提供了一组API,可以查询任务队列的状态、执行结果等信息。借助于这些API就能完成比较详细的监控,Celery的API主要有两个

inspect

app.control.inspect(): 这个方法返回一个Inspect对象,可以使用它来获取任务队列、工作节点等的信息。例如
inspect().active()
可以获取当前活动的任务列表,
inspect().registered()
可以获取已注册的任务列表,不指定worker的情况下查看全部worker的数据,如果指定worker则查看对应worker的数据

具体的用法如下:

from celery import Celery

app = Celery('your_celery_app_name')

# 检查工作节点的在线状态
worker_status = app.control.inspect([worker]).ping()

# 返回工作节点的统计信息,如活动任务数、完成任务数等
worker_stats = app.control.inspect([worker]).stats()

# 返回活动任务的信息
active_tasks = app.control.inspect([worker]).active()

# 返回已注册任务的信息
registered_tasks = app.control.inspect([worker]).registered()

# 返回计划中的任务的信息
scheduled_tasks = app.control.inspect([worker]).scheduled()

# 返回已预订任务的信息
reserved_tasks = app.control.inspect([worker]).reserved()

# 返回已撤销任务的信息
revoked_tasks = app.control.inspect([worker]).revoked()

# 返回活动队列的信息
active_queues = app.control.inspect([worker]).active_queues()

# 查询worker的配置信息
worker_conf = app.control.inspect([worker]).conf()

# 返回工作节点的报告信息
worker_reports = app.control.inspect([worker]).report()

# 查询特定任务的信息
task_info = app.control.inspect([worker]).query_task(task_id)

通过
inspect
可以查看celery整体以及单个worker的相关信息,例如首页的状态就是通过
ping
来实现的

一个card表示一个worker,点击card可以查看worker的详细信息,例如Pool标签的数据就来自于
stats
,Register标签的数据就来自于
registered
,Tasks标签就分别展示了
active

scheduled

reserved

revoked
的任务数据

通过这些信息可以全面掌握Celery的运行状态以及每个Worker的运行情况,做到对Celery的全局掌控

AsyncResult

AsyncResult: 这个类可以查询特定任务的状态和结果。通过任务的ID创建一个AsyncResult对象,并使用其方法来获取任务的状态、结果等信息。例如
AsyncResult(task_id).state
可以获取任务的状态,
AsyncResult(task_id).result
可以获取任务的执行结果

具体的用法如下:

from celery import Celery

app = Celery('your_celery_app_name')

result = app.AsyncResult(task_id)

# 获取任务状态
state = result.state

# 获取任务结果
result = result.result

# 返回一个布尔值,检查任务是否已经完成
is_ready = result.ready()

# 返回一个布尔值,检查任务是否成功完成
is_successful = result.successful()

# 返回一个布尔值,检查任务是否执行失败
is_failed = result.failed()

# 返回一个字符串,获取任务的错误追溯信息
traceback = result.traceback

# 返回一个AsyncResult对象,获取任务的父任务
parent_task = result.parent

# 返回一个列表,包含任务的子任务的AsyncResult对象,获取任务的子任务
child_tasks = result.children

# 返回一个字典,获取任务的其他信息
info = result.info

# 获取任务的结果,可以指定超时时间和是否向上传播异常
result = result.get(timeout=10, propagate=False)

# 忘记任务,将任务从结果存储中删除。一旦任务被遗忘,将无法查询其状态和结果
result.forget()

通过
AsyncResult
可以获取到任务执行的相关信息,对任务执行过程和结果都有很好的把控。不过这需要任务的ID,任务ID通常可以通过任务执行时获取,
ops_coffee_auto_notify_task.delay()
异步执行任务后返回的就是任务的ID。但对于任务的话我通常会单独记录任务执行的过程和状态,而不依赖Celery的结果记录,所以对于
AsyncResult
的需求并不强

但平常需要开关系统内置的周期任务,此时就需要知道系统任务列表,通过获取
PeriodicTask
表的数据即可,同时通过修改
PeriodicTask
表的
enabled
字段值来达到开启或暂停的目的

inspect
更关注Celery的Worker运行情况,而
AsyncResult
则更关注于Celery的任务运行状态,通过两者的配合就能更好的掌握周期/定时/异步任务的执行情况了,通过监控功能的开发也对Celery有了更深的了解,同时
运维自动化平台
也变得更加完善