1+1=10

记记笔记,放松一下...

C++协程小记-2

接前面 C++ 协程小记,继续学习C++中的协程。

coroutine-cpp20

协程(coroutine)是一种可以挂起自身并被调用者恢复的函数。与从头到尾顺序执行的普通函数/子程序(subroutine)不同,协程允许控制执行的挂起和恢复。这使我们能够编写看起来是同步的代码,但可以高效地处理异步操作而不阻塞调用线程。

例子

还是先从一个简短例子看起:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <coroutine>
#include <iostream>

struct Task {
    struct promise_type {
        Task get_return_object() {
            return std::coroutine_handle<promise_type>::from_promise(*this);
        }
        std::suspend_always initial_suspend() {
            return {};
        }
        std::suspend_always final_suspend() noexcept {
            return {};
        }
        void return_void() {}
        void unhandled_exception() {}
    };
    Task(std::coroutine_handle<promise_type> handle): _h{handle} {}

    void resume() {
        _h.resume();
    }
    void destroy() {
        _h.destroy();
    }

    std::coroutine_handle<promise_type> _h;
};

Task myFunc() {
    std::cout << "Start coro\n";
    co_await std::suspend_always{};
    std::cout << "Resume coro\n";
}

int main() {
    Task t = myFunc();
    std::cout << "Not start yet\n";
    t.resume();
    std::cout << "Suspend coro\n";
    t.resume();
    t.destroy();
    return 0;
}

结果:

1
2
3
4
Not start yet
Start coro
Suspend coro
Resume coro

在C++中,包含 co_awaitco_yieldco_return的函数就是协程函数。可是,为什么要手动定义一个返回值类型,并且要包含一个所谓的promise-type??

C++编译器看到co_xxx这几个关键词后,要做很多额外的事情,才能将这个这个函数变成协程,简单起见,可以理解成:

  • 借助 std::coroutine_traits,从返回值类型(Task)中找到所需要的promise-type类型
  • 创建promise-type对象,并将代码转成对该promies-type对象的调用
  • 通过promies-typeget_return_object()生成待返回对象并返回。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
auto *f = new frame<Task::promise_type>();
Task ret = f->promise.get_return_object();
invoke([&]{
    try {
        co_await f->promise.initial_suspend();
        {... co_* ...}
        co_await f->promise.final_suspend();
    } catch(...) {
        if (!initial-await-resume-called)
            throw ;
        f->promies.unhandled_exception();
    }
});
return ret;

注:initial-await-resume-called 前后,异常处理的方式不同。

promise_type 结构体很关键,它允许用户定义了协程的 Promise 接口:

  • initial_suspend(): 如何启动
  • final_suspend(): 如何结束
  • unhandled_exception(): 如何处理异常
  • get_return_object(): 创建外部可用对象

但,如果只是这样的话,肯定没有挂起和恢复的效果...

状态机

协程本身是一个状态机。编译器看到co_xxx这几个关键词后,要实现状态机的代码。

  • co_xxx定义挂起点。状态机需要保存当前状态
  • 状态保存在一个在堆中申请的所谓的协程帧(frame)中
  • 该协程帧句柄类型就是 std::coroutine_handle
  • 协程帧的生命周期可通过我们定义的协程函数返回值类型(Task)进行管理

借助 https://cppinsights.io/,我们可以将上面的例子转换成不带co_xxx的版本:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#include <coroutine>
#include <iostream>

struct Task
{
  struct promise_type
  {
    inline Task get_return_object()
    {
      return Task(std::coroutine_handle<promise_type>::from_promise(*this));
    }

    inline std::suspend_always initial_suspend()
    {
      return {};
    }

    inline std::suspend_always final_suspend() noexcept
    {
      return {};
    }

    inline void return_void()
    {
    }

    inline void unhandled_exception()
    {
    }

    // inline constexpr promise_type() noexcept = default;
  };

  inline Task(std::coroutine_handle<promise_type> handle)
  : _h{std::coroutine_handle<promise_type>{handle}}
  {
  }

  inline void resume()
  {
    this->_h.resume();
  }

  inline void destroy()
  {
    this->_h.destroy();
  }

  std::coroutine_handle<promise_type> _h;
};


struct __myFuncFrame
{
  void (*resume_fn)(__myFuncFrame *);
  void (*destroy_fn)(__myFuncFrame *);
  std::__coroutine_traits_impl<Task>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  std::suspend_always __suspend_30_6;
  std::suspend_always __suspend_32_14;
  std::suspend_always __suspend_30_6_1;
};

Task myFunc()
{
  /* Allocate the frame including the promise */
  /* Note: The actual parameter new is __builtin_coro_size */
  __myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};

  /* Forward declare the resume and destroy function. */
  void __myFuncResume(__myFuncFrame * __f);
  void __myFuncDestroy(__myFuncFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__myFuncResume;
  __f->destroy_fn = &__myFuncDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __myFuncResume(__f);


  return __f->__promise.get_return_object();
}

