2023年3月

Transformer

Transformer是完全由Attention和Self-Attention结构搭建的深度神经网络结构。

其中最为重要的就是Attention和Self-Attention结构。

Attention结构

Attention Layer接收两个输入
\(X = [x_1, x_2, x_3, ..., x_m]\)
,Decoder的输入为
\(X' = [x_1^{'}, x_2^{'}, x_3^{'}, ...,x_t^{'}]\)
,得到一个输出
\(C = [c_1, c_2, c_3, ..., c_t]\)
,包含三个参数:
\(W_Q, W_K, W_V\)

具体的计算流程为:

  1. 首先,使用Encoder的输入来计算Key和Value向量,得到m个k向量和v向量:
    \(k_{:i} = W_Kx_{:i}, v_{:i} = W_vx_{:i}\)
  2. 然后,对Decoder的输入做线性变换,得到t个q向量:
    \(q_{:j} = W_Qx_{:j}^{'}\)
  3. 计算权重:
    \(\alpha_{:1} = Softmax(K^Tq_{:1})\)
  4. 计算Context vector:
    \(c_{:1} = \alpha_{11}v_{:1} + \alpha_{21}v_{:2} + ...\alpha_{m1}v_{:m} = V\alpha_{:1} = VSoftmax(K^Tq_{:1})\)
  5. 用相同的方式计算
    \(c_2, c_3, ..., c_t\)
    ,得到
    \(C = [c_1, c_2, ..., c_t]\)

Key:表示待匹配的值,Query表示查找值,这m个
\(\alpha_{:j}\)
就说明是query(
\(q_j\)
)和所有key(
\([k_{:1}, k_{:2}, ..., k_{:m}]\)
)之间的匹配程度。匹配程度越高,权重越大。V是对输入的一个线性变化,使用权重对其进行加权平均得到相关矩阵
\(C\)
。在Attention+RNN的结构中,是对输入状态进行加权平均,这里
\(V\)
相当于对
\([h_1, h_2, ..., h_m]\)
进行线性变换。

Self-Attention结构

Attention结构接收两个输入得到一个输出,Self-Attention结构接收一个输入得到一个输出,如下图所示。中间的计算过程与Attention完全一致。

Multi-head Self-Attention

上述的Self-Attention结构被称为单头Self-Attention(Single-Head Self-Attention)结构,Multi-Head Self-Attention就是将多个Single-Head Self-Attention的结构进行堆叠,结果Concatenate到一块儿。

假如有
\(l\)
个Single-Head Self-Attention组成一个Multi-Head Self-Attention,Single-Head Self-Attention的输入为
\(X = [x_{:1}, x_{:2}, x_{:3}, ..., x_{:m}]\)
,输出为
\(C = [c_{:1}, c_{:2}, c_{:3}, ..., c_{:m}]\)
维度为
\(dm\)

则,Multi-Head Self-Attention的输出维度为
\((ld)*m\)
,参数量为
\(l\)

\(W_Q, W_K, W_V\)

\(3l\)
个参数矩阵。

Multi-Head Attention操作一致,就是进行多次相同的操作,将结果Concatenate到一块儿。

BERT:Bidirectional Encoder Representations from Transformers

BERT的提出是为了预训练Transformer的Encoder网络【BERT[4] is for pre-training Transformer's[3] encoder.】,通过两个任务(1)预测被遮挡的单词(2)预测下一个句子,这两个任务不需要人工标注数据,从而达到使用海量数据训练超级大模型的目的。

BERT有两种任务:

  • Task 1: Predict the masked word,预测被遮挡的单词

输入:the [MASK] sat on the mat

groundTruth:cat

损失函数:交叉熵损失

  • Task 2: Predict the next sentence,预测下一个句子,判断两句话在文中是否真实相邻

输入:[CLS, first sentence, SEP, second sentence]

输出:true or false

损失函数:交叉熵损失

这样做二分类可以让Encoder学习并强化句子之间的相关性。

好处:

  • BERT does not need manually labeled data. (Nice, Manual labeling is expensive.)
  • Use large-scale data, e.g., English Wikipedia (2.5 billion words)
  • task 1: Randomly mask works(with some tricks)
  • task 2: 50% of the next sentences are real. (the other 50% are fake.)
  • BERT将上述两个任务结合起来预训练Transformer模型
  • 想法简单且非常有效

消耗极大【普通人玩不起,但是BERT训练出来的模型参数是公开的,可以拿来使用】:

  • BERT Base
    • 110M parameters
    • 16 TPUs, 4 days of training
  • BERT Large
    • 235M parameters
    • 64 TPUs, 4days of training

Summary

Transformer:

  • Transformer is a Seq2Seq model, it has an encoder and a decoder
  • Transformer model is not RNN
  • Transfomer is purely based on attention and dense layers(全连接层)
  • Transformer outperforms all the state-of-the-art RNN models

Attention的发展:

  • Attention was originally developed for Seq2Seq RNN models[1].
  • Self-Attention: attention for all the RNN models(not necessarily for Seq2Seq models)[2].
  • Attention can be used without RNN[3].

Reference

王树森的Transformer模型
[1] Bahdanau, Cho, & Bengio,
Neural machine translation by jointly learning to align and translate
. In
ICLR
, 2015.
[2] Cheng, Dong, & Lapata. Long Short-Term Memory-Networks for Machine Reading. In
EMNLP
, 2016.
[3] Vaswani et al.
Attention Is All You Need
. In
NIPS
, 2017.
[4] Devlin, Chang, Lee, and Toutanova. BERT: Pre-training of deep bidirectional transformers for language understanding. In
ACL
, 2019.

Uniapp 和 Vue 在路由方面有相似之处,因为 Uniapp 是基于 Vue 的。Uniapp 的路由系统是通过 Vue Router 实现的,因此两者有许多相同的概念和 API。

相同点:

  • 都支持基于 URL 的路由;
  • 都可以使用嵌套路由来管理多个页面之间的关系;
  • 都支持路由参数和查询参数的传递;
  • 都支持路由守卫(beforeEach、beforeEnter、beforeLeave)等功能。

不同点:

  • Vue Router 是针对 Web 应用开发的路由库,而 Uniapp 是为多个平台(包括小程序和 H5)开发的跨端框架。因此,Uniapp 的路由系统需要考虑不同平台的差异,例如小程序中没有浏览器的历史记录和前进后退功能;
  • 在使用 Uniapp 的时候,不需要手动配置路由,而是通过编写 pages.json 文件来配置页面路径和样式等信息。这一点与 Vue Router 的使用方式略有不同;
  • Uniapp 的路由跳转方法有些不同,例如 navigateTo、redirectTo、switchTab、reLaunch 等方法,这些方法都是针对不同场景的不同跳转方式。
  • 因此,Uniapp 和 Vue 在路由方面有相似之处,但是由于 Uniapp 的跨端特性和不同平台的差异,它们在具体实现和使用方式上也有些不同。


一. Uniapp路由的使用方式:
Uniapp 的路由使用方法分为两部分,一是在 pages.json 文件中配置页面路径和样式等信息,二是通过 uni.navigateTo、uni.redirectTo、uni.switchTab、uni.reLaunch 等方法进行页面跳转。下面详细介绍一下这两部分的内容。

配置页面路径和样式
在 Uniapp 中,我们可以在 pages.json 文件中配置页面路径和样式等信息。这个文件默认位于项目的根目录下,它是一个 JSON 格式的配置文件,用于描述所有页面的路径、样式、配置等信息。例如:

{
  "pages": [
    {
      "path": "pages/index/index",
      "style": {
        "navigationBarTitleText": "首页"
      }
    },
    {
      "path": "pages/detail/detail",
      "style": {
        "navigationBarTitleText": "详情页"
      }
    }
  ],
  "tabBar": {
    "list": [
      {
        "pagePath": "pages/index/index",
        "text": "首页",
        "iconPath": "static/tab-bar/home.png",
        "selectedIconPath": "static/tab-bar/home-selected.png"
      },
      {
        "pagePath": "pages/mine/mine",
        "text": "我的",
        "iconPath": "static/tab-bar/mine.png",
        "selectedIconPath": "static/tab-bar/mine-selected.png"
      }
    ]
  }
}

上面的代码片段中,我们定义了两个页面,分别是首页和详情页,它们的路径分别是 pages/index/index 和 pages/detail/detail,同时还定义了这两个页面的导航栏标题。另外,我们还定义了一个名为 tabBar 的对象,表示底部导航栏的样式和配置信息。这个对象中包含一个名为 list 的数组,数组中的每个对象表示一个底部导航栏项。

跳转页面
在 Uniapp 中,我们可以通过 uni.navigateTo、uni.redirectTo、uni.switchTab、uni.reLaunch 等方法进行页面跳转。这些方法都是基于页面路径进行跳转的,它们的区别在于跳转的方式和效果不同。例如:

  • uni.navigateTo:保留当前页面,跳转到应用内的某个页面。新打开的页面可以通过 uni.navigateBack 方法返回到原来的页面。
  • uni.redirectTo:关闭当前页面,跳转到应用内的某个页面。新打开的页面不可以通过 uni.navigateBack 方法返回到原来的页面。
  • uni.switchTab:关闭所有页面,跳转到应用内的某个页面。新打开的页面是底部导航栏中的一个页面。
  • uni.reLaunch:关闭所有页面,打开应用内的某个页面。

例如,以下代码示例演示如何通过 uni.navigateTo 方法跳转到详情页:

// 在 index 页面中跳转到详情页
uni.navigateTo({
  url: '/pages/detail/detail'
})

上面的代码片段中,我们通过 uni.navigateTo 方法跳转到了详情页,其中 url 参数表示要跳转的页面路径。这个路径应该是 pages.json 文件中定义的页面路径,可以是相对路径或绝对路径。

另外,如果要在跳转时传递参数,可以在路径后面加上查询字符串,例如:

// 在 index 页面中跳转到详情页,并传递参数
uni.navigateTo({
  url: '/pages/detail/detail?id=123'
})

在详情页中可以通过 this.$route.query.id 获取参数值。
除了以上方法之外,还可以使用 uni.navigateBack 方法返回上一个页面。例如:

// 在详情页中返回上一个页面
uni.navigateBack()

上面的代码片段中,我们调用了
uni.navigateBack
方法返回上一个页面。

二. vue中路由的使用方法

1. 安装Vue Router
在使用Vue.js的路由功能之前,需要先安装Vue Router。可以使用npm或者yarn安装:

npm install vue-router
# 或者
yarn add vue-router

2. 创建路由
在Vue.js中,我们需要先创建一个路由实例,然后定义路由映射关系。路由实例可以通过Vue Router提供的Vue.use()方法进行创建:

import Vue from 'vue';
import VueRouter from 'vue-router';
import Home from './views/Home.vue';
import About from './views/About.vue';

Vue.use(VueRouter);

const router = new VueRouter({
  mode: 'history',
  routes: [
    {
      path: '/',
      name: 'home',
      component: Home
    },
    {
      path: '/about',
      name: 'about',
      component: About
    }
  ]
});

export default router;

在上述代码中,我们首先引入了Vue.js和Vue Router模块,并使用Vue.use()方法注册Vue Router。接着,我们创建了一个路由实例,并定义了两个路由映射关系。其中,路由映射关系包括路由路径(path)、路由名称(name)和路由组件(component)。

3. 添加路由实例
在创建了路由实例后,我们需要将其添加到Vue.js的根实例中,以便在整个应用程序中使用路由功能。可以在main.js文件中进行添加:

import Vue from 'vue';
import App from './App.vue';
import router from './router';

new Vue({
  router,
  render: h => h(App),
}).$mount('#app');

在上述代码中,我们首先引入了Vue.js的根组件(App.vue)和路由实例(router),然后将路由实例作为根实例的一个选项进行添加。

4. 创建路由组件
在定义路由映射关系时,需要指定路由组件。路由组件是一个普通的Vue.js组件,用于渲染对应的路由页面。可以在components文件夹下创建路由组件,例如:

<template>
  <div>
    <h1>About Page</h1>
  </div>
</template>

<script>
export default {
  name: 'About'
}
</script>

在上述代码中,我们创建了一个名为About的组件,用于渲染关于页面。

5. 使用路由
在上述步骤完成后,就可以在Vue.js应用程序中使用路由了。可以在Vue.js组件中使用$route和$router来访问当前路由信息和导航功能,例如:

<template>
  <div>
    <h1>{{ $route.name
}}</h1>
<p>Welcome to {{ $route.name }} page</p>
<router-link to="/">Home</router-link>
<router-link to="/about">About</router-link>

  </div>
</template>
<script>
export default {
  name: 'App'
}
</script>

在上述代码中,我们使用$route.name访问当前路由名称,并在页面中显示。同时,使用<router-
link>标签实现页面之间的导航。to属性用于指定目标路由路径。 除了使用<router-
link>标签进行导航,还可以使用$router.push()方法进行编程式导航。例如:

this.$router.push('/about');

上述代码用于在当前页面中进行路由跳转,进入关于页面。

6. 使用路由守卫
Vue Router还提供了路由守卫功能,用于在导航过程中进行控制和过滤。可以使用beforeEach()方法实现全局路由守卫,例如:

router.beforeEach((to, from, next) => {
  if (to.name === 'about' && !isAuthenticated) next({ name: 'home' })
  else next()
})

在上述代码中,我们定义了一个全局路由守卫,用于在进入关于页面之前进行身份验证。如果当前用户没有登录,则自动跳转到主页。

除了全局路由守卫外,Vue Router还提供了其他几种路由守卫,例如beforeEnter()、beforeRouteEnter()和beforeRouteLeave()等,用于在特定路由进入或离开时进行控制和过滤。

总之,Uniapp 的路由使用方法与 Vue 的路由使用方法基本相似,都是通过配置文件和方法进行跳转,只不过在具体实现细节上有些不同。

概述

KCP协议结合了TCP和UDP协议的特点,是一个快速可靠的协议。
引述官方介绍:

KCP是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据的发送方式,以 callback的方式提供给 KCP。连时钟都需要外部传递进来,内部不会有任何一次系统调用。
TCP是为流量设计的(每秒内可以传输多少KB的数据),讲究的是充分利用带宽。而 KCP是为流速设计的(单个数据从一端发送到一端需要多少时间),以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度。

传统的TCP/UDP协议见参考链接。

协议格式

| 4bit conv | 1bit cmd | 1bit frg | 2bit wnd |
| 4bit ts | 4bit sn |
| 4bit una | 4bit len |
| anybit 数据 |

  • conv:会话序号,通信双方一致
  • cmd:报文类型
    • IKCP_CMD_ACK 确认
    • IKCP_CMD_PUSH 数据推送
    • IKCP_CMD_WASK 接收窗口查询大小
    • IKCP_CMD_WINS 接收窗口大小告知
  • wnd: 己方可用接收窗口大小,接收窗口大小 - 接收队列大小
  • frg:包分片(数量)
  • sn: 包分片序号
  • ts: 时间戳,用于计算RTO和RTT
  • una:待接收的序列号(确认号),表示该序列号之前的所有报文都收到了,可以删除
  • len:用户数据长度
  • 数据:用户数据

特点

RTO不翻倍

RTO(Retransmission-TimeOut)即重传超时时间,TCP和KCP是基于ARQ协议实现的可靠性,但TCP的超时计算是RTO
2,而KCP的超时计算是RTO
1.5,也就是说假如连续丢包3次,TCP是RTO
8,而KCP则是RTO
3.375,意味着可以更快地重新传输数据。

选择性重传

tcp 丢包时会全部重传从该包开始以后的数据,而 kcp 选择性重传,只重传真正丢失的数据包。

快速重传

收到fastresend(配置)个失序报文后,不等待超时,直接重传,减少丢包等待时间。
而TCP重传模式:

  • 超时重传:超过规定的时间 RTO 则重传
  • 快速重传:收到三个冗余ACK,不去等待 RTO ,直接重传

与TCP相同,都是通过累计确认实现的,发送端发送了1,2,3,4,5几个包,然后收到远端的ACK:1,3,4,5,当收到ACK = 3时,KCP知道2被跳过1次,收到ACK = 4时,知道2被跳过了2次,此时可以认为2号丢失,不用等超时,直接重传2号包,大大改善了丢包时的传输速度。

非延迟 ACK

停等ARQ协议

image

  • A会为每个即将发送的数据编号,编号的目的是为了标识数据和给数据排序
  • A发送完数据之后,会给这次发送的数据设置一个超时计时器
  • B收到数据,将会返回一个确认,该确认也有自己的编号
  • A收到确认,将删除副本且取消超时计时器,保留副本的原因是传输可能出错
  • B收到错误的数据,或者数据在传输过程中出错,总之就是说B没有收到想要的数据
  • A在超时计时器的设置时间内没有收到确认,此时重发数据

所以可靠的TCP有32位序列号和32位确认号,TCP和UDP都有16位校验和。

连续ARQ协议

image
连续ARQ协议不会响应每个数据段,而是仅仅响应编号最大的这个数据段,表示之前的数据都收到了,这个叫做UNA模式,而停等ARQ协议可以看作是ACK模式

ACK + UNA

ARQ (自动重传请求,Automatic Repeat-reQuest)模型响应有两种方式:

  • UNA:此编号前所有包已收到
  • ACK:该编号包已收到

只用 UNA 将导致全部重传,只用 ACK 则丢失成本太高,以往协议都是二选其一。而 kcp 协议中,除去单独的 ACK 包(精确)外,所有包都有 UNA 信息

非退让流控

KCP正常模式同TCP一样使用公平退让法则,即发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让、慢启动这四要素决定,慢启动是在刚开始发送数据时让窗口缓慢扩张,退半避让是在网络拥堵时窗口大小减半,快重传是在网络恢复时及时给予响应,与之配合的就是快恢复。但传送及时性要求很高的小数据时,可选择仅用前两项来控制发送频率。以牺牲部分公平性及带宽利用率之代价,换取了流畅传输的效果。KCP 实时性好,但带宽利用率较低,因为:

  • 非退让流控,不断尝试发送数据,有效包不多
  • 每个包应答,占用一定的带宽

窗口协议中有两种:

  • 拥塞窗口:防止过多的数据注入到网络中,这样可以使网络中的路由器 和链路不至于过载。
  • 滑动窗口:接收方告知发送方自己可以接收缓冲区的大小,通常与连续ARQ协议配合使用。

结构

image
大概流程如图示,若是想深入探索底层实现,请参考源码或者参考链接中的详解

参考链接

什么是JobSystem

并行编程

在游戏开发过程中我们经常会遇到要处理大量数据计算的需求,因此为了充分发挥硬件的多核性能,我们会需要用到并行编程,多线程编程也是并行编程的一种。

线程是在进程内的,是共享进程内存的执行流,线程上下文切换的开销是相当高的,大概有2000的CPU Circle,同时会导致缓存失效,导致万级别的CPU Circle,Job System的设计使用了线程池,一开始先将大量的计算任务分配下去尽量减少线程的执行流被打断,也降低了一些thread的切换开销。

Unreal Unity大部分都是这种模型,分配了一些work thread 然后其他的线程往这个线程塞Task,相比fixed thread模式性能好一些,多出了Task的概念,Unity里称这个为Job。

建议看看
Games104并行架构部分

Unity JobSystem

通常Unity在一个线程上执行代码,该线程默认在程序开始时运行,称为
主线程
。我们在主线程使用JobSystem的API,去给worker线程下发任务,就是使用
多线程

通常Unity JobSystem会和Burst编译器一起使用,Burst会把IL变成使用LLVM优化的CPU代码,执行效率可以说大幅提升,但是使用Burst时候debug会变得困难,会缺少一些报错的堆栈,此时关闭burst可以看到一些堆栈,更方便debug。
虽然并行编程有着种种的技巧,比如,线程之间沟通交流数据有需要加锁、原子操作等等的数据交换等操作。但是Unity为了让我们更容易的编写多线程代码,

通过一些规则的制定,规避了一些复杂行为,同时也限制了一些功能,必要时这些功能也可以通过添加attribute、或者使用指针的方式来打破一些规则。
规定包括但不限于:

  • 不允许访问
    静态变量
  • 不允许在Job里调度子Job
  • 只能向Job里传递
    值类型
    ,并且是通过拷贝的方式从主线程将数据传输进Job,当Job运行结束数据会拷贝回主线程,我们可以在主线程的job对象访问Job的执行结果。
  • 不允许在Native容器里添加托管类型
  • 不允许使用指针
  • 不允许多个Job同时写入同一个地方
  • 不允许在Job里分配额外内存

可以查看
官方文档

应用场景

基本上所有需要处理数据计算的场景都可以使用,我们可以用它做大量的游戏逻辑的计算,
我们也可以用它来做一些编辑器下的工具,可以达到加速的效果。

细节

接口

unity官方提供了一系列的接口,写一个Struct实现接口便可以执行多线程代码,提供的接口包括:

  • IJob:一个线程
  • IJobParallelFor
    :多线程,使用时传入一个数组,根据数组长度会划分出任务数量,每个任务的索引就是数组元素的索引
  • IJobParallelForTransform
    :并行访问Transform组件的,这是unity自己实现的比较特殊的读写Transform信息的Job,实测下来用起来貌似worker还是一个在动,但是经过Burst编译后快不少。
  • IJobFor:几乎没用

IJobParallelFor是最常用的,对数据源中的每一项都调用一次
Execute
方法。
Execute
方法中有一个整数参数。该索引用于访问和操作作业实现中的数据源的单个元素。

容器

Job使用的数据都需要使用Unity提供的Native容器,我们在主线程将要计算的数据装进NativeContainer里然后再传进Job。
主要会使用的容器就是NativeArray,其实就是一个原生的数组类型,其他的容器这里暂时不提
这些容器还要指定分配器,分配器包括

  • Allocator.Temp
    : 最快的配置。将其用于生命周期为一帧或更少的分配。从主线程传数据给Job时,不能使用Temp分配器。
  • Allocator.TempJob
    : 分配比 慢
    Temp
    但比 快
    Persistent
    。在四帧的生命周期内使用它进行线程安全分配。
  • Allocator.Persistent
    : 最慢的分配,但只要你需要它就可以持续,如果有必要,可以贯穿应用程序的整个生命周期。它是直接调用malloc. 较长的作业可以使用此 NativeContainer 分配类型。

容器在实现Job的Struct里可以打标记,包括ReadOnly、WriteOnly,一方面可以提升性能,另一方面有时候会有读写冲突的情况,此时应该尽量多标记ReadOnly,避免一些数据冲突。

创建 使用

官方文档已经说的很好。
https://docs.unity3d.com/Manual/JobSystemCreatingJobs.html
对于ParallelFor的Schedule多了一些参数,innerloopBatchCount这个参数可以留意一下,可以理解为一个线程次性拿走多少任务。

Job之间互相依赖

https://docs.unity3d.com/Manual/JobSystemJobDependencies.html

其实执行了一个Job之后,在主线再执行另一个Job也不会性能差很多,并且易于debug,可以断点查看多个阶段执行过程中Job的数据情况,但是追求完美还是可以把依赖填上。

性能测试比较

笔者曾经做过简单的使用Job和不用Job的对比,通过打上Unity Profiler的标记,可以方便的在图表里查看运行开销。

Profiler.BeginSample("Your Target Profiler Name");
// your code
Profiler.EndSample();

IJob

using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;

using UnityEngine;
using Unity.Burst;
[BurstCompile] 
public class JobTest : MonoBehaviour
{

    public bool useJob;
    // Update is called once per frame
    void Update()
    {
        float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            NativeArray<int> result = new NativeArray<int>(1, Allocator.TempJob);//four frame allocate
            MyJobSystem0 job0 = new MyJobSystem0();
            job0.a = 0;
            job0.b = 1;
            job0.result = result;
            JobHandle handle = job0.Schedule();
            handle.Complete();
            result.Dispose();
            Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
        else
        {
            var index = 0;
            for(int i = 0; i < 1000000; i++)
            {
                index++;
            }
            Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
    }
    
}
[BurstCompile] 
public struct MyJobSystem0 : IJob
{
    public int a;
    public int b;
    public NativeArray<int> result;

    public void Execute()
    {
        var index = 0;
        for(int i = 0; i < 1000000; i++)
        {
            index++;
        }
        result[0] = a + b;
    }
}

使用IJob执行一项复杂的工作,没有使用job跑了2-4ms,使用job也是跑了2-4 ms,但是使用了job+burst,这个for循环的速度就变得只有0.2-0.8 ms了,burst对此优化挺大的。

IJobParallelFor

using System;
using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;

public class JobForTest : MonoBehaviour
{
    public bool useJob;
    public int dataCount;
    private NativeArray<float> a;

    private NativeArray<float> b;

    private NativeArray<float> result;

    private List<float> noJobA;

    private List<float> noJobB;

    private List<float> noJobResult;
    // Update is called once per frame
    private void Start()
    {
        a = new NativeArray<float>(dataCount, Allocator.Persistent);
        b = new NativeArray<float>(dataCount, Allocator.Persistent);
        result = new NativeArray<float>(dataCount, Allocator.Persistent);
        noJobA = new List<float>();
        noJobB = new List<float>();
        noJobResult = new List<float>();
        
        for (int i = 0; i < dataCount; ++i)
        {
            a[i] = 1.0f;
            b[i] = 2.0f;
            noJobA.Add(1.0f);
            noJobB.Add(2.0f);
            noJobResult.Add(0.0f);
        }
    }

    void Update()
    {
        float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            MyParallelJob jobData = new MyParallelJob();
            jobData.a = a;  
            jobData.b = b;
            jobData.result = result;
            // 调度作业,为结果数组中的每个索引执行一个 Execute 方法,且每个处理批次只处理一项
            JobHandle handle = jobData.Schedule(result.Length, 1);
            // 等待作业完成
            handle.Complete();
            
            Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");

        }
        else
        {

            for(int i = 0; i < dataCount; i++)
            {
                noJobA[i] = 1;
                noJobB[i] = 2;
                noJobResult[i] = noJobA[i]+noJobB[i];
            }
            Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
    }

    private void OnDestroy()
    {
        // 释放数组分配的内存
        a.Dispose();
        b.Dispose();
        result.Dispose();
    }
}

// 将两个浮点值相加的作业
public struct MyParallelJob : IJobParallelFor
{
    [ReadOnly]
    public NativeArray<float> a;
    [ReadOnly]
    public NativeArray<float> b;
    public NativeArray<float> result;

    public void Execute(int i)
    {
        result[i] = a[i] + b[i];
    }
}

普通for寻找两个list,遍历list元素然后相加,数据量10万,每一个批次这里是处理1个execute, 不开job 2.48ms,开job 1.34ms,job开了burst就0.28ms。

IJobParalForTransform

using Unity.Burst;
using Unity.Collections;
using Unity.Jobs;
using Unity.Mathematics;
using UnityEngine;
using UnityEngine.Jobs;

public class TransformJobs : MonoBehaviour
{
    public bool useJob;
    public int dataCount = 100;
    //public int batchCount;
    // 用于存储transform的NativeArray
    private TransformAccessArray m_TransformsAccessArray;
    private NativeArray<Vector3> m_Velocities;

    private PositionUpdateJob m_Job;
    private JobHandle m_PositionJobHandle;
    private GameObject[] sphereGameObjects; 
    //[BurstCompile]
    struct PositionUpdateJob : IJobParallelForTransform
    {
        // 给每个物体设置一个速度
        [ReadOnly]
        public NativeArray<Vector3> velocity;

        public float deltaTime;

        // 实现IJobParallelForTransform的结构体中Execute方法第二个参数可以获取到Transform
        public void Execute(int i, TransformAccess transform)
        {
            transform.position += velocity[i] * deltaTime;
        }
    }

    void Start()
    {
        m_Velocities = new NativeArray<Vector3>(dataCount, Allocator.Persistent);

        // 用代码生成一个球体,作为复制的模板
        var sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);
        // 关闭阴影
        var renderer = sphere.GetComponent<MeshRenderer>();
        renderer.shadowCastingMode = UnityEngine.Rendering.ShadowCastingMode.Off;
        renderer.receiveShadows = false;

        // 关闭碰撞体
        var collider = sphere.GetComponent<Collider>();
        collider.enabled = false;

        // 保存transform的数组,用于生成transform的Native Array
        var transforms = new Transform[dataCount];
        sphereGameObjects = new GameObject[dataCount];
        int row = (int)Mathf.Sqrt(dataCount);
        // 生成1W个球
        for (int i = 0; i < row; i++)
        {
            for (int j = 0; j < row; j++)
            {
                var go = GameObject.Instantiate(sphere);
                go.transform.position = new Vector3(j, 0, i);
                sphereGameObjects[i * row + j] = go;
                transforms[i*row+j] = go.transform;
                m_Velocities[i*row+j] = new Vector3(0.1f * j, 0, 0.1f * j);
            }
        }

        m_TransformsAccessArray = new TransformAccessArray(transforms);
    }

    void Update()
    {
        //float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            // 实例化一个job,传入数据
            m_Job = new PositionUpdateJob()
            {
                deltaTime = Time.deltaTime,
                velocity = m_Velocities,
            };

            // 调度job执行
            m_PositionJobHandle = m_Job.Schedule(m_TransformsAccessArray);
            //Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
        else
        {
            for (int i = 0; i < dataCount; ++i)
            {
                sphereGameObjects[i].transform.position +=  m_Velocities[i] * Time.deltaTime;
            }
            //Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
       
    }

    // 保证当前帧内Job执行完毕
    private void LateUpdate()
    {
        m_PositionJobHandle.Complete();
    }

    // OnDestroy中释放NativeArray的内存
    private void OnDestroy()
    {
        m_Velocities.Dispose();
        m_TransformsAccessArray.Dispose();
    }
}

100+vec3,不用job 0.02ms,用job +burst 0.02ms
1600+vec3,不用job 0.31ms,用job 0.07ms +burst 0.04ms
1万+vec3,不用job 2.23ms,用job 0.35ms + burst 0.12ms
1万+float3,不用job 2.55ms,用job 0.4ms
100万+float3,不用job 199ms ,用job 40ms + burst 31ms
100万+vec3,不用job 189ms ,用job 35ms + burst 31ms

高级技巧

使用特定的数学库中的实现

unity特定的数学库中的数据类型可以获取simd优化,比如vector3就可以换成float3,但是缺少的数学库,就要自己解决了,所以我一般就vector3。

在合适的时机Schedule和Complete

拥有作业所需的数据后就立即在作业上调用
Schedule
,并仅在需要结果时才开始在作业上调用
Complete
。最好是调度当前不与正在运行的任何其他作业竞争的、不需要等待的作业。例如,
如果在一帧结束和下一帧开始之间的一段时间没有作业正在运行,并且可以接受一帧延迟,则可以在一帧结束时调度作业,并在下一帧中使用其结果
。另一方面,如果游戏占满了与其他作业的转换期,但在帧中的其他位置存在大量未充分利用的时段,那么在这个时段调度作业会更加有效。

在单线程里运行JobSystem

IJobParallelForExtensions可以调用Run方法,会将所有的Job放到一个Thread里执行,之前我们提到了Schedule的innerloopBatchCount参数,将它调到和数据源一样大,也是在一个Thread里执行,
当我们的数据量小于1000,分配线程可能都觉得费劲,用单线程的JobSystem配合Burst效果可能更好。
需要注意的是,如果我们出现了并行写入问题(多个Thread同时写一个位置),在单线程模式下是不会报错的。

使用NativeDisableUnsafePtrRestriction

打上这个标记后可以在Job里使用Unsafe代码块,使用指针
有多个好处

  • 可以不需要拷贝数组就把主线程的数据塞进子线程,对数据量大,需要频繁调用的可以考虑
  • 可以包装一些托管内存,比如我这里就包装了一个二维数组,每个containsTriangleIndex其实是一个int的NativeArray

如果struct里有NativeArray,这个struct放进NativeArray的时候会过不了安全检查。
我这里是在主线程维护好了这些动态的数组,然后再传进了这个结构的。
在unsafe代码块里,Native容器相关的API中有GetUnsafePtr可以获得指针。

SamplePointRayTriangleJob samplePointRayTriangleJob = new SamplePointRayTriangleJob();  
samplePointRayTriangleJob.meshTriangles = jobMeshTriangles;  
samplePointRayTriangleJob.randomDirs = jobRandomDirs;  
samplePointRayTriangleJob.useGrid = useGrid;  
samplePointRayTriangleJob.allStartPoints = startPoints;  
samplePointRayTriangleJob.allTriangleBoundsJobDatas = (TriangleBoundsJobData*)triangleBoundsJobDatas.GetUnsafePtr();

NativeDisableParallelForRestriction并行写入

打上这个标记后,多个Thread同时数组的同一个地方进行写入,unity不会阻拦,但是自己也要处理好逻辑问题。

举个例子:下面这篇文章里
https://blog.csdn.net/n5/article/details/123742777
在Parallel Job里面进行光栅化三角形时,多个三角形有可能并行访问depth buffer/frame buffer的相同地方。这在多线程编程中属于race conditions,Job system内部会检测出来,会直接报错。

IndexOutOfRangeException: Index 219108 is out of restricted IJobParallelFor range [4392…4392] in ReadWriteBuffer.
ReadWriteBuffers are restricted to only read & write the element at the job index. You can use double buffering strategies to avoid race conditions due to reading & writing in parallel to the same elements from a job.

NativeDisableContainerSafetyRestriction

使用这个Attribute可以在子线程分配一块内存,比如我这里每个子线程是创建了一个数组来接受光线三角形求交,一根光线击中了多少个点,一个子任务会执行许多次光线遍历Mesh

这个主要是博主在Github上学习Unity官方的MeshApiExample项目看到的案例,有点像StaticBatch
可以查看这个链接:
把整个场景的Mesh合并

DeallocateOnJobCompletion

容器在job结束之后自动释放
这个博主用的很少 基本都是主动释放
可能在用非并行Job的时候 接受外面的NativeArray后自己不想管释放之类的。
可以查看一个github上别人的案例看看:
案例

自定义Native容器

https://docs.unity3d.com/Manual/job-system-custom-nativecontainer-example.html

思考

JobSystem与ComputeShader相比 优势

JobSystem主要是利用CPU来降低计算负载,在数量级上远远比不上GPU,在前面的性能测试中数据到万以上就相当吃力了。
ComputeShader是利用GPU来降低计算负载,,现在GPU Driven的技术也逐渐越来越多。

思考这两个的取舍主要应该看业务逻辑的数据流向,如果我们的数据是从CPU发起的,那么在把数据从CPU拷贝到GPU也是肯定是不如在CPU内做拷贝要快的,
如果我们的计算的数据最后是给CPU做下步计算的,如果用GPU做计算就会出现CPU等GPU的回读问题,数据若停留在GPU,那么ComputeShader自然好。

另外就是考虑两个后端的硬件特性,CPU高主频,处理复杂的逻辑,大量的循环、分支判断上比GPU要有优势,数量级上则GPU更有优势。

最后也可以考虑一下易用性问题,如果用到了很多原本在CPU里的数学库,在JobSystem里都是可以直接用的,ComputeShader的话则需要自己实现一版,不过脚手架这种东西属于见仁见智,
只要自己方便就好。

2023.3.21
flyingziming

资源调度器是 YARN 中最核心的组件之一,它是 ResourceManager 中的一个插拔式服务组件,负责整个集群资源的管理和分配。
Yarn 默认提供了三种可用资源调度器,分别是FIFO (First In First Out )、 Yahoo! 的 Capacity Scheduler 和 Facebook 的 Fair Scheduler。
本节会重点介绍资源调度器的基本框架,在之后文章中详细介绍 Capacity Scheduler 和 Fair Scheduler。

一、基本架构

资源调度器是最核心的组件之一,并且在 Yarn 中是可插拔的,Yarn 中定义了一套接口规范,以方便用户实现自己的调度器,同时 Yarn 中自带了FIFO,CapacitySheduler, FairScheduler三种常用资源调度器。
image.png

一)资源调度模型

Yarn 采用了双层资源调度模型。

  • 第一层中,RM 中的资源调度器将资源分配给各个 AM(Scheduler 处理的部分)
  • 第二层中,AM 再进一步将资源分配给它的内部任务(不是本节关注的内容)

Yarn 的资源分配过程是
异步
的,资源调度器将资源分配给一个应用程序后,它不会立刻 push 给对应的 AM,而是暂时放到一个缓冲区中,等待 AM 通过周期性的心跳主动来取(pull-based通信模型)

  • NM 通过周期心跳汇报节点信息
  • RM 为 NM 返回一个心跳应答,包括需要释放的 container 列表等信息
  • RM 收到的 NM 信息触发一个NODE_UPDATED事件,之后会按照一定策略将该节点上的资源分配到各个应用,并将分配结果放到一个内存数据结构中
  • AM 向 RM 发送心跳,获得最新分配的 container 资源
  • AM 将收到的新 container 分配给内部任务

二)资源表示模型

NM 启动时会向 RM 注册,注册信息中包含该节点可分配的 CPU 和内存总量,这两个值均可通过配置选项设置,具体如下:

  • yarn.nodemanager.resource.memory-mb
    :可分配的物理内存总量,默认是8G
  • yarn.nodemanager.vmem-pmem-ratio
    :任务使用单位物理内存量对应最多可使用的虚拟内存,默认值是2.1,表示使用1M的物理内存,最多可以使用2.1MB的虚拟内存总量
  • yarn.nodemanager.resource.cpu-vcores
    :可分配的虚拟CPU个数,默认是8。为了更细粒度地划分CPU资源和考虑到CPU性能差异,YARN允许管理员根据实际需要和CPU性能将每个物理CPU划分成若干个虚拟CPU,而管理员可为每个节点单独配置可用的虚拟CPU个数,且用户提交应用程序时,也可指定每个任务需要的虚拟CPU数

Yarn 支持的调度语义

  • 请求某个节点上的特定资源量
  • 请求某个特定机架上的特定资源量
  • 将某些节点加入(或移除)黑名单,不再为自己分配这些节点上的资源
  • 请求归还某些资源

Yarn 不支持的调度语义
(随着 Yarn 的不断迭代,可能会在未来实现):

  • 请求任意节点上的特定资源量
  • 请求任意机架上的特定资源量
  • 请求一组或几组符合某种特质的资源
  • 超细粒度资源。比如CPU性能要求、绑定CPU等
  • 动态调整Container资源,允许根据需要动态调整Container资源量

三)资源保证机制

当单个节点的闲置资源无法满足应用的一个 container 时,有两种策略:

  • 放弃当前节点等待下一个节点;
  • 在当前节点上预留一个 container 申请,等到节点有资源时优先满足预留。

YARN 采用了第二种增量资源分配机制(当应用程序申请的资源暂时无法保证时,为应用程序预留一个节点上的资源直到累计释放的空闲资源满足应用程序需求),这种机制会造成浪费,但不会出现饿死现象

四)层级队列管理

