blob: 9677df6d86e0466fcdd8c1859e55a298a654f017 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <gmock/gmock.h>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/gtest.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
#include <stout/tests/utils.hpp>
#include "grpc_tests.grpc.pb.h"
#include "grpc_tests.pb.h"
namespace client = process::grpc::client;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using ::grpc::ChannelArguments;
using ::grpc::InsecureServerCredentials;
using ::grpc::Server;
using ::grpc::ServerBuilder;
using ::grpc::ServerContext;
using ::grpc::Status;
using process::Future;
using process::Promise;
using process::grpc::StatusError;
using process::grpc::client::CallOptions;
using testing::_;
using testing::DoAll;
using testing::InvokeWithoutArgs;
using testing::Return;
namespace tests {
// A mock implementation of the `PingPong` gRPC service that also owns
// the gRPC server hosting it. By default every `Send` call returns
// `Status::OK`; individual tests layer their own expectations on top.
class PingPongServer : public PingPong::Service
{
public:
  PingPongServer()
  {
    // Default behavior: accept any number of calls and reply with OK.
    EXPECT_CALL(*this, Send(_, _, _))
      .WillRepeatedly(Return(Status::OK));
  }

  MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong));

  // Builds and starts a gRPC server with this service registered.
  //
  // If `address` is given, the server listens on it (with insecure
  // credentials) and the returned connection targets that address.
  // Otherwise no listening port is added and the returned connection
  // wraps the server's in-process channel.
  //
  // Returns an error if the server could not be built and started.
  Try<client::Connection> startup(const Option<string>& address = None())
  {
    ServerBuilder builder;

    if (address.isSome()) {
      builder.AddListeningPort(address.get(), InsecureServerCredentials());
    }

    builder.RegisterService(this);

    server = builder.BuildAndStart();
    if (!server) {
      return Error("Unable to start a gRPC server.");
    }

    return address.isSome()
      ? client::Connection(address.get())
      : client::Connection(server->InProcessChannel(ChannelArguments()));
  }

  // Shuts the server down and blocks until `Server::Wait()` returns.
  //
  // Returns an error, rather than dereferencing a null pointer, if no
  // server is held, i.e., `startup()` was never called or it failed.
  Try<Nothing> shutdown()
  {
    if (!server) {
      return Error("The gRPC server has not been started.");
    }

    server->Shutdown();
    server->Wait();

    return Nothing();
  }

private:
  // Null until `startup()` succeeds.
  unique_ptr<Server> server;
};
// Fixture shared by the gRPC client tests. It derives from
// `TemporaryDirectoryTest` and uses its `sandbox` to place a socket.
class GRPCClientTest : public TemporaryDirectoryTest
{
protected:
  // Returns a `unix://` address for a file named "socket" inside the
  // test sandbox.
  //
  // TODO(chhsiao): Consider removing this once we have a way to get a
  // connection before the server starts on Windows. See the
  // `DiscardedBeforeServerStarted` test below.
  string server_address() const
  {
    const string socketPath = path::join(sandbox.get(), "socket");
    return "unix://" + socketPath;
  }

