1、背景

在当前项目中,需要实现Web端直接播放RTSP视频流。该功能的核心目标是使得用户能够通过浏览器观看来自不同品牌的IPC(Internet Protocol Camera)设备的实时视频流。主要的IPC设备来自海康威视、大华科技以及宇视等厂商,这些设备普遍使用RTSP协议来传输视频数据。

  • 点播视频
    :MP4, WebM, MKV, AVI。
  • 实时流媒体
    :RTMP, HLS, DASH, RTSP, WebRTC。
  • 编码格式
    :H.264, H.265, VP8, VP9, AV1。

2、基于协议的视频流格式

这些格式用于实时视频传输和直播,通常用于广播、实时互动视频或流媒体平台:

  • RTMP (Real-Time Messaging Protocol)


    • 由Adobe开发,主要用于直播流媒体传输,通常结合Flash播放器使用。
    • 具有低延迟的特点,适合实时视频流和互动性强的视频。
    • 适用场景
      :直播平台(如Twitch、YouTube Live),实时互动视频。
  • HLS (HTTP Live Streaming)


    • 由Apple提出,基于HTTP协议,能够将视频文件分割成一系列小片段(TS文件),并通过M3U8播放列表传输。
    • HLS支持自适应比特率,能够根据网络状况调整视频质量。
    • 适用场景
      :直播和点播流媒体,广泛应用于iOS和Web端(如Netflix、Hulu)。
      video.js可直接播放HLS(H264)视频流
  • DASH (Dynamic Adaptive Streaming over HTTP)


    • 又称MPEG-DASH,是HLS的开源替代品,支持自适应比特率流,能够根据网络条件动态调整视频质量。
    • 与HLS类似,但不依赖Apple技术,更加开放。
    • 适用场景
      :视频点播、直播流媒体。
  • RTSP (Real-Time Streaming Protocol)


    • 主要用于控制流媒体服务器,支持实时流传输。RTSP本身并不传输视频数据,通常结合RTP(Real-Time Transport Protocol)一起使用。
    • 适用场景
      :视频监控、IP摄像头、实时视频播放。
  • WebRTC (Web Real-Time Communication)


    • 主要用于浏览器之间的实时视频通话,不需要插件,直接通过WebRTC协议进行音视频流的传输。
    • 适用场景
      :视频会议、P2P视频通话。

3、直播视频编码格式

这些编码格式用于压缩视频数据,降低带宽需求,提供更好的视频质量:

  • H.264 (AVC, Advanced Video Coding)


    • 最常用的视频编码格式,广泛应用于Web视频、流媒体、高清视频录制等场景。
    • 适用场景
      :几乎所有的视频流和播放设备,尤其是Web视频、4K视频。
  • H.265 (HEVC, High Efficiency Video Coding)


    • 是H.264的后继者,提供比H.264更高的压缩效率,适合4K视频流和高质量视频存储。
    • 需要更高的解码能力,适用于现代硬件。
    • 适用场景
      :4K视频流、超高清视频、网络视频流。
  • VP8/VP9


    • 由Google推出,VP8是WebM的标准视频编码,VP9是其后继者,提供类似H.265的压缩效率。
    • 适用场景
      :YouTube等视频平台,尤其是需要开放格式的场景。
  • AV1


    • 开放且高效的视频编码格式,目标是提供更高的压缩效率,适用于未来的视频流和存储需求。
    • 目前的硬件支持较少,但越来越多的浏览器和平台开始支持AV1。
    • 适用场景
      :未来的视频流、4K及以上视频内容。

4、实时视频流格式

适用于需要低延迟的实时视频流,常见于直播、视频会议、IP摄像头等场景:

  • MJPEG (Motion JPEG)


    • 每一帧图像都以JPEG格式压缩,适合低延迟要求的视频流,通常用于监控和摄像头。
    • 适用场景
      :视频监控、IP摄像头。
  • VP8/VP9


    • 与视频流格式一样,VP8/VP9也适用于实时视频流传输,特别是在WebRTC中广泛使用。

