| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #ifndef IMPALA_EXEC_EXEC_NODE_H |
| #define IMPALA_EXEC_EXEC_NODE_H |
| |
| #include <sstream> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "exprs/expr-context.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| #include "runtime/descriptors.h" // for RowDescriptor |
| #include "util/blocking-queue.h" |
| #include "util/runtime-profile.h" |
| |
| namespace impala { |
| |
| class Expr; |
| class ExprContext; |
| class ObjectPool; |
| class Counters; |
| class SortExecExprs; |
| class RowBatch; |
| class RuntimeState; |
| class TPlan; |
| class TupleRow; |
| class DataSink; |
| class MemTracker; |
| class SubplanNode; |
| |
| /// Superclass of all executor nodes. |
| /// All subclasses need to make sure to check RuntimeState::is_cancelled() |
| /// periodically in order to ensure timely termination after the cancellation |
| /// flag gets set. |
| class ExecNode { |
| public: |
| /// Init conjuncts. |
| ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); |
| |
| virtual ~ExecNode(); |
| |
| /// Initializes this object from the thrift tnode desc. The subclass should |
| /// do any initialization that can fail in Init() rather than the ctor. |
| /// If overridden in subclass, must first call superclass's Init(). |
| virtual Status Init(const TPlanNode& tnode, RuntimeState* state); |
| |
| /// Sets up internal structures, etc., without doing any actual work. |
| /// Must be called prior to Open(). Will only be called once in this |
| /// node's lifetime. |
| /// If overridden in subclass, must first call superclass's Prepare(). |
| virtual Status Prepare(RuntimeState* state); |
| |
| /// Recursively calls Codegen() on all children. |
| /// Expected to be overriden in subclass to generate LLVM IR functions and register |
| /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions |
| /// will be set up in PlanFragmentExecutor::Open(). If overridden in subclass, must also |
| /// call superclass's Codegen() before or after the code generation for this exec node. |
| /// Will only be called once in the node's lifetime. |
| virtual void Codegen(RuntimeState* state); |
| |
| /// Performs any preparatory work prior to calling GetNext(). |
| /// Caller must not be holding any io buffers. This will cause deadlock. |
| /// If overridden in subclass, must first call superclass's Open(). |
| /// Open() is called after Prepare() or Reset(), i.e., possibly multiple times |
| /// throughout the lifetime of this node. |
| virtual Status Open(RuntimeState* state); |
| |
| /// Retrieves rows and returns them via row_batch. Sets eos to true |
| /// if subsequent calls will not retrieve any more rows. |
| /// Data referenced by any tuples returned in row_batch must not be overwritten |
| /// by the callee until Close() is called. The memory holding that data |
| /// can be returned via row_batch's tuple_data_pool (in which case it may be deleted |
| /// by the caller) or held on to by the callee. The row_batch, including its |
| /// tuple_data_pool, will be destroyed by the caller at some point prior to the final |
| /// Close() call. |
| /// In other words, if the memory holding the tuple data will be referenced |
| /// by the callee in subsequent GetNext() calls, it must *not* be attached to the |
| /// row_batch's tuple_data_pool. |
| /// Caller must not be holding any io buffers. This will cause deadlock. |
| /// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet. |
| virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0; |
| |
| /// Resets the stream of row batches to be retrieved by subsequent GetNext() calls. |
| /// Clears all internal state, returning this node to the state it was in after calling |
| /// Prepare() and before calling Open(). This function must not clear memory |
| /// still owned by this node that is backing rows returned in GetNext(). |
| /// Prepare() and Open() must have already been called before calling Reset(). |
| /// GetNext() may have optionally been called (not necessarily until eos). |
| /// Close() must not have been called. |
| /// Reset() is not idempotent. Calling it multiple times in a row without a preceding |
| /// call to Open() is invalid. |
| /// If overridden in a subclass, must call superclass's Reset() at the end. The default |
| /// implementation calls Reset() on children. |
| /// Note that this function may be called many times (proportional to the input data), |
| /// so should be fast. |
| virtual Status Reset(RuntimeState* state); |
| |
| /// Close() will get called for every exec node, regardless of what else is called and |
| /// the status of these calls (i.e. Prepare() may never have been called, or |
| /// Prepare()/Open()/GetNext() returned with an error). |
| /// Close() releases all resources that were allocated in Open()/GetNext(), even if the |
| /// latter ended with an error. Close() can be called if the node has been prepared or |
| /// the node is closed. |
| /// The default implementation updates runtime profile counters and calls |
| /// Close() on the children. Subclasses should check if the node has already been |
| /// closed (is_closed()), then close themselves, then call the base Close(). |
| /// Nodes that are using tuples returned by a child may call Close() on their children |
| /// before their own Close() if the child node has returned eos. |
| /// It is only safe to call Close() on the child node while the parent node is still |
| /// returning rows if the parent node fully materializes the child's input. |
| virtual void Close(RuntimeState* state); |
| |
| /// Creates exec node tree from list of nodes contained in plan via depth-first |
| /// traversal. All nodes are placed in state->obj_pool() and have Init() called on them. |
| /// Returns error if 'plan' is corrupted, otherwise success. |
| static Status CreateTree(RuntimeState* state, const TPlan& plan, |
| const DescriptorTbl& descs, ExecNode** root); |
| |
| /// Set debug action for node with given id in 'tree' |
| static void SetDebugOptions(int node_id, TExecNodePhase::type phase, |
| TDebugAction::type action, ExecNode* tree); |
| |
| /// Collect all nodes of given 'node_type' that are part of this subtree, and return in |
| /// 'nodes'. |
| void CollectNodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes); |
| |
| /// Collect all scan node types. |
| void CollectScanNodes(std::vector<ExecNode*>* nodes); |
| |
| /// Evaluate ExprContexts over row. Returns true if all exprs return true. |
| /// TODO: This doesn't use the vector<Expr*> signature because I haven't figured |
| /// out how to deal with declaring a templated std:vector type in IR |
| static bool EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row); |
| |
| /// Codegen EvalConjuncts(). Returns a non-OK status if the function couldn't be |
| /// codegen'd. The codegen'd version uses inlined, codegen'd GetBooleanVal() functions. |
| static Status CodegenEvalConjuncts( |
| LlvmCodeGen* codegen, const std::vector<ExprContext*>& conjunct_ctxs, |
| llvm::Function** fn, const char* name = "EvalConjuncts"); |
| |
| /// Returns a string representation in DFS order of the plan rooted at this. |
| std::string DebugString() const; |
| |
| /// Recursive helper method for generating a string for DebugString(). |
| /// Implementations should call DebugString(int, std::stringstream) on their children. |
| /// Input parameters: |
| /// indentation_level: Current level in plan tree. |
| /// Output parameters: |
| /// out: Stream to accumulate debug string. |
| virtual void DebugString(int indentation_level, std::stringstream* out) const; |
| |
| const std::vector<ExprContext*>& conjunct_ctxs() const { return conjunct_ctxs_; } |
| int id() const { return id_; } |
| TPlanNodeType::type type() const { return type_; } |
| const RowDescriptor& row_desc() const { return row_descriptor_; } |
| ExecNode* child(int i) { return children_[i]; } |
| int num_children() const { return children_.size(); } |
| SubplanNode* get_containing_subplan() const { return containing_subplan_; } |
| void set_containing_subplan(SubplanNode* sp) { |
| DCHECK(containing_subplan_ == NULL); |
| containing_subplan_ = sp; |
| } |
| int64_t rows_returned() const { return num_rows_returned_; } |
| int64_t limit() const { return limit_; } |
| bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; } |
| |
| RuntimeProfile* runtime_profile() { return runtime_profile_.get(); } |
| MemTracker* mem_tracker() { return mem_tracker_.get(); } |
| MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); } |
| |
| /// Return true if codegen was disabled by the planner for this ExecNode. Does not |
| /// check to see if codegen was enabled for the enclosing fragment. |
| bool IsNodeCodegenDisabled() const; |
| |
| /// Add codegen disabled message if codegen is disabled for this ExecNode. |
| void AddCodegenDisabledMessage(RuntimeState* state); |
| |
| /// Extract node id from p->name(). |
| static int GetNodeIdFromProfile(RuntimeProfile* p); |
| |
| /// Names of counters shared by all exec nodes |
| static const std::string ROW_THROUGHPUT_COUNTER; |
| |
| protected: |
| friend class DataSink; |
| |
| /// Extends blocking queue for row batches. Row batches have a property that |
| /// they must be processed in the order they were produced, even in cancellation |
| /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches |
| /// and we need to make sure those ptrs stay valid. |
| /// Row batches that are added after Shutdown() are queued in another queue, which can |
| /// be cleaned up during Close(). |
| /// All functions are thread safe. |
| class RowBatchQueue : public BlockingQueue<RowBatch*> { |
| public: |
| /// max_batches is the maximum number of row batches that can be queued. |
| /// When the queue is full, producers will block. |
| RowBatchQueue(int max_batches); |
| ~RowBatchQueue(); |
| |
| /// Adds a batch to the queue. This is blocking if the queue is full. |
| void AddBatch(RowBatch* batch); |
| |
| /// Adds a batch to the queue. If the queue is full, this blocks until space becomes |
| /// available or 'timeout_micros' has elapsed. |
| /// Returns true if the element was added to the queue, false if it wasn't. If this |
| /// method returns false, the queue didn't take ownership of the batch and it must be |
| /// managed externally. |
| bool AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros); |
| |
| /// Gets a row batch from the queue. Returns NULL if there are no more. |
| /// This function blocks. |
| /// Returns NULL after Shutdown(). |
| RowBatch* GetBatch(); |
| |
| /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch() |
| /// after this is called. |
| /// Returns the number of io buffers that were released (for debug tracking) |
| int Cleanup(); |
| |
| private: |
| /// Lock protecting cleanup_queue_ |
| SpinLock lock_; |
| |
| /// Queue of orphaned row batches |
| std::list<RowBatch*> cleanup_queue_; |
| }; |
| |
| /// Unique within a single plan tree. |
| int id_; |
| |
| TPlanNodeType::type type_; |
| ObjectPool* pool_; |
| std::vector<ExprContext*> conjunct_ctxs_; |
| |
| std::vector<ExecNode*> children_; |
| RowDescriptor row_descriptor_; |
| |
| /// debug-only: if debug_action_ is not INVALID, node will perform action in |
| /// debug_phase_ |
| TExecNodePhase::type debug_phase_; |
| TDebugAction::type debug_action_; |
| |
| int64_t limit_; // -1: no limit |
| int64_t num_rows_returned_; |
| |
| boost::scoped_ptr<RuntimeProfile> runtime_profile_; |
| RuntimeProfile::Counter* rows_returned_counter_; |
| RuntimeProfile::Counter* rows_returned_rate_; |
| |
| /// Account for peak memory used by this node |
| boost::scoped_ptr<MemTracker> mem_tracker_; |
| |
| /// MemTracker that should be used for ExprContexts. |
| boost::scoped_ptr<MemTracker> expr_mem_tracker_; |
| |
| bool is_closed() const { return is_closed_; } |
| |
| /// Pointer to the containing SubplanNode or NULL if not inside a subplan. |
| /// Set by SubplanNode::Init(). Not owned. |
| SubplanNode* containing_subplan_; |
| |
| /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode. |
| /// Valid to call in or after Prepare(). |
| bool IsInSubplan() const { return containing_subplan_ != NULL; } |
| |
| /// If true, codegen should be disabled for this exec node. |
| const bool disable_codegen_; |
| |
| /// Create a single exec node derived from thrift node; place exec node in 'pool'. |
| static Status CreateNode(ObjectPool* pool, const TPlanNode& tnode, |
| const DescriptorTbl& descs, ExecNode** node, RuntimeState* state); |
| |
| static Status CreateTreeHelper(RuntimeState* state, |
| const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs, ExecNode* parent, |
| int* node_idx, ExecNode** root); |
| |
| virtual bool IsScanNode() const { return false; } |
| |
| void InitRuntimeProfile(const std::string& name); |
| |
| /// Executes debug_action_ if phase matches debug_phase_. |
| /// 'phase' must not be INVALID. |
| Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state); |
| |
| /// Frees any local allocations made by expr_ctxs_to_free_ and returns the result of |
| /// state->CheckQueryState(). Nodes should call this periodically, e.g. once per input |
| /// row batch. This should not be called outside the main execution thread. |
| // |
| /// Nodes may override this to add extra periodic cleanup, e.g. freeing other local |
| /// allocations. ExecNodes overriding this function should return |
| /// ExecNode::QueryMaintenance(). |
| virtual Status QueryMaintenance(RuntimeState* state); |
| |
| /// Add an ExprContext to have its local allocations freed by QueryMaintenance(). |
| /// Exprs that are evaluated in the main execution thread should be added. Exprs |
| /// evaluated in a separate thread are generally not safe to add, since a local |
| /// allocation may be freed while it's being used. Rather than using this mechanism, |
| /// threads should call FreeLocalAllocations() on local ExprContexts periodically. |
| void AddExprCtxToFree(ExprContext* ctx) { expr_ctxs_to_free_.push_back(ctx); } |
| void AddExprCtxsToFree(const std::vector<ExprContext*>& ctxs); |
| void AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs); |
| |
| /// Free any local allocations made by expr_ctxs_to_free_. |
| void FreeLocalAllocations() { ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); } |
| |
| private: |
| /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not protected |
| /// by a lock, it assumes all calls to Close() are made by the same thread. |
| bool is_closed_; |
| |
| /// Expr contexts whose local allocations are safe to free in the main execution thread. |
| std::vector<ExprContext*> expr_ctxs_to_free_; |
| }; |
| |
| } |
| #endif |