2024年4月

前一段时间有个Java技术栈的朋友联系到我,需要快速对接现有的无人值守称重系统,这里的对接是指替代现有系统,而非软件层面的对接,也就是利用现有的硬件开发一套替代现有软件的自动化系统。主要设备包括地磅秤、道闸、红外对射传感器、摄像头、小票打印、LED显示屏等等,全程使用LED显示屏提示人员当前的操作状态。

业务流程:

①摄像头识别车牌号

②开启前入磅道闸

③红外监测是否抵达称重区域

④采集地磅重量,自动判断仪表读数稳定

⑤摄像头抓拍现场图像,同时并发采集多路摄像头形成现场档案

⑥数据打包上传到MES系统

⑦打印小票

⑧开启后出磅道闸

这位同学基于java技术栈研究了一段时间进展较慢,应该是通过园子联系到我。我们简单沟通了一下,确定使用IoTBrowser来开发,虽然前期没有界面的展示需求,但是保留了UI控制的扩展性,最主要是用html+js开发起来简单、高效。我这边提供硬件层的驱动和js接口,他来实现上层业务逻辑控制。

因为目前项目处于前期技术验证阶段,所以前期拿了2款硬件进行测试。第一款是地磅秤,据了解地磅秤仪表使用耀华A9,IoTBrowser已经自带实现,js示例也提供了不需要二次开发。第二个就是控制道闸的开启与关闭,这个还没有实现,所以重点就是打通这个设备。

要进行硬件对接首先要知道对接的接口形式和数据协议,通过以下三步:

第一步,找到设备的品牌和型号;

第二步,快速在官网找到说明书,通过了解这块设备是施耐德品牌C2000型号,一款以太网型开关量模块,向下使用RS485接入道闸的串口,向上提供Modbus-TCP协议可以远程控制。

第三步,通过说明书找到具体的控制协议,然鹅Modbus协议是使用原始的16进制描述,并没有线圈相关的介绍。

找到了对应的协议,下一步就算摞起袖子开工。因为对方在宁夏而我在长沙,需要代码开发调试不可能在对方机器上安装一套VS开发工具再远程到他电脑,这样很不方便,所以使用代理软件将设备的Modbus端口临时转发出来,这样跨越千里通过网络就可以在异地联调设备。

经过几个小时的摸索,成功实现了设备的开启和关闭。中间过程还算顺利,就是使用NModbus时是使用的Int参数需要进行进制转换,这里浪费了一点时间。

        // 开关控制
        function open(address, startAddress, value) {
            var $msgWrite = $('#msgWrite');
            dds.iot.com.exeCommand({ id: wid, name: "WriteSingleCoil", data: { slaveAddress: address, startAddress: startAddress, value: value } }, function (ar) {
                if (ar.Success) {
                    $msgWrite.text('操作成功')
                } else {
                    $msgWrite.text('操作失败:' + ar.Message)
                }
            })
        }
        //开关状态读取
        function readStatus(address, startAddress, num) {
            dds.iot.com.exeCommand({ id: wid, name: "ReadCoils", data: { slaveAddress: address, startAddress: startAddress, numberOfPoints: num } }, function (ar) {
                if (ar.Success) {
                    $msg.text('数据:' + ar.Data)
                } else {
                    $msg.text('操作失败:' + ar.Message)
                }
            })
        }

        // 启动称重采集服务
        function startWeight() {
            var $weight = $("#weight");

            var type = 'test';// 修改为实际型号
            //var type = 'yh_a9';// 耀华XK3190-A9:yh_a9

            var port = 1;
            var baudRate = 9600;
            // 调用电子秤
            dds.iot.weight.start({
                type: type,
                port: port,
                baudRate: baudRate,
                onUpdateWeight: function (data) {
                    // 重量回调事件
                    $weight.html(data.weight);
                    console.log('最新重量:'+ data.weight)
                },
                complete: function (ar) {
                    if (!ar.Success) {
                        alert(ar.Message);
                    }
                }
            })
        }

上层封装了js和简单的UI参考示例,我这边的工作就顺利交付了。

IoTBrowser平台开源地址:https://gitee.com/yizhuqing/IoTBrowser/

简介

本文针对
官方例程
中的:02-multiple-acquisition-main-thread做简单的讲解,并简单介绍其中调用的
arv_camera_set_acquisition_mode

arv_camera_create_stream

arv_camera_get_payload

arv_buffer_new

arv_stream_push_buffer

arv_camera_start_acquisition

arv_stream_pop_buffer

arv_camera_stop_acquisition
函数。

aravis版本:0.8.31
操作系统:ubuntu-20.04
gcc版本:9.4.0

例程代码

这段代码使用Aravis的API,控制相机连续采集,并在主线程中从缓冲区获取前10帧图像(假设不丢帧),主要操作步骤如下:

  • 连接相机
  • 设置采集模式为连续采集
  • 创建流对象,并向流对象的buffer池中添加buffer
  • 开始采集
  • 从buffer池中获取10个图像数据
  • 停止采集
  • 释放资源
