2024年10月

我们是
袋鼠云数栈 UED 团队
,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

本文作者:UED 团队

现代操作系统都是「多任务」的,也就是操作系统可以「并发」处理多个任务,比如可以在浏览页面的时候同时播放音乐。但是,一般来说我们的 PC 只有一个物理 CPU ,那么它是如何做到在只有一个 CPU 的情况下,并发处理多个任务的呢?我们简单探究一下。

前置知识

我们先简单熟悉一下 CPU 硬件相关的术语:

  • Sockets(physical CPU): 物理CPU,指我们主板上实际插入的CPU,一般来说 PC 只有一个,服务器可能会有多个
  • Cores: CPU物理核心,CPU商品上宣传的一共几核指代的就是这个
  • Logical Processors: 逻辑处理器,如果采用超线程(多线程)技术的话,会比物理核心数多

总的来说: Logical Processors = Sockets _ Cores _ SMT(HT) Multiple
逻辑处理器数量也就代表了操作系统认为能「并行」执行的任务的最高数量

并发 VS 并行

我们对「并发」和「并行」先下个定义,「并发」指的是系统允许多个任务
同时存在
,「并行」则指的是系统支持多个任务
同时执行
,「并发」和「并行」的关键区别在于是否能
同时执行
。在只有单一逻辑处理器的情况下,我们的操作系统只能「并发」执行任务,比如早期的单核 CPU 电脑。但是我们仍然可以边听歌边浏览网页,这是因为 CPU 速度足够快,可以在系统的使用过程中快速切换任务,这样我们就感觉到多个任务
同时存在

在单一逻辑处理器的情况下
,虽然我们可以「并发」执行任务,但实际上我们同时也只能执行一个任务,对于 IO 密集类型的任务,我们用到 CPU 的时间不多,决定任务快慢的往往是硬盘以及网络等硬件,「并发」执行也未尝不可,但是对于计算密集型的任务,我们需要占用更多的 CPU 时间,如果「并发」执行,则往往会造成任务的卡顿(响应时间过长),因此我们需要「并行」的执行该任务,而逻辑处理器的数量代表了能「并行」执行任务的最高数量,这也是为什么现在的处理器大多是多核处理器的原因所在。

进程 VS 线程

我们使用的一个个程序可以称为「进程」( process ),而 process 下可以开辟多个「线程」( thread ),这里引用一下 Microsoft 官方对于进程和线程的解释
About Processes and Threads
:

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients.

在操作系统层面,process 相互独立,拥有一块独立的虚拟地址空间(内存中),而同一 process 下的 thread 共享该虚拟地址空间,这也是 process 和 thread 最典型,最根本的区别

多进程 VS 多线程

假如我们现在要开发一款浏览器,浏览器的基础功能包括 HTTP 请求,GUI 渲染等功能,如果我们采用单线程来开发,那么势必会遇到一个问题: 当需要网络请求的时候,我们的浏览器就会卡住,所有的用户操作如输入等都没有响应,等网络请求完成,我们才可以进行后续操作,非常影响用户体验,这也是为什么像浏览器这样的程序大多都是多线程的原因,我们需要任务
同时进行
。但是我们前面讲到的多进程也可以多任务同时进行,那么问题就来了,当我们需要实现多任务的时候,多进程和多线程该如何选择呢?

多进程

前面我们提到过,进程之间是相互独立的,每个进程有独立的虚址空间,那么当一个进程因为某些原因崩掉了,其他的进程也不会受到影响(主进程挂掉除外,但是主进程一般只负责调度,挂掉的几率较小),所以当我们需要较高的
稳定性
时,可以考虑多进程。但是创建进程的开销是比较大的,因此要考虑资源问题。

多线程

多线程可以共享虚址空间,而且创建一个线程的
开销较小
,这样我们就可以减少资源的占用。但是正是因为线程之间可以共享虚址空间,当一个线程挂掉了,整个进程会随之挂掉,所以多线程的稳定性相比多进程较差。

Node.js 中的多线程与多进程

child_process & cluster

Node.js提供了多种方法来创建多进程,例如 child_process 提供的
child_process.spawn()

child_process.fork()
,那么什么是 spawn :

Spawn
in computing refers to a function that loads and executes a new child process. The current process may wait for the child to terminate or may continue to execute concurrent computing.

所以
child_process.spawn
的作用是创建了一个子进程,然后在子进程执行一些命令,但是
child_process.spawn()
有一个缺点,就是不能进行进程间通信(IPC: Inter Process Communication),那么当需要进程间通信的时候,就需要使用
child_process.fork()

涉及到现实中多进程的运用,我们往往不会只起一个子进程,当我们需要进程间共享一个端口时,这时候就可以使用Node.js提供的
cluster
,
cluster
创建子进程内部也是通过
child_process.fork()
实现的,支持IPC

structured clone

当我们创建了一个子进程的时候,进程间的通信 Node.js 已经帮我们封装好了,使用
worker.send(message)

process.on('message', handle)
就可以实现进程间的通信,以
cluster
为例:

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
  });
}

但是需要注意一点,我们发送的 message 会被
structured clone
一份,然后传递给其他进程,因此我们需要注意如果传递了一个 Object 过去,Object 中定义的 Function 及其 prototype 等内容都不会被clone过去。这里发散一下,如果我们需要深拷贝一个对象,而且该对象满足Structured clone的相关算法要求,那么我们可以考虑使用
structuredClone

caniuse
)或者直接创建一个worker来拷贝(当然不推荐)

worker_threads

上述我们讲到进程间的资源是独立的,当我们想共享数据的时候,我们需要structured clone 对应的数据然后传递过去,这在共享数据量较小的时候还可以接受,但是当数据量较多时,克隆数据是一个比较大的开销,这是我们所不能接受的,因此我们需要多线程来共享内存(数据),Node.js 中也提供了相应的方法
worker_threads

多线程在 ko 中的实践

ko

ko
是基于 webpack
@5.x
的打包工具,其仓库采用了 Monorepo 的方式进行包管理。

在这里,ko 提供了 concurrency 模式,该模式下使用多线程执行 eslint 、prettier 或 stylelint ,这里简单介绍一下如何实现。

获取需要 lint 的所有文件

这里使用的是
fast-glob
,主要代码如下所示
factory/runner.ts

import fg, { Pattern } from 'fast-glob';

protected async getEntries(
  patterns: Pattern[],
  ignoreFiles: string[]
): Promise<string[]> {
  return fg(patterns, {
    dot: true,
    ignore: this.getIgnorePatterns(...ignoreFiles),
  });
}

private getIgnorePatterns(...ignoreFiles: string[]) {
    return ['.gitignore', ...ignoreFiles]
      .map(fileName => {
        const filePath = join(this.cwd, fileName);
        if (existsSync(filePath)) {
          return readFileSync(filePath, 'utf-8')
            .split('\n')
            .filter(str => str && !str.startsWith('#'));
        }
        return [];
      })
      .reduce((acc, current) => {
        current.forEach(p => {
          if (!acc.includes(p)) {
            acc.push(p);
          }
        });
        return acc;
      }, []);
  }

返回的是需要 lint 的所有文件路径

lint 相关的 Parser

我们以 eslint 为例
eslint/parser.ts
:

import { eslint } from 'ko-lint-config';
import LintParserFactory from '../factory/parser';
import { IParserOpts } from '../interfaces';

class ESLintParser extends LintParserFactory {
  static readonly EXTENSIONS = ['ts', 'tsx', 'js', 'jsx'];
  private eslintInstance: eslint.ESLint;
  private opts: IParserOpts;
  private config: Record<string, any>;

  constructor(opts: IParserOpts) {
    super();
    this.opts = opts;
    this.generateConfig();
    this.initInstance();
  }

  private initInstance() {
    const { write } = this.opts;
    this.eslintInstance = new eslint.ESLint({
      fix: write,
      overrideConfig: this.config,
      useEslintrc: false,
      extensions: ESLintParser.EXTENSIONS,
    });
  }

  public async format(file: string): Promise<string> {
    const formatter = await this.eslintInstance.loadFormatter();
    let resultText = '';
    try {
      const result = await this.eslintInstance.lintFiles(file);
      if (result[0].errorCount) {
        resultText = formatter.format(result) as string;
      }
      return resultText;
    } catch (ex) {
      console.log(ex);
      process.exit(1);
    }
  }

  public generateConfig() {
    if (this.opts.configPath) {
      this.config = this.getConfigFromFile(this.opts.configPath);
    } else {
      const localConfigPath = this.detectLocalRunnerConfig(this.opts.name);
      if (localConfigPath) {
        this.config = this.getConfigFromFile(localConfigPath);
      }
    }
  }
}

