wenmo8 发布的文章

本文分享自华为云社区
《【GaussTech技术专栏】GaussDB的BTree索引和UBTree索引》
,作者:GaussDB 数据库。

1. 简介

数据库通常使用索引来提高业务查询的速度。本文将深入介绍GaussDB中最常用的两种索引:BTree索引和UBTree索引。我们将重点解读BTree索引和UBTree索引的存储结构,探讨它们在读写并发、写写并发以及MVCC(多版本并发控制)能力方面的优势,并展望它们的未来演进。

2. BTree索引和UBTree索引结构

GaussDB的主流存储引擎有两种:Append Update存储引擎(Astore)和In-place Update存储引擎(Ustore)。更多Ustore,请阅《
GaussDB Ustore存储引擎解读
》。

在Astore中,索引默认采用BTree;在Ustore中,索引默认采用UBTree。

相比于BTree,UBTree在叶子节点层额外维护了数据的MVCC信息。不管是BTree还是UBTree,都是采用基于L&Y撰写的论文《Efficient Locking for Concurrent Operations on B-Trees》和《ASYMMETRIC CONCURRENTB-TREEALGORITHM》改造而来的Blink Tree,其组织结构如图1所示。

图1:Blink Tree组织结构

图1中①到⑦都是独立的索引页面,GaussDB默认页面大小都是8K,采用堆表引擎结构,其中索引页和数据页分开存储。索引的叶子节点保留有指向数据页的行指针<K, V>,相比于传统B+树索引,Blink Tree具有以下特点:

  1. 不仅叶子节点兄弟之间有连接,非叶子结点的兄弟之间也有连接;

  2. 除了每层的最右侧节点外,其余节点内部都有一个highkey,该值是节点的upper boundary(上边界);

  3. 一个节点的highkey,一定大于以该节点为根节点的子树的所有key;

  4. 除了叶子节点,其余节点每个<Key, Value>对(除highkey外)的value值均指向一个孩子节点,该key小于等于被指向子节点的最小值,是孩子节点的lower boundary(下边界)。

3. BTree索引和UBTree索引优势

3.1 高并发读写能力

BTree索引和UBTree索引,由于其特殊的结构设计,相比于传统的B+树,在数据查询方面有一定的性能优势,这主要是因为Blink Tree内部的两大特点:一是highkey的存在,二是所有层级兄弟节点之间直接相互连接的结构。这些特点使得BTree索引和UBTree索引在查询和插入操作时,能够采用更加友好的加锁协议,从而在高并发场景下实现了读写性能更佳。

  • 读写并发优势

我们先来讨论BTree索引和UBTree索引在处理读写并发时的优势。为了更直观地对比,首先,回顾一下传统的B+树是如何查询数据的,其查询的流程如图2所示。

图2:B+树查询流程

首先,查询事务会先锁住根节点①,然后,通过二分查找找到<K11, V11>,接着锁住节点②,通过二分查找找到<K22, V22>,同时释放节点①的锁。之后,事务会锁住节点④,继续二分查找,直到找到目标值<K42, V42>,再释放掉节点②的锁,最后会释放节点④的锁。

以上过程是典型的蟹行协议,即总是会先持有子节点的锁,再释放父节点的锁,且同一时刻最多持有两把锁。
显然,这对于高并发场景下的读写并发性能是不友好的

我们再来看看BTree索引和UBTree索引在查询时是如何加锁的,其查询流程如图3所示。

图3:Blink Tree查询流程

首先,查询事务会锁住根节点①,然后,利用二分查找找到<K11, V11>。接着,事务会先释放节点①的锁,再锁住节点②,再利用二分查找找到<K22, V22>。紧接着,事务会先释放节点②的锁,再锁住节点④,最后通过二分查找找到目标值<K42, V42>后,再释放节点④的锁。

在以上过程中,总是先释放父节点的锁,再持有子节点的锁,且同一时刻最多持有一把锁。显然,这对于高并发场景下的读写并发性能是更友好的。

现在我们来讨论一下,为什么BTree索引和UBTree索引在查询时,相比传统B+树索引会有以上优势?

这主要是因为,一个事务在查询时,可能会有另外一个事务导致索引的节点分裂,进而导致从根节点到叶子节点的遍历过程中,如果先释放父节点的锁,再获取子节点的锁,可能会在持有子节点锁的时刻,目标元组<K, V>已经转移到了新分裂的节点。这时候需要有某种机制,可以快速地识别到这种情况,并且可以定位到目标<K, V>所在的节点,而Blink Tree的特殊结构可以比较简单地实现这种机制。

图4:Blink Tree Move Right机制

更直观地,以图4的例子来说,当查询事务在执行到Unlock ②与Lock ④之间的时间段时,如果有另外一个写事务插入了数据<K42’, V42’>,这将导致节点④发生了分裂,从而使得目标元组<K42, V42>转移到了新分裂的节点⑤中。

当查询事务再次Lock ④后,通过比较目标key,即K42和节点④的highkey大小,发现K42大于节点④的highkey,这说明K42已经不存在于节点④内部,而是存在于节点④右边的某个节点中(BTree索引和UBTree索引有约束,节点只能往右分裂)。此时会启动move right机制,从节点④开始往右遍历,直到找到一个highkey大于K42的节点,在图4中就是节点⑤。最后,在节点⑤中找到目标元组<K42, V42>。在这个过程中,同一时刻仍然最多持有一把锁。

相比之下,传统B+树索引目前是不能做到这些的。首先,它不能识别到节点④已发生分裂,其次,由于非叶子节点层的节点之间没有建立连接,故它不能实施一种类似于move right的机制。

  • 写写并发优势

除了读写并发优势外,BTree索引和UBTree索引相比于B+树索引,还有数据写写并发的优势。BTree索引和UBTree索引与B+树索引在插入数据时,都会经历两个步骤:

  1. 找到插入位置;

  2. 插入数据。

步骤1是查询过程,BTree 索引和 UBTree索引在这个过程中,仍然可以保持同一时刻只有一把锁的优势。

步骤2是写入过程,如果节点需要分裂,BTree索引和 UBTree索引与B+树索引的处理机制并不相同,这时BTree索引和UBTree索引仍然有较大优势。

下面我们来探讨传统的B+树是如何插入数据的。

图5:B+树插入数据导致节点分裂

如图5所示,当新插入数据<K51’, V51’>找到了插入节点④时,发现节点④需要分裂成节点④和节点⑤,由于每一层节点的分裂,都有可能触发上一层节点发生分裂,因此会重新采用悲观加锁策略,将搜索路径上的所有节点(节点①,节点②和节点③)全部加写锁后,再执行插入和分裂操作。

这一操作对系统性能有显著影响。尽管部分存储引擎对B+树做了优化(比如加意向锁),但是往往只是优化读写并发的场景,写写场景的性能优化仍然比较困难。这是因为底层节点分裂时,需要一种轻量化的机制找到它的父节点以写入数据,而传统B+树难以实现。

