blob: e4c2abc40877120f5477145d86c32ce70dc2a2f2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet/parquet-level-decoder.h"
#include "exec/read-write-util.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "util/bit-util.h"
#include "util/ubsan.h"
#include "common/names.h"
using parquet::Encoding;
namespace impala {
// Out-of-line definitions of ParquetLevel's static const members. They carry no
// initializer here, so their values must come from the in-class declarations
// (presumably in exec/parquet/parquet-level-decoder.h). These definitions give the
// constants storage, so they can be ODR-used (e.g. bound to a const reference, as
// DCHECK_EQ-style macros may do) without link errors in pre-C++17 builds.
const int16_t ParquetLevel::ROW_GROUP_END;
const int16_t ParquetLevel::INVALID_LEVEL;
const int16_t ParquetLevel::INVALID_POS;
// Prepares the decoder to read the levels of one data page.
//
// 'filename' is kept for error messages. 'cache_size' is first rounded up with
// BitUtil::RoundUpToPowerOf2(cache_size, 32), and the level cache is then allocated
// from 'cache_pool' (or reused if it already exists). 'max_level' is the column's
// maximum level. When it is 0 there is no level data in the page (e.g. a required
// field), and '*data' / '*data_size' are left untouched.
//
// Otherwise 'encoding' must be RLE. The level section is then read from '*data' as an
// int32_t byte count followed by that many encoded bytes. On success '*data' and
// '*data_size' end up pointing just past that section. This assumes
// ReadWriteUtil::Read consumes the int32_t it reads, as the pointer-to-pointer
// signature suggests.
//
// Returns an error if the cache cannot be allocated, the encoding is BIT_PACKED or
// unknown, the byte count cannot be read, or the byte count is negative or larger
// than the bytes remaining in the page.
Status ParquetLevelDecoder::Init(const string& filename, const Encoding::type* encoding,
    MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size) {
  DCHECK(*data != nullptr);
  DCHECK_GE(*data_size, 0);
  DCHECK_GT(cache_size, 0);
  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
  max_level_ = max_level;
  filename_ = filename;
  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
  // Return because there is no level data to read, e.g., required field.
  if (max_level == 0) return Status::OK();

  // Inspect the raw integer before switching on it: an out-of-range value read from a
  // corrupt file is not a valid enumerator, and Ubsan::EnumToInt avoids loading it
  // as one.
  if (Ubsan::EnumToInt(encoding) > Encoding::MAX_ENUM_VALUE) {
    stringstream ss;
    ss << "Unsupported encoding: " << Ubsan::EnumToInt(encoding);
    return Status(ss.str());
  }
  // Only RLE-encoded levels are supported; everything else is rejected up front so
  // the RLE path below is straight-line code.
  switch (*encoding) {
    case Encoding::RLE:
      break;
    case Encoding::BIT_PACKED:
      return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
    default: {
      stringstream ss;
      ss << "Unsupported encoding: " << *encoding;
      return Status(ss.str());
    }
  }

  // Read the int32_t length prefix of the RLE level section.
  int32_t num_bytes = 0;
  Status status;
  if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) return status;
  // Single bounds check for the section: a negative or oversized length means the
  // page is corrupt. (The original also had an unreachable second check for the very
  // same condition after the switch; it has been folded into this one.)
  if (num_bytes < 0 || num_bytes > *data_size) {
    return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
  }
  // Levels are in [0, max_level], so this many bits per value suffice.
  int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
  rle_decoder_.Reset(*data, num_bytes, bit_width);
  // Hand the caller the remainder of the page, positioned after the level data.
  *data += num_bytes;
  *data_size -= num_bytes;
  return Status::OK();
}
// Resets the cache cursor and makes sure the level cache buffer exists.
//
// The read position and cached count are always reset to zero. If a buffer was
// allocated by an earlier call, it is reused as-is; in debug builds 'cache_size' must
// then match the size it was allocated with. Otherwise 'cache_size' bytes are taken
// from 'pool', zero-filled, and 'cache_size_' is recorded. Returns the memory
// tracker's MemLimitExceeded status if 'pool' cannot provide the bytes.
Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
  cached_level_idx_ = 0;
  num_cached_levels_ = 0;

  const bool already_allocated = cached_levels_ != nullptr;
  if (already_allocated) {
    DCHECK_EQ(cache_size_, cache_size);
    return Status::OK();
  }

  uint8_t* buffer = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
  if (buffer == nullptr) {
    return pool->mem_tracker()->MemLimitExceeded(
        nullptr, "Definition level cache", cache_size);
  }
  // Zero-filling matters: callers with max_level_ == 0 hand out these bytes as levels.
  memset(buffer, 0, cache_size);
  cached_levels_ = buffer;
  cache_size_ = cache_size;
  return Status::OK();
}
// Loads the next batch of levels into the cache.
//
// At most min(vals_remaining, cache_size_) levels are cached, so the cache is filled
// completely when the page has enough values left, and never over-reads otherwise.
// When the column has no levels (max_level_ == 0), the already-zeroed cache is simply
// exposed as 'batch_size' levels. Returns a 'decoding_error_code_' status (with
// 'vals_remaining' and the file name) if decoding fails or yields fewer levels than
// requested.
Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
  const int batch_size = min(vals_remaining, cache_size_);

  if (max_level_ <= 0) {
    // No levels to read, e.g., because the field is required. The cache was
    // already initialized with all zeros, so we can hand out those values.
    DCHECK_EQ(max_level_, 0);
    cached_level_idx_ = 0;
    num_cached_levels_ = batch_size;
    return Status::OK();
  }

  const bool filled = FillCache(batch_size, &num_cached_levels_);
  if (UNLIKELY(!filled || num_cached_levels_ < batch_size)) {
    return Status(decoding_error_code_, vals_remaining, filename_);
  }
  return Status::OK();
}
// Refills the (already exhausted) level cache with up to 'batch_size' levels.
//
// The cache cursor is rewound to the start, and the number of levels now available
// is written to '*num_cached_levels'. For a column without levels (max_level_ == 0)
// the zero-initialized cache is reused and 'batch_size' is reported; this always
// succeeds. Otherwise levels are pulled from the RLE decoder, and the call succeeds
// only if at least one level was decoded.
//
// NOTE(review): decoded values are not range-checked against max_level_ here; confirm
// callers tolerate (or separately validate) levels above max_level_ in corrupt data.
bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
  DCHECK(!CacheHasNext());
  DCHECK(num_cached_levels != nullptr);
  DCHECK_GE(max_level_, 0);
  DCHECK_GE(*num_cached_levels, 0);
  cached_level_idx_ = 0;

  const bool has_encoded_levels = max_level_ != 0;
  *num_cached_levels = has_encoded_levels
      ? rle_decoder_.GetValues(batch_size, cached_levels_)
      : batch_size;
  return !has_encoded_levels || *num_cached_levels > 0;
}
} // namespace impala