blob: e3e24d1d2904641be03cfe8f19046c5bfacceb6d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include <deque>
#include "common/logging.h"
#include "common/names.h"
#include "gen-cpp/StatestoreService_types.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/cluster-membership-test-util.h"
#include "service/impala-server.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/metrics.h"
using std::mt19937;
using std::uniform_int_distribution;
using std::uniform_real_distribution;
using namespace impala;
using namespace impala::test;
DECLARE_int32(statestore_max_missed_heartbeats);
DECLARE_int32(statestore_heartbeat_frequency_ms);
namespace impala {
/// This class and the following tests exercise the ClusterMembershipMgr core membership
/// handling code by simulating the interactions between multiple ClusterMembershipMgr
/// instances through the statestore. Updates between all cluster members are sent and
/// processed sequentially and in a deterministic order.
///
/// The tests progress in 4 successively more sophisticated ways:
/// 1) Make sure that simple interactions between 2 backends and their
/// ClusterMembershipMgr work correctly.
/// 2) Run a cluster of backends through the regular lifecycle of a backend in lockstep.
/// 3) Make random but valid changes to the membership of a whole cluster for a given
/// number of iterations and observe that each member's state remains consistent.
/// 4) TODO: Make random, potentially invalid changes to the membership of a whole
/// cluster.
class ClusterMembershipMgrTest : public testing::Test {
 public:
  /// Seeds 'rng_' so that randomized tests are reproducible; the seed can be pinned via
  /// the CLUSTER_MEMBERSHIP_MGR_TEST_SEED environment variable.
  virtual void SetUp() {
    RandTestUtil::SeedRng("CLUSTER_MEMBERSHIP_MGR_TEST_SEED", &rng_);
  }

 protected:
  ClusterMembershipMgrTest() {}

  /// Returns the size of the default executor group of the current membership in 'cmm'.
  /// Assumes the default group is present in the current snapshot (find() is not
  /// checked against end()).
  int GetDefaultGroupSize(const ClusterMembershipMgr& cmm) const {
    const string& group_name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
    return cmm.GetSnapshot()->executor_groups.find(group_name)->second.NumExecutors();
  }

  /// A struct to hold information related to a simulated backend during the test.
  struct Backend {
    // Key used for this backend's entries in the membership topic (its hostname).
    string backend_id;
    std::unique_ptr<MetricGroup> metric_group;
    std::unique_ptr<ClusterMembershipMgr> cmm;
    std::shared_ptr<TBackendDescriptor> desc;
  };

  /// The list of backends_ that owns all backends.
  vector<unique_ptr<Backend>> backends_;

  /// Various lists of pointers that point into elements of 'backends_'. These pointers
  /// will stay valid when 'backends_' resizes. Backends can be in one of 5 states:
  /// - "Offline": A TBackendDescriptor has been created but has not been associated with
  ///   a ClusterMembershipMgr.
  /// - "Starting": A ClusterMembershipMgr has been created, but the associated backend
  ///   descriptor is not running yet (no callback registered with the
  ///   ClusterMembershipMgr).
  /// - "Running": The backend descriptor is available to the ClusterMembershipMgr via a
  ///   callback.
  /// - "Quiescing": The backend descriptor is marked as quiescing.
  /// - "Deleted": The backend has been deleted from 'backends_' altogether and other
  ///   backends have received statestore messages to notify them of the deletion. Note
  ///   that transition into this state does not update the affected backend itself, it
  ///   merely gets destructed.
  ///
  /// As part of the state transitions, all other backends are notified of the new state
  /// by calling their UpdateMembership() methods with a matching statestore update
  /// message.
  ///
  /// We keep backends in double ended queues to allow for efficient removal of elements
  /// on both ends.
  typedef deque<Backend*> Backends;

  /// Returns true if 'be' is an element of 'backends'. (Note: despite the name, the
  /// containers are deques, see the 'Backends' typedef above.)
  bool IsInVector(const Backend* be, const Backends& backends) {
    return find(backends.begin(), backends.end(), be) != backends.end();
  }