5、其他协议和格式

  • MPEG-2


    • 传统的视频流格式,广泛用于电视广播和DVD视频,虽然现在被H.264和H.265替代,但仍在某些场合使用。
    • 适用场景
      :电视广播、卫星电视、DVD视频播放。
  • SRT (Secure Reliable Transport)


    • 一种低延迟、安全的视频流协议,特别适用于直播内容传输,确保在不稳定的网络条件下也能保证视频质量。
    • 适用场景
      :实时直播、电视台直播。

6、在web上实现播放rtsp/rtmp视频流,由于浏览器不 能自定义协议,不能直接播放,主要解决方案如下:

(1)、浏览器插件方案

这种方案依赖于浏览器插件或者控件,在浏览器中播放 RTSP/RTMP 流,但这种方法存在一些局限性,尤其在移动平台上不可用。

a.
VLC浏览器控件(已过期)

  • 简介
    :VLC是一个开源的媒体播放器,曾经可以通过浏览器插件来播放RTSP流。
  • 缺点
    :VLC控件已经过时,仅支持老版本的浏览器,而且在移动端完全不支持。
  • 推荐度
    :不推荐,已经过时,无法满足现代Web的需求。

b.
Flash控件(已淘汰)

  • 简介
    :Flash曾是Web视频流的标准,通过Flash插件可以播放RTMP流。
  • 缺点
    :Flash已被大多数现代浏览器淘汰,不支持移动平台,且存在安全问题。
  • 推荐度
    :不推荐,已经被现代浏览器废弃。

总结
:浏览器插件方案已经过时,不能满足现代Web应用的需求,不推荐在新项目中使用。

(2)、中间服务器方案

为了在Web端播放RTSP流,通常需要通过某些中间服务进行转换或处理,以下是几种主流的方案:

a.
RTSP转换为HTTP流

  • 工作原理
    :在服务器端使用工具(如FFmpeg)将RTSP流转为HTTP流(如HLS、DASH等),然后通过常规的HTML5
    <video>
    标签进行播放。
  • 优点

    • 支持现代浏览器和移动设备(因为HLS是基于HTTP的,支持跨平台)。
    • 使用FFmpeg等工具进行流转换,广泛支持。
  • 缺点

    适用场景
    :播放静态的RTSP流、点播视频流。

    • 转码延迟较高,实时性差,适用于点播或低延迟要求不高的场景。
    • 需要消耗较多的服务器资源进行转码。

b.
WebRTC技术

  • 工作原理
    :通过WebRTC将RTSP流转换为WebRTC流(RTSP to WebRTC),并在浏览器中播放。
  • 优点

    • 支持低延迟,适用于实时视频流(如视频会议、实时直播等)。
    • 无需插件,直接在浏览器中运行,支持现代浏览器(如Chrome、Firefox等)。
  • 缺点

    适用场景
    :实时视频流、低延迟直播。

    • 设置复杂,需要专门的服务器和协议转换。
    • 需要较高的技术要求和服务器支持。

c.
流媒体服务器

  • 工作原理
    :使用流媒体服务器(如Nginx-RTMP、Wowza Streaming Engine等)来处理RTSP流,并通过HTTP协议(如HLS、DASH、RTMP等)转发到Web端。
  • 优点

    • 适用于大规模的流媒体分发,支持RTSP、RTMP等协议。
    • HLS或DASH格式支持所有现代浏览器,尤其是移动设备。
    • 支持负载均衡、高可用性等企业级功能。
  • 缺点

    适用场景
    :大规模直播分发、点播视频


    • 需要搭建并维护流媒体服务器。
    • 服务器资源消耗较大,特别是在高并发下。

d.
使用第三方云服务

  • 工作原理
    :将RTSP流上传到云服务提供商(如AWS、Google Cloud、Alibaba Cloud等)的服务器,然后通过他们提供的API或者工具进行播放。
  • 优点

    • 省去自己搭建和维护流媒体服务器的麻烦。
    • 高可用、低延迟,支持全球分发。
    • 通常具有强大的后台分析、监控功能。
  • 缺点

    适用场景
    :需要稳定、高可用的视频流服务,适用于直播、实时流媒体服务。

    • 成本较高,特别是在流量大的情况下。
    • 依赖第三方服务,可能存在网络延迟或服务中断的风险。

