blob: cda635177e72e1b78e718ce7be453db4d6052a78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.executor.yarn.executors
import java.io.ByteArrayOutputStream
import java.net.{InetAddress, URLDecoder}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.common.utils.ActiveNotifier
import org.apache.amaterasu.executor.common.executors.ProvidersFactory
import scala.collection.JavaConverters._
class ActionsExecutor extends Logging {
var master: String = _
var jobId: String = _
var actionName: String = _
var taskData: TaskData = _
var execData: ExecData = _
var providersFactory: ProvidersFactory = _
def execute(): Unit = {
val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
runner match {
case Some(r) => {
try {
r.executeSource(taskData.getSrc, actionName, taskData.getExports)
log.info("Completed action")
System.exit(0)
} catch {
case e: Exception => {
log.error("Exception in execute source", e)
System.exit(100)
}
}
}
case None =>
log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
System.exit(101)
}
}
}
object ActionsExecutorLauncher extends Logging with App {
val hostName = InetAddress.getLocalHost.getHostName
println(s"Hostname resolved to: $hostName")
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
log.info("Starting actions executor")
val jobId = this.args(0)
val master = this.args(1)
val actionName = this.args(2)
val notificationsAddress = this.args(6)
log.info("parsing task data")
val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData])
log.info("parsing executor data")
val execData = mapper.readValue(URLDecoder.decode(this.args(4), "UTF-8"), classOf[ExecData])
val taskIdAndContainerId = this.args(5)
val actionsExecutor: ActionsExecutor = new ActionsExecutor
actionsExecutor.master = master
actionsExecutor.actionName = actionName
actionsExecutor.taskData = taskData
actionsExecutor.execData = execData
log.info("Setup executor")
val baos = new ByteArrayOutputStream()
val notifier = new ActiveNotifier(notificationsAddress)
actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
actionsExecutor.execute()
}