2023年4月

前言

今天分享一下spring bean的作用域,理解bean的作用域能够在使用过程中避免一些问题,bean的作用域也是spring bean创建过程中一个重要的点。

Spring bean的作用域类型

  • singleton(单例模式):在整个应用程序的生命周期中,只创建一个Bean实例。默认情况下,所有的Bean都是单例模式。

  • prototype(原型模式):每次请求Bean时都会创建一个新的实例。

  • request(请求模式):每次HTTP请求都会创建一个新的实例,该作用域仅适用于Web应用程序的Spring MVC应用程序。

  • session(会话模式):每个用户会话都有一个Bean实例,该作用域仅适用于Web应用程序的Spring MVC应用程序。

  • globalSession(全局会话模式):用于Portlet应用程序,一个全局会话有一个Bean实例。

  • application(应用程序模式):在Web应用程序上下文中,只创建一个Bean实例。

从上面我们能看出spring有很多种作用域,不过在实际使用中,我们基本都使用单例bean,这也是spring bean的默认作用域,单例bean能提高性能,因为全局只存在一个bean实例,而多例bean则每次请求都需要创建bean实例,当并发比较大的时候,就会比较影响性能,所以这时候就需要使用单例bean,但是单例bean可能会存在线程安全问题,当bean中存在可变的成员变量时,就会存在线程安全问题,而多例bean因为每次都会创建一个实例,所以不存在bean共享,就不存在线程安全问题。

使用作用域

现在我们基本都是使用SpringBoot,都是基于注解开发,我们可以使用注解@Scope设置bean的作用域,可以标注在类上和方法上。

标注方法

当我们使用@Bean定义Bean的时候,设置作用域可以直接在方法上面使用@Scope。

@Bean
@Scope("prototype")
public ScopeBean scopeBean(){
    return new ScopeBean();
}

标注类

使用@Component,@Service等注解定义bean时,设置作用域直接在类上添加@Scope。

@Component
@Scope("prototype")
public class ScopeBean {

}

获取bean和创建bean

如果是单例bean,在spring容器启动的时候会将Bean注册进IOC容器中,如果是多例bean,则不注册进IOC容器中。

bean在实例化前会把bean的基础信息组装成BeanDefinition,然后保存到BeanFactory中,如果是单例的bean,则会继续对bean处理后将bean注册到IOC容器中,如果是多例bean,则不注册进,当从IOC容器中获取bean时,如果是单例则从IOC容器中获取已经创建好的bean,如果是多例,则从BeanFactory中获取bean的定义信息BeanDefinition,然后重复和单例bean一样的步骤创建bean,只是创建后它不会保存进IOC容器,而是直接返回。

作用域和懒加载

如果bean设置为懒加载(Lazy)和单例bean,那么bean在spring启动时不会注册进容器中,而是等第一次获取bean的时候在创建bean,后面获取bean的时候再从IOC容器中获取,如果设置为多例bean,那么懒加载其实是没用的。

源码解析

源码比较简单,其实就是判断bean的类型,然后再进入不同的逻辑进行处理,在AbstractBeanFactory中,通过doGetBean()方法获取bean。

判断bean是否存在容器中

下图通过getSingleton()方法从singletonObjects中获取bean,如果获取到直接返回,代表bean已经创建过勒,已经存在与IOC容器中,singletonObjects就是整个spring中装bean实例的容器,他就是一个Map,spring整个bean的创建过程目的就是把bean放进它里面,
所以spring简单得不行,但是更复杂得不行

bean不存在,则创建

下图有两个判断,判断单例bean和多例bean,单例bean会创建bean后然后加入到singletonObjects中,多例bean则直接返回,他们的创建都是走同一个方法,同一样的流程。

具体的创建流程就不一一细说,只是从大体思路去讲解,因为往里面说就贴出大篇代码,没啥必要,感兴趣可以去看源码。

总结

上面说了bean的作用域类型,并对每种作用域类型进行分析,最常用的其实就是单例,并对多例和单例使用过程中的一些注意事项进行讲解,接着说了怎么使用作用域,还有懒加载和作用域的关联,并对bean的创建和获取从源码和白话进行大体的分析。

今天的分享就到这里,感谢你的观看,我们下期见!

前言

一、人物简介

  • 第一位闪亮登场,有请今后会一直教我们C语言的老师 —— 自在。

  • 第二位上场的是和我们一起学习的小白程序猿 —— 逍遥。

二、构成和表示方式

  • 关系运算符的作用是判断符号两边大小的关系
  • C语言中的关系运算符主要有六个,如下表所示
运算符 名称 示例 描述
== 等于 a == b 判断a是否等于b
!= 不等于 a != b 判断a是否不等于b
> 大于 a > b 判断a是否大于b
< 小于 a < b 判断a是否小于b
>= 大于等于 a >= b 判断a是否大于等于b
<= 小于等于 a <= b 判断a是否小于等于b
  • 关系运算符的两边可以是变量、数值 或 表达式

