| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "parquet-column-readers.h" |
| |
| #include <boost/scoped_ptr.hpp> |
| #include <string> |
| #include <sstream> |
| #include <gflags/gflags.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "exec/hdfs-parquet-scanner.h" |
| #include "exec/parquet-metadata-utils.h" |
| #include "exec/parquet-scratch-tuple-batch.h" |
| #include "exec/read-write-util.h" |
| #include "exec/scanner-context.inline.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/collection-value-builder.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/io/request-context.h" |
| #include "runtime/tuple-row.h" |
| #include "runtime/tuple.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/mem-pool.h" |
| #include "util/bit-util.h" |
| #include "util/codec.h" |
| #include "util/debug-util.h" |
| #include "util/dict-encoding.h" |
| #include "util/rle-encoding.h" |
| |
| #include "common/names.h" |
| |
// Provide a workaround for IMPALA-1658.
DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
    "be converted from UTC to local time. Writes are unaffected.");

// Max dictionary page header size in bytes. This is an estimate and only needs to be an
// upper bound.
static const int MAX_DICT_HEADER_SIZE = 100;

// Max data page header size in bytes. This is an estimate and only needs to be an upper
// bound. It is theoretically possible to have a page header of any size due to string
// value statistics, but in practice we'll have trouble reading string values this large.
// Also, this limit is in place to prevent impala from reading corrupt parquet files.
DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");

// Trigger debug action on every other call of Read*ValueBatch() once at least 128
// tuples have been produced to simulate failure such as exceeding memory limit.
// Triggering it every other call so as not to always fail on the first column reader
// when materializing multiple columns. Failing on non-empty row batch tests proper
// resources freeing by the Parquet scanner.
#ifndef NDEBUG
static int debug_count = 0;
#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
  ((debug_count++ % 2) == 1 && num_tuples >= 128)
#else
#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
#endif

using namespace impala::io;

using parquet::Encoding;

