Coroutine internals
上次研究了Fiber,这次结合C++ Coroutines TS(N4680)和folly::coro::Baton,看看Coroutine~
Coroutines TS
- 三个新关键字:co_await,co_yield,co_return
- 一些新类型:
    - coroutine_handle
- coroutine_traits
- suspend_always
- suspend_never
 
- 协程的基础机制:基础库的开发者可以通过这个机制与协程交互并自定义其行为
- 基于协程的编写异步代码的方式
然而,Coroutines TS 提供的更像是协程的底层工具,这些工具很难被直接使用。相反,基础库的编写者可以基于这些底层工具,提供更加简单易用的高级抽象,比如 cppcoro 或者 folly::coro。
比如,Coroutines TS 实际上并没有定义协程的语义:
- 它没有定义如何生成返回给调用者的值。
- 它没有定义如何处理传递给co_return语句的返回值,或者如何处理从协程传播出去的异常。
- 它没有定义应该在哪个线程上恢复协程。
相反,它为基础库提供了一种通用机制,基础库通过实现符合特定接口的类型来定制化协程的行为。因此我们可以拓展出许多不同类型的协程,分别用于各种不同的场合。例如,你可以定义一个异步生成单个值的协程,或者一个 lazily 生成一系列值的协程。
Coroutines TS 定义了两种接口:Promise和Awaitable。
Promise接口可以自定义协程本身的行为。基础库编写者能够自定义:
- 调用协程的行为
- 协程返回时的行为(无论是通过正常方式还是通过未处理的异常)
- 协程中任何co_await或co_yield表达式的行为。
Awaitable接口则指定了co_await一个表达式时的语义。当我们co_await一个表达式时,代码将被转换为对Awaitable对象上的一系列方法的调用,这些方法允许它指定:
- 是否暂停当前协程
- 在挂起后执行某些逻辑以安排之后协程恢复
- 在协程恢复后执行某些逻辑以产生co_await表达式的结果
这一篇我们主要先关注Awaitable接口。
Suspend a coroutine
Coroutines TS引入了一个新的单目运算符co_await。co_await只能在协程使用,这句话有点像废话。因为根据定义,任何包含co_await运算符的函数体都将被编译为协程。co_await <expression>时会涉及两个类型:Awaitable 和 Awaiter。
- 支持co_await运算符的类型称为Awaitable,有一些介绍中也称为 Awaitable concept。
- Awaiter则是任何实现了以下三个特殊接口的类型,在- co_await <expr>时会调用这几个特殊接口:- await_ready
- await_suspend
- await_resume
 
