// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-scan-node-base.h"
#include "exec/base-sequence-scanner.h"
#include "exec/hdfs-text-scanner.h"
#include "exec/hdfs-lzo-text-scanner.h"
#include "exec/hdfs-sequence-scanner.h"
#include "exec/hdfs-rcfile-scanner.h"
#include "exec/hdfs-avro-scanner.h"
#include "exec/hdfs-parquet-scanner.h"
#include <sstream>
#include <avro/errors.h>
#include <avro/schema.h>
#include <boost/filesystem.hpp>
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/descriptors.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/string-buffer.h"
#include "util/bit-util.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/error-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
"that a scan node will wait for expected runtime filters to arrive.");
// TODO: Remove this flag in a compatibility-breaking release.
DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated.");
#ifndef NDEBUG
DECLARE_bool(skip_file_runtime_filtering);
#endif
namespace filesystem = boost::filesystem;
using namespace impala;
using namespace llvm;
using namespace strings;
using boost::algorithm::join;
const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
"Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
// Determines how many unexpected remote bytes trigger a warning in the runtime state
const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
tnode.hdfs_scan_node.min_max_tuple_id : -1),
skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
tnode.hdfs_scan_node.skip_header_line_count : 0),
tuple_id_(tnode.hdfs_scan_node.tuple_id),
optimize_parquet_count_star_(
tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset),
parquet_count_star_slot_offset_(
tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1),
tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
thrift_dict_filter_conjuncts_map_(
tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
&tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
disks_accessed_bitmap_(TUnit::UNIT, 0) {
}
HdfsScanNodeBase::~HdfsScanNodeBase() {
}
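// Creates the ScalarExprs for collection conjuncts, runtime filter targets and min-max
// conjuncts, and registers a FilterContext (with its profile and stats) for every
// runtime filter that applies to this node.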
Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Init(tnode, state));
// Add collection item conjuncts
for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
RowDescriptor* collection_row_desc = state->obj_pool()->Add(
new RowDescriptor(tuple_desc, /* is_nullable */ false));
DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end());
RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state,
&conjuncts_map_[entry.first]));
}
DCHECK(conjuncts_map_[tuple_id_].empty());
conjuncts_map_[tuple_id_] = conjuncts_;
const TQueryOptions& query_options = state->query_options();
for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) {
auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id);
DCHECK(it != filter_desc.planid_to_target_ndx.end());
const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second];
if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
!target.is_local_target) {
continue;
}
if (query_options.disable_row_runtime_filtering &&
!target.is_bound_by_partition_columns) {
continue;
}
ScalarExpr* filter_expr;
RETURN_IF_ERROR(
ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr));
filter_exprs_.push_back(filter_expr);
// TODO: Move this to Prepare()
filter_ctxs_.emplace_back();
FilterContext& filter_ctx = filter_ctxs_.back();
filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
RuntimeProfile* profile = state->obj_pool()->Add(
new RuntimeProfile(state->obj_pool(), filter_profile_title));
runtime_profile_->AddChild(profile);
filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
target.is_bound_by_partition_columns));
}
// Add min max conjuncts
if (min_max_tuple_id_ != -1) {
min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_);
DCHECK(min_max_tuple_desc_ != nullptr);
RowDescriptor* min_max_row_desc = state->obj_pool()->Add(
new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ false));
RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
*min_max_row_desc, state, &min_max_conjuncts_));
}
return Status::OK();
}
/// TODO: Break up this very long function.
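// Creates conjunct and filter evaluators, parses the Avro table schema if one is set,
// partitions the tuple's slots into partition-key and materialized slots, and converts
// the TScanRangeParams into per-file DiskIO::ScanRange objects.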
Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
runtime_state_ = state;
RETURN_IF_ERROR(ScanNode::Prepare(state));
// Prepare collection conjuncts
for (const auto& entry: conjuncts_map_) {
TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
// conjuncts_ are already prepared in ExecNode::Prepare(); don't prepare them again
if (tuple_desc == tuple_desc_) {
conjunct_evals_map_[entry.first] = conjunct_evals();
} else {
DCHECK(conjunct_evals_map_[entry.first].empty());
RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_,
expr_mem_pool(), &conjunct_evals_map_[entry.first]));
}
}
DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
for (int i = 0; i < filter_exprs_.size(); ++i) {
RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_,
expr_mem_pool(), &filter_ctxs_[i].expr_eval));
AddEvaluatorToFree(filter_ctxs_[i].expr_eval);
}
// Prepare min max statistics conjuncts.
if (min_max_tuple_id_ != -1) {
RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
expr_mem_pool(), &min_max_conjunct_evals_));
}
// One-time initialization of state that is constant across scan ranges
DCHECK(tuple_desc_->table_desc() != NULL);
hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
scan_node_pool_.reset(new MemPool(mem_tracker()));
// Parse Avro table schema if applicable
const string& avro_schema_str = hdfs_table_->avro_schema();
if (!avro_schema_str.empty()) {
avro_schema_t avro_schema;
int error = avro_schema_from_json_length(
avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
if (error != 0) {
return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
}
RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
}
// Gather materialized partition-key slots and non-partition slots.
const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
for (size_t i = 0; i < slots.size(); ++i) {
if (hdfs_table_->IsClusteringCol(slots[i])) {
partition_key_slots_.push_back(slots[i]);
} else {
materialized_slots_.push_back(slots[i]);
}
}
// Order the materialized slots such that for schemaless file formats (e.g. text) the
// order corresponds to the physical order in files. For formats where the file schema
// is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
sort(materialized_slots_.begin(), materialized_slots_.end(),
SlotDescriptor::ColPathLessThan);
// Populate mapping from slot path to index into materialized_slots_.
for (int i = 0; i < materialized_slots_.size(); ++i) {
path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
}
// Initialize is_materialized_col_
is_materialized_col_.resize(hdfs_table_->num_cols());
for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
}
HdfsFsCache::HdfsFsMap fs_cache;
// Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
// partition_ids_, file_descs_, and per_type_files_.
DCHECK(scan_range_params_ != NULL)
<< "Must call SetScanRanges() before calling Prepare()";
int num_ranges_missing_volume_id = 0;
for (const TScanRangeParams& params: *scan_range_params_) {
DCHECK(params.scan_range.__isset.hdfs_file_split);
const THdfsFileSplit& split = params.scan_range.hdfs_file_split;
partition_ids_.insert(split.partition_id);
HdfsPartitionDescriptor* partition_desc =
hdfs_table_->GetPartition(split.partition_id);
if (partition_desc == NULL) {
// TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
<< " partition_id=" << split.partition_id
<< "\n" << PrintThrift(state->instance_ctx());
return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
" Try rerunning the query.");
}
filesystem::path file_path(partition_desc->location());
file_path.append(split.file_name, filesystem::path::codecvt());
const string& native_file_path = file_path.native();
auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
HdfsFileDesc* file_desc = NULL;
FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key);
if (file_desc_it == file_descs_.end()) {
// Add new file_desc to file_descs_ and per_type_files_
file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
file_descs_[file_desc_map_key] = file_desc;
file_desc->file_length = split.file_length;
file_desc->mtime = split.mtime;
file_desc->file_compression = split.file_compression;
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
native_file_path, &file_desc->fs, &fs_cache));
num_unqueued_files_.Add(1);
per_type_files_[partition_desc->file_format()].push_back(file_desc);
} else {
// File already processed
file_desc = file_desc_it->second;
}
bool expected_local = params.__isset.is_remote && !params.is_remote;
if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id;
bool try_cache = params.is_cached;
if (runtime_state_->query_options().disable_cached_reads) {
DCHECK(!try_cache) << "Params should not have had this set.";
}
file_desc->splits.push_back(
AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
split.offset, split.partition_id, params.volume_id, expected_local,
DiskIoMgr::BufferOpts(try_cache, file_desc->mtime)));
}
// Update server wide metrics for number of scan ranges and ranges that have
// incomplete metadata.
ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
// Add per volume stats to the runtime profile
PerVolumeStats per_volume_stats;
stringstream str;
UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
PrintHdfsSplitStats(per_volume_stats, &str);
runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
AddCodegenDisabledMessage(state);
return Status::OK();
}
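// Attempts to codegen a scanner function for every file format that has files to scan.
// Codegen failure is not fatal: the outcome is recorded in the runtime profile and no
// function is added to codegend_fn_map_ for that format.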
void HdfsScanNodeBase::Codegen(RuntimeState* state) {
DCHECK(state->ShouldCodegen());
ExecNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
// Create codegen'd functions
for (int format = THdfsFileFormat::TEXT; format <= THdfsFileFormat::PARQUET; ++format) {
vector<HdfsFileDesc*>& file_descs =
per_type_files_[static_cast<THdfsFileFormat::type>(format)];
if (file_descs.empty()) continue;
// Randomize the order in which this node processes its files. We do this to avoid
// issuing remote reads to the same DN from different impalads. In file formats such
// as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
// If every node goes through the files in the same order, all the remote reads are
// for the same file, meaning a few DNs serve a lot of remote reads at the same time.
random_shuffle(file_descs.begin(), file_descs.end());
// Create reusable codegen'd functions for each file type needed
// TODO: do this for conjuncts_map_
Function* fn;
Status status;
switch (format) {
case THdfsFileFormat::TEXT:
status = HdfsTextScanner::Codegen(this, conjuncts_, &fn);
break;
case THdfsFileFormat::SEQUENCE_FILE:
status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn);
break;
case THdfsFileFormat::AVRO:
status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn);
break;
case THdfsFileFormat::PARQUET:
status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn);
break;
default:
// No codegen for this format
fn = NULL;
status = Status("Not implemented for this format.");
}
DCHECK(fn != NULL || !status.ok());
const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
if (status.ok()) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != NULL);
codegen->AddFunctionToJit(fn,
&codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
}
runtime_profile()->AddCodegenMsg(status.ok(), status, format_name);
}
}
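// Opens the collection, min-max and runtime filter evaluators, builds a template tuple
// for every partition, registers the reader context with the IO manager and sets up
// the scan-related counters.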
Status HdfsScanNodeBase::Open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Open(state));
// Open collection conjuncts
for (auto& entry: conjunct_evals_map_) {
// conjuncts_ are already opened in ExecNode::Open()
if (entry.first == tuple_id_) continue;
RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state));
}
// Open min max conjuncts
RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state));
// Open Runtime filter expressions.
for (FilterContext& ctx : filter_ctxs_) {
RETURN_IF_ERROR(ctx.expr_eval->Open(state));
}
// Create template tuples for all partitions.
for (int64_t partition_id: partition_ids_) {
HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
<< " partition_id=" << partition_id
<< "\n" << PrintThrift(state->instance_ctx());
partition_template_tuple_map_[partition_id] = InitTemplateTuple(
partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
}
runtime_state_->io_mgr()->RegisterContext(&reader_context_, mem_tracker());
// Initialize HdfsScanNode specific counters
// TODO: Revisit counters and move the counters specific to multi-threaded scans
// into HdfsScanNode.
read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
scan_ranges_complete_counter_ =
ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
if (DiskInfo::num_disks() < 64) {
num_disks_accessed_counter_ =
ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
} else {
num_disks_accessed_counter_ = NULL;
}
num_scanner_threads_started_counter_ =
ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
&active_hdfs_read_thread_counter_);
runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
&disks_accessed_bitmap_);
average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
TUnit::BYTES);
bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
TUnit::BYTES);
bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
TUnit::BYTES);
num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
TUnit::UNIT);
unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
TUnit::BYTES);
cached_file_handles_hit_count_ = ADD_COUNTER(runtime_profile(),
"CachedFileHandlesHitCount", TUnit::UNIT);
cached_file_handles_miss_count_ = ADD_COUNTER(runtime_profile(),
"CachedFileHandlesMissCount", TUnit::UNIT);
max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
"MaxCompressedTextFileLength", TUnit::BYTES);
for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
hdfs_read_thread_concurrency_bucket_.push_back(
pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
}
runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
&hdfs_read_thread_concurrency_bucket_);
counters_running_ = true;
int64_t total_splits = 0;
for (const auto& fd: file_descs_) total_splits += fd.second->splits.size();
progress_.Init(Substitute("Splits complete (node=$0)", total_splits), total_splits);
return Status::OK();
}
Status HdfsScanNodeBase::Reset(RuntimeState* state) {
DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
return Status("Internal error: Scan nodes should not appear in subplans.");
}
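// Hands the reader context over to the runtime state, cancels outstanding reads, stops
// the counters and closes all expression evaluators owned by this node.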
void HdfsScanNodeBase::Close(RuntimeState* state) {
if (is_closed()) return;
if (reader_context_ != NULL) {
// There may still be io buffers used by parent nodes so we can't unregister the
// reader context yet. The runtime state keeps a list of all the reader contexts and
// they are unregistered when the fragment is closed.
state->AcquireReaderContext(reader_context_);
// Need to wait for all the active scanner threads to finish to ensure there is no
// more memory tracked by this scan node's mem tracker.
state->io_mgr()->CancelContext(reader_context_, true);
}
StopAndFinalizeCounters();
// There should be no active scanner threads and hdfs read threads.
DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
// Close collection conjuncts
for (auto& tid_conjunct: conjuncts_map_) {
// conjuncts_ are already closed in ExecNode::Close()
if (tid_conjunct.first == tuple_id_) continue;
ScalarExprEvaluator::Close(conjunct_evals_map_[tid_conjunct.first], state);
ScalarExpr::Close(tid_conjunct.second);
}
// Close min max conjuncts
ScalarExprEvaluator::Close(min_max_conjunct_evals_, state);
ScalarExpr::Close(min_max_conjuncts_);
// Close filter
for (auto& filter_ctx : filter_ctxs_) {
if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state);
}
ScalarExpr::Close(filter_exprs_);
ScanNode::Close(state);
}
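// Waits (bounded) for runtime filters, prunes files whose partitions fail those
// filters, then issues the initial scan ranges for every file format.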
Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
DCHECK(!initial_ranges_issued_);
initial_ranges_issued_ = true;
// No need to issue ranges with limit 0.
if (ReachedLimit()) {
DCHECK_EQ(limit_, 0);
return Status::OK();
}
int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
if (state->query_options().runtime_filter_wait_time_ms > 0) {
wait_time_ms = state->query_options().runtime_filter_wait_time_ms;
}
if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms);
// Apply dynamic partition-pruning per-file.
FileFormatsMap matching_per_type_files;
for (const FileFormatsMap::value_type& v: per_type_files_) {
vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first];
for (HdfsFileDesc* file: v.second) {
if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
matching_files->push_back(file);
}
}
}
// Issue initial ranges for all file types.
RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
matching_per_type_files[THdfsFileFormat::PARQUET]));
RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
matching_per_type_files[THdfsFileFormat::TEXT]));
RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
matching_per_type_files[THdfsFileFormat::RC_FILE]));
RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
matching_per_type_files[THdfsFileFormat::AVRO]));
return Status::OK();
}
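// Returns false and marks all of the file's splits complete if the file's partition is
// rejected by a runtime filter; otherwise returns true.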
bool HdfsScanNodeBase::FilePassesFilterPredicates(
const vector<FilterContext>& filter_ctxs, const THdfsFileFormat::type& format,
HdfsFileDesc* file) {
#ifndef NDEBUG
if (FLAGS_skip_file_runtime_filtering) return true;
#endif
if (filter_ctxs_.size() == 0) return true;
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
filter_ctxs)) {
for (int j = 0; j < file->splits.size(); ++j) {
// Mark range as complete to ensure progress.
RangeComplete(format, file->file_compression);
}
return false;
}
return true;
}
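// Waits up to time_ms for each registered filter to arrive and records which filters
// arrived (and the total wait time) in the runtime profile. Returns true iff all
// filters arrived.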
bool HdfsScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) {
vector<string> arrived_filter_ids;
vector<string> missing_filter_ids;
int32_t start = MonotonicMillis();
for (auto& ctx: filter_ctxs_) {
string filter_id = Substitute("$0", ctx.filter->id());
if (ctx.filter->WaitForArrival(time_ms)) {
arrived_filter_ids.push_back(filter_id);
} else {
missing_filter_ids.push_back(filter_id);
}
}
int32_t end = MonotonicMillis();
const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS);
if (arrived_filter_ids.size() == filter_ctxs_.size()) {
runtime_profile()->AddInfoString("Runtime filters",
Substitute("All filters arrived. Waited $0", wait_time));
VLOG_QUERY << "Filters arrived. Waited " << wait_time;
return true;
}
const string& filter_str = Substitute(
"Not all filters arrived (arrived: [$0], missing [$1]), waited for $2",
join(arrived_filter_ids, ", "), join(missing_filter_ids, ", "), wait_time);
runtime_profile()->AddInfoString("Runtime filters", filter_str);
VLOG_QUERY << filter_str;
return false;
}
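// Allocates a ScanRange from the runtime state's object pool, validates that it lies
// within [0, file_length) and assigns it to an IO manager disk queue.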
DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
const DiskIoMgr::BufferOpts& buffer_opts,
const DiskIoMgr::ScanRange* original_split) {
DCHECK_GE(disk_id, -1);
// Require that the scan range is within [0, file_length). While this cannot be used
// to guarantee safety (file_length metadata may be stale), it avoids different
// behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns an error when
// seeking beyond the end of the file).
DCHECK_GE(offset, 0);
DCHECK_GE(len, 0);
DCHECK_LE(offset + len, GetFileDesc(partition_id, file)->file_length)
<< "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
new ScanRangeMetadata(partition_id, original_split));
DiskIoMgr::ScanRange* range =
runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
return range;
}
DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split) {
return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
DiskIoMgr::BufferOpts(try_cache, mtime), original_split);
}
Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
int num_files_queued) {
RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
num_unqueued_files_.Add(-num_files_queued);
DCHECK_GE(num_unqueued_files_.Load(), 0);
return Status::OK();
}
HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(int64_t partition_id, const string& filename) {
auto file_desc_map_key = make_pair(partition_id, filename);
DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end());
return file_descs_[file_desc_map_key];
}
void HdfsScanNodeBase::SetFileMetadata(
int64_t partition_id, const string& filename, void* metadata) {
unique_lock<mutex> l(metadata_lock_);
auto file_metadata_map_key = make_pair(partition_id, filename);
DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end());
per_file_metadata_[file_metadata_map_key] = metadata;
}
void* HdfsScanNodeBase::GetFileMetadata(
int64_t partition_id, const string& filename) {
unique_lock<mutex> l(metadata_lock_);
auto file_metadata_map_key = make_pair(partition_id, filename);
auto it = per_file_metadata_.find(file_metadata_map_key);
if (it == per_file_metadata_.end()) return NULL;
return it->second;
}
void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
CodegendFnMap::iterator it = codegend_fn_map_.find(type);
if (it == codegend_fn_map_.end()) return NULL;
return it->second;
}
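// Creates the scanner matching the partition's file format (LZO-compressed text gets
// the dynamically loaded scanner) and opens it. On failure the scanner is closed and
// reset, and the error status is returned.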
Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
DCHECK(context != NULL);
THdfsCompression::type compression =
context->GetStream()->file_desc()->file_compression;
// Create a new scanner for this file format and compression.
switch (partition->file_format()) {
case THdfsFileFormat::TEXT:
// Lzo-compressed text files are scanned by a scanner that is implemented as a
// dynamic library, so that Impala does not include GPL code.
if (compression == THdfsCompression::LZO) {
scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_));
} else {
scanner->reset(new HdfsTextScanner(this, runtime_state_));
}
break;
case THdfsFileFormat::SEQUENCE_FILE:
scanner->reset(new HdfsSequenceScanner(this, runtime_state_));
break;
case THdfsFileFormat::RC_FILE:
scanner->reset(new HdfsRCFileScanner(this, runtime_state_));
break;
case THdfsFileFormat::AVRO:
scanner->reset(new HdfsAvroScanner(this, runtime_state_));
break;
case THdfsFileFormat::PARQUET:
scanner->reset(new HdfsParquetScanner(this, runtime_state_));
break;
default:
return Status(Substitute("Unknown Hdfs file format type: $0",
partition->file_format()));
}
DCHECK(scanner->get() != NULL);
Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
if (status.ok()) {
status = scanner->get()->Open(context);
if (!status.ok()) {
scanner->get()->Close(nullptr);
scanner->reset();
}
} else {
context->ClearStreams();
scanner->reset();
}
return status;
}
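// Builds a tuple containing only the materialized partition-key values, evaluated from
// the partition's literal exprs, or returns NULL if no partition-key slots are
// materialized.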
Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,
MemPool* pool, RuntimeState* state) const {
if (partition_key_slots_.empty()) return NULL;
Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool);
for (int i = 0; i < partition_key_slots_.size(); ++i) {
const SlotDescriptor* slot_desc = partition_key_slots_[i];
ScalarExprEvaluator* eval = evals[slot_desc->col_pos()];
// Exprs guaranteed to be literals, so can safely be evaluated without a row.
RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL);
}
return template_tuple;
}
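// Recursively resets NULL collection slots to an empty CollectionValue, descending
// into nested collection items.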
void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
Tuple* tuple) const {
for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
CollectionValue* slot = reinterpret_cast<CollectionValue*>(
tuple->GetSlot(slot_desc->tuple_offset()));
if (tuple->IsNull(slot_desc->null_indicator_offset())) {
*slot = CollectionValue();
continue;
}
// Recursively traverse collection items.
const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
if (item_desc->collection_slots().empty()) continue;
for (int i = 0; i < slot->num_tuples; ++i) {
int item_offset = i * item_desc->byte_size();
Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset);
InitNullCollectionValues(item_desc, collection_item);
}
}
}
void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
const TupleDescriptor& tuple_desc =
*row_batch->row_desc()->tuple_descriptors()[tuple_idx()];
if (tuple_desc.collection_slots().empty()) return;
for (int i = 0; i < row_batch->num_rows(); ++i) {
Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
DCHECK(tuple != NULL);
InitNullCollectionValues(&tuple_desc, tuple);
}
}
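// Evaluates the runtime filters that are bound by partition columns against the
// partition's template tuple. Updates per-filter stats and returns false as soon as a
// filter rejects the partition.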
bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
const string& stats_name, const vector<FilterContext>& filter_ctxs) {
if (filter_ctxs.size() == 0) return true;
DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
<< "Mismatched number of filter contexts";
Tuple* template_tuple = partition_template_tuple_map_[partition_id];
// Defensive - if template_tuple is NULL, there can be no filters on partition columns.
if (template_tuple == NULL) return true;
TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
for (const FilterContext& ctx: filter_ctxs) {
int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
continue;
}
bool has_filter = ctx.filter->HasBloomFilter();
bool passed_filter = !has_filter || ctx.Eval(tuple_row_mem);
ctx.stats->IncrCounters(stats_name, 1, has_filter, !passed_filter);
if (!passed_filter) return false;
}
return true;
}
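// Convenience overload of RangeComplete() for ranges with a single compression type.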
void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
const THdfsCompression::type& compression_type) {
vector<THdfsCompression::type> types;
types.push_back(compression_type);
RangeComplete(file_type, types);
}
void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
const vector<THdfsCompression::type>& compression_types) {
scan_ranges_complete_counter()->Add(1);
progress_.Update(1);
for (int i = 0; i < compression_types.size(); ++i) {
++file_type_counts_[make_pair(file_type, compression_types[i])];
}
}
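// For every materialized slot, records the index of the first conjunct that references
// it; slots referenced by no conjunct keep the default value conjuncts.size().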
void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts();
// Initialize all order to be conjuncts.size() (after the last conjunct)
order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
vector<SlotId> slot_ids;
for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
slot_ids.clear();
int num_slots = conjuncts[conjunct_idx]->GetSlotIds(&slot_ids);
for (int j = 0; j < num_slots; ++j) {
SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
// slot_idx == -1 means this was a partition key slot which is always
// materialized before any slots.
if (slot_idx == -1) continue;
// If this slot hasn't been assigned an order yet, assign it to be materialized
// before evaluating conjuncts[conjunct_idx].
if ((*order)[slot_idx] == conjuncts.size()) {
(*order)[slot_idx] = conjunct_idx;
}
}
}
}
void HdfsScanNodeBase::TransferToScanNodePool(MemPool* pool) {
scan_node_pool_->AcquireData(pool, false);
}
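// Accumulates, per volume id, the number of splits and the total split bytes from the
// given scan range params.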
void HdfsScanNodeBase::UpdateHdfsSplitStats(
const vector<TScanRangeParams>& scan_range_params_list,
PerVolumeStats* per_volume_stats) {
pair<int, int64_t> init_value(0, 0);
for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
const TScanRange& scan_range = scan_range_params.scan_range;
if (!scan_range.__isset.hdfs_file_split) continue;
const THdfsFileSplit& split = scan_range.hdfs_file_split;
pair<int, int64_t>* stats =
FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
++(stats->first);
stats->second += split.length;
}
}
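// Writes the per-volume stats in the HDFS_SPLIT_STATS_DESC format; e.g. a volume with
// 5 splits totalling roughly 12MB would appear as "0:5/12.00 MB" (the byte formatting
// shown here is illustrative).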
void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats,
stringstream* ss) {
for (PerVolumeStats::const_iterator i = per_volume_stats.begin();
i != per_volume_stats.end(); ++i) {
(*ss) << i->first << ":" << i->second.first << "/"
<< PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
}
}
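// Stops the periodic counters and publishes final values: read-thread concurrency
// buckets, completed file formats, IO manager byte counts, and a warning if an
// unexpected amount of data was read remotely.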
void HdfsScanNodeBase::StopAndFinalizeCounters() {
if (!counters_running_) return;
counters_running_ = false;
PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
true);
// Output hdfs read thread concurrency into info string
stringstream ss;
for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
ss << i << ":" << setprecision(4)
<< hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
}
runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
// Convert the disk access bitmap to the number of disks accessed
uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
if (num_disks_accessed_counter_ != NULL) {
num_disks_accessed_counter_->Set(num_disk_accessed);
}
// Output completed file types and counts to the info string
if (!file_type_counts_.empty()) {
stringstream ss;
{
for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
it != file_type_counts_.end(); ++it) {
ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
}
}
runtime_profile_->AddInfoString("File Formats", ss.str());
}
// Output fraction of scanners with codegen enabled
int num_enabled = num_scanners_codegen_enabled_.Load();
int total = num_enabled + num_scanners_codegen_disabled_.Load();
runtime_profile()->AppendExecOption(
Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
if (reader_context_ != NULL) {
bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
bytes_read_short_circuit_->Set(
runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
bytes_read_dn_cache_->Set(
runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
num_remote_ranges_->Set(static_cast<int64_t>(
runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
unexpected_remote_bytes_->Set(
runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
cached_file_handles_hit_count_->Set(
runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_));
cached_file_handles_miss_count_->Set(
runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_));
if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
"Read $0 of data across network that was expected to be local. "
"Block locality metadata for table '$1.$2' may be stale. Consider running "
"\"INVALIDATE METADATA `$1`.`$2`\".",
PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
hdfs_table_->database(), hdfs_table_->name())));
}
ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
bytes_read_local_->value());
ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
bytes_read_short_circuit_->value());
ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
bytes_read_dn_cache_->value());
}
}
Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
return ExecDebugAction(phase, runtime_state_);
}