blob: 82c7fe2dd8026e7637175daa3b369ce83f8dcb06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.util
import org.apache.gearpump.cluster.AppMasterContext
import org.apache.gearpump.cluster.worker.WorkerId
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
import org.slf4j.Logger
import akka.util.Timeout
import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers}
import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult}
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems}
import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import org.apache.gearpump.transport.HostPort
import scala.concurrent.duration.Duration
object ActorUtil {
private val LOG: Logger = LogUtil.getLogger(getClass)
def getSystemAddress(system: ActorSystem): Address = {
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
}
def getFullPath(system: ActorSystem, path: ActorPath): String = {
path.toStringWithAddress(getSystemAddress(system))
}
def getHostname(actor: ActorRef): String = {
val path = actor.path
path.address.host.getOrElse("localhost")
}
def defaultMsgHandler(actor: ActorRef): Receive = {
case msg: Any =>
LOG.error(s"Cannot find a matching message, $msg, forwarded from $actor")
}
def printActorSystemTree(system: ActorSystem): Unit = {
val clazz = system.getClass
val m = clazz.getDeclaredMethod("printTree")
m.setAccessible(true)
LOG.info(m.invoke(system).asInstanceOf[String])
}
/** Checks whether a actor is child actor by simply examining name */
// TODO: fix this, we should also check the path to root besides name
def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
if (null != child) {
parent.path.name == child.path.parent.name
} else {
false
}
}
def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" +
executorId
// TODO: Currently we explicitly require the master contacts to be started with this path pattern
// akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
def getMasterActorPath(master: HostPort): ActorPath = {
import org.apache.gearpump.util.Constants.MASTER
ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
}
def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = {
implicit val timeout = Constants.FUTURE_TIMEOUT
(master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
val resources = list.workers.map {
workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
}.toArray
sender ! list
master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
}
}
def tellMasterIfApplicationReady(workerNum: Option[Int], executorSystemNum: Int,
appContext: AppMasterContext): Unit = {
if (workerNum.contains(executorSystemNum)) {
appContext.masterProxy ! ActivateAppMaster(appContext.appId)
}
}
def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext)
: Future[T] = {
implicit val timeout = Constants.FUTURE_TIMEOUT
val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result =>
if (result.appMaster.isSuccess) {
Future.successful(result.appMaster.get)
} else {
Future.failed(result.appMaster.failed.get)
}
}
appmaster.flatMap(askActor[T](_, msg))
}
def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext)
: Future[T] = {
implicit val timeout = Constants.FUTURE_TIMEOUT
val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId))
.flatMap { result =>
if (result.worker.isSuccess) {
Future.successful(result.worker.get)
} else {
Future.failed(result.worker.failed.get)
}
}
worker.flatMap(askActor[T](_, msg))
}
def askActor[T](actor: ActorRef, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
implicit val timeout = Constants.FUTURE_TIMEOUT
(actor ? msg).asInstanceOf[Future[T]]
}
def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = {
askActor(actor, msg, timeout, ActorRef.noSender)
}
def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef)
(implicit ex: ExecutionContext): T = {
Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf)
}
}