| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/parquet/hdfs-parquet-scanner.h" |
| |
| #include <algorithm> |
| #include <queue> |
| #include <stack> |
| |
| #include <gflags/gflags.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/codegen-anyval.h" |
| #include "exec/hdfs-scan-node.h" |
| #include "exec/parquet/parquet-collection-column-reader.h" |
| #include "exec/parquet/parquet-column-readers.h" |
| #include "exec/scanner-context.inline.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/collection-value-builder.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/io/request-context.h" |
| #include "runtime/runtime-filter.inline.h" |
| #include "runtime/runtime-state.h" |
| #include "util/dict-encoding.h" |
| #include "util/pretty-printer.h" |
| #include "util/scope-exit-trigger.h" |
| |
| #include "common/names.h" |
| |
| DECLARE_bool(convert_legacy_hive_parquet_utc_timestamps); |
| |
| using std::move; |
| using std::sort; |
| using namespace impala; |
| using namespace impala::io; |
| |
// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
// has fewer entries, then the entire column is dictionary encoded. This threshold
// is guaranteed to hold for files written by Impala versions 2.9 or below.
// THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;

// Name of this class in the "class.<qualified name>" form; presumably used by codegen
// to look up the class's type in LLVM IR (TODO confirm against the codegen callers).
const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScanner";

// Substitute() template for allocation failures: $0 is the name of the
// HdfsParquetScanner method that failed, $1 the number of bytes requested and $2 a
// description of what the allocation was for.
static const string PARQUET_MEM_LIMIT_EXCEEDED =
    "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
