一:背景

1. 讲故事

最近聊了不少和异步相关的话题,有点疲倦了,今天再写最后一篇作为近期这类话题的一个封笔吧,下篇继续写我熟悉的
生产故障
系列,突然亲切感油然而生,哈哈,免费给别人看程序故障,是一种积阴德阳善的事情,欲知前世因,今生受者是。欲知来世果,今生做者是。

在任务延续方面,我个人的总结就是三类,分别为:

  1. StateMachine
  2. ContinueWith
  3. Awaiter

话不多说,我们逐个研究下底层是咋玩的?

二:异步任务延续的玩法

1. StateMachine

说到状态机大家再熟悉不过了,也是 async,await 的底层化身,很多人看到 async await 就想到了IO场景,其实IO场景和状态机是两个独立的东西,状态机是一种设计模式,把这个模式套在IO场景会让代码更加丝滑,仅此而已。为了方便讲述,我们写一个 StateMachine 与 IO场景 无关的一段测试代码。


    internal class Program
    {
        static void Main(string[] args)
        {
            UseAwaitAsync();

            Console.ReadLine();
        }

        static async Task<string> UseAwaitAsync()
        {
            var html = await Task.Run(() =>
            {
                Thread.Sleep(1000);
                var response = "<html><h1>博客园</h1></html>";
                return response;
            });
            Console.WriteLine($"GetStringAsync 的结果:{html}");
            return html;
        }
    }

那这段代码在底层是如何运作的呢?刚才也说到了asyncawait只是迷惑你的一种幻象,我们必须手握
辟邪宝剑
斩开幻象显真身,这里借助 ilspy 截图如下:

从卦中看,本质上就是借助
AsyncTaskMethodBuilder<string>
建造者将 awaiter 和 stateMachine 做了一个绑定,感兴趣的朋友可以追一下 AwaitUnsafeOnCompleted() 方法,最后状态机
<UseAwaitAsync>d__1
实例会放入到
Task.Run
的 m_continuationObject 字段。如果有朋友对流程比较蒙的话,我画了一张简图。

图和代码都有了,接下来就是眼见为实。分别在
AddTaskContinuation

RunContinuations
方法中做好埋点,前者可以看到 延续任务 是怎么加进去的,后者可以看到 延续任务 是怎么取出来的。


心细的朋友会发现这卦上有一个很特别的地方,就是
allowInlining=true
,也就是回调函数(StateMachine)是在当前线程上一撸到底的。

有些朋友可能要问,能不能让
延续任务
跑在单独线程上? 可以是可以,但你得把 Task.Run 改成 Task.Factory.StartNew ,这样就可以设置TaskCreationOptions参数,参考代码如下:

    var html = await Task.Factory.StartNew(() =>{}, TaskCreationOptions.RunContinuationsAsynchronously);

2. ContinueWith

那些同处于被裁的35岁大龄程序员应该知道Task是 framework 4.0 时代出来的,而async,await是4.5出来的,所以在这个过渡期中有大量的项目会使用ContinueWith 导致回调地狱。。。 这里我们对比一下两者有何不同,先写一段参考代码。


    internal class Program
    {
        static void Main(string[] args)
        {
            UseContinueWith();

            Console.ReadLine();
        }

        static Task<string> UseContinueWith()
        {
            var query = Task.Run(() =>
            {
                Thread.Sleep(1000);
                var response = "<html><h1>博客园</h1></html>";
                return response;
            }).ContinueWith(t =>
            {
                var html = t.Result;
                Console.WriteLine($"GetStringAsync 的结果:{html}");
                return html;
            });

            return query;
        }
    }

从卦代码看确实没有asyncawait简洁,那 ContinueWith 内部做了什么呢?感兴趣的朋友可以跟踪一下,本质上和 StateMachine 的玩法是一样的,都是借助 m_continuationObject 来实现延续,画个简图如下:

代码和模型图都有了,接下来就是用 dnspy 开干了。。。还是在
AddTaskContinuation

RunContinuations
上埋伏断点观察。


从卦中可以看到,延续任务使用新线程来执行的,并没有一撸到底,这明显与
asyncawait
的方式不同,有些朋友可能又要说了,那如何实现和StateMachine一样的呢?这就需要在 ContinueWith 中新增 ExecuteSynchronously 同步参数,参考如下:

    var query = Task.Run(() => { }).ContinueWith(t =>
    {
    }, TaskContinuationOptions.ExecuteSynchronously);

3. Awaiter

使用Awaiter做任务延续的朋友可能相对少一点,它更多的是和 StateMachine 打配合,当然单独使用也可以,但没有前两者灵活,它更适合那些不带返回值的任务延续,本质上也是借助
m_continuationObject
字段实现的一套底层玩法,话不多说,上一段代码:


        static Task<string> UseAwaiter()
        {
            var awaiter = Task.Run(() =>
            {
                Thread.Sleep(1000);
                var response = "<html><h1>博客园</h1></html>";
                return response;
            }).GetAwaiter();

            awaiter.OnCompleted(() =>
            {
                var html = awaiter.GetResult();
                Console.WriteLine($"UseAwaiter 的结果:{html}");
            });

            return Task.FromResult(string.Empty);
        }

前面两种我配了图,这里没有理由不配了,哈哈,模型图如下:

接下来把程序运行起来,观察截图:


从卦中观察,它和StateMachine一样,默认都是 一撸到底 的方式。

三:RunContinuations 观察

这一小节我们单独说一下
RunContinuations
方法,因为这里的实现太精妙了,不幸的是Dnspy和ILSpy反编译出来的代码太狗血,原汁原味的简化后代码如下:

    private void RunContinuations(object continuationObject) // separated out of FinishContinuations to enable it to be inlined
    {
        bool canInlineContinuations =
            (m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) == 0 &&
            RuntimeHelpers.TryEnsureSufficientExecutionStack();

        switch (continuationObject)
        {
            // Handle the single IAsyncStateMachineBox case.  This could be handled as part of the ITaskCompletionAction
            // but we want to ensure that inlining is properly handled in the face of schedulers, so its behavior
            // needs to be customized ala raw Actions.  This is also the most important case, as it represents the
            // most common form of continuation, so we check it first.
            case IAsyncStateMachineBox stateMachineBox:
                AwaitTaskContinuation.RunOrScheduleAction(stateMachineBox, canInlineContinuations);
                LogFinishCompletionNotification();
                return;

            // Handle the single Action case.
            case Action action:
                AwaitTaskContinuation.RunOrScheduleAction(action, canInlineContinuations);
                LogFinishCompletionNotification();
                return;

            // Handle the single TaskContinuation case.
            case TaskContinuation tc:
                tc.Run(this, canInlineContinuations);
                LogFinishCompletionNotification();
                return;

            // Handle the single ITaskCompletionAction case.
            case ITaskCompletionAction completionAction:
                RunOrQueueCompletionAction(completionAction, canInlineContinuations);
                LogFinishCompletionNotification();
                return;
        }
    }

卦中的 case 挺有意思的,除了本篇聊过的 TaskContinuation 和 IAsyncStateMachineBox 之外,还有另外两种 continuationObject,这里说一下 ITaskCompletionAction 是怎么回事,其实它是
Task.Result
的底层延续类型,所以大家应该能理解为什么 Task.Result 能唤醒,主要是得益于
Task.m_continuationObject =completionAction
所致。

说了这么说,如何眼见为实呢?可以从源码中寻找答案。


        private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            var mres = new SetOnInvokeMres();

            AddCompletionAction(mres, addBeforeOthers: true);

            var returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
        }

        private sealed class SetOnInvokeMres : ManualResetEventSlim, ITaskCompletionAction
        {
            internal SetOnInvokeMres() : base(false, 0) { }
            public void Invoke(Task completingTask) { Set(); }
            public bool InvokeMayRunArbitraryCode => false;
        }

从卦中可以看到,其实就是把 ITaskCompletionAction 接口的实现类 SetOnInvokeMres 塞入了 Task.m_continuationObject 中,一旦Task执行完毕之后就会调用 Invoke() 下的
Set()
来实现事件唤醒。

四:总结

虽然
异步任务延续
有三种实现方法,但底层都是一个套路,即借助
Task.m_continuationObject
字段玩出的各种花样,当然他们也是有一些区别的,即对
m_continuationObject
任务是否用单独的线程调度,产生了不同的意见分歧。
图片名称

主流架构

Device Plugin:K8s制定设备插件接口规范,定义异构资源的上报和分配,设备厂商只需要实现相应的API接口,无需修改kubelet源码即可实现对其他硬件设备的支持。
Extended Resource:Scheduler可以根据Pod的创建删除计算资源可用量,而不再局限于CPU和内存的资源统计,进而将有特殊资源需求的Pod调度到相应的节点上。

通过Device Plugin 异构资源调度流程如下:

  1. Device plugin 向kubelet上报当前节点资源情况
  2. 用户通过yaml文件创建负载,定义Resource Request
  3. kube-scheduler根据从kubelet同步到的资源信息和Pod的资源请求,为Pod绑定合适的节点
  4. kubelet监听到绑定到当前节点的Pod,调用Device plugin的allocate接口为Pod分配设备
  5. kubelet启动Pod内的容器,将设备映射给容器

file

GPU虚拟化方案大致分为用户态隔离和内核态隔离:

  1. 用户态主要是通过vcuda的方式,劫持cuda调用,比如下面介绍的两种开源
  2. 内核态主要是用过虚拟gpu驱动的方式,比如腾讯云的qgpu和阿里云的cgpu,不过这两个都是闭源的

Nvidia-GPU

NVIDIA 提供的 Time-Slicing GPUs in Kubernetes 是一种通过 oversubscription(超额订阅) 来实现 GPU 共享的策略,有两种策略,单卡调度模式和超卖模式。
单卡的意思就是一个Pod调度一张GPU,当这个GPU有Pod使用了,就不可被其他Pod使用。

超卖模式这种策略能让多个任务在同一个 GPU 上进行,而不是每个任务都独占一个 GPU。Time Slicing(时间片)指的是 GPU 本身的时间片调度。
也就是说假如有两个进程同时使用同一个GPU,两个进程同时把 CUDA 任务发射到 GPU 上去,GPU 并不会同时执行,而是采用时间片轮转调度的方式。
进程和进程间的显存和算力没有任何限制,谁抢到就是谁的。

