wenmo8 发布的文章

最近微信出了linux版,用vmware装linux不过瘾,把一台闲置的笔记本装上了Manjaro KDE Plasma,经过一段时间的发展,Linux桌面可用性大大提高。
Kindle->Kindle Mate->Anki这条路在linux下
我用
Kindle ->
KindleVocab
->Anki这么代替了之后,
其他软件都能凑合用,加之用了电信的天翼云电脑后觉得又补全了几乎所有的缺憾

感谢信创,感谢国家,国内大公司出的软件都有信创包了

但是

天翼云电脑只有deb包(
https://desk.ctyun.cn/html/download/)

aur上的也安装不成功

我对yay命令不熟,只有用笨办法解决
首先是下载deb包

装了dpkg
安装失败

看失败信息,把这些包查了下chatgpt后都补上

然后用dpkg的强制安装。虽然报错,但是功能基本正常

前言

最近阅读
Aravis
源码,其中大量运用了GObject,于是打算学习一下。

此系列笔记仅主要面向初学者,不会很深入探讨源码的细节,专注于介绍GObject的基本用法。

此系列笔记参考
GObject Tutorial for beginners

本文可在
个人博客
中阅读,体验更加

套个盾:文中定义的名词只是为了更好地理解GObject,不具备权威性。

类和实例

在GObject中,每个可实例化类类型都与两个结构体相关联:一个是类结构体,一个是实例结构体。

  • 类结构体会被注册到类型系统中(具体注册方式在
    下一节
    讨论),在
    g_object_new
    首次调用时,类型系统会检查相应的类结构体是否已经被初始化为一个类变量,没有则创建并初始化。此后所有该类的实例变量都将共享这个已初始化的类变量。每个类变量只会被创建一次。
  • 每次调用
    g_object_new
    时都会创建实例变量。

在GObject系统中,类结构体和实例结构体都会被实例化,在内存中占有特定的空间。为了便于描述,我们将分配给类结构体的实例称为“类变量”,而分配给实例结构体的实例称为“实例变量”。

GObject实例的结构体定义如下

//file: gobject.h
typedef struct _GObject  GObject;
struct  _GObject
{
  GTypeInstance  g_type_instance;
  
  /*< private >*/
  guint          ref_count;  /* (atomic) */
  GData         *qdata;
};

GObject类的结构体定义如下(我们可以先不用了解结构的细节):

//file: gobject.h
typedef struct _GObjectClass             GObjectClass;
struct  _GObjectClass
{
  GTypeClass   g_type_class;

  /*< private >*/
  GSList      *construct_properties;

  /*< public >*/
  /* seldom overridden */
  GObject*   (*constructor)     (GType                  type,
                                 guint                  n_construct_properties,
                                 GObjectConstructParam *construct_properties);
  /* overridable methods */
  void       (*set_property)		(GObject        *object,
                                         guint           property_id,
                                         const GValue   *value,
                                         GParamSpec     *pspec);
  void       (*get_property)		(GObject        *object,
                                         guint           property_id,
                                         GValue         *value,
                                         GParamSpec     *pspec);
  void       (*dispose)			(GObject        *object);
  void       (*finalize)		(GObject        *object);
  /* seldom overridden */
  void       (*dispatch_properties_changed) (GObject      *object,
					     guint	   n_pspecs,
					     GParamSpec  **pspecs);
  /* signals */
  void	     (*notify)			(GObject	*object,
					 GParamSpec	*pspec);

  /* called when done constructing */
  void	     (*constructed)		(GObject	*object);

  /*< private >*/
  gsize		flags;

  gsize         n_construct_properties;

  gpointer pspecs;
  gsize n_pspecs;

  /* padding */
  gpointer	pdummy[3];
};

下面使用一个简单示例,来演示GObject的类和实例的使用

//file: example01.c
#include <glib-object.h>

int main (int argc, char **argv) 
{

    GObject* instance1,* instance2;     //指向实例的指针
    GObjectClass* class1,* class2;      //指向类的指针
   
    instance1 = g_object_new (G_TYPE_OBJECT, NULL);
    instance2 = g_object_new (G_TYPE_OBJECT, NULL);
    g_print ("The address of instance1 is %p\n", instance1);
    g_print ("The address of instance2 is %p\n", instance2);
 
    class1 = G_OBJECT_GET_CLASS (instance1);
    class2 = G_OBJECT_GET_CLASS (instance2);
    g_print ("The address of the class of instance1 is %p\n", class1);
    g_print ("The address of the class of instance2 is %p\n", class2);
 
    g_object_unref (instance1);
    g_object_unref (instance2);

    return 0;
}

其中:

  • g_object_new
    函数创建实例变量并返回指向它的指针。在实例变量第一次被创建之前,它对应的类变量也会被创建并初始化。
  • 参数
    G_TYPE_OBJECT
    是GObject基类的类型标识符,这是GObject类型系统的核心,所有其他GObject类型都从这个基类型派生。

  • G_OBJECT_GET_CLASS
    返回指向参数所属类变量的指针
  • g_object_unref
    会销毁实例变量并释放内存。

输出:

The address of instance1 is 0x55d3ddc05600
The address of instance2 is 0x55d3ddc05620
The address of the class of instance1 is 0x55d3ddc05370
The address of the class of instance2 is 0x55d3ddc05370

可以发现,两个实例变量的地址不同,但两个实例变量对应的类变量的地址相同,因为两个实例变量共享一个类变量

引用计数

引用计数机制的概念在此不做介绍

在GObject中,GObject实例具有引用计数机制:

//file: example02.c
#include <glib-object.h>
 
static void show_ref_count (GObject* instance) 
{
    if (G_IS_OBJECT (instance))
        /* Users should not use ref_count member in their program. */
        /* This is only for demonstration. */
        g_print ("Reference count is %d.\n", instance->ref_count);
    else
        g_print ("Instance is not GObject.\n");
}
 
int main (int argc, char **argv) 
{
    GObject* instance;

    instance = g_object_new (G_TYPE_OBJECT, NULL);
    g_print ("Call g_object_new.\n");
    show_ref_count (instance);
    g_object_ref (instance);
    g_print ("Call g_object_ref.\n");
    show_ref_count (instance);
    g_object_unref (instance);
    g_print ("Call g_object_unref.\n");
    show_ref_count (instance);
    g_object_unref (instance);
    g_print ("Call g_object_unref.\n");
    g_print ("Now the reference count is zero and the instance is destroyed.\n");
    g_print ("The instance memories are possibly returned to the system.\n");
    g_print ("Therefore, the access to the same address may cause a segmentation error.\n");

    return 0;
}

其中:

  • g_object_new
    创建一个实例变量,然后将变量的引用计数置为1
  • g_object_ref
    将其引用计数加1
  • g_object_unref
    将引用计数减1,如果此时引用计数为0,则析构变量。

输出:

Call g_object_new.
Reference count is 1.
Call g_object_ref.
Reference count is 2.
Call g_object_unref.
Reference count is 1.
Call g_object_unref.
Now the reference count is zero and the instance is destroyed.
The instance memories are possibly returned to the system.
Therefore, the access to the same address may cause a segmentation error.

初始化和析构过程

GObject初始化和销毁的实际过程比较复杂。以下是简单的描述,不做详细说明.

初始化

1.用类型系统注册GObject类型。这是在调用main函数之前的GLib的初始化过程中完成的。(如果编译器是gcc,则
__attribute__ ((constructor))
用于限定初始化函数。)
2.为GObjectClass和GObject结构分配内存
3.初始化GObjectClass结构内存。这个内存将是GObject的类变量。
4.初始化GObject结构内存。这个内存将是GObject的实例变量。

上述初始化过程在第一次调用
g_object_new
函数时执行。在第二次及后续调用
g_object_new
时,它只执行两个过程:①为GObject结构分配内存②初始化内存。

析构

1.销毁GObject实例。释放实例的内存

GObject变量类型是静态类型。静态类型永远不会破坏它的类。因此,即使被销毁的实例变量是最后一个,类变量仍然存在,直到程序终止。

参考文章

1.
GObject Tutorial for beginners

推荐

下一篇:
GObject学习笔记(二)类型创建与注册

简介

PasteForm是贴代码推出的 “新一代CRUD” ,基于ABPvNext,目的是通过对Dto的特性的标注,从而实现管理端的统一UI,借助于配套的PasteBuilder代码生成器,你可以快速的为自己的项目构建后台管理端!目前管理端只有Html+js版本的,后续将支持小程序,Vue等

案例源码

案例源码在

https://gitee.com/pastecode/paste-template

不定期升级

AllInDto!

通过引入PasteForm,一个项目哪怕100个数据表,一般的管理页面也才不到10个,除非有非常多的特殊功能,否则都能用PasteForm中的表格和表单来实现!

image

MarkDown

在开发管理端过程中,有时候也需要使用用到Markdown,之前已经接入了Richtext,本次升级也一并把这个带进去了!
首先你需要从案例项目的PasteTemplate的前端模块
PasteTemplate.HttpApi.Host/wwwroot/page/lib/editor.md/
注意这个里面就是Markdown用到的组件,用的是三方的!

特性信息

如果是字符串,没有设置maxlength,默认就会变更成richtext,也可以手动强制配置,当前特性适用于richtext和markdown

字段 类型 示例 说明
args1 字符 500 配置高度,默认为500
args2 字符 txt 表示另外一个值存储在哪个字段,所以另外一个字段一般设置隐藏
args3 字符 /api/app/Upload/Image 图片上传的地址

以上信息同样适用于richtext

案例UI

image

以上是我用双屏截图的,其实是一个完整的页面,注意看页面的表单中包含了text,textarea,richtext,markdown
那么他对应的dto是如何写的?

Dto对应代码

    /// <summary>
    /// 
    /// </summary>
    public class StringDto
    {
        ///<summary>
        ///姓名 模拟短文本输入
        ///</summary>
        [MaxLength(32)]
        [Required]
        public string Name { get; set; }

        ///<summary>
        ///文本区域 模拟文本区域的输入
        ///</summary>
        [MaxLength(128)]
        public string Desc { get; set; }

        ///<summary>
        ///文本区域 长度大于128则自动为textarea
        ///</summary>
        [MaxLength(256)]
        public string Text { get; set; }

        /////<summary>
        /////文本区域 可以手动指定为textarea,同理你也可以指定为html,text
        /////</summary>
        //[MaxLength(128)]
        //[ColumnDataType("textarea")]
        //public string Mark { get; set; }

        ///<summary>
        ///富文本 模拟富文本,前端HTML的是使用wangEditv5,默认不配置maxlength的就是html
        ///</summary>
        public string Blog { get; set; }

        /////<summary>
        /////MarkDown1 
        /////</summary>
        //[ColumnDataType("markdown")]
        //public string Mark1 { get; set; }

        ///<summary>
        ///MarkDown2 有默认值的
        ///</summary>
        [ColumnDataType("markdown")]
        public string Mark2 { get; set; } = "## 今日成果";

    }

由上面代码可知,只要在对应的字段上配置
[ColumnDataType("markdown")]即可
或者你也可以配置特性为[PasteMarkDown]
他们两个是等效的,可以说ColumnDataTypeAttribute是所有贴代码框架特性的基础属性
像PasteClass,PasteHidden,PasteButton等最终都是为了转化成ColumnDataTypeAttribute

提交信息

上面的UI中,我们是随便填写点东西后,提交,看到的提交信息如下

image

暂时先忽略mark2mdeditor-html-code和mark2mdeditor-markdown-doc字段,这个是markdown框架里面带了name被parseform来了,后续再考虑去掉他!
从上面的提交可以看到mark2是有内容的!

符合预期!

注意

由于markdown和richtext的特殊性,关于字段长度的设置需要按照实际来填写!
看特性信息args2,这个字段如何配置了,则需要对这个字段的特性标注为hidden
这样在UI上args2的字段是不显示的,而提交数据给后台,则可以接收到!

我们下期将介绍select和reload的巧妙结合案例... .. .

0、引言

我们知道,这当代操作系统中,多线程和多进程模型被广泛的使用以提高系统的并发效率。随着互联网不断的发展,面对如今的高并发场景,为每个任务都创建一个线程是不现实的,使用线程则需要系统不断的在用户态和内核态之间不断的切换,引起不必要的损耗,于是引入了协程。协程存在于用户空间,是一种轻量级的并发执行单元,其创建和上下文的开销更小,如何管理数量众多的协程是一个重要的话题。此篇笔记用于分享笔者学习Go语言协程调度的GMP模型的理解,以及源码的实现。当前使用的Go语言版本为1.22.4。

本篇笔记参考了以下文章:

[
Golang三关-典藏版] Golang 调度器 GMP 原理与调度全分析 | Go 技术论坛

Golang GMP 原理

Golang-gopark函数和goready函数原理分析

1、GMP模型拆解

Goroutine调度器的工作是将准备运行的goroutine分配到工作线程上,涉及到的主要概念如下:

1.1、G

G代表的是Goroutine,是Go语言对协程概念的抽象,其有以下的特点:

  • 是一个轻量级的线程
  • 拥有自己的栈、状态、以及执行的任务函数
  • 每一个G会被分配到一个可用的P,并且在M上运行

其结构定义位于runtime/runtime2.go中:

type g struct {
    // ...
    m         *m      
    // ...
    sched     gobuf
    // ...
}

type gobuf struct {
    sp   uintptr
    pc   uintptr
    ret  uintptr
    bp   uintptr // for framepointer-enabled architectures
}

在这里,我们核心关注其内嵌了一个m和一个
gobuf
类型的sched。
gobuf
主要用于Gorutine的上下文切换,其保存了G执行过程中的CPU寄存器的状态,使得G在暂停、调度和恢复运行时能够正确地恢复上下文。

G主要有以下几种状态:

const (
	_Gidle = iota // 0
	_Grunnable // 1
	_Grunning // 2
	_Gsyscall // 3
	_Gwaiting // 4
    //...
	_Gdead // 6
    //...
	_Gcopystack // 8
    _Gpreempted // 9
	//...
)
  • Gidle
    :表示这个G刚刚被分配,尚未初始化。

  • Grunnable
    :表示这个G在运行队列中,它当前不再执行用户代码,栈未被占用。

  • Grunning
    :表示这个G可能在执行用户代码,栈被这个G占用,它不在运行队列中,并且它被分配给了一个M和一个P(g.m和g.m.p是有效的)。

  • Gsyscall
    :表示这个G正在执行系统调用,它不在执行用户代码,栈被这个G占用。它不在运行队列中,并且它被分配给了一个M。

  • Gwaiting
    :表示这G被堵塞在运行时,它没有执行用户代码,也不在运行队列中,但是它应该被记录在某个地方,以便在必要时将其唤醒。(ready())gc、channel 通信或者锁操作时经常会进入这种状态。

  • Gdead
    :表示这个G当前未使用,它可能是刚被初始化,也可能是已经被销毁。

  • Gcopystack
    :表示这个G的栈正在被移动。

  • Gpreempted
    :表示这个G因抢占而被挂起,且该G自行停止,等待进一步的恢复。它类似于
    Gwaiting
    ,但是
    Gpreempted
    还没有一个负责将其状态恢复的管理者,只有某个
    suspendG
    操作将该G的状态从
    Gpreempted
    转换为
    Gwaiting
    ,这样调度器才会接管这个G。

在阅读有关调度逻辑的源码的时候,我们可以通过搜索
casgstatus
方法去定位到使得G状态改变的函数,例如:
casgstatus(gp, _Grunning, _Gsyscall)
表示将该G的状态从Grunning变换到Gsyscall,就可以找到对应的函数学习了。

1.2、M

M是Machine,也是Worker Thread,代表的是操作系统的线程。Go运行时在需要时创建或者销毁M,将G安排到M上执行,充分利用多核CPU的能力。其具有以下的特点:

  • M是Go与操作系统之间的桥梁,它负责执行分配给它的G。
  • M的数量会根据系统资源进行调整。
  • M可能会被特定的G通过
    LockOSThread
    锁定,这种G和M的绑定确保了特定Goroutine可以持续使用同一个线程。

结构定义如下:

type m struct{
	g0      *g     // goroutine with scheduling stack
	curg          *g       // current running goroutine
	tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
	p             puintptr // attached p for executing go code (nil if not executing go code)
	oldp          puintptr // the p that was attached before executing a syscall
	//...
}

每一个M结构体都会有一个名为
g0
的G,它是一个特殊的Goroutine,它并不复杂执行用户的代码,而是负责调度G。g0会分配G绑定到M中执行。
tls
表示的是“Local Thread Storage”,其存储了与当前线程相关的特定信息,而
tls
数组的第一个槽位通常用于存储
g0
的栈指针。

M存在一个状态,名为
“自旋态”
,处在自旋态的M会不断的往全局队列中寻找可运行的G去执行,并且解除自旋态。

1.3、P

P是Processor,代表逻辑处理器,是Goroutine调度的虚拟概念。每个P负责分配执行Goroutine的资源,其具有以下的特点:

  • P是G的执行上下文,它具有一个本地队列存储着G,以及对应的任务调度机制,负责在M上执行一个具体的G。
  • P的数量由环境变量
    GOMAXPROCS
    决定,如果其数量大于CPU的物理线程数量时就没有更多的意义了。
  • P是去执行Go代码所必备的资源,M必须绑定了一个P才能去执行Go代码。但是M可以在没有绑定P的情况下执行系统调用或者被阻塞。
type p struct {
	status      uint32
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	m           muintptr
	runnext guintptr
	//...
}
  • runq存储了这个P具有的goroutine队列,最大长度为256
  • runqhead和runqtail分别指向队列的头部和尾部
  • runnext存储了下一个可执行的goroutine

P也含有几个状态,如下:

const (
	_Pidle = iota
	_Prunning
	_Psyscall
	_Pgcstop
	_Pdead
)
  • Pidle:表示P没有被运行用户代码或者调度器,通常这个P在空闲P列表中,供调度器使用,但它也可能在其他状态之间转换。P由空闲队列
    idle list
    或者其他转换其状态的对象拥有,它的
    runq
    是空的。
  • Prunning:表示P被M拥有,并且正在运行用户代码或者调度器。只有拥有此P的M被允许更改P的状态,M可以将P转换为Pidle(当没有工作的时候)、Psyscall(当进入一个系统调用时)、Pgcstop(安顿垃圾回收时)。M还可以将P的所有权交接给另一个M(例如调度一个locked的G)
  • Psyscall:表示P没有在运行用户代码,与在系统调用中的M相关但不被其拥有。处于Psyscall状态的P可能会被其他M抢走。将P转换给另一个M是轻量级的,并且P会保持和原始的M的关联性。
  • Pgcstop:表示P被暂停以进行STW(Stop The World)(执行垃圾回收)。
  • Pdead:表示P不再被使用(GOMAXPROCS减少)。死去的P将会被剥夺资源,但是任然会保留少量的资源例如Trace Buffer,用于后续的跟踪分析需求。

1.4、Schedt

schedt
是全局goroutine队列的封装

type schedt struct {
    // ...
    lock mutex
    // ...
    runq     gQueue
    runqsize int32![](https://img2024.cnblogs.com/blog/3542244/202411/3542244-20241117153220788-1594654379.png)

    // ...
}
  • lock:是操作全局队列的锁
  • runq:存储G的队列
  • runqsize:全局G队列的容量

2、调度模型的工作流程

我们可以用下图来整体的表示该调度模型的流程:

在接下来的部分,我们将主要探讨GMP调度模型是怎么完成一轮调度的,即是如何完成g0到g再到g0的切换的,期间大致发生了什么。

2.1、G的状态转换

我们刚刚提及到,每一个M都有一个名为
g0
的Goroutine,去负责调度普通的g绑定到M上执行。g0和普通的g之间存在一个转换,当执行普通的g上的代码的时候,就会将执行权交给g,当g执行完代码或者因为原因需要被挂起、退出执行等,就会重新将执行权交给g0。

g0和P是一个协作的关系,P的队列决定了哪些goroutine可以在绑定P时被调用,而g0是执行调度逻辑的关键的goroutine,负责在必要时释放P的资源。

当g0需要将执行权交给g时,会调用一个名为
gogo
的方法,传入g的栈指针,去执行用户的代码。

func gogo(buf *gobuf)

当需要重新将执行权转交给g0时,都会执行一个名为
mcall
的方法。

func mcall(fn func(*g))

mcall在go需要进行协程调换时被调用,它传入一个回调函数
fn
,里面携带了当前正在运行的g的指针,它主要做了以下三点的工作:

  • 保存当前g的信息,即将PC/SP的信息存储到g->sched中,保证后续可以恢复g的执行现场。
  • 将当前M的堆栈从g切换到g0
  • 在g0的栈上执行新的函数fn,通常在fn中会进一步安排g的去向,并且调用
    schedule
    函数,让当前M去寻找另一个可以执行的G。

2.2、调度类型

我们现在知道了,g和g0是通过什么函数进行状态切换的。接下来我们就要来探讨,它们是什么情况下要进行切换,即调度策略有什么。

GMP调度模型一共有4种调度策略,分别为:
主动调度

被动调度

正常调度

抢占调度

  • 主动调度:提供给用户的方法,当用户调用了runtime.Gosched()方法时,此时当前的g会让出执行权,将g安排进任务队列等待下一次被调度。
  • 被动调度:当因不满足某种执行条件,通常为channel读写条件不满足时,会执行gopark()函数,此时的g将会被置为等待状态。
  • 正常调度:g正常的执行完毕,转接执行权。
  • 抢占调度:存在一个全局监控者moniter,它会每隔一段时间周期去检查是否有G运行太长时间,若发现了,将会通知P去进行和M的解绑,让出P。这里需要全局监控者的存在是因为当G进入到系统调用的时候,这个线程M会陷入僵持,无法主动去检查,需要外援辅助。

2.3、宏观调度流程

接下来我们来关注整体一轮的调度流程,对于g0和g的一轮调度,可以用下图来表示。

schedule
作为每一轮调度的开始,它会寻找到可以执行的G,然后调用
execute
将该g绑定到一个线程M上,然后执行
gogo
方法去真正的运行一个goroutine。当需要转换时,goroutine会在底层执行
mcall
方法,保存栈信息,然后执行回调函数
fn
,即绿框内的方法之一,将执行权重新交给g0。

2.3.1、schedule()

schedule()
方法定位于
runtime/proc
中,忽略非主流程部分,源码内容如下:

//找到一个是就绪态的G去运行
func schedule() {
	mp := getg().m

	//...

top:
	pp := mp.p.ptr()
	pp.preempt = false

	//如果该M在自旋,但是队列含有G,那么抛出异常。
	if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}

	gp, inheritTime, tryWakeP := findRunnable() //阻塞的寻找G

	
    //...

	//当前M将要运转一个G,解除自旋状态
	if mp.spinning {
		resetspinning()
	}

	//...

	execute(gp, inheritTime)
}

该方法主要是寻找一个可以运行的G,交给该线程去运行。我们在一开始提到,线程会存在一种名为
“自旋态”
的状态,它会不断的自旋去寻找可以执行的G来执行,成功找到了就解除了自旋态。

这里存在一个点我们值得去注意,处在自旋态的线程它不是在空占用计算资源吗?那么不就是降低了系统的性能吗?

其实这是一个中和的策略,假如每次当出现了一个新的Goroutine需要去执行的时候,我们才创建一个线程M去执行它,然后执行完了又删除掉不去复用,那么就会带来大量的创建销毁的资源消耗。我们希望当有一个新的Goroutine来的时候,能立即有一个M去执行它,就可以将空闲暂时无任务处理的M去自己寻找Goroutine,减少了创建销毁的资源消耗。但是我们也不能有太多的处于自旋态的线程,不然就造就另一个过多消耗的地方了。

我们先跟进一下
resetspinning()
,看看其执行的策略是什么。

1、resetspinning()

func resetspinning() {
	gp := getg()
	//...
	gp.m.spinning = false
	nmspinning := sched.nmspinning.Add(-1)
	//...
	wakep()
}



//尝试添加一个P去执行G。该方法被调用当一个G状态为runnable时。
func wakep() {
    //如果自旋的M数量不为0则返回
	if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
		return
	}

	// 禁用抢占,直到 pp 的所有权转移到 startm 中的下一个 M,否则在这里的抢占将导致 pp 被卡在等待进入 _Pgcstop 状态。
	mp := acquirem()

	var pp *p
	lock(&sched.lock)
    //尝试从空闲P队列获取一个P
	pp, _ = pidlegetSpinning(0)
	if pp == nil {
		if sched.nmspinning.Add(-1) < 0 {
			throw("wakep: negative nmspinning")
		}
		unlock(&sched.lock)
		releasem(mp)
		return
	}
	
	unlock(&sched.lock)

	startm(pp, true, false)

	releasem(mp)
}


