2024年1月

全面介绍CUDA与pytorch cuda实战

关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人智能实验室成员,阿里云认证的资深架构师,项目管理专业人士,上亿营收AI产品研发负责人

file

一、CUDA:定义与演进

CUDA(Compute Unified Device Architecture)是由NVIDIA开发的一个并行计算平台和应用编程接口(API)模型。它允许开发者使用NVIDIA的GPU进行高效的并行计算,从而加速计算密集型任务。在这一节中,我们将详细探讨CUDA的定义和其演进过程,重点关注其关键的技术更新和里程碑。

CUDA的定义

file
CUDA是一种允许软件开发者和软件工程师直接访问虚拟指令集和并行计算元素的平台和编程模型。它包括CUDA指令集架构(ISA)和并行计算引擎在GPU上的实现。CUDA平台是为了利用GPU的强大计算能力而设计,特别适合处理可以并行化的大规模数据计算任务。

CUDA的演进历程

CUDA的诞生

  • 2006年:CUDA的初现
    • NVIDIA在2006年发布了CUDA,这标志着GPU计算的一个重大突破。在这之前,GPU主要被用于图形渲染。

CUDA的早期版本

  • CUDA 1.0(2007年)
    • 这是CUDA的首个公开可用版本,为开发者提供了一套全新的工具和API,用于编写GPU加速程序。
  • CUDA 2.0(2008年)
    • 引入了对双精度浮点运算的支持,这对科学计算尤为重要。

CUDA的持续发展

  • CUDA 3.0(2010年)和CUDA 4.0(2011年)
    • 引入了多项改进,包括对更多GPU架构的支持和更高效的内存管理。CUDA 4.0特别强调了对多GPU系统的支持,允许更加灵活的数据共享和任务分配。

CUDA的成熟期

  • CUDA 5.0(2012年)到CUDA 8.0(2016年)
    • 这一时期CUDA的更新聚焦于提高性能、增强易用性和扩展其编程模型。引入了动态并行性,允许GPU线程自动启动新的核函数,极大地增强了程序的灵活性和并行处理能力。

CUDA的现代版本

  • CUDA 9.0(2017年)到CUDA 11.0(2020年)
    • 这些版本继续推动CUDA的性能和功能边界。加入了对最新GPU架构的支持,如Volta和Ampere架构,以及改进的编译器和更丰富的库函数。CUDA 11特别重视对大规模数据集和AI模型的支持,以及增强的异构计算能力。

每个CUDA版本的发布都是对NVIDIA在并行计算领域技术革新的体现。从早期的基础设施搭建到后来的性能优化和功能扩展,CUDA的发展历程展示了GPU计算技术的成熟和深入应用。在深度学习和高性能计算领域,CUDA已成为一个不可或缺的工具,它不断推动着计算极限的扩展。

通过对CUDA定义的理解和其演进历程的回顾,我们可以清楚地看到CUDA如何从一个初步的概念发展成为今天广泛应用的高性能计算平台。每一次更新都反映了市场需求的变化和技术的进步,使CUDA成为了处理并行计算任务的首选工具。

二、CUDA与传统CPU计算的对比

在深入理解CUDA的价值之前,将其与传统的CPU计算进行比较是非常有帮助的。这一章节旨在详细探讨GPU(由CUDA驱动)与CPU在架构、性能和应用场景上的主要差异,以及这些差异如何影响它们在不同计算任务中的表现。

架构差异

CPU:多功能性与复杂指令集

  • 设计理念

    • CPU设计注重通用性和灵活性,适合处理复杂的、串行的计算任务。
  • 核心结构

    • CPU通常包含较少的核心,但每个核心能够处理复杂任务和多任务并发。

GPU:并行性能优化

  • 设计理念

    • GPU设计重点在于处理大量的并行任务,适合执行重复且简单的操作。
  • 核心结构

    • GPU包含成百上千的小核心,每个核心专注于执行单一任务,但在并行处理大量数据时表现卓越。

性能对比

处理速度

  • CPU

    • 在执行逻辑复杂、依赖于单线程性能的任务时,CPU通常表现更优。
  • GPU

    • GPU在处理可以并行化的大规模数据时,如图像处理、科学计算,表现出远超CPU的处理速度。

能效比

  • CPU

    • 在单线程任务中,CPU提供更高的能效比。
  • GPU

    • 当任务可以并行化时,GPU在能效比上通常更有优势,尤其是在大规模计算任务中。

应用场景

