// blob: f81e4fb7590332ec0fee3e0512833ad1182342cf [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "client/FileSystem.h"
#include "client/InputStream.h"
#include "client/OutputStream.h"
#include "DateTime.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "gtest/gtest.h"
#include "Logger.h"
#include "Memory.h"
#include "TestUtil.h"
#include "Thread.h"
#include "XmlConfig.h"
#include <inttypes.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <vector>
using namespace Hdfs;
using namespace Internal;
// Path prefix under which the tests create their data. The #ifndef guard lets
// the build define TEST_HDFS_PREFIX externally; otherwise "./" is used.
#ifndef TEST_HDFS_PREFIX
#define TEST_HDFS_PREFIX "./"
#endif
// Scratch directory used by every test in this file (built by literal
// concatenation, so it always ends with a '/').
#define BASE_DIR TEST_HDFS_PREFIX"/testInputStream/"
/**
 * Fixture shared by the InputStream tests.
 *
 * The constructor connects three FileSystem instances and writes two data
 * files under BASE_DIR:
 *   - "smallfile": 10 bytes produced by FillBuffer(..., 0)
 *   - "largefile": 20 * 2048 bytes produced by FillBuffer(..., 0)
 * Both files are opened with a trailing argument of 2048 (presumably the block
 * size -- confirm against OutputStream::open).
 *
 * The helper methods take the FileSystem to exercise so each scenario can be
 * run against both `fs` and `remotefs`.
 */
class TestInputStream: public ::testing::Test {
public:
    /**
     * Connect the file systems, recreate BASE_DIR and write the test files.
     */
    TestInputStream() :
        conf("function-test.xml") {
        conf.set("output.default.packetsize", 1024);
        fs = new FileSystem(conf);
        fs->connect();
        superfs = new FileSystem(conf);
        superfs->connect(conf.getString("dfs.default.uri"), HDFS_SUPERUSER, NULL);
        // remotefs is created only after short-circuit read has been switched
        // off in conf; fs and superfs were created before this change.
        conf.set("dfs.client.read.shortcircuit", false);
        remotefs = new FileSystem(conf);
        remotefs->connect();
        superfs->setWorkingDirectory(fs->getWorkingDirectory().c_str());

        // Best effort: the directory may not exist on a first run.
        try {
            superfs->deletePath(BASE_DIR, true);
        } catch (...) {
        }

        superfs->mkdirs(BASE_DIR, 0755);
        superfs->setOwner(TEST_HDFS_PREFIX, USER, NULL);
        superfs->setOwner(BASE_DIR, USER, NULL);
        ous.open(*fs, BASE_DIR"smallfile", Create | Overwrite, 0644, true, 0, 2048);
        char buffer1[10], buffer2[20 * 2048];
        FillBuffer(buffer1, sizeof(buffer1), 0);
        FillBuffer(buffer2, sizeof(buffer2), 0);
        ous.append(buffer1, sizeof(buffer1));
        ous.close();
        ous.open(*fs, BASE_DIR"largefile", Create, 0644, false, 0, 2048);
        ous.append(buffer2, sizeof(buffer2));
        ous.close();
    }

    /**
     * Remove BASE_DIR (ignoring failures), then disconnect and free the
     * three file systems.
     */
    ~TestInputStream() {
        try {
            superfs->deletePath(BASE_DIR, true);
        } catch (...) {
        }

        fs->disconnect();
        delete fs;
        superfs->disconnect();
        delete superfs;
        remotefs->disconnect();
        delete remotefs;
    }

    /**
     * Check the failure modes of InputStream::open:
     * an empty path raises InvalidParameter, a FileSystem on which connect()
     * was never called raises HdfsIOException, and an existing file opens.
     * @param tfs the connected file system to open files through.
     */
    void OpenFailed(FileSystem & tfs) {
        ASSERT_THROW(ins.open(tfs, "", true), InvalidParameter);
        FileSystem fs1(conf);   // deliberately left unconnected
        ASSERT_THROW(ins.open(fs1, BASE_DIR"smallfile", true), HdfsIOException);
        ASSERT_NO_THROW(ins.open(tfs, BASE_DIR"smallfile", true));
    }

    /**
     * Opening a missing file raises FileNotFoundException, and reading from
     * the stream afterwards raises HdfsIOException.
     * @param tfs the file system to open files through.
     */
    void OpenforRead(FileSystem & tfs) {
        EXPECT_THROW(ins.open(tfs, BASE_DIR"a", true), FileNotFoundException);
        char buff[100];
        ASSERT_THROW(ins.read(buff, 100), HdfsIOException);
        ins.close();
    }

    /**
     * Read `size`, `size - 100` and `size + 100` bytes from the start of
     * "largefile" and verify the content.
     * @param tfs  the file system to open files through.
     * @param size base request length; the test cases pass 512, 1024 and 2048.
     */
    void Read(FileSystem & tfs, size_t size) {
        // Sized for the largest length handed to read() below (2048 + 100),
        // so the stream is never given a length larger than the array.
        char buff[2048 + 100];
        ins.open(tfs, BASE_DIR"largefile", true);
        ASSERT_NO_THROW(ins.read(buff, size));
        EXPECT_TRUE(CheckBuffer(buff, size, 0));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.read(buff, size - 100));
        EXPECT_TRUE(CheckBuffer(buff, size - 100, 0));

        if (size == 2048) {
            // Only the first `size` bytes are verified in this case even
            // though size + 100 are requested.
            ASSERT_NO_THROW(ins.seek(0));
            ASSERT_NO_THROW(ins.read(buff, size + 100));
            EXPECT_TRUE(CheckBuffer(buff, size, 0));
            ASSERT_NO_THROW(ins.seek(2));
            ASSERT_NO_THROW(ins.read(buff, 100));
            EXPECT_TRUE(CheckBuffer(buff, 100, 2));
        } else {
            ASSERT_NO_THROW(ins.seek(0));
            ASSERT_NO_THROW(ins.read(buff, size + 100));
            EXPECT_TRUE(CheckBuffer(buff, size + 100, 0));
        }

        ins.close();
    }

    /**
     * Exercise seek() on the 10-byte "smallfile" and on "largefile":
     * reading past the end raises HdfsEndOfStream, seek(10) succeeds while
     * seek(12) raises HdfsIOException, and seeking on a closed stream raises
     * HdfsIOException.
     * @param tfs the file system to open files through.
     */
    void Seek(FileSystem & tfs) {
        ins.open(tfs, BASE_DIR"smallfile", true);
        char buff[1024];
        ASSERT_NO_THROW(ins.read(buff, 100));
        ASSERT_THROW(ins.read(buff, 1024), HdfsEndOfStream);
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.read(buff, 100));
        ins.close();
        ins.open(tfs, BASE_DIR"smallfile", true);
        ASSERT_NO_THROW(ins.read(buff, 100));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.read(buff, 100));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.seek(10));
        ASSERT_THROW(ins.read(buff, 1), HdfsEndOfStream);
        ins.close();
        // The stream is closed at this point.
        ASSERT_THROW(ins.seek(12), HdfsIOException);
        ins.open(tfs, BASE_DIR"smallfile", true);
        ASSERT_THROW(ins.seek(12), HdfsIOException);
        ins.close();
        ins.open(tfs, BASE_DIR"largefile", true);
        ASSERT_NO_THROW(ins.seek(1027));
        ASSERT_NO_THROW(ins.read(buff, 100));
        ins.close();
    }

    /**
     * Repeat basic read/seek calls with the third open() argument set to
     * false (the other helpers pass true; presumably this is the
     * checksum-verification flag -- confirm against InputStream::open).
     * @param tfs the file system to open files through.
     */
    void CheckSum(FileSystem & tfs) {
        ins.open(tfs, BASE_DIR"largefile", false);
        std::vector<char> buff(10240);
        ASSERT_NO_THROW(ins.read(&buff[0], 512));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.read(&buff[0], 1049));
        ins.close();
        ins.open(tfs, BASE_DIR"smallfile", false);
        ASSERT_THROW(ins.seek(13), HdfsIOException);
        ins.close();
    }

    /**
     * readFully() of `size`, `size - 100` and `size + 100` bytes from the
     * start of "largefile" must return verified content; asking for one byte
     * more than the file holds (20 * 2048 + 1) raises HdfsIOException.
     * @param tfs  the file system to open files through.
     * @param size base request length; the test cases pass 512, 1024 and 2048.
     */
    void ReadFully(FileSystem & tfs, size_t size) {
        ins.open(tfs, BASE_DIR"largefile", false);
        char buff[20 * 2048 + 1];
        ASSERT_NO_THROW(ins.readFully(buff, size));
        EXPECT_TRUE(CheckBuffer(buff, size, 0));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.readFully(buff, size - 100));
        EXPECT_TRUE(CheckBuffer(buff, size - 100, 0));
        ASSERT_NO_THROW(ins.seek(0));
        ASSERT_NO_THROW(ins.readFully(buff, size + 100));
        EXPECT_TRUE(CheckBuffer(buff, size + 100, 0));
        ASSERT_THROW(ins.readFully(buff, 20 * 2048 + 1), HdfsIOException);
        ins.close();
    }

