2024年3月

ASP.NET Core MVC的“模块化”设计使我们可以构成应用的基本单元Controller定义在任意的模块(程序集)中,并在运行时动态加载和卸载。这种为“飞行中的飞机加油”的方案是如何实现的呢?该系列的两篇文章将关注于这个主题,本篇着重介绍“模块化”的总体设计,下篇我们将演示将介绍“分散定义Controller”的N种实现方案。

一、ApplicationPart & AssemblyPart
二、ApplicationPartFactory & DefaultApplicationPartFactory
三、IApplicationFeatureProvider & IApplicationFeatureProvider<TFeature>
四、ControllerFeatureProvider
五、ApplicationPartManager
六、设计总览
七、有效Controller类型的提取

一、ApplicationPart & AssemblyPart

MVC构建了一个抽象的模型来描述应用的组成。原则上来说,我们可以根据不同维度来描述当前的MVC应用由哪些部分构成,任何维度针下针对应用组成部分的描述都体现为一个
ApplicationPart
对象。因为没有限制对应用进行分解的维度,所以“应用组成部分”也是一个抽象的概念,它具有怎样的描述也是不确定的。也正是因为如此,对应的ApplicationPart类型也是一个抽象类型,我们只要任何一个ApplicationPart对象具有一个通过Name属性表示的名称就可以。

public abstract class ApplicationPart
{
    public abstract string Name { get; }
}

对于任何一个.NET Core应用来说,程序集永远是基本的部署单元,所以一个应用从部署的角度来看就是一组程序集。如果采用这种应用分解方式,我们可以将一个程序集视为应用一个组成部分,并可以通过如下这个
AssemblyPart
类型来表示。

public class AssemblyPart : ApplicationPart, IApplicationPartTypeProvider
{
    public Assembly 			Assembly { get; }
    public IEnumerable<TypeInfo> 	Types => Assembly.DefinedTypes;
    public override string 		Name => Assembly.GetName().Name;

    public AssemblyPart(Assembly assembly) => Assembly = assembly;
}

如上面的代码片段所示,一个AssemblyPart对象是对一个描述程序集的Assembly对象的封装,其Name属性直接返回程序集的名称。AssemblyPart类型还是实现了IApplicationPartTypeProvider接口,如下面的代码片段所示,该接口通过提供的Types属性提供当前定义在当前ApplicationPart范围内容的所有类型。AssemblyPart类型的Types属性会返回指定程序集中定义的所有类型。

public interface IApplicationPartTypeProvider
{
    IEnumerable<TypeInfo> Types { get; }
}

二、ApplicationPartFactory & DefaultApplicationPartFactory

如下所示的抽象类
ApplicationPartFactory
表示创建ApplicationPart对象的工厂。如代码片段所示,该接口定义了唯一的GetApplicationParts方法从指定的程序集中解析出表示应用组成部分的一组ApplicationPart对象。

public abstract class ApplicationPartFactory
{
    public abstract IEnumerable<ApplicationPart> GetApplicationParts(Assembly assembly);
}

如下所示的
DefaultApplicationPartFactory
是ApplicationPartFactory最常用的派生类。如代码片段所示,DefaultApplicationPartFactory类型实现的GetDefaultApplicationParts方法返回的ApplicationPart集合中只包含根据指定程序集创建的AssemblyPart对象。

public class DefaultApplicationPartFactory : ApplicationPartFactory
{
    public static DefaultApplicationPartFactory Instance { get; } = new DefaultApplicationPartFactory();

    public override IEnumerable<ApplicationPart> GetApplicationParts(Assembly assembly) => GetDefaultApplicationParts(assembly);

    public static IEnumerable<ApplicationPart> GetDefaultApplicationParts(Assembly assembly)
    {
        yield return new AssemblyPart(assembly);
    }
}

值得一提的是,ApplicationPartFactory类型还定义了如上这个名为GetApplicationPartFactory的静态方法,它会返回指定程序集对应的ApplicationPartFactory对象。这个方法涉及到如下这个
ProvideApplicationPartFactoryAttribute
特性,我们可以利用这个特性注册一个ApplicationPartFactory类型。GetApplicationPartFactory方法首先会从指定的程序集中提取这样一个特性,如果该特性存在,该方法会根据其GetFactoryType方法返回的类型创建返回的ApplicationPartFactory对象,否则它最终返回的就是DefaultApplicationPartFactory类型的静态属性Instance返回的DefaultApplicationPartFactory对象。

public abstract class ApplicationPartFactory
{
    public static ApplicationPartFactory GetApplicationPartFactory(Assembly assembly)
    {
        var attribute = CustomAttributeExtensions.GetCustomAttribute<ProvideApplicationPartFactoryAttribute>(assembly);
        return attribute == null
            ? DefaultApplicationPartFactory.Instance
            : (ApplicationPartFactory)Activator.CreateInstance(attribute.GetFactoryType());
    }
}

三、IApplicationFeatureProvider & IApplicationFeatureProvider<TFeature>

了解当前应用由哪些部分组成不是我们的目的,我们最终的意图是从构成应用的所有组成部分中搜集我们想要的信息,比如整个应用范围的所有有效Controller类型。我们将这种需要在应用全局范围内收集的信息抽象为“
特性(Feature)
”,那么我们最终的目的就变成了:在应用全局范围内构建某个特性。如下这个没有任何成员定义的标记接口
IApplicationFeatureProvider
代表特性的构建者。

public interface IApplicationFeatureProvider
{}

我们一般将某种特性定义成一个对应的类型,所以有了如下这个
IApplicationFeatureProvider<TFeature>
类型,泛型参数TFeature代表需要构建的特性类型。如代码片段所示,该接口定义了唯一的PopulateFeature方法来“完善”预先创建的特性对象(feature参数),该方法作为输入的第一个参数(parts)表示应用所有组成部分的ApplicationPart对象集合。

public interface IApplicationFeatureProvider<TFeature> : IApplicationFeatureProvider
{
    void PopulateFeature(IEnumerable<ApplicationPart> parts, TFeature feature);
}

四、ControllerFeatureProvider

ControllerFeatureProvider
类型实现了IApplicationFeatureProvider<
ControllerFeature
>接口,也正是它帮助我们解析出应用范围内所有有效的Controller类型。作为特性类型的ControllerFeature具有如下的定义,从所有应用组成部分收集的Controller类型就被存放在Controllers属性返回的集合中。

public class ControllerFeature
{
    public IList<TypeInfo> Controllers { get; }
}

在正式介绍ControllerFeatureProvider针对有效Controller类型的解析逻辑之前,我们得先知道一个有效的Controller类型具有怎样的特性。“
约定优于配置
”是MVC框架的主要涉及原则,名称具有“
Controller
”后缀(不区分大小写)的类型会自动成为候选的Controller类型。如果某个类型的名称没有采用“Controller”后缀,倘若类型上面标注了
ControllerAttribute
特性,它依然是候选的Controller类型。用来定义Web API的ApiControllerAttribute是ControllerAttribute的派生类。

[AttributeUsage((AttributeTargets) AttributeTargets.Class,  AllowMultiple=false, Inherited=true)]
public class ControllerAttribute : Attribute
{}

