/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.dataload

import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.scalatest.BeforeAndAfterAll

class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
  var df: DataFrame = _

  override def beforeAll {
    sql("DROP TABLE IF EXISTS carbon1")
    import sqlContext.implicits._
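    // build a 1000-row DataFrame with constant columns c1 = "a", c2 = "b" and c3 = 1..1000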
    df = sqlContext.sparkContext.parallelize(1 to 1000)
      .map(x => ("a", "b", x))
      .toDF("c1", "c2", "c3")
    // save dataframe to carbon file
    df.write
      .format("carbondata")
      .option("tableName", "carbon1")
      .mode(SaveMode.Overwrite)
      .save()
  }
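
  // read the table written in beforeAll back through the carbondata datasource
  // and verify the row count of a filtered scan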
test("read and write using CarbonContext") {
val in = sqlContext.read
.format("carbondata")
.option("tableName", "carbon1")
.load()
assert(in.where("c3 > 500").count() == 500)
}
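
  // same read path with the compress option passed through; the results must match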
test("read and write using CarbonContext with compression") {
val in = sqlContext.read
.format("carbondata")
.option("tableName", "carbon1")
.option("compress", "true")
.load()
assert(in.where("c3 > 500").count() == 500)
}
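
  // two Overwrite saves into the same table: the second replaces the first,
  // so the table still holds a single copy of the 1000 rows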
test("test overwrite") {
sql("DROP TABLE IF EXISTS carbon4")
df.write
.format("carbondata")
.option("tableName", "carbon4")
.mode(SaveMode.Overwrite)
.save()
df.write
.format("carbondata")
.option("tableName", "carbon4")
.mode(SaveMode.Overwrite)
.save()
val in = sqlContext.read
.format("carbondata")
.option("tableName", "carbon4")
.load()
assert(in.where("c3 > 500").count() == 500)
sql("DROP TABLE IF EXISTS carbon4")
}
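
  // Overwrite followed by Append: both copies are kept, doubling the matching rows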
test("read and write using CarbonContext, multiple load") {
sql("DROP TABLE IF EXISTS carbon4")
df.write
.format("carbondata")
.option("tableName", "carbon4")
.mode(SaveMode.Overwrite)
.save()
df.write
.format("carbondata")
.option("tableName", "carbon4")
.mode(SaveMode.Append)
.save()
val in = sqlContext.read
.format("carbondata")
.option("tableName", "carbon4")
.load()
assert(in.where("c3 > 500").count() == 1000)
sql("DROP TABLE IF EXISTS carbon4")
}
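
  // query the same carbon store from a fresh SQLContext by registering a
  // temporary table over the table path, with the schema given explicitly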
test("query using SQLContext") {
val newSQLContext = new SQLContext(sqlContext.sparkContext)
newSQLContext.sql(
s"""
| CREATE TEMPORARY TABLE temp
| (c1 string, c2 string, c3 int)
| USING org.apache.spark.sql.CarbonSource
| OPTIONS (path '$storeLocation/default/carbon1')
""".stripMargin)
checkAnswer(newSQLContext.sql(
"""
| SELECT c1, c2, count(*)
| FROM temp
| WHERE c3 > 100
| GROUP BY c1, c2
""".stripMargin), Seq(Row("a", "b", 900)))
newSQLContext.dropTempTable("temp")
}
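
  // same as above but without an explicit schema, which is presumably resolved
  // from the carbon table metadata at the given path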
test("query using SQLContext without providing schema") {
val newSQLContext = new SQLContext(sqlContext.sparkContext)
newSQLContext.sql(
s"""
| CREATE TEMPORARY TABLE temp
| USING org.apache.spark.sql.CarbonSource
| OPTIONS (path '$storeLocation/default/carbon1')
""".stripMargin)
checkAnswer(newSQLContext.sql(
"""
| SELECT c1, c2, count(*)
| FROM temp
| WHERE c3 > 100
| GROUP BY c1, c2
""".stripMargin), Seq(Row("a", "b", 900)))
newSQLContext.dropTempTable("temp")
}
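
  // load the same CSV twice into a table created via SQL, then verify the
  // doubled row count through a temporary table in a fresh SQLContext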
test("query using SQLContext, multiple load") {
sql("DROP TABLE IF EXISTS test")
sql(
"""
| CREATE TABLE test(id int, name string, city string, age int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
val testData = s"${resourcesPath}/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
val newSQLContext = new SQLContext(sqlContext.sparkContext)
newSQLContext.sql(
s"""
| CREATE TEMPORARY TABLE temp
| (id int, name string, city string, age int)
| USING org.apache.spark.sql.CarbonSource
| OPTIONS (path '$storeLocation/default/test')
""".stripMargin)
checkAnswer(newSQLContext.sql(
"""
| SELECT count(id)
| FROM temp
""".stripMargin), Seq(Row(8)))
newSQLContext.dropTempTable("temp")
sql("DROP TABLE test")
}
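
  // regression test for CARBONDATA-405: a JSON-sourced DataFrame, whose numeric
  // columns Spark infers as long, must round-trip through the carbon table intact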
test("json data with long datatype issue CARBONDATA-405") {
val jsonDF = sqlContext.read.format("json").load(s"$resourcesPath/test.json")
jsonDF.write
.format("carbondata")
.option("tableName", "dftesttable")
.option("compress", "true")
.mode(SaveMode.Overwrite)
.save()
val carbonDF = sqlContext
.read
.format("carbondata")
.option("tableName", "dftesttable")
.load()
checkAnswer(
carbonDF.select("age", "name"),
jsonDF.select("age", "name"))
sql("drop table dftesttable")
}

  override def afterAll {
    sql("DROP TABLE IF EXISTS carbon1")
    sql("DROP TABLE IF EXISTS carbon2")
  }
}