blob: 6657155a89397984217f03827cf9465903b8a3ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PAGESPEED_CONTROLLER_RPC_HANDLER_H_
#define PAGESPEED_CONTROLLER_RPC_HANDLER_H_
#include "base/logging.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/ref_counted_ptr.h"
#include "pagespeed/kernel/util/grpc.h"
namespace net_instaweb {
// Class that manages the server side of a single gRPC bi-directional streaming
// RPC call; the invocation of a single method on a stub. We use bi-directional
// streaming RPCs because they allow us to send multiple messages over a single
// session and detect if either end disconnects.
//
// RpcHandler should be bootstrapped by calling Start() on an instance, after
// which subsequent instances are automatically created as needed. The class
// cleans up after itself (ie: You call new without calling delete). This is
// modeled after CallData in gRPCs
// examples/cpp/helloworld/greeter_async_server.cc.
//
// Every message received from a client will result in a call to HandleRequest.
// If gRPC detects a problem with the client (Read/Write failure, disconnect)
// after the first call to HandleRequest() but before Finish() is called,
// HandleError() will be invoked.
//
// The class is expected to be manipulated on a single thread, so it should be
// considered thread compatible.
// Template args:
// AsyncService should be the AsyncService member of the service in your proto.
// For example, given:
// service SampleService {
// rpc SaveConfiguration(stream Req) returns(stream Resp) {}
// }
// the template should be <SampleService::AsyncService, Req, Resp>
template <typename AsyncService, typename RequestT, typename ResponseT>
class RpcHandler
: public RefCounted<RpcHandler<AsyncService, RequestT, ResponseT>> {
public:
virtual ~RpcHandler() {}
protected:
typedef ::grpc::ServerAsyncReaderWriter<ResponseT, RequestT> ReaderWriterT;
RpcHandler(AsyncService* service, ::grpc::ServerCompletionQueue* cq);
// Send a response to the client. Returns true if the Write was successfully
// queued or false if the message cannot be sent, which may be because the
// client is not connected or because there is already a message outstanding.
// Once the Write has queued, one of either HandleWriteDone() or HandleError()
// will be called, depending on success.
//
// The single queued write is a restriction of the gRPC ReaderWriter class.
// If desired, it would be simple to expand this class with a write queue.
bool Write(const ResponseT& resp);
// Disconnect the client. Returns true if the client was not already
// disconnected. HandleError will not be called after Finish.
bool Finish(const ::grpc::Status& status);
// Invokes InitResponder with the various arguments that are private
// members of this class. This should be called in a factory used to create
// the initial RpcHandler. RpcHandler will take care of calling it on all the
// subsequent ones made with CreateHandler().
void Start();
private:
enum State {
INIT,
WAITING_FOR_FIRST_READ,
RUNNING,
FINISHED,
};
typedef RefCountedPtr<RpcHandler<AsyncService, RequestT, ResponseT>> RefPtrT;
// Called once for every message received from the client.
virtual void HandleRequest(const RequestT& req) = 0;
// Called if any of the gRPC operations fail after the first call to
// HandeRequest, after which the client will be disconnected (ie: It will
// only be called once). 'this' may be freed immediately upon return, if there
// are no other references. This method is *not* called if the client
// disconnects after the call to Finish.
virtual void HandleError() = 0;
// Called when a Write completes successfully.
virtual void HandleWriteDone() {}
// Attempt to initiate a gRPC client session by calling the appropriate
// RequestXXXRpcMethodName on the AsyncService object. In the SampleService
// example above, this would call service->RequestSaveConfiguration(...).
// "tag" should be passed directly to the gRPC API and will be initialised to
// a callback that will invoke the success/failure handlers in this class.
virtual void InitResponder(AsyncService* service, ::grpc::ServerContext* ctx,
ReaderWriterT* responder,
::grpc::ServerCompletionQueue* cq, void* tag) = 0;
// Create a new one of "this", ie: Just chain to the constructor.
// This is boilerplate because RpcHandler can't directly call the
// constructor of the implmenting class.
virtual RpcHandler* CreateHandler(AsyncService* service,
::grpc::ServerCompletionQueue* cq) = 0;
// Internal callback handlers. These all expect to be passed a RefPtrT to
// ensure that "this" can't be deleted while gRPC operations are still
// pending.
void InitDone(RefPtrT r);
void AttemptRead(RefPtrT r);
void ReadDone(RefPtrT r);
void FinishDone(RefPtrT r);
void WriteDone(RefPtrT r);
void CallHandleError(RefPtrT r);
// Is the current state compatible with making a call to Write or Finish
// (Not on the public API since write_outstanding_ makes this confusing).
bool IsClientWriteable() const;
AsyncService* service_;
::grpc::ServerCompletionQueue* cq_;
::grpc::ServerContext ctx_;
ReaderWriterT responder_;
// We pretty much always have a Read() request outstanding and we need a
// place to store the result.
RequestT request_;
State state_;
bool write_outstanding_;
DISALLOW_COPY_AND_ASSIGN(RpcHandler);
};
template <typename AsyncService, typename RequestT, typename ResponseT>
RpcHandler<AsyncService, RequestT, ResponseT>::RpcHandler(
AsyncService* service, ::grpc::ServerCompletionQueue* cq)
: service_(service),
cq_(cq),
responder_(&ctx_),
state_(INIT),
write_outstanding_(false) {
// It would be nice to put the contents of Start() in this function,
// but InitResponder is pure virtual at this point, so we can't downcall
// into it.
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::Start() {
// RequestFoo should only fail if the service or queue have been shutdown.
// So we just hook this up to CallHandleError which will silently destroy
// "this" without calling HandleError(), because state_ is INIT.
InitResponder(service_, &ctx_, &responder_, cq_,
MakeFunction(this, &RpcHandler::InitDone,
&RpcHandler::CallHandleError, RefPtrT(this)));
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::InitDone(RefPtrT ref) {
CreateHandler(service_, cq_)->Start(); // Cleans up after itself.
if (state_ != FINISHED) {
state_ = WAITING_FOR_FIRST_READ;
// It's now safe for our subclass to invoke Write() or Finish(). At some
// point in the future we could implement a callback to signal that.
AttemptRead(ref);
} else {
// When state is FINISHED, we were shutdown before we started.
// Propagate that as CANCELLED to any client.
::grpc::Status status(::grpc::StatusCode::CANCELLED, "not started");
responder_.Finish(
status, MakeFunction(this, &RpcHandler::FinishDone,
&RpcHandler::CallHandleError, RefPtrT(this)));
}
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::AttemptRead(RefPtrT ref) {
if (state_ != WAITING_FOR_FIRST_READ) {
DCHECK_EQ(state_, RUNNING);
}
if (state_ != FINISHED) {
responder_.Read(&request_, MakeFunction(this, &RpcHandler::ReadDone,
&RpcHandler::CallHandleError, ref));
}
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::ReadDone(RefPtrT ref) {
if (state_ == WAITING_FOR_FIRST_READ) {
state_ = RUNNING;
}
HandleRequest(request_);
request_.Clear(); // Save a little memory.
if (state_ != FINISHED) {
AttemptRead(ref);
}
}
template <typename AsyncService, typename RequestT, typename ResponseT>
bool RpcHandler<AsyncService, RequestT, ResponseT>::Finish(
const ::grpc::Status& status) {
if (state_ == FINISHED) {
return false;
}
if (IsClientWriteable()) {
responder_.Finish(
status, MakeFunction(this, &RpcHandler::FinishDone,
&RpcHandler::CallHandleError, RefPtrT(this)));
}
state_ = FINISHED;
return true;
}
template <typename AsyncService, typename RequestT, typename ResponseT>
bool RpcHandler<AsyncService, RequestT, ResponseT>::Write(
const ResponseT& response) {
if (IsClientWriteable() && !write_outstanding_) {
write_outstanding_ = true;
responder_.Write(response,
MakeFunction(this, &RpcHandler::WriteDone,
&RpcHandler::CallHandleError, RefPtrT(this)));
return true;
} else {
return false;
}
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::WriteDone(RefPtrT ref) {
write_outstanding_ = false;
HandleWriteDone();
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::FinishDone(RefPtrT ref) {
DCHECK_EQ(state_, FINISHED);
// This discards ref, which may free "this".
}
template <typename AsyncService, typename RequestT, typename ResponseT>
void RpcHandler<AsyncService, RequestT, ResponseT>::CallHandleError(
RefPtrT ref) {
// If an error occurs before we've read a message or after a call to
// Finish, we don't report it downwards. Note that HandleError is
// also called indirectly via CallHandleErrorAsync.
if (state_ == RUNNING) {
HandleError();
}
state_ = FINISHED;
// This discards ref, which may free "this".
}
template <typename AsyncService, typename RequestT, typename ResponseT>
bool RpcHandler<AsyncService, RequestT, ResponseT>::IsClientWriteable() const {
return state_ == WAITING_FOR_FIRST_READ || state_ == RUNNING;
}
} // namespace net_instaweb
#endif // PAGESPEED_CONTROLLER_RPC_HANDLER_H_