那么为什么BTree索引和 UBTree索引在插入时就不需要悲观锁呢?

这仍然是Blink Tree特殊的存储结构决定的。我们来看Blink Tree是怎么在有其他写事务的干扰下实现节点分裂时找到正确的父节点的,如图6所示。

图6:Blink Tree 写写并发

事务1需在Blink Tree中插入新元组<K61’, V61’>。首先,从根节点到叶子节点遍历查找插入位置,在Unlock节点②和Lock节点④之间的时间段内,有事务2导致Blink Tree发生了分裂(节点②分裂成节点⑤),事务1找到节点④为插入节点,但发现节点④空间不足,遂节点④发生分裂,随后,事务1会根据记录的路径向上递归至父节点插入新的节点信息。当遍历到节点②时,事务1发现节点②并非节点④的父节点,于是从节点②开始move right,直到找到节点④真正的父节点。

由于move right机制的存在,BTree索引和UBTree索引从待分裂节点出发,向上能找到正确的父节点,并且只会对当前时刻涉及到写操作的节点加锁。这种方式相比于悲观地锁住整条搜索路径,更加轻量化,对于高并发的写写操作会更加友好。

3.2 MVCC能力和独立垃圾回收能力

UBTree索引是GaussDB Ustore的默认索引,它最大的特点是在叶子节点增加了MVCC能力以及独立过期旧版本回收能力。通过在索引页面的元组上维护版本信息,UBtree能够在索引层进行MVCC可见性检查。同时,UBtree也能通过版本信息独立判断索引元组是否已经无效(Dead),进而使得Ustore能实现数据表以及索引表上页级的空间清理,并在此基础上构建了一个不依赖Auto Vacuum的独立垃圾回收机制。

图7:UBTree与BTree查找、更新比较示意图

图7中的左图,展示了Astore和Ustore在索引扫描过程中的差异。索引扫描过程中,系统会通过扫描条件在索引上进行二分查找,找到符合条件元组的TID,再通过TID到数据表上查找对应的数据元组。对Astore而言,由于BTree索引没有版本信息,因此,元组的可见性检查需要完全在数据页面上进行。而对于Ustore,元组的可见性检查一部分可以在UBTree上完成,支持MVCC的UBTree索引过滤掉不可见的索引元组,从而减少了无效的页面访问和I/O操作。

图7中的右图,展示了Astore和Ustore索引列更新情况下的差异。在Astore中,更新操作会先将旧元组4标记为删除,再插入新元组10。由于BTree索引上4和10对应的TID不同,单次扫描中只有一个能通过可见性检查。

对于Ustore而言,更新操作采取in-place的方式,即直接在原位更新元组,可见性检查会返回一个对当前快照可见的版本。在in-place更新涉及索引列变更的情况下,4和10两个索引元组对应的TID会是相同的,若直接通过TID访问Ustore,会有两次读取可见版本的操作。然而,这种情况下UBTree通过索引层的可见性检查,能判断元组4已不可见,从而避免通过其对应的TID(如TID1)在Ustore上查找元组。

此外,对于UBTree而言,索引能实现页级别的独立清理。索引上的元组可以通过自身的版本信息来判断是否仍然有效。因此,在Insert空间不足时,可以直接清理页面上无效的元组。同时Ustore数据表上无效元组的行指针可以直接复用,这是因为UBTree同样可以检测出对应的索引元组无效,从而避免通过其TID来访问数据表。一旦数据表和索引都支持了页级别的独立清理逻辑,存储引擎就有可能一种实现完全不依赖Auto Vacuum的垃圾回收机制。

4. 总结与展望

本文先介绍了BTree索引和UBTree索引的存储结构BlinkTree,该索引的所有节点都和兄弟节点相连接,并且每个节点新增一个highkey作为节点key值的上边界。随后分析了BTree索引和UBTree索引,相比传统B+树在读写场景、写写场景有更强的并发能力的原因,这主要是因为Blink Tree特殊的存储结构,它可以支持move right机制,有效解决事务期间节点分裂的场景,使得事务在遍历Blink Tree时每个时刻持有更少的锁。最后,还介绍了UBTree的MVCC能力和独立垃圾回收能力。

尽管相比于传统B+树,BTree索引和UBTtree索引已经有诸多优势,但在未来,我们仍有许多关于BTree索引和UBTtree索引的优化工作。比如,UBTrree由于需要维护MVCC信息,导致索引占用空间变大。为了弥补这个劣势,我们可以将索引进行压缩或者寻找一种更节省空间的MVCC信息管理方法。GaussDB目前正在做一些这方面的研究工作,未来会分享我们的成果。

引用:

1.
https://dl.acm.org/doi/10.1145/319628.319663

2.
https://dl.acm.org/doi/10.5555/324493.324589

3.
http://mysql.taobao.org/monthly/2020/06/02/


华为开发者空间,汇聚鸿蒙、昇腾、鲲鹏、GaussDB、欧拉等各项根技术的开发资源及工具,致力于为每位开发者提供一台云主机、一套开发工具及云上存储空间,让开发者基于华为根生态创新。
点击链接
,免费领取您的专属云主机

在上一节,我们一起探讨了ChatGPT在功能测试用例生成方面的优势。接下来,我们将探讨ChatGPT自动生成功能测试用例的步骤。

1)    问题定义:让ChatGPT自动生成功能测试用例的第一步是清晰地定义要测试的功能或特性和提供足够的上下文信息。提供足够的上下文信息对于ChatGPT生成准确的功能测试用例至关重要。上下文信息可能包括产品的版本、环境信息、用户角色等,确保ChatGPT了解测试的背景,以便生成相关性高的功能测试用例。

2)    ChatGPT交互:测试人员与ChatGPT进行交互,向其提供问题描述和上下文信息。ChatGPT将根据这些信息来生成测试用例,在操作过程中必须注意提供清晰的问题描述和进行适时的追问。

向ChatGPT提供清晰、明确的问题描述,有助于ChatGPT理解需求,从而生成相关性高的测试用例。通过适时的追问,ChatGPT可能会向用户提出一些澄清性的问题,以确保它理解用户的需求。请及时做出回应,以便ChatGPT可以生成准确的测试用例。

3)    测试用例生成:ChatGPT生成的内容是自然语言描述,需要将其转化为可执行的测试用例。ChatGPT生成的测试用例要符合测试用例设计规范,必须保证输出的测试用例格式一致,且每个测试用例都要有用例编号、用例名称、操作步骤以及预期结果。借助自然语言处理工具可以将描述转化为测试步骤和预期结果。ChatGPT设计的测试用例有可能会因为使用ChatGPT的测试人员经验不足导致设计的提示词对需求覆盖不全面的问题。项目团队要有良好地沟通反馈机制,当上述情况出现时,应及时进行必要的调整。

