2024年2月

Eclipse是一个开放源代码的集成开发环境(IDE),最初由IBM公司开发,现在由Eclipse基金会负责维护。它是一个跨平台的工具,可以用于开发多种编程语言,如Java、C/C++、Python、PHP、Rust等。

Eclipse提供了一个可扩展的架构,允许开发者通过安装插件来扩展其功能,因此它被广泛用于各种软件开发项目,包括企业级应用、移动应用、Web应用等。Eclipse的核心功能包括代码编辑器、调试器、版本控制系统集成(如Git)、构建工具集成(如Maven、Gradle)、项目管理工具等。

Eclipse包含如下特点:

  1. 插件架构:
    Eclipse的插件架构使得开发者可以根据需要灵活地扩展IDE的功能,这使得Eclipse适用于多种开发场景。
  2. 跨平台性:
    Eclipse可以在多个操作系统上运行,包括Windows、MacOS和Linux,这使得开发者可以在不同的平台上使用相同的开发工具。
  3. 开源:
    Eclipse是开源的,这意味着任何人都可以查看其源代码、修改和定制它,以满足自己的需求。
  4. 社区支持:
    Eclipse拥有一个庞大的用户和开发者社区,这意味着可以轻松地找到支持、插件和解决方案。

接下来为大家介绍一下Windows环境下Eclipse的安装:

一、安装前准备

请确保电脑本机安装了JDK,没有安装的小伙伴,请点击下方任意一个链接查看安装教程(这里我安装的是JDK 8版本的):

JDK 8安装教程 JDK 11安装教程 JDK 17 安装教程

JDK安装完成后,按
Win

R
键,输入
cmd
,进入控制台,输入
java -version
,如果出现如下内容,就说明JDK已经安装成功了。这里我安装的是JDK 8:

本文后续内容使用NDM(Neat Download Manager)下载文件,可以加快下载速度(推荐)。如需使用此款软件的小伙伴,可以查看这篇教程:
下载神器NDM(Neat Download Manager)安装配置教程(适用于Windows和MacOS)

二、下载安装Eclipse

1. 根据电脑上安装的JDK版本,选择要安装的Eclipse版本。以下是Eclipse所需最低JDK版本:

Eclipse版本 所需JDK版本
2020-06及之前的版本 JDK 8/11/17
2020-09至2022-06之前的版本 JDK 11/17
2022-09及之后的版本 JDK 17

2.
点我进入Eclipse官网下载列表
。由于我电脑上安装的是JDK 8,我需要选择2020-06及之前的版本,这里我选择2020-06版本的Eclipse下载:

3. 找到
Eclipse IDE for Enterprise Java Developers
,在右侧选择Windows版本下载:

4. 下载完成后,将Eclipse解压到一个你熟悉的位置(建议文件路径是全英文的),这里我解压到了D盘:

5. 按照下图所示操作将Eclipse设置成
桌面快捷方式
,以后只需要在桌面直接打开快捷方式就可以了,比较方便。

三、Eclipse创建Workspace和简单设置

这一部分我们需要对Eclipse进行以下配置,方便后续我们使用Eclipse。

3.1 Eclipse配置本地安装的JDK

1. 在桌面双击打开Eclipse,此时会弹出一个选择工作空间Workspace的界面。点击
Browse
,选择一个你熟悉的文件夹,然后点击
Launch
即可:

2. 进入Eclipse以后,我们需要将本地安装的JDK应用到Eclipse中。点击上方菜单栏的
Window
,然后点击最后一个
Preferences
,进入Eclipse设置:

3. 在左侧选项中,找到
Java
并展开,选择
Installed JREs

4. 在Installed JREs中,选中系统自带的JRE,然后在右侧点击
Edit

5. 点击右上角的
Directory
,选择JDK安装位置,直到下方出现很多的JRE,然后点击
Finish

6. 点击右下角
Apply and Close
,完成设置并关闭:

3.2 配置新建文件菜单

安装完Eclipse后,你可能会发现在其新建文件菜单中默认并未包含所需的文件类型。为了确保后续的使用便捷,我们需要对Eclipse新建文件菜单进行一些配置。

1. 在上方菜单栏点击
Window
,选择
Perspective
,然后点击
Customize Perspective

2. 点击
Menu Visibilitiy
,依次展开
File

New

3. 这里我勾选了Java常用的项目、文件类型,小白可以按照下图的方式进行勾选即可,完成后点击下方的
Apply And Close

3.3 调整代码字体样式

Eclipse创建的代码文件在编辑器中字体比较小,事实上,你可以在设置菜单中调整Eclipse中代码的字体大小和样式,直到满足你的审美要求。

1. 点击上方菜单栏的
Windows
,然后点击最后一个
Preferences
,进入Eclipse设置:

2. 进入设置以后,点击
General
-->
Appearance
-->
Color And Fonts

3. 在颜色和字体界面中,双击展开
Basic
,找到并选中最后一个
Text Font
,点击
Edit

