blob: 87c40cdacd8317233a52e61755ecf017fa10f541 [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_PROTOBUF_HPP__
#define __PROCESS_PROTOBUF_HPP__
#include <glog/logging.h>

#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
#include <google/protobuf/repeated_field.h>

#include <iterator>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>

#include <stout/hashmap.hpp>
#include <stout/lambda.hpp>
// Provides an implementation of process::post for a protobuf message.
namespace process {
// Serializes `message` and posts it to `to`, using the message's
// protobuf type name as the libprocess message name. If the message
// cannot be serialized nothing is posted and an error is logged.
inline void post(const process::UPID& to,
                 const google::protobuf::Message& message)
{
  std::string serialized;

  // Bail out early on a serialization failure; there is nothing to send.
  if (!message.SerializeToString(&serialized)) {
    LOG(ERROR) << "Failed to post '" << message.GetTypeName() << "' to "
               << to << ": Failed to serialize";
    return;
  }

  post(to, message.GetTypeName(), serialized.data(), serialized.size());
}
// Same as the two-argument overload, but posts on behalf of `from`:
// serializes `message` and posts it from `from` to `to` under the
// message's protobuf type name. If the message cannot be serialized
// nothing is posted and an error is logged.
inline void post(const process::UPID& from,
                 const process::UPID& to,
                 const google::protobuf::Message& message)
{
  std::string serialized;

  // Bail out early on a serialization failure; there is nothing to send.
  if (!message.SerializeToString(&serialized)) {
    LOG(ERROR) << "Failed to post '" << message.GetTypeName() << "' to "
               << to << ": Failed to serialize";
    return;
  }

  post(from, to, message.GetTypeName(), serialized.data(), serialized.size());
}
} // namespace process {
// The rest of this file provides libprocess "support" for using
// protocol buffers. In particular, this file defines a subclass of
// Process (ProtobufProcess) that allows you to install protocol
// buffer handlers in addition to normal message and HTTP
// handlers. Installed handlers can optionally take the sender's UPID
// as their first argument.
// Note that this header file assumes you will be linking
// against BOTH libprotobuf and libglog.
namespace google {
namespace protobuf {
// Type conversions helpful for changing between protocol buffer types
// and standard C++ types (for parameters).
// Pass-through overload: a value that needs no conversion is handed
// back as a const reference to the very same object (no copy is made).
template <typename T>
auto convert(const T& t) -> const T&
{
  return t;
}
// Copies every element of a repeated protobuf field, in order, into a
// std::vector. The repeated field itself is left untouched.
template <typename T>
std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
{
  std::vector<T> copies(items.begin(), items.end());
  return copies;
}
// Rvalue overload: builds a std::vector by moving each element out of
// the repeated protobuf field instead of copying it. The elements left
// behind in `items` are in a moved-from state.
template <typename T>
std::vector<T> convert(google::protobuf::RepeatedPtrField<T>&& items)
{
  auto first = std::make_move_iterator(items.begin());
  auto last = std::make_move_iterator(items.end());

  std::vector<T> moved(first, last);
  return moved;
}
} // namespace protobuf {
} // namespace google {
// A process::Process<T> that can additionally route incoming messages
// to handlers keyed by protocol buffer type name.
//
// Subclasses (the CRTP parameter `T`) register member functions with
// one of the `install<M>(...)` overloads below. When a message whose
// name equals M's protobuf type name arrives, the message body is
// parsed as an `M` and the registered member function is invoked.
// Messages without a protobuf handler fall through to
// process::Process<T>::consume.
template <typename T>
class ProtobufProcess : public process::Process<T>
{
public:
  ~ProtobufProcess() override {}

protected:
  // Dispatches `event` to the protobuf handler installed under the
  // message's name, if there is one; otherwise forwards the event to
  // process::Process<T>::consume. While a protobuf handler runs, `from`
  // holds the sender so that the handler may call reply(); it is reset
  // to a default-constructed UPID afterwards.
  void consume(process::MessageEvent&& event) override
  {
    // Look the handler up once instead of count() followed by
    // operator[], which would hash the name twice.
    auto found = protobufHandlers.find(event.message.name);

    if (found == protobufHandlers.end()) {
      process::Process<T>::consume(std::move(event));
      return;
    }

    from = event.message.from; // For 'reply'.
    found->second(event.message.from, event.message.body);
    from = process::UPID();
  }

  // Serializes `message` and sends it to `to` under the message's
  // protobuf type name. If serialization fails nothing is sent and an
  // error is logged.
  void send(const process::UPID& to,
            const google::protobuf::Message& message)
  {
    std::string data;
    if (message.SerializeToString(&data)) {
      process::Process<T>::send(to, message.GetTypeName(), std::move(data));
    } else {
      LOG(ERROR) << "Failed to send '" << message.GetTypeName() << "' to "
                 << to << ": Failed to serialize";
    }
  }

  // Keep the base class overloads of send() visible.
  using process::Process<T>::send;

  // Sends `message` to the sender of the message currently being
  // handled. Aborts (CHECK) if there is no current sender, i.e. when
  // called outside of a protobuf handler invoked by consume().
  void reply(const google::protobuf::Message& message)
  {
    CHECK(from) << "Attempting to reply without a sender";
    send(from, message);
  }

  // NOTE: Every install() overload below default-constructs a throwaway
  // `M` solely to learn the type name under which the handler is
  // registered. The instance lives on the stack, so there is nothing to
  // free and nothing can leak if registration throws. Binding it to a
  // `google::protobuf::Message` reference keeps the compile-time
  // requirement that `M` is a protobuf message.

  // Installs that take the sender as the first argument.

  // Registers `method` to be called with the sender and the parsed
  // message (by const reference).
  template <typename M>
  void install(void (T::*method)(const process::UPID&, const M&))
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&handlerM<M>,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Registers `method` to be called with the sender and the parsed
  // message handed over as an rvalue.
  template <typename M>
  void install(void (T::*method)(const process::UPID&, M&&))
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&handlerMutM<M>,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Pointer to a const, argument-less member function of message `M`
  // returning `P` (i.e. a field accessor).
  template <typename M, typename P>
  using MessageProperty = P(M::*)() const;

  // Registers `method` to be called with only the sender; the message
  // body is not parsed.
  template <typename M>
  void install(void (T::*method)(const process::UPID&))
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&handler0,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Registers `method` to be called with the sender followed by the
  // values of the given message accessors `param...`, each passed
  // through google::protobuf::convert.
  template <typename M,
            typename ...P, typename ...PC>
  void install(
      void (T::*method)(const process::UPID&, PC...),
      MessageProperty<M, P>... param)
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    // The cast selects the handlerN instantiation matching M, P... and
    // PC..., which cannot be deduced from a bare function name.
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(static_cast<void(&)(
                       T*,
                       void (T::*)(const process::UPID&, PC...),
                       const process::UPID&,
                       const std::string&,
                       MessageProperty<M, P>...)>(handlerN),
                   t, method,
                   lambda::_1, lambda::_2, param...);
  }

  // Installs that do not take the sender.

  // Registers `method` to be called with the parsed message (by const
  // reference).
  template <typename M>
  void install(void (T::*method)(const M&))
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&_handlerM<M>,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Registers `method` to be called with the parsed message handed over
  // as an rvalue.
  template <typename M>
  void install(void (T::*method)(M&&))
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&_handlerMutM<M>,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Registers `method` to be called without arguments; the message body
  // is not parsed.
  template <typename M>
  void install(void (T::*method)())
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(&_handler0,
                   t, method,
                   lambda::_1, lambda::_2);
  }

  // Registers `method` to be called with the values of the given
  // message accessors `param...`, each passed through
  // google::protobuf::convert.
  template <typename M,
            typename ...P, typename ...PC>
  void install(
      void (T::*method)(PC...),
      MessageProperty<M, P>... param)
  {
    M prototype;
    const google::protobuf::Message& message = prototype;
    T* t = static_cast<T*>(this);
    // The cast selects the _handlerN instantiation matching M, P... and
    // PC..., which cannot be deduced from a bare function name.
    protobufHandlers[message.GetTypeName()] =
      lambda::bind(static_cast<void(&)(
                       T*,
                       void (T::*)(PC...),
                       const process::UPID&,
                       const std::string&,
                       MessageProperty<M, P>...)>(_handlerN),
                   t, method,
                   lambda::_1, lambda::_2, param...);
  }

  // Keep the base class overloads of install() visible.
  using process::Process<T>::install;

