2024年2月

作者:来自 vivo 互联网短视频研发团队

本文根据蔡创业、马运杰老师在“2023 vivo开发者大会"现场演讲内容整理而成。

在线点播场景,播放体验提升与成本优化是同等重要的两件事,并在部分场景体验优化与成本优化存在一定的互斥关系。vivo短视频深入分析播放链路的每个环节、并结合大数据统计,探索出了多种的体验优化策略;同时针对成本优化,上线了转码、PCDN、共享闲时带宽等多种策略用于降低带宽成本。基于技术优化和业务发展的要求,vivo短视频还上线了系统性的监控体系,对播放体验、带宽成本进行了多维度的监控。

一、业务介绍

vivo短视频项目的业务架构,从链路上来说,主要包括:

  • 内容生产,主要包括拍摄、导入、剪辑以及作品上传

  • 视频处理,包括画质增强、转码、压缩等

  • 分发

  • 消费,包括预加载、视频播放

除此之外,我们还设计了若干个子系统,其中比较重要的有:

  • 日志收集,主要用于收集用户主动反馈问题。

  • 监控统计,主要用于监控线上核心指标,为后续优化提供方向。

  • AB测试,主要用于新功能验证。

技术架构的最终目的是为了给用户呈现一个有着良好体验的产品,同时又兼顾到开发、运营成本。通过我们产品内置的意见反馈、大数据统计以及用户调研,我们发现/用户对vivo短视频的反馈主要集中在播放卡顿、不流畅、画质不清晰上面,从成本方面,我们的主要压力来自于CDN。

这些也就是本文的主题,即:如何做到既要播放体验好,又要业务成本低。接下来,将分别介绍一些在播放流畅度以及成本优化相关的探索和实践。

图片

二、体验优化

2.1 播放链路拆解

对于在线点播场景来说,影响用户体验的环节主要在视频开播以及播放过程。首先,在起播阶段,应尽力做到首帧零耗时,当用户观看视频时应直接展示视频画面而不是出现等待过程。

我们对开播过程做了拆解和监控,在无任何优化策略的场景下,网络连接环节耗时占比30%,下载环节占比15%, 解封装、解复用环节占比15%。

其次,在视频播放过程中,应做到流畅播放不卡顿,这就需要合理的预加载策略、码率控制以及下载策略。

通过以上的分析,确定了我们4个重点
优化方向

  • 建连优化,通过连接复用、保活等方式减少在连接环节的耗时

  • 分片下载,通过优化下载架构提升下载速度以及成功率

  • 预渲染,把耗时的解封装、解复用、解码等环节前置

  • 数据预加载,通过预加载减少在下载环节的耗时

在此基础上,我们上线了数据监控体系,对开播性能、优化策略、视频基础信息、画质、带宽利用率等方面进行了详细的监控。

图片

2.2 建连优化

通过以下4个策略来降低建连环节的耗时:

  • 在应用冷启动以及视频播放时,通过连接复用,减少了DNS解析、SSL以及TCP连接环节的耗时。

  • 在视频播放过程中,用户可能因为某些原因暂停视频,比如把应用切到后台,几分钟后再打开 应用,这种情况下可能出现连接被断开,当恢复播放时需要重新建连,从而导致播放卡顿的问题。通过连接保活,可确保再次播放时视频快速下载、快速 开播。

  • 传统的local dns可能会出现解析DNS劫持、解析缓慢等问题,通过http dns,可有效应对DNS劫持以及解析缓慢的问题,为了应对复杂多变的网络环境,我们还通过在线配置的方式,支持多种dns解析以及降级策略。

  • 当DNS解析失败时,通过服务端下发的IP实现直连,从而提升连接成功率。

以上就是我们在网络连接环节的一些优化策略,下面介绍我们在视频下载环节的优化。

图片

2.3 分片下载

为了实现预加载,即在视频播放之前把在线视频提前下载到本地,我们在播放器和CDN直接增加了一个本地代理的服务,播放的网络请求都由本地代理服务响应,本地代理服务再向CDN发起请求。在弱网下测试,我们发现
卡顿情况比较严重,
主要是存在不必要的网速竞争,并且常规单线程下载效率低于多线程并发下载。为此,我们通过分析协议以及多次实验,最终确定了全新的下载方式,即首个视频起播时使用单线程,后续的视频下载以及预加载都通过多线程分片请求完成。

同时,把播放器 与本地代理之间的socket通信方式修改为直连,避免了socket中不必要的读写缓冲区浪费。通过这样的调整,首帧耗时降低3.8%,播放失败率下降9%;并且还具备了播放过程中实时切换cdn的能力,即在首次开播时使用性能较好的标准cdn,在缓存较充足时切换为低成本的cdn。

图片

2.4 预渲染

通过以下优化策略提升预渲染效果:

  • 应用冷起后,首个视频的开播体验对用户的后续消费有着非常重要的影响,为了提升该的播放体验,在应用启动时预创建H265以及H264对应的codec实例,在开播环节直接使用预创建的解码器,开播耗时可降低50ms左右。

  • 播放器的创建环节涉及到较多的流程,比较耗时;另外,常规的每次播放视频都创建一个播放器对象的方式,容易出现因播放器对象泄漏导致的OOM、ANR甚至播放失败。基于这两个问题,我们创建了全局复用的播放资源池,每次视频播放时都从资源池中直接获取已经创建好的播放器对象,通过该策略,可有效的降低播放器创建耗时,并且彻底解决了播放器实例泄漏问题,对系统稳定性以及播放成功率都有明显的改善。

  • 前面我们介绍了预加载策略,即在视频播放之前提前下载部分数据到本地,视频播放时直接播放本地准备好的缓存,开播速度较优化前有明显的提升;但本地视频开播仍需要嗅探、解封装、解码这些环节,开播耗时仍存在优化空间。因此,我们基于播放器资源池,使用另一个空闲的播放器对象来提前完成下一个即将播放的视频的嗅探、解封装、解码过程,通过这个策略,首帧耗时可降低到50ms以内。

图片

2.5 预加载策略

首先介绍的是固定大小的预加载策略:视频播放时,把后续5个视频添加到缓存待下载队列,每个视频使用固定的预加载大小,当前视频的缓存处于高水位时,开始下载缓存队列中视频,当前视频缓存处于低水位即有可能即将发生卡顿时,停止下载缓存队列中的视频。

这个方案整体实现比较简单,但存在两个
问题

  • 预加载大小是固定的,未能与视频码率、时长关联,当视频码率、时长发生变化时,可能会出现缓存不足或者缓存浪费。

  • 对于一条用户会重度消费的视频,应提升预加载大小,从而提升用户在播放过程中的流畅度。

为了解决以上两个问题,我们上线了动态预加载策略。

图片

动态预加载就是在固定预加载的基础上,做了如下调整:

  • 缓存分级,把固定预加载策略中的单一缓存调整为3个不同优先级的缓存,优先级高的缓存较小,优先下载,优先级较低的缓存较大,下载优先级较低;一级缓存下载结束后开始下载二级缓存,二级缓存下载完成后再下载三级缓存。

  • 缓存大小不再固定,修改为根据视频时长和预加载时长动态计算当用户快速滑动时,一般情况下会命中一级缓存,确保视频可以顺利开播;当用户在列表中重度消费时,后续的视频将有充足的时间完成三级缓存的下载。

动态预加载策略上线后,首帧耗时降低了2.3%,卡顿率降低了19.5%,当然,这个策略也存在的明显的问题,即体验提升了,但带宽成本也提升了。我们需要思考,如何在不增加成本的前提下提升体验。

图片

我们来看一个示例:在一个视频列表中,有些视频用户喜欢观看,完播率较高,有些视频用户不感兴趣,会快速滑过。

也就是说,只有深度消费的视频,才真正的需要二级和三级缓存,快滑的视频能快速开播即可。基于这样的一个普遍性的案例,我们结合视频的观看时长调整了预加载策略。