4)    用例评审和改进:ChatGPT生成功能测试用例后,需要进行测试团队内、外部评审,收集相关评审意见,依据评审意见进行功能测试用例的修改与完善。功能测试用例必须要及时维护,以保证功能测试用例与软件系统及需求规格说明书的一致性。只要项目没有结束,测试团队就要及时维护功能测试用例,这可能是一个迭代的过程。测试工具集成,并不是所有企业都可以做到的,在一些中小型企业由于测试人员不足和能力有限等,通常在ChatGPT生成功能测试用例后,由测试人员执行测试用例。而有些中大型企业则自行开发一些测试平台,测试平台可能集成多个工具,ChatGPT生成功能测试用例后,测试平台能直接读取功能测试用例并直接生成自动化测试脚本,这显然是一种更高层次的测试用例设计。

当使用ChatGPT进行功能测试用例自动生成时,测试团队可以迅速生成功能测试用例并将其融入测试流程。这种方法提高了测试工作的自动化水平,减轻了测试团队的工作负担。然而,在最初阶段使用ChatGPT协助生成功能测试用例时,可能会遇到生成的功能测试用例覆盖不全面的问题,这可能是提示词编写问题、测试人员知识和经验不足等因素导致的。为了解决这些问题,团队可以进行一些ChatGPT工具的使用培训、技术及经验交流等活动。

上一篇:《人工智能大语言模型起源篇(一),从哪里开始》

(5)Howard 和 Ruder 于2018年发表的《Universal Language Model Fine-tuning for Text Classification》,
https://arxiv.org/abs/1801.06146

这篇论文从历史的角度来看非常有意思。尽管它是在原始的《Attention Is All You Need》变换器发布一年后写的,但它并没有涉及变换器,而是专注于递归神经网络。然而,它仍然值得注意,因为它有效地提出了语言模型的预训练和迁移学习,用于下游任务。

尽管迁移学习在计算机视觉中已经被确立,但在自然语言处理(NLP)中还不普遍。ULMFit 是首批展示预训练语言模型并对其进行微调,从而在许多NLP任务中取得最先进成果的论文之一。

ULMFit 提出的微调语言模型的三阶段过程如下:

  1. 在大规模文本语料库上训练语言模型。

  2. 在任务特定的数据上微调这个预训练的语言模型,使其能够适应文本的特定风格和词汇。

  3. 在任务特定数据上微调分类器,并逐步解冻各层,以避免灾难性遗忘。

这个过程——在大规模语料库上训练语言模型,然后在下游任务上进行微调——是基于变换器的模型和像BERT、GPT-2/3/4、RoBERTa等基础模型所使用的核心方法。

然而,ULMFiT的关键部分——逐步解冻,通常在实践中不会常规进行,尤其是在使用变换器架构时,通常会一次性微调所有层。


来源:
https://arxiv.org/abs/1801.06146

(6)Devlin、Chang、Lee 和 Toutanova 于2018年发表的《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》,
https://arxiv.org/abs/1810.04805

继原始的变换器架构之后,大型语言模型的研究开始分为两个方向:一种是用于预测建模任务(如文本分类)的编码器风格变换器,另一种是用于生成建模任务(如翻译、总结和其他形式的文本生成)的解码器风格变换器。

上面的BERT论文介绍了掩蔽语言模型(masked-language modeling)和下一句预测(next-sentence prediction)这一原始概念。它仍然是最具影响力的编码器风格架构。如果你对这一研究方向感兴趣,我推荐你进一步了解RoBERTa,它通过去除下一句预测任务,简化了预训练目标。


来源:
https://arxiv.org/abs/1810.04805

(7)Radford 和 Narasimhan 于2018年发表的《Improving Language Understanding by Generative Pre-Training》,
https://www.semanticscholar.org/paper/Improving-Language-Understanding-by-Generative-Radford-Narasimhan/cd18800a0fe0b668a1cc19f2ec95b5003d0a5035

原始的GPT论文介绍了流行的解码器风格架构,并通过下一个词预测进行预训练。BERT可以被看作是一个双向变换器,因为它的预训练目标是掩蔽语言模型,而GPT是一个单向的、自回归模型。虽然GPT的嵌入也可以用于分类任务,但GPT方法是当今最具影响力的大型语言模型(LLM)的核心,例如ChatGPT。

如果你对这个研究方向感兴趣,我建议你进一步阅读GPT-2
https://www.semanticscholar.org/paper/Language-Models-are-Unsupervised-Multitask-Learners-Radford-Wu/9405cc0d6169988371b2755e573cc28650d14dfe和GPT-3
https://arxiv.org/abs/2005.14165的论文。这两篇论文展示了LLM能够进行零-shot和少-shot学习,并突出了LLM的突现能力。GPT-3仍然是当前一代LLM(如ChatGPT)训练的流行基准和基础模型——我们稍后会作为单独的条目讨论导致ChatGPT的InstructGPT方法。

              来源: https://www.semanticscholar.org/paper/Improving-Language-Understanding-by-Generative-Radford-Narasimhan/cd18800a0fe0b668a1cc19f2ec95b5003d0a5035

(8)Lewis、Liu、Goyal、Ghazvininejad、Mohamed、Levy、Stoyanov 和 Zettlemoyer 于2019年发表的《BART: Denoising Sequence-to-Sequence Pre-training for Natural Language Generation, Translation, and Comprehension》,
https://arxiv.org/abs/1910.13461

如前所述,BERT类型的编码器风格LLM通常更适用于预测建模任务,而GPT类型的解码器风格LLM则更擅长生成文本。为了兼顾两者的优点,上面的BART论文将编码器和解码器部分结合在一起(这与原始的变换器架构(本清单中的第二篇论文)并无太大区别)。


来源:
https://arxiv.org/abs/1910.13461

(9)Yang、Jin、Tang、Han、Feng、Jiang、Yin 和 Hu 于2023年发表的《Harnessing the Power of LLMs in Practice: A Survey on ChatGPT and Beyond》,
https://arxiv.org/abs/2304.13712

这不是一篇研究论文,但可能是迄今为止最好的架构综述,展示了不同架构的演变过程。然而,除了讨论BERT风格的掩蔽语言模型(编码器)和GPT风格的自回归语言模型(解码器)外,它还提供了关于预训练和微调数据的有用讨论和指导。

                                          现代 LLM 的进化树,来自 https://arxiv.org/abs/2304.13712。

前言

本文将继续介绍ansible-playbook中roles的各种用法

环境准备

组件 版本
操作系统 Ubuntu 22.04.4 LTS
ansible 2.17.6

基本用法

文件结构

.
├── deploy.hosts
├── deploy.yaml
└── roles
    └── base
        └── tasks
            └── main.yaml

定义全局公共变量

新增group_vars目录,并且新增文件

.
├── deploy.hosts
├── deploy.yaml
├── group_vars
│   └── all.yaml
└── roles
    └── base
        └── tasks
            └── main.yaml