[AttributeUsage((AttributeTargets) (AttributeTargets.Class | AttributeTargets.Assembly), AllowMultiple=false, Inherited=true)]
public class ApiControllerAttribute : ControllerAttribute, IApiBehaviorMetadata, IFilterMetadata
{}

除了满足上面介绍的命名约定或者特性标注要求外,一个有效的Controller类型必须是一个
公共

非抽象
的、
非泛型
实例类型
,所以非公有类型、静态类型、泛型类型和抽象类型均为无效的Controller类型。如果一个类型上标注了
NonControllerAttribute
特性,它自然也不是有效的Controller类型。由于NonControllerAttribute特性支持继承(Inherited=true),对于某个标注了该特性的类型来说,所有派生于它的类型都不是有效的Controller类型。

[AttributeUsage((AttributeTargets) AttributeTargets.Class, AllowMultiple=false, Inherited=true)]
public sealed class NonControllerAttribute : Attribute
{}

如下所示的是ControllerFeatureProvider类型的完整定义,上述的针对有效Controller类型的判断就是实现在IsController方法中。在实现的PopulateFeature方法中,它从提供的ApplicationPart对象中提取出对应类型同时实现了IApplicationPartTypeProvider接口的提取出来(AssemblyPart就实现了这个接口),然后从它们提供的类型中按照IsController方法提供的规则筛选出有效的Controller类型,并添加到ControllerFeature对象的Controllers属性返回的列表中。

public class ControllerFeatureProvider : IApplicationFeatureProvider<ControllerFeature>
{
    public void PopulateFeature(IEnumerable<ApplicationPart> parts, ControllerFeature feature)
    {
        foreach (var part in parts.OfType<IApplicationPartTypeProvider>())
        {
            foreach (var type in part.Types)
            {
                if (IsController(type) && !feature.Controllers.Contains(type))
                {
                    feature.Controllers.Add(type);
                }
            }
        }
    }

    protected virtual bool IsController(TypeInfo typeInfo)
    {
        if (!typeInfo.IsClass)
        {
            return false;
        }
        if (typeInfo.IsAbstract)
        {
            return false;
        }
        if (!typeInfo.IsPublic)
        {
            return false;
        }
        if (typeInfo.ContainsGenericParameters)
        {
            return false;
        }
        if (typeInfo.IsDefined(typeof(NonControllerAttribute)))
        {
            return false;
        }

        if (!typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase) && !typeInfo.IsDefined(typeof(ControllerAttribute)))
        {
            return false;
        }

        return true;
    }
}

五、ApplicationPartManager

在基于应用所有组成部分基础上针对某种特性的构建是通过ApplicationPartManager对象驱动实现的,我们很有必要了解该类型的完整定义。我们可以将表示应用组成部分的ApplicationPart对象添加到ApplicationParts属性表示的列表中,而FeatureProviders属性表示的列表则用于存储注册的IApplicationFeatureProvider对象。用于构建特性对象的PopulateFeature<TFeature>方法会实现了IApplicationFeatureProvider<TFeature>接口的IApplicationFeatureProvider提取出来,并调用其PopulateFeature方法完善指定的TFeature对象。

public class ApplicationPartManager
{
    public IList<IApplicationFeatureProvider> 	FeatureProviders { get; } = new List<IApplicationFeatureProvider>();
    public IList<ApplicationPart> ApplicationParts { get; } = new List<ApplicationPart>();

    public void PopulateFeature<TFeature>(TFeature feature)
    {
        foreach (var provider in FeatureProviders.OfType<IApplicationFeatureProvider<TFeature>>())
        {
            provider.PopulateFeature(ApplicationParts, feature);
        }
    }

    internal void PopulateDefaultParts(string entryAssemblyName)
    {
        var assemblies = GetApplicationPartAssemblies(entryAssemblyName);
        var seenAssemblies = new HashSet<Assembly>();
        foreach (var assembly in assemblies)
        {
            if (!seenAssemblies.Add(assembly))
            {
                continue;
            }
            var partFactory = ApplicationPartFactory.GetApplicationPartFactory(assembly);
            foreach (var applicationPart in partFactory.GetApplicationParts(assembly))
            {
                ApplicationParts.Add(applicationPart);
            }
        }
    }

    private static IEnumerable<Assembly> GetApplicationPartAssemblies(string entryAssemblyName)
    {
        var entryAssembly = Assembly.Load(new AssemblyName(entryAssemblyName));
        var assembliesFromAttributes = entryAssembly
            .GetCustomAttributes<ApplicationPartAttribute>()
            .Select(name => Assembly.Load(name.AssemblyName))
            .OrderBy(assembly => assembly.FullName, StringComparer.Ordinal)
            .SelectMany(GetAsemblyClosure);
        return GetAsemblyClosure(entryAssembly).Concat(assembliesFromAttributes);
    }

    private static IEnumerable<Assembly> GetAsemblyClosure(Assembly assembly)
    {
        yield return assembly;
        var relatedAssemblies = RelatedAssemblyAttribute
            .GetRelatedAssemblies(assembly, throwOnError: false)
            .OrderBy(assembly => assembly.FullName, StringComparer.Ordinal);
        foreach (var relatedAssembly in relatedAssemblies)
        {
            yield return relatedAssembly;
        }
    }
}

定义在ApplicationPartManager类型中的内部方法PopulateDefaultParts同样重要,该方法会根据指定的
入口程序集
名称来构建组成应用的所有ApplicationPart对象。PopulateDefaultParts方法构建的ApplicationPart对象类型都是AssemblyPart,所以如何得到组成当前应用的程序集成了该方法的核心逻辑,这一逻辑实现在GetApplicationPartAssemblies方法中。

如上面的代码片段所示,GetApplicationPartAssemblies方法返回的程序集除了包含指定的入口程序集之外,还包括通过标注在入口程序集上的
ApplicationPartAttribute
特性指定的程序集。除此之外,如果前面这些程序集通过标注如下这个
RelatedAssemblyAttribute
特性指定了关联程序集,这些程序集同样会包含在返回的程序集列表中。

[AttributeUsage((AttributeTargets) AttributeTargets.Assembly, AllowMultiple=true)]
public sealed class RelatedAssemblyAttribute : Attribute
{
    public string AssemblyFileName { get; }
    public RelatedAssemblyAttribute(string assemblyFileName);
    public static IReadOnlyList<Assembly> GetRelatedAssemblies(Assembly assembly, bool throwOnError);
}

从PopulateDefaultParts方法的定义可以看出,我们可以在程序集上标注ApplicationPartAttribute和RelatedAssemblyAttribute特性的方式将非入口程序集作为应用ApplicationPart。这里需要着重强调的是:
ApplicationPartAttribute特性只能标注到入口程序集中
,而RelatedAssemblyAttribute特性只能标注到入口程序集以及ApplicationPartAttribute特性指向的程序集上,该特性
不具有可传递性
。以图1为例,我们在入口程序集A上标注了一个指向程序集B的ApplicationPartAttribute特性,同时在程序集B和C上标注了一个分别指向程序集C和D的RelatedAssemblyAttribute特性,那么作为应用ApplicationPart的程序集只包含A、B和C。

