| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/maintenance_manager.h" |
| |
| #include <algorithm> |
| #include <cinttypes> |
| #include <cmath> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <sstream> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| |
| #include "kudu/gutil/dynamic_annotations.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/debug/trace_logging.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/maintenance_manager.pb.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/process_memory.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/thread.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/trace.h" |
| |
| using std::pair; |
| using std::string; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| |
| DEFINE_int32(maintenance_manager_num_threads, 1, |
| "Size of the maintenance manager thread pool. " |
| "For spinning disks, the number of threads should " |
| "not be above the number of devices."); |
| TAG_FLAG(maintenance_manager_num_threads, stable); |
| |
| DEFINE_int32(maintenance_manager_polling_interval_ms, 250, |
| "Polling interval for the maintenance manager scheduler, " |
| "in milliseconds."); |
| TAG_FLAG(maintenance_manager_polling_interval_ms, hidden); |
| |
| DEFINE_int32(maintenance_manager_history_size, 8, |
| "Number of completed operations the manager keeps track of."); |
| TAG_FLAG(maintenance_manager_history_size, hidden); |
| |
| DEFINE_bool(enable_maintenance_manager, true, |
| "Enable the maintenance manager, which runs flush, compaction, " |
| "and garbage collection operations on tablets."); |
| TAG_FLAG(enable_maintenance_manager, unsafe); |
| |
| DEFINE_int64(log_target_replay_size_mb, 1024, |
| "The target maximum size of logs to be replayed at startup. If a tablet " |
| "has in-memory operations that are causing more than this size of logs " |
| "to be retained, then the maintenance manager will prioritize flushing " |
| "these operations to disk."); |
| TAG_FLAG(log_target_replay_size_mb, experimental); |
| |
| DEFINE_int64(data_gc_min_size_mb, 0, |
| "The (exclusive) minimum number of mebibytes of ancient data on " |
| "disk, per tablet, needed to prioritize deletion of that data."); |
| TAG_FLAG(data_gc_min_size_mb, experimental); |
| |
| DEFINE_double(data_gc_prioritization_prob, 0.5, |
| "The probability that we will prioritize data GC over performance " |
| "improvement operations. If set to 1.0, we will always prefer to " |
| "delete old data before running performance improvement operations " |
| "such as delta compaction."); |
| TAG_FLAG(data_gc_prioritization_prob, experimental); |
| |
| DEFINE_double(maintenance_op_multiplier, 1.1, |
| "Multiplier applied on different priority levels, table maintenance OPs on level N " |
| "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be " |
| "multiplied by this multiplier. Note: this multiplier is only take effect on " |
| "compaction OPs"); |
| TAG_FLAG(maintenance_op_multiplier, advanced); |
| TAG_FLAG(maintenance_op_multiplier, experimental); |
| TAG_FLAG(maintenance_op_multiplier, runtime); |
| |
| DEFINE_int32(max_priority_range, 5, |
| "Maximal priority range of OPs."); |
| TAG_FLAG(max_priority_range, advanced); |
| TAG_FLAG(max_priority_range, experimental); |
| TAG_FLAG(max_priority_range, runtime); |
| |
| namespace kudu { |
| |
| MaintenanceOpStats::MaintenanceOpStats() { |
| Clear(); |
| } |
| |
| void MaintenanceOpStats::Clear() { |
| valid_ = false; |
| runnable_ = false; |
| ram_anchored_ = 0; |
| logs_retained_bytes_ = 0; |
| data_retained_bytes_ = 0; |
| perf_improvement_ = 0; |
| last_modified_ = MonoTime(); |
| } |
| |
| MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage) |
| : name_(std::move(name)), |
| running_(0), |
| cancel_(false), |
| io_usage_(io_usage) { |
| } |
| |
| MaintenanceOp::~MaintenanceOp() { |
| CHECK(!manager_.get()) << "You must unregister the " << name_ |
| << " Op before destroying it."; |
| } |
| |
| void MaintenanceOp::Unregister() { |
| CHECK(manager_.get()) << "Op " << name_ << " is not registered."; |
| manager_->UnregisterOp(this); |
| } |
| |
| MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const { |
| MaintenanceManagerStatusPB_OpInstancePB pb; |
| pb.set_thread_id(thread_id); |
| pb.set_name(name); |
| if (duration.Initialized()) { |
| pb.set_duration_millis(static_cast<int32_t>(duration.ToMilliseconds())); |
| } |
| MonoDelta delta(MonoTime::Now() - start_mono_time); |
| pb.set_millis_since_start(static_cast<int32_t>(delta.ToMilliseconds())); |
| return pb; |
| } |
| |
| const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = { |
| .num_threads = 0, |
| .polling_interval_ms = 0, |
| .history_size = 0, |
| }; |
| |
| MaintenanceManager::MaintenanceManager(const Options& options, |
| string server_uuid) |
| : server_uuid_(std::move(server_uuid)), |
| num_threads_(options.num_threads <= 0 ? |
| FLAGS_maintenance_manager_num_threads : options.num_threads), |
| cond_(&lock_), |
| shutdown_(false), |
| polling_interval_ms_(options.polling_interval_ms <= 0 ? |
| FLAGS_maintenance_manager_polling_interval_ms : |
| options.polling_interval_ms), |
| running_ops_(0), |
| completed_ops_count_(0), |
| rand_(GetRandomSeed32()), |
| memory_pressure_func_(&process_memory::UnderMemoryPressure) { |
| CHECK_OK(ThreadPoolBuilder("MaintenanceMgr") |
| .set_min_threads(num_threads_) |
| .set_max_threads(num_threads_) |
| .Build(&thread_pool_)); |
| uint32_t history_size = options.history_size == 0 ? |
| FLAGS_maintenance_manager_history_size : |
| options.history_size; |
| completed_ops_.resize(history_size); |
| } |
| |
| MaintenanceManager::~MaintenanceManager() { |
| Shutdown(); |
| } |
| |
| Status MaintenanceManager::Start() { |
| CHECK(!monitor_thread_); |
| RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler", |
| [this]() { this->RunSchedulerThread(); }, |
| &monitor_thread_)); |
| return Status::OK(); |
| } |
| |
| void MaintenanceManager::Shutdown() { |
| { |
| std::lock_guard<Mutex> guard(lock_); |
| if (shutdown_) { |
| return; |
| } |
| shutdown_ = true; |
| cond_.Broadcast(); |
| } |
| if (monitor_thread_.get()) { |
| CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join()); |
| monitor_thread_.reset(); |
| // Wait for all the running and queued tasks before shutting down. Otherwise, |
| // Shutdown() can remove a queued task silently. We count on eventually running the |
| // queued tasks to decrement their "running" count, which is incremented at the time |
| // they are enqueued. |
| thread_pool_->Wait(); |
| thread_pool_->Shutdown(); |
| } |
| } |
| |
| void MaintenanceManager::RegisterOp(MaintenanceOp* op) { |
| CHECK(op); |
| std::lock_guard<Mutex> guard(lock_); |
| CHECK(!op->manager_) << "Tried to register " << op->name() |
| << ", but it is already registered."; |
| pair<OpMapTy::iterator, bool> val |
| (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats()))); |
| CHECK(val.second) << "Tried to register " << op->name() |
| << ", but it already exists in ops_."; |
| op->manager_ = shared_from_this(); |
| op->cond_.reset(new ConditionVariable(&lock_)); |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Registered " << op->name(); |
| } |
| |
| void MaintenanceManager::UnregisterOp(MaintenanceOp* op) { |
| { |
| std::lock_guard<Mutex> guard(lock_); |
| CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name() |
| << ", but it is not currently registered with this maintenance manager."; |
| auto iter = ops_.find(op); |
| CHECK(iter != ops_.end()) << "Tried to unregister " << op->name() |
| << ", but it was never registered"; |
| // While the op is running, wait for it to be finished. |
| if (iter->first->running_ > 0) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Waiting for op " << op->name() |
| << " to finish so we can unregister it."; |
| } |
| op->CancelAndDisable(); |
| while (iter->first->running_ > 0) { |
| op->cond_->Wait(); |
| iter = ops_.find(op); |
| CHECK(iter != ops_.end()) << "Tried to unregister " << op->name() |
| << ", but another thread unregistered it while we were " |
| << "waiting for it to complete"; |
| } |
| ops_.erase(iter); |
| } |
| VLOG_WITH_PREFIX(1) << "Unregistered op " << op->name(); |
| op->cond_.reset(); |
| // Remove the op's shared_ptr reference to us. This might 'delete this'. |
| op->manager_.reset(); |
| } |
| |
| bool MaintenanceManager::disabled_for_tests() const { |
| return !ANNOTATE_UNPROTECTED_READ(FLAGS_enable_maintenance_manager); |
| } |
| |
| void MaintenanceManager::RunSchedulerThread() { |
| if (!FLAGS_enable_maintenance_manager) { |
| LOG(INFO) << "Maintenance manager is disabled. Stopping thread."; |
| return; |
| } |
| |
| MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_); |
| |
| std::unique_lock<Mutex> guard(lock_); |
| |
| // Set to true if the scheduler runs and finds that there is no work to do. |
| bool prev_iter_found_no_work = false; |
| |
| while (true) { |
| // We'll keep sleeping if: |
| // 1) there are no free threads available to perform a maintenance op. |
| // or 2) we just tried to schedule an op but found nothing to run. |
| // However, if it's time to shut down, we want to do so immediately. |
| while (CouldNotLaunchNewOp(prev_iter_found_no_work)) { |
| cond_.WaitFor(polling_interval); |
| prev_iter_found_no_work = false; |
| } |
| if (shutdown_) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Shutting down maintenance manager."; |
| return; |
| } |
| |
| // If we found no work to do, then we should sleep before trying again to schedule. |
| // Otherwise, we can go right into trying to find the next op. |
| prev_iter_found_no_work = !FindAndLaunchOp(&guard); |
| } |
| } |
| |
| bool MaintenanceManager::FindAndLaunchOp(std::unique_lock<Mutex>* guard) { |
| // Find the best op. |
| auto best_op_and_why = FindBestOp(); |
| auto* op = best_op_and_why.first; |
| const auto& note = best_op_and_why.second; |
| |
| if (!op) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << "No maintenance operations look worth doing."; |
| return false; |
| } |
| |
| // Prepare the maintenance operation. |
| IncreaseOpCount(op); |
| guard->unlock(); |
| bool ready = op->Prepare(); |
| guard->lock(); |
| if (!ready) { |
| LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() |
| << ". Re-running scheduler."; |
| DecreaseOpCount(op); |
| op->cond_->Signal(); |
| return true; |
| } |
| |
| LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) |
| << Substitute("Scheduling $0: $1", op->name(), note); |
| // Run the maintenance operation. |
| CHECK_OK(thread_pool_->Submit([this, op]() { this->LaunchOp(op); })); |
| return true; |
| } |
| |
| // Finding the best operation goes through some filters: |
| // - If there's an Op that we can run quickly that frees log retention, run it |
| // (e.g. GCing WAL segments). |
| // - If we've hit the overall process memory limit (note: this includes memory |
| // that the Ops cannot free), we run the Op that retains the most WAL |
| // segments, which will free memory (e.g. MRS or DMS flush). |
| // - If there Ops that are retaining logs past our target replay size, we |
| // run the one that has the highest retention, and if many qualify, then we |
| // run the one that also frees up the most RAM (e.g. MRS or DMS flush). |
| // - If there are Ops that we can run that free disk space, run whichever frees |
| // the most space (e.g. GCing ancient deltas). |
| // - Finally, if there's nothing else that we really need to do, we run the Op |
| // that will improve performance the most. |
| // |
| // In general, we want to prioritize limiting the amount of expensive resources |
| // we hold onto. Low IO ops that free WAL disk space are preferred, followed by |
| // ops that free memory, then ops that free data disk space, then ops that |
| // improve performance. |
| pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() { |
| TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); |
| |
| if (!HasFreeThreads()) { |
| return {nullptr, "no free threads"}; |
| } |
| |
| int64_t low_io_most_logs_retained_bytes = 0; |
| MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr; |
| |
| int64_t most_logs_retained_bytes = 0; |
| int64_t most_logs_retained_bytes_ram_anchored = 0; |
| MaintenanceOp* most_logs_retained_bytes_ram_anchored_op = nullptr; |
| |
| int64_t most_data_retained_bytes = 0; |
| MaintenanceOp* most_data_retained_bytes_op = nullptr; |
| |
| double best_perf_improvement = 0; |
| MaintenanceOp* best_perf_improvement_op = nullptr; |
| for (auto& val : ops_) { |
| MaintenanceOp* op(val.first); |
| MaintenanceOpStats& stats(val.second); |
| VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name(); |
| // Update op stats. |
| stats.Clear(); |
| op->UpdateStats(&stats); |
| if (op->cancelled() || !stats.valid() || !stats.runnable()) { |
| continue; |
| } |
| |
| const auto logs_retained_bytes = stats.logs_retained_bytes(); |
| if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE && |
| logs_retained_bytes > low_io_most_logs_retained_bytes) { |
| low_io_most_logs_retained_bytes_op = op; |
| low_io_most_logs_retained_bytes = logs_retained_bytes; |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << Substitute("Op $0 can free $1 bytes of logs", |
| op->name(), logs_retained_bytes); |
| } |
| |
| // We prioritize ops that can free more logs, but when it's the same we |
| // pick the one that also frees up the most memory. |
| const auto ram_anchored = stats.ram_anchored(); |
| if (std::make_pair(logs_retained_bytes, ram_anchored) > |
| std::make_pair(most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored)) { |
| most_logs_retained_bytes_ram_anchored_op = op; |
| most_logs_retained_bytes = logs_retained_bytes; |
| most_logs_retained_bytes_ram_anchored = ram_anchored; |
| } |
| |
| const auto data_retained_bytes = stats.data_retained_bytes(); |
| if (data_retained_bytes > most_data_retained_bytes) { |
| most_data_retained_bytes_op = op; |
| most_data_retained_bytes = data_retained_bytes; |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << Substitute("Op $0 can free $1 bytes of data", |
| op->name(), data_retained_bytes); |
| } |
| |
| const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->priority()); |
| if ((!best_perf_improvement_op) || |
| (perf_improvement > best_perf_improvement)) { |
| best_perf_improvement_op = op; |
| best_perf_improvement = perf_improvement; |
| } |
| } |
| |
| // Look at ops that we can run quickly that free up log retention. |
| if (low_io_most_logs_retained_bytes_op && low_io_most_logs_retained_bytes > 0) { |
| string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes); |
| return {low_io_most_logs_retained_bytes_op, std::move(notes)}; |
| } |
| |
| // Look at free memory. If it is dangerously low, we must select something |
| // that frees memory -- ignore the target replay size and flush whichever op |
| // anchors the most WALs (the op should also free memory). |
| // |
| // Why not select the op that frees the most memory? Such a heuristic could |
| // lead to starvation of ops that consume less memory, e.g. we might always |
| // choose to do MRS flushes even when there are small, long-lived DMSes that |
| // are anchoring WALs. Choosing the op that frees the most WALs ensures that |
| // all ops that anchor memory (and also anchor WALs) will eventually be |
| // performed. |
| double capacity_pct; |
| if (memory_pressure_func_(&capacity_pct) && most_logs_retained_bytes_ram_anchored_op) { |
| DCHECK_GT(most_logs_retained_bytes_ram_anchored, 0); |
| string note = StringPrintf("under memory pressure (%.2f%% used), " |
| "%" PRIu64 " bytes log retention, and flush " |
| "%" PRIu64 " bytes memory", capacity_pct, |
| most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored); |
| return {most_logs_retained_bytes_ram_anchored_op, std::move(note)}; |
| } |
| |
| // Look at ops that free up more log retention, and also free up more memory. |
| if (most_logs_retained_bytes_ram_anchored_op && |
| most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) { |
| string note = Substitute("$0 bytes log retention, and flush $1 bytes memory", |
| most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored); |
| return {most_logs_retained_bytes_ram_anchored_op, std::move(note)}; |
| } |
| |
| // Look at ops that free up data on disk. To avoid starvation of |
| // performance-improving ops, we might skip freeing disk space. |
| if (most_data_retained_bytes_op && |
| most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) { |
| if (!best_perf_improvement_op || best_perf_improvement <= 0 || |
| rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) { |
| string note = Substitute("$0 bytes on disk", most_data_retained_bytes); |
| return {most_data_retained_bytes_op, std::move(note)}; |
| } |
| VLOG(1) << "Skipping data GC due to prioritizing perf improvement"; |
| } |
| |
| // Look at ops that can improve read/write performance most. |
| if (best_perf_improvement_op && best_perf_improvement > 0) { |
| string note = StringPrintf("perf score=%.6f", best_perf_improvement); |
| return {best_perf_improvement_op, std::move(note)}; |
| } |
| return {nullptr, "no ops with positive improvement"}; |
| } |
| |
| double MaintenanceManager::PerfImprovement(double perf_improvement, int32_t priority) const { |
| if (priority == 0) { |
| return perf_improvement; |
| } |
| |
| priority = std::max(priority, -FLAGS_max_priority_range); |
| priority = std::min(priority, FLAGS_max_priority_range); |
| return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier, priority); |
| } |
| |
| void MaintenanceManager::LaunchOp(MaintenanceOp* op) { |
| int64_t thread_id = Thread::CurrentThreadId(); |
| OpInstance op_instance; |
| op_instance.thread_id = thread_id; |
| op_instance.name = op->name(); |
| op_instance.start_mono_time = MonoTime::Now(); |
| op->RunningGauge()->Increment(); |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| InsertOrDie(&running_instances_, thread_id, &op_instance); |
| } |
| |
| SCOPED_CLEANUP({ |
| op->RunningGauge()->Decrement(); |
| |
| std::lock_guard<Mutex> l(lock_); |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| running_instances_.erase(thread_id); |
| } |
| op_instance.duration = MonoTime::Now() - op_instance.start_mono_time; |
| completed_ops_[completed_ops_count_ % completed_ops_.size()] = op_instance; |
| completed_ops_count_++; |
| |
| op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds()); |
| DecreaseOpCount(op); |
| op->cond_->Signal(); |
| cond_.Signal(); // Wake up scheduler. |
| }); |
| |
| scoped_refptr<Trace> trace(new Trace); |
| Stopwatch sw; |
| sw.start(); |
| { |
| ADOPT_TRACE(trace.get()); |
| TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp", |
| "name", op->name()); |
| op->Perform(); |
| sw.stop(); |
| } |
| LOG_WITH_PREFIX(INFO) << Substitute("$0 complete. Timing: $1 Metrics: $2", |
| op->name(), |
| sw.elapsed().ToString(), |
| trace->MetricsAsJSON()); |
| } |
| |
| void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) { |
| DCHECK(out_pb != nullptr); |
| std::lock_guard<Mutex> guard(lock_); |
| for (const auto& val : ops_) { |
| MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations(); |
| MaintenanceOp* op(val.first); |
| const MaintenanceOpStats& stats(val.second); |
| op_pb->set_name(op->name()); |
| op_pb->set_running(op->running()); |
| if (stats.valid()) { |
| op_pb->set_runnable(stats.runnable()); |
| op_pb->set_ram_anchored_bytes(stats.ram_anchored()); |
| op_pb->set_logs_retained_bytes(stats.logs_retained_bytes()); |
| op_pb->set_perf_improvement(stats.perf_improvement()); |
| } else { |
| op_pb->set_runnable(false); |
| op_pb->set_ram_anchored_bytes(0); |
| op_pb->set_logs_retained_bytes(0); |
| op_pb->set_perf_improvement(0.0); |
| } |
| } |
| |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| for (const auto& running_instance : running_instances_) { |
| *out_pb->add_running_operations() = running_instance.second->DumpToPB(); |
| } |
| } |
| |
| // The latest completed op will be dumped at first. |
| for (int n = 1; n <= completed_ops_.size(); n++) { |
| int64_t i = completed_ops_count_ - n; |
| if (i < 0) break; |
| const auto& completed_op = completed_ops_[i % completed_ops_.size()]; |
| |
| if (!completed_op.name.empty()) { |
| *out_pb->add_completed_operations() = completed_op.DumpToPB(); |
| } |
| } |
| } |
| |
| string MaintenanceManager::LogPrefix() const { |
| return Substitute("P $0: ", server_uuid_); |
| } |
| |
| bool MaintenanceManager::HasFreeThreads() { |
| return num_threads_ - running_ops_ > 0; |
| } |
| |
| bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) { |
| return (!HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_; |
| } |
| |
| void MaintenanceManager::IncreaseOpCount(MaintenanceOp *op) { |
| op->running_++; |
| running_ops_++; |
| } |
| |
| void MaintenanceManager::DecreaseOpCount(MaintenanceOp *op) { |
| op->running_--; |
| running_ops_--; |
| } |
| |
| } // namespace kudu |