protected:
    Config conf;            // loaded from "function-test.xml", then adjusted in the constructor
    FileSystem * fs;        // connected with the configuration as loaded
    FileSystem * superfs;   // connected as HDFS_SUPERUSER; used for setup and cleanup
    FileSystem * remotefs;  // test remote block reader (short-circuit read disabled)
    InputStream ins;        // stream reused by the helper methods
    OutputStream ous;       // used by the constructor to write the test files
};
// Runs the OpenFailed scenario against fs first, then against remotefs.
TEST_F(TestInputStream, TestInputStream_OpenFailed) {
    FileSystem * const targets[] = { fs, remotefs };

    for (size_t i = 0; i < sizeof(targets) / sizeof(targets[0]); ++i) {
        OpenFailed(*targets[i]);
    }
}
// Runs the OpenforRead scenario against fs first, then against remotefs.
TEST_F(TestInputStream, TestInputStream_OpenforRead) {
    FileSystem * const targets[] = { fs, remotefs };

    for (size_t i = 0; i < sizeof(targets) / sizeof(targets[0]); ++i) {
        OpenforRead(*targets[i]);
    }
}
// Runs the Read scenario with request sizes 512, 1024 and 2048, first for
// every size against fs and then for every size against remotefs.
TEST_F(TestInputStream, TestInputStream_Read) {
    FileSystem * const targets[] = { fs, remotefs };
    const size_t sizes[] = { 512, 1024, 2048 };

    for (size_t t = 0; t < sizeof(targets) / sizeof(targets[0]); ++t) {
        for (size_t s = 0; s < sizeof(sizes) / sizeof(sizes[0]); ++s) {
            Read(*targets[t], sizes[s]);
        }
    }
}
// Runs the Seek scenario against fs first, then against remotefs.
TEST_F(TestInputStream, TestInputStream_Seek) {
    FileSystem * const targets[] = { fs, remotefs };

    for (size_t i = 0; i < sizeof(targets) / sizeof(targets[0]); ++i) {
        Seek(*targets[i]);
    }
}
// Runs the CheckSum scenario against fs first, then against remotefs.
TEST_F(TestInputStream, TestInputStream_CheckSum) {
    FileSystem * const targets[] = { fs, remotefs };

    for (size_t i = 0; i < sizeof(targets) / sizeof(targets[0]); ++i) {
        CheckSum(*targets[i]);
    }
}
// Runs the ReadFully scenario with request sizes 512, 1024 and 2048, first
// for every size against fs and then for every size against remotefs.
TEST_F(TestInputStream, TestInputStream_ReadFully) {
    FileSystem * const targets[] = { fs, remotefs };
    const size_t sizes[] = { 512, 1024, 2048 };

    for (size_t t = 0; t < sizeof(targets) / sizeof(targets[0]); ++t) {
        for (size_t s = 0; s < sizeof(sizes) / sizeof(sizes[0]); ++s) {
            ReadFully(*targets[t], sizes[s]);
        }
    }
}
/**
 * Read `len` bytes from the beginning of `path` and verify them with
 * Hdfs::CheckBuffer.
 *
 * @param fs     file system used to open the file.
 * @param path   file to read.
 * @param len    number of bytes to read and verify.
 * @param offset starting offset handed to CheckBuffer; it is advanced by the
 *               number of bytes read each round. The stream itself is not
 *               seeked, so reading always starts at the beginning of the file.
 *
 * A read that returns no data fails the test fatally and returns early.
 */
static void CheckFileContent(FileSystem * fs, std::string path, int64_t len, size_t offset) {
    InputStream in;
    EXPECT_NO_THROW(in.open(*fs, path.c_str(), true));
    std::vector<char> buff(20 * 1024 + 1);
    const int64_t capacity = static_cast<int64_t>(buff.size());
    // Kept in 64 bits: narrowing `len` to int would wrap for lengths of
    // 2 GiB or more and could skip the verification loop entirely.
    int64_t todo = len;

    while (todo > 0) {
        // Never larger than `capacity`, so it always fits in an int.
        int batch = static_cast<int>(todo < capacity ? todo : capacity);
        batch = in.read(&buff[0], batch);
        ASSERT_TRUE(batch > 0);
        todo -= batch;
        int rc = Hdfs::CheckBuffer(&buff[0], batch, offset);
        offset += batch;
        EXPECT_TRUE(rc);
    }

    EXPECT_NO_THROW(in.close());
}
/**
 * Write `writeSize` bytes of Hdfs::FillBuffer data to `filename`.
 *
 * @param fs        file system used to create the file.
 * @param filename  path of the file to write.
 * @param writeSize total number of bytes to write.
 * @param flag      open flags forwarded to OutputStream::open.
 *
 * Data is appended in chunks of at most 64 KiB; any exception from open,
 * append or close fails the test fatally and returns early.
 */