*以下内容为本人的学习笔记,如需要转载,请声明原文链接
微信公众号「ENG八戒」
https://mp.weixin.qq.com/s/aFeiOGO-N9O7Ab_8KJ2wxw

开发者虽然主要负责工程里的开发任务,但是每个开发完毕的功能都是需要开发者自测通过的,所以经常会听到开发者提起单元测试的话题。那么今天我就带大伙一起来看看大名鼎鼎的谷歌 C++ 测试框架 GoogleTest.

本文上接《
C++ 测试框架 GoogleTest 初学者入门篇 甲
》,欢迎关注微信公众号【ENG八戒】查看更多精彩内容。

搜索框传播样式-标准色版.png


安装配置环境

Googletest 的安装配置过程并不复杂,但是对于小白来说还是有些困难,所以很有必要手把手说一下。

为了更清晰地说明环境配置过程,下面会选择对 Googletest 源码执行编译,然后安装。编译源码时,Googletest 官方手册推荐使用的工具有两种,这里只介绍其中的 Cmake。

其实在各个平台,用户都有使用 Googletest 的需求,这里专挑 Linux 和 Windows 平台来分别说明怎么安装配置 Googletest。

linux 平台

我的虚拟机里 Linux 系统是 Ubuntu 18.04,使用 apt 在线直接安装的 cmake 版本最高只能去到 3.10,为了顺利编译最新版的 Googletest 源码,选择安装官网最新版的 cmake。

安装最新版 cmake

先卸载原来的 cmake 版本,再添加指定安装源为 kitware.com 并更新安装缓冲,最后开始安装,下面的指令可依次操作

# 卸载 cmake
sudo apt purge --auto-remove cmake

# 下载 cmake 安装包,
wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | sudo tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null

sudo apt-add-repository 'deb https://apt.kitware.com/ubuntu/ bionic main'

sudo apt update

sudo apt install cmake

下载编译 gtest 开发包

下载安装源码包

sudo apt-get install libgtest-dev

进入源码安装目录,执行编译

cd /usr/src/gtest
sudo cmake CMakeLists.txt
sudo make

编译完之后,我看到网上有些教程会推荐直接拷贝库文件(.a)到 /usr/lib 目录

sudo cp *.a /usr/lib

不过,我建议是直接用 install 命令,这样库文件和头文件都可以一步部署到位

sudo make install

好了,到这里 ubuntu 下面的 googletest 就安装结束了,你也可用之前的应用样例来验证一下环境是否 OK 。

Windows 平台

在 Windows 平台安装最新版 cmake 比较简单,这里掠过。

我这里安装的版本是

cmake version 3.24.2

CMake suite maintained and supported by Kitware (kitware.com/cmake).

下载编译 googletest 源码包

这里选择 googletest 的最新稳定版源码包,可以去 github 下载,下面贴出我这边实验用的下载链接

https://codeload.github.com/google/googletest/zip/refs/tags/v1.13.0

下载之后解压到本地文件夹,进到这个本地文件夹,然后启动命令行终端 cmd,当前位置应该是这个本地文件夹,接着按照下面的指令去编译,默认输出
静态
的调试版本库文件(.lib)

mkdir build && cd build
cmake ..
cmake --build .

编译成功后,在目录 .\build\lib\Debug 下面多了很多生成文件,但是我们只需要这两个静态库文件

gtest.lib
gtest_main.lib

那么,怎么输出
动态
的调试版本库文件(.lib)呢?

按照上图修改保存,然后输入和上面一样的编译指令即可。

编译成功后,在目录 .\build\bin\Debug 下面多了很多生成文件,但是我们只要这两个动态库文件

gtest.dll
gtest_main.dll

另外,和编译静态库文件一样,在目录 .\build\lib\Debug 下生成的这两个静态库文件也是需要的,用于链接目标 exe 文件。

gtest.dll
gtest_main.dll

测试工程配置示例

新建一个测试工程 TEST,调用 googletest 库分为静态调用和动态调用,分别对应上一节的两种输出文件,现在就分开来演示一下怎么配置目标测试工程。

TEST 工程只有一个源码文件 main.cpp

调用静态调试库

首先,需要找到 googletest 的源码目录下的 .\googletest\include\gtest

把整个文件夹拷贝到目标测试工程 TEST 目录下,然后把 gtest 编译输出的静态库文件(.lib)也拷贝到目标测试工程 TEST 的 .\gtest\lib 目录下

然后看一下 CMakeLists.txt 可以这样写

cmake_minimum_required(VERSION 2.14)

project(runTests)

# Locate GTest
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
message("CMAKE_CURRENT_SOURCE_DIR:" ${CMAKE_CURRENT_SOURCE_DIR})

link_directories(${CMAKE_CURRENT_SOURCE_DIR}/gtest/lib)

# for window static debug
set(CMAKE_CXX_FLAGS_DEBUG "/MTd")
# for window dynamic debug
# set(CMAKE_CXX_FLAGS_DEBUG "/MDd")
# for window dynamic release
# set(CMAKE_CXX_FLAGS_DEBUG "/MD")
# for window static release
# set(CMAKE_CXX_FLAGS_DEBUG "/MT")

