blob: 58bda02e8da279acc389f1c934fea03dca78ed2e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
#include "common/status.h"
#include "gen-cpp/parquet_types.h"
#include "util/rle-encoding.h"
namespace impala {
class MemPool;
/// Constants used instead of actual levels to indicate special conditions.
class ParquetLevel {
 public:
  /// Sentinel returned in place of a definition or repetition level that could not
  /// be produced (e.g. because decoding failed).
  static const int16_t INVALID_LEVEL = -1;

  /// Sentinel for a position value that is not valid.
  static const int16_t INVALID_POS = -1;

  /// Stored into both the repetition and the definition level to signal that the
  /// current row group has ended. It is the smallest int16_t, so it is distinct from
  /// INVALID_LEVEL and from every real (non-negative, 8-bit) level value.
  static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
};
/// Decoder for encoded Parquet levels. Only supports the RLE encoding, not the deprecated
/// BIT_PACKED encoding. Optionally reads, decodes, and caches level values in batches.
/// Level values are unsigned 8-bit integers because we support a maximum nesting
/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
/// populating the level cache (e.g., with RLE we can memset() repeated values).
class ParquetLevelDecoder {
 public:
  /// Creates a decoder for definition levels if 'is_def_level_decoder' is true, or for
  /// repetition levels otherwise. The flag only selects the error code used when
  /// reporting decoding errors. The constructor is explicit so that a bool (or anything
  /// implicitly convertible to bool, such as a pointer) cannot silently turn into a
  /// decoder.
  explicit ParquetLevelDecoder(bool is_def_level_decoder)
    : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
                                                  TErrorCode::PARQUET_REP_LEVEL_ERROR) {}

  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
  /// encoding requires reading metadata from the page header. 'cache_size' will be
  /// rounded up to a multiple of 32 internally.
  Status Init(const string& filename, const parquet::Encoding::type* encoding,
      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);

  /// Returns and consumes the next level. Returns INVALID_LEVEL if there was an error
  /// or if no further level could be read. Not as efficient as the batched methods.
  inline int16_t ReadLevel();

  /// Returns the next level without consuming it. Returns INVALID_LEVEL under the same
  /// conditions as ReadLevel(). May refill the cache if it is empty.
  inline int16_t PeekLevel();

  /// If the next value is part of a repeated run and is not cached, return the length
  /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of
  /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0; in
  /// particular, 0 is returned whenever unconsumed cached levels exist.
  inline int32_t NextRepeatedRunLength();

  /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
  /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
  /// that have not been consumed.
  inline uint8_t GetRepeatedValue(uint32_t num_to_consume);

  /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
  /// values left to decode in the page. Resets members associated with the cache.
  /// Returns a non-ok status if there was a problem decoding a level, if a level was
  /// encountered with a value greater than max_level_, or if fewer than
  /// min(cache capacity, vals_remaining) levels could be read, which indicates that the
  /// input did not have the expected number of values. The cache capacity here is
  /// 'cache_size_', not CacheSize() (which reports how many levels are currently
  /// cached). Only valid to call when the cache has been exhausted, i.e.
  /// CacheHasNext() is false.
  Status CacheNextBatch(int vals_remaining);

  /// Functions for working with the level cache.

