2024年3月

1、准备材料

正点原子stm32f407探索者开发板V2.4

STM32CubeMX软件(
Version 6.10.0

Keil µVision5 IDE(
MDK-Arm

野火DAP仿真器

XCOM V2.6串口助手

2、学习目标

本文主要学习 FreeRTOS 互斥量的相关知识,
包括优先级翻转问题、优先级继承、死锁现象、创建/删除互斥量 和 获取/释放互斥量等知识

3、前提知识

3.1、优先级翻转问题

使用二值信号量用于进程间同步时可能会出现优先级翻转的问题,什么是“优先级翻转”问题呢?
考虑如下所述的任务运行过程

  • 在 t1 时刻,低优先级的任务 TaskLP 切入运行状态,并且获取到了一个二值信号量 Binary Semaphores
  • 在 t2 时刻,高优先级的任务 TaskHP 请求获取二值信号量 Binary Semaphores ,但是由于 TaskLP 还未释放该二值信号量,所以在 t3 时刻,任务 TaskHP 进入阻塞状态等待二值信号量被释放
  • 在 t4 时刻,中等优先级的任务 TaskMP 进入就绪状态,由于不需要获取二值信号量,因此抢占低优先级任务任务 TaskLP 切入运行状态
  • 在 t5 时刻,任务 TaskMP 运行结束,任务 TaskLP 再次切入运行状态
  • 在 t6 时刻,任务 TaskLP 运行结束,释放二值信号量 Binary Semaphores,此时任务 TaskHP 从等待二值信号量的阻塞状态切入运行状态
  • 在t7时刻,任务 TaskHP 运行结束

根据上述流程读者可以发现一个问题,
即在 t4 时刻中等优先级的任务 TaskMP 先于高优先级的任务 TaskHP 抢占了处理器,这破坏了 FreeRTOS 基于优先级抢占式执行的原则,我们将这种情况称为优先级翻转问题
,上述描述的任务运行过程具体时刻流程图如下图所示

优先级翻转可能是一个严重的问题,但在小型嵌入式系统中,通常可以在系统设计时通过考虑如何访问资源来避免该问题

3.2、优先级继承

为了解决使用二值信号量可能会出现的优先级翻转问题,
对二值信号量做了改进,增加了一种名为 “优先级继承” 的机制,改进后的实例称为了互斥量,注意虽然互斥量可以减缓优先级翻转问题的出现,但是并不能完全杜绝

接下来我们来通过例子介绍什么是优先级继承?

仍然考虑由 “3.1、优先级翻转问题” 小节中提出的任务运行过程的例子,具体流程如下所述,读者可以细心理解其中的不同之处

  • 在 t1 时刻,低优先级的任务 TaskLP 切入运行状态,并且获取到了一个互斥量 Mutexes
  • 在 t2 时刻,高优先级的任务 TaskHP 请求获取互斥量 Mutexes ,但是由于 TaskLP 还未释放该互斥量,所以在 t3 时刻,任务 TaskHP 进入阻塞状态等待互斥量被释放,
    但是与二值信号量不同的是,此时 FreeRTOS 将任务 TaskLP 的优先级临时提高到与任务 TaskHP 一致的优先级,也即高优先级
  • 在 t4 时刻,中等优先级的任务 TaskMP 进入就绪状态发生任务调度,但是由于任务 TaskLP 此时优先级被提高到了高优先级,因此任务 TaskMP 仍然保持就绪状态等待优先级较高的任务执行完毕
  • 在 t5 时刻,任务 TaskLP 执行完毕释放互斥量 Mutexes,此时任务 TaskHP 抢占处理器切入运行状态,并恢复任务 TaskLP 原来的优先级
  • 在 t6 时刻,任务 TaskHP 执行完毕,此时轮到任务 TaskMP 执行
  • 在 t7 时刻,任务 TaskMP 运行结束

根据互斥量的上述任务流程读者可以发现与二值信号量的不同之处,上述描述的任务运行过程具体时刻流程图如下图所示

3.3、什么是互斥量?

互斥量/互斥锁是一种特殊类型的二进制信号量,用于控制对在两个或多个任务之间共享资源的访问

互斥锁可以被视为一个与正在共享的资源相关联的令牌,对于合法访问资源的任务,它必须首先成功 “获取” 令牌,成为资源的持有者,当持有者完成对资源的访问之后,其需要 ”归还” 令牌,只有 “归还” 令牌之后,该令牌才可以再次被其他任务所 “获取” ,这样保证了互斥的对共享资源的访问,上述机制如下图所示
(注释1)

3.4、死锁现象

“死锁” 是使用互斥锁进行互斥的另一个潜在陷阱,当两个任务因为都在等待对方占用的资源而无法继续进行时,就会发生死锁
,考虑如下所述的情况

  1. 任务 A 执行并成功获取互斥量 X
  2. 任务 A 被任务 B 抢占
  3. 任务 B 在尝试获取互斥量 X 之前成功获取互斥量 Y,但互斥量 X 由任务 A 持有,因此对任务 B 不可用,任务 B 选择进入阻塞状态等待互斥量 X 被释放
  4. 任务 A 继续执行,它尝试获取互斥量 Y,但互斥量 Y 由任务 B 持有,所以对于任务 A 来说是不可用的,任务 A 选择进入阻塞状态等待待释放的互斥量 Y

经过上述的这样一个过程,读者可以发现任务 A 在等待任务 B 释放互斥量 Y ,而任务 B 在等待任务 A 释放互斥量 X ,两个任务都在阻塞状态无法执行,从而导致 ”死锁“ 现象的发生,与优先级翻转一样,避免 “死锁” 的最佳方法是在设计时考虑其潜在影响,并设计系统以确保不会发生死锁

3.5、什么是递归互斥量?

任务也有可能与自身发生死锁,如果任务尝试多次获取相同的互斥体而不首先返回互斥体,就会发生这种情况,考虑以下设想:

  1. 任务成功获取互斥锁
  2. 在持有互斥体的同时,任务调用库函数
  3. 库函数的实现尝试获取相同的互斥锁,并进入阻塞状态等待互斥锁变得可用

在此场景结束时,任务处于阻塞状态以等待互斥体返回,但任务已经是互斥体持有者。 由于任务处于阻塞状态等待自身,因此发生了死锁

通过使用递归互斥体代替标准互斥体可以避免这种类型的死锁,同一任务可以多次 “获取” 递归互斥锁,并且只有在每次 “获取” 递归互斥锁之后都调用一次 “释放” 递归互斥锁,才会返回该互斥锁

因此递归互斥量可以视为特殊的互斥量,一个互斥量被一个任务获取之后就不能再次获取,其他任务想要获取该互斥量必须等待这个任务释放该互斥连,但是递归互斥量可以被一个任务重复获取多次,当然每次获取必须与一次释放配对使用

注意不管是互斥量,还是递归互斥量均存在优先级继承机制,但是由于 ISR 并不是任务,因此互斥量和递归互斥量不能在中断中使用

3.5、创建互斥量

互斥量在使用之前必须先创建
,因为互斥量分为互斥量和递归互斥量两种,所以 FreeRTOS 也提供了不同的 API 函数,具体如下所述

/**
  * @brief  动态分配内存创建互斥信号量函数
  * @retval 创建互斥信号量的句柄
  */
SemaphoreHandle_t xSemaphoreCreateMutex(void);

/**
  * @brief  静态分配内存创建互斥信号量函数
  * @param  pxMutexBuffer:指向StaticSemaphore_t类型的变量,该变量将用于保存互斥锁型信号量的状态
  * @retval 返回成功创建后的互斥锁的句柄,如果返回NULL则表示内存不足创建失败
  */
SemaphoreHandle_t xSemaphoreCreateMutexStatic(StaticSemaphore_t *pxMutexBuffer);

/**
  * @brief  动态分配内存创建递归互斥信号量函数
  * @retval 创建递归互斥信号量的句柄,如果返回NULL则表示内存不足创建失败
  */
SemaphoreHandle_t xSemaphoreCreateRecursiveMutex(void);

/**
  * @brief  动态分配内存创建二值信号量函数
  * @param  pxMutexBuffer:指向StaticSemaphore_t类型的变量,该变量将用于保存互斥锁型信号量的状态
  */
SemaphoreHandle_t xSemaphoreCreateRecursiveMutex(
								StaticSemaphore_t pxMutexBuffer);

3.6、获取互斥量

获取互斥量直接使用获取信号量的函数即可,但对于递归互斥量需要专门的获取函数,具体如下所述

/**
  * @brief  获取信号量函数
  * @param  xSemaphore:正在获取的信号量的句柄
  * @param  xTicksToWait:等待信号量变为可用的时间
  * @retval 成功获取信号量则返回pdTRUE, xTicksToWait过期,信号量不可用,则返回pdFALSE
  */
BaseType_t xSemaphoreTake(SemaphoreHandle_t xSemaphore, TickType_t xTicksToWait);

/**
  * @brief  获取递归互斥量
  * @param  xMutex:正在获得的互斥锁的句柄
  * @param  xTicksToWait:等待信号量变为可用的时间
  * @retval 成功获取信号量则返回pdTRUE, xTicksToWait过期,信号量不可用,则返回pdFALSE
  */
BaseType_t xSemaphoreTakeRecursive(SemaphoreHandle_t xMutex,
								   TickType_t xTicksToWait);

3.7、释放互斥量

释放互斥量直接使用释放信号量的函数即可,但对于递归互斥量需要专门的释放函数,具体如下所述

/**
  * @brief  释放信号量函数
  * @param  xSemaphore:要释放的信号量的句柄
  * @retval 成功释放信号量则返回pdTRUE, 若发生错误,则返回pdFALSE
  */
BaseType_t xSemaphoreGive(SemaphoreHandle_t xSemaphore);

/**
  * @brief  释放递归互斥量
  * @param  xMutex:正在释放或“给出”的互斥锁的句柄
  * @retval 成功释放递归互斥量后返回pdTRUE
  */
BaseType_t xSemaphoreGiveRecursive(SemaphoreHandle_t xMutex);

3.8、删除互斥量

直接使用信号量的删除函数即可,具体如下所述

/**
  * @brief  获取信号量函数
  * @param  xSemaphore:要删除的信号量的句柄
  * @retval None
  */
void vSemaphoreDelete(SemaphoreHandle_t xSemaphore);

4、实验一:优先级翻转问题

4.1、实验目标

既然实验是讨论优先级翻转问题,那么我们来复现 “3.1、优先级翻转问题” 小节中所描述到的任务运行过程,具体如下所述

  1. 创建一个二值信号量 BinarySem_PI,用于演示优先级翻转问题
  2. 创建一个低优先级任务 Task_Low ,在该任务中获取二值信号量 BinarySem_PI ,并通过延时模拟长时间连续运行,运行结束后释放该二值信号量,整个过程会通过串口输出提示信息
  3. 创建一个中等优先级任务 Task_Middle,该任务负责在 Task_Low 模拟长时间连续运行期间抢占其处理器控制权限
  4. 创建一个高优先级任务 Task_High,该任务总是尝试获取二值信号量 BinarySem_PI

4.2、CubeMX相关配置

首先读者应按照"
FreeRTOS教程1 基础知识
"章节配置一个可以正常编译通过的 FreeRTOS 空工程,然后在此空工程的基础上增加本实验所提出的要求

本实验需要初始化 USART1 作为输出信息渠道,具体配置步骤请阅读“
STM32CubeMX教程9 USART/UART 异步通信
”,如下图所示

单击 Middleware and Software Packs/FREERTOS,在 Configuration 中单击 Tasks and Queues 选项卡双击默认任务修改其参数,然后增加另外两个不同优先级的任务,具体如下图所示

然后在 Configuration 中单击 Timers and Semaphores ,在 Binary Semaphores 中单击 Add 按钮新增加一个名为 BinarySem_PI 的二值信号量,具体如下图所示

配置 Clock Configuration 和 Project Manager 两个页面,接下来直接单击 GENERATE CODE 按钮生成工程代码即可

4.3、添加其他必要代码

按照 “
STM32CubeMX教程9 USART/UART 异步通信
” 实验 “6、串口printf重定向” 小节增加串口 printf 重定向代码,具体不再赘述

首先应该在 freertos.c 中添加信号量相关 API 和 printf() 函数的头文件,如下所述

/*freertos.c中添加头文件*/
#include "semphr.h"
#include "stdio.h"

然后在该文件中实现三个不同优先级的任务,主要是一些串口输出给用户的提示信息,方便演示实验目的,具体如下所述

/*低优先级任务*/
void AppTask_Low(void *argument)
{
  /* USER CODE BEGIN AppTask_Low */
  /* Infinite loop */
	uint8_t str1[]="Task_Low take it\r\n";
	uint8_t str2[]="Task_Low give it\r\n";
	uint8_t str3[]="return Task_Low\r\n";
	for(;;)
	{
		//获取信号量
		if(xSemaphoreTake(BinarySem_PIHandle, pdMS_TO_TICKS(200))==pdTRUE)  
		{
			printf("%s",str1);
			//模拟任务连续运行
			HAL_Delay(500);		
			printf("%s",str3);
			HAL_Delay(500);
			printf("%s",str2);
			//释放信号量
			xSemaphoreGive(BinarySem_PIHandle);		
		}
	}
  /* USER CODE END AppTask_Low */
}

/*中等优先级任务*/
void AppTask_Middle(void *argument)
{
  /* USER CODE BEGIN AppTask_Middle */
  /* Infinite loop */
	uint8_t strMid[]="Task_Middle is running\r\n";
	for(;;)
	{
		printf("%s", strMid);
		vTaskDelay(500);
	}
  /* USER CODE END AppTask_Middle */
}

/*高优先级任务*/
void AppTask_High(void *argument)
{
  /* USER CODE BEGIN AppTask_High */
  /* Infinite loop */
	uint8_t strHigh1[]="Into Task_High\r\n";
	uint8_t strHigh2[]="Task_High get token\r\n";
	uint8_t strHigh3[]="Task_High give token\r\n";
	for(;;)
	{
		printf("%s",strHigh1);
		//获取信号量
		if(xSemaphoreTake(BinarySem_PIHandle, portMAX_DELAY)==pdTRUE)  
		{
			printf("%s",strHigh2);
			printf("%s",strHigh3);
			//释放信号量
			xSemaphoreGive(BinarySem_PIHandle);	
		}
		vTaskDelay(500);
	}
  /* USER CODE END AppTask_High */
}

在 "
FreeRTOS教程5 信号量
" 文章 ”3.2、创建信号量“ 小节中曾提到,信号量被创建完之后是无效的,但是这里我们需要让刚创建的二值信号量有效,否则 Task_High 和 Task_Low 都将无法获取二值信号量,因此最后修改二值信号量的初始值为 1 即可,具体如下所示

/*将初始值0修改为1*/
BinarySem_PIHandle = osSemaphoreNew(1, 1, &BinarySem_PI_attributes);

4.4、烧录验证

烧录程序,打开串口助手,按住开发板复位按键,目的是为了让串口助手接收程序从最开始输出的信息,这里我们只分析第一轮,因为延时、语句执行等微小的时间差异会导致第二轮任务进入阻塞和退出阻塞的时间与第一轮有差异,如下所述为第一轮详细的任务执行流程

  1. 当创建完三个不同优先级的任务后不会立即得到执行,而是进入就绪状态等待调度器的启动
  2. 当调度器启动之后会按照优先级从最高优先级开始执行,因此串口输出 “Into Task_High” 表示进入高优先级任务,然后在高优先级任务 Task_High 中获得二值信号量,然后立马释放二值信号量,最后进入 500ms 的阻塞状态
  3. 当高优先级任务进入阻塞状态后,接下来会执行就绪状态的中等优先级任务 Task_Middle ,该任务无具体功能,仅仅通过串口输出 “Task_Middle is running”,然后同样进入 500ms 的阻塞状态
  4. 由于高优先级和中等优先级任务都进入阻塞状态,这时才轮到低优先级任务 Task_Low 执行,低优先级任务 Task_Low 成功获取到二值信号量并通过串口输出 “Task_Low take it” ,然后利用 500ms 的 HAL 库延时函数模拟连续运行
  5. 在 Task_Low 连续运行期间,在其即将执行完第一个 HAL_Delay(500); 时,高优先级任务 Task_High 从 500ms 的阻塞状态恢复,然后尝试获取已经被 Task_Low 获取的二值信号量,结果就是进入阻塞状态等待 Task_Low 释放二值信号量
  6. 紧接着 Task_Middle 从 500ms 的阻塞状态恢复,通过串口输出 “Task_Middle is running”,接着再次进入 500ms 阻塞状态
  7. 由于高优先级和中等优先级任务再次进入阻塞状态,因此调度器返回 Task_Low 被抢占时的程序处继续执行,因此 Task_Low 通过串口输出 “return Task_Low” ,然后利用第二个 HAL_Delay(500); 继续模拟长时间运行
  8. 在 Task_Low 第二个 HAL_Delay(500); 即将执行完毕时,Task_Middle 再次从 500ms 的阻塞状态恢复,通过串口输出 “Task_Middle is running” ,然后再次进入 500ms 阻塞状态(这里 Task_High 由于不是因为延时进入的阻塞状态所以未恢复运行状态)
  9. 最后返回 Task_Low 任务,释放二值信号量,一旦 Task_Low 任务释放二值信号量,等待二值信号量的高优先级任务 Task_High 会立马退出阻塞状态成功获取到二值信号量,并会通过串口输出 “Task_High get token“

从上述过程可知,从 Task_Low 获取二值信号量之后到第一轮结束,Task_High 等待 Task_Low 释放二值信号量,等待期间中等优先级的任务 Task_Middle 却先于高优先级任务 Task_High 得到了执行,这就是所谓的优先级翻转问题,上述过程所述的实际串口输出如下图所示

4.5、互斥量的应用

首先在 STM32CubeMX 中单击 Middleware and Software Packs/FREERTOS,在 Configuration 中单击 Mutexes 选项卡,单击 Add 按钮增加互斥量 Mutex_PI ,具体如下图所示

然后将上述实验使用的所有二值信号量句柄 BinarySem_PIHandle 修改为互斥量 Mutex_PIHandle
,不需要做其他任何操作,烧录程序即可

打开串口助手,观察串口助手的输出,如下所述为第一轮详细的任务执行流程

  1. 前4个步骤与 ”4.4、烧录验证“ 小节一致,只不过从二值信号量修改为互斥量
  2. 在第5步时,高优先级任务 Task_High 从 500ms 的阻塞状态恢复,输出 ”Into Task_High“ ,然后尝试获取已经被 Task_Low 获取的互斥量,结果就是进入阻塞状态等待 Task_Low 释放互斥量,
    同时将 Task_Low 的优先级临时提高到和高优先级任务 Task_High 一样的优先级
  3. 紧接着 Task_Middle 从 500ms 的阻塞状态恢复,但是由于现在 Task_Low 任务的优先级要高于中等优先级任务 Task_Middle ,因此不能抢占 Task_Low 任务,故无法执行任务体输出 ”Task_Middle is running“ ,所以其状态变为就绪状态,它将等待所有高优先级的任务执行完后才会执行
  4. 于是优先级被临时提高到高优先级的任务 Task_Low 继续执行其函数体内容,输出 ”return Task_Low“ ,然后执行第二个 HAL_Delay(500); ,最后释放互斥量,通过串口输出 ”Task_Low give it“
  5. 一旦互斥量被 Task_Low 释放,处于阻塞状态的 Task_High 就会立马恢复运行状态获取到互斥量,所以会通过串口输出 ”Task_High get token“ 和 ”Task_High give token“ ,
    同时当互斥量被 Task_High 任务成功获取之后,会将任务 Task_Low 临时提高的优先级恢复到其原来的低优先级
    ,最后 Task_High 调用延时函数进入 500ms 的阻塞状态
  6. 当高优先级任务 Task_High 进入阻塞状态后,系统内现在剩余就绪状态的中等优先级任务 Task_Middle 和 低优先级任务 Task_Low ,所以轮到 Task_Middle 任务执行,其将通过串口输出 ”Task_Middle is runing“ ,至此一轮结束

读者可以自行对比将二值信号量更换为互斥量之后的串口输出结果,可以发现在步骤4中,中等优先级的任务 Task_Middle 不再先于高优先级的任务 Task_High 得到执行,上述整个过程串口数据的完整输出如下图所示

5、注释详解

注释1
:图片来源于
Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide.pdf

参考资料

STM32Cube高效开发教程(基础篇)

Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide.pdf

一、先说结论

本文将结合我的工作实战经历,总结和提炼一种
从单体架构到分布式微服务都适用的一种文件上传和校验的通用解决方案,形成一个完整的方法论
。本文主要解决手段包括
多线程

设计模式

分而治之

MapReduce
等,虽然文中使用的编程语言为Java,但解决问题和优化思路是互通的,适合有一定开发经验的开发者阅读,希望对大家有帮助。

二、引言

文件上传的场景应该都不陌生,不管是C端还是B端,都会有文件上传的场景。用户在平台页面点击上传文件,用户请求在最后会到达后端服务器,后端服务器会对上传的文件进行各种校验,比如文件名称校验、文件大小校验、文件内容校验等,其中
业务逻辑最复杂、技术上有挑战性的当属文件内容校验
了。为什么这么说呢?接着看。

三、背景

文件校验和上传,看似是一件很简单的工作,要做好,可能也并非一件容易得事情。我以一个电商后台系统为例,上传csv格式的sku信息文档将会面临下面几方面挑战:

  1. 上传sku数量多
    :上传文件中sku数量不定,从个位数到百万级不等;为了好的用户体验,需要在较短的时间内上传校验完成并返回结果;

  2. 业务逻辑复杂
    :文件上传校验需要校验每条内容,校验规则多且复杂,校验规则包括录入的sku格式是否符合,如不符合需要给出提示语1;校验上传的sku是否合法有效,如果需要给出相应的提示语2;校验该操作人是否有该sku管理权限,如果没有给出相应的提示语3……每个校验逻辑中可能还包含许多分支、循环逻辑……

  3. 外部依赖RPC多
    :上传校验过程中涉及多个外部依赖RPC的调用,比如sku的管理权限校验,需要调用用户中台RPC接口获取上传人的基本信息;校验sku是否是本次活动范围,需要调用直播中台RPC接口……

四、关键问题拆解和解决思路

  1. 上传数量多且要求体验友好,就要求要注意高性能方面的优化:对于业务服务器来说,如果是单机性能优化,需要考虑使用多线程技术来充分发挥服务器性能;如果是分布式的服务,在优化单机性能无法业务场景需要的时候,还可以考虑依靠中间件来协同不同服务器,发挥集群优势。

  2. 业务逻辑复杂,就要求写出来的代码有较高的可阅读性、可维护性,不要成为“大泥球”:除了在系统架构方面的优化之外,对于开发人员,可以考虑使用设计模式来提高代码质量。

  3. 外部RPC依赖多,网络数据IO操作,接口性能可能无法保证,就需要使用异步调用的方式来保证性能;

五、系统架构

假设有这么一个电商活动管理系统,从架构上来说,可以分为
服务层、业务层、数据层和外部依赖
,架构图如下:

  • 服务层:包括对外服务和外部调用;
  • 业务层:活动的生命周期,包括创建、查看、修改、关闭流程;
  • 数据层:数据存储,主要是数据库集群和缓存集群;
  • 外部依赖:外部依赖的RPC服务,包括商品RPC服务等;

在技术实现方面,该系统是
前后端分离
的系统,前后端通过域名进行交互。
前端服务主要提供操作页面,用户可以在页面端进行各种操作,例如创建活动、查看活动、修改活动、关闭活动等;

后端采用的是微服务架构,按照功能拆分为提供HTTP接口的
soa应用
、提供MQ消费功能的
MQ应用
、提供RPC服务的
RPC应用
,存储使用的是
MySQL和Redis集群
,大概架构图如下:

六、Java多线程实践

6.1 使用Java多线程优化单机性能

分析上面的场景,明显是IO密集型的场景。IO 密集型指的是大部分时间都在执行 IO 操作,主要包括网络 IO 和磁盘 IO,以及与计算机连接的一些外围设备的访问。在上面场景中,校验过程中需要调用大量RPC接口,大部分时间调用都在等待网络IO,所以可以使用
异步和多线程的设计方法
来提升网络IO性能,从而优化整体性能。

关于Java多线程在这里不赘述了,直接看关键代码实现吧:

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    @ResponseBody
    @RequestMapping(value = "uploadSku", method = RequestMethod.POST)
    public Result uploadSku(@RequestParam(value = "file", required = false) MultipartFile file) throws IOException {
        Result result = new Result();
        result.setSuccess(true);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(file.getInputStream()));

        try {
            // 校验文件名称
            result = checkFileNameFormat(file);
            if (!result.isSuccess()) {
                return result;
            }

            // 校验文件内容格式并填充校验任务
            List<UploadResInfo> uploadResInfos = new ArrayList<>();
            List<SkuCheckTask> tasks = checkFileContentAndFillSkuCheckTask(result, bufferedReader, uploadResInfos);

            // 执行校验任务
            result = dealSkuSkuCheckTask(tasks, uploadResInfos);

        } catch (Exception e) {
            result.setSuccess(false);
            result.setErrorMessage("上传文件异常!");
        }
        return result;
    }
    
        /**
     * @param tasks
     * @param uploadResInfos
     * @return
     */
    private Result dealSkuSkuCheckTask(List<SkuCheckTask> tasks, List<UploadResInfo> uploadResInfos) throws Exception {
        Result result = new Result();
        result.setSuccess(true);
        List<Long> passedSkus = new ArrayList<>();
        if (!CollectionUtils.isEmpty(tasks)) {
            List<Future<Result>> futureList = executorService.invokeAll(tasks);
            for (Future<Result> tempResult : futureList) {
                if (tempResult.get().isSuccess()) {
                    Result tempRes = tempResult.get();
                    if (null != tempRes.getResult().get("uploadResInfos")) {
                        uploadResInfos.addAll((List<UploadResInfo>) tempRes.getResult().get("uploadResInfos"));
                    }
                    passedSkus.addAll((List<Long>) tempRes.getObject());
                }
            }
        }
        result.addDefaultModel("passedSkus", passedSkus);
        if (passedSkus.size() == 0) {
            result.setErrorMessage("上传都不通过");
        }
        return result;
    }
public class SkuCheckTask implements Callable<Result> {

    private List<Long> skuList;

    public SkuCheckTask(List<Long> skuList) {
        this.skuList = skuList;
    }

    @Override
    public Result call() throws Exception {
        Result result = new Result();
        result.setSuccess(true);
        List<Long> passedSkuList = new ArrayList<>();
        List<UploadResInfo> uploadResInfos = new ArrayList<>();

        for (int i = 0; i < skuList.size(); i++) {
            if (checkSku(skuList.get(i))) {
                passedSkuList.add(skuList.get(i));
            } else {
                UploadResInfo uploadResInfo = new UploadResInfo(skuList.get(i).toString(), false, "RPC校验失败");
                uploadResInfos.add(uploadResInfo);
            }
        }
        result.setObject(passedSkuList);
        result.addDefaultModel("uploadResInfos", uploadResInfos);
        return result;
    }

    /**
     * 校验sku,复杂校验逻辑
     *
     * @param sku
     * @return
     */
    private boolean checkSku(Long sku) {
        // 复杂校验逻辑,例如多个RPC调用等耗时操作
        System.out.println("校验sku:" + sku);
        return true;
    }
}

6.2 线程数的设置

我们知道,调整线程池中的线程数量的主要是为了充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。

对于
CPU密集型任务
(比如加解密、压缩和解压、计算),最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。因为CPU密集型任务本来就会占用大量的CPU资源,CPU 的每个核心工作基本都是满负荷的,而如果设置了过多的线程,每个线程都要去争取CPU资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多反而会导致性能下降。

对于
IO密集型任务
(比如数据库读写、文件读写、网络通信等),这种任务并不会太消耗CPU资源,反而是在等待IO操作。线程数设置可以参考以下公式:

线程数 = CPU核心数 * (1 + 平均等待时间/平均工作时间)

在本程序中,使用了线程池:
FixedThreadPool
,并将线程数设置为10。这里的考虑是容器为16C32G的配置,除了上传任务,服务端还会处理其他的任务,还有其他的线程池,为了综合考虑,这里只是分配了10个线程数。当然,最佳实践是使用远程配置中心动态调整线程池线程数,实现
动态线程池
,在实践中进行调整和压测,最终找到合适的线程数配置。

七、责任链模式实践

对于上述这个校验逻辑,最常见的处理方式是使用 if…else…条件判断语句来处理,这样处理可能存在这样的问题:

  1. 代码复杂度高
    :该场景中的判定条件通常不是简单的判断,需要调用外部RPC接口查询数据,从结果中解析到需要的字段,才能进行逻辑判断。这样代码的嵌套层数就会很多,代码复杂度就会很高,不用太久,这段代码将发展成为“大泥球”。
  2. 代码耦合度高
    :如果业务需求新增校验逻辑,那么就要继续添加 if…else…判定条件;另外,这个条件判定的顺序也是写死的,如果想改变顺序,那么也只能修改这个条件语句。

那么面对上面这种场景,如何实现更优雅呢?。其实这里也很简单,就是把判定条件的部分放到处理类中,这就是
责任链模式
。如果满足条件 1,则由 Handler1 来处理,不满足则向下传递;如果满足条件 2,则由 Handler2 来处理,不满足则继续向下传递,以此类推,直到条件结束。部分代码如下:

Handler接口:

public interface SkuCheckHandler {
    BaseResult doHandler(UploadInfo uploadInfo);
}

SkuCheckHandler接口实现Handler1:

public class Handler1 implements SkuCheckHandler {
    @Override
    public BaseResult doHandler(UploadInfo uploadInfo) {
        // 调用用户中台校验权限
        return new BaseResult();
    }
}

遍历Handler进行校验,如果Handler校验不通过直接返回校验结果,校验通过则继续进入下一个Handler进行校验:

public class SkuCheckHandlerChain {

    private List<SkuCheckHandler> handlers = new ArrayList<>();

    public void addHandler(SkuCheckHandler skuCheckHandler) {
        this.handlers.add(skuCheckHandler);
    }

    public BaseResult handle(UploadInfo uploadInfo){
        BaseResult baseResult = new BaseResult();
        baseResult.setSuccess(true);
        for (SkuCheckHandler handler : handlers) {
            baseResult = handler.doHandler(uploadInfo);
            if (!baseResult.isSuccess()) {
                return baseResult;
            }
        }
        return baseResult;
    }

}

责任链设置和调用:

    private boolean checkSku(Long sku) {
        // 复杂校验逻辑,例如多个RPC调用等耗时操作
        System.out.println("校验sku:" + sku);
        // 后续校验都依赖商品信息,所以需要调商品RPC获取Sku信息-uploadInfo
        UploadInfo uploadInfo = new UploadInfo();
        SkuCheckHandlerChain handlerChain = new SkuCheckHandlerChain();
        handlerChain.addHandler(new Handler1());
        handlerChain.addHandler(new Handler2());
        BaseResult baseResult = handlerChain.handle(uploadInfo);
        return baseResult.isSuccess();
    }

如果想了解更多责任链模式,可以参考:《设计模式:如何优雅地使用责任链模式》

八、分布式文件上传最佳实践

8.1 MapReduce简介

当使用了多线程技术,并优化了线程数,似乎单机性能已经达到了极限。但是如果此时仍然不能满足业务场景需要,那又该怎么优化呢?

有人可能会想到
垂直扩容
,升级更高配的机器来提升性能。这个办法当然是可行的,也是最简单粗暴的方式,唯一的缺点就是“费钱”,土豪请随意。一般来说,Google的方式可能更加值得借鉴,Google使用“
3M胶带粘在一起的服务器
”打败了成本更高的高配计算机。

在面对海量数据背景下,Google科学家
杰夫·迪恩
提出了
MapReduce技术
。MapReduce其实并不复杂,使用的正是
分而治之
(Divide and Conquer)的思想。打个不太恰当的比方就是,
老板分作业,小兵完成作业,老板进行汇总

MapReduce其实也是
自顶向下的递归
。MapReduce先在最顶层将一个复杂的大任务分解成为成百上千个小任务;然后将每个小任务分配到一个服务器上去求解;最后再将每个服务器上面的结果综合起来,得到原来大任务的最终结果。第一个
自顶向下分解的过程称为Map
,第二个
自底向上合并的过程称为Reduce

其核心原理其实可以看这张图,图片出自论文《MapReduce: Simplified Data Processing on Large Clusters》。

8.2 MapReduce在文件上传场景的应用

单机服务器性能无法满足,应该考虑合理利用多台机器,不同
微服务之间相互协作
,共同完成上传的任务。借鉴MapReduce核心思想,可以使用现有系统架构,实现大文件的分布式上传和校验。

一图胜前言,方案说明都在图片中了,详细请看:

九、踩坑和代码调试

9.1 踩坑1:MQ消费中使用LoginContext获取用户信息异常

其中有个踩坑点需要注意,在soa应用中常用的LoginContext获取用户信息;
在MQ应用中,使用LoginContext将无法获取到用户信息
,如果使用将会出现空指针异常;出现异常之后,MQ消费将会进行重试,重试也一直会发生异常,从而死循环,无法得到正确的结果。

9.2 代码调试-Idea远程Debug

在开发工作中,代码写完并不是万事大吉了。部署到服务器测试过程中,可能还会发现各种各样意料之外的错误。当服务器日志打印过多或者过少都影响问题排查的效率,以文件上传场景为例,如果不打印完整的出入参,出现问题没有日志可以用来排查问题;如果每个方法都打印完整的出入参日志,当上传文件中sku数量较多,可以想象下如果有100w条的sku信息,从这么多的日志中去排查问题无异于“大海捞针”。

那这个问题无解了吗?当然不是,远程Debug可以提升排查效率,同事妹子看见了都直呼YYDS。其实这个工具就是我们几乎人人都在用的Idea,Idea自带了远程调试工具。下面是我的使用经验,适用于部署在Tomcat容器工程代码:

9.2.1 环境配置

  1. 远程Tomcat配置

远程Tomcat添加启动参数并重启生效:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

  1. Idea配置

话不多说,图上都有:

  1. 启动调试

9.2.2 常见问题

  1. 为什么调试断点没生效?

本地和远程代码要相同,不一样则会出现无法进入断点的情况;
如果代码一致还是无法进入,尝试重启,一般可以解决;

  1. 进入断点调试之后,服务器还可以处理其他请求吗?

服务器在断点处停住了,无法处理其他请求;

  1. 改了本地代码可以直接debug吗?

不可以,需要部署在远程服务器之后再次启动debug;

通用解决方案总结

通过上述过程之后,总结出一套通用的大文件上传和校验的解决方案。总结一下就是,如果现在技术架构还处在单机架构的阶段,可以考虑使用多线程技术优化单机性能;为了使代码优雅一点,可以考虑使用责任链模式;如果现在技术架构已经发展到分布式和微服务了,可以借鉴分而治之的思想,让多服务器协作工作,发挥多服务器的优势。

如果用三个词总结,那就是:多线程、责任链模式、分而治之和MapReduce

一起学习

欢迎各位在评论区或者私信我一起交流讨论,或者加我主页weixin,备注技术渠道(如博客园),进入技术交流群,我们一起讨论和交流,共同进步!

也欢迎大家关注我的
博客园、公众号(码上暴富)
,点赞、留言、转发。
你的支持,是我更文的最大动力!

写在开头

在之前的几篇博文中,我们都提到了
volatile
关键字,这个单词中文释义为:不稳定的,易挥发的,在Java中代表变量修饰符,用来修饰会被不同线程访问和修改的变量,对于方法,代码块,方法参数,局部变量以及实例常量,类常量多不能进行修饰。

自JDK1.5之后,官网对volatile进行了语义增强,这让它在Java多线程领域越发重要!因此,我们今天就抽一晚上时间,来学一学这个关键字,首先,我们从标题入手,思考这样的一个问题:

volatile是如何保证可见性的?又是如何禁止指令重排的,它为什么不能实现原子性呢?

带着疑问,我们一起走进volatile的世界,探索它与可见性,有序性,原子性之间的爱恨情仇!

volatile如何保证可见性?

volatile保证了不同线程对共享变量进行操作时的可见性,即一个线程修改了共享变量的值,共享变量修改后的值对其他线程立即可见。

我们先通过之前写的一个小案例来感受一下什么是可见性问题:

【代码示例1】

public class Test {
    //是否停止 变量
    private static boolean stop = false;
    public static void main(String[] args) throws InterruptedException {
        //启动线程 1,当 stop 为 true,结束循环
        new Thread(() -> {
            System.out.println("线程 1 正在运行...");
            while (!stop) ;
            System.out.println("线程 1 终止");
        }).start();
        //休眠 1 秒
        Thread.sleep(1000);
        //启动线程 2, 设置 stop = true
        new Thread(() -> {
            System.out.println("线程 2 正在运行...");
            stop = true;
            System.out.println("设置 stop 变量为 true.");
        }).start();
    }
}

输出:

线程 1 正在运行...
线程 2 正在运行...
设置 stop 变量为 true.

原因:
我们会发现,线程1运行起来后,休眠1秒,启动线程2,可即便线程2把stop设置为true了,线程1仍然没有停止,这个就是因为 CPU 缓存导致的可见性导致的问题。线程 2 设置 stop 变量为 true,线程 1 在 CPU 1上执行,读取的 CPU 1 缓存中的 stop 变量仍然为 false,线程 1 一直在循环执行。
image

那这个问题怎么解决呢?很好解决!我们排volatile上场可以秒搞定,只需要给stop变量加上volatile修饰符即可!

【代码示例2】

//给stop变量增加volatile修饰符
private static volatile boolean stop = false;

输出:

线程 1 正在运行...
线程 2 正在运行...
设置 stop 变量为 true.
线程 1 终止

从结果中看,线程1成功的读取到了线程而设置为true的stop变量值,解决了可见性问题。那volatile到底是什么让变量在多个线程之间保持可见性的呢?请看下图!
image

如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取,具体实现可总结为5步。

  • 1️⃣在生成最低成汇编指令时,对volatile修饰的共享变量写操作增加Lock前缀指令,Lock 前缀的指令会引起 CPU 缓存写回内存;
  • 2️⃣CPU 的缓存回写到内存会导致其他 CPU 缓存了该内存地址的数据无效;
  • 3️⃣volatile 变量通过缓存一致性协议保证每个线程获得最新值;
  • 4️⃣缓存一致性协议保证每个 CPU 通过嗅探在总线上传播的数据来检查自己缓存的值是不是修改;
  • 5️⃣当 CPU 发现自己缓存行对应的内存地址被修改,会将当前 CPU 的缓存行设置成无效状态,重新从内存中把数据读到 CPU 缓存。

volatile如何保证有序性?

在之前的学习我们了解到,为了充分利用缓存,提高程序的执行速度,编译器在底层执行的时候,会进行指令重排序的优化操作,但这种优化,在有些时候会带来
有序性
的问题。

那何为有序性呢?我们可以通俗理解为:
程序执行的顺序要按照代码的先后顺序。
当然,之前我们还说过发生有序性问题时,我们可以通过给变量添加volatile修饰符进行解决。

首先,我们来回顾一下之前写的一个关于有序性问题的测试类。
【代码示例1】

int a = 1;(1)
int b = 2;(2)
int c = a + b;(3)

上面的这段代码中,c变量依赖a,b的值,因此,在编译器优化重排时,c肯定会在a,b赋值以后执行,但a,b之间没有依赖关系,可能会发生重排序,但这种重排序即便到了多线程中依旧不会存在问题,因为即便重排对执行结果也无影响。

但有些时候,指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致,我们继续看下面这段代码:

【代码示例2】

public class Test {

    private static int num = 0;
    private static boolean ready = false;
    //禁止指令重排,解决顺序性问题
    //private static volatile boolean ready = false;

    public static class ReadThread extends Thread {

        @Override
        public void run() {

            while (!Thread.currentThread().isInterrupted()) {
                if (ready) {//(1)
                    System.out.println(num + num);//(2)
                }
                System.out.println("读取线程...");
            }
        }
    }

    public static class WriteRead extends Thread {

        @Override
        public void run() {
            num = 2;//(3)
            ready = true;//(4)
            System.out.println("赋值线程...");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadThread rt = new ReadThread();
        rt.start();

        WriteRead wr = new WriteRead();
        wr.start();

        Thread.sleep(10);
        rt.interrupt();
        System.out.println("rt stop...");
    }
}

我们定义了2个线程,一个用来求和操作,一个用来赋值操作,因为定义的是成员变量,所以代码(1)(2)(3)(4)之间不存在依赖关系,在运行时极可能发生指令重排序,如将(4)在(3)前执行,顺序为(4)(1)(3)(2),这时输出的就是0而不是4,但在很多性能比较好的电脑上,这种重排序情况不易复现。
这时,我们给ready 变量添加一个volatile关键字,就成功的解决问题了。

volatile关键字可以禁止指令重排的原因主要有两个!

一、3 个 happens-before 规则的实现

  1. 对一个 volatile 变量的写 happens-before 任意后续对这个 volatile 变量的读;
  2. 一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作;
  3. happens-before 传递性,A happens-before B,B happens-before C,则 A happens-before C。

二、内存屏障
变量声明为 volatile 后,在对这个变量进行读写操作的时候,会通过插入特定的
内存屏障
的方式来禁止指令重排序。

内存屏障(Memory Barrier 又称内存栅栏,是一个 CPU 指令),为了实现volatile 内存语义,volatile 变量的写操作,在变量的前面和后面分别插入内存屏障;volatile 变量的读操作是在后面插入两个内存屏障。

具体屏障规则:

  1. 在每个 volatile 写操作的前面插入一个 StoreStore 屏障;
  2. 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障;
  3. 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障;
  4. 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。

屏障说明:

  1. StoreStore:禁止之前的普通写和之后的 volatile 写重排序;
  2. StoreLoad:禁止之前的 volatile 写与之后的 volatile 读/写重排序;
  3. LoadLoad:禁止之后所有的普通读操作和之前的 volatile 读重排序;
  4. LoadStore:禁止之后所有的普通写操作和之前的 volatile 读重排序。

OK,知道了这些内容之后,我们再回头看代码示例2中,增加了volatile关键字后的执行顺序,在赋值线程启动后,执行顺序会变成(3)(4)(1)(2),这时打印的结果就为4啦!

volatile为什么不能保证原子性?

我们讲完了volatile修饰符保证可见性与有序性的内容,接下来我们思考另外一个问题,它能够保证原子性吗?为什么?我们依旧通过一段代码去证明一下!

【代码示例3】

public class Test {
    //计数变量
    static volatile int count = 0;
    public static void main(String[] args) throws InterruptedException {
        //线程 1 给 count 加 10000
        Thread t1 = new Thread(() -> {
            for (int j = 0; j <10000; j++) {
                count++;
            }
            System.out.println("thread t1 count 加 10000 结束");
        });
        //线程 2 给 count 加 10000
        Thread t2 = new Thread(() -> {
            for (int j = 0; j <10000; j++) {
                count++;
            }
            System.out.println("thread t2 count 加 10000 结束");
        });
        //启动线程 1
        t1.start();
        //启动线程 2
        t2.start();
        //等待线程 1 执行完成
        t1.join();
        //等待线程 2 执行完成
        t2.join();
        //打印 count 变量
        System.out.println(count);
    }
}

我们创建了2个线程,分别对count进行加10000操作,理论上最终输出的结果应该是20000万对吧,但实际并不是,我们看一下真实输出。

输出:

thread t1 count 加 10000 结束
thread t2 count 加 10000 结束
14281

原因:
Java 代码中 的 count++并非原子的,而是一个复合性操作,至少需要三条CPU指令:

  • 指令 1:把变量 count 从内存加载到CPU的寄存器
  • 指令 2:在寄存器中执行 count + 1 操作
  • 指令 3:+1 后的结果写入CPU缓存或内存

即使是单核的 CPU,当线程 1 执行到指令 1 时发生线程切换,线程 2 从内存中读取 count 变量,此时线程 1 和线程 2 中的 count 变量值是相等,都执行完指令 2 和指令 3,写入的 count 的值是相同的。从结果上看,两个线程都进行了 count++,但是 count 的值只增加了 1。这种情况多发生在cpu占用时间较长的线程中,若单线程对count仅增加100,那我们就很难遇到线程的切换,得出的结果也就是200啦。

要想解决也很简单,利用 synchronized、Lock或者AtomicInteger都可以,我们在后面的文章中会聊到的,请继续保持关注哦!

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得
留言+点赞+收藏
呀。原创不易,转载请联系Build哥!

image

如果您想与Build哥的关系更近一步,还可以关注“JavaBuild888”,在这里除了看到《Java成长计划》系列博文,还有提升工作效率的小笔记、读书心得、大厂面经、人生感悟等等,欢迎您的加入!

image

Nuget包 Microsoft.Extensions.Telemetry.Abstractions 包含的新的日志记录source generator,它支持使用[LogProperties]将整个对象作为State与日志一起记录。

我将展示一种方法来控制如何使用[LogProperties]对象自动丰富日志。

示例

您可以使用日志source generator创建一个如下所示的方法,并使用[LoggerMessage]属性对其进行装饰:

public static partial classLog
{
[LoggerMessage(
EventId
= 0,
Level
=LogLevel.Error,
Message
= "Can not open SQL connection {err}")]public static partial void CouldNotOpenConnection(this ILogger logger, stringerr);
}
private static async Task Main(string[] args)
{
using ILoggerFactory loggerFactory =LoggerFactory.Create(
builder
=>builder.AddJsonConsole(
options
=>options.JsonWriterOptions= newJsonWriterOptions()
{
Indented
= true}));

ILogger logger
= loggerFactory.CreateLogger("Program");

logger.CouldNotOpenConnection(
"network err");
}

您也可以传递[LogProperties]对象装饰的对象,它们将与您的消息一起记录。

使用前安装nuget包。

<PackageReference Include="Microsoft.Extensions.Telemetry.Abstractions" Version="8.3.0" />

然后定义一个记录日志方法,将一个对象传递给日志方法,并用LogProperties装饰:

public classNetWorkInfo
{
public string IPAddress { get; set; }public int Port { get; set; }
}
public static partial classLog
{
[LoggerMessage(
EventId
= 0,
Level
=LogLevel.Error,
Message
= "Can not open SQL connection {err}")]public static partial void CouldNotOpenConnection(this ILogger logger, stringerr, [LogProperties] NetWorkInfo netWork);
}

logger.CouldNotOpenConnection(
"network err", new NetWorkInfo { IPAddress = "123.1.1", Port = 7777 });

运行可以看到新增的Netwrok所有属性都会添加到消息的State属性中:

忽略属性

如果您不想在日志中包括[LogProperties]对象的特定属性,可以使用[LogPropertyIgnore]对其进行装饰:

public classNetWorkInfo
{
public string IPAddress { get; set; }//从日志中移除 [LogPropertyIgnore]public int Port { get; set; }
}

原理

其原理也是使用的source generator,可在vs中看到生成的代码

今天是第二堂课,我们将继续学习爬虫技术。在上一节课中,我们已经学会了如何爬取干饭教程。正如鲁迅所说(我没说过),当地吃完饭就去外地吃,这启发了我去爬取城市天气信息,并顺便了解当地美食。这个想法永远是干饭人的灵魂所在。

今天我们的目标是学习如何爬取城市天气信息,因为要计划去哪里玩耍,首先得了解天气情况。虽然我们的手机已经装有许多免费天气软件,但是也不妨碍我们学习。

在我们开始学习爬虫技术之前,首先需要找到一个容易爬取数据的天气网站。并不要求特定网站,只要易于爬取的网站即可。毕竟我们目前并不需要爬取特定网站来抢票或抢购商品,我们的主要目的是学习爬虫技术。

天气爬虫

在进行爬虫操作时,如果不确定一个网站是否易于爬取,可以先尝试输入该网站的首页地址,查看能否成功解析出HTML网页。如果解析出来的页面与实际浏览的页面一致,那么说明该网站可能没有设置反爬虫机制;反之,如果解析出来的页面与实际不同,那么该网站很可能设置了反爬虫措施。在学习阶段,建议选择较为容易爬取的网站进行练习,避免过早挑战难度过大的网站。

好的,废话不多说,我们现在就开始抓取该网站上的所有城市信息。

城市列表

天气信息肯定与城市相关,因此几乎每个天气网站都会有城市列表。让我们先来抓取这些城市列表并保存起来,以备后续使用。以下是相应的代码:

# 导入urllib库的urlopen函数
from urllib.request import urlopen,Request
# 导入BeautifulSoup
from bs4 import BeautifulSoup as bf

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request("https://www.tianqi.com/chinacity.html",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
obj = bf(html_text,'html.parser')
# 使用find_all函数获取所有图片的信息
province_tags = obj.find_all('h2')
for province_tag in province_tags:
    province_name = province_tag.text.strip()
    cities = []
    print(province_name)
    next_sibling = province_tag.find_next_sibling()
    city_tags = next_sibling.find_all('a')
    for city_tag in city_tags:
        city_name = city_tag.text.strip()
        cities.append(city_name)
        print(city_name)

在上述操作中,主要的步骤是从城市地址页面中获取信息,对其进行解析以获取省份和城市之间的对应关系。目前仅仅进行了简单的打印输出。

城市天气

在获取城市信息之后,接下来的步骤是根据城市信息获取天气信息。在这里,我们仅考虑直辖市的天气情况,而省份的天气信息获取相比直辖市多了一步省份的跳转。我们暂时不进行省份天气信息的演示。现在,让我们一起来看一下代码:

# 导入urllib库的urlopen函数
from urllib.request import urlopen,Request
# 导入BeautifulSoup
from bs4 import BeautifulSoup as bf

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request(f"https://www.tianqi.com/beijing/",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
obj = bf(html_text,'html.parser')
city_tags = obj.find_all('div',class_='mainWeather')
for city_tag in city_tags:
    a_tags = city_tag.find_all('a', class_=lambda value: value != 'd15')
    for a_tag in a_tags:
        title = a_tag.get('title')
        print(title)
foods = obj.find_all('ul',class_='paihang_good_food')
for food in foods:
    a_tags = food.find_all('a')
    for a_tag in a_tags:
        href = a_tag.get('href')
        print(href)
        title = a_tag.get('title')
        print(title)
weather_info = obj.find_all('dl', class_='weather_info')
for info in weather_info:
    city_name = info.find('h1').text
    date = info.find('dd', class_='week').text
    temperature = info.find('p', class_='now').text
    humidity = info.find('dd', class_='shidu').text
    air_quality = info.find('dd', class_='kongqi').h5.text
    print(f"地点:{city_name}")
    print(f"时间:{date}")
    print(f"当前温度:{temperature}")
    print(humidity)
    print(air_quality)   

以上代码不仅仅把天气解析出来,而且将当前地址的天气和各个城区 的天气以及当地美食都解析了出来。当地美食因为链接是变动的,所以将链接和美食做了响应的映射关系保存。

城市美食

在确定天气适宜的情况下,我们通常都会想了解当地有哪些特色美食,毕竟不能总是吃快餐,特色美食才是我们吃货的灵魂所在。

以下是一个示例代码:

# 导入urllib库的urlopen函数
from urllib.request import urlopen,Request
# 导入BeautifulSoup
from bs4 import BeautifulSoup as bf

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
req = Request(f"https://www.tianqi.com/meishi/1737.html",headers=headers)
# 发出请求,获取html
# 获取的html内容是字节,将其转化为字符串
html = urlopen(req)
html_text = bytes.decode(html.read())
obj = bf(html_text,'html.parser')
span_tag = obj.find('span', class_='traffic')
text_content = ''.join(span_tag.stripped_strings)
print(text_content)

在这里,我主要解析了当前美食推荐的原因。实际上,链接应该与之前解析的天气信息相关联,但为了演示方便,我在示例代码中使用了固定值。

包装一下

将以上内容单独制作成小案例确实是一种有效的方式,但将其整合成一个简单的小应用则更具实用性,因为这样可以实现更灵活的交互。让我们一起来看一下最终的代码:

# 导入urllib库的urlopen函数
from urllib.request import urlopen,Request
import urllib,string
# 导入BeautifulSoup
from bs4 import BeautifulSoup as bf
from random import choice,sample
from colorama import init
from os import system
from termcolor import colored
from readchar import  readkey
from xpinyin import Pinyin

p = Pinyin()

city_province_mapping = []

province_sub_weather = []

good_foods = []
FGS = ['green', 'yellow', 'blue', 'cyan', 'magenta', 'red']

def clear():
    system("CLS")

def get_city_province_mapping():
    print(colored('开始搜索城市',choice(FGS)))
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
    req = Request("https://www.tianqi.com/chinacity.html",headers=headers)
    # 发出请求,获取html
    # 获取的html内容是字节,将其转化为字符串
    html = urlopen(req)
    html_text = bytes.decode(html.read())
    obj = bf(html_text,'html.parser')
    # 使用find_all函数获取所有图片的信息
    province_tags = obj.find_all('h2')
    for province_tag in province_tags:
        province_name = province_tag.text.strip()
        cities = []

        next_sibling = province_tag.find_next_sibling()
        city_tags = next_sibling.find_all('a')
        for city_tag in city_tags:
            city_name = city_tag.text.strip()
            cities.append(city_name)

        city_province_mapping.append((province_name,cities))


def get_province_weather(province):
    print(colored(f'已选择:{province}',choice(FGS)))
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
    req = Request(f"https://www.tianqi.com/{province}/",headers=headers)
    # 发出请求,获取html
    # 获取的html内容是字节,将其转化为字符串
    html = urlopen(req)
    html_text = bytes.decode(html.read())
    obj = bf(html_text,'html.parser')
    city_tags = obj.find_all('div',class_='mainWeather')
    # city_tags = obj.find_all('ul',class_='raweather760')
    province_sub_weather.clear()
    print(colored('解析主要城市中',choice(FGS)))
    for city_tag in city_tags:
        a_tags = city_tag.find_all('a', class_=lambda value: value != 'd15')
        for a_tag in a_tags:
            title = a_tag.get('title')
            province_sub_weather.append(title)
    foods = obj.find_all('ul',class_='paihang_good_food')
    print(colored('解析热搜美食中',choice(FGS)))
    for food in foods:
        a_tags = food.find_all('a')
        for a_tag in a_tags:
            href = a_tag.get('href')
            title = a_tag.get('title')
            good_foods.append((href, title))
    weather_info = obj.find_all('dl', class_='weather_info')
    print(colored('解析完毕',choice(FGS)))
    for info in weather_info:
        city_name = info.find('h1').text
        date = info.find('dd', class_='week').text
        temperature = info.find('p', class_='now').text
        humidity = info.find('dd', class_='shidu').text
        air_quality = info.find('dd', class_='kongqi').h5.text

        print(colored(f"地点:{city_name}",choice(FGS)))
        print(colored(f"时间:{date}",choice(FGS)))
        print(colored(f"当前温度:{temperature}",choice(FGS)))
        print(colored(humidity,choice(FGS)))
        print(colored(air_quality,choice(FGS)))   
def search_food(link):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'}
    req = Request(f"https://www.tianqi.com{link}",headers=headers)
    # 发出请求,获取html
    # 获取的html内容是字节,将其转化为字符串
    html = urlopen(req)
    html_text = bytes.decode(html.read())
    obj = bf(html_text,'html.parser')
    span_tag = obj.find('span', class_='traffic')
    text_content = ''.join(span_tag.stripped_strings)
    print(colored(text_content,choice(FGS)))

def print_menu():
    for i in range(0, 4, 3):
        names = [f'{i + j}:{city_province_mapping[i + j][0]}' for j in range(3) if i + j < 4]
        print(colored('\t\t'.join(names),choice(FGS)))

def print_food():
    if not good_foods:
        print(colored('请选择城市,才可查看',choice(FGS)))
        return
    for i in range(0, len(good_foods), 3):
        names = [f'{i + j}:{good_foods[i + j][1]}' for j in range(3) if i + j < len(good_foods)]
        print(colored('\t\t'.join(names),choice(FGS)))

def print_hot(weather):
    if not weather:
        print(colored('请选择城市,才可查看',choice(FGS)))
        return
    for i in range(0,len(weather), 3):
        names = [f'{i + j}:{weather[i + j]}' for j in range(3) if i + j < len(weather)]
        print(colored('\t\t'.join(names),choice(FGS)))

get_city_province_mapping()

# get_province_weather('beijing')
# search_food(good_foods[1][0])
init() ## 命令行输出彩色文字
print(colored('已搜索完毕!',choice(FGS)))
print(colored('m:返回首页',choice(FGS)))
print(colored('h:查看当前城区天气',choice(FGS)))
print(colored('f:查看当地美食',choice(FGS)))
print(colored('q:退出天气',choice(FGS)))
my_key = ['q','m','c','h','f']
while True:
    while True:
        move = readkey()
        if move in my_key:
            break
    if move == 'q': ## 键盘‘Q’是退出
        break 
    if move == 'c': ## 键盘‘C’是清空控制台
        clear()
    if move == 'h':  
        print_hot(province_sub_weather)
    if move == 'f':  
        print_food()
        num = int(input('请输入美食编号:=====>'))
        if num <= len(good_foods):
            search_food(good_foods[num][0])
    if move == 'm':
        print_menu()
        num = int(input('请输入城市编号:=====>'))
        if num <= len(city_province_mapping):
            pinyin_without_tone = p.get_pinyin(city_province_mapping[num][0],'')
            get_province_weather(pinyin_without_tone)

按照我的习惯,我通常喜欢在控制台中进行打印输出,这样可以避免不必要的UI依赖。虽然整个过程并不算太复杂,但解析数据确实需要花费一些时间。尽管如此,还是成功完成了天气信息的爬取任务。

总结

在今天的学习中,所涉及的知识点基本延续了上一次的内容,并没有太多新的拓展。主要是对网页进行解析,提取信息并保存,最后根据这些信息来动态改变链接地址,最终完成了一个简单的与用户交互的演示项目。我希望你也能跟着动手实践,尽管这个过程可能会有些痛苦,不过虽然并没有给你的技术水平带来实质性提升,但至少可以拓展你的技术广度。