7、本案例使用开源项目webRTC实时播放视频流

前端页面使用vue.js+elementui框架集成(
webrtc-streamer
目前rtsp视频流仅支持H264,目前不支持H265视频格式,下面有讲解另外一种模式播放H265的视频流

webrtc开源项目地址:https://github.com/mpromonet/webrtc-streamer/releases

webrtc-streamer启动页面

集成页面vue代码(组件)

<template>
<div id="video-contianer">
<video
class
="video"ref="video"preload="auto"autoplay="autoplay"muted
controls
:width
="width":height="height"
/>
<!-- <div
class
="mask"@click="handleClickVideo":class="{ 'active-video-border': selectStatus }"
></div> -->
</div>
</template>

<script>import WebRtcStreamer from"../../../public/webrtcstreamer";
const WEBRTC_STREAMER_URL
= 'http://192.168.168.194:8000'exportdefault{
name:
"videoCom",
props: {
rtsp: {
type: String,
required:
true,
},
isOn: {
type: Boolean,
default: false,
},
spareId: {
type: Number,
},
selectStatus: {
type: Boolean,
default: false,
},
height: {
type: Number,
default: 360,
},
width: {
type: Number,
default: 440},
},
data() {
return{
socket:
null,
result:
null, //返回值
pic: null,
webRtcServer:
null,
clickCount:
0, //用来计数点击次数
};
},
watch: {
rtsp() {
//do something
console.log(this.rtsp);this.webRtcServer && this.webRtcServer.disconnect();this.initVideo();
},
},
destroyed() {
this.webRtcServer && this.webRtcServer.disconnect();
},
beforeCreate() {
window.onbeforeunload
= () =>{this.webRtcServer && this.webRtcServer.disconnect();
};
},
created() {},
mounted() {
this.initVideo();
},
methods: {
initVideo() {
try{//连接后端的IP地址和端口
this.webRtcServer && this.webRtcServer.disconnect();var url =WEBRTC_STREAMER_URL;this.webRtcServer = newWebRtcStreamer(this.$refs.video,
url
//此处需要改为所部署的服务器IP
);//向后端发送rtsp地址
this.webRtcServer.connect(this.rtsp);
}
catch(error) {
console.log(error);
}
},
/*处理双击 单机*/dbClick() {this.clickCount++;if (this.clickCount === 2) {this.btnFull(); //双击全屏
this.clickCount = 0;
}
setTimeout(()
=>{if (this.clickCount === 1) {this.clickCount = 0;
}
},
250);
},
/*视频全屏*/btnFull() {
const elVideo
= this.$refs.video;if(elVideo.webkitRequestFullScreen) {
elVideo.webkitRequestFullScreen();
}
else if(elVideo.mozRequestFullScreen) {
elVideo.mozRequestFullScreen();
}
else if(elVideo.requestFullscreen) {
elVideo.requestFullscreen();
}
},
/*ison用来判断是否需要更换视频流
dbclick函数用来双击放大全屏方法
*/handleClickVideo() {if (this.isOn) {this.$emit("selectVideo", this.spareId);this.dbClick();
}
else{this.btnFull();
}
},
},
};
</script>

<style scoped lang="scss">.active-video-border {
border: 2px salmon solid;
}
#video
-contianer {
position: relative;
//width: 100%;
//height: 100%;
//.video {
//// width: 100%;
//// height: 100%;
//// object-fit: cover;
//}
.mask {
position: absolute;
top:
0;
left:
0;
width:
100%;
height:
100%;
cursor: pointer;
}
}
</style>
<Vedio rtsp="rtsp://admin:abc123456@192.168.168.168/video" :height="height" :width="width" ref="vedioSon"></Vedio>
<script>
import Vedio from "../../vedio/vedio.vue";
export default {
name: "User",
components: {Vedio },
}
</srcipt>

注意摄像头视频编码格式H264,webrtc目前不支持H265编码格式

8、使用
EasyMedia
流媒体服务进行视频转码,支持H265编码格式,还可以直接将海康视频直接解码播放

github地址:https://github.com/Janix520/EasyMedia
Springboot、netty实现的http-flv、websocket-flv流媒体服务(可用于直播点播),支持rtsp、h264、h265等、rtmp等多种源,h5纯js播放(不依赖flash),不需要依赖nginx等第三方,延迟大部分在1-5秒内(已经支持转复用,h264的流自动转封装,超低延迟)。
播放地址:http://192.168.168.154:8866/live?url=rtsp://admin:abc123456@192.168.168.168/video

前端使用flv.js就能直接播放视频流,或者使用西瓜播放器(npm install xgplayer npm install xgplayer-flv)

<!DOCTYPE html>
<htmllang="en">
<head>
<metacharset="UTF-8">
<metaname="viewport"content="width=device-width, initial-scale=1.0">
<title>FLV.js Demo</title>
<scriptsrc="https://cdn.jsdelivr.net/npm/flv.js/dist/flv.min.js"></script>
</head>
<body>
<h1>FLV.js </h1>
<videoid="videoElement"controls width="640"height="360"></video>

<script>
//检查浏览器是否支持 MSE (Media Source Extensions)
if(flvjs.isSupported()) {//初始化 FLV.js 播放器
const videoElement=document.getElementById('videoElement');
const flvPlayer
=flvjs.createPlayer({
type:
'flv',
url:
'ws://192.168.168.154:8866/live?url=rtsp://admin:abc123456@192.168.168.168/video',
});
flvPlayer.attachMediaElement(videoElement);
flvPlayer.load();
flvPlayer.play();
}
else{
alert(
'FLV.js 不支持您的浏览器');
}
</script>
</body>
</html>

西瓜视频播放器vue demo

<template>
<div>
<!--视频播放器容器-->
<divid="video-container" />
</div>
</template>

<script>
//引入西瓜播放器
import Player from'xgplayer';
import FlvPlugin from
"xgplayer-flv";
import
'xgplayer/dist/index.min.css'exportdefault{
name:
'XGPlayerFlvComponent',
data() {
return{
player:
null,//存储播放器实例
};
},
mounted() {
//初始化西瓜播放器
this.player= newPlayer({
id:
'video-container',//对应 HTML 容器的 ID
url:'http://192.168.168.154:8866/live?url=rtsp://admin:abc123456@192.168.168.168/video',//视频 URL
poster:'https://lf9-cdn-tos.bytecdntp.com/cdn/expire-1-M/byted-player-videos/1.0.0/xgplayer-demo-thumbnail.jpg',//视频封面
volume:0.6,//默认音量
playsinline:true,//移动端内嵌播放
autoplay:false,//是否自动播放
pip:true,//画中画支持
keyShortcut:'on',//启用快捷键
playbackRate: [0.5,1,1.5,2],//倍速播放选项
plugins: [FlvPlugin]
});
},
beforeDestroy() {
//销毁播放器,防止内存泄漏
if(this.player) {this.player.destroy();this.player= null;
}
},
};
</script>

<stylescoped>#video-container{margin:0 auto;display:flex;justify-content:center;align-items:center;background-color:#000;
}
</style>

