2024年2月

订单履约系统的核心能力

通过分析订单履约的全流程和各个业务活动,我们可以梳理出订单履约的核心业务链路,基于业务链路,我们抽象出订单履约系统的三大系统能力,分别为履约服务表达、履约调度、物流配送。

履约服务表达
:负责向客户明确和准确地传达履约服务,包括提供订单的预计处理时间、配送时效、费用计算以及履约可达性等。保障消费者在下单时有清晰的预期,并在整个订单履约过程中保持这一预期的透明和一致。

履约调度
:涉及订单的接收、处理、门店/仓库分配。这一能力确保订单根据预定的规则和优先级,有效地分配给门店/仓库。提升内部操作的效率,减少履约时间,同时最大限度地减少延期情况。

物流配送
:确保商品从门店/仓库准时地运送到客户手中。这包括与第三方服务提供商的合作、配送管理、配送路径的优化以及送货服务的执行。

订单履约系统的应用架构

应用层它定义了软件的应用功能,负责接收用户的请求,协调领域层能力来执行任务,并将结果返回给用户,模块包括:

  • C端履约服务
    • 预计送达时间:为消费者提供订单的预计处理时间、配送时效等,通常基于订单处理时间、配送情况、配送距离等多种因素计算。
    • 实时订单状态查询:允许消费者实时查看他们的订单所处阶段。包括订单待接单、拣货、打包、已发货、配送中等状态。
    • 配送轨迹跟踪:提供订单从出库到最终送达的完整路径跟踪,消费者可以查看订单的当前位置和过往的配送节点,了解配送进度。
    • 配送信息修改:在订单还未最终发出之前,消费者可能需要更改配送信息,如地址或配送时间。
    • 配送费用明细:显示消费者的订单配送费用的详细分解,包括配送费、包装费、服务费等。
    • 确认收货:消费者可以通过系统确认收货,是完成订单流程的最后一步。
  • B端管理模块:
    • 订单派单
      :接收来自C端的订单并按照既定规则将其自动分配给相应的门店/仓库。
    • 订单管理
      :全面管理订单的生命周期,包括订单的确认、处理、状态跟踪、修改和取消管操作。
    • 拣货管理
      :管理仓库内的拣货操作,确保正确的商品被准确无误地从货架上拣选出来并准备进行打包和发货。
    • 发货管理
      :根据订单的地址、商品大小、重量和客户选择的履约方式,选择最适合的配送方式,并对发货流程进行跟踪。
    • 逆向履约
      :当客户不满意或需退换商品时,逆向履约模块负责处理退货请求,并管理退货退款和换货流程。

领域层是业务逻辑的核心,专注于表示业务概念、业务状态流转和业务规则,沉淀可复用的服务能力,模块包括:

  • 履约服务表达
    :负责向客户提供关于履约服务的明确信息。包括预计的送货时间、费用计算、服务选项(如定时达、次日达等)以及履约可达性要求。
  • 订单履约调度
    :提供订单履约调度的核心能力,确保订单被高效地处理和执行。它涉及订单从接收到最终准备配送的所有调度和处理过程,包括订单拆分、分配、拣货、包装、发货等

写在最后

本文主要讨论了订单履约系统的应用架构。

首先提出了订单履约系统的三大核心能力,分别是履约服务表达、履约调度和物流配送。文中还详细介绍了订单履约系统的应用架构,包括C端履约服务和B端管理模块,以及领域层的能力。

本文介绍基于
C++
语言
GDAL
库,为
CreateCopy()
函数创建的栅格图像
添加
更多波段
的方法。


C++
语言的
GDAL
库中,我们可以基于
CreateCopy()
函数与
Create()
函数创建新的栅格图像文件。其中,
CreateCopy()
函数需要基于一个
已有的栅格图像文件
作为模板,将模板文件的各项属性信息(例如空间参考信息、像元个数、像元大小、波段数量等),自动作为
新创建的栅格图像文件
的属性信息;而
Create()
函数则是仅仅新建立一个栅格图像,需要我们自行定义新栅格图像的各类属性信息。

因此,一般我们选择
CreateCopy()
函数来创建栅格图像文件较为方便,因为其不需要我们手动为所创建的栅格图像配置各种属性信息;但是有时我们希望所创建的
新的栅格图像
,其与
作为模板的图像
之间的属性有一定差异。例如,我们现在依据一个具有
1
个波段的
.tif
格式的模板图像,创建一个新的
.tif
格式的图像;而我们需要使得新的图像具有
3
个波段,除此之外其他属性信息与模板图像一致。这就需要我们在调用
CreateCopy()
函数之后,进行一些额外的操作。

首先,
GDAL
库提供了
AddBand()
函数,可以为
GDALDataset*
类型的数据添加波段;但是,
AddBand()
函数
对于大部分格式的栅格图像而言都不起作用
——例如,最常见的
.tif
格式的栅格图像文件,其就不支持利用
AddBand()
函数增添自身的波段数量。大家在实践过程中,如果用的是其他格式的栅格图像文件,可以先直接用
AddBand()
函数尝试一下,看看其对于自己当前格式的数据是否有效;如果没有效果的话,就需要用接下来的方法来实现需求了。

