2024年7月


文本转换为向量有多种方式:

方法一:
通过模型服务灵积DashScope将文本转换为向量
(推荐)

方法二:
通过ModelScope魔搭社区中的文本向量开源模型将文本转换为向量

方法三:
通过Jina Embeddings v2模型将文本转换为向量

方法四:
通过百川智能向量化模型将文本转换为向量


本文介绍方法三:如何通过
Jina Embeddings v2模型

文本转换为向量
,并入库至向量检索服务DashVector中进行向量检索。

前提条件


Jina Embeddings v2模型

简介

Jina Embeddings v2模型,唯一支持 8192 个词元长度的开源向量模型,在大规模文本向量化基准 (MTEB) 的功能和性能方面与 OpenAI 的闭源模型 text-embedding-ada-002 相当。

模型名称

向量维度

度量方式

向量数据类型

备注

jina-embeddings-v2-small-en

512

Cosine

Float32

  • 词元长度限制:8192

jina-embeddings-v2-base-en

768

Cosine

Float32

  • 词元长度限制:8192

jina-embeddings-v2-base-zh

768

Cosine

Float32

  • 词元长度限制:8192

说明

关于Jina Embeddings v2模型更多信息请参考:
Jina Embeddings v2模型

使用示例

说明

需要进行如下替换代码才能正常运行:

  1. DashVector api-key替换示例中的{your-dashvector-api-key}

  2. DashVector Cluster Endpoint替换示例中的{your-dashvector-cluster-endpoint}

  3. Jina AI api-key替换示例中的{your-jina-api-key}

Python示例:

from dashvector import Client
import requests
from typing import List


# 调用Jina Embeddings v2模型,将文本embedding为向量
def generate_embeddings(texts: List[str]):
    headers = {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer {your-jina-api-key}'
    }
    data = {'input': texts, 'model': 'jina-embeddings-v2-base-zh'}
    response = requests.post('https://api.jina.ai/v1/embeddings', headers=headers, json=data)
    return [record["embedding"] for record in response.json()["data"]]
    

# 创建DashVector Client
client = Client(
    api_key='{your-dashvector-api-key}',
    endpoint='{your-dashvector-cluster-endpoint}'
)

# 创建DashVector Collection
rsp = client.create('jina-text-embedding', 768)
assert rsp
collection = client.get('jina-text-embedding')
assert collection

# 向量入库DashVector
collection.insert(
    ('ID1', generate_embeddings(['阿里云向量检索服务DashVector是性能、性价比具佳的向量数据库之一'])[0])
)

# 向量检索
docs = collection.query(
    generate_embeddings(['The best vector database'])[0]
)
print(docs)

随着主机、磁盘、网络等技术的发展,对于承载大量数据存储的服务器来说,服务器内置存储空间,或者说内置磁盘往往不足以满足存储需要。因此,在内置存储之外,服务器需要采用外置存储的方式扩展存储空间,今天在这里我们分析一下当前主流的存储架构。

一、DAS

Direct Attached Storage
,直接连接存储(
直连式存储
),最常见的一种存储方式。
意思是存储设备只与一台主机服务器连接,如PC中的磁盘或只有一个外部SCSI接口的JBOD(Just a Band of Disks可以简单理解成磁盘箱)都属于DAS架构。
存储设备与服务器主机之间的通常采用SCSI总线连接

特点:简单、集中、易用,主要在中小企业应用中。

二、SAN

1、SAN

Storage Area Network,存储区域网络

SAN的兴起源于上个世纪80年代FC协议的出现,FC是Fibre Channel的缩写,网状通道的意思

前面我们已经得知DAS是通过SCSI接口总线,而SCSI接口有16个节点的限制,不可能接入很多的磁盘。SCSI并行总线结构,传输距离短,是一种宽而短的电缆结构。

细长的串行的FC是一种可寻址容量大、稳定性强、速度快(1Gbps~8Gbps,现在成熟的技术已经达到上百G)、传输距离远的网络结构
,所以最终替代了SCSI接口和总线,但是SCSI协议或者说SCSI语言仍然载于FC进行传输。
而且FC不仅替代了磁盘阵列前端接口,也替代了后端接口,从而使磁盘阵列真正处于网络之中。
到后来,2001年又提出了SAS传输网络,Serial Attached SCSI,串行SCSI,所以FC协议也属于串行SCSI。所以SAS和FC协议一样跨越OSI七个层次。
紧接着出现了SAS盘,SAS盘接口和SATA盘接口是相同的,SAS协议通过STP(SATA Tuneling Protocol)来兼容SATA协议。而FC自身则无法做到,需要通过一个SCSI-ATA协议转换器。
SAS和FC在磁盘阵列设计中有配合使用,也有单一色的协议。
总之,都是
替换了原来的并行SCSI通路技术,将一个个磁盘作为网络上的节点,即彻底变成网络化存储系统了。
这样自然
阐述了”Storage Area Network,存储区域网络“的概念

2、IP SAN

