// blob: 46810f11b3067804b73cc0185e800039d4631535 [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "Atomic.h"
#include "ClientNamenodeProtocol.pb.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "IpcConnectionContext.pb.h"
#include "Logger.h"
#include "MockBufferedSocketReader.h"
#include "MockRpcClient.h"
#include "MockRpcRemoteCall.h"
#include "MockSocket.h"
#include "rpc/RpcChannel.h"
#include "RpcHeader.pb.h"
#include "TestUtil.h"
#include "Thread.h"
#include "UnitTestUtils.h"
#include <google/protobuf/io/coded_stream.h>
using namespace Hdfs;
using namespace Hdfs::Mock;
using namespace Hdfs::Internal;
using namespace testing;
using namespace ::google::protobuf;
using namespace ::google::protobuf::io;
/**
 * Assemble the RpcChannelKey shared by the tests in this file.
 *
 * The key is built from a default-constructed RpcAuth, a protocol descriptor
 * (0, "test", "kind"), placeholder server strings, and an RpcConfig derived
 * from a default-constructed Config via SessionConfig.
 *
 * @return the freshly built key, by value.
 */
RpcChannelKey BuildKey() {
    Config defaultConf;
    SessionConfig sessionConf(defaultConf);
    RpcConfig rpcConf(sessionConf);
    RpcServerInfo serverInfo("unknown token server", "unknown server", "unknown port");
    RpcProtocolInfo protocolInfo(0, "test", "kind");
    RpcAuth authInfo;
    return RpcChannelKey(authInfo, protocolInfo, serverInfo, rpcConf);
}
/**
 * Give a test write access to the RpcConfig exposed by a channel key.
 *
 * @param key the key whose configuration should be tweaked.
 * @return the object returned by key.getConf(), with constness cast away.
 */
static RpcConfig & GetConfig(RpcChannelKey & key) {
    RpcConfig & writable = const_cast<RpcConfig &>(key.getConf());
    return writable;
}
static uint32_t GetCallId(uint32_t start) {
static Hdfs::Internal::atomic<uint32_t> id(start);
return id++;
}
static void BuildResponse(uint32_t id,
RpcResponseHeaderProto::RpcStatusProto status, const char * errClass,
const char * errMsg, ::google::protobuf::Message * resp, std::vector<char> & output) {
WriteBuffer buffer;
RpcResponseHeaderProto respHeader;
respHeader.set_callid(id);
respHeader.set_status(status);
if (errClass != NULL) {
respHeader.set_exceptionclassname(errClass);
}
if (errMsg != NULL) {
respHeader.set_errormsg(errMsg);
}
int size = respHeader.ByteSize();
int totalSize = size + CodedOutputStream::VarintSize32(size);
if (resp != NULL) {
size = resp->ByteSize();
totalSize += size + CodedOutputStream::VarintSize32(size);
}
buffer.writeBigEndian(totalSize);
size = respHeader.ByteSize();
buffer.writeVarint32(size);
respHeader.SerializeToArray(buffer.alloc(size), size);
if (resp != NULL) {
size = resp->ByteSize();
buffer.writeVarint32(size);
resp->SerializeToArray(buffer.alloc(size), size);
}
output.resize(buffer.getDataSize(0));
memcpy(&output[0], buffer.getBuffer(0), buffer.getDataSize(0));
}
/**
 * With the connect retry limit set to 2, both connect attempts are made to
 * fail (first with HdfsNetworkException, then with HdfsTimeoutException).
 * The test expects the socket to be closed twice and channel.connect() to
 * surface the HdfsTimeoutException.
 */
TEST(TestRpcChannel, TestConnectFailure) {
    RpcChannelKey key = BuildKey();
    GetConfig(key).setMaxRetryOnConnect(2);
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    EXPECT_CALL(*mockSocket, close()).Times(2);
    // First attempt: network failure. Second attempt: timeout.
    EXPECT_CALL(*mockSocket, connect(An<const char *>(), An<const char *>(), _))
        .Times(2)
        .WillOnce(InvokeWithoutArgs(
                      bind(&InvokeThrow<Hdfs::HdfsNetworkException>,
                           "test expected connect failed",
                           static_cast<bool *>(NULL))))
        .WillOnce(InvokeWithoutArgs(
                      bind(&InvokeThrow<Hdfs::HdfsTimeoutException>,
                           "test expected connect timeout",
                           static_cast<bool *>(NULL))));
    RpcChannelImpl channel(key, mockSocket, reader, client);
    EXPECT_THROW(DebugException(channel.connect()), Hdfs::HdfsTimeoutException);
}
/**
 * Happy-path connect: the socket mock accepts the single connect attempt.
 * The test expects exactly one connect, one setNoDelay, two writeFully calls
 * and one close on the socket, no exception from channel.connect(), and the
 * channel to be flagged available afterwards.
 */