整体思路其实也很简单——我们在依据
.tif
格式的
模板栅格图像文件
创建新的
.tif
格式的
栅格图像文件
前,先建立一个
.vrt
格式的文件。
.vrt
格式文件是
GDAL
库中提供的一种
虚拟数据格式
,这一数据格式的详细介绍大家可以参考
GDAL
库的帮助文档,这里我们就不再详细说明了;目前只需要知道,
.vrt
格式文件是支持利用
AddBand()
函数增添自身的波段数量的。随后,我们为
.vrt
格式文件增添波段,再用
CreateCopy()
函数基于这一
.vrt
格式文件创建新的
.tif
格式的栅格图像文件,从而实现我们的需求。

	const char* pszFormat = "GTiff";
	GDALDriver* poDriver, * poDriver_VRT;
	poDriver = GetGDALDriverManager()->GetDriverByName(pszFormat);
	poDriver_VRT = GetGDALDriverManager()->GetDriverByName("VRT");
	GDALDataset* poSrcDS = (GDALDataset*)GDALOpenShared(mod_file.c_str(), GA_ReadOnly);
	GDALDataset* poVRTDS = poDriver_VRT->CreateCopy(mod_file.replace(mod_file.find(".tif"), 4, ".vrt").c_str(), poSrcDS, FALSE, NULL, NULL, NULL);
	poVRTDS->AddBand(GDT_Float64, NULL);
	poVRTDS->AddBand(GDT_Float64, NULL);

	char** papszOptions = NULL;
	papszOptions = CSLSetNameValue(papszOptions, "TILED", "YES");
	papszOptions = CSLSetNameValue(papszOptions, "COMPRESS", "LZW");

上述代码也很好理解。首先,我们创建两个
GDALDataset*
变量,分别指向
.tif
格式的
模板栅格图像文件
与我们将要创立的
.vrt
格式文件;随后,先用一次
CreateCopy()
函数,将模板文件的全部属性信息复制到
.vrt
格式文件中。接下来,就利用
AddBand()
函数,为
.vrt
格式文件增添两个波段。此时,加上原有的
1
个波段,
.vrt
格式文件就已经拥有了
3
个波段;而除此之外,
.vrt
格式文件的所有属性信息都是与
.tif
格式的
模板栅格图像文件
一致的。

接下来,就可以开始配置我们所需要创立的新的
.tif
格式栅格图像文件。其中,再用一次
CreateCopy()
函数,将
.vrt
格式文件的全部属性信息复制到新的
.tif
格式的栅格图像文件中。这样,我们新的
.tif
格式的栅格图像文件也就具有
3
个波段了。

	GDALDataset* poDstDS;
	poDstDS = poDriver->CreateCopy(out_file.c_str(), poVRTDS, FALSE, papszOptions, GDALTermProgress, NULL);

	GDALRasterBand* poOutBand;
	poOutBand = poDstDS->GetRasterBand(1);
	poOutBand->RasterIO(GF_Write, 0, 0, nXSize, nYSize, combination_out_pafScanline[pic_index_2 - 1], nXSize, nYSize, GDT_Float64, 0, 0);
	GDALRasterBand* poOutBand_2;
	poOutBand_2 = poDstDS->GetRasterBand(2);
	poOutBand_2->RasterIO(GF_Write, 0, 0, nXSize, nYSize, out_pafScanline[pic_index_2 - 1], nXSize, nYSize, GDT_Float64, 0, 0);
	GDALRasterBand* poOutBand_3;
	poOutBand_3 = poDstDS->GetRasterBand(3);
	poOutBand_3->RasterIO(GF_Write, 0, 0, nXSize, nYSize, qa_pixel_paf[pic_index_2 - 1], nXSize, nYSize, GDT_Float64, 0, 0);

上述代码就是基于
.vrt
格式文件,创建新的
.tif
格式的栅格图像文件,并对新的图像文件的
3
个波段依次赋值的全部过程。

通过上述方式,我们就实现了
CreateCopy()
函数创建新的栅格图像且
为新的栅格图像增添波段数量
的需求。

本文分享自华为云社区《
java代码实现异步返回结果如何判断异步执行完成
》,作者: 皮牙子抓饭。

在许多应用程序中,我们经常使用异步操作来提高性能和响应度。在Java中,我们可以使用多线程或者异步任务来执行耗时操作,并且在后台处理过程完成后获取结果。但是,在使用异步操作时,我们通常需要知道异步任务何时完成,以便进行下一步的操作。 本篇文章将介绍几种常见的方法来判断Java代码中异步操作是否完成。

1. 使用Future和Callable

Java中的Future接口定义了一种方式来表示异步操作的未来结果。我们可以使用Callable接口来定义异步任务,它返回一个Future对象,我们可以利用Future对象的方法来检查任务是否完成。 下面是一个例子:

javaCopy code
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public classAsyncDemo {public static voidmain(String[] args) throws Exception {
ExecutorService executorService
=Executors.newSingleThreadExecutor();//定义异步任务 Callable<String> asyncTask = () ->{
Thread.sleep(
2000); //模拟耗时操作 return "Async task completed";
};
//提交异步任务 Future<String> future =executorService.submit(asyncTask);//判断任务是否完成 while (!future.isDone()) {
System.
out.println("Task not done yet...");
Thread.sleep(
500);
}
//获取结果 String result = future.get();
System.
out.println(result);//关闭线程池 executorService.shutdown();
}
}

在上面的代码中,我们创建了一个单线程的ExecutorService来执行异步任务。我们使用submit方法提交异步任务,并得到一个Future对象。然后,我们可以使用isDone()方法来判断任务是否完成,如果任务没有完成,则等待片刻后再次检查。一旦任务完成,我们可以使用get()方法获取任务的结果。

2. 使用CompletableFuture

