blob: db48e88879b2c36639788e968e7e23baa92b8347 [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Exception.h"
#include "ExceptionInternal.h"
#include "gtest/gtest.h"
#include "MockSocket.h"
#include "network/BufferedSocketReader.h"
#include "TestUtil.h"
#include "Thread.h"
using namespace Hdfs;
using namespace Hdfs::Internal;
using namespace testing;
class TestBufferedSocketReader: public ::testing::Test {
public:
TestBufferedSocketReader() :
reader(sock) {
}
public:
MockSocket sock;
BufferedSocketReaderImpl reader;
};
static int32_t ReadAction(char * buffer, int32_t size, const char * target,
int32_t tsize) {
int32_t todo = size < tsize ? size : tsize;
memcpy(buffer, target, todo);
return todo;
}
static void ReadFullyAction(char * buffer, int32_t size, int timeout,
const char * target, int32_t tsize) {
assert(size == tsize);
memcpy(buffer, target, size);
}
TEST_F(TestBufferedSocketReader, TestRead) {
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
char readData[64];
int rsize = sizeof(readData);
EXPECT_CALL(sock, read(_, _)).Times(1).WillOnce(
Invoke(bind(&ReadAction, _1, _2, readData, rsize)));
/*
* fill the buffer, read from buffer.
*/
int bufferSize = reader.buffer.size();
ASSERT_GT(bufferSize, 0);
FillBuffer(&reader.buffer[0], bufferSize, 0);
reader.size = bufferSize;
std::vector<char> target(bufferSize);
ASSERT_EQ(1, reader.read(&target[0], 1));
ASSERT_TRUE(CheckBuffer(&target[0], 1, 0));
ASSERT_EQ(bufferSize - 1, reader.read(&target[1], bufferSize - 1));
ASSERT_TRUE(CheckBuffer(&target[0], bufferSize, 0));
/*
* Refill the buffer, read cross the buffer.
*/
reader.cursor = 0;
reader.size = bufferSize;
target.resize(bufferSize + sizeof(readData));
FillBuffer(readData, sizeof(readData), bufferSize);
int32_t todo = target.size();
while (todo > 0) {
todo -= reader.read(&target[0] + (target.size() - todo), todo);
}
ASSERT_TRUE(CheckBuffer(&target[0], target.size(), 0));
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
}
TEST_F(TestBufferedSocketReader, TestReadFully) {
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
char readData[64];
int rsize = sizeof(readData);
EXPECT_CALL(sock, readFully(_, _, 500)).Times(1).WillOnce(
Invoke(bind(&ReadFullyAction, _1, _2, _3, readData, rsize)));
/*
* fill the buffer, read from buffer.
*/
int bufferSize = reader.buffer.size();
ASSERT_GT(bufferSize, 0);
FillBuffer(&reader.buffer[0], bufferSize, 0);
reader.size = bufferSize;
std::vector<char> target(bufferSize);
reader.readFully(&target[0], 1, 500);
ASSERT_TRUE(CheckBuffer(&target[0], 1, 0));
reader.readFully(&target[1], bufferSize - 1, 500);
ASSERT_TRUE(CheckBuffer(&target[0], bufferSize, 0));
/*
* Refill the buffer, read cross the buffer.
*/
reader.cursor = 0;
reader.size = bufferSize;
target.resize(bufferSize + sizeof(readData));
FillBuffer(readData, sizeof(readData), bufferSize);
reader.readFully(&target[0], target.size(), 500);
ASSERT_TRUE(CheckBuffer(&target[0], target.size(), 0));
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
}
TEST_F(TestBufferedSocketReader, TestBigEndianInt32) {
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
ASSERT_GE(reader.buffer.size(), sizeof(int32_t));
reader.buffer[0] = '1';
reader.buffer[1] = '2';
reader.buffer[2] = '3';
reader.buffer[3] = '4';
reader.size = 4;
char target[4] = { '4', '3', '2', '1' };
EXPECT_EQ(*reinterpret_cast<int32_t *>(target),
reader.readBigEndianInt32(500));
}
TEST_F(TestBufferedSocketReader, TestReadVarInt32) {
ASSERT_EQ(0, reader.size);
ASSERT_EQ(0, reader.cursor);
ASSERT_GE(reader.buffer.size(), sizeof(int32_t));
reader.buffer[0] = 0;
reader.buffer[1] = 1;
reader.buffer[2] = 2;
reader.size = 3;
reader.cursor = 1;
char target[2];
target[0] = 1;
target[1] = 2;
EXPECT_CALL(sock, read(_, _)).Times(1).WillOnce(
Invoke(bind(&ReadAction, _1, _2, target, sizeof(target))));
EXPECT_CALL(sock, poll(true, false, _)).WillOnce(Return(true));
EXPECT_EQ(1, reader.readVarint32(500));
EXPECT_EQ(1, reader.size - reader.cursor);
EXPECT_EQ(2, reader.readVarint32(500));
EXPECT_EQ(0, reader.size - reader.cursor);
EXPECT_EQ(1, reader.readVarint32(500));
EXPECT_EQ(2, reader.readVarint32(500));
EXPECT_EQ(0, reader.size - reader.cursor);
}
TEST_F(TestBufferedSocketReader, TestReadVarInt32_Failure) {
EXPECT_CALL(sock, poll(true, false, _)).Times(AnyNumber()).WillOnce(
Return(false)).WillRepeatedly(Return(true));
char target[1];
target[0] = -1;
EXPECT_CALL(sock, read(_, _)).Times(AnyNumber()).WillRepeatedly(
Invoke(bind(&ReadAction, _1, _2, target, sizeof(target))));
EXPECT_THROW(reader.readVarint32(0), HdfsTimeoutException);
EXPECT_THROW(reader.readVarint32(1000), HdfsNetworkException);
}