// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
#define IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
#include "codegen/impala-ir.h"
#include "exec/hdfs-scanner.h"
#include "exec/parquet/parquet-column-stats.h"
#include "exec/parquet/parquet-common.h"
#include "exec/parquet/parquet-metadata-utils.h"
#include "exec/parquet/parquet-page-index.h"
#include "exec/parquet/parquet-scratch-tuple-batch.h"
#include "runtime/scoped-buffer.h"
#include "util/runtime-profile-counters.h"
namespace impala {
class CollectionValueBuilder;
struct HdfsFileDesc;
/// Internal schema representation and resolution.
struct SchemaNode;
/// Class that implements Parquet definition and repetition level decoding.
class ParquetLevelDecoder;
/// Per column reader.
class ParquetColumnReader;
class CollectionColumnReader;
class ColumnStatsReader;
class BaseScalarColumnReader;
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader;
class BoolColumnReader;
class ParquetPageReader;
/// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
/// the Impala in-memory representation of data, e.g. (tuples, rows, row batches).
/// For the file format spec, see: github.com/apache/parquet-format
///
/// ---- Schema resolution ----
/// Additional columns are allowed at the end in either the table or file schema (i.e.,
/// extra columns at the end of the schema or extra fields at the end of a struct). If
/// there are extra columns in the file schema, they are simply ignored. If there are
/// extra columns in the table schema, we return NULLs for those columns (if they're
/// materialized).
///
/// ---- Disk IO ----
/// Parquet (and other columnar formats) use scan ranges differently than other formats.
/// Each materialized column maps to a single ScanRange per row group. For streaming
/// reads, all the columns need to be read in parallel. This is done by issuing one
/// ScanRange (in IssueInitialRanges()) for the file footer per split.
/// ProcessSplit() is called once for each original split and determines the row groups
/// whose midpoints fall within that split. We use the mid-point to determine whether a
/// row group should be processed because if the row group size is less than or equal to
/// the split size, the mid point guarantees that we have at least 50% of the row group in
/// the current split. ProcessSplit() then computes the column ranges for these row groups
/// and submits them to the IoMgr for immediate scheduling (so they don't surface in
/// RequestContext::GetNextUnstartedRange()). Scheduling them immediately also guarantees
/// they are all read at once.
///
/// Like the other scanners, each parquet scanner object is one to one with a
/// ScannerContext. Unlike the other scanners though, the context will have multiple
/// streams, one for each column. Row groups are processed one at a time this way.
///
/// ---- Nested types ----
/// This scanner supports reading and materializing nested data. For a good overview of
/// how nested data is encoded, see blog.twitter.com/2013/dremel-made-simple-with-parquet.
/// For how SQL nested schemas are translated to parquet schemas, see
/// github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types.
///
/// Examples:
/// For these examples, we will use the following table definition:
/// tbl:
/// id bigint
/// array_col array<array<int>>
///
/// The table definition could correspond to the following parquet schema (note the
/// required 'id' field. If written by Impala, all non-repeated fields would be optional,
/// but we can read repeated fields as well):
///
/// required group record d=0 r=0
/// req int64 id d=0 r=0
/// opt group array_col (LIST) d=1 r=0
/// repeated group list d=2 r=1
/// opt group item (LIST) d=3 r=1
/// repeated group list d=4 r=2
/// opt int32 item d=5 r=2
///
/// Each element in the schema has been annotated with the maximum def level and maximum
/// rep level corresponding to that element. Note that the repeated elements add a def
/// level. This distinguishes between 0 items (empty list) and more than 0 items
/// (non-empty list). The containing optional LIST element for each array determines
/// whether the whole list is null or non-null. Maps work the same way, the only
/// differences being that the repeated group contains two child fields ("key" and "value"
/// instead of "item"), and the outer element is annotated with MAP instead of LIST.
///
/// Only scalar schema elements are materialized in parquet files; internal nested
/// elements can be reconstructed using the def and rep levels. To illustrate this, here
/// is data containing every valid definition and repetition for the materialized int
/// 'item' element. The data records appear on the left, the encoded definition levels,
/// repetition levels, and values for the 'item' field appear on the right (the encoded
/// 'id' field is not shown).
///
/// record d r v
/// ------------------------------------
/// {id: 0, array_col: NULL} 0 0 -
/// {id: 1, array_col: []} 1 0 -
/// {id: 2, array_col: [NULL]} 2 0 -
/// {id: 3, array_col: [[]]} 3 0 -
/// {id: 4, array_col: [[NULL]]} 4 0 -
/// {id: 5, array_col: [[1, 5 0 1
/// NULL], 4 2 -
/// [2]]} 5 1 2
/// {id: 6, array_col: [[3]]} 5 0 3
///
/// * Example query 1:
/// select id, inner.item from tbl t, t.array_col outer, outer.item inner
/// Results from above sample data:
/// 4,NULL
/// 5,1
/// 5,NULL
/// 5,2
/// 6,3
///
/// Descriptors:
/// Tuple(id=0 tuple_path=[] slots=[
/// Slot(id=0 type=ARRAY col_path=[1] collection_item_tuple_id=1),
/// Slot(id=2 type=BIGINT col_path=[0])])
/// Tuple(id=1 tuple_path=[1] slots=[
/// Slot(id=1 type=ARRAY col_path=[1,0] collection_item_tuple_id=2)])
/// Tuple(id=2 tuple_path=[1, 0] slots=[
/// Slot(id=3 type=INT col_path=[1,0,0])])
///
/// The parquet scanner will materialize the following in-memory row batch:
/// RowBatch
/// +==========+
/// | 0 | NULL |
/// |----------|
/// | 1 | NULL | outer
/// |----------| +======+
/// | 2 | --------->| NULL |
/// | | | +======+
/// |----------|
/// | | | +======+
/// | 3 | --------->| NULL |
/// | | | +======+
/// | | | inner
/// |----------| +======+ +======+
/// | 4 | --------->| -------->| NULL |
/// | | | +======+ +======+
/// | | |
/// |----------| +======+ +======+
/// | 5 | --------->| -------->| 1 |
/// | | | | | +------+
/// | | | | | | NULL |
/// | | | +------+ +======+
/// | | | | |
/// | | | | | +======+
/// | | | | -------->| 2 |
/// | | | +======+ +======+
/// | | |
/// |----------| +======+ +======+
/// | 6 | --------->| -------->| 3 |
/// +==========+ +======+ +======+
///
/// The top-level row batch contains two slots, one containing the int64_t 'id' slot and
/// the other containing the CollectionValue 'array_col' slot. The CollectionValues in
/// turn contain pointers to their item tuple data. Each item tuple contains a single
/// ArrayColumn slot ('array_col.item'). The inner CollectionValues' item tuples contain
/// a single int 'item' slot.
///
/// Note that the scanner materializes a NULL CollectionValue for empty collections.
/// This is technically a bug (it should materialize a CollectionValue with num_tuples =
/// 0), but we don't distinguish between these two cases yet.
/// TODO: fix this (IMPALA-2272)
///
/// The column readers that materialize this structure form a tree analogous to the
/// materialized output:
/// CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)"
/// CollectionColumnReader slot_id=1 node="repeated group list (d=4 r=2)"
/// ScalarColumnReader<int32_t> slot_id=3 node="opt int32 item (d=5 r=2)"
/// ScalarColumnReader<int64_t> slot_id=2 node="req int64 id (d=0 r=0)"
///
/// Note that the collection column readers reference the "repeated group item" schema
/// element of the serialized array, not the outer "opt group" element. This is what
/// causes the bug described above, it should consider both elements.
///
/// * Example query 2:
/// select inner.item from tbl.array_col.item inner;
/// Results from the above sample data:
/// NULL
/// 1
/// NULL
/// 2
/// 3
///
/// Descriptors:
/// Tuple(id=0 tuple_path=[1, 0] slots=[
/// Slot(id=0 type=INT col_path=[1,0,0])])
///
/// In-memory row batch:
/// +======+
/// | NULL |
/// |------|
/// | 1 |
/// |------|
/// | NULL |
/// |------|
/// | 2 |
/// |------|
/// | 3 |
/// +======+
///
/// Column readers:
/// ScalarColumnReader<int32_t> slot_id=0 node="opt int32 item (d=5 r=2)"
///
/// In this example, the scanner doesn't materialize a nested in-memory result, since
/// only the single int 'item' slot is materialized. However, it still needs to read the
/// nested data as shown above. An important point to notice is that a tuple is not
/// materialized for every rep and def level pair read -- there are 9 of these pairs
/// total in the sample data above, but only 5 tuples are materialized. This is because
/// in this case, nothing should be materialized for NULL or empty arrays, since we're
/// only materializing the innermost item. If a def level is read that doesn't
/// correspond to any item value (NULL or otherwise), the scanner advances to the next
/// rep and def levels without materializing a tuple.
///
/// * Example query 3:
/// select id, inner.item from tbl t, t.array_col.item inner
/// Results from the above sample data (same as example 1):
/// 4,NULL
/// 5,1
/// 5,NULL
/// 5,2
/// 6,3
///
/// Descriptors:
/// Tuple(id=0 tuple_path=[] slots=[
/// Slot(id=0 type=ARRAY col_path=[1]),
/// Slot(id=1 type=BIGINT col_path=[0])])
/// Tuple(id=1 tuple_path=[1, 0] slots=[
/// Slot(id=2 type=INT col_path=[1,0,0])])
///
/// In-memory row batch:
/// RowBatch
/// +==========+
/// | 0 | NULL |
/// |----------|
/// | 1 | NULL |
/// |----------| inner
/// | 2 | --------->+======+
/// | | | +======+
/// |----------|
/// | | |
/// | 3 | --------->+======+
/// | | | +======+
/// | | |
/// |----------| +======+
/// | 4 | --------->| NULL |
/// | | | +======+
/// | | |
/// |----------| +======+
/// | 5 | --------->| 1 |
/// | | | +------+
/// | | | | NULL |
/// | | | +------+
/// | | | | 2 |
/// | | | +======+
/// | | |
/// |----------| +======+
/// | 6 | --------->| 3 |
/// +==========+ +======+
///
/// Column readers:
/// CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)"
/// ScalarColumnReader<int32_t> slot_id=2 node="opt int32 item (d=5 r=2)"
/// ScalarColumnReader<int64_t> slot_id=1 node="req int64 id (d=0 r=0)"
///
/// In this example, the scanner materializes a "flattened" version of inner, rather
/// than the full 3-level structure. Note that the collection reader references the
/// outer array, which determines how long each materialized array is, and the items in
/// the array are from the inner array.
///
/// ---- Slot materialization ----
/// Top-level tuples:
/// The slots of top-level tuples are populated in a column-wise fashion. Each column
/// reader materializes a batch of values into a temporary 'scratch batch'. Once a
/// scratch batch has been fully populated, runtime filters and conjuncts are evaluated
/// against the scratch tuples, and the surviving tuples are set in the output batch that
/// is handed to the scan node. The ownership of tuple memory is transferred from a
/// scratch batch to an output row batch once all tuples in the scratch batch have either
/// been filtered or returned as part of an output batch.
///
/// Collection items:
/// Unlike the top-level tuples, the item tuples of CollectionValues are populated in
/// a row-wise fashion because doing it column-wise has the following challenges.
/// First, we would need to allocate a scratch batch for every collection-typed slot
/// which could consume a lot of memory. Then we'd need a similar mechanism to transfer
/// tuples that survive conjuncts to an output collection. However, CollectionValues lack
/// the row indirection that row batches have, so we would need to either deep copy the
/// surviving tuples, or come up with a different mechanism altogether.
/// TODO: Populating CollectionValues in a column-wise fashion seems different enough
/// and less critical for most of our users today to defer this task until later.
///
/// ---- Runtime filters ----
/// HdfsParquetScanner is able to apply runtime filters that arrive before or during
/// scanning. Filters are applied at both the row group (see AssembleRows()) and row (see
/// ReadRow()) scope. If all filter predicates do not pass, the row or row group will be
/// excluded from output. Only partition-column filters are applied at AssembleRows(). The
/// FilterContexts for these filters are cloned from the parent scan node and attached to
/// the ScannerContext.
///
/// ---- Page filtering ----
/// A Parquet file can contain a so called "page index". It has two parts, a column index
/// and an offset index. The column index contains statistics like minimum and maximum
/// values for each page. The offset index contains information about page locations in
/// the Parquet file and top-level row ranges. HdfsParquetScanner evaluates the min/max
/// conjuncts against the column index and determines the surviving pages with the help of
/// the offset index. Then it will configure the column readers to only scan the pages
/// and row ranges that have a chance to store rows that pass the conjuncts.
class HdfsParquetScanner : public HdfsScanner {
public:
HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsParquetScanner() {}
/// Issue just the footer range for each file. We'll then parse the footer and pick
/// out the columns we want. 'files' must not be empty.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files)
WARN_UNUSED_RESULT;
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
virtual Status ProcessSplit() WARN_UNUSED_RESULT;
virtual void Close(RowBatch* row_batch);
/// Codegen ProcessScratchBatch(). Stores the resulting function in
/// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
static Status Codegen(HdfsScanNodeBase* node,
const std::vector<ScalarExpr*>& conjuncts,
llvm::Function** process_scratch_batch_fn)
WARN_UNUSED_RESULT;
/// Helper function to create ColumnStatsReader object. 'col_order' might be NULL.
ColumnStatsReader CreateColumnStatsReader(
const parquet::ColumnChunk& col_chunk, const ColumnType& col_type,
const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element);
/// Initializes a ParquetTimestampDecoder depending on writer, timezone, and the schema
/// of the column.
ParquetTimestampDecoder CreateTimestampDecoder(const parquet::SchemaElement& element);
/// Class name in LLVM IR.
static const char* LLVM_CLASS_NAME;
private:
friend class ParquetColumnReader;
friend class CollectionColumnReader;
friend class BaseScalarColumnReader;
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
friend class ScalarColumnReader;
friend class BoolColumnReader;
friend class HdfsParquetScannerTest;
friend class ParquetPageIndex;
friend class ParquetColumnChunkReader;
friend class ParquetPageReader;
/// Index of the current row group being processed. Initialized to -1 which indicates
/// that we have not started processing the first row group yet (GetNext() has not yet
/// been called).
int32_t row_group_idx_;
/// Counts the number of rows processed for the current row group.
int64_t row_group_rows_read_;
/// Indicates whether we should advance to the next row group in the next GetNext().
/// Starts out as true to move to the very first row group.
bool advance_row_group_;
boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
/// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
Tuple* min_max_tuple_;
/// Clone of Min/max statistics conjunct evaluators. Has the same life time as
/// the scanner. Stored in 'obj_pool_'.
vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
/// Pool used for allocating caches of definition/repetition levels and tuples for
/// dictionary filtering. The definition/repetition levels are populated by the
/// level readers. The pool is freed in Close().
boost::scoped_ptr<MemPool> perm_pool_;
/// Number of scratch batches processed so far.
int64_t row_batches_produced_;
/// Column reader for each top-level materialized slot in the output tuple.
std::vector<ParquetColumnReader*> column_readers_;
/// Column readers will write slot values into this scratch batch for
/// top-level tuples. See AssembleRows().
boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
/// File metadata thrift object
parquet::FileMetaData file_metadata_;
/// Version of the application that wrote this file.
ParquetFileVersion file_version_;
/// Scan range for the metadata.
const io::ScanRange* metadata_range_;
/// Pool to copy dictionary page buffer into. This pool is shared across all the
/// pages in a column chunk.
boost::scoped_ptr<MemPool> dictionary_pool_;
/// Contains the leftover ranges after evaluating the page index.
/// If all rows were eliminated, then the row group is skipped immediately after
/// evaluating the page index.
std::vector<RowRange> candidate_ranges_;
/// Column readers that are eligible for dictionary filtering.
/// These are pointers to elements of column_readers_. Materialized columns that are
/// dictionary encoded correspond to scalar columns that are either top-level columns
/// or nested within a collection. CollectionColumnReaders are not eligible for
/// dictionary filtering so are not included.
std::vector<BaseScalarColumnReader*> dict_filterable_readers_;
/// Column readers that are not eligible for dictionary filtering.
/// These are pointers to elements of column_readers_. The readers are either top-level
/// or nested within a collection.
std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
/// Flattened list of all scalar column readers in column_readers_.
std::vector<BaseScalarColumnReader*> scalar_readers_;
/// Flattened collection column readers that point to readers in column_readers_.
std::vector<CollectionColumnReader*> collection_readers_;
/// Mapping from Parquet column indexes to scalar readers.
std::unordered_map<int, BaseScalarColumnReader*> scalar_reader_map_;
/// Memory used to store the tuples used for dictionary filtering. Tuples owned by
/// perm_pool_.
std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
/// Average and min/max time spent processing the footer by each split.
RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_;
/// Average and min/max time spent processing the page index for each row group.
RuntimeProfile::SummaryStatsCounter* process_page_index_stats_;
/// Number of columns that need to be read.
RuntimeProfile::Counter* num_cols_counter_;
/// Number of row groups that are skipped because of Parquet statistics, either by
/// row group level statistics, or page level statistics.
RuntimeProfile::Counter* num_stats_filtered_row_groups_counter_;
/// Number of row groups that need to be read.
RuntimeProfile::Counter* num_row_groups_counter_;
/// Number of row groups with page index.
RuntimeProfile::Counter* num_row_groups_with_page_index_counter_;
/// Number of pages that are skipped because of Parquet page statistics.
RuntimeProfile::Counter* num_stats_filtered_pages_counter_;
/// Number of pages need to be examined. We need to scan
/// 'num_pages_counter_ - num_stats_filtered_pages_counter_' pages.
RuntimeProfile::Counter* num_pages_counter_;
/// Number of scanners that end up doing no reads because their splits don't overlap
/// with the midpoint of any row-group in the file.
RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
/// Number of row groups skipped due to dictionary filter
RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
/// Tracks the size of any compressed pages read. If no compressed pages are read, this
/// counter is empty
RuntimeProfile::SummaryStatsCounter* parquet_compressed_page_size_counter_;
/// Tracks the size of any de-compressed pages reads. This counter is updated in two
/// places: (1) when a compressed page is de-compressed, the de-compressed size is added
/// to this counter, (2) when a page that is not compressed is read, its size is added
/// to this counter
RuntimeProfile::SummaryStatsCounter* parquet_uncompressed_page_size_counter_;
/// Number of collection items read in current row batch. It is a scanner-local counter
/// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
/// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
/// AssembleRows() and then is reset to 0.
int64_t coll_items_read_counter_;
typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
/// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
ParquetPageIndex page_index_;
const char* filename() const { return metadata_range_->file(); }
virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
/// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
/// compute the statistics. Sets 'skip_row_group' to true if the row group can be
/// skipped, 'false' otherwise.
Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
/// Advances 'row_group_idx_' to the next non-empty row group and initializes
/// the column readers to scan it. Recoverable errors are logged to the runtime
/// state. Only returns a non-OK status if a non-recoverable error is encountered
/// (or abort_on_error is true). If OK is returned, 'parse_status_' is guaranteed
/// to be OK as well.
Status NextRowGroup() WARN_UNUSED_RESULT;
/// High-level function for initializing page filtering for the scalar readers.
/// Sets 'filter_pages' to true if found any page to filter out.
Status ProcessPageIndex(bool* filter_pages);
/// Evaluates 'min_max_conjunct_evals_' against the column index and determines the row
/// ranges that might contain data we are looking for.
/// Sets 'filter_pages' to true if found any page to filter out.
Status EvaluatePageIndex(bool* filter_pages);
/// Based on 'candidate_ranges_' it determines the candidate pages for each
/// scalar reader.
Status ComputeCandidatePagesForColumns(bool* filter_pages);
/// Check that the scalar readers agree on the top-level row being scanned.
Status CheckPageFiltering();
/// Reads statistics data for a page of given column. Page is specified by 'page_idx',
/// column is specified by 'column_index'. 'stats_field' specifies whether we should
/// read the min or max value. 'is_null_page' is set to true for null pages, in that
/// case there is no min nor max value. Returns true if the read of the min or max
/// value was successful, in this case 'slot' will hold the min or max value.
static bool ReadStatFromIndex(const ColumnStatsReader& stats_reader,
const parquet::ColumnIndex& column_index, int page_idx,
ColumnStatsReader::StatsField stats_field, bool* is_null_page, void* slot);
/// Reads data using 'column_readers' to materialize top-level tuples into 'row_batch'.
/// Returns a non-OK status if a non-recoverable error was encountered and execution
/// of this query should be terminated immediately.
/// May set *skip_row_group to indicate that the current row group should be skipped,
/// e.g., due to a parse error, but execution should continue.
Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT;
/// Commit num_rows to the given row batch.
/// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
/// Scanner can call this with 0 rows to flush any pending resources (attached pools
/// and io buffers) to minimize memory consumption.
Status CommitRows(RowBatch* dst_batch, int num_rows) WARN_UNUSED_RESULT;
/// Evaluates runtime filters and conjuncts (if any) against the tuples in
/// 'scratch_batch_', and adds the surviving tuples to the given batch.
/// Transfers the ownership of tuple memory to the target batch when the
/// scratch batch is exhausted.
/// Returns the number of rows that should be committed to the given batch.
int TransferScratchTuples(RowBatch* dst_batch);
/// Processes a single row batch for TransferScratchTuples, looping over scratch_batch_
/// until it is exhausted or the output is full. Called for the case when there are
/// materialized tuples. This is a separate function so it can be codegened.
int ProcessScratchBatch(RowBatch* dst_batch);
/// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
/// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
/// number of items in this collection and descendant collections.
///
/// 'new_collection_rep_level' indicates when the end of the collection has been
/// reached, namely when current_rep_level <= new_collection_rep_level.
///
/// Returns true when the end of the current collection is reached, and execution can
/// be safely resumed.
/// Returns false if execution should be aborted due to:
/// - parse_error_ is set
/// - query is cancelled
/// - scan node limit was reached
/// When false is returned the column_readers are left in an undefined state and
/// execution should be aborted immediately by the caller.
bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers,
int new_collection_rep_level, CollectionValueBuilder* coll_value_builder);
/// Function used by AssembleCollection() to materialize a single collection item
/// into 'tuple'. Returns false if execution should be aborted for some reason,
/// otherwise returns true.
/// If 'materialize_tuple' is false, only advances the column readers' levels,
/// and does not read any data values.
inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers,
bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
/// Process the file footer and parse file_metadata_. This should be called with the
/// last FOOTER_SIZE bytes in context_.
Status ProcessFooter() WARN_UNUSED_RESULT;
/// Populates 'column_readers' for the slots in 'tuple_desc', including creating child
/// readers for any collections. Schema resolution is handled in this function as
/// well. Fills in the appropriate template tuple slot with NULL for any materialized
/// fields missing in the file.
Status CreateColumnReaders(const TupleDescriptor& tuple_desc,
const ParquetSchemaResolver& schema_resolver,
std::vector<ParquetColumnReader*>* column_readers) WARN_UNUSED_RESULT;
/// Returns the total number of scalar column readers in 'column_readers', including
/// the children of collection readers.
int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers);
/// Creates a column reader that reads one value for each item in the table or
/// collection element corresponding to 'parent_path'. 'parent_path' should point to
/// either a collection element or the root schema (i.e. empty path). The returned
/// reader has no slot desc associated with it, meaning only NextLevels() and not
/// ReadValue() can be called on it.
///
/// This is used for counting item values, rather than materializing any values. For
/// example, in a count(*) over a collection, there are no values to materialize, but we
/// still need to iterate over every item in the collection to count them.
Status CreateCountingReader(const SchemaPath& parent_path,
const ParquetSchemaResolver& schema_resolver,
ParquetColumnReader** reader)
WARN_UNUSED_RESULT;
/// Walks file_metadata_ and initiates reading the materialized columns. This
/// initializes 'scalar_readers_' and divides reservation between the columns but
/// does not start any scan ranges.
Status InitScalarColumns() WARN_UNUSED_RESULT;
/// Decides how to divide stream_->reservation() between the columns. May increase
/// the reservation if more reservation would enable more efficient I/O for the
/// current columns being scanned. Sets the reservation on each corresponding reader
/// in 'column_readers'.
Status DivideReservationBetweenColumns(
const std::vector<BaseScalarColumnReader*>& column_readers);
/// Compute the ideal reservation to scan a file with scan range lengths
/// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
/// in ExecEnv.
static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths);
/// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
/// dividing a reservation of 'reservation_to_distribute' bytes between columns with
/// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
/// a vector with an entry per column with the index into 'col_range_lengths' and the
/// amount of reservation in bytes to give to that column.
static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
int64_t min_buffer_size, int64_t max_buffer_size,
const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
/// Initializes the column readers in collection_readers_.
void InitCollectionColumns();
/// Initialize dictionaries for all column readers
Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& column_readers)
WARN_UNUSED_RESULT;
/// Performs some validation once we've reached the end of a row group to help detect
/// bugs or bad input files.
Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
int row_group_idx, int64_t rows_read) WARN_UNUSED_RESULT;
/// Part of the HdfsScanner interface, not used in Parquet.
Status InitNewRange() WARN_UNUSED_RESULT { return Status::OK(); }
/// Transfers the remaining resources backing tuples such as IO buffers and memory
/// from mem pools to the given row batch. Closes all column readers.
/// Should be called after completing a row group and when returning the last batch.
void FlushRowGroupResources(RowBatch* row_batch);
/// Releases resources associated with a row group that was skipped and closes all
/// column readers. Should be called after skipping a row group from which no rows
/// were returned.
void ReleaseSkippedRowGroupResources();
/// Evaluates whether the column reader is eligible for dictionary predicates
bool IsDictFilterable(ParquetColumnReader* col_reader);
/// Evaluates whether the column reader is eligible for dictionary predicates.
bool IsDictFilterable(BaseScalarColumnReader* col_reader);
/// Partitions the readers into scalar and collection readers. The collection readers
/// are flattened into collection_readers_. The scalar readers are partitioned into
/// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
/// dictionary filtering is enabled and the reader can be dictionary filtered. All
/// scalar readers are also flattened into scalar_readers_.
void PartitionReaders(const vector<ParquetColumnReader*>& readers,
bool can_eval_dict_filters);
/// Divides the column readers into dict_filterable_readers_,
/// non_dict_filterable_readers_ and collection_readers_. Allocates memory for
/// dict_filter_tuple_map_.
Status InitDictFilterStructures() WARN_UNUSED_RESULT;
/// Returns true if all of the data pages in the column chunk are dictionary encoded
bool IsDictionaryEncoded(const parquet::ColumnMetaData& col_metadata);
/// Checks to see if this row group can be eliminated based on applying conjuncts
/// to the dictionary values. Specifically, if any dictionary-encoded column has
/// no values that pass the relevant conjuncts, then the row group can be skipped.
Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
bool* skip_row_group) WARN_UNUSED_RESULT;
/// Updates the counter parquet_compressed_page_size_counter_ with the given compressed
/// page size. Called by ParquetColumnReader for each page read.
void UpdateCompressedPageSizeCounter(int64_t compressed_page_size);
/// Updates the counter parquet_uncompressed_page_size_counter_ with the given
/// uncompressed page size. Called by ParquetColumnReader for each page read.
void UpdateUncompressedPageSizeCounter(int64_t uncompressed_page_size);
};
} // namespace impala
#endif