/* SPDX-License-Identifier:Unlicense */

/* Aravis header */
#include <arv.h>
/* Standard headers */
#include <stdlib.h>
#include <stdio.h>

/*
 * Connect to the first available camera, then acquire 10 buffers.
 */
int main (int argc, char **argv)
{
	ArvCamera *camera;
	GError *error = NULL;

	//连接相机
	camera = arv_camera_new ("192.168.6.63", &error);

	if (ARV_IS_CAMERA (camera)) {
		ArvStream *stream = NULL;

		printf ("Found camera '%s'\n", arv_camera_get_model_name (camera, NULL));
		//设置相机采集模式为连续采集
		arv_camera_set_acquisition_mode (camera, ARV_ACQUISITION_MODE_CONTINUOUS, &error);

		if (error == NULL)
			//创建流对象
			stream = arv_camera_create_stream (camera, NULL, NULL, &error);

		if (ARV_IS_STREAM (stream)) {
			int i;
			size_t payload;

			//从相机对象中获取图像负载大小(每个图像的字节大小)
			payload = arv_camera_get_payload (camera, &error);
			if (error == NULL) {
				for (i = 0; i < 2; i++)
					//在流对象buffer池中插入两个buffer
					arv_stream_push_buffer (stream, arv_buffer_new (payload, NULL));
			}

			if (error == NULL)
				//开始采集
				arv_camera_start_acquisition (camera, &error);

			if (error == NULL) {
				for (i = 0; i < 10; i++) {
					ArvBuffer *buffer;
					//从流对象中获取buffer,此时buffer中已经包含了图像数据
					buffer = arv_stream_pop_buffer (stream);
					if (ARV_IS_BUFFER (buffer)) {
						printf ("Acquired %d×%d buffer\n",
							arv_buffer_get_image_width (buffer),
							arv_buffer_get_image_height (buffer));
						//将buffer放回流对象的buffer池中,而不是销毁
						arv_stream_push_buffer (stream, buffer);
					}
				}
			}

			if (error == NULL)
				//停止采集
				arv_camera_stop_acquisition (camera, &error);

			g_clear_object (&stream);
		}

		g_clear_object (&camera);
	}

	if (error != NULL) {
		printf ("Error: %s\n", error->message);
		return EXIT_FAILURE;
	}

	return EXIT_SUCCESS;
}

此例程较为简单,每一步的细节查看注释即可,此处不过多讲解。

运行结果:

函数说明

arv_camera_set_acquisition_mode

简介:设置相机的采集模式

void arv_camera_set_acquisition_mode(
	ArvCamera* camera,
	ArvAcquisitionMode value,
	GError** error
)

其中value是个枚举值,可选的值有:
ARV_ACQUISITION_MODE_CONTINUOUS

ARV_ACQUISITION_MODE_SINGLE_FRAME

ARV_ACQUISITION_MODE_MULTI_FRAME
分别代表连续采集,单帧采集和多帧采集。

arv_camera_create_stream

简介:创建流对象
注意:最终必须调用g_object_unref()释放内存

ArvStream* arv_camera_create_stream(
	ArvCamera* camera,
	ArvStreamCallback callback,
	void* user_data,
	GError** error
)

其中callback是个回调函数,用于数据帧的处理;user_data是向回调函数中传递的用户数据。本例中这两个参数均为NULL,表示不使用回调函数。

Available since: 0.2.0

arv_camera_get_payload

简介:从相机检索一个图像所需的存储空间大小。此值一般用于创建流缓冲区。

guint arv_camera_get_payload(ArvCamera* camera, GError** error)

Available since: 0.8.0

arv_buffer_new

简介:此函数用于创建一个新的缓冲区,专门用来存储视频流图像的数据。函数提供了灵活的内存管理选项,让调用者可以选择预先分配内存(后续可由调用者重新分配)或由函数来分配内存。

ArvBuffer* arv_buffer_new(size_t size, void* preallocated)

其中preallocated指向预分配的内存缓冲区。若此参数不为NULL,则指向的内存将用作缓冲区存储数据;为NULL则由函数分配内存。

Available since: 0.2.0

arv_stream_push_buffer

简介:将一个buffer添加到一个特定的流中。当缓冲区被推送到流中时,流对象接管了该缓冲区对象的所有权。

void arv_stream_push_buffer(ArvStream* stream, ArvBuffer* buffer)

Available since: 0.2.0

arv_camera_start_acquisition

简介:开始采集

void arv_camera_start_acquisition(ArvCamera* camera, GError** error)

Available since: 0.8.0

arv_stream_pop_buffer

简介:从流的输出队列中弹出一个buffer并返回。这个buffer可能包含无效的图像数据,调用者应在使用图像数据前检查其有效性。
若输出队列为空,则阻塞至数据来临。

ArvBuffer* arv_stream_pop_buffer(ArvStream* stream)

Available since: 0.2.0

arv_camera_stop_acquisition

简介:停止采集

void arv_camera_stop_acquisition(ArvCamera* camera, GError** error)

Available since: 0.8.0

