Merge branch 'master' into version-0.2.0-incubating-rc3
diff --git a/.gitignore b/.gitignore
index a64ebd0..fabb847 100755
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,18 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
 ### Scala ###
 *.class
 *.log
diff --git a/.travis.yml b/.travis.yml
index 0613a29..fa3b8d3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
 language: scala
 cache:
   directories:
@@ -7,12 +25,13 @@
 before_install:
 - chmod +x gradlew
 script:
-- "./gradlew buildDistribution"
+- "./gradlew clean test buildDistribution"
 deploy:
   provider: releases
   api_key:
     secure: LgUGiVEjuFmAepc1XDIEPovYTh6zqn+/q/HXW0AZfwv4RoxROuoXFS8AgG6y1PY3QnbfoxCa2FsvVBwP3MupJyWlMKzZBWp9I1bFEbf4Zmn7Lb5hBJieWrgwvMN/R31YybiA9wBlhL32uN1JN3dvNQIIqwGjRzBS4Hm6db+RtnKiZFTn8p5eB5loyEu32QIlx9PWTLPSVZ/ZEqJlxFE7B5XBRCiy4+JQavFZ9fgg2DmuH5erqBGpv5W/Bn0ekKnj9NA4QaoksNnQSFx+MtL+mvJN/jACZpHF2ACjtMr01KoAUIKC2G8cnX2HVp/CBw2qG5UtTNnlh/ECpG/dtZfKaRmbZhoIxy9clf/PgiBQFdGHJKLrdN2Jpc8wG4aLDM2rRQ0k7oVp0AMWXAB1oZJ7MQcZeiwyAOo/HcU298iGuETRWMghlwEHy66iDzB+3xvxzROC2mVx0cvZE7095PkxJeddKBSpYXRQcdONkNj+b46IenpEj611UqvDilmCikhpIxqFxop24/eeTFOpmREf6JRo1dhMg0e8LwuxU5d4ZiwWx2xwlrZs2wKxrHygx/R8UD08FyntK4vaY6mTfEhcFkPuGb4kUnyrLv4fFq7oH96QXzhTtfftutxMTyLL+Y2+aJTbTeQPo6wpJo3Qn5vEsodzU9ytNV1+yvuqFfPgHtw=
-  file: 'build/distributions/apache-amaterasu-0.2.0-incubating.tar'
+  file_glob: true
+  file: 'build/distributions/apache-amaterasu-*.tar'
   skip_cleanup: true
   on:
     tags: true
diff --git a/LICENSE b/LICENSE
index 13f2054..4248414 100644
--- a/LICENSE
+++ b/LICENSE
@@ -200,26 +200,28 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 
-For persistence-elasticsearch/plugins/security/src/main/java/org/apache/unomi/elasticsearch/plugin/security/IPRangeMatcher.java :
+============================================================================
+   APACHE AMATERASU SUBCOMPONENTS:
 
-The MIT License
+   The Apache Amaterasu project contains subcomponents with separate copyright
+   notices and license terms. Your use of the source code for these
+   subcomponents is subject to the terms and conditions of the following
+   licenses.
 
