| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #ifndef IMPALA_EXEC_HDFS_SCANNER_H_ |
| #define IMPALA_EXEC_HDFS_SCANNER_H_ |
| |
| #include <vector> |
| #include <memory> |
| #include <stdint.h> |
| #include <boost/scoped_ptr.hpp> |
| |
| #include "codegen/impala-ir.h" |
| #include "common/global-flags.h" |
| #include "common/object-pool.h" |
| #include "common/status.h" |
| #include "exec/hdfs-scan-node-base.h" |
| #include "exec/scanner-context.h" |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/tuple.h" |
| |
| namespace impala { |
| |
| class Codec; |
| class CollectionValueBuilder; |
| class Compression; |
| class Expr; |
| class HdfsPartitionDescriptor; |
| class MemPool; |
| class TextConverter; |
| class TupleDescriptor; |
| class SlotDescriptor; |
| |
| // The number of row batches between checks to see if a filter is effective and |
| // should be disabled if it is not. Must be a power of two. |
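| // Being a power of two lets the per-batch check use a cheap bitmask instead of a |
| // modulo. An illustrative check (the counter name is hypothetical): |
| //   if ((num_batches_committed & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) { |
| //     CheckFiltersEffectiveness(); |
| //   } |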
| constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16; |
| static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK), |
| "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two"); |
| |
| /// Intermediate structure used for two pass parsing approach. In the first pass, |
| /// the FieldLocation structs are filled out and contain where all the fields start and |
| /// their lengths. In the second pass, the FieldLocation is used to write out the |
| /// slots. We want to keep this struct as small as possible. |
| struct FieldLocation { |
| /// Start of the field. This is set to NULL for FieldLocations that are past the |
| /// end of the row in the file, e.g. the file has only 3 columns but the table |
| /// has 10. These fields are then written out as NULL slots. |
| char* start; |
| /// Encodes the length and whether or not this field needs to be unescaped. |
| /// If len < 0, then the field needs to be unescaped. |
| /// |
| /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue |
| /// and StringVal. All other types shouldn't be anywhere near this limit. |
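| /// For example (illustrative only), a 6-byte field containing an escaped delimiter, |
| /// such as ab\,cd, would be recorded with len = -6 in the first pass and unescaped |
| /// when the slot is written in the second pass. |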
| int len; |
| |
| static const char* LLVM_CLASS_NAME; |
| }; |
| |
| /// HdfsScanner is the superclass for different hdfs file format parsers. A scanner |
| /// object is created for each split, each driven by a different thread created by the |
| /// scan node. The scan node calls: |
| /// 1. Open() |
| /// 2. ProcessSplit() or GetNext()* |
| /// 3. Close() |
| /// The scanner can be used in either of two modes. Which mode is expected to be used |
| /// depends on the type of parent scan node. Parent scan nodes with a row batch queue |
| /// are expected to call ProcessSplit() and not GetNext(). Row batches will be added to |
| /// the scan node's row batch queue, including the final one in Close(). |
| /// ProcessSplit() scans the split and adds materialized row batches to the scan node's |
| /// row batch queue until the split is complete or an error occurs. |
| /// GetNext() provides an iterator-like interface where the caller can request |
| /// the next materialized row batch until the split has been fully processed (eos). |
| /// |
| /// The HdfsScanner works in tandem with the ScannerContext to interleave IO |
| /// and parsing. |
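| /// |
| /// A minimal sketch of the two calling modes described above (the driver code and |
| /// names such as 'scanner' and 'batch' are illustrative, not part of this API): |
| /// |
| ///   RETURN_IF_ERROR(scanner->Open(context)); |
| ///   if (scan_node->HasRowBatchQueue()) { |
| ///     // Queueing mode: batches are pushed to the scan node's row batch queue. |
| ///     RETURN_IF_ERROR(scanner->ProcessSplit()); |
| ///     scanner->Close();  // adds the final batch to the queue |
| ///   } else { |
| ///     // Iterator mode: the caller pulls batches until the split is fully processed. |
| ///     while (!scanner->eos()) { |
| ///       RETURN_IF_ERROR(scanner->GetNext(batch)); |
| ///       // ... hand 'batch' to the caller ... |
| ///     } |
| ///     scanner->Close(batch); |
| ///   } |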
| // |
| /// If a split is compressed, then a decompressor will be created, either during Prepare() |
| /// or at the beginning of ProcessSplit(), and used for decompressing and reading the |
| /// split. |
| // |
| /// For codegen, the implementation is split into two parts. |
| /// 1. During the Prepare() phase of the ScanNode, the scanner subclass's static |
| /// Codegen() function will be called to perform codegen for that scanner type |
| /// for the specific tuple desc. This codegen'd function is cached in the HdfsScanNode. |
| /// 2. During the GetNext() phase of the scan node (where we create one Scanner for each |
| /// scan range), the created scanner subclass can retrieve, from the scan node, |
| /// the codegen'd function to use. |
| /// This way, we only codegen once per scanner type, rather than once per scanner object. |
| // |
| /// This class also encapsulates row batch management. Subclasses should call |
| /// CommitRows() after writing to the current row batch, which handles creating row |
| /// batches, releasing per-batch resources, and passing row batches up to the scan node. |
| /// Subclasses can also use GetMemory() to help with per-row memory management. |
| /// TODO: Have a pass over all members and move them out of the base class if sensible |
| /// to clarify which state each concrete scanner type actually has. |
| class HdfsScanner { |
| public: |
| /// Assumed size of an OS file block. Used mostly when reading file format headers, etc. |
| /// This probably ought to be derived from the environment. |
| const static int FILE_BLOCK_SIZE = 4096; |
| |
| HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); |
| |
| virtual ~HdfsScanner(); |
| |
| /// One-time initialization of state that is constant across scan ranges. |
| virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT; |
| |
| /// Returns the next row batch from this scanner's split. |
| /// Recoverable errors are logged to the runtime state. Only returns a non-OK status |
| /// if a non-recoverable error is encountered (or abort_on_error is true). If OK is |
| /// returned, 'parse_status_' is guaranteed to be OK as well. |
| /// The memory referenced by the tuples is valid until this or any subsequently |
| /// returned batch is reset or destroyed. |
| /// Only valid to call if the parent scan node is single-threaded. |
| Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT { |
| DCHECK(!scan_node_->HasRowBatchQueue()); |
| return GetNextInternal(row_batch); |
| } |
| |
| /// Process an entire split, reading bytes from the context's streams. Context is |
| /// initialized with the split data (e.g. template tuple, partition descriptor, etc). |
| /// This function should only return on error or end of scan range. |
| /// Only valid to call if the parent scan node is multi-threaded. |
| virtual Status ProcessSplit() WARN_UNUSED_RESULT; |
| |
| /// Creates a new row batch and transfers the ownership of memory backing returned |
| /// tuples to it by calling Close(RowBatch). That last batch is added to the row batch |
| /// queue. Only valid to call if HasRowBatchQueue(). |
| void Close(); |
| |
| /// Transfers the ownership of memory backing returned tuples such as buffers |
| /// and memory in mem pools to the given row batch. If the row batch is NULL, |
| /// those resources are released instead. In any case, releases all other resources |
| /// that are not backing returned rows (e.g. temporary decompression buffers). |
| /// Also marks any associated scan ranges as complete by calling RangeComplete() on the |
| /// scan node. |
| /// This function is not idempotent and must only be called once. |
| virtual void Close(RowBatch* row_batch) = 0; |
| |
| /// Helper function that frees resources common to all scanner subclasses like the |
| /// 'decompressor_', 'context_', 'obj_pool_', etc. Should only be called once the last |
| /// row batch has been attached to the row batch queue (if applicable) to avoid freeing |
| /// memory that might be referenced by the last batch. |
| /// Only valid to call if 'is_closed_' is false. Sets 'is_closed_' to true. |
| void CloseInternal(); |
| |
| /// Only valid to call if the parent scan node is single-threaded. |
| bool eos() const { |
| DCHECK(!scan_node_->HasRowBatchQueue()); |
| return eos_; |
| } |
| |
| /// Scanner subclasses must implement these static functions as well. Unfortunately, |
| /// C++ does not allow static virtual functions. |
| |
| /// Issue the initial ranges for 'files'. HdfsFileDesc groups all the splits |
| /// assigned to this scan node by file. This is called before any of the scanner |
| /// subclasses are created to process splits in 'files'. |
| /// The strategy on how to parse the scan ranges depends on the file format. |
| /// - For simple text files, all the splits are simply issued to the io mgr and |
| /// one split == one scan range. |
| /// - For formats with a header, the metadata is first parsed, and then the ranges are |
| /// issued to the io mgr. There is one scan range for the header and one range for |
| /// each split. |
| /// - For columnar formats, the header is parsed and only the relevant byte ranges |
| /// should be issued to the io mgr. This is one range for the metadata and one |
| /// range for each column, for each split. |
| /// This function is how scanners can pick their strategy. |
| /// void IssueInitialRanges(HdfsScanNodeBase* scan_node, |
| /// const std::vector<HdfsFileDesc*>& files); |
| |
| /// Codegen all functions for this scanner. The codegen'd function is specific to |
| /// the scanner subclass but not specific to each scanner object. We don't want to |
| /// codegen the functions for each scanner object. |
| /// llvm::Function* Codegen(HdfsScanNode* scan_node); |
| |
| static const char* LLVM_CLASS_NAME; |
| |
| protected: |
| /// The scan node that started this scanner |
| HdfsScanNodeBase* scan_node_; |
| |
| /// RuntimeState for error reporting |
| RuntimeState* state_; |
| |
| /// Context for this scanner |
| ScannerContext* context_ = nullptr; |
| |
| /// Object pool for objects with same lifetime as scanner. |
| ObjectPool obj_pool_; |
| |
| /// The first stream for context_ |
| ScannerContext::Stream* stream_ = nullptr; |
| |
| /// Set if this scanner has processed all ranges and will not produce more rows. |
| bool eos_ = false; |
| |
| /// Starts as false and is set to true in CloseInternal(). |
| bool is_closed_ = false; |
| |
| /// MemPool used for expr-managed memory in expression evaluators in this scanner. |
| /// Needs to be local to each scanner as MemPool is not thread safe. |
| boost::scoped_ptr<MemPool> expr_perm_pool_; |
| |
| /// Clones of the conjuncts' evaluators in scan_node_->conjuncts_map(). |
| /// Each scanner has its own ScalarExprEvaluators so the conjuncts can be safely |
| /// evaluated in parallel. |
| HdfsScanNodeBase::ConjunctEvaluatorsMap conjunct_evals_map_; |
| |
| /// Convenience reference to conjunct_evals_map_[scan_node_->tuple_idx()] for |
| /// scanners that do not support nested types. |
| const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr; |
| |
| /// Clones of the conjuncts' evaluators in scan_node_->thrift_dict_filter_conjuncts(). |
| typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> |
| DictFilterConjunctsMap; |
| DictFilterConjunctsMap dict_filter_map_; |
| |
| /// Holds memory for template tuples. The memory in this pool must remain valid for as |
| /// long as the row batches produced by this scanner. This typically means that the |
| /// ownership is transferred to the last row batch in Close(). Some scanners instead |
| /// transfer the ownership to the parent scan node because they can be closed multiple |
| /// times. |
| boost::scoped_ptr<MemPool> template_tuple_pool_; |
| |
| /// A template tuple is a partially-materialized tuple with only partition key slots set |
| /// (or other default values, such as NULL for columns missing in a file). The other |
| /// slots are set to NULL. The template tuple must be copied into output tuples before |
| /// any of the other slots are materialized. |
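| /// For example, for a file in a table partitioned by 'year' (an illustrative column |
| /// name), the template tuple has only the 'year' slot set, from the partition metadata. |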
| /// |
| /// Each tuple descriptor (i.e. scan_node_->tuple_desc() and any collection item tuple |
| /// descs) has a template tuple, or NULL if there are no partition key or default slots. |
| /// Template tuples are computed once for each file and are allocated from |
| /// template_tuple_pool_. |
| std::unordered_map<const TupleDescriptor*, Tuple*> template_tuple_map_; |
| |
| /// Convenience variable set to the top-level template tuple |
| /// (i.e. template_tuple_map_[scan_node_->tuple_desc()]). |
| Tuple* template_tuple_ = nullptr; |
| |
| /// Fixed size of each top-level tuple, in bytes |
| const int32_t tuple_byte_size_; |
| |
| /// Current tuple pointer into 'tuple_mem_'. |
| Tuple* tuple_ = nullptr; |
| |
| /// The tuple memory backing 'tuple_'. |
| uint8_t* tuple_mem_ = nullptr; |
| |
| /// Helper class for converting text to other types. |
| boost::scoped_ptr<TextConverter> text_converter_; |
| |
| /// Contains the current parse status, to minimize the number of Status objects |
| /// returned. This significantly reduces the cross-compile dependencies for llvm since |
| /// Status objects inline a number of string functions. Also, Status objects are not |
| /// cheap to create and destroy. |
| Status parse_status_ = Status::OK(); |
| |
| /// Decompressor class to use, if any. |
| boost::scoped_ptr<Codec> decompressor_; |
| |
| /// The most recently used decompression type. |
| THdfsCompression::type decompression_type_ = THdfsCompression::NONE; |
| |
| /// Pool to allocate per data block memory. This should be used with the |
| /// decompressor and any other per data block allocations. |
| boost::scoped_ptr<MemPool> data_buffer_pool_; |
| |
| /// Offsets of string slots in the result tuple that may need to be copied as part of |
| /// tuple materialization. Populated in the constructor. This is redundant with the |
| /// offset information stored in the TupleDescriptor, but storing only the required |
| /// metadata in a simple array of structs simplifies codegen and speeds up |
| /// interpretation. |
| std::vector<SlotOffsets> string_slot_offsets_; |
| |
| /// Time spent decompressing bytes. |
| RuntimeProfile::Counter* decompress_timer_ = nullptr; |
| |
| /// Matching typedef for WriteAlignedTuples for codegen. Refer to comments for |
| /// that function. |
| typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, FieldLocation*, |
| int, int, int, int, bool); |
| /// Jitted write tuples function pointer. Null if codegen is disabled. |
| WriteTuplesFn write_tuples_fn_ = nullptr; |
| |
| struct LocalFilterStats { |
| /// Total number of rows to which each filter was applied |
| int64_t considered; |
| |
| /// Total number of rows that each filter rejected. |
| int64_t rejected; |
| |
| /// Total number of rows that each filter could have been applied to (if it were |
| /// available from row 0). |
| int64_t total_possible; |
| |
| /// Use known-width type to act as logical boolean. Set to 1 if corresponding filter |
| /// in filter_ctxs_ should be applied, 0 if it was ineffective and was disabled. |
| uint8_t enabled; |
| |
| /// Padding to ensure structs do not straddle a cache-line boundary. |
| uint8_t padding[7]; |
| |
| LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { } |
| }; |
| |
| /// Cached runtime filter contexts, one for each filter that applies to this column. |
| std::vector<const FilterContext*> filter_ctxs_; |
| |
| /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner |
| /// so that expensive aggregation up to the scan node can be performed once, during |
| /// Close(). |
| std::vector<LocalFilterStats> filter_stats_; |
| |
| /// Size of the file footer for ORC and Parquet. This is a guess. If this value is too |
| /// small, we will need to issue another read. |
| static const int64_t FOOTER_SIZE = 1024 * 100; |
| static_assert(FOOTER_SIZE <= READ_SIZE_MIN_VALUE, |
| "FOOTER_SIZE can not be greater than READ_SIZE_MIN_VALUE.\n" |
| "You can increase FOOTER_SIZE if you want, " |
| "just don't forget to increase READ_SIZE_MIN_VALUE as well."); |
| |
| /// Check runtime filters' effectiveness every BATCHES_PER_FILTER_SELECTIVITY_CHECK |
| /// row batches. Will update 'filter_stats_'. |
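| /// A rough sketch of the check (the threshold name is illustrative; the real threshold |
| /// is configurable): |
| /// |
| ///   for (int i = 0; i < filter_stats_.size(); ++i) { |
| ///     LocalFilterStats* stats = &filter_stats_[i]; |
| ///     if (stats->considered == 0) continue; |
| ///     double reject_ratio = stats->rejected / static_cast<double>(stats->considered); |
| ///     if (reject_ratio < kMinRejectRatioForIllustration) stats->enabled = 0; |
| ///   } |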
| void CheckFiltersEffectiveness(); |
| |
| /// Evaluates 'row' against the i-th runtime filter for this scan node and returns |
| /// true if 'row' finds a match in the filter. Returns false otherwise. |
| bool EvalRuntimeFilter(int i, TupleRow* row); |
| |
| /// Evaluates runtime filters (if any) against the given row. Returns true if |
| /// they passed, false otherwise. Maintains the runtime filter stats, determines |
| /// whether the filters are effective, and disables them if they are not. This is |
| /// replaced by generated code at runtime. |
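| /// An interpreted-loop sketch of the logic (illustrative; the check that a filter has |
| /// actually arrived is paraphrased): |
| /// |
| ///   for (int i = 0; i < filter_ctxs_.size(); ++i) { |
| ///     LocalFilterStats* stats = &filter_stats_[i]; |
| ///     if (!stats->enabled) continue; |
| ///     ++stats->total_possible; |
| ///     // (skip filters that have not arrived yet) |
| ///     ++stats->considered; |
| ///     if (!EvalRuntimeFilter(i, row)) { ++stats->rejected; return false; } |
| ///   } |
| ///   return true; |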
| bool EvalRuntimeFilters(TupleRow* row); |
| |
| /// Find and return the last split in the file if it is assigned to this scan node. |
| /// Returns NULL otherwise. |
| static io::ScanRange* FindFooterSplit(HdfsFileDesc* file); |
| |
| /// Issue just the footer range for each file. This function is only used in the Parquet |
| /// and ORC scanners. We'll then parse the footer and pick out the columns we want. |
| static Status IssueFooterRanges(HdfsScanNodeBase* scan_node, |
| const THdfsFileFormat::type& file_type, const std::vector<HdfsFileDesc*>& files) |
| WARN_UNUSED_RESULT; |
| |
| /// Implements GetNext(). Should be overridden by subclasses. |
| /// Only valid to call if the parent scan node is single-threaded. |
| virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT { |
| DCHECK(false) << "GetNextInternal() not implemented for this scanner type."; |
| return Status::OK(); |
| } |
| |
| /// Initializes write_tuples_fn_ to the jitted function if codegen is possible. |
| /// - partition - partition descriptor for this scanner/scan range |
| /// - type - type for this scanner |
| /// - scanner_name - debug string name for this scanner (e.g. HdfsTextScanner) |
| Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, |
| THdfsFileFormat::type type, const std::string& scanner_name) WARN_UNUSED_RESULT; |
| |
| /// Reset internal state for a new scan range. |
| virtual Status InitNewRange() WARN_UNUSED_RESULT = 0; |
| |
| /// Gets memory for outputting tuples into the CollectionValue being constructed via |
| /// 'builder'. If memory limit is exceeded, an error status is returned. Otherwise, |
| /// returns the maximum number of tuples that can be output in 'num_rows'. |
| /// |
| /// The returned TupleRow* should not be incremented (i.e. don't call next_row() on |
| /// it). Instead, incrementing *tuple_mem will update *tuple_row_mem to point at |
| /// the next tuple. This also means it's unnecessary to call |
| /// (*tuple_row_mem)->SetTuple(). |
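| /// |
| /// An illustrative usage sketch ('item_desc' and 'rows_to_write' are hypothetical): |
| /// |
| ///   MemPool* pool; Tuple* tuple_mem; TupleRow* row_mem; int64_t num_rows; |
| ///   RETURN_IF_ERROR( |
| ///       GetCollectionMemory(builder, &pool, &tuple_mem, &row_mem, &num_rows)); |
| ///   for (int64_t i = 0; i < std::min(num_rows, rows_to_write); ++i) { |
| ///     InitTuple(item_desc, template_tuple, tuple_mem);  // then materialize slots |
| ///     tuple_mem = next_tuple(item_desc->byte_size(), tuple_mem); |
| ///   } |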
| Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool, |
| Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) WARN_UNUSED_RESULT; |
| |
| /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly. |
| /// Frees expr result allocations. Returns non-OK if 'context_' is cancelled or the |
| /// query status in 'state_' is non-OK. |
| Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT; |
| |
| /// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators. |
| /// This must always be inlined so we can correctly replace the call to |
| /// ExecNode::EvalConjuncts() during codegen. |
| bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow* row) { |
| return ExecNode::EvalConjuncts(conjunct_evals_->data(), conjunct_evals_->size(), row); |
| } |
| |
| /// Handles the case when there are no slots materialized (e.g. count(*)) by adding |
| /// up to 'num_tuples' rows to the row batch which 'row' points to. Assumes each tuple |
| /// row only has one tuple. The added tuples are set to the template tuple if it is |
| /// not NULL. In the rare case when there are conjuncts, they are evaluated once for |
| /// each row and a row is only added when they evaluate to true. Returns the number of |
| /// tuple rows added. |
| int WriteTemplateTuples(TupleRow* row, int num_tuples); |
| |
| /// Processes batches of fields and writes them out to tuple_row_mem. |
| /// - 'pool' mempool to allocate from for auxiliary tuple memory |
| /// - 'tuple_row_mem' preallocated tuple_row memory this function must use. |
| /// - 'fields' must start at the beginning of a tuple. |
| /// - 'num_tuples' number of tuples to process |
| /// - 'max_added_tuples' the maximum number of tuples that should be added to the batch. |
| /// - 'row_idx_start' is the number of rows that have already been processed |
| /// as part of WritePartialTuple. |
| /// - 'copy_strings': if true, strings in returned tuples that pass conjuncts are |
| /// copied into 'pool' |
| /// Returns the number of tuples added to the row batch. This can be less than |
| /// 'num_tuples'/'max_added_tuples' because of failed conjuncts. |
| /// Returns -1 if an error is encountered, e.g. a parse error or a memory allocation |
| /// error. |
| /// Only valid to call if the parent scan node is multi-threaded. |
| int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, FieldLocation* fields, |
| int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start, |
| bool copy_strings); |
| |
| /// Update the decompressor_ object given a compression type or codec name. Depending |
| /// on the old compression type and the new one, it may close the old decompressor |
| /// and/or create a new one of a different type. |
| Status UpdateDecompressor(const THdfsCompression::type& compression) WARN_UNUSED_RESULT; |
| Status UpdateDecompressor(const std::string& codec) WARN_UNUSED_RESULT; |
| |
| /// Utility function to report parse errors for each field. |
| /// If errors[i] is nonzero, fields[i] had a parse error. |
| /// Returns false if parsing should be aborted. In this case parse_status_ is set |
| /// to the error. |
| /// This is called from WriteAlignedTuples. |
| bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors); |
| |
| /// Triggers the debug action of the scan node. This is currently used by Parquet |
| /// column readers to exercise various failure paths in the Parquet scanner. Returns |
| /// the status returned by the scan node's ScanNodeDebugAction(). |
| Status ScannerDebugAction() WARN_UNUSED_RESULT { |
| return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER); |
| } |
| |
| /// Utility function to append an error message for an invalid row. |
| void LogRowParseError(); |
| |
| /// Writes out all slots for 'tuple' from 'fields'. 'fields' must be aligned |
| /// to the start of the tuple (e.g. fields[0] maps to slots[0]). |
| /// After writing the tuple, it will be evaluated against the conjuncts. |
| /// - error_fields is an out array. error_fields[i] will be set to true if the ith |
| /// field had a parse error |
| /// - error_in_row is an out bool. It is set to true if any field had parse errors |
| /// Returns whether the resulting tuple row passed the conjuncts. |
| // |
| /// The parsing of the fields and evaluating against conjuncts is combined in this |
| /// function. This is done so it can be possible to evaluate conjuncts as slots |
| /// are materialized (on partial tuples). |
| // |
| /// This function is replaced by a codegen'd function at runtime. This is the reason |
| /// that the out error parameters are typed uint8_t instead of bool. We need to be able |
| /// to match this function's signature identically for the codegen'd function. Bools as |
| /// out parameters can get converted to bytes by the compiler and, rather than |
| /// implicitly depending on that to happen, we explicitly type them as bytes. |
| /// TODO: revisit this |
| bool WriteCompleteTuple(MemPool* pool, FieldLocation* fields, Tuple* tuple, |
| TupleRow* tuple_row, Tuple* template_tuple, uint8_t* error_fields, |
| uint8_t* error_in_row) WARN_UNUSED_RESULT; |
| |
| /// Codegen function to replace WriteCompleteTuple. Should behave identically |
| /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn' |
| /// if codegen was successful or NULL otherwise. |
| static Status CodegenWriteCompleteTuple(const HdfsScanNodeBase* node, |
| LlvmCodeGen* codegen, const std::vector<ScalarExpr*>& conjuncts, |
| llvm::Function** write_complete_tuple_fn) WARN_UNUSED_RESULT; |
| |
| /// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross |
| /// compiled to IR. This function loads the precompiled IR function, modifies it, |
| /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was |
| /// successful or NULL otherwise. |
| static Status CodegenWriteAlignedTuples(const HdfsScanNodeBase*, LlvmCodeGen*, |
| llvm::Function* write_tuple_fn, |
| llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT; |
| |
| /// Codegen function to replace InitTuple() removing runtime constants like the tuple |
| /// size and branches like the template tuple existence check. The codegen'd version |
| /// of InitTuple() is stored in 'init_tuple_fn' if codegen was successful. |
| static Status CodegenInitTuple( |
| const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn); |
| |
| /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version |
| /// and emitting a customized version of EvalRuntimeFilter() for each filter in |
| /// 'filter_ctxs'. Return error status on failure. The generated function is returned |
| /// via 'fn'. |
| static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, |
| const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) |
| WARN_UNUSED_RESULT; |
| |
| /// Report a parse error for the column given by 'desc'. If abort_on_error is true, |
| /// sets parse_status_ to the error message. |
| void ReportColumnParseError(const SlotDescriptor* desc, const char* data, int len); |
| |
| /// Initialize a tuple. Inlined into the convenience version below for codegen. |
| void IR_ALWAYS_INLINE InitTuple( |
| const TupleDescriptor* desc, Tuple* template_tuple, Tuple* tuple) { |
| if (has_template_tuple(template_tuple)) { |
| InitTupleFromTemplate(template_tuple, tuple, tuple_byte_size(*desc)); |
| } else { |
| Tuple::ClearNullBits(tuple, desc->null_bytes_offset(), desc->num_null_bytes()); |
| } |
| } |
| |
| /// Convenience version of above that passes in the scan's TupleDescriptor. |
| /// Replaced with a codegen'd version in IR. |
| void IR_NO_INLINE InitTuple(Tuple* template_tuple, Tuple* tuple) { |
| return InitTuple(scan_node_->tuple_desc(), template_tuple, tuple); |
| } |
| |
| /// Initialize 'tuple' with size 'tuple_byte_size' from 'template_tuple' |
| void InitTupleFromTemplate(Tuple* template_tuple, Tuple* tuple, int tuple_byte_size) { |
| memcpy(tuple, template_tuple, tuple_byte_size); |
| } |
| |
| /// Initialize a dense array of 'num_tuples' tuples. |
| /// TODO: we could do better here if we inlined the tuple and null indicator byte |
| /// widths with codegen to eliminate all the branches in memcpy()/memset(). |
| void InitTupleBuffer( |
| Tuple* template_tuple, uint8_t* __restrict__ tuple_mem, int64_t num_tuples) { |
| const TupleDescriptor* desc = scan_node_->tuple_desc(); |
| const int tuple_byte_size = desc->byte_size(); |
| // Handle the different template/non-template cases with different loops to avoid |
| // unnecessary branches inside the loop. |
| if (template_tuple != nullptr) { |
| for (int64_t i = 0; i < num_tuples; ++i) { |
| InitTupleFromTemplate( |
| template_tuple, reinterpret_cast<Tuple*>(tuple_mem), tuple_byte_size); |
| tuple_mem += tuple_byte_size; |
| } |
| } else if (tuple_byte_size <= CACHE_LINE_SIZE) { |
| // If each tuple fits in a cache line, it is quicker to zero the whole memory buffer |
| // instead of just the null indicators. This is because we are fetching the cache |
| // line anyway and zeroing a cache line is cheap (a couple of AVX2 instructions) |
| // compared with the overhead of calling memset() row-by-row. |
| memset(tuple_mem, 0, num_tuples * tuple_byte_size); |
| } else { |
| const int null_bytes_offset = desc->null_bytes_offset(); |
| const int num_null_bytes = desc->num_null_bytes(); |
| for (int64_t i = 0; i < num_tuples; ++i) { |
| reinterpret_cast<Tuple*>(tuple_mem)->ClearNullBits( |
| null_bytes_offset, num_null_bytes); |
| tuple_mem += tuple_byte_size; |
| } |
| } |
| } |
| |
| /// Not inlined in IR so it can be replaced with a constant. |
| int IR_NO_INLINE tuple_byte_size() const { return tuple_byte_size_; } |
| int IR_NO_INLINE tuple_byte_size(const TupleDescriptor& desc) const { |
| return desc.byte_size(); |
| } |
| |
| /// Returns true iff 'template_tuple' is non-NULL. |
| /// Not inlined in IR so it can be replaced with a constant. |
| static bool IR_NO_INLINE has_template_tuple(Tuple* template_tuple) { |
| return template_tuple != nullptr; |
| } |
| |
| inline Tuple* next_tuple(int tuple_byte_size, Tuple* t) const { |
| uint8_t* mem = reinterpret_cast<uint8_t*>(t); |
| return reinterpret_cast<Tuple*>(mem + tuple_byte_size); |
| } |
| |
| /// Assumes the row only has a single tuple. |
| inline TupleRow* next_row(TupleRow* r) const { |
| uint8_t* mem = reinterpret_cast<uint8_t*>(r); |
| return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*)); |
| } |
| |
| /// Simple wrapper around conjunct_evals_[idx]. Used in the codegen'd version of |
| /// WriteCompleteTuple() because it's easier than writing IR to access |
| /// conjunct_evals_. |
| ScalarExprEvaluator* GetConjunctEval(int idx) const; |
| |
| /// Unit test constructor |
| HdfsScanner(); |
| }; |
| |
| } |
| |
| #endif |