| |
| namespace impala { |
| |
// Names of the summary-statistics counters that IssueInitialRanges() registers on the
// scan node's profile for the ideal and actual row-group reservations. Both counters
// use TUnit::BYTES.
static const string IDEAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupIdealReservation";
static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualReservation";
| |
| Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, |
| const vector<HdfsFileDesc*>& files) { |
| DCHECK(!files.empty()); |
| // Add Parquet-specific counters. |
| ADD_SUMMARY_STATS_COUNTER( |
| scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES); |
| ADD_SUMMARY_STATS_COUNTER( |
| scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES); |
| |
| for (HdfsFileDesc* file : files) { |
| // If the file size is less than 12 bytes, it is an invalid Parquet file. |
| if (file->file_length < 12) { |
| return Status(Substitute("Parquet file $0 has an invalid file length: $1", |
| file->filename, file->file_length)); |
| } |
| } |
| return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files); |
| } |
| |
// Constructs the scanner with default per-file state. Most profile counters are left
// nullptr here and are created in Open(), which also allocates 'perm_pool_', reads the
// footer and creates the column readers.
HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
  : HdfsScanner(scan_node, state),
    // -1 means no row group has been started; NextRowGroup() increments before use.
    row_group_idx_(-1),
    row_group_rows_read_(0),
    // Makes the first GetNextInternal() call advance to the first row group.
    advance_row_group_(true),
    min_max_tuple_(nullptr),
    row_batches_produced_(0),
    scratch_batch_(new ScratchTupleBatch(
        *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
    metadata_range_(nullptr),
    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
    assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
    process_footer_timer_stats_(nullptr),
    num_cols_counter_(nullptr),
    num_stats_filtered_row_groups_counter_(nullptr),
    num_row_groups_counter_(nullptr),
    num_scanners_with_no_reads_counter_(nullptr),
    num_dict_filtered_row_groups_counter_(nullptr),
    parquet_compressed_page_size_counter_(nullptr),
    parquet_uncompressed_page_size_counter_(nullptr),
    coll_items_read_counter_(0),
    codegend_process_scratch_batch_fn_(nullptr),
    page_index_(this) {
  // Only the explicit Start()/Stop() pairs around row assembly should be timed, so the
  // timer is stopped here (presumably it starts running on construction — confirm).
  assemble_rows_timer_.Stop();
}
| |
// Opens the scanner for the scan range in 'context'. The steps run in this order:
//  - create the Parquet profile counters;
//  - look up the codegen'd ProcessScratchBatch function;
//  - allocate the min/max statistics tuple and clone the min/max conjuncts;
//  - collect the runtime filter contexts;
//  - process the file footer;
//  - resolve the file schema and create the column readers.
// After the footer has been processed, the footer's I/O buffers and streams are
// released before the footer status is checked, so they are cleaned up even when
// footer processing fails.
Status HdfsParquetScanner::Open(ScannerContext* context) {
  RETURN_IF_ERROR(HdfsScanner::Open(context));
  // Remember the range of the initial stream; presumably the footer range issued by
  // IssueFooterRanges(). NextRowGroup() reads the original split from its metadata.
  metadata_range_ = stream_->scan_range();
  num_cols_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
  num_stats_filtered_row_groups_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
          TUnit::UNIT);
  num_row_groups_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
  num_row_groups_with_page_index_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroupsWithPageIndex",
          TUnit::UNIT);
  num_stats_filtered_pages_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredPages", TUnit::UNIT);
  num_pages_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumPages", TUnit::UNIT);
  num_scanners_with_no_reads_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
  num_dict_filtered_row_groups_counter_ =
      ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
  process_footer_timer_stats_ =
      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
  parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
      scan_node_->runtime_profile(), "ParquetCompressedPageSize", TUnit::BYTES);
  parquet_uncompressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
      scan_node_->runtime_profile(), "ParquetUncompressedPageSize", TUnit::BYTES);
  process_page_index_stats_ =
      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "PageIndexProcessingTime");

  // Record on the scan node whether this scanner got a codegen'd function.
  codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
      scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
  if (codegend_process_scratch_batch_fn_ == nullptr) {
    scan_node_->IncNumScannersCodegenDisabled();
  } else {
    scan_node_->IncNumScannersCodegenEnabled();
  }

  perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));

  // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
  if (min_max_tuple_desc != nullptr) {
    int64_t tuple_size = min_max_tuple_desc->byte_size();
    uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
    if (buffer == nullptr) {
      string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
          "statistics tuple for file '$1'.", tuple_size, filename());
      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
    }
    min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
  }

  // Clone the min/max statistics conjuncts.
  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
      expr_perm_pool_.get(), context_->expr_results_pool(),
      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));

  // Keep pointers to this range's runtime filter contexts. 'filter_stats_' holds one
  // entry of scanner-local statistics per filter; Close() reports them.
  for (int i = 0; i < context->filter_ctxs().size(); ++i) {
    const FilterContext* ctx = &context->filter_ctxs()[i];
    DCHECK(ctx->filter != nullptr);
    filter_ctxs_.push_back(ctx);
  }
  filter_stats_.resize(filter_ctxs_.size());

  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();

  // Each scan node can process multiple splits. Each split processes the footer once.
  // We use a timer to measure the time taken to ProcessFooter() per split and add
  // this time to the summary-statistics timer.
  MonotonicStopWatch single_footer_process_timer;
  single_footer_process_timer.Start();
  // First process the file metadata in the footer.
  Status footer_status = ProcessFooter();
  single_footer_process_timer.Stop();

  process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());

  // Release I/O buffers immediately to make sure they are cleaned up
  // in case we return a non-OK status anywhere below.
  stream_ = nullptr;
  context_->ReleaseCompletedResources(true);
  context_->ClearStreams();
  RETURN_IF_ERROR(footer_status);

  // Parse the file schema into an internal representation for schema resolution.
  schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
      state_->query_options().parquet_fallback_schema_resolution,
      state_->query_options().parquet_array_resolution));
  RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename()));

  // We've processed the metadata and there are columns that need to be materialized.
  RETURN_IF_ERROR(CreateColumnReaders(
      *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
  COUNTER_SET(num_cols_counter_,
      static_cast<int64_t>(CountScalarColumns(column_readers_)));
  // Set top-level template tuple.
  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];

  RETURN_IF_ERROR(InitDictFilterStructures());
  return Status::OK();
}
| |
// Releases the scanner's resources and reports the completed range to the scan node.
//
// If 'row_batch' is non-null, the remaining row-group resources and the template tuple
// pool are transferred to it. For scan nodes with a row batch queue, the batch is then
// handed to the queue, which takes ownership of it.
//
// If 'row_batch' is null, the pools, column readers and scratch batch are freed
// directly. This happens, for example, when Open() failed or the query was cancelled.
//
// Afterwards, the per-column compression codecs are reported via RangeComplete() and
// the scanner-local runtime filter statistics are added to the shared filter stats.
void HdfsParquetScanner::Close(RowBatch* row_batch) {
  DCHECK(!is_closed_);
  if (row_batch != nullptr) {
    FlushRowGroupResources(row_batch);
    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
    if (scan_node_->HasRowBatchQueue()) {
      // Ownership of 'row_batch' moves to the queue; it is not touched after this.
      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
          unique_ptr<RowBatch>(row_batch));
    }
  } else {
    template_tuple_pool_->FreeAll();
    dictionary_pool_->FreeAll();
    context_->ReleaseCompletedResources(true);
    for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
    // The scratch batch may still contain tuple data. We can get into this case if
    // Open() fails or if the query is cancelled.
    scratch_batch_->ReleaseResources(nullptr);
  }
  if (perm_pool_ != nullptr) {
    perm_pool_->FreeAll();
    perm_pool_.reset();
  }

  // Verify all resources (if any) have been transferred.
  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
  DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);

  // Collect compression types for reporting completed ranges. The reader tree is walked
  // depth-first with an explicit stack. Collection readers contribute only through
  // their children; each scalar reader contributes its codec.
  vector<THdfsCompression::type> compression_types;
  stack<ParquetColumnReader*> readers;
  for (ParquetColumnReader* r: column_readers_) readers.push(r);
  while (!readers.empty()) {
    ParquetColumnReader* reader = readers.top();
    readers.pop();
    if (reader->IsCollectionReader()) {
      CollectionColumnReader* coll_reader = static_cast<CollectionColumnReader*>(reader);
      for (ParquetColumnReader* r: *coll_reader->children()) readers.push(r);
      continue;
    }
    BaseScalarColumnReader* scalar_reader = static_cast<BaseScalarColumnReader*>(reader);
    compression_types.push_back(scalar_reader->codec());
  }
  assemble_rows_timer_.Stop();
  assemble_rows_timer_.ReleaseCounter();

  // If this was a metadata only read (i.e. count(*)), there are no columns.
  if (compression_types.empty()) {
    compression_types.push_back(THdfsCompression::NONE);
    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types, true);
  } else {
    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
  }
  if (schema_resolver_.get() != nullptr) schema_resolver_.reset();

  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);

  // Add this scanner's per-filter row counts to the shared filter statistics.
  for (int i = 0; i < filter_ctxs_.size(); ++i) {
    const FilterStats* stats = filter_ctxs_[i]->stats;
    const LocalFilterStats& local = filter_stats_[i];
    stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
        local.considered, local.rejected);
  }

  CloseInternal();
}
| |
| // Get the start of the column. |
| static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) { |
| if (column.__isset.dictionary_page_offset) { |
| DCHECK_LT(column.dictionary_page_offset, column.data_page_offset); |
| return column.dictionary_page_offset; |
| } |
| return column.data_page_offset; |
| } |
| |
| // Get the file offset of the middle of the row group. |
| static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) { |
| int64_t start_offset = GetColumnStartOffset(row_group.columns[0].meta_data); |
| |
| const parquet::ColumnMetaData& last_column = |
| row_group.columns[row_group.columns.size() - 1].meta_data; |
| int64_t end_offset = |
| GetColumnStartOffset(last_column) + last_column.total_compressed_size; |
| |
| return start_offset + (end_offset - start_offset) / 2; |
| } |
| |
| // Returns true if 'row_group' overlaps with 'split_range'. |
| static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group, |
| const ScanRange* split_range) { |
| int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data); |
| |
| const parquet::ColumnMetaData& last_column = |
| row_group.columns[row_group.columns.size() - 1].meta_data; |
| int64_t row_group_end = |
| GetColumnStartOffset(last_column) + last_column.total_compressed_size; |
| |
| int64_t split_start = split_range->offset(); |
| int64_t split_end = split_start + split_range->len(); |
| |
| return (split_start >= row_group_start && split_start < row_group_end) || |
| (split_end > row_group_start && split_end <= row_group_end) || |
| (split_start <= row_group_start && split_end >= row_group_end); |
| } |
| |
| int HdfsParquetScanner::CountScalarColumns( |
| const vector<ParquetColumnReader*>& column_readers) { |
| DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star()); |
| int num_columns = 0; |
| stack<ParquetColumnReader*> readers; |
| for (ParquetColumnReader* r: column_readers_) readers.push(r); |
| while (!readers.empty()) { |
| ParquetColumnReader* col_reader = readers.top(); |
| readers.pop(); |
| if (col_reader->IsCollectionReader()) { |
| CollectionColumnReader* collection_reader = |
| static_cast<CollectionColumnReader*>(col_reader); |
| for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r); |
| continue; |
| } |
| ++num_columns; |
| } |
| return num_columns; |
| } |
| |
| Status HdfsParquetScanner::ProcessSplit() { |
| DCHECK(scan_node_->HasRowBatchQueue()); |
| HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_); |
| do { |
| if (FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY, |
| context_->filter_ctxs())) { |
| eos_ = true; |
| break; |
| } |
| unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(), |
| state_->batch_size(), scan_node_->mem_tracker()); |
| Status status = GetNextInternal(batch.get()); |
| // Always add batch to the queue because it may contain data referenced by previously |
| // appended batches. |
| scan_node->AddMaterializedRowBatch(move(batch)); |
| RETURN_IF_ERROR(status); |
| ++row_batches_produced_; |
| if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) { |
| CheckFiltersEffectiveness(); |
| } |
| } while (!eos_ && !scan_node_->ReachedLimitShared()); |
| return Status::OK(); |
| } |
| |
// Fills 'row_batch' with the next rows of this split in one of three modes:
//  1. count(*) optimization: emits one row per row group selected by NextRowGroup().
//     The count slot is set to the row group's num_rows from the footer, and no column
//     data is read.
//  2. Zero-slot scan (no materialized slots, e.g. "select 1 from t"): emits template
//     tuple rows until 'file_metadata_.num_rows' rows have been produced. No column
//     data is read.
//  3. Regular scan: drains leftover tuples from the scratch batch, advances to the next
//     row group when needed, then assembles rows from the column readers.
// Sets 'eos_' once the split is exhausted. A non-OK 'parse_status_' left by row
// assembly is passed to RuntimeState::LogOrReturnError() and then cleared.
Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
  if (scan_node_->optimize_parquet_count_star()) {
    // Populate the single slot with the Parquet num rows statistic.
    int64_t tuple_buf_size;
    uint8_t* tuple_buf;
    // We try to allocate a smaller row batch here because in most cases the number of
    // row groups in a file is much lower than the default row batch capacity.
    // NOTE(review): the buffer is sized from GetRowSize() and the possibly-adjusted
    // 'capacity', but the loop below advances by tuple_desc()->byte_size() and is
    // bounded by row_batch's own capacity. This assumes the tuple is no larger than
    // the row size and that 'capacity' is not reduced below the number of row groups
    // emitted — confirm.
    int capacity = min(
        static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
        &capacity, &tuple_buf_size, &tuple_buf));
    while (!row_batch->AtCapacity()) {
      RETURN_IF_ERROR(NextRowGroup());
      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
      InitTuple(template_tuple_, dst_tuple);
      int64_t* dst_slot =
          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
      row_group_rows_read_ += *dst_slot;
      dst_row->SetTuple(0, dst_tuple);
      row_batch->CommitLastRow();
      tuple_buf += scan_node_->tuple_desc()->byte_size();
    }
    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
    return Status::OK();
  } else if (scan_node_->IsZeroSlotTableScan()) {
    // There are no materialized slots and we are not optimizing count(*), e.g.
    // "select 1 from alltypes". We can serve this query from just the file metadata.
    // We don't need to read the column data.
    if (row_group_rows_read_ == file_metadata_.num_rows) {
      eos_ = true;
      return Status::OK();
    }
    assemble_rows_timer_.Start();
    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
    Status status = CommitRows(row_batch, num_to_commit);
    assemble_rows_timer_.Stop();
    RETURN_IF_ERROR(status);
    row_group_rows_read_ += max_tuples;
    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
    return Status::OK();
  }

  // Transfer remaining tuples from the scratch batch.
  if (!scratch_batch_->AtEnd()) {
    assemble_rows_timer_.Start();
    int num_row_to_commit = TransferScratchTuples(row_batch);
    assemble_rows_timer_.Stop();
    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
    if (row_batch->AtCapacity()) return Status::OK();
  }

  // Move to the next row group when asked to, or when the current one is fully read.
  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
    // Transfer resources and clear streams if there is any leftover from the previous
    // row group. We will create new streams for the next row group.
    FlushRowGroupResources(row_batch);
    if (!advance_row_group_) {
      // The current row group was read to its end; check it was consistent.
      Status status =
          ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
      if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
    }
    RETURN_IF_ERROR(NextRowGroup());
    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
    if (row_group_idx_ == file_metadata_.row_groups.size()) {
      eos_ = true;
      DCHECK(parse_status_.ok());
      return Status::OK();
    }
  }

  // Apply any runtime filters to static tuples containing the partition keys for this
  // partition. If any filter fails, we return immediately and stop processing this
  // scan range.
  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
    eos_ = true;
    DCHECK(parse_status_.ok());
    return Status::OK();
  }
  assemble_rows_timer_.Start();
  Status status = AssembleRows(column_readers_, row_batch, &advance_row_group_);
  assemble_rows_timer_.Stop();
  RETURN_IF_ERROR(status);
  if (!parse_status_.ok()) {
    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
    parse_status_ = Status::OK();
  }

  return Status::OK();
}
| |
| Status HdfsParquetScanner::EvaluateStatsConjuncts( |
| const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group, |
| bool* skip_row_group) { |
| *skip_row_group = false; |
| |
| if (!state_->query_options().parquet_read_statistics) return Status::OK(); |
| |
| const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc(); |
| if (!min_max_tuple_desc) return Status::OK(); |
| |
| int64_t tuple_size = min_max_tuple_desc->byte_size(); |
| |
| DCHECK(min_max_tuple_ != nullptr); |
| min_max_tuple_->Init(tuple_size); |
| |
| DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size()); |
| for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) { |
| SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i]; |
| ScalarExprEvaluator* eval = min_max_conjunct_evals_[i]; |
| |
| // Resolve column path to determine col idx. |
| SchemaNode* node = nullptr; |
| bool pos_field; |
| bool missing_field; |
| RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(), |
| &node, &pos_field, &missing_field)); |
| |
| if (missing_field) { |
| // We are selecting a column that is not in the file. We would set its slot to NULL |
| // during the scan, so any predicate would evaluate to false. Return early. NULL |
| // comparisons cannot happen here, since predicates with NULL literals are filtered |
| // in the frontend. |
| *skip_row_group = true; |
| break; |
| } |
| |
| if (pos_field) { |
| // The planner should not send predicates with 'pos' for stats filtering to the BE. |
| // In case there is a bug, we return an error, which will abort the query. |
| stringstream err; |
| err << "Statistics not supported for pos fields: " << slot_desc->DebugString(); |
| DCHECK(false) << err.str(); |
| return Status(err.str()); |
| } |
| |
| int col_idx = node->col_idx; |
| DCHECK_LT(col_idx, row_group.columns.size()); |
| |
| const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders; |
| const parquet::ColumnOrder* col_order = col_idx < col_orders.size() ? |
| &col_orders[col_idx] : nullptr; |
| |
| const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx]; |
| const ColumnType& col_type = slot_desc->type(); |
| |
| DCHECK(node->element != nullptr); |
| |
| ColumnStatsReader stat_reader = CreateColumnStatsReader(col_chunk, col_type, |
| col_order, *node->element); |
| |
| int64_t null_count = 0; |
| bool null_count_result = stat_reader.ReadNullCountStat(&null_count); |
| if (null_count_result && null_count == col_chunk.meta_data.num_values) { |
| *skip_row_group = true; |
| break; |
| } |
| |
| const string& fn_name = eval->root().function_name(); |
| ColumnStatsReader::StatsField stats_field; |
| if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue; |
| |
| void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset()); |
| bool stats_read = stat_reader.ReadFromThrift(stats_field, slot); |
| |
| if (stats_read) { |
| TupleRow row; |
| row.SetTuple(0, min_max_tuple_); |
| if (!ExecNode::EvalPredicate(eval, &row)) { |
| *skip_row_group = true; |
| break; |
| } |
| } |
| } |
| |
| // Free any expr result allocations accumulated during conjunct evaluation. |
| context_->expr_results_pool()->Clear(); |
| return Status::OK(); |
| } |
| |
// Advances 'row_group_idx_' to the next row group this scanner should read and
// prepares the column readers for it. Resets 'row_group_rows_read_' and
// 'advance_row_group_'.
//
// A row group is skipped when any of the following holds:
//  - it is empty;
//  - its mid-point falls outside this scanner's split;
//  - it is eliminated by row-group statistics, the page index or dictionary filters.
//
// Errors from page-index processing, dictionary filtering and dictionary
// initialization go to RuntimeState::LogOrReturnError(). If that returns OK, the
// page-index error is ignored, and for the dictionary errors the row group is skipped.
//
// When no row group is left, 'row_group_idx_' ends up >= file_metadata_.row_groups.size(),
// which callers use to detect end of split.
Status HdfsParquetScanner::NextRowGroup() {
  // The split originally assigned to this scanner. Row groups are attributed to splits
  // by their mid-point below.
  const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
      metadata_range_->meta_data())->original_split;
  int64_t split_offset = split_range->offset();
  int64_t split_length = split_range->len();

  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
      context_->partition_descriptor()->id(), filename());

  // Used to count scanners that end up reading nothing because every row group they
  // overlapped was assigned to another scanner.
  bool start_with_first_row_group = row_group_idx_ == -1;
  bool misaligned_row_group_skipped = false;

  advance_row_group_ = false;
  row_group_rows_read_ = 0;

  // Loop until we have found a non-empty row group, and successfully initialized and
  // seeded the column readers. Return a non-OK status from within loop only if the error
  // is non-recoverable, otherwise log the error and continue with the next row group.
  while (true) {
    // Reset the parse status for the next row group.
    parse_status_ = Status::OK();
    // Make sure that we don't have leftover resources from the file metadata scan range
    // or previous row groups.
    DCHECK_EQ(0, context_->NumStreams());

    ++row_group_idx_;
    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
      if (start_with_first_row_group && misaligned_row_group_skipped) {
        // We started with the first row group and skipped all the row groups because
        // they were misaligned. The execution flow won't reach this point if there is at
        // least one non-empty row group which this scanner can process.
        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
      }
      break;
    }
    const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
    // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
    // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
    // but some data in row groups.
    if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;

    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
        file_desc->filename, file_desc->file_length, row_group));

    // A row group is processed by the scanner whose split overlaps with the row
    // group's mid point.
    int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
    if (!(row_group_mid_pos >= split_offset &&
        row_group_mid_pos < split_offset + split_length)) {
      // The mid-point does not fall within the split, this row group will be handled by a
      // different scanner.
      // If the row group overlaps with the split, we found a misaligned row group.
      misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
      continue;
    }

    COUNTER_ADD(num_row_groups_counter_, 1);
    if (!row_group.columns.empty() &&
        row_group.columns.front().__isset.offset_index_offset) {
      COUNTER_ADD(num_row_groups_with_page_index_counter_, 1);
    }

    // Evaluate row group statistics.
    bool skip_row_group_on_stats;
    RETURN_IF_ERROR(
        EvaluateStatsConjuncts(file_metadata_, row_group, &skip_row_group_on_stats));
    if (skip_row_group_on_stats) {
      COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
      continue;
    }

    // Evaluate page index.
    if (!min_max_conjunct_evals_.empty() &&
        state_->query_options().parquet_read_page_index) {
      bool filter_pages;
      Status page_index_status = ProcessPageIndex(&filter_pages);
      if (!page_index_status.ok()) {
        RETURN_IF_ERROR(state_->LogOrReturnError(page_index_status.msg()));
      }
      if (filter_pages && candidate_ranges_.empty()) {
        // Page level statistics filtered the whole row group. It can happen when there
        // is a gap in the data between the pages and the user's predicate hit that gap.
        // E.g. column chunk 'A' has two pages with statistics {min: 0, max: 5},
        // {min: 10, max: 20}, and query is 'select * from T where A = 8'.
        // It can also happen when there are predicates against different columns, and
        // the passing row ranges of the predicates don't have a common subset.
        COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
        continue;
      }
    }

    InitCollectionColumns();
    RETURN_IF_ERROR(InitScalarColumns());

    // Start scanning dictionary filtering column readers, so we can read the dictionary
    // pages in EvalDictionaryFilters().
    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));

    // StartScans() may have allocated resources to scan columns. If we skip this row
    // group below, we must call ReleaseSkippedRowGroupResources() before continuing.

    // If there is a dictionary-encoded column where every value is eliminated
    // by a conjunct, the row group can be eliminated. This initializes dictionaries
    // for all columns visited.
    bool skip_row_group_on_dict_filters;
    Status status = EvalDictionaryFilters(row_group, &skip_row_group_on_dict_filters);
    if (!status.ok()) {
      // Either return an error or skip this row group if it is ok to ignore errors
      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
      ReleaseSkippedRowGroupResources();
      continue;
    }
    if (skip_row_group_on_dict_filters) {
      COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
      ReleaseSkippedRowGroupResources();
      continue;
    }

    // At this point, the row group has passed any filtering criteria
    // Start scanning non-dictionary filtering column readers and initialize their
    // dictionaries.
    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
    status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
    if (!status.ok()) {
      // Either return an error or skip this row group if it is ok to ignore errors
      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
      ReleaseSkippedRowGroupResources();
      continue;
    }
    DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
    break;
  }
  DCHECK(parse_status_.ok());
  return Status::OK();
}
| |
| bool HdfsParquetScanner::ReadStatFromIndex(const ColumnStatsReader& stats_reader, |
| const parquet::ColumnIndex& column_index, int page_idx, |
| ColumnStatsReader::StatsField stats_field, bool* is_null_page, void* slot) { |
| *is_null_page = column_index.null_pages[page_idx]; |
| if (*is_null_page) return false; |
| switch (stats_field) { |
| case ColumnStatsReader::StatsField::MIN: |
| return stats_reader.ReadFromString( |
| stats_field, column_index.min_values[page_idx], slot); |
| case ColumnStatsReader::StatsField::MAX: |
| return stats_reader.ReadFromString( |
| stats_field, column_index.max_values[page_idx], slot); |
| default: |
| DCHECK(false); |
| } |
| return false; |
| } |
| |
| Status HdfsParquetScanner::ProcessPageIndex(bool* filter_pages) { |
| MonotonicStopWatch single_process_page_index_timer; |
| single_process_page_index_timer.Start(); |
| candidate_ranges_.clear(); |
| *filter_pages = false; |
| for (auto& scalar_reader : scalar_readers_) scalar_reader->ResetPageFiltering(); |
| RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_)); |
| if (page_index_.IsEmpty()) return Status::OK(); |
| // We can release the raw page index buffer when we exit this function. |
| const auto scope_exit = MakeScopeExitTrigger([this](){page_index_.Release();}); |
| RETURN_IF_ERROR(EvaluatePageIndex(filter_pages)); |
| RETURN_IF_ERROR(ComputeCandidatePagesForColumns(filter_pages)); |
| single_process_page_index_timer.Stop(); |
| process_page_index_stats_->UpdateCounter(single_process_page_index_timer.ElapsedTime()); |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::EvaluatePageIndex(bool* filter_pages) { |
| parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; |
| vector<RowRange> skip_ranges; |
| |
| for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) { |
| ScalarExprEvaluator* eval = min_max_conjunct_evals_[i]; |
| SlotDescriptor* slot_desc = scan_node_->min_max_tuple_desc()->slots()[i]; |
| |
| // Resolve column path to determine col idx. |
| SchemaNode* node = nullptr; |
| bool pos_field, missing_field; |
| RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(), &node, |
| &pos_field, &missing_field)); |
| if (pos_field || missing_field) continue; |
| |
| int col_idx = node->col_idx; |
| DCHECK_LT(col_idx, row_group.columns.size()); |
| const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx]; |
| if (col_chunk.column_index_length == 0) continue; |
| |
| parquet::ColumnIndex column_index; |
| RETURN_IF_ERROR(page_index_.DeserializeColumnIndex(col_chunk, &column_index)); |
| |
| const ColumnType& col_type = slot_desc->type(); |
| const vector<parquet::ColumnOrder>& col_orders = file_metadata_.column_orders; |
| const parquet::ColumnOrder* col_order = col_idx < col_orders.size() ? |
| &col_orders[col_idx] : nullptr; |
| ColumnStatsReader stats_reader = CreateColumnStatsReader(col_chunk, col_type, |
| col_order, *node->element); |
| |
| min_max_tuple_->Init(scan_node_->min_max_tuple_desc()->byte_size()); |
| void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset()); |
| |
| const int num_of_pages = column_index.null_pages.size(); |
| const string& fn_name = eval->root().function_name(); |
| ColumnStatsReader::StatsField stats_field; |
| if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue; |
| |
| for (int page_idx = 0; page_idx < num_of_pages; ++page_idx) { |
| bool value_read, is_null_page; |
| value_read = ReadStatFromIndex(stats_reader, column_index, page_idx, stats_field, |
| &is_null_page, slot); |
| if (!is_null_page && !value_read) continue; |
| TupleRow row; |
| row.SetTuple(0, min_max_tuple_); |
| if (is_null_page || !ExecNode::EvalPredicate(eval, &row)) { |
| BaseScalarColumnReader* scalar_reader = scalar_reader_map_[col_idx]; |
| RETURN_IF_ERROR(page_index_.DeserializeOffsetIndex(col_chunk, |
| &scalar_reader->offset_index_)); |
| RowRange row_range; |
| GetRowRangeForPage(row_group, scalar_reader->offset_index_, page_idx, &row_range); |
| skip_ranges.push_back(row_range); |
| } |
| } |
| } |
| if (skip_ranges.empty()) return Status::OK(); |
| |
| for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { |
| const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()]; |
| if (col_chunk.offset_index_length > 0) { |
| parquet::OffsetIndex& offset_index = scalar_reader->offset_index_; |
| if (!offset_index.page_locations.empty()) continue; |
| RETURN_IF_ERROR(page_index_.DeserializeOffsetIndex(col_chunk, &offset_index)); |
| } else { |
| // We can only filter pages based on the page index if we have the offset index |
| // for all columns. |
| return Status(Substitute("Found column index, but no offset index for '$0' in " |
| "file '$1'", scalar_reader->schema_element().name, filename())); |
| } |
| } |
| if (!ComputeCandidateRanges(row_group.num_rows, &skip_ranges, &candidate_ranges_)) { |
| return Status(Substitute("Invalid offset index in Parquet file $0.", filename())); |
| } |
| *filter_pages = true; |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::ComputeCandidatePagesForColumns(bool* filter_pages) { |
| if (candidate_ranges_.empty()) return Status::OK(); |
| |
| parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; |
| for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { |
| const auto& page_locations = scalar_reader->offset_index_.page_locations; |
| if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows, |
| &scalar_reader->candidate_data_pages_)) { |
| *filter_pages = false; |
| return Status(Substitute("Invalid offset index in Parquet file $0.", filename())); |
| } |
| } |
| for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { |
| const auto& page_locations = scalar_reader->offset_index_.page_locations; |
| int total_page_count = page_locations.size(); |
| int candidate_pages_count = scalar_reader->candidate_data_pages_.size(); |
| COUNTER_ADD(num_stats_filtered_pages_counter_, |
| total_page_count - candidate_pages_count); |
| COUNTER_ADD(num_pages_counter_, total_page_count); |
| } |
| return Status::OK(); |
| } |
| |
| void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) { |
| DCHECK(row_batch != nullptr); |
| row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false); |
| scratch_batch_->ReleaseResources(row_batch->tuple_data_pool()); |
| context_->ReleaseCompletedResources(true); |
| for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch); |
| context_->ClearStreams(); |
| } |
| |
| void HdfsParquetScanner::ReleaseSkippedRowGroupResources() { |
| dictionary_pool_->FreeAll(); |
| scratch_batch_->ReleaseResources(nullptr); |
| context_->ReleaseCompletedResources(true); |
| for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr); |
| context_->ClearStreams(); |
| } |
| |
| bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) { |
| const SlotDescriptor* slot_desc = col_reader->slot_desc(); |
| // Some queries do not need the column to be materialized, so slot_desc is NULL. |
| // For example, a count(*) with no predicates only needs to count records |
| // rather than materializing the values. |
| if (!slot_desc) return false; |
| // Does this column reader have any dictionary filter conjuncts? |
| auto dict_filter_it = dict_filter_map_.find(slot_desc->id()); |
| if (dict_filter_it == dict_filter_map_.end()) return false; |
| |
| // Certain datatypes (chars, timestamps) do not have the appropriate value in the |
| // file format and must be converted before return. This is true for the |
| // dictionary values, so skip these datatypes for now. |
| // TODO: The values should be converted during dictionary construction and stored |
| // in converted form in the dictionary. |
| if (col_reader->NeedsConversion()) return false; |
| |
| // Certain datatypes (timestamps, date) need to validate the value, as certain bit |
| // combinations are not valid. The dictionary values are not validated, so skip |
| // these datatypes for now. |
| // TODO: This should be pushed into dictionary construction. |
| if (col_reader->NeedsValidation()) return false; |
| |
| return true; |
| } |
| |
| void HdfsParquetScanner::PartitionReaders( |
| const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) { |
| for (auto* reader : readers) { |
| if (reader->IsCollectionReader()) { |
| CollectionColumnReader* col_reader = static_cast<CollectionColumnReader*>(reader); |
| collection_readers_.push_back(col_reader); |
| PartitionReaders(*col_reader->children(), can_eval_dict_filters); |
| } else { |
| BaseScalarColumnReader* scalar_reader = |
| static_cast<BaseScalarColumnReader*>(reader); |
| scalar_readers_.push_back(scalar_reader); |
| if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) { |
| dict_filterable_readers_.push_back(scalar_reader); |
| } else { |
| non_dict_filterable_readers_.push_back(scalar_reader); |
| } |
| } |
| } |
| } |
| |
| Status HdfsParquetScanner::InitDictFilterStructures() { |
| bool can_eval_dict_filters = |
| state_->query_options().parquet_dictionary_filtering && !dict_filter_map_.empty(); |
| |
| // Separate column readers into scalar and collection readers. |
| PartitionReaders(column_readers_, can_eval_dict_filters); |
| |
| // Allocate tuple buffers for all tuple descriptors that are associated with conjuncts |
| // that can be dictionary filtered. |
| for (auto* col_reader : dict_filterable_readers_) { |
| const SlotDescriptor* slot_desc = col_reader->slot_desc(); |
| const TupleDescriptor* tuple_desc = slot_desc->parent(); |
| auto tuple_it = dict_filter_tuple_map_.find(tuple_desc); |
| if (tuple_it != dict_filter_tuple_map_.end()) continue; |
| int tuple_size = tuple_desc->byte_size(); |
| if (tuple_size > 0) { |
| uint8_t* buffer = perm_pool_->TryAllocate(tuple_size); |
| if (buffer == nullptr) { |
| string details = Substitute( |
| PARQUET_MEM_LIMIT_EXCEEDED, "InitDictFilterStructures", tuple_size, |
| "Dictionary Filtering Tuple"); |
| return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size); |
| } |
| dict_filter_tuple_map_[tuple_desc] = reinterpret_cast<Tuple*>(buffer); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| bool HdfsParquetScanner::IsDictionaryEncoded( |
| const parquet::ColumnMetaData& col_metadata) { |
| // The Parquet spec allows for column chunks to have mixed encodings |
| // where some data pages are dictionary-encoded and others are plain |
| // encoded. For example, a Parquet file writer might start writing |
| // a column chunk as dictionary encoded, but it will switch to plain |
| // encoding if the dictionary grows too large. |
| // |
| // In order for dictionary filters to skip the entire row group, |
| // the conjuncts must be evaluated on column chunks that are entirely |
| // encoded with the dictionary encoding. There are two checks |
| // available to verify this: |
| // 1. The encoding_stats field on the column chunk metadata provides |
| // information about the number of data pages written in each |
| // format. This allows for a specific check of whether all the |
| // data pages are dictionary encoded. |
| // 2. The encodings field on the column chunk metadata lists the |
| // encodings used. If this list contains the dictionary encoding |
| // and does not include unexpected encodings (i.e. encodings not |
| // associated with definition/repetition levels), then it is entirely |
| // dictionary encoded. |
| |
| if (col_metadata.__isset.encoding_stats) { |
| // Condition #1 above |
| for (const parquet::PageEncodingStats& enc_stat : col_metadata.encoding_stats) { |
| if (enc_stat.page_type == parquet::PageType::DATA_PAGE && |
| enc_stat.encoding != parquet::Encoding::PLAIN_DICTIONARY && |
| enc_stat.count > 0) { |
| return false; |
| } |
| } |
| } else { |
| // Condition #2 above |
| bool has_dict_encoding = false; |
| bool has_nondict_encoding = false; |
| for (const parquet::Encoding::type& encoding : col_metadata.encodings) { |
| if (encoding == parquet::Encoding::PLAIN_DICTIONARY) has_dict_encoding = true; |
| |
| // RLE and BIT_PACKED are used for repetition/definition levels |
| if (encoding != parquet::Encoding::PLAIN_DICTIONARY && |
| encoding != parquet::Encoding::RLE && |
| encoding != parquet::Encoding::BIT_PACKED) { |
| has_nondict_encoding = true; |
| break; |
| } |
| } |
| // Not entirely dictionary encoded if: |
| // 1. No dictionary encoding listed |
| // OR |
| // 2. Some non-dictionary encoding is listed |
| if (!has_dict_encoding || has_nondict_encoding) return false; |
| } |
| |
| return true; |
| } |
| |
| Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group, |
| bool* row_group_eliminated) { |
| *row_group_eliminated = false; |
| // Check if there's anything to do here. |
| if (dict_filterable_readers_.empty()) return Status::OK(); |
| |
| // Legacy impala files (< 2.9) require special handling, because they do not encode |
| // information about whether the column is 100% dictionary encoded. |
| bool is_legacy_impala = false; |
| if (file_version_.application == "impala" && file_version_.VersionLt(2,9,0)) { |
| is_legacy_impala = true; |
| } |
| |
| // Keeps track of column readers that need to be initialized. For example, if a |
| // column cannot be filtered, then defer its dictionary initialization once we know |
| // the row group cannot be filtered. |
| vector<BaseScalarColumnReader*> deferred_dict_init_list; |
| // Keeps track of the initialized tuple associated with a TupleDescriptor. |
| unordered_map<const TupleDescriptor*, Tuple*> tuple_map; |
| for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) { |
| const parquet::ColumnMetaData& col_metadata = |
| row_group.columns[scalar_reader->col_idx()].meta_data; |
| |
| // Legacy impala files cannot be eliminated here, because the only way to |
| // determine whether the column is 100% dictionary encoded requires reading |
| // the dictionary. |
| if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) { |
| // We cannot guarantee that this reader is 100% dictionary encoded, |
| // so dictionary filters cannot be used. Defer initializing its dictionary |
| // until after the other filters have been evaluated. |
| deferred_dict_init_list.push_back(scalar_reader); |
| continue; |
| } |
| |
| RETURN_IF_ERROR(scalar_reader->InitDictionary()); |
| DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder(); |
| if (!dictionary) continue; |
| |
| // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until |
| // it reaches the maximum number of dictionary entries. If the dictionary |
| // has fewer entries, then it is 100% dictionary encoded. |
| if (is_legacy_impala && |
| dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue; |
| |
| const SlotDescriptor* slot_desc = scalar_reader->slot_desc(); |
| DCHECK(slot_desc != nullptr); |
| const TupleDescriptor* tuple_desc = slot_desc->parent(); |
| auto dict_filter_it = dict_filter_map_.find(slot_desc->id()); |
| DCHECK(dict_filter_it != dict_filter_map_.end()); |
| const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals = |
| dict_filter_it->second; |
| Tuple* dict_filter_tuple = nullptr; |
| auto dict_filter_tuple_it = tuple_map.find(tuple_desc); |
| if (dict_filter_tuple_it == tuple_map.end()) { |
| auto tuple_it = dict_filter_tuple_map_.find(tuple_desc); |
| DCHECK(tuple_it != dict_filter_tuple_map_.end()); |
| dict_filter_tuple = tuple_it->second; |
| dict_filter_tuple->Init(tuple_desc->byte_size()); |
| tuple_map[tuple_desc] = dict_filter_tuple; |
| } else { |
| dict_filter_tuple = dict_filter_tuple_it->second; |
| } |
| |
| DCHECK(dict_filter_tuple != nullptr); |
| void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset()); |
| bool column_has_match = false; |
| for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) { |
| if (dict_idx % 1024 == 0) { |
| // Don't let expr result allocations accumulate too much for large dictionaries or |
| // many row groups. |
| context_->expr_results_pool()->Clear(); |
| } |
| dictionary->GetValue(dict_idx, slot); |
| |
| // We can only eliminate this row group if no value from the dictionary matches. |
| // If any dictionary value passes the conjuncts, then move on to the next column. |
| TupleRow row; |
| row.SetTuple(0, dict_filter_tuple); |
| if (ExecNode::EvalConjuncts(dict_filter_conjunct_evals.data(), |
| dict_filter_conjunct_evals.size(), &row)) { |
| column_has_match = true; |
| break; |
| } |
| } |
| // Free all expr result allocations now that we're done with the filter. |
| context_->expr_results_pool()->Clear(); |
| |
| if (!column_has_match) { |
| // The column contains no value that matches the conjunct. The row group |
| // can be eliminated. |
| *row_group_eliminated = true; |
| return Status::OK(); |
| } |
| } |
| |
| // Any columns that were not 100% dictionary encoded need to initialize |
| // their dictionaries here. |
| RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list)); |
| |
| return Status::OK(); |
| } |
| |
| /// High-level steps of this function: |
| /// 1. Allocate 'scratch' memory for tuples able to hold a full batch |
| /// 2. Populate the slots of all scratch tuples one column reader at a time, |
| /// using the ColumnReader::Read*ValueBatch() functions. |
| /// 3. Evaluate runtime filters and conjuncts against the scratch tuples and |
| /// set the surviving tuples in the output batch. Transfer the ownership of |
| /// scratch memory to the output batch once the scratch memory is exhausted. |
| /// 4. Repeat steps above until we are done with the row group or an error |
| /// occurred. |
| /// TODO: Since the scratch batch is populated in a column-wise fashion, it is |
| /// difficult to maintain a maximum memory footprint without throwing away at least |
| /// some work. This point needs further experimentation and thought. |
| Status HdfsParquetScanner::AssembleRows( |
| const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch, |
| bool* skip_row_group) { |
| DCHECK(!column_readers.empty()); |
| DCHECK(row_batch != nullptr); |
| DCHECK_EQ(*skip_row_group, false); |
| DCHECK(scratch_batch_ != nullptr); |
| |
| int64_t num_rows_read = 0; |
| while (!column_readers[0]->RowGroupAtEnd()) { |
| // Start a new scratch batch. |
| RETURN_IF_ERROR(scratch_batch_->Reset(state_)); |
| InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity); |
| |
| // Materialize the top-level slots into the scratch batch column-by-column. |
| int last_num_tuples = -1; |
| for (int c = 0; c < column_readers.size(); ++c) { |
| ParquetColumnReader* col_reader = column_readers[c]; |
| bool continue_execution; |
| if (col_reader->max_rep_level() > 0) { |
| continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool, |
| scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem, |
| &scratch_batch_->num_tuples); |
| } else { |
| continue_execution = col_reader->ReadNonRepeatedValueBatch( |
| &scratch_batch_->aux_mem_pool, scratch_batch_->capacity, tuple_byte_size_, |
| scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); |
| } |
| // Check that all column readers populated the same number of values. |
| bool num_tuples_mismatch = c != 0 && last_num_tuples != scratch_batch_->num_tuples; |
| if (UNLIKELY(!continue_execution || num_tuples_mismatch)) { |
| // Skipping this row group. Free up all the resources with this row group. |
| FlushRowGroupResources(row_batch); |
| scratch_batch_->num_tuples = 0; |
| DCHECK(scratch_batch_->AtEnd()); |
| *skip_row_group = true; |
| if (num_tuples_mismatch && continue_execution) { |
| Status err(Substitute("Corrupt Parquet file '$0': column '$1' " |
| "had $2 remaining values but expected $3", filename(), |
| col_reader->schema_element().name, last_num_tuples, |
| scratch_batch_->num_tuples)); |
| parse_status_.MergeStatus(err); |
| } |
| return Status::OK(); |
| } |
| last_num_tuples = scratch_batch_->num_tuples; |
| } |
| RETURN_IF_ERROR(CheckPageFiltering()); |
| num_rows_read += scratch_batch_->num_tuples; |
| int num_row_to_commit = TransferScratchTuples(row_batch); |
| RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit)); |
| if (row_batch->AtCapacity()) break; |
| } |
| row_group_rows_read_ += num_rows_read; |
| COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); |
| // Merge Scanner-local counter into HdfsScanNode counter and reset. |
| COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_); |
| coll_items_read_counter_ = 0; |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::CheckPageFiltering() { |
| if (candidate_ranges_.empty() || scalar_readers_.empty()) return Status::OK(); |
| |
| int64_t current_row = scalar_readers_[0]->LastProcessedRow(); |
| for (int i = 1; i < scalar_readers_.size(); ++i) { |
| if (current_row != scalar_readers_[i]->LastProcessedRow()) { |
| DCHECK(false); |
| return Status(Substitute( |
| "Top level rows aren't in sync during page filtering in file $0.", filename())); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { |
| DCHECK(dst_batch != nullptr); |
| dst_batch->CommitRows(num_rows); |
| |
| if (context_->cancelled()) return Status::CancelledInternal("Parquet scanner"); |
| // TODO: It's a really bad idea to propagate UDF error via the global RuntimeState. |
| // Store UDF error in thread local storage or make UDF return status so it can merge |
| // with parse_status_. |
| RETURN_IF_ERROR(state_->GetQueryStatus()); |
| // Clear expr result allocations for this thread to avoid accumulating too much |
| // memory from evaluating the scanner conjuncts. |
| context_->expr_results_pool()->Clear(); |
| return Status::OK(); |
| } |
| |
| int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { |
| // This function must not be called when the output batch is already full. As long as |
| // we always call CommitRows() after TransferScratchTuples(), the output batch can |
| // never be empty. |
| DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity()); |
| DCHECK_EQ(scan_node_->tuple_idx(), 0); |
| DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1); |
| if (scratch_batch_->tuple_byte_size == 0) { |
| Tuple** output_row = |
| reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); |
| // We are materializing a collection with empty tuples. Add a NULL tuple to the |
| // output batch per remaining scratch tuple and return. No need to evaluate |
| // filters/conjuncts. |
| DCHECK(filter_ctxs_.empty()); |
| DCHECK(conjunct_evals_->empty()); |
| int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(), |
| scratch_batch_->num_tuples - scratch_batch_->tuple_idx); |
| memset(output_row, 0, num_tuples * sizeof(Tuple*)); |
| scratch_batch_->tuple_idx += num_tuples; |
| // No data is required to back the empty tuples, so we should not attach any data to |
| // these batches. |
| DCHECK_EQ(0, scratch_batch_->total_allocated_bytes()); |
| return num_tuples; |
| } |
| |
| int num_rows_to_commit; |
| if (codegend_process_scratch_batch_fn_ != nullptr) { |
| num_rows_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch); |
| } else { |
| num_rows_to_commit = ProcessScratchBatch(dst_batch); |
| } |
| scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit); |
| return num_rows_to_commit; |
| } |
| |
| Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node, |
| const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) { |
| DCHECK(node->runtime_state()->ShouldCodegen()); |
| *process_scratch_batch_fn = nullptr; |
| LlvmCodeGen* codegen = node->runtime_state()->codegen(); |
| DCHECK(codegen != nullptr); |
| |
| llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true); |
| DCHECK(fn != nullptr); |
| |
| llvm::Function* eval_conjuncts_fn; |
| RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn)); |
| DCHECK(eval_conjuncts_fn != nullptr); |
| |
| int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| llvm::Function* eval_runtime_filters_fn; |
| RETURN_IF_ERROR(CodegenEvalRuntimeFilters( |
| codegen, node->filter_exprs(), &eval_runtime_filters_fn)); |
| DCHECK(eval_runtime_filters_fn != nullptr); |
| |
| replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| fn->setName("ProcessScratchBatch"); |
| *process_scratch_batch_fn = codegen->FinalizeFunction(fn); |
| if (*process_scratch_batch_fn == nullptr) { |
| return Status("Failed to finalize process_scratch_batch_fn."); |
| } |
| return Status::OK(); |
| } |
| |
| bool HdfsParquetScanner::AssembleCollection( |
| const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level, |
| CollectionValueBuilder* coll_value_builder) { |
| DCHECK(!column_readers.empty()); |
| DCHECK_GE(new_collection_rep_level, 0); |
| DCHECK(coll_value_builder != nullptr); |
| |
| const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc(); |
| Tuple* template_tuple = template_tuple_map_[tuple_desc]; |
| const vector<ScalarExprEvaluator*> evals = |
| conjunct_evals_map_[tuple_desc->id()]; |
| |
| int64_t rows_read = 0; |
| bool continue_execution = !scan_node_->ReachedLimitShared() && !context_->cancelled(); |
| // Note that this will be set to true at the end of the row group or the end of the |
| // current collection (if applicable). |
| bool end_of_collection = column_readers[0]->rep_level() == -1; |
| // We only initialize end_of_collection to true here if we're at the end of the row |
| // group (otherwise it would always be true because we're on the "edge" of two |
| // collections), and only ProcessSplit() should call AssembleRows() at the end of the |
| // row group. |
| if (coll_value_builder != nullptr) DCHECK(!end_of_collection); |
| |
| while (!end_of_collection && continue_execution) { |
| MemPool* pool; |
| Tuple* tuple; |
| TupleRow* row = nullptr; |
| |
| int64_t num_rows; |
| // We're assembling item tuples into an CollectionValue |
| parse_status_ = |
| GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, &num_rows); |
| if (UNLIKELY(!parse_status_.ok())) { |
| continue_execution = false; |
| break; |
| } |
| // 'num_rows' can be very high if we're writing to a large CollectionValue. Limit |
| // the number of rows we read at one time so we don't spend too long in the |
| // 'num_rows' loop below before checking for cancellation or limit reached. |
| num_rows = min( |
| num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size())); |
| |
| int num_to_commit = 0; |
| int row_idx = 0; |
| for (row_idx = 0; row_idx < num_rows && !end_of_collection; ++row_idx) { |
| DCHECK(continue_execution); |
| // A tuple is produced iff the collection that contains its values is not empty and |
| // non-NULL. (Empty or NULL collections produce no output values, whereas NULL is |
| // output for the fields of NULL structs.) |
| bool materialize_tuple = column_readers[0]->def_level() >= |
| column_readers[0]->def_level_of_immediate_repeated_ancestor(); |
| InitTuple(tuple_desc, template_tuple, tuple); |
| continue_execution = |
| ReadCollectionItem(column_readers, materialize_tuple, pool, tuple); |
| if (UNLIKELY(!continue_execution)) break; |
| end_of_collection = column_readers[0]->rep_level() <= new_collection_rep_level; |
| |
| if (materialize_tuple) { |
| if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) { |
| tuple = next_tuple(tuple_desc->byte_size(), tuple); |
| ++num_to_commit; |
| } |
| } |
| } |
| |
| rows_read += row_idx; |
| coll_value_builder->CommitTuples(num_to_commit); |
| continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled(); |
| } |
| coll_items_read_counter_ += rows_read; |
| if (end_of_collection) { |
| // All column readers should report the start of the same collection. |
| for (int c = 1; c < column_readers.size(); ++c) { |
| FILE_CHECK_EQ(column_readers[c]->rep_level(), column_readers[0]->rep_level()); |
| } |
| } |
| return continue_execution; |
| } |
| |
| inline bool HdfsParquetScanner::ReadCollectionItem( |
| const vector<ParquetColumnReader*>& column_readers, |
| bool materialize_tuple, MemPool* pool, Tuple* tuple) const { |
| DCHECK(!column_readers.empty()); |
| bool continue_execution = true; |
| int size = column_readers.size(); |
| for (int c = 0; c < size; ++c) { |
| ParquetColumnReader* col_reader = column_readers[c]; |
| if (materialize_tuple) { |
| // All column readers for this tuple should a value to materialize. |
| FILE_CHECK_GE(col_reader->def_level(), |
| col_reader->def_level_of_immediate_repeated_ancestor()); |
| // Fill in position slot if applicable |
| const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc(); |
| if (pos_slot_desc != nullptr) { |
| col_reader->ReadPositionNonBatched( |
| tuple->GetBigIntSlot(pos_slot_desc->tuple_offset())); |
| } |
| continue_execution = col_reader->ReadValue(pool, tuple); |
| } else { |
| // A containing repeated field is empty or NULL |
| FILE_CHECK_LT(col_reader->def_level(), |
| col_reader->def_level_of_immediate_repeated_ancestor()); |
| continue_execution = col_reader->NextLevels(); |
| } |
| if (UNLIKELY(!continue_execution)) break; |
| } |
| return continue_execution; |
| } |
| |
| Status HdfsParquetScanner::ProcessFooter() { |
| const int64_t file_len = stream_->file_desc()->file_length; |
| const int64_t scan_range_len = stream_->scan_range()->len(); |
| |
| // We're processing the scan range issued in IssueInitialRanges(). The scan range should |
| // be the last FOOTER_BYTES of the file. !success means the file is shorter than we |
| // expect. Note we can't detect if the file is larger than we expect without attempting |
| // to read past the end of the scan range, but in this case we'll fail below trying to |
| // parse the footer. |
| DCHECK_LE(scan_range_len, FOOTER_SIZE); |
| uint8_t* buffer; |
| bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_); |
| if (!success) { |
| DCHECK(!parse_status_.ok()); |
| if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) { |
| VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: " |
| << "metadata states file size to be " |
| << PrettyPrinter::Print(file_len, TUnit::BYTES) |
| << ", but could only read " |
| << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES); |
| return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(), |
| scan_node_->hdfs_table()->fully_qualified_name()); |
| } |
| return parse_status_; |
| } |
| DCHECK(stream_->eosr()); |
| |
| // Number of bytes in buffer after the fixed size footer is accounted for. |
| int remaining_bytes_buffered = scan_range_len - sizeof(int32_t) - |
| sizeof(PARQUET_VERSION_NUMBER); |
| |
| // Make sure footer has enough bytes to contain the required information. |
| if (remaining_bytes_buffered < 0) { |
| return Status(Substitute("File '$0' is invalid. Missing metadata.", filename())); |
| } |
| |
| // Validate magic file bytes are correct. |
| uint8_t* magic_number_ptr = buffer + scan_range_len - sizeof(PARQUET_VERSION_NUMBER); |
| if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER, |
| sizeof(PARQUET_VERSION_NUMBER)) != 0) { |
| return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, filename(), |
| string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)), |
| scan_node_->hdfs_table()->fully_qualified_name()); |
| } |
| |
| // The size of the metadata is encoded as a 4 byte little endian value before |
| // the magic number |
| uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t); |
| uint32_t metadata_size = *reinterpret_cast<uint32_t*>(metadata_size_ptr); |
| // The start of the metadata is: |
| // file_len - 4-byte footer length field - 4-byte version number field - metadata size |
| int64_t metadata_start = file_len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - |
| metadata_size; |
| if (UNLIKELY(metadata_start < 0)) { |
| return Status(Substitute("File '$0' is invalid. Invalid metadata size in file " |
| "footer: $1 bytes. File size: $2 bytes.", |
| filename(), metadata_size, file_len)); |
| } |
| uint8_t* metadata_ptr = metadata_size_ptr - metadata_size; |
| |
| // If the metadata was too big, we need to read it into a contiguous buffer before |
| // deserializing it. |
| ScopedBuffer metadata_buffer(scan_node_->mem_tracker()); |
| |
| DCHECK(metadata_range_ != nullptr); |
| if (UNLIKELY(metadata_size > remaining_bytes_buffered)) { |
| // In this case, the metadata is bigger than our guess meaning there are |
| // not enough bytes in the footer range from IssueInitialRanges(). |
| // We'll just issue more ranges to the IoMgr that is the actual footer. |
| int64_t partition_id = context_->partition_descriptor()->id(); |
| const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); |
| DCHECK_EQ(file_desc, stream_->file_desc()); |
| |
| if (!metadata_buffer.TryAllocate(metadata_size)) { |
| string details = Substitute("Could not allocate buffer of $0 bytes for Parquet " |
| "metadata for file '$1'.", metadata_size, filename()); |
| return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, metadata_size); |
| } |
| metadata_ptr = metadata_buffer.buffer(); |
| |
| // Read the footer into the metadata buffer. Skip HDFS caching in this case. |
| int cache_options = metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE; |
| ScanRange* metadata_range = scan_node_->AllocateScanRange( |
| metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id, |
| metadata_range_->disk_id(), metadata_range_->expected_local(), |
| metadata_range_->is_erasure_coded(), metadata_range_->mtime(), |
| BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size, cache_options)); |
| |
| unique_ptr<BufferDescriptor> io_buffer; |
| bool needs_buffers; |
| RETURN_IF_ERROR( |
| scan_node_->reader_context()->StartScanRange(metadata_range, &needs_buffers)); |
| DCHECK(!needs_buffers) << "Already provided a buffer"; |
| RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer)); |
| DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer()); |
| DCHECK_EQ(io_buffer->len(), metadata_size); |
| DCHECK(io_buffer->eosr()); |
| metadata_range->ReturnBuffer(move(io_buffer)); |
| } |
| |
| // Deserialize file footer |
| // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this. |
| Status status = |
| DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_); |
| if (!status.ok()) { |
| return Status(Substitute("File '$0' of length $1 bytes has invalid file metadata " |
| "at file offset $2, Error = $3.", filename(), file_len, metadata_start, |
| status.GetDetail())); |
| } |
| |
| RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename())); |
| |
| // IMPALA-3943: Do not throw an error for empty files for backwards compatibility. |
| if (file_metadata_.num_rows == 0) { |
| // Warn if the num_rows is inconsistent with the row group metadata. |
| if (!file_metadata_.row_groups.empty()) { |
| bool has_non_empty_row_group = false; |
| for (const parquet::RowGroup& row_group : file_metadata_.row_groups) { |
| if (row_group.num_rows > 0) { |
| has_non_empty_row_group = true; |
| break; |
| } |
| } |
| // Warn if there is at least one non-empty row group. |
| if (has_non_empty_row_group) { |
| ErrorMsg msg(TErrorCode::PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE, filename()); |
| state_->LogError(msg); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Parse out the created by application version string |
| if (file_metadata_.__isset.created_by) { |
| file_version_ = ParquetFileVersion(file_metadata_.created_by); |
| } |
| if (file_metadata_.row_groups.empty()) { |
| return Status( |
| Substitute("Invalid file. This file: $0 has no row groups", filename())); |
| } |
| if (file_metadata_.num_rows < 0) { |
| return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in " |
| "file metadata", filename(), file_metadata_.num_rows)); |
| } |
| return Status::OK(); |
| } |
| |
// Appends to 'column_readers' (which must be empty on entry) one column reader for each
// slot of 'tuple_desc' that 'schema_resolver' resolves to a field present in the file.
//  - Partition-key slots of the scan node's top-level tuple are skipped.
//  - Slots whose field is missing from the file get no reader. Instead they are set to
//    NULL in the template tuple for 'tuple_desc', which is created on demand.
//  - A position slot gets no reader of its own. It is attached to the first reader once
//    all other slots have been handled.
//  - Collection slots get a collection reader whose children are created by recursing
//    on the collection's item tuple descriptor.
//  - Non-collection readers are also registered in 'scalar_reader_map_', keyed by the
//    schema node's column index.
// If no reader was created, a single reader from CreateCountingReader() is appended.
// Returns immediately with no readers when the scan node's
// optimize_parquet_count_star() is enabled.
Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc,
    const ParquetSchemaResolver& schema_resolver,
    vector<ParquetColumnReader*>* column_readers) {
  DCHECK(column_readers != nullptr);
  DCHECK(column_readers->empty());

  if (scan_node_->optimize_parquet_count_star()) {
    // Column readers are not needed because we are not reading from any columns if this
    // optimization is enabled.
    return Status::OK();
  }

  // Each tuple can have at most one position slot. We'll process this slot desc last.
  SlotDescriptor* pos_slot_desc = nullptr;

  for (SlotDescriptor* slot_desc: tuple_desc.slots()) {
    // Skip partition columns. They only exist in the scan node's top-level tuple, not
    // in nested item tuples reached through recursion.
    if (&tuple_desc == scan_node_->tuple_desc() &&
        slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;

    SchemaNode* node = nullptr;
    bool pos_field;
    bool missing_field;
    RETURN_IF_ERROR(schema_resolver.ResolvePath(
        slot_desc->col_path(), &node, &pos_field, &missing_field));

    if (missing_field) {
      // In this case, we are selecting a column that is not in the file.
      // Update the template tuple to put a NULL in this slot.
      Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
      if (*template_tuple == nullptr) {
        *template_tuple =
            Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
      }
      (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
      continue;
    }

    if (pos_field) {
      // Remember the position slot; it is attached to a reader after the loop.
      DCHECK(pos_slot_desc == nullptr)
          << "There should only be one position slot per tuple";
      pos_slot_desc = slot_desc;
      continue;
    }

    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(), *node->element,
        slot_desc, state_));

    ParquetColumnReader* col_reader = ParquetColumnReader::Create(
        *node, slot_desc->type().IsCollectionType(), slot_desc, this);
    column_readers->push_back(col_reader);

    if (col_reader->IsCollectionReader()) {
      // Recursively populate col_reader's children
      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
      const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
      CollectionColumnReader* collection_reader =
          static_cast<CollectionColumnReader*>(col_reader);
      RETURN_IF_ERROR(CreateColumnReaders(
          *item_tuple_desc, schema_resolver, collection_reader->children()));
    } else {
      // Index scalar readers by their column index in the file schema.
      scalar_reader_map_[node->col_idx] = static_cast<BaseScalarColumnReader*>(
          col_reader);
    }
  }

  if (column_readers->empty()) {
    // This is either a count(*) over a collection type (count(*) over the table is
    // handled in ProcessFooter()), or no materialized columns appear in this file
    // (e.g. due to schema evolution, or if there's only a position slot). Create a single
    // column reader that we will use to count the number of tuples we should output. We
    // will not read any values from this reader.
    ParquetColumnReader* reader;
    RETURN_IF_ERROR(CreateCountingReader(
        tuple_desc.tuple_path(), schema_resolver, &reader));
    column_readers->push_back(reader);
  }

  if (pos_slot_desc != nullptr) {
    // 'tuple_desc' has a position slot. Use an existing column reader to populate it.
    DCHECK(!column_readers->empty());
    (*column_readers)[0]->set_pos_slot_desc(pos_slot_desc);
  }

  return Status::OK();
}
| |
| Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path, |
| const ParquetSchemaResolver& schema_resolver, ParquetColumnReader** reader) { |
| SchemaNode* parent_node; |
| bool pos_field; |
| bool missing_field; |
| RETURN_IF_ERROR(schema_resolver.ResolvePath( |
| parent_path, &parent_node, &pos_field, &missing_field)); |
| |
| if (missing_field) { |
| // TODO: can we do anything else here? |
| return Status(Substitute("Could not find '$0' in file '$1'.", |
| PrintPath(*scan_node_->hdfs_table(), parent_path), filename())); |
| } |
| DCHECK(!pos_field); |
| DCHECK(parent_path.empty() || parent_node->is_repeated()); |
| |
| if (!parent_node->children.empty()) { |
| // Find a non-struct (i.e. collection or scalar) child of 'parent_node', which we will |
| // use to create the item reader |
| const SchemaNode* target_node = &parent_node->children[0]; |
| while (!target_node->children.empty() && !target_node->is_repeated()) { |
| target_node = &target_node->children[0]; |
| } |
| |
| *reader = ParquetColumnReader::Create( |
| *target_node, target_node->is_repeated(), nullptr, this); |
| if (target_node->is_repeated()) { |
| // Find the closest scalar descendant of 'target_node' via breadth-first search, and |
| // create scalar reader to drive 'reader'. We find the closest (i.e. least-nested) |
| // descendant as a heuristic for picking a descendant with fewer values, so it's |
| // faster to scan. |
| // TODO: use different heuristic than least-nested? Fewest values? |
| const SchemaNode* node = nullptr; |
| queue<const SchemaNode*> nodes; |
| nodes.push(target_node); |
| while (!nodes.empty()) { |
| node = nodes.front(); |
| nodes.pop(); |
| if (node->children.size() > 0) { |
| for (const SchemaNode& child: node->children) nodes.push(&child); |
| } else { |
| // node is the least-nested scalar descendant of 'target_node' |
| break; |
| } |
| } |
| DCHECK(node->children.empty()) << node->DebugString(); |
| CollectionColumnReader* parent_reader = |
| static_cast<CollectionColumnReader*>(*reader); |
| parent_reader->children()->push_back( |
| ParquetColumnReader::Create(*node, false, nullptr, this)); |
| } |
| } else { |
| // Special case for a repeated scalar node. The repeated node represents both the |
| // parent collection and the child item. |
| *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this); |
| } |
| |
| return Status::OK(); |
| } |
| |
| void HdfsParquetScanner::InitCollectionColumns() { |
| for (CollectionColumnReader* col_reader: collection_readers_) { |
| col_reader->Reset(); |
| } |
| } |
| |
| Status HdfsParquetScanner::InitScalarColumns() { |
| int64_t partition_id = context_->partition_descriptor()->id(); |
| const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); |
| DCHECK(file_desc != nullptr); |
| parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; |
| |
| // Used to validate that the number of values in each reader in column_readers_ at the |
| // same SchemaElement is the same. |
| unordered_map<const parquet::SchemaElement*, int> num_values_map; |
| for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { |
| const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()]; |
| auto num_values_it = num_values_map.find(&scalar_reader->schema_element()); |
| int num_values = -1; |
| if (num_values_it != num_values_map.end()) { |
| num_values = num_values_it->second; |
| } else { |
| num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values; |
| } |
| if (num_values != -1 && col_chunk.meta_data.num_values != num_values) { |
| // TODO: improve this error message by saying which columns are different, |
| // and also specify column in other error messages as appropriate |
| return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(), |
| col_chunk.meta_data.num_values, num_values, filename()); |
| } |
| RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_)); |
| } |
| RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_)); |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::DivideReservationBetweenColumns( |
| const vector<BaseScalarColumnReader*>& column_readers) { |
| DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| const int64_t min_buffer_size = io_mgr->min_buffer_size(); |
| const int64_t max_buffer_size = io_mgr->max_buffer_size(); |
| // The HdfsScanNode reservation calculation in the planner ensures that we have |
| // reservation for at least one buffer per column. |
| if (context_->total_reservation() < min_buffer_size * column_readers.size()) { |
| return Status(TErrorCode::INTERNAL_ERROR, |
| Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at " |
| "least $1 bytes per column for $2 columns but had $3 bytes", |
| filename(), min_buffer_size, column_readers.size(), |
| context_->total_reservation())); |
| } |
| |
| vector<int64_t> col_range_lengths(column_readers.size()); |
| for (int i = 0; i < column_readers.size(); ++i) { |
| col_range_lengths[i] = column_readers[i]->scan_range()->bytes_to_read(); |
| } |
| |
| // The scanner-wide stream was used only to read the file footer. Each column has added |
| // its own stream. We can use the total reservation now that 'stream_''s resources have |
| // been released. We may benefit from increasing reservation further, so let's compute |
| // the ideal reservation to scan all the columns. |
| int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths); |
| if (ideal_reservation > context_->total_reservation()) { |
| context_->TryIncreaseReservation(ideal_reservation); |
| } |
| scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)-> |
| UpdateCounter(context_->total_reservation()); |
| scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)-> |
| UpdateCounter(ideal_reservation); |
| |
| vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper( |
| min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation()); |
| for (auto& tmp_reservation : tmp_reservations) { |
| column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second); |
| } |
| return Status::OK(); |
| } |
| |
| int64_t HdfsParquetScanner::ComputeIdealReservation( |
| const vector<int64_t>& col_range_lengths) { |
| DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| int64_t ideal_reservation = 0; |
| for (int64_t len : col_range_lengths) { |
| ideal_reservation += io_mgr->ComputeIdealBufferReservation(len); |
| } |
| return ideal_reservation; |
| } |
| |
// Divides 'reservation_to_distribute' bytes of I/O reservation between the columns whose
// scan range lengths are 'col_range_lengths'. Returns one (column index, reservation)
// pair per column. The pairs come back in allocation order, not in column-index order.
// Allocation order is descending range length, with ties broken by ascending column
// index.
//
// Allocation makes up to DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE round-robin
// passes over the columns:
//  - min_buffer_size per column is set aside up front and handed back to each column at
//    the start of its first-pass turn. As a result, every column with bytes left to read
//    receives a buffer on the first pass.
//  - In each pass, a column with at least 'max_buffer_size' bytes left gets a max-sized
//    buffer if enough reservation remains. After the first pass it otherwise gets
//    nothing more.
//  - A column with fewer bytes left gets a buffer rounded up to fit the remainder,
//    bounded by the remaining reservation rounded down to a power of two.
//
// Assumes 'reservation_to_distribute' >= min_buffer_size * col_range_lengths.size();
// DivideReservationBetweenColumns() checks this before calling.
// NOTE(review): presumably min/max buffer sizes are powers of two -- TODO confirm.
vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
    int64_t min_buffer_size, int64_t max_buffer_size,
    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
  // Pair of (column index, reservation allocated).
  vector<pair<int, int64_t>> tmp_reservations;
  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);

  // Sort in descending order of length, breaking ties by index so that larger columns
  // get allocated reservation first. It is common to have dramatically different column
  // sizes in a single file because of different value sizes and compressibility. E.g.
  // consider a large STRING "comment" field versus a highly compressible
  // dictionary-encoded column with only a few distinct values. We want to give max-sized
  // buffers to large columns first to maximize the size of I/Os that we do while reading
  // this row group.
  sort(tmp_reservations.begin(), tmp_reservations.end(),
      [&col_range_lengths](
          const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
        int64_t left_len = col_range_lengths[left.first];
        int64_t right_len = col_range_lengths[right.first];
        return left_len != right_len ? left_len > right_len : left.first < right.first;
      });

  // Set aside the minimum reservation per column.
  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();

  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
  // or a large enough buffer to fit the remaining data for each column. Do this
  // round-robin up to the ideal number of I/O buffers.
  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
    for (auto& tmp_reservation : tmp_reservations) {
      // Add back the reservation we set aside above. After this, at least
      // 'min_buffer_size' is available for this column's first buffer.
      if (i == 0) reservation_to_distribute += min_buffer_size;

      int64_t bytes_left_in_range =
          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
      int64_t bytes_to_add;
      if (bytes_left_in_range >= max_buffer_size) {
        if (reservation_to_distribute >= max_buffer_size) {
          bytes_to_add = max_buffer_size;
        } else if (i == 0) {
          DCHECK_EQ(0, tmp_reservation.second);
          // Ensure this range gets at least one buffer on the first iteration.
          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
        } else {
          DCHECK_GT(tmp_reservation.second, 0);
          // We need to read more than the max buffer size, but can't allocate a
          // max-sized buffer. Stop adding buffers to this column: we prefer to use
          // the existing max-sized buffers without small buffers mixed in so that
          // we will always do max-sized I/Os, which make efficient use of I/O devices.
          bytes_to_add = 0;
        }
      } else if (bytes_left_in_range > 0 &&
          reservation_to_distribute >= min_buffer_size) {
        // Choose a buffer size that will fit the rest of the bytes left in the range.
        bytes_to_add =
            max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
        // But don't add more reservation than is available.
        bytes_to_add =
            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
      } else {
        bytes_to_add = 0;
      }
      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
      reservation_to_distribute -= bytes_to_add;
      tmp_reservation.second += bytes_to_add;
      DCHECK_GE(reservation_to_distribute, 0);
      // NOTE(review): a column whose range length is 0 gets no reservation on the first
      // pass, so this DCHECK would fire for it in debug builds. Presumably zero-length
      // column ranges never reach here -- confirm.
      DCHECK_GT(tmp_reservation.second, 0);
    }
  }
  return tmp_reservations;
}
| |
| Status HdfsParquetScanner::InitDictionaries( |
| const vector<BaseScalarColumnReader*>& column_readers) { |
| for (BaseScalarColumnReader* scalar_reader : column_readers) { |
| RETURN_IF_ERROR(scalar_reader->InitDictionary()); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsParquetScanner::ValidateEndOfRowGroup( |
| const vector<ParquetColumnReader*>& column_readers, int row_group_idx, |
| int64_t rows_read) { |
| DCHECK(!column_readers.empty()); |
| DCHECK(parse_status_.ok()) << "Don't overwrite parse_status_" |
| << parse_status_.GetDetail(); |
| |
| if (column_readers[0]->max_rep_level() == 0) { |
| if (candidate_ranges_.empty()) { |
| // These column readers materialize table-level values (vs. collection values). |
| // Test if the expected number of rows from the file metadata matches the actual |
| // number of rows read from the file. |
| int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows; |
| if (rows_read != expected_rows_in_group) { |
| return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(), |
| row_group_idx, expected_rows_in_group, rows_read); |
| } |
| } else { |
| // In this case we filter out row ranges. Validate that the number of rows read |
| // matches the number of rows determined by the candidate row ranges. |
| int64_t expected_rows_in_group = 0; |
| for (auto& range : candidate_ranges_) { |
| expected_rows_in_group += range.last - range.first + 1; |
| } |
| if (rows_read != expected_rows_in_group) { |
| return Status(Substitute("Based on the page index of group $0($1) there are $2 ", |
| "rows need to be scanned, but $3 rows were read.", filename(), row_group_idx, |
| expected_rows_in_group, rows_read)); |
| } |
| } |
| } |
| |
| // Validate scalar column readers' state |
| int num_values_read = -1; |
| for (int c = 0; c < column_readers.size(); ++c) { |
| if (column_readers[c]->IsCollectionReader()) continue; |
| BaseScalarColumnReader* reader = |
| static_cast<BaseScalarColumnReader*>(column_readers[c]); |
| // All readers should have exhausted the final data page. This could fail if one |
| // column has more values than stated in the metadata, meaning the final data page |
| // will still have unread values. |
| if (reader->num_buffered_values_ != 0) { |
| return Status(Substitute("Corrupt Parquet metadata in file '$0': metadata reports " |
| "'$1' more values in data page than actually present", filename(), |
| reader->num_buffered_values_)); |
| } |
| // Sanity check that the num_values_read_ value is the same for all readers. All |
| // readers should have been advanced in lockstep (the above check is more likely to |
| // fail if this not the case though, since num_values_read_ is only updated at the end |
| // of a data page). |
| if (num_values_read == -1) num_values_read = reader->num_values_read_; |
| if (candidate_ranges_.empty()) DCHECK_EQ(reader->num_values_read_, num_values_read); |
| // ReadDataPage() uses metadata_->num_values to determine when the column's done |
| DCHECK(!candidate_ranges_.empty() || |
| (reader->num_values_read_ == reader->metadata_->num_values || |
| !state_->abort_on_error())); |
| } |
| return Status::OK(); |
| } |
| |
| ColumnStatsReader HdfsParquetScanner::CreateColumnStatsReader( |
| const parquet::ColumnChunk& col_chunk, const ColumnType& col_type, |
| const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element) { |
| ColumnStatsReader stats_reader(col_chunk, col_type, col_order, element); |
| if (col_type.IsTimestampType()) { |
| stats_reader.SetTimestampDecoder(CreateTimestampDecoder(element)); |
| } |
| return stats_reader; |
| } |
| |
| ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder( |
| const parquet::SchemaElement& element) { |
| bool timestamp_conversion_needed_for_int96_timestamps = |
| FLAGS_convert_legacy_hive_parquet_utc_timestamps && |
| file_version_.application == "parquet-mr"; |
| |
| return ParquetTimestampDecoder(element, &state_->local_time_zone(), |
| timestamp_conversion_needed_for_int96_timestamps); |
| } |
| |
| void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) { |
| parquet_compressed_page_size_counter_->UpdateCounter(compressed_page_size); |
| } |
| |
| void HdfsParquetScanner::UpdateUncompressedPageSizeCounter( |
| int64_t uncompressed_page_size) { |
| parquet_uncompressed_page_size_counter_->UpdateCounter(uncompressed_page_size); |
| } |
| |
| } |