2024年11月

升讯威在线客服与营销系统是基于 .net core / WPF 开发的一款在线客服软件,宗旨是:
开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。

背景

随着下载私有化部署的用户越来越多,部署配置时的便利性到了需要进一步提升的时候。
部署时需要修改一些配置文件,除了直接手工修改外,之前我提供了一个 Windows 应用程序来自动化写入配置文件,但这有一些小的弊端,很多用户选择把文件上传到服务器上之后,直接在服务器上配置,而 Windows 界面程序无法在 Linux 、宝塔这样的环境中使用。用户还是只能手工修改配置文件,经常有修改错误的情况发生。
所以我提供了一个网页版本的配置文件生成工具,
直接在浏览器上使用,生成配置文件的内容,复制文件内容到终端的 vim 编辑器中直接粘贴即可
。高效,实用。

这个网页版小工具虽然功能并不复杂,然而开发它还是花了点时间,本文我将仔细讲解
如何使用原生Web开发技术,来实现这样的功能。

效果展示

https://kf.shengxunwei.com/ServerSettingsTool/index.html

在网页中填写服务器的各项配置信息:

根据填写的配置信息,自动生成所需的配置文件内容:

技术方案

下面我将详细介绍这样的网页版配置文件生成工具是如何开发的,在这个应用场景中,我使用了原生Web开发技术,使用了 Bootstrap + jQuery 的方案。

Bootstrap

Bootstrap 是最受欢迎的前端框架之一,用于快速开发响应式、移动优先的网站和应用程序。它由 Twitter(现 X)开发并开源,为开发者提供了一系列预先设计好的 CSS 样式、HTML 模板、JavaScript 插件,能够节省大量的时间和精力,使网页设计更高效。

  • Bootstrap 的核心优势是其响应式设计理念。它基于 CSS 媒体查询(Media Queries),能够根据不同设备的屏幕尺寸(如桌面显示器、平板电脑、手机)自动调整网页布局。例如,一个使用 Bootstrap 构建的网站,在大屏幕设备上可能会以多栏布局显示内容,而在小屏幕手机上则会自动堆叠成单栏布局,确保内容在任何设备上都能清晰可读。
  • 它提供了一套响应式网格系统,将页面划分为 12 列。开发者可以通过指定不同的列数来分配内容在不同屏幕尺寸下的宽度。比如,在桌面端可以让一个元素占据 6 列(即屏幕宽度的一半),在移动端可以让它占据 12 列(全屏宽度)。

jQuery

jQuery 是一个很实用的 JavaScript 库。

  • 它的主要作用是让网页开发中的一些操作变得更简单。比如在操作网页元素方面,能很方便地找到页面中的各种元素,像按标签、ID 或者类名来选取。找到元素后,可以轻松地修改元素里的文字、属性和样式。
  • 在事件处理上,给网页元素添加点击、鼠标移动、键盘按键等事件变得很容易,还能高效地处理动态添加元素的事件。
  • 它也能制作动画,简单的显示隐藏动画或者复杂一点的位置、透明度变化等动画都可以实现。
  • 另外,通过 jQuery 可以方便地发送异步请求,从服务器获取数据来更新网页的部分内容,而不用重新加载整个页面。

highlight.js

Highlight.js 是一个用于在网页中对代码进行语法高亮显示的 JavaScript 库。它能够识别多种编程语言的语法,包括但不限于 JavaScript、Python、Java、C++、HTML、CSS 等,使代码片段在网页上呈现出清晰、易读的格式,带有不同颜色来区分不同的语法元素。

  • 它通过解析代码文本,根据各种编程语言的语法规则来识别关键字、变量、函数、注释等不同的语法成分。例如,在 JavaScript 代码中,它可以将function关键字标记为一种颜色,将变量名标记为另一种颜色。
  • 当应用到网页时,通常需要在 HTML 中引入 Highlight.js 的库文件(JavaScript 和 CSS),然后指定要进行语法高亮显示的代码块,一般是通过类名或者其他属性来标识这些代码块。

开发核心功能

首先基于 Bootstrap 完成基本的页面开发,Bootstrap 提供了非常完善的布局功能,对于一个单页面的应用来说,这并不复杂,我们可以使用如下代码生成一个基本的页面结构,它包含了几个标签页:

<ul class="nav nav-tabs" id="myTab" role="tablist">
  <li class="nav-item" role="presentation">
    <button class="nav-link active" id="home-tab" data-bs-toggle="tab" data-bs-target="#home-tab-pane" type="button" role="tab" aria-controls="home-tab-pane" aria-selected="true">Home</button>
  </li>
  <li class="nav-item" role="presentation">
    <button class="nav-link" id="profile-tab" data-bs-toggle="tab" data-bs-target="#profile-tab-pane" type="button" role="tab" aria-controls="profile-tab-pane" aria-selected="false">Profile</button>
  </li>
  <li class="nav-item" role="presentation">
    <button class="nav-link" id="contact-tab" data-bs-toggle="tab" data-bs-target="#contact-tab-pane" type="button" role="tab" aria-controls="contact-tab-pane" aria-selected="false">Contact</button>
  </li>
  <li class="nav-item" role="presentation">
    <button class="nav-link" id="disabled-tab" data-bs-toggle="tab" data-bs-target="#disabled-tab-pane" type="button" role="tab" aria-controls="disabled-tab-pane" aria-selected="false" disabled>Disabled</button>
  </li>
</ul>
<div class="tab-content" id="myTabContent">
  <div class="tab-pane fade show active" id="home-tab-pane" role="tabpanel" aria-labelledby="home-tab" tabindex="0">...</div>
  <div class="tab-pane fade" id="profile-tab-pane" role="tabpanel" aria-labelledby="profile-tab" tabindex="0">...</div>
  <div class="tab-pane fade" id="contact-tab-pane" role="tabpanel" aria-labelledby="contact-tab" tabindex="0">...</div>
  <div class="tab-pane fade" id="disabled-tab-pane" role="tabpanel" aria-labelledby="disabled-tab" tabindex="0">...</div>
</div>

然后需要使用jQuery 来操作 form 表单,获取用户填写的配置信息:

  $(document).ready(function () {
      // 当按钮被点击时执行以下函数
      $('#btn').click(function () {
        // 获取输入框的值
        var inputValue = $('#inputBox').val();

        // 将获取到的值赋值给另一个变量(这里只是示例,可根据实际需求处理该值)
        var assignedValue = inputValue;

        // 将赋值后的变量值显示在指定的div元素中
        $('#result').text('获取并赋值后的值为:' + assignedValue);
      });
    });

最后,使用 highlight.js 来高亮显示生成的配置文件,以使它们更加可读:

在生成文件之后,调用
hljs.highlightAll();
方法即可。

<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/default.min.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>

<!-- and it's easy to individually load additional languages -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/go.min.js"></script>

<script>hljs.highlightAll();</script>


简介下这个 .net 开发的小系统

可全天候 7 × 24 小时挂机运行,不掉线不丢消息,欢迎实测。

https://kf.shengxunwei.com/

大家好,我是汤师爷~

在工作当中,我们经常会听到以下说法:

  • 产品负责人说,现在的业务架构太复杂,需要仔细梳理下。
  • 技术领导说,这个项目很复杂,需要做下系统架构方案评审。
  • 研发经理说,这次秒杀活动访问量非常大,需要用到高并发架构方案。
  • 一线研发说,互联网大厂都会用到微服务架构,我要学学微服务架构设计。

上面提到的架构到底是指什么?这些说法究竟是对还是错?

其实上面的说法都是对的,只是采用的视角不一样。

复杂系统涉及多方利益相关者,如客户、产品经理、研发、销售、运营和管理层等。由于背景和认知差异,每个人看待系统的角度和方法都不尽相同。

为控制复杂度,我们为不同角色设计特定的架构描述物。通过分类和定义,让每种架构描述都有其侧重点,让每个利益相关者能快速获取他们最关心的信息。

要实现这一目标,我们首先需要理解"视角"和"视图"这两个关键概念。

架构视角

什么是视角?大白话就是你站在什么地方看。

我们以城市系统为例,你站在城市的某条马路上,能看到什么?

能看到几座楼房,几排树木,几条大马路,熙熙攘攘的行人。

但是你坐在飞机上看,能看到什么?

能看到一片片的楼盘,能看到群山,能看到江河湖海。所以,你能看到什么,和你站在什么地方看有很大关系,同时也会影响你看待事物的粒度。

如果把视角比作一个坐标点,那它需要一套坐标系,坐标系通常有4个维度:广度、深度、视图类型、时间。

广度是指看待事物的宽度,以业务流程为例,根据出发点不同,有时需要看一个部门内的流程,有时需要看多个部门的协作流程,有时需要看端到端跨部门流程。

深度是指看待事物时,要到达哪个细节层次,例如看业务流程,需要看到组织级、部门级、还是某个岗位的具体操作步骤。看软件系统,需要看到系统级、应用级、模块级、还是一行行的代码。

广度和深度一般是相互影响的,如果看待事物的广度越宽,那么层次就会越抽象,这和组织架构的设计也是相辅相成的,一般高层管理者看问题非常全面,但对细节不关注,一线执行人员,对问题的细节非常了解,但视角却非常窄。

时间维度比较好理解,就是看待事物的时间点,过去、现在、还是未来。

视图类型是为利益干系人量身打造的一组关注点的集合,接下来会详细介绍。

架构视图

什么是视图?大白话就是你想看到什么。

视图是为利益干系人量身打造的一组关注点的集合。

同样以城市系统为例,想要赶早高峰的上班族,他的关注点是哪条路线上班最快,因此他需要一副公交地铁路线图。

想要租房的租客,他的关注点是公司附近有哪些小区,租金多少,因此他需要一副公司附近的小区地图。

想要疏通下水道的工人,他的关注点是下水道是怎样排布的,因此他需要一副下水道的排布图。

同一个城市系统,不同角色的关注点是完全不一样的,想要获取的信息也是完全不一样,如果把所有信息杂糅在一起,不做视图隔离,导致的结果就是信息太庞杂,每个人都很难获取想要的信息。

同理,不同干系人看待软件系统的关注点也是迥然不同的,为了把不同人的关注点区分开,诞生了很多软件视图的分类方法,比较著名的有“4+1”视图,TOGAF的业务架构、应用架构、数据架构、技术架构等视图分类法。

TOGAF的4种架构视图

1996年,克林格.科恩法案颁布,美国联邦政府立法,强制要求政府机构使用企业架构理论构建自己的IT系统,最重要的机构是国防部、财政部,这一举措,直接让政府机构的数字化水平,以火箭般的速度飞速发展。

同一时间,大名鼎鼎的TOGAF也在快速发展,它大量参考了政府机构的企业架构理论,沉淀出一套更加通用的企业架构方法论。

目前80%的福布斯排行榜前50名的企业,以及60%的美国500强企业,都在使用TOGAF理论改善自身的IT架构。

我们重点说下TOGAF的4种视图类型:业务架构,应用架构,数据架构,技术架构。

它们是企业架构中的四个主要部分,它们关注的方面和功能不同,但相互关联和支持,共同构成了企业的总体架构。

一个清晰的企业架构可以确保业务流程顺畅、信息系统合理支持、构建步骤有序。企业架构是项目决策的重要依据,也是企业未来发展的基础。

  • 业务架构定义了为实现企业的业务战略,企业将自身业务结构化表达为全面的、多维度的抽象模型,包括商业模式、价值流、业务能力、业务流程、组织架构,以及它们与战略、产品、策略、项目执行、利益干系人之间的关系。
  • 应用架构定义了企业中的应用系统的结构和行为,这些系统之间的关系,以及它们如何与业务流程对接。
  • 数据架构定义了企业如何收集、存储、管理和使用数据,涉及到数据模型、数据管理、数据集成和治理的设计和实施。
  • 技术架构定义IT基础设施和技术组件的结构,通过它们可以支撑起企业对业务、数据、应用服务的需求,它们包括但不限于硬件、可部署的软件包、网络、技术中间件、通信设施、运算设施等。