CPU的优势场景

  • 复杂逻辑处理

    • 适合处理需要复杂决策树和分支预测的任务,如数据库查询、服务器应用等。
  • 单线程性能要求高的任务

    • 在需要强大单线程性能的应用中,如某些类型的游戏或应用程序。

GPU的优势场景

  • 数据并行处理

    • 在需要同时处理大量数据的场景下,如深度学习、大规模图像或视频处理。
  • 高吞吐量计算任务

    • 适用于需要高吞吐量计算的应用,如科学模拟、天气预测等。

了解CPU和GPU的这些关键差异,可以帮助开发者更好地决定何时使用CPU,何时又应转向GPU加速。在现代计算领域,结合CPU和GPU的优势,实现异构计算,已成为提高应用性能的重要策略。CUDA的出现使得原本只能由CPU处理的复杂任务现在可以借助GPU的强大并行处理能力得到加速。

总体来说,CPU与GPU(CUDA)在架构和性能上的差异决定了它们在不同计算任务中的适用性。CPU更适合处理复杂的、依赖于单线程性能的任务,而GPU则在处理大量并行数据时表现出色。

三、CUDA在深度学习中的应用

深度学习的迅速发展与CUDA技术的应用密不可分。这一章节将探讨为什么CUDA特别适合于深度学习应用,以及它在此领域中的主要应用场景。

CUDA与深度学习:为何完美契合

并行处理能力

  • 数据并行性

    • 深度学习模型,特别是神经网络,需要处理大量数据。CUDA提供的并行处理能力使得这些计算可以同时进行,大幅提高效率。
  • 矩阵运算加速

    • 神经网络的训练涉及大量的矩阵运算(如矩阵乘法)。GPU的并行架构非常适合这种类型的计算。

高吞吐量

  • 快速处理大型数据集

    • 在深度学习中处理大型数据集时,GPU能够提供远高于CPU的吞吐量,加快模型训练和推理过程。

动态资源分配

  • 灵活的资源管理

    • CUDA允许动态分配和管理GPU资源,使得深度学习模型训练更为高效。

深度学习中的CUDA应用场景

模型训练

  • 加速训练过程

    • 在训练阶段,CUDA可以显著减少模型对数据的训练时间,尤其是在大规模神经网络和复杂数据集的情况下。
  • 支持大型模型

    • CUDA使得训练大型模型成为可能,因为它能够有效处理和存储巨大的网络权重和数据集。

模型推理

  • 实时数据处理

    • 在推理阶段,CUDA加速了数据的处理速度,使得模型能够快速响应,适用于需要实时反馈的应用,如自动驾驶车辆的视觉系统。
  • 高效资源利用

    • 在边缘计算设备上,CUDA可以提供高效的计算,使得在资源受限的环境下进行复杂的深度学习推理成为可能。

数据预处理

  • 加速数据加载和转换

    • 在准备训练数据时,CUDA可以用于快速加载和转换大量的输入数据,如图像或视频内容的预处理。

研究与开发

  • 实验和原型快速迭代

    • CUDA的高效计算能力使研究人员和开发者能够快速测试新的模型架构和训练策略,加速研究和产品开发的进程。

CUDA在深度学习中的应用不仅加速了模型的训练和推理过程,而且推动了整个领域的发展。它使得更复杂、更精确的模型成为可能,同时降低了处理大规模数据集所需的时间和资源。此外,CUDA的普及也促进了深度学习技术的民主化,使得更多的研究者和开发者能够访问到高效的计算资源。

总的来说,CUDA在深度学习中的应用极大地加速了模型的训练和推理过程,使得处理复杂和大规模数据集成为可能。

四、CUDA编程实例

在本章中,我们将通过一个具体的CUDA编程实例来展示如何在PyTorch环境中利用CUDA进行高效的并行计算。这个实例将聚焦于深度学习中的一个常见任务:矩阵乘法。我们将展示如何使用PyTorch和CUDA来加速这一计算密集型操作,并提供深入的技术洞见和细节。

选择矩阵乘法作为示例

矩阵乘法是深度学习和科学计算中常见的计算任务,它非常适合并行化处理。在GPU上执行矩阵乘法可以显著加速计算过程,是理解CUDA加速的理想案例。

环境准备

在开始之前,确保你的环境中安装了PyTorch,并且支持CUDA。你可以通过以下命令进行检查:

import torch
print(torch.__version__)
print('CUDA available:', torch.cuda.is_available())

这段代码会输出PyTorch的版本并检查CUDA是否可用。