export default ESLintParser;

所有的 parser 实现了 format() 方法,作用是输入一个文件的路径,然后进行 lint ,如果有相关的错误则返回错误结果。

Thread Pool

创建一个线程的是有开销的,虽然相比创建进程而言消耗的较小,但是我们也并不能无休止创建线程。线程是需要调度的,如果我们创建了很多线程,那么系统花在线程调度的时间往往会更长,导致的结果是我们开了多个线程,但是执行程序的耗时反而更长了。为了更好的使用线程,我们引入线程池的概念
WikiPedia

In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program

还是WikiPedia的示例图:
file

简单来说,线程池创建了一定数量的线程,每个线程从任务队列中获取任务并执行,然后继续执行下一个任务直到结束。ko中也实现了相关的线程池
threads/Pool.ts

import { join } from 'path';
import { Worker } from 'worker_threads';
import { IThreadOpts, IParserOpts } from '../interfaces';

class ThreadPool {
  private readonly workers: Worker[] = [];
  private readonly workerPList: Promise<boolean>[] = [];
  private readonly opts: IThreadOpts;
  private queue: string[];
  private stdout: string[] = [];

  constructor(opts: IThreadOpts) {
    console.log('Using Multithreading...');
    this.opts = opts;
    this.queue = this.opts.entries;
    this.format();
  }

  format() {
    const { concurrentNumber, configPath, write, name } = this.opts;
    if (this.workers.length < concurrentNumber) {
      this.workerPList.push(
        this.createWorker({
          configPath,
          write,
          name,
        })
      );
      this.format();
    }
  }

  createWorker(opts: IParserOpts): Promise<boolean> {
    const worker = new Worker(join(__dirname, './Worker.js'), {
      workerData: {
        opts,
      },
    });
    return new Promise(resolve => {
      worker.postMessage(this.queue.shift());
      worker.on('message', (result: string) => {
        this.stdout.push(result);
        if (this.queue.length === 0) {
          resolve(true);
        } else {
          const next = this.queue.shift();
          worker.postMessage(next);
        }
      });
      worker.on('error', err => {
        console.log(err);
        process.exit(1);
      });
      this.workers.push(worker);
    });
  }

  async exec(): Promise<string[]> {
    return Promise.all(this.workerPList).then(() => {
      return this.stdout;
    });
  }
}

export default ThreadPool;

这里的
workers
维护了多个 worker ,相当于线程池的概念,而任务队列对应的则是
queue
,也就是传入的需要 lint 的所有文件,当一个 worker 执行完一个文件的 lint 之后,从
queue
中拿一个新的文件继续执行新的 lint 任务,当
queue
为空时,我们结束任务并返回最终结果。

需要注意的一点是关于
concurrentNumber
也就是我们启动的线程数量,这里我们默认是
Logical Processors
的数量。

结果

那么我们来对比一下多线程和普通情况下的性能,以执行 eslint 为例:

硬件信息:

  • CPU: Apple M1
  • Memory: 8 GB LPDDR4

普通模式下的log为:

exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write
exec eslint with 704 files cost 31.71s

多线程模式下的log为:

exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write --concurrency
Using Multithreading...
exec eslint with 704 files cost 23.60s

可以看到性能有一定程度的提升,但是并没有我们想象中的性能提升多倍,这是为什么呢?我们简单分析一下:

  • 线程启动及其调度消耗了一定的时间
  • 线程内部涉及到了IO操作,而不是单纯的运算

但是可以肯定的是,随着需要 lint 的文件数量增多,两个模式下所用的时间差会增大。

线程安全

在 ko 中, 我们针对 lint 进行了多线程的操作,性能上有了一定程度的提升,但是我们线程间总的来说是相互独立的,没有使用到共享内存的情况。那么当我们需要共享内存时,会遇到一个问题,我们启用了多个线程,线程之间针对共享内存可能存在竞争关系,也就是可能会同时操作共享内存中的数据,这个时候我们就不能保证数据的准确性,专业术语描述为不是
线程安全
的。遇到这种情况,我们一般会涉及到一个专业术语

(
Lock
)

我们回到
work_threads
,看一下官方文档中是如何共享内存的:

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

注意一点,如果我们想共享内存,我们可以传递
ArrayBuffer
或者
SharedArrayBuffer
,那么这两种类型的数据有什么特殊性呢?

答案是
ArrayBuffer

SharedArrayBuffer
支持
Atomics
一起使用,可以实现 Lock 相关的概念

最后

欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star

数组解构

const arr = ["1","2","3"]

let a,b,c

// 解构赋值
//将数组的第一个元素赋值给第一个变量,第二个元素赋值给第二个变量,依次类推

[a,b,c] = arr
console.log(a,b,c) // 1 2 3
// 声明变量同时解构
let [a,b,c] = ["1","2","3"]
console.log(a,b,c) // 1 2 3
// 如果变量数大于元素数量,则超出的变量部分赋值为undefined
const arr = ["1","2","3"]
let [a,b,c,d] = arr
console.log(a,b,c,d) // 1 2 3 undefined
// 默认值,如果变量声明了默认值,解构为undefined的时候,则会使用默认值,否则使用对应元素
const arr = ["1","2","3"]
let [a,b,c=10,d=11] = arr
console.log(a,b,c,d) // 1 2 3 11
// 解构数组时可以使用...获取剩余的元素

const arr = ["1","2","3","4","5"]
// 第一个元素赋值给a,第二个元素赋值给b,其余元素赋值给c
let [a,b, ...c] = arr
console.log(a) // 1
console.log(b) // 2
console.log(c) // ['3','4','4']
// 解构多维数组
const arr = [[1,2,3],[4,5,6]]
let [[a,b,c],obj] = arr

console.log(a,b,c) // 1 2  3
console.log(obj) // [4,5,6]

对象解构

const user = {"name":"q",age:18}
// 声明变量同时解构对象
let {name,age} = user

console.log(name,age) // q  18
// 先声明变量,再解构对象
const user = {"name":"q",age:18}

let name,age

({name,age} = user)  // {} 在js中是代码块,所以需要在外面使用()
console.log(name,age) // q 18
const user = {"name":"q",age:18}

// 如果变量名与属性名不匹配,则解构为undefined
let {a,b} = user
console.log(a,b) // undefined undefined

// 可以给变量起别名再解构
// 可以给变量既起别名,又设置默认值
let {name:c,age:d=20} = user

console.log(c,d) // q  18

对象的序列化

JS中的对象使用时都是存在于计算机的内存中的,序列化指将对象转换为一个可以存储的格式,在JS中对象的序列化通常是将一个对象转换为字符串(JSON字符串)

const user = {name:"l",age:18}

// 将一个对象转换为json字符串
const strUser = JSON.stringify(user)
console.log(strUser)

// 将一个json字符串转换为js对象
const objUser = JSON.parse(strUser)
console.log(objUser)

编写JSON的注意事项:

  1. JSON字符串有两种类型:


    • JSON对象 {}
    • JSON数组 []
  2. JSON字符串的属性名必须使用双引号引起来

  3. JSON中可以使用的属性值(元素)


    • 数字(Number)
    • 字符串(String) 必须使用双引号
    • 布尔值(Boolean)
    • 空值(Null)
    • 对象(Object {})
    • 数组(Array [])
// 利用json完成深复制,将对象转换成字符串,再将字符串转换成一个新对象
const strUser = JSON.stringify(user)
const objUser = JSON.parse(strUser)

Map

Map用来存储键值对结构的数据(key-value)

Object中存储的数据就可以认为是一种键值对结构

Map和Object的主要区别:

  • Object中的属性名只能是字符串或符号,如果传递了一个其他类型的属性名,JS解释器会自动将其转换为字符串
  • Map中任何类型都可以称为数据的key
map的使用
const info = new Map()

// 设置键值对
info.set("name","l")
info.set({},"a")
info.set(NaN,"b")

console.log(info)  // Map(3) { 'name' => 'l', {} => 'a', NaN => 'b' }

// 当前map大小
console.log(info.size) // 3


// 获取map的所有的key
console.log(info.keys())  // [Map Iterator] { 'name', {}, NaN }


// 获取map所有的value
console.log(info.values()) // [Map Iterator] { 'l', 'a', 'b' }


// 检查map是否包含指定key,包含true否则false
console.log(info.has(NaN)) // true

// 通过key获取指定元素
console.log(info.get("name"))  // l

// 删除元素
info.delete("name")
console.log(info)  // Map(2) { {} => 'a', NaN => 'b' }

// 清空map
info.clear()
console.log(info)  // Map(0) {}


map的转换
const info = new Map()