Yarn 的队列是层级关系,每个队列可以包含子队列,用户只能将任务提交到叶子队列。管理员可以配置每个叶子队列对应的操作系统用户和用户组,也可以配置每个队列的管理员。管理员可以杀死队列中的任何应用程序,改变任何应用的优先级等。
队列的命名用
.
来连接,比如
root.A1

root.A1.B1

二、三种调度器

Yarn 的资源调度器是可以配置的,默认实现有三种
FIFO

CapacityScheduler

FairScheduler

一)FIFO

FIFO 是 Hadoop设计之初提供的一个最简单的调度机制:先来先服务。
所有任务被统一提交到一个队里中,Hadoop按照提交顺序依次运行这些作业。只有等先来的应用程序资源满足后,再开始为下一个应用程序进行调度运行和分配资源。
优点:

  • 原理是和实现简单。也不需要任何单独的配置

缺点:

  • 无法提供 QoS,只能对所有的任务按照同一优先级处理。
  • 无法适应多租户资源管理。先来的大应用程序把集群资源占满,导致其他用户的程序无法得到及时执行。
  • 应用程序并发运行程度低。

二)Capacity Scheduler

Capacity Scheduler 容量调度是 Yahoo! 开发的多用户调度器,以队列为单位划分资源。
每个队列可设定一定比例的资源最低保证和使用上限。每个用户也可设置一定的资源使用上限,以防资源滥用。并支持资源共享,将队列剩余资源共享给其他队列使用。配置文件名称为 capacity-scheduler.xml。
主要特点:

  • 容量保证:
    可为每个队列设置资源最低保证(capacity)和资源使用上限(maximum-capacity,默认100%),而所有提交到该队列的应用程序可以共享这个队列中的资源。
  • 弹性调度:
    如果队列中的资源有剩余或者空闲,可以暂时共享给那些需要资源的队列,一旦该队列有新的应用程序需要资源运行,则其他队列释放的资源会归还给该队列,从而实现弹性灵活分配调度资源,提高系统资源利用率。
  • 多租户管理:
    支持多用户共享集群资源和多应用程序同时运行。且可对每个用户可使用资源量(user-limit-factor)设置上限。
  • 安全隔离:
    每个队列设置严格的ACL列表(acl_submit_applications),用以限制可以用户或者用户组可以在该队列提交应用程序。

