2023年4月

利用 WAL 技术,数据库将随机写转换成了顺序写,大大提升了数据库的性能,由此也带来了内存脏页的问题。

脏页会被后台线程自动 flush,也会由于数据页淘汰而触发 flush,而刷脏页的过程由于会占用资源,可能会让你的更新和查询语句的响应时间长一些。

一、flush 脏页

当内存数据页跟磁盘数据页内容不一致的时候,我们成这个内存页为“脏页”;内存数据写入磁盘后,内存和磁盘上的数据页内容就一致了,称为“干净页”。

InnoDB引擎以页作为磁盘和内存之间交互的基本单位,数据库 I/O 操作的最小单位是页。也就是说,在数据库中,不论读一行,还是读多行,都是将这些行所在的页进行加载。

记录是按照行来存储的,但是数据库的读取并不以行为单位,否则一次读取(也就是一次 I/O 操作)只能处理一行数据,效率会非常低。

Buffer Pool 中存的就是一页一页的数据,当我们要查询的数据不在 Buffer Pool 中时,InnoDB 会将记录所在的页整个加载到 Buffer Pool 中去。

同样的,将 Buffer Pool 中的脏页刷入磁盘时,也是按照页为单位刷入磁盘的。

1、Free List

你从磁盘中读取一个数据页,会先从Free List中找出一个空闲缓存页的描述信息,然后将你读出的数据页中加载进缓存页中。同时将缓存页的描述信息从Free List中剔除,此外该描述信息块还会被维护进LRU链表中。

数据页被加载进Buffer Pool后你就可以对其进行变更操作了。

3、Flush List

如果我们修改了Buffer Pool中某个缓冲页的数据,那么它就与磁盘上的页不一致了,这样的缓冲页也被称之为脏页(dirty page)。

为了性能问题,我们每次修改缓冲页后,并不着急立刻把修改刷新到磁盘上,而是在未来的某个时间点进行刷新操作。

如果有了修复发生,不是立刻刷新,那之后再刷新的时,我们怎么知道Buffer Pool中哪些页是脏页,哪些页从来没有被修改过呢?

创建一个存储脏页的 Flush list,凡是被修改过的缓冲页对应的控制块都会作为节点加入到这个链表中。

4、LRU List

除了以上,Buffer Pool还有另外一种LRU List,整体结构如下:

在BufferPool中,内存管理如下:

  • 需要找 free 空闲数据块:free list
  • 需要找冷热访问的数据块:lru list
  • 需要知道哪些数据块是脏的:flush list

二、刷新方式有哪几种

1、从flush链表中刷新一部分页面到磁盘

后台线程会根据当时系统的繁忙程度确定刷新速率,定时从flush链表中刷新一部分页面到磁盘,即:BUF_FLUSH_LIST

有时后台线程刷新脏页的进度比较慢,导致用户准备加载一个磁盘页到Buffer Pool中时没有可用的缓冲页。此时,就会尝试查看LRU链表尾部,看是否存在可以直接释放掉的未修改缓冲页。

如果没有,则不得不将LRU链表尾部的一个脏页同步刷新到磁盘(与磁盘交互是很慢的,这会降低处理用户请求的速度),即:BUF_FLUSH_SINGLE_PAGE

2、从LRU链表的冷数据中刷新一部分页面到磁盘,即:BUF_FLUSH_LRU

后台线程会定时从LRU链表的尾部开始扫描一些页面,扫描的页面数量可以通过系统变量innodb_lru_scan_depth来指定,如果在LRU链表中发现脏页,则把它们刷新到磁盘。

控制块里会存储该缓冲页是否被修改的信息,所以在扫描LRU链表时,可以很轻松地获取到某个缓冲页是否是脏页的信息。

三、flush性能问题

flush脏页虽然是常态,但是出现以下这两种情况,都是会明显影响性能的:

  1. 一个查询要淘汰的脏页个数太多,会导致查询的响应时间明显变长;
  2. 日志写满,更新全部堵住,写性能跌为 0,这种情况对敏感业务来说,是不能接受的。

InnoDB 会在后台刷脏页,而刷脏页的过程是要将内存页写入磁盘。所以,无论是你的查询语句在需要内存的时候可能要求淘汰一个脏页,还是由于刷脏页的逻辑会占用 IO 资源并可能影响到了你的更新语句,都可能是造成你从业务端感知到 MySQL“抖”了一下的原因。

要尽量避免这种情况,你就要合理地设置 innodb_io_capacity 的值,并且平时要多关注脏页比例,不要让它经常接近 75%。

一旦一个查询请求需要在执行过程中先 flush 掉一个脏页时,这个查询就可能要比平时慢了。

而 MySQL 中的一个机制,可能让你的查询会更慢:在准备刷一个脏页的时候,如果这个数据页旁边的数据页刚好是脏页,就会把这个“邻居”也带着一起刷掉;而且这个把“邻居”拖下水的逻辑还可以继续蔓延,也就是对于每个邻居数据页,如果跟它相邻的数据页也还是脏页的话,也会被放到一起刷。

在 InnoDB 中,innodb_flush_neighbors 参数就是用来控制这个行为的,值为 1 的时候会有上述的“连坐”机制,值为 0 时表示不找邻居,自己刷自己的。

找“邻居”这个优化在机械硬盘时代是很有意义的,可以减少很多随机 IO。机械硬盘的随机 IOPS 一般只有几百,相同的逻辑操作减少随机 IO 就意味着系统性能的大幅度提升。

而如果使用的是 SSD 这类 IOPS 比较高的设备的话,建议你把 innodb_flush_neighbors 的值设置成 0。因为这时候 IOPS 往往不是瓶颈,而“只刷自己”,就能更快地执行完必要的刷脏页操作,减少 SQL 语句响应时间。