▶ cat group_vars/all.yaml
info: IT paiqiu
▶ cat roles/base/tasks/main.yaml
- name: first
  debug:
    msg: "hello world {{ info }}"

运行:

▶ ansible-playbook -i deploy.hosts deploy.yaml

...
TASK [base : first] **********************************************************************************************
ok: [10.22.11.166] => {
    "msg": "hello world IT paiqiu"
}
...

定义role级变量

可以覆盖全局变量

在role
base
下面新增vars目录,并且新增main.yaml文件

.
├── deploy.hosts
├── deploy.yaml
├── group_vars
│   └── all.yaml
└── roles
    └── base
        ├── tasks
        │   └── main.yaml
        └── vars
            └── main.yaml
▶ cat roles/base/tasks/main.yaml
- name: first
  debug:
    msg: "hello world {{ info }}"
▶ cat roles/base/vars/main.yaml
info:
  name: wilson
  addr: cd

运行:

▶ ansible-playbook -i deploy.hosts deploy.yaml

...
TASK [base : first] **********************************************************************************************
ok: [10.22.11.166] => {
    "msg": "hello world {'name': 'wilson', 'addr': 'cd'}"
}
...

定义静态文件目录

相当于定义role级别的根目录,更好的管理需要传输的文件,在role
base
下面创建目录
files
,并且在
files
下面放入需要传输的文件
test.img

.
├── deploy.hosts
├── deploy.yaml
├── group_vars
│   └── all.yaml
└── roles
    └── base
        ├── files
        │   └── test.img
        ├── tasks
        │   └── main.yaml
        └── vars
            └── main.yaml

传输
test.img
至目标机器

- name: first
  copy: src=test.img dest=/tmp/test.img

test.img默认回去roles/base/files当中寻找,所以前面不需要添加路径

copy模块会自动检查待传输文件的md5,如果没有变化,那就不会再传输了

定义模版

每次传输文件都会通过输入的变量重新渲染之后再传输,在role
base
下面创建目录
templates
,并且创建文件
test.conf

.
├── deploy.hosts
├── deploy.yaml
├── group_vars
│   └── all.yaml
└── roles
    └── base
        ├── files
        │   └── test.img
        ├── tasks
        │   └── main.yaml
        ├── templates
        │   └── test.conf
        └── vars
            └── main.yaml

在模版文件中定义了两个变量,一个是手动设置的变量version,一个是ansible内建变量inventory_hostname

▶ cat roles/base/templates/test.conf
{{ version }}
{{ inventory_hostname }}
▶ cat roles/base/tasks/main.yaml
- name: first
  template: src=test.conf dest=/tmp/test.conf mode=0644

运行:

▶ ansible-playbook -i deploy.hosts -e version=2 deploy.yaml

PLAY [deploy] ****************************************************************************************************

TASK [base : first] **********************************************************************************************
changed: [10.22.11.166]

PLAY RECAP *******************************************************************************************************
10.22.11.166               : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

登录到目标机器查看:

▶ cat test.conf
2
10.22.11.166

2
是本次任务传入的变量,
10.22.11.166
则是目标机器的ip地址

定义事件驱动

通常用于某些任务完成之后需要触发的动作,比如:发布完某个配置文件之后需要重启服务。在role
base
下创建目录
handlers
,并创建文件
main.yaml

.
├── deploy.hosts
├── deploy.yaml
├── group_vars
│   └── all.yaml
└── roles
    └── base
        ├── files
        │   └── test.img
        ├── handlers
        │   └── main.yaml
        ├── tasks
        │   └── main.yaml
        ├── templates
        │   └── test.conf
        └── vars
            └── main.yaml

创建事件
first handler

▶ cat roles/base/handlers/main.yaml
- name: first handler
  debug:
    msg: echo 'hello world'
▶ cat roles/base/tasks/main.yaml
- name: first
  template: src=test.conf dest=/tmp/test.conf mode=0644
  notify: first handler

运行:

▶ ansible-playbook -i deploy.hosts -e version=2 deploy.yaml

PLAY [deploy] ****************************************************************************************************

TASK [base : first] **********************************************************************************************
changed: [10.22.11.166]

RUNNING HANDLER [base : first handler] ***************************************************************************
ok: [10.22.11.166] => {
    "msg": "echo 'hello world'"
}

PLAY RECAP *******************************************************************************************************
10.22.11.166               : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

同样的,如果事件前驱没有发生,那事件通知也不会发生了,比如事件前驱是传输一个文件,如果文件没有变化,就不会再传输,那事件也不会再发生了

新增更多的task任务

当前只有
main.yaml
,在增加一个专门用于传输文件的任务
files.yaml

▶ cat roles/base/tasks/main.yaml
- name: main
  include_tasks: files.yaml
  vars:
    app_version: 0.2
▶ cat roles/base/tasks/files.yaml
- name: first
  debug:
    msg: "version: {{ app_version }}"

运行:

▶ ansible-playbook -i deploy.hosts deploy.yaml

PLAY [deploy] ****************************************************************************************************

TASK [base : main] ***********************************************************************************************
included: /home/wilson/workspace/ansible/roles/base/tasks/files.yaml for 10.22.11.166

TASK [base : first] **********************************************************************************************
ok: [10.22.11.166] => {
    "msg": "version: 0.2"
}

PLAY RECAP *******************************************************************************************************
10.22.11.166               : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

如果文件过多,可以通过循环来处理:

- name: main
  include_tasks: "{{ yaml_items }}"
  loop:
    - files.yaml
    - packages.yaml
  loop_control:
    loop_var: yaml_items
  vars:
    app_version: 0.2

补充一下常用的命令

执行命令
shell

- name: display
  shell: echo 'hello world'

传输文件
copy

copy模块会自动检查待传输文件的md5,如果没有变化,那就不会再传输了

- name: copy file
  copy: src=test.img dest=/tmp/test.img

修改文件内容(一行)
lineinfile

  • path: 指定要操作的文件路径(必填)
  • line: 定义要插入或替换的完整行内容(可选)
  • regexp: 用于匹配现有行的正则表达式。与 line 配合使用,匹配的行将被替换
  • state:
    • present(默认):确保指定行存在
    • absent:移除匹配的行
  • create: 如果文件不存在,是否创建文件(默认 yes)
  • insertafter: 指定插入行的位置(默认 EOF)
    • EOF:文件末尾
    • BOF:文件开头
    • 正则表达式:在匹配行之后插入
      insertbefore: 和 insertafter 类似,但在匹配行之前插入

通过正则匹配行之后修改当前行

- name: modify
  lineinfile: dest=/tmp/test.txt regexp='^DC' line="DC=hello" state=present

