blob: e1e32d4146067b487f21413558851860bb93e257 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cstring>
#include "lib/commons.h"
#include "config.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/IFile.h"
#include "test_commons.h"
/**
 * Creates a local file at `path` and writes `partition` partitions into it
 * through an IFileWriter. Every partition contains all records of `kvs`, in order.
 *
 * @param partition number of partitions to write.
 * @param kvs       records written into each partition (not modified).
 * @param path      local file to create.
 * @param type      key/value type passed to the writer for both keys and values.
 * @param codec     codec name passed through to the writer.
 * @return the writer's spill info (callers in this file delete it).
 */
SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
    const string & path, KeyValueType type, const string & codec) {
  FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
  IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
  // Distinct names for the partition and record indices; previously the inner
  // loop variable shadowed the outer one.
  for (int part = 0; part < partition; part++) {
    iw->startPartition();
    for (size_t i = 0; i < kvs.size(); i++) {
      const pair<string, string> & p = kvs[i];
      iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
    }
    iw->endPartition();
  }
  // Fetch the spill info before the writer is destroyed.
  SingleSpillInfo * info = iw->getSpillInfo();
  // The writer is deleted before the stream it writes into.
  delete iw;
  delete fout;
  return info;
}
/**
 * Opens the local file at `path` and appends every (key, value) record of
 * every partition to `kvs`, in read order.
 *
 * @param kvs   output vector; records are appended (existing content is kept).
 * @param path  local file to read.
 * @param type  not used by this function.
 * @param info  spill info handed to the IFileReader; not deleted here.
 * @param codec not used by this function.
 */
void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
    SingleSpillInfo * info, const string & codec) {
  FileInputStream * input = (FileInputStream*)FileSystem::getLocal().open(path);
  IFileReader * reader = new IFileReader(input, info);

  const char * keyPtr = NULL;
  const char * valuePtr = NULL;
  uint32_t keyLength = 0;
  uint32_t valueLength = 0;
  while (reader->nextPartition()) {
    // nextKey() returns NULL once the current partition is exhausted.
    for (keyPtr = reader->nextKey(keyLength); keyPtr != NULL;
         keyPtr = reader->nextKey(keyLength)) {
      valuePtr = reader->value(valueLength);
      kvs.push_back(std::make_pair(string(keyPtr, keyLength), string(valuePtr, valueLength)));
    }
  }

  // Reader first, then the stream it reads from.
  delete reader;
  delete input;
}
/**
 * Round-trips `kvs` through an on-disk IFile and checks that each of the
 * `partition` partitions reads back exactly the records of `kvs`.
 *
 * @param kvtype    key/value type used for writing.
 * @param partition number of partitions written; each holds a full copy of kvs.
 * @param size      unused; kept for signature compatibility.
 * @param kvs       records written into every partition.
 * @param codec     codec name passed to the writer (empty by default).
 */
void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
    vector<pair<string, string> > & kvs, const string & codec = "") {
  string outputpath = "ifilewriter";
  SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
  LOG("write finished");
  vector<pair<string, string> > readkvs;
  readIFile(readkvs, outputpath, kvtype, info, codec);
  LOG("read finished");
  delete info;
  // Remove the scratch file before any ASSERT_* below can return early;
  // previously a failing assertion skipped the cleanup and left it behind.
  FileSystem::getLocal().remove(outputpath);

  ASSERT_EQ(kvs.size() * partition, readkvs.size());
  for (int i = 0; i < partition; i++) {
    // Report which partition failed, if any.
    SCOPED_TRACE(i);
    vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
        readkvs.begin() + (i + 1) * kvs.size());
    ASSERT_EQ(kvs.size(), cur_part.size());
    ASSERT_EQ(kvs, cur_part);
  }
}
// Round-trips generated records through an IFile for each key/value type,
// plus a Snappy-compressed Text run when Snappy support is compiled in.
TEST(IFile, WriteRead) {
  const int partitions = TestConfig.getInt("ifile.partition", 7);
  const int recordCount = TestConfig.getInt("partition.size", 20000);
  vector<pair<string, string> > records;
  Generate(records, recordCount, "bytes");

  const KeyValueType kvTypes[] = {TextType, BytesType, UnknownType};
  const size_t typeCount = sizeof(kvTypes) / sizeof(kvTypes[0]);
  for (size_t t = 0; t < typeCount; ++t) {
    TestIFileReadWrite(kvTypes[t], partitions, recordCount, records);
  }
#if defined HADOOP_SNAPPY_LIBRARY
  TestIFileReadWrite(TextType, partitions, recordCount, records,
      "org.apache.hadoop.io.compress.SnappyCodec");
#endif
}
/**
 * Times writing `kvs` (repeated over the configured number of partitions)
 * into the in-memory buffer `buff`, then times reading it back.
 *
 * @param kvs          records written into every partition.
 * @param buff         caller-owned scratch buffer of `buffsize` bytes.
 * @param buffsize     capacity of `buff`.
 * @param codec        codec name passed to the writer.
 * @param checksumType checksum passed to the writer.
 * @param type         key/value type used for both keys and values.
 */