三)Fair Scheduler

Fair Scheduler 是 Facebook 开发的多用户调度器。设计目标是为所有的应用分配「公平」的资源(对公平的定义可以通过参数来设置)。公平不仅可以在队列中的应用体现,也可以在多个队列之间工作。
在 Fair 调度器中,我们不需要预先占用一定的系统资源,Fair 调度器会为所有运行的 job 动态的调整系统资源。如下图所示,当第一个大 job 提交时,只有这一个 job 在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair 调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
与Capacity Scheduler不同之处:
image.png

四)源码继承关系

看下面三个图中调度器的继承关系。这三个 Scheduler 都继承自
AbstractYarnScheduler
。这个抽象类又 extends AbstractService implements ResourceScheduler。继承
AbstractService
说明是一个服务,实现
ResourceScheduler
是 scheduler 的主要功能。

三者还有一些区别,
FairScheduler
没实现
Configurable
接口,少了
setConf()
方法;
FifoScheduler
不支持资源抢占,
FairScheduler
支持资源抢占却没实现
PreemptableResourceScheduler
接口。
image.png

image.png

image.png


YarnScheduler
中,定义了一个资源调度器应该实现的方法。在
AbstractYarnScheduler
中实现了大部分方法,若自己实现调度器可继承该类,将发开重点放在资源分配实现上。

