2024年11月

前言

对于一些较大的图形,会出现显示卡顿和渲染缓慢的问题,这时候就要使用到osgUtil::Simplifier简化器,来对其进行简化。


Demo

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


osgUtil

osgUtil库是osg的四大核心库之一,OSG 核心库提供了用于场景图形操作的核心场景图形功能、类和方法;开发3D图形程序所需的某些特定功能函数和编程接口,以及2D和3D文件I/O的OSG 插件入口。
在这里插入图片描述
osgUtil(OSG Utilities)模块提供了场景更新、裁剪、绘制、数据统计等公用工具。


osgUtil::Simplifier:简化几何体类

在这里插入图片描述

OsgUtil::Simplifier 是 OpenSceneGraph (OSG) 库中的一个工具类,用于简化几何图形。OpenSceneGraph 是一个高性能的场景图渲染引擎,用于图形应用程序的开发,比如可视化、模拟、虚拟现实和游戏等。OsgUtil::Simplifier 类提供了一种方法来减少场景中的多边形数量,这在需要优化渲染性能或减小场景文件大小时非常有用。


功能概述

OsgUtil::Simplifier 的主要功能是简化几何体,比如网格(mesh)。它通过移除一些不太重要的顶点或边来实现这一点,从而减少多边形的数量。简化的程度可以通过参数设置来控制,以达到在保持视觉质量的同时减少渲染负载的目的。


使用场景

  • 性能优化:在需要渲染大量几何体时,通过简化几何体可以减少GPU的负担,提高渲染速度。
  • 文件大小优化:简化后的几何体可以生成较小的场景文件,这对于网络传输或资源存储都是有益的。
  • 实时应用:在实时渲染应用中,如虚拟现实或增强现实,简化可以帮助保持帧率稳定,提供流畅的用户体验。


基本用法

使用 OsgUtil::Simplifier 通常涉及以下几个步骤:

  • 创建简化器实例:创建一个 OsgUtil::Simplifier 的实例。
  • 设置简化参数:通过配置简化器的参数(如目标多边形数量、误差容限等)来控制简化的程度。
  • 应用简化:将需要简化的几何体(如 osg::Geometry 或 osg::Node)传递给简化器进行处理。
  • 获取结果:简化器处理后会返回简化后的几何体,可以将其用于渲染。
    注意事项
  • 视觉质量:虽然简化可以提高性能,但过度简化可能会导致视觉质量的显著下降。因此,需要仔细调整简化参数以找到性能和质量的平衡点。
  • 数据完整性:简化过程中可能会修改原始数据,因此在使用前最好保留原始数据的备份。
  • 资源消耗:简化操作可能需要一定的计算资源,特别是在处理复杂的几何体时。因此,在实时应用中需要谨慎使用,以避免引入额外的延迟。
    OsgUtil::Simplifier 是 OpenSceneGraph 提供的一个强大工具,可以帮助开发者在不影响视觉体验的前提下,优化渲染性能和资源使用。


osgUtil::Simplifier使用步骤


步骤一:引入头文件

在这里插入图片描述

#include <osgUtil/Simplifier>


步骤二:创建实例,同步设置简化参数

在这里插入图片描述

// 步骤二:创建实例
double sampleRatio = 0.5f;
double maximumError = 4.0f;
double maximumLength = 0.0f;
osgUtil::Simplifier simplifier = osgUtil::Simplifier(sampleRatio, maximumError, maximumLength);


步骤三:应用简化到几何体

在这里插入图片描述

// 步骤三:设置简化参数
osg::ref_ptr<osg::Node> pNode2 = (osg::Node*)pNode->clone(osg::CopyOp::DEEP_COPY_ALL);
// 接收报错:terminate called after throwing an instance of 'std::bad_alloc' what(): std::bad_alloc
// 模型太大,简化类型时出现内存分配错误
pNode2->accept(simplifier);


步骤四:使用简化后的几何体

在这里插入图片描述

// 移动下,方便对比
osg::ref_ptr<osg::MatrixTransform> pMatrixTransform = new osg::MatrixTransform();
pMatrixTransform->addChild(pNode2);
osg::Matrix matrix = pMatrixTransform->getMatrix();
matrix = matrix * osg::Matrix::translate(0, 3.5, 0);
pMatrixTransform->setMatrix(matrix);
pGroup->addChild(pMatrixTransform);


Demo源码

osg::ref_ptr<osg::Node> OsgWidget::getSimplifierNode()
{
// 其他demo的控件
updateControlVisible(false);

osg::ref_ptr<osg::Group> pGroup = new osg::Group();

// 加载支持stl格式插件
// osgDB::Registry::instance()->addFileExtensionAlias(".stl", "stl");

// 加载模型
{
osg::ref_ptr<osg::Node> pNode;
QString filePath = "T:/CVN76.STL";

// QString filePath = "cow.osg";
pNode = osgDB::readNodeFile(filePath.toStdString());
if(!pNode.get())
{
LOG << "Failed to openFile:" << filePath;
}

pGroup->addChild(pNode);

#if 1
// 对模型进行简化
// 步骤一:添加头文件
// #include <osgUtil/Simplifier>
// 步骤二:创建实例
// double sampleRatio = 0.5f;
// double sampleRatio = 0.3f;
double sampleRatio = 0.1f;
double maximumError = 4.0f;
double maximumLength = 0.0f;
osgUtil::Simplifier simplifier = osgUtil::Simplifier(sampleRatio, maximumError, maximumLength);
// 步骤三:设置简化参数
osg::ref_ptr<osg::Node> pNode2 = (osg::Node*)pNode->clone(osg::CopyOp::DEEP_COPY_ALL);
// 接收报错:terminate called after throwing an instance of 'std::bad_alloc' what(): std::bad_alloc
// 模型太大,简化类型时出现内存分配错误
pNode2->accept(simplifier);

// 移动下,方便对比
osg::ref_ptr<osg::MatrixTransform> pMatrixTransform = new osg::MatrixTransform();
pMatrixTransform->addChild(pNode2);
osg::Matrix matrix = pMatrixTransform->getMatrix();
matrix = matrix * osg::Matrix::translate(0, 3.5, 0);
pMatrixTransform->setMatrix(matrix);

pGroup->addChild(pMatrixTransform);
#endif
}

return pGroup.get();
}


工程模板v1.37.0

在这里插入图片描述


入坑


入坑一:应用简化器的时候崩溃


问题

应用简化器的时候崩溃
在这里插入图片描述


原因

是模型太大,简化失败, 因为测试了其他图形是可以的:
在这里插入图片描述

崩溃时的cpu、内存和gpu占用率:
在这里插入图片描述

看到其保存的文件达到300MB:
在这里插入图片描述


解决

可尝试加大内存,因为笔者是mingw32版本的,没有进行研究了,具体参照文章《关于 Qt运行加载内存较大崩溃添加扩大运行内存 的解决方法》

参考资料

本篇内容主要参考 韦东山的《嵌入式Linux应用开发完全手册V5.2_IMX6ULL_Pro开发板.pdf》

具体课程见
百问网嵌入式专家-韦东山嵌入式专注于嵌入式课程及硬件研发

实践环境为百问网官方开发板
100ASK_IMX6ULL-Pro

目标系统组成

Linux系统启动流程

一个有效的根文件系统集成了
第三方和内部的所有软件组件。涉及组件的下载、提取、配置、编译和安装,并可能修复问题和调整配置文件。

  • 一个基本的根文件系统至少需要


    • 传统的目录层次结构,包括/bin、/etc、/lib、/root、/usr/bin、/usr/lib、/usr/share、/usr/sbin、/var、/sbin。
    • 一套基本实用程序,至少提供 init 程序、shell 和其他传统的 UNIX 命令行工具。这通常由 BusyBox 提供。
    • 安装在/lib 中的 C 库和相关库(线程库、数学库等)。一些配置文件,如/etc/inittab,以及初始化脚本/etc/init.d。
  • 在大多数嵌入式 Linux 系统通用的基础之上,可以添加第三方或内部组件。

几种解决方案:

  • 手动操作
  • 系统构建工具
  • 发行版或现成的文件系统

组件构建工具

  • Makefile
  • autotools
  • CMake

系统构建工具

可以使用不同的工具来自动化构建目标系统的过程,包括内核,有时还包括工具链。

  • 以正确的顺序自动下载、配置、编译和安装所有组件,有时在应用补丁修复交叉编译问题之后。
  • 已经支持大量的包,应该符合您的主要要求,并且易于扩展。
  • 构建变得可重现,这允许轻松更改某些组件的配置、升级它们、修复错误等。

常见的系统构建工具

  • Buildroot,由社区开发,
    https://buildroot.org


    • 项目主要特点


      • 制作启动映像
      • 从源代码构建所有组件
      • 注重简单
    • 支持:根文件系统映像、内核、引导加载程序、工具链的自由组合

  • OpenWRT,最初是无线路由器 Buildroot 的一个分支,现在是一个更通用的项目,
    https://openwrt.org

  • OpenEmbedded,更灵活但也更复杂,
    http://www.openembedded.org
    及其工业化版本 Yocto 项目。

发行版Linux系统

Linux系统

  • 一种自由和开放源码的类UNIX操作系统,内核创建于1991-10-5,加上用户空间和应用程序之后成为Linux系统。

  • 遵循 GNU通用公共许可证(GPL),任何个人和机构都可以自由使用,修改和再发布。


    • GNU 是一个自由的操作系统,其内容软件完全以GPL方式发布(编译套件 GCC,C库 glibc,核心工具组 coreutils,调试器 GDB等)

发行版Linux
:Linux distribution(GNU/Linux 发行版,为一般用户预先集成好Linux操作系统和各种应用软件,以软件包管理系统进行应用管理)

  • 商业发行版本:Ubuntu(支持x86,arm等不同的处理器架构),Red Hat(主要支持x86),SUSE
  • 社区发行版本:Debain,Fedora, Arch
  • 国内衍生Linux发行版本:Deepin,优麒麟

嵌入式Linux系统

嵌入式Linux系统框架

嵌入式Linux系统构建

开发环境搭建

基础软件安装

百问网提供了 Ubuntu 配置命令脚本,支持一键下载安装

git clone https://e.coding.net/weidongshan/DevelopmentEnvConf.git

cd DevelopmentEnvConf

sudo ./Configuring_ubuntu.sh

配置交叉编译工具链

工具链:一组编程工具,用于开发软件,创建软件产品。包括编译器和链接器,C库,调试器,头文件,二进制实用程序等等

交叉编译工具链:一组工具,用来将源代码构建为可以运行在其他平台的二进制代码,将构建环境和目标环境分隔开(不同的CPU架构,ABI,OS,Clib)

SDK(software development kit):一个更广泛的集合,除了工具链外,还包括为 Target 目标架构构建的库、头文件,以及示例代码、文档、开发指南等资源。

交叉编译构建过程:

  • Build(构建机器),使用 GCC 的源码,制作交叉编译工具链。
  • Host(主机),使用交叉编译工具链,编译出程序。
  • Target(目标机器),程序执行的地方。

本地工具链:build == host == target。

交叉编译工具链:build == host!= target

系统定义描述了一个系统:CPU 架构、芯片厂商、操作系统、ABI、C 库。

  • <arch>-<vendor>-<os>-<libc/abi>(完整名称);
  • <arch>-<os>-<libc/abi>。

arm-foo-none-eabi
,针对 ARM 架构的裸机工具链,来自供应商 foo。

arm-unknown-linux-gnueabihf
,针对 ARM 架构的 Linux 工具链,使用来自未知供应商的 EABlhf ABI 和 glibc C 库。

armeb-linux-uclibcgnueabi
,针对 ARM big-endian 的 Linux 工具链架构,使用 EABl ABI 和 uClibc C 库。