通过视图与视角,我们可以分离关注点,将复杂问题进行拆解,让每个局部的复杂度控制在一个可以接受的范围。同时,团队有了统一的架构认知坐标系,进一步促成了业务标准化,通过分离不变点与变化点,提炼出可复用的业务组件,快速响应业务需求变化。

架构视图的核心概念

每种架构视图都包含一系列核心概念,通过这些概念可以层层剖析整个业务系统,系统化地理解和管理整体架构,确保各个层面的协调与一致。

  • 业务架构:商业模式,价值流,业务能力,业务流程,组织架构。
  • 应用架构:应用服务,应用结构,应用交互。
  • 数据架构:数据模型,数据库技术。
  • 技术架构:软件部署,技术组件、基础设施。

本文已收录于,我的技术网站:
tangshiye.cn
里面有,算法Leetcode详解,面试八股文、BAT面试真题、简历模版、架构设计,等经验分享。

【引言】

本应用的主要功能是将用户输入的数字转换为中文的小写、大写及大写金额形式。用户可以在输入框中输入任意数字,点击“示例”按钮可以快速填充预设的数字,点击“清空”按钮则会清除当前输入。转换结果显示在下方的结果区域,每个结果旁边都有一个“复制”按钮,方便用户将结果复制到剪贴板。

【环境准备】

• 操作系统:Windows 10

• 开发工具:DevEco Studio NEXT Beta1 Build Version: 5.0.3.806

• 目标设备:华为Mate60 Pro

• 开发语言:ArkTS

• 框架:ArkUI

• API版本:API 12

• 三方库:chinese-number-format(数字转中文)、chinese-finance-number(将数字转换成财务用的中文大写数字)

ohpm install @nutpi/chinese-finance-number
ohpm install @nutpi/chinese-number-format

【功能实现】

• 输入监听:通过 @Watch 装饰器监听输入框的变化,一旦输入发生变化,即调用 inputChanged 方法更新转换结果。

• 转换逻辑:利用 @nutpi/chinese-number-format 和 @nutpi/chinese-finance-number 库提供的方法完成数字到中文的各种转换。

• 复制功能:使用 pasteboard 模块将结果显示的中文文本复制到剪贴板,通过 promptAction.showToast 提示用户复制成功。

【完整代码】

// 导入必要的模块
import { promptAction } from '@kit.ArkUI'; // 用于显示提示信息
import { pasteboard } from '@kit.BasicServicesKit'; // 用于处理剪贴板操作
import { toChineseNumber } from '@nutpi/chinese-finance-number'; // 将数字转换为中文大写金额
import {
  toChineseWithUnits, // 将数字转换为带单位的中文
  toUpperCase, // 将中文小写转换为大写
} from '@nutpi/chinese-number-format';

@Entry // 标记此组件为入口点
@Component // 定义一个组件
struct NumberToChineseConverter {
  @State private exampleNumber: number = 88.8; // 示例数字
  @State private textColor: string = "#2e2e2e"; // 文本颜色
  @State private lineColor: string = "#d5d5d5"; // 分割线颜色
  @State private basePadding: number = 30; // 基础内边距
  @State private chineseLowercase: string = ""; // 转换后的小写中文
  @State private chineseUppercase: string = ""; // 转换后的中文大写
  @State private chineseUppercaseAmount: string = ""; // 转换后的中文大写金额
  @State @Watch('inputChanged') private inputText: string = ""; // 监听输入文本变化

  // 当输入文本改变时触发的方法
  inputChanged() {
    this.chineseLowercase = toChineseWithUnits(Number(this.inputText), 'zh-CN'); // 转换为小写中文并带上单位
    this.chineseUppercase = toUpperCase(this.chineseLowercase, 'zh-CN'); // 将小写中文转换为大写
    this.chineseUppercaseAmount = toChineseNumber(Number(this.inputText)); // 转换为大写金额
  }

  // 复制文本到剪贴板的方法
  private copyToClipboard(text: string): void {
    const pasteboardData = pasteboard.createData(pasteboard.MIMETYPE_TEXT_PLAIN, text); // 创建剪贴板数据
    const systemPasteboard = pasteboard.getSystemPasteboard(); // 获取系统剪贴板
    systemPasteboard.setData(pasteboardData); // 设置剪贴板数据
    promptAction.showToast({ message: '已复制' }); // 显示复制成功的提示
  }

  // 构建用户界面的方法
  build() {
    Column() { // 主列容器
      // 页面标题
      Text('数字转中文大小写')
        .fontColor(this.textColor) // 设置字体颜色
        .fontSize(18) // 设置字体大小
        .width('100%') // 设置宽度
        .height(50) // 设置高度
        .textAlign(TextAlign.Center) // 文本居中对齐
        .backgroundColor(Color.White) // 设置背景颜色
        .shadow({ // 添加阴影效果
          radius: 2, // 阴影半径
          color: this.lineColor, // 阴影颜色
          offsetX: 0, // X轴偏移量
          offsetY: 5 // Y轴偏移量
        });

      Scroll() { // 滚动视图
        Column() { // 内部列容器
          // 工具介绍部分
          Column() {
            Text('工具介绍').fontSize(20).fontWeight(600).fontColor(this.textColor); // 设置介绍文字样式
            Text('将数字转换为中文格式,适用于票据填写、合同文书、财务报表等多种场景。支持从最小单位“分”到最大单位“千兆”的数字转换。')
              .textAlign(TextAlign.JUSTIFY)
              .fontSize(18).fontColor(this.textColor).margin({ top: `${this.basePadding / 2}lpx` }); // 设置介绍详情文字样式
          }
          .alignItems(HorizontalAlign.Start) // 对齐方式
          .width('650lpx') // 设置宽度
          .padding(`${this.basePadding}lpx`) // 设置内边距
          .margin({ top: `${this.basePadding}lpx` }) // 设置外边距
          .borderRadius(10) // 设置圆角
          .backgroundColor(Color.White) // 设置背景颜色
          .shadow({ // 添加阴影效果
            radius: 10, // 阴影半径
            color: this.lineColor, // 阴影颜色
            offsetX: 0, // X轴偏移量
            offsetY: 0 // Y轴偏移量
          });

          // 输入区
          Column() {
            Row() { // 行容器
              Text('示例')
                .fontColor("#5871ce") // 设置字体颜色
                .fontSize(18) // 设置字体大小
                .padding(`${this.basePadding / 2}lpx`) // 设置内边距
                .backgroundColor("#f2f1fd") // 设置背景颜色
                .borderRadius(5) // 设置圆角
                .clickEffect({ level: ClickEffectLevel.LIGHT, scale: 0.8 }) // 设置点击效果
                .onClick(() => { // 点击事件
                  this.inputText = `${this.exampleNumber}`; // 设置输入框文本为示例数字
                });
              Blank(); // 占位符
              Text('清空')
                .fontColor("#e48742") // 设置字体颜色
                .fontSize(18) // 设置字体大小
                .padding(`${this.basePadding / 2}lpx`) // 设置内边距
                .clickEffect({ level: ClickEffectLevel.LIGHT, scale: 0.8 }) // 设置点击效果
                .backgroundColor("#ffefe6") // 设置背景颜色
                .borderRadius(5) // 设置圆角
                .onClick(() => { // 点击事件
                  this.inputText = ""; // 清空输入框
                });
            }.height(45) // 设置高度
            .justifyContent(FlexAlign.SpaceBetween) // 子元素水平分布方式
            .width('100%'); // 设置宽度
            Divider().margin({ top: 5, bottom: 5 }); // 分割线
            TextInput({ text: $$this.inputText, placeholder: `请输入数字,例如:${this.exampleNumber}` }) // 输入框
              .width('100%') // 设置宽度
              .fontSize(18) // 设置字体大小
              .caretColor(this.textColor) // 设置光标颜色
              .fontColor(this.textColor) // 设置字体颜色
              .margin({ top: `${this.basePadding}lpx` }) // 设置外边距
              .padding(0) // 设置内边距
              .backgroundColor(Color.Transparent) // 设置背景颜色
              .borderRadius(0) // 设置圆角
              .type(InputType.NUMBER_DECIMAL); // 设置输入类型为数字
          }
          .alignItems(HorizontalAlign.Start) // 对齐方式
          .width('650lpx') // 设置宽度
          .padding(`${this.basePadding}lpx`) // 设置内边距
          .margin({ top: `${this.basePadding}lpx` }) // 设置外边距
          .borderRadius(10) // 设置圆角
          .backgroundColor(Color.White) // 设置背景颜色
          .shadow({ // 添加阴影效果
            radius: 10, // 阴影半径
            color: this.lineColor, // 阴影颜色
            offsetX: 0, // X轴偏移量
            offsetY: 0 // Y轴偏移量
          });

          // 结果区
          Column() {
            Row() {
              Text(`小写:${this.chineseLowercase}`).fontColor(this.textColor).fontSize(18).layoutWeight(1); // 显示小写结果
              Text('复制')
                .fontColor(Color.White) // 设置字体颜色
                .fontSize(18) // 设置字体大小
                .padding(`${this.basePadding / 2}lpx`) // 设置内边距
                .backgroundColor("#0052d9") // 设置背景颜色
                .borderRadius(5) // 设置圆角
                .clickEffect({ level: ClickEffectLevel.LIGHT, scale: 0.8 }) // 设置点击效果
                .onClick(() => { // 点击事件
                  this.copyToClipboard(this.chineseLowercase); // 复制小写结果到剪贴板
                });
            }.constraintSize({ minHeight: 45 }) // 最小高度
            .justifyContent(FlexAlign.SpaceBetween) // 子元素水平分布方式
            .width('100%'); // 设置宽度
            Divider().margin({ top: 5, bottom: 5 }); // 分割线
            Row() {
              Text(`大写:${this.chineseUppercase}`).fontColor(this.textColor).fontSize(18).layoutWeight(1); // 显示大写结果
              Text('复制')
                .fontColor(Color.White) // 设置字体颜色
                .fontSize(18) // 设置字体大小
                .padding(`${this.basePadding / 2}lpx`) // 设置内边距
                .backgroundColor("#0052d9") // 设置背景颜色
                .borderRadius(5) // 设置圆角
                .clickEffect({ level: ClickEffectLevel.LIGHT, scale: 0.8 }) // 设置点击效果
                .onClick(() => { // 点击事件
                  this.copyToClipboard(this.chineseUppercase); // 复制大写结果到剪贴板
                });
            }.constraintSize({ minHeight: 45 }) // 最小高度
            .justifyContent(FlexAlign.SpaceBetween) // 子元素水平分布方式
            .width('100%'); // 设置宽度
            Divider().margin({ top: 5, bottom: 5 }); // 分割线
            Row() {
              Text(`大写金额:${this.chineseUppercaseAmount}`).fontColor(this.textColor).fontSize(18).layoutWeight(1); // 显示大写金额结果
              Text('复制')
                .fontColor(Color.White) // 设置字体颜色
                .fontSize(18) // 设置字体大小
                .padding(`${this.basePadding / 2}lpx`) // 设置内边距
                .backgroundColor("#0052d9") // 设置背景颜色
                .borderRadius(5) // 设置圆角
                .clickEffect({ level: ClickEffectLevel.LIGHT, scale: 0.8 }) // 设置点击效果
                .onClick(() => { // 点击事件
                  this.copyToClipboard(this.chineseUppercaseAmount); // 复制大写金额结果到剪贴板
                });
            }.constraintSize({ minHeight: 45 }) // 最小高度
            .justifyContent(FlexAlign.SpaceBetween) // 子元素水平分布方式
            .width('100%'); // 设置宽度
          }
          .alignItems(HorizontalAlign.Start) // 对齐方式
          .width('650lpx') // 设置宽度
          .padding(`${this.basePadding}lpx`) // 设置内边距
          .margin({ top: `${this.basePadding}lpx` }) // 设置外边距
          .borderRadius(10) // 设置圆角
          .backgroundColor(Color.White) // 设置背景颜色
          .shadow({ // 添加阴影效果
            radius: 10, // 阴影半径
            color: this.lineColor, // 阴影颜色
            offsetX: 0, // X轴偏移量
            offsetY: 0 // Y轴偏移量
          });
        }
      }.scrollBar(BarState.Off).clip(false); // 关闭滚动条,不允许裁剪
    }
    .height('100%') // 设置高度
    .width('100%') // 设置宽度
    .backgroundColor("#f4f8fb"); // 设置页面背景颜色
  }
}