我们看下编译器是如何处理co_await,并最终将协程挂起的:
- 首先编译器会获取Awaitable对象,这里不展开过多介绍,详见Coroutines TS 5.3.8(3)。
template<typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
  if constexpr (has_any_await_transform_member_v<P>)
    return promise.await_transform(static_cast<T&&>(expr));
  else
    return static_cast<T&&>(expr);
}
- 根据Awaitable对象,获取Awaiter对象。
template<typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
  if constexpr (has_member_operator_co_await_v<Awaitable>)
    return static_cast<Awaitable&&>(awaitable).operator co_await();
  else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
    return operator co_await(static_cast<Awaitable&&>(awaitable));
  else
    return static_cast<Awaitable&&>(awaitable);
}
- 等待Awaiter。这一步是co_await的核心流程,其中会调用Awaiter实现的三个特殊方法。
整个co_await <expr>会被转换为类似下面的代码:
{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::coroutine_handle<P>;
    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(p)));
    <suspend-coroutine>
    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(p));
      <return-to-caller-or-resumer>
    }
    else
    {
      static_assert(
         std::is_same_v<await_suspend_result_t, bool>,
         "await_suspend() must return 'void' or 'bool'.");
      if (awaiter.await_suspend(handle_t::from_promise(p)))
      {
        <return-to-caller-or-resumer>
      }
    }
    <resume-point>
  }
  return awaiter.await_resume();
}
首先是通过await_ready判断Awaiter是否已经完成异步操作,如果已经完成则不需要再将协程挂起、恢复。
如果没有完成,此时就会进到<suspend-coroutine>,编译器会生成一些代码来保存协程的当前状态并准备恢复。会将<resume-point>的位置、协程的形参、以及当前寄存器中的值保存到协程帧中(即coroutine frame,一般是经过动态分配到堆上)。此时协程就已经处于挂起状态了。
在返回到调用方或者恢复方之前,编译器生成的代码还会调用await_suspend,这个函数是第一个可以观测到协程被挂起的地方。await_suspend负责在操作完成后的某个时间点安排协程恢复或销毁。如果await_suspend返回false,则会将协程在当前线程上立即恢复。
协程一旦挂起之后,协程就可以通过
coroutine_handle进行恢复或者销毁(具体方式后面coroutine_handle的部分会介绍)。
await_suspend有两种返回值类型:当对await_suspend调用返回时,await_suspend返回void的版本会无条件地将执行返还给协程的调用方,而返回bool的版本允许Awaiter对象有条件地立即恢复协程,而无需返回给调用方或者恢复方。比如co_await的这个异步操作有时候可以同步完成时,如果在co_await时已经完成了,那么await_suspend就可以返回false,进而使协程立即恢复并继续执行。
如果的确需要等待,那么就会进入到<return-to-caller-or-resumer>。此时会将执行返还给协程的调用方,具体返还的形式就是将协程的栈帧pop,并恢复调用方的栈帧,注意此时coroutine frame是仍然存在的。
当被挂起的协程被恢复时,会被恢复到<resume-point>处,之后就会调用await_resume去获取co_await的结果。await_resume返回值将成为co_await <expr>的结果。await_resume方法也可能抛出异常,在这种情况下,异常会从co_await表达式中传播出去。另外,如果异常从 await_suspend中抛出异常,则协程将自动恢复,异常也会从co_await 表达式中传播出去,但不会调用await_resume。
上面的流程就完成了协程的挂起,那么我们如何恢复这个协程呢?
Resume a coroutine
细心的你在上面流程中可能也注意到,在await_suspend时需要传入一个coroutine_handle,它主要作用就是用来恢复或者销毁协程。coroutine_handle内部持有一个coroutine frame的指针(注意它不是coroutine frame的所有者)。
coroutine_handle的主要接口如下:
template <>
struct coroutine_handle<void> {
    coroutine_handle() noexcept = default;
    coroutine_handle(nullptr_t) noexcept;
    coroutine_handle& operator=(nullptr_t) noexcept;
    