我们很高兴地宣布在 ARM64 中为 Visual Studio 推出 SQL Server Data Tools(SSDT)。这个增强是在令人兴奋的17.10预览版2中发布的。arm64 上 Visual Studio 的 SSDT 版本为 arm64 上的 Visual Studio 添加了 SQL 开发功能。这个最新的开发带来了许多专为增强您在 ARM64 设备上的数据库开发体验而定制的特性。

关键特性

- SQL 项目(打开,构建,发布):通过在 ARM64 上直接从 Visual Studio 打开现有项目,无缝地深入到您的 SQL 项目中,并构建和发布项目。

- 模式比较:使用模式比较功能确保数据一致性并轻松识别更改,使您可以毫不费力地可视化和同步模式差异。

- 数据比较:方便快速准确的数据库间数据比较。

- 查询编辑器:使用针对 ARM64 架构优化的直观查询编辑器,直接在 Visual Studio 中创作和微调 SQL 查询。使用 GitHub Copilot 通过将自然语言翻译成 T-SQL 代码来帮助您进行数据库开发。

- 表设计器:使用表设计器 GUI 设计和修改表,为创建和管理数据库表提供用户友好的界面。

- 数据库属性编辑器:使用数据库属性编辑器定制数据库设置,允许您配置数据库的各个方面。

- 对象重构:使用对象重构功能简化数据库开发过程,使您能够有效地重组和优化数据库模式。

特性列表

安装步骤

- 从微软官网下载 Visual Studio Installer。

- 在 Available 选项中,找到 17.10.0 Preview 2 (Enterprise / Community / Professional)。

- 单击 Installation 按钮,打开17.10.0 Preview 2的 Installation 对话框。转到 Individual Components 并搜索 SQL。

- 选择“SQL Server Data Tools”。它将自动选择所有 SQL 组件。

- 根据需要选择其他组件/工作负载,然后单击 Install。

- 打开 Visual Studio 并创建 SQL Server Database Project。

结论

随着 Visual Studio 17.10 Preview 2中 ARM64 架构的 SSDT 的推出,我们的目标是为 SQL Server 数据库开发提供一个健壮的环境。探索现在可用的令人兴奋的功能,并继续关注即将发布的版本。

今天就开始下载吧。

您的反馈对我们非常重要,请在 https://aka.ms/ssdt-feedback 分享与 SSDT 相关的任何反馈。

我们感谢您花时间报告问题/建议,并希望您在使用 Visual Studio 时继续给我们反馈,告诉我们您喜欢什么以及我们可以改进什么。您的反馈对于帮助我们使 Visual Studio 成为最好的工具至关重要!您可以通过开发者社区与我们分享反馈,通过发送反馈来报告问题或分享您的建议,推动对新功能或现有功能的改进。

通过在 YouTube, Twitter, LinkedIn, Twitch 和 Microsoft Learn 上关注我们与 Visual Studio 团队保持联系。

原文链接:https://devblogs.microsoft.com/visualstudio/arm64-in-ssdt/

引言

在当今互联网领域,尤其在大型电商平台如淘宝这样的复杂分布式系统中,数据的高效管理和快速访问至关重要。面对数以千万计的商品、交易记录以及其他各类业务数据,如何在MySQL等传统关系型数据库之外,借助内存数据库Redis的力量,对部分高频访问数据进行高效的缓存处理,是提升整个系统性能的关键一环。

比如淘宝,京东,拼多多等电商系统每日处理的订单量级庞大,其数据库中存储的商品、用户信息及相关交易数据可达数千万条。为了降低数据库查询的压力,加速数据读取,Redis常被用于搭建二级缓存系统,以容纳部分最为活跃的“热点数据”。然而,在资源有限的情况下,如何确保仅有的20万条缓存数据精准匹配到系统中的热点数据,避免频繁的冷数据替换热数据导致的缓存失效,这就涉及到了一套精密的数据管理策略和缓存淘汰机制的设计。

本文将围绕这一实战场景展开讨论:在MySQL拥有2000万条数据的前提下,如何确保Redis仅缓存的20万条数据全都是系统中的热点数据,从而最大程度上发挥缓存的优势,提高系统的响应速度和并发能力,进而提升用户的购物体验和服务质量。通过对Redis内部机制的深入理解以及对业务场景的精细分析,我们将揭示一套综合运用各种技术手段来确保Redis中热点数据准确有效的管理方案。

image.png

技术背景

在探讨如何确保Redis中存储的20万数据均为热点数据之前,首先需要明确MySQL与Redis在实际业务环境中的互补关系以及Redis自身的内存管理和数据淘汰机制。

MySQL与Redis的关系及应用场景

MySQL作为一种成熟的关系型数据库管理系统,适用于存储大量持久化且具有复杂关系的数据,其强大的事务处理能力和安全性保障了数据的一致性和完整性。但在大规模并发环境下,尤其是对那些读多写少、访问频次极高的热点数据,直接从MySQL中读取可能会成为系统性能瓶颈。

Redis则是一种高性能的内存键值数据库,以其极快的速度和灵活的数据结构著称。在淘宝这类大型电商平台中,Redis主要用于缓存频繁访问的数据,例如热门商品信息、用户购物车、会话状态等,以此减轻主数据库的压力,提高响应速度,增强系统的可扩展性和容错性。

对于Redis高性能原理,请参考:
京东二面:Redis为什么快?我说Redis是纯内存操作的,然后他对我笑了笑。
对于Redis的使用的业务场景,请参考:
美团一面:项目中使用过Redis吗?我说用Redis做缓存。他对我哦了一声

Redis内存管理和数据淘汰机制简介

Redis的所有数据都存储在内存中,这意味着它的容量相较于磁盘存储更为有限。为了解决内存容量不足的问题,Redis提供了多种数据淘汰策略。其中,与保证热点数据密切相关的是LFU(Least Frequently Used)策略,它能够根据数据对象的访问频次,将访问次数最少(即最不常用)的数据淘汰出内存,以便为新的数据腾出空间。

对于Redis高性能的一方面原因就是Redis高效的管理内存,具体请参考:
京东二面:Redis为什么快?我说Redis是纯内存操作的,然后他对我笑了笑。

此外,Redis允许用户根据自身需求选择不同的淘汰策略,例如“volatile-lfu”只针对设置了过期时间的key采用LFU算法,“allkeys-lfu”则对所有key都执行LFU淘汰规则。

热点数据定义及其识别方法

热点数据是指在一定时间内访问频率极高、对系统性能影响重大的数据集。在电商平台中,这可能表现为热销商品详情、活动页面信息、用户高频查询的搜索关键词等。识别热点数据主要依赖于对业务日志、请求统计和系统性能监控工具的分析,通过收集和分析用户行为数据,发现并量化哪些数据是系统访问的热点,以便有针对性地将它们缓存至Redis中。

实现方案

在实际应用中,确保Redis中存储的数据为热点数据,我们可以从以下几个方案考虑实现。

LFU淘汰策略

Redis中的LFU(Least Frequently Used)淘汰策略是一种基于访问频率的内存管理机制。当Redis实例的内存使用量达到预先设定的最大内存限制(由
maxmemory
配置项指定)时,LFU策略会根据数据对象的访问频次,将访问次数最少(即最不常用)的数据淘汰出内存,以便为新的数据腾出空间。

LFU算法的核心思想是通过跟踪每个键的访问频率来决定哪些键应当优先被淘汰。具体实现上,Redis并非实时精确地计算每个键的访问频率,而是采用了近似的LFU方法,它为每个键维护了一个访问计数器(counter)。每当某个键被访问时,它的计数器就会递增。随着时间推移,Redis会依据这些计数器的值来决定淘汰哪些键。

在Redis 4.0及其后续版本中,LFU策略可以通过设置
maxmemory-policy
配置项为
allkeys-lfu

volatile-lfu
来启用。其中:

  • allkeys-lfu
    :适用于所有键,无论它们是否有过期时间,都会基于访问频率淘汰键。
  • volatile-lfu
    :仅针对设置了过期时间(TTL)的键,按照访问频率淘汰键。

Redis实现了自己的LFU算法变体,它使用了一个基于访问计数和老化时间的组合策略来更好地适应实际情况。这意味着不仅考虑访问次数,还会考虑到键的访问频率随时间的变化,防止长期未访问但曾经很热门的键占据大量内存空间而不被淘汰。在实现上,Redis使用了一种称为“频率跳表(frequency sketch)”的数据结构来存储键的访问频率,允许快速查找和更新计数器。为了避免长期未访问但计数器较高的键永久保留,Redis会在一段时间后降低键的访问计数,模拟访问频率随时间衰减的效果。

在Redis中使用LFU淘汰策略,在配置文件
redis.conf
中找到
maxmemory-policy
选项,将其设置为LFU相关策略之一:

maxmemory-policy allkeys-lfu # 对所有键启用LFU淘汰策略 
# 或者 
maxmemory-policy volatile-lfu # 对有过期时间的键启用LFU淘汰策略

确保你也设置了Redis的最大内存使用量(
maxmemory
),只有当内存到达这个上限时,才会触发淘汰策略:

maxmemory <size_in_bytes> # 指定Redis可以使用的最大内存大小

LFU策略旨在尽可能让那些近期最不活跃的数据优先被淘汰,以此保持缓存中的数据相对活跃度更高,提高缓存命中率,从而提升系统的整体性能。(这也是我们面试中需要回答出来的答案)

LRU淘汰策略

Redis中的LRU(Least Recently Used)淘汰策略是一种用于在内存不足时自动删除最近最少使用的数据以回收内存空间的方法。尽管Redis没有完全精确地实现LRU算法(因为这在O(1)时间内实现成本较高),但Redis确实提供了一种近似LRU的行为。

当我们配置了最大内存限制,如果内存超出这个限制时,Redis会选择性地删除一些键值对来腾出空间。Redis提供了几种不同的淘汰策略,其中之一就是
volatile-lru