修改文件内容(多行/块)
blockinfile

  • path: 指定要操作的文件路径(必填)
  • block: 定义要插入的文本内容(必填)
  • marker: 指定标记块的开始和结束字符串,默认为 # {mark} ANSIBLE MANAGED BLOCK
  • state:
    • present(默认):确保块存在
    • absent:移除块
  • create: 如果文件不存在,是否创建文件(默认 yes)
  • insertafter: 指定插入块的位置(默认 EOF)
    • 可用值:
      • EOF:文件末尾
      • BOF:文件开头
      • 正则表达式:在匹配行之后插入
  • insertbefore: 和 insertafter 类似,但是在匹配行之前插入

1)在某一行下面增加新的内容,并且将增加的内容打上标记

- name: add new content with markers
  blockinfile:
    path: /tmp/test.txt
    marker: "# {mark} by wilson"
    insertafter: '^DC'
    block: |
      name: wilson
      city: cd

运行之后查看结果

▶ cat /tmp/test.txt
DC=hello
# BEGIN by wilson
name: wilson
city: cd
# END by wilson

2)删除已标记的内容

- name: Remove the managed block
  blockinfile:
    marker: "# {mark} by wilson"
    path: /tmp/test.txt
    state: absent

添加内容块打上标记之后会让修改与删除都变得非常方便

使用模版

- name: hostname config
  template: src=etc/hostname dest=/etc/hostname

设置文件权限
file

- name: change file 755
  file:
    path: /tmp/test.txt
    owner: wilson
    group: wilson
    state: file
    mode: 755

使用循环
with_items

with_items

给多个目录修改权限

- name: change directory 755
  file:
    path: '{{ item.dir }}'
    owner: wilson
    group: wilson
    state: directory
    mode: '{{ item.mode }}'
  with_items:
    - { dir: '/tmp/1', mode: '0755'}
    - { dir: '/tmp/2', mode: '0755'}

loop

当然使把
with_items
替换成
loop
也是可以的

- name: change directory 755
  file:
    path: '{{ item.dir }}'
    owner: wilson
    group: wilson
    state: directory
    mode: '{{ item.mode }}'
  with_items:
    - { dir: '/tmp/1', mode: '0755'}
    - { dir: '/tmp/2', mode: '0755'}

嵌套循环
with_nested

输出每一种组合,相当于笛卡尔积

- name: nested loop
  debug:
    msg: "{{ item[0] }} {{ item[1] }}"
  with_nested:
    - [ "hello1", "hello2"]
    - [ "world1", "world2"]

输出结果:

TASK [base : nested loop] ****************************************************************************************
ok: [127.0.0.1] => (item=['hello1', 'world1']) => {
    "msg": "hello1 world1"
}
ok: [127.0.0.1] => (item=['hello1', 'world2']) => {
    "msg": "hello1 world2"
}
ok: [127.0.0.1] => (item=['hello2', 'world1']) => {
    "msg": "hello2 world1"
}
ok: [127.0.0.1] => (item=['hello2', 'world2']) => {
    "msg": "hello2 world2"
}

文件内容循环
with_file

逐行打印文件内容

- name: display file content
  debug:
    msg: "{{ item }}"
  with_file:
    - /tmp/test.txt

幂等性

使用roles去管理多设备的时候,编写脚本需要时刻注意幂等性,即每一次执行都要保证同样的结果

联系我

联系我,做深入的交流


至此,本文结束
在下才疏学浅,有撒汤漏水的,请各位不吝赐教...

kubernetes-horizontal-color.png

本文主要分析 k8s 中的 device-plugin 机制工作原理,并通过实现一个简单的 device-plugin 来加深理解。


1. 背景

默认情况下,k8s 中的 Pod 只能申请 CPU 和 Memory 这两种资源,就像下面这样:

resources:
  requests:
    memory: "1024Mi"
    cpu: "100m"
  limits:
    memory: "2048Mi"
    cpu: "200m"

随着 AI 热度越来越高,更多的业务 Pod 需要申请 GPU 资源,
GPU 环境搭建指南:如何在裸机、Docker、K8s 等环境中使用 GPU
中我们分析了如何在 k8s 环境中使用 GPU,就是靠
Device Plugin
机制,通过该机制使得 k8s 能感知到节点上的 GPU 资源,就像原生的 CPU 和 Memory 资源一样使用。

实际上在早期,K8s 也提供了一种名为
alpha.kubernetes.io/nvidia-gpu
的资源来支持 NVIDIA GPU,不过后面也发现了很多问题,每增加一种资源都要修改 k8s 核心代码,k8s 社区压力山大。于是在 1.8 版本引入了
device plugin
机制,通过插件形式来接入其他资源,设备厂家只需要开发对应的 xxx-device-plugin 就可以将资源接入到 k8s 了。

ps:类似的还有引入
CSI
让存储插件从 Kubernetes 内部(in-tree)代码库中分离出来,改为独立的、可插拔的外部组件(out-of-tree),还有
CRI

CNI
等等,这里的 Device Plugin 也能算作其中的一种。

Device Plugin 有两层含义,下文中根据语义自行区分:

  • 首先它可以代表 k8s 中的 Device Plugin framework
  • 其次也可以代表厂家的具体实现,比如 NVIDIA/k8s-device-plugin,就是用于接入 NVIDIA GPU 资源的 Device Plugin 实现

2. 原理

Device Plugin 的工作原理其实不复杂,可以分为
插件注册

kubelet 调用插件
两部分。

  • 插件注册:DevicePlugin 启动时会想节点上的 Kubelet 发起注册,这样 Kubelet就可以感知到该插件的存在了
  • kubelet 调用插件:注册完成后,当有 Pod 申请对于资源时,kubelet 就会调用该插件 API 实现具体功能

如 k8s 官网上的图所示:

deviceplugin-framework-overview.svg

Kubelet 部分

为了提供该功能,Kubelet 新增了一个
Registration
gRPC service:

service Registration {
	rpc Register(RegisterRequest) returns (Empty) {}
}

device plugin 可以调用该接口向 Kubelet 进行注册,注册接口需要提供三个参数:

  • device plugin 对应的 unix socket 名字
    :后续 kubelet 根据名称找到对应的 unix socket,并向插件发起调用

  • device plugin 调 API version
    :用于区分不同版本的插件

  • device plugin 提供的 ResourceName
    :遇到不能处理的资源申请时(CPU和Memory之外的资源),Kubelet 就会根据申请的资源名称来匹配对应的插件


    • ResourceName 需要按照
      vendor-domain/resourcetype
      格式,例如
      nvidia.com/gpu

device plugin 部分

要进行设备管理,device plugin 插件需要实现以下接口:

  • GetDevicePluginOptions
    :这个接口用于获取设备插件的信息,可以在其返回的响应中指定一些设备插件的配置选项,可以看做是插件的元数据

  • ListAndWatch
    :该接口用于列出可用的设备并持续监视这些设备的状态变化。

  • GetPreferredAllocation
    :将分配偏好信息提供给 device plugin,以便 device plugin 在分配时可以做出更好的选择

  • Allocate
    :该接口用于向设备插件请求分配指定数量的设备资源。
  • PreStartContainer
    : 该接口在容器启动之前调用,用于配置容器使用的设备资源。

