// blob: b2294bffc44a168241724889a6a9d2775ae1a816 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
#include "exec/exec-node.h"
#include "exec/filter-context.h"
#include "util/runtime-profile.h"
#include "util/thread.h"
#include "gen-cpp/ImpalaInternalService_types.h"
namespace impala {
class BlockingRowBatchQueue;
class TScanRange;
/// Abstract base class of all scan nodes. Subclasses support different storage layers
/// and different threading models.
///
/// Includes ScanNode common counters:
/// BytesRead - total bytes read from disk by this scan node. Provided as a counter
/// as well as a time series that samples the counter. Only implemented for scan node
/// subclasses that expose the bytes read, e.g. HDFS and HBase.
///
/// TotalReadThroughput - BytesRead divided by the total wall clock time that this scan
/// was executing (from Open() to Close()). This gives the aggregate rate that data
/// is read from disks. If this is the only scan executing, ideally this will
/// approach the maximum bandwidth supported by the disks.
///
/// RowsRead - number of top-level rows/tuples read from the storage layer, including
/// those discarded by predicate evaluation. Used for all types of scans.
///
/// CollectionItemsRead - total number of nested collection items read by the scan.
/// Only created for scans (e.g. Parquet) that support nested types.
///
/// ScanRangesComplete - number of scan ranges completed. Initialized for scans that
/// have a concept of "scan range".
///
/// MaterializeTupleTime - wall clock time spent materializing tuples and evaluating
/// predicates.
///
/// The following counters are specific to multithreaded scan node implementations:
///
/// PeakScannerThreadConcurrency - the peak number of scanner threads executing at any
/// one time. Present only for multithreaded scan nodes.
///
/// AverageScannerThreadConcurrency - the average number of scanner threads executing
/// between Open() and the time when the scan completes. Present only for
/// multithreaded scan nodes.
///
/// NumScannerThreadsStarted - the number of scanner threads started for the duration
/// of the ScanNode. This is at most the number of scan ranges but should be much
/// less since a single scanner thread will likely process multiple scan ranges.
/// This is *not* the same as peak scanner thread concurrency because the number of
/// scanner threads can fluctuate during execution of the scan.
///
/// ScannerThreadsTotalWallClockTime - total wall clock time spent in all scanner
/// threads.
///
/// ScannerThreadsUserTime, ScannerThreadsSysTime,
/// ScannerThreadsVoluntaryContextSwitches, ScannerThreadsInvoluntaryContextSwitches -
/// these are aggregated counters across all scanner threads of this scan node. They
/// are taken from getrusage. See RuntimeProfile::ThreadCounters for details.
///
/// RowBatchesEnqueued, RowBatchBytesEnqueued - Number of row batches and bytes enqueued
/// in the scan node's output queue.
///
/// RowBatchQueueGetWaitTime - Wall clock time that the fragment execution thread spent
/// blocked waiting for row batches to be added to the scan node's output queue.
///
/// RowBatchQueuePutWaitTime - Wall clock time that the scanner threads spent blocked
/// waiting for space in the scan node's output queue when it is full.
///
/// RowBatchQueueCapacity - capacity in batches of the scan node's output queue.
///
/// RowBatchQueuePeakMemoryUsage - peak memory consumption of row batches enqueued in
/// the scan node's output queue.
///
class ScanNode : public ExecNode {
 public:
  /// Forwards 'pool', 'tnode' and 'descs' to the ExecNode constructor. No scan ranges
  /// are attached at construction time ('scan_range_params_' starts out as nullptr);
  /// they are supplied afterwards through SetScanRanges().
  ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
    : ExecNode(pool, tnode, descs) {}

  /// Lifecycle methods, defined out of line. Each returns a Status and is tagged with
  /// WARN_UNUSED_RESULT.
  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;

  /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can
  /// start periodic counters and rely on this function stopping them.
  virtual void Close(RuntimeState* state);

  /// Records the scan ranges this node is responsible for. This should be called before
  /// Prepare(). Only the address of 'scan_range_params' is stored - the vector is not
  /// copied - so the argument must not be destroyed until after Prepare().
  void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
    scan_range_params_ = &scan_range_params;
  }

  /// Always true for ScanNode.
  virtual bool IsScanNode() const { return true; }

  /// Returns true iff the data cache in IoMgr is disabled by query options.
  bool IsDataCacheDisabled() const;

  /// Accessors for the members of the same name. Each returned pointer is nullptr
  /// until the corresponding member has been initialized.
  RuntimeState* runtime_state() const { return runtime_state_; }
  RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
  RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
  RuntimeProfile::Counter* collection_items_read_counter() const {
    return collection_items_read_counter_;
  }
  RuntimeProfile::Counter* materialize_tuple_timer() const {
    return materialize_tuple_timer_;
  }