腾讯GPU-manager

基于Nvidia的k8s Device Plugin 实现
GPUManager
是腾讯自研的容器层GPU虚拟化方案,除兼容Nvidia 官方插件的GPU资源管理功能外,还增加碎片资源调度、GPU调度拓扑优化、GPU资源Quota等功能,在容器层面实现了GPU资源的化整为零,而在原理上仅使用了wrap library和linux动态库链接技术,就实现了GPU 算力和显存的上限隔离。

在工程设计上,GPUManager方案包括三个部分,cuda封装库vcuda、k8s device plugin 插件gpu-manager-daemonset和k8s调度插件gpu-quota-admission。

vcuda库是一个对nvidia-ml和libcuda库的封装库,通过劫持容器内用户程序的cuda调用限制当前容器内进程对GPU和显存的使用。

gpu-manager-daemonset是标准的k8s device plugin,实现了GPU拓扑感知、设备和驱动映射等功能。GPUManager支持共享和独占两种模式,当负载里tencent.com/vcuda-core request 值在0-100情况下,采用共享模式调度,优先将碎片资源集中到一张卡上,当负载里的tencent.com/vcuda-core request为100的倍数时,采用独占模式调度,需要注意的是GPUManager仅支持0~100和100的整数倍的GPU需求调度,无法支持150,220类的非100整数倍的GPU需求调度。

gpu-quota-admission是一个k8s Scheduler extender,实现了Scheduler的predicates接口,kube-scheduler在调度tencent.com/vcuda-core资源请求的Pod时,predicates阶段会调用gpu-quota-admission的predicates接口对节点进行过滤和绑定,同时gpu-quota-admission提供了GPU资源池调度功能,解决不同类型的GPU在namespace下的配额问题。
file

方案优点:

  1. 同时支持碎片和整卡调度,提高GPU资源利用率
  2. 支持同一张卡上容器间GPU和显存的使用隔离
  3. 基于拓扑感知,提供最优的调度策略
  4. 对用户程序无侵入,用户无感

方案缺点:

  1. 驱动和加速库的兼容性依赖于厂商
  2. 存在约5%的性能损耗

此项目腾讯云官方已不再支持,社区也处在无人维护状态,亲测cuda12有问题,调用报错

HAMi

HAMi
可为多种异构设备提供虚拟化功能,支持设备共享和资源隔离。
支持的设备:
file

file

HAMi 由多个组件组成,包括统一的 mutatingwebhook、统一的调度器扩展器、不同的设备插件以及针对每种异构 AI 设备的容器内虚拟化技术。
https://github.com/Project-HAMi/HAMi/tree/master

能力:

  • 支持碎片、整卡、多卡调度隔离,支持按量或者按百分比调度隔离
  • 支持指定目标卡型
  • 支持指定目标卡

目前该项目非常活跃,并且支持的cuda版本也比较友好,
>10.1

原文地址:
https://leason.top/Kubernetes-GPU-虚拟化方案.html#more

您是否正在与应用程序中的性能瓶颈作斗争?不要再观望了!Visual Studio 2022 在其性能分析套件中引入了 Meter Histogram(直方图)功能,为您提供了前所未有的分析和可视化直方图数据工具。

Meter Histogram 是 Visual Studio 性能分析套件的一个关键增强。此功能使您能够捕获和分析由各种性能工具生成的直方图数据,从而清晰地了解应用程序数据的分布。通过可视化这些数据,您可以轻松地识别可能导致性能瓶颈的模式和异常。

理解直方图数据可视化

当从直方图仪器记录数据时, Meter Histogram 功能将直方图结果可视化,提供数据随时间分布的清晰直观的表示。这种可视化帮助开发人员快速找到可能影响应用程序性能的模式和异常。

此外,泳道图提供了性能指标的详细、分段视图。通过将数据分解为易于理解的部分,该特性增强了您查明和解决性能问题的能力,从而更容易理解任何瓶颈的根本原因。

在快节奏的软件开发世界中,优化性能至关重要。Visual Studio 2022 中的 Meter Histogram 功能使开发人员能够更深入地了解他们的应用程序,从而获得更高效和高性能的软件。对于任何希望将性能分析提升到下一个水平的人来说,这个工具都是必备的。

感谢您帮助我们持续改进

我们非常重视您的反馈,因为它有助于我们使 Visual Studio 变得更好。如果您对 Meter Histogram 特性或其他任何东西有任何想法或问题,请使用 Visual Studio 中的报告问题工具。

通过 Twitter @VS_Debugger、Twitter @VisualStudio、YouTube 和 LinkedIn 与 VisualStudio 团队保持联系。

感谢您选择 Visual Studio!

原文连接:https://devblogs.microsoft.com/visualstudio/unlocking-insights-with-meter-histogram-in-the-profiler/

前言
本文是关于iced库的部件介绍,iced库是基于rust的GUI库,作者自述是受Elm启发。
iced目前的版本是0.13.1,相较于此前的0.12版本,有较大改动。
本合集是基于新版本的关于分部件(widget)的使用介绍,包括源代码介绍、实例使用等。

环境配置
系统:window10
平台:visual studio code
语言:rust
库:iced 0.13

图像导入及显示

