package org.apache.carbondata.cluster.sdv.generated
import org.apache.spark.sql.common.util._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
/**
 * SDV tests for CarbonData table bucketing: rejection of invalid BUCKETCOLUMNS at
 * CREATE TABLE time, presence of bucketing metadata after a load, and whether the
 * physical plan of a self-join on bucket_table contains a shuffle exchange.
 *
 * NOTE(review): this file was restored from a copy with stripped lines/tokens
 * (braces, triple quotes, TBLPROPERTIES lines, join columns). Values marked
 * "reconstructed" were inferred from the test names and assertions — confirm them
 * against version control.
 */
class BucketingTestCase extends QueryTest with BeforeAndAfterAll {

  // Value of spark.sql.autoBroadcastJoinThreshold before the suite; restored in afterAll.
  var threshold: Int = _
  // Value of carbon.timestamp.format before the suite; restored in afterAll.
  var timeformat = CarbonProperties.getInstance()
    .getProperty("carbon.timestamp.format", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)

  override def beforeAll(): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt
    // -1 disables broadcast joins, so the self-joins below are not planned as broadcasts
    // and the presence/absence of a shuffle reflects the bucketing layout.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
    sql("DROP TABLE IF EXISTS bucket_table")
  }

  /**
   * Drops and recreates `bucket_table` with the suite's common schema.
   *
   * @param tblProperties the parenthesised TBLPROPERTIES clause, e.g. "('BUCKETNUMBER'='4', ...)"
   */
  private def createBucketTable(tblProperties: String): Unit = {
    sql("DROP TABLE IF EXISTS bucket_table")
    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," +
        "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " +
        tblProperties)
  }

  /** Loads the shared source.csv fixture into `bucket_table`. */
  private def loadSourceData(): Unit =
    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table")

  /** Fails the test unless `default_bucket_table` exists and carries bucketing metadata. */
  private def assertBucketingInfoExists(): Unit = {
    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_bucket_table")
    assert(table != null && table.getBucketingInfo != null, "Bucketing info does not exist")
  }

  /**
   * Plans a self-join of `bucket_table` on `joinCondition`.
   *
   * @return true if the executed physical plan contains a [[ShuffleExchangeExec]]
   */
  private def selfJoinHasShuffle(joinCondition: String): Boolean = {
    val plan = sql(
      s"""
         |select t1.*, t2.*
         |from bucket_table t1, bucket_table t2
         |where $joinCondition
      """.stripMargin).queryExecution.executedPlan
    plan.collect { case s: ShuffleExchangeExec => s }.nonEmpty
  }

  test("test exception if bucketcolumns be measure column") {
    intercept[Exception] {
      // NOTE(review): reconstructed — bucketing on the Int measure column ID.
      createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')")
    }
  }

  test("test exception if bucketcolumns be complex data type column") {
    intercept[Exception] {
      sql("DROP TABLE IF EXISTS bucket_table")
      sql("CREATE TABLE bucket_table (Id int, number double, name string, " +
          "gamePoint array<double>, mac struct<num:double>) STORED BY 'carbondata' TBLPROPERTIES" +
          "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='gamePoint')")
    }
  }

  test("test Int column as bucketcolumns through dictionary_include") {
    // NOTE(review): reconstructed — ID made a dictionary dimension so it can be bucketed.
    createBucketTable("('DICTIONARY_INCLUDE'='ID', 'BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')")
    loadSourceData()
    assertBucketingInfoExists()
  }

  test("test multi columns as bucketcolumns") {
    createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name,phonetype')")
    loadSourceData()
    assertBucketingInfoExists()
  }

  test("test multi columns as bucketcolumns with bucket join") {
    createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country,name')")
    loadSourceData()
    // NOTE(review): reconstructed — join on both bucket columns.
    assert(!selfJoinHasShuffle("t1.country = t2.country and t1.name = t2.name"),
      "shuffle should not exist on bucket column join")
  }

  test("test non bucket column join") {
    createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country')")
    loadSourceData()
    // NOTE(review): reconstructed — join on a column other than the bucket column.
    assert(selfJoinHasShuffle("t1.name = t2.name"),
      "shuffle should exist on non-bucket column join")
  }

  test("test bucketcolumns through multi data loading plus compaction") {
    // NOTE(review): reconstructed — bucket column and join column assumed to be name.
    createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
    val numOfLoad = 10
    for (_ <- 0 until numOfLoad) {
      loadSourceData()
    }
    sql("ALTER TABLE bucket_table COMPACT 'MAJOR'")
    assert(!selfJoinHasShuffle("t1.name = t2.name"),
      "shuffle should not exist on bucket tables")
  }

  test("drop non-bucket column, test bucket column join") {
    // NOTE(review): reconstructed — bucket column must survive dropping ID and country.
    createBucketTable("('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
    loadSourceData()
    sql("ALTER TABLE bucket_table DROP COLUMNS (ID,country)")
    assert(!selfJoinHasShuffle("t1.name = t2.name"),
      "shuffle should not exist on bucket tables")
  }

  override def afterAll(): Unit = {
    try {
      sql("DROP TABLE IF EXISTS bucket_table")
    } finally {
      // Restore global settings even if the drop fails, so later suites are unaffected.
      sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString)
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeformat)
    }
  }
}