下载100ask_imx6ull 开发板的 BSP:

git clone https://e.coding.net/codebug8/repo.git

mkdir -p 100ask_imx6ull-sdk && cd 100ask_imx6ull-sdk

../repo/repo init -u https://gitee.com/weidongshan/manifests.git -b linux-sdk -m imx6ull/100ask_imx6ull_linux4.9.88_release.xml --no-repo-verify

../repo/repo sync -j4

交叉编译工具链配置

交叉工具链的主要内容

配置过程主要是设置 PATH, ARCH 和 CROSS_COMPILE 三个环境变量:

export ARCH=arm
export CROSS_COMPILE=arm-buildroot-linux-gnueabihf-
export PATH=$PATH:/home/book/100ask_imx6ull-sdk/ToolChain/arm-buildroot-linux-gnueabihf_sdk-buildroot/bin
  • 永久生效:修改用户配置文件
    vim ~/.bashrc
  • 临时生效:直接执行命令,只对当前终端有效

系统构建

Bootloader: Uboot

Bootloader 是在操作系统运行之前运行的一段代码,用于引导操作系统,并支持下载和调试。

U-Boot 是一个开源的主引导加载程序,并含有多种命令以便调试系统。它适用于多种计算机体系结构,包括 ARM,RISC-V 和 x86。

不同的开发板对应不同的配置文件,配置文件位于 u-boot 源码的“configs/”目录。

对于 IMX6ULL Pro 版,u-boot 的编译过程如下(编译 uboot前必须先配置好工具链等开发环境):

cd /home/book/100ask_imx6ull-sdk/Uboot-2017.03
make distclean
make mx6ull_14x14_evk_defconfig 
make

编译完成之后生成 u-boot-dtb.imx,可以烧在 TF 卡、EMMC 上

# 将u-boot-dtb.imx 文件烧写到 EMMC 上:
echo 0 > /sys/block/mmcblk1boot0/force_ro 
dd if=u-boot-dtb.imx of=/dev/mmcblk1boot0 bs=512 seek=2 
echo 1 > /sys/block/mmcblk1boot0/force_ro

Linux Kernel 和模块

编译驱动程序之前要先编译内核:

  • 驱动程序要用到内核文件
  • 编译驱动时用的内核、开发板上运行到内核,要保持一致
  • 同理,更换板子上的内核后,板子上的其他驱动也要更换

流程说明

  1. 获取配套的交叉编译工具链


    • SOC原厂提供:NXP ST Rockchip Amlogic Allwinnertech等。
    • 社区下载:Linrao Debian ARM Bootlin
  2. 下载kernel源码


  3. Host下配置开发环境


    • 安装必要依赖包
    • 解压配置合适的工具链
  4. 指定编译板子配置文件

    make BOARDNAME_defconfig

  5. 编译


    • 编译内核镜像
      make -jN
    • 编译设备树
      make dtbs
    • 编译安装模块驱动
      make modules

编译内核