只有
ListAndWatch

Allocate
两个接口是必须的,其他都是可以选的。

工作流程

一般所有的 Device Plugin 实现最终都会以 Pod 形式运行在 k8s 集群中,又因为需要管理所有节点,因此都会以 DaemonSet 方式部署。

device plugin 启动之后第一步就是向 Kubelet 注册,让 Kubelet 知道有一个新的设备接入了。

为了能够调用 Kubelet 的 Register 接口,Device Plugin Pod 会将宿主机上的 kubelet.sock 文件(unix socket)挂载到容器中,通过 kubelet.sock 文件发起调用以实现注册。

集群部署后,Kubelet 就会启动,

  • 1)Kubelet 启动 Registration gRPC 服务(kubelet.sock),提供 Register 接口

  • 2)device-plugin 启动后,通过 kubelet.sock 调用 Register 接口,向 Kubelet 进行注册,注册信息包括 device plugin 的 unix socket,API Version,ResourceName

  • 3)注册成功后,Kubelet 通过 device-plugin 的 unix socket 向 device plugin 调用 ListAndWatch, 获取当前节点上的资源

  • 4)Kubelet 向 api-server 更新节点状态来记录上一步中发现的资源


    • 此时
      kubelet get node -oyaml
      就能查看到 Node 对象的 Capacity 中多了对应的资源

    5)用户创建 Pod 并申请该资源,调度完成后,对应节点上的 kubelet 调用 device plugin 的 Allocate 接口进行资源分配

大致如下:

k8s-device-plugin-timeline


【Kubernetes 系列】
持续更新中,搜索公众号【
探索云原生
】订阅,阅读更多文章。


3. 实现

源码:
https://github.com/lixd/i-device-plugin

device plugin 实现大致分为三部分:

  • 1)启动时向 Kubelet 发起注册
    • 注意监控 kubelet 的重启,一般是使用
      fsnotify
      类似的库监控 kubelet.sock 的重新创建事件。如果 kubelet.sock 重新创建了,则认为 kubelet 是重启了,那么需要重新注册
  • 2)gRPC Server:主要是实现
    ListAndWatch

    Allocate
    两个方法

实现 gRPC Server

简单起见,这里只实现了
ListAndWatch

Allocate
这两个必须的方法。

对 gRPC 不熟悉的童鞋可以看下这个 -->
gRPC 系列教程

ListAndWatch

这是一个 gRPC 的 Stream 方法,建立长连接,可以持续向 Kubelet 发送设备的信息。

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (c *GopherDevicePlugin) ListAndWatch(_ *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
	devs := c.dm.Devices()
	klog.Infof("find devices:%s", String(devs))

	err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
	if err != nil {
		return errors.WithMessage(err, "send device failed")
	}

	klog.Infoln("waiting for device update")
	for range c.dm.notify {
		devs = c.dm.Devices()
		klog.Infof("device update,new device list:%s", String(devs))
		_ = srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
	}
	return nil
}

发现设备的部分代码如下:

func (d *DeviceMonitor) List() error {
	err := filepath.Walk(d.path, func(path string, info fs.FileInfo, err error) error {
		if info.IsDir() {
			klog.Infof("%s is dir,skip", path)
			return nil
		}

		d.devices[info.Name()] = &pluginapi.Device{
			ID:     info.Name(),
			Health: pluginapi.Healthy,
		}
		return nil
	})

	return errors.WithMessagef(err, "walk [%s] failed", d.path)
}

很简单,就是遍历查看
/etc/gophers
目录下的所有文件,每个文件都会当做一个设备。

然后再启动一个 Goroutine 监控设备的变化,即
/etc/gophers
目录下文件有变化时通过 chan 发送通知,将最新的设备信息发送给 Kubelet。

func (d *DeviceMonitor) Watch() error {
	klog.Infoln("watching devices")

	w, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.WithMessage(err, "new watcher failed")
	}
	defer w.Close()

	errChan := make(chan error)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				errChan <- fmt.Errorf("device watcher panic:%v", r)
			}
		}()
		for {
			select {
			case event, ok := <-w.Events:
				if !ok {
					continue
				}
				klog.Infof("fsnotify device event: %s %s", event.Name, event.Op.String())

				if event.Op == fsnotify.Create {
					dev := path.Base(event.Name)
					d.devices[dev] = &pluginapi.Device{
						ID:     dev,
						Health: pluginapi.Healthy,
					}
					d.notify <- struct{}{}
					klog.Infof("find new device [%s]", dev)
				} else if event.Op&fsnotify.Remove == fsnotify.Remove {
					dev := path.Base(event.Name)
					delete(d.devices, dev)
					d.notify <- struct{}{}
					klog.Infof("device [%s] removed", dev)
				}

			case err, ok := <-w.Errors:
				if !ok {
					continue
				}
				klog.Errorf("fsnotify watch device failed:%v", err)
			}
		}
	}()

	err = w.Add(d.path)
	if err != nil {
		return fmt.Errorf("watch device error:%v", err)
	}

	return <-errChan
}

Allocate

Allocate 则是需要告知 kubelet 怎么将设备分配给容器,这里实现比较简单,就是在对应容器中增加一个环境变量,Gopher=$deviceId

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (c *GopherDevicePlugin) Allocate(_ context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	ret := &pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		klog.Infof("[Allocate] received request: %v", strings.Join(req.DevicesIDs, ","))
		resp := pluginapi.ContainerAllocateResponse{
			Envs: map[string]string{
				"Gopher": strings.Join(req.DevicesIDs, ","),
			},
		}
		ret.ContainerResponses = append(ret.ContainerResponses, &resp)
	}
	return ret, nil
}

简单看一下 NVIDIA 的 device plugin 是怎么实现 Allocate 的。

// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	responses := pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		if err := plugin.rm.ValidateRequest(req.DevicesIDs); err != nil {
			return nil, fmt.Errorf("invalid allocation request for %q: %w", plugin.rm.Resource(), err)
		}
		response, err := plugin.getAllocateResponse(req.DevicesIDs)
		if err != nil {
			return nil, fmt.Errorf("failed to get allocate response: %v", err)
		}
		responses.ContainerResponses = append(responses.ContainerResponses, response)
	}

	return &responses, nil
}

核心其实是这个方法:

// updateResponseForDeviceListEnvvar sets the environment variable for the requested devices.
func (plugin *NvidiaDevicePlugin) updateResponseForDeviceListEnvvar(response *pluginapi.ContainerAllocateResponse, deviceIDs ...string) {
	response.Envs[plugin.deviceListEnvvar] = strings.Join(deviceIDs, ",")
}

给容器添加了一个环境变量,value 为设备 id,具体 deviceID 提供了两种测量,可能是编号或者 uuid

const (
	DeviceIDStrategyUUID  = "uuid"
	DeviceIDStrategyIndex = "index"
)