自Java 8起,Java提供了CompletableFuture类来更加方便地处理异步操作。CompletableFuture是Future的一个实现,同时也支持对未来结果的处理和组合。 下面是一个例子:

javaCopy code
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public classAsyncDemo {public static voidmain(String[] args) throws Exception {//定义异步任务 CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->{try{
TimeUnit.SECONDS.sleep(
2); //模拟耗时操作 } catch(InterruptedException e) {
e.printStackTrace();
}
return "Async task completed";
});
//判断任务是否完成 while (!future.isDone()) {
System.
out.println("Task not done yet...");
TimeUnit.MILLISECONDS.sleep(
500);
}
//获取结果 String result = future.get();
System.
out.println(result);
}
}

在上述代码中,我们使用supplyAsync方法创建了一个CompletableFuture对象,并定义了异步任务。然后,我们可以使用isDone()方法来判断任务是否完成。通过调用get()方法可以获取最终的结果。

当涉及到实际应用场景时,异步操作的一个常见用例是在Web应用中执行并行的HTTP请求以提高性能。以下是一个示例代码,展示了如何使用异步操作来执行多个HTTP请求,并在所有请求完成后进行处理。

javaCopy code
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.
*;public classAsyncHttpExample {public static voidmain(String[] args) throws Exception {
List
<Future<String>> futures = new ArrayList<>();

ExecutorService executor
= Executors.newFixedThreadPool(5);

List
<String> urls =List.of("https://www.example.com/api1","https://www.example.com/api2","https://www.example.com/api3");for(String url : urls) {
Callable
<String> task = () ->{returnperformRequest(url);
};

Future
<String> future =executor.submit(task);
futures.add(future);
}

executor.shutdown();
for (Future<String>future : futures) {try{
String result
= future.get();
System.
out.println("Received response:" +result);
}
catch (InterruptedException |ExecutionException e) {
e.printStackTrace();
}
}
}
private staticString performRequest(String url) throws IOException {
HttpURLConnection connection
= null;
BufferedReader reader
= null;
StringBuilder response
= newStringBuilder();try{
URL requestUrl
= newURL(url);
connection
=(HttpURLConnection) requestUrl.openConnection();
connection.setRequestMethod(
"GET");

reader
= new BufferedReader(newInputStreamReader(connection.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
}
finally{if (connection != null) {
connection.disconnect();
}
if (reader != null) {
reader.close();
}
}
returnresponse.toString();
}
}

在这个示例中,我们创建了一个固定大小的线程池,并为每个URL创建了一个异步任务。每个任务在自己的线程中执行HTTP请求,并返回响应结果。我们使用Future来跟踪每个任务的执行状态和结果。一旦所有任务都被提交,我们调用shutdown()方法关闭线程池,然后通过迭代每个Future对象,使用get()方法获取任务的结果。最后,我们可以根据需要对结果进行进一步处理,这里只是简单地打印出每个请求的响应。

java.util.concurrent.Callable 是 Java 并发编程中的一个接口,它表示一个可调用的任务,可以在计算中返回一个值。与 Runnable 接口不同,Callable 接口的 call() 方法可以返回一个结果,并且可以在执行过程中抛出受检异常。 Callable 接口定义了以下方法:

  • V call() throws Exception:执行任务并返回结果。可以抛出受检异常。
  • boolean equals(Object obj):比较该 Callable 与指定对象是否相等。
  • default <U> Callable<U> compose(Function<? super V, ? extends U> var1):将该 Callable 的结果应用于给定函数,并返回 Callable。
  • default <V2> Callable<V2> andThen(Function<? super V, ? extends V2> var1):将给定函数应用于该 Callable 的结果,并返回新的 Callable。
  • default Predicate<V> isEqual(Object var1):返回谓词,用于判断对象是否与这个 Callable 的结果相等。
  • default Supplier<V> toSupplier():返回将该 Callable 的结果作为值的供应商。 在实际应用中,Callable 接口常常与 ExecutorService 结合使用,通过将 Callable 对象提交给线程池来执行。线程池会返回一个 Future 对象,用于跟踪任务的执行状态和获取结果。 以下是一个示例代码,展示了如何使用 Callable 接口:
javaCopy code
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public classCallableExample {public static voidmain(String[] args) throws Exception {
Callable
<Integer> task = () ->{int sum = 0;for (int i = 1; i <= 100; i++) {
sum
+=i;
}
returnsum;
};

ExecutorService executor
=Executors.newSingleThreadExecutor();
Future
<Integer> future =executor.submit(task);//可以在此处执行其他任务 Integer result= future.get(); //获取任务的结果,会阻塞直到任务完成 System.out.println("Sum:" +result);

executor.shutdown();
}
}

在上述示例中,我们创建了一个实现了 Callable 接口的任务,并将其提交给一个单线程的线程池来执行。我们通过 Future 对象来获取 Callable 任务的执行结果,其中 get() 方法会阻塞当前线程,直到任务完成并返回结果。

总结

通过使用Future和CompletableFuture,我们可以方便地判断Java代码中异步操作的执行是否完成。这样,我们就可以在异步操作完成后获取结果,并且继续进行后续的操作。这种方式提高了代码的响应性和性能,使我们能够更好地处理并发和异步任务。

点击关注,第一时间了解华为云新鲜技术~

公元2024年2月24日18时许,笔者的个人网站突然遭遇不明身份者的DDOS攻击,且攻击流量已超过阿里云DDos基础防护的黑洞阈值,服务器的所有公网访问已被屏蔽,由于之前早已通过Nginx屏蔽了所有国外IP,在咨询了阿里云客服之后,阿里网安的老同事帮助分析日志并进行了溯源,客服建议笔者选择立刻报警处理!

我国《刑法》二百八十六条规定,违反国家规定,对计算机信息系统功能进行删除、修改、增加、干扰,造成计算机信息系统不能正常运行,后果严重的,处五年以下三年以上有期徒刑或者拘役;后果特别严重的,处五年以上有期徒刑。

但口说无凭,没有证据无法立案,于是立刻登录阿里云后台,调取后台日志

随后点击DDos入流攻击的日志,点击证据下载。

一般DDos攻击日志都已以.cap文件结尾,例子:

ddos-2024022417-xx.xxx.xx.xxx.cap

所以需要下载wireshark软件才能对攻击日志进行查看。

接着需要把日志拷贝到移动硬盘,以便随时可以进行举证操作。

采集好证据之后,拿起手机立刻拨打:110

接警专员了解情况后,会立刻通知附近民警护送报案者前往公安机关:

时值元宵佳节,但办案民警依然坚守岗位,时刻为人民服务:

随后民警迅速提取了证据并且做了笔录。

根据我国《最高人民法院、最高人民检察院关于办理危害计算机信息系统安全刑事案件应用法律若干问题的解释》第四条规定:“破坏计算机信息系统功能、数据或者应用程序,具有下列情形之一的,应当认定为刑法第二百八十六条第一款和第二款规定的“后果严重”:(一)造成十台以上计算机信息系统的主要软件或者硬件不能正常运行的;(二)对二十台以上计算机信息系统中存储、处理或者传输的数据进行删除、修改、增加操作的;(三)违法所得五千元以上或者造成经济损失一万元以上的;”

虽然笔者的网站只是几个简单的静态页面,但服务器却不是独享ip,本次犯罪者的攻击行为已经造成整个公网ip内的多台服务器无法正常访问,应当认定为刑法第二百八十六条第一款和第二款规定的“后果严重”情况,并且根据攻击者的ip已经基本锁定了其位置。

市局领导高度重视,按照公安部“净网2020”以来专项行动统一部署,严厉打击犯罪分子的嚣张气焰。

结语

办案民警也嘱咐笔者提醒相关互联网从业者:

第一,警方将持续发力,加强监管;

第二,各企业单位要加强网络信息系统安全防护,做好网站安全管理工作;

第三,个人用户要定期检查电脑系统,及时更新系统补丁和杀毒软件;

第四,尽量使用复杂的混合密码,各类网站不应用相同密码;

第五,发现服务器遭受不明来源的网络攻击,要立即报警处理。

记一次 splice 导致 io.Copy 阻塞的排查过程

简而言之,net.TCPConn 的 ReadFrom 零拷贝实现
splice

1.21.0 - 1.21.4
删除了
SPLICE_F_NONBLOCK
参数,导致在 CentOS7.2(内核版本 3.10.0) 上
splice
被阻塞。

相关的
issue

https://github.com/golang/go/issues/59041

这个问题在
1.21.5
中被修复,
commit

https://github.com/yunginnanet/go/commit/35afad885d5e046a4a14643b5b530b128ca953de

背景

由于环境的问题,需要有一个 TCP 的代理,之前一直用
ncat -vl 10022 -k -c 'ncat -nv 127.0.0.1 22'
方式将 10022 端口的流量代理至
127.0.0.1:22
,但是 ncat 是一个连接一个进程,如果要做短连接压测的,代理会成为瓶颈。

所以决定换个代理的软件,因为 Go 写一个代理特别简单,十行代码就能实现一个性能不错的服务,那就直接自己写一个。

package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"sync"

	"github.com/sirupsen/logrus"
)

func main() {
	f := flag.String("from", "", "source addr")
	t := flag.String("to", "", "dest addr")
	flag.Parse()

	if *f == "" || *t == "" {
		fmt.Println("Invalid from/to address")
		return
	}
	logrus.WithFields(logrus.Fields{"from": *f, "to": *t}).Info("Setup proxy server")

	lis, err := net.Listen("tcp", *f)
	if err != nil {
		panic(err)
	}
	logrus.WithField("addr", lis.Addr()).Info("Listen on")

	for {
		conn, err := lis.Accept()
		if err != nil {
			panic(err)
		}
		go handleConn(conn, *t)
	}
}

func handleConn(uConn net.Conn, to string) {
	logrus.WithField("addr", uConn.RemoteAddr()).Info("New conn")
	defer uConn.Close()

	rConn, err := net.Dial("tcp", to)
	if err != nil {
		logrus.WithError(err).Error("Fail to net.DialTCP")
		return
	}
	logrus.WithField("local", rConn.LocalAddr()).Info("Start proxy conn")

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		io.Copy(uConn, rConn)
		rConn.Close()
		uConn.Close()
	}()
	go func() {
		defer wg.Done()
		io.Copy(rConn, uConn)
		uConn.Close()
		rConn.Close()
	}()
	wg.Wait()
}