示例:加速矩阵乘法

以下是一个使用PyTorch进行矩阵乘法的示例,我们将比较CPU和GPU(CUDA)上的执行时间。

准备数据

首先,我们创建两个大型随机矩阵:

import torch
import time

# 确保CUDA可用
assert torch.cuda.is_available()

# 创建两个大型矩阵
size = 1000
a = torch.rand(size, size)
b = torch.rand(size, size)

在CPU上进行矩阵乘法

接下来,我们在CPU上执行矩阵乘法,并测量时间:

start_time = time.time()
c = torch.matmul(a, b)
end_time = time.time()

print("CPU time: {:.5f} seconds".format(end_time - start_time))

在GPU上进行矩阵乘法

现在,我们将相同的操作转移到GPU上,并比较时间:

# 将数据移动到GPU
a_cuda = a.cuda()
b_cuda = b.cuda()

# 在GPU上执行矩阵乘法
start_time = time.time()
c_cuda = torch.matmul(a_cuda, b_cuda)
end_time = time.time()

# 将结果移回CPU
c_cpu = c_cuda.cpu()

print("GPU time: {:.5f} seconds".format(end_time - start_time))

在这个示例中,你会注意到使用GPU进行矩阵乘法通常比CPU快得多。这是因为GPU可以同时处理大量的运算任务,而CPU在执行这些任务时则是顺序的。

深入理解

数据传输的重要性

在使用CUDA进行计算时,数据传输是一个重要的考虑因素。在我们的例子中,我们首先将数据从CPU内存传输到GPU内存。这一过程虽然有一定的时间开销,但对于大规模的计算任务来说,这种开销是值得的。

并行处理的潜力

GPU的并行处理能力使得它在处理类似矩阵乘法这样的操作时极为高效。在深度学习中,这种能力可以被用来加速网络的训练和推理过程。

优化策略

为了最大化GPU的使用效率,合理的优化策略包括精细控制线程布局、合理使用共享内存等。在更复杂的应用中,这些优化可以带来显著的性能提升。

五、PyTorch CUDA深度学习案例实战

在本章节中,我们将通过一个实际的深度学习项目来展示如何在PyTorch中结合使用CUDA。我们选择了一个经典的深度学习任务——图像分类,使用CIFAR-10数据集。此案例将详细介绍从数据加载、模型构建、训练到评估的整个流程,并展示如何利用CUDA加速这个过程。

环境设置

首先,确保你的环境已经安装了PyTorch,并支持CUDA。可以通过以下代码来检查:

import torch

print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

如果输出显示CUDA可用,则可以继续。

CIFAR-10数据加载

CIFAR-10是一个常用的图像分类数据集,包含10个类别的60000张32x32彩色图像。

加载数据集

使用PyTorch提供的工具来加载和归一化CIFAR-10:

import torch
import torchvision
import torchvision.transforms as transforms

# 数据预处理
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# 加载训练集
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4, shuffle=True, num_workers=2)

# 加载测试集
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

构建神经网络

接下来,我们定义一个简单的卷积神经网络(CNN):

import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

net = Net()

CUDA加速

将网络转移到CUDA上:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

训练网络

使用CUDA加速训练过程:

import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

for epoch in range(2):  # 多次循环遍历数据集
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 2000 == 1999:    # 每2000个小批次打印一次
            print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0

print('Finished Training')

测试网络

最后,我们在测试集上评估网络性能:

correct = 0
total = 0
with torch.no_grad():
    for data in testloader:
        images, labels = data[0].to(device), data[1].to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))

关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人智能实验室成员,阿里云认证的资深架构师,项目管理专业人士,上亿营收AI产品研发负责人
如有帮助,请多关注
TeahLead KrisChang,10+年的互联网和人工智能从业经验,10年+技术和业务团队管理经验,同济软件工程本科,复旦工程管理硕士,阿里云认证云服务资深架构师,上亿营收AI产品业务负责人。

我十几年年前的thinkpad 通过更换cpu复活了:
联想ThinkPad E430c i3变i7:笔记本电脑CPU升级思路—CPU参数

为什么现在的市面上可以更换cpu的笔记本非常稀少呢?

intel移动CPU1-3代,CPU是和台式机一样的可以更换的。4代以后都是焊死的

什么样的笔记本可以更换CPU

查看封装方式:

  • PGA封装方式的处理器是可以升级的

  • BGA封装是直接焊在主板上面的
    ,所以不可更换。

阿斯顿发

