1+1=10

扬长避短 vs 取长补短

Python 协程(Coroutine)小记(一)

接前面 Python装饰器Python类型提示与注解Python生成器与yield,继续整理Python基础知识。

对于(原生)协程,Python(3.5以及更新版本)给出的定义如下:

Coroutines are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They can be implemented with the async defstatement. See also PEP 492.

翻译过来就是,协程(conroutine)是比子程序(subroutine)更通用形式: 常规子程序在某一点进入并在另一点退出;而协程可以在许多不同的点进入、退出和恢复。 协程可以通过async def语句来实现。

我们先捋一捋和这个协程相关的概念,排除干扰,而后再回到 async def定义的协程。

捋一捋 ?

Python在协程上没少折腾,而且和生成器有些不清不楚。

例子

简单起见,还是先看代码:

import asyncio
import types

def func1():
    print('function')

def func2():
    print('generator')
    yield

async def func3():
    print('coroutine')
    await asyncio.sleep(1)

async def func4():
    print('async generator')
    yield

@types.coroutine
def func5():
    print('generator-based coroutine')
    yield

5个函数分别是:

  • func1:普通的函数
  • func2:生成器(函数)。返回值是一个迭代器(也叫生成器迭代器,或生成器),是 迭代器 + (经典)协程的合体
  • func3:(原生)协程。async def定义,且里面不能有yield。有没有await无所谓
  • func4:异步生成器。async def定义,且必须有yield
  • func5:生成器协程

它们调用方式各异:

async def main():
    func1()
    f2 = func2(); next(f2)
    await func3()
    f4 = func4(); await anext(f4)
    await func5()

asyncio.run(main())

结果如下:

function
generator
coroutine
async generator
generator-based coroutine

将上面函数分为五类,不是拍脑袋说的。这些函数的co_flags不相同,你可以直接输出:

print(func1.__code__.co_flags) #3
print(func2.__code__.co_flags) #35
print(func3.__code__.co_flags) #131
print(func4.__code__.co_flags) #515
print(func5.__code__.co_flags) #291

对照inspect查看可以发现秘密:

  • inspect.CO_GENERATOR:32
  • inspect.CO_COROUTINE:128
  • inspect.CO_ITERABLE_COROUTINE:256
  • inspect.CO_ASYNC_GENERATOR:512
  • inspect.CO_OPTIMIZED:1
  • inspect.CO_NEWLOCALS:2

协程1?协程2? 协程3?

上面一个简短的例子,直接出来三个协程:

  • 经典协程:就是生成器,个人认为是迭代器+(经典)协程的合体。不能被await驱动。
  • 原生协程:就是一般意义协程,使用 async def定义。被await驱动。
  • 生成器协程:使用装饰器types.coroutine 对生成器函数的进行装饰。可被await驱动

另外,

  • 异步生成器:使用async def定义,也和原生协程也密切相关。

要理清它们的关系,或许,我应该放上这张图:

generator-asyncgenerator-coroutine-classes

实际上,__iter__()__await__()返回值都是迭代器(python库中甚至某些类定义中,为了兼容,还有__iter__ = __await__的语句,比如asyncio.Future)。生成器和协程本质上,就是后者去掉了用于实现迭代的__next__()成员。所以我觉得:

协程Coroutine = 生成器Generator - 迭代器Iterator

或者

生成器Generator = 迭代器Iterator + 协程Coroutine

注:本文关注点是原生协程,其他内容只是简单带过。

经典协程-生成器

这个协程,是一种生成器函数,调用者可以通过send()发送数据,通过yield from可以委托其他子协程。但它不能被await驱动。为了和Python3.5 引入的原生协程区分,这个协程在《流畅的Python》一书中称为经典协程。

相关PEPs:

Python 2.2 引入生成器(函数)以及yield语句。生成器函数用于生成 迭代器,迭代器有__iter____next__成员。(Python3.5文档中称其为:生成器迭代器,代码中称其为生成器)。

Python 2.5 对生成器进行了增强,引入了send()throw()close()等成员。 这个操作使其变成了 迭代器 + 协程 的合体。见PEP 342。

Python 3.3 对这个 迭代器 + 协程 的合体,继续进行增强,引入yield from语法,使其可以可以其他子协程。

def firstN(maximum):
    i = 0
    while i < maximum:
        val = (yield i)
        if val is not None:
            i = val
        else:
            i += 1

gen = firstN(3)
print(next(gen))
print(gen.send(0))
print(gen.send(0))
print(next(gen))

原生协程-协程

Python 3.5 引入了使用async def定义的协程函数,内部可以使用await委托子协程,有些类似于yield from。

Python手册中直接用coroutine称呼这个协程。但为与经典协程区分,这个协程在PEP492中被称为原生协程(native coroutine)。

基于生成器的协程

Python 3.5 引入了 types.coroutine 和 asyncio.coroutine 两个装饰生成器函数的装饰器。用于将生成器与await关键词兼容。

Decorator to mark generator-based coroutines. This enables the generator use yield from to call async def coroutines, and also enables the generator to be called by async def coroutines, for instance using an awaitexpression.

注意:asyncio.coroutine这个东西在Python 3.8 中已经废弃。Python 3.11 中已经将其移除!