4. 在字体窗口中可以自行调整字体样式和大小,完成后点击确定,后续设置界面点击右下角的Apply and Close。

这里我选择的Consolas字体,字体大小16号。下方示例可以预览字体:

四、创建Java项目并运行Java程序

完成上面的配置后,我们就可以愉快写一段Java代码了。

1. 在上方菜单栏点击
File
,选择第一个
New
(或者按
Shift
Alt
N
),选择
Java Project

2. 按照下图操作,创建一个Java项目:

4. 如果出现下图弹窗,点击
Open Perspective
,此时Eclipse会以Java视图显示:

5. 在左侧Package Explorer,
双击展开
我们的项目MyJavaProject,鼠标右键
点击src

点击New

点击Class
创建Java文件:

6. 按照下图所示创建一个Java文件,注意文件名(类名要符合
标识符命名规范
):

7. 在MyDemo01类中写一个main方法,如下图所示:

注意:只要左上角页签MyProgram01.java前面会有一个星号标志,这说明我们未保存这个Java文件,需要按
Ctrl

S
保存。

8. 鼠标右键点击代码,找到
Run As
,点击
Java Application
运行Java程序:

9. 此时下方控制台就会输出我们在程序中写的内容,说明我们的代码编译运行成功!

五、Eclipse的卸载(可选)

这一部分会将软件卸载,请谨慎操作!如果不需要卸载,请直接跳过这一部分的内容!

1. 找到Eclipse安装目录,按
Shift
Delete
彻底删除(不经过回收站):

2. 如果想删除掉Eclipse在本机的相关配置,请按照下图所示的操作,找到存在
桌面
的文件夹:

3. 找到
.p2
文件夹,选中,按
Shirt
Delete
彻底删除:

至此,Eclipse卸载完成。

RAPTOR:递归摘要与树形检索的结合,提升RAG检索性能

随着 LLM 技术的发展,RAG 的价值也来越明显,可以视作 LLM 应用、落地的一个主要方向。RAG通过结合检索系统和生成模型,在生成回答时先从外部知识库种检索相关信息,辅助 LLM 进行更准确的生成。知识的粒度是多样的、零散的。如何
从知识库中精准地检索到相关的知识片段
是一个极具挑战性地问题。

概述

在目前构建 RAG 系统的流程中,基本都会涉及到对文档进行分块(有没有不需要进行分块的方法呢?)。现行的方式主要是通过滑动窗口进行分块,调一调分块的大小等。私以为,如何进行分块是一个很重要的问题。

目前检索增强的方法中的一个主要缺点,也是论文主要
想解决的问题
:检索到的主要是一些短的、连续的块。对于需要阅读整个文档才能回答的问题,检索到的块是不能包含足够的信息的。

论文的出发点很好,也是现在很多 RAG 系统的关键问题。分块的大小、如何分块是一个依赖于用户输入的问题。有些查询,只需要某个、某些块就可以回答,或者只需要块中的某句话就可以回答。而且,
文本一般是涵盖多个主题的
,并且具有
层次化的结构

且看作者的解决方法 —— 作者设计了一个树结构的索引和检索系统 RAPTOR(Recursive Abstractive Processing for Tree-Organized Retrieval),捕捉文本的多尺度,不同层次的信息。通过对文本块进行总结,向 LLM 提供不同层次的信息。

RAPTOR方法介绍

RAPTOR 在文本块的基础上,构建了一个递归的树结构,树中的节点标识不同粒度的语义信息。

基本流程:

  • 分块与嵌入。
    • 按 100 的大小对文本进行分块,如果一个句子长度超过 100,则直接将句子作为一个文本块,保证块内语义的一致性。(如何断句也很重要!)
    • 对文本块进行 embedding。
  • 递归的构建 RAPTOR 树。文本块及其 embedding 作为树的叶子节点。
    • 通过聚类把相似的块聚在一起。
    • 利用语言模型为簇内的文本生成总结,并为总结生成 embedding,也作为树的一个节点。
    • 递归执行上述过程。
  • 查询。即如何检索相关的块。文中提供了两种策略:
    • Tree Traversal Retrieval。遍历树的每一层,剪枝并选择最相关的节点。
    • Collapsed Tree Retrieval。评估整个树中每个节点的相关性,找到最相关的几个。

RAPTOR 树构建流程:

RAPTOR 树的两种查询策略:

整个流程很简洁,方法也很直观。其中比较重要的环节不言而喻:聚类。

文中选择了软聚类的方式:每个节点可以从属于多个簇。直接上来看,如果簇作为话题,那么每个文本块并不是只属于每个话题的,选择软聚类也是很直观的。文中的聚类算法是基于 GMM 聚类的,且聚类前通过 UMAP(Uniform Manifold Approximation and Projection ) 对文本表征进行了降维。论文对聚类的过程进行了一些改造,并通过 BIC(Bayesian Information Criterion)来选择最优簇数,一些细节可以参考原文。

总结