PGA与BGA

  • PGA
    :在三种封装中体积最大,但是更换方便,而且更换的操作失误要求低。

  • BGA
    :三种封装中体积最小,但是更换接近于0,同时由于封装工艺问题,BGA的触点如果在封装过程中没有对准或者结合,极有可能意味着报废,所以相比于LGA,PGA成品率更低。

PGA

PGA的全称叫做“pin grid array”,或者叫“插针网格阵列封装”。

PGA的全称叫做“pin grid array”,或者叫“插针网格阵列封装”

主流的AMD桌面CPU,老的酷睿移动MQ系列基本都采用了PGA封装方式。如:Intel 775以前的大部分桌面处理器以及大部分以M,MQ结尾的移动处理器;AMD几乎全部的家用桌面处理器。

插座是个很讨厌的东西,按厂商讨厌它的强度排序有以下几个原因:

  • 太厚

  • 太大

  • 机械强度不够

  • 电磁特性烂

  • 会导致消费者自行升级影响高端型号销售

  • 兼容性测试和可靠性测试多了项目

  • 消费者会因为兼容性问题找麻烦

反正这是个对厂商来说有百害而无一利的东西。自然不会用。

BGA

BGA的全称叫做“ball grid array”,或者叫“球柵网格阵列封装”。

BGA的全称叫做“ball grid array”,或者叫“球柵网格阵列封装”

目前绝大部分的Intel笔记本CPU和智能手机CPU都使用了这种封装方式。比如,Intel所有以H,HQ,U,Y等结尾,包括但不限低压的处理器;AMD低压移动处理器;所有的手机处理器等。

现在cpu从i3后都是使用BGA封装。那么笔记本厂商为什么不使用台式cpu装入笔记本呢?

桌面CPU的笔记本

可以看下未来人类这个品牌,很多本子是台式机cpu的!

未来人类是台湾蓝天 Clevo 电脑在中国大陆授权的贴牌品牌。

但是把台式cpu装入笔记本后:重量可能会达到3kg,简直就是「习武之人」的最爱。

其次是卖相,有点像俄罗斯的傻大黑粗的感觉!

比如:的微星GT76 Titan搭载了英特尔酷睿i9-9900K台式机处理器

11级热管、桌面处理器:msi 微星 发布新款游戏笔记本 GT76 Titan

其重量约为4.4公斤,具体可以看:
https://www.msi.com/Laptop/GT76-Titan-9SX/Specification

关键是太贵呀
!三四万软妹币,像我这种屌丝程序员,表示买不起!

稍微有点B格的牌子,架构都够买2台同样的台式机配置


转载
本站
文章《
笔记本为什么不出可升级CPU的,用台式CPU不行吗?
》,
请注明出处:
https://www.zhoulujun.cn/html/OS/Windows/WindowsTips/2023_1224_9001.html

虚拟线程是在 Java 21 版本中实现的一种轻量级线程。它由 JVM 进行创建以及管理。虚拟线程和传统线程(我们称之为平台线程)之间的主要区别在于,我们可以轻松地在一个 Java 程序中运行大量、甚至数百万个虚拟线程。

由于虚拟线程的数量众多,也就赋予了 Java 程序强大的力量。虚拟线程适合用来处理大量请求,它们可以更有效地运行 “一个请求一个线程” 模型编写的 web 应用程序,可以提高吞吐量以及减少硬件浪费。

由于虚拟线程是 java.lang.Thread 的实现,并且遵守自 Java SE 1.0 以来指定 java.lang.Thread 的相同规则,因此开发人员无需学习新概念即可使用它们。

但是虚拟线程才刚出来,对我们来说有一些陌生。由于 Java 历来版本中无法生成大量平台线程(多年来 Java 中唯一可用的线程实现),已经让程序员养成了一套关于平台线程的使用习惯。这些习惯做法在应用于虚拟线程时会适得其反,我们需要摒弃。

此外虚拟线程和平台线程在创建成本上的巨大差异,也提供了一种新的关于线程使用的方式。Java 的设计者鼓励使用虚拟线程而不必担心虚拟线程的创建成本。

本文无意全面涵盖虚拟线程的每个重要细节,目的是给大家使用虚拟线程提供一套使用指南,帮助大家能更好使用的虚拟线程,发挥其作用并避免踩坑。

本文完整大纲如下,

image

使用信号量限制并发

在某些场景下,我们需要限制某个操作的并发数。例如某些外部服务可能无法同时处理超过 10 个并发请求。