TEST(TestRpcChannel, TestConnectSuccess) {
    RpcChannelKey key = BuildKey();
    GetConfig(key).setMaxRetryOnConnect(1);
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    // Expectations on distinct methods are independent of declaration order.
    EXPECT_CALL(*mockSocket, writeFully(_, _, _)).Times(2);
    EXPECT_CALL(*mockSocket, setNoDelay(_)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    EXPECT_CALL(*mockSocket, connect(An<const char *>(), An<const char *>(), _)).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    EXPECT_NO_THROW(DebugException(channel.connect()));
    EXPECT_TRUE(channel.available);
}
/**
 * Registers one mock call in channel.pendingCalls, then invokes
 * cleanupPendingCalls() with an in-flight HdfsNetworkException while holding
 * channel.writeMut. The test expects the pending call to be cancelled once,
 * the socket to be closed once, and the cleanup itself not to throw.
 */
TEST(TestRpcChannel, TestCancelPendingCall) {
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    // Keep the mock-typed pointer so expectations can be set without a cast.
    MockRpcRemoteCall * mockCall =
        new MockRpcRemoteCall(RpcCall(false, "", NULL, NULL), 0, "");
    RpcRemoteCallPtr pendingCall(mockCall);
    EXPECT_CALL(*mockCall, cancel(_)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    channel.pendingCalls[pendingCall->getIdentity()] = pendingCall;

    try {
        THROW(HdfsNetworkException, "expected exception");
    } catch (...) {
        // The cleanup is run with the write mutex held.
        Hdfs::Internal::lock_guard<Hdfs::Internal::mutex> guard(channel.writeMut);
        EXPECT_NO_THROW(DebugException(channel.cleanupPendingCalls(Hdfs::current_exception())));
    }
}
/**
 * Feeds readOneResponse() a frame whose big-endian length is reported as 100
 * and whose header varint length is reported as 0. The test expects one
 * readFully call, one socket close, and an HdfsRpcException.
 */
TEST(TestRpcChannel, TestCheckResponse_InvalidResponse) {
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    // Scripted reader: total length 100, header length 0, one bulk read.
    EXPECT_CALL(*reader, readBigEndianInt32(_)).Times(1).WillOnce(Return(100));
    EXPECT_CALL(*reader, readVarint32(_)).Times(1).WillOnce(Return(0));
    EXPECT_CALL(*reader, readFully(_, _, _)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    EXPECT_THROW(channel.readOneResponse(false), HdfsRpcException);
}
/**
 * Supplies a well-formed SUCCESS response for call id 0 while the channel has
 * no pending calls registered. The test expects readOneResponse() to throw
 * HdfsRpcException and the socket to be closed once.
 */
TEST(TestRpcChannel, TestCheckResponse_InvalidCallId) {
    std::vector<char> encoded;
    BuildResponse(0, RpcResponseHeaderProto_RpcStatusProto_SUCCESS, NULL, NULL, NULL, encoded);
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    // Real reader implementation, constructed from the pre-built response bytes.
    BufferedSocketReaderImpl * reader = new BufferedSocketReaderImpl(*mockSocket, encoded);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    EXPECT_THROW(channel.readOneResponse(false), HdfsRpcException);
}
/**
 * Exercises checkOneResponse() with max idle time 1000 and ping timeout 400.
 * The reader's poll() is scripted twice: the first call goes through
 * InvokeWaitAndReturn<bool>(500, false, 0), the second through
 * InvokeThrowAndReturn with an HdfsNetworkException. The test expects exactly
 * one writeFully on the socket, one close, and the HdfsNetworkException to
 * propagate.
 *
 * NOTE(review): presumably the single writeFully is the ping sent after the
 * first poll waits past the ping timeout -- confirm against RpcChannelImpl
 * and UnitTestUtils.
 */
TEST(TestRpcChannel, TestCheckResponse_SendPing) {
    RpcChannelKey key = BuildKey();
    GetConfig(key).setMaxRetryOnConnect(1);
    GetConfig(key).setMaxIdleTime(1000);
    GetConfig(key).setPingTimeout(400);
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    EXPECT_CALL(*mockSocket, writeFully(_, _, _)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    EXPECT_CALL(*reader, poll(_))
        .Times(2)
        .WillOnce(InvokeWithoutArgs(
                      bind(&InvokeWaitAndReturn<bool>, 500, false, 0)))
        .WillOnce(InvokeWithoutArgs(
                      bind(&InvokeThrowAndReturn<HdfsNetworkException, bool>,
                           "expected exception", static_cast<bool *>(NULL),
                           false)));
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    channel.lastActivity = steady_clock::now();
    EXPECT_THROW(channel.checkOneResponse(), HdfsNetworkException);
}
/**
 * Supplies an ERROR response (with error class and message) for call id 0
 * while one mock call is registered as pending. The test expects that call to
 * be cancelled once, the socket to be closed once, and readOneResponse() not
 * to throw.
 */
TEST(TestRpcChannel, TestCheckResponse_ErrorResponse) {
    const char * errClass = "test error class";
    const char * errMsg = "test error message";
    std::vector<char> encoded;
    BuildResponse(0, RpcResponseHeaderProto_RpcStatusProto_ERROR, errClass, errMsg, NULL, encoded);
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    BufferedSocketReaderImpl * reader = new BufferedSocketReaderImpl(*mockSocket, encoded);
    // Keep the mock-typed pointer so expectations can be set without a cast.
    MockRpcRemoteCall * mockCall =
        new MockRpcRemoteCall(RpcCall(false, "", NULL, NULL), 0, "");
    RpcRemoteCallPtr pendingCall(mockCall);
    EXPECT_CALL(*mockCall, cancel(_)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    channel.pendingCalls[pendingCall->getIdentity()] = pendingCall;
    EXPECT_NO_THROW(DebugException(channel.readOneResponse(false)));
}
/**
 * Supplies a FATAL response that carries an error message but no exception
 * class name. The test expects readOneResponse() to throw HdfsRpcException
 * and the socket to be closed once.
 */
TEST(TestRpcChannel, TestCheckResponse_FatalResponse1) {
    const char * errMsg = "test fatal message";
    std::vector<char> encoded;
    BuildResponse(0, RpcResponseHeaderProto_RpcStatusProto_FATAL, NULL, errMsg, NULL, encoded);
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    BufferedSocketReaderImpl * reader = new BufferedSocketReaderImpl(*mockSocket, encoded);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    EXPECT_THROW(DebugException(channel.readOneResponse(false)), HdfsRpcException);
}
/**
 * Supplies a FATAL response whose exception class name is
 * "org.apache.hadoop.ipc.StandbyException". The test expects
 * readOneResponse() to throw NameNodeStandbyException and the socket to be
 * closed once.
 */
TEST(TestRpcChannel, TestCheckResponse_FatalResponse2) {
    const char * errClass = "org.apache.hadoop.ipc.StandbyException";
    const char * errMsg = "test fatal message";
    std::vector<char> encoded;
    BuildResponse(0, RpcResponseHeaderProto_RpcStatusProto_FATAL, errClass, errMsg, NULL, encoded);
    RpcChannelKey key = BuildKey();
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    MockSocket * mockSocket = new MockSocket();
    BufferedSocketReaderImpl * reader = new BufferedSocketReaderImpl(*mockSocket, encoded);
    EXPECT_CALL(*mockSocket, close()).Times(1);
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    EXPECT_THROW(DebugException(channel.readOneResponse(false)), NameNodeStandbyException);
}
/**
 * Invokes an RpcCall built with `true` as its first argument ("mkdirs") on a
 * channel whose first writeFully throws HdfsNetworkConnectException and whose
 * later writes do nothing. A SUCCESS response for call id 2 is pre-loaded
 * into the reader. The test expects one connect, one setNoDelay, four
 * writeFully calls in total, at least two closes, and invoke() not to throw.
 *
 * NOTE(review): presumably the first RpcCall argument marks the call as
 * idempotent and therefore retryable -- confirm against RpcCall.
 */
TEST(TestRpcChannel, TestInvoke_RetryIdempotent) {
    std::vector<char> encoded;
    MockRpcClient client;
    uint32_t callid = 2;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, getCallId()).Times(AnyNumber()).WillRepeatedly(Return(callid));
    MkdirsRequestProto request;
    MkdirsResponseProto response;
    MkdirsResponseProto cannedReply;
    request.set_src("src");
    request.set_createparent(true);
    request.mutable_masked()->set_perm(0600u);
    cannedReply.set_result(true);
    BuildResponse(callid, RpcResponseHeaderProto_RpcStatusProto_SUCCESS, NULL, NULL, &cannedReply, encoded);
    MockSocket * mockSocket = new MockSocket();
    RpcChannelKey key = BuildKey();
    BufferedSocketReaderImpl * reader = new BufferedSocketReaderImpl(*mockSocket, encoded);
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    EXPECT_CALL(*mockSocket, connect(An<const char *>(), An<const char *>(), _)).Times(1);
    EXPECT_CALL(*mockSocket, setNoDelay(_)).Times(1);
    EXPECT_CALL(*mockSocket, close()).Times(AtLeast(2));
    // Only the very first write fails; the remaining three succeed silently.
    EXPECT_CALL(*mockSocket, writeFully(_, _, _))
        .Times(4)
        .WillOnce(InvokeWithoutArgs(
                      bind(&InvokeThrow<HdfsNetworkConnectException>,
                           "expected exception", static_cast<bool *>(NULL))))
        .WillRepeatedly(InvokeWithoutArgs(&InvokeDoNothing));
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    channel.addRef();
    EXPECT_NO_THROW(DebugException(channel.invoke(RpcCall(true, "mkdirs", &request, &response))));
    channel.close(false);
}
/**
 * Thread body used by the concurrent-invoke tests.
 *
 * Takes a reference on the channel, issues one "mkdirs" call that is expected
 * to fail with HdfsRpcException, then calls channel->close(false).
 *
 * @param channel the channel shared by all worker threads.
 */
static void InvokeConcurrently(RpcChannelImpl * channel) {
    MkdirsRequestProto mkdirsReq;
    mkdirsReq.set_src("src");
    mkdirsReq.set_createparent(true);
    mkdirsReq.mutable_masked()->set_perm(0600u);
    MkdirsResponseProto mkdirsResp;
    channel->addRef();
    EXPECT_THROW(
        DebugException(channel->invoke(RpcCall(true, "mkdirs", &mkdirsReq, &mkdirsResp))),
        HdfsRpcException);
    channel->close(false);
}
/**
 * Starts 50 threads that each run InvokeConcurrently() against one shared
 * channel, waits five seconds, then sets `throwError` to true and joins every
 * thread. poll() on the reader is routed through InvokeThrowAndReturn with
 * the address of `throwError`; connect() on the socket is scripted to set
 * channel.available to true.
 *
 * NOTE(review): presumably InvokeThrowAndReturn throws HdfsNetworkException
 * only once *throwError is true -- confirm in UnitTestUtils.
 * NOTE(review): `throwError` is a plain bool written here while the worker
 * threads are still running; if the helper reads it from those threads this
 * is an unsynchronized access -- verify.
 */
TEST(TestRpcChannel, TestInvokeConcurrently_FatalError) {
    bool throwError = false;
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, getCallId())
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(bind(GetCallId, 0)));
    RpcChannelKey key = BuildKey();
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    EXPECT_CALL(*reader, poll(_))
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(
                            bind(&InvokeThrowAndReturn<HdfsNetworkException, bool>,
                                 "expected exception", &throwError, false)));
    EXPECT_CALL(*mockSocket, writeFully(_, _, _))
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(&InvokeDoNothing));
    EXPECT_CALL(*mockSocket, close()).Times(AnyNumber());
    EXPECT_CALL(*mockSocket, setNoDelay(_)).Times(AnyNumber());
    RpcChannelImpl channel(key, mockSocket, reader, client);
    // This expectation needs the channel's address, so it is set after construction.
    EXPECT_CALL(*mockSocket, connect(An<const char *>(), An<const char *>(), _))
        .Times(AnyNumber())
        .WillRepeatedly(Assign(&channel.available, true));
    std::vector<std::shared_ptr<thread> > workers;
    const int numOfThread = 50;

    for (int started = 0; started < numOfThread; ++started) {
        std::shared_ptr<thread> worker(new thread(InvokeConcurrently, &channel));
        workers.push_back(worker);
    }

    sleep_for(seconds(5));
    throwError = true;

    for (std::vector<std::shared_ptr<thread> >::iterator it = workers.begin();
         it != workers.end(); ++it) {
        (*it)->join();
    }
}
/**
 * Starts 50 threads that each run InvokeConcurrently() against one shared
 * channel whose reader never reports data (poll() always returns false).
 * The main thread calls channel.checkIdle() once a second for five seconds,
 * then flips `clientIsRunning` to false -- the value client.isRunning()
 * returns via ReturnPointee -- and joins every thread.
 *
 * NOTE(review): `clientIsRunning` is a plain bool written here while worker
 * threads may be calling client.isRunning(); verify whether that access needs
 * synchronization.
 */