在 MySQL 8.0 中,innodb_flush_neighbors 参数的默认值已经是 0 了。

参考资料:

https://hackmysql.com/post/book-6/

《MySQL实战45讲》

作者:京东零售  吴迪

前言

在实际项目开发中无论 M 端、PC 端,或多或少都有一个 utils 文件目录去管理项目中用到的一些常用的工具方法,比如:时间处理、价格处理、解析url参数、加载脚本等,其中很多是重复、基础、或基于某种业务场景的工具,存在项目间冗余的痛点以及工具方法规范不统一的问题。

  • 在实际开发过程中,经常使用一些开源工具库,如 lodash,以方便、快捷的进行项目开发。但是当 npm上没有自己中意或符合自身业务的工具时,我们不得不自己动手,此时拥有自己的、基于业务的工具库就显得尤为重要。
  • 我们所熟知的Vue、React等诸多知名前端框架,或公司提供的一些类库,它们是如何开发、构建、打包出来的,本文将带领你了解到如何从0到1构建基于自身业务的前端工具库。

构建工具库主流方案

1. WEBPACK

  • webpack 提供了构建和打包不同模块化规则的库,只是需要自己去搭建开发底层架构。
  • vue-cli,基于 webpack , vue-cli 脚手架工具可以快速初始化一个 vue 应用,它也可以初始化一个构建库。

2. ROLLUP

  • rollup 是一个专门针对JavaScript模块打包器,可以将应用或库的小块代码编译成更复杂的功能代码。
  • Vue、React 等许多流行前端框架的构建和打包都能看到 rollup 的身影。

为什么采用 ROLLUP 而不是 WEBPACK

  • webpack 主要职能是开发应用,而 rollup 主要针对的就是 js 库的开发,如果你要开发 js 库,那 webpack 的繁琐配置和打包后的文件体积就不太适用了,通过webpack打包构建出来的源代码增加了很多工具函数以外的模块依赖代码。
  • rollup 只是把业务代码转码成目标 js ,小巧且轻便。rollup对于代码的Tree-shaking和ES6模块有着算法优势上的支持,如果只想构建一个简单的库,并且是基于ES6开发的,加上其简洁的API,rollup得到更多开发者的青睐。

工具库底层架构设计

构建工具库底层架构大概需要哪些功能的支持:

架构依赖需知

在对底层架构设计的基础上,首先需要把用到的依赖库简单熟悉一下:

rollup 全家桶

•  rollup(工具库打包构建核心包)

•  rollup-plugin-livereload(rollup 插件,热更新,方便本地 debugger 开发)

•  rollup-plugin-serve(rollup 插件,本地服务代理,方便在本地 html 中调试工具)

•  rollup-plugin-terser(rollup 插件,代码压缩混淆)

•  rollup-plugin-visualizer(rollup 插件,可视化并分析 Rollup bundle,以查看模块占用)

•  @rollup/plugin-babel(rollup 插件,rollup 的 babel 插件,ES6 转 ES5)

•  @rollup/plugin-commonjs(rollup 插件,用来将 CommonJS 模块转换为 ES6,这样它们就可以包含在 Rollup 包中)

•  @rollup/plugin-json(rollup 插件,它将.json 文件转换为 ES6 模块)

•  @rollup/plugin-node-resolve(rollup 插件,它使用节点解析算法定位模块,用于在节点模块中使用第三方 node_modules 包)

•  @rollup/plugin-typescript(rollup 插件,对 typescript 的支持,将 typescript 进行 tsc 转为 js)

typescript 相关

•  typescript(使用 ts 开发工具库)

•  tslib(TypeScript 的运行库,它包含了 TypeScript 所有的帮助函数)

•  @typescript-eslint/eslint-plugin(TypeScript 的 eslint 插件,约束 ts 书写规范)

•  @typescript-eslint/parser(ESLint 解析器,它利用 TypeScript ESTree 来允许 ESLint 检测 TypeScript 源代码)

文档相关

•  typedoc(TypeScript 项目的文档生成器)

•  gulp(使用 gulp 构建文档系统)

•  gulp-typedoc(Gulp 插件来执行 TypeDoc 工具)

•  browser-sync(文档系统热更新)

单元测试相关

•  jest(一款优雅、简洁的 JavaScript 测试框架)

•  @types/jest(Jest 的类型定义)

•  ts-jest(一个支持源映射的 Jest 转换器,允许您使用 Jest 来测试用 TypeScript 编写的项目)

•  @babel/preset-typescript(TypeScript 的 Babel 预设)

其他依赖

•  eslint(代码规范约束)

•  @babel/core(@rollup/plugin-babel 依赖的 babel 解析插件)

•  @babel/plugin-transform-runtime(babel 转译依赖)

•  @babel/preset-env(babel 转译依赖)

•  chalk(控制台字符样式)

•  rimraf(UNIX 命令 rm -rf 用于 node)

•  cross-env(跨平台设置 node 环境变量)


底层架构搭建

1. 初始化项目

新建一个文件夹 utils-demo,执行 npm init,过程会询问构建项目的基本信息,按需填写即可:

npm init

2. 组织工具库业务开发 SRC 目录结构

创建工具库业务开发 src 文件目录,明确怎样规划工具库包,里面放置的是工具库开发需要的业务代码:

3. 安装项目依赖

要对 typescript 代码进行解析支持需要安装对 ts 支持的依赖,以及对开发的工具的一些依赖包:

yarn add typescript tslib rollup rollup-plugin-livereload rollup-plugin-serve rollup-plugin-terser rollup-plugin-visualizer 
@rollup/plugin-babel @rollup/plugin-commonjs @rollup/plugin-json @rollup/plugin-node-resolve @rollup/plugin-typescript 
@babel/core @babel/plugin-transform-runtime @babel/preset-env rimraf lodash chalk@^4.1.2 -D

