// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)
#include "pagespeed/controller/central_controller_rpc_client.h"
#include <utility>
#include "base/logging.h"
#include "pagespeed/controller/central_controller_rpc_server.h"
#include "pagespeed/controller/context_registry.h"
#include "pagespeed/controller/expensive_operation_rpc_context.h"
#include "pagespeed/controller/schedule_rewrite_rpc_context.h"
#include "pagespeed/kernel/base/thread.h"
namespace net_instaweb {
// We may log a message each time this delay expires, so don't make it
// too short.
const int CentralControllerRpcClient::kControllerReconnectDelayMs =
5 * Timer::kMinuteMs;
const char CentralControllerRpcClient::kControllerReconnectTimeStatistic[] =
"central-controller-reconnect-time-ms";
class CentralControllerRpcClient::GrpcClientThread
: public ThreadSystem::Thread {
public:
explicit GrpcClientThread(ThreadSystem* thread_system)
: Thread(thread_system, "central_controller_client",
ThreadSystem::kJoinable) {}
~GrpcClientThread() override {
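// Shut down the queue so MainLoop drains any remaining events and
// returns; only then is it safe to join the thread.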
queue_.Shutdown();
if (this->Started()) {
this->Join();
}
}
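
// CompletionQueue used by all of this client's RPC contexts.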
::grpc::CompletionQueue* queue() { return &queue_; }
private:
void Run() override { CentralControllerRpcServer::MainLoop(&queue_); }
::grpc::CompletionQueue queue_;
DISALLOW_COPY_AND_ASSIGN(GrpcClientThread);
};

// Adapt ContextRegistry to ClientContext::GlobalCallbacks, which are a
// convenient way of hooking up the registry.
class CentralControllerRpcClient::ClientRegistry
: public ::grpc::ClientContext::GlobalCallbacks {
public:
explicit ClientRegistry(ThreadSystem* thread_system)
: thread_system_(thread_system),
registry_(new ContextRegistry<::grpc::ClientContext>(thread_system)) {}
// Called whenever a ClientContext is created.
void DefaultConstructor(::grpc::ClientContext* context) override {
CHECK(registry_->TryRegisterContext(context));
}
// Called whenever a ClientContext is destroyed.
void Destructor(::grpc::ClientContext* context) override {
registry_->RemoveContext(context);
}
void CancelAllActive() { registry_->CancelAllActive(); }
void CancelAllActiveAndWait() { registry_->CancelAllActiveAndWait(); }
// When re-connecting we need to "un-shutdown" the registry. We can't call
// SetGlobalCallbacks() more than once and replace the pointer, so instead we
// swap out the inner pointer. Bad things will happen if there are things
// in the registry when this is called, so it's important that something
// external is preventing clients from starting while you call this
// (in this case it's state_ != RUNNING).
void ReviveAfterShutdown() {
std::unique_ptr<ContextRegistry<::grpc::ClientContext>> registry(
new ContextRegistry<::grpc::ClientContext>(thread_system_));
registry_.swap(registry);
CHECK(registry->Empty());
}
// These are also not locked, so if you plan to rely on the result you need
// some external assurance that the value hasn't changed in a way that
// matters.
int Size() const { return registry_->Size(); }
bool Empty() const { return registry_->Empty(); }
private:
ThreadSystem* thread_system_;
std::unique_ptr<ContextRegistry<::grpc::ClientContext>> registry_;
};

CentralControllerRpcClient::CentralControllerRpcClient(
const GoogleString& server_address, int max_outstanding_requests,
ThreadSystem* thread_system, Timer* timer, Statistics* statistics,
MessageHandler* handler)
: thread_system_(thread_system),
timer_(timer),
mutex_(thread_system_->NewMutex()),
clients_(new ClientRegistry(thread_system_)),
handler_(handler),
state_(DISCONNECTED),
// Fudge max_outstanding_requests a bit, just in case we're
// single-process. We'd rather not panic unnecessarily.
controller_panic_threshold_(max_outstanding_requests + 10),
reconnect_time_ms_(0),
reconnect_time_ms_statistic_(
statistics->GetUpDownCounter(kControllerReconnectTimeStatistic)),
channel_(::grpc::CreateChannel(server_address,
::grpc::InsecureChannelCredentials())),
stub_(grpc::CentralControllerRpcService::NewStub(channel_)) {
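// Register our ClientRegistry to observe every ClientContext created in
// this process. Note that gRPC only lets SetGlobalCallbacks() be called
// once.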
::grpc::ClientContext::SetGlobalCallbacks(clients_.get());
{
ScopedMutex lock(mutex_.get());
ConsiderConnecting(timer_->NowMs());
// If someone already detected that the controller is stalled, we may not
// connect and be left DISCONNECTED without a client_thread_.
}
}

CentralControllerRpcClient::~CentralControllerRpcClient() {
ShutDown();
// It's not possible to clear the GlobalCallbacks here, but since
// ShutDown prevents further clients from registering, that ought to
// be OK.
// TODO(cheesy): A better solution would be to install a static object for the
// handler. It could call back into "this" only while it has a non-null
// pointer, which would be cleared right here. The above remains true, though.
}

void CentralControllerRpcClient::InitStats(Statistics* statistics) {
statistics->AddUpDownCounter(kControllerReconnectTimeStatistic);
}

void CentralControllerRpcClient::ShutDown() {
{
ScopedMutex lock(mutex_.get());
if (state_ == SHUTDOWN) {
return;
}
// This will reject all further requests.
state_ = SHUTDOWN;
}
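// Cancel all outstanding RPCs and wait for them to finish; new requests
// are already being rejected because state_ is SHUTDOWN.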
clients_->CancelAllActiveAndWait();
{
ScopedMutex lock(mutex_.get());
CHECK_EQ(state_, SHUTDOWN);
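// Deleting the thread shuts down its CompletionQueue and joins it.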
client_thread_.reset();
}
}

bool CentralControllerRpcClient::TimestampsAllowConnection(int64 now_ms) {
// At server startup, both the local variable and the statistic will be 0.
// At process startup, the statistic will be non-zero if someone else detected
// that the controller is not responding. In the event of a local problem,
// the local-only reconnect_time_ms_ is advanced. If any worker detects the
// controller is in trouble, they advance the statistic. For a connection to
// be allowed, now must be >= both the local timestamp and the statistic.
// Local reconnect_time_ms_ takes precedence over the statistic. Only once
// that has expired do we consult the statistic.
if (now_ms >= reconnect_time_ms_) {
// The local timestamp has expired, so adopt the shared statistic as the
// new local value; this also saves re-reading the statistic every call.
reconnect_time_ms_ = reconnect_time_ms_statistic_->Get();
}
return now_ms >= reconnect_time_ms_;
}

void CentralControllerRpcClient::ConsiderConnecting(
int64 now_ms) /* EXCLUSIVE_LOCKS_REQUIRED(mutex_) */ {
// Reconnect if the time to connect has passed and we're not connected or
// shutdown.
if (state_ == DISCONNECTED && TimestampsAllowConnection(now_ms)) {
if (!clients_->Empty()) {
// There should be no clients in the registry now: clients can only be
// added while state_ is RUNNING, and we hold the lock on state_, so
// nothing can be added until we return.
#ifndef NDEBUG
handler_->Message(kError, "clients_ not empty for reconnect!");
#endif
reconnect_time_ms_ = now_ms + 5 * Timer::kSecondMs;
return;
}
std::unique_ptr<GrpcClientThread> thread(
new GrpcClientThread(thread_system_));
// At startup we CHECK-fail if a thread won't start, but aborting is
// probably not OK here. Start() should rarely fail anyway.
if (thread->Start()) {
clients_->ReviveAfterShutdown();
client_thread_ = std::move(thread);
state_ = RUNNING;
} else {
handler_->Message(
kError, "Couldn't start thread for talking to the controller!");
// This is the local variable, not the statistic. We don't want to
// force everyone to reconnect just because we had trouble starting a
// thread.
reconnect_time_ms_ = now_ms + kControllerReconnectDelayMs;
}
}
}
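
// Attempt to start an RPC transaction of type ContextT for callback,
// (re)connecting to the controller first if appropriate. On success the
// ContextT owns the transaction and deletes itself when done; otherwise
// the callback is cancelled, and if the controller appears to be wedged
// we disconnect until the reconnect timestamps allow another attempt.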
template <typename ContextT, typename CallbackT>
void CentralControllerRpcClient::StartContext(CallbackT* callback) {
bool shutdown_required = false;
int64 now_ms = timer_->NowMs();
{
ScopedMutex lock(mutex_.get());
ConsiderConnecting(now_ms);
if (state_ == RUNNING) {
CHECK(client_thread_ != nullptr);
if (!TimestampsAllowConnection(now_ms)) {
// Someone else (another thread or process) detected that the
// controller is not responding. Kill the client thread.
shutdown_required = true;
} else if (clients_->Size() > controller_panic_threshold_) {
// We've accumulated a crazy number of gRPC clients in the registry.
// It looks like the controller isn't responding and we're just piling
// up detached RewriteDrivers.
handler_->Message(
kError,
"The central controller isn't responding, "
"stopping image rewrites for %d seconds.",
static_cast<int>(kControllerReconnectDelayMs / Timer::kSecondMs));
// Tell everyone else to stop talking to the controller, too.
reconnect_time_ms_statistic_->Set(now_ms + kControllerReconnectDelayMs);
shutdown_required = true;
} else {
// Starts the transaction and deletes itself when done.
new ContextT(stub_.get(), client_thread_->queue(), thread_system_,
handler_, callback);
return; // Do not fall through, as callback will be canceled!
}
if (shutdown_required) {
// Stop further requests. We must do this before releasing the lock.
state_ = DISCONNECTED;
}
}
}
// Someone noticed that the controller is in trouble, so flush all outstanding
// requests to it.
if (shutdown_required) {
// We can't use CancelAllActiveAndWait here, because the thread calling
// us is the same one that needs to process the cancellations. However,
// nothing should be *added* to clients_, because state_ was set to
// DISCONNECTED above.
clients_->CancelAllActive();
}
callback->CallCancel();
}
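
// Public entry points; each binds the matching RPC context type. The
// callback is either taken over by the RPC context or cancelled via
// CallCancel(). Usage sketch (hypothetical; MyExpensiveOperationCallback
// stands in for a caller-defined ExpensiveOperationCallback subclass, not
// part of this file):
//   client->ScheduleExpensiveOperation(new MyExpensiveOperationCallback(...));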
void CentralControllerRpcClient::ScheduleExpensiveOperation(
ExpensiveOperationCallback* callback) {
StartContext<ExpensiveOperationRpcContext>(callback);
}

void CentralControllerRpcClient::ScheduleRewrite(
ScheduleRewriteCallback* callback) {
StartContext<ScheduleRewriteRpcContext>(callback);
}

} // namespace net_instaweb