// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_PARQUET_COLUMN_READERS_H
#define IMPALA_PARQUET_COLUMN_READERS_H
#include <limits>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "exec/parquet/hdfs-parquet-scanner.h"
#include "exec/parquet/parquet-level-decoder.h"
#include "exec/parquet/parquet-column-chunk-reader.h"
namespace impala {
class DictDecoderBase;
class Tuple;
class MemPool;
/// Base class for reading a Parquet column. Reads a logical column, not necessarily a
/// column materialized in the file (e.g. collections). The two subclasses are
/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
/// level pair at a time. The current def and rep level are exposed to the user, and the
/// corresponding value (if defined) can optionally be copied into a slot via
/// ReadValue(). Can also write position slots.
///
/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner.
class ParquetColumnReader {
public:
/// Creates a column reader for 'node' and associates it with the given parent scanner.
/// The constructors of column readers add the new object to the parent's object pool.
/// 'slot_desc' may be NULL, in which case the returned column reader can only be used
/// to read def/rep levels.
/// 'is_collection_field' should be set to true if the returned reader is reading a
/// collection. This cannot be determined purely by 'node' because a repeated scalar
/// node represents both an array and the array's items (in this case
/// 'is_collection_field' should be true if the reader reads one value per array, and
/// false if it reads one value per item). The reader is added to the runtime state's
/// object pool. Does not create child readers for collection readers; these must be
/// added by the caller.
///
/// It supports the following primitive type widenings, which do not lose any
/// precision:
/// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), double (DOUBLE)
/// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
/// - int (INT32) -> bigint (INT64), double (DOUBLE)
/// - float (FLOAT) -> double (DOUBLE)
static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node,
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
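// Example use of the factory (an illustrative sketch, not code from this codebase;
// 'schema_node', 'slot' and 'scanner' are hypothetical caller-side variables):
//
//   ParquetColumnReader* reader = ParquetColumnReader::Create(
//       schema_node, /*is_collection_field=*/false, slot, scanner);
//   // 'reader' is owned by the scanner's object pool; the caller must not delete it.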
virtual ~ParquetColumnReader() { }
int def_level() const { return def_level_; }
int rep_level() const { return rep_level_; }
const SlotDescriptor* slot_desc() const { return slot_desc_; }
const parquet::SchemaElement& schema_element() const { return *node_.element; }
int16_t max_def_level() const { return max_def_level_; }
int16_t max_rep_level() const { return max_rep_level_; }
int def_level_of_immediate_repeated_ancestor() const {
return node_.def_level_of_immediate_repeated_ancestor;
}
const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
DCHECK(pos_slot_desc_ == NULL);
pos_slot_desc_ = pos_slot_desc;
}
/// Returns true if this reader materializes collections (i.e. CollectionValues).
virtual bool IsCollectionReader() const = 0;
const char* filename() const { return parent_->filename(); }
/// Read the current value (or null) into 'tuple' for this column. This should only be
/// called when a value is defined, i.e., def_level() >=
/// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
/// no output values), otherwise NextLevels() should be called instead.
///
/// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
/// called after calling ReadValue()).
///
/// Returns false if execution should be aborted for some reason, e.g. parse_status_
/// is set to an error, the query is cancelled, or the scan node limit was reached.
/// Otherwise returns true.
///
/// NextLevels() must be called on this reader before calling ReadValue() for the first
/// time. This is to initialize the current value that ReadValue() will read.
///
/// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
/// The codegened functions from all the materialized cols will then be combined
/// into one function.
/// TODO: another option is to materialize col by col for the entire row batch in
/// one call. e.g. MaterializeCol would write out 1024 values. Our row batches
/// are currently dense so we'll need to figure out something there.
virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
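// Example non-batched read loop (a minimal sketch of the contract described above;
// 'pool' and 'tuple' are assumed to be set up by the caller):
//
//   if (!reader->NextLevels()) return false;  // initialize the first def/rep levels
//   while (!reader->RowGroupAtEnd()) {
//     if (reader->def_level() >= reader->def_level_of_immediate_repeated_ancestor()) {
//       if (!reader->ReadValue(pool, tuple)) return false;  // also advances levels
//     } else {
//       if (!reader->NextLevels()) return false;  // skip empty/NULL collection
//     }
//   }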
/// Same as ReadValue() but does not advance repetition level. Only valid for columns
/// not in collections.
virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
/// Batched version of ReadValue() that reads up to max_values at once and materializes
/// them into tuples in tuple_mem. Returns the number of values actually materialized
/// in *num_values. The return value, error behavior and state changes are generally
/// the same as in ReadValue(). For example, if an error occurs in the middle of
/// materializing a batch then false is returned, and num_values, tuple_mem, as well as
/// this column reader are left in an undefined state, assuming that the caller will
/// immediately abort execution. NextLevels() does *not* need to be called before
/// ReadValueBatch(), unlike ReadValue().
virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
uint8_t* tuple_mem, int* num_values) = 0;
/// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
/// materializes them into tuples in tuple_mem.
/// The return value and error behavior are the same as in ReadValueBatch().
virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
uint8_t* tuple_mem, int* num_values) = 0;
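// Example batched read (sketch; 'pool', 'tuple_mem' and 'tuple_size' are assumed to
// describe the caller's scratch batch; note that NextLevels() need not be called
// before ReadValueBatch()):
//
//   int num_values = 0;
//   if (!reader->ReadValueBatch(pool, /*max_values=*/1024, tuple_size, tuple_mem,
//       &num_values)) return false;  // reader state is undefined after a failure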
/// Advances this column reader's def and rep levels to the next logical value, i.e. to
/// the next scalar value or the beginning of the next collection, without attempting to
/// read the value. This is used to skip past def/rep levels that don't materialize a
/// value, such as the def/rep levels corresponding to an empty containing collection.
///
/// NextLevels() must be called on this reader before calling ReadValue() for the first
/// time. This is to initialize the current value that ReadValue() will read.
///
/// Returns false if execution should be aborted for some reason, e.g. parse_status_
/// is set to an error, the query is cancelled, or the scan node limit was reached.
/// Otherwise returns true.
virtual bool NextLevels() = 0;
/// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the
/// parent collection) to 'pos' and increments pos_current_value_. Only valid to
/// call when doing non-batched reading, i.e. NextLevels() must have been called
/// before each call to this function to advance to the next element in the
/// collection.
inline void ReadPositionNonBatched(int64_t* pos);
/// Returns true if this column reader has reached the end of the row group.
inline bool RowGroupAtEnd() {
DCHECK_EQ(rep_level_ == ParquetLevel::ROW_GROUP_END,
def_level_ == ParquetLevel::ROW_GROUP_END);
return rep_level_ == ParquetLevel::ROW_GROUP_END;
}
/// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it,
/// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
virtual void Close(RowBatch* row_batch) = 0;
protected:
HdfsParquetScanner* parent_;
const SchemaNode& node_;
const SlotDescriptor* const slot_desc_;
/// The slot descriptor for the position field of the tuple, if there is one. NULL if
/// there isn't one. Only one column reader for a given tuple desc will have this set.
const SlotDescriptor* pos_slot_desc_;
/// The next value to write into the position slot, if there is one. 64-bit int because
/// the pos slot is always a BIGINT. Set to ParquetLevel::INVALID_POS when this column
/// reader does not have a current rep and def level (i.e. before the first NextLevels()
/// call or after the last value in the column has been read).
int64_t pos_current_value_;
/// The current repetition and definition levels of this reader. Advanced via
/// ReadValue() and NextLevels(). Set to ParquetLevel::INVALID_LEVEL before the first
/// NextLevels() call for a row group or if an error is encountered decoding a level.
/// Set to ROW_GROUP_END after the last value in the column has been read. If this is
/// not inside a collection, rep_level_ is always 0, ParquetLevel::INVALID_LEVEL or
/// ParquetLevel::ROW_GROUP_END.
/// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
/// ParquetLevel::INVALID_LEVEL and ParquetLevel::ROW_GROUP_END. The maximum values are
/// cached here because they are accessed in inner loops.
int16_t rep_level_;
const int16_t max_rep_level_;
int16_t def_level_;
const int16_t max_def_level_;
// Cache frequently accessed members of slot_desc_ for perf.
/// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
const int tuple_offset_;
/// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
const NullIndicatorOffset null_indicator_offset_;
ParquetColumnReader(
HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
: parent_(parent),
node_(node),
slot_desc_(slot_desc),
pos_slot_desc_(NULL),
pos_current_value_(ParquetLevel::INVALID_POS),
rep_level_(ParquetLevel::INVALID_LEVEL),
max_rep_level_(node_.max_rep_level),
def_level_(ParquetLevel::INVALID_LEVEL),
max_def_level_(node_.max_def_level),
tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() :
slot_desc->null_indicator_offset()) {
DCHECK(parent != nullptr);
parent->obj_pool_.Add(this);
DCHECK_GE(node_.max_rep_level, 0);
DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
DCHECK_GE(node_.max_def_level, 0);
DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
// rep_level_ is always valid and equal to 0 if col not in collection.
if (max_rep_level() == 0) rep_level_ = 0;
}
/// Called in the middle of creating a scratch tuple batch to simulate failures
/// such as exceeding memory limit or cancellation. Returns false if the debug
/// action deems that the parquet column reader should halt execution. 'val_count'
/// is the counter which the column reader uses to track the number of tuples
/// produced so far. If the column reader should halt execution, 'parse_status_'
/// is updated with the error status and 'val_count' is set to 0.
inline bool ColReaderDebugAction(int* val_count);
};
/// Reader for a single column from the parquet file. It's associated with a
/// ScannerContext::Stream and is responsible for decoding the data. Superclass for
/// per-type column readers. This contains most of the logic; the type-specific
/// functions must be implemented in the subclasses.
class BaseScalarColumnReader : public ParquetColumnReader {
public:
BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
col_chunk_reader_(parent, node.element->name,
slot_desc != nullptr ? slot_desc->id() : -1, PageReaderValueMemoryType()) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
virtual ~BaseScalarColumnReader() { }
virtual bool IsCollectionReader() const { return false; }
/// Resets the reader for each row group in the file and creates the scan
/// range for the column, but does not start it. To start scanning,
/// set_io_reservation() must be called to assign reservation to this
/// column, followed by StartScan().
Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
int row_group_idx);
/// Starts the column scan range. The reader must be Reset() and have a
/// reservation assigned via set_io_reservation(). This must be called
/// before any of the column data can be read (including dictionary and
/// data pages). Returns an error status if there was an error starting the
/// scan or allocating buffers for it.
Status StartScan() { return col_chunk_reader_.StartScan(); }
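// Typical per-row-group setup sequence (sketch; error handling via Impala's
// RETURN_IF_ERROR macro, with 'file_desc', 'col_chunk', 'row_group_idx' and
// 'reservation' provided by the caller):
//
//   RETURN_IF_ERROR(reader->Reset(file_desc, col_chunk, row_group_idx));
//   reader->set_io_reservation(reservation);
//   RETURN_IF_ERROR(reader->StartScan());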
/// Helper to start scans for multiple columns at once.
static Status StartScans(const std::vector<BaseScalarColumnReader*>& readers) {
for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
return Status::OK();
}
virtual void Close(RowBatch* row_batch);
io::ScanRange* scan_range() const { return col_chunk_reader_.scan_range(); }
int64_t total_len() const { return metadata_->total_compressed_size; }
int col_idx() const { return node_.col_idx; }
THdfsCompression::type codec() const {
if (metadata_ == NULL) return THdfsCompression::NONE;
return ConvertParquetToImpalaCodec(metadata_->codec);
}
void set_io_reservation(int bytes) { col_chunk_reader_.set_io_reservation(bytes); }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
virtual bool NextLevels() { return NextLevels<true>(); }
/// Check the data stream to see if there is a dictionary page. If there is,
/// use that page to initialize dict_decoder_ and advance the data stream
/// past the dictionary page.
Status InitDictionary();
/// Convenience function to initialize multiple dictionaries.
static Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& readers);
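// Example batch initialization across the row group's scalar readers (sketch;
// 'readers' is assumed to be assembled by the scanner; scans must be started
// before the dictionary pages can be read):
//
//   RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(readers));
//   RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(readers));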
// Returns the dictionary decoder, or nullptr if the dictionary doesn't exist.
virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
// Returns whether the datatype for this column requires conversion from the on-disk
// format for correctness. For example, timestamps can require an offset to be
// applied.
virtual bool NeedsConversion() { return false; }
// Returns whether the datatype for this column requires validation. For example,
// the timestamp format has certain bit combinations that are invalid, and these
// need to be validated when read from disk.
virtual bool NeedsValidation() { return false; }
// TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
// we know this row can be skipped. This could be very useful with stats, where
// big sections can be skipped. Implement that when we can benefit from it.
protected:
// Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
friend class HdfsParquetScanner;
ParquetColumnChunkReader col_chunk_reader_;
// Class members that are accessed for every column should be included up here so they
// fit in as few cache lines as possible.
/// Pointer to start of next value in data page
uint8_t* data_ = nullptr;
/// End of the data page.
const uint8_t* data_end_ = nullptr;
/// Decoder for definition levels.
ParquetLevelDecoder def_levels_{true};
/// Decoder for repetition levels.
ParquetLevelDecoder rep_levels_{false};
/// Page encoding for values of the current data page. Cached here for perf. Set in
/// InitDataPage().
parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
/// Num values remaining in the current data page
int num_buffered_values_ = 0;
// Less frequently used members that are not accessed in inner loop should go below
// here so they do not occupy precious cache line space.
/// The number of values seen so far. Updated per data page. It is only used for
/// validation. It is not used when we filter rows based on the page index.
int64_t num_values_read_ = 0;
/// Metadata for the column for the current row group.
const parquet::ColumnMetaData* metadata_ = nullptr;
/////////////////////////////////////////
/// BEGIN: Members used for page filtering
/// They are not set when we don't filter out pages at all.
/// The parquet OffsetIndex of this column chunk. It stores information about the page
/// locations and row indexes.
parquet::OffsetIndex offset_index_;
/// Collection of page indexes that we are going to read. When we use page filtering,
/// we issue a scan-range with sub-ranges that belong to the candidate data pages, i.e.
/// we will not even see the bytes of the filtered out pages.
/// It is set in HdfsParquetScanner::CalculateCandidatePagesForColumns().
std::vector<int> candidate_data_pages_;
/// Index into 'candidate_data_pages_' pointing to the data page currently being
/// read, when we have candidate pages.
int candidate_page_idx_ = -1;
/// Stores an index to 'parent_->candidate_ranges_'. When we have candidate pages, we
/// are processing values in this range. When we leave this range, then we need to skip
/// rows and increment this field.
int current_row_range_ = 0;
/// Index of the current top-level row. It is updated together with the rep/def levels.
/// When updated, and its value is N, it means that we already processed the Nth row
/// completely.
int64_t current_row_ = -1;
/// This flag is needed for the proper tracking of the last processed row.
/// The batched and non-batched interfaces behave differently. E.g. when using the
/// batched interface you don't need to invoke NextLevels() in advance, while you need
/// to do that for the non-batched interface. In fact, the batched interface doesn't
/// call NextLevels() at all. It directly reads the levels then the corresponding value
/// in a loop. On the other hand, the non-batched interface (ReadValue()) expects that
/// the levels for the next value are already read via NextLevels(). And after reading
/// the value it calls NextLevels() to read the levels of the next value. Hence, the
/// levels are always read ahead in this case.
/// True if we have read the def and rep levels ahead. In this case 'current_row_'
/// points to the row we'll process next, not to the row we have already processed.
bool levels_readahead_ = false;
/// END: Members used for page filtering
/////////////////////////////////////////
/// Reads the next page header into next_page_header/next_header_size.
/// If the stream reaches the end before reading a complete page header,
/// eos is set to true. If peek is false, the stream position is advanced
/// past the page header. If peek is true, the stream position is not moved.
/// Returns an error status if the next page header could not be read.
Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header,
uint32_t* next_header_size, bool* eos);
/// Read the next data page. If a dictionary page is encountered, that will be read and
/// this function will continue reading the next data page.
Status ReadDataPage();
/// Tries to move to the next page and buffer more values. Returns false and
/// sets rep_level_, def_level_ and pos_current_value_ to -1 if there are no more
/// pages or an error is encountered.
bool NextPage();
/// Implementation for NextLevels().
template <bool ADVANCE_REP_LEVEL>
bool NextLevels();
/// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
/// dictionary decoder stored in this object. Subclass must implement this. Returns
/// an error status if the dictionary values could not be decoded successfully.
virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
DictDecoderBase** decoder) = 0;
/// Return true if the column has a dictionary decoder. Subclass must implement this.
virtual bool HasDictionaryDecoder() = 0;
/// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass
/// must implement this.
virtual void ClearDictionaryDecoder() = 0;
/// Initializes the reader with the data contents. This is the content for the entire
/// decompressed data page. Decoders can initialize state from here. The caller must
/// validate the input such that 'size' is non-negative and that 'data' has at least
/// 'size' bytes remaining.
virtual Status InitDataPage(uint8_t* data, int size) = 0;
ParquetColumnChunkReader::ValueMemoryType PageReaderValueMemoryType() {
if (slot_desc_ == nullptr) {
return ParquetColumnChunkReader::ValueMemoryType::NO_SLOT_DESC;
}
const ColumnType& col_type = slot_desc_->type();
if (col_type.IsStringType()) {
if (col_type.IsVarLenStringType()) {
return ParquetColumnChunkReader::ValueMemoryType::VAR_LEN_STR;
}
return ParquetColumnChunkReader::ValueMemoryType::FIXED_LEN_STR;
}
return ParquetColumnChunkReader::ValueMemoryType::SCALAR;
}
/// Resets structures related to page filtering.
void ResetPageFiltering();
/// Must be invoked when starting a new page. Updates the structures related to page
/// filtering and skips the first rows if needed.
Status StartPageFiltering();
/// Returns the index of the row that was processed most recently.
int64_t LastProcessedRow() const {
if (def_level_ == ParquetLevel::ROW_GROUP_END) return current_row_;
return levels_readahead_ ? current_row_ - 1 : current_row_;
}
/// Creates sub-ranges if page filtering is active.
void CreateSubRanges(std::vector<io::ScanRange::SubRange>* sub_ranges);
/// Calculates how many encoded values we need to skip in the page data, then
/// invokes SkipEncodedValuesInPage(). The number of encoded values depends on the
/// nesting of the data, and also on the number of null values.
/// E.g. if 'num_rows' is 10, and every row contains an array of 10 integers, then
/// we need to skip 100 encoded values in the page data.
/// And, if 'num_rows' is 10, and every second value is NULL, then we only need to skip
/// 5 values in the page data since NULL values are not stored there.
/// The number of primitive values can be calculated from the def and rep levels.
/// Returns true on success, false otherwise.
bool SkipTopLevelRows(int64_t num_rows);
/// Skip values in the page data. Returns true on success, false otherwise.
virtual bool SkipEncodedValuesInPage(int64_t num_values) = 0;
/// Only valid to call this function when we filter out pages based on the Page index.
/// Returns the RowGroup-level index of the starting row in the candidate page.
int64_t FirstRowIdxInCurrentPage() const {
DCHECK(!candidate_data_pages_.empty());
return offset_index_.page_locations[
candidate_data_pages_[candidate_page_idx_]].first_row_index;
}
/// The number of top-level rows until the end of the current candidate range.
/// For simple columns it returns 0 if we have processed the last row in the current
/// range. For nested columns, it returns 0 when we are processing values from the last
/// row in the current row range.
int RowsRemainingInCandidateRange() const {
DCHECK(!candidate_data_pages_.empty());
return parent_->candidate_ranges_[current_row_range_].last - current_row_;
}
/// Returns true if we are filtering pages.
bool DoesPageFiltering() const {
return !candidate_data_pages_.empty();
}
bool IsLastCandidateRange() const {
return current_row_range_ == parent_->candidate_ranges_.size() - 1;
}
template <bool IN_COLLECTION>
bool ConsumedCurrentCandidateRange() {
return RowsRemainingInCandidateRange() == 0 &&
(!IN_COLLECTION || max_rep_level() == 0 || num_buffered_values_ == 0 ||
rep_levels_.PeekLevel() == 0);
}
/// Fills the position slots, stopping at 'max_values', at the end of the current
/// candidate range, or when the cached repetition levels are exhausted, whichever
/// comes first. Returns the number of values processed. It can also be used without
/// a position slot; in that case it just returns the number of values until the
/// first limit is reached. It consumes cached repetition levels.
int FillPositionsInCandidateRange(int rows_remaining, int max_values,
uint8_t* RESTRICT tuple_mem, int tuple_size);
/// Advance to the next candidate range that contains 'current_row_'.
/// Cannot advance past the last candidate range.
void AdvanceCandidateRange();
/// Returns true if the current candidate row range has some rows in the current page.
bool PageHasRemainingCandidateRows() const;
/// Skip top level rows in current page until current candidate range is reached.
bool SkipRowsInPage();
/// Invoke this when there aren't any more interesting rows in the current page based
/// on the page index. It starts reading the next page.
bool JumpToNextPage();
/// Slow-path status construction code for def/rep decoding errors. 'level_name' is
/// either "rep" or "def", 'decoded_level' is the value returned from
/// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value.
void __attribute__((noinline)) SetLevelDecodeError(const char* level_name,
int decoded_level, int max_level);
// Returns a detailed error message about unsupported encoding.
Status GetUnsupportedDecodingError();
Status LogCorruptNumValuesInMetadataError();
Status HandleTooEarlyEos();
};
// Inlined to allow inlining into the collection and scalar column readers.
inline void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
// NextLevels() should have already been called
DCHECK_GE(rep_level_, 0);
DCHECK_GE(def_level_, 0);
DCHECK_GE(pos_current_value_, 0);
DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
"Caller should have called NextLevels() until we are ready to read a value";
*pos = pos_current_value_++;
}
// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
// path doesn't falsely report that the file is corrupted.
// Inlined to avoid overhead in release builds.
inline bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
#ifndef NDEBUG
Status status = parent_->ScannerDebugAction();
if (!status.ok()) {
if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
*val_count = 0;
return false;
}
#endif
return true;
}
// Trigger the debug action on every other call of Read*ValueBatch() once at least 128
// tuples have been produced, to simulate failures such as exceeding the memory limit.
// Triggering it every other call so as not to always fail on the first column reader
// when materializing multiple columns. Failing on a non-empty row batch tests proper
// resource freeing by the Parquet scanner.
#ifndef NDEBUG
extern int parquet_column_reader_debug_count;
#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(num_tuples) \
((parquet_column_reader_debug_count++ % 2) == 1 && num_tuples >= 128)
#else
#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(x) (false)
#endif
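// Example use inside a batched read path (sketch; 'val_count' tracks the tuples
// materialized so far by the column reader):
//
//   if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
//     if (!ColReaderDebugAction(&val_count)) return false;  // parse_status_ updated
//   }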
} // namespace impala
#endif