blob: 7bf0c35cad79a33e0f6835765f4789cca2b1ae7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.preaggregate
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.Matchers._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
/**
 * Checks how compaction on `maintable` (or directly on its pre-aggregate child tables)
 * is reflected in the segment lists and contents of the `preagg_sum` / `preagg_avg`
 * datamap tables.
 *
 * Every test starts from a freshly created `compaction` database (see `beforeEach`)
 * holding an empty `maintable`, its two pre-aggregate datamaps and a plain `testtable`
 * that is used to compute the reference aggregates.
 */
class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {

  val testData = s"$resourcesPath/sample.csv"

  // Segment ids the tests expect after one compaction over four loads.
  private val firstLevelSegments = Array("3", "2", "1", "0.1", "0")

  // Sorted segment ids the tests expect after a second round of four loads + minor compaction.
  private val secondLevelSegmentsSorted =
    Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7")

  // Segment ids (in `show segments` order) expected after the third round.
  private val thirdLevelSegments = Array(
    "11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0")

  /** Loads the sample CSV into `maintable` `times` times, one load statement per iteration. */
  private def loadMainTable(times: Int = 4): Unit = {
    (1 to times).foreach { _ =>
      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    }
  }

  /** Returns the first column of `show segments` for `table`, in the order it is reported. */
  private def segmentIds(table: String): Array[String] = {
    sql(s"show segments for table $table").collect().map(_.get(0).toString)
  }

  /** Reference rows for the sum datamap, computed from `testtable`. */
  private def expectedSumRows() = {
    sql("select id, sum(age) from testtable group by id").collect()
  }

  /** Reference rows for the avg datamap (sum and count per id), computed from `testtable`. */
  private def expectedAvgRows() = {
    sql("select id, sum(age), count(age) from testtable group by id").collect()
  }

