基于Locust实现MQTT协议服务的压测脚本
最近在忙业务的间隙,穿插着做了些性能测试。
一、背景简介
业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。
目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。
本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。
捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):
- 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
- 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
- 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。
其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。
二、技术选型
这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。
所以就继续编码好了,仍然首选python,想到了
locust
库,后来看官方文档的时候,看到
locust
也针对
mqtt
协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。
搜索中又发现
Python
中用于
mqtt
协议的库叫
paho.mqtt
,支持连接代理,消息的订阅、收发等等,于是最后确定使用:
locust
+
paho.mqtt
的组合来实现本次的负载脚本。
三、代码编写
1. 脚本代码
暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。
脚本目前做了这些事情:
- 从db中查询有效可用的所有测试车辆信息数据
- 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
- 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
- 脚本统计连接数、请求数、响应时间等信息写到报表中
- 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl
from paho.mqtt import client as mqtt_client
# 根据不同系统进行路径适配
if os.name == "nt":
path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, path)
from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
sys.path.append("/app/qa_test_app/")
from GB_test.utils.mysql_operating import DB
from locust import User, TaskSet, events, task, between, run_single_user
BROKER_ADDRESS = "broker服务地址"
PORT = 6666661
PASSWORD = "666666666666"
PUBLISH_TIMEOUT = 10000 # 超时时间
TEST_TOPIC = "test_topic"
TEST_VALUE = [16, 3, -26, 4, 0, 36,.......] # 用来publish的测试数据,仅示意
BYTES_DATA = bytes(i % 256 for i in TEST_VALUE) # 业务需要转换成 byte 类型后再发送
# 创建队列
client_queue = queue.Queue()
# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:
# 把可用的车辆信息存到队列中去
client_queue.put(t)
def fire_success(**kwargs):
"""请求成功时调用"""
events.request.fire(**kwargs)
def calculate_resp_time(t1, t2):
"""计算响应时间"""
return int((t2 - t1) * 1000)
class MQTTMessage:
"""已发送的消息实体类"""
def __init__(self, _type, qos, topic, payload, start_time, timeout):
self.type = _type,
self.qos = qos,
self.topic = topic
self.payload = payload
self.start_time = start_time
self.timeout = timeout
# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = [] # 定义存放连接断开的记录的列表容器
class PublishTask(TaskSet):
@task
def task_publish(self):
self.client.loop_start()
topic = TEST_TOPIC
payload = BYTES_DATA
# 记录发送的开始时间
start_time = time.time()
mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
published_mid = mqtt_msg_info.mid
# 将发送成功的消息内容,放入client实例的 published_message 字段
self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
0,
topic,
payload,
start_time,
PUBLISH_TIMEOUT)
# 发送成功回调
self.client.on_publish = self.on_publish
# 断开连接回调
self.client.on_disconnect = self.on_disconnect
@staticmethod
def on_disconnect(client, userdata, rc):
""" broker连接断开,放入列表容器"""
disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
disconnect_record_list.append(disconnected_info)
print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))
@staticmethod
def on_publish(client, userdata, mid):
if mid:
# 记录消息发送成功的时间
end_time = time.time()
# 从已发送的消息容器中,取出消息
message = client.published_message.pop(mid, None)
# 计算开始发送到发送成功的耗时
publish_resp_time = calculate_resp_time(message.start_time, end_time)
fire_success(
request_type="p_success",
name="client_id: " + str(client._client_id),
response_time=publish_resp_time,
response_length=len(message.payload),
exception=None,
context=None
)
global total_published
# 成功发送累加1
total_published += 1
class MQTTLocustUser(User):
tasks = [PublishTask]
wait_time = between(2, 2)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 从队列中获取客户端 username 和 client_id
current_client = client_queue.get()
self.client = mqtt_client.Client(current_client[1])
self.client.username_pw_set(current_client[0], PASSWORD)
# self.client.username_pw_set(current_client[0] + "1", PASSWORD) # 模拟client连接报错
# 定义一个容器,存放已发送的消息
self.client.published_message = {}
def on_start(self):
# 设置tls
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
self.client.tls_set_context(context)
self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
self.client.on_connect = self.on_connect
def on_stop(self):
print("publish 成功, 当前已成功发送数量:{}".format(total_published))
if len(disconnect_record_list) == 0:
print("无断开连接的client")
else:
# 把断开记录里的信息写入csv
with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
for i in disconnect_record_list:
writer.writerow(i)
print("断开连接的client信息已写入csv文件")
@staticmethod
def on_connect(client, userdata, flags, rc, props=None):
if rc == 0:
print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))
fire_success(
request_type="c_success",
name='count_connected',
response_time=0,
response_length=0,
exception=None,
context=None
)
else:
print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))
fire_success(
request_type="c_fail",
name="client_id: " + str(client._client_id),
response_time=0,
response_length=0,
exception=None,
context=None
)
if __name__ == '__main__':
run_single_user(MQTTLocustUser)
2. 代码分析-locust库部分
并发请求能力还是使用的
locust
库的能力。官方只提供了
http
协议接口的相关类,没直接提供
mqtt
协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承
User
和
TaskSet
即可。
User
类
首先是先定义
User
类,这里就是用来生成我要用来测试的车辆。
类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。
client
来源于
from paho.mqtt import client as mqtt_client
提供的能力,固定用法,按照人家的文档使用就行。
红色框里,是
User
类的2个重要熟悉属性:
tasks
: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的
PublishTask
类下面定义的内容。wait_time
: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。
绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。
蓝色框里有2个方法,也是
locust
提供的能力:
on_start
:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。on_stop
:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。
TaskSet
类
定义好
User
类,就需要来定义
TaskSet
类,你得告诉产生出来的用户,要干点啥。
我这根据业务需要,就是让车辆不停的像broker发送数据即可。
红色部分,同样是
paho.mqtt
提供的能力,会启动新的线程去执行你定义的事情。
黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是
MQTTMessageInfo
类。
注意返回的2个属性:
mid
: 返回这个消息发送的顺序rc
: 表示发送的响应状态,0 就是成功
绿色部分,还记得我在上面的
User
类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。
2. 代码分析-paho.mqtt库部分
上面的代码已经用到了不少
paho.mqtt
的能力,这里再进行整体梳理下。
- client.Client():声明一个client
- client.username_pw_set(): 设置客户端的用户名,密码
- client.tls_set_context: 设置ssl模式
- client.connect(): 连接代理
- client.publish:向代理推送消息
还用到了一些回调函数:
- on_connect:连接操作成功时回调
- on_publish:发布成功时回调
- on_disconnect:客户端与代理断开连接时回调
另外还用到了一个事件函数
events.request
。
当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个
event
。
查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。
比如我在发送回调函数里调用了该方法。
所以最后在控制台显示的报告里就有我定义的内容了。
由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在
on_disconnect
回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。
后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。
三、小结
后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。
现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。
另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。