| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/hdfs-columnar-scanner.h" |
| |
| #include <algorithm> |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "exec/hdfs-scan-node-base.h" |
| #include "exec/scratch-tuple-batch.h" |
| #include "runtime/collection-value-builder.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/fragment-state.h" |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "util/runtime-profile-counters.h" |
| |
| using namespace std; |
| using namespace strings; |
| |
| namespace impala { |
| |
| PROFILE_DEFINE_COUNTER( |
| NumColumns, STABLE_LOW, TUnit::UNIT, "Number of columns that need to be read."); |
| PROFILE_DEFINE_COUNTER(NumScannersWithNoReads, STABLE_LOW, TUnit::UNIT, |
| "Number of scanners that end up doing no reads because their splits don't overlap " |
| "with the midpoint of any row-group/stripe in the file."); |
| PROFILE_DEFINE_SUMMARY_STATS_TIMER(FooterProcessingTime, STABLE_LOW, |
| "Average and min/max time spent processing the footer by each split."); |
| PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ColumnarScannerIdealReservation, DEBUG, TUnit::BYTES, |
| "Tracks stats about the ideal reservation for a scanning a row group (parquet) or " |
| "stripe (orc). The ideal reservation is calculated based on min and max buffer " |
| "size."); |
| PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ColumnarScannerActualReservation, DEBUG, |
| TUnit::BYTES, |
| "Tracks stats about the actual reservation for a scanning a row group " |
| "(parquet) or stripe (orc)."); |
PROFILE_DEFINE_COUNTER(IoReadSyncRequest, DEBUG, TUnit::UNIT,
    "Number of stream read requests performed synchronously.");
PROFILE_DEFINE_COUNTER(IoReadAsyncRequest, DEBUG, TUnit::UNIT,
    "Number of stream read requests performed asynchronously.");
PROFILE_DEFINE_COUNTER(
    IoReadTotalRequest, DEBUG, TUnit::UNIT, "Total number of stream read requests.");
PROFILE_DEFINE_COUNTER(IoReadSyncBytes, DEBUG, TUnit::BYTES,
    "The total number of bytes read from streams synchronously.");
PROFILE_DEFINE_COUNTER(IoReadAsyncBytes, DEBUG, TUnit::BYTES,
    "The total number of bytes read from streams asynchronously.");
| PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, TUnit::BYTES, |
| "The total number of bytes read from streams."); |
| PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES, |
| "The total number of bytes skipped from streams."); |
| PROFILE_DEFINE_COUNTER(NumFileMetadataRead, DEBUG, TUnit::UNIT, |
| "The total number of file metadata reads done in place of rows or row groups / " |
| "stripe iteration."); |
| PROFILE_DEFINE_SUMMARY_STATS_TIMER(ScratchBatchMemAllocDuration, DEBUG, |
| "Stats of time spent in malloc() used by MemPools of the scratch batch. These are " |
| "part of MaterializeTupleTime. If the sum accounts for a significant portion of it, " |
| "tuple materialization is dominated by memory allocation time."); |
| PROFILE_DEFINE_SUMMARY_STATS_TIMER(ScratchBatchMemFreeDuration, DEBUG, |
| "Stats of time spent in free() used by MemPools of the scratch batch. Same as " |
| "ScratchBatchMemAllocDuration, these are part of MaterializeTupleTime."); |
| PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ScratchBatchMemAllocBytes, DEBUG, TUnit::BYTES, |
| "Stats of bytes allocated by MemPools of the scratch batch."); |
PROFILE_DEFINE_TIMER(MaterializeCollectionGetMemTime, UNSTABLE, "Wall clock time spent "
    "getting/allocating collection memory, including the memcpy duration when doubling "
    "the tuple buffer. It's part of the time spent materializing collections, and thus "
    "also part of MaterializeTupleTime. The memory allocation time in materializing "
    "collections is also tracked in ScratchBatchMemAllocDuration.");
| |
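// Name of this class's type in the cross-compiled LLVM IR. Codegen uses it to resolve
// the struct type when generating code that operates on a HdfsColumnarScanner pointer.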
| const char* HdfsColumnarScanner::LLVM_CLASS_NAME = "class.impala::HdfsColumnarScanner"; |
| |
| HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, |
| RuntimeState* state) : |
| HdfsScanner(scan_node, state), |
| scratch_batch_(new ScratchTupleBatch( |
| *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())) { |
| } |
| |
| HdfsColumnarScanner::~HdfsColumnarScanner() {} |
| |
| Status HdfsColumnarScanner::Open(ScannerContext* context) { |
| RETURN_IF_ERROR(HdfsScanner::Open(context)); |
  // Cache 'is_footer_scanner_' here since 'stream_' can be released early.
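  // A scanner is the footer scanner iff its scan range reaches the end of the file;
  // Parquet and ORC store their footer metadata at the end of the file, so that
  // scanner is the one responsible for processing the footer.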
| const io::ScanRange* range = stream_->scan_range(); |
| is_footer_scanner_ = |
| range->offset() + range->bytes_to_read() >= stream_->file_desc()->file_length; |
| |
| RuntimeProfile* profile = scan_node_->runtime_profile(); |
| num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile); |
| num_scanners_with_no_reads_counter_ = |
| PROFILE_NumScannersWithNoReads.Instantiate(profile); |
| process_footer_timer_stats_ = PROFILE_FooterProcessingTime.Instantiate(profile); |
| columnar_scanner_ideal_reservation_counter_ = |
| PROFILE_ColumnarScannerIdealReservation.Instantiate(profile); |
| columnar_scanner_actual_reservation_counter_ = |
| PROFILE_ColumnarScannerActualReservation.Instantiate(profile); |
| |
| io_sync_request_ = PROFILE_IoReadSyncRequest.Instantiate(profile); |
| io_sync_bytes_ = PROFILE_IoReadSyncBytes.Instantiate(profile); |
| io_async_request_ = PROFILE_IoReadAsyncRequest.Instantiate(profile); |
| io_async_bytes_ = PROFILE_IoReadAsyncBytes.Instantiate(profile); |
| io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile); |
| io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile); |
| io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile); |
| num_file_metadata_read_ = PROFILE_NumFileMetadataRead.Instantiate(profile); |
| scratch_mem_alloc_duration_ = PROFILE_ScratchBatchMemAllocDuration.Instantiate(profile); |
| scratch_mem_free_duration_ = PROFILE_ScratchBatchMemFreeDuration.Instantiate(profile); |
| scratch_mem_alloc_bytes_ = PROFILE_ScratchBatchMemAllocBytes.Instantiate(profile); |
| // Only add this counter when there are array/map slots. |
| if (!scan_node_->tuple_desc()->collection_slots().empty()) { |
| get_collection_mem_timer_ = |
| PROFILE_MaterializeCollectionGetMemTime.Instantiate(profile); |
| } |
| return Status::OK(); |
| } |
| |
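// Evaluates runtime filters and conjuncts against the tuples in 'scratch_batch_' and
// appends the surviving rows to 'dst_batch', without committing them. Returns the
// number of rows added. Zero-byte tuples (a collection materialized with empty tuples)
// are handled separately: no filters or conjuncts apply, so NULL tuple pointers are
// appended directly.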
| int HdfsColumnarScanner::FilterScratchBatch(RowBatch* dst_batch) { |
  // This function must not be called when the output batch is already full. As long as
  // we always call CommitRows() after TransferScratchTuples(), the output batch can
  // never be full at this point.
| DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity()); |
| DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1); |
| if (scratch_batch_->tuple_byte_size == 0) { |
| Tuple** output_row = |
| reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); |
| // We are materializing a collection with empty tuples. Add a NULL tuple to the |
| // output batch per remaining scratch tuple and return. No need to evaluate |
| // filters/conjuncts. |
| DCHECK(filter_ctxs_.empty()); |
| DCHECK(conjunct_evals_->empty()); |
| int num_tuples = std::min(dst_batch->capacity() - dst_batch->num_rows(), |
| scratch_batch_->num_tuples - scratch_batch_->tuple_idx); |
| memset(output_row, 0, num_tuples * sizeof(Tuple*)); |
| scratch_batch_->tuple_idx += num_tuples; |
| // No data is required to back the empty tuples, so we should not attach any data to |
| // these batches. |
| DCHECK_EQ(0, scratch_batch_->total_allocated_bytes()); |
| return num_tuples; |
| } |
| return ProcessScratchBatchCodegenOrInterpret(dst_batch); |
| } |
| |
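// Filters the remaining tuples in 'scratch_batch_' into 'dst_batch' and, for non-empty
// tuples, lets the scratch batch finalize the transfer of the backing tuple memory to
// 'dst_batch'. Returns the number of rows that should be committed to 'dst_batch'.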
| int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) { |
| const int num_rows_to_commit = FilterScratchBatch(dst_batch); |
| if (scratch_batch_->tuple_byte_size != 0) { |
| scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit); |
| } |
| return num_rows_to_commit; |
| } |
| |
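// Codegens ProcessScratchBatch() for 'node' by taking the cross-compiled IR function
// and replacing its EvalConjuncts() and EvalRuntimeFilters() call sites with versions
// codegen'd for the node's conjuncts and runtime filter expressions.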
| Status HdfsColumnarScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state, |
| llvm::Function** process_scratch_batch_fn) { |
| DCHECK(state->ShouldCodegen()); |
| *process_scratch_batch_fn = nullptr; |
| LlvmCodeGen* codegen = state->codegen(); |
| DCHECK(codegen != nullptr); |
| |
| llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true); |
| DCHECK(fn != nullptr); |
| |
| llvm::Function* eval_conjuncts_fn; |
| const vector<ScalarExpr*>& conjuncts = node->conjuncts_; |
| RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn)); |
| DCHECK(eval_conjuncts_fn != nullptr); |
| |
| int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| llvm::Function* eval_runtime_filters_fn; |
| RETURN_IF_ERROR(CodegenEvalRuntimeFilters( |
| codegen, node->runtime_filter_exprs_, &eval_runtime_filters_fn)); |
| DCHECK(eval_runtime_filters_fn != nullptr); |
| |
| replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| fn->setName("ProcessScratchBatch"); |
| *process_scratch_batch_fn = codegen->FinalizeFunction(fn); |
| if (*process_scratch_batch_fn == nullptr) { |
| return Status("Failed to finalize process_scratch_batch_fn."); |
| } |
| return Status::OK(); |
| } |
| |
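// Dispatches to the codegen'd ProcessScratchBatch() if one was compiled for this
// fragment, otherwise falls back to the interpreted implementation.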
| int HdfsColumnarScanner::ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch) { |
| return CallCodegendOrInterpreted<ProcessScratchBatchFn>::invoke(this, |
| codegend_process_scratch_batch_fn_, &HdfsColumnarScanner::ProcessScratchBatch, |
| dst_batch); |
| } |
| |
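// Distributes 'reservation_to_distribute' across the columns in 'col_range_lengths',
// handing out max-sized buffers to the largest columns first.
// Illustrative example (buffer sizes and buffer count are hypothetical): with an 8KB
// min buffer, an 8MB max buffer, 3 ideal max-sized buffers per range, column lengths
// of {24MB, 4KB} and 20MB to distribute, the first column receives two 8MB buffers and
// the second a single 8KB buffer; the remaining ~4MB is left unassigned because it
// cannot form another max-sized buffer for the large column.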
| HdfsColumnarScanner::ColumnReservations |
| HdfsColumnarScanner::DivideReservationBetweenColumnsHelper(int64_t min_buffer_size, |
| int64_t max_buffer_size, const ColumnRangeLengths& col_range_lengths, |
| int64_t reservation_to_distribute) { |
| // Pair of (column index, reservation allocated). |
| ColumnReservations tmp_reservations; |
| tmp_reservations.reserve(col_range_lengths.size()); |
| for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0); |
| |
| // Sort in descending order of length, breaking ties by index so that larger columns |
| // get allocated reservation first. It is common to have dramatically different column |
| // sizes in a single file because of different value sizes and compressibility. E.g. |
| // consider a large STRING "comment" field versus a highly compressible |
| // dictionary-encoded column with only a few distinct values. We want to give max-sized |
| // buffers to large columns first to maximize the size of I/Os that we do while reading |
| // this row group. |
| sort(tmp_reservations.begin(), tmp_reservations.end(), |
| [&col_range_lengths]( |
| const pair<int, int64_t>& left, const pair<int, int64_t>& right) { |
| int64_t left_len = col_range_lengths[left.first]; |
| int64_t right_len = col_range_lengths[right.first]; |
| return (left_len != right_len) ? (left_len > right_len) : |
| (left.first < right.first); |
| }); |
| |
| // Set aside the minimum reservation per column. |
| reservation_to_distribute -= min_buffer_size * col_range_lengths.size(); |
| |
| // Allocate reservations to columns by repeatedly allocating either a max-sized buffer |
| // or a large enough buffer to fit the remaining data for each column. Do this |
| // round-robin up to the ideal number of I/O buffers. |
| for (int i = 0; i < io::DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) { |
| for (auto& tmp_reservation : tmp_reservations) { |
| // Add back the reservation we set aside above. |
| if (i == 0) reservation_to_distribute += min_buffer_size; |
| |
| int64_t bytes_left_in_range = |
| col_range_lengths[tmp_reservation.first] - tmp_reservation.second; |
| int64_t bytes_to_add; |
| if (bytes_left_in_range >= max_buffer_size) { |
| if (reservation_to_distribute >= max_buffer_size) { |
| bytes_to_add = max_buffer_size; |
| } else if (i == 0) { |
| DCHECK_EQ(0, tmp_reservation.second); |
| // Ensure this range gets at least one buffer on the first iteration. |
| bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute); |
| } else { |
| DCHECK_GT(tmp_reservation.second, 0); |
        // We need to read more than the max buffer size, but can't allocate a
        // max-sized buffer. Stop adding buffers to this column: we prefer to use
        // the existing max-sized buffers without small buffers mixed in so that
        // we will always do max-sized I/Os, which make efficient use of I/O devices.
| bytes_to_add = 0; |
| } |
| } else if (bytes_left_in_range > 0 |
| && reservation_to_distribute >= min_buffer_size) { |
| // Choose a buffer size that will fit the rest of the bytes left in the range. |
| bytes_to_add = |
| max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range)); |
| // But don't add more reservation than is available. |
| bytes_to_add = |
| min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute)); |
| } else { |
| bytes_to_add = 0; |
| } |
| DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add; |
| reservation_to_distribute -= bytes_to_add; |
| tmp_reservation.second += bytes_to_add; |
| |
| DCHECK_GE(reservation_to_distribute, 0); |
| DCHECK_GT(tmp_reservation.second, 0); |
| } |
| } |
| return tmp_reservations; |
| } |
| |
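// Sums the per-column ideal buffer reservation reported by the DiskIoMgr for each
// column range length.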
| int64_t HdfsColumnarScanner::ComputeIdealReservation( |
| const ColumnRangeLengths& col_range_lengths) { |
| io::DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| int64_t ideal_reservation = 0; |
| for (int64_t len : col_range_lengths) { |
| ideal_reservation += io_mgr->ComputeIdealBufferReservation(len); |
| } |
| return ideal_reservation; |
| } |
| |
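// Checks that the current reservation covers at least one minimum-sized buffer per
// column, tries to grow the reservation towards the ideal amount, records both in the
// profile, and then splits the total reservation across the columns.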
| Status HdfsColumnarScanner::DivideReservationBetweenColumns( |
| const ColumnRangeLengths& col_range_lengths, |
| ColumnReservations& reservation_per_column) { |
| io::DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| const int64_t min_buffer_size = io_mgr->min_buffer_size(); |
| const int64_t max_buffer_size = io_mgr->max_buffer_size(); |
| // The HdfsScanNode reservation calculation in the planner ensures that we have |
| // reservation for at least one buffer per column. |
| if (context_->total_reservation() < min_buffer_size * col_range_lengths.size()) { |
| return Status(TErrorCode::INTERNAL_ERROR, |
| Substitute("Not enough reservation in columnar scanner for file '$0'. " |
| "Need at least $1 bytes per column for $2 columns but had $3 bytes", |
| filename(), min_buffer_size, col_range_lengths.size(), |
| context_->total_reservation())); |
| } |
| |
| // The scanner-wide stream was used only to read the file footer. Each column has added |
| // its own stream. We can use the total reservation now that 'stream_''s resources have |
| // been released. We may benefit from increasing reservation further, so let's compute |
| // the ideal reservation to scan all the columns. |
| int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths); |
| if (ideal_reservation > context_->total_reservation()) { |
| context_->TryIncreaseReservation(ideal_reservation); |
| } |
| columnar_scanner_actual_reservation_counter_->UpdateCounter( |
| context_->total_reservation()); |
| columnar_scanner_ideal_reservation_counter_->UpdateCounter(ideal_reservation); |
| |
| reservation_per_column = DivideReservationBetweenColumnsHelper( |
| min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation()); |
| return Status::OK(); |
| } |
| |
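// Helpers that update the I/O read profile counters: each synchronous or asynchronous
// read accounts for one request and its bytes in both the specific and the total
// counters; skipped bytes are tracked separately.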
| void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) { |
| io_sync_request_->Add(1); |
| io_total_request_->Add(1); |
| io_sync_bytes_->Add(total_bytes); |
| io_total_bytes_->Add(total_bytes); |
| } |
| |
| void HdfsColumnarScanner::AddAsyncReadBytesCounter(int64_t total_bytes) { |
| io_async_request_->Add(1); |
| io_total_request_->Add(1); |
| io_async_bytes_->Add(total_bytes); |
| io_total_bytes_->Add(total_bytes); |
| } |
| |
| void HdfsColumnarScanner::AddSkippedReadBytesCounter(int64_t total_bytes) { |
| io_skipped_bytes_->Add(total_bytes); |
| } |
| |
| void HdfsColumnarScanner::CloseInternal() { |
  // Only aggregate the counters at the end to reduce contention between scanner
  // threads.
| MemPoolCounters counters = scratch_batch_->tuple_mem_pool.GetMemPoolCounters(); |
| scratch_mem_alloc_duration_->Merge(counters.sys_alloc_duration); |
| scratch_mem_free_duration_->Merge(counters.sys_free_duration); |
| scratch_mem_alloc_bytes_->Merge(counters.allocated_bytes); |
| // Merge counters of aux_mem_pool |
| counters = scratch_batch_->aux_mem_pool.GetMemPoolCounters(); |
| scratch_mem_alloc_duration_->Merge(counters.sys_alloc_duration); |
| scratch_mem_free_duration_->Merge(counters.sys_free_duration); |
| scratch_mem_alloc_bytes_->Merge(counters.allocated_bytes); |
| HdfsScanner::CloseInternal(); |
| } |
| |
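// Fetches free memory from 'builder' for materializing collection items and exposes it
// as a single-tuple row. Returns the memory pool, the tuple memory, the row memory and
// the number of rows that fit in the returned buffer via the output parameters.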
| Status HdfsColumnarScanner::GetCollectionMemory(CollectionValueBuilder* builder, |
| MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) { |
| SCOPED_TIMER(get_collection_mem_timer_); |
| int num_tuples; |
| *pool = builder->pool(); |
| RETURN_IF_ERROR(builder->GetFreeMemory(tuple_mem, &num_tuples)); |
| // Treat tuple as a single-tuple row |
| *tuple_row_mem = reinterpret_cast<TupleRow*>(tuple_mem); |
| *num_rows = num_tuples; |
| return Status::OK(); |
| } |
| } |