视频流地址测试工具VLC,万能解码器

网络抓包这个话题,有一定开发经验的多少都有所了解,常用软件Wireshark,Fiddler用起来也非常傻瓜,本文不会涉及。 一般的介绍网络抓包的文章,到能抓到数据包通常就结束了。但显然认识工具是一会儿事儿,理解本质,清楚实践中啥时候该用,是另外一会儿事儿。让新人小白自行举一反三,可能多少有些困难。

近期正好参与了一些网络相关问题的排查, 在此从实践角度备忘下,网络抓包的本质意义,以及使用中的一些具体问题。

一. 分歧终端机——抓包

开发A: 我给你发了a,b和c

开发B: 我没收到b ,你肯定漏了。

开发A: 我肯定发了,我日志写了

开发B: 你肯定没发,我的日志只有a,b

……发了!没发!发了!没发!我不管了!我也不管了!

以上场景有些阅历的同志,应该都似曾相识。这种到底发了没发,收了没收的问题。打官司打到你那里,要如何断案呢?

如果双方都开源可见,人都在你治下,可以看看两边代码, 日志可能写的位置不正确,不是发送的最末端,或者接收的最前端,还可能可以解决。

如果是你调用第三方黑盒,可能就不那么配合了。

你:发你们这个指令你们没反应啊

