/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.monitoring.metrics
import java.time.Duration
import com.typesafe.config.{Config, ConfigFactory}
import kamon.metric.PeriodSnapshot
import kamon.module.MetricReporter
import kamon.Kamon
import kamon.tag.Lookups
import org.apache.openwhisk.core.connector.{Activation, EventMessage}
import org.apache.openwhisk.core.entity.{ActivationResponse, Subject, UUID}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import scala.concurrent.duration._
/**
 * Checks that user events published to the user-event Kafka topic and consumed with
 * `KamonRecorder` show up as Kamon counters and histograms.
 *
 * A private in-memory [[MetricReporter]] (`TestReporter`) is registered with Kamon before every
 * test and accumulates all period snapshots it is handed; the assertions then query that
 * accumulated snapshot by metric name and tags.
 */
@RunWith(classOf[JUnitRunner])
class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with KamonMetricNames {
  var reporter: MetricReporter = _

  // Handle returned by Kamon.registerModule. It is kept so that afterEach can cancel it;
  // otherwise the reporter would stay attached to the global Kamon instance after the test.
  private var registration: Option[kamon.module.Module.Registration] = None

  /** Resets the test reporter, registers it with Kamon and switches Kamon to a 50 ms tick. */
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    TestReporter.reset()
    // Short tick interval so that snapshots reach the reporter well within the test's sleeps.
    val newConfig = ConfigFactory.parseString("""kamon {
      | metric {
      | tick-interval = 50 ms
      | optimistic-tick-alignment = no
      | }
      |}""".stripMargin).withFallback(ConfigFactory.load())
    registration = Some(Kamon.registerModule("test", TestReporter))
    Kamon.reconfigure(newConfig)
    reporter = TestReporter
  }

  /** Unregisters the test reporter and restores the default Kamon configuration. */
  override protected def afterEach(): Unit = {
    try {
      registration.foreach(_.cancel())
      registration = None
      reporter.stop()
      Kamon.reconfigure(ConfigFactory.load())
    } finally {
      // Always let the stacked traits run their own cleanup, even if ours failed.
      super.afterEach()
    }
  }

  behavior of "KamonConsumer"

  val initiator = "initiatorTest"
  val namespaceDemo = "demo"
  val namespaceGuest = "guest"
  val actionWithCustomPackage = "apimgmt/createApi"
  val actionWithDefaultPackage = "createApi"
  val kind = "nodejs:10"
  val memory = 256

  it should "push user events to kamon" in {
    createCustomTopic(EventConsumer.userEventTopic)
    val consumer = createConsumer(kafkaPort, system.settings.config, KamonRecorder)

    // One activation per (namespace, action) combination exercised by the assertions below.
    Seq(
      s"$namespaceDemo/$actionWithCustomPackage",
      s"$namespaceDemo/$actionWithDefaultPackage",
      s"$namespaceGuest/$actionWithDefaultPackage").foreach { actionPath =>
      publishStringMessageToKafka(EventConsumer.userEventTopic, newActivationEvent(actionPath).serialize)
    }

    sleep(sleepAfterProduce, "sleeping post produce")
    consumer.shutdown().futureValue
    sleep(4.second, "sleeping for Kamon reporters to get invoked")

    // Custom package
    TestReporter.counter(activationMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
    TestReporter
      .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionMemory)) == memory.toString)(0)
      .value shouldBe 1
    TestReporter
      .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionKind)) == kind)(0)
      .value shouldBe 1
    TestReporter
      .counter(statusMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionStatus)) == ActivationResponse.statusDeveloperError)(0)
      .value shouldBe 1
    TestReporter.counter(coldStartMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
    TestReporter.histogram(waitTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1
    TestReporter.histogram(initTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1
    TestReporter.histogram(durationMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1

    // Default package
    TestReporter.histogram(durationMetric, namespaceDemo, actionWithDefaultPackage).size shouldBe 1

    // Blacklisted namespace should not be tracked
    TestReporter.counter(activationMetric, namespaceGuest, actionWithDefaultPackage)(0).value shouldBe 0

    // Blacklisted should be counted in "openwhisk.namespace.activations" metric
    TestReporter.namespaceCounter(namespaceActivationMetric, namespaceGuest)(0).value shouldBe 1
  }

  /**
   * Builds an activation event for the given fully qualified action path
   * (`namespace/[package/]action`) using the suite's `kind`, `memory` and `initiator`.
   *
   * NOTE(review): the positional `Activation` arguments (2, 3.millis, 5.millis, 11.millis, false)
   * are presumably status code, timings and a flag; the test above only establishes that the
   * resulting status tag equals `ActivationResponse.statusDeveloperError`. Confirm against
   * `Activation`'s definition before changing them.
   */
  private def newActivationEvent(actionPath: String) =
    EventMessage(
      "test",
      Activation(actionPath, 2, 3.millis, 5.millis, 11.millis, kind, false, memory, None),
      Subject("testuser"),
      initiator,
      UUID("test"),
      Activation.typeName)

  /**
   * In-memory reporter that folds every period snapshot it receives into one accumulator and
   * exposes tag-based lookups over the accumulated counters and histograms.
   *
   * `reportPeriodSnapshot` is invoked by Kamon rather than by the test itself, so every access to
   * the accumulator goes through the object's monitor.
   */
  private object TestReporter extends MetricReporter {
    var snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)

    override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = synchronized {
      snapshotAccumulator.add(snapshot)
    }

    override def stop(): Unit = {}

    override def reconfigure(config: Config): Unit = {}

    /** Discards everything accumulated so far. */
    def reset(): Unit = synchronized {
      snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)
    }

    /** Counter instruments of `metricName` tagged with the given namespace, the suite's initiator and `action`. */
    def counter(metricName: String, namespace: String, action: String) = {
      accumulated().counters
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(i => inNamespace(i.tags, namespace) && hasTag(i.tags, actionName, action))
    }

    /** Counter instruments of `metricName` tagged with the given namespace and the suite's initiator. */
    def namespaceCounter(metricName: String, namespace: String) = {
      accumulated().counters
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(i => inNamespace(i.tags, namespace))
    }

    /** Histogram instruments of `metricName` tagged with the given namespace, the suite's initiator and `action`. */
    def histogram(metricName: String, namespace: String, action: String) = {
      accumulated().histograms
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(i => inNamespace(i.tags, namespace) && hasTag(i.tags, actionName, action))
    }

    // Current accumulated snapshot, read under the same monitor that guards writes.
    private def accumulated(): PeriodSnapshot = synchronized {
      snapshotAccumulator.peek()
    }

    // True when the plain-string tag `key` equals `expected`.
    private def hasTag(tags: kamon.tag.TagSet, key: String, expected: String): Boolean =
      tags.get(Lookups.plain(key)) == expected

    // True when the tags carry both the given action namespace and the suite's initiator.
    private def inNamespace(tags: kamon.tag.TagSet, namespace: String): Boolean =
      hasTag(tags, actionNamespace, namespace) && hasTag(tags, initiatorNamespace, initiator)
  }
}