info.set("name","l")
info.set({},"a")
info.set(NaN,"b")

// 使用Array.from将 map转换为数组
const arr = Array.from(info)
console.log(arr)  // [ [ 'name', 'l' ], [ {}, 'a' ], [ NaN, 'b' ] ]

const arr = [["name",1],["age",2]]

// 将二维数组转换为map
const map = new Map(arr)
console.log(map) // Map(2) { 'name' => 1, 'age' => 2 }

遍历map

const info = new Map()

info.set("a",1)
info.set("b",2)


// 方式一
for (const entry of info){
    const [key,value] = entry
    console.log(key,value)
}

for (const [key,value] of info){
    console.log(key,value)
}

// 方式二

info.forEach((key,value)=>{
    console.log(key,value)
})

Set

  • Set用来创建一个集合
  • 它的功能和数组类似,不同点在于Set中不能存储重复的数据

// 创建集合
const set = new Set()


// 添加数据,如果集合中已经存在要添加的元素,则不会添加
set.add("a")
set.add("b")


// 获取集合的大小
console.log(set.size) // 2

// 检查集合中是否包含指定元素,包含true否则false
console.log(set.has("a")) // true


// 读取指定元素-需要将set转换成数组,然后通过下标读取
const arr = [...set]
console.log(arr[0]) // a

// 删除元素
set.delete("b")

//  遍历集合
for (const v of set){
    console.log(v)
}

// 通过数组创建集合,基于集合元素的唯一性,会对数组进行去重

const arr = [1,2,3,4,4,1,1,5]

const set =  new Set(arr)
console.log(set) // Set(5) { 1, 2, 3, 4, 5 }

Math

Math是一个工具类,提供了数学运算的一些常量和方法

用法大全


// 常量 圆周率
console.log(Math.PI)  // 3.141592653589793

// 求一个数的绝对值
console.log(Math.abs(-123))  // 123

// 求多个值中的最小值
console.log(Math.min(1, 2, 3))  // 1

// 求多个值中的最大值
console.log(Math.max(1,2,3))  // 3

// 求指定数值的幂次方
console.log(Math.pow(2,3)) // 8

// 求一个数的平方根
console.log(Math.sqrt(4)) // 2

// 向下取整
console.log(Math.floor(1.5)) // 1

// 向上取整
console.log(Math.ceil(1.5)) // 2

// 四舍五入取整
console.log(Math.round(3.5)) // 4

// 去除小数位
console.log(Math.trunc(3.5)) // 3

// 生成0~1之间的随机浮点数,不包含0和1
console.log(Math.random()) // 0.11258771607929718

// 生成 0~5之间的随机整数
// Math.random范围 是0~1,*5将范围扩大5倍 0~5 默认不包含0和5,然后四舍五入,根据四舍五入包含0和5
console.log(Math.round(Math.random() * 5))

// 生成 x ~ y 任意范围的随机数
/*
1~6之间的随机数
范围扩大5倍,是0~5,然后+1是1~6,得出随机小数后四舍五入,包含1和6
*/
console.log(Math.round(Math.random() * 5 + 1))


// 11~20之间的整数
// [0,1] -> [0,9] -> [0+11,9+11]
// 先写+号后面的部分,11,,20-11=9
console.log(Math.round(Math.random() * 9 + 11))


// 根据 四舍五入、取证等方法的规则,来决定左右数值是否包含

Date

在JS中所有的和时间相关的数据都由Date对象来表示

用法大全

Date使用

// 创建当前的时间的对象
let date = new Date()
console.log(date) // 2024-10-30T10:48:29.454Z

// 创建指定时间对象-年,月(从下标0开始),日,时,分,秒,毫秒
date = new Date(2024,9,30,13,13,13,13)
console.log(date) // 2020-01-01T05:13:13.000Z

// 创建指定时间的对象-通过时间字符串-年-月-日 时:分:秒
date = new Date("2024-10-30 12:00:00")
console.log(date) // 2024-10-30T04:00:00.000Z

// 创建指定时间的对象-通过时间戳
date = new Date(1730265193013)
console.log(date) // 2024-10-30T05:13:13.013Z




// 获取时间对象的年份
console.log(date.getFullYear())
// 获取时间对象的月份-返回的是月份的索引,索引从0开始
console.log(date.getMonth())
// 获取时间对象的日
console.log(date.getDate())
// 获取日期对象是周几(0-6) 0表示周日
console.log(date.getDay())
// 返回当前日期对象的时间戳
console.log(date.getTime())

// 返回当前时间的时间戳
console.log(Date.now())
Date格式化
let date = new Date()


// 将日期转换为本地时间格式字符串
let time = date.toLocaleDateString()
console.log(time)  // 10/31/2024

// 将时间转换为本地时间格式字符串
time = date.toLocaleTimeString()
console.log(time) // 10:32:05 AM

// 日期时间转换为本地时间格式字符串
time = date.toLocaleString()
console.log(time) // 10/31/2024, 10:32:50 AM

// 日期时间转换为指定地区、格式字符串-指定国家时间
time = date.toLocaleString(
    "en-US", // 指定语言和国家
    {dateStyle: "full", timeStyle: "full"}
    //                             dateStyle 日期的风格
    //                             timeStyle 时间的风格
    //                                 full
    //                                 long
    //                                 medium
    //                                 short
    //                             hour12 是否采用12小时值
    //                                 true
    //                                 false
    //                             weekday 星期的显示方式
    //                                 long
    //                                 short
    //                                 narrow
    //
    //                             year
    //                                 numeric
    //                                 2-digit


)
console.log(time)

包装类

在JS中,除了直接创建原始值外,也可以创建原始值的对象

//         通过 new String() 可以创建String类型的对象
//         通过 new Number() 可以创建Number类型的对象
//         通过 new Boolean() 可以创建Boolean类型的对象

//         注意:千万不要这么做,通过该方式创建的是对象,无法与原始值做比较

let str = new String("hello world")
let num = new Number(10)
let bool = new Boolean(true)
num.max = 100 // 给num对象设置属性

console.log(str) // [String: 'hello world']
console.log(num) // [Number: 10]
console.log(num.max) // 100 
console.log(bool) // [Boolean: true]

console.log(num === 10) // false

包装类:

  • JS中一共有5个包装类
  • String --> 字符串包装为String对象
  • Number --> 数值包装为Number对象
  • Boolean --> 布尔值包装为Boolean对象
  • BigInt --> 大整数包装为BigInt对象
  • Symbol --> 符号包装为Symbol对象

通过包装类可以将一个原始值包装为一个对象

当我们对一个原始值调用方法或属性时,JS解释器会临时将原始值包装为对应的对象,然后调用这个对象的属性或方法


// 原始值
let str = "hello world"

// 给原始值添加属性,会将原始值转换为一次性临时的原始值对象
str.name = "11111"
// 赋值和读取两次调用str.name,是两次临时的原始值对象,无法通过该方法读取
console.log(str.name) // undefined



let num = 100
// 原始值number是没有方法的,当调用方法时,将num临时的转为number对象,调用number对象的toString方法
num = num.toString()

由于原始值会被临时转换为对应的对象,这就意味着原始值对象中的方法都可以直接通过原始值来调用

// 字符串的本质是一个字符数组, hello-->["h","e","l","l","0"]

let str = "hello"

// 可以通过操作数组的一些方法去操作字符串

// 索引读取字符-不可以负索引
console.log(str[0]) // h
// 遍历字符串
for (let char of str){
    console.log(char)
}
// 获取字符串长度
console.log(str.length) // 5

// 根据索引获取字符串,可以接受负数索引
console.log(str.at(-1)) // o

// 索引读取字符-不可以负索引
console.log(str.charAt(0)) // h

// 用来连接两个或者多个字符串,返回一个新字符串
console.log(str.concat(" world"," !")) // hello world !


// 检查字符串中是否包含某个内容,返回true或者false
console.log(str.includes("ll")) //检测全部字符串 true
console.log(str.includes("h",2)) // 从下标2开始检测  false

// 检查指定字符第一次出的位置
console.log(str.indexOf("l")) // 2

// 检查指定字符最后一次出现的位置
console.log(str.lastIndexOf("l")) // 3

// 检查字符串是否以指定内容开头
console.log(str.startsWith("he")) // true

// 检查字符串是否以指定内容结尾
console.log(str.endsWith("o")) // true

// 给字符串补位 如果字符串长度不够指定长度,从开头开始补,返回一个新字符串,不指定补位字符,则默认是空
console.log(str.padStart(7,"0")) // 00hello
// 给字符串补位 如果字符串长度不够指定长度,从结尾开始补0,返回一个新字符串
console.log(str.padEnd(8,"0")) // hello000