第三方:你程序逻辑问题吧。

你:真发了,我日志你看。

第三方:你日志写的不对。

你:我日志写的对,不信你看代码。

第三方:看不懂,别人都没问题。你可能网络有问题……

你:我网络没问题。

第三方:设备可能信号不好吧, 你拿着它移动下。。。多移动下。

此时就需要引入权威第三方,网络抓包工具。做web开发的同学可能并没非常清楚的意识到,浏览器的请求网络调试功能,就是一个简单易用的抓包工具,所以Web页面调试,只需要看浏览器的网络调试,究竟是发的有问题还是接收处理问题,一目了然,很少扯皮。

对于非web开发的同学,就没有这么好的调试基础设施了。W
ireshark之类的抓包工具,其实就是实现了操作系统层面的网络调试工具。

抓包工具是调试双方的双向代理人, 究竟谁给谁发了啥,或者没发啥。它一清二楚,有最终解释权。
同时,抓包工具能够暴露更多的细节,TCP协议如今被广泛使用,对于发了,发不到,或者延时很大,能够观察到TCP协议层次的传输控制过程。

总结来说,通过在两端之间放置抓包工具可以分析,全部的通信异常的情况:
发没发出,发没发到,传输质量如何。

对于前面说的类型的官司,可以让他们装个抓包工具自己看。涉及第三方的 ,出示下抓包日志,也会让对方不把你作为新手小白, 重视你反馈的问题。

二、网络链路问题排查实践

现实中的问题通常比书本上复杂。 通信不会简单的发生于两个PC之间,只需要简单的各装一个抓包软件就能够使得通信过程一目了然。

你需要找出通信问题的设备其中一端,可能是一个封闭系统
,甚至嵌入式设备。封闭的系统上无法安装常规的抓包软件。分为几种情况。

1. 接入同一个集线器

集线器实际已经淡出历史舞台了,但是它可以让我们理解局域网抓包的原理及限制。

集线器不具备智能转发功能,会将一个端口(物理)接收到的数据,广播至所有连接的端口。
收到的网卡会根据MAC地址,忽略不是发给自己的包。(能收到只是自觉忽略)

网卡混杂模式,设置为混杂模式,会使得网卡不再直接忽略与自己无关的数据包,所有流经数据包都能够被捕获。

因为,集线器不智能,所以抓包变得很简单。只要插到相同的集线器上,所有发给调试设备和调试设备发出的网络数据包,都能够抓包PC的网卡捕获,从而实现抓包分析。

2. 接入同一个交换机

交换机比集线器智能了一些,其中的一个特性就是,智能转发, 该发给那个口的包,就发给那个口。
交换机不再无脑广播

导致了,按照交换机的接法,只简单连接,已经无法抓包了。因为待调试设备的数据包,并不会流经抓包pc接入的插口的网线,确实更安全了。

但因为有时候网络监控又是正义且合理的需求,于是交换机,多数具有一个类似于
端口镜像或者端口监控的功能。

其作用是将被监控端口的流量,复制到监控端口, 使得监控端口的网卡,可以捕获数据包。

在交换机设置中,打开对应的功能,又可以抓包啦。注意端口指的是物理插口,不要与网络端口混淆。

3.接入Wi-Fi

通过Wi-Fi抓包又引入了一些不同,比较靠谱的方案是
在上级交换机进行端口镜像,Wi-Fi路由器设置为AP模式,无线转有线,相对简单易于理解。

对于通过无线Wi-Fi进行抓包,我并没有做过实际测试,
仅做原理性的讨论。

物理上 Wi-Fi路由器有支持多频的,2.4G与5G。 连接两个频段的设备,我理解应该是不能捕获对方数据包的。

有鉴权加密的Wifi 路由器,需要用户名密码的, 会在通信时叠加应用加密协议。可能可以抓包,但应该无法解密。知道用户名密码,理论上可以,要额外根据wifi加密协议进行解密。

