// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/codec.h"
#include <gutil/strings/substitute.h>
#include "util/compress.h"
#include "util/decompress.h"
#include "gen-cpp/CatalogObjects_constants.h"
#include "common/names.h"
using namespace impala;
using namespace strings;
const char* const Codec::DEFAULT_COMPRESSION =
"org.apache.hadoop.io.compress.DefaultCodec";
const char* const Codec::GZIP_COMPRESSION = "org.apache.hadoop.io.compress.GzipCodec";
const char* const Codec::BZIP2_COMPRESSION = "org.apache.hadoop.io.compress.BZip2Codec";
const char* const Codec::SNAPPY_COMPRESSION = "org.apache.hadoop.io.compress.SnappyCodec";
const char* const Codec::LZ4_COMPRESSION = "org.apache.hadoop.io.compress.Lz4Codec";
const char* const Codec::ZSTD_COMPRESSION =
"org.apache.hadoop.io.compress.ZStandardCodec";
const char* const Codec::UNKNOWN_CODEC_ERROR =
"This compression codec is currently unsupported: ";
const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. "
"Instead the LZO library is directly invoked.";
const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE},
{DEFAULT_COMPRESSION, THdfsCompression::DEFAULT},
{GZIP_COMPRESSION, THdfsCompression::GZIP},
{BZIP2_COMPRESSION, THdfsCompression::BZIP2},
{SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED},
{LZ4_COMPRESSION, THdfsCompression::LZ4_BLOCKED},
{ZSTD_COMPRESSION, THdfsCompression::ZSTD}};
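
// Returns the key in the catalog's COMPRESSION_MAP that maps to 'type', or "INVALID"
// (after a DCHECK failure in debug builds) if the enum value is missing from the map.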
string Codec::GetCodecName(THdfsCompression::type type) {
  for (const CodecMap::value_type& codec: g_CatalogObjects_constants.COMPRESSION_MAP) {
    if (codec.second == type) return codec.first;
  }
  DCHECK(false) << "Missing codec in COMPRESSION_MAP: " << type;
  return "INVALID";
}
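
// Reverse lookup of 'type' in CODEC_MAP: writes the matching Hadoop codec class name
// into 'out_name', or returns an error status if no entry maps to this format.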
Status Codec::GetHadoopCodecClassName(THdfsCompression::type type, string* out_name) {
  for (const CodecMap::value_type& codec: CODEC_MAP) {
    if (codec.second == type) {
      out_name->assign(codec.first);
      return Status::OK();
    }
  }
  return Status(Substitute("Unsupported codec for given file type: $0",
      _THdfsCompression_VALUES_TO_NAMES.find(type)->second));
}
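
// Creates a compressor from a Hadoop codec class name by resolving it through
// CODEC_MAP. ZSTD gets ZSTD_CLEVEL_DEFAULT as its compression level; other formats
// take no level. A minimal usage sketch ('some_mem_pool' is an illustrative
// assumption, not something defined in this file):
//
//   scoped_ptr<Codec> compressor;
//   RETURN_IF_ERROR(Codec::CreateCompressor(
//       some_mem_pool, /*reuse=*/true, Codec::GZIP_COMPRESSION, &compressor));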
Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const string& codec,
    scoped_ptr<Codec>* compressor) {
  CodecMap::const_iterator type = CODEC_MAP.find(codec);
  if (type == CODEC_MAP.end()) {
    return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec));
  }
  CodecInfo codec_info(
      type->second, (type->second == THdfsCompression::ZSTD) ? ZSTD_CLEVEL_DEFAULT : 0);
  RETURN_IF_ERROR(CreateCompressor(mem_pool, reuse, codec_info, compressor));
  return Status::OK();
}
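
// Creates the concrete compressor for 'codec_info'. NONE resets 'compressor' to
// nullptr and returns OK; LZO is rejected explicitly because it is handled outside
// the Codec interface.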
Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& codec_info,
    scoped_ptr<Codec>* compressor) {
  THdfsCompression::type format = codec_info.format_;
  switch (format) {
    case THdfsCompression::NONE:
      compressor->reset(nullptr);
      return Status::OK();
    case THdfsCompression::GZIP:
      compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse));
      break;
    case THdfsCompression::DEFAULT:
      compressor->reset(new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse));
      break;
    case THdfsCompression::DEFLATE:
      compressor->reset(new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse));
      break;
    case THdfsCompression::BZIP2:
      compressor->reset(new BzipCompressor(mem_pool, reuse));
      break;
    case THdfsCompression::SNAPPY_BLOCKED:
      compressor->reset(new SnappyBlockCompressor(mem_pool, reuse));
      break;
    case THdfsCompression::SNAPPY:
      compressor->reset(new SnappyCompressor(mem_pool, reuse));
      break;
    case THdfsCompression::LZ4:
      compressor->reset(new Lz4Compressor(mem_pool, reuse));
      break;
    case THdfsCompression::ZSTD:
      compressor->reset(new ZstandardCompressor(mem_pool, reuse,
          codec_info.compression_level_));
      break;
    case THdfsCompression::LZ4_BLOCKED:
      compressor->reset(new Lz4BlockCompressor(mem_pool, reuse));
      break;
    default: {
      if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
      return Status(Substitute("Unsupported codec: $0", format));
    }
  }
  return (*compressor)->Init();
}
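
// Creates a decompressor from a Hadoop codec class name; the name is resolved
// through CODEC_MAP and then dispatched on the resulting format.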
Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse, const string& codec,
    scoped_ptr<Codec>* decompressor) {
  CodecMap::const_iterator type = CODEC_MAP.find(codec);
  if (type == CODEC_MAP.end()) {
    return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec));
  }
  RETURN_IF_ERROR(
      CreateDecompressor(mem_pool, reuse, type->second, decompressor));
  return Status::OK();
}
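
// Creates the concrete decompressor for 'format'. DEFAULT and GZIP share
// GzipDecompressor; DEFLATE uses the same class with its final constructor argument
// set to true (presumably selecting raw deflate). NONE resets 'decompressor' to
// nullptr, and LZO is rejected for the same reason as in CreateCompressor.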
Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
    THdfsCompression::type format, scoped_ptr<Codec>* decompressor) {
  switch (format) {
    case THdfsCompression::NONE:
      decompressor->reset(nullptr);
      return Status::OK();
    case THdfsCompression::DEFAULT:
    case THdfsCompression::GZIP:
      decompressor->reset(new GzipDecompressor(mem_pool, reuse, false));
      break;
    case THdfsCompression::DEFLATE:
      decompressor->reset(new GzipDecompressor(mem_pool, reuse, true));
      break;
    case THdfsCompression::BZIP2:
      decompressor->reset(new BzipDecompressor(mem_pool, reuse));
      break;
    case THdfsCompression::SNAPPY_BLOCKED:
      decompressor->reset(new SnappyBlockDecompressor(mem_pool, reuse));
      break;
    case THdfsCompression::SNAPPY:
      decompressor->reset(new SnappyDecompressor(mem_pool, reuse));
      break;
    case THdfsCompression::LZ4:
      decompressor->reset(new Lz4Decompressor(mem_pool, reuse));
      break;
    case THdfsCompression::ZSTD:
      decompressor->reset(new ZstandardDecompressor(mem_pool, reuse));
      break;
    case THdfsCompression::LZ4_BLOCKED:
      decompressor->reset(new Lz4BlockDecompressor(mem_pool, reuse));
      break;
    default: {
      if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
      return Status(Substitute("Unsupported codec: $0", format));
    }
  }
  return (*decompressor)->Init();
}
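
// A codec constructed with a non-null MemPool also allocates a private temporary
// pool against the same MemTracker; Close() later returns that memory to the
// owning pool.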
Codec::Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming)
  : memory_pool_(mem_pool),
    reuse_buffer_(reuse_buffer),
    supports_streaming_(supports_streaming) {
  if (memory_pool_ != nullptr) {
    temp_memory_pool_.reset(new MemPool(memory_pool_->mem_tracker()));
  }
}
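
// Transfers ownership of the temporary pool's buffers back to 'memory_pool_'.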
void Codec::Close() {
  if (temp_memory_pool_.get() != nullptr) {
    DCHECK(memory_pool_ != nullptr);
    memory_pool_->AcquireData(temp_memory_pool_.get(), false);
  }
}
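
// 32-bit wrapper around ProcessBlock(): widens the lengths to int64_t for the call,
// then checks that the resulting output length is a non-negative 32-bit value before
// narrowing it back into '*output_length'.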
Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
    const uint8_t* input, int* output_length, uint8_t** output) {
  int64_t input_len64 = input_length;
  int64_t output_len64 = *output_length;
  RETURN_IF_ERROR(
      ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
  // The output length must fit in [0, 2^31 - 1] bytes to be representable as an int.
  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
    return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
        output_len64));
  }
  *output_length = static_cast<int>(output_len64);
  return Status::OK();
}