blob: 0e9f1ef77e453adb63ba10dc19ca59cc1c7a8040 [file] [log] [blame]
/*
* Copyright [2019] [Apache Software Foundation]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.marvin.executor.statemachine
import akka.actor.{ActorRef, FSM, Props}
import org.apache.marvin.exception.MarvinEExecutorException
import org.apache.marvin.executor.actions.OnlineAction
import org.apache.marvin.executor.actions.OnlineAction._
import org.apache.marvin.executor.proxies.{FailedToReload, Reloaded}
import org.apache.marvin.model.EngineMetadata
import scala.concurrent.duration._
//receive events
final case class Reload(protocol: String = "")
//states
sealed trait State
case object Unavailable extends State
case object Reloading extends State
case object Ready extends State
sealed trait Data
case object NoModel extends Data
final case class ToReload(protocol: String) extends Data
final case class Model(protocol: String) extends Data
class PredictorFSM(var predictorActor: ActorRef, metadata: EngineMetadata) extends FSM[State, Data]{
def this(metadata: EngineMetadata) = this(null, metadata)
var reloadStateTimeout: FiniteDuration = metadata.reloadStateTimeout.getOrElse(180000D) milliseconds
override def preStart() {
if (predictorActor == null) predictorActor = context.system.actorOf(Props(new OnlineAction("predictor", metadata)), name = "predictorActor")
}
startWith(Unavailable, NoModel)
when(Unavailable) {
case Event(Reload(protocol), _) => {
predictorActor ! OnlineReload(protocol = protocol)
goto(Reloading) using ToReload(protocol)
}
case Event(e, s) => {
log.warning("Engine is unavailable, not possible to perform event {} in state {}/{}", e, stateName, s)
sender ! akka.actor.Status.Failure(new MarvinEExecutorException(
"It's not possible to process the request now, the model is unavailable. Perform a reload and try again."))
stay
}
}
when(Reloading, stateTimeout = reloadStateTimeout) {
case Event(Reloaded(protocol), _) => {
goto(Ready) using Model(protocol)
}
case Event(FailedToReload(protocol), _) => {
if(protocol == null || protocol.isEmpty)
log.warning("No valid model protocol found, " +
"if this is the first start up of the server without any trained model, please ignore the previous IOError from toolbox!")
else
log.error(s"Failed to reload with protocol {$protocol}")
goto(Unavailable)
}
case Event(StateTimeout, _) => {
log.warning("Reloading state timed out.")
goto(Unavailable)
}
case Event(e, s) => {
log.warning("Engine is reloading, not possible to perform event {} in state {}/{}", e, stateName, s)
sender ! akka.actor.Status.Failure(new MarvinEExecutorException(
"It's not possible to process the request now, the model is being reloaded."))
stay
}
}
when(Ready) {
case Event(OnlineExecute(message, params), _) => {
predictorActor forward OnlineExecute(message, params)
stay
}
case Event(Reload(protocol), _) => {
predictorActor ! OnlineReload(protocol = protocol)
goto(Reloading) using ToReload(protocol)
}
case Event(OnlineHealthCheck, _) => {
predictorActor forward OnlineHealthCheck
stay
}
}
whenUnhandled {
case Event(e, s ) =>
log.warning("Received an unknown event {}. The current state/data is {}/{}.", e, stateName, s)
stay
}
onTransition{
case Ready -> Reloading =>
log.info("Received a message to reload the model.")
case Reloading -> Ready =>
log.info("Reloaded the model with success.")
}
initialize()
}