image

图1RelatedAssemblyAttribute不具有可传递性

六、设计总览

综上所述,一个应用可以分解成一组代表应用组成部分的ApplicationPart对象,派生的AssemblyPart类型体现了针对程序集的应用分解维度,它实现了IApplicationPartTypeProvider接口并将程序集中定义的类型输出到实现的Types属性中。作为创建ApplicationPart对象的工厂,抽象类ApplicationPartFactory旨在提供由指定程序集承载的所有ApplicationPart对象,派生于该抽象类的DefaultApplicationPartFactory类型最终创建的是根据指定程序集创建的AssemblyPart对象。

image

图2 ApplicationPartManager及其相关类型

我们可以利用ApplicationPartManager对象针对组成当前应用的ApplicationPart对象上构建某种类型的特性。具体的特性构建通过注册的一个或者多个IApplicationFeatureProvider对象完成,针对具体特类型的IApplicationFeatureProvider<TFeature>接口派生于该接口。针对Controller类型的提取实现在ControllerFeatureProvider类型中,它实现了IApplicationFeatureProvider<ControllerFeature>接口,提取出来的Controller类型就封装在ControllerFeature对象中。这里提及的接口、类型以及它们之间的关系体现在如图2所示的UML中。

七、有效Controller类型的提取

前面的内容告诉我们,利用ApplicationPartManager对象并借助注册的ControllerFeatureProvider可以帮助我们成功解析出当前应用范围内的所有Controller类型。那么MVC框架用来解析有效Controller类型的是怎样一个ApplicationPartManager对象呢?

ApplicationPartManager会作为MVC框架的核心服务被注册到依赖注入框架中。如下面的代码片段所示,当AddMvcCore扩展方法被执行的时候,它会重用已经注册的ApplicationPartManager实例。如果这样的服务实例不曾注册过,该方法会创建一个ApplicationPartManager对象。AddMvcCore方法接下来会提取出表示当前承载上下文的IWebHostEnvironment对象,并将其ApplicationName属性作为入口程序集调用ApplicationPartManager对象的内部方法PopulateDefaultParts构建出组成当前应用的所有ApplicationPart(AssemblyPart)。

public static class MvcCoreServiceCollectionExtensions
{
    public static IMvcCoreBuilder AddMvcCore(this IServiceCollection services)
    {
        …
        var manager = GetServiceFromCollection<ApplicationPartManager>(services);
        if (manager == null)
        {
            manager = new ApplicationPartManager();
            IWebHostEnvironment environment = GetServiceFromCollection<IWebHostEnvironment>(services);
            var applicationName = environment?.ApplicationName;
            if (!string.IsNullOrEmpty(applicationName))
            {
                manager.PopulateDefaultParts(applicationName);
            }
        }
        if (!manager.FeatureProviders.OfType<ControllerFeatureProvider>().Any())
        {
            manager.FeatureProviders.Add(new ControllerFeatureProvider());
        }
        services.TryAddSingleton(manager);
        return new MvcCoreBuilder(services, applicationPartManager);
    }
    private static T GetServiceFromCollection<T>(IServiceCollection services)
    {
        return (T)services.LastOrDefault(d => d.ServiceType == typeof(T))?.ImplementationInstance;
    }
}

接下来用于解析Controller类型的ControllerFeatureProvider对象会被创建出来并注册到ApplicationPartManager对象上。这个ApplicationPartManager对象将作为单例服务被注册到依赖注入框架中。面向Controller的MVC编程模型利用
ControllerActionDescriptorProvider
对象来提供描述Action元数据的ActionDescriptor对象。如下面的代码片段所示,该类型的构造函数中注入了两个对象,其中ApplicationPartManager对象用来提取当前应用所有有效的Controller类型,ApplicationModelFactory对象则在此基础上进一步构建出MVC应用模型(Application Model),Action元数据就是根据此应用模型创建出来的。具体来说,针对Controller类型的解析实现在私有方法GetControllerTypes中。

internal class ControllerActionDescriptorProvider : IActionDescriptorProvider
{
    public int Order => -1000;
    private readonly ApplicationPartManager 	_partManager;
    private readonly ApplicationModelFactory	_applicationModelFactory;

    public ControllerActionDescriptorProvider(ApplicationPartManager partManager, ApplicationModelFactory applicationModelFactory)
    {
        _partManager 			= partManager;
        _applicationModelFactory 	= applicationModelFactory;
    }

    public void OnProvidersExecuted(ActionDescriptorProviderContext context);
    public void OnProvidersExecuting(ActionDescriptorProviderContext context);

    private IEnumerable<TypeInfo> GetControllerTypes()
    {
        var feature = new ControllerFeature();
        _partManager.PopulateFeature<ControllerFeature>(feature);
        return (IEnumerable<TypeInfo>) feature.Controllers;
    }
}

相关文章

数据库系列:MySQL慢查询分析和性能优化
数据库系列:MySQL索引优化总结(综合版)
数据库系列:高并发下的数据字段变更
数据库系列:覆盖索引和规避回表
数据库系列:数据库高可用及无损扩容
数据库系列:使用高区分度索引列提升性能
数据库系列:前缀索引和索引长度的取舍
数据库系列:MySQL引擎MyISAM和InnoDB的比较
数据库系列:InnoDB下实现高并发控制
数据库系列:事务的4种隔离级别
数据库系列:RR和RC下,快照读的区别
数据库系列:MySQL InnoDB锁机制介绍
数据库系列:MySQL不同操作分别用什么锁?
数据库系列:业内主流MySQL数据中间件梳理

1 背景

互联网大厂的业务场景,业务流量的规模都可以达到千万甚至亿级别,这时候简单的数据层连接和调用无法承接业务规模的需求。
需要数据库中间件来管理一些中间桥接工作,类似 连接池、负载均衡、故障隔离、监控预警等工作,大幅度提高性能和稳定性,应对大流量的冲击。
下面我们具体来看看,我们使用数据库中间件具体解决哪些存在痛点的业务场景。

2 数据库中间件解决的问题和实现方案

2.1 连接池管理

2.1.1 业务场景

客户端连接无限制,不可复用,不可隔离,业务间互相影响(如单个服务超载调用可能导致雪崩)

2.1.2 解决方案

归口到Proxy统一管理,提供连接池、连接数限制、重试、超时断开保护等能力。

image

如上,各业务线统一读写访问入口,所以业务连接池不会有

db1.read:3306,db2.write.com:3306

而是统一只有

db.all:3306

Service服务对db的访问,统一到一个入口上来了,由中间层来对请求进行智能路由、连接池管理、限流、重试、超时等工作。这种模式解决了业务直接管理数据实现层的问题。

2.2 读写分离机制

2.2.1 业务场景

需要为业务提供读写分离机制,而不是由业务花费精力去维护

2.2.2 解决方案

数据服务存在主从部署模式,业务在路有时Porxy自动区分读写,写路由到主存储服务(master),读路由到从存储服务(slave)
对业务是透明的,业务开发的同学无需关注读写的隔离。
image

这边可以看到,Proxy会自动将读和写操作分流,业务开发人员无需关注即可。