-Copyright (c) 2013 Edin Dazdarevic (edin.dazdarevic@gmail.com)
+========================================================================
+BSD licenses
+========================================================================
+The following components are provided under the BSD license.
+See file headers and project links for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
 
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
+    (BSD License) codegen (https://github.com/andreif/codegen)
 
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
+========================================================================
+BSD 3-Clause licenses
+========================================================================
+The following components are provided under the BSD 3-Clause license.
+See file headers and project links for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
 
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
\ No newline at end of file
+    (BSD 3 License) Conda (https://conda.io/docs/)
\ No newline at end of file
diff --git a/ama-key b/ama-key
deleted file mode 100644
index 645e687..0000000
--- a/ama-key
+++ /dev/null
@@ -1,51 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIJKQIBAAKCAgEAr2LrzGISOdddwPvua6ae7TssYVu+MBlMwv7d1fG6pusVbq6P
-Ah6HTTbObsoZ4NH3x6gvFxSC2nFp1um9Z0QlyHODUxiUO5v+p6wtuxFySORm5N03
-sjh8xJPRK3n/LhSglyD98Iy0cbGdP51r7+kZGyHk0YfUSjKicGkQAbf1V2wXBzKr
-qynjSHTppQeH5yAiym/IJuanbrpTW0H2JqA4uyV7FkxSiFYApMuHOZ3NUsPk+Evd
-2tkfhqZaw9scaG22icdzypEyDF4See+aMUfb+2WWa4G9ikVbislzTaA7di3MYeNz
-R9lI7QmA2BtqBm4oV7dninLbIrFL5Zp+70m011WdWjQ39D0bRhAfB+R5igxqzeuT
-VOeXMuTWJKo8f1jyRshr/urAB86Di5zzZK2dY9bqUUMLGcY/QU4GgB5FORVawN+K
-y6AhI+AnG/KmmuC0o0cTTkKMa2BUlY7aq7QCT+hqezM+/sOPDYm/WrtyW4Bfc/VR
-vvfc3b9TLk9aUT/2DgZHg/2/j3Odjo7ZXxAqnelxIjgOLMU4RcoI7fVu0STcbnpf
-AAD9DvwScuH0Y0q03BfTtG7BwR6Yiw5mIHoZuNq8c2NebGSYIDN1ZJGq7B5q/26z
-Ml6Rz8SfUGd0qISHntdkpvLBHxohXqC+G8b15GLWX1n2JBsrhFnM86nd6g8CAwEA
-AQKCAgEAmlzVLmiuo+vyn1Tc7jCTNjbbg8DsbocF8aXB93gvEJRdo7HNOk9GRGZV
-YFtOVXpXu4lCEO1DkiE5xyaoRghLvNY2Il/Cr4hHpKm9AiWD0bX8/bfaOmjPH3D+
-K2bPem47PWiTODGO63Yo8YGLK3ecWi4Fp4kGBlv0bj16EhknvU7sIbCuORK/8Ni9
-fztWmMzG8idaISrm+GTT0sEGdc6Uv9poMCLyjP4syN49YS+LNCooD4UueVyaC7fE
-sRbbNOpDO5apSgNq6kmtt1Zz/qXBbs1li69/8//BZzCQ5CR/0S7T8N/waa9LKR0x
-IoNWWNyBc1p/rfIS/sDPDQFicRcNKvgyVeZaEMMezYVgPZKWmLO125DUv0+PEbpp
-/yK88VQLRObTa91GKTTAqTc1aJJHqkp7Kq3jgp6d6xVa9+bJ7lg/VWhaIxY0tubX
-clcp6XmZXwsewBB3TiOdWpx4qpxnuhgvIcrUaJzJ2dREPPNsuYN/G6mGXhDEYe0T
-fw7d274bExeG/TIjre/+75s+ecoWfWvrNnPaYYPDrwk7lYAGQkFBslPe53TO/TVM
-gzdbKDRwev48cmJu3HW3+goe/K0t0OCwdWwZltG3bPJwXm/uF745pNI5HbdaK0FV
-v21eX2lJh28SVtFh8AGKw242PWR2jTfNH9Qzc5SgOL6hac9fdhECggEBANZAd65x
-a8O02FJW1jBqykMyRrhHSEboQg7xS2g+WS9WVn/mE4E69X6gX1h66oIzKu8iBHhY
-O4qRku+UbJqBcG8IFbYOuKZ+pQBke7rHrEmWec4YbtwVo0FB5HML7ZA7UlHoaPDA
-djC161Nk7cDTRxe6THlJub515cNO+CTgBbeqFb/J/m48sb4zd433zit5+rZC1dVg
-TOXdT5mQzxOhbJn3YFW3BwJdIPUBQNBvMVByFjE6yXEYQ9KOvCeEhRnGJ0CzqoFm
-zGQIOCWa8gyg2hH1PHBeS8DGr/X3BVoalJlQXr90pddJ0bfO65fx4HaXO5lqCEQ9
-qpcvlGs1AQNwVFkCggEBANGPun3JctChZ7WCoCPR16hibrLNAnMTH4BHHMuLsAEa
-exwgWAMODfaHebAkobTcUAli72TdAIx6XQ9REFDa2lydWzEDjYIKFA6vD7rWLnQq
-60KWFa2zL/rprEa0vk28DXj/WmBTtRH3E1EzGi1LNziXBtfYJJRceCGOrbGqbvwk
-v6C9popRrVaOqHFLxseMKX5jSNEdoZqVoHGQ551luVUHSxu3cVi+sgItcUTfgpqQ
-0SKXTLdYtc5cGjoEgEDxFJD/I9Y0vCgkFPFU/tz8AwBy0adJEsP68pfRpj9yXntt
-Nzkq15JF+7XK2Vt00rKr+We7fiO8Nm1mJEyMsxHBhKcCggEBALhl0dvUiHBu9IOh
-c0VGlABThCRUTXOhsEEWEdWNW8rvHxGDHqRp7yJlusn3OGCI01nvSDOflNdFRVZn
-wzUTVIZrSexgLTI266Iz2X2/HpxTI1BrHPbUtKaUpJ8T1An/1HDke3VB4Dc6S2iC
-BFKiRJy6XdlBx9iRtgdrrwxltuYFQCTKH+4W1M+jkjEg51Pp7wrw6QN1l5l66Wh9
-BoyZsVOuYj5DgYfaSWQ1COib2rCnEEyckQWCYdUVvgCxALFXJy97srMem6k4ncJX
-4h1WT3mHPNZlggNPveAPE48iM2TklDdpmNZ7FUGCmKg0qADJVqVKagT5ohnu/Gls
-vAuOoTkCggEAHmFk2umCgKZ1n4XRa3/3cMzcWYWJDl++WF122jdlC7PoFxrFR6QY
-+B2J0bRt0QeDfujd5dR4SOVQanEJGX+w2m5hkwh90lVdtQdCE4cLcwHp21xgxi7N
-DOYleJapZCGYHmt+kapw/KrCHSp4aAqYddbHQjFulCeXrt29Zp1bu6gkM8xqwXC3
-3W2PE+W1aqZyOYVxQAe4ru10NiKYwWPG64HELL96ajAzJEesPRzeFURbXVVr5MSy
-jrkhgDm40jFhFug2LM62XGdmtdnpnOXYFY+Pv13Dn6/YzZOyM06ETgZ2VA5W9Di7
-Fg7TVgPoq8hsvtIaplmZ1mBRcNuQ9kkEzwKCAQBDrQTgTizuCOozunnHDsX0XBhh
-XOkwwjsrAtMsg0nGt5EI2gAxetQ+y+aMYldPN5s1hDQ7p1f5pfVhuAPy+MM4OtGb
-SNZEAlTI3abk2f27PTak4HBwqTJNGPQh1iSpDvqFut0W92C8kkZbACwqWo9cwxIR
-GqjOTQZkglUhtVQE1xhvjjw5LQj0ORrd3csnyIKioJ82ZQA5svcYHafgCuMtsHh5
-3c7XJabDcKE6pcZ/9lSu+htUguaUKRLmuMNC8BTAb23DXgIKo1R4xuobG1Asc0za
-vzU9a6BzFN8I7hUi8UpSQOmjWh/UtPq1fGnYSzc6pk+6Abzrer1jVPtNeqtM
------END RSA PRIVATE KEY-----
diff --git a/ama-key.pub b/ama-key.pub
deleted file mode 100644
index 2348b44..0000000
--- a/ama-key.pub
+++ /dev/null
@@ -1 +0,0 @@
-ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQCvYuvMYhI5113A++5rpp7tOyxhW74wGUzC/t3V8bqm6xVuro8CHodNNs5uyhng0ffHqC8XFILacWnW6b1nRCXIc4NTGJQ7m/6nrC27EXJI5Gbk3TeyOHzEk9Eref8uFKCXIP3wjLRxsZ0/nWvv6RkbIeTRh9RKMqJwaRABt/VXbBcHMqurKeNIdOmlB4fnICLKb8gm5qduulNbQfYmoDi7JXsWTFKIVgCky4c5nc1Sw+T4S93a2R+GplrD2xxobbaJx3PKkTIMXhJ575oxR9v7ZZZrgb2KRVuKyXNNoDt2Lcxh43NH2UjtCYDYG2oGbihXt2eKctsisUvlmn7vSbTXVZ1aNDf0PRtGEB8H5HmKDGrN65NU55cy5NYkqjx/WPJGyGv+6sAHzoOLnPNkrZ1j1upRQwsZxj9BTgaAHkU5FVrA34rLoCEj4Ccb8qaa4LSjRxNOQoxrYFSVjtqrtAJP6Gp7Mz7+w48Nib9au3JbgF9z9VG+99zdv1MuT1pRP/YOBkeD/b+Pc52OjtlfECqd6XEiOA4sxThFygjt9W7RJNxuel8AAP0O/BJy4fRjSrTcF9O0bsHBHpiLDmYgehm42rxzY15sZJggM3VkkarsHmr/brMyXpHPxJ9QZ3SohIee12Sm8sEfGiFeoL4bxvXkYtZfWfYkGyuEWczzqd3qDw== whisrael@gmail.com
diff --git a/build.gradle b/build.gradle
index 7bcecd4..79b926f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,20 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+plugins {
+    id "org.nosphere.apache.rat" version "0.3.1"
+}
+
 apply plugin: 'distribution'
 
 allprojects {
     group 'org.apache.amaterasu'
-    version '0.2.0-incubating'
+    version '0.2.0-incubating-rc3'
 }
 
 project(':leader')
 project(':common')
 project(':executor')
 
+task copyLeagalFiles(type: Copy) {
+    from "./DISCLAIMER", "./LICENSE", "./NOTICE"
+    into "${buildDir}/amaterasu"
+}
+
 task buildHomeDir() {
-    dependsOn subprojects.collect { getTasksByName('shadowJar', true) }
     dependsOn subprojects.collect { getTasksByName('copyToHome', true) }
+    dependsOn copyLeagalFiles
 }
 
 distributions {
@@ -46,6 +55,11 @@
     dependsOn customDistTar
 }
 
-tasks.withType(Test)  {
+rat {
+    // List of exclude directives, defaults to ['**/.gradle/**']
+    excludes = ["**/build/**", '**/.gradle/**', '**/gradle/**', '**/.idea/**', '**/.ruby-version/**', '**/repo/**', '**/resources/**',  '**/*.iml/**']
+}
+
+tasks.withType(Test) {
     maxParallelForks = 1
 }
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").toInt
+        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").toInt
       }
     }
 
@@ -83,8 +83,8 @@
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").toInt
+        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").toInt
       }
     }
 
