blob: 84f411ed30604764658ca692e111e79c1b9c1ef9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.datacompaction
import java.io.{File, PrintWriter}

import scala.util.Random
import scala.util.control.NonFatal

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
/**
 * Verifies MAJOR compaction on a GLOBAL_SORT table whose combined segment size
 * exceeds the (artificially lowered) major compaction size threshold.
 *
 * Five CSV fixture files of different sizes are generated once in [[beforeAll]].
 * Each test loads them into both a global-sort table (compaction_globalsort)
 * and a plain table (carbon_localsort) so that post-compaction row counts can
 * be cross-checked between the two.
 */
class CompactionSupportGlobalSortBigFileTest extends QueryTest
  with BeforeAndAfterEach with BeforeAndAfterAll {

  val file1: String = resourcesPath + "/compaction/fil1.csv"
  val file2: String = resourcesPath + "/compaction/fil2.csv"
  val file3: String = resourcesPath + "/compaction/fil3.csv"
  val file4: String = resourcesPath + "/compaction/fil4.csv"
  val file5: String = resourcesPath + "/compaction/fil5.csv"

  // All generated fixture files, in the order they are loaded.
  private lazy val fixtureFiles = Seq(file1, file2, file3, file4, file5)

  override protected def beforeAll(): Unit = {
    // Shrink the major compaction size (MB) so the generated segments exceed it.
    resetConf("10")
    // n would need to be about 5000000 if the threshold were left at the default 1024.
    val n = 150000
    CompactionSupportGlobalSortBigFileTest.createFile(file1, n, 0)
    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
    CompactionSupportGlobalSortBigFileTest.createFile(file3, n * 3, n * 5)
    CompactionSupportGlobalSortBigFileTest.createFile(file4, n * 2, n * 8)
    CompactionSupportGlobalSortBigFileTest.createFile(file5, n * 2, n * 13)
  }

  override protected def afterAll(): Unit = {
    fixtureFiles.foreach(CompactionSupportGlobalSortBigFileTest.deleteFile)
    // Restore the compaction threshold for tests that run after this suite.
    resetConf(CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)
  }

  override protected def beforeEach(): Unit = {
    sql("DROP TABLE IF EXISTS compaction_globalsort")
    sql(
      """
        | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
        | STORED AS carbondata
        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='global_sort')
        |""".stripMargin)
    sql("DROP TABLE IF EXISTS carbon_localsort")
    sql(
      """
        | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
        | STORED AS carbondata
        |""".stripMargin)
  }

  override protected def afterEach(): Unit = {
    sql("DROP TABLE IF EXISTS compaction_globalsort")
    sql("DROP TABLE IF EXISTS carbon_localsort")
  }

  test("Compaction major: segments size is bigger than default compaction size") {
    // Load the same five files into both tables so row counts must agree.
    fixtureFiles.foreach { file =>
      sql(s"LOAD DATA LOCAL INPATH '$file' INTO TABLE carbon_localsort OPTIONS('header'='false')")
    }
    fixtureFiles.foreach { file =>
      sql(s"LOAD DATA LOCAL INPATH '$file' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
    }
    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
    // Compaction must preserve the table's sort scope and sort columns.
    val described = sql("DESCRIBE FORMATTED compaction_globalsort")
    checkExistence(described, true, "global_sort")
    checkExistence(described, true, "city,name")
    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
    // No rows may be lost or duplicated by compaction.
    checkAnswer(sql("select count(*) from compaction_globalsort"),
      sql("select count(*) from carbon_localsort"))
    // A merged segment with id "0.1" proves the major compaction actually ran.
    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
    val segmentIds = segments.collect().map { row => row.toSeq(0) }
    assert(segmentIds.contains("0.1"))
  }

  /** Sets the carbon major compaction size property to `size` (in MB). */
  private def resetConf(size: String): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, size)
  }
}
/**
 * File-fixture helpers for the big-file global-sort compaction suite:
 * generation and cleanup of large CSV inputs.
 */
object CompactionSupportGlobalSortBigFileTest {

  /**
   * Writes `line` CSV rows to `fileName`, one per id in
   * `[start, start + line)`, in the form `id,n<id>,c<id % 10000>,<age>`
   * where age is a random int in [0, 80).
   *
   * @param fileName destination path
   * @param line     number of rows to write
   * @param start    first id value
   * @return true if the file was written completely, false on any error
   *         (the original fell through to `true` even when the catch fired,
   *         and leaked the writer on a mid-write failure)
   */
  def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = {
    var writer: PrintWriter = null
    try {
      writer = new PrintWriter(fileName)
      for (i <- start until (start + line)) {
        writer.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
      }
      true
    } catch {
      // NonFatal keeps OOM/interrupt propagating instead of being swallowed.
      case NonFatal(_) => false
    } finally {
      // Always release the file handle, even if writing failed part-way.
      if (writer != null) writer.close()
    }
  }

  /**
   * Deletes `fileName` if it exists.
   *
   * @return true if the file is absent afterwards (deleted, or never existed),
   *         false if deletion failed or an error occurred
   */
  def deleteFile(fileName: String): Boolean = {
    try {
      val file = new File(fileName)
      // Report the actual delete outcome instead of unconditionally true.
      !file.exists() || file.delete()
    } catch {
      case NonFatal(_) => false
    }
  }
}