void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
    const string & codec, ChecksumType checksumType, KeyValueType type) {
  int partition = TestConfig.getInt("ifile.partition", 50);
  Timer timer;

  // Write phase.
  OutputBuffer outputBuffer(buff, buffsize);
  IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
  timer.reset();
  for (int i = 0; i < partition; i++) {
    iw->startPartition();
    for (size_t j = 0; j < kvs.size(); j++) {
      const pair<string, string> & kv = kvs[j];
      iw->write(kv.first.c_str(), kv.first.length(), kv.second.c_str(), kv.second.length());
    }
    iw->endPartition();
  }
  SingleSpillInfo * info = iw->getSpillInfo();
  LOG("%s",
      timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  delete iw;

  // Read phase: read back exactly the bytes the writer produced.
  InputBuffer inputBuffer(buff, outputBuffer.tell());
  IFileReader * ir = new IFileReader(&inputBuffer, info);
  timer.reset();
  // Unsigned accumulation: it cannot overflow into UB, and it compares
  // against 0xdeadbeef without a signed/unsigned mismatch.
  uint32_t sum = 0;
  while (ir->nextPartition()) {
    const char * key;
    const char * value;
    uint32_t keyLen;
    uint32_t valueLen;
    while (NULL != (key = ir->nextKey(keyLen))) {
      value = ir->value(valueLen);
      // Do not touch value[0] when the value is empty.
      if (valueLen > 0) {
        sum += static_cast<uint8_t>(value[0]);
      }
    }
  }
  LOG("%s",
      timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  // Release everything before asserting; a failing ASSERT_* returns early
  // and previously leaked the reader and spill info.
  delete ir;
  delete info;
  // Use the result so that value() calls don't get optimized out.
  ASSERT_NE(0xdeadbeefU, sum);
}
// Benchmarks in-memory IFile write/read over every combination of
// checksum (CRC32, CRC32C) and key/value type (Text, Bytes, Unknown).
TEST(Perf, IFile) {
  int size = TestConfig.getInt("partition.size", 20000);
  string codec = TestConfig.get("ifile.codec", "");
  string type = TestConfig.get("ifile.type", "bytes");
  vector<pair<string, string> > kvs;
  Generate(kvs, size, type);
  std::sort(kvs.begin(), kvs.end());

  // One 200 MB zero-filled scratch buffer, shared by every case and freed
  // automatically when the test returns.
  const size_t buffsize = 200 * 1024 * 1024;
  vector<char> scratch(buffsize, 0);

  struct PerfCase {
    const char * label;
    ChecksumType checksum;
    KeyValueType kvType;
  };
  const PerfCase cases[] = {
    {"Test TextType CRC32", CHECKSUM_CRC32, TextType},
    {"Test BytesType CRC32", CHECKSUM_CRC32, BytesType},
    {"Test UnknownType CRC32", CHECKSUM_CRC32, UnknownType},
    {"Test TextType CRC32C", CHECKSUM_CRC32C, TextType},
    {"Test BytesType CRC32C", CHECKSUM_CRC32C, BytesType},
    {"Test UnknownType CRC32C", CHECKSUM_CRC32C, UnknownType},
  };
  const size_t caseCount = sizeof(cases) / sizeof(cases[0]);
  for (size_t c = 0; c < caseCount; ++c) {
    LOG("%s", cases[c].label);
    TestIFileWriteRead2(kvs, &scratch[0], buffsize, codec, cases[c].checksum, cases[c].kvType);
  }
}
// Some glibc versions have a bug in the file-position ("tell") API that can
// unexpectedly overwrite file data.
// See https://rhn.redhat.com/errata/RHBA-2013-0279.html for details.
// This test checks whether the bug is present; if it is, upgrade glibc.
// Reads the keys of ./testData/testGlibCBugSpill.out and checks that they
// match the expected sequence exactly, in order and in number.
TEST(IFile, TestGlibCBug) {
  std::string path("./testData/testGlibCBugSpill.out");
  const int32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
  const uint32_t expectCount = sizeof(expect) / sizeof(expect[0]);
  LOG("TestGlibCBug %s", path.c_str());
  IFileSegment * segments = new IFileSegment[1];
  // NOTE(review): an end offset well past the file size, presumably so the
  // reader consumes the whole file — confirm.
  segments[0].realEndOffset = 10000000;
  SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE,
      IntType, TextType, "");
  InputStream * fileOut = FileSystem::getLocal().open(path);
  // NOTE(review): fileOut is never deleted here; the trailing `true`
  // presumably hands ownership of the stream to the reader — confirm in IFile.h.
  IFileReader * reader = new IFileReader(fileOut, &info, true);
  const char * key = NULL;
  uint32_t length = 0;
  reader->nextPartition();
  uint32_t index = 0;
  while (NULL != (key = reader->nextKey(length))) {
    // Copy the 4 key bytes out rather than dereferencing a cast pointer:
    // nothing here guarantees `key` is suitably aligned for uint32_t.
    uint32_t rawKey = 0;
    std::memcpy(&rawKey, key, sizeof(rawKey));
    // bswap converts the stored byte order (presumably big-endian) to host order.
    const int32_t realKey = (int32_t)bswap(rawKey);
    // EXPECT_* + break instead of ASSERT_*: a fatal assertion would return
    // immediately and leak `reader`.
    EXPECT_LT(index, expectCount);
    if (index >= expectCount) {
      break;
    }
    EXPECT_EQ(expect[index], realKey);
    if (expect[index] != realKey) {
      break;
    }
    index++;
  }
  delete reader;
  // Without this check, a truncated or empty file would pass vacuously.
  EXPECT_EQ(expectCount, index);
}