我认为,论文想解决的问题是很关键的,也是构建 RAG 系统中一个必须要考虑的问题。论文的思路很直观,和笔者在相关的实践中可以说是不谋而合。对于一个查询,在不考虑性能和其他问题的条件下,最理想的块应该是文本中与查询最相关的那段话、几句话甚至一句话(这就有点像抽取式的文本摘要了)。 如何找到能够恰好包含答案的文本块是一个很有挑战性的工作,期待找到更好的方案。


本文分享自华为云社区《
服务运行时动态挂载JavaAgent和插件——Sermant热插拔能力解析
》,作者:华为云高级软件工程师 栾文飞

一、概述

Sermant是基于Java字节码增强技术的无代理服务网格,其利用Java字节码增强技术,为宿主应用程序提供服务治理功能,以解决大规模微服务场景中的服务治理问题,通过Java字节码增强技术,可以非侵入的提供服务治理能力。在以往版本中,Sermant通过配置-javaagent指令在微服务启动时接入服务治理能力,当需要接入及卸载Sermant时都需要通过重新启动微服务来完成。但从1.2.0版本开始,Sermant实现了在服务不停机状态下进行安装和卸载的能力,为服务治理能力带来全新接入体验。本文将会对这种动态接入的机制,从技术基础到Sermant设计进行一次深入分析。

二、
JavaAgent
加载方式

首先介绍一下JavaAgent的不同接入方式,这是Sermant实现动态接入能力的技术基础。Java 中Instrumentation API 提供了一种修改字节码的机制,利用该API,可以通过修改字节码的方式来改变程序的行为,而不用触及程序的源码。JavaAgent为Instrumentation API的客户端,通过JavaAgent可以调用API进行字节码的操作,其提供了两种加载方式给开发者重载:

  • 静态加载:利用premain,在应用程序启动时加载 JavaAgent称为静态加载,静态加载会在启动时在执行任何代码之前修改字节码。

静态加载时,字节码增强是在类加载时发生的,当Java程序启动时,类加载过程中所有被加载的类都会经过JavaAgent所定义的类文件转换器的处理。

  • 动态加载:利用agentmain通过Java Attach API将JavaAgent加载到
    已运行的
    JVM中,动态加载可以通过字节码重转换的方式在运行时修改字节码。

动态加载时,和静态加载不同的是,此时JVM已在运行,目标类已被加载,就不能像静态加载时一样触发字节码增强过程,在使用动态加载的过程中,往.往会通过Instrumentation API来触发目标类(当然也可以指定所有已被加载的类)的重转换过程,在重转换过程中就会触发到Agent构建的类文件转换器,从而完成字节码增强过程。

动态加载方式为JavaAgent提供了在JVM运行时接入的能力,但通过类重转换来触发字节码增强相对于在类加载时增强有一定的局限性,例如不能在增强时修改类的继承关系,不能为类添加静态代码块,不能增强内存中和资源文件中字节码不一致的类等,这些也是在使用动态加载和多JavaAgent场景中常见的问题,综上,两种加载方式各有利弊,可以在使用时按照业务场景选择。

三、
Sermant
热插拔能力关键问题剖析

在了解技术基础后,我们能轻易的想到,理论上基于JavaAgent的动态加载方式,只需要在使用Sermant时,将通过premain方式启动改为通过agentmain方式启动,就可以将微服务治理能力动态的接入到微服务中,做到微服务零侵入、微服务不停机的状态下接入服务治理能力,但通往前方的路上总是充满了障碍:

3.1
如何保证动态安装过程中重转换可顺利执行?

这个问题的出现,根源在于JavaAgent通过agentmain方式加载到已运行的JVM中时,不同于静态加载,会在类初次被加载时完成字节码的转换,动态加载时一些需要被字节码增强类已经完成了类加载过程,这时候需要使用Instrumentation提供的类重转换(retransform classes)能力来修改字节码,在Instrumentation的Javadoc中关于这个能力有这样一段描述:

“The retransformation must not add, remove or rename fields or methods, change the signatures of methods, or change inheritance.
(重转换过程中,我们不能新增、删除或者重命名字段和方法,不能更改方法的签名,不能更改类的继承。)”

从中可以看出,在引入动态加载能力前,优先要保证字节码增强时,不可以有上述内容中所描述的限制操作。

不过Sermant不太需要担心这个问题,因为这种限制不仅仅在动态加载时会触发,在多个JavaAgent同时使用时也可能会触发,可以参考Sermant团队的另一篇文章:
《记一次多个JavaAgent同时使用的类增强冲突问题及分析》
。为了保证在多Agent场景下的兼容性,Sermant的字节码增强模板严格遵循Instrumentation API的限制,因此Sermant在兼容性上的不断改进过程中无心插柳,帮助动态加载能力铺平了路。

3.2
如何保证在服务治理插件安装和卸载时不互相影响?