static void WriteFile(FileSystem * fs, std::string filename, int64_t writeSize, int flag) {
    std::vector<char> chunk(64 * 1024);
    const int64_t chunkSize = static_cast<int>(chunk.size());
    OutputStream ousA;
    ASSERT_NO_THROW(ousA.open(*fs, filename.c_str(), flag, 0644, false, 0, 1024 * 1024));

    size_t position = 0;

    for (int64_t remaining = writeSize; remaining > 0;) {
        int64_t piece = remaining;

        if (piece >= chunkSize) {
            piece = chunkSize;
        }

        // The data is derived from the position so that a reader can verify it.
        Hdfs::FillBuffer(&chunk[0], piece, position);
        ASSERT_NO_THROW(DebugException(ousA.append(&chunk[0], piece)));
        remaining -= piece;
        position += piece;
    }

    ASSERT_NO_THROW(ousA.close());
}
/**
 * Thread entry point wrapping CheckFileContent: an exception thrown by it is
 * recorded as a non-fatal test failure instead of propagating.
 * The parameters are forwarded unchanged to CheckFileContent.
 */
static void NothrowCheckFileContent(FileSystem * fs, std::string path,
                                    int64_t len, size_t offset) {
    EXPECT_NO_THROW({
        CheckFileContent(fs, path, len, offset);
    });
}
/**
 * Write one file of 1 GiB + 234 bytes, then let 50 threads each read and
 * verify its first 1 MiB + 234 bytes concurrently through the same FileSystem.
 */
TEST_F(TestInputStream, TestReadOneFileSameTime) {
    typedef std::vector<shared_ptr<thread> > ThreadList;
    std::string target(BASE_DIR"testReadOneFileSameTime");
    int64_t bytesToWrite = 1 * 1024 * 1024 * 1024 + 234;
    int64_t bytesToRead = 1 * 1024 * 1024 + 234;
    int openFlags = Create | Overwrite;
    WriteFile(&*fs, target, bytesToWrite, openFlags);

    ThreadList readers;

    for (int n = 1; n <= 50; ++n) {
        shared_ptr<thread> reader(
            new thread(NothrowCheckFileContent, fs, target, bytesToRead, 0));
        readers.push_back(reader);
    }

    // Wait for every reader before the fixture is torn down.
    for (ThreadList::iterator it = readers.begin(); it != readers.end(); ++it) {
        (*it)->join();
    }
}
/**
* Test reading many files at the same time.
*/
// Writes 50 files of 20 MiB + 234 bytes each; as soon as a file is written a
// thread is started that reads and verifies its first 1 MiB + 234 bytes.
TEST_F(TestInputStream, TestReadManyFileSameTime) {
    typedef std::vector<shared_ptr<thread> > ThreadList;
    const char * namePrefix = BASE_DIR"testReadSameTime";
    int64_t bytesToWrite = 20 * 1024 * 1024 + 234;
    int64_t bytesToRead = 1 * 1024 * 1024 + 234;
    int openFlags = Create | Overwrite;
    ThreadList readers;

    for (int index = 1; index <= 50; ++index) {
        // The classic locale keeps the numeric suffix free of grouping characters.
        std::stringstream name;
        name.imbue(std::locale::classic());
        name << namePrefix << index;
        WriteFile(fs, name.str(), bytesToWrite, openFlags);
        shared_ptr<thread> reader(
            new thread(NothrowCheckFileContent, fs, name.str(), bytesToRead, 0));
        readers.push_back(reader);
    }

    // Wait for every reader before the fixture is torn down.
    for (ThreadList::iterator it = readers.begin(); it != readers.end(); ++it) {
        (*it)->join();
    }
}
/**
 * Recreate an empty BASE_DIR for tests that do not use the fixture.
 *
 * @param fs   connected file system whose working directory is reused.
 * @param conf configuration used to build a temporary super-user connection.
 *
 * Any previous BASE_DIR is deleted (failures ignored), then it is created
 * again and both TEST_HDFS_PREFIX and BASE_DIR are handed over to USER.
 */