  /// Removes 'be' from 'backends', asserting that it was present exactly once.
  void RemoveFromVector(const Backend* be, Backends* backends) {
    auto it = find(backends->begin(), backends->end(), be);
    ASSERT_TRUE(it != backends->end());
    backends->erase(it);
    // A second find verifies 'be' was not contained more than once.
    it = find(backends->begin(), backends->end(), be);
    ASSERT_TRUE(it == backends->end());
  }

  /// Lists that are used to track backends in a particular state. Methods that manipulate
  /// these lists maintain the invariant that a backend is only in one list at a time.
  /// Backends that are in state "Offline".
  Backends offline_;
  /// Backends that are in state "Starting".
  Backends starting_;
  /// Backends that are in state "Running".
  Backends running_;
  /// Backends that are in state "Quiescing".
  Backends quiescing_;

  typedef vector<TTopicDelta> TopicDeltas;

  /// Polls a backend for changes to its local state by sending an empty statestore update
  /// that is marked as a delta (i.e. it does not represent any changes). Returns any
  /// topic deltas the backend wants to publish in response.
  vector<TTopicDelta> Poll(Backend* be) {
    const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
    // The empty delta is used to poll subscribers for updates without sending new
    // changes.
    StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
    TTopicDelta& empty_delta = topic_delta_map[topic_id];
    empty_delta.is_delta = true;
    vector<TTopicDelta> returned_topic_deltas;
    be->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
    return returned_topic_deltas;
  }

  /// Sends a single topic item in a delta to all backends in 'backends_'. Asserts that
  /// no backend responds with an update of its own.
  void SendDelta(const TTopicItem& item) {
    const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
    StatestoreSubscriber::TopicDeltaMap topic_delta_map;
    TTopicDelta& delta = topic_delta_map[topic_id];
    delta.topic_entries.push_back(item);
    delta.is_delta = true;
    for (auto& backend : backends_) {
      vector<TTopicDelta> returned_topic_deltas;
      backend->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
      // We never expect backends to respond with a delta update on their own because the
      // test code explicitly polls them after making changes to their state.
      ASSERT_EQ(0, returned_topic_deltas.size())
          << "Error with backend " << backend->backend_id;
    }
  }

  /// Creates a new backend and adds it to the list of offline backends. If idx is
  /// omitted, the current number of backends will be used as the new index.
  Backend* CreateBackend(int idx = -1) {
    if (idx == -1) idx = backends_.size();
    backends_.push_back(make_unique<Backend>());
    auto& be = backends_.back();
    be->desc = make_shared<TBackendDescriptor>(MakeBackendDescriptor(idx));
    // The hostname doubles as the statestore topic key for this backend.
    be->backend_id = be->desc->address.hostname;
    offline_.push_back(be.get());
    return be.get();
  }

  /// Creates a new ClusterMembershipMgr for a backend and moves the backend from
  /// 'offline_' to 'starting_'. Callers must handle invalidated iterators after calling
  /// this method.
  void CreateCMM(Backend* be) {
    ASSERT_TRUE(IsInVector(be, offline_));
    be->metric_group = make_unique<MetricGroup>("test");
    be->cmm = make_unique<ClusterMembershipMgr>(
        be->backend_id, nullptr, be->metric_group.get());
    RemoveFromVector(be, &offline_);
    starting_.push_back(be);
  }