resetspinning
中,我们先将当前M解除了自旋态,然后尝试去唤醒一个P,即进入到
wakep()
方法中。

if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
		return
	}

在wakep方法内,我们先检查了当前处在自旋的M的数量,假如>0,则不再去唤醒一个新的P,这是为了防止同一时间内过多的自旋的M空运转消耗CPU资源。

pp, _ = pidlegetSpinning(0)
	if pp == nil {
		if sched.nmspinning.Add(-1) < 0 {
			throw("wakep: negative nmspinning")
		}
		unlock(&sched.lock)
		releasem(mp)
		return
	}

接着会尝试从空闲P队列中获取一个P,如果没有空闲的P,那么此时会减少自旋线程的数量(这里只是减少了数量,但是具体这个处在自旋的线程接下来去做什么了我也没有明白)并且返回。

startm(pp, true, false)

假如获取了一个空闲的P,会为这一个P分配一个线程M。

2、findRunnable()

findRunnable是一轮调度流程中最核心的方法,它用于找到一个可执行的G。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	mp := getg().m
top:
    pp := mp.p.ptr()
	//...
 	
    //每61次调度周期就检查一次全局G队列,防止在特定情况只依赖于本地队列。
	if pp.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 1)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
    //...
    // local runq
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
    
    //在正式的去偷取G之前,用非阻塞的方式检查是否有就绪的网络协程,这是对netpoll的一个优化。
	if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
		if list, delta := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			netpollAdjustWaiters(delta)
			trace := traceAcquire()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.ok() {
				trace.GoUnpark(gp, 0)
				traceRelease(trace)
			}
			return gp, false, false
		}
	}
    
    //如果当前的M出于自旋状态,或者说处于自旋状态的M的数量小于活跃的P数量的一半时,则进行G窃取。(防止当系统的并行度较低时,自旋的M过多占用CPU资源)
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
			mp.becomeSpinning()
		}

		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}

		now = tnow
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}
    
    //...

其主要的执行步骤如下:

(一)第六十一次调度
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 1)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

首先检查P的调度次数,假如这次是P的第61此次调度,并且全局的G队列长度>0,就会从全局队列获取一个G。这是为了防止在特定情况下,只运行本地队列的G,忽视了全局队列。

其内部调用的
globrunqget
方法主流程如下:

//尝试从G的全局队列获取一批G
func globrunqget(pp *p, max int32) *g {
	assertLockHeld(&sched.lock)
	//检查全局队列是否为空
	if sched.runqsize == 0 {
		return nil
	}

    //计算需要获取的G的数量
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	if max > 0 && n > max {
		n = max
	}
    //确保从队列中获取的G数量不超过当前本地队列的G数量的一半,避免全局队列所有的G都转移到本地队列中导致负载不均衡
	if n > int32(len(pp.runq))/2 {
		n = int32(len(pp.runq)) / 2
	}
	sched.runqsize -= n

	gp := sched.runq.pop()
	n--
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(pp, gp1, false)
	}
	return gp
}
//计算需要获取的G的数量
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(pp.runq))/2 {
		n = int32(len(pp.runq)) / 2
	}

n为要从全局G队列获取的G的数量,可以看到它会至少获取一个G,至多获取
runqsize/gomaxprocs+1
个G,它保证了一个P不过多的获取G从而影响负载均衡。并且不允许n一次获取全局G队列一半以上的G,保证负载均衡。

gp := sched.runq.pop()
	n--
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(pp, gp1, false)
	}

决定好获取多少个G后,第一个G会直接通过指针返回,剩余的则是将其添加到P的本地队列中。

在当前(一)的调用中,函数设置了max值为1,因此只会从全局队列获取1个G返回。


