blob: 75d71ecca638bdd8d6d53bb7cc0f57db2fc5f5a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.preaggregate
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.Spark2TestQueryExecutor
import org.apache.spark.util.SparkUtil4Test
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.SparkQueryTest
class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with BeforeAndAfterEach{
/** Location of the CSV fixture (under `resourcesPath`) read by every LOAD DATA statement here. */
val testData: String = s"$resourcesPath/sample.csv"
/**
 * Segment-level compaction threshold read when the suite is instantiated, falling back to
 * DEFAULT_SEGMENT_LEVEL_THRESHOLD; `afterAll()` writes this value back.
 */
val p1: String = {
  val carbonProps = CarbonProperties.getInstance()
  carbonProps.getProperty(
    CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
    CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
}
/**
 * Suite-level setup: sets ENABLE_AUTO_LOAD_MERGE to "false", puts the segment-level
 * compaction threshold at its default, calls `SparkUtil4Test.createTaskMockUp` and drops
 * any leftover `maintable`.
 */
override def beforeAll(): Unit = {
  val autoMergeDisabled = CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  // Invoked on whatever the previous addProperty call returned, i.e. the same call chain.
  autoMergeDisabled.addProperty(
    CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
    CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
  SparkUtil4Test.createTaskMockUp(sqlContext)
  sql("DROP TABLE IF EXISTS maintable")
}
/**
 * Suite-level teardown: puts ENABLE_AUTO_LOAD_MERGE back to its default constant, restores
 * the compaction threshold captured in `p1`, and drops every table the tests create.
 */
override protected def afterAll(): Unit = {
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
    .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, p1)
  sql("DROP TABLE IF EXISTS y ")
  sql("DROP TABLE IF EXISTS maintable")
  sql("DROP TABLE IF EXISTS maintbl")
  sql("DROP TABLE IF EXISTS main_table")
  // segmaintable is created by the segment/auto-merge tests but was never removed at the
  // end of the suite; drop it here so it does not outlive this class.
  sql("DROP TABLE IF EXISTS segmaintable")
}
/** Drops `main_table` and `segmaintable` before every test. */
override protected def beforeEach(): Unit = {
  Seq("main_table", "segmaintable").foreach { staleTable =>
    sql(s"DROP TABLE IF EXISTS $staleTable")
  }
}
/**
 * Creates the five 'preaggregate' datamaps (preagg_sum, preagg_avg, preagg_count,
 * preagg_min, preagg_max) on the given table, each grouping by `id`.
 *
 * @param parentTableName table the datamaps are created on
 * @param columnName      column fed to each aggregate function (defaults to "age")
 */
private def createAllAggregateTables(parentTableName: String, columnName: String = "age"): Unit = {
  // The sum datamap keeps its multi-line DDL form.
  sql(
    s"""
       | create datamap preagg_sum
       | on table $parentTableName
       | using 'preaggregate'
       | as select id,sum($columnName)
       | from $parentTableName
       | group by id
       |""".stripMargin)
  // The remaining four share one single-line template and are created in this order.
  Seq("avg", "count", "min", "max").foreach { aggFn =>
    sql(
      s"create datamap preagg_$aggFn on table $parentTableName using 'preaggregate' " +
        s"as select id,$aggFn($columnName) from $parentTableName group by id")
  }
}
// Loads the CSV fixture once into a plain carbon table and checks the content of each of
// the five pre-aggregate child tables.
test("test load into main table with pre-aggregate table") {
  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  createAllAggregateTables("maintable")
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  // child table -> rows expected after one load; verified in this order
  val expectedByChildTable = Seq(
    "maintable_preagg_sum" -> Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)),
    "maintable_preagg_avg" -> Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)),
    "maintable_preagg_count" -> Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)),
    "maintable_preagg_min" -> Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)),
    "maintable_preagg_max" -> Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
  expectedByChildTable.foreach { case (childTable, expectedRows) =>
    checkAnswer(sql(s"select * from $childTable"), expectedRows)
  }
  sql("drop table if exists maintable")
}
// Same single-load check as the plain variant, but `id` is declared in dictionary_include.
test("test load into main table with pre-aggregate table with dictionary_include") {
  // Compares one child table (maintable_preagg_<aggregate>) against the expected rows.
  def verifyChild(aggregate: String, expectedRows: Seq[Row]) =
    checkAnswer(sql(s"select * from maintable_preagg_$aggregate"), expectedRows)

  sql("drop table if exists maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
      |""".stripMargin)
  createAllAggregateTables("maintable")
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  verifyChild("sum", Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
  verifyChild("avg", Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
  verifyChild("count", Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
  verifyChild("min", Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
  verifyChild("max", Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
  sql("drop table if exists maintable")
}
// Single-load check where the LOAD DATA statement carries the 'single_pass'='true' option.
test("test load into main table with pre-aggregate table with single_pass") {
  val sumRows = Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))
  val avgRows = Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2))
  val countRows = Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2))
  val minRows = Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26))
  val maxRows = Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29))

  sql("drop table if exists maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
      |""".stripMargin)
  createAllAggregateTables("maintable")
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable options('single_pass'='true')")
  checkAnswer(sql("select * from maintable_preagg_sum"), sumRows)
  checkAnswer(sql("select * from maintable_preagg_avg"), avgRows)
  checkAnswer(sql("select * from maintable_preagg_count"), countRows)
  checkAnswer(sql("select * from maintable_preagg_min"), minRows)
  checkAnswer(sql("select * from maintable_preagg_max"), maxRows)
  sql("drop table if exists maintable")
}
// Loads the same fixture twice and expects every child table to hold the per-load rows
// twice (one set per load).
test("test load into main table with incremental load") {
  val loadFixture = s"LOAD DATA LOCAL INPATH '$testData' into table maintable"
  // Rows a single load contributes to each child table.
  val sumPerLoad = Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))
  val avgPerLoad = Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2))
  val countPerLoad = Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2))
  val minPerLoad = Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26))
  val maxPerLoad = Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29))

  sql("drop table if exists maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
      |""".stripMargin)
  createAllAggregateTables("maintable")
  sql(loadFixture)
  sql(loadFixture)
  checkAnswer(sql("select * from maintable_preagg_sum"), sumPerLoad ++ sumPerLoad)
  checkAnswer(sql("select * from maintable_preagg_avg"), avgPerLoad ++ avgPerLoad)
  checkAnswer(sql("select * from maintable_preagg_count"), countPerLoad ++ countPerLoad)
  checkAnswer(sql("select * from maintable_preagg_min"), minPerLoad ++ minPerLoad)
  checkAnswer(sql("select * from maintable_preagg_max"), maxPerLoad ++ maxPerLoad)
}
// Inserting straight into the child table must raise a RuntimeException with a fixed message.
test("test to check if exception is thrown for direct load on pre-aggregate table") {
  sql("drop table if exists maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
      |""".stripMargin)
  sql("create datamap preagg_sum on table maintable using 'preaggregate' " +
    "as select id,sum(age) from maintable group by id")
  val rejected = intercept[RuntimeException] {
    sql("insert into maintable_preagg_sum values(1, 30)")
  }
  val expectedMessage = "Cannot insert/load data directly into pre-aggregate/child table"
  assert(rejected.getMessage.equalsIgnoreCase(expectedMessage))
}
// Creates the datamap while the session restricts maintable to segment 0 and expects the
// child table to aggregate both segments (26 + 26) anyway.
test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") {
  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
  sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
  sql("set carbon.input.segments.default.maintable=0")
  try {
    sql(
      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id"""
        .stripMargin)
    sqlContext.sparkSession.catalog.clearCache()
  } finally {
    // Always undo the session-level segment restriction, even if datamap creation throws,
    // so it cannot leak into the tests that run afterwards.
    sql("reset")
  }
  checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52))
}
// After two plain inserts, an INSERT OVERWRITE on the main table must leave the child
// table holding only the overwriting row's aggregate.
test("test if pre-aagregate is overwritten if main table is inserted with insert overwrite") {
  val plainInsert = "insert into maintable values(1, 'xyz', 'bengaluru', 26)"
  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql("create datamap preagg_sum on table maintable using 'preaggregate' " +
    "as select id, sum(age) from maintable group by id")
  (1 to 2).foreach(_ => sql(plainInsert))
  sql("insert overwrite table maintable values(1, 'xyz', 'delhi', 29)")
  checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 29))
}
// With the bad-records action forced to "FAIL", creating a datamap on an already loaded
// table must complete without throwing (the test has no further assertion).
test("test load in aggregate table with Measure col") {
  val originalBadRecordsAction =
    CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
  try {
    sql("drop table if exists y ")
    sql("create table y(year int,month int,name string,salary int) stored by 'carbondata'")
    sql("insert into y select 10,11,'babu',12")
    sql("create datamap y1_sum1 on table y using 'preaggregate' as select year,name,sum(salary) from y group by year,name")
  } finally {
    // Restore in `finally` so a failing statement cannot leave "FAIL" active for later tests.
    // NOTE(review): getProperty presumably returns null when the key was never set; a null
    // is not written back here rather than being handed to addProperty - confirm.
    Option(originalBadRecordsAction).foreach { previousAction =>
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, previousAction)
    }
  }
}
// Single-load check against a main table partitioned by `name`.
test("test partition load into main table with pre-aggregate table") {
  // (aggregate suffix, expected child-table rows), verified in this order
  val expectations: Seq[(String, Seq[Row])] = Seq(
    ("sum", Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))),
    ("avg", Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2))),
    ("count", Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2))),
    ("min", Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26))),
    ("max", Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29))))

  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, city string, age int) partitioned by(name string)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  createAllAggregateTables("maintable")
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  for ((aggregate, expectedRows) <- expectations) {
    checkAnswer(sql(s"select * from maintable_preagg_$aggregate"), expectedRows)
  }
  sql("drop table if exists maintable")
}
// Creates a datamap grouped by (id, name) while the session restricts maintable to
// segment 0, then checks the child table content after the restriction is reset.
test("test load into preaggregate table having group by clause") {
  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
  sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
  sql("set carbon.input.segments.default.maintable=0")
  try {
    sql(
      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id,name"""
        .stripMargin)
    sqlContext.sparkSession.catalog.clearCache()
  } finally {
    // Undo the segment restriction even when datamap creation fails, so the setting does
    // not bleed into subsequent tests on maintable.
    sql("reset")
  }
  checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52, "xyz"))
}
// Runs an aggregate query over a table with a pre-aggregate datamap; when the Spark
// version string starts with "2.3" the adaptive-execution flag is switched on for the run.
test("test pregarregate with spark adaptive execution ") {
  val onSpark23 = Spark2TestQueryExecutor.spark.version.startsWith("2.3")
  if (onSpark23) {
    // enable adaptive execution
    Spark2TestQueryExecutor.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
  }
  try {
    sql("DROP TABLE IF EXISTS maintable")
    sql(
      """
        | CREATE TABLE maintable(id int, name string, city string, age int)
        | STORED BY 'org.apache.carbondata.format'
        |""".stripMargin)
    sql(
      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id,name"""
        .stripMargin)
    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 20)")
    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 30)")
    checkAnswer(sql("select id, sum(age) from maintable group by id, name"), Row(1, 50))
    sql("drop datamap preagg_sum on table maintable")
    sql("drop table maintable")
  } finally {
    if (onSpark23) {
      // disable adaptive execution; done in `finally` so a failing assertion cannot leave
      // the flag enabled for the rest of the suite
      Spark2TestQueryExecutor.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
    }
  }
}
// avg() over a DOUBLE salary column: two identical rows, datamap created after the loads.
test("check load and select for avg double datatype") {
  val avgByName = "select name,avg(salary) from maintbl group by name"
  sql("drop table if exists maintbl ")
  sql("create table maintbl(year int,month int,name string,salary double) stored by 'carbondata' " +
    "tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
  (1 to 2).foreach(_ => sql("insert into maintbl select 10,11,'babu',12.89"))
  sql(s"create datamap maintbl_double on table maintbl using 'preaggregate' as $avgByName")
  checkAnswer(sql(avgByName), Row("babu", 12.89))
}
// avg() over an INT salary column: two identical rows, datamap created after the loads.
test("check load and select for avg int datatype") {
  val insertRow = "insert into maintbl select 10,11,'babu',12"
  val avgByName = "select name,avg(salary) from maintbl group by name"
  sql("drop table if exists maintbl ")
  sql("create table maintbl(year int,month int,name string,salary int) stored by 'carbondata' " +
    "tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
  sql(insertRow)
  sql(insertRow)
  sql(s"create datamap maintbl_double on table maintbl using 'preaggregate' as $avgByName")
  checkAnswer(sql(avgByName), Row("babu", 12.0))
}
// avg() over a BIGINT salary column: two identical rows, datamap created after the loads.
test("check load and select for avg bigint datatype") {
  val tableProperties =
    "tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')"
  val avgByName = "select name,avg(salary) from maintbl group by name"
  sql("drop table if exists maintbl ")
  sql("create table maintbl(year int,month int,name string,salary bigint) stored by 'carbondata' " +
    tableProperties)
  Seq.fill(2)("insert into maintbl select 10,11,'babu',12").foreach(stmt => sql(stmt))
  sql("create datamap maintbl_double on table maintbl using 'preaggregate' as " + avgByName)
  checkAnswer(sql(avgByName), Row("babu", 12.0))
}
// avg() over a SHORT salary column: two identical rows, datamap created after the loads.
test("check load and select for avg short datatype") {
  val avgByName = "select name,avg(salary) from maintbl group by name"
  val expectedAverage = Row("babu", 12.0)
  sql("drop table if exists maintbl ")
  sql("create table maintbl(year int,month int,name string,salary short) stored by 'carbondata' " +
    "tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
  for (_ <- 1 to 2) {
    sql("insert into maintbl select 10,11,'babu',12")
  }
  sql(s"create datamap maintbl_double on table maintbl using 'preaggregate' as $avgByName")
  checkAnswer(sql(avgByName), expectedAverage)
}
// avg() over a FLOAT salary column: the result collected before the datamap exists is the
// expected answer for the same query once the datamap has been created.
test("check load and select for avg float datatype") {
  val avgByName = "select name,avg(salary) from maintbl group by name"
  sql("drop table if exists maintbl ")
  sql("create table maintbl(year int,month int,name string,salary float) stored by 'carbondata' " +
    "tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
  (1 to 2).foreach(_ => sql("insert into maintbl select 10,11,'babu',12"))
  val beforeDataMap = sql(avgByName).collect()
  sql(s"create datamap maintbl_double on table maintbl using 'preaggregate' as $avgByName")
  checkAnswer(sql(avgByName), beforeDataMap)
}
// Order: load, create datamap, create the same datamap again with IF NOT EXISTS; the
// child table must still show the single-load rows.
test("create datamap with 'if not exists' after load data into mainTable and create datamap") {
  // DDL for preagg_sum; `existenceGuard` is spliced in right before the datamap name.
  def preAggSumDdl(existenceGuard: String): String =
    s"""
       | create datamap ${existenceGuard}preagg_sum
       | on table maintable
       | using 'preaggregate'
       | as select id,sum(age) from maintable
       | group by id
       |""".stripMargin

  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  sql(preAggSumDdl(""))
  sql(preAggSumDdl("if not exists "))
  checkAnswer(sql("select * from maintable_preagg_sum"),
    Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
  sql("drop table if exists maintable")
}
// Order: create datamap, load, create the same datamap again with IF NOT EXISTS; the
// child table must still show the single-load rows.
test("create datamap with 'if not exists' after create datamap and load data into mainTable") {
  // Lines shared by both CREATE DATAMAP statements (everything after the first line).
  val ddlTail =
    """
      | on table maintable
      | using 'preaggregate'
      | as select id,sum(age) from maintable
      | group by id
      |""".stripMargin
  val expectedRows = Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))

  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql("\n create datamap preagg_sum" + ddlTail)
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  sql("\n create datamap if not exists preagg_sum" + ddlTail)
  checkAnswer(sql("select * from maintable_preagg_sum"), expectedRows)
  sql("drop table if exists maintable")
}
// Re-running the identical CREATE DATAMAP (no IF NOT EXISTS) must fail with
// MalformedDataMapCommandException and leave the existing child table untouched.
test("create datamap without 'if not exists' after load data into mainTable and create datamap") {
  val createPreAggSum =
    """
      | create datamap preagg_sum
      | on table maintable
      | using 'preaggregate'
      | as select id,sum(age) from maintable
      | group by id
      |""".stripMargin

  sql("DROP TABLE IF EXISTS maintable")
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string, age int)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  sql(createPreAggSum)
  val duplicateFailure: Exception = intercept[MalformedDataMapCommandException] {
    sql(createPreAggSum)
  }
  assert(duplicateFailure.getMessage.contains("DataMap name 'preagg_sum' already exist"))
  checkAnswer(sql("select * from maintable_preagg_sum"),
    Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
  sql("drop table if exists maintable")
}
// Three loads, then the avg-by-age answer collected before the datamap exists must match
// the answer of the same query after the datamap is created.
test("check load and select for avg int datatype and group by") {
  val avgByAge = "select age,avg(age) from maintable group by age"
  sql("drop table if exists maintable ")
  sql("CREATE TABLE maintable(id int, city string, age int) stored by 'carbondata'")
  (1 to 3).foreach { _ =>
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
  }
  val beforeDataMap = sql(avgByAge).collect()
  sql("create datamap maintbl_double on table maintable using 'preaggregate' " +
    "as select avg(age) from maintable group by age")
  checkAnswer(sql(avgByAge), beforeDataMap)
  sql("drop table if exists maintable ")
}
// `age` is declared STRING here: checks the child-table rows after one load, then which
// pre-aggregate table each aggregate query is (and is not) matched against.
test("test load into main table with pre-aggregate table: string") {
  // The query is expected to be answered from `preAggTable`.
  def hits(query: String, preAggTable: String) =
    checkPreAggTable(sql(query), true, preAggTable)
  // The query is expected NOT to match (`preAggTable`, "main_table").
  def misses(query: String, preAggTable: String) =
    checkPreAggTable(sql(query), false, preAggTable, "main_table")

  sql(
    """
      | CREATE TABLE main_table(
      | id INT,
      | name STRING,
      | city STRING,
      | age STRING)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  createAllAggregateTables("main_table")
  sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")

  checkAnswer(sql("SELECT * FROM main_table_preagg_sum"),
    Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_avg"),
    Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_count"),
    Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_min"),
    Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "26")))
  checkAnswer(sql("SELECT * FROM main_table_preagg_max"),
    Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "29")))

  // queries projecting the group-by column
  hits("SELECT id, SUM(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
  misses("SELECT id, SUM(age) FROM main_table GROUP BY id", "main_table_preagg_avg")
  hits("SELECT id, AVG(age) FROM main_table GROUP BY id", "main_table_preagg_avg")
  misses("SELECT id, AVG(age) from main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, COUNT(age) FROM main_table GROUP BY id", "main_table_preagg_count")
  misses("SELECT id, COUNT(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, MIN(age) FROM main_table GROUP BY id", "main_table_preagg_min")
  misses("SELECT id, MIN(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, MAX(age) FROM main_table GROUP BY id", "main_table_preagg_max")
  misses("SELECT id, MAX(age) FROM main_table GROUP BY id", "main_table_preagg_sum")

  // queries projecting only the aggregate
  hits("SELECT SUM(age) FROM main_table", "main_table_preagg_sum")
  misses("SELECT SUM(age) FROM main_table", "main_table_preagg_avg")
  hits("SELECT AVG(age) FROM main_table GROUP BY id", "main_table_preagg_avg")
  misses("SELECT AVG(age) from main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT COUNT(age) FROM main_table GROUP BY id", "main_table_preagg_count")
  misses("SELECT COUNT(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT MIN(age) FROM main_table GROUP BY id", "main_table_preagg_min")
  misses("SELECT MIN(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT MAX(age) FROM main_table GROUP BY id", "main_table_preagg_max")
  misses("SELECT MAX(age) FROM main_table GROUP BY id", "main_table_preagg_sum")
}
// Aggregates the STRING column `name`: sum/avg child tables are expected to hold nulls,
// min/max hold the names; then checks which pre-aggregate table each query matches.
test("test load into main table with pre-aggregate table: sum string column") {
  // The query is expected to be answered from `preAggTable`.
  def hits(query: String, preAggTable: String) =
    checkPreAggTable(sql(query), true, preAggTable)
  // The query is expected NOT to match (`preAggTable`, "main_table").
  def misses(query: String, preAggTable: String) =
    checkPreAggTable(sql(query), false, preAggTable, "main_table")

  sql(
    """
      | CREATE TABLE main_table(
      | id INT,
      | name STRING,
      | city STRING,
      | age STRING)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  createAllAggregateTables("main_table", "name")
  sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")

  checkAnswer(sql("SELECT * FROM main_table_preagg_sum"),
    Seq(Row(1, null), Row(2, null), Row(3, null), Row(4, null)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_avg"),
    Seq(Row(1, null, 1.0), Row(2, null, 1.0), Row(3, null, 2.0), Row(4, null, 2.0)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_count"),
    Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
  checkAnswer(sql("SELECT * FROM main_table_preagg_min"),
    Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "kunal")))
  checkAnswer(sql("SELECT * FROM main_table_preagg_max"),
    Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "vishal")))

  // queries projecting the group-by column
  hits("SELECT id, SUM(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
  misses("SELECT id, SUM(name) FROM main_table GROUP BY id", "main_table_preagg_avg")
  hits("SELECT id, AVG(name) FROM main_table GROUP BY id", "main_table_preagg_avg")
  misses("SELECT id, AVG(name) from main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, COUNT(name) FROM main_table GROUP BY id", "main_table_preagg_count")
  misses("SELECT id, COUNT(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, MIN(name) FROM main_table GROUP BY id", "main_table_preagg_min")
  misses("SELECT id, MIN(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT id, MAX(name) FROM main_table GROUP BY id", "main_table_preagg_max")
  misses("SELECT id, MAX(name) FROM main_table GROUP BY id", "main_table_preagg_sum")

  // queries projecting only the aggregate
  hits("SELECT SUM(name) FROM main_table", "main_table_preagg_sum")
  misses("SELECT SUM(name) FROM main_table", "main_table_preagg_avg")
  hits("SELECT AVG(name) FROM main_table GROUP BY id", "main_table_preagg_avg")
  misses("SELECT AVG(name) from main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT COUNT(name) FROM main_table GROUP BY id", "main_table_preagg_count")
  misses("SELECT COUNT(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT MIN(name) FROM main_table GROUP BY id", "main_table_preagg_min")
  misses("SELECT MIN(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
  hits("SELECT MAX(name) FROM main_table GROUP BY id", "main_table_preagg_max")
  misses("SELECT MAX(name) FROM main_table GROUP BY id", "main_table_preagg_sum")
}
// One segment, segment restriction active while the datamap is created: the query must
// not be matched to the child table until the restriction is reset.
test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 2") {
  sql("DROP TABLE IF EXISTS segmaintable")
  sql(
    """
      | CREATE TABLE segmaintable(
      | id INT,
      | name STRING,
      | city STRING,
      | age INT)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
  sql("set carbon.input.segments.default.segmaintable=0")
  try {
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 26)))
    sql(
      s"""
         | CREATE DATAMAP preagg_sum
         | ON TABLE segmaintable
         | USING 'preaggregate'
         | AS SELECT id, SUM(age)
         | FROM segmaintable
         | GROUP BY id
         |""".stripMargin)
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      false, "segmaintable_preagg_sum")
    sqlContext.sparkSession.catalog.clearCache()
  } finally {
    // Reset in `finally`: a failed assertion above must not leave the segment restriction
    // active for the tests that follow.
    sql("reset")
  }
  checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26)))
  checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
    true, "segmaintable_preagg_sum")
}
// Segment restriction active while the datamap is created AND while a second row is
// inserted: after reset the child table must hold one row per segment.
test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 3") {
  sql("DROP TABLE IF EXISTS segmaintable")
  sql(
    """
      | CREATE TABLE segmaintable(
      | id INT,
      | name STRING,
      | city STRING,
      | age INT)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
  sql("set carbon.input.segments.default.segmaintable=0")
  try {
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 26)))
    sql(
      s"""
         | CREATE DATAMAP preagg_sum
         | ON TABLE segmaintable
         | USING 'preaggregate'
         | AS SELECT id, SUM(age)
         | FROM segmaintable
         | GROUP BY id
         |""".stripMargin)
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    sqlContext.sparkSession.catalog.clearCache()
  } finally {
    // Reset in `finally` so the segment restriction never outlives this test, even when a
    // statement above throws.
    sql("reset")
  }
  checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26), Row(1, 26)))
  checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
    true, "segmaintable_preagg_sum")
}
// Two segments exist before the restriction is set; the datamap is created and a third row
// inserted while it is active. Checks answers and pre-aggregate matching before and after
// the reset.
test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 4") {
  sql("DROP TABLE IF EXISTS segmaintable")
  sql(
    """
      | CREATE TABLE segmaintable(
      | id INT,
      | name STRING,
      | city STRING,
      | age INT)
      | STORED BY 'org.apache.carbondata.format'
      |""".stripMargin)
  sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
  sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
  // check value before set segments
  checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
    Seq(Row(1, 52)))
  sql("set carbon.input.segments.default.segmaintable=0")
  try {
    // check value after set segments
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 26)))
    sql(
      s"""
         | CREATE DATAMAP preagg_sum
         | ON TABLE segmaintable
         | USING 'preaggregate'
         | AS SELECT id, SUM(age)
         | FROM segmaintable
         | GROUP BY id
         |""".stripMargin)
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 52), Row(1, 26)))
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 26)))
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      false, "segmaintable_preagg_sum")
    sqlContext.sparkSession.catalog.clearCache()
  } finally {
    // reset - in `finally`, so a failing check above cannot leak the segment restriction
    sql("reset")
  }
  checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
    Seq(Row(1, 78)))
  checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
    true, "segmaintable_preagg_sum")
}
// Runs with ENABLE_AUTO_LOAD_MERGE = "true" and a segment restriction on segmaintable:
// checks query answers before/after the fourth insert, then creates the datamap and
// verifies totals and pre-aggregate matching after the restriction is reset.
test("test whether all segments are loaded into pre-aggregate table: auto merge and input segment") {
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
  try {
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    sql("DROP TABLE IF EXISTS segmaintable")
    sql(
      """
        | CREATE TABLE segmaintable(
        | id INT,
        | name STRING,
        | city STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
        |""".stripMargin)
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    sql("set carbon.input.segments.default.segmaintable=0")
    try {
      sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
      sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
      // check value before auto merge
      checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
        Seq(Row(1, 26)))
      sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
      // check value after set segments and auto merge
      checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
        Seq.empty)
      checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
        false, "segmaintable_preagg_sum")
      sql(
        s"""
           | CREATE DATAMAP preagg_sum
           | ON TABLE segmaintable
           | USING 'preaggregate'
           | AS SELECT id, SUM(age)
           | FROM segmaintable
           | GROUP BY id
           |""".stripMargin)
      sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
      sqlContext.sparkSession.catalog.clearCache()
    } finally {
      // reset - guaranteed, so the segment restriction cannot leak on failure
      sql("reset")
    }
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 130)))
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      true, "segmaintable_preagg_sum")
  } finally {
    // Switch auto merge back off even when a check above fails; otherwise every later
    // test would run with it enabled.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  }
}
//TODO: need to check and fix
// Currently ignored. Runs with ENABLE_AUTO_LOAD_MERGE = "true" and no segment restriction:
// the datamap is created after three inserts, and two more inserts follow.
ignore("test whether all segments are loaded into pre-aggregate table: auto merge and no input segment") {
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
  try {
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    sql("DROP TABLE IF EXISTS segmaintable")
    sql(
      """
        | CREATE TABLE segmaintable(
        | id INT,
        | name STRING,
        | city STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
        |""".stripMargin)
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    sql(
      s"""
         | CREATE DATAMAP preagg_sum
         | ON TABLE segmaintable
         | USING 'preaggregate'
         | AS SELECT id, SUM(age)
         | FROM segmaintable
         | GROUP BY id
         |""".stripMargin)
    // check value before auto merge
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 78)))
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      true, "segmaintable_preagg_sum")
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    // check value after auto merge
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      true, "segmaintable_preagg_sum")
    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      Seq(Row(1, 130)))
    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
      true, "segmaintable_preagg_sum")
  } finally {
    // Switch auto merge back off even if a check fails, so re-enabling this test cannot
    // leave the property set for the rest of the suite.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  }
}
test("test whether all segments are loaded into pre-aggregate table: create after auto merge and no input segment") {
  // Scenario: with auto load merge enabled, perform five loads first and only then create
  // the pre-aggregate datamap; the datamap must reflect all previously loaded data as well
  // as a load performed after its creation.
  val insertRow = "INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)"
  val sumQuery = "SELECT id, SUM(age) FROM segmaintable GROUP BY id"
  val preAggTable = "segmaintable_preagg_sum"

  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
  try {
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    sql("DROP TABLE IF EXISTS segmaintable")
    sql(
      """
        | CREATE TABLE segmaintable(
        | id INT,
        | name STRING,
        | city STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    (1 to 5).foreach(_ => sql(insertRow))
    sql(
      """
        | CREATE DATAMAP preagg_sum
        | ON TABLE segmaintable
        | USING 'preaggregate'
        | AS SELECT id, SUM(age)
        | FROM segmaintable
        | GROUP BY id
      """.stripMargin)

    // 5 loads * 26, all performed before the datamap existed
    checkAnswer(sql(sumQuery), Seq(Row(1, 130)))
    checkPreAggTable(sql(sumQuery), true, preAggTable)

    sql(insertRow)
    // 6 loads * 26
    checkAnswer(sql(sumQuery), Seq(Row(1, 156)))
    checkPreAggTable(sql(sumQuery), true, preAggTable)
  } finally {
    // Always switch auto load merge back off, even when an assertion above fails,
    // so the global property does not leak into the tests that run afterwards.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  }
}
// TODO: this test is disabled via ignore(...); check what is wrong with it and fix it so it can be re-enabled
ignore("test whether all segments are loaded into pre-aggregate table: mixed, load, auto merge and input segment") {
  // Scenario: mix INSERT and LOAD DATA on main_table with auto load merge enabled, restrict
  // the query to segment 0 via carbon.input.segments, then reset and keep loading.
  val insertRow = "INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)"
  val loadCsv = s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table"
  val sumQuery = "SELECT id, SUM(age) FROM main_table GROUP BY id"
  val preAggTable = "main_table_preagg_sum"

  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
  try {
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    sql("DROP TABLE IF EXISTS main_table")
    sql(
      """
        | CREATE TABLE main_table(
        | id INT,
        | name STRING,
        | city STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(insertRow)
    sql(insertRow)
    sql(loadCsv)
    createAllAggregateTables("main_table", "age")

    // With the input restricted to segment 0 the query is expected NOT to be served by
    // the pre-aggregate table, and only the first inserted row is visible.
    sql("set carbon.input.segments.default.main_table=0")
    checkPreAggTable(sql(sumQuery), false, preAggTable)
    checkAnswer(sql(sumQuery), Seq(Row(1, 26)))

    // Drop the segment restriction again; the pre-aggregate table should be used once more.
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    checkPreAggTable(sql(sumQuery), true, preAggTable)

    sql(insertRow)
    sql(loadCsv)
    sql(loadCsv)
    checkAnswer(sql(sumQuery),
      Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165)))
    checkPreAggTable(sql(sumQuery), true, preAggTable)
  } finally {
    // Always switch auto load merge back off, even when an assertion above fails,
    // so the global property does not leak into the tests that run afterwards.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  }
}
// TODO: this test is disabled via ignore(...); check what is wrong with it and fix it so it can be re-enabled
ignore("test whether all segments are loaded into pre-aggregate table: auto merge and check pre-aggregate segment") {
  // Scenario: with auto load merge enabled, verify through SHOW SEGMENTS that the
  // pre-aggregate table's segments become "Compacted" as more data is loaded into
  // main_table, while the aggregate results stay correct.
  val insertRow = "INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)"
  val loadCsv = s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table"
  val sumQuery = "SELECT id, SUM(age) FROM main_table GROUP BY id"
  val showPreAggSegments = "show segments for table main_table_preagg_sum"
  val preAggTable = "main_table_preagg_sum"

  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
  try {
    sqlContext.sparkSession.catalog.clearCache()
    sql("reset")
    sql("DROP TABLE IF EXISTS main_table")
    sql(
      """
        | CREATE TABLE main_table(
        | id INT,
        | name STRING,
        | city STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(insertRow)
    sql(
      """
        | CREATE DATAMAP preagg_sum
        | ON TABLE main_table
        | USING 'preaggregate'
        | AS SELECT id, SUM(age)
        | FROM main_table
        | GROUP BY id
      """.stripMargin)
    sql(insertRow)
    sql(loadCsv)
    // No compacted segment is expected on the pre-aggregate table yet.
    checkExistence(sql(showPreAggSegments), false, "Compacted")

    sql(insertRow)
    // check the data whether auto merge
    checkAnswer(sql(sumQuery),
      Seq(Row(1, 109), Row(2, 27), Row(3, 70), Row(4, 55)))
    checkExistence(sql(showPreAggSegments), true, "Compacted")

    sql(loadCsv)
    sql(loadCsv)
    checkAnswer(sql(sumQuery),
      Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165)))
    checkPreAggTable(sql(sumQuery), true, preAggTable)
    checkExistence(sql(showPreAggSegments), true, "Compacted")
  } finally {
    // Always switch auto load merge back off, even when an assertion above fails,
    // so the global property does not leak into the tests that run afterwards.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
  }
}
test("test deferred rebuild is not supported for preagg") {
  // Verifies two rejections for the 'preaggregate' provider:
  //  1. CREATE DATAMAP ... WITH DEFERRED REBUILD fails with MalformedDataMapCommandException;
  //  2. REBUILD DATAMAP on a datamap created without deferred rebuild fails the same way.
  val baseTable = "maintable"
  val preagg = "preaggtable"
  sql(s"DROP TABLE IF EXISTS $baseTable")
  try {
    sql(
      s"""
         | CREATE TABLE $baseTable(id int, name string, city string, age int)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)

    val deferredRebuildException = intercept[MalformedDataMapCommandException] {
      sql(
        s"""
           | CREATE DATAMAP $preagg ON TABLE $baseTable
           | USING 'preaggregate'
           | WITH DEFERRED REBUILD
           | AS select id, sum(age) from $baseTable group by id
         """.stripMargin)
    }
    assert(deferredRebuildException.getMessage.contains(
      s"DEFERRED REBUILD is not supported on this datamap $preagg with provider preaggregate"))

    // The same datamap without DEFERRED REBUILD must be accepted.
    sql(
      s"""
         | CREATE DATAMAP $preagg ON TABLE $baseTable
         | USING 'preaggregate'
         | AS select id, sum(age) from $baseTable group by id
       """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $baseTable")
    checkExistence(sql(s"SHOW DATAMAP ON TABLE $baseTable"), true, preagg, "preaggregate")

    val rebuildException = intercept[MalformedDataMapCommandException] {
      sql(s"REBUILD DATAMAP $preagg ON TABLE $baseTable").show()
    }
    assert(rebuildException.getMessage.contains(
      s"Non-lazy datamap $preagg does not support rebuild"))
  } finally {
    // Clean up the base table even when one of the assertions above fails.
    sql(s"DROP TABLE IF EXISTS $baseTable")
  }
}
}