# Link runTests with what we want to test and the GTest and pthread library
add_executable(${PROJECT_NAME} main.cpp)
target_link_libraries(${PROJECT_NAME}
                        gtest
                        gtest_main)

Cmake 工程的编译方法都是一样的,照搬上面提到的编译指令即可,这里略过

调用动态调试库

配置过程,和调用静态调试库是差不多的,但是在把 gtest 编译输出的静态库文件(.lib)拷贝到目标测试工程 TEST 的 .\gtest\lib 目录下时,也需要把 gtest 编译输出的动态库文件(.dll)一起拷贝到目标测试工程 TEST 的 .\gtest\lib 目录下,这样的目的是为了在测试工程 TEST 编译的过程中,方便部署可执行文件(.exe)的运行环境,也就是 exe 文件依赖的 googletest 库文件。

然后看一下 CMakeLists.txt 可以这样写

# for default window dynamic debug
cmake_minimum_required(VERSION 2.14)

project(runTests)

set(DESTINATION_DIR ${PROJECT_BINARY_DIR}/Debug)
execute_process( COMMAND ${CMAKE_COMMAND} -E make_directory ${DESTINATION_DIR})
execute_process( COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/gtest/lib/gtest_main.dll ${DESTINATION_DIR})
execute_process( COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/gtest/lib/gtest.dll ${DESTINATION_DIR})

# Locate GTest
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
message("CMAKE_CURRENT_SOURCE_DIR:" ${CMAKE_CURRENT_SOURCE_DIR})

link_directories(${CMAKE_CURRENT_SOURCE_DIR}/gtest/lib)

# Link runTests with what we want to test and the GTest and pthread library
add_executable(${PROJECT_NAME} main.cpp)
target_link_libraries(${PROJECT_NAME}
                        gtest
                        gtest_main)

在 cmake 运行过程中,会把 exe 文件依赖的 googletest 库文件(.dll)拷贝到即将生成的 exe 文件的存放位置。


由于篇幅受限,本系列教程还未完结,下一篇《C++ 测试框架 GoogleTest 初学者入门篇 丙》将在本公众号稍后推送,如果你想看了解更多精彩内容,欢迎关注我的微信公众号 【ENG八戒】

学习可以等,时间不等人!

关注我,带你学习编程领域更多核心技能!

搜索框传播样式-标准色版.png

前言

介绍高性能队列Disruptor原理以及使用例子。

Disruptor是什么?

Disruptor是外汇和加密货币交易所运营商
LMAX group
建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标JDK中的ArrayBlockingQueue。是目前单机且基于内存存储的最高性能的队列实现。见
与ArrayBlockingQueue性能对比

Disruptor高性能秘诀

使用CAS代替锁

锁非常昂贵,因为它们在竞争时需要仲裁。这种仲裁是通过到操作系统内核的上下文切换来实现的,该内核将挂起等待锁的线程,直到它被释放。系统提供的原子操作CAS(Compare And Swap/Set)是很好的锁替代方案,Disruptor中同步就是使用的这种。

比如多生产者模式中com.lmax.disruptor.MultiProducerSequencer就是用了Java里sun.misc.Unsafe类基于CAS实现的API。

image-20210922160128392

等待策略com.lmax.disruptor.BlockingWaitStrategy使用了基于CAS实现的ReentrantLock。

image-20210922160301925

独占缓存行

为了提高效率CPU硬件不会以字节或字为单位移动内存,而是以缓存行,通常大小为 32-256 字节的缓存行,最常见的缓存行是 64 字节。这意味着,如果两个变量在同一个缓存行中,并且由不同的线程写入,那么它们会出现与单个变量相同的写入争用问题。为了获得高性能,如果要最小化争用,那么确保独立但同时写入的变量不共享相同的缓存行是很重要的。

比如com.lmax.disruptor.RingBuffer中属性前后都用未赋值的long来独占。com.lmax.disruptor.SingleProducerSequencerPad也有相同处理方式。

image-20210922151808613

image-20210922151833921

环形队列

  • 使用有界队列,减少线程争用

队列相比链表在访问速度上占据优势,而有界队列相比可动态扩容的无界队列则避免扩容产生的同步问题效率更高。Disruptor和JDK中的ArrayBlockingQueue一样使用有界队列。队列长度要设为2的n次幂,有利于二进制计算。

  • 使用环形数组,避免生产和消费速度差异导致队列头和尾争用

Disruptor在逻辑上将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。

  • Sequence

生产和消费都需要维护自增序列值(Sequence),从0开始。

生产方只维护一个代表生产的最后一个元素的序号。代表生产的最后一个元素的序号。每次向Disruptor发布一个元素都调用Sequenced.next()来获取下个位置的写入权。

在单生产者模式(SINGLE)由于不存在并发写入,则不需要解决同步问题。在多生产者模式(MULTI)就需要借助JDK中基于CAS(Compare And Swap/Set)实现的API来保证线程安全。

