/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.core.controller.test

import java.time.Instant

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.server.Route
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.Messages._

import scala.util.Success

/**
 * Tests Conductor Actions API.
 *
 * Unit tests of the controller service as a standalone component.
 * These tests exercise a fresh instance of the service object in memory -- these
 * tests do NOT communication with a whisk deployment.
 */
@RunWith(classOf[JUnitRunner])
class ConductorsApiTests extends ControllerTestCommon with WhiskActionsApi {

  /** Conductors API tests */
  behavior of "Conductor"

  val creds = WhiskAuthHelpers.newIdentity()
  val namespace = EntityPath(creds.subject.asString)
  val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"

  val alternateCreds = WhiskAuthHelpers.newIdentity()
  val alternateNamespace = EntityPath(alternateCreds.subject.asString)

  // test actions
  val echo = MakeName.next("echo")
  val conductor = MakeName.next("conductor")
  val step = MakeName.next("step")
  val missing = MakeName.next("missingAction") // undefined
  val invalid = "invalid#Action" // invalid name

  val testString = "this is a test"
  val duration = 42

  val limit = whiskConfig.actionSequenceLimit.toInt

  override val loadBalancer = new FakeLoadBalancerService(whiskConfig)
  override val activationIdFactory = new ActivationId.ActivationIdGenerator() {}

