blob: 78cd0a9249a96829a0a827f75a58506181152e11 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <stack>
#include <orc/OrcFile.hh>
#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
#include "exec/hdfs-scanner.h"
#include "exec/hdfs-scan-node.h"
#include "exec/orc-metadata-utils.h"
#include "util/runtime-profile-counters.h"
namespace impala {
struct HdfsFileDesc;
class OrcStructReader;
class OrcComplexColumnReader;
/// This scanner leverage the ORC library to parse ORC files located in HDFS. Data is
/// transformed into Impala in-memory representation (i.e. Tuples, RowBatches) by
/// different kinds of OrcColumnReaders.
/// Steps of how we create orc::Reader, orc::RowReader and OrcColumnReaders:
/// * 'ProcessFileTail' to create orc::Reader with OrcMemPool and ScanRangeInputStream
/// * Resolve TupleDescriptors to get a list of mapped orc::Types (a.k.a column/node).
/// Init 'row_reader_options_' with these selected type ids.
/// * Build a map 'col_id_path_map_' from each orc::Type id to a SchemaPath. Will be
/// used in creating OrcColumnReaders.
/// * Create temporary orc::RowReader with 'row_reader_options_' to get the selected
/// subset of the schema (a tree of the selected orc::Types, i.e. the local variable
/// 'root_type' in 'HdfsOrcScanner::Open').
/// * Create OrcColumnReaders recursively with 'root_type', 'col_id_path_map_' and
/// TupleDescriptors (HdfsOrcScanner::Open)
/// * At the begining of processing a Stripe, we update 'row_reader_options_' to have
/// the range of the Stripe boundaries. Then create a orc::RowReader for this Stripe
/// (HdfsOrcScanner::NextStripe)
class HdfsOrcScanner : public HdfsScanner {
/// Exception throws from the orc scanner to stop the orc::RowReader. It's used in
/// IO errors (e.g. cancellation) or memory errors (e.g. mem_limit exceeded). The
/// exact error message will be recorded in parse_status_.
class ResourceError : public std::runtime_error {
explicit ResourceError(const Status& status)
: runtime_error(status.msg().msg()), status_(status) {}
virtual ~ResourceError() {}
Status& GetStatus() { return status_; }
Status status_;
/// A wrapper of std::malloc and std::free to track the memory usage of the ORC lib.
/// Without this the ORC lib will use std::malloc and std::free directly.
class OrcMemPool : public orc::MemoryPool {
OrcMemPool(HdfsOrcScanner* scanner);
virtual ~OrcMemPool();
char* malloc(uint64_t size) override;
void free(char* p) override;
void FreeAll();
HdfsOrcScanner* scanner_;
MemTracker* mem_tracker_;
boost::unordered_map<char*, uint64_t> chunk_sizes_;
/// A wrapper of DiskIoMgr to be used by the ORC lib.
class ScanRangeInputStream : public orc::InputStream {
ScanRangeInputStream(HdfsOrcScanner* scanner) {
this->scanner_ = scanner;
this->filename_ = scanner->filename();
this->file_desc_ = scanner->scan_node_->GetFileDesc(
scanner->context_->partition_descriptor()->id(), filename_);
uint64_t getLength() const override {
return file_desc_->file_length;
uint64_t getNaturalReadSize() const override {
return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size();
/// Read 'length' bytes from the file starting at 'offset' into the buffer starting
/// at 'buf'.
void read(void* buf, uint64_t length, uint64_t offset) override;
const std::string& getName() const override {
return filename_;
HdfsOrcScanner* scanner_;
HdfsFileDesc* file_desc_;
std::string filename_;
HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsOrcScanner();
/// Issue just the footer range for each file. We'll then parse the footer and pick
/// out the columns we want.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT;
virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
virtual void Close(RowBatch* row_batch) override;
friend class OrcColumnReader;
friend class OrcStringColumnReader;
friend class OrcTimestampReader;
friend class OrcComplexColumnReader;
friend class OrcCollectionReader;
friend class OrcStructReader;
friend class OrcListReader;
friend class OrcMapReader;
friend class HdfsOrcScannerTest;
/// Memory guard of the tuple_mem_
uint8_t* tuple_mem_end_ = nullptr;
/// Index of the current stripe being processed. Stripe in ORC is equivalent to
/// RowGroup in Parquet. Initialized to -1 which indicates that we have not started
/// processing the first stripe yet (GetNext() has not yet been called).
int32_t stripe_idx_ = -1;
/// Counts the number of rows processed for the current stripe.
int64_t stripe_rows_read_ = 0;
/// Indicates whether we should advance to the next stripe in the next GetNext().
/// Starts out as true to move to the very first stripe.
bool advance_stripe_ = true;
/// Indicates whether we are at the end of a stripe.
bool end_of_stripe_ = true;
/// Number of scratch batches processed so far.
int64_t row_batches_produced_ = 0;
/// Mem pool used in orc readers.
boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
/// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
/// It creates orc::RowReader for further materialization. orc::RowReader is used for
/// reading rows from the file.
std::unique_ptr<orc::Reader> reader_ = nullptr;
std::unique_ptr<orc::RowReader> row_reader_ = nullptr;
/// ReaderOptions used to create orc::Reader.
orc::ReaderOptions reader_options_;
/// RowReaderOptions used to create orc::RowReader.
orc::RowReaderOptions row_reader_options_;
/// Scratch batch updated in place by 'row_reader_' (reader from the ORC lib). Will be
/// consumed by 'orc_root_reader_' (column reader implemented by ourselves). See more
/// in 'AssembleRows'
std::unique_ptr<orc::ColumnVectorBatch> orc_root_batch_;
/// The root column reader to transfer orc values into impala RowBatch. The root of
/// the ORC file schema is always in STRUCT type so we use OrcStructReader here.
/// Instead of using std::unique_ptr, this object is tracked in 'obj_pool_' to be
/// together with children readers.
OrcStructReader* orc_root_reader_ = nullptr;
/// Slot descriptors that don't match any columns of the ORC file. We'll set NULL in
/// these slots.
std::unordered_set<const SlotDescriptor*> missing_field_slots_;
/// Selected column(type) ids of the ORC file. Use list instead of vector here since
/// orc::RowReaderOptions.includeTypes() expects a list
std::list<uint64_t> selected_type_ids_;
/// Map from orc::Type id to SchemaPath. See more in descriptors.h for the definition
/// of SchemaPath. The map is used in the constructors of OrcColumnReaders where we
/// resolve SchemaPaths of the descriptors into ORC columns. Built in 'Open'.
std::vector<SchemaPath> col_id_path_map_;
/// Scan range for the metadata (file tail).
const io::ScanRange* metadata_range_ = nullptr;
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
/// Average and min/max time spent processing the footer by each split.
RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_ = nullptr;
/// Number of columns that need to be read.
RuntimeProfile::Counter* num_cols_counter_ = nullptr;
/// Number of stripes that need to be read.
RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
/// Number of scanners that end up doing no reads because their splits don't overlap
/// with the midpoint of any stripe in the file.
RuntimeProfile::Counter* num_scanners_with_no_reads_counter_ = nullptr;
/// Number of collection items read in current row batch. It is a scanner-local counter
/// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
/// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
/// AssembleRows() and then is reset to 0.
int64_t coll_items_read_counter_;
const char *filename() const { return metadata_range_->file(); }
Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
/// Advances 'stripe_idx_' to the next non-empty stripe and initializes
/// row_reader_ to scan it.
Status NextStripe() WARN_UNUSED_RESULT;
/// Reads data to materialize instances of 'tuple_desc'.
/// Returns a non-OK status if a non-recoverable error was encountered and execution
/// of this query should be terminated immediately.
Status AssembleRows(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Materialize collection(list/map) tuples belong to the 'row_idx'-th row of
/// coll_reader's ORC batch. Each column reader will hold an ORC batch until its values
/// are drained.
Status AssembleCollection(const OrcComplexColumnReader& coll_reader, int row_idx,
CollectionValueBuilder* coll_value_builder) WARN_UNUSED_RESULT;
/// Transfer rows in 'orc_root_batch_' into tuples in 'dst_batch'. Evaluates runtime
/// filters and conjuncts (if any) against the tuples. Only surviving tuples are added
/// to the given batch. Returns if either 'orc_root_batch_' is drained or 'dst_batch'
/// is full.
Status TransferTuples(OrcComplexColumnReader* column_reader,
RowBatch* dst_batch) WARN_UNUSED_RESULT;
/// Process the file footer and parse file_metadata_. This should be called with the
/// last FOOTER_SIZE bytes in context_.
Status ProcessFileTail() WARN_UNUSED_RESULT;
/// Resolve SchemaPath in TupleDescriptors and translate them to ORC type ids into
/// 'selected_nodes'. Track the position slots by pre-order traversal in the
/// descriptors and push them to a stack as 'pos_slots'.
Status ResolveColumns(const TupleDescriptor& tuple_desc,
std::list<const orc::Type*>* selected_nodes,
std::stack<const SlotDescriptor*>* pos_slots);
/// Resolve 'tuple_desc' to get selected columns. Update 'row_reader_options' with the
/// selected type ids.
Status SelectColumns(const TupleDescriptor& tuple_desc) WARN_UNUSED_RESULT;
/// Part of the HdfsScanner interface, not used in Orc.
Status InitNewRange() override WARN_UNUSED_RESULT { return Status::OK(); }
THdfsCompression::type TranslateCompressionKind(orc::CompressionKind kind);
inline Status AllocateTupleMem(RowBatch* row_batch) WARN_UNUSED_RESULT;
bool IsPartitionKeySlot(const SlotDescriptor* slot);
bool IsMissingField(const SlotDescriptor* slot);
} // namespace impala