public interface YarnScheduler extends EventHandler<SchedulerEvent> {
  // 获得一个队列的基本信息
  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
      boolean recursive) throws IOException;

  // 获取集群资源
  public Resource getClusterResource();

  /**
   * AM 和资源调度器之间最主要的一个方法
   * AM 通过该方法更新资源请求、待释放资源列表、黑名单列表增减
   */
  @Public
  @Stable
  Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<ContainerId> release,
      List<String> blacklistAdditions, List<String> blacklistRemovals,
      List<UpdateContainerRequest> increaseRequests,
      List<UpdateContainerRequest> decreaseRequests);

  // 获取节点资源使用情况报告
  public SchedulerNodeReport getNodeReport(NodeId nodeId);

ResourceScheduler
本质是个事件处理器,主要处理10种事件(CapacityScheduler 还会多处理几种抢占相关的事件),可以到对应 Scheduler 的
handle()
方法中查看这些事件处理逻辑:

  • NODE_ADDED
    : 集群中增加一个节点
  • NODE_REMOVED
    : 集群中移除一个节点
  • NODE_RESOURCE_UPDATE
    : 集群中有一个节点的资源增加了
  • NODE_LABELS_UPDATE
    : 更新node labels
  • NODE_UPDATE
    : 该事件是 NM 通过心跳和 RM 通信时发送的,会汇报该 node 的资源使用情况,同时触发一次分配操作。
  • APP_ADDED
    : 增加一个Application
  • APP_REMOVED
    : 移除一个application
  • APP_ATTEMPT_ADDED
    : 增加一个application Attempt
  • APP_ATTEMPT_REMOVED
    : 移除一个application attempt
  • CONTAINER_EXPIRED
    : 回收一个超时的container

