C++20 协程支持

bRPC 支持 C++20 协程说明文档。

注:该功能是实验性的,请勿在生产环境下使用。

使用说明

适用场景

C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。

使用前提

  1. 需要使用支持c++20的编译器,如gcc 11
  2. 需要编译选项中加上 -std=c++20

简单示例

以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。

#include <brpc/channel.h>
#include <brpc/coroutine.h>

// 协程函数的返回类型,需要是brpc::experimental::Awaitable<T>
// T是函数返回的实际数据类型
brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) {
    EchoRequest request;
    EchoResponse response;
    EchoService_Stub stub(&_channel);
    brpc::Controller cntl;
    brpc::experimental::AwaitableDone done;
    stub.Echo(&cntl, &request, &response, &done);
    // 等待RPC返回结果
    co_await done.awaitable();
    // 返回数据,注意这里用co_return而不是return
    // 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int
    co_return cntl.ErrorCode();
}

brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
    brpc::Channel channel;
    channel.Init(server, NULL);
    // co_await会从Awaitable<int>得到int类型的返回值
    int code = co_await RpcCall(channel);
    printf("Rpc result:%d\n", code);
}

int main() {
    // 启动协程
    brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080"));
    // 等待协程执行完成
    coro.join();
    return 0;
}

更完整的例子可以查看源码中的example/coroutine/coroutine_server.cpp文件。

更多用法

  1. 在非协程环境下等待一个协程执行完成:
brpc::experimental::Coroutine coro(func(args));
coro.join();
  1. 在非协程环境下等待协程完成并获取返回值:
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
int result = coro.join<int>();
  1. 在协程环境下等待协程执行完成:
brpc::experimental::Coroutine coro(func(args));
... // 做一些其它事情
co_await coro.awaitable();
  1. 在协程环境下等待协程执行完成并获取返回值:
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
... // 做一些其它事情
int ret = co_await coro.awaitable<int>();
  1. 在协程环境下sleep:
co_await brpc::experimental::Coroutine::usleep(1000);

注意事项

  1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthread或bthread可能发生变化,因此依赖于pthread或bthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。
  2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。
  3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义:
brpc::experimental::Awaitable<int> inplace_func() {
    co_return 123;
}

实现极致性能

如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启usercode_in_coroutine这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。

实现原理

C++20协程实现原理

为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:

brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
    brpc::Channel channel;
    channel.Init(server, NULL);
    brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);

    int code = co_await awaitable;
    printf("Rpc result:%d\n", code);
}

上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个Awaitable<T>对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):


brpc::experimental::Awaitable<void> CoroutineMain(const char* server) { // 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类 // 在函数的入口,创建一个promise_type类型的对象 auto promise = new brpc::experimental::Awaitable<void>::promise_type(); // 从promise对象中创建返回Awaitable对象 Awaitable<void> ret = promise->get_return_object(); // co_await之前的逻辑,保持不变 brpc::Channel channel; channel.Init(server, NULL); brpc::experimental::Awaitable<int> awaitable = RpcCall(channel); // co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数 awaitable.await_suspend([promise, &awaitable]() { // co_await之后的逻辑,转移到callback函数中 int code = awaitable.await_resume(); printf("Rpc result:%d\n", code); // 在final_suspend里面,会做一些唤醒调用者、资源释放的工作 promise->final_suspend(); delete promise; }) // 返回Awaitable<void>对象,以便上层函数进行处理 return ret; }

也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。

原子等待操作

上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。

  1. 等待RPC返回结果: AwaitableDone::awaitable()
  2. 等待sleep: Coroutine::usleep()
  3. 等待另一个协程完成: Coroutine::awaitable()

下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。

inline Awaitable<int> Coroutine::usleep(int sleep_us) {
    auto promise = new detail::AwaitablePromise<int>();
    promise->set_needs_suspend();
    bthread_timer_t timer;
    auto abstime = butil::microseconds_from_now(sleep_us);
    auto cb = [](void* p) {
        auto promise = static_cast<detail::AwaitablePromise<int>*>(p);
        promise->set_value(0);
        promise->on_done();
    };
    bthread_timer_add(&timer, abstime, cb, promise);
    return Awaitable<int>(promise);
}

协程与多线程

上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。

协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。