| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "Compression.hh" |
| #include "Adaptor.hh" |
| #include "LzoDecompressor.hh" |
| #include "Utils.hh" |
| #include "lz4.h" |
| #include "orc/Exceptions.hh" |
| |
| #include <algorithm> |
| #include <array> |
| #include <iomanip> |
| #include <iostream> |
| #include <sstream> |
| |
| #include "zlib.h" |
| #include "zstd.h" |
| |
| #include "wrap/snappy-wrapper.h" |
| |
// Supply a default zstd compression level when the included headers did not define one.
#if !defined(ZSTD_CLEVEL_DEFAULT)
#define ZSTD_CLEVEL_DEFAULT 3
#endif

/* The following macros are defined in lz4.c, so fallbacks are provided here. */
#if !defined(LZ4_ACCELERATION_DEFAULT)
#define LZ4_ACCELERATION_DEFAULT 1
#endif

#if !defined(LZ4_ACCELERATION_MAX)
#define LZ4_ACCELERATION_MAX 65537
#endif
| |
| namespace orc { |
| |
| class CompressionStreamBase : public BufferedOutputStream { |
| public: |
| CompressionStreamBase(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics); |
| |
| virtual bool Next(void** data, int* size) override = 0; |
| virtual void BackUp(int count) override; |
| |
| virtual std::string getName() const override = 0; |
| virtual uint64_t flush() override; |
| virtual void suppress() override; |
| |
| virtual bool isCompressed() const override { |
| return true; |
| } |
| virtual uint64_t getSize() const override; |
| |
| protected: |
| void writeData(const unsigned char* data, int size); |
| |
| void writeHeader(size_t compressedSize, bool original) { |
| *header[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0)); |
| *header[1] = static_cast<char>(compressedSize >> 7); |
| *header[2] = static_cast<char>(compressedSize >> 15); |
| } |
| |
| // ensure enough room for compression block header |
| void ensureHeader(); |
| |
| // Buffer to hold uncompressed data until user calls Next() |
| DataBuffer<unsigned char> rawInputBuffer; |
| |
| // Compress level |
| int level; |
| |
| // Compressed data output buffer |
| char* outputBuffer; |
| |
| // Size for compressionBuffer |
| int bufferSize; |
| |
| // Compress output position |
| int outputPosition; |
| |
| // Compress output buffer size |
| int outputSize; |
| |
| // Compression block header pointer array |
| static const uint32_t HEADER_SIZE = 3; |
| std::array<char*, HEADER_SIZE> header; |
| }; |
| |
| CompressionStreamBase::CompressionStreamBase(OutputStream* outStream, int compressionLevel, |
| uint64_t capacity, uint64_t blockSize, |
| MemoryPool& pool, WriterMetrics* metrics) |
| : BufferedOutputStream(pool, outStream, capacity, blockSize, metrics), |
| rawInputBuffer(pool, blockSize), |
| level(compressionLevel), |
| outputBuffer(nullptr), |
| bufferSize(0), |
| outputPosition(0), |
| outputSize(0) { |
| // init header pointer array |
| header.fill(nullptr); |
| } |
| |
| void CompressionStreamBase::BackUp(int count) { |
| if (count > bufferSize) { |
| throw std::logic_error("Can't backup that much!"); |
| } |
| bufferSize -= count; |
| } |
| |
| uint64_t CompressionStreamBase::flush() { |
| void* data; |
| int size; |
| if (!Next(&data, &size)) { |
| throw std::runtime_error("Failed to flush compression buffer."); |
| } |
| BufferedOutputStream::BackUp(outputSize - outputPosition); |
| bufferSize = outputSize = outputPosition = 0; |
| return BufferedOutputStream::flush(); |
| } |
| |
| void CompressionStreamBase::suppress() { |
| outputBuffer = nullptr; |
| bufferSize = outputPosition = outputSize = 0; |
| BufferedOutputStream::suppress(); |
| } |
| |
| uint64_t CompressionStreamBase::getSize() const { |
| return BufferedOutputStream::getSize() - static_cast<uint64_t>(outputSize - outputPosition); |
| } |
| |
| // write the data content into outputBuffer |
| void CompressionStreamBase::writeData(const unsigned char* data, int size) { |
| int offset = 0; |
| while (offset < size) { |
| if (outputPosition == outputSize) { |
| if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) { |
| throw std::runtime_error("Failed to get next output buffer from output stream."); |
| } |
| outputPosition = 0; |
| } else if (outputPosition > outputSize) { |
| // for safety this will unlikely happen |
| throw std::logic_error("Write to an out-of-bound place during compression!"); |
| } |
| int currentSize = std::min(outputSize - outputPosition, size - offset); |
| memcpy(outputBuffer + outputPosition, data + offset, static_cast<size_t>(currentSize)); |
| offset += currentSize; |
| outputPosition += currentSize; |
| } |
| } |
| |
| void CompressionStreamBase::ensureHeader() { |
| // adjust 3 bytes for the compression header |
| for (uint32_t i = 0; i < HEADER_SIZE; ++i) { |
| if (outputPosition >= outputSize) { |
| if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) { |
| throw std::runtime_error("Failed to get next output buffer from output stream."); |
| } |
| outputPosition = 0; |
| } |
| header[i] = outputBuffer + outputPosition; |
| ++outputPosition; |
| } |
| } |
| |
| /** |
| * Streaming compression base class |
| */ |
| class CompressionStream : public CompressionStreamBase { |
| public: |
| CompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics); |
| |
| virtual bool Next(void** data, int* size) override; |
| virtual std::string getName() const override = 0; |
| |
| protected: |
| // return total compressed size |
| virtual uint64_t doStreamingCompression() = 0; |
| }; |
| |
| CompressionStream::CompressionStream(OutputStream* outStream, int compressionLevel, |
| uint64_t capacity, uint64_t blockSize, MemoryPool& pool, |
| WriterMetrics* metrics) |
| : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize, pool, metrics) { |
| // PASS |
| } |
| |
| bool CompressionStream::Next(void** data, int* size) { |
| if (bufferSize != 0) { |
| ensureHeader(); |
| |
| uint64_t preSize = getSize(); |
| uint64_t totalCompressedSize = doStreamingCompression(); |
| if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) { |
| writeHeader(static_cast<size_t>(bufferSize), true); |
| // reset output buffer |
| outputBuffer = nullptr; |
| outputPosition = outputSize = 0; |
| uint64_t backup = getSize() - preSize; |
| BufferedOutputStream::BackUp(static_cast<int>(backup)); |
| |
| // copy raw input buffer into block buffer |
| writeData(rawInputBuffer.data(), bufferSize); |
| } else { |
| writeHeader(totalCompressedSize, false); |
| } |
| } |
| |
| *data = rawInputBuffer.data(); |
| *size = static_cast<int>(rawInputBuffer.size()); |
| bufferSize = *size; |
| |
| return true; |
| } |
| |
| class ZlibCompressionStream : public CompressionStream { |
| public: |
| ZlibCompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics); |
| |
| virtual ~ZlibCompressionStream() override { |
| end(); |
| } |
| |
| virtual std::string getName() const override; |
| |
| protected: |
| virtual uint64_t doStreamingCompression() override; |
| |
| private: |
| void init(); |
| void end(); |
| z_stream strm; |
| }; |
| |
| ZlibCompressionStream::ZlibCompressionStream(OutputStream* outStream, int compressionLevel, |
| uint64_t capacity, uint64_t blockSize, |
| MemoryPool& pool, WriterMetrics* metrics) |
| : CompressionStream(outStream, compressionLevel, capacity, blockSize, pool, metrics) { |
| init(); |
| } |
| |
| uint64_t ZlibCompressionStream::doStreamingCompression() { |
| if (deflateReset(&strm) != Z_OK) { |
| throw std::runtime_error("Failed to reset inflate."); |
| } |
| |
| strm.avail_in = static_cast<unsigned int>(bufferSize); |
| strm.next_in = rawInputBuffer.data(); |
| |
| do { |
| if (outputPosition >= outputSize) { |
| if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) { |
| throw std::runtime_error("Failed to get next output buffer from output stream."); |
| } |
| outputPosition = 0; |
| } |
| strm.next_out = reinterpret_cast<unsigned char*>(outputBuffer + outputPosition); |
| strm.avail_out = static_cast<unsigned int>(outputSize - outputPosition); |
| |
| int ret = deflate(&strm, Z_FINISH); |
| outputPosition = outputSize - static_cast<int>(strm.avail_out); |
| |
| if (ret == Z_STREAM_END) { |
| break; |
| } else if (ret == Z_OK) { |
| // needs more buffer so will continue the loop |
| } else { |
| throw std::runtime_error("Failed to deflate input data."); |
| } |
| } while (strm.avail_out == 0); |
| |
| return strm.total_out; |
| } |
| |
| std::string ZlibCompressionStream::getName() const { |
| return "ZlibCompressionStream"; |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
| #if defined(__GNUC__) || defined(__clang__) |
| DIAGNOSTIC_IGNORE("-Wold-style-cast") |
| #endif |
| |
| void ZlibCompressionStream::init() { |
| strm.zalloc = nullptr; |
| strm.zfree = nullptr; |
| strm.opaque = nullptr; |
| strm.next_in = nullptr; |
| |
| if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) { |
| throw std::runtime_error("Error while calling deflateInit2() for zlib."); |
| } |
| } |
| |
| void ZlibCompressionStream::end() { |
| (void)deflateEnd(&strm); |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
// States of the chunk-reading state machine used by DecompressionStream.
enum DecompressState {
  DECOMPRESS_HEADER,
  DECOMPRESS_START,
  DECOMPRESS_CONTINUE,
  DECOMPRESS_ORIGINAL,
  DECOMPRESS_EOF
};

/**
 * Maps a DecompressState to the name of its enumerator.
 *
 * @return the enumerator name, or "unknown" for a value outside the enum
 */
std::string decompressStateToString(DecompressState state) {
  const char* label = "unknown";
  switch (state) {
    case DECOMPRESS_HEADER:
      label = "DECOMPRESS_HEADER";
      break;
    case DECOMPRESS_START:
      label = "DECOMPRESS_START";
      break;
    case DECOMPRESS_CONTINUE:
      label = "DECOMPRESS_CONTINUE";
      break;
    case DECOMPRESS_ORIGINAL:
      label = "DECOMPRESS_ORIGINAL";
      break;
    case DECOMPRESS_EOF:
      label = "DECOMPRESS_EOF";
      break;
  }
  return label;
}
| |
| class DecompressionStream : public SeekableInputStream { |
| public: |
| DecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t bufferSize, |
| MemoryPool& pool, ReaderMetrics* metrics); |
| virtual ~DecompressionStream() override {} |
| virtual bool Next(const void** data, int* size) override; |
| virtual void BackUp(int count) override; |
| virtual bool Skip(int count) override; |
| virtual int64_t ByteCount() const override; |
| virtual void seek(PositionProvider& position) override; |
| virtual std::string getName() const override = 0; |
| |
| protected: |
| virtual void NextDecompress(const void** data, int* size, size_t availableSize) = 0; |
| |
| std::string getStreamName() const; |
| void readBuffer(bool failOnEof); |
| uint32_t readByte(bool failOnEof); |
| void readHeader(); |
| |
| MemoryPool& pool; |
| std::unique_ptr<SeekableInputStream> input; |
| |
| // uncompressed output |
| DataBuffer<char> outputDataBuffer; |
| |
| // the current state |
| DecompressState state; |
| |
| // The starting and current position of the buffer for the uncompressed |
| // data. It either points to the data buffer or the underlying input stream. |
| const char* outputBufferStart; |
| const char* outputBuffer; |
| size_t outputBufferLength; |
| // The uncompressed buffer length. For compressed chunk, it's the original |
| // (ie. the overall) and the actual length of the decompressed data. |
| // For uncompressed chunk, it's the length of the loaded data of this chunk. |
| size_t uncompressedBufferLength; |
| |
| // The remaining size of the current chunk that is not yet consumed |
| // ie. decompressed or returned in output if state==DECOMPRESS_ORIGINAL |
| size_t remainingLength; |
| |
| // the last buffer returned from the input |
| const char* inputBufferStart; |
| const char* inputBuffer; |
| const char* inputBufferEnd; |
| |
| // Variables for saving the position of the header and the start of the |
| // buffer. Used when we have to seek a position. |
| size_t headerPosition; |
| size_t inputBufferStartPosition; |
| |
| // roughly the number of bytes returned |
| off_t bytesReturned; |
| |
| ReaderMetrics* metrics; |
| }; |
| |
| DecompressionStream::DecompressionStream(std::unique_ptr<SeekableInputStream> inStream, |
| size_t bufferSize, MemoryPool& _pool, |
| ReaderMetrics* _metrics) |
| : pool(_pool), |
| input(std::move(inStream)), |
| outputDataBuffer(pool, bufferSize), |
| state(DECOMPRESS_HEADER), |
| outputBufferStart(nullptr), |
| outputBuffer(nullptr), |
| outputBufferLength(0), |
| uncompressedBufferLength(0), |
| remainingLength(0), |
| inputBufferStart(nullptr), |
| inputBuffer(nullptr), |
| inputBufferEnd(nullptr), |
| headerPosition(0), |
| inputBufferStartPosition(0), |
| bytesReturned(0), |
| metrics(_metrics) {} |
| |
| std::string DecompressionStream::getStreamName() const { |
| return input->getName(); |
| } |
| |
| void DecompressionStream::readBuffer(bool failOnEof) { |
| SCOPED_MINUS_STOPWATCH(metrics, DecompressionLatencyUs); |
| int length; |
| if (!input->Next(reinterpret_cast<const void**>(&inputBuffer), &length)) { |
| if (failOnEof) { |
| throw ParseError("Read past EOF in DecompressionStream::readBuffer"); |
| } |
| state = DECOMPRESS_EOF; |
| inputBuffer = nullptr; |
| inputBufferEnd = nullptr; |
| inputBufferStart = nullptr; |
| } else { |
| inputBufferEnd = inputBuffer + length; |
| inputBufferStartPosition = static_cast<size_t>(input->ByteCount() - length); |
| inputBufferStart = inputBuffer; |
| } |
| } |
| |
| uint32_t DecompressionStream::readByte(bool failOnEof) { |
| if (inputBuffer == inputBufferEnd) { |
| readBuffer(failOnEof); |
| if (state == DECOMPRESS_EOF) { |
| return 0; |
| } |
| } |
| return static_cast<unsigned char>(*(inputBuffer++)); |
| } |
| |
| void DecompressionStream::readHeader() { |
| uint32_t header = readByte(false); |
| if (state != DECOMPRESS_EOF) { |
| header |= readByte(true) << 8; |
| header |= readByte(true) << 16; |
| if (header & 1) { |
| state = DECOMPRESS_ORIGINAL; |
| } else { |
| state = DECOMPRESS_START; |
| } |
| remainingLength = header >> 1; |
| } else { |
| remainingLength = 0; |
| } |
| } |
| |
| bool DecompressionStream::Next(const void** data, int* size) { |
| SCOPED_STOPWATCH(metrics, DecompressionLatencyUs, DecompressionCall); |
| // If we are starting a new header, we will have to store its positions |
| // after decompressing. |
| bool saveBufferPositions = false; |
| // If the user pushed back or seeked within the same chunk. |
| if (outputBufferLength) { |
| *data = outputBuffer; |
| *size = static_cast<int>(outputBufferLength); |
| outputBuffer += outputBufferLength; |
| bytesReturned += static_cast<off_t>(outputBufferLength); |
| outputBufferLength = 0; |
| return true; |
| } |
| if (state == DECOMPRESS_HEADER || remainingLength == 0) { |
| readHeader(); |
| // Here we already read the three bytes of the header. |
| headerPosition = |
| inputBufferStartPosition + static_cast<size_t>(inputBuffer - inputBufferStart) - 3; |
| saveBufferPositions = true; |
| } |
| if (state == DECOMPRESS_EOF) { |
| return false; |
| } |
| if (inputBuffer == inputBufferEnd) { |
| readBuffer(true); |
| } |
| size_t availableSize = |
| std::min(static_cast<size_t>(inputBufferEnd - inputBuffer), remainingLength); |
| if (state == DECOMPRESS_ORIGINAL) { |
| *data = inputBuffer; |
| *size = static_cast<int>(availableSize); |
| outputBuffer = inputBuffer + availableSize; |
| outputBufferLength = 0; |
| inputBuffer += availableSize; |
| remainingLength -= availableSize; |
| } else if (state == DECOMPRESS_START) { |
| NextDecompress(data, size, availableSize); |
| } else { |
| throw std::logic_error( |
| "Unknown compression state in " |
| "DecompressionStream::Next"); |
| } |
| bytesReturned += static_cast<off_t>(*size); |
| if (saveBufferPositions) { |
| uncompressedBufferLength = static_cast<size_t>(*size); |
| outputBufferStart = reinterpret_cast<const char*>(*data); |
| } |
| return true; |
| } |
| |
| void DecompressionStream::BackUp(int count) { |
| if (outputBuffer == nullptr || outputBufferLength != 0) { |
| throw std::logic_error("Backup without previous Next in " + getName()); |
| } |
| outputBuffer -= static_cast<size_t>(count); |
| outputBufferLength = static_cast<size_t>(count); |
| bytesReturned -= count; |
| } |
| |
| int64_t DecompressionStream::ByteCount() const { |
| return bytesReturned; |
| } |
| |
| bool DecompressionStream::Skip(int count) { |
| bytesReturned += count; |
| // this is a stupid implementation for now. |
| // should skip entire blocks without decompressing |
| while (count > 0) { |
| const void* ptr; |
| int len; |
| if (!Next(&ptr, &len)) { |
| return false; |
| } |
| if (len > count) { |
| BackUp(len - count); |
| count = 0; |
| } else { |
| count -= len; |
| } |
| } |
| return true; |
| } |
| |
| /** There are four possible scenarios when seeking a position: |
| * 1. The chunk of the seeked position is the current chunk that has been read and |
| * decompressed. For uncompressed chunk, it could be partially read. So there are two |
| * sub-cases: |
| * a. The seeked position is inside the uncompressed buffer. |
| * b. The seeked position is outside the uncompressed buffer. |
| * 2. The chunk of the seeked position is read from the input stream, but has not been |
| * decompressed yet, ie. it's not in the output stream. |
| * 3. The chunk of the seeked position is not read yet from the input stream. |
| */ |
| void DecompressionStream::seek(PositionProvider& position) { |
| size_t seekedHeaderPosition = position.current(); |
| // Case 1: the seeked position is in the current chunk and it's buffered and |
| // decompressed/uncompressed. Note that after the headerPosition comes the 3 bytes of |
| // the header. |
| if (headerPosition == seekedHeaderPosition && inputBufferStartPosition <= headerPosition + 3 && |
| inputBufferStart) { |
| position.next(); // Skip the input level position, i.e. seekedHeaderPosition. |
| size_t posInChunk = position.next(); // Chunk level position. |
| // Case 1.a: The position is in the decompressed/uncompressed buffer. Here we only |
| // need to set the output buffer's pointer to the seeked position. |
| if (uncompressedBufferLength >= posInChunk) { |
| outputBufferLength = uncompressedBufferLength - posInChunk; |
| outputBuffer = outputBufferStart + posInChunk; |
| return; |
| } |
| // Case 1.b: The position is outside the decompressed/uncompressed buffer. |
| // Skip bytes to seek. |
| if (!Skip(static_cast<int>(posInChunk - uncompressedBufferLength))) { |
| std::ostringstream ss; |
| ss << "Bad seek to (chunkHeader=" << seekedHeaderPosition << ", posInChunk=" << posInChunk |
| << ") in " << getName() << ". DecompressionState: " << decompressStateToString(state); |
| throw ParseError(ss.str()); |
| } |
| return; |
| } |
| // Clear state to prepare reading from a new chunk header. |
| state = DECOMPRESS_HEADER; |
| outputBuffer = nullptr; |
| outputBufferLength = 0; |
| remainingLength = 0; |
| if (seekedHeaderPosition < static_cast<uint64_t>(input->ByteCount()) && |
| seekedHeaderPosition >= inputBufferStartPosition) { |
| // Case 2: The input is buffered, but not yet decompressed. No need to |
| // force re-reading the inputBuffer, we just have to move it to the |
| // seeked position. |
| position.next(); // Skip the input level position. |
| inputBuffer = inputBufferStart + (seekedHeaderPosition - inputBufferStartPosition); |
| } else { |
| // Case 3: The seeked position is not in the input buffer, here we are |
| // forcing to read it. |
| inputBuffer = nullptr; |
| inputBufferEnd = nullptr; |
| input->seek(position); // Actually use the input level position. |
| } |
| bytesReturned = static_cast<off_t>(input->ByteCount()); |
| if (!Skip(static_cast<int>(position.next()))) { |
| throw ParseError("Bad skip in " + getName()); |
| } |
| } |
| |
| class ZlibDecompressionStream : public DecompressionStream { |
| public: |
| ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& pool, ReaderMetrics* metrics); |
| virtual ~ZlibDecompressionStream() override; |
| virtual std::string getName() const override; |
| |
| protected: |
| virtual void NextDecompress(const void** data, int* size, size_t availableSize) override; |
| |
| private: |
| z_stream zstream; |
| }; |
| |
| DIAGNOSTIC_PUSH |
| |
| #if defined(__GNUC__) || defined(__clang__) |
| DIAGNOSTIC_IGNORE("-Wold-style-cast") |
| #endif |
| |
| ZlibDecompressionStream::ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, |
| size_t bufferSize, MemoryPool& _pool, |
| ReaderMetrics* _metrics) |
| : DecompressionStream(std::move(inStream), bufferSize, _pool, _metrics) { |
| zstream.next_in = nullptr; |
| zstream.avail_in = 0; |
| zstream.zalloc = nullptr; |
| zstream.zfree = nullptr; |
| zstream.opaque = nullptr; |
| zstream.next_out = reinterpret_cast<Bytef*>(outputDataBuffer.data()); |
| zstream.avail_out = static_cast<uInt>(outputDataBuffer.capacity()); |
| int64_t result = inflateInit2(&zstream, -15); |
| switch (result) { |
| case Z_OK: |
| break; |
| case Z_MEM_ERROR: |
| throw std::logic_error("Memory error from inflateInit2"); |
| case Z_VERSION_ERROR: |
| throw std::logic_error("Version error from inflateInit2"); |
| case Z_STREAM_ERROR: |
| throw std::logic_error("Stream error from inflateInit2"); |
| default: |
| throw std::logic_error("Unknown error from inflateInit2"); |
| } |
| } |
| |
| DIAGNOSTIC_POP |
| |
| ZlibDecompressionStream::~ZlibDecompressionStream() { |
| int64_t result = inflateEnd(&zstream); |
| if (result != Z_OK) { |
| // really can't throw in destructors |
| std::cout << "Error in ~ZlibDecompressionStream() " << result << "\n"; |
| } |
| } |
| |
| void ZlibDecompressionStream::NextDecompress(const void** data, int* size, size_t availableSize) { |
| zstream.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer)); |
| zstream.avail_in = static_cast<uInt>(availableSize); |
| outputBuffer = outputDataBuffer.data(); |
| zstream.next_out = reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer)); |
| zstream.avail_out = static_cast<uInt>(outputDataBuffer.capacity()); |
| if (inflateReset(&zstream) != Z_OK) { |
| throw std::logic_error( |
| "Bad inflateReset in " |
| "ZlibDecompressionStream::NextDecompress"); |
| } |
| int64_t result; |
| do { |
| result = inflate(&zstream, availableSize == remainingLength ? Z_FINISH : Z_SYNC_FLUSH); |
| switch (result) { |
| case Z_OK: |
| remainingLength -= availableSize; |
| inputBuffer += availableSize; |
| readBuffer(true); |
| availableSize = |
| std::min(static_cast<size_t>(inputBufferEnd - inputBuffer), remainingLength); |
| zstream.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer)); |
| zstream.avail_in = static_cast<uInt>(availableSize); |
| break; |
| case Z_STREAM_END: |
| break; |
| case Z_BUF_ERROR: |
| throw std::logic_error( |
| "Buffer error in " |
| "ZlibDecompressionStream::NextDecompress"); |
| case Z_DATA_ERROR: |
| throw std::logic_error( |
| "Data error in " |
| "ZlibDecompressionStream::NextDecompress"); |
| case Z_STREAM_ERROR: |
| throw std::logic_error( |
| "Stream error in " |
| "ZlibDecompressionStream::NextDecompress"); |
| default: |
| throw std::logic_error( |
| "Unknown error in " |
| "ZlibDecompressionStream::NextDecompress"); |
| } |
| } while (result != Z_STREAM_END); |
| *size = static_cast<int>(outputDataBuffer.capacity() - zstream.avail_out); |
| *data = outputBuffer; |
| outputBufferLength = 0; |
| outputBuffer += *size; |
| inputBuffer += availableSize; |
| remainingLength -= availableSize; |
| } |
| |
| std::string ZlibDecompressionStream::getName() const { |
| std::ostringstream result; |
| result << "zlib(" << input->getName() << ")"; |
| return result.str(); |
| } |
| |
| class BlockDecompressionStream : public DecompressionStream { |
| public: |
| BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& pool, ReaderMetrics* metrics); |
| |
| virtual ~BlockDecompressionStream() override {} |
| virtual std::string getName() const override = 0; |
| |
| protected: |
| virtual void NextDecompress(const void** data, int* size, size_t availableSize) override; |
| |
| virtual uint64_t decompress(const char* input, uint64_t length, char* output, |
| size_t maxOutputLength) = 0; |
| |
| private: |
| // may need to stitch together multiple input buffers; |
| // to give snappy a contiguous block |
| DataBuffer<char> inputDataBuffer; |
| }; |
| |
| BlockDecompressionStream::BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, |
| size_t blockSize, MemoryPool& _pool, |
| ReaderMetrics* _metrics) |
| : DecompressionStream(std::move(inStream), blockSize, _pool, _metrics), |
| inputDataBuffer(pool, blockSize) {} |
| |
| void BlockDecompressionStream::NextDecompress(const void** data, int* size, |
| size_t availableSize) { |
| // Get contiguous bytes of compressed block. |
| const char* compressed = inputBuffer; |
| if (remainingLength == availableSize) { |
| inputBuffer += availableSize; |
| } else { |
| // Did not read enough from input. |
| if (inputDataBuffer.capacity() < remainingLength) { |
| inputDataBuffer.resize(remainingLength); |
| } |
| ::memcpy(inputDataBuffer.data(), inputBuffer, availableSize); |
| inputBuffer += availableSize; |
| compressed = inputDataBuffer.data(); |
| |
| for (size_t pos = availableSize; pos < remainingLength;) { |
| readBuffer(true); |
| size_t avail = |
| std::min(static_cast<size_t>(inputBufferEnd - inputBuffer), remainingLength - pos); |
| ::memcpy(inputDataBuffer.data() + pos, inputBuffer, avail); |
| pos += avail; |
| inputBuffer += avail; |
| } |
| } |
| outputBufferLength = decompress(compressed, remainingLength, outputDataBuffer.data(), |
| outputDataBuffer.capacity()); |
| remainingLength = 0; |
| state = DECOMPRESS_HEADER; |
| *data = outputDataBuffer.data(); |
| *size = static_cast<int>(outputBufferLength); |
| outputBuffer = outputDataBuffer.data() + outputBufferLength; |
| outputBufferLength = 0; |
| } |
| |
| class SnappyDecompressionStream : public BlockDecompressionStream { |
| public: |
| SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& _pool, ReaderMetrics* _metrics) |
| : BlockDecompressionStream(std::move(inStream), blockSize, _pool, _metrics) { |
| // PASS |
| } |
| |
| std::string getName() const override { |
| std::ostringstream result; |
| result << "snappy(" << getStreamName() << ")"; |
| return result.str(); |
| } |
| |
| protected: |
| virtual uint64_t decompress(const char* input, uint64_t length, char* output, |
| size_t maxOutputLength) override; |
| }; |
| |
| uint64_t SnappyDecompressionStream::decompress(const char* _input, uint64_t length, char* output, |
| size_t maxOutputLength) { |
| size_t outLength; |
| if (!snappy::GetUncompressedLength(_input, length, &outLength)) { |
| throw ParseError("SnappyDecompressionStream choked on corrupt input"); |
| } |
| |
| if (outLength > maxOutputLength) { |
| throw std::logic_error("Snappy length exceeds block size"); |
| } |
| |
| if (!snappy::RawUncompress(_input, length, output)) { |
| throw ParseError("SnappyDecompressionStream choked on corrupt input"); |
| } |
| return outLength; |
| } |
| |
| class LzoDecompressionStream : public BlockDecompressionStream { |
| public: |
| LzoDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& _pool, ReaderMetrics* _metrics) |
| : BlockDecompressionStream(std::move(inStream), blockSize, _pool, _metrics) { |
| // PASS |
| } |
| |
| std::string getName() const override { |
| std::ostringstream result; |
| result << "lzo(" << getStreamName() << ")"; |
| return result.str(); |
| } |
| |
| protected: |
| virtual uint64_t decompress(const char* input, uint64_t length, char* output, |
| size_t maxOutputLength) override; |
| }; |
| |
| uint64_t LzoDecompressionStream::decompress(const char* inputPtr, uint64_t length, char* output, |
| size_t maxOutputLength) { |
| return lzoDecompress(inputPtr, inputPtr + length, output, output + maxOutputLength); |
| } |
| |
| class Lz4DecompressionStream : public BlockDecompressionStream { |
| public: |
| Lz4DecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& _pool, ReaderMetrics* _metrics) |
| : BlockDecompressionStream(std::move(inStream), blockSize, _pool, _metrics) { |
| // PASS |
| } |
| |
| std::string getName() const override { |
| std::ostringstream result; |
| result << "lz4(" << getStreamName() << ")"; |
| return result.str(); |
| } |
| |
| protected: |
| virtual uint64_t decompress(const char* input, uint64_t length, char* output, |
| size_t maxOutputLength) override; |
| }; |
| |
| uint64_t Lz4DecompressionStream::decompress(const char* inputPtr, uint64_t length, char* output, |
| size_t maxOutputLength) { |
| int result = LZ4_decompress_safe(inputPtr, output, static_cast<int>(length), |
| static_cast<int>(maxOutputLength)); |
| if (result < 0) { |
| throw ParseError(getName() + " - failed to decompress"); |
| } |
| return static_cast<uint64_t>(result); |
| } |
| |
| /** |
| * Block compression base class |
| */ |
| class BlockCompressionStream : public CompressionStreamBase { |
| public: |
| BlockCompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) |
| : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize, pool, metrics), |
| compressorBuffer(pool) { |
| // PASS |
| } |
| |
| virtual bool Next(void** data, int* size) override; |
| virtual void suppress() override; |
| virtual std::string getName() const override = 0; |
| |
| protected: |
| // compresses a block and returns the compressed size |
| virtual uint64_t doBlockCompression() = 0; |
| |
| // return maximum possible compression size for allocating space for |
| // compressorBuffer below |
| virtual uint64_t estimateMaxCompressionSize() = 0; |
| |
| // should allocate max possible compressed size |
| DataBuffer<unsigned char> compressorBuffer; |
| }; |
| |
| bool BlockCompressionStream::Next(void** data, int* size) { |
| if (bufferSize != 0) { |
| ensureHeader(); |
| |
| // perform compression |
| size_t totalCompressedSize = doBlockCompression(); |
| |
| const unsigned char* dataToWrite = nullptr; |
| int totalSizeToWrite = 0; |
| |
| if (totalCompressedSize >= static_cast<size_t>(bufferSize)) { |
| writeHeader(static_cast<size_t>(bufferSize), true); |
| dataToWrite = rawInputBuffer.data(); |
| totalSizeToWrite = bufferSize; |
| } else { |
| writeHeader(totalCompressedSize, false); |
| dataToWrite = compressorBuffer.data(); |
| totalSizeToWrite = static_cast<int>(totalCompressedSize); |
| } |
| |
| writeData(dataToWrite, totalSizeToWrite); |
| } |
| |
| *data = rawInputBuffer.data(); |
| *size = static_cast<int>(rawInputBuffer.size()); |
| bufferSize = *size; |
| compressorBuffer.resize(estimateMaxCompressionSize()); |
| |
| return true; |
| } |
| |
| void BlockCompressionStream::suppress() { |
| compressorBuffer.resize(0); |
| CompressionStreamBase::suppress(); |
| } |
| |
| /** |
| * LZ4 block compression |
| */ |
| class Lz4CompressionSteam : public BlockCompressionStream { |
| public: |
| Lz4CompressionSteam(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) |
| : BlockCompressionStream(outStream, compressionLevel, capacity, blockSize, pool, metrics) { |
| this->init(); |
| } |
| |
| virtual std::string getName() const override { |
| return "Lz4CompressionStream"; |
| } |
| |
| virtual ~Lz4CompressionSteam() override { |
| this->end(); |
| } |
| |
| protected: |
| virtual uint64_t doBlockCompression() override; |
| |
| virtual uint64_t estimateMaxCompressionSize() override { |
| return static_cast<uint64_t>(LZ4_compressBound(bufferSize)); |
| } |
| |
| private: |
| void init(); |
| void end(); |
| LZ4_stream_t* state; |
| }; |
| |
| uint64_t Lz4CompressionSteam::doBlockCompression() { |
| int result = LZ4_compress_fast_extState( |
| static_cast<void*>(state), reinterpret_cast<const char*>(rawInputBuffer.data()), |
| reinterpret_cast<char*>(compressorBuffer.data()), bufferSize, |
| static_cast<int>(compressorBuffer.size()), level); |
| if (result == 0) { |
| throw std::runtime_error("Error during block compression using lz4."); |
| } |
| return static_cast<uint64_t>(result); |
| } |
| |
| void Lz4CompressionSteam::init() { |
| state = LZ4_createStream(); |
| if (!state) { |
| throw std::runtime_error("Error while allocating state for lz4."); |
| } |
| } |
| |
| void Lz4CompressionSteam::end() { |
| (void)LZ4_freeStream(state); |
| state = nullptr; |
| } |
| |
| /** |
| * Snappy block compression |
| */ |
| class SnappyCompressionStream : public BlockCompressionStream { |
| public: |
| SnappyCompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) |
| : BlockCompressionStream(outStream, compressionLevel, capacity, blockSize, pool, metrics) {} |
| |
| virtual std::string getName() const override { |
| return "SnappyCompressionStream"; |
| } |
| |
| virtual ~SnappyCompressionStream() override { |
| // PASS |
| } |
| |
| protected: |
| virtual uint64_t doBlockCompression() override; |
| |
| virtual uint64_t estimateMaxCompressionSize() override { |
| return static_cast<uint64_t>(snappy::MaxCompressedLength(static_cast<size_t>(bufferSize))); |
| } |
| }; |
| |
| uint64_t SnappyCompressionStream::doBlockCompression() { |
| size_t compressedLength; |
| snappy::RawCompress(reinterpret_cast<const char*>(rawInputBuffer.data()), |
| static_cast<size_t>(bufferSize), |
| reinterpret_cast<char*>(compressorBuffer.data()), &compressedLength); |
| return static_cast<uint64_t>(compressedLength); |
| } |
| |
| /** |
| * ZSTD block compression |
| */ |
| class ZSTDCompressionStream : public BlockCompressionStream { |
| public: |
| ZSTDCompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, |
| uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) |
| : BlockCompressionStream(outStream, compressionLevel, capacity, blockSize, pool, metrics) { |
| this->init(); |
| } |
| |
| virtual std::string getName() const override { |
| return "ZstdCompressionStream"; |
| } |
| |
| virtual ~ZSTDCompressionStream() override { |
| this->end(); |
| } |
| |
| protected: |
| virtual uint64_t doBlockCompression() override; |
| |
| virtual uint64_t estimateMaxCompressionSize() override { |
| return ZSTD_compressBound(static_cast<size_t>(bufferSize)); |
| } |
| |
| private: |
| void init(); |
| void end(); |
| ZSTD_CCtx* cctx; |
| }; |
| |
| uint64_t ZSTDCompressionStream::doBlockCompression() { |
| return ZSTD_compressCCtx(cctx, compressorBuffer.data(), compressorBuffer.size(), |
| rawInputBuffer.data(), static_cast<size_t>(bufferSize), level); |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
| #if defined(__GNUC__) || defined(__clang__) |
| DIAGNOSTIC_IGNORE("-Wold-style-cast") |
| #endif |
| |
| void ZSTDCompressionStream::init() { |
| cctx = ZSTD_createCCtx(); |
| if (!cctx) { |
| throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd."); |
| } |
| } |
| |
| void ZSTDCompressionStream::end() { |
| (void)ZSTD_freeCCtx(cctx); |
| cctx = nullptr; |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
| /** |
| * ZSTD block decompression |
| */ |
| class ZSTDDecompressionStream : public BlockDecompressionStream { |
| public: |
| ZSTDDecompressionStream(std::unique_ptr<SeekableInputStream> inStream, size_t blockSize, |
| MemoryPool& _pool, ReaderMetrics* _metrics) |
| : BlockDecompressionStream(std::move(inStream), blockSize, _pool, _metrics) { |
| this->init(); |
| } |
| |
| virtual ~ZSTDDecompressionStream() override { |
| this->end(); |
| } |
| |
| std::string getName() const override { |
| std::ostringstream result; |
| result << "zstd(" << getStreamName() << ")"; |
| return result.str(); |
| } |
| |
| protected: |
| virtual uint64_t decompress(const char* input, uint64_t length, char* output, |
| size_t maxOutputLength) override; |
| |
| private: |
| void init(); |
| void end(); |
| ZSTD_DCtx* dctx; |
| }; |
| |
| uint64_t ZSTDDecompressionStream::decompress(const char* inputPtr, uint64_t length, char* output, |
| size_t maxOutputLength) { |
| return static_cast<uint64_t>( |
| ZSTD_decompressDCtx(dctx, output, maxOutputLength, inputPtr, length)); |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
| #if defined(__GNUC__) || defined(__clang__) |
| DIAGNOSTIC_IGNORE("-Wold-style-cast") |
| #endif |
| |
| void ZSTDDecompressionStream::init() { |
| dctx = ZSTD_createDCtx(); |
| if (!dctx) { |
| throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd."); |
| } |
| } |
| |
| void ZSTDDecompressionStream::end() { |
| (void)ZSTD_freeDCtx(dctx); |
| dctx = nullptr; |
| } |
| |
| DIAGNOSTIC_PUSH |
| |
| std::unique_ptr<BufferedOutputStream> createCompressor(CompressionKind kind, |
| OutputStream* outStream, |
| CompressionStrategy strategy, |
| uint64_t bufferCapacity, |
| uint64_t compressionBlockSize, |
| MemoryPool& pool, WriterMetrics* metrics) { |
| switch (static_cast<int64_t>(kind)) { |
| case CompressionKind_NONE: { |
| return std::make_unique<BufferedOutputStream>(pool, outStream, bufferCapacity, |
| compressionBlockSize, metrics); |
| } |
| case CompressionKind_ZLIB: { |
| int level = |
| (strategy == CompressionStrategy_SPEED) ? Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION; |
| return std::make_unique<ZlibCompressionStream>(outStream, level, bufferCapacity, |
| compressionBlockSize, pool, metrics); |
| } |
| case CompressionKind_ZSTD: { |
| int level = (strategy == CompressionStrategy_SPEED) ? 1 : ZSTD_CLEVEL_DEFAULT; |
| return std::make_unique<ZSTDCompressionStream>(outStream, level, bufferCapacity, |
| compressionBlockSize, pool, metrics); |
| } |
| case CompressionKind_LZ4: { |
| int level = (strategy == CompressionStrategy_SPEED) ? LZ4_ACCELERATION_MAX |
| : LZ4_ACCELERATION_DEFAULT; |
| return std::make_unique<Lz4CompressionSteam>(outStream, level, bufferCapacity, |
| compressionBlockSize, pool, metrics); |
| } |
| case CompressionKind_SNAPPY: { |
| int level = 0; |
| return std::make_unique<SnappyCompressionStream>(outStream, level, bufferCapacity, |
| compressionBlockSize, pool, metrics); |
| } |
| case CompressionKind_LZO: |
| default: |
| throw NotImplementedYet("compression codec"); |
| } |
| } |
| |
| std::unique_ptr<SeekableInputStream> createDecompressor( |
| CompressionKind kind, std::unique_ptr<SeekableInputStream> input, uint64_t blockSize, |
| MemoryPool& pool, ReaderMetrics* metrics) { |
| switch (static_cast<int64_t>(kind)) { |
| case CompressionKind_NONE: |
| return input; |
| case CompressionKind_ZLIB: |
| return std::make_unique<ZlibDecompressionStream>(std::move(input), blockSize, pool, |
| metrics); |
| case CompressionKind_SNAPPY: |
| return std::make_unique<SnappyDecompressionStream>(std::move(input), blockSize, pool, |
| metrics); |
| case CompressionKind_LZO: |
| return std::make_unique<LzoDecompressionStream>(std::move(input), blockSize, pool, metrics); |
| case CompressionKind_LZ4: |
| return std::make_unique<Lz4DecompressionStream>(std::move(input), blockSize, pool, metrics); |
| case CompressionKind_ZSTD: |
| return std::make_unique<ZSTDDecompressionStream>(std::move(input), blockSize, pool, |
| metrics); |
| default: { |
| std::ostringstream buffer; |
| buffer << "Unknown compression codec " << kind; |
| throw NotImplementedYet(buffer.str()); |
| } |
| } |
| } |
| |
| } // namespace orc |