blob: cdce5b9e0a0c7da4aa48afdada220b6dda384a0a [file] [log] [blame] [view]
# C++20 协程支持
bRPC 支持 C++20 协程说明文档。
> 注:该功能是实验性的,请勿在生产环境下使用。
## 使用说明
### 适用场景
C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。
### 使用前提
1. 需要使用支持c++20的编译器,如gcc 11
2. 需要编译选项中加上 `-std=c++20`
### 简单示例
以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。
```cpp
#include <brpc/channel.h>
#include <brpc/coroutine.h>
// 协程函数的返回类型,需要是brpc::experimental::Awaitable<T>
// T是函数返回的实际数据类型
brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) {
EchoRequest request;
EchoResponse response;
EchoService_Stub stub(&_channel);
brpc::Controller cntl;
brpc::experimental::AwaitableDone done;
stub.Echo(&cntl, &request, &response, &done);
// 等待RPC返回结果
co_await done.awaitable();
// 返回数据,注意这里用co_return而不是return
// 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int
co_return cntl.ErrorCode();
}
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
brpc::Channel channel;
channel.Init(server, NULL);
// co_await会从Awaitable<int>得到int类型的返回值
int code = co_await RpcCall(channel);
printf("Rpc result:%d\n", code);
}
int main() {
// 启动协程
brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080"));
// 等待协程执行完成
coro.join();
return 0;
}
```
更完整的例子可以查看源码中的`example/coroutine/coroutine_server.cpp`文件。
### 更多用法
1. 在非协程环境下等待一个协程执行完成:
```cpp
brpc::experimental::Coroutine coro(func(args));
coro.join();
```
2. 在非协程环境下等待协程完成并获取返回值:
```cpp
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
int result = coro.join<int>();
```
3. 在协程环境下等待协程执行完成:
```cpp
brpc::experimental::Coroutine coro(func(args));
... // 做一些其它事情
co_await coro.awaitable();
```
4. 在协程环境下等待协程执行完成并获取返回值:
```cpp
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
... // 做一些其它事情
int ret = co_await coro.awaitable<int>();
```
5. 在协程环境下sleep
```cpp
co_await brpc::experimental::Coroutine::usleep(1000);
```
### 注意事项
1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthreadbthread可能发生变化,因此依赖于pthreadbthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。
2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。
3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义:
```cpp
brpc::experimental::Awaitable<int> inplace_func() {
co_return 123;
}
```
### 实现极致性能
如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启`usercode_in_coroutine`这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。
## 实现原理
### C++20协程实现原理
为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:
```cpp
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
brpc::Channel channel;
channel.Init(server, NULL);
brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
int code = co_await awaitable;
printf("Rpc result:%d\n", code);
}
```
上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个`Awaitable<T>`对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):
```cpp
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
// 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类
// 在函数的入口,创建一个promise_type类型的对象
auto promise = new brpc::experimental::Awaitable<void>::promise_type();
// 从promise对象中创建返回Awaitable对象
Awaitable<void> ret = promise->get_return_object();
// co_await之前的逻辑,保持不变
brpc::Channel channel;
channel.Init(server, NULL);
brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
// co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数
awaitable.await_suspend([promise, &awaitable]() {
// co_await之后的逻辑,转移到callback函数中
int code = awaitable.await_resume();
printf("Rpc result:%d\n", code);
// 在final_suspend里面,会做一些唤醒调用者、资源释放的工作
promise->final_suspend();
delete promise;
})
// 返回Awaitable<void>对象,以便上层函数进行处理
return ret;
}
```
也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。
### 原子等待操作
上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。
1. 等待RPC返回结果: `AwaitableDone::awaitable()`
2. 等待sleep: `Coroutine::usleep()`
3. 等待另一个协程完成: `Coroutine::awaitable()`
下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。
```cpp
inline Awaitable<int> Coroutine::usleep(int sleep_us) {
auto promise = new detail::AwaitablePromise<int>();
promise->set_needs_suspend();
bthread_timer_t timer;
auto abstime = butil::microseconds_from_now(sleep_us);
auto cb = [](void* p) {
auto promise = static_cast<detail::AwaitablePromise<int>*>(p);
promise->set_value(0);
promise->on_done();
};
bthread_timer_add(&timer, abstime, cb, promise);
return Awaitable<int>(promise);
}
```
### 协程与多线程
上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthreadbthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。
协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。