Sermant的设计中,通过字节码增强引入的服务治理能力,是通过在目标方法上添加服务治理功能切面来完成的,每一个服务治理插件,通过一系列切面的配合来达成最终的服务治理效果。不同的服务治理功能,可能会对同一个目标方法进行处理。但并不会对同一个方法进行多次字节码增强,而是通过一次字节码增强织入调度切面(onMethodEnter、onMethodExit等),通过该切面对相关的服务治理能力(通过拦截器实现,每一个切面会对应一个拦截器的列表)进行调度:

对于服务治理能力的调度逻辑我们在另一篇文章
《开发者能力机制解析,玩转Sermant开发》
有讲过,本篇不再赘述。

基于框架的基本设计,就需要考虑两个问题,当插件在动态安装时,如何保证不重复字节码增强?当插件卸载时,如何保证不会导致有相同目标方法的插件失效。

  • 安装时如何保证不重复执行字节码增强?

在字节码增强开发过程中,类文件转换器(ClassFileTransformer)是一定会接触到的概念,开发者需要基于该转换器来进行字节码的处理。在大多数的字节码增强框架中,都会对其进行封装,用于降低字节码处理的难度。Sermant基于ByteBuddy提供的类文件转换器实现了一种可重入的类转换器,在插件动态安装时,虽然目标方法已经被已安装的插件增强过了,但此时还是会触发类文件转换(因为动态安装插件的过程是独立的),当触发类文件转换时,所有相关的类文件转换器都会被唤醒,再次触发类文件转换过程。每次可重入类转换器被唤醒时,将发生以下行为:

在Sermant中维护了一个针对目标方法的字节码增强锁(AdviceKey锁),即针对每一个目标方法,维护了1个信号量当做锁,用于让各类文件转换器来检查目标方法的字节码增强状态,当目标方法对应的类被类转换时,就会触发Sermant所提供的类文件转换器,此时类文件转换器将尝试获取针对目标方法的信号量,如果能获取信号量,则执行对目标方法的字节码增强,如果不能获取,则不执行字节码增强。

基于字节码增强锁,在转换器触发时,主要有两条路径可以走,类文件转换器会通过目标方法的AdviceKey
(
类名+
方法hash+
类加载器组成的一个唯一表示,用于表示字节码增强的目标)
来检查其所关联的锁,判断当前目标方法是否已被Sermant进行过字节码增强(织入拦截器调度的切面):

  1. 能获取锁,说明未被增强:则当前文件转换器获取当前AdviceKey所关联的锁,将其获取的锁通过其对应的插件来维护,并且执行字节码增强,将服务治理所需的拦截器放入该AdviceKey所对应的拦截器列表;
  2. 不能获取锁,说明已被增强:则只将拦截器放入该AdviceKey对应的拦截器列表中,不执行字节码增强。

通过上述机制,就可以保证Sermant在安装不同服务治理插件时,不会进行重复的字节码增强,避免无端的性能和资源损耗。

  • 卸载时如何保证不会导致其他插件失效?

当插件需要卸载时,会再次触发相关目标类的重转换,与安装时不同的是,这次需要被卸载的插件释放自身已经持有的AdviceKey锁。释放锁后,触发目标类重转换时,目标类所对应的各个插件的类文件转换器将会再次触发和安装时相同的流程:

在这个过程中,未被卸载的插件所提供的对目标类的类文件转换器,会在目标类重转换时,再次触发,并且只会经历获取锁和字节码增强的过程。这样就保证,如果还有插件需要对该目标方法进行字节码增强时,可以获得目标方法所对应的锁,不会因为目标方法的交集而导致其他插件能力失效。

四、总结

本篇文章对Sermant的热插拔能力的核心机制进行了解析,希望可以为开发者及使用者在开发或使用相关能力时带来更多的灵感和便利。更多的热插拔能力介绍可以参考官网相关文档,
Sermant Agent使用手册
,后续我们也会针对热插拔适用的场景进行进一步分享,敬请期待。

Sermant作为专注于服务治理领域的字节码增强框架,致力于提供高性能、可扩展、易接入、功能丰富的服务治理体验,并会在每个版本中做好性能、功能、体验的看护,广泛欢迎大家的加入。

点击关注,第一时间了解华为云新鲜技术~

1. 概述 - 何来此文

有些天没有更新文章了,大约两三周。

为什么今天会再写一次源码分析相关的文章呢?

在几年前我写过一篇:
《Kubernetes client-go 源码分析 - workqueue》
,不过过去2年我没有搞 K8s,忘得差不多了……

这几周我接了个活,写一个 vgpu 调度器,我以为是三天的活,结果搞了三周,笑死。

这个项目涉及到 K8s
自定义控制器

调度器拓展

webhook
等,还是非常有趣的。虽然写起来有点费头发,但是成就感满满。可能你的想法和我一开始一样,感觉就是500行代码的事情,三天绰绰有余!哎,都是泪,5000行都写不完……

过去2年时间我在 DevOps 领域干活,K8s 相关的知识点很多已经记不清了。这会 vgpu 调度器也实现完了,忙绿告一段落,同时心里的疑惑也攒满了,各种我知其然(会用)但是不知其所以然(不记得细节,原理)的点,趁着过年期间,补一补。

今天从 workqueue 开始。

