/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the"License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.datacompaction

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath

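/**
 * Tests how compaction-related properties (ENABLE_AUTO_LOAD_MERGE,
 * PRESERVE_LATEST_SEGMENTS_NUMBER, DAYS_ALLOWED_TO_COMPACT and compaction prefetch)
 * affect MINOR and MAJOR compaction on a GLOBAL_SORT table. Results are cross-checked
 * against an uncompacted local-sort table loaded with the same data.
 */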
class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
val filePath: String = s"$resourcesPath/globalsort"
val file1: String = resourcesPath + "/globalsort/sample1.csv"
val file2: String = resourcesPath + "/globalsort/sample2.csv"
val file3: String = resourcesPath + "/globalsort/sample3.csv"
override def beforeEach {
resetConf()
sql("DROP TABLE IF EXISTS compaction_globalsort")
sql(
"""
| CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
| STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')
""".stripMargin)
sql("DROP TABLE IF EXISTS carbon_localsort")
sql(
"""
| CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
}
override def afterEach {
sql("DROP TABLE IF EXISTS compaction_globalsort")
sql("DROP TABLE IF EXISTS carbon_localsort")
resetConf()
}
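// With auto load merge disabled, no compaction happens during loading. After segments
// 1-3 are deleted, only 3 valid segments remain, which is below the default minor
// compaction threshold (4), so the explicit minor compaction is expected to create no
// compacted segment (no 0.1 and no "Compacted" status).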
test("MINOR, ENABLE_AUTO_LOAD_MERGE: false") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(!SegmentSequenceIds.contains("0.1"))
assert(SegmentSequenceIds.length == 6)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
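// With auto load merge enabled, minor compaction is expected to be triggered
// automatically during loading once the segment count reaches the default threshold,
// without any explicit ALTER TABLE ... COMPACT statement.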
test("MINOR, ENABLE_AUTO_LOAD_MERGE: true") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
// 6 loads produce 6 segments; since auto load merge is enabled and 6 exceeds the default
// minor compaction threshold (4), one compacted segment (0.1) is created automatically,
// so SHOW SEGMENTS lists 7 entries in total
assert(SegmentSequenceIds.length == 7)
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
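// PRESERVE_LATEST_SEGMENTS_NUMBER = 0 preserves nothing, so the explicit minor compaction
// is expected to merge the first 4 of the 6 segments into segment 0.1 and mark those 4 as
// Compacted; the remaining 2 segments are below the threshold for a second merge.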
test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
"0")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 4)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
}
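// PRESERVE_LATEST_SEGMENTS_NUMBER = 4 keeps the newest 4 segments out of compaction,
// leaving only 2 eligible segments, which is below the minor threshold, so nothing is
// expected to be compacted.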
test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
"4")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(!SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 6)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 0)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
}
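// DAYS_ALLOWED_TO_COMPACT = 0 disables the day-based restriction, so minor compaction
// is expected to behave as in the default case and merge the first 4 segments into 0.1.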
test("MINOR, DAYS_ALLOWED_TO_COMPACT: 0") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 4)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
// reset to the documented default ("0", which disables the day-based restriction)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
}
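// With DAYS_ALLOWED_TO_COMPACT = 4, all segments were just loaded and fall inside the
// allowed window, so minor compaction is expected to produce the same result as the
// default case.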
test("MINOR, DAYS_ALLOWED_TO_COMPACT: 4") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"4")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 4)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
// reset to the documented default ("0", which disables the day-based restriction)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
}
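// Major compaction merges all valid segments regardless of the minor threshold: after
// segments 1-3 are deleted, the 3 remaining segments are expected to be merged into 0.1,
// giving 7 entries in SHOW SEGMENTS (6 loads plus the compacted segment).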
test("MAJOR, ENABLE_AUTO_LOAD_MERGE: false") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(SegmentSequenceIds.length == 7)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
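// Auto load merge already creates one minor-compacted segment during loading; the
// explicit major compaction is expected to add one more, so SHOW SEGMENTS lists 8 entries.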
test("MAJOR, ENABLE_AUTO_LOAD_MERGE: true") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
// 6 loads produce 6 segments; auto load merge creates one minor-compacted segment
// (6 exceeds the default minor threshold of 4) and the explicit major compaction
// creates one more, so SHOW SEGMENTS lists 8 entries in total
assert(SegmentSequenceIds.length == 8)
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
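// With no segments preserved, major compaction is expected to merge all 6 segments into
// 0.1, marking all 6 originals as Compacted.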
test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
"0")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 6)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
}
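// Preserving the latest 4 segments leaves only the first 2 eligible, so major compaction
// is expected to merge just those 2 into 0.1 (2 segments marked Compacted).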
test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
"4")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 2)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
}
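// DAYS_ALLOWED_TO_COMPACT = 0 disables the day-based restriction, so major compaction is
// expected to merge all 6 segments into 0.1.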
test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 0") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 6)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
// reset to the documented default ("0", which disables the day-based restriction)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
}
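// All segments fall inside the 4-day window, so major compaction is expected to merge
// all 6 segments into 0.1, as in the previous test.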
test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 4") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"4")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(!SegmentSequenceIds.contains("4.1"))
assert(SegmentSequenceIds.length == 7)
val status = segments.collect().map { each => (each.toSeq) (1) }
assert(status.filter(_.equals("Compacted")).length == 6)
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
// reset to the documented default ("0", which disables the day-based restriction)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
"0")
}
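// Enabling prefetch during compaction should not change the outcome: with auto load merge
// disabled and segments 1-3 deleted, major compaction is expected to merge the remaining
// segments into 0.1, as in the earlier MAJOR test with auto merge disabled.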
test("MAJOR, ENABLE_PREFETCH_DURING_COMPACTION: true") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
}
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(SegmentSequenceIds.contains("0.1"))
assert(SegmentSequenceIds.length == 7)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
checkAnswer(sql("SELECT * FROM compaction_globalsort"),
sql("SELECT * FROM carbon_localsort"))
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT)
}
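// Restore the load-related properties touched by these tests (sort scope, global sort
// partitions, compaction threshold) to their defaults so the tests stay independent.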
private def resetConf() {
val prop = CarbonProperties.getInstance()
prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
prop.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
prop.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
}
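// Counts the carbon index files of the given segment, reading the segment directory
// directly when it exists (legacy layout) and falling back to the segment file metadata
// otherwise.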
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
if (FileFactory.isFileExist(segmentDir)) {
new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
} else {
val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
}
}
}