现在介绍的是智能预加载策略,整体流程如下:

  • 首先,在云端基于视频基础特征比如码率、时长、清晰度,网络以及时段、历史行为等特征,使用深度神经网络创建、训练模型,用于预测一个视频用户会深度消费还是快速滑过。

  • 其次,模型导出、转换之后,部署在客户端,在视频开播之前预测消费深度。

  • 最后,播放器仍保留之前的一级缓存,并作为最高优先级进行下载;所有的一级缓存都下载完成后,根据预测的消费深度调整二级缓存的大小,如果该条视频会重度消费,则开启二级缓存,否则放弃二级缓存。

这个方案目前还在实验中,后续我们也将持续探索机器学习在播放上的其他应用。

图片

体验优化的效果需要被准确、客观的衡量,并且能准确反映用户的真实体验。

在起播环节,我们设计了两个P0指标,即首帧耗时和失败率,同时,我们也设计了若干个p1指标,包括缓存大小、缓存命中率、预渲染命中率,下载速度等,这些指标的波动直接会影响到P0指标。

在视频播放环节,我们设计了卡顿率、卡顿时长以及seek卡顿等P0指标,同时,设计了百秒卡顿时长、百秒卡顿次数、缓存利用率等p1指标作为对P0指标的补充。

基于以上指标以及视频基础信息、预加载预渲染策略信息,我们设计并上线了分层监控系统,自上往下共分为
4个层级

  • P0指标
    :包括开播耗时、卡顿率等,这些是我们最为关注的核心指标

  • P1指标
    :作为对P0指标拆解和补充

  • 策略指标
    :包括预加载开启率、命中率、预渲染开启率、命中率,这两个策略对播放体验的影响比较明显

  • 最后一层是
    视频基础信息
    ,包括码率、时长、画质分等,这些指标也会影响到核心性能指标

图片

三、成本优化

前面介绍了我们短视频在播放体验方面所做的一些努力,经过前述的这些优化,目前我们短视频的播放流畅度已经达到行业内的一流水平。而随着业务的不断发展,播放的成本也随之水涨船高,成为业务必须要应对处理的首要问题,接下来将和大家分享下我们在播放成本优化方面的一些思考及实践方案。

3.1 成本拆解

首先,我们先了解下播放成本包含了哪些成本。它主要是由CDN成本、存储成本以及进行转码压缩等所需要的计算成本组成,而这里面CDN成本占了总成本的80%左右,是最大的一个成本来源,所以,接下来我们的分享主要是围绕如何降低CDN成本展开。

要知道怎么进行CDN成本的降低,就需要了解哪些是影响CDN成本的重要因素。如下图所示:

图片

  • 第一层拆解,CDN成本 = 单价x用量,这个相信大家都能很好理解。

  • 第二层的拆解,正常情况下,用量=用户实际播放的视频时长乘以视频的码率,而前面我们也介绍过,为了提升视频播放的起播速度,降低播放过程中的卡顿率,我们会对视频进行预加载及预缓冲,那这部分量可能最后用户并没有产生播放行为,也就造成了流量的浪费。因此我们引出了流量利用率的概念,在后续还会详细介绍。这里我们需要知道的是CDN实际计费的用量=用户播放的时长x码率除以流量利用率。

那现在有4个最基础的因素会影响成本,分别是单价、时长、码率、流量利用率,其中时长是业务追求的增长目标,无法用以降本,因此其他三个因素就成了我们重点
优化的方向
,它们分别是。

  • 寻找方案降低单价

  • 对视频码率进行极致压缩

  • 对利用率进行治理提升流量利用率

在正式介绍我们的降本方案之前,我们还需要先思考一个问题:成本的降低往往带来的是服务质量的降低,我们需要如何才能在保证播放体验的同时,降低播放成本,也就是大家经常讨论的,如何做到体验与成本的非零和博弈。

3.2 单价降低

本小节将为大家介绍我们的第一个降本方向,CDN单价的降低。

3.2.1 引入PCDN

我们的第一个方案是引入单价更低的PCDN技术,PCDN是目前一种新兴的内容分发网络,其主要是利用路由器、小盒子等廉价的边缘设备代替标准CDN的边缘节点,由于接入设备及接入网络更加廉价,故而成本相对于标准CDN,要低很多。

其网络架构如下图所示,APP通过SDK访问PCDN的边缘节点,如果内容热度值低,PCDN节点中没有该视频的缓存,则返回302状态码给到客户端,客户端再去访问标准CDN获取资源,当视频热度达到一定阈值时,PCDN会去标准CDN上回源获取对应的视频资源向客户端提供服务。

图片

从这里我们可以看到,PCDN的节点性能相比标准CDN更差,而访问时会有一定几率进行302跳转,增加了链路的耗时,所以必然会对视频播放时的起播速度和卡顿率造成较大的影响。

对此,我们制定了多个优化措施,来降低播放体验的受损情况,在成本和体验之间进行平衡。

(1)播放器策略优化

图片

第一个优化点是通过播放缓冲水位去控制是否走PCDN,在视频起播时使用高性能的标准CDN进行分片下载,而只有当前视频的缓冲数据达到阈值后,才使用PCDN进行下载,利用缓冲视频的时长,可以有效抵消PCDN的链路耗时增加。

第二个优化点是,在视频起播阶段发送1字节的探测包到PCDN节点,以此来确定PCDN节点上是否存在对应的视频内容,不存在时则后续分片都从标准CDN拉取,存在的话后续再走PCDN,这样可以大大减少302跳转发生的概率。

经过以上两个优化后,引入PCDN对我们的播放体验已基本不会产生负面影响了。以此,我们达到了保证播放体验的同时,降低了CDN成本的目的。

(2)业务策略优化

图片

由于PCDN节点服务的主要是热点视频,因此我们对热点分发的场景进行了梳理,对这些场景用到的视频提前预热到边缘节点。由于做了预热,PCDN节点提前缓存了相关视频,出现302跳转的概率进一步减少,因此我们可以适当降低缓冲水位的阈值,提前请求到PCDN节点。基于此方案,我们做了相关的A/B实验,实验的结果是,我们在保证QoE不变的情况下,提升了PCDN的分享率,即,会有更多的流量走到PCDN节点上,进一步降低了CDN成本。

当前我们PCDN分享率在46%左右,在探测成功后,只有1%以内的流量会出现302跳转,基本可以忽略不计。通过线上长期实验组的观测,卡顿率和起播耗时上下波动,无显著负向。

以上是我们进行单价降低的第一个方案,接入PCDN,接下来我们看下
另外一个方案

3.2.2 共享闲时带宽

行业内CDN计费的方式有多种,包括流量计费,峰值带宽计费等,对于峰值带宽计费,是以每天的带宽最高点作为计费值的,这种计费方式,对于持续稳定的流量会更加合适。而我们短视频是一个用户实时消费视频的应用,访问热度会有明显的波峰和波谷,比如中午大家休息的时候以及下班的时候会有更多的时间去看视频,而夜里睡觉时,业务的流量则相对较低。正常情况下我们CDN的带宽波形如下图所示,可以看到闲时我们的带宽是很低的,造成了很大的浪费。

图片

针对此种情况,我们和公司内其他业务进行合作,引入了他们的流量进行填谷,共享了我们的闲时带宽。可以看到填谷后,蓝色区域的比重明显增加了很多,其他业务会针对这部分流量进行成本分担,因此相当于降低了我们的成本单价。通过上述的PCDN及共享闲时带宽,CDN的单价得到了大幅的降低,从而有效节省了CDN成本,是我们进行降本的重要手段。

3.3 极致压缩

接下来,是我们降本的第二个方向,对视频进行极致的压缩从而降低视频的码率。

我们当下遇到的
问题
是:一个视频的清晰度是和码率强相关的,而为了达到相同的清晰度,不同内容场景的视频所需的码率是不一样的。

先前,为了保证用户的播放体验,减少低质视频的出现,我们设置的码率标准较高,导致很多视频没有得到有效的压缩。

