blob: 2164d4583e57419de3eba61f9875b4cbbae2d566 [file] [log] [blame]
/*
* Copyright [2019] [Apache Software Foundation]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.marvin.executor.statemachine
import actions.HealthCheckResponse.Status
import actions.OnlineActionResponse
import akka.actor.ActorSystem
import akka.testkit.{EventFilter, ImplicitSender, TestFSMRef, TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.apache.marvin.exception.MarvinEExecutorException
import org.apache.marvin.executor.actions.OnlineAction.{OnlineExecute, OnlineHealthCheck, OnlineReload}
import org.apache.marvin.executor.proxies.Reloaded
import org.apache.marvin.fixtures.MetadataMock
import org.scalatest.{Matchers, WordSpecLike}
class PredictorFSMTest extends TestKit(
ActorSystem("EngineFSMTest", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")))
with ImplicitSender with WordSpecLike with Matchers {
"engine finite state machine" should {
"start with Unavailable" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.stateName should be (Unavailable)
fsm.stateData should be (NoModel)
}
"go to Reloading when Unavailable and receive Reload" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
val testProtocol = "protocol1234"
fsm ! Reload(testProtocol)
probe.expectMsg(OnlineReload(testProtocol))
fsm.stateName should be (Reloading)
fsm.stateData should be (ToReload(testProtocol))
}
"go to Reloading when Unavailable and receive Reload with no protocol" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm ! Reload()
probe.expectMsg(OnlineReload(""))
fsm.stateName should be (Reloading)
fsm.stateData should be (ToReload(""))
}
"stay unavailable and send a failure when unavailable and receive unknown message" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm ! "any message"
probe.expectNoMsg()
fsm.stateName shouldBe Unavailable
val returnedMessage = expectMsgType[akka.actor.Status.Failure]
returnedMessage.cause shouldBe a[MarvinEExecutorException]
}
"go to Ready when Reloading and receive Reloaded" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Reloading)
fsm ! Reloaded("protocol123")
fsm.stateName should be (Ready)
fsm.stateData should be (Model("protocol123"))
}
"receive failure and go to Unavailable when Reloading" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Reloading)
fsm ! OnlineExecute("test", "test")
probe.expectNoMsg
fsm.stateName should be (Unavailable)
val returnedMessage = expectMsgType[akka.actor.Status.Failure]
returnedMessage.cause shouldBe a[MarvinEExecutorException]
}
"forward the message when Ready" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Ready)
fsm ! OnlineExecute("test", "testMessage")
probe.expectMsg(OnlineExecute("test", "testMessage"))
probe.reply(OnlineActionResponse(message = "testResult"))
expectMsg(OnlineActionResponse(message = "testResult"))
fsm.stateName should be (Ready)
}
"forward the health message when Ready" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Ready)
fsm ! OnlineHealthCheck
probe.expectMsg(OnlineHealthCheck)
probe.reply(Status.OK)
expectMsg(Status.OK)
fsm.stateName should be (Ready)
}
"go to Reloading when Ready" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Ready)
val protocol = "protocol99"
fsm ! Reload(protocol)
probe.expectMsg(OnlineReload(protocol))
fsm.stateName should be (Reloading)
fsm.stateData should be (ToReload(protocol))
}
"go to Reloading when Ready and receive Reload with no protocol" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Ready)
val protocol = null
fsm ! Reload(protocol)
probe.expectMsg(OnlineReload(null))
fsm.stateName should be (Reloading)
fsm.stateData should be (ToReload(null))
}
"stay in the same state and log a warning when unknown event is sent" in {
val probe = TestProbe()
val fsm = TestFSMRef[State, Data, PredictorFSM](new PredictorFSM(probe.ref, MetadataMock.simpleMockedMetadata()))
fsm.setState(Ready)
EventFilter.warning(message = "Received an unknown event unhandled message. The current state/data is Ready/NoModel.", occurrences = 1) intercept {
fsm ! "unhandled message"
}
fsm.stateName should be (Ready)
}
}
}