Python的
asyncio
模块提供了基于协程(coroutines)的异步编程(asynchronous programming)模型。作为一种高效的编程范式,异步编程允许多个轻量级任务并发执行,且相比传统的多线程模型,具有更低的内存消耗。因此,
asyncio
在需要高并发处理的场景中,尤其是在Web开发、网络请求、API调用和套接字编程等领域,得到了广泛应用。本文将详细介绍如何在Python中使用
asyncio
进行异步编程。

本文在学习
asyncio
的过程中参考了以下文献:

1 异步编程介绍

异步编程是一种非阻塞的编程范式。在这种范式中,请求和函数调用会在未来某个时刻以某种方式在后台执行。非阻塞意味着当一个请求被发出时,程序不会停下来等待该请求的结果,而是会继续执行后续的操作。当请求的结果准备好时,程序会在适当的时机处理该结果,而不会影响程序其他部分的执行。因此,调用者可以继续执行其他任务,并在结果准备好或需要时,稍后处理已发出的调用结果。

1.1 什么是异步任务

异步操作指的是在程序运行时,有些任务不会立即完成,而是安排在未来某个时刻执行。与同步操作不同,后者要求任务在当前步骤中完成。

异步函数调用
(Asynchronous Function Call)是实现异步操作的一种方式。这种方式允许程序在等待某些任务完成时,继续执行其他工作,从而避免程序被卡住,提升效率。

通常,异步函数调用会返回一个被称为“未来”(Future)的对象(句柄)。这个对象可以看作是一个指向异步操作结果的标识符。程序可以通过它来查看任务的进度,或者等到任务完成时获取最终结果。这样,程序就可以在等待任务完成时做其他事情,而不是一直停下来等。

结合异步函数调用和Future,就得到了
异步任务
(Asynchronous Task)的概念。异步任务不仅仅是调用一个函数,它还包括了如任务取消、错误处理等更多的内容。这样,程序就可以更加灵活和高效地管理多个任务,提高并发性和整体性能。

简单来说,以下是这几个概念的总结:

  • 异步函数调用
    :指触发一个函数执行的请求,它会在未来某个时刻开始执行,而不会阻止程序继续做其他事情。
  • Future
    :是异步函数调用的一个标识符,允许调用者检查任务的状态,并在任务完成时获取结果。
  • 异步任务
    :指代一个包含异步函数调用和结果(Future)的集合,用于管理和跟踪整个异步操作的过程。

1.2 Python中的异步编程

1.2.1 非阻塞I/O与异步编程

输入input/输出output,简称I/O,指的是从资源中读取或写入数据。以下是I/O操作的一些典型应用场景:

  • 硬盘驱动器:对文件进行读取、写入、追加、重命名、删除等操作。
  • 外围设备:鼠标、键盘、屏幕、打印机、串行设备、摄像头等。
  • 互联网:下载和上传文件、获取网页、查询RSS等。
  • 数据库:执行选择、更新、删除等SQL查询。
  • 电子邮件:发送邮件、接收邮件、查询收件箱等。

相较于中央处理器(CPU)执行的计算任务,I/O操作通常具有较低的效率。在程序设计中,I/O请求通常以同步方式实现,即发起I/O请求的线程在数据传输完成之前会被挂起,等待操作完成。这种模式被称为阻塞式I/O(Blocking I/O)。在此模式下,操作系统能够识别线程的阻塞状态,并执行上下文切换,以便调度其他可执行的线程,从而优化CPU资源的利用率。尽管如此,阻塞式I/O操作会导致发起请求的线程或进程在I/O操作完成前无法继续执行。虽然这种设计不会对整个系统的运行造成影响,但它确实会在I/O操作期间暂时阻塞发起请求的线程或进程,影响其响应性和并发处理能力。

作为对阻塞I/O的替代方案,非阻塞I/O提供了更高效的选择。与阻塞I/O类似,非阻塞I/O同样需要底层操作系统的支持,但现代操作系统普遍提供了某种形式的非阻塞I/O功能。通过非阻塞I/O,应用程序可以以异步方式发起读写请求,操作系统将负责处理这些请求,并在数据准备好时通知应用程序。

异步编程(Asynchronous Programming)是一种专门用于处理非阻塞I/O操作的编程方式。与传统的阻塞I/O不同,非阻塞I/O使得系统在发出读写请求后不会等待操作完成,而是可以同时处理其他请求。操作结果或数据会在准备好时返回,或者在需要时提供给程序。因此,非阻塞I/O是实现异步编程的核心技术,通常这两者被统称为异步I/O。

1.2.2 asyncio模块介绍

在Python中,异步编程泛指非阻塞的请求处理方式,即发起请求后不暂停程序执行,而是继续处理其他任务。Python支持多种异步编程技术,其中部分与并发性紧密相关。为了支持异步编程,Python3.4版本首次引入了asyncio(asynchronous I/O的缩写)模块,为异步编程提供了基础设施。随后,在Python 3.5版本中,引入了async/await语法。其中:

  • asyncio模块旨在支持异步编程,并提供了底层和高级API。高级API提供了执行异步任务、处理回调、执行I/O操作等工具。而底层API则为高级API提供了支撑,包括事件循环的内部机制、传输协议和策略等。

  • async/await语法的引入是为了更好地支持协程,这是asyncio模块中实现并发的核心。因为协程提供了一种轻量级的并发方式,可以让单个线程在多个任务之间高效切换,从而实现并发执行,而无需使用传统的线程或进程。

协程是一种特殊的函数,它能够在执行过程中的多个点被暂停和恢复,实现协作式的多任务处理。与传统的子程序或函数相比,协程提供了更灵活的控制流,允许在多个点进行进入、退出和恢复执行。

目前,asyncio模块是Python异步编程的常用工具,它结合async/await语法和非阻塞I/O操作,为开发者提供了一个全面的异步编程框架。那么为什么要在Python程序使用异步编程:

  • 提升并发性能:通过使用协程,asyncio使得程序能够以单线程的方式高效地处理大量并发任务,避免了传统多线程编程中的复杂性和资源消耗。
  • 简化异步编程:asyncio提供了一套简洁的异步编程范例,使得编写和维护异步代码变得更加容易,同时也提高了代码的可读性和可维护性。
  • 优化I/O操作:asyncio支持非阻塞I/O操作,这意味着程序在等待I/O操作(如文件读写、网络通信等)时,不会阻塞主线程,从而可以同时执行其他任务,显著提高了I/O操作的效率。

1.3 Python并发单元的选择与比较

线程、进程、协程

在现代编程中,有效地处理并发是提高程序性能和响应能力的关键。Python提供了多种并发单元,包括线程、进程和协程,每种都有其特定的用途和优势。以下是对这三种并发单元的详细介绍:

  1. 线程(Threads)

线程是一种并发单元,Python中由threading模块提供,并得到操作系统的支持。线程适合处理阻塞I/O任务,例如从文件、套接字和设备中进行读写操作。然而,由于全局解释器锁(GIL)的存在,Python中的线程在执行CPU密集型任务时效率不高。

  1. 进程(Processes)

进程也是由操作系统支持的并发单元,Python中由multiprocessing模块提供。进程适合执行CPU密集型任务,尤其是那些不需要大量进程间通信的计算任务。与线程相比,进程可以绕过全局解释器锁,因此在处理CPU密集型任务时更为高效。

  1. 协程(Coroutines)

协程是Python语言和运行时(标准解释器)提供的并发单元,Python中由asyncio模块进一步支持。相较于线程,程序中可以拥有更多的协程同时运行。协程适用于非阻塞I/O操作,如与子进程和套接字的交互。此外,虽然阻塞I/O和CPU密集型任务并非协程的直接应用场景,但可以通过在后台使用线程和进程以模拟非阻塞的方式执行这些任务。

关于线程、进程、协程的详细介绍见:
进程、线程、协程

https://semfionetworks.com/blog/multi-threading-vs-multi-processing-programming-in-python/

在Python中使用协程的利弊

在Python中,使用协程相比于线程和进程有以下几个主要好处:

  • 轻量级:协程的创建和切换比线程和进程更高效,消耗的资源更少。
  • 避免上下文切换开销:协程切换由程序控制,不依赖操作系统调度,减少了CPU时间消耗。
  • 共享内存:协程在同一线程内运行,数据共享更简单,不需要锁和复杂的同步机制。
  • 简洁的代码结构:协程使异步代码能够以同步的方式书写,代码更易理解和维护。
  • 高并发处理:协程非常适合处理大量I/O密集型任务,能够高效利用单个线程实现并发。

当然在Python中使用协程也存在一些缺点:

  • 编程复杂性:协程要求开发者理解异步编程模式,这增加了编程的复杂性,尤其是在编写、测试和调试异步代码时。
  • 库支持有限:并非所有Python库都支持异步操作,这可能限制了协程在某些场景下的应用。
  • 调试难度:异步代码的调试比同步代码更困难,因为传统的调试工具可能无法有效地跟踪协程的执行流程。
  • 错误处理:异步代码中的错误处理比同步代码更为复杂,因为需要处理协程挂起和恢复时的状态。
  • 执行机制:根据Python底层设计,协程在执行时是协作式的,一次只能运行一个协程。这种机制类似于全局解释器锁下的线程执行。

2 asyncio的使用

2.1 协程的使用

了解协程的创建和运行是学习
asyncio
库的基础,因为
asyncio
正是通过协程实现异步编程的。因此,在学习
asyncio
之前,先掌握协程的基本概念非常重要。

协程是异步编程中实现并发的核心,它是一种特殊的函数,能够在执行过程中暂停并稍后恢复。与传统的函数不同,传统函数只能在一个固定的入口和出口点运行,而协程则允许在多个地方挂起、恢复或退出。这种特性使得协程在执行时可以暂停并等待其他任务完成,比如等待其他协程的执行结果、外部资源的返回(例如网络连接或数据处理),然后再继续执行。正是因为协程具备这种暂停和恢复的能力,它们能够同时执行多个任务,而且能够精确控制任务何时暂停和何时恢复。

协程的定义

在Python中,协程可以通过使用
async def
关键字来定义。这种定义方式允许协程接受参数,并在执行完毕后返回一个值,类似于常规函数的行为:

# 定义一个协程
async def custom_coroutine():
    # 协程体,可以包含异步操作
    pass

使用
async def
声明的协程被称为“协程函数”,这是一种特殊的函数,其返回值是一个协程对象。协程函数在其内部使用
await
(用于等待另一个协程完成)、
async for
(用于异步迭代)和
async with
(用于异步上下文管理器)等关键字来处理异步操作。如下所示:

# 定义一个异步协程
async def custom_coroutine():
    # 等待另一个协程执行
    # await表达式将暂停当前协程的执行,并将控制权转交给被等待的协程,以便其能够执行
    await asyncio.sleep(1)

协程的创建

