2024年3月

Sentinel
是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量控制、熔断降级、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
在 Sentinel 里面,所有的资源都对应一个资源名称(
resourceName
),每次资源调用都会创建一个
Entry
对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用
SphU
API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
  • NodeSelectorSlot
    负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot
    则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot
    则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot
    则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;(
    流控规则
    )
  • AuthoritySlot
    则根据配置的黑白名单和调用来源信息,来做黑白名单控制;(
    授权规则
    )
  • DegradeSlot
    则通过统计信息以及预设的规则,来做熔断降级;(
    降级规则
  • SystemSlot
    则通过系统的状态,例如 load1 等,来控制总的入口流量;(
    系统规则
    )
下面是官网的原图:
  1. Sentinel 核心类解析


    1. ProcessorSlotChain


      1. Sentinel 的核心骨架,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。核心结构:
  1. Context


    1. Context 代表调用链路上下文,贯穿一次调用链路中的所有
      Entry
      。Context 名称即为调用链路入口名称。
    2. Context 维持的方式:
      enter
      • 每一次资源调用都会创建一个
        Entry

        Entry
        包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
      • CtEntry
        为普通的
        Entry
        ,在调用
        SphU.entry(xxx)
        的时候创建。特性:Linked entry within current context(内部维护着
        parent

        child
      • 需要注意的一点
        :CtEntry 构造函数中会做
        调用链的变换
        ,即将当前 Entry 接到传入 Context 的调用链路上(
        setUpEntryFor
        )。
      • 资源调用结束时需要
        entry.exit()
        。exit 操作会过一遍 slot chain exit,恢复调用栈,exit context 然后清空 entry 中的 context 防止重复调用。
  1. Node

Sentinel 里面的各种种类的统计节点:

  • EntranceNode

    入口节点
    ,特殊的链路节点,对应某个 Context 入口的所有调用数据。
  • DefaultNode

    链路节点
    ,用于统计调用链路上某个资源的数据,维持树状结构。
  • ClusterNode

    簇点
    ,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据
  • StatisticNode
    :最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构
例子:入现在有两个入口业务同时访问goods

业务1: controller中的资源@PostMapping("/order/query") 访问了service中的资源/goods
结果:

  • 不同的入口有不同的
    EntranceNode
  • 不同的链路有不同/goods资源都有单独的一个
    ClusterNode
    ,统计所有链路
  1. S
    PI
    扩展

Sentinel 提供多样化的 SPI 接口用于提供扩展的能力。开发者可以在用同一个
sentinel-core
的基础上自行扩展接口实现,从而可以方便地根据业务需求给 Sentinel 添加自定义的逻辑。目前 Sentinel 提供如下的扩展点:
  • 初始化过程扩展:提供
    InitFunc
    SPI接口,可以添加自定义的一些初始化逻辑,如动态规则源注册等。
  • Slot/Slot Chain 扩展:用于给 Sentinel 功能链添加自定义的功能并自由编排。
  • 指标统计扩展(StatisticSlot Callback):用于扩展 StatisticSlot 指标统计相关的逻辑。
  • Transport 扩展:提供 CommandHandler、CommandCenter 等接口,用于对心跳发送、监控 API Server 进行扩展。
  • 集群流控扩展:可以方便地定制 token client/server 自定义实现,可参考
    对应文档
  • 日志扩展:用于自定义 record log Logger,可用于对接 slf4j 等标准日志实现。
  1. Sentinel 工作原理


    1、
    @SentinelResource基于Aspect的
    AOP
    实现

    spring.factories加载
    SentinelAutoConfiguration.java
    -->SentinelResourceAspect.java
    -->@Aspect定义标记
    -->定义切入点@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    -->增强拦截@Around("sentinelResourceAnnotationPointcut()")
  • entryWithPriority()接口,entry统一会进入此接口,
    ThreadLocal保护线程之中,一个线程使用一个context,
-->SphU.entry()
-->Env.

sph

.entryWithType

()
-->CtSph#entryWithPriority()
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
    
    // ThreadLocal<Context> contextHolder,使用ThreadLocal线程保护获取上下文
Context context = ContextUtil.getContext();
    
    // 默认获取名称为sentinel_default_context的上下文
if (context == null) {
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }


ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

  
Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
return e;
}
  • 创建context上下文,ContextUtil.trueEnter()接口的实现

      • 里面使用到Lock锁和两次get()操作实现双重校验锁 DCL,保证原子性和性能问题

    protected static Context trueEnter(String name, String origin) {
    Context context = contextHolder.get();
        if (context == null) {
    Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            // Lock锁和两次get()操作实现双重校验锁 DCL,保证原子性和性能问题
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
    if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
    try {
                        LOCK.lock();
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
    if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);
                                // 里面使用到CopyOnWrite技术,首先将旧的contextNameNodeMap拷贝一份,然后更新拷贝的map,再用更新后的实例列表来覆盖旧的实例列表。防止高并发读脏数据。
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
    context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }
    
    return context;
    }

2、
接口基于AbstractSentinellnterceptor
拦截器
实现

  • 1.获取resourceName (controller方法的RegestMapping)
  • 2.获取contextName,默认是sentinel-springweb-context
  • 3.获取origin,基于自定义的RequestOriginParser
  • 4初始化Context ContextUtil.enter(contextilame, oring)

    4.1.创建EntranceNode(contextName)
    4.2创建Context,放入ThreadLocal
  • 5.标记资源,创建Entry,Entry e = SphU.entry(resourceName)
  • 执行ProcesserSlotChain
  1. Sentinel 的
    SlotChain

slot chain设计是一个槽卡调用链,其实可以分为两部分:
统计数据构建部分
(statistic)和
规则判断部分
(rule checking)。
统计数据构建部分:
NodeSelectorSlot

ClusterBuilderSlot

StatisticSlot
规则判断部分:
ParamFlowSlot

FlowSlot

AuthoritySlot

DegradeSlot

SystemSlot
调用顺序:
  1. NodeSelectorSlot

负责收集资源的路径,并将这些资源的调用路径,以树状结构存储
Node
起来,用于根据调用路径来限流降级,入下图收集node:
  1. ClusterBuilderSlot

则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  1. StatisticSlot

负责统计实时调用数据
,包括运行信息 (访问次数、线程数)、来源信息等
是实现限流的关键,其中基于
滑动时间窗口
算法维护了计数器,统计进入某个资源的请求次数
核心代码如下:
  1. ParamFlowSlot

热点规则:使用令牌通算法实现
  1. SystemSlot

系统规则
:则通过系统的状态,例如 load1 等,来控制总的入口流量;
  1. AuthoritySlot

授权规则
:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。
  1. FlowSlot

流控规则
:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
包括:
  • 三种流控模式:直接模式、关联模式、链路模式
  • 三种流控效果:快速失败、warm up、排队等待
限流算法:
滑动时间窗口算法
: 快速失败、warm up
漏桶算法
: 排队等待效果,(sentinel主要通过
  1. DegradeSlot

降级规则
:则通过统计信息以及预设的规则,来做熔断降级
实现原理
:通过StatisticSlot 获取的统计数据,通过三个状态OPEN、Close、Half-Open(中间状态)来判断是否降级熔断。
  1. 限流算法

限流
:对应用服务器的请求做限制,避免因过多请求而导致服务器过载甚至宕机。
限流算法常见的包括两种:
  • 计数器算法
    ,又包括窗口计数器算法、滑动窗口计数器算法
  • 令牌桶
    算法
    (Token Bucket)
  • 漏桶算法
    (Leaky Bucket)
sentinel在不同的场景下使用了以上三中不同的算法:
  1. 滑动窗口计数器算法

窗口计数器算法有固定窗口算法、滑动窗口计数算法

  • 固定窗口算法

    缺点:
    对于在两个窗口中间临界点的流量突刺,不能统计起来
  • 滑动窗口算法

    优点:每次根据当前时间获取时间
    窗口期
    ,可以统计流量突刺

    缺点:统计量大
sentinel使用了
滑动窗口算法,
把每个时间窗口期划分为多个
样本
,然后每个样本去统计流量数量,定义⼀个存储结构(bucket),⽤于存储计数的值(count),它与时间单元格⼦⼀⼀对应。数据结构可以采⽤数组或链表,Sentinel中采⽤的是定⻓的数组,⽐较有意思。
使用环形的数据存储,避免数据无限长
  1. 令牌桶
    算法

令牌桶算法说明:

  • 以固定的速率生成令牌,存入令牌桶中,如果令牌桶满了以后,多余令牌丢弃
  • 请求进入后,必须先尝试从桶中获取令牌,获取到令牌后才可以被处理
  • 如果令牌桶中没有令牌,则请求等待或丢弃
  1. 漏桶算法

漏桶算法说明

  • 将每个请求视作作”水滴”放入”漏桶”进行存储
  • ”漏桶”以固定速率向外"漏”出请求来执行,如果”漏桶”空了则停止”漏水”
sentinel实现
RateLimiterController.java
,然后
排队
的实现是通过:
Thread.

sleep(

waitTime

)

;

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
return true;
    }
// Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
return false;
    }

