什么是JobSystem

并行编程

在游戏开发过程中我们经常会遇到要处理大量数据计算的需求,因此为了充分发挥硬件的多核性能,我们会需要用到并行编程,多线程编程也是并行编程的一种。

线程是在进程内的,是共享进程内存的执行流,线程上下文切换的开销是相当高的,大概有2000的CPU Circle,同时会导致缓存失效,导致万级别的CPU Circle,Job System的设计使用了线程池,一开始先将大量的计算任务分配下去尽量减少线程的执行流被打断,也降低了一些thread的切换开销。

Unreal Unity大部分都是这种模型,分配了一些work thread 然后其他的线程往这个线程塞Task,相比fixed thread模式性能好一些,多出了Task的概念,Unity里称这个为Job。

建议看看
Games104并行架构部分

Unity JobSystem

通常Unity在一个线程上执行代码,该线程默认在程序开始时运行,称为
主线程
。我们在主线程使用JobSystem的API,去给worker线程下发任务,就是使用
多线程

通常Unity JobSystem会和Burst编译器一起使用,Burst会把IL变成使用LLVM优化的CPU代码,执行效率可以说大幅提升,但是使用Burst时候debug会变得困难,会缺少一些报错的堆栈,此时关闭burst可以看到一些堆栈,更方便debug。
虽然并行编程有着种种的技巧,比如,线程之间沟通交流数据有需要加锁、原子操作等等的数据交换等操作。但是Unity为了让我们更容易的编写多线程代码,

通过一些规则的制定,规避了一些复杂行为,同时也限制了一些功能,必要时这些功能也可以通过添加attribute、或者使用指针的方式来打破一些规则。
规定包括但不限于:

  • 不允许访问
    静态变量
  • 不允许在Job里调度子Job
  • 只能向Job里传递
    值类型
    ,并且是通过拷贝的方式从主线程将数据传输进Job,当Job运行结束数据会拷贝回主线程,我们可以在主线程的job对象访问Job的执行结果。
  • 不允许在Native容器里添加托管类型
  • 不允许使用指针
  • 不允许多个Job同时写入同一个地方
  • 不允许在Job里分配额外内存

可以查看
官方文档

应用场景

基本上所有需要处理数据计算的场景都可以使用,我们可以用它做大量的游戏逻辑的计算,
我们也可以用它来做一些编辑器下的工具,可以达到加速的效果。

细节

接口

unity官方提供了一系列的接口,写一个Struct实现接口便可以执行多线程代码,提供的接口包括:

  • IJob:一个线程
  • IJobParallelFor
    :多线程,使用时传入一个数组,根据数组长度会划分出任务数量,每个任务的索引就是数组元素的索引
  • IJobParallelForTransform
    :并行访问Transform组件的,这是unity自己实现的比较特殊的读写Transform信息的Job,实测下来用起来貌似worker还是一个在动,但是经过Burst编译后快不少。
  • IJobFor:几乎没用

IJobParallelFor是最常用的,对数据源中的每一项都调用一次
Execute
方法。
Execute
方法中有一个整数参数。该索引用于访问和操作作业实现中的数据源的单个元素。

容器

Job使用的数据都需要使用Unity提供的Native容器,我们在主线程将要计算的数据装进NativeContainer里然后再传进Job。
主要会使用的容器就是NativeArray,其实就是一个原生的数组类型,其他的容器这里暂时不提
这些容器还要指定分配器,分配器包括

  • Allocator.Temp
    : 最快的配置。将其用于生命周期为一帧或更少的分配。从主线程传数据给Job时,不能使用Temp分配器。
  • Allocator.TempJob
    : 分配比 慢
    Temp
    但比 快
    Persistent
    。在四帧的生命周期内使用它进行线程安全分配。
  • Allocator.Persistent
    : 最慢的分配,但只要你需要它就可以持续,如果有必要,可以贯穿应用程序的整个生命周期。它是直接调用malloc. 较长的作业可以使用此 NativeContainer 分配类型。

容器在实现Job的Struct里可以打标记,包括ReadOnly、WriteOnly,一方面可以提升性能,另一方面有时候会有读写冲突的情况,此时应该尽量多标记ReadOnly,避免一些数据冲突。

创建 使用

官方文档已经说的很好。
https://docs.unity3d.com/Manual/JobSystemCreatingJobs.html
对于ParallelFor的Schedule多了一些参数,innerloopBatchCount这个参数可以留意一下,可以理解为一个线程次性拿走多少任务。