IP SAN是随着TCP/IP协议和局域网LAN技术而兴起的,
SCSI语言可以通过Internet来传递
,SCSI协议运行在TCP/IP之上,即
ISCSI,Internet Small Computer System Interface

ISCSI发起方叫作Initiator,被连接方叫作Target。一般来说Initiator端为主机设备,Target端为提供存储空间的设备,如磁盘阵列。
ISCSI标准发布于2004年
,RFC3720。
人们把这种
ISCSI为代表的TCP/IP作为传输方式的网络存储系统称作IP SAN,基于IP的存储区域网络

这样IP SAN相对FC SAN取得了不少的
优势:可扩展性和低成本
。所以
FC凭借其速度优势占据高端市场,而IP则以低成本优势占据中低端市场
。当然FC和IP协议之间也存在融合,形成了FCIP和IFCP的模式。
因IP SAN是在SAN后产生的,所以SAN默认指FC SAN。
iSCSI SAN是通过iSCSI协议连接的。一般来说SAN可以分为Fc SAN 和IP SAN, 二者的区别在于一个是通过光纤网络连接至SAN, 一个是通过IP网络连接至SAN。iSCSI协议是通过IP协议实现的。因此也可以说iSCSI SAN是IP SAN的一种。像FCIP(FC over IP)等技术也可以划入IP SAN的范畴

三、NAS

Network Attached Storage,网络附加存储

NAS是随着网络文件系统的出现而出现的,网络文件系统也是OS中的一种文件系统。
微软
的叫做
CIFS,Common Internet File Syetem,意思是Internet范围的文件系统

Linux和UNIX
系统使用的
NFS,Network File System,网络文件系统
。两者都是应用层协议,都是基于TCP/IP协议进行的传输。但是,CIFS开销非常大,远大于NFS!
这种文件系统逻辑不是在本地运行,而是在网络上的其他节点运行,调用远程的文件系统模块,即远程式调用文件系统,Remote Procedure Call File System,简称RPC FS。
人们把这种
带有集中式文件系统功能的磁盘阵列,叫做NAS
。所以相对于SAN来说,不仅是磁盘或卷在远程节点上,连文件系统功能也搬运到了远程节点上。
NAS产品是真正即插即用的产品
。NAS设备一般支持多计算机平台,用户通过网络支持协议可进入相同的文档,因而NAS设备无需改造即可用于混合Unix/Windows NT局域网内,同时NAS的应用非常灵活。

NAS有一个关键性问题,即备份过程中的带宽消耗
。就是说LAN除了必须处理正常的最终用户传输流外,还必须处理包括备份操作的存储磁盘请求。
最后,从广义上说,各种存储架构都可以称为”SAN“,因为就算是直接连到主板上的IDE通道也可以连接两个磁盘,也可以认为是一个2节点网络。而且NAS和SAN也可以融合,NAS也可以看成是SAN的一种分支架构。

四、三种架构分析和应用

1、存储分类和结构

1230546-20200408162504587-686068344.png
上图展示了存储分类,以及其发展过程:从上到下(更详细一点应该是:FC SAN到NAS,再到IP SAN)、从左到右。
封闭系统主要指大型机。
开放系统指基于包括Windows、UNIX、Linux等操作系统的服务器。由于目前绝大部分用户采用的是开放系统,其外挂存储占有目前磁盘存储市场的70%以上。
如果把数据比喻成仓库,
三大架构经历了从DAS供自己使用,到SAN出租仓库给其他用户使用,再到NAS集中式理货服务外包的过程
。如果最后扩展到
分布式存储
,则可以比喻成
物流系统

而下图则展示了三者的结构图:
1230546-20200408162519254-1262826073.png
DAS:通过SCSI总线或者前端FC协议后端SCSI总线访问数据,采用SCSI接口。
SAN:通过专用光纤通道交换机或者TCP/IP协议访问数据,采用FC-AL接口、以太网。
NAS:用户通过TCP/IP协议访问数据,采用网络文件系统NFS、CIFS实现共享。

2、SAN和NAS区别

(1)
最主要的区别:SAN是一个网络上的磁盘,NAS是一个网络上的文件系统;SAN基于磁盘级别
的存储系统
,NAS基于文件级别
的存储系统。
(2)
SAN
是将目光集中在磁盘、磁带以及联接它们的
可靠的基础结构

NAS
是将目光集中在应用、用户和文件以及它们
共享的数据
上。
(3)SAN通过光纤比NAS通过以太网速度快很多;但光纤通道比以太网通道的网络更加复杂、成本更高,所以
NAS更容易部署、更低成本和更易于扩展
。而随着
IP SAN
的出现,这些缺陷也就不成为SAN的劣势。
(4)文件系统逻辑通过CPU运算和占用内存做缓存,所以NAS可以解放主机服务器上的CPU和内存资源,即
瘦服务器主机;所以NAS更适合于CPU密集型的应用环境

(5)
SAN
因其传输速度快,对于大块数据的CPU运算要求也不高,所以
适合于大块IO密集的环境

最后,随着
万兆以太网的出现和发展使得NAS和IP SAN在与FC SAN竞争时不会再逊色于传输带宽

