blob: 4e84caac5c10c5380038f2fa674b79ef955b74a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.datacompaction
import java.io.{File, PrintWriter}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
/**
 * Tests that compaction options (major compaction size, auto load merge,
 * segment-level thresholds, preserved segments, allowed compaction days)
 * can be configured at table level via TBLPROPERTIES, and that table-level
 * settings take precedence over the system-level CarbonProperties defaults.
 */
class TableLevelCompactionOptionTest extends QueryTest
  with BeforeAndAfterEach with BeforeAndAfterAll {

  // CSV generated on the fly, sized to exercise size-based major compaction.
  val tempFilePath: String = s"$resourcesPath/temp/tableLevelCompactionParaTest.csv"
  // Small pre-existing CSV used for count-based (minor) compaction tests.
  val sampleFilePath: String = resourcesPath + "/sample.csv"

  override def beforeEach(): Unit = {
    cleanTables()
  }

  override def afterEach(): Unit = {
    resetConf()
    cleanTables()
  }

  /**
   * Restores every compaction-related system property these tests modify back
   * to its default, so configuration cannot leak from one test into the next.
   */
  private def resetConf(): Unit = {
    val defaults = Seq(
      CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE ->
        CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE,
      CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE ->
        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE,
      CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD ->
        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD,
      CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER ->
        CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER,
      CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT ->
        CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
    defaults.foreach { case (key, value) =>
      CarbonProperties.getInstance().addProperty(key, value)
    }
  }

  /**
   * Drops every table these tests create and removes the generated CSV.
   * Dropping tablecompaction_table here (not just carbon_table) prevents
   * state from leaking between tests and into other suites.
   */
  private def cleanTables(): Unit = {
    deleteTempFile()
    sql("DROP TABLE IF EXISTS carbon_table")
    sql("DROP TABLE IF EXISTS tablecompaction_table")
  }

  /** Writes a CSV larger than 1M but smaller than 5M to [[tempFilePath]]. */
  private def generateTempFile(): Unit = {
    val writer = new PrintWriter(new File(tempFilePath))
    try {
      writer.println("id,name,city,age")
      val lines =
        s"""|1,david,shenzhen,31
            |2,eason,shenzhen,27
            |3,jarry,wuhan,35
            |3,jarry,Bangalore,35
            |4,kunal,Delhi,26
            |4,vishal,Bangalore,29""".stripMargin
      // Repeat the block enough times to reach the target file size.
      for (_ <- 0 until 250000) {
        writer.println(lines)
      }
      writer.flush()
    } finally {
      // writer is always non-null here; just make sure it is closed.
      writer.close()
    }
  }

  /** Deletes the generated CSV if a previous run left it behind. */
  private def deleteTempFile(): Unit = {
    val file = new File(tempFilePath)
    if (file.exists()) {
      file.delete()
    }
  }

  /** Returns the segment sequence ids reported by SHOW SEGMENTS for the table. */
  private def segmentIds(tableName: String): Seq[Any] = {
    sql(s"SHOW SEGMENTS FOR TABLE $tableName").collect().map(_.toSeq.head).toSeq
  }

  /**
   * Enables auto load merge at system level with a "4,2" minor-compaction
   * threshold, no preserved segments and no day restriction — the shared
   * setup for the ENABLE_AUTO_LOAD_MERGE tests.
   */
  private def enableSystemLevelAutoMerge(): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    properties.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
    properties.addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0")
    properties.addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0")
  }

  test("MAJOR_COMPACTION_SIZE, use system level configuration") {
    // System-level threshold of 10M: both ~1-5M segments qualify for merging.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10")
    // generate a temp file which is larger than 1M but smaller than 5M
    generateTempFile()
    sql(
      """
        |CREATE TABLE carbon_table
        |(id INT, name STRING, city STRING, age INT)
        |STORED AS carbondata
        |TBLPROPERTIES('SORT_COLUMNS'='city,name')
      """.stripMargin)
    for (_ <- 0 until 2) {
      sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table")
    }
    sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
    sql("CLEAN FILES FOR TABLE carbon_table")
    // Segments 0 and 1 must be merged into compacted segment 0.1.
    val ids = segmentIds("carbon_table")
    assert(!ids.contains("0"))
    assert(!ids.contains("1"))
    assert(ids.contains("0.1"))
  }

  test("MAJOR_COMPACTION_SIZE, use table level configuration") {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10")
    // generate a temp file which is larger than 1M but smaller than 5M
    generateTempFile()
    // Table-level MAJOR_COMPACTION_SIZE of 1M overrides the 10M system value.
    sql(
      """
        |CREATE TABLE carbon_table
        |(id INT, name STRING, city STRING, age INT)
        |STORED AS carbondata
        |TBLPROPERTIES('SORT_COLUMNS'='city,name',
        |'MAJOR_COMPACTION_SIZE'='1', 'LOCAL_DICTIONARY_ENABLE'='false')
      """.stripMargin)
    for (_ <- 0 until 2) {
      sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table")
    }
    // each segment is larger than 1M, so no segments will be compacted
    sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
    sql("CLEAN FILES FOR TABLE carbon_table")
    val ids = segmentIds("carbon_table")
    assert(ids.contains("0"))
    assert(ids.contains("1"))
    assert(!ids.contains("0.1"))
    // Property restoration is handled by afterEach/resetConf.
  }

  test("ENABLE_AUTO_LOAD_MERGE: true, use system level configuration") {
    enableSystemLevelAutoMerge()
    sql(
      """
        | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
        | STORED AS carbondata
        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
      """.stripMargin)
    for (_ <- 0 until 8) {
      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
    }
    sql("CLEAN FILES FOR TABLE carbon_table")
    // With threshold "4,2" all 8 loads collapse into a single level-2 segment 0.2.
    val ids = segmentIds("carbon_table")
    assert(ids.size == 1)
    assert(ids.contains("0.2"))
  }

  test("ENABLE_AUTO_LOAD_MERGE: false, use table level configuration") {
    enableSystemLevelAutoMerge()
    sql(
      """
        | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
        | STORED AS carbondata
        | TBLPROPERTIES('SORT_COLUMNS'='city,name',
        | 'AUTO_LOAD_MERGE'='false')
      """.stripMargin)
    for (_ <- 0 until 8) {
      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
    }
    // table level configuration: 'AUTO_LOAD_MERGE'='false', so no segments will be compacted
    checkExistence(sql("SHOW SEGMENTS FOR TABLE carbon_table"), false, "Compacted")
    val ids = segmentIds("carbon_table")
    assert(ids.size == 8)
    assert(!ids.contains("0.1"))
    assert(!ids.contains("4.1"))
    assert(!ids.contains("0.2"))
  }

  test("ENABLE_AUTO_LOAD_MERGE: true, use table level configuration") {
    enableSystemLevelAutoMerge()
    // Table-level threshold "3,2", 2 preserved segments and a 1-day window
    // override the system-level values set above.
    sql(
      """
        | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT)
        | STORED AS carbondata
        | TBLPROPERTIES('SORT_COLUMNS'='city,name',
        | 'AUTO_LOAD_MERGE'='true',
        | 'COMPACTION_LEVEL_THRESHOLD'='3,2',
        | 'COMPACTION_PRESERVE_SEGMENTS'='2',
        | 'TABLE_ALLOWED_COMPACTION_DAYS'='1')
      """.stripMargin)
    // load 6 segments, the latest 2 segments are preserved
    // only one level-1 minor compaction is triggered which compacts segment 0,1,2 to segment 0.1
    // seg0 \
    // seg1  -- compacted to seg0.1
    // seg2 /
    // seg3
    // seg4 (preserved)
    // seg5 (preserved)
    for (_ <- 0 until 6) {
      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
    }
    sql("CLEAN FILES FOR TABLE carbon_table")
    val idsAfterSixLoads = segmentIds("carbon_table")
    assert(idsAfterSixLoads.contains("0.1"))
    assert(!idsAfterSixLoads.contains("3.1"))
    assert(!idsAfterSixLoads.contains("0.2"))
    // load another two segments, the latest 2 segments are preserved
    // level-2 minor compaction is triggered which compacts segment 0,1,2,3,4,5 -> 0.2
    // seg0 \
    // seg1  -- compacted to seg0.1 \
    // seg2 /                        -- compacted to seg0.2
    // seg3 \                       /
    // seg4  -- compacted to seg3.1
    // seg5 /
    // seg6 (preserved)
    // seg7 (preserved)
    for (_ <- 0 until 2) {
      sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
    }
    sql("CLEAN FILES FOR TABLE carbon_table")
    val idsAfterEightLoads = segmentIds("carbon_table")
    assert(idsAfterEightLoads.contains("0.2"))
    assert(!idsAfterEightLoads.contains("0.1"))
    assert(!idsAfterEightLoads.contains("3.1"))
  }

  test("AUTO MERGE TRUE:Verify 2nd Level compaction equals to 1") {
    sql("DROP TABLE IF EXISTS tablecompaction_table")
    sql(
      """
        |create table tablecompaction_table(
        |name string,age int) STORED AS carbondata
        |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,1')
      """.stripMargin)
    for (_ <- 0 until 4) {
      sql("insert into tablecompaction_table select 'a',12")
    }
    // With level-2 threshold 1, every level-1 merge (0.1, 2.1) stands alone;
    // no CLEAN FILES is run, so originals plus compacted segments are visible.
    val ids = segmentIds("tablecompaction_table")
    assert(ids.size == 6)
    assert(ids.contains("0.1"))
    assert(ids.contains("2.1"))
  }

  test("AUTO MERGE FALSE:Verify 2nd Level compaction equals to 1") {
    sql("DROP TABLE IF EXISTS tablecompaction_table")
    sql(
      """
        |create table tablecompaction_table(
        |name string,age int) STORED AS carbondata
        |tblproperties('COMPACTION_LEVEL_THRESHOLD'='2,1')
      """.stripMargin)
    for (_ <- 0 until 4) {
      sql("insert into tablecompaction_table select 'a',12")
    }
    // Auto merge disabled: compaction only happens on the explicit command.
    sql("alter table tablecompaction_table compact 'minor' ")
    val ids = segmentIds("tablecompaction_table")
    assert(ids.size == 6)
    assert(ids.contains("0.1"))
    assert(ids.contains("2.1"))
  }

  // 2nd Level compaction value = 0 is supported by system level(like 6,0)
  // same need to support for table level also
  test("Verify 2nd Level compaction equals to 0") {
    sql("DROP TABLE IF EXISTS tablecompaction_table")
    sql(
      """
        |create table tablecompaction_table(
        |name string,age int) STORED AS carbondata
        |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,0')
      """.stripMargin)
    for (_ <- 0 until 4) {
      sql("insert into tablecompaction_table select 'a',12")
    }
    // "2,0" behaves like "2,1": level-1 merges happen, level-2 is disabled.
    val ids = segmentIds("tablecompaction_table")
    assert(ids.size == 6)
    assert(ids.contains("0.1"))
    assert(ids.contains("2.1"))
  }

  test("System Level:Verify 2nd Level compaction equals to 1") {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,1")
    sql("DROP TABLE IF EXISTS tablecompaction_table")
    sql(
      """
        |create table tablecompaction_table(
        |name string,age int) STORED AS carbondata
      """.stripMargin)
    for (_ <- 0 until 4) {
      sql("insert into tablecompaction_table select 'a',12")
    }
    sql("alter table tablecompaction_table compact 'minor' ")
    val ids = segmentIds("tablecompaction_table")
    assert(ids.size == 6)
    assert(ids.contains("0.1"))
    assert(ids.contains("2.1"))
  }
}