iced中显示图像,可以使用image部件,但image部件不是默认启用的,需要启用feature。在
toml
文件添加:

iced={version="0.13.1", features=["image"]}

然后在代码中导入:

use iced::widget::{button,column,text,row,container,slider,image};

之前我们在介绍为iced窗口设置图标时还提到过一个外部图像库image:

image="0.25.5"

导入image,为了不与iced的image部件名称冲突,需要为image改个名称:

extern crate image as eximg;

1、图像导入

image

图像导入,通常需要使用文件对话框来导入文件,在iced中并没有提供对应的部件,因此,我们需要使用一个外部库,RFD,首先在toml中添加:

rfd="0.15.2"

下面是rfd库支持的功能:
image
包括了文件导入、保存、文件夹选择等,当然rfd也可以支持消息对话框。

use rfd::{FileDialog,MessageDialog};

本文的目的是实现图像文件的导入与显示,并可以调整一些图像属性,如大小、透明度等,因此,我们将添加一个按钮用于触发文件选中对话框,获取图像文件的路径并显示。
我们为结构体添加一个变量用于记录图像路径:

#[derive(Clone)]
struct Counter {
    slidervalue:f32,
    slidervalue2:f32,
    imgpath:String,
}

上述imgpath即图像路径变量,另外两个变量用于接受slider部件的实时值,因为我们需要调整图像的大小和透明度,正好可以适应slider,而关于slider的使用介绍,我们在上一篇博文里有单独介绍:
[rustGUI][iced]基于rust的GUI库iced(0.13)的部件学习(02):滑动条部件实现部件(文本或其他)缩放(slider)
然后我们添加按钮触发消息:

#[derive(Debug,Clone, Copy)]
enum Message {
    SliderChange(f32),
    SliderChange2(f32),
    BtnLoad,
}

上述的BtnLoad是按钮触发消息,另外两个是slider的滑动时触发,就不多说了。
我们希望的是,点击按钮时弹出文件对话框,因此需要在view函数里添加:

let btn_load=button(text("加载图片").size(15)).width(80).height(40)
                    .on_press(Message::BtnLoad);

然后在update函数里更新状态:

Message::BtnLoad =>{
                if let Some(file)=FileDialog::new()
                            .set_title("选择图片")
                            .add_filter("Image",&["png","jpg","jpeg","bmp"])
                            .add_filter("png",&["png"])
                            .set_directory("C://")
                            .pick_file(){
                                
                                println!("当前选择的文件:{:?}",file);
                                self.imgpath=file.display().to_string();

                            };
                
            }

上述代码中,
FileDialog::new()
是rfd库中文件夹选择的用法,看rfd的官方示例:
image
我们在实例中,将获取的图像路径传递给我们设置的变量imgpath。

2、图像显示

当我们获取到了图像的本地路径之后,就可以使用iced的image部件来显示图像,我们首先来看下iced中image的定义:

官方源码
#[derive(Debug)]
pub struct Image<Handle = image::Handle> {
    handle: Handle,
    width: Length,
    height: Length,
    content_fit: ContentFit,
    filter_method: FilterMethod,
    rotation: Rotation,
    opacity: f32,
}

我们主要关注的就是handle这个参数,它的类型是Handle,可以理解为图像的原始数据来源,Handle在iced中的定义:

官方源码
pub enum Handle {
    /// A file handle. The image data will be read
    /// from the file path.
    ///
    /// Use [`from_path`] to create this variant.
    ///
    /// [`from_path`]: Self::from_path
    Path(Id, PathBuf),

    /// A handle pointing to some encoded image bytes in-memory.
    ///
    /// Use [`from_bytes`] to create this variant.
    ///
    /// [`from_bytes`]: Self::from_bytes
    Bytes(Id, Bytes),

    /// A handle pointing to decoded image pixels in RGBA format.
    ///
    /// Use [`from_rgba`] to create this variant.
    ///
    /// [`from_rgba`]: Self::from_rgba
    Rgba {
        /// The id of this handle.
        id: Id,
        /// The width of the image.
        width: u32,
        /// The height of the image.
        height: u32,
        /// The pixels.
        pixels: Bytes,
    },
}

可以看到,Handle定义了三种获取图像原始数据的方式,一种是直接从图像路径获取,一种是图像的字节数组获取,一种是从rgba数据获取。
我们在本文显然是使用第一种方式,即图像路径来获取handle,用于显示。iced官方给了典型的image使用代码:

image("ferris.png")

这是最简单的应用,不过我们在实例使用时,因为希望能调整图像的大小与透明度,所以可以这样写:

let img_handle=image::Handle::from_path(self.imgpath.clone());
let img1=image(img_handle).opacity(opacity).width(img_w).height(img_h);

其中,opacity用于调整透明度,width和height用于调整图像尺寸。
我们来看下实际显示效果:
image

3、图像调整

有了之前slider调整text文字大小的经验,此处就比较简单了,因为我们已经获取了图像路径,显然也就能获取图像尺寸了,这里,我们使用外部的image库来处理。image库的功能还是很强大的,看下它支持的图像处理格式:
image
为了方便管理,我们将
获取图像尺寸
封装为一个函数,与此前的img_to_icon函数放到一起,使用时直接调用即可:

///
/// 获取图片大小
/// 
/// 例:jpg or png -> (w,h)
pub fn get_img_size(path:&str) ->(f32,f32){

    if path != "" {
        let img_re=eximg::open(path);
        match img_re{
            Ok(img)=>{
                return (img.width() as f32,img.height() as f32)
            },
            Err(e)=>{
                let res=MessageDialog::new()
                        .set_title("错误")
                        .set_level(rfd::MessageLevel::Error)
                        .set_buttons(rfd::MessageButtons::Ok)
                        .set_description(&e.to_string())
                        .show();
                if res == rfd::MessageDialogResult::Yes{
                    return (0.0,0.0)
                }
                return (0.0,0.0)
            }
        }
        //return  (img.width() as f32,img.height() as f32)
    }
    return (0.0,0.0)

}
 let img_size=imgprocess::get_img_size(&self.imgpath);

看下完整的view函数:

let btn_load=button(text("加载图片").size(15)).width(80).height(40)
                   .on_press(Message::BtnLoad);
       let sl1=slider(
           0.0..=1.0, 
           self.slidervalue, 
           Message::SliderChange)
                               .height(40).width(200)
                               .step(0.01);
      
       let opacity= self.slidervalue;
       let tt1=text("透明度:").size(15);
       let tt2=text(format!("{:.2}",opacity)).size(15);
       let row1=row![tt1,sl1,tt2,].spacing(10)
                                               .align_y(iced::Center);


       let slider_scale=slider(0.1..=2.0,self.slidervalue2,Message::SliderChange2)
                                                       .height(40).width(200)
                                                       .step(0.01);
       let tt3=text("缩放:").size(15);
       let tt4=text(format!("{:.2}",self.slidervalue2)).size(15);
       let row2=row![tt3,slider_scale,tt4,].spacing(10);

       let tt_imgpath=text("当前图片路径:").size(15);
       let tt_imgpath2=text(format!("{}",self.imgpath)).size(15);
       let row3=row![tt_imgpath,tt_imgpath2,].spacing(10);

       let img_handle=image::Handle::from_path(self.imgpath.clone());
       let img_size=imgprocess::get_img_size(&self.imgpath);
       let img_w=img_size.0 * self.slidervalue2;
       let img_h=img_size.1 * self.slidervalue2;
       let tt_imgsize=text(format!("图片大小:{:?}",img_size)).size(15);
       //let row4=row![].spacing(10);

       let img1=image(img_handle).opacity(opacity).width(img_w).height(img_h);
       //let cont_color=Color::from_rgb(120.0, 120.0, 0.0);
       let cont_color2=iced::color!(0xE9E7E7,0.5);//#E9E7E7FF
       let cont_img=container(img1)
               .width(1000).height(1000)
               .align_x(iced::Center).align_y(iced::Center)
               .style(move |t|styles::mycontainerstyle(t, cont_color2));
       

       column![
           btn_load,
           row1,
           row2,
           row3,
           tt_imgsize,
           cont_img,
       ].align_x(iced::Center)
       .padding(20)
       .into()

它整体显示效果如下:
image

4、综述

先来看看动态演示:
image
结合前文slider的使用,本文介绍了如何在iced中导入图像、显示图像以及调整图像,当然也是很简单的示例。比如导入按钮,后续将会使用菜单替代。

本文github源码附上:
https://github.com/yangshuqi1201/RabbitMQ.Core

【前言】
RabbitMQ提供了五种消息模型,分别是简单模型、工作队列模型、发布/订阅模型、路由模型和主题模型。‌‌

  • 简单模型
    (Simple)‌
    :在这种模式下,一个生产者将消息发送到一个队列,只有一个消费者监听并处理该队列中的消息。这种模型适用于简单的场景,但存在消息可能未被正确处理就被删除的风险。

  • 工作队列模型
    (Work Queue)‌
    :此模型允许多个消费者共同处理队列中的任务,实现负载均衡。生产者将消息发送到队列,多个消费者竞争获取并处理这些消息。这种模型适用于需要高效处理大量任务的场景。

  • ‌发布/订阅模型
    (Publish/Subscribe)‌
    :在这种模式下,生产者将消息发送到一个交换机,交换机以广播
    (Fanout)
    的形式将消息发送给所有订阅了相应队列的消费者。这种模型适用于需要广播消息给所有感兴趣消费者的场景。

  • 路由模型
    (Routing)‌
    :使用direct交换机,生产者发送消息时需要指定路由键,交换机根据路由键将消息路由到相应的队列。这种模型适用于需要对消息进行精确控制的场景。

  • ‌主题模型
    (Topics)‌
    :使用topic交换机,支持使用通配符进行模式匹配,生产者发送的消息可以通过特定的路由键匹配到多个队列。这种模型适用于需要灵活匹配消息的场景。

  • 这些模型在应用场景、消息传递方式和交换机使用上有所不同,用户可以根据具体需求选择合适的模型来优化系统的性能和可靠性。


在之前我使用RabbitMQ实现了Direct类型的交换机实现了基础的生产消费。
RabbitMQ的部署请见:
https://www.cnblogs.com/sq1201/p/18635209
这篇文章我将基于.NET8.0来实现RabbitMQ的广播模式,以及死信队列的基础用法。