Job之间互相依赖

https://docs.unity3d.com/Manual/JobSystemJobDependencies.html

其实执行了一个Job之后,在主线再执行另一个Job也不会性能差很多,并且易于debug,可以断点查看多个阶段执行过程中Job的数据情况,但是追求完美还是可以把依赖填上。

性能测试比较

笔者曾经做过简单的使用Job和不用Job的对比,通过打上Unity Profiler的标记,可以方便的在图表里查看运行开销。

Profiler.BeginSample("Your Target Profiler Name");
// your code
Profiler.EndSample();

IJob

using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;

using UnityEngine;
using Unity.Burst;
[BurstCompile] 
public class JobTest : MonoBehaviour
{

    public bool useJob;
    // Update is called once per frame
    void Update()
    {
        float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            NativeArray<int> result = new NativeArray<int>(1, Allocator.TempJob);//four frame allocate
            MyJobSystem0 job0 = new MyJobSystem0();
            job0.a = 0;
            job0.b = 1;
            job0.result = result;
            JobHandle handle = job0.Schedule();
            handle.Complete();
            result.Dispose();
            Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
        else
        {
            var index = 0;
            for(int i = 0; i < 1000000; i++)
            {
                index++;
            }
            Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
    }
    
}
[BurstCompile] 
public struct MyJobSystem0 : IJob
{
    public int a;
    public int b;
    public NativeArray<int> result;

    public void Execute()
    {
        var index = 0;
        for(int i = 0; i < 1000000; i++)
        {
            index++;
        }
        result[0] = a + b;
    }
}

使用IJob执行一项复杂的工作,没有使用job跑了2-4ms,使用job也是跑了2-4 ms,但是使用了job+burst,这个for循环的速度就变得只有0.2-0.8 ms了,burst对此优化挺大的。

IJobParallelFor

using System;
using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;

public class JobForTest : MonoBehaviour
{
    public bool useJob;
    public int dataCount;
    private NativeArray<float> a;

    private NativeArray<float> b;

    private NativeArray<float> result;

    private List<float> noJobA;

    private List<float> noJobB;

    private List<float> noJobResult;
    // Update is called once per frame
    private void Start()
    {
        a = new NativeArray<float>(dataCount, Allocator.Persistent);
        b = new NativeArray<float>(dataCount, Allocator.Persistent);
        result = new NativeArray<float>(dataCount, Allocator.Persistent);
        noJobA = new List<float>();
        noJobB = new List<float>();
        noJobResult = new List<float>();
        
        for (int i = 0; i < dataCount; ++i)
        {
            a[i] = 1.0f;
            b[i] = 2.0f;
            noJobA.Add(1.0f);
            noJobB.Add(2.0f);
            noJobResult.Add(0.0f);
        }
    }

    void Update()
    {
        float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            MyParallelJob jobData = new MyParallelJob();
            jobData.a = a;  
            jobData.b = b;
            jobData.result = result;
            // 调度作业,为结果数组中的每个索引执行一个 Execute 方法,且每个处理批次只处理一项
            JobHandle handle = jobData.Schedule(result.Length, 1);
            // 等待作业完成
            handle.Complete();
            
            Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");

        }
        else
        {

            for(int i = 0; i < dataCount; i++)
            {
                noJobA[i] = 1;
                noJobB[i] = 2;
                noJobResult[i] = noJobA[i]+noJobB[i];
            }
            Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
    }

    private void OnDestroy()
    {
        // 释放数组分配的内存
        a.Dispose();
        b.Dispose();
        result.Dispose();
    }
}

// 将两个浮点值相加的作业
public struct MyParallelJob : IJobParallelFor
{
    [ReadOnly]
    public NativeArray<float> a;
    [ReadOnly]
    public NativeArray<float> b;
    public NativeArray<float> result;

    public void Execute(int i)
    {
        result[i] = a[i] + b[i];
    }
}

普通for寻找两个list,遍历list元素然后相加,数据量10万,每一个批次这里是处理1个execute, 不开job 2.48ms,开job 1.34ms,job开了burst就0.28ms。

IJobParalForTransform

using Unity.Burst;
using Unity.Collections;
using Unity.Jobs;
using Unity.Mathematics;
using UnityEngine;
using UnityEngine.Jobs;

public class TransformJobs : MonoBehaviour
{
    public bool useJob;
    public int dataCount = 100;
    //public int batchCount;
    // 用于存储transform的NativeArray
    private TransformAccessArray m_TransformsAccessArray;
    private NativeArray<Vector3> m_Velocities;

