1+1=10

扬长避短 vs 取长补短

C++协程小记-2

接前面 C++ 协程小记,继续学习C++中的协程。

coroutine-cpp20

协程(coroutine)是一种可以挂起自身并被调用者恢复的函数。与从头到尾顺序执行的普通函数/子程序(subroutine)不同,协程允许控制执行的挂起和恢复。这使我们能够编写看起来是同步的代码,但可以高效地处理异步操作而不阻塞调用线程。

例子

还是先从一个简短例子看起:

#include <coroutine>
#include <iostream>

struct Task {
    struct promise_type {
        Task get_return_object() {
            return std::coroutine_handle<promise_type>::from_promise(*this);
        }
        std::suspend_always initial_suspend() {
            return {};
        }
        std::suspend_always final_suspend() noexcept {
            return {};
        }
        void return_void() {}
        void unhandled_exception() {}
    };
    Task(std::coroutine_handle<promise_type> handle): _h{handle} {}

    void resume() {
        _h.resume();
    }
    void destroy() {
        _h.destroy();
    }

    std::coroutine_handle<promise_type> _h;
};

Task myFunc() {
    std::cout << "Start coro\n";
    co_await std::suspend_always{};
    std::cout << "Resume coro\n";
}

int main() {
    Task t = myFunc();
    std::cout << "Not start yet\n";
    t.resume();
    std::cout << "Suspend coro\n";
    t.resume();
    t.destroy();
    return 0;
}

结果:

Not start yet
Start coro
Suspend coro
Resume coro

在C++中,包含 co_awaitco_yieldco_return的函数就是协程函数。可是,为什么要手动定义一个返回值类型,并且要包含一个所谓的promise-type??

C++编译器看到co_xxx这几个关键词后,要做很多额外的事情,才能将这个这个函数变成协程,简单起见,可以理解成:

  • 借助 std::coroutine_traits,从返回值类型(Task)中找到所需要的promise-type类型
  • 创建promise-type对象,并将代码转成对该promies-type对象的调用
  • 通过promies-typeget_return_object()生成待返回对象并返回。
auto *f = new frame<Task::promise_type>();
Task ret = f->promise.get_return_object();
invoke([&]{
    try {
        co_await f->promise.initial_suspend();
        {... co_* ...}
        co_await f->promise.final_suspend();
    } catch(...) {
        if (!initial-await-resume-called)
            throw ;
        f->promies.unhandled_exception();
    }
});
return ret;

注:initial-await-resume-called 前后,异常处理的方式不同。

promise_type 结构体很关键,它允许用户定义了协程的 Promise 接口:

  • initial_suspend(): 如何启动
  • final_suspend(): 如何结束
  • unhandled_exception(): 如何处理异常
  • get_return_object(): 创建外部可用对象

但,如果只是这样的话,肯定没有挂起和恢复的效果...

状态机

协程本身是一个状态机。编译器看到co_xxx这几个关键词后,要实现状态机的代码。

  • co_xxx定义挂起点。状态机需要保存当前状态
  • 状态保存在一个在堆中申请的所谓的协程帧(frame)中
  • 该协程帧句柄类型就是 std::coroutine_handle
  • 协程帧的生命周期可通过我们定义的协程函数返回值类型(Task)进行管理

借助 https://cppinsights.io/,我们可以将上面的例子转换成不带co_xxx的版本:

#include <coroutine>
#include <iostream>

struct Task
{
  struct promise_type
  {
    inline Task get_return_object()
    {
      return Task(std::coroutine_handle<promise_type>::from_promise(*this));
    }

    inline std::suspend_always initial_suspend()
    {
      return {};
    }

    inline std::suspend_always final_suspend() noexcept
    {
      return {};
    }

    inline void return_void()
    {
    }

    inline void unhandled_exception()
    {
    }

    // inline constexpr promise_type() noexcept = default;
  };

  inline Task(std::coroutine_handle<promise_type> handle)
  : _h{std::coroutine_handle<promise_type>{handle}}
  {
  }

  inline void resume()
  {
    this->_h.resume();
  }

  inline void destroy()
  {
    this->_h.destroy();
  }

  std::coroutine_handle<promise_type> _h;
};


struct __myFuncFrame
{
  void (*resume_fn)(__myFuncFrame *);
  void (*destroy_fn)(__myFuncFrame *);
  std::__coroutine_traits_impl<Task>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  std::suspend_always __suspend_30_6;
  std::suspend_always __suspend_32_14;
  std::suspend_always __suspend_30_6_1;
};

Task myFunc()
{
  /* Allocate the frame including the promise */
  /* Note: The actual parameter new is __builtin_coro_size */
  __myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};

  /* Forward declare the resume and destroy function. */
  void __myFuncResume(__myFuncFrame * __f);
  void __myFuncDestroy(__myFuncFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__myFuncResume;
  __f->destroy_fn = &__myFuncDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __myFuncResume(__f);


  return __f->__promise.get_return_object();
}

