|  | // Copyright 2016 Google Inc. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  | // | 
|  | // Author: cheesy@google.com (Steve Hill) | 
|  |  | 
|  | #include <memory> | 
|  |  | 
|  | #include "base/logging.h" | 
|  | #include "pagespeed/controller/controller.pb.h" | 
|  | #include "pagespeed/controller/controller_grpc_mocks.h" | 
|  | #include "pagespeed/controller/expensive_operation_rpc_context.h" | 
|  | #include "pagespeed/kernel/base/function.h" | 
|  | #include "pagespeed/kernel/base/gmock.h" | 
|  | #include "pagespeed/kernel/base/gtest.h" | 
|  | #include "pagespeed/kernel/base/proto_matcher.h" | 
|  | #include "pagespeed/kernel/base/scoped_ptr.h" | 
|  | #include "pagespeed/kernel/base/string.h" | 
|  | #include "pagespeed/kernel/base/string_util.h" | 
|  | #include "pagespeed/kernel/base/message_handler_test_base.h" | 
|  | #include "pagespeed/kernel/thread/queued_worker_pool.h" | 
|  | #include "pagespeed/kernel/thread/sequence.h" | 
|  | #include "pagespeed/kernel/thread/worker_test_base.h" | 
|  | #include "pagespeed/kernel/util/grpc.h" | 
|  | #include "pagespeed/kernel/util/platform.h" | 
|  |  | 
|  | using testing::_; | 
|  | using testing::Eq; | 
|  | using testing::Invoke; | 
|  | using testing::InvokeWithoutArgs; | 
|  | using testing::IsEmpty; | 
|  | using testing::HasSubstr; | 
|  | using testing::Not; | 
|  | using testing::WithArgs; | 
|  | using testing::Return; | 
|  | using testing::SetArgPointee; | 
|  |  | 
|  | namespace net_instaweb { | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | typedef MockReaderWriterT<ScheduleExpensiveOperationRequest, | 
|  | ScheduleExpensiveOperationResponse> | 
|  | MockReaderWriter; | 
|  |  | 
|  | class MockExpensiveOperationCallback : public ExpensiveOperationCallback { | 
|  | public: | 
|  | MockExpensiveOperationCallback(Sequence* s) : ExpensiveOperationCallback(s) { | 
|  | EXPECT_CALL(*this, RunImpl(_)).Times(0); | 
|  | EXPECT_CALL(*this, CancelImpl()).Times(0);; | 
|  | } | 
|  |  | 
|  | MOCK_METHOD1(RunImpl, void(scoped_ptr<ExpensiveOperationContext>* context)); | 
|  | MOCK_METHOD0(CancelImpl, void()); | 
|  | }; | 
|  |  | 
|  | class ExpensiveOperationRpcContextTest : public testing::Test { | 
|  | public: | 
|  | ExpensiveOperationRpcContextTest() | 
|  | : thread_system_(Platform::CreateThreadSystem()), | 
|  | worker_(2 /* max_workers */, "expensive_operation_test", | 
|  | thread_system_.get()), | 
|  | sequence_(worker_.NewSequence()), | 
|  | stub_(sequence_) {} | 
|  |  | 
|  | ~ExpensiveOperationRpcContextTest() { | 
|  | worker_.FreeSequence(sequence_); | 
|  | } | 
|  |  | 
|  | void StartRpcContext(MockReaderWriter* rw, | 
|  | MockExpensiveOperationCallback* cb) { | 
|  | stub_.ExpectAsyncScheduleExpensiveOperation(rw); | 
|  | // Now start the operation. This cleans up after itself. | 
|  | new ExpensiveOperationRpcContext(&stub_, nullptr /* queue */, | 
|  | thread_system_.get(), &handler_, cb); | 
|  | } | 
|  |  | 
|  | void ScheduleCallDoneAndDelete( | 
|  | scoped_ptr<ExpensiveOperationContext>* ctx) { | 
|  | sequence_->Add( | 
|  | MakeFunction(this, &ExpensiveOperationRpcContextTest::CallDoneAndDelete, | 
|  | ctx->release())); | 
|  | } | 
|  |  | 
|  | void CallDoneAndDelete(ExpensiveOperationContext* context) { | 
|  | context->Done(); | 
|  | delete context; | 
|  | } | 
|  |  | 
|  | void ExpectFinishWithDebugHack(MockReaderWriter* rw, ::grpc::Status status, | 
|  | WorkerTestBase::SyncPoint* sync) { | 
|  | #ifndef NDEBUG | 
|  | if (!status.ok()) { | 
|  | // This is a pretty nasty hack. The code calls LOG(DFATAL) when the | 
|  | // error_code != OK. Unfortunately, I can't make either EXPECT_DEATH | 
|  | // or EXPECT_DFATAL work properly because threads. So, we hack the tests | 
|  | // in debug builds to avoid the DFATAL path. | 
|  | LOG(WARNING) << "Squashing gRPC error status " << status.error_code() | 
|  | << " to OK. Consider re-running this test under opt."; | 
|  | status = ::grpc::Status(::grpc::StatusCode::OK, status.error_message()); | 
|  | } | 
|  | #endif | 
|  | if (sync != nullptr) { | 
|  | rw->ExpectFinishAndNotify(status, sync); | 
|  | } else { | 
|  | rw->ExpectFinish(status); | 
|  | } | 
|  | } | 
|  |  | 
|  | protected: | 
|  | std::unique_ptr<ThreadSystem> thread_system_; | 
|  | QueuedWorkerPool worker_; | 
|  | QueuedWorkerPool::Sequence* sequence_; | 
|  | MockCentralControllerRpcServiceStub stub_; | 
|  | TestMessageHandler handler_; | 
|  | }; | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, SuccessfulRequest) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // First, the context writes the initial request, which is an empty proto. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // Next, the context attempts to read a response back from the server. Here | 
|  | // we tell it that it's OK to continue. | 
|  | rw->ExpectRead("ok_to_proceed: true"); | 
|  |  | 
|  | // Context was told it was OK to run, so it calls Run on the callback. | 
|  | EXPECT_CALL(*cb, RunImpl(_)).Times(1); | 
|  |  | 
|  | // When the callback completes, write a "Did it!" message back to the | 
|  | // server. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // And now call Finish and wait for the server to tell us it's done. | 
|  | rw->ExpectFinishAndNotify(::grpc::Status(), &sync); | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, SuccessfulRequestWithPointerSteal) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // First, the context writes the initial request, which is an empty proto. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // Next, the context attempts to read a response back from the server. Here | 
|  | // we tell it that it's OK to continue. | 
|  | rw->ExpectRead("ok_to_proceed: true"); | 
|  |  | 
|  | // Context was told it was OK to run, so it calls Run on the callback. | 
|  | // We detach the context supplied to Run and schedule it to be marked | 
|  | // Done() via a subsequent callback. | 
|  | EXPECT_CALL(*cb, RunImpl(_)) | 
|  | .WillOnce(Invoke( | 
|  | this, | 
|  | &ExpensiveOperationRpcContextTest::ScheduleCallDoneAndDelete)); | 
|  |  | 
|  | // When the callback completes, write a "Did it!" message back to the | 
|  | // server. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // And now call Finish and wait for the server to tell us it's done. | 
|  | rw->ExpectFinishAndNotify(::grpc::Status(), &sync); | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, UnsuccessfulRequest) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // First, the context writes the initial request, which is an empty proto. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // Next, the context attempts to read a response back from the server. Here | 
|  | // we tell it that it's not OK to continue. | 
|  | rw->ExpectRead("ok_to_proceed: false"); | 
|  |  | 
|  | // Context was told it was not OK to run, so it calls Cancel on the | 
|  | // callback. | 
|  | EXPECT_CALL(*cb, CancelImpl()) | 
|  | .WillOnce(Invoke(&sync, &WorkerTestBase::SyncPoint::Notify)); | 
|  |  | 
|  | // If the server returns not OK, we don't actually call Finish, just close | 
|  | // the connection. So no EXPECT_CALL for Finish here. | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, InitFailed) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  |  | 
|  | // Configure the stub to return rw in response to the RpcContext | 
|  | // initiating a request, but then indicate that the gRPC init failed. | 
|  | stub_.ExpectAsyncScheduleExpensiveOperationFailure(rw); | 
|  |  | 
|  | ExpectFinishWithDebugHack( | 
|  | rw, ::grpc::Status(::grpc::StatusCode::ABORTED, "hangup"), | 
|  | nullptr /* notify */); | 
|  |  | 
|  | // Controller is AFK so we expect Cancel on the callback. | 
|  | EXPECT_CALL(*cb, CancelImpl()) | 
|  | .WillOnce(Invoke(&sync, &WorkerTestBase::SyncPoint::Notify)); | 
|  |  | 
|  | // Start the operation. This cleans up after itself. | 
|  | new ExpensiveOperationRpcContext(&stub_, nullptr /* queue */, | 
|  | thread_system_.get(), &handler_, cb); | 
|  |  | 
|  | sync.Wait(); | 
|  |  | 
|  | ASSERT_THAT(handler_.messages(), Not(IsEmpty())); | 
|  | #ifdef NDEBUG | 
|  | EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup")); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, FinishFailed) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  |  | 
|  | // Configure the stub to return rw in response to the RpcContext | 
|  | // initiating a request, but then indicate that the gRPC init failed. | 
|  | stub_.ExpectAsyncScheduleExpensiveOperationFailure(rw); | 
|  |  | 
|  | // Now what we're actually testing; Make Finish() itself fail. | 
|  | // Not clear how that would actually happen in practice. | 
|  | rw->ExpectFinishFailure(); | 
|  |  | 
|  | // Controller is AFK so we expect Cancel on the callback. | 
|  | EXPECT_CALL(*cb, CancelImpl()) | 
|  | .WillOnce(Invoke(&sync, &WorkerTestBase::SyncPoint::Notify)); | 
|  |  | 
|  | // Start the operation. This cleans up after itself. | 
|  | new ExpensiveOperationRpcContext(&stub_, nullptr /* queue */, | 
|  | thread_system_.get(), &handler_, cb); | 
|  |  | 
|  | sync.Wait(); | 
|  |  | 
|  | ASSERT_THAT(handler_.messages(), Not(IsEmpty())); | 
|  | EXPECT_THAT(handler_.messages().back(), HasSubstr("Finish failed")); | 
|  | } | 
|  |  | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, FirstWriteFailed) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // Pretend that the initial Write failed. | 
|  | rw->ExpectWriteFailure(EqualsProto("")); | 
|  |  | 
|  | ExpectFinishWithDebugHack( | 
|  | rw, ::grpc::Status(::grpc::StatusCode::ABORTED, "hangup"), | 
|  | nullptr /* notify */); | 
|  |  | 
|  | // Controller is AFK so we expect Cancel on the callback. | 
|  | EXPECT_CALL(*cb, CancelImpl()) | 
|  | .WillOnce(Invoke(&sync, &WorkerTestBase::SyncPoint::Notify)); | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  |  | 
|  | ASSERT_THAT(handler_.messages(), Not(IsEmpty())); | 
|  | #ifdef NDEBUG | 
|  | EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup")); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, ReadFailed) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // First, the context writes the initial request, which is an empty proto. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // Now pretend that the gRPC call to Read failed. | 
|  | rw->ExpectReadFailure(); | 
|  |  | 
|  | ExpectFinishWithDebugHack( | 
|  | rw, ::grpc::Status(::grpc::StatusCode::ABORTED, "hangup"), | 
|  | nullptr /* notify */); | 
|  |  | 
|  | // Controller is AFK so we expect Cancel on the callback. | 
|  | EXPECT_CALL(*cb, CancelImpl()) | 
|  | .WillOnce(Invoke(&sync, &WorkerTestBase::SyncPoint::Notify)); | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  |  | 
|  | ASSERT_THAT(handler_.messages(), Not(IsEmpty())); | 
|  | #ifdef NDEBUG | 
|  | EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup")); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | TEST_F(ExpensiveOperationRpcContextTest, SecondWriteFailed) { | 
|  | WorkerTestBase::SyncPoint sync(thread_system_.get()); | 
|  | MockExpensiveOperationCallback* cb = | 
|  | new MockExpensiveOperationCallback(sequence_); | 
|  | MockReaderWriter* rw = new MockReaderWriter(sequence_); | 
|  | { | 
|  | ::testing::InSequence s; | 
|  |  | 
|  | // First, the context writes the initial request, which is an empty proto. | 
|  | rw->ExpectWrite(EqualsProto("")); | 
|  |  | 
|  | // Next, the context attempts to read a response back from the server. Here | 
|  | // we tell it that it's OK to continue. | 
|  | rw->ExpectRead("ok_to_proceed: true"); | 
|  |  | 
|  | // Context was told it was OK to run, so it calls Run on the callback. | 
|  | EXPECT_CALL(*cb, RunImpl(_)).Times(1); | 
|  |  | 
|  | // When the callback completes, try to write the "Did it!" message back to | 
|  | // the server, but pretend it failed. | 
|  | rw->ExpectWriteFailure(EqualsProto("")); | 
|  |  | 
|  | ExpectFinishWithDebugHack( | 
|  | rw, ::grpc::Status(::grpc::StatusCode::ABORTED, "hangup"), &sync); | 
|  | } | 
|  |  | 
|  | StartRpcContext(rw, cb); | 
|  | sync.Wait(); | 
|  |  | 
|  | ASSERT_THAT(handler_.messages(), Not(IsEmpty())); | 
|  | #ifdef NDEBUG | 
|  | EXPECT_THAT(handler_.messages().back(), HasSubstr("hangup")); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | }  // namespace net_instaweb |