/*
 * Copyright [2017] [B2W Digital]
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.marvin.executor.actions

import java.time.LocalDateTime

import akka.Done
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.testkit.{EventFilter, ImplicitSender, TestActorRef, TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.marvin.artifact.manager.ArtifactSaver.{SaveToLocal, SaveToRemote}
import org.marvin.exception.MarvinEExecutorException
import org.marvin.executor.actions.BatchAction.{BatchExecute, BatchExecutionStatus, BatchHealthCheck, BatchReload}
import org.marvin.executor.proxies.EngineProxy.{ExecuteBatch, HealthCheck, Reload}
import org.marvin.fixtures.MetadataMock
import org.marvin.model._
import org.marvin.util.{JsonUtil, LocalCache}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.duration._

/**
  * Actor-level tests for [[BatchAction]].
  *
  * Every test builds a [[MockedBatchAction]] whose collaborators (engine proxy, artifact saver and
  * execution cache) are supplied by the test, then drives it with the `BatchAction` protocol
  * messages and checks what reaches the collaborators and the implicit sender.
  *
  * The actor system is configured with `akka.testkit.TestEventListener` so that `EventFilter`
  * can observe log events.
  */
class BatchActionTest extends TestKit(
  ActorSystem("BatchActionTest", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")))
  with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with MockFactory {

  /** Shuts the shared actor system down once all tests of this suite have run. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  /**
    * Starts a [[MockedBatchAction]] on the test actor system using the simple mocked metadata.
    *
    * @param actionName name of the engine action the actor represents (must exist in the metadata)
    * @param proxy      reference used as the engine proxy; `ActorRef.noSender` when the test never reaches it
    * @param saver      reference used as the artifact saver; `ActorRef.noSender` when the test never reaches it
    * @param cache      cache (mock or real) used to store/load batch executions
    * @return the reference of the started actor
    */
  private def spawnBatchAction(actionName: String,
                               proxy: ActorRef,
                               saver: ActorRef,
                               cache: LocalCache[BatchExecution]): ActorRef = {
    // Built eagerly, outside the Props creator, so it is evaluated exactly once on the test thread.
    val metadata = MetadataMock.simpleMockedMetadata()
    system.actorOf(Props(new MockedBatchAction(actionName, metadata, proxy, saver, cache)))
  }

  "batch action actor" must {

    "send Done message" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("acquisitor", mockedProxy.ref, mockedSaver.ref, mockedCache)

      // The filter only asserts anything when the code under test runs inside `intercept`;
      // merely constructing an EventFilter is a no-op.
      // NOTE(review): assumes BatchAction logs exactly "Work Done!" at info level on Done,
      // as the original (never applied) filter implied — confirm against BatchAction.
      EventFilter.info(message = "Work Done!", occurrences = 1).intercept {
        batchAction ! Done
      }

      // Done must not produce any reply to the sender.
      expectNoMsg()
    }

    "send BatchExecutionStatus message with valid protocol" in {

      val mockedCache = mock[LocalCache[BatchExecution]]

      // Neither the proxy nor the saver is passed a real reference in this test.
      val batchAction = spawnBatchAction("acquisitor", ActorRef.noSender, ActorRef.noSender, mockedCache)

      val protocol = "fake_protocol"
      val batchExecution = new BatchExecution("acquisitor", protocol, LocalDateTime.now, Working)

      (mockedCache.load _).expects(protocol).returning(Option(batchExecution)).once()

      batchAction ! BatchExecutionStatus(protocol)

      // The cached execution is expected back as its JSON representation.
      expectMsg(JsonUtil.toJson(batchExecution))
    }

    "send BatchExecutionStatus message with invalid protocol" in {

      // A real, empty cache: looking up any protocol finds nothing.
      val emptyCache = new LocalCache[BatchExecution](10, 1.minute)

      val batchAction = spawnBatchAction("acquisitor", ActorRef.noSender, ActorRef.noSender, emptyCache)

      val protocol = "fake_protocol"

      batchAction ! BatchExecutionStatus(protocol)

      val returnedMessage = expectMsgType[akka.actor.Status.Failure]
      returnedMessage.cause shouldBe a[MarvinEExecutorException]
    }

    "send BatchExecute message" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("acquisitor", mockedProxy.ref, mockedSaver.ref, mockedCache)

      val protocol = "fake_protocol"
      val params = "fakeParams"

      // NOTE(review): these expectations can only match if BatchExecution equality is not
      // sensitive to the exact timestamp — confirm against BatchExecution.
      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Working)).once()
      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Finished)).once()

      batchAction ! BatchExecute(protocol, params)

      mockedProxy.expectMsg(ExecuteBatch(protocol, params))
      mockedProxy.reply(Done)

      mockedSaver.expectMsg(SaveToRemote("initial_dataset", protocol))
      mockedSaver.reply(Done)

      expectNoMsg()
    }

    "send BatchExecute message to get exception from proxy [execute msg]" in {

      // Proxy that fails on every ExecuteBatch it receives.
      val failingProxy = TestActorRef(new Actor {
        def receive: Receive = {
          case ExecuteBatch(_, _) => throw new Exception("boom")
        }
      })

      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("acquisitor", failingProxy, ActorRef.noSender, mockedCache)

      val protocol = "fake_protocol"
      val params = "fakeParams"

      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Working)).once()
      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Failed)).once()

      batchAction ! BatchExecute(protocol, params)

      expectNoMsg()
    }

    "send BatchExecute message to get exception from saver" in {

      // Saver that fails on every SaveToRemote it receives.
      val failingSaver = TestActorRef(new Actor {
        def receive: Receive = {
          case SaveToRemote(_, _) => throw new Exception("boom")
        }
      })

      val mockedProxy = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("acquisitor", mockedProxy.ref, failingSaver, mockedCache)

      val protocol = "fake_protocol"
      val params = "fakeParams"

      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Working)).once()
      (mockedCache.save(_: String, _: BatchExecution)).expects(protocol, new BatchExecution("acquisitor", protocol, LocalDateTime.now, Failed)).once()

      batchAction ! BatchExecute(protocol, params)

      // The proxy succeeds, so the failure observed by the action comes from the saver.
      mockedProxy.expectMsg(ExecuteBatch(protocol, params))
      mockedProxy.reply(Done)

      expectNoMsg()
    }

    "send BatchReload message with single protocol" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("tpreparator", mockedProxy.ref, mockedSaver.ref, mockedCache)

      val protocol = "acquisitor_12345protocol"

      batchAction ! BatchReload(protocol)

      // The artifact is fetched locally first, then the proxy is asked to reload.
      mockedSaver.expectMsg(SaveToLocal("initial_dataset", protocol))
      mockedSaver.reply(Done)

      mockedProxy.expectMsg(Reload(protocol))
      mockedProxy.reply(Done)

      expectNoMsg()
    }

    "send BatchReload message with multiple protocol" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("evaluator", mockedProxy.ref, mockedSaver.ref, mockedCache)

      // Comma-separated list: one protocol per artifact to load.
      val protocol = "tpreparator_12345protocol,trainer_12345protocol"

      batchAction ! BatchReload(protocol)

      mockedSaver.expectMsg(SaveToLocal("dataset", "tpreparator_12345protocol"))
      mockedSaver.reply(Done)

      mockedSaver.expectMsg(SaveToLocal("model", "trainer_12345protocol"))
      mockedSaver.reply(Done)

      // The proxy receives the full, unsplit protocol string.
      mockedProxy.expectMsg(Reload(protocol))
      mockedProxy.reply(Done)

      expectNoMsg()
    }

    "send BatchHealthCheck message with OK" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("evaluator", mockedProxy.ref, mockedSaver.ref, mockedCache)

      batchAction ! BatchHealthCheck

      mockedProxy.expectMsg(HealthCheck)
      mockedProxy.reply("OK")

      // The proxy's answer is relayed unchanged to the original sender.
      expectMsg("OK")
    }

    "send BatchHealthCheck message with NOK" in {

      val mockedProxy = TestProbe()
      val mockedSaver = TestProbe()
      val mockedCache = mock[LocalCache[BatchExecution]]

      val batchAction = spawnBatchAction("evaluator", mockedProxy.ref, mockedSaver.ref, mockedCache)

      batchAction ! BatchHealthCheck

      mockedProxy.expectMsg(HealthCheck)
      mockedProxy.reply("NOK")

      // The proxy's answer is relayed unchanged to the original sender.
      expectMsg("NOK")
    }

  }

  /**
    * [[BatchAction]] variant whose collaborators are injected by the test.
    *
    * `preStart` is overridden to assign the proxy, saver and cache fields directly from the
    * constructor arguments, so tests can substitute probes, failing actors and mocked caches.
    *
    * @param actionName        name of the engine action; looked up in `metadata.actionsMap`
    * @param metadata          engine metadata describing the available actions
    * @param _batchActionProxy reference assigned to `batchActionProxy`
    * @param _artifactSaver    reference assigned to `artifactSaver`
    * @param _cache            cache assigned to `cache`
    */
  class MockedBatchAction(actionName: String,
                          metadata: EngineMetadata,
                          _batchActionProxy: ActorRef,
                          _artifactSaver: ActorRef,
                          _cache: LocalCache[BatchExecution]
                         ) extends BatchAction(actionName, metadata) {

    override def preStart(): Unit = {
      engineActionMetadata = metadata.actionsMap(actionName)
      artifactsToLoad = engineActionMetadata.artifactsToLoad.mkString(",")
      batchActionProxy = _batchActionProxy
      artifactSaver = _artifactSaver
      cache = _cache
    }
  }
}