在解决这些问题时,我们面临着这样几个
困难

  • 第一是内容库中有千万量级的视频内容,内容量非常大;

  • 第二是这些视频场景非常复杂,千变万化;

  • 第三则是这些原始视频的质量也是参差不齐的。

为了能够在这样的情况下进行成本优化,我们需要根据视频的内容特征,自适应调整编码参数,在保证视频清晰度的前提下,对视频进行极致压缩。

接下来,来看下我们是如何做的。

图片

首先我们基于神经网络,自研了一套内容自适应编码算法。算法模型的训练过程如下:首先第一步我们会根据内容库中的场景标签,从内容库中收集足够大视频数据集作为模型的训练数据。

然后对训练集中的视频进行不同比例的压缩转码作为Ground Truth,再对压缩后的视频提取特征,这些特征包括视频的复杂度特征、码率信息、画质特征、码流特征等;最后利用上述特征,进行神经网络拟合训练,得到视频质量与压缩率的关系模型,该模型可以预测压缩比例和视频压缩质量之间的对应关系曲线。

图片

上图展示的是我们模型的预测效果,白色弧线是视频在经过不同比例压缩后得到的VMAF曲线,是实际的Ground Truth,而蓝色曲线则是我们模型的预测曲线,可以看到预测曲线和Ground Truth非常接近。于是,我们便可以通过预测曲线,在保证清晰度一致的情况下,确定不同视频需要的编码参数,达到内容自适应编码的目的。

最后再来看下我们极致压缩的
完整流程

图片

  • 首先第一步是对视频进行前置增强处理,这一步处理的目的是提升原始视频的画质,从而可以抵消一部分视频编码带来的损伤,另外去除噪声等退化也有利于压缩过程中降低视频的码率。

  • 在经过增强修复后,需要对视频进行场景划分,一个视频可能包含多个场景,这些场景所需要的编码参数也不一样,通常我们称之为Per-title分场景编码,更细粒度的选择不同的编码参数对视频进行压缩。

  • 第三步就是对每一个场景的视频提取视频特征,通过我们的自适应模型决策出最佳的编码参数,进行编码合成,最后得到输出视频。

通过这几步的处理,我们的压缩率相比之前有了大幅度的降低,通过线上的数据统计,我们的平均压缩率从60%降低到了40%,可以看到这个优化效果非常明显,以上就是我们极致压缩的方案,通过进一步压缩码率,降低我们的CDN成本。

3.4 利用率治理

我们的最后一个优化方向是
利用率治理

先解释下什么是流量利用率:在播放过程中,为了提升播放的流畅度,需要提前去缓存当前视频以及预加载后面的视频,同时网络层也会有socket的buffer,用户如果使用不到这些流量,那就会产生流量浪费,这些浪费的流量与用户实际播放的流量相加,就是CDN实际产生的流量,流量利用率就等于实际播放的流量除以CDN实际产生的流量。

由此可见,在整个播放链路上,都存在流量的浪费,而我们希望通过利用率的治理,控制并减少这些浪费的产生。

图片

3.4.1 利用率漏斗建设

我们
需要治理的第一个问题是:在版本迭代的过程中,播放的策略也是在不断优化的
,比如我们可能为了体验着想,增加了预加载的数量,或者做了多级预加载。但是在做这些优化的时候,我们无法有效的衡量每个优化究竟对CDN成本带来了多大的变化,会不会增加流量的浪费。

对于这个问题,我们的
解决方案是,针对每个版本,建立了如下图所示利用率漏斗,并加入到了灰度报告中
,严格监控每个版本出现的流量浪费情况,防止播放策略优化导致CDN成本大幅增加。

图片

3.4.2 利用率提升

我们需要
治理的第二个问题是:对于预加载或者预缓冲等策略,我们都会设置一个上限阈值,保证体验的同时防止消耗过多流量
,但是这个阈值一直都是以我们工程师的经验设置的,无法确切的知道这个阈值是否合理。

针对这个问题,我们的
治理方案是,通过前述建立的漏斗数据,针对阈值进行线上A/B实验,通过收集不同阈值下体验和成本的变化数据,找到投产比拐点,从而确定最优阈值。

图片

3.4.3 治理效果

这边展示了我们治理后的
效果

(1)首先在治理前,如下图左侧显示,有两个突刺点,这是因为某些版本优化时,没有识别到成本的增长量级,并且在灰度期间无法明显看出成本的变化,及时止损,最终导致我们的CDN带宽大幅增长。而
在治理后,可以看到,我们的带宽变得平稳很多,不再出现突刺点,带宽突增问题得到了有效。

(2)其次,如下图右侧所示,在治理前,我们的流量利用率在60%左右,而在
治理后,我们在保证体验不受损的情况下,将利用率提升到了70%,从而节省了相应的CDN成本。

通过对流量利用率的监控和治理,我们可以清晰的掌控播放链路中每个节点可能产生的流量及带宽情况,找到成本优化点,降低我们的CDN成本。

图片

四、总结&展望

如下图所示,可以看到我们的优化结合了大数据、A/B实验、AI技术等,通过对这些技术的应用,我们进行了播放体验和播放成本两个方面的优化。

图片

  • 首先是对播放体验优化,我们依次从网络层、播放层、应用层进行了相关的策略优化,这里面主要包括分片下载、预加载/预渲染、分级缓存等策略。

  • 其次是播放成本的优化,我们分别从单价、码率、利用率等方向进行了降本,这里面主要包括PCDN、极致压缩、利用率治理等方案。

这些优化方案,是我们短视频团队长时间的实践积累,帮助我们在体验和成本之间做到了双赢。

图片

最后是我们未来的一些展望,我们会持续聚焦音视频前沿技术,在压缩编码方面,我们会去研究引入H266技术,进一步压缩视频的码率,而在增强方面,我们会对端侧增强技术进行预研,通过端云协同增强,进一步做到降本增效。

背景

有个需求,原先只涉及到一种A情况设备的筛选,每次筛选会经过多个流程,比如先a功能,a功能通过再筛选b功能,然后再筛选c功能,以此类推。现在新增了另外一种B情况的筛选,B情况同样需要A情况的筛选流程,并且需要在A情况的基础上,新增另外的功能筛选,这里假设A需要a、b、c功能的筛选,而B需要a、b、c、d功能的筛选,并且这些功能的筛选的顺序可能发生变动,比如新增了某个筛选,这个筛选涉及到的计算量少那肯定可以把这个置在前面先处理,不满足条件就return,咋一看,这个需求很符合责任链模式的应用场景,下面介绍编码。这里的代码参考了
马丁
玩编程 在其12306项目里面的责任链模式,并做出一些相应改动,以适配当前的场景。

代码

责任链模式顶层接口

这里继承了Ordered类,是为了方便后续对处理器进行排序。

public interface AbstractChainHandler<REQUEST> extends Ordered {

    default boolean handler(REQUEST requestParam){

        return true;
    };
    
}

A情况的接口和B情况的接口。

public interface DeviceTypeAChainFilter extends AbstractChainHandler<DeviceFilterBO> {

}
public interface DeviceTypeBChainFilter extends AbstractChainHandler<DeviceFilterBO> {

}

定义成接口,后续往里面添加处理器的时候,方便查看当前A规则和B规则都有哪些处理器:

image-20240206171639069

具体的处理器

处理器1:

@Component
public class DeviceFunctionChainHandler implements DeviceTypeAChainFilter, DeviceTypeBChainFilter {

