blob: 6777e06b81cbcc1a85e0ae9cd002805fc85463c4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <boost/bind.hpp> // IWYU pragma: keep
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/maintenance_manager.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
METRIC_DEFINE_entity(test);
METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
"Number of Maintenance Operations Running",
kudu::MetricUnit::kMaintenanceOperations,
"The number of background maintenance operations currently running.");
METRIC_DEFINE_histogram(test, maintenance_op_duration,
"Maintenance Operation Duration",
kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
DECLARE_int64(log_target_replay_size_mb);
namespace kudu {
static const int kHistorySize = 4;
static const char kFakeUuid[] = "12345";
class MaintenanceManagerTest : public KuduTest {
public:
void SetUp() override {
MaintenanceManager::Options options;
options.num_threads = 2;
options.polling_interval_ms = 1;
options.history_size = kHistorySize;
manager_.reset(new MaintenanceManager(options, kFakeUuid));
manager_->set_memory_pressure_func_for_tests(
[&](double* consumption) {
return indicate_memory_pressure_.load();
});
ASSERT_OK(manager_->Start());
}
void TearDown() override {
manager_->Shutdown();
}
protected:
shared_ptr<MaintenanceManager> manager_;
std::atomic<bool> indicate_memory_pressure_ { false };
};
// Just create the MaintenanceManager and then shut it down, to make sure
// there are no race conditions there.
TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
}
class TestMaintenanceOp : public MaintenanceOp {
public:
TestMaintenanceOp(const std::string& name,
IOUsage io_usage)
: MaintenanceOp(name, io_usage),
ram_anchored_(500),
logs_retained_bytes_(0),
perf_improvement_(0),
metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
remaining_runs_(1),
prepared_runs_(0),
sleep_time_(MonoDelta::FromSeconds(0)) {
}
virtual ~TestMaintenanceOp() {}
virtual bool Prepare() OVERRIDE {
std::lock_guard<Mutex> guard(lock_);
if (remaining_runs_ == 0) {
return false;
}
remaining_runs_--;
prepared_runs_++;
DLOG(INFO) << "Prepared op " << name();
return true;
}
virtual void Perform() OVERRIDE {
{
std::lock_guard<Mutex> guard(lock_);
DLOG(INFO) << "Performing op " << name();
// Ensure that we don't call Perform() more times than we returned
// success from Prepare().
CHECK_GE(prepared_runs_, 1);
prepared_runs_--;
}
SleepFor(sleep_time_);
}
virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
std::lock_guard<Mutex> guard(lock_);
stats->set_runnable(remaining_runs_ > 0);
stats->set_ram_anchored(ram_anchored_);
stats->set_logs_retained_bytes(logs_retained_bytes_);
stats->set_perf_improvement(perf_improvement_);
}
void set_remaining_runs(int runs) {
std::lock_guard<Mutex> guard(lock_);
remaining_runs_ = runs;
}
void set_sleep_time(MonoDelta time) {
std::lock_guard<Mutex> guard(lock_);
sleep_time_ = time;
}
void set_ram_anchored(uint64_t ram_anchored) {
std::lock_guard<Mutex> guard(lock_);
ram_anchored_ = ram_anchored;
}
void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
std::lock_guard<Mutex> guard(lock_);
logs_retained_bytes_ = logs_retained_bytes;
}
void set_perf_improvement(uint64_t perf_improvement) {
std::lock_guard<Mutex> guard(lock_);
perf_improvement_ = perf_improvement;
}
virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
return maintenance_op_duration_;
}
virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
return maintenance_ops_running_;
}
private:
Mutex lock_;
uint64_t ram_anchored_;
uint64_t logs_retained_bytes_;
uint64_t perf_improvement_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
scoped_refptr<Histogram> maintenance_op_duration_;
scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
// The number of remaining times this operation will run before disabling
// itself.
int remaining_runs_;
// The number of Prepared() operations which have not yet been Perform()ed.
int prepared_runs_;
// The amount of time each op invocation will sleep.
MonoDelta sleep_time_;
};
// Create an op and wait for it to start running. Unregister it while it is
// running and verify that UnregisterOp waits for it to finish before
// proceeding.
TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
op1.set_perf_improvement(10);
// Register initially with no remaining runs. We'll later enable it once it's
// already registered.
op1.set_remaining_runs(0);
manager_->RegisterOp(&op1);
scoped_refptr<kudu::Thread> thread;
CHECK_OK(Thread::Create(
"TestThread", "TestRegisterUnregister",
boost::bind(&TestMaintenanceOp::set_remaining_runs, &op1, 1), &thread));
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1);
});
manager_->UnregisterOp(&op1);
ThreadJoiner(thread.get()).Join();
}
// Regression test for KUDU-1495: when an operation is being unregistered,
// new instances of that operation should not be scheduled.
TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
op1.set_perf_improvement(10);
// Set the op to run up to 10 times, and each time should sleep for a second.
op1.set_remaining_runs(10);
op1.set_sleep_time(MonoDelta::FromSeconds(1));
manager_->RegisterOp(&op1);
// Wait until two instances of the ops start running, since we have two MM threads.
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(op1.RunningGauge()->value(), 2);
});
// Trigger Unregister while they are running. This should wait for the currently-
// running operations to complete, but no new operations should be scheduled.
manager_->UnregisterOp(&op1);
// Hence, we should have run only the original two that we saw above.
ASSERT_LE(op1.DurationHistogram()->TotalCount(), 2);
}
// Test that we'll run an operation that doesn't improve performance when memory
// pressure gets high.
TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
op.set_ram_anchored(100);
manager_->RegisterOp(&op);
// At first, we don't want to run this, since there is no perf_improvement.
SleepFor(MonoDelta::FromMilliseconds(20));
ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
// Fake that the server is under memory pressure.
indicate_memory_pressure_ = true;
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
});
manager_->UnregisterOp(&op);
}
// Test that ops are prioritized correctly when we add log retention.
TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
const int64_t kMB = 1024 * 1024;
manager_->Shutdown();
TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
op1.set_ram_anchored(0);
op1.set_logs_retained_bytes(100 * kMB);
TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
op2.set_ram_anchored(100);
op2.set_logs_retained_bytes(100 * kMB);
TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
op3.set_ram_anchored(200);
op3.set_logs_retained_bytes(100 * kMB);
manager_->RegisterOp(&op1);
manager_->RegisterOp(&op2);
manager_->RegisterOp(&op3);
// We want to do the low IO op first since it clears up some log retention.
auto op_and_why = manager_->FindBestOp();
ASSERT_EQ(&op1, op_and_why.first);
EXPECT_EQ(op_and_why.second, "free 104857600 bytes of WAL");
manager_->UnregisterOp(&op1);
// Low IO is taken care of, now we find the op that clears the most log retention and ram.
// However, with the default settings, we won't bother running any of these operations
// which only retain 100MB of logs.
op_and_why = manager_->FindBestOp();
ASSERT_EQ(nullptr, op_and_why.first);
EXPECT_EQ(op_and_why.second, "no ops with positive improvement");
// If we change the target WAL size, we will select these ops.
FLAGS_log_target_replay_size_mb = 50;
op_and_why = manager_->FindBestOp();
ASSERT_EQ(&op3, op_and_why.first);
EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
manager_->UnregisterOp(&op3);
op_and_why = manager_->FindBestOp();
ASSERT_EQ(&op2, op_and_why.first);
EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
manager_->UnregisterOp(&op2);
}
// Test retrieving a list of an op's running instances
TEST_F(MaintenanceManagerTest, TestRunningInstances) {
TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(10);
op.set_remaining_runs(2);
op.set_sleep_time(MonoDelta::FromSeconds(1));
manager_->RegisterOp(&op);
// Check that running instances are added to the maintenance manager's collection,
// and fields are getting filled.
ASSERT_EVENTUALLY([&]() {
MaintenanceManagerStatusPB status_pb;
manager_->GetMaintenanceManagerStatusDump(&status_pb);
ASSERT_EQ(status_pb.running_operations_size(), 2);
const MaintenanceManagerStatusPB_OpInstancePB& instance1 = status_pb.running_operations(0);
const MaintenanceManagerStatusPB_OpInstancePB& instance2 = status_pb.running_operations(1);
ASSERT_EQ(instance1.name(), op.name());
ASSERT_NE(instance1.thread_id(), instance2.thread_id());
});
// Wait for instances to complete.
manager_->UnregisterOp(&op);
// Check that running instances are removed from collection after completion.
MaintenanceManagerStatusPB status_pb;
manager_->GetMaintenanceManagerStatusDump(&status_pb);
ASSERT_EQ(status_pb.running_operations_size(), 0);
}
// Test adding operations and make sure that the history of recently completed operations
// is correct in that it wraps around and doesn't grow.
TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
for (int i = 0; i < 5; i++) {
string name = Substitute("op$0", i);
TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(1);
op.set_ram_anchored(100);
manager_->RegisterOp(&op);
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
});
manager_->UnregisterOp(&op);
MaintenanceManagerStatusPB status_pb;
manager_->GetMaintenanceManagerStatusDump(&status_pb);
// The size should be at most the history_size.
ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
// The most recently completed op should always be first, even if we wrap
// around.
ASSERT_EQ(name, status_pb.completed_operations(0).name());
}
}
} // namespace kudu