写完本文,我又回过头瞟了下几年前写的 workqueue 源码分析文章,略有感慨。当年更加关注的是如何快速看懂源码,更加关注“workqueue 的实现”本身;而今天再看 workqueue,我则更加关注“为什么要看 workqueue”,在自定义控制器里用到了 workqueue,这些用法的道理何在,原理何在。

同时这次刷 workqueue 源码,我能够深切感受到这段源码的优雅,不止是看懂逻辑,更多是一种享受。就像看《Harry Potter》原著,除了沉浸于作者构造的真切魔法场景,体会看小说的乐趣外,还能被字里行间溢出的那种才华而感动,感慨“原来英文也可以写的文采斐然”,感慨“原来代码还能写的这么漂亮”。

2. Queue 的实现

Queue 对应的接口定义如下:

  • util/workqueue/queue.go:26
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

这里重点关注
Add

Get

Done
的实现。Queue 的实现类型如下:

  • util/workqueue/queue.go:115
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t

	// dirty defines all of the items that need to be processed.
	dirty set

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	cond *sync.Cond
    // ......
}

来看这三个字段:queue、dirty 和 processing:

  • queue
    :queue 定义了这个队列中 items 的顺序,这是一个
    []interface{}
    类型的切片,可以保存任意类型的元素。
  • dirty
    :dirty 的类型是 set 的底层类型是
    map[interface{}]struct{}
    ,也就是
    queue
    中的元素集合会存到这个
    dirty set
    中,也就是“待处理的 items”。
  • processing
    :processing 的类型也是 set,保存的是正在被处理的 items。

也就是说
Queue
会将待处理的 items 全部放到
queue
中,这个
queue
是有序的;同时为了让
queue
中不存在重复的 items,所以加了一个
dirty set
,毕竟判断 map 中是否存在某个 key 比判断 slice 中是否存在某个 item 要快得多。

另外当一个 item 从
queue
中被取出时,也就是出队后,这个 item 会被加到
processing set
中,同时被从
dirty set
中移除,也就是
processing set
保存了目前正在被处理,但是没有处理完的 items。换言之,会存在某个操作来表示“一个 item 已经被处理完成”,也就是
Done(item interface{})
方法。

下一步我们具体来看
Add

Get

Done
的实现。

2.1 Queue.Add(item interface{}) 方法

  • util/workqueue/queue.go:163
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}

这里用到了条件变量
sync.Cond
(我在前面贴 Type 结构体代码时特地保留了 cond 字段),不难想到当队列为空的时候,和 Add 相对应的
Get()
方法会 Wait,等到 Add 完成 item 的添加,调用
Signal()
来唤醒。

Add()
方法的实现主要就两个逻辑:

  1. 如果 item 在
    dirty set
    中不存在,就往
    dirty set
    中插入。这样也就解决了
    queue
    中重复放入某个 item 的问题。这个特性在
    workqueue
    包的
    doc.go
    中被描述为“Stingy”,也就是 Queue 的实现不允许同一个 item 被并发处理。再说的简单一点,就是加入
    queue
    中放进去了几个一模一样的 item,这时候如果多个 goroutine 去并发
    Get()
    后其实会拿到一样的 item(比如是一个 pod),那不就乱套了。
  2. 如果 item 在
    processing set
    中不存在,那就加入到
    queue
    中。也就是说如果一个 item “正在被处理”,那么它不会被重复加到 queue 里去排队。显然这也是为了“Stingy”。但是这个 item 会被放到
    dirty set
    里,如果
    processing set
    中也存在这个 item,那么当这个 item “Done”之后,这次加进来的 item 会从
    dirty set
    中被提到
    queue
    里。这个逻辑后面可以看到。

总结一句话:
Add() 会以“吝啬”的方式将 item 入队,这个过程会保证一个 Queue 中同一时刻不存在重复的 item。

2.2 Queue.Get() 方法

  • util/workqueue/queue.go:196
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

Get()
方法尝试从
queue
中获取第一个 item,同时将其加入到
processing set
中,并且从
dirty set
中删除。

2.3 Queue.Done(item interface{}) 方法

  • util/workqueue/queue.go:223
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}

Done()
方法用来标记一个 item 被处理完成了。调用
Done()
方法的时候,这个 item 被从
processing set
中删除。另外前面提到 Add 的过程中如果发现 item 在
processing set
中存在,那么这个 item 会被暂存到
dirty set
中。这里在处理 item 的 Done 逻辑的时候也就顺带把暂存到
dirty set
中的 item 取出来,加入到 queue 里去了。这个行为同样是有意义的,在“Stingy”的同时允许一个 item 在处理过程中被重新入队,等待下一次重新处理。

3. DelayingQueue 的实现

继续来看 DelayingQueue 的实现。

  • util/workqueue/delaying_queue.go:30
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

这个
Interface
也就是前文提到的
Queue

DelayingQueue

Queue
的基础上加了一个
AddAfter()
接口,实现了“过一会再加入一个 item”的功能。这样也就使得一个 item 处理失败之后,能够在指定延时之后再重新入队。

