blob: eed9c3e74c71c062e5ab2ad9b778cf365a0b3079 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.dataload
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
class TestLoadWithSortTempCompressed extends QueryTest
with BeforeAndAfterEach with BeforeAndAfterAll {
val originOffHeapStatus: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
val originSortTempCompressor: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT)
val simpleTable = "simpleTable"
val complexCarbonTable = "complexCarbonTable"
val complexHiveTable = "complexHiveTable"
override def beforeEach(): Unit = {
sql(s"drop table if exists $simpleTable")
sql(s"drop table if exists $complexCarbonTable")
sql(s"drop table if exists $complexHiveTable")
}
override def afterEach(): Unit = {
sql(s"drop table if exists $simpleTable")
sql(s"drop table if exists $complexCarbonTable")
sql(s"drop table if exists $complexHiveTable")
}
override protected def beforeAll(): Unit = {
}
override def afterAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
originSortTempCompressor)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, originOffHeapStatus)
}
private def testSimpleTable(): Unit = {
val lineNum: Int = 10002
val df = {
import sqlContext.implicits._
sqlContext.sparkContext.parallelize((1 to lineNum).reverse)
.map(x => (s"a$x", s"b$x", s"c$x", 12.3 + x, x, System.currentTimeMillis(), s"d$x"))
.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7")
}
df.write
.format("carbondata")
.option("tableName", simpleTable)
.option("tempCSV", "true")
.option("SORT_COLUMNS", "c1,c3")
.save()
checkAnswer(sql(s"select count(*) from $simpleTable"), Row(lineNum))
checkAnswer(sql(s"select count(*) from $simpleTable where c5 > 5001"), Row(5001))
}
test("test data load for simple table without sort temp compressed and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR, "")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table without sort temp compressed and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR, "")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with snappy" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with snappy" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with zstd" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with zstd" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
private def testComplexTable(): Unit = {
// note: following tests are copied from `TestComplexTypeQuery`
sql(
s"create table $complexCarbonTable(deviceInformationId int, channelsId string, ROMSize " +
"string, ROMName String, purchasedate string, mobile struct<imei:string, imsi:string>, MAC " +
"array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double) STORED AS carbondata ")
sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO table" +
s" $complexCarbonTable OPTIONS('DELIMITER'=',', " +
"'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
"purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber', " +
"'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
sql(
s"create table $complexHiveTable(deviceInformationId int, channelsId string, ROMSize " +
"string, ROMName String, purchasedate string,mobile struct<imei:string, imsi:string>,MAC " +
"array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double)row format delimited fields terminated by ',' collection " +
"items terminated by '$' map keys terminated by ':'")
sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO table" +
s" $complexHiveTable")
checkAnswer(sql(s"select * from $complexCarbonTable"), sql(s"select * from $complexHiveTable"))
checkAnswer(sql(s"select MAC from $complexCarbonTable where MAC[0] = 'MAC1'"),
sql(s"select MAC from $complexHiveTable where MAC[0] = 'MAC1'"))
checkAnswer(sql(s"select mobile from $complexCarbonTable where mobile.imei like '1AA%'"),
sql(s"select mobile from $complexHiveTable where mobile.imei like '1AA%'"))
checkAnswer(sql(s"select locationinfo from $complexCarbonTable" +
" where locationinfo[0].ActiveAreaId > 2 AND locationinfo[0].ActiveAreaId < 7"),
sql(s"select locationinfo from $complexHiveTable" +
" where locationinfo[0].ActiveAreaId > 2 AND locationinfo[0].ActiveAreaId < 7"))
}
test("test data load for complex table with sort temp compressed with snappy" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for complex table with sort temp compressed with snappy" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for complex table with sort temp compressed with zstd" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
test("test data load for complex table with sort temp compressed with zstd" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}
}