allkeys-lru
,这两种都试图模拟LRU行为。

  • volatile-lru
    :仅针对设置了过期时间(TTL)的键,按照最近最少使用的原则来删除键。
  • allkeys-lru
    :不论键是否设置过期时间,都会根据最近最少使用的原则来删除键。

Redis实现LRU的方式并不是真正意义上的双向链表加引用计数这样的完整LRU结构,因为每个键值对的插入、删除和访问都需要维持这样的数据结构会带来额外的开销。所以Redis实现LRU会采取以下方式进行:

  1. Redis内部为每个键值对维护了一个“空转时间”(idle time)的字段,它是在Redis实例启动后最后一次被访问或修改的时间戳。
  2. 当内存达到阈值并触发淘汰时,Redis不会遍历整个键空间找出绝对意义上的最近最少使用的键,而是随机抽取一批键检查它们的空转时间,然后删除这批键中最久未被访问的那个。
    Redis在大多数情况下能较好地模拟LRU效果,有助于保持活跃数据在内存中,减少因频繁换入换出带来的性能损失。

内存淘汰策略通常是在Redis服务器端的配置文件(如
redis.conf
)中设置,而不是在应用中配置。你需要在Redis服务器端的配置中设置
maxmemory-policy
参数为
allkeys-lru
。(同LFU策略)

使用Redis的LRU淘汰策略实现热点数据的方式,简单易行,能较好地应对大部分情况下的热点数据问题。但是若访问模式复杂或数据访问分布不均匀,单纯的LRU策略可能不够精准,不能确保绝对的热点数据留存。

结合访问频率设定过期时间

在实际应用中,除了依赖Redis的淘汰策略外,还可以结合业务逻辑,根据数据的访问频率动态设置Key的过期时间。例如,当某个Key被频繁访问时,延长其在Redis中的有效期,反之则缩短。

@Autowired
private RedisTemplate<String, Object> redisTemplate;

public void updateKeyTTL(String key, int ttlInSeconds) {
    redisTemplate.expire(key, ttlInSeconds, TimeUnit.SECONDS);
}

// 示例调用,当检测到某个数据访问增多时,增加其缓存过期时间
public void markAsHotSpot(String key) {
    updateKeyTTL(key, 3600); // 将热点数据缓存时间延长至1小时
}

这种方式灵活性强,可根据实际访问情况动态调整缓存策略。但是需要在应用程序中进行较多定制开发,以捕捉并响应数据访问的变化。

基于时间窗口的缓存淘汰策略

在给定的时间窗口(如过去1小时、一天等)内,对每个数据项的访问情况进行实时跟踪和记录,可以使用计数器或其他数据结构统计每条数据的访问次数。到达时间窗口边界时,计算每个数据项在该窗口内的访问频率,这可以是绝对访问次数、相对访问速率或者其他反映访问热度的指标。根据预先设定的阈值,将访问次数超过阈值的数据项加入Redis缓存,或者将其缓存时间延长以确保其能在缓存中停留更久。而对于访问次数低于阈值的数据项,要么从缓存中移除,要么缩短其缓存有效期,使其更容易被后续淘汰策略处理。

@Service
public class TimeWindowCacheEvictionService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final Map<String, AtomicInteger> accessCounts = new ConcurrentHashMap<>();

    // 时间窗口长度(例如,1小时)
    private static final long TIME_WINDOW_MILLIS = TimeUnit.HOURS.toMillis(1);

    @Scheduled(fixedRate = TIME_WINDOW_MILLIS)
    public void evictBasedOnFrequency() {
        accessCounts.entrySet().forEach(entry -> {
            int accessCount = entry.getValue().get();
            if (accessCount > THRESHOLD) { // 假设THRESHOLD是访问次数阈值
                // 将数据存入或更新到Redis缓存,并设置较长的过期时间
                redisTemplate.opsForValue().set(entry.getKey(), getDataFromDB(entry.getKey()), CACHE_EXPIRATION_TIME, TimeUnit.MINUTES);
            } else if (redisTemplate.hasKey(entry.getKey())) {
                // 访问次数低,从缓存中移除或缩短过期时间
                redisTemplate.delete(entry.getKey());
            }
        });

        // 清零访问计数器,准备下一个时间窗口
        accessCounts.clear();
    }

    public void trackDataAccess(String dataId) {
        accessCounts.computeIfAbsent(dataId, k -> new AtomicInteger()).incrementAndGet();
    }
}

关于@Scheduled是Springboot中实现定时任务的一种方式,对于其他几种方式,请参考:
玩转SpringBoot:SpringBoot的几种定时任务实现方式

通过这种方法,系统能够基于实际访问情况动态调整缓存内容,确保Redis缓存中存放的总是具有一定热度的数据。当然,这种方法需要与实际业务场景紧密结合,并结合其他缓存策略共同作用,以实现最优效果。同时,需要注意此种策略可能带来的额外计算和存储成本。

手动缓存控制

针对已识别的热点数据,可以通过监听数据库变更或业务逻辑触发器主动将数据更新到Redis中。例如,当商品销量剧增变为热点商品时,立即更新Redis缓存。

