// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include <deque>
#include "common/logging.h"
#include "common/names.h"
#include "gen-cpp/StatestoreService_types.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/cluster-membership-test-util.h"
#include "service/impala-server.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/metrics.h"
using std::mt19937;
using std::uniform_int_distribution;
using std::uniform_real_distribution;
using namespace impala;
using namespace impala::test;
namespace impala {
/// This class and the following tests exercise the ClusterMembershipMgr core membership
/// handling code by simulating the interactions between multiple ClusterMembershipMgr
/// instances through the statestore. Updates between all cluster members are sent and
/// processed sequentially and in a deterministic order.
/// The tests progress in 4 successively more sophisticated ways:
/// 1) Make sure that simple interactions between 2 backends and their
/// ClusterMembershipMgr work correctly.
/// 2) Run a cluster of backends through the regular lifecycle of a backend in lockstep.
/// 3) Make random but valid changes to the membership of a whole cluster for a given
/// number of iterations and observe that each member's state remains consistent.
/// 4) TODO: Make random, potentially invalid changes to the membership of a whole
/// cluster.
class ClusterMembershipMgrTest : public testing::Test {
// Per-test initialization hook of the testing::Test fixture.
// NOTE(review): the statements of SetUp() are not visible in this excerpt; presumably
// it seeds 'rng_' - confirm against the full file.
virtual void SetUp() {
// Default constructor; it performs no work of its own.
ClusterMembershipMgrTest() {}
/// Returns the size of the default executor group of the current membership in 'cmm'
/// if the default executor group exists, otherwise returns 0.
int GetDefaultGroupSize(const ClusterMembershipMgr& cmm) const {
const string& group_name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
if (cmm.GetSnapshot()->executor_groups.find(group_name)
== cmm.GetSnapshot()->executor_groups.end()) {
return 0;
return cmm.GetSnapshot()->executor_groups.find(group_name)->second.NumExecutors();
/// A struct to hold information related to a simulated backend during the test.
struct Backend {
/// Id of this backend; CreateBackend() copies it from 'desc'.
UniqueIdPB backend_id;
/// Metric group that is passed to 'cmm' when CreateCMM() constructs it.
std::unique_ptr<MetricGroup> metric_group;
/// The ClusterMembershipMgr of this backend; set by CreateCMM().
std::unique_ptr<ClusterMembershipMgr> cmm;
/// The backend descriptor of this backend; set by CreateBackend().
std::shared_ptr<BackendDescriptorPB> desc;
/// The list that owns all backends.
vector<unique_ptr<Backend>> backends_;
/// Various lists of pointers that point into elements of 'backends_'. These pointers
/// will stay valid when 'backends_' resizes, because 'backends_' stores unique_ptrs
/// rather than the Backend objects themselves. Backends can be in one of 5 states:
/// - "Offline": A BackendDescriptorPB has been created but has not been associated with
/// a ClusterMembershipMgr.
/// - "Starting": A ClusterMembershipMgr has been created, but the associated backend
/// descriptor is not running yet (no callback registered with the
/// ClusterMembershipMgr).
/// - "Running": The backend descriptor is available to the ClusterMembershipMgr via a
/// callback.
/// - "Quiescing": The backend descriptor is marked as quiescing.
/// - "Deleted": The backend has been deleted from 'backends_' altogether and other
/// backends have received statestore messages to notify them of the deletion. Note
/// that transition into this state does not update the affected backend itself, it
/// merely gets destructed.
/// As part of the state transitions, all other backends are notified of the new state
/// by calling their UpdateMembership() methods with a matching statestore update
/// message.
/// We keep backends in double ended queues to allow for efficient removal of elements
/// on both ends.
typedef deque<Backend*> Backends;
/// Returns true iff the pointer 'be' is an element of 'backends'. Despite the name,
/// 'backends' is a deque (see the Backends typedef). Elements are compared by pointer
/// value via a linear search.
bool IsInVector(const Backend* be, const Backends& backends) {
return find(backends.begin(), backends.end(), be) != backends.end();
void RemoveFromVector(const Backend* be, Backends* backends) {
auto it = find(backends->begin(), backends->end(), be);
ASSERT_TRUE(it != backends->end());
it = find(backends->begin(), backends->end(), be);
ASSERT_TRUE(it == backends->end());
/// Lists that are used to track backends in a particular state. Methods that manipulate
/// these lists maintain the invariant that a backend is only in one list at a time.
/// There is no list for the "Deleted" state: DeleteBackend() erases such backends from
/// 'backends_'.
/// Backends that are in state "Offline".
Backends offline_;
/// Backends that are in state "Starting".
Backends starting_;
/// Backends that are in state "Running".
Backends running_;
/// Backends that are in state "Quiescing".
Backends quiescing_;
/// Shorthand for the list of topic deltas that UpdateMembership() fills in, as returned
/// by Poll().
typedef vector<TTopicDelta> TopicDeltas;
/// Polls a backend for changes to its local state by sending an empty statestore update
/// that is marked as a delta (i.e. it does not represent any changes).
/// Returns the topic deltas that the backend's ClusterMembershipMgr filled in during the
/// call. 'be->cmm' is dereferenced unconditionally, so 'be' must already have a
/// ClusterMembershipMgr.
vector<TTopicDelta> Poll(Backend* be) {
const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
// The empty delta is used to poll subscribers for updates without sending new
// changes.
StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
// Refers to the entry created by the initializer above.
TTopicDelta& empty_delta = topic_delta_map[topic_id];
empty_delta.is_delta = true;
vector<TTopicDelta> returned_topic_deltas;
be->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
return returned_topic_deltas;
/// Sends a single topic item in a delta to all backends in 'backends_'.
/// Every element of 'backends_' is expected to have a ClusterMembershipMgr, since
/// 'cmm' is dereferenced unconditionally. A backend answering with topic deltas of its
/// own is reported as a fatal test failure.
void SendDelta(const TTopicItem& item) {
  const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
  StatestoreSubscriber::TopicDeltaMap topic_delta_map;
  TTopicDelta& delta = topic_delta_map[topic_id];
  delta.is_delta = true;
  // Attach the item; without it every backend would only receive an empty delta.
  delta.topic_entries.push_back(item);
  for (auto& backend : backends_) {
    vector<TTopicDelta> returned_topic_deltas;
    backend->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
    // We never expect backends to respond with a delta update on their own because the
    // test code explicitly polls them after making changes to their state.
    ASSERT_EQ(0, returned_topic_deltas.size())
        << "Error with backend " << backend->backend_id;
  }
}
/// Creates a new backend and adds it to the list of offline backends. If idx is
/// omitted, the current number of backends will be used as the new index.
/// Returns a non-owning pointer to the new backend; ownership stays with 'backends_'.
Backend* CreateBackend(int idx = -1) {
  if (idx == -1) idx = backends_.size();
  // Add the new element first so that back() below refers to it.
  backends_.push_back(make_unique<Backend>());
  auto& be = backends_.back();
  be->desc = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(idx));
  be->backend_id = be->desc->backend_id();
  // A freshly created backend has no ClusterMembershipMgr yet, i.e. it is "Offline".
  offline_.push_back(be.get());
  return be.get();
}
/// Creates a new ClusterMembershipMgr for a backend and moves the backend from
/// 'offline_' to 'starting_'. Callers must handle invalidated iterators after calling
/// this method.
/// Records a fatal test failure and returns early if 'be' is not in 'offline_'.
void CreateCMM(Backend* be) {
  ASSERT_TRUE(IsInVector(be, offline_));
  be->metric_group = make_unique<MetricGroup>("test");
  be->cmm = make_unique<ClusterMembershipMgr>(
      PrintId(be->backend_id), nullptr, be->metric_group.get());
  RemoveFromVector(be, &offline_);
  // Complete the state transition so that the backend is tracked in exactly one list.
  starting_.push_back(be);
}
/// Starts a backend by making its backend descriptor available to its
/// ClusterMembershipMgr through a callback. This method also propagates the change to
/// all other backends in 'backends_' and moves the backend from 'starting_' to
/// 'running_'.
/// Records a fatal test failure and returns early if 'be' is not in 'starting_', if it
/// has no ClusterMembershipMgr or descriptor, or if polling it does not yield exactly
/// one topic delta with exactly one entry.
void StartBackend(Backend* be) {
  ASSERT_TRUE(IsInVector(be, starting_));
  ASSERT_TRUE(be->cmm.get() != nullptr);
  ASSERT_TRUE(be->desc.get() != nullptr);
  // Register the callback; otherwise the manager never learns about the descriptor.
  auto be_cb = [be]() { return be->desc; };
  be->cmm->SetLocalBeDescFn(be_cb);

  // Poll to obtain topic update
  TopicDeltas topic_deltas = Poll(be);
  ASSERT_EQ(1, topic_deltas.size());
  ASSERT_EQ(1, topic_deltas[0].topic_entries.size());

  // Broadcast to all other backends
  SendDelta(topic_deltas[0].topic_entries[0]);

  RemoveFromVector(be, &starting_);
  running_.push_back(be);
}
/// Quiesces a backend by updating its backend descriptor and polling its
/// ClusterMembershipMgr to make the change take effect. The resulting update from the
/// backend's ClusterMembershipMgr is then broadcast to the rest of the cluster. Also
/// moves the backend from 'running_' to 'quiescing_'.
/// Records a fatal test failure and returns early if 'be' is not in 'running_' or if
/// polling it does not yield exactly one topic delta with exactly one entry.
void QuiesceBackend(Backend* be) {
  ASSERT_TRUE(IsInVector(be, running_));
  // Mark the descriptor as quiescing before polling, so that the poll has a change to
  // report.
  // NOTE(review): assumes BackendDescriptorPB has an 'is_quiescing' field - confirm.
  be->desc->set_is_quiescing(true);
  TopicDeltas topic_deltas = Poll(be);
  ASSERT_EQ(1, topic_deltas.size());
  ASSERT_EQ(1, topic_deltas[0].topic_entries.size());

  // Broadcast to all other backends
  SendDelta(topic_deltas[0].topic_entries[0]);

  RemoveFromVector(be, &running_);
  quiescing_.push_back(be);
}
/// Deletes a backend from all other backends and from 'backends_'. A delta marking the
/// backend as deleted gets sent to all nodes in 'backends_'. Note that this method does
/// not send any updates to the backend itself, as it would - like in a real cluster -
/// just disappear. The backend must be in 'running_' or 'quiescing_' and is removed
/// from the respective list by this method.
/// 'be' is dangling once this method returns.
void DeleteBackend(Backend* be) {
  bool is_running = IsInVector(be, running_);
  bool is_quiescing = IsInVector(be, quiescing_);
  ASSERT_TRUE(is_running || is_quiescing);

  // Create topic item before erasing the backend.
  TTopicItem deletion_item;
  deletion_item.key = PrintId(be->backend_id);
  deletion_item.deleted = true;

  // Stop tracking the backend while 'be' still points to a live object.
  if (is_running) RemoveFromVector(be, &running_);
  if (is_quiescing) RemoveFromVector(be, &quiescing_);

  // Delete the backend. This must happen before the broadcast so that the deleted
  // backend does not receive its own deletion update.
  auto new_end_it = std::remove_if(backends_.begin(), backends_.end(),
      [be](const unique_ptr<Backend>& elem) { return elem.get() == be; });
  backends_.erase(new_end_it, backends_.end());

  // Send the deletion update to the remaining backends.
  SendDelta(deletion_item);
}
/// Random number generator used by RandomInt() and RandomDoubleFraction().
mt19937 rng_;
/// Returns an int drawn from a uniform_int_distribution over [0, max - 1] using 'rng_'.
/// 'max' must be positive, since the distribution requires its lower bound (0) to not
/// exceed its upper bound (max - 1).
int RandomInt(int max) {
uniform_int_distribution<int> rand_int(0, max - 1);
return rand_int(rng_);
/// Returns a double drawn from a uniform_real_distribution constructed with bounds 0
/// and 1, using 'rng_'.
double RandomDoubleFraction() {
uniform_real_distribution<double> rand_double(0, 1);
return rand_double(rng_);
/// This test takes two instances of the ClusterMembershipMgr through a common lifecycle.
/// It also serves as an example for how to craft statestore messages and pass them to
/// UpdateMembership().
/// NOTE(review): 'returned_topic_deltas' is reused across UpdateMembership() calls
/// without a visible reset in between - confirm whether it needs to be cleared.
TEST_F(ClusterMembershipMgrTest, TwoInstances) {
// Two backend descriptors, each with its own metric group and membership manager.
auto b1 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(1));
auto b2 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(2));
MetricGroup tmp_metrics1("test-metrics1");
MetricGroup tmp_metrics2("test-metrics2");
ClusterMembershipMgr cmm1(b1->address().hostname(), nullptr, &tmp_metrics1);
ClusterMembershipMgr cmm2(b2->address().hostname(), nullptr, &tmp_metrics2);
const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
// Points at the map entry for the membership topic; the test overwrites it to hand the
// output of one manager to the other.
TTopicDelta* ss_topic_delta = &topic_delta_map[topic_id];
vector<TTopicDelta> returned_topic_deltas;
// The empty delta is used to poll subscribers for updates without sending new changes.
TTopicDelta empty_delta;
empty_delta.is_delta = true;
// Ping both managers, both should have no state to update
cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
// Hook up first callback and iterate again
cmm1.SetLocalBeDescFn([b1]() { return b1; });
cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(1, returned_topic_deltas.size());
// First manager now has one BE
ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size());
// Hook up second callback and iterate with the result of the first manager
cmm2.SetLocalBeDescFn([b2]() { return b2; });
*ss_topic_delta = returned_topic_deltas[0];
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(1, returned_topic_deltas.size());
ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
// Send the returned update to the first manager, this time no deltas will be returned
*ss_topic_delta = returned_topic_deltas[0];
cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
// Both managers now have the same state. Shutdown one of them and step through
// propagating the update.
// NOTE(review): no statement that actually changes 'b1' is visible here - confirm that
// the descriptor is marked as quiescing before the poll below.
// Send an empty update to the 1st one to trigger propagation of the shutdown
topic_delta_map[topic_id] = empty_delta;
cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
// The mgr will return its changed BackendDescriptorPB
ASSERT_EQ(1, returned_topic_deltas.size());
// It will also remove itself from the executor group (but not the current backends).
ASSERT_EQ(1, GetDefaultGroupSize(cmm1));
ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
// Propagate the quiescing to the 2nd mgr
*ss_topic_delta = returned_topic_deltas[0];
ASSERT_EQ(2, GetDefaultGroupSize(cmm2));
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
// Delete the 1st backend from the 2nd one
// The entry that was just propagated is reused and flagged as deleted.
ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
ss_topic_delta->topic_entries[0].deleted = true;
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
/// Brings up two backends, lets the first one blacklist the second one and takes a
/// snapshot of the first backend's membership afterwards.
/// NOTE(review): no assertion on 'snapshot' is visible in this excerpt - confirm what
/// the test verifies against the full file.
TEST_F(ClusterMembershipMgrTest, IsBlacklisted) {
const int NUM_BACKENDS = 2;
// Create all backends first; they start out in 'offline_'.
for (int i = 0; i < NUM_BACKENDS; ++i) CreateBackend();
EXPECT_EQ(NUM_BACKENDS, backends_.size());
EXPECT_EQ(backends_.size(), offline_.size());
// The loops below rely on CreateCMM()/StartBackend() removing the front element from
// the list they iterate over.
while (!offline_.empty()) CreateCMM(offline_.front());
EXPECT_EQ(0, offline_.size());
EXPECT_EQ(NUM_BACKENDS, starting_.size());
while (!starting_.empty()) StartBackend(starting_.front());
EXPECT_EQ(0, starting_.size());
EXPECT_EQ(NUM_BACKENDS, running_.size());
// Backend 0 blacklists backend 1.
backends_[0]->cmm->BlacklistExecutor(backends_[1]->desc->backend_id(), Status("error"));
ClusterMembershipMgr::SnapshotPtr snapshot = backends_[0]->cmm->GetSnapshot();
// This test verifies the interaction between the ExecutorBlacklist and
// ClusterMembershipMgr.
// NOTE(review): several comments below announce steps (sleeping, quiescing, deleting)
// for which no statement is visible in this excerpt, and BLACKLIST_TIMEOUT_SLEEP_US has
// no visible use - confirm against the full file.
TEST_F(ClusterMembershipMgrTest, ExecutorBlacklist) {
// Set some flags to make the blacklist timeout fairly small (50ms);
// 'saver' is presumably there so that the flag overrides do not leak into other tests.
gflags::FlagSaver saver;
FLAGS_statestore_max_missed_heartbeats = 5;
FLAGS_statestore_heartbeat_frequency_ms = 10;
// 100000us = 100ms, i.e. twice the 50ms mentioned above.
const int BLACKLIST_TIMEOUT_SLEEP_US = 100000;
const int NUM_BACKENDS = 3;
for (int i = 0; i < NUM_BACKENDS; ++i) CreateBackend();
EXPECT_EQ(NUM_BACKENDS, backends_.size());
EXPECT_EQ(backends_.size(), offline_.size());
while (!offline_.empty()) CreateCMM(offline_.front());
EXPECT_EQ(0, offline_.size());
EXPECT_EQ(NUM_BACKENDS, starting_.size());
while (!starting_.empty()) StartBackend(starting_.front());
EXPECT_EQ(0, starting_.size());
EXPECT_EQ(NUM_BACKENDS, running_.size());
// Assert that all backends know about each other and are all in the default executor
// group.
for (Backend* be : running_) {
EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(running_.size(), GetDefaultGroupSize(*be->cmm));
// Tell a BE to blacklist itself, should have no effect.
backends_[0]->cmm->BlacklistExecutor(backends_[0]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS, GetDefaultGroupSize(*backends_[0]->cmm));
// Tell a BE to blacklist another BE, should remove it from executor_groups but not
// current_backends.
backends_[0]->cmm->BlacklistExecutor(backends_[1]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
// Blacklist a BE that is already blacklisted. Should have no effect.
backends_[0]->cmm->BlacklistExecutor(backends_[1]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
// Sleep and check the node has been un-blacklisted.
// The poll is expected to return no topic deltas.
EXPECT_EQ(Poll(backends_[0].get()).size(), 0);
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS, GetDefaultGroupSize(*backends_[0]->cmm));
// Blacklist the BE and sleep again.
backends_[0]->cmm->BlacklistExecutor(backends_[1]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
// Quiesce the blacklisted BE. The update to quiesce it will arrive in the same call to
// UpdateMembership() that it would have been un-blacklisted.
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
// Try blacklisting the quiesced BE, should have no effect.
backends_[0]->cmm->BlacklistExecutor(backends_[1]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
// Blacklist another BE and sleep.
backends_[0]->cmm->BlacklistExecutor(backends_[2]->desc->backend_id(), Status("error"));
EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
// Delete the blacklisted BE. The update to delete it will arrive in the same call to
// UpdateMembership() that it would have been un-blacklisted.
EXPECT_EQ(NUM_BACKENDS - 1, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
// This test runs a group of 20 backends through their full lifecycle, validating that
// their state is correctly propagated through the cluster after every change.
// NOTE(review): the statements that create, quiesce and delete backends inside the
// loops below are not visible in this excerpt - confirm against the full file.
TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
const int NUM_BACKENDS = 20;
for (int i = 0; i < NUM_BACKENDS; ++i) {
EXPECT_EQ(NUM_BACKENDS, backends_.size());
EXPECT_EQ(backends_.size(), offline_.size());
// Move every backend from "Offline" to "Starting" and then to "Running".
while (!offline_.empty()) CreateCMM(offline_.front());
ASSERT_EQ(0, offline_.size());
ASSERT_EQ(NUM_BACKENDS, starting_.size());
while (!starting_.empty()) StartBackend(starting_.front());
ASSERT_EQ(0, starting_.size());
ASSERT_EQ(NUM_BACKENDS, running_.size());
// Assert that all backends know about each other and are all in the default executor
// group.
for (Backend* be : running_) {
EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(running_.size(), GetDefaultGroupSize(*be->cmm));
// Quiesce half of the backends.
// 'i' counts the backends quiesced so far; the loop ends once 'quiescing_' holds half
// of all backends.
for (int i = 0; quiescing_.size() < NUM_BACKENDS / 2; ++i) {
Backend* be = running_.front();
// All backends must still remain online
EXPECT_EQ(NUM_BACKENDS, be->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - i, GetDefaultGroupSize(*be->cmm));
// Make sure that the numbers drop
EXPECT_EQ(NUM_BACKENDS - i - 1, GetDefaultGroupSize(*be->cmm));
int num_still_running = NUM_BACKENDS - quiescing_.size();
ASSERT_EQ(num_still_running, running_.size());
ASSERT_EQ(NUM_BACKENDS / 2, quiescing_.size());
for (auto& be : backends_) {
// All backends are still registered
EXPECT_EQ(backends_.size(), be->cmm->GetSnapshot()->current_backends.size());
// Executor groups now show half of the backends remaining
EXPECT_EQ(num_still_running, GetDefaultGroupSize(*be->cmm));
// Delete half of the backends and make sure that the other half learned about it.
int to_delete = backends_.size() / 2;
int num_expected_alive = backends_.size() - to_delete;
for (int idx = 0; idx < to_delete; ++idx) {
// Will change backends_
// The two branches alternate on the parity of 'idx'; their bodies are not visible here.
if (idx % 2 == 0) {
} else {
ASSERT_EQ(num_expected_alive, quiescing_.size() + running_.size());
for (Backend* be : quiescing_) {
EXPECT_EQ(num_expected_alive, be->cmm->GetSnapshot()->current_backends.size());
// Quiesce the remaining backends to validate that executor groups can scale to 0
// backends.
while (!running_.empty()) QuiesceBackend(running_.front());
for (auto& be : backends_) {
// Executor groups now are empty
EXPECT_EQ(0, GetDefaultGroupSize(*be->cmm));
/// This test executes a number of random changes to cluster membership. On every
/// iteration a new backend is created, and with some probability, a backend is quiesced,
/// removed from the cluster after having been quiesced before, or removed from the
/// cluster without quiescing. The test relies on the consistency checks built into the
/// ClusterMembershipMgr to ensure that it is in a consistent state.
/// NOTE(review): the bodies of the four 'if' branches below (the state transitions and
/// the counter increments) are not visible in this excerpt - confirm against the full
/// file.
TEST_F(ClusterMembershipMgrTest, RandomizedMembershipUpdates) {
// TODO: Parameterize this test and run with several parameter sets
const int NUM_ITERATIONS = 100;
// Thresholds that the per-iteration random draw 'p' is compared against.
const double P_ADD = 1;
const double P_QUIESCE = 0.35;
const double P_DELETE = 0.30;
const double P_KILL = 0.2;
// Cumulative counts of how many backends were added/shutdown/deleted/killed by the
// tests.
int num_added = 0;
int num_shutdown = 0;
int num_deleted = 0;
// In this test "killing" a backend means deleting it without quiescing it first to
// simulate non-graceful failures.
int num_killed = 0;
for (int i = 0; i < NUM_ITERATIONS; ++i) {
// A single draw per iteration is compared against all four thresholds.
double p = RandomDoubleFraction();
if (p < P_ADD) {
// The iteration number is used as the index of the new backend.
Backend* be = CreateBackend(i);
if (p < P_QUIESCE && !running_.empty()) {
// Pick a random running backend.
int idx = RandomInt(running_.size());
Backend* be = running_[idx];
if (p < P_DELETE && !quiescing_.empty()) {
// Pick a random quiescing backend.
int idx = RandomInt(quiescing_.size());
Backend* be = quiescing_[idx];
if (p < P_KILL && !running_.empty()) {
// Pick a random running backend.
int idx = RandomInt(running_.size());
Backend* be = running_[idx];
// Print a summary of the counters.
std::cout << "Added: " << num_added << ", shutdown: " << num_shutdown << ", deleted: "
<< num_deleted << ", killed: " << num_killed << endl;
/// This tests various valid and invalid cases that the parsing logic in
/// PopulateExpectedExecGroupSets can encounter.
/// Each case sets FLAGS_expected_executor_group_sets and calls
/// ClusterMembershipMgr::PopulateExpectedExecGroupSets() with the same output vector.
/// NOTE(review): the call for case 1, the status checks and the statements that own the
/// dangling string literals in cases 5-9 are not visible in this excerpt. The size
/// expectations suggest the output vector is reset between cases - confirm against the
/// full file.
TEST(ClusterMembershipMgrUnitTest, TestPopulateExpectedExecGroupSets) {
// Presumably keeps the flag overrides below from leaking into other tests.
gflags::FlagSaver saver;
vector<TExecutorGroupSet> expected_exec_group_sets;
// Case 1: Empty string
FLAGS_expected_executor_group_sets = "";
Status status =
// Case 2: Single valid group set
FLAGS_expected_executor_group_sets = "group-prefix1:2";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
EXPECT_EQ(expected_exec_group_sets.size(), 1);
EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix1");
EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
// Case 3: Multiple valid group sets
FLAGS_expected_executor_group_sets = "group-prefix1:2,group-prefix2:10";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
EXPECT_EQ(expected_exec_group_sets.size(), 2);
EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix1");
EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
EXPECT_EQ(expected_exec_group_sets[1].exec_group_name_prefix, "group-prefix2");
EXPECT_EQ(expected_exec_group_sets[1].expected_num_executors, 10);
// Case 4: Multiple valid group sets but out of order, output is expected to return in
// increasing order of expected group size
FLAGS_expected_executor_group_sets = "group-prefix1:10,group-prefix2:2";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
EXPECT_EQ(expected_exec_group_sets.size(), 2);
EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix2");
EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
EXPECT_EQ(expected_exec_group_sets[1].exec_group_name_prefix, "group-prefix1");
EXPECT_EQ(expected_exec_group_sets[1].expected_num_executors, 10);
// Case 5: Invalid input for expected group size
FLAGS_expected_executor_group_sets = "group-prefix1:2abc";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
"Failed to parse expected executor group set size for input: group-prefix1:2abc\n");
// Case 6: Invalid input with no expected group size
FLAGS_expected_executor_group_sets = "group-prefix1:";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
"Failed to parse expected executor group set size for input: group-prefix1:\n");
// Case 7: Invalid input with no group prefix
FLAGS_expected_executor_group_sets = ":1";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
"Executor group set prefix cannot be empty for input: :1\n");
// Case 8: Invalid input with no colon separator
FLAGS_expected_executor_group_sets = "group-prefix1";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
"Invalid executor group set format: group-prefix1\n");
// Case 9: Invalid input with duplicated group prefix
FLAGS_expected_executor_group_sets = "group-prefix1:2,group-prefix1:10";
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
"Executor group set prefix specified multiple times: group-prefix1:10\n");
/// This ensures that all executor group configuration scenarios possible using available
/// startup flags are handled correctly when populating membership updates that are
/// sent to the frontend.
/// Every case builds executor groups, inserts them into 'snapshot_ptr', calls
/// PopulateExecutorMembershipRequest() and checks 'update_req.exec_group_sets'.
/// NOTE(review): in this excerpt the key of every executor_groups.insert({, ...}) call
/// is missing, the cases redeclare the same local names without visible scopes, and
/// nothing visibly fills 'populated_exec_group_sets' or resets the snapshot between
/// cases - confirm against the full file.
TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) {
// Presumably keeps the flag override below from leaking into other tests.
gflags::FlagSaver saver;
// The cases that pass 'empty_exec_group_sets' expect this value as
// expected_num_executors.
FLAGS_num_expected_executors = 20;
auto snapshot_ptr = std::make_shared<ClusterMembershipMgr::Snapshot>();
TUpdateExecutorMembershipRequest update_req;
vector<TExecutorGroupSet> empty_exec_group_sets;
vector<TExecutorGroupSet> populated_exec_group_sets;
// Case 1a: Not using executor groups
ExecutorGroup exec_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
// Case 1b: Not using executor groups but expected_exec_group_sets is non-empty
ExecutorGroup exec_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
// Case 2a: Using executor groups, expected_exec_group_sets is empty
ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
// Adding another exec group with more executors.
ExecutorGroup exec_group2("foo-group2", 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({, exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
// 2 matches the executor count of the larger group ("foo-group2").
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
// Case 2b: Using executor groups, expected_exec_group_sets is empty, executor groups
// with different group prefixes
ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
// Adding another exec group with a different group prefix.
ExecutorGroup exec_group2("bar-group1", 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({, exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
// Case 2c: Using executor groups, expected_exec_group_sets is non-empty
ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
// Adding another exec group with a different group prefix.
ExecutorGroup exec_group2("bar-group1", 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({, exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
// One result per group set prefix: "foo" with one executor, "bar" with two.
EXPECT_EQ(update_req.exec_group_sets.size(), 2);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo");
EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10);
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
// Case 2d: Using executor groups, expected_exec_group_sets is non-empty
// and one executor group that does not match to any executor group sets having more
// number of executor groups
ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({, exec_group});
// Adding another exec group with a different group prefix.
ExecutorGroup exec_group2("unmatch-group1", 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({, exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 2);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo");
// The "unmatch-group1" executors are expected not to be counted for "bar".
EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 0);
EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10);
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
/// Returns true iff 'v' contains an element that compares equal to 'm'.
/// Uses a linear search, so the elements of 'v' do not need to be sorted.
template <class T>
static bool has(const std::vector<T>& v, const T& m) {
  return std::find(v.begin(), v.end(), m) != v.end();
}
/// Test that we can get a list of coordinators.
/// NOTE(review): the statements that configure the descriptors, create the
/// ClusterMembershipMgrs and start the backends are not visible in this excerpt, and
/// the initializer of 'orig_coordinators' is cut off - confirm against the full file.
TEST_F(ClusterMembershipMgrTest, GetCoordinatorAddresses) {
// Initialize all backends early. Test methods handle state propagation through the
// backends_ list, which must be fully initialized before starting backends.
Backend* coordinator0 = CreateBackend();
const NetworkAddressPB& addr0 = coordinator0->desc->address();
Backend* coordinator1 = CreateBackend();
const NetworkAddressPB& addr1 = coordinator1->desc->address();
Backend* executor = CreateBackend();
// No coordinator is expected to be known at this point.
EXPECT_EQ(0, coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses().size());
vector<TNetworkAddress> orig_coordinators =
// Exactly one coordinator (addr0) is expected, and the executor must see the same list.
EXPECT_EQ(1, orig_coordinators.size());
EXPECT_EQ(FromNetworkAddressPB(addr0), orig_coordinators[0]);
EXPECT_EQ(orig_coordinators, executor->cmm->GetSnapshot()->GetCoordinatorAddresses());
orig_coordinators = coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses();
EXPECT_EQ(2, orig_coordinators.size());
// List of coordinators is unsorted.
EXPECT_TRUE(has(orig_coordinators, FromNetworkAddressPB(addr0)));
EXPECT_TRUE(has(orig_coordinators, FromNetworkAddressPB(addr1)));
EXPECT_EQ(orig_coordinators, executor->cmm->GetSnapshot()->GetCoordinatorAddresses());
/// TODO: Write a test that makes a number of random changes to cluster membership while
/// not maintaining the proper lifecycle steps that a backend goes through (create, start,
/// quiesce, delete).
} // end namespace impala