@@ -133,9 +133,9 @@
 
     def load(props: Properties): Unit = {
 
-      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
-      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
-      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").toDouble
+      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
+      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
 
       Tasks.load(props)
     }
@@ -148,9 +148,9 @@
 
       def load(props: Properties): Unit = {
 
-        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").toInt
+        if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").toInt
+        if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").toInt
 
       }
     }
@@ -209,7 +209,8 @@
-    if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
+    // Property values are Strings: parse with toDouble; asInstanceOf[Double] on a String throws ClassCastException.
+    if (props.containsKey("timeout")) timeout = props.getProperty("timeout").toDouble
     if (props.containsKey("mode")) mode = props.getProperty("mode")
     if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+    if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
     // TODO: rethink this
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
index ab8a28a..5fa2d74 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.common.dataobjects
 
 import com.google.gson.Gson
diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
index 8c000cc..9813296 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.common.utils
 
 import java.io.File
diff --git a/executor/build.gradle b/executor/build.gradle
index a173c58..21bc2b0 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -112,6 +112,7 @@
 }
 
 task copyToHome(type: Copy) {
+    dependsOn shadowJar
     from 'build/libs'
     into '../build/amaterasu/dist'
     from 'build/resources/main'
diff --git a/executor/src/main/resources/runtime.py b/executor/src/main/resources/runtime.py
index 874b174..d01664c 100644
--- a/executor/src/main/resources/runtime.py
+++ b/executor/src/main/resources/runtime.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 class AmaContext(object):
 
     def __init__(self, sc, spark, job_id, env):
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index f1752a2..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -1,16 +1,27 @@
 #!/usr/bin/python
-
-# import os
-# user_paths = os.environ['PYTHONPATH']
 #
-# with open('/Users/roadan/pypath.txt', 'a') as the_file:
-#     the_file.write(user_paths)
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 
 import ast
 import codegen
 import os
 import sys
 import zipimport
+sys.path.append(os.getcwd())
 from runtime import AmaContext, Environment
 
 # os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
  */
 package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
 
-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
 import java.util
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
 
-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+
 
 
 class PySparkRunner extends AmaterasuRunner with Logging {
@@ -69,6 +71,15 @@
 
 object PySparkRunner {
 
+  /** Builds the comma-separated `--py-files` value: every `*.tar.bz2` in ./miniconda/pkgs plus dist/codegen.py. */
+  def collectCondaPackages(): String = {
+    // File.listFiles returns null when the directory is missing; treat that as "no conda packages".
+    val pkgs = Option(new File("./miniconda/pkgs").listFiles).getOrElse(Array.empty[File])
+    val condaPkgs = pkgs.filter(_.getName.endsWith(".tar.bz2")).map(f => s"./miniconda/pkgs/${f.getName}")
+    // `:+` appends one element; `++ "dist/codegen.py"` would append the string's characters one by one.
+    (condaPkgs :+ "dist/codegen.py").mkString(",")
+  }
+
   def apply(env: Environment,
             jobId: String,
             notifier: Notifier,
@@ -77,14 +88,13 @@
             pyDeps: PythonDependencies,
             config: ClusterConfig): PySparkRunner = {
 
-    //TODO: can we make this less ugly?
-    var pysparkPython = "/usr/bin/python"
+    val shellLoger = ProcessLogger(
+      (o: String) => println(o),
+      (e: String) => println(e)
+    )
 
-    if (pyDeps != null &&
-        pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(pyDeps, notifier)
-      pysparkPython = "miniconda/bin/python"
-    }
+    //TODO: can we make this less ugly?
+
 
     val result = new PySparkRunner
 
@@ -98,87 +108,44 @@
       intpPath = s"spark_intp.py"
     }
     var pysparkPath = ""
-    if (env.configuration.contains("pysparkPath")) {
-      pysparkPath = env.configuration("pysparkPath")
-    } else {
-      pysparkPath = s"${config.spark.home}/bin/spark-submit"
+    var condaPkgs = ""
+    if (pyDeps != null)
+      condaPkgs = collectCondaPackages()
+    var sparkCmd: Seq[String] = Seq()
+    config.mode match {
+      case "yarn" =>
+        pysparkPath = s"spark/bin/spark-submit"
+        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+      case "mesos" =>
+        pysparkPath = config.pysparkPath
+        if (pysparkPath.endsWith("spark-submit")) {
+          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+        }
+        else {
+          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+        }
+        var pysparkPython = "/usr/bin/python"
+
+        if (pyDeps != null &&
+          pyDeps.packages.nonEmpty) {
+          pysparkPython = "./miniconda/bin/python"
+        }
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYSPARK_PYTHON" -> pysparkPython,
+        "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
     }
-    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString) #> System.out
-
-    proc.run()
-
 
     result.notifier = notifier
 
     result
   }
 
-  /**
-    * This installs the required python dependencies.
-    * We basically need 2 packages to make pyspark work with customer's scripts:
-    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
-    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
-    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
-    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
-    *
-    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
-    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
-    * @param notifier
-    */
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-
-  /**
-    * Installs one python package using Anaconda.
-    * Anaconda works with multiple channels, or better called, repositories.
-    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
-    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
-    * @param pythonPackage This comes from parsing the python.yml dep file.
-    */
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
-    }
-  }
-
-  /**
-    * Installs Anaconda and then links it with the local spark that was installed on the executor.
-    */
-  private def installAnacondaOnNode(): Unit = {
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
-  }
-
-
 }
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@
   )
   private var conf: Option[Map[String, Any]] = _
   private var executorEnv: Option[Map[String, Any]] = _