在定义了协程之后,可以创建具体的协程实例:

# 实例化协程
coroutine_instance = custom_coroutine()

需要注意的是,调用协程函数本身并不会导致任何用户定义的代码被执行,其作用仅限于创建并返回一个
coroutine
对象。
coroutine
对象是Python中的一种特殊对象类型,它提供了如
send()

close()
等方法,用于控制协程的执行流程和生命周期管理。

可以通过创建协程实例并调用
type()
函数来报告其类型来演示这一点:

import asyncio
# 定义协程
async def custom_coroutine():
    print("运行自定义协程")
    # 等待另一个协程
    await asyncio.sleep(1)

# 创建协程
coro = custom_coroutine()
# 检查协程的类型
print(type(coro))

代码运行结果为:

<class 'coroutine'>
sys:1: RuntimeWarning: coroutine 'custom_coroutine' was never awaited

该警告是因为在Python中,当定义一个协程函数并调用它时,返回的是一个协程对象,而不是立即执行。这个协程对象需要通过
await
关键字或事件循环来执行。代码中的
print(type(coro))
正确地打印出了协程对象的类型。但是,由于协程没有被await,所以会有一个RuntimeWarning警告。

协程的运行

协程可以被定义和创建,但只有在事件循环中才能执行。事件循环是异步应用程序的核心,负责调度异步任务和回调,处理网络 I/O 操作。它还负责协调协程之间的协作和多任务处理。

事件循环的工作方式类似于一个不断运行的“调度器”,它会检查哪些任务已经准备好执行,并按顺序执行这些任务。如果某个任务需要等待,例如等待网络响应或文件读取,事件循环会暂时挂起这个任务,并把控制权交给其他可以继续执行的任务。这样,程序就可以在等待的同时,处理其他任务,从而避免了阻塞操作。

通常,通过调用
asyncio.run()
函数启动事件循环。该函数会启动事件循环并接收一个协程作为参数,等待协程执行完成并返回结果。代码示例如下:

import asyncio

# 定义协程
async def custom_coroutine():
    print("运行自定义协程")
    # 等待另一个协程
    await asyncio.sleep(1)

# 创建协程
coro = custom_coroutine()

# 运行协程
asyncio.run(coro)

# 检查协程的类型
print(type(coro))

代码运行结果为:

运行自定义协程
<class 'coroutine'>

2.2 asyncio任务的使用

在asyncio框架中,任务(Task)是协程的封装,它将协程交给事件循环调度执行。任务通常由协程创建,并在事件循环中运行,但它独立于协程本身。创建任务时,不需要等待其完成,可以继续执行其他操作。任务对象代表一个将在事件循环中异步执行的操作,借助任务管理,可以更方便地处理异步编程中的复杂场景。

协程是异步操作的基本单元,任务则负责管理和调度这些协程。任务不仅支持同时执行多个协程,还能处理结果、取消任务和捕获异常等。如果只关注协程,而忽视任务,就无法有效调度多个异步操作,也难以管理任务的生命周期。因此,理解任务对于掌握异步编程至关重要。

任务的生命周期可以从多个阶段来描述。首先,任务是由协程(coroutine)创建的,并被安排在事件循环(event loop)中独立执行。随着时间的推移,任务会进入运行状态。在执行过程中,任务可能会因为等待其他协程或任务完成而被挂起(suspended)。任务有可能在正常情况下完成并返回结果,或者由于某些异常而失败。如果有其他协程介入,任务也可能会被取消(canceled)。一旦任务完成,它将无法再被重新执行。因此,任务的生命周期可以总结为以下几个阶段:

  1. 创建(Created):任务被创建,但尚未开始执行。
  2. 调度(Scheduled):任务被安排到事件循环中,准备开始执行。
  3. 取消(Canceled):任务在执行之前或执行过程中被取消。
  4. 运行(Running):任务开始执行,进入活跃状态。
  5. 挂起(Suspended):任务在运行过程中被挂起,等待其他操作完成。
  6. 结果(Result):任务成功完成并返回结果。
  7. 异常(Exception):任务执行过程中遇到错误或异常,导致失败。
  8. 完成(Done):任务无论是否成功,都已结束,无法再执行。

https://superfastpython.com/asyncio-task-life-cycle/

2.2.1 asyncio任务创建和运行

任务的创建

任务是通过协程实例创建的,因此只能在协程内部创建和调度。可以使用
asyncio.create_task()
函数来创建任务,该函数会返回一个
asyncio.Task
实例:

import asyncio
# 定义协程
async def custom_coroutine():
    # 等待另一个协程
    await asyncio.sleep(1)

# 创建协程
coro = custom_coroutine()

# 从协程创建任务
# name参数为设置任务的名称
task = asyncio.create_task(coro, name='task')
# 也可以用函数设置任务名称
task.set_name('MyTask')

此外,
asyncio.ensure_future
函数也可以用来创建和安排任务,它会确保返回一个Future或Task实例:

# 创建并安排任务
task = asyncio.ensure_future(custom_coroutine())

当然也可以直接通过事件循环来创建任务,可以使用事件循环对象的
create_task
方法。示例如下:

# 获取当前事件循环
loop = asyncio.get_event_loop()
# 创建并安排任务
task = loop.create_task(custom_coroutine())

任务的运行

在创建任务后,尽管可以使用
create_task
函数将协程安排为独立任务,但任务未必会立即执行。任务的执行依赖于事件循环的调度,它会在其他所有协程执行完成后才会开始。例如,在一个asyncio程序中,若某个协程创建并安排了任务,任务只有在该协程挂起后才有可能开始执行。具体来说,任务的执行通常会等到协程进入休眠、等待其他协程或任务时,才会被事件循环调度执行:

import asyncio

# 定义一个简单的异步函数
async def my_coroutine(name):
    print(f"任务 {name} 开始执行")
    await asyncio.sleep(1)  # 模拟任务的延时
    print(f"任务 {name} 完成")

# 获取事件循环
loop = asyncio.get_event_loop()

print("任务创建之前")

# 创建任务并加入事件循环
task = loop.create_task(my_coroutine("A"))
print("任务创建之后,任务已加入事件循环")

# 运行事件循环,直到任务完成
loop.run_until_complete(task)

# 关闭事件循环
loop.close()

print("事件循环已关闭")

代码运行结果为:

任务创建之前
任务创建之后,任务已加入事件循环
任务 A 开始执行
任务 A 完成
事件循环已关闭

多任务的运行

gather
函数可以同时启动多个协程(任务)并发执行,同时将它们存储在一个集合中进行管理。它还支持等待所有协程执行完成,并且可以提供取消操作的功能。

以下是具体的代码示例:

# 并发执行多个协程,并返回结果
# 如果协程没有返回值,则返回None
results = asyncio.gather(coro1(), coro2())

或者,使用展开语法:

# 使用展开语法收集多个协程
asyncio.gather(*[coro1(), coro2()])

需要注意的是,直接传递一个协程列表是无效的:

# 直接传递协程列表是不允许的
asyncio.gather([coro1(), coro2()])

2.2.2 asyncio任务状态

本节将介绍以下内容:

  • 任务的完成与取消
  • 获取任务结果
  • 任务异常的处理
  • 任务回调函数的使用

任务完成与取消

在任务创建完成后,需要重点检查两个关键状态:一是任务是否已顺利完成;二是任务是否已被正式取消。可以通过
done()
方法来确认任务是否已完成,通过
cancelled()
方法来检查任务是否已被取消。示例代码如下:

import asyncio

# 异步任务1: 打印任务开始、等待1秒并打印任务完成
async def task_completed():
    print("任务1正在执行")
    await asyncio.sleep(1)  # 模拟异步操作,暂停1秒
    print("任务1完成")

# 异步任务2: 打印任务开始、等待2秒并打印任务完成
async def task_cancelled():
    print("任务2正在执行")
    await asyncio.sleep(2)  # 模拟异步操作,暂停2秒
    print("任务2完成")

# 主异步函数
async def main():
    # 创建并启动两个异步任务
    task1 = asyncio.create_task(task_completed())  # 创建任务1
    task2 = asyncio.create_task(task_cancelled())  # 创建任务2

    # 等待task1任务完成
    await task1

    # 取消task2任务
    task2.cancel()
    
    # 异常处理: 捕获任务2被取消的异常
    try:
        await task2  # 尝试等待task2完成
    except asyncio.CancelledError:
        # 如果task2被取消
        pass

    # 检查task1是否完成
    if task1.done():
        print("任务1已完成")

    # 检查task2是否被取消
    if task2.cancelled():
        print("任务2已取消")

# 运行主异步函数
asyncio.run(main())  # 启动事件循环,执行main函数

代码运行结果为:

任务1正在执行
任务2正在执行
任务1完成
任务1已完成
任务2已取消

任务结果获取

通过调用
result()
方法可以获取任务执行的结果。如果任务中包含的协程函数有返回值,则
result()
方法将返回该值;若协程函数未显式返回任何值,则默认返回
None

若任务已被取消,在尝试调用
result()
方法时会触发
CancelledError
异常。因此,建议在调用
result()
方法前先检查任务是否已被取消:

# 检查任务是否未被取消
if not task.cancelled():
    # 获取包装协程的返回值
    value = task.result()
else:
    # 任务已被取消

如果任务尚未完成,在调用
result()
方法时会抛出
InvalidStateError
异常。因此,在调用
result()
方法之前,最好先确认任务是否已完成:

# 检查任务是否已完成
if not task.done():
    await task
# 获取包装协程的返回值
value = task.result()

任务异常处理

可以通过
exception()
方法获取协程未处理的异常信息。若任务执行过程中发生异常,使用该方法能够捕获并返回该异常:

import asyncio

# 定义一个协程,模拟异常
async def faulty_coroutine():
    raise ValueError("协程中发生了错误。")

# 主程序
async def main():
    # 创建协程任务
    task = asyncio.create_task(faulty_coroutine())
    
    # 等待任务执行完成,捕获异常
    try:
        await task
    except Exception as e:
        # 获取任务中的异常
        exception = task.exception()
        print(f"任务异常: {exception}")

# 运行事件循环
asyncio.run(main())

在这个示例中,创建了一个协程
faulty_coroutine
,该协程在执行时会引发
ValueError
异常。通过
task.exception()
方法捕获并打印该异常信息。执行结果将显示:

任务异常: 协程中发生了错误。

任务的回调函数

通过
add_done_callback()
方法可以为任务指定一个完成时触发的回调函数。这个方法需要传入一个函数名,该函数将在任务完成时被调用。注意,任务完成可以发生在以下几种情况:包装的协程正常结束、返回结果、抛出未捕获的异常,或者任务被取消。以下是如何定义和注册一个完成回调函数的示例:

# 定义完成回调函数
def handle(task):
    print(task)

# 为任务注册完成回调函数
task.add_done_callback(handle)

同样地,如果需要,可以使用
remove_done_callback()
方法来删除或取消之前注册的回调函数:

# 取消注册的回调函数
task.remove_done_callback(handle)

但是要注意的是回调函数通常是普通的Python函数,无法进行异步操作:

import asyncio

# 异步函数
async def my_coroutine():
    print("开始任务")
    await asyncio.sleep(1)
    print("任务完成")
    return "任务完成"

# 回调函数
def my_callback(task):
    print("回调函数被调用")
    result = task.result()  # 获取任务的返回结果
    print(f"任务的返回结果是: {result}")

async def main():
    # 创建任务
    task = asyncio.create_task(my_coroutine())

    # 注册回调函数
    task.add_done_callback(my_callback)

    # 等待任务完成
    await task

# 运行主函数
asyncio.run(main())

代码运行结果为:

开始任务
任务完成
回调函数被调用
任务的返回结果是: 任务完成

2.2.3 asyncio任务获取

当前任务获取

