/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.dataload
import java.io.File
import java.math.BigDecimal
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.scalatest.BeforeAndAfterAll
class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
var df: DataFrame = _
var dataFrame: DataFrame = _
var df2: DataFrame = _
var df3: DataFrame = _
  var booldf: DataFrame = _
def buildTestData() = {
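    // fixtures: df has 32000 rows; dataFrame mixes double/decimal/string values; df2 has 1000 rows
    // across five columns; df3 embeds a comma and a newline in c1; booldf carries a Boolean column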
import sqlContext.implicits._
df = sqlContext.sparkContext.parallelize(1 to 32000)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
val rdd = sqlContext.sparkContext.parallelize(
Row(52.23, BigDecimal.valueOf(1234.4440), "Warsaw") ::
Row(42.30, BigDecimal.valueOf(9999.9990), "Corte") :: Nil)
val schema = StructType(
StructField("double", DoubleType, nullable = false) ::
StructField("decimal", DecimalType(9, 2), nullable = false) ::
StructField("string", StringType, nullable = false) :: Nil)
dataFrame = sqlContext.createDataFrame(rdd, schema)
df2 = sqlContext.sparkContext.parallelize(1 to 1000)
.map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3))
.toDF("c1", "c2", "c3", "c4", "c5")
df3 = sqlContext.sparkContext.parallelize(1 to 3)
.map(x => (x.toString + "te,s\nt", x))
.toDF("c1", "c2")
val boolrdd = sqlContext.sparkContext.parallelize(
Row("anubhav",true) ::
Row("prince",false) :: Nil)
val boolSchema = StructType(
StructField("name", StringType, nullable = false) ::
StructField("isCarbonEmployee",BooleanType,nullable = false)::Nil)
booldf = sqlContext.createDataFrame(boolrdd,boolSchema)
}
def dropTable() = {
sql("DROP TABLE IF EXISTS carbon0")
sql("DROP TABLE IF EXISTS carbon1")
sql("DROP TABLE IF EXISTS carbon2")
sql("DROP TABLE IF EXISTS carbon3")
sql("DROP TABLE IF EXISTS carbon4")
sql("DROP TABLE IF EXISTS carbon5")
sql("DROP TABLE IF EXISTS carbon6")
sql("DROP TABLE IF EXISTS carbon7")
sql("DROP TABLE IF EXISTS carbon8")
sql("DROP TABLE IF EXISTS carbon9")
sql("DROP TABLE IF EXISTS carbon10")
sql("DROP TABLE IF EXISTS carbon11")
sql("DROP TABLE IF EXISTS carbon12")
sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
sql("DROP TABLE IF EXISTS carbon_table_df")
sql("DROP TABLE IF EXISTS carbon_table_df1")
}
override def beforeAll {
dropTable
buildTestData
}
test("test the boolean data type"){
booldf.write
.format("carbondata")
.option("tableName", "carbon0")
.option("tempCSV", "true")
.option("compress", "true")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("SELECT * FROM CARBON0"),
Seq(Row("anubhav", true), Row("prince", false)))
}
test("test load dataframe with saving compressed csv files") {
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon1")
.option("tempCSV", "true")
.option("compress", "true")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon1 where c3 > 500"), Row(31500)
)
sql(s"describe formatted carbon1").show(true)
}
test("test load dataframe with saving csv uncompressed files") {
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon2")
.option("tempCSV", "true")
.option("compress", "false")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon2 where c3 > 500"), Row(31500)
)
}
test("test load dataframe without saving csv files") {
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon3")
.option("tempCSV", "false")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon3 where c3 > 500"), Row(31500)
)
}
test("test decimal values for dataframe load"){
dataFrame.write
.format("carbondata")
.option("tableName", "carbon4")
.option("compress", "true")
.mode(SaveMode.Overwrite)
.save()
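    // DecimalType(9, 2) rounds the loaded values 9999.9990 and 1234.4440 to 10000.00 and 1234.44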
checkAnswer(
sql("SELECT decimal FROM carbon4"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
}
test("test loading data if the data count is multiple of page size"){
checkAnswer(
sql("SELECT count(*) FROM carbon2"),Seq(Row(32000)))
}
test("test datasource table with specified table path") {
val path = "./source"
df2.write
.format("carbondata")
.option("tableName", "carbon10")
.option("tablePath", path)
.mode(SaveMode.Overwrite)
.save()
assert(new File(path).exists())
checkAnswer(
sql("select count(*) from carbon10 where c3 > 500"), Row(500)
)
sql("drop table carbon10")
assert(!new File(path).exists())
assert(intercept[AnalysisException](
sql("select count(*) from carbon10 where c3 > 500"))
.message
.contains("not found"))
}
test("test streaming Table") {
dataFrame.write
.format("carbondata")
.option("tableName", "carbon11")
.option("tempCSV", "true")
.option("compress", "false")
.option("streaming", "true")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("SELECT decimal FROM carbon11"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
val descResult =sql("desc formatted carbon11")
val isStreaming: String = descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String]
assert(isStreaming.contains("sink"))
}
test("test datasource table with specified char") {
df3.write
.format("carbondata")
.option("tableName", "carbon12")
.option("tempCSV", "true")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon12"), Row(3)
)
}
private def getSortColumnValue(tableName: String): Array[String] = {
val desc = sql(s"desc formatted $tableName")
val sortColumnRow = desc.collect.find(r =>
r(0).asInstanceOf[String].trim.equalsIgnoreCase("Sort Columns")
)
assert(sortColumnRow.isDefined)
sortColumnRow.get.get(1).asInstanceOf[String].split(",")
.map(_.trim.toLowerCase).filter(_.length > 0)
}
private def getDefaultWriter(tableName: String): DataFrameWriter[Row] = {
df2.write
.format("carbondata")
.option("tableName", tableName)
.option("tempCSV", "false")
.option("table_blocksize", "256")
.option("compress", "false")
.mode(SaveMode.Overwrite)
}
  // ignored: sort scope now defaults to no_sort, so dimensions are no longer selected as sort columns by default
ignore("test load dataframe with sort_columns not specified," +
" by default all string columns will be sort_columns") {
    // previously, all string columns became sort_columns by default
getDefaultWriter("df_write_sort_column_not_specified").save()
checkAnswer(
sql("select count(*) from df_write_sort_column_not_specified where c3 > 500"), Row(500)
)
val sortColumnValue = getSortColumnValue("df_write_sort_column_not_specified")
assert(sortColumnValue.sameElements(Array("c1", "c2")))
}
test("test load dataframe with sort_columns specified") {
// only specify c1 as sort_columns
getDefaultWriter("df_write_specify_sort_column").option("sort_columns", "c1").save()
checkAnswer(
sql("select count(*) from df_write_specify_sort_column where c3 > 500"), Row(500)
)
val sortColumnValue = getSortColumnValue("df_write_specify_sort_column")
assert(sortColumnValue.sameElements(Array("c1")))
}
test("test load dataframe with sort_columns specified empty") {
// specify empty sort_column
getDefaultWriter("df_write_empty_sort_column").option("sort_columns", "").save()
checkAnswer(
sql("select count(*) from df_write_empty_sort_column where c3 > 500"), Row(500)
)
val sortColumnValue = getSortColumnValue("df_write_empty_sort_column")
assert(sortColumnValue.isEmpty)
}
test("test load dataframe while giving already created table") {
sql(s"create table carbon_table_df(c1 string, c2 string, c3 int) stored by 'carbondata'")
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon_table_df")
.option("tempCSV", "false")
.mode(SaveMode.Overwrite)
.save()
df.write
.format("carbondata")
.option("tableName", "carbon_table_df")
.option("tempCSV", "false")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon_table_df where c3 > 500"), Row(31500)
)
}
test("test load dataframe while giving already created table with delete segment") {
sql(s"create table carbon_table_df1(c1 string, c2 string, c3 int) stored by 'carbondata'")
val table = CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon_table_df1")
.option("tempCSV", "false")
.mode(SaveMode.Overwrite)
.save()
assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
.getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
sql("delete from table carbon_table_df1 where segment.id in (0)")
df.write
.format("carbondata")
.option("tableName", "carbon_table_df1")
.option("tempCSV", "false")
.mode(SaveMode.Overwrite)
.save()
assert(CarbonEnv.getCarbonTable(TableIdentifier("carbon_table_df1"))(sqlContext.sparkSession)
.getTableInfo.getFactTable.equals(table.getTableInfo.getFactTable))
checkAnswer(
sql("select count(*) from carbon_table_df1 where c3 > 500"), Row(31500)
)
}
test("test load data with binary_decoder in df") {
val spark = sqlContext.sparkSession
try {
sql("DROP TABLE IF EXISTS carbon_table")
val rdd = spark.sparkContext.parallelize(1 to 3)
.map(x => Row("a" + x % 10, "b", x, "abc".getBytes()))
val customSchema = StructType(Array(
StructField("c1", StringType),
StructField("c2", StringType),
StructField("number", IntegerType),
StructField("c4", BinaryType)))
      val df = spark.createDataFrame(rdd, customSchema)
// Saves dataFrame to carbondata file
df.write.format("carbondata")
.option("binary_decoder", "base64")
.option("tableName", "carbon_table")
.save()
val carbonDF = spark.read
.format("carbondata")
.option("tableName", "carbon_table")
.schema(customSchema)
.load()
assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number", "c4"))
// "YWJj" is base64 decode data of "abc"
checkAnswer(carbonDF, Seq(Row("a1", "b", 1, "abc".getBytes()),
Row("a2", "b", 2, "abc".getBytes()),
Row("a3", "b", 3, "abc".getBytes())))
val carbonDF2 = carbonDF.drop("c1")
assert(carbonDF2.schema.map(_.name) === Seq("c2", "number", "c4"))
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS carbon_table")
}
}
override def afterAll {
dropTable
}
}