    private PositionUpdateJob m_Job;
    private JobHandle m_PositionJobHandle;
    private GameObject[] sphereGameObjects; 
    //[BurstCompile]
    struct PositionUpdateJob : IJobParallelForTransform
    {
        // 给每个物体设置一个速度
        [ReadOnly]
        public NativeArray<Vector3> velocity;

        public float deltaTime;

        // 实现IJobParallelForTransform的结构体中Execute方法第二个参数可以获取到Transform
        public void Execute(int i, TransformAccess transform)
        {
            transform.position += velocity[i] * deltaTime;
        }
    }

    void Start()
    {
        m_Velocities = new NativeArray<Vector3>(dataCount, Allocator.Persistent);

        // 用代码生成一个球体,作为复制的模板
        var sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);
        // 关闭阴影
        var renderer = sphere.GetComponent<MeshRenderer>();
        renderer.shadowCastingMode = UnityEngine.Rendering.ShadowCastingMode.Off;
        renderer.receiveShadows = false;

        // 关闭碰撞体
        var collider = sphere.GetComponent<Collider>();
        collider.enabled = false;

        // 保存transform的数组,用于生成transform的Native Array
        var transforms = new Transform[dataCount];
        sphereGameObjects = new GameObject[dataCount];
        int row = (int)Mathf.Sqrt(dataCount);
        // 生成1W个球
        for (int i = 0; i < row; i++)
        {
            for (int j = 0; j < row; j++)
            {
                var go = GameObject.Instantiate(sphere);
                go.transform.position = new Vector3(j, 0, i);
                sphereGameObjects[i * row + j] = go;
                transforms[i*row+j] = go.transform;
                m_Velocities[i*row+j] = new Vector3(0.1f * j, 0, 0.1f * j);
            }
        }

        m_TransformsAccessArray = new TransformAccessArray(transforms);
    }

    void Update()
    {
        //float startTime = Time.realtimeSinceStartup;
        if (useJob)
        {
            // 实例化一个job,传入数据
            m_Job = new PositionUpdateJob()
            {
                deltaTime = Time.deltaTime,
                velocity = m_Velocities,
            };

            // 调度job执行
            m_PositionJobHandle = m_Job.Schedule(m_TransformsAccessArray);
            //Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
        else
        {
            for (int i = 0; i < dataCount; ++i)
            {
                sphereGameObjects[i].transform.position +=  m_Velocities[i] * Time.deltaTime;
            }
            //Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
        }
       
    }

    // 保证当前帧内Job执行完毕
    private void LateUpdate()
    {
        m_PositionJobHandle.Complete();
    }

    // OnDestroy中释放NativeArray的内存
    private void OnDestroy()
    {
        m_Velocities.Dispose();
        m_TransformsAccessArray.Dispose();
    }
}

100+vec3,不用job 0.02ms,用job +burst 0.02ms
1600+vec3,不用job 0.31ms,用job 0.07ms +burst 0.04ms
1万+vec3,不用job 2.23ms,用job 0.35ms + burst 0.12ms
1万+float3,不用job 2.55ms,用job 0.4ms
100万+float3,不用job 199ms ,用job 40ms + burst 31ms
100万+vec3,不用job 189ms ,用job 35ms + burst 31ms

高级技巧

使用特定的数学库中的实现

unity特定的数学库中的数据类型可以获取simd优化,比如vector3就可以换成float3,但是缺少的数学库,就要自己解决了,所以我一般就vector3。

在合适的时机Schedule和Complete

拥有作业所需的数据后就立即在作业上调用
Schedule
,并仅在需要结果时才开始在作业上调用
Complete
。最好是调度当前不与正在运行的任何其他作业竞争的、不需要等待的作业。例如,
如果在一帧结束和下一帧开始之间的一段时间没有作业正在运行,并且可以接受一帧延迟,则可以在一帧结束时调度作业,并在下一帧中使用其结果
。另一方面,如果游戏占满了与其他作业的转换期,但在帧中的其他位置存在大量未充分利用的时段,那么在这个时段调度作业会更加有效。

在单线程里运行JobSystem

IJobParallelForExtensions可以调用Run方法,会将所有的Job放到一个Thread里执行,之前我们提到了Schedule的innerloopBatchCount参数,将它调到和数据源一样大,也是在一个Thread里执行,
当我们的数据量小于1000,分配线程可能都觉得费劲,用单线程的JobSystem配合Burst效果可能更好。
需要注意的是,如果我们出现了并行写入问题(多个Thread同时写一个位置),在单线程模式下是不会报错的。

使用NativeDisableUnsafePtrRestriction

打上这个标记后可以在Job里使用Unsafe代码块,使用指针
有多个好处