【一】首先创建项目,安装RabbitMQ的包(此处我没有选择最新版,因为最新版全面使用异步,关于IModel也改为了IChannel,最新版的语法有待研究),我的项目结构如下:

在这里延伸一下Asp.netcore的小知识点,以实现灵活而强大的配置管理系统。

1. Microsoft.Extensions.Configuration.Abstractions
功能:

  • 提供配置的基础接口和抽象,定义了与应用程序配置相关的核心机制。

  • 是依赖注入(DI)配置的一部分,为应用程序提供了对配置源的抽象访问。
    包含的核心功能和接口:

  • IConfiguration: 表示应用程序的配置,支持按层级结构访问配置值。
    var value = configuration["MySetting"];

  • IConfigurationSection: 表示配置中的一个具体部分,用于访问嵌套的层级配置。
    var section = configuration.GetSection("MySection");
    var subValue = section["SubSetting"];

  • IConfigurationProvider: 表示一个提供配置值的源(如文件、环境变量等)。

  • IConfigurationRoot: 是配置的根对象,支持动态监控配置变更。

适用场景:

  • 基础配置系统的搭建。
  • 当你需要自定义自己的配置提供程序时(例如:将数据库或远程 API 作为配置源)。
    示例:
using Microsoft.Extensions.Configuration;

var builder = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .AddEnvironmentVariables();

var configuration = builder.Build();
Console.WriteLine(configuration["MySetting"]);

2. Microsoft.Extensions.Configuration.Binder

功能:

  • 提供了扩展方法,用于 将配置绑定到强类型对象。
  • 在读取配置时,通常需要将配置值转换为 C# 对象(例如类或结构),而这个包提供了关键的绑定功能。

包含的核心功能:

  • Bind 方法
    : 将配置值绑定到自定义类型。
var myOptions = new MyOptions();
configuration.Bind("MySection", myOptions);
  • Get 方法:
    从配置中直接返回类型化对象。
var myOptions = configuration.Get<MyOptions>("MySection");
  • GetValue 方法:
    直接从配置中获取某个特定的值,并将其转换为指定的类型。
int timeout = configuration.GetValue<int>("Timeout");

适用场景:

  • 强类型配置的支持:
    当需要将配置文件内容(如 JSON、环境变量)与代码中的类型对应时。
  • 简化复杂配置读取逻辑。
    示例:
using Microsoft.Extensions.Configuration;

public class MyOptions
{
    public string Setting1 { get; set; }
    public int Setting2 { get; set; }
}

// 读取配置
var builder = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json");

var configuration = builder.Build();

// 将配置绑定到强类型对象
var options = new MyOptions();
configuration.Bind("MySection", options);

Console.WriteLine(options.Setting1);
Console.WriteLine(options.Setting2);

// 或者直接获取类型化对象
var options2 = configuration.Get<MyOptions>("MySection");

【二】编写appsettings.json文件

【三】定义有关于配置文件信息的DTO

【四】编写RabbitMQ生产消费通用类,RabbitMQManager类。

点击查看代码
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using RabbitMQ.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;

namespace RabbitMQ.Service
{
    public class RabbitMQManager
    {
        //使用数组的部分,是给消费端用的,目前生产者只设置了一个,消费者可能存在多个。
        /// <summary>
        /// RabbitMQ工厂发送端
        /// </summary>
        private IConnectionFactory _connectionSendFactory;
        /// <summary>
        /// RabbitMQ工厂接收端
        /// </summary>
        private IConnectionFactory _connectionReceiveFactory;

        /// <summary>
        /// 连接 发送端
        /// </summary>
        private IConnection _connectionSend;
        /// <summary>
        /// 连接 消费端
        /// </summary>
        private IConnection[] _connectionReceive;

        /// <summary>
        /// MQ配置信息
        /// </summary>
        public MqConfigInfo _mqConfigs;

        /// <summary>
        /// 通道 发送端
        /// </summary>
        private IModel _modelSend;
        /// <summary>
        /// 通道 消费端
        /// </summary>
        private IModel[] _modelReceive;

        /// <summary>
        /// 事件
        /// </summary>
        private EventingBasicConsumer[] _basicConsumer;

        /// <summary>
        /// 消费者个数
        /// </summary>
        public int _costomerCount;
        public RabbitMQManager(IConfiguration configuration)
        {
            _mqConfigs = new MqConfigInfo
            {
                Host = configuration["MQ:Host"],
                Port = Convert.ToInt32(configuration["MQ:Port"]),
                User = configuration["MQ:User"],
                Password = configuration["MQ:Password"],
                ExchangeName = configuration["MQ:ExchangeName"],
                DeadLetterExchangeName = configuration["MQ:DeadLetterExchangeName"],
                DeadLetterQueueName = configuration["MQ:Queues:2:QueueName"]
            };
        }