行,直接来看
AddAfter()
接口是怎么实现的:

  • util/workqueue/delaying_queue.go:205
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

可以看到当
duration > 0
的时候,
AddAfter()
方法只是简单地构造一个 waitFor 对象,然后将其加入
chan *waitFor
类型的
waitingForAddCh
中。所以后面应该关注的核心逻辑是如何消费这个
waitingForAddCh。

跟一下
waitingForAddCh
的实现,可以找到在
newDelayingQueue()
函数中初始化了
waitingForAddCh
,然后调用了一个 delayingType 的
waitingLoop()
方法:

  • util/workqueue/delaying_queue.go:103
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name, provider),
	}

	go ret.waitingLoop()
	return ret
}

可以看到
waitingForAddCh
的容量是 1000,类型是
chan *waitFor
,而
waitFor
的类型是:

type waitFor struct {
	data    t
	readyAt time.Time
	// index in the priority queue (heap)
	index int
}

下一步继续看
waitingLoop()
方法的实现:

  • util/workqueue/delaying_queue.go:232
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer

	// 这是一个优先级队列,用一个最小堆保存了所有 waitFor(waitForPriorityQueue 的类型是 []*waitFor)
	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{}

	// 后面的所有逻辑都在这个 for 循环里
	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		// 一直判断 waitingForQueue 这个堆里有没有元素,如果有,就 Peek 出来第一个元素看是不是 ready,如果 ready
		// 那就通过 q.Add() 方法将其数据放入队列(也就是前文那个 Queue 实现的 Add 逻辑)
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break
			}

			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		// 代码到这里也就是说前面一个 for 里的 break 被执行了,换言之最小堆中的最小元素,也就是最快 ready 的一个都还没有 ready;
		// 这里将 nextReadyAtTimer 计时器的时间设置为(最快 ready 的第一个元素的 ready 时间 - 当前时间),
		// 这样当 nextReadyAt 这个 channel 有数据的时候,最小堆里的第一个元素也就 ready 了。
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		// 刚才设置好了一个合适的 nextReadyAt,现在开始 select 等待某个 channel 有反应
		select {
		case <-q.stopCh:
			return
		// 心跳时间是10s
		case <-q.heartbeat.C():
			// continue the loop, which will add ready items
		// 执行到这里也就是第一个 item ready 了
		case <-nextReadyAt:
			// continue the loop, which will add ready items

		// 当有元素被通过 AddAfter() 方法加进来时,waitingForAddCh 就会有内容,这时候会被取出来;
		// 如果没有 ready,那就调用 insert(大概率主要就是插入最小堆的过程);如果 ready 了那就直接 Add 到 Queue 里。
		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}
			// 通过一个循环将 waitingForAddCh 中的所有元素都消费掉,根据 ready 情况要么插入最小堆(优先级队列),要么直接入队。
			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

我继续看了一眼
insert()
方法的实现,里面就2个逻辑:如果元素不存在,就插入 waitingForQueue 这个优先级队列;如果存在,就更新其 readAt 时间;代码就不具体贴了。

这个
waitingLoop()
写的还是很漂亮,滴水不漏,逻辑严谨,相当优雅。waitingLoop 通过优先级队列实现元素按照时间顺序从近到远排序,这样就能最高效地获取到最快 ready 的元素。然后又根据优先级队列中最快能 ready 的元素的 ready 剩余时间来构造等待计时器,等待的过程中又监测着 waitingForAddCh 这个 channel 中是否有新元素…… 这个过程很完美地体现了 Golang 特色的 Channel 机制的优雅与强大。

4. RateLimitingQueue 的实现

看完了延时队列,继续来看限速队列。

先贴接口:

  • util/workqueue/rate_limiting_queue.go:22
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

层层递进,限速队列建立在延时队列之上,RateLimitingInterface 接口包含了 DelayingInterface 接口的所有方法。显然这里我们主要得关注
AddRateLimited(item interface{})
方法和
Forget(item interface{})
方法的意图与实现。
Forget
方法在我们开发自定义控制器的时候其实会用到,故需知其然,能知其所以然则更佳。

接着找到
AddRateLimited(item interface{})
方法的实现:

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

是不是很酷,通过一个 rateLimiter 确定一个 item 的 ready 时间,然后通过延时队列完成延时入队的逻辑。这里就只剩下 rateLimiter 这个限速器的实现需要研究了。

如果看
Forget(item interface{})
方法:

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}

同样,内容都在 rateLimiter 里。行,接着来看 rateLimiter 的实现。

5. rateLimiter 限速器的实现

rateLimiter 对应的接口定义如下:

  • util/workqueue/default_rate_limiters.go:27
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}

实现可就有意思了,变着花样限速:

  1. BucketRateLimiter
  2. ItemExponentialFailureRateLimiter
  3. ItemFastSlowRateLimiter
  4. MaxOfRateLimiter
  5. StepRateLimiter
  6. WithMaxWaitRateLimiter