namespace impala {

// Substitute() template for the error raised when a column reader fails to allocate
// memory: $0 is the function name, $1 the byte count, $2 what the bytes were for.
const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
    "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
| |
| Status ParquetLevelDecoder::Init(const string& filename, |
| Encoding::type encoding, MemPool* cache_pool, int cache_size, |
| int max_level, int num_buffered_values, uint8_t** data, int* data_size) { |
| DCHECK(*data != nullptr); |
| DCHECK_GE(*data_size, 0); |
| DCHECK_GE(num_buffered_values, 0); |
| DCHECK_GT(cache_size, 0); |
| cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32); |
| encoding_ = encoding; |
| max_level_ = max_level; |
| num_buffered_values_ = num_buffered_values; |
| filename_ = filename; |
| RETURN_IF_ERROR(InitCache(cache_pool, cache_size)); |
| |
| // Return because there is no level data to read, e.g., required field. |
| if (max_level == 0) return Status::OK(); |
| |
| int32_t num_bytes = 0; |
| switch (encoding) { |
| case Encoding::RLE: { |
| Status status; |
| if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) { |
| return status; |
| } |
| if (num_bytes < 0 || num_bytes > *data_size) { |
| return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes); |
| } |
| int bit_width = BitUtil::Log2Ceiling64(max_level + 1); |
| rle_decoder_.Reset(*data, num_bytes, bit_width); |
| break; |
| } |
| case parquet::Encoding::BIT_PACKED: |
| return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename); |
| default: { |
| stringstream ss; |
| ss << "Unsupported encoding: " << encoding; |
| return Status(ss.str()); |
| } |
| } |
| if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) { |
| return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but " |
| "only $2 bytes left in page", filename, num_bytes, *data_size)); |
| } |
| *data += num_bytes; |
| *data_size -= num_bytes; |
| return Status::OK(); |
| } |
| |
| Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) { |
| num_cached_levels_ = 0; |
| cached_level_idx_ = 0; |
| // Memory has already been allocated. |
| if (cached_levels_ != nullptr) { |
| DCHECK_EQ(cache_size_, cache_size); |
| return Status::OK(); |
| } |
| |
| cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size)); |
| if (cached_levels_ == nullptr) { |
| return pool->mem_tracker()->MemLimitExceeded( |
| nullptr, "Definition level cache", cache_size); |
| } |
| memset(cached_levels_, 0, cache_size); |
| cache_size_ = cache_size; |
| return Status::OK(); |
| } |
| |
| inline int16_t ParquetLevelDecoder::ReadLevel() { |
| if (UNLIKELY(!CacheHasNext())) { |
| if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) { |
| return HdfsParquetScanner::INVALID_LEVEL; |
| } |
| DCHECK_GE(num_cached_levels_, 0); |
| if (UNLIKELY(num_cached_levels_ == 0)) { |
| return HdfsParquetScanner::INVALID_LEVEL; |
| } |
| } |
| return CacheGetNext(); |
| } |
| |
| Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) { |
| /// Fill the cache completely if there are enough values remaining. |
| /// Otherwise don't try to read more values than are left. |
| int batch_size = min(vals_remaining, cache_size_); |
| if (max_level_ > 0) { |
| if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) || |
| num_cached_levels_ < batch_size)) { |
| return Status(decoding_error_code_, num_buffered_values_, filename_); |
| } |
| } else { |
| // No levels to read, e.g., because the field is required. The cache was |
| // already initialized with all zeros, so we can hand out those values. |
| DCHECK_EQ(max_level_, 0); |
| cached_level_idx_ = 0; |
| num_cached_levels_ = batch_size; |
| } |
| return Status::OK(); |
| } |
| |
| bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) { |
| DCHECK(!CacheHasNext()); |
| DCHECK(num_cached_levels != nullptr); |
| DCHECK_GE(max_level_, 0); |
| DCHECK_GE(*num_cached_levels, 0); |
| cached_level_idx_ = 0; |
| if (max_level_ == 0) { |
| // No levels to read, e.g., because the field is required. The cache was |
| // already initialized with all zeros, so we can hand out those values. |
| *num_cached_levels = batch_size; |
| return true; |
| } |
| DCHECK_EQ(encoding_, parquet::Encoding::RLE); |
| *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_); |
| return *num_cached_levels > 0; |
| } |
| |
/// Per column type reader. InternalType is the datatype that Impala uses internally to
/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
/// in the parquet spec) that is used to store column values in parquet files.
/// If MATERIALIZED is true, the column values are materialized into the slot described
/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
/// the position can be accessed.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader : public BaseScalarColumnReader {
 public:
  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
      const SlotDescriptor* slot_desc);
  virtual ~ScalarColumnReader() { }

  /// Reads one value of a column nested in a collection (IN_COLLECTION=true).
  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<true>(tuple);
  }

  /// Reads one value of a non-nested column (IN_COLLECTION=false).
  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<false>(tuple);
  }

  /// Batched variant of ReadValue() for columns nested in a collection.
  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
      uint8_t* tuple_mem, int* num_values) override {
    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
  }

  /// Batched variant of ReadNonRepeatedValue() for non-nested columns.
  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
      uint8_t* tuple_mem, int* num_values) override {
    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
  }

  /// Returns the dictionary decoder, or nullptr if no dictionary page was read yet.
  virtual DictDecoderBase* GetDictionaryDecoder() override {
    return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
  }

  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
  virtual bool NeedsValidation() override { return NeedsValidationInline(); }

 protected:
  template <bool IN_COLLECTION>
  inline bool ReadValue(Tuple* tuple);

  /// Implementation of the ReadValueBatch() functions specialized for this
  /// column reader type. This function drives the reading of data pages and
  /// caching of rep/def levels. Once a data page and cached levels are available,
  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
  /// value materialization using the level caches.
  /// Use RESTRICT so that the compiler knows that it is safe to cache member
  /// variables in registers or on the stack (otherwise gcc's alias analysis
  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
  /// -fno-strict-alias).
  template <bool IN_COLLECTION>
  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
      int* RESTRICT num_values) RESTRICT;

  /// Helper function for ReadValueBatch() above that performs value materialization.
  /// It assumes a data page with remaining values is available, and that the def/rep
  /// level caches have been populated. Materializes values into 'tuple_mem' with a
  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
  /// values materialized in 'num_values'.
  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
  /// handled in this function.
  /// Use RESTRICT so that the compiler knows that it is safe to cache member
  /// variables in registers or on the stack (otherwise gcc's alias analysis
  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
  /// -fno-strict-alias).
  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
      int* RESTRICT num_values) RESTRICT;

  /// Same as above, but dispatches to the appropriate templated implementation of
  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
  template <bool IN_COLLECTION>
  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
      int* RESTRICT num_values) RESTRICT;

  /// Initializes 'dict_decoder_' from the dictionary page bytes in 'values'. Fails
  /// if the dictionary cannot be decoded.
  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
      DictDecoderBase** decoder) override {
    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
          slot_desc_->type().DebugString(), "could not decode dictionary");
    }
    dict_decoder_init_ = true;
    *decoder = &dict_decoder_;
    return Status::OK();
  }

  virtual bool HasDictionaryDecoder() override {
    return dict_decoder_init_;
  }

  virtual void ClearDictionaryDecoder() override {
    dict_decoder_init_ = false;
  }

  virtual Status InitDataPage(uint8_t* data, int size) override;

 private:
  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
  /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
  /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
  ///
  /// Force inlining - GCC does not always inline this into hot loops.
  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);

  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
  /// This helper is only used on the batched decoding path where we need to reset
  /// 'pos_current_value_' to 0 based on 'rep_level'.
  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);

  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
  /// error decoding the value.
  template <Encoding::type ENCODING>
  inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
      InternalType* RESTRICT val) RESTRICT;

  /// Most column readers never require conversion, so we can avoid branches by
  /// returning constant false. Column readers for types that require conversion
  /// must specialize this function.
  inline bool NeedsConversionInline() const {
    DCHECK(!needs_conversion_);
    return false;
  }

  /// Similar to NeedsConversion(), most column readers do not require validation,
  /// so to avoid branches, we return constant false. In general, types where not
  /// all possible bit representations of the data type are valid should be
  /// validated.
  inline bool NeedsValidationInline() const {
    return false;
  }

  /// Converts and writes 'src' into 'slot' based on desc_->type(). Only reachable
  /// through specializations; the generic version must never be called.
  bool ConvertSlot(const InternalType* src, void* slot) {
    DCHECK(false);
    return false;
  }

  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
  /// is invalid, logs the error and returns false. If the error should stop execution,
  /// sets 'parent_->parse_status_'. Only reachable through specializations; the
  /// generic version must never be called.
  bool ValidateValue(InternalType* val) const {
    DCHECK(false);
    return false;
  }

  /// Pull out slow-path Status construction code
  void __attribute__((noinline)) SetDictDecodeError() {
    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
        slot_desc_->type().DebugString(), stream_->file_offset());
  }

  /// Pull out slow-path Status construction code
  void __attribute__((noinline)) SetPlainDecodeError() {
    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
        slot_desc_->type().DebugString(), stream_->file_offset());
  }

  /// Dictionary decoder for decoding column values.
  DictDecoder<InternalType> dict_decoder_;

  /// True if dict_decoder_ has been initialized with a dictionary page.
  bool dict_decoder_init_ = false;

  /// true if decoded values must be converted before being written to an output tuple.
  bool needs_conversion_ = false;

  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
  /// the max length for VARCHAR columns. Unused otherwise.
  int fixed_len_size_;
};
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader( |
| HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) |
| : BaseScalarColumnReader(parent, node, slot_desc), |
| dict_decoder_(parent->scan_node_->mem_tracker()) { |
| if (!MATERIALIZED) { |
| // We're not materializing any values, just counting them. No need (or ability) to |
| // initialize state used to materialize values. |
| DCHECK(slot_desc_ == nullptr); |
| return; |
| } |
| |
| DCHECK(slot_desc_ != nullptr); |
| DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN); |
| if (slot_desc_->type().type == TYPE_DECIMAL |
| && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) { |
| fixed_len_size_ = node.element->type_length; |
| } else if (slot_desc_->type().type == TYPE_VARCHAR) { |
| fixed_len_size_ = slot_desc_->type().len; |
| } else { |
| fixed_len_size_ = -1; |
| } |
| needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || |
| // TODO: Add logic to detect file versions that have unconverted TIMESTAMP |
| // values. Currently all versions have converted values. |
| (FLAGS_convert_legacy_hive_parquet_utc_timestamps && |
| slot_desc_->type().type == TYPE_TIMESTAMP && |
| parent->file_version_.application == "parquet-mr"); |
| } |
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage( |
| uint8_t* data, int size) { |
| // Data can be empty if the column contains all NULLs |
| DCHECK_GE(size, 0); |
| page_encoding_ = current_page_header_.data_page_header.encoding; |
| if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY && |
| page_encoding_ != parquet::Encoding::PLAIN) { |
| return GetUnsupportedDecodingError(); |
| } |
| |
| // If slot_desc_ is NULL, we don't need so decode any values so dict_decoder_ does |
| // not need to be initialized. |
| if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) { |
| if (!dict_decoder_init_) { |
| return Status("File corrupt. Missing dictionary page."); |
| } |
| RETURN_IF_ERROR(dict_decoder_.SetData(data, size)); |
| } |
| // TODO: Perform filter selectivity checks here. |
| return Status::OK(); |
| } |
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| template <bool IN_COLLECTION> |
| bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue( |
| Tuple* tuple) { |
| // NextLevels() should have already been called and def and rep levels should be in |
| // valid range. |
| DCHECK_GE(rep_level_, 0); |
| DCHECK_GE(def_level_, 0); |
| DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << |
| "Caller should have called NextLevels() until we are ready to read a value"; |
| |
| if (MATERIALIZED) { |
| if (def_level_ >= max_def_level()) { |
| bool continue_execution; |
| if (page_encoding_ == Encoding::PLAIN_DICTIONARY) { |
| continue_execution = NeedsConversionInline() ? |
| ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) : |
| ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple); |
| } else { |
| DCHECK_EQ(page_encoding_, Encoding::PLAIN); |
| continue_execution = NeedsConversionInline() ? |
| ReadSlot<Encoding::PLAIN, true>(tuple) : |
| ReadSlot<Encoding::PLAIN, false>(tuple); |
| } |
| if (!continue_execution) return false; |
| } else { |
| tuple->SetNull(null_indicator_offset_); |
| } |
| } |
| return NextLevels<IN_COLLECTION>(); |
| } |
| |
// Drives batched reading: pages in data as needed, refills the rep/def level caches
// and delegates the actual materialization to MaterializeValueBatch(). Returns false
// if execution should stop; the number of values produced is returned in
// '*num_values'.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
    int* RESTRICT num_values) RESTRICT {
  // Repetition level is only present if this column is nested in a collection type.
  if (IN_COLLECTION) {
    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
  } else {
    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
  }

  int val_count = 0;
  bool continue_execution = true;
  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
    DCHECK_GE(num_buffered_values_, 0);
    // Read next page if necessary.
    if (num_buffered_values_ == 0) {
      if (!NextPage()) {
        // NextPage() returning false is only an error if parse_status_ is not OK;
        // otherwise the loop condition re-checks RowGroupAtEnd().
        continue_execution = parent_->parse_status_.ok();
        continue;
      }
    }

    // Not materializing anything - skip decoding any levels and rely on the value
    // count from page metadata to return the correct number of rows.
    if (!MATERIALIZED && !IN_COLLECTION) {
      int vals_to_add = min(num_buffered_values_, max_values - val_count);
      val_count += vals_to_add;
      num_buffered_values_ -= vals_to_add;
      continue;
    }
    // Fill the rep level cache if needed. We are flattening out the fields of the
    // nested collection into the top-level tuple returned by the scan, so we don't
    // care about the nesting structure unless the position slot is being populated.
    if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
      parent_->parse_status_.MergeStatus(
          rep_levels_.CacheNextBatch(num_buffered_values_));
      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
    }

    // Fill def level cache if needed.
    if (!def_levels_.CacheHasNext()) {
      // TODO: add a fast path here if there's a run of repeated values.
      parent_->parse_status_.MergeStatus(
          def_levels_.CacheNextBatch(num_buffered_values_));
      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
    }

    // Read data page and cached levels to materialize values.
    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
    int remaining_val_capacity = max_values - val_count;
    int ret_val_count = 0;
    continue_execution = MaterializeValueBatch<IN_COLLECTION>(
        remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
    val_count += ret_val_count;
    // Debug-only failure injection; no-op in release builds (see macro above).
    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
      continue_execution &= ColReaderDebugAction(&val_count);
    }
  }
  *num_values = val_count;
  return continue_execution;
}
| |
// Materializes up to 'max_values' values from the current data page into 'tuple_mem'
// using the pre-populated def (and possibly rep) level caches. See the declaration
// for the contract; this must not be called for !MATERIALIZED && !IN_COLLECTION.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
    int* RESTRICT num_values) RESTRICT {
  DCHECK(MATERIALIZED || IN_COLLECTION);
  DCHECK_GT(num_buffered_values_, 0);
  DCHECK(def_levels_.CacheHasNext());
  if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
  // Remember where the def level cursor started so the number of levels consumed
  // (not just values produced) can be subtracted from 'num_buffered_values_' below.
  const int cache_start_idx = def_levels_.CacheCurrIdx();
  uint8_t* curr_tuple = tuple_mem;
  int val_count = 0;
  while (def_levels_.CacheHasNext() && val_count < max_values) {
    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
    int def_level = def_levels_.CacheGetNext();

    if (IN_COLLECTION) {
      if (def_level < def_level_of_immediate_repeated_ancestor()) {
        // A containing repeated field is empty or NULL. Skip the value but
        // move to the next repetition level if necessary.
        if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
        continue;
      }
      if (pos_slot_desc_ != nullptr) {
        // Write the position (index within the parent collection) into its slot.
        ReadPositionBatched(rep_levels_.CacheGetNext(),
            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
      }
    }

    if (MATERIALIZED) {
      if (def_level >= max_def_level()) {
        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
        if (UNLIKELY(!continue_execution)) return false;
      } else {
        tuple->SetNull(null_indicator_offset_);
      }
    }
    curr_tuple += tuple_size;
    ++val_count;
  }
  // Skipped values (the 'continue' above) consumed levels too, so account for all
  // levels read from the cache, not just 'val_count'.
  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
  *num_values = val_count;
  return true;
}
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| template <bool IN_COLLECTION> |
| bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch( |
| int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem, |
| int* RESTRICT num_values) RESTRICT { |
| // Dispatch to the correct templated implementation of MaterializeValueBatch(). |
| if (page_encoding_ == Encoding::PLAIN_DICTIONARY) { |
| if (NeedsConversionInline()) { |
| return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>( |
| max_values, tuple_size, tuple_mem, num_values); |
| } else { |
| return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>( |
| max_values, tuple_size, tuple_mem, num_values); |
| } |
| } else { |
| DCHECK_EQ(page_encoding_, Encoding::PLAIN); |
| if (NeedsConversionInline()) { |
| return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>( |
| max_values, tuple_size, tuple_mem, num_values); |
| } else { |
| return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>( |
| max_values, tuple_size, tuple_mem, num_values); |
| } |
| } |
| } |
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| template <Encoding::type ENCODING, bool NEEDS_CONVERSION> |
| bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot( |
| Tuple* RESTRICT tuple) RESTRICT { |
| void* slot = tuple->GetSlot(tuple_offset_); |
| // Use an uninitialized stack allocation for temporary value to avoid running |
| // constructors doing work unnecessarily, e.g. if T == StringValue. |
| alignas(InternalType) uint8_t val_buf[sizeof(InternalType)]; |
| InternalType* val_ptr = |
| reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot); |
| |
| if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false; |
| if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) { |
| if (UNLIKELY(!parent_->parse_status_.ok())) return false; |
| // The value is invalid but execution should continue - set the null indicator and |
| // skip conversion. |
| tuple->SetNull(null_indicator_offset_); |
| return true; |
| } |
| if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false; |
| return true; |
| } |
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| template <Encoding::type ENCODING> |
| bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue( |
| uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end, |
| InternalType* RESTRICT val) RESTRICT { |
| DCHECK_EQ(page_encoding_, ENCODING); |
| if (ENCODING == Encoding::PLAIN_DICTIONARY) { |
| if (UNLIKELY(!dict_decoder_.GetNextValue(val))) { |
| SetDictDecodeError(); |
| return false; |
| } |
| } else { |
| DCHECK_EQ(ENCODING, Encoding::PLAIN); |
| int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>( |
| *data, data_end, fixed_len_size_, val); |
| if (UNLIKELY(encoded_len < 0)) { |
| SetPlainDecodeError(); |
| return false; |
| } |
| *data += encoded_len; |
| } |
| return true; |
| } |
| |
| template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> |
| void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED> |
| ::ReadPositionBatched(int16_t rep_level, int64_t* pos) { |
| // Reset position counter if we are at the start of a new parent collection. |
| if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0; |
| *pos = pos_current_value_++; |
| } |
| |
// BYTE_ARRAY string columns may need conversion (the constructor sets
// 'needs_conversion_' for CHAR slots), so this specialization consults the flag
// instead of returning constant false.
template <>
inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
::NeedsConversionInline() const {
  return needs_conversion_;
}
| |
| template <> |
| bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot( |
| const StringValue* src, void* slot) { |
| DCHECK(slot_desc() != nullptr); |
| DCHECK(slot_desc()->type().type == TYPE_CHAR); |
| int char_len = slot_desc()->type().len; |
| int unpadded_len = min(char_len, src->len); |
| char* dst_char = reinterpret_cast<char*>(slot); |
| memcpy(dst_char, src->ptr, unpadded_len); |
| StringValue::PadWithSpaces(dst_char, char_len, unpadded_len); |
| return true; |
| } |
| |
// INT96 TIMESTAMP columns may need UTC->local conversion for Parquet-MR files when
// --convert_legacy_hive_parquet_utc_timestamps is set (see the constructor and
// IMPALA-1658); the decision is cached in 'needs_conversion_'.
template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
::NeedsConversionInline() const {
  return needs_conversion_;
}
| |
| template <> |
| bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot( |
| const TimestampValue* src, void* slot) { |
| // Conversion should only happen when this flag is enabled. |
| DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps); |
| TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot); |
| *dst_ts = *src; |
| if (dst_ts->HasDateAndTime()) dst_ts->UtcToLocal(); |
| return true; |
| } |
| |
// Not every INT96 bit pattern is a valid TIMESTAMP (see ValidateValue() below), so
// values read from the file must always be validated.
template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
::NeedsValidationInline() const {
  return true;
}
| |
| template <> |
| bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue( |
| TimestampValue* val) const { |
| if (UNLIKELY(!TimestampValue::IsValidDate(val->date()))) { |
| ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE, |
| filename(), node_.element->name); |
| Status status = parent_->state_->LogOrReturnError(msg); |
| if (!status.ok()) parent_->parse_status_ = status; |
| return false; |
| } |
| return true; |
| } |
| |
/// Column reader for BOOLEAN columns. Booleans are decoded from PLAIN (bit-packed)
/// or RLE data pages in batches into 'unpacked_values_'; dictionary encoding is not
/// supported for this type.
class BoolColumnReader : public BaseScalarColumnReader {
 public:
  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
      const SlotDescriptor* slot_desc)
    : BaseScalarColumnReader(parent, node, slot_desc) {
    if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
  }

  virtual ~BoolColumnReader() { }

  /// Reads one value of a column nested in a collection (IN_COLLECTION=true).
  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<true>(tuple);
  }

  /// Reads one value of a non-nested column (IN_COLLECTION=false).
  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<false>(tuple);
  }

 protected:
  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
      DictDecoderBase** decoder) override {
    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
        << "have gotten this far.";
    return Status::OK();
  }

  virtual bool HasDictionaryDecoder() override {
    // Decoder should never be created for bools.
    return false;
  }

  virtual void ClearDictionaryDecoder() override { }

  virtual Status InitDataPage(uint8_t* data, int size) override;

 private:
  template <bool IN_COLLECTION>
  inline bool ReadValue(Tuple* tuple);

  /// Decodes the next value into 'value'. Returns false and sets
  /// 'parent_->parse_status_' if an error is encountered decoding the
  /// value. Otherwise returns true.
  template <bool IN_COLLECTION>
  inline bool DecodeValue(bool* value);

  /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
  /// batch-oriented interface of BatchedBitReader.
  static const int UNPACKED_BUFFER_LEN = 128;
  bool unpacked_values_[UNPACKED_BUFFER_LEN];

  /// The number of valid values in 'unpacked_values_'.
  int num_unpacked_values_ = 0;

  /// The next value to return from 'unpacked_values_'.
  int unpacked_value_idx_ = 0;

  /// Bit packed decoder, used if 'encoding_' is PLAIN.
  BatchedBitReader bool_values_;

  /// RLE decoder, used if 'encoding_' is RLE.
  RleBatchDecoder<bool> rle_decoder_;
};
| |
| Status BoolColumnReader::InitDataPage(uint8_t* data, int size) { |
| page_encoding_ = current_page_header_.data_page_header.encoding; |
| // Only the relevant decoder is initialized for a given data page. |
| switch (page_encoding_) { |
| case parquet::Encoding::PLAIN: |
| bool_values_.Reset(data, size); |
| break; |
| case parquet::Encoding::RLE: |
| // The first 4 bytes contain the size of the encoded data. This information is |
| // redundant, as this is the last part of the data page, and the number of |
| // remaining bytes is already known. |
| rle_decoder_.Reset(data + 4, size - 4, 1); |
| break; |
| default: |
| return GetUnsupportedDecodingError(); |
| } |
| num_unpacked_values_ = 0; |
| unpacked_value_idx_ = 0; |
| return Status::OK(); |
| } |
| |
// Reads one bool value into 'tuple' (or sets the slot NULL) and advances the
// def/rep levels to the next value. Returns false iff an error was hit and
// 'parent_->parse_status_' was set.
template <bool IN_COLLECTION>
bool BoolColumnReader::ReadValue(Tuple* tuple) {
  DCHECK(slot_desc_ != nullptr);
  // Def and rep levels should be in valid range.
  DCHECK_GE(rep_level_, 0);
  DCHECK_GE(def_level_, 0);
  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
      "Caller should have called NextLevels() until we are ready to read a value";

  if (def_level_ >= max_def_level()) {
    // Value is defined: decode it directly into the tuple's bool slot.
    bool* slot = tuple->GetBoolSlot(tuple_offset_);
    if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
  } else {
    // Null value
    tuple->SetNull(null_indicator_offset_);
  }
  return NextLevels<IN_COLLECTION>();
}
| |
// Decodes the next bool into '*value'. Values are served from the
// 'unpacked_values_' buffer; when that is exhausted, a new batch is unpacked
// from whichever decoder (PLAIN bit-packed or RLE) the current page uses.
template <bool IN_COLLECTION>
bool BoolColumnReader::DecodeValue(bool* value) {
  if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
    // Fast path: a previously unpacked value is still buffered.
    *value = unpacked_values_[unpacked_value_idx_++];
  } else {
    // Unpack as many values as we can into the buffer. We expect to read at least one
    // value.
    if (page_encoding_ == parquet::Encoding::PLAIN) {
      num_unpacked_values_ =
          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
    } else {
      num_unpacked_values_ =
          rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
    }

    // Getting zero values back means the page ran out of data mid-column.
    if (UNLIKELY(num_unpacked_values_ == 0)) {
      parent_->parse_status_ = Status("Invalid bool column.");
      return false;
    }
    *value = unpacked_values_[0];
    unpacked_value_idx_ = 1;
  }
  return true;
}
| |
// Executes the scanner's debug action in debug builds and translates a failure
// into the batched-read protocol: merges any non-cancellation error into
// 'parent_->parse_status_', zeroes '*val_count' and returns false. In release
// builds (NDEBUG) this is a no-op that returns true.
// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
// path doesn't falsely report that the file is corrupted.
bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
#ifndef NDEBUG
  Status status = parent_->ScannerDebugAction();
  if (!status.ok()) {
    if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
    *val_count = 0;
    return false;
  }
