2024年1月

聚类算法

前言

作为数模小白,看了很多讲解新概念新模型的文章,这些文章往往要么讲的很浅不讲原理只讲应用,让人知其然不知其所以然。要么讲的很深小白看不懂,同时总是忽略关键部分,经常性引入陌生概念让初学者疑惑,因此有了本文,任何能熟练掌握线性代数知识且逻辑思维能力尚可的人都可以理解,而无需其他数模知识,涉及任何新概念都会先讲解后使用

所以我就打算把自己学完之后的笔记整理出来,其中每一篇文章中可能含有直接从其他文章中搜集过来的公式,因为这些文章原本只是我自己的笔记,我懒得敲公式,只是写完之后想着整理出来,弘扬一下互联网精神,于是就有了这个系列。

综述

聚类算法广义上属于分类算法的一种,但我们平时所说的分类算法往往指的是狭义分类算法,因此我们还是需要明确狭义分类(Classification)算法和聚类(Clustering)算法的区别的。

分类算法关注的问题是通过一个数据类别已知的训练数据集来获取分类方法,以便对数据类别未知的数据集进行分类,属于监督学习算法。而聚类算法是直接拿着一个数据类别未知的数据集就通过统计学手段进行分类,属于无监督学习算法。

监督学习算法上来便拿到了给定的类别且分好类了的正确数据集,因此将未知数据集分类后我们也能清楚的知道每个类别的意义,但是,像降维算法(如前文讲过的主成分分析)和聚类算法这样的无监督学习方法,我们就并不清楚每个分类到底是依据什么进行划分的、含义为何了,需要认为的对分类结果进行分析

聚类算法的分类

划分类聚类方法

划分类聚类方法需要事先认为指定cluster的数目或者聚类中心,直到达到“同一cluster内的点足够近,不同cluster的点足够远”的目标。

k-means及其变体k-means++、bi-kmeans、kernel k-means等就是常用的划分式聚类方法,让我们依次进行介绍

k-means算法

先进行可视化数据,然后通过观察指定聚类数目,然后进行以下流程:

假设xi是数据点,μi是初始的聚类质心是数据点,L(x)=
iShot_2024-01-21_13.31.29
是该算法中度量聚类效果的指标,L(x)越小则聚类效果越好,因此经典K-means算法的目标函数可以写成Lmin(x)=

我们先随机选择k个初始质心,即初始聚类中心,然后遍历数据集中的每一个数据点,将数据点分配到质心距离其最近的那个cluster中,然后计算每个簇的均值,将均值作为新的质心,然后再重新遍历每个数据点,周而复始,直到L(X)足够小,达到事先指定的标注为止。

这里要引入一个概念,凸优化函数和非凸优化函数,我们将图像的极小值定义为最优解,那么凸优化函数指只有一个极小值的函数,换句话说极小值(我们找到的最优解)一定是最小值(全局最优解)的函数,我们找到的最优解一定就是全局最优解。而非凸优化函数指函数有多个极小值,我们找的最优解很可能不是全局最优解,而是局部最优解。

而由于该算法中的Lmin(x)就是一个非凸优化函数,换句话说我们很可能收敛于局部最优解而不是全局最优解,具体收敛于哪个极值点取决于我们选取的初始质心点,因此科学家提出了K-means的改进算法来解决这个问题

(忘掉数学里的凸函数凹函数的定义吧,凸优化函数和非凸优化函数最早就是指数学里的凸函数和凹函数,但时过境迁,计算机领域中的这一概念已经发生了极大的语义偏移,如果还用数学中的概念进行解释则会造成极大的误解和不变)

K-means++算法

K-means++更改了初始数据点选取方法,K-means++的选取算法如下:

先随机选取一个数据点作为初始聚类中心,然后计算每个数据点和已有最近聚类中心的距离,距离越大被选中作为下一个聚类中心的概率也就越大,然后运用轮盘法随机抽取下一个聚类中心,直到聚类中心数量达到实现设定的数量为止。

  1. means++仅仅优化了初始质心点选取这步,其他步骤与K-mean基本一致,但尽管如此也大大减小了收敛于局部最优解的可能性。

matlab中提供了kmeans函数,默认情况下kmeans函数执行的就是kmeans++算法

同时,K值该如何确定呢?一般我们算出各个K值的D值,D=类内平均距离/类间平均距离,然后还出D关于K的图像,取一个能使D足够低的尽可能小的K值作为我们kmeans的超参数K即可。

代码示例:

%随机获取150个点

X = [randn(50,2)+ones(50,2);randn(50,2)-ones(50,2);randn(50,2)+[ones(50,1),-ones(50,1)]];

opts = statset('Display','final');

%调用Kmeans函数

%X N*P的数据矩阵

%Idx N*1的向量,存储的是每个点的聚类标号

%Ctrs K*P的矩阵,存储的是K个聚类质心位置

%SumD 1*K的和向量,存储的是类间所有点与该类质心点距离之和

%D N*K的矩阵,存储的是每个点与所有质心的距离;

[Idx,Ctrs,SumD,D] = kmeans(X,3,'Replicates',3,'Options',opts);

%画出聚类为1的点。X(Idx==1,1),为第一类的样本的第一个坐标;X(Idx==1,2)为第二类的样本的第二个坐标

plot(X(Idx==1,1),X(Idx==1,2),'r.','MarkerSize',14)

hold on

plot(X(Idx==2,1),X(Idx==2,2),'b.','MarkerSize',14)

hold on

plot(X(Idx==3,1),X(Idx==3,2),'g.','MarkerSize',14)

%绘出聚类中心点,kx表示是圆形

plot(Ctrs(:,1),Ctrs(:,2),'kx','MarkerSize',14,'LineWidth',4)

plot(Ctrs(:,1),Ctrs(:,2),'kx','MarkerSize',14,'LineWidth',4)

plot(Ctrs(:,1),Ctrs(:,2),'kx','MarkerSize',14,'LineWidth',4)

legend('Cluster 1','Cluster 2','Cluster 3','Centroids','Location','NW')

Ctrs

SumD

bi-K-means算法

bikmeans更改了度量聚类效果的指标,运用SSE(误差平方和)来衡量聚类效果,SSE越小则聚类效果越好,SSE是凸优化函数,换句话说这是一个能稳定收敛于全局最优解的算法。

要注意的是,虽然这是一个能求出全局最优解的方法,但不代表他求出来的解就一定不会把数据点分错类,最优解不是正解,现实世界中不存在正解,随机因素是不可能排除的,因此不存在绝对不出错的分类算法。

bi-K-means算法首先将所有数据点视为一个cluster,然后将在这个cluster上进行k=2的经典k-means聚类,然后从两个结果cluster中选取一个进行继续划分,直到cluster 的总数达到事先规定的类数。其中,选取哪一个cluster进行划分取决于对其划分能否最大程度地降低SSE的值。

SSE是什么呢?如果目前有k个cluster,每个cluster都有一个属于自己的SSEi,k个SSEi的和就是SSE。SSEi就等于第i个cluster的元素到所在簇质心距离的平方和。

有趣的是,bi-K-means中进行的k=2的经典K-means聚类其实也可以被K-means++所替代,但是K-means++并不能对运算速度进行太多提升,甚至可能降低运算速度,K-means++是增大了得到全局最优解的概率可是bi-K-means本身就是一个100%得到全局最优解的算法,更是完全没必要使用k-means++作为子算法了,因此bi-K-means算法内部子算法选用的是经典K-means算法。

代码略

让我们比较一下三种算法

以上三种算法都只能用于解决数据集线性可分的问题对于数据集线性不太能分甚至很不太能分的问题就会得出荒谬的结果,因此引入了其他算法来解决数据集线性不可分的问题。

kernel-k-means算法和基于密度的划分方法等方法都可以解决这个问题,我们先来介绍kernel-k-means。

kernel-K-means

该算法将二维数据通过一个恰当的非线性映射转化为三维数据,从而把数据集转化为线性可分的数据集,然后在三维空间中利用其他kmeans算法进行常规聚类。形象的原理如下图所示:

具体转化规则有很多种,实际问题中可能还需要具体分析,该算法比较复杂,如果最开始的直接观察可视化数据的时候发现数据集线性不可分,那不如直接用基于密度的聚类方法,因此这个算法暂时跳过。

基于密度的聚类方法

DBSCAN算法

首先要引入几个概念,对于含有若干数据点的集合X而言,X内的点会被分为核心点、边界点、噪声点三类,具体定义如下

对于特定的数据点x0而言,所有与x距离小于epsilon的点构成的集合被称作x0的epsilon邻域,而x0的epsilon邻域的点的个数称为x0的密度,记作ρ(x0),epsilon往往是运算前给定的超参数

DBSCAN算法中还有一个超参数叫做邻域密度阈值,记作M,规定在X中,密度大于等于M的点被称为X的核心点,核心点构成的集合记作Xc,所有核心点epsilon邻域内的所有非核心点称作X的边界点(一个边界点可能在多个非核心店的epsilon邻域中),边界点构成的集合记作Xbd。X中既不是核心点又不是边界点的数据点就被命名为噪声点。

该算法具体流程如下

先标记所有数据点都是“未访问状态”,先随机选取一个未访问的数据点p,将其标记已访问,再判断p的密度和M的关系,如果p<M,则将p标记为噪声点,如果p>=M,则创建一个新的cluster C,并将p归入C中,对于p的epsilon邻域内的每一个点(不管是否访问过)都归入集合N中,然后依次访问集合N的每一个数据点p’,若p’的密度也大于等于M,则将p’的epsilon邻域中的所有未访问的点都添加到集合N中,否则将p’归入C中。然后只要还有未访问数据点,就周而复始继续选择下一个未访问的数据点,直到所有数据点都已被归类。

重新叙述一遍,