虽然在(一)中不会执行
runqput
,但是我们还是来看看是怎么将G添加到P的本地队列的。

// runqput尝试将G放到本地队列中
//如果next是False,runqput会将G添加到本地队列的尾部
//如果是True,runqput会将G添加到下一个将被调度的G的槽位
//如果运行队列满了,那么将会把g放回全局队列
func runqput(pp *p, gp *g, next bool) {
    //
	if randomizeScheduler && next && randn(2) == 0 {
		next = false
	}

	if next {
	retryNext:
		oldnext := pp.runnext
		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&pp.runqhead) //加载队列头的位置
	t := pp.runqtail
	if t-h < uint32(len(pp.runq)) { //检查本地队列是否已满
		pp.runq[t%uint32(len(pp.runq))].set(gp) //未满将gp插入runqtail的指定位置
		atomic.StoreRel(&pp.runqtail, t+1) //更新runtail,表示插入的G可供消费
		return
	}
	if runqputslow(pp, gp, h, t) { //如果本地队列已满,则尝试放回全局队列
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}
if randomizeScheduler && next && randn(2) == 0 {
		next = false
	}

在第一步中,我们看到即使
next
被设置为true,即要求了该G应该被放置在本地P队列的
runnext
槽位中,
也会有概率地将next置为false

if next {
	retryNext:
		oldnext := pp.runnext
		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

假如next仍为true,此时先获取原本P调度器中,runnext槽位的G(oldnext),然后会不断地尝试将新的G替换掉旧的G直到成功为止。当成功之后,在下面的操作流程中会把旧的G放入到P的本地队列中。

retry:
	h := atomic.LoadAcq(&pp.runqhead) //加载队列头的位置
	t := pp.runqtail
	if t-h < uint32(len(pp.runq)) { //检查本地队列是否已满
		pp.runq[t%uint32(len(pp.runq))].set(gp) //未满将gp插入runqtail的指定位置
		atomic.StoreRel(&pp.runqtail, t+1) //更新runtail,表示插入的G可供消费
		return
	}
	if runqputslow(pp, gp, h, t) { //如果本地队列已满,则尝试放回全局队列
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

在将G加入进P的本地队列的流程中,需要获取队列头部和尾部的坐标,用来判断本地队列是否已满,未满则将G插入进本地队列的尾部中。否则执行
runqputslow
方法,尝试放回全局队列。


接下来继续跟进
runqputslow
方法的执行流程。

//将G和一批工作(本地队列的G)放入到全局队列
func runqputslow(pp *p, gp *g, h, t uint32) bool {
	var batch [len(pp.runq)/2 + 1]*g //本地队列一半的G

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(pp.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
	}
	if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	if randomizeScheduler { //打乱顺序
		for i := uint32(1); i <= n; i++ {
			j := cheaprandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true
}

其执行流程如下:

var batch [len(pp.runq)/2 + 1]*g //本地队列一半的G

首先创建一个batch数组,是容量为P的本地队列当前含有的G的数量的一半,用于存储将转移的G。

n := t - h
	n = n / 2
	if n != uint32(len(pp.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
	}

接着,开始将本地队列一半的G的指针,存储在batch中。

if randomizeScheduler { //打乱顺序
		for i := uint32(1); i <= n; i++ {
			j := cheaprandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

然后会打乱batch中的顺序,保证随机性。

// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true

最后一部是将batch中的各个G用指针连接起来,转换为
链表
的形式,并且链接在全局队列中。

runqput
连接的流程较长,用下图来概括:

(二)本地队列获取
// local runq
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

假如不是第61次调用,
findrunnable
会尝试从本地队列中获取一个G用于调度。我们来看runqget方法的执行。

// 从本地可运行队列中获取 g。
func runqget(pp *p) (gp *g, inheritTime bool) {
	// 如果有 runnext,则它是下一个要运行的 G。
	next := pp.runnext
    // 如果 runnext 非零且 CAS 操作失败,它只能被另一个 P 窃取,因为其他 P 可以竞争将 runnext 设置为零,但只有当前 P 可以将其设置为非零。
	// 因此,如果 CAS 失败,则无需重试该操作。
	if next != 0 && pp.runnext.cas(next, 0) {
		return next.ptr(), true
	}

	for {
		h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
		t := pp.runqtail
		if t == h {
			return nil, false
		}
		gp := pp.runq[h%uint32(len(pp.runq))].ptr()
		if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

假如可以获取到P的runnext,则返回这一个G,否则就获取本地队列的头部的G。

(三)全局队列获取
// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

假如无法从本地队列获取到G,则说明了P的本地队列为空,此时会尝试从全局队列获取G。调用了
globrunqget
方法从全局队列获取G,注意此时因为设置了max为0表示不生效,该方法
可能会从全局队列中获取多个G放到P的本地队列内
。关于该方法的具体代码已经在(一)中讲解。

(四)网络事件获取
    //在正式的去偷取G之前,用非阻塞的方式检查是否有就绪的网络协程,这是对netpoll的一个优化。
	if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
		if list, delta := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			netpollAdjustWaiters(delta)
			trace := traceAcquire()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.ok() {
				trace.GoUnpark(gp, 0)
				traceRelease(trace)
			}
			return gp, false, false
		}
	}

假如本地队列和全局队列都没有G可以获取,此时我们将进入GMP调度模型的一个特殊机制:
WorkStealing
,即从其他的P调度器中偷取其本地队列的G到自己的本地队列中,这是GMP调度模型独有的机制,可以更加充分地利用线程提高系统整体效率。

在此之前,会先尝试用非阻塞的方式获取准备就绪的网络协程,如果有则先执行网络协程。

为什么在携程的调度中,还要专门引入对网络协程事件的检测?这一部分不应该解耦吗?

这是我自己的一个思考,我认为这应该是Go的运行时的设计原则的一个方面体现。runtime的主要任务是负责
协程调度

资源管理
,但是在实际应用中,网络事件的处理通常会和协程调度紧密关联。Go使用
非阻塞网络轮询机制
(netpoll)允许在有网络事件发生时能快速的唤醒和调度相应的协程去处理,在进行了一次本地队列和全局队列的检查后,进行一次网络协程的检查能保证对网络I/O的快速响应。

(五)工作窃取
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
			mp.becomeSpinning()
		}

		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		//...
	}

当本地队列和全局队列都没有G时,此时会进行工作窃取机制,尝试从其他调度器P中窃取G。

if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
			mp.becomeSpinning()
		}

如果当前的
自旋的M的数量<空闲的P的数量的一半
,就会将当前M设置为自旋态。

gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}

调用
stealWork
进行窃取。


func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
	pp := getg().m.p.ptr()

	ranTimer := false

    //最多从其他P窃取4次任务
	const stealTries = 4
	for i := 0; i < stealTries; i++ {
        //在进行最后一次的遍历前,优先检查其他P的Timer队列
		stealTimersOrRunNextG := i == stealTries-1
		//随机生成遍历起点
		for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
			//...
			p2 := allp[enum.position()]
			if pp == p2 {
				continue
			}

			
			//...

			//如果P是非空闲的,则尝试窃取
			if !idlepMask.read(enum.position()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	//如果在所有尝试中均未找到可运行的 Goroutine 或 Timer,则返回 nil,并返回 pollUntil(下一次轮询的时间)。
	return nil, false, now, pollUntil, ranTimer
}
const stealTries = 4
	for i := 0; i < stealTries; i++ {

当前P会尝试从其他的P的本地队列中进行窃取,最多会进行4次。

for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
			//...
			p2 := allp[enum.position()]
			if pp == p2 {
				continue
			}

			
			//...

			//如果P是非空闲的,则尝试窃取
			if !idlepMask.read(enum.position()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}

使用
runqsteal
方法进行窃取。


//从p2偷去一半的工作到p中
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
	t := pp.runqtail
	n := runqgrab(p2, &pp.runq, t, stealRunNextG)
	if n == 0 {
		return nil
	}
	n--
	gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
	if n == 0 {
		return gp
	}
	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
	if t-h+n >= uint32(len(pp.runq)) {
		throw("runqsteal: runq overflow")
	}
	atomic.StoreRel(&pp.runqtail, t+n) // store-release, makes the item available for consumption
	return gp
}

runqsteal
方法会将p2的本地队列中偷取其一半的G放到p的本地队列中,我们进而跟进
runqgrab
方法;


func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
		n := t - h
		n = n - n/2
		if n == 0 {
			if stealRunNextG {
				//尝试偷取P的下一个将要调度的G
				if next := pp.runnext; next != 0 {
                    //如果P正在运行,为了避免产生频繁的任务状态“抖动”,互相抢占任务导致的调度竞争,所以休眠一会,等待P调度完成再尝试获取。
					if pp.status == _Prunning {
						if !osHasLowResTimer {
							usleep(3)
						} else {
							osyield()
						}
					}
                    //尝试窃取任务
					if !pp.runnext.cas(next, 0) {
						continue
					}
                    //窃取成功
					batch[batchHead%uint32(len(batch))] = next
					return 1
				}
			}
			return 0
		}
        //如果n超过队列一半,则由于并发访问导致h和t不一致,要重新开始。
		if n > uint32(len(pp.runq)/2) { // read inconsistent h and t
			continue
		}
        //从runq批量抓取任务
		for i := uint32(0); i < n; i++ {
			g := pp.runq[(h+i)%uint32(len(pp.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
		if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
			return n
		}
	}
}


n=n-n/2
我们可以得知,是获取一半数量的G。

通过
stealWork->runqsteal->runqgrab
的方法链路,完成了将其他P的本地队列G搬运到当前P的本地队列中的过程。

(六)总览

最后,我们用绘图来整体回顾
findRunnable
的执行流程。

2.3.2、execute()

当我们成功的通过
findRunnable()
找到了可以被执行的G的时候,就会对当前的G调用
execute()
方法,开始去调用这个G。

func execute(gp *g, inheritTime bool) {
	mp := getg().m


	//绑定G和M
	mp.curg = gp
	gp.m = mp
    //更改G的状态
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + stackGuard
	if !inheritTime {
        //更新P的调度次数
		mp.p.ptr().schedtick++
	}
	//....
	//执行G的任务
	gogo(&gp.sched)
}

可以看到
execute
的主要任务就是将当前的G和M进行绑定,即把G分配给这个线程M,然后调整它的状态为执行态,最后调用
gogo
方法完成对用户方法的运行。

2.3.3、mcall()

从2.3.2小节中我们知道,执行的execute函数完成了g0和g的切换,将对M的执行权交给了g,然后调用了
gogo
方法运行g。当需要重新将M的执行权从g切换到g0的时候,需要执行
mcall()
方法,完成切换。
mcall()
方法的作用我们在2.1小节中提到过,该方法是通过汇编语言实现的,主要的作用是完成了对g的栈信息的保存、将当前堆栈从g切换到g0、在g0的栈上执行mcall方法中传入的
fn
回调函数。

什么时候调用
mcall()
,就涉及到我们在2.2小节讲到了调度类型了。接下来我们通过源码一一分析。

1、主动调度

主动调度是提供给用户的让权方法,执行的是runtime包下的
Gosched
方法。

func Gosched() {
	checkTimeouts()
	mcall(gosched_m)
}

Gosched方法就调用了mcall,并且传入回调函数
gosched_m

// Gosched continuation on g0.
func gosched_m(gp *g) {
	goschedImpl(gp, false)
}

func goschedImpl(gp *g, preempted bool) {
	//...
	casgstatus(gp, _Grunning, _Grunnable)// 将Goroutine状态从运行中更改为可运行状态
	//...

	dropg()//解绑G和M
	lock(&sched.lock)
	globrunqput(gp)//将G放入到全局队列中,等待下一次的调度
	unlock(&sched.lock)

	//...

	schedule()// 调用调度器,从全局队列或本地队列选择下一个Goroutine运行
}

gosched_m
完成了对G的状态的转换,然后调用
dropg
将M和G解绑,再将G放回到全局队列里面,最终调用schedule进行新一轮的调度。

2、被动调度

当当前G需要被被动调用的时候,就会调用
goprak()
,将其置为阻塞态,等待别人的唤醒。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {
	//...
	mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
	mp := getg().m

	trace := traceAcquire()

	casgstatus(gp, _Grunning, _Gwaiting)
	//...

	dropg()

	//...
	schedule()
}

gopark
内部调用了
mcall(park_m)

park_m
将G的状态置为waiting,并且将M和G解绑,然后开启新一轮的调度。

进入等待的G需要被动的被其他事件唤醒,此时就会调用
goready
方法。

func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}


//ready 函数的作用是将指定的 Goroutine (gp) 标记为“可运行”状态并将其放入运行队列。它会在 Goroutine 从等待(_Gwaiting)状态转换为可运行(_Grunnable)状态时使用,以确保调度器能够选择并执行它。
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
	status := readgstatus(gp)

	// Mark runnable.
	mp := acquirem() // 获取当前线程(M),并禁止其被抢占,以避免将 P 错误地保留在本地变量中。
    //确认G的状态
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}
	//...
	casgstatus(gp, _Gwaiting, _Grunnable)
	//....
    //将该G放入到当前P的运行队列
	runqput(mp.p.ptr(), gp, next)
    //检查是否有空闲的 P,若有则唤醒,以便它能够处理新加入的可运行 Goroutine。
	wakep()
    //释放当前 M 的锁,以重新允许抢占。
	releasem(mp)
}

ready
方法会将G的状态重新切换成运行态,并且将G放入到P的运行队列里面。从代码中我们可以看到,被唤醒的G并不会立刻执行,而是加入到本地队列中等待下一次被调度。

3、正常调度

假如G被正常的执行完毕,就会调用
goexit1()
方法完成g和g0的切换。

func goexit1() {
	//...
	mcall(goexit0)
}


// goexit continuation on g0.
func goexit0(gp *g) {
	gdestroy(gp)
	schedule()
}

最终,协程G被销毁,并且开启新一轮的调度。

4、抢占调度

抢占调度最为复杂,因为它需要全局监控者m去检查所有的P是否被长期阻塞,这需要花时间去检索,而不能直接锁定到哪个P需要被抢占。全局监控者会调用
retake()
方法去检查,其流程如下:

//retake 函数用于在 Go 的调度器中处理一些调度策略,确保 Goroutine 的执行不被长时间阻塞。它通过检查所有的处理器 (P),尝试中断过长的系统调用并在合适的条件下重新夺回 P 的控制权。
func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ {
		pp := allp[i]
		if pp == nil {
			continue
		}
		pd := &pp.sysmontick
		s := pp.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
            //// 如果 `P` 的状态为 `_Prunning` 或 `_Psyscall`,则检查其运行时长。
			t := int64(pp.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
                //超过最大运行时间,抢占P
				preemptone(pp)
				//如果处于系统调用状态,`preemptone()` 无法中断 P,因为没有 M 绑定到 P。
				sysretake = true
			}
		}
		if s == _Psyscall {
			// 如果 `P` 在系统调用中停留超过 1 个监控周期,则尝试收回。
			t := int64(pp.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
            //如果当前P的运行队列为空,切存在至少一个自旋的M,并且未超出等待时间则跳过回收
			if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// 为了获取 `sched.lock`,先释放 `allpLock`
			unlock(&allpLock)
			
            //回收操作...
            handoffp(pp)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
for i := 0; i < len(allp); i++ {
		pp := allp[i]
		if pp == nil {
			continue
		}

逐一的获取P,进行检查。

if s == _Prunning || s == _Psyscall {
            //// 如果 `P` 的状态为 `_Prunning` 或 `_Psyscall`,则检查其运行时长。
			t := int64(pp.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
                //超过最大运行时间,抢占P
				preemptone(pp)
				//如果处于系统调用状态,`preemptone()` 无法中断 P,因为没有 M 绑定到 P。
				sysretake = true
			}
		}

当P的运行时间超过最大运行时间的时候,就会调用
preemptone
方法,尝试去抢占P。

值得注意的地方是,
preemptone
方法是设计成
“尽力而为”
的,因为并发的存在,
我们并不能确保它一定能通知到我们需要解绑的G
,因为可能会存在以下状况:

  • 当我们尝试去发出抢占通知P上的G需要停止运行的时候,可能在发出通知的过程,这个G就完成运行,调用到下一个G了,我们可能会通知了错误的G。
  • 当G进入到系统调用的状态的时候,P和M就会解绑,我们也通知不到G了。
  • 就算通知到了目标的G,它也可能在执行newstack,此时会忽略请求。

因此,
preemptone
方法
只会尝试在自己未和M解绑以及m上的g此时不是g0的情况下,将
gp.preempt
置为true,表示发出了通知便返回true。具体的抢占将可能会在未来的某一时刻发生。

if s == _Psyscall {
			// 如果 `P` 在系统调用中停留超过 1 个监控周期,则尝试收回。
			t := int64(pp.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
            //如果当前P的运行队列为空,切存在至少一个自旋的M,并且未超出等待时间则跳过回收
			if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// 为了获取 `sched.lock`,先释放 `allpLock`
			unlock(&allpLock)
			
            //回收操作...
    if atomic.Cas(&pp.status, s, _Pidle) {
        //....
        	handoffp(pp)
    }

		}

当满足以下三个条件的时候,就会执行抢占调度:

  • p的本地队列有等待执行的G
  • 当前没有空闲的p和m
  • 执行系统调用的时间超过10ms

此时就会调用抢占调度,先将p的状态置为idle,表示可以被其他的M获取绑定,然后调用
handoffp
方法。

func handoffp(pp *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on pp.

	// if it has local work, start it straight away
	if !runqempty(pp) || sched.runqsize != 0 {
		startm(pp, false, false)
		return
	}
	// if there's trace work to do, start it straight away
	if (traceEnabled() || traceShuttingDown()) && traceReaderAvailable() != nil {
		startm(pp, false, false)
		return
	}
	// if it has GC work, start it straight away
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) {
		startm(pp, false, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) { // TODO: fast atomic
		sched.needspinning.Store(0)
		startm(pp, true, false)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting.Load() {
		pp.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if pp.runSafePointFn != 0 && atomic.Cas(&pp.runSafePointFn, 1, 0) {
		sched.safePointFn(pp)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(pp, false, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle.Load() == gomaxprocs-1 && sched.lastpoll.Load() != 0 {
		unlock(&sched.lock)
		startm(pp, false, false)
		return
	}

	// The scheduler lock cannot be held when calling wakeNetPoller below
	// because wakeNetPoller may call wakep which may call startm.
	when := nobarrierWakeTime(pp)
	pidleput(pp, 0)
	unlock(&sched.lock)

	if when != 0 {
		wakeNetPoller(when)
	}
}

当我们满足以下情况之一的时候,就会为当前的P新分配一个M进行调度:

  • 全局队列不为空或者本地队列不为空,即有可以运行的G。
  • 需要有trace去执行。
  • 有垃圾回收的工作需要执行。
  • 当前时刻没有自旋的线程M并且没有空闲的P(表示当前时刻任务繁忙)。
  • 当前P是唯一在运行的P,并且有网络事件等待处理。

当满足五个条件之一的时候,都会进入到
startm()
方法中,为当前的P分配一个M。


func startm(pp *p, spinning, lockheld bool) {
	mp := acquirem()
	if !lockheld {
		lock(&sched.lock)
	}
	if pp == nil {
		if spinning {
		}
		pp, _ = pidleget(0)
		if pp == nil {
			if !lockheld {
				unlock(&sched.lock)
			}
			releasem(mp)
			return
		}
	}
	nmp := mget()
	if nmp == nil {
		id := mReserveID()
		unlock(&sched.lock)

		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, pp, id)

		if lockheld {
			lock(&sched.lock)
		}
		releasem(mp)
		return
	}
	//...
	releasem(mp)
}
if pp == nil {
		if spinning {
		}
		pp, _ = pidleget(0)
		if pp == nil {
			if !lockheld {
				unlock(&sched.lock)
			}
			releasem(mp)
			return
		}
	}

假如传入的pp是nil,那么会自动设置为空闲p队列中的第一个p,假如仍然为nil表示当前没有空闲的p,会退出方法。

nmp := mget()
	if nmp == nil {
		id := mReserveID()
		unlock(&sched.lock)

		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, pp, id)

		if lockheld {
			lock(&sched.lock)
		}
		releasem(mp)
		return
	}

然后会尝试获取当前的空闲的m,假如不存在则新创建一个m。

至此,关于GMP模型的节选部分的讲解就完成了,可能有许多我理解的不对的地方欢迎大家讨论,谢谢观看。

RAG 系统高效检索提升秘籍:如何精准选择 BGE 智源、GTE 阿里与 Jina 等的嵌入与精排模型的完美搭配

Text Embedding 榜单:MTEB、C-MTEB
《MTEB: Massive Text Embedding Benchmark(海量文本嵌入基准)》
判断哪些文本嵌入模型效果较好,通常需要一个评估指标来进行比较,《MTEB: Massive Text Embedding Benchmark(海量文本嵌入基准)》就是一个海量文本嵌入模型的评估基准

  • 论文地址:
    https://arxiv.org/abs/2210.07316
    MTEB 包含 8 个语义向量任务,涵盖 58 个数据集和 112 种语言。通过在 MTEB 上对 33 个模型进行基准测试,建立了迄今为止最全面的文本嵌入基准。我们发现没有特定的文本嵌入方法在所有任务中都占主导地位。这表明该领域尚未集中在一个通用的文本嵌入方法上,并将其扩展到足以在所有嵌入任务上提供最先进的结果
  • github 地址:
    https://github.com/embeddings-benchmark/mteb#leaderboard

1.BGE - 智源 [★]

语义向量模型(Embedding Model)是语言模型生态体系中的重要组成部分,这一技术被广泛应用于搜索(Search)、问答(QA)、大语言模型检索增强(RAG)等应用场景之中。智源 BGE(BAAI General Embedding)模型自去年 8 月发布后广受好评,被开源爱好者集成至 LangChain、Llama_index 等项目,全球下载量已达 713 万次。

近日,智源发布了 BGE 家族新成员——通用语义向量模型 BGE-M3,支持超过 100 种语言,具备领先的多语言、跨语言检索能力,全面且高质量地支撑 “句子”、“段落”、“篇章”、“文档” 等不同粒度的输入文本,最大输入长度为 8192,并且一站式集成了稠密检索、稀疏检索、多向量检索三种检索功能,在多个评测基准中达到最优水平。

BGE-M3 是首个集多语言(Multi-Linguality)、多粒度(Multi-Granularity)、多功能(Multi-Functionality)三大技术特征于一体的语义向量模型,极大提升了语义向量模型在现实世界的可用性。目前,BGE-M3 已向社区全面开源并支持免费商用许可

  • 论文:
    https://arxiv.org/abs/2402.03216
    M3-Embedding: Multi-Linguality, Multi-Functionality, Multi-Granularity Text Embeddings Through Self-Knowledge Distillation

本文介绍了一种名为 M3-Embedding 的新嵌入模型,该模型在多语言性、多功能性和多粒度方面具有独特优势。它为超过 100 种工作语言的语义检索提供了统一支持。它可以同时完成三种常见的检索功能:密集检索、多向量检索和稀疏检索。此外,它还能处理不同粒度的输入,从短句到长达 8,192 个标记的长文档。M3-Embedding 的有效训练提出了一系列技术贡献。值得注意的是,我们提出了一种新颖的自知识蒸馏方法,其中不同检索功能的相关性分数可以作为教师信号整合,以增强训练质量。我们还优化了批处理策略,实现了大批量和高训练吞吐量,以提高嵌入的判别性。M3-Embedding 在我们的实验中表现出卓越的性能,在多语言、跨语言和长文档检索基准测试中取得了新的最先进成果


检索流程的建议:建议使用 “
混合检索 + 重排序
” 的方式进行检索。

混合检索 (Hybrid Retrieval)
利用多种方法的优势,提升检索准确率和泛化能力。经典的混合检索方案包括结合嵌入检索与 BM25 算法,现在你可以使用支持密集与稀疏检索的 BGE-M3,在生成密集嵌入的同时获得类似 BM25 的词元权重。可以参考 Vespa 和 Milvus 实现混合检索。

重排序 (Re-Ranking)
模型通过交叉编码器的方式,比双编码器模型具有更高的准确率。例如,在检索后使用 bge-reranker 或 bge-reranker-v2 进一步筛选文本。

  • 技术规格
模型名 维度 序列长度 介绍
BAAI/bge-m3 1024 8192 多语言,基于统一的微调(密集、稀疏、ColBERT)
BAAI/bge-m3-unsupervised 1024 8192 对比学习训练,来自 bge-m3-retromae
BAAI/bge-large-en-v1.5 1024 512 英文模型
BAAI/bge-base-en-v1.5 768 512 英文模型
BAAI/bge-small-en-v1.5 384 512 英文模型
  • 不同检索方法的介绍
  1. 密集检索
    : 通过将文本映射到单一嵌入向量进行检索,例如 DPR、BGE-v1.5。
  2. 稀疏检索(词汇匹配)
    : 通过计算文本中出现的词元权重,常用模型如 BM25、unicoil、splade。
  3. 多向量检索
    : 使用多个向量来表示文本,例如 ColBERT。
  • 对于嵌入检索,可以按照 BGE 的用法使用 BGE-M3,只是 BGE-M3 不再需要为查询添加指令。

  • 对于混合检索,可以使用 Vespa 和 Milvus。

  • BGE-M3 使用教程

  1. 安装:

    git clone https://github.com/FlagOpen/FlagEmbedding.git
    cd FlagEmbedding
    pip install -e .
    
    
    

    或直接安装:

    pip install -U FlagEmbedding
    
    
    
  2. 生成密集嵌入
    :

    from FlagEmbedding import BGEM3FlagModel
    model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
    sentences = ["What is BGE M3?", "Definition of BM25"]
    embeddings = model.encode(sentences, batch_size=12, max_length=8192)['dense_vecs']
    
    
    
  3. 生成稀疏嵌入
    :

    from FlagEmbedding import BGEM3FlagModel
    model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
    output = model.encode(sentences, return_dense=True, return_sparse=True)
    lexical_weights = output['lexical_weights']
    print(model.convert_id_to_token(lexical_weights))
    
    
    
  4. 生成多向量嵌入
    :

    from FlagEmbedding import BGEM3FlagModel
    model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
    output = model.encode(sentences, return_dense=True, return_sparse=True, return_colbert_vecs=True)
    print(model.colbert_score(output['colbert_vecs'][0], output['colbert_vecs'][1]))
    
    
    
  5. 文本对评分
    :

    sentence_pairs = [[i,j] for i in sentences_1 for j in sentences_2]
    scores = model.compute_score(sentence_pairs, max_passage_length=128, weights_for_different_modes=[0.4, 0.2, 0.4])
    
    
    

BGE-M3 提供了高效的多功能、多语言、多粒度文本嵌入解决方案,适用于多种复杂的检索任务。
在基于 Milvus 的
工程化应用
中,使用 BGE-M3 模型进行混合语义搜索可以极大提升搜索的准确性和效果。下面将介绍如何在 Milvus 环境中应用 BGE-M3 模型进行密集与稀疏向量的混合搜索,并使用 BGE CrossEncoder 模型对搜索结果进行重新排序。

  • 环境准备
  1. 安装依赖
    需要安装
    pymilvus
    作为客户端连接 Milvus 服务器,并安装
    pymilvus[model]
    以支持 BGE-M3 模型嵌入功能:

    pip install pymilvus
    pip install pymilvus[model]
    
    
    
  2. 设置 Milvus 环境
    Milvus 支持在 2.4.0 及以上版本中进行稀疏向量搜索,因此需要确保本地 Milvus 环境版本符合要求。可以通过 Docker 安装最新版本的 Milvus:

    docker run -d --name milvus-standalone -p 19530:19530 -p 9091:9091 milvusdb/milvus:v2.4.0
    
    
    
  • 应用场景示例

在这个示例中,我们使用 BGE-M3 模型对文本进行密集和稀疏向量的嵌入,并将结果插入到 Milvus 数据库中进行搜索和排序。你可以选择随机生成向量,或使用 BGE-M3 模型生成高质量的密集和稀疏向量表示。

  • 实现步骤
  1. 嵌入文本为密集和稀疏向量
    使用 BGE-M3 模型将文档和查询转换为向量表示:

    from pymilvus.model.hybrid import BGEM3EmbeddingFunction
    ef = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
    dense_dim = ef.dim["dense"]
    
    docs_embeddings = ef(docs)
    query_embeddings = ef([query])
    
    
    
  2. 创建 Milvus 集合
    定义包含文本、密集向量和稀疏向量的集合:

    fields = [
        FieldSchema(, dtype=DataType.VARCHAR, is_primary=True, auto_id=True, max_length=100),
        FieldSchema(, dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(, dtype=DataType.SPARSE_FLOAT_VECTOR),
        FieldSchema(, dtype=DataType.FLOAT_VECTOR, dim=dense_dim),
    ]
    schema = CollectionSchema(fields, "")
    col = Collection("hybrid_demo", schema, consistency_level="Strong")
    
    
    
  3. 插入数据并创建索引
    将生成的稀疏和密集向量插入集合并创建索引:

    entities = [docs, docs_embeddings["sparse"], docs_embeddings["dense"]]
    col.insert(entities)
    col.create_index("sparse_vector", {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"})
    col.create_index("dense_vector", {"index_type": "FLAT", "metric_type": "IP"})
    col.load()
    
    
    
  4. 进行混合搜索
    使用稀疏和密集向量进行搜索,并结合 BGE CrossEncoder 模型对结果重新排序:

    res = col.hybrid_search([sparse_req, dense_req], rerank=RRFRanker(), limit=k, output_fields=['text'])
    
    
    
  5. 结果展示
    如果使用 BGE CrossEncoder 模型对结果进行排序,可以看到查询相关性更高的结果。

2.GTE - 阿里 [★]

GTE 模型,也称为 General Text Embeddings,是阿里巴巴达摩院推出的文本 Embedding 技术。

此前,通义实验室推出了 GTE(General Text Embedding)系列文本向量模型,涵盖了基于 BERT 架构的模型及基于 Qwen LLM 系列训练的 LLM embedding 模型,如
gte-Qwen2-1.5B-instruct

gte-Qwen2-7B-instruct
。目前,基于双向注意力的 Encoder-only 结构的模型在同一规模下相较于 Decoder-only 模型在召回和排序效果上明显更优。然而,当前基于 Encoder-only 的 Embedding 和 Ranking 模型仍面临一些来自 BERT 时代的遗留问题,例如最长上下文长度仅为 512,以及预训练阶段使用的语料明显不足。为解决这些问题,GTE 模型开发团队从零开始训练了一种能支持长上下文和多语言的 Encoder-only 基础模型,并在此基础上推出了最新版本的 GTE-MultiLingual 系列模型(简称 mGTE 模型)。该系列模型具备以下显著特点:

  • 高性能
    :在多个数据集上与同规模开源模型的对比中,对比效果领先。
  • 长文档支持
    :Embedding 和 Reranker 均可处理 8k token 文本长度,且支持通过 ntk-rope 等方法扩展到更长的上下文。
  • 多语言支持
    :模型支持 75 种语言,涵盖当前主要大模型所支持的所有语种。
  • 弹性向量表示(Elastic Embedding)
    :模型支持输出 128-768 维度之间的任意向量表示,以便在性能和存储成本之间取得最佳平衡。在 128 维的情况下,与 768 维相比,召回性能损失小于 2%,同时节省 6 倍的存储空间。
  • 稀疏向量表示(Sparse Embedding)
    :模型可以输出句子中每个单词的词权重作为稀疏表示,适用于需要精确匹配的场景。

                                                         图 1 文本表征和文本排序模型架构示意图

mGTE 系列模型构建流程如图 2 所示,首先,训练了支持长下文的多语言 Encoder-only 底座模型 GTE-base-multilinguish。并在底座基础上继续训练文本表示模型
gte-multilingual-base
和排序模型
gte-multilingual-base

                                                              图 2 模型训练过程示意图

  • 模型结构

    为了提升模型多语言以及长文本领域相关的能力,该系列模型参考了目前 Decode-Only 架构大语言模型训练过程中一些常见的技巧,对原始的 BERT 架构做了以下几点改动,具体模型结构如图 3 所示


    • 位置编码: 将 BERT 模型中采用的绝对位置 embedding 方式改为了旋转位置编码 RoPE [1],以便能更好的支持长上下文的训练,同时保持上下文长度扩展的可能性。
    • 激活函数: 将 BERT 模型中线性层(FFN)部分改为了 GLU(gated linear unit,[2]),这也是在 LLM 训练过程中已经经过充分验证能有效提升模型训练稳定性的技巧。

此外,为了满足多语言和长文本处理能力的需求,模型使用了 XLM-Roberta[3] 系列的词表。

                                        图 3 GTE 底座模型结构示意图

效果评测

1. 检索效果评测

为了评测文本表征模型的检索效果,尤其是模型的多语言和长文本处理能力,mGTE 模型主要在以下几个数据集进行评测

  • MLDR[9]: 多语言长文档检索评测集, 包括 13 个语种数据。
  • MIRACL[10]: 多语言检索评测集合, 包含 18 个语种数据。
  • MKQA[11]: 跨语言检索评测集,包含 25 个不同的语种。
  • BEIR[12]: 英文多领域检索评测集合。
  • LoCo[13]:英文长文档检索评测集合。

表 3 展示了在这 5 个数据集上 mGTE 模型和同规模模型的效果对比:

  • 受益于原生的长文本底座训练,mGTE 表征模型在长文本检索效果上明显优于其它模型。
  • 在短文本检索场景, mGTE 对比同规模的模型效果大幅度领先,对比更大规模的模型效果也很接近。
  • Sparse 向量检索效果在大部分场景优于 BM25,特别在长文档场景对比现有 Dense 向量检索有明显效果优势。

2. 多任务文本表征效果评测

MTEB[14] 是一个涵盖多任务文本表示的通用评测数据集,英语、法语和波兰语这四种不同语言上对 mGTE 模型与其他模型的性能进行了对比。类似于检索任务,mGTE 模型在与开源社区中同规模的 Encoder-only 系列模型进行比较时表现出非常好的效果。当然,与更大型的基于 LLM 的模型相比,mGTE 仍存在明显差距。然而,考虑到 mGTE 小模型在推理性能方面的优势,其在实际应用场景中应具备更大的发挥空间。

3. 排序效果评测

类似地,mGTE-reranker 对 MLDR、MIRACL、MKQA 和 BEIR 数据集上的排序结果进行了评估。所有的排序模型都基于 mGTE-TRM-base 模型生成的向量召回的结果 top100 进行重新排序。具体结果如下:

                                                                                         图 5 排序模型结果对比图

  • 各个排序模型相比召回模型均表现出更佳的性能,证明了在检索链路中引入排序模型的必要性。
  • 与同尺寸甚至更大尺寸的模型相比,mGTE-reranker-base 模型在各个数据集上均取得了相当甚至更好的效果,尤其是在多语言长文档的检索场景中。

模型的使用方法可以参考 Huggingface 上的样例:

  • Embedding 模型:
import torch.nn.functional as F
from transformers import AutoModel, AutoTokenizer

input_texts = [
    "what is the capital of China?",
    "how to implement quick sort in python?",
    "北京",
    "快排算法介绍"
]

model_path = 'Alibaba-NLP/gte-multilingual-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModel.from_pretrained(model_path, trust_remote_code=True)


batch_dict = tokenizer(input_texts, max_length=8192, padding=True, truncation=True, return_tensors='pt')

outputs = model(**batch_dict)

dimension=768 
embeddings = outputs.last_hidden_state[:, 0][:dimension]

embeddings = F.normalize(embeddings, p=2, dim=1)
scores = (embeddings[:1] @ embeddings[1:].T) * 100
print(scores.tolist())



  • Ranking 模型:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('Alibaba-NLP/gte-multilingual-reranker-base')
model = AutoModelForSequenceClassification.from_pretrained('Alibaba-NLP/gte-multilingual-reranker-base', trust_remote_code=True)
model.eval()

pairs = [["中国的首都在哪儿","北京"], ["what is the capital of China?", "北京"], ["how to implement quick sort in python?","Introduction of quick sort"]]
with torch.no_grad():
    inputs = tokenizer(pairs, padding=True, truncation=True, return_tensors='pt', max_length=512)
    scores = model(**inputs, return_dict=True).logits.view(-1, ).float()
    print(scores)


3.E5 Embedding - 微软

E5-embedding 是由 intfloat 团队研发的一款先进的 Embedding 模型。E5 的设计初衷是为各种需要单一向量表示的任务提供高效且即用的文本 Embedding,与其他 Embedding 模型相比,E5 在需要高质量、多功能和高效的文本 Embedding 的场景中表现尤为出色。

以下是 E5-embedding 的一些特点:

  1. 新的训练方法:E5 采用了 “EmbEddings from bidirEctional Encoder rEpresentations” 这一创新方法进行训练,这意味着它不仅仅依赖传统的有标记数据,也不依赖低质量的合成文本对。
  2. 高质量的文本表示:E5 能为文本提供高质量的向量表示,这使得它在多种任务上都能表现出色,尤其是在需要句子或段落级别表示的任务中。
  3. 多场景:无论是在 Zero-shot 场景还是微调应用中,E5 都能提供强大的现成文本 Embedding,这使得它在多种 NLP 任务中都有很好的应用前景。
BEIR # of layers embedding dimension Huggingface
E5-small-v2 49.0 12 384 intfloat/e5-small-v2
E5-base-v2 50.3 12 768 intfloat/e5-base-v2
E5-large-v2 50.6 24 1024 intfloat/e5-large-v2
E5-small 46.0 12 384 intfloat/e5-small
E5-base 48.8 12 768 intfloat/e5-base
E5-large 50.0 24 1024 intfloat/e5-large
E5-small-unsupervised 40.8 12 384 intfloat/e5-small-unsupervised
E5-base-unsupervised 42.9 12 768 intfloat/e5-base-unsupervised
E5-large-unsupervised 44.2 24 1024 intfloat/e5-large-unsupervised

The models with
-unsupervised
suffix only pre-trains on unlabeled datasets.

  • Multilingual Pre-trained Models
BEIR # of layers embedding dimension Huggingface
multilingual-e5-small 46.6 12 384 intfloat/multilingual-e5-small
multilingual-e5-base 48.9 12 768 intfloat/multilingual-e5-base
multilingual-e5-large 51.4 24 1024 intfloat/multilingual-e5-large
multilingual-e5-large-instruct 52.5 24 1024 intfloat/multilingual-e5-large-instruct

4.Jina Embedding V3 [★]

github

https://github.com/jina-ai/serve

huggingface
:
https://huggingface.co/jinaai/jina-embeddings-v3

jina-embeddings-v3 是一个前沿的多语言文本嵌入模型,具有 570M 个参数和 8192 个令牌长度,优于 OpenAI 和 Cohere 在 MTEB 上的最新专有嵌入。

Jina Embeddings V3 来了,这款 5.7 亿参数的顶级文本向量模型,在多语言和长文本检索任务上达到当前最佳水平 SOTA。内置多种 LoRA 适配器,可以根据你的需求,针对 检索、聚类、分类和匹配 的不同场景进行定制,获得更精准的向量化效果。

  • 多语言支持: 支持 89 种语言,全面超越 multilingual-e5-large-instruct

支持的语言:
虽然基础模型支持 100 种语言,将调整工作重点放在以下 30 种语言上: 阿拉伯语、孟加拉语、中文、丹麦语、荷兰语、英语、芬兰语、法语、格鲁吉亚语、德语、希腊语、印地语、印尼语、意大利语、日语、韩语、拉脱维亚语、挪威语、波兰语、葡萄牙语、罗马尼亚语、俄语、斯洛伐克语、西班牙语、瑞典语、泰语、土耳其语、乌克兰语、乌尔都语和越南语。

  • 长文本处理: 支持 8192 token 的输入长度,在 LongEmbed 基准测试中表现出色

  • 任务定制更精准: 内置多种 LoRA 适配器,针对检索、聚类、分类和匹配等任务,生成定制化向量,效果更精准。


    retrieval.query:用于非对称检索任务中的查询嵌入
    retrieval.passage:用于非对称检索任务中的段落嵌入
    separation:用于聚类和重新排序应用程序中的嵌入
    classification:用于分类任务中的嵌入
    text-matching:用于量化两个文本之间的相似度的任务中的嵌入,例如 STS 或对称检索任务

  • 输出维度可定制: 默认输出维度为 1024,但你完全可以根据需要把它缩减到 32,性能几乎不受影响,这都归功于俄罗斯套娃表示学习技术的加持。

  • 论文连接

    https://arxiv.org/abs/2409.10173

  • Model Architecture 模型架构


java -embeddings-v3 的关键创新是使用了 LoRA 适配器。介绍了五个特定于任务的 LoRA 适配器,以优化四个任务的嵌入。模型的输入由两部分组成: 文本 (要嵌入的长文档) 和任务。jina-embeddings-v3 支持 4 个任务并实现 5 个适配器供您选择: 用于非对称检索任务中的查询和通道嵌入的 retrievalquery 和 retrievalpassage,用于聚类任务的分离,用于分类任务的分类,以及用于涉及语义相似性的任务的文本匹配,例如 STS 或对称检索。LoRA 适配器占总参数的比例不到 3%,因此给计算增加的开销非常小。

为了进一步提高性能和减少内存消耗,我们集成了 FlashAttention 2,支持激活检查点,并使用 DeepSpeed 框架进行高效的分布式训练。

jina-embeddings-v3 的性能已在多种多语言和跨语言 MTEB 任务中进行评估

jina-embeddings-v3 在 LongEmbed 基准测试的六个长文档检索任务上的表现,相较于其他模型有显著提升。得分是 nDCG@10;得分越高越好。这表明基于 RoPE 的位置嵌入的有效性,它优于 baai-bge-m3 使用的固定位置嵌入和 jina-embeddings-v2 中使用的基于 ALiBi 的方法。


Scaling law of embedding models. The average MTEB performance on English tasks is plotted against the number of model parameters. Each dot represents an embedding model. The trendline, representing all models, is highlighted, with multilingual models emphasized in cyan. One can see that jina-embeddings-v3 demonstrates superior performance compared to models of similar size, also showing a superlinear improvement over its predecessor, jina-embeddings-v2. This graph was created by selecting top-100 embedding models from the MTEB leaderboard , excluding those without size information, typically closed-source or proprietary models. Submissions identified as obvious trolling were also filtered out

5.Cohere

  • 官网网址

    https://cohere.com/

  • cohere-python

    https://github.com/cohere-ai/cohere-python
    Cohere 为各种阅读和写作任务训练大型语言模型 (LLMs),例如摘要、内容创建和情感分析。其语言模型针对三个主要用例进行了优化:检索文本(retrieving text)、生成文本(generating text)和分类文本(classifying text)

  • 检索文本(Retrieving Text)
    Cohere 产品中提供了三个主要 API 端点,这些端点专注于检索文本以进行搜索、聚类和推荐。


    • Embed(嵌入):该端点提供英语和 100 多种语言的准确嵌入,使用户能够发现趋势、比较语言并根据数据构建自己的文本分析应用程序。

    • Semantic Search(语义搜索):为用户提供语义搜索功能,可根据含义(而不仅仅是关键字)查找文本、文档和文章。它允许开发人员为任何语言构建更好的搜索系统。

    • Rerank(重排):重新排名提高了关键字或矢量搜索系统的搜索质量。它可以帮助系统更好地理解上下文以及问题和查询的含义,同时承诺对现有系统进行最小程度的修改。

# python -m pip install cohere --upgrade

import cohere

co = cohere.ClientV2("<<apiKey>>")
response = co.chat(
    model="command-r-plus", 
    messages=[{"role": "user", "content": "hello world!"}]
)

print(response)

6.Instructor - 港大

Instructor 是由香港大学自然语言处理实验室团队推出的一种指导微调的文本 Embedding 模型。该模型可以生成针对任何任务(例如分类、检索、聚类、文本评估等)和领域(例如科学、金融等)的文本 Embedding,只需提供任务指导,无需任何微调。Instructor 在 70 个不同的 Embedding 任务(MTEB 排行榜)上都达到了最先进的性能。该模型可以轻松地与定制的 sentence-transformer 库一起使用。

Instructor 模型主要特点如下:

  • 多任务适应性:只需提供任务指导,即可生成针对任何任务的文本 Embedding。
  • 高性能:在 MTEB 排行榜上的 70 个不同的 Embedding 任务上都达到了最先进的性能。
  • 易于使用:与定制的 sentence-transformer 库结合使用,使得模型的使用变得非常简单。

此外,模型还提供了其他使用案例,如计算句子相似性、信息检索和聚类等。

7.XLM-Roberta-Facebook

XLM-Roberta(简称 XLM-R)是 Facebook AI 推出的一种多语言版本的 Roberta 模型。它是在大量的多语言数据上进行预训练的,目的是为了提供一个能够处理多种语言的强大的文本表示模型。XLM-Roberta 模型在多种跨语言自然语言处理任务上都表现出色,包括机器翻译、文本分类和命名实体识别等。

模型主要特点如下:

  • 多语言支持:XLM-Roberta 支持多种语言,可以处理来自不同语言的文本数据。
  • 高性能:在多种跨语言自然语言处理任务上,XLM-Roberta 都表现出了最先进的性能。
  • 预训练模型:XLM-Roberta 是在大量的多语言数据上进行预训练的,这使得它能够捕获跨语言的文本表示。

8.text-embedding-ada-002 --OpenAI

官网:
https://platform.openai.com/docs/guides/embeddings/use-cases

text-embedding-ada-002 是一个由 Xenova 团队开发的文本 Embedding 模型。该模型提供了一个与 Hugging Face 库兼容的版本的 text-embedding-ada-002 分词器,该分词器是从 openai/tiktoken 适应而来的。这意味着它可以与 Hugging Face 的各种库一起使用,包括 Transformers、Tokenizers 和 Transformers.js。

模型主要特点如下:

  • 兼容性:该模型与 Hugging Face 的各种库兼容,包括 Transformers、Tokenizers 和 Transformers.js。
  • 基于 openai/tiktoken:该模型的分词器是从 openai/tiktoken 适应而来的。
  1. 精排模型推荐


专业级语义搜索优化:利用 Cohere AI、BGE Re-Ranker 及 Jina Reranker 实现精准结果重排

参考文章:
https://blog.csdn.net/sinat_39620217/article/details/141850425

文本嵌入技术 Text Embedding 模型详解:text2vec、OpenAI ada-002 到 M3E 及 BGE-M3 的演变

相关榜单:

https://huggingface.co/spaces/mteb/leaderboard

10 xinference 推理

文章参考:
Xinference 实战指南:全面解析 LLM 大模型部署流程

Xorbits Inference (Xinference) 是一个开源平台,用于简化各种 AI 模型的运行和集成。借助 Xinference,您可以使用任何开源 LLM、嵌入模型和多模态模型在云端或本地环境中运行推理,并创建强大的 AI 应用。通过 Xorbits Inference,可以轻松地一键部署你自己的模型或内置的前沿开源模型