3、三种存储架构的应用场景

DAS虽然比较古老了,但是还是很适用于那些数据量不大,对磁盘访问速度要求较高的中小企业;
NAS
多适用于
文件服务器
,用来存储
非结构化数据
,虽然
受限于以太网的速度
,但是
部署灵活,成本低

SAN
则适用于
大型应用或数据库系统
,缺点是
成本高、较为复杂

Swift的闭包(Closures)是一种将功能块和上下文整合并演示在代码中的一种手段。闭包可以捕获并存储其上下文中的变量和常量。与普遍存在于其他语言的匿名函数(如Python的lambda、JavaScript的函数表达式)类似,Swift的闭包提供了强大的功能,并在很多场景中替代了函数。

闭包有三种主要形式:

  1. 全局函数:有名字但不捕获任何值。
  2. 嵌套函数:有名字并能捕获其封闭函数内的值。
  3. 闭包表达式:无名字,可以从其上下文中捕获值。

闭包表达式语法

闭包表达式语法是构建闭包的一种方式,包含一个参数列表,一个返回类型和闭包的主体:

{ (parameters) -> returnType in
statements
}

让我们一步步通过示例探讨一下闭包的使用情形。

示例:最基本的闭包

let simpleClosure = {
print("Hello, Swift Closure!")
}

simpleClosure() // 输出: "Hello, Swift Closure!"

示例:带参数和返回值的闭包

let sumClosure = { (a: Int, b: Int) -> Int in
return a + b
}

let result = sumClosure(3, 5)
print(result) // 输出: 8

示例:作为参数传递闭包

闭包经常作为参数传递给函数,比如用作回调函数。

func performOperation(a: Int, b: Int, operation: (Int, Int) -> Int) -> Int {
return operation(a, b)
}

let result = performOperation(a: 10, b: 20, operation: { (x, y) -> Int in
return x * y
})
print(result) // 输出: 200

示例:尾随闭包

如果闭包是函数的最后一个参数,那么可以使用尾随闭包的语法来书写:

func performOperation(a: Int, b: Int, operation: (Int, Int) -> Int) -> Int {
return operation(a, b)
}

let result = performOperation(a: 10, b: 20) { (x, y) -> Int in
return x * y
}
print(result) // 输出: 200

示例:闭包捕获值

闭包可以捕获并存储其上下文中的变量值,这被称为“捕获”。

func makeIncrementer(incrementAmount: Int) -> () -> Int {
var total = 0

let incrementer: () -> Int = {
total += incrementAmount
return total
}

return incrementer
}

let incrementByTwo = makeIncrementer(incrementAmount: 2)

print(incrementByTwo()) // 输出: 2
print(incrementByTwo()) // 输出: 4

示例:自动闭包

自动闭包是一种自动捕获表达式的闭包,多用于延时计算和副作用控制。

swift
复制代码
var array = [1, 2, 3]

func removeFirstElement(array: @autoclosure () -> Int?) -> Int? {
return array()
}

let firstElement = removeFirstElement(array: array.removeFirst())
print(firstElement ?? "Array is empty") // 输出: 1

示例:逃逸闭包(Escaping Closures)

当闭包在函数返回之后还被调用时,需要将其标记为
@escaping

swift
复制代码
var completionHandlers: [() -> Void] = []

func someFunctionWithEscapingClosure(completionHandler: @escaping () -> Void) {
completionHandlers.append(completionHandler)
}

someFunctionWithEscapingClosure {
print("This is an escaping closure")
}

// 代码稍后执行
completionHandlers.first?() // 输出: "This is an escaping closure"

这些示例涵盖了Swift闭包的基础和一些高级场景。

背景

Pulsar 有提供一个查询 Broker 负载的接口:

    /**
     * Get load for this broker.
     *
     * @return
     * @throws PulsarAdminException
     */
LoadManagerReport getLoadReport() throws PulsarAdminException;

public interface LoadManagerReport extends ServiceLookupData {  
  
    ResourceUsage getCpu();  
  
    ResourceUsage getMemory();  
  
    ResourceUsage getDirectMemory();  
  
    ResourceUsage getBandwidthIn();  
  
    ResourceUsage getBandwidthOut();
}

可以返回一些 broker 的负载数据,比如 CPU、内存、流量之类的数据。

我目前碰到的问题是目前会遇到部分节点的负债不平衡,导致资源占用不均衡,所以想要手动查询所有节点的负载数据,然后人工进行负载。

理论上这些数据是在运行时实时计算的数据,如果对于单机的倒还好说,每次请求这个接口直接实时计算一次就可以了。

但对于集群的服务来说会有多个节点,目前 Pulsar 提供的这个接口只能查询指定节点的负载数据,也就是说每次得传入目标节点的 IP 和端口。

所以我的预期是可以提供一个查询所有节点负载的接口,已经提了
issue
,最近准备写 Purpose 把这个需求解决了。