三、资源调度维度

目前有两种:
DefaultResourceCalculator

DominantResourceCalculator

  • DefaultResourceCalculator
    : 仅考虑内存资源
  • DominantResourceCalculator
    : 同时考虑内存和 CPU 资源(后续更新中支持更多类型资源,FPGA、GPU 等)。该算法扩展了最大最小公平算法(max-min fairness)。
    • 在 DRF 算法中,将所需份额(资源比例)最大的资源称为主资源,而 DRF 的基本设计思想则是将最大最小公平算法应用于主资源上,进而将多维资源调度问题转化为单资源调度问题,即 DRF 总是最大化所有主资源中最小的
    • 感兴趣的话,可到源码中
      DominantResourceCalculator#compare
      探究实现逻辑
    • 对应的论文
      《Dominant Resource Fairness: Fair Allocation of Multiple Resource Types》

(这里注意!很多文章和书中写的是「YARN 资源调度器默认采用了 DominantResourceCalculator」,实际并不是这样的!)

  • FifoScheduler
    默认使用
    DefaultResourceCalculator
    且不可更改。
  • CapacityScheduler
    是在
    capacity-scheduler.xml
    中配置
    yarn.scheduler.capacity.resource-calculator
    参数决定的。
  • FairScheduler
    才默认使用
    DominantResourceCalculator

四、资源抢占模型

这里仅简要介绍资源抢占模型,在后面的文章中会深入源码分析抢占的流程。

  • 在资源调度器中,每个队列可设置一个最小资源量和最大资源量,其中,最小资源量是资源紧缺情况下每个队列需保证的资源量,而最大资源量则是极端情况下队列也不能超过的资源使用量
  • 为了提高资源利用率,资源调度器(包括Capacity Scheduler和Fair Scheduler)会将负载较轻的队列的资源暂时分配给负载重的队列,仅当负载较轻队列突然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源分配给它。

五、总结

本文介绍了 Yarn 资源调度器的基本框架,包括基本架构,以及简要介绍三种 YARN 实现的调度器,并对资源调度维度,资源抢占模型等进行了介绍。
后续文章中将会围绕三种 YARN 调度器,深入源码进行探究。看其在源码中是如何一步步实现对应功能的。


参考文章:
《Hadoop 技术内幕:深入解析 YARN 架构设计与实现原理》第六章
深入解析yarn架构设计与技术实现-资源调度器
Yarn源码分析5-资源调度