static void SetupTestEnv(FileSystem & fs, Config & conf) {
    FileSystem admin(conf);
    admin.connect(conf.getString("dfs.default.uri"), HDFS_SUPERUSER, NULL);
    admin.setWorkingDirectory(fs.getWorkingDirectory().c_str());

    // Best effort: the directory may be absent on a clean cluster.
    try {
        admin.deletePath(BASE_DIR, true);
    } catch (...) {
    }

    admin.mkdirs(BASE_DIR, 0755);
    admin.setOwner(TEST_HDFS_PREFIX, USER, NULL);
    admin.setOwner(BASE_DIR, USER, NULL);
    admin.disconnect();
}
/**
 * Write a file of prefix + 3 * step + suffix bytes, seek to `prefix` and read
 * the remainder in requests of at most `step` bytes, verifying each piece.
 * Short-circuit read is disabled and "input.notretry-another-node" is enabled.
 */
TEST(TestInputStreamWithOutputStream, TestOpenFirstAndAppend) {
    Config conf("function-test.xml");
    conf.set("dfs.client.read.shortcircuit", false);
    conf.set("input.notretry-another-node", true);
    FileSystem fs(conf);
    fs.connect();
    SetupTestEnv(fs, conf);

    // An earlier variant of this test used a prefix of 45013.
    const int step = 16384;
    const int prefix = 400;
    const int suffix = 16034;
    const int fileSize = prefix + step * 3 + suffix;

    std::vector<char> buffer(fileSize);
    FillBuffer(&buffer[0], buffer.size(), 0);

    OutputStream os;
    ASSERT_NO_THROW(os.open(fs, BASE_DIR"testOpenFirstAndAppend", Create, 0666, true, 1));
    ASSERT_NO_THROW(os.append(&buffer[0], buffer.size()));
    ASSERT_NO_THROW(os.sync());
    ASSERT_NO_THROW(os.close());

    InputStream is;
    ASSERT_NO_THROW(is.open(fs, BASE_DIR"testOpenFirstAndAppend", true));
    ASSERT_NO_THROW(is.seek(prefix));
    buffer.resize(step);

    size_t position = prefix;

    for (int remaining = fileSize - prefix; remaining > 0;) {
        int got = step;

        if (got >= remaining) {
            got = remaining;
        }

        // read() may return fewer bytes than requested; use what it reports.
        ASSERT_NO_THROW(got = is.read(&buffer[0], got));
        ASSERT_TRUE(CheckBuffer(&buffer[0], got, position));
        remaining -= got;
        position += got;
    }

    ASSERT_NO_THROW(is.close());
}
/**
 * Convert a transfer size and duration into a rate.
 *
 * @param elapsed duration of the transfer; callers pass milliseconds.
 * @param size    number of bytes transferred.
 * @return size / 1024 / 1024 scaled to one second, i.e. the value the callers
 *         log as "mbyte/s". No guard is applied for a zero `elapsed`.
 */
static double CalculateThroughput(int64_t elapsed, int64_t size) {
    // Same left-to-right evaluation order as a single chained expression.
    double rate = size / 1024.0;
    rate = rate * 1000.0;
    rate = rate / 1024.0;
    rate = rate / elapsed;
    return rate;
}
/**
 * Measure sequential write and read throughput on a 5 GiB file.
 *
 * The file is written in 64 KiB appends if it does not exist (SetupTestEnv
 * recreates BASE_DIR beforehand), then read back in requests of at most
 * 20 KiB + 1 bytes. Elapsed times and throughput are logged and the file is
 * deleted at the end. Open/read/write failures abort the test immediately so
 * that a failed open cannot lead to follow-up exceptions escaping the test.
 */