TEST(TestRpcChannel, TestInvokeConcurrently_ClientExit) {
    bool clientIsRunning = true;
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    RpcChannelKey key = BuildKey();
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    EXPECT_CALL(client, isRunning())
        .Times(AnyNumber())
        .WillRepeatedly(ReturnPointee(&clientIsRunning));
    EXPECT_CALL(*reader, poll(_)).Times(AnyNumber()).WillRepeatedly(Return(false));
    EXPECT_CALL(*mockSocket, writeFully(_, _, _))
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(&InvokeDoNothing));
    EXPECT_CALL(*mockSocket, close()).Times(AnyNumber());
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    std::vector<std::shared_ptr<thread> > workers;
    EXPECT_CALL(client, getCallId())
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(bind(GetCallId, 0)));
    const int numOfThread = 50;

    for (int started = 0; started < numOfThread; ++started) {
        std::shared_ptr<thread> worker(new thread(InvokeConcurrently, &channel));
        workers.push_back(worker);
    }

    // Five idle checks, one second apart, while the workers are blocked.
    for (int round = 0; round < 5; ++round) {
        channel.checkIdle();
        sleep_for(seconds(1));
    }

    clientIsRunning = false;

    for (std::vector<std::shared_ptr<thread> >::iterator it = workers.begin();
         it != workers.end(); ++it) {
        (*it)->join();
    }
}
/**
 * Invoke `call` on `channel` and unwrap a nested failure.
 *
 * An HdfsRpcException thrown by invoke() is caught and handed to
 * Hdfs::rethrow_if_nested(); any other exception type propagates unchanged.
 * If the HdfsRpcException is caught and rethrow_if_nested() returns normally,
 * the function returns without throwing.
 *
 * @param channel channel to invoke on.
 * @param call    the RPC call to issue.
 */