+  private var clusterConfig: ClusterConfig = _
 
   override def init(execData: ExecData,
                     jobId: String,
@@ -60,7 +61,7 @@
       (o: String) => log.info(o),
       (e: String) => log.error("", e)
     )
-
+    clusterConfig = config
     var jars = Seq.empty[String]
 
     if (execData.deps != null) {
@@ -83,9 +84,15 @@
     sparkScalaRunner.initializeAmaContext(execData.env)
 
     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+    var pypath = ""
     // TODO: get rid of hard-coded version
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+    config.mode match {
+      case "yarn" =>
+        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+      case "mesos" =>
+        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+    }
+    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)
 
     lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
     val channel = pythonPackage.channel.getOrElse("anaconda")
     if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
     } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
     }
   }
 
   private def installAnacondaOnNode(): Unit = {
     // TODO: get rid of hard-coded version
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+    // Install Miniconda into ./miniconda; the installer file name differs between yarn and mesos modes.
+    this.clusterConfig.mode match {
+      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+    }
+    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+    // Absolute target: a relative symlink target would resolve from miniconda/pkgs/, leaving a dangling link.
+    Seq("bash", "-c", "ln -s $PWD/spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
   }
 
   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index d437778..f4f553c 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.executor.yarn.executors
 
 import java.io.ByteArrayOutputStream
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index abab8a4..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -145,16 +145,16 @@
       case "yarn" =>
         conf.set("spark.home", config.spark.home)
           // TODO: parameterize those
-          .setJars(s"executor-${config.version}-all.jar" +: jars)
+          .setJars(s"executor.jar" +: jars)
           .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
           .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
           .set("spark.yarn.queue", "default")
           .set("spark.history.kerberos.principal", "none")
 
           .set("spark.master", master)
-          .set("spark.executor.instances", "1") // TODO: change this
+          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
           .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", "1g")
+          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
           .set("spark.dynamicAllocation.enabled", "false")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@
 webserver.port=8000
 webserver.root=dist
 spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/pyspark-with-amacontext.py b/executor/src/test/resources/pyspark-with-amacontext.py
index bd780a9..c940eea 100755
--- a/executor/src/test/resources/pyspark-with-amacontext.py
+++ b/executor/src/test/resources/pyspark-with-amacontext.py
@@ -1,3 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class AmaContext(object):
+
+    def __init__(self, sc, spark, job_id, env):
+        self.sc = sc
+        self.spark = spark
+        self.job_id = job_id
+        self.env = env
+
+    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
+        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
+
+class Environment(object):
+
+    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+        self.name = name
+        self.master = master
+        self.input_root_path = input_root_path
+        self.output_root_path = output_root_path
+        self.working_dir = working_dir
+        self.configuration = configuration
+
 data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 rdd = ama_context.sc.parallelize(data)
 odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/executor/src/test/resources/runtime.py b/executor/src/test/resources/runtime.py
index 3a90952..d01664c 100644
--- a/executor/src/test/resources/runtime.py
+++ b/executor/src/test/resources/runtime.py
@@ -1,6 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 class AmaContext(object):
 
-    def __init__(self, sc, sqlContext):
+    def __init__(self, sc, spark, job_id, env):
         self.sc = sc
-        self.sqlContext = sqlContext
+        self.spark = spark
+        self.job_id = job_id
+        self.env = env
 
+    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
+        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
+
+class Environment(object):
+
+    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+        self.name = name
+        self.master = master
+        self.input_root_path = input_root_path
+        self.output_root_path = output_root_path
+        self.working_dir = working_dir
+        self.configuration = configuration
diff --git a/executor/src/test/resources/simple-pyspark.py b/executor/src/test/resources/simple-pyspark.py
index df9eb0a..923f81c 100755
--- a/executor/src/test/resources/simple-pyspark.py
+++ b/executor/src/test/resources/simple-pyspark.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 try:
     rdd = sc.parallelize(data)
diff --git a/executor/src/test/resources/simple-python-err.py b/executor/src/test/resources/simple-python-err.py
index 14ca311..dff1491 100755
--- a/executor/src/test/resources/simple-python-err.py
+++ b/executor/src/test/resources/simple-python-err.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 data = [1, 2, 3, 4, 5]
 1/0
 
diff --git a/executor/src/test/resources/simple-python.py b/executor/src/test/resources/simple-python.py
index 3f25951..0ac6f85 100755
--- a/executor/src/test/resources/simple-python.py
+++ b/executor/src/test/resources/simple-python.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 data = [1, 2, 3, 4, 5]
 print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
 print(data)
diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala
index 34798e7..a11a458 100755
--- a/executor/src/test/resources/simple-spark.scala
+++ b/executor/src/test/resources/simple-spark.scala
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import org.apache.amaterasu.executor.runtime.AmaContext
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index 6b8eaf6..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -1,24 +1,41 @@
 #!/usr/bin/python
-
-# import os
-# user_paths = os.environ['PYTHONPATH']
 #
-# with open('/Users/roadan/pypath.txt', 'a') as the_file:
-#     the_file.write(user_paths)
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 
 import ast
 import codegen
 import os
 import sys
-from runtime import AmaContext
+import zipimport
+from runtime import AmaContext, Environment
+
 os.chdir(os.getcwd() + '/build/resources/test/')
 import zipfile
 zip = zipfile.ZipFile('pyspark.zip')
 zip.extractall()
 zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
 zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
+sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())
+
+# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
+# py4j_importer = zipimport.zipimporter(py4j_path)
+# py4j = py4j_importer.load_module('py4j')
 from py4j.java_gateway import JavaGateway, GatewayClient, java_import
 from py4j.protocol import Py4JJavaError
 from pyspark.conf import SparkConf
