| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/hdfs-scan-node-base.h" |
| |
| #include "exec/base-sequence-scanner.h" |
| #include "exec/hdfs-avro-scanner.h" |
| #include "exec/hdfs-orc-scanner.h" |
| #include "exec/hdfs-plugin-text-scanner.h" |
| #include "exec/hdfs-rcfile-scanner.h" |
| #include "exec/hdfs-sequence-scanner.h" |
| #include "exec/hdfs-text-scanner.h" |
| #include "exec/parquet/hdfs-parquet-scanner.h" |
| |
| #include <avro/errors.h> |
| #include <avro/schema.h> |
| #include <boost/filesystem.hpp> |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "common/logging.h" |
| #include "common/object-pool.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "exprs/scalar-expr.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/io/request-context.h" |
| #include "runtime/runtime-filter.inline.h" |
| #include "runtime/runtime-state.h" |
| #include "util/disk-info.h" |
| #include "util/hdfs-util.h" |
| #include "util/impalad-metrics.h" |
| #include "util/metrics.h" |
| #include "util/periodic-counter-updater.h" |
| #include "util/pretty-printer.h" |
| #include "util/scope-exit-trigger.h" |
| |
| #include "common/names.h" |
| |
| #ifndef NDEBUG |
| DECLARE_bool(skip_file_runtime_filtering); |
| #endif |
| |
| DEFINE_bool(always_use_data_cache, false, "(Advanced) Always uses the IO data cache " |
| "for all reads, regardless of whether the read is local or remote. By default, the " |
| "IO data cache is only used if the data is expected to be remote. Used by tests."); |
| |
| namespace filesystem = boost::filesystem; |
| using namespace impala; |
| using namespace impala::io; |
| using namespace strings; |
| |
| const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC = |
| "Hdfs split stats (<volume id>:<# splits>/<split lengths>)"; |
| |
| // Determines how many unexpected remote bytes trigger a warning logged to the |
| // runtime state. |
| const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024; |
| |
| HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, |
| const DescriptorTbl& descs) |
| : ScanNode(pool, tnode, descs), |
| min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ? |
| tnode.hdfs_scan_node.min_max_tuple_id : -1), |
| skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? |
| tnode.hdfs_scan_node.skip_header_line_count : 0), |
| tuple_id_(tnode.hdfs_scan_node.tuple_id), |
| parquet_count_star_slot_offset_( |
| tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ? |
| tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1), |
| tuple_desc_(descs.GetTupleDescriptor(tuple_id_)), |
| thrift_dict_filter_conjuncts_map_( |
| tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ? |
| &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr), |
| disks_accessed_bitmap_(TUnit::UNIT, 0), |
| active_hdfs_read_thread_counter_(TUnit::UNIT, 0) { |
| } |
| |
| HdfsScanNodeBase::~HdfsScanNodeBase() { |
| } |
| |
| Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { |
| RETURN_IF_ERROR(ScanNode::Init(tnode, state)); |
| |
| // Add collection item conjuncts |
| for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) { |
| TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first); |
| RowDescriptor* collection_row_desc = state->obj_pool()->Add( |
| new RowDescriptor(tuple_desc, /* is_nullable */ false)); |
| DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end()); |
| RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state, |
| &conjuncts_map_[entry.first])); |
| } |
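| // Store the top-level conjuncts under this node's own tuple id so that scanners |
| // can look up the conjuncts for any tuple id through conjuncts_map_ uniformly. |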
| DCHECK(conjuncts_map_[tuple_id_].empty()); |
| conjuncts_map_[tuple_id_] = conjuncts_; |
| |
| // Add min max conjuncts |
| if (min_max_tuple_id_ != -1) { |
| min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_); |
| DCHECK(min_max_tuple_desc_ != nullptr); |
| RowDescriptor* min_max_row_desc = state->obj_pool()->Add( |
| new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ false)); |
| RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts, |
| *min_max_row_desc, state, &min_max_conjuncts_)); |
| } |
| return Status::OK(); |
| } |
| |
| /// TODO: Break up this very long function. |
| Status HdfsScanNodeBase::Prepare(RuntimeState* state) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| RETURN_IF_ERROR(ScanNode::Prepare(state)); |
| AddBytesReadCounters(); |
| |
| // Prepare collection conjuncts |
| for (const auto& entry: conjuncts_map_) { |
| TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first); |
| // conjuncts_ were already prepared in ExecNode::Prepare(); don't prepare them again. |
| if (tuple_desc == tuple_desc_) { |
| conjunct_evals_map_[entry.first] = conjunct_evals(); |
| } else { |
| DCHECK(conjunct_evals_map_[entry.first].empty()); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_, |
| expr_perm_pool(), expr_results_pool(), &conjunct_evals_map_[entry.first])); |
| } |
| } |
| |
| // Prepare min max statistics conjuncts. |
| if (min_max_tuple_id_ != -1) { |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_, |
| expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_)); |
| } |
| |
| // One-time initialization of state that is constant across scan ranges |
| DCHECK(tuple_desc_->table_desc() != NULL); |
| hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc()); |
| scan_node_pool_.reset(new MemPool(mem_tracker())); |
| |
| // Parse Avro table schema if applicable |
| const string& avro_schema_str = hdfs_table_->avro_schema(); |
| if (!avro_schema_str.empty()) { |
| avro_schema_t avro_schema; |
| int error = avro_schema_from_json_length( |
| avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema); |
| if (error != 0) { |
| return Status(Substitute("Failed to parse table schema: $0", avro_strerror())); |
| } |
| RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get())); |
| } |
| |
| // Gather materialized partition-key slots and non-partition slots. |
| const vector<SlotDescriptor*>& slots = tuple_desc_->slots(); |
| for (size_t i = 0; i < slots.size(); ++i) { |
| if (hdfs_table_->IsClusteringCol(slots[i])) { |
| partition_key_slots_.push_back(slots[i]); |
| } else { |
| materialized_slots_.push_back(slots[i]); |
| } |
| } |
| |
| // Order the materialized slots such that for schemaless file formats (e.g. text) the |
| // order corresponds to the physical order in files. For formats where the file schema |
| // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary. |
| sort(materialized_slots_.begin(), materialized_slots_.end(), |
| SlotDescriptor::ColPathLessThan); |
| |
| // Populate mapping from slot path to index into materialized_slots_. |
| for (int i = 0; i < materialized_slots_.size(); ++i) { |
| path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i; |
| } |
| |
| // Initialize is_materialized_col_ |
| is_materialized_col_.resize(hdfs_table_->num_cols()); |
| for (int i = 0; i < hdfs_table_->num_cols(); ++i) { |
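| // A single-element path {i} addresses top-level column i of the table. |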
| is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN; |
| } |
| |
| HdfsFsCache::HdfsFsMap fs_cache; |
| // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate |
| // partition_ids_, file_descs_, and per_type_files_. |
| DCHECK(scan_range_params_ != NULL) |
| << "Must call SetScanRanges() before calling Prepare()"; |
| int num_ranges_missing_volume_id = 0; |
| for (const TScanRangeParams& params: *scan_range_params_) { |
| DCHECK(params.scan_range.__isset.hdfs_file_split); |
| const THdfsFileSplit& split = params.scan_range.hdfs_file_split; |
| partition_ids_.insert(split.partition_id); |
| HdfsPartitionDescriptor* partition_desc = |
| hdfs_table_->GetPartition(split.partition_id); |
| if (partition_desc == NULL) { |
| // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702. |
| LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id() |
| << " partition_id=" << split.partition_id |
| << "\n" << PrintThrift(state->instance_ctx()); |
| return Status("Query encountered invalid metadata, likely due to IMPALA-1702." |
| " Try rerunning the query."); |
| } |
| |
| filesystem::path file_path(partition_desc->location()); |
| file_path.append(split.relative_path, filesystem::path::codecvt()); |
| const string& native_file_path = file_path.native(); |
| |
| auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path); |
| HdfsFileDesc* file_desc = NULL; |
| FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key); |
| if (file_desc_it == file_descs_.end()) { |
| // Add new file_desc to file_descs_ and per_type_files_ |
| file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path)); |
| file_descs_[file_desc_map_key] = file_desc; |
| file_desc->file_length = split.file_length; |
| file_desc->mtime = split.mtime; |
| file_desc->file_compression = split.file_compression; |
| file_desc->is_erasure_coded = split.is_erasure_coded; |
| RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( |
| native_file_path, &file_desc->fs, &fs_cache)); |
| per_type_files_[partition_desc->file_format()].push_back(file_desc); |
| } else { |
| // File already processed |
| file_desc = file_desc_it->second; |
| } |
| |
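| // A split is only expected to be local if the scheduler explicitly marked it as |
| // not remote. |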
| bool expected_local = params.__isset.is_remote && !params.is_remote; |
| if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id; |
| |
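| // cache_options is a bitmask of BufferOpts flags: e.g. a remote split with |
| // try_hdfs_cache set and the data cache enabled ends up with both |
| // USE_HDFS_CACHE and USE_DATA_CACHE. |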
| int cache_options = BufferOpts::NO_CACHING; |
| if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) { |
| cache_options |= BufferOpts::USE_HDFS_CACHE; |
| } |
| if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) { |
| cache_options |= BufferOpts::USE_DATA_CACHE; |
| } |
| file_desc->splits.push_back( |
| AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length, |
| split.offset, split.partition_id, params.volume_id, expected_local, |
| file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options))); |
| } |
| |
| // Update server wide metrics for number of scan ranges and ranges that have |
| // incomplete metadata. |
| ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size()); |
| ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id); |
| |
| // Check if reservation was enough to allocate at least one buffer. The |
| // reservation calculation in HdfsScanNode.java should guarantee this. |
| // Hitting this error indicates a misconfiguration or bug. |
| int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size(); |
| if (scan_range_params_->size() > 0 |
| && resource_profile_.min_reservation < min_buffer_size) { |
| return Status(TErrorCode::INTERNAL_ERROR, |
| Substitute("HDFS scan min reservation $0 must be >= min buffer size $1", |
| resource_profile_.min_reservation, min_buffer_size)); |
| } |
| // Add per volume stats to the runtime profile |
| PerVolumeStats per_volume_stats; |
| stringstream str; |
| UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats); |
| PrintHdfsSplitStats(per_volume_stats, &str); |
| runtime_profile()->AddInfoString("Table Name", hdfs_table_->fully_qualified_name()); |
| runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str()); |
| state->CheckAndAddCodegenDisabledMessage(runtime_profile()); |
| return Status::OK(); |
| } |
| |
| void HdfsScanNodeBase::Codegen(RuntimeState* state) { |
| DCHECK(state->ShouldCodegen()); |
| ExecNode::Codegen(state); |
| if (IsNodeCodegenDisabled()) return; |
| |
| // Create codegen'd functions |
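| // This relies on the THdfsFileFormat enum values from TEXT to PARQUET being |
| // contiguous. |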
| for (int format = THdfsFileFormat::TEXT; format <= THdfsFileFormat::PARQUET; ++format) { |
| vector<HdfsFileDesc*>& file_descs = |
| per_type_files_[static_cast<THdfsFileFormat::type>(format)]; |
| |
| if (file_descs.empty()) continue; |
| |
| // Randomize the order in which this node processes its files. This avoids |
| // issuing remote reads to the same DN from different impalads. In file formats |
| // such as avro/seq/rc (i.e. splittable with a header), every node first reads |
| // the header. If every node went through the files in the same order, all the |
| // remote reads would target the same file, meaning a few DNs serve many remote |
| // reads at the same time. |
| random_shuffle(file_descs.begin(), file_descs.end()); |
| |
| // Create reusable codegen'd functions for each file format needed |
| // TODO: do this for conjuncts_map_ |
| llvm::Function* fn; |
| Status status; |
| switch (format) { |
| case THdfsFileFormat::TEXT: |
| status = HdfsTextScanner::Codegen(this, conjuncts_, &fn); |
| break; |
| case THdfsFileFormat::SEQUENCE_FILE: |
| status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn); |
| break; |
| case THdfsFileFormat::AVRO: |
| status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn); |
| break; |
| case THdfsFileFormat::PARQUET: |
| status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn); |
| break; |
| default: |
| // No codegen for this format |
| fn = NULL; |
| status = Status::Expected("Not implemented for this format."); |
| } |
| DCHECK(fn != NULL || !status.ok()); |
| const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second; |
| if (status.ok()) { |
| LlvmCodeGen* codegen = state->codegen(); |
| DCHECK(codegen != NULL); |
| codegen->AddFunctionToJit(fn, |
| &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]); |
| } |
| runtime_profile()->AddCodegenMsg(status.ok(), status, format_name); |
| } |
| } |
| |
| Status HdfsScanNodeBase::Open(RuntimeState* state) { |
| RETURN_IF_ERROR(ScanNode::Open(state)); |
| |
| // Open collection conjuncts |
| for (auto& entry: conjunct_evals_map_) { |
| // conjuncts_ are already opened in ExecNode::Open() |
| if (entry.first == tuple_id_) continue; |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state)); |
| } |
| |
| // Open min max conjuncts |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state)); |
| |
| // Create template tuples for all partitions. |
| for (int64_t partition_id: partition_ids_) { |
| HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id); |
| DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id() |
| << " partition_id=" << partition_id |
| << "\n" << PrintThrift(state->instance_ctx()); |
| partition_template_tuple_map_[partition_id] = InitTemplateTuple( |
| partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state); |
| } |
| |
| RETURN_IF_ERROR(ClaimBufferReservation(state)); |
| reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext(); |
| |
| // Initialize HdfsScanNode specific counters |
| hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER); |
| hdfs_open_file_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_OPEN_FILE_TIMER); |
| per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter( |
| PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND, |
| bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, |
| hdfs_read_timer_)); |
| scan_ranges_complete_counter_ = |
| ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT); |
| collection_items_read_counter_ = |
| ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT); |
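| // disks_accessed_bitmap_ holds one bit per disk in a 64-bit value, so the |
| // number of disks accessed can only be derived from it with fewer than 64 disks. |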
| if (DiskInfo::num_disks() < 64) { |
| num_disks_accessed_counter_ = |
| ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT); |
| } else { |
| num_disks_accessed_counter_ = NULL; |
| } |
| |
| data_cache_hit_count_ = ADD_COUNTER(runtime_profile(), |
| "DataCacheHitCount", TUnit::UNIT); |
| data_cache_partial_hit_count_ = ADD_COUNTER(runtime_profile(), |
| "DataCachePartialHitCount", TUnit::UNIT); |
| data_cache_miss_count_ = ADD_COUNTER(runtime_profile(), |
| "DataCacheMissCount", TUnit::UNIT); |
| data_cache_hit_bytes_ = ADD_COUNTER(runtime_profile(), |
| "DataCacheHitBytes", TUnit::BYTES); |
| data_cache_miss_bytes_ = ADD_COUNTER(runtime_profile(), |
| "DataCacheMissBytes", TUnit::BYTES); |
| |
| reader_context_->set_bytes_read_counter(bytes_read_counter()); |
| reader_context_->set_read_timer(hdfs_read_timer_); |
| reader_context_->set_open_file_timer(hdfs_open_file_timer_); |
| reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_); |
| reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_); |
| reader_context_->set_data_cache_hit_counter(data_cache_hit_count_); |
| reader_context_->set_data_cache_partial_hit_counter(data_cache_partial_hit_count_); |
| reader_context_->set_data_cache_miss_counter(data_cache_miss_count_); |
| reader_context_->set_data_cache_hit_bytes_counter(data_cache_hit_bytes_); |
| reader_context_->set_data_cache_miss_bytes_counter(data_cache_miss_bytes_); |
| |
| average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter( |
| AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_); |
| |
| initial_range_ideal_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(), |
| "InitialRangeIdealReservation", TUnit::BYTES); |
| initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(), |
| "InitialRangeActualReservation", TUnit::BYTES); |
| |
| uncompressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER( |
| runtime_profile(), "ParquetUncompressedBytesReadPerColumn", TUnit::BYTES); |
| compressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER( |
| runtime_profile(), "ParquetCompressedBytesReadPerColumn", TUnit::BYTES); |
| |
| bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal", |
| TUnit::BYTES); |
| bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit", |
| TUnit::BYTES); |
| bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache", |
| TUnit::BYTES); |
| num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges", |
| TUnit::UNIT); |
| unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected", |
| TUnit::BYTES); |
| cached_file_handles_hit_count_ = ADD_COUNTER(runtime_profile(), |
| "CachedFileHandlesHitCount", TUnit::UNIT); |
| cached_file_handles_miss_count_ = ADD_COUNTER(runtime_profile(), |
| "CachedFileHandlesMissCount", TUnit::UNIT); |
| |
| max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter( |
| "MaxCompressedTextFileLength", TUnit::BYTES); |
| |
| scanner_io_wait_time_ = ADD_TIMER(runtime_profile(), "ScannerIoWaitTime"); |
| hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters( |
| &active_hdfs_read_thread_counter_, |
| ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1); |
| |
| counters_running_ = true; |
| |
| int64_t total_splits = 0; |
| for (const auto& fd: file_descs_) total_splits += fd.second->splits.size(); |
| progress_.Init(Substitute("Splits complete (node=$0)", id_), total_splits); |
| return Status::OK(); |
| } |
| |
| Status HdfsScanNodeBase::Reset(RuntimeState* state, RowBatch* row_batch) { |
| DCHECK(false) << "Internal error: Scan nodes should not appear in subplans."; |
| return Status("Internal error: Scan nodes should not appear in subplans."); |
| } |
| |
| void HdfsScanNodeBase::Close(RuntimeState* state) { |
| if (is_closed()) return; |
| |
| if (reader_context_ != nullptr) { |
| // Need to wait for all the active scanner threads to finish to ensure there is no |
| // more memory tracked by this scan node's mem tracker. |
| ExecEnv::GetInstance()->disk_io_mgr()->UnregisterContext(reader_context_.get()); |
| } |
| |
| StopAndFinalizeCounters(); |
| |
| // There should be no active hdfs read threads. |
| DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0); |
| |
| if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll(); |
| |
| // Close collection conjuncts |
| for (auto& tid_conjunct: conjuncts_map_) { |
| // conjuncts_ are already closed in ExecNode::Close() |
| if (tid_conjunct.first == tuple_id_) continue; |
| ScalarExprEvaluator::Close(conjunct_evals_map_[tid_conjunct.first], state); |
| ScalarExpr::Close(tid_conjunct.second); |
| } |
| |
| // Close min max conjunct |
| ScalarExprEvaluator::Close(min_max_conjunct_evals_, state); |
| ScalarExpr::Close(min_max_conjuncts_); |
| ScanNode::Close(state); |
| } |
| |
| Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) { |
| DCHECK(!initial_ranges_issued_); |
| initial_ranges_issued_ = true; |
| // Decrement remaining_scan_range_submissions_ in all cases, including early |
| // returns below. |
| auto remaining_scan_range_submissions_trigger = |
| MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); }); |
| |
| // No need to issue ranges with limit 0. |
| if (ReachedLimitShared()) { |
| DCHECK_EQ(limit_, 0); |
| return Status::OK(); |
| } |
| |
| if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(); |
| // Apply dynamic partition-pruning per-file. |
| FileFormatsMap matching_per_type_files; |
| for (const FileFormatsMap::value_type& v: per_type_files_) { |
| vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first]; |
| for (HdfsFileDesc* file: v.second) { |
| if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) { |
| matching_files->push_back(file); |
| } else { |
| SkipFile(v.first, file); |
| } |
| } |
| } |
| |
| // Issue initial ranges for all file types. Only call functions for file types that |
| // actually exist - trying to add empty lists of ranges can result in spurious |
| // CANCELLED errors - see IMPALA-6564. |
| for (const auto& entry : matching_per_type_files) { |
| if (entry.second.empty()) continue; |
| switch (entry.first) { |
| case THdfsFileFormat::PARQUET: |
| RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second)); |
| break; |
| case THdfsFileFormat::TEXT: |
| RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second)); |
| break; |
| case THdfsFileFormat::SEQUENCE_FILE: |
| case THdfsFileFormat::RC_FILE: |
| case THdfsFileFormat::AVRO: |
| RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second)); |
| break; |
| case THdfsFileFormat::ORC: |
| RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this, entry.second)); |
| break; |
| default: |
| DCHECK(false) << "Unexpected file type " << entry.first; |
| } |
| } |
| // Except for BaseSequenceScanner, IssueInitialRanges() takes care of issuing |
| // all the ranges up front. BaseSequenceScanner only issues the header ranges |
| // here and will have incremented remaining_scan_range_submissions_ so that the |
| // remaining ranges can be submitted later. |
| return Status::OK(); |
| } |
| |
| bool HdfsScanNodeBase::FilePassesFilterPredicates( |
| const vector<FilterContext>& filter_ctxs, const THdfsFileFormat::type& format, |
| HdfsFileDesc* file) { |
| #ifndef NDEBUG |
| if (FLAGS_skip_file_runtime_filtering) return true; |
| #endif |
| if (filter_ctxs_.size() == 0) return true; |
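| // All splits of a file belong to the same partition, so checking the first |
| // split's metadata is sufficient. |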
| ScanRangeMetadata* metadata = |
| static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data()); |
| if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY, |
| filter_ctxs)) { |
| return false; |
| } |
| return true; |
| } |
| |
| Status HdfsScanNodeBase::StartNextScanRange(int64_t* reservation, |
| ScanRange** scan_range) { |
| DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| bool needs_buffers; |
| RETURN_IF_ERROR(reader_context_->GetNextUnstartedRange(scan_range, &needs_buffers)); |
| if (*scan_range == nullptr) return Status::OK(); |
| if (needs_buffers) { |
| // Check if we should increase our reservation to read this range more efficiently. |
| // E.g. if we are scanning a large text file, we might want extra I/O buffers to |
| // improve throughput. Note that if this is a columnar format like Parquet, |
| // '*scan_range' is the small footer range only so we won't request an increase. |
| int64_t ideal_scan_range_reservation = |
| io_mgr->ComputeIdealBufferReservation((*scan_range)->bytes_to_read()); |
| *reservation = IncreaseReservationIncrementally(*reservation, |
| ideal_scan_range_reservation); |
| initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation); |
| initial_range_actual_reservation_stats_->UpdateCounter(*reservation); |
| RETURN_IF_ERROR( |
| io_mgr->AllocateBuffersForRange(buffer_pool_client(), *scan_range, *reservation)); |
| } |
| return Status::OK(); |
| } |
| |
| int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservation, |
| int64_t ideal_reservation) { |
| DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); |
| // Check if we could use at least one more max-sized I/O buffer for this range. Don't |
| // increase in smaller increments since we may not be able to use additional smaller |
| // buffers. |
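| // For example, with a hypothetical 8MB max buffer size, growing a 3MB |
| // reservation towards a 20MB ideal proceeds 8MB -> 16MB -> 20MB, stopping early |
| // if an IncreaseReservation() call fails. |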
| while (curr_reservation < ideal_reservation) { |
| // Increase to the next I/O buffer multiple or to the ideal reservation. |
| int64_t target = min(ideal_reservation, |
| BitUtil::RoundUpToPowerOf2(curr_reservation + 1, io_mgr->max_buffer_size())); |
| DCHECK_LT(curr_reservation, target); |
| bool increased = buffer_pool_client()->IncreaseReservation(target - curr_reservation); |
| VLOG_FILE << "Increasing reservation from " |
| << PrettyPrinter::PrintBytes(curr_reservation) << " to " |
| << PrettyPrinter::PrintBytes(target) << " " |
| << (increased ? "succeeded" : "failed"); |
| if (!increased) break; |
| curr_reservation = target; |
| } |
| return curr_reservation; |
| } |
| |
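| // The AllocateScanRange() overloads below all funnel into the variant that |
| // takes an explicit ScanRangeMetadata* and a (possibly empty) list of |
| // sub-ranges. |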
| ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local, |
| bool is_erasure_coded, int64_t mtime, const BufferOpts& buffer_opts, |
| const ScanRange* original_split) { |
| ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add( |
| new ScanRangeMetadata(partition_id, original_split)); |
| return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local, |
| is_erasure_coded, mtime, buffer_opts); |
| } |
| |
| ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, ScanRangeMetadata* metadata, int disk_id, |
| bool expected_local, bool is_erasure_coded, int64_t mtime, |
| const BufferOpts& buffer_opts) { |
| return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local, |
| is_erasure_coded, mtime, buffer_opts); |
| } |
| |
| ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, |
| int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded, |
| int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) { |
| ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add( |
| new ScanRangeMetadata(partition_id, original_split)); |
| return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata, |
| disk_id, expected_local, is_erasure_coded, mtime, buffer_opts); |
| } |
| |
| ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, |
| ScanRangeMetadata* metadata, int disk_id, bool expected_local, bool is_erasure_coded, |
| int64_t mtime, const BufferOpts& buffer_opts) { |
| DCHECK_GE(disk_id, -1); |
| // Require that the scan range is within [0, file_length). While this cannot be used |
| // to guarantee safety (file_length metadata may be stale), it avoids different |
| // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking |
| // beyond the end of the file). |
| DCHECK_GE(offset, 0); |
| DCHECK_GE(len, 0); |
| DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length) |
| << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")"; |
| disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue( |
| file, disk_id, expected_local); |
| |
| ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange); |
| range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime, |
| buffer_opts, move(sub_ranges), metadata); |
| return range; |
| } |
| |
| ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, int64_t partition_id, int disk_id, |
| int cache_options, bool expected_local, int64_t mtime, |
| bool is_erasure_coded, const ScanRange* original_split) { |
| return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local, |
| is_erasure_coded, mtime, BufferOpts(cache_options), original_split); |
| } |
| |
| Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges, |
| EnqueueLocation enqueue_location) { |
| DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished."; |
| DCHECK_GT(remaining_scan_range_submissions_.Load(), 0); |
| DCHECK_GT(ranges.size(), 0); |
| return reader_context_->AddScanRanges(ranges, enqueue_location); |
| } |
| |
| HdfsFileDesc* HdfsScanNodeBase::GetFileDesc( |
| int64_t partition_id, const string& filename) { |
| auto file_desc_map_key = make_pair(partition_id, filename); |
| DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end()); |
| return file_descs_[file_desc_map_key]; |
| } |
| |
| void HdfsScanNodeBase::SetFileMetadata( |
| int64_t partition_id, const string& filename, void* metadata) { |
| unique_lock<mutex> l(metadata_lock_); |
| auto file_metadata_map_key = make_pair(partition_id, filename); |
| DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end()); |
| per_file_metadata_[file_metadata_map_key] = metadata; |
| } |
| |
| void* HdfsScanNodeBase::GetFileMetadata( |
| int64_t partition_id, const string& filename) { |
| unique_lock<mutex> l(metadata_lock_); |
| auto file_metadata_map_key = make_pair(partition_id, filename); |
| auto it = per_file_metadata_.find(file_metadata_map_key); |
| if (it == per_file_metadata_.end()) return NULL; |
| return it->second; |
| } |
| |
| void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) { |
| CodegendFnMap::iterator it = codegend_fn_map_.find(type); |
| if (it == codegend_fn_map_.end()) return NULL; |
| return it->second; |
| } |
| |
| Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition, |
| ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) { |
| DCHECK(context != nullptr); |
| DCHECK(scanner->get() == nullptr); |
| THdfsCompression::type compression = |
| context->GetStream()->file_desc()->file_compression; |
| |
| // Create a new scanner for this file format and compression. |
| switch (partition->file_format()) { |
| case THdfsFileFormat::TEXT: |
| if (HdfsTextScanner::HasBuiltinSupport(compression)) { |
| scanner->reset(new HdfsTextScanner(this, runtime_state_)); |
| } else { |
| // No builtin support - we must have loaded the plugin in IssueInitialRanges(). |
| auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression); |
| DCHECK(it != _THdfsCompression_VALUES_TO_NAMES.end()) |
| << "Already issued ranges for this compression type."; |
| scanner->reset(HdfsPluginTextScanner::GetHdfsPluginTextScanner( |
| this, runtime_state_, it->second)); |
| } |
| break; |
| case THdfsFileFormat::SEQUENCE_FILE: |
| scanner->reset(new HdfsSequenceScanner(this, runtime_state_)); |
| break; |
| case THdfsFileFormat::RC_FILE: |
| scanner->reset(new HdfsRCFileScanner(this, runtime_state_)); |
| break; |
| case THdfsFileFormat::AVRO: |
| scanner->reset(new HdfsAvroScanner(this, runtime_state_)); |
| break; |
| case THdfsFileFormat::PARQUET: |
| scanner->reset(new HdfsParquetScanner(this, runtime_state_)); |
| break; |
| case THdfsFileFormat::ORC: |
| scanner->reset(new HdfsOrcScanner(this, runtime_state_)); |
| break; |
| default: |
| return Status(Substitute("Unknown Hdfs file format type: $0", |
| partition->file_format())); |
| } |
| DCHECK(scanner->get() != nullptr); |
| RETURN_IF_ERROR(scanner->get()->Open(context)); |
| // Inject the error after the scanner is opened, to test the scanner close path. |
| return ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER); |
| } |
| |
| Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals, |
| MemPool* pool, RuntimeState* state) const { |
| if (partition_key_slots_.empty()) return NULL; |
| Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool); |
| for (int i = 0; i < partition_key_slots_.size(); ++i) { |
| const SlotDescriptor* slot_desc = partition_key_slots_[i]; |
| ScalarExprEvaluator* eval = evals[slot_desc->col_pos()]; |
| // Exprs guaranteed to be literals, so can safely be evaluated without a row. |
| RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL); |
| } |
| return template_tuple; |
| } |
| |
| void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_desc, |
| Tuple* tuple) const { |
| for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) { |
| CollectionValue* slot = reinterpret_cast<CollectionValue*>( |
| tuple->GetSlot(slot_desc->tuple_offset())); |
| if (tuple->IsNull(slot_desc->null_indicator_offset())) { |
| *slot = CollectionValue(); |
| continue; |
| } |
| // Recursively traverse collection items. |
| const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor(); |
| if (item_desc->collection_slots().empty()) continue; |
| for (int i = 0; i < slot->num_tuples; ++i) { |
| int item_offset = i * item_desc->byte_size(); |
| Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset); |
| InitNullCollectionValues(item_desc, collection_item); |
| } |
| } |
| } |
| |
| void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const { |
| DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1); |
| const TupleDescriptor& tuple_desc = |
| *row_batch->row_desc()->tuple_descriptors()[tuple_idx()]; |
| if (tuple_desc.collection_slots().empty()) return; |
| for (int i = 0; i < row_batch->num_rows(); ++i) { |
| Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx()); |
| DCHECK(tuple != NULL); |
| InitNullCollectionValues(&tuple_desc, tuple); |
| } |
| } |
| |
| bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id, |
| const string& stats_name, const vector<FilterContext>& filter_ctxs) { |
| if (filter_ctxs.empty()) return true; |
| if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return false; |
| DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size()) |
| << "Mismatched number of filter contexts"; |
| Tuple* template_tuple = partition_template_tuple_map_[partition_id]; |
| // Defensive - if template_tuple is NULL, there can be no filters on partition columns. |
| if (template_tuple == nullptr) return true; |
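| // A TupleRow is laid out as an array of Tuple*, so the address of a single |
| // Tuple* can be reinterpreted as a row containing exactly one tuple. |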
| TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple); |
| for (const FilterContext& ctx: filter_ctxs) { |
| int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_); |
| if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) { |
| continue; |
| } |
| |
| bool has_filter = ctx.filter->HasFilter(); |
| bool passed_filter = !has_filter || ctx.Eval(tuple_row_mem); |
| ctx.stats->IncrCounters(stats_name, 1, has_filter, !passed_filter); |
| if (!passed_filter) return false; |
| } |
| |
| return true; |
| } |
| |
| void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type, |
| const THdfsCompression::type& compression_type, bool skipped) { |
| vector<THdfsCompression::type> types; |
| types.push_back(compression_type); |
| RangeComplete(file_type, types, skipped); |
| } |
| |
| void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type, |
| const vector<THdfsCompression::type>& compression_types, bool skipped) { |
| scan_ranges_complete_counter_->Add(1); |
| progress_.Update(1); |
| HdfsCompressionTypesSet compression_set; |
| for (int i = 0; i < compression_types.size(); ++i) { |
| compression_set.AddType(compression_types[i]); |
| } |
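| // Key the counter on (format, skipped, compression set) so that, e.g., skipped |
| // and actually-scanned ranges of the same format are reported separately. |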
| ++file_type_counts_[std::make_tuple(file_type, skipped, compression_set)]; |
| } |
| |
| void HdfsScanNodeBase::SkipFile(const THdfsFileFormat::type& file_type, |
| HdfsFileDesc* file) { |
| for (int i = 0; i < file->splits.size(); ++i) { |
| RangeComplete(file_type, file->file_compression, true); |
| } |
| } |
| |
| void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const { |
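| // E.g. if conjuncts[0] references only slot A and conjuncts[1] references |
| // slots A and B, the computed order is 0 for A, 1 for B, and conjuncts.size() |
| // for any slot referenced by no conjunct. |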
| const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts(); |
| // Initialize all orders to conjuncts.size() (i.e. after the last conjunct) |
| order->insert(order->begin(), materialized_slots().size(), conjuncts.size()); |
| |
| const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl(); |
| |
| vector<SlotId> slot_ids; |
| for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) { |
| slot_ids.clear(); |
| int num_slots = conjuncts[conjunct_idx]->GetSlotIds(&slot_ids); |
| for (int j = 0; j < num_slots; ++j) { |
| SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]); |
| int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path()); |
| // slot_idx == -1 means this was a partition key slot, which is always |
| // materialized before any other slots. |
| if (slot_idx == -1) continue; |
| // If this slot hasn't been assigned an order yet, assign it to be materialized |
| // before evaluating conjuncts[conjunct_idx]. |
| if ((*order)[slot_idx] == conjuncts.size()) { |
| (*order)[slot_idx] = conjunct_idx; |
| } |
| } |
| } |
| } |
| |
| void HdfsScanNodeBase::TransferToScanNodePool(MemPool* pool) { |
| scan_node_pool_->AcquireData(pool, false); |
| } |
| |
| void HdfsScanNodeBase::UpdateHdfsSplitStats( |
| const vector<TScanRangeParams>& scan_range_params_list, |
| PerVolumeStats* per_volume_stats) { |
| pair<int, int64_t> init_value(0, 0); |
| for (const TScanRangeParams& scan_range_params: scan_range_params_list) { |
| const TScanRange& scan_range = scan_range_params.scan_range; |
| if (!scan_range.__isset.hdfs_file_split) continue; |
| const THdfsFileSplit& split = scan_range.hdfs_file_split; |
| pair<int, int64_t>* stats = |
| FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value); |
| ++(stats->first); |
| stats->second += split.length; |
| } |
| } |
| |
| void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats, |
| stringstream* ss) { |
| for (PerVolumeStats::const_iterator i = per_volume_stats.begin(); |
| i != per_volume_stats.end(); ++i) { |
| (*ss) << i->first << ":" << i->second.first << "/" |
| << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " "; |
| } |
| } |
| |
| void HdfsScanNodeBase::StopAndFinalizeCounters() { |
| if (!counters_running_) return; |
| counters_running_ = false; |
| |
| runtime_profile_->StopPeriodicCounters(); |
| |
| // Output HDFS read thread concurrency into the info string |
| stringstream ss; |
| for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) { |
| ss << i << ":" << setprecision(4) |
| << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% "; |
| } |
| runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str()); |
| |
| // Convert the disk access bitmap to the number of disks accessed |
| uint64_t num_disk_bitmap = disks_accessed_bitmap_.value(); |
| int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap); |
| if (num_disks_accessed_counter_ != NULL) { |
| num_disks_accessed_counter_->Set(num_disk_accessed); |
| } |
| |
| // Output completed file formats and counts to the info string |
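| // The resulting string looks like, e.g., "TEXT/GZIP:10 PARQUET/(NONE,SNAPPY):2 ". |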
| if (!file_type_counts_.empty()) { |
| stringstream ss; |
| { |
| for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin(); |
| it != file_type_counts_.end(); ++it) { |
| |
| THdfsFileFormat::type file_format = std::get<0>(it->first); |
| bool skipped = std::get<1>(it->first); |
| HdfsCompressionTypesSet compressions_set = std::get<2>(it->first); |
| int file_cnt = it->second; |
| |
| if (skipped) { |
| if (file_format == THdfsFileFormat::PARQUET) { |
| // If a scan range stored as parquet is skipped, its compression type |
| // cannot be determined without reading the data. |
| ss << PrintThriftEnum(file_format) << "/" << "Unknown" << "(Skipped):" |
| << file_cnt << " "; |
| } else { |
| ss << PrintThriftEnum(file_format) << "/" |
| << PrintThriftEnum(compressions_set.GetFirstType()) << "(Skipped):" |
| << file_cnt << " "; |
| } |
| } else if (compressions_set.Size() == 1) { |
| ss << PrintThriftEnum(file_format) << "/" |
| << PrintThriftEnum(compressions_set.GetFirstType()) << ":" << file_cnt |
| << " "; |
| } else { |
| ss << PrintThriftEnum(file_format) << "/" << "("; |
| bool first = true; |
| for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) { |
| THdfsCompression::type type = static_cast<THdfsCompression::type>( |
| elem.first); |
| if (!compressions_set.HasType(type)) continue; |
| if (!first) ss << ","; |
| ss << PrintThriftEnum(type); |
| first = false; |
| } |
| ss << "):" << file_cnt << " "; |
| } |
| } |
| } |
| runtime_profile_->AddInfoString("File Formats", ss.str()); |
| } |
| |
| // Output fraction of scanners with codegen enabled |
| int num_enabled = num_scanners_codegen_enabled_.Load(); |
| int total = num_enabled + num_scanners_codegen_disabled_.Load(); |
| runtime_profile()->AppendExecOption( |
| Substitute("Codegen enabled: $0 out of $1", num_enabled, total)); |
| |
| // Locking here should not be necessary since bytes_read_per_col_ is only updated inside |
| // column readers, and all column readers should have completed at this point; however, |
| // we acquire a read lock in case the update semantics of bytes_read_per_col_ change |
| { |
| shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock( |
| bytes_read_per_col_lock_); |
| for (const auto& bytes_read : bytes_read_per_col_) { |
| int64_t uncompressed_bytes_read = bytes_read.second.uncompressed_bytes_read.Load(); |
| if (uncompressed_bytes_read > 0) { |
| uncompressed_bytes_read_per_column_counter_->UpdateCounter( |
| uncompressed_bytes_read); |
| } |
| int64_t compressed_bytes_read = bytes_read.second.compressed_bytes_read.Load(); |
| if (compressed_bytes_read > 0) { |
| compressed_bytes_read_per_column_counter_->UpdateCounter(compressed_bytes_read); |
| } |
| } |
| } |
| |
| if (reader_context_ != nullptr) { |
| bytes_read_local_->Set(reader_context_->bytes_read_local()); |
| bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit()); |
| bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache()); |
| num_remote_ranges_->Set(reader_context_->num_remote_ranges()); |
| unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes()); |
| cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count()); |
| cached_file_handles_miss_count_->Set( |
| reader_context_->cached_file_handles_miss_count()); |
| |
| if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) { |
| runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute( |
| "Read $0 of data across network that was expected to be local. Block locality " |
| "metadata for table '$1.$2' may be stale. This only affects query performance " |
| "and not result correctness. One of the common causes for this warning is HDFS " |
| "rebalancer moving some of the file's blocks. If the issue persists, consider " |
| "running \"INVALIDATE METADATA `$1`.`$2`\".", |
| PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES), |
| hdfs_table_->database(), hdfs_table_->name()))); |
| } |
| |
| ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value()); |
| ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment( |
| bytes_read_local_->value()); |
| ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment( |
| bytes_read_short_circuit_->value()); |
| ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment( |
| bytes_read_dn_cache_->value()); |
| } |
| } |
| |
| Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) { |
| return ExecDebugAction(phase, runtime_state_); |
| } |
| |
| void HdfsScanNodeBase::UpdateBytesRead( |
| SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read) { |
| // Acquire a read lock first and check if the slot_id is in bytes_read_per_col_. |
| // If it is, update the value and release the read lock; if not, release the |
| // read lock, acquire the write lock, and initialize the slot_id entry with the |
| // given values for bytes read. |
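| // Note that between releasing the read lock and acquiring the write lock |
| // another thread may insert the same slot_id; operator[] then finds the |
| // existing entry and Add() still accumulates the deltas correctly. |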
| shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock( |
| bytes_read_per_col_lock_); |
| auto bytes_read_itr = bytes_read_per_col_.find(slot_id); |
| if (bytes_read_itr != bytes_read_per_col_.end()) { |
| bytes_read_itr->second.uncompressed_bytes_read.Add(uncompressed_bytes_read); |
| bytes_read_itr->second.compressed_bytes_read.Add(compressed_bytes_read); |
| } else { |
| bytes_read_per_col_guard_read_lock.unlock(); |
| lock_guard<shared_mutex> bytes_read_per_col_guard_write_lock( |
| bytes_read_per_col_lock_); |
| bytes_read_per_col_[slot_id].uncompressed_bytes_read.Add(uncompressed_bytes_read); |
| bytes_read_per_col_[slot_id].compressed_bytes_read.Add(compressed_bytes_read); |
| } |
| } |