由于平台线程是一种宝贵的资源,通常在线程池中进行管理,因此线程池的使用对于如今的程序员相当普遍。

比如上面例子要限制并发请求数,某些人会使用线程池来处理,代码如下,

ExecutorService es = Executors.newFixedThreadPool(10);
...
Result foo() {
    try {
        var fut = es.submit(() -> callLimitedService());
        return f.get();
    } catch (...) { ... }
}

上面代码示例可以确保外部服务最多只有 10 个并发请求,因为我们的线程池中只有最多 10 个线程。

限制并发只是使用线程池的副产品。线程池旨在共享稀缺资源,而虚拟线程并不稀缺,因此永远不应该池化虚拟线程!

使用虚拟线程时,如果要限制访问某些服务的并发请求,则应该使用专门为此目的设计的 Semaphore 类。示例代码如下,

Semaphore sem = new Semaphore(10);
...
Result foo() {
    sem.acquire();
    try {
        return callLimitedService();
    } finally {
        sem.release();
    }
}

在这个示例中,同一时刻只有 10 个虚拟线程可以进入 foo() 方法取得锁,而其他虚拟线程将会被阻塞。

简单地使用信号量阻塞某些虚拟线程可能看起来与将任务提交到固定数量线程池有很大不同,但事实并非如此。

将任务提交到等待任务池会将它们排队处理,信号量在内部(或任何其他阻塞同步构造)构造了一个阻塞线程队列,这些任务在阻塞线程队列上也会进行排队处理。

image

我们可以将平台线程池认作是从等待任务队列中提取任务进行处理的工作人员,然后将虚拟线程视为任务本身,在任务或者线程可以执行之前将会被阻塞,但它们在计算机中的底层表示上实际是相同的。

这里想告诉大家的就是不管是线程池的任务排队,还是信号量内部的线程阻塞,它们之间是由等效性的。在虚拟线程某些需要限制并发数场景下,直接使用信号量即可。

不要在线程局部变量中缓存可重用对象

虚拟线程支持线程局部变量,就像平台线程一样。通常线程局部变量用于将一些特定于上下文的信息与当前运行的代码关联起来,例如当前事务和用户 ID。

对于虚拟线程来说,使用线程局部变量是完全合理的。但是如果考虑更安全、更有效的线程局部变量,可以使用 Scoped Values。

更多有关 Scoped Values 介绍,请参阅
https://docs.oracle.com/en/java/javase/21/core/scoped-values.html#GUID-9A4565C5-82AE-4F03-A476-3EAA9CDEB0F6

线程局部变量有一种用途与虚拟线程是不太适合的,那就是缓存可重用对象。

可重用对象的创建成本通常很高,通常消耗大量内存且可变,还不是线程安全的。它们被缓存在线程局部变量中,以减少它们实例化的次数以及它们在内存中的实例数量,好处是它们可以被线程上不同时间运行的多个任务重用,避免昂贵对象的重复创建。

例如 SimpleDateFormat 的实例创建成本很高,而且不是线程安全的。为了解决创建成本、线程不安全问题,通常是将此类实例缓存在 ThreadLocal 中,如下例所示:

static final ThreadLocal<SimpleDateFormat> cachedFormatter =
       ThreadLocal.withInitial(SimpleDateFormat::new);

void foo() {
  ...
	cachedFormatter.get().format(...);
	...
}

仅当线程(以及因此在线程本地缓存的昂贵对象)被多个任务共享和重用时(就像平台线程被池化时的情况一样),这种缓存才有用。许多任务在线程池中运行时可能会调用 foo,但由于池中仅包含几个线程,因此该对象只会被实例化几次(每个池线程一次)并被缓存和重用。

但是虚拟线程永远不会被池化,也不会被不相关的任务重用。因为每个任务都有自己的虚拟线程,所以每次从不同任务调用 foo 都会触发新 SimpleDateFormat 的实例化。而且由于可能有大量的虚拟线程同时运行,昂贵的对象可能会消耗相当多的内存。这些结果与线程本地缓存想要实现的结果恰恰相反。

对于线程局部变量缓存可重用对象的问题,没有什么好的通用替代方案,但对于 SimpleDateFormat,我们应该将其替换为 DateTimeFormatter。DateTimeFormatter 是不可变的,因此单个实例就可以由所有线程共享,

static final DateTimeFormatter formatter = DateTimeFormatter….;

void foo() {
  ...
	formatter.format(...);
	...
}