long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();

    if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
// Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
return false;
        } else {
long oldTime = latestPassedTime.addAndGet(costTime);
            try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
                    return false;
                }
// in race condition waitTime may <= 0
                if (waitTime > 0) {
Thread.sleep(waitTime);
                }
return true;
            } catch (InterruptedException e) {
            }
        }
    }
return false;
}
  1. 限流算法对比

微服务中的灰度发布(又称为金丝雀发布)是一种持续部署策略,它允许在正式环境的小部分用户群体上先部署新版本的应用程序或服务,而不是一次性对所有用户同时发布全新的版本。

这种方式有助于在生产环境中逐步验证新版本的稳定性和兼容性,同时最小化潜在风险,不影响大部分用户的正常使用。

1.灰度发布关键步骤

在 Spring Cloud 微服务架构中,实现灰度发布通常涉及到以下几个方面:

  1. 流量分割

    • 根据一定的策略(如用户 ID、请求头信息、IP 地址等)将流入的请求分配给不同版本的服务实例。
    • 使用 Spring Cloud Gateway、Zuul 等 API 网关组件实现路由规则,将部分请求定向至新版本的服务节点。
  2. 版本标识

    • 新版本服务启动时会注册带有特定版本标签的服务实例到服务注册中心(如 Eureka 或 Nacos)。
    • 请求在路由时可以根据版本标签选择相应版本的服务实例。
  3. 监控与评估

    • 在灰度发布的阶段,运维团队会对新版本服务的性能、稳定性以及用户体验等方面进行实时监控和评估。
    • 如果新版本表现良好,则可以逐渐扩大灰度范围直至全面替换旧版本。
  4. 故障恢复与回滚
    :若新版本出现问题,可通过快速撤销灰度发布策略,使所有流量恢复到旧版本服务,实现快速回滚,确保服务整体可用性。

通过 Spring Cloud 的扩展组件和自定义路由策略,开发人员可以轻松实现灰度发布功能,确保在微服务架构中安全、平滑地进行版本迭代升级。

2.实现思路

灰色发布的常见实现思路有以下几种:

  • 根据用户划分
    :根据用户标识或用户组进行划分,在整个用户群体中只选择一小部分用户获得新功能。
  • 根据地域划分
    :在不同地区或不同节点上进行划分,在其中的一小部分地区或节点进行新功能的发布。
  • 根据流量划分
    :根据流量的百分比或请求次数进行划分,只将一部分请求流量引导到新功能上。

而在生产环境中,比较常用的是根据用户标识来实现灰色发布,也就是说先让一小部分用户体验新功能,以发现新服务中可能存在的某种缺陷或不足。

3.底层实现

Spring Cloud 全链路灰色发布的关键实现思路如下图所示:
image.png
灰度发布的具体实现步骤如下:

  1. 前端程序在灰度测试的用户 Header 头中打上标签,例如在 Header 中添加“gray-tag: true”,其表示要进行灰常测试(访问灰度服务),而其他则为访问正式服务。
  2. 在负载均衡器 Spring Cloud LoadBalancer 中,拿到 Header 中的“gray-tag”进行判断,如果此标签不为空,并等于“true”的话,表示要访问灰度发布的服务,否则只访问正式的服务。
  3. 在网关 Spring Cloud Gateway 中,将 Header 标签“gray-tag: true”继续往下一个调用服务中传递。
  4. 在后续的调用服务中,需要实现以下两个关键功能:
    1. 在负载均衡器 Spring Cloud LoadBalancer 中,判断灰度发布标签,将请求分发到对应服务。
    2. 将灰度发布标签(如果存在),继续传递给下一个调用的服务。

经过第四步的反复传递之后,整个 Spring Cloud 全链路的灰度发布就完成了。

4.具体实现

4.1 版本标识

在灰度发布的执行流程中,有一个核心的问题,如果在 Spring Cloud LoadBalancer 进行服务调用时,区分正式服务和灰度服务呢?

这个问题的解决方案是:在灰度服务既注册中心的 MetaData(元数据)中标识自己为灰度服务即可,而元数据中没有标识(灰度服务)的则为正式服务,以 Nacos 为例,它的设置如下:

spring:
  application:
    name: gray-user-service
  cloud:
    nacos:
      discovery:
        username: nacos
        password: nacos
        server-addr: localhost:8848
        namespace: public
        register-enabled: true 
        metadata: { "gray-tag":"true" } # 标识自己为灰度服务

4.2 负载均衡调用灰度服务

Spring Cloud LoadBalancer 判断并调用灰度服务的关键实现代码如下:

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,
                                                          Request request) {
        // 实例为空
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + this.serviceId);
            }
            return new EmptyResponse();
        } else { // 服务不为空
            RequestDataContext dataContext = (RequestDataContext) request.getContext();
            HttpHeaders headers = dataContext.getClientRequest().getHeaders();
            // 判断是否为灰度发布(请求)
            if (headers.get(GlobalVariables.GRAY_KEY) != null &&
                    headers.get(GlobalVariables.GRAY_KEY).get(0).equals("true")) {
                // 灰度发布请求,得到新服务实例列表
                List<ServiceInstance> findInstances = instances.stream().
                        filter(s -> s.getMetadata().get(GlobalVariables.GRAY_KEY) != null &&
                                s.getMetadata().get(GlobalVariables.GRAY_KEY).equals("true"))
                        .toList();
                if (findInstances.size() > 0) { // 存在灰度发布节点
                    instances = findInstances;
                }
            } else { // 查询非灰度发布节点
                // 灰度发布测试请求,得到新服务实例列表
                instances = instances.stream().
                        filter(s -> s.getMetadata().get(GlobalVariables.GRAY_KEY) == null ||
                                !s.getMetadata().get(GlobalVariables.GRAY_KEY).equals("true"))
                        .toList();
            }
            // 随机正数值 ++i( & 去负数)
            int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
            // ++i 数值 % 实例数 取模 -> 轮询算法
            int index = pos % instances.size();
            // 得到服务实例方法
            ServiceInstance instance = (ServiceInstance) instances.get(index);
            return new DefaultResponse(instance);
        }
    }

以上代码为自定义负载均衡器,并使用了轮询算法。如果 Header 中有灰度标签,则只查询灰度服务的节点实例,否则则查询出所有的正式节点实例(以供服务调用或服务转发)。

4.3 网关传递灰度标识

要在网关 Spring Cloud Gateway 中传递灰度标识,只需要在 Gateway 的全局自定义过滤器中设置 Response 的 Header 即可,具体实现代码如下:

package com.example.gateway.config;

import com.loadbalancer.canary.common.GlobalVariables;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class LoadBalancerFilter implements GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 得到 request、response 对象
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        if (request.getQueryParams().getFirst(GlobalVariables.GRAY_KEY) != null) {
            // 设置金丝雀标识
            response.getHeaders().set(GlobalVariables.GRAY_KEY,
                    "true");
        }
        // 此步骤正常,执行下一步
        return chain.filter(exchange);
    }
}

4.4 微服务中传递灰度标签

HTTP 调用工具 Openfeign,我们需要在微服务间继续传递灰度标签,它的实现代码如下:

import feign.RequestInterceptor;
import feign.RequestTemplate;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;

@Component
public class FeignRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        // 从 RequestContextHolder 中获取 HttpServletRequest
        ServletRequestAttributes attributes = (ServletRequestAttributes)
                RequestContextHolder.getRequestAttributes();
        // 获取 RequestContextHolder 中的信息
        Map<String, String> headers = getHeaders(attributes.getRequest());
        // 放入 openfeign 的 RequestTemplate 中
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            template.header(entry.getKey(), entry.getValue());
        }
    }

    /**
     * 获取原请求头
     */
    private Map<String, String> getHeaders(HttpServletRequest request) {
        Map<String, String> map = new LinkedHashMap<>();
        Enumeration<String> enumeration = request.getHeaderNames();
        if (enumeration != null) {
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                String value = request.getHeader(key);
                map.put(key, value);
            }
        }
        return map;
    }
}

课后思考

说说负载均衡的底层实现?为什么在网关和 Openfeign 中要传递灰度标签?

本文已收录到我的面试小站
www.javacn.site
,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。

一、前言

虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。

例如,当有多个任务同时需要处理的时候,一个任务对应一个线程来执行,以此来提升任务的执行效率,模型图如下:

如果任务数非常少,这种模式倒问题不大,但是如果任务数非常的多,可能就会存在很大的问题:

  • 1.线程数不可控:随着任务数的增多,线程数也会增多,这些线程都没办法进行统一管理
  • 2.系统的开销很大:创建线程对系统来说开销很高,随着线程数也会增多,可能会出现系统资源紧张的问题,严重的情况系统可能直接死机

假如把很多任务让一组线程来执行,而不是一个任务对应一个新线程,这种通过接受任务并进行分发处理的就是
线程池

线程池内部维护了若干个线程,当没有任务的时候,这些线程都处于等待状态;当有新的任务进来时,就分配一个空闲线程执行;当所有线程都处于忙碌状态时,新任务要么放入队列中等待,要么增加一个新线程进行处理,要么直接拒绝。

很显然,这种通过线程池来执行多任务的思路,优势明显:

  • 1.资源更加可控:能有效的控制线程数,防止线程数过多,导致系统资源紧张
  • 2.资源消耗更低:因为线程可以复用,可以有效的降低创建和销毁线程的时间和资源
  • 3.执行效率更高:当新的任务进来时,可以不需要等待线程的创建立即执行

关于这一点,我们可以看一个简单的对比示例。

