blob: fad5f73588bc72d673a4b878c97bb0f626fa52c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PAGESPEED_CONTROLLER_CENTRAL_CONTROLLER_RPC_CLIENT_H_
#define PAGESPEED_CONTROLLER_CENTRAL_CONTROLLER_RPC_CLIENT_H_
#include <memory>
#include <unordered_set>
#include "pagespeed/controller/central_controller.h"
#include "pagespeed/controller/controller.grpc.pb.h"
#include "pagespeed/controller/expensive_operation_callback.h"
#include "pagespeed/controller/schedule_rewrite_callback.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/util/grpc.h"
namespace net_instaweb {
// CentralController implementation that forwards all requests to a gRPC server.
// RewriteDrivers wait for the controller response (possibly detaching) before
// proceeding to rewrite. If the controller stops responding but requests keep
// coming in, we could keep creating RewriteDrivers indefinitely and eat all
// available memory. To guard against this we look at the number of outstanding
// gRPC requests. If that ever exceeds the max possible number, we declare the
// controller to have hung, cancel all outstanding requests and stop talking to
// it. We signal this via a statistic, so all processes can notice and do the
// same.
// gRPC-backed implementation of the CentralController interface. This header
// only declares the class; all member functions and the nested helper classes
// are defined elsewhere.
class CentralControllerRpcClient : public CentralController {
public:
// NOTE(review): presumably the name of the statistic that backs
// reconnect_time_ms_statistic_ -- confirm against the definition.
static const char kControllerReconnectTimeStatistic[];
// Reconnect delay in milliseconds; see ConsiderConnecting().
static const int kControllerReconnectDelayMs;
// Constructs a client for the controller at server_address.
// NOTE(review): panic_threshold presumably initializes
// controller_panic_threshold_, and the pointer arguments look like borrowed
// (non-owned) collaborators since they are stored as raw pointers -- confirm
// ownership and lifetime against the definition and callers.
CentralControllerRpcClient(const GoogleString& server_address,
int panic_threshold, ThreadSystem* thread_system,
Timer* timer, Statistics* statistics,
MessageHandler* handler);
~CentralControllerRpcClient() override;
// CentralController implementation.
void ScheduleExpensiveOperation(
ExpensiveOperationCallback* callback) override;
void ScheduleRewrite(ScheduleRewriteCallback* callback) override;
// Annotated LOCKS_EXCLUDED(mutex_): the caller must not already hold mutex_.
void ShutDown() override LOCKS_EXCLUDED(mutex_);
// NOTE(review): presumably registers the statistics this class uses (e.g.
// kControllerReconnectTimeStatistic) with stats -- confirm in the definition.
static void InitStats(Statistics* stats);
private:
// Connection state of this client, stored in state_.
enum State {
DISCONNECTED, // Temporary shutdown, can be revived.
RUNNING,
SHUTDOWN, // Permanent shutdown.
};
// Nested helper types, only forward-declared here; held via client_thread_
// and clients_ respectively.
class GrpcClientThread;
class ClientRegistry;
// Common code that checks shutdown status before creating a new ContextT.
// Annotated LOCKS_EXCLUDED(mutex_): the caller must not already hold mutex_.
template <typename ContextT, typename CallbackT>
void StartContext(CallbackT* callback) LOCKS_EXCLUDED(mutex_);
// If we're not connected and kControllerReconnectDelayMs has passed, attempt
// to reconnect. The caller must hold mutex_.
void ConsiderConnecting(int64 now_ms) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Have we passed reconnect_time_ms_ and reconnect_time_ms_statistic_?
// The caller must hold mutex_.
bool TimestampsAllowConnection(int64 now) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
ThreadSystem* thread_system_;
Timer* timer_;
// Protects the members annotated GUARDED_BY(mutex_) below.
std::unique_ptr<AbstractMutex> mutex_;
// TODO(cheesy): This should probably be guarded by mutex_ but I'm worried
// about deadlock in ShutDown() while the clients are draining. It *might*
// be OK because the lock you absolutely cannot hold is the one in the
// registry and that does get released.
std::unique_ptr<ClientRegistry> clients_;
MessageHandler* handler_;
// Current connection state; see enum State.
State state_ GUARDED_BY(mutex_);
// NOTE(review): presumably the outstanding-request count above which the
// controller is declared hung -- confirm against the definition.
const int controller_panic_threshold_;
// Process-local reconnect timestamp in milliseconds, consulted by
// TimestampsAllowConnection().
int64 reconnect_time_ms_ GUARDED_BY(mutex_);
// Statistic-backed counterpart of reconnect_time_ms_, also consulted by
// TimestampsAllowConnection().
UpDownCounter* reconnect_time_ms_statistic_;
// gRPC plumbing: completion queue, channel and service stub.
std::unique_ptr<::grpc::CompletionQueue> queue_;
std::shared_ptr<::grpc::ChannelInterface> channel_;
std::unique_ptr<CentralControllerRpcService::Stub> stub_;
// This must be last so that it's destructed first. (Non-static data members
// are destroyed in reverse declaration order, so do not reorder the members
// above relative to this one.)
std::unique_ptr<GrpcClientThread> client_thread_ GUARDED_BY(mutex_);
DISALLOW_COPY_AND_ASSIGN(CentralControllerRpcClient);
};
} // namespace net_instaweb
#endif // PAGESPEED_CONTROLLER_CENTRAL_CONTROLLER_RPC_CLIENT_H_