blob: 0c41bcdabfc31836a50dc01099520ef81101f455 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UTIL_DECOMPRESS_H
#define IMPALA_UTIL_DECOMPRESS_H
// We need zlib.h here to declare stream_ below.
#include <zlib.h>
#include <bzlib.h>
#include "util/codec.h"
namespace impala {
class MemPool;
class GzipDecompressor : public Codec {
public:
GzipDecompressor(
MemPool* mem_pool = nullptr, bool reuse_buffer = false, bool is_deflate = false);
virtual ~GzipDecompressor();
virtual Status Init() override WARN_UNUSED_RESULT;
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
bool* stream_end) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "gz"; }
private:
std::string DebugStreamState() const;
/// If set assume deflate format, otherwise zlib or gzip
bool is_deflate_;
z_stream stream_;
/// These are magic numbers from zlib.h. Not clear why they are not defined there.
const static int WINDOW_BITS = 15; // Maximum window size
const static int DETECT_CODEC = 32; // Determine if this is libz or gzip from header.
};
class BzipDecompressor : public Codec {
public:
BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~BzipDecompressor();
virtual Status Init() override WARN_UNUSED_RESULT;
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
bool* stream_end) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "bz2"; }
private:
std::string DebugStreamState() const;
/// Used for streaming decompression.
bz_stream stream_;
};
class SnappyDecompressor : public Codec {
public:
/// Snappy-compressed data block includes trailing 4-byte checksum. Decompressor
/// doesn't expect this.
static const uint TRAILING_CHECKSUM_LEN = 4;
SnappyDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual ~SnappyDecompressor() { }
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "snappy"; }
};
/// Lz4 is a compression codec with similar compression ratios as snappy but much faster
/// decompression. This decompressor is not able to decompress unless the output buffer
/// is allocated and will cause an error if asked to do so.
class Lz4Decompressor : public Codec {
public:
virtual ~Lz4Decompressor() { }
Lz4Decompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "lz4"; }
};
class SnappyBlockDecompressor : public Codec {
public:
SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~SnappyBlockDecompressor() { }
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "snappy"; }
};
class ZstandardDecompressor : public Codec {
public:
ZstandardDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~ZstandardDecompressor() {}
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "zstd"; }
};
/// Hadoop's block compression scheme on top of LZ4.
class Lz4BlockDecompressor : public Codec {
public:
virtual ~Lz4BlockDecompressor() { }
Lz4BlockDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "lz4"; }
};
}
#endif