/**
 * 使用一个任务对应一个线程来执行
 * @param args
 */
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 一个任务对应一个线程,使用20000个线程执行任务
    for (int i = 0; i < 20000; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
            }
        }).start();
    }
    // 等待任务执行完毕
    while (true){
        if(list.size() >= 20000){
            break;
        }
    }
    System.out.println("一个任务对应一个线程,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
 * 使用线程池进行执行任务
 * @param args
 */
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 使用线程池进行执行任务,默认4个线程
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
    for (int i = 0; i < 20000; i++) {
    	// 提交任务
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
            }
        });
    }

    // 等待任务执行完毕
    while (true){
        if(list.size() >= 20000){
            break;
        }
    }
    System.out.println("使用线程池,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

两者执行耗时情况对比,如下:

一个任务对应一个线程,执行耗时:3073ms
---------------------------
使用线程池,执行耗时:578ms

从结果上可以看出,同样的任务数,采用线程池和不采用线程池,执行耗时差距非常明显,一个任务对应一个新的线程来执行,反而效率不如采用 4 个线程的线程池执行的快。

为什么会产生这种现象,下面我们就一起来聊聊线程池。

二、线程池概述

站在专业的角度讲,线程池其实是一种利用池化思想来实现线程管理的技术,它将线程的创建和任务的执行进行解耦,同时复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。通过合理的参数设置,可以实现更低的系统资源使用率、更高的任务并发执行效率。

在 Java 中,线程池最顶级的接口是
Executor
,名下的实现类关系图如下:

关键接口和实现类,相关的描述如下:

  • 1.
    Executor
    是最顶级的接口,它的作用是将任务的执行和线程的创建进行抽象解藕
  • 2.
    ExecutorService
    接口继承了
    Executor
    接口,在
    Executor
    的基础上,增加了一些关于管理线程池的一些方法,比如查看任务的状态、获取线程池的状态、终止线程池等标准方法
  • 3.
    ThreadPoolExecutor
    是一个线程池的核心实现类,完整的封装了线程池相关的操作方法,通过它可以创建线程池
  • 4.
    ScheduledThreadPoolExecutor
    是一个使用线程池的定时调度实现类,完整的封装了定时调度相关的操作方法,通过它可以创建周期性线程池

整个关系图中,其中
ThreadPoolExecutor
是线程池最核心的实现类,开发者可以使用它来创建线程池。

2.1、ThreadPoolExecutor 构造方法

ThreadPoolExecutor
类的完整构造方法一共有七个参数,理解这些参数的配置对使用好线程池至关重要,完整的构造方法核心源码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

各个参数的解读如下:

  • corePoolSize:核心线程数量,用于执行任务的核心线程数。
  • maximumPoolSize:最大线程数量,线程池中允许创建线程的最大数量
  • keepAliveTime:空闲线程存活的时间。只有当线程池中的线程数大于 corePoolSize 时,这个参数才会起作用
  • unit:空闲线程存活的时间单位
  • workQueue:任务队列,用于存储还没来得及执行的任务
  • threadFactory:线程工厂。用于执行任务时创建新线程的工厂
  • handler:拒绝策略,当线程池和和队列容量处于饱满,使用某种策略来拒绝任务提交

2.2、ThreadPoolExecutor 执行流程

创建完线程池之后就可以提交任务了,当有新的任务进来时,线程池就会工作并分配线程去执行任务。

ThreadPoolExecutor
的典型用法如下:

// 创建固定大小的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
// 提交任务
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
...

针对任务的提交方式,
ThreadPoolExecutor
还提供了两种方法。

  • execute()
    方法:一种无返回值的方法,也是最核心的任务提交方法
  • submit()
    方法:支持有返回值,通过
    FutureTask
    对象来获取任务执行完后的返回值,
    底层依然调用的是
    execute()
    方法

ThreadPoolExecutor
执行提交的任务流程虽然比较复杂,但是通过对源码的分析,大致的任务执行流程,可以用如下图来概括。

整个执行流程,大体步骤如下:

  • 1.初始化完线程池之后,默认情况下,线程数为0,当有任务到来后才会创建新线程去执行任务
  • 2.每次收到提交的任务之后,会先检查核心线程数是否已满,如果没有,就会继续创建新线程来执行任务,直到核心线程数达到设定值
  • 3.当核心线程数已满,会检查任务队列是否已满,如果没有,就会将任务存储到阻塞任务队列中
  • 4.当任务队列已满,会再次检查线程池中的线程数是否达到最大值,如果没有,就会创建新的线程来执行任务
  • 5.如果任务队列已满、线程数已达到最大值,此时线程池已经无法再接受新的任务,当收到任务之后,会执行拒绝策略

我们再回头来看上文提到的
ThreadPoolExecutor
构造方法中的七个参数,这些参数会直接影响线程的执行情况,各个参数的变化情况,可以用如下几点来概括:

  • 1.当线程池中的线程数小于 corePoolSize 时,新任务都不排队而是直接创新新线程来执行
  • 2.当线程池中的线程数大于等于 corePoolSize,workQueue 未满时,将新任务添加到 workQueue 中而不是创建新线程来执行
  • 3.当线程池中的线程数大于等于 corePoolSize,workQueue 已满,但是线程数小于 maximumPoolSize 时,此时会创建新的线程来处理被添加的任务
  • 4.当线程池中的线程数大于等于 maximumPoolSize,并且 workQueue 已满,新任务会被拒绝,使用 handler 执行被拒绝的任务

ThreadPoolExecutor
执行任务的部分核心源码如下!

2.2.1、execute 提交任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
	// 工作线程数量 < corePoolSize,直接创建线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
	// 工作线程数量 >= corePoolSize,将任务添加至阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
		// 往阻塞队列中添加任务的时候,如果线程池非运行状态,将任务remove,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 阻塞队列已满,尝试添加新的线程去执行,如果工作线程数量 >= maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
2.2.2、addWorker 创建线程加入线程池
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

		// 线程池状态处于非 RUNNING 状态,添加worker失败
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
		// 判断线程池中线程数量大于等于该线程池允许的最大线程数量,如果大于则worker失败,反之cas更新线程池中的线程数
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
		// 创建工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                	// 如果线程池处于 RUNNING 状态并且线程已经启动,则抛出线程异常启动
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
					// 将线程加入已创建的工作线程集合,更新用于追踪线程池中线程数量 largestPoolSize 字段
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
				// 启动线程执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
2.2.3、runWorker 执行任务
final void runWorker(Worker w) {
	// 获取执行任务线程
    Thread wt = Thread.currentThread();
    // 获取执行任务
    Runnable task = w.firstTask;
	// 将worker中的任务置空
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    	// 从当前工作线程种获取任务,或者循环从阻塞任务队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
			// 双重检查线程池是否正在停止,如果线程池停止,并且当前线程能够中断,则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
				// 前置执行任务钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
					// 执行当前任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
					// 后置执行任务钩子函数
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
		// 回收线程
        processWorkerExit(w, completedAbruptly);
    }
}
2.2.4、reject 执行拒绝策略
final void reject(Runnable command) {
	// 执行拒绝策略
    handler.rejectedExecution(command, this);
}

当线程池中的线程数大于等于 maximumPoolSize,并且 workQueue 已满,新任务会被拒绝,使用
RejectedExecutionHandler
接口的
rejectedExecution()
方法来处理被拒绝的任务。

线程池提供了四种拒绝策略实现类来拒绝任务,具体如下:

描述
AbortPolicy 直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略
DiscardPolicy 什么也不做,直接丢弃任务
DiscardOldestPolicy 将阻塞队列中的任务移除出来,然后执行当前任务
CallerRunsPolicy 尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了

2.3、ThreadPoolExecutor 线程池状态

我们知道 Java 种的线程一共 6 种状态,其实线程池也有状态。