编译操作系统为 Debian12,Go 版本为 1.21.1

因为默认路由的原因,我把这个服务部署在了一个 CentOS7.2 的虚拟机里面,压测发现QPS总是上不去。

用 tcpdump 抓包定位到是这边的代理程序有问题,流量没有被正确的进行转发。

为避免出现敏感数据,用下面的图来做模拟,在
A
使用
scp

B
发送文件,中间经过了个我们写的服务
PROXY

+----------------+      +-------------------------------+
|  (A) Debian12  |      |  (B) CentOS7.2                |
|                | <--> | 192.168.32.251:10022          |
| 192.168.32.251 |      |        └─> PROXY              |
|                |      |              └─> 127.0.0.1:22 |
+----------------+      +-------------------------------+

# 生成一个大的文件
#   dd if=/dev/zero of=/tmp/1.txt bs=1M count=1024
# 使用命令模拟压测
#   scp -P 10022 /tmp/1.txt root@192.168.32.245:/tmp/

排查

ps
看到这个进程还在运行,所以不是进程退出导致的。

top
观察进程 CPU 占用也不高,所以不是代码写出死循环来了。

由于程序没有加日志,通过
strace -p $(pidof PROXY)
来分析一下当前哪些系统调用在执行,看起来是 epoll_pwait 没有就绪事件返回。