TEST(TestThroughput, Throughput) {
    Config conf("function-test.xml");
    FileSystem fs(conf);
    fs.connect();
    SetupTestEnv(fs, conf);
    const char * filename = BASE_DIR"TestThroughput";
    //const char * filename = "TestThroughput_SeekAhead";
    std::vector<char> buffer(64 * 1024);
    int64_t fileLength = 5 * 1024 * 1024 * 1024ll;
    int64_t todo = fileLength, batch, elapsed;
    steady_clock::time_point start, stop;

    if (!fs.exist(filename)) {
        OutputStream ous;
        start = steady_clock::now();
        ASSERT_NO_THROW(
            DebugException(ous.open(fs, filename, Create | Overwrite /*| SyncBlock*/)));

        while (todo > 0) {
            batch = todo < static_cast<int>(buffer.size()) ?
                    todo : buffer.size();
            ASSERT_NO_THROW(DebugException(ous.append(&buffer[0], batch)));
            todo -= batch;
        }

        ASSERT_NO_THROW(DebugException(ous.close()));
        // Reuse the function-level `stop` rather than declaring a shadowing one.
        stop = steady_clock::now();
        elapsed = ToMilliSeconds(start, stop);
        LOG(INFO, "write file time %" PRId64 " ms, throughput is %lf mbyte/s",
            elapsed, CalculateThroughput(elapsed, fileLength));
    }

    start = steady_clock::now();
    InputStream in;
    ASSERT_NO_THROW(in.open(fs, filename, true));
    std::vector<char> buff(20 * 1024 + 1);
    todo = fileLength;

    while (todo > 0) {
        batch = todo < static_cast<int>(buff.size()) ? todo : buff.size();
        ASSERT_NO_THROW(batch = in.read(&buff[0], batch));
        // A non-positive result would never shrink `todo`; stop instead of spinning.
        ASSERT_TRUE(batch > 0);
        todo = todo - batch;
    }

    EXPECT_NO_THROW(in.close());
    stop = steady_clock::now();
    elapsed = ToMilliSeconds(start, stop);
    LOG(INFO, "read file time %" PRId64 " ms, throughput is %lf mbyte/s", elapsed, CalculateThroughput(elapsed, fileLength));
    fs.deletePath(filename, true);
}
/**
 * Measure throughput of a "read a little, seek ahead" access pattern on a
 * 20 GiB file with short-circuit read enabled.
 *
 * The file is written in 64 KiB appends if it does not exist (SetupTestEnv
 * recreates BASE_DIR beforehand). The read phase walks the file in windows of
 * at most 8 MiB + 1 bytes: it reads the first eighth of each window with
 * readFully() and then seeks to the start of the next window. Failures of
 * open, readFully or seek abort the test instead of escaping as exceptions.
 */
TEST(TestThroughput, TestSeekAhead) {
    Config conf("function-test.xml");
    conf.set("dfs.client.read.shortcircuit", true);
    FileSystem fs(conf);
    fs.connect();
    SetupTestEnv(fs, conf);
    int64_t offset = 0;
    int64_t fileLength = 20 * 1024 * 1024 * 1024ll;
    int64_t todo = fileLength, batch, elapsed;
    const char * filename = BASE_DIR"TestThroughput_SeekAhead";
    //const char * filename = "TestThroughput_SeekAhead";
    steady_clock::time_point start, stop;

    if (!fs.exist(filename)) {
        std::vector<char> buffer(64 * 1024);
        OutputStream ous;
        start = steady_clock::now();
        ASSERT_NO_THROW(
            DebugException(ous.open(fs, filename, Create | Overwrite /*| SyncBlock*/)));

        while (todo > 0) {
            batch = todo < static_cast<int>(buffer.size()) ?
                    todo : buffer.size();
            ASSERT_NO_THROW(DebugException(ous.append(&buffer[0], batch)));
            todo -= batch;
        }

        ASSERT_NO_THROW(DebugException(ous.close()));
        stop = steady_clock::now();
        elapsed = ToMilliSeconds(start, stop);
        LOG(INFO, "write file time %" PRId64 " ms, throughput is %lf mbyte/s",
            elapsed, CalculateThroughput(elapsed, fileLength));
    }

    start = steady_clock::now();
    InputStream in;
    ASSERT_NO_THROW(in.open(fs, filename, true));
    std::vector<char> buff(8 * 1024 * 1024 + 1);
    todo = fileLength;

    while (todo > 0) {
        batch = todo < static_cast<int>(buff.size()) ? todo : buff.size();
        // Only the first eighth of the window is actually read ...
        ASSERT_NO_THROW(DebugException(in.readFully(&buff[0], batch / 8)));
        // ... then skip ahead to the start of the next window.
        ASSERT_NO_THROW(in.seek(offset + batch));
        todo = todo - batch;
        offset += batch;
    }

    EXPECT_NO_THROW(in.close());
    stop = steady_clock::now();
    elapsed = ToMilliSeconds(start, stop);
    LOG(INFO, "read and seek file time %" PRId64 " ms, throughput is %lf mbyte/s",
        elapsed, CalculateThroughput(elapsed, fileLength));
    fs.deletePath(filename, true);
}