2.3 负载均衡策略

2.3.1 业务场景

需要为数据库分片提供负载均衡机制,如果多个读库甚至多个写库,需要一定的负载策略,类似VIP

2.3.2 解决方案

归口到Proxy统一管理,提供负载均衡能力,包括但不限于:

  • 轮询(Round Robin)
  • 来源 IP 哈希(IP Hash)
  • 最少连接(Least Connection)
  • 加权轮询(Weighted Round Robin)
  • 加权最少连接(Weighted Least Connection)
  • 随机策略

image

2.4 数据库分库分表sharding

2.4.1 业务场景

数据量比较大的场景下,需提供业务分库分表能力

2.4.2 解决方案

调用方屏蔽分表细节,跟单表操作一直。Proxy实现对分库分表的核心细节,不同大部分中间件会有一些限制(如带分表字段,不支持跨库join等)
image

从上面的图中可以看到,Proxy在逻辑表和物理表中间起桥接作用,开发同学不需要知道太多细节,也不需要改变访问的SQL脚本。

2.5 故障切换

2.5.1 业务场景

主/备发生异常、宕机等故障,需业务来处理故障切换

2.5.2 解决方案

屏蔽后端DB故障问题,自动故障切换

  1. master 故障,slave切换为master
    image

  2. slave 故障,多副本模式下进行异常副本的驱逐
    image

  3. slave 故障,单实例情况下读写都调度到master
    image

2.6 安全保障等

常见问题 说明 解决方案
权限 1. 账号权限放太开,没有最小化权限;
2.机器迁移等都需手动申请权限较麻烦
1. 通过user/pwds/ip等做权限控制,最小化权限。
2. 机器迁移自动权限扩缩容。
安全 需业务自己处理sql注入,核心数据加密等问题 Proxy增加一层保护机制,包含sql注入,sql黑名单(如sleep/drop),数据加密等
日志审计 MySQL 数据被修改了,无法确认被谁修改的 提供便捷的日志审计,日志流控和动态变更,方便定位查询

2.7 监控和预警

2.7.1 业务场景

需业务根据自己需求搭建监控系统

2.7.2 解决方案

提供诸多监控和预警能力,包括但不限于:
连接数监测、慢查询数统计、慢请求Sql采集、Sql语句统计、索引命中率分析、索引分析、DB负载、所属主机性能分析
image

2.8 备份和恢复能力

2.8.1 业务场景

Proxy需要为业务提供备份能力,通过简单配置即可,避免业务花费较大精力关注

2.8.2 解决方案

  • 提供多种类备份策略,一键配置
    • 全量备份
    • 增量备份
  • 备份模式
    • 定时备份
    • 手动备份
  • 通过管控平台功能快速恢复数据
    • 指定范围恢复,精确到表
    • 指定时间恢复,精确到秒

参考以下备份流程:
image

3 总结

本文介绍了互联网场景数据库中间件主要解决的业务难题,后续有时间我们详细介绍下下业内评价较高的几种数据库中间件,如 ProxySQL、 DBProxy、TDDL 等。

前一段时间我用 WPF 开发了一个查看 emoji 表情的小工具
https://github.com/he55/EmojiViewer
,由于最近我使用 macOS 系统比较多,我想能在 macOS 系统上也能使用这个工具。于是我尝试将 WPF 应用迁移到 Electron 框架,感觉这个框架很强大,在这里记录一下应用迁移的过程。

安装 Electron 环境

  • 安装 nodejs。到官网
    https://nodejs.org/en
    下载最新的 nodejs,然后安装
  • 打开命令行输入
    git clone https://github.com/electron/electron-quick-start.git
    命令克隆 Electron 模板项目,使用模板可以快速搭建应用。
  • 然后使用
    cd electron-quick-start
    目录进入到目录,接着运行
    npm install
    命令还原项目。
  • 使用 vscode 打开文件夹,项目文件如下

编写代码

  • Electron 分为主进程和渲染进程,对文件、系统和窗口的操作需要在主线程,界面渲染在渲染进程。创建窗口属于主进程的工作,需要到
    main.js
    文件编写代码。创建窗口使用
    BrowserWindow
    对象,
    width

    height
    分别设置窗口宽度和高度,
    autoHideMenuBar
    设置是否隐藏菜单,最后使用
    loadFile
    加载页面文件并显示窗口。
function createWindow() {
  const mainWindow = new BrowserWindow({
    width: 915,
    height: 560,
    autoHideMenuBar: true,
    webPreferences: {
      preload: path.join(__dirname, 'preload.js')
    }
  })

  mainWindow.loadFile('index.html')
}
  • 监听
    whenReady
    事件,等待应用初始化完成后显示窗口
app.whenReady().then(() => {
  createWindow()

  app.on('activate', function () {
    if (BrowserWindow.getAllWindows().length === 0) createWindow()
  })
})
  • 修改
    index.html
    文件,界面部分使用了 vue 进行渲染
<!DOCTYPE html>
<html>

<head>
  <meta charset="UTF-8">
  <script src="vue.global.js"></script>
  <link href="./styles.css" rel="stylesheet">
  <title>EmojiViewer</title>
</head>

<body>
  <div id="app" class="container">
    <ul class="left">
      <li v-for="(item, key) in categories" :class="{active: item.isActive}" @click="catetoryItemClick(item)">{{ key }}</li>
    </ul>
    <ul class="main" ref="mainElement">
      <li v-for="emoji in emojis" :class="{active: emoji.isActive}" @click="emojiItemClick(emoji)">
        <img :src="emoji.previewImage" alt="">
        <p>{{emoji.name}}</p>
      </li>
    </ul>
    <div class="right">
      <img :src="selectedEmoji.previewImage">
      <p>{{ selectedEmoji.name }}</p>
      <button @click="copyEmoji(selectedEmoji)" type="button">Copy Emoji</button>
      <button @click="copyImage(selectedEmoji)" type="button">Copy Image</button>
      <button @click="openFile(selectedEmoji)" type="button">Open File</button>
    </div>
  </div>

  <script src="./renderer.js"></script>
</body>

</html>

  • renderer.js
    文件中编写页面处理代码
window.addEventListener('DOMContentLoaded', async () => {
    const { createApp, ref, onMounted } = Vue
    let emojiData = await ipc.getData()

    createApp({
        setup() {
            const mainElement = ref(null)

            const categories = ref(emojiData)
            const emojis = ref([])
            const selectedEmoji = ref({})

            function copyEmoji(emoji) {
                ipc.ipc('writeText', emoji.metadata.glyph)
            }
            function copyImage(emoji) {
                ipc.ipc('writeImage', emoji.previewImage)
            }
            function openFile(emoji) {
                ipc.ipc('showItemInFolder', emoji.previewImage)
            }

            let lastSelectedEmojis
            function catetoryItemClick(items) {
                if (lastSelectedEmojis) {
                    lastSelectedEmojis.isActive = false
                }

                items.isActive = true
                lastSelectedEmojis = items

                // const main = document.querySelector('.main')
                mainElement.value.scrollTop = 0
                emojis.value = items
            }

            function emojiItemClick(emoji) {
                if (selectedEmoji.value) {
                    selectedEmoji.value.isActive = false
                }

                emoji.isActive = true
                selectedEmoji.value = emoji
            }

            onMounted(() => {
                catetoryItemClick(emojiData['Activities'])
                emojiItemClick(emojiData['Activities'][0])
            })

            return {
                mainElement,
                categories,
                emojis,
                selectedEmoji,
                catetoryItemClick,
                emojiItemClick,
                copyEmoji,
                copyImage,
                openFile,
            }
        }
    }).mount('#app')
})
  • 读取文件,node 提供了文件操作相关的 api 可以很方便的操作文件系统。
