| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/maintenance_manager.h" |
| |
| #include <algorithm> |
| #include <cinttypes> |
| #include <cmath> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <sstream> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| |
| #include "kudu/gutil/dynamic_annotations.h" |
| #include "kudu/gutil/integral_types.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/debug/trace_logging.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/maintenance_manager.pb.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/process_memory.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/thread.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/trace.h" |
| |
| using std::pair; |
| using std::string; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| |
| DEFINE_int32(maintenance_manager_num_threads, 1, |
| "Size of the maintenance manager thread pool. " |
| "For spinning disks, the number of threads should " |
| "not be above the number of devices."); |
| TAG_FLAG(maintenance_manager_num_threads, stable); |
| DEFINE_validator(maintenance_manager_num_threads, |
| [](const char* /*n*/, int32 v) { return v > 0; }); |
| |
| DEFINE_int32(maintenance_manager_polling_interval_ms, 250, |
| "Polling interval for the maintenance manager scheduler, " |
| "in milliseconds."); |
| TAG_FLAG(maintenance_manager_polling_interval_ms, hidden); |
| |
| DEFINE_int32(maintenance_manager_history_size, 8, |
| "Number of completed operations the manager keeps track of."); |
| TAG_FLAG(maintenance_manager_history_size, hidden); |
| |
| DEFINE_bool(enable_maintenance_manager, true, |
| "Enable the maintenance manager, which runs flush, compaction, " |
| "and garbage collection operations on tablets."); |
| TAG_FLAG(enable_maintenance_manager, unsafe); |
| TAG_FLAG(enable_maintenance_manager, runtime); |
| |
| DEFINE_int64(log_target_replay_size_mb, 1024, |
| "The target maximum size of logs to be replayed at startup. If a tablet " |
| "has in-memory operations that are causing more than this size of logs " |
| "to be retained, then the maintenance manager will prioritize flushing " |
| "these operations to disk."); |
| TAG_FLAG(log_target_replay_size_mb, experimental); |
| |
| DEFINE_int64(data_gc_min_size_mb, 0, |
| "The (exclusive) minimum number of mebibytes of ancient data on " |
| "disk, per tablet, needed to prioritize deletion of that data."); |
| TAG_FLAG(data_gc_min_size_mb, experimental); |
| |
| DEFINE_double(data_gc_prioritization_prob, 0.5, |
| "The probability that we will prioritize data GC over performance " |
| "improvement operations. If set to 1.0, we will always prefer to " |
| "delete old data before running performance improvement operations " |
| "such as delta compaction."); |
| TAG_FLAG(data_gc_prioritization_prob, experimental); |
| |
| DEFINE_double(maintenance_op_multiplier, 1.1, |
| "Multiplier applied on different priority levels, table maintenance OPs on level N " |
| "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be " |
| "multiplied by this multiplier. Note: this multiplier is only take effect on " |
| "compaction OPs"); |
| TAG_FLAG(maintenance_op_multiplier, advanced); |
| TAG_FLAG(maintenance_op_multiplier, experimental); |
| TAG_FLAG(maintenance_op_multiplier, runtime); |
| |
| DEFINE_int32(max_priority_range, 5, |
| "Maximal priority range of OPs."); |
| TAG_FLAG(max_priority_range, advanced); |
| TAG_FLAG(max_priority_range, experimental); |
| TAG_FLAG(max_priority_range, runtime); |
| |
| DEFINE_int32(maintenance_manager_inject_latency_ms, 0, |
| "Injects latency into maintenance thread. For use in tests only."); |
| TAG_FLAG(maintenance_manager_inject_latency_ms, runtime); |
| TAG_FLAG(maintenance_manager_inject_latency_ms, unsafe); |
| |
| namespace kudu { |
| |
| MaintenanceOpStats::MaintenanceOpStats() { |
| Clear(); |
| } |
| |
| void MaintenanceOpStats::Clear() { |
| valid_ = false; |
| runnable_ = false; |
| ram_anchored_ = 0; |
| logs_retained_bytes_ = 0; |
| data_retained_bytes_ = 0; |
| perf_improvement_ = 0; |
| workload_score_ = 0; |
| last_modified_ = MonoTime(); |
| } |
| |
| MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage) |
| : name_(std::move(name)), |
| io_usage_(io_usage), |
| running_(0), |
| cancel_(false) { |
| } |
| |
| MaintenanceOp::~MaintenanceOp() { |
| CHECK(!manager_.get()) << "You must unregister the " << name_ |
| << " Op before destroying it."; |
| } |
| |
| void MaintenanceOp::Unregister() { |
| CHECK(manager_.get()) << "Op " << name_ << " is not registered."; |
| manager_->UnregisterOp(this); |
| } |
| |
| MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const { |
| MaintenanceManagerStatusPB_OpInstancePB pb; |
| pb.set_thread_id(thread_id); |
| pb.set_name(name); |
| if (duration.Initialized()) { |
| pb.set_duration_millis(static_cast<int32_t>(duration.ToMilliseconds())); |
| } |
| MonoDelta delta(MonoTime::Now() - start_mono_time); |
| pb.set_millis_since_start(static_cast<int32_t>(delta.ToMilliseconds())); |
| return pb; |
| } |
| |
| const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = { |
| .num_threads = 0, |
| .polling_interval_ms = 0, |
| .history_size = 0, |
| }; |
| |
| MaintenanceManager::MaintenanceManager( |
| const Options& options, |
| string server_uuid, |
| const scoped_refptr<MetricEntity>& metric_entity) |
| : server_uuid_(std::move(server_uuid)), |
| num_threads_(options.num_threads > 0 |
| ? options.num_threads |
| : FLAGS_maintenance_manager_num_threads), |
| polling_interval_(MonoDelta::FromMilliseconds( |
| options.polling_interval_ms > 0 |
| ? options.polling_interval_ms |
| : FLAGS_maintenance_manager_polling_interval_ms)), |
| cond_(&lock_), |
| shutdown_(false), |
| running_ops_(0), |
| completed_ops_count_(0), |
| rand_(GetRandomSeed32()), |
| memory_pressure_func_(&process_memory::UnderMemoryPressure), |
| metrics_(CHECK_NOTNULL(metric_entity)) { |
| CHECK_OK(ThreadPoolBuilder("MaintenanceMgr") |
| .set_min_threads(num_threads_) |
| .set_max_threads(num_threads_) |
| .Build(&thread_pool_)); |
| uint32_t history_size = options.history_size == 0 ? |
| FLAGS_maintenance_manager_history_size : |
| options.history_size; |
| completed_ops_.resize(history_size); |
| } |
| |
| MaintenanceManager::~MaintenanceManager() { |
| Shutdown(); |
| } |
| |
| Status MaintenanceManager::Start() { |
| CHECK(!monitor_thread_); |
| return Thread::Create("maintenance", "maintenance_scheduler", |
| [this]() { this->RunSchedulerThread(); }, |
| &monitor_thread_); |
| } |
| |
| void MaintenanceManager::Shutdown() { |
| { |
| std::lock_guard<Mutex> guard(lock_); |
| if (shutdown_) { |
| return; |
| } |
| shutdown_ = true; |
| cond_.Broadcast(); |
| } |
| if (monitor_thread_.get()) { |
| CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join()); |
| monitor_thread_.reset(); |
| // Wait for all the running and queued tasks before shutting down. Otherwise, |
| // Shutdown() can remove a queued task silently. We count on eventually running the |
| // queued tasks to decrement their "running" count, which is incremented at the time |
| // they are enqueued. |
| thread_pool_->Wait(); |
| thread_pool_->Shutdown(); |
| } |
| } |
| |
| void MaintenanceManager::MergePendingOpRegistrationsUnlocked() { |
| lock_.AssertAcquired(); |
| OpMapType ops_to_register; |
| { |
| std::lock_guard<simple_spinlock> l(registration_lock_); |
| ops_to_register = std::move(ops_pending_registration_); |
| ops_pending_registration_.clear(); |
| } |
| for (auto& op_and_stats : ops_to_register) { |
| auto* op = op_and_stats.first; |
| op->cond_.reset(new ConditionVariable(&running_instances_lock_)); |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Registered " << op->name(); |
| } |
| ops_.insert(ops_to_register.begin(), ops_to_register.end()); |
| } |
| |
| void MaintenanceManager::RegisterOp(MaintenanceOp* op) { |
| CHECK(op); |
| { |
| std::lock_guard<simple_spinlock> l(registration_lock_); |
| CHECK(!op->manager_) << "Tried to register " << op->name() |
| << ", but it is already registered."; |
| EmplaceOrDie(&ops_pending_registration_, op, MaintenanceOpStats()); |
| op->manager_ = shared_from_this(); |
| } |
| // If we can take 'lock_', add to 'ops_' immediately. |
| if (lock_.try_lock()) { |
| MergePendingOpRegistrationsUnlocked(); |
| lock_.unlock(); |
| } |
| } |
| |
| void MaintenanceManager::UnregisterOp(MaintenanceOp* op) { |
| CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name() |
| << ", but it is not currently registered with this maintenance manager."; |
| op->CancelAndDisable(); |
| |
| // While the op is running, wait for it to be finished. |
| { |
| std::lock_guard<Mutex> guard(running_instances_lock_); |
| if (op->running_ > 0) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) |
| << Substitute("Waiting for op $0 to finish so we can unregister it", op->name()); |
| } |
| while (op->running_ > 0) { |
| op->cond_->Wait(); |
| } |
| } |
| |
| // Remove the op from 'ops_', and if it wasn't there, erase it from |
| // 'ops_pending_registration_'. |
| { |
| std::lock_guard<Mutex> guard(lock_); |
| if (ops_.erase(op) == 0) { |
| std::lock_guard<simple_spinlock> l(registration_lock_); |
| const auto num_erased_ops = ops_pending_registration_.erase(op); |
| CHECK_GT(num_erased_ops, 0); |
| } |
| } |
| VLOG_WITH_PREFIX(1) << "Unregistered op " << op->name(); |
| op->cond_.reset(); |
| // Remove the op's shared_ptr reference to us. This might 'delete this'. |
| op->manager_.reset(); |
| } |
| |
| bool MaintenanceManager::disabled_for_tests() const { |
| return !ANNOTATE_UNPROTECTED_READ(FLAGS_enable_maintenance_manager); |
| } |
| |
| void MaintenanceManager::RunSchedulerThread() { |
| // Set to true if the scheduler runs and finds that there is no work to do. |
| bool prev_iter_found_no_work = false; |
| |
| while (true) { |
| if (!FLAGS_enable_maintenance_manager) { |
| { |
| std::unique_lock<Mutex> guard(lock_); |
| if (shutdown_) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Shutting down maintenance manager."; |
| return; |
| } |
| } |
| KLOG_EVERY_N_SECS(INFO, 1200) |
| << "Maintenance manager is disabled (check --enable_maintenance_manager)."; |
| SleepFor(polling_interval_); |
| continue; |
| } |
| MaintenanceOp* op = nullptr; |
| string op_note; |
| { |
| std::unique_lock<Mutex> guard(lock_); |
| // Upon each iteration, we should have dropped and reacquired 'lock_'. |
| // Register any ops that may have been buffered for registration while the |
| // lock was last held. |
| MergePendingOpRegistrationsUnlocked(); |
| |
| // We'll keep sleeping if: |
| // 1) there are no free threads available to perform a maintenance op. |
| // or 2) we just tried to schedule an op but found nothing to run. |
| // However, if it's time to shut down, we want to do so immediately. |
| while (CouldNotLaunchNewOp(prev_iter_found_no_work)) { |
| cond_.WaitFor(polling_interval_); |
| prev_iter_found_no_work = false; |
| } |
| if (shutdown_) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Shutting down maintenance manager."; |
| return; |
| } |
| |
| if (PREDICT_FALSE(FLAGS_maintenance_manager_inject_latency_ms > 0)) { |
| LOG(WARNING) << "Injecting " << FLAGS_maintenance_manager_inject_latency_ms |
| << "ms of latency into maintenance thread"; |
| SleepFor(MonoDelta::FromMilliseconds(FLAGS_maintenance_manager_inject_latency_ms)); |
| } |
| |
| // Find the best op. If we found no work to do, then we should sleep |
| // before trying again to schedule. Otherwise, we can go right into trying |
| // to find the next op. |
| { |
| auto best_op_and_why = FindBestOp(); |
| op = best_op_and_why.first; |
| op_note = std::move(best_op_and_why.second); |
| } |
| if (op) { |
| // While 'running_instances_lock_' is held, check one more time for |
| // whether the op is cancelled. This ensures that we don't attempt to |
| // launch an op that has been destructed in UnregisterOp(). See |
| // KUDU-3268 for more details. |
| std::lock_guard<Mutex> guard(running_instances_lock_); |
| if (op->cancelled()) { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << "picked maintenance operation that has been cancelled"; |
| continue; |
| } |
| IncreaseOpCount(op); |
| prev_iter_found_no_work = false; |
| } else { |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << "no maintenance operations look worth doing"; |
| prev_iter_found_no_work = true; |
| continue; |
| } |
| } |
| |
| // Prepare the maintenance operation. |
| DCHECK(op); |
| if (!op->Prepare()) { |
| LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() |
| << ". Re-running scheduler."; |
| metrics_.SubmitOpPrepareFailed(); |
| std::lock_guard<Mutex> guard(running_instances_lock_); |
| DecreaseOpCountAndNotifyWaiters(op); |
| continue; |
| } |
| |
| LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) |
| << Substitute("Scheduling $0: $1", op->name(), op_note); |
| // Submit the maintenance operation to be run on the "MaintenanceMgr" pool. |
| CHECK_OK(thread_pool_->Submit([this, op]() { this->LaunchOp(op); })); |
| } |
| } |
| |
| // Finding the best operation goes through some filters: |
| // - If there's an Op that we can run quickly that frees log retention, run it |
| // (e.g. GCing WAL segments). |
| // - If we've hit the overall process memory limit (note: this includes memory |
| // that the Ops cannot free), we run the Op that retains the most WAL |
| // segments, which will free memory (e.g. MRS or DMS flush). |
| // - If there Ops that are retaining logs past our target replay size, we |
| // run the one that has the highest retention, and if many qualify, then we |
| // run the one that also frees up the most RAM (e.g. MRS or DMS flush). |
| // - If there are Ops that we can run that free disk space, run whichever frees |
| // the most space (e.g. GCing ancient deltas). |
| // - Finally, if there's nothing else that we really need to do, we run the Op |
| // that will improve performance the most. |
| // |
| // In general, we want to prioritize limiting the amount of expensive resources |
| // we hold onto. Low IO ops that free WAL disk space are preferred, followed by |
| // ops that free memory, then ops that free data disk space, then ops that |
| // improve performance. |
| pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, 10000, "finding best maintenance operation"); |
| TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); |
| |
| if (!HasFreeThreads()) { |
| return {nullptr, "no free threads"}; |
| } |
| |
| const auto start_time = MonoTime::Now(); |
| SCOPED_CLEANUP({ |
| metrics_.SubmitOpFindDuration(MonoTime::Now() - start_time); |
| }); |
| |
| int64_t low_io_most_logs_retained_bytes = 0; |
| MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr; |
| |
| int64_t most_logs_retained_bytes = 0; |
| int64_t most_logs_retained_bytes_ram_anchored = 0; |
| MaintenanceOp* most_logs_retained_bytes_ram_anchored_op = nullptr; |
| |
| int64_t most_data_retained_bytes = 0; |
| MaintenanceOp* most_data_retained_bytes_op = nullptr; |
| |
| double best_perf_improvement = 0; |
| MaintenanceOp* best_perf_improvement_op = nullptr; |
| for (auto& val : ops_) { |
| MaintenanceOp* op(val.first); |
| MaintenanceOpStats& stats(val.second); |
| VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name(); |
| // Update op stats. |
| stats.Clear(); |
| op->UpdateStats(&stats); |
| if (op->cancelled() || !stats.valid() || !stats.runnable()) { |
| continue; |
| } |
| |
| const auto logs_retained_bytes = stats.logs_retained_bytes(); |
| if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE && |
| logs_retained_bytes > low_io_most_logs_retained_bytes) { |
| low_io_most_logs_retained_bytes_op = op; |
| low_io_most_logs_retained_bytes = logs_retained_bytes; |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << Substitute("Op $0 can free $1 bytes of logs", |
| op->name(), logs_retained_bytes); |
| } |
| |
| // We prioritize ops that can free more logs, but when it's the same we |
| // pick the one that also frees up the most memory. |
| const auto ram_anchored = stats.ram_anchored(); |
| if (std::make_pair(logs_retained_bytes, ram_anchored) > |
| std::make_pair(most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored)) { |
| most_logs_retained_bytes_ram_anchored_op = op; |
| most_logs_retained_bytes = logs_retained_bytes; |
| most_logs_retained_bytes_ram_anchored = ram_anchored; |
| } |
| |
| const auto data_retained_bytes = stats.data_retained_bytes(); |
| if (data_retained_bytes > most_data_retained_bytes) { |
| most_data_retained_bytes_op = op; |
| most_data_retained_bytes = data_retained_bytes; |
| VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) |
| << Substitute("Op $0 can free $1 bytes of data", |
| op->name(), data_retained_bytes); |
| } |
| |
| const auto perf_improvement = |
| AdjustedPerfScore(stats.perf_improvement(), stats.workload_score(), op->priority()); |
| if ((!best_perf_improvement_op) || |
| (perf_improvement > best_perf_improvement)) { |
| best_perf_improvement_op = op; |
| best_perf_improvement = perf_improvement; |
| } |
| } |
| |
| // Look at ops that we can run quickly that free up log retention. |
| if (low_io_most_logs_retained_bytes_op && low_io_most_logs_retained_bytes > 0) { |
| string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes); |
| return {low_io_most_logs_retained_bytes_op, std::move(notes)}; |
| } |
| |
| // Look at free memory. If it is dangerously low, we must select something |
| // that frees memory -- ignore the target replay size and flush whichever op |
| // anchors the most WALs (the op should also free memory). |
| // |
| // Why not select the op that frees the most memory? Such a heuristic could |
| // lead to starvation of ops that consume less memory, e.g. we might always |
| // choose to do MRS flushes even when there are small, long-lived DMSes that |
| // are anchoring WALs. Choosing the op that frees the most WALs ensures that |
| // all ops that anchor memory (and also anchor WALs) will eventually be |
| // performed. |
| double capacity_pct; |
| if (memory_pressure_func_(&capacity_pct) && most_logs_retained_bytes_ram_anchored_op) { |
| DCHECK_GT(most_logs_retained_bytes_ram_anchored, 0); |
| string note = StringPrintf("under memory pressure (%.2f%% used), " |
| "%" PRIu64 " bytes log retention, and flush " |
| "%" PRIu64 " bytes memory", capacity_pct, |
| most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored); |
| return {most_logs_retained_bytes_ram_anchored_op, std::move(note)}; |
| } |
| |
| // Look at ops that free up more log retention, and also free up more memory. |
| if (most_logs_retained_bytes_ram_anchored_op && |
| most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) { |
| string note = Substitute("$0 bytes log retention, and flush $1 bytes memory", |
| most_logs_retained_bytes, |
| most_logs_retained_bytes_ram_anchored); |
| return {most_logs_retained_bytes_ram_anchored_op, std::move(note)}; |
| } |
| |
| // Look at ops that free up data on disk. To avoid starvation of |
| // performance-improving ops, we might skip freeing disk space. |
| if (most_data_retained_bytes_op && |
| most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) { |
| if (!best_perf_improvement_op || best_perf_improvement <= 0 || |
| rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) { |
| string note = Substitute("$0 bytes on disk", most_data_retained_bytes); |
| return {most_data_retained_bytes_op, std::move(note)}; |
| } |
| VLOG(1) << "Skipping data GC due to prioritizing perf improvement"; |
| } |
| |
| // Look at ops that can improve read/write performance most. |
| if (best_perf_improvement_op && best_perf_improvement > 0) { |
| string note = StringPrintf("perf score=%.6f", best_perf_improvement); |
| return {best_perf_improvement_op, std::move(note)}; |
| } |
| return {nullptr, "no ops with positive improvement"}; |
| } |
| |
| double MaintenanceManager::AdjustedPerfScore(double perf_improvement, |
| double workload_score, |
| int32_t priority) { |
| if (perf_improvement == 0) { |
| return 0; |
| } |
| double perf_score = perf_improvement + workload_score; |
| if (priority == 0) { |
| return perf_score; |
| } |
| |
| priority = std::max(priority, -FLAGS_max_priority_range); |
| priority = std::min(priority, FLAGS_max_priority_range); |
| return perf_score * std::pow(FLAGS_maintenance_op_multiplier, priority); |
| } |
| |
| void MaintenanceManager::LaunchOp(MaintenanceOp* op) { |
| const auto thread_id = Thread::CurrentThreadId(); |
| OpInstance op_instance; |
| op_instance.thread_id = thread_id; |
| op_instance.name = op->name(); |
| op_instance.start_mono_time = MonoTime::Now(); |
| op->RunningGauge()->Increment(); |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| InsertOrDie(&running_instances_, thread_id, &op_instance); |
| } |
| |
| SCOPED_CLEANUP({ |
| // To avoid timing distortions in case of lock contention, it's important |
| // to take a snapshot of 'now' right after the operation completed |
| // before acquiring any locks in the code below. |
| const auto now = MonoTime::Now(); |
| |
| op->RunningGauge()->Decrement(); |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| running_instances_.erase(thread_id); |
| |
| op_instance.duration = now - op_instance.start_mono_time; |
| op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds()); |
| |
| DecreaseOpCountAndNotifyWaiters(op); |
| } |
| cond_.Signal(); // wake up the scheduler |
| |
| // Add corresponding entry into the completed_ops_ container. |
| { |
| std::lock_guard<simple_spinlock> lock(completed_ops_lock_); |
| completed_ops_[completed_ops_count_ % completed_ops_.size()] = |
| std::move(op_instance); |
| ++completed_ops_count_; |
| } |
| }); |
| |
| scoped_refptr<Trace> trace(new Trace); |
| Stopwatch sw; |
| sw.start(); |
| { |
| ADOPT_TRACE(trace.get()); |
| TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp", |
| "name", op->name()); |
| op->Perform(); |
| sw.stop(); |
| } |
| LOG_WITH_PREFIX(INFO) << Substitute("$0 complete. Timing: $1 Metrics: $2", |
| op->name(), |
| sw.elapsed().ToString(), |
| trace->MetricsAsJSON()); |
| } |
| |
| void MaintenanceManager::GetMaintenanceManagerStatusDump( |
| MaintenanceManagerStatusPB* out_pb) { |
| DCHECK(out_pb != nullptr); |
| std::lock_guard<Mutex> guard(lock_); |
| MergePendingOpRegistrationsUnlocked(); |
| for (const auto& val : ops_) { |
| auto* op_pb = out_pb->add_registered_operations(); |
| MaintenanceOp* op(val.first); |
| const MaintenanceOpStats& stats(val.second); |
| op_pb->set_name(op->name()); |
| op_pb->set_running(op->running()); |
| if (stats.valid()) { |
| op_pb->set_runnable(stats.runnable()); |
| op_pb->set_ram_anchored_bytes(stats.ram_anchored()); |
| op_pb->set_logs_retained_bytes(stats.logs_retained_bytes()); |
| op_pb->set_perf_improvement(stats.perf_improvement()); |
| op_pb->set_workload_score(stats.workload_score()); |
| op_pb->set_data_retained_bytes(stats.data_retained_bytes()); |
| } else { |
| op_pb->set_runnable(false); |
| op_pb->set_ram_anchored_bytes(0); |
| op_pb->set_logs_retained_bytes(0); |
| op_pb->set_perf_improvement(0.0); |
| op_pb->set_workload_score(0.0); |
| op_pb->set_data_retained_bytes(0); |
| } |
| } |
| |
| { |
| std::lock_guard<Mutex> lock(running_instances_lock_); |
| for (const auto& running_instance : running_instances_) { |
| *out_pb->add_running_operations() = running_instance.second->DumpToPB(); |
| } |
| } |
| |
| // The latest completed op will be dumped at first. |
| { |
| std::lock_guard<simple_spinlock> lock(completed_ops_lock_); |
| for (int n = 1; n <= completed_ops_.size(); ++n) { |
| if (completed_ops_count_ < n) { |
| break; |
| } |
| size_t i = completed_ops_count_ - n; |
| const auto& completed_op = completed_ops_[i % completed_ops_.size()]; |
| |
| if (!completed_op.name.empty()) { |
| *out_pb->add_completed_operations() = completed_op.DumpToPB(); |
| } |
| } |
| } |
| } |
| |
| string MaintenanceManager::LogPrefix() const { |
| return Substitute("P $0: ", server_uuid_); |
| } |
| |
| bool MaintenanceManager::HasFreeThreads() { |
| return num_threads_ > running_ops_; |
| } |
| |
| bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) { |
| lock_.AssertAcquired(); |
| return (!HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_; |
| } |
| |
| void MaintenanceManager::IncreaseOpCount(MaintenanceOp* op) { |
| running_instances_lock_.AssertAcquired(); |
| ++running_ops_; |
| ++op->running_; |
| } |
| |
| void MaintenanceManager::DecreaseOpCountAndNotifyWaiters(MaintenanceOp* op) { |
| running_instances_lock_.AssertAcquired(); |
| --running_ops_; |
| --op->running_; |
| op->cond_->Signal(); |
| } |
| |
| } // namespace kudu |