blob: f5236c4798b729a19e072634fd5c3d0a15d0e40d [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_GRPC_HPP__
#define __PROCESS_GRPC_HPP__
#include <chrono>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <google/protobuf/message.h>
#include <grpcpp/grpcpp.h>
#include <process/check.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
// This file provides libprocess "support" for using gRPC. In particular, it
// defines two wrapper classes: `client::Connection` which represents a
// connection to a gRPC server, and `client::Runtime` which integrates an event
// loop waiting for gRPC responses and provides the `call` interface to create
// an asynchronous gRPC call and return a `Future`.
#define GRPC_CLIENT_METHOD(service, rpc) \
(&service::Stub::PrepareAsync##rpc)
namespace grpc {
std::ostream& operator<<(std::ostream& stream, StatusCode statusCode);
} // namespace grpc {
namespace process {
namespace grpc {
/**
* Represents errors caused by non-OK gRPC statuses. See:
* https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
*/
class StatusError : public Error
{
public:
StatusError(::grpc::Status _status)
: Error(stringify(_status.error_code()) +
(_status.error_message().empty()
? "" : ": " + _status.error_message())),
status(std::move(_status))
{
CHECK(!status.ok());
}
const ::grpc::Status status;
};
namespace client {
// Internal helper utilities.
namespace internal {
template <typename T>
struct MethodTraits; // Undefined.
template <typename Stub, typename Request, typename Response>
struct MethodTraits<
std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
::grpc::ClientContext*,
const Request&,
::grpc::CompletionQueue*)>
{
typedef Stub stub_type;
typedef Request request_type;
typedef Response response_type;
};
} // namespace internal {
/**
* A copyable interface to manage a connection to a gRPC server. All
* `Connection` copies share the same gRPC channel which is thread safe. Note
* that the actual connection is established lazily by the gRPC library at the
* time an RPC is made to the channel.
*/
class Connection
{
public:
Connection(
const std::string& uri,
const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
::grpc::InsecureChannelCredentials())
: channel(::grpc::CreateChannel(uri, credentials)) {}
explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
: channel(std::move(_channel)) {}
const std::shared_ptr<::grpc::Channel> channel;
};
/**
* Defines the gRPC options for each call.
*/
struct CallOptions
{
// Enable the gRPC wait-for-ready semantics by default so the call will be
// retried if the connection is not ready. See:
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
bool wait_for_ready = true;
// The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
// there is no response in the specified amount of time. This is required to
// avoid the call from being pending forever.
Duration timeout = Seconds(60);
};
/**
* A copyable interface to manage an internal runtime process for asynchronous
* gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
* outstanding requests, a looper thread to wait for any incoming responses from
* the `CompletionQueue`, and handles the requests and responses. All `Runtime`
* copies share the same runtime process. Usually we only need a single runtime
* process to handle all gRPC calls, but multiple runtime processes can be
* instantiated for better parallelism and isolation.
*
* NOTE: The caller must call `terminate` to drain the `CompletionQueue` before
* finalizing libprocess to gracefully terminate the gRPC runtime.
*/
class Runtime
{
public:
Runtime() : data(new Data()) {}
/**
* Sends an asynchronous gRPC call.
*
* This function returns a `Future` of a `Try` such that the response protobuf
* is returned only if the gRPC call returns an OK status to ensure type
* safety (see https://github.com/grpc/grpc/issues/12824). Note that the
* future never fails; it will return a `StatusError` if a non-OK status is
* returned for the call, so the caller can handle the error programmatically.
*
* @param connection A connection to a gRPC server.
* @param method The asynchronous gRPC call to make. This should be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
* @param options The gRPC options for the call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
template <
typename Method,
typename Request =
typename internal::MethodTraits<Method>::request_type,
typename Response =
typename internal::MethodTraits<Method>::response_type,
typename std::enable_if<
std::is_convertible<
typename std::decay<Request>::type*,
google::protobuf::Message*>::value,
int>::type = 0>
Future<Try<Response, StatusError>> call(
const Connection& connection,
Method&& method,
Request&& request,
const CallOptions& options)
{
// Create a `Promise` that will be set upon receiving a response.
// TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
// to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
new Promise<Try<Response, StatusError>>);
Future<Try<Response, StatusError>> future = promise->future();
// Send the request in the internal runtime process.
// TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
// extra copy. We should capture it by forwarding once we get C++14.
dispatch(data->pid, &RuntimeProcess::send, std::bind(
[connection, method, options, promise](
const Request& request,
bool terminating,
::grpc::CompletionQueue* queue) {
if (terminating) {
promise->fail("Runtime has been terminated");
return;
}
// TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
// captured by the lambda below. Use `unique_ptr`s once we get C++14.
std::shared_ptr<::grpc::ClientContext> context(
new ::grpc::ClientContext());
context->set_wait_for_ready(options.wait_for_ready);
// We need to ensure that we're using a
// `std::chrono::system_clock::time_point` because `grpc::TimePoint`
// provides a specialization only for this type and we cannot
// guarantee that the operation below will always result in this type.
auto time_point =
std::chrono::time_point_cast<std::chrono::system_clock::duration>(
std::chrono::system_clock::now() +
std::chrono::nanoseconds(options.timeout.ns()));
context->set_deadline(time_point);
promise->future().onDiscard([=] { context->TryCancel(); });
std::shared_ptr<Response> response(new Response());
std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
(typename internal::MethodTraits<Method>::stub_type(
connection.channel).*method)(context.get(), request, queue);
reader->StartCall();
// Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
// the current asynchronous gRPC call. The callback will set up the
// above `Promise` upon receiving a response.
// NOTE: `context` and `reader` need to be held on in order to get
// updates for the ongoing RPC, and thus are captured here. The
// callback itself will later be retrieved and managed in the
// looper thread.
void* tag = new ReceiveCallback(
[context, reader, response, status, promise]() {
CHECK_PENDING(promise->future());
if (promise->future().hasDiscard()) {
promise->discard();
} else {
promise->set(status->ok()
? std::move(*response)
: Try<Response, StatusError>::error(std::move(*status)));
}
});
reader->Finish(response.get(), status.get(), tag);
},
std::forward<Request>(request),
lambda::_1,
lambda::_2));
return future;
}
/**
* Asks the internal runtime process to shut down the `CompletionQueue`, which
* would asynchronously drain and fail all pending gRPC calls in the
* `CompletionQueue`, then join the looper thread.
*/
void terminate();
/**
* @return A `Future` waiting for all pending gRPC calls in the
* `CompletionQueue` of the internal runtime process to be drained and the
* looper thread to be joined.
*/
Future<Nothing> wait();
private:
// Type of the callback functions that can get invoked when sending a request
// or receiving a response.
typedef lambda::CallableOnce<
void(bool, ::grpc::CompletionQueue*)> SendCallback;
typedef lambda::CallableOnce<void()> ReceiveCallback;
class RuntimeProcess : public Process<RuntimeProcess>
{
public:
RuntimeProcess();
~RuntimeProcess() override;
void send(SendCallback callback);
void receive(ReceiveCallback callback);
void terminate();
Future<Nothing> wait();
private:
void initialize() override;
void finalize() override;
void loop();
::grpc::CompletionQueue queue;
std::unique_ptr<std::thread> looper;
bool terminating;
Promise<Nothing> terminated;
};
struct Data
{
Data();
~Data();
PID<RuntimeProcess> pid;
Future<Nothing> terminated;
};
std::shared_ptr<Data> data;
};
} // namespace client {
} // namespace grpc {
} // namespace process {
#endif // __PROCESS_GRPC_HPP__