blob: 44b28a172ff65dd67a803b0fb698818d405560af [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/test_graph.h"
#include <mutex>
#include <ostream>
#include <thread>
#include <utility>
#include <glog/logging.h>
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/faststring.h"
#include "kudu/util/monotime.h"
using std::shared_ptr;
using std::string;
using std::thread;
namespace kudu {
void TimeSeries::AddValue(double val) {
std::lock_guard<simple_spinlock> l(lock_);
val_ += val;
}
void TimeSeries::SetValue(double val) {
std::lock_guard<simple_spinlock> l(lock_);
val_ = val;
}
double TimeSeries::value() const {
std::lock_guard<simple_spinlock> l(lock_);
return val_;
}
TimeSeriesCollector::~TimeSeriesCollector() {
if (started_) {
StopDumperThread();
}
}
shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string& key) {
MutexLock l(series_lock_);
SeriesMap::const_iterator it = series_map_.find(key);
if (it != series_map_.end()) {
return (*it).second;
}
auto ts(std::make_shared<TimeSeries>());
series_map_[key] = ts;
return ts;
}
void TimeSeriesCollector::StartDumperThread() {
LOG(INFO) << "Starting metrics dumper";
CHECK(!started_);
exit_latch_.Reset(1);
started_ = true;
dumper_thread_ = thread([this]() { this->DumperThread(); });
}
void TimeSeriesCollector::StopDumperThread() {
CHECK(started_);
exit_latch_.CountDown();
dumper_thread_.join();
started_ = false;
}
void TimeSeriesCollector::DumperThread() {
CHECK(started_);
WallTime start_time = WallTime_Now();
faststring metrics_str;
while (true) {
metrics_str.clear();
metrics_str.append("metrics: ");
BuildMetricsString(WallTime_Now() - start_time, &metrics_str);
LOG(INFO) << metrics_str.ToString();
// Sleep until next dump time, or return if we should exit
if (exit_latch_.WaitFor(MonoDelta::FromMilliseconds(250))) {
return;
}
}
}
void TimeSeriesCollector::BuildMetricsString(
WallTime time_since_start, faststring* dst_buf) const {
MutexLock l(series_lock_);
dst_buf->append(StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f",
scope_.c_str(), time_since_start));
for (SeriesMap::const_reference entry : series_map_) {
dst_buf->append(StringPrintf(", \"%s\": %.3f",
entry.first.c_str(), entry.second->value()));
}
dst_buf->append("}");
}
} // namespace kudu