  • 可以不需要拷贝数组就把主线程的数据塞进子线程,对数据量大,需要频繁调用的可以考虑
  • 可以包装一些托管内存,比如我这里就包装了一个二维数组,每个containsTriangleIndex其实是一个int的NativeArray

如果struct里有NativeArray,这个struct放进NativeArray的时候会过不了安全检查。
我这里是在主线程维护好了这些动态的数组,然后再传进了这个结构的。
在unsafe代码块里,Native容器相关的API中有GetUnsafePtr可以获得指针。

SamplePointRayTriangleJob samplePointRayTriangleJob = new SamplePointRayTriangleJob();  
samplePointRayTriangleJob.meshTriangles = jobMeshTriangles;  
samplePointRayTriangleJob.randomDirs = jobRandomDirs;  
samplePointRayTriangleJob.useGrid = useGrid;  
samplePointRayTriangleJob.allStartPoints = startPoints;  
samplePointRayTriangleJob.allTriangleBoundsJobDatas = (TriangleBoundsJobData*)triangleBoundsJobDatas.GetUnsafePtr();

NativeDisableParallelForRestriction并行写入

打上这个标记后,多个Thread同时数组的同一个地方进行写入,unity不会阻拦,但是自己也要处理好逻辑问题。

举个例子:下面这篇文章里
https://blog.csdn.net/n5/article/details/123742777
在Parallel Job里面进行光栅化三角形时,多个三角形有可能并行访问depth buffer/frame buffer的相同地方。这在多线程编程中属于race conditions,Job system内部会检测出来,会直接报错。

IndexOutOfRangeException: Index 219108 is out of restricted IJobParallelFor range [4392…4392] in ReadWriteBuffer.
ReadWriteBuffers are restricted to only read & write the element at the job index. You can use double buffering strategies to avoid race conditions due to reading & writing in parallel to the same elements from a job.

NativeDisableContainerSafetyRestriction

使用这个Attribute可以在子线程分配一块内存,比如我这里每个子线程是创建了一个数组来接受光线三角形求交,一根光线击中了多少个点,一个子任务会执行许多次光线遍历Mesh

这个主要是博主在Github上学习Unity官方的MeshApiExample项目看到的案例,有点像StaticBatch
可以查看这个链接:
把整个场景的Mesh合并

DeallocateOnJobCompletion

容器在job结束之后自动释放
这个博主用的很少 基本都是主动释放
可能在用非并行Job的时候 接受外面的NativeArray后自己不想管释放之类的。
可以查看一个github上别人的案例看看:
案例

自定义Native容器

https://docs.unity3d.com/Manual/job-system-custom-nativecontainer-example.html

思考

JobSystem与ComputeShader相比 优势

JobSystem主要是利用CPU来降低计算负载,在数量级上远远比不上GPU,在前面的性能测试中数据到万以上就相当吃力了。
ComputeShader是利用GPU来降低计算负载,,现在GPU Driven的技术也逐渐越来越多。

思考这两个的取舍主要应该看业务逻辑的数据流向,如果我们的数据是从CPU发起的,那么在把数据从CPU拷贝到GPU也是肯定是不如在CPU内做拷贝要快的,
如果我们的计算的数据最后是给CPU做下步计算的,如果用GPU做计算就会出现CPU等GPU的回读问题,数据若停留在GPU,那么ComputeShader自然好。

另外就是考虑两个后端的硬件特性,CPU高主频,处理复杂的逻辑,大量的循环、分支判断上比GPU要有优势,数量级上则GPU更有优势。

最后也可以考虑一下易用性问题,如果用到了很多原本在CPU里的数学库,在JobSystem里都是可以直接用的,ComputeShader的话则需要自己实现一版,不过脚手架这种东西属于见仁见智,
只要自己方便就好。

2023.3.21
flyingziming

标签: none

添加新评论