Merge pull request #19 from amezng/amaterasu-21
AMATERASU-21 Fix Spark Scala tests
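
Re-enables SparkScalaRunnerTests and wires it into SparkTestsSuite with the
shared ProvidersFactory, mirroring the other runner suites. The test scripts
move to the SparkSession API (AmaContext.spark replaces AmaContext.sqlContext),
step-2.scala now filters the persisted start/x DataFrame on its age column,
and the parquet fixtures it reads are checked in under
executor/src/test/resources/tmp/job/start/x.
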
diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala
index 797235d..34798e7 100755
--- a/executor/src/test/resources/simple-spark.scala
+++ b/executor/src/test/resources/simple-spark.scala
@@ -1,11 +1,13 @@
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.spark.sql.{DataFrame, SaveMode}
-val data = Array(1, 2, 3, 4, 5)
+import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+val data = Seq(1, 3, 4, 5, 6)
+
val sc = AmaContext.sc
val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.sqlContext
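+// AmaContext.spark exposes the SparkSession; the sqlContext name is kept so the implicits import below still works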
+val sqlContext = AmaContext.spark
import sqlContext.implicits._
val x: DataFrame = rdd.toDF()
diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala
index 34ad839..189701f 100755
--- a/executor/src/test/resources/step-2.scala
+++ b/executor/src/test/resources/step-2.scala
@@ -1,7 +1,5 @@
import org.apache.amaterasu.executor.runtime.AmaContext
-val oddRdd = AmaContext.getRDD[Int]("start", "rdd").filter(x=>x/2 == 0)
-oddRdd.take(5).foreach(println)
-val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3")
+val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
highNoDf.show
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644
index 0000000..e1b0d2e
--- /dev/null
+++ b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
Binary files differ
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644
index 0000000..d807ba9
--- /dev/null
+++ b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
Binary files differ
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
index d41feea..68c06ce 100755
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
@@ -1,54 +1,54 @@
-//package org.apache.amaterasu.spark
-//
-//import java.io.File
-//
-//import org.apache.amaterasu.common.runtime._
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.utilities.TestNotifier
-//
-//import scala.collection.JavaConverters._
-//import org.apache.commons.io.FileUtils
-//import java.io.ByteArrayOutputStream
-//
-//import org.apache.spark.SparkConf
-//import org.apache.spark.repl.Main
-//import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
-//import org.apache.spark.sql.SparkSession
-//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-//
-//class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-//
-// var runner: SparkScalaRunner = _
-//
-// override protected def beforeAll(): Unit = {
-//
-// FileUtils.deleteQuietly(new File("/tmp/job_5/"))
-//
-// val env = Environment()
-// env.workingDir = "file:///tmp"
-// env.master = "local[*]"
-//
-//
-// val spark = SparkRunnerHelper.createSpark(env, "job_5", Seq.empty[String], Map.empty)
-//
-//
-// val notifier = new TestNotifier()
-// val strm = new ByteArrayOutputStream()
-// runner = SparkScalaRunner(env, "job_5", spark, strm, notifier, Seq.empty[String])
-// super.beforeAll()
-// }
-//
-// "SparkScalaRunner" should "execute the simple-spark.scala" in {
-//
-// val script = getClass.getResource("/simple-spark.scala").getPath
-// runner.executeSource(script, "start", Map.empty[String, String].asJava)
-//
-// }
-//
-// "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-//
-// val script = getClass.getResource("/step-2.scala").getPath
-// runner.executeSource(script, "cont", Map.empty[String, String].asJava)
-//
-// }
-//}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.spark
+
+import scala.collection.JavaConverters._
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+
+import scala.io.Source
+
+@DoNotDiscover
+class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
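+  // factory is injected by the enclosing SparkTestsSuite; @DoNotDiscover keeps ScalaTest from running this suite on its own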
+ var factory: ProvidersFactory = _
+ var runner: SparkScalaRunner = _
+
+ "SparkScalaRunner" should "execute the simple-spark.scala" in {
+
+    val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+ val script = getClass.getResource("/simple-spark.scala").getPath
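+    // executeSource takes the script's source text rather than a path, so read the file first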
+ val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+ sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
+
+ }
+
+ "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
+
+    val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+ val script = getClass.getResource("/step-2.scala").getPath
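+    // point the working dir at the checked-in parquet fixtures so getDataFrame("start", "x") resolves to tmp/job/start/x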
+    sparkRunner.env.workingDir = getClass.getResource("/tmp").getPath
+    AmaContext.init(sparkRunner.spark, "job", sparkRunner.env)
+ val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+ sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
+
+ }
+}
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
index 8a1e549..b11a4f9 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
@@ -36,7 +36,9 @@
class SparkTestsSuite extends Suites(
new PySparkRunnerTests,
new RunnersLoadingTests,
- new SparkSqlRunnerTests) with BeforeAndAfterAll {
+ new SparkSqlRunnerTests,
+ new SparkScalaRunnerTests
+) with BeforeAndAfterAll {
var env: Environment = _
var factory: ProvidersFactory = _
@@ -84,9 +86,9 @@
this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
+ this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
-
super.beforeAll()
}