实现这个需求的方案有两种:

  1. 拿到所有 broker 也就是服务节点信息,依次遍历调用接口,然后自己组装信息。
  2. 从 zookeeper 中获取负载信息。

理论上第二种更好,第一种实现虽然更简单,但每次都发起一次 http 请求,多少有些浪费。

第二种方案直接从源头获取负载信息,只需要请求一次就可以了。

而正好社区提供了一个命令行工具可以直接打印所有的
broker
负载数据:

pulsar-perf monitor-brokers --connect-string <zookeeper host:port>

分布式系统常用组件

提供的命令行工具其实就是直接从 zookeeper 中查询的数据。

在分布式系统中需要一个集中的组件来管理各种数据,比如:

  1. 可以利用该组件来选举 leader 节点
  2. 使用该组件来做分布式锁
  3. 为分布式系统同步数据
  4. 统一的存放和读取某些数据

可以提供该功能的组件其实也不少:

Zookeeper 是老牌的分布式协调组件,可以做 leader 选举、配置中心、分布式锁、服务注册与发现等功能。

在许多中间件和系统中都有应用,比如:

  • Apache Pulsar
    中作为协调中心
  • Kafka
    中也有类似的作用。

  • Dubbo
    中作为服务注册发现组件。


etcd 的功能与 zookeeper 类似,可以用作服务注册发现,也可以作为 Key Value 键值对存储系统;在 kubernetes 中扮演了巨大作用,经历了各种考验,稳定性已经非常可靠了。


Oxia
则是 StreamNative 开发的一个用于替换 Zookeeper 的中间件,功能也与 Zookeeper 类似;目前已经可以在 Pulsar 中替换 Zookeeper,只是还没有大规模的使用。

Pulsar 中的应用

下面以 Pulsar 为例(使用 zookeeper),看看在这类大型分布式系统中是如何处理负载均衡的。

再开始之前先明确下负载均衡大体上会做哪些事情。

  1. 首先上报自己节点的负载数据
  2. Leader 节点需要定时收集所有节点的负载数据。
    1. 这些负载数据中包括:
      1. CPU
        、堆内存、堆外内存等通用数据的使用量
      2. 流出、流入流量
      3. 一些系统特有的数据,比如在
        Pulsar
        中就是:
        1. 每个
          broker
          中的
          topic

          consumer

          producer

          bundle
          等数据。
  3. 再由 leader 节点读取到这些数据后选择负载较高的节点,将数据迁移到负载较低的节点。

以上就是一个完整的负载均衡的流程,下面我们依次看看在
Pulsar
中是如何实现这些逻辑的。

在 Pulsar 中提供了多种负载均衡策略,以下是加载负载均衡器的逻辑:

static LoadManager create(final PulsarService pulsar) {  
    try {  
        final ServiceConfiguration conf = pulsar.getConfiguration();  
        // Assume there is a constructor with one argument of PulsarService.  
        final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(),  
                Thread.currentThread().getContextClassLoader());  
        if (loadManagerInstance instanceof LoadManager) {  
            final LoadManager casted = (LoadManager) loadManagerInstance;  
            casted.initialize(pulsar);  
            return casted;  
        } else if (loadManagerInstance instanceof ModularLoadManager) {  
            final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);  
            casted.initialize(pulsar);  
            return casted;  
        }  
    } catch (Exception e) {  
        LOG.warn("Error when trying to create load manager: ", e);  
    }  
    // If we failed to create a load manager, default to SimpleLoadManagerImpl.  
    return new SimpleLoadManagerImpl(pulsar);  
}

默认使用的是
ModularLoadManagerImpl
, 如果出现异常那就会使用
SimpleLoadManagerImpl
作为兜底。

他们两个的区别是
ModularLoadManagerImpl
的功能更全,可以做更为细致的负载策略。

接下来以默认的
ModularLoadManagerImpl
为例讲解上述的流程。

上报负载数据

在负载均衡器启动的时候就会收集节点数据然后进行上报:

      public void start() throws PulsarServerException {
        try {

            String brokerId = pulsar.getBrokerId();
            brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
            // 收集本地负载数据
            updateLocalBrokerData();

			// 上报 zookeeper
            brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
        } catch (Exception e) {
            log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e);
            throw new PulsarServerException(e);
        }
    }

首先获取到当前 broker 的 Id 然后拼接一个 zookeeper 节点的路径,将生成的 localData 上传到 zookeeper 中。

// 存放 broker 的节点信息
ls /loadbalance/brokers

[broker-1:8080, broker-2:8080]

// 根据节点信息查询负载数据
get /loadbalance/brokers/broker-1:8080

上报的数据:

{"webServiceUrl":"http://broker-1:8080","pulsarServiceUrl":"pulsar://broker-1:6650","persistentTopicsEnabled":true,"nonPersistentTopicsEnabled":true,"cpu":{"usage":7.311714728372232,"limit":800.0},"memory":{"usage":124.0,"limit":2096.0},"directMemory":{"usage":36.0,"limit":256.0},"bandwidthIn":{"usage":0.8324254085661579,"limit":1.0E7},"bandwidthOut":{"usage":0.7155446715644209,"limit":1.0E7},"msgThroughputIn":0.0,"msgThroughputOut":0.0,"msgRateIn":0.0,"msgRateOut":0.0,"lastUpdate":1690979816792,"lastStats":{"my-tenant/my-namespace/0x4ccccccb_0x66666664":{"msgRateIn":0.0,"msgThroughputIn":0.0,"msgRateOut":0.0,"msgThroughputOut":0.0,"consumerCount":2,"producerCount":0,"topics":1,"cacheSize":0}},"numTopics":1,"numBundles":1,"numConsumers":2,"numProducers":0,"bundles":["my-tenant/my-namespace/0x4ccccccb_0x66666664"],"lastBundleGains":[],"lastBundleLosses":[],"brokerVersionString":"3.1.0-SNAPSHOT","protocols":{},"advertisedListeners":{"internal":{"brokerServiceUrl":"pulsar://broker-1:6650"}},"loadManagerClassName":"org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl","startTimestamp":1690940955211,"maxResourceUsage":0.140625,"loadReportType":"LocalBrokerData"}

采集数据

public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {  
    SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();  
  
    // Override System memory usage and limit with JVM heap usage and limit  
    double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();  
    double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();  
    double memoryUsage = memoryUsageInBytes / MIBI;  
    double memoryLimit = maxHeapMemoryInBytes / MIBI;  
    systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));  
  
    // Collect JVM direct memory  
    systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),  
            (double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));  
  
    return systemResourceUsage;  
}

会在运行时获取一些 JVM 和 堆外内存的数据。

收集所有节点数据

作为
leader
节点还需要收集所有节点的负载数据,然后根据一些规则选择将负载较高的节点移动到负债较低的节点中。

    private void updateAllBrokerData() {
	    // 从 zookeeper 中获取所有节点
        final Set<String> activeBrokers = getAvailableBrokers();
        final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
                // 依次读取各个节点的负载数据
                Optional<LocalBrokerData> localData = brokersData.readLock(key).get();
                if (!localData.isPresent()) {
                    brokerDataMap.remove(broker);
                    log.info("[{}] Broker load report is not present", broker);
                    continue;
                }

                if (brokerDataMap.containsKey(broker)) {
                    // Replace previous local broker data.
                    brokerDataMap.get(broker).setLocalData(localData.get());
                } else {
                    // Initialize BrokerData object for previously unseen
                    // brokers.
                    // 将数据写入到本地缓存
                    brokerDataMap.put(broker, new BrokerData(localData.get()));
                }
            } catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
            }
        }
        // Remove obsolete brokers.
        for (final String broker : brokerDataMap.keySet()) {
            if (!activeBrokers.contains(broker)) {
                brokerDataMap.remove(broker);
            }
        }
    }

会从 zookeeper 的节点中获取到所有的 broker 列表(broker 会在启动时将自身的信息注册到 zookeeper 中。)

然后依次读取各自节点的负载数据,也就是在负载均衡器启动的时候上报的数据。

筛选出所有 broker 中需要 unload 的 bundle

在 Pulsar 中 topic 是最核心的概念,而为了方便管理大量 topic,提出了一个 Bundle 的概念; Bundle 是一批 topic 的集合,管理 Bundle 自然会比 topic 更佳容易。

所以在 Pulsar 中做负载均衡最主要的就是将负载较高节点中的 bundle 转移到低负载的 broker 中。

    private void updateAllBrokerData() {
        final Set<String> activeBrokers = getAvailableBrokers();
        final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
                Optional<LocalBrokerData> localData = brokersData.readLock(key).get();
                if (!localData.isPresent()) {
                    brokerDataMap.remove(broker);
                    log.info("[{}] Broker load report is not present", broker);
                    continue;
                }

                if (brokerDataMap.containsKey(broker)) {
                    // Replace previous local broker data.
                    brokerDataMap.get(broker).setLocalData(localData.get());
                } else {
                    // Initialize BrokerData object for previously unseen
                    // brokers.
                    brokerDataMap.put(broker, new BrokerData(localData.get()));
                }
            } catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
            }
        }
        // Remove obsolete brokers.
        for (final String broker : brokerDataMap.keySet()) {
            if (!activeBrokers.contains(broker)) {
                brokerDataMap.remove(broker);
            }
        }
    }

负载均衡器在启动的时候就会查询所有节点的数据,然后写入到
brokerDataMap
中。


同时也会注册相关的 zookeeper 事件,当注册的节点发生变化时(一般是新增或者删减了 broker 节点)就会更新内存中缓存的负载数据。

之后 leader 节点会定期调用
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl#doLoadShedding
函数查询哪些数据需要卸载,然后进行重新负载。

final Multimap<String, String> bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf);

最核心的就是调用这个
findBundlesForUnloading
函数,会返回需要卸载 bundle 集合,最终会遍历这个集合调用 admin API 进行卸载和重平衡。

而这个函数会有多种实现,本质上就是根据传入的各个节点的负载数据,然后根据自定义的规则返回一批需要卸载的数据。

