blob: 75f0f7700b224e1241c426b220a22d35a7dd302a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pagespeed/controller/grpc_server_test.h"
#include <sys/stat.h>
#include <memory>
#include "base/logging.h"
#include "pagespeed/controller/central_controller_rpc_server.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/thread.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/util/grpc.h"
#include "pagespeed/kernel/util/platform.h"
#include <grpc++/alarm.h>
namespace net_instaweb {
// Fixture for testing gRPC servers. Contains an event dispatcher thread, plus
// some other bits you'll need.
GrpcServerTest::GrpcServerTest()
: thread_system_(Platform::CreateThreadSystem()) {}
GrpcServerTest::~GrpcServerTest() { StopServer(); }
void GrpcServerTest::SetUp() {
// This doesn't seem to be created automatically!
mkdir(GTestTempDir().c_str(), 0755);
::grpc::ServerBuilder builder;
int bound_port = 0;
builder.AddListeningPort(ServerAddress(), ::grpc::InsecureServerCredentials(),
&bound_port);
queue_ = builder.AddCompletionQueue();
RegisterServices(&builder);
server_ = builder.BuildAndStart();
CHECK(server_ != nullptr);
// Unix sockets are 1 on success. gRPC sets -1 on failure.
CHECK_GT(bound_port, 0);
server_thread_.reset(
new GrpcServerThread(queue_.get(), thread_system_.get()));
CHECK(server_thread_->Start());
}
void GrpcServerTest::StopServer() {
if (server_thread_ != nullptr) {
server_->Shutdown();
server_thread_->Stop();
server_thread_.reset();
server_.reset();
}
}
// Have the server thread call Run() (never Cancel()) on the supplied
// function, ASAP. This call is thread-safe.
void GrpcServerTest::QueueFunctionForServerThread(Function* func) {
// Function to schedule an alarm that invokes CallRun ASAP on a supplied
// function.
class DelayedCallFunction : public Function {
public:
DelayedCallFunction(ThreadSystem* thread_system,
::grpc::CompletionQueue* queue, Function* function)
: mutex_(thread_system->NewMutex()), func_(function) {
// The annotations don't require it, but it's important that we take
// a lock before creating the alarm; We can immediately get a callback
// on the other thread, before alarm_ is properly populated.
ScopedMutex lock(mutex_.get());
alarm_.reset(
new ::grpc::Alarm(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), this));
}
~DelayedCallFunction() override {}
void Run() override {
ScopedMutex lock(mutex_.get());
func_->CallRun();
}
void Cancel() override {
ScopedMutex lock(mutex_.get());
func_->CallRun();
}
private:
std::unique_ptr<AbstractMutex> mutex_;
Function* func_ GUARDED_BY(mutex_);
std::unique_ptr<::grpc::Alarm> alarm_ GUARDED_BY(mutex_);
DISALLOW_COPY_AND_ASSIGN(DelayedCallFunction);
};
// Schedule the call. This will clean itself up.
new DelayedCallFunction(thread_system_.get(), queue_.get(), func);
}
GoogleString GrpcServerTest::ServerAddress() const {
return StrCat("unix:", GTestTempDir(), "/grpc.sock");
}
GrpcServerTest::GrpcServerThread::GrpcServerThread(
::grpc::CompletionQueue* queue, ThreadSystem* thread_system)
: Thread(thread_system, "grpc_test_server", ThreadSystem::kJoinable),
queue_(queue) {}
GrpcServerTest::GrpcServerThread::~GrpcServerThread() {}
void GrpcServerTest::GrpcServerThread::Stop() {
queue_->Shutdown();
this->Join();
}
void GrpcServerTest::GrpcServerThread::Run() {
CentralControllerRpcServer::MainLoop(queue_);
}
} // namespace net_instaweb