static void TestTimeout(RpcChannelImpl & channel, RpcCall & call) {
    try {
        channel.invoke(call);
    } catch (const HdfsRpcException & rpcError) {
        Hdfs::rethrow_if_nested(rpcError);
    }
}
/**
 * Sets the RPC timeout to 1000 and issues a "mkdirs" call on a channel whose
 * reader never reports data (poll() always returns false). The test expects
 * the TestTimeout() helper to throw HdfsTimeoutException. channel.refs is
 * incremented before the call and decremented afterwards.
 */
TEST(TestRpcChannel, TestTimeout) {
    MockRpcClient client;
    EXPECT_CALL(client, getClientId()).Times(AnyNumber()).WillRepeatedly(Return(""));
    EXPECT_CALL(client, getCallId())
        .Times(AnyNumber())
        .WillRepeatedly(InvokeWithoutArgs(bind(GetCallId, 0)));
    RpcChannelKey key = BuildKey();
    GetConfig(key).setRpcTimeout(1000);
    MockSocket * mockSocket = new MockSocket();
    MockBufferedSocketReader * reader = new MockBufferedSocketReader();
    EXPECT_CALL(client, isRunning()).Times(AnyNumber()).WillRepeatedly(Return(true));
    EXPECT_CALL(*mockSocket, writeFully(_, _, _)).Times(AnyNumber()).WillRepeatedly(Return());
    EXPECT_CALL(*mockSocket, close()).Times(AnyNumber()).WillRepeatedly(Return());
    EXPECT_CALL(*reader, poll(_)).Times(AnyNumber()).WillRepeatedly(Return(false));
    RpcChannelImpl channel(key, mockSocket, reader, client);
    channel.available = true;
    ++channel.refs;
    MkdirsRequestProto mkdirsReq;
    MkdirsResponseProto mkdirsResp;
    mkdirsReq.set_src("src");
    mkdirsReq.set_createparent(true);
    mkdirsReq.mutable_masked()->set_perm(0600u);
    RpcCall mkdirsCall(false, "mkdirs", &mkdirsReq, &mkdirsResp);
    EXPECT_THROW(TestTimeout(channel, mkdirsCall), HdfsTimeoutException);
    --channel.refs;
}