blob: a7288fc774a6223ed4ca5aaaea5f5901ce003e8d [file] [log] [blame]
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)
#include "pagespeed/controller/grpc_server_test.h"
#include <sys/stat.h>
#include <memory>
#include "base/logging.h"
#include "pagespeed/controller/central_controller_rpc_server.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/thread.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/util/grpc.h"
#include "pagespeed/kernel/util/platform.h"
#include <grpc++/alarm.h>
namespace net_instaweb {
// Fixture for testing gRPC servers. Contains an event dispatcher thread, plus
// some other bits you'll need.
GrpcServerTest::GrpcServerTest()
: thread_system_(Platform::CreateThreadSystem()) {}
GrpcServerTest::~GrpcServerTest() { StopServer(); }
void GrpcServerTest::SetUp() {
// This doesn't seem to be created automatically!
mkdir(GTestTempDir().c_str(), 0755);
::grpc::ServerBuilder builder;
int bound_port = 0;
builder.AddListeningPort(ServerAddress(), ::grpc::InsecureServerCredentials(),
&bound_port);
queue_ = builder.AddCompletionQueue();
RegisterServices(&builder);
server_ = builder.BuildAndStart();
CHECK(server_ != nullptr);
// Unix sockets are 1 on success. gRPC sets -1 on failure.
CHECK_GT(bound_port, 0);
server_thread_.reset(
new GrpcServerThread(queue_.get(), thread_system_.get()));
CHECK(server_thread_->Start());
}
void GrpcServerTest::StopServer() {
if (server_thread_ != nullptr) {
server_->Shutdown();
server_thread_->Stop();
server_thread_.reset();
server_.reset();
}
}
// Have the server thread call Run() (never Cancel()) on the supplied
// function, ASAP. This call is thread-safe.
void GrpcServerTest::QueueFunctionForServerThread(Function* func) {
// Function to schedule an alarm that invokes CallRun ASAP on a supplied
// function.
class DelayedCallFunction : public Function {
public:
DelayedCallFunction(ThreadSystem* thread_system,
::grpc::CompletionQueue* queue, Function* function)
: mutex_(thread_system->NewMutex()), func_(function) {
// The annotations don't require it, but it's important that we take
// a lock before creating the alarm; We can immediately get a callback
// on the other thread, before alarm_ is properly populated.
ScopedMutex lock(mutex_.get());
alarm_.reset(
new ::grpc::Alarm(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), this));
}
~DelayedCallFunction() override {}
void Run() override {
ScopedMutex lock(mutex_.get());
func_->CallRun();
}
void Cancel() override {
ScopedMutex lock(mutex_.get());
func_->CallRun();
}
private:
std::unique_ptr<AbstractMutex> mutex_;
Function* func_ GUARDED_BY(mutex_);
std::unique_ptr<::grpc::Alarm> alarm_ GUARDED_BY(mutex_);
DISALLOW_COPY_AND_ASSIGN(DelayedCallFunction);
};
// Schedule the call. This will clean itself up.
new DelayedCallFunction(thread_system_.get(), queue_.get(), func);
}
GoogleString GrpcServerTest::ServerAddress() const {
return StrCat("unix:", GTestTempDir(), "/grpc.sock");
}
GrpcServerTest::GrpcServerThread::GrpcServerThread(
::grpc::CompletionQueue* queue, ThreadSystem* thread_system)
: Thread(thread_system, "grpc_test_server", ThreadSystem::kJoinable),
queue_(queue) {}
GrpcServerTest::GrpcServerThread::~GrpcServerThread() {}
void GrpcServerTest::GrpcServerThread::Stop() {
queue_->Shutdown();
this->Join();
}
void GrpcServerTest::GrpcServerThread::Run() {
CentralControllerRpcServer::MainLoop(queue_);
}
} // namespace net_instaweb