[pid 26877] splice(14, NULL, 18, NULL, 1048576, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 26790] epoll_pwait(5,  <unfinished ...>
[pid 26788] nanosleep({tv_sec=0, tv_nsec=20000},  <unfinished ...>
[pid 26790] <... epoll_pwait resumed>[], 128, 0, NULL, 0) = 0
[pid 26877] epoll_pwait(5,  <unfinished ...>
[pid 26790] epoll_pwait(5,  <unfinished ...>
[pid 26877] <... epoll_pwait resumed>[], 128, 0, NULL, 0) = 0
[pid 26877] futex(0xc000040d48, FUTEX_WAIT_PRIVATE, 0, NULL <unfinished ...>
[pid 26788] <... nanosleep resumed>NULL) = 0
[pid 26788] futex(0x5ea8a0, FUTEX_WAIT_PRIVATE, 0, {tv_sec=60, tv_nsec=0} <unfinished ...>
[pid 26790] <... epoll_pwait resumed>[{EPOLLIN|EPOLLOUT, {u32=2345140225, u64=9221451948300435457}}], 128, -1, NULL, 0) = 1
[pid 26790] epoll_pwait(5, [], 128, 0, NULL, 0) = 0
[pid 26790] epoll_pwait(5, [{EPOLLIN|EPOLLOUT, {u32=2345140225, u64=9221451948300435457}}], 128, -1, NULL, 0) = 1
         多条 epoll_pwait 省略

看看连接缓冲区里面有没有数据
netstat -ntp | grep 10022
,在接受缓冲区内还有 1666120 个字节的数据没有被读出

tcp6  1666120      0 192.168.32.245:10022    192.168.32.251:49440    ESTABLISHED 26787/PROXY

当时想着看看重启能不能复现,在重启之前先
kill -3
把堆栈打印出来,拿到了一个关键的栈信息。

goroutine 19 [syscall]:
syscall.Syscall6(0x7ff92db08be8?, 0xc000068c88?, 0x45fca5?, 0xc000068c98?, 0x48ed3c?, 0xc000068cb0?, 0x48eea7?)
        /usr/local/go1.21/src/syscall/syscall_linux.go:91 +0x30 fp=0xc000068c60 sp=0xc000068bd8 pc=0x481b50
syscall.Splice(0xc000102000?, 0xc000068d08?, 0x0?, 0x4e70c0?, 0x4e70c0?, 0xc000068d20?)
        /usr/local/go1.21/src/syscall/zsyscall_linux_amd64.go:1356 +0x45 fp=0xc000068cc0 sp=0xc000068c60 pc=0x480d05
internal/poll.splice(...)
        /usr/local/go1.21/src/internal/poll/splice_linux.go:155
internal/poll.spliceDrain(0xc000102100?, 0xc000102000, 0x5a800?)
        /usr/local/go1.21/src/internal/poll/splice_linux.go:92 +0x185 fp=0xc000068d68 sp=0xc000068cc0 pc=0x4917c5
internal/poll.Splice(0x0?, 0x0?, 0x7fffffffffffffff)
        /usr/local/go1.21/src/internal/poll/splice_linux.go:42 +0x173 fp=0xc000068e00 sp=0xc000068d68 pc=0x491413
net.splice(0x0?, {0x53bca8?, 0xc000106000?})
        /usr/local/go1.21/src/net/splice_linux.go:39 +0xdf fp=0xc000068e60 sp=0xc000068e00 pc=0x4cc29f
net.(*TCPConn).readFrom(0xc000106008, {0x53bca8, 0xc000106000})
        /usr/local/go1.21/src/net/tcpsock_posix.go:48 +0x28 fp=0xc000068e90 sp=0xc000068e60 pc=0x4cd0c8
net.(*TCPConn).ReadFrom(0xc000106008, {0x53bca8?, 0xc000106000?})
        /usr/local/go1.21/src/net/tcpsock.go:130 +0x30 fp=0xc000068ed0 sp=0xc000068e90 pc=0x4cc770
io.copyBuffer({0x53bd68, 0xc000106008}, {0x53bca8, 0xc000106000}, {0x0, 0x0, 0x0})
        /usr/local/go1.21/src/io/io.go:416 +0x147 fp=0xc000068f50 sp=0xc000068ed0 pc=0x47d587
io.Copy(...)
        /usr/local/go1.21/src/io/io.go:389
main.handleConn.func2()
        /home/devel/demo/app/demo/main.go:73 +0xb2 fp=0xc000068fe0 sp=0xc000068f50 pc=0x4db672
runtime.goexit()
        /usr/local/go1.21/src/runtime/asm_amd64.s:1650 +0x1 fp=0xc000068fe8 sp=0xc000068fe0 pc=0x464641
created by main.handleConn in goroutine 17
        /home/devel/demo/app/demo/main.go:71 +0x368

分析看到在
io.Copy
这条路线有问题,先看看
io.Copy
的源码

分析 io.Copy

io.Copy
内部有这么一段代码,优先于
read/write
调用,上面的堆栈打印看起来也是这个
ReadFrom
里面有问题。

if wt, ok := src.(WriterTo); ok {
	return wt.WriteTo(dst)
}
// Similarly, if the writer has a ReadFrom method, use it to do the copy.
if rt, ok := dst.(ReaderFrom); ok {
	return rt.ReadFrom(src)
}

OK 先跳过这个
ReadFrom
看看能不能行呢,于是把
io.Copy
里面的
WriteTo/ReadFrom
注释,并且直接放到外面来,使用一般的
read/write
调用。

编译运行,
可行!!!

那么问题就只能在这个
ReadFrom
里面了,照着上面的堆栈,一路追到了
poll.Splice
内,但是之前没有用过
splice
这个函数,只知道是一个零拷贝相关的函数。好吧,Go 在这里还做了一些优化。

那看来还是得研究一下,这个
splice
系统调用。

分析 poll.Splice

在这之前先搜索了一些文档看了一下,这个
splice

文档
写的相当好,很快就能够理解。

文章里面的的这张图清晰的描述了两次
splice
就能通过
pipe
在内核就将数据发送出去,没有把数据从内核空间拷贝至用户空间。

为了减少语言的干扰,使用 C 照着
poll.Splice
重写了一遍,代码如下。在
splice_readfrom
内部,每次循环调用两次
splice
,一次将源 sockfd 的数据放至 pipe 中,一次将 pipe 中的数据写入目的 sockfd 中。

#define _GNU_SOURCE 1
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <thread>

static ssize_t splice_drain(int fd, int pipefd, size_t max) {
  while (1) {
    ssize_t n = splice(fd, NULL, pipefd, NULL, max, 0);
    if (n >= 0)
      return n;

    // error handle
    if (errno == EINTR)
      continue;
    else if (errno != EAGAIN)
      return -1;
  }
}

static ssize_t splice_pump(int pipefd, int fd, size_t in_pipe) {
  ssize_t written = 0;
  while (in_pipe > 0) {
    ssize_t n = splice(pipefd, NULL, fd, NULL, in_pipe, 0);
    if (n >= 0) {
      in_pipe -= n;
      written += n;
      continue;
    }

    if (errno != EAGAIN)
      return -1;
  }
  return written;
}

static const size_t kMaxSpliceSize = 1 << 20;

ssize_t splice_readfrom(int dstfd, int srcfd) {
  int pipefd[2];
  if (pipe2(pipefd, 0) < 0)
    return -1;

  ssize_t written = 0;
  ssize_t remain = INT64_MAX;
  while (remain > 0) {
    size_t max = kMaxSpliceSize;
    if (max > (size_t)remain)
      max = remain;

    ssize_t in_pipe = splice_drain(srcfd, pipefd[1], max);
    if (in_pipe < 0)
      return -1;
    else if (in_pipe == 0)
      break;

    ssize_t n = splice_pump(pipefd[0], dstfd, in_pipe);
    if (n > 0) {
      remain -= n;
      written += n;
    }
  }
  close(pipefd[0]);
  close(pipefd[1]);
  return written;
}

int main(int argc, char **argv) {
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0) {
    perror("Fail to socket");
    return -1;
  }

  int opt = 1;
  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt, sizeof(opt));

  fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, NULL) | O_NONBLOCK);

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_port = htons(10022);
  addr.sin_addr.s_addr = htonl(INADDR_ANY);

  if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
    perror("Fail to bind");
    return -1;
  }

  if (listen(sockfd, 10) < 0) {
    perror("Fail to listen");
    return -1;
  }
  printf("listen on\n");

  int timeout = 3000;
  struct pollfd fds = {sockfd};
  fds.events |= POLLIN;

  while (1) {
    int ret = poll(&fds, 1, timeout);
    if (ret > 0) {
      struct sockaddr_in in;
      socklen_t len = sizeof(in);
      int connfd = accept(sockfd, (struct sockaddr *)&in, &len);
      if (connfd < 0) {
        perror("Fail to accept");
        return -1;
      }

      fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFL, NULL) | O_NONBLOCK);

      std::thread t(
          [](struct sockaddr_in addr, int u_connfd) {
            int sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (sockfd < 0) {
              perror("Fail to socket");
              return;
            }

            struct sockaddr_in dst_addr;
            dst_addr.sin_family = AF_INET;
            dst_addr.sin_port = htons(10022);
            dst_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

            if (connect(sockfd, (struct sockaddr *)&dst_addr,
                        sizeof(dst_addr)) < 0) {
              perror("Fail to connect");
              return;
            }

            char dst_txt[INET_ADDRSTRLEN];
            char src_txt[INET_ADDRSTRLEN];

            inet_ntop(AF_INET, &addr.sin_addr, src_txt, sizeof(src_txt));
            printf("New conn from %s:%d\n", src_txt, ntohs(addr.sin_port));

            std::thread t1([&]() { splice_readfrom(sockfd, u_connfd); });
            std::thread t2([&]() { splice_readfrom(u_connfd, sockfd); });
            t1.join();
            t2.join();

            close(sockfd);
            close(u_connfd);
          },
          in, connfd);
      t.detach();
    }
  }

  return 0;
}

