| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.util |
| |
| import java.util |
| import scala.collection.mutable.ListBuffer |
| |
| import akka.actor.Actor |
| import com.typesafe.config.Config |
| import org.slf4j.Logger |
| |
| import org.apache.gearpump.Time.MilliSeconds |
| import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} |
| import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} |
| import org.apache.gearpump.metrics.Metrics._ |
| import org.apache.gearpump.metrics.MetricsAggregator |
| import org.apache.gearpump.util.Constants._ |
| import org.apache.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator} |
| |
| /** |
| * |
| * Metrics service to serve history metrics data |
| * |
| * For simplicity, HistoryMetricsService will maintain 72 hours coarse-grained data |
| * for last 72 hours, and fine-grained data for past 5 min. |
| * |
| * For the coarse-grained data of past 72 hours, one or two sample point will be stored |
| * for each hour. |
| * |
| * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds. |
| */ |
| class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor { |
| private val LOG: Logger = LogUtil.getLogger(getClass, name = name) |
| private var metricsStore = Map.empty[String, HistoryMetricsStore] |
| private val systemConfig = context.system.settings.config |
| |
| def receive: Receive = metricHandler orElse commandHandler |
| def metricHandler: Receive = { |
| case ReportMetrics => |
| sender ! DemandMoreMetrics(self) |
| case metrics: MetricType => |
| val name = metrics.name |
| if (metricsStore.contains(name)) { |
| metricsStore(name).add(metrics) |
| } else { |
| val store = HistoryMetricsStore(name, metrics, config) |
| metricsStore += name -> store |
| store.add(metrics) |
| } |
| } |
| |
| private def toRegularExpression(input: String): String = { |
| "^" + input.flatMap { |
| case '*' => ".*" |
| case '?' => "." |
| case char if "()[]$^.{}|\\".contains(char) => "\\" + char |
| case other => s"$other" |
| } + ".*$" |
| } |
| |
| private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption) |
| : List[HistoryMetricsItem] = { |
| |
| val result = new ListBuffer[HistoryMetricsItem] |
| |
| val regex = toRegularExpression(pathPattern).r.pattern |
| |
| val iter = metricsStore.iterator |
| while (iter.hasNext) { |
| val (name, store) = iter.next() |
| |
| val matcher = regex.matcher(name) |
| if (matcher.matches()) { |
| readOption match { |
| case ReadOption.ReadLatest => |
| result.append(store.readLatest: _*) |
| case ReadOption.ReadRecent => |
| result.append(store.readRecent: _*) |
| case ReadOption.ReadHistory => |
| result.append(store.readHistory: _*) |
| case _ => |
| // Skip all other options. |
| } |
| } |
| } |
| result.toList |
| } |
| |
| val dummyAggregator = new DummyMetricsAggregator |
| private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator] |
| |
| import scala.collection.JavaConverters._ |
| private val validAggregators: Set[String] = { |
| val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped |
| rootConfig.keySet().asScala.toSet |
| } |
| |
| def commandHandler: Receive = { |
| // Path accept syntax ? *, ? will match one char, * will match at least one char |
| case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) => |
| |
| val aggregator = { |
| if (aggregatorClazz == null || aggregatorClazz.isEmpty) { |
| dummyAggregator |
| } else if (aggregators.contains(aggregatorClazz)) { |
| aggregators(aggregatorClazz) |
| } else if (validAggregators.contains(aggregatorClazz)) { |
| val clazz = Class.forName(aggregatorClazz) |
| val constructor = clazz.getConstructor(classOf[Config]) |
| val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator] |
| aggregators += aggregatorClazz -> aggregator |
| aggregator |
| } else { |
| LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " + |
| s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}") |
| val skipAll = new SkipAllAggregator |
| aggregators += aggregatorClazz -> new SkipAllAggregator |
| skipAll |
| } |
| } |
| |
| val metrics = fetchMetricsHistory(inputPath, readOption).iterator |
| sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics)) |
| } |
| } |
| |
| object HistoryMetricsService { |
| |
| trait MetricsStore { |
| def add(inputMetrics: MetricType): Unit |
| |
| def read: List[HistoryMetricsItem] |
| |
| /** |
| * read latest inserted records |
| * @return |
| */ |
| def readLatest: List[HistoryMetricsItem] |
| } |
| |
| trait HistoryMetricsStore { |
| def add(inputMetrics: MetricType): Unit |
| |
| /** |
| * read latest inserted records |
| * @return |
| */ |
| def readLatest: List[HistoryMetricsItem] |
| |
| def readRecent: List[HistoryMetricsItem] |
| |
| def readHistory: List[HistoryMetricsItem] |
| } |
| |
| class DummyHistoryMetricsStore extends HistoryMetricsStore { |
| |
| val empty = List.empty[HistoryMetricsItem] |
| |
| override def add(inputMetrics: MetricType): Unit = Unit |
| |
| override def readRecent: List[HistoryMetricsItem] = empty |
| |
| /** |
| * read latest inserted records |
| * @return |
| */ |
| override def readLatest: List[HistoryMetricsItem] = empty |
| |
| override def readHistory: List[HistoryMetricsItem] = empty |
| } |
| |
| object HistoryMetricsStore { |
| def apply(name: String, metric: MetricType, config: HistoryMetricsConfig) |
| : HistoryMetricsStore = { |
| metric match { |
| case histogram: Histogram => new HistogramMetricsStore(config) |
| case meter: Meter => new MeterMetricsStore(config) |
| case counter: Counter => new CounterMetricsStore(config) |
| case gauge: Gauge => new GaugeMetricsStore(config) |
| case _ => new DummyHistoryMetricsStore // other metrics are not supported |
| } |
| } |
| } |
| |
| /** |
| * Metrics store to store history data points |
| * For each time point, we will store single data point. |
| * |
| * @param retainCount how many data points to retain, old data will be removed |
| * @param retainIntervalMs time interval between two data points. |
| */ |
| class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore { |
| |
| private val queue = new util.ArrayDeque[HistoryMetricsItem]() |
| private var latest = List.empty[HistoryMetricsItem] |
| |
| // End of the time window we are tracking |
| private var endTime = 0L |
| |
| override def add(inputMetrics: MetricType): Unit = { |
| add(inputMetrics, System.currentTimeMillis()) |
| } |
| |
| def add(inputMetrics: MetricType, now: MilliSeconds): Unit = { |
| |
| val metrics = HistoryMetricsItem(now, inputMetrics) |
| latest = List(metrics) |
| |
| if (now >= endTime) { |
| queue.addFirst(metrics) |
| endTime = (now / retainIntervalMs + 1) * retainIntervalMs |
| |
| // Removes old data |
| if (queue.size() > retainCount) { |
| queue.removeLast() |
| } |
| } |
| } |
| |
| def read: List[HistoryMetricsItem] = { |
| val result = new ListBuffer[HistoryMetricsItem] |
| import scala.collection.JavaConverters._ |
| queue.iterator().asScala.foreach(result.prepend(_)) |
| result.toList |
| } |
| |
| override def readLatest: List[HistoryMetricsItem] = { |
| latest |
| } |
| } |
| |
| /** |
| * Config for how long to keep history metrics data. |
| * |
| * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history data(unit hour) |
| * @param retainHistoryDataIntervalMs time interval between two history data points.(unit: ms) |
| * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS |
| * recent data points(unit: seconds) |
| * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS recent |
| * data points(unit: ms) |
| */ |
| case class HistoryMetricsConfig( |
| retainHistoryDataHours: Int, |
| retainHistoryDataIntervalMs: Int, |
| retainRecentDataSeconds: Int, |
| retainRecentDataIntervalMs: Int) |
| |
| object HistoryMetricsConfig { |
| def apply(config: Config): HistoryMetricsConfig = { |
| val historyHour = config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS) |
| val historyInterval = config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS) |
| |
| val recentSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS) |
| val recentInterval = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS) |
| HistoryMetricsConfig(historyHour, historyInterval, recentSeconds, recentInterval) |
| } |
| } |
| |
| class HistogramMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { |
| |
| private val history = new SingleValueMetricsStore( |
| config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, |
| config.retainHistoryDataIntervalMs) |
| |
| private val recent = new SingleValueMetricsStore( |
| config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, |
| config.retainRecentDataIntervalMs) |
| |
| override def add(inputMetrics: MetricType): Unit = { |
| recent.add(inputMetrics) |
| history.add(inputMetrics) |
| } |
| |
| override def readRecent: List[HistoryMetricsItem] = { |
| recent.read |
| } |
| |
| override def readHistory: List[HistoryMetricsItem] = { |
| history.read |
| } |
| |
| override def readLatest: List[HistoryMetricsItem] = { |
| recent.readLatest |
| } |
| } |
| |
| class MeterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { |
| |
| private val history = new SingleValueMetricsStore( |
| config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, |
| config.retainHistoryDataIntervalMs) |
| |
| private val recent = new SingleValueMetricsStore( |
| config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, |
| config.retainRecentDataIntervalMs) |
| |
| override def add(inputMetrics: MetricType): Unit = { |
| recent.add(inputMetrics) |
| history.add(inputMetrics) |
| } |
| |
| override def readRecent: List[HistoryMetricsItem] = { |
| recent.read |
| } |
| |
| override def readHistory: List[HistoryMetricsItem] = { |
| history.read |
| } |
| |
| override def readLatest: List[HistoryMetricsItem] = { |
| recent.readLatest |
| } |
| } |
| |
| class CounterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { |
| |
| private val history = new SingleValueMetricsStore( |
| config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, |
| config.retainHistoryDataIntervalMs) |
| |
| private val recent = new SingleValueMetricsStore( |
| config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, |
| config.retainRecentDataIntervalMs) |
| |
| override def add(inputMetrics: MetricType): Unit = { |
| history.add(inputMetrics) |
| recent.add(inputMetrics) |
| } |
| |
| override def readRecent: List[HistoryMetricsItem] = { |
| recent.read |
| } |
| |
| override def readHistory: List[HistoryMetricsItem] = { |
| history.read |
| } |
| |
| override def readLatest: List[HistoryMetricsItem] = { |
| recent.readLatest |
| } |
| } |
| |
| class GaugeMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { |
| |
| private val history = new SingleValueMetricsStore( |
| config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, |
| config.retainHistoryDataIntervalMs) |
| |
| private val recent = new SingleValueMetricsStore( |
| config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, |
| config.retainRecentDataIntervalMs) |
| |
| override def add(inputMetrics: MetricType): Unit = { |
| recent.add(inputMetrics) |
| history.add(inputMetrics) |
| } |
| |
| override def readRecent: List[HistoryMetricsItem] = { |
| recent.read |
| } |
| |
| override def readHistory: List[HistoryMetricsItem] = { |
| history.read |
| } |
| |
| override def readLatest: List[HistoryMetricsItem] = { |
| recent.readLatest |
| } |
| } |
| |
| class DummyMetricsAggregator extends MetricsAggregator { |
| def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) |
| : List[HistoryMetricsItem] = { |
| inputs.toList |
| } |
| } |
| |
| class SkipAllAggregator extends MetricsAggregator { |
| private val empty = List.empty[HistoryMetricsItem] |
| def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) |
| : List[HistoryMetricsItem] = { |
| empty |
| } |
| } |
| } |