  /// Names of ScanNode common counters. Only declared here; the definitions are
  /// provided outside this header.
  static const std::string BYTES_READ_COUNTER;
  static const std::string ROWS_READ_COUNTER;
  static const std::string COLLECTION_ITEMS_READ_COUNTER;
  static const std::string TOTAL_HDFS_READ_TIMER;
  static const std::string TOTAL_HDFS_OPEN_FILE_TIMER;
  static const std::string TOTAL_HBASE_READ_TIMER;
  static const std::string TOTAL_THROUGHPUT_COUNTER;
  static const std::string PER_READ_THREAD_THROUGHPUT_COUNTER;
  static const std::string NUM_DISKS_ACCESSED_COUNTER;
  static const std::string MATERIALIZE_TUPLE_TIMER;
  static const std::string SCAN_RANGES_COMPLETE_COUNTER;
  static const std::string SCANNER_THREAD_COUNTERS_PREFIX;
  static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME;
  static const std::string AVERAGE_SCANNER_THREAD_CONCURRENCY;
  static const std::string PEAK_SCANNER_THREAD_CONCURRENCY;
  static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY;
  static const std::string NUM_SCANNER_THREADS_STARTED;

  /// Read-only views of the runtime filter expressions and their contexts.
  const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; }
  const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }

 protected:
  /// Exposed through runtime_state(). Starts out as nullptr; it is not assigned
  /// anywhere in this header.
  RuntimeState* runtime_state_ = nullptr;

  /// The scan ranges this scan node is responsible for. Not owned. nullptr until
  /// SetScanRanges() is called.
  const std::vector<TScanRangeParams>* scan_range_params_ = nullptr;

  /// Total bytes read from the scanner. Initialised in subclasses that track
  /// bytes read, including HDFS and HBase by calling AddBytesReadCounters().
  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;

  /// Time series of 'bytes_read_counter_', initialized at the same time.
  RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_ = nullptr;

  /// Wall based aggregate read throughput [bytes/sec]. Depends on 'bytes_read_counter_'
  /// and initialized at the same time.
  RuntimeProfile::Counter* total_throughput_counter_ = nullptr;

  /// # top-level rows/tuples read from the scanner, including those discarded by
  /// EvalConjuncts(). Used for all types of scans.
  RuntimeProfile::Counter* rows_read_counter_ = nullptr;

  /// # items the scanner read into CollectionValues. For example, for schema
  /// array<struct<B: INT, array<C: INT>>> and tuple
  /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
  /// Initialized by subclasses that support scanning nested types.
  RuntimeProfile::Counter* collection_items_read_counter_ = nullptr;

  /// Total time writing tuple slots. Used for all types of scans.
  RuntimeProfile::Counter* materialize_tuple_timer_ = nullptr;

  /// Total number of scan ranges completed. Initialised in subclasses that have a
  /// concept of "scan range", including HDFS and Kudu.
  RuntimeProfile::Counter* scan_ranges_complete_counter_ = nullptr;

  /// Expressions to evaluate the input rows for filtering against runtime filters.
  std::vector<ScalarExpr*> filter_exprs_;

  /// List of contexts for expected runtime filters for this scan node. These contexts are
  /// cloned by individual scanners to be used in multi-threaded contexts, passed through
  /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'.
  std::vector<FilterContext> filter_ctxs_;

  /// Initializes 'bytes_read_counter_', 'bytes_read_timeseries_counter_' and
  /// 'total_throughput_counter_'
  void AddBytesReadCounters();

  /// Waits for runtime filters to arrive, checking every 20ms. Max wait time is specified
  /// by the 'runtime_filter_wait_time_ms' flag, which is overridden by the query option
  /// of the same name. Returns true if all filters arrived within the time limit (as
  /// measured from the time of RuntimeFilterBank::RegisterFilter()), false otherwise.
  bool WaitForRuntimeFilters();

  /// Additional state only used by multi-threaded scan node implementations.
  /// The lifecycle is as follows:
  /// 1. Prepare() is called.
  /// 2. Open() is called.
  /// 3. Other methods can be called.
  /// 4. Shutdown() is called to prevent new batches being added to the queue.
  /// 5. Close() is called to release all resources.
  class ScannerThreadState {
   public:
    /// Called from *ScanNode::Prepare() to initialize counters and MemTracker.
    /// 'estimated_per_thread_mem' is the estimated memory consumption of each scanner
    /// thread and must be positive. Prepare() registers the scan with the query-global
    /// ScannerMemLimit and accounts for the first thread's memory consumption.
    void Prepare(ScanNode* parent, int64_t estimated_per_thread_mem);

    /// Called from *ScanNode::Open() to create the row batch queue and start periodic
    /// counters running. 'max_row_batches_override' determines size of the row batch
    /// queue if >= 0. Otherwise the size is automatically determined.
    void Open(ScanNode* parent, int64_t max_row_batches_override);

    /// Called when no more batches need to be enqueued or dequeued. Shuts down the
    /// queue. Thread-safe.
    void Shutdown();

    /// Waits for all scanner threads to finish and cleans up the queue. Called from
    /// *ScanNode::Close(). No other methods can be called after this. Not thread-safe.
    void Close(ScanNode* parent);

    /// Add a new scanner thread to the thread group. Not thread-safe: only one thread
    /// should call AddThread() at a time.
    void AddThread(std::unique_ptr<Thread> thread);

    /// Get the number of active scanner threads. Thread-safe.
    int32_t GetNumActive() const { return num_active_.Load(); }

    /// Get the number of started scanner threads. Thread-safe. Dereferences
    /// 'num_threads_started_', which is nullptr until the counters have been
    /// initialized, so this must not be called before then.
    int32_t GetNumStarted() const { return num_threads_started_->value(); }

    /// Called from a scanner thread that is exiting to decrement the number of active
    /// scanner threads. Returns true if this was the last thread to exit. Thread-safe.
    bool DecrementNumActive();

    /// Adds a materialized row batch for the scan node. This is called from scanner
    /// threads. This function will block if the row batch queue is full. Thread-safe.
    void EnqueueBatch(std::unique_ptr<RowBatch> row_batch);

    /// Adds a materialized row batch for the scan node. This is called from scanner
    /// threads. This function will block for up to 'timeout_micros' if the row batch
    /// queue is full. Returns true and takes ownership of '*row_batch' if the batch
    /// was successfully enqueued. Returns false if the timeout expired or the queue
    /// was shut down and the batch could not be enqueued and does not take ownership
    /// of '*row_batch'. Thread-safe.
    bool EnqueueBatchWithTimeout(std::unique_ptr<RowBatch>* row_batch,
        int64_t timeout_micros);

    /// Returns the row batch queue without transferring ownership; nullptr while
    /// 'batch_queue_' is empty.
    BlockingRowBatchQueue* batch_queue() { return batch_queue_.get(); }

    /// Plain accessors for the members of the same name.
    RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
    int max_num_scanner_threads() const { return max_num_scanner_threads_; }
    int64_t estimated_per_thread_mem() const { return estimated_per_thread_mem_; }
    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter() const {
      return scanner_thread_mem_unavailable_counter_;
    }

   private:
    /// Thread group for all scanner threads.
    ThreadGroup scanner_threads_;

    /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
    /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
    /// are generally cpu bound so there is no benefit in spinning up more threads than
    /// the number of cores. Set in Open().
    int max_num_scanner_threads_ = 0;

    /// Estimated amount of memory that each additional scanner thread will consume. Used
    /// to decide whether enough memory is available to create a new scanner thread. Set
    /// to 0 when the memory for the first thread claimed in Prepare() is released.
    int64_t estimated_per_thread_mem_ = 0;

    /// MemTracker for queued row batches. Initialized in Prepare(). Owned by
    /// RuntimeState.
    MemTracker* row_batches_mem_tracker_ = nullptr;

    /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
    /// threads and consumed by the main fragment thread that calls GetNext() on the scan
    /// node.
    boost::scoped_ptr<BlockingRowBatchQueue> batch_queue_;

    /// The number of scanner threads currently running.
    AtomicInt32 num_active_{0};

    /// Aggregated scanner thread CPU time counters.
    RuntimeProfile::ThreadCounters* thread_counters_ = nullptr;

    /// Average number of executing scanner threads.
    /// This should be created in Open and stopped when all the scanner threads are done.
    RuntimeProfile::Counter* average_concurrency_ = nullptr;

    /// Peak number of executing scanner threads.
    RuntimeProfile::HighWaterMarkCounter* peak_concurrency_ = nullptr;

    /// Cumulative number of scanner threads created during the scan. Some may be created
    /// and then destroyed, so this can exceed the peak number of threads.
    RuntimeProfile::Counter* num_threads_started_ = nullptr;

    /// The number of row batches enqueued into the row batch queue.
    RuntimeProfile::Counter* row_batches_enqueued_ = nullptr;

    /// The total bytes of row batches enqueued into the row batch queue.
    RuntimeProfile::Counter* row_batch_bytes_enqueued_ = nullptr;

    /// The wait time for fetching a row batch from the row batch queue.
    RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;

    /// The wait time for enqueuing a row batch into the row batch queue.
    RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;

    /// Peak memory consumption of the materialized batch queue. Updated in Close().
    RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;

    /// Number of times scanner threads were not created because of memory not available.
    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter_ = nullptr;
  };
};
}