    @Override
    public boolean handler(DeviceFilterBO deviceFilterBO) {
        if (deviceFilterBO.getDeviceBO().getCondition() % 2 == 0) {
            System.out.println("处理器A:筛选功能不通过");
            return false;
        }
        // 筛选功能
        System.out.println("处理器A:筛选功能通过");
        return true;
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

处理器2:

@Component
public class DeviceResolutionChainHandler implements DeviceTypeAChainFilter, DeviceTypeBChainFilter {

    @Override
    public boolean handler(DeviceFilterBO deviceFilterBO) {
        // 分辨率支持
        System.out.println("处理器B:分辨率支持");
        return true;
    }

    @Override
    public int getOrder() {
        return 10;
    }

}

处理器3:

@Component
public class DeviceCaculateOutputChainHandler implements DeviceTypeBChainFilter {

    @Override
    public boolean handler(DeviceFilterBO deviceFilterBO) {
        // 接口支持
        System.out.println("处理器C:输出接口支持");
        // 计算设备数量满足要求
        System.out.println("处理器C:根据输出接口计算的设备数量满足要求");
        return true;
    }

    @Override
    public int getOrder() {
        return 30;
    }
}

处理器4:

@Component
public class DeviceCaculateInputChainHandler implements DeviceTypeAChainFilter, DeviceTypeBChainFilter {

    @Override
    public boolean handler(DeviceFilterBO deviceFilterBO) {
        if (deviceFilterBO.getDeviceBO().getCondition() % deviceFilterBO.getCondition() == 0) {
            System.out.println("处理器D:输入接口不支持");
            return false;
        }
        ArrayList<DeviceBO> deviceRes = (ArrayList<DeviceBO>) AbstractChainContext.threadLocal.get();
        deviceRes.add(deviceFilterBO.getDeviceBO());
        // 接口支持
        System.out.println("处理器D:输入接口支持");
        // 计算设备数量满足要求
        System.out.println("处理器D:根据输入接口计算的设备数量满足要求");
        return true;
    }

    @Override
    public int getOrder() {
        return 40;
    }
}

可以看到,处理器都用@Component进行标识,后续通过ioc容器获取这些处理器进行分类和执行。并且,可以看到A..filter接口有三个实现者,这说明A有三种处理器,同理B有四种处理器,并且由于顶层接口继承了Order类,所有具体的处理器都会标识当前的order,如上面的10,20,30...这里把Order的数字间隔放大一些,比如10,20,30,如果以后要往这些间隔插入新的处理逻辑也方便。

获取具体处理器和执行hanlder的上下文类

先将不同的处理规则的接口都放在某个特定包下

image-20240206172405349

先去扫描这个包下的所有接口,然后再去Spring Ioc容器里面拿出这些接口的实现类,把不同的接口实现类按接口名字作为标识,按Order对这些实现类进行排序,然后放到一个List里面,以接口名字作为key,实现类List作为value,后续调用链式调用的时候,传入具体的接口名字(处理规则名字),实现链式顺序调用,具体实现如下

AbstractChainContext上下文类:

public final class AbstractChainContext<REQUEST, RESPONSE> implements CommandLineRunner {

    private final static Map<String, List<AbstractChainHandler>> abstractChainHandlerContainer = new HashMap<>();

    public final static ThreadLocal threadLocal = new ThreadLocal<>();

    public void handler(String mark, REQUEST requestParam) {
        List<AbstractChainHandler> abstractChainHandlers = abstractChainHandlerContainer.get(mark);
        if (CollectionUtils.isEmpty(abstractChainHandlers)) {
            throw new RuntimeException(String.format("[%s] Chain of Responsibility ID is undefined.", mark));
        }
        for (AbstractChainHandler abstractChainHandler : abstractChainHandlers) {
            if(!abstractChainHandler.handler(requestParam)){
                break;
            }
        }
    }


    @Override
    public void run(String... args) {
        List<Class<?>> interfaces = getInterfacesInPackage("com.zh.demo.designpattern.chain.type");
        for (Class<?> interfaceType : interfaces) {
            Map<String, AbstractChainHandler> beansOfType = (Map<String, AbstractChainHandler>) ApplicationContextHolder.getBeansOfType(interfaceType);
            // 转成list
            List<AbstractChainHandler> sortedList = beansOfType.values().stream()
                    .sorted(Comparator.comparing(Ordered::getOrder))
                    .collect(Collectors.toList());
            int index = interfaceType.getName().lastIndexOf(".") + 1;
            abstractChainHandlerContainer.put(interfaceType.getName().substring(index), sortedList);
        }
    }

    public static List<Class<?>> getInterfacesInPackage(String packageName) {
        List<Class<?>> result = new ArrayList<>();
        try {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            String path = packageName.replace('.', '/');
            Enumeration<URL> resources = classLoader.getResources(path);

            while (resources.hasMoreElements()) {
                URL resource = resources.nextElement();
                File directory = new File(resource.getFile());
                File[] files = directory.listFiles();

                if (files != null) {
                    for (File file : files) {
                        if (file.getName().endsWith(".class")) {
                            String className = packageName + '.' + file.getName().replace(".class", "");
                            Class<?> clazz = Class.forName(className);

                            if (clazz.isInterface()) {
                                result.add(clazz);
                            }
                        }
                    }
                }
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

        return result;
    }
}

在上面变量中,用了个 public final static ThreadLocal threadLocal = new ThreadLocal<>(); 这个是用来保存设备的筛选列表。

定义好不同筛选规则的枚举类:

public enum DeviceChainMarkEnum {

    /**
     * A设备过滤器
     */
    DEVICE_TYPEA_FILTER("DeviceTypeAChainFilter"),

    /**
     * B设备过滤器
     */
    DEVICE_TYPEB_FILTER("DeviceTypeBChainFilter");

    String name;

    public String getName() {
        return name;
    }

    DeviceChainMarkEnum(String name) {
        this.name = name;
    }

}

Service的编写

@Service
@RequiredArgsConstructor
@Slf4j
public class DemoServiceImpl implements DemoService {

    private final AbstractChainContext<DeviceFilterBO, Object> devcieTypeChainContext;

    @Override
    public List<DeviceBO> filterDeviceTypeA(ParmDTO parmDTO) {
        ArrayList<DeviceBO> deviceList = new ArrayList<>();
        // 简化条件
        parmDTO.setCondition(2);
        // 实际情况应该是从数据库读取设备的信息
        for (int i = 0; i < 5; i++) {
            DeviceBO deviceDTO = DeviceBO.builder().condition(new Random().nextInt(100)).build();
            deviceList.add(deviceDTO);
        }
        ArrayList<DeviceBO> deviceRes = new ArrayList<>();
       // 把需要的结果放到threadLocal中,在具体的处理器中对结果List进行处理
        AbstractChainContext.threadLocal.set(deviceRes);
        // 筛选多个设备 对符合的设备加入到deviceRes
        for (DeviceBO deviceBo : deviceList) {
            DeviceFilterBO deviceFilterBO = DeviceFilterBO.builder().condition(parmDTO.getCondition()).deviceBO(deviceBo).build();
            // 以A规则进行处理
            devcieTypeChainContext.handler(DeviceChainMarkEnum.DEVICE_TYPEA_FILTER.getName(), deviceFilterBO);
        }
        AbstractChainContext.threadLocal.remove();
        System.out.println("筛选结果数量:" + deviceRes.size());
        return deviceRes;
    }

    @Override
    public List<DeviceBO> filterDeviceTypeB(ParmDTO parmDTO) {
        ArrayList<DeviceBO> deviceList = new ArrayList<>();
        // 简化条件
        parmDTO.setCondition(2);
        // 实际情况应该是从数据库读取设备的信息
        for (int i = 0; i < 5; i++) {
            DeviceBO deviceDTO = DeviceBO.builder().condition(new Random().nextInt(100)).build();
            deviceList.add(deviceDTO);
        }
        ArrayList<DeviceBO> deviceRes = new ArrayList<>();
        // 把需要的结果放到threadLocal中,在具体的处理器中对结果List进行处理
        AbstractChainContext.threadLocal.set(deviceRes);
        // 筛选多个设备 对符合的设备加入到deviceRes
        for (DeviceBO deviceBo : deviceList) {
            DeviceFilterBO deviceFilterBO = DeviceFilterBO.builder().condition(parmDTO.getCondition()).deviceBO(deviceBo).build();
            // 以B规则进行处理
            devcieTypeChainContext.handler(DeviceChainMarkEnum.DEVICE_TYPEB_FILTER.getName(), deviceFilterBO);
        }
        AbstractChainContext.threadLocal.remove();
        System.out.println("筛选结果数量:" + deviceRes.size());
        return deviceRes;
    }

}

这里假设有五种设备,每个设备通过DeviceBO里面的condition设置条件,演示一遍筛选过程

DeviceBO类:

@Builder
@Data
public class DeviceBO {

    private int condition;

}

演示筛选规则A,一共五个设备数据,只有一个筛选通过了,这里涉及到A,B,D三种处理器

image-20240222103239661

演示筛选规则B,一共五个设备数据,2个筛选通过了,这里涉及到A,B,C,D三种处理器

image-20240222103338406

源码

Johnynzh/chain-of-responsibility-demo: 责任链模式与spring容器的搭配应用 (github.com)

少年!看你骨骼惊奇,是万中无一的练武奇才,我这儿有本武林秘籍,见与你有缘就送你了!

如来神掌

Windows I/O完成端口是一个我至今都说不好的话题,请宽容的接受我这不是科班出身的自学成才的野生程序员身份。以前在上海一公司做产品追溯的时候,我的老大拿出一本《Windows核心编程》经常向我吹嘘什么“ Windows I/O完成端口”编程模型的时候我是云里雾里。后来看了公司常用的一个叫“线程池”的类的源码,豁然有点醒悟了,不就是类似Queue这样的东西么?按先进先出顺序处理业务数据,这明明就不是线程池啊,误导人了。但是这个类确实挺好用的,公司它都使用了很多年了。不想独享特此分享出来。

    public class CoreThreadPool : IDisposable
    {
        /// <summary>
        /// 队列元素申明
        /// </summary>
        [StructLayout(LayoutKind.Sequential)]
        private class PoolData
        {
            /// <summary>
            /// 外部要求放入队列的数据
            /// </summary>
            public object Data;
            /// <summary>
            /// 需要执行的命令(Exit/Command(自定义))
            /// </summary>
            public PoolCommand Command;
            public PoolData()
            {
                Command = PoolCommand.Exit;
            }
            public PoolData(object data)
            {
                Data = data;
                Command = PoolCommand.Command;
            }
            public PoolData(PoolCommand cmd)
            {
                Command = cmd;
            }
        }
        protected enum PoolCommand
        {
            Command,
            Exit
        }
        protected SafeFileHandle complatePort;
        /// <summary>
        /// 线程池主线程
        /// </summary>
        protected Thread thread;
        protected volatile bool isOpened;
        [method: CompilerGenerated]
        [CompilerGenerated]
        public event Action<object> Exceute;
        [method: CompilerGenerated]
        [CompilerGenerated]
        public event Action<object> ExitExceute;
        /// <summary>
        /// 线程池是否正在运行
        /// </summary>
        public bool IsOpened
        {
            get
            {
                return this.isOpened;
            }
            set
            {
                this.isOpened = value;
            }
        }
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        private static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        private static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort, out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, uint dwMilliseconds);
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
        /// <summary>
        /// 启动线程池的主线程
        /// </summary>
        public void Start()
        {
            isOpened = true;
            if (thread != null)
            {
                throw new Exception("线程池已经是启动状态!");
            }
            complatePort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 0u);
            if (complatePort.IsInvalid)
            {
                throw new Exception(string.Format("创建IOCP出错!原因是:{0}", Marshal.GetLastWin32Error().ToString()));
            }
            thread = new Thread(new ParameterizedThreadStart(this.Run));
            thread.Start(complatePort);
        }
        /// <summary>
        /// 外部提交数据对象到队列
        /// </summary>
        /// <param name="data"></param>
        public void Post(object data)
        {
            PostData(new PoolData(data));
        }
        /// <summary>
        /// 线程池主线程执行逻辑
        /// </summary>
        /// <param name="CompletionPortID"></param>
        private void Run(object CompletionPortID)
        {
            SafeFileHandle completionPort = (SafeFileHandle)CompletionPortID;
            while (IsOpened)
            {
                uint num;
                IntPtr intPtr;
                IntPtr value;
                //从队列里取出最前面的对象
                GetQueuedCompletionStatus(completionPort, out num, out intPtr, out value, 4294967295u);
                if (num > 0u)
                {
                    GCHandle gCHandle = GCHandle.FromIntPtr(value);
                    PoolData poolData = (PoolData)gCHandle.Target;
                    gCHandle.Free();
                    if (poolData.Command != PoolCommand.Command)
                    {
                        IsOpened = false;
                        break;
                    }
                    RaiseExecute(poolData.Data);
                }
            }
            RaiseExitExecute("线程池已经停止。");
            isOpened = false;
            thread = null;
        }
        /// <summary>
        /// 触发Execute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseExecute(object data)
        {
            Exceute?.Invoke(data);
        }
        /// <summary>
        /// 触发ExitExecute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseExitExecute(object data)
        {
            ExitExceute?.Invoke(data);
        }
        /// <summary>
        /// 结束线程池主线程
        /// </summary>
        public void Stop()
        {
            PostData(new PoolData(PoolCommand.Exit));
            IsOpened = false;
        }
        /// <summary>
        /// 内部提交数据到线程池队列中
        /// </summary>
        /// <param name="data"></param>
        private void PostData(PoolData data)
        {
            if (complatePort.IsClosed)
            {
                return;
            }
            GCHandle value = GCHandle.Alloc(data);
            PostQueuedCompletionStatus(complatePort, (uint)IntPtr.Size, IntPtr.Zero, GCHandle.ToIntPtr(value));
        }
        public void Dispose()
        {
            if (thread != null && thread.ThreadState != ThreadState.Stopped)
            {
                Stop();
            }
        }
    }

第1001次实践体验过程

上次做的人脸考勤程序在处理多个人同时考勤时我就使用了刚刚的类。

  private CoreThreadPool pool = newCoreThreadPool();private CoreThreadPool poolExt = newCoreThreadPool();

...

pool.Exceute
+=Pool_Exceute;
pool.Start();
poolExt.Exceute
+=PoolExt_Exceute;
poolExt.Start()
private void Pool_Exceute(objectobj)
{
var entity = obj asUserInfo;if (entity == null) return;try{#region TODO本地防止重复请求 using (DefaultDbContext db = newDefaultDbContext())
{
var dbEntity = db.Attenducelog.Where(e => e.Emp_No ==entity.EmpNo).First();
DateTime dt;
if (dbEntity == null)
{
//第一次考勤 dbEntity = newAttenducelog_Entity();
dbEntity.Emp_No
=entity.EmpNo;
dt
= DateTime.Now.AddDays(-1);
dbEntity.Log_DateTime
=dt;
db.Attenducelog.Add(dbEntity);
db.SaveChanges();
}
else{//已经多次考勤 dt =dbEntity.Log_DateTime;
}
TimeSpan ts
= DateTime.Now -dt;if (ts.TotalSeconds < 61)
{
return;
}
else{//已经多次考勤,本次成功了才记录打卡时间 dbEntity = db.Attenducelog.Where(e => e.Emp_No ==entity.EmpNo).First();
dbEntity.Log_DateTime
=DateTime.Now;
db.Attenducelog.Update(dbEntity);
db.SaveChanges();
}
}
#endregion string url = $"{config.AppSettings.Settings["Platform"].Value}/business/attendancedetails/AddAttendanceDetails";#region dtoPlatAttendanceDto dto= newPlatAttendanceDto();
dto.KeyId
=Guid.NewGuid().ToString();
dto.Status
= 0;
dto.AuditDate
= DateTime.Now.ToString("yyyy-MM-dd");
dto.CreateBy
= "AttendanceClient";
dto.AttendanceDatetime
= DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
dto.FkStore
= config.AppSettings.Settings["StoreID"].Value;
dto.EmpName
=entity.Name;
dto.EmpNo
=entity.EmpNo;
dto.WorkShift
= "";
dto.LocalDatetime
=DateTime.Now;#endregion string jsonData =JsonConvert.SerializeObject(dto);string rs =Program.PostJsonData(url, jsonData);if (!string.IsNullOrEmpty(rs) && JObject.Parse(rs).Value<int>("code").Equals(200))
{
JObject rs_Object
=JObject.Parse(rs);string data = rs_Object["data"].ToString();
JObject log
=JObject.Parse(data);string sound_TIPS = log.Value<string>("remark").Split("&".ToCharArray()).LastOrDefault();string tips = "[" + entity.Name + "]" + log.Value<string>("remark").Split("&".ToCharArray()).LastOrDefault();
AppSpVoiceSpeak(sound_TIPS);
MessageTip.ShowOk(tips,
3000);
}
}
catch(Exception ex)
{
if (ex.Message.Contains("无法连接到远程服务器"))
{
Thread.Sleep(
100);
ViewFaceCore.Controls.MessageTip.ShowError(
"无法连接到远程服务器" + Environment.NewLine + "Unable to connect to remote server", 300);
}
}
finally{
Thread.Sleep(
100);
}
}
        /// <summary>
        ///持续检测一次人脸,直到停止。/// </summary>
        /// <param name="token">取消标记</param>
        private async voidStartDetector(CancellationToken token)
{
List
<double> fpsList = new List<double>();double fps = 0;
Stopwatch stopwatchFPS
= newStopwatch();
Stopwatch stopwatch
= newStopwatch();
isDetecting
= true;try{if (VideoPlayer == null)
{
return;
}
if (token == null)
{
return;
}
while (VideoPlayer.IsRunning && !token.IsCancellationRequested)
{
try{if(CheckBoxFPS.Checked)
{
stopwatch.Restart();
if (!stopwatchFPS.IsRunning)
{ stopwatchFPS.Start(); }
}
Bitmap bitmap
= VideoPlayer.GetCurrentVideoFrame(); //获取摄像头画面 if (bitmap == null)
{
await Task.Delay(10, token);
FormHelper.SetPictureBoxImage(FacePictureBox, bitmap);
continue;
}
if (!CheckBoxDetect.Checked)
{
await Task.Delay(1000 / 60, token);
FormHelper.SetPictureBoxImage(FacePictureBox, bitmap);
continue;
}
List
<Models.FaceInfo> faceInfos = new List<Models.FaceInfo>();using (FaceImage faceImage =bitmap.ToFaceImage())
{
var infos = await faceFactory.Get<FaceTracker>().TrackAsync(faceImage);for (int i = 0; i < infos.Length; i++)
{
Models.FaceInfo faceInfo
= newModels.FaceInfo
{
Pid
=infos[i].Pid,
Location
=infos[i].Location
};
if (CheckBoxFaceMask.Checked ||CheckBoxFaceProperty.Checked)
{
Model.FaceInfo info
=infos[i].ToFaceInfo();if(CheckBoxFaceMask.Checked)
{
var maskStatus = await faceFactory.Get<MaskDetector>().PlotMaskAsync(faceImage, info);
faceInfo.HasMask
=maskStatus.Masked;
}
if(CheckBoxFaceProperty.Checked)
{
FaceRecognizer faceRecognizer
= null;if(faceInfo.HasMask)
{
faceRecognizer
=faceFactory.GetFaceRecognizerWithMask();
}
else{
faceRecognizer
= faceFactory.Get<FaceRecognizer>();
}
var points = await faceFactory.Get<FaceLandmarker>().MarkAsync(faceImage, info);float[] extractData = awaitfaceRecognizer.ExtractAsync(faceImage, points);
UserInfo userInfo
=CacheManager.Instance.Get(faceRecognizer, extractData);if (userInfo != null)
{
faceInfo.Name
=userInfo.Name;
faceInfo.Age
=userInfo.Age;switch(userInfo.Gender)
{
caseGenderEnum.Male:
faceInfo.Gender
=Gender.Male;break;caseGenderEnum.Female:
faceInfo.Gender
=Gender.Female;break;caseGenderEnum.Unknown:
faceInfo.Gender
=Gender.Unknown;break;
}
pool.Post(userInfo);
}
else{
faceInfo.Age
= await faceFactory.Get<AgePredictor>().PredictAgeAsync(faceImage, points);
faceInfo.Gender
= await faceFactory.Get<GenderPredictor>().PredictGenderAsync(faceImage, points);
}
}
}
faceInfos.Add(faceInfo);
}
}
using (Graphics g =Graphics.FromImage(bitmap))
{
#region 绘制当前时间StringFormat format= newStringFormat();
format.Alignment
=StringAlignment.Center;
format.LineAlignment
=StringAlignment.Center;
g.DrawString($
"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}", new Font("微软雅黑", 32), Brushes.White, new Rectangle(0, 0, Width - 32, 188), format);
g.DrawString($
"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}", new Font("微软雅黑", 32), Brushes.White, new Rectangle(2, 2, Width - 32, 188), format);#endregion //如果有人脸,在 bitmap 上绘制出人脸的位置信息 if(faceInfos.Any())
{
g.DrawRectangles(
new Pen(Color.Red, 4), faceInfos.Select(p =>p.Rectangle).ToArray());if(CheckBoxDetect.Checked)
{
for (int i = 0; i < faceInfos.Count; i++)
{
StringBuilder builder
= newStringBuilder();if(CheckBoxFaceProperty.Checked)
{
if (!string.IsNullOrEmpty(faceInfos[i].Name))
{
builder.Append(faceInfos[i].Name);
}
}
if (builder.Length > 0)
{
g.DrawString(builder.ToString(),
new Font("微软雅黑", 32), Brushes.White, new PointF(faceInfos[i].Location.X + faceInfos[i].Location.Width + 24, faceInfos[i].Location.Y));
g.DrawString(builder.ToString(),
new Font("微软雅黑", 32), Brushes.White, new PointF(faceInfos[i].Location.X + faceInfos[i].Location.Width + 24 + 2, faceInfos[i].Location.Y + 2));
}
}
}
}
if(CheckBoxFPS.Checked)
{
stopwatch.Stop();
if (numericUpDownFPSTime.Value > 0)
{
fpsList.Add(1000f
/stopwatch.ElapsedMilliseconds);if (stopwatchFPS.ElapsedMilliseconds >=numericUpDownFPSTime.Value)
{
fps
=fpsList.Average();
fpsList.Clear();
stopwatchFPS.Reset();
}
}
else{
fps
= 1000f /stopwatch.ElapsedMilliseconds;
}
g.DrawString($
"{fps:#.#} FPS", new Font("微软雅黑", 24), Brushes.Green, new Point(10, 10));
}
}
FormHelper.SetPictureBoxImage(FacePictureBox, bitmap);
}
catch(TaskCanceledException)
{
break;
}
catch{ }
}
}
catch(Exception ex)
{
Program.AppLogger.Error(ex);
}
finally{
isDetecting
= false;
}
}

其实触发数据就一句代码,看起来像这样:
pool.Post(userInfo);

好了,高手请看笑话吃瓜,有需要的同学可亲自尝试。bye 了个 bye!

去年我们梳理过OpenAI,Anthropic和DeepMind出品的经典RLHF论文。今年我们会针对经典RLHF算法存在的不稳定,成本高,效率低等问题讨论一些新的方案。不熟悉RLHF的同学建议先看这里哦
解密Prompt7. 偏好对齐RLHF-OpenAI·DeepMind·Anthropic对比分析

RLHF算法当前存在的一些问题有

  1. RL的偏好样本的人工标注成本太高,效率低,容易存在标注偏好不一致的问题
  2. RLHF属于online训练策略,在训练过程中需要让模型进行解码,时间成本高训练效率低
  3. RLHF在训练过程中需要同时部署Reward模型和SFT模型和更新后的模型,显存占用高训练成本高
  4. RLHF需要两阶段的训练,需要先训练reward模型,再使用reward模型更新SFT模S型

这一章我们先聊聊训练策略的新方案。用
新方案
而不是优化或者改良,因为
平替们
的效果需要更长时间的验证。

SLiC-HF

  • SLiC-HF: Sequence Likelihood Calibration with Human Feedback
  • CALIBRATING SEQUENCE LIKELIHOOD IMPROVES CONDITIONAL LANGUAGE GENERATION

要说SLiC-HF,肯定要先说下前置的Calibartion Sequence likelihood(SLiC)的对齐技术,毕竟上面这两篇论文的部分作者都相同,思路自然是一脉相承。

SLiC

SLiC对标SFT,也是post-training的指令对齐方案。方案针对指令微调阶段使用MLE也就是next token prediction带来的稀疏训练问题。因为给定context,是有无数种output可能的。而微调阶段只使用唯一的答案进行训练,导致模型训练不充分。一个明显的现象就是
序列的解码概率越高,并不意味着生成序列的质量越好,这意味着生成序列其实是未修正的(uncalibrated)

SLiC的思路有些类似半监督。也就是标注数据有限,导致模型参数更新的空间有限的情况下,我们可以使用半监督的平滑性和一致性原则,既和标注样本相似的样本label相同,反之不同的思路,使用无标注样本对模型进行更新

那我们把半监督的思路放到文本生成:

第一步.先使用SFT对齐后的模型,针对标注样本,每个样本生成m个推理候选结果,这些就是半监督中的未标注样本

第二步.使用无监督样本进行对比训练,核心就是训练模型对和标注答案更相似的候选样本给予更高的解码概率,反之更低

这里训练就有两个细节

  1. 序列相似如何定义?这里没有引入新的向量模型,直接使用大模型解码输出层的向量表征(seq * hidden)和标注结果的向量表征来计算cosine相似度,相似度计算参考了BertScore的F1值。并且这里对序列进行了切分,分别计算span=1,2,4,8等不同长度的F1值,再进行聚合。

  1. 损失函数如何定义?论文尝试了以下4种不同的对比损失函数,主要差异在pair-wise还是list-wise,拟合相似度的相对排序(i-j),还是绝对打分(P(yi|x)-P(yj|x))的高低。消融实验显示第一个Rank Loss的效果最好。也就是从所有解码生成的候选中随机采样两个,以上F1更高的为正样本,反之为负样本。计算解码概率的Hinge-Loss

这里论文同样加入了正则项,避免模型过度偏离原始SFT对齐的模型,分别尝试了KL和MLE两种不同的正则。消融实验显示KL正则项的效果更好。
所以综上SLiC使用了无监督的思路,用对比学习来进行对齐。下面我们来看如何使用SLiC来对齐人类偏好

SLiC-HF

偏好样本

首先SLiC-HF用的是
offline
的训练方案,所以先说下偏好样本是如何构建的。论文尝试了Direct和Sample and Rank两种样本构建方案。

Direct方案就是直接使用Reddit摘要数据集中人工标注的正负偏好样本作为
\(y^+,y^-\)
,优点是成本低,缺点是这里的解码结果可能和SFT模型的解码分布存在偏差。

Sample and Rank,也就是先使用以上偏好数据训练Reward模型,论文尝试了两种方案,一个是绝对偏好,模型预测Good/Bad使用解码概率作为label。另一个是相对偏好,也就是模型学习两个摘要之间的相对好坏。

之后使用SFT模型随机解码(temperature=0.7)生成的8个解码候选,使用以上模型打分或排序后,随机采样8个正负样本对。

效果上Sample and Rank要优于Direct,但如果Driect部分是直接使用SFT模型生成候选再人工标注的话,其实结果可能也不差。

损失函数

已经有了正负样本对,那其实只需要用到上面的对比损失函数了,不需要使用半监督了。不过这里的正则器没有选用KL,而是直接使用SFT样本的MLE来防止模型能力衰减。最终的损失函数如下

除了Offline的样本构建训练效率更高之外,SLiC-HF直接使用序列概率表征偏好,因此不需要使用reward模型,同时对比来自样本而非来自模型,因此也不再需要使用冻结参数的SFT模型。训练过程内容中只有一个SFT模型进行梯度更新。

DPO

DPO和SLiC同样是基于offline的正负偏好样本对,通过对比学习来进行偏好对齐。DPO的偏好样本标注是直接基于SFT模型生成候选,然后人工标注得到正负(win,loss)样本对,然后直接使用损失函数进行拟合,不训练reward模型。不过二者的对比损失函数不同,DPO的损失函数如下

以上
\(\pi\)
是模型解码输出层每个token
的输出概率logp求和,
\(\theta\)
是参与梯度更新的模型,ref是SFT对齐后的模型参数作为基准参数被冻结。

所以简单直观的理解也就是DPO的损失函数,让模型对偏好样本的解码概率相比ref升高,让模型对负样本的解码概率相比ref下降。和Triplet Loss的对比损失函数的思路有些相似。

我们和SLiC-HF做下对比,首先SLiC是hinge-loss(maximum-margin),DPO不是。其次SLiC是正负样本直接对比,DPO是正负样本概率分别和基准模型(SFT模型)进行对比,二者的差异有些类似simases和triplet loss,只不过DPO的锚点不是锚点样本而是基准模型。所以模型既需要拟合相对偏好,也需要保证绝对分布不会答复偏离原始SFT模型。在后面的一些对比论文中普遍结论是DPO的损失函数更优,SLiC的对比函数会导致一些reward hacking

论文还进一步从梯度计算的角度进行了阐述,如果上述损失函数对
\(\theta\)
求导。会得到以下公式

其中
\(\hat{r_{\theta}}(x,y)=\beta log(\frac{\pi_{\theta}(y|x)}{\pi_{ref}(y|x)})\)
是DPO的核心,既对齐模型的输出层的概率偏离原始SFT模型的幅度能隐式表征偏好,作为 pseudo Reward来进行模型对齐

。正负样本差异越大越多更新幅度越大,梯度方向是提高偏好样本的解码概率,降低负样本的解码概率。

RRHF

RRHF同样是offline构建正负样本对,再采用对比学习进行偏好对齐的方案,那这里我们只看RRHF和SLiC的差异点。
其一是RRHF使用了长度归一化的序列概率来表征偏好,SLiC直接使用了解码概率

其二是SLiC使用了Hinge-Loss,而RRHF是直接拟合正负样本的概率差

其三是正负样本的构建方案,SLiC是基于SFT模型进行随机解码生成候选,并基于Reward模型离线构建正负样本,而RRHF的候选采样方案还对比了beam-search,diversity-beam-search,以及Iterate-beam-search,也就是每训练一个epoch基于微调后的模型重新生成一波候选。Iterate-beam-search的采样方案会有一些效果提升,考虑生成样本会随分布修正而逐渐优化,可以覆盖更多的分布空间。以及Iterate-beam-search其实和PPO在线解码进行模型更新的方案更加相似,但相对效率更高。

三合一大礼包- RSO

STATISTICAL REJECTION SAMPLING IMPROVES PREFERENCE OPTIMIZATION

RSO方案融合了以上三者,主要是DPO和SLiC,分别对损失函数和偏好样本对的构建方式进行了改良。先说损失函数,RSO把SLiC的Hinge-loss加入到DPO的sigmoid-norm损失函数中,得到了如下的hinge-norm损失函数

再有是偏好样本构建,RSO指出既然以上对比函数的目标是拟合最优的Policy,那理论上偏好样本对也应该从
\(\pi^*\)
来构建。近似于以上RRHF的Iterate-beam-search的最后一个Iterate的样本分布。但
\(\pi^*\)
还没训练出来要如何拿到它的对比样本呢?

这里RSO提出可以采用从
\(\pi_{SFT}\)
中拒绝采样来近似
\(\pi_{r}\)
的分布,对比SLiC的SFT-sample-rank,称之为RSO-Sample-Rank。具体构建方式还是从SFT生成多个解码候选,并使用训练的Reward模型对每个候选进行打分,接着进行拒绝采样。

首先拒绝采样使用g(x)拟合f(x), 计算一个常数C,使得
\(c*g(x)>=f(x)\)
。则采样过程是从g(x)中采样,当随机变量
\(U\sim(0,1)<=\frac{f(x)}{c*g(x)}\)
则保留样本,反之拒绝。

这里g(x)就是SFT模型
\(\pi_{sft}\)
,f(x)是最终对齐的模型
\(\pi_{r_{\tau}}\)
,理论上
\(m*\pi_{sft}>=\pi_{r_{\tau}}\)
,这样当
\(U<= \frac{\pi_{r_{\tau}}}{m*\pi_{sft}}\)
我们保留样本,但因为这里的的
\(\pi_{r_{\tau}}\)
并无法获得,因此我们用DPO中推导的Policy和reward的关系

为了diff掉正则项Z,论文使用所有随机解码样本的最大reward的(x,y)来作为常数C的估计。

最终得到的拒绝采样的代码如下

截图\_选择区域\_20240211093757

效果上论文对比了DPO,SLiC,RSO,以及不同损失函数,不同采样方案的效果差异。整体上采样带来的收益是更为显著,DPO的损失函数上加不加hinge差异并不大,但都会优于SLiC的直接对比损失函数。

截图\_选择区域\_20240211093937

想看更全的大模型相关论文梳理·微调及预训练数据和框架·AIGC应用,移步Github >>
DecryPrompt

写在开头

某大厂的面试现场,一位目光深邃,头顶稀疏的中年面试官坐在椅子上,这时候的我走了进来。
面试官:“小伙子,学过Java中容器和数据结构了吧?”
我:“嗯,学了”
面试官:“ok,那你来聊一聊Java中的迭代器(Iterator ),要说清楚他们的应用场景哈”
我:“哦,好滴”
内心独白:“这面试官不按套路出牌啊,本来以为会问问ArrayList,HashMap呢,或者手撕排序算法,这上来直接让撕迭代器”
虽然面试官不按套路出牌,但这时我们也不能乱,迅速的平复心情后,大脑飞速运转,回想着之前学的内容,其实迭代器和比较器确实在容器和数据结构中有所体现。

Iterator (迭代器)

在解释迭代器之前,我们先来聊一下23种设计模式之一:迭代器模式,它是 Java 中常用的设计模式之一。用于顺序访问集合对象的元素,无需知道集合对象的底层实现。
而Iterator则是在这种设计思想下诞生的产物,Iterator 是可以遍历集合的对象,为各种容器提供了公共的操作接口,隔离对容器的遍历操作和底层实现,从而解耦。

【源码解析1】

public interface Iterator<E> {
    //是否有下一个元素
	boolean hasNext();
	 
	//下一个元素
	E next();
	 
	//从迭代器指向的集合中删除迭代器返回的最后一个元素
	default void remove() {
	    throw new UnsupportedOperationException("remove");
	}
	 
	//遍历所有元素
	default void forEachRemaining(Consumer<? super E> action) {
	    Objects.requireNonNull(action);
	    while (hasNext())
	        action.accept(next());
	}
}

那我们日常使用中如何通过迭代器去遍历集合中的数据呢?我们接下往下看(一步一步耐心的给面试官解释,不要紧张,保持逻辑清楚!)

【代码示例1】

public class Test {
    public static void main(String[] args) {
        ArrayList<String> str = new ArrayList<>();
        str.add("aaa");
        str.add("bbb");
        str.add("ccc");
        //迭代器遍历
        Iterator it = str.iterator();
        while (it.hasNext()) {
            System.out.print(it.next() + ",");
        }
    }
}

输出:

aaa,bbb,ccc,

我们创建一个包含ArrayList容器,里面包含aaa,bbb,ccc元素,通过调用str对象的iterator()方法去遍历元素,然后将遍历的元素打印出来,如上输出(这部分可以手撕给面试官看,方便后续的展开阐述)

面试官:“那你知道为什么ArrayList可以调用迭代器方法吗,底层逻辑有没有看过?”

当面试官问到这个问题的时候,我们心中一喜,因为他成功的被我们引导到了我们熟悉的方向上,那么接下来,我们要好好唠一唠了!

我:“嗯,之前学习的时候,有跟过这部分底层源码,我说说看,不对的麻烦您多给指正哈”

我们知道Collection是Set、List、Queue的父接口,而Collection接口又继承了另外一个接口,叫Iterable,得到了它的一个接口方法iterator()。

【源码解析2】

public interface Collection<E> extends Iterable<E> {
	...
	Iterator<E> iterator();
	...

而对于我们日常使用的集合类来说,如ArrayList,它的继承关系让它可以得到iterator对象,我们可以画一个流程图来分析一下。

ArrayList中重写了AbstractList中的iterator()方法,并返回一个内部类对象Itr,我们看一下这个内部类的实现源码。

【源码解析3】

private class Itr implements Iterator<E> {
        int cursor;       // index of next element to return
        int lastRet = -1; // index of last element returned; -1 if no such
        int expectedModCount = modCount;

        Itr() {}

        public boolean hasNext() {
            return cursor != size;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            checkForComodification();
            int i = cursor;
            if (i >= size)
                throw new NoSuchElementException();
            Object[] elementData = ArrayList.this.elementData;
            if (i >= elementData.length)
                throw new ConcurrentModificationException();
            cursor = i + 1;
            return (E) elementData[lastRet = i];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            checkForComodification();

            try {
                ArrayList.this.remove(lastRet);
                cursor = lastRet;
                lastRet = -1;
                expectedModCount = modCount;
            } catch (IndexOutOfBoundsException ex) {
                throw new ConcurrentModificationException();
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public void forEachRemaining(Consumer<? super E> consumer) {
            Objects.requireNonNull(consumer);
            final int size = ArrayList.this.size;
            int i = cursor;
            if (i >= size) {
                return;
            }
            final Object[] elementData = ArrayList.this.elementData;
            if (i >= elementData.length) {
                throw new ConcurrentModificationException();
            }
            while (i != size && modCount == expectedModCount) {
                consumer.accept((E) elementData[i++]);
            }
            // update once at end of iteration to reduce heap write traffic
            cursor = i;
            lastRet = i - 1;
            checkForComodification();
        }

        final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }
    }

讲到这里,我们可以给一个阶段性的总结:所以在继承了Collection接口,并实现了iterator()方法的所有集合类,都可以使用迭代器进行元素的遍历。
【温馨提示】
若此时的你足够强大,足够自信,还可以进一步引出增强for循环遍历,它的底层原理也是Iterator

【代码示例2】

for (String str : list) {
    System.out.print(str + ",");
}

【反编译】

Iterator var3 = list.iterator();

while(var3.hasNext()) {
    String str = (String)var3.next();
    System.out.print(str + ",");
}

反编译后我们可以看得出,底层的实现就是迭代器,而这个for-each的写法不过是Java的一个语法糖罢了,这部分属于附加题,讲不明白的,可以不提。
本来以为迭代器这个话题到此就结束了,没想到面试官紧接着又抛出了一个问题
面试官:“LinkedList也是如此吗?”
听到这里咱们心里一紧,他终究是要挖光我们的家底呀
确实!LinkedList有所不同,LinkedList 并没有直接重写 Iterable 接口的 iterator 方法,而是由它的父类 AbstractSequentialList 来完成。
进入源码会发现,这个AbstractSequentialList 中提供了一个listIterator对象,而LinkedList进行了方法的重写。
【源码解析4】

public interface ListIterator<E> extends Iterator<E> {
    boolean hasNext();
    E next();
    boolean hasPrevious();
    E previous();
}

这让它拥有了可以从任意下标开始遍历,而且支持双向遍历的能力。注意ListIterator只支持List类型集合。
到此,我们对于Iterator的了解全盘拖出了,当然还有一些细枝末节的知识,但我相信能回答到这里,已经获得面试官的认可啦。

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得
留言+点赞+收藏
呀。原创不易,转载请联系Build哥!