| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RUNTIME_COORDINATOR_H |
| #define IMPALA_RUNTIME_COORDINATOR_H |
| |
| #include <vector> |
| #include <string> |
| #include <boost/scoped_ptr.hpp> |
| #include <boost/accumulators/accumulators.hpp> |
| #include <boost/accumulators/statistics/stats.hpp> |
| #include <boost/accumulators/statistics/min.hpp> |
| #include <boost/accumulators/statistics/mean.hpp> |
| #include <boost/accumulators/statistics/median.hpp> |
| #include <boost/accumulators/statistics/max.hpp> |
| #include <boost/accumulators/statistics/variance.hpp> |
| #include <boost/unordered_map.hpp> |
| #include <boost/unordered_set.hpp> |
| #include <boost/thread/mutex.hpp> |
| #include <boost/thread/condition_variable.hpp> |
| |
| #include "common/global-types.h" |
| #include "common/hdfs.h" |
| #include "common/status.h" |
| #include "gen-cpp/Frontend_types.h" |
| #include "gen-cpp/Types_types.h" |
| #include "util/histogram-metric.h" |
| #include "util/progress-updater.h" |
| #include "util/runtime-profile.h" |
| #include "scheduling/query-schedule.h" |
| #include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle |
| |
| namespace impala { |
| |
| // Forward declarations of types that this header only names; kept in alphabetical order. |
| class CountingBarrier; |
| class DataSink; |
| class DataStreamMgr; |
| class ExecEnv; |
| class Expr; |
| class ExprContext; |
| class FragmentInstanceState; |
| class MemTracker; |
| class ObjectPool; |
| class PlanFragmentExecutor; |
| class PlanRootSink; |
| class QueryResultSet; |
| class RowBatch; |
| class RowDescriptor; |
| class RuntimeProfile; |
| class RuntimeState; |
| class TPlanExecRequest; |
| class TPlanFragment; |
| class TQueryExecRequest; |
| class TReportExecStatusParams; |
| class TRowBatch; |
| class TRuntimeProfileTree; |
| class TUpdateCatalogRequest; |
| class TablePrinter; |
| |
| struct DebugOptions; |
| |
| /// Query coordinator: handles execution of fragment instances on remote nodes, given a |
| /// TQueryExecRequest. As part of that, it handles all interactions with the executing |
| /// backends; it is also responsible for implementing all client requests regarding the |
| /// query, including cancellation. |
| /// |
| /// The coordinator monitors the execution status of fragment instances and aborts the |
| /// entire query if an error is reported by any of them. |
| /// |
| /// Queries that have results have those results fetched by calling GetNext(). Result |
| /// rows are produced by a fragment instance that always executes on the same machine as |
| /// the coordinator. |
| /// |
| /// Once a query has finished executing and all results have been returned either to the |
| /// caller of GetNext() or a data sink, execution_completed_ should be true. If the |
| /// query is aborted, execution_completed_ should also be set to true. Coordinator is |
| /// thread-safe, with the exception of GetNext(). |
| /// |
| /// A typical sequence of calls for a single query (calls under the same numbered |
| /// item can happen concurrently): |
| /// 1. client: Exec() |
| /// 2. client: Wait()/client: Cancel()/backend: UpdateFragmentExecStatus() |
| /// 3. client: GetNext()*/client: Cancel()/backend: UpdateFragmentExecStatus() |
| /// |
| /// The implementation ensures that setting an overall error status and initiating |
| /// cancellation of local and all remote fragments is atomic. |
| /// |
| /// TODO: move into separate subdirectory and move nested classes into separate files |
| /// and unnest them |
| class Coordinator { // NOLINT: The member variables could be re-ordered to save space |
| public: |
| Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, |
| RuntimeProfile::EventSequence* events); |
| ~Coordinator(); |
| |
| /// Initiate asynchronous execution of a query with the given schedule. When it returns, |
| /// all fragment instances have started executing at their respective backends. |
| /// A call to Exec() must precede all other member function calls. |
| Status Exec(); |
| |
| /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the |
| /// query doesn't return rows, until the query finishes or is cancelled. |
| /// A call to Wait() must precede all calls to GetNext(). |
| /// Multiple calls to Wait() are idempotent and it is okay to issue multiple |
| /// Wait() calls concurrently. |
| Status Wait(); |
| |
| /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows' |
| /// rows, but will not return more. |
| /// |
| /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a |
| /// no-op. |
| /// |
| /// GetNext() will not set *eos=true until all fragment instances have either completed |
| /// or have failed. |
| /// |
| /// Returns an error status if an error was encountered either locally or by any of the |
| /// remote fragments or if the query was cancelled. |
| /// |
| /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext() |
| /// calls (but may call any of the other member functions concurrently with GetNext()). |
| Status GetNext(QueryResultSet* results, int max_rows, bool* eos); |
| |
| /// Cancel execution of query. This includes the execution of the local plan fragment, |
| /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the |
| /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED. |
| /// Idempotent. |
| void Cancel(const Status* cause = NULL); |
| |
| /// Updates status and query execution metadata of a particular |
| /// fragment; if 'status' is an error status or if 'done' is true, |
| /// considers the plan fragment to have finished execution. Assumes |
| /// that calls to UpdateFragmentExecStatus() won't happen |
| /// concurrently for the same backend. |
| /// If 'status' is an error status, also cancel execution of the query via a call |
| /// to CancelInternal(). |
| Status UpdateFragmentExecStatus(const TReportExecStatusParams& params); |
| |
| /// Only valid *after* calling Exec(). Return nullptr if the running query does not |
| /// produce any rows. |
| /// |
| /// TODO: The only dependency on this is QueryExecState, used to track memory for the |
| /// result cache. Remove this dependency, possibly by moving result caching inside this |
| /// class. |
| RuntimeState* runtime_state(); |
| |
| /// Only valid after Exec(). Returns runtime_state()->query_mem_tracker() if there |
| /// is a coordinator fragment, or query_mem_tracker_ (initialized in Exec()) otherwise. |
| /// |
| /// TODO: Remove, see runtime_state(). |
| MemTracker* query_mem_tracker(); |
| |
| /// Get cumulative profile aggregated over all fragments of the query. |
| /// This is a snapshot of the current state of execution and will change in |
| /// the future if not all fragments have finished execution. |
| RuntimeProfile* query_profile() const { return query_profile_.get(); } |
| |
| const TUniqueId& query_id() const { return query_id_; } |
| |
| /// This is safe to call only after Wait() |
| const PartitionStatusMap& per_partition_status() { return per_partition_status_; } |
| |
| /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu |
| /// was executed, or 0 if there were no Kudu timestamps reported. |
| /// This should only be called after Wait(). |
| uint64_t GetLatestKuduInsertTimestamp() const; |
| |
| /// Gathers all updates to the catalog required once this query has completed execution. |
| /// Returns true if a catalog update is required, false otherwise. |
| /// Must only be called after Wait() |
| bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update); |
| |
| /// Return error log for coord and all the fragments. The error messages from the |
| /// individual fragment instances are merged into a single output to retain readability. |
| std::string GetErrorLog(); |
| |
| const ProgressUpdater& progress() { return progress_; } |
| |
| /// Returns query_status_. |
| Status GetStatus(); |
| |
| /// Returns the exec summary. The exec summary lock must already have been taken. |
| /// The caller must not block while holding the lock. |
| const TExecSummary& exec_summary() const { |
| exec_summary_lock_.DCheckLocked(); |
| return exec_summary_; |
| } |
| |
| /// See the ImpalaServer class comment for the required lock acquisition order. |
| SpinLock& GetExecSummaryLock() const { return exec_summary_lock_; } |
| |
| /// Receive a local filter update from a fragment instance. Aggregate that filter update |
| /// with others for the same filter ID into a global filter. If all updates for that |
| /// filter ID have been received (may be 1 or more per filter), broadcast the global |
| /// filter to fragment instances. |
| void UpdateFilter(const TUpdateFilterParams& params); |
| |
| /// Called once the query is complete to tear down any remaining state. |
| void TearDown(); |
| |
| private: |
| class InstanceState; |
| struct FilterTarget; |
| class FilterState; |
| |
| /// Typedef for boost utility to compute averaged stats |
| /// TODO: including the median doesn't compile, looks like some includes are missing |
| typedef boost::accumulators::accumulator_set<int64_t, |
| boost::accumulators::features< |
| boost::accumulators::tag::min, |
| boost::accumulators::tag::max, |
| boost::accumulators::tag::mean, |
| boost::accumulators::tag::variance> |
| > SummaryStats; |
| |
| const QuerySchedule schedule_; |
| ExecEnv* exec_env_; |
| TUniqueId query_id_; |
| |
| /// copied from TQueryExecRequest; constant across all fragments |
| TDescriptorTable desc_tbl_; |
| TQueryCtx query_ctx_; |
| |
| /// copied from TQueryExecRequest, governs when to call ReportQuerySummary |
| TStmtType::type stmt_type_; |
| |
| /// map from id of a scan node to a specific counter in the node's profile |
| typedef std::map<PlanNodeId, RuntimeProfile::Counter*> CounterMap; |
| |
| /// Struct for per fragment instance counters that will be aggregated by the coordinator. |
| struct FragmentInstanceCounters { |
| /// Throughput counters per node |
| CounterMap throughput_counters; |
| |
| /// Total finished scan ranges per node |
| CounterMap scan_ranges_complete_counters; |
| }; |
| |
| /// InstanceStates for all fragment instances, including that of the coordinator |
| /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in |
| /// StartFInstances(). |
| std::vector<InstanceState*> fragment_instance_states_; |
| |
| /// True if the query needs a post-execution step to tidy up |
| bool needs_finalization_; |
| |
| /// Only valid if needs_finalization_ is true |
| TFinalizeParams finalize_params_; |
| |
| /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this |
| boost::mutex wait_lock_; |
| |
| bool has_called_wait_; // if true, Wait() was called; protected by wait_lock_ |
| |
| /// Keeps track of number of completed ranges and total scan ranges. |
| ProgressUpdater progress_; |
| |
| /// Protects all fields below. This is held while making RPCs, so this lock should |
| /// only be acquired if the acquiring thread is prepared to wait for a significant |
| /// time. |
| /// Lock ordering is |
| /// 1. lock_ |
| /// 2. InstanceState::lock_ |
| boost::mutex lock_; |
| |
| /// Overall status of the entire query; set to the first reported fragment error |
| /// status or to CANCELLED, if Cancel() is called. |
| Status query_status_; |
| |
| /// If true, the query is done returning all results. It is possible that the |
| /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries |
| /// with limit) |
| /// Once this is set to true, errors from remote fragments are ignored. |
| bool returned_all_results_; |
| |
| /// Non-null if and only if the query produces results for the client; i.e. is of |
| /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree and return |
| /// them to the client in GetNext(), and also to access the fragment instance's runtime |
| /// state. |
| /// |
| /// Result rows are materialized by this fragment instance in its own thread. They are |
| /// materialized into a QueryResultSet provided to the coordinator during GetNext(). |
| /// |
| /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied |
| /// reference of QueryState released) in TearDown(). |
| FragmentInstanceState* coord_instance_ = nullptr; |
| |
| /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when |
| /// GetNext() hits eos. |
| PlanRootSink* coord_sink_ = nullptr; |
| |
| /// Query mem tracker for this coordinator initialized in Exec(). Only valid if there |
| /// is no coordinator fragment. If there is a coordinator fragment, |
| /// this->runtime_state()->query_mem_tracker() returns the query mem tracker. |
| /// (See this->query_mem_tracker()) |
| std::shared_ptr<MemTracker> query_mem_tracker_; |
| |
| /// owned by plan root, which resides in runtime_state_'s pool |
| const RowDescriptor* row_desc_; |
| |
| /// Returns a local object pool. |
| ObjectPool* obj_pool() { return obj_pool_.get(); } |
| |
| PlanFragmentExecutor* executor(); |
| |
| /// Sets the TDescriptorTable(s) for the current fragment. |
| void SetExecPlanDescriptorTable(const TPlanFragment& fragment, |
| TExecPlanFragmentParams* rpc_params); |
| |
| /// True if execution has completed, false otherwise. |
| bool execution_completed_; |
| |
| /// Number of remote fragments that have completed (the name misspells 'fragments'). |
| int num_remote_fragements_complete_; |
| |
| /// If there is no coordinator fragment, Wait() simply waits until all |
| /// backends report completion by notifying on instance_completion_cv_. |
| /// Tied to lock_. |
| boost::condition_variable instance_completion_cv_; |
| |
| /// Count of the number of backends for which done != true. When this |
| /// hits 0, any Wait()'ing thread is notified |
| int num_remaining_fragment_instances_; |
| |
| /// The following two structures, per_partition_status_ and files_to_move_ are filled in |
| /// as the query completes, and track the results of INSERT queries that alter the |
| /// structure of tables. They are either the union of the reports from all fragment |
| /// instances, or taken from the coordinator fragment: only one of the two can |
| /// legitimately produce updates. |
| |
| /// The set of partitions that have been written to or updated by all fragment |
| /// instances, along with statistics such as the number of rows written (may be 0). For |
| /// unpartitioned tables, the empty string denotes the entire table. |
| PartitionStatusMap per_partition_status_; |
| |
| /// The set of files to move after an INSERT query has run, in (src, dest) form. An |
| /// empty string for the destination means that a file is to be deleted. |
| FileMoveMap files_to_move_; |
| |
| /// Object pool owned by the coordinator. Any executor will have its own pool. |
| boost::scoped_ptr<ObjectPool> obj_pool_; |
| |
| /// Execution summary for this query. |
| /// See the ImpalaServer class comment for the required lock acquisition order. |
| mutable SpinLock exec_summary_lock_; |
| TExecSummary exec_summary_; |
| |
| /// A mapping of plan node ids to index into exec_summary_.nodes |
| boost::unordered_map<TPlanNodeId, int> plan_node_id_to_summary_map_; |
| |
| /// Aggregate counters for the entire query. |
| boost::scoped_ptr<RuntimeProfile> query_profile_; |
| |
| /// Event timeline for this query. Unowned. |
| RuntimeProfile::EventSequence* query_events_; |
| |
| /// Per fragment profile information |
| struct PerFragmentProfileData { |
| /// Averaged profile for this fragment. Stored in obj_pool. |
| /// The counters in this profile are averages (type AveragedCounter) of the |
| /// counters in the fragment instance profiles. |
| /// Note that the individual fragment instance profiles themselves are stored and |
| /// displayed as children of the root_profile below. |
| RuntimeProfile* averaged_profile; |
| |
| /// Number of instances running this fragment. |
| int num_instances; |
| |
| /// Root profile for all fragment instances for this fragment |
| RuntimeProfile* root_profile; |
| |
| /// Bytes assigned for instances of this fragment |
| SummaryStats bytes_assigned; |
| |
| /// Completion times for instances of this fragment |
| SummaryStats completion_times; |
| |
| /// Execution rates for instances of this fragment |
| SummaryStats rates; |
| |
| PerFragmentProfileData() |
| : averaged_profile(nullptr), num_instances(-1), root_profile(nullptr) {} |
| }; |
| |
| /// This is indexed by fragment idx (TPlanFragment.idx). |
| /// This array is only modified at coordinator startup and query completion and |
| /// does not need locks. |
| std::vector<PerFragmentProfileData> fragment_profiles_; |
| |
| /// Throughput counters for the coordinator fragment |
| FragmentInstanceCounters coordinator_counters_; |
| |
| /// The set of hosts that the query will run on. Populated in Exec. |
| boost::unordered_set<TNetworkAddress> unique_hosts_; |
| |
| /// Total time spent in finalization (typically 0 except for INSERT into hdfs tables) |
| RuntimeProfile::Counter* finalization_timer_; |
| |
| /// Barrier that is released when all calls to ExecRemoteFInstance() have |
| /// returned, successfully or not. Initialised during Exec(). |
| boost::scoped_ptr<CountingBarrier> exec_complete_barrier_; |
| |
| /// Protects filter_routing_table_. |
| SpinLock filter_lock_; |
| |
| /// Map from filter ID to filter. |
| typedef boost::unordered_map<int32_t, FilterState> FilterRoutingTable; |
| FilterRoutingTable filter_routing_table_; |
| |
| /// Set to true when all calls to UpdateFilterRoutingTable() have finished, and it's |
| /// safe to concurrently read from filter_routing_table_. |
| bool filter_routing_table_complete_; |
| |
| /// Total number of filter updates received (always 0 if filter mode is not |
| /// GLOBAL). Excludes repeated broadcast filter updates. |
| RuntimeProfile::Counter* filter_updates_received_; |
| |
| /// The filtering mode for this query. Set in constructor. |
| TRuntimeFilterMode::type filter_mode_; |
| |
| /// Tracks the memory consumed by runtime filters during aggregation. Child of |
| /// query_mem_tracker_. |
| std::unique_ptr<MemTracker> filter_mem_tracker_; |
| |
| /// True if and only if TearDown() has been called. |
| bool torn_down_; |
| |
| /// Returns a pretty-printed table of the current filter state. |
| std::string FilterDebugString(); |
| |
| /// Sets 'filter_routing_table_complete_' and prints the table to the profile and log. |
| void MarkFilterRoutingTableComplete(); |
| |
| /// Fill in rpc_params based on params. |
| void SetExecPlanFragmentParams( |
| const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params); |
| |
| /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from |
| /// multiple threads. |
| void ExecRemoteFInstance( |
| const FInstanceExecParams& exec_params, const DebugOptions* debug_options); |
| |
| /// Determine fragment number, given fragment id. |
| int GetFragmentNum(const TUniqueId& fragment_id); |
| |
| /// Print hdfs split size stats to VLOG_QUERY and details to VLOG_FILE |
| /// Attaches split size summary to the appropriate runtime profile |
| void PrintFragmentInstanceInfo(); |
| |
| /// Collect scan node counters from the profile. |
| /// Assumes lock protecting profile and result is held. |
| void CollectScanNodeCounters(RuntimeProfile*, FragmentInstanceCounters* result); |
| |
| /// Runs cancel logic. Assumes that lock_ is held. |
| void CancelInternal(); |
| |
| /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when |
| /// the query is not being cancelled in the case where the query limit is reached. |
| void CancelFragmentInstances(); |
| |
| /// Acquires lock_ and updates query_status_ with 'status' if it's not already |
| /// an error status, and returns the current query_status_. |
| /// Calls CancelInternal() when switching to an error status. |
| /// failed_fragment is the fragment_id that has failed, used for error reporting along |
| /// with instance_hostname. |
| Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment, |
| const std::string& instance_hostname); |
| |
| /// Returns only when either all fragment instances have reported success or the query |
| /// is in error. Returns the status of the query. |
| /// It is safe to call this concurrently, but any calls must be made only after Exec(). |
| /// WaitForAllInstances may be called before Wait(), but note that Wait() guarantees |
| /// that any coordinator fragment has finished, which this method does not. |
| Status WaitForAllInstances(); |
| |
| /// Perform any post-query cleanup required. Called by Wait() only after all fragment |
| /// instances have returned, or if the query has failed, in which case it only cleans up |
| /// temporary data rather than finishing the INSERT in flight. |
| Status FinalizeQuery(); |
| |
| /// Moves all temporary staging files to their final destinations. |
| Status FinalizeSuccessfulInsert(); |
| |
| /// Initializes the structures in fragment_profiles_. Must be called before RPCs to |
| /// start remote fragments. |
| void InitExecProfiles(); |
| |
| /// Initialize the structures to collect execution summary of every plan node |
| /// (exec_summary_ and plan_node_id_to_summary_map_) |
| void InitExecSummary(); |
| |
| /// Update fragment profile information from a fragment instance state. |
| void UpdateAverageProfile(InstanceState* instance_state); |
| |
| /// Compute the summary stats (completion_times and rates) |
| /// for an individual entry of fragment_profiles_ based on the specified instance state. |
| void ComputeFragmentSummaryStats(InstanceState* instance_state); |
| |
| /// Outputs aggregate query profile summary. This is assumed to be called at the end of |
| /// a query -- remote fragments' profiles must not be updated while this is running. |
| void ReportQuerySummary(); |
| |
| /// Populates the summary execution stats from the profile. Can only be called when the |
| /// query is done. |
| void UpdateExecSummary(const InstanceState& instance_state); |
| |
| /// Determines what the permissions of directories created by INSERT statements should |
| /// be if permission inheritance is enabled. Populates a map from all prefixes of |
| /// path_str (including the full path itself) which is a path in Hdfs, to pairs |
| /// (does_not_exist, permissions), where does_not_exist is true if the path does not |
| /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of |
| /// the most immediate ancestor of the path that does exist, i.e. the permissions that |
| /// the path should inherit when created. Otherwise permissions is set to the actual |
| /// permissions of the path. The PermissionCache argument is also used to cache the |
| /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the |
| /// same path. |
| typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache; |
| void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str, |
| PermissionCache* permissions_cache); |
| |
| /// Starts all fragment instances contained in the schedule by issuing RPCs in |
| /// parallel and then waiting for all of the RPCs to complete. Also sets up and |
| /// registers the state for all fragment instances. |
| void StartFInstances(); |
| |
| /// Calls CancelInternal() and returns an error if there was any error starting the |
| /// fragments. |
| /// Also updates query_profile_ with the startup latency histogram. |
| Status FinishInstanceStartup(); |
| |
| /// Build the filter routing table by iterating over all plan nodes and collecting the |
| /// filters that they either produce or consume. |
| void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params); |
| }; |
| |
| } |
| |
| #endif |