function loadData(assetPath) {
  const dirs = fs.readdirSync(assetPath)
  const data = []
  const groupData = {}
  for (const dir of dirs) {
    const fullPath = path.resolve(assetPath, dir)
    const metadata = require(path.resolve(fullPath, 'metadata.json'))
    let previewImage

    let imagePaths = [path.resolve(fullPath, '3D'), path.resolve(fullPath, 'Default', '3D')]
    for (const imagePath of imagePaths) {
      if (fs.existsSync(imagePath)) {
        let files = fs.readdirSync(imagePath)
        if (files.length === 0)
          return
        previewImage = path.resolve(imagePath, files[0])
      }
    }

    const { unicode, group } = metadata
    const obj = {
      metadata,
      id: unicode,
      name: dir,
      previewImage,
    }
    data.push(obj)

    if (!groupData[group])
      groupData[group] = []
    groupData[group].push(obj)
  }
  return groupData
}

完整代码(WPF 版本)
https://github.com/he55/EmojiViewer
完整代码(vue 版本)
https://github.com/he55/web-learn/tree/main/9.electron-emoji-viewer(vue)
完整代码(js 原生版本)
https://github.com/he55/web-learn/tree/main/6.electron-emoji-viewer

树、森林

树的存储结构

双亲表示法

image

双亲表示法的存储结构

#define MAX_TREE_SIZE 100
typedef struct {
    int data;
    int parent;
}PTNode;
typedef struct {
    PTNode nodes[MAX_TREE_SIZE];
    int n;
}PTree;

【注】
区别树的顺序存储结构与二叉树的顺序存储结构。在树的顺序存储结构中,数组下标代表节点的编号,下标中所存的内容指示了节点之间的关系。而在二叉树的顺序存储结构中, 数组下标既表达了节点的编号,又指示了二叉树中节点之间的关系。当然,二叉树属于树,因此二叉树也可以用树的存储结构来存储,但树却不能都用都用二叉树的存储结构来存储。

孩子表示法

孩子表示法是将每个节点的孩子节点视为一个线性表,且以单链表作为存储结构,则
\(n\)
个节点就有
\(n\)
个孩子链表(叶节点的孩子链表为空表)。而
\(n\)
个头指针又组成一个线性表。

与双亲表示法相反,孩子表示法寻找孩子的操作非常方便,而寻找双亲的操作则需要遍历
\(n\)
个节点中孩子链表指针域所指向的
\(n\)
个孩子链表。

孩子兄弟表示法

又称
二叉树表示法
,即以二叉链表作为树的存储结构。孩子兄弟表示法使每个节点包括三个部分的内容:
节点值、指向节点第一个孩子节点的指针、指向节点下一个兄弟节点的指针

结构体如下:

typedef struct CSNode {
    inr data;
    struct CSNode *firstchild, *nextsibling;
}CSNode, *CSTree;

树、森林、二叉树的转换

树转换为二叉树

(1)在兄弟节点之间画一条线;

(2)对每个节点,只保留它与第一个孩子的连线,而与其他孩子的连线全部删除;

(3)以树根为轴心,顺时针旋转45°。

image
image

以下代码引用我大佬同学:作者:Amαdeus,出处:https://www.cnblogs.com/MAKISE004/p/17089756.html

//树 转化为 二叉树
BinaryTree CSTree_Transform_to_BinaryTree(CSTree ct){
 if(!ct) return NULL;

 BinaryTree T = (BinaryTree)malloc(sizeof(BiNode));
 T->data = ct->data;
 //相当于将left变为firstchild, 将right变为nextsibling 本质的形态没有改变
 T->leftchild = CSTree_Transform_to_BinaryTree(ct->firstchild);
 T->rightchild = CSTree_Transform_to_BinaryTree(ct->nextsibling);

 return T;
}

森林转换为二叉树

(1)将森林中的每棵树转换成相应的二叉树;

(2)每棵树的根视为兄弟关系,加上连线;

(3)以第一棵树的根为轴心顺时针旋转45°。

以下代码引用我大佬同学:作者:Amαdeus,出处:https://www.cnblogs.com/MAKISE004/p/17089756.html

//森林 转化为 二叉树
BinaryTree Forest_Transform_to_BinaryTree(CSTree ct[], int low, int high){
 if(low > high)  return NULL;

 //每个树变成二叉树
 BinaryTree T = CSTree_Transform_to_BinaryTree(ct[low]);  
 //通过rightchild连接每一个二叉树的根节点
 T->rightchild = Forest_Transform_to_BinaryTree(ct, low + 1, high);

 return T;
}

树、森林的遍历

树的遍历

  • 先根遍历。若树非空,则按如下规则遍历:


    • 先访问根节点
    • 再依次遍历根节点的每棵子树,遍历子树时仍遵循先根后子树的规则
  • 后根遍历。若树非空,则按如下规则遍历:


    • 先依次遍历根节点的每棵子树,遍历子树时仍遵循先子树后根的规则
    • 再访问根节点

树的先根遍历与对应二叉树的先序序列相同,树的后根遍历与对应二叉树的中序序列相同。

森林的遍历

  • 先序遍历森林。若森林非空,则按如下规则遍历:


    • 访问森林中第一棵树的根节点
    • 先序遍历第一棵树中根节点的子树森林
    • 先序遍历除去第一棵树之后剩余的树构成的森林
  • 中序遍历森林。若森林非空,则按如下规则遍历:


    • 中序遍历森林中第一棵树的根节点的子树森林
    • 访问第一棵树的根节点
    • 中序遍历粗去第一棵树之后剩余的树构成的森林

树与二叉树的应用

哈夫曼树和哈夫曼编码

几个概念

  • 路径:树中一个节点到另一个节点之间的分支构成

  • 路径长度:路径上的分支数目

  • 权:树中节点被赋予的一个表示某种意义的数值

  • 带权路径长度:从树的根到一个节点的路径长度与该节点上权值的乘积


    \[WPL = \sum_{i=1}^{n}w_il_i
    \]

    其中,
    \(w_i\)
    是第
    \(i\)
    个叶节点所带的权值,
    \(l_i\)
    是该叶节点到根节点的路径长度

在含有
\(n\)
个带权叶节的二叉树中,其中带权路径长度最小的二叉树称为
哈夫曼树

下面看个算法:递归求WPL

int getWPL(struct TreeNode *root, int depth) {
    if (root == NULL) { // 如果节点为空,返回0
        return 0;
    }
    if (root->left == NULL && root->right == NULL) { // 如果节点是叶子节点,返回带权路径长度
        return depth * root->val;
    }
    // 如果节点不是叶子节点,递归计算左子树和右子树的WPL,并相加返回
    return getWPL(root->left, depth + 1) + getWPL(root->right, depth + 1);
}

