bRPC 支持 C++20 协程说明文档。
注:该功能是实验性的,请勿在生产环境下使用。
C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。
-std=c++20以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。
#include <brpc/channel.h> #include <brpc/coroutine.h> // 协程函数的返回类型,需要是brpc::experimental::Awaitable<T> // T是函数返回的实际数据类型 brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) { EchoRequest request; EchoResponse response; EchoService_Stub stub(&_channel); brpc::Controller cntl; brpc::experimental::AwaitableDone done; stub.Echo(&cntl, &request, &response, &done); // 等待RPC返回结果 co_await done.awaitable(); // 返回数据,注意这里用co_return而不是return // 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int co_return cntl.ErrorCode(); } brpc::experimental::Awaitable<void> CoroutineMain(const char* server) { brpc::Channel channel; channel.Init(server, NULL); // co_await会从Awaitable<int>得到int类型的返回值 int code = co_await RpcCall(channel); printf("Rpc result:%d\n", code); } int main() { // 启动协程 brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080")); // 等待协程执行完成 coro.join(); return 0; }
更完整的例子可以查看源码中的example/coroutine/coroutine_server.cpp文件。
brpc::experimental::Coroutine coro(func(args)); coro.join();
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int> int result = coro.join<int>();
brpc::experimental::Coroutine coro(func(args)); ... // 做一些其它事情 co_await coro.awaitable();
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int> ... // 做一些其它事情 int ret = co_await coro.awaitable<int>();
co_await brpc::experimental::Coroutine::usleep(1000);
brpc::experimental::Awaitable<int> inplace_func() { co_return 123; }
如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启usercode_in_coroutine这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。
为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) { brpc::Channel channel; channel.Init(server, NULL); brpc::experimental::Awaitable<int> awaitable = RpcCall(channel); int code = co_await awaitable; printf("Rpc result:%d\n", code); }
上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个Awaitable<T>对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) { // 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类 // 在函数的入口,创建一个promise_type类型的对象 auto promise = new brpc::experimental::Awaitable<void>::promise_type(); // 从promise对象中创建返回Awaitable对象 Awaitable<void> ret = promise->get_return_object(); // co_await之前的逻辑,保持不变 brpc::Channel channel; channel.Init(server, NULL); brpc::experimental::Awaitable<int> awaitable = RpcCall(channel); // co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数 awaitable.await_suspend([promise, &awaitable]() { // co_await之后的逻辑,转移到callback函数中 int code = awaitable.await_resume(); printf("Rpc result:%d\n", code); // 在final_suspend里面,会做一些唤醒调用者、资源释放的工作 promise->final_suspend(); delete promise; }) // 返回Awaitable<void>对象,以便上层函数进行处理 return ret; }
也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。
上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。
AwaitableDone::awaitable()Coroutine::usleep()Coroutine::awaitable()下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。
inline Awaitable<int> Coroutine::usleep(int sleep_us) { auto promise = new detail::AwaitablePromise<int>(); promise->set_needs_suspend(); bthread_timer_t timer; auto abstime = butil::microseconds_from_now(sleep_us); auto cb = [](void* p) { auto promise = static_cast<detail::AwaitablePromise<int>*>(p); promise->set_value(0); promise->on_done(); }; bthread_timer_add(&timer, abstime, cb, promise); return Awaitable<int>(promise); }
上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。
协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。