/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "lz4.h"
#include "config.h"
#include "lib/commons.h"
#include "lib/Path.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/Compressions.h"
#include "test_commons.h"
#if defined HADOOP_SNAPPY_LIBRARY
#include <snappy.h>
#endif
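// Round-trips generated key/value data through the named codec's compress and
// decompress streams, logging throughput and verifying that the output matches
// the input. Input size/type and the buffer hint come from TestConfig.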
void TestCodec(const string & codec) {
  string data;
  size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024);
  uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
  string type = TestConfig.get("compression.input.type", "bytes");
  Timer timer;
  GenerateKVTextLength(data, length, type);
  LOG("%s", timer.getInterval("Generate data").c_str());
  InputBuffer inputBuffer = InputBuffer(data);
  // Reserve 1.5x the input size so the output fits even if the codec expands the data.
  size_t buffLen = data.length() / 2 * 3;
  timer.reset();
  char * buff = new char[buffLen];
  char * buff2 = new char[buffLen];
  memset(buff, 0, buffLen);
  memset(buff2, 0, buffLen);
  LOG("%s", timer.getInterval("memset buffers to pre-fault pages").c_str());
  OutputBuffer outputBuffer = OutputBuffer(buff, buffLen);
  CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint);
  LOG("%s", codec.c_str());
  timer.reset();
  // Feed the compressor in 128KB chunks.
  for (size_t i = 0; i < data.length(); i += 128 * 1024) {
    compressor->write(data.c_str() + i, std::min(data.length() - i, (size_t)(128 * 1024)));
  }
  compressor->flush();
  LOG("%s",
      timer.getSpeedM2("compress origin/compressed", data.length(), outputBuffer.tell()).c_str());
  InputBuffer decompInputBuffer = InputBuffer(buff, outputBuffer.tell());
  DecompressStream * decompressor = Compressions::getDecompressionStream(codec, &decompInputBuffer,
      buffhint);
  size_t total = 0;
  timer.reset();
  while (true) {
    int32_t rd = decompressor->read(buff2 + total, buffLen - total);
    if (rd <= 0) {
      break;
    }
    total += rd;
  }
  LOG("%s", timer.getSpeedM2("decompress origin/uncompressed", outputBuffer.tell(), total).c_str());
  LOG("ratio: %.3lf", outputBuffer.tell() / (double)total);
  // The round trip must reproduce the input byte-for-byte.
  ASSERT_EQ(data.length(), total);
  ASSERT_EQ(0, memcmp(data.c_str(), buff2, total));
  delete[] buff;
  delete[] buff2;
  delete compressor;
  delete decompressor;
}
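// File-to-file compress/decompress utility driven by the "input" and "output"
// config values; the direction is inferred from which file name carries a
// known codec extension. A hypothetical invocation (the actual test binary
// name and option syntax depend on how this suite is built and run) might be:
//   nttest --gtest_filter=Perf.CompressionUtil input=data.lz4 output=data.txt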
TEST(Perf, CompressionUtil) {
  string inputfile = TestConfig.get("input", "");
  string outputfile = TestConfig.get("output", "");
  uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
  string inputcodec = Compressions::getCodecByFile(inputfile);
  string outputcodec = Compressions::getCodecByFile(outputfile);
  size_t bufferSize = buffhint;
  if (inputcodec.length() > 0 && outputcodec.length() == 0) {
    // decompression: compressed input, plain output
    InputStream * fin = FileSystem::getLocal().open(inputfile);
    if (fin == NULL) {
      THROW_EXCEPTION(IOException, "input file not found");
    }
    DecompressStream * source = Compressions::getDecompressionStream(inputcodec, fin, bufferSize);
    OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
    char * buffer = new char[bufferSize];
    while (true) {
      int rd = source->read(buffer, bufferSize);
      if (rd <= 0) {
        break;
      }
      fout->write(buffer, rd);
    }
    source->close();
    delete source;
    fin->close();
    delete fin;
    fout->flush();
    fout->close();
    delete fout;
    delete[] buffer;  // array delete to match new char[]
  } else if (inputcodec.length() == 0 && outputcodec.length() > 0) {
    // compression: plain input, compressed output
    InputStream * fin = FileSystem::getLocal().open(inputfile);
    if (fin == NULL) {
      THROW_EXCEPTION(IOException, "input file not found");
    }
    OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
    CompressStream * dest = Compressions::getCompressionStream(outputcodec, fout, bufferSize);
    char * buffer = new char[bufferSize];
    while (true) {
      int rd = fin->read(buffer, bufferSize);
      if (rd <= 0) {
        break;
      }
      dest->write(buffer, rd);
    }
    dest->flush();
    dest->close();
    delete dest;
    fout->close();
    delete fout;
    fin->close();
    delete fin;
    delete[] buffer;  // array delete to match new char[]
  } else {
    LOG("Neither compression nor decompression requested, doing nothing");
  }
}
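// Accumulates size and timing counters for the raw (stream-less) codec
// benchmarks below.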
class CompressResult {
public:
  uint64_t uncompressedSize;
  uint64_t compressedSize;
  uint64_t compressTime;    // cumulative, in nanoseconds (see toString)
  uint64_t uncompressTime;  // cumulative, in nanoseconds (see toString)
  CompressResult()
      : uncompressedSize(0), compressedSize(0), compressTime(0), uncompressTime(0) {
  }
  CompressResult & operator+=(const CompressResult & rhs) {
    uncompressedSize += rhs.uncompressedSize;
    compressedSize += rhs.compressedSize;
    compressTime += rhs.compressTime;
    uncompressTime += rhs.uncompressTime;
    return *this;
  }
  string toString() {
    // Decompress speed is reported both over compressed input and
    // (in parentheses) over uncompressed output.
    return StringUtil::Format("Compress: %4.0fM/s Decompress: %5.0fM/s(%5.0fM/s) ratio: %.1f%%",
        (uncompressedSize / 1024.0 / 1024) / (compressTime / 1000000000.),
        (compressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
        (uncompressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
        compressedSize / (float)uncompressedSize * 100);
  }
};
TEST(Perf, GzipCodec) {
  TestCodec("org.apache.hadoop.io.compress.GzipCodec");
}
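// Compresses and decompresses one file block-by-block with the raw LZ4 API,
// repeating each block `times` times and adding the results to `total`.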
void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize,
    int times) {
  string data;
  ReadFile(data, path);
  // Allow a little headroom over blockSize for incompressible blocks.
  size_t maxlength = std::max((size_t)(blockSize * 1.005), blockSize + 8);
  char * outputBuffer = new char[maxlength];
  char * dest = new char[blockSize + 8];
  CompressResult result;
  Timer t;
  int compressedSize = -1;
  for (size_t start = 0; start < data.length(); start += blockSize) {
    size_t currentblocksize = std::min(data.length() - start, blockSize);
    uint64_t startTime = t.now();
    for (int i = 0; i < times; i++) {
      int osize = LZ4_compress((char*)data.data() + start, outputBuffer, currentblocksize);
      compressedSize = osize;
      result.compressedSize += osize;
      result.uncompressedSize += currentblocksize;
    }
    uint64_t endTime = t.now();
    result.compressTime += endTime - startTime;
    startTime = t.now();
    for (int i = 0; i < times; i++) {
      // LZ4_uncompress returns the number of bytes it consumed from the
      // compressed buffer, so compare against the compressed size rather
      // than the uncompressed block size.
      int osize = LZ4_uncompress(outputBuffer, dest, currentblocksize);
      ASSERT_EQ(compressedSize, osize);
    }
    endTime = t.now();
    result.uncompressTime += endTime - startTime;
  }
  printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
  delete[] outputBuffer;
  delete[] dest;
  total += result;
}
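// Runs the raw LZ4 benchmark over every regular file under
// compressions.input.path.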
TEST(Perf, RawCompressionLz4) {
  string inputdir = TestConfig.get("compressions.input.path", "");
  int64_t times = TestConfig.getInt("compression.time", 400);
  int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
  vector<FileEntry> inputfiles;
  FileSystem::getLocal().list(inputdir, inputfiles);
  CompressResult total;
  printf("Block size: %lldK\n", (long long int)(blockSize / 1024));
  for (size_t i = 0; i < inputfiles.size(); i++) {
    if (!inputfiles[i].isDirectory) {
      MeasureSingleFileLz4(inputdir + "/" + inputfiles[i].name, total, blockSize, times);
    }
  }
  printf("%s - Total\n", total.toString().c_str());
}
TEST(Perf, Lz4Codec) {
  TestCodec("org.apache.hadoop.io.compress.Lz4Codec");
}
#if defined HADOOP_SNAPPY_LIBRARY
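// Snappy counterpart of MeasureSingleFileLz4: raw block-by-block
// compress/decompress of one file.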
void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize,
    int times) {
  string data;
  ReadFile(data, path);
  size_t maxlength = snappy::MaxCompressedLength(blockSize);
  char * outputBuffer = new char[maxlength];
  char * dest = new char[blockSize];
  CompressResult result;
  Timer t;
  int compressedSize = -1;
  for (size_t start = 0; start < data.length(); start += blockSize) {
    size_t currentblocksize = std::min(data.length() - start, blockSize);
    uint64_t startTime = t.now();
    for (int i = 0; i < times; i++) {
      size_t osize = 0;  // set by RawCompress
      snappy::RawCompress(data.data() + start, currentblocksize, outputBuffer, &osize);
      compressedSize = osize;
      result.compressedSize += osize;
      result.uncompressedSize += currentblocksize;
    }
    uint64_t endTime = t.now();
    result.compressTime += endTime - startTime;
    startTime = t.now();
    for (int i = 0; i < times; i++) {
      // RawUncompress returns false on corrupt input; check it like the
      // LZ4 path checks its return value.
      ASSERT_TRUE(snappy::RawUncompress(outputBuffer, compressedSize, dest));
    }
    endTime = t.now();
    result.uncompressTime += endTime - startTime;
  }
  printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
  delete[] outputBuffer;
  delete[] dest;
  total += result;
}
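// Runs the raw Snappy benchmark over every regular file under
// compressions.input.path.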
TEST(Perf, RawCompressionSnappy) {
  string inputdir = TestConfig.get("compressions.input.path", "");
  int64_t times = TestConfig.getInt("compression.time", 400);
  int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
  vector<FileEntry> inputfiles;
  FileSystem::getLocal().list(inputdir, inputfiles);
  CompressResult total;
  printf("Block size: %lldK\n", (long long int)(blockSize / 1024));
  for (size_t i = 0; i < inputfiles.size(); i++) {
    if (!inputfiles[i].isDirectory) {
      MeasureSingleFileSnappy(inputdir + "/" + inputfiles[i].name, total, blockSize, times);
    }
  }
  printf("%s - Total\n", total.toString().c_str());
}
TEST(Perf, SnappyCodec) {
  TestCodec("org.apache.hadoop.io.compress.SnappyCodec");
}
#endif // defined HADOOP_SNAPPY_LIBRARY