接下来咱们举个栗子,来看一下哈夫曼编码

image

看题:来自北邮考研机试

3531. 哈夫曼树 - AcWing题库

题解:

// 优先队列求哈夫曼树最短带权路径长度
#include<bits/stdc++.h>
using namespace std;

int main() {
    int n;
    cin >> n;
    priority_queue<int, vector<int>, greater<int> > q;
    for(int i = 0; i < n; i ++) {
        int x;
        cin >> x;
        q.push(x);
    }
    int ans = 0;
    while(q.size() > 1) {
        int t1 = q.top();
        q.pop();
        int t2 = q.top();
        q.pop();
        q.push(t1 + t2);
        ans += t1 + t2;
    }
    cout << ans << endl;
    return 0;
}

并查集

这里不想做太多解释,我们看一下y总的模版(写的时候已经快要零点了,第二天还要早起)
(PS:第二天果然多睡了半个小时)

(1)朴素并查集:

    int p[N]; //存储每个点的祖宗节点

    // 返回x的祖宗节点
    int find(int x) {
        if (p[x] != x) p[x] = find(p[x]);
        return p[x];
    }

    // 初始化,假定节点编号是1~n
    for (int i = 1; i <= n; i ++ ) p[i] = i;

    // 合并a和b所在的两个集合:
    p[find(a)] = find(b);


(2)维护size的并查集:

    int p[N], size[N];
    //p[]存储每个点的祖宗节点, size[]只有祖宗节点的有意义,表示祖宗节点所在集合中的点的数量

    // 返回x的祖宗节点
    int find(int x) {
        if (p[x] != x) p[x] = find(p[x]);
        return p[x];
    }

    // 初始化,假定节点编号是1~n
    for (int i = 1; i <= n; i ++ ) {
        p[i] = i;
        size[i] = 1;
    }

    // 合并a和b所在的两个集合:
    size[find(b)] += size[find(a)];
    p[find(a)] = find(b);


(3)维护到祖宗节点距离的并查集:

    int p[N], d[N];
    //p[]存储每个点的祖宗节点, d[x]存储x到p[x]的距离

    // 返回x的祖宗节点
    int find(int x) {
        if (p[x] != x) {
            int u = find(p[x]);
            d[x] += d[p[x]];
            p[x] = u;
        }
        return p[x];
    }

    // 初始化,假定节点编号是1~n
    for (int i = 1; i <= n; i ++ ) {
        p[i] = i;
        d[i] = 0;
    }

    // 合并a和b所在的两个集合:
    p[find(a)] = find(b);
    d[find(a)] = distance; // 根据具体问题,初始化find(a)的偏移量

作者:yxc
链接:https://www.acwing.com/blog/content/404/
来源:AcWing
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

836. 合并集合 - AcWing题库

#include<bits/stdc++.h>
using namespace std;

const int N = 100010;
int p[N];//定义多个集合

int find(int x) {
    if(p[x] != x) p[x] = find(p[x]);
    /*
    经上述可以发现,每个集合中只有祖宗节点的p[x]值等于他自己,即:
    p[x]=x;
    */
    return p[x];
    //找到了便返回祖宗节点的值
}C

int main() {
    int n, m;
    scanf("%d%d", &n, &m);
    for(int i = 1; i <= n; i ++) p[i]=i;
    while(m --) {
        char op[2];
        int a, b;
        scanf("%s%d%d", op, &a, &b);
        if(*op == 'M') p[find(a)] = find(b);//集合合并操作
        else
        	if(find(a)==find(b))
        	//如果祖宗节点一样,就输出yes
        		printf("Yes\n");
       	 	else
        		printf("No\n");
    }
    return 0;
}

一、摘要

在之前的文章中,我们介绍了生产者和消费者模型的最基本实现思路,相信大家对它已经有一个初步的认识。

在 Java 的并发包里面还有一个非常重要的接口:BlockingQueue。

BlockingQueue
是一个阻塞队列,更为准确的解释是:
BlockingQueue
是一个基于阻塞机制实现的线程安全的队列。通过它也可以实现生产者和消费者模型,并且效率更高、安全可靠,相比之前介绍的生产者和消费者模型,它可以同时实现生产者和消费者并行运行。

那什么是阻塞队列呢?

简单的说,就是当参数在入队和出队时,通过加锁的方式来避免线程并发操作时导致的数据异常问题。

在 Java 中,能对线程并发执行进行加锁的方式主要有
synchronized

ReentrantLock
,其中
BlockingQueue
采用的是
ReentrantLock
方式实现。

与此对应的还有非阻塞机制的队列,主要是采用 CAS 方式来控制并发操作,例如:
ConcurrentLinkedQueue
,这个我们在后面的文章再进行分享介绍。

今天我们主要介绍
BlockingQueue
相关的知识和用法,废话不多说了,进入正题!

二、BlockingQueue 方法介绍

打开
BlockingQueue
的源码,你会发现它继承自
Queue
,正如上文提到的,它本质是一个队列接口。

public interface BlockingQueue<E> extends Queue<E> {
	//...省略
}

关于队列,我们在之前的集合系列文章中对此有过深入的介绍,本篇就再次简单的介绍一下。

队列其实是一个数据结构,元素遵循先进先出的原则,所有新元素的插入,也被称为入队操作,会插入到队列的尾部;元素的移除,也被称为出队操作,会从队列的头部开始移除,从而保证先进先出的原则。


Queue
接口中,总共有 6 个方法,可以分为 3 类,分别是:插入、移除、查询,内容如下:

方法 描述
add(e) 插入元素,如果插入失败,就抛异常
offer(e) 插入元素,如果插入成功,就返回 true;反之 false
remove() 移除元素,如果移除失败,就抛异常
poll() 移除元素,如果移除成功,返回 true;反之 false
element() 获取队首元素,如果获取结果为空,就抛异常
peek() 获取队首元素,如果获取结果为空,返回空对象

因为
BlockingQueue

Queue
的子接口,了解
Queue
接口里面的方法,有助于我们对
BlockingQueue
的理解。

除此之外,
BlockingQueue
还单独扩展了一些特有的方法,内容如下:

方法 描述
put(e) 插入元素,如果没有插入成功,线程会一直阻塞,直到队列中有空间再继续
offer(e, time, unit) 插入元素,如果在指定的时间内没有插入成功,就返回 false;反之 true
take() 移除元素,如果没有移除成功,线程会一直阻塞,直到队列中新的数据被加入
poll(time, unit) 移除元素,如果在指定的时间内没有移除成功,就返回 false;反之 true
drainTo(Collection c, int maxElements) 一次性取走队列中的数据到 c 中,可以指定取的个数。该方法可以提升获取数据效率,不需要多次分批加锁或释放锁

分析源码,你会发现相比普通的
Queue
子类,
BlockingQueue
子类主要有以下几个明显的不同点:

  • 1.元素插入和移除时线程安全:主要是通过在入队和出队时进行加锁,保证了队列线程安全,加锁逻辑采用
    ReentrantLock
    实现
  • 2.支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素;同时支持超时机制,防止线程一直阻塞

