YARN container resources are set up using the new SDK API

Ports the YARN leader (Client, ApplicationMaster and argument parsing) to Kotlin in the new leader-yarn module, replaces the commons-cli parser with clikt, and resolves container resources through the FrameworkSetupProvider and RunnerSetupProvider SDK interfaces.
diff --git a/build.gradle b/build.gradle
index dc63d02..7b79da4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
buildscript {
- ext.kotlin_version = '1.3.0'
+ ext.kotlin_version = '1.3.21'
repositories {
mavenCentral()
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 5fca169..31d696c 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -92,7 +92,7 @@
}
- val YARN = new YARN()
+ val yarn = new YARN()
class Spark {
var home: String = ""
@@ -125,7 +125,7 @@
}
}
- object Jobs {
+ class Jobs {
var cpus: Double = 1
var mem: Long = 1024
@@ -137,10 +137,10 @@
if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
- Tasks.load(props)
+ tasks.load(props)
}
- object Tasks {
+ class Tasks {
var attempts: Int = 3
var cpus: Int = 1
@@ -155,6 +155,8 @@
}
}
+ val tasks = new Tasks()
+
}
object AWS {
@@ -214,9 +216,11 @@
Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
- Jobs.load(props)
+ val jobs = new Jobs()
+ jobs.load(props)
+
Webserver.load(props)
- YARN.load(props)
+ yarn.load(props)
spark.load(props)
distLocation match {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 7104e28..5b3e842 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -68,7 +68,7 @@
override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
case "mesos" => Map[String, String]("SPARK_HOME" ->s"spark-${conf.Webserver.sparkVersion}","SPARK_HOME_DOCKER" -> "/opt/spark/")
- case "yarn" => Map[String, String]("SPARK_HOME" -> "spark")
+ case "yarn" => Map[String, String]("SPARK_HOME" -> "spark-${conf.Webserver.sparkVersion}")
case _ => Map[String, String]()
}
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 3499791..7ee53d1 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -79,6 +79,7 @@
runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
compile group: 'com.andreapivetta.kolor', name: 'kolor', version: '0.0.2'
compile group: 'com.beust', name: 'klaxon', version: '5.0.1'
+ compile group: 'com.github.ajalt', name: 'clikt', version: '1.6.0'
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect"
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
new file mode 100644
index 0000000..413f2cd
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.dsl.GitUtil
+import org.apache.amaterasu.leader.common.dsl.JobParser
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+object JobLoader : KLogging() {
+
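+ /**
+ * Loads a new job: stores the source repo and branch under the job's znode,
+ * then clones the repo and parses its maki.yaml into a JobManager.
+ */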
+ fun loadJob(src: String, branch: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+ // creating the jobs znode and storing the source repo and branch
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/repo", src.toByteArray())
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/branch", branch.toByteArray())
+
+ val maki: String = loadMaki(src, branch)
+
+ return createJobManager(maki, jobId, client, attempts, actionsQueue)
+
+ }
+
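+ // builds a JobManager that coordinates the workflow described by the maki definition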
+ fun createJobManager(maki: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+ return JobParser.parse(
+ jobId,
+ maki,
+ actionsQueue,
+ client,
+ attempts
+ )
+ }
+
+ fun loadMaki(src: String, branch: String): String {
+
+ // cloning the git repo
+ log.debug("getting repo: $src, for branch $branch")
+ GitUtil.cloneRepo(src, branch)
+
+ // parsing the maki.yaml and creating a JobManager to
+ // coordinate the workflow based on the file
+ val maki = JobParser.loadMakiFile()
+ return maki
+ }
+
+ fun reloadJob(jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+ val src = String(client.data.forPath("/$jobId/repo"))
+ val branch = String(client.data.forPath("/$jobId/branch"))
+
+ val maki: String = loadMaki(src, branch)
+
+ val jobManager: JobManager = createJobManager(maki, jobId, client, attempts, actionsQueue)
+ restoreJobState(jobManager, jobId, client)
+
+ jobManager.start()
+ return jobManager
+ }
+
+ fun restoreJobState(jobManager: JobManager, jobId: String, client: CuratorFramework) {
+
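+ // re-queue every action that was still queued or started when the previous attempt stopped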
+ val tasks = client.children.forPath("/$jobId").filter { it.startsWith("task") }
+
+ for (task in tasks) {
+
+ val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
+ if (status == ActionStatus.Queued || status == ActionStatus.Started) {
+ jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+ }
+
+ }
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
new file mode 100644
index 0000000..cf3e10d
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.frameworks
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.reflections.Reflections
+
+class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KLogging() {
+
+ var providers: Map<String, FrameworkSetupProvider>
+
+ init {
+ // scan the classpath for FrameworkSetupProvider implementations and
+ // initialize one provider per framework group
+ val reflections = Reflections(this.javaClass.classLoader)
+ val runnerTypes = reflections.getSubTypesOf(FrameworkSetupProvider::class.java)
+
+ providers = runnerTypes.map {
+ val provider = it.newInstance()
+
+ provider.init(env, config)
+ log.info("a provider for group ${provider.groupIdentifier} was created")
+
+ provider.groupIdentifier to provider
+
+ }.toMap()
+ }
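+
+ // lookup helpers used by the YARN leader; an assumed shape mirroring the
+ // Scala factory this class replaces
+ fun groups(): Set<String> = providers.keys
+
+ fun getFramework(groupId: String): FrameworkSetupProvider = providers.getValue(groupId)
+
+ companion object {
+ @JvmStatic
+ fun apply(env: String, config: ClusterConfig): FrameworkProvidersFactory = FrameworkProvidersFactory(env, config)
+ }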
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
new file mode 100644
index 0000000..132bd77
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+data class AmaOpts(
+ var repo: String = "",
+ var branch: String = "master",
+ var env: String = "default",
+ var name: String = "amaterasu-job",
+ var jobId: String = "",
+ var newJobId: String = "",
+ var report: String = "code",
+ var home: String = "") {
+
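+ // newJobId is omitted from the command string on purpose: it is generated
+ // by the YARN client and appended separately when the AM is launched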
+ fun toCmdString(): String {
+
+ var cmd = " --repo $repo --branch $branch --env $env --name $name --report $report --home $home"
+ if (jobId.isNotEmpty()) {
+ cmd += " --job-id $jobId"
+ }
+ return cmd
+ }
+
+ override fun toString(): String {
+ return toCmdString()
+ }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
new file mode 100644
index 0000000..330ccbc
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.prompt
+
+abstract class ArgsParser : CliktCommand() {
+
+ private val repo: String by option(help = "The git repo containing the job").prompt("Please provide an Amaterasu Repo")
+ private val branch: String by option(help = "The branch to be executed (default is master)").default("master")
+ private val env: String by option(help = "The environment to be executed (test, prod, etc. values from the default env are taken if no env is specified)").default("default")
+ private val name: String by option(help = "The name of the job").default("amaterasu-job")
+ private val jobId: String by option("--job-id", help = "The jobId - should be passed only when resuming a job").default("")
+ private val newJobId: String by option("--new-job-id", help = "The jobId - should never be passed by a user").default("")
+ private val report: String by option(help = "The level of reporting").default("code")
+ private val home: String by option(help = "").default("")
+
+ // opts must be evaluated lazily: clikt option delegates are only populated
+ // after parsing, so reading them at construction time would fail
+ val opts: AmaOpts
+ get() = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+}
\ No newline at end of file
diff --git a/leader-yarn/build.gradle b/leader-yarn/build.gradle
index 5f05943..f44a7e8 100644
--- a/leader-yarn/build.gradle
+++ b/leader-yarn/build.gradle
@@ -14,6 +14,84 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+buildscript {
+ repositories {
+ mavenCentral()
+ maven {
+ url 'http://repository.jetbrains.com/all'
+ }
+ maven {
+ url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+ }
+ }
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+ }
+}
+plugins {
+ id "com.github.johnrengelman.shadow" version "2.0.4"
+ id 'scala'
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+ filters {
+ engines {
+ include 'spek'
+ }
+ }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+repositories {
+ maven { url "https://plugins.gradle.org/m2/" }
+ maven { url 'http://repository.jetbrains.com/all' }
+ maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+ maven { url "http://dl.bintray.com/jetbrains/spek" }
+ maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+ mavenCentral()
+ jcenter()
+}
+
+dependencies {
+ compile project(':leader-common')
+ compile project(':amaterasu-sdk')
+
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
+ compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.1.1'
+
+}
+
+task copyToHomeRoot(type: Copy) {
+ from 'src/main/scripts'
+ into '../build/amaterasu/'
+}
+
+task copyToHomeBin(type: Copy) {
+ dependsOn shadowJar
+ from 'build/libs'
+ into '../build/amaterasu/bin'
+}
+
+task copyToHome() {
+ dependsOn copyToHomeRoot
+ dependsOn copyToHomeBin
+}
+
+compileKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
similarity index 60%
copy from leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
copy to leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
index b8c29b7..6af59d8 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
@@ -14,15 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.yarn;
+package org.apache.amaterasu.leader.yarn
-public class JobOpts {
- public String repo = "";
- public String branch = "master";
- public String env = "default";
- public String name = "amaterasu-job";
- public String jobId = null;
- public String newJobId = null;
- public String report ="code";
- public String home ="";
-}
\ No newline at end of file
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+
+class AppMasterArgsParser(val args: Array<String>): ArgsParser() {
+
+
+ override fun run() {
+
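+ // start the embedded ActiveMQ broker before executing the job so that
+ // containers can connect back and report status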
+ val appMaster = ApplicationMaster()
+ appMaster.address = MessagingClientUtil.borkerAddress
+ appMaster.broker.addConnector(appMaster.address)
+ appMaster.broker.start()
+
+ appMaster.execute(opts)
+ }
+}
+
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
new file mode 100644
index 0000000..92e5858
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.JobLoader
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.zookeeper.CreateMode
+import java.io.File
+import java.io.FileInputStream
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.LinkedBlockingQueue
+import javax.jms.MessageConsumer
+
+import kotlinx.coroutines.*
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import java.nio.ByteBuffer
+
+class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
+
+ override fun onNodesUpdated(updatedNodes: MutableList<NodeReport>?) {
+ TODO("not implemented")
+ }
+
+ override fun onShutdownRequest() {
+ TODO("not implemented")
+ }
+
+ // the RM polls progress on every heartbeat, so this must not throw
+ override fun getProgress(): Float = 0.0f
+
+ override fun onError(e: Throwable?) {
+ log.error("Error on AMRM callback", e)
+ }
+
+ override fun onContainersCompleted(statuses: MutableList<ContainerStatus>?) {
+ TODO("not implemented")
+ }
+
+ lateinit var address: String
+
+ val broker: BrokerService = BrokerService()
+ private val conf = YarnConfiguration()
+ private val fs = FileSystem.get(conf)
+ private val propPath = System.getenv("PWD") + "/amaterasu.properties"
+ private val props = FileInputStream(File(propPath))
+ private val config = ClusterConfig.apply(props)
+ private val nmClient: NMClientAsync = NMClientAsyncImpl(YarnNMCallbackHandler())
+ private val actionsBuffer = ConcurrentLinkedQueue<ActionData>()
+
+ private lateinit var jobManager: JobManager
+ private lateinit var client: CuratorFramework
+ private lateinit var env: String
+ private lateinit var consumer: MessageConsumer
+ private lateinit var rmClient: AMRMClientAsync<AMRMClient.ContainerRequest>
+
+ fun execute(opts: AmaOpts) {
+
+ try {
+ initJob(opts)
+ } catch (e: Exception) {
+ log.error("error initializing ", e.message)
+ }
+
+ // now that the job was initiated, the curator client is Started and we can
+ // register the broker's address
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/${jobManager.jobId}/broker")
+ client.setData().forPath("/${jobManager.jobId}/broker", address.toByteArray())
+
+ // once the broker is registered, we can remove the barrier so clients can connect
+ log.info("/${jobManager.jobId}-report-barrier")
+ val barrier = DistributedBarrier(client, "/${jobManager.jobId}-report-barrier")
+ barrier.removeBarrier()
+
+ consumer = MessagingClientUtil.setupMessaging(address)
+
+ // Initialize clients to ResourceManager and NodeManagers
+ nmClient.init(conf)
+ nmClient.start()
+
+ rmClient = startRMClient()
+
+ val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
+ val maxMem = registrationResponse.maximumResourceCapability.memory
+ val maxVCores = registrationResponse.maximumResourceCapability.virtualCores
+
+
+ val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+
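+ // main scheduling loop: for each action the JobManager yields, size a
+ // container to the framework's driver configuration and request it from the RM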
+ while (!jobManager.outOfActions) {
+ val capability = Records.newRecord(Resource::class.java)
+
+ val actionData = jobManager.nextActionData
+ if (actionData != null) {
+
+ val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
+ val driverConfiguration = frameworkProvider.driverConfiguration
+
+ var mem: Int = driverConfiguration.memory
+ mem = Math.min(mem, maxMem)
+ capability.memory = mem
+
+ var cpu = driverConfiguration.cpus
+ cpu = Math.min(cpu, maxVCores)
+ capability.virtualCores = cpu
+
+ requestContainer(actionData, capability)
+ }
+ }
+ }
+
+ private fun initJob(opts: AmaOpts) {
+
+ this.env = opts.env
+
+ try {
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ client = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
+ client.start()
+ } catch (e: Exception) {
+ log.error("Error connecting to zookeeper", e)
+ throw e
+ }
+
+ if (opts.jobId.isNotEmpty()) {
+ log.info("resuming job" + opts.jobId)
+ jobManager = JobLoader.reloadJob(
+ opts.jobId,
+ client,
+ config.Jobs().tasks().attempts(),
+ LinkedBlockingQueue<ActionData>())
+
+ } else {
+ log.info("new job is being created")
+ try {
+
+ jobManager = JobLoader.loadJob(
+ opts.repo,
+ opts.branch,
+ opts.newJobId,
+ client,
+ config.Jobs().tasks().attempts(),
+ LinkedBlockingQueue<ActionData>())
+ } catch (e: Exception) {
+ log.error("Error creating JobManager.", e)
+ throw e
+ }
+
+ }
+
+ jobManager.start()
+ log.info("Started jobManager")
+ }
+
+ override fun onContainersAllocated(containers: MutableList<Container>?) = runBlocking {
+ if (containers == null) return@runBlocking
+ for (container in containers) {
+ if (actionsBuffer.isNotEmpty()) {
+ val actionData = actionsBuffer.poll()
+ // set up and start the container concurrently; the NM callback
+ // handler tracks it from here on
+ launch {
+ val frameworkFactory = FrameworkProvidersFactory(env, config)
+ val framework = frameworkFactory.getFramework(actionData.groupId)
+ val runnerProvider = framework.getRunnerProvider(actionData.typeId)
+ val ctx = Records.newRecord(ContainerLaunchContext::class.java)
+ val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address))
+
+ ctx.commands = commands
+ ctx.tokens = allTokens()
+ ctx.localResources = setupContainerResources(framework, runnerProvider)
+
+ nmClient.startContainerAsync(container, ctx)
+ }
+ }
+ }
+ }
+
+ private fun allTokens(): ByteBuffer {
+ // creating the credentials for container execution
+ val credentials = UserGroupInformation.getCurrentUser().credentials
+ val dob = DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+
+ // removing the AM->RM token so that containers cannot access it.
+ val iter = credentials.allTokens.iterator()
+ log.info("Executing with tokens:")
+ for (token in iter) {
+ log.info(token.toString())
+ if (token.kind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+ }
+ return ByteBuffer.wrap(dob.data, 0, dob.length)
+ }
+
+ /**
+ * Creates the map of resources to be copied into the container
+ * @param framework the FrameworkSetupProvider for the action
+ * @param runnerProvider the action's runner provider
+ */
+ private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider): Map<String, LocalResource> {
+
+ val yarnJarPath = Path(config.yarn().hdfsJarsPath())
+
+ // Getting framework (group) resources
+ val result = framework.groupResources.map { it.path to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it.path))) }.toMap().toMutableMap()
+
+ // Getting runner resources
+ result.putAll(runnerProvider.runnerResources.map { it to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }.toMap())
+
+ // Adding the Amaterasu configuration files
+ result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
+ result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))
+
+ return result
+ }
+
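+ // prefixes a resource path with the dist/ folder used under the HDFS jars path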
+ private fun createDistPath(path: String): Path = Path.mergePaths(Path("dist/"), Path(path))
+
+ private fun startRMClient(): AMRMClientAsync<AMRMClient.ContainerRequest> {
+ val client = AMRMClientAsync.createAMRMClientAsync<AMRMClient.ContainerRequest>(1000, this)
+ client.init(conf)
+ client.start()
+ return client
+ }
+
+ private fun createLocalResourceFromPath(path: Path): LocalResource {
+
+ val stat = fs.getFileStatus(path)
+ val fileResource = Records.newRecord(LocalResource::class.java)
+
+ fileResource.shouldBeUploadedToSharedCache = true
+ fileResource.visibility = LocalResourceVisibility.PUBLIC
+ fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+ fileResource.size = stat.len
+ fileResource.timestamp = stat.modificationTime
+ fileResource.type = LocalResourceType.FILE
+ return fileResource
+
+ }
+
+ private fun requestContainer(actionData: ActionData, capability: Resource) {
+
+ actionsBuffer.add(actionData)
+ log.info("About to ask container for action ${actionData.id}. Action buffer size is: ${actionsBuffer.size}")
+
+ // we have an action to schedule, let's request a container
+ val priority: Priority = Records.newRecord(Priority::class.java)
+ priority.priority = 1
+ val containerReq = AMRMClient.ContainerRequest(capability, null, null, priority)
+ rmClient.addContainerRequest(containerReq)
+ log.info("Asked container for action ${actionData.id}")
+
+ }
+
+ companion object {
+ @JvmStatic
+ fun main(args: Array<String>) = AppMasterArgsParser(args).main(args)
+
+ }
+}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
new file mode 100644
index 0000000..48fb67b
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.hadoop.fs.*
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.YarnClientApplication
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Apps
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.log4j.LogManager
+import org.slf4j.LoggerFactory
+
+import javax.jms.*
+import java.io.File
+import java.io.FileInputStream
+import java.io.IOException
+import java.util.*
+
+import java.lang.System.exit
+
+class Client {
+ private val conf = YarnConfiguration()
+ private var fs: FileSystem? = null
+
+ @Throws(IOException::class)
+ private fun setLocalResourceFromPath(path: Path): LocalResource {
+
+ val stat = fs!!.getFileStatus(path)
+ val fileResource = Records.newRecord(LocalResource::class.java)
+ fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+ fileResource.size = stat.len
+ fileResource.timestamp = stat.modificationTime
+ fileResource.type = LocalResourceType.FILE
+ fileResource.visibility = LocalResourceVisibility.PUBLIC
+ return fileResource
+ }
+
+ @Throws(Exception::class)
+ fun run(opts: AmaOpts, args: Array<String>) {
+
+ LogManager.resetConfiguration()
+ val config = ClusterConfig()
+ config.load(FileInputStream(opts.home + "/amaterasu.properties"))
+
+ // Create yarnClient
+ val yarnClient = YarnClient.createYarnClient()
+ yarnClient.init(conf)
+ yarnClient.start()
+
+ // Create application via yarnClient
+ var app: YarnClientApplication? = null
+ try {
+ app = yarnClient.createApplication()
+ } catch (e: YarnException) {
+ LOGGER.error("Error initializing yarn application with yarn client.", e)
+ exit(1)
+ } catch (e: IOException) {
+ LOGGER.error("Error initializing yarn application with yarn client.", e)
+ exit(2)
+ }
+
+ // Setup jars on hdfs
+ try {
+ fs = FileSystem.get(conf)
+ } catch (e: IOException) {
+ LOGGER.error("Eror creating HDFS client isntance.", e)
+ exit(3)
+ }
+
+ val jarPath = Path(config.yarn().hdfsJarsPath())
+ val jarPathQualified = fs!!.makeQualified(jarPath)
+
+ val appContext = app!!.applicationSubmissionContext
+
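+ // a brand-new job derives its id from the YARN application id; a resumed
+ // job keeps the id passed via --job-id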
+ var newId = ""
+ if (opts.jobId.isEmpty()) {
+ newId = "--new-job-id " + appContext.applicationId.toString() + "-" + UUID.randomUUID().toString()
+ }
+
+
+ val commands = listOf("env AMA_NODE=" + System.getenv("AMA_NODE") +
+ " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().userName +
+ " \$JAVA_HOME/bin/java" +
+ " -Dscala.usejavacp=false" +
+ " -Xmx2G" +
+ " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
+ joinStrings(args) +
+ newId +
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+
+ // Set up the container launch context for the application master
+ val amContainer = Records.newRecord(ContainerLaunchContext::class.java)
+ amContainer.commands = commands
+
+ // Setup local ama folder on hdfs.
+ try {
+
+ println("===> $jarPathQualified")
+ if (!fs!!.exists(jarPathQualified)) {
+ val home = File(opts.home)
+ fs!!.mkdirs(jarPathQualified)
+
+ for (f in home.listFiles()) {
+ fs!!.copyFromLocalFile(false, true, Path(f.getAbsolutePath()), jarPathQualified)
+ }
+
+ // setup frameworks
+ println("===> setting up frameworks")
+ val frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config)
+ for (group in frameworkFactory.groups()) {
+ println("===> setting up $group")
+ val framework = frameworkFactory.getFramework(group)
+
+ //creating a group folder
+ val frameworkPath = Path.mergePaths(jarPathQualified, Path("/" + framework.groupIdentifier))
+ println("===> $frameworkPath")
+
+ fs!!.mkdirs(frameworkPath)
+ for (file in framework.groupResources) {
+ // the destination is the framework's group folder (matches the
+ // Java client this file replaces)
+ if (file.exists())
+ fs!!.copyFromLocalFile(false, true, Path(file.absolutePath), frameworkPath)
+ }
+ }
+ }
+
+ } catch (e: IOException) {
+ println("===>" + e.message)
+ LOGGER.error("Error uploading ama folder to HDFS.", e)
+ exit(3)
+ } catch (ne: NullPointerException) {
+ println("===>" + ne.message)
+ LOGGER.error("No files in home dir.", ne)
+ exit(4)
+ }
+
+ // get version of build
+ val version = config.version()
+
+ // get local resources pointers that will be set on the master container env
+ val leaderJarPath = String.format("/bin/leader-%s-all.jar", version)
+ LOGGER.info("Leader Jar path is: {}", leaderJarPath)
+ val mergedPath = Path.mergePaths(jarPath, Path(leaderJarPath))
+
+ // System.out.println("===> path: " + jarPathQualified);
+ LOGGER.info("Leader merged jar path is: {}", mergedPath)
+ var leaderJar: LocalResource? = null
+ var propFile: LocalResource? = null
+ var log4jPropFile: LocalResource? = null
+
+ try {
+ leaderJar = setLocalResourceFromPath(mergedPath)
+ propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/amaterasu.properties")))
+ log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/log4j.properties")))
+ } catch (e: IOException) {
+ LOGGER.error("Error initializing yarn local resources.", e)
+ exit(4)
+ }
+
+ // set local resource on master container
+ val localResources = HashMap<String, LocalResource>()
+ //localResources.put("leader.jar", leaderJar);
+ // making the bin folder's content available to the appMaster
+ val bin = fs!!.listFiles(Path.mergePaths(jarPath, Path("/bin")), true)
+
+ while (bin.hasNext()) {
+ val binFile = bin.next()
+ localResources[binFile.path.name] = setLocalResourceFromPath(binFile.path)
+ }
+
+ localResources["amaterasu.properties"] = propFile!!
+ localResources["log4j.properties"] = log4jPropFile!!
+ amContainer.localResources = localResources
+
+ // Setup CLASSPATH for ApplicationMaster
+ val appMasterEnv = HashMap<String, String>()
+ setupAppMasterEnv(appMasterEnv)
+ appMasterEnv["AMA_CONF_PATH"] = String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath())
+ amContainer.environment = appMasterEnv
+
+ // Set up resource type requirements for ApplicationMaster
+ val capability = Records.newRecord(Resource::class.java)
+ capability.memory = config.yarn().master().memoryMB()
+ capability.virtualCores = config.yarn().master().cores()
+
+ // Finally, set-up ApplicationSubmissionContext for the application
+ appContext.applicationName = "amaterasu-" + opts.name
+ appContext.amContainerSpec = amContainer
+ appContext.resource = capability
+ appContext.queue = config.yarn().queue()
+ appContext.priority = Priority.newInstance(1)
+
+ // Submit application
+ val appId = appContext.applicationId
+ LOGGER.info("Submitting application {}", appId)
+ try {
+ yarnClient.submitApplication(appContext)
+
+ } catch (e: YarnException) {
+ LOGGER.error("Error submitting application.", e)
+ exit(6)
+ } catch (e: IOException) {
+ LOGGER.error("Error submitting application.", e)
+ exit(7)
+ }
+
+ val client = CuratorFrameworkFactory.newClient(config.zk(),
+ ExponentialBackoffRetry(1000, 3))
+ client.start()
+
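+ // block behind the report barrier until the ApplicationMaster registers
+ // its broker address under /<jobId>/broker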
+ val newJobId = newId.replace("--new-job-id ", "")
+ println("===> /$newJobId-report-barrier")
+ val reportBarrier = DistributedBarrier(client, "/$newJobId-report-barrier")
+ reportBarrier.setBarrier()
+ reportBarrier.waitOnBarrier()
+
+ val address = String(client.data.forPath("/$newJobId/broker"))
+ println("===> $address")
+ setupReportListener(address)
+
+ var appReport: ApplicationReport? = null
+ var appState: YarnApplicationState
+
+ do {
+ try {
+ appReport = yarnClient.getApplicationReport(appId)
+ } catch (e: YarnException) {
+ LOGGER.error("Error getting application report.", e)
+ exit(8)
+ } catch (e: IOException) {
+ LOGGER.error("Error getting application report.", e)
+ exit(9)
+ }
+
+ appState = appReport!!.yarnApplicationState
+ if (isAppFinished(appState)) {
+ exit(0)
+ break
+ }
+ //LOGGER.info("Application not finished ({})", appReport.getProgress());
+ try {
+ Thread.sleep(100)
+ } catch (e: InterruptedException) {
+ LOGGER.error("Interrupted while waiting for job completion.", e)
+ exit(137)
+ }
+
+ } while (!isAppFinished(appState))
+
+ LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport!!.finalApplicationStatus, appReport.finishTime)
+ }
+
+ private fun isAppFinished(appState: YarnApplicationState): Boolean {
+ return appState == YarnApplicationState.FINISHED ||
+ appState == YarnApplicationState.KILLED ||
+ appState == YarnApplicationState.FAILED
+ }
+
+ @Throws(JMSException::class)
+ private fun setupReportListener(address: String) {
+
+ val cf = ActiveMQConnectionFactory(address)
+ val conn = cf.createConnection()
+ conn.start()
+
+ val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ //TODO: move to a const in common
+ val destination = session.createTopic("JOB.REPORT")
+
+ val consumer = session.createConsumer(destination)
+ consumer.messageListener = ActiveReportListener()
+
+ }
+
+ private fun setupAppMasterEnv(appMasterEnv: Map<String, String>) {
+ Apps.addToEnvironment(appMasterEnv,
+ ApplicationConstants.Environment.CLASSPATH.name,
+ ApplicationConstants.Environment.PWD.`$`() + File.separator + "*", File.pathSeparator)
+
+ for (c in conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ *YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name,
+ c.trim { it <= ' ' }, File.pathSeparator)
+ }
+ }
+
+ companion object {
+
+ private val LOGGER = LoggerFactory.getLogger(Client::class.java)
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) = ClientArgsParser(args).main(args)
+
+ private fun joinStrings(str: Array<String>): String {
+
+ val builder = StringBuilder()
+ for (s in str) {
+ builder.append(s)
+ builder.append(" ")
+ }
+ return builder.toString()
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
similarity index 69%
rename from leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
rename to leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
index b8c29b7..df3aed9 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.yarn;
+package org.apache.amaterasu.leader.yarn
-public class JobOpts {
- public String repo = "";
- public String branch = "master";
- public String env = "default";
- public String name = "amaterasu-job";
- public String jobId = null;
- public String newJobId = null;
- public String report ="code";
- public String home ="";
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+
+class ClientArgsParser(val args: Array<String>): ArgsParser() {
+
+ override fun run() {
+ val client = Client()
+ client.run(opts, args)
+ }
}
\ No newline at end of file
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
new file mode 100644
index 0000000..0702f01
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.amaterasu.common.logging.KLogging
+
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+
+
+
+class YarnNMCallbackHandler : KLogging() , NMClientAsync.CallbackHandler {
+
+ override fun onStartContainerError(containerId: ContainerId, t: Throwable) {
+ log.error("Container ${containerId.containerId} couldn't start.", t)
+ }
+
+ override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) {
+ log.error("Couldn't get status from container ${containerId.containerId}.", t)
+ }
+
+ override fun onContainerStatusReceived(containerId: ContainerId, containerStatus: ContainerStatus) {
+ log.info("Container ${containerId.containerId} has status of ${containerStatus.state}")
+ }
+
+ override fun onContainerStarted(containerId: ContainerId, allServiceResponse: Map<String, ByteBuffer>) {
+ log.info("Container ${containerId.containerId} Started")
+ }
+
+ override fun onStopContainerError(containerId: ContainerId, t: Throwable) {
+ log.error("Container ${containerId.containerId} has thrown an error", t)
+ }
+
+ override fun onContainerStopped(containerId: ContainerId) {
+ log.info("Container ${containerId.containerId} stopped")
+ }
+
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
deleted file mode 100644
index 38a9c38..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-public class ArgsParser {
- private static Options getOptions() {
-
- Options options = new Options();
- options.addOption("r", "repo", true, "The git repo containing the job");
- options.addOption("b", "branch", true, "The branch to be executed (default is master)");
- options.addOption("e", "env", true, "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)");
- options.addOption("n", "name", true, "The name of the job");
- options.addOption("i", "job-id", true, "The jobId - should be passed only when resuming a job");
- options.addOption("j", "new-job-id", true, "The jobId - should never be passed by a user");
- options.addOption("r", "report", true, "The level of reporting");
- options.addOption("h", "home", true, "The level of reporting");
-
- return options;
- }
-
- public static JobOpts getJobOpts(String[] args) throws ParseException {
-
- CommandLineParser parser = new BasicParser();
- Options options = getOptions();
- CommandLine cli = parser.parse(options, args);
-
- JobOpts opts = new JobOpts();
- if (cli.hasOption("repo")) {
- opts.repo = cli.getOptionValue("repo");
- }
-
- if (cli.hasOption("branch")) {
- opts.branch = cli.getOptionValue("branch");
- }
-
- if (cli.hasOption("env")) {
- opts.env = cli.getOptionValue("env");
- }
-
- if (cli.hasOption("job-id")) {
- opts.jobId = cli.getOptionValue("job-id");
- }
- if (cli.hasOption("new-job-id")) {
- opts.newJobId = cli.getOptionValue("new-job-id");
- }
-
- if (cli.hasOption("report")) {
- opts.report = cli.getOptionValue("report");
- }
-
- if (cli.hasOption("home")) {
- opts.home = cli.getOptionValue("home");
- }
-
- if (cli.hasOption("name")) {
- opts.name = cli.getOptionValue("name");
- }
-
- return opts;
- }
-}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
deleted file mode 100644
index 3b0e8c2..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.amaterasu.common.configuration.ClusterConfig;
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory;
-import org.apache.amaterasu.leader.common.utilities.ActiveReportListener;
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.LogManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.*;
-
-import static java.lang.System.exit;
-
-public class Client {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
- private final Configuration conf = new YarnConfiguration();
- private FileSystem fs;
-
- private LocalResource setLocalResourceFromPath(Path path) throws IOException {
-
- FileStatus stat = fs.getFileStatus(path);
- LocalResource fileResource = Records.newRecord(LocalResource.class);
- fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
- fileResource.setSize(stat.getLen());
- fileResource.setTimestamp(stat.getModificationTime());
- fileResource.setType(LocalResourceType.FILE);
- fileResource.setVisibility(LocalResourceVisibility.PUBLIC);
- return fileResource;
- }
-
- private void run(JobOpts opts, String[] args) throws Exception {
-
- LogManager.resetConfiguration();
- ClusterConfig config = new ClusterConfig();
- config.load(new FileInputStream(opts.home + "/amaterasu.properties"));
-
- // Create yarnClient
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
-
- // Create application via yarnClient
- YarnClientApplication app = null;
- try {
- app = yarnClient.createApplication();
- } catch (YarnException e) {
- LOGGER.error("Error initializing yarn application with yarn client.", e);
- exit(1);
- } catch (IOException e) {
- LOGGER.error("Error initializing yarn application with yarn client.", e);
- exit(2);
- }
-
- // Setup jars on hdfs
- try {
- fs = FileSystem.get(conf);
- } catch (IOException e) {
- LOGGER.error("Eror creating HDFS client isntance.", e);
- exit(3);
- }
- Path jarPath = new Path(config.YARN().hdfsJarsPath());
- Path jarPathQualified = fs.makeQualified(jarPath);
-
- ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-
- String newId = "";
- if (opts.jobId == null) {
- newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString();
- }
-
-
- List<String> commands = Collections.singletonList(
- "env AMA_NODE=" + System.getenv("AMA_NODE") +
- " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
- " $JAVA_HOME/bin/java" +
- " -Dscala.usejavacp=false" +
- " -Xmx2G" +
- " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
- joinStrings(args) +
- newId +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
- );
-
-
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- amContainer.setCommands(commands);
-
- // Setup local ama folder on hdfs.
- try {
-
- System.out.println("===> " + jarPathQualified);
- if (!fs.exists(jarPathQualified)) {
- File home = new File(opts.home);
- fs.mkdirs(jarPathQualified);
-
- for (File f : home.listFiles()) {
- fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
- }
-
- // setup frameworks
- System.out.println("===> setting up frameworks");
- FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
- for (String group : frameworkFactory.groups()) {
- System.out.println("===> setting up " + group);
- FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
-
- //creating a group folder
- Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier()));
- System.out.println("===> " + frameworkPath.toString());
-
- fs.mkdirs(frameworkPath);
- for (File file : framework.getGroupResources()) {
- if (file.exists())
- fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath);
- }
- }
- }
- } catch (IOException e) {
- System.out.println("===>" + e.getMessage());
- LOGGER.error("Error uploading ama folder to HDFS.", e);
- exit(3);
- } catch (NullPointerException ne) {
- System.out.println("===>" + ne.getMessage());
- LOGGER.error("No files in home dir.", ne);
- exit(4);
- }
-
- // get version of build
- String version = config.version();
-
- // get local resources pointers that will be set on the master container env
- String leaderJarPath = String.format("/bin/leader-%s-all.jar", version);
- LOGGER.info("Leader Jar path is: {}", leaderJarPath);
- Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
-
- // System.out.println("===> path: " + jarPathQualified);
- LOGGER.info("Leader merged jar path is: {}", mergedPath);
- LocalResource leaderJar = null;
- LocalResource propFile = null;
- LocalResource log4jPropFile = null;
-
- try {
- leaderJar = setLocalResourceFromPath(mergedPath);
- propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")));
- log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")));
- } catch (IOException e) {
- LOGGER.error("Error initializing yarn local resources.", e);
- exit(4);
- }
-
- // set local resource on master container
- Map<String, LocalResource> localResources = new HashMap<>();
- //localResources.put("leader.jar", leaderJar);
- // making the bin folder's content available to the appMaster
- RemoteIterator<LocatedFileStatus> bin = fs.listFiles(Path.mergePaths(jarPath, new Path("/bin")), true);
-
- while (bin.hasNext()){
- LocatedFileStatus binFile = bin.next();
- localResources.put(binFile.getPath().getName(), setLocalResourceFromPath(binFile.getPath()));
- }
-
- localResources.put("amaterasu.properties", propFile);
- localResources.put("log4j.properties", log4jPropFile);
- amContainer.setLocalResources(localResources);
-
- // Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<>();
- setupAppMasterEnv(appMasterEnv);
- appMasterEnv.put("AMA_CONF_PATH", String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath()));
- amContainer.setEnvironment(appMasterEnv);
-
- // Set up resource type requirements for ApplicationMaster
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(config.YARN().master().memoryMB());
- capability.setVirtualCores(config.YARN().master().cores());
-
- // Finally, set-up ApplicationSubmissionContext for the application
- appContext.setApplicationName("amaterasu-" + opts.name);
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(capability);
- appContext.setQueue(config.YARN().queue());
- appContext.setPriority(Priority.newInstance(1));
-
- // Submit application
- ApplicationId appId = appContext.getApplicationId();
- LOGGER.info("Submitting application {}", appId);
- try {
- yarnClient.submitApplication(appContext);
-
- } catch (YarnException e) {
- LOGGER.error("Error submitting application.", e);
- exit(6);
- } catch (IOException e) {
- LOGGER.error("Error submitting application.", e);
- exit(7);
- }
-
- CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(),
- new ExponentialBackoffRetry(1000, 3));
- client.start();
-
- String newJobId = newId.replace("--new-job-id ", "");
- System.out.println("===> /" + newJobId + "-report-barrier");
- DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier");
- reportBarrier.setBarrier();
- reportBarrier.waitOnBarrier();
-
- String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
- System.out.println("===> " + address);
- setupReportListener(address);
-
- ApplicationReport appReport = null;
- YarnApplicationState appState;
-
- do {
- try {
- appReport = yarnClient.getApplicationReport(appId);
- } catch (YarnException e) {
- LOGGER.error("Error getting application report.", e);
- exit(8);
- } catch (IOException e) {
- LOGGER.error("Error getting application report.", e);
- exit(9);
- }
- appState = appReport.getYarnApplicationState();
- if (isAppFinished(appState)) {
- exit(0);
- break;
- }
- //LOGGER.info("Application not finished ({})", appReport.getProgress());
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted while waiting for job completion.", e);
- exit(137);
- }
- } while (!isAppFinished(appState));
-
- LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
- }
-
- private boolean isAppFinished(YarnApplicationState appState) {
- return appState == YarnApplicationState.FINISHED ||
- appState == YarnApplicationState.KILLED ||
- appState == YarnApplicationState.FAILED;
- }
-
- public static void main(String[] args) throws Exception {
- Client c = new Client();
-
- JobOpts opts = ArgsParser.getJobOpts(args);
-
- c.run(opts, args);
- }
-
- private static String joinStrings(String[] str) {
-
- StringBuilder builder = new StringBuilder();
- for (String s : str) {
- builder.append(s);
- builder.append(" ");
- }
- return builder.toString();
-
- }
-
- private void setupReportListener(String address) throws JMSException {
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address);
- Connection conn = cf.createConnection();
- conn.start();
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- //TODO: move to a const in common
- Topic destination = session.createTopic("JOB.REPORT");
-
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(new ActiveReportListener());
-
- }
-
- private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
- Apps.addToEnvironment(appMasterEnv,
- ApplicationConstants.Environment.CLASSPATH.name(),
- ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator);
-
- for (String c : conf.getStrings(
- YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
- Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(),
- c.trim(), File.pathSeparator);
- }
- }
-}
\ No newline at end of file
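Note on the removed Client above: it coordinates with the ApplicationMaster through a ZooKeeper barrier. The submitter raises a "<jobId>-report-barrier" barrier, blocks until the AM drops it, and only then reads the broker address from /<jobId>/broker. A minimal Scala sketch of that handshake follows; zkConnect and jobId are illustrative placeholders, not values taken from this patch.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry

object BarrierHandshakeSketch extends App {
  val zkConnect = "localhost:2181" // assumption: a reachable ZooKeeper ensemble
  val jobId     = "job-0000"       // assumption: sample job id

  val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()

  // The submitter raises the barrier and blocks; the ApplicationMaster
  // removes it after writing the broker address to ZooKeeper.
  val barrier = new DistributedBarrier(client, s"/$jobId-report-barrier")
  barrier.setBarrier()
  barrier.waitOnBarrier()

  // Once the barrier is down, the broker znode is guaranteed to exist.
  val address = new String(client.getData.forPath(s"/$jobId/broker"))
  println(s"broker address: $address")
  client.close()
}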
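The deleted setupReportListener is then plain JMS against that broker address: open a connection, subscribe to the JOB.REPORT topic (hardcoded, with a TODO to move it to a shared constant), and hand messages to ActiveReportListener. A hedged Scala equivalent, with a println listener standing in for ActiveReportListener, which this patch does not show:

import javax.jms.{Message, MessageListener, Session}
import org.apache.activemq.ActiveMQConnectionFactory

object ReportListenerSketch {
  def subscribe(address: String): Unit = {
    val factory    = new ActiveMQConnectionFactory(address)
    val connection = factory.createConnection()
    connection.start()

    // Non-transacted session with automatic acknowledgement, as in the original.
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val topic   = session.createTopic("JOB.REPORT")

    val consumer = session.createConsumer(topic)
    consumer.setMessageListener(new MessageListener {
      override def onMessage(message: Message): Unit = println(s"report: $message")
    })
  }
}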
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 771eb15..979709a 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -1,483 +1,483 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.io.{File, FileInputStream, InputStream}
-import java.net.{InetAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-
-import javax.jms.MessageConsumer
-import org.apache.activemq.broker.BrokerService
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.JobManager
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
-import org.apache.amaterasu.leader.execution.JobLoader
-import org.apache.amaterasu.leader.utilities.Args
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
-import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.{concurrent, mutable}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
-
- var capability: Resource = _
-
- log.info("ApplicationMaster start")
-
- private var jobManager: JobManager = _
- private var client: CuratorFramework = _
- private var config: ClusterConfig = _
- private var env: String = _
- private var branch: String = _
- private var fs: FileSystem = _
- private var conf: YarnConfiguration = _
- private var propPath: String = ""
- private var props: InputStream = _
- private var jarPath: Path = _
- private var executorPath: Path = _
- private var executorJar: LocalResource = _
- private var propFile: LocalResource = _
- private var log4jPropFile: LocalResource = _
- private var nmClient: NMClientAsync = _
- private var allocListener: YarnRMCallbackHandler = _
- private var rmClient: AMRMClientAsync[ContainerRequest] = _
- private var address: String = _
- private var consumer: MessageConsumer = _
-
- private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
- private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
- private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
- private val host: String = InetAddress.getLocalHost.getHostName
- private val broker: BrokerService = new BrokerService()
-
-
- def setLocalResourceFromPath(path: Path): LocalResource = {
-
- val stat = fs.getFileStatus(path)
- val fileResource = Records.newRecord(classOf[LocalResource])
-
- fileResource.setShouldBeUploadedToSharedCache(true)
- fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
- fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
- fileResource.setSize(stat.getLen)
- fileResource.setTimestamp(stat.getModificationTime)
- fileResource.setType(LocalResourceType.FILE)
- fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
- fileResource
-
- }
-
- def execute(arguments: Args): Unit = {
-
- log.info(s"Started AM with args $arguments")
-
- propPath = System.getenv("PWD") + "/amaterasu.properties"
- props = new FileInputStream(new File(propPath))
-
- // no need for hdfs double check (nod to Aaron Rodgers)
- // jars on HDFS should have been verified by the YARN client
- conf = new YarnConfiguration()
- fs = FileSystem.get(conf)
-
- config = ClusterConfig(props)
-
- try {
- initJob(arguments)
- } catch {
- case e: Exception => log.error("error initializing ", e.getMessage)
- }
-
- // now that the job has been initiated, the Curator client is started and we can
- // register the broker's address
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
- client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
-
- // once the broker is registered, we can remove the barrier so clients can connect
- log.info(s"/${jobManager.getJobId}-report-barrier")
- val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
- barrier.removeBarrier()
-
- consumer = MessagingClientUtil.setupMessaging(address)
-
- log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
-
- jarPath = new Path(config.YARN.hdfsJarsPath)
-
- // TODO: change this to read all dist folder and add to exec path
- executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
- log.info("Executor jar path is {}", executorPath)
- executorJar = setLocalResourceFromPath(executorPath)
- propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
- log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
-
- log.info("Started execute")
-
- nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
-
- // Initialize clients to ResourceManager and NodeManagers
- nmClient.init(conf)
- nmClient.start()
-
- // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
- allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
-
- rmClient = startRMClient()
- val registrationResponse = registerAppMaster("", 0, "")
- val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
- log.info("Max mem capability of resources in this cluster " + maxMem)
- val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
- log.info("Max vcores capability of resources in this cluster " + maxVCores)
- log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
-
- // Resource requirements for worker containers
- this.capability = Records.newRecord(classOf[Resource])
- val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
-
- while (!jobManager.getOutOfActions) {
- val actionData = jobManager.getNextActionData
- if (actionData != null) {
-
- val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
- val driverConfiguration = frameworkProvider.getDriverConfiguration
-
- var mem: Int = driverConfiguration.getMemory
- mem = Math.min(mem, maxMem)
- this.capability.setMemory(mem)
-
- var cpu = driverConfiguration.getCpus
- cpu = Math.min(cpu, maxVCores)
- this.capability.setVirtualCores(cpu)
-
- askContainer(actionData)
- }
- }
-
- log.info("Finished asking for containers")
- }
-
- private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
- val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
- client.init(conf)
- client.start()
- client
- }
-
- private def registerAppMaster(host: String, port: Int, url: String) = {
- // Register with ResourceManager
- log.info("Registering application")
- val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
- log.info("Registered application")
- registrationResponse
- }
-
-// private def setupMessaging(jobId: String): Unit = {
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.leader.yarn
//
-// val cf = new ActiveMQConnectionFactory(address)
-// val conn = cf.createConnection()
-// conn.start()
+//import java.io.{File, FileInputStream, InputStream}
+//import java.net.{InetAddress, ServerSocket}
+//import java.nio.ByteBuffer
+//import java.util
+//import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
//
-// val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
-// //TODO: move to a const in common
-// val destination = session.createTopic("JOB.REPORT")
+//import javax.jms.MessageConsumer
+//import org.apache.activemq.broker.BrokerService
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import org.apache.amaterasu.common.logging.Logging
+//import org.apache.amaterasu.leader.common.execution.JobManager
+//import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+//import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+//import org.apache.amaterasu.leader.execution.JobLoader
+//import org.apache.amaterasu.leader.utilities.Args
+//import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+//import org.apache.curator.retry.ExponentialBackoffRetry
+//import org.apache.hadoop.fs.{FileSystem, Path}
+//import org.apache.hadoop.io.DataOutputBuffer
+//import org.apache.hadoop.security.UserGroupInformation
+//import org.apache.hadoop.yarn.api.records._
+//import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+//import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+//import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
+//import org.apache.hadoop.yarn.conf.YarnConfiguration
+//import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+//import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+//import org.apache.zookeeper.CreateMode
//
-// val consumer = session.createConsumer(destination)
-// consumer.setMessageListener(new ActiveReportListener)
+//import scala.collection.JavaConversions._
+//import scala.collection.JavaConverters._
+//import scala.collection.{concurrent, mutable}
+//import scala.concurrent.ExecutionContext.Implicits.global
+//import scala.concurrent.Future
+//import scala.util.{Failure, Success}
+//
+//class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
+//
+// var capability: Resource = _
+//
+// log.info("ApplicationMaster start")
+//
+// private var jobManager: JobManager = _
+// private var client: CuratorFramework = _
+// private var config: ClusterConfig = _
+// private var env: String = _
+// private var branch: String = _
+// private var fs: FileSystem = _
+// private var conf: YarnConfiguration = _
+// private var propPath: String = ""
+// private var props: InputStream = _
+// private var jarPath: Path = _
+// private var executorPath: Path = _
+// private var executorJar: LocalResource = _
+// private var propFile: LocalResource = _
+// private var log4jPropFile: LocalResource = _
+// private var nmClient: NMClientAsync = _
+// private var allocListener: YarnRMCallbackHandler = _
+// private var rmClient: AMRMClientAsync[ContainerRequest] = _
+// private var address: String = _
+// private var consumer: MessageConsumer = _
+//
+// private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
+// private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
+// private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
+// private val host: String = InetAddress.getLocalHost.getHostName
+// private val broker: BrokerService = new BrokerService()
+//
+//
+// def setLocalResourceFromPath(path: Path): LocalResource = {
+//
+// val stat = fs.getFileStatus(path)
+// val fileResource = Records.newRecord(classOf[LocalResource])
+//
+// fileResource.setShouldBeUploadedToSharedCache(true)
+// fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
+// fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
+// fileResource.setSize(stat.getLen)
+// fileResource.setTimestamp(stat.getModificationTime)
+// fileResource.setType(LocalResourceType.FILE)
+// fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
+// fileResource
//
// }
-
- private def askContainer(actionData: ActionData): Unit = {
-
- actionsBuffer.add(actionData)
- log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
-
- // we have an action to schedule, let's request a container
- val priority: Priority = Records.newRecord(classOf[Priority])
- priority.setPriority(1)
- val containerReq = new ContainerRequest(capability, null, null, priority)
- rmClient.addContainerRequest(containerReq)
- log.info(s"Asked container for action ${actionData.getId}")
-
- }
-
- override def onContainersAllocated(containers: util.List[Container]): Unit = {
-
- log.info(s"${containers.size()} Containers allocated")
- for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
- if (actionsBuffer.isEmpty) {
- log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}")
- return
- }
-
- val actionData = actionsBuffer.poll()
- val containerTask = Future[ActionData] {
-
- val frameworkFactory = FrameworkProvidersFactory(env, config)
- val framework = frameworkFactory.getFramework(actionData.getGroupId)
- val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
- val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
-
- log.info("Running container id {}.", container.getId.getContainerId)
- log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
-
- ctx.setCommands(commands)
- ctx.setTokens(allTokens)
-
- val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
-
- //TODO Eyal - Remove the hardcoding of the dist path
- /* val resources = mutable.Map[String, LocalResource]()
- val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
- while (binaryFileIter.hasNext) {
- val eachFile = binaryFileIter.next().getPath
- resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
- }
- resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
- resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
-
- val resources = mutable.Map[String, LocalResource](
- "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
- "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
- "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
- "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
- "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
- // TODO: Nadav/Eyal all of these should move to the executor resource setup
- "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
- "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
- "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
- "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
- "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
-
- //adding the framework and executor resources
- setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
- setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
-
- ctx.setLocalResources(resources)
-
- ctx.setEnvironment(Map[String, String](
- "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
- "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
- "AMA_NODE" -> sys.env("AMA_NODE"),
- "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
- ))
-
- log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
- nmClient.startContainerAsync(container, ctx)
- actionData
- }
-
- containerTask onComplete {
- case Failure(t) =>
- log.error(s"launching container Failed", t)
- askContainer(actionData)
-
- case Success(requestedActionData) =>
- jobManager.actionStarted(requestedActionData.getId)
- containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
- log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
-
- }
- }
- }
-
- private def allTokens: ByteBuffer = {
- // creating the credentials for container execution
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- // removing the AM->RM token so that containers cannot access it.
- val iter = credentials.getAllTokens.iterator
- log.info("Executing with tokens:")
- for (token <- iter) {
- log.info(token.toString)
- if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
- }
- // serializing after the removal so the buffer cannot carry the AM token
- val dob = new DataOutputBuffer
- credentials.writeTokenStorageToStream(dob)
- ByteBuffer.wrap(dob.getData, 0, dob.getLength)
- }
-
- private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
-
- val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
-
- if (fs.exists(sourcePath)) {
-
- val files = fs.listFiles(sourcePath, true)
-
- while (files.hasNext) {
- val res = files.next()
- val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
- countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
- }
- }
- }
-
- def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
- import java.io.IOException
-
- import org.apache.hadoop.yarn.exceptions.YarnException
- try
- rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
- catch {
- case ex: YarnException =>
- log.error("Failed to unregister application", ex)
- case e: IOException =>
- log.error("Failed to unregister application", e)
- }
- rmClient.stop()
- nmClient.stop()
- }
-
- override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
-
- for (status <- statuses.asScala) {
-
- if (status.getState == ContainerState.COMPLETE) {
-
- val containerId = status.getContainerId.getContainerId
- val task = containersIdsToTask(containerId)
- rmClient.releaseAssignedContainer(status.getContainerId)
-
- val taskId = task.getId
- if (status.getExitStatus == 0) {
-
- //completedContainersAndTaskIds.put(containerId, task.id)
- jobManager.actionComplete(taskId)
- log.info(s"Container $containerId Complete with task ${taskId} with success.")
- } else {
- // TODO: Check the getDiagnostics value and see if appropriate
- jobManager.actionFailed(taskId, status.getDiagnostics)
- log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
- }
- }
- }
-
- if (jobManager.getOutOfActions) {
- log.info("Finished all tasks successfully! Wow!")
- jobManager.actionsCount()
- stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
- } else {
- log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
- }
- }
-
- override def getProgress: Float = {
- completedContainersAndTaskIds.size.toFloat / jobManager.getRegisteredActions.size
- }
-
- override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
- log.info("Nodes change. Nothing to report.")
- }
-
- override def onShutdownRequest(): Unit = {
- log.error("Shutdown requested.")
- stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
- }
-
- override def onError(e: Throwable): Unit = {
- log.error("Error on AM", e)
- stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
- }
-
- def initJob(args: Args): Unit = {
-
- this.env = args.env
- this.branch = args.branch
- try {
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
- client.start()
- } catch {
- case e: Exception =>
- log.error("Error connecting to zookeeper", e)
- throw e
- }
- if (args.jobId != null && !args.jobId.isEmpty) {
- log.info("resuming job" + args.jobId)
- jobManager = JobLoader.reloadJob(
- args.jobId,
- client,
- config.Jobs.Tasks.attempts,
- new LinkedBlockingQueue[ActionData])
-
- } else {
- log.info("new job is being created")
- try {
-
- jobManager = JobLoader.loadJob(
- args.repo,
- args.branch,
- args.newJobId,
- client,
- config.Jobs.Tasks.attempts,
- new LinkedBlockingQueue[ActionData])
- } catch {
- case e: Exception =>
- log.error("Error creating JobManager.", e)
- throw e
- }
-
- }
-
- jobManager.start()
- log.info("Started jobManager")
- }
-}
-
-object ApplicationMaster extends Logging with App {
-
-
- val parser = Args.getParser
- parser.parse(args, Args()) match {
-
- case Some(arguments: Args) =>
- val appMaster = new ApplicationMaster()
-
- appMaster.address = MessagingClientUtil.getBorkerAddress
- appMaster.broker.addConnector(appMaster.address)
- appMaster.broker.start()
-
- log.info(s"broker Started with address ${appMaster.address}")
- appMaster.execute(arguments)
-
- case None =>
- }
-
-
-}
+//
+// def execute(arguments: Args): Unit = {
+//
+// log.info(s"Started AM with args $arguments")
+//
+// propPath = System.getenv("PWD") + "/amaterasu.properties"
+// props = new FileInputStream(new File(propPath))
+//
+// // no need for hdfs double check (nod to Aaron Rodgers)
+// // jars on HDFS should have been verified by the YARN client
+// conf = new YarnConfiguration()
+// fs = FileSystem.get(conf)
+//
+// config = ClusterConfig(props)
+//
+// try {
+// initJob(arguments)
+// } catch {
+// case e: Exception => log.error("error initializing ", e.getMessage)
+// }
+//
+// // now that the job has been initiated, the Curator client is started and we can
+// // register the broker's address
+// client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
+// client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
+//
+// // once the broker is registered, we can remove the barrier so clients can connect
+// log.info(s"/${jobManager.getJobId}-report-barrier")
+// val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
+// barrier.removeBarrier()
+//
+// consumer = MessagingClientUtil.setupMessaging(address)
+//
+// log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
+//
+// jarPath = new Path(config.YARN.hdfsJarsPath)
+//
+// // TODO: change this to read all dist folder and add to exec path
+// executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
+// log.info("Executor jar path is {}", executorPath)
+// executorJar = setLocalResourceFromPath(executorPath)
+// propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
+// log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
+//
+// log.info("Started execute")
+//
+// nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
+//
+// // Initialize clients to ResourceManager and NodeManagers
+// nmClient.init(conf)
+// nmClient.start()
+//
+// // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
+// allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
+//
+// rmClient = startRMClient()
+// val registrationResponse = registerAppMaster("", 0, "")
+// val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
+// log.info("Max mem capability of resources in this cluster " + maxMem)
+// val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
+// log.info("Max vcores capability of resources in this cluster " + maxVCores)
+// log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
+//
+// // Resource requirements for worker containers
+// this.capability = Records.newRecord(classOf[Resource])
+// val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+//
+// while (!jobManager.getOutOfActions) {
+// val actionData = jobManager.getNextActionData
+// if (actionData != null) {
+//
+// val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
+// val driverConfiguration = frameworkProvider.getDriverConfiguration
+//
+// var mem: Int = driverConfiguration.getMemory
+// mem = Math.min(mem, maxMem)
+// this.capability.setMemory(mem)
+//
+// var cpu = driverConfiguration.getCpus
+// cpu = Math.min(cpu, maxVCores)
+// this.capability.setVirtualCores(cpu)
+//
+// askContainer(actionData)
+// }
+// }
+//
+// log.info("Finished asking for containers")
+// }
+//
+// private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+// val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
+// client.init(conf)
+// client.start()
+// client
+// }
+//
+// private def registerAppMaster(host: String, port: Int, url: String) = {
+// // Register with ResourceManager
+// log.info("Registering application")
+// val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
+// log.info("Registered application")
+// registrationResponse
+// }
+//
+//// private def setupMessaging(jobId: String): Unit = {
+////
+//// val cf = new ActiveMQConnectionFactory(address)
+//// val conn = cf.createConnection()
+//// conn.start()
+////
+//// val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//// //TODO: move to a const in common
+//// val destination = session.createTopic("JOB.REPORT")
+////
+//// val consumer = session.createConsumer(destination)
+//// consumer.setMessageListener(new ActiveReportListener)
+////
+//// }
+//
+// private def askContainer(actionData: ActionData): Unit = {
+//
+// actionsBuffer.add(actionData)
+// log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
+//
+// // we have an action to schedule, let's request a container
+// val priority: Priority = Records.newRecord(classOf[Priority])
+// priority.setPriority(1)
+// val containerReq = new ContainerRequest(capability, null, null, priority)
+// rmClient.addContainerRequest(containerReq)
+// log.info(s"Asked container for action ${actionData.getId}")
+//
+// }
+//
+// override def onContainersAllocated(containers: util.List[Container]): Unit = {
+//
+// log.info(s"${containers.size()} Containers allocated")
+// for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
+// if (actionsBuffer.isEmpty) {
+// log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}")
+// return
+// }
+//
+// val actionData = actionsBuffer.poll()
+// val containerTask = Future[ActionData] {
+//
+// val frameworkFactory = FrameworkProvidersFactory(env, config)
+// val framework = frameworkFactory.getFramework(actionData.getGroupId)
+// val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
+// val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+// val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
+//
+// log.info("Running container id {}.", container.getId.getContainerId)
+// log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
+//
+// ctx.setCommands(commands)
+// ctx.setTokens(allTokens)
+//
+// val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
+//
+// //TODO Eyal - Remove the hardcoding of the dist path
+// /* val resources = mutable.Map[String, LocalResource]()
+// val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
+// while (binaryFileIter.hasNext) {
+// val eachFile = binaryFileIter.next().getPath
+// resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
+// }
+// resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
+// resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
+//
+// val resources = mutable.Map[String, LocalResource](
+// "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
+// "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
+// "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
+// "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
+// "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
+// // TODO: Nadav/Eyal all of these should move to the executor resource setup
+// "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
+// "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
+// "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
+// "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
+// "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
+//
+// //adding the framework and executor resources
+// setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
+// setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
+//
+// ctx.setLocalResources(resources)
+//
+// ctx.setEnvironment(Map[String, String](
+// "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
+// "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
+// "AMA_NODE" -> sys.env("AMA_NODE"),
+// "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
+// ))
+//
+// log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
+// nmClient.startContainerAsync(container, ctx)
+// actionData
+// }
+//
+// containerTask onComplete {
+// case Failure(t) =>
+// log.error(s"launching container Failed", t)
+// askContainer(actionData)
+//
+// case Success(requestedActionData) =>
+// jobManager.actionStarted(requestedActionData.getId)
+// containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
+// log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
+//
+// }
+// }
+// }
+//
+// private def allTokens: ByteBuffer = {
+// // creating the credentials for container execution
+// val credentials = UserGroupInformation.getCurrentUser.getCredentials
+// // removing the AM->RM token so that containers cannot access it.
+// val iter = credentials.getAllTokens.iterator
+// log.info("Executing with tokens:")
+// for (token <- iter) {
+// log.info(token.toString)
+// if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+// }
+// // serializing after the removal so the buffer cannot carry the AM token
+// val dob = new DataOutputBuffer
+// credentials.writeTokenStorageToStream(dob)
+// ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+// }
+//
+// private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
+//
+// val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
+//
+// if (fs.exists(sourcePath)) {
+//
+// val files = fs.listFiles(sourcePath, true)
+//
+// while (files.hasNext) {
+// val res = files.next()
+// val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
+// countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
+// }
+// }
+// }
+//
+// def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
+// import java.io.IOException
+//
+// import org.apache.hadoop.yarn.exceptions.YarnException
+// try
+// rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
+// catch {
+// case ex: YarnException =>
+// log.error("Failed to unregister application", ex)
+// case e: IOException =>
+// log.error("Failed to unregister application", e)
+// }
+// rmClient.stop()
+// nmClient.stop()
+// }
+//
+// override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
+//
+// for (status <- statuses.asScala) {
+//
+// if (status.getState == ContainerState.COMPLETE) {
+//
+// val containerId = status.getContainerId.getContainerId
+// val task = containersIdsToTask(containerId)
+// rmClient.releaseAssignedContainer(status.getContainerId)
+//
+// val taskId = task.getId
+// if (status.getExitStatus == 0) {
+//
+// //completedContainersAndTaskIds.put(containerId, task.id)
+// jobManager.actionComplete(taskId)
+// log.info(s"Container $containerId Complete with task ${taskId} with success.")
+// } else {
+// // TODO: Check the getDiagnostics value and see if appropriate
+// jobManager.actionFailed(taskId, status.getDiagnostics)
+// log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
+// }
+// }
+// }
+//
+// if (jobManager.getOutOfActions) {
+// log.info("Finished all tasks successfully! Wow!")
+// jobManager.actionsCount()
+// stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
+// } else {
+// log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
+// }
+// }
+//
+// override def getProgress: Float = {
+// completedContainersAndTaskIds.size.toFloat / jobManager.getRegisteredActions.size
+// }
+//
+// override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
+// log.info("Nodes change. Nothing to report.")
+// }
+//
+// override def onShutdownRequest(): Unit = {
+// log.error("Shutdown requested.")
+// stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
+// }
+//
+// override def onError(e: Throwable): Unit = {
+// log.error("Error on AM", e)
+// stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
+// }
+//
+// def initJob(args: Args): Unit = {
+//
+// this.env = args.env
+// this.branch = args.branch
+// try {
+// val retryPolicy = new ExponentialBackoffRetry(1000, 3)
+// client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
+// client.start()
+// } catch {
+// case e: Exception =>
+// log.error("Error connecting to zookeeper", e)
+// throw e
+// }
+// if (args.jobId != null && !args.jobId.isEmpty) {
+// log.info("resuming job" + args.jobId)
+// jobManager = JobLoader.reloadJob(
+// args.jobId,
+// client,
+// config.Jobs.Tasks.attempts,
+// new LinkedBlockingQueue[ActionData])
+//
+// } else {
+// log.info("new job is being created")
+// try {
+//
+// jobManager = JobLoader.loadJob(
+// args.repo,
+// args.branch,
+// args.newJobId,
+// client,
+// config.Jobs.Tasks.attempts,
+// new LinkedBlockingQueue[ActionData])
+// } catch {
+// case e: Exception =>
+// log.error("Error creating JobManager.", e)
+// throw e
+// }
+//
+// }
+//
+// jobManager.start()
+// log.info("Started jobManager")
+// }
+//}
+//
+//object ApplicationMaster extends Logging with App {
+//
+//
+// val parser = Args.getParser
+// parser.parse(args, Args()) match {
+//
+// case Some(arguments: Args) =>
+// val appMaster = new ApplicationMaster()
+//
+// appMaster.address = MessagingClientUtil.getBorkerAddress
+// appMaster.broker.addConnector(appMaster.address)
+// appMaster.broker.start()
+//
+// log.info(s"broker Started with address ${appMaster.address}")
+// appMaster.execute(arguments)
+//
+// case None =>
+// }
+//
+//
+//}
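The hunk above comments out the Scala ApplicationMaster wholesale rather than deleting it, in line with the commit's move of container resource handling to the new SDK API. For reference, the core pattern it implemented (register with the ResourceManager, cap each action's resources at the cluster maxima, then request one container per pending action) can be sketched as below. This is a sketch only: handler stands in for the AMRMClientAsync.CallbackHandler the real AM supplies, and the memory/vcore arguments are illustrative.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object ContainerRequestSketch {

  def requestContainer(handler: AMRMClientAsync.CallbackHandler,
                       memoryMb: Int,
                       vCores: Int): AMRMClientAsync[ContainerRequest] = {

    // Heartbeat the RM once a second; grants arrive via onContainersAllocated.
    val rmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, handler)
    rmClient.init(new YarnConfiguration())
    rmClient.start()

    // The registration response carries the cluster maxima used for capping.
    val registration = rmClient.registerApplicationMaster("", 0, "")
    val maxResource  = registration.getMaximumResourceCapability

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(Math.min(memoryMb, maxResource.getMemory))
    capability.setVirtualCores(Math.min(vCores, maxResource.getVirtualCores))

    val priority = Records.newRecord(classOf[Priority])
    priority.setPriority(1)

    // No node or rack constraints, as in the commented-out askContainer.
    rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
    rmClient
  }
}

Completion then flows back through onContainersCompleted, where the AM released the container and marked the action complete or failed before checking whether the job was out of actions.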
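One more detail worth keeping in view from the commented-out AM: before launching a container it serializes the current user's credentials but strips the AM->RM token, so worker containers cannot impersonate the ApplicationMaster against the ResourceManager. A hedged sketch of that filtering; note the removal must precede writeTokenStorageToStream for the stripped token to actually be absent from the serialized buffer.

import java.nio.ByteBuffer
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

object ContainerTokensSketch {
  def containerTokens(): ByteBuffer = {
    val credentials = UserGroupInformation.getCurrentUser.getCredentials

    // Drop the AM->RM token; only the AM itself may talk to the RM.
    val iter = credentials.getAllTokens.iterator()
    while (iter.hasNext) {
      if (iter.next().getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
    }

    // Serialize the remaining tokens for ContainerLaunchContext.setTokens.
    val dob = new DataOutputBuffer
    credentials.writeTokenStorageToStream(dob)
    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
  }
}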
diff --git a/sdk/build.gradle b/sdk/build.gradle
index 585b1c0..3cc9227 100644
--- a/sdk/build.gradle
+++ b/sdk/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
buildscript {
- ext.kotlin_version = '1.2.71'
+ ext.kotlin_version = '1.3.21'
repositories {
mavenCentral()