private:
  // Handlers that take the sender as the first argument.

  // Parses `data` into an arena-allocated `M` and, on success, calls
  // `method` on `t` with the sender and the message. Logs an error if
  // parsing fails.
  template <typename M>
  static void handlerM(
      T* t,
      void (T::*method)(const process::UPID&, const M&),
      const process::UPID& sender,
      const std::string& data)
  {
    google::protobuf::Arena arena;
    M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));

    if (m->ParseFromString(data)) {
      (t->*method)(sender, *m);
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
                 << "' from " << sender;
    }
  }

  // Parses `data` into a stack-allocated `M` and, on success, calls
  // `method` on `t` with the sender and the message as an rvalue. Logs
  // an error if parsing fails.
  template <typename M>
  static void handlerMutM(
      T* t,
      void (T::*method)(const process::UPID&, M&&),
      const process::UPID& sender,
      const std::string& data)
  {
    M m;

    if (m.ParseFromString(data)) {
      (t->*method)(sender, std::move(m));
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m.GetTypeName()
                 << "' from " << sender;
    }
  }

  // Calls `method` on `t` with the sender only; the message body is
  // ignored.
  static void handler0(
      T* t,
      void (T::*method)(const process::UPID&),
      const process::UPID& sender,
      const std::string& /* data */)
  {
    (t->*method)(sender);
  }

  // Parses `data` into an arena-allocated `M` and, on success, calls
  // `method` on `t` with the sender followed by the converted result of
  // each accessor in `p...`. Logs an error if parsing fails.
  template <typename M,
            typename ...P, typename ...PC>
  static void handlerN(
      T* t,
      void (T::*method)(const process::UPID&, PC...),
      const process::UPID& sender,
      const std::string& data,
      MessageProperty<M, P>... p)
  {
    google::protobuf::Arena arena;
    M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));

    if (m->ParseFromString(data)) {
      (t->*method)(sender, google::protobuf::convert((m->*p)())...);
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
                 << "' from " << sender;
    }
  }

  // Handlers that ignore the sender.

  // Parses `data` into an arena-allocated `M` and, on success, calls
  // `method` on `t` with the message. The sender is only used for the
  // error log emitted when parsing fails.
  template <typename M>
  static void _handlerM(
      T* t,
      void (T::*method)(const M&),
      const process::UPID& sender,
      const std::string& data)
  {
    google::protobuf::Arena arena;
    M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));

    if (m->ParseFromString(data)) {
      (t->*method)(*m);
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
                 << "' from " << sender;
    }
  }

  // Parses `data` into a stack-allocated `M` and, on success, calls
  // `method` on `t` with the message as an rvalue. The sender is only
  // used for the error log emitted when parsing fails.
  template <typename M>
  static void _handlerMutM(
      T* t,
      void (T::*method)(M&&),
      const process::UPID& sender,
      const std::string& data)
  {
    M m;

    if (m.ParseFromString(data)) {
      (t->*method)(std::move(m));
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m.GetTypeName()
                 << "' from " << sender;
    }
  }

  // Calls `method` on `t` without arguments; both the sender and the
  // message body are ignored.
  static void _handler0(
      T* t,
      void (T::*method)(),
      const process::UPID& /* sender */,
      const std::string& /* data */)
  {
    (t->*method)();
  }

  // Parses `data` into an arena-allocated `M` and, on success, calls
  // `method` on `t` with the converted result of each accessor in
  // `p...`. The sender is only used for the error log emitted when
  // parsing fails.
  template <typename M,
            typename ...P, typename ...PC>
  static void _handlerN(
      T* t,
      void (T::*method)(PC...),
      const process::UPID& sender,
      const std::string& data,
      MessageProperty<M, P>... p)
  {
    google::protobuf::Arena arena;
    M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));

    if (m->ParseFromString(data)) {
      (t->*method)(google::protobuf::convert((m->*p)())...);
    } else {
      LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
                 << "' from " << sender;
    }
  }

  // Type-erased handler: invoked with the sender and the raw
  // (serialized) message body.
  using handler = lambda::function<
      void(const process::UPID&, const std::string&)>;

  // Installed handlers, keyed by protobuf type name.
  hashmap<std::string, handler> protobufHandlers;

  // Sender of "current" message, inaccessible by subclasses.
  // This is only used for reply().
  process::UPID from;
};
// Implements a process for sending protobuf "requests" to a process
// and waiting for a protobuf "response", but uses futures so that
// this can be done without needing to implement a process.
// A process that sends one protobuf request (`Req`) to a target process
// and publishes the protobuf response (`Res`) through a future. It
// terminates itself once the response arrives or the future is
// discarded.
template <typename Req, typename Res>
class ReqResProcess : public ProtobufProcess<ReqResProcess<Req, Res>>
{
public:
  // Remembers the target `_pid` and a copy of the request `_req`, and
  // registers response() as the handler for incoming `Res` messages.
  ReqResProcess(const process::UPID& _pid, const Req& _req)
    : process::ProcessBase(process::ID::generate("__req_res__")),
      pid(_pid),
      req(_req)
  {
    ProtobufProcess<ReqResProcess>::template install<Res>(
        &ReqResProcess::response);
  }

