接前面 C++ 协程小记,继续学习C++中的协程。
协程(coroutine)是一种可以挂起自身并被调用者恢复的函数。与从头到尾顺序执行的普通函数/子程序(subroutine)不同,协程允许控制执行的挂起和恢复。这使我们能够编写看起来是同步的代码,但可以高效地处理异步操作而不阻塞调用线程。
例子
还是先从一个简短例子看起:
#include <coroutine>
#include <iostream>
struct Task {
struct promise_type {
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
std::suspend_always initial_suspend() {
return {};
}
std::suspend_always final_suspend() noexcept {
return {};
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle): _h{handle} {}
void resume() {
_h.resume();
}
void destroy() {
_h.destroy();
}
std::coroutine_handle<promise_type> _h;
};
Task myFunc() {
std::cout << "Start coro\n";
co_await std::suspend_always{};
std::cout << "Resume coro\n";
}
int main() {
Task t = myFunc();
std::cout << "Not start yet\n";
t.resume();
std::cout << "Suspend coro\n";
t.resume();
t.destroy();
return 0;
}
结果:
Not start yet
Start coro
Suspend coro
Resume coro
在C++中,包含 co_await
、co_yield
或 co_return
的函数就是协程函数。可是,为什么要手动定义一个返回值类型,并且要包含一个所谓的promise-type
??
C++编译器看到co_xxx
这几个关键词后,要做很多额外的事情,才能将这个这个函数变成协程,简单起见,可以理解成:
- 借助
std::coroutine_traits
,从返回值类型(Task)中找到所需要的promise-type
类型 - 创建
promise-type
对象,并将代码转成对该promies-type
对象的调用 - 通过
promies-type
的get_return_object()
生成待返回对象并返回。
auto *f = new frame<Task::promise_type>();
Task ret = f->promise.get_return_object();
invoke([&]{
try {
co_await f->promise.initial_suspend();
{... co_* ...}
co_await f->promise.final_suspend();
} catch(...) {
if (!initial-await-resume-called)
throw ;
f->promies.unhandled_exception();
}
});
return ret;
注:initial-await-resume-called 前后,异常处理的方式不同。
promise_type
结构体很关键,它允许用户定义了协程的 Promise 接口:
initial_suspend()
: 如何启动final_suspend()
: 如何结束unhandled_exception()
: 如何处理异常get_return_object()
: 创建外部可用对象
但,如果只是这样的话,肯定没有挂起和恢复的效果...
状态机
协程本身是一个状态机。编译器看到co_xxx
这几个关键词后,要实现状态机的代码。
co_xxx
定义挂起点。状态机需要保存当前状态- 状态保存在一个在堆中申请的所谓的协程帧(frame)中
- 该协程帧句柄类型就是
std::coroutine_handle
- 协程帧的生命周期可通过我们定义的协程函数返回值类型(Task)进行管理
借助 https://cppinsights.io/,我们可以将上面的例子转换成不带co_xxx
的版本:
#include <coroutine>
#include <iostream>
struct Task
{
struct promise_type
{
inline Task get_return_object()
{
return Task(std::coroutine_handle<promise_type>::from_promise(*this));
}
inline std::suspend_always initial_suspend()
{
return {};
}
inline std::suspend_always final_suspend() noexcept
{
return {};
}
inline void return_void()
{
}
inline void unhandled_exception()
{
}
// inline constexpr promise_type() noexcept = default;
};
inline Task(std::coroutine_handle<promise_type> handle)
: _h{std::coroutine_handle<promise_type>{handle}}
{
}
inline void resume()
{
this->_h.resume();
}
inline void destroy()
{
this->_h.destroy();
}
std::coroutine_handle<promise_type> _h;
};
struct __myFuncFrame
{
void (*resume_fn)(__myFuncFrame *);
void (*destroy_fn)(__myFuncFrame *);
std::__coroutine_traits_impl<Task>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
std::suspend_always __suspend_30_6;
std::suspend_always __suspend_32_14;
std::suspend_always __suspend_30_6_1;
};
Task myFunc()
{
/* Allocate the frame including the promise */
/* Note: The actual parameter new is __builtin_coro_size */
__myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
__f->__suspend_index = 0;
__f->__initial_await_suspend_called = false;
/* Construct the promise. */
new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};
/* Forward declare the resume and destroy function. */
void __myFuncResume(__myFuncFrame * __f);
void __myFuncDestroy(__myFuncFrame * __f);
/* Assign the resume and destroy function pointers. */
__f->resume_fn = &__myFuncResume;
__f->destroy_fn = &__myFuncDestroy;
/* Call the made up function with the coroutine body for initial suspend.
This function will be called subsequently by coroutine_handle<>::resume()
which calls __builtin_coro_resume(__handle_) */
__myFuncResume(__f);
return __f->__promise.get_return_object();
}
/* This function invoked by coroutine_handle<>::resume() */
void __myFuncResume(__myFuncFrame * __f)
{
try
{
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_myFunc_1;
case 2: goto __resume_myFunc_2;
}
/* co_await insights.cpp:30 */
__f->__suspend_30_6 = __f->__promise.initial_suspend();
if(!__f->__suspend_30_6.await_ready()) {
__f->__suspend_30_6.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
__resume_myFunc_1:
__f->__suspend_30_6.await_resume();
std::operator<<(std::cout, "Start coro\n");
/* co_await insights.cpp:32 */
__f->__suspend_32_14 = std::suspend_always{};
if(!__f->__suspend_32_14.await_ready()) {
__f->__suspend_32_14.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 2;
return;
}
__resume_myFunc_2:
__f->__suspend_32_14.await_resume();
std::operator<<(std::cout, "Resume coro\n");
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
__final_suspend:
/* co_await insights.cpp:30 */
__f->__suspend_30_6_1 = __f->__promise.final_suspend();
if(!__f->__suspend_30_6_1.await_ready()) {
__f->__suspend_30_6_1.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
}
;
}
/* This function invoked by coroutine_handle<>::destroy() */
void __myFuncDestroy(__myFuncFrame * __f)
{
/* destroy all variables with dtors */
__f->~__myFuncFrame();
/* Deallocating the coroutine frame */
/* Note: The actual argument to delete is __builtin_coro_frame with the promise as parameter */
operator delete(static_cast<void *>(__f));
}
int main()
{
Task t = myFunc();
std::operator<<(std::cout, "Not start yet\n");
t.resume();
std::operator<<(std::cout, "Suspend coro\n");
t.resume();
t.destroy();
return 0;
}
可以看到,变换后的myFunc内部,其实都没有什么东西了:
- 创建协程帧(
__myFuncFrame
) 并初始化。 - 通过
promise-type
创建并返回 Task
Task myFunc()
{
/* Allocate the frame including the promise */
/* Note: The actual parameter new is __builtin_coro_size */
__myFuncFrame * __f = reinterpret_cast<__myFuncFrame *>(operator new(sizeof(__myFuncFrame)));
__f->__suspend_index = 0;
__f->__initial_await_suspend_called = false;
/* Construct the promise. */
new (&__f->__promise)std::__coroutine_traits_impl<Task>::promise_type{};
/* Forward declare the resume and destroy function. */
void __myFuncResume(__myFuncFrame * __f);
void __myFuncDestroy(__myFuncFrame * __f);
/* Assign the resume and destroy function pointers. */
__f->resume_fn = &__myFuncResume;
__f->destroy_fn = &__myFuncDestroy;
/* Call the made up function with the coroutine body for initial suspend.
This function will be called subsequently by coroutine_handle<>::resume()
which calls __builtin_coro_resume(__handle_) */
__myFuncResume(__f);
return __f->__promise.get_return_object();
}
但是,协程帧中这几个成员(包括其位置都有讲究):
resume_fn
:状态机的逻辑基本都在它里面destroy_fn
:状态机的销毁逻辑,不同状态下,保存不同的变量,需要分别考虑。__promise
:定义的promise对象__suspend_index
:挂起点位置,从哪儿恢复(resume)或销毁(destroy)__initial_await_suspend_called
:异常处理的逻辑不同- ...
struct __myFuncFrame
{
void (*resume_fn)(__myFuncFrame *);
void (*destroy_fn)(__myFuncFrame *);
std::__coroutine_traits_impl<Task>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
std::suspend_always __suspend_30_6;
std::suspend_always __suspend_32_14;
std::suspend_always __suspend_30_6_1;
};
Task 与 Generator
协程函数的返回值可以分为这两种
- Task:做一个job,不关心返回值
- Generator:做一个job,并通过
co_yield
或co_return
返回一个值。
关键词 | 动作 | 状态 |
---|---|---|
co_yield |
output | 挂起 |
co_return |
output | 结束 |
co_await |
input | 挂起 |
co_await 语句
value = co_await expr;
展开后伪代码
auto awaiter = operator co_await(expr);
if (!awaiter.await_ready()) {
awaiter.await_suspend(coroutine_handle);
<resume here when coroutine_handle.resume() is called>
}
value = awaiter.await_resume();
注意,对于Awaiter:
bool await_ready()
:继续或挂起,false为挂起await_suspend(std::coroutine_handle<> h)?
,有几种不同的返回值:void await_suspend(std::coroutine_handle<> h)
:挂起bool await_suspend(std::coroutine_handle<> h)
:挂起,如果返回值为false则resumestd::coroutine_handle<T> await_suspend(std::coroutine_handle<> h)
:挂起,返回的句柄立即被resume
await_resume()
:T await_resume()
:恢复时,T是co_await
返回值void await_resume()
:不需要/不使用返回值
co_yield 语句
co_yield expr;
等价于:
co_await promise.yield_value(expr)
co_yield
和co_await
一样,可以有返回值。不过:
对于std::generator来说,yield_value()
返回的是std::suspend_always
这个awaiter,该awaiter的await_resume()
返回值是void。所以std::generator下的co_yield
不能有返回值。
co_return 语句
co_return;
和 co_return v;
分别对应 promise.return_void()
与promise.return_value(v)
,但是它们都不返回Awaitable,也就是不能用暂停协程。
参考
- https://lewissbaker.github.io/2022/08/27/understanding-the-compiler-transform
- https://itnext.io/c-20-coroutines-complete-guide-7c3fc08db89d
- Understanding C++ coroutines by example
- https://cppinsights.io/
- https://www.modernescpp.com/index.php/a-concise-introduction-to-coroutines-by-dian-lun-li/
- 万字好文:从无栈协程到C++异步框架!