blob: 0bfa93eb540aa39ac602e57144fe4069cecf4b9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.carbondata.bucketing
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.exception.ProcessMetaDataException
class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
var threshold: Int = _
override def beforeAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
sql("DROP TABLE IF EXISTS t4")
sql("DROP TABLE IF EXISTS t5")
sql("DROP TABLE IF EXISTS t6")
sql("DROP TABLE IF EXISTS t6_")
sql("DROP TABLE IF EXISTS t7")
sql("DROP TABLE IF EXISTS t8")
sql("DROP TABLE IF EXISTS t9")
sql("DROP TABLE IF EXISTS t10")
sql("DROP TABLE IF EXISTS t11")
sql("DROP TABLE IF EXISTS t12")
sql("DROP TABLE IF EXISTS t13")
sql("DROP TABLE IF EXISTS t14")
sql("DROP TABLE IF EXISTS t15")
sql("DROP TABLE IF EXISTS t16")
sql("DROP TABLE IF EXISTS t17")
sql("DROP TABLE IF EXISTS t18")
sql("DROP TABLE IF EXISTS t19")
sql("DROP TABLE IF EXISTS t20")
sql("DROP TABLE IF EXISTS t21")
sql("DROP TABLE IF EXISTS t22")
sql("DROP TABLE IF EXISTS t23")
sql("DROP TABLE IF EXISTS t24")
sql("DROP TABLE IF EXISTS t25")
sql("DROP TABLE IF EXISTS t26")
sql("DROP TABLE IF EXISTS t27")
sql("DROP TABLE IF EXISTS t28")
sql("DROP TABLE IF EXISTS t40")
sql("DROP TABLE IF EXISTS t41")
sql("DROP TABLE IF EXISTS t42")
sql("DROP TABLE IF EXISTS t43")
sql("DROP TABLE IF EXISTS t44")
sql("DROP TABLE IF EXISTS t45")
sql("DROP TABLE IF EXISTS t46")
sql("DROP TABLE IF EXISTS t47")
sql("DROP TABLE IF EXISTS t48")
sql("DROP TABLE IF EXISTS t49")
sql("DROP TABLE IF EXISTS t50")
sql("DROP TABLE IF EXISTS bucketed_parquet_table")
sql("DROP TABLE IF EXISTS parquet_table")
}
test("test create table with buckets using table properties and loaded data will" +
" store into different files") {
sql("CREATE TABLE t4 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4")
val table = CarbonEnv.getCarbonTable(Option("default"), "t4")(sqlContext.sparkSession)
val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
})
assert(dataFiles.length == 4)
checkAnswer(sql("select count(*) from t4"), Row(100))
checkAnswer(sql("select count(*) from t4 where name='aaa99'"), Row(1))
if (table != null && table.getBucketingInfo() != null) {
assert(true)
} else {
assert(false, "Bucketing info does not exist")
}
}
test("test IUD of bucket table") {
sql("CREATE TABLE t40 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t40")
sql("CREATE TABLE t41 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t41")
// insert
sql(s"insert into t40 select 101,'2015/10/16','china','aaa101','phone2569','ASD16163',15100")
checkAnswer(sql(
"""select count(*) from t40
""".stripMargin), Row(101))
// update
sql(s"update t40 set (name) = ('aaa100') where name='aaa101'")
checkAnswer(sql(
"""select count(*) from t40
""".stripMargin), Row(101))
checkAnswer(sql(
"""select count(*) from t40 where name='aaa100'
""".stripMargin), Row(2))
// delete
sql(s"delete from t40 where name='aaa100'")
checkAnswer(sql(
"""select count(*) from t40
""".stripMargin), Row(99))
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t40 t1, t41 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(99))
// insert again
sql(s"insert into t40 select 1011,'2015/10/16','china','aaa1011','phone2569','ASD16163',15100")
sql(s"insert into t40 select 1012,'2015/10/16','china','aaa1012','phone2569','ASD16163',15100")
sql(s"insert into t40 select 1013,'2015/10/16','china','aaa1013','phone2569','ASD16163',15100")
sql(s"insert into t40 select 1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100")
checkAnswer(sql(
"""select count(*) from t40
""".stripMargin), Row(103))
// join after IUD
val plan = sql(
"""
|select t1.*, t2.*
|from t40 t1, t41 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t40 t1, t41 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(99))
// insert into t41
sql(s"insert into t41 select 1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100")
// join after 2 tables both IUD
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t40 t1, t41 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
val plan2 = sql(
"""
|select t1.*, t2.*
|from t40 t1, t41 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
var shuffleExists2 = false
plan2.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists2 = true
}
assert(!shuffleExists2, "shuffle should not exist on bucket tables")
}
test("test create carbon table with buckets like hive sql") {
sql("CREATE TABLE t13 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t13")
val table = CarbonEnv.getCarbonTable(Option("default"), "t13")(sqlContext.sparkSession)
val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
})
assert(dataFiles.length == 4)
checkAnswer(sql("select count(*) from t13"), Row(100))
checkAnswer(sql("select count(*) from t13 where name='aaa99'"), Row(1))
if (table != null && table.getBucketingInfo() != null) {
assert(true)
} else {
assert(false, "Bucketing info does not exist")
}
}
test("test create table with buckets unsafe") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
sql("CREATE TABLE t10 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t10")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t10")
if (table != null && table.getBucketingInfo() != null) {
assert(true)
} else {
assert(false, "Bucketing info does not exist")
}
}
test("test create table with empty bucket number must fail") {
val ex = intercept[MalformedCarbonCommandException] {
sql(
"CREATE TABLE t11 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='', 'BUCKET_COLUMNS'='name')"
)
}
assert(ex.getMessage.contains("INVALID NUMBER OF BUCKETS SPECIFIED"))
}
test("test create table with bucket number having non numeric value must fail") {
val ex = intercept[MalformedCarbonCommandException] {
sql(
"CREATE TABLE t11 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='one', 'BUCKET_COLUMNS'='name')"
)
}
assert(ex.getMessage.contains("INVALID NUMBER OF BUCKETS SPECIFIED"))
}
test("must be unable to create if number of buckets is in negative number") {
try {
sql(
"""
CREATE TABLE t9
(ID Int, date Timestamp, country String,
name String, phonetype String, serialname String, salary Int)
USING carbondata
OPTIONS("bucket_number"="-1", "bucket_columns"="name")
""")
assert(false)
}
catch {
case malformedCarbonCommandException: MalformedCarbonCommandException => assert(true)
}
}
test("must unable to create table if number of buckets is 0") {
try{
sql(
"""
|CREATE TABLE t11
|(ID Int,
| date Timestamp,
| country String,
| name String,
| phonetype String,
| serialname String,
| salary Int)
| STORED AS carbondata
| TBLPROPERTIES('bucket_number'='0', 'bucket_columns'='name')
""".stripMargin
)
assert(false)
}
catch {
case malformedCarbonCommandException: MalformedCarbonCommandException => assert(true)
}
}
test("Bucket table only sort inside buckets, can not set sort scope but can set sort columns.") {
val ex = intercept[ProcessMetaDataException] {
sql("CREATE TABLE t44 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'sort_columns'='name', 'sort_scope'='global_sort')")
}
assert(ex.getMessage.contains("Bucket table only sort inside buckets," +
" can not set sort scope but can set sort columns."))
}
test("test create table with both no bucket join of carbon tables") {
sql("CREATE TABLE t5 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t5")
val plan = sql(
"""
|select t1.*, t2.*
|from t5 t1, t5 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t5 t1, t5 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(shuffleExists, "shuffle should exist on non bucket tables")
}
test("test join of carbon bucket table and non bucket parquet table") {
sql("CREATE TABLE t8 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8")
sql("DROP TABLE IF EXISTS parquet_table")
sql("select * from t8").write
.format("parquet")
.saveAsTable("parquet_table")
val plan = sql(
"""
|select t1.*, t2.*
|from t8 t1, parquet_table t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t8 t1, parquet_table t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(shuffleExists, "shuffle should exist on non bucket tables")
sql("DROP TABLE parquet_table")
}
test("test no shuffle when using bucket tables") {
sql("CREATE TABLE t12 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t12")
sql("DROP TABLE IF EXISTS bucketed_parquet_table")
sql("select * from t12").write
.format("parquet")
.bucketBy(4, "name")
.saveAsTable("bucketed_parquet_table")
checkAnswer(sql("select count(*) from t12"), Row(100))
checkAnswer(sql("select count(*) from bucketed_parquet_table"), Row(100))
val plan = sql(
"""
|select t1.*, t2.*
|from t12 t1, bucketed_parquet_table t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
sql("DROP TABLE bucketed_parquet_table")
}
test("test join of carbon bucket tables") {
sql("CREATE TABLE t6 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6")
sql("CREATE TABLE t6_ (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6_")
val plan = sql(
"""
|select t1.*, t2.*
|from t6 t1, t6_ t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t6 t1, t6_ t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
test("test join of carbon bucket table and parquet bucket table") {
sql("CREATE TABLE t7 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t7")
sql("DROP TABLE IF EXISTS bucketed_parquet_table")
sql("select * from t7").write
.format("parquet")
.bucketBy(9, "name")
.saveAsTable("bucketed_parquet_table")
// carbon join parquet, both bucket tables.
val plan = sql(
"""
|select t1.*, t2.*
|from t7 t1, bucketed_parquet_table t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
// parquet join parquet, both bucket tables.
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t7 t1, bucketed_parquet_table t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
sql("DROP TABLE bucketed_parquet_table")
}
test("test join of carbon bucket tables using hive sql") {
sql("CREATE TABLE t14 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t14")
sql("CREATE TABLE t15 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t15")
val plan = sql(
"""
|select t1.*, t2.*
|from t14 t1, t15 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t14 t1, t15 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
test("test join of diff data types as bucket column for carbon tables") {
sql("CREATE TABLE t16 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='ID')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t16")
sql("CREATE TABLE t17 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t17")
val plan = sql(
"""
|select t1.*, t2.*
|from t16 t1, t17 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t16 t1, t17 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
}
test("timestamp as bucket column, test join of carbon bucket tables") {
sql("CREATE TABLE t18 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t18")
sql("CREATE TABLE t19 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t19")
val plan = sql(
"""
|select t1.*, t2.*
|from t18 t1, t19 t2
|where t1.date = t2.date
""".stripMargin).queryExecution.executedPlan
// here the time column in source.csv has some duplicate values
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t18 t1, t19 t2
|where t1.date = t2.date) temp
""".stripMargin), Row(120))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
test("timestamp as bucket column, test join of carbon bucket table and parquet table") {
sql("CREATE TABLE t20 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='date')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t20")
// parquet 1
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20")
sql("select * from t20").write
.format("parquet")
.bucketBy(9, "date")
.saveAsTable("bucketed_parquet_table_t20")
// parquet 2
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_")
sql("select * from t20").write
.format("parquet")
.bucketBy(9, "date")
.saveAsTable("bucketed_parquet_table_t20_")
// parquet join with parquet
val plan2 = sql(
"""
|select t1.*, t2.*
|from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2
|where t1.date = t2.date
""".stripMargin).queryExecution.executedPlan
var shuffleExists2 = false
plan2.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists2 = true
}
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2
|where t1.date = t2.date) temp
""".stripMargin), Row(120))
// carbon join with parquet
val plan = sql(
"""
|select t1.*, t2.*
|from t20 t1, bucketed_parquet_table_t20 t2
|where t1.date = t2.date
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t20 t1, bucketed_parquet_table_t20 t2
|where t1.date = t2.date) temp
""".stripMargin), Row(120))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(shuffleExists == shuffleExists2, "for no string bucket column, shuffle should " +
"keep the same behavior as parquet")
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20")
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_")
}
test("long as bucket column, test join of carbon bucket table and parquet table") {
sql("CREATE TABLE t21 (ID long, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t21")
// parquet 1
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21")
sql("select * from t21").write
.format("parquet")
.bucketBy(9, "ID")
.saveAsTable("bucketed_parquet_table_t21")
// carbon join with parquet
val plan = sql(
"""
|select t1.*, t2.*
|from t21 t1, bucketed_parquet_table_t21 t2
|where t1.ID = t2.ID
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist in bucket table join")
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t21 t1, bucketed_parquet_table_t21 t2
|where t1.ID = t2.ID) temp
""".stripMargin), Row(100))
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21")
}
test("int as bucket column, test join of carbon bucket table and parquet table") {
sql("CREATE TABLE t22 (ID int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t22")
// parquet 1
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22")
sql("select * from t22").write
.format("parquet")
.bucketBy(9, "ID")
.saveAsTable("bucketed_parquet_table_t22")
// carbon join with parquet
val plan = sql(
"""
|select t1.*, t2.*
|from t22 t1, bucketed_parquet_table_t22 t2
|where t1.ID = t2.ID
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist in bucket table join")
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t22 t1, bucketed_parquet_table_t22 t2
|where t1.ID = t2.ID) temp
""".stripMargin), Row(100))
sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22")
}
test("test bucket hash method config") {
sql("CREATE TABLE t23 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'bucket_hash_method'='NATIVE' )")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t23")
sql("CREATE TABLE t24 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'bucket_hash_method'='NATIVE')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t24")
val plan = sql(
"""
|select t1.*, t2.*
|from t23 t1, t24 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t23 t1, t24 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
test("only shuffle 1 side whose bucket num larger when join of carbon bucket" +
" tables with diff bucket num") {
sql("CREATE TABLE t25 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='3', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t25")
sql("CREATE TABLE t26 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='7', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t26")
val plan = sql(
"""
|select t1.*, t2.*
|from t25 t1, t26 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t25 t1, t26 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleLeftExists = false
var shuffleRightExists = false
plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].left.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleLeftExists = true
}
plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].right.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleRightExists = true
}
assert(shuffleLeftExists && !shuffleRightExists, "only shuffle 1 side whose bucket num larger")
}
test("test compaction of bucket tables") {
sql("CREATE TABLE t27 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
sql(s"alter table t27 compact 'minor'")
val table = CarbonEnv.getCarbonTable(Option("default"), "t27")(sqlContext.sparkSession)
// data should store into diff files bases on bucket id in compaction
val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0.1")
val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
})
assert(dataFiles.length == 10)
sql("CREATE TABLE t28 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t28")
val plan = sql(
"""
|select t1.*, t2.*
|from t27 t1, t28 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t27 t1, t28 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(500))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
test("test alter column of bucket table") {
sql("CREATE TABLE t42 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t42")
sql("CREATE TABLE t43 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t43")
// bucket columns not allowed to change include rename change data type and drop.
val ex = intercept[MalformedCarbonCommandException] {
sql(s"alter table t42 change name name222 string")
}
assert(ex.getMessage.contains("Column Rename Operation failed." +
" Renaming the bucket column name is not allowed"))
val ex2 = intercept[ProcessMetaDataException] {
sql(s"alter table t42 drop columns(name)")
}
assert(ex2.getMessage.contains("Bucket columns cannot be dropped: List(name)"))
// alter table column
sql(s"alter table t42 change salary slong long")
checkAnswer(sql(
"""select count(*) from t42 where slong=15000
""".stripMargin), Row(1))
// join after alter table
val plan = sql(
"""
|select t1.*, t2.*
|from t42 t1, t43 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t42 t1, t43 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
// test desc formatted
val descPar = sql("desc formatted t42").collect
descPar.find(_.get(0).toString.contains("Bucket Columns")) match {
case Some(row) => assert(row.get(1).toString.contains("name"))
case None => fail("Bucket Columns: not found in describe formatted")
}
descPar.find(_.get(0).toString.contains("Number of Buckets")) match {
case Some(row) => assert(row.get(1).toString.contains("10"))
case None => fail("Number of Buckets: not found in describe formatted")
}
}
test("test insert into bucket table old insert flow") {
sql("CREATE TABLE t45 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t45")
sql("CREATE TABLE t46 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
// use old flow
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
sql(s"INSERT INTO t46 SELECT * FROM t45")
val table = CarbonEnv.getCarbonTable(Option("default"), "t46")(sqlContext.sparkSession)
val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
})
assert(dataFiles.length == 4)
checkAnswer(sql(
"""select count(*) from t46
""".stripMargin), Row(100))
sql(
"""select * from t46
""".stripMargin).show(100, false)
val plan = sql(
"""
|select t1.*, t2.*
|from t45 t1, t46 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
checkAnswer(sql(
"""select count(*) from t46 where name='aaa1'
""".stripMargin), Row(1))
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t45 t1, t46 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
}
test("test insert into bucket table new insert flow") {
sql("CREATE TABLE t47 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t47")
sql("CREATE TABLE t48 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
// use new flow
sql(s"INSERT INTO t48 SELECT * FROM t47")
val table = CarbonEnv.getCarbonTable(Option("default"), "t48")(sqlContext.sparkSession)
val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
})
assert(dataFiles.length == 4)
checkAnswer(sql(
"""select count(*) from t48
""".stripMargin), Row(100))
sql(
"""select * from t48
""".stripMargin).show(100, false)
val plan = sql(
"""
|select t1.*, t2.*
|from t47 t1, t48 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
checkAnswer(sql(
"""select count(*) from t48 where name='aaa1'
""".stripMargin), Row(1))
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t47 t1, t48 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
}
test("test multi bucket columns") {
sql("CREATE TABLE t49 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t49")
sql("CREATE TABLE t50 (ID Int, date Timestamp, country String, name String, phonetype String," +
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
"('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')")
sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t50")
val plan = sql(
"""
|select t1.*, t2.*
|from t49 t1, t50 t2
|where t1.name = t2.name and t1.date = t2.date
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t49 t1, t50 t2
|where t1.name = t2.name and t1.date = t2.date) temp
""".stripMargin), Row(100))
var shuffleExists = false
plan.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist when all bucket columns" +
"in query filter")
val plan2 = sql(
"""
|select t1.*, t2.*
|from t49 t1, t50 t2
|where t1.name = t2.name
""".stripMargin).queryExecution.executedPlan
checkAnswer(sql(
"""select count(*) from
|(select t1.*, t2.*
|from t49 t1, t50 t2
|where t1.name = t2.name) temp
""".stripMargin), Row(100))
var shuffleExists2 = false
plan2.collect {
case s: Exchange if (s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
s.getClass.getName.equals
("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
=> shuffleExists2 = true
}
assert(shuffleExists2, "shuffle should exist when some bucket columns not exist in filter")
}
override def afterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
sql("DROP TABLE IF EXISTS t4")
sql("DROP TABLE IF EXISTS t5")
sql("DROP TABLE IF EXISTS t6")
sql("DROP TABLE IF EXISTS t6_")
sql("DROP TABLE IF EXISTS t7")
sql("DROP TABLE IF EXISTS t8")
sql("DROP TABLE IF EXISTS t9")
sql("DROP TABLE IF EXISTS t10")
sql("DROP TABLE IF EXISTS t11")
sql("DROP TABLE IF EXISTS t12")
sql("DROP TABLE IF EXISTS t13")
sql("DROP TABLE IF EXISTS t14")
sql("DROP TABLE IF EXISTS t15")
sql("DROP TABLE IF EXISTS t16")
sql("DROP TABLE IF EXISTS t17")
sql("DROP TABLE IF EXISTS t18")
sql("DROP TABLE IF EXISTS t19")
sql("DROP TABLE IF EXISTS t20")
sql("DROP TABLE IF EXISTS t21")
sql("DROP TABLE IF EXISTS t22")
sql("DROP TABLE IF EXISTS t23")
sql("DROP TABLE IF EXISTS t24")
sql("DROP TABLE IF EXISTS t25")
sql("DROP TABLE IF EXISTS t26")
sql("DROP TABLE IF EXISTS t27")
sql("DROP TABLE IF EXISTS t28")
sql("DROP TABLE IF EXISTS t40")
sql("DROP TABLE IF EXISTS t41")
sql("DROP TABLE IF EXISTS t42")
sql("DROP TABLE IF EXISTS t43")
sql("DROP TABLE IF EXISTS t44")
sql("DROP TABLE IF EXISTS t45")
sql("DROP TABLE IF EXISTS t46")
sql("DROP TABLE IF EXISTS t47")
sql("DROP TABLE IF EXISTS t48")
sql("DROP TABLE IF EXISTS t49")
sql("DROP TABLE IF EXISTS t50")
sql("DROP TABLE IF EXISTS bucketed_parquet_table")
sql("DROP TABLE IF EXISTS parquet_table")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString)
}
}