多个消费者各自维护自己的消费序列值(Sequence)保存数组中。

而环形通过与运算(sequence & indexMask)实现的,indexMask就是环形队列的长度-1。以环形队列长度8为例,第9个元素Sequence为8,8 & 7 = 0,刚好又回到了数组第1个位置。

见com.lmax.disruptor.RingBuffer.elementAt(long sequence)

image-20210922184325086

预分配内存

环形队列存放的是Event对象,而且是在Disruptor创建的时候调用EventFactory创建并一次将队列填满。Event保存生产者生产的数据,消费也是通过Event获取,后续生产则只需要替换掉Event中的属性值。这种方式避免了重复创建对象,降低JVM的GC产频率。

见com.lmax.disruptor.RingBuffer.fill(EventFactory
eventFactory)

image-20210922184413900

消费者8种等待策略

当消费速度大于生产速度情况下,消费者执行的等待策略。

策略类名 描述
BlockingWaitStrategy(常用) 使用ReentrantLock,失败则进入等待队列等待唤醒重试。当吞吐量和低延迟不如CPU资源重要时使用。
YieldingWaitStrategy(常用) 尝试100次,全失败后调用Thread.yield()让出CPU。该策略将使用100%的CPU,如果其他线程请求CPU资源,这种策略更容易让出CPU资源。
SleepingWaitStrategy(常用) 尝试200次 。前100次直接重试,后100次每次失败后调用Thread.yield()让出CPU,全失败线程睡眠(默认100纳秒 )。
BusySpinWaitStrategy 线程一直自旋等待,比较耗CPU。最好是将线程绑定到特定的CPU核心上使用。
LiteBlockingWaitStrategy 与BlockingWaitStrategy类似,区别在增加了原子变量signalNeeded,如果两个线程同时分别访问waitFor()和signalAllWhenBlocking(),可以减少ReentrantLock加锁次数。
LiteTimeoutBlockingWaitStrategy 与LiteBlockingWaitStrategy类似,区别在于设置了阻塞时间,超过时间后抛异常。
TimeoutBlockingWaitStrategy 与BlockingWaitStrategy类似,区别在于设置了阻塞时间,超过时间后抛异常。
PhasedBackoffWaitStrategy 根据时间参数和传入的等待策略来决定使用哪种等待策略。当吞吐量和低延迟不如CPU资源重要时,可以使用此策略。

消费者序列

所有消费者的消费序列(Sequence)都放在一个数组中,见com.lmax.disruptor.AbstractSequencer,通过SEQUENCE_UPDATER来更新对应的序列值。

image-20210922182346493

调用更新的地方在com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence... gatingSequences)。

消费太慢队列满了怎么办?

生产者线程被阻塞。生产者调用Sequenced.next()争夺写入权的时候需要判断最小的消费序列值进行比较。如果写入的位置还未消费则会进入循环不断获取最小消费序列值进行比较。

见包com.lmax.disruptor下SingleProducerSequencer或MultiProducerSequencer中next(int n)方法。

image-20210922183913542

Disruptor开发步骤

  • 创建Event、EventFactory、EventHandler和ExceptionHandler类

Event是环形队列(RingBuffer)中的元素,是生产者数据的载体;EventFactory是定义Event创建方式的工厂类;EventHandler则是Event的处理器,定义如何消费Event中的数据。

另外有必要定义一个消费异常处理器ExceptionHandler,它是和EventHandler绑定的。当EventHandler.onEvent()执行抛出异常时会执行对应的异常回调方法。

  • 实例化Disruptor

创建Disruptor需要指定5个参数eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。

EventFactory是上面定义的Event工厂类;

ringBufferSize是环形队列的长度,这个值要是2的N次方;

threadFactory是定义消费者线程创建方式的工厂类;

producerType是指明生产者是一个(SINGLE)还是多个(MULTI)。默认是MULTI,会使用CAS(Compare And Swap/Set)保证线程安全。如果指定为SINGLE,则不使用没必要的CAS,使单线程处理更高效。

waitStrategy指明消费者等待生产时的策略。

  • 设置消费者

指明EventHandler并绑定ExceptionHandler。指定多个EventHandler时,会为每个EventHandler分配一个线程,一个Event会被多个并行EventHandler处理。

也可以指明多个WorkHandler,每个WorkHandler分配一个线程并行消费队列中的Event,一个Event只会被一个WorkHandler处理。

  • 创建/实例化EventTranslator

EventTranslator定义生产者数据转换为Event的方式,不同数量参数有不同的接口用来实现。

  • 最后用Disruptor.publishEvent() 来发布元素指明EventTranslator和参数

例子程序

  • 先引入Maven依赖
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.4</version>
</dependency>
  • Event
/**
 * 事件
 *
 * @param <T>发布的数据类型
 */
public class MyEvent<T> {

    private T data;

    public T getData() {
        return data;
    }

    public MyEvent<T> setData(T data) {
        this.data = data;
        return this;
    }
}
  • EventFactory
