2024年3月

1、准备材料

正点原子stm32f407探索者开发板V2.4

STM32CubeMX软件(
Version 6.10.0

Keil µVision5 IDE(
MDK-Arm

野火DAP仿真器

XCOM V2.6串口助手

2、学习目标

本文主要学习 FreeRTOS 任务通知的相关知识,
包括FreeRTOS中的通信手段、任务通知的优缺点、任务通知 API 函数等知识

3、前提知识

3.1、FreeRTOS 中的通信手段

一个 FreeRTOS 负责的系统中,总是存在很多任务和中断,这些不同的任务和中断之间往往需要大量的通信来保证整个系统的运行,到本篇文章为止也已经介绍了包括队列、二值/计数信号量、互斥量、递归互斥量和事件组在内的多种通信方式

3.1.1、通过中介对象进行通信

但是目前已经介绍的这些通信方式有一个共同的特点,
当使用通信对象时,事件和数据不会直接发送到接收任务或接收 ISR ,而是发送到通信对象
。同样,任务和 ISR 从通信对象接收事件和数据,而不是直接从发送事件或数据的任务或 ISR 接收,这个特点可以用下图表示
(注释1)

3.1.2、直接任务通信

本篇文章所介绍的 “任务通知” 允许在不需要额外的中间媒介(通信对象)的情况下,实现任务与其他任务直接交互,并与 ISR 同步
。通过使用任务通知,任务或 ISR 可以直接向接收任务发送事件,该特点可以用下图表示

可以在 FreeRTOSConfig.h 文件中设置
configUSE_TASK_NOTIFICATIONS
参数为 1 启动任务通知功能,启动该功能之后,
会在每个任务的 TCB (任务控制块)中增加 8 字节空间,此时每个任务都有一个“通知状态”(可以是 “挂起” 或 “未挂起” )和一个 “通知值” (32 位无符号整数)
。当任务收到通知时,其通知状态将设置为挂起,当任务读取其通知值时,其通知状态将设置为未挂起

3.2、任务通知的优缺点

3.2.1、优点

任务通知在性能和 RAM 占用上存在优势,具体为以下两点

  1. 使用任务通知向任务发送事件或数据比使用队列、信号量或事件组执行等效操作要快得多
  2. 启用任务通知功能的固定开销仅为每个任务 8 个字节的 RAM ,而队列、信号量、事件组等在使用前都必须创建,占用空间较大

3.2.2、缺点

任务通知比通信对象更快并且使用更少的 RAM ,但任务通知不能在所有场景中使用,如下所示记录了无法使用任务通知的场景

  1. 通信对象可用于将事件和数据从 ISR 发送到任务,以及从任务发送到 ISR;
    任务通知只能用于将事件和数据从 ISR 发送到任务,不能用于将事件或数据从任务发送到 ISR
  2. 任何知道通信对象句柄的任务和 ISR 都可以访问通信对象,因此多个任务或 ISR 都可以发送或接收消息;任务通知只能将事件和数据发送到某个具体的接收任务中,发送的事件和数据只能由接收任务使用处理
  3. 队列是一种通信对象,一次可以保存多个数据项,已发送到队列但尚未从队列接收的数据将缓冲在队列对象内;任务通知通过更新接收任务的通知值来向任务发送数据,
    任务的通知值一次只能保存一个值
  4. 事件组是一种通信对象,可用于一次向多个任务发送事件;
    任务通知直接发送给接收任务,因此只能由接收任务处理
  5. 如果通信对象暂时处于无法向其写入更多数据或事件的状态(例如,当队列已满时,无法向队列发送更多数据),则尝试写入该对象的任务可以选择进入阻塞状态以等待其写操作完成;如果任务尝试向已经有待处理通知的任务发送任务通知,则发送任务不可能在阻塞状态下等待接收任务重置其通知状态

3.3、任务通知 API 概述

在任务通知这一部分,FreeRTOS 为使用者提供了三组 API 函数,三组 API 的特点如下所述

  1. 强大通用但较复杂的 xTaskNotify() 和 xTaskNotifyWait() API 函数
  2. 用作二进制或计数信号量的更轻量级且更快的替代方案的 xTaskNotifyGive() 和 ulTaskNotifyTake() API 函数
  3. 在序号 1 的基础上增加
    pulPreviousNotifyValue
    参数值的 xTaskNotifyAndQuery() API函数

上面三组不同类型 API 还拥有其对应的中断安全版本函数, 任务通知可以用来代替二进制信号量、计数信号量、事件组,有时甚至可以代替队列, 但是在大多数情况下使用者可能不需要使用如上述序号 1 中所述 API 函数提供的完全灵活性,一组更简单的函数就足够了

因此开发者提供了 xTaskNotifyGive() API 函数以允许将任务通知用作二值或计数信号量的更轻量级且更快的替代方案,并且提供 ulTaskNotifyTake() API 函数作为 xTaskNotifyWait() 的更简单但灵活性较差的替代方案,所以具体使用哪一组 API 函数可以根据使用者的需要按需使用

3.4、xTaskNotifyGive() 和 ulTaskNotifyTake() API 函数

xTaskNotifyGive() 直接向任务发送通知,
并对接收任务的通知值进行递增
(加一,因为是模拟信号量),
如果接收任务尚未挂起,则调用 xTaskNotifyGive() 会将接收任务的通知状态设置为挂起
,该 API 实际上是作为宏实现的,而不是函数,其具体声明如下所述

/**
  * @brief  任务通知用作轻量级且更快的二进制或计数信号量替代方案时所使用的通知发送函数
  * @param  xTaskToNotify:通知发送到的任务的句柄
  * @retval 只会返回pdPASS
  */
BaseType_t xTaskNotifyGive(TaskHandle_t xTaskToNotify);

/**
  * @brief  上述函数的的中断安全版本函数
  * @param  xTaskToNotify:通知发送到的任务的句柄
  * @param  pxHigherPriorityTaskWoken:用于通知应用程序编写者是否应该执行上下文切换
  * @retval None
  */
void vTaskNotifyGiveFromISR(TaskHandle_t xTaskToNotify,
							BaseType_t *pxHigherPriorityTaskWoken);

当一个任务使用 xTaskNotifyGive() API 函数将通知值用作二值或等效计数信号量时, 则被通知的任务应使用 ulTaskNotifyTake() API 函数来接收或等待通知值

ulTaskNotifyTake() 允许任务在阻塞状态下等待其通知值大于零,并在返回之前递减(减一)或清除任务的通知值,其具体函数声明如下所述

/**
  * @brief  任务通知被用作更快、更轻的二进制或计数信号量替代时使用通知接收函数
  * @param  xClearCountOnExit:设置为pdTRUE,则该函数返回之前,调用任务的通知值将被清零;设置为pdFASLE,并且通知值大于0,则调用任务的通知值将在该函数返回之前递减
  * @param  xTicksToWait:调用任务应保持阻塞状态以等待其通知值大于零的最长时间
  * @retval 阻塞时间到期也没能等到消息则返回 0 ,阻塞时间到期前等到消息则返回之前的通知值
  */
uint32_t ulTaskNotifyTake(BaseType_t xClearCountOnExit, TickType_t xTicksToWait);

3.5、xTaskNotify() API 函数

xTaskNotify() 是 xTaskNotifyGive() 的功能更强大的版本,可用于通过以下任意方式更新接收任务的通知值

  1. 接收任务的通知值递增(加一),在这种情况下 xTaskNotify()
    相当于 xTaskNotifyGive()
  2. 接收任务的通知值中设置一位或多位,这允许任务的通知值用作
    事件组
    的更轻量级和更快的替代方案
  3. 将一个全新的数字写入接收任务的通知值,但前提是接收任务自上次更新以来已读取其通知值,这允许任务的通知值提供与
    长度为 1 的队列
    提供的功能类似的功能
  4. 将一个全新的数字写入接收任务的通知值,即使接收任务自上次更新以来尚未读取其通知值,这允许任务的通知值提供与 xQueueOverwrite() API 函数提供的功能类似的功能,由此产生的行为有时被称为
    “邮箱”

xTaskNotify() 比 xTaskNotifyGive() 更灵活、更强大,并且由于额外的灵活性和强大功能,它的使用也稍微复杂一些,使用 xTaskNotify() 函数时,如果接收任务尚未挂起,则调用 xTaskNotify() 将始终将其设置为挂起状态,如下所示为其具体函数声明

/**
  * @brief  任务通知函数
  * @param  xTaskToNotify:通知发送到的任务的句柄
  * @param  ulValue:ulValue的使用方式取决于eAction值,参考 “3.5.1、eAction 参数” 小节
  * @param  eAction:一个枚举类型,指定如何更新接收任务的通知值,参考 “3.5.1、eAction 参数” 小节
  * @retval 除 “3.5.1、eAction 参数” 小节提到的一种情况外,均返回pdPASS
  */
BaseType_t xTaskNotify(TaskHandle_t xTaskToNotify,
					   uint32_t ulValue,
					   eNotifyAction eAction);

/**
  * @brief  任务通知的中断安全版本函数
  * @param  xTaskToNotify:通知发送到的任务的句柄
  * @param  ulValue:ulValue的使用方式取决于eAction值,参考 “3.5.1、eAction 参数” 小节
  * @param  eAction:一个枚举类型,指定如何更新接收任务的通知值,参考 “3.5.1、eAction 参数” 小节
  * @param  pxHigherPriorityTaskWoken:用于通知应用程序编写者是否应该执行上下文切换
  * @retval 除 “3.5.1、eAction 参数” 小节提到的一种情况外,均返回pdPASS
  */
BaseType_t xTaskNotifyFromISR(TaskHandle_t xTaskToNotify,
							  uint32_t ulValue,
							  eNotifyAction eAction
							  BaseType_t *pxHigherPriorityTaskWoken);

3.5.1、
eAction
参数

eAction
参数是一个 eNotifyAction 枚举类型,其定义了 5 中不同枚举类型,用于模拟二值信号量、计数信号量、队列、事件组和 ”邮箱“ 等功能,其具体定义如下所述

eNotifyAction 值 对接收任务的最终影响
eNoAction 接收任务的通知状态设置为待处理,而不更新其通知值,未使用 xTaskNotify() 中 ulValue 参数
eSetBits 接收任务的通知值与 xTaskNotify() 中 ulValue 参数中传递的值进行按位或运算,例如:如果 ulValue 设置为 0x01,则接收任务的通知值中将置位第 0 位
eIncrement 接收任务的通知值递增,未使用 xTaskNotify() 中 ulValue 参数
eSetValueWithoutOverwrite 如果接收任务在调用 xTaskNotify() 之前有待处理的通知,则不执行任何操作,并且 xTaskNotify() 将返回 pdFAIL;如果在调用 xTaskNotify() 之前接收任务没有待处理的通知,则接收任务的通知值将设置为 xTaskNotify() 中 ulValue 参数中传递的值
eSetValueWithOverwrite 接收任务的通知值设置为 xTaskNotify() ulValue 参数中传递的值,无论接收任务在调用 xTaskNotify() 之前是否有待处理的通知

3.6、xTaskNotifyWait() API 函数

xTaskNotifyWait() 是 ulTaskNotifyTake() 的功能更强大的版本,它允许任务以可选的超时等待调用任务的通知状态变为待处理(如果它尚未处于待处理状态),xTaskNotifyWait() 提供了在进入函数和退出函数时清除调用任务的通知值中的位的参数
ulBitsToClearOnEntry

ulBitsToClearOnExit

/**
  * @brief  任务通知的中断安全版本函数
  * @param  ulBitsToClearOnEntry:参考 “3.6.1、ulBitsToClearOnEntry 参数” 小节
  * @param  ulBitsToClearOnExit:参考 “3.6.2、_ulBitsToClearOnExit_ 参数” 小节
  * @param  pulNotificationValue:用于传递任务的通知值,因为等待通知的函数可能由于 ulBitsToClearOnExit 参数在函数退出时收到的消息值已被更改
  * @param  xTicksToWait:调用任务应保持阻塞状态以等待其通知状态变为挂起状态的最长时间
  * @retval 参考 “3.6.2、xTaskNotifyWait() 函数返回值” 小节
  */
BaseType_t xTaskNotifyWait(uint32_t ulBitsToClearOnEntry,
						   uint32_t ulBitsToClearOnExit,
						   uint32_t *pulNotificationValue,
						   TickType_t xTicksToWait);

3.6.1、
ulBitsToClearOnEntry
参数

如果调用任务在调用 xTaskNotifyWait() 之前没有待处理的通知,则在进入该函数时,将在任务的通知值中清除参数
ulBitsToClearOnEntry
中设置的任何位

例如,如果参数
ulBitsToClearOnEntry
为 0x01,则任务通知值的位 0 将被清除,再举一个例子,将参数
ulBitsToClearOnEntry
设置为 0xffffffff(ULONG_MAX)将清除任务通知值中的所有位,从而有效地将值清除为 0

3.6.2、
ulBitsToClearOnExit
参数

如果调用任务因为收到通知而退出 xTaskNotifyWait() ,或者因为在调用 xTaskNotifyWait() 时已经有通知挂起,那么在参数
ulBitsToClearOnExit
中设置的任何位将在任务退出 xTaskNotifyWait() 函数之前在任务的通知值中被清除

例如,如果参数
ulBitsToClearOnExit
为 0x03 ,则任务通知值的位 0 和位 1 将在函数退出之前被清除,再举个例子,将参数
ulBitsToClearOnExit
为 0xffffffff(ULONG_MAX)将清除任务通知值中的所有位,从而有效地将值清除为 0

3.6.2、xTaskNotifyWait() 函数返回值

有两种可能的返回值,分别为 pdPASS 和 pdFALSE ,具体如下所述

① pdPASS

  1. 调用 xTaskNotifyWait() 时调用任务已经有待处理的通知
  2. 调用 xTaskNotifyWait() 时调用任务没有待处理的通知,由于设置了阻塞时间因此进入阻塞状态等待消息挂起,在阻塞时间到期之前成功等到消息挂起

② pdFALSE

  1. 调用 xTaskNotifyWait() 时调用任务没有待处理的通知,由于设置了阻塞时间因此进入阻塞状态等待消息挂起,但是直到阻塞时间到期都没有等到消息挂起

3.7、其他 API 函数

除了上面的一些常用 API 之外,还有一些工具或不常用的 API 函数,因为启用任务通知后会在任务控制块中增加一个任务状态和一个任务通知值,因此 FreeRTOS 提供了
清除任务状态的 xTaskNotifyStateClear() API 函数和 清除任务通知值的 ulTaskNotifyValueClear() API 函数

另外增加了 "3.3、任务通知 API 概述" 小节中提到的在 xTaskNotify() API 函数上增加了
pulPreviousNotifyValue
参数的 xTaskNotifyAndQuery() API函数和其中断安全版本函数,上述提到的四个函数声明具体如下所述

/**
  * @brief  清除任务通知状态
  * @param  xTask:要操作的任务句柄
  * @retval 如果要操作的任务有待处理的通知,并且该通知已清除,则返回pdTRUE;如果该任务没有待处理的通知,则返回pdFALSE
  */
BaseType_t xTaskNotifyStateClear(TaskHandle_t xTask);

/**
  * @brief  清除任务通知值
  * @param  xTask:要操作的任务句柄
  * @param  ulBitsToClear:xTask的通知值中要清除的位的位掩码,比如设置为0x01表示将通知值的第0位清除
  * @retval ulBitsToClear指定的位被清除之前目标任务的通知值的值
  */
uint32_t ulTaskNotifyValueClear(TaskHandle_t xTask,
								uint32_t ulBitsToClear);

/**
  * @brief  执行与xTaskNotify()相同的操作,此外它还在附加的pulPreviousNotifyValue中返回目标任务的先前通知值(调用函数时的通知值,而不是函数返回时的通知值)
  * @param  xTaskToNotify:被通知任务的句柄
  * @param  ulValue:通知值,ulValue的使用方式取决于eAction值,参考 “3.5.1、eAction 参数” 小节
  * @param  eAction:一个枚举类型,指定如何更新接收任务的通知值,参考 “3.5.1、eAction 参数” 
  * @param  pulPreviousNotifyValue:返回目标任务的先前通知值
  * @retval 除 “3.5.1、eAction 参数” 小节提到的一种情况外,均返回pdPASS
  */
BaseType_t xTaskNotifyAndQuery(TaskHandle_t xTaskToNotify,
							   uint32_t ulValue,
							   eNotifyAction eAction,
							   uint32_t *pulPreviousNotifyValue);
							   

/**
  * @brief  上述函数的中断安全版本
  * @param  pxHigherPriorityTaskWoken:通知应用程序编程者是否需要进行上下文切换
  */
BaseType_t xTaskNotifyAndQueryFromISR(TaskHandle_t xTaskToNotify,
									  uint32_t ulValue,
									  NotifyAction eAction,
									  uint32_t *pulPreviousNotifyValue,
									  BaseType_t *pxHigherPriorityTaskWoken);

4、实验一:使用任务通知替代信号量

4.1、实验目标

既然本实验目的是使用任务通知替代信号量,那么我们可以使用任务通知
重新实现一下 "
FreeRTOS教程5 信号量
" 文章中 “4、实验一:二值信号量的应用” 小节内容

4.2、CubeMX相关配置

复制 “
FreeRTOS教程5 信号量
” 文章中 “4、实验一:二值信号量的应用”小节所描述的实验工程,然后通过 “.ioc” 后缀的文件打开该工程的 STM32CubeMX 软件配置界面,单击 Middleware and Software Packs/FREERTOS ,在 Configuration 中找到 Timers and Semaphores ,
删除原来创建好名为 BinarySem_ADC 的二值信号量
,然后直接重新生成工程代码即可

4.3、添加其他必要代码

重新实现 ADC 采集转换完成中断回调函数和任务函数 TASK_ADC ,主要是将原来使用二值信号量同步 ISR 和 TASK_ADC 的程序修改为使用任务通知,具体如下所示

/*ADC数据处理任务*/
void TASK_ADC(void *argument)
{
  /* USER CODE BEGIN TASK_ADC */
	//定义一个变量用于表示任务待处理的事件数量
	uint32_t ulEventsToProcess;
  /* Infinite loop */
  for(;;)
  {
		//等待任务通知
		ulEventsToProcess = ulTaskNotifyTake(pdTRUE, pdMS_TO_TICKS(500));
		//如果等到事件
		if(ulEventsToProcess != 0)
		{
			//当待处理的事件不为0就一直处理,处理一次待处理的事件减少1
			while(ulEventsToProcess > 0)
			{
				uint32_t Volt = (3300 * adc_value)>>12;
				printf("val:%d, Volt:%d\r\n", adc_value, Volt);
				ulEventsToProcess --;
			}
		}
  }
  /* USER CODE END TASK_ADC */
}


/*转换完成中断回调*/
void HAL_ADC_ConvCpltCallback(ADC_HandleTypeDef *hadc)
{
	/*定时器中断启动单通道转换*/
	if(hadc->Instance == ADC1)
	{
		//获取原始ADC采集值
		adc_value = HAL_ADC_GetValue(hadc);
		BaseType_t highTaskWoken = pdFALSE;
		if(task_ADCHandle != NULL)
		{
			//采集完毕给TASK_ADC任务发送采集完毕的通知
			vTaskNotifyGiveFromISR(task_ADCHandle, &highTaskWoken);
			portYIELD_FROM_ISR(highTaskWoken);
		}
	}
}

4.4、烧录验证

实验现象与 “
FreeRTOS教程5 信号量
” 文章 “4.4、烧录验证” 小节内容一样,如下图所示

5、实验二:使用任务通知传递数据

5.1、实验目标

“4、实验一:使用任务通知替代信号量” 小节的实验流程如下所述

  1. TASK_ADC 等待消息的到来
  2. ADC_ISR 获取原始 ADC 采集值,然后将采集值写入全局变量 adc_value 中,并且发送消息给 TASK_ADC 表示采集完成
  3. TASK_ADC 消息挂起,退出阻塞状态,然后对存入全局变量 adc_value 中的采集值进行处理,最后通过串口将信息输出

本实验要实现的目的仍然为单通道的 ADC 采集,只不过不需要定义全局变量 adc_value 来存储采集到的原始 ADC 的值,之前提到过启用任务通知后,
任务会有一个 32 位的通知值,当我们需要传递的数据为 32 位或更低位的数据时我们可以用这个通知值来直接传递数据

但是注意不是所有情况下都可以用来传递数据的,这要根据
eAction
参数来决定,具体可以参考 “3.5.1
eAction
参数” 小节内容,
这里我们将其选择为 eSetValueWithOverwrite ,但是要注意这时只能传递一个数据,传递完毕如果接收端不处理下次该数据就会被覆盖掉
,本实验流程如下所述

  1. TASK_ADC 等待消息的到来
  2. ADC_ISR 获取原始 ADC 采集值,将原始 ADC 采集值作为通知值传递给 TASK_ADC
  3. TASK_ADC 消息挂起,然后退出阻塞状态,取出通知值进行处理,最后通过串口将信息输出

5.2、CubeMX相关配置

复制 “4、实验一:使用任务通知替代信号量” 小节配置好的工程即可

5.3、添加其他必要代码

打开工程代码,修改 TASK_ADC 任务函数体和 ADC 采集完毕中断回调函数,具体如下所述

/*ADC处理任务*/
void TASK_ADC(void *argument)
{
	/* USER CODE BEGIN TASK_ADC */
	//定义一个变量用于表示任务待处理的事件数量
	uint32_t notifyValue;
	/* Infinite loop */
	for(;;)
	{
		//进入xTaskNotifyWait函数时不清除任何位
		uint32_t ulBitsToClearOnEntry = 0x00;
		//退出xTaskNotifyWait函数时清除所有位
		uint32_t ulBitsToClearOnExit = 0xFFFFFFFF;
		//等待任务通知
		BaseType_t result = xTaskNotifyWait(ulBitsToClearOnEntry, ulBitsToClearOnExit, &notifyValue, portMAX_DELAY);
		//如果等到事件
		if(result == pdTRUE)
		{
			//对采集值处理并通过串口输出
			uint32_t Volt = (3300 * notifyValue)>>12;
			printf("val:%d, Volt:%d\r\n", notifyValue, Volt);
		}
	}
  /* USER CODE END TASK_ADC */
}

/*ADC转换完成中断回调*/
void HAL_ADC_ConvCpltCallback(ADC_HandleTypeDef *hadc)
{
	//定时器中断启动单通道转换
	if(hadc->Instance == ADC1)
	{
		//获取原始ADC采集值
		uint32_t adc_value = HAL_ADC_GetValue(hadc);
		BaseType_t highTaskWoken = pdFALSE;
		if(task_ADCHandle != NULL)
		{
			//采集完毕后将采集值作为消息数据发送给TASK_ADC任务
			xTaskNotifyFromISR(task_ADCHandle, adc_value, eSetValueWithOverwrite, &highTaskWoken);
			portYIELD_FROM_ISR(highTaskWoken);
		}
	}
}

5.4、烧录验证

实验现象与 “
FreeRTOS教程5 信号量
” 文章 “4.4、烧录验证” 小节内容一样,此处不再赘述

6、注释详解

注释1
:图片来源于
Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide.pdf

参考资料

STM32Cube高效开发教程(基础篇)

Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide.pdf

前言

今天大姚给大家分享一款.NET开源(MIT License)、免费、功能强大的Windows快速文件搜索和应用程序启动器:Flow Launcher。

工具介绍

Flow Launcher 是一款方便实用的 Windows 文件搜索和应用程序启动器,能够帮助你快速查找文件、启动应用程序和执行系统操作,提高工作效率和操作便利性。并且生态完善,有插件商店,你可以查看完整的插件列表,或通过 "设置 "中的 "插件商店 "菜单快速安装插件。

支持语言

  • 支持拼音搜索。
  • 支持从中文、英文、意大利语等多种语言。

工具源代码

工具下载安装

工具快捷键

快捷键 描述
Alt
+
Space
打开搜索窗口(默认且可配置)
Enter 执行
Ctrl
+
Shift
+
Enter
以管理员身份运行

向上滚动和向下滚动

返回结果/打开上下文菜单
Ctrl
+
O
,
Shift
+
Enter
打开上下文菜单
Tab 自动完成
F1 切换预览面板(默认且可配置)
Esc 返回结果/隐藏搜索窗口
Ctrl
+
C
复制当前文件夹/文件
Ctrl
+
I
打开流程设置
Ctrl
+
R
再次运行当前查询(刷新结果)
F5 重新加载所有插件数据
Ctrl
+
F12
在搜索窗口中切换游戏模式
Ctrl
+
+
,
-
快速更改显示的最大结果数
Ctrl
+
[
,
]
快速更改搜索窗口宽度
Ctrl
+
H
打开搜索历史记录
Ctrl
+
Backspace
返回到上一个目录

部分功能截图

项目源码地址



更多项目实用功能和特性欢迎前往项目开源地址查看

一、Redis 支持三种主要的集群模式

  1. 主从复制模式(Master-Slave Replication):


    • 在这种模式下,主节点(Master)负责处理写入操作,而从节点(Slave)则是主节点的副本,用于处理读取操作和提供数据冗余。这种模式通过复制主节点的数据到多个从节点来提高数据的可用性和读取性能。当主节点发生故障时,可以从从节点中选举一个新的主节点,以此来实现高可用性。
  2. 哨兵模式(Sentinel):


    • 哨兵模式是在主从复制模式的基础上增加了自动故障转移的功能。哨兵节点(Sentinel)监控主节点和从节点的状态,当主节点出现故障时,哨兵会自动将一个从节点升级为新的主节点,并通知其他从节点和客户端新的主节点地址,以此来保证Redis服务的高可用性。
  3. 集群模式(Cluster):


    • Redis集群模式通过数据分片(sharding)来实现分布式存储。集群中的每个节点负责一部分数据(即一部分哈希槽),并且每个主节点可以有一个或多个从节点。集群模式不仅支持数据的自动分区,还支持主从复制和故障转移,从而实现了高可用性和可扩展性。

这三种模式各有特点和适用场景,可以根据业务需求和系统架构来选择合适的集群模式。主从复制模式和哨兵模式主要用于提高数据的可用性和读取性能,而集群模式则更适合于需要横向扩展和处理大量数据的场景。

二、Redis集群常用运维指令

Redis集群模式下的运维指令主要用于集群的搭建、管理、监控和维护。以下是一些常用的Redis集群运维指令:

  1. 创建集群:


    redis-cli --cluster create host1:port1 host2:port2 ... hostN:portN --cluster-replicas N

    这个命令用于创建一个新的Redis集群,其中
    host1:port1 ... hostN:portN
    是集群节点的地址和端口,
    --cluster-replicas N
    指定了每个主节点的从节点数量。

  2. 检查集群状态:


    redis-cli --cluster check host:port

    这个命令用于检查指定Redis集群节点的状态。

  3. 获取集群信息:


    redis-cli --cluster info host:port

    这个命令用于获取集群的相关信息,如集群状态、节点信息等。

  4. 添加节点到集群:


    redis-cli --cluster add-node new_host:new_port existing_host:existing_port node_id

    这个命令用于将新节点添加到现有的集群中。
    new_host:new_port
    是新节点的地址和端口,
    existing_host:existing_port
    是现有集群中任意节点的地址和端口,
    node_id
    是现有节点的ID。

  5. 删除节点从集群:


    redis-cli --cluster del-node host:port node_id

    这个命令用于从集群中删除指定的节点。
    host:port
    是执行命令的节点地址和端口,
    node_id
    是要删除的节点ID。

  6. 重新分配槽位:


    redis-cli --cluster reshard host:port --cluster-from node_id --cluster-to new_node_id --cluster-slots num_slots

    这个命令用于在集群中重新分配槽位。
    --cluster-from
    指定源节点ID,
    --cluster-to
    指定目标节点ID,
    --cluster-slots
    指定要迁移的槽位数量。

  7. 集群重平衡:


    redis-cli --cluster rebalance host:port --cluster-weight node1
    =w1
    ...
    nodeN
    =wN

    这个命令用于根据指定的权重对集群节点的槽位进行重平衡。

  8. 设置节点超时时间:


    redis-cli --cluster set-timeout host:port milliseconds

    这个命令用于设置集群节点的超时时间。

  9. 导入外部Redis数据到集群:


    redis-cli --cluster import host:port --cluster-from host:port

    这个命令用于将外部Redis实例的数据导入到集群中。

  10. 执行集群操作:


    redis-cli --cluster call host:port command arg1 arg2
    ...

    这个命令用于在集群的所有节点上执行指定的命令。

三、Redis有哪几种数据结构,分别的适用场景

Redis 支持多种数据结构,每种数据结构都有其特定的使用场景和优势。以下是 Redis 的主要数据结构及其适用场景:

  1. 字符串(String):


    • 适用场景:字符串是最基本的类型,可以存储任何形式的数据,比如文本、数字、JSON 等。常用于缓存功能,如缓存用户的会话信息、存储配置参数、计数器等。
  2. 列表(List):


    • 适用场景:列表是一个有序的字符串集合,可以实现栈(先进后出)或队列(先进先出)的功能。适用于消息队列、最新列表(如微博动态)、排行榜等场景。
  3. 集合(Set):


    • 适用场景:集合是一个无序且元素唯一的集合。适用于存储不重复的数据集、实现共同好友功能、标签系统、发布/订阅模型等。
  4. 有序集合(Sorted Set):


    • 适用场景:有序集合中的每个元素都关联一个分数(score),元素按分数有序排列。适用于排行榜、范围查询(如获取排名前 10 的用户)、计分板等。
  5. 哈希表(Hash):


    • 适用场景:哈希表是一个键值对集合,适合存储对象。适用于存储用户信息、缓存网站对象、存储多个相关字段的数据等。
  6. 位图(Bitmap):


    • 适用场景:位图是字符串的特例,通常用于表示大量的布尔值。适用于统计活跃用户、权限控制、状态标记等。
  7. 超日志(HyperLogLog):


    • 适用场景:超日志是一种概率数据结构,用于高效地估算集合中唯一元素的数量(基数)。适用于统计网站访问独立IP数、分析大集合的基数等。
  8. 地理空间(Geo):


    • 适用场景:地理空间数据结构用于存储地理位置信息,并能够执行地理位置查询。适用于地理位置索引、附近位置查询、位置跟踪等。
  9. 流(Stream):


    • 适用场景:流数据结构是 Redis 5.0 版本引入的,适用于构建消息队列、实现时间序列数据模型、提供持久化消息队列功能等。

每种数据结构都有其特定的命令集来操作,可以根据实际业务需求选择合适的数据结构来优化性能和存储效率。在实际应用中,有时候也会将多种数据结构组合使用,以满足更复杂的业务逻辑。

写在开头

在之前的学习我们了解到,为了充分利用缓存,提高程序的执行速度,编译器在底层执行的时候,会进行指令重排序的优化操作,但这种优化,在有些时候会带来
有序性
的问题。

那何为有序性呢?我们可以通俗理解为:
程序执行的顺序要按照代码的先后顺序。
当然,之前我们还说过发生有序性问题时,我们可以通过给变量添加volatile修饰符进行解决。那么今天,我们继续学习,一起探讨一下volatile与指令重排之间的冤家路窄!

有序性问题

首先,我们来回顾一下之前写的一个关于有序性问题的测试类。

【代码示例1】

int a = 1;(1)
int b = 2;(2)
int c = a + b;(3)

上面的这段代码中,c变量依赖a,b的值,因此,在编译器优化重排时,c肯定会在a,b赋值以后执行,但a,b之间没有依赖关系,可能会发生重排序,但这种重排序即便到了多线程中依旧不会存在问题,因为即便重排对执行结果也无影响。

但有些时候,指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致,我们继续看下面这段代码:

【代码示例2】

public class Test {

    private static int num = 0;
    private static boolean ready = false;
    //禁止指令重排,解决顺序性问题
    //private static volatile boolean ready = false;

    public static class ReadThread extends Thread {

        @Override
        public void run() {

            while (!Thread.currentThread().isInterrupted()) {
                if (ready) {//(1)
                    System.out.println(num + num);//(2)
                }
                System.out.println("读取线程...");
            }
        }
    }

    public static class WriteRead extends Thread {

        @Override
        public void run() {
            num = 2;//(3)
            ready = true;//(4)
            System.out.println("赋值线程...");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadThread rt = new ReadThread();
        rt.start();

        WriteRead wr = new WriteRead();
        wr.start();

        Thread.sleep(10);
        rt.interrupt();
        System.out.println("rt stop...");
    }
}

我们定义了2个线程,一个用来求和操作,一个用来赋值操作,因为定义的是成员变量,所以代码(1)(2)(3)(4)之间不存在依赖关系,在运行时极可能发生指令重排序,如将(4)在(3)前执行,顺序为(4)(1)(3)(2),这时输出的就是0而不是4,但在很多性能比较好的电脑上,这种重排序情况不易复现。
这时,我们给ready 变量添加一个volatile关键字,就成功的解决问题了。

原因解析

volatile关键字可以禁止指令重排的原因主要有两个!

一、3 个 happens-before 规则的实现

  1. 对一个 volatile 变量的写 happens-before 任意后续对这个 volatile 变量的读;
  2. 一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作;
  3. happens-before 传递性,A happens-before B,B happens-before C,则 A happens-before C。

二、内存屏障
变量声明为 volatile 后,在对这个变量进行读写操作的时候,会通过插入特定的
内存屏障
的方式来禁止指令重排序。

内存屏障(Memory Barrier 又称内存栅栏,是一个 CPU 指令),为了实现volatile 内存语义,volatile 变量的写操作,在变量的前面和后面分别插入内存屏障;volatile 变量的读操作是在后面插入两个内存屏障。

具体屏障规则:

  1. 在每个 volatile 写操作的前面插入一个 StoreStore 屏障;
  2. 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障;
  3. 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障;
  4. 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。

屏障说明:

  1. StoreStore:禁止之前的普通写和之后的 volatile 写重排序;
  2. StoreLoad:禁止之前的 volatile 写与之后的 volatile 读/写重排序;
  3. LoadLoad:禁止之后所有的普通读操作和之前的 volatile 读重排序;
  4. LoadStore:禁止之后所有的普通写操作和之前的 volatile 读重排序。

OK,知道了这些内容之后,我们再回头看代码示例2中,增加了volatile关键字后的执行顺序,在赋值线程启动后,执行顺序会变成(3)(4)(1)(2),这时打印的结果就为4啦!

volatile为什么不能保证原子性?

我们讲完了volatile修饰符保证可见性与有序性的内容,接下来我们思考另外一个问题,它能够保证原子性吗?为什么?我们依旧通过一段代码去证明一下!

【代码示例3】

public class Test {
    //计数变量
    static volatile int count = 0;
    public static void main(String[] args) throws InterruptedException {
        //线程 1 给 count 加 10000
        Thread t1 = new Thread(() -> {
            for (int j = 0; j <10000; j++) {
                count++;
            }
            System.out.println("thread t1 count 加 10000 结束");
        });
        //线程 2 给 count 加 10000
        Thread t2 = new Thread(() -> {
            for (int j = 0; j <10000; j++) {
                count++;
            }
            System.out.println("thread t2 count 加 10000 结束");
        });
        //启动线程 1
        t1.start();
        //启动线程 2
        t2.start();
        //等待线程 1 执行完成
        t1.join();
        //等待线程 2 执行完成
        t2.join();
        //打印 count 变量
        System.out.println(count);
    }
}

我们创建了2个线程,分别对count进行加10000操作,理论上最终输出的结果应该是20000万对吧,但实际并不是,我们看一下真实输出。

输出:

thread t1 count 加 10000 结束
thread t2 count 加 10000 结束
14281

原因:
Java 代码中 的 count++并非原子的,而是一个复合性操作,至少需要三条CPU指令:

  • 指令 1:把变量 count 从内存加载到CPU的寄存器
  • 指令 2:在寄存器中执行 count + 1 操作
  • 指令 3:+1 后的结果写入CPU缓存或内存

即使是单核的 CPU,当线程 1 执行到指令 1 时发生线程切换,线程 2 从内存中读取 count 变量,此时线程 1 和线程 2 中的 count 变量值是相等,都执行完指令 2 和指令 3,写入的 count 的值是相同的。从结果上看,两个线程都进行了 count++,但是 count 的值只增加了 1。这种情况多发生在cpu占用时间较长的线程中,若单线程对count仅增加100,那我们就很难遇到线程的切换,得出的结果也就是200啦。

要想解决也很简单,利用 synchronized、Lock或者AtomicInteger都可以,我们在后面的文章中会聊到的,请继续保持关注哦!

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得
留言+点赞+收藏
呀。原创不易,转载请联系Build哥!

image

如果您想与Build哥的关系更近一步,还可以关注“JavaBuild888”,在这里除了看到《Java成长计划》系列博文,还有提升工作效率的小笔记、读书心得、大厂面经、人生感悟等等,欢迎您的加入!

image


引言


大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第十三篇内容:阻塞队列。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!

在多线程编程的世界里,生产者-消费者问题是一个经典且频繁出现的场景。设想这样一个情况:有一群持续不断地生产资源的线程(我们称之为“生产者”),以及另一群持续消耗这些资源的线程(称为“消费者”)。他们共享一个缓冲池,生产者将新生成的资源存入其中,而消费者则从缓冲池中取出并处理这些资源。这种设计模式有效地简化了并发编程的复杂性,一方面消除了生产者与消费者类之间的代码耦合,另一方面通过解耦生产和消费过程,使得系统可以更灵活地分配和调整负载。

然而,在实际实现过程中,尤其是在Java等支持多线程的语言中,直接操作共享变量来同步生产和消费行为会带来诸多挑战。如果没有采取适当的同步机制,当多个生产者或消费者同时访问缓冲池时,很容易造成数据竞争、重复消费甚至是死锁等问题。例如,当缓冲池为空时,消费者应被阻塞以免无谓地消耗CPU资源;而当缓冲池已满时,则需要阻止生产者继续添加元素,转而唤醒等待中的消费者去消耗资源。

为了解决上述难题,Java标准库提供了强大的工具——
java.util.concurrent.BlockingQueue
接口及其实现类。阻塞队列作为Java并发编程的重要组成部分,允许开发者无需手动处理复杂的线程同步逻辑,只需简单地向队列中添加或移除元素,即可确保线程安全的操作。无论是插入还是获取元素的操作,若队列当前状态不允许该操作执行,相应的线程会被自动阻塞,直至条件满足时再被唤醒。

举例来说,我们可以创建一个
ArrayBlockingQueue
实例,设置其容量大小,并让生产者线程通过调用
put()
方法将新生产的对象放入队列,如果队列已满,
put()
方法会阻塞生产者线程直到有消费者线程从队列中移除了某个元素腾出空间为止:

ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 创建一个容量为10的阻塞队列

// 生产者线程
new Thread(() -> {
    for (int i = 0; ; i++) { // 不断生产资源
        try {
            queue.put(i); // 尝试将资源放入队列,若队列满则阻塞
            System.out.println("生产者放入了一个资源:" + i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}).start();

// 消费者线程
new Thread(() -> {
    while (true) { // 不断消费资源
        try {
            Integer resource = queue.take(); // 尝试从队列中取出资源,若队列空则阻塞
            System.out.println("消费者消费了一个资源:" + resource);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}).start();

总之,借助阻塞队列这一特性,程序员能更专注于业务逻辑,而不必过分担忧底层的线程同步问题,从而极大地提升了并发程序的设计效率和可靠性。在接下来的内容中,我们将深入探讨阻塞队列的具体操作方法、多种实现类及其内部工作原理,并结合实际案例来进一步理解它在Java多线程编程中的核心价值。


阻塞队列作用


阻塞队列的由来与作用在多线程编程中扮演着至关重要的角色。其诞生源于解决生产者-消费者问题这一经典的并发场景,它有效地降低了开发复杂度,并确保了数据交换的安全性。

在传统的生产者-消费者模式下,假设存在多个生产者线程和消费者线程,它们共享一个有限容量的缓冲池(或称为队列)。生产者线程负责生成资源并将其存入缓冲池,而消费者线程则从缓冲池取出资源进行消费。如果直接使用普通的非同步队列,在多线程环境下进行资源的存取操作时,可能会出现以下问题:

  1. 线程安全问题 :当多个线程同时访问同一个队列时,可能出现竞态条件导致的数据不一致,例如重复消费、丢失数据或者数据状态错乱。
  2. 死锁与活跃性问题 :在没有正确同步机制的情况下,生产者和消费者线程可能陷入互相等待对方释放资源的状态,从而导致死锁;或者当缓冲区已满/空时,线程因无法继续执行而进入无限期等待状态,影响系统整体的效率和响应性。
  3. 自定义同步逻辑复杂 :为了解决上述问题,开发者需要自行编写复杂的等待-通知逻辑,即当队列满时阻止生产者添加元素,唤醒消费者消费;反之,当队列空时阻止消费者获取元素,唤醒生产者填充资源。这些逻辑容易出错且不易维护。

Java平台通过引入
java.util.concurrent.BlockingQueue
接口及其一系列实现类,大大简化了生产者-消费者问题的解决方案。BlockingQueue不仅提供了线程安全的队列访问方式,而且自动处理了上述的各种同步问题,使得生产者和消费者能够自然地协作,无需关注底层的线程同步细节。

举例来说,下面是一个使用ArrayBlockingQueue作为共享资源容器的简单生产者-消费者示例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueExample {
    static final int QUEUE_CAPACITY = 10;
    static ArrayBlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    public static void main(String[] args) {
        Thread producerThread = new Thread(() -> produce());
        Thread consumerThread = new Thread(() -> consume());

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void produce() {
        for (int i = 0; ; i++) {
            try {
                sharedQueue.put(i);
                System.out.println("生产者放入了一个元素:" + i);
                TimeUnit.MILLISECONDS.sleep(100); // 模拟生产间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    static void consume() {
        while (true) {
            try {
                Integer item = sharedQueue.take();
                System.out.println("消费者消费了一个元素:" + item);
                TimeUnit.MILLISECONDS.sleep(150); // 模拟消费间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

在这个例子中,生产者线程调用
put()
方法将整数元素添加到ArrayBlockingQueue中,当队列满时,该方法会阻塞生产者直到有空间可用。消费者线程则通过调用
take()
方法从队列中移除并消费元素,当队列为空时,消费者会被阻塞直至有新的元素被加入。这样,阻塞队列充当了协调生产者和消费者工作节奏的核心组件,保证了整个系统的稳定性和高效运行。


阻塞队列的操作方法详解


阻塞队列的操作方法详解是理解和使用Java并发包中
java.util.concurrent.BlockingQueue
的关键部分。它提供了一系列丰富的方法来插入、移除和检查元素,这些方法在处理多线程环境下共享数据时确保了线程安全,并能够根据不同的需求采取不同的策略。

抛出异常操作:

  • add(E e) :如果尝试向满的队列添加元素,则抛出 IllegalStateException("Queue full") 异常。
  • remove() :若队列为空则抛出 NoSuchElementException 异常,用于移除并返回队列头部的元素。
  • element() :返回但不移除队列头部的元素,同样在队列为空时抛出 NoSuchElementException 异常。

返回特殊值操作:

  • offer(E e) :尝试将元素放入队列,如果队列已满则返回 false ,否则返回 true 表示成功加入。
  • poll() :尝试从队列中移除并返回头部元素,若队列为空则返回 null
  • peek() :查看队列头部元素而不移除,队列为空时也返回 null

一直阻塞操作:

  • put(E e) :将指定元素添加到队列中,如果队列已满,则当前线程会被阻塞直到有空间可用。
  • take() :从队列中移除并返回头部元素,如果队列为空,调用此方法的线程会阻塞等待其他线程存入元素。

超时退出操作:

  • offer(E e, long timeout, TimeUnit unit) :试图将元素添加到队列,若在给定超时时间内仍无法加入,则返回 false ,否则返回 true
  • poll(long timeout, TimeUnit unit) :试图从队列中移除并返回一个元素,若在给定超时时间内队列依然为空,则返回 null

举例说明,以下代码展示了如何使用
BlockingQueue
的一些基本操作:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    static final int QUEUE_CAPACITY = 5;
    static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    public static void main(String[] args) throws InterruptedException {
        // 使用put()方法添加元素,当队列满时阻塞生产者
        for (int i = 0; i < 7; i++) {
            queue.put("Item " + i);
            System.out.println("已放入: " + "Item " + i);
        }

        // 使用take()方法消费元素,当队列空时阻塞消费者
        while (!queue.isEmpty()) {
            String item = queue.take();
            System.out.println("已取出: " + item);
        }

        // 使用offer()方法尝试添加,不会阻塞生产者
        if (!queue.offer("额外项"1, TimeUnit.SECONDS)) {
            System.out.println("添加失败,队列已满或超时");
        }
    }
}

在上述示例中,
ArrayBlockingQueue
的容量为5,当尝试通过
put()
方法添加第6个元素时,生产者线程将会被阻塞;而消费者线程通过
take()
方法逐个取出元素时,如果遇到队列为空的情况,也会被阻塞直至新的元素加入。此外,我们还演示了
offer()
方法配合超时参数,在指定的时间内尝试添加元素,超过这个时间限制仍未成功添加时,方法会立即返回结果而不是继续阻塞。


阻塞队列的实现类


阻塞队列的实现类解析是深入理解Java并发编程中BlockingQueue接口的关键环节。Java标准库提供了多种阻塞队列的实现,每种都有其特定的设计和适用场景。

ArrayBlockingQueue

ArrayBlockingQueue基于数组结构,因此具有固定容量,并且支持公平或非公平锁策略。构造时需要指定容量大小,一旦创建后无法更改。如下示例代码创建了一个容量为10的公平锁ArrayBlockingQueue:

ArrayBlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(10true);

该队列在满或者空时,会通过内部维护的notEmpty和notFull条件变量来控制生产者和消费者的阻塞与唤醒。

LinkedBlockingQueue

LinkedBlockingQueue使用链表数据结构,可以设置初始容量(默认值为
Integer.MAX_VALUE
),意味着如果不指定容量,则它是一个无界队列。此队列遵循先进先出(FIFO)原则。以下是如何创建一个初始容量为20的LinkedBlockingQueue:

LinkedBlockingQueue<Integer> linkedQueue = new LinkedBlockingQueue<>(20);

DelayQueue

DelayQueue中的元素必须实现Delayed接口,每个元素都有一个可延迟的时间,只有当这个延迟时间过期后,消费者才能从队列中取出该元素。这种特性适用于处理定时任务等场景。以下是如何向DelayQueue添加一个延时对象:

class DelayedTask implements Delayed {
    // 实现Delayed接口的方法
}

DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(...)); // 填充带有延迟信息的任务

PriorityBlockingQueue

PriorityBlockingQueue是一种无界的优先级队列,元素按照优先级顺序被取出。优先级通过构造函数传入的Comparator决定,若不提供则按元素的自然排序。下面是如何创建并插入一个根据自定义比较器排序的队列:

class Task implements Comparable<Task{
    int priority;
    // 实现Comparable接口的方法
}

Comparator<Task> comparator = Comparator.comparing(Task::getPriority);
PriorityBlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(10, comparator);
priorityQueue.put(new Task(...));

SynchronousQueue

SynchronousQueue是一种特殊的阻塞队列,它没有内部容量,始终要求生产和消费操作完全匹配:每个put操作都需要有对应的take操作同时发生,反之亦然。对于希望直接传递对象而不进行存储的场景非常有用。下面是SynchronousQueue的基本用法:

SynchronousQueue<Integer> syncQueue = new SynchronousQueue<>();
Thread producerThread = new Thread(() -> {
    try {
        syncQueue.put(1); // 这里将一直阻塞,直到有消费者线程调用take()
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
Thread consumerThread = new Thread(() -> {
    try {
        Integer value = syncQueue.take(); // 这里将一直阻塞,直到有生产者线程调用put()
        System.out.println("Consumed: " + value);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producerThread.start();
consumerThread.start();

总之,不同类型的阻塞队列设计各异,开发者应根据实际应用场景选择合适的阻塞队列实现,以充分利用它们各自的优势,确保多线程环境下的高效、安全同步。


阻塞队列的原理剖析


阻塞队列的原理剖析主要围绕其如何利用Java并发包中的锁和条件变量机制来实现线程间的高效同步。以ArrayBlockingQueue为例,其内部使用了ReentrantLock以及两个Condition对象notEmpty和notFull来进行生产和消费过程的控制。

锁(ReentrantLock)的作用
在ArrayBlockingQueue中,所有对共享资源的操作都被保护在一个ReentrantLock之内,确保同一时间只有一个线程能够执行put或take操作。例如,当一个生产者线程试图向满的队列中添加元素时,它必须首先获取到lock锁,否则将被阻塞在外等待。

final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁,支持中断

条件变量(Condition)的运用

  • notEmpty:当队列为空时,消费者线程调用 take() 方法会阻塞并注册到notEmpty条件上,直到有生产者线程put了一个新元素进入队列,并通过 notEmpty.signal() 唤醒消费者线程继续执行。
  • notFull:反之,当队列已满时,生产者线程调用 put() 方法会被阻塞并注册到notFull条件上,直到有消费者线程从队列中取走一个元素,使得队列不满,然后通过 notFull.signal() 唤醒生产者线程继续插入元素。
while (count == items.length) { // 判断队列是否已满
    notFull.await(); // 生产者线程在此阻塞等待
}
enqueue(e); // 添加元素至队列

// 对于消费者线程:
while (count == 0) { // 判断队列是否为空
    notEmpty.await(); // 消费者线程在此阻塞等待
}
return dequeue(); // 从队列移除并返回一个元素

put与take操作流程详解

  • put(E e) 方法:生产者线程首先尝试获取锁,如果成功则检查队列是否已满,未满则直接加入元素并唤醒一个等待的消费者线程;若队列已满,则当前线程会在notFull条件上等待,直至其他线程消费元素后释放空间。
  • take() 方法:消费者线程同样先尝试获取锁,如果成功则检查队列是否为空,不为空则立即移除并返回一个元素,并唤醒一个等待的生产者线程;若队列为空,则当前线程在notEmpty条件上等待,直至其他线程放入元素后提供可消费的数据。

总结来说,阻塞队列通过巧妙地结合ReentrantLock及其内部的多个Condition对象实现了线程间的协作与同步,确保了生产者线程在队列未满时可以顺利地添加元素,而消费者线程则在队列非空时能及时消费元素。这种设计避免了线程间的无效竞争和资源浪费,保证了多线程环境下的数据一致性及程序性能。


阻塞队列的应用实例与场景


阻塞队列在多线程编程中具有广泛的应用,特别是在生产者-消费者模式、任务调度以及线程池管理等场景中扮演着至关重要的角色。

生产者-消费者模型实例与分析
在一个典型的生产者-消费者场景中,我们可以使用ArrayBlockingQueue来实现两个线程间的同步交互。下面是一个简化的示例代码:

import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    private static final int QUEUE_CAPACITY = 10;
    private final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    public static void main(String[] args) throws InterruptedException {
        Test test = new Test();
        Thread producer = new Thread(test.new Producer());
        Thread consumer = new Thread(test.new Consumer());

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                try {
                    queue.put(i);
                    System.out.println("生产者插入了一个元素:" + i + ",队列剩余空间:" + (QUEUE_CAPACITY - queue.size()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Integer item = queue.take();
                    System.out.println("消费者消费了一个元素:" + item + ",当前队列大小:" + queue.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

在这个例子中,生产者线程持续地向ArrayBlockingQueue中添加整数,当队列满时,put操作会自动阻塞;而消费者线程则不断从队列中移除并打印元素,当队列为空时,take操作也会被阻塞。通过这种方式,阻塞队列成功协调了两个线程的执行节奏,避免了资源竞争和数据不一致的问题。

线程池中的应用
Java线程池(ThreadPoolExecutor)是另一个利用阻塞队列作为核心组件的典型例子。在创建线程池时,可以指定一个BlockingQueue作为工作队列,用于存储待执行的任务。当核心线程忙碌或超出其最大容量时,新提交的任务会被放入此队列中等待执行。如下所示:

import java.util.concurrent.*;

public class ThreadPoolExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5// 核心线程数
            10// 最大线程数
            60// 空闲线程存活时间
            TimeUnit.SECONDS,
            workQueue // 使用LinkedBlockingQueue作为工作队列
        );

        // 提交多个任务到线程池
        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                // 执行具体任务逻辑
                System.out.println("正在执行任务:" + Thread.currentThread().getName());
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,ThreadPoolExecutor内部的工作机制正是依赖于阻塞队列对任务进行缓存和分配。当线程池无法立即处理所有提交的任务时,新的任务会被放入LinkedBlockingQueue中排队等待,直到有空闲的线程可用。这种设计极大地提高了系统处理并发任务的能力,并保证了线程资源的有效利用。


总结


在深入浅出Java多线程之阻塞队列的学习过程中,我们已经了解到阻塞队列作为Java并发编程中的重要工具,它不仅简化了生产者-消费者模式的实现,还有效地解决了线程间同步问题。通过ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue和SynchronousQueue等不同的实现类,我们可以根据实际需求选择适合的阻塞队列类型,以确保线程安全地存储和传递数据。

回顾本篇文档中给出的实例,我们可以看到阻塞队列在多线程环境下的高效运作机制,比如在生产者-消费者模型中,生产者线程使用
put()
方法将元素放入队列,并在队列满时被阻塞;而消费者线程利用
take()
方法从队列中取出元素,在队列空时也被相应地阻塞。这种设计使得系统无需显式处理复杂的等待-通知逻辑,极大地提高了程序开发效率和系统的稳定性。

此外,阻塞队列还在Java线程池(ThreadPoolExecutor)中扮演着核心角色,作为任务缓冲区,保证了线程资源的有效分配和调度。例如,当新任务提交到已饱和的线程池时,任务会被暂存于工作队列中,如LinkedBlockingQueue,等待线程执行完成后再从队列中取出并执行。

总结来说,阻塞队列是Java并发编程的核心组件之一,熟练运用它可以更好地解决多线程间的同步问题,提高系统整体性能。