[python] asyncio库常见问题与实践案例
本文详细介绍了在使用asyncio库编写异步程序时常见的错误和问题,并进一步通过实践案例进行分析和讨论,以便在项目中更有效地应用asyncio库。有关asyncio库的详细介绍,可参考:
Python 异步编程库 asyncio 使用指北
。
1 asyncio程序的常见错误
本节展示了在使用asyncio模块时,开发人员常遇到的一些常见错误示例。以下是四个最常见的异步编程错误:
- 直接调用并运行协程。
- 主协程过早退出。
- 错误使用asyncio的低级API。
- 程序出现竞争条件或死锁问题。
1.1 试图直接调用并运行协程
协程通常通过
async def
定义,如下所示:
# 自定义协程
async def custom_coro():
print('hi there')
若直接像函数一样调用该协程,通常不会执行预期的操作,而是创建一个协程对象。这种调用方式不会触发协程的执行:
# 错误:像函数一样调用协程
custom_coro() # 这只是创建了一个协程对象,并不会执行
此时,返回的是一个协程对象,而不是立即执行协程主体,这忽略协程必须在事件循环中运行。如果协程未被执行,系统将发出以下运行时警告:
sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited
要正确执行协程,需要在
asyncio
事件循环中等待该对象。例如,使用
asyncio.run()
启动事件循环来执行协程:
# 正确:通过 asyncio.run() 运行协程
import asyncio
asyncio.run(custom_coro())
另一种执行协程方法是通过
await
表达式在现有协程中挂起并调度其他协程。例如,定义一个新的协程,在其中调用
custom_coro()
:
# 正确:在协程中使用 await 调度另一个协程
async def main():
await custom_coro()
# 使用 asyncio.run 启动事件循环
asyncio.run(main())
1.2 主协程过早退出
在异步编程中,任务的执行可能无法按预期及时完成。通过
asyncio.create_task()
可以并行运行多个协程,但如果主协程提前退出,这些任务可能会被强制中止。为确保所有任务能够在主协程退出前完成,主协程应在无其他活动时显式等待剩余任务的完成。可以使用
asyncio.all_tasks()
来获取当前事件循环中的所有任务,并在移除主协程本身后,通过
asyncio.wait()
等待其他任务的执行结果。如果不移除当前协程,
asyncio.wait
会等待所有任务完成,包括当前协程,从而导致程序不退出(死锁)。示例如下:
import asyncio
async def task_1():
print("任务 1 开始")
await asyncio.sleep(2)
print("任务 1 完成")
async def task_2():
print("任务 2 开始")
await asyncio.sleep(1)
print("任务 2 完成")
async def main():
# 创建多个任务
task1 = asyncio.create_task(task_1())
task2 = asyncio.create_task(task_2())
# 获取所有正在运行的任务的集合
all_tasks = asyncio.all_tasks()
# 获取当前任务(即主协程)
current_task = asyncio.current_task()
# 从所有任务列表中删除当前任务
all_tasks.remove(current_task)
# 暂停直到所有任务完成
await asyncio.wait(all_tasks)
# 运行主协程
asyncio.run(main())
代码运行结果为:
任务 1 开始
任务 2 开始
任务 2 完成
任务 1 完成
1.3 错误使用asyncio的低级API
asyncio
提供了两类API:一类是面向应用程序开发者的高级API,另一类是面向框架开发者的低级API。低级API主要为高级API提供底层支持,如事件循环、传输协议等内部结构。在大多数情况下,推荐优先使用高级API,特别是在学习阶段。只有在需要实现特定功能时,才应考虑使用低级API。尽管学习低级API具有一定的价值,但不应在刚开始时就使用。建议先通过高级API熟悉异步编程的基本概念,进行应用开发,掌握核心知识后,再深入探讨技术细节。例如:
import asyncio
# 高级API:推荐的用法
async def hello_world():
print("你好,世界!")
# 使用 asyncio.run 来启动事件循环
def run_hello_world():
asyncio.run(hello_world())
# 低级API:不推荐直接使用
async def low_level_example():
loop = asyncio.get_event_loop() # 获取当前事件循环
task = loop.create_task(hello_world()) # 创建任务
await task # 显式等待任务完成
# 运行高级 API 示例
print("使用 asyncio.run 运行:")
run_hello_world()
# 运行低级 API 示例
print("\n使用低级 API 运行:")
asyncio.run(low_level_example())
1.4 程序出现竞争条件或死锁问题
竞争条件和死锁是并发编程中常见的错误。竞争条件发生在多个任务同时访问相同资源时,缺乏适当的控制可能导致数据错误或丢失。死锁则是指不同任务互相等待对方释放资源,最终导致所有任务无法继续执行。
许多Python开发者认为,使用
asyncio
协程可以避免这些问题,因为在任何时刻,事件循环中只有一个协程在执行。然而,协程在运行过程中可能会暂停和恢复,并且可能会访问共享资源。如果对这些资源没有适当的保护,就可能会引发竞争条件。此外,在协程同步资源时处理不当,也有可能导致死锁。因此,在编写
asyncio
程序时,确保协程的安全性至关重要。
1.4.1 竞争条件问题
以下示例代码模拟了两个异步任务并行增加共享变量
counter
,每个任务循环10000次对
counter
进行递增操作。通过
awaitasyncio.sleep(0)
来模拟上下文切换,确保两个任务能够交替执行。然而,由于未使用同步机制(如锁),会导致竞态条件。因此,最终的
counter
值可能小于预期的20000,而不是20000,因为两个任务可能在读取和更新
counter
的值时发生冲突,导致多个协程可能重复更新相同的数据:
import asyncio
# 共享资源
counter = 0
async def increment():
global counter
for _ in range(10000):
temp = counter
temp += 1
await asyncio.sleep(0) # 让出控制权,模拟上下文切换
counter = temp
async def main():
tasks = [increment(), increment()]
await asyncio.gather(*tasks)
print("最终计数器的值:", counter)
# 运行 asyncio 程序
asyncio.run(main())
代码运行结果为:
最终计数器的值: 10000
为了解决这个问题,可以使用
asyncio.Lock
来同步对共享资源
counter
的访问。然而,由于
asyncio.Lock
与
asyncio.run
之间的事件循环可能不匹配,通常会在某些环境中(如特定的 IDE 或脚本运行环境)出现问题。原因在于
asyncio.run
创建并管理一个新的事件循环,而锁 (
asyncio.Lock
) 可能会被不同的事件循环使用,从而导致不一致。为避免这种情况,可以显式创建并使用一个事件循环,如下所示:
import asyncio
# 共享资源
counter = 0
# 创建锁
lock = asyncio.Lock()
async def increment():
global counter
for _ in range(10000):
async with lock: # 确保在修改 counter 时,只有一个任务可以访问
temp = counter
temp += 1
await asyncio.sleep(0) # 让出控制权,模拟上下文切换
counter = temp
async def main():
tasks = [increment(), increment()]
await asyncio.gather(*tasks)
print("最终计数器的值:", counter)
# 显式创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
代码运行结果为:
最终计数器的值: 20000
1.4.2 死锁问题
死锁介绍
死锁(Deadlock)是并发编程中的一种常见问题,它发生在多个任务之间的资源争用中,导致所有任务都陷入无法继续执行的僵局。即使在Python中使用
asyncio
协程框架,资源竞争和同步问题也可能导致死锁的发生,尤其是在协程需要同步资源(如锁)时。如果同步机制设计不当,容易引发死锁。
死锁的特征如下:
- 循环等待:多个任务之间相互等待对方释放资源,从而形成一个循环等待的关系。例如,任务1等待任务2释放资源,而任务2又在等待任务1释放资源,形成闭环。
- 不可抢占:每个任务持有的资源(如锁)不能被其他任务强制抢占。只有在任务主动释放资源时,其他任务才能获取该资源。
- 持有资源且等待:任务持有某些资源(如锁),同时又在等待其他资源的释放。由于任务在持有资源的情况下无法继续执行,导致系统中的任务无法前进。
以下代码中的死锁是典型的循环等待问题,所有相关任务陷入相互等待的死循环,无法继续执行:
import asyncio
# 创建两个共享锁
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
print("任务1:尝试获取锁1")
await lock1.acquire() # 获取锁1
print("任务1:已获取锁1,尝试获取锁2")
await asyncio.sleep(1) # 模拟一些操作
await lock2.acquire() # 获取锁2
print("任务1:已获取锁2")
# 释放锁
lock1.release()
lock2.release()
async def task2():
print("任务2:尝试获取锁2")
await lock2.acquire() # 获取锁2
print("任务2:已获取锁2,尝试获取锁1")
await asyncio.sleep(1) # 模拟一些操作
await lock1.acquire() # 获取锁1
print("任务2:已获取锁1")
# 释放锁
lock1.release()
lock2.release()
async def main():
# 启动两个任务
await asyncio.gather(task1(), task2())
# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
代码运行结果如下,由于两个任务都被挂起,程序无法退出,且永远不会打印出"任务1:已获取锁2"或"任务2:已获取锁1":
任务1:尝试获取锁1
任务1:已获取锁1,尝试获取锁2
任务2:尝试获取锁2
任务2:已获取锁2,尝试获取锁1
...
asyncio中死锁的避免
在使用
asyncio
时,为了避免死锁,可以采取以下几种方法:
- 锁的顺序管理:确保所有任务按照相同的顺序获取锁,以防止发生相互等待的情况。
- 尝试获取锁:使用
asyncio.Lock
的
acquire
方法并设置超时时间,避免任务长时间处于等待锁的状态。 - 使用
async with
:通过
async with
语句来管理锁,这样可以确保在任务完成后自动释放锁,避免因忘记释放锁而引发问题。
根据这一思路,前面死锁的案例解决示例代码如下:
import asyncio
# 创建两个共享锁
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
print("任务1:尝试获取锁1")
async with lock1: # 使用async with获取锁,自动释放
print("任务1:已获取锁1,尝试获取锁2")
await asyncio.sleep(1) # 模拟一些操作
print("任务1:尝试获取锁2")
async with lock2: # 使用async with获取锁,自动释放
print("任务1:已获取锁2")
async def task2():
print("任务2:尝试获取锁1")
async with lock1: # 使用async with获取锁,自动释放
print("任务2:已获取锁1,尝试获取锁2")
await asyncio.sleep(1) # 模拟一些操作
print("任务2:尝试获取锁2")
async with lock2: # 使用async with获取锁,自动释放
print("任务2:已获取锁2")
async def main():
# 启动两个任务
await asyncio.gather(task1(), task2())
# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
代码运行结果如下,可以看到两个任务避免了死锁:
任务1:尝试获取锁1
任务1:已获取锁1,尝试获取锁2
任务2:尝试获取锁1
任务1:尝试获取锁2
任务1:已获取锁2
任务2:已获取锁1,尝试获取锁2
任务2:尝试获取锁2
任务2:已获取锁2
2 asyncio程序的常见问题
在使用asyncio编写异步程序时,开发者可能会遇到一系列常见问题,这些问题涉及到任务的管理、执行流程、性能优化等多个方面。以下是一些常见的问题和挑战:
- 任务的等待、停止、结果获取
- 如何在后台运行和等待任务
- 任务的延迟后运行和后续运行
- 如何显示运行任务的进度
- 如何在asyncio中执行阻塞I/O或CPU密集型函数
- Python协程:操作系统原生支持吗
2.1 任务的等待、停止、结果获取
2.1.1 如何等待任务
可以通过直接等待
asyncio.Task
对象来等待任务的完成:
# 等待任务完成
await task
也同时创建并等待任务完成。例如:
# 创建并等待任务完成
await asyncio.create_task(custom_coro())
与协程不同,任务可以多次等待而不会引发错误。以下是一个演示如何多次等待同一任务的示例,在此例中,
await task
两次都能成功执行,因为
task
已经完成并保存了返回值:
import asyncio
async def other_coro():
await asyncio.sleep(1)
return "任务完成"
async def main():
# 将协程包装在任务中并安排其执行
task = asyncio.create_task(other_coro())
# 第一次等待任务并获取返回值
value1 = await task
print(value1)
# 再次等待任务(任务已经完成)
value2 = await task
print(value2)
# 运行主协程
asyncio.run(main())
2.1.2 何时停止任务
可以通过
asyncio.Task
对象的
cancel()
方法取消任务。若任务被成功取消,
cancel()
方法返回
True
,否则返回
False
。例如:
# 取消任务
was_cancelled = task.cancel()
2.1.3 如何获取任务的返回值
在Python中创建一个
asyncio
任务后,有两种方法可以从
asyncio.Task
中检索返回值:
- 等待任务(使用
await
)。 - 调用
result()
方法。
基于
await
函数,等待任务时,调用者会挂起,直到任务完成并返回结果。如果任务已完成,返回值会立即提供。以下代码展示了如何等待任务并获取其返回值:
import asyncio
async def other_coro():
await asyncio.sleep(1)
return "任务完成"
async def main():
# 将协程包装在任务中并安排其执行
task = asyncio.create_task(other_coro())
# 等待任务完成并获取返回值
value = await task
print(value)
# 运行主协程
asyncio.run(main())
也可以通过调用
asyncio.Task
对象的
result()
方法获取任务的返回值。此时要求任务已完成。如果任务未完成,调用
result()
会引发
InvalidStateError
异常。如果任务被取消,则会引发
CancelledError
异常。以下是一个使用
result()
方法的例子:
import asyncio
async def other_coro():
await asyncio.sleep(1)
return "任务完成"
async def main():
task = asyncio.create_task(other_coro())
# 等待任务完成
await task
try:
# 获取任务的返回值
value = task.result()
print(value)
except asyncio.InvalidStateError:
print("任务尚未完成")
except asyncio.CancelledError:
print("任务已取消")
# 运行主协程
asyncio.run(main())
2.2 如何在后台运行和等待任务
2.2.1 如何在后台运行任务
通过
asyncio.create_task()
可以将协程封装为Task对象,并在后台执行。创建的任务对象会立即返回,且不会阻塞调用者的执行。为了确保任务能够开始执行,可以使用
await asyncio.sleep(0)
暂停片刻。之所以使用
await asyncio.sleep(0)
,是因为新创建的任务并不会立刻开始执行。事件循环负责管理多个任务,它会根据调度策略决定哪个任务优先执行。通过
await asyncio.sleep(0)
暂时让出执行权,使得事件循环有机会调度并执行刚刚创建的任务。这样,
await asyncio.sleep(0)
确保了任务在创建后能尽早开始执行,同时不会阻塞主协程的其他操作。示例代码如下:
import asyncio
async def other_coroutine():
print("开始执行 other_coroutine")
await asyncio.sleep(2)
print("other_coroutine 执行完毕")
async def main():
# 创建并调度任务
task = asyncio.create_task(other_coroutine())
# 暂停片刻以确保任务开始执行
await asyncio.sleep(0)
print("主协程正在执行")
# 等待任务完成
await task
print("任务执行完毕")
# 运行主协程
asyncio.run(main())
此外,后台任务可以在程序运行时执行,不会妨碍主程序的结束。如果主程序没有其他待执行的任务,而后台任务仍在进行中,那么需要确保程序在后台任务完成后才会完全退出。
2.2.2 如何等待所有后台任务
在使用
asyncio
时,可能需要等待多个独立的任务完成。比如,当多个任务同时运行时,有时想要等待所有任务完成,但又不想一直阻塞当前正在运行的任务。为了实现这个功能,可以通过以下步骤:
- 获取所有当前任务:使用
asyncio.all_tasks()
可以获取到当前事件循环中的所有任务。 - 排除当前任务:通过
asyncio.current_task()
获取当前正在运行的任务,并将其从任务集合中移除。这样可以避免等待当前任务自己。 - 等待所有剩余任务完成:使用
asyncio.wait()
来等待所有任务完成,直到它们都执行完毕。
示例代码如下:
import asyncio
async def example_coroutine(name):
# 这是一个模拟任务的协程,睡眠 1 秒钟
await asyncio.sleep(1)
print(f"任务 {name} 完成。")
async def main():
# 创建多个协程任务
tasks = [asyncio.create_task(example_coroutine(name = str(i))) for i in range(5)]
# 获取所有正在运行的任务
all_tasks = asyncio.all_tasks()
# 获取当前正在运行的任务(即 main 协程)
current_task = asyncio.current_task()
# 从任务集合中移除当前任务
all_tasks.remove(current_task)
# 等待所有其他任务完成
await asyncio.wait(all_tasks)
# 启动事件循环并执行主协程
asyncio.run(main())
2.3 任务的延迟后运行和后续运行
2.3.1 任务的延迟后运行
想要实现任务的延迟后运行,可以通过开发一个自定义的包装协程,使其在延迟指定时间后执行目标协程。该包装协程接受两个参数:目标协程和延迟时间(单位为秒)。它会先休眠指定的延迟时间,然后执行传入的目标协程。
以下代码展示了如何通过自定义包装协程
delay
,在指定的延迟时间后执行目标协程。
delay
协程通过
asyncio.sleep()
实现延时,随后再执行传入的目标协程。可以在不同场景中使用该方法,如直接挂起协程或将任务安排为独立执行:
import asyncio
# 延迟几秒后启动另一个协程的包装协程
async def delay(coro, seconds):
"""
延迟指定时间(秒)后执行目标协程。
参数:
coro: 要执行的目标协程
seconds: 延迟时间,单位为秒
"""
# 暂停指定时间(以秒为单位)
await asyncio.sleep(seconds)
# 执行目标协程
await coro
# 示例目标协程
async def my_coroutine():
print("目标协程开始执行")
# 模拟一些工作
await asyncio.sleep(2)
print("目标协程执行完成")
# 使用包装协程时,可以创建协程对象并直接等待,或将其作为任务独立执行
# 1. 调用者可以挂起并调度延迟后的协程
async def main():
print("延迟10秒后执行目标协程:")
await delay(my_coroutine(), 10)
print("目标协程已经完成执行")
# 2. 或者调用者可以安排延迟协程独立运行
async def schedule_task():
print("将目标协程安排为独立任务,延迟10秒后执行")
task = asyncio.create_task(delay(my_coroutine(), 10))
await task # 等待任务完成
print("任务已完成")
# 运行示例
if __name__ == "__main__":
asyncio.run(main()) # 运行主协程
# 或者运行独立任务的调度
# asyncio.run(schedule_task())
2.3.2 任务的后续运行
在asyncio中,触发后续任务的方式主要有三种:
- 通过已完成的任务本身调度后续任务
- 通过任务发起方调度后续任务
- 使用回调函数自动调度后续任务
逐一分析这三种方式:
1. 通过已完成的任务本身调度后续任务
已完成的任务可以触发后续任务的调度,通常依赖于某些状态检查来决定是否应该发起后续任务。任务调度可以通过
asyncio.create_task()
来完成。示例代码展示了运行指定任务后直接调度后续任务:
import asyncio
async def task():
print("任务开始执行。")
await asyncio.sleep(2) # 模拟任务执行
print("任务执行完成。")
await followup_task() # 在任务完成后直接调度后续任务
async def followup_task():
print("正在执行后续任务。")
await asyncio.sleep(2) # 模拟后续任务执行
print("后续任务执行完成。")
# 启动事件循环,执行任务
async def main():
await task()
asyncio.run(main())
2. 通过任务发起方调度后续任务
任务发起方可以根据实际需要决定是否继续启动后续任务。在启动第一个任务时,可以保留
asyncio.Task
对象,通过检查任务的结果或状态,来判断是否启动后续任务。任务发起方还可以选择等待后续任务完成,也可以选择不等待。示例代码如下:
import asyncio
async def task():
# 模拟一个任务
await asyncio.sleep(1)
return True # 假设任务成功完成,返回True
async def followup_task():
# 模拟后续任务
await asyncio.sleep(1)
print("后续任务执行")
async def main():
# 发起并等待第一个任务
task_1 = asyncio.create_task(task())
# 等待第一个任务完成
result = await task_1
# 检查任务结果
if result:
# 发起后续任务
await followup_task()
# 运行主程序
asyncio.run(main())
3. 使用回调函数自动调度后续任务
在任务发起时,可以为其注册一个回调函数。该回调函数会在任务完成后自动执行。回调函数接收一个
asyncio.Task
对象作为参数,但它不会等待后续任务的执行。因为回调函数通常是普通的Python函数,无法进行异步操作。示例代码:
import asyncio
# 定义回调函数
def callback(task):
# 安排并启动后续任务
# 注意:这里不能直接使用 await,需通过 create_task 调度异步任务
asyncio.create_task(followup())
# 定义第一个异步任务
async def work():
print("工作任务正在执行...")
await asyncio.sleep(2) # 模拟一些异步操作
print("工作任务完成!")
# 定义后续异步任务
async def followup():
print("后续任务正在执行...")
await asyncio.sleep(1) # 模拟一些异步操作
print("后续任务完成!")
# 创建事件循环并运行任务
async def main():
# 发起任务并注册回调函数
task = asyncio.create_task(work())
task.add_done_callback(callback)
# 等待任务完成
await task
# 确保后续任务完成
await asyncio.sleep(1) # 等待回调任务完成的时间
# 执行事件循环
asyncio.run(main())
2.4 如何显示运行任务的进度
2.4.1 基于回调函数的任务进度显示
每个任务的回调函数可用于显示进度。
asyncio.Task
对象支持注册回调函数,这些函数会在任务完成时被调用,无论是正常完成还是以异常结束。回调函数是普通函数而非协程,且接受与其关联的
asyncio.Task
对象作为参数。通过为所有任务注册相同的回调函数,可以统一报告任务进度:
import asyncio
# 回调函数,用于显示任务完成的进度,区分任务
def progress(task):
task_name = task.get_name() # 获取任务的名称
print(f"任务 {task_name} 完成。")
async def example_task(n, task_name):
"""模拟一个异步任务,表示处理n秒的任务,并设置任务名称"""
await asyncio.sleep(n)
return task_name
async def main():
# 定义多个异步任务并添加回调函数
tasks = []
for i in range(1, 6):
task_name = f"Task-{i}" # 为每个任务分配一个唯一名称
task = asyncio.create_task(example_task(i, task_name)) # 创建任务,模拟不同的执行时间
task.set_name(task_name) # 设置任务名称
# 为任务添加回调函数,回调函数会在相应任务执行完毕时被调用
task.add_done_callback(progress)
tasks.append(task)
# 等待所有任务完成
await asyncio.gather(*tasks)
# 运行主程序
asyncio.run(main())
2.4.2 基于tqdm库的任务进度显示
使用tqdm库显示任务总体进度
以下代码演示了如何结合
tqdm
库和
asyncio
库,来展示异步任务的总体执行进度:
import asyncio
from tqdm.asyncio import tqdm
async def example_task(n, task_name):
"""模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""
await asyncio.sleep(n) # 模拟任务处理时间
return task_name # 返回任务名称
async def main():
# 定义多个异步任务并使用 tqdm 显示进度
tasks = []
total_tasks = 5 # 总任务数
task_durations = [1, 2, 3, 4, 5] # 每个任务的持续时间(秒)
# 使用 tqdm 创建进度条,`total` 为任务的数量
progress_bar = tqdm(total=total_tasks, desc="已完成任务数", ncols=100)
# 创建任务
for i, n in enumerate(task_durations):
task_name = f"Task-{i+1}" # 为每个任务分配一个唯一名称
task = asyncio.create_task(example_task(n, task_name)) # 创建任务,模拟不同的执行时间
tasks.append(task)
# 等待任务完成并更新进度条
for task in asyncio.as_completed(tasks):
await task # 等待每个任务完成
progress_bar.update(1) # 每完成一个任务,更新进度条
progress_bar.close() # 关闭进度条
# 运行主程序
asyncio.run(main())
使用tqdm库为多个任务设置单独进度条
以下示例代码演示了如何使用
asyncio
并行执行多个异步任务,同时通过
tqdm
库为每个任务单独显示进度条:
import asyncio
from tqdm.asyncio import tqdm
async def example_task(n, task_name, progress_bar):
"""模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""
for _ in range(n): # 每秒更新一次进度
await asyncio.sleep(1) # 模拟任务处理时间
progress_bar.update(1) # 更新当前任务的进度
return task_name # 返回任务名称
async def main():
# 定义多个异步任务并使用 tqdm 显示进度
tasks = []
total_tasks = 5 # 总任务数
task_durations = [1, 2, 3, 4, 5] # 每个任务的持续时间(秒)
# 创建进度条并为每个任务单独设置
progress_bars = []
for i, n in enumerate(task_durations):
task_name = f"Task-{i+1}" # 为每个任务分配一个唯一名称
progress_bar = tqdm(total=n, desc=task_name, ncols=100, position=i) # 创建任务对应的进度条
progress_bars.append(progress_bar)
task = asyncio.create_task(example_task(n, task_name, progress_bar)) # 创建任务
tasks.append(task)
# 等待任务完成
await asyncio.gather(*tasks) # 使用 asyncio.gather 同时等待所有任务完成
# 关闭所有进度条
for progress_bar in progress_bars:
progress_bar.close()
# 运行主程序
asyncio.run(main())
2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数
在编程中,“阻塞调用”指的是某些操作(例如读取文件、等待网络请求或执行数据库查询等)需要一定时间才能完成。在执行这些操作时,程序会暂停,无法继续处理其他任务,这就是“阻塞”。另外,CPU密集型操作也可能会导致程序阻塞。因此,为了在异步环境中仍然能够处理阻塞调用,asyncio模块提供了两种方法来在异步程序中执行阻塞调用:
asyncio.to_thread()
:此方法简化了线程管理流程,特别适合处理大多数I/O密集型任务。它允许将阻塞调用委派给一个线程,从而避免阻塞主事件循环。loop.run_in_executor()
:此方法提供了更高的灵活性,支持使用自定义的执行器,比如线程池或进程池。这适用于需要精细控制执行环境的场景。
这两种方法均可有效地将阻塞调用转为异步任务,以下逐一分析这两种方式:
2.5.1 使用
asyncio.to_thread()
asyncio.to_thread()
是一个高级 API,适用于大多数应用场景。它会将指定的函数和参数提交到一个独立的线程中执行,并返回一个可等待的协程。这样,阻塞操作就可以在后台线程池中执行,而不会阻塞事件循环。需要注意的是,任务并不会立即执行,而是会等待事件循环空闲时再开始执行。由于
asyncio.to_thread()
会在后台创建一个
ThreadPoolExecutor
来处理阻塞任务,因此它特别适合 I/O 密集型的操作。示例代码如下:
import asyncio
import time
def blocking_task(task_id):
# 模拟一个耗时的阻塞操作
time.sleep(2)
return f"任务 {task_id} 完成"
# 同步执行多个任务
def sync_main():
start_time = time.time()
# 顺序执行多个阻塞任务
results = [blocking_task(i) for i in range(5)]
end_time = time.time()
for result in results:
print(result)
print(f"同步任务执行时间: {end_time - start_time:.4f} 秒")
# 异步运行多个阻塞任务
async def async_main():
start_time = time.time()
# 使用 asyncio.to_thread 来并发运行多个阻塞任务
tasks = [asyncio.to_thread(blocking_task, i) for i in range(5)]
results = await asyncio.gather(*tasks)
end_time = time.time()
for result in results:
print(result)
print(f"异步任务执行时间: {end_time - start_time:.4f} 秒")
# 执行同步任务
print("同步执行开始:")
sync_main()
# 执行异步任务
print("\n异步执行开始:")
asyncio.run(async_main())
以上代码展示了同步执行阻塞任务与异步执行阻塞任务的对比。通过使用
asyncio.to_thread()
,I/O 密集型操作的处理被委托给独立的线程池,从而避免了阻塞事件循环,显著提升了异步任务的效率:
- 同步执行:在
sync_main()
中,多个阻塞任务按顺序逐一执行,每个任务需等待前一个任务完成后才能开始,整体执行时间为所有任务总时间(即 5 * 2 秒)。 - 异步执行:在
async_main()
中,多个阻塞任务并发执行。尽管每个任务仍然是阻塞的,但它们在后台线程中并行处理,因此总执行时间仅为单个任务的执行时间(即约 2 秒)。
代码运行结果如下:
同步执行开始:
任务 0 完成
任务 1 完成
任务 2 完成
任务 3 完成
任务 4 完成
同步任务执行时间: 10.0317 秒
异步执行开始:
任务 0 完成
任务 1 完成
任务 2 完成
任务 3 完成
任务 4 完成
异步任务执行时间: 2.0089 秒
2.5.2 使用
loop.run_in_executor()
loop.run_in_executor()
是
asyncio
提供的低级API,需先获取事件循环(例如,使用
asyncio.get_running_loop()
)。该函数允许指定执行器(默认是
ThreadPoolExecutor
)以及要执行的函数。
与
asyncio.to_thread()
相比,
run_in_executor()
提供了更大的灵活性,支持使用自定义执行器,而不仅限于线程池。此外,调用该函数后,任务会立即开始执行,无需等待返回的可等待对象来触发任务的启动。
示例代码如下:
import asyncio
import time
# 定义一个需要执行的阻塞任务
def task():
print("任务开始")
time.sleep(2)
print("任务结束")
# 在单独的线程中执行函数
async def main():
# 获取事件循环
loop = asyncio.get_running_loop()
# 使用run_in_executor来将task函数异步执行在线程池中
# None 表示使用默认的线程池执行器
await loop.run_in_executor(None, task)
# 执行主任务
asyncio.run(main())
如果希望使用进程池,可以创建一个自定义的执行器并传递给
run_in_executor()
。在这种情况下,调用者需要负责管理执行器的生命周期,使用完后要手动关闭。示例代码如下:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
# 定义一个耗时的任务
def task(name):
print(f"任务 {name} 开始")
time.sleep(2) # 模拟一个阻塞的操作
print(f"任务 {name} 完成")
return f"来自 {name} 的结果"
# 使用自定义的执行器来运行任务
async def main():
# 创建一个进程池
with ProcessPoolExecutor() as executor:
# 获取当前的事件循环
loop = asyncio.get_running_loop()
# 使用 run_in_executor 来在进程池中执行任务
results = await asyncio.gather(
loop.run_in_executor(executor, task, "A"),
loop.run_in_executor(executor, task, "B"),
loop.run_in_executor(executor, task, "C")
)
# 打印所有任务的结果
for result in results:
print(result)
# 启动 asyncio 事件循环并执行 main
if __name__ == "__main__":
asyncio.run(main())
2.6 Python协程:操作系统原生支持吗
异步编程和协程并不总是解决程序中所有并发问题的最佳方案。Python 中的协程是由软件管理的,它们通过asyncio事件循环来执行和调度。与操作系统提供的线程和进程不同,协程并不由操作系统直接支持,而是通过Python的软件框架来实现的。在这个意义上,Python中的协程并不是“原生”的。它们并不像线程或进程那样具有独立的执行上下文,反而是在同一个线程内通过协作式调度来切换任务。
此外,Python的GIL(全局解释器锁)用来保护解释器内部的状态,防止多个线程同时访问和修改解释器的数据。而asyncio的事件循环是单线程运行的,这意味着所有的协程都在同一个线程里执行。由于协程本身是通过事件循环调度的,而不是通过多线程或多进程并行执行,因此,尽管Python中的多线程模型受到GIL的限制,协程在处理 I/O 密集型任务时能够有效避免GIL的影响,从而提高并发性能。这也是为什么在处理大量I/O操作时,使用asyncio和协程能够带来较好的性能表现。
然而,协程并不适用于所有类型的并发任务。例如,对于计算密集型任务,使用线程或进程模型可能更为合适,因为协程并不会突破GIL的限制,计算密集型任务依然会在单个CPU核心上串行执行。因此,在选择是否使用协程时,需要根据任务的特性做出权衡。
3 应用实例
3.1 在基于线程的程序中调用asyncio代码
直接调用同步I/O代码
以下代码实现了一个简单的Tkinter应用,点击按钮后,程序会发起一个同步HTTP请求(GET 请求)。在每60毫秒的刷新周期中,程序会根据当前状态更新显示的文本。然而,当点击按钮时,
request_remote
方法中的
requests.get
会发起一个同步请求,这会阻塞主线程,从而导致界面卡顿或无响应。如下代码,
App.QUERYING_STATE
状态相关信息不会显示出来:
import tkinter as tk
import requests
class App(tk.Tk):
INIT_STATE = 0 # 初始化状态
QUERYING_STATE = 1 # 请求中状态
RESULT_STATE = 2 # 请求结果状态
def __init__(self):
super().__init__()
self.status_code = 0 # HTTP请求返回的状态码
self._refresh_ms = 60 # 刷新间隔时间(毫秒)
self.state = App.INIT_STATE # 初始状态
self._button = None # 按钮
self._label = None # 标签
self.render_elements() # 渲染界面元素
self.after(self._refresh_ms, self.refresh) # 设置定时刷新,定时调用refresh方法
def render_elements(self):
""" 设置界面布局,渲染UI元素 """
self.geometry("400x200") # 设置窗口大小
self._button = tk.Button(self, text="请求状态码", command=self.request_remote) # 创建按钮,点击时调用request_remote方法
self._label = tk.Label(self, text="") # 创建标签,初始为空
self._button.pack() # 将按钮添加到窗口中
self._label.pack() # 将标签添加到窗口中
def request_remote(self):
""" 发起同步HTTP请求 """
self.state = App.QUERYING_STATE # 设置状态为请求中
response = requests.get("https://www.example.com") # 发起GET请求,获取响应
self.status_code = response.status_code # 获取响应返回的状态码
self.state = App.RESULT_STATE # 设置状态为结果状态,表示请求已完成
def refresh(self):
""" 每60毫秒刷新一次UI内容 """
self.update_label() # 更新标签内容
self.after(self._refresh_ms, self.refresh) # 设置下次刷新时间(每60毫秒刷新一次)
def update_label(self):
""" 根据应用状态更新标签内容 """
if self.state == App.INIT_STATE:
self._label.config(text="这里将显示状态码。") # 初始状态下提示文字
elif self.state == App.QUERYING_STATE:
self._label.config(text="正在查询远程...") # 请求中状态时显示提示文字
elif self.state == App.RESULT_STATE:
self._label.config(text=f"返回的状态码是: {self.status_code}") # 请求结果状态时显示返回的状态码
def start(self):
self.mainloop() # 启动Tkinter事件循环,进入GUI界面
def main():
app = App() # 创建应用实例
app.start() # 启动应用
if __name__ == "__main__":
main()
I/O请求的异步调用
可以将requests包替换为aiohttp包,实现I/O请求的异步调用。aiohttp和requests都是Python中常用的HTTP客户端库,但requests适用于同步场景,简单易用,aiohttp则适用于异步并发的场景,能够处理大量并行请求。具体区别如下:
- 同步vs异步:
- requests是一个同步库,意味着每次发送请求时,程序会等待响应回来后才继续执行。适用于一些简单的、串行的HTTP请求场景。
- aiohttp是一个异步库,基于Python的asyncio模块,能够在发送HTTP请求时非阻塞地继续执行其他任务。适用于需要大量并发请求或长时间等待的异步场景。
- 性能:
- requests由于是同步的,处理大量请求时容易出现性能瓶颈,因为每个请求必须等待前一个请求完成。
- aiohttp通过异步I/O处理,可以在等待响应时同时发起其他请求,极大提高了并发性能,尤其在处理大量HTTP请求时。
- 用法:
- requests用法简单,适合初学者和一般同步的任务。
- aiohttp需要使用async和await,适合需要并发或异步操作的任务。
在上述示例代码中,为了替代requests模块的同步请求,可以创建一个继承自
App
类的
AppAsync
类,并利用aiohttp和asyncio库实现异步请求。通过
async_request
方法异步发起HTTP请求:
import aiohttp
import asyncio
class AppAsync(App):
async def async_request(self):
"""
异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。
"""
async with aiohttp.ClientSession() as session: # 创建一个aiohttp会话对象
async with session.get("https://www.example.com") as response: # 发起GET请求
self.status_code = response.status # 获取响应状态码
self.state = App.RESULT_STATE # 更新应用状态
def __int__(self):
super().__init__()
def request_remote(self):
""" 使用asyncio.run来调用异步请求代码 """
self.state = self.QUERYING_STATE # 设置状态为请求中
asyncio.run(self.async_request()) # 异步发起HTTP请求
def main():
app = AppAsync() # 创建应用实例
app.start() # 启动应用
if __name__ == "__main__":
main() # 运行主程序
然而
AppAsync
类中的
asyncio.run(self.async_request())
会阻塞Tkinter的主线程,因为
asyncio.run()
会一直运行,直到异步任务完成。同时Tkinter自身有一个事件循环(mainloop()),与
asyncio
需要的事件循环冲突。如果在Tkinter内创建新事件循环,可能会导致Tkinter关闭或中断后出现问题。
将asyncio与线程结合
为了解决asyncio事件循环阻塞的问题,可以使用一个单独的守护线程,并在守护线程中运行事件循环,这样asyncio的事件循环就不会阻塞主线程。重写
AppAsync
类示例如下:
import aiohttp
import asyncio
import threading
class AppAsync(App):
def __init__(self):
super().__init__()
self._loop_thread = threading.Thread(target=self.run_asyncio_loop, daemon=True)
self._loop_thread.start() # 启动事件循环线程
async def async_request(self):
"""
异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。
"""
async with aiohttp.ClientSession() as session: # 创建一个aiohttp会话对象
async with session.get("https://www.example.com") as response: # 发起GET请求
self.status_code = response.status # 获取响应状态码
self.state = App.RESULT_STATE # 更新应用状态
def request_remote(self):
""" 使用异步请求,在事件循环中执行 """
self.state = App.QUERYING_STATE # 设置状态为请求中
asyncio.run_coroutine_threadsafe(self.async_request(), self._loop) # 调用异步请求并与当前事件循环进行交互
def run_asyncio_loop(self):
""" 运行asyncio事件循环 """
self._loop = asyncio.new_event_loop() # 创建新的事件循环
asyncio.set_event_loop(self._loop) # 设置当前线程的事件循环
self._loop.run_forever() # 启动事件循环
def main():
app = AppAsync() # 创建应用实例
app.start() # 启动应用
if __name__ == "__main__":
main() # 运行主程序
示例代码运行时,
App.QUERYING_STATE
状态相关信息会显示出来,
AppAsync
类主要的改动点如下:
AppAsync
类的构造函数:
- 增加了一个新的线程来运行asyncio事件循环,避免在Tkinter线程中阻塞。
- 使用
threading.Thread
启动一个守护线程,执行
run_asyncio_loop
方法,确保事件循环在后台运行。 - 在创建线程时设置为守护线程。这样即使主线程退出,守护线程也会自动结束。
run_asyncio_loop
方法:
- 在一个单独的线程中启动新的asyncio事件循环。
- 使用
asyncio.set_event_loop
设置当前线程的事件循环,并调用
loop.run_forever()
来保持事件循环持续运行。
request_remote
方法:
- 使用
asyncio.run_coroutine_threadsafe
将异步请求任务提交给后台事件循环执行,用于在非主线程中安全地执行协程。
- 使用
3.2 基于asyncio实现多核异步处理
单核异步处理
asyncio的并发机制是基于协作式多任务(协程),它不会并行地使用多个CPU核心来加速计算,所有的任务都是在单个核心上轮流执行的。以下代码模拟了1000个爬虫任务,并使用单核异步来执行:
import random
import asyncio
import time
# 模拟爬虫任务,执行时会有随机的延迟
async def fake_crawlers():
# 随机生成一个0.2到1.0秒之间的延迟,保留两位小数
io_delay = round(random.uniform(0.2, 1.0), 2)
await asyncio.sleep(io_delay)
result = 0
# 随机生成100,000到500,000之间的数字,用于模拟计算密集型任务
# 这段代码耗时大约0.2秒到0.5秒之间
for i in range(random.randint(100000, 500000)):
result += i
return result
# 主程序入口,负责创建并执行多个爬虫任务
async def main():
# time.monotonic()是用于测量时间间隔的可靠方法,它不受系统时间更改的影响
start = time.monotonic()
tasks = [asyncio.create_task(fake_crawlers()) for i in range(1000)] # 模拟创建1000个任务
await asyncio.gather(*tasks) # 等待所有任务完成
# 输出所有任务完成的时间
print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")
# 启动程序
asyncio.run(main())
代码运行结果如下:
所有任务已完成,耗时 8.51 秒
多核异步处理
要实现多核异步处理,可以将异步编程和多进程池结合起来使用。具体来说,主程序会把任务分成多个批次,每个批次由不同的进程来处理。每个进程内部,多个任务又是通过异步方式并行执行的。这样一来,计算密集型的任务可以通过多进程并行处理,而每个进程内部的I/O操作则可以通过asyncio来异步管理,从而大幅提高整体效率。示例代码如下,代码将1000个任务分布到10个子进程中并行执行,每个子进程执行100个模拟的爬虫任务:
import random
import asyncio #
import time
from concurrent.futures import ProcessPoolExecutor
# 模拟爬虫任务,执行时会有随机的延迟
async def fake_crawlers():
# 随机生成一个0.2到1.0秒之间的延迟,保留两位小数
io_delay = round(random.uniform(0.2, 1.0), 2)
await asyncio.sleep(io_delay)
result = 0
# 随机生成100,000到500,000之间的数字,用于模拟阻塞任务
# 这段代码耗时大约0.2秒到0.5秒之间
for i in range(random.randint(100000, 500000)):
result += i
return result
# 并发查询任务,通过起始和结束索引分配任务
async def query_concurrently(begin_idx: int, end_idx: int):
""" 启动并发任务,通过起始和结束序列号 """
tasks = []
# 根据给定的索引范围(从 begin_idx 到 end_idx),创建并发任务
for _ in range(begin_idx, end_idx, 1):
tasks.append(asyncio.create_task(fake_crawlers()))
# 等待所有任务完成,并返回每个任务的结果
results = await asyncio.gather(*tasks)
return results
# 批量任务执行函数,使用子进程池并行执行任务
def run_batch_tasks(batch_idx: int, step: int):
""" 在子进程中执行批量任务 """
# 计算当前批次任务的起始和结束索引
begin = batch_idx * step + 1 # 当前批次任务的起始索引
end = begin + step # 当前批次任务的结束索引
# 使用 asyncio.run() 启动异步任务并获取结果
results = [result for result in asyncio.run(query_concurrently(begin, end))]
return results
# 主函数,分批次将任务分配到子进程中执行
async def main():
""" 将任务分批次分配到子进程中执行 """
start = time.monotonic()
loop = asyncio.get_running_loop() # 获取当前运行的事件循环
# 创建进程池执行器,用于将任务分配到多个子进程中执行
with ProcessPoolExecutor() as executor:
# 启动多个批次任务,并行执行。每个批次执行 100个任务,共启动10个批次
tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 100)
for batch_idx in range(10)]
# 等待所有子进程任务完成,并将结果汇总
results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]
# 输出所有任务完成的时间
print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")
# 程序入口
if __name__ == "__main__":
asyncio.run(main())
代码运行结果如下:
所有任务已完成,耗时 1.83 秒
3.3 图片下载器
若经常需要从互联网下载文件,可以使用aiohttp库来实现任务的自动化。下面提供了一个简单的脚本,用于从指定URL下载文件:
建立本地图片服务器
为了提供图片下载链接,以下代码展示了如何使用FastAPI框架创建一个简单的Web应用程序,用于上传、管理和访问图片:
import os
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import uvicorn
app = FastAPI()
# 配置图片存储目录
UPLOAD_DIR = "./uploaded_images"
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
# 将图片目录挂载为静态文件目录
app.mount("/images", StaticFiles(directory=UPLOAD_DIR), name="images")
# 上传图片接口
@app.post("/upload/")
async def upload_image(file: UploadFile = File(...)):
try:
# 定义图片保存路径
file_path = os.path.join(UPLOAD_DIR, file.filename)
# 保存图片到本地
with open(file_path, "wb") as f:
f.write(file.file.read())
# 返回图片的访问 URL
image_url = f"http://127.0.0.1:8000/images/{file.filename}"
return {"image_url": image_url}
except Exception as e:
return {"error": str(e)}
# 获取所有上传图片的链接
@app.get("/images_list/")
async def list_images():
try:
# 获取目录下的所有文件
files = os.listdir(UPLOAD_DIR)
image_urls = [f"http://127.0.0.1:8000/images/{file}" for file in files if os.path.isfile(os.path.join(UPLOAD_DIR, file))]
return {"image_urls": image_urls}
except Exception as e:
return {"error": str(e)}
# 获取单个图片
@app.get("/image/{image_name}")
async def get_image(image_name: str):
try:
file_path = os.path.join(UPLOAD_DIR, image_name)
if os.path.exists(file_path):
return FileResponse(file_path)
else:
return {"error": "Image not found"}
except Exception as e:
return {"error": str(e)}
# 启动 FastAPI 服务器
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
该代码实现了一个图片上传和访问服务,包含以下三个主要接口:
- 服务器启动后,会监听本地地址
127.0.0.1
的8000端口。 - 客户端可以通过以下方式与服务器进行交互:
- 访问
http://127.0.0.1:8000/upload/
上传图片,并获取返回的图片 URL。 - 访问
http://127.0.0.1:8000/images_list/
查看所有已上传图片的 URL。 - 访问
http://127.0.0.1:8000/images/{image_name}
来查看特定图片。
- 访问
注意,所有上传和保存的图片都会保存在本地的
uploaded_images
文件夹中。
图片下载
以下代码利用了aiohttp、asyncio和aiofiles库,通过异步方式从API获取图片URL列表,并将图片下载到指定目录。借助这些库的结合,代码能够高效地处理HTTP请求、文件下载和文件操作,同时确保主程序的执行不被阻塞:
import aiohttp # 导入 aiohttp 库,用于异步 HTTP 请求
import asyncio # 导入 asyncio 库,用于管理异步任务
import aiofiles # 导入 aiofiles 库,用于异步文件操作
import os
# 获取图片 URL 列表的异步函数
async def get_image_urls(api_url):
try:
# 使用 aiohttp 启动一个异步 HTTP 会话
async with aiohttp.ClientSession() as session:
# 异步发送 GET 请求以获取 API 返回的数据
async with session.get(api_url) as response:
# 如果响应状态码是 200 (请求成功)
if response.status == 200:
# 将响应内容解析为 JSON 格式
data = await response.json()
# 从 JSON 数据中提取图片 URL 列表,若没有则返回空列表
return data.get("image_urls", [])
else:
# 如果请求失败,打印错误信息
print(f"从 {api_url} 获取图片列表失败。状态码: {response.status}")
return []
except Exception as e:
# 如果发生任何异常,打印错误信息
print(f"获取图片列表时发生错误: {e}")
return []
# 下载文件的异步函数
async def download_file(url, save_directory):
try:
# 使用 aiohttp 启动异步 HTTP 会话
async with aiohttp.ClientSession() as session:
# 异步发送 GET 请求以获取文件内容
async with session.get(url) as response:
# 如果响应状态码是 200 (请求成功)
if response.status == 200:
# 确保保存文件的目录存在,若不存在则创建
os.makedirs(save_directory, exist_ok=True)
# 从 URL 中提取文件名
filename = os.path.join(save_directory, url.split('/')[-1])
# 异步打开文件以进行写入操作
async with aiofiles.open(filename, 'wb') as file:
# 读取响应内容
content = await response.read()
# 将内容写入本地文件
await file.write(content)
print(f"已下载 {filename}")
else:
# 如果下载失败,打印错误信息
print(f"下载 {url} 失败。状态码: {response.status}")
except Exception as e:
# 如果发生任何异常,打印错误信息
print(f"下载 {url} 时发生错误: {e}")
# 根据获取的图片 URL 列表进行下载的异步函数
async def download_images(api_url, save_directory):
# 调用 get_image_urls 函数获取图片 URL 列表
image_urls = await get_image_urls(api_url)
# 如果没有获取到图片 URL,则打印提示并返回
if not image_urls:
print("没有找到需要下载的图片。")
return
# 为每个图片 URL 创建一个下载任务
tasks = [download_file(url, save_directory) for url in image_urls]
# 使用 asyncio.gather 并行执行所有下载任务
await asyncio.gather(*tasks)
# 启动事件循环,开始下载图片
if __name__ == "__main__":
# API 地址,提供图片 URL 列表
api_url = "http://127.0.0.1:8000/images_list/"
# 指定保存下载图片的目录
save_directory = "downloads"
# 获取事件循环并运行下载任务
loop = asyncio.get_event_loop()
loop.run_until_complete(download_images(api_url, save_directory))
3.4 生产者消费者模型
生产者-消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,旨在解决多个任务之间生产和消费的协调问题,从而确保资源得到合理利用并保证数据按顺序处理。该模型通过生产者和消费者两个角色,模拟共享资源的生产和消费过程。以下代码实现了一个基本的生产者-消费者模型,采用了asyncio进行异步任务处理:
import asyncio
from asyncio import Queue
from typing import List
# 生产者函数,负责将物品添加到队列
async def produce_items(queue: Queue, items: List[int], producer_name: str):
for item in items:
await queue.put(item) # 将物品放入队列
print(f"{producer_name} 添加物品:{item}")
await asyncio.sleep(0.5) # 模拟生产过程中的等待时间
print(f"{producer_name} 完成所有物品的生产")
# 消费者函数,负责从队列中取出并处理物品
async def consume_items(queue: Queue, consumer_name: str):
while True:
item = await queue.get() # 阻塞直到获取到一个物品
if item is None: # 使用None作为结束信号
queue.task_done() # 标记任务完成
break # 退出循环
print(f"{consumer_name} 处理物品:{item}")
await asyncio.sleep(1) # 模拟处理物品的时间
queue.task_done() # 标记任务完成
# 主函数,负责启动多个生产者和消费者任务
async def main():
queue = Queue() # 创建一个队列
items_to_produce = ['A','B','C','D'] # 需要生产的物品列表
# 创建产者任务(例如3个生产者)
producer_tasks = [
asyncio.create_task(produce_items(queue, items_to_produce, f"生产者_{i}"))
for i in range(3)
]
# 创建消费者任务(例如2个消费者)
consumer_tasks = [
asyncio.create_task(consume_items(queue, f"消费者_{i}"))
for i in range(2)
]
# 等待所有生产者任务完成
await asyncio.gather(*producer_tasks)
# 生产者完成后,发送 None 给消费者,通知它们退出
for _ in consumer_tasks:
await queue.put(None)
# 等待队列中的所有任务处理完成
await queue.join()
# 等待所有消费者任务完成
await asyncio.gather(*consumer_tasks)
if __name__ == '__main__':
# 运行主函数
asyncio.run(main())