  it should "invoke a conductor action with no dynamic steps" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, echo, jsDefault("??"), annotations = Parameters("conductor", "true")))

    // a normal result
    Post(s"$collectionPath/${echo}?blocking=true", JsObject("payload" -> testString.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("payload" -> testString.toJson)
      response.fields("duration") shouldBe duration.toJson
      val annotations = response.fields("annotations").convertTo[Parameters]
      annotations.getAs[Boolean]("conductor") shouldBe Success(true)
      annotations.getAs[String]("kind") shouldBe Success("sequence")
      annotations.getAs[Boolean]("topmost") shouldBe Success(true)
      annotations.get("limits") should not be None
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 1
    }

    // an error result
    Post(s"$collectionPath/${echo}?blocking=true", JsObject("error" -> testString.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("error" -> testString.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 1
    }

    // a wrapped result { params: result } is unwrapped by the controller
    Post(s"$collectionPath/${echo}?blocking=true", JsObject("params" -> JsObject("payload" -> testString.toJson))) ~> Route
      .seal(routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("payload" -> testString.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 1
    }

    // an invalid action name
    Post(s"$collectionPath/${echo}?blocking=true", JsObject("payload" -> testString.toJson, "action" -> invalid.toJson)) ~> Route
      .seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject(
        "error" -> compositionComponentInvalid(invalid.toJson).toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 2
    }

    // an undefined action
    Post(s"$collectionPath/${echo}?blocking=true", JsObject("payload" -> testString.toJson, "action" -> missing.toJson)) ~> Route
      .seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject(
        "error" -> compositionComponentNotFound(s"$namespace/$missing").toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 2
    }
  }

  it should "invoke a conductor action with dynamic steps" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, conductor, jsDefault("??"), annotations = Parameters("conductor", "true")))
    put(entityStore, WhiskAction(namespace, step, jsDefault("??")))
    put(entityStore, WhiskAction(alternateNamespace, step, jsDefault("??"))) // forbidden action
    val forbidden = s"/$alternateNamespace/$step" // forbidden action name

    // dynamically invoke step action
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> step.toJson, "params" -> JsObject("n" -> 1.toJson))) ~> Route.seal(routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 2.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 3
      response.fields("duration") shouldBe (3 * duration).toJson
    }

    // dynamically invoke step action with an error result
    Post(s"$collectionPath/${conductor}?blocking=true", JsObject("action" -> step.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("error" -> "missing parameter".toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 3
      response.fields("duration") shouldBe (3 * duration).toJson
    }

    // dynamically invoke step action, forwarding state
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> step.toJson, "state" -> JsObject("witness" -> 42.toJson), "n" -> 1.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 2.toJson, "witness" -> 42.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 3
      response.fields("duration") shouldBe (3 * duration).toJson
    }

    // dynamically invoke a forbidden action
    Post(s"$collectionPath/${conductor}?blocking=true", JsObject("action" -> forbidden.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject(
        "error" -> compositionComponentNotAccessible(forbidden.drop(1)).toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 2
    }

    // dynamically invoke step action twice, forwarding state
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> step.toJson, "state" -> JsObject("action" -> step.toJson), "n" -> 1.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 3.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 5
      response.fields("duration") shouldBe (5 * duration).toJson
    }

    // invoke nested conductor with single step
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> conductor.toJson, "params" -> JsObject("action" -> step.toJson), "n" -> 1.toJson)) ~> Route
      .seal(routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 2.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 3
      response.fields("duration") shouldBe (5 * duration).toJson
    }

    // nested step followed by outer step
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject(
        "action" -> conductor.toJson,
        "state" -> JsObject("action" -> step.toJson),
        "params" -> JsObject("action" -> step.toJson),
        "n" -> 1.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 3.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 5
      response.fields("duration") shouldBe (7 * duration).toJson
    }

    // two levels of nesting, three steps
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject(
        "action" -> conductor.toJson,
        "state" -> JsObject("action" -> step.toJson),
        "params" -> JsObject(
          "action" -> conductor.toJson,
          "state" -> JsObject("action" -> step.toJson),
          "params" -> JsObject("action" -> step.toJson)),
        "n" -> 1.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 4.toJson)
      response.fields("logs").convertTo[JsArray].elements.size shouldBe 5
      response.fields("duration") shouldBe (11 * duration).toJson
    }
  }

  it should "abort if composition is too long" in {
    implicit val tid = transid()
    put(entityStore, WhiskAction(namespace, conductor, jsDefault("??"), annotations = Parameters("conductor", "true")))
    put(entityStore, WhiskAction(namespace, step, jsDefault("??")))

    // stay just below limit
    var params = Map[String, JsValue]()
    for (i <- 1 to limit) {
      params = Map("action" -> step.toJson, "state" -> JsObject(params))
    }
    Post(s"$collectionPath/${conductor}?blocking=true", JsObject(params + ("n" -> 0.toJson))) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> limit.toJson)
      response.fields("duration") shouldBe (101 * duration).toJson
    }

    // add one extra step
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> step.toJson, "state" -> JsObject(params), "n" -> 0.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("error" -> compositionIsTooLong.toJson)
    }

    // nesting a composition at the limit should be ok
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> conductor.toJson, "params" -> JsObject(params), "n" -> 0.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> limit.toJson)
    }

    // nesting a composition beyond the limit should fail
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject(
        "action" -> conductor.toJson,
        "params" -> JsObject("action" -> step.toJson, "state" -> JsObject(params)),
        "n" -> 0.toJson)) ~> Route.seal(routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("error" -> compositionIsTooLong.toJson)
    }

    // recursing at the limit should be ok
    params = Map[String, JsValue]()
    for (i <- 1 to limit) {
      params = Map("action" -> conductor.toJson, "params" -> JsObject(params))
    }
    Post(s"$collectionPath/${conductor}?blocking=true", JsObject(params + ("n" -> 0.toJson))) ~> Route.seal(
      routes(creds)) ~> check {
      status should be(OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("n" -> 0.toJson)
    }

    // recursing beyond the limit should fail
    Post(
      s"$collectionPath/${conductor}?blocking=true",
      JsObject("action" -> conductor.toJson, "params" -> JsObject(params), "n" -> 0.toJson)) ~> Route.seal(
      routes(creds)) ~> check {
      status should not be (OK)
      val response = responseAs[JsObject]
      response.fields("response").asJsObject.fields("status") shouldBe "application error".toJson
      response.fields("response").asJsObject.fields("result") shouldBe JsObject("error" -> compositionIsTooLong.toJson)
    }
  }

  // fake load balancer to emulate a handful of actions
  class FakeLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionContext)
      extends DegenerateLoadBalancerService(config) {

    private def respond(action: ExecutableWhiskActionMetaData, msg: ActivationMessage, result: JsObject) = {
      val response =
        if (result.fields.get("error") isDefined) ActivationResponse(ActivationResponse.ApplicationError, Some(result))
        else ActivationResponse.success(Some(result))
      val start = Instant.now
      WhiskActivation(
        action.namespace,
        action.name,
        msg.user.subject,
        msg.activationId,
        start,
        end = start.plusMillis(duration),
        response = response)
    }

    override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
      implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
      msg.content map { args =>
        Future.successful {
          action.name match {
            case `echo` => // echo action
              Future(Right(respond(action, msg, args)))
            case `conductor` => // see tests/dat/actions/conductor.js
              val result =
                if (args.fields.get("error") isDefined) args
                else {
                  val action = args.fields.get("action") map { action =>
                    Map("action" -> action)
                  } getOrElse Map.empty
                  val state = args.fields.get("state") map { state =>
                    Map("state" -> state)
                  } getOrElse Map.empty
                  val wrappedParams = args.fields.getOrElse("params", JsObject.empty).asJsObject.fields
                  val escapedParams = args.fields - "action" - "state" - "params"
                  val params = Map("params" -> JsObject(wrappedParams ++ escapedParams))
                  JsObject(params ++ action ++ state)
                }
              Future(Right(respond(action, msg, result)))
            case `step` => // see tests/dat/actions/step.js
              val result = args.fields.get("n") map { n =>
                JsObject("n" -> (n.convertTo[BigDecimal] + 1).toJson)
              } getOrElse {
                JsObject("error" -> "missing parameter".toJson)
              }
              Future(Right(respond(action, msg, result)))
            case _ =>
              Future.failed(new IllegalArgumentException("Unknown action invoked in conductor test"))
          }
        }
      } getOrElse Future.failed(new IllegalArgumentException("No invocation parameters in conductor test"))
  }
}