/* This function invoked by coroutine_handle<>::resume() */
void __myFuncResume(__myFuncFrame * __f)
{
  try 
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_myFunc_1;
      case 2: goto __resume_myFunc_2;
    }

    /* co_await insights.cpp:30 */
    __f->__suspend_30_6 = __f->__promise.initial_suspend();
    if(!__f->__suspend_30_6.await_ready()) {
      __f->__suspend_30_6.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 1;
      __f->__initial_await_suspend_called = true;
      return;
    } 

    __resume_myFunc_1:
    __f->__suspend_30_6.await_resume();
    std::operator<<(std::cout, "Start coro\n");

    /* co_await insights.cpp:32 */
    __f->__suspend_32_14 = std::suspend_always{};
    if(!__f->__suspend_32_14.await_ready()) {
      __f->__suspend_32_14.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 2;
      return;
    } 

    __resume_myFunc_2:
    __f->__suspend_32_14.await_resume();
    std::operator<<(std::cout, "Resume coro\n");
    goto __final_suspend;
  } catch(...) {
    if(!__f->__initial_await_suspend_called) {
      throw ;
    } 

    __f->__promise.unhandled_exception();
  }

  __final_suspend:

  /* co_await insights.cpp:30 */
  __f->__suspend_30_6_1 = __f->__promise.final_suspend();
  if(!__f->__suspend_30_6_1.await_ready()) {
    __f->__suspend_30_6_1.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
  } 

  ;
}

/* This function invoked by coroutine_handle<>::destroy() */
void __myFuncDestroy(__myFuncFrame * __f)
{
  /* destroy all variables with dtors */
  __f->~__myFuncFrame();
  /* Deallocating the coroutine frame */
  /* Note: The actual argument to delete is __builtin_coro_frame with the promise as parameter */
  operator delete(static_cast<void *>(__f));
}


int main()
{
  Task t = myFunc();
  std::operator<<(std::cout, "Not start yet\n");
  t.resume();
  std::operator<<(std::cout, "Suspend coro\n");
  t.resume();
  t.destroy();
  return 0;
}

可以看到,变换后的myFunc内部,其实都没有什么东西了:

  • 创建协程帧(__myFuncFrame) 并初始化。
  • 通过promise-type创建并返回 Task
Task myFunc()
{
  /* Allocate the frame including the promise */
  /* Note: The actual parameter new is __builtin_coro_size */
  __myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};

  /* Forward declare the resume and destroy function. */
  void __myFuncResume(__myFuncFrame * __f);
  void __myFuncDestroy(__myFuncFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__myFuncResume;
  __f->destroy_fn = &__myFuncDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __myFuncResume(__f);


  return __f->__promise.get_return_object();
}

但是,协程帧中这几个成员(包括其位置都有讲究):

  • resume_fn:状态机的逻辑基本都在它里面
  • destroy_fn:状态机的销毁逻辑,不同状态下,保存不同的变量,需要分别考虑。
  • __promise:定义的promise对象
  • __suspend_index:挂起点位置,从哪儿恢复(resume)或销毁(destroy)
  • __initial_await_suspend_called:异常处理的逻辑不同
  • ...
struct __myFuncFrame
{
  void (*resume_fn)(__myFuncFrame *);
  void (*destroy_fn)(__myFuncFrame *);
  std::__coroutine_traits_impl<Task>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  std::suspend_always __suspend_30_6;
  std::suspend_always __suspend_32_14;
  std::suspend_always __suspend_30_6_1;
};

Task 与 Generator

协程函数的返回值可以分为这两种

  • Task:做一个job,不关心返回值
  • Generator:做一个job,并通过co_yieldco_return返回一个值。
关键词 动作 状态
co_yield output 挂起
co_return output 结束
co_await input 挂起

co_await 语句

value = co_await expr;

展开后伪代码

auto awaiter = operator co_await(expr);
if (!awaiter.await_ready()) {
    awaiter.await_suspend(coroutine_handle);
    <resume here when coroutine_handle.resume() is called>
}

value = awaiter.await_resume();

注意,对于Awaiter:

  • bool await_ready():继续或挂起,false为挂起
  • await_suspend(std::coroutine_handle<> h)?,有几种不同的返回值:
    • void await_suspend(std::coroutine_handle<> h):挂起
    • bool await_suspend(std::coroutine_handle<> h):挂起,如果返回值为false则resume
    • std::coroutine_handle<T> await_suspend(std::coroutine_handle<> h):挂起,返回的句柄立即被resume
  • await_resume()
    • T await_resume():恢复时,T是co_await返回值
    • void await_resume():不需要/不使用返回值

co_yield 语句

co_yield expr;

等价于:

co_await promise.yield_value(expr)

co_yieldco_await一样,可以有返回值。不过:

对于std::generator来说,yield_value() 返回的是std::suspend_always这个awaiter,该awaiter的await_resume()返回值是void。所以std::generator下的co_yield不能有返回值。

co_return 语句

co_return;co_return v;

分别对应 promise.return_void()promise.return_value(v),但是它们都不返回Awaitable,也就是不能用暂停协程。

参考

Comments