  /// Returns true if the cache still holds levels that have not been consumed.
  bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }

  /// Returns the next cached level and advances past it. Requires CacheHasNext().
  uint8_t CacheGetNext() {
    DCHECK_LT(cached_level_idx_, num_cached_levels_);
    return cached_levels_[cached_level_idx_++];
  }

  /// Returns the next cached level without consuming it. Requires CacheHasNext().
  uint8_t CachePeekNext() const {
    DCHECK_LT(cached_level_idx_, num_cached_levels_);
    return cached_levels_[cached_level_idx_];
  }

  /// Consumes 'num_levels' cached levels without returning them.
  void CacheSkipLevels(int num_levels) {
    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
    cached_level_idx_ += num_levels;
  }

  /// Number of valid levels in the cache, consumed or not. This is NOT the capacity of
  /// the cache, which is 'cache_size_'.
  int CacheSize() const { return num_cached_levels_; }

  /// Number of cached levels that have not been consumed yet.
  int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }

  /// Index of the next cached level to be consumed.
  int CacheCurrIdx() const { return cached_level_idx_; }

 private:
  /// Initializes members associated with the level cache. Allocates memory for
  /// the cache from pool, if necessary.
  Status InitCache(MemPool* pool, int cache_size);

  /// Ensures at least one unconsumed level is cached. If the cache is exhausted, it is
  /// refilled via FillCache() with up to 'cache_size_' levels. Returns true if a level
  /// is available afterwards; returns false if FillCache() failed or produced no
  /// levels (i.e. the input is exhausted).
  inline bool PrepareForRead();

  /// Decodes and writes a batch of levels into the cache. Returns true and sets
  /// the number of values written to the cache via *num_cached_levels if no errors
  /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
  /// end of input was hit without any other errors. Returns false if there was an
  /// error decoding a level or if there was an invalid level value greater than
  /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
  /// CacheHasNext() is false.
  bool FillCache(int batch_size, int* num_cached_levels);

  /// RLE decoder, used if max_level_ > 0.
  RleBatchDecoder<uint8_t> rle_decoder_;

  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
  /// in Init().
  uint8_t* cached_levels_ = nullptr;

  /// Number of valid level values in the cache.
  int num_cached_levels_ = 0;

  /// Current index into cached_levels_.
  int cached_level_idx_ = 0;

  /// For error checking and reporting.
  int max_level_ = 0;

  /// Number of level values cached_levels_ has memory allocated for. Always
  /// a multiple of 32 to allow reading directly from 'rle_decoder_' in batches.
  int cache_size_ = 0;

  /// Name of the parquet file. Used for reporting level decoding errors.
  string filename_;

  /// Error code to use when reporting level decoding errors.
  TErrorCode::type decoding_error_code_;
};
inline bool ParquetLevelDecoder::PrepareForRead() {
  // Common case: unconsumed levels are still waiting in the cache.
  if (CacheHasNext()) return true;

  // The cache is exhausted; try to refill it with up to 'cache_size_' levels.
  const bool refilled = FillCache(cache_size_, &num_cached_levels_);
  if (UNLIKELY(!refilled)) return false;
  DCHECK_GE(num_cached_levels_, 0);

  // A successful refill that produced no levels means there is nothing left to read.
  if (UNLIKELY(num_cached_levels_ == 0)) return false;
  return true;
}
inline int16_t ParquetLevelDecoder::ReadLevel() {
  // Refill the cache if necessary, then consume the level at the cursor; report
  // INVALID_LEVEL when no level could be made available.
  return UNLIKELY(!PrepareForRead()) ? ParquetLevel::INVALID_LEVEL : CacheGetNext();
}
inline int16_t ParquetLevelDecoder::PeekLevel() {
  // Like ReadLevel(), but the cache cursor stays on the returned level.
  return UNLIKELY(!PrepareForRead()) ? ParquetLevel::INVALID_LEVEL : CachePeekNext();
}
inline int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
  // Levels that are already decoded into the cache never count as a run.
  if (CacheHasNext()) return 0;
  // A max level of 0 means every level is zero: report the longest run an int32_t can
  // express; otherwise ask the RLE decoder how long its current repeated run is.
  return max_level_ == 0 ? numeric_limits<int32_t>::max()
                         : rle_decoder_.NextNumRepeats();
}
inline uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
  // Callers must drain the cache before switching to run-based consumption.
  DCHECK(!CacheHasNext());
  if (max_level_ != 0) return rle_decoder_.GetRepeatedValue(num_to_consume);
  // With a max level of 0 all levels are zero; there is no decoder to advance.
  return 0;
}
} // namespace impala