对于开放的Wi-Fi,无鉴权,不加密,对于连接在同频段,频道的设备,应该是可以捕获其他设备的数据包,正常的抓包。

~实际情况如何,望有实际经验的大神补充

在Excel中添加数据条是一种数据可视化技巧,它通过条形图的形式在单元格内直观展示数值的大小,尤其适合比较同一列或行中各个单元格的数值。这种表示方式可以让大量的数字信息一目了然。本文将介绍如何
使用Python在Excel中的指定单元格区域添加数据条

Python Excel库

Spire.XLS for Python 库
支持设置各种Excel条件格式以及其他Excel文档操作。该库可以直接使用以下pip命令安装:(也可以下载产品包后再从本地路径安装)

pip install Spire.XLS

Python 在Excel 中的添加数据条

为Excel中指定单元格区域添加数据条,需要通过Spire.XLS for Python库提供的条件格式的各类接口实现。主要步骤如下:

  1. 使用
    LoadFromFile()
    方法加载 Excel文件;
  2. 获取指定工作表,然后在其中添加条件格式并返回
    XlsConditionalFormats
    对象。
  3. 使用
    AddRange()
    方法指定需要应用条件格式化的单元格区域。
  4. 使用
    AddCondition()
    方法添加条件,然后通过
    IConditionalFormat.FormatType
    属性将其类型设置为数据条DataBar。
  5. 设置数据条的填充效果和颜色。
  6. 使用
    SaveToFile()
    方法保存文档。


Python代码:

from spire.xls import *
from spire.xls.common import *

#加载Excel文件
workbook =Workbook()
workbook.LoadFromFile(
"报告1.xlsx")#获取第一张工作表 sheet =workbook.Worksheets[0]#在指定单元格区域应用条件格式 xcfs =sheet.ConditionalFormats.Add()
xcfs.AddRange(sheet.Range[
"D2:D12"])#添加数据条 format =xcfs.AddCondition()
format.FormatType
=ConditionalFormatType.DataBar#设置填充效果和颜色 format.DataBar.BarFillType =DataBarFillType.DataBarFillGradient
format.DataBar.BarColor
=Color.get_Red()#保存文档 workbook.SaveToFile("Excel数据条.xlsx", ExcelVersion.Version2016)
workbook.Dispose()

输出文件:


Spire.XLS for Python库还支持设置其他条件格式,如交替行颜色、突出显示高于或低于平均值的数值、突出显示排名前几和后几位的数值等。

https://www.e-iceblue.cn/xls_python_conditional_formatting/python-apply-conditional-formatting-in-excel.html

一、引言

上一篇博客
分享了使用
Channel
来实现针对大量数据的多线程异步处理,感谢大哥们在评论中提出的宝贵的问题和建议!本篇将分享使用
ActionBlock
如何实现,欢迎在评论区留言讨论。

二、ActionBlock介绍

什么是 ActionBlock?

ActionBlock
是 .NET 中 TPL Dataflow 库的一部分,用于处理数据流和并行任务。它提供了一种简单而强大的方式来处理并行任务,并且可以轻松地实现生产者-消费者模式。

ActionBlock 的特点

  • 并行处理:
    ActionBlock
    可以配置为并行处理多个任务,从而提高处理效率
  • 异步编程:
    支持异步编程模型,可以避免阻塞线程,提高应用程序的响应速度和吞吐量
  • 数据流控制:
    可以通过设置最大并行度和其他选项来控制数据流的处理方式
  • 任务调度:
    可以用于调度和管理并行任务,确保任务按预期执行

ActionBlock 的使用场景

  • 生产者-消费者模式
    :可以用于实现生产者-消费者模式,其中生产者将数据发送到
    ActionBlock
    ,消费者从
    ActionBlock
    中读取数据并进行处理
  • 数据流处理
    :适用于需要处理大量数据并且需要并行处理的场景,例如日志处理、数据转换等
  • 任务调度
    :可以用于调度和管理并行任务,确保任务按预期执行

ActionBlock 的基本用法