因为线程池也是异步执行的,有的任务正在执行,有的任务存储在任务队列中,有的线程处于工作状态,有的线程处于空闲状态等待回收,为了更加精细化的管理线程池,线程池也设计了 5 中状态,部分核心源码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

	// 线程池线程数的bit数
	private static final int COUNT_BITS = Integer.SIZE - 3;
	
	// 线程池状态
	private static final int RUNNING    = -1 << COUNT_BITS;
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	private static final int STOP       =  1 << COUNT_BITS;
	private static final int TIDYING    =  2 << COUNT_BITS;
	private static final int TERMINATED =  3 << COUNT_BITS;
}

其中的状态流程,可以用如下图来描述!

这几个状态的转化关系,可以用如下几个步骤来概括:

  • 1.线程池创建完之后,默认就进入了可执行状态
    RUNNING
    ,此时线程数为 0,当有任务进来时,再创建新线程来执行,可以看成是一个慢启动的过程
  • 2.当线程池处于运行状态时,可以通过
    shutdown()
    或者
    shutdownNow()
    方法来改变运行状态。
    shutdown()
    是一个平稳的关闭操作,线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务,这时候线程池处于 SHUTDOWN 状态;
    shutdownNow()
    是一个立即关闭的操作,线程池立刻停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务,这时候线程池处于 STOP 状态
  • 3.当任务队列和线程池均为空的时候,SHUTDOWN 或者 STOP 状态,就会进入 TIDYING 状态,等待被终止
  • 4.当
    terminated()
    方法被调用完成之后,线程池会从 TIDYING 状态进入 TERMINATED 状态,此时线程池就结束了

三、线程池应用

正如文章的开头所介绍的,使用线程池的方式,通常可以用如下几个步骤来概括:

// 1.创建固定大小为4的线程数、空闲线程的存活时间为15秒、阻塞任务队列的上限为1000的线程池完整示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

// 2.提交任务
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
...

// 3.使用完毕之后,可以手动关闭线程池
executor.shutdown();

正如上文所说,其中
execute()

submit()
方法都可以用来提交任务,稍有不同的是:
submit()
方法同时还支持获取任务执行完毕的返回结果。

针对线程池的使用,Java 还提供了
Executors
工具类,开发者可以通过此工具,快速创建不同类型的线程池。

下面我们一起来看下
Executors
为用户提供的几种创建线程池的方法。

3.1、newSingleThreadExecutor

newSingleThreadExecutor()
方法表示创建一个单线程的线程池,核心源码如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

从构造参数上可以很清晰的看到,线程池中的线程数为 1,不会被线程池自动回收,workQueue 选择的是无界的
LinkedBlockingQueue
阻塞队列,不管来多少任务存入阻塞队列中,前面一个任务执行完毕,再执行队列中的剩余任务。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建一个单线程线程池
    ExecutorService executor  = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
执行耗时:13ms

3.2、newFixedThreadPool

newFixedThreadPool()
方法表示创建一个固定大小线程数的线程池,核心源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

固定大小的线程池和单线程的线程池有异曲同工之处,无非是让线程池中能运行的线程数量支持手动指定。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建固定大小线程数为3的线程池
    ExecutorService executor  = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-2
thread name:pool-1-thread-1
执行耗时:10ms

3.3、newCachedThreadPool

newCachedThreadPool()
方法表示创建一个可缓存的无界线程池,核心源码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从构造参数上可以看出,线程池中的最大线程数为
Integer.MAX_VALUE
,也就是
Integer
的最大值,workQueue 选择的是
SynchronousQueue
阻塞队列,这个阻塞队列不像
LinkedBlockingQueue
,它没有容量,只负责做临时任务缓存,如果有任务进来立刻会被执行。

也就是说,只要添加进去了任务,线程就会立刻去执行,当任务超过线程池的线程数则创建新的线程去执行,线程数量的最大上线为
Integer.MAX_VALUE
,当线程池中的线程空闲时间超过 60s,则会自动回收该线程。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建可缓存的无界线程池
    ExecutorService executor  = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-1
thread name:pool-1-thread-2
thread name:pool-1-thread-3
thread name:pool-1-thread-4
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-4
thread name:pool-1-thread-4
thread name:pool-1-thread-4
执行耗时:13ms

3.4、newScheduledThreadPool

newScheduledThreadPool()
方法表示创建周期性的线程池,可以指定线程池中的核心线程数,支持定时及周期性任务的执行,核心源码如下:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

从构造参数上可以看出,线程池支持指定核心线程数,最大线程数为
Integer.MAX_VALUE
,workQueue 选择的是
DelayedWorkQueue
延迟阻塞队列,这个阻塞队列支持任务延迟消费,新加入的任务不会立刻被执行,只有时间到期之后才会被取出;当非核心线程处于空闲状态时,会立刻进行收回。

ScheduledExecutorService
支持三种类型的定时调度方法,分别如下:

  • schedule
    :支持指定多久执行一次任务
  • scheduleAtFixedRate
    :支持周期性间隔多久的执行任务
  • scheduleWithFixedDelay
    :同样也是指周期性的执行任务,不过它指的是上一个任务执行完之后,延迟多久执行下一个任务

下面我们一起来看看它们的应用方式。

3.4.1、schedule 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");
// 定时执行一次的任务,延迟1s后执行
executor.schedule(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  ", schedule");

    }
}, 1, TimeUnit.SECONDS);

输出结果:

2023-11-17 01:41:12 准备启动
2023-11-17 01:41:13 thread name:pool-1-thread-1, schedule
3.4.2、scheduleAtFixedRate 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");

// 周期性地执行任务,第一个任务延迟1s后执行,之后每隔2s周期性执行任务,需要等待上一次的任务执行完毕才执行下一个
executor.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " begin");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " end");
    }
}, 1, 2, TimeUnit.SECONDS);

输出结果:

2023-11-17 02:00:44 准备启动
2023-11-17 02:00:45 thread name:pool-1-thread-1 begin
2023-11-17 02:00:48 thread name:pool-1-thread-1 end
2023-11-17 02:00:48 thread name:pool-1-thread-1 begin
2023-11-17 02:00:51 thread name:pool-1-thread-1 end
2023-11-17 02:00:51 thread name:pool-1-thread-1 begin
2023-11-17 02:00:54 thread name:pool-1-thread-1 end
3.4.3、scheduleWithFixedDelay 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");
// 周期性地执行任务,第一个任务延迟1s后执行,之后上一个任务执行完毕之后,延迟2秒再执行下一个任务
executor.scheduleWithFixedDelay(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " begin");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " end");

    }
}, 1, 2, TimeUnit.SECONDS);

输出结果:

2023-11-17 01:53:26 准备启动
2023-11-17 01:53:27 thread name:pool-1-thread-1 begin
2023-11-17 01:53:30 thread name:pool-1-thread-1 end
2023-11-17 01:53:32 thread name:pool-1-thread-1 begin
2023-11-17 01:53:35 thread name:pool-1-thread-1 end
2023-11-17 01:53:37 thread name:pool-1-thread-1 begin
2023-11-17 01:53:40 thread name:pool-1-thread-1 end

3.5、工厂方法小结

从以上的介绍中,我们可以对这四种线程池的参数做一个汇总,内容如下表:

工厂方法 corePoolSize maximumPoolSize keepAliveTime workQueue
newSingleThreadExecutor 1 1 0 LinkedBlockingQueue
newFixedThreadPool nThreads nThreads 0 LinkedBlockingQueue
newCachedThreadPool 0 Integer.MAX_VALUE 60s SynchronousQueue
newScheduledThreadPool corePoolSize Integer.MAX_VALUE 0 DelayedWorkQueue

这四个线程池,主要的区别在于:corePoolSize、maximumPoolSize、keepAliveTime、workQueue 这四个参数,其中线程工厂为默认类
DefaultThreadFactory
,线程饱和的拒绝策略为默认类
AbortPolicy

四、小结

结合以上的分析,最后我们再来总结一下。

对于线程池的使用,不太建议采用
Executors
工具去创建,尽量通过
ThreadPoolExecutor
的构造方法来创建,原因在于:有利于规避资源耗尽的风险;同时建议开发者手动设定任务队列的上限,防止服务出现 OOM。

虽然
Executors
工具提供了四种创建线程池的方法,能帮助开发者省去繁琐的参数配置,但是
newSingleThreadExecutor

newFixedThreadPool
方法创建的线程池,任务队列上限为
Integer.MAX_VALUE
,这意味着可以无限提交任务,这在高并发的环境下,系统可能会出现 OOM,导致整个线程池不可用;其次
newCachedThreadPool
方法也存在同样的问题,无限的创建线程可能会给系统带来更多的资源消耗。

其次,创建线程池的时候应该尽量给线程定义一个具体的业务名字前缀,方便定位问题,不同类型的业务尽量使用不同的线程池来实现。

例如可以使用
guava
包,创建自定义的线程工厂。

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();

当然,你也可以自行实现一个线程工厂,需要继承
ThreadFactory
接口,案例如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + "-" + threadNum.incrementAndGet());
        return t;
    }
}

创建一个线程名称以
order
开头的线程工厂。

NamingThreadFactory threadFactory = new NamingThreadFactory(Executors.defaultThreadFactory(), "order");

最后,再来说说关于线程池中线程数,如何合理设定的问题?

  • 对于需要消耗 CPU 资源的密集型任务,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响
  • 对于需要消耗 I/O 资源的密集型任务,可以将线程数设置为 2N,原因在于:线程在处理 I/O 的时间段内不会占用 CPU 资源,这时就可以将 CPU 交出给其它线程使用,因此可以多配置一些线程数

那如何判断当前是 CPU 密集型任务还是 I/O 密集型任务呢?

最简单的方法就是:如果当前任务涉及到网络读取,文件读取等,这类都是 IO 密集型任务,除此之外,可以看成是 CPU 密集型任务。

本文篇幅比较长,难免有描述不对的地方,欢迎大家留言指出!

五、参考

  1. https://zhuanlan.zhihu.com/p/350067478

  2. https://blog.csdn.net/qq_40093255/article/details/116990431

  3. https://www.cnblogs.com/xrq730/p/4856453.html

本文介绍在
Visual Studio 2022
中配置、编译
C++
计算机视觉库
OpenCV
的方法。

1 OpenCV库配置

首先,我们进行
OpenCV
库的下载与安装。作为一个开源的库,我们直接在其
官方下载网站
中进行下载即可;如下图所示,我们首先选择需要下载的操作系统。

image

随后,即可在弹出的新界面中自动开始
OpenCV
库的下载。

下载完毕后,可以得到
OpenCV
库的
.exe
格式文件。

我们双击这一
.exe
格式文件,即可开始
OpenCV
库的
文件提取
过程,也就相当于是
安装
过程;其中,我们首先需要选择
OpenCV
库安装的路径。

随后,即可开始
OpenCV
库的文件提取。

OpenCV
库文件提取完毕后,会得到如下所示的文件夹。这里建议将这一文件夹放在纯英文路径下。

接下来,基于
Windows电脑环境变量(用户变量、系统变量)的修改
提到的方法,配置
OpenCV
库相关的环境路径。其中,需要在“
系统变量
”的“
Path
”中进行操作。

如下图所示,我们将
OpenCV
库中
...\build\x64\vc15\bin
路径放入“
系统变量
”的“
Path
”中,在我这里这一路径就是
C:\opencv\build\x64\vc15\bin

其中,需要注意的是,
OpenCV
库中一般会有两个
VC
版本对应的文件夹,例如我这里下载的
4.6.0
版本的
OpenCV
库,其就有
vc14

vc15
两个文件夹;具体选择哪一个文件夹中的
bin
文件夹,需要结合我们的
Visual Studio
软件版本来判断——一般的,只要
Visual Studio
软件版本是
2017
年及之后的版本(例如我这里就是
Visual Studio 2022
),那么就选择
vc15
这个文件夹;如果
Visual Studio
软件版本是
2015
年的,那么就选择
vc14
这个文件夹;如果
Visual Studio
软件版本是
2015
年之前更早的版本,那么最好就更换老版本的
OpenCV
库,从而找到适配的
VC
版本。

2 Visual Studio环境配置

接下来,我们基于
安装Visual Studio的详细流程
提到的方法,新建一个项目,其名称与路径大家可以自行设置。

随后,按照上述文章中的方法,新建一个
.cpp
格式的源文件。

接下来,按照
如何在Visual Studio新C++项目中调用之前配置过的库?
提到的方法,分别进行
OpenCV
库的
附加包含目录

附加库目录

附加依赖项
的配置。

首先,将
附加包含目录
配置为
OpenCV
库的
C:\opencv\build\include

C:\opencv\build\include\opencv2
等2个路径;如下图所示。

随后,将
附加库目录
配置为
OpenCV
库的
C:\opencv\build\x64\vc15\lib
这一路径;如下图所示。这里需要注意,具体使用哪一个
VC
版本对应的文件夹路径,还是和前文提到的一样,依据大家的
Visual Studio
版本来确定。

最后,我们找到
OpenCV
库的
C:\opencv\build\x64\vc15\lib
文件夹,可以看到其中有两个
.lib
格式的文件。

其中,如果我们需要配置
Debug
版本的
OpenCV
库,那么就选择名称最后面带有字母
d