可以使用
asyncio.current_task()
方法来获取当前正在执行的任务。这个方法会返回一个代表当前任务的
Task
对象。以下示例展示了如何在主协程中获取当前任务:

# 从当前协程中获取当前任务
import asyncio

# 定义主协程
async def main():
    # 输出开始消息
    print('主协程已启动')
    # 获取当前任务
    current_task = asyncio.current_task()
    # 打印任务详情
    print(current_task)

# 运行主协程
asyncio.run(main())

上述代码打印结果包含任务的名称和正在运行的协程信息:

Task pending name='Task-1' coro=<main() 

所有任务获取

可以使用
asyncio.all_tasks()
函数来检索asyncio程序中所有已安排和正在执行(尚未完成)的任务。以下示例首先创建了10个任务,每个任务都封装并执行相同的协程。随后,主协程捕获程序中所有已计划或正在运行的任务集合,并输出它们的详细信息:

import asyncio

# 10个异步任务
async def task_coroutine(value):
    # 输出任务运行信息
    print(f'任务 {value} 开始运行')
    # 模拟异步等待,使每个任务休眠1秒
    await asyncio.sleep(1)
    
    # 输出任务运行信息
    print(f'任务 {value} 结束运行')

# 主协程定义
async def main():
    # 输出主协程启动信息
    print('主协程已启动')
    # 创建10个任务
    # 注意。任务的执行依赖于事件循环的调度,它会在其他所有协程执行完成后才会开始
    # 例如当前协程创建了任务,但是任务只有在当前协程挂起后才有可能开始执行
    started_tasks = [asyncio.create_task(task_coroutine(i), name=f'任务{i}') for i in range(10)]
    # 使得当前的协程(即 main 协程)挂起0.1秒,从而使得子任务运行
    # 如果没有这句代码,也没有之后的gather函数,子任务会在main函数将要结束时运行,此时事件循环还存在
    await asyncio.sleep(0.1)
    # 获取所有任务的集合
    all_tasks = asyncio.all_tasks()
    # 输出所有任务的详细信息
    for task in all_tasks:
        print(f'> {task.get_name()}, {task.get_coro()}')
    print("等待任务完成")
    # gather会收集传入的所有任务,并阻塞当前协程,直到所有任务都执行完毕
    # 没有gather函数,这样当前协程会直接结束,导致部分任务未能执行或未完成
    await asyncio.gather(*started_tasks)
    # gather函数类似于以下代码
    # 逐个等待每个任务完成
    # for task in started_tasks:
    #     await task  # 等待单个任务完成

# 运行异步程序
asyncio.run(main())

2.2.4 asyncio任务等待

asyncio.wait函数

asyncio.wait()
函数用于等待多个 asyncio.Task 实例(即封装了协程的任务)完成。它允许配置等待策略,比如等待全部任务、第一个完成或第一个出错的任务。这些任务实际上是 asyncio.Task 类的实例,它们封装了协程,使得协程可以被调度并独立运行,并提供了查询状态和获取结果的接口。

asyncio.wait()
函数接收一组可等待对象,通常为
Task
实例,或者
Task
的列表、字典或集合。该函数会持续等待,直到任务集合中的某些条件得到满足,默认情况下,这些条件是所有任务都已完成。
asyncio.wait()
返回一个包含两个集合的元组:第一个集合是所有已满足条件的任务对象,称为“完成集”("done" set);第二个集合是尚未满足条件的任务对象,称为“待处理集”("pending" set)。

例如:

tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
# 等待所有任务完成
done, pending = await asyncio.wait(tasks)

在上面的示例中,
asyncio.wait()
被加上了
await
,原因在于从技术角度来看,
asyncio.wait()
是一个返回协程的协程函数。
await
用于暂停异步函数的执行,以便调用
asyncio.wait
,从而等待所有任务完成。

等待条件设置


asyncio.wait()
函数中,
return_when
参数允许指定等待的条件,其默认值为
asyncio.ALL_COMPLETED
,意味着只有当所有任务都完成时,才会停止等待并返回结果。如果将
return_when
参数设置为
asyncio.FIRST_COMPLETED
,将等待直到列表中的第一个任务完成。一旦第一个任务完成并从等待集中移除,将继续执行当前代码,但其余的任务将继续执行,不会被取消:

import asyncio
import random

# 模拟一个可能需要不同时间完成的异步任务
async def async_task(name, duration):
    print(f"任务 {name} 开始,预计耗时 {duration} 秒")
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    return f"结果 {name}"

# 主函数,用于启动和管理异步任务
async def main():
    # 创建两个任务,一个快一个慢
    fast_task = asyncio.create_task(async_task("任务1", random.randint(1, 3)), name='任务1')
    slow_task = asyncio.create_task(async_task("任务2", random.randint(4, 6)), name='任务2')

    # return_when=asyncio.FIRST_COMPLETED表示第一个任务以下代码完成后,会继续后续等待,而不用等待其余任务
    # done和pending都是集合类型(set),包含完成的任务集合和未完成的集合
    done, pending = await asyncio.wait([fast_task, slow_task], return_when=asyncio.FIRST_COMPLETED)
    
    # 处理完成的任务
    for task in done:
        # 提取结果
        result = task.result()
        print(f"{task.get_name()}的执行结果:{result}")

    print(f"已完成任务数:{len(done)}")
    
    # 等待剩余的任务完成(如果需要)
    if len(pending) >0:
        await asyncio.wait(pending)

# 运行主函数
asyncio.run(main())

此外,可以通过将
return_when
参数设置为
FIRST_EXCEPTION
来等待第一个因异常失败的任务。如果没有任务因异常失败,
done
集合将包含所有已完成的任务,且
wait()
函数仅在所有任务完成后才会返回结果。

任务超时

可以通过
timeout
参数指定等待任务的最大时间(以秒为单位)。如果超时,则函数将返回一个包含当前满足条件的任务子集的元组,例如,如果等待所有任务完成,则返回的是已完成的任务子集。示例代码如下:

import asyncio
import random

# 在新任务中执行的协程
async def task_coro(arg):
    # 模拟不同的执行时间
    value =  random.uniform(0, 2)  # 保证每个任务的执行时间为0到2秒
    await asyncio.sleep(value)
    print(f'> 任务 {arg} 完成,执行时间 {value:.2f} 秒')

# 主协程
async def main():
    # 创建多个任务
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    
    # 设置最大等待时间为1秒,超时后返回已完成的任务
    done, pending = await asyncio.wait(tasks, timeout=1)
    
    # 打印结果:已完成的任务和待处理的任务
    print(f'已完成的任务数量: {len(done)}')
    print(f'待处理的任务数量: {len(pending)}')
    
    # 如果超时后有任务未完成,显示它们的状态
    if pending:
        print('以下任务未在超时时间内完成:')
        for task in pending:
            print(f'- 任务 {tasks.index(task)}')

# 启动异步程序
asyncio.run(main())

单个任务等待

asyncio.wait_for()
函数用于等待协程或任务的完成,并提供超时控制。与
asyncio.wait()
不同,
wait_for()
仅等待一个任务,并且在超时之前会检查该任务是否已完成。如果任务未能在指定的超时时间内完成,函数会抛出
asyncio.TimeoutError
异常。如果没有设置超时,函数将一直等待任务完成。

wait_for()
会返回一个协程对象,实际执行时需要通过
await
显式等待结果,或者将其调度为任务。如果超时,任务将被取消。
wait_for()
接受两个参数:

  1. 第一个参数是待等待的协程或任务。
  2. 第二个参数是超时时间(单位为秒),可以是整数或浮点数。如果设置为
    None
    ,表示没有超时限制。

示例代码如下:

from random import random
import asyncio

# 定义异步函数 task_coro,作为协程任务执行
async def task_coro(arg):
    # 生成一个1到2之间的随机数
    value = 1 + random()
    # 打印接收到的值
    print(f'>任务接收到 {value}')
    # 异步等待,模拟耗时操作,等待时间由随机数决定
    await asyncio.sleep(value)
    # 打印任务完成消息
    print('>任务完成')

# 定义主协程函数 main
async def main():
    # 创建协程任务并传入参数1
    task = task_coro(1)
    # 尝试执行任务并设置超时为0.2秒
    try:
        await asyncio.wait_for(task, timeout=0.2)
    except asyncio.TimeoutError:
        # 超时处理:打印任务取消消息
        print('放弃等待,任务已取消')
    except Exception:
        # 捕获其他可能的异常
        pass

# 启动异步程序,运行主协程
asyncio.run(main())

2.2.5 asyncio任务保护

异步任务可通过调用
cancel()
方法取消。将任务包装在
asyncio.shield()
中可防止其被取消。
asyncio.shield()
会将协程或可等待对象包装在一个特殊对象中,吸收所有取消请求。即便外部请求取消,任务仍会继续执行。此功能在异步编程中尤为重要,特别是当某些任务可取消,而其他关键任务需持续运行时。
asyncio.shield()
接受可等待对象并返回一个
asyncio.Future
对象,可直接等待该对象或传递给其他任务:

# 防止任务被取消
shielded = asyncio.shield(task)
# 等待屏蔽任务
await shielded

返回的
Future
对象可以通过调用
cancel()
方法来取消。如果内部任务仍在执行,取消请求会被视为成功。例如:

# 取消屏蔽任务
was_canceled = shielded.cancel()

至关重要的是,向
Future
对象发出的取消请求并不会传递给其内部任务:

import asyncio

async def coro():
    print("任务开始")
    await asyncio.sleep(3)
    print("任务完成")

async def main():
    # 创建异步任务
    task = asyncio.create_task(coro())
    
    # 使用shield包装任务,以创建一个不可取消的任务
    shield = asyncio.shield(task)
    
    # 尝试取消shield,但这不会影响内部的task
    shield.cancel()
    
    try:
        # 等待任务执行完成
        await task
    except asyncio.CancelledError:
        print("任务被取消")

# 启动异步事件循环
asyncio.run(main())

代码运行结果为:

任务开始
任务完成

如果一个正在被屏蔽的任务被取消了,那么取消请求将会传递给
shield
对象,导致
shield
对象也被取消,并且会触发
asyncio.CancelledError
异常。以下是代码示例:

import asyncio

async def coro():
    print("任务开始")
    await asyncio.sleep(3)
    print("任务完成")

async def main():
    # 创建异步任务
    task = asyncio.create_task(coro())
    
    # 使用 shield 包装任务,以创建一个不可取消的任务
    shield = asyncio.shield(task)
    
    # 取消task
    task.cancel()
    
    try:
        # 等待任务执行完成
        await task
    except asyncio.CancelledError:
        print("任务被取消")

# 启动异步事件循环
asyncio.run(main())

代码运行结果为:

任务被取消

最后,以下示例展示了如何创建、调度和保护协程任务,首先,创建一个主协程
main()
,作为应用程序的入口点,并创建一个任务协程,确保任务不会被取消。随后,使用
asyncio.shield()
保护任务,将其传递给
cancel_task()
协程,在其中模拟
shielded
任务取消请求。主协程等待该任务并捕获
CancelledError
异常。任务在运行一段时间后休眠,最终,任务完成并返回结果,
shielded
任务被标记为取消,而内部任务则正常完成:

import asyncio

# 定义一个简单的异步任务,模拟处理逻辑
async def simple_task(number):
    await asyncio.sleep(1)
    return number

# 定义一个异步任务,稍后取消指定任务
async def cancel_task(task):
    await asyncio.sleep(0.2)
    was_cancelled = task.cancel()
    print(f'已取消: {was_cancelled}')