    explicit operator bool() const noexcept;
    static coroutine_handle from_address(void* a) noexcept;
    void* to_address() const noexcept;
    void operator()() const;
    void resume() const;
    void destroy();
    bool done() const;
};
对于不需要返回值的协程,主要提供了几类接口:
- 构造
- operator bool()用于检查- coroutine_handle是否关联了一个协程
- from_address和- to_address用于将- coroutine_handle和一个函数指针之间进行转换,主要是为了兼容C-style API。
- operator()和- resume用于恢复一个协程继续执行。调用这个接口之后,协程会继续执行,直到协程再次挂起并运行到- <return-to-caller-or-resumer>。
- destroy用于调用方在协程任何一次挂起之后,可以直接销毁协程。内部实际是销毁协程对应的coroutine frame。
- done用于检查协程是否执行完
而对于需要返回值的协程,额外提供了两个接口:
template <typename Promise>
struct coroutine_handle : coroutine_handle<void> {
    Promise& promise() const noexcept;
    static coroutine_handle from_promise(Promise&) noexcept;
};
事实上,这其中有一部分接口,是一般的开发人员不会调用也不不应该调用的:
- destroy
- promise
- from_promise
这些接口一般都是基础库的编写者才需要用到的接口,我们在绝大多数情况下,应该把这些接口视为协程的内部实现。
folly::coro::Baton
下面我们就看看一个folly::coro协程库是怎么利用C++ Coroutines TS提供的底层工具,构建一个基于协程的Baton。关于Baton的用法直接参照下面的注释:
/// A baton is a synchronisation primitive for coroutines that allows a
/// coroutine to co_await the baton and suspend until the baton is posted by
/// some thread via a call to .post().
///
/// This primitive is typically used in the construction of larger library types
/// rather than directly in user code.
///
/// As a primitive, this is not cancellation-aware.
///
/// The Baton supports being awaited by multiple coroutines at a time. If the
/// baton is not ready at the time it is awaited then an awaiting coroutine
/// suspends. All suspended coroutines waiting for the baton to be posted will
/// be resumed when some thread next calls .post().
///
/// Example usage:
///
///   folly::coro::Baton baton;
///   std::string sharedValue;
///
///   folly::coro::Task<void> consumer()
///   {
///     // Wait until the baton is posted.
///     co_await baton;
///
///     // Now safe to read shared state.
///     std::cout << sharedValue << std::cout;
///   }
///
///   void producer()
///   {
///     // Write to shared state
///     sharedValue = "some result";
///
///     // Publish the value by 'posting' the baton.
///     // This will resume the consumer if it was currently suspended.
///     baton.post();
///   }
简单来说就是个生产者消费者模型,消费者通过co_await等待Baton变成ready状态,如果Baton没有ready,则会挂起协程进行等待。直到生产者调用post方法,所有被挂起的协程会继续执行。
为了能够完成上述功能,Baton内部需要维护一个状态,用来表明这个Baton是否已经ready。当co_await一个Baton时:
- 如果Baton没有ready,那么协程就会被挂起,直到post被调用。(也就是上面介绍await_suspend时提到的有条件的挂起)
- 如果Baton已经ready,协程就会直接继续执行
Baton提供的主要接口如下所示:
class Baton {
 public:
  class WaitOperation;
  /// Initialise the Baton to either the signalled or non-signalled state.
  explicit Baton(bool initiallySignalled = false) noexcept;
  ~Baton();
  bool ready() const noexcept;
  [[nodiscard]] WaitOperation operator co_await() const noexcept {
    return Baton::WaitOperation{*this};
  }
  void post() noexcept;
  void reset() noexcept;
  class WaitOperation;
 private:
  // this  - Baton is in the signalled/posted state.
  // other - Baton is not signalled/posted and this is a pointer to the head
  //         of a potentially empty linked-list of Awaiter nodes that were
  //         waiting for the baton to become signalled.
  mutable std::atomic<void*> state_;
};
大多数接口在上面已经提到,没有涉及到的部分包括:
- reset是把一个已经ready的- Baton重置回没有ready的状态
- std::atomic<void*> state_的作用就是用来表明Baton是否ready:- 当state_值等于this时,代表已经ready。
- 当state_不等于this时,代表没有ready。
 
- 当
从原理上来说,所有正在等待的协程指针,会被维护在一个链表中,当post被调用时,就会遍历这个链表并继续执行这些协程。当state_不等于this时,state_中保存的就是链表头,之所以一定用this来代表这个ready这个特殊状态,也是因为正在等待的协程链表中的协程指针,肯定不会和this相同。
WaitOperation就是当co_await这个Baton时,会返回的Awaiter对象。它需要完成的工作有以下几点:
- 它需要知道正在等待的是哪个Baton
- 其次它需要维护所有正在等待这个Baton的协程,并在post调用之后恢复执行这些协程
- 此外它需要保存正在等待这个Baton的协程的coroutine_handle,从而完成协程的恢复
- 最后它既然是Awaiter对象,也就需要实现await_ready,await_suspend和await_resume。
为了实现上述功能,它的实现如下所示:
  class WaitOperation {
   public:
    explicit WaitOperation(const Baton& baton) noexcept : baton_(baton) {}
    bool await_ready() const noexcept;
    bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept;
    void await_resume() noexcept {}
   protected:
    friend class Baton;
    const Baton& baton_;
    coroutine_handle<> awaitingCoroutine_;
    WaitOperation* next_;
  };