这种方式可以确保热点数据及时更新,提高了缓存命中率。

利用数据结构优化

使用Sorted Set等数据结构可以进一步精细化热点数据管理。例如,记录每个商品最近的访问的活跃时间,并据此决定缓存哪些商品数据。

// 商品访问活跃时更新其在Redis中的排序
String goodsActivityKey = "goods_activity";
redisTemplate.opsForZSet().add(goodsActivityKey, sku, System.currentTimeMillis());

// 定时清除较早的非热点商品数据
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点清理前一天的数据
public void cleanInactiveUsers() {
    long yesterdayTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    redisTemplate.opsForZSet().removeRangeByScore(goodsActivityKey, 0, yesterdayTimestamp);
}

这种方式能够充分利用Redis内建的数据结构优势,实现复杂的数据淘汰逻辑。

实际业务中实践方案

在例如淘宝这样庞大的电商生态系统中,面对MySQL中海量的业务数据和Redis有限的内存空间,我们采用了多元化的策略以确保缓存的20万数据是真正的热点数据。

LFU策略的运用

自Redis 4.0起,我们可以通过配置Redis淘汰策略为近似的LFU(
volatile-lfu

allkeys-lfu
),使得Redis能够自动根据数据访问频率进行淘汰决策。LFU策略基于数据的访问次数,使得访问越频繁的数据越不容易被淘汰,从而更好地保持了热点数据在缓存中的存在。

访问频率动态调整

除了依赖Redis内置的LFU淘汰策略,我们还可以实现应用层面的访问频率追踪和响应式缓存管理。例如,每当商品被用户访问时,系统会更新该商品在Redis中的访问次数,同时根据访问频率动态调整缓存过期时间,确保访问频率高的商品在缓存中的生存期得到延长。

@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;

    public void updateProductViewCount(String productId) {
        // 更新产品访问次数
        redisTemplate.opsForValue().increment("product:view_count:" + productId);

        // 根据访问次数调整缓存过期时间
        Long viewCount = redisTemplate.opsForValue().get("product:view_count:" + productId);
        if (viewCount > THRESHOLD_VIEW_COUNT) {
            redisTemplate.expire("product:info:" + productId, LONGER_CACHE_EXPIRATION, TimeUnit.MINUTES);
        }
    }
}

数据结构优化

我们还可以利用Redis丰富的数据结构,如有序集合(Sorted Sets)和哈希(Hashes),来实现商品热度排行、用户行为分析等功能。例如,通过Sorted Set存储商品的浏览量,自动按照浏览量高低进行排序,并淘汰访问量低的商品缓存。

// 更新商品浏览量并同步到Redis有序集合
public void updateProductRanking(String productId, long newViewCount) {
    redisTemplate.opsForZSet().add("product_ranking", productId, newViewCount);
    // 自动淘汰浏览量低的商品缓存
    redisTemplate.opsForZSet().removeRange("product_ranking", 0, -TOP_RANKED_PRODUCT_COUNT - 1);
}

总结

本文详细阐述了在电商平台例如淘宝及其他类似场景下,如何结合LFU策略与访问频率调整,优化Redis中20万热点数据的管理。通过配置Redis近似的LFU淘汰策略,结合应用层面对访问频率的实时追踪与响应式调整,以及利用多样化的Redis数据结构如有序集合和哈希表,成功实现了热点数据的精确缓存与淘汰。

通过电商平台的一些实际业务实践证明,这种综合策略可以有效提升缓存命中率,降低数据库访问压力,确保缓存资源始终服务于访问最频繁的数据。未来随着数据挖掘与分析技术的进步,以及Redis或其他内存数据库功能的拓展,预计将进一步细化和完善热点数据的识别与管理机制。例如,探索更具前瞻性的预测性缓存策略,或是结合机器学习模型对用户行为进行深度分析,以更精准地预判和存储未来的热点数据。

本文已收录于我的个人博客:
码农Academy的博客,专注分享Java技术干货,包括Java基础、Spring Boot、Spring Cloud、Mysql、Redis、Elasticsearch、中间件、架构设计、面试题、程序员攻略等

一、 写在前面

随着深度学习技术的不断发展,模型的训练成本也越来越高。训练一个高效的通用模型,需要大量的训练数据和算力。在很多非大模型相关的常规任务上,往往也需要使用多卡来进行并行训练。在多卡训练中,最为常用的就是分布式数据并行(DistributedDataParallel, DDP)。但是现有的有关DDP的教程和博客比较少,内容也比较分散繁琐。在大多数情况下,我们只需要学会如何使用即可,不需要特别深入地了解原理。为此,写下这个系列博客,简明扼要地介绍一下DDP的使用,抛开繁杂的细节和原理,帮助快速上手使用(All in one blog)。

篇幅较长,分为上下两篇:这篇简要介绍相关背景和理论知识,下篇详细介绍代码框架和搭建流程。

二、什么是分布式并行训练
1. 并行训练

在Pytorch中,有两种并行训练方式:

