blob: 9b6780d4c607e9ddbc673bf3e5648fd1d1c64d5e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.carbondata.integration.spark.testsuite.dataload
import{File, FileOutputStream, OutputStreamWriter, Serializable}
import scala.util.Random
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.scalatest.{BeforeAndAfterAll, Ignore}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
/**
 * One generated input record for the sort-column-bounds load tests.
 *
 * The field order mirrors the column order used by the tests:
 * ID, date, country, name, phoneType, serialName, salary.
 *
 * @param id         numeric row identifier
 * @param date       date value, kept as text
 * @param country    country value
 * @param name       name value
 * @param phoneType  phone type value
 * @param serialName serial name value
 * @param salary     salary value
 */
case class SortColumnBoundRow(
    id: Int,
    date: String,
    country: String,
    name: String,
    phoneType: String,
    serialName: String,
    salary: Int)
  extends Serializable
// Companion object holding the row generator that both prepareDataFile and
// prepareDataFrame call.
object TestLoadDataWithSortColumnBounds {
// Builds one SortColumnBoundRow for the given id.
// The two interpolated strings get a fresh random suffix in [0, 10000) on every call,
// and the last argument is 10000 + id (presumably the salary field -- confirm).
// NOTE(review): the constructor call opener and its leading arguments are not visible
// in this view.
def generateOneRow(id : Int): SortColumnBoundRow = {
s"phone${new Random().nextInt(10000)}",
s"ASD${new Random().nextInt(10000)}",
10000 + id)
// Test suite for loading data with the 'sort_column_bounds' load option.
class TestLoadDataWithSortColumnBounds extends QueryTest with BeforeAndAfterAll {
// table that every test drops at its start and at its end
private val tableName: String = "test_table_with_sort_column_bounds"
// CSV file used as the LOAD DATA input; deleted again in afterAll
private val filePath: String = s"$resourcesPath/source_for_sort_column_bounds.csv"
// data frame assigned by prepareDataFrame and reset to null in afterAll
private var df: DataFrame = _
// format string passed together with CARBON_TIMESTAMP_FORMAT in beforeAll
private val dateFormatStr: String = "yyyy/MM/dd"
// number of generated rows; the tests expect count(*) to equal this value
private val totalLineNum = 2000
// property value read when the suite is constructed and passed back together with
// CARBON_TIMESTAMP_FORMAT in afterAll
// NOTE(review): the getProperty arguments are not visible in this view.
private val originDateStatus: String = CarbonProperties.getInstance().getProperty(
// Suite-level setup: passes dateFormatStr for CARBON_TIMESTAMP_FORMAT, drops any
// left-over test table and forces the load sort scope to local_sort.
// NOTE(review): the opener of the call receiving (CARBON_TIMESTAMP_FORMAT, dateFormatStr)
// is not visible in this view; presumably CarbonProperties.getInstance().addProperty -- confirm.
override def beforeAll(): Unit = {
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, dateFormatStr)
sql(s"DROP TABLE IF EXISTS $tableName")
// sort column bounds work only with local_sort
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "local_sort")
// Suite-level teardown: passes the saved originDateStatus back for CARBON_TIMESTAMP_FORMAT,
// drops the test table, deletes the generated CSV file and releases the data frame.
// NOTE(review): the opener of the call receiving (CARBON_TIMESTAMP_FORMAT, originDateStatus)
// and the statement that follows the trailing comment are not visible in this view.
override def afterAll(): Unit = {
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, originDateStatus)
sql(s"DROP TABLE IF EXISTS $tableName")
// File.delete() reports failure through its return value, which is ignored here
new File(filePath).delete()
df = null
// sort column bounds work only with local_sort
// generate the loading file based on source.csv; it can have more lines than the source
// Writes the CSV input file at filePath, iterating over ids 1 to totalLineNum.
// NOTE(review): the body of generateLine, the loop body and the code that closes the
// writer are not visible in this view.
private def prepareDataFile(): Unit = {
val file = new File(filePath)
val sb: StringBuilder = new StringBuilder
// local helper that turns one generated row into one line of the file
def generateLine(id : Int): String = {
val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
val outputStream = new FileOutputStream(file)
// no charset is given, so the writer uses the platform default encoding
val writer = new OutputStreamWriter(outputStream)
for (i <- 1 to totalLineNum) {
// prepare the data frame
// Builds df from ids 1 to totalLineNum: each id is mapped to a generated row, flattened
// into a tuple and exposed under the column names
// ID, date, country, name, phoneType, serialName, salary.
// NOTE(review): the first four tuple elements are garbled in this view.
private def prepareDataFrame(): Unit = {
// brings the toDF conversion into scope
import sqlContext.implicits._
df = sqlContext.sparkSession.sparkContext.parallelize(1 to totalLineNum)
.map(id => {
val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
(,,,, row.phoneType, row.serialName, row.salary)
.toDF("ID", "date", "country", "name", "phoneType", "serialName", "salary")
// Loads the CSV with four two-field bounds while ENABLE_UNSAFE_SORT is "false", then
// verifies the total row count and the count of rows with ID > 1001.
// The previous ENABLE_UNSAFE_SORT value is read first so it can be put back at the end.
// NOTE(review): the continuation of the CREATE TABLE statement and the opener of the
// call that restores originStatus are not visible in this view.
test("load data with sort column bounds: safe mode") {
val originStatus = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
// load with 4 bounds
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
// Same scenario as the safe-mode test, but with ENABLE_UNSAFE_SORT set to "true":
// loads with four two-field bounds, then verifies the total row count and the count of
// rows with ID > 1001. The previous property value is read first so it can be put back.
// NOTE(review): the continuation of the CREATE TABLE statement and the opener of the
// call that restores originStatus are not visible in this view.
test("load data with sort column bounds: unsafe mode") {
val originStatus = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
// load with 4 bounds
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
// Loads with a bound whose first field is empty (',aab1') and expects the load to
// succeed: both row counts must still match.
// NOTE(review): the continuation of the CREATE TABLE statement is not visible in this view.
test("load data with sort column bounds: empty column value in bounds is treated as null") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
// bounds have empty value
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='200,aab1;,aab1')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// Loads with an empty 'sort_column_bounds' option and expects the load to succeed:
// both row counts must still match.
// NOTE(review): the continuation of the CREATE TABLE statement is not visible in this view.
test("load data with sort column bounds: sort column bounds will be ignored if it is empty.") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// Negative test: a bound with too few fields ('800') and a bound with too many fields
// ('800,aab1,def') must each make LOAD DATA throw. The string fragments after each
// intercept block spell out the expected error message (expected 2 fields, actual 1 / 3).
// NOTE(review): the continuation of the CREATE TABLE statement, the closing of the
// intercept blocks and the opener of the message checks are not visible in this view.
test("load data with sort column bounds: number of column value in bounds should match that of sort column") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
val e = intercept[Exception] {
// number of column value does not match that of sort columns
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab1;800')")
"The number of field in bounds should be equal to that in sort columns." +
" Expected 2, actual 1." +
" The illegal bound is '800'"))
val e2 = intercept[Exception] {
// number of column value does not match that of sort columns
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab1;800,aab1,def')")
"The number of field in bounds should be equal to that in sort columns." +
" Expected 2, actual 3." +
" The illegal bound is '800,aab1,def'"))
sql(s"DROP TABLE IF EXISTS $tableName")
// Loads with a malformed three-field bound ('400,aab,extra_field') and still expects
// the load to succeed with both row counts matching, because the bounds are ignored
// when the sort scope is not local_sort.
// NOTE(review): the continuation of the CREATE TABLE statement (where the sort scope is
// presumably set to global_sort) is not visible in this view.
test("load data with sort column bounds: sort column bounds will be ignored if not using local_sort") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata'" +
// since the sort_scope is 'global_sort', we will ignore the sort column bounds,
// so the error in sort_column bounds will not be thrown
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab,extra_field')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// The default sort scope is now no_sort, so dimensions are no longer sorted by default.
// The case is therefore registered with `ignore` and is not executed.
// It creates the table without naming sort columns and expects a two-field bound
// ('400,aab') to be rejected with an "Expected 5, actual 2" message.
// NOTE(review): the closing of the intercept block and the opener of the message check
// are not visible in this view.
ignore("load data with sort column bounds: no sort columns explicitly specified" +
" means all dimension columns will be sort columns, so bounds should be set correctly") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata'")
// the sort_columns will have 5 columns if we don't specify it explicitly
val e = intercept[Exception] {
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,aab')")
"The number of field in bounds should be equal to that in sort columns." +
" Expected 5, actual 2." +
" The illegal bound is '400,aab'"))
sql(s"DROP TABLE IF EXISTS $tableName")
// Loads with four bounds of the form '<id>,name<id>' into a table whose sort column is
// dictionary encoded, and expects the load to succeed with both row counts matching.
// NOTE(review): the continuation of the CREATE TABLE statement (where the sort and
// dictionary columns are presumably declared) is not visible in this view.
test("load data with sort column bounds: sort column is global dictionary encoded") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
// ID is sort column and dictionary column. Since the actual order and literal order of
// this column are not necessarily the same, this will not cause error but will cause data skewed.
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='400,name400;800,name800;1200,name1200;1600,name1600')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// Loads with bounds whose name values ('nmm400', 'nme800', ...) are not expected to
// exist in the dictionary, and still expects the load to succeed with both row counts
// matching.
// NOTE(review): the continuation of the CREATE TABLE statement (where the sort and
// dictionary columns are presumably declared) is not visible in this view.
test("load data with sort column bounds: sort column is global dictionary encoded" +
" but bounds are not in dictionary") {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED BY 'carbondata' " +
// 'name' is sort column and dictionary column, but value for 'name' in bounds does not exists
// in dictionary. It will not cause error but will cause data skewed.
sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
s" OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
s" 'sort_column_bounds'='nmm400,400;nme800,800;nme1200,1200;nme1600,1600')")
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// Writes to the table through a chain of options (sort columns ID,name and two
// two-field bounds), prints both counts with show() and then verifies them.
// NOTE(review): the receiver of the .option chain (presumably df.write.format(...)) and
// the call that ends the chain are not visible in this view.
test("load data frame with sort column bounds") {
sql(s"DROP TABLE IF EXISTS $tableName")
.option("tableName", tableName)
.option("tempCSV", "false")
.option("sort_columns", "ID,name")
.option("sort_column_bounds", "600,aab1;1200,aab1")
// show() only prints the results; the assertions are the checkAnswer calls below
sql(s"select count(*) from $tableName").show()
sql(s"select count(*) from $tableName where ID > 1001").show()
checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), Row(totalLineNum - 1001))
sql(s"DROP TABLE IF EXISTS $tableName")
// Negative test for the option-chain write path: a three-field bound ('1200,aab1,def')
// against two sort columns (ID,name) must throw. The string fragments after the
// intercept block spell out the expected error message (expected 2 fields, actual 3).
// NOTE(review): the receiver of the .option chain, the closing of the intercept block
// and the opener of the message check are not visible in this view.
test("load data frame with sort column bounds: number of column value in bounds should match that of sort column") {
sql(s"DROP TABLE IF EXISTS $tableName")
val e = intercept[Exception] {
.option("tableName", tableName)
.option("tempCSV", "false")
.option("sort_columns", "ID,name")
.option("sort_column_bounds", "600,aab1;1200,aab1,def")
"The number of field in bounds should be equal to that in sort columns." +
" Expected 2, actual 3." +
" The illegal bound is '1200,aab1,def'"))
sql(s"DROP TABLE IF EXISTS $tableName")