blob: 2587f2963e19faa30b3e5d651b64ca382a83c7e9 [file] [log] [blame]
package org.apache.marvin.executor.manager
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.{Actor, ActorLogging, Address}
import scala.collection.immutable
class ExecutorClusterListener(seedNodes: immutable.Seq[Address]) extends Actor with ActorLogging {
var cluster: Cluster = _
override def preStart(): Unit = {
cluster = Cluster(context.system)
log.info(s"Joining to the cluster ${context.system.name} ...")
cluster.joinSeedNodes(seedNodes)
log.info(s"Subscribing to the cluster ${context.system.name} ...")
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp], classOf[MemberEvent], classOf[UnreachableMember])
log.info(s"Cluster configuration done! :-P")
log.info(s"Cluster Node Address is ${cluster.selfAddress}")
}
override def postStop(): Unit = {
log.info(s"Leaving cluster ${context.system.name} :-( ...")
cluster.unsubscribe(self)
cluster.leave(cluster.selfAddress)
log.info("Left cluster with success!")
}
def receive = {
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", member.address, previousStatus)
case _:MemberEvent =>
log.info("Unknow Message received ...")
}
}