| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H |
| #define IMPALA_RUNTIME_RUNTIME_STATE_H |
| |
#include <map>
#include <memory>
#include <string>
#include <vector>

#include <boost/scoped_ptr.hpp>
| |
| // NOTE: try not to add more headers here: runtime-state.h is included in many many files. |
| #include "common/global-types.h" // for PlanNodeId |
| #include "runtime/client-cache-types.h" |
| #include "runtime/thread-resource-mgr.h" |
| #include "util/runtime-profile.h" |
| #include "gen-cpp/ImpalaInternalService_types.h" |
| |
| namespace impala { |
| |
/// Forward declarations, kept here (instead of #includes) so this widely-included
/// header stays light. Sorted alphabetically.
class BufferedBlockMgr;
class DataStreamMgr;
class DataStreamRecvr;
class DescriptorTbl;
class DiskIoMgr;
class DiskIoRequestContext;
class ExecEnv;
class Expr;
class HBaseTableFactory;
class LlvmCodeGen;
class MemTracker;
class ObjectPool;
class QueryState;
class RuntimeFilterBank;
class ScalarFnCall;
class Status;
class TimestampValue;
class TPlanFragmentCtx;
class TPlanFragmentInstanceCtx;
class TUniqueId;
| |
| /// TODO: move the typedefs into a separate .h (and fix the includes for that) |
| |
| /// Counts how many rows an INSERT query has added to a particular partition |
| /// (partitions are identified by their partition keys: k1=v1/k2=v2 |
| /// etc. Unpartitioned tables have a single 'default' partition which is |
| /// identified by ROOT_PARTITION_KEY. |
| typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap; |
| |
| /// Stats per partition for insert queries. They key is the same as for PartitionRowCount |
| typedef std::map<std::string, TInsertStats> PartitionInsertStats; |
| |
| /// Tracks files to move from a temporary (key) to a final destination (value) as |
| /// part of query finalization. If the destination is empty, the file is to be |
| /// deleted. |
| typedef std::map<std::string, std::string> FileMoveMap; |
| |
| /// A collection of items that are part of the global state of a query and shared across |
| /// all execution nodes of that query. After initialisation, callers must call |
| /// ReleaseResources() to ensure that all resources are correctly freed before |
| /// destruction. |
| class RuntimeState { |
| public: |
| /// query_state, fragment_ctx, and instance_ctx need to be alive at least as long as |
| /// the constructed RuntimeState |
| RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, |
| const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env); |
| |
| /// RuntimeState for executing expr in fe-support. |
| RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env = nullptr); |
| |
| /// Empty d'tor to avoid issues with scoped_ptr. |
| ~RuntimeState(); |
| |
| /// Set up five-level hierarchy of mem trackers: process, pool, query, fragment |
| /// instance. The instance tracker is tied to our profile. Specific parts of the |
| /// fragment (i.e. exec nodes, sinks, data stream senders, etc) will add a fifth level |
| /// when they are initialized. This function also initializes a user function mem |
| /// tracker (in the fifth level). If 'request_pool' is null, no request pool mem |
| /// tracker is set up, i.e. query pools will have the process mem pool as the parent. |
| void InitMemTrackers(const std::string* request_pool, int64_t query_bytes_limit); |
| |
| /// Initializes the runtime filter bank. Must be called after InitMemTrackers(). |
| void InitFilterBank(); |
| |
| /// Gets/Creates the query wide block mgr. |
| Status CreateBlockMgr(); |
| |
| QueryState* query_state() const { return query_state_; } |
| ObjectPool* obj_pool() const { return obj_pool_.get(); } |
| const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } |
| void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; } |
| const TQueryOptions& query_options() const; |
| int batch_size() const { return query_options().batch_size; } |
| bool abort_on_error() const { return query_options().abort_on_error; } |
| bool strict_mode() const { return query_options().strict_mode; } |
| bool abort_on_default_limit_exceeded() const { |
| return query_options().abort_on_default_limit_exceeded; |
| } |
| const TQueryCtx& query_ctx() const; |
| const TPlanFragmentInstanceCtx& instance_ctx() const { return *instance_ctx_; } |
| const TUniqueId& session_id() const { return query_ctx().session.session_id; } |
| const std::string& do_as_user() const { return query_ctx().session.delegated_user; } |
| const std::string& connected_user() const { |
| return query_ctx().session.connected_user; |
| } |
| const TimestampValue* now() const { return now_.get(); } |
| void set_now(const TimestampValue* now); |
| const TUniqueId& query_id() const { return query_ctx().query_id; } |
| const TUniqueId& fragment_instance_id() const { |
| return instance_ctx_ != nullptr |
| ? instance_ctx_->fragment_instance_id |
| : no_instance_id_; |
| } |
| ExecEnv* exec_env() { return exec_env_; } |
| DataStreamMgr* stream_mgr(); |
| HBaseTableFactory* htable_factory(); |
| ImpalaBackendClientCache* impalad_client_cache(); |
| CatalogServiceClientCache* catalogd_client_cache(); |
| DiskIoMgr* io_mgr(); |
| MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } |
| MemTracker* query_mem_tracker() { return query_mem_tracker_.get(); } |
| ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } |
| |
| FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; } |
| |
| void set_fragment_root_id(PlanNodeId id) { |
| DCHECK_EQ(root_node_id_, -1) << "Should not set this twice."; |
| root_node_id_ = id; |
| } |
| |
| /// The seed value to use when hashing tuples. |
| /// See comment on root_node_id_. We add one to prevent having a hash seed of 0. |
| uint32_t fragment_hash_seed() const { return root_node_id_ + 1; } |
| |
| RuntimeFilterBank* filter_bank() { return filter_bank_.get(); } |
| |
| PartitionStatusMap* per_partition_status() { return &per_partition_status_; } |
| |
| /// Returns runtime state profile |
| RuntimeProfile* runtime_profile() { return &profile_; } |
| |
| /// Returns the LlvmCodeGen object for this fragment instance. |
| LlvmCodeGen* codegen() { return codegen_.get(); } |
| |
| const std::string& GetEffectiveUser() const; |
| |
| /// Add ScalarFnCall expression 'udf' to be codegen'd later if it's not disabled by |
| /// query option. This is for cases in which the UDF cannot be interpreted or if the |
| /// plan fragment doesn't contain any codegen enabled operator. |
| void AddScalarFnToCodegen(ScalarFnCall* udf) { scalar_fns_to_codegen_.push_back(udf); } |
| |
| /// Returns true if there are ScalarFnCall expressions in the fragments which can't be |
| /// interpreted. This should only be used after the Prepare() phase in which all |
| /// expressions' Prepare() are invoked. |
| bool ScalarFnNeedsCodegen() const { return !scalar_fns_to_codegen_.empty(); } |
| |
| /// Returns true if there is a hint to disable codegen. This can be true for single node |
| /// optimization or expression evaluation request from FE to BE (see fe-support.cc). |
| /// Note that this internal flag is advisory and it may be ignored if the fragment has |
| /// any UDF which cannot be interpreted. See ScalarFnCall::Prepare() for details. |
| inline bool CodegenHasDisableHint() const { |
| return query_ctx().disable_codegen_hint; |
| } |
| |
| /// Returns true iff there is a hint to disable codegen and all expressions in the |
| /// fragment can be interpreted. This should only be used after the Prepare() phase |
| /// in which all expressions' Prepare() are invoked. |
| inline bool CodegenDisabledByHint() const { |
| return CodegenHasDisableHint() && !ScalarFnNeedsCodegen(); |
| } |
| |
| /// Returns true if codegen is disabled by query option. |
| inline bool CodegenDisabledByQueryOption() const { |
| return query_options().disable_codegen; |
| } |
| |
| /// Returns true if codegen should be enabled for this fragment. Codegen is enabled |
| /// if all the following conditions hold: |
| /// 1. it's enabled by query option |
| /// 2. it's not disabled by internal hints or there are expressions in the fragment |
| /// which cannot be interpreted. |
| inline bool ShouldCodegen() const { |
| return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint(); |
| } |
| |
| /// Takes ownership of a scan node's reader context and plan fragment executor will call |
| /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO |
| /// buffers may still be in use and thus the deferred unregistration. |
| void AcquireReaderContext(DiskIoRequestContext* reader_context); |
| |
| /// Unregisters all reader contexts acquired through AcquireReaderContext(). |
| void UnregisterReaderContexts(); |
| |
| BufferedBlockMgr* block_mgr() { |
| DCHECK(block_mgr_.get() != NULL); |
| return block_mgr_.get(); |
| } |
| |
| inline Status GetQueryStatus() { |
| // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition. |
| if (UNLIKELY(!query_status_.ok())) { |
| boost::lock_guard<SpinLock> l(query_status_lock_); |
| return query_status_; |
| } |
| return Status::OK(); |
| } |
| |
| /// Log an error that will be sent back to the coordinator based on an instance of the |
| /// ErrorMsg class. The runtime state aggregates log messages based on type with one |
| /// exception: messages with the GENERAL type are not aggregated but are kept |
| /// individually. |
| bool LogError(const ErrorMsg& msg, int vlog_level = 1); |
| |
| /// Returns true if the error log has not reached max_errors_. |
| bool LogHasSpace() { |
| boost::lock_guard<SpinLock> l(error_log_lock_); |
| return error_log_.size() < query_options().max_errors; |
| } |
| |
| /// Returns true if there are entries in the error log. |
| bool HasErrors() { |
| boost::lock_guard<SpinLock> l(error_log_lock_); |
| return !error_log_.empty(); |
| } |
| |
| /// Returns the error log lines as a string joined with '\n'. |
| std::string ErrorLog(); |
| |
| /// Copy error_log_ to *errors |
| void GetErrors(ErrorLogMap* errors); |
| |
| /// Append all accumulated errors since the last call to this function to new_errors to |
| /// be sent back to the coordinator |
| void GetUnreportedErrors(ErrorLogMap* new_errors); |
| |
| /// Given an error message, determine whether execution should be aborted and, if so, |
| /// return the corresponding error status. Otherwise, log the error and return |
| /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to |
| /// true or the error is not recoverable and should be handled upstream. |
| Status LogOrReturnError(const ErrorMsg& message); |
| |
| bool is_cancelled() const { return is_cancelled_; } |
| void set_is_cancelled(bool v) { is_cancelled_ = v; } |
| |
| RuntimeProfile::Counter* total_storage_wait_timer() { |
| return total_storage_wait_timer_; |
| } |
| |
| RuntimeProfile::Counter* total_network_send_timer() { |
| return total_network_send_timer_; |
| } |
| |
| RuntimeProfile::Counter* total_network_receive_timer() { |
| return total_network_receive_timer_; |
| } |
| |
| RuntimeProfile::ThreadCounters* total_thread_statistics() const { |
| return total_thread_statistics_; |
| } |
| |
| /// Sets query_status_ with err_msg if no error has been set yet. |
| void SetQueryStatus(const std::string& err_msg) { |
| boost::lock_guard<SpinLock> l(query_status_lock_); |
| if (!query_status_.ok()) return; |
| query_status_ = Status(err_msg); |
| } |
| |
| /// Function for logging memory usages to the error log when memory limit is exceeded. |
| /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If |
| /// 'failed_allocation_size' is zero, nothing about the allocation size is logged. |
| void LogMemLimitExceeded(const MemTracker* tracker, int64_t failed_allocation_size); |
| |
| /// Sets query_status_ to MEM_LIMIT_EXCEEDED and logs all the registered trackers. |
| /// Subsequent calls to this will be no-ops. Returns query_status_. |
| /// If 'failed_allocation_size' is not 0, then it is the size of the allocation (in |
| /// bytes) that would have exceeded the limit allocated for 'tracker'. |
| /// This value and tracker are only used for error reporting. |
| /// If 'msg' is non-NULL, it will be appended to query_status_ in addition to the |
| /// generic "Memory limit exceeded" error. |
| /// Note that this interface is deprecated and MemTracker::LimitExceeded() should be |
| /// used and the error status should be returned. |
| Status SetMemLimitExceeded(MemTracker* tracker = NULL, |
| int64_t failed_allocation_size = 0, const ErrorMsg* msg = NULL); |
| |
| /// Returns a non-OK status if query execution should stop (e.g., the query was |
| /// cancelled or a mem limit was exceeded). Exec nodes should check this periodically so |
| /// execution doesn't continue if the query terminates abnormally. |
| Status CheckQueryState(); |
| |
| /// Create a codegen object accessible via codegen() if it doesn't exist already. |
| Status CreateCodegen(); |
| |
| /// Codegen all ScalarFnCall expressions in 'scalar_fns_to_codegen_'. If codegen fails |
| /// for any expressions, return immediately with the error status. Once IMPALA-4233 is |
| /// fixed, it's not fatal to fail codegen if the expression can be interpreted. |
| /// TODO: Fix IMPALA-4233 |
| Status CodegenScalarFns(); |
| |
| /// Release resources and prepare this object for destruction. |
| void ReleaseResources(); |
| |
| private: |
| /// Allow TestEnv to set block_mgr manually for testing. |
| friend class TestEnv; |
| |
| /// Set per-fragment state. |
| void Init(); |
| |
| /// Use a custom block manager for the query for testing purposes. |
| void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) { |
| block_mgr_ = block_mgr; |
| } |
| |
| DescriptorTbl* desc_tbl_ = nullptr; |
| boost::scoped_ptr<ObjectPool> obj_pool_; |
| |
| /// Lock protecting error_log_ |
| SpinLock error_log_lock_; |
| |
| /// Logs error messages. |
| ErrorLogMap error_log_; |
| |
| /// Global QueryState and original thrift descriptors for this fragment instance. |
| /// Not set by the (const TQueryCtx&) c'tor. |
| QueryState* const query_state_; |
| const TPlanFragmentCtx* const fragment_ctx_; |
| const TPlanFragmentInstanceCtx* const instance_ctx_; |
| |
| /// Provides query ctx if query_state_ == nullptr. |
| TQueryCtx local_query_ctx_; |
| |
| /// Provides instance id if instance_ctx_ == nullptr |
| TUniqueId no_instance_id_; |
| |
| /// Query-global timestamp, e.g., for implementing now(). Set from query_globals_. |
| /// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues. |
| boost::scoped_ptr<TimestampValue> now_; |
| |
| /// TODO: get rid of this and use ExecEnv::GetInstance() instead |
| ExecEnv* exec_env_; |
| boost::scoped_ptr<LlvmCodeGen> codegen_; |
| |
| /// Contains all ScalarFnCall expressions which need to be codegen'd. |
| vector<ScalarFnCall*> scalar_fns_to_codegen_; |
| |
| /// Thread resource management object for this fragment's execution. The runtime |
| /// state is responsible for returning this pool to the thread mgr. |
| ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr; |
| |
| /// Temporary Hdfs files created, and where they should be moved to ultimately. |
| /// Mapping a filename to a blank destination causes it to be deleted. |
| FileMoveMap hdfs_files_to_move_; |
| |
| /// Records summary statistics for the results of inserts into Hdfs partitions. |
| PartitionStatusMap per_partition_status_; |
| |
| RuntimeProfile profile_; |
| |
| /// Total time waiting in storage (across all threads) |
| RuntimeProfile::Counter* total_storage_wait_timer_; |
| |
| /// Total time spent sending over the network (across all threads) |
| RuntimeProfile::Counter* total_network_send_timer_; |
| |
| /// Total time spent receiving over the network (across all threads) |
| RuntimeProfile::Counter* total_network_receive_timer_; |
| |
| /// Total CPU utilization for all threads in this plan fragment. |
| RuntimeProfile::ThreadCounters* total_thread_statistics_; |
| |
| /// MemTracker that is shared by all fragment instances running on this host. |
| /// The query mem tracker must be released after the instance_mem_tracker_. |
| std::shared_ptr<MemTracker> query_mem_tracker_; |
| |
| /// Memory usage of this fragment instance |
| boost::scoped_ptr<MemTracker> instance_mem_tracker_; |
| |
| /// if true, execution should stop with a CANCELLED status |
| bool is_cancelled_; |
| |
| /// Non-OK if an error has occurred and query execution should abort. Used only for |
| /// asynchronously reporting such errors (e.g., when a UDF reports an error), so this |
| /// will not necessarily be set in all error cases. |
| SpinLock query_status_lock_; |
| Status query_status_; |
| |
| /// Reader contexts that need to be closed when the fragment is closed. |
| /// Synchronization is needed if there are multiple scan nodes in a plan fragment and |
| /// Close() may be called on them concurrently (see IMPALA-4180). |
| SpinLock reader_contexts_lock_; |
| std::vector<DiskIoRequestContext*> reader_contexts_; |
| |
| /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory |
| /// with a fixed memory budget. |
| /// The block mgr is shared by all fragments for this query. |
| std::shared_ptr<BufferedBlockMgr> block_mgr_; |
| |
| /// This is the node id of the root node for this plan fragment. This is used as the |
| /// hash seed and has two useful properties: |
| /// 1) It is the same for all exec nodes in a fragment, so the resulting hash values |
| /// can be shared. |
| /// 2) It is different between different fragments, so we do not run into hash |
| /// collisions after data partitioning (across fragments). See IMPALA-219 for more |
| /// details. |
| PlanNodeId root_node_id_; |
| |
| /// Manages runtime filters that are either produced or consumed (or both!) by plan |
| /// nodes that share this runtime state. |
| boost::scoped_ptr<RuntimeFilterBank> filter_bank_; |
| |
| /// prohibit copies |
| RuntimeState(const RuntimeState&); |
| |
| }; |
| |
/// Early-returns Status::CANCELLED from the enclosing function when
/// '(state)->is_cancelled()' is true ('state' is expected to be a RuntimeState*).
/// Wrapped in do/while(false) so the macro behaves as a single statement.
#define RETURN_IF_CANCELLED(state)                  \
  do {                                              \
    if (UNLIKELY((state)->is_cancelled())) {        \
      return Status::CANCELLED;                     \
    }                                               \
  } while (false)
| |
| } |
| |
| #endif |