blob: 61792b7e5d8d725fc02dd2e80ca49b8e3cc7c3f4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <algorithm>
#include "kudu/cfile/block_compression.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
namespace kudu {
namespace cfile {
using std::vector;
CompressedBlockBuilder::CompressedBlockBuilder(const CompressionCodec* codec,
size_t size_limit)
: codec_(DCHECK_NOTNULL(codec)),
compressed_size_limit_(size_limit) {
}
Status CompressedBlockBuilder::Compress(const Slice& data, Slice *result) {
vector<Slice> v;
v.push_back(data);
return Compress(v, result);
}
Status CompressedBlockBuilder::Compress(const vector<Slice> &data_slices, Slice *result) {
size_t data_size = 0;
for (const Slice& data : data_slices) {
data_size += data.size();
}
// Ensure that the buffer for header + compressed data is large enough
size_t max_compressed_size = codec_->MaxCompressedLength(data_size);
if (max_compressed_size > compressed_size_limit_) {
return Status::InvalidArgument(
StringPrintf("estimated max size %lu is greater than the expected %lu",
max_compressed_size, compressed_size_limit_));
}
buffer_.resize(kHeaderReservedLength + max_compressed_size);
// Compress
size_t compressed_size;
RETURN_NOT_OK(codec_->Compress(data_slices,
buffer_.data() + kHeaderReservedLength, &compressed_size));
// Set up the header
InlineEncodeFixed32(&buffer_[0], compressed_size);
InlineEncodeFixed32(&buffer_[4], data_size);
*result = Slice(buffer_.data(), compressed_size + kHeaderReservedLength);
return Status::OK();
}
CompressedBlockDecoder::CompressedBlockDecoder(const CompressionCodec* codec,
size_t size_limit)
: codec_(DCHECK_NOTNULL(codec)),
uncompressed_size_limit_(size_limit) {
}
Status CompressedBlockDecoder::ValidateHeader(const Slice& data, uint32_t *uncompressed_size) {
// Check if the on-disk size is correct.
if (data.size() < CompressedBlockBuilder::kHeaderReservedLength) {
return Status::Corruption(
StringPrintf("data size %lu is not enough to contains the header. "
"required %lu, buffer",
data.size(), CompressedBlockBuilder::kHeaderReservedLength),
data.ToDebugString(50));
}
// Decode the header
uint32_t compressed_size = DecodeFixed32(data.data());
*uncompressed_size = DecodeFixed32(data.data() + 4);
// Check if the on-disk data size matches with the buffer
if (data.size() != (CompressedBlockBuilder::kHeaderReservedLength + compressed_size)) {
return Status::Corruption(
StringPrintf("compressed size %u does not match remaining length in buffer %lu, buffer",
compressed_size, data.size() - CompressedBlockBuilder::kHeaderReservedLength),
data.ToDebugString(50));
}
// Check if uncompressed size seems to be reasonable
if (*uncompressed_size > uncompressed_size_limit_) {
return Status::Corruption(
StringPrintf("uncompressed size %u overflows the maximum length %lu, buffer",
compressed_size, uncompressed_size_limit_), data.ToDebugString(50));
}
return Status::OK();
}
Status CompressedBlockDecoder::UncompressIntoBuffer(const Slice& data, uint8_t* dst,
uint32_t uncompressed_size) {
Slice compressed = data;
compressed.remove_prefix(CompressedBlockBuilder::kHeaderReservedLength);
RETURN_NOT_OK(codec_->Uncompress(compressed, dst, uncompressed_size));
return Status::OK();
}
Status CompressedBlockDecoder::Uncompress(const Slice& data, Slice *result) {
// Decode the header
uint32_t uncompressed_size;
RETURN_NOT_OK(ValidateHeader(data, &uncompressed_size));
// Allocate the buffer for the uncompressed data and uncompress
::gscoped_array<uint8_t> buffer(new uint8_t[uncompressed_size]);
RETURN_NOT_OK(UncompressIntoBuffer(data, buffer.get(), uncompressed_size));
*result = Slice(buffer.release(), uncompressed_size);
return Status::OK();
}
} // namespace cfile
} // namespace kudu