/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.http

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.{Http, ServerBuilder}
import akka.http.scaladsl.model.{HttpRequest, _}
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives._

import kamon.metric.MeasurementUnit
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._

import scala.collection.immutable.Seq
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

/**
 * Common base for OpenWhisk REST services. It mixes in the Akka HTTP routing directives and
 * wraps the service routes with per-request transaction ids, request/response logging and
 * HTTP metrics.
 */
trait BasicHttpService extends Directives {

  /** Name of the request header that can enable additional logging for a single transaction. */
  val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING"

  /**
   * The routes served by the concrete HTTP service.
   *
   * @param transid the transaction id assigned to the request being handled
   */
  def routes(implicit transid: TransactionId): Route

  /**
   * Determines the log level used for request and response entries of a route.
   * Defaults to InfoLevel; override to change it per route.
   *
   * @param route the request path the log level is wanted for
   * @return the log level to use for that path
   */
  def loglevelForRoute(route: String): Logging.LogLevel = Logging.InfoLevel

  /**
   * Narrows a rejection list down to the most relevant rejection: the first content-type or
   * validation rejection if there is one, otherwise the list is passed on unchanged.
   */
  val prioritizeRejections = recoverRejections { rejections =>
    rejections
      .collectFirst {
        case preferred @ (_: UnacceptedResponseContentTypeRejection | _: ValidationRejection) => preferred
      }
      .fold(Rejected(rejections))(preferred => Rejected(Seq(preferred)))
  }

  /**
   * The complete route of the service: assigns a transaction id, echoes it back as a response
   * header, logs request and response, strips the extra-logging header and finally runs the
   * service routes with rejection handling on a strict request entity.
   */
  def route: Route = assignId { implicit transid =>
    // Inner layers: rejection handling around the service routes, which see a strict entity.
    def rejectionAwareRoutes: Route =
      handleRejections(BasicHttpService.customRejectionHandler) {
        prioritizeRejections {
          toStrictEntity(30.seconds)(routes)
        }
      }

    // Middle layers: request/response logging; the extra-logging header is removed from the
    // request before it reaches the inner layers.
    def loggedRoutes: Route =
      DebuggingDirectives.logRequest(logRequestInfo _) {
        DebuggingDirectives.logRequestResult(logResponseInfo _) {
          BasicDirectives.mapRequest(_.removeHeader(OW_EXTRA_LOGGING_HEADER)) {
            rejectionAwareRoutes
          }
        }
      }

    respondWithHeader(transid.toHeader)(loggedRoutes)
  }

  /**
   * Directive that provides a transaction id for every request. The id is taken from the
   * transaction id request header unless that value carries the system prefix; otherwise a
   * new id is generated.
   */
  protected def assignId =
    HeaderDirectives.optionalHeaderValueByName(OW_EXTRA_LOGGING_HEADER).flatMap { headerValue =>
      // Only the value "on" (in any letter case) switches on additional logging for this
      // transaction; a missing header or any other value does not enable it here.
      val extraLogging = headerValue.exists(_.toLowerCase == "on")

      extract { ctx =>
        val suppliedTid = ctx.request.headers
          .collectFirst {
            case header if header.is(TransactionId.generatorConfig.lowerCaseHeader) => header.value
          }
          .filterNot(_.startsWith(TransactionId.systemPrefix))

        // Generating the id here is only a fallback: the tid is expected to be set upstream (nginx).
        val tid = suppliedTid.getOrElse(TransactionId.generateTid())

        TransactionId(tid, extraLogging)
      }
    }

  /**
   * Builds the log entry for an incoming request: transaction id, HTTP method, path and query.
   * A query that cannot be parsed is replaced by a description of the problem.
   */
  protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = {
    val method = req.method.name
    val path = req.uri.path.toString

    val queryText: String =
      try req.uri.query().toString
      catch {
        case _: IllegalUriException => s"Bad query parameters:${req.uri.toString()}"
        case e: Exception           => s"Query parsing error: ${e.getMessage}"
      }

    val level = loglevelForRoute(path)
    LogEntry(s"[$tid] $method $path $queryText", level)
  }

  /**
   * Handles the result of a request. For a completed response it emits a histogram and a
   * counter metric tagged with HTTP method and status code, and yields a log entry when
   * metric logging is enabled. Any other route result yields no log entry.
   */
  protected def logResponseInfo(req: HttpRequest)(implicit tid: TransactionId): RouteResult => Option[LogEntry] = {
    case RouteResult.Complete(response: HttpResponse) =>
      val method = req.method.name
      val level = loglevelForRoute(req.uri.path.toString)
      val statusCode = response.status.intValue.toString
      val component = "BasicHttpService"

      val token =
        LogMarkerToken("http", method.toLowerCase, LoggingMarkers.counter, Some(statusCode), Map("statusCode" -> statusCode))(
          MeasurementUnit.time.milliseconds)
      val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))

      MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
      MetricEmitter.emitCounterMetric(token)

      if (TransactionId.metricsLog) Some(LogEntry(s"[$tid] [$component] $marker", level))
      else None

    case _ => None // any result other than a completed response
  }
}

object BasicHttpService {

  /**
   * Starts an HTTP(S) route handler on the given interface and port and registers a shutdown hook
   * for the resulting binding.
   *
   * @param route the route to serve
   * @param port the port to bind to
   * @param config optional HTTPS configuration; when defined, HTTPS is enabled on the server
   * @param interface the network interface to bind to
   * @param actorSystem the actor system used to run the server
   */
  def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")(
    implicit
    actorSystem: ActorSystem): Unit = {
    // Build the HTTPS context first so that a broken configuration fails before a server builder is created.
    val httpsContext = config.map(Https.connectionContextServer(_))
    val plainBuilder: ServerBuilder = Http().newServerAt(interface, port)
    val serverBuilder = httpsContext.fold(plainBuilder)(context => plainBuilder.enableHttps(context))

    addShutdownHook(serverBuilder.bindFlow(route))
  }

  /**
   * Registers a JVM shutdown hook that unbinds the given server binding and then waits for the
   * actor system to terminate. Each of the two steps is awaited for at most 30 seconds.
   *
   * @param binding the (possibly still pending) server binding to unbind on shutdown
   * @param actorSystem the actor system whose termination is awaited
   */
  def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem): Unit = {
    implicit val executionContext = actorSystem.dispatcher
    sys.addShutdownHook {
      // flatMap (not map): unbind() itself returns a future, and the hook has to wait for the
      // unbinding to complete rather than just for it to be triggered.
      Await.result(binding.flatMap(_.unbind()), 30.seconds)
      Await.result(actorSystem.whenTerminated, 30.seconds)
    }
  }

  /**
   * Rejection handler that delegates to the default Akka handler and rewrites its response:
   * a strict response entity is replaced by a JSON error response that carries the original
   * message text and the transaction id. Responses with any other entity are passed through unchanged.
   */
  def customRejectionHandler(implicit transid: TransactionId) = {
    RejectionHandler.default.mapRejectionResponse {
      case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
        val error = ErrorResponse(ent.data.utf8String, transid).toJson
        res.withEntity(HttpEntity(ContentTypes.`application/json`, error.compactPrint))
      case x => x
    }
  }

}
