blob: 9b40dc7a092512e3c74542dfc8a553d2cc464add [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.createTable
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.SparkTestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
/**
* test functionality for create table as select command
*/
class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
private def createTablesAndInsertData {
// create carbon table and insert data
sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED AS carbondata")
sql("insert into carbon_ctas_test select 100,'spark'")
sql("insert into carbon_ctas_test select 200,'hive'")
// create parquet table and insert data
sql("CREATE TABLE parquet_ctas_test(key INT, value STRING) STORED as parquet")
sql("insert into parquet_ctas_test select 100,'spark'")
sql("insert into parquet_ctas_test select 200,'hive'")
// create hive table and insert data
sql("CREATE TABLE orc_ctas_test(key INT, value STRING) STORED as ORC")
sql("insert into orc_ctas_test select 100,'spark'")
sql("insert into orc_ctas_test select 200,'hive'")
}
override def beforeAll {
sql("DROP TABLE IF EXISTS carbon_ctas_test")
sql("DROP TABLE IF EXISTS parquet_ctas_test")
sql("DROP TABLE IF EXISTS orc_ctas_test")
createTablesAndInsertData
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
}
test("test create table as select with select from same carbon table name with if not exists clause") {
sql("drop table if exists ctas_same_table_name")
sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED AS carbondata")
checkExistence(sql("SHOW TABLES"), true, "ctas_same_table_name")
sql(
"""
| CREATE TABLE IF NOT EXISTS ctas_same_table_name
| STORED AS carbondata
| AS SELECT * FROM ctas_same_table_name
""".stripMargin)
val e = intercept[TableAlreadyExistsException] {
sql(
"""
| CREATE TABLE ctas_same_table_name
| STORED AS carbondata
| AS SELECT * FROM ctas_same_table_name
""".stripMargin)
}
assert(e.getMessage().contains("Table or view 'ctas_same_table_name' already exists"))
}
test("test create table as select with select from same table name when table does not exists") {
sql("drop table if exists ctas_same_table_name")
intercept[Exception] {
sql("create table ctas_same_table_name STORED AS carbondata as select * from ctas_same_table_name")
}
}
test("test create table as select with select from another carbon table") {
sql("DROP TABLE IF EXISTS ctas_select_carbon")
sql("create table ctas_select_carbon STORED AS carbondata as select * from carbon_ctas_test")
checkAnswer(sql("select * from ctas_select_carbon"), sql("select * from carbon_ctas_test"))
}
test("test create table as select with select from another parquet table") {
sql("DROP TABLE IF EXISTS ctas_select_parquet")
sql("create table ctas_select_parquet STORED AS carbondata as select * from parquet_ctas_test")
checkAnswer(sql("select * from ctas_select_parquet"), sql("select * from parquet_ctas_test"))
}
test("test create table as select with select from another hive/orc table") {
sql("DROP TABLE IF EXISTS ctas_select_orc")
sql("create table ctas_select_orc STORED AS carbondata as select * from orc_ctas_test")
checkAnswer(sql("select * from ctas_select_orc"), sql("select * from orc_ctas_test"))
}
test("test create table as select with where clause in select from carbon table that returns data") {
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("create table ctas_select_where_carbon STORED AS carbondata as select * from carbon_ctas_test where key=100")
checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=100"))
}
test(
"test create table as select with where clause in select from carbon table that does not return data") {
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("create table ctas_select_where_carbon STORED AS carbondata as select * from carbon_ctas_test where key=300")
checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=300"))
}
test("test create table as select with where clause in select from carbon table and load again") {
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("create table ctas_select_where_carbon STORED AS carbondata as select * from carbon_ctas_test where key=100")
sql("insert into ctas_select_where_carbon select 200,'hive'")
checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test"))
}
test("test create table as select with where clause in select from parquet table") {
sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
sql("create table ctas_select_where_parquet STORED AS carbondata as select * from parquet_ctas_test where key=100")
checkAnswer(sql("select * from ctas_select_where_parquet"), sql("select * from parquet_ctas_test where key=100"))
}
test("test create table as select with where clause in select from hive/orc table") {
sql("DROP TABLE IF EXISTS ctas_select_where_orc")
sql("create table ctas_select_where_orc STORED AS carbondata as select * from orc_ctas_test where key=100")
checkAnswer(sql("select * from ctas_select_where_orc"), sql("select * from orc_ctas_test where key=100"))
}
test("test create table as select with select directly having the data") {
sql("DROP TABLE IF EXISTS ctas_select_direct_data")
sql("create table ctas_select_direct_data STORED AS carbondata as select 300,'carbondata'")
checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300, "carbondata")))
}
test("test create table as select with TBLPROPERTIES") {
sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql(
"create table ctas_tblproperties_testt STORED AS carbondata TBLPROPERTIES" +
"('sort_scope'='global_sort') as select * from carbon_ctas_test")
checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test"))
val carbonTable = CarbonEnv.getInstance(SparkTestQueryExecutor.spark).carbonMetaStore
.lookupRelation(Option("default"), "ctas_tblproperties_testt")(SparkTestQueryExecutor.spark)
.asInstanceOf[CarbonRelation].carbonTable
val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath)
assert(metadataFolderPath.exists())
}
test("test create table as select with column name as tupleid") {
intercept[Exception] {
sql("create table t2 STORED AS carbondata as select count(value) AS tupleid from carbon_ctas_test")
}
}
test("test create table as select with column name as positionid") {
intercept[Exception] {
sql("create table t2 STORED AS carbondata as select count(value) AS positionid from carbon_ctas_test")
}
}
test("test create table as select with column name as positionreference") {
intercept[Exception] {
sql("create table t2 STORED AS carbondata as select count(value) AS positionreference from carbon_ctas_test")
}
}
test("test create table as select with where clause in select from parquet table that does not return data") {
sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
sql(
"""
| CREATE TABLE ctas_select_where_parquet
| STORED AS carbondata
| as select * FROM parquet_ctas_test
| where key=300""".stripMargin)
checkAnswer(sql("SELECT * FROM ctas_select_where_parquet"),
sql("SELECT * FROM parquet_ctas_test where key=300"))
}
test("test create table as select with where clause in select from hive/orc table that does not return data") {
sql("DROP TABLE IF EXISTS ctas_select_where_orc")
sql(
"""
| CREATE TABLE ctas_select_where_orc
| STORED AS carbondata
| AS SELECT * FROM orc_ctas_test
| where key=300""".stripMargin)
checkAnswer(sql("SELECT * FROM ctas_select_where_orc"),
sql("SELECT * FROM orc_ctas_test where key=300"))
}
test("test create table as select with select from same carbon table name with if not exists clause and source table not exists") {
sql("DROP TABLE IF EXISTS ctas_same_table_name")
checkExistence(sql("SHOW TABLES"), false, "ctas_same_table_name")
//TODO: should throw NoSuchTableException
val e = intercept[AnalysisException] {
sql(
"""
| CREATE TABLE IF NOT EXISTS ctas_same_table_name
| STORED AS carbondata
| AS SELECT * FROM ctas_same_table_name
""".stripMargin)
}
assert(e.getMessage().contains("Table or view not found: ctas_same_table_name"))
}
test("add example for documentation") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS parquet
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,avg(age) FROM source_table group by city
""".stripMargin)
// results:
// sql("SELECT * FROM target_table").show
// +--------+--------+
// | city|avg(age)|
// +--------+--------+
// |shenzhen| 29.0|
// +--------+--------+
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 29)))
}
test("test create table as select with sum,count,min,max") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
""".stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,sum(age),count(age),min(age),max(age)
| FROM source_table group by city
""".stripMargin)
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 58, 2, 27, 31)))
}
test("test create table as select with insert data into source_table after CTAS") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,sum(age),count(age),min(age),max(age)
| FROM source_table group by city
""".stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 58, 2, 27, 31)))
}
test("test create table as select with auto merge") {
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,avg(age)
| FROM source_table group by city
""".stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
checkExistence(sql("SHOW SEGMENTS FOR TABLE source_table"), true, "Compacted")
checkExistence(sql("SHOW SEGMENTS FOR TABLE target_table"), false, "Compacted")
sql("INSERT INTO target_table SELECT 'shenzhen',8")
sql("INSERT INTO target_table SELECT 'shenzhen',9")
sql("INSERT INTO target_table SELECT 'shenzhen',3")
checkExistence(sql("SHOW SEGMENTS FOR TABLE target_table"), true, "Compacted")
checkAnswer(sql("SELECT * FROM target_table"),
Seq(Row("shenzhen", 29), Row("shenzhen", 8), Row("shenzhen", 9), Row("shenzhen", 3)))
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
test("test create table as select with filter, <, and, >=") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql("INSERT INTO source_table SELECT 3,'jack','shenzhen',5")
sql("INSERT INTO source_table SELECT 4,'alice','shenzhen',35")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,avg(age)
| FROM source_table where age > 20 and age <= 31 GROUP BY city
""".stripMargin)
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 29)))
}
test("test create table as select with filter, >=, or, =") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
sql("INSERT INTO source_table SELECT 3,'jack','shenzhen',5")
sql("INSERT INTO source_table SELECT 4,'alice','shenzhen',35")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT city,avg(age)
| FROM source_table where age >= 20 or age = 5 group by city
""".stripMargin)
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 24.5)))
}
test("test duplicate columns with select query") {
sql("DROP TABLE IF EXISTS target_table")
sql("DROP TABLE IF EXISTS source_table")
// create carbon table and insert data
sql(
"""
| CREATE TABLE source_table(
| id INT,
| name STRING,
| city STRING,
| age INT)
| STORED AS carbondata
| """.stripMargin)
sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
val e = intercept[AnalysisException] {
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT t1.city, t2.city
| FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
""".stripMargin)
}
e.getMessage().toString.contains("Duplicated column names found in table definition of " +
"`target_table`: [\"city\"]")
sql(
"""
| CREATE TABLE target_table
| STORED AS carbondata
| AS
| SELECT t1.city as a, t2.city as b
| FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
""".stripMargin)
checkAnswer(sql("select * from target_table"), Seq(Row("shenzhen", "shenzhen")))
}
override def afterAll {
sql("DROP TABLE IF EXISTS carbon_ctas_test")
sql("DROP TABLE IF EXISTS parquet_ctas_test")
sql("DROP TABLE IF EXISTS orc_ctas_test")
sql("DROP TABLE IF EXISTS ctas_same_table_name")
sql("DROP TABLE IF EXISTS ctas_select_carbon")
sql("DROP TABLE IF EXISTS ctas_select_direct_data")
sql("DROP TABLE IF EXISTS ctas_select_parquet")
sql("DROP TABLE IF EXISTS ctas_select_orc")
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
sql("DROP TABLE IF EXISTS ctas_select_where_orc")
sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql("DROP TABLE IF EXISTS ctas_if_table_name")
sql("DROP TABLE IF EXISTS source_table")
sql("DROP TABLE IF EXISTS target_table")
}
}