@asyncio.coroutine
def old_style_coroutine():
    print('Hello')
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

types.coroutine目前还在(它和asyncio.coroutine有一定差异,也无需纠结了):

import asyncio
import types

@types.coroutine
def old_style_coroutine():
    print('Hello')
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

asyncio.run(main())

尽管如此,它不被认可为 coroutine function:

import asyncio
import types

@types.coroutine
def old_style_coroutine():
    print('Hello')
    yield from asyncio.sleep(1)

assert asyncio.iscoroutinefunction(old_style_coroutine) == False

协程 Coroutine

回归正题,看看Python中的(原生)协程。

Python 3.5 引入了使用async def定义的协程函数,await 以及async withasync for表达式。

协程函数(coroutine function)与协程(coroutine)

与生成器(函数)和迭代器 的关系一样:生成器(函数)是用于产生迭代器的函数。使用async def定义的是协程函数,它用于生成协程对象。

async def func3():
    print('coroutine')
    await asyncio.sleep(1)

print(type(func3))
print(asyncio.iscoroutinefunction(func3))
print(asyncio.iscoroutine(func3))

这个func3只是一个函数(协程函数),它自身并不是协程:

<class 'function'>
True
False

它的返回值是协程Coroutine类型。

协程Coroutine与Awaitable

不同于Generator从Iterator派生,Coroutine 是 Awaitable 派生类。

协程对象是awaitable对象。协程执行时,调用__await__()获取一个迭代器,而后遍历该迭代器。

除了协程之外,asyncio中下面两个也都是awaitable对象(但没有从Awaitable派生,鸭子行为):

  • asyncio.Task
  • asyncio.Future

python-asyncio-awaitable-classes

注意:

  • Coroutine:只能被await一次
  • Future:可以被多次await

代码

Awaitable 和 Coroutine的定义在_collections_abc.py中,可以顺便看一眼:

  • Awaitable:派生类需要实现__await__()
  • Coroutine:派生类需要实现__await__()send()throw()
class Awaitable(metaclass=ABCMeta):

    __slots__ = ()

    @abstractmethod
    def __await__(self):
        yield

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Awaitable:
            return _check_methods(C, "__await__")
        return NotImplemented

    __class_getitem__ = classmethod(GenericAlias)


class Coroutine(Awaitable):

    __slots__ = ()

    @abstractmethod
    def send(self, value):
        """Send a value into the coroutine.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the coroutine.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside coroutine.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("coroutine ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented


Coroutine.register(coroutine)

asyncio

async 和 await 是异步编程的API,本身不依赖于asyncio,也可以用于其他的异步框架。

asyncio 先使用了yield from机制,而后才适配到 async和 await 机制。所以资料有些乱。

  • Python3.3 引入yield from
  • Python 3.4 引入 asyncio 模块
  • Python 3.5 引入 asyncawait
  • Python 3.6 引入异步生成器和异步推导式
  • Python 3.7 将 asyncawait确定为关键字

asyncio 提供了高层(high-level)API:用于执行协程、网络IO、IPC、子进程控制,任务队列等。asyncio 的底层API:提供了事件循环的管理,网络与子进程异步API,系统signal处理,Protocols机制、回调函数与async的桥梁 等。

asyncio东西有点多,只看几个最简单的例子,意思一下

例子1

比如,要执行一个协程,在Python3.7下:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... 1+1=10!')

asyncio.run(main())

在Python 3.11下,使用Runner的上下文管理器:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... 1+1=10!')

with asyncio.Runner() as runner:
    runner.run(main())

使用底层API(获取事件循环对象),get_event_loop()从Python3.12起被废弃:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... 1+1=10!')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
  • 获取事件循环
  • 运行事件循环
  • 关闭事件循环

例子2

执行4个任务,每个任务都分别打印0~10数字,可以使用asyncio.gather()

import asyncio
async def counter(name: str):
    for i in range(10):
        print(f"{name}: {i}")
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(
        counter("task1"),
        counter("task2"),
        counter("task3"),
        counter("task4"),
    )

asyncio.run(main())
  • 使用 async with配合TaskGroup也可以(不用显式使用 await)
async def main():
    async with asyncio.TaskGroup() as tg:
        for n in range(1,5):
            tg.create_task(counter(f"task{n}"))
  • asyncio.create_task将协程 封装为一个Task并调度其执行,并返回 Task 对象。

只使用create_task,还可以这么写:

async def main():
    tasks = [asyncio.create_task(counter(f"task{n}")) for n in range(1,5)]

    while True:
        tasks = [t for t in tasks if not t.done()]
        if tasks:
            await tasks[0]

例子3

操作时间不可控,超时怎么控制?

  • wait_for()可以指定超时时间
import asyncio

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('....!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())
  • 使用async with配合asyncio.timeout()
async def main():
    # Wait for at most 1 second
    try:
        async with asyncio.timeout(1.0):
            await eternity()
    except asyncio.TimeoutError:
        print('timeout!')

例子4

有阻塞操作,需要次线程,怎么处理?

  • 使用 asyncio.to_thread()
import asyncio
import time

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    time.sleep(5)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            tg.create_task(asyncio.to_thread(blocking_io))

asyncio.run(main())

参考

Comments