# 主协程,调度其他任务
async def main():
    coro = simple_task(1)
    task = asyncio.create_task(coro)
    shielded = asyncio.shield(task)

    # 创建取消任务的协程
    asyncio.create_task(cancel_task(shielded))
    
    try:
        result = await shielded
        print(f'>获得: {result}')
    except asyncio.CancelledError:
        print('任务已被取消')

    await asyncio.sleep(1)
    
    print(f'保护任务: {shielded}')
    print(f'任务: {task}')

# 启动主协程
asyncio.run(main())

2.2.6 asyncio中运行阻塞任务

asyncio专注于异步编程和非阻塞I/O操作。然而,异步应用中执行阻塞函数调用是不可避免的,原因包括:

  • 执行CPU密集型任务,如复杂计算。
  • 处理阻塞I/O任务,如文件读写。
  • 调用未与asyncio集成的第三方库。阻塞调用会导致事件循环暂停,阻止其他协程运行。

asyncio
模块提供了两种方法来在
asyncio
程序中执行阻塞调用。第一种方法是使用
asyncio.to_thread()
函数,它是一个高级API,专为应用程序开发者设计。
asyncio.to_thread()
在单独的线程中执行并返回一个协程。这个协程可以被等待或调度作为独立任务执行。同时
asyncio.to_thread()
会在后台创建一个
ThreadPoolExecutor
来执行阻塞操作,因此它适用于 I/O 密集型任务。

在以下代码中,
asyncio.to_thread
的作用是将一个阻塞的同步任务(即
blocking_task()
函数)封装成异步任务,并将其交给一个独立的线程池运行。这样做的目的是避免阻塞主事件循环,从而确保其他异步任务可以继续执行。具体而言,
blocking_task
是一个同步函数,其中的
time.sleep(2)
会阻塞当前线程 2 秒。由于线程在这段时间内无法执行其他任务,如果在传统的
asyncio
环境中直接调用阻塞函数,事件循环会被暂停,无法继续调度其他任务。但是
asyncio.to_thread
的使用确保了阻塞任务不会影响到主事件循环的执行:

import asyncio
import time

# 定义一个阻塞 IO 绑定任务
def blocking_task():
    # 报告任务开始
    print('任务开始')
    # 模拟阻塞操作:休眠2秒
    time.sleep(2)  # 这里的 time.sleep 是阻塞当前线程的操作
    # 报告任务结束
    print('任务完成')

# 主协程
async def main():
    # 报告主协程正在运行并启动阻塞任务
    print('主协程正在执行阻塞任务')

    # 将阻塞任务封装成协程并通过asyncio.to_thread函数运行
    # asyncio.to_thread 会将阻塞任务分配给一个独立的线程池来执行
    coro = asyncio.to_thread(blocking_task)

    # 创建一个 asyncio 任务来执行上述协程
    # asyncio.create_task 将协程封装为 Task 对象,使其能够在后台执行
    task = asyncio.create_task(coro)

    # 主协程继续执行其他事情
    print('主协程正在做其他事情')

    # 使用 await asyncio.sleep(0) 允许任务被调度
    # 这个操作确保协程任务有机会开始执行,因为 main() 协程的执行会在此处暂停
    await asyncio.sleep(0)  # 让出控制权,确保 task 能被执行

    # 等待任务执行完成
    await task

# 运行异步程序
# asyncio.run(main()) 负责启动整个异步事件循环并执行 main() 协程
asyncio.run(main())

另一种方法是使用
loop.run_in_executor()
函数,首先通过
asyncio.get_running_loop()
获取当前事件循环。
loop.run_in_executor()
函数接收一个执行器和一个要执行的函数,如果传入
None
作为执行器参数,则默认使用
ThreadPoolExecutor
。该函数返回一个可等待对象,可以选择等待它,且任务会立即开始执行,因此不需要额外等待或安排返回的可等待对象来启动阻塞调用。示例如下:

# 获取事件循环
loop = asyncio.get_running_loop()
# 在单独的线程中执行函数
await loop.run_in_executor(None, task)

或者,可以创建一个执行器并将其传递给
loop.run_in_executor()
函数,该函数将在执行器中执行异步调用。在这种情况下,调用者必须管理执行器,在调用者完成后将其关闭:

# 创建进程池
with ProcessPoolExecutor as exe:
    # 获取事件循环
    loop = asyncio.get_running_loop()
    # 在单独的线程中执行函数
    await loop.run_in_executor(exe, task)
    # 进程池自动关闭...

2.3 异步编程模型

2.3.1 异步迭代器

迭代是Python中的基本操作,
asyncio
提供了对异步迭代器的支持。通过定义实现
__aiter__

__anext__
方法的对象,能够在
asyncio
程序中创建并使用异步迭代器(Asynchronous Iterators)。

迭代器

迭代器是实现了迭代协议的Python对象。具体而言,
__iter__()
方法返回迭代器自身,而
__next__()
方法使迭代器前进并返回下一个元素。当没有更多数据时,迭代器会引发
StopIteration
异常。可以通过内置函数
next()
逐步获取迭代器中的元素,或者使用
for
循环自动遍历迭代器:

# 定义一个名为 MyIterator 的迭代器类
class MyIterator:
    # 初始化方法,接收两个参数:start 和 end,定义迭代的起始值和结束值
    def __init__(self, start, end):
        self.current = start  # 设置当前迭代的位置,初始为 start
        self.end = end        # 设置迭代的结束值

    # 定义迭代器的 __iter__ 方法,返回迭代器自身
    def __iter__(self):
        return self  # 迭代器对象自身是可迭代的

    # 定义迭代器的 __next__ 方法,用于获取下一个值
    def __next__(self):
        # 如果当前值已经达到或超过结束值,抛出 StopIteration 异常,表示迭代结束
        if self.current >= self.end:
            raise StopIteration
        self.current += 1  # 将当前值加 1
        return self.current - 1  # 返回当前值,减去 1 是因为在加 1 后,当前值已经递增过

# 创建 MyIterator 类的一个实例,从 2开始,结束值为 5
my_iter = MyIterator(2, 5)

# 逐步获取元素,调用 next(my_iter) 获取迭代器中的下一个元素
print(next(my_iter))  # 输出 2
print(next(my_iter))  # 输出 3

# 使用 for 循环遍历MyIterator类实例,这里会自动调用 __iter__ 和 __next__ 方法
# MyIterator(0, 3)创建一个新的迭代器实例,迭代的范围是从 0 到 3(不包含 3)
for number in MyIterator(0, 3):
    print(number)

异步迭代器

异步迭代器是实现了
__aiter__()

__anext__()
方法的Python对象。
__aiter__()
返回迭代器实例,
__anext__()
返回一个可等待对象,用于执行迭代步骤。异步迭代器只能在
asyncio
程序中使用,可以通过
async for
表达式遍历,自动调用
__anext__()
并等待其结果。与普通的
for
循环不同,
async for
适用于处理异步操作,如网络请求或文件读取。

要创建异步迭代器,只需定义实现这两个方法的类。
__anext__()
必须返回一个可等待对象,并使用
async def
定义。迭代结束时,
__anext__()
应抛出
StopAsyncIteration
异常。由于异步迭代器依赖
asyncio
事件循环,迭代过程中的每个对象都在协程中执行并等待:

# 定义一个异步迭代器
class AsyncIterator():
    # 构造函数,初始化一些状态
    def __init__(self):
        self.counter = 0

    # 实现迭代器协议的 __aiter__方法
    def __aiter__(self):
        return self

    # 实现异步的 __anext__ 方法
    async def __anext__(self):
        # 如果没有更多项目,抛出StopAsyncIteration异常
        if self.counter >= 10:
            raise StopAsyncIteration
        # 增加计数器
        self.counter += 1
        # 返回当前计数器值
        return self.counter
# 创建迭代器
it = AsyncIterator()

通过使用
async for
表达式在循环中遍历异步迭代器,该表达式将自动等待循环的每次迭代:

import asyncio
it = AsyncIterator()

async def main():
    # 遍历异步迭代器
    async for result in AsyncIterator():
        print(result)

# 启动异步任务
asyncio.run(main())

如果使用的是Python 3.10或更高版本,可以使用
anext
内置函数遍历迭代器的一步,就像使用
next
函数的经典迭代器一样:

import asyncio
# 获取迭代器一步的等待
awaitable = anext(it)
# 执行迭代器的一步并得到结果
result = await awaitable

2.3.2 异步生成器

生成器是Python的基本组成部分,指的是包含至少一个
yield
表达式的函数。与常规函数不同,生成器函数可以在执行过程中暂停,并在后续恢复执行,这种特性与协程相似。实际上,Python中的协程是生成器的扩展。通过
asyncio
库,能够实现异步生成器,而异步生成器(Asynchronous Generators)则是基于协程中
yield
表达式的应用。

生成器

生成器是一个Python函数,它通过
yield
表达式逐步返回值。每当生成器遇到
yield
时,它会返回一个值并暂停执行。下一次调用生成器时,它会从暂停的位置继续执行,直到再次遇到
yield
。虽然生成器可以通过内置的
next()
函数逐步执行,但通常更常见的做法是使用迭代器,如
for
循环或列表推导式,来遍历生成器并获取所有返回的值:

# 定义一个简单的生成器函数
def count_up_to(max):
    count = 1
    while count <= max:
        yield count  # 暂停并返回当前值
        count += 1

# 使用 next() 函数逐步执行生成器
counter = count_up_to(5)
print(next(counter))  # 输出 1
print(next(counter))  # 输出 2
print(next(counter))  # 输出 3

# 使用 for 循环遍历生成器
for num in count_up_to(5):
    print(num)  # 输出 1, 2, 3, 4, 5

# 也可以使用列表推导式将生成器的结果转为列表
numbers = [num for num in count_up_to(5)]
print(numbers)  # 输出 [1, 2, 3, 4, 5]

异步生成器

异步生成器是使用
yield
表达式的特殊协程,与普通生成器不同,它能在运行时暂停并等待其他协程或任务完成。异步生成器函数创建一个异步迭代器,无法通过
next()
遍历,而需使用
anext()
获取下一个值。这使得异步生成器支持
async for
语法,每次迭代时都会暂停,等待任务完成后继续执行。简单来说,异步生成器在每次迭代时像一个等待中的任务,
async for
会调度并等待其结果。

异步生成器通过定义一个包含至少一个
yield
表达式的协程实现,且该函数需使用
async def
语法。由于它本质上是一个协程,每个迭代返回的是一个等待对象,这些对象在
asyncio
事件循环中被调度执行,可以在生成器中等待这些对象:

# 定义一个异步生成器
async def async_generator():
    for i in range(10):
        # 暂停并睡眠一会儿
        await asyncio.sleep(1)
        # 向调用者产生一个值
        yield i
        
# 创建迭代器
gen = async_generator()

可以使用
async for
表达式在循环中遍历异步生成器,该表达式会自动等待每次迭代的结果:

# 异步运行的入口点
import asyncio
async def main():
    # 异步迭代生成器并打印结果
    async for result in async_generator():
        print(result)
    
    # 使用列表推导式收集所有结果
    results = [item async for item in async_generator()]
    print(results)

asyncio.run(main())

如果使用的是 Python 3.10 或更高版本,可以使用 anext() 内置函数遍历迭代器的一步,就像使用 next() 函数的经典迭代器一样:

import asyncio
# 获取生成器一步的等待值
awaitable = anext(gen)
# 执行生成器的一步并得到结果
result = await awaitable

2.3.3 异步推导式

异步推导式(Asynchronous Comprehensions)是经典推导式的异步版本,专门用于处理异步迭代和异步操作。
asyncio
支持两种类型的异步推导式,分别是
async for
推导式和
await
推导式。

推导式

推导式是一种通过简洁的语法构建数据集合(如列表、字典和集合)的方法。它允许在单行代码中结合
for
循环和条件语句,快速创建并填充数据结构。推导式通常由三部分组成:

  1. 输出表达式:表示生成的元素。
  2. for循环:用于遍历可迭代对象。
  3. 条件表达式(可选):用于过滤元素,仅保留符合条件的部分。