/* This function invoked by coroutine_handle<>::resume() */
void __myFuncResume(__myFuncFrame * __f)
{
  try 
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_myFunc_1;
      case 2: goto __resume_myFunc_2;
    }

    /* co_await insights.cpp:30 */
    __f->__suspend_30_6 = __f->__promise.initial_suspend();
    if(!__f->__suspend_30_6.await_ready()) {
      __f->__suspend_30_6.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 1;
      __f->__initial_await_suspend_called = true;
      return;
    } 

    __resume_myFunc_1:
    __f->__suspend_30_6.await_resume();
    std::operator<<(std::cout, "Start coro\n");

    /* co_await insights.cpp:32 */
    __f->__suspend_32_14 = std::suspend_always{};
    if(!__f->__suspend_32_14.await_ready()) {
      __f->__suspend_32_14.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 2;
      return;
    } 

    __resume_myFunc_2:
    __f->__suspend_32_14.await_resume();
    std::operator<<(std::cout, "Resume coro\n");
    goto __final_suspend;
  } catch(...) {
    if(!__f->__initial_await_suspend_called) {
      throw ;
    } 

    __f->__promise.unhandled_exception();
  }

  __final_suspend:

  /* co_await insights.cpp:30 */
  __f->__suspend_30_6_1 = __f->__promise.final_suspend();
  if(!__f->__suspend_30_6_1.await_ready()) {
    __f->__suspend_30_6_1.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
  } 

  ;
}

/* This function invoked by coroutine_handle<>::destroy() */
void __myFuncDestroy(__myFuncFrame * __f)
{
  /* destroy all variables with dtors */
  __f->~__myFuncFrame();
  /* Deallocating the coroutine frame */
  /* Note: The actual argument to delete is __builtin_coro_frame with the promise as parameter */
  operator delete(static_cast<void *>(__f));
}


int main()
{
  Task t = myFunc();
  std::operator<<(std::cout, "Not start yet\n");
  t.resume();
  std::operator<<(std::cout, "Suspend coro\n");
  t.resume();
  t.destroy();
  return 0;
}

可以看到,变换后的myFunc内部,其实都没有什么东西了:

  • 创建协程帧(__myFuncFrame) 并初始化。
  • 通过promise-type创建并返回 Task
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Task myFunc()
{
  /* Allocate the frame including the promise */
  /* Note: The actual parameter new is __builtin_coro_size */
  __myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};

  /* Forward declare the resume and destroy function. */
  void __myFuncResume(__myFuncFrame * __f);
  void __myFuncDestroy(__myFuncFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__myFuncResume;
  __f->destroy_fn = &__myFuncDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __myFuncResume(__f);


  return __f->__promise.get_return_object();
}

但是,协程帧中这几个成员(包括其位置都有讲究):

  • resume_fn:状态机的逻辑基本都在它里面
  • destroy_fn:状态机的销毁逻辑,不同状态下,保存不同的变量,需要分别考虑。
  • __promise:定义的promise对象
  • __suspend_index:挂起点位置,从哪儿恢复(resume)或销毁(destroy)
  • __initial_await_suspend_called:异常处理的逻辑不同
  • ...
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct __myFuncFrame
{
  void (*resume_fn)(__myFuncFrame *);
  void (*destroy_fn)(__myFuncFrame *);
  std::__coroutine_traits_impl<Task>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  std::suspend_always __suspend_30_6;
  std::suspend_always __suspend_32_14;
  std::suspend_always __suspend_30_6_1;
};

Task 与 Generator

协程函数的返回值可以分为这两种

  • Task:做一个job,不关心返回值
  • Generator:做一个job,并通过co_yieldco_return返回一个值。
关键词 动作 状态
co_yield output 挂起
co_return output 结束
co_await input 挂起

co_await 语句

1
value = co_await expr;

展开后伪代码

1
2
3
4
5
6
7
auto awaiter = operator co_await(expr);
if (!awaiter.await_ready()) {
    awaiter.await_suspend(coroutine_handle);
    <resume here when coroutine_handle.resume() is called>
}

value = awaiter.await_resume();

注意,对于Awaiter:

  • bool await_ready():继续或挂起,false为挂起
  • await_suspend(std::coroutine_handle<> h)?,有几种不同的返回值:
    • void await_suspend(std::coroutine_handle<> h):挂起
    • bool await_suspend(std::coroutine_handle<> h):挂起,如果返回值为false则resume
    • std::coroutine_handle<T> await_suspend(std::coroutine_handle<> h):挂起,返回的句柄立即被resume
  • await_resume()
    • T await_resume():恢复时,T是co_await返回值
    • void await_resume():不需要/不使用返回值

co_yield 语句

1
co_yield expr;

等价于:

1
co_await promise.yield_value(expr)

co_yieldco_await一样,可以有返回值。不过:

对于std::generator来说,yield_value() 返回的是std::suspend_always这个awaiter,该awaiter的await_resume()返回值是void。所以std::generator下的co_yield不能有返回值。

co_return 语句

co_return;co_return v;

分别对应 promise.return_void()promise.return_value(v),但是它们都不返回Awaitable,也就是不能用暂停协程。

参考