测试下来,和 Go 版本表现一样,也是被阻塞,不过现在问题就更清晰一些,
splice
的使用有问题。

于是仔细看了一下文档,里面有一个参数
SPLICE_F_NONBLOCK
,要不加上试一下看看,加上之后程序是正常运行的。

所以会是这个参数的问题?在 Go 的实现里面,
splice

flags
参数是为 0 的,也就是意味着是没有设置为非阻塞状态的。

想到我们之前的代理程序都没有出现这个情况,难道是 Go 版本的原因?于是使用 Go1.18 对
PROXY
进行编译运行,
正常运行!

看了两个版本的实现,果然 Go1.18 是含有这个
SPLICE_F_NONBLOCK
参数的,在之后的版本内被删除了;继续搜索,发现了有人提了个上面的 issue。

对代码追踪发现
受影响版本为 1.21.0 - 1.21.4

扩展分析

issue 里面 Go 的开发者说所有的 case 都正常能跑过,所以把这个参数删除了。既然开发者测试没有问题,但是实际使用又有问题,那就有可能是环境不一致导致的。

分析未在不同内核上splice表现不一致

在上面的排查过程中,我还把
PROXY
(Go1.21.1) 放到
A
中运行,代理至
192.168.32.245:22
上,表现也是正常的。经过测试,io.Copy 在不同的系统上的影响如下:

Kernel\Go 1.18.0 1.21.1
3.10 正常 不正常
6.1 正常 正常

1.18 是没有 BUG 的版本,也就是增加了
SPLICE_F_NONBLOCK
参数。那为何 1.21.1 版本没有增加这个参数的可以在
6.1
的内核上运行呢。

没有很好的头绪,难道是 pipe 导致的问题吗,pipe 太小了?于是调整 pipe 大小

fcntl(pipefd[0], F_SETPIPE_SZ, 1 << 20);
fcntl(pipefd[1], F_SETPIPE_SZ, 1 << 20);

使用 Go1.21.1 版本进行编译,并且进行测试,结果如下:

Kernel\Go 1.21.1
3.10 正常
6.1 正常


pipe 太小,那测试数据小于默认大小
65536
的看看会不会有问题

dd if=/dev/zero of=/tmp/1.txt bs=1 count=65536

测试结果如下:

测试数据大小 测试结果
65536 不正常
32768 不正常
25000 不正常
16384 正常

splice
还有一个参数
len
,为从 fd_in 到 pipe_w 中的字节数,如果我减少这个大小,那么结果会如何。测试下来
和调整 pipe 大小带来的结果相同


splice
在不同内核上表现的结果不同这个问题,可以缩小一些排查的范围了:和
pipe
相关

不同内核的 splice 实现

看代码之前确认要关注的点:
在哪里存在阻塞的动作

splice
实现位于
fs/splice.c
中,下面的代码取自 kernel-6.1(3.10 的内核代码也相似,主体逻辑没有变化)

SYSCALL_DEFINE6(splice, ...) // fs/splice.c
  -> __do_splice
    -> do_splice
      -> splice_file_to_pipe  // 将 sockfd 的数据传输至 pipe 中,走这条路径
        -> do_splice_to
          -> tcp_splice_read (in->f_op->splice_read) // net/ipv4/tcp.c
            -> __tcp_splice_read
              -> tcp_read_sock
                -> tcp_splice_data_recv
                  -> skb_splice_bits                 // net/core/ipv4/skbuff.c
                    -> splice_to_pipe                // fs/splice.c

经过 TCP 的读取,兜兜转转又回到
fs/splice.c
中。

kernel-6.1 的实现

在 kernel-6.1 的实现中,
spclie_to_pipe
的实现没有阻塞

ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
		       struct splice_pipe_desc *spd)
{
  // ....
	while (!pipe_full(head, tail, pipe->max_usage)) {
		struct pipe_buffer *buf = &pipe->bufs[head & mask];

		buf->page = spd->pages[page_nr];
		buf->offset = spd->partial[page_nr].offset;
		buf->len = spd->partial[page_nr].len;
		buf->private = spd->partial[page_nr].private;
		buf->ops = spd->ops;
		buf->flags = 0;

		head++;
		pipe->head = head;
		page_nr++;
		ret += buf->len;

		if (!--spd->nr_pages)
			break;
	}
	if (!ret)
		ret = -EAGAIN;

out:
	while (page_nr < spd_pages)
		spd->spd_release(spd, page_nr++);

	return ret;
}

向上回溯,在
splice_file_to_pipe
中,
wait_for_space
中如果
pipe
满了则进行等待
pipe_wait_writable(pipe)

long splice_file_to_pipe(struct file *in,
			 struct pipe_inode_info *opipe,
			 loff_t *offset,
			 size_t len, unsigned int flags)
{
	long ret;

	pipe_lock(opipe);
	ret = wait_for_space(opipe, flags);
	if (!ret)
		ret = do_splice_to(in, offset, opipe, len, flags);
	pipe_unlock(opipe);
	if (ret > 0)
		wakeup_pipe_readers(opipe);
	return ret;
}

static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
{
	for (;;) {
		if (unlikely(!pipe->readers)) {
			send_sig(SIGPIPE, current, 0);
			return -EPIPE;
		}
		if (!pipe_full(pipe->head, pipe->tail, pipe->max_usage))
			return 0;
		if (flags & SPLICE_F_NONBLOCK)
			return -EAGAIN;
		if (signal_pending(current))
			return -ERESTARTSYS;
		pipe_wait_writable(pipe);
	}
}