使用
ActionBlock
非常简单,主要步骤如下:

  1. 创建 ActionBlock:定义一个 ActionBlock,指定要执行的操作和并行选项
  2. 发送数据到 ActionBlock:使用
    SendAsync
    方法将数据发送到 ActionBlock
  3. 完成 ActionBlock:在所有数据发送完成后,调用
    Complete
    方法通知 ActionBlock 不再接收新的数据
  4. 等待处理完成:使用
    Completion
    属性等待所有数据处理完成

以下是一个简单的示例代码,展示了如何使用 ActionBlock:

using System.Threading.Tasks.Dataflow;

var actionBlock = new ActionBlock<int>(async item =>
{
    // 模拟异步处理
    await Task.Delay(100);
    Console.WriteLine($"Processed item: {item}");
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4 // 设置最大并行度
});

// 发送数据到 ActionBlock
for (int i = 0; i < 10; i++)
{
    await actionBlock.SendAsync(i);
}

// 完成 ActionBlock
actionBlock.Complete();
// 等待处理完成
await actionBlock.Completion;

Console.WriteLine("All items processed.");

三、假设场景

假设我们有一组数据需要经过两个步骤的处理。每个数据项都需要进行初步处理,然后进行进一步处理。希望步骤2可以在步骤1产生结果数据后立即开始处理,而不是等待步骤1完全处理完毕。

四、解决方案

使用
TransformBlock

ActionBlock
来实现生产者-消费者模式。生产者负责读取数据并将其发送到
TransformBlock
中,消费者从
TransformBlock
中读取数据并进行处理。
以下是一个简单的示例代码,演示如何使用
TransformBlock

ActionBlock
实现生产者-消费者模式来处理数据:

using System.Threading.Tasks.Dataflow;

var cts = new CancellationTokenSource();
// 假设有一组数据
var dataItems = Enumerable.Range(0, 1000).Select(x => $"data_{x}").ToList();

var processor = new DataProcessor(10, cts.Token);
await processor.ProcessAsync(dataItems);

Console.ReadKey();

/// <summary>
/// 数据处理器
/// </summary>
public class DataProcessor(int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    public async Task ProcessAsync(List<string> dataItems)
    {
        // 创建一个 TransformBlock 用于步骤1的处理,并将结果发送到步骤2的 ActionBlock
        var step1Block = new TransformBlock<string, string>(async dataItem => await Step1(dataItem), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken
        });
         
        // 创建一个 ActionBlock 用于步骤2的处理
        var step2Block = new ActionBlock<string>(async dataItem =>
        {
            await Step2(dataItem);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken
        });

        // 将 TransformBlock 链接到 ActionBlock
        step1Block.LinkTo(step2Block, new DataflowLinkOptions { PropagateCompletion = true });

        // 启动多个步骤1的任务(生产者)
        foreach (var dataItem in dataItems)
        {
            await step1Block.SendAsync(dataItem, cancellationToken);
        }

        // 完成步骤1的 TransformBlock 的写入
        step1Block.Complete();
        // 等待步骤1的 TransformBlock 处理完成
        await step1Block.Completion;

        // 完成步骤2的 ActionBlock 的写入
        step2Block.Complete();
        // 等待步骤2的 ActionBlock 处理完成
        await step2Block.Completion;
    }

    private async Task<string> Step1(string dataItem)
    {
        // 模拟步骤1的处理(如初步处理数据)
        await Task.Delay(10, cancellationToken);
        Console.WriteLine($"Step1 processed data item: {dataItem}");
        return dataItem;
    }

    private async Task Step2(string dataItem)
    {
        // 模拟步骤2的处理(如进一步处理数据)
        await Task.Delay(10, cancellationToken);
        Console.WriteLine($"Step2 processed data item: {dataItem}");
    }
}