        /// <summary>
        /// 初始化生产者连接
        /// </summary>
        public void InitProducerConnection()
        {
            Console.WriteLine("【开始】>>>>>>>>>>>>>>>生产者连接");

            _connectionSendFactory = new ConnectionFactory
            {
                HostName = _mqConfigs.Host,
                Port = _mqConfigs.Port,
                UserName = _mqConfigs.User,
                Password = _mqConfigs.Password
            };
            if (_connectionSend != null && _connectionSend.IsOpen)
            {
                return; //已有连接
            }

            _connectionSend = _connectionSendFactory.CreateConnection(); //创建生产者连接

            if (_modelSend != null && _modelSend.IsOpen)
            {
                return; //已有通道
            }

            _modelSend = _connectionSend.CreateModel(); //创建生产者通道

            // 声明主交换机 为 Fanout 类型,持久化
            _modelSend.ExchangeDeclare(
                exchange: _mqConfigs.ExchangeName,
                type: ExchangeType.Fanout,
                durable: true, // 明确设置为持久化
                autoDelete: false,
                arguments: null
            );

            // 声明死信交换机 为Fanout类型,持久化
            _modelSend.ExchangeDeclare(
                exchange: _mqConfigs.DeadLetterExchangeName,
                type: ExchangeType.Fanout,
                durable: true, // 明确设置为持久化
                autoDelete: false,
                arguments: null
            );

            // 声明死信队列
            _modelSend.QueueDeclare(
                queue: _mqConfigs.DeadLetterQueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            // 绑定死信队列到死信交换机
            _modelSend.QueueBind(_mqConfigs.DeadLetterQueueName, _mqConfigs.DeadLetterExchangeName, routingKey: "");

            Console.WriteLine("【结束】>>>>>>>>>>>>>>>生产者连接");
        }


        /// <summary>
        /// 消息发布到交换机(Fanout模式)
        /// </summary>
        /// <param name="message">消息内容</param>
        /// <param name="exchangeName">交换机名称</param>
        /// <returns>发布结果</returns>
        public async Task<(bool Success, string ErrorMessage)> PublishAsync(string message, string exchangeName)
        {
            try
            {
                byte[] body = Encoding.UTF8.GetBytes(message);

                await Task.Run(() =>
                {
                    _modelSend.BasicPublish(
                        exchange: exchangeName,
                        routingKey: string.Empty, // Fanout 模式无需 RoutingKey
                        basicProperties: null,
                        body: body
                    );
                });

                return (true, string.Empty);
            }
            catch (Exception ex)
            {
                return (false, $"发布消息时发生错误: {ex.Message}");
            }
        }

        /// <summary>
        /// 消费者初始化连接配置
        /// </summary>
        public void InitConsumerConnections(List<QueueConfigInfo> queueConfigs)
        {
            Console.WriteLine("【开始】>>>>>>>>>>>>>>>消费者连接");

            //创建单个连接工厂
            _connectionReceiveFactory = new ConnectionFactory
            {
                HostName = _mqConfigs.Host,
                Port = _mqConfigs.Port,
                UserName = _mqConfigs.User,
                Password = _mqConfigs.Password
            };
            _costomerCount = queueConfigs.Sum(q => q.ConsumerCount); // 获取所有队列的消费者总数

            // 初始化数组         
            _connectionReceive = new IConnection[_costomerCount];
            _modelReceive = new IModel[_costomerCount];
            _basicConsumer = new EventingBasicConsumer[_costomerCount];

            int consumerIndex = 0; // 用于跟踪当前消费者索引

            foreach (var queueConfig in queueConfigs)
            {
                for (int i = 0; i < queueConfig.ConsumerCount; i++)
                {
                    string queueName = queueConfig.QueueName;

                    // 创建连接
                    _connectionReceive[consumerIndex] = _connectionReceiveFactory.CreateConnection();
                    _modelReceive[consumerIndex] = _connectionReceive[consumerIndex].CreateModel();
                    _basicConsumer[consumerIndex] = new EventingBasicConsumer(_modelReceive[consumerIndex]);

                    // 声明主交换机(确保交换机存在)
                    _modelReceive[consumerIndex].ExchangeDeclare(_mqConfigs.ExchangeName, ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

                    // 声明死信交换机为 Fanout 类型
                    _modelReceive[consumerIndex].ExchangeDeclare(_mqConfigs.DeadLetterExchangeName, ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

                    if (queueName == _mqConfigs.DeadLetterQueueName)
                    {
                        // 死信队列的声明和绑定
                        _modelReceive[consumerIndex].QueueDeclare(
                            queue: queueName,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null
                        );

                        // 只将死信队列绑定到死信交换机
                        _modelReceive[consumerIndex].QueueBind(queueName, _mqConfigs.DeadLetterExchangeName, routingKey: "");
                    }
                    else
                    {
                        // 业务队列的声明和绑定
                        _modelReceive[consumerIndex].QueueDeclare(
                            queue: queueName,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: new Dictionary<string, object>
                            {
                        { "x-dead-letter-exchange", _mqConfigs.DeadLetterExchangeName },
                        { "x-dead-letter-routing-key", "" }
                            }
                        );

                        // 只将业务队列绑定到主交换机
                        _modelReceive[consumerIndex].QueueBind(queueName, _mqConfigs.ExchangeName, routingKey: "");
                    }

                    

                    // 设置QoS,确保每次只处理一个消息
                    _modelReceive[consumerIndex].BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    consumerIndex++;
                }
            }

            Console.WriteLine("【结束】>>>>>>>>>>>>>>>消费者连接初始化完成");
        }

        /// <summary>
        /// 消费者连接
        /// </summary>
        public async Task ConncetionReceive(int consumeIndex, string exchangeName, string queueName, Func<string, Task> action)
        {

            await StartListenerAsync(async (model, ea) =>
            {
                try
                {
                    byte[] message = ea.Body.ToArray();
                    string msg = Encoding.UTF8.GetString(message);
                    Console.WriteLine($"队列 {queueName},消费者索引 {consumeIndex} 接收到消息:{msg}");

                    await action(msg);
                    _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);//确认消息
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"处理消息时发生错误: {ex.Message}");
                    // 拒绝消息且不重新入队,触发死信机制
                    _modelReceive[consumeIndex].BasicNack(ea.DeliveryTag, false, false);
                }

            }, queueName, consumeIndex);
        }

        /// <summary>
        /// 手动确认消费机制
        /// </summary>
        /// <param name="handler"></param>
        /// <param name="queueName"></param>
        /// <param name="consumeIndex"></param>
        /// <returns></returns>
        private async Task StartListenerAsync(AsyncEventHandler<BasicDeliverEventArgs> handler, string queueName, int consumeIndex)
        {
            _basicConsumer[consumeIndex].Received += async (sender, ea) => await handler(sender, ea);
            _modelReceive[consumeIndex].BasicConsume(
                queue: queueName,
                autoAck: false,
                consumer: _basicConsumer[consumeIndex]
            );

            Console.WriteLine($"队列 {queueName} 的消费者 {consumeIndex} 已启动监听");
        }


    }
}

【五】编写 RabbitMQService服务类,负责初始化 RabbitMQManager,并调用其方法完成 RabbitMQ 的连接和配置。

using Microsoft.Extensions.Configuration;
using RabbitMQ.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;


namespace RabbitMQ.Service
{
    public class RabbitMQService
    {
        private readonly RabbitMQManager _rabbitmqManager;