需要注意的是,使用线程局部变量来缓存共享的昂贵对象有时是由一些异步框架在幕后完成的,其隐含的假设是这些可重用对象只会由极少数池线程使用。

所以混合虚拟线程和异步框架一起使用可能不是一个好主意,对某些方法的调用可能会导致可重用对象被重复创建。

避免长时间和频繁的 synchronized

当前虚拟线程实现由一个限制是,在同步块或方法内执行 synchronized 阻塞操作会导致 JDK 的虚拟线程调度程序阻塞宝贵的操作系统线程,而如果阻塞操作是在同步块或方法外完成的,则不会被阻塞。我们称这种情况为 “Pinning”。

如果阻塞操作既长期又频繁,则 “Pinning” 可能会对服务器的吞吐量产生不利影响。如果阻塞操作短暂(例如内存中操作)或不频繁则可能不会产生不利影响。

为了检测可能有害的 “Pinning” 实例,(JDK Flight Recorder (JFR) 在 “Pinning” 阻塞时间超过 20 毫秒时,会发出 jdk.VirtualThreadPinned 事件。

或者我们可以使用系统属性 jdk.tracePinnedThreads 在线程被 “Pinning” 阻塞时发出堆栈跟踪。

启动 Java 程序时添加 -Djdk.tracePinnedThreads=full 运行,会在线程被 “Pinning” 阻塞时打印完整的堆栈跟踪,突出显示本机帧和持有监视器的帧。使用 -Djdk.tracePinnedThreads=short 运行,会将输出限制为仅有问题的帧。

如果这些机制检测到既长期又频繁 “Pinning” 的地方,请在这些特定地方将 synchronized 替换为 ReentrantLock。以下是长期且频繁使用 synchronized 的示例,

synchronized(lockObj) {
    frequentIO();
}

我们可以将其替换为以下内容:

lock.lock();
try {
    frequentIO();
} finally {
    lock.unlock();
}

参考资料

最后说两句

针对虚拟线程的使用,相信大家心里已经有了答案。在对虚拟线程需要限制并发数的场景,使用信号量即可。在虚拟线程中使用线程局部变量时要注意避免缓存昂贵的可重用对象。对于使用到 synchronized 同步块或者方法的虚拟线程,建议替换为 ReentrantLock,避免影响吞吐量。

关注公众号【waynblog】每周分享技术干货、开源项目、实战经验、国外优质文章翻译等,您的关注将是我的更新动力!

在上一篇
Pulsar3.0新功能介绍
中提到,在升级到 3.0 的过程中碰到一个致命的问题,就是升级之后 topic 被删除了。

正好最近社区也补充了相关细节,本次也接着这个机会再次复盘一下,毕竟这是一个非常致命的 Bug。

现象

先来回顾下当时的情况:升级当晚没有出现啥问题,各个流量指标、生产者、消费者数量都是在正常范围内波动。

事后才知道,因为只是删除了很少一部分的 topic,所以从监控中反应不出来。

早上上班后陆续有部分业务反馈应用连不上 topic,提示
topic nof found
.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'Producer': Invocation of init method failed; nested exception is org.apache.pulsar.client.api.PulsarClientException$TopicDoesNotExistException: Topic Not Found.

因为只是部分应用在反馈,所以起初怀疑是 broker 升级之后导致老版本的 pulsar-client 存在兼容性问题。

所以我就拿了平时测试用的 topic 再配合多个老版本的 sdk 进行测试,发现没有问题。

直到这一步还好,至少证明是小范故障。

因为提示的是 topic 不存在,所以就准备查一下 topic 的元数据是否正常。


查询后发现元数据是存在的。

之后我便想看看提示了 topic 不存在的 topic 的归属,然后再看看那个 broker 中是否有异常日志。


发现查看归属的接口也是提示 topic 不存在,此时我便怀疑是 topic 的负载出现了问题,导致这些 topic 没有绑定到具体的 broker。

于是便重启了 broker,结果依然没有解决问题。

之后我们查询了 topic 的 internal state 发现元数据中会少一个分区。

紧急恢复

我们尝试将这个分区数恢复后,发现这个 topic 就可以正常连接了。

于是再挑选了几个异常的 topic 发现都是同样的问题,恢复分区数之后也可以正常连接了。

所以我写了一个工具遍历了所有的 topic,检测分区数是否正常,不正常时便修复。

void checkPartition() {  
    String namespace = "tenant/ns";  
    List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);  
    for (String topic : topicList) {  
        PartitionedTopicStats stats = pulsarAdmin.topics().getPartitionedStats(topic, true);  
        int partitions = stats.getMetadata().partitions;  
        int size = stats.getPartitions().size();  
        if (partitions != size) {  
            log.info("topic={},partitions={},size={}", topic, partitions, size);  
            pulsarAdmin.topics().updatePartitionedTopic(topic, partitions);  
        }    
	}
}