// 使用一个新字符串替换字符串中的一个指定内容,返回一个新字符串
console.log(str.replace("l","a")) // healo


// 使用一个新字符串替换所有指定内容,返回一个新字符串
console.log(str.replaceAll("l","a")) // heaao

// 对字符串进行切片,左闭右开
console.log(str.slice(1,3)) // el

// 截取字符串,左闭右开
console.log(str.substring(1,3)) // el

// 将一个字符串拆分为一个数组,参数指定以什么字符间隔拆分
console.log(str.split("")) // [ 'h', 'e', 'l', 'l', 'o' ]

// 将字符串转换为小写
console.log(str.toLowerCase()) // hello

// 将字符串转换为大写
console.log(str.toLocaleUpperCase()) // HELLO

// 去除首尾空格
console.log(str.trim())

// 去除开始空格
console.log(str.trimStart())

// 去除结束空格
console.log(str.trimEnd())

正则表达式

  • 正则表达式用来定义一个规则
  • 通过这个规则计算机可以检查一个字符串是否符合规则或者将字符串中符合规则的内容提取出来
  • 正则表达式也是JS中的一个对象,所以要使用正则表达式,需要先创建正则表达式的对象
创建正则表达式对象
// 通过构造函数创建正则表达式对象,RegExp() 接收两个字符串参数 1.正则表达式 2.匹配模式
// 字符串转义原因,表达式编写注意转义
let regObj = new RegExp("\\w","i")


// 使用字面量创建一个正则表达式对象, /正在表达式/匹配模式
// 字面量创建不用考虑转义问题
let reg = /\w/i


let reg2 = new RegExp("a")
// test方法:get定的字符串str中测试是否存在与正则表达式re匹配的部分
console.log(reg2.test("abc")) // 字符串str 表达式a  str中包含a   返回true
正则表达式语法

// 在正则表达式中大部分字符都可以直接写
// | 在正则表达式中表示或

// 字符串是否有abc或者bcd
let re = /abc|bcd/



// [] 表示字符集

// 字符串是否包含字符集里面的任意字母,等同于 a | b | c
re = /[abc]/

// 任意的小写字母
re = /[a-z]/

// 任意的大写字母
re = /[A-Z]/

// 任意的大小写字母
re = /[a-zA-Z]/

// 任意的数字
re = /[0-9]/


// [^]表示排除

// 除了a以外的任意字符
re = /[^a]/

// .表示除了换行以外的任意字符 ,如果想匹配. 需要转义 /\./
re = /./

// 其他字符集

// 任意的单词字符 [A-Za-z0-9_]
re = /\w/

// 排除单词字符  [^A-Za-z0-9_]
re = /\W/

// 任意数字 [0-9]
re = /\d/

// 排除数字 [^0-9]
re = /\D/

// 空格
re = /\s/

// 排除空格
re = /\S/

// 单词边界
re = /\b/

// 排除单词边界
re = /\B/

// 开头和结尾,^表示字符串的开头,$表示字符串的结尾

// 匹配开始位置的a
re = /^a/

// 匹配结束位置的a
re = /a$/

// 匹配字幕a,完全匹配,字符串和正则完全一致
re = /^a$/
量词


// 3个a
let re = /a{3}/

// 至少3个a
re = /a{3,}/