以默认的
org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
规则为例:


它是根据带宽、内存、流量等各个指标的权重算出每个节点的负载值,之后为整个集群计算出一个平均负载值。

以上图为例:超过
ShedBundles
的数据就需要被卸载掉,然后转移到低负载的节点中。

所以最左边节点和超出的 bundle 部分就需要被返回。

具体的计算逻辑如下:

    private void filterAndSelectBundle(LoadData loadData, Map<String, Long> recentlyUnloadedBundles, String broker,
                                       LocalBrokerData localData, double minimumThroughputToOffload) {
        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
        MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
        loadData.getBundleDataForLoadShedding().entrySet().stream()
                .map((e) -> {
                    String bundle = e.getKey();
                    BundleData bundleData = e.getValue();
                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    return Pair.of(bundle, throughput);
                }).filter(e ->
                        !recentlyUnloadedBundles.containsKey(e.getLeft())
                ).filter(e ->
                        localData.getBundles().contains(e.getLeft())
                ).sorted((e1, e2) ->
                        Double.compare(e2.getRight(), e1.getRight())
                ).forEach(e -> {
                    if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                            || atLeastOneBundleSelected.isFalse()) {
                        selectedBundlesCache.put(broker, e.getLeft());
                        trafficMarkedToOffload.add(e.getRight());
                        atLeastOneBundleSelected.setTrue();
                    }
                });
    }

从代码里看的出来就是在一个备选集合中根据各种阈值和判断条件筛选出需要卸载的 bundle。



SimpleLoadManagerImpl
的实现如下:

synchronized (currentLoadReports) {
	for (Map.Entry<ResourceUnit, LoadReport> entry : currentLoadReports.entrySet()) {
		ResourceUnit overloadedRU = entry.getKey();
		LoadReport lr = entry.getValue();
		// 所有数据做一个简单的筛选,超过阈值的数据需要被 unload
		if (isAboveLoadLevel(lr.getSystemResourceUsage(), overloadThreshold)) {
			ResourceType bottleneckResourceType = lr.getBottleneckResourceType();
			Map<String, NamespaceBundleStats> bundleStats = lr.getSortedBundleStats(bottleneckResourceType);
			if (bundleStats == null) {
				log.warn("Null bundle stats for bundle {}", lr.getName());
				continue;

			}

就是很简单的通过将判断节点的负载是否超过了阈值
isAboveLoadLevel
,然后做一个简单的排序就返回了。

从这里也看得出来
SimpleLoadManagerImpl

ModularLoadManager
的区别,
SimpleLoadManagerImpl
更简单,并没有提供多个
doLoadShedding
的筛选实现。

总结

总的来说对于无状态的服务来说,理论上我们只需要做好负载算法即可(轮训、一致性哈希、低负载优先等)就可以很好的平衡各个节点之间的负载。

而对于有状态的服务来说,负载均衡就是将负载较高节点中的数据转移到负载低的节点中。

其中的关键就是需要存储各个节点的负载数据(业界常用的是存储到 zookeeper 中),然后再由一个 leader 节点从这些节点中根据某种负载算法选择出负载较高的节点以及负载较低的节点,最终把数据迁移过去即可。

需求场景

按着惯例,还是以一个应用场景作为代理模式的切入点。现在有一个订单系统,要求是:一旦订单被创建,只有订单的创建人才可以修改订单中的数据,其他人则不能修改。

基本实现思路

按着最直白的思路,就是查询数据库中订单的创建人和当前Session中的登录账号ID是否一致。

classOrder {privateString orderId;private String creatorId; //订单创建者的ID
    private String details; //订单详情//省略其他属性和getter/setter方法
 
    publicOrder(String orderId, String creatorId, String details) {this.orderId =orderId;this.creatorId =creatorId;this.details =details;
}
//其他业务逻辑... }

系统修改

classOrderService {private Map<String, Order> orders = new HashMap<>();//创建订单
    public voidcreateOrder(String orderId, String creatorId, String details) {
Order order
= newOrder(orderId, creatorId, details);
orders.put(orderId, order);
}
//修改订单 public voidmodifyOrder(String orderId, String userId, String newDetails) {
Order order
=orders.get(orderId);if (order != null) {//检查是否拥有权限 if(order.getCreateId().equals(userId)){
order.setDetails(newDetails);
}
else{
System.out.println(
"权限不足.");
}
}
else{
System.out.println(
"订单不存在.");
}
}
//其他业务逻辑... }

该思路的问题

上述代码其实本身是没有问题的,也是Web贫血模式的常见实现思路,即在Service中通过大量的if else进行完成,如果非说问题的话,就是随着对于订单的操作越多Service代码会越发膨胀,例如,需求一开始是只要求改描述,下次又要求更改名称,下下次对于权限又细分等等,Service的modifyOrder就会增加很多的if else和set方法 ,扩展和维护十分的不优雅。或许下面的代理模式能提供一些能够优雅解决的新思路。

代理模式

代理模式的核心定义是:为其他对象提供一种代理以此来控制对这个对象的访问。代理模式是以对象组合的方式对对象进行保护或者说功能扩展的一种方式。

代理模式结构

Sunject
:目标接口,定义目标对象的具体操作。

Proxy
:代理对象,实现与具体的目标对象一样的接口,这样就可以代理具体的目标对象。保存一个指向具体目标对象的引用,可以再需要的时候调用具体的目标对象,调用目标对象时进行控制和保护。

RealSubject
:具体的目标对象,真正实现目标接口要求的功能

//定义真实主题角色接口
interfaceImage {voiddisplay();
}
//实现真实主题角色 class RealImage implementsImage {privateString fileName;publicRealImage(String fileName) {this.fileName =fileName;
loadFromDisk(fileName);
}

@Override
public voiddisplay() {
System.out.println(
"Displaying " +fileName);
}
//模拟从磁盘加载图片资源 private voidloadFromDisk(String fileName) {
System.out.println(
"Loading " + fileName + " from disk.");
}
}
//定义代理主题角色 interface Proxy extendsImage {voiddisplay();
}
//实现代理主题角色 class ProxyImage implementsProxy {privateRealImage realImage;publicProxyImage(String fileName) {//延迟加载RealImage对象 this.realImage = null;
}

