/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.core.monitoring.metrics

import java.time.Duration

import com.typesafe.config.{Config, ConfigFactory}
import kamon.metric.PeriodSnapshot
import kamon.module.MetricReporter
import kamon.Kamon
import kamon.tag.Lookups
import org.apache.openwhisk.core.connector.{Activation, EventMessage}
import org.apache.openwhisk.core.entity.{ActivationResponse, Subject, UUID}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner

import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with KamonMetricNames {
  var reporter: MetricReporter = _

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    TestReporter.reset()
    val newConfig = ConfigFactory.parseString("""kamon {
        |  metric {
        |    tick-interval = 50 ms
        |    optimistic-tick-alignment = no
        |  }
        |}""".stripMargin).withFallback(ConfigFactory.load())
    Kamon.registerModule("test", TestReporter)
    Kamon.reconfigure(newConfig)
    reporter = TestReporter
  }

  override protected def afterEach(): Unit = {
    reporter.stop()
    Kamon.reconfigure(ConfigFactory.load())
    super.afterEach()
  }

  behavior of "KamonConsumer"

  val initiator = "initiatorTest"
  val namespaceDemo = "demo"
  val namespaceGuest = "guest"
  val actionWithCustomPackage = "apimgmt/createApi"
  val actionWithDefaultPackage = "createApi"
  val kind = "nodejs:10"
  val memory = 256

  it should "push user events to kamon" in {
    createCustomTopic(EventConsumer.userEventTopic)

    val consumer = createConsumer(kafkaPort, system.settings.config, KamonRecorder)

    publishStringMessageToKafka(
      EventConsumer.userEventTopic,
      newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage").serialize)
    publishStringMessageToKafka(
      EventConsumer.userEventTopic,
      newActivationEvent(s"$namespaceDemo/$actionWithDefaultPackage").serialize)

    publishStringMessageToKafka(
      EventConsumer.userEventTopic,
      newActivationEvent(s"$namespaceGuest/$actionWithDefaultPackage").serialize)

    sleep(sleepAfterProduce, "sleeping post produce")
    consumer.shutdown().futureValue
    sleep(4.second, "sleeping for Kamon reporters to get invoked")

    // Custom package
    TestReporter.counter(activationMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
    TestReporter
      .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionMemory)) == memory.toString)(0)
      .value shouldBe 1
    TestReporter
      .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionKind)) == kind)(0)
      .value shouldBe 1
    TestReporter
      .counter(statusMetric, namespaceDemo, actionWithCustomPackage)
      .filter((t) => t.tags.get(Lookups.plain(actionStatus)) == ActivationResponse.statusDeveloperError)(0)
      .value shouldBe 1
    TestReporter.counter(coldStartMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
    TestReporter.histogram(waitTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1
    TestReporter.histogram(initTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1
    TestReporter.histogram(durationMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1

    // Default package
    TestReporter.histogram(durationMetric, namespaceDemo, actionWithDefaultPackage).size shouldBe 1

    // Blacklisted namespace should not be tracked
    TestReporter.counter(activationMetric, namespaceGuest, actionWithDefaultPackage)(0).value shouldBe 0

    // Blacklisted should be counted in "openwhisk.namespace.activations" metric
    TestReporter.namespaceCounter(namespaceActivationMetric, namespaceGuest)(0).value shouldBe 1
  }

  private def newActivationEvent(actionPath: String) =
    EventMessage(
      "test",
      Activation(actionPath, 2, 3.millis, 5.millis, 11.millis, kind, false, memory, None),
      Subject("testuser"),
      initiator,
      UUID("test"),
      Activation.typeName)

  private object TestReporter extends MetricReporter {
    var snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)
    override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
      snapshotAccumulator.add(snapshot)
    }

    override def stop(): Unit = {}
    override def reconfigure(config: Config): Unit = {}

    def reset(): Unit = {
      snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)
    }

    def counter(metricName: String, namespace: String, action: String) = {
      snapshotAccumulator
        .peek()
        .counters
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
        .filter(_.tags.get(Lookups.plain(actionName)) == action)
    }

    def namespaceCounter(metricName: String, namespace: String) = {
      snapshotAccumulator
        .peek()
        .counters
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
    }

    def histogram(metricName: String, namespace: String, action: String) = {
      snapshotAccumulator
        .peek()
        .histograms
        .filter(_.name == metricName)
        .flatMap(_.instruments)
        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
        .filter(_.tags.get(Lookups.plain(actionName)) == action)
    }
  }
}