先标记所有数据点都是“未访问状态”,先随机选取一个未访问的数据点p,将其标记为已访问。如果ρ(p)<M,则将p标记为噪声点,否则,将p可达点构成的集合设为X,将X中的所有数据点标记为已访问,并且创建一个新的cluster C,将X中的所有核心点归入C中。

最后,只要还有未访问数据点,就周而复始继续选择下一个未访问的数据点,直到所有数据点都已被归类。

(p的可达点递归式定义如下,在p的epsilon邻域中的点,或者在p的可达点的epsilon邻域中的点都是p的可达点)

在DBSCAN算法中,使用了统一的epsilon值,当数据密度不均匀的时候,如果设置了较小的epsilon值,则较稀疏处的点将很可能被集体认定为噪声点,很难形成cluster;如果设置了较大的epsilon值,则密度较大处的邻近的多个cluster容易被划分为同一个cluster,如下图所示。

·如果设置的epsilon较大,将会获得A,B,C这3个cluster

·如果设置的epsilon较小,将会只获得C1、C2、C3这3个cluster

一般来说,DBSCAN算法有以下三个特点:

1.需要提前确定epsilon和M的值,但不需要提前设置聚类的个数

2.对初值选取敏感,对噪声不敏感

3.对密度不均的数据聚合效果不好

OPTICS算法

由于DBSCAN算法中epsilon的选取会相当程度地影响算法效果,而epsilon又很难人为指定一个很好的值,因此有了OPTICS算法,后者实际上是对前者的一种有效改进,主要解决的就是DBSCAN对epsilon极敏感的问题。

我们还需要引入几个概念,

  1. 核心距离r,定义为距离数据点p的epsilon邻域内第M近的数据点与p的距离(把p本身算作距离p第一近的点),当然如果ρ(p)<M那么p就不是核心点,压根就没有核心距离刻个概念
  2. 可达距离rd,对于数据点p而言,p和数据点o的可达距离定义为p的核心距离和op间距的较大值,当然如果ρ(p)<M那么p就不是核心点,压根就没有核心距离这个概念