@Override
public voiddisplay() {if (realImage == null) {
realImage
= new RealImage("image.png");
}
realImage.display();
}
}
public classProxyPatternDemo {public static voidmain(String[] args) {
Proxy proxy
= new ProxyImage("image.png");
proxy.display();
}
}

代理模式实现案例

相当于现在如果有了一个订单对象实例,那么就需要控制外部对它的访问,满足条件的可以访问,不满足条件的就不能访问。使用代理模式来实现就是需要在Order对象之外再包一层对象,用于操作权限控制。本质上是一种保护代理思路。

首先创建一个订单的操作接口

public interfaceOrderApi {
String getId();
String getName();
String getDetails();
String getCreatorId();
voidsetId(String id);voidsetDetails(String details);voidsetName(String name);voidsetCreatorId(String creatorId);
}

一个基本的订单实体类作为目标代理对象

class Order implementsOrderApi {privateString id;privateString name;privateString details;privateString creatorId;publicOrder(String id, String name, String details, String creatorId) {this.id =id;this.name =name;this.details =details;this.creatorId =creatorId;
}

@Override
publicString getId() {returnid;
}

@Override
publicString getName() {returnname;
}

@Override
publicString getDetails() {returndetails;
}

@Override
publicString getCreatorId() {returncreatorId;
}

@Override
public voidsetId(String id) {this.id =id;
}

@Override
public voidsetName(String name) {this.name =name;
}

@Override
public voidsetCreatorId(String creatorId) {this.creatorId =creatorId;
}

@Override
public voidsetDetails(String details) {this.details=details;
}
}

实现一个代理对象

class OrderProxy implementsOrderApi {privateOrder order;publicOrderProxy(Order order) {this.order =order;
}

@Override
publicString getId() {returnorder.getIdO();
}

@Override
publicString getName() {returnorder.getNameO();
}

@Override
publicString getDetails() {returnorder.getDetailsO();
}

@Override
publicString getCreatorId() {returnorder.getCreatorIdO();
}

@Override
public voidsetId(String id) {//在这里添加权限检查逻辑 if(isCreator()) {
order.setId(id);
}
else{throw new SecurityException("Only the creator can change the order ID.");
}
}

@Override
public voidsetName(String name) {//在这里添加权限检查逻辑 if(isCreator()) {
order.setName(name);
}
else{throw new SecurityException("Only the creator can change the order name.");
}
}

@Override
public voidsetCreatorId(String creatorId) {//创建者ID通常不允许更改 throw new UnsupportedOperationException("Changing creator ID is not allowed.");
}
private booleanisCreator(String userId) {//这里应该添加检查userId是否是订单的创建者//为了示例简单,这里假设userId总是传入正确的,返回true return true;
}
}

代理模式的理解

特点与分类

代理模式在客户和被客户访问的对象之间,引入了一定程度的间接性,客户是直接使用代理,让代理来与被访问的对象进行交互。不同的代理类型,这种附加的间接性有不同的用途,也就具有不同的特点。

  • 远程代理:隐藏了一个对象存在于不同的地址空间的事实,也即是客户通过远程代理去访问一个对象,根本就不关心这个对象在哪里,也不关心如何通过网络去访问到这个对象。从客户的角度来讲,它只是在使用代理对象而已。
  • 虚代理:可以根据需要来创建“大”对象,只有到必须创建对象的时候,虚代理才会创建对象,从而大大加快程序运行速度,并节省资源。通过虚代理可以对系统进行优化。
  • 保护代理:可以在访问一个对象的前后,执行很多附加的操作,除了进行权限控制之外,还可以进行很多跟业务相关的处理,而不需要修改被代理的对象。也就是说,可以通过代理来给目标对象增加功能。
  • 智能指引:和保护代理类似,也是允许在访问一个对象的前后,执行很多附加的操作,这样一来就可以做很多额外的事情,比如,引用计数等。