#endif
  return true;
}
| |
// Batched read (repeated-field variant): materializes up to 'max_values' values,
// one per tuple of 'tuple_size' bytes in 'tuple_mem', honoring def/rep levels for
// empty/NULL containing collections and filling the position slot if present.
// Returns the number of values produced via 'num_values'; returns false if
// execution should halt (error recorded in 'parent_->parse_status_').
bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
    int tuple_size, uint8_t* tuple_mem, int* num_values) {
  // The below loop requires that NextLevels() was called previously to populate
  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
  // row group.
  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;

  int val_count = 0;
  bool continue_execution = true;
  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
      // A containing repeated field is empty or NULL
      continue_execution = NextLevels();
      continue;
    }
    // Fill in position slot if applicable
    if (pos_slot_desc_ != nullptr) {
      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
    }
    continue_execution = ReadValue(pool, tuple);
    ++val_count;
    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
      continue_execution &= ColReaderDebugAction(&val_count);
    }
  }
  *num_values = val_count;
  return continue_execution;
}
| |
// Batched read for columns with no repeated ancestors: simpler than
// ReadValueBatch() because there are no collection levels or position slots to
// handle. Semantics of the parameters and return value match ReadValueBatch().
bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
  // The below loop requires that NextLevels() was called previously to populate
  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
  // row group.
  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;

  int val_count = 0;
  bool continue_execution = true;
  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
    continue_execution = ReadNonRepeatedValue(pool, tuple);
    ++val_count;
    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
      continue_execution &= ColReaderDebugAction(&val_count);
    }
  }
  *num_values = val_count;
  return continue_execution;
}
| |
// Writes the current position within the enclosing collection into '*pos' and
// increments the position counter for the next value.
void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
  // NextLevels() should have already been called
  DCHECK_GE(rep_level_, 0);
  DCHECK_GE(def_level_, 0);
  DCHECK_GE(pos_current_value_, 0);
  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
      "Caller should have called NextLevels() until we are ready to read a value";
  *pos = pos_current_value_++;
}
| |
| // In 1.1, we had a bug where the dictionary page metadata was not set. Returns true |
| // if this matches those versions and compatibility workarounds need to be used. |
| static bool RequiresSkippedDictionaryHeaderCheck( |
| const ParquetFileVersion& v) { |
| if (v.application != "impala") return false; |
| return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal); |
| } |
| |
// Resets the reader to scan the column chunk 'col_chunk' of a new row group:
// validates the chunk metadata, clears all per-chunk state, creates a
// decompressor if the chunk is compressed, and allocates (but does not start)
// the scan range covering the chunk, including its dictionary page if present.
Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
  // Ensure metadata is valid before using it to initialize the reader.
  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
      parent_->filename(), row_group_idx, col_idx(), schema_element(),
      parent_->state_));
  num_buffered_values_ = 0;
  data_ = nullptr;
  data_end_ = nullptr;
  stream_ = nullptr;
  io_reservation_ = 0;
  metadata_ = &col_chunk.meta_data;
  num_values_read_ = 0;
  def_level_ = HdfsParquetScanner::INVALID_LEVEL;
  // See ColumnReader constructor.
  rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
  pos_current_value_ = HdfsParquetScanner::INVALID_POS;

  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
    RETURN_IF_ERROR(Codec::CreateDecompressor(
        nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
  }
  // The chunk starts at the dictionary page when one exists, otherwise at the
  // first data page.
  int64_t col_start = col_chunk.meta_data.data_page_offset;
  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
    // Already validated in ValidateColumnOffsets()
    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
    col_start = col_chunk.meta_data.dictionary_page_offset;
  }
  int64_t col_len = col_chunk.meta_data.total_compressed_size;
  if (col_len <= 0) {
    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
        filename(), col_len));
  }
  int64_t col_end = col_start + col_len;

  // Already validated in ValidateColumnOffsets()
  DCHECK_GT(col_end, 0);
  DCHECK_LT(col_end, file_desc.file_length);
  const ParquetFileVersion& file_version = parent_->file_version_;
  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
    // dictionary page header size in total_compressed_size and total_uncompressed_size
    // (see IMPALA-694). We pad col_len to compensate.
    int64_t bytes_remaining = file_desc.file_length - col_end;
    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
    col_len += pad;
  }

  // TODO: this will need to change when we have co-located files and the columns
  // are different files.
  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
    return Status(Substitute("Expected parquet column file path '$0' to match "
        "filename '$1'", col_chunk.file_path, filename()));
  }

  const ScanRange* metadata_range = parent_->metadata_range_;
  int64_t partition_id = parent_->context_->partition_descriptor()->id();
  const ScanRange* split_range =
      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
  // Determine if the column is completely contained within a local split.
  bool col_range_local = split_range->expected_local()
      && col_start >= split_range->offset()
      && col_end <= split_range->offset() + split_range->len();
  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
      filename(), col_len, col_start, partition_id, split_range->disk_id(),
      col_range_local,
      BufferOpts(split_range->try_cache(), file_desc.mtime));
  // Drop any dictionary decoder left over from the previous row group.
  ClearDictionaryDecoder();
  return Status::OK();
}
| |
// Starts the I/O for the scan range allocated by Reset(): kicks off the range,
// allocates I/O buffers against the reader's reservation if the I/O manager
// requests them, and wraps the range in a ScannerContext stream ('stream_').
Status BaseScalarColumnReader::StartScan() {
  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
  ScannerContext* context = parent_->context_;
  DCHECK_GT(io_reservation_, 0);
  bool needs_buffers;
  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
      scan_range_, &needs_buffers));
  if (needs_buffers) {
    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
        context->bp_client(), scan_range_, io_reservation_));
  }
  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
  DCHECK(stream_ != nullptr);
  return Status::OK();
}
| |
// Deserializes the next page header from 'stream_' into 'next_page_header',
// growing the peeked buffer until the thrift struct parses or the size exceeds
// FLAGS_max_page_header_size. If 'peek' is false the header bytes are consumed
// from the stream. Sets '*eos' (and logs/returns a metadata error) if the stream
// ends before all values promised by the column metadata were read. Also
// validates that the header's page sizes are non-negative.
Status BaseScalarColumnReader::ReadPageHeader(bool peek,
    parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
  DCHECK(stream_ != nullptr);
  *eos = false;

  uint8_t* buffer;
  int64_t buffer_size;
  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
  // check for end of stream
  if (buffer_size == 0) {
    // The data pages contain fewer values than stated in the column metadata.
    DCHECK(stream_->eosr());
    DCHECK_LT(num_values_read_, metadata_->num_values);
    // TODO for 2.3: node_.element->name isn't necessarily useful
    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
        num_values_read_, node_.element->name, filename());
    RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
    *eos = true;
    return Status::OK();
  }

  // We don't know the actual header size until the thrift object is deserialized. Loop
  // until we successfully deserialize the header or exceed the maximum header size.
  uint32_t header_size;
  Status status;
  while (true) {
    header_size = buffer_size;
    status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
    if (status.ok()) break;

    if (buffer_size >= FLAGS_max_page_header_size) {
      stringstream ss;
      ss << "ParquetScanner: could not read data page because page header exceeded "
         << "maximum size of "
         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
      status.AddDetail(ss.str());
      return status;
    }

    // Didn't read entire header, increase buffer size and try again
    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
    status = Status::OK();
    bool success = stream_->GetBytes(
        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
    if (!success) {
      DCHECK(!status.ok());
      return status;
    }
    DCHECK(status.ok());

    // Even though we increased the allowed buffer size, the number of bytes
    // read did not change. The header is not limited by the buffer space,
    // so it must be incomplete in the file.
    if (buffer_size == new_buffer_size) {
      DCHECK_NE(new_buffer_size, 0);
      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
    }
    DCHECK_GT(new_buffer_size, buffer_size);
    buffer_size = new_buffer_size;
  }

  *next_header_size = header_size;

  // Successfully deserialized current_page_header_
  if (!peek && !stream_->SkipBytes(header_size, &status)) return status;

  // Sanity-check the page sizes from the header before they are used for reads.
  int data_size = next_page_header->compressed_page_size;
  if (UNLIKELY(data_size < 0)) {
    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
        "column '$2'", filename(), data_size, schema_element().name));
  }
  int uncompressed_size = next_page_header->uncompressed_page_size;
  if (UNLIKELY(uncompressed_size < 0)) {
    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
        "size $1 for column '$2'", filename(), uncompressed_size,
        schema_element().name));
  }

  return Status::OK();
}
| |
// Reads the dictionary page (if any) at the head of the column chunk and creates
// the dictionary decoder. If the first page is not a dictionary page, the column
// has no dictionary and this is a no-op. Validates the page header/encoding and
// handles the three buffer-ownership cases documented inline.
Status BaseScalarColumnReader::InitDictionary() {
  // Peek at the next page header
  bool eos;
  parquet::PageHeader next_page_header;
  uint32_t next_header_size;
  DCHECK(stream_ != nullptr);
  DCHECK(!HasDictionaryDecoder());

  RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
      &next_header_size, &eos));
  if (eos) return Status::OK();
  // The dictionary must be the first data page, so if the first page
  // is not a dictionary, then there is no dictionary.
  if (next_page_header.type != parquet::PageType::DICTIONARY_PAGE) return Status::OK();

  // It is a dictionary page: consume the header bytes we peeked at.
  current_page_header_ = next_page_header;
  Status status;
  if (!stream_->SkipBytes(next_header_size, &status)) return status;

  int data_size = current_page_header_.compressed_page_size;
  if (slot_desc_ == nullptr) {
    // Skip processing the dictionary page if we don't need to decode any values. In
    // addition to being unnecessary, we are likely unable to successfully decode the
    // dictionary values because we don't necessarily create the right type of scalar
    // reader if there's no slot to read into (see CreateReader()).
    if (!stream_->SkipBytes(data_size, &status)) return status;
    return Status::OK();
  }

  if (node_.element->type == parquet::Type::BOOLEAN) {
    return Status("Unexpected dictionary page. Dictionary page is not"
        " supported for booleans.");
  }

  const parquet::DictionaryPageHeader* dict_header = nullptr;
  if (current_page_header_.__isset.dictionary_page_header) {
    dict_header = &current_page_header_.dictionary_page_header;
  } else {
    // Old Impala writers omitted the dictionary header (see
    // RequiresSkippedDictionaryHeaderCheck()); anything else is corrupt.
    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
      return Status("Dictionary page does not have dictionary header set.");
    }
  }
  if (dict_header != nullptr &&
      dict_header->encoding != Encoding::PLAIN &&
      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
        "for dictionary pages.");
  }

  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
  data_end_ = data_ + data_size;

  // The size of dictionary can be 0, if every value is null. The dictionary still has to
  // be reset in this case.
  DictDecoderBase* dict_decoder;
  if (current_page_header_.uncompressed_page_size == 0) {
    return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
  }

  // There are 3 different cases from the aspect of memory management:
  // 1. If the column type is string, the dictionary will contain pointers to a buffer,
  //    so the buffer's lifetime must be as long as any row batch that references it.
  // 2. If the column type is not string, and the dictionary page is compressed, then a
  //    temporary buffer is needed for the uncompressed values.
  // 3. If the column type is not string, and the dictionary page is not compressed,
  //    then no buffer is necessary.
  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
  uint8_t* dict_values = nullptr;
  if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
    int buffer_size = current_page_header_.uncompressed_page_size;
    if (slot_desc_->type().IsStringType()) {
      dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
    } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
      dict_values = uncompressed_buffer.buffer(); // case 2
    }
    if (UNLIKELY(dict_values == nullptr)) {
      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
          buffer_size, "dictionary");
      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
          parent_->state_, details, buffer_size);
    }
  } else {
    dict_values = data_; // case 3.
  }

  if (decompressor_.get() != nullptr) {
    int uncompressed_size = current_page_header_.uncompressed_page_size;
    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
        &uncompressed_size, &dict_values));
    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
    if (current_page_header_.uncompressed_page_size != uncompressed_size) {
      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
          "Expected $1 uncompressed bytes but got $2", filename(),
          current_page_header_.uncompressed_page_size, uncompressed_size));
    }
  } else {
    if (current_page_header_.uncompressed_page_size != data_size) {
      return Status(Substitute("Error reading dictionary page in file '$0'. "
          "Expected $1 bytes but got $2", filename(),
          current_page_header_.uncompressed_page_size, data_size));
    }
    // Case 1 without compression: copy the raw page into the long-lived buffer.
    if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_, data_size);
  }

  RETURN_IF_ERROR(CreateDictionaryDecoder(
      dict_values, current_page_header_.uncompressed_page_size, &dict_decoder));
  if (dict_header != nullptr &&
      dict_header->num_values != dict_decoder->num_entries()) {
    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
        slot_desc_->type().DebugString(),
        Substitute("Expected $0 entries but data contained $1 entries",
            dict_header->num_values, dict_decoder->num_entries()));
  }

  return Status::OK();
}
| |
// Initializes the dictionary for each reader in 'readers', stopping at the
// first error.
// NOTE(review): 'readers' is passed by value, copying the vector on each call —
// presumably the declaration (in the header, not visible here) matches, so
// changing it to a const reference would need both sites updated.
Status BaseScalarColumnReader::InitDictionaries(
    const vector<BaseScalarColumnReader*> readers) {
  for (BaseScalarColumnReader* reader : readers) {
    RETURN_IF_ERROR(reader->InitDictionary());
  }
  return Status::OK();
}
| |
// Advances to the next data page of the column chunk: releases/transfers memory
// of the previous page, skips non-data pages, decompresses (or defensively
// copies) the page payload, and initializes the rep/def level decoders and the
// value decoder. On success either a page is buffered ('num_buffered_values_' >
// 0) or all values of the chunk have been read.
Status BaseScalarColumnReader::ReadDataPage() {
  // We're about to move to the next data page. The previous data page is
  // now complete, free up any memory allocated for it. If the data page contained
  // strings we need to attach it to the returned batch.
  if (PageContainsTupleData(page_encoding_)) {
    parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), false);
  } else {
    data_page_pool_->FreeAll();
  }
  // We don't hold any pointers to earlier pages in the stream - we can safely free
  // any I/O or boundary buffer.
  stream_->ReleaseCompletedResources(false);

  // Read the next data page, skipping page types we don't care about.
  // We break out of this loop on the non-error case (a data page was found or we read all
  // the pages).
  while (true) {
    DCHECK_EQ(num_buffered_values_, 0);
    if (num_values_read_ == metadata_->num_values) {
      // No more pages to read
      // TODO: should we check for stream_->eosr()?
      break;
    } else if (num_values_read_ > metadata_->num_values) {
      // The pages contained more values than the column metadata promised.
      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
          metadata_->num_values, num_values_read_, node_.element->name, filename());
      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
      return Status::OK();
    }

    bool eos;
    uint32_t header_size;
    RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
        &header_size, &eos));
    if (eos) return Status::OK();

    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
      // Any dictionary is already initialized, as InitDictionary has already
      // been called. There are two possibilities:
      // 1. The parquet file has two dictionary pages
      // OR
      // 2. The parquet file does not have the dictionary as the first data page.
      // Both are errors in the parquet file.
      if (HasDictionaryDecoder()) {
        return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
            "for column '$1'", filename(), schema_element().name));
      } else {
        return Status(Substitute("Corrupt Parquet file: '$0': dictionary page for "
            "column '$1' is not the first page", filename(), schema_element().name));
      }
    }

    Status status;
    int data_size = current_page_header_.compressed_page_size;
    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
      // We can safely skip non-data pages
      if (!stream_->SkipBytes(data_size, &status)) return status;
      continue;
    }

    // Read Data Page
    // TODO: when we start using page statistics, we will need to ignore certain corrupt
    // statistics. See IMPALA-2208 and PARQUET-251.
    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
    data_end_ = data_ + data_size;
    int num_values = current_page_header_.data_page_header.num_values;
    if (num_values < 0) {
      return Status(Substitute("Error reading data page in Parquet file '$0'. "
          "Invalid number of values in metadata: $1", filename(), num_values));
    }
    num_buffered_values_ = num_values;
    num_values_read_ += num_buffered_values_;

    int uncompressed_size = current_page_header_.uncompressed_page_size;
    if (decompressor_.get() != nullptr) {
      SCOPED_TIMER(parent_->decompress_timer_);
      uint8_t* decompressed_buffer;
      RETURN_IF_ERROR(AllocateUncompressedDataPage(
          uncompressed_size, "decompressed data", &decompressed_buffer));
      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
          current_page_header_.compressed_page_size, data_, &uncompressed_size,
          &decompressed_buffer));
      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
                << " to " << uncompressed_size;
      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
        return Status(Substitute("Error decompressing data page in file '$0'. "
            "Expected $1 uncompressed bytes but got $2", filename(),
            current_page_header_.uncompressed_page_size, uncompressed_size));
      }
      data_ = decompressed_buffer;
      data_size = current_page_header_.uncompressed_page_size;
      data_end_ = data_ + data_size;
    } else {
      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
      if (current_page_header_.compressed_page_size != uncompressed_size) {
        return Status(Substitute("Error reading data page in file '$0'. "
            "Expected $1 bytes but got $2", filename(),
            current_page_header_.compressed_page_size, uncompressed_size));
      }
      if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
        // In this case returned batches will have pointers into the data page itself.
        // We don't transfer disk I/O buffers out of the scanner so we need to copy
        // the page data so that it can be attached to output batches.
        uint8_t* copy_buffer;
        RETURN_IF_ERROR(AllocateUncompressedDataPage(
            uncompressed_size, "uncompressed variable-length data", &copy_buffer));
        memcpy(copy_buffer, data_, uncompressed_size);
        data_ = copy_buffer;
        data_end_ = data_ + uncompressed_size;
      }
    }

    // Initialize the repetition level data
    RETURN_IF_ERROR(rep_levels_.Init(filename(),
        current_page_header_.data_page_header.repetition_level_encoding,
        parent_->perm_pool_.get(), parent_->state_->batch_size(),
        max_rep_level(), num_buffered_values_,
        &data_, &data_size));

    // Initialize the definition level data
    RETURN_IF_ERROR(def_levels_.Init(filename(),
        current_page_header_.data_page_header.definition_level_encoding,
        parent_->perm_pool_.get(), parent_->state_->batch_size(),
        max_def_level(), num_buffered_values_, &data_, &data_size));

    // Data can be empty if the column contains all NULLs
    RETURN_IF_ERROR(InitDataPage(data_, data_size));
    break;
  }

  return Status::OK();
}
| |
| Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size, |
| const char* err_ctx, uint8_t** buffer) { |
| *buffer = data_page_pool_->TryAllocate(size); |
| if (*buffer == nullptr) { |
| string details = |
| Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx); |
| return data_page_pool_->mem_tracker()->MemLimitExceeded( |
| parent_->state_, details, size); |
| } |
| return Status::OK(); |
| } |
| |
// Advances 'def_level_' (and, when ADVANCE_REP_LEVEL and the column is nested,
// 'rep_level_' and the position counter) to the next value, fetching the next
// page when the current one is exhausted. Returns false iff an error was
// recorded in 'parent_->parse_status_'.
template <bool ADVANCE_REP_LEVEL>
bool BaseScalarColumnReader::NextLevels() {
  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();

  if (UNLIKELY(num_buffered_values_ == 0)) {
    // Page exhausted: NextPage() returning false means either an error or the
    // end of the row group; parse_status_ distinguishes the two.
    if (!NextPage()) return parent_->parse_status_.ok();
  }
  --num_buffered_values_;

  // Definition level is not present if column and any containing structs are required.
  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
  // The compiler can optimize these two conditions into a single branch by treating
  // def_level_ as unsigned.
  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
    SetLevelDecodeError("def", def_level_, max_def_level());
    return false;
  }

  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
    // Repetition level is only present if this column is nested in any collection type.
    rep_level_ = rep_levels_.ReadLevel();
    if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
      SetLevelDecodeError("rep", rep_level_, max_rep_level());
      return false;
    }
    // Reset position counter if we are at the start of a new parent collection.
    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
  }

  return parent_->parse_status_.ok();
}
| |
// Builds the corrupt-file error returned when a data page uses an encoding the
// reader does not support ('page_encoding_' at the time of the call).
Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
  return Status(Substitute(
      "File '$0' is corrupt: unexpected encoding: $1 for data page of column '$2'.",
      filename(), PrintThriftEnum(page_encoding_), schema_element().name));
}
| |
// Fetches the next data page via ReadDataPage(), recording the result in
// 'parent_->parse_status_'. Returns false on error or when the row group is
// exhausted (the latter also moves the levels to ROW_GROUP_END); the assembly
// timer is paused around the I/O so it only measures row materialization.
bool BaseScalarColumnReader::NextPage() {
  parent_->assemble_rows_timer_.Stop();
  parent_->parse_status_ = ReadDataPage();
  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
  if (num_buffered_values_ == 0) {
    rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
    def_level_ = HdfsParquetScanner::ROW_GROUP_END;
    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
    return false;
  }
  parent_->assemble_rows_timer_.Start();
  return true;
}
| |
| void BaseScalarColumnReader::SetLevelDecodeError(const char* level_name, |
| int decoded_level, int max_level) { |
| if (decoded_level < 0) { |
| DCHECK_EQ(decoded_level, HdfsParquetScanner::INVALID_LEVEL); |
| parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': " |
| "could not read all $1 levels for column '$2'", filename(), |
| level_name, schema_element().name))); |
| } else { |
| parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': " |
| "invalid $1 level $2 > max $1 level $3 for column '$4'", filename(), |
| level_name, decoded_level, max_level, schema_element().name))); |
| } |
| } |
| |
| bool CollectionColumnReader::NextLevels() { |
| DCHECK(!children_.empty()); |
| DCHECK_LE(rep_level_, new_collection_rep_level()); |
| for (int c = 0; c < children_.size(); ++c) { |
| do { |
| // TODO(skye): verify somewhere that all column readers are at end |
| if (!children_[c]->NextLevels()) return false; |
| } while (children_[c]->rep_level() > new_collection_rep_level()); |
| } |
| UpdateDerivedState(); |
| return true; |
| } |
| |
// Reads one collection value: materializes it into the tuple's collection slot
// when defined, sets the slot NULL when not, or (when there is no slot to fill,
// tuple_offset_ == -1) just advances the child readers past the collection.
bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
  DCHECK_GE(rep_level_, 0);
  DCHECK_GE(def_level_, 0);
  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
      "Caller should have called NextLevels() until we are ready to read a value";

  if (tuple_offset_ == -1) {
    return CollectionColumnReader::NextLevels();
  } else if (def_level_ >= max_def_level()) {
    return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
  } else {
    // Null value
    tuple->SetNull(null_indicator_offset_);
    return CollectionColumnReader::NextLevels();
  }
}
| |
// Non-repeated reads are handled identically to repeated ones for collections.
bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
  return CollectionColumnReader::ReadValue(pool, tuple);
}
| |
// Assembles the current collection into '*slot' by recursively materializing
// item tuples from the child readers via AssembleCollection(). Returns false if
// assembly hit an error or execution should otherwise stop.
bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
  DCHECK(!children_.empty());
  DCHECK_LE(rep_level_, new_collection_rep_level());

  // Recursively read the collection into a new CollectionValue.
  *slot = CollectionValue();
  CollectionValueBuilder builder(
      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
  bool continue_execution = parent_->AssembleCollection(
      children_, new_collection_rep_level(), &builder);
  if (!continue_execution) return false;

  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
  UpdateDerivedState();
  return true;
}
| |
// Refreshes this reader's def/rep levels and position counter from the child
// readers (which have just been advanced), and sanity-checks that all children
// agree on where the next collection starts.
void CollectionColumnReader::UpdateDerivedState() {
  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
  // >= max_def_level() to check if the collection is defined.
  // TODO(skye): consider capping def_level_ at max_def_level()
  def_level_ = children_[0]->def_level();
  rep_level_ = children_[0]->rep_level();

  // All children should have been advanced to the beginning of the next collection
  for (int i = 0; i < children_.size(); ++i) {
    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
    if (def_level_ < max_def_level()) {
      // Collection not defined
      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
    } else {
      // Collection is defined
      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
    }
  }

  if (RowGroupAtEnd()) {
    // No more values
    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
  } else if (rep_level_ <= max_rep_level() - 2) {
    // Reset position counter if we are at the start of a new parent collection (i.e.,
    // the current collection is the first item in a new parent collection).
    pos_current_value_ = 0;
  }
}
| |
/// Returns a column reader for decimal types based on its size and parquet type.
/// The slot's byte size (4/8/16) selects the Decimal4/8/16Value instantiation;
/// only FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY physical types carry decimals here.
static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
    const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
  switch (node.element->type) {
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      switch (slot_desc->type().GetByteSize()) {
        case 4:
          return new ScalarColumnReader<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
              true>(parent, node, slot_desc);
        case 8:
          return new ScalarColumnReader<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
              true>(parent, node, slot_desc);
        case 16:
          return new ScalarColumnReader<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
              true>(parent, node, slot_desc);
      }
      break;
    case parquet::Type::BYTE_ARRAY:
      switch (slot_desc->type().GetByteSize()) {
        case 4:
          return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
        case 8:
          return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
        case 16:
          return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
      }
      break;
    default:
      DCHECK(false) << "Invalid decimal primitive type";
  }
  DCHECK(false) << "Invalid decimal type";
  return nullptr;
}
| |
// Factory for column readers: dispatches on whether the field is a collection
// and, for scalars, on the slot's Impala type to pick the matching
// ScalarColumnReader instantiation. The returned reader is owned by
// 'parent->obj_pool_'.
ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
    bool is_collection_field, const SlotDescriptor* slot_desc,
    HdfsParquetScanner* parent) {
  ParquetColumnReader* reader = nullptr;
  if (is_collection_field) {
    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
    reader = new CollectionColumnReader(parent, node, slot_desc);
  } else if (slot_desc != nullptr) {
    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
    switch (slot_desc->type().type) {
      case TYPE_BOOLEAN:
        reader = new BoolColumnReader(parent, node, slot_desc);
        break;
      case TYPE_TINYINT:
        reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
            slot_desc);
        break;
      case TYPE_SMALLINT:
        reader = new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
            slot_desc);
        break;
      case TYPE_INT:
        reader = new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
            slot_desc);
        break;
      case TYPE_BIGINT:
        reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent, node,
            slot_desc);
        break;
      case TYPE_FLOAT:
        reader = new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
            slot_desc);
        break;
      case TYPE_DOUBLE:
        reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent, node,
            slot_desc);
        break;
      case TYPE_TIMESTAMP:
        reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
            parent, node, slot_desc);
        break;
      case TYPE_STRING:
      case TYPE_VARCHAR:
      case TYPE_CHAR:
        reader = new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
            parent, node, slot_desc);
        break;
      case TYPE_DECIMAL:
        reader = GetDecimalColumnReader(node, slot_desc, parent);
        break;
      default:
        DCHECK(false) << slot_desc->type().DebugString();
    }
  } else {
    // Special case for counting scalar values (e.g. count(*), no materialized columns in
    // the file, only materializing a position slot). We won't actually read any values,
    // only the rep and def levels, so it doesn't matter what kind of reader we make.
    reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
        slot_desc);
  }
  return parent->obj_pool_.Add(reader);
}
| |
| } |