merged with master
diff --git a/docs/docs/config.md b/docs/docs/config.md
index f49186a..52adfc2 100644
--- a/docs/docs/config.md
+++ b/docs/docs/config.md
@@ -65,6 +65,7 @@
       file: start.py
 
 ```
+
 ## Configuration Types
 
 Amaterasu allows the configuration of three main areas:
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 7a32b9e..e3e503f 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -56,8 +56,8 @@
 
     override val groupResources: Array<File> by lazy {
         when (conf.mode()) {
-            "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar"))
-            "yarn" -> arrayOf(File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
+            "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}.jar"))
+            "yarn" -> arrayOf(File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
             else -> arrayOf()
         }
     }
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
index d716427..c4a4f93 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
@@ -31,8 +31,8 @@
         val hive  = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
         val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
 
-        return "$command && \$SPARK_HOME/bin/spark-submit $master" +
-                SparkCommandLineHelper.getOptions(sparkOptions) +
+        return "$command && \$SPARK_HOME/bin/spark-submit $master " +
+                SparkCommandLineHelper.getOptions(sparkOptions) + " " +
                 SparkCommandLineHelper.getProperties(sparkProperties) + " " +
                 "--conf spark.pyspark.python=${conf.pythonPath()} " +
                 "--conf spark.pyspark.driver.python=$virtualPythonPath " +
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
index e60eb11..ccfa6f0 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
@@ -19,10 +19,10 @@
 object SparkCommandLineHelper {
 
     fun getOptions(sparkOptions: Map<String, Any>): String {
-        return sparkOptions.map { "--${it.key} ${it.value}" }.joinToString( separator = " ")
+        return sparkOptions.map { "--${it.key} ${it.value}" }.joinToString( separator = " ") + " "
     }
 
     fun getProperties(sparkProps: Map<String, Any>): String{
-        return sparkProps.map { "--conf $it" }.joinToString( separator = " ")
+        return sparkProps.map { "--conf $it" }.joinToString( separator = " ") + " "
     }
 }
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index a607d75..f413fd1 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -20,7 +20,6 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.utils.ArtifactUtil
-import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.common.configuration.Job
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 
@@ -35,8 +34,8 @@
         val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
 
         return "\$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.artifact).first().name} " +
-                SparkCommandLineHelper.getOptions(sparkOptions) +
-                SparkCommandLineHelper.getProperties(sparkProperties) +
+                SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+                SparkCommandLineHelper.getProperties(sparkProperties) + " " +
                 master +
                 " --jars spark-runtime-${conf.version()}.jar >&1"
     }
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
deleted file mode 100644
index b363058..0000000
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/build.gradle b/frameworks/spark/runner/build.gradle
deleted file mode 100644
index 3a612f2..0000000
--- a/frameworks/spark/runner/build.gradle
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
-    id 'com.github.johnrengelman.shadow' version '1.2.4'
-    id 'com.github.maiflai.scalatest' version '0.22'
-    id 'scala'
-    id 'java'
-}
-
-shadowJar {
-    zip64 true
-}
-
-repositories {
-    maven {
-        url "https://plugins.gradle.org/m2/"
-    }
-    mavenCentral()
-}
-
-test {
-    maxParallelForks = 1
-    forkEvery = 1
-}
-
-configurations {
-    provided
-}
-
-sourceSets {
-    main.compileClasspath += configurations.provided
-    test.compileClasspath += configurations.provided
-    test.runtimeClasspath += configurations.provided
-}
-
-dependencies {
-
-    compile project(':executor')
-    compile project(':spark-runtime')
-    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
-    compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
-    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
-    compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
-    compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
-    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
-
-    compile('com.jcabi:jcabi-aether:0.10.1') {
-        exclude group: 'org.jboss.netty'
-    }
-    compile('org.apache.activemq:activemq-client:5.15.2') {
-        exclude group: 'org.jboss.netty'
-    }
-
-    //runtime dependency for spark
-    provided('org.apache.spark:spark-repl_2.11:2.2.1')
-    provided('org.apache.spark:spark-core_2.11:2.2.1')
-
-    testCompile project(':common')
-    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
-    testRuntime 'org.pegdown:pegdown:1.1.0'
-    testCompile 'junit:junit:4.11'
-    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
-    testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
-    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
-
-}
-
-sourceSets {
-    test {
-        resources.srcDirs += [file('src/test/resources')]
-    }
-
-    main {
-        scala {
-            srcDirs = ['src/main/scala', 'src/main/java']
-        }
-        java {
-            srcDirs = []
-        }
-    }
-}
-
-test {
-
-    maxParallelForks = 1
-}
-
-task copyToHome(type: Copy) {
-    dependsOn shadowJar
-    from 'build/libs'
-    into '../../../build/amaterasu/dist'
-    from 'build/resources/main'
-    into '../../../build/amaterasu/dist'
-}
diff --git a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java b/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java
deleted file mode 100755
index 491d771..0000000
--- a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark;
-
-import org.apache.amaterasu.common.runtime.Environment;
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
-import py4j.GatewayServer;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PySparkEntryPoint {
-
-    //private static Boolean Started = false;
-    private static  PySparkExecutionQueue queue = new PySparkExecutionQueue();
-    private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
-
-    private static int port = 0;
-    private static SparkSession sparkSession = null;
-    private static JavaSparkContext jsc = null;
-    private static SQLContext sqlContext = null;
-    private static SparkEnv sparkEnv = null;
-
-    public static PySparkExecutionQueue getExecutionQueue() {
-        return queue;
-    }
-
-    public static ResultQueue getResultQueue(String actionName) {
-        resultQueues.putIfAbsent(actionName, new ResultQueue());
-        return resultQueues.get(actionName);
-    }
-
-    public static JavaSparkContext getJavaSparkContext() {
-        SparkEnv.set(sparkEnv);
-        return jsc;
-    }
-
-    public static String getJobId(){
-        return AmaContext.jobId();
-    }
-
-    public static Environment getEnv(){
-        return AmaContext.env();
-    }
-
-    public static SQLContext getSqlContext() {
-        SparkEnv.set(sparkEnv);
-        return sqlContext;
-    }
-
-    public static SparkSession getSparkSession() {
-        SparkEnv.set(sparkEnv);
-        return sparkSession;
-    }
-
-    public static SparkConf getSparkConf() {
-        return jsc.getConf();
-    }
-
-    private static void generatePort() {
-
-        try {
-
-            ServerSocket socket = new ServerSocket(0);
-            port = socket.getLocalPort();
-
-            socket.close();
-
-        } catch (IOException e) {
-        }
-
-    }
-
-    public static int getPort() {
-        return port;
-    }
-
-    public static void start(SparkSession spark,
-                             String jobName,
-                             Environment env,
-                             SparkEnv sparkEnv) {
-
-        AmaContext.init(spark, jobName, env);
-
-        sparkSession = spark;
-        jsc = new JavaSparkContext(spark.sparkContext());
-        sqlContext = spark.sqlContext();
-        PySparkEntryPoint.sparkEnv = sparkEnv;
-        generatePort();
-        GatewayServer gatewayServer = new GatewayServer(new PySparkEntryPoint(), port);
-
-        gatewayServer.start();
-    }
-}
diff --git a/frameworks/spark/runner/src/main/resources/codegen.py b/frameworks/spark/runner/src/main/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/frameworks/spark/runner/src/main/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
-    codegen
-    ~~~~~~~
-
-    Extension to ast that allow ast -> python code generation.
-
-    :copyright: Copyright 2008 by Armin Ronacher.
-    :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
-    """This function can convert a node tree back into python sourcecode.
-    This is useful for debugging purposes, especially if you're dealing with
-    custom asts not generated by python itself.
-
-    It could be that the sourcecode is evaluable when the AST itself is not
-    compilable / evaluable.  The reason for this is that the AST contains some
-    more data than regular sourcecode does, which is dropped during
-    conversion.
-
-    Each level of indentation is replaced with `indent_with`.  Per default this
-    parameter is equal to four spaces as suggested by PEP 8, but it might be
-    adjusted to match the application's styleguide.
-
-    If `add_line_information` is set to `True` comments for the line numbers
-    of the nodes are added to the output.  This can be used to spot wrong line
-    number information of statement nodes.
-    """
-    generator = SourceGenerator(indent_with, add_line_information)
-    generator.visit(node)
-
-    return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
-    """This visitor is able to transform a well formed syntax tree into python
-    sourcecode.  For more details have a look at the docstring of the
-    `node_to_source` function.
-    """
-
-    def __init__(self, indent_with, add_line_information=False):
-        self.result = []
-        self.indent_with = indent_with
-        self.add_line_information = add_line_information
-        self.indentation = 0
-        self.new_lines = 0
-
-    def write(self, x):
-        if self.new_lines:
-            if self.result:
-                self.result.append('\n' * self.new_lines)
-            self.result.append(self.indent_with * self.indentation)
-            self.new_lines = 0
-        self.result.append(x)
-
-    def newline(self, node=None, extra=0):
-        self.new_lines = max(self.new_lines, 1 + extra)
-        if node is not None and self.add_line_information:
-            self.write('# line: %s' % node.lineno)
-            self.new_lines = 1
-
-    def body(self, statements):
-        self.new_line = True
-        self.indentation += 1
-        for stmt in statements:
-            self.visit(stmt)
-        self.indentation -= 1
-
-    def body_or_else(self, node):
-        self.body(node.body)
-        if node.orelse:
-            self.newline()
-            self.write('else:')
-            self.body(node.orelse)
-
-    def signature(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        padding = [None] * (len(node.args) - len(node.defaults))
-        for arg, default in zip(node.args, padding + node.defaults):
-            write_comma()
-            self.visit(arg)
-            if default is not None:
-                self.write('=')
-                self.visit(default)
-        if node.vararg is not None:
-            write_comma()
-            self.write('*' + node.vararg)
-        if node.kwarg is not None:
-            write_comma()
-            self.write('**' + node.kwarg)
-
-    def decorators(self, node):
-        for decorator in node.decorator_list:
-            self.newline(decorator)
-            self.write('@')
-            self.visit(decorator)
-
-    # Statements
-
-    def visit_Assert(self, node):
-        self.newline(node)
-        self.write('assert ')
-        self.visit(node.test)
-        if node.msg is not None:
-           self.write(', ')
-           self.visit(node.msg)
-
-    def visit_Assign(self, node):
-        self.newline(node)
-        for idx, target in enumerate(node.targets):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-        self.write(' = ')
-        self.visit(node.value)
-
-    def visit_AugAssign(self, node):
-        self.newline(node)
-        self.visit(node.target)
-        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
-        self.visit(node.value)
-
-    def visit_ImportFrom(self, node):
-        self.newline(node)
-        self.write('from %s%s import ' % ('.' * node.level, node.module))
-        for idx, item in enumerate(node.names):
-            if idx:
-                self.write(', ')
-            self.write(item)
-
-    def visit_Import(self, node):
-        self.newline(node)
-        for item in node.names:
-            self.write('import ')
-            self.visit(item)
-
-    def visit_Expr(self, node):
-        self.newline(node)
-        self.generic_visit(node)
-
-    def visit_FunctionDef(self, node):
-        self.newline(extra=1)
-        self.decorators(node)
-        self.newline(node)
-        self.write('def %s(' % node.name)
-        self.visit(node.args)
-        self.write('):')
-        self.body(node.body)
-
-    def visit_ClassDef(self, node):
-        have_args = []
-        def paren_or_comma():
-            if have_args:
-                self.write(', ')
-            else:
-                have_args.append(True)
-                self.write('(')
-
-        self.newline(extra=2)
-        self.decorators(node)
-        self.newline(node)
-        self.write('class %s' % node.name)
-        for base in node.bases:
-            paren_or_comma()
-            self.visit(base)
-        # XXX: the if here is used to keep this module compatible
-        #      with python 2.6.
-        if hasattr(node, 'keywords'):
-            for keyword in node.keywords:
-                paren_or_comma()
-                self.write(keyword.arg + '=')
-                self.visit(keyword.value)
-            if node.starargs is not None:
-                paren_or_comma()
-                self.write('*')
-                self.visit(node.starargs)
-            if node.kwargs is not None:
-                paren_or_comma()
-                self.write('**')
-                self.visit(node.kwargs)
-        self.write(have_args and '):' or ':')
-        self.body(node.body)
-
-    def visit_If(self, node):
-        self.newline(node)
-        self.write('if ')
-        self.visit(node.test)
-        self.write(':')
-        self.body(node.body)
-        while True:
-            else_ = node.orelse
-            if len(else_) == 0:
-                break
-            elif len(else_) == 1 and isinstance(else_[0], If):
-                node = else_[0]
-                self.newline()
-                self.write('elif ')
-                self.visit(node.test)
-                self.write(':')
-                self.body(node.body)
-            else:
-                self.newline()
-                self.write('else:')
-                self.body(else_)
-                break
-
-    def visit_For(self, node):
-        self.newline(node)
-        self.write('for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_While(self, node):
-        self.newline(node)
-        self.write('while ')
-        self.visit(node.test)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_With(self, node):
-        self.newline(node)
-        self.write('with ')
-        self.visit(node.context_expr)
-        if node.optional_vars is not None:
-            self.write(' as ')
-            self.visit(node.optional_vars)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_Pass(self, node):
-        self.newline(node)
-        self.write('pass')
-
-    def visit_Print(self, node):
-        # XXX: python 2.6 only
-        self.newline(node)
-        self.write('print ')
-        want_comma = False
-        if node.dest is not None:
-            self.write(' >> ')
-            self.visit(node.dest)
-            want_comma = True
-        for value in node.values:
-            if want_comma:
-                self.write(', ')
-            self.visit(value)
-            want_comma = True
-        if not node.nl:
-            self.write(',')
-
-    def visit_Delete(self, node):
-        self.newline(node)
-        self.write('del ')
-        for idx, target in enumerate(node):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-
-    def visit_TryExcept(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        for handler in node.handlers:
-            self.visit(handler)
-
-    def visit_TryFinally(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        self.newline(node)
-        self.write('finally:')
-        self.body(node.finalbody)
-
-    def visit_Global(self, node):
-        self.newline(node)
-        self.write('global ' + ', '.join(node.names))
-
-    def visit_Nonlocal(self, node):
-        self.newline(node)
-        self.write('nonlocal ' + ', '.join(node.names))
-
-    def visit_Return(self, node):
-        self.newline(node)
-        if node.value is None:
-            self.write('return')
-        else:
-            self.write('return ')
-            self.visit(node.value)
-
-    def visit_Break(self, node):
-        self.newline(node)
-        self.write('break')
-
-    def visit_Continue(self, node):
-        self.newline(node)
-        self.write('continue')
-
-    def visit_Raise(self, node):
-        # XXX: Python 2.6 / 3.0 compatibility
-        self.newline(node)
-        self.write('raise')
-        if hasattr(node, 'exc') and node.exc is not None:
-            self.write(' ')
-            self.visit(node.exc)
-            if node.cause is not None:
-                self.write(' from ')
-                self.visit(node.cause)
-        elif hasattr(node, 'type') and node.type is not None:
-            self.visit(node.type)
-            if node.inst is not None:
-                self.write(', ')
-                self.visit(node.inst)
-            if node.tback is not None:
-                self.write(', ')
-                self.visit(node.tback)
-
-    # Expressions
-
-    def visit_Attribute(self, node):
-        self.visit(node.value)
-        self.write('.' + node.attr)
-
-    def visit_Call(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        self.visit(node.func)
-        self.write('(')
-        for arg in node.args:
-            write_comma()
-            self.visit(arg)
-        for keyword in node.keywords:
-            write_comma()
-            self.write(keyword.arg + '=')
-            self.visit(keyword.value)
-        if node.starargs is not None:
-            write_comma()
-            self.write('*')
-            self.visit(node.starargs)
-        if node.kwargs is not None:
-            write_comma()
-            self.write('**')
-            self.visit(node.kwargs)
-        self.write(')')
-
-    def visit_Name(self, node):
-        self.write(node.id)
-
-    def visit_Str(self, node):
-        self.write(repr(node.s))
-
-    def visit_Bytes(self, node):
-        self.write(repr(node.s))
-
-    def visit_Num(self, node):
-        self.write(repr(node.n))
-
-    def visit_Tuple(self, node):
-        self.write('(')
-        idx = -1
-        for idx, item in enumerate(node.elts):
-            if idx:
-                self.write(', ')
-            self.visit(item)
-        self.write(idx and ')' or ',)')
-
-    def sequence_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            for idx, item in enumerate(node.elts):
-                if idx:
-                    self.write(', ')
-                self.visit(item)
-            self.write(right)
-        return visit
-
-    visit_List = sequence_visit('[', ']')
-    visit_Set = sequence_visit('{', '}')
-    del sequence_visit
-
-    def visit_Dict(self, node):
-        self.write('{')
-        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
-            if idx:
-                self.write(', ')
-            self.visit(key)
-            self.write(': ')
-            self.visit(value)
-        self.write('}')
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
-        self.visit(node.right)
-
-    def visit_BoolOp(self, node):
-        self.write('(')
-        for idx, value in enumerate(node.values):
-            if idx:
-                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
-            self.visit(value)
-        self.write(')')
-
-    def visit_Compare(self, node):
-        self.write('(')
-        self.visit(node.left)
-        for op, right in zip(node.ops, node.comparators):
-            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
-            self.visit(right)
-        self.write(')')
-
-    def visit_UnaryOp(self, node):
-        self.write('(')
-        op = UNARYOP_SYMBOLS[type(node.op)]
-        self.write(op)
-        if op == 'not':
-            self.write(' ')
-        self.visit(node.operand)
-        self.write(')')
-
-    def visit_Subscript(self, node):
-        self.visit(node.value)
-        self.write('[')
-        self.visit(node.slice)
-        self.write(']')
-
-    def visit_Slice(self, node):
-        if node.lower is not None:
-            self.visit(node.lower)
-        self.write(':')
-        if node.upper is not None:
-            self.visit(node.upper)
-        if node.step is not None:
-            self.write(':')
-            if not (isinstance(node.step, Name) and node.step.id == 'None'):
-                self.visit(node.step)
-
-    def visit_ExtSlice(self, node):
-        for idx, item in node.dims:
-            if idx:
-                self.write(', ')
-            self.visit(item)
-
-    def visit_Yield(self, node):
-        self.write('yield ')
-        self.visit(node.value)
-
-    def visit_Lambda(self, node):
-        self.write('lambda ')
-        self.visit(node.args)
-        self.write(': ')
-        self.visit(node.body)
-
-    def visit_Ellipsis(self, node):
-        self.write('Ellipsis')
-
-    def generator_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            self.visit(node.elt)
-            for comprehension in node.generators:
-                self.visit(comprehension)
-            self.write(right)
-        return visit
-
-    visit_ListComp = generator_visit('[', ']')
-    visit_GeneratorExp = generator_visit('(', ')')
-    visit_SetComp = generator_visit('{', '}')
-    del generator_visit
-
-    def visit_DictComp(self, node):
-        self.write('{')
-        self.visit(node.key)
-        self.write(': ')
-        self.visit(node.value)
-        for comprehension in node.generators:
-            self.visit(comprehension)
-        self.write('}')
-
-    def visit_IfExp(self, node):
-        self.visit(node.body)
-        self.write(' if ')
-        self.visit(node.test)
-        self.write(' else ')
-        self.visit(node.orelse)
-
-    def visit_Starred(self, node):
-        self.write('*')
-        self.visit(node.value)
-
-    def visit_Repr(self, node):
-        # XXX: python 2.6 only
-        self.write('`')
-        self.visit(node.value)
-        self.write('`')
-
-    # Helper Nodes
-
-    def visit_alias(self, node):
-        self.write(node.name)
-        if node.asname is not None:
-            self.write(' as ' + node.asname)
-
-    def visit_comprehension(self, node):
-        self.write(' for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        if node.ifs:
-            for if_ in node.ifs:
-                self.write(' if ')
-                self.visit(if_)
-
-    def visit_excepthandler(self, node):
-        self.newline(node)
-        self.write('except')
-        if node.type is not None:
-            self.write(' ')
-            self.visit(node.type)
-            if node.name is not None:
-                self.write(' as ')
-                self.visit(node.name)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_arguments(self, node):
-        self.signature(node)
diff --git a/frameworks/spark/runner/src/main/resources/runtime.py b/frameworks/spark/runner/src/main/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/frameworks/spark/runner/src/main/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
diff --git a/frameworks/spark/runner/src/main/resources/spark-version-info.properties b/frameworks/spark/runner/src/main/resources/spark-version-info.properties
deleted file mode 100644
index ce0b312..0000000
--- a/frameworks/spark/runner/src/main/resources/spark-version-info.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-version=2.1.0-SNAPSHOT
-
-user=root
-
-revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
-
-branch=master
-
-date=2016-07-27T11:23:21Z
-
-url=https://github.com/apache/spark.git
diff --git a/frameworks/spark/runner/src/main/resources/spark_intp.py b/frameworks/spark/runner/src/main/resources/spark_intp.py
deleted file mode 100755
index f3c9fc0..0000000
--- a/frameworks/spark/runner/src/main/resources/spark_intp.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-sys.path.append(os.getcwd())
-from runtime import AmaContext, Environment
-
-# os.chdir(os.getcwd() + '/build/resources/test/')
-# import zipfile
-# zip = zipfile.ZipFile('pyspark.zip')
-# zip.extractall()
-# zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-# zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
-    actionData = queue.getNext()
-    resultQueue = entry_point.getResultQueue(actionData._2())
-    actionSource = actionData._1()
-    tree = ast.parse(actionSource)
-    exports = actionData._3()
-
-    for node in tree.body:
-
-        wrapper = ast.Module(body=[node])
-        try:
-            co = compile(wrapper, "<ast>", 'exec')
-            exec (co)
-            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
-            #if this node is an assignment, we need to check if it needs to be persisted
-            try:
-                persistCode = ''
-                if(isinstance(node,ast.Assign)):
-                    varName = node.targets[0].id
-                    if(exports.containsKey(varName)):
-                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
-                        persist = compile(persistCode, '<stdin>', 'exec')
-                        exec(persist)
-
-            except:
-                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
-        except:
-            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
-    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
deleted file mode 100644
index 06dfa0d..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import java.io._
-
-import com.jcabi.aether.Aether
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunner
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.eclipse.aether.util.artifact.JavaScopes
-import org.sonatype.aether.repository.RemoteRepository
-import org.sonatype.aether.util.artifact.DefaultArtifact
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.sys.process._
-
-class SparkRunnersProvider extends Logging with RunnersProvider {
-
-  private val runners = new TrieMap[String, AmaterasuRunner]
-  private var shellLoger = ProcessLogger(
-    (o: String) => log.info(o),
-    (e: String) => log.error(e)
-
-  )
-  private var conf: Option[Map[String, AnyRef]] = None
-  private var executorEnv: Option[Map[String, AnyRef]] = None
-  private var clusterConfig: ClusterConfig = _
-
-  override def init(execData: ExecData,
-                    jobId: String,
-                    outStream: ByteArrayOutputStream,
-                    notifier: Notifier,
-                    executorId: String,
-                    config: ClusterConfig,
-                    hostName: String): Unit = {
-
-    shellLoger = ProcessLogger(
-      (o: String) => log.info(o),
-      (e: String) => log.error("", e)
-    )
-    clusterConfig = config
-    var jars = Seq.empty[String]
-
-    if (execData.getDeps != null) {
-      jars ++= getDependencies(execData.getDeps)
-    }
-
-    if (execData.getPyDeps != null &&
-      execData.getPyDeps.getFilePaths.nonEmpty) {
-      loadPythonDependencies(execData.getPyDeps, notifier)
-    }
-
-    if(execData.getConfigurations.containsKey("spark")){
-      conf = Some(execData.getConfigurations.get("spark").toMap)
-    }
-
-    if (execData.getConfigurations.containsKey("spark_exec_env")) {
-      executorEnv = Some( execData.getConfigurations.get("spark_exec_env").toMap)
-    }
-    val sparkAppName = s"job_${jobId}_executor_$executorId"
-
-    SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(execData.getEnv, sparkAppName, jars, conf, executorEnv, config, hostName)
-
-    lazy val sparkScalaRunner = SparkScalaRunner(execData.getEnv, jobId, spark, outStream, notifier, jars)
-    sparkScalaRunner.initializeAmaContext(execData.getEnv)
-
-    runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-    var pypath = ""
-    // TODO: get rid of hard-coded version
-    config.mode match {
-      case "yarn" =>
-        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
-      case "mesos" =>
-        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
-    }
-    lazy val pySparkRunner = PySparkRunner(execData.getEnv, jobId, notifier, spark, pypath, execData.getPyDeps, config)
-    runners.put(pySparkRunner.getIdentifier, pySparkRunner)
-
-    lazy val sparkSqlRunner = SparkSqlRunner(execData.getEnv, jobId, notifier, spark)
-    runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
-  }
-
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.getChannel
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.getPackageId}") ! shellLoger
-    } else {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.getPackageId}") ! shellLoger
-    }
-  }
-
-  private def installAnacondaOnNode(): Unit = {
-    // TODO: get rid of hard-coded version
-
-    this.clusterConfig.mode match {
-      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
-      case "mesos" => Seq("sh", "miniconda.sh", "-b", "-p", "miniconda") ! shellLoger
-    }
-
-    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
-  }
-
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = new PythonPackage("codegen", "", "auto")
-    installAnacondaPackage(codegenPackage)
-//    try {
-//      // notifier.info("loadPythonDependencies #5")
-//      deps.getFilePaths.foreach(pack => {
-//        pack.toLowerCase match {
-//          case "anaconda" => installAnacondaPackage(pack)
-//          // case "pypi" => installPyPiPackage(pack)
-//        }
-//      })
-//    }
-//    catch {
-//
-//      case rte: RuntimeException =>
-//        val sw = new StringWriter
-//        rte.printStackTrace(new PrintWriter(sw))
-//        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-//      case e: Exception =>
-//        val sw = new StringWriter
-//        e.printStackTrace(new PrintWriter(sw))
-//        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-//    }
-  }
-
-  override def getGroupIdentifier: String = "spark"
-
-  override def getRunner(id: String): AmaterasuRunner = runners(id)
-
-  private def getDependencies(deps: Dependencies): Seq[String] = {
-
-    // adding a local repo because Aether needs one
-    val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
-
-    val remotes = deps.getRepos.map(r =>
-      new RemoteRepository(
-        r.getId,
-        r.getType,
-        r.getUrl
-      )).toList.asJava
-
-    val aether = new Aether(remotes, repo)
-
-    deps.getArtifacts.flatMap(a => {
-      aether.resolve(
-        new DefaultArtifact(a.getGroupId, a.getArtifactId, "", "jar", a.getVersion),
-        JavaScopes.RUNTIME
-      ).map(a => a)
-    }).map(x => x.getFile.getAbsolutePath)
-
-  }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala
deleted file mode 100755
index 4a7c3a2..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.util
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class PySparkExecutionQueue {
-
-  val queue = new LinkedBlockingQueue[(String, String, util.Map[String, String])]()
-
-  def getNext(): (String, String, util.Map[String, String]) = {
-
-    // if the queue is idle for an hour it will return null which
-    // terminates the python execution, need to revisit
-    queue.poll(1, TimeUnit.HOURS)
-
-  }
-
-  def setForExec(line: (String, String, util.Map[String, String])) = {
-
-    queue.put(line)
-
-  }
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala
deleted file mode 100755
index 1b17a81..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.ResultType.ResultType
-
-object ResultType extends Enumeration {
-  type ResultType = Value
-  val success = Value("success")
-  val error = Value("error")
-  val completion = Value("completion")
-}
-
-case class PySparkResult(
-  resultType: ResultType,
-  action: String,
-  statement: String,
-  message: String
-)
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
deleted file mode 100644
index 24dfc8f..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.lang.StringUtils
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.SparkSession
-
-import scala.sys.process.{Process, ProcessLogger}
-
-
-
-
-class PySparkRunner extends Logging with AmaterasuRunner {
-
-  var proc: Process = _
-  var notifier: Notifier = _
-
-  override def getIdentifier: String = "pyspark"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    interpretSources(actionSource, actionName, exports)
-  }
-
-  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
-    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
-
-    notifier.info(s"================= Started action $actionName =================")
-
-    var res: PySparkResult = null
-
-    do {
-      res = resQueue.getNext()
-      res.resultType match {
-        case ResultType.success =>
-          notifier.success(res.statement)
-        case ResultType.error =>
-          notifier.error(res.statement, res.message)
-          throw new Exception(res.message)
-        case ResultType.completion =>
-          notifier.info(s"================= finished action $actionName =================")
-      }
-    } while (res != null && res.resultType != ResultType.completion)
-  }
-
-}
-
-object PySparkRunner {
-
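-  // Gathers the downloaded miniconda .tar.bz2 packages (plus dist/codegen.py) into a
-  // comma-separated list that is passed to spark-submit via --py-files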
-  def collectCondaPackages(): String = {
-    val pkgsDirs = new File("./miniconda/pkgs")
-    (pkgsDirs.listFiles.filter {
-      file => file.getName.endsWith(".tar.bz2")
-    }.map {
-      file => s"./miniconda/pkgs/${file.getName}"
-    }.toBuffer :+ "dist/codegen.py").mkString(",")
-  }
-
-  def apply(env: Environment,
-            jobId: String,
-            notifier: Notifier,
-            spark: SparkSession,
-            pypath: String,
-            pyDeps: PythonDependencies,
-            config: ClusterConfig): PySparkRunner = {
-
-    val shellLoger = ProcessLogger(
-      (o: String) => println(o),
-      (e: String) => println(e)
-    )
-
-    //TODO: can we make this less ugly?
-
-
-    val result = new PySparkRunner
-
-    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
-    val port = PySparkEntryPoint.getPort
-    var intpPath = ""
-    if (env.getConfiguration.containsKey("cwd")) {
-      val cwd = new File(env.getConfiguration.get("cwd").toString)
-      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support the test environment
-    } else {
-      intpPath = s"spark_intp.py"
-    }
-    var pysparkPath = ""
-    var condaPkgs = ""
-    if (pyDeps != null)
-      condaPkgs = collectCondaPackages()
-    var sparkCmd: Seq[String] = Seq()
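-    // Build and launch the spark-submit command that runs spark_intp.py for the configured mode (yarn or mesos)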
-    config.mode match {
-      case "yarn" =>
-        pysparkPath = s"${StringUtils.stripStart(config.spark.home,"/")}/bin/spark-submit"
-        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
-        val proc = Process(sparkCmd, None,
-          "PYTHONPATH" -> pypath,
-          "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-      case "mesos" =>
-        pysparkPath = config.pysparkPath
-        if (pysparkPath.endsWith("spark-submit")) {
-          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
-        }
-        else {
-          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
-        }
-        var pysparkPython = "/usr/bin/python"
-
-        val proc = Process(sparkCmd, None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-    }
-
-    result.notifier = notifier
-
-    result
-  }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala
deleted file mode 100755
index daca0fc..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class ResultQueue {
-  val queue = new LinkedBlockingQueue[PySparkResult]()
-
-  def getNext(): PySparkResult = {
-
-    // if the queue is idle for 10 minutes, poll returns null, which
-    // terminates the python execution; this needs to be revisited
-    queue.poll(10, TimeUnit.MINUTES)
-
-  }
-
-  def put(
-    resultType: String,
-    action: String,
-    statement: String,
-    message: String
-  ) = {
-
-    val result = new PySparkResult(ResultType.withName(resultType), action, statement, message)
-    queue.put(result)
-  }
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala
deleted file mode 100755
index 1f803dc..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.PrintWriter
-
-import org.apache.spark.repl.SparkILoop
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.IMain
-
-class AmaSparkILoop(writer: PrintWriter) extends SparkILoop(None, writer) {
-
-  def create = {
-    this.createInterpreter
-  }
-
-  def setSettings(settings: Settings) = {
-    this.settings = settings
-  }
-
-  def getIntp: IMain = {
-    this.intp
-  }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
deleted file mode 100644
index 65342eb..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.{ByteArrayOutputStream, File, PrintWriter}
-import java.nio.file.{Files, Paths}
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.common.utils.FileUtils
-import org.apache.commons.lang.StringUtils
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.IMain
-
-object SparkRunnerHelper extends Logging {
-
-  private val conf = new SparkConf()
-  private val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-  private val outputDir = Files.createTempDirectory(Paths.get(rootDir), "repl").toFile
-  outputDir.deleteOnExit()
-
-  private var sparkSession: SparkSession = _
-
-  var notifier: Notifier = _
-
-  private var interpreter: IMain = _
-
-  def getNode: String = sys.env.get("AMA_NODE") match {
-    case None => "127.0.0.1"
-    case _ => sys.env("AMA_NODE")
-  }
-
-  def getOrCreateScalaInterperter(outStream: ByteArrayOutputStream, jars: Seq[String], recreate: Boolean = false): IMain = {
-    if (interpreter == null || recreate) {
-      initInterpreter(outStream, jars)
-    }
-    interpreter
-  }
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
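-  // Creates a class-based Scala REPL (via AmaSparkILoop) with the Spark jars and the
-  // application classpath available to interpreted code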
-  private def initInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String]): Unit = {
-
-    var result: IMain = null
-    val config = new ClusterConfig()
-    try {
-
-      val interpArguments = List(
-        "-Yrepl-class-based",
-        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-        "-classpath", jars.mkString(File.separator)
-      )
-
-      val settings = new GenericRunnerSettings(scalaOptionError)
-      settings.processArguments(interpArguments, processAll = true)
-
-      settings.classpath.append(System.getProperty("java.class.path") + java.io.File.pathSeparator +
-        "spark-" + config.webserver.sparkVersion + "/jars/*" + java.io.File.pathSeparator +
-        jars.mkString(java.io.File.pathSeparator))
-
-      settings.usejavacp.value = true
-
-      val out = new PrintWriter(outStream)
-      val interpreter = new AmaSparkILoop(out)
-      interpreter.setSettings(settings)
-
-      interpreter.create
-
-      val intp = interpreter.getIntp
-
-      settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
-      intp.setContextClassLoader
-      intp.initializeSynchronous
-
-      result = intp
-    }
-    catch {
-      case e: Exception =>
-        println(new Predef.String(outStream.toByteArray))
-
-    }
-
-    interpreter = result
-  }
-
-
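-  // Builds the SparkSession for the job: mode-specific settings, spark.opts from amaterasu.properties,
-  // the spark.yml configuration and the spark_exec.yml executor environment are all applied to the SparkConf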
-  def createSpark(env: Environment,
-                  sparkAppName: String,
-                  jars: Seq[String],
-                  sparkConf: Option[Map[String, Any]],
-                  executorEnv: Option[Map[String, Any]],
-                  config: ClusterConfig,
-                  hostName: String): SparkSession = {
-
-    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
-    val minicondaPkgsPath = "miniconda/pkgs"
-    val executorMinicondaDirRef = new File(minicondaPkgsPath)
-    val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
-    val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
-      f.getName.endsWith(".egg") ||
-      f.getName.endsWith(".zip"))
-
-    conf.setAppName(sparkAppName)
-      .set("spark.driver.host", hostName)
-      .set("spark.submit.deployMode", "client")
-      .set("spark.hadoop.validateOutputSpecs", "false")
-      .set("spark.logConf", "true")
-      .set("spark.submit.pyFiles", pyfiles.mkString(","))
-
-
-    val master: String = if (env.getMaster.isEmpty) {
-      "yarn"
-    } else {
-      env.getMaster
-    }
-
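-    // Mode-specific configuration: Mesos needs the executor URI and a local spark.home,
-    // YARN needs the Amaterasu jars plus queue, memory and history settings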
-    config.mode match {
-
-      case "mesos" =>
-        conf.set("spark.executor.uri", s"http://$getNode:${config.webserver.Port}/spark-${config.webserver.sparkVersion}.tgz")
-          .setJars(jars)
-          .set("spark.master", env.getMaster)
-          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-${config.webserver.sparkVersion}")
-
-      case "yarn" =>
-        conf.set("spark.home", StringUtils.stripStart(config.spark.home,"/"))
-          // TODO: parameterize those
-          .setJars(Seq(s"executor-${config.version}-all.jar", s"spark-runner-${config.version}-all.jar", s"spark-runtime-${config.version}.jar") ++ jars)
-          .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
-          .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
-          .set("spark.yarn.queue", "default")
-          .set("spark.history.kerberos.principal", "none")
-
-          .set("spark.master", master)
-          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
-          .set("spark.yarn.jars", s"${StringUtils.stripStart(config.spark.home,"/")}/jars/*")
-          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
-          .set("spark.dynamicAllocation.enabled", "false")
-          .set("spark.eventLog.enabled", "false")
-          .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
-          .set("hadoop.home.dir", config.yarn.hadoopHomeDir)
-
-      case _ => throw new Exception(s"mode ${config.mode} is not legal.")
-    }
-
-    if (config.spark.opts != null && config.spark.opts.nonEmpty) {
-      config.spark.opts.foreach(kv => {
-        log.info(s"Setting ${kv._1} to ${kv._2} as specified in amaterasu.properties")
-        conf.set(kv._1, kv._2)
-      })
-    }
-
-    // adding the configurations from spark.yml
-    sparkConf match {
-      case Some(cnf) => {
-        for (c <- cnf) {
-          if (c._2.isInstanceOf[String])
-            conf.set(c._1, c._2.toString)
-        }
-      }
-      case None =>
-    }
-
-    // setting the executor env from spark_exec.yml
-    if (executorEnv != null) {
-      executorEnv match {
-        case Some(env) => {
-          for (c <- env) {
-            if (c._2.isInstanceOf[String])
-              conf.setExecutorEnv(c._1, c._2.toString)
-          }
-        }
-        case None =>
-      }
-    }
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
-    sparkSession = SparkSession.builder
-      .appName(sparkAppName)
-      .master(env.getMaster)
-
-      //.enableHiveSupport()
-      .config(conf).getOrCreate()
-
-    sparkSession.conf.getAll.foreach(x => log.info(x.toString))
-
-    val hc = sparkSession.sparkContext.hadoopConfiguration
-
-    sys.env.get("AWS_ACCESS_KEY_ID") match {
-      case None =>
-      case _ =>
-        hc.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
-        hc.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
-        hc.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
-    }
-
-    sparkSession
-  }
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
deleted file mode 100755
index 7e089b5..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.sql.{Dataset, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-import scala.tools.nsc.interpreter.{IMain, Results}
-
-class ResHolder(var value: Any)
-
-class SparkScalaRunner(var env: Environment,
-                       var jobId: String,
-                       var interpreter: IMain,
-                       var outStream: ByteArrayOutputStream,
-                       var spark: SparkSession,
-                       var notifier: Notifier,
-                       var jars: Seq[String]) extends Logging with AmaterasuRunner {
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
-  override def getIdentifier = "scala"
-
-  val holder = new ResHolder(null)
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    val source = Source.fromString(actionSource)
-    interpretSources(source, actionName, exports.asScala.toMap)
-  }
-
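-  // Interprets the action source line by line; named results that appear in `exports`
-  // are persisted under the job's working directory in the requested format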
-  def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
-
-    notifier.info(s"================= Started action $actionName =================")
-    //notifier.info(s"exports is: $exports")
-
-    for (line <- source.getLines()) {
-
-      // ignoring empty or commented lines
-      if (!line.isEmpty && !line.trim.startsWith("*") && !line.startsWith("/")) {
-
-        outStream.reset()
-        log.debug(line)
-
-        if (line.startsWith("import")) {
-          interpreter.interpret(line)
-        }
-        else {
-          val intresult = interpreter.interpret(line)
-
-          val result = interpreter.prevRequestList.last.lineRep.call("$result")
-
-          // intresult: Success, Error, etc
-          // result: the actual result (RDD, df, etc.) for caching
-          // outStream.toString gives you the error message
-          intresult match {
-            case Results.Success =>
-              log.debug("Results.Success")
-
-              notifier.success(line)
-
-              val resultName = interpreter.prevRequestList.last.termNames.last
-
-              if (exports.contains(resultName.toString)) {
-
-                val format = exports(resultName.toString)
-
-                if (result != null) {
-
-                  result match {
-                    case ds: Dataset[_] =>
-                      log.debug(s"persisting DataFrame: $resultName")
-                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.getWorkingDir}/$jobId/$actionName/$resultName")"""
-                      val writeResult = interpreter.interpret(writeLine)
-                      if (writeResult != Results.Success) {
-                        val err = outStream.toString
-                        notifier.error(writeLine, err)
-                        log.error(s"error persisting dataset: $writeLine Failed with: $err")
-                        //throw new Exception(err)
-                      }
-                      log.debug(s"persisted DataFrame: $resultName")
-
-                    case _ => notifier.info(s"""+++> result type ${result.getClass}""")
-                  }
-                }
-              }
-
-            case Results.Error =>
-              log.debug("Results.Error")
-              val err = outStream.toString
-              notifier.error(line, err)
-              throw new Exception(err)
-
-            case Results.Incomplete =>
-              log.debug("Results.Incomplete")
-
-          }
-        }
-      }
-    }
-
-    notifier.info(s"================= finished action $actionName =================")
-  }
-
-  def initializeAmaContext(env: Environment): Unit = {
-
-    // setting up some context :)
-    val sc = this.spark.sparkContext
-    val sqlContext = this.spark.sqlContext
-
-    interpreter.interpret("import scala.util.control.Exception._")
-    interpreter.interpret("import org.apache.spark.{ SparkContext, SparkConf }")
-    interpreter.interpret("import org.apache.spark.sql.SQLContext")
-    interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }")
-    interpreter.interpret("import org.apache.spark.sql.SaveMode")
-    interpreter.interpret("import org.apache.amaterasu.frameworks.spark.runtime.AmaContext")
-    interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment")
-
-    // creating a map (_contextStore) to hold the different spark contexts
-    // in the REPL and getting a reference to it
-    interpreter.interpret("var _contextStore = scala.collection.mutable.Map[String, AnyRef]()")
-    val contextStore = interpreter.prevRequestList.last.lineRep.call("$result").asInstanceOf[mutable.Map[String, AnyRef]]
-    AmaContext.init(spark, jobId, env)
-
-    // populating the contextStore
-    contextStore.put("sc", sc)
-    contextStore.put("sqlContext", sqlContext)
-    contextStore.put("env", env)
-    contextStore.put("spark", spark)
-    contextStore.put("ac", AmaContext)
-
-    interpreter.interpret("val sc = _contextStore(\"sc\").asInstanceOf[SparkContext]")
-    interpreter.interpret("val sqlContext = _contextStore(\"sqlContext\").asInstanceOf[SQLContext]")
-    interpreter.interpret("val env = _contextStore(\"env\").asInstanceOf[Environment]")
-    interpreter.interpret("val spark = _contextStore(\"spark\").asInstanceOf[SparkSession]")
-    interpreter.interpret("val AmaContext = _contextStore(\"ac\").asInstanceOf[AmaContext]")
-    interpreter.interpret("import sqlContext.implicits._")
-
-    // logging the AmaContext initialization (AmaContext.init was already called above)
-    println(s"""AmaContext.init(sc, sqlContext ,"$jobId")""")
-
-  }
-
-}
-
-object SparkScalaRunner extends Logging {
-
-  def apply(env: Environment,
-            jobId: String,
-            spark: SparkSession,
-            outStream: ByteArrayOutputStream,
-            notifier: Notifier,
-            jars: Seq[String]): SparkScalaRunner = {
-
-    new SparkScalaRunner(env, jobId, SparkRunnerHelper.getOrCreateScalaInterperter(outStream, jars), outStream, spark, notifier, jars)
-
-  }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala
deleted file mode 100644
index 2dc7b4e..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparkr
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkContext
-
-
-class SparkRRunner extends Logging with AmaterasuRunner {
-
-    override def getIdentifier = "spark-r"
-
-    override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    }
-}
-
-object SparkRRunner {
-    def apply(
-               env: Environment,
-               jobId: String,
-               sparkContext: SparkContext,
-               outStream: ByteArrayOutputStream,
-               notifier: Notifier,
-               jars: Seq[String]
-             ): SparkRRunner = {
-        new SparkRRunner()
-    }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
deleted file mode 100644
index 9230bca..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparksql
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Amaterasu currently supports JSON and PARQUET as data sources.
-  * CSV data source support will be provided in later versions.
-  */
-class SparkSqlRunner extends Logging with AmaterasuRunner {
-  var env: Environment = _
-  var notifier: Notifier = _
-  var jobId: String = _
-  //var actionName: String = _
-  var spark: SparkSession = _
-
-  /*
-  Method: executeSource
-  Description: when the user specifies a query in the Amaterasu format, this method parses and executes it.
-               Otherwise, the query is executed directly.
-  @Params: actionSource (the query string), actionName, exports
-   */
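-  // Illustrative example (hypothetical action/dataset names): a query such as
-  //   "SELECT * FROM amacontext_previousAction_resultDf READAS parquet"
-  // is lower-cased and split on spaces; the amacontext_<actionName>_<dfName> token is loaded as a
-  // DataFrame via AmaContext.getDataFrame, registered as a temp view, and the rewritten query is run with spark.sql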
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    notifier.info(s"================= Started action $actionName =================")
-
-    if (!actionSource.isEmpty) {
-
-      var result: DataFrame = null
-      if (actionSource.toLowerCase.contains("amacontext")) {
-
-        //Parse the incoming query
-        //notifier.info(s"================= parsing the SQL query =================")
-
-        val parser: List[String] = actionSource.toLowerCase.split(" ").toList
-        var sqlPart1: String = ""
-        var sqlPart2: String = ""
-        var queryTempLen: Int = 0
-
-        //get only the sql part of the query
-        for (i <- 0 to parser.indexOf("from")) {
-          sqlPart1 += parser(i) + " "
-        }
-
-        if (parser.indexOf("readas") == -1) {
-          queryTempLen = parser.length - 1
-        }
-        else
-          queryTempLen = parser.length - 3
-
-        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
-          if (!parser(i).contains("amacontext"))
-            sqlPart2 += " " + parser(i)
-        }
-
-        //If no read format is specified by the user, use PARQUET as the default file format to load data
-        var fileFormat: String = null
-        //if there is no index for "readas" keyword, then set PARQUET as default read format
-        if (parser.indexOf("readas") == -1) {
-          fileFormat = "parquet"
-        }
-        else
-          fileFormat = parser(parser.indexOf("readas") + 1)
-
-
-        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
-        val directories = locationPath.split("_")
-        val actionName = directories(1)
-        val dfName = directories(2)
-        val parsedQuery = sqlPart1 + locationPath + sqlPart2
-
-        //Load the dataframe from previous action
-        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
-        loadData.createOrReplaceTempView(locationPath)
-
-
-        try{
-
-          result = spark.sql(parsedQuery)
-          notifier.success(parsedQuery)
-        } catch {
-          case e: Exception => notifier.error(parsedQuery, e.getMessage)
-        }
-
-      }
-      else {
-
-        notifier.info("Executing SparkSql on: " + actionSource)
-
-        result = spark.sql(actionSource)
-      }
-      val exportsBuff = exports.asScala.toBuffer
-      if (exportsBuff.nonEmpty) {
-        val exportName = exportsBuff.head._1
-        val exportFormat = exportsBuff.head._2
-        //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
-        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.getWorkingDir}/$jobId/$actionName/$exportName")
-      }
-      notifier.info(s"================= finished action $actionName =================")
-    }
-  }
-
-  /*
-  Method to find the file type of files within a directory
-  @Params
-  folderName : Path to location of the directory containing data-source files
-  */
-
-  def findFileType(folderName: File): Array[String] = {
-    // get all the files from a directory
-    val files: Array[File] = folderName.listFiles()
-    val extensions: Array[String] = files.map(file => FilenameUtils.getExtension(file.toString))
-    extensions
-  }
-
-  override def getIdentifier: String = "sql"
-
-}
-
-object SparkSqlRunner {
-
-  def apply(env: Environment,
-            jobId: String,
-            // actionName: String,
-            notifier: Notifier,
-            spark: SparkSession): SparkSqlRunner = {
-
-    val sparkSqlRunnerObj = new SparkSqlRunner
-
-    sparkSqlRunnerObj.env = env
-    sparkSqlRunnerObj.jobId = jobId
-    //sparkSqlRunnerObj.actionName = actionName
-    sparkSqlRunnerObj.notifier = notifier
-    sparkSqlRunnerObj.spark = spark
-    sparkSqlRunnerObj
-  }
-}
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
deleted file mode 100644
index 5f0ce0e..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-name,age
-sampath,22
-kirupa,30
-dev,19
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json b/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
deleted file mode 100644
index d297f1f..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Sampath","age":22}
-{"name":"Kirupa", "age":30}
-{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 7d66b64..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 74b1890..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
+++ /dev/null
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
deleted file mode 100644
index 5d83fd6..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
deleted file mode 100644
index ac54053..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index e1b0d2e..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index d807ba9..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/amaterasu.properties b/frameworks/spark/runner/src/test/resources/amaterasu.properties
deleted file mode 100755
index e95df02..0000000
--- a/frameworks/spark/runner/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating
-master=192.168.33.11
-user=root
-mode=mesos
-webserver.port=8000
-webserver.root=dist
-spark.version=2.1.1-bin-hadoop2.7
-pysparkPath = /usr/bin/python
diff --git a/frameworks/spark/runner/src/test/resources/codegen.py b/frameworks/spark/runner/src/test/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/frameworks/spark/runner/src/test/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
-    codegen
-    ~~~~~~~
-
-    Extension to ast that allow ast -> python code generation.
-
-    :copyright: Copyright 2008 by Armin Ronacher.
-    :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
-    """This function can convert a node tree back into python sourcecode.
-    This is useful for debugging purposes, especially if you're dealing with
-    custom asts not generated by python itself.
-
-    It could be that the sourcecode is evaluable when the AST itself is not
-    compilable / evaluable.  The reason for this is that the AST contains some
-    more data than regular sourcecode does, which is dropped during
-    conversion.
-
-    Each level of indentation is replaced with `indent_with`.  Per default this
-    parameter is equal to four spaces as suggested by PEP 8, but it might be
-    adjusted to match the application's styleguide.
-
-    If `add_line_information` is set to `True` comments for the line numbers
-    of the nodes are added to the output.  This can be used to spot wrong line
-    number information of statement nodes.
-    """
-    generator = SourceGenerator(indent_with, add_line_information)
-    generator.visit(node)
-
-    return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
-    """This visitor is able to transform a well formed syntax tree into python
-    sourcecode.  For more details have a look at the docstring of the
-    `to_source` function.
-    """
-
-    def __init__(self, indent_with, add_line_information=False):
-        self.result = []
-        self.indent_with = indent_with
-        self.add_line_information = add_line_information
-        self.indentation = 0
-        self.new_lines = 0
-
-    def write(self, x):
-        if self.new_lines:
-            if self.result:
-                self.result.append('\n' * self.new_lines)
-            self.result.append(self.indent_with * self.indentation)
-            self.new_lines = 0
-        self.result.append(x)
-
-    def newline(self, node=None, extra=0):
-        self.new_lines = max(self.new_lines, 1 + extra)
-        if node is not None and self.add_line_information:
-            self.write('# line: %s' % node.lineno)
-            self.new_lines = 1
-
-    def body(self, statements):
-        self.new_line = True
-        self.indentation += 1
-        for stmt in statements:
-            self.visit(stmt)
-        self.indentation -= 1
-
-    def body_or_else(self, node):
-        self.body(node.body)
-        if node.orelse:
-            self.newline()
-            self.write('else:')
-            self.body(node.orelse)
-
-    def signature(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        padding = [None] * (len(node.args) - len(node.defaults))
-        for arg, default in zip(node.args, padding + node.defaults):
-            write_comma()
-            self.visit(arg)
-            if default is not None:
-                self.write('=')
-                self.visit(default)
-        if node.vararg is not None:
-            write_comma()
-            self.write('*' + node.vararg)
-        if node.kwarg is not None:
-            write_comma()
-            self.write('**' + node.kwarg)
-
-    def decorators(self, node):
-        for decorator in node.decorator_list:
-            self.newline(decorator)
-            self.write('@')
-            self.visit(decorator)
-
-    # Statements
-
-    def visit_Assert(self, node):
-        self.newline(node)
-        self.write('assert ')
-        self.visit(node.test)
-        if node.msg is not None:
-           self.write(', ')
-           self.visit(node.msg)
-
-    def visit_Assign(self, node):
-        self.newline(node)
-        for idx, target in enumerate(node.targets):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-        self.write(' = ')
-        self.visit(node.value)
-
-    def visit_AugAssign(self, node):
-        self.newline(node)
-        self.visit(node.target)
-        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
-        self.visit(node.value)
-
-    def visit_ImportFrom(self, node):
-        self.newline(node)
-        self.write('from %s%s import ' % ('.' * node.level, node.module))
-        for idx, item in enumerate(node.names):
-            if idx:
-                self.write(', ')
-            self.write(item)
-
-    def visit_Import(self, node):
-        self.newline(node)
-        for item in node.names:
-            self.write('import ')
-            self.visit(item)
-
-    def visit_Expr(self, node):
-        self.newline(node)
-        self.generic_visit(node)
-
-    def visit_FunctionDef(self, node):
-        self.newline(extra=1)
-        self.decorators(node)
-        self.newline(node)
-        self.write('def %s(' % node.name)
-        self.visit(node.args)
-        self.write('):')
-        self.body(node.body)
-
-    def visit_ClassDef(self, node):
-        have_args = []
-        def paren_or_comma():
-            if have_args:
-                self.write(', ')
-            else:
-                have_args.append(True)
-                self.write('(')
-
-        self.newline(extra=2)
-        self.decorators(node)
-        self.newline(node)
-        self.write('class %s' % node.name)
-        for base in node.bases:
-            paren_or_comma()
-            self.visit(base)
-        # XXX: the if here is used to keep this module compatible
-        #      with python 2.6.
-        if hasattr(node, 'keywords'):
-            for keyword in node.keywords:
-                paren_or_comma()
-                self.write(keyword.arg + '=')
-                self.visit(keyword.value)
-            if node.starargs is not None:
-                paren_or_comma()
-                self.write('*')
-                self.visit(node.starargs)
-            if node.kwargs is not None:
-                paren_or_comma()
-                self.write('**')
-                self.visit(node.kwargs)
-        self.write(have_args and '):' or ':')
-        self.body(node.body)
-
-    def visit_If(self, node):
-        self.newline(node)
-        self.write('if ')
-        self.visit(node.test)
-        self.write(':')
-        self.body(node.body)
-        while True:
-            else_ = node.orelse
-            if len(else_) == 0:
-                break
-            elif len(else_) == 1 and isinstance(else_[0], If):
-                node = else_[0]
-                self.newline()
-                self.write('elif ')
-                self.visit(node.test)
-                self.write(':')
-                self.body(node.body)
-            else:
-                self.newline()
-                self.write('else:')
-                self.body(else_)
-                break
-
-    def visit_For(self, node):
-        self.newline(node)
-        self.write('for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_While(self, node):
-        self.newline(node)
-        self.write('while ')
-        self.visit(node.test)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_With(self, node):
-        self.newline(node)
-        self.write('with ')
-        self.visit(node.context_expr)
-        if node.optional_vars is not None:
-            self.write(' as ')
-            self.visit(node.optional_vars)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_Pass(self, node):
-        self.newline(node)
-        self.write('pass')
-
-    def visit_Print(self, node):
-        # XXX: python 2.6 only
-        self.newline(node)
-        self.write('print ')
-        want_comma = False
-        if node.dest is not None:
-            self.write(' >> ')
-            self.visit(node.dest)
-            want_comma = True
-        for value in node.values:
-            if want_comma:
-                self.write(', ')
-            self.visit(value)
-            want_comma = True
-        if not node.nl:
-            self.write(',')
-
-    def visit_Delete(self, node):
-        self.newline(node)
-        self.write('del ')
-        for idx, target in enumerate(node):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-
-    def visit_TryExcept(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        for handler in node.handlers:
-            self.visit(handler)
-
-    def visit_TryFinally(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        self.newline(node)
-        self.write('finally:')
-        self.body(node.finalbody)
-
-    def visit_Global(self, node):
-        self.newline(node)
-        self.write('global ' + ', '.join(node.names))
-
-    def visit_Nonlocal(self, node):
-        self.newline(node)
-        self.write('nonlocal ' + ', '.join(node.names))
-
-    def visit_Return(self, node):
-        self.newline(node)
-        if node.value is None:
-            self.write('return')
-        else:
-            self.write('return ')
-            self.visit(node.value)
-
-    def visit_Break(self, node):
-        self.newline(node)
-        self.write('break')
-
-    def visit_Continue(self, node):
-        self.newline(node)
-        self.write('continue')
-
-    def visit_Raise(self, node):
-        # XXX: Python 2.6 / 3.0 compatibility
-        self.newline(node)
-        self.write('raise')
-        if hasattr(node, 'exc') and node.exc is not None:
-            self.write(' ')
-            self.visit(node.exc)
-            if node.cause is not None:
-                self.write(' from ')
-                self.visit(node.cause)
-        elif hasattr(node, 'type') and node.type is not None:
-            self.visit(node.type)
-            if node.inst is not None:
-                self.write(', ')
-                self.visit(node.inst)
-            if node.tback is not None:
-                self.write(', ')
-                self.visit(node.tback)
-
-    # Expressions
-
-    def visit_Attribute(self, node):
-        self.visit(node.value)
-        self.write('.' + node.attr)
-
-    def visit_Call(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        self.visit(node.func)
-        self.write('(')
-        for arg in node.args:
-            write_comma()
-            self.visit(arg)
-        for keyword in node.keywords:
-            write_comma()
-            self.write(keyword.arg + '=')
-            self.visit(keyword.value)
-        if node.starargs is not None:
-            write_comma()
-            self.write('*')
-            self.visit(node.starargs)
-        if node.kwargs is not None:
-            write_comma()
-            self.write('**')
-            self.visit(node.kwargs)
-        self.write(')')
-
-    def visit_Name(self, node):
-        self.write(node.id)
-
-    def visit_Str(self, node):
-        self.write(repr(node.s))
-
-    def visit_Bytes(self, node):
-        self.write(repr(node.s))
-
-    def visit_Num(self, node):
-        self.write(repr(node.n))
-
-    def visit_Tuple(self, node):
-        self.write('(')
-        idx = -1
-        for idx, item in enumerate(node.elts):
-            if idx:
-                self.write(', ')
-            self.visit(item)
-        self.write(idx and ')' or ',)')
-
-    def sequence_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            for idx, item in enumerate(node.elts):
-                if idx:
-                    self.write(', ')
-                self.visit(item)
-            self.write(right)
-        return visit
-
-    visit_List = sequence_visit('[', ']')
-    visit_Set = sequence_visit('{', '}')
-    del sequence_visit
-
-    def visit_Dict(self, node):
-        self.write('{')
-        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
-            if idx:
-                self.write(', ')
-            self.visit(key)
-            self.write(': ')
-            self.visit(value)
-        self.write('}')
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
-        self.visit(node.right)
-
-    def visit_BoolOp(self, node):
-        self.write('(')
-        for idx, value in enumerate(node.values):
-            if idx:
-                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
-            self.visit(value)
-        self.write(')')
-
-    def visit_Compare(self, node):
-        self.write('(')
-        self.visit(node.left)
-        for op, right in zip(node.ops, node.comparators):
-            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
-            self.visit(right)
-        self.write(')')
-
-    def visit_UnaryOp(self, node):
-        self.write('(')
-        op = UNARYOP_SYMBOLS[type(node.op)]
-        self.write(op)
-        if op == 'not':
-            self.write(' ')
-        self.visit(node.operand)
-        self.write(')')
-
-    def visit_Subscript(self, node):
-        self.visit(node.value)
-        self.write('[')
-        self.visit(node.slice)
-        self.write(']')
-
-    def visit_Slice(self, node):
-        if node.lower is not None:
-            self.visit(node.lower)
-        self.write(':')
-        if node.upper is not None:
-            self.visit(node.upper)
-        if node.step is not None:
-            self.write(':')
-            if not (isinstance(node.step, Name) and node.step.id == 'None'):
-                self.visit(node.step)
-
-    def visit_ExtSlice(self, node):
-        for idx, item in node.dims:
-            if idx:
-                self.write(', ')
-            self.visit(item)
-
-    def visit_Yield(self, node):
-        self.write('yield ')
-        self.visit(node.value)
-
-    def visit_Lambda(self, node):
-        self.write('lambda ')
-        self.visit(node.args)
-        self.write(': ')
-        self.visit(node.body)
-
-    def visit_Ellipsis(self, node):
-        self.write('Ellipsis')
-
-    def generator_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            self.visit(node.elt)
-            for comprehension in node.generators:
-                self.visit(comprehension)
-            self.write(right)
-        return visit
-
-    visit_ListComp = generator_visit('[', ']')
-    visit_GeneratorExp = generator_visit('(', ')')
-    visit_SetComp = generator_visit('{', '}')
-    del generator_visit
-
-    def visit_DictComp(self, node):
-        self.write('{')
-        self.visit(node.key)
-        self.write(': ')
-        self.visit(node.value)
-        for comprehension in node.generators:
-            self.visit(comprehension)
-        self.write('}')
-
-    def visit_IfExp(self, node):
-        self.visit(node.body)
-        self.write(' if ')
-        self.visit(node.test)
-        self.write(' else ')
-        self.visit(node.orelse)
-
-    def visit_Starred(self, node):
-        self.write('*')
-        self.visit(node.value)
-
-    def visit_Repr(self, node):
-        # XXX: python 2.6 only
-        self.write('`')
-        self.visit(node.value)
-        self.write('`')
-
-    # Helper Nodes
-
-    def visit_alias(self, node):
-        self.write(node.name)
-        if node.asname is not None:
-            self.write(' as ' + node.asname)
-
-    def visit_comprehension(self, node):
-        self.write(' for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        if node.ifs:
-            for if_ in node.ifs:
-                self.write(' if ')
-                self.visit(if_)
-
-    def visit_excepthandler(self, node):
-        self.newline(node)
-        self.write('except')
-        if node.type is not None:
-            self.write(' ')
-            self.visit(node.type)
-            if node.name is not None:
-                self.write(' as ')
-                self.visit(node.name)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_arguments(self, node):
-        self.signature(node)
diff --git a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip b/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
deleted file mode 100644
index 8c3829e..0000000
--- a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/py4j.tar.gz b/frameworks/spark/runner/src/test/resources/py4j.tar.gz
deleted file mode 100644
index 761a0af..0000000
--- a/frameworks/spark/runner/src/test/resources/py4j.tar.gz
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py b/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
deleted file mode 100755
index c940eea..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
-
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-rdd = ama_context.sc.parallelize(data)
-odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz b/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
deleted file mode 100644
index 6f25984..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.zip b/frameworks/spark/runner/src/test/resources/pyspark.zip
deleted file mode 100644
index a624c9f..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/runtime.py b/frameworks/spark/runner/src/test/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/frameworks/spark/runner/src/test/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
diff --git a/frameworks/spark/runner/src/test/resources/simple-pyspark.py b/frameworks/spark/runner/src/test/resources/simple-pyspark.py
deleted file mode 100755
index 923f81c..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-pyspark.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-try:
-    rdd = sc.parallelize(data)
-
-    def g(x):
-        print(x)
-
-    rdd.foreach(g)
-except Exception as e:
-    print type(e), e
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/simple-python-err.py b/frameworks/spark/runner/src/test/resources/simple-python-err.py
deleted file mode 100755
index dff1491..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-python-err.py
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-1/0
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-python.py b/frameworks/spark/runner/src/test/resources/simple-python.py
deleted file mode 100755
index 0ac6f85..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-python.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-print(data)
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-spark.scala b/frameworks/spark/runner/src/test/resources/simple-spark.scala
deleted file mode 100755
index f2e49fd..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-spark.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.spark.sql.{DataFrame, SaveMode}
-
-val data = Seq(1,3,4,5,6)
-
-
-val sc = AmaContext.sc
-val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.spark
-val x: DataFrame = rdd.toDF()
-
-x.write.mode(SaveMode.Overwrite)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/spark-version-info.properties b/frameworks/spark/runner/src/test/resources/spark-version-info.properties
deleted file mode 100644
index 8410a21..0000000
--- a/frameworks/spark/runner/src/test/resources/spark-version-info.properties
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-version=2.1.0-SNAPSHOT
-
-user=root
-
-revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
-
-branch=master
-
-date=2016-07-27T11:23:21Z
-
-url=https://github.com/apache/spark.git
diff --git a/frameworks/spark/runner/src/test/resources/spark_intp.py b/frameworks/spark/runner/src/test/resources/spark_intp.py
deleted file mode 100755
index fd8dc0e..0000000
--- a/frameworks/spark/runner/src/test/resources/spark_intp.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-from runtime import AmaContext, Environment
-
-os.chdir(os.getcwd() + '/build/resources/test/')
-import zipfile
-zip = zipfile.ZipFile('pyspark.zip')
-zip.extractall()
-zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-zip.extractall()
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-sys.path.append(os.getcwd())
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
-    actionData = queue.getNext()
-    resultQueue = entry_point.getResultQueue(actionData._2())
-    actionSource = actionData._1()
-    tree = ast.parse(actionSource)
-    exports = actionData._3()
-
-    for node in tree.body:
-
-        wrapper = ast.Module(body=[node])
-        try:
-            co = compile(wrapper, "<ast>", 'exec')
-            exec (co)
-            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
-            #if this node is an assignment, we need to check if it needs to be persisted
-            try:
-                persistCode = ''
-                if(isinstance(node,ast.Assign)):
-                    varName = node.targets[0].id
-                    if(exports.containsKey(varName)):
-                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
-                        persist = compile(persistCode, '<stdin>', 'exec')
-                        exec(persist)
-
-            except:
-                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
-        except:
-            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
-    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/step-2.scala b/frameworks/spark/runner/src/test/resources/step-2.scala
deleted file mode 100755
index 86fd048..0000000
--- a/frameworks/spark/runner/src/test/resources/step-2.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
-highNoDf.show
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index e1b0d2e..0000000
--- a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index d807ba9..0000000
--- a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala
deleted file mode 100644
index dff1e4a..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.scalatest._
-
-@DoNotDiscover
-class RunnersLoadingTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-
-  "RunnersFactory" should "be loaded with all the implementations of AmaterasuRunner in its classpath" in {
-    val r = factory.getRunner("spark", "scala")
-    r should not be null
-  }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
deleted file mode 100644
index d940b2f..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import java.io.{ByteArrayOutputStream, File}
-
-import org.apache.amaterasu.common.dataobjects.{Artifact, ExecData, Repo}
-import org.apache.amaterasu.common.execution.dependencies._
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunnerTests
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkScalaRunner, SparkScalaRunnerTests}
-import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunnerTests
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.spark.sql.SparkSession
-import org.scalatest._
-
-import scala.collection.mutable.ListBuffer
-import scala.collection.JavaConverters._
-
-class SparkTestsSuite extends Suites(
-  //new PySparkRunnerTests,
-  new RunnersLoadingTests,
-  new SparkSqlRunnerTests,
-  new SparkScalaRunnerTests
-) with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-  var spark: SparkSession = _
-
-  private def createTestMiniconda(): Unit = {
-    println(s"PATH: ${new File(".").getAbsolutePath}")
-    new File("miniconda/pkgs").mkdirs()
-  }
-
-  override def beforeAll(): Unit = {
-
-    // I can't apologise enough for this
-    val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
-    val workDir = new File(resources).getParentFile.getParent
-
-    env = new Environment()
-    env.setWorkingDir(s"file://$workDir")
-
-    env.setMaster("local[1]")
-    if (env.getConfiguration != null) env.setConfiguration(Map("pysparkPath" -> "/usr/bin/python").asJava) else env.setConfiguration( Map(
-      "pysparkPath" -> "/usr/bin/python",
-      "cwd" -> resources
-    ).asJava)
-
-    val excEnv = Map[String, Any](
-      "PYTHONPATH" -> resources
-    )
-    createTestMiniconda()
-    env.setConfiguration(Map( "spark_exec_env" -> excEnv).asJava)
-    factory = ProvidersFactory(new ExecData(env,
-      new Dependencies(ListBuffer.empty[Repo].asJava, List.empty[Artifact].asJava),
-      new PythonDependencies(Array.empty[String]),
-      Map(
-        "spark" -> Map.empty[String, Any].asJava,
-        "spark_exec_env" -> Map("PYTHONPATH" -> resources).asJava).asJava),
-      "test",
-      new ByteArrayOutputStream(),
-      new TestNotifier(),
-      "test",
-      "localhost",
-      getClass.getClassLoader.getResource("amaterasu.properties").getPath)
-    spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
-
-    this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
-
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    new File("miniconda").delete()
-    spark.stop()
-
-    super.afterAll()
-  }
-
-}
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala
deleted file mode 100755
index 1ed029f..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.io.File
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class PySparkRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-  var factory: ProvidersFactory = _
-
-  def delete(file: File) {
-    if (file.isDirectory)
-      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
-    file.delete
-  }
-
-  override protected def afterAll(): Unit = {
-    val pysparkDir = new File(getClass.getResource("/pyspark").getPath)
-    val py4jDir = new File(getClass.getResource("/py4j").getPath)
-    delete(pysparkDir)
-    delete(py4jDir)
-    super.afterAll()
-  }
-
-
-  "PySparkRunner.executeSource" should "execute simple python code" in {
-    val src = Source.fromFile(getClass.getResource("/simple-python.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    println("3333333333333333333333")
-    runner.executeSource(src, "test_action1", Map.empty[String, String].asJava)
-  }
-
-  it should "print and trows an errors" in {
-    a[java.lang.Exception] should be thrownBy {
-      val src = Source.fromFile(getClass.getResource("/simple-python-err.py").getPath).mkString
-      var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-      runner.executeSource(src, "test_action2", Map.empty[String, String].asJava)
-    }
-  }
-
-  it should "also execute spark code written in python" in {
-    val src = Source.fromFile(getClass.getResource("/simple-pyspark.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action3", Map("numDS" -> "parquet").asJava)
-  }
-
-  it should "also execute spark code written in python with AmaContext being used" in {
-    val src = Source.fromFile(getClass.getResource("/pyspark-with-amacontext.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action4", Map.empty[String, String].asJava)
-  }
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
deleted file mode 100755
index 011c4d9..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-  var factory: ProvidersFactory = _
-  var runner: SparkScalaRunner = _
-
-
-  "SparkScalaRunner" should "execute the simple-spark.scala" in {
-
-
-    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
-    val script = getClass.getResource("/simple-spark.scala").getPath
-    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
-    sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
-
-  }
-
-  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-
-    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
-    val script = getClass.getResource("/step-2.scala").getPath
-    sparkRunner.env.setWorkingDir(s"${getClass.getResource("/tmp").getPath}")
-    AmaContext.init(sparkRunner.spark,"job",sparkRunner.env)
-    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
-    sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
-
-  }
-
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
deleted file mode 100644
index 756bed9..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparksql
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-@DoNotDiscover
-class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-
-  val notifier = new TestNotifier()
-
-  var factory: ProvidersFactory = _
-  var env: Environment = _
-
-  /*
-  Test whether parquet is used as default file format to load data from previous actions
-   */
-
-  "SparkSql" should "load data as parquet if no input foramt is specified" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result")
-    println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
-    outputDf.first().getString(1) shouldEqual "Michael"
-  }
-
-  /*
-  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
-   */
-
-  "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
-    println("Output Parquet: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-
-  /*
-  Test whether the JSON data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
-
-    val outputDf = spark.read.json(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_json_test/result")
-    println("Output JSON: " + inputDf.count + "," + outputDf.count)
-    outputDf.first().getString(1) shouldEqual "Kirupa"
-
-  }
-
-  /*
-  Test whether the CSV data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
-
-
-    val outputDf = spark.read.csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_csv_test/result")
-    println("Output CSV: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-  /*
-  Test whether the data can be directly read from a file and executed by sparkSql
-  */
-//  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
-//
-//    val tempFileEnv = Environment()
-//    tempFileEnv.workingDir = "file:/tmp/"
-//    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
-//
-//    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
-//    sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
-//    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Parquet dataframe: " + outputParquetDf.show)
-//    outputParquetDf.first().getString(1) shouldEqual "Michael"
-//    sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
-//
-//    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Json dataframe: " + outputJsonDf.show)
-//    outputJsonDf.first().getString(1) shouldEqual "Sampath"
-//
-//  }
-
-
-}
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
deleted file mode 100644
index 6a9e9f0..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-
-
-class TestNotifier extends Notifier  {
-
-  override def info(msg: String): Unit = {
-    getLog.info(msg)
-  }
-
-  override def success(line: String): Unit = {
-    getLog.info(s"successfully executed line: $line")
-  }
-
-  override def error(line: String, msg: String): Unit = {
-    getLog.error(s"Error executing line: $line message: $msg")
-  }
-}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 35f3800..e89345b 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -395,7 +395,7 @@
     private fun requestContainer(actionData: ActionData, capability: Resource) {
 
         actionsBuffer.add(actionData)
-        log.info("About to ask container for action ${actionData.id} with mem ${capability.memory} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")
+        log.info("About to ask container for action ${actionData.id} with mem ${capability.memorySize} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")
 
         // we have an action to schedule, let's request a container
         val priority: Priority = Records.newRecord(Priority::class.java)
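
Reviewer note on the `ApplicationMaster.kt` hunk above: the switch from `capability.memory` to `capability.memorySize` tracks the YARN API, where `Resource.getMemory()` is deprecated in favour of the `long`-returning `getMemorySize()`. Below is a minimal Kotlin sketch of how such a capability object is typically built and logged, assuming Hadoop 2.8+ where `memorySize` is available; the `buildCapability` helper is illustrative and not part of the Amaterasu codebase.

```kotlin
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records

// Build a container capability request. memorySize is in MB and is a Long,
// unlike the deprecated Int-based Resource.getMemory()/setMemory().
fun buildCapability(memoryMb: Long, cores: Int): Resource {
    val capability: Resource = Records.newRecord(Resource::class.java)
    capability.memorySize = memoryMb   // maps to setMemorySize(long)
    capability.virtualCores = cores    // maps to setVirtualCores(int)
    return capability
}

fun main() {
    val capability = buildCapability(1024L, 1)
    println("Requesting container with mem ${capability.memorySize} and cores ${capability.virtualCores}")
}
```
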
diff --git a/settings.gradle b/settings.gradle
index e41445d..9e757dc 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,8 +40,6 @@
 
 // Frameworks
 // Spark
-include 'spark-runner'
-project(':spark-runner').projectDir=file("frameworks/spark/runner")
 include 'spark-runtime'
 project(':spark-runtime').projectDir=file("frameworks/spark/runtime")
 include 'spark-dispatcher'