// blob: 9b106ba8af27487da7f717fdde3ab11c080be390 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.carbondata.bucketing
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
/**
 * Tests for carbon tables created with the `BUCKETNUMBER` / `BUCKETCOLUMNS` table properties.
 *
 * The suite checks three things:
 *  - bucketing info is present on a table created with bucket properties,
 *  - creating a table with a negative or zero bucket count is rejected with
 *    [[MalformedCarbonCommandException]],
 *  - the executed plan of an equi-join on the bucket column contains a shuffle exchange
 *    only when at least one side of the join is not bucketed.
 */
class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {

  /** Value of the broadcast-join threshold captured in `beforeAll` and restored in `afterAll`. */
  var threshold: Int = _

  private val broadcastThresholdKey = "spark.sql.autoBroadcastJoinThreshold"

  /** Every carbon table name this suite may create; all are dropped before and after the run. */
  private val carbonTables = Seq("t4", "t5", "t6", "t7", "t8", "t9", "t10", "t11")

  /** Column list shared by every table created through `createAndLoad`. */
  private val columnDefinitions =
    "(ID Int, date Timestamp, country String, name String, phonetype String," +
    "serialname String, salary Int)"

  /**
   * Class names treated as a shuffle exchange. They are compared as strings, as the original
   * checks did -- presumably because the class is named differently across Spark versions
   * (TODO confirm).
   */
  private val shuffleExchangeClassNames = Set(
    "org.apache.spark.sql.execution.exchange.ShuffleExchange",
    "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")

  private def dropIfExists(table: String): Unit = {
    sql(s"DROP TABLE IF EXISTS $table")
  }

  /**
   * Creates carbon table `table` and loads `source.csv` from the test resources into it.
   *
   * @param table    name of the table to create
   * @param bucketed when true the table is created with 4 buckets on column `name`
   */
  private def createAndLoad(table: String, bucketed: Boolean): Unit = {
    val bucketProperties =
      if (bucketed) " TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')" else ""
    sql(s"CREATE TABLE $table $columnDefinitions STORED AS carbondata$bucketProperties")
    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE $table")
  }

  /** Fails the running test unless `table` is non-null and carries bucketing info. */
  private def assertBucketingInfoExists(table: CarbonTable): Unit = {
    assert(table != null && table.getBucketingInfo() != null, "Bucketing info does not exist")
  }

  /**
   * Plans an equi-join of `left` and `right` on column `name` and reports whether the
   * executed plan contains a shuffle exchange.
   *
   * @param left  table name used as the `t1` side of the join
   * @param right table name used as the `t2` side of the join
   * @return true if at least one node of the executed plan is a shuffle exchange
   */
  private def joinOnNameHasShuffle(left: String, right: String): Boolean = {
    val plan = sql(
      s"""
         |select t1.*, t2.*
         |from $left t1, $right t2
         |where t1.name = t2.name
      """.stripMargin).queryExecution.executedPlan
    plan.collect {
      case exchange: Exchange if shuffleExchangeClassNames.contains(exchange.getClass.getName) =>
        exchange
    }.nonEmpty
  }

  override def beforeAll(): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    // Remember the current threshold, then set it to -1 (which, per Spark's configuration
    // docs, disables automatic broadcast joins) for the join tests below.
    threshold = sqlContext.getConf(broadcastThresholdKey).toInt
    sqlContext.setConf(broadcastThresholdKey, "-1")
    carbonTables.foreach(dropIfExists)
  }

  test("test create table with buckets") {
    createAndLoad("t4", bucketed = true)
    val table = CarbonEnv.getCarbonTable(Option("default"), "t4")(sqlContext.sparkSession)
    assertBucketingInfoExists(table)
  }

  test("test create table with buckets unsafe") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
    try {
      createAndLoad("t10", bucketed = true)
    } finally {
      // Reset even when the create/load fails so the setting does not leak into other tests.
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
    }
    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t10")
    assertBucketingInfoExists(table)
  }

  test("must be unable to create if number of buckets is in negative number") {
    intercept[MalformedCarbonCommandException] {
      sql(
        """
           CREATE TABLE t9
           (ID Int, date Timestamp, country String,
           name String, phonetype String, serialname String, salary Int)
           USING carbondata
           OPTIONS("bucketnumber"="-1", "bucketcolumns"="name")
        """)
    }
  }

  test("must unable to create table if number of buckets is 0") {
    intercept[MalformedCarbonCommandException] {
      sql(
        """
          |CREATE TABLE t11
          |(ID Int,
          | date Timestamp,
          | country String,
          | name String,
          | phonetype String,
          | serialname String,
          | salary Int)
          | STORED AS carbondata
          | TBLPROPERTIES('bucketnumber'='0', 'bucketcolumns'='name')
        """.stripMargin
      )
    }
  }

  test("test create table with no bucket join of carbon tables") {
    createAndLoad("t5", bucketed = false)
    assert(joinOnNameHasShuffle("t5", "t5"), "shuffle should exist on non bucket tables")
  }

  test("test create table with bucket join of carbon tables") {
    createAndLoad("t6", bucketed = true)
    assert(!joinOnNameHasShuffle("t6", "t6"), "shuffle should not exist on bucket tables")
  }

  test("test create table with bucket join of carbon table and parquet table") {
    createAndLoad("t7", bucketed = true)
    dropIfExists("bucketed_parquet_table")
    // Copy t7 into a parquet table bucketed the same way (4 buckets on `name`).
    sql("select * from t7").write
      .format("parquet")
      .bucketBy(4, "name")
      .saveAsTable("bucketed_parquet_table")
    assert(!joinOnNameHasShuffle("t7", "bucketed_parquet_table"),
      "shuffle should not exist on bucket tables")
    sql("DROP TABLE bucketed_parquet_table")
  }

  test("test create table with bucket join of carbon table and non bucket parquet table") {
    createAndLoad("t8", bucketed = true)
    dropIfExists("parquet_table")
    // Copy t8 into a parquet table that has no bucketing.
    sql("select * from t8").write
      .format("parquet")
      .saveAsTable("parquet_table")
    assert(joinOnNameHasShuffle("t8", "parquet_table"),
      "shuffle should exist on non bucket tables")
    sql("DROP TABLE parquet_table")
  }

  // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery
  ignore("test scalar subquery with equal") {
    sql(
      """select sum(salary) from t4 t1
        |where ID = (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
      .count()
  }

  // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery
  ignore("test scalar subquery with lessthan") {
    sql(
      """select sum(salary) from t4 t1
        |where ID < (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
      .count()
  }

  override def afterAll(): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    try {
      // Includes t11, which beforeAll drops and the zero-bucket test would create if the
      // expected rejection ever stopped happening.
      carbonTables.foreach(dropIfExists)
      dropIfExists("bucketed_parquet_table")
      dropIfExists("parquet_table")
    } finally {
      // Restore the threshold captured in beforeAll even if a drop fails.
      sqlContext.setConf(broadcastThresholdKey, threshold.toString)
    }
  }
}