  /**
   * Runs `body` with auto load merge switched on and always switches it back off,
   * so a failing assertion cannot leak the setting into later tests.
   */
  private def withAutoLoadMerge(body: => Unit): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    try {
      body
    } finally {
      properties.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    }
  }

  /** Recreates the `compaction` database with the tables and datamaps used by every test. */
  override def beforeEach(): Unit = {
    sql("drop database if exists compaction cascade")
    sql("create database if not exists compaction")
    sql("use compaction")
    sql("create table testtable (id int, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
    sql(
      """
        | CREATE TABLE maintable(id int, name string, city string, age int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(
      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
        .stripMargin)
    sql(
      s"""create datamap preagg_avg on table maintable using 'preaggregate' as select id,avg(age) from maintable group by id"""
        .stripMargin)
  }

  test("test if pre-agg table is compacted with parent table minor compaction") {
    loadMainTable()
    sql("insert into testtable select * from maintable")
    val sumResult = expectedSumRows()
    val avgResult = expectedAvgRows()
    sql("alter table maintable compact 'minor'")
    segmentIds("maintable_preagg_sum") should equal (firstLevelSegments)
    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
    segmentIds("maintable_preagg_avg") should equal (firstLevelSegments)
    checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
  }

  test("test if pre-agg table is compacted with parent table major compaction") {
    loadMainTable()
    sql("alter table maintable compact 'major'")
    sql("insert into testtable select * from maintable")
    val sumResult = expectedSumRows()
    val avgResult = expectedAvgRows()
    // NOTE(review): a 'minor' compaction is issued here after the 'major' one above;
    // kept as-is, but confirm whether this second statement is intentional.
    sql("alter table maintable compact 'minor'")
    segmentIds("maintable_preagg_sum") should equal (firstLevelSegments)
    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
    segmentIds("maintable_preagg_avg") should equal (firstLevelSegments)
    checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
  }

  test("test if 2nd level minor compaction is successful for pre-agg table") {
    // Round 1: four loads, one minor compaction on the parent table.
    loadMainTable()
    sql("alter table maintable compact 'minor'")
    segmentIds("maintable_preagg_sum") should equal (firstLevelSegments)
    sql("insert into testtable select * from maintable")
    val sumAfterFirst = expectedSumRows()
    val avgAfterFirst = expectedAvgRows()
    checkAnswer(sql("select * from maintable_preagg_sum"), sumAfterFirst)
    segmentIds("maintable_preagg_avg") should equal (firstLevelSegments)
    checkAnswer(sql("select * from maintable_preagg_avg"), avgAfterFirst)

    // Round 2: the child tables now hold several segments, so they are rolled up per id
    // before being compared with the reference rows.
    loadMainTable()
    sql("alter table maintable compact 'minor'")
    sql("insert overwrite table testtable select * from maintable")
    val sumAfterSecond = expectedSumRows()
    val avgAfterSecond = expectedAvgRows()
    segmentIds("maintable_preagg_sum").sorted should equal (secondLevelSegmentsSorted)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumAfterSecond)
    segmentIds("maintable_preagg_avg").sorted should equal (secondLevelSegmentsSorted)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgAfterSecond)

    // Round 3: a `0.2` segment is expected to appear alongside the earlier ones.
    loadMainTable()
    sql("alter table maintable compact 'minor'")
    sql("insert overwrite table testtable select * from maintable")
    val sumAfterThird = expectedSumRows()
    val avgAfterThird = expectedAvgRows()
    segmentIds("maintable_preagg_sum") should equal (thirdLevelSegments)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumAfterThird)
    segmentIds("maintable_preagg_avg") should equal (thirdLevelSegments)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgAfterThird)
  }

  test("test direct minor compaction on pre-agg tables") {
    // Round 1: compaction is requested on the child table itself, not on `maintable`.
    loadMainTable()
    sql("alter table maintable_preagg_sum compact 'minor'")
    segmentIds("maintable_preagg_sum") should equal (firstLevelSegments)
    sql("insert into testtable select * from maintable")
    val sumAfterFirst = expectedSumRows()
    checkAnswer(sql("select * from maintable_preagg_sum"), sumAfterFirst)

    // Round 2
    loadMainTable()
    sql("alter table maintable_preagg_sum compact 'minor'")
    sql("insert overwrite table testtable select * from maintable")
    val sumAfterSecond = expectedSumRows()
    segmentIds("maintable_preagg_sum").sorted should equal (secondLevelSegmentsSorted)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumAfterSecond)

    // Round 3
    loadMainTable()
    sql("alter table maintable_preagg_sum compact 'minor'")
    sql("insert overwrite table testtable select * from maintable")
    val sumAfterThird = expectedSumRows()
    segmentIds("maintable_preagg_sum") should equal (thirdLevelSegments)
    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumAfterThird)

    // The parent table must not have gained a `0.1` segment from the child compactions.
    val mainTableSegmentIds =
      sql("SHOW SEGMENTS FOR TABLE maintable").collect().map(_.get(0).toString)
    assert(!mainTableSegmentIds.contains("0.1"))
  }

  test("test if minor/major compaction is successful for pre-agg table") {
    loadMainTable()
    sql("alter table maintable_preagg_sum compact 'minor'")
    segmentIds("maintable_preagg_sum") should equal (firstLevelSegments)

    loadMainTable()
    sql("alter table maintable_preagg_sum compact 'major'")
    segmentIds("maintable_preagg_sum").sorted should equal (
      Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
  }

  test("test auto compaction on aggregate table") {
    withAutoLoadMerge {
      loadMainTable()
      segmentIds("maintable_preagg_sum").sorted should equal (Array("0", "0.1", "1", "2", "3"))
    }
  }

  test("test minor compaction on Pre-agg tables after multiple loads") {
    withAutoLoadMerge {
      loadMainTable(times = 7)
      sql("alter table maintable compact 'minor'")
      // Column 1 of `show segments` is read as the segment status here.
      val segmentStatuses =
        sql("show segments for table maintable").collect().map(_.get(1).toString.toLowerCase)
      assert(segmentStatuses.contains("compacted"))
    }
  }

  /** Switches auto load merge off, drops the test database and returns to `default`. */
  override def afterAll(): Unit = {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    sql("drop database if exists compaction cascade")
    sql("use default")
  }
}