blob: 5804b0072f2595b5956288f973cef6d634083dfb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.gearpump.streaming.metrics
import scala.collection.JavaConverters._
import scala.util.Random
import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.cluster.ClientToMaster.ReadOption
import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
import org.apache.gearpump.metrics.Metrics.{Gauge, Histogram, Meter}
import org.apache.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, HistogramAggregator, MeterAggregator, MultiLayerMap}
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
class ProcessorAggregatorSpec extends FlatSpec with Matchers {
"MultiLayerMap" should "maintain multiple layers HashMap" in {
val layers = 3
val map = new MultiLayerMap[String](layers)
assert(map.get(layer = 0, "key") == null)
// Illegal, handle safely
assert(map.get(layer = 10, "key") == null)
map.put(layer = 0, "key", "value")
assert(map.get(layer = 0, "key") == "value")
map.put(layer = 1, "key2", "value2")
map.put(layer = 2, "key3", "value3")
map.put(layer = 2, "key4", "value4")
// Illegal, should be ignored
map.put(layer = 4, "key5", "value5")
assert(map.size == 4)
assert(map.valueIterator.asScala.toSet == Set("value", "value2", "value3", "value4"))
"HistogramAggregator" should "aggregate by calculating average" in {
val aggregator = new HistogramAggregator("processor")
val a = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
val b = Histogram("processor.task2", 5, 6, 7, 8, 9, 10)
val expect = Histogram("processor.task2", 3, 4, 5, 6, 7, 8)
val olderTime = 100
val newerTime = 200
aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))
val result = aggregator.result
// Picks old time as aggregated time
assert(result.time == olderTime)
// Does average
val check = result.value.asInstanceOf[Histogram]
assert(check.mean - expect.mean < 0.01)
assert(check.stddev - expect.stddev < 0.01)
assert(check.median - expect.median < 0.01)
assert(check.p95 - expect.p95 < 0.01)
assert(check.p99 - expect.p99 < 0.01)
assert(check.p999 - expect.p999 < 0.01)
"MeterAggregator" should "aggregate by summing" in {
val aggregator = new MeterAggregator("processor")
val a = Meter("processor.task1", count = 1, 1, 3, "s")
val b = Meter("processor.task2", count = 2, 5, 7, "s")
val expect = Meter("processor.task2", count = 3, 6, 10, "s")
val olderTime = 100
val newerTime = 200
aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))
val result = aggregator.result
// Picks old time
assert(result.time == olderTime)
// Does summing
val check = result.value.asInstanceOf[Meter]
assert(check.count == expect.count)
assert(check.m1 - expect.m1 < 0.01)
assert(check.meanRate - expect.meanRate < 0.01)
assert(check.rateUnit == expect.rateUnit)
"AggregatorFactory" should "create aggregator" in {
val factory = new AggregatorFactory()
val a = Meter("processor.task1", count = 1, 1, 3, "s")
val b = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
val aggegator1 = factory.create(HistoryMetricsItem(time = 0, a), "name1")
val aggegator2 = factory.create(HistoryMetricsItem(time = 0, b), "name2")
"ProcessorAggregator" should "aggregate on different read options" in {
val hours = 2 // Maintains 2 hours history
val seconds = 2 // Maintains 2 seconds recent data
val taskCount = 5 // For each processor
val metricCount = 100 // For each task, have metricCount metrics
val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000,
seconds, seconds / 2 * 1000)
val aggregator = new ProcessorAggregator(range)
def count(value: Int): Int = value
def inputs(timeRange: Long): List[HistoryMetricsItem] = {
(0 until taskCount).map(TaskId(processorId = 0, _))
.flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 0, _))
.flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 1, _))
.flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 1, _))
.flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 0, _))
.flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 0, _))
.flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 1, _))
.flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++
(0 until taskCount).map(TaskId(processorId = 1, _))
.flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList
def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): Boolean = {
val nameCount = => key).mapValues(_.size).toList.toMap
nameCount sameElements countMap
// Aggregates on processor and meterNames,
val input = inputs(Long.MaxValue)
val readLatest = aggregator.aggregate(ReadOption.ReadLatest,
input.iterator, now = Long.MaxValue)
assert(readLatest.size == 8) // 2 processor * 4 metrics type
assert(check(readLatest, Map(
"app0.processor0:receiveLatency" -> count(1),
"app0.processor0:processTime" -> count(1),
"app0.processor0:sendThroughput" -> count(1),
"app0.processor0:receiveThroughput" -> count(1),
"app0.processor1:receiveLatency" -> count(1),
"app0.processor1:processTime" -> count(1),
"app0.processor1:sendThroughput" -> count(1),
"app0.processor1:receiveThroughput" -> count(1)
// Aggregates on processor and meterNames and time range,
val readRecent = aggregator.aggregate(ReadOption.ReadRecent,
inputs(seconds * 1000).iterator, now = seconds * 1000)
assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time range
assert(check(readRecent, Map(
"app0.processor0:receiveLatency" -> count(2),
"app0.processor0:processTime" -> count(2),
"app0.processor0:sendThroughput" -> count(2),
"app0.processor0:receiveThroughput" -> count(2),
"app0.processor1:receiveLatency" -> count(2),
"app0.processor1:processTime" -> count(2),
"app0.processor1:sendThroughput" -> count(2),
"app0.processor1:receiveThroughput" -> count(2)
// Aggregates on processor and meterNames and time range,
val readHistory = aggregator.aggregate(ReadOption.ReadHistory,
inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time ranges
assert(check(readHistory, Map(
"app0.processor0:receiveLatency" -> count(2),
"app0.processor0:processTime" -> count(2),
"app0.processor0:sendThroughput" -> count(2),
"app0.processor0:receiveThroughput" -> count(2),
"app0.processor1:receiveLatency" -> count(2),
"app0.processor1:processTime" -> count(2),
"app0.processor1:sendThroughput" -> count(2),
"app0.processor1:receiveThroughput" -> count(2)
private def histogram(
taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue,
repeat: Int = 1): List[HistoryMetricsItem] = {
val random = new Random()
(0 until repeat).map { _ =>
new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int)
: List[HistoryMetricsItem] = {
val random = new Random()
(0 until repeat).map { _ =>
new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
new Meter(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
"ProcessorAggregator" should "handle smoothly for unsupported metric type and " +
"error formatted metric name" in {
val invalid = List(
// Unsupported metric type
HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)),
// Wrong format: should be app0.processor0.task0:throughput
HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
val valid = histogram(TaskId(0, 0), repeat = 10)
val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0))
val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator,
now = Long.MaxValue)
// For one taskId, will only use one data point.
assert(result.size == 1)
assert( == "app0.processor0:latency")