import com.lmax.disruptor.EventFactory;

/**
 * 创建事件的工厂
 *
 * @param <T>发布的数据类型
 */
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {

    @Override
    public MyEvent<T> newInstance() {
        return new MyEvent<>();
    }
}
  • EventHandler
import com.lmax.disruptor.EventHandler;

/**
 * 事件消费方法
 *
 * @param <T>发布的数据类型
 */
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {

    @Override
    public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + tMyEvent.getData());
    }
}
  • ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;

/**
 * 消费者异常处理器
 *
 * @param <T>发布的数据类型
 */
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {

    @Override
    public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
        System.out.println("handleEventException");
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        System.out.println("handleOnStartException");
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        System.out.println("handleOnShutdownException");
    }
}

单消费者

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 单消费者
 */
public class SingleConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        MyEventHandler<String> eventHandler = new MyEventHandler<>();
        disruptor.handleEventsWith(eventHandler);
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
        // 和多个(EventTranslatorVararg)参数的转换器可以使用,参数类型可以不一样
        EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
                new EventTranslatorOneArg<MyEvent<String>, String>() {
                    @Override
                    public void translateTo(MyEvent<String> event, long sequence, String arg0) {
                        event.setData(arg0);
                    }
                };

        // 发布
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
        }

        disruptor.shutdown();
    }
}

单消费者Lambda写法

这种只是迎合Java8 Lambda语法特性,代码更简洁。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class LambdaSample {


    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
        disruptor.handleEventsWith(eventHandler);
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 一个参数的转换器
        disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
        // 两个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
        // 三个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
                , "Three arg ", 1, false);
        // 多个参数的转换器
        disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
            List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
            event.setData("Var arg " + String.join(",", paramList));
        }, "param1", "param2", "param3");

        disruptor.shutdown();
    }
}

多消费者重复消费元素

关键只在于指定多个EventHandler,并且EventHandler还可以分别绑定不同的ExceptionHandler。

每个EventHandler分配一个线程,一个Event会被每个EventHandler处理,适合两个不同的业务都需要处理同一个元素的情况,类似广播模式。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 一个元素多个消费者重复消费
 */
public class RepetitionConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);


        // 这里指定了2个消费者,那就会产生2个消费线程,一个事件会被消费2次
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
        EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消费:" + event.getData());
        disruptor.handleEventsWith(eventHandler, eventHandler2);
        // 分别指定异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
        disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

多消费者

关键只在于定义WorkHandler,然后实例化多个来消费。

每个WorkHandler分配一个线程,一个元素只会被一个WorkHandler处理。

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class MultiConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        // 设置2个消费者,2个线程,一个Event只被一个消费者消费
        WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
                System.out.println(Thread.currentThread().getName() + "WorkHandler消费:" + tMyEvent.getData());
        disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

参考链接

Disruptor 主页

Disruptor 技术文档

GitHub Disruptor

GitHub Disruptor Getting Started

Maven Repository Disruptor Framework

LMAX 官网

golang pprof 监控系列(3) —— memory,block,mutex 统计原理

大家好,我是蓝胖子。

在上一篇文章
golang pprof监控系列(2) —— memory,block,mutex 使用
里我讲解了这3种性能指标如何在程序中暴露以及各自监控的范围。也有提到memory,block,mutex 把这3类数据放在一起讲,是因为他们统计的原理是很类似的。今天来看看它们究竟是如何统计的。

先说下结论,
这3种类型在runtime内部都是通过一个叫做bucket的结构体做的统计
,bucket结构体内部有指针指向下一个bucket 这样构成了bucket的链表,每次分配内存,或者每次阻塞产生时,会判断是否会创建一个新的bucket来记录此次分配信息。

先来看下bucket里面有哪些信息。

bucket结构体介绍

// src/runtime/mprof.go:48
type bucket struct {
	next    *bucket
	allnext *bucket
	typ     bucketType // memBucket or blockBucket (includes mutexProfile)
	hash    uintptr
	size    uintptr
	nstk    uintptr
}

挨个详细解释下这个bucket结构体:
首先是两个指针,
一个next 指针,一个allnext指针
,allnext指针的作用就是形成一个链表结构,刚才提到的每次记录分配信息时,如果新增了bucket,那么这个bucket的allnext指针将会指向 bucket的链表头部。

bucket的链表头部信息是由一个全局变量存储起来的,代码如下:

