blob: 9ab75bef32da26b2466d94779030c399d46670c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.executor.mesos.executors
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.executor.common.executors.ProvidersFactory
import org.apache.mesos.Protos._
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
  * Mesos [[Executor]] implementation that runs Amaterasu actions.
  *
  * `registered` deserializes the executor's configuration data and builds the
  * [[ProvidersFactory]]; `launchTask` deserializes each task, looks up a runner for it
  * and executes the task's source asynchronously on the global execution context,
  * reporting state transitions through the driver.
  *
  * `jobId`, `master` and `actionName` are expected to be assigned by the caller before
  * the driver is started (see the companion object's `main`).
  */
class MesosActionsExecutor extends Executor with Logging {

  var master: String = _
  var executorDriver: ExecutorDriver = _
  var sc: SparkContext = _
  var jobId: String = _
  var actionName: String = _
  //  var sparkScalaRunner: SparkScalaRunner = _
  //  var pySparkRunner: PySparkRunner = _
  var notifier: MesosNotifier = _
  var providersFactory: ProvidersFactory = _

  // JSON mapper used to decode the ExecData / TaskData payloads.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  /** Invoked when the executor should terminate; nothing is cleaned up here. */
  override def shutdown(driver: ExecutorDriver): Unit = {
  }

  /**
    * Invoked when a running task should be killed.
    *
    * Cancelling a running action is not implemented; the request is logged instead of
    * throwing `NotImplementedError` into the driver's callback thread.
    */
  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
    log.warn(s"killTask is not supported, ignoring kill request for task: ${taskId.getValue}")
  }

  /** Invoked when the executor is disconnected from the agent; only logged. */
  override def disconnected(driver: ExecutorDriver): Unit = {
    log.warn("Executor disconnected from the Mesos agent")
  }

  /** Invoked when the executor re-registers; keeps a reference to the current driver. */
  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = {
    this.executorDriver = driver
  }

  /**
    * Invoked when the driver reports a fatal error.
    *
    * No task id is available in this callback and `TaskStatus` requires one, so a status
    * update cannot be built here (building it without a task id throws). The error is
    * logged and, if a notifier has already been created, forwarded through it.
    *
    * @param driver  the driver reporting the error
    * @param message the error description supplied by the driver
    */
  override def error(driver: ExecutorDriver, message: String): Unit = {
    log.error(s"Executor driver error: $message")
    // notifier is only assigned in `registered`, so it may still be null here.
    Option(notifier).foreach(_.error("", message))
  }

  /** Invoked when a framework message arrives; messages are not handled, only logged. */
  override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {
    log.debug("Ignoring framework message, framework messages are not handled by this executor")
  }

  /**
    * Invoked once the executor has registered with the agent.
    *
    * Decodes the executor data into an [[ExecData]], creates the notifier and builds the
    * providers factory used later by `launchTask`.
    */
  override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {

    this.executorDriver = driver
    val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])

    // this is used to resolve the spark driver address
    val hostName = slaveInfo.getHostname
    notifier = new MesosNotifier(driver)
    notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
    val outStream = new ByteArrayOutputStream()
    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties")

  }

  /**
    * Invoked when a task is assigned to this executor.
    *
    * Reports TASK_STARTING immediately, then runs the task in a `Future`: the task data
    * is decoded, TASK_RUNNING is reported and the matching runner executes the source.
    * On success TASK_FINISHED is reported; on failure TASK_FAILED is reported and the
    * executor process exits with status 1.
    *
    * @param driver   the driver used to send status updates
    * @param taskInfo the task description; its data is a JSON encoded [[TaskData]]
    */
  override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {

    val taskId = taskInfo.getTaskId

    notifier.info(s"launching task: ${taskId.getValue}")
    log.debug(s"launching task: $taskInfo")

    driver.sendStatusUpdate(statusBuilder(taskId, TaskState.TASK_STARTING).build())

    val task = Future {

      val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])

      driver.sendStatusUpdate(statusBuilder(taskId, TaskState.TASK_RUNNING).build())

      providersFactory.getRunner(taskData.groupId, taskData.typeId) match {
        case Some(r) => r.executeSource(taskData.src, actionName, taskData.exports.asJava)
        case None =>
          // NOTE(review): a missing runner is reported through the notifier only; the
          // task still completes as TASK_FINISHED below. Confirm this is intended.
          notifier.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
          None
      }

    }

    task onComplete {

      case Failure(t) =>
        // getMessage may be null; setMessage rejects null values.
        val reason = Option(t.getMessage).getOrElse(t.toString)
        println(s"launching task failed: ${t.getMessage}")
        log.error(s"launching task failed: $reason", t)
        try {
          // Report a terminal state before exiting so the failure is attributed to the task.
          driver.sendStatusUpdate(statusBuilder(taskId, TaskState.TASK_FAILED).setMessage(reason).build())
        } finally {
          System.exit(1)
        }

      case Success(_) =>
        driver.sendStatusUpdate(statusBuilder(taskId, TaskState.TASK_FINISHED).build())
        notifier.info(s"complete task: ${taskId.getValue}")

    }

  }

  /** Starts a status update for `taskId` in the given `state`. */
  private def statusBuilder(taskId: TaskID, state: TaskState): TaskStatus.Builder =
    TaskStatus.newBuilder().setTaskId(taskId).setState(state)

}
/** Command-line launcher for [[MesosActionsExecutor]]. */
object MesosActionsExecutor extends Logging {

  /**
    * Creates the executor, configures it from the command line and runs the Mesos
    * executor driver until it stops.
    *
    * @param args positional arguments: job id, master, action name
    * @throws IllegalArgumentException if fewer than three arguments are supplied
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 3, "usage: MesosActionsExecutor <jobId> <master> <actionName>")

    // The Mesos Java bindings need the native library loaded before the driver is created.
    System.loadLibrary("mesos")
    log.debug("Starting a new ActionExecutor")

    val executor = new MesosActionsExecutor
    executor.jobId = args(0)
    executor.master = args(1)
    executor.actionName = args(2)

    val driver = new MesosExecutorDriver(executor)
    driver.run()
  }

}