// 3~6个a
re = /a{3,6/

// 多字符匹配:两次匹配只对量词前面的一个字符生效,比如 /ab{3}/,只会检测b,不会检测ab,如果要检测ab,需要使用()
re = /(ab){3}/


//  开头任何个小写字母
re = /^[a-z]{3}/


// + 表示一个以上,等同于{1,}
re = /a+/

// *表示任意数量
re = /a*/

// ?表示0-1次  等同于{0,1}
re = /a?/


正咋表达式匹配模式
  1. 默认(贪婪模式)
    • 在 JavaScript 正则表达式中,默认情况下量词(如
      *

      +

      ?
      )是贪婪的。这意味着它们会尽可能多地匹配字符。
    • 例如,对于正则表达式
      /a.*b/
      和字符串
      "aabbbc"
      ,它会匹配从第一个
      a
      到最后一个
      b
      的整个字符串,即
      "aabbbc"
      。因为
      .*
      会尽可能多地匹配中间的字符,直到遇到最后一个
      b
  2. 非贪婪模式(懒模式)
    • 通过在量词后面添加
      ?
      可以将其转换为非贪婪模式。在非贪婪模式下,量词会尽可能少地匹配字符。
    • 例如,对于正则表达式
      /a.*?b/
      和字符串
      "aabbbc"
      ,它会匹配
      "aab"
      。因为
      .*?
      会在遇到第一个
      b
      时就停止匹配,只匹配最少的字符来满足表达式。
  3. 全局匹配模式(
    g

    • 当在正则表达式末尾添加
      g
      标志(如
      /pattern/g
      )时,它会进行全局匹配。这意味着会在整个字符串中查找所有匹配的部分,而不是只找到第一个匹配就停止。
    • 例如,对于正则表达式
      /a/g
      和字符串
      "aaab"
      ,它会匹配所有的
      a
      ,返回一个包含三个
      a
      的数组(在使用
      match
      方法时)。如果没有
      g
      标志,只会返回第一个
      a
  4. 不区分大小写匹配模式(
    i

    • 在正则表达式末尾添加
      i
      标志(如
      /pattern/i
      )会使匹配不区分大小写。
    • 例如,对于正则表达式
      /a/i
      和字符串
      "Aa"
      ,它会匹配
      A
      ,因为不区分大小写。
  5. 多行匹配模式(
    m

    • 当添加
      m
      标志(如
      /pattern/m
      )时,
      ^

      $
      的匹配行为会发生改变。
      ^
      不仅可以匹配字符串开头,还可以匹配每一行的开头;
      $
      不仅可以匹配字符串结尾,还可以匹配每一行的结尾。
    • 例如,对于正则表达式
      /^a/m
      和字符串
      "a\nb"
      ,它会匹配第一行开头的
      a
      ,如果没有
      m
      标志,它只会匹配整个字符串开头的
      a
      ,对于这个例子就不会匹配
正则表达式提取

exec()用于获取字符串中符合正则表达式的内容

let str = "abc123bcc456cbdd"


let re = /\d+/ig


// '123'是本次匹配到的结果,index是匹配到的字符串在原始输入字符串中的起始索引位置是 3,input是原始字符串,group是组
console.log(re.exec(str)) // [ '123', index: 3, input: 'abc123bcc456cbdd', groups: undefined ]
console.log(re.exec(str)) // [ '456', index: 9, input: 'abc123bcc456cbdd', groups: undefined ]
// 每调用一次只会匹配一次,调用多次会依次向后匹配,为null说明匹配结束没有了
console.log(re.exec(str)) // null



// 使用()可以进行分组

str = "abcadcacc"

// 匹配a*c,使用()把中间的字符分组
re = /a([a-z])c/

// abc是本次匹配到的内容,b是分组内的内容
console.log(re.exec(str)) // [ 'abc', 'b', index: 0, input: 'abcadcacc', groups: undefined ]

字符串的正则方法
let str = "llaacqqabcccaccdd"
// 可以根据正则表达式来对一个字符串进行拆分

console.log(str.split(/a[a-z]c/)) // [ 'll', 'qq', 'cc', 'dd' ]

// 根据正则表达式替换字符串中的指定内容
console.log(str.replace(/a[a-z]c/g,"@")) // ll@qq@cc@dd


// 可以去搜索符合正则表达式的内容第一次在字符串中出现的位置
console.log(str.search(/a[a-z]c/)) // 2

// 根据正则表达式去匹配字符串中所有符合要求的内容
console.log(str.match(/a/g)) // [ 'a', 'a', 'a', 'a' ]


// 根据正则表达式去匹配字符串中符合要求的内容(必须设置g 全局匹配),返回的是一个迭代器
console.log(str.matchAll(/a/g)) // Object [RegExp String Iterator] {}


垃圾回收机制

  • 如果一个对象没有任何的变量对其进行引用,那么这个对象就是一个垃圾对象
  • 垃圾对象的存在,会严重的影响程序的性能
  • 在JS中有自动的垃圾回收机制,这些垃圾对象会被解释器自动回收,我们无需手动处理
  • 对于垃圾回收来说,我们唯一能做的事情就是将不再使用的变量设置为null
let obj = { value: 1 };
let anotherObj = obj;  // obj的引用次数增加到2
obj = null;            // obj的引用次数减少到1
anotherObj = null;     // obj的引用次数减少到0,可被垃圾收集器回收

Flink DataStream/API

未变的重要特性

虽然官宣建议弃用 JDK 8,使用JDK 11+;但:仍继续支持 JDK 8

个人猜测:JDK 8 的用户群实在太大,牵一发而动全身,防止步子扯太大,遏制自身项目的发展势头。

依赖模块的变化

版本变化

  • flink.version : 1.12.6 => 1.15.4
  • flink.connector.version : 1.12.6 => 1.15.4
  • flink.connector.cdc.version : 1.3.0 => 2.3.0
  • apache flink cdc 1.3.0
<dependency>
	<groupId>com.alibaba.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>1.3.0</version>
</dependency>
  • apache flink cdc 2.3.0
<dependency>
	<groupId>com.alibaba.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>2.3.0</version>
</dependency>
  • 详情参见:

各模块摆脱了 scala

详情参见:

https://github.com/apache/flink/blob/release-1.15.4/docs/content.zh/release-notes/flink-1.15.md
【推荐】
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/

  • org.apache.flink:flink-clients:${flink.version}

  • flink-streaming-java:

  • org.apache.flink:flink-table-api-java-bridge

org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}

  • org.apache.flink:flink-connector-kafka:${flink.version}

  • org.apache.flink:flink-runtime-web:${flink.version}

  • `org.apache.flink:flink-statebackend-rocksdb:${flink.version}``

  • org.apache.flink:flink-table-planner:${flink.version}

org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}

  • 从 Flink 1.15 开始,发行版包含两个规划器:
  • flink-table-planner_2.12-${flink.version}.jar
    : in /opt, 包含查询规划器
  • flink-table-planner-loader-${flink.version}.jar
    【推荐】 : 默认加载
    /lib
    ,包含隐藏在隔离类路径后面的查询计划器

注意:这2个规划器(planner_2)不能同时存在于类路径中。如果将它们都加载到
/lib
表作业
中,则会失败,报错
Could not instantiate the executor. Make sure a planner module is on the classpath

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:108)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:100)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
    at table.FlinkTableTest.main(FlinkTableTest.java:15)
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.

Ambiguous factory classes are:

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105)
    ... 4 more

Process finished with exit code 1
  • flink 1.14 版本以后,之前版本
    flink-table-*-blink-*
    转正。所以:
  • flink-table-planner-blink
    =>
    flink-table-planner
  • flink-table-runtime-blink
    =>
    flink-table-runtime

停止支持 scala 2.11,但支持 2.12

scala.version = 2.12
flinkversion = 1.15.4

  • org.apache.flink:flink-connector-hive_${scala.version}:${flink.version}

  • org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}

相比 flink 1.12.6 时:
org.apache.flink:flink-table-api-java-bridge_${scala.version=2.11}:${flink.version=1.12.6}

  • 若报下列错误,即:版本不同引起的包冲突。

NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/collect/Lists

原因: flink 1.16、1.15 、1.12.6 等版本使用的 flink-shaded-guava 版本基本不一样,且版本不兼容,需要修改 cdc 中的 flink-shaded-guava 版本。

  • 不同flink版本对应
    flink-shaded-guava
    模块的版本
  • flink 1.12.6 : flink-shaded-guava
    18.0-12.0
  • flink 1.15.4 : flink-shaded-guava
    30.1.1-jre-15.0
  • flink 1.16.0 : flink-shaded-guava
    30.1.1-jre-16.0

  • 如果工程内没有主动引入
    org.apache.flink:flink-shaded-guava
    工程,则无需关心此问题————
    flink-core
    /
    flink-runtime
    /
    flink-clients
    等模块内部会默认引入正确的版本

flink 1.15.4

flink 1.12.6

MySQL JDBC Version :
≥ 8.0.16
=>
≥8.0.27

  • 版本依据: Apache Flink CDC 官网

针对报错:
Caused by: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;

如果MySQL是8.0,fink cdc 2.1 之后由
debezium
连接器引起的问题。

  • 将依赖改为8.0.21之后:
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.32</version>
</dependency>

应用程序的源代码调整

KafkaRecordDeserializer : 不再存在/不再被支持(flink1.13.0及之后),并替换为
KafkaDeserializationSchema

KafkaSourceBuilder
创建本对象的语法稍有变化

  • org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
    | flink-connector-kafka_2.11 : 1.12.6

https://github.com/apache/flink/blob/release-1.12.7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializer.java

  • flink 1.13.0 : 不再存在/不再支持 KafkaRecordDeserializer

https://github.com/apache/flink/tree/release-1.13.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer

  • flink 14.0

https://github.com/apache/flink/tree/release-1.14.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer

  • flink 1.15.4

https://github.com/apache/flink/tree/release-1.15.4/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java

  • flink-connector-kafka : 3.0.0 | 了解即可,暂无需被此工程干扰上面思路

https://github.com/apache/flink-connector-kafka/blob/v3.0.0/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java

  • 改造原因、改造思路

在 Apache Flink 1.13.0起,
KafkaRecordDeserializer
已被弃用、并被移除。
如果你正在使用的是Flink的旧版本,并且你看到了
KafkaRecordDeserializer
的提示,你应该将其替换为使用
KafkaDeserializationSchema
【推荐】或
KafkaDeserializer

KafkaDeserializationSchema
相比
KafkaRecordDeserializer
,多了需要强制实现的2个方法:

  • boolean isEndOfStream(T var1)
    : 默认返回 false 即可
  • T deserialize(ConsumerRecord<byte[], byte[]> var1)
    : 老方法
    void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
    内部调用的即本方法
// flink 1.15.4
//org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema

package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    boolean isEndOfStream(T var1);

    T deserialize(ConsumerRecord<byte[], byte[]> var1) throws Exception;//方法1

    default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {//方法2
        T deserialized = this.deserialize(message);// 复用/调用的方法1
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }
}

故新适配新增的
T deserialize(ConsumerRecord<byte[], byte[]> var1)
方法是很容易的:

import com.xxx.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
//import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

//public class MyKafkaRecordDeserializer implements KafkaRecordDeserializer<Tuple2<String, String>> {
public class MyKafkaRecordDeserializer implements KafkaDeserializationSchema<Tuple2<String, String>> {
/*    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        KafkaDeserializationSchema.super.open(context);
    }*/

    @Override
    public boolean isEndOfStream(Tuple2<String, String> stringStringTuple2) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {//适配新方法1 | 强制
        if(consumerRecord.key() == null){
            return new Tuple2<>("null", StringUtils.bytesToHexString(consumerRecord.value()) );
        }
        return new Tuple2<>( new String(consumerRecord.key() ) , StringUtils.bytesToHexString(consumerRecord.value() ) );
    }

//    @Override
//    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws Exception {//适配老方法2 | 非强制
//        collector.collect(new Tuple2<>(consumerRecord.key() == null ? "null" : new String(consumerRecord.key()), StringUtils.bytesToHexString(consumerRecord.value())));
//    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

使用本类、创建本类对象的方式,也稍有变化:

// org.apache.flink.connector.kafka.source.KafkaSourceBuilder | flink-connector-kafka:1.15.4
KafkaSourceBuilder<Tuple2<String, String>> kafkaConsumerSourceBuilder = KafkaSource.<Tuple2<String, String>>builder()
	.setTopics(canTopic)
	.setProperties(kafkaConsumerProperties)
	.setClientIdPrefix(Constants.JOB_NAME + "#" + System.currentTimeMillis() + "")
	.setDeserializer( KafkaRecordDeserializationSchema.of(new MyKafkaRecordDeserializer()) ); // flink 1.15.4
	//.setDeserializer(new MyKafkaRecordDeserializer());// flink 1.12.6
  • 推荐文献
  • com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
    | flink cdc 1.3.0

https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java
包路径被调整、类名大小写有变化

https://github.com/apache/flink-cdc/blob/release-2.0.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java
com.ververica.cdc.connectors.mysql.MySqlSource
自 flink cdc 2.1.0 及之后
被建议弃用
、但
com.ververica.cdc.connectors.mysql.source.MySqlSource
被推荐可用
https://github.com/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java
Flink CDC这个MySqlSource弃用了,还有别的方式吗? - aliyun
【推荐】

有两个MysqlSource,一个是弃用的,另一个是可用的,包名不同。
com.ververica.cdc.connectors.mysql.source
这个包下的是可用的。

  • com.ververica.cdc.connectors.mysql.source.MySqlSource
    | flink cdc 2.3.0

https://github.com/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java

serverId : 如果选择新的
MySqlSource
类,则:其设置入参稍有变化
  • com.alibaba.ververica.cdc.connectors.mysql.MySQLSource#serverId()
    | flink cdc 1.3.0

https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java

  • com.ververica.cdc.connectors.mysql.source.MySqlSource
    | flink cdc 2.1.0 、 2.3.0 【被推荐使用】

https://github.com/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java

没有
serverId
方法
https://github.com/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java

serverId
方法,通过
MySqlSource.<String>builder()

MySqlSourceBuilder

/**
 * A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like
 * '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is
 * required when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all
 * currently-running database processes in the MySQL cluster. This connector joins the MySQL
 * cluster as another server (with this unique ID) so it can read the binlog. By default, a
 * random number is generated between 5400 and 6400, though we recommend setting an explicit
 * value."
 */
public MySqlSourceBuilder<T> serverId(String serverId) {
	this.configFactory.serverId(serverId);
	return this;
}
  • com.ververica.cdc.connectors.mysql.source.MySqlSource#serverId(int serverId)
    | flink cdc 2.1.0 【被建议弃用】、flink cdc 2.3.0 【被废止/无法用】

https://github.com/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java

/**
 * A numeric ID of this database client, which must be unique across all currently-running
 * database processes in the MySQL cluster. This connector joins the MySQL database cluster
 * as another server (with this unique ID) so it can read the binlog. By default, a random
 * number is generated between 5400 and 6400, though we recommend setting an explicit value.
 */
public Builder<T> serverId(int serverId) {
	this.serverId = serverId;
	return this;
}
  • 改造Demo: flink 1.3.0
SourceFunction<String> mySqlSource = 
	MySqlSource.<String>builder()
	//数据库地址
	.hostname(jobParameterTool.get("cdc.mysql.hostname"))
	//端口号
	.port(Integer.parseInt(jobParameterTool.get("cdc.mysql.port")))
	//用户名
	.username(jobParameterTool.get("cdc.mysql.username"))
	//密码
	.password(jobParameterTool.get("cdc.mysql.password"))
	//监控的数据库
	.databaseList(jobParameterTool.get("cdc.mysql.databaseList"))
	//监控的表名,格式数据库.表名
	.tableList(jobParameterTool.get("cdc.mysql.tableList"))
	//虚拟化方式
	.deserializer(new MySQLCdcMessageDeserializationSchema())
	//时区
	.serverTimeZone("UTC")
	.serverId( randomServerId(5000, Constants.JOB_NAME + "#xxxConfig") )
	.startupOptions(StartupOptions.latest())
	.build();


public static Integer randomServerId(int interval, String jobCdcConfigDescription){
	//startServerId ∈[ interval + 0, interval + interval)
	//int serverId = RANDOM.nextInt(interval) + interval; // RANDOM.nextInt(n) : 生成介于 [0,n) 区间的随机整数
	//serverId = [ 7000 + 0, Integer.MAX_VALUE - interval)
	int serverId = RANDOM.nextInt(Integer.MAX_VALUE - interval - 7000) + 7000;
	log.info("Success to generate random server id result! serverId : {}, interval : {}, jobCdcConfigDescription : {}"
			, serverId , interval , jobCdcConfigDescription );
	return serverId;
}
  • 改造Demo: flink 2.3.0
MySqlSource<String> mySqlSource = 
	MySqlSource.<String>builder()
	//数据库地址
	.hostname(jobParameterTool.get("cdc.mysql.hostname"))
	//端口号
	.port(Integer.parseInt(jobParameterTool.get("cdc.mysql.port")))
	//用户名
	.username(jobParameterTool.get("cdc.mysql.username"))
	//密码
	.password(jobParameterTool.get("cdc.mysql.password"))
	//监控的数据库
	.databaseList(jobParameterTool.get("cdc.mysql.databaseList"))
	//监控的表名,格式数据库.表名
	.tableList(jobParameterTool.get("cdc.mysql.tableList"))
	//虚拟化方式
	.deserializer(new MySQLCdcMessageDeserializationSchema())
	//时区
	.serverTimeZone("UTC")
	.serverId( randomServerIdRange(5000, Constants.JOB_NAME + "#xxxConfig") )
	.startupOptions(StartupOptions.latest())
	.build();


//新增强制要求: interval >= 本算子的并行度
public static String randomServerIdRange(int interval, String jobCdcConfigDescription){
	// 生成1个起始随机数 |
	//startServerId = [interval + 0, interval + interval )
	//int startServerId = RANDOM.nextInt(interval) + interval; // RANDOM.nextInt(n) : 生成介于 [0,n) 区间的随机整数
	//startServerId = [ 7000 + 0, Integer.MAX_VALUE - interval)
	int startServerId = RANDOM.nextInt(Integer.MAX_VALUE - interval - 7000) + 7000;

	//endServerId ∈ [startServerId, startServerId + interval];
	int endServerId = startServerId + interval;
	log.info("Success to generate random server id result! startServerId : {},endServerId : {}, interval : {}, jobCdcConfigDescription : {}"
			, startServerId, endServerId , interval , jobCdcConfigDescription );
	return String.format("%d-%d", startServerId, endServerId);
}
MySQLSourceBuilder#build 方法: 返回类型存在变化:
SourceFunction/DebeziumSourceFunction<T>
=>
MySqlSource<T>
  • org.apache.flink.streaming.api.functions.source.SourceFunction
    =>
    com.ververica.cdc.connectors.mysql.source.MySqlSource
//com.alibaba.ververica.cdc.connectors.mysql.MySQLSource.Builder#build | flink cdc 1.3.0
// 返回: com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
// public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T>
//public abstract class org.apache.flink.streaming.api.functions.source.RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT>
public DebeziumSourceFunction<T> build() {
	Properties props = new Properties();
	props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
	props.setProperty("database.server.name", "mysql_binlog_source");
	props.setProperty("database.hostname", (String)Preconditions.checkNotNull(this.hostname));
	props.setProperty("database.user", (String)Preconditions.checkNotNull(this.username));
	props.setProperty("database.password", (String)Preconditions.checkNotNull(this.password));
	props.setProperty("database.port", String.valueOf(this.port));
	props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
	if (this.serverId != null) {
		props.setProperty("database.server.id", String.valueOf(this.serverId));
	}
	...
}


//com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder#build | flink cdc 2.3.0
//// 返回: 
public MySqlSource<T> build() {
	return new MySqlSource(this.configFactory, (DebeziumDeserializationSchema)Preconditions.checkNotNull(this.deserializer));
}
  • 使用变化Demo: Flink cdc 1.3.0

mysqlSource 想要监听 mysql 表结构变更(例如:添加新的字段),要怎么办?设置 - aliyun

Properties properties = new Properties();
properties.setProperty("database.hostname", "localhost");
properties.setProperty("database.port", "3306");
properties.setProperty("database.user", "your_username");
properties.setProperty("database.password", "your_password");
properties.setProperty("database.server.id", "1"); // 设置唯一的 server id
properties.setProperty("database.server.name", "mysql_source");

DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .username("your_username")
    .password("your_password")
    .databaseList("your_database")
    .tableList("your_table")
    .includeSchemaChanges(true) // 开启监听表结构变更
    .deserializer(new StringDebeziumDeserializationSchema())
    .build();

DataStreamSource<String> stream = env.addSource(sourceFunction);//可以使用 addSource

stream.print();
env.execute("MySQL CDC Job");
  • 使用变化Demo: Flink cdc 2.3.0

https://flink-tpc-ds.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc(ZH).html
无法使用
env.addSource(SourceFunction, String sourceName)
,只能使用
env.fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
        .tableList("yourDatabaseName.yourTableName") // 设置捕获的表
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 设置 3s 的 checkpoint 间隔
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // 设置 source 节点的并行度为 4
      .setParallelism(4)
      .print().setParallelism(1); // 设置 sink 节点并行度为 1 

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

StartupOptions : 包路径被调整(2.0.0及之后)

  • import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions
    | flink 1.3.0

https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/StartupOptions.java

  • com.ververica.cdc.connectors.mysql.table.StartupOptions
    | flink 2.3.0

https://github.com/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupOptions.java

  • com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema
    | flink cdc 1.3.0

com.ververica:flink-connector-debezium:1.3.0
https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumDeserializationSchema.java

  • com.ververica.cdc.debezium.DebeziumDeserializationSchema
    | flink cdc 2.3.0

com.ververica:flink-connector-debezium:2.3.0
https://github.com/apache/flink-cdc/blob/release-2.3.0/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumDeserializationSchema.java

X 参考文献

  • com.alibaba.ververica:flink-connector-mysql-cdc:1.3.0

https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/pom.xml
【推荐】 Flink 1.12.6

  • com.ververica:flink-connector-mysql-cdc:2.0

MYSQL (Database: 5.7, 8.0.x / JDBC Driver: 8.0.16 ) | Flink 1.12 + | JDK 8+
https://github.com/apache/flink-cdc/tree/release-2.0
https://github.com/apache/flink-cdc/blob/release-2.0/flink-connector-mysql-cdc/pom.xml

  • com.ververica:flink-connector-mysql-cdc:2.3.0

https://github.com/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/pom.xml
【推荐】 Flink 1.15.4

  • org.apache.flink:flink-connector-mysql-cdc:${flink.cdc.version}
  • apache flink
  • apache flink-connector-kafka

本文分享自华为云社区
《【华为云MySQL技术专栏】MySQL 8.0 EXPLAIN ANALYZE 工具介绍》
,作者:GaussDB 数据库。

1. EXPLAIN ANALYZE可以解决什么问题

MySQL 8.0.18 版本开始支持查询分析工具EXPLAIN ANALYZE,该工具不仅会实际执行SQL语句,还会展示SQL语句详细的执行信息,包含执行算子(Iterator)粒度的扫描行数、执行耗时、迭代次数等信息。

EXPLAIN ANALYZE工具是MySQL EXPLAIN FORMAT=TREE 功能的扩展,除了展示执行计划和代价估算之外,还提供了细粒度执行算子的耗时等信息。这使得DBA和开发人员能够基于代价估算和算子实际执行耗时信息,判断执行计划是否合理,并识别出后续的优化点。

2. EXPLAIN ANALYZE如何使用

以TPC-H基准测试中的Q14 查询为例,该SQL为两个表的连接及GROUP BY聚合操作,用于统计发货日志在1996年1月的促销商品收入占比 。

select
	100.00 * sum(case
		when p_type like 'PROMO%'
			then l_extendedprice * (1 - l_discount)
		else 0
	end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from
	lineitem,
	part
where
	l_partkey = p_partkey
	and l_shipdate >= '1996-01-01'
	and l_shipdate < date_add( '1996-01-01', interval '1' month);

通过EXPLAIN FORMAT=TREE
语句,可以看出执行计划和代价估算信息:

-> Aggregate: sum((lineitem.L_EXTENDEDPRICE * (1 - lineitem.L_DISCOUNT))), sum((case when (part.P_TYPE like 'PROMO%') then (lineitem.L_EXTENDEDPRICE * (1 - lineitem.L_DISCOUNT)) else 0 end))
  -> Nested loop inner join (cost=83997.65 rows=66041)
      -> Filter: ((lineitem.L_SHIPDATE >= DATE'1996-01-01') and (lineitem.L_SHIPDATE < <cache>(('1996-01-01' + interval '1' month)))) (cost=60883.30 rows=66041)
          -> Table scan on lineitem (cost=60883.30 rows=594488)
      -> Single-row index lookup on part using PRIMARY (P_PARTKEY=lineitem.L_PARTKEY) (cost=0.25 rows=1)

通过EXPLAIN ANALYZE
语句,可以看出每个算子详细的执行信息,如下:

-> Aggregate: sum((lineitem.L_EXTENDEDPRICE * (1 - lineitem.L_DISCOUNT))), sum((case when (part.P_TYPE like 'PROMO%') then (lineitem.L_EXTENDEDPRICE * (1 - lineitem.L_DISCOUNT)) else 0 end)) (actual time=203.753..203.753 rows=1 loops=1)
  -> Nested loop inner join (cost=83997.65 rows=66041) (actual time=0.056..200.386 rows=7884 loops=1)
      -> Filter: ((lineitem.L_SHIPDATE >= DATE'1996-01-01') and (lineitem.L_SHIPDATE < <cache>(('1996-01-01' + interval '1' month)))) (cost=60883.30 rows=66041) (actual time=0.042..183.957 rows=7884 loops=1)
          -> Table scan on lineitem (cost=60883.30 rows=594488) (actual time=0.039..140.993 rows=600572 loops=1)
      -> Single-row index lookup on part using PRIMARY (P_PARTKEY=lineitem.L_PARTKEY) (cost=0.25 rows=1) (actual time=0.002..0.002 rows=1 loops=7884)

相比EXPLAIN FORMAT=TREE
,EXPLAIN ANALYZE会
实际执行SQL语句,并统计每个算子的详细耗时信息,每个算子额外提供如下信息:

(actual time=m_start..m_end rows=m_rows loops=m_loops)
  • m_start: 该算子返回第一行数据的实际时间(毫秒)

  • m_end: 该算子返回所有数据的实际时间(毫秒)

  • m_rows: 该算子实际的返回行数

  • m_loops: 该算子实际的迭代次数

例如,Filter
算子过滤lineitem
表的L_SHIPDATE
字段在 ['1996-01-01', '1996-02-01') 区间的数据。

Filter: ((lineitem.L_SHIPDATE >= DATE'1996-01-01') and (lineitem.L_SHIPDATE < <cache>(('1996-01-01' + interval '1' month))))
(cost=60883.30 rows=66041)
(actual time=0.042..183.957 rows=7884 loops=1)

优化器基于统计信息估算出的代价为 60883.30,预测返回行数为 66041;然而,实际执行后发现,真实的返回行数为7884。其中,Filter算子过滤掉了592688行 (600572 - 7884)。迭代次数为1(对应于Nested Loop Join中外表的扫描次数),返回给上层算子(Nested loop inner join)第一行数据的时间为 0.
042 毫秒
,返回给上层算子所有数据的时间为 183.
957 毫秒

例如,点查算子Single-
row
index
lookup
on
part
using
PRIMARY,作为Nested loop inner join的内表,通过条件part.p_partkey = lineitem.l_partkey循环获取满足条件的行。

Single-row index lookup on part using PRIMARY (P_PARTKEY=lineitem.L_PARTKEY)
(cost=0.25 rows=1)
(actual time=0.002..0.002 rows=1 loops=7884)

优化器估算出的代价为0.25,预测返回行数为 1;然而,实际执行后发现,真实的返回行数为1,但迭代次数为7884,与外表FILTER
算子执行后的结果数据量相等,每次迭代只返回上层算子1行。因此,返回给上层算子(Nested loop inner join)第一行数据的时间和所有数据的时间相等,都是0.002毫秒,可以推算出内表点查的累计耗时为 15.768 毫秒(7884 * 0.002毫秒)。

基于以上分析,我们可以看出该SQL语句执行耗时约200 毫秒,lineitem表的全表扫描耗时约140 毫秒,Filter算子耗时约40 毫秒,part表循环点查约16 毫秒。

3. EXPLAIN ANALYZE源码实现

MySQL 8.0 使用火山执行引擎,火山模型是数据库系统中广泛使用的迭代模型。SQL语句经过查询解析生成抽象语法树(AST),然后经过查询优化,最终生成执行树,执行树的每个节点对应一个执行算子(Iterator)。每个算子提供了Init,Read,End接口,每个算子从子节点获取数据,执行该算子的相关工作,并返回结果给父节点。

以MySQL 8.0.22版本为例,它提供了37个执行算子来处理数据读取、多表连接、聚合操作、数据物化等多个操作场景,每个执行算子都继承自一个基类RowIterator。

例如, TableScanIterator(处理全表扫描)和 NestedLoopIterator(处理2表连接)的类图如图1所示:

图1 TableScanIterator 和 NestedLoopIterator 类图

EXPLAIN ANALYZE 工具的作用是展示SQL语句的执行计划以及详细记录各个算子的执行耗时。在MySQL 8.0中,这一功能的实现是通过新增一个接口模板类TimingIterator,将37个执行算子封装起来,以便统计各个执行算子的详细执行耗时信息。这样做的好处是实现简单,无需对所有算子单独适配,而且不影响非EXPLAIN ANALYZE语句的执行效率。

例如,全表扫描算子TableScanIterator 对应TimingIterator<TableScanIterator>,表连接算子  NestedLoopIterator 对应 TimingIterator<NestedLoopIterator>,其类图如图2所示:

图2 TimingIterator<TableScanIterator> 和 TimingIterator<NestedLoopIterator> 类图

3.1 执行树生成

数据库优化器在确定了最优的访问路径(AccessPath)之后,会通过函数 CreateIteratorFromAccessPath 生成执行树,该函数会依据算子类型,调用NewIterator函数生成对应的算子。

如果是普通DQL(SELECT)语句,则生成对应的算子;如果是 EXPLAIN ANALYZE语句,则生成一个 TimingIterator<RealIterator>Wapper对象,其真实执行算子被保存在 TimingIterator::m_iterator 中。

例如,EXPLAIN ANALYZE语句,TableScanIterator 会生成TimingIterator<TableScanIterator> 算子,NestedLoopIterator 会生成 TimingIterator<NestedLoopIterator> 算子,执行流程如图3所示。

图3 执行树生成流程

3.2 统计算子执行耗时

TimingIterator 模板类的主体实现如下表所示,执行的统计信息记录在几个私有成员变量中。

template <class RealIterator>
class TimingIterator final : public RowIterator {
 public:
  bool Init() override;
  int Read() override;
  std::string TimingString() const override; // 打印函数,输出算子执行时间信息

 private:
  uint64_t m_num_rows = 0; // 该算子累计处理的记录数
  uint64_t m_num_init_calls = 0; // 调用 Init 函数的次数
  // 返回第一行的执行时间
  steady_clock::time_point::duration m_time_spent_in_first_row{0}; 
  // 返回所有行的执行时间
  steady_clock::time_point::duration m_time_spent_in_other_rows{0};
  bool m_first_row; // 是否为第一行数据
  RealIterator m_iterator; // 真实的执行算子
};

在SQL语句实际执行过程中,通过 Init 和 Read 函数的调度来记录详细执行信息,具体实现如下:

template <class RealIterator>
bool TimingIterator<RealIterator>::Init() {
  ++m_num_init_calls;  // Init 函数的调用次数递增
  steady_clock::time_point start = now();
  bool err = m_iterator.Init(); // 调用真实执行算子的Init函数
  steady_clock::time_point end = now();
  m_time_spent_in_first_row += end - start; // 累计获取第一行数据的时间
  m_first_row = true;
  return err;
}

template <class RealIterator>
int TimingIterator<RealIterator>::Read() {
  steady_clock::time_point start = now();
  int err = m_iterator.Read(); // 调用真实执行算子的Read 函数
  steady_clock::time_point end = now();
  if (m_first_row) {
    m_time_spent_in_first_row += end - start; // 更新获取第一行数据的时间
    m_first_row = false; // 获取第一行数据结束
  } else {
    m_time_spent_in_other_rows += end - start; // 更新获取所有行数据的时间
  }
  if (err == 0) {
    ++m_num_rows; // 刷新该算子累计处理的记录数
  }
  return err;
}

3.3 打印算子执行耗时

SQL语句执行结束后,会调用函数  TimingIterator<RealIterator>::TimingString 打印算子执行耗时信息,调用堆栈信息如下表所示:

dispatch_command
  mysql_parse
    mysql_execute_command
      Sql_cmd_dml::execute
        Sql_cmd_dml::execute_inner
          explain_query
            ExplainIterator
              PrintQueryPlan
                ExplainAccessPath
                  TimingIterator<RealIterator>::TimingString

TimingIterator<RealIterator>::TimingString 函数,会基于执行阶段的统计打印以下信息:

  • 该算子返回第一行数据的实际时间(毫秒)

  • 该算子返回所有数据的实际时间(毫秒)

  • 该算子实际的返回行数

  • 该算子实际的迭代次数

4. 总结

综上,我们分析了MySQL 8.0 EXPLAIN ANALYZE命令的使用,并结合源码介绍其实现思路,帮助数据库使用者和开发者更好的使用、理解该功能。

当遇到慢查询时,我们也可借助于EXPLAIN ANALYZE工具观察执行计划是否合理、分析SQL执行的主要耗时点,进而去优化SQL执行。

参考资料

https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-18.html

https://dev.mysql.com/worklog/task/?id=4168

https://dev.mysql.com/doc/refman/8.0/en/using-explain.html


华为开发者空间,汇聚鸿蒙、昇腾、鲲鹏、GaussDB、欧拉等各项根技术的开发资源及工具,致力于为每位开发者提供一台云主机、一套开发工具及云上存储空间,让开发者基于华为根生态创新。
点击链接,免费领取您的专属云主机~

点击关注,第一时间了解华为云新鲜技术~

前言

本文将介绍一个专为ASP.NET Core设计的轻量级插件框架——PluginCore,该框架不仅能够简化插件的开发与集成,还能大幅提高开发效率。

另外,还将简要介绍相关的前端技术和SDK支持,帮助我们快速上手。

项目介绍

PluginCore是一个轻量级插件框架,通过最小化的配置简化插件的集成与管理,能够快速上手并专注于核心业务逻辑的开发。

该框架支持动态WebAPI、插件隔离与共享、前后端分离以及热插拔等特性,非常适合需要高度模块化与可扩展性的应用场景。

项目特点

  • 简单易用:遵循“约定优于配置”的原则,最大限度减少配置需求,让您专注于核心业务逻辑。
  • 开箱即用:前端与后端自动集成,只需几行代码即可完成整个集成流程。
  • 动态WebAPI:每个插件都可以添加新的Controller,拥有独立的路由配置。
  • 插件隔离与共享:提供完善的插件隔离机制,并支持类型共享。
  • 前后端分离:允许在插件的wwwroot文件夹中放置前端资源文件,直接通过插件ID访问。
  • 热插拔:支持在不停机的情况下上传、安装、启用、禁用、卸载和删除插件;甚至可以在运行时动态添加HTTP请求中间件。
  • 依赖注入:在实现IPlugin接口的插件类构造函数中支持DI。
  • 模块化:所有过程均模块化处理,并全面支持DI,便于替换和自定义插件机制。
  • 易扩展:支持编写插件SDK和扩展插件,提供自定义插件钩子。
  • 插件依赖树:声明式的依赖关系,自动根据依赖关系确定加载顺序。
  • 生命周期管理:可控的插件生命周期,包括事件分发机制。
  • 前端挂件:可在前端定义扩展点,并通过插件注入挂件,支持HTML/CSS/JavaScript。
  • 无数据库依赖:完全不需要数据库支持。
  • 零侵入性:对现有系统几乎没有侵入性。
  • 极少外部依赖:除用于解压缩的SharpZipLib之外,无其他第三方依赖。

项目技术

  • 后端: .NET Standard, .NET Core, .NET, ASP.NET Core
  • 前端: Vue.js, vue-i18n, Vue Router, Vuex, Element UI
  • 前端工具: Babel, Mock.js, SASS, Autoprefixer, ESLint, Axios, NPM

项目使用

项目结构

一分钟集成

推荐使用NuGet集成,在项目的根目录执行以下命令。

如果使用的是Visual Studio,可以通过"工具"->"NuGet包管理器"->"包管理控制台"来执行安装命令:

PM> Install-Package PluginCore.AspNetCore

在ASP.NET Core项目中集成

修改Startup.cs文件,添加以下代码:

usingPluginCore.AspNetCore.Extensions;public classStartup
{
public voidConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddPluginCore();
//1. 添加 PluginCore }public voidConfigure(IApplicationBuilder app, IWebHostEnvironment env)
{
if(env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

app.UseHttpsRedirection();
app.UseRouting();

app.UsePluginCore();
//2. 使用 PluginCore app.UseAuthorization();
app.UseEndpoints(endpoints
=>{ endpoints.MapControllers(); });
}
}

完成后,访问https://localhost:5001/PluginCore/Admin即可进入PluginCore管理界面。(请将URL替换为您实际的地址)

注意

请登录PluginCore管理界面后,及时更改默认的用户名和密码:

{"Admin": {"UserName": "admin","Password": "ABC12345"},"FrontendMode": "LocalEmbedded","RemoteFrontend": "https://cdn.jsdelivr.net/gh/yiyungent/plugincore-admin-frontend@0.1.2/dist-cdn"}

更改后立即生效,无需重启站点,但需要重新登录PluginCore管理界面。

Docker体验

如果希望通过Docker体验PluginCore,可以使用以下命令:

docker run -d -p 5004:80 -e ASPNETCORE_URLS="http://*:80" --name plugincore-aspnetcore3-1 yiyungent/plugincore-aspnetcore3-1

访问 https://localhost:5001/PluginCore/Admin 进入 PluginCore 管理界面(注意将端口替换为你的实际端口)

项目效果

项目应用实例

1、yiyungent/KnifeHub

【PluginCore.AspNetCore 最佳实践】工具平台,涵盖日常生活、学习、工作及开发所需的各类工具集。

https://github.com/yiyungent/KnifeHub

2、yiyungent/Dragonfly

利用ASP.NET Core与Selenium实现的Web自动化解决方案。

https://github.com/yiyungent/Dragonfly

项目地址

GitHub:
https://github.com/yiyungent/PluginCore

在线文档:
https://yiyungent.github.io/PluginCore/zh

最后

如果你觉得这篇文章对你有帮助,不妨点个赞支持一下!你的支持是我继续分享知识的动力。如果有任何疑问或需要进一步的帮助,欢迎随时留言。

也可以加入微信公众号
[DotNet技术匠]
社区,与其他热爱技术的同行一起交流心得,共同成长!
优秀是一种习惯,欢迎大家留言学习!