排查

修复好所有 topic 之后便开始排查根因,因为看到的是元数据不一致所以怀疑是 zk 里的数据和 broker 内存中的数据不同导致的这个问题。

但我们查看了 zookeeper 中的数据发现一切又是正常的,所以只能转变思路。

之后我们通过有问题的 topic 在日志中找到了一个关键日志:


以及具体的堆栈。

此时具体的原因已经很明显了,元数据这些自然是没问题;根本原因是 topic 被删除了,但被删除的 topic 只是某个分区,所以我们在查询
internalState
时才发发现少一个 topic。

通过这个删除日志定位到具体的删除代码:

org.apache.pulsar.broker.service.persistent.PersistentTopic#checkReplication


原来是这里的
configuredClusters
值为空才导致的 topic 调用了
deleteForcefully()
被删除。

而这个值是从 topic 的 Policy 中获取的。

复现问题

通过上图中的堆栈跟踪,怀疑是重启 broker 导致的 topic unload ,同时 broker 又在构建 topic 导致了对 topicPolicy 的读写。

最终导致 topicPolicy 为空。

只要写个单测可以复现这个问题就好办了:

    @Test
    public void testCreateTopicAndUpdatePolicyConcurrent() throws Exception {

        final int topicNum = 100;
        final int partition = 10;

        // (1) Init topic
        admin.namespaces().createNamespace("public/retention");
        final String topicName = "persistent://public/retention/policy_with_broker_restart";
        for (int i = 0; i < topicNum; i++) {
            final String shadowTopicNames = topicName + "_" + i;
            admin.topics().createPartitionedTopic(shadowTopicNames, partition);
        }

        // (2) Set Policy
        for (int i = 90; i < 100; i++) {
            final String shadowTopicNames = topicName + "_" + i;
            CompletableFuture.runAsync(() -> {
                while (true) {
                    PublishRate publishRate = new PublishRate();
                    publishRate.publishThrottlingRateInMsg = 100;
                    try {
                        admin.topicPolicies().setPublishRate(shadowTopicNames, publishRate);
                    } catch (PulsarAdminException e) {
                    }
                }
            });
        }

        for (int i = 90; i < 100; i++) {
            final String shadowTopicNames = topicName + "_" + i;
            CompletableFuture.runAsync(() -> {
                while (true) {
                    try {
                        admin.lookups().lookupPartitionedTopic(shadowTopicNames);
                    } catch (Exception e) {
                    }
                }
            });
        }

        admin.namespaces().unload("public/retention");
        admin.namespaces().unload("public/retention");
        admin.namespaces().unload("public/retention");
        Thread.sleep(1000* 5);

        for (int i = 0; i < topicNum; i++) {
            final String shadowTopicNames = topicName + "_" + i;
            log.info("check topic: {}", shadowTopicNames);
            PartitionedTopicStats partitionedStats = admin.topics().getPartitionedStats(shadowTopicNames, true);
            Assert.assertEquals(partitionedStats.getPartitions().size(), partition);
        }

    }

同时还得查询元数据有耗时才能复现:

只能手动 sleep 模拟这个耗时

具体也可以参考这个 issue
https://github.com/apache/pulsar/issues/21653#issuecomment-1842962452

此时就会发现有 topic 被删除了,而且是随机删除的,因为出现并发的几率本身也是随机的。

这里画了一个流程图就比较清晰了,在 broker 重启的时候会有两个线程同时topicPolicy 进行操作。

在 thread3 读取 topicPolicy 进行判断时,thread2 可能还没有把数据准备好,所以就导致了 topic 被删除。

修复


既然知道了问题原因就好修复了,我们只需要把 thread3 和 thread2 修改为串行执行就好了。

这也是处理并发最简单高效的方法,就是直接避免并发;加锁、队列啥的虽然也可以解决,但代码复杂度也高了很多,所以能不并发就尽量不要并发。

但要把这个修复推送到社区上游主分支最好是要加上单测,这样即便是后续有其他的改动也能保证这个 bug 不会再次出现。

之后在社区大佬的帮助下完善了单测,最终合并了这个修复。

再次证明写单测往往比代码更复杂,也更花费时间。

