| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "util/block_compression.h" |
| |
| #include <bzlib.h> |
| #include <gen_cpp/parquet_types.h> |
| #include <gen_cpp/segment_v2.pb.h> |
| #include <glog/logging.h> |
| |
| #include <exception> |
| // Only used on x86 or x86_64 |
| #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
| defined(__i386) || defined(_M_IX86) |
| #include <libdeflate.h> |
| #endif |
| #include <brotli/decode.h> |
| #include <glog/log_severity.h> |
| #include <lz4/lz4.h> |
| #include <lz4/lz4frame.h> |
| #include <lz4/lz4hc.h> |
| #include <snappy/snappy-sinksource.h> |
| #include <snappy/snappy.h> |
| #include <zconf.h> |
| #include <zlib.h> |
| #include <zstd.h> |
| #include <zstd_errors.h> |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <limits> |
| #include <mutex> |
| #include <orc/Exceptions.hh> |
| #include <ostream> |
| |
| #include "absl/strings/substitute.h" |
| #include "common/config.h" |
| #include "common/factory_creator.h" |
| #include "exec/decompressor.h" |
| #include "runtime/thread_context.h" |
| #include "util/defer_op.h" |
| #include "util/faststring.h" |
| #include "vec/common/endian.h" |
| |
| namespace orc { |
| /** |
 * Decompress the bytes into the output buffer.
| * @param inputAddress the start of the input |
| * @param inputLimit one past the last byte of the input |
| * @param outputAddress the start of the output buffer |
| * @param outputLimit one past the last byte of the output buffer |
| * @result the number of bytes decompressed |
| */ |
| uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, |
| char* outputLimit); |
| } // namespace orc |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| // exception safe |
| Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) { |
| faststring buf; |
    // reserve the total size up front so appending the slices does not trigger extra
    // reallocation and copying
| buf.reserve(uncompressed_size); |
| for (auto& input : inputs) { |
| buf.append(input.data, input.size); |
| } |
| return compress(buf, output); |
| } |
| |
| bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { |
| return uncompressed_size > std::numeric_limits<int32_t>::max(); |
| } |
| |
| class Lz4BlockCompression : public BlockCompressionCodec { |
| private: |
| class Context { |
| ENABLE_FACTORY_CREATOR(Context); |
| |
| public: |
| Context() : ctx(nullptr) { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| buffer = std::make_unique<faststring>(); |
| } |
| LZ4_stream_t* ctx; |
| std::unique_ptr<faststring> buffer; |
| ~Context() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| if (ctx) { |
| LZ4_freeStream(ctx); |
| } |
| buffer.reset(); |
| } |
| }; |
| |
| public: |
| static Lz4BlockCompression* instance() { |
| static Lz4BlockCompression s_instance; |
| return &s_instance; |
| } |
| ~Lz4BlockCompression() override { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| _ctx_pool.clear(); |
| } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| if (input.size > LZ4_MAX_INPUT_SIZE) { |
            return Status::InvalidArgument(
                    "LZ4 does not support inputs larger than LZ4_MAX_INPUT_SIZE; consider "
                    "changing fragment_transmission_compression_codec to snappy, "
                    "input.size={}, LZ4_MAX_INPUT_SIZE={}",
                    input.size, LZ4_MAX_INPUT_SIZE);
| } |
| |
| std::unique_ptr<Context> context; |
| RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
| bool compress_failed = false; |
| Defer defer {[&] { |
| if (!compress_failed) { |
| _release_compression_ctx(std::move(context)); |
| } |
| }}; |
| |
| try { |
| Slice compressed_buf; |
| size_t max_len = max_compressed_len(input.size); |
| if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| // use output directly |
| output->resize(max_len); |
| compressed_buf.data = reinterpret_cast<char*>(output->data()); |
| compressed_buf.size = max_len; |
| } else { |
                // reuse the context buffer when max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
                {
                    // context->buffer is reusable across queries, so its memory should be
                    // accounted to the global block compression tracker.
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| context->buffer->resize(max_len); |
| } |
| compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
| compressed_buf.size = max_len; |
| } |
| |
            // input.size has already been checked above;
            // compressed_buf.size comes from max_compressed_len, i.e. the return value of
            // LZ4_compressBound, so it is safe to cast both to int.
            size_t compressed_len = LZ4_compress_fast_continue(
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
                    static_cast<int>(compressed_buf.size), ACCELERATION);
| if (compressed_len == 0) { |
| compress_failed = true; |
| return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
| compressed_buf.size); |
| } |
| output->resize(compressed_len); |
| if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
| compressed_len); |
| } |
| } catch (...) { |
            // Leave compress_failed unset so the Defer still returns the context to the pool.
| DCHECK(!compress_failed); |
| return Status::InternalError("Fail to do LZ4Block compress due to exception"); |
| } |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| auto decompressed_len = LZ4_decompress_safe( |
| input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size)); |
| if (decompressed_len < 0) { |
| return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len); |
| } |
| output->size = decompressed_len; |
| return Status::OK(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); } |
| |
| private: |
| // reuse LZ4 compress stream |
| Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
| std::lock_guard<std::mutex> l(_ctx_mutex); |
| if (_ctx_pool.empty()) { |
| std::unique_ptr<Context> localCtx = Context::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("new LZ4 context error"); |
| } |
| localCtx->ctx = LZ4_createStream(); |
| if (localCtx->ctx == nullptr) { |
| return Status::InvalidArgument("LZ4_createStream error"); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_pool.back()); |
| _ctx_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_compression_ctx(std::unique_ptr<Context> context) { |
| DCHECK(context); |
| LZ4_resetStream(context->ctx); |
| std::lock_guard<std::mutex> l(_ctx_mutex); |
| _ctx_pool.push_back(std::move(context)); |
| } |
| |
| private: |
| mutable std::mutex _ctx_mutex; |
| mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
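    // LZ4 acceleration factor passed to LZ4_compress_fast_continue; 1 is the library default,
    // larger values trade compression ratio for speed.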
    static const int32_t ACCELERATION = 1;
| }; |
| |
| class HadoopLz4BlockCompression : public Lz4BlockCompression { |
| public: |
| HadoopLz4BlockCompression() { |
| Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor); |
| if (!st.ok()) { |
| throw Exception(Status::FatalError( |
| "HadoopLz4BlockCompression construction failed. status = {}", st)); |
| } |
| } |
| |
| ~HadoopLz4BlockCompression() override = default; |
| |
| static HadoopLz4BlockCompression* instance() { |
| static HadoopLz4BlockCompression s_instance; |
| return &s_instance; |
| } |
| |
    // Hadoop uses its own block framing for LZ4:
| // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc |
| Status compress(const Slice& input, faststring* output) override { |
        // Keep consistent with Hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
| size_t lz4_block_size = config::lz4_compression_block_size; |
| size_t overhead = lz4_block_size / 255 + 16; |
| size_t max_input_size = lz4_block_size - overhead; |
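        // The overhead term mirrors LZ4's worst-case expansion (LZ4_compressBound(n) is
        // n + n/255 + 16), so a chunk of at most max_input_size bytes cannot compress to more
        // than lz4_block_size bytes.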
| |
| size_t data_len = input.size; |
| char* data = input.data; |
| std::vector<OwnedSlice> buffers; |
| size_t out_len = 0; |
| |
| while (data_len > 0) { |
| size_t input_size = std::min(data_len, max_input_size); |
| Slice input_slice(data, input_size); |
| faststring output_data; |
| RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data)); |
| out_len += output_data.size(); |
| buffers.push_back(output_data.build()); |
| data += input_size; |
| data_len -= input_size; |
| } |
| |
        // Hadoop block compression layout: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
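        // For example, a 100-byte input compressed into a single 60-byte chunk is laid out as
        // [00 00 00 64][00 00 00 3C][60 bytes of LZ4 data], with both lengths stored big-endian.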
| size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
| output->resize(total_output_len); |
| char* output_buffer = (char*)output->data(); |
| BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size())); |
| output_buffer += 4; |
| for (const auto& buffer : buffers) { |
| auto slice = buffer.slice(); |
| BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size())); |
| output_buffer += 4; |
| memcpy(output_buffer, slice.get_data(), slice.get_size()); |
| output_buffer += slice.get_size(); |
| } |
| |
| DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
| |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| size_t input_bytes_read = 0; |
| size_t decompressed_len = 0; |
| size_t more_input_bytes = 0; |
| size_t more_output_bytes = 0; |
| bool stream_end = false; |
| auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size), |
| &input_bytes_read, (uint8_t*)output->data, |
| cast_set<uint32_t>(output->size), &decompressed_len, |
| &stream_end, &more_input_bytes, &more_output_bytes); |
        // Try the Hadoop LZ4 block format first; if it fails or the stream is incomplete,
        // fall back to raw LZ4.
| return (st != Status::OK() || stream_end != true) |
| ? Lz4BlockCompression::decompress(input, output) |
| : Status::OK(); |
| } |
| |
| private: |
| std::unique_ptr<Decompressor> _decompressor; |
| }; |
// Used for the LZ4 frame format; its decompression speed is about twice that of plain LZ4.
| class Lz4fBlockCompression : public BlockCompressionCodec { |
| private: |
| class CContext { |
| ENABLE_FACTORY_CREATOR(CContext); |
| |
| public: |
| CContext() : ctx(nullptr) { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| buffer = std::make_unique<faststring>(); |
| } |
| LZ4F_compressionContext_t ctx; |
| std::unique_ptr<faststring> buffer; |
| ~CContext() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| if (ctx) { |
| LZ4F_freeCompressionContext(ctx); |
| } |
| buffer.reset(); |
| } |
| }; |
| class DContext { |
| ENABLE_FACTORY_CREATOR(DContext); |
| |
| public: |
| DContext() : ctx(nullptr) {} |
| LZ4F_decompressionContext_t ctx; |
| ~DContext() { |
| if (ctx) { |
| LZ4F_freeDecompressionContext(ctx); |
| } |
| } |
| }; |
| |
| public: |
| static Lz4fBlockCompression* instance() { |
| static Lz4fBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~Lz4fBlockCompression() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| _ctx_c_pool.clear(); |
| _ctx_d_pool.clear(); |
| } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| std::vector<Slice> inputs {input}; |
| return compress(inputs, input.size, output); |
| } |
| |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| return _compress(inputs, uncompressed_size, output); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| return _decompress(input, output); |
| } |
| |
| size_t max_compressed_len(size_t len) override { |
| return std::max(LZ4F_compressBound(len, &_s_preferences), |
| LZ4F_compressFrameBound(len, &_s_preferences)); |
| } |
| |
| private: |
| Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) { |
| std::unique_ptr<CContext> context; |
| RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
| bool compress_failed = false; |
| Defer defer {[&] { |
| if (!compress_failed) { |
| _release_compression_ctx(std::move(context)); |
| } |
| }}; |
| |
| try { |
| Slice compressed_buf; |
| size_t max_len = max_compressed_len(uncompressed_size); |
| if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| // use output directly |
| output->resize(max_len); |
| compressed_buf.data = reinterpret_cast<char*>(output->data()); |
| compressed_buf.size = max_len; |
| } else { |
| { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
                    // reuse the context buffer when max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
| context->buffer->resize(max_len); |
| } |
| compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
| compressed_buf.size = max_len; |
| } |
| |
| auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, |
| &_s_preferences); |
| if (LZ4F_isError(wbytes)) { |
| compress_failed = true; |
| return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", |
| LZ4F_getErrorName(wbytes)); |
| } |
| size_t offset = wbytes; |
| for (auto input : inputs) { |
| wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, |
| compressed_buf.size - offset, input.data, input.size, |
| nullptr); |
| if (LZ4F_isError(wbytes)) { |
| compress_failed = true; |
| return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", |
| LZ4F_getErrorName(wbytes)); |
| } |
| offset += wbytes; |
| } |
| wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, |
| compressed_buf.size - offset, nullptr); |
| if (LZ4F_isError(wbytes)) { |
| compress_failed = true; |
| return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", |
| LZ4F_getErrorName(wbytes)); |
| } |
| offset += wbytes; |
| output->resize(offset); |
| if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset); |
| } |
| } catch (...) { |
            // Leave compress_failed unset so the Defer still returns the context to the pool.
| DCHECK(!compress_failed); |
| return Status::InternalError("Fail to do LZ4F compress due to exception"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status _decompress(const Slice& input, Slice* output) { |
| bool decompress_failed = false; |
| std::unique_ptr<DContext> context; |
| RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
| Defer defer {[&] { |
| if (!decompress_failed) { |
| _release_decompression_ctx(std::move(context)); |
| } |
| }}; |
| size_t input_size = input.size; |
| auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data, |
| &input_size, nullptr); |
| if (LZ4F_isError(lres)) { |
| decompress_failed = true; |
| return Status::InternalError("Fail to do LZ4F decompress, res={}", |
| LZ4F_getErrorName(lres)); |
| } else if (input_size != input.size) { |
| decompress_failed = true; |
| return Status::InvalidArgument( |
| absl::Substitute("Fail to do LZ4F decompress: trailing data left in " |
| "compressed data, read=$0 vs given=$1", |
| input_size, input.size)); |
| } else if (lres != 0) { |
| decompress_failed = true; |
            return Status::InvalidArgument(
                    "Fail to do LZ4F decompress: expected more compressed data, hint={}", lres);
| } |
| return Status::OK(); |
| } |
| |
| private: |
    // Acquire a compression ctx from the pool. It is released back to the pool after a
    // successful compress and is destroyed (not returned) if compression failed.
| Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
| std::lock_guard<std::mutex> l(_ctx_c_mutex); |
| if (_ctx_c_pool.empty()) { |
| std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("failed to new LZ4F CContext"); |
| } |
| auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION); |
| if (LZ4F_isError(res) != 0) { |
| return Status::InvalidArgument(absl::Substitute( |
| "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_c_pool.back()); |
| _ctx_c_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_compression_ctx(std::unique_ptr<CContext> context) { |
| DCHECK(context); |
| std::lock_guard<std::mutex> l(_ctx_c_mutex); |
| _ctx_c_pool.push_back(std::move(context)); |
| } |
| |
| Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
| std::lock_guard<std::mutex> l(_ctx_d_mutex); |
| if (_ctx_d_pool.empty()) { |
| std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("failed to new LZ4F DContext"); |
| } |
| auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION); |
| if (LZ4F_isError(res) != 0) { |
| return Status::InvalidArgument(absl::Substitute( |
| "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res))); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_d_pool.back()); |
| _ctx_d_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
| DCHECK(context); |
| // reset decompression context to avoid ERROR_maxBlockSize_invalid |
| LZ4F_resetDecompressionContext(context->ctx); |
| std::lock_guard<std::mutex> l(_ctx_d_mutex); |
| _ctx_d_pool.push_back(std::move(context)); |
| } |
| |
| private: |
| static LZ4F_preferences_t _s_preferences; |
| |
| std::mutex _ctx_c_mutex; |
| // LZ4F_compressionContext_t is a pointer so no copy here |
| std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
| |
| std::mutex _ctx_d_mutex; |
| std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
| }; |
| |
| LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { |
| {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U, |
| LZ4F_noBlockChecksum}, |
| 0, |
| 0u, |
| 0u, |
| {0u, 0u, 0u}}; |
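
// The aggregate initializer above fills LZ4F_preferences_t in declaration order: the frameInfo
// member selects 256 KB linked blocks with no content or block checksums and an unknown content
// size, and the remaining fields (compressionLevel, autoFlush, favorDecSpeed, reserved) are all
// zero, i.e. the default fast compression level with no auto-flush.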
| |
| class Lz4HCBlockCompression : public BlockCompressionCodec { |
| private: |
| class Context { |
| ENABLE_FACTORY_CREATOR(Context); |
| |
| public: |
| Context() : ctx(nullptr) { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| buffer = std::make_unique<faststring>(); |
| } |
| LZ4_streamHC_t* ctx; |
| std::unique_ptr<faststring> buffer; |
| ~Context() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| if (ctx) { |
| LZ4_freeStreamHC(ctx); |
| } |
| buffer.reset(); |
| } |
| }; |
| |
| public: |
| static Lz4HCBlockCompression* instance() { |
| static Lz4HCBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~Lz4HCBlockCompression() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| _ctx_pool.clear(); |
| } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| std::unique_ptr<Context> context; |
| RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
| bool compress_failed = false; |
| Defer defer {[&] { |
| if (!compress_failed) { |
| _release_compression_ctx(std::move(context)); |
| } |
| }}; |
| |
| try { |
| Slice compressed_buf; |
| size_t max_len = max_compressed_len(input.size); |
| if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| // use output directly |
| output->resize(max_len); |
| compressed_buf.data = reinterpret_cast<char*>(output->data()); |
| compressed_buf.size = max_len; |
| } else { |
| { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
                    // reuse the context buffer when max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
| context->buffer->resize(max_len); |
| } |
| compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
| compressed_buf.size = max_len; |
| } |
| |
| size_t compressed_len = LZ4_compress_HC_continue( |
| context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size), |
| static_cast<int>(compressed_buf.size)); |
| if (compressed_len == 0) { |
| compress_failed = true; |
| return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
| compressed_buf.size); |
| } |
| output->resize(compressed_len); |
| if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
| compressed_len); |
| } |
| } catch (...) { |
            // Leave compress_failed unset so the Defer still returns the context to the pool.
| DCHECK(!compress_failed); |
| return Status::InternalError("Fail to do LZ4HC compress due to exception"); |
| } |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| auto decompressed_len = LZ4_decompress_safe( |
| input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size)); |
| if (decompressed_len < 0) { |
| return Status::InvalidArgument( |
| "destination buffer is not large enough or the source stream is detected " |
| "malformed, fail to do LZ4 decompress, error={}", |
| decompressed_len); |
| } |
| output->size = decompressed_len; |
| return Status::OK(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); } |
| |
| private: |
| Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
| std::lock_guard<std::mutex> l(_ctx_mutex); |
| if (_ctx_pool.empty()) { |
| std::unique_ptr<Context> localCtx = Context::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("new LZ4HC context error"); |
| } |
| localCtx->ctx = LZ4_createStreamHC(); |
| if (localCtx->ctx == nullptr) { |
| return Status::InvalidArgument("LZ4_createStreamHC error"); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_pool.back()); |
| _ctx_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_compression_ctx(std::unique_ptr<Context> context) { |
| DCHECK(context); |
| LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level)); |
| std::lock_guard<std::mutex> l(_ctx_mutex); |
| _ctx_pool.push_back(std::move(context)); |
| } |
| |
| private: |
| int64_t _compression_level = config::LZ4_HC_compression_level; |
| mutable std::mutex _ctx_mutex; |
| mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
| }; |
| |
| class SnappySlicesSource : public snappy::Source { |
| public: |
| SnappySlicesSource(const std::vector<Slice>& slices) |
| : _available(0), _cur_slice(0), _slice_off(0) { |
| for (auto& slice : slices) { |
            // Filter out empty slices here to simplify Peek() and Skip().
| if (slice.size == 0) { |
| continue; |
| } |
| _available += slice.size; |
| _slices.push_back(slice); |
| } |
| } |
| ~SnappySlicesSource() override {} |
| |
| // Return the number of bytes left to read from the source |
| size_t Available() const override { return _available; } |
| |
| // Peek at the next flat region of the source. Does not reposition |
| // the source. The returned region is empty iff Available()==0. |
| // |
| // Returns a pointer to the beginning of the region and store its |
| // length in *len. |
| // |
| // The returned region is valid until the next call to Skip() or |
| // until this object is destroyed, whichever occurs first. |
| // |
| // The returned region may be larger than Available() (for example |
| // if this ByteSource is a view on a substring of a larger source). |
| // The caller is responsible for ensuring that it only reads the |
| // Available() bytes. |
| const char* Peek(size_t* len) override { |
| if (_available == 0) { |
| *len = 0; |
| return nullptr; |
| } |
        // ensure that *len is not 0
| *len = _slices[_cur_slice].size - _slice_off; |
| DCHECK(*len != 0); |
| return _slices[_cur_slice].data + _slice_off; |
| } |
| |
| // Skip the next n bytes. Invalidates any buffer returned by |
| // a previous call to Peek(). |
| // REQUIRES: Available() >= n |
| void Skip(size_t n) override { |
| _available -= n; |
| while (n > 0) { |
| auto left = _slices[_cur_slice].size - _slice_off; |
| if (left > n) { |
                // n can be fully consumed within the current slice
| _slice_off += n; |
| return; |
| } |
| _slice_off = 0; |
| _cur_slice++; |
| n -= left; |
| } |
| } |
| |
| private: |
| std::vector<Slice> _slices; |
| size_t _available; |
| size_t _cur_slice; |
| size_t _slice_off; |
| }; |
| |
| class SnappyBlockCompression : public BlockCompressionCodec { |
| public: |
| static SnappyBlockCompression* instance() { |
| static SnappyBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~SnappyBlockCompression() override = default; |
| |
| Status compress(const Slice& input, faststring* output) override { |
| size_t max_len = max_compressed_len(input.size); |
| output->resize(max_len); |
| Slice s(*output); |
| |
| snappy::RawCompress(input.data, input.size, s.data, &s.size); |
| output->resize(s.size); |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| if (!snappy::RawUncompress(input.data, input.size, output->data)) { |
| return Status::InvalidArgument("Fail to do Snappy decompress"); |
| } |
| // NOTE: GetUncompressedLength only takes O(1) time |
| snappy::GetUncompressedLength(input.data, input.size, &output->size); |
| return Status::OK(); |
| } |
| |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| auto max_len = max_compressed_len(uncompressed_size); |
| output->resize(max_len); |
| |
| SnappySlicesSource source(inputs); |
| snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); |
| output->resize(snappy::Compress(&source, &sink)); |
| return Status::OK(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } |
| }; |
| |
| class HadoopSnappyBlockCompression : public SnappyBlockCompression { |
| public: |
| static HadoopSnappyBlockCompression* instance() { |
| static HadoopSnappyBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~HadoopSnappyBlockCompression() override = default; |
| |
    // Hadoop uses its own block framing for snappy:
| // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc |
| Status compress(const Slice& input, faststring* output) override { |
        // Keep consistent with Hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
| size_t snappy_block_size = config::snappy_compression_block_size; |
| size_t overhead = snappy_block_size / 6 + 32; |
| size_t max_input_size = snappy_block_size - overhead; |
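        // The overhead follows snappy::MaxCompressedLength(n) = 32 + n + n/6, so a chunk of at
        // most max_input_size bytes never expands beyond snappy_block_size after compression.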
| |
| size_t data_len = input.size; |
| char* data = input.data; |
| std::vector<OwnedSlice> buffers; |
| size_t out_len = 0; |
| |
| while (data_len > 0) { |
| size_t input_size = std::min(data_len, max_input_size); |
| Slice input_slice(data, input_size); |
| faststring output_data; |
| RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data)); |
| out_len += output_data.size(); |
| // the OwnedSlice will be moved here |
| buffers.push_back(output_data.build()); |
| data += input_size; |
| data_len -= input_size; |
| } |
| |
        // Hadoop block compression layout: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
| size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
| output->resize(total_output_len); |
| char* output_buffer = (char*)output->data(); |
| BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size())); |
| output_buffer += 4; |
| for (const auto& buffer : buffers) { |
| auto slice = buffer.slice(); |
| BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size())); |
| output_buffer += 4; |
| memcpy(output_buffer, slice.get_data(), slice.get_size()); |
| output_buffer += slice.get_size(); |
| } |
| |
| DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
| |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress"); |
| } |
| }; |
| |
| class ZlibBlockCompression : public BlockCompressionCodec { |
| public: |
| static ZlibBlockCompression* instance() { |
| static ZlibBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~ZlibBlockCompression() override = default; |
| |
| Status compress(const Slice& input, faststring* output) override { |
| size_t max_len = max_compressed_len(input.size); |
| output->resize(max_len); |
| Slice s(*output); |
| |
| auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); |
| if (zres == Z_MEM_ERROR) { |
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
                    "ZLib compression failed due to memory allocation error. error={}, res={}",
                    zError(zres), zres)));
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); |
| } |
| output->resize(s.size); |
| return Status::OK(); |
| } |
| |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| size_t max_len = max_compressed_len(uncompressed_size); |
| output->resize(max_len); |
| |
| z_stream zstrm; |
| zstrm.zalloc = Z_NULL; |
| zstrm.zfree = Z_NULL; |
| zstrm.opaque = Z_NULL; |
| auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); |
| if (zres == Z_MEM_ERROR) { |
| throw Exception(Status::MemoryLimitExceeded( |
| "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres)); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
| zError(zres), zres); |
| } |
        // we assume that output is large enough (it was resized to max_len above)
| zstrm.next_out = (Bytef*)output->data(); |
| zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size()); |
| for (int i = 0; i < inputs.size(); ++i) { |
| if (inputs[i].size == 0) { |
| continue; |
| } |
| zstrm.next_in = (Bytef*)inputs[i].data; |
| zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size); |
| int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
| |
| zres = deflate(&zstrm, flush); |
| if (zres != Z_OK && zres != Z_STREAM_END) { |
| return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
| zError(zres), zres); |
| } |
| } |
| |
| output->resize(zstrm.total_out); |
| zres = deflateEnd(&zstrm); |
| if (zres == Z_DATA_ERROR) { |
| return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), |
| zres); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
| zError(zres), zres); |
| } |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| size_t input_size = input.size; |
| auto zres = |
| ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); |
| if (zres == Z_DATA_ERROR) { |
| return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); |
| } else if (zres == Z_MEM_ERROR) { |
| throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", |
| zError(zres))); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); |
| } |
| return Status::OK(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { |
| // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block |
| return len + 6 + 5 * ((len >> 14) + 1); |
| } |
| }; |
| |
| class Bzip2BlockCompression : public BlockCompressionCodec { |
| public: |
| static Bzip2BlockCompression* instance() { |
| static Bzip2BlockCompression s_instance; |
| return &s_instance; |
| } |
| ~Bzip2BlockCompression() override = default; |
| |
| Status compress(const Slice& input, faststring* output) override { |
| size_t max_len = max_compressed_len(input.size); |
| output->resize(max_len); |
| auto size = cast_set<uint32_t>(output->size()); |
| auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, |
| cast_set<uint32_t>(input.size), 9, 0, 0); |
| if (bzres == BZ_MEM_ERROR) { |
| throw Exception( |
| Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres)); |
| } else if (bzres == BZ_PARAM_ERROR) { |
| return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); |
| } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && |
| bzres != BZ_STREAM_END && bzres != BZ_OK) { |
| return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
| } |
| output->resize(size); |
| return Status::OK(); |
| } |
| |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| size_t max_len = max_compressed_len(uncompressed_size); |
| output->resize(max_len); |
| |
| bz_stream bzstrm; |
| bzero(&bzstrm, sizeof(bzstrm)); |
| int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); |
| if (bzres == BZ_PARAM_ERROR) { |
| return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); |
| } else if (bzres == BZ_MEM_ERROR) { |
| throw Exception( |
| Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres)); |
| } else if (bzres != BZ_OK) { |
| return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
| } |
        // we assume that output is large enough (it was resized to max_len above)
| bzstrm.next_out = (char*)output->data(); |
| bzstrm.avail_out = cast_set<uint32_t>(output->size()); |
| for (int i = 0; i < inputs.size(); ++i) { |
| if (inputs[i].size == 0) { |
| continue; |
| } |
| bzstrm.next_in = (char*)inputs[i].data; |
| bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size); |
| int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN; |
| |
| bzres = BZ2_bzCompress(&bzstrm, flush); |
| if (bzres == BZ_PARAM_ERROR) { |
| return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); |
| } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && |
| bzres != BZ_STREAM_END && bzres != BZ_OK) { |
| return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
| } |
| } |
| |
| size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; |
| output->resize(total_out); |
| bzres = BZ2_bzCompressEnd(&bzstrm); |
| if (bzres == BZ_PARAM_ERROR) { |
| return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres); |
| } else if (bzres != BZ_OK) { |
| return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); |
| } |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| return Status::InternalError("unimplement: Bzip2BlockCompression::decompress"); |
| } |
| |
| size_t max_compressed_len(size_t len) override { |
        // TODO: confirm a tight max_compressed_len bound for bzip2.
        // The extra 50 bytes is a rough fixed overhead so that very small inputs do not make
        // BZ2_bzBuffToBuffCompress return BZ_OUTBUFF_FULL. (The bzip2 manual recommends an
        // output buffer at least 1% larger than the input plus 600 bytes.)
| return len * 2 + 50; |
| } |
| }; |
| |
| // for ZSTD compression and decompression, with BOTH fast and high compression ratio |
| class ZstdBlockCompression : public BlockCompressionCodec { |
| private: |
| class CContext { |
| ENABLE_FACTORY_CREATOR(CContext); |
| |
| public: |
| CContext() : ctx(nullptr) { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| buffer = std::make_unique<faststring>(); |
| } |
| ZSTD_CCtx* ctx; |
| std::unique_ptr<faststring> buffer; |
| ~CContext() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| if (ctx) { |
| ZSTD_freeCCtx(ctx); |
| } |
| buffer.reset(); |
| } |
| }; |
| class DContext { |
| ENABLE_FACTORY_CREATOR(DContext); |
| |
| public: |
| DContext() : ctx(nullptr) {} |
| ZSTD_DCtx* ctx; |
| ~DContext() { |
| if (ctx) { |
| ZSTD_freeDCtx(ctx); |
| } |
| } |
| }; |
| |
| public: |
| static ZstdBlockCompression* instance() { |
| static ZstdBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~ZstdBlockCompression() { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
| _ctx_c_pool.clear(); |
| _ctx_d_pool.clear(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| std::vector<Slice> inputs {input}; |
| return compress(inputs, input.size, output); |
| } |
| |
| // follow ZSTD official example |
| // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| std::unique_ptr<CContext> context; |
| RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
| bool compress_failed = false; |
| Defer defer {[&] { |
| if (!compress_failed) { |
| _release_compression_ctx(std::move(context)); |
| } |
| }}; |
| |
| try { |
| size_t max_len = max_compressed_len(uncompressed_size); |
| Slice compressed_buf; |
| if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| // use output directly |
| output->resize(max_len); |
| compressed_buf.data = reinterpret_cast<char*>(output->data()); |
| compressed_buf.size = max_len; |
| } else { |
| { |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
| ExecEnv::GetInstance()->block_compression_mem_tracker()); |
                    // reuse the context buffer when max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
| context->buffer->resize(max_len); |
| } |
| compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
| compressed_buf.size = max_len; |
| } |
| |
| // set compression level to default 3 |
| auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, |
| ZSTD_CLEVEL_DEFAULT); |
| if (ZSTD_isError(ret)) { |
| return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", |
| ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
| } |
| // set checksum flag to 1 |
| ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); |
| if (ZSTD_isError(ret)) { |
| return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", |
| ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
| } |
| |
| ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; |
| |
| for (size_t i = 0; i < inputs.size(); i++) { |
| ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; |
| |
| bool last_input = (i == inputs.size() - 1); |
| auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue; |
| |
| bool finished = false; |
| do { |
| // do compress |
| ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); |
| |
| if (ZSTD_isError(ret)) { |
| compress_failed = true; |
| return Status::InternalError("ZSTD_compressStream2 error: {}", |
| ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
| } |
| |
                    // A non-zero return value means ZSTD still has data to flush; if the
                    // output buffer is already full, no further progress can be made.
| if (ret > 0 && out_buf.pos == out_buf.size) { |
| compress_failed = true; |
| return Status::InternalError("ZSTD_compressStream2 output buffer full"); |
| } |
| |
| finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); |
| } while (!finished); |
| } |
| |
| // set compressed size for caller |
| output->resize(out_buf.pos); |
| if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
| output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos); |
| } |
| } catch (std::exception& e) { |
| return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what()); |
| } catch (...) { |
            // Leave compress_failed unset so the Defer still returns the context to the pool.
| DCHECK(!compress_failed); |
| return Status::InternalError("Fail to do ZSTD compress due to exception"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| std::unique_ptr<DContext> context; |
| bool decompress_failed = false; |
| RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
| Defer defer {[&] { |
| if (!decompress_failed) { |
| _release_decompression_ctx(std::move(context)); |
| } |
| }}; |
| |
| size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data, |
| input.size); |
| if (ZSTD_isError(ret)) { |
| decompress_failed = true; |
| return Status::InternalError("ZSTD_decompressDCtx error: {}", |
| ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
| } |
| |
| // set decompressed size for caller |
| output->size = ret; |
| |
| return Status::OK(); |
| } |
| |
| private: |
| Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
| std::lock_guard<std::mutex> l(_ctx_c_mutex); |
| if (_ctx_c_pool.empty()) { |
| std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("failed to new ZSTD CContext"); |
| } |
| localCtx->ctx = ZSTD_createCCtx(); |
| if (localCtx->ctx == nullptr) { |
| return Status::InvalidArgument("Failed to create ZSTD compress ctx"); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_c_pool.back()); |
| _ctx_c_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_compression_ctx(std::unique_ptr<CContext> context) { |
| DCHECK(context); |
| auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); |
| DCHECK(!ZSTD_isError(ret)); |
| std::lock_guard<std::mutex> l(_ctx_c_mutex); |
| _ctx_c_pool.push_back(std::move(context)); |
| } |
| |
| Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
| std::lock_guard<std::mutex> l(_ctx_d_mutex); |
| if (_ctx_d_pool.empty()) { |
| std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
| if (localCtx.get() == nullptr) { |
| return Status::InvalidArgument("failed to new ZSTD DContext"); |
| } |
| localCtx->ctx = ZSTD_createDCtx(); |
| if (localCtx->ctx == nullptr) { |
| return Status::InvalidArgument("Fail to init ZSTD decompress context"); |
| } |
| out = std::move(localCtx); |
| return Status::OK(); |
| } |
| out = std::move(_ctx_d_pool.back()); |
| _ctx_d_pool.pop_back(); |
| return Status::OK(); |
| } |
| void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
| DCHECK(context); |
| // reset ctx to start a new decompress session |
| auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); |
| DCHECK(!ZSTD_isError(ret)); |
| std::lock_guard<std::mutex> l(_ctx_d_mutex); |
| _ctx_d_pool.push_back(std::move(context)); |
| } |
| |
| private: |
| mutable std::mutex _ctx_c_mutex; |
| mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
| |
| mutable std::mutex _ctx_d_mutex; |
| mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
| }; |
| |
| class GzipBlockCompression : public ZlibBlockCompression { |
| public: |
| static GzipBlockCompression* instance() { |
| static GzipBlockCompression s_instance; |
| return &s_instance; |
| } |
| ~GzipBlockCompression() override = default; |
| |
| Status compress(const Slice& input, faststring* output) override { |
| size_t max_len = max_compressed_len(input.size); |
| output->resize(max_len); |
| |
| z_stream z_strm = {}; |
| z_strm.zalloc = Z_NULL; |
| z_strm.zfree = Z_NULL; |
| z_strm.opaque = Z_NULL; |
| |
| int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
| 8, Z_DEFAULT_STRATEGY); |
| |
| if (zres == Z_MEM_ERROR) { |
| throw Exception(Status::MemoryLimitExceeded( |
| "Fail to init ZLib compress, error={}, res={}", zError(zres), zres)); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to init ZLib compress, error={}, res={}", |
| zError(zres), zres); |
| } |
| |
| z_strm.next_in = (Bytef*)input.get_data(); |
| z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size()); |
| z_strm.next_out = (Bytef*)output->data(); |
| z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size()); |
| |
| zres = deflate(&z_strm, Z_FINISH); |
| if (zres != Z_OK && zres != Z_STREAM_END) { |
| return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
| zError(zres), zres); |
| } |
| |
| output->resize(z_strm.total_out); |
| zres = deflateEnd(&z_strm); |
| if (zres == Z_DATA_ERROR) { |
| return Status::InvalidArgument("Fail to end zlib compress"); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to end zlib compress"); |
| } |
| return Status::OK(); |
| } |
| |
| Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
| faststring* output) override { |
| size_t max_len = max_compressed_len(uncompressed_size); |
| output->resize(max_len); |
| |
| z_stream zstrm; |
| zstrm.zalloc = Z_NULL; |
| zstrm.zfree = Z_NULL; |
| zstrm.opaque = Z_NULL; |
| auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
| 8, Z_DEFAULT_STRATEGY); |
| if (zres == Z_MEM_ERROR) { |
| throw Exception(Status::MemoryLimitExceeded( |
| "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres)); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", |
| zError(zres), zres); |
| } |
| |
        // we assume that output is large enough (it was resized to max_len above)
| zstrm.next_out = (Bytef*)output->data(); |
| zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size()); |
| for (int i = 0; i < inputs.size(); ++i) { |
| if (inputs[i].size == 0) { |
| continue; |
| } |
| zstrm.next_in = (Bytef*)inputs[i].data; |
| zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size); |
| int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
| |
| zres = deflate(&zstrm, flush); |
| if (zres != Z_OK && zres != Z_STREAM_END) { |
| return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
| zError(zres), zres); |
| } |
| } |
| |
| output->resize(zstrm.total_out); |
| zres = deflateEnd(&zstrm); |
| if (zres == Z_DATA_ERROR) { |
| return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
| zError(zres), zres); |
| } else if (zres != Z_OK) { |
| return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
| zError(zres), zres); |
| } |
| return Status::OK(); |
| } |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| z_stream z_strm = {}; |
| z_strm.zalloc = Z_NULL; |
| z_strm.zfree = Z_NULL; |
| z_strm.opaque = Z_NULL; |
| |
| int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); |
| if (ret != Z_OK) { |
| return Status::InternalError("Fail to init ZLib decompress, error={}, res={}", |
| zError(ret), ret); |
| } |
| |
| // 1. set input and output |
| z_strm.next_in = reinterpret_cast<Bytef*>(input.data); |
| z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size); |
| z_strm.next_out = reinterpret_cast<Bytef*>(output->data); |
| z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size); |
| |
| if (z_strm.avail_out > 0) { |
| // We only support non-streaming use case for block decompressor |
| ret = inflate(&z_strm, Z_FINISH); |
| if (ret != Z_OK && ret != Z_STREAM_END) { |
| (void)inflateEnd(&z_strm); |
                if (ret == Z_MEM_ERROR) {
                    throw Exception(Status::MemoryLimitExceeded(
                            "Fail to do ZLib stream decompress, error={}, res={}", zError(ret),
                            ret));
                } else if (ret == Z_DATA_ERROR) {
                    return Status::InvalidArgument(
                            "Fail to do ZLib stream decompress, error={}, res={}", zError(ret),
                            ret);
                }
                return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}",
                                             zError(ret), ret);
| } |
| } |
| (void)inflateEnd(&z_strm); |
| |
| return Status::OK(); |
| } |
| |
| size_t max_compressed_len(size_t len) override { |
| z_stream zstrm; |
| zstrm.zalloc = Z_NULL; |
| zstrm.zfree = Z_NULL; |
| zstrm.opaque = Z_NULL; |
| auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
| MEM_LEVEL, Z_DEFAULT_STRATEGY); |
| if (zres != Z_OK) { |
            // Fall back to the plain zlib (deflate) estimate. Note that the gzip wrapper adds
            // extra header/trailer bytes, so this fallback may undersize the buffer and cause a
            // later decompress error.
| LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres) |
| << ", res=" << zres; |
| return ZlibBlockCompression::max_compressed_len(len); |
| } else { |
| zres = deflateEnd(&zstrm); |
| if (zres != Z_OK) { |
| LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres) |
| << ", res=" << zres; |
| } |
            // Mark Adler, the zlib maintainer, has stated that 12 bytes need to be added to the
            // result for gzip:
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
            // To keep a safe upper bound for "wrapper variations", we add 32 to the estimate.
| auto upper_bound = deflateBound(&zstrm, len) + 32; |
| return upper_bound; |
| } |
| } |
| |
| private: |
    // Adding 16 to windowBits makes zlib emit/expect a gzip wrapper instead of a zlib one,
    // see https://zlib.net/manual.html for more details.
    const static int GZIP_CODEC = 16; // gzip
| // The memLevel parameter specifies how much memory should be allocated for |
| // the internal compression state. |
| const static int MEM_LEVEL = 8; |
| }; |
| |
| // Only used on x86 or x86_64 |
| #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
| defined(__i386) || defined(_M_IX86) |
| class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { |
| public: |
| GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {} |
| static GzipBlockCompressionByLibdeflate* instance() { |
| static GzipBlockCompressionByLibdeflate s_instance; |
| return &s_instance; |
| } |
| ~GzipBlockCompressionByLibdeflate() override = default; |
| |
| Status decompress(const Slice& input, Slice* output) override { |
| if (input.empty()) { |
| output->size = 0; |
| return Status::OK(); |
| } |
| thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)> |
| decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor}; |
| if (!decompressor) { |
| return Status::InternalError("libdeflate_alloc_decompressor error."); |
| } |
| std::size_t out_len; |
| auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size, |
| output->data, output->size, &out_len); |
| if (result != LIBDEFLATE_SUCCESS) { |
| return Status::InternalError("libdeflate_gzip_decompress error, res={}", result); |
| } |
| return Status::OK(); |
| } |
| }; |
| #endif |
| |
| class LzoBlockCompression final : public BlockCompressionCodec { |
| public: |
| static LzoBlockCompression* instance() { |
| static LzoBlockCompression s_instance; |
| return &s_instance; |
| } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| return Status::InvalidArgument("not impl lzo compress."); |
| } |
| size_t max_compressed_len(size_t len) override { return 0; }; |
| Status decompress(const Slice& input, Slice* output) override { |
| auto* input_ptr = input.data; |
| auto remain_input_size = input.size; |
| auto* output_ptr = output->data; |
| auto remain_output_size = output->size; |
| auto* output_limit = output->data + output->size; |
| |
| // Example: |
| // OriginData(The original data will be divided into several large data block.) : |
| // large data block1 | large data block2 | large data block3 | .... |
| // The large data block will be divided into several small data block. |
| // Suppose a large data block is divided into three small blocks: |
| // large data block1: | small block1 | small block2 | small block3 | |
        // CompressData: <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
| // |
| // A : original length of the current block of large data block. |
| // sizeof(A) = 4 bytes. |
| // A = length(small block1) + length(small block2) + length(small block3) |
| // Bx : length of small data block bx. |
| // sizeof(Bx) = 4 bytes. |
| // Bx = length(compress(small blockx)) |
| try { |
| while (remain_input_size > 0) { |
| if (remain_input_size < 4) { |
| return Status::InvalidArgument( |
| "Need more input buffer to get large_block_uncompressed_len."); |
| } |
| |
| uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr); |
| input_ptr += 4; |
| remain_input_size -= 4; |
| |
| if (remain_output_size < large_block_uncompressed_len) { |
| return Status::InvalidArgument( |
| "Need more output buffer to get uncompressed data."); |
| } |
| |
| while (large_block_uncompressed_len > 0) { |
| if (remain_input_size < 4) { |
| return Status::InvalidArgument( |
| "Need more input buffer to get small_block_compressed_len."); |
| } |
| |
| uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr); |
| input_ptr += 4; |
| remain_input_size -= 4; |
| |
| if (remain_input_size < small_block_compressed_len) { |
| return Status::InvalidArgument( |
| "Need more input buffer to decompress small block."); |
| } |
| |
| auto small_block_uncompressed_len = |
| orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len, |
| output_ptr, output_limit); |
| |
| input_ptr += small_block_compressed_len; |
| remain_input_size -= small_block_compressed_len; |
| |
| output_ptr += small_block_uncompressed_len; |
| large_block_uncompressed_len -= small_block_uncompressed_len; |
| remain_output_size -= small_block_uncompressed_len; |
| } |
| } |
| } catch (const orc::ParseError& e) { |
            // Guard against orc::lzoDecompress throwing an exception and taking down the BE.
| return Status::InternalError("Fail to do LZO decompress, error={}", e.what()); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class BrotliBlockCompression final : public BlockCompressionCodec { |
| public: |
| static BrotliBlockCompression* instance() { |
| static BrotliBlockCompression s_instance; |
| return &s_instance; |
| } |
| |
| Status compress(const Slice& input, faststring* output) override { |
| return Status::InvalidArgument("not impl brotli compress."); |
| } |
| |
| size_t max_compressed_len(size_t len) override { return 0; }; |
| |
| Status decompress(const Slice& input, Slice* output) override { |
        // The size of the output buffer is always equal to the uncompressed length.
| BrotliDecoderResult result = BrotliDecoderDecompress( |
| input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size, |
| reinterpret_cast<uint8_t*>(output->data)); |
| if (result != BROTLI_DECODER_RESULT_SUCCESS) { |
| return Status::InternalError("Brotli decompression failed, result={}", result); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| Status get_block_compression_codec(segment_v2::CompressionTypePB type, |
| BlockCompressionCodec** codec) { |
| switch (type) { |
| case segment_v2::CompressionTypePB::NO_COMPRESSION: |
| *codec = nullptr; |
| break; |
| case segment_v2::CompressionTypePB::SNAPPY: |
| *codec = SnappyBlockCompression::instance(); |
| break; |
| case segment_v2::CompressionTypePB::LZ4: |
| *codec = Lz4BlockCompression::instance(); |
| break; |
| case segment_v2::CompressionTypePB::LZ4F: |
| *codec = Lz4fBlockCompression::instance(); |
| break; |
| case segment_v2::CompressionTypePB::LZ4HC: |
| *codec = Lz4HCBlockCompression::instance(); |
| break; |
| case segment_v2::CompressionTypePB::ZLIB: |
| *codec = ZlibBlockCompression::instance(); |
| break; |
| case segment_v2::CompressionTypePB::ZSTD: |
| *codec = ZstdBlockCompression::instance(); |
| break; |
| default: |
| return Status::InternalError("unknown compression type({})", type); |
| } |
| |
| return Status::OK(); |
| } |
| |
// This is only used when writing Hive text files.
| Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) { |
| switch (type) { |
| case TFileCompressType::PLAIN: |
| *codec = nullptr; |
| break; |
| case TFileCompressType::ZLIB: |
| *codec = ZlibBlockCompression::instance(); |
| break; |
| case TFileCompressType::GZ: |
| *codec = GzipBlockCompression::instance(); |
| break; |
| case TFileCompressType::BZ2: |
| *codec = Bzip2BlockCompression::instance(); |
| break; |
| case TFileCompressType::LZ4BLOCK: |
| *codec = HadoopLz4BlockCompression::instance(); |
| break; |
| case TFileCompressType::SNAPPYBLOCK: |
| *codec = HadoopSnappyBlockCompression::instance(); |
| break; |
| case TFileCompressType::ZSTD: |
| *codec = ZstdBlockCompression::instance(); |
| break; |
| default: |
| return Status::InternalError("unsupport compression type({}) int hive text", type); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, |
| BlockCompressionCodec** codec) { |
| switch (parquet_codec) { |
| case tparquet::CompressionCodec::UNCOMPRESSED: |
| *codec = nullptr; |
| break; |
| case tparquet::CompressionCodec::SNAPPY: |
| *codec = SnappyBlockCompression::instance(); |
| break; |
    case tparquet::CompressionCodec::LZ4_RAW: // LZ4_RAW can be parsed with the same LZ4 block codec
| case tparquet::CompressionCodec::LZ4: |
| *codec = HadoopLz4BlockCompression::instance(); |
| break; |
| case tparquet::CompressionCodec::ZSTD: |
| *codec = ZstdBlockCompression::instance(); |
| break; |
| case tparquet::CompressionCodec::GZIP: |
| // Only used on x86 or x86_64 |
| #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
| defined(__i386) || defined(_M_IX86) |
| *codec = GzipBlockCompressionByLibdeflate::instance(); |
| #else |
| *codec = GzipBlockCompression::instance(); |
| #endif |
| break; |
| case tparquet::CompressionCodec::LZO: |
| *codec = LzoBlockCompression::instance(); |
| break; |
| case tparquet::CompressionCodec::BROTLI: |
| *codec = BrotliBlockCompression::instance(); |
| break; |
| default: |
| return Status::InternalError("unknown compression type({})", parquet_codec); |
| } |
| |
| return Status::OK(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |