/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.framework.spark.runner.sparksql

import org.apache.amaterasu.common.runtime.Environment
import org.apache.amaterasu.executor.common.executors.ProvidersFactory
import org.apache.amaterasu.utilities.TestNotifier
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
import scala.collection.JavaConverters._
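
/**
 * Tests for SparkSqlRunner: reading data persisted by previous actions as
 * parquet (the default when no format is specified), JSON and CSV, and
 * persisting query results back under the job's working directory.
 */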
@DoNotDiscover
class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
Logger.getLogger("spark").setLevel(Level.OFF)
Logger.getLogger("jetty").setLevel(Level.OFF)
Logger.getRootLogger.setLevel(Level.OFF)

val notifier = new TestNotifier()

// This spec is @DoNotDiscover, so the enclosing suite is expected to populate
// these before the tests run
var factory: ProvidersFactory = _
var env: Environment = _

/*
Test whether parquet is used as the default file format to load data from previous actions
*/
"SparkSql" should "load data as parquet if no input format is specified" in {
val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
val spark: SparkSession = sparkSql.spark
//Prepare test dataset
val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
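// Table names of the form AMACONTEXT_<actionName>_<dfName> resolve to the
// dataframe persisted by that previous action in the working directory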
sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
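// The result is persisted under <workingDir>/<jobId>/<actionName>/<resultName>,
// in the format requested in the map passed to executeSource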
val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result")
println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
outputDf.first().getString(1) shouldEqual "Michael"
}

/*
Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
*/
"SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
val spark: SparkSession = sparkSql.spark
//Prepare test dataset
val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
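// READAS parquet explicitly tells the runner which format to read the
// previous action's output as, instead of relying on the parquet default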
sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
println("Output Parquet: " + inputDf.count + "," + outputDf.count)
inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
}

/*
Test whether the JSON data is successfully parsed and loaded by SparkSQL
*/
"SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
val spark: SparkSession = sparkSql.spark
//Prepare test dataset
val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
inputDf.write.mode(SaveMode.Overwrite).json(s"${env.workingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
val outputDf = spark.read.json(s"${env.workingDir}/${sparkSql.jobId}/sql_json_test/result")
println("Output JSON: " + inputDf.count + "," + outputDf.count)
outputDf.first().getString(1) shouldEqual "Kirupa"
}

/*
Test whether the CSV data is successfully parsed and loaded by SparkSQL
*/
"SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
val spark: SparkSession = sparkSql.spark
//Prepare test dataset
val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
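// spark.read.csv without options assigns generic column names (_c0, _c1, ...),
// so getString(1) below refers to the second column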
inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.workingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
val outputDf = spark.read.csv(s"${env.workingDir}/${sparkSql.jobId}/sql_csv_test/result")
println("Output CSV: " + inputDf.count + "," + outputDf.count)
inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
}

/*
Test whether data can be read directly from a file and queried by SparkSQL
(currently disabled: it relies on an AmaContext/SparkSession setup this suite does not provide)
*/
// "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
//
//   val tempFileEnv = Environment()
//   tempFileEnv.workingDir = "file:/tmp/"
//   AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
//
//   val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
//   sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
//   val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
//   outputParquetDf.show()
//   outputParquetDf.first().getString(1) shouldEqual "Michael"
//
//   sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`", "sql_parquet_file_test", Map("result" -> "json").asJava)
//   // The second query persists its result as json, so read it back as json
//   val outputJsonDf = spark.read.json(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
//   outputJsonDf.show()
//   outputJsonDf.first().getString(1) shouldEqual "Sampath"
// }
}