// blob: e9a8a3e9109c88e47a3d0134336b3eec36b2364e [file] [log] [blame]
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)
#ifndef PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
#define PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
#include <memory>
#include "base/logging.h"
#include "pagespeed/controller/controller.grpc.pb.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/util/grpc.h"
// RequestResultRpcClient manages the client portion of a gRPC connection. It is
// the client-side counterpart to RequestResultRpcHandler. See class comments
// below.
namespace net_instaweb {
// Holder of various bits of client-side gRPC stuff. Primarily exists
// to allow a blocking call to something like Done() into an async call to
// the server.
template <typename ReaderWriter>
class RpcHolder {
public:
RpcHolder(MessageHandler* handler) : handler_(handler) {}
// Takes ownership of "this", ie: You must remove the RpcHolder from any smart
// pointers because it's now going to delete itself.
// You should only call one of CallbackForAsyncCleanup() or Finish().
Function* CallbackForAsyncCleanup() {
return MakeFunction(this, &RpcHolder::Finish, &RpcHolder::Error);
}
// Takes ownership of "this", ie: You must remove the RpcHolder from any smart
// pointers because it's now going to delete itself.
// You should only call one of CallbackForAsyncCleanup() or Finish().
void Finish() {
rw()->Finish(&status_, MakeFunction(this, &RpcHolder::FinishSucceeded,
&RpcHolder::FinishFailed));
}
::grpc::ClientContext* context() { return &context_; }
MessageHandler* handler() { return handler_; }
ReaderWriter* rw() { return rw_.get(); }
void SetReaderWriter(std::unique_ptr<ReaderWriter> rw) {
DCHECK(rw_ == nullptr);
rw_ = std::move(rw);
}
private:
void FinishSucceeded() {
// OK and CANCELLED are expected error codes, don't bother to log them.
if (status_.error_code() != ::grpc::StatusCode::OK &&
status_.error_code() != ::grpc::StatusCode::CANCELLED) {
MessageType severity =
#ifndef NDEBUG
// ABORTED is generated by state machine errors on the server side,
// so use kFatal for that in debug builds.
(status_.error_code() == ::grpc::StatusCode::ABORTED) ? kFatal :
#endif
kWarning;
handler_->Message(severity,
"Received error status from CentralController: %d (%s)",
status_.error_code(), status_.error_message().c_str());
}
delete this;
}
void FinishFailed() {
PS_LOG_WARN(handler_, "RpcHolder Finish failed");
delete this;
}
void Error() {
PS_LOG_WARN(handler_, "RpcHolder cleanup to CentralController failed");
Finish(); // We'd still like to see the error status. Will delete us.
}
MessageHandler* handler_;
::grpc::ClientContext context_;
std::unique_ptr<ReaderWriter> rw_;
::grpc::Status status_;
};
// This is intended for use as the Context for a subclass of
// CentralControllerCallback (like ExpensiveOperationCallback or
// ScheduleRewriteCallback). Unfortunately it can't literally be the
// implementation of the Context due to double-inheritance, instead you probably
// want to put a subclass of this inside the Context and delegate to it. See
// ExpensiveOperationRpcContext for an example of this.
//
// Subclassers probably should call Start() right after they create an instace
// of this. Start() will trigger a series of gRPC calls to the server and
// eventually call either Run or Cancel on the callback supplied to the
// constructor. If Run is called, do your work and then call
// SendResultToServer() to let it know you are done. If Cancel() is called, do
// not do the work.
template <typename RequestT, typename ResponseT, typename CallbackT>
class RequestResultRpcClient {
public:
typedef ::grpc::ClientAsyncReaderWriterInterface<RequestT, ResponseT>
ReaderWriter;
RequestResultRpcClient(
::grpc::CompletionQueue* queue, ThreadSystem* thread_system,
MessageHandler* handler, CallbackT* callback)
: mutex_(thread_system->NewMutex()),
queue_(queue),
callback_(callback),
rpc_(new RpcHolder<ReaderWriter>(handler)) {
CHECK(callback_ != nullptr);
}
virtual ~RequestResultRpcClient() {
// The child's destructor should probably call SendResultToServerIfActive
// to ensure proper cleanup happens. This isn't strictly required as the
// server will notice the hangup and handle it gracefully, but it may log
// errors in the process.
}
// Actually start the RPC by having the client call RequestFoo on the stub.
void Start(grpc::CentralControllerRpcService::StubInterface* stub) {
ScopedMutex lock(mutex_.get());
rpc_->SetReaderWriter(
StartRpc(stub, rpc_->context(), queue_,
MakeFunction(this, &RequestResultRpcClient::BootStrapFinished,
&RequestResultRpcClient::StartUpFailed)));
}
// Call this once your client has completed their work, ie: they call
// something like Done(), or at context destruction. You may call this method
// as many times as you like, but only the first one will actually do
// anything.
void SendResultToServer(const RequestT& response) {
ScopedMutex lock(mutex_.get());
if (rpc_ == nullptr) {
return;
}
// Detach the rpc context stuff and kick off the last Write() to the
// server to let it know we're done. We do this "detached" because it's
// very common that this method is invoked through the destructor of the
// context, which shouldn't block. The AsyncCleanup callback just makes
// sure the message was sent to the server and logs an error if not.
RpcHolder<ReaderWriter>* rpc = rpc_.release();
rpc->rw()->Write(response, rpc->CallbackForAsyncCleanup());
}
private:
// Delegate for the client to call AsyncFoo for the appropriate RPC on the
// stub.
virtual std::unique_ptr<ReaderWriter> StartRpc(
grpc::CentralControllerRpcService::StubInterface* stub,
::grpc::ClientContext* context, ::grpc::CompletionQueue* queue,
void* tag) = 0;
// Populate the initial RequestT that is sent to the server. This is the
// message that is used to request work. The server's response will be
// communicated back by a call to either Run() or Cancel() on the callback
// that was supplied in the constructor.
virtual void PopulateServerRequest(RequestT* request) = 0;
// Handler for Start() success, above.
void BootStrapFinished() {
ScopedMutex lock(mutex_.get());
RequestT req;
PopulateServerRequest(&req);
rpc_->rw()->Write(
req,
MakeFunction(this, &RequestResultRpcClient::WriteServerRequestComplete,
&RequestResultRpcClient::StartUpFailed));
}
void WriteServerRequestComplete() {
ScopedMutex lock(mutex_.get());
rpc_->rw()->Read(
&resp_, MakeFunction(
this, &RequestResultRpcClient::NotifyClientOfServerDecision,
&RequestResultRpcClient::StartUpFailed));
}
void NotifyClientOfServerDecision() {
ScopedMutex lock(mutex_.get());
DCHECK(rpc_ != nullptr);
// This could delegate to the subclass, but we're already relying on the
// fact that this boolean has the same name on the server side.
bool ok_to_proceed = resp_.ok_to_proceed();
resp_.Clear();
CallbackT* cb = callback_;
callback_ = nullptr;
if (ok_to_proceed) {
lock.Release();
cb->CallRun();
return;
// User will call back into us via SendResultToServer() at some point.
} else {
// Terminate session and disable calls to SendResultToServer.
rpc_.reset();
lock.Release();
cb->CallCancel();
return;
}
}
// Handler for any errors up until the point NotifyClientOfServerDecision is
// called.
void StartUpFailed() LOCKS_EXCLUDED(mutex_) {
ScopedMutex lock(mutex_.get());
DCHECK(rpc_ != nullptr);
if (rpc_ != nullptr) {
PS_LOG_WARN(rpc_->handler(),
"Couldn't get response from CentralController");
// Detach the rpc and call Finish to get (log) the error code in the
// background. It is possible to avoid detaching if we delay the callback
// until Finish completes. However, there's very little advantage to that,
// and this means we can just use the same Finish() handler in the
// RpcHolder.
RpcHolder<ReaderWriter>* rpc = rpc_.release();
rpc->Finish();
Function* cb = callback_;
callback_ = nullptr;
lock.Release();
cb->CallCancel();
return;
}
}
std::unique_ptr<AbstractMutex> mutex_;
::grpc::CompletionQueue* queue_;
CallbackT* callback_ GUARDED_BY(mutex_);
std::unique_ptr<RpcHolder<ReaderWriter>> rpc_ GUARDED_BY(mutex_);
ResponseT resp_ GUARDED_BY(mutex_);
};
} // namespace net_instaweb
#endif // PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_