  ~ReqResProcess() override
  {
    // Discard the promise.
    promise.discard();
  }

  // Sends the request to the target and returns the future that will
  // carry the response. Discarding the returned future triggers
  // discarded() on this process.
  process::Future<Res> run()
  {
    process::Future<Res> future = promise.future();
    future.onDiscard(defer(this, &ReqResProcess::discarded));

    ProtobufProcess<ReqResProcess>::send(pid, req);

    return future;
  }

private:
  // Invoked when the caller discards the future: discards the promise
  // and terminates this process.
  void discarded()
  {
    promise.discard();
    process::terminate(this);
  }

  // Invoked when a `Res` message arrives: fulfils the promise with it
  // and terminates this process.
  void response(const Res& res)
  {
    promise.set(res);
    process::terminate(this);
  }

  // Target of the request.
  const process::UPID pid;

  // The request to send.
  const Req req;

  // Fulfilled with the response, or discarded.
  process::Promise<Res> promise;
};
// Allows you to describe request/response protocols and then use
// those for sending requests and getting back responses.
// Describes a request/response protocol: calling an instance with a
// target process and a request spawns a ReqResProcess that sends the
// request and returns a future for the response.
//
// Both `Req` and `Res` must be protocol buffer messages, i.e. pointers
// to them must implicitly convert to `google::protobuf::Message*`.
template <typename Req, typename Res>
struct Protocol
{
  // Sends `req` to `pid` and returns a future for the `Res` response.
  process::Future<Res> operator()(
      const process::UPID& pid,
      const Req& req) const
  {
    // Help debugging by adding some type constraints. These perform the
    // same pointer-conversion check as assigning a `Req*` / `Res*` to a
    // `google::protobuf::Message*`, but without introducing locals that
    // shadow the `req` parameter and with a readable diagnostic.
    static_assert(
        std::is_convertible<Req*, google::protobuf::Message*>::value,
        "Protocol: Req must be a google::protobuf::Message");
    static_assert(
        std::is_convertible<Res*, google::protobuf::Message*>::value,
        "Protocol: Res must be a google::protobuf::Message");

    // NOTE(review): the `true` passed to spawn presumably hands
    // ownership of the process to libprocess so it is deleted after
    // termination -- confirm against process::spawn.
    ReqResProcess<Req, Res>* reqres = new ReqResProcess<Req, Res>(pid, req);
    process::spawn(reqres, true);

    return process::dispatch(reqres, &ReqResProcess<Req, Res>::run);
  }
};
#endif // __PROCESS_PROTOBUF_HPP__