  /// Starts a backend by making its backend descriptor available to its
  /// ClusterMembershipMgr through a callback. This method also propagates the change to
  /// all other backends in 'backends_' and moves the backend from 'starting_' to
  /// 'running_'.
  void StartBackend(Backend* be) {
    ASSERT_TRUE(IsInVector(be, starting_));
    ASSERT_TRUE(be->cmm.get() != nullptr);
    ASSERT_TRUE(be->desc.get() != nullptr);
    auto be_cb = [be]() { return be->desc; };
    be->cmm->SetLocalBeDescFn(be_cb);
    // Poll to obtain topic update: the mgr is expected to publish exactly one entry
    // announcing the new backend.
    TopicDeltas topic_deltas = Poll(be);
    ASSERT_EQ(1, topic_deltas.size());
    ASSERT_EQ(1, topic_deltas[0].topic_entries.size());
    // Broadcast to all other backends
    SendDelta(topic_deltas[0].topic_entries[0]);
    RemoveFromVector(be, &starting_);
    running_.push_back(be);
  }

  /// Quiesces a backend by updating its backend descriptor and polling its
  /// ClusterMembershipMgr to make the change take effect. The resulting update from the
  /// backend's ClusterMembershipMgr is then broadcast to the rest of the cluster. Also
  /// moves the backend from 'running_' to 'quiescing_'.
  void QuiesceBackend(Backend* be) {
    ASSERT_TRUE(IsInVector(be, running_));
    be->desc->__set_is_quiescing(true);
    TopicDeltas topic_deltas = Poll(be);
    ASSERT_EQ(1, topic_deltas.size());
    ASSERT_EQ(1, topic_deltas[0].topic_entries.size());
    // Broadcast to all other backends
    SendDelta(topic_deltas[0].topic_entries[0]);
    RemoveFromVector(be, &running_);
    quiescing_.push_back(be);
  }

  /// Deletes a backend from all other backends and from 'backends_'. A delta marking the
  /// backend as deleted gets sent to all nodes in 'backends_'. Note that this method does
  /// not send any updates to the backend itself, as it would - like in a real cluster -
  /// just disappear. The backend must be in 'running_' or 'quiescing_' and is removed
  /// from the respective list by this method.
  void DeleteBackend(Backend* be) {
    bool is_running = IsInVector(be, running_);
    bool is_quiescing = IsInVector(be, quiescing_);
    ASSERT_TRUE(is_running || is_quiescing);
    // Create topic item before erasing the backend.
    TTopicItem deletion_item;
    deletion_item.key = be->backend_id;
    deletion_item.deleted = true;
    // Delete the backend. After erase(), 'be' is dangling and must only be used as an
    // opaque pointer value (as in the RemoveFromVector() calls below).
    auto new_end_it = std::remove_if(backends_.begin(), backends_.end(),
        [be](const unique_ptr<Backend>& elem) { return elem.get() == be; });
    backends_.erase(new_end_it, backends_.end());
    // Create deletion update
    SendDelta(deletion_item);
    if (is_running) RemoveFromVector(be, &running_);
    if (is_quiescing) RemoveFromVector(be, &quiescing_);
  }

  /// RNG used by the helpers below; seeded in SetUp().
  mt19937 rng_;

  /// Returns a uniformly random int in [0, max).
  int RandomInt(int max) {
    uniform_int_distribution<int> rand_int(0, max - 1);
    return rand_int(rng_);
  }