列表推导式可以通过
for
表达式从一个新列表中生成元素。例如:

# 使用列表推导式创建列表
result = [a * 2 for a in range(100)]

此外,推导式还可以用于创建字典和集合。例如:

# 使用推导式创建字典
result = {a: i for a, i in zip(['a', 'b', 'c'], range(3))}
# 使用推导式创建集合
result = {a for a in [1, 2, 3, 2, 3, 1, 5, 4]}

async for异步推导式

异步推导式可通过带异步迭代的
async for
创建列表、集合或字典。该表达式按需生成协程或任务,获取其结果并存入目标容器。需要注意,
async for
仅能在协程或任务中使用。异步迭代器返回可等待对象,
async for
用于遍历并获取每个对象的结果,在内部
async for
自动等待并调度协程。异步生成器实现异步迭代器方法,亦可用于异步推导式。代码示例如下:

import asyncio

# 异步函数,模拟异步操作
async def get_data():
    await asyncio.sleep(1)  # 模拟异步等待
    return [1, 2, 3, 4, 5]

# 异步推导式创建列表
async def async_list_comprehension():
    async_data = await get_data()
    async_list = [x * 2 for x in async_data]
    return async_list

# 异步推导式创建集合
async def async_set_comprehension():
    async_data = await get_data()
    async_set = {x * 2 for x in async_data}
    return async_set

# 异步推导式创建字典
async def async_dict_comprehension():
    async_data = await get_data()
    async_dict = {x: x * 2 for x in async_data}
    return async_dict

# 运行异步推导式
async def main():
    list_result = await async_list_comprehension()
    set_result = await async_set_comprehension()
    dict_result = await async_dict_comprehension()
    
    print("Async List:", list_result)
    print("Async Set:", set_result)
    print("Async Dict:", dict_result)

# 启动事件循环
asyncio.run(main())

await异步推导式

await
表达式不仅适用于常规异步操作,还可用于列表、集合或字典推导式,称为 await 推导。无论在异步还是同步代码中,建议统一使用 await 推导或列表推导。与异步推导式类似,await 推导仅能在异步协程或任务中使用。该机制通过挂起并等待一系列可等待对象,构建数据结构(如列表)。在当前协程中,这些可等待对象按顺序执行:

import asyncio
async def fetch_data(item):
    # 模拟异步数据获取操作
    await asyncio.sleep(1)  # 模拟异步等待
    return f"Data for {item}"

async def main():
    # 创建一个包含可等待对象的列表
    awaitables = [fetch_data(item) for item in range(5)]
    
    # 使用 await 推导式构建列表
    results = [await x for x in awaitables]
    
    # 打印结果
    print(results)

# 运行主函数
asyncio.run(main())

2.3.4 异步上下文管理器

上下文管理器是Python中的一种结构,它提供了类似
try-finally
的环境,具有统一的接口和简洁的语法,通常通过
with
语句来使用。上下文管理器常用于资源管理,确保资源在使用后能够被正确关闭或释放,无论使用过程是否成功,或是否因异常而导致失败。
asyncio
库支持开发异步上下文管理器。在
asyncio
程序中,异步上下文管理器(asynchronous ContextManagers)可以通过定义一个实现了
__aenter__()

__aexit__()
方法的协程对象来创建和使用。

上下文管理器

上下文管理器是 Python 中定义了
__enter__

__exit__
方法的对象,它们分别负责在
with
语句块开始和结束时执行特定的操作。这一机制使得在进入和退出特定代码块时,能够自动管理资源的生命周期,如文件、套接字或线程池的打开与关闭。通过上下文管理器,可以有效地管理资源,提升代码的安全性与可读性。

通过
with
语句使用上下文管理器,可在代码块执行前后自动完成资源的准备与清理工作。上下文管理器对象通常在
with
语句中创建,并自动触发
__enter__
方法。无论代码块是正常结束还是因异常退出,
__exit__
方法都会被自动调用。

例如:

# 使用上下文管理器
with ContextManager() as manager:
    # 在此处执行代码块
# 自动管理资源关闭
# 这与 try-finally 结构类似

或者,也可以手动创建对象并调用这些方法:

# 创建对象
manager = ContextManager()
try:
    manager.__enter__()
    # 执行代码块
finally:
    manager.__exit__()

异步上下文管理器

异步上下文管理器指的是能够在其
__aenter__

__aexit__
方法中挂起执行的上下文管理器。这两个方法被定义为协程,并由调用者进行等待。通过
async with
表达式,可以实现这一功能。因此,异步上下文管理器通常用于asyncio程序中,尤其是在协程调用时。
async with
表达式是对传统
with
表达式的扩展,专门用于异步上下文管理器,允许在协程中进行异步操作,其使用方式与同步的
with
表达式相似,但能够处理异步任务。下面是一个定义异步上下文管理器的例子:

import asyncio
# 定义异步上下文管理器
class AsyncContextManager:
    # 进入异步上下文管理器
    async def __aenter__(self):
        # 报告消息
        print('> entering the context manager')
        # 模拟异步操作,暂时阻塞
        await asyncio.sleep(0.5)

    # 退出异步上下文管理器
    async def __aexit__(self, exc_type, exc, tb):
        # 报告消息
        print('> exiting the context manager')
        # 模拟异步操作,暂时阻塞
        await asyncio.sleep(0.5)

在使用异步上下文管理器时,通过
async with
表达式来调用它。这不仅会自动等待进入和退出协程,还会确保在执行过程中暂停当前协程,直到相关的异步操作完成:

# 使用异步上下文管理器
async with AsyncContextManager() as manager:
    # 在此块中执行一些异步任务
    # ...

以下示例展示了在 asyncio 程序中异步上下文管理器的常见使用模式,首先会创建
main()
协程,并将其作为 asyncio 程序的入口点。
main()
协程执行时,创建了一个
AsyncContextManager
类的实例,并在
async with
表达式中使用它:

import asyncio

# 定义异步上下文管理器
class AsyncContextManager:
    # 进入异步上下文管理器
    async def __aenter__(self):
        # 报告消息
        print('>进入上下文管理器')
        # 暂停一段时间
        await asyncio.sleep(0.5)

    # 退出异步上下文管理器
    async def __aexit__(self, exc_type, exc, tb):
        # 报告消息
        print('>退出上下文管理器')
        # 暂停一段时间
        await asyncio.sleep(0.5)

# 定义一个简单的协程
async def custom_coroutine():
    # 创建并使用异步上下文管理器
    async with AsyncContextManager() as manager:
        # 输出当前状态
        print(f'在上下文管理器内部')

# 启动异步程序
asyncio.run(custom_coroutine())

代码运行结果为:

>进入上下文管理器
在上下文管理器内部
>退出上下文管理器

2.4 asyncio中的非阻塞流

2.4.1 非阻塞流介绍

asyncio
的一个重要特点是能够在进行网络操作时避免阻塞,这意味着在等待数据的过程中,程序仍然可以继续执行其他任务。这一功能通过“流”(streams)来实现,流就像一个管道,用于收发数据。借助流,数据的发送和接收变得更加简便,无需依赖复杂的回调函数或底层实现细节。

具体来说,
asyncio
中的
asyncio streams
支持通过网络连接创建“写”流和“读”流。在这些流中,可以执行数据写入和读取操作,且在等待期间程序不会被某个操作卡住。操作完成后,网络连接即可关闭。尽管在使用流功能时需要自行处理一些网络协议的细节,这种方式仍然能够支持许多常见的网络协议,例如:

  • 与网站服务器通信的HTTP或HTTPS协议。
  • 用于发送电子邮件的SMTP协议。
  • 用于文件传输的FTP协议。

流不仅可以用于创建服务器并处理标准协议的请求,还能帮助开发者定制协议,以满足特定应用需求。接下来,将介绍如何使用异步流:

打开连接

可以使用
asyncio.open_connection()
函数打开 asyncio TCP 客户端套接字连接,建立网络连接并返回一对(reader、writer)对象。这些返回的对象是
StreamReader

StreamWriter
类的实例,用于与套接字交互。该函数是一个必须等待的协程,一旦套接字连接打开便返回。

例如:

# 打开一个连接
reader, writer = await asyncio.open_connection(...)

asyncio.open_connection()
函数需要许多参数来配置套接字连接,其中两个必需的参数是主机和端口:

  • 主机是一个字符串,指定要连接的服务器,例如域名或 IP 地址。
  • 端口是套接字端口号,例如HTTP服务器为80,HTTPS 服务器为 443,SMTP为25 等。

例如,打开与 HTTP 服务器的连接:

# 打开与 http 服务器的连接
reader, writer = await asyncio.open_connection('www.baidu.com', 80)

如果需要加密套接字连接(如 HTTPS),可以通过设置
ssl=True
实现 SSL 协议支持:

# 打开与 https 服务器的连接
reader, writer = await asyncio.open_connection('www.baidu.com', 443, ssl=True)

启动侦听服务

要启动一个异步的TCP服务器,可以使用
asyncio.start_server()
函数。这个函数会创建一个服务器,它会在指定的地址和端口上监听来自客户端的连接请求。这个函数是一个需要等待的协程,当调用时,它会返回一个
asyncio.Server
对象,代表正在运行的服务器。

下面是如何使用这个函数的一个示例:

# 启动一个 TCP 服务器
server = await asyncio.start_server(...)

这个函数需要三个参数:一个处理连接的函数、服务器的地址和端口号。处理连接的函数是一个用户自定义的函数,每当有客户端连接到服务器时,这个函数就会被调用。这个函数会接收一对对象作为参数,这两个对象分别用于从客户端读取数据(
StreamReader
)和向客户端发送数据(
StreamWriter
)。

地址
是指客户端用来连接服务器的域名或IP地址,
端口号
则是服务器用来接收连接请求的网络端口,不同的服务通常会使用不同的端口,比如FTP服务常用端口21,而HTTP服务常用端口80。

下面是一个具体的使用示例:

# 定义一个处理客户端连接的函数
async def handler(reader, writer):
    # 在这里添加处理客户端请求的逻辑
    pass

# 使用指定的处理器、地址和端口号启动服务器
server = await asyncio.start_server(handler, '127.0.0.1', 80)

在这个例子中,
handler
函数将负责处理每个客户端的连接,
'127.0.0.1'
是本地回环地址,意味着服务器将只在本地计算机上可用,而
80
是HTTP服务的标准端口号。

使用StreamWriter写入数据

数据可以通过
asyncio.StreamWriter
写入套接字,套接字是计算机之间通过网络传输数据的通信端点。
StreamWriter
提供API将字节数据写入套接字连接的I/O流,数据会尝试立即发送到目标设备,若无法立即发送,则存储在缓冲区。写入后,最好使用
drain
方法清空缓冲区:

# 写入字节数据
writer.write(byte_data)
# 等待数据传输
await writer.drain()

使用StreamReader读取数据

数据可以通过
asyncio.StreamReader
从套接字中读取。读取的数据是字节格式,因此在使用之前可能需要进行编码。所有读取操作都是必须等待的协程。可以使用
read()
方法读取任意数量的字节,该方法会一直读取,直到文件末尾(EOF):

# 读取字节数据
byte_data = await reader.read()

也可以通过
n
参数指定要读取的字节数:

# 读取指定字节数的数据
byte_data = await reader.read(n=100)

使用
readline
方法可以读取单行数据,直到遇到新行字符
\n
或文件末尾(EOF),返回的是字节数据:

# 读取一行数据
byte_line = await reader.readline()

此外,
readexactly()
方法用于读取确切数量的字节,如果读取的字节数不足,则会引发异常。而
readuntil()
方法会读取字节数据,直到遇到指定的字节字符为止。

关闭连接