// src/runtime/mprof.go:140
var (
	mbuckets  *bucket // memory profile buckets
	bbuckets  *bucket // blocking profile buckets
	xbuckets  *bucket // mutex profile buckets
	buckhash  *[179999]*bucket

不同的指标类型拥有不同的链表头部变量,mbuckets 是内存指标的链表头,bbuckets 是block指标的链表头,xbuckets 是mutex指标的链表头。

这里还有个buckethash结构,无论那种指标类型,只要有bucket结构被创建,那么都将会在buckethash里存上一份
,而buckethash用于解决hash冲突的方式则是将冲突的bucket通过指针形成链表联系起来,这个指针就是刚刚提到的next指针了。

至此,解释完了bucket的next指针,和allnext指针,我们再来看看bucket的其他属性。

// src/runtime/mprof.go:48
type bucket struct {
	next    *bucket
	allnext *bucket
	typ     bucketType // memBucket or blockBucket (includes mutexProfile)
	hash    uintptr
	size    uintptr
	nstk    uintptr
}

type
属性含义很明显了,代表了bucket属于那种指标类型。

hash
则是存储在buckethash结构内的hash值,也是在buckethash 数组中的索引值。

size
记录此次分配的大小,对于内存指标而言有这个值,其余指标类型这个值为0。

nstk
则是记录此次分配时,堆栈信息数组的大小。还记得在上一讲
golang pprof监控系列(2) —— memory,block,mutex 使用
里从网页看到的堆栈信息吗。

heap profile: 7: 5536 [110: 2178080] @ heap/1048576
2: 2304 [2: 2304] @ 0x100d7e0ec 0x100d7ea78 0x100d7f260 0x100d7f78c 0x100d811cc 0x100d817d4 0x100d7d6dc 0x100d7d5e4 0x100daba20
#	0x100d7e0eb	runtime.allocm+0x8b		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:1881
#	0x100d7ea77	runtime.newm+0x37		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:2207
#	0x100d7f25f	runtime.startm+0x11f		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:2491
#	0x100d7f78b	runtime.wakep+0xab		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:2590
#	0x100d811cb	runtime.resetspinning+0x7b	/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:3222
#	0x100d817d3	runtime.schedule+0x2d3		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:3383
#	0x100d7d6db	runtime.mstart1+0xcb		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:1419
#	0x100d7d5e3	runtime.mstart0+0x73		/Users/lanpangzi/goproject/src/go/src/runtime/proc.go:1367
#	0x100daba1f	runtime.mstart+0xf		/Users/lanpangzi/goproject/src/go/src/runtime/asm_arm64.s:117

nstk 就是记录的堆栈信息数组的大小,看到这里,你可能会疑惑,
这里仅仅是记录了堆栈大小,堆栈的内容呢?关于分配信息的记录呢?

要回答这个问题,得搞清楚创建bucket结构体的时候,内存是如何分配的。

首先要明白结构体在进行内存分配的时候是一块连续的内存,例如刚才介绍bucket结构体的时候讲到的几个属性都是在一块连续的内存上,当然,指针指向的地址可以不和结构体内存连续,但是指针本身是存储在这一块连续内存上的。

接着,我们来看看runtime是如何创建一个bucket的。

// src/runtime/mprof.go:162
func newBucket(typ bucketType, nstk int) *bucket {
	size := unsafe.Sizeof(bucket{}) + uintptr(nstk)*unsafe.Sizeof(uintptr(0))
	switch typ {
	default:
		throw("invalid profile bucket type")
	case memProfile:
		size += unsafe.Sizeof(memRecord{})
	case blockProfile, mutexProfile:
		size += unsafe.Sizeof(blockRecord{})
	}

	b := (*bucket)(persistentalloc(size, 0, &memstats.buckhash_sys))
	bucketmem += size
	b.typ = typ
	b.nstk = uintptr(nstk)
	return b
}

上述代码是创建一个bucket时源码, 其中persistentalloc 是runtime内部一个用于分配内存的方法,底层还是用的mmap,这里就不展开了,只需要知道该方法可以分配一段内存,size 则是需要分配的内存大小。

persistentalloc返回后的unsafe.Pointer可以强转为bucket类型的指针,unsafe.Pointer是go编译器允许的 代表指向任意类型的指针 类型。所以关键是看 分配一个bucket结构体的时候,这个size的内存空间是如何计算出来的。

首先unsafe.Sizeof 得到分配一个bucket代码结构 本身所需要的内存长度,然后加上了nstk 个uintptr 类型的内存长度 ,uintptr代表了一个指针类型,还记得刚刚提到nstk的作用吗?nstk表明了堆栈信息数组的大小,而数组中每个元素就是一个uintptr类型,指向了具体的堆栈位置。

接着判断 需要创建的bucket的类型,如果是memProfile 内存类型 则又用unsafe.Sizeof 得到一个memRecord的结构体所占用的空间大小,如果是blockProfile,或者是mutexProfile 则是在size上加上一个blockRecord结构体占用的空间大小。memRecord和blockRecord 里承载了此次内存分配或者此次阻塞行为的详细信息。

// src/runtime/mprof.go:59
type memRecord struct {
	active memRecordCycle
	future [3]memRecordCycle
}

// src/runtime/mprof.go:120
type memRecordCycle struct {
	allocs, frees           uintptr
	alloc_bytes, free_bytes uintptr
}

关于内存分配的详细信息最后是有memRecordCycle 承载的,里面有此次内存分配的内存大小和分配的对象个数。那memRecord 里的active 和future又有什么含义呢,为啥不干脆用memRecordCycle结构体来表示此次内存分配的详细信息? 这里我先预留一个坑,放在下面在解释,现在你只需要知道,在分配一个内存bucket结构体的时候,也分配了一段内存空间用于记录关于内存分配的详细信息。

然后再看看blockRecord。

// src/runtime/mprof.go:135
type blockRecord struct {
	count  float64
	cycles int64
}

blockRecord 就比较言简意赅,count代表了阻塞的次数,cycles则代表此次阻塞的周期时长,关于周期的解释可以看看我前面一篇文章
golang pprof监控系列(2) —— memory,block,mutex 使用
,简而言之,周期时长是cpu记录时长的一种方式。你可以把它理解成就是一段时间,不过时间单位不在是秒了,而是一个周期。

可以看到,在计算一个bucket占用的空间的时候,除了bucket结构体本身占用的空间,还预留了堆栈空间以及memRecord或者blockRecord 结构体占用的内存空间大小

你可能会疑惑,这样子分配一个bucket结构体,那么如何取出bucket中的memRecord 或者blockRecord结构体呢? 答案是 通过计算memRecord在bucket 中的位置,然后强转unsafe.Pointer指针。

拿memRecord举例,

//src/runtime/mprof.go:187
func (b *bucket) mp() *memRecord {
	if b.typ != memProfile {
		throw("bad use of bucket.mp")
	}
	data := add(unsafe.Pointer(b), unsafe.Sizeof(*b)+b.nstk*unsafe.Sizeof(uintptr(0)))
	return (*memRecord)(data)
}

上面的地址可以翻译成如下公式:

memRecord开始的地址 = bucket指针的地址 +  bucket结构体的内存占用长度 + 栈数组占用长度 

这一公式成立的前提便是 分配结构体的时候,是连续的分配了一块内存,所以我们当然能通过bucket首部地址以及中间的空间长度计算出memRecord开始的地址。

至此,bucket的结构体描述算是介绍完了,但是还没有深入到记录指标信息的细节,下面我们深入研究下记录细节,正戏开始。

记录指标细节介绍

由于内存分配的采样还是和block阻塞信息的采样有点点不同,所以我还是决定分两部分来介绍下,先来看看内存分配时,是如何记录此次内存分配信息的。

memory

首先在上篇文章
golang pprof监控系列(2) —— memory,block,mutex 使用
我介绍过 MemProfileRate ,MemProfileRate 用于控制内存分配的采样频率,代表平均每分配MemProfileRate字节便会记录一次内存分配记录。

当触发记录条件时,runtime便会调用 mProf_Malloc 对此次内存分配进行记录,

// src/runtime/mprof.go:340
func mProf_Malloc(p unsafe.Pointer, size uintptr) {
	var stk [maxStack]uintptr
	nstk := callers(4, stk[:])
	lock(&proflock)
	b := stkbucket(memProfile, size, stk[:nstk], true)
	c := mProf.cycle
	mp := b.mp()
	mpc := &mp.future[(c+2)%uint32(len(mp.future))]
	mpc.allocs++
	mpc.alloc_bytes += size
	unlock(&proflock)
	systemstack(func() {
		setprofilebucket(p, b)
	})
}

实际记录之前还会先获取堆栈信息,上述代码中stk 则是记录堆栈的数组,然后通过 stkbucket 去获取此次分配的bucket,stkbucket 里会判断是否先前存在一个相同bucket,如果存在则直接返回。而判断是否存在相同bucket则是看存量的bucket的分配的内存大小和堆栈位置是否和当前一致。

// src/runtime/mprof.go:229
for b := buckhash[i]; b != nil; b = b.next {
		if b.typ == typ && b.hash == h && b.size == size && eqslice(b.stk(), stk) {
			return b
		}
	}

通过刚刚介绍bucket结构体,可以知道 buckhash 里容纳了程序中所有的bucket,通过一段逻辑算出在bucket的索引值,也就是i的值,然后取出buckhash对应索引的链表,循环查找是否有相同bucket。相同则直接返回,不再创建新bucket。

让我们再回到记录内存分配的主逻辑,stkbucket 方法创建或者获取 一个bucket之后,会通过mp()方法获取到其内部的memRecord结构,然后将此次的内存分配的字节累加到memRecord结构中。

不过这里并不是直接由memRecord 承载累加任务,而是memRecord的memRecordCycle 结构体。

c := mProf.cycle
	mp := b.mp()
	mpc := &mp.future[(c+2)%uint32(len(mp.future))]
	mpc.allocs++
	mpc.alloc_bytes += size

这里先是从memRecord 结构体的future结构中取出一个memRecordCycle,然后在memRecordCycle上进行累加字节数,累加分配次数。

这里有必要介绍下mProf.cycle 和memRecord中的active和future的作用了。

我们知道内存分配是一个持续性的过程,内存的回收是由gc定时执行的,golang设计者认为,如果每次产生内存分配的行为就记录一次内存分配信息,那么很有可能这次分配的内存虽然程序已经没有在引用了,但是由于还没有垃圾回收,所以会造成内存分配的曲线就会出现严重的倾斜(因为内存只有垃圾回收以后才会被记录为释放,也就是memRecordCycle中的free_bytes 才会增加,所以内存分配曲线会在gc前不断增大,gc后出现陡降)。

所以,在记录内存分配信息的时候,是将当前的内存分配信息经过一轮gc后才记录下来,mProf.cycle 则是当前gc的周期数,每次gc时会加1,在记录内存分配时,将当前周期数加2与future取模后的索引值记录到future ,而在释放内存时,则将 当前周期数加1与future取模后的索引值记录到future,想想这里为啥要加1才能取到 对应的memRecordCycle呢? 因为当前的周期数比起内存分配的周期数已经加1了,所以释放时只加1就好。

// src/runtime/mprof.go:362
func mProf_Free(b *bucket, size uintptr) {
	lock(&proflock)
	c := mProf.cycle
	mp := b.mp()
	mpc := &mp.future[(c+1)%uint32(len(mp.future))]
	mpc.frees++
	mpc.free_bytes += size
	unlock(&proflock)
}

在记录内存分配时,只会往future数组里记录,那读取内存分配信息的 数据时,怎么读取呢?

还记得memRecord 里有一个类型为memRecordCycle 的active属性吗,在读取的时候,runtime会调用
mProf_FlushLocked()方法,将当前周期的future数据读取到active里。


// src/runtime/mprof.go:59
type memRecord struct {
	active memRecordCycle
	future [3]memRecordCycle
}

// src/runtime/mprof.go:120
type memRecordCycle struct {
	allocs, frees           uintptr
	alloc_bytes, free_bytes uintptr
}


// src/runtime/mprof.go:305
func mProf_FlushLocked() {
	c := mProf.cycle
	for b := mbuckets; b != nil; b = b.allnext {
		mp := b.mp()

		// Flush cycle C into the published profile and clear
		// it for reuse.
		mpc := &mp.future[c%uint32(len(mp.future))]
		mp.active.add(mpc)
		*mpc = memRecordCycle{}
	}
}

代码比较容易理解,mProf.cycle获取到了当前gc周期,然后用当前周期从future里取出 当前gc周期的内存分配信息 赋值给acitve ,对每个内存bucket都进行这样的赋值。

赋值完后,后续读取当前内存分配信息时就只读active里的数据了,至此,算是讲完了runtime是如何对内存指标进行统计的。

接着,我们来看看如何对block和mutex指标进行统计的。

block mutex

block和mutex的统计是由同一个方法,saveblockevent 进行记录的,不过方法内部针对这两种类型还是做了一点点不同的处理。

有必要注意再提一下,mutex是在解锁unlock时才会记录一次阻塞行为,而block在记录mutex锁阻塞信息时,是在开始执行lock调用的时候记录的 ,除此以外,block在select 阻塞,channel通道阻塞,wait group 产生阻塞时也会记录一次阻塞行为。

// src/runtime/mprof.go:417
func saveblockevent(cycles, rate int64, skip int, which bucketType) {
	gp := getg()
	var nstk int
	var stk [maxStack]uintptr
	if gp.m.curg == nil || gp.m.curg == gp {
		nstk = callers(skip, stk[:])
	} else {
		nstk = gcallers(gp.m.curg, skip, stk[:])
	}
	lock(&proflock)
	b := stkbucket(which, 0, stk[:nstk], true)

	if which == blockProfile && cycles < rate {
		// Remove sampling bias, see discussion on http://golang.org/cl/299991.
		b.bp().count += float64(rate) / float64(cycles)
		b.bp().cycles += rate
	} else {
		b.bp().count++
		b.bp().cycles += cycles
	}
	unlock(&proflock)
}

首先还是获取堆栈信息,然后stkbucket() 方法获取到 一个bucket结构体,然后bp()方法获取了bucket里的blockRecord 结构,并对其count次数和cycles阻塞周期时长进行累加。

// src/runtime/mprof.go:135
type blockRecord struct {
	count  float64
	cycles int64
}

注意针对blockProfile 类型的次数累加 还进行了特别的处理,还记得上一篇
golang pprof监控系列(2) —— memory,block,mutex 使用
提到的BlockProfileRate参数吗,它是用来设置block采样的纳秒采样率的,如果阻塞周期时长cycles小于BlockProfileRate的话,则需要fastrand函数乘以设置的纳秒时间BlockProfileRate 来决定是否采样了,所以如果是小于BlockProfileRate 并且saveblockevent进行了记录阻塞信息的话,说明我们只是采样了部分这样情况的阻塞,所以次数用BlockProfileRate 除以 此次阻塞周期时长数,得到一个估算的总的 这类阻塞的次数。

读取阻塞信息就很简单了,直接读取阻塞bucket的count和周期数即可。

总结

至此,算是介绍完了这3种指标类型的统计原理,简而言之,就是通过一个携带有堆栈信息的bucket对每次内存分配或者阻塞行为进行采样记录,读取内存分配信息 或者阻塞指标信息的 时候便是所有的bucket信息读取出来。