        public RabbitMQService(IConfiguration configuration)
        {
            _rabbitmqManager = new RabbitMQManager(configuration);

           
            //初始化生产者连接
            _rabbitmqManager.InitProducerConnection();

            var queueConfigs = configuration.GetSection("MQ:Queues").Get<List<QueueConfigInfo>>();

            //初始化消费者连接
            _rabbitmqManager.InitConsumerConnections(queueConfigs);
        }

        public RabbitMQManager Instance => _rabbitmqManager;
    }
}

【六】编写ActionService服务类,来实现模拟的消费者调用方法以及死信队列处理死信消息的逻辑

点击查看代码
namespace RabbitMQ.Service
{
    public class ActionService
    {
        /// <summary>
        /// 付款
        /// </summary>
        public async Task ExActionOne(string msg)
        {      
            Console.WriteLine($"消费成功了【{msg}】消息以后正在执行付款操作");
            // 模拟失败条件
            if (msg.Contains("fail"))
            {
                throw new Exception("Simulated processing failure in ExActionOne");
            }
            await Task.Delay(1000); // 替换 Thread.Sleep
        }
        /// <summary>
        /// 库存扣减
        /// </summary>
        public async Task ExActionTwo(string msg)
        {
            Console.WriteLine($"消费成功了【{msg}】消息以后正在执行库存扣减操作");
            // 模拟失败条件
            if (msg.Contains("fail"))
            {
                throw new Exception("Simulated processing failure in ExActionTwo");
            }
            await Task.Delay(1000); // 替换 Thread.Sleep
        }
        /// <summary>
        /// 处理死信队列的消息
        /// </summary>
        public async Task ExActionDeadLetter(string message)
        {
            Console.WriteLine($"处理死信消息: {message}");
            // 在这里可以记录日志、发送通知等
            await Task.Delay(1000);
        }
    }
}

【七】在启动项Program文件中注入我们的服务注册为单例模式

【八】在Webapi控制器中编写测试方法,模拟实现给主交换机发送消息,当消费失败的时候,消息被发送到死信队列由死信队列消费进行后续操作

namespace RabbitMQ.Core.Controllers
{
    [Route("api/[controller]/[action]")]
    [ApiController]
    public class TestController : ControllerBase
    {
        private readonly RabbitMQService _rabbitmqService;
        private readonly IConfiguration _configuration;

        public TestController(RabbitMQService rabbitmqService, IConfiguration configuration)
        {
            _rabbitmqService = rabbitmqService;
            _configuration = configuration;
        }
        /// <summary>
        /// 测试rabbitmq发送消息
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        public async Task<IActionResult> TestRabbitMqPublishMessage(string pubMessage)
        {

            var result = await _rabbitmqService.Instance.PublishAsync(
                pubMessage,
                _configuration["MQ:ExchangeName"]
            );

            if (!result.Success)
            {
                Console.WriteLine($"【生产者】消息发送失败:{result.ErrorMessage}");
            }

            Console.WriteLine("【生产者】消息发送完成");

            return Ok();
        }
    }
}

【九】演示结果

PS:
我这里的死信交换机声明的还是广播模式,其实对于死信队列来说,交换机声明为Direct模式,使用RouteKey去匹配队列也是完全没问题的,而且针对于一条消息,不同队列有不同的消费结果,具体实现场景是所有队列的消费者都消费失败以后才算是一个失败的消息还是说有队列消费成功就不算是失败消息,这都是要结合实际业务场景去进行构思的。