blob: 557d346beb6ba9844f53b575a3aaa963f5adacdc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-scan-node.h"
#include <memory>
#include <sstream>
#include "common/logging.h"
#include "exec/hdfs-scanner.h"
#include "exec/scanner-context.h"
#include "runtime/descriptors.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_");
#ifndef NDEBUG
DECLARE_bool(skip_file_runtime_filtering);
#endif
using namespace impala;
// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
// The memory used does not vary considerably between file formats (just a couple of MBs).
// This value is conservative and taken from running against the tpch lineitem table.
// TODO: revisit how we do this.
const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
// Estimated upper bound on the compression ratio of compressed text files. Used to
// estimate scanner thread memory usage.
const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
// Amount of time to block waiting for GetNext() to release scanner threads between
// checking if a scanner thread should yield itself back to the global thread pool.
const int SCANNER_THREAD_WAIT_TIME_MS = 20;
HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: HdfsScanNodeBase(pool, tnode, descs),
ranges_issued_barrier_(1),
scanner_thread_bytes_required_(0),
done_(false),
all_ranges_started_(false),
thread_avail_cb_id_(-1),
max_num_scanner_threads_(CpuInfo::num_cores()) {
}
HdfsScanNode::~HdfsScanNode() {
}
Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
if (!initial_ranges_issued_) {
// We do this in GetNext() to maximise the amount of work we can do while waiting for
// runtime filters to show up. The scanner threads have already started (in Open()),
// so we need to tell them there is work to do.
// TODO: This is probably not worth splitting the organisational cost of splitting
// initialisation across two places. Move to before the scanner threads start.
RETURN_IF_ERROR(IssueInitialScanRanges(state));
// Release the scanner threads
ranges_issued_barrier_.Notify();
if (progress_.done()) SetDone();
}
Status status = GetNextInternal(state, row_batch, eos);
if (!status.ok() || *eos) {
unique_lock<mutex> l(lock_);
lock_guard<SpinLock> l2(file_type_counts_);
StopAndFinalizeCounters();
row_batches_put_timer_->Set(materialized_row_batches_->total_put_wait_time());
row_batches_get_timer_->Set(materialized_row_batches_->total_get_wait_time());
}
return status;
}
Status HdfsScanNode::GetNextInternal(
RuntimeState* state, RowBatch* row_batch, bool* eos) {
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
if (ReachedLimit()) {
// LIMIT 0 case. Other limit values handled below.
DCHECK_EQ(limit_, 0);
*eos = true;
return Status::OK();
}
*eos = false;
unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
if (materialized_batch != NULL) {
num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
row_batch->AcquireState(materialized_batch.get());
// Update the number of materialized rows now instead of when they are materialized.
// This means that scanners might process and queue up more rows than are necessary
// for the limit case but we want to avoid the synchronized writes to
// num_rows_returned_.
num_rows_returned_ += row_batch->num_rows();
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
if (ReachedLimit()) {
int num_rows_over = num_rows_returned_ - limit_;
row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
num_rows_returned_ -= num_rows_over;
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
*eos = true;
SetDone();
}
DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
materialized_batch.reset();
return Status::OK();
}
// The RowBatchQueue was shutdown either because all scan ranges are complete or a
// scanner thread encountered an error. Check status_ to distinguish those cases.
*eos = true;
unique_lock<mutex> l(lock_);
return status_;
}
Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
int default_max_row_batches = FLAGS_max_row_batches;
if (default_max_row_batches <= 0) {
default_max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
}
if (state->query_options().__isset.mt_dop && state->query_options().mt_dop > 0) {
// To avoid a significant memory increase, adjust the number of maximally queued
// row batches per scan instance based on MT_DOP. The max materialized row batches
// is at least 2 to allow for some parallelism between the producer/consumer.
max_materialized_row_batches_ =
max(2, default_max_row_batches / state->query_options().mt_dop);
} else {
max_materialized_row_batches_ = default_max_row_batches;
}
VLOG_QUERY << "Max row batch queue size for scan node '" << id_
<< "' in fragment instance '" << state->fragment_instance_id()
<< "': " << max_materialized_row_batches_;
materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
return HdfsScanNodeBase::Init(tnode, state);
}
Status HdfsScanNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
// Compute the minimum bytes required to start a new thread. This is based on the
// file format.
// The higher the estimate, the less likely it is the query will fail but more likely
// the query will be throttled when it does not need to be.
// TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
// one but if there are already buffers queued before memory pressure was hit, we can't
// reclaim that memory.
if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
// Parquet files require buffers per column
scanner_thread_bytes_required_ =
materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
} else {
scanner_thread_bytes_required_ =
3 * runtime_state_->io_mgr()->max_read_buffer_size();
}
// scanner_thread_bytes_required_ now contains the IoBuffer requirement.
// Next we add in the other memory the scanner thread will use.
// e.g. decompression buffers, tuple buffers, etc.
// For compressed text, we estimate this based on the file size (since the whole file
// will need to be decompressed at once). For all other formats, we use a constant.
// TODO: can we do something better?
int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
if (file->file_compression != THdfsCompression::NONE) {
int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
}
}
scanner_thread_bytes_required_ += scanner_thread_mem_usage;
row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
return Status::OK();
}
// This function registers the ThreadTokenAvailableCb to start up the initial scanner
// threads. Scan ranges are not issued until the first GetNext() call; scanner threads
// will block on ranges_issued_barrier_ until ranges are issued.
Status HdfsScanNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
if (file_descs_.empty() || progress_.done()) return Status::OK();
// We need at least one scanner thread to make progress. We need to make this
// reservation before any ranges are issued.
runtime_state_->resource_pool()->ReserveOptionalTokens(1);
if (runtime_state_->query_options().num_scanner_threads > 0) {
max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
}
DCHECK_GT(max_num_scanner_threads_, 0);
thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
return Status::OK();
}
void HdfsScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
SetDone();
if (thread_avail_cb_id_ != -1) {
state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
}
scanner_threads_.JoinAll();
num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
HdfsScanNodeBase::Close(state);
}
void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
const std::vector<THdfsCompression::type>& compression_type) {
lock_guard<SpinLock> l(file_type_counts_);
HdfsScanNodeBase::RangeComplete(file_type, compression_type);
}
void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
unique_lock<mutex> l(lock_);
HdfsScanNodeBase::TransferToScanNodePool(pool);
}
void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
InitNullCollectionValues(row_batch.get());
materialized_row_batches_->AddBatch(move(row_batch));
}
Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
int num_files_queued) {
RETURN_IF_ERROR(
runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
num_unqueued_files_.Add(-num_files_queued);
DCHECK_GE(num_unqueued_files_.Load(), 0);
if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
return Status::OK();
}
// For controlling the amount of memory used for scanners, we approximate the
// scanner mem usage based on scanner_thread_bytes_required_, rather than the
// consumption in the scan node's mem tracker. The problem with the scan node
// trackers is that it does not account for the memory the scanner will use.
// For example, if there is 110 MB of memory left (based on the mem tracker)
// and we estimate that a scanner will use 100MB, we want to make sure to only
// start up one additional thread. However, after starting the first thread, the
// mem tracker value will not change immediately (it takes some time before the
// scanner is running and starts using memory). Therefore we just use the estimate
// based on the number of running scanner threads.
bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
int64_t committed_scanner_mem =
active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
int64_t tracker_consumption = mem_tracker()->consumption();
int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
if (est_additional_scanner_mem < 0) {
// This is the case where our estimate was too low. Expand the estimate based
// on the usage.
int64_t avg_consumption =
tracker_consumption / active_scanner_thread_counter_.value();
// Take the average and expand it by 50%. Some scanners will not have hit their
// steady state mem usage yet.
// TODO: how can we scale down if we've overestimated.
// TODO: better heuristic?
scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
est_additional_scanner_mem = 0;
}
// If we are starting a new thread, take that into account now.
if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
}
void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
// This is called to start up new scanner threads. It's not a big deal if we
// spin up more than strictly necessary since they will go through and terminate
// promptly. However, we want to minimize that by checking a conditions.
// 1. Don't start up if the ScanNode is done
// 2. Don't start up if all the ranges have been taken by another thread.
// 3. Don't start up if the number of ranges left is less than the number of
// active scanner threads.
// 4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
// 5. Don't start up a ScannerThread if materialized_row_batches_ is full since
// we are not scanner bound.
// 6. Don't start up a thread if there isn't enough memory left to run it.
// 7. Don't start up more than maximum number of scanner threads configured.
// 8. Don't start up if there are no thread tokens.
// Case 4. We have not issued the initial ranges so don't start a scanner thread.
// Issuing ranges will call this function and we'll start the scanner threads then.
// TODO: It would be good to have a test case for that.
if (!initial_ranges_issued_) return;
while (true) {
// The lock must be given up between loops in order to give writers to done_,
// all_ranges_started_ etc. a chance to grab the lock.
// TODO: This still leans heavily on starvation-free locks, come up with a more
// correct way to communicate between this method and ScannerThreadHelper
unique_lock<mutex> lock(lock_);
// Cases 1, 2, 3.
if (done_ || all_ranges_started_ ||
active_scanner_thread_counter_.value() >= progress_.remaining()) {
break;
}
// Cases 5 and 6.
if (active_scanner_thread_counter_.value() > 0 &&
(materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
!EnoughMemoryForScannerThread(true))) {
break;
}
// Case 7 and 8.
bool is_reserved = false;
if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
!pool->TryAcquireThreadToken(&is_reserved)) {
break;
}
COUNTER_ADD(&active_scanner_thread_counter_, 1);
COUNTER_ADD(num_scanner_threads_started_counter_, 1);
string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
PrintId(runtime_state_->fragment_instance_id()), id(),
num_scanner_threads_started_counter_->value());
auto fn = [this]() { this->ScannerThread(); };
scanner_threads_.AddThread(
make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
}
}
void HdfsScanNode::ScannerThread() {
SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
// Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
// scanner for finer-grained filtering. Use a thread-local MemPool for the filter
// contexts as the embedded expression evaluators may allocate from it and MemPool
// is not thread safe.
MemPool filter_mem_pool(expr_mem_tracker());
vector<FilterContext> filter_ctxs;
Status filter_status = Status::OK();
for (auto& filter_ctx: filter_ctxs_) {
FilterContext filter;
filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool);
if (!filter_status.ok()) break;
filter_ctxs.push_back(filter);
}
while (!done_) {
{
// Check if we have enough resources (thread token and memory) to keep using
// this thread.
unique_lock<mutex> l(lock_);
if (active_scanner_thread_counter_.value() > 1) {
if (runtime_state_->resource_pool()->optional_exceeded() ||
!EnoughMemoryForScannerThread(false)) {
// We can't break here. We need to update the counter with the lock held or else
// all threads might see active_scanner_thread_counter_.value > 1
COUNTER_ADD(&active_scanner_thread_counter_, -1);
// Unlock before releasing the thread token to avoid deadlock in
// ThreadTokenAvailableCb().
l.unlock();
goto exit;
}
} else {
// If this is the only scanner thread, it should keep running regardless
// of resource constraints.
}
}
bool unused = false;
// Wake up every SCANNER_THREAD_COUNTERS to yield scanner threads back if unused, or
// to return if there's an error.
ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
DiskIoMgr::ScanRange* scan_range;
// Take a snapshot of num_unqueued_files_ before calling GetNextRange().
// We don't want num_unqueued_files_ to go to zero between the return from
// GetNextRange() and the check for when all ranges are complete.
int num_unqueued_files = num_unqueued_files_.Load();
// TODO: the Load() acts as an acquire barrier. Is this needed? (i.e. any earlier
// stores that need to complete?)
AtomicUtil::MemoryBarrier();
Status status = runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range);
if (status.ok() && scan_range != NULL) {
// Got a scan range. Process the range end to end (in this thread).
status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
scan_range);
}
if (!status.ok()) {
{
unique_lock<mutex> l(lock_);
// If there was already an error, the main thread will do the cleanup
if (!status_.ok()) break;
if (status.IsCancelled() && done_) {
// Scan node initiated scanner thread cancellation. No need to do anything.
break;
}
// Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
// to ensure that GetNextInternal() notices the error status.
status_ = status;
}
SetDone();
break;
}
// Done with range and it completed successfully
if (progress_.done()) {
// All ranges are finished. Indicate we are done.
SetDone();
break;
}
if (scan_range == NULL && num_unqueued_files == 0) {
// TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
// needed to acquire the lock in x86.
unique_lock<mutex> l(lock_);
// All ranges have been queued and GetNextRange() returned NULL. This means that
// every range is either done or being processed by another thread.
all_ranges_started_ = true;
break;
}
}
COUNTER_ADD(&active_scanner_thread_counter_, -1);
exit:
runtime_state_->resource_pool()->ReleaseThreadToken(false);
if (filter_status.ok()) {
for (auto& ctx: filter_ctxs) {
ctx.expr_eval->FreeLocalAllocations();
ctx.expr_eval->Close(runtime_state_);
}
}
filter_mem_pool.FreeAll();
}
namespace {
// Returns true if 'format' uses a scanner derived from BaseSequenceScanner. Used to
// workaround IMPALA-3798.
bool FileFormatIsSequenceBased(THdfsFileFormat::type format) {
return format == THdfsFileFormat::SEQUENCE_FILE ||
format == THdfsFileFormat::RC_FILE ||
format == THdfsFileFormat::AVRO;
}
}
Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
DiskIoMgr::ScanRange* scan_range) {
DCHECK(scan_range != NULL);
ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
int64_t partition_id = metadata->partition_id;
HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
<< " partition_id=" << partition_id
<< "\n" << PrintThrift(runtime_state_->instance_ctx());
// IMPALA-3798: Filtering before the scanner is created can cause hangs if a header
// split is filtered out, for sequence-based file formats. If the scanner does not
// process the header split, the remaining scan ranges in the file will not be marked as
// done. See FilePassesFilterPredicates() for the correct logic to mark all splits in a
// file as done; the correct fix here is to do that for every file in a thread-safe way.
if (!FileFormatIsSequenceBased(partition->file_format())) {
if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
// Avoid leaking unread buffers in scan_range.
scan_range->Cancel(Status::CANCELLED);
// Mark scan range as done.
scan_ranges_complete_counter()->Add(1);
progress_.Update(1);
return Status::OK();
}
}
ScannerContext context(runtime_state_, this, partition, scan_range, filter_ctxs);
scoped_ptr<HdfsScanner> scanner;
Status status = CreateAndOpenScanner(partition, &context, &scanner);
if (!status.ok()) {
// If preparation fails, avoid leaking unread buffers in the scan_range.
scan_range->Cancel(status);
if (VLOG_QUERY_IS_ON) {
stringstream ss;
ss << "Error preparing scanner for scan range " << scan_range->file() <<
"(" << scan_range->offset() << ":" << scan_range->len() << ").";
ss << endl << runtime_state_->ErrorLog();
VLOG_QUERY << ss.str();
}
return status;
}
status = scanner->ProcessSplit();
if (VLOG_QUERY_IS_ON && !status.ok() && !status.IsCancelled()) {
// This thread hit an error, record it and bail
stringstream ss;
ss << "Scan node (id=" << id() << ") ran into a parse error for scan range "
<< scan_range->file() << "(" << scan_range->offset() << ":"
<< scan_range->len() << ").";
// Parquet doesn't read the range end to end so the current offset isn't useful.
// TODO: make sure the parquet reader is outputting as much diagnostic
// information as possible.
if (partition->file_format() != THdfsFileFormat::PARQUET) {
ScannerContext::Stream* stream = context.GetStream();
ss << " Processed " << stream->total_bytes_returned() << " bytes.";
}
VLOG_QUERY << ss.str();
}
// Transfer remaining resources to a final batch and add it to the row batch queue.
scanner->Close();
return status;
}
void HdfsScanNode::SetDone() {
{
unique_lock<mutex> l(lock_);
if (done_) return;
done_ = true;
}
if (reader_context_ != NULL) {
runtime_state_->io_mgr()->CancelContext(reader_context_);
}
materialized_row_batches_->Shutdown();
}