@@ -31,9 +48,10 @@
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.sql import SparkSession
+from pyspark.sql import Row
 
 client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert = True)
+gateway = JavaGateway(client, auto_convert=True)
 entry_point = gateway.entry_point
 queue = entry_point.getExecutionQueue()
 
@@ -49,28 +67,44 @@
 jconf = entry_point.getSparkConf()
 jsc = entry_point.getJavaSparkContext()
 
-conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
+job_id = entry_point.getJobId()
+javaEnv = entry_point.getEnv()
 
+env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
+conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
+conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-
 spark = SparkSession(sc, entry_point.getSparkSession())
-sqlc = spark._wrapped
 
-ama_context = AmaContext(sc, sqlc)
+ama_context = AmaContext(sc, spark, job_id, env)
 
 while True:
     actionData = queue.getNext()
     resultQueue = entry_point.getResultQueue(actionData._2())
     actionSource = actionData._1()
     tree = ast.parse(actionSource)
+    exports = actionData._3()
 
     for node in tree.body:
 
         wrapper = ast.Module(body=[node])
         try:
-            co  = compile(wrapper, "<ast>", 'exec')
-            exec(co)
+            co = compile(wrapper, "<ast>", 'exec')
+            exec (co)
             resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
+
+            #if this node is an assignment, we need to check if it needs to be persisted
+            try:
+                persistCode = ''
+                if(isinstance(node,ast.Assign)):
+                    varName = node.targets[0].id
+                    if(exports.containsKey(varName)):
+                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
+                        persist = compile(persistCode, '<stdin>', 'exec')
+                        exec(persist)
+
+            except:
+                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
         except:
             resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
     resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala
index 189701f..a3d034c 100755
--- a/executor/src/test/resources/step-2.scala
+++ b/executor/src/test/resources/step-2.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import org.apache.amaterasu.executor.runtime.AmaContext
 
 
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
index 68c06ce..9575205 100755
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
@@ -53,4 +53,4 @@
   }
 
 
-}
+}
\ No newline at end of file
diff --git a/leader/build.gradle b/leader/build.gradle
index 8595d02..da29397 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -43,10 +43,10 @@
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
     compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.3'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
     compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
@@ -102,6 +102,7 @@
 }
 
 task copyToHomeBin(type: Copy) {
+    dependsOn shadowJar
     from 'build/libs'
     into '../build/amaterasu/bin'
 }
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index 731efb8..e3c2812 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -110,8 +111,9 @@
 
 
         List<String> commands = Collections.singletonList(
-                "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
-                        "$JAVA_HOME/bin/java" +
+                "env AMA_NODE=" + System.getenv("AMA_NODE") +
+                        " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
+                        " $JAVA_HOME/bin/java" +
                         " -Dscala.usejavacp=false" +
                         " -Xmx1G" +
                         " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
@@ -128,6 +130,7 @@
 
         // Setup local ama folder on hdfs.
         try {
+
             if (!fs.exists(jarPathQualified)) {
                 File home = new File(opts.home);
                 fs.mkdirs(jarPathQualified);
@@ -139,6 +142,7 @@
                 // setup frameworks
                 FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
                 for (String group : frameworkFactory.groups()) {
+                    System.out.println("===> setting up " + group);
                     FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
 
                     //creating a group folder
@@ -153,14 +157,15 @@
                 }
             }
         } catch (IOException e) {
+            System.out.println("===>" + e.getMessage());
             LOGGER.error("Error uploading ama folder to HDFS.", e);
             exit(3);
         } catch (NullPointerException ne) {
+            System.out.println("===>" + ne.getMessage());
             LOGGER.error("No files in home dir.", ne);
             exit(4);
         }
 
-
         // get version of build
         String version = config.version();
 
@@ -170,7 +175,6 @@
         Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
 
         // System.out.println("===> path: " + jarPathQualified);
-
         LOGGER.info("Leader merged jar path is: {}", mergedPath);
         LocalResource leaderJar = null;
         LocalResource propFile = null;
@@ -234,7 +238,7 @@
         reportBarrier.setBarrier();
         reportBarrier.waitOnBarrier();
 
-        String address = new String( client.getData().forPath("/" + newJobId + "/broker"));
+        String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
         System.out.println("===> " + address);
         setupReportListener(address);
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
index adaeae9..3e1a67b 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.execution.frameworks
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
index 0fe378a..8c487c1 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.frameworks.spark
 
 import java.io.File
@@ -15,19 +31,22 @@
 
   private var env: String = _
   private var conf: ClusterConfig = _
-  private val runnersResources = mutable.Map[String,Array[File]]()
-  private var execData: ExecData = _
-  private var sparkExecConfigurations = mutable.Map[String, Any]()
+  private val runnersResources = mutable.Map[String, Array[File]]()
+  //private var execData: ExecData = _
+  private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
 
-  override def init(env: String, conf: ClusterConfig): Unit = {
-    this.env = env
-    this.conf = conf
-    this.execData = DataLoader.getExecutorData(env, conf)
+  private def loadSparkConfig: mutable.Map[String, Any] = {
+    val execData = DataLoader.getExecutorData(env, conf)
     val sparkExecConfigurationsurations = execData.configurations.get("spark")
     if (sparkExecConfigurationsurations.isEmpty) {
       throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}")
     }
-    this.sparkExecConfigurations = sparkExecConfigurations ++ sparkExecConfigurationsurations.get
+    collection.mutable.Map(sparkExecConfigurationsurations.get.toSeq: _*)
+  }
+
+  override def init(env: String, conf: ClusterConfig): Unit = {
+    this.env = env
+    this.conf = conf
 
     runnersResources += "scala" -> Array.empty[File]
     runnersResources += "sql" -> Array.empty[File]
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
index 0706491..8aec0f6 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.mesos
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 86863f5..87a8f5d 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -16,10 +16,11 @@
  */
 package org.apache.amaterasu.leader.mesos.schedulers
 
+import java.io.File
 import java.util
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -50,6 +51,9 @@
   */
 class JobScheduler extends AmaterasuScheduler {
 
+  /*private val props: Properties = new Properties(new File(""))
+  private val version = props.getProperty("version")
+  println(s"===> version  $version")*/
   LogManager.resetConfiguration()
   private var jobManager: JobManager = _
   private var client: CuratorFramework = _
@@ -166,15 +170,15 @@
                 val command = CommandInfo
                   .newBuilder
                   .setValue(
-                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
+                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-${config.version}-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
                   )
-//                  HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
-//                  )
+                  //                  HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
+                  //                  )
                   .addUris(URI.newBuilder
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-0.2.0-incubating-all.jar")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
+                  .setExecutable(false)
+                  .setExtract(false)
+                  .build())
                   .addUris(URI.newBuilder()
                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
                     .setExecutable(false)
@@ -201,10 +205,10 @@
                     .setExtract(false)
                     .build())
                   .addUris(URI.newBuilder()
-                      .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
-                      .setExecutable(false)
-                      .setExtract(false)
-                      .build())
+                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
+                    .setExecutable(false)
+                    .setExtract(false)
+                    .build())
                 executor = ExecutorInfo
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
index e24d979..2664665 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.utilities
 
 import javax.jms.{Message, MessageListener, TextMessage}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
index 1c86c13..c005256 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
@@ -1,6 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.utilities
 
-
 case class Args(
                  repo: String = "",
                  branch: String = "master",
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
index f15c3b1..d1d0c53 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.utilities
 
 import java.io.FileInputStream
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
index c083bda..7aaa752 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.utilities
 
 object MemoryFormatParser {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 3fed076..1828100 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -21,8 +21,8 @@
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
 
+import javax.jms.Session
 import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
@@ -153,21 +153,14 @@
     // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
     allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
 
-    rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this)
-    rmClient.init(conf)
-    rmClient.start()
-
-    // Register with ResourceManager
-    log.info("Registering application")
-    val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
-    log.info("Registered application")
+    rmClient = startRMClient()
+    val registrationResponse = registerAppMaster("", 0, "")
     val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
     log.info("Max mem capability of resources in this cluster " + maxMem)
     val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
     log.info("Max vcores capability of resources in this cluster " + maxVCores)
     log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.registeredActions.size}")
 
-
     // Resource requirements for worker containers
     this.capability = Records.newRecord(classOf[Resource])
     val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
@@ -194,6 +187,21 @@
     log.info("Finished asking for containers")
   }
 
+  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+    val asyncRmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
+    asyncRmClient.init(conf)
+    asyncRmClient.start()
+    asyncRmClient
+  }
+
+  private def registerAppMaster(host: String, port: Int, url: String) = {
+    // Register this ApplicationMaster with the ResourceManager and hand back its response
+    log.info("Registering application")
+    val response = rmClient.registerApplicationMaster(host, port, url)
+    log.info("Registered application")
+    response
+  }
+
   private def setupMessaging(jobId: String): Unit = {
 
     val cf = new ActiveMQConnectionFactory(address)
@@ -225,20 +233,6 @@
 
   override def onContainersAllocated(containers: util.List[Container]): Unit = {
 
-    // creating the credentials for container execution
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-
-    // removing the AM->RM token so that containers cannot access it.
-    val iter = credentials.getAllTokens.iterator
-    log.info("Executing with tokens:")
-    for (token <- iter) {
-      log.info(token.toString)
-      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
-    }
-    val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-
     log.info(s"${containers.size()} Containers allocated")
     for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
       if (actionsBuffer.isEmpty) {
@@ -255,8 +249,8 @@
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
         val commands: List[String] = List(
           "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
-          s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ",
-          s"java -cp ${config.spark.home}/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
+          s"/bin/bash spark/bin/load-spark-env.sh && ",
+          s"java -cp spark/jars/*:executor.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
             "-Xmx1G " +
             "-Dscala.usejavacp=true " +
             "-Dhdp.version=2.6.1.0-129 " +
@@ -294,7 +288,8 @@
         ctx.setEnvironment(Map[String, String](
           "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
           "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
-          "AMA_NODE" -> sys.env("AMA_NODE")
+          "AMA_NODE" -> sys.env("AMA_NODE"),
+          "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
         ))
 
         log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
@@ -316,6 +311,22 @@
     }
   }
 
+  private def allTokens: ByteBuffer = {
+    // creating the credentials (a copy of the current user's) for container execution
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+
+    // removing the AM->RM token BEFORE serializing, so that containers cannot access it.
+    val iter = credentials.getAllTokens.iterator
+    log.info("Executing with tokens:")
+    for (token <- iter) {
+      log.info(token.toString)
+      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+    }
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+  }
+
   private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
 
     val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
index 0c1a8f0..70da38e 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
@@ -106,7 +106,7 @@
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
         val command = s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")}
                          | env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz
-                         | java -cp executor-0.2.0-all.jar:spark-${config.Webserver.sparkVersion}/lib/*
+                         | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
                          | -Dscala.usejavacp=true
                          | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
                          | ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh
index 6fd194a..c0f8d52 100755
--- a/leader/src/main/scripts/ama-start-yarn.sh
+++ b/leader/src/main/scripts/ama-start-yarn.sh
@@ -1,4 +1,20 @@
 #!/usr/bin/env bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
 
 BASEDIR=$(dirname "$0")
 
@@ -86,7 +102,7 @@
 echo "repo: ${REPO} "
 echo "force-bin: ${FORCE_BIN}"
 export HADOOP_USER_CLASSPATH_FIRST=true
-CMD="yarn jar ${BASEDIR}/bin/leader-0.2.0-incubating-all.jar org.apache.amaterasu.leader.yarn.Client --home ${BASEDIR}"
+CMD="yarn jar ${BASEDIR}/bin/leader-0.2.0-incubating-rc3-all.jar org.apache.amaterasu.leader.yarn.Client --home ${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     echo "repo is ${REPO}"
@@ -129,9 +145,9 @@
 if [ "$FORCE_BIN" = true ] ; then
     echo "FORCE: Deleting and re-creating /apps/amaterasu folder"
     eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu"
-    eval "hdfs dfs -mkdir /apps/amaterasu/"
-    eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
-    eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
+    #eval "hdfs dfs -mkdir /apps/amaterasu/"
+    #eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
+    #eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
 fi
 
 eval $CMD | grep "===>"
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties
index 7961db9..5cd6638 100755
--- a/leader/src/main/scripts/amaterasu.properties
+++ b/leader/src/main/scripts/amaterasu.properties
@@ -1,5 +1,19 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 zk=127.0.0.1
-version=0.2.0-incubating
+version=0.2.0-incubating-rc3
 master=192.168.33.11
 user=root
 mode=yarn
diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties
index c5e965f..742eb59 100644
--- a/leader/src/main/scripts/log4j.properties
+++ b/leader/src/main/scripts/log4j.properties
@@ -1,3 +1,18 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
 # Root logger option
 log4j.rootLogger=DEBUG, stdout, file
 
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index d1be723..ef53fa9 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.sdk.frameworks;
 
 import org.apache.amaterasu.common.configuration.ClusterConfig;
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
index ff9d7c7..8fe641c 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.sdk.frameworks.configuration;
 
 public class DriverConfiguration {