.lib
格式文件(也就是上图中选中的那一个文件);如果需要配置
Release
版本的
OpenCV
库,那么就选择名称最后不带有字母
d

.lib
格式文件。随后,将这一文件的路径复制到
附加依赖项
中;如下图所示,我这里准备配置
Debug
版本的
OpenCV
库,因此就选择了名称最后面带有字母
d

.lib
格式文件。此外,这里还是一样的,具体使用哪一个
VC
版本对应的文件夹路径,依据大家的
Visual Studio
版本来确定即可。

3 代码测试

通过上述步骤,我们完成了
OpenCV
库的配置工作;此时可以通过一些简单的代码来测试
OpenCV
库配置是否正确。

例如,可以通过以下代码来测试
OpenCV
库的配置情况。其中,
pic_path
是一个指向图片文件的路径,大家可以基于自己电脑中的任何一张图片文件来修改这一路径。

#include <opencv2/opencv.hpp>

using namespace cv;

int main() {
	const char* pic_path = "E:/99_Other/公众号与博客/03_图库/Amos2.png";
	Mat pic = imread(pic_path, 1);
	imshow("My Picture", pic);
	waitKey();
	return 0;
}

随后,在
Visual Studio
中运行上述代码。

运行代码后,如果出现如下所示的情景,即我们通过
pic_path
指定的图片可以正常显示出来,说明
OpenCV
库的配置没有问题。

至此,大功告成。

Playwright 是一个由 Microsoft 开发的开源工具,用于自动化 Web 浏览器的测试和操作。它提供了一种跨浏览器、跨平台的自动化解决方案,可以在 Chromium、Firefox 和 WebKit(Safari)等多种浏览器上进行测试和操作。本篇随笔介绍Playwright的一些特点,以及能够完成的工作,并总结一些使用 Playwright 的最佳实践和技巧供参考。

1)使用 Playwright 的好处和优势

使用 Playwright 带来的好处和优势包括:

1. 跨浏览器支持

Playwright 支持在多种浏览器上运行测试,包括 Chromium、Firefox 和 WebKit(Safari),确保您的应用程序在不同浏览器中的兼容性。这使得您可以在单个测试套件中覆盖多种浏览器,减少了测试的重复工作。

2. 跨平台支持

Playwright 提供了适用于 Windows、Mac 和 Linux 的 API,使您可以在不同操作系统上运行相同的测试脚本。这使得开发团队可以更加灵活地协作和共享测试代码,无需担心平台差异带来的问题。

3. 多语言支持

Playwright 支持多种编程语言,包括 JavaScript、TypeScript、Python、C# 和 Java 等,适应不同开发团队的需求。这使得开发人员可以使用他们最熟悉的语言编写测试脚本,提高了生产力和代码质量。

4. 自动化测试

Playwright 可以模拟用户在浏览器中的操作,如点击、输入、滚动等,用于编写端到端的自动化测试脚本。它提供了丰富的 API 和工具,可以轻松地模拟复杂的用户行为,帮助您发现和修复应用程序中的问题。

5. 可靠性和稳定性

Playwright 以其稳定性和可靠性而闻名,可以确保测试脚本的稳定运行,减少了误报和虚假警报的发生。它提供了强大的错误处理和调试功能,可以帮助您快速定位和解决测试中的问题。

6. 社区和生态系统

Playwright 拥有一个活跃的社区和丰富的生态系统,提供了大量的文档、示例代码、教程和社区支持。您可以从社区中获取有价值的反馈和建议,加速学习和问题解决的过程。

Playwright 主要可以用于以下方面的工作:

  1. 自动化测试: Playwright 可以用于编写自动化测试脚本,对 Web 应用程序进行功能测试、回归测试、端到端测试等。

  2. 界面录制和回放: 尽管 Playwright 本身不提供界面录制和回放的功能,但你可以利用它的 API 记录用户的操作,并将其转换为自动化测试脚本。

  3. 性能测试: 使用 Playwright,你可以模拟不同网络条件和设备环境下的用户操作,评估 Web 应用程序的性能和稳定性。

  4. Web 数据采集: 你可以编写脚本来访问网站并提取所需的数据,比如抓取商品信息、新闻文章、股票数据等。

  5. 表单填写和提交: 你可以使用 Playwright 来模拟用户在网页中填写表单并提交,比如注册、登录、订阅等操作。

  6. 页面截图和视频录制: 你可以使用 Playwright 来捕获页面的截图或者录制页面操作的视频,用于测试报告、可视化展示等。

  7. 文件上传和下载: Playwright 可以模拟用户上传文件和下载文件的操作,用于测试文件上传功能或者下载资源。

  8. 多浏览器测试: Playwright 支持在多种浏览器(Chromium、Firefox、WebKit)上运行测试,确保 Web 应用程序在不同浏览器中的兼容性和一致性。

Playwright 是一个功能强大、灵活且易于使用的工具,可以帮助开发人员和测试人员提高效率,确保 Web 应用程序的质量和稳定性。你可以根据具体的需求和场景,使用 Playwright 来实现各种自动化处理和测试任务。

微软官方的在线介绍:
https://playwright.dev/dotnet/docs/intro

GitHub地址:
https://github.com/microsoft/playwright

.NET接口的GitHub地址:
https://github.com/microsoft/playwright-dotnet

下面介绍一些使用 Playwright 的最佳实践和技巧例子代码。

1、如何进行模拟表单的交互操作

使用 Playwright 进行模拟表单的交互操作非常简单,你可以使用 Playwright 提供的 API 来模拟用户在浏览器中的行为,比如填写表单、点击按钮等。以下是一些实际的示例代码,演示了如何使用 Playwright 来与表单进行交互操作:

下面代码例子,展示了如何使用 Playwright 在浏览器中填写表单、选择下拉框中的选项,并提交表单。

1)填写表单并提交

usingMicrosoft.Playwright;usingSystem;usingSystem.Threading.Tasks;classProgram
{
static async Task Main(string[] args)
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = false});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://example.com");//填写表单 await page.FillAsync("input[name='username']", "myusername");await page.FillAsync("input[name='password']", "mypassword");//提交表单 await page.ClickAsync("button[type='submit']");//等待页面跳转 awaitpage.WaitForNavigationAsync();

Console.WriteLine(
"Form submitted!");awaitbrowser.CloseAsync();
}
}

2)选择下拉框中的选项

usingMicrosoft.Playwright;usingSystem;usingSystem.Threading.Tasks;classProgram
{
static async Task Main(string[] args)
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = false});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://example.com");//选择下拉框中的选项 await page.SelectOptionAsync("select[name='country']", "Canada");//提交表单 await page.ClickAsync("button[type='submit']");//等待页面跳转 awaitpage.WaitForNavigationAsync();