PR:
https://github.com/apache/pulsar/pull/21704

使用修复镜像

因为社区合并代码再发版的周期较长,而我们又急于修复该问题;不然都不敢重启 broker,因为每重启一次都可能会导致不知道哪个 topic 就被删除了。

所以我们自己在本地构建了一个修复的镜像,准备在线上进行替换。


此时坑又来了,我们满怀信心的替换了一个镜像再观察日志发现居然还有删除的日志

生成二维码1(简单)

包引用:

<PackageReference Include="QRCoder" Version="1.4.3" />

using QRCoder;
        /// <summary>
        ///生成二维码/// </summary>
        /// <param name="data">escape后的数据,防止中文等特殊字符引起问题</param>
        /// <param name="size">二维码宽高</param>
        /// <param name="level">0.L (7%), 1.M (15%), 2.Q (25%) 3.H (30%)</param>
        /// <param name="border">是否包含白边</param>
        /// <returns></returns>
[HttpGet]public IActionResult Generate(string text, int size = 400, int level = 1, bool border = false)
{
if (string.IsNullOrEmpty(text))return BadRequest("内容不能为空.");//设置最小和最大尺寸50-1920 size = size < 50 ? 50 : size > 1920 ? 1920: size;

QRCodeGenerator qrGenerator
= new();
QRCodeData qrCodeData
=qrGenerator.CreateQrCode(text, (QRCodeGenerator.ECCLevel)level);int pixelsPerModule = size / 21; //size = pixelsPerModule * 21 using PngByteQRCode qrCode = new(qrCodeData);byte[] qrCodeBytes =qrCode.GetGraphic(pixelsPerModule, drawQuietZones: border);//将字节数组转换为Base64字符串 string base64Image =Convert.ToBase64String(qrCodeBytes);//返回Base64字符串作为图片的src属性值// return $"data:image/png;base64,{base64Image}"; return File(qrCodeBytes, "image/png", "QrCode.png");
}

优点
:生成的base64比较小

缺点
:不能控制白边的宽度

size大小需要是21的倍数,否则生成的二维码宽度<=指定的size大小

生成二维码2(自定义白边的宽度

引用包:

<PackageReference Include="SixLabors.ImageSharp" Version="3.1.2" />

usingQRCoder;usingSixLabors.ImageSharp;usingSixLabors.ImageSharp.Formats.Png;usingSixLabors.ImageSharp.PixelFormats;usingSixLabors.ImageSharp.Processing;usingWebApplication7.Models;using Color =SixLabors.ImageSharp.Color;using Image =SixLabors.ImageSharp.Image;using Point = SixLabors.ImageSharp.Point;
[HttpGet]
public IActionResult Generate2(string text, int size = 400, int level = 1, int borderWidth = 0)
{
if (string.IsNullOrEmpty(text))return BadRequest("内容不能为空.");//设置最小和最大尺寸50-1920 size = size < 50 ? 50 : size > 1920 ? 1920: size;

QRCodeGenerator qrGenerator
= new();
QRCodeData qrCodeData
=qrGenerator.CreateQrCode(text, (QRCodeGenerator.ECCLevel)level);using PngByteQRCode qrCode = new(qrCodeData);byte[] qrCodeBytes = qrCode.GetGraphic(50, drawQuietZones: false);using MemoryStream stream = new(qrCodeBytes);using Image<Rgba32> qrCodeImage = Image.Load<Rgba32>(stream);//调整图像大小 qrCodeImage.Mutate(ctx =>ctx.Resize(size, size));//添加空白来模拟白边 borderWidth = borderWidth <= 0 ? 50 : borderWidth; //调整白边的宽度 Image<Rgba32> borderedImage = new(qrCodeImage.Width + 2 * borderWidth, qrCodeImage.Height + 2 *borderWidth);
borderedImage.Mutate(ctx
=> ctx.BackgroundColor(Color.White).DrawImage(qrCodeImage, newPoint(borderWidth, borderWidth), 1f));//将调整大小后的图像保存为字节数组 using MemoryStream resizedStream = new();
borderedImage.Save(resizedStream,
newPngEncoder());byte[] resizedBytes =resizedStream.ToArray();//将字节数组转换为Base64字符串 string base64Image =Convert.ToBase64String(resizedBytes);//返回Base64字符串作为图片的src属性值 // return $"data:image/png;base64,{base64Image}"; return File(resizedBytes, "image/png", "QrCode.png");
}

优缺点与方法1相反