可以通过
asyncio.StreamWriter
来关闭套接字。调用
close()
方法即可关闭套接字,该方法不会阻塞:

#关闭套接字
writer.close()

尽管
close()
方法不会阻塞,但可以通过
wait_close()
方法等待套接字完全关闭后再继续操作:

#关闭套接字
writer.close()
#等待套接字关闭
awaitwriter.wait_closed()

也可以通过
is_closing()
方法检查套接字是否已经关闭或正在关闭过程中:

#检查套接字是否已关闭或正在关闭
ifwriter.is_closing():
#...

2.4.2 使用asyncio检查HTTP状态

本节介绍如何使用
asyncio
模块通过打开流并进行HTTP请求和响应的读写操作,整个过程通常包括以下四个步骤:

  1. 打开连接
  2. 发送请求
  3. 读取响应
  4. 关闭连接

专业的异步HTTP框架可以参考使用:
aiohttp

打开链接

使用
asyncio.open_connection()
函数打开连接。该函数接受主机名和端口号作为参数,并返回一个
StreamReader

StreamWriter
,用于通过套接字进行数据的读写。这些功能通常用于在端口 80 上打开 HTTP 连接。

发送HTTP请求

在打开HTTSP连接后,可以向
StreamWriter
写入查询以发出 HTTP 请求。以HTTP版本1.1 请求为例,HTTP请求的格式为纯文本,可以请求根路径“/”,其示例如下:

GET / HTTP/1.1
Host: www.google.com

HTTP协议请求的具体介绍见:
HTTP协议请求/响应格式详解
。需要注意的是,每行末尾必须包含回车符和换行符(\r\n),且请求的末尾需有一个空行。若作为 Python 字符串表示,格式如下所示:

'GET / HTTP/1.1\r\n'
'Host: www.google.com\r\n'
'\r\n'

在写入
StreamWriter
之前,必须将该字符串编码为字节。可以通过调用
encode()
方法实现字符串编码,默认的“utf-8”编码通常适用。例如:

# 将字符串编码为字节
byte_data = string.encode()

接着,可以使用
StreamWriter

write()
方法将字节数据写入套接字。例如:

# 将查询写入套接字
# 等待套接字准备好
await writer.drain()

读取HTTP响应

发出HTTP请求后,可以读取响应。此操作可通过套接字的
StreamReader
实现。使用
read()
方法可一次读取一大块字节,或者使用
readline()
方法逐行读取字节。由于基于文本的HTTP协议通常每次发送一行HTML数据,因此
readline()
方法更加便捷。需要注意的是,
readline()
是一个协程,调用时需要等待其执行完成。示例如下:

#读取一行响应
line_bytes=awaitreader.readline()

HTTP/1.1响应由标头和正文两部分组成,二者通过空行分隔。标头部分包含关于请求是否成功以及即将发送的文件类型的信息,正文则包含文件内容,如HTML网页。HTTP标头的第一行通常表示请求页面的HTTP状态。每一行数据需要从字节解码为字符串,通常使用
decode()
方法,默认编码为"utf_8"。示例如下:

#将字节解码为字符串
line_data=line_bytes.decode()

关闭HTTP连接

可以通过调用
close()
方法关闭
StreamWriter
,从而关闭套接字连接。例如:

#关闭连接
writer.close()

此操作不会阻塞,并且可能不会立即关闭套接字。

2.4.3 asyncio中的流使用示例

本节介绍了一个用于检查网站状态的示例,代码实现了一个异步HTTP状态码获取工具。通过结合asyncio和urlsplit模块,该工具能够并发请求多个URL,并获取这些URL的HTTP状态行(例如 HTTP/1.1 200 OK)。代码流程如下:

  1. 解析URL
    :使用
    urlsplit(url)
    解析 URL,提取协议、主机名和路径等信息。
  2. 建立连接
    :根据协议选择相应端口(80 或 443),并建立异步网络连接。
  3. 发送HTTP请求
    :构建并发送简单的HTTP请求报文。
  4. 读取响应
    :异步读取 HTTP 响应的状态行并返回。
  5. 处理异常
    :若请求失败,捕获异常并输出错误信息,返回 None。
  6. 并发执行任务
    :通过
    asyncio.gather()
    实现并发执行所有 URL 请求,等待任务完成并返回结果。
  7. 输出结果
    :输出每个URL的HTTP状态行或错误信息。

示例代码如下:

import asyncio
from urllib.parse import urlsplit  # 导入 urlsplit 函数,用于解析 URL

# 定义一个异步函数,用于获取指定 URL 的 HTTP/S 状态
async def get_status(url):
    # 使用 urlsplit 解析 URL,将其分解为各个部分(例如:scheme, hostname, path)
    url_parsed = urlsplit(url)
    
    try:
        # 根据 URL 的 scheme(协议)判断是 http 还是 https,选择相应的端口(80 或 443)
        if url_parsed.scheme == 'https':
            # 如果是 https 协议,连接到 443 端口,并启用 SSL 加密
            reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
        else:
            # 如果是 http 协议,连接到 80 端口
            reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
        
        # 构建 HTTP 请求报文:请求目标是 URL 的 path 部分,使用 HTTP/1.1 协议
        query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
        
        # 将请求报文写入连接,并使用 StreamWriter 将编码字节写入套接字。
        writer.write(query.encode())
        # 等待数据写入完成
        await writer.drain()
        
        # 从服务器读取一行响应数据(HTTP 状态行)
        response = await reader.readline()
        # 解码响应并去除多余的空白字符
        status = response.decode().strip()
        
        # 返回 HTTP 状态行(例如:"HTTP/1.1 200 OK")
        return status
    except Exception as e:
        # 如果请求过程中发生任何异常,捕获并输出错误信息
        print(f"Error fetching {url}: {e}")
        # 如果发生错误,返回 None
        return None
    finally:
        # 确保连接关闭
        writer.close()
        await writer.wait_closed()  # 等待连接完全关闭
        reader.feed_eof()  # 通知 reader 没有更多数据会到来

# 主协程,执行多个 URL 状态获取任务
async def main():
    # 定义一个包含多个 URL 的列表,表示我们需要检查的目标网站
    sites = [
        'https://www.baidu.com/',
        'https://www.bilibili.com/',
        'https://www.weibo.com/',
        'https://www.douyin.com/',
        'https://www.zhihu.com/',
        'https://www.taobao.com/',
        'https://www.sohu.com/',
        'https://www.tmall.com/',
        'https://www.xinhuanet.com/',
        'https://www.163.com/'
    ]
    
    # 为每个 URL 创建一个获取状态的异步任务
    tasks = [get_status(url) for url in sites]
    
    # 使用 asyncio.gather 并发执行所有任务,返回所有任务的结果(包括异常)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 遍历 URL 和对应的响应状态,输出结果
    for url, status in zip(sites, results):
        # 如果状态不为空,表示请求成功,输出 URL 和 HTTP 状态
        if status is not None:
            print(f'{url:30}:\t{status}')
        else:
            # 如果状态为 None,表示请求失败,输出错误信息
            print(f'{url:30}:\tError')

# 运行主异步程序
asyncio.run(main())

3 参考

全新向量数据库SQL Server 2025:带你迈入AI驱动的数据未来

上次大家下单的《
微软憋大招:SQL Server + Copilot = 地表最强AI数据库!
》 抱怨迟迟没有发货,这次微软没有食言,
终于发货

前言

随着人工智能技术的普及,客户的数据平台和应用程序正面临新挑战。大多数组织预计会在云、边缘和专用基础设施的混合环境中部署人工智能工作负载,而隐私和安全性比以往任何时候都更重要。

SQL Server 2025预览版已经发布

微软 SQL Server 2025
(目前预览版)是一款支持企业级人工智能、从本地到云的数据库,旨在通过将人工智能引入客户数据来应对这些挑战。本次发布延续了 SQL Server 在性能和安全性上的三十年创新,并新增了人工智能功能。通过与 Microsoft Fabric 集成,客户可以将数据带入下一代数据分析领域。该版本支持混合环境,包括云、本地数据中心和边缘设备,同时利用 Microsoft Azure 的创新技术服务客户的数据库需求。 SQL Server 2025 社区技术预览(CTP)1已经发布

全新的向量数据库

SQL Server 2025 正在转变为一个
向量数据库
,通过内置的过滤能力和高效的向量搜索,为开发人员提供卓越的性能,并支持使用熟悉的 T-SQL 语法轻松调用。

内置高级人工智能

这个新版本集成了人工智能功能,简化了人工智能应用程序开发,并支持基于向量的增强检索生成(RAG)模式,具有安全、高性能且易于使用的向量支持功能。借助此新功能,您可以将向量与 SQL 数据结合,用于混合人工智能向量搜索。

用企业数据库构建人工智能应用程序

SQL Server 2025 是一款具备企业级安全性和合规性的向量数据库,可将企业人工智能引入您的数据中。其原生向量存储和索引由 DiskANN 提供支持,这是一种基于磁盘存储的向量搜索技术,可高效地在大数据集中找到相似的数据点。这些数据库能够高效支持数据分块,并通过语义搜索实现精确的数据检索。在最新版本中,SQL 引擎内还提供了灵活的人工智能模型管理功能,支持基于 REST 接口的从本地到云的人工智能模型调用。

此外,无论客户正在进行数据预处理、模型训练还是 RAG 模式,扩展的低代码工具都提供了灵活的模型接口,支持通过 T-SQL 和外部 REST 端点进行操作。这些工具通过与 LangChain、Semantic Kernel 和 Entity Framework Core 等流行人工智能框架的无缝集成,增强了开发人员创建各种人工智能应用程序的能力。


允许用户使用 T-SQL 命令和 sp_invoke_external_rest_endpoint 直接调用 ChatGPT 等 AI 服务
内置 DiskANN 组件进行向量搜索

数据库内置原生向量数据类型和向量索引

提升开发者生产力

在构建数据密集型应用程序(例如人工智能应用程序)时,扩展性、框架和数据增强是提升开发者生产力的关键。
我们确保 SQL Server 为开发人员提供一流的体验,新增以下功能:

  • REST API 支持
  • 通过 Data API Builder 集成 GraphQL
  • 支持正则表达式
  • 原生的 JSON 支持让开发人员能够更有效地处理频繁变化的模式和层级数据,从而更轻松地创建动态应用程序。所有功能都由 SQL Server 引擎提供的安全性保障,使其成为真正的企业级人工智能平台。

世界一流的安全性和性能

SQL Server 2025 是数据库安全性和性能的行业领导者。

  • 改进的凭据管理:支持 Microsoft Entra 托管身份,减少潜在漏洞并提供合规性和审计能力。

  • 性能和可用性增强:引入从 Azure SQL 测试过的查询优化和执行性能改进功能。

  • 新的优化功能: 可选参数计划优化(OPPO):根据客户提供的运行时参数值选择最佳执行计划,显著减少参数嗅探问题。

  • 辅助副本上的持久统计信息:防止在重新启动或故障转移期间丢失统计信息,避免性能下降。

Microsoft Fabric 和 Azure Arc 集成

  • 实时分析能力:Fabric 的 Mirrored SQL Server Database 功能提供了近实时的数据复制能力,使 SQL Server 数据可以轻松集成到 Microsoft OneLake 统一数据平台中。

  • 简化管理:通过 Azure Arc,SQL Server 2025 提供自动补丁、自动备份和许可证管理等功能。


参考文章

https://www.brentozar.com/archive/2024/11/whats-new-in-sql-server-2025/

https://www.linkedin.com/pulse/announcing-sql-server-2025-bob-ward-6s0hc

https://redmondmag.com/Articles/2024/11/19/SQL-Server-2025-Announced-at-Microsoft-Ignite-2024.aspx

本文版权归作者所有,未经作者同意不得转载。