blob: 51572848d2c1ea990bcec3ec82fb54de87a10959 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.appmaster
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import org.apache.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
import org.apache.gearpump.util.HistoryMetricsService
import org.apache.gearpump.util.HistoryMetricsService._
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
import scala.concurrent.Await
class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val count = 2
val intervalMs = 10
val config = HistoryMetricsConfig(
retainHistoryDataHours = 72,
retainHistoryDataIntervalMs = 3600 * 1000,
retainRecentDataSeconds = 300,
retainRecentDataIntervalMs = 15 * 1000)
"SingleValueMetricsStore" should "retain metrics and expire old value" in {
val store = new SingleValueMetricsStore(count, intervalMs)
var now = 0L
// Only 1 data point will be kept in @intervalMs
store.add(Counter("count", 1), now)
store.add(Counter("count", 2), now)
now = now + intervalMs + 1
// Only 1 data point will be kept in @intervalMs
store.add(Counter("count", 3), now)
store.add(Counter("count", 4), now)
now = now + intervalMs + 1
// Only 1 data point will be kept in @intervalMs
// expire oldest data point, because we only keep @count records
store.add(Counter("count", 5), now)
store.add(Counter("count", 6), now)
val result = store.read
assert(result.size == count)
// The oldest value is expired
assert(result.head.value.asInstanceOf[Counter].value == 3L)
// The newest value is inserted
assert(result.last.value.asInstanceOf[Counter].value == 5L)
}
val meterTemplate = Meter("meter", 0, 0, 0, "s")
"HistogramMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
val store = new HistogramMetricsStore(config)
val a = Histogram(null, 100, 0, 0, 0, 0, 0)
val b = Histogram(null, 200, 0, 0, 0, 0, 0)
val c = Histogram(null, 50, 0, 0, 0, 0, 0)
store.add(a)
store.add(b)
store.add(c)
assert(store.readLatest.map(_.value) == List(c))
assert(store.readRecent.map(_.value) == List(a))
assert(store.readHistory.map(_.value) == List(a))
}
"MeterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
val store = new MeterMetricsStore(config)
val a = Meter(null, 1, 100, 0, null)
val b = Meter(null, 1, 200, 0, null)
val c = Meter(null, 1, 50, 0, null)
store.add(a)
store.add(b)
store.add(c)
assert(store.readLatest.map(_.value) == List(c))
assert(store.readRecent.map(_.value) == List(a))
assert(store.readHistory.map(_.value) == List(a))
}
"CounterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
val store = new CounterMetricsStore(config)
val a = Counter(null, 50)
val b = Counter(null, 100)
val c = Counter(null, 150)
store.add(a)
store.add(b)
store.add(c)
assert(store.readLatest.map(_.value) == List(c))
assert(store.readRecent.map(_.value) == List(a))
assert(store.readHistory.map(_.value) == List(a))
}
"HistoryMetricsService" should
"retain lastest metrics data and allow user to query metrics by path" in {
implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
val appId = 0
val service = system.actorOf(Props(new HistoryMetricsService("app0", config)))
service ! Counter("metric.counter", 0)
service ! Meter("metric.meter", 0, 0, 0, null)
service ! Histogram("metric.histogram", 0, 0, 0, 0, 0, 0)
val client = TestProbe()
// Filters metrics with path "metric.counter"
client.send(service, QueryHistoryMetrics("metric.counter"))
import scala.concurrent.duration._
client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.counter")
val metricList = history.metrics
metricList.foreach(metricItem =>
assert(metricItem.value.isInstanceOf[Counter])
)
}
// Filters metrics with path "metric.meter"
client.send(service, QueryHistoryMetrics("metric.meter"))
client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.meter")
val metricList = history.metrics
metricList.foreach(metricItem =>
assert(metricItem.value.isInstanceOf[Meter])
)
}
// Filters metrics with path "metric.histogram"
client.send(service, QueryHistoryMetrics("metric.histogram"))
client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.histogram")
val metricList = history.metrics
metricList.foreach(metricItem =>
assert(metricItem.value.isInstanceOf[Histogram])
)
}
// Filters metrics with path prefix "metric", all metrics which can
// match the path prefix will be retained.
client.send(service, QueryHistoryMetrics("metric"))
client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
val metricList = history.metrics
var counterFound = false
var meterFound = false
var histogramFound = false
metricList.foreach(metricItem =>
metricItem.value match {
case v: Counter => counterFound = true
case v: Meter => meterFound = true
case v: Histogram => histogramFound = true
case _ => // Skip
}
)
// All kinds of metric type are reserved.
assert(counterFound && meterFound && histogramFound)
}
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
}
}