三、BlockingQueue 用法详解

打开源码,
BlockingQueue
接口的实现类非常多,我们重点讲解一下其中的 5 个非常重要的实现类,分别如下表所示。

实现类 功能
ArrayBlockingQueue 基于数组的阻塞队列,使用数组存储数据,需要指定长度,所以是一个有界队列
LinkedBlockingQueue 基于链表的阻塞队列,使用链表存储数据,默认是一个无界队列;也可以通过构造方法中的
capacity
设置最大元素数量,所以也可以作为有界队列
SynchronousQueue 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费
PriorityBlockingQueue 基于优先级别的阻塞队列,底层基于数组实现,是一个无界队列
DelayQueue 延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队

下面我们对以上实现类的用法,进行一一介绍。

3.1、ArrayBlockingQueue

ArrayBlockingQueue
是一个基于数组的阻塞队列,初始化的时候必须指定队列大小,源码实现比较简单,采用的是
ReentrantLock

Condition
实现生产者和消费者模型,部分核心源码如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

	/** 使用数组存储队列中的元素 */
	final Object[] items;

	/** 使用独占锁ReetrantLock */
	final ReentrantLock lock;

	/** 等待出队的条件 */
	private final Condition notEmpty;

	/** 等待入队的条件 */
	private final Condition notFull;

	/** 初始化时,需要指定队列大小 */
	public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /** 初始化时,也指出指定是否为公平锁, */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**入队操作*/
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    /**出队操作*/
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

ArrayBlockingQueue
采用
ReentrantLock
进行加锁,只有一个
ReentrantLock
对象,这意味着生产者和消费者无法并行运行。

我们看一个简单的示例代码如下:

public class Container {

    /**
     * 初始化阻塞队列
     */
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    /**
     * 添加数据到阻塞队列
     * @param value
     */
    public void add(Integer value) {
        try {
            queue.put(value);
            System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 从阻塞队列获取数据
     */
    public void get() {
        try {
            Integer value = queue.take();
            System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
/**
 * 生产者
 */
public class Producer extends Thread {

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.add(i);
        }
    }
}
/**
 * 消费者
 */
public class Consumer extends Thread {

    private Container container;

    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.get();
        }
    }
}
/**
 * 测试类
 */
public class MyThreadTest {

    public static void main(String[] args) {
        Container container = new Container();

        Producer producer = new Producer(container);
        Consumer consumer = new Consumer(container);

        producer.start();
        consumer.start();
    }
}

运行结果如下:

生产者:Thread-0,add:0
生产者:Thread-0,add:1
生产者:Thread-0,add:2
生产者:Thread-0,add:3
生产者:Thread-0,add:4
生产者:Thread-0,add:5
消费者:Thread-1,value:0
消费者:Thread-1,value:1
消费者:Thread-1,value:2
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5

可以很清晰的看到,生产者线程执行完毕之后,消费者线程才开始消费。

3.2、LinkedBlockingQueue

LinkedBlockingQueue
是一个基于链表的阻塞队列,初始化的时候无须指定队列大小,默认队列长度为
Integer.MAX_VALUE
,也就是 int 型最大值。

同样的,采用的是
ReentrantLock

Condition
实现生产者和消费者模型,不同的是它使用了两个
lock
,这意味着生产者和消费者可以并行运行,程序执行效率进一步得到提升。

部分核心源码如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 使用出队独占锁ReetrantLock */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 等待出队的条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 使用入队独占锁ReetrantLock */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 等待入队的条件 */
    private final Condition notFull = putLock.newCondition();

    /**入队操作*/
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**出队操作*/
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
}

把最上面的样例
Container
中的阻塞队列实现类换成
LinkedBlockingQueue
,调整如下:

/**
 * 初始化阻塞队列
 */
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

再次运行结果如下:

生产者:Thread-0,add:0
消费者:Thread-1,value:0
生产者:Thread-0,add:1
消费者:Thread-1,value:1
生产者:Thread-0,add:2
消费者:Thread-1,value:2
生产者:Thread-0,add:3
生产者:Thread-0,add:4
生产者:Thread-0,add:5
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5

可以很清晰的看到,生产者线程和消费者线程,交替并行执行。

3.3、SynchronousQueue

SynchronousQueue
是一个没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费,相当于传统的一个请求对应一个应答模式。

相比
ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue
实现机制也不同,它主要采用队列和栈来实现数据的传递,中间不存储任何数据,生产的数据必须得消费者处理,线程阻塞方式采用 JDK 提供的
LockSupport park/unpark
函数来完成,也支持公平和非公平两种模式。

  • 当采用公平模式时:使用一个 FIFO 队列来管理多余的生产者和消费者
  • 当采用非公平模式时:使用一个 LIFO 栈来管理多余的生产者和消费者,这也是
    SynchronousQueue
    默认的模式

部分核心源码如下:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    /**不同的策略实现*/
    private transient volatile Transferer<E> transferer;

	/**默认非公平模式*/
    public SynchronousQueue() {
        this(false);
    }

    /**可以选策略,也可以采用公平模式*/
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

	/**入队操作*/
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**出队操作*/
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
}

同样的,把最上面的样例
Container
中的阻塞队列实现类换成
SynchronousQueue
,代码如下:

public class Container {

    /**
     * 初始化阻塞队列
     */
    private final BlockingQueue<Integer> queue = new SynchronousQueue<>();


