| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/compression/compression_codec.h" |
| |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <lz4.h> |
| #include <snappy-sinksource.h> |
| #include <snappy.h> |
| #include <zlib.h> |
| |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/singleton.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/string_case.h" |
| |
| namespace kudu { |
| |
| using std::vector; |
| |
| CompressionCodec::CompressionCodec() { |
| } |
| CompressionCodec::~CompressionCodec() { |
| } |
| |
| class SlicesSource : public snappy::Source { |
| public: |
| explicit SlicesSource(const std::vector<Slice>& slices) |
| : slice_index_(0), |
| slice_offset_(0), |
| slices_(slices) { |
| available_ = TotalSize(); |
| } |
| |
| size_t Available() const OVERRIDE { |
| return available_; |
| } |
| |
| const char* Peek(size_t* len) OVERRIDE { |
| if (available_ == 0) { |
| *len = 0; |
| return nullptr; |
| } |
| |
| const Slice& data = slices_[slice_index_]; |
| *len = data.size() - slice_offset_; |
| return reinterpret_cast<const char *>(data.data()) + slice_offset_; |
| } |
| |
| void Skip(size_t n) OVERRIDE { |
| DCHECK_LE(n, Available()); |
| if (n == 0) return; |
| |
| available_ -= n; |
| if ((n + slice_offset_) < slices_[slice_index_].size()) { |
| slice_offset_ += n; |
| } else { |
| n -= slices_[slice_index_].size() - slice_offset_; |
| slice_index_++; |
| while (n > 0 && n >= slices_[slice_index_].size()) { |
| n -= slices_[slice_index_].size(); |
| slice_index_++; |
| } |
| slice_offset_ = n; |
| } |
| } |
| |
| void Dump(faststring *buffer) { |
| buffer->reserve(buffer->size() + TotalSize()); |
| for (const Slice& block : slices_) { |
| buffer->append(block.data(), block.size()); |
| } |
| } |
| |
| private: |
| size_t TotalSize(void) const { |
| size_t size = 0; |
| for (const Slice& data : slices_) { |
| size += data.size(); |
| } |
| return size; |
| } |
| |
| private: |
| size_t available_; |
| size_t slice_index_; |
| size_t slice_offset_; |
| const vector<Slice>& slices_; |
| }; |
| |
| class SnappyCodec : public CompressionCodec { |
| public: |
| static SnappyCodec *GetSingleton() { |
| return Singleton<SnappyCodec>::get(); |
| } |
| |
| Status Compress(const Slice& input, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(), |
| reinterpret_cast<char *>(compressed), compressed_length); |
| return Status::OK(); |
| } |
| |
| Status Compress(const vector<Slice>& input_slices, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| SlicesSource source(input_slices); |
| snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed)); |
| if ((*compressed_length = snappy::Compress(&source, &sink)) <= 0) { |
| return Status::Corruption("unable to compress the buffer"); |
| } |
| return Status::OK(); |
| } |
| |
| Status Uncompress(const Slice& compressed, |
| uint8_t *uncompressed, |
| size_t /*uncompressed_length*/) const OVERRIDE { |
| bool success = snappy::RawUncompress(reinterpret_cast<const char *>(compressed.data()), |
| compressed.size(), reinterpret_cast<char *>(uncompressed)); |
| return success ? Status::OK() : Status::Corruption("unable to uncompress the buffer"); |
| } |
| |
| size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { |
| return snappy::MaxCompressedLength(source_bytes); |
| } |
| |
| CompressionType type() const override { |
| return SNAPPY; |
| } |
| }; |
| |
| class Lz4Codec : public CompressionCodec { |
| public: |
| static Lz4Codec *GetSingleton() { |
| return Singleton<Lz4Codec>::get(); |
| } |
| |
| Status Compress(const Slice& input, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| *compressed_length = LZ4_compress(reinterpret_cast<const char *>(input.data()), |
| reinterpret_cast<char *>(compressed), |
| input.size()); |
| return Status::OK(); |
| } |
| |
| Status Compress(const vector<Slice>& input_slices, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| if (input_slices.size() == 1) { |
| return Compress(input_slices[0], compressed, compressed_length); |
| } |
| |
| SlicesSource source(input_slices); |
| faststring buffer; |
| source.Dump(&buffer); |
| return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); |
| } |
| |
| Status Uncompress(const Slice& compressed, |
| uint8_t *uncompressed, |
| size_t uncompressed_length) const OVERRIDE { |
| int n = LZ4_decompress_safe(reinterpret_cast<const char *>(compressed.data()), |
| reinterpret_cast<char *>(uncompressed), |
| compressed.size(), uncompressed_length); |
| if (n != uncompressed_length) { |
| return Status::Corruption( |
| StringPrintf("unable to uncompress the buffer. error near %d, buffer", -n), |
| KUDU_REDACT(compressed.ToDebugString(100))); |
| } |
| return Status::OK(); |
| } |
| |
| size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { |
| return LZ4_compressBound(source_bytes); |
| } |
| |
| CompressionType type() const override { |
| return LZ4; |
| } |
| }; |
| |
| /** |
| * TODO: use a instance-local Arena and pass alloc/free into zlib |
| * so that it allocates from the arena. |
| */ |
| class ZlibCodec : public CompressionCodec { |
| public: |
| static ZlibCodec *GetSingleton() { |
| return Singleton<ZlibCodec>::get(); |
| } |
| |
| Status Compress(const Slice& input, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| *compressed_length = MaxCompressedLength(input.size()); |
| int err = ::compress(compressed, compressed_length, input.data(), input.size()); |
| return err == Z_OK ? Status::OK() : Status::IOError("unable to compress the buffer"); |
| } |
| |
| Status Compress(const vector<Slice>& input_slices, |
| uint8_t *compressed, size_t *compressed_length) const OVERRIDE { |
| if (input_slices.size() == 1) { |
| return Compress(input_slices[0], compressed, compressed_length); |
| } |
| |
| // TODO: use z_stream |
| SlicesSource source(input_slices); |
| faststring buffer; |
| source.Dump(&buffer); |
| return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); |
| } |
| |
| Status Uncompress(const Slice& compressed, |
| uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE { |
| int err = ::uncompress(uncompressed, &uncompressed_length, |
| compressed.data(), compressed.size()); |
| return err == Z_OK ? Status::OK() : Status::Corruption("unable to uncompress the buffer"); |
| } |
| |
| size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { |
| // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block |
| return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14))); |
| } |
| |
| CompressionType type() const override { |
| return ZLIB; |
| } |
| }; |
| |
| Status GetCompressionCodec(CompressionType compression, |
| const CompressionCodec** codec) { |
| switch (compression) { |
| case NO_COMPRESSION: |
| *codec = nullptr; |
| break; |
| case SNAPPY: |
| *codec = SnappyCodec::GetSingleton(); |
| break; |
| case LZ4: |
| *codec = Lz4Codec::GetSingleton(); |
| break; |
| case ZLIB: |
| *codec = ZlibCodec::GetSingleton(); |
| break; |
| default: |
| return Status::NotFound("bad compression type"); |
| } |
| return Status::OK(); |
| } |
| |
| CompressionType GetCompressionCodecType(const std::string& name) { |
| std::string uname; |
| ToUpperCase(name, &uname); |
| |
| if (uname == "SNAPPY") |
| return SNAPPY; |
| if (uname == "LZ4") |
| return LZ4; |
| if (uname == "ZLIB") |
| return ZLIB; |
| if (uname == "NO_COMPRESSION") |
| return NO_COMPRESSION; |
| |
| LOG(WARNING) << "Unable to recognize the compression codec '" << name |
| << "' using no compression as default."; |
| return NO_COMPRESSION; |
| } |
| |
| } // namespace kudu |