源码镜像

  • vmlinux,未压缩的原始内核映像,ELF 格式,用于调试目的,但无法启动。

  • arch/
    /boot/*image,是最终可以启动的压缩后的内核镜像文件:


    • bzImage for x86,
    • zImage for ARM,
    • Image.gz for RISC-V,
    • vmlinux.bin.gz for ARC等。
  • arch/
    /boot/Image,也可以引导的未压缩内核映像。

  • arch/
    /boot/dts/*.dtb,编译的设备树文件(某些架构)。

  • 所有内核模块,分布在内核源代码树中,作为 .ko(内核目标)文件。

配置编译环境
:指定架构和编译器,以及单板的配置文件

不同的开发板对应不同的配置文件,配置文件位于内核源码 arch/arm/configs/ 目录。

IMX6ULL_Pro开发板 kernel 的编译过程如下:

cd /home/book/100ask_imx6ull-sdk/Linux-4.9.88
make mrproper
make 100ask_imx6ull_defconfig
make zImage -j8
make dtbs

编译完成后,在 arch/arm/boot 目录下生成 zImage 内核文件, 在 arch/arm/boot/dts 目录下生成设备树的二进制文件 100ask_imx6ull14x14.dtb

编译安装内核模块

  • 使用内核模块可以支持更多不同的设备外设。
  • 模块使无需重启即可轻松开发驱动程序:加载、测试、卸载、重建、加载...
  • 有助于将内核映像大小保持在最小(在 PC 的 GNU/Linux 发行版中必不可少)。
  • 对于减少启动时间也很有用:您无需花时间初始化稍后才需要的设备和内核功能。

注意:一旦加载,在系统中拥有完全控制权和权限。没有特别的保护。这就是为什么只有 root 用户才能加载和卸载模块。

编译:

cd ~/100ask_imx6ull-sdk/Linux-4.9.88/
make modles

安装内核模块到Ubuntu内的某一个目录备用:

cd ~/100ask_imx6ull-sdk/Linux-4.9.88/
make ARCH=arm INSTALL_MOD_PATH=/home/book/nfs_rootfs modules_install

编译设备树

  • 目标单板所需的硬件设备信息。
  • 一般用于嵌入式设备。

设备树文件要与目标单板配套使用。一般和内核镜像存放同一位置。

内核源码目录下执行
make dtbs

安装内核和模块到开发板

mount -t nfs -o nolock,vers=3 192.168.5.11:/home/book/nfs_rootfs /mnt
cp /mnt/zImage /boot
cp /mnt/100ask_imx6ull-14x14.dtb /boot
cp /mnt/lib/modules /lib -rfd

替换对应的目录中文件,重启开发板即可完成更新 zImage、dtb、模块

完整的系统

根文件系统与 busybox

Linux 系统需要一组基本的程序才能工作,包括一个 init 程序、一个 shell 以及用于文件操作和系统配置的各种基本实用程序。在普通的 GNU/Linux 系统中,这些程序由不同的项目提供。

Busybox 本身包含了很多 Linux 命令,但是要编译其他程序或者某些依赖库,需要手工下载、编译。 如果想做一个极简的文件系统,可以使用 Busybox 手工制作。

文件系统是一套实现了数据的存储、分级组织、访问和获取等操作的抽象数据类型(Abstract data type)

  • 数据存取


    • 使用硬盘和光盘这样的存储设备,并维护文件在设备中的物理位置
    • 通过网络协议(如 NFS、SMB 等)提供的或者暂存于内存上
    • 虚拟文件(如 proc 文件系统)
  • 用户访问管理


    • 不必关心数据实际保存在硬盘(或者光盘)的地址为多少的数据块上,只需要记住这个文件的所属目录和文件名
    • 不必关心硬盘上的那个块地址没有被使用,硬盘上的存储空间管理(分配和释放)功能由文件系统自动完成

根文件系统:被挂载在特定层次结构的 root 位置,由“/”标识

  • mount 和 umount 是程序,是文件系统中的文件。所以在安装至少一个文件系统之前无法使用
  • 不能用普通的挂载命令挂载,由内核直接挂载,根据“root=”内核选项进行设置。当没有可用的根文件系统时,内核会崩溃
  • 支持从不同位置挂载,包括存储设备(硬盘,SD卡等),NFS和内存。

busybox 在根文件系统:

Buildroot

Buildroot 是一组 Makefile 和补丁,可自动化地为嵌入式系统构建完整的、可启动的 Linux 环境(包括 bootloader、Linux 内核、包含各种 APP的文件系统)。

  • 可以自动构建所需的交叉编译工具链,创建根文件系统,编译 Linux 内核映像,并生成引导加载程序用于目标嵌入式系统。并支持所有步骤的任何独立组合。
  • 使用简单:类似内核的menuconfig
  • 支持大量的实用软件包,比如 QT等

构建说明

  • 所有的构建都会输出到顶层目录下的 output/目录内。
    O = output
    。另外也支持 out-of-tree 构建。
  • 配置文件作为.config 存储在顶级 Buildroot 源目录中。
    CONFIG_DIR = $(TOPDIR),TOPDIR = $(shell pwd)

IMX6ULL_Pro 编译过程

cd ~/100ask_imx6ull-sdk/Buildroot_2020.02.x

#选择配置界面
make menuconfig

# 单独编译内核
make linux-rebuild

# 进入内核配置选项
make linux-menuconfig

# 单独编译 u-boot
make uboot-rebuild

# 单独编译某个软件包
make <pkg>-rebuild

# 进入 busybox 配置选项
make busy-box-menuconfig

# 生成系统 sdk
make sdk

配置文件说明

编译系统

cd /home/book/100ask_imx6ull-sdk/Buildroot_2020.02.x
make clean
make 100ask_imx6ull_pro_ddr512m_systemV_qt5_defconfig
make all -j4

镜像文件说明

buildroot2020.02.x/output/images/

  • 100ask_imx6ull-14x14.dtb
    【设备树文件】
  • rootfs.ext2
  • rootfs.ext4
  • rootfs.tar
  • rootfs.tar.bz2
    【打包并压缩的根文件系统,用于NFSROOT启动】
  • 100ask-imx6ull-pro-512d-systemv-v1.img
    【完整的系统镜像,用于烧写EMMC和SD卡】
  • uboot-dtb.imx
    【Uboot镜像】
  • zImage
    【内核镜像】

系统烧录

USB 模式烧录

使用 USB 烧写工具:

使用命令行

  • 执行脚本命令


    • sudo ./bin/uuu scripts/basic/emmc/write_all.clst #烧写整个系统
  • 对于裸机程序,名字各有不同,没有提供固定的脚本


    • sudo ./bin/uuu -b emmc(or sd) firmware/u-boot-dtb_fastboot_100ask.imx files/led.imx #把 led.imx 烧到 EMMC

在开发板上直接烧写

烧写 u-boot

  • 将uboot镜像u-boot-dtb.imx拷贝到开发板根目录


    • 烧写 EMMC

      echo 0 > /sys/block/mmcblk1boot0/force_ro 					//取消此分区的只读保护 
      dd if=u-boot-dtb.imx of=/dev/mmcblk1boot0 bs=512 seek=2 	//实际烧写命令 
      echo 1 > /sys/block/mmcblk1boot0/force_ro 					//打开此分区的只读保护
      
    • 烧写 SD/TF

      dd if=u-boot-dtb.imx of=/dev/mmcblk0 bs=512 seek=2

更新内核或设备树

开发板使用的内核名为 zImage,设备树名为 100ask_imx6ull-14x14.dtb。 保存在开发板的/boot 目录中,只要替换/boot 目录下的文件后重启即可完成更新

烧写 SD/TF 卡

Windows SD/TF 卡烧录工具

格式化:

烧写镜像:

Ubuntu下使用命令行烧录 SD/TF 卡

在 Ubuntu 下可以更精细地操作 SD/TF 卡:把 sdcoard.img 整个烧写到卡上,单独烧写 u-boot 到卡上,甚至挂接卡上的文件系统后单独更新里面的文件。

  • 识别 SD/TF 卡:使用 dmesg 命令获取设备挂载的设备节点

  • 更新整个系统镜像:使用 dd 命令烧写 sdcard.img 镜像文件到 /dev/sdb 设备


    • sudo dd if=sdcard.img of=/dev/sdb
  • 只更新卡上的 u-boot:使用 dd 命令烧写 uboot的imx 镜像文件到 /dev/sdb 设备


    • sudo dd if=u-boot-dtb.imx of=/dev/sdb bs=1k seek=1 conv=fsync
  • 更新 SD/TF 卡中的内核和设备树


    • 对于曾经烧写过的 SD/TF 卡,上面已经有分区。使用 vmware 连接 TF 卡设 备后,Ubuntu 系统系统会自动挂载 tf 卡内的分区文件系统


      • 执行
        df -h
        命令找到对应目录,将其中的内核镜像拷贝到开发板 /boot 目录后重启

数据抽取平台pydatax,前期项目做过介绍:

1,
数据抽取平台pydatax介绍--实现和项目使用

项目2: 客户有9个分公司,用的ERP有9套,有9个库,不同版本,抽取的同一个表字段长度有不一样,字段可能有多有少,客户ERP核心分公司ERP几个月后有大版本升级。

在2023年12月,当时做这个抽取时,客户只是做一个分公司的,抽取9套其中的一套ERP的Oracle库,当时考虑是否把这9个库的都抽取了,免得后面客户要做,浪费时间再做一遍,太麻烦,问了项目负责人也没说是要把这9个库都抽取过来,没说要做!
到底怎么处理?

想想早晚要做,还是先都抽取了,把他的8家公司的Oracle数据库都抽取过来,各个公司的数据,9张表合并成1张,在表加租户id字段区分:tenant_id 确定是那个公司的,把这个大数据抽取,数据指标计算完成。

到了2024年7月份,客户突然说后续指标计算暂停,要把其他的子公司的数据和计算也加进去,这半年又新参股合并了3家公司,原先有一家去掉。一共要新加7+3=10家公司,为支持该业务需要,pydatax的修改过程如下:

1,JSON抽取模板修改

oracle_gp_table_df_job.json文件加上新加的3个分公司,再原有的json去掉一家公司:

{"querySql": [  "select SYSdate as etl_create_time,SYSdate as etl_update_time, 'wflsy' as tenant_id,${src_table_columns_wflsy} from wflsy.${src_table_name} ${relation} where ${condition}"],"jdbcUrl": [ "jdbc:oracle:thin:@10.0.1.206:1521:erp"]
},
{
"querySql": [ "select SYSdate as etl_create_time,SYSdate as etl_update_time, 'ky' as tenant_id, ${src_table_columns_ky} from ky.${src_table_name} ${relation} where ${condition}"],"jdbcUrl": [ "jdbc:oracle:thin:@10.0.1.206:1521:erp"]
},
{
"querySql": [ "select SYSdate as etl_create_time,SYSdate as etl_update_time, 'wfjy' as tenant_id, ${src_table_columns_wfjy} from wfjy.${src_table_name} ${relation} where ${condition}"],"jdbcUrl": [ "jdbc:oracle:thin:@10.0.1.206:1521:erp"]
},

2,Python程序修改

以上加上后,同时要在pydatax.py和vprepair.py的脚本上加上3个变量,传递给datax的json模板, 这样每天抽取任务的全量和增量3家新公司数据生效。

src_table_columns_wflsy=get_org_src_columns(src_table_columns,"WFLSY",src_table_name)
src_table_columns_ky=get_org_src_columns(src_table_columns,"KY",src_table_name)
src_table_columns_wfjy=get_org_src_columns(src_table_columns,"WFJY",src_table_name)
"-Dsrc_table_columns_std='" + src_table_columns_std + "'"\"-Dsrc_table_columns_ky='" + src_table_columns_ky + "'"\"-Dsrc_table_columns_wflsy='" + src_table_columns_wflsy + "'"\"-Dsrc_table_columns_wfjy='" + src_table_columns_wfjy + "'"\"-Ddes_table_columns='"+des_table_columns+"' \""

这样整个每天11家公司的表全量数据和表增量数据就正常的,全量同步的表数据是每天全量,不用处理,
但增量表历史数据怎么抽取过来?

3,增量表历史数据处理

1,将1中的3个Json单独编写成1个json模板,模板的"preSql"中的“truncate table 表“ 数据不再使用。

2,新模板json文件放到对应文件夹下,写一条数据到datax_json中

3,是增量的历史表datax_config的数据copy一份到datax_config_repair中,并将json_id和2中的一致,只按2的新模板抽取数据

4,执行vprepair.py后,数据同步到临时表stg表,再执行select sp_stg();

4, 特殊表处理

商品表,有家公司的商品表,同一个商品有多个,直接抽取商品表,会在该公司有多条数据,客户做了个视图过滤成1条,说抽取这个视图数据就可以,但pydatax的11个库的表名要必须一致,
怎么处理?


想想也可以实现,就是对这个商品表单独配置一个抽取JSON模板,如下:

dbo.base_${src_table_name} ,当然也可以把这个表名写死,也可以使用。在datax_config表的json_id配置成新的模板,就可以实现单独表的特殊处理。

"querySql": [  "select SYSdate as etl_create_time,SYSdate as etl_update_time, 'zb' as tenant_id,${src_table_columns} from dbo.base_${src_table_name} ${relation} where ${condition}"],"jdbcUrl": [ "jdbc:oracle:thin:@192.168.0.17:1521/erpdg"]
},

修改的文件放在压缩包的”11库抽取”文件夹下:

pydatax
源码下载地址:

1,
https://files.cnblogs.com/files/zping/pydatax.rar

总结:


1,pydatax的灵活性在此项目得到很好的验证

2,其简单的修改就可以快速满足客户的需求

本文详细介绍了在使用asyncio库编写异步程序时常见的错误和问题,并进一步通过实践案例进行分析和讨论,以便在项目中更有效地应用asyncio库。有关asyncio库的详细介绍,可参考:
Python 异步编程库 asyncio 使用指北

1 asyncio程序的常见错误

本节展示了在使用asyncio模块时,开发人员常遇到的一些常见错误示例。以下是四个最常见的异步编程错误:

  1. 直接调用并运行协程。
  2. 主协程过早退出。
  3. 错误使用asyncio的低级API。
  4. 程序出现竞争条件或死锁问题。

1.1 试图直接调用并运行协程

协程通常通过
async def
定义,如下所示:

# 自定义协程
async def custom_coro():
    print('hi there')

若直接像函数一样调用该协程,通常不会执行预期的操作,而是创建一个协程对象。这种调用方式不会触发协程的执行:

# 错误:像函数一样调用协程
custom_coro()  # 这只是创建了一个协程对象,并不会执行

此时,返回的是一个协程对象,而不是立即执行协程主体,这忽略协程必须在事件循环中运行。如果协程未被执行,系统将发出以下运行时警告:

sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited

要正确执行协程,需要在
asyncio
事件循环中等待该对象。例如,使用
asyncio.run()
启动事件循环来执行协程:

# 正确:通过 asyncio.run() 运行协程
import asyncio

asyncio.run(custom_coro()) 

另一种执行协程方法是通过
await
表达式在现有协程中挂起并调度其他协程。例如,定义一个新的协程,在其中调用
custom_coro()

# 正确:在协程中使用 await 调度另一个协程
async def main():
    await custom_coro() 

# 使用 asyncio.run 启动事件循环
asyncio.run(main()) 

1.2 主协程过早退出

在异步编程中,任务的执行可能无法按预期及时完成。通过
asyncio.create_task()
可以并行运行多个协程,但如果主协程提前退出,这些任务可能会被强制中止。为确保所有任务能够在主协程退出前完成,主协程应在无其他活动时显式等待剩余任务的完成。可以使用
asyncio.all_tasks()
来获取当前事件循环中的所有任务,并在移除主协程本身后,通过
asyncio.wait()
等待其他任务的执行结果。如果不移除当前协程,
asyncio.wait
会等待所有任务完成,包括当前协程,从而导致程序不退出(死锁)。示例如下:

import asyncio

async def task_1():
    print("任务 1 开始")
    await asyncio.sleep(2)
    print("任务 1 完成")

async def task_2():
    print("任务 2 开始")
    await asyncio.sleep(1)
    print("任务 2 完成")

async def main():
    # 创建多个任务
    task1 = asyncio.create_task(task_1())
    task2 = asyncio.create_task(task_2())
    
    # 获取所有正在运行的任务的集合
    all_tasks = asyncio.all_tasks()
    
    # 获取当前任务(即主协程)
    current_task = asyncio.current_task()
    
    # 从所有任务列表中删除当前任务
    all_tasks.remove(current_task)
    
    # 暂停直到所有任务完成
    await asyncio.wait(all_tasks)

# 运行主协程
asyncio.run(main())

代码运行结果为:

任务 1 开始
任务 2 开始
任务 2 完成
任务 1 完成

1.3 错误使用asyncio的低级API

asyncio
提供了两类API:一类是面向应用程序开发者的高级API,另一类是面向框架开发者的低级API。低级API主要为高级API提供底层支持,如事件循环、传输协议等内部结构。在大多数情况下,推荐优先使用高级API,特别是在学习阶段。只有在需要实现特定功能时,才应考虑使用低级API。尽管学习低级API具有一定的价值,但不应在刚开始时就使用。建议先通过高级API熟悉异步编程的基本概念,进行应用开发,掌握核心知识后,再深入探讨技术细节。例如:

import asyncio

# 高级API:推荐的用法
async def hello_world():
    print("你好,世界!")

# 使用 asyncio.run 来启动事件循环
def run_hello_world():
    asyncio.run(hello_world())

# 低级API:不推荐直接使用
async def low_level_example():
    loop = asyncio.get_event_loop()  # 获取当前事件循环
    task = loop.create_task(hello_world())  # 创建任务
    await task  # 显式等待任务完成

# 运行高级 API 示例
print("使用 asyncio.run 运行:")
run_hello_world()

# 运行低级 API 示例
print("\n使用低级 API 运行:")
asyncio.run(low_level_example())

1.4 程序出现竞争条件或死锁问题

竞争条件和死锁是并发编程中常见的错误。竞争条件发生在多个任务同时访问相同资源时,缺乏适当的控制可能导致数据错误或丢失。死锁则是指不同任务互相等待对方释放资源,最终导致所有任务无法继续执行。

许多Python开发者认为,使用
asyncio
协程可以避免这些问题,因为在任何时刻,事件循环中只有一个协程在执行。然而,协程在运行过程中可能会暂停和恢复,并且可能会访问共享资源。如果对这些资源没有适当的保护,就可能会引发竞争条件。此外,在协程同步资源时处理不当,也有可能导致死锁。因此,在编写
asyncio
程序时,确保协程的安全性至关重要。

1.4.1 竞争条件问题

以下示例代码模拟了两个异步任务并行增加共享变量
counter
,每个任务循环10000次对
counter
进行递增操作。通过
awaitasyncio.sleep(0)
来模拟上下文切换,确保两个任务能够交替执行。然而,由于未使用同步机制(如锁),会导致竞态条件。因此,最终的
counter
值可能小于预期的20000,而不是20000,因为两个任务可能在读取和更新
counter
的值时发生冲突,导致多个协程可能重复更新相同的数据:

import asyncio

# 共享资源
counter = 0

async def increment():
    global counter
    for _ in range(10000):
        temp = counter
        temp += 1
        await asyncio.sleep(0)  # 让出控制权,模拟上下文切换
        counter = temp

async def main():
    tasks = [increment(), increment()]
    await asyncio.gather(*tasks)
    print("最终计数器的值:", counter)  

# 运行 asyncio 程序
asyncio.run(main())

代码运行结果为:

最终计数器的值: 10000

为了解决这个问题,可以使用
asyncio.Lock
来同步对共享资源
counter
的访问。然而,由于
asyncio.Lock

asyncio.run
之间的事件循环可能不匹配,通常会在某些环境中(如特定的 IDE 或脚本运行环境)出现问题。原因在于
asyncio.run
创建并管理一个新的事件循环,而锁 (
asyncio.Lock
) 可能会被不同的事件循环使用,从而导致不一致。为避免这种情况,可以显式创建并使用一个事件循环,如下所示:

import asyncio

# 共享资源
counter = 0
# 创建锁
lock = asyncio.Lock()

async def increment():
    global counter
    for _ in range(10000):
        async with lock:  # 确保在修改 counter 时,只有一个任务可以访问
            temp = counter
            temp += 1
            await asyncio.sleep(0)  # 让出控制权,模拟上下文切换
            counter = temp

async def main():
    tasks = [increment(), increment()]
    await asyncio.gather(*tasks)
    print("最终计数器的值:", counter)

# 显式创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

代码运行结果为:

最终计数器的值: 20000

1.4.2 死锁问题

死锁介绍

死锁(Deadlock)是并发编程中的一种常见问题,它发生在多个任务之间的资源争用中,导致所有任务都陷入无法继续执行的僵局。即使在Python中使用
asyncio
协程框架,资源竞争和同步问题也可能导致死锁的发生,尤其是在协程需要同步资源(如锁)时。如果同步机制设计不当,容易引发死锁。

死锁的特征如下:

  • 循环等待:多个任务之间相互等待对方释放资源,从而形成一个循环等待的关系。例如,任务1等待任务2释放资源,而任务2又在等待任务1释放资源,形成闭环。
  • 不可抢占:每个任务持有的资源(如锁)不能被其他任务强制抢占。只有在任务主动释放资源时,其他任务才能获取该资源。
  • 持有资源且等待:任务持有某些资源(如锁),同时又在等待其他资源的释放。由于任务在持有资源的情况下无法继续执行,导致系统中的任务无法前进。

以下代码中的死锁是典型的循环等待问题,所有相关任务陷入相互等待的死循环,无法继续执行:

import asyncio

# 创建两个共享锁
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task1():
    print("任务1:尝试获取锁1")
    await lock1.acquire()  # 获取锁1
    print("任务1:已获取锁1,尝试获取锁2")
    await asyncio.sleep(1)  # 模拟一些操作
    await lock2.acquire()  # 获取锁2
    print("任务1:已获取锁2")
    
    # 释放锁
    lock1.release()
    lock2.release()

async def task2():
    print("任务2:尝试获取锁2")
    await lock2.acquire()  # 获取锁2
    print("任务2:已获取锁2,尝试获取锁1")
    await asyncio.sleep(1)  # 模拟一些操作
    await lock1.acquire()  # 获取锁1
    print("任务2:已获取锁1")
    
    # 释放锁
    lock1.release()
    lock2.release()

async def main():
    # 启动两个任务
    await asyncio.gather(task1(), task2())

# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

代码运行结果如下,由于两个任务都被挂起,程序无法退出,且永远不会打印出"任务1:已获取锁2"或"任务2:已获取锁1":

任务1:尝试获取锁1
任务1:已获取锁1,尝试获取锁2
任务2:尝试获取锁2
任务2:已获取锁2,尝试获取锁1
...

asyncio中死锁的避免

在使用
asyncio
时,为了避免死锁,可以采取以下几种方法:

  1. 锁的顺序管理:确保所有任务按照相同的顺序获取锁,以防止发生相互等待的情况。
  2. 尝试获取锁:使用
    asyncio.Lock

    acquire
    方法并设置超时时间,避免任务长时间处于等待锁的状态。
  3. 使用
    async with
    :通过
    async with
    语句来管理锁,这样可以确保在任务完成后自动释放锁,避免因忘记释放锁而引发问题。

根据这一思路,前面死锁的案例解决示例代码如下:

import asyncio

# 创建两个共享锁
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task1():
    print("任务1:尝试获取锁1")
    async with lock1:  # 使用async with获取锁,自动释放
        print("任务1:已获取锁1,尝试获取锁2")
        await asyncio.sleep(1)  # 模拟一些操作
        
        print("任务1:尝试获取锁2")
        async with lock2:  # 使用async with获取锁,自动释放
            print("任务1:已获取锁2")

async def task2():
    print("任务2:尝试获取锁1")
    async with lock1:  # 使用async with获取锁,自动释放
        print("任务2:已获取锁1,尝试获取锁2")
        await asyncio.sleep(1)  # 模拟一些操作
        
        print("任务2:尝试获取锁2")
        async with lock2:  # 使用async with获取锁,自动释放
            print("任务2:已获取锁2")

async def main():
    # 启动两个任务
    await asyncio.gather(task1(), task2())

# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

代码运行结果如下,可以看到两个任务避免了死锁:

任务1:尝试获取锁1
任务1:已获取锁1,尝试获取锁2
任务2:尝试获取锁1
任务1:尝试获取锁2
任务1:已获取锁2
任务2:已获取锁1,尝试获取锁2
任务2:尝试获取锁2
任务2:已获取锁2

2 asyncio程序的常见问题

在使用asyncio编写异步程序时,开发者可能会遇到一系列常见问题,这些问题涉及到任务的管理、执行流程、性能优化等多个方面。以下是一些常见的问题和挑战:

  1. 任务的等待、停止、结果获取
  2. 如何在后台运行和等待任务
  3. 任务的延迟后运行和后续运行
  4. 如何显示运行任务的进度
  5. 如何在asyncio中执行阻塞I/O或CPU密集型函数
  6. Python协程:操作系统原生支持吗

2.1 任务的等待、停止、结果获取

2.1.1 如何等待任务

可以通过直接等待
asyncio.Task
对象来等待任务的完成:

# 等待任务完成
await task

也同时创建并等待任务完成。例如:

# 创建并等待任务完成
await asyncio.create_task(custom_coro())

与协程不同,任务可以多次等待而不会引发错误。以下是一个演示如何多次等待同一任务的示例,在此例中,
await task
两次都能成功执行,因为
task
已经完成并保存了返回值:

import asyncio

async def other_coro():
    await asyncio.sleep(1)
    return "任务完成"

async def main():
    # 将协程包装在任务中并安排其执行
    task = asyncio.create_task(other_coro())
    
    # 第一次等待任务并获取返回值
    value1 = await task
    print(value1)
    
    # 再次等待任务(任务已经完成)
    value2 = await task
    print(value2)

# 运行主协程
asyncio.run(main())

2.1.2 何时停止任务

可以通过
asyncio.Task
对象的
cancel()
方法取消任务。若任务被成功取消,
cancel()
方法返回
True
,否则返回
False
。例如:

# 取消任务
was_cancelled = task.cancel()

2.1.3 如何获取任务的返回值

在Python中创建一个
asyncio
任务后,有两种方法可以从
asyncio.Task
中检索返回值:

  1. 等待任务(使用
    await
    )。
  2. 调用
    result()
    方法。

基于
await
函数,等待任务时,调用者会挂起,直到任务完成并返回结果。如果任务已完成,返回值会立即提供。以下代码展示了如何等待任务并获取其返回值:

import asyncio

async def other_coro():
    await asyncio.sleep(1)
    return "任务完成"

async def main():
    # 将协程包装在任务中并安排其执行
    task = asyncio.create_task(other_coro())
    
    # 等待任务完成并获取返回值
    value = await task
    print(value)

# 运行主协程
asyncio.run(main())

也可以通过调用
asyncio.Task
对象的
result()
方法获取任务的返回值。此时要求任务已完成。如果任务未完成,调用
result()
会引发
InvalidStateError
异常。如果任务被取消,则会引发
CancelledError
异常。以下是一个使用
result()
方法的例子:

import asyncio

async def other_coro():
    await asyncio.sleep(1)
    return "任务完成"

async def main():
    task = asyncio.create_task(other_coro())
    
    # 等待任务完成
    await task
    
    try:
        # 获取任务的返回值
        value = task.result()
        print(value)
    except asyncio.InvalidStateError:
        print("任务尚未完成")
    except asyncio.CancelledError:
        print("任务已取消")

# 运行主协程
asyncio.run(main())

2.2 如何在后台运行和等待任务

2.2.1 如何在后台运行任务

通过
asyncio.create_task()
可以将协程封装为Task对象,并在后台执行。创建的任务对象会立即返回,且不会阻塞调用者的执行。为了确保任务能够开始执行,可以使用
await asyncio.sleep(0)
暂停片刻。之所以使用
await asyncio.sleep(0)
,是因为新创建的任务并不会立刻开始执行。事件循环负责管理多个任务,它会根据调度策略决定哪个任务优先执行。通过
await asyncio.sleep(0)
暂时让出执行权,使得事件循环有机会调度并执行刚刚创建的任务。这样,
await asyncio.sleep(0)
确保了任务在创建后能尽早开始执行,同时不会阻塞主协程的其他操作。示例代码如下:

import asyncio

async def other_coroutine():
    print("开始执行 other_coroutine")
    await asyncio.sleep(2)
    print("other_coroutine 执行完毕")

async def main():
    # 创建并调度任务
    task = asyncio.create_task(other_coroutine())
    
    # 暂停片刻以确保任务开始执行
    await asyncio.sleep(0)
    
    print("主协程正在执行")
    
    # 等待任务完成
    await task
    print("任务执行完毕")

# 运行主协程
asyncio.run(main())

此外,后台任务可以在程序运行时执行,不会妨碍主程序的结束。如果主程序没有其他待执行的任务,而后台任务仍在进行中,那么需要确保程序在后台任务完成后才会完全退出。

2.2.2 如何等待所有后台任务

在使用
asyncio
时,可能需要等待多个独立的任务完成。比如,当多个任务同时运行时,有时想要等待所有任务完成,但又不想一直阻塞当前正在运行的任务。为了实现这个功能,可以通过以下步骤:

  1. 获取所有当前任务:使用
    asyncio.all_tasks()
    可以获取到当前事件循环中的所有任务。
  2. 排除当前任务:通过
    asyncio.current_task()
    获取当前正在运行的任务,并将其从任务集合中移除。这样可以避免等待当前任务自己。
  3. 等待所有剩余任务完成:使用
    asyncio.wait()
    来等待所有任务完成,直到它们都执行完毕。

示例代码如下:

import asyncio

async def example_coroutine(name):
    # 这是一个模拟任务的协程,睡眠 1 秒钟
    await asyncio.sleep(1)
    print(f"任务 {name} 完成。")

async def main():
    # 创建多个协程任务
    tasks = [asyncio.create_task(example_coroutine(name = str(i))) for i in range(5)]
    
    # 获取所有正在运行的任务
    all_tasks = asyncio.all_tasks()
    
    # 获取当前正在运行的任务(即 main 协程)
    current_task = asyncio.current_task()
    
    # 从任务集合中移除当前任务
    all_tasks.remove(current_task)
    
    # 等待所有其他任务完成
    await asyncio.wait(all_tasks)

# 启动事件循环并执行主协程
asyncio.run(main())

2.3 任务的延迟后运行和后续运行

2.3.1 任务的延迟后运行

想要实现任务的延迟后运行,可以通过开发一个自定义的包装协程,使其在延迟指定时间后执行目标协程。该包装协程接受两个参数:目标协程和延迟时间(单位为秒)。它会先休眠指定的延迟时间,然后执行传入的目标协程。

以下代码展示了如何通过自定义包装协程
delay
,在指定的延迟时间后执行目标协程。
delay
协程通过
asyncio.sleep()
实现延时,随后再执行传入的目标协程。可以在不同场景中使用该方法,如直接挂起协程或将任务安排为独立执行:

import asyncio

# 延迟几秒后启动另一个协程的包装协程
async def delay(coro, seconds):
    """
    延迟指定时间(秒)后执行目标协程。

    参数:
    coro: 要执行的目标协程
    seconds: 延迟时间,单位为秒
    """
    # 暂停指定时间(以秒为单位)
    await asyncio.sleep(seconds)
    # 执行目标协程
    await coro

# 示例目标协程
async def my_coroutine():
    print("目标协程开始执行")
    # 模拟一些工作
    await asyncio.sleep(2)
    print("目标协程执行完成")

# 使用包装协程时,可以创建协程对象并直接等待,或将其作为任务独立执行

# 1. 调用者可以挂起并调度延迟后的协程
async def main():
    print("延迟10秒后执行目标协程:")
    await delay(my_coroutine(), 10)
    print("目标协程已经完成执行")

# 2. 或者调用者可以安排延迟协程独立运行
async def schedule_task():
    print("将目标协程安排为独立任务,延迟10秒后执行")
    task = asyncio.create_task(delay(my_coroutine(), 10))
    await task  # 等待任务完成
    print("任务已完成")

# 运行示例
if __name__ == "__main__":
    asyncio.run(main())  # 运行主协程

    # 或者运行独立任务的调度
    # asyncio.run(schedule_task())

2.3.2 任务的后续运行

在asyncio中,触发后续任务的方式主要有三种:

  1. 通过已完成的任务本身调度后续任务
  2. 通过任务发起方调度后续任务
  3. 使用回调函数自动调度后续任务

逐一分析这三种方式:

1. 通过已完成的任务本身调度后续任务

已完成的任务可以触发后续任务的调度,通常依赖于某些状态检查来决定是否应该发起后续任务。任务调度可以通过
asyncio.create_task()
来完成。示例代码展示了运行指定任务后直接调度后续任务:

import asyncio

async def task():
    print("任务开始执行。")
    await asyncio.sleep(2)  # 模拟任务执行
    print("任务执行完成。")
    await followup_task()  # 在任务完成后直接调度后续任务

async def followup_task():
    print("正在执行后续任务。")
    await asyncio.sleep(2)  # 模拟后续任务执行
    print("后续任务执行完成。")

# 启动事件循环,执行任务
async def main():
    await task()

asyncio.run(main())

2. 通过任务发起方调度后续任务

任务发起方可以根据实际需要决定是否继续启动后续任务。在启动第一个任务时,可以保留
asyncio.Task
对象,通过检查任务的结果或状态,来判断是否启动后续任务。任务发起方还可以选择等待后续任务完成,也可以选择不等待。示例代码如下:

import asyncio

async def task():
    # 模拟一个任务
    await asyncio.sleep(1)
    return True  # 假设任务成功完成,返回True

async def followup_task():
    # 模拟后续任务
    await asyncio.sleep(1)
    print("后续任务执行")

async def main():
    # 发起并等待第一个任务
    task_1 = asyncio.create_task(task())
    
    # 等待第一个任务完成
    result = await task_1
    
    # 检查任务结果
    if result:
        # 发起后续任务
        await followup_task()

# 运行主程序
asyncio.run(main())

3. 使用回调函数自动调度后续任务

在任务发起时,可以为其注册一个回调函数。该回调函数会在任务完成后自动执行。回调函数接收一个
asyncio.Task
对象作为参数,但它不会等待后续任务的执行。因为回调函数通常是普通的Python函数,无法进行异步操作。示例代码:

import asyncio

# 定义回调函数
def callback(task):
    # 安排并启动后续任务
    # 注意:这里不能直接使用 await,需通过 create_task 调度异步任务
    asyncio.create_task(followup())

# 定义第一个异步任务
async def work():
    print("工作任务正在执行...")
    await asyncio.sleep(2)  # 模拟一些异步操作
    print("工作任务完成!")

# 定义后续异步任务
async def followup():
    print("后续任务正在执行...")
    await asyncio.sleep(1)  # 模拟一些异步操作
    print("后续任务完成!")

# 创建事件循环并运行任务
async def main():
    # 发起任务并注册回调函数
    task = asyncio.create_task(work())
    task.add_done_callback(callback)

    # 等待任务完成
    await task
    # 确保后续任务完成
    await asyncio.sleep(1)  # 等待回调任务完成的时间

# 执行事件循环
asyncio.run(main())

2.4 如何显示运行任务的进度

2.4.1 基于回调函数的任务进度显示

每个任务的回调函数可用于显示进度。
asyncio.Task
对象支持注册回调函数,这些函数会在任务完成时被调用,无论是正常完成还是以异常结束。回调函数是普通函数而非协程,且接受与其关联的
asyncio.Task
对象作为参数。通过为所有任务注册相同的回调函数,可以统一报告任务进度:

import asyncio

# 回调函数,用于显示任务完成的进度,区分任务
def progress(task):
    task_name = task.get_name()  # 获取任务的名称
    print(f"任务 {task_name} 完成。")  

async def example_task(n, task_name):
    """模拟一个异步任务,表示处理n秒的任务,并设置任务名称"""
    await asyncio.sleep(n)
    return task_name

async def main():
    # 定义多个异步任务并添加回调函数
    tasks = []
    for i in range(1, 6):
        task_name = f"Task-{i}"  # 为每个任务分配一个唯一名称
        task = asyncio.create_task(example_task(i, task_name))  # 创建任务,模拟不同的执行时间
        task.set_name(task_name)  # 设置任务名称
        # 为任务添加回调函数,回调函数会在相应任务执行完毕时被调用
        task.add_done_callback(progress)  
        tasks.append(task)

    # 等待所有任务完成
    await asyncio.gather(*tasks)

# 运行主程序
asyncio.run(main())

2.4.2 基于tqdm库的任务进度显示

使用tqdm库显示任务总体进度

以下代码演示了如何结合
tqdm
库和
asyncio
库,来展示异步任务的总体执行进度:

import asyncio
from tqdm.asyncio import tqdm

async def example_task(n, task_name):
    """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""
    await asyncio.sleep(n)  # 模拟任务处理时间
    return task_name  # 返回任务名称

async def main():
    # 定义多个异步任务并使用 tqdm 显示进度
    tasks = []
    total_tasks = 5  # 总任务数
    task_durations = [1, 2, 3, 4, 5]  # 每个任务的持续时间(秒)

    # 使用 tqdm 创建进度条,`total` 为任务的数量
    progress_bar = tqdm(total=total_tasks, desc="已完成任务数", ncols=100)

    # 创建任务
    for i, n in enumerate(task_durations):
        task_name = f"Task-{i+1}"  # 为每个任务分配一个唯一名称
        task = asyncio.create_task(example_task(n, task_name))  # 创建任务,模拟不同的执行时间
        tasks.append(task)

    # 等待任务完成并更新进度条
    for task in asyncio.as_completed(tasks):
        await task  # 等待每个任务完成
        progress_bar.update(1)  # 每完成一个任务,更新进度条

    progress_bar.close()  # 关闭进度条

# 运行主程序
asyncio.run(main())

使用tqdm库为多个任务设置单独进度条

以下示例代码演示了如何使用
asyncio
并行执行多个异步任务,同时通过
tqdm
库为每个任务单独显示进度条:

import asyncio
from tqdm.asyncio import tqdm 

async def example_task(n, task_name, progress_bar):
    """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""
    for _ in range(n):  # 每秒更新一次进度
        await asyncio.sleep(1)  # 模拟任务处理时间
        progress_bar.update(1)  # 更新当前任务的进度
    return task_name  # 返回任务名称

async def main():
    # 定义多个异步任务并使用 tqdm 显示进度
    tasks = []
    total_tasks = 5  # 总任务数
    task_durations = [1, 2, 3, 4, 5]  # 每个任务的持续时间(秒)

    # 创建进度条并为每个任务单独设置
    progress_bars = []
    for i, n in enumerate(task_durations):
        task_name = f"Task-{i+1}"  # 为每个任务分配一个唯一名称
        progress_bar = tqdm(total=n, desc=task_name, ncols=100, position=i)  # 创建任务对应的进度条
        progress_bars.append(progress_bar)
        task = asyncio.create_task(example_task(n, task_name, progress_bar))  # 创建任务
        tasks.append(task)

    # 等待任务完成
    await asyncio.gather(*tasks)  # 使用 asyncio.gather 同时等待所有任务完成

    # 关闭所有进度条
    for progress_bar in progress_bars:
        progress_bar.close()

# 运行主程序
asyncio.run(main())

2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数

在编程中,“阻塞调用”指的是某些操作(例如读取文件、等待网络请求或执行数据库查询等)需要一定时间才能完成。在执行这些操作时,程序会暂停,无法继续处理其他任务,这就是“阻塞”。另外,CPU密集型操作也可能会导致程序阻塞。因此,为了在异步环境中仍然能够处理阻塞调用,asyncio模块提供了两种方法来在异步程序中执行阻塞调用:

  • asyncio.to_thread()
    :此方法简化了线程管理流程,特别适合处理大多数I/O密集型任务。它允许将阻塞调用委派给一个线程,从而避免阻塞主事件循环。
  • loop.run_in_executor()
    :此方法提供了更高的灵活性,支持使用自定义的执行器,比如线程池或进程池。这适用于需要精细控制执行环境的场景。

这两种方法均可有效地将阻塞调用转为异步任务,以下逐一分析这两种方式:

2.5.1 使用
asyncio.to_thread()

asyncio.to_thread()
是一个高级 API,适用于大多数应用场景。它会将指定的函数和参数提交到一个独立的线程中执行,并返回一个可等待的协程。这样,阻塞操作就可以在后台线程池中执行,而不会阻塞事件循环。需要注意的是,任务并不会立即执行,而是会等待事件循环空闲时再开始执行。由于
asyncio.to_thread()
会在后台创建一个
ThreadPoolExecutor
来处理阻塞任务,因此它特别适合 I/O 密集型的操作。示例代码如下:

import asyncio
import time

def blocking_task(task_id):
    # 模拟一个耗时的阻塞操作
    time.sleep(2)
    return f"任务 {task_id} 完成"

# 同步执行多个任务
def sync_main():
    start_time = time.time()
    
    # 顺序执行多个阻塞任务
    results = [blocking_task(i) for i in range(5)]
    
    end_time = time.time()
    
    for result in results:
        print(result)
    
    print(f"同步任务执行时间: {end_time - start_time:.4f} 秒")

# 异步运行多个阻塞任务
async def async_main():
    start_time = time.time()
    
    # 使用 asyncio.to_thread 来并发运行多个阻塞任务
    tasks = [asyncio.to_thread(blocking_task, i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    
    for result in results:
        print(result)
    
    print(f"异步任务执行时间: {end_time - start_time:.4f} 秒")

# 执行同步任务
print("同步执行开始:")
sync_main()

# 执行异步任务
print("\n异步执行开始:")
asyncio.run(async_main())

以上代码展示了同步执行阻塞任务与异步执行阻塞任务的对比。通过使用
asyncio.to_thread()
,I/O 密集型操作的处理被委托给独立的线程池,从而避免了阻塞事件循环,显著提升了异步任务的效率:

  • 同步执行:在
    sync_main()
    中,多个阻塞任务按顺序逐一执行,每个任务需等待前一个任务完成后才能开始,整体执行时间为所有任务总时间(即 5 * 2 秒)。
  • 异步执行:在
    async_main()
    中,多个阻塞任务并发执行。尽管每个任务仍然是阻塞的,但它们在后台线程中并行处理,因此总执行时间仅为单个任务的执行时间(即约 2 秒)。

代码运行结果如下:

同步执行开始:
任务 0 完成
任务 1 完成
任务 2 完成
任务 3 完成
任务 4 完成
同步任务执行时间: 10.0317 秒

异步执行开始:
任务 0 完成
任务 1 完成
任务 2 完成
任务 3 完成
任务 4 完成
异步任务执行时间: 2.0089 秒

2.5.2 使用
loop.run_in_executor()

loop.run_in_executor()

asyncio
提供的低级API,需先获取事件循环(例如,使用
asyncio.get_running_loop()
)。该函数允许指定执行器(默认是
ThreadPoolExecutor
)以及要执行的函数。


asyncio.to_thread()
相比,
run_in_executor()
提供了更大的灵活性,支持使用自定义执行器,而不仅限于线程池。此外,调用该函数后,任务会立即开始执行,无需等待返回的可等待对象来触发任务的启动。

示例代码如下:

import asyncio
import time

# 定义一个需要执行的阻塞任务
def task():
    print("任务开始")
    time.sleep(2)
    print("任务结束")


# 在单独的线程中执行函数
async def main():
    # 获取事件循环
    loop = asyncio.get_running_loop()
    # 使用run_in_executor来将task函数异步执行在线程池中
    # None 表示使用默认的线程池执行器
    await loop.run_in_executor(None, task)

# 执行主任务
asyncio.run(main())

如果希望使用进程池,可以创建一个自定义的执行器并传递给
run_in_executor()
。在这种情况下,调用者需要负责管理执行器的生命周期,使用完后要手动关闭。示例代码如下:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import time

# 定义一个耗时的任务
def task(name):
    print(f"任务 {name} 开始")
    time.sleep(2)  # 模拟一个阻塞的操作
    print(f"任务 {name} 完成")
    return f"来自 {name} 的结果"

# 使用自定义的执行器来运行任务
async def main():
    # 创建一个进程池
    with ProcessPoolExecutor() as executor:
        # 获取当前的事件循环
        loop = asyncio.get_running_loop()

        # 使用 run_in_executor 来在进程池中执行任务
        results = await asyncio.gather(
            loop.run_in_executor(executor, task, "A"),
            loop.run_in_executor(executor, task, "B"),
            loop.run_in_executor(executor, task, "C")
        )

        # 打印所有任务的结果
        for result in results:
            print(result)

# 启动 asyncio 事件循环并执行 main
if __name__ == "__main__":
    asyncio.run(main())

2.6 Python协程:操作系统原生支持吗

异步编程和协程并不总是解决程序中所有并发问题的最佳方案。Python 中的协程是由软件管理的,它们通过asyncio事件循环来执行和调度。与操作系统提供的线程和进程不同,协程并不由操作系统直接支持,而是通过Python的软件框架来实现的。在这个意义上,Python中的协程并不是“原生”的。它们并不像线程或进程那样具有独立的执行上下文,反而是在同一个线程内通过协作式调度来切换任务。

此外,Python的GIL(全局解释器锁)用来保护解释器内部的状态,防止多个线程同时访问和修改解释器的数据。而asyncio的事件循环是单线程运行的,这意味着所有的协程都在同一个线程里执行。由于协程本身是通过事件循环调度的,而不是通过多线程或多进程并行执行,因此,尽管Python中的多线程模型受到GIL的限制,协程在处理 I/O 密集型任务时能够有效避免GIL的影响,从而提高并发性能。这也是为什么在处理大量I/O操作时,使用asyncio和协程能够带来较好的性能表现。

然而,协程并不适用于所有类型的并发任务。例如,对于计算密集型任务,使用线程或进程模型可能更为合适,因为协程并不会突破GIL的限制,计算密集型任务依然会在单个CPU核心上串行执行。因此,在选择是否使用协程时,需要根据任务的特性做出权衡。

3 应用实例

3.1 在基于线程的程序中调用asyncio代码

直接调用同步I/O代码

以下代码实现了一个简单的Tkinter应用,点击按钮后,程序会发起一个同步HTTP请求(GET 请求)。在每60毫秒的刷新周期中,程序会根据当前状态更新显示的文本。然而,当点击按钮时,
request_remote
方法中的
requests.get
会发起一个同步请求,这会阻塞主线程,从而导致界面卡顿或无响应。如下代码,
App.QUERYING_STATE
状态相关信息不会显示出来:

import tkinter as tk
import requests


class App(tk.Tk):
    INIT_STATE = 0         # 初始化状态
    QUERYING_STATE = 1     # 请求中状态
    RESULT_STATE = 2       # 请求结果状态

    def __init__(self):
        super().__init__()
        self.status_code = 0           # HTTP请求返回的状态码
        self._refresh_ms = 60          # 刷新间隔时间(毫秒)
        self.state = App.INIT_STATE    # 初始状态
        self._button = None            # 按钮
        self._label = None             # 标签
        self.render_elements()         # 渲染界面元素
        self.after(self._refresh_ms, self.refresh)  # 设置定时刷新,定时调用refresh方法

    def render_elements(self):
        """ 设置界面布局,渲染UI元素 """
        self.geometry("400x200")  # 设置窗口大小
        self._button = tk.Button(self, text="请求状态码", command=self.request_remote)  # 创建按钮,点击时调用request_remote方法
        self._label = tk.Label(self, text="")  # 创建标签,初始为空

        self._button.pack()  # 将按钮添加到窗口中
        self._label.pack()   # 将标签添加到窗口中

    def request_remote(self):
        """ 发起同步HTTP请求 """
        self.state = App.QUERYING_STATE  # 设置状态为请求中
        response = requests.get("https://www.example.com")  # 发起GET请求,获取响应
        self.status_code = response.status_code  # 获取响应返回的状态码
        self.state = App.RESULT_STATE  # 设置状态为结果状态,表示请求已完成

    def refresh(self):
        """ 每60毫秒刷新一次UI内容 """
        self.update_label()  # 更新标签内容
        self.after(self._refresh_ms, self.refresh)  # 设置下次刷新时间(每60毫秒刷新一次)

    def update_label(self):
        """ 根据应用状态更新标签内容 """
        if self.state == App.INIT_STATE:
            self._label.config(text="这里将显示状态码。")  # 初始状态下提示文字
        elif self.state == App.QUERYING_STATE:
            self._label.config(text="正在查询远程...")  # 请求中状态时显示提示文字
        elif self.state == App.RESULT_STATE:
            self._label.config(text=f"返回的状态码是: {self.status_code}")  # 请求结果状态时显示返回的状态码

    def start(self):
        self.mainloop()  # 启动Tkinter事件循环,进入GUI界面

def main():
    app = App()  # 创建应用实例
    app.start()  # 启动应用

if __name__ == "__main__":
    main()  

I/O请求的异步调用

可以将requests包替换为aiohttp包,实现I/O请求的异步调用。aiohttp和requests都是Python中常用的HTTP客户端库,但requests适用于同步场景,简单易用,aiohttp则适用于异步并发的场景,能够处理大量并行请求。具体区别如下:

  1. 同步vs异步:
  • requests是一个同步库,意味着每次发送请求时,程序会等待响应回来后才继续执行。适用于一些简单的、串行的HTTP请求场景。
  • aiohttp是一个异步库,基于Python的asyncio模块,能够在发送HTTP请求时非阻塞地继续执行其他任务。适用于需要大量并发请求或长时间等待的异步场景。
  1. 性能:
  • requests由于是同步的,处理大量请求时容易出现性能瓶颈,因为每个请求必须等待前一个请求完成。
  • aiohttp通过异步I/O处理,可以在等待响应时同时发起其他请求,极大提高了并发性能,尤其在处理大量HTTP请求时。
  1. 用法:
  • requests用法简单,适合初学者和一般同步的任务。
  • aiohttp需要使用async和await,适合需要并发或异步操作的任务。

在上述示例代码中,为了替代requests模块的同步请求,可以创建一个继承自
App
类的
AppAsync
类,并利用aiohttp和asyncio库实现异步请求。通过
async_request
方法异步发起HTTP请求:

import aiohttp
import asyncio

class AppAsync(App):
    async def async_request(self):
        """ 
        异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。
        """
        async with aiohttp.ClientSession() as session:  # 创建一个aiohttp会话对象
            async with session.get("https://www.example.com") as response:  # 发起GET请求
                self.status_code = response.status  # 获取响应状态码
                self.state = App.RESULT_STATE  # 更新应用状态

    def __int__(self):
        super().__init__()

    def request_remote(self):
        """ 使用asyncio.run来调用异步请求代码 """
        self.state = self.QUERYING_STATE  # 设置状态为请求中
        asyncio.run(self.async_request())  # 异步发起HTTP请求

def main():
    app = AppAsync()  # 创建应用实例
    app.start()  # 启动应用

if __name__ == "__main__":
    main()  # 运行主程序

然而
AppAsync
类中的
asyncio.run(self.async_request())
会阻塞Tkinter的主线程,因为
asyncio.run()
会一直运行,直到异步任务完成。同时Tkinter自身有一个事件循环(mainloop()),与
asyncio
需要的事件循环冲突。如果在Tkinter内创建新事件循环,可能会导致Tkinter关闭或中断后出现问题。

将asyncio与线程结合

为了解决asyncio事件循环阻塞的问题,可以使用一个单独的守护线程,并在守护线程中运行事件循环,这样asyncio的事件循环就不会阻塞主线程。重写
AppAsync
类示例如下:

import aiohttp
import asyncio
import threading

class AppAsync(App):
    def __init__(self):
        super().__init__()
        self._loop_thread = threading.Thread(target=self.run_asyncio_loop, daemon=True)
        self._loop_thread.start()  # 启动事件循环线程

    async def async_request(self):
        """ 
        异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。
        """
        async with aiohttp.ClientSession() as session:  # 创建一个aiohttp会话对象
            async with session.get("https://www.example.com") as response:  # 发起GET请求
                self.status_code = response.status  # 获取响应状态码
                self.state = App.RESULT_STATE  # 更新应用状态

    def request_remote(self):
        """ 使用异步请求,在事件循环中执行 """
        self.state = App.QUERYING_STATE  # 设置状态为请求中
        asyncio.run_coroutine_threadsafe(self.async_request(), self._loop)  # 调用异步请求并与当前事件循环进行交互

    def run_asyncio_loop(self):
        """ 运行asyncio事件循环 """
        self._loop = asyncio.new_event_loop()  # 创建新的事件循环
        asyncio.set_event_loop(self._loop)  # 设置当前线程的事件循环
        self._loop.run_forever()  # 启动事件循环

def main():
    app = AppAsync()  # 创建应用实例
    app.start()  # 启动应用

if __name__ == "__main__":
    main()  # 运行主程序

示例代码运行时,
App.QUERYING_STATE
状态相关信息会显示出来,
AppAsync
类主要的改动点如下:

  1. AppAsync
    类的构造函数:
    • 增加了一个新的线程来运行asyncio事件循环,避免在Tkinter线程中阻塞。
    • 使用
      threading.Thread
      启动一个守护线程,执行
      run_asyncio_loop
      方法,确保事件循环在后台运行。
    • 在创建线程时设置为守护线程。这样即使主线程退出,守护线程也会自动结束。
  2. run_asyncio_loop
    方法:
    • 在一个单独的线程中启动新的asyncio事件循环。
    • 使用
      asyncio.set_event_loop
      设置当前线程的事件循环,并调用
      loop.run_forever()
      来保持事件循环持续运行。
  3. request_remote
    方法:
    • 使用
      asyncio.run_coroutine_threadsafe
      将异步请求任务提交给后台事件循环执行,用于在非主线程中安全地执行协程。

3.2 基于asyncio实现多核异步处理

单核异步处理

asyncio的并发机制是基于协作式多任务(协程),它不会并行地使用多个CPU核心来加速计算,所有的任务都是在单个核心上轮流执行的。以下代码模拟了1000个爬虫任务,并使用单核异步来执行:

import random
import asyncio
import time

# 模拟爬虫任务,执行时会有随机的延迟
async def fake_crawlers():
    # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数
    io_delay = round(random.uniform(0.2, 1.0), 2)
    await asyncio.sleep(io_delay)

    result = 0
    # 随机生成100,000到500,000之间的数字,用于模拟计算密集型任务
    # 这段代码耗时大约0.2秒到0.5秒之间
    for i in range(random.randint(100000, 500000)):
        result += i
    return result

# 主程序入口,负责创建并执行多个爬虫任务
async def main():
    # time.monotonic()是用于测量时间间隔的可靠方法,它不受系统时间更改的影响
    start = time.monotonic()
    tasks = [asyncio.create_task(fake_crawlers()) for i in range(1000)]  # 模拟创建1000个任务

    await asyncio.gather(*tasks)  # 等待所有任务完成
    # 输出所有任务完成的时间
    print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")
    
# 启动程序
asyncio.run(main())

代码运行结果如下:

所有任务已完成,耗时 8.51 秒

多核异步处理

要实现多核异步处理,可以将异步编程和多进程池结合起来使用。具体来说,主程序会把任务分成多个批次,每个批次由不同的进程来处理。每个进程内部,多个任务又是通过异步方式并行执行的。这样一来,计算密集型的任务可以通过多进程并行处理,而每个进程内部的I/O操作则可以通过asyncio来异步管理,从而大幅提高整体效率。示例代码如下,代码将1000个任务分布到10个子进程中并行执行,每个子进程执行100个模拟的爬虫任务:

import random
import asyncio  #
import time  
from concurrent.futures import ProcessPoolExecutor  

# 模拟爬虫任务,执行时会有随机的延迟
async def fake_crawlers():
    # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数
    io_delay = round(random.uniform(0.2, 1.0), 2)
    await asyncio.sleep(io_delay)

    result = 0
    # 随机生成100,000到500,000之间的数字,用于模拟阻塞任务
    # 这段代码耗时大约0.2秒到0.5秒之间
    for i in range(random.randint(100000, 500000)):
        result += i
    return result

# 并发查询任务,通过起始和结束索引分配任务
async def query_concurrently(begin_idx: int, end_idx: int):
    """ 启动并发任务,通过起始和结束序列号 """
    tasks = []  
    # 根据给定的索引范围(从 begin_idx 到 end_idx),创建并发任务
    for _ in range(begin_idx, end_idx, 1):
        tasks.append(asyncio.create_task(fake_crawlers()))  
    # 等待所有任务完成,并返回每个任务的结果
    results = await asyncio.gather(*tasks)
    return results 

# 批量任务执行函数,使用子进程池并行执行任务
def run_batch_tasks(batch_idx: int, step: int):
    """ 在子进程中执行批量任务 """
    # 计算当前批次任务的起始和结束索引
    begin = batch_idx * step + 1  # 当前批次任务的起始索引
    end = begin + step  # 当前批次任务的结束索引

    # 使用 asyncio.run() 启动异步任务并获取结果
    results = [result for result in asyncio.run(query_concurrently(begin, end))]
    return results  

# 主函数,分批次将任务分配到子进程中执行
async def main():
    """ 将任务分批次分配到子进程中执行 """
    start = time.monotonic()  

    loop = asyncio.get_running_loop()  # 获取当前运行的事件循环
    # 创建进程池执行器,用于将任务分配到多个子进程中执行
    with ProcessPoolExecutor() as executor:
        # 启动多个批次任务,并行执行。每个批次执行 100个任务,共启动10个批次
        tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 100)
                 for batch_idx in range(10)] 

    # 等待所有子进程任务完成,并将结果汇总
    results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]

    # 输出所有任务完成的时间
    print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")

# 程序入口
if __name__ == "__main__":
    asyncio.run(main())  

代码运行结果如下:

所有任务已完成,耗时 1.83 秒

3.3 图片下载器

若经常需要从互联网下载文件,可以使用aiohttp库来实现任务的自动化。下面提供了一个简单的脚本,用于从指定URL下载文件:

建立本地图片服务器

为了提供图片下载链接,以下代码展示了如何使用FastAPI框架创建一个简单的Web应用程序,用于上传、管理和访问图片:

import os
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import uvicorn

app = FastAPI()

# 配置图片存储目录
UPLOAD_DIR = "./uploaded_images"
if not os.path.exists(UPLOAD_DIR):
    os.makedirs(UPLOAD_DIR)

# 将图片目录挂载为静态文件目录
app.mount("/images", StaticFiles(directory=UPLOAD_DIR), name="images")

# 上传图片接口
@app.post("/upload/")
async def upload_image(file: UploadFile = File(...)):
    try:
        # 定义图片保存路径
        file_path = os.path.join(UPLOAD_DIR, file.filename)
        
        # 保存图片到本地
        with open(file_path, "wb") as f:
            f.write(file.file.read())

        # 返回图片的访问 URL
        image_url = f"http://127.0.0.1:8000/images/{file.filename}"
        return {"image_url": image_url}
    
    except Exception as e:
        return {"error": str(e)}

# 获取所有上传图片的链接
@app.get("/images_list/")
async def list_images():
    try:
        # 获取目录下的所有文件
        files = os.listdir(UPLOAD_DIR)
        image_urls = [f"http://127.0.0.1:8000/images/{file}" for file in files if os.path.isfile(os.path.join(UPLOAD_DIR, file))]
        return {"image_urls": image_urls}
    except Exception as e:
        return {"error": str(e)}

# 获取单个图片
@app.get("/image/{image_name}")
async def get_image(image_name: str):
    try:
        file_path = os.path.join(UPLOAD_DIR, image_name)
        if os.path.exists(file_path):
            return FileResponse(file_path)
        else:
            return {"error": "Image not found"}
    except Exception as e:
        return {"error": str(e)}

# 启动 FastAPI 服务器
if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

该代码实现了一个图片上传和访问服务,包含以下三个主要接口:

  • 服务器启动后,会监听本地地址
    127.0.0.1
    的8000端口。
  • 客户端可以通过以下方式与服务器进行交互:
    • 访问
      http://127.0.0.1:8000/upload/
      上传图片,并获取返回的图片 URL。
    • 访问
      http://127.0.0.1:8000/images_list/
      查看所有已上传图片的 URL。
    • 访问
      http://127.0.0.1:8000/images/{image_name}
      来查看特定图片。

注意,所有上传和保存的图片都会保存在本地的
uploaded_images
文件夹中。

图片下载

以下代码利用了aiohttp、asyncio和aiofiles库,通过异步方式从API获取图片URL列表,并将图片下载到指定目录。借助这些库的结合,代码能够高效地处理HTTP请求、文件下载和文件操作,同时确保主程序的执行不被阻塞:

import aiohttp  # 导入 aiohttp 库,用于异步 HTTP 请求
import asyncio  # 导入 asyncio 库,用于管理异步任务
import aiofiles  # 导入 aiofiles 库,用于异步文件操作
import os 

# 获取图片 URL 列表的异步函数
async def get_image_urls(api_url):
    try:
        # 使用 aiohttp 启动一个异步 HTTP 会话
        async with aiohttp.ClientSession() as session:
            # 异步发送 GET 请求以获取 API 返回的数据
            async with session.get(api_url) as response:
                # 如果响应状态码是 200 (请求成功)
                if response.status == 200:
                    # 将响应内容解析为 JSON 格式
                    data = await response.json()
                    # 从 JSON 数据中提取图片 URL 列表,若没有则返回空列表
                    return data.get("image_urls", [])
                else:
                    # 如果请求失败,打印错误信息
                    print(f"从 {api_url} 获取图片列表失败。状态码: {response.status}")
                    return []
    except Exception as e:
        # 如果发生任何异常,打印错误信息
        print(f"获取图片列表时发生错误: {e}")
        return []

# 下载文件的异步函数
async def download_file(url, save_directory):
    try:
        # 使用 aiohttp 启动异步 HTTP 会话
        async with aiohttp.ClientSession() as session:
            # 异步发送 GET 请求以获取文件内容
            async with session.get(url) as response:
                # 如果响应状态码是 200 (请求成功)
                if response.status == 200:
                    # 确保保存文件的目录存在,若不存在则创建
                    os.makedirs(save_directory, exist_ok=True)
                    
                    # 从 URL 中提取文件名
                    filename = os.path.join(save_directory, url.split('/')[-1])
                    # 异步打开文件以进行写入操作
                    async with aiofiles.open(filename, 'wb') as file:
                        # 读取响应内容
                        content = await response.read()
                        # 将内容写入本地文件
                        await file.write(content)
                    print(f"已下载 {filename}")
                else:
                    # 如果下载失败,打印错误信息
                    print(f"下载 {url} 失败。状态码: {response.status}")
    except Exception as e:
        # 如果发生任何异常,打印错误信息
        print(f"下载 {url} 时发生错误: {e}")

# 根据获取的图片 URL 列表进行下载的异步函数
async def download_images(api_url, save_directory):
    # 调用 get_image_urls 函数获取图片 URL 列表
    image_urls = await get_image_urls(api_url)
    
    # 如果没有获取到图片 URL,则打印提示并返回
    if not image_urls:
        print("没有找到需要下载的图片。")
        return
    
    # 为每个图片 URL 创建一个下载任务
    tasks = [download_file(url, save_directory) for url in image_urls]
    
    # 使用 asyncio.gather 并行执行所有下载任务
    await asyncio.gather(*tasks)

# 启动事件循环,开始下载图片
if __name__ == "__main__":
    # API 地址,提供图片 URL 列表
    api_url = "http://127.0.0.1:8000/images_list/"
    # 指定保存下载图片的目录
    save_directory = "downloads"  

    # 获取事件循环并运行下载任务
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_images(api_url, save_directory))

3.4 生产者消费者模型

生产者-消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,旨在解决多个任务之间生产和消费的协调问题,从而确保资源得到合理利用并保证数据按顺序处理。该模型通过生产者和消费者两个角色,模拟共享资源的生产和消费过程。以下代码实现了一个基本的生产者-消费者模型,采用了asyncio进行异步任务处理:

import asyncio
from asyncio import Queue
from typing import List

# 生产者函数,负责将物品添加到队列
async def produce_items(queue: Queue, items: List[int], producer_name: str):
    for item in items:
        await queue.put(item)  # 将物品放入队列
        print(f"{producer_name} 添加物品:{item}")
        await asyncio.sleep(0.5)  # 模拟生产过程中的等待时间
    print(f"{producer_name} 完成所有物品的生产")

# 消费者函数,负责从队列中取出并处理物品
async def consume_items(queue: Queue, consumer_name: str):
    while True:
        item = await queue.get()  # 阻塞直到获取到一个物品
        if item is None:  # 使用None作为结束信号
            queue.task_done()  # 标记任务完成
            break  # 退出循环
        print(f"{consumer_name} 处理物品:{item}")
        await asyncio.sleep(1)  # 模拟处理物品的时间
        queue.task_done()  # 标记任务完成

# 主函数,负责启动多个生产者和消费者任务
async def main():
    queue = Queue()  # 创建一个队列
    items_to_produce = ['A','B','C','D']  # 需要生产的物品列表
    
    # 创建产者任务(例如3个生产者)
    producer_tasks = [
        asyncio.create_task(produce_items(queue, items_to_produce, f"生产者_{i}"))
        for i in range(3)  
    ]
    
    # 创建消费者任务(例如2个消费者)
    consumer_tasks = [
        asyncio.create_task(consume_items(queue, f"消费者_{i}"))
        for i in range(2)
    ]
    
    # 等待所有生产者任务完成
    await asyncio.gather(*producer_tasks)
    
    # 生产者完成后,发送 None 给消费者,通知它们退出
    for _ in consumer_tasks:
        await queue.put(None)
    
    # 等待队列中的所有任务处理完成
    await queue.join()
    
    # 等待所有消费者任务完成
    await asyncio.gather(*consumer_tasks)

if __name__ == '__main__':
    # 运行主函数
    asyncio.run(main())

4 参考

为帮助开发者筛选出优质的
免费网站部署服务
,本文将从
体验

数值
上全面体验测试全球主流现代化前端部署平台。

体验对象包括:
Vercel

Cloudflare Pages

GitHub Pages
和新发现的腾讯云
EdgeOne Pages
。测量指标包括
全球访问时延

国内访问时延

Google Lighthouse 测量指标
,以及用户在使用过程中部署流畅程度。

需要说明的是,尽管
AWS Pages

Netlify
也备受关注,由于其注册和验证过程对国内用户不够友好(需要绑定银行卡及身份防欺诈验证),未纳入此次体验范围。

希望通过本次体验分享,为大家提供有价值的参考,帮助挑选最适合的免费网站部署服务。

体验对象设定

静态网站应用广泛,包括博客、公司主页、文档和展示页面等。本次体验对象聚焦于基于 React 框架构建的
摄影图片展示网站

此网站的静态资源总大小约为 110MB,采用
create-react-app
工具生成,为比较典型的展示型网站。

website display

以下是部署成功后的链接。您可以在查看具体体验结果之前,直观体验各网站的加载速度和内容渲染速度:

平台 部署地址
Vercel https://testvc.wenyiqing.tech/
Cloudflare https://testcf.wenyiqing.tech/
Github https://testgh.wenyiqing.tech/
EdgeOne https://testeo.wenyiqing.tech/

备注:

由于部署平台自动分配的访问域名国内可能无法访问。为了精准体现测试效果,所有部署的网站均添加了自定义域名。

平台优缺点及部署体验

Vercel

部署成功网址,
点击体验

简洁的网站设计,丝滑的部署流程,从注册到部署成功只需要5分钟。

体验结论

Vercel 专注于提供高效、快速和简便的开发与部署体验,整体部署
体验
甚至优于 Cloudflare。缺点是免费计划的资源较少。

优势

  1. 页面简洁且流畅
    :Vercel 的网站页面体验很好,数据展示清晰干净。
    2.
    Serverless 函数支持
    :支持 Serverless 函数,可以在前端项目中轻松添加后端逻辑。

缺点

  1. 资源限制
    :免费计划限制源文件大小为 100MB,对于静态资源较多的网站难以适用,需要额外购买资源存储服务。

  2. 构建次数受限
    :免费版本每天最多允许 100 次部署,且有多种限制,正常用户容易触发上限。

Cloudflare Pages

部署成功网址,
点击体验

使用 Cloudflare 部署网站流程顺畅,步骤清晰明了,官网博客中的教程和指引十分完善。此外,Cloudflare 还提供丰富的收费拓展功能,以满足高要求用户的需求。

Cloudflare 已整合 Pages 和 Workers 架构,便于开发人员在需要添加更多动态功能时实现无缝迁移。

Cloudflare Workers 是一种无服务器计算平台,开发人员可以编写和部署轻量级代码来执行复杂的逻辑,而无需依赖传统的后端服务器。

体验结论

Cloudflare Pages 优势明显,处于行业领先地位。但是在国内的访问时延特别高,不建议国内用户选择。并且免费计划的资源限制较严格,平台重心更倾向于付费用户。

优势

  1. 完善的CDN
    :Cloudflare 拥有强大的 CDN,确保部署网站全球访问速度极快,在国内表现也不错。

  2. 详尽教程
    :官网提供了非常详细的教程,包括文字和动图,对新手非常友好。

  3. 便捷的拓展和集成
    :付费功能包括多人协作和高效并发,支持开发者无缝迁移到 Workers。主流 CMS 都有丰富的插件,增强自动化部署流程。

缺点

  1. 国内访问高时延
    :Cloudflare 在国内访问时延很高,测试显示国内的请求会回源至美国,具体测试指标会在下文介绍。

  2. 受限的免费计划
    :免费版本不支持并发构建(不支持多人协作开发),每月仅有 500 次构建机会。

  3. 昂贵的付费计划
    :付费计划分为三个阶段:0 -> $ 20 -> $ 200 -> ∞,大多特色功能仅在付费计划中提供。

Github Pages

部署成功网址,
点击体验

GitHub Pages 依托于 GitHub 平台,由于其完全免费的定位和庞大的用户基础,吸引了大量开发者使用 GitHub Pages 进行网站部署。

体验结论

由于 Github 国内访问便捷且知名化程度高,GitHub Pages 成为前端新手学习部署的首选。活跃的社区也能帮助解决各种问题。

然而,在扩展性和自动化程度方面,GitHub Pages 与商业平台存在差距,其稳定性亦备受质疑。

优势

  1. 完全免费
    :无需担心因收费计划而被迫迁移。

  2. 社区活跃
    :社区活跃,专家众多,问题能够及时解决。

缺点

  1. 不稳定
    :GitHub Pages 在国内访问严重不稳定,常出现断连和访问缓慢的问题,测试过程中路由追踪路径也不理想。

  2. 无预制构建脚本
    :GitHub Pages 缺乏预配置的前端框架打包构建工作流,需要手动编写部署工作流文件,对初学者存在一定门槛。

  3. 资源有限
    :在商业或协作开发中,GitHub Pages 的资源限制可能无法满足需求。存储限额为 1GB,每小时最多 10 次构建部署限制了许多应用场景。

Tencent EdgeOne Pages

部署成功网址,
点击体验

EdgeOne Pages 是腾讯云 EdgeOne 刚推出的网站部署服务,目前 Beta 版
完全免费
,支持国内与国际站:

  1. 国内站
    :主要面向国内开发者,预置域名 .site,支持全球加速,国内访问体验最好,但是域名需要备案。

  2. 国际站
    :面向全球开发者,预置域名 .app,只有海外节点,国内访问速度还行,且支持未备案的域名。

体验结论

EdgeOne Pages 在国内表现最为出色,海外各项测量指标对比国际大牌稍弱一点。

平台为开发者提供了
丰富的免费资源
,并即将支持边缘函数 Serverless,尽管将来会引入收费计划,但还是值得期待。

优势

  1. 免费且限制少
    :目前平台几乎没有什么限制,没有收费功能,国际站也无需绑定信用卡。官方说明未来推出商业版本后免费计划可能会增加限制。

  2. 稳定的并发构建
    :EdgeOne Pages 在并发构建方面表现卓越。考虑到并发构建容易导致
    构建失败
    ,大部分平台对此都有严格限制。

缺点

  1. 功能简单
    :当前平台整体功能还较为简单,提供的前端模板也有限,看起来还在快速迭代中。

  2. 部分地区时延高
    :EdgeOne Pages 部署的网站在大多数海外地区表现良好,但部分地区访问时延较高,具体情况将在后文详细介绍。

优缺点总结

以上展示为平台
特有的
优点。共有优点如支持自定义域名和提供 SSL 服务等不再赘述。当然,各平台可能还有本次体验过程中未发现的优点,欢迎大家在评论区补充。

对于 Cloudflare、Vercel 等成熟平台来说,它们在业务层面的缺点很少,主要集中在国内访问时延和免费计划上。

在资源限制方面,开发者需求不同触顶情况不同,因此资源限制是否严苛需根据实际情况来判断。本次分享仅供讨论参考,无法涵盖所有使用场景。

部署网站指标测量

测量指标包括三类:

  1. 全球各地的访问时延
    :此项测试部署的静态网站客户端在世界各地的访问延迟,主要评估各个平台的 CDN 建设水平。

  2. 国内各地的访问时延
    :此项测试国内不同区域的访问延迟,主要评估国内用户的访问体验。这里为了能够更好的测量,对部署项目绑定了自定义域名。

  3. Google Lighthouse测量指标
    :Google Lighthouse 可以用于评估网页的性能、可访问性等。本次测量指标包含:首次内容绘制 (FCP)、最大内容绘制 (LCP)、总阻塞时间 (TBT) 和累积布局偏移 (CLS)等。这些指标能够反映用户在页面加载过程中的实际体验。

全球访问时延测量

全球访问时延是基于
平台分配的域名
测量的,如果平台对自定义域名进行了优化处理,测量结果可能与实际情况有所差异。

全球时延测量使用了多个工具验证,防止因工具选择出现偏差,下文展示图片测量工具为
CDNPerf

Vercel

Vercel 的表现相当不错,有趣的是,国内测试中并未出现高延迟点。进一步详细测量后发现,实际是由于平台分配的域名被禁用。
vercel global latency measurement

Cloudflare Pages

Cloudflare 测试显示非常强大,少量节点甚至测不出高延迟。为了详细测试增加了测量节点,可以看到在亚洲仍有不少高时延地区。

cloudflare pages global latency measurement

Github Pages

GitHub Pages 全球表现总体令人满意,仅在国内和亚太地区的节点出现访问不稳定的情况。

github pages global latency measurement

EdgeOne Pages

EdgeOne Pages 在海外大多数地区表现还行,部分地区时延较高。

edgeone pages global latency measurement

国内访问时延测量

因为有些平台预置域名被墙,为更好测试国内数据,对所有部署的网站绑定了自定义域名。其中 EdgeOne 使用国内站测试(支持海外节点)。

为保证测试结果的准确性,本文测试使用了多个平台验证,文中图片展示使用的工具为
boce

Vercel

Vercel 在国内的表现相当出色,并且用户注册和使用门槛明显低于 AWS 和 Netlify。

vercel pages China delay measurement

Cloudflare Pages

Cloudflare 在国内体验非常差。从路由追踪图可以看出,主要问题在于回源物理地址距离过远。

image

cloudflare pages China delay measurement

Github Pages

GitHub Pages 在国内的测速结果与其日常使用体验相符,表现出明显的不稳定性。测试过程中,多次测得的结果也存在较大差异。

github pages China delay measurement

Edgoene Pages

EdgeOne Pages 在国内的表现最佳,毕竟是国内厂商,国内节点优势很大。

image

Google Lighthouse实际使用体验测量

为评估网页性能,采用 Google Lighthouse 测量以下关键指标:CLS、LCP、FCP、TBT等。详细指标说明见
Google 官方文档

考虑到 Lighthouse 单次测量波动较大,为减少偶然误差,对单个站点进行 10 次测试,最后计算单次测量结果的均值。

以下是测量的结果:

Vercel Cloudflare GitHub EdgeOne
CLS(ms) 0.47 0.75 0.33 0.55
LCP(s) 2.08 1.06 2.56 5.28
FCP(ms) 489.38 725.79 338.17 397.42
TBT(ms) 201.55 49.3 47.33 56.92
Speed Index(s) 2.84 2.55 3.16 1.86
Performance 55 62 56 51

不同类型网站 Google Lighthouse 指标测试差别较大,本次分享指标仅供参考。

总结

整体来看,以
Cloudflare 和 Vercel
为代表的巨头平台方案多样,拓展功能强大,部署流程流畅。

GitHub Pages
目前已经解决了网上呼声很高的无法自定义域名、访问不稳定、SSL 支持的问题,相信之后的自动化程度应该也会提高。

EdgeOne Pages
带来了不小的惊喜。用户部署体验与巨头相比差距不大,各项指标测试表现良好,在国内的表现尤其突出。

本次分享涵盖了
免费计划
的内容,旨在帮助用户在初期找到优质的部署资源。由于付费计划复杂性,各平台在收费策略、资源配置和技术支持等方面存在较大差异,因此不做进一步探讨。

这是我整理的一份对照表,表内是各家免费计划的资源使用情况。

Vercel Cloudflare GitHub EdgeOne
自定义域名 ✅️ ✅️ ✅️ ✅️
SSL 支持 ✅️ ✅️ ✅️ ✅️
无限制请求数 ✅️ ✅️ ✅️ ✅️
站内域名注册 ✅️ ✅️
构建次数 100/day 500/month 10/hour 无限制
并发构建 1 1 10 无限制
带宽 100GB/month 无限制 100GB/month 无限制
部署网站个数 100 100 无限制 20

以上提到的“无限”指的是本次体验中
未触发上限
,且在官网上未找到
限制说明