Console.WriteLine(
"Form submitted!");awaitbrowser.CloseAsync();
}
}

3)如何进行文件上传操作

使用 Playwright 进行文件上传操作非常简单。

usingMicrosoft.Playwright;usingSystem;usingSystem.Threading.Tasks;classProgram
{
static async Task Main(string[] args)
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = false});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://www.example.com");//找到文件上传输入框 var fileInput = await page.QuerySelectorAsync("input[type='file']");//上传文件 string filePath = "path/to/your/file.txt"; //你要上传的文件路径 awaitfileInput.SetInputFilesAsync(filePath);

Console.WriteLine(
"File uploaded successfully!");awaitbrowser.CloseAsync();
}
}

实际处理的时候,你只需要替换
filePath
变量为你要上传的文件的路径即可。

需要注意的是,文件上传操作的实现可能会因网站的实现方式而有所不同。有些网站可能使用一些 JavaScript 或者 Ajax 技术来实现文件上传,可能需要针对具体的网站进行调整和测试。

另外,如果你在 Headless 模式下运行 Playwright,可能无法看到实际的文件选择对话框,但代码仍然会模拟文件上传操作。

2、如何进行Web 数据采集和截屏等

使用 Playwright 进行 Web 数据采集非常方便,你可以编写脚本来访问网站并提取所需的数据。以下是一个简单的示例代码,演示了如何使用 Playwright 来进行 Web 数据采集:

usingMicrosoft.Playwright;usingSystem;usingSystem.Threading.Tasks;classProgram
{
static async Task Main(string[] args)
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = true});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://www.example.com");//提取页面标题 string title = awaitpage.TitleAsync();
Console.WriteLine(
"Page title:" +title);//提取页面 URL string url =page.Url;
Console.WriteLine(
"Page URL:" +url);//提取页面文本内容 string pageContent = awaitpage.GetTextContentAsync();
Console.WriteLine(
"Page content:" +pageContent);//提取特定元素的文本内容 var elementText = await page.EvaluateAsync<string>("document.querySelector('h1').textContent");
Console.WriteLine(
"Header text:" +elementText);//提取页面中的链接 var links = await page.QuerySelectorAllAsync("a");foreach (var link inlinks)
{
var href = await link.GetAttributeAsync("href");
Console.WriteLine(
"Link:" +href);
}
awaitbrowser.CloseAsync();
}
}

在这个示例中,我们使用 Playwright 打开了一个网页(https://www.example.com),然后提取了页面的标题、URL、文本内容以及特定元素(h1 标签)的文本内容,最后提取了页面中的所有链接。你可以根据实际需求修改代码,提取你感兴趣的其他数据。

请注意,上述示例中使用的是 Chromium 浏览器,你也可以选择使用其他支持的浏览器,比如 Firefox 或者 WebKit(Safari)。另外,你也可以在 LaunchAsync 方法中设置 Headless 参数为 false,这样浏览器将会以可视化的方式打开,便于调试和观察执行过程。

如果我们需要批量获取某个表格的数据,可以结合正则表达式的处理,模拟爬虫批量获取符合条件的记录,存储到本地来使用。

static async Task Main(string[] args)
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = true});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://www.example.com");//等待表格加载完毕 await page.WaitForSelectorAsync("table");//获取表格内容 var tableHtml = await page.InnerHTMLAsync("table");//使用正则表达式提取表格数据 var regex = new Regex(@"<tr>(.*?)</tr>");var matches =regex.Matches(tableHtml);foreach (Match match inmatches)
{
//这里可以根据表格结构和需要自行解析数据 var rowHtml = match.Groups[1].Value;
Console.WriteLine(
"Row HTML:" +rowHtml);
}
awaitbrowser.CloseAsync();
}

也可以参考使用 Playwright 截屏的代码处理。

        private asyncTask OpenWebPage()
{
var playwright = awaitPlaywright.CreateAsync();var browser = await playwright.Chromium.LaunchAsync(new BrowserTypeLaunchOptions { Headless = false});var page = awaitbrowser.NewPageAsync();await page.GotoAsync("https://www.example.com");//等待页面加载完成 awaitpage.WaitForLoadStateAsync(LoadState.NetworkIdle);//截图保存 await page.ScreenshotAsync(new PageScreenshotOptions { Path = "screenshot.png"});awaitbrowser.CloseAsync();
}

在这个示例中,我们创建了一个 WPF 程序的窗口,其中包含一个按钮。当用户点击按钮时,WPF 程序将会使用 Playwright 打开一个 Chromium 浏览器,并访问示例网站(https://www.example.com),然后截取页面的屏幕截图。

你可以根据自己的需求和具体情况,进一步扩展这个示例,实现更复杂的自动化任务或者与其他功能的集成。

3、如何使用Playwright进行多浏览器测试

使用 Playwright 进行多浏览器测试非常简单,因为 Playwright 提供了跨浏览器的 API,你可以在不同的浏览器上运行相同的测试脚本。

以下是一个简单的示例,演示了如何在 Chromium、Firefox 和 WebKit 浏览器上运行同一个测试脚本:

usingSystem;usingSystem.Threading.Tasks;usingMicrosoft.Playwright;classProgram
{
static async Task Main(string[] args)
{
//创建 Playwright 实例 var playwright = awaitPlaywright.CreateAsync();//在 Chromium 浏览器上运行测试 awaitRunTests(playwright.Chromium);//在 Firefox 浏览器上运行测试 awaitRunTests(playwright.Firefox);//在 WebKit 浏览器上运行测试 awaitRunTests(playwright.Webkit);
}
static asyncTask RunTests(IBrowserType browserType)
{
//启动浏览器 var browser = await browserType.LaunchAsync(new BrowserTypeLaunchOptions { Headless = true});//创建页面 var page = awaitbrowser.NewPageAsync();//在页面上执行测试 await page.GotoAsync("https://www.example.com");//其他测试步骤...//关闭浏览器 awaitbrowser.CloseAsync();
}
}

在这个示例中,我们首先创建了 Playwright 实例,然后分别使用
playwright.Chromium

playwright.Firefox

playwright.Webkit
获取 Chromium、Firefox 和 WebKit 浏览器的 API。接着,我们定义了一个
RunTests
方法,该方法接受一个
IBrowserType
参数,根据传入的浏览器类型启动浏览器,并在页面上执行测试。最后,我们在
Main
方法中分别调用
RunTests
方法来在不同的浏览器上运行测试。

这样,你就可以使用 Playwright 轻松地在多个浏览器上运行相同的测试脚本,确保你的 Web 应用程序在不同浏览器中的兼容性和一致性。