blob: 2a07c92bcaa4f9f7d9a3b6db7e27ef6b030ef078 [file] [log] [blame]
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)
#include <memory>
#include "base/logging.h"
#include "pagespeed/controller/controller.pb.h"
#include "pagespeed/controller/controller_grpc_mocks.h"
#include "pagespeed/controller/schedule_rewrite_rpc_context.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/gmock.h"
#include "pagespeed/kernel/base/gtest.h"
#include "pagespeed/kernel/base/proto_matcher.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/message_handler_test_base.h"
#include "pagespeed/kernel/thread/queued_worker_pool.h"
#include "pagespeed/kernel/thread/sequence.h"
#include "pagespeed/kernel/thread/worker_test_base.h"
#include "pagespeed/kernel/util/grpc.h"
#include "pagespeed/kernel/util/platform.h"
using testing::_;
using testing::Eq;
using testing::Invoke;
using testing::InvokeWithoutArgs;
using testing::IsEmpty;
using testing::HasSubstr;
using testing::Not;
using testing::WithArgs;
using testing::Return;
using testing::SetArgPointee;
namespace net_instaweb {
namespace {
// Mock reader/writer specialized for the request and response protos of the
// ScheduleRewrite RPC; the tests below script its Write/Read/Finish calls.
using MockReaderWriter =
    MockReaderWriterT<ScheduleRewriteRequest, ScheduleRewriteResponse>;
// gMock-backed ScheduleRewriteCallback. RunImpl and CancelImpl are mocked so
// that each test can state which of them it expects to be invoked.
class MockScheduleRewriteCallback : public ScheduleRewriteCallback {
 public:
  // Forwards key and s to ScheduleRewriteCallback and installs baseline
  // expectations that neither RunImpl nor CancelImpl is called. A test that
  // anticipates one of them adds its own EXPECT_CALL afterwards; gMock
  // consults the most recently set matching expectation first.
  MockScheduleRewriteCallback(const GoogleString& key, Sequence* s)
      : ScheduleRewriteCallback(key, s) {
    EXPECT_CALL(*this, RunImpl(_)).Times(0);
    EXPECT_CALL(*this, CancelImpl()).Times(0);
  }

  MOCK_METHOD1(RunImpl, void(scoped_ptr<ScheduleRewriteContext>* context));
  MOCK_METHOD0(CancelImpl, void());
};
// Fixture shared by every test in this file. It owns a thread system, a
// two-worker pool, one sequence taken from that pool, a mock RPC stub bound to
// that sequence, and a message handler whose captured messages the tests
// inspect.
class ScheduleRewriteRpcContextTest : public testing::Test {
 public:
  ScheduleRewriteRpcContextTest()
      : thread_system_(Platform::CreateThreadSystem()),
        worker_(2 /* max_workers */, "schedule_rewrite_operation_test",
                thread_system_.get()),
        sequence_(worker_.NewSequence()),
        stub_(sequence_) {}

  // Hands the sequence obtained in the constructor back to the pool.
  ~ScheduleRewriteRpcContextTest() {
    worker_.FreeSequence(sequence_);
  }

  // Arms the stub to hand out rw for the next ScheduleRewrite call, then
  // creates the context under test with cb as its callback.
  void StartRpcContext(MockReaderWriter* rw,
                       MockScheduleRewriteCallback* cb) {
    stub_.ExpectAsyncScheduleRewrite(rw);
    // The context is deliberately not retained; per the original author's
    // note it cleans up after itself.
    new ScheduleRewriteRpcContext(&stub_, nullptr /* queue */,
                                  thread_system_.get(), &handler_, cb);
  }

  // gMock action for RunImpl: detaches the context from *ctx and queues
  // CallSuccessAndDelete for it on sequence_.
  void ScheduleCallSuccessAndDelete(
      scoped_ptr<ScheduleRewriteContext>* ctx) {
    ScheduleRewriteContext* detached = ctx->release();
    sequence_->Add(MakeFunction(
        this, &ScheduleRewriteRpcContextTest::CallSuccessAndDelete, detached));
  }

  // Marks context as succeeded and then destroys it.
  void CallSuccessAndDelete(ScheduleRewriteContext* context) {
    context->MarkSucceeded();
    delete context;
  }