具体流程如下

  1. 初始化一个有序队列和一个结果队列,并将所有数据点初始化为“未访问状态”
  2. 从未访问状态节点中抽取一个样本点p,对于特定的p,先把p标记为已访问,
  3. 然后计算ρ(p),若ρ(p)<M,则p不是核心点,重新开始第二步;若ρ(p)>=M,则将p标记为核心点并放入结果队列中,将p的epsilon邻域中的所有点按照可达距离进行升序排列放入对有序队列中
  4. 如果有序队列不为空,就弹出队首(即目前可达距离最近的数据点,如果队列为空,则跳至第七步。
  5. 如果弹出的队首q在结果队列中,则直接返回第四步;若弹出的队首q不在结果队列中,则先将其加入结果队列。
  6. 然后判断q是否为核心点,如果q不是核心点,则直接放回第四步,如果q是核心点,那么将q的epsilon邻域内的点也放入有序队列中,并将有序队列重新升序排列(如果已经在有序队列中了的点被重复放入有序队列中,则将其距离更新为较小的那一个),然后返回第四步
  7. 如果所有节点均已访问过,则运算停止,否则返回第二步

至此为止,我们其实都还没获得具体分类,只获得了一个结果队列。我们要先人为定义一个最大半径R,然后从结果序列中按顺序取出样本点,直到结果队列为空,对于取出的样本点p而言,如果p的可达距离小于等于R,则将其归入当前cluster中,否则如果改点的核心距离大于R则归类为噪声点,如果核心距离小于等于R则创建一个新的cluster,将其归入新的cluster中

epsilon依旧是人为指定的,但我们在OPTICS中使用的更多的是由epsilon算出来的核心距离r,这使得OPTICS中对epsilon的值不再那么敏感,随便设置为一个较大的数都可以得到大体一致的优秀分类结果,一般直接设置为正无穷。

因此这个算法中我们实际上用到的超参数是R与M,我们大可以把结果队列的可达距离关于队列编号(排在队列第几位编号就是几)的条形统计图画出来,比如像下图这般:

然后就可以很轻易的观察出应该选取的R与M,大大降低了选出恰当超参数的难度,从而大大提高了算法可靠性。

其实,OPTICS中用到的epsilon往往取正无穷,并不相当于DESCAN中的epsilon,而OPTICS中的R其实才相当于DESCAN中的epsilon,根据结果队列的图像挑选R和M的过程其实就是较为客观地确定DESCAN中的epsilon与M的过程,毕竟我们根据这个排序可以得到基于任何epsilon与M的DBSCAN算法的结果,可以理解为把epsilon放宽为一整个范围,形象地展示了每个epsilon会对应的分类结果,而非只取特定的epsilon

因此,OPTICS聚类有效降低了算法对于超参数的敏感性,且有效解决了密度不均导致分类效果较差的问题,相对于K-means算法还解决了线性不可分的数据集的有效处理问题。

DBSCAN可以直接用matlab的dbscan函数实现,OPTICS则需要手写实现,具体代码实现如下:

% [RD,CD,order]=optics(x,k)

% -------------------------------------------------------------------------

% Aim:

% Ordering objects of a data set to obtain the clustering structure

% -------------------------------------------------------------------------

% Input:

% x - data set

% k - number of objects in a neighborhood of the selected object,即M

% -------------------------------------------------------------------------

% Output:

% RD - vector with reachability distances (m,1)

% CD - vector with core distances (m,1)

% order - vector specifying the order of objects (1,m)

% -------------------------------------------------------------------------

% Example of use:

% x=[randn(30,2)*.4;randn(40,2)*.5+ones(40,1)*[4 4]];

% [RD,CD,order]=optics(x,4)

function [RD,CD,order,D]=optics(x,k)

[m,n]=size(x); % m=70,n=2

CD=zeros(1,m);

RD=ones(1,m)*10^10;

% Calculate Core Distances

for i=1:m

D=sort(dist(x(i,:),x));

CD(i)=D(k+1); % 第k+1个距离是密度的界限阈值

end

order=[];

seeds=[1:m];

ind=1;

while ~isempty(seeds)

ob=seeds(ind);

%disp(sprintf('aaaa%i',ind))

seeds(ind)=[];

order=[order ob]; % 更新order

var1 = ones(1,length(seeds))*CD(ob);

var2 = dist(x(ob,:),x(seeds,:));

mm=max([var1;var2]); % 比较两个距离矩阵,选择较大的距离矩阵

ii=(RD(seeds))>mm;

RD(seeds(ii))=mm(ii);

[i1 ind]=min(RD(seeds));

%disp(sprintf('bbbb%i',ind))

end

RD(1)=max(RD(2:m))+.1*max(RD(2:m));

function [D]=dist(i,x)

% function: [D]=dist(i,x)

%

% Aim:

% Calculates the Euclidean distances between the i-th object and all objects in x

% Input:

% i - an object (1,n)

% x - data matrix (m,n); m-objects, n-variables

%

% Output:

% D - Euclidean distance (m,1)

[m,n]=size(x);

D=(sum((((ones(m,1)*i)-x).^2)')); % 距离和

if n==1

D=abs((ones(m,1)*i-x))';

end

层次化聚类方法

前面的那些算法会产生一种误差,叫做链式效应,是说如果分出来的cluster的形状是一个狭长的链子而非一个超球体,那么由于链子的存在而被归为一类的链子两端的元素其实差异很大。

为了降低链式效应,层次聚类法应运而生,层次聚类算法 (hierarchical clustering) 时间复杂度较高,大约为O(n^3),一般分为两类:

Agglomerative 层次聚类:又称自底向上(bottom-up)的层次聚类或凝聚性层次聚类,每一个对象最开始都是一个 cluster,每次按一定的准则将最相近的两个 cluster 合并生成一个新的 cluster,如此往复,直至最终所有的对象都属于一个 cluster。这里主要关注此类算法。

Divisive 层次聚类: 又称自顶向下(top-down)的层次聚类或分裂型层次聚类,最开始所有的对象均属于一个 cluster,每次按一定的准则将某个 cluster 划分为多个 cluster,如此往复,直至每个对象均是一个 cluster。

拿Agglomerative 层次聚类为例,较为简单的一个实现方法如下:

matlab中的linkage函数可以用于实现Agglomerative 层次聚类法

function [idx, Z] = agglomerative(X, K)

% X: n-by-p matrix, each row is a data point

% K: number of clusters

% idx: n-by-1 vector, cluster index for each data point

% Z: (n-1)-by-3 matrix, linkage matrix

n = size(X, 1);

D = pdist(X); % compute pairwise distance between data points

Z = linkage(D); % compute linkage matrix

idx = cluster(Z, 'maxclust', K); % assign clusters based on max distance threshold

idx = XC;

end

总结

在Java编程中,为了降低开销和优化程序的效率,我们常常使用线程池来管理线程的创建和销毁,并尽量复用已创建的对象。这样做不仅可以提高程序的运行效率,还能减少垃圾回收器对对象的回收次数。

在Golang中,我们知道协程(goroutine)由于其体积小且效率高,在高并发场景中扮演着重要的角色。然而,资源始终是有限的,即使你的内存很大,也会有一个限度。因此,协程池在某些情况下肯定也会有其独特的适用场景。毕竟,世上并不存在所谓的银弹,只有在特定情况下才能找到最优的解决方案。因此,在Golang中,我们仍然需要考虑使用协程池的情况,并根据具体场景来选择最佳的解决方案。

今天,我们将从Java线程池的角度出发,手把手地带你实现一个Golang协程池。如果你觉得有些困难,记住我们的宗旨是使用固定数量的协程来处理所有的任务。这样做的好处是可以避免协程数量过多导致资源浪费,也能确保任务的有序执行。让我们一起开始,一步步地构建这个Golang协程池吧!

协程池

首先,让我们编写一个简单的多协程代码,并逐步进行优化,最终实现一个简化版的协程池。假如我们有10个任务需要处理。我们最简单做法就是直接使用
go
关键字直接去生成相应的协程即可,比如这样:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg  sync.WaitGroup
    wg.Add(5)
    for i := 1; i <= 5; i++ {
        go func(index int) {
            fmt.Printf("Goroutine %d started\n", index)
            time.Sleep(1 * time.Second)
            fmt.Printf("Goroutine %d finished\n", index)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println("All goroutines finished")
}

我们当前的用法非常简单,无需专门维护goroutine容量大小。每当有一个任务出现,我们就创建一个goroutine来处理。现在,让我们进一步优化这种用法。

优化版协程池

现在我们需要首先创建一个pool对象和一个专门用于运行协程的worker对象。如果你熟悉Java中线程池的源码,对于worker这个名称应该不会陌生。worker是一个专门用于线程复用的对象,而我们的worker同样负责运行任务。

实体Job没啥好说的,就是我们具体的任务。我们先来定义一个简单的任务实体Job:

type Job struct {
    id  int
    num int
}

我们来看下主角Worker定义:

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

你可以看到,我的实体拥有一个channel。这个channel类似于我们想要获取的任务队列。与Java中使用链表形式并通过独占锁获取共享链表这种临界资源的方式不同,我选择使用了Golang中的channel来进行通信。因为Golang更倾向于使用channel进行通信,而不是共享资源。所以,我选择了使用channel。为了避免在添加任务时直接阻塞,我特意创建了一个带有任务缓冲的channel。

在这里,
Start()
方法是一个真正的协程,我会从channel中持续获取任务,以保持这个协程不被销毁,直到没有任务可获取为止。这个逻辑类似于Java中的线程池。

让我们进一步深入地探讨一下Pool池的定义:

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

他的定义可能有点复杂,但是我可以用简单的语言向你解释,就是那些东西,只是用Golang的写法来实现,与Java的实现方式类似。

首先,我定义了一个名为worker的数组,用于存储当前存在的worker数量。这里并没有实现核心和非核心worker的区分。另外,我还创建了一个独立的channel,用于保存可缓冲任务的大小。这些参数在初始化时是必须提供的。

Start
方法的主要目的是启动worker开始执行任务。首先,它会启动一个核心协程,等待任务被派发。然后,dispatchJobs方法会开始监听channel是否有任务,如果有任务,则会将任务分派给worker进行处理。在整个过程中,通过channel进行通信,没有使用链表或其他共享资源实体。需要注意的是,dispatchJobs方法在另一个协程中被调用,这是为了避免阻塞主线程。

getLeastBusyWorker
方法是用来获取阻塞任务最少的worker的。这个方法的主要目标是保持任务平均分配。而
AddJob
方法则是用来直接向channel中添加job任务的,这个方法比较简单,不需要过多解释。

协程池最终实现

image

经过一系列的反复修改和优化,我们终于成功实现了一个功能完善且高效的Golang协程池。下面是最终版本的代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    id  int
    num int
}

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

func main() {
    pool := NewPool(3, 10)
    pool.Start()
    // 添加任务到协程池
    for i := 1; i <= 15; i++ {
        pool.AddJob(Job{
            id:  i,
            num: i,
        })
    }

    // 等待所有任务完成
    pool.wg.Wait()
    close(pool.jobChan)
    fmt.Println("All jobs finished")
}

三方协程池

在这种场景下,既然已经有一个存在的场景,那么显然轮子是肯定有的。不论使用哪种编程语言,我们都可以探索一下,以下是Golang语言中关于三方协程池的实现工具。

ants
: ants是一个高性能的协程池实现,支持动态调整协程池的大小,可以通过简单的API调用来将任务提交给协程池进行执行。官方地址:
https://github.com/panjf2000/ants

gopool
:gopool 是一个高性能的 goroutine 池,旨在重用 goroutine 并限制 goroutine 的数量。官方地址:
https://github.com/bytedance/gopkg/tree/develop/util/gopool

这些库都提供了简单易用的API,可以方便地创建和管理协程池,同时也支持动态调整协程池的大小,以满足不同场景下的需求。

总结

当然,我写的简易版协程池还有很多可以优化的地方,比如可以实现动态扩容等功能。今天我们要简单总结一下协程池的优势,主要是为了降低资源开销。协程池的好处在于可以重复利用协程,避免频繁创建和销毁协程,从而减少系统开销,提高系统性能。此外,协程池还可以提高响应速度,因为一旦接收到任务,可以立即执行,不需要等待协程创建的时间。另外,协程池还具有增强可管理性的优点,可以对协程进行集中调度和统一管理,方便进行性能调优。

wmproxy

wmproxy
已用
Rust
实现
http/https
代理,
socks5
代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现
websocket
代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

国内: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

设计目标

部署软件,实现内网穿透功能。

配置文件准备

服务端的配置文件
mapping_server.toml

default_level = "trace"
[proxy]
#绑定的ip地址
bind_addr = "0.0.0.0:8091"

username = "wmproxy"
password = "wmproxy"

#内网映射http绑定地址
map_http_bind = "0.0.0.0:8001"
#内网映射tcp绑定地址
map_tcp_bind = "0.0.0.0:8002"
#内网映射https绑定地址
map_https_bind = "0.0.0.0:8003"
#内网映射的公钥证书,为空则是默认证书
# map_cert = 
#内网映射的私钥证书,为空则是默认证书
# map_key =
# 双向认证
# two_way_tls = true
# #接收客户端是为是加密客户端
# tc = true
#当前服务模式,server为服务端,client为客户端
mode = "server"

客户端的配置文件
mapping_client.toml
,需注意填上正确的服务端地址。此处服务端地址填的是docker的名字,非docker启动换成相应的ip

control = "0.0.0.0:8838"

[proxy]
# 连接服务端地址
server = "wmproxy_server_docker:8091"
bind_addr = "0.0.0.0:8090"
flag = "http https socks5"
# 连接服务端是否加密
# ts = true
# two_way_tls = true
username = "wmproxy"
password = "wmproxy"

# 内网映射配置的数组

  #将localhost的域名转发到本地的0.0.0.0:8080
[[proxy.mappings]]
name = "web"
mode = "http"
local_addr = "192.168.17.22:8080"
domain = "localhost"

headers = [
  "proxy x-forward-for {client_ip}",
  "proxy + from $url",
  "+ last-modified 'from proxy'",
  "- etag",
]

#将tcp的流量无条件转到0.0.0.0:8080
[[proxy.mappings]]
name = "tcp"
mode = "tcp"
local_addr = "192.168.17.22:8080"
domain = ""

安装

docker

安装方法docker,注意在docker里配置文件的监听地址需改成
0.0.0.0
,否则无法映射到宿主机。当前将两个docker配置到相同的network环境中,是为了互通,如果分别部署到不同的网络环境可以删除掉。

docker pull dreamwhat/wmproxy

配置服务端
docker-compose.yaml

version: '3.5'
services:

  wmproxy_server:
    container_name: wmproxy_server_docker        # 指定容器的名称
    image: dreamwhat/wmproxy:0.2.4
    command:
      - sh
      - -c
      - |
        wmproxy run -c /etc/config/mapping_server.toml
    ports:
      - "127.0.0.1:8837:8837"
      - "8091:8091"
      - "8001:8001"  #http映射
      - "8002:8002"  #tcp映射
      - "8003:8003"  #https映射
    volumes:
      - ./mapping_server.toml/:/etc/config/mapping_server.toml:r

networks:
  default:
    name: wmproxy-network

通过docker-compose进行启动。

docker-compose up

有显示如下信息表明已开始监听请求:如果需要后台加上
docker-compose up -d
即可。

wmproxy_server_docker  | 2024-01-15T06:13:22.790886788+00:00 INFO wmproxy::option - 绑定代理:0.0.0.0:8091,提供代理功能。
wmproxy_server_docker  | 2024-01-15T06:13:22.790919459+00:00 INFO wmproxy::option - 内网穿透,http绑定:0.0.0.0:8001,提供http内网功能。
wmproxy_server_docker  | 2024-01-15T06:13:22.790960305+00:00 INFO wmproxy::option - 内网穿透,https绑定:0.0.0.0:8003,提供https内网功能。
wmproxy_server_docker  | 2024-01-15T06:13:22.791089397+00:00 INFO wmproxy::option - 内网穿透,tcp 绑定:0.0.0.0:8002,提供tcp内网功能。

配置客户端
docker-compose.yaml
,因为本机IP为192.168.17.22,此处做个8080的服务器做响应

version: '3.5'
services:

  wmproxy_client:
    container_name: wmproxy_client_docker        # 指定容器的名称
    image: dreamwhat/wmproxy:0.2.4
    # image: wmproxy
    command:
      - sh
      - -c
      - |
        wmproxy run -c /etc/config/mapping_client.toml
    ports:
      - "127.0.0.1:8838:8838"
      - "8090:8090"
    volumes:
      - ./mapping_client.toml/:/etc/config/mapping_client.toml:r

networks:
  default:
    name: wmproxy-network

启动方式同上,服务端看到如下输出则表示正常:

wmproxy_server_docker  | 2024-01-15T06:15:47.988802113+00:00 TRACE wmproxy::wmcore - 代理收到客户 端连接: 172.25.0.3:57214->0.0.0.0:8091

客户端同样会输出启动成功信息。

  • 客户端的代理信息会通过服务端进行代理处理。
  • 服务端的HTTP请求会转发到客户端进行处理。

客户端代理验证

客户端为8090处理代理端口,如果不需要可以不映射端口。
此时客户端的代理请求将会通过服务端进行转发,可以用这种方式访问到服务端内网的数据。以下通过代理访问
www.baidu.com
,因为用户密码均配置wmproxy,所以需要进行设置

curl.exe -x http://wmproxy:wmproxy@127.0.0.1:8090 http://www.baidu.com
curl.exe -x socks5://wmproxy:wmproxy@127.0.0.1:8090 http://www.baidu.com

此时服务端的docker上可以看到如下输出:

wmproxy_server_docker  | 2024-01-15T06:23:36.796799098+00:00 TRACE wmproxy::check::health - connect addr = 183.2.172.185:80
wmproxy_server_docker  | 2024-01-15T06:23:36.798174881+00:00 TRACE wmproxy::check::health - success connect addr = 183.2.172.185:80

通过ping www.baidu.com可以知道,此时服务端发起了www.baidu.com的连接。

PS C:\Users\PC> ping www.baidu.com

Pinging www.a.shifen.com [183.2.172.42] with 32 bytes of data:
Reply from 183.2.172.42: bytes=32 time=24ms TTL=51

且通过curl的输出内容为正确返回

PS C:\Users\PC> curl.exe -x http://wmproxy:wmproxy@127.0.0.1:8090 http://www.baidu.com
<!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=http://s1.bdstatic.com/r/www/cache/bdorz/baidu.min.css><title>百度一下,你就知道</title></head> <body link=#0000cc> <div id=wrapper> 
...

此时客户端通过服务端访问网络为通的。

内网穿透验证

此时我们启动本地监听的8080服务器。请求返回如下

PS C:\Users\PC> curl http://127.0.0.1:8080


StatusCode        : 200
StatusDescription : OK
Content           : {72, 101, 108, 108...}
RawContent        : HTTP/1.1 200 OK
                    Content-Length: 13
                    Server: wenmeng

                    Hello World

Headers           : {[Content-Length, 13], [Server, wenmeng]}
RawContentLength  : 13

测试8001的内网穿透,当前我们绑定地址为
localhost
,因为我们在客户端的配置里进行了如下配置:

[[proxy.mappings]]
name = "web"
mode = "http"
local_addr = "192.168.17.22:8080"
domain = "localhost"
  • 我们尝试用ip进行访问,告诉我们无法访问,正确预期。
PS C:\Users\PC> curl.exe http://127.0.0.1:8001/
not found
  • 我们用localhost的网址进行访问,返回预期的结果,且加上了带回来的
    Last-Modified
    ,此时该途径测试通过。
PS C:\Users\PC> curl http://localhost:8001


StatusCode        : 200
StatusDescription : OK
Content           : {72, 101, 108, 108...}
RawContent        : HTTP/1.1 200 OK
                    Content-Length: 13
                    Last-Modified: from proxy
                    Server: wmproxy

                    Hello World

Headers           : {[Content-Length, 13], [Last-Modified, from proxy], [Server, wmproxy]}
RawContentLength  : 13

但是通过8002因为是tcp转发的,此时无论是127.0.0.1或者为localhost将同样的进行处理。此时得到的结果均和正常访问8080一模一样的进行返回。

PS C:\Users\PC> curl http://127.0.0.1:8002


StatusCode        : 200
StatusDescription : OK
Content           : {72, 101, 108, 108...}
RawContent        : HTTP/1.1 200 OK
                    Content-Length: 13
                    Server: wenmeng

                    Hello World

Headers           : {[Content-Length, 13], [Server, wenmeng]}
RawContentLength  : 13

二进制文件部署

通过从
https://github.com/tickbh/wmproxy/tags
下载相应平台的二进制文件启动,此处用windows的表示即重命名成wmproxy.exe
服务端启动

wmproxy -c mapping_server.toml

客户端启动,其中连接的目标ip需进行修改

wmproxy -c mapping_client.toml

我们将会得到相同的体验。

小结

此章中讲述了内网穿透如何部署及客户端访问到服务端内网的资源,这两种场景中相对常见,通过客户端与服务端的加密通讯,网络传中的被嗅探的可能将进一步减少。

点击
[关注]

[在看]

[点赞]
是对作者最大的支持

1. 背景

本qiang~这段时间调研了LLM上下文扩展的问题,并且实打实的运行了几个开源的项目,所谓实践与理论相结合嘛!

此文是本qiang~
针对上下文扩展问题的总结,包括解决方案的整理概括,文中参考了多篇有意义的文章
,他山之石可以攻玉。

大语言模型的扩展有诸多意义,如进行更长的会话、总结更长的文档等。

2. 上下文扩展方案

2.1 位置插值

位置插值(Position Interpolation)是Meta研究人员在去年发布的论文《EXTENDING CONTEXT WINDOW OF LARGE LANGUAGE MODELS VIA POSITION INTERPOLATION》提出的方案,基线模型为LLAMA,LLAMA采用的位置编码是苏神提出的ROPE(如果苏神的文章理论不清楚,推荐拜读下FireFly作者的《图解RoPE旋转位置编码及其特性》,连接在文末),但ROPE的外推性效果不佳,位置插值则做了进一步的改进优化。

位置插值的原理图如下:

原理其实很简单,
通过线性降低输入位置索引以匹配原始上下文窗口大小,然后通过少量微调工作,然后将LLaMA 7B和65B模型初始的2048扩展到32768,效率和效果均有保障

位置插值的代码可以参考transformers中LlamaLinearScalingRotaryEmbedding方法,该防范继承了ROPE的基础类LlamaRotaryEmbedding,改动之处仅在于图中标红之处。

2.2 LongLoRA

LongLoRA是港中文大学和MIT联合发出的论文《LONGLORA:EFFICIENT FINE-TUNING OF LONGCONTEXT LARGE LANGUAGE MODELS》提出的方法,本论文的主要改进之处在于:

1. 基于位置插值方法,在上下文扩展任务中引入LoRA方法,降低对硬件资源的专需。

2. 提出了shift short attention,将attention的直接计算改进为分组计算,且保障相邻组间信息共享。

3. 将norm层及embed层也加入到微调训练中,该部分的参数占比相对较少。

LoRA大家应该很熟悉,下面将重点介绍shift short attention。原理图如下:

(1) 首先将head维度的特征拆分为2块

(2) 然后将其中一组的特征被移动,移动大小为group size的一半

(3) 将tokens拆分成组,且reshape为batch维,然后attention计算

(4) 最后将计算后的结果进行还原。

shift short attention的伪代码如下,具体代码可以参考LongLoRA的github仓库:

2.3 LongQLoRA

LongQLoRA的论文是《LONGQLORA: EFFICIENT AND EFFECTIVE METHOD TO EXTEND CONTEXT LENGTH OF LARGE LANGUAGE MODELS》,主要的思想就是在LongLoRA的基础上引入了量化操作,进一步降低了显卡需求。(Ps: 其实LongLoRA项目本身也集成了量化微调)

LongQLoRA仅在一张32G的V100上,可以将LLaMA2的7B和13B从4096扩展到8192甚至12K,仅需要1000步微调即可。

LongQLoRA本身也是基于transformers架构,因此引入量化配置仅需要些许改动即可,具体如下:

3. 总结

一句话足矣~

本文主要展示了LLM长文本扩展的方法,包括位置插值、LongLoRA、LongQLoRA等论文的简单概述。

此外,所有的论文最好能够结合源码进行开展,目前本qiang~就在践行这一条路线,欢迎大家一块交流。

4. 参考

(1) ROPE原理: https://spaces.ac.cn/archives/8265

(2) 图解ROPE: https://mp.weixin.qq.com/s/-1xVXjoM0imXMC7DKqo-Gw

(3) 位置插值论文: https://arxiv.org/pdf/2306.15595v2.pdf

(4) LongLoRA论文: https://arxiv.org/pdf/2309.12307v2.pdf

(5) LongLoRA代码:https://github.com/dvlab-research/longlora

(6) LongQLoRA论文:https://arxiv.org/pdf/2311.04879v2.pdf

(7) LongQLoRA代码:https://github.com/yangjianxin1/longqlora

我的博客里有一个devops页面,专门用来汇总我写过的一些DevOps运维自动化相关的技术文章,页面很简单,就是一段文字描述加上一堆的文章链接,像下面这个样子

一直以来这个页面都安安静静的存在着,访问者甚少,像是一个默默无闻没人关注的孩子,躲在角落里,偶尔有人来看上两眼,也会因为他的丑陋而离开,并没有深入的去了解一下他,殊不知他十分的老道和博学,能够提供非常有用的灵感及思路,那些耐心看过他的人都得到了可观的回报。酒香也怕巷子深,好货还需会吆喝。虽然他很丑,但他毕竟也是我的孩子,我倾注了热血,我希望他也能得到大家的关注,我一直在想我该怎么帮帮他

帮他换一套皮肤,帮他丰富下内涵,让他看起来更好看,用起来也更好用吧。有了这样的想法我就在想这个孩子该如何改变,这个页面该如何改版。DevOps相关的文章大都是在介绍一系列的运维自动化系统,那能不能把系统架构画出来,页面上展示架构,架构里的每个模块都可以点击,点击之后跳转到相应模块都介绍页面去,就与我看到的DevOps工具元素周期表类似

然而架构较为抽象和复杂,理解起来十分困难不够直观,直观?或许我可以从直观入手,什么最直观?那必须是系统本身最直观,可以点击查看,有很好的交互体验。我不能直接把项目代码给放出来,那是否可以做个类似的系统界面,有交互能点击,点了页面上的某个位置或模块就能查看这个模块相关的文档,这个体验就很棒了吧,说干就干

很快就用了CoreUI纯前端写了这个
DevOps文章汇总
的页面,实现了想要的效果,能点击有交互,沉浸式体验,很直观,同时还适配了移动端,毕竟我们的系统也是全面兼容手机使用的,能够做到7*24小时不间断移动运维

这个页面就与我们的系统页面布局相同,使用体验类似,点击左上角应用名称后边的切换按钮可以呼出全部应用,想要深入了解哪个应用直接点击对应应用即可,目前已上线,赶紧来体验:
https://blog.ops-coffee.cn/devops