1)模型并行。模型并行通常是指你的模型非常大,大到一块卡根本放不下,因而需要把模型进行拆分放到不同的卡上。

2)数据并行。数据并行通常用于训练数据非常庞大的时候,比如有几百万张图像用于训练模型。此时,如果只用一张卡来进行训练,那么训练时间就会非常的长。并且由于单卡显存的限制,训练时的batch size不能设置得过大。但是,对于很多模型的性能而言,由于BN层的使用,都会和batch size的大小正相关。此外,很多基于对比学习的训练算法,由于其对负样本的需求,性能也与batch size的大小正相关。因此,我们需要使用多卡训练,不仅可以训练加速,并且可以设置更大的batch size来提升性能。

2. 数据并行

在Pytorch中有两种方式来实现数据并行:

1)数据并行(DataParallel,DP)。DataParallel采用参数服务器架构,其训练过程是单进程的。在训练时,会将一块GPU作为server,其余的GPU作为worker,在每个GPU上都会保留一个模型的副本用于计算。训练时,首先将数据拆分到不同的GPU上,然后在每个worker上分别进行计算,最终将梯度汇总到server上,在server进行模型参数更新,然后将更新后的模型同步到其他GPU上。这种方式有一个很明显的弊端,作为server的GPU其通信开销和计算成本非常大。它需要和其他所有的GPU进行通信,并且梯度汇总、参数更新等步骤都是由它完成,导致效率比较低。并且,随着多卡训练的GPU数量增强,其通信开销也会线性增长。

Parameter Server架构

不过DataParallel的代码十分简洁,仅需在原有单卡训练的代码中加上一行即可。

model = nn.DataParallel(model) 

如果你的数据集并不大,只有几千的规模,并且你多卡训练时的卡也不多,只有4块左右,那么DataParallel会是一个不错的选择。

关于Parameter Server更详细的原理介绍,可以参考:

深度学习加速:算法、编译器、体系结构与硬件设计

一文讀懂「Parameter Server」的分布式機器學習訓練原理

2)分布式数据并行(DistributedDataParallel,DDP)。DDP采用Ring-All-Reduce架构,其训练过程是多进程的。如果要用DDP来进行训练,我们通常需要修改三个地方的代码:数据读取器dataloader,日志输出print,指标评估evaluate。其代码实现略微复杂,不过我们只需要始终牢记一点即可:
每一块GPU都对应一个线程,除非我们手动实现相应代码,不然各个线程的数据都是不互通的。Pytorch只为我们实现了同步梯度和参数更新的代码,其余的需要我们自己实现。

Ring-All-Reduce架构

三、DDP的基本原理
1. DDP的训练过程

DDP的训练过程可以总结为如下步骤:

1)在训练开始时,整个数据集被均等分配到每个GPU上。每个GPU独立地对其分配到的数据进行前向传播(计算预测输出)和反向传播(计算梯度)。

2)同步各个GPU上的梯度,以确保模型更新的一致性,该过程通过Ring-All-Reduce算法实现。

3)一旦所有的GPU上的梯度都同步完成,每个GPU就会使用这些聚合后的梯度来更新其维护的模型副本的参数。因为每个GPU都使用相同的更新梯度,所以所有的模型副本在任何时间点上都是相同的。

2. Ring-All-Reduce算法

Ring-All-Reduce架构是一个环形架构,所有GPU的位置都是对等的。每个GPU上都会维持一个模型的副本,并且只需要和它相连接的两个GPU通信。

对于第k个GPU而言,只需要接收来自于第k-1个GPU的数据,并将数据汇总后发送给第k+1个GPU。这个过程在环中持续进行,每个GPU轮流接收、聚合并发送梯度。

经过 N 次的迭代循环后(N是GPU的数量),每个GPU将累积得到所有其他GPU的梯度数据的总和。此时,每个GPU上的梯度数据都是完全同步的。

DDP的通信开销与GPU的数量无关,因而比DP更为高效。如果你的训练数据达到了十万这个量级,并且需要使用4卡及以上的设备来进行训练,DDP将会是你的最佳选择。

关于DDP和Ring-All-Reduce算法的更多实现原理和细节,可以参考:

Bringing HPC Techniques to Deep Learning

Pytorch 分散式訓練 DistributedDataParallel — 概念篇

Technologies behind Distributed Deep Learning: AllReduce

四、如何搭建一个Pytorch DDP代码框架
1. 与DDP有关的基本概念

在开始使用DDP之前,我们需要了解一些与DDP相关的概念。

参数 含义 查看方式
group 分布式训练的进程组,每个group可以进行自己的通信和梯度同步 Group通常在初始化分布式环境时创建,并通过
torch.distributed.new_group
等API创建自定义groups。
world size 参与当前分布式训练任务的总进程数。在单机多GPU的情况下,world size通常等于GPU的数量;在多机情况下,它是所有机器上所有GPU的总和。 torch.distributed.get_world_size()
rank Rank是指在所有参与分布式训练的进程中每个进程的唯一标识符。Rank通常从0开始编号,到world size - 1结束。 torch.distributed.get_rank()
local rank Local rank是当前进程在其所在节点内的相对编号。例如,在一个有4个GPU的单机中,每个GPU进程的local rank将是0, 1, 2, 3。这个参数常用于确定每个进程应当使用哪个GPU。 Local rank不由PyTorch的分布式API直接提供,而通常是在启动分布式训练时由用户设定的环境变量,或者通过训练脚本的参数传入。
2. 与DDP有关的一些操作

