Python Coroutine 池化实现

池化介绍

在当今计算机科学和软件工程的领域中,池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而,在 Python 的协程领域,我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢?

首先,Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同,Python Coroutine 是无栈的,无法跨核执行,从而限制了协程池发挥多核优势的可能性。

其次,Python Coroutine 的轻量级和快速创建销毁的特性,使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor。

然而,作为开发者,我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量,但在一些需要大量协程协同工作的应用中,池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时,协程池的优势就显而易见了。

关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor,这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展,我们不能排除未来可能性的存在。或许有一天,我们会看到 TaskGroup 引入一个 max_workers 的形参,以更好地支持对协程池的需求。

在实际开发中,我们也可以尝试编写自己的 CoroutinePoolExecutor,以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑,我们可以最大程度地发挥协程池的优势,提高系统的性能和响应速度。

在接下来的文章中,我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor,以及在实际项目中的应用场景。通过深入理解协程池的工作原理,我们或许能更好地利用这一技术,使我们的异步应用更为高效。

如何开始编写

如何开始编写 CoroutinePoolExecutor,首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点:

  • 它并不适用于 Mult Thread + Mult Event Loop 的场景,因此它并非线程安全的。
  • 应当保持和 ThreadPoolExecutor 相同的调用方式。
  • 不同于 Mult Thread 中子线程不依赖于主线程的运行,而在 Mult Coroutine 中子协程必须依赖于主协程,因此主协程在子协程没有全部运行完毕之前不能直接 done 掉。这也解释了为什么 TaskGroup 官方实现中没有提供类似于 shutdown 之类的方法,而是只提供上下文管理的运行方式。

有了上述 3 点的考量,我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于,作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制,另一方面站在巨人肩膀上的编程借鉴往往会事半功倍,对于自我的提升也是较为明显的。

在考虑这些因素的同时,我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确,我们能更好地把握 CoroutinePoolExecutor 的潜在优势,为异步应用的性能提升做出更有针对性的贡献。

具体代码实现

在这里我先贴出完整的代码实现,其中着重点已经用注释标明。

以下是 CoroutinePoolExecutor 的代码实现:

import os
import asyncio
import weakref
import logging
import itertools


async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
    try:
        while True:
            work_item = await work_queue.get()

            if work_item is not None:
                await work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    # Notify available coroutines
                    executor._idle_semaphore.release()
                del executor
                continue

            # Notifies the next coroutine task that it is time to exit
            await work_queue.put(None)
            break

    except Exception as exc:
        logging.critical('Exception in worker', exc_info=True)


class _WorkItem:
    def __init__(self, future, coro):
        self.future = future
        self.coro = coro

    async def run(self):
        try:
            result = await self.coro
        except Exception as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class CoroutinePoolExecutor:
    """
    Coroutine pool implemented based on ThreadPoolExecutor
    Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
    So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
    """

    # Used to assign unique thread names when coroutine_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers, coroutine_name_prefix=""):

        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = asyncio.Queue()
        self._idle_semaphore = asyncio.Semaphore(0)
        self._coroutines = set()
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._coroutine_name_prefix = (coroutine_name_prefix or (
            f"{__class__.__name__}-{self._counter()}"
        ))

    async def submit(self, coro):
        async with self._shutdown_lock:
            # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
            # This is because after shutdown, all sub-coroutines will end their work
            # one after another. Even if there are new coroutine tasks, they will not
            # be reactivated.
            if self._shutdown:
                raise RuntimeError('cannot schedule new coroutine task after shutdown')

            f = asyncio.Future()
            w = _WorkItem(
                f,
                coro
            )
            await self._work_queue.put(w)
            await self._adjust_coroutine_count()
            return f

    async def _adjust_coroutine_count(self):

        try:
            # 2 functions:
            # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
            # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
            # Since the Semaphore provided by asyncio does not have a timeout
            # parameter, you can choose to use it with wait_for.
            if await asyncio.wait_for(
                    self._idle_semaphore.acquire(),
                    0
            ):
                return
        except TimeoutError:
            pass

        num_coroutines = len(self._coroutines)
        if num_coroutines < self._max_workers:
            coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
            t = asyncio.create_task(
                coro=_worker(
                    weakref.ref(self),
                    self._work_queue
                ),
                name=coroutine_name
            )

            self._coroutines.add(t)

    async def shutdown(self, wait=True, *, cancel_futures=False):
        async with self._shutdown_lock:
            self._shutdown = True

            if cancel_futures:
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # None is an exit signal, given by the shutdown method, when the shutdown method is called
            # will notify the sub-coroutine to stop working and exit the loop
            await self._work_queue.put(None)

        if wait:
            for t in self._coroutines:
                await t

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.shutdown(wait=True)
        return False

以下是 CoroutinePoolExecutor 的使用方式:

import asyncio

from coroutinepoolexecutor import CoroutinePoolExecutor


async def task(i):
    await asyncio.sleep(1)
    print(f"task-{i}")


async def main():
    async with CoroutinePoolExecutor(2) as executor:
        for i in range(10):
            await executor.submit(task(i))

if __name__ == "__main__":
    asyncio.run(main())

我们知道,在线程池中,工作线程一旦创建会不断的领取新的任务并执行,除开 shutdown() 调用,否则对于静态的线程池来讲工作线程不会自己结束。

在上述协程池代码实现中,CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等。

而 _worker 函数是工作协程的运行函数,其会在工作协程启动后,不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task。

剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器,其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果。

对于异步循环的思考

在此 CoroutinePoolExecutor 实现后,我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层,它有感的暴露了出来。

具体体现在当 Python Event Loop 启动后,如果 main coroutine 停止运行,那么所有的 subtask coroutine 也会停止运行,尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措,说的更具体点就是不知道在什么时候调用。

对于这些问题,我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展,如在事件循环关闭之前添加 hook function,甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行。

无论如何,只要心里有想法,就可以去将它实现 .. 学习本身就是一个不断挑战的过程。

标签: none

添加新评论