blob: 7dbee7d4a079b31c58d5b74bb32f6ef2b58080ca [file] [log] [blame]
package org.apache.marvin.executor.proxies
import actions.BatchActionHandlerGrpc.BatchActionHandlerBlockingClient
import actions._
import akka.Done
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.apache.marvin.executor.proxies.EngineProxy.{ExecuteBatch, HealthCheck, Reload}
import org.apache.marvin.fixtures.MetadataMock
import org.apache.marvin.model.EngineActionMetadata
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class BatchActionProxyTest extends TestKit(
ActorSystem("BatchActionProxyTest", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")))
with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with MockFactory {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"batch action proxy" should {
"receive ExecuteBatch message" in {
val metadata = MetadataMock.simpleMockedEngineActionMetadata("batch")
val artifacts = metadata.artifactsToLoad.mkString(",")
val client = mock[BatchActionHandlerBlockingClient]
val actor = system.actorOf(Props(new BatchActionProxyMock(metadata, client, artifacts)))
val protocol = "protocol"
val params = "{}"
val message = "check"
(client.RemoteExecute _ ).expects(BatchActionRequest(params)).once().returns(BatchActionResponse(message))
actor ! ExecuteBatch(protocol, params)
expectMsg(Done)
}
"receive OK HealthCheck message" in {
val metadata = MetadataMock.simpleMockedEngineActionMetadata("batch")
val artifacts = metadata.artifactsToLoad.mkString(",")
val client = mock[BatchActionHandlerBlockingClient]
val actor = system.actorOf(Props(new BatchActionProxyMock(metadata, client, artifacts)))
val status = HealthCheckResponse.Status.OK
(client.HealthCheck _ ).expects(HealthCheckRequest(artifacts)).once().returns(HealthCheckResponse(status))
actor ! HealthCheck
expectMsg(status)
}
"receive NOK HealthCheck message" in {
val metadata = MetadataMock.simpleMockedEngineActionMetadata("batch")
val artifacts = metadata.artifactsToLoad.mkString(",")
val client = mock[BatchActionHandlerBlockingClient]
val actor = system.actorOf(Props(new BatchActionProxyMock(metadata, client, artifacts)))
val status = HealthCheckResponse.Status.NOK
(client.HealthCheck _ ).expects(HealthCheckRequest(artifacts)).once().returns(HealthCheckResponse(status))
actor ! HealthCheck
expectMsg(status)
}
"receive Reload message" in {
val metadata = MetadataMock.simpleMockedEngineActionMetadata("batch")
val artifacts = metadata.artifactsToLoad.mkString(",")
val client = mock[BatchActionHandlerBlockingClient]
val actor = system.actorOf(Props(new BatchActionProxyMock(metadata, client, artifacts)))
val protocol = "protocol"
(client.RemoteReload _ ).expects(ReloadRequest(protocol, artifacts)).once().returns(ReloadResponse("Done"))
actor ! Reload(protocol)
expectMsg(Done)
}
"call preStart method wth success" in {
val metadata = MetadataMock.simpleMockedEngineActionMetadata("batch")
try{
system.actorOf(Props(new BatchActionProxy(metadata)))
assert(true)
}catch {
case _ =>
assert(false)
}
}
}
class BatchActionProxyMock(metadata: EngineActionMetadata, _engineClient: BatchActionHandlerBlockingClient, _artifacts: String) extends BatchActionProxy(metadata) {
def _preStart(): Unit = super.preStart()
override def preStart(): Unit = {
engineClient = _engineClient
artifacts = _artifacts
}
}
}