在DDP中,每个进程的数据是互不影响的(除了采用Ring-All-Reduce同步梯度)。如果我们要汇总或者同步不同进程上的数据,就需要用到一些对应的函数。

1)all_reduce

all_reduce
操作会在所有进程中聚合每个进程的数据(如张量),并将结果返回给所有进程。聚合可以是求和、取平均、找最大值等。当你需要获得所有进程的梯度总和或平均值时,可以使用
all_reduce
。这在计算全局平均或总和时非常有用,比如全局平均损失。

一个示例代码如下:

import torch.distributed as dist

tensor_a = torch.tensor([1.0], device=device)
# 所有进程中的tensor_a将会被求和,并且结果会被分配给每个进程中的tensor_a。
dist.all_reduce(tensor_a, op=dist.ReduceOp.SUM)

2)all_gather

all_gather
操作用于在每个进程中收集所有进程的数据。它不像
all_reduce
那样聚合数据,而是将每个进程的数据保留并汇总成一个列表。当每个进程计算出一个局部结果,并且你需要在每个进程中收集所有结果进行分析或进一步处理时,可以使用
all_gather

一个示例代码如下:

import torch
import torch.distributed as dist

# 每个进程有一个tensor_a,其值为当前进程的rank
tensor_a = torch.tensor([rank], device=device)  # 假设rank是当前进程的编号
gather_list = [torch.zeros_like(tensor_a) for _ in range(dist.get_world_size())]
# 收集所有进程的tensor_a到每个进程的gather_list
dist.all_gather(gather_list, tensor)

3)broadcast

broadcast
操作将一个进程的数据(如张量)发送到所有其他进程中。这通常用于当一个进程生成了某些数据,需要确保其他所有进程都得到相同的数据时。在在开始训练之前,可以用于同步模型的初始权重或者在所有进程中共享某些全局设置。一个示例代码如下:

import torch.distributed as dist

tensor_a = torch.tensor([1.0], device=device)
if rank == 0:
    tensor_a.fill_(10.0)  # 只有rank 0设置tensor_a为10
dist.broadcast(tensor_a, src=0)  # rank 0将tensor_a广播到所有其他进程
3. 要实现DDP训练,我们需要解决哪些问题?

1)如何将数据均等拆分到每个GPU

在分布式训练中,为了确保每个GPU都能高效地工作,需要将训练数据均等地分配到每个GPU上。如果数据分配不均,可能导致某些GPU数据多、某些GPU数据少,从而影响整体的训练效率。

在PyTorch中,可以使用
torch.utils.data.DataLoader
结合
torch.utils.data.distributed.DistributedSampler

DistributedSampler
会自动根据数据集、进程总数(world size)和当前进程编号(rank)来分配数据,确保每个进程获取到的数据互不重复且均衡分布。

2)如何在IO操作时避免重复

在使用PyTorch的分布式数据并行(DDP)进行模型训练时,由于每个进程都是独立运行的,IO操作如打印(
print
)、保存(
save
)或加载(
load
)等如果未经特别处理,将会在每个GPU进程上执行。这样的行为通常会导致以下问题:
重复打印
(每个进程都会输出同样的打印信息到控制台,导致输出信息重复,难以阅读)、
文件写入冲突
(如果多个进程尝试同时写入同一个文件,会产生写入冲突,导致数据损坏或者输出不正确)、
资源浪费
(每个进程重复加载相同的数据文件会增加IO负担,降低效率和浪费资源)。

一个简单且可行的解决方案是只在特定进程中进行相关操作,例如,只在rank为0的进程中执行,如有必要,再同步到其他进程。

3)如何收集每个进程上的数据进行评估

在DDP训练中,每个GPU进程独立计算其数据的评估结果(如准确率、损失等),在评估时,可能需要收集和整合这些结果。

通过
torch.distributed.all_gather
函数,可以将所有进程的评估结果聚集到每个进程中。这样每个进程都可以获取到完整的评估数据,进而计算全局的指标。如果只需要全局的汇总数据(如总损失或平均准确率),可以使用
torch.distributed.reduce

all_reduce
操作直接计算汇总结果,这样更加高效。

4. 一个最简单的DDP代码框架

篇幅太长,见下篇。

五、查资料过程中的一个小惊喜

在查找DDP有关过程中,发现了一些博客和视频做得很不错,而且这里面有一部分是女生做的。博客和视频的质量都很高,内容安排合理,逻辑表达清晰,参考资料也很全面。我看到的时候,还是很惊艳的,巾帼不让须眉!链接如下:

国立中央大学的
李馨伊

复旦大学的
_Meilinger_