这里遇到一个坑,关于最新 chalk5.0.0 不支持在 nodejs 中 require()导入,所以锁定包版本 chalk@^4.1.2

要对 typescript 进行解析和编译还需要配置 tsconfig.json,该文件中指定了用来编译这个项目的根文件和编译选项,在项目根目录,使用 tsc --init 命令快速生成 tsconfig.json 文件(前提全局安装 typescript)

npm i typescript -g
tsc --init

初始化 tsconfig 完成之后,根目录自动生成 tsconfig.json 文件,需要对其进行简单的配置,以适用于 ts 项目,其中具体含义可以参考
tsconfig.json
官网

4. 组织项目打包构建 SCRIPTS 目录结构

  1. 根目录创建项目打包构建 scripts 脚本文件目录,里面放置的是有关于项目打包构建需要的文件:

生成rollup配置项函数核心代码:

const moduleName = camelCase(name) // 当format为iife和umd时必须提供,将作为全局变量挂在window下:window.moduleName=...
const banner = generateBanner() // 包说明文案
// 生成rollup配置文件函数
const generateConfigs = (options) => {
  const { input, outputFile } = options
  console.log(chalk.greenBright(`获取打包入口:${input}`))
  const result = []
  const pushPlugins = ({ format, plugins, ext }) => {
    result.push({
      input, // 打包入口文件
      external: [], // 如果打包出来的文件有项目依赖,可以在这里配置是否将项目依赖一起打到包里面还是作为外部依赖
      // 打包出口文件
      output: {
        file: `${outputFile}${ext}`, // 出口文件名称
        sourcemap: true, // // 是否生成sourcemap
        format, // 打包的模块化格式
        name: moduleName, // 当format为iife和umd时必须提供,将作为全局变量挂在window下:window.moduleName=...
        exports: 'named' /** Disable warning for default imports */,
        banner, // 打包出来的文件在最顶部的说明文案
        globals: {} // 如果external设置了打包忽略的项目依赖,在此配置,项目依赖的全局变量
      },
      plugins // rollup插件
    })
  }
  buildType.forEach(({ format, ext }) => {
    let plugins = [...defaultPlugins]
    // 生产环境加入包分析以及代码压缩
    plugins = [
      ...plugins,
      visualizer({
        gzipSize: true,
        brotliSize: true
      }),
      terser()
    ]

    pushPlugins({ format, plugins, ext })
  })
return result
}



  1. rollup 在打包构建的过程中需要进行 babel 的转译,需要在根目录添加.babelrc 文件告知 babel:
{
  "presets": [
    [
      "@babel/preset-env"
    ]
  ],
  "plugins": ["@babel/plugin-transform-runtime"]
}


  1. 此时距离打包构建工具库只差一步之遥,配置打包脚本命令,在 package.json 中配置命令:
"scripts": {
    "build": "rimraf lib && rollup -c ./scripts/rollup.config.js" // rollup打包
 },


  1. 执行 yarn build,根目录会构建出一个 lib 文件夹,里面有打包构建的文件,还多了一个 stats.html,这个是可视化并分析 Rollup bundle,用来查看工具模块占用空间:

架构搭建优化

项目搭建到这里,不知机智的你能否发现问题:

  1. 只要添加了一个工具,就要在入口文件导出需要打包构建的工具,在多人开发提交代码的时候将引来冲突的产生:

  1. 使用工具库的时候,按需引用的颗粒度太细了,不能满足一些要求颗粒度粗的朋友,比如:

• 我想使用该包里面 date 相关工具,要这样吗?

import { dateA, dateB, dateC } from "utils-demo"

能不能这样?

import { date } from "utils-demo"
date.dateA()
date.dateB()
date.dateC()


• 在一些使用 script 脚本引入的场景下,就仅仅需要 date 相关的工具,要这样吗?

<script src="https://xxx/main.min.js">

能不能这样?

<script src="https://xxx/date.min.js">

这样仅仅使用 date 里面的工具,就没有必要将所有的工具都引入了

解决方案:

  1. 针对第一个代码冲突的问题,可以根据 src > modules 下目录结构自动生成入口文件 index.ts

自动构建入口文件核心代码:

const fs = require('fs') // node fs模块
const chalk = require('chalk') // 自定义输出样式
const { resolveFile, getEntries } = require('./utils')
let srcIndexContent = `
// tips:此文件是自动生成的,无需手动添加
`
getEntries(resolveFile('src/modules/*')).forEach(({ baseName, entry }) => {
  let moduleIndexContent = `
// tips:此文件是自动生成的,无需手动添加
`
  try {
    // 判断是否文件夹
    const stats = fs.statSync(entry)
    if (stats.isDirectory()) {
      getEntries(`${entry}/*.ts`).forEach(({ baseName }) => {
        baseName = baseName.split('.')[0]
        if (baseName.indexOf('index') === -1) {
          moduleIndexContent += `
export * from './${baseName}'
`
        }
      })
      fs.writeFileSync(`${entry}/index.ts`, moduleIndexContent, 'utf-8')
      srcIndexContent += `
export * from './modules/${baseName}'
export * as ${baseName} from './modules/${baseName}'
`
    } else {
      srcIndexContent += `
export * from './modules/${baseName.split('.')[0]}'
`
    }
  } catch (e) {
    console.error(e)
  }
})
fs.writeFileSync(resolveFile('src/index.ts'), srcIndexContent, 'utf-8')


  1. 针对颗粒度的问题,可以将 modules 下各种类型工具文件夹下面也自动生成入口文件,除了全部导出,再追加 import * as 模块类名称 类型的导出

至此,基本上解决了工具库打包的问题,但是架构中还缺少本地开发调试的环境,下面为大家介绍如何架构中添加本地开发调试的系统。

