// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
#define IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
#include <stdint.h>
#include <memory>
#include <unordered_set>
#include <vector>
#include <tuple>
#include <boost/unordered_map.hpp>
#include <boost/scoped_ptr.hpp>
#include "exec/filter-context.h"
#include "exec/scan-node.h"
#include "runtime/descriptors.h"
#include "runtime/io/request-context.h"
#include "runtime/io/request-ranges.h"
#include "util/avro-util.h"
#include "util/container-util.h"
#include "util/progress-updater.h"
#include "util/spinlock.h"
namespace impala {
class ScannerContext;
class DescriptorTbl;
class HdfsScanner;
class RowBatch;
class Status;
class Tuple;
class TPlanNode;
class TScanRange;
/// Maintains per file information for files assigned to this scan node. This includes
/// all the splits for the file. Note that it is not thread-safe.
struct HdfsFileDesc {
HdfsFileDesc(const std::string& filename)
: fs(NULL), filename(filename), file_length(0), mtime(0),
file_compression(THdfsCompression::NONE), is_erasure_coded(false) {
}
/// Connection to the filesystem containing the file.
hdfsFS fs;
/// File name including the path.
std::string filename;
/// Length of the file. This is not related to which parts of the file have been
/// assigned to this node.
int64_t file_length;
/// Last modified time
int64_t mtime;
THdfsCompression::type file_compression;
/// Whether this file is erasure-coded.
bool is_erasure_coded;
/// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
std::vector<io::ScanRange*> splits;
};
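// Example (illustrative sketch; the path, length, and timestamp values are
// hypothetical): populating a descriptor before registering its splits with the
// scan node.
//
//   HdfsFileDesc desc("/warehouse/t/part=1/000000_0");
//   desc.fs = hdfs_connection;            // connection to the containing filesystem
//   desc.file_length = 128 * 1024 * 1024; // 128MB file
//   desc.mtime = 1546300800;
//   desc.splits.push_back(split);         // io::ScanRange* covering one byte range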
/// Struct for additional metadata for scan ranges. This contains the partition id
/// that this scan range is for.
struct ScanRangeMetadata {
/// The partition id that this range is part of.
int64_t partition_id;
/// For parquet scan ranges we initially create a request for the file footer for each
/// split; we store a pointer to the actual split so that we can recover its information
/// for the scanner to process.
const io::ScanRange* original_split;
/// True if this object belongs to a scan range which is the header of a
/// sequence-based file.
bool is_sequence_header = false;
ScanRangeMetadata(int64_t partition_id, const io::ScanRange* original_split)
: partition_id(partition_id), original_split(original_split) { }
};
/// Base class for all Hdfs scan nodes. Contains common members and functions
/// that are independent of whether batches are materialized by the main thread
/// (via HdfsScanner::GetNext()) or by spinning up separate threads that feed
/// into a RowBatch queue (via HdfsScanner::ProcessSplit()). Those specifics
/// are expected to be implemented in subclasses.
///
/// Subclasses may expect to receive runtime filters produced elsewhere in the plan
/// (even from remote fragments). These filters arrive asynchronously during execution,
/// and are applied as soon as they arrive. Filters may be applied by the scan node in
/// the following scopes:
///
/// 1. Per-file (all file formats, partition column filters only) - filtering at this
/// scope saves IO as the filters are applied before scan ranges are issued.
/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
/// this scope saves CPU as filtered scan ranges are never scanned.
///
/// Scanners may also use the same filters to eliminate rows at finer granularities
/// (e.g. per row).
///
/// Counters:
///
/// TotalRawHdfsReadTime - the total wall clock time spent by Disk I/O threads in HDFS
/// read operations. For example, if we have 3 reading threads and each spent 1 sec,
/// this counter will report 3 sec.
///
/// TotalRawHdfsOpenFileTime - the total wall clock time spent by Disk I/O threads in
/// HDFS open operations.
///
/// PerReadThreadRawHdfsThroughput - the read throughput in bytes/sec for each HDFS
/// read thread while it is executing I/O operations on behalf of this scan.
///
/// NumDisksAccessed - number of distinct disks accessed by HDFS scan. Each local disk
/// is counted as a disk and each remote disk queue (e.g. HDFS remote reads, S3)
/// is counted as a distinct disk.
///
/// ScannerIoWaitTime - total amount of time scanner threads spent waiting for
/// I/O. This is comparable to ScannerThreadsTotalWallClockTime in the traditional
/// HDFS scan nodes and the scan node total time for the MT_DOP > 1 scan nodes.
/// Low values show that each I/O completed before or around the time that the
/// scanner thread was ready to process the data. High values show that scanner
/// threads are spending significant time waiting for I/O instead of processing data.
/// Note that if CPU load is high, this can include time that the thread is runnable
/// but not scheduled.
///
/// AverageHdfsReadThreadConcurrency - the average number of HDFS read threads
/// executing read operations on behalf of this scan. Higher values show that this
/// scan is using a larger proportion of the I/O capacity of the system. Lower values
/// show that either this thread is not I/O bound or that it is getting a small share
/// of the I/O capacity of the system because of other concurrently executing
/// queries.
///
/// Hdfs Read Thread Concurrency Bucket - a histogram of HDFS read thread
/// concurrency, expressed as the percentage of samples in each concurrency bucket.
///
/// TODO: Once the legacy scan node has been removed, several functions can be made
/// non-virtual. Also merge this class with HdfsScanNodeMt.
class HdfsScanNodeBase : public ScanNode {
public:
HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~HdfsScanNodeBase();
virtual Status Init(const TPlanNode& tnode, RuntimeState* state)
override WARN_UNUSED_RESULT;
virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
virtual Status Reset(
RuntimeState* state, RowBatch* row_batch) override WARN_UNUSED_RESULT;
virtual void Close(RuntimeState* state) override;
/// Returns true if this node uses separate threads for scanners that append RowBatches
/// to a queue, false otherwise.
virtual bool HasRowBatchQueue() const = 0;
const std::vector<SlotDescriptor*>& materialized_slots()
const { return materialized_slots_; }
/// Returns the tuple idx into the row for this scan node to output to.
/// Currently this is always 0.
int tuple_idx() const { return 0; }
/// Returns number of partition keys in the table.
int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
/// Returns number of partition key slots.
int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
int min_max_tuple_id() const { return min_max_tuple_id_; }
const std::vector<ScalarExprEvaluator*>& min_max_conjunct_evals() const {
return min_max_conjunct_evals_;
}
const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
int skip_header_line_count() const { return skip_header_line_count_; }
io::RequestContext* reader_context() const { return reader_context_.get(); }
bool optimize_parquet_count_star() const {
return parquet_count_star_slot_offset_ != -1;
}
int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
ConjunctEvaluatorsMap;
const ConjunctEvaluatorsMap& conjuncts_map() const { return conjunct_evals_map_; }
/// Slot Id => Dictionary Filter eligible conjuncts for that slot
typedef std::map<TSlotId, std::vector<int32_t>> TDictFilterConjunctsMap;
const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map() const {
return thrift_dict_filter_conjuncts_map_;
}
RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
return max_compressed_text_file_length_;
}
RuntimeProfile::Counter* scanner_io_wait_time() const {
return scanner_io_wait_time_;
}
const static int SKIP_COLUMN = -1;
/// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
/// that path is not materialized.
int GetMaterializedSlotIdx(const std::vector<int>& path) const {
PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
return result->second;
}
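// Example (sketch; the path {0, 2} and the 'scan_node_' member, as seen from a
// scanner, are hypothetical): checking whether a column is needed before decoding it.
//
//   int idx = scan_node_->GetMaterializedSlotIdx({0, 2});
//   if (idx == HdfsScanNodeBase::SKIP_COLUMN) {
//     // Column is not materialized; skip decoding it.
//   } else {
//     SlotDescriptor* slot = scan_node_->materialized_slots()[idx];
//   }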
/// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
/// column i should be materialized.
const bool* is_materialized_col() {
return reinterpret_cast<const bool*>(is_materialized_col_.data());
}
/// Returns the per format codegen'd function. Scanners call this to get the
/// codegen'd function to use. Returns NULL if codegen should not be used.
void* GetCodegenFn(THdfsFileFormat::type);
inline void IncNumScannersCodegenEnabled() { num_scanners_codegen_enabled_.Add(1); }
inline void IncNumScannersCodegenDisabled() { num_scanners_codegen_disabled_.Add(1); }
/// Allocate a new scan range object, stored in the runtime state's object pool. For
/// scan ranges that correspond to the original hdfs splits, the partition id must be
/// set to the range's partition id. The partition id is mandatory as it is used to
/// gather file descriptor info. 'expected_local' should be true if this scan range is
/// not expected to require a remote read. The range must fall within the file bounds.
/// That is, the offset must be >= 0, and offset + len <= file_length.
/// If not NULL, the 'original_split' pointer is stored for reference in the scan range
/// metadata of the scan range that is to be allocated.
/// This is thread safe.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
bool is_erasure_coded, int64_t mtime,
const io::BufferOpts& buffer_opts,
const io::ScanRange* original_split = nullptr);
/// Same as above, but it takes a pointer to a ScanRangeMetadata object which contains
/// the partition_id, original_split, and other information about the scan range.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);
/// Same as the first overload, but it takes sub-ranges as well.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
int64_t mtime, const io::BufferOpts& buffer_opts,
const io::ScanRange* original_split = nullptr);
/// Same as above, but it takes both sub-ranges and metadata.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
ScanRangeMetadata* metadata, int disk_id, bool expected_local,
bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);
/// Old API for compatibility with text scanners (e.g. LZO text scanner).
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, int cache_options,
bool expected_local, int64_t mtime, bool is_erasure_coded = false,
const io::ScanRange* original_split = nullptr);
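// Example (illustrative sketch; all argument values are hypothetical): allocating a
// range covering the first 16MB of a split that is expected to be read locally,
// using the first overload above.
//
//   io::ScanRange* range = AllocateScanRange(fs, "/warehouse/t/000000_0",
//       /*len=*/16 * 1024 * 1024, /*offset=*/0, /*partition_id=*/7, /*disk_id=*/0,
//       /*expected_local=*/true, /*is_erasure_coded=*/false, mtime,
//       io::BufferOpts::Uncached());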
/// Adds ranges to the io mgr queue. Can be overridden to add scan-node specific
/// actions like starting scanner threads. Must not be called once
/// remaining_scan_range_submissions_ is 0.
/// The enqueue_location specifies whether the scan ranges are added to the head or
/// tail of the queue.
virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT;
/// Adds all splits for file_desc to the io mgr queue.
inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc,
EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT {
return AddDiskIoRanges(file_desc->splits, enqueue_location);
}
/// When this counter goes to 0, AddDiskIoRanges() can no longer be called.
/// Furthermore, this implies that scanner threads failing to
/// acquire a new scan range with StartNextScanRange() can exit.
inline void UpdateRemainingScanRangeSubmissions(int32_t delta) {
remaining_scan_range_submissions_.Add(delta);
DCHECK_GE(remaining_scan_range_submissions_.Load(), 0);
}
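// Example (sketch): a caller that has issued its final batch of ranges signals that
// no further AddDiskIoRanges() calls will follow.
//
//   RETURN_IF_ERROR(AddDiskIoRanges(ranges));
//   UpdateRemainingScanRangeSubmissions(-1);  // counter may now reach 0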
/// Allocates and initializes a new template tuple allocated from pool with values
/// from the partition columns for the current scan range, if any.
/// Returns NULL if there are no partition key slots.
Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
MemPool* pool, RuntimeState* state) const;
/// Given a partition_id and filename, returns the related file descriptor.
/// A DCHECK ensures that a file descriptor is always returned.
HdfsFileDesc* GetFileDesc(int64_t partition_id, const std::string& filename);
/// Sets the scanner specific metadata for 'partition_id' and 'filename'.
/// Scanners can use this to store file header information. Thread safe.
void SetFileMetadata(int64_t partition_id, const std::string& filename, void* metadata);
/// Returns the scanner specific metadata for 'partition_id' and 'filename'.
/// Returns nullptr if there is no metadata. Thread safe.
void* GetFileMetadata(int64_t partition_id, const std::string& filename);
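// Example (sketch; 'header' is a hypothetical scanner-defined object): a
// sequence-based scanner caching its parsed file header so that scanner threads
// working on other splits of the same file can reuse it.
//
//   SetFileMetadata(partition_id, filename, header);
//   void* cached = GetFileMetadata(partition_id, filename);
//   if (cached != nullptr) { /* reuse the parsed header */ }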
/// Called by scanners when a range is complete. Used to record progress.
/// This *must* only be called after a scanner has completely finished its
/// scan range (i.e. context->Flush()), and has returned the final row batch.
/// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
/// to racing with shutting down the queue.
void RangeComplete(const THdfsFileFormat::type& file_type,
const THdfsCompression::type& compression_type, bool skipped = false);
/// Same as above except for when multiple compression codecs were used
/// in the file. The metrics are incremented for each compression_type.
/// 'skipped' is set to true in the following cases:
/// 1. when a scan range is filtered out at runtime
/// 2. when a scan range is a metadata-only read (e.g. count(*) on parquet files)
virtual void RangeComplete(const THdfsFileFormat::type& file_type,
const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
/// Calls RangeComplete() with skipped=true for all the splits of the file
void SkipFile(const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
/// Utility function to compute the order in which to materialize slots to allow for
/// computing conjuncts as slots get materialized (on partial tuples).
/// 'order' will contain for each slot, the first conjunct it is associated with.
/// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
/// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have
/// order set to conjuncts.size().
void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
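// Worked example (hypothetical shapes): if conjuncts[0] references
// materialized_slots[1], and conjuncts[1] references materialized_slots[0] and
// materialized_slots[2], then the computed order is {1, 0, 1}: slot 1 must be
// materialized before conjuncts[0] can be evaluated, and slots 0 and 2 before
// conjuncts[1]. A slot referenced by no conjunct would get order 2
// (== conjuncts.size()).
//
//   std::vector<int> order;
//   ComputeSlotMaterializationOrder(&order);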
/// Returns true if there are no materialized slots, such as a count(*) over the table.
inline bool IsZeroSlotTableScan() const {
return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
}
/// Transfers all memory from 'pool' to 'scan_node_pool_'.
virtual void TransferToScanNodePool(MemPool* pool);
/// Map from volume id to <number of splits, per-volume split lengths>.
typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumeStats;
/// Update the per volume stats with the given scan range params list
static void UpdateHdfsSplitStats(
const std::vector<TScanRangeParams>& scan_range_params_list,
PerVolumeStats* per_volume_stats);
/// Output the per_volume_stats to stringstream. The output format is a list of:
/// <volume id>:<# splits>/<per volume split lengths>
static void PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats,
std::stringstream* ss);
/// Description string for the per volume stats output.
static const std::string HDFS_SPLIT_STATS_DESC;
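// Example (sketch; the counts and the printed sizes are hypothetical): accumulating
// per-volume stats for a fragment's scan ranges and printing them in the format
// documented above.
//
//   PerVolumeStats stats;
//   UpdateHdfsSplitStats(scan_range_params_list, &stats);
//   std::stringstream ss;
//   PrintHdfsSplitStats(stats, &ss);  // e.g. "0:5/320.00 MB 1:3/128.00 MB "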
/// Returns true if partition 'partition_id' passes all the filter predicates in
/// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the
/// counter groups in FilterStats, and is used to update the correct statistics.
///
/// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the
/// function returns true, or a set of filter contexts to evaluate.
bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
const std::vector<FilterContext>& filter_ctxs);
/// Helper to increase reservation from 'curr_reservation' up to 'ideal_reservation'
/// that may succeed in getting a partial increase if the full increase is not
/// possible. First increases to an I/O buffer multiple then increases in I/O buffer
/// sized increments. 'curr_reservation' can refer to a 'share' of the total
/// reservation of the buffer pool client, e.g. the 'share' belonging to a single
/// scanner thread. Returns the new reservation after increases.
int64_t IncreaseReservationIncrementally(int64_t curr_reservation,
int64_t ideal_reservation);
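// Worked example (hypothetical numbers): with curr_reservation = 5MB and
// ideal_reservation = 20MB, the helper first tries to round the reservation up to
// an I/O buffer multiple, then grows it in I/O-buffer-sized steps toward the ideal,
// stopping early if the buffer pool denies an increase. The reservation actually
// reached is returned.
//
//   int64_t reservation =
//       IncreaseReservationIncrementally(5 * 1024 * 1024, 20 * 1024 * 1024);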
/// Update the number of [un]compressed bytes read for the given SlotId. This is used
/// to track the number of bytes read per column and is meant to be called by
/// individual scanner classes.
void UpdateBytesRead(
SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read);
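// Example (sketch; byte counts are hypothetical): a columnar scanner reporting
// per-column byte counts after finishing a column chunk.
//
//   UpdateBytesRead(slot_id, /*uncompressed_bytes_read=*/4 * 1024 * 1024,
//       /*compressed_bytes_read=*/1024 * 1024);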
protected:
friend class ScannerContext;
friend class HdfsScanner;
/// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
const int min_max_tuple_id_;
/// Conjuncts to evaluate on parquet::Statistics.
std::vector<ScalarExpr*> min_max_conjuncts_;
std::vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
/// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
TupleDescriptor* min_max_tuple_desc_ = nullptr;
/// Number of header lines to skip at the beginning of each file of this table. Only set
/// to values > 0 for hdfs text files.
const int skip_header_line_count_;
/// Tuple id resolved in Prepare() to set tuple_desc_
const int tuple_id_;
/// The byte offset of the slot for Parquet metadata if Parquet count star optimization
/// is enabled. When set, this scan node can optimize a count(*) query by populating
/// the tuple with data from the Parquet num rows statistic. See
/// applyParquetCountStartOptimization() in HdfsScanNode.java.
const int parquet_count_star_slot_offset_;
/// RequestContext object to use with the disk-io-mgr for reads.
std::unique_ptr<io::RequestContext> reader_context_;
/// Descriptor for tuples this scan node constructs
const TupleDescriptor* tuple_desc_ = nullptr;
/// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
/// the partition columns for that partition materialized. Used to filter files and scan
/// ranges on partition-column filters. Populated in Open().
boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
/// Descriptor for the hdfs table, including partition and format metadata.
/// Set in Prepare, owned by RuntimeState.
const HdfsTableDescriptor* hdfs_table_ = nullptr;
/// The root of the table's Avro schema, if we're scanning an Avro table.
ScopedAvroSchemaElement avro_schema_;
/// Partitions scanned by this scan node.
std::unordered_set<int64_t> partition_ids_;
/// A pair of partition ID and filename.
typedef std::pair<int64_t, std::string> PartitionFileKey;
/// partition_id, File path => file descriptor (which includes the file's splits)
typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
FileDescMap file_descs_;
/// File format => file descriptors.
typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>>
FileFormatsMap;
FileFormatsMap per_type_files_;
/// Scanner specific per file metadata (e.g. header information) and associated lock.
/// Key of the map is the (partition_id, filename) pair.
/// TODO: Remove this lock when removing the legacy scanners and scan nodes.
boost::mutex metadata_lock_;
std::unordered_map<PartitionFileKey, void*, pair_hash> per_file_metadata_;
/// Conjuncts for each materialized tuple (top-level row batch tuples and collection
/// item tuples). Includes a copy of ExecNode.conjuncts_.
typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
ConjunctsMap conjuncts_map_;
ConjunctEvaluatorsMap conjunct_evals_map_;
/// Dictionary filtering eligible conjuncts for each slot. Set to nullptr when there
/// are no dictionary filters.
const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_;
/// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
/// the first call to GetNext(). The token manager, in a different thread, will read
/// this variable.
bool initial_ranges_issued_ = false;
/// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
/// therefore scanner threads that can't get work should exit. For most
/// file formats (except for sequence-based formats), this is 0 after
/// IssueInitialRanges(). Note that some scanners (namely Parquet) issue
/// additional work to the IO subsystem without using AddDiskIoRanges(),
/// but that is managed within the scanner, and doesn't require
/// additional scanner threads.
AtomicInt32 remaining_scan_range_submissions_ = { 1 };
/// Per scanner type codegen'd fn.
typedef boost::unordered_map<THdfsFileFormat::type, void*> CodegendFnMap;
CodegendFnMap codegend_fn_map_;
/// Maps from a slot's path to its index into materialized_slots_.
typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
PathToSlotIdxMap path_to_materialized_slot_idx_;
/// is_materialized_col_[i] = <true if the i-th column should be materialized, false
/// otherwise> for 0 <= i < total # columns in table.
///
/// This should be a vector<bool>, but bool vectors are special-cased and not stored
/// internally as arrays, so instead we store as chars and cast to bools as needed.
std::vector<char> is_materialized_col_;
/// Vector containing slot descriptors for all non-partition key slots. These
/// descriptors are sorted in order of increasing col_pos.
std::vector<SlotDescriptor*> materialized_slots_;
/// Vector containing slot descriptors for all partition key slots.
std::vector<SlotDescriptor*> partition_key_slots_;
/// Keeps track of total splits and the number finished.
ProgressUpdater progress_;
/// Counters which track the number of scanners that have codegen enabled for the
/// materialize and conjuncts evaluation code paths.
AtomicInt32 num_scanners_codegen_enabled_;
AtomicInt32 num_scanners_codegen_disabled_;
/// If true, counters are actively running and need to be reported in the runtime
/// profile.
bool counters_running_ = false;
/// The size of the largest compressed text file to be scanned. This is used to
/// estimate scanner thread memory usage.
RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_ = nullptr;
/// Disk accessed bitmap
RuntimeProfile::Counter disks_accessed_bitmap_;
/// Total number of bytes read locally
RuntimeProfile::Counter* bytes_read_local_ = nullptr;
/// Total number of bytes read via short circuit read
RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
/// Total number of bytes read from data node cache
RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
/// Total number of remote scan ranges
RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
/// Total number of bytes read remotely that were expected to be local
RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
/// Total number of file handle opens where the file handle was present in the cache
RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
/// Total number of file handle opens where the file handle was not in the cache
RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
/// Counters for data cache.
RuntimeProfile::Counter* data_cache_hit_count_ = nullptr;
RuntimeProfile::Counter* data_cache_partial_hit_count_ = nullptr;
RuntimeProfile::Counter* data_cache_miss_count_ = nullptr;
RuntimeProfile::Counter* data_cache_hit_bytes_ = nullptr;
RuntimeProfile::Counter* data_cache_miss_bytes_ = nullptr;
/// The amount of time scanner threads spend waiting for I/O.
RuntimeProfile::Counter* scanner_io_wait_time_ = nullptr;
/// The number of active hdfs read threads reading for this node.
RuntimeProfile::Counter active_hdfs_read_thread_counter_;
/// Average number of active hdfs read threads.
/// This should be created in Open() and stopped when all the scanner threads are done.
RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
/// HDFS read thread concurrency bucket: bucket[i] refers to the number of samples
/// taken where there were i concurrent hdfs read threads running. Created in Open().
std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
/// HDFS read throughput per Disk I/O thread [bytes/sec].
RuntimeProfile::Counter* per_read_thread_throughput_counter_ = nullptr;
/// Total number of disks accessed for this scan node.
RuntimeProfile::Counter* num_disks_accessed_counter_ = nullptr;
/// Total file read time in I/O mgr disk thread.
RuntimeProfile::Counter* hdfs_read_timer_ = nullptr;
/// Total time spent opening file handles in I/O mgr disk thread.
RuntimeProfile::Counter* hdfs_open_file_timer_ = nullptr;
/// Track stats about ideal/actual reservation for initial scan ranges so we can
/// determine if the scan got all of the reservation it wanted. Does not include
/// subsequent reservation increases done by scanner implementation (e.g. for Parquet
/// columns).
RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
/// Track stats about the number of bytes read per column. Each sample in the counter is
/// the size of a single column that is scanned by the scan node. The scan node tracks
/// the number of bytes read for each column it processes, and when the scan node is
/// closed, it updates these counters with the size of each column.
RuntimeProfile::SummaryStatsCounter* compressed_bytes_read_per_column_counter_ =
nullptr;
RuntimeProfile::SummaryStatsCounter* uncompressed_bytes_read_per_column_counter_ =
nullptr;
/// Pool for allocating memory that is shared between scanners, e.g. partition key
/// tuples and their string buffers.
boost::scoped_ptr<MemPool> scan_node_pool_;
/// Status of failed operations. This is set by the scanner threads and returned in
/// GetNext() if an error occurred. A non-ok status triggers cleanup of the scanner
/// threads.
Status status_;
/// Struct that tracks the uncompressed and compressed bytes read. Used by the map
/// bytes_read_per_col_ to track the [un]compressed bytes read per column. Types are
/// atomic as the struct may be updated concurrently.
struct BytesRead {
AtomicInt64 uncompressed_bytes_read;
AtomicInt64 compressed_bytes_read;
};
/// Map from SlotId (column identifier) to a BytesRead struct that tracks the number
/// of uncompressed and compressed bytes read for the column. This map is used to
/// update the [un]compressed_bytes_read_per_column counters.
std::unordered_map<SlotId, BytesRead> bytes_read_per_col_;
/// Lock that controls access to bytes_read_per_col_ so that multiple scanners
/// can update the map concurrently.
boost::shared_mutex bytes_read_per_col_lock_;
/// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
/// issues initial ranges for all file types. Waits for runtime filters if necessary.
/// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
/// A non-ok status is returned only if it encounters an invalid scan range or if a
/// scanner thread cancels the scan when it runs into an error.
Status IssueInitialScanRanges(RuntimeState* state) WARN_UNUSED_RESULT;
/// Gets the next scan range to process and allocates buffer for it. 'reservation' is
/// an in/out argument with the current reservation available for this range. It may
/// be increased by this function up to a computed "ideal" reservation, in which case
/// *reservation is increased to reflect the new reservation.
///
/// Returns Status::OK() and sets 'scan_range' if it gets a range to process. Returns
/// Status::OK() and sets 'scan_range' to NULL when no more ranges are left to process.
/// Returns an error status if there was an error getting the range or allocating
/// buffers.
Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
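// Example (sketch of the typical caller loop; error propagation elided and
// 'initial_reservation' hypothetical):
//
//   int64_t reservation = initial_reservation;
//   io::ScanRange* range = nullptr;
//   while (true) {
//     RETURN_IF_ERROR(StartNextScanRange(&reservation, &range));
//     if (range == nullptr) break;  // no ranges left to process
//     // ... process 'range' ...
//   }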
/// Helper for the CreateAndOpenScanner() implementations in the subclass. Creates and
/// opens a new scanner for this partition type. Depending on the outcome, the
/// behaviour differs:
/// - If the scanner is successfully created and opened, returns OK and sets *scanner.
/// - If the scanner cannot be created, returns an error and does not set *scanner.
/// - If the scanner is created but opening fails, returns an error and sets *scanner.
/// The caller is then responsible for closing the scanner.
Status CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
WARN_UNUSED_RESULT;
/// Recursively initializes all NULL collection slots to an empty CollectionValue in
/// addition to maintaining the null bit. Hack to allow UnnestNode to project out
/// collection slots. Assumes that the null bit has already been un/set.
/// TODO: Remove this function once the TODOs in UnnestNode regarding projection
/// have been addressed.
void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
/// Helper to call InitNullCollectionValues() on all tuples produced by this scan
/// in 'row_batch'.
void InitNullCollectionValues(RowBatch* row_batch) const;
/// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
/// and therefore not processed. 'file_type' is the format of 'file', and is used
/// for bookkeeping. Returns true if all filters pass or are not present.
bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
/// Stops periodic counters and aggregates counter values for the entire scan node.
/// This should be called as soon as the scan node is complete to get the most accurate
/// counter values.
/// This can be called multiple times; subsequent calls will be ignored.
/// This must be called on Close() to unregister counters.
/// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
void StopAndFinalizeCounters();
/// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
/// debug action specified for the query.
Status ScanNodeDebugAction(TExecNodePhase::type phase) WARN_UNUSED_RESULT;
private:
class HdfsCompressionTypesSet {
public:
HdfsCompressionTypesSet(): bit_map_(0) {
DCHECK_GE(sizeof(bit_map_) * CHAR_BIT, _THdfsCompression_VALUES_TO_NAMES.size());
}
bool HasType(THdfsCompression::type type) {
return (bit_map_ & (1 << type)) != 0;
}
void AddType(const THdfsCompression::type type) {
bit_map_ |= (1 << type);
}
int Size() { return BitUtil::Popcount(bit_map_); }
THdfsCompression::type GetFirstType() {
DCHECK_GT(Size(), 0);
for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
THdfsCompression::type type = static_cast<THdfsCompression::type>(elem.first);
if (HasType(type)) return type;
}
return THdfsCompression::NONE;
}
// The following operator overloading is needed so this class can be part of the
// std::map key.
bool operator< (const HdfsCompressionTypesSet& o) const {
return bit_map_ < o.bit_map_;
}
bool operator== (const HdfsCompressionTypesSet& o) const {
return bit_map_ == o.bit_map_;
}
private:
uint32_t bit_map_;
};
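// Example (sketch): each THdfsCompression value occupies one bit of bit_map_, so
// recording two codecs for a mixed-codec file yields a set of size 2.
//
//   HdfsCompressionTypesSet s;
//   s.AddType(THdfsCompression::SNAPPY);
//   s.AddType(THdfsCompression::GZIP);
//   DCHECK_EQ(s.Size(), 2);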
/// Mapping of file formats to the number of splits of that type. The key is a tuple
/// containing:
/// * file type
/// * whether the split was skipped
/// * compression types set
typedef std::map<std::tuple<THdfsFileFormat::type, bool, HdfsCompressionTypesSet>, int>
FileTypeCountsMap;
FileTypeCountsMap file_type_counts_;
};
}
#endif