  /// Returns a uniformly random double in [0, 1).
  double RandomDoubleFraction() {
    uniform_real_distribution<double> rand_double(0, 1);
    return rand_double(rng_);
  }
};
/// This test takes two instances of the ClusterMembershipMgr through a common lifecycle.
/// It also serves as an example for how to craft statestore messages and pass them to
/// UpdateMembership().
TEST_F(ClusterMembershipMgrTest, TwoInstances) {
  // Two standalone managers wired together by manually shuttling topic deltas between
  // them, bypassing the fixture's helper methods.
  auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
  auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
  MetricGroup tmp_metrics1("test-metrics1");
  MetricGroup tmp_metrics2("test-metrics2");
  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr, &tmp_metrics1);
  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr, &tmp_metrics2);
  const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
  StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
  TTopicDelta* ss_topic_delta = &topic_delta_map[topic_id];
  vector<TTopicDelta> returned_topic_deltas;
  // The empty delta is used to poll subscribers for updates without sending new changes.
  TTopicDelta empty_delta;
  empty_delta.is_delta = true;
  // Ping both managers, both should have no state to update
  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(0, returned_topic_deltas.size());
  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(0, returned_topic_deltas.size());
  // Hook up first callback and iterate again
  cmm1.SetLocalBeDescFn([b1]() { return b1; });
  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(1, returned_topic_deltas.size());
  // First manager now has one BE
  ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size());
  // Hook up second callback and iterate with the result of the first manager
  cmm2.SetLocalBeDescFn([b2]() { return b2; });
  *ss_topic_delta = returned_topic_deltas[0];
  returned_topic_deltas.clear();
  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(1, returned_topic_deltas.size());
  ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
  // Send the returned update to the first manager, this time no deltas will be returned
  *ss_topic_delta = returned_topic_deltas[0];
  returned_topic_deltas.clear();
  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(0, returned_topic_deltas.size());
  ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
  // Both managers now have the same state. Shutdown one of them and step through
  // propagating the update.
  b1->is_quiescing = true;
  // Send an empty update to the 1st one to trigger propagation of the shutdown
  returned_topic_deltas.clear();
  topic_delta_map[topic_id] = empty_delta;
  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  // The mgr will return its changed TBackendDescriptor
  ASSERT_EQ(1, returned_topic_deltas.size());
  // It will also remove itself from the executor group (but not the current backends).
  ASSERT_EQ(1, GetDefaultGroupSize(cmm1));
  ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
  // Propagate the quiescing to the 2nd mgr
  *ss_topic_delta = returned_topic_deltas[0];
  returned_topic_deltas.clear();
  ASSERT_EQ(2, GetDefaultGroupSize(cmm2));
  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(0, returned_topic_deltas.size());
  // Quiescing removes the backend from the executor group, but it stays registered.
  ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
  ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
  // Delete the 1st backend from the 2nd one by re-sending its last topic entry with the
  // 'deleted' flag set.
  ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
  ss_topic_delta->topic_entries[0].deleted = true;
  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
  ASSERT_EQ(0, returned_topic_deltas.size());
  ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
  ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
}
// This test verifies the interaction between the ExecutorBlacklist and
// ClusterMembershipMgr.
TEST_F(ClusterMembershipMgrTest, ExecutorBlacklist) {
  // Set some flags to make the blacklist timeout fairly small (50ms);
  // FlagSaver restores the original flag values when the test ends.
  gflags::FlagSaver saver;
  FLAGS_statestore_max_missed_heartbeats = 5;
  FLAGS_statestore_heartbeat_frequency_ms = 10;
  // 100ms sleep, comfortably longer than the 50ms blacklist timeout above.
  const int BLACKLIST_TIMEOUT_SLEEP_US = 100000;
  const int NUM_BACKENDS = 3;
  // Bring up a 3-node cluster: Offline -> Starting -> Running for every backend.
  for (int i = 0; i < NUM_BACKENDS; ++i) CreateBackend();
  EXPECT_EQ(NUM_BACKENDS, backends_.size());
  EXPECT_EQ(backends_.size(), offline_.size());
  while (!offline_.empty()) CreateCMM(offline_.front());
  EXPECT_EQ(0, offline_.size());
  EXPECT_EQ(NUM_BACKENDS, starting_.size());
  while (!starting_.empty()) StartBackend(starting_.front());
  EXPECT_EQ(0, starting_.size());
  EXPECT_EQ(NUM_BACKENDS, running_.size());
  // Assert that all backends know about each other and are all in the default executor
  // group.
  for (Backend* be : running_) {
    EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
    EXPECT_EQ(running_.size(), GetDefaultGroupSize(*be->cmm));
  }
  // Tell a BE to blacklist itself, should have no effect.
  backends_[0]->cmm->BlacklistExecutor(*backends_[0]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS, GetDefaultGroupSize(*backends_[0]->cmm));
  // Tell a BE to blacklist another BE, should remove it from executor_groups but not
  // current_backends.
  backends_[0]->cmm->BlacklistExecutor(*backends_[1]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
  // Blacklist a BE that is already blacklisted. Should have no effect.
  backends_[0]->cmm->BlacklistExecutor(*backends_[1]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
  // Sleep past the blacklist timeout and check the node has been un-blacklisted after
  // the next (empty) statestore update.
  usleep(BLACKLIST_TIMEOUT_SLEEP_US);
  EXPECT_EQ(Poll(backends_[0].get()).size(), 0);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS, GetDefaultGroupSize(*backends_[0]->cmm));
  // Blacklist the BE and sleep again.
  backends_[0]->cmm->BlacklistExecutor(*backends_[1]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
  usleep(BLACKLIST_TIMEOUT_SLEEP_US);
  // Quiesce the blacklisted BE. The update to quiesce it will arrive in the same call to
  // UpdateMembership() that it would have been un-blacklisted.
  QuiesceBackend(backends_[1].get());
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
  // Try blacklisting the quiesced BE, should have no effect.
  backends_[0]->cmm->BlacklistExecutor(*backends_[1]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 1, GetDefaultGroupSize(*backends_[0]->cmm));
  // Blacklist another BE and sleep.
  backends_[0]->cmm->BlacklistExecutor(*backends_[2]->desc);
  EXPECT_EQ(NUM_BACKENDS, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
  usleep(BLACKLIST_TIMEOUT_SLEEP_US);
  // Delete the blacklisted BE. The update to delete it will arrive in the same call to
  // UpdateMembership() that it would have been un-blacklisted.
  DeleteBackend(backends_[2].get());
  EXPECT_EQ(NUM_BACKENDS - 1, backends_[0]->cmm->GetSnapshot()->current_backends.size());
  EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
}
// This test runs a group of 20 backends through their full lifecycle, validating that
// their state is correctly propagated through the cluster after every change.
TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
  const int NUM_BACKENDS = 20;
  // Bring up the whole cluster: Offline -> Starting -> Running for every backend.
  for (int i = 0; i < NUM_BACKENDS; ++i) {
    CreateBackend();
  }
  EXPECT_EQ(NUM_BACKENDS, backends_.size());
  EXPECT_EQ(backends_.size(), offline_.size());
  while (!offline_.empty()) CreateCMM(offline_.front());
  ASSERT_EQ(0, offline_.size());
  ASSERT_EQ(NUM_BACKENDS, starting_.size());
  while (!starting_.empty()) StartBackend(starting_.front());
  ASSERT_EQ(0, starting_.size());
  ASSERT_EQ(NUM_BACKENDS, running_.size());
  // Assert that all backends know about each other and are all in the default executor
  // group.
  for (Backend* be : running_) {
    EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
    EXPECT_EQ(running_.size(), GetDefaultGroupSize(*be->cmm));
  }
  // Quiesce half of the backends, one per iteration. 'i' counts how many have been
  // quiesced so far, so the default group shrinks by one each time.
  for (int i = 0; quiescing_.size() < NUM_BACKENDS / 2; ++i) {
    Backend* be = running_.front();
    // All backends must still remain online
    EXPECT_EQ(NUM_BACKENDS, be->cmm->GetSnapshot()->current_backends.size());
    EXPECT_EQ(NUM_BACKENDS - i, GetDefaultGroupSize(*be->cmm));
    QuiesceBackend(be);
    // Make sure that the numbers drop
    EXPECT_EQ(NUM_BACKENDS - i - 1, GetDefaultGroupSize(*be->cmm));
  }
  int num_still_running = NUM_BACKENDS - quiescing_.size();
  ASSERT_EQ(num_still_running, running_.size());
  ASSERT_EQ(NUM_BACKENDS / 2, quiescing_.size());
  for (auto& be : backends_) {
    // All backends are still registered
    EXPECT_EQ(backends_.size(), be->cmm->GetSnapshot()->current_backends.size());
    // Executor groups now show half of the backends remaining
    EXPECT_EQ(num_still_running, GetDefaultGroupSize(*be->cmm));
  }
  // Delete half of the backends and make sure that the other half learned about it.
  // Deletions alternate between running and quiescing backends.
  int to_delete = backends_.size() / 2;
  int num_expected_alive = backends_.size() - to_delete;
  for (int idx = 0; idx < to_delete; ++idx) {
    // Will change backends_
    if (idx % 2 == 0) {
      DeleteBackend(running_.front());
    } else {
      DeleteBackend(quiescing_.front());
    }
  }
  ASSERT_EQ(num_expected_alive, quiescing_.size() + running_.size());
  for (Backend* be : quiescing_) {
    EXPECT_EQ(num_expected_alive, be->cmm->GetSnapshot()->current_backends.size());
  }
  // Quiesce the remaining backends to validate that executor groups can scale to 0
  // backends.
  while (!running_.empty()) QuiesceBackend(running_.front());
  for (auto& be : backends_) {
    // Executor groups now are empty
    EXPECT_EQ(0, GetDefaultGroupSize(*be->cmm));
  }
}
/// This test executes a number of random changes to cluster membership. On every
/// iteration a new backend is created, and with some probability, a backend is quiesced,
/// removed from the cluster after having been quiesced before, or removed from the
/// cluster without quiescing. The test relies on the consistency checks built into the
/// ClusterMembershipMgr to ensure that it is in a consistent state.
TEST_F(ClusterMembershipMgrTest, RandomizedMembershipUpdates) {
  // TODO: Parameterize this test and run with several parameter sets
  const int NUM_ITERATIONS = 100;
  // Action probabilities. Note that P_ADD == 1, i.e. a backend is added on every
  // iteration, and that the thresholds below are not mutually exclusive: a single draw
  // of 'p' can trigger several of the actions in the same iteration.
  const double P_ADD = 1;
  const double P_QUIESCE = 0.35;
  const double P_DELETE = 0.30;
  const double P_KILL = 0.2;
  // Cumulative counts of how many backends were added/shutdown/deleted/killed by the
  // tests.
  int num_added = 0;
  int num_shutdown = 0;
  int num_deleted = 0;
  // In this test "killing" a backend means deleting it without quiescing it first to
  // simulate non-graceful failures.
  int num_killed = 0;
  for (int i = 0; i < NUM_ITERATIONS; ++i) {
    double p = RandomDoubleFraction();
    if (p < P_ADD) {
      // Index 'i' keeps backend hostnames unique across iterations even after deletes.
      Backend* be = CreateBackend(i);
      CreateCMM(be);
      StartBackend(be);
      ++num_added;
    }
    if (p < P_QUIESCE && !running_.empty()) {
      int idx = RandomInt(running_.size());
      Backend* be = running_[idx];
      QuiesceBackend(be);
      ++num_shutdown;
    }
    if (p < P_DELETE && !quiescing_.empty()) {
      // Graceful removal: only previously quiesced backends are deleted here.
      int idx = RandomInt(quiescing_.size());
      Backend* be = quiescing_[idx];
      DeleteBackend(be);
      ++num_deleted;
    }
    if (p < P_KILL && !running_.empty()) {
      // Non-graceful removal: delete a backend that is still running.
      int idx = RandomInt(running_.size());
      Backend* be = running_[idx];
      DeleteBackend(be);
      ++num_killed;
    }
  }
  std::cout << "Added: " << num_added << ", shutdown: " << num_shutdown << ", deleted: "
      << num_deleted << ", killed: " << num_killed << endl;
}
/// TODO: Write a test that makes a number of random changes to cluster membership while
/// not maintaining the proper lifecycle steps that a backend goes through (create, start,
/// quiesce, delete).
} // end namespace impala