// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gtest/gtest_prod.h> // for FRIEND_TEST
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "common/global-types.h"
#include "common/status.h"
#include "gen-cpp/StatestoreService_types.h"
#include "gutil/threading/thread_collision_warner.h"
#include "scheduling/executor-blacklist.h"
#include "scheduling/executor-group.h"
#include "statestore/statestore-subscriber.h"
#include "util/container-util.h"
#include "util/metrics-fwd.h"
namespace impala {
namespace test {
/// Forward declaration only, so that ClusterMembershipMgr can name
/// impala::test::SchedulerWrapper in a friend declaration without including its header.
class SchedulerWrapper;
} // end namespace test
/// The ClusterMembershipMgr manages the local backend's membership in the cluster. It has
/// two roles:
/// - Provide a consistent view on the current cluster membership, i.e. other backends
/// - Establish and maintain the membership of the local backend in the cluster.
/// To do this, the class subscribes to the statestore cluster-membership topic and
/// applies incoming changes to its local copy of the cluster state. If it finds that the
/// local backend is missing from the cluster membership, it will add it (contingent on
/// the local backend being available after startup).
/// Clients of this class can obtain a consistent, immutable snapshot of the cluster
/// membership state through GetSnapshot().
/// The ClusterMembershipMgr keeps the membership snapshot stable while the statestore
/// subscriber is recovering from a connection failure. This allows other backends to
/// re-register with the statestore after a statestore restart.
/// TODO(IMPALA-8484): Allow specifying executor groups during backend startup. Currently
/// only one executor group named "default" exists. All backends are part of that group
/// and it's the only group available for scheduling.
/// The class also allows the local backend (ImpalaServer), the local Frontend and the
/// AdmissionController to register callbacks to receive notifications of changes to the
/// cluster membership. Note: The notifications for blacklisted executors are not sent
/// immediately, but deferred to when statestore updates are processed (see detailed
/// comment in the BlacklistExecutor method in the cc file). These callbacks are
/// registered before the statestore subscriber is started by the impala server which
/// ensures that the very first update to the cluster membership is sent to all callbacks.
/// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
/// it. Alternatively consider using Kudu's rw locks.
class ClusterMembershipMgr {
/// A immutable pointer to a backend descriptor. It is used to return a consistent,
/// immutable copy of a backend descriptor.
typedef std::shared_ptr<const TBackendDescriptor> BeDescSharedPtr;
/// Maps statestore subscriber IDs to backend descriptors.
typedef std::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
/// Maps executor group names to executor groups. For now, only a default group exists
/// and all executors are part of that group.
typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups;
// A snapshot of the current cluster membership. The ClusterMembershipMgr maintains a
// consistent copy of this and updates it atomically when the membership changes.
// Clients can obtain an immutable copy. Class instances can be created through the
// implicitly-defined default and copy constructors.
struct Snapshot {
Snapshot() = default;
Snapshot(const Snapshot&) = default;
/// The current backend descriptor of the local backend.
BeDescSharedPtr local_be_desc;
/// Map from unique backend ID to TBackendDescriptor for all known backends, including
/// those that are quiescing or blacklisted. The {backend ID, TBackendDescriptor}
/// pairs represent the IMPALA_MEMBERSHIP_TOPIC {key, value} pairs of known executors
/// retrieved from the statestore. It's important to track both the backend ID as well
/// as the TBackendDescriptor so we know what is being removed in a given update.
BackendIdMap current_backends;
/// A map of executor groups by their names. Only contains executors that are
/// available for scheduling queries on and not executors that are quiescing or
/// blacklisted.
ExecutorGroups executor_groups;
/// The local blacklist. Backends that are added to this will be present in
/// 'current_backends' but not in 'executor_groups'.
ExecutorBlacklist executor_blacklist;
/// The version of this Snapshot. It is incremented every time the cluster membership
/// changes.
int64_t version = 0;
/// An immutable shared membership snapshot.
typedef std::shared_ptr<const Snapshot> SnapshotPtr;
/// A callback to provide the local backend descriptor. Typically the ImpalaServer would
/// provide such a backend descriptor, and it's the task of code outside this class to
/// register the ImpalaServer with this class, e.g. after it has started up. Tests can
/// register their own callback to provide a local backend descriptor. No locks are held
/// when calling this callback.
typedef std::function<BeDescSharedPtr()> BackendDescriptorPtrFn;
/// A callback to provide the latest snapshot of cluster membership whenever there are
/// any changes to the membership.
typedef std::function<void(SnapshotPtr)> UpdateCallbackFn;
ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber,
MetricGroup* metrics);
/// Initializes instances of this class. This only sets up the statestore subscription.
/// Callbacks to the local ImpalaServer and Frontend must be registered in separate
/// steps.
Status Init();
/// The following functions allow users of this class to register callbacks for certain
/// events. They may be called at any time before or after calling Init() and are
/// thread-safe. Each of the functions may be called at most once during the lifetime of
/// the object, and the function 'fn' must not be empty.
/// Registers a callback to provide the local backend descriptor.
void SetLocalBeDescFn(BackendDescriptorPtrFn fn);
/// Registers a callback to be notified with the latest snapshot of cluster membership
/// whenever there are any changes to the membership.
void RegisterUpdateCallbackFn(UpdateCallbackFn fn);
/// Returns a read only snapshot of the current cluster membership state. May be called
/// before or after calling Init(). The returned shared pointer will always be non-null,
/// but it may point to an empty Snapshot, depending on the arrival of statestore
/// updates and the status of the local backend.
SnapshotPtr GetSnapshot() const;
/// Handler for statestore updates, called asynchronously when an update is received
/// from the subscription manager. This method processes incoming updates from the
/// statestore and applies them to the current membership state. It also ensures that
/// the local backend descriptor gets registered with the statestore and adds it to the
/// current membership state. This method changes the 'current_membership_' atomically.
/// This handler is registered with the statestore and must not be called directly,
/// except in tests.
void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
std::vector<TTopicDelta>* subscriber_topic_updates);
/// Adds the given backend to the local blacklist. Updates 'current_membership_' to
/// remove the backend from 'executor_groups' so that it will not be scheduled on.
void BlacklistExecutor(const TBackendDescriptor& be_desc);
/// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
void AddLocalBackendToStatestore(const TBackendDescriptor& local_be_desc,
std::vector<TTopicDelta>* subscriber_topic_updates);
/// Returns the local backend descriptor or nullptr if no local backend has been
/// registered.
BeDescSharedPtr GetLocalBackendDescriptor();
/// Notifies all registered callbacks of the latest changes to the membership by sending
/// them the latest cluster membership snapshot.
void NotifyListeners(SnapshotPtr snapshot);
/// Atomically replaces a membership snapshot with a new copy.
void SetState(const SnapshotPtr& new_state);
/// Checks if the local backend is available and its registration in 'state' matches the
/// given backend descriptor in 'local_be_desc'. Returns false otherwise.
bool NeedsLocalBackendUpdate(const Snapshot& state,
const BeDescSharedPtr& local_be_desc);
/// Checks that the backend ID map is consistent with 'executor_groups' and
/// 'executor_blacklist', i.e. all executors in all groups are also present in the map
/// and are non-quiescing, non-blacklisted executors.
/// This method should only be called in debug builds.
bool CheckConsistency(const BackendIdMap& current_backends,
const ExecutorGroups& executor_groups, const ExecutorBlacklist& executor_blacklist);
/// Updates the membership metrics. Is registered as an updated callback function to
/// receive any membership updates. The only exception is that this is called directly
/// in BlacklistExecutor() where updates are not required to be sent to external
/// listeners.
void UpdateMetrics(const SnapshotPtr& new_state);
/// Ensures that only one thread is processing a membership update at a time, either
/// from a statestore update or a blacklisting decision. Must be taken before any other
/// locks in this class.
boost::mutex update_membership_lock_;
/// Membership metrics
IntCounter* total_live_executor_groups_ = nullptr;
IntCounter* total_healthy_executor_groups_ = nullptr;
IntCounter* total_backends_ = nullptr;
/// The snapshot of the current cluster membership. When receiving changes to the
/// executors configuration from the statestore we will make a copy of the stored
/// object, apply the updates to the copy and atomically swap the contents of this
/// pointer.
SnapshotPtr current_membership_;
/// Protects current_membership_. Cannot be held at the same time as
/// 'callback_fn_lock_'.
mutable boost::mutex current_membership_lock_;
/// A temporary membership snapshot to hold updates while the statestore is in its
/// post-recovery grace period. Not exposed to clients. Protected by
/// 'update_membership_lock_'.
std::shared_ptr<Snapshot> recovering_membership_;
/// Pointer to a subscription manager (which we do not own) which is used to register
/// for dynamic updates to the set of available backends. May be nullptr if the set of
/// backends is fixed (only useful for tests). Thread-safe, not protected by a lock.
StatestoreSubscriber* statestore_subscriber_;
/// Serializes TBackendDescriptors when creating topic updates. Not protected by a lock
/// - only used in statestore thread.
ThriftSerializer thrift_serializer_;
/// Unique - across the cluster - identifier for this impala backend. Used to validate
/// incoming backend descriptors and to register this backend with the statestore. Not
/// protected by a lock - only used in the statestore thread.
std::string local_backend_id_;
/// Callbacks that provide external dependencies.
BackendDescriptorPtrFn local_be_desc_fn_;
std::vector<UpdateCallbackFn> update_callback_fns_;
/// Protects the callbacks. Cannot be held at the same time as
/// 'current_membership_lock_'.
mutable boost::mutex callback_fn_lock_;
friend class impala::test::SchedulerWrapper;
} // end namespace impala