    /**
     * 添加数据到阻塞队列
     * @param value
     */
    public void add(Integer value) {
        try {
            queue.put(value);
            Thread.sleep(100);
            System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    /**
     * 从阻塞队列获取数据
     */
    public void get() {
        try {
            Integer value = queue.take();
            Thread.sleep(200);
            System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

再次运行结果如下:

生产者:Thread-0,add:0
消费者:Thread-1,value:0
生产者:Thread-0,add:1
消费者:Thread-1,value:1
生产者:Thread-0,add:2
消费者:Thread-1,value:2
生产者:Thread-0,add:3
消费者:Thread-1,value:3
生产者:Thread-0,add:4
消费者:Thread-1,value:4
生产者:Thread-0,add:5
消费者:Thread-1,value:5

可以很清晰的看到,生产者线程和消费者线程,交替串行执行,生产者每投递一条数据,消费者处理一条数据。

3.4、PriorityBlockingQueue

PriorityBlockingQueue
是一个基于优先级别的阻塞队列,底层基于数组实现,可以认为是一个无界队列。

PriorityBlockingQueue

ArrayBlockingQueue
的实现逻辑,基本相似,也是采用
ReentrantLock
来实现加锁的操作。

最大不同点在于:

  • 1.
    PriorityBlockingQueue
    内部基于数组实现的最小二叉堆算法,可以对队列中的元素进行排序,插入队列的元素需要实现
    Comparator
    或者
    Comparable
    接口,以便对元素进行排序
  • 2.其次,队列的长度是可扩展的,不需要显式指定长度,上限为
    Integer.MAX_VALUE - 8

部分核心源码如下:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

 	/**队列元素*/
    private transient Object[] queue;

    /**比较器*/
    private transient Comparator<? super E> comparator;

    /**采用ReentrantLock进行加锁*/
    private final ReentrantLock lock;

    /**条件等待与通知*/
    private final Condition notEmpty;

    /**入队操作*/
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**出队操作*/
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
}

同样的,把最上面的样例
Container
中的阻塞队列实现类换成
PriorityBlockingQueue
,调整如下:

/**
 * 初始化阻塞队列
 */
private final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

生产者插入数据的内容,我们改下插入顺序。

/**
 * 生产者
 */
public class Producer extends Thread {

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        container.add(5);
        container.add(3);
        container.add(1);
        container.add(2);
        container.add(0);
        container.add(4);
    }
}

最后运行结果如下:

生产者:Thread-0,add:5
生产者:Thread-0,add:3
生产者:Thread-0,add:1
生产者:Thread-0,add:2
生产者:Thread-0,add:0
生产者:Thread-0,add:4
消费者:Thread-1,value:0
消费者:Thread-1,value:1
消费者:Thread-1,value:2
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5

从日志上可以很明显看出,对于整数,默认情况下,按照升序排序,消费者默认从 0 开始处理。

3.5、DelayQueue

DelayQueue
是一个线程安全的延迟队列,存入队列的元素不会立刻被消费,只有到了其指定的延迟时间,才能够从队列中出队。

底层采用的是
PriorityQueue
来存储元素,
DelayQueue
的特点在于:插入队列中的数据可以按照自定义的
delay
时间进行排序,快到期的元素会排列在前面,只有
delay
时间小于 0 的元素才能够被取出。

部分核心源码如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /**采用ReentrantLock进行加锁*/
    private final transient ReentrantLock lock = new ReentrantLock();

    /**采用PriorityQueue进行存储数据*/
    private final PriorityQueue<E> q = new PriorityQueue<E>();

	/**条件等待与通知*/
    private final Condition available = lock.newCondition();

    /**入队操作*/
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**出队操作*/
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
}

同样的,把最上面的样例
Container
中的阻塞队列实现类换成
DelayQueue
,代码如下:

public class Container {

    /**
     * 初始化阻塞队列
     */
    private final BlockingQueue<DelayedUser> queue = new DelayQueue<DelayedUser>();


    /**
     * 添加数据到阻塞队列
     * @param value
     */
    public void add(DelayedUser value) {
        try {
            queue.put(value);
            System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    /**
     * 从阻塞队列获取数据
     */
    public void get() {
        try {
            DelayedUser value = queue.take();
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println(time + " 消费者:"+ Thread.currentThread().getName()+",value:" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

DelayQueue
队列中的元素需要显式实现
Delayed
接口,定义一个
DelayedUser
类,代码如下:

public class DelayedUser implements Delayed {

    /**
     * 当前时间戳
     */
    private long start;

    /**
     * 延迟时间(单位:毫秒)
     */
    private long delayedTime;

    /**
     * 名称
     */
    private String name;

    public DelayedUser(long delayedTime, String name) {
        this.start = System.currentTimeMillis();
        this.delayedTime = delayedTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 获取当前延迟的时间
        long diffTime = (start + delayedTime) - System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // 判断当前对象的延迟时间是否大于目标对象的延迟时间
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayedUser{" +
                "delayedTime=" + delayedTime +
                ", name='" + name + '\'' +
                '}';
    }
}

生产者插入数据的内容,做如下调整。

/**
 * 生产者
 */
public class Producer extends Thread {

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.add(new DelayedUser(1000 * i, "张三" +  i));
        }
    }
}

最后运行结果如下:

生产者:Thread-0,add:DelayedUser{delayedTime=0, name='张三0'}
生产者:Thread-0,add:DelayedUser{delayedTime=1000, name='张三1'}
生产者:Thread-0,add:DelayedUser{delayedTime=2000, name='张三2'}
生产者:Thread-0,add:DelayedUser{delayedTime=3000, name='张三3'}
生产者:Thread-0,add:DelayedUser{delayedTime=4000, name='张三4'}
生产者:Thread-0,add:DelayedUser{delayedTime=5000, name='张三5'}
2023-11-03 14:55:33 消费者:Thread-1,value:DelayedUser{delayedTime=0, name='张三0'}
2023-11-03 14:55:34 消费者:Thread-1,value:DelayedUser{delayedTime=1000, name='张三1'}
2023-11-03 14:55:35 消费者:Thread-1,value:DelayedUser{delayedTime=2000, name='张三2'}
2023-11-03 14:55:36 消费者:Thread-1,value:DelayedUser{delayedTime=3000, name='张三3'}
2023-11-03 14:55:37 消费者:Thread-1,value:DelayedUser{delayedTime=4000, name='张三4'}
2023-11-03 14:55:38 消费者:Thread-1,value:DelayedUser{delayedTime=5000, name='张三5'}

可以很清晰的看到,延迟时间最低的排在最前面。

四、小结

最后我们来总结一下
BlockingQueue
阻塞队列接口,它提供了很多非常丰富的生产者和消费者模型的编程实现,同时兼顾了线程安全和执行效率的特点。

开发者可以通过
BlockingQueue
阻塞队列接口,简单的代码编程即可实现多线程中数据高效安全传输的目的,确切的说,它帮助开发者减轻了不少的编程难度。

在实际的业务开发中,其中
LinkedBlockingQueue
使用的是最广泛的,因为它的执行效率最高,在使用的时候,需要平衡好队列长度,防止过大导致内存溢出。

举个最简单的例子,比如某个功能上线之后,需要做下压力测试,总共需要请求 10000 次,采用 100 个线程去执行,测试服务是否能正常工作。如何实现呢?

可能有的同学想到,每个线程执行 100 次请求,启动 100 个线程去执行,可以是可以,就是有点笨拙。

其实还有另一个办法,就是将 10000 个请求对象,存入到阻塞队列中,然后采用 100 个线程去消费执行,这种编程模型会更佳灵活。

具体示例代码如下:

public static void main(String[] args) throws InterruptedException {
    // 将每个用户访问百度服务的请求任务,存入阻塞队列中
    // 也可以也采用多线程写入
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    for (int i = 0; i < 10000; i++) {
        queue.put("https://www.baidu.com?paramKey=" + i);
    }

    // 模拟100个线程,执行10000次请求访问百度
    final int threadNum = 100;
    for (int i = 0; i < threadNum; i++) {
        final int threadCount = i + 1;
        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("thread " + threadCount + " start");
                boolean over = false;
                while (!over) {
                    String url = queue.poll();
                    if(Objects.nonNull(url)) {
                        // 发起请求
                        String result =HttpUtils.getUrl(url);
                        System.out.println("thread " + threadCount + " run result:" + result);
                    }else {
                        // 任务结束
                        over = true;
                        System.out.println("thread " + threadCount + " final");
                    }
                }
            }
        }).start();
    }
}

本文主要围绕
BlockingQueue
阻塞队列接口,从方法介绍到用法详解,做了一次知识总结,如果有描述不对的地方,欢迎留言指出!

五、参考

1、
https://www.cnblogs.com/xrq730/p/4855857.html

2、
https://juejin.cn/post/6999798721269465102