在这些代理类型中,最常见的是保护代理和远程代理。上述的例子就是一个典型的保护代理的实现,即具体订单的操作是不变的,如果需要对订单的操作进行特殊处理,一切变动皆集中在代理对象中,代理对象对于订单对象起到了保护隔离的作用,同时代码层面上也承载了“频繁变化”的需求内容,将“变化”隔离出来,对于后续的需求扩展也是十分有效。

建议在如下情况中选用代理模式。

  • 需要为一个对象在不同的地址空间提供局部代表的时候,可以使用远程代理。
  • 需要按照需要创建开销很大的对象的时候,可以使用虚代理。
  • 需要控制对原始对象的访问的时候,可以使用保护代理。
  • 需要在访问对象执行一些附加操作的时候,可以使用智能指引代理。

具体目标和代理间的关系

Java中代理模式的应用

Java 对代理模式提供了内建的支持,在java.lang,refect 包下面,提供了一个 Proxy的类和一个InvocationHandler 的接口。
通常把前面自己实现的代理模式称为 Java 的静态代理。这种实现方式有一个较大的缺点,就是如果Subject接口发生变化,那么代理类和具体的目标实现都要变化,不是很灵活。而使用Java内建的对代理模式支持的功能来实现则没有这个问题。
通常把使用 Java 内建的对代理模式支持的功能来实现的代理称为Java的动态代理,动态代理跟静态代理相比,明显的变化是:静态代理实现的时候,在Subject接口上定义很多的方法,代理类里面自然也要实现很多方法:而动态代理实现的时候,虽然Subicct接口上定义了很多方法,但是动态代理类始终只有一个invoke 方法。这样,当Subject接口发生变化的时候,动态代理的接口就不需要跟着变化了。
Java的动态代理目前只能代理接口,基本的实现是依靠Java的反射机制和动态生成class的技术,来动态生成被代理的接口的实现对象。具体的内部实现细节这里不去讨论。如果要实现类的代理,可以使用cglib(一个开源的Code Generation Library)。

还是来看看示例,那就修改上面保护代理的示例,看看如何使用Java的动态代理来实现同样的功能。
(1)订单接口的定义是完全一样的,就不再赘述了。
(2)订单对象的实现,只是添加了一个 toString,,以方便测试输出,这里也不去示例了。在前面的示例中,toString已实现在代理类里面了。
(3)直接看看代理类的实现,大致有如下变化。

  • 要实现InvocationHandler接口。
  • 需要提供一个方法来实现:把具体的目标对象和动态代理绑定起来,并在绑定好过后,返回被代理的目标对象的接口,以利于客户端的操作。
  • 需要实现 invoke 方法,在这个方法里面,具体判断当前是在调用什么方法,需要如何处理
import java.lang.reflect.*;/*** 使用Java中的动态代理*/
public class DynamicProxy implementsInvocationHandler {//被代理的对象
    privateOrderApi order;/*** 获取绑定好代理和具体目标对象后的目标对象的接口
*
@paramorder 具体的订单对象,相当于具体目标对象
*
@return绑定好代理和具体目标对象后的目标对象的接口*/ publicOrderApi getProxyInterface(Order order) {//设置被代理的对象,方便invoke里面的操作 this.order =order;//把真正的订单对象和动态代理关联起来 OrderApi orderApi =(OrderApi) Proxy.newProxyInstance(
order.getClass().getClassLoader(),
order.getClass().getInterfaces(),
this);returnorderApi;
}
public Object invoke(Object proxy, Method method, Object[] args) throwsThrowable {//如果是调用setter方法就需要检查权限 if (method.getName().startsWith("set")) {//假设Order类有一个getOrderUser()方法来获取订单创建者的用户ID//并且order.getOrderUser().equals(args[0])来检查是否是创建者 if (order.getOrderUser() != null && order.getOrderUser().equals(args[0])) {//可以操作 returnmethod.invoke(order, args);
}
else{//如果不是创建者,不能修改 System.out.println("对不起," + args[0] + ",您无权修改本订单中的数据");
}
}
else{//不是调用的setter方法就继续运行 returnmethod.invoke(order, args);
}
return null;
}
}

使用规则

public classClient {public static voidmain(String[] args) {//张三先登录系统创建了一个订单
        Order order = new Order("XXX", 100, "张三");//创建一个动态代理
        DynamicProxy dynamicProxy = newDynamicProxy();//然后把订单和动态代理关联起来
        OrderApi orderApi =dynamicProxy.getProxyInterface(order);//以下就需要使用被代理过的接口来操作了//李四想要来修改,那就会报错
        orderApi.setOrderNum(123, "李四");//输出order
        System.out.println("李四修改后订单记录没有变化:" +orderApi);//张三修改就不会有问题
        orderApi.setOrderNum(123, "张三");//再次输出order
        System.out.println("张三修改后,订单记录:" +orderApi);
}
}

代理在Java中的使用十分常见,例如Spring中的AOP,其本质就是代理模式