六种实现,对应不同的限速思路,体现算法之美的地方。

6. 控制器里用的默认限速器

在 sample-controller 里有这样一行代码:

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

我们先看下默认 workqueue 里用的限速器是哪一个。

  • util/workqueue/default_rate_limiters.go:39
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

这里调用了
NewMaxOfRateLimiter()
函数,这个函数会返回一个
MaxOfRateLimiter

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

MaxOfRateLimiter
内部保存多个其他限速器的实现,然后返回一个“限速最久”的结果:

// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		// 拿到最久的限速时长
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

你们前面贴的这个
DefaultControllerRateLimiter()
函数的实现就很容易理解了:

func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

这里接收两个限速器,然后看谁凶就用谁的限速结果。这两个限速器分别是:

  • ItemExponentialFailureRateLimiter
  • BucketRateLimiter

ItemExponentialFailureRateLimiter
限速器会指数级增加限速时长,也就是先延迟5毫秒,如果失败就变成10毫秒、20毫秒、40毫秒…… 最大不超过 1000秒。

BucketRateLimiter
是一个比较基础的令牌桶实现,在这里的2个参数10和100的含义是令牌桶里最多有100个令牌,每秒发放10个。

到这里控制器用的限速器逻辑也就完全能理解了。每个限速器的具体实现代码就不具体贴了,到这里我想要掌握的信息已经足够了。不过在早几年的我的一篇博客
《Kubernetes client-go 源码分析 - workqueue》
里有具体的每个限速器的实现逻辑分析,如果你感兴趣也可以跳转过去阅读。

7. 总结

在编写自定义控制器的时候我们会用到这样的代码:

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

这里实例化的队列是一个限速队列。


ResourceEventHandlerFuncs
里我们会写
queue.Add(key)
这样的代码,这里的 Add 也就有了限速逻辑,也就是当一个待调谐对象不停地被接收到时,worker 是“有脾气”的,来得越频繁,门口站着等的时间也就越长。

控制器里还会写到这样的代码:

  • obj, shutdown := c.workqueue.Get()
  • c.workqueue.Done(obj)
  • c.workqueue.Forget(obj)
  • c.workqueue.AddRateLimited(obj)

这些方法也就对应着一个 obj 从 workqueue 中取出来(
Get()
),如果处理完成,就调用
Done(obj)
,从而从队列中彻底移除,同时调用
Forget(obj)
告诉记速器可以忽略这个 obj 了(可别下次同名 obj 来的时候,误判了,让人家白等半天)。最后如果处理失败,遇到点啥临时异常情况,那就放回到 workqueue 里去,用
AddRateLimited(obj)
方法。

至此,workqueue 相关的用法我们就知其然,亦知其所以然了。

本文介绍在
Visual Studio
软件中配置、编译
C++
环境下
matplotlibcpp
库的详细方法。

matplotlibcpp
库是一个
C++
环境下的绘图工具,其通过调用
Python
接口,实现在
C++
代码中通过
matplotlib
库的命令绘制各类图像。由于其需要调用
Python
接口,因此在配置
matplotlibcpp
库时有些较为麻烦的操作。本文就将
matplotlibcpp
库的具体配置方法进行详细介绍。

1 Git配置

Git
是一个分布式开源版本控制系统,在后期我们需要基于其完成
vcpkg
包管理器的下载与安装,因此需要首先完成
Git
的配置;具体方法大家可以参考
下载、安装Git并拷贝GitHub项目到本地的流程
这篇文章。

2 vcpkg配置

vcpkg
是一个开源的
C++
包管理器,在后期我们需要基于其完成
matplotlibcpp
库的下载与安装,因此需要首先完成
vcpkg
的配置。

首先,选定一个路径作为
vcpkg
的保存路径;随后,在这一文件夹下,按下
Shift
按钮并同时右击鼠标,选择“
在此处打开Powershell窗口
”。

随后,将弹出如下所示的窗口。

接下来,在其中输入如下的代码,并运行。

git clone https://github.com/microsoft/vcpkg

具体如下图所示。

稍等片刻,出现如下所示的界面,说明
vcpkg
安装完毕。

随后,输入如下代码,进入
vcpkg
保存路径。

cd vcpkg

再输入如下代码,激活
vcpkg
环境。

.\bootstrap-vcpkg.bat

具体如下图所示。

运行完毕后,将得到如下所示的结果。

接下来,再输入如下所示的代码,将
vcpkg
与我们的
Visual Studio
软件相连接。

.\vcpkg integrate install

具体如下图所示。

代码运行完毕后,如下图所示。

3 matplotlibcpp配置

接下来,我们即可开始进行
matplotlibcpp
库的配置。

3.1 matplotlibcpp安装

首先,依然在刚刚的界面中,输入如下代码,安装
matplotlibcpp
库。

.\vcpkg install matplotlib-cpp

代码运行结束后,得到如下所示的结果。

随后,输入如下所示的代码,安装64位的
matplotlibcpp
库。

 .\vcpkg install matplotlib-cpp:x64-windows