key 是一个变量 plugin.deviceListEnvvar,初始化如下:

	plugin := NvidiaDevicePlugin{
		deviceListEnvvar:     "NVIDIA_VISIBLE_DEVICES",
		socket:               pluginPath + ".sock",
	  // ...
	}

也就是说 NVIDIA 这个 device plugin 实现 Allocate 主要就是给容器增加了环境变量,例如:

NVIDIA_VISIBLE_DEVICES="0,1"

在文章
GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建
中提到 GPU Operator 会使用 NVIDIA Container Toolit Installer 安装 NVIDIA Container Toolit。

这个 NVIDIA Container Toolit 的作用就是添加对 GPU 的支持,也包括了识别 NVIDIA_VISIBLE_DEVICES 这个环境变量,然后将对应设备挂载到容器里。

除此之外还会把设备挂载到容器里:

func (plugin *NvidiaDevicePlugin) apiDeviceSpecs(devRoot string, ids []string) []*pluginapi.DeviceSpec {
	optional := map[string]bool{
		"/dev/nvidiactl":        true,
		"/dev/nvidia-uvm":       true,
		"/dev/nvidia-uvm-tools": true,
		"/dev/nvidia-modeset":   true,
	}

	paths := plugin.rm.GetDevicePaths(ids)

	var specs []*pluginapi.DeviceSpec
	for _, p := range paths {
		if optional[p] {
			if _, err := os.Stat(p); err != nil {
				continue
			}
		}
		spec := &pluginapi.DeviceSpec{
			ContainerPath: p,
			HostPath:      filepath.Join(devRoot, p),
			Permissions:   "rw",
		}
		specs = append(specs, spec)
	}

	return specs
}

核心为:

		spec := &pluginapi.DeviceSpec{
			ContainerPath: p,
			HostPath:      filepath.Join(devRoot, p),
			Permissions:   "rw",
		}

这里指定了设备在宿主机上的 Path 和挂载到容器之后的 Path,后续就可以根据这些信息进行设备挂载了。

其他方法

另外几个方法非强制的,因此只做一个空实现。

// GetDevicePluginOptions returns options to be communicated with Device
// Manager
func (c *GopherDevicePlugin) GetDevicePluginOptions(_ context.Context, _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
	return &pluginapi.DevicePluginOptions{PreStartRequired: true}, nil
}

// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
func (c *GopherDevicePlugin) GetPreferredAllocation(_ context.Context, _ *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
	return &pluginapi.PreferredAllocationResponse{}, nil
}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
func (c *GopherDevicePlugin) PreStartContainer(_ context.Context, _ *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
	return &pluginapi.PreStartContainerResponse{}, nil
}

向 Kubelet 进行注册

注册也是很简单,调用 deviceplugin 提供的 RegisterRequest 方法即可。

// Register registers the device plugin for the given resourceName with Kubelet.
func (c *GopherDevicePlugin) Register() error {
	conn, err := connect(pluginapi.KubeletSocket, common.ConnectTimeout)
	if err != nil {
		return errors.WithMessagef(err, "connect to %s failed", pluginapi.KubeletSocket)
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(common.DeviceSocket),
		ResourceName: common.ResourceName,
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return errors.WithMessage(err, "register to kubelet failed")
	}
	return nil
}

监控 kubelet.sock 状态

使用 fsnotify 库监控 kubelet.sock 文件状态,通过 kubelet.sock 文件的变化来判断 kubelet 是否重启,当 kubelet 重启后 device plugin 也需要重启,然后注册到新的 kubelet.sock。

// WatchKubelet restart device plugin when kubelet restarted
func WatchKubelet(stop chan<- struct{}) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.WithMessage(err, "Unable to create fsnotify watcher")
	}
	defer watcher.Close()

	go func() {
		// Start listening for events.
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					continue
				}
				klog.Infof("fsnotify events: %s %v", event.Name, event.Op.String())
				if event.Name == pluginapi.KubeletSocket && event.Op == fsnotify.Create {
					klog.Warning("inotify: kubelet.sock created, restarting.")
					stop <- struct{}{}
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					continue
				}
				klog.Errorf("fsnotify failed restarting,detail:%v", err)
			}
		}
	}()

	// watch kubelet.sock
	err = watcher.Add(pluginapi.KubeletSocket)
	if err != nil {
		return errors.WithMessagef(err, "Unable to add path %s to watcher", pluginapi.KubeletSocket)
	}
	return nil
}

为什么需要重新注册

因为Kubelet 中使用一个 map 来存储注册的插件,因此每次 Kubelet 重启都会丢失,所以我们在实现 device plugin 时就要监控 Kubelet 重启状态并重新注册。

Kubelet Register 方法
实现如下:

// /pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go#L143-L165
func (s *server) Register(ctx context.Context, r *api.RegisterRequest) (*api.Empty, error) {
	klog.InfoS("Got registration request from device plugin with resource", "resourceName", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()

	if !s.isVersionCompatibleWithPlugin(r.Version) {
		err := fmt.Errorf(errUnsupportedVersion, r.Version, api.SupportedVersions)
		klog.InfoS("Bad registration request from device plugin with resource", "resourceName", r.ResourceName, "err", err)
		return &api.Empty{}, err
	}

	if !v1helper.IsExtendedResourceName(core.ResourceName(r.ResourceName)) {
		err := fmt.Errorf(errInvalidResourceName, r.ResourceName)
		klog.InfoS("Bad registration request from device plugin", "err", err)
		return &api.Empty{}, err
	}

	if err := s.connectClient(r.ResourceName, filepath.Join(s.socketDir, r.Endpoint)); err != nil {
		klog.InfoS("Error connecting to device plugin client", "err", err)
		return &api.Empty{}, err
	}

	return &api.Empty{}, nil
}

核心在 connectClient 方法:

func (s *server) connectClient(name string, socketPath string) error {
	c := NewPluginClient(name, socketPath, s.chandler)

	s.registerClient(name, c)
	if err := c.Connect(); err != nil {
		s.deregisterClient(name)
		klog.ErrorS(err, "Failed to connect to new client", "resource", name)
		return err
	}

	go func() {
		s.runClient(name, c)
	}()

	return nil
}

怎么保存这个 client 的呢?

func (s *server) registerClient(name string, c Client) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.clients[name] = c
	klog.V(2).InfoS("Registered client", "name", name)
}

定义如下:

type server struct {
	socketName string
	socketDir  string
	mutex      sync.Mutex
	wg         sync.WaitGroup
	grpc       *grpc.Server
	rhandler   RegistrationHandler
	chandler   ClientHandler
	clients    map[string]Client // 使用 map 存储,并为持久化
}

main.go

main 方法分为三个部分:

  • 1)启动 gRPC 服务
  • 2)向 Kubelet 进行注册
  • 3)监控 kubelet.sock 状态
