blob: 000b5858751d5cb0cd1d138723d275849cfd0104 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/decompress.h"
// Codec libraries
#include <zlib.h>
#include <bzlib.h>
#undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
#include <lz4.h>
#include <snappy.h>
#include <zstd.h>
#include <zstd_errors.h>
#include "common/logging.h"
#include "exec/read-write-util.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "common/names.h"
using namespace impala;
using namespace strings;
const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED =
"$0Decompressor failed to allocate $1 bytes.";
GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate)
  : Codec(mem_pool, reuse_buffer, true),
    is_deflate_(is_deflate) {
  // Start from an all-zero z_stream: zalloc/zfree/opaque must be null so that
  // inflateInit2() in Init() installs zlib's default allocators.
  stream_ = z_stream();
}
GzipDecompressor::~GzipDecompressor() {
  // Release zlib's internal inflate state. The return code is deliberately
  // ignored: there is nothing useful to do with it during teardown.
  inflateEnd(&stream_);
}
Status GzipDecompressor::Init() {
  // Choose the inflate window configuration: negative window bits selects raw
  // deflate (no header), otherwise let zlib auto-detect zlib vs. gzip framing.
  int window_bits;
  if (is_deflate_) {
    window_bits = -WINDOW_BITS;
  } else {
    window_bits = WINDOW_BITS | DETECT_CODEC;
  }
  int ret = inflateInit2(&stream_, window_bits);
  if (ret == Z_OK) return Status::OK();
  return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Gzip",
      "inflateInit2()", ret);
}
int64_t GzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // A gzip/deflate stream does not record the uncompressed size up front, so the
  // output length cannot be determined from the input; -1 signals "unknown".
  return -1;
}
string GzipDecompressor::DebugStreamState() const {
  // One-line summary of zlib's cursors and running totals; used in VLOG output
  // while debugging streaming decompression.
  stringstream state;
  state << "next_in=" << static_cast<void*>(stream_.next_in)
        << " avail_in=" << stream_.avail_in
        << " total_in=" << stream_.total_in
        << " next_out=" << static_cast<void*>(stream_.next_out)
        << " avail_out=" << stream_.avail_out
        << " total_out=" << stream_.total_out;
  return state.str();
}
// Streaming decompression: consumes up to 'input_length' bytes of 'input' and writes
// decompressed data into an internally-owned buffer of STREAM_OUT_BUF_SIZE bytes,
// returned via *output. On return, *input_bytes_read holds the number of input bytes
// consumed, *output_length the number of bytes produced, and *stream_end is true iff
// the last inflate() call hit the end of a gzip/deflate stream (the zlib stream is
// then reset so a following concatenated stream can be decoded by a later call).
Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
    int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
    bool* stream_end) {
  if (!reuse_buffer_ || out_buffer_ == nullptr) {
    // (Re-)allocate the fixed-size streaming output buffer from the output pool.
    buffer_length_ = STREAM_OUT_BUF_SIZE;
    out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
    if (UNLIKELY(out_buffer_ == nullptr)) {
      string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
          buffer_length_);
      return memory_pool_->mem_tracker()->MemLimitExceeded(
          nullptr, details, buffer_length_);
    }
  }
  *output = out_buffer_;
  // Point zlib at the caller's input and our output buffer.
  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
  stream_.avail_in = input_length;
  stream_.next_out = reinterpret_cast<Bytef*>(*output);
  stream_.avail_out = buffer_length_;
  *stream_end = false;
  *input_bytes_read = 0;
  *output_length = 0;
  // Keep inflating until the output buffer is full or all input is consumed.
  while (stream_.avail_out > 0 && stream_.avail_in > 0) {
    *stream_end = false;
    // inflate() performs one or both of the following actions:
    // Decompress more input starting at next_in and update next_in and avail_in
    // accordingly.
    // Provide more output starting at next_out and update next_out and avail_out
    // accordingly.
    // inflate() returns Z_OK if some progress has been made (more input processed
    // or more output produced)
    int ret = inflate(&stream_, Z_SYNC_FLUSH);
    // Progress so far is reported to the caller even if we bail out below.
    *input_bytes_read = input_length - stream_.avail_in;
    *output_length = buffer_length_ - stream_.avail_out;
    VLOG_ROW << "inflate() ret=" << ret << " consumed=" << *input_bytes_read
        << " produced=" << *output_length << " stream: " << DebugStreamState();
    if (ret == Z_DATA_ERROR) {
      return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, "Gzip");
    } else if (ret == Z_BUF_ERROR) {
      // Z_BUF_ERROR indicates that inflate() could not consume more input or
      // produce more output. inflate() can be called again with more output space
      // or more available input.
      VLOG_ROW << "inflate() ret=" << ret << ", cannot make progress, need more input";
      return Status::OK();
    } else if (ret == Z_STREAM_END) {
      // End of one gzip/deflate stream; reset so a concatenated stream (or the
      // next call) can continue decoding with the same z_stream.
      *stream_end = true;
      ret = inflateReset(&stream_);
      if (ret != Z_OK) {
        return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Gzip",
            "inflateReset()", ret);
      }
    } else if (ret != Z_OK) {
      return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Gzip",
          "inflate()", ret);
    }
    DCHECK_EQ(ret, Z_OK);
  }
  return Status::OK();
}
// Non-streaming decompression of one complete compressed block. If
// output_preallocated, the caller supplies *output/*output_length and the entire
// result must fit. Otherwise a buffer is allocated from temp_memory_pool_
// (starting at 2x the input size) and doubled until the whole output fits; on
// success ownership of that buffer is transferred to memory_pool_.
Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  int64_t output_length_local = *output_length;
  *output_length = 0;
  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
    // The zlib library does not allow *output to be nullptr, even when output_length is 0
    // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
    // early if no output is expected. Note that we don't signal an error if the input
    // actually contains compressed data.
    return Status::OK();
  }
  bool use_temp = false;
  if (!output_preallocated) {
    if (!reuse_buffer_ || out_buffer_ == nullptr) {
      // guess that we will need 2x the input length.
      buffer_length_ = input_length * 2;
      out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
      if (UNLIKELY(out_buffer_ == nullptr)) {
        string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
            buffer_length_);
        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
            nullptr, details, buffer_length_);
      }
    }
    use_temp = true;
    *output = out_buffer_;
    output_length_local = buffer_length_;
  }
  // Reset the stream for this block
  int ret = inflateReset(&stream_);
  if (ret != Z_OK) {
    return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Gzip",
        "inflateReset()", ret);
  }
  // We only support the non-streaming use case where we present it the entire
  // compressed input and a buffer big enough to contain the entire decompressed
  // output. In the case where we don't know the output, we just make a bigger
  // buffer and try the non-streaming mode from the beginning again.
  // TODO: IMPALA-3073 Verify if compressed block could be multistream. If yes, we need
  // to support it and shouldn't stop decompressing while ret == Z_STREAM_END.
  while (ret != Z_STREAM_END) {
    // Each retry restarts from the beginning of the input with the (possibly
    // enlarged) output buffer.
    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
    stream_.avail_in = input_length;
    stream_.next_out = reinterpret_cast<Bytef*>(*output);
    stream_.avail_out = output_length_local;
    if (use_temp) {
      // We don't know the output size, so this might fail.
      ret = inflate(&stream_, Z_PARTIAL_FLUSH);
    } else {
      // We know the output size. In this case, we can use Z_FINISH
      // which is more efficient.
      ret = inflate(&stream_, Z_FINISH);
    }
    if (ret == Z_STREAM_END || ret != Z_OK) break;
    // Not enough output space.
    if (!use_temp) {
      stringstream ss;
      ss << "Too small a buffer passed to GzipDecompressor. InputLength="
          << input_length << " OutputLength=" << output_length_local;
      return Status(ss.str());
    }
    // User didn't supply the buffer, double the buffer and try again.
    temp_memory_pool_->Clear();
    buffer_length_ *= 2;
    out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
    if (UNLIKELY(out_buffer_ == nullptr)) {
      string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
          buffer_length_);
      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
          nullptr, details, buffer_length_);
    }
    *output = out_buffer_;
    output_length_local = buffer_length_;
    ret = inflateReset(&stream_);
  }
  if (ret == Z_DATA_ERROR) {
    return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, "Gzip");
  } else if (ret != Z_STREAM_END) {
    stringstream ss;
    ss << "GzipDecompressor failed: ";
    if (stream_.msg != nullptr) ss << stream_.msg;
    return Status(ss.str());
  }
  // stream_.avail_out is the number of bytes *left* in the out buffer, but
  // we're interested in the number of bytes used.
  *output_length = output_length_local - stream_.avail_out;
  // Hand the temp pool's allocation over to the caller-visible pool.
  if (use_temp) memory_pool_->AcquireData(temp_memory_pool_.get(), reuse_buffer_);
  return Status::OK();
}
BzipDecompressor::BzipDecompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer, true) {
  // Start from an all-zero bz_stream so BZ2_bzDecompressInit() in Init() uses
  // the library's default allocators (bzalloc/bzfree/opaque must be null).
  stream_ = bz_stream();
}
BzipDecompressor::~BzipDecompressor() {
  // Free bzip2's internal decompression state; the return code is irrelevant
  // during teardown.
  (void)BZ2_bzDecompressEnd(&stream_);
}
Status BzipDecompressor::Init() {
  // Arguments are (stream, verbosity, small): 0/0 selects quiet operation with
  // the standard (non-memory-reduced) algorithm.
  int ret = BZ2_bzDecompressInit(&stream_, 0, 0);
  if (ret == BZ_OK) return Status::OK();
  return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Bzip2",
      "BZ2_bzDecompressInit()", ret);
}
int64_t BzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // bzip2 data does not record its uncompressed size, so it cannot be derived
  // from the input; -1 signals "unknown".
  return -1;
}
// Non-streaming decompression of one complete bzip2 block via
// BZ2_bzBuffToBuffDecompress. If output_preallocated, the whole result must fit in
// the caller's buffer. Otherwise a temp buffer (initially 2x the input length) is
// doubled on BZ_OUTBUFF_FULL until the output fits, and ownership of it moves to
// memory_pool_ on success.
Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  int64_t output_length_local = *output_length;
  *output_length = 0;
  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
    // Same problem as zlib library, see comment in GzipDecompressor::ProcessBlock().
    return Status::OK();
  }
  bool use_temp = false;
  if (output_preallocated) {
    buffer_length_ = output_length_local;
    out_buffer_ = *output;
  } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
    // guess that we will need 2x the input length.
    buffer_length_ = input_length * 2;
    out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
    if (UNLIKELY(out_buffer_ == nullptr)) {
      string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
          buffer_length_);
      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
          nullptr, details, buffer_length_);
    }
    use_temp = true;
  }
  // Seed ret so the retry loop below executes at least once.
  int ret = BZ_OUTBUFF_FULL;
  unsigned int outlen = static_cast<unsigned int>(buffer_length_);
  // TODO: IMPALA-3073 Verify if compressed block could be multistream. If yes, we need
  // to support it and shouldn't stop decompressing while ret == BZ_STREAM_END.
  while (ret == BZ_OUTBUFF_FULL) {
    if (out_buffer_ == nullptr) {
      // The previous attempt ran out of space: double the temp buffer and retry.
      DCHECK(!output_preallocated);
      temp_memory_pool_->Clear();
      buffer_length_ = buffer_length_ * 2;
      out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
      if (UNLIKELY(out_buffer_ == nullptr)) {
        string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
            buffer_length_);
        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
            nullptr, details, buffer_length_);
      }
    }
    outlen = static_cast<unsigned int>(buffer_length_);
    if ((ret = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(out_buffer_), &outlen,
        const_cast<char*>(reinterpret_cast<const char*>(input)),
        static_cast<unsigned int>(input_length), 0, 0)) == BZ_OUTBUFF_FULL) {
      if (output_preallocated) {
        return Status("Too small a buffer passed to BzipDecompressor");
      }
      // nullptr signals the top of the loop to grow the temp buffer.
      out_buffer_ = nullptr;
    }
  }
  if (ret == BZ_DATA_ERROR) {
    return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, "Bzip2");
  } else if (ret != BZ_OK) {
    // Fixed: the error context used to say "BZ2_bzBuffToBuffDecompressor()", which
    // is not a bzip2 API function; the call above is BZ2_bzBuffToBuffDecompress().
    return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Bzip2",
        "BZ2_bzBuffToBuffDecompress()", ret);
  }
  *output = out_buffer_;
  *output_length = outlen;
  // Hand the temp pool's allocation over to the caller-visible pool.
  if (use_temp) memory_pool_->AcquireData(temp_memory_pool_.get(), reuse_buffer_);
  return Status::OK();
}
// Returns a one-line summary of the bz_stream's cursors for debug logging.
string BzipDecompressor::DebugStreamState() const {
  stringstream ss;
  ss << "Stream: " << &stream_;
  // next_in/next_out are char*, so they must be cast to void* before streaming;
  // otherwise operator<< would treat the binary, non-NUL-terminated buffer as a
  // C string and read past its end. Matches GzipDecompressor::DebugStreamState().
  ss << " next_in=" << static_cast<const void*>(stream_.next_in);
  ss << " avail_in=" << stream_.avail_in;
  ss << " next_out=" << static_cast<const void*>(stream_.next_out);
  ss << " avail_out=" << stream_.avail_out;
  return ss.str();
}
// Decompress bzip2 data as a stream so we don't need to read the whole file
// to decompress at once. Possible formats:
// 1. Single stream file,
// ProcessBlockStreaming() will be called until the end of the file is reached.
// 2. Multiple streams concatenated into a single file.
// ProcessBlockStreaming() should be called multiple times until the end
// of the file is reached. Each stream could be pretty small (<= 900k).
// We try to consume as many streams as possible in one call to avoid
// re-reading input data and allocating output buffer for each stream.
//
// Returns when the output buffer is full, the end of the file is reached, or an
// error is encountered.
Status BzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
    int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
    bool* stream_end) {
  if (!reuse_buffer_ || out_buffer_ == nullptr) {
    // (Re-)allocate the fixed-size streaming output buffer from the output pool.
    buffer_length_ = STREAM_OUT_BUF_SIZE;
    out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
    if (UNLIKELY(out_buffer_ == nullptr)) {
      string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
          buffer_length_);
      return memory_pool_->mem_tracker()->MemLimitExceeded(
          nullptr, details, buffer_length_);
    }
  }
  *output = out_buffer_;
  // Point the bz_stream at the caller's input and our output buffer.
  stream_.next_in = const_cast<char*>(reinterpret_cast<const char*>(input));
  stream_.avail_in = input_length;
  stream_.next_out = reinterpret_cast<char*>(*output);
  stream_.avail_out = buffer_length_;
  *stream_end = false;
  *input_bytes_read = 0;
  *output_length = 0;
  // Keep decompressing until the output buffer is full or all input is consumed;
  // this may span multiple concatenated bzip2 streams in one call.
  while (stream_.avail_out > 0 && stream_.avail_in > 0) {
    *stream_end = false;
    int ret = BZ2_bzDecompress(&stream_);
    // Progress so far is reported to the caller even if we bail out below.
    *output_length = buffer_length_ - stream_.avail_out;
    *input_bytes_read = input_length - stream_.avail_in;
    if (ret == BZ_DATA_ERROR || ret == BZ_DATA_ERROR_MAGIC) {
      return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, "Bzip2");
    } else if (ret == BZ_STREAM_END) {
      // End of one bzip2 stream: tear down and re-initialize the bz_stream so
      // the next concatenated stream (if any) can be decoded.
      *stream_end = true;
      ret = BZ2_bzDecompressEnd(&stream_);
      if (ret != BZ_OK) {
        return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Bzip2",
            "BZ2_bzDecompressEnd()", ret);
      }
      ret = BZ2_bzDecompressInit(&stream_, 0, 0);
      if (ret != BZ_OK) {
        return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Bzip2",
            "BZ2_bzDecompressInit()", ret);
      }
    } else if (ret != BZ_OK) {
      return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Bzip2",
          "BZ2_bzDecompress()", ret);
    }
    DCHECK_EQ(ret, BZ_OK);
  }
  return Status::OK();
}
// No per-codec state: snappy's block API is stateless, so only the base Codec
// (pool and buffer-reuse policy) needs initializing.
SnappyBlockDecompressor::SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {
}
int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // The total output size of Hadoop block-compressed snappy is not available
  // without scanning the block headers (see SnappyBlockDecompress's size_only
  // mode); -1 signals "unknown".
  return -1;
}
// Hadoop uses a block compression scheme on top of snappy. As per the hadoop docs
// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split
// into blocks. Each block "contains the uncompressed length for the block followed
// by one or more length-prefixed blocks of compressed data."
// This is essentially blocks of blocks.
// The outer block consists of:
// - 4 byte big endian uncompressed_size
// < inner blocks >
// ... repeated until input_len is consumed ..
// The inner blocks have:
// - 4-byte big endian compressed_size
// < snappy compressed block >
// - 4-byte big endian compressed_size
// < snappy compressed block >
// ... repeated until uncompressed_size from outer block is consumed ...
// Utility function to decompress snappy block compressed data. If size_only is true,
// this function does not decompress but only computes the output size and writes
// the result to *output_len.
// If size_only is false, output buffer size must be at least *output_len. *output_len is
// updated with the actual output size if the decompression succeeds, and is set to 0
// otherwise.
// size_only is an O(1) operation (just reads a single varint for each snappy block).
static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
    bool size_only, int64_t* output_len, char* output) {
  int64_t buffer_size = *output_len;
  *output_len = 0;
  int64_t uncompressed_total_len = 0;
  while (input_len > 0) {
    // Bounds check: there must be at least 4 bytes left for the outer block's
    // big-endian uncompressed length; a truncated buffer would otherwise be
    // read past its end by GetInt below.
    if (input_len < static_cast<int64_t>(sizeof(uint32_t))) {
      stringstream ss;
      ss << " Corruption snappy decomp input_len " << input_len;
      return Status(ss.str());
    }
    uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
    input += sizeof(uint32_t);
    input_len -= sizeof(uint32_t);
    if (!size_only) {
      // Reject an outer block that claims more output than the caller's buffer
      // has left.
      int64_t remaining_output_size = buffer_size - uncompressed_total_len;
      if (remaining_output_size < uncompressed_block_len) {
        return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
      }
    }
    while (uncompressed_block_len > 0) {
      // Bounds check for the 4-byte inner (compressed) block length; this also
      // guarantees input_len stays non-negative below.
      if (input_len < static_cast<int64_t>(sizeof(uint32_t))) {
        stringstream ss;
        ss << " Corruption snappy decomp input_len " << input_len;
        return Status(ss.str());
      }
      // Read the length of the next snappy compressed block.
      size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
      input += sizeof(uint32_t);
      input_len -= sizeof(uint32_t);
      // input_len >= 0 here, so the mixed-sign comparison is safe.
      if (compressed_len == 0 || compressed_len > input_len) {
        return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
      }
      // Read how big the output will be.
      size_t uncompressed_len;
      if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
          compressed_len, &uncompressed_len)) {
        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
      }
      // Check that uncompressed length should be greater than 0.
      if (uncompressed_len <= 0) {
        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
      }
      DCHECK_GT(uncompressed_len, 0);
      // An inner block must not produce more than the outer block's remaining
      // budget; otherwise the uint32 subtraction below would wrap around and
      // keep this loop running on garbage.
      if (uncompressed_len > uncompressed_block_len) {
        return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
      }
      if (!size_only) {
        // Check output bounds
        int64_t remaining_output_size = buffer_size - uncompressed_total_len;
        if (remaining_output_size < uncompressed_len) {
          return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
        }
        // Decompress this snappy block
        if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
            compressed_len, output)) {
          return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
        }
        output += uncompressed_len;
      }
      input += compressed_len;
      input_len -= compressed_len;
      uncompressed_block_len -= uncompressed_len;
      uncompressed_total_len += uncompressed_len;
    }
  }
  *output_len = uncompressed_total_len;
  return Status::OK();
}
// Decompresses a Hadoop snappy block-compressed buffer. With no preallocated
// output, a sizing pass runs first and the internal buffer is grown to fit.
Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
    const uint8_t* input, int64_t* output_len, uint8_t** output) {
  int64_t decompressed_len = *output_len;
  *output_len = 0;
  if (!output_preallocated) {
    // No caller-supplied buffer: compute the exact output size first.
    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, &decompressed_len,
        nullptr));
    const bool need_new_buffer = !reuse_buffer_ || out_buffer_ == nullptr
        || buffer_length_ < decompressed_len;
    if (need_new_buffer) {
      buffer_length_ = decompressed_len;
      out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
      if (UNLIKELY(out_buffer_ == nullptr)) {
        string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock",
            buffer_length_);
        return memory_pool_->mem_tracker()->MemLimitExceeded(
            nullptr, details, buffer_length_);
      }
    }
    *output = out_buffer_;
  }
  // Second (or only) pass: actually decompress into *output.
  char* destination = reinterpret_cast<char*>(*output);
  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, &decompressed_len,
      destination));
  *output_len = decompressed_len;
  return Status::OK();
}
// No per-codec state: snappy's raw API is stateless, so only the base Codec
// (pool and buffer-reuse policy) needs initializing.
SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {
}
int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // Snappy stores the uncompressed size in a preamble at the start of the
  // compressed data; return it, or -1 if the input is empty or unreadable.
  if (input_len <= 0) return -1;
  DCHECK(input != nullptr);
  size_t uncompressed_size;
  const bool ok = snappy::GetUncompressedLength(
      reinterpret_cast<const char*>(input), input_len, &uncompressed_size);
  return ok ? static_cast<int64_t>(uncompressed_size) : -1;
}
// Decompresses one raw snappy-compressed buffer. The exact output size comes
// from snappy's preamble, so allocation (when needed) is sized precisely.
Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  const int64_t caller_buffer_len = *output_length;
  *output_length = 0;
  const int64_t uncompressed_length = MaxOutputLen(input_length, input);
  if (uncompressed_length < 0) {
    return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
  }
  if (output_preallocated) {
    // If the preallocated buffer is too small (e.g. the file metadata is
    // corrupt), bail out early rather than overrun the caller's buffer.
    if (uncompressed_length > caller_buffer_len) {
      return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
    }
  } else {
    const bool need_new_buffer = !reuse_buffer_ || out_buffer_ == nullptr
        || buffer_length_ < uncompressed_length;
    if (need_new_buffer) {
      buffer_length_ = uncompressed_length;
      out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
      if (UNLIKELY(out_buffer_ == nullptr)) {
        string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
            buffer_length_);
        return memory_pool_->mem_tracker()->MemLimitExceeded(
            nullptr, details, buffer_length_);
      }
    }
    *output = out_buffer_;
  }
  if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
      static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
    return Status("Snappy: RawUncompress failed");
  }
  *output_length = uncompressed_length;
  return Status::OK();
}
// No per-codec state: LZ4's block API is stateless, so only the base Codec
// (pool and buffer-reuse policy) needs initializing.
Lz4Decompressor::Lz4Decompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {
}
int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor";
  // Raw LZ4 blocks carry no uncompressed-size header; -1 signals "unknown",
  // which is why ProcessBlock() requires a preallocated output buffer.
  return -1;
}
// Decompresses one raw LZ4 block into the caller-provided buffer. *output_length
// is updated to the number of bytes produced, or 0 on failure.
Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
  // Nothing to produce for a zero-length destination.
  if (*output_length == 0) return Status::OK();
  const char* src = reinterpret_cast<const char*>(input);
  char* dst = reinterpret_cast<char*>(*output);
  const int decompressed_size =
      LZ4_decompress_safe(src, dst, input_length, *output_length);
  if (decompressed_size < 0) {
    // A negative return from LZ4_decompress_safe means malformed input or an
    // undersized destination buffer.
    *output_length = 0;
    return Status("Lz4: uncompress failed");
  }
  *output_length = decompressed_size;
  return Status::OK();
}
// No per-codec state: the one-shot ZSTD_decompress API is used, so only the
// base Codec (pool and buffer-reuse policy) needs initializing.
ZstandardDecompressor::ZstandardDecompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {}
int64_t ZstandardDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  // -1 signals "unknown" output size, which is why ProcessBlock() requires a
  // preallocated output buffer.
  return -1;
}
// Decompresses one zstd frame into the caller-provided buffer. *output_length
// is updated to the number of bytes produced, or 0 on failure.
Status ZstandardDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
  // Nothing to produce for a zero-length destination.
  if (*output_length == 0) return Status::OK();
  const size_t result = ZSTD_decompress(*output, *output_length, input, input_length);
  if (ZSTD_isError(result)) {
    *output_length = 0;
    // Translate the zstd error code into a readable message for the Status.
    return Status(TErrorCode::ZSTD_ERROR, "ZSTD_decompress",
        ZSTD_getErrorString(ZSTD_getErrorCode(result)));
  }
  *output_length = result;
  return Status::OK();
}
// No per-codec state: LZ4's block API is stateless, so only the base Codec
// (pool and buffer-reuse policy) needs initializing.
Lz4BlockDecompressor::Lz4BlockDecompressor(MemPool* mem_pool, bool reuse_buffer)
  : Codec(mem_pool, reuse_buffer) {
}
int64_t Lz4BlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
  DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor";
  // The total output size of Hadoop lz4 block-compressed data is not known
  // without scanning the block headers; -1 signals "unknown", which is why
  // ProcessBlock() requires a preallocated output buffer.
  return -1;
}
// Decompresses a block compressed using Hadoop's lz4 block compression scheme. The
// compressed block layout is similar to Hadoop's snappy block compression scheme, with
// the only difference being the compression codec used. For more details please refer
// to the comment section for the SnappyBlockDecompress above.
Status Lz4BlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
    const uint8_t* input, int64_t* output_len, uint8_t** output) {
  DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
  if (*output_len == 0) return Status::OK();
  uint8_t* out_ptr = *output;
  int64_t uncompressed_total_len = 0;
  const int64_t buffer_size = *output_len;
  *output_len = 0;
  while (input_len > 0) {
    // Bounds check: there must be at least 4 bytes left for the outer block's
    // big-endian uncompressed length; a truncated buffer would otherwise be
    // read past its end by GetInt below.
    if (input_len < static_cast<int64_t>(sizeof(uint32_t))) {
      return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_INVALID_INPUT_LENGTH);
    }
    uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
    input += sizeof(uint32_t);
    input_len -= sizeof(uint32_t);
    // Reject an outer block that claims more output than the caller's buffer
    // has left.
    int64_t remaining_output_size = buffer_size - uncompressed_total_len;
    if (remaining_output_size < uncompressed_block_len) {
      return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
    }
    while (uncompressed_block_len > 0) {
      // Bounds check for the 4-byte inner (compressed) block length; this also
      // guarantees input_len stays non-negative below.
      if (input_len < static_cast<int64_t>(sizeof(uint32_t))) {
        return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_INVALID_INPUT_LENGTH);
      }
      // Read the length of the next lz4 compressed block.
      size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
      input += sizeof(uint32_t);
      input_len -= sizeof(uint32_t);
      // input_len >= 0 here, so the mixed-sign comparison is safe.
      if (compressed_len == 0 || compressed_len > input_len) {
        return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
      }
      // Decompress this block, capped at the remaining output capacity.
      // (Reuses the outer remaining_output_size; the previous code shadowed it
      // with an inner declaration.)
      remaining_output_size = buffer_size - uncompressed_total_len;
      int uncompressed_len = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
          reinterpret_cast<char*>(out_ptr), compressed_len, remaining_output_size);
      if (uncompressed_len < 0) {
        return Status(TErrorCode::LZ4_DECOMPRESS_SAFE_FAILED);
      }
      // An inner block must not produce more than the outer block's remaining
      // budget; otherwise the uint32 subtraction below would wrap around and
      // keep this loop running on garbage.
      if (uncompressed_len > uncompressed_block_len) {
        return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
      }
      out_ptr += uncompressed_len;
      input += compressed_len;
      input_len -= compressed_len;
      uncompressed_block_len -= uncompressed_len;
      uncompressed_total_len += uncompressed_len;
    }
  }
  *output_len = uncompressed_total_len;
  return Status::OK();
}