代码解释:

  1. 创建Step1的 TransformBlock
    :在
    ProcessAsync
    方法中,我们首先创建了一个 TransformBlock,用于Step1的处理,TransformBlock 接受一个输入数据项,进行处理后返回一个输出数据项,
    TransformBlock<string, string>
    表示输入和输出都是
    string
    类型
  2. 创建Step2的 ActionBlock
    :创建一个 ActionBlock 用于Step2的处理,ActionBlock 接受一个输入数据项并进行处理,但不返回输出数据项。
    ActionBlock<string>
    表示输入是
    string
    类型
  3. 链接 TransformBlock 和 ActionBlock
    :将 TransformBlock 链接到 ActionBlock ,以便将Step1的处理结果发送到Step2进行处理,使用
    LinkTo
    方法将两个块连接起来,并设置
    PropagateCompletion
    为 true,表示当 TransformBlock 完成时,ActionBlock 也会完成
  4. 启动Step1的任务
    :逐个将数据项发送到 TransformBlock,并等待所有数据处理完成,使用
    SendAsync
    方法将数据项发送到 TransformBlock
  5. 等待任务完成
    :使用
    Complete
    方法通知 TransformBlock 不再接收新的数据,并使用
    Completion
    属性等待所有数据处理完成。然后完成Step2的 ActionBlock 的写入,并等待Step2的 ActionBlock 处理完成

区块链技术已经好多年没有听到有人提了,不过比特币却一直是不是的又新闻出现,当然国内已经把比特币交易归入到了不合法的地位了。区块链技术是国家战略的技术,但是这个技术说实话确实不是很高深,或者说蛮easy的,找几个C++技术好的人就完全可以从零开始手撸一个新版比特币出来,但是之所以区块链被归入到国家战略技术,其主要原因是这种区块链的去中心化加密技术是前所未有的,人们是无法准确预估其未来的价值的,但是作为一个新技术的出现,而且还是从来没有过的,这就和P2P技术刚出来一样,之前没有过,于是大家都要认真对待,这或许就是中国人内在的危机意识罢了。但是,为啥最近几年区块链技术已经没有人提了呢,我们可以看下作为中国区块链技术的顶级企业,迅雷的区块链技术,我们看下其官网:

https://blockchain.xunlei.com/


从上面可以看到迅雷的区块链技术宣传已经是2019年的事情了。

image

而现在如果要登录迅雷的区块链技术的官方网站也是一件困难的事情了,至少在密码找回的验证码的服务器应该是不工作了,估计是太久没有人用也没人维护就无法连接了。


我们可以看下迅雷的区块链的总用户量和上链的应用数以及时间:

https://blockchain.xunlei.com/explorer/#/


image

可以看到,迅雷的区块链注册用户只有1457个,也就是说全部运行的话那么这个链的节点数也就是1457个,当然这还必须是全部都运行的话,而所有的加入这个区块链的合约只有93个,这就是说总共只有93个app是注册到这个区块链上的,换句话说也就是最多只有93个APP会往这个区块链上写入数据。


我们可以看到,迅雷区块链上最后的交易是停留在2020年的,也就是说2020年之后就再也没有人使用这个区块链,换句话来说就是迅雷的区块链是已经有4年没有写入过数据了。

image

image

image


我上一次使用区块链是去年,那时是使用中国红十字会的捐款记录的区块链,估计这个区块链的运行节点应该是在两位数,最多不会超过三位数,如果一个区块链只有十几个节点在运行,甚至只有几个节点在运行,甚至只有一个节点在运行,那么这个区块链还有什么意义呢,或者说这种情况下的区块链和传统的中心化记账系统又有什么区别呢,难道就为了一个名字好听吗。想想自己最初使用区块链的时候还是在十多年前,那时候身边同学有玩比特币的,那时候比特币的价格从几十块,几百块的价格已经涨到了千元以上,搞比特币交易无非就是拿一两个月的生活费倒手赚个百十块钱,然后出去吃个火锅或者烤肉,那时候玩比特币也就是为了这个新奇的技术,那时候玩比特币和玩电驴P2P和暴风影音是一样的,都是为了个新奇,自己也没有继续搞,后来也没本钱搞了,不过自己也不喜欢风投,因此只赶上了技术兴起却没赶上投资赚钱,不过这个也没担风险,也就无所谓了。


迅雷的区块链教程的最新发布时间也是在2020年,如:

https://mp.weixin.qq.com/s/ZuWFaDZXepmBCwm37gONIA

image

可以说,在某种意义上来说,迅雷的区块链已经名存实亡了,那么这是不是也意味着整个区块链技术市场也都大多是这个情况了呢。