func main() {
	klog.Infof("device plugin starting")
	dp := device_plugin.NewGopherDevicePlugin()
	go dp.Run()

	// register when device plugin start
	if err := dp.Register(); err != nil {
		klog.Fatalf("register to kubelet failed: %v", err)
	}

	// watch kubelet.sock,when kubelet restart,exit device plugin,then will restart by DaemonSet
	stop := make(chan struct{})
	err := utils.WatchKubelet(stop)
	if err != nil {
		klog.Fatalf("start to kubelet failed: %v", err)
	}

	<-stop
	klog.Infof("kubelet restart,exiting")
}

4. 测试

部署

首先是部署 i-device-plugin,一般使用 DaemonSet 方式部署,完整 yaml 如下:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: i-device-plugin
  namespace: kube-system
  labels:
    app: i-device-plugin
spec:
  selector:
    matchLabels:
      app: i-device-plugin
  template:
    metadata:
      labels:
        app: i-device-plugin
    spec:
      containers:
        - name: i-device-plugin
          image: docker.io/lixd96/i-device-plugin:latest
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              cpu: "1"
              memory: "512Mi"
            requests:
              cpu: "0.1"
              memory: "128Mi"
          volumeMounts:
            - name: device-plugin
              mountPath: /var/lib/kubelet/device-plugins
            - name: gophers
              mountPath: /etc/gophers
      volumes:
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins
        - name: gophers
          hostPath:
            path: /etc/gophers

以 hostPath 方式将用到的两个目录挂载到 Pod 里:

  • /var/lib/kubelet/device-plugins:请求 kubelet.sock 发起调用,同时将 device-plugin gRPC 服务的 sock 文件写入该目录供 kubelet 调用
  • /etc/gophers:在该 Demo 中,把 /etc/gophers 目录下的文件作为设备,因此需要将其挂载到 Pod 里。

确保 i-device-plugin 已经启动。

[root@test ~]# kubectl -n kube-system get po
i-device-plugin-vnw6z            1/1     Running   0          17s

初始化

在该 Demo 中,把 /etc/gophers 目录下的文件作为设备,因此我们只需要到 /etc/gophers 目录下创建文件,模拟有新的设备接入即可。

mkdir /etc/gophers

touch /etc/gophers/g1

查看 device plugin 日志

[root@test ~]# kubectl -n kube-system logs -f i-device-plugin-vnw6z
I0719 13:52:24.674737       1 main.go:10] device plugin starting
I0719 13:52:24.675440       1 device_monitor.go:33] /etc/gophers is dir,skip
I0719 13:52:24.675679       1 device_monitor.go:49] watching devices
I0719 13:52:24.682141       1 api.go:22] find devices []
I0719 13:52:24.682315       1 api.go:29] waiting for device update
I0719 13:53:09.369381       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g1 CREATE
I0719 13:53:09.370394       1 device_monitor.go:79] find new device [g1]
I0719 13:53:09.370445       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g1 CHMOD
I0719 13:53:09.370659       1 api.go:32] device update,new device list [g1]

可以看到,已经感知到新增的设备了。

不出意外的话可以在 node 上看到新资源了

[root@test gophers]# k get node n1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    lixueduan.com/gopher: "1"
    memory: 8154984Ki
    pods: "110"

果然,node capacity 中新增了
lixueduan.com/gopher: "1"

创建测试 Pod

接下来创建一个 Pod 申请该资源试试

apiVersion: v1
kind: Pod
metadata:
  name: gopher-pod
spec:
  containers:
  - name: gopher-container
    image: busybox
    command: ["sh", "-c", "echo Hello, Kubernetes! && sleep 3600"]
    resources:
      requests:
        lixueduan.com/gopher: "1"
      limits:
        lixueduan.com/gopher: "1"

Pod 启动成功

[root@test ~]# kubectl get po
NAME         READY   STATUS    RESTARTS   AGE
gopher-pod   1/1     Running   0          27s

之前分配设备是添加 Gopher=xxx 这个环境变量,现在看下是否正常分配

[root@test ~]# kubectl exec -it gopher-pod -- env|grep Gopher
Gopher=g1

ok,环境变量存在,可以看到分配给该 Pod 的设备是 g1。

新增设备

使用同样的 yaml 改下名称再创建一个 Pod

[root@test ~]# k get po
NAME          READY   STATUS    RESTARTS   AGE
gopher-pod    1/1     Running   0          3m9s
gopher-pod2   0/1     Pending   0          2s

因为只有一个 gopher 资源,因此第二个 Pod pending 了。

Events:
  Type     Reason            Age   From               Message
  ----     ------            ----  ----               -------
  Warning  FailedScheduling  7s    default-scheduler  0/1 nodes are available: 1 Insufficient lixueduan.com/gopher. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..

在创建一个设备

touch /etc/gophers/g2

device plugin 立马感知到了设备变化,相关日志如下:

I0719 14:01:00.308599       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 CREATE
I0719 14:01:00.308986       1 device_monitor.go:79] find new device [g2]
I0719 14:01:00.309017       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 CHMOD
I0719 14:01:00.309141       1 api.go:32] device update,new device list [g2,g1]

node 上的资源数量也更新为 2

[root@argo-1 ~]# k get node argo-1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    lixueduan.com/gopher: "2"
    memory: 8154984Ki
    pods: "110"

然后 pod2 也可以正常启动了

[root@test ~]# kubectl get po
NAME          READY   STATUS    RESTARTS   AGE
gopher-pod    1/1     Running   0          4m31s
gopher-pod2   1/1     Running   0          84s

删除设备

然后删除 g2 设备

rm -rf /etc/gophers/g2

device plugin 也是能正常感知到,相关日志

I0719 14:03:55.904983       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 REMOVE
I0719 14:03:55.905203       1 device_monitor.go:84] device [g2] removed
I0719 14:03:55.905267       1 api.go:32] device update,new device list [g1]

查看 Node 上的资源数量更新没有

[root@test ~]# k get node argo-1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    lixueduan.com/gopher: "1"
    memory: 8154984Ki
    pods: "110"

对应资源也变成 1 个了,一切正常。

5. 小结

本文主要分析了 k8s 中的 Device Plugin 机制的工作原理,并实现了一个简单的
i-device-plugin
来进一步加深理解。

Device Plugin 的工作原理其实不复杂,可以分为
插件注册

kubelet 调用插件
两部分:

  • 插件注册:DevicePlugin 启动时会想节点上的 Kubelet 发起注册,这样 Kubelet就可以感知到该插件的存在了
  • kubelet 调用插件:注册完成后,当有 Pod 申请对于资源时,kubelet 就会调用该插件 API 实现具体功能

deviceplugin-framework-overview.svg


【Kubernetes 系列】
持续更新中,搜索公众号【
探索云原生
】订阅,阅读更多文章。


6. 参考

k8s 文档:device-plugins

https://github.com/NVIDIA/k8s-device-plugin

Kubernetes开发知识–device-plugin的实现

Kubelet Register 源码