  // Returns the call options used by every test in this fixture:
  // `wait_for_ready` enabled and a timeout of 100 milliseconds.
  static CallOptions call_options()
  {
    CallOptions callOptions;
    callOptions.timeout = Milliseconds(100);
    callOptions.wait_for_ready = true;

    return callOptions;
  }
};
// Verifies the basic round trip: once the server has processed the
// call, the gRPC future becomes ready and carries a response.
TEST_F(GRPCClientTest, Success)
{
  PingPongServer server;

  // No address is given, so the connection is the one `startup()`
  // creates from the server's in-process channel.
  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  AWAIT_ASSERT_READY(reply);
  EXPECT_SOME(reply.get());

  // Tear down the client runtime first, then the server.
  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
// Verifies that multiple gRPC calls can be outstanding at the same
// time: three calls are issued, each server-side handler is held until
// all three have been entered, and only then are they allowed to reply.
TEST_F(GRPCClientTest, ConcurrentRPCs)
{
  PingPongServer server;

  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  // Set by the test once all three handlers have been entered; every
  // handler waits on it before returning.
  shared_ptr<Promise<Nothing>> released(new Promise<Nothing>());

  // One promise per expected call, set as soon as its handler runs.
  shared_ptr<Promise<Nothing>> entered1(new Promise<Nothing>());
  shared_ptr<Promise<Nothing>> entered2(new Promise<Nothing>());
  shared_ptr<Promise<Nothing>> entered3(new Promise<Nothing>());

  // Builds the action shared by all three expectations: signal that
  // the handler has been entered, then wait for `released`.
  auto holdUntilReleased = [=](const shared_ptr<Promise<Nothing>>& entered) {
    return InvokeWithoutArgs([=] {
      entered->set(Nothing());
      AWAIT_READY(released->future());
    });
  };

  EXPECT_CALL(server, Send(_, _, _))
    .WillOnce(DoAll(holdUntilReleased(entered1), Return(Status::OK)))
    .WillOnce(DoAll(holdUntilReleased(entered2), Return(Status::OK)))
    .WillOnce(DoAll(holdUntilReleased(entered3), Return(Status::OK)));

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply1 = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  Future<Try<Pong, StatusError>> reply2 = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  Future<Try<Pong, StatusError>> reply3 = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // All three handlers must be in flight before any of them may reply.
  AWAIT_READY(entered1->future());
  AWAIT_READY(entered2->future());
  AWAIT_READY(entered3->future());

  released->set(Nothing());

  AWAIT_ASSERT_READY(reply1);
  EXPECT_SOME(reply1.get());

  AWAIT_ASSERT_READY(reply2);
  EXPECT_SOME(reply2.get());

  AWAIT_ASSERT_READY(reply3);
  EXPECT_SOME(reply3.get());

  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
// Verifies that when the server answers with a non-OK status, the gRPC
// future becomes ready with an error carrying that status code.
TEST_F(GRPCClientTest, StatusError)
{
  PingPongServer server;

  // The single expected call is answered with `CANCELLED`.
  EXPECT_CALL(server, Send(_, _, _))
    .WillOnce(Return(Status::CANCELLED));

  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // The future itself is ready; the failure is reported in its value.
  AWAIT_ASSERT_READY(reply);
  EXPECT_ERROR(reply.get());
  EXPECT_EQ(::grpc::CANCELLED, reply->error().status.error_code());

  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
// Verifies that a gRPC future can be discarded before the server has
// even started, in which case the server never sees the call.
TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
{
  PingPongServer server;

  // The discarded call must never reach the service.
  EXPECT_CALL(server, Send(_, _, _))
    .Times(0);

  // The client connects to, and the server later listens on, the same
  // socket address inside the sandbox.
  const string address = server_address();

  client::Connection connection(address);
  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection,
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // Discard first, and only then bring the server up.
  reply.discard();

  ASSERT_SOME(server.startup(address));

  AWAIT_EXPECT_DISCARDED(reply);

  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
// Verifies that a gRPC future can be discarded while the server is in
// the middle of processing the given call.
TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
{
  PingPongServer server;

  // `entered` is set by the handler once it starts running;
  // `discarded` is set by the test after the future has been discarded.
  shared_ptr<Promise<Nothing>> entered(new Promise<Nothing>());
  shared_ptr<Promise<Nothing>> discarded(new Promise<Nothing>());

  // Handler body: announce that processing has begun, then hold the
  // call until the client side has issued the discard.
  auto holdUntilDiscarded = [=] {
    entered->set(Nothing());
    AWAIT_READY(discarded->future());
  };

  EXPECT_CALL(server, Send(_, _, _))
    .WillOnce(DoAll(
        InvokeWithoutArgs(holdUntilDiscarded),
        Return(Status::OK)));

  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // Discard only after the server has begun processing the call.
  AWAIT_READY(entered->future());

  reply.discard();
  discarded->set(Nothing());

  AWAIT_EXPECT_DISCARDED(reply);

  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
// Verifies that a gRPC future is set with an error when the client
// runtime is shut down before the server responds.
TEST_F(GRPCClientTest, ClientShutdown)
{
  PingPongServer server;

  // `entered` is set by the handler once it starts running;
  // `terminated` is set by the test after the runtime has been
  // terminated and waited on.
  shared_ptr<Promise<Nothing>> entered(new Promise<Nothing>());
  shared_ptr<Promise<Nothing>> terminated(new Promise<Nothing>());

  // Handler body: keep the call pending on the server until the client
  // runtime is gone.
  auto holdUntilTerminated = [=] {
    entered->set(Nothing());
    AWAIT_READY(terminated->future());
  };

  EXPECT_CALL(server, Send(_, _, _))
    .WillOnce(DoAll(
        InvokeWithoutArgs(holdUntilTerminated),
        Return(Status::OK)));

  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  AWAIT_READY(entered->future());

  // Shut the runtime down while the handler is still blocked, and
  // release the handler only afterwards.
  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  terminated->set(Nothing());

  // The future is expected to be ready with a `DEADLINE_EXCEEDED`
  // error.
  // NOTE(review): presumably this comes from the timeout set in
  // `call_options()` expiring while the handler is blocked; confirm
  // against the runtime implementation.
  AWAIT_ASSERT_READY(reply);
  EXPECT_ERROR(reply.get());
  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, reply->error().status.error_code());

  ASSERT_SOME(server.shutdown());
}
// Verifies that a gRPC future is set with an error when the client is
// unable to connect to the server.
TEST_F(GRPCClientTest, ServerUnreachable)
{
  // No server is started in this test; the connection targets a name
  // that is not expected to resolve.
  client::Connection connection("nosuchhost");
  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection,
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // The runtime is terminated before the outcome of the call is
  // inspected.
  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  // NOTE(review): `DEADLINE_EXCEEDED` is presumably reported because
  // `call_options()` enables `wait_for_ready` together with a timeout;
  // confirm against the gRPC documentation.
  AWAIT_ASSERT_READY(reply);
  EXPECT_ERROR(reply.get());
  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, reply->error().status.error_code());
}
// Verifies that a gRPC future is set with an error when the server
// hangs while processing the given call.
TEST_F(GRPCClientTest, ServerTimeout)
{
  PingPongServer server;

  // Set by the test only after the client has observed the failure, so
  // the handler is stuck for the whole lifetime of the call.
  shared_ptr<Promise<Nothing>> unblocked(new Promise<Nothing>());

  // Handler body: do nothing but wait to be unblocked.
  auto hang = [=] {
    AWAIT_READY(unblocked->future());
  };

  EXPECT_CALL(server, Send(_, _, _))
    .WillOnce(DoAll(
        InvokeWithoutArgs(hang),
        Return(Status::OK)));

  Try<client::Connection> connection = server.startup();
  ASSERT_SOME(connection);

  client::Runtime runtime;

  Future<Try<Pong, StatusError>> reply = runtime.call(
      connection.get(),
      GRPC_CLIENT_METHOD(PingPong, Send),
      Ping(),
      call_options());

  // The call is expected to fail with `DEADLINE_EXCEEDED` while the
  // handler is still hanging.
  AWAIT_ASSERT_READY(reply);
  EXPECT_ERROR(reply.get());
  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, reply->error().status.error_code());

  // Release the handler so that the server can be shut down.
  unblocked->set(Nothing());

  runtime.terminate();
  AWAIT_ASSERT_READY(runtime.wait());

  ASSERT_SOME(server.shutdown());
}
} // namespace tests {