运行代码后,得到如下所示的结果。

3.2 matplotlibcpp配置

首先,在刚刚配置的
vcpkg
的保存路径中,通过以下路径,找到
matplotlibcpp.h
文件,并将其打开。

随后,在其
#include
部分的最下方,添加如下代码。

#include <string>

具体如下图所示。

同时,在该文件
340
行左右,将
template
开头的两行注释掉,如下图所示。

4 Python配置

由于
matplotlibcpp
库是通过调用
Python
接口,实现在
C++
代码中通过
matplotlib
库的命令绘制各类图像,因此配置
matplotlibcpp
库时还需要保证电脑中拥有
Python
环境。而这里的
Python
环境也有一个具体的要求——需要具有
Debug
版本的
Python

因此,可以分为3种情况:第一种情况,是大家电脑中
之前没有安装过任何Python环境
;第二种情况,是大家
之前有通过Anaconda下载Python环境
;第三种情况,则是大家之前
有通过Python官方下载Python环境
。针对这三种情况该具体如何配置,我们也会在接下来的文章中具体提及。

首先,对于第二种情况,也就是
之前有通过Anaconda下载Python环境
的情况,大家从这里开始看就好。首先,需要看一下
Anaconda

Python
的版本;如下图所示,我这里就是在
Anaconda
中有
3.9.12
版本的
Python

其次,对于第一种情况,也就是
之前没有安装过任何Python环境
的情况,大家从这里开始看就好。我们在
Python

官方下载地址
中,下载最新的
Python
版本即可(如果是之前有通过
Anaconda
下载
Python
环境的情况,大家这里下载和自己
Anaconda

Python
版本不一样的版本即可。

随后,双击打开刚刚下载好的安装包。对于第三种情况,即大家之前
有通过Python官方下载Python环境
的情况,那么直接找到当初的安装包,然后进行如下的操作即可。

首先,选择“
Customize installation
”选项。

接下来的页面,选择默认的配置即可。

随后的页面,选中第一个方框中所包含的勾选项,并在其下方配置自定义安装路径;这个路径建议大家自己修改一下,同时记下来这个路径,之后会经常用到。

随后,依据文章
Windows电脑环境变量(用户变量、系统变量)的修改
提到的方法,首先将以下两个路径添加到
环境变量
中的
用户变量

Path
中。具体这两个路径的前缀,和大家前面所选的
Python
安装路径有关。

接下来,将这两个路径同样在
环境变量

系统变量

Path
中添加一下;此外,还要注意,如果大家的
环境变量
中,有原本的
Python
路径,大家最好将原本的路径放在我们新建的变量的下方,如下图所示。

此外,还需要在
系统变量
中,添加如下所示的两个内容;其中,“
变量
”一栏依次填写
PYTHONHOME

PYTHONPATH
,“

”一栏就是刚刚我们的
Python
安装路径。

随后,我们在计算机中进入
Python
环境,就默认进入我们刚刚配置的、新的
Python
环境;之后如果我们需要正常使用
Python
了,可以用我们这次配置的新的
Python
;也可以将刚刚配置的
PYTHONHOME

PYTHONPATH
两个系统变量删除,并将原有
Python
所对应的
环境变量
提前到刚刚配置好的
Python

环境变量
之前,从而使用我们原先版本的
Python

接下来,我们需要对新创建的
Python
进行
matplotlib
库与
numpy
库的安装。这里就使用
Python
最传统的
pip
安装方法即可,首先输入如下的代码。

pip install -U matplotlib

出现如下所示的界面即说明
matplotlib
库已经安装完毕。

随后,输入如下所示的代码。

pip install numpy scipy matplotlib

即可完成
numpy
库的安装。

5 解决方案配置

接下来,我们创建或打开需要调用
matplotlibcpp
库的解决方案。

首先,将前述
Python
安装路径下的以下两个
.dll
文件复制(具体文件名称与
Python
版本有关)。

并将其复制到解决方案的文件夹下。

随后,依据文章
如何在Visual Studio新C++项目中调用之前配置过的库?
中提到的方法,分别进行以下配置。

首先,在“
附加包含目录
”中,将
Python

numpy
库的
include
文件夹放入其中。

其次,在“
附加库目录
”中,将
Python
安装路径下
libs
文件夹的路径放入其中。

再次,在“
附加依赖项
”中,将
Python
安装路径下
libs
文件夹中如下所示的4个
.lib
文件放入其中。

随后,对于需要调用
matplotlibcpp
库的程序,需要添加以下代码。

#include "matplotlibcpp.h"
namespace plt = matplotlibcpp;

具体如下图所示。

随后,即可开始运行代码。这里提供一个最简单的
matplotlibcpp
库调用代码。

#include "matplotlibcpp.h"

namespace plt = matplotlibcpp;

int main() {
	plt::plot({ 1, 2, 3, 4 });
	plt::show();
	return 0;
}

运行代码,出现如下所示的窗口。

以上,即完成了
matplotlibcpp
库的配置。