根据我们要实现什么样的协程组件,我们就需要通过这几个接口定制
awaiter的行为。对于await_resume,我们不需要co_await一个Baton时得到一个返回值,所以不需要任何逻辑。
当我们co_await一个Baton时,我们就会获取到一个Awaiter对象,也就是WaitOperation。按照上面介绍编译器处理co_wait时的流程,之后就会调用它的await_ready检查是否已经完成异步操作,即Baton是否ready,也就是检查Baton中的m_state是否是this:
bool Baton::WaitOperation::await_ready() const noexcept {
  return baton_.ready();
}
inline bool Baton::ready() const noexcept {
  return state_.load(std::memory_order_acquire) == static_cast<const void*>(this);
}
如果不是ready状态,接下来就会调用await_suspend,此处协程已经处于挂起状态,通过await_suspend的返回值,可以决定是执行返还给调用方,还是立即恢复协程。
这里解释下为什么明明刚通过
await_ready检查了Baton没有ready,并挂起了协程,为什么又可能会立即恢复协程。Baton是多消费者单生产者模型,在这个协程调用co_await期间,准确说是await_ready和await_suspend期间,Baton的状态可能已经被修改为ready,此时这个协程就不需要再等待,也就是所谓有条件地立即恢复协程。
bool Baton::WaitOperation::await_suspend(coroutine_handle<> awaitingCoroutine) noexcept {
  awaitingCoroutine_ = awaitingCoroutine;
  return baton_.waitImpl(this);
}
bool Baton::waitImpl(WaitOperation* awaiter) const noexcept {
  // Try to push the awaiter onto the front of the queue of waiters.
  const auto signalledState = static_cast<const void*>(this);
  void* oldValue = state_.load(std::memory_order_acquire);
  do {
    if (oldValue == signalledState) {
      // Already in the signalled state, don't enqueue it.
      return false;
    }
    awaiter->next_ = static_cast<WaitOperation*>(oldValue);
  } while (!folly::atomic_compare_exchange_weak_explicit(
      &state_,
      &oldValue,
      awaiter,
      std::memory_order_release,
      std::memory_order_acquire));
  return true;
}
这部分大致逻辑是这样的:
- 保存当前的coroutine_handle,以便后续可以恢复这个协程
- 需要将自身加入到等待的协程列表,也就是这个无锁链表中去。当前协程的指针是通过awaiter保存的,waitImpl就是要将awaiter通过头插法插入到链表头中。- 首先获取当前state_。
- 检查Baton的状态是否已经是ready,即检查state_是否为Baton的this指针。
- 如果已经ready,直接返回false,代表当前协程不再需要等待,可以立即恢复执行,即执行到<resume-point>。
- 如果没有ready,则将当前awaiter的next指针设置成state_,即链表头。
- 然后尝试CAS将链表头state_替换为awaiter。
- 如果CAS成功代表已经成功加入链表,返回true,当前协程就会执行到<return-to-caller-or-resumer>,此后返回到co_await调用方,等待唤醒。
- 如果CAS失败,代表存在并发co_await这个Baton,需要重新走上述流程
 
- 首先获取当前
同理,reset逻辑就是将state_重新设置为nullptr,即没有ready的状态。
inline void Baton::reset() noexcept {
  // Transition from 'signalled' (ie. 'this') to not-signalled (ie. nullptr).
  void* oldState = this;
  (void)state_.compare_exchange_strong(
      oldState, nullptr, std::memory_order_acq_rel, std::memory_order_relaxed);
}
而post时候逻辑就是依次遍历这个链表,通过WaitOperation中保存的coroutine_handle恢复协程的执行。
void Baton::post() noexcept {
  void* const signalledState = static_cast<void*>(this);
  void* oldValue = state_.exchange(signalledState, std::memory_order_acq_rel);
  if (oldValue != signalledState) {
    // We are the first thread to set the state to signalled and there is
    // a waiting coroutine. We are responsible for resuming it.
    WaitOperation* awaiter = static_cast<WaitOperation*>(oldValue);
    while (awaiter != nullptr) {
      std::exchange(awaiter, awaiter->next_)->awaitingCoroutine_.resume();
    }
  }
}
Reference
| [C++ Coroutines: Understanding operator co_await | Asymmetric Transfer (lewissbaker.github.io)](https://lewissbaker.github.io/2017/11/17/understanding-operator-co-await) | 
| [Coroutine Theory | Asymmetric Transfer (lewissbaker.github.io)](https://lewissbaker.github.io/2017/09/25/coroutine-theory) | 
CppCon 2016: James McNellis “Introduction to C++ Coroutines” (youtube.com)