Java线程池实现多任务并发执行

1️⃣ 创建一些任务来落地多任务并发执行

每一个数组里面的数据可以看成任务,或者是需要并发的业务接口,
数组与数组之间,可以看作为他们之间有血缘关系,简单来说就是:
taskJksj里面的10个任务执行完之后,才可以执行taskJxdx里面的4个任务,执行完
taskJxdx之后,才可以执行taskNbzz里面的2个任务

2️⃣ 创建线程池

要将taskJksj、taskJxdx、taskNbzz这几个数组中里面定义的任务通过线程池并发执行

3️⃣ ThreadPoolExecutor源码分析以及为什么不用newFixedThreadPool()和newCachedThreadPool()

1.首先为什么不用
newFixedThreadPool()和newCachedThreadPool()

点进去查看这两个方法的源码

2.为什么用ThreadPoolExecutor创建

通过上面两个例子就能看到,这俩方法很不靠谱,如果你不明白他的原理,看到项目上以前创建线程的代码就是这样的,你想都不想就copy过来,那后面绝对就是在给自己挖坑;
通过发现这俩方法,他们都是return new ThreadPoolExecutor(),所以真正的大佬其实是
ThreadPoolExecutor,他俩只是调用了
ThreadPoolExecutor而已。

  • 核心线程数:初始定义的线程数量,是绝对会开启的固定的线程数量
  • 最大线程数:当前线程池支持的最大线程数量,如果超过了这个数量那么肯定就报错了
  • 阻塞队列   :当前进来线程池的线程大于核心线程数且小于最大线程数,那么就把当前线程池的线程-核心线程数的线程放在阻塞队列里,让他等着
    假如核心线程数 2 个,最大线程数5个,阻塞队列长度  3,当前进来了4个线程,那么就将 4 - 2 = 2 个线程放在阻塞队列里面,让他先等待
  • 默认工厂  :当前进来线程池的线程大于最大线程数且小于(最大线程数+阻塞队列长度),那么就需要开放剩下的三个线程通道,让另外的3个线程通道进行工作,
    核心线程数 2 个,最大线程数5个,    阻塞队列长度3,当前进来了8个,可以看到进来了8个线程,已经满足了最大线程和阻塞队列长度之和了,简单理解就是
    现在进来的线程把目前这个线程池所有能利用的空间都占满了,只有 2个线程工作不够,需要把另外的3个(5 - 2)赶快放开让他们也工作,这个打开另外
    三个线程的这个工作就需要工厂来做,让工厂把这三个线程打开
  • 拒绝策略 :当进入线程池的线程过多,远远超过了最大线程数+阻塞队列,那么就需要拒绝这些即将要进入线程池的线程。
  • 等待时间:在等待时间段中,当线程池里面的线程都执行的差不多了,又回到了"
    进来线程池的线程大于核心线程数且小于最大线程数"时,就没有必要把5个线程通道全部打开,浪费资源,所以就把
    其    他的三个线程关掉,留2个核心的就行
  • 等待时间单位:单位,时分秒

4️⃣ 执行任务

为三个任务编写对应的执行多线程方法,写法都是一样的,重复copy即可,最后执行的效果就是

import java.util.concurrent.*;/*** @Author : YuanXin
* @create 2024/2/1 11:11
* @Description :
*/ public classMain {public static voidmain(String[] args) {

taskListImpl taskList
= newtaskListImpl();

String taskJksj
=taskList.poolExecutorJksj();

String taskJxdx
= null;if (taskJksj.equals("taskJksjSuccess")) {

taskJxdx
=taskList.poolExecutorJxdx();

}
if (taskJxdx.equals("taskJxdxSuccess")) {

taskList.poolExecutorNbzz();

}

}
}
classtaskListImpl {//创建一些任务 int[] taskJksj = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0};int[] taskJxdx = new int[]{15, 16, 17, 18};int[] taskNbzz = new int[]{101, 102};publicString poolExecutorJksj() {

ThreadPoolExecutor pool
= newThreadPoolExecutor(3,10,3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5),
Executors.defaultThreadFactory(),
newThreadPoolExecutor.AbortPolicy()
);
//ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); try{for (int i = 0; i < taskJksj.length; i++) {int num =i;


pool.execute(()
->{

taskJksjPool(num);

});

}
}
catch(Exception e) {throw newRuntimeException(e);
}
finally{
pool.shutdown();
}
return "taskJksjSuccess";

}
public void taskJksjPool(intnum) {

System.out.println(Thread.currentThread().getName()
+ " " +taskJksj[num]);

}
publicString poolExecutorJxdx() {

ThreadPoolExecutor pool
= new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5), Executors.defaultThreadFactory(), newThreadPoolExecutor.AbortPolicy());try{for (int i = 0; i < taskJxdx.length; i++) {int num =i;

pool.execute(()
->{

taskJxdxPool(num);

});

}
}
catch(Exception e) {throw newRuntimeException(e);
}
finally{
pool.shutdown();
}
return "taskJxdxSuccess";

}
public void taskJxdxPool(intnum) {

System.out.println(Thread.currentThread().getName()
+ " " +taskJxdx[num]);

}
publicString poolExecutorNbzz() {

ThreadPoolExecutor pool
= new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5), Executors.defaultThreadFactory(), newThreadPoolExecutor.AbortPolicy());try{for (int i = 0; i < taskNbzz.length; i++) {int num =i;

pool.execute(()
->{

taskNbzzPool(num);

});

}
}
catch(Exception e) {throw newRuntimeException(e);
}
finally{
pool.shutdown();
}
return "taskNbzzSuccess";

}
public void taskNbzzPool(intnum) {

System.out.println(Thread.currentThread().getName()
+ " " +taskNbzz[num]);

}

}

标签: none

添加新评论