调整测试代码,对 pipe 只生产而不不消费数据。

ssize_t splice_readfrom(int dstfd, int srcfd) {
    ...
    ssize_t in_pipe = splice_drain(srcfd, pipefd[1], max);

    sleep(1);
    written += in_pipe;
    printf("+%ld written=%ld\n", in_pipe, written);
    continue;

    ssize_t n = splice_pump(pipefd[0], dstfd, in_pipe);
}

为避免 ssh 的元数据干扰,不再使用 sshd
127.0.0.1:22
作为最后点,转而写了一个
io.Discard
的 Go 服务。
测试客户端为
ncat -nv 192.168.32.251 10022 < /tmp/1.txt

在 Debian12(kernel-6.1) 上进行测试,结果如下

+65509 written=65509
+57344 written=122853
+49152 written=172005
+36864 written=208869
+28672 written=237541
+20480 written=258021
+16384 written=274405
+8192 written=282597
+4096 written=286693
// 之后阻塞

pipe 的大小为
65536(PAGE_SIZE * 16)
,但是写入的数据大于了 pipe 的缓冲区后,还能够继续写入,这点和可能和 skbuff/pipe 的 PAGE 有关,这里先跳过,直接测试一下在 CentOS7.2 上表现如何,结果直接阻塞,第一个
splice
都没有返回,好吧看看代码。

kernel-3.10 的实现

同样找到关键的
splice_to_pipe
函数

ssize_t splice_to_pipe(struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
{
  // ...
	for (;;) {
		if (pipe->nrbufs < pipe->buffers) {
			int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
			struct pipe_buffer *buf = pipe->bufs + newbuf;

			buf->page = spd->pages[page_nr];
			buf->offset = spd->partial[page_nr].offset;
			buf->len = spd->partial[page_nr].len;
			buf->private = spd->partial[page_nr].private;
			buf->ops = spd->ops;
			if (spd->flags & SPLICE_F_GIFT)
				buf->flags |= PIPE_BUF_FLAG_GIFT;

			pipe->nrbufs++;
			page_nr++;
			ret += buf->len;

			if (!--spd->nr_pages)
				break;
			if (pipe->nrbufs < pipe->buffers)
				continue;

			break;
		}

		if (spd->flags & SPLICE_F_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}

		pipe->waiting_writers++;
		pipe_wait(pipe);
		pipe->waiting_writers--;
	}
	return ret;
}

代码删除了和信号相关的逻辑,整个循环内的关键路径

  • if (!--spd->nr_pages)
    为数据页都被挂在 pipe 后退出循环
  • if (pipe->nrbufs < pipe->buffers)
    为 pipe 中还有空间则继续运行
  • if (spd->flags & SPLICE_F_NONBLOCK)
    为 pipe 没有空间但是设置了非阻塞,则直接返回
  • pipe_wait
    为数据没有读完,但是 pipe 已经没有空间则直接被挂起

在上面
分析未在不同内核上splice表现不一致
的结果中,可以看到 16K 的数据是能够返回的,数据的大小大一些就被阻塞了。

对比分析

kernel-6.1
对 splice 的实现相较
kernel-3.10
做了关键的两点变化:

  1. 提前做了
    pipe
    的空判断,这样数据挂载函数
    splice_to_pipe
    内部就不用进行阻塞了,而 3.10 将空判断和数据的转移放在一起做了
    static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
    {
    	for (;;) {
    		if (unlikely(!pipe->readers)) {
    			send_sig(SIGPIPE, current, 0);
    			return -EPIPE;
    		}
    		if (!pipe_full(pipe->head, pipe->tail, pipe->max_usage))
    			return 0;
    		if (flags & SPLICE_F_NONBLOCK)
    			return -EAGAIN;
    		if (signal_pending(current))
    			return -ERESTARTSYS;
    		pipe_wait_writable(pipe);
    	}
    }
    
  2. 限制了单次
    splice
    读取的大小
    static long do_splice_to(struct file *in, loff_t *ppos,
    			 struct pipe_inode_info *pipe, size_t len,
    			 unsigned int flags)
    {
    	/* Don't try to read more the pipe has space for. */
    	p_space = pipe->max_usage - pipe_occupancy(pipe->head, pipe->tail);
    	len = min_t(size_t, len, p_space << PAGE_SHIFT);
    
    	return in->f_op->splice_read(in, ppos, pipe, len, flags);
    }
    

结合代码和测试程序进行分析一下
kernel-3.10
里面的实现可能在
splice_to_pipe
中就被阻塞了,pipe 可容纳的空间小于 skbuff 中的数据
kernel-6.1
由于每次都会判断是否为空,只向 pipe 中写入可容纳的数据,所以只要有空间就不会被阻塞。

那么就遗留另外一个问题,pipe 的可容纳大小在不同版本内核上的不一样,和文档里面的 65536 都有一些明显出入,但是测试 pipe 的 write,则是准确的 65536. 据查资料得到的结论,fd -> pipe -> fd 这个过程只是 skbuff 的 PAGE 变化,内核不会再进行额外的内存分配。

上面的分析还需要通过调试来进行证明,那可以再写一篇文章通过kprobe分析 splice 了,这里再挖一个坑。

结论

这个问题只在低版本的内核上有问题,在高版本 Debian12 是正常的,在
Go1.21.5
中已经修复,建议使用
Go1.21.5
及以上的版本。

TODO

通过 kprobe 来分析 splice 下的 pipe 空间变化

参考