本地开发调试系统

首先要明确要加入本地开发调试系统的支持,需要做到以下:

•  跨平台(window不支持NODE_ENV=xxx)设置环境变量,根据环境配置不同的 rollup 配置项

•  引入本地开发需要的 html 静态服务器环境,并能做到热更新

  1. 跨平台设置环境变量很简单,使用 cross-env 指定 node 的环境
yarn add cross-env -D
  1. 配置 package.json 命令
 "scripts": {
    "entry": "node ./scripts/build-entry.js",
    "dev": "rimraf lib && yarn entry && cross-env NODE_ENV=development rollup -w -c ./scripts/rollup.config.js", // -w 表示监听的工具模块的修改
    "build": "rimraf lib && yarn entry && cross-env NODE_ENV=production rollup -c ./scripts/rollup.config.js"
  },


  1. 根据最开始架构设计的模块,在项目根目录新建 debugger 文件夹,里面存放的是工具调试的 html 静态页面

  1. 接下来就是配置 scripts > rollup.config.js ,将 NODE_ENV=development 环境加入 rollup 配置,修改生成rollup配置项函数核心代码:
(isProd ? buildType : devType).forEach(({ format, ext }) => {
    let plugins = [...defaultPlugins]
    if (isProd) {
      // 生产环境加入包分析以及代码压缩
      plugins = [...plugins, visualizer({
        gzipSize: true,
        brotliSize: true
      }), terser()]
    } else {
      // 非生产环境加入热更新和本地服务插件,方便本地debugger
      plugins = [...plugins, ...[
        // 热更新
        rollUpLiveLoad({
          watch: ['debugger', 'lib'],
          delay: 300
        }),
        // 本地服务代理
        rollupServe({
          open: true,
          // resolveFile('')代理根目录原因是为了在ts代码里debugger时可以方便看到调试信息
          contentBase: [resolveFile('debugger'), resolveFile('lib'), resolveFile('')]
        })
      ]]
    }
    pushPlugins({ format, plugins, ext })
  })


  1. 执行 yarn dev 之后浏览器会新打开窗口,输入刚添加的工具链接,并且它是热更新的:

工具库文档系统

一个完备的工具库需要有一个文档来展示开发的工具函数,它可能需要具备以下几点支持:

•  支持工具库中方法的可视化预览

•  支持修改工具的时候,具备热更新机制

typedoc
(TypeScript 项目的文档生成器)能完美支持 typescript 开发工具库的文档生成器的支持,它的核心原理就是读取源代码,根据工具的注释、ts的类型规范等,自动生成文档页面

关于热更新机制的支持,第一个自然想到
browser-sync
(文档系统热更新)

由于文档系统的预览功能有很多插件组合来实现的,可以借助 gulp (基于流的自动化构建工具),typedoc正好有对应的
gulp-typedocGulp
插件来执行 TypeDoc 工具插件

构建完成后打开文档系统,并且它是热更新的,修改工具方法后自动更新文档:

单元测试

为确保用户使用的工具代码的安全性、正确性以及可靠性,工具库的单元测试必不可少。单元测试选用的是 Facebook 出品的 Jest 测试框架,它对于 TypeScript 有很好的支持。

1. 环境搭建

  1. 首先全局安装 jest 使用 init 来初始化 jest 配置项
npm jest -g
jest --init
下面是本人设置的jest的配置项
✔ Would you like to use Jest when running "test" script in "package.json"? … yes
✔ Would you like to use Typescript for the configuration file? … yes
✔ Choose the test environment that will be used for testing › jsdom (browser-like)
✔ Do you want Jest to add coverage reports? … yes
✔ Which provider should be used to instrument code for coverage? › babel
✔ Automatically clear mock calls, instances and results before every test? … yes


执行完之后根目录会自动生成jest.config.ts 文件,里面设置了单元测试的配置规则,package.json 里面也多了一个 script 指令 test。

  1. 关于jest.config.js文件配置项具体含义可以查看官网,要想完成 jest 对于 TypeScript 的测试,还需要安装一些依赖:
yarn add jest ts-jest @babel/preset-typescript @types/jest -D
  1. jest 还需要借助 .babelrc 去解析 TypeScript 文件,再进行测试,编辑 .babelrc 文件,添加依赖包 @babel/preset-typescript:
{
  "presets": [
    "@babel/preset-typescript",
    [
      "@babel/preset-env"
    ]
  ],
  "plugins": ["@babel/plugin-transform-runtime"]
}


2. 单元测试文件的编写

  1. 通过以上环节,jest 单元测试环境基本搭建完毕,接下来在__tests__下编写测试用例

  1. 执行 yarn test

可以看到关于 debounce 防抖工具函数的测试情况显示在了控制台:

•  stmts 是语句覆盖率(statement coverage):是不是每个语句都执行了?

•  Branch 分支覆盖率(branch coverage):是不是每个 if 代码块都执行了?

•  Funcs 函数覆盖率(function coverage):是不是每个函数都调用了?

•  Lines 行覆盖率(line coverage):是不是每一行都执行了?

  1. 同时还会发现项目根目录多了一个 coverage 文件夹,里面就是 jest 生成的测试报告:

3. 单元测试文件的编写引发的思考

每次修改单元测试都要执行 yarn test 去查看测试结果,怎么解决?

jest提供了 watch 指令,只需要配置 scripts 脚本就可以做到,单元测试的热更新。

"scripts": {
  "test": "jest --watchAll"
},


以后会写很多工具的测试用例,每次 test 都将所有工具都进行了测试,能否只测试自己写的工具?

jest 也提供了测试单个文件的方法,这样 jest 只会对防抖函数进行测试(前提全局安装了 jest)。

jest debounce.test.ts --watch

工具库包的发布

至此工具库距离开发者使用仅一步之遥,就是发布到npm上,发包前需要在 package.json 中声明库的一些入口,关键词等信息。

  "main": "lib/main.js", // 告知引用该包模块化方式的默认文件路径
  "module": "lib/main.esm.js", // 告知引用该包模块化方式的文件路径
  "types": "lib/types/index.d.ts", // 告知引用该包的类型声明文件路径
  "sideEffects": false, // false 为了告诉 webpack 我这个 npm 包里的所有文件代码都是没有副作用的
  "files": [ // 开发者引用该包后node_modules包里面的文件
    "lib",
    "README.md"
  ],
  "keywords": [
    "typescript",
    "utils-demo",
    "utils"
  ],
  "scripts": {
    "pub": "npm publish"
  },


登陆
npm
,你会看到自己的 packages 里面有了刚刚发布的工具库包:

写在最后

以上就是作者整理的从0到1构建基于自身业务的前端工具库的全过程,希望能给阅读本文的开发人员带来一些新的想法与尝试。

在此基础上已经成功在京东npm源发布了应用于京东汽车前端的工具库
@jdcar/car-utils
,并在各个业务线及系统得到落地。

当然,架构优化之路也还远未结束,比如:打包构建的速度、本地开发按需构建、工具库脚手架化等,后续我们也会基于自身业务以及一些新技术,持续深入优化,在性能上进一步提升,在功能上进一步丰富。本文或存在一些浅显不足之处,也欢迎大家评论指点。

参考资料

[1] rollup 英文文档(
https://rollupjs.org/guide/en/#quick-start

[2] rollup 中文文档(
https://rollupjs.org/guide/zh/#introduction

[3] Rollup.js 实战学习笔记(
https://chenshenhai.github.io/rollupjs-note/

[4] Rollup 打包工具的使用(
https://juejin.cn/post/6844904058394771470

[5] TypeScript、Rollup 搭建工具库(
https://juejin.cn/post/6844904035309322254

[6] 使用 rollup.js 封装各项目共用的工具包(
https://juejin.cn/post/6993720790046736420

[7] 如何开发一个基于 TypeScript 的工具库并自动生成文档(
https://juejin.cn/post/6844903881030238221

[8] 一款优雅、简洁的 JavaScript 测试框架(
https://jestjs.io/zh-Hans/

前言

Disruptor是一个高性能的无锁并发框架,其主要应用场景是在高并发、低延迟的系统中,如金融领域的交易系统,游戏服务器等。其优点就是非常快,号称能支撑每秒600万订单。需要注意的是,Disruptor是单机框架,对标JDK中的Queue,而非可用于分布式系统的MQ

本文基于Disruptor v3.4.*版本

Demo

既然是简单使用,这阶段只需要关注:

  • 生产者
  • 消费者:EventHandler
  • 消息的传递:消息的载体Event

简单例子

首先,我们定义消息的载体Event,生产者向消费者传递的消息通过Event承载

class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

然后定义Event生产工厂,这用于初始化Event

EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
};

接下来就可以构建Disruptor了,以下是完整代码

// 消息载体(event)
static class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

// 发布消息的转换器
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
    event.set(buffer.getLong(0));
}

public static void main(String[] args) throws Exception {

    // event生产工厂,初始化RingBuffer的时候使用
    EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    };

    // 指定RingBuffer的大小(必须是2的n次方)
    int bufferSize = 1024;

    // 构造Disruptor(默认使用多生产者模式、BlockingWaitStrategy阻塞策略)
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    //  Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());
    // 设置消费者
    EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> {
        System.out.println("Event: " + event);
    };
    disruptor.handleEventsWith(handler);

    // 启动disruptor,启动所有需要运行的线程
    disruptor.start();

    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long i = 0; i < 100; i++) {
        bb.putLong(i);
        // 发布事件
        ringBuffer.publishEvent(LongEventMain::translate, bb);
    }
}

消费者组合(多使用场景)

Disruptor不仅可以当高性能的队列使用,还支持消费者的串行、并行消费等

以下只展示关键代码(设置消费者),其余部分参考上一节的简单demo

  1. 单链串行

    Untitled

    disruptor.handleEventsWith(handlerA).then(handlerB);
    
  2. 并行

    Untitled

    disruptor.handleEventsWith(handlerA, handlerB);
    
  3. 链内串行,多链并行

    Untitled

    disruptor.handleEventsWith(handlerA).then(handlerC);
    disruptor.handleEventsWith(handlerB).then(handlerD);
    
  4. 菱形(C、D都执行完才到E)

    Untitled

    disruptor.handleEventsWith(handlerA).then(handlerC);
    disruptor.handleEventsWith(handlerB).then(handlerD);
    disruptor.after(handlerC, handlerD).then(handlerE);
    
    
  5. 分组(AB都执行完才到CD)

    Untitled

    disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
    
  6. 分组不重复消费

    组内竞争,组外串行:每个消息在每个分组中只有一个消费者能消费成功,如果就是分组A中只有HandlerA2能得到数据,分组B中只有HandlerB1获得

    Untitled

    // 注意:此处的handler实现的是WorkHandler接口
    disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                    .then(handlerB1, handlerB2, handlerB3);
    
  7. 分组不重复消费(菱形)

    Untitled

    // handlerA、handlerB实现WorkHandler接口
    // handlerC 实现EventHandler或WorkHandler接口均可
    disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                    .then(handlerB1, handlerB2, handlerB3)
                    .then(handlerC);
    

    等待策略


    消费者速度比生产者快时,需要等待。因此就有了不同的等待策略以适应不同场景


    • BlockingWaitStrategy

      默认策略。使用锁和 Condition 的等待、唤醒机制。速度慢,但节省CPU资源并且在不同部署环境中能提供更加一致的性能表现。

    • YieldingWaitStrategy

      二段式,一阶段自旋100次,二阶段执行Thread.yield,需要低延迟的场景可使用此策略

    • SleepingWaitStrategy

      三段式,一阶段自旋,二阶段执行Thread.yield,三阶段睡眠

    • BusySpinWaitStrategy

      性能最高的策略,与 YieldingWaitStrategy 一样在低延迟场景使用,但是此策略要求消费者数量低于 CPU 逻辑内核总数


    其他小技巧


    1. 清除消息载体 Event 中的数据

      如果 Event 中存在大对象,应该在消费者链的末尾,添加一个清除数据的消费者,以帮助jvm垃圾回收。demo中的 LongEvent 是
      private long value;
      所以没必要添加。

总结

本文介绍了 Disruptor 的简单使用,以及复杂场景下消费者的配置。下篇开坑 Disruptor 源码解析。


参考资料

Disruptor官方文档

016. 请实现如下功能|谈谈你对闭包的理解

摘自<流畅的python> 第七章 函数装饰器和闭包

  • 实现一个函数(可以不是函数)avg,计算不断增加的系列值的平均值,效果如下

    def avg(...):
        pass
    avg(10) =>返回10
    avg(20) =>返回10+20的平均值15
    avg(30) =>返回10+20+30的平均值20
    

  • Python常见面试题015.请实现一个如下功能的函数
    有点类似,但又不太一样

  • 关键是你需要有个变量来存储历史值

类的实现方式

  • 参考代码

    class Average():
        def __init__(self):
            self.series = []
        def __call__(self, value):
            self.series.append(value)
            return sum(self.series)/len(self.series)
    
    avg = Average()
    print(avg(10))
    print(avg(20))
    print(avg(30))
    
  • avg是个Average的实例

  • avg有个属性series,一开始是个空列表

  • __call__
    使得avg对象可以像函数一样调用

  • 调用的时候series会保留,因为series只在第一次初始化的时候置为空列表

  • 下面的事情就变得简单了


  • 但有没有其他做法呢?
  • 有的,答案是:闭包

闭包实现

  • 参考代码

    def make_average():
        series = []
        def averager(value):
            series.append(value)
            return sum(series)/len(series)
        return averager
    avg = make_average()
    print(avg(10))
    print(avg(20))
    print(avg(30))
    
  • 仔细对比2个代码,你会发现相似度是极高的

  • 一个是类,一个是函数

  • 类中存储历史值的是self.series,函数中的是series
    局部变量

  • 类实例能调用是实现了
    __call__
    ,函数的实现中,avg是make_average()的返回值averager,是个函数名,所以它也能调用

闭包 closure 初识

  • 闭包closure定义:


    • 在一个
      外函数
      中定义了一个
      内函数
    • 内函数里运用了外函数的
      临时变量
    • 外函数的返回值是
      内函数的引用
  • 以上面的为例

    def make_average(): # 外函数
        series = [] # 临时变量(局部变量)
        def averager(value): # 内函数
            series.append(value)
            return sum(series)/len(series)
        return averager # 返回内函数的引用
    
  • 下面这些话你可能听的云里雾里的,姑且听一下。

  • series 是 make_averager 函数的局部变量,因为那个函数的定义体中初始化了series:series = []

  • 调用 avg(10) 时,make_averager 函数已经返回了,而它的本地作用域也一去不复返了

  • 在 averager 函数中,series 是自由变量(free variable)。这是一个技术术语,指未在本地作用域中绑定的变量

  • image-20230410161657612

  • averager 的闭包延伸到那个函数的作用域之外,包含自由变量 series 的绑定

反汇编(dis=Disassembler)

from dis import dis
dis(make_average)
  2           0 BUILD_LIST               0
              2 STORE_DEREF              0 (series)

  3           4 LOAD_CLOSURE             0 (series)
              6 BUILD_TUPLE              1
              8 LOAD_CONST               1 (<code object averager at 0x000002225DD1CBE0, file "<ipython-input-1-a43a8601eedd>", line 3>)
             10 LOAD_CONST               2 ('make_average.<locals>.averager')
             12 MAKE_FUNCTION            8 (closure)
             14 STORE_FAST               0 (averager)

  6          16 LOAD_FAST                0 (averager)
             18 RETURN_VALUE

Disassembly of <code object averager at 0x000002225DD1CBE0, file "<ipython-input-1-a43a8601eedd>", line 3>:
  4           0 LOAD_DEREF               0 (series)
              2 LOAD_METHOD              0 (append)
              4 LOAD_FAST                0 (value)
              6 CALL_METHOD              1
              8 POP_TOP

  5          10 LOAD_GLOBAL              1 (sum)
             12 LOAD_DEREF               0 (series)
             14 CALL_FUNCTION            1
             16 LOAD_GLOBAL              2 (len)
             18 LOAD_DEREF               0 (series)
             20 CALL_FUNCTION            1
             22 BINARY_TRUE_DIVIDE
             24 RETURN_VALUE
  • 读懂上面的,不是人干的事情,不过你依然有可能

    https://docs.python.org/zh-cn/3/library/dis.html#bytecodes
    


code属性

  • 怎么样不云里雾里呢

  • 查看
    avg.__code__
    属性

    [_ for _ in dir(avg.__code__) if _[:2]=='co']
    

    ['co_argcount',
     'co_cellvars',
     'co_code',
     'co_consts',
     'co_filename',
     'co_firstlineno',
     'co_flags',
     'co_freevars',
     'co_kwonlyargcount',
     'co_lnotab',
     'co_name',
     'co_names',
     'co_nlocals',
     'co_posonlyargcount',
     'co_stacksize',
     'co_varnames']
    
  • 官方解释


    属性 描述
    co_argcount 参数数量(不包括仅关键字参数、* 或 ** 参数)
    co_code 原始编译字节码的字符串
    co_cellvars 单元变量名称的元组(通过包含作用域引用)
    co_consts 字节码中使用的常量元组
    co_filename 创建此代码对象的文件的名称
    co_firstlineno 第一行在Python源码的行号
    co_flags CO_*
    标志的位图,详见
    此处
    co_lnotab 编码的行号到字节码索引的映射
    co_freevars 自由变量的名字组成的元组(通过函数闭包引用)
    co_posonlyargcount 仅限位置参数的数量
    co_kwonlyargcount 仅限关键字参数的数量(不包括 ** 参数)
    co_name 定义此代码对象的名称
    co_names 局部变量名称的元组
    co_nlocals 局部变量的数量
    co_stacksize 需要虚拟机堆栈空间
    co_varnames 参数名和局部变量的元组
  • 通过
    __code__
    分析

    def make_average(): 
        series = []
        def averager(value): 
            series.append(value)
            total = sum(series)
            return total/len(series)
        return averager 
    avg = make_average()
    avg.__code__.co_varnames  # 参数名和局部变量的元组
    # ('value', 'total')  # value是参数,total是局部变量名
    avg.__code__.co_freevars 
    # ('series',) # 自由变量的名字组成的元组(通过函数闭包引用)
    
    
    
    
    
  • 结合
    avg.__closure__

    avg.__closure__
    # (<cell at 0x000002225FA4DC70: list object at 0x000002225EE35600>,)
    # 这是个cell对象,list对象
    len(avg.__closure__) # 1
    avg.__closure__[0].cell_contents # [] 因为你还没调用
    avg(10)
    avg(20)
    avg(30)
    avg.__closure__[0].cell_contents # [10, 20, 30] 保存着真正的值
    
    
    
  • 闭包是一种函数,它会保留定义函数时存在的自由变量的绑定,这样调用函数时,虽然定义作用域不可用了,但是仍能使用那些绑定。

  • 只有嵌套在其他函数中的函数才可能需要处理不在全局作用域中的外部变量

nolocal 声明

  • 前面的make_averager 函数的方法效率不高

  • 因为我们把所有值存储在历史数列中,然后在每次调用 averager 时使用 sum 求和

  • 更好的实现方式是,只存储目前的总值和元素个数,然后使用这两个数计算均值

  • 所以你可能这样实现

    def make_average(): 
        total = 0
        length = 0
        def averager(value): 
            total = total + value
            length = length + 1
            return total/length
        return averager 
    avg = make_average()
    
    
  • 执行avg(10)的时候你就会报错

    UnboundLocalError                         Traceback (most recent call last)
    <ipython-input-11-ace390caaa2e> in <module>
    ----> 1 avg(10)
    
    <ipython-input-9-eaa25222e808> in averager(value)
          4     def averager(value):
          5         # nonlocal total,length
    ----> 6         total = total + value
          7         length = length + 1
          8         return total/length
    
    UnboundLocalError: local variable 'total' referenced before assignment
    
  • 这个问题你应该看到过,在前面的面试题002中看到过这样的错误

  • 关键的错误是在于

    total = total + value
    length = length + 1
    
  • 这样的赋值会把total和length都变成局部变量

    from dis import dis
    dis(make_average)
    

      2           0 LOAD_CONST               1 (0)
                  2 STORE_FAST               0 (total)
    
      3           4 LOAD_CONST               1 (0)
                  6 STORE_FAST               1 (length)
    
      4           8 LOAD_CONST               2 (<code object averager at 0x0000026A8ED0E660, file "<ipython-input-12-12a610cc685c>", line 4>)
                 10 LOAD_CONST               3 ('make_average.<locals>.averager')
                 12 MAKE_FUNCTION            0
                 14 STORE_FAST               2 (averager)
    
      8          16 LOAD_FAST                2 (averager)
                 18 RETURN_VALUE
    
    Disassembly of <code object averager at 0x0000026A8ED0E660, file "<ipython-input-12-12a610cc685c>", line 4>:
      5           0 LOAD_FAST                1 (total)
                  2 LOAD_FAST                0 (value)
                  4 BINARY_ADD
                  6 STORE_FAST               1 (total)
    
      6           8 LOAD_FAST                2 (length)
                 10 LOAD_CONST               1 (1)
                 12 BINARY_ADD
                 14 STORE_FAST               2 (length)
    
      7          16 LOAD_FAST                1 (total) #此处 LOAD_FAST 加载局部变量
                 18 LOAD_FAST                2 (length)
                 20 BINARY_TRUE_DIVIDE
                 22 RETURN_VALUE
    
  • 是对数字、字符串、元组等不可变类型来说,只能读取,不能更新。如果尝试重新绑定,例如 count = count + 1,其实会隐式创建局部变量 count。这样,count 就不是自由变量了,因此不会保存在闭包中

  • 为了解决这个问题,Python 3 引入了 nonlocal 声明。它的作用是把变量标记为自由变量,即使在函数中为变量赋予新值了,也会变成自由变量。如果为 nonlocal 声明的变量赋予新值,闭包中保存的绑定会更新。

  • 解决的代码

    def make_average(): 
        total = 0
        length = 0
        def averager(value):
            nonlocal total,length
            total = total + value
            length = length + 1
            return total/length
        return averager 
    avg = make_average()
    # 你就可以avg(10)这样了~
    

在开发中我们有时候需要
每隔 一段时间发送一次电子邮件
,或者
在某个特定的时间进行发送邮件

无需手动去操作,基于这样的情况下我们需要用到了定时任务,一般可以写个定时器,来完成相应的需求,在 node.js 中自已实现也非常容易,接下来要介绍的是node-schedule来完成定时任务

用express.js实现 每个星期三中午12点 发送邮件给某个用户

1.安装第三方库 Node Schedule、nodemailer

npm i -s node-schedule nodemailer

2.新建一个 TaskScheduler 定时任务类

// 引入 node-schedule 模块
const schedule = require('node-schedule');

/*
* TODO:编写 Cron 表达式时,有五个占位符可以使用,分别表示分钟、小时、日期、月份和星期几。
*      每个占位符可以使用特定的值、值范围、逗号分隔的值列表和通配符等等
*
*       * * * * * *
*       | | | | | |
*       | | | | | day of week
*       | | | | month
*       | | | day of month
*       | | hour
*       | minute
*       second ( optional )
*
*      示例 Cron 表达式:
*           每分钟的第30秒触发: 30 * * * * *
*           每小时的1分30秒触发 :30 1 * * * *
*           每天的凌晨1点1分30秒触发 :30 1 1 * * *
*           每月的1日1点1分30秒触发 :30 1 1 1 * *
*           每年的1月1日1点1分30秒触发 :30 1 1 1 1 *
*           每周1的1点1分30秒触发 :30 1 1 * * 1
* */

// 创建一个任务调度器类
class TaskScheduler {
    // 构造函数,接受 cron 表达式和要执行的任务作为参数
    constructor(cronExpression, task) {
        // 将传入的 cron 表达式和任务保存为成员变量
        this.cronExpression = cronExpression;
        this.task = task;
        // 初始化 job 为 null
        this.job = null;
    }

    // 启动任务
    start() {
        // 如果当前没有正在运行的任务,则创建新的任务
        if (!this.job) {
            this.job = schedule.scheduleJob(this.cronExpression, this.task);
            console.log(`定时任务启动: ${this.cronExpression}`);
        }
    }

    // 停止任务
    stop() {
        // 如果当前有正在运行的任务,则取消任务并将 job 设为 null
        if (this.job) {
            this.job.cancel();
            console.log(`定时任务停止: ${this.cronExpression}`);
            this.job = null;
        }
    }
}

// 导出任务调度器类
module.exports = TaskScheduler;

3.创建一个发送邮件的方法

const nodemailer = require("nodemailer");
/**
 * 邮箱发送
 *
 * @param  {string}  to 对方邮箱
 * @param  {string}  content 发送内容
 */

// 创建Nodemailer传输器 SMTP 或者 其他 运输机制
let transporter = nodemailer.createTransport(
    {
        service: 'QQ', // 使用内置传输发送邮件 查看支持列表:https://nodemailer.com/smtp/well-known/
        port: 465, // SMTP 端口
        secureConnection: true, // 使用 SSL
        auth: {
            user: '1840354092@qq.com', // 发送方邮箱的账号
            pass: '******', // 邮箱授权密码
        }
    }
);

exports.send = (to, content) => {
    return new Promise((resolve, reject) => {
        transporter.sendMail({
            from: `"ZY.API" <1840354092@qq.com>`, // 发送方邮箱的账号
            to: to, // 邮箱接受者的账号
            subject: "Welcome to ZY.API", // Subject line
            // text: '"MG'Blog ?"', // 文本内容
            html: `
        <img src="http://www.zhouyi.run:3001/api/v1/files/preview?p=pexels-photo-276452.jpeg&&mimetype=image/jpeg" alt=""  style="height:auto;display:block;" />
        <p >??? <a href="http://www.zhouyi.run/#/">ZY.API</a></p>
        <p style="font-weight: bold">${content}</p>
        <p ><a style="font-size: 18px;font-weight: bolder" href="http://www.zhouyi.run/#/">确认</a></p>
        <p style="text-indent: 2em;">祝您工作顺利,心想事成</p>`
        }, (error, info) => {
            if (error) {
                reject(error)
            }
            resolve(info)
        });
    })
}

4.创建一个
每个星期三中午12点 发送邮件
的任务实例并且引入发送邮件的方法

const TaskScheduler = require('./TaskScheduler')
const {send} = require('../../utils/utils.mailer')


const task = async function () {
    await send('1840354092@qq.com', '每个星期三中午12点 发送邮件')
    return console.log('允许定时任务每个星期三中午12点 发送邮件...' + new Date().getMinutes() + "-" + new Date().getSeconds());
};

// 创建一个 每个星期三中午12点 发送邮件
module.exports = new TaskScheduler('0 0 12 ? * WED', task);

5.路由使用该定时发送邮件类

/**
 *@author ZY
 *@date 2023/4/10
 *@Description:任务相关的接口
 */

const express = require('express');
const router = express.Router();
const SendEmail = require('../../scheduler/task/SendEmail')

/****************************************************************************/


/**
 * 开始发送邮件定时任务
 * @route GET /v1/task/startSendEmail
 * @group 定时任务 - 定时任务相关
 * @returns {object} 200 - {"status": 1,"message": "登录成功.","data": {...},"time": 1680598858753}
 * @returns {Error}  default - Unexpected error
 */

router.get('/startSendEmail', function (req, res) {
    //用户的定时任务开始
    SendEmail.start();
    res.send('用户的定时任务开始!');
});

/**
 * 停止发送邮件定时任务
 * @route GET /v1/task/stopSendEmail
 * @group 定时任务 - 定时任务相关
 * @returns {object} 200 - {"status": 1,"message": "登录成功.","data": {...},"time": 1680598858753}
 * @returns {Error}  default - Unexpected error
 */

router.get('/stopSendEmail', function (req, res) {
    SendEmail.stop();
    res.send('用户的定时任务开始!');
});

module.exports = router;

6.到这里差不多就可以开始定时任务和停止定时任务了,我这里是设置30秒发一次邮件

image.png
e747763d0a54354115cc9b00fda0f34.jpg


狂点这里查看完整项目代码