  // gMock action for RunImpl: detaches the context from *ctx and queues
  // CallFailedAndDelete for it on sequence_.
  void ScheduleCallFailedAndDelete(
      scoped_ptr<ScheduleRewriteContext>* ctx) {
    ScheduleRewriteContext* detached = ctx->release();
    sequence_->Add(MakeFunction(
        this, &ScheduleRewriteRpcContextTest::CallFailedAndDelete, detached));
  }

  // Marks context as failed and then destroys it.
  void CallFailedAndDelete(ScheduleRewriteContext* context) {
    context->MarkFailed();
    delete context;
  }

  // Sets the Finish expectation on rw so that it yields status. When sync is
  // non-null it is also notified via ExpectFinishAndNotify; otherwise plain
  // ExpectFinish is used.
  //
  // In builds without NDEBUG a non-OK status is rewritten to OK (keeping its
  // error message) before being installed; see the comment in the body.
  void ExpectFinishWithDebugHack(MockReaderWriter* rw, ::grpc::Status status,
                                 WorkerTestBase::SyncPoint* sync) {
#ifndef NDEBUG
    if (!status.ok()) {
      // Admittedly ugly workaround. The code under test calls LOG(DFATAL)
      // when error_code != OK, and neither EXPECT_DEATH nor EXPECT_DFATAL
      // could be made to work here because of the threads involved. Debug
      // builds therefore steer the tests away from the DFATAL path.
      LOG(WARNING) << "Squashing gRPC error status " << status.error_code()
                   << " to OK. Consider re-running this test under opt.";
      status = ::grpc::Status(::grpc::StatusCode::OK, status.error_message());
    }
#endif
    if (sync == nullptr) {
      rw->ExpectFinish(status);
      return;
    }
    rw->ExpectFinishAndNotify(status, sync);
  }

 protected:
  // Declaration order matters: each member below is initialized from the
  // ones above it in the constructor's initializer list.
  std::unique_ptr<ThreadSystem> thread_system_;
  QueuedWorkerPool worker_;
  QueuedWorkerPool::Sequence* sequence_;  // Released in the destructor.
  MockCentralControllerRpcServiceStub stub_;
  TestMessageHandler handler_;
};
// Happy path in which the mocked RunImpl takes no action on the context it is
// handed; the test still expects SUCCESS to be reported to the server.
TEST_F(ScheduleRewriteRpcContextTest, SuccessfulRequest) {
  WorkerTestBase::SyncPoint finished(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("a", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"a\""));
    // Step 2: the context reads the server's answer, which grants permission.
    stream->ExpectRead("ok_to_proceed: true");
    // Step 3: permission granted, so Run is invoked on the callback.
    EXPECT_CALL(*callback, RunImpl(_)).Times(1);
    // Step 4: once the callback is done, the outcome is written to the
    // server.
    stream->ExpectWrite(EqualsProto("status: SUCCESS"));
    // Step 5: Finish is called; its completion releases the test thread.
    stream->ExpectFinishAndNotify(::grpc::Status(), &finished);
  }
  StartRpcContext(stream, callback);
  finished.Wait();
}
// Like SuccessfulRequest, except that RunImpl detaches the context and it is
// explicitly marked as succeeded from a function queued on sequence_.
TEST_F(ScheduleRewriteRpcContextTest, SuccessfulRequestWithExplicitSuccess) {
  WorkerTestBase::SyncPoint finished(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("b", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"b\""));
    // Step 2: the context reads the server's answer, which grants permission.
    stream->ExpectRead("ok_to_proceed: true");
    // Step 3: Run is invoked on the callback. The action releases the context
    // passed to Run and queues a follow-up that calls MarkSucceeded() on it
    // before deleting it.
    EXPECT_CALL(*callback, RunImpl(_))
        .WillOnce(Invoke(
            this,
            &ScheduleRewriteRpcContextTest::ScheduleCallSuccessAndDelete));
    // Step 4: the successful outcome is written to the server.
    stream->ExpectWrite(EqualsProto("status: SUCCESS"));
    // Step 5: Finish is called; its completion releases the test thread.
    stream->ExpectFinishAndNotify(::grpc::Status(), &finished);
  }
  StartRpcContext(stream, callback);
  finished.Wait();
}
// The controller grants permission, but the rewrite itself is explicitly
// marked as failed, so FAILED must be reported to the server.
TEST_F(ScheduleRewriteRpcContextTest, SuccessfulRequestWithExplicitFailure) {
  WorkerTestBase::SyncPoint finished(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("b", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"b\""));
    // Step 2: the context reads the server's answer, which grants permission.
    stream->ExpectRead("ok_to_proceed: true");
    // Step 3: Run is invoked on the callback. The action releases the context
    // passed to Run and queues a follow-up that calls MarkFailed() on it
    // before deleting it.
    EXPECT_CALL(*callback, RunImpl(_))
        .WillOnce(Invoke(
            this, &ScheduleRewriteRpcContextTest::ScheduleCallFailedAndDelete));
    // Step 4: the controller is told that the rewrite failed.
    stream->ExpectWrite(EqualsProto("status: FAILED"));
    // Step 5: Finish is called; its completion releases the test thread.
    stream->ExpectFinishAndNotify(::grpc::Status(), &finished);
  }
  StartRpcContext(stream, callback);
  finished.Wait();
}
// The server refuses permission, so the callback must be cancelled rather
// than run.
TEST_F(ScheduleRewriteRpcContextTest, UnsuccessfulRequest) {
  WorkerTestBase::SyncPoint cancelled(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("hello", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"hello\""));
    // Step 2: the context reads the server's answer, which denies permission.
    stream->ExpectRead("ok_to_proceed: false");
    // Step 3: permission denied, so Cancel is invoked on the callback; that
    // invocation is what releases the test thread.
    EXPECT_CALL(*callback, CancelImpl())
        .WillOnce(Invoke(&cancelled, &WorkerTestBase::SyncPoint::Notify));
    // There is intentionally no Finish expectation: on a "not OK" answer the
    // connection is simply closed without calling Finish.
  }
  StartRpcContext(stream, callback);
  cancelled.Wait();
}
// Initiating the RPC itself fails; the callback must be cancelled and a
// message must be reported through the handler.
TEST_F(ScheduleRewriteRpcContextTest, InitFailed) {
  WorkerTestBase::SyncPoint cancelled(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("key", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  // The stub still hands out the stream when the RpcContext initiates its
  // request, but then signals that gRPC initialization failed.
  stub_.ExpectAsyncScheduleRewriteFailure(stream);
  const ::grpc::Status hangup(::grpc::StatusCode::ABORTED, "hangup");
  ExpectFinishWithDebugHack(stream, hangup, nullptr /* notify */);
  // With the controller unreachable, Cancel is expected on the callback; that
  // invocation is what releases the test thread.
  EXPECT_CALL(*callback, CancelImpl())
      .WillOnce(Invoke(&cancelled, &WorkerTestBase::SyncPoint::Notify));
  // Kick off the operation. The context is not retained; per the original
  // author's note it cleans up after itself.
  new ScheduleRewriteRpcContext(&stub_, nullptr /* queue */,
                                thread_system_.get(), &handler_, callback);
  cancelled.Wait();
  ASSERT_THAT(handler_.messages(), Not(IsEmpty()));
#ifdef NDEBUG
  // Only checked under NDEBUG: otherwise ExpectFinishWithDebugHack has
  // replaced the ABORTED status with OK.
  EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup"));
#endif
}
// After a failed initialization, the Finish call fails as well; the callback
// must still be cancelled and a "Finish failed" message reported.
TEST_F(ScheduleRewriteRpcContextTest, FinishFailed) {
  WorkerTestBase::SyncPoint cancelled(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("key", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  // The stub still hands out the stream when the RpcContext initiates its
  // request, but then signals that gRPC initialization failed.
  stub_.ExpectAsyncScheduleRewriteFailure(stream);
  // The point of this test: Finish() itself is made to fail. It is unclear
  // how that could come about in practice.
  stream->ExpectFinishFailure();
  // With the controller unreachable, Cancel is expected on the callback; that
  // invocation is what releases the test thread.
  EXPECT_CALL(*callback, CancelImpl())
      .WillOnce(Invoke(&cancelled, &WorkerTestBase::SyncPoint::Notify));
  // Kick off the operation. The context is not retained; per the original
  // author's note it cleans up after itself.
  new ScheduleRewriteRpcContext(&stub_, nullptr /* queue */,
                                thread_system_.get(), &handler_, callback);
  cancelled.Wait();
  ASSERT_THAT(handler_.messages(), Not(IsEmpty()));
  EXPECT_THAT(handler_.messages().back(), HasSubstr("Finish failed"));
}
// The very first Write (the request carrying the key) fails; the callback
// must be cancelled and a message reported through the handler.
TEST_F(ScheduleRewriteRpcContextTest, FirstWriteFailed) {
  WorkerTestBase::SyncPoint cancelled(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("key", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening Write is attempted and reported as failed.
    stream->ExpectWriteFailure(EqualsProto("key: \"key\""));
    // Step 2: Finish is called and yields the "hangup" status.
    const ::grpc::Status hangup(::grpc::StatusCode::ABORTED, "hangup");
    ExpectFinishWithDebugHack(stream, hangup, nullptr /* notify */);
    // Step 3: with the controller unreachable, Cancel is expected on the
    // callback; that invocation is what releases the test thread.
    EXPECT_CALL(*callback, CancelImpl())
        .WillOnce(Invoke(&cancelled, &WorkerTestBase::SyncPoint::Notify));
  }
  StartRpcContext(stream, callback);
  cancelled.Wait();
  ASSERT_THAT(handler_.messages(), Not(IsEmpty()));
#ifdef NDEBUG
  // Only checked under NDEBUG: otherwise ExpectFinishWithDebugHack has
  // replaced the ABORTED status with OK.
  EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup"));
#endif
}
// The opening Write succeeds but the subsequent Read fails; the callback must
// be cancelled and a message reported through the handler.
TEST_F(ScheduleRewriteRpcContextTest, ReadFailed) {
  WorkerTestBase::SyncPoint cancelled(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("a", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"a\""));
    // Step 2: the gRPC Read call is reported as failed.
    stream->ExpectReadFailure();
    // Step 3: Finish is called and yields the "hangup" status.
    const ::grpc::Status hangup(::grpc::StatusCode::ABORTED, "hangup");
    ExpectFinishWithDebugHack(stream, hangup, nullptr /* notify */);
    // Step 4: with the controller unreachable, Cancel is expected on the
    // callback; that invocation is what releases the test thread.
    EXPECT_CALL(*callback, CancelImpl())
        .WillOnce(Invoke(&cancelled, &WorkerTestBase::SyncPoint::Notify));
  }
  StartRpcContext(stream, callback);
  cancelled.Wait();
  ASSERT_THAT(handler_.messages(), Not(IsEmpty()));
#ifdef NDEBUG
  // Only checked under NDEBUG: otherwise ExpectFinishWithDebugHack has
  // replaced the ABORTED status with OK.
  EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup"));
#endif
}
// Everything succeeds up to and including Run, but the Write that reports the
// outcome fails; a message must be reported through the handler.
TEST_F(ScheduleRewriteRpcContextTest, SecondWriteFailed) {
  WorkerTestBase::SyncPoint finished(thread_system_.get());
  MockScheduleRewriteCallback* const callback =
      new MockScheduleRewriteCallback("a", sequence_);
  MockReaderWriter* const stream = new MockReaderWriter(sequence_);
  {
    ::testing::InSequence ordered;
    // Step 1: the opening request carries the key to be rewritten.
    stream->ExpectWrite(EqualsProto("key: \"a\""));
    // Step 2: the context reads the server's answer, which grants permission.
    stream->ExpectRead("ok_to_proceed: true");
    // Step 3: permission granted, so Run is invoked on the callback.
    EXPECT_CALL(*callback, RunImpl(_)).Times(1);
    // Step 4: the Write reporting the outcome is attempted and reported as
    // failed.
    stream->ExpectWriteFailure(EqualsProto("status: SUCCESS"));
    // Step 5: Finish yields the "hangup" status and releases the test thread.
    const ::grpc::Status hangup(::grpc::StatusCode::ABORTED, "hangup");
    ExpectFinishWithDebugHack(stream, hangup, &finished);
  }
  StartRpcContext(stream, callback);
  finished.Wait();
  ASSERT_THAT(handler_.messages(), Not(IsEmpty()));
#ifdef NDEBUG
  // Only checked under NDEBUG: otherwise ExpectFinishWithDebugHack has
  // replaced the ABORTED status with OK.
  EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup"));
#endif
}
} // namespace
} // namespace net_instaweb