blob: 656414184d2cc2055070f1461b13f8263ebbc993 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.dataload
import java.util
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import scala.collection.JavaConverters._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
/**
* Test suite for load, insert, update, delete, compaction and secondary index
* operations on a carbon table whose segment folder contains stale data.
*/
class TestLoadDataWithStaleDataInSegmentFolder extends QueryTest with BeforeAndAfterAll {

  // Main table, its secondary index, the CSV used by LOAD DATA and the sort columns
  // shared by every scenario in this suite.
  val tableName = "staleDataInSegmentFolder"
  val siName = "si_StaleDataInSegmentFolder"
  val testData = s"$resourcesPath/sample.csv"
  val sortcolumns = "id"

  test("test load with staledata in segmentfolder, " +
       "carbon.merge.index.in.segment = false") {
    withMergeIndexInSegment("false") {
      testIUDWithStaleData()
    }
  }

  test("test load with staledata in segmentfolder, " +
       "carbon.merge.index.in.segment = true") {
    withMergeIndexInSegment("true") {
      testIUDWithStaleData()
    }
  }

  /**
   * Runs `body` with CARBON_MERGE_INDEX_IN_SEGMENT set to `value`.
   *
   * The property is removed in a `finally` block so that a failing assertion inside
   * `body` does not leave the setting behind for tests that run afterwards.
   *
   * @param value value to assign to the property while `body` runs
   * @param body  the test logic to execute
   */
  private def withMergeIndexInSegment(value: String)(body: => Unit): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, value)
    try {
      body
    } finally {
      properties.removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
    }
  }

  /**
   * Runs every stale-data scenario for each sort scope, first against a partitioned
   * table and then against a non-partitioned one.
   */
  private def testIUDWithStaleData(): Unit = {
    for {
      sort <- List("NO_SORT", "LOCAL_SORT", "GLOBAL_SORT")
      isPartition <- List(true, false)
    } {
      testLoadWithStaleData(sort, sortcolumns, isPartition)
      testSIWithStaleData(sort, sortcolumns, isPartition)
      testInsertIntoWithStaleData(sort, sortcolumns, isPartition)
      testUpdateWithStaleData(sort, sortcolumns, isPartition)
      testDeleteWithStaleData(sort, sortcolumns, isPartition)
      testCompactWithStaleData(sort, sortcolumns, isPartition)
    }
  }

  /** Loads the sample CSV into the main table once. */
  private def loadSampleData(): Unit = {
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
  }

  /** Asserts that `select count(1)` on `table` returns exactly `expected`. */
  private def checkRowCount(table: String, expected: Int): Unit = {
    checkAnswer(sql(s"select count(1) from $table"), Seq(Row(expected)))
  }

  /**
   * Loads, deletes the table status file, loads again, and checks that only the second
   * load is visible (6 rows) both before and after `clean files`.
   */
  private def testLoadWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    createTable(sortscope, sortcolumns, isPartition)
    loadSampleData()
    mockStaleDataByRemoveTablestatus(tableName)
    loadSampleData()
    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
    checkRowCount(tableName, 6)
    sql(s"clean files for table $tableName")
    checkRowCount(tableName, 6)
  }

  /**
   * Leaves stale data behind after the first load, performs four more loads followed by
   * a minor compaction, and checks that 24 rows are visible before and after
   * `clean files`.
   */
  private def testCompactWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    createTable(sortscope, sortcolumns, isPartition)
    loadSampleData()
    mockStaleDataByRemoveTablestatus(tableName)
    (1 to 4).foreach(_ => loadSampleData())
    sql(s"ALTER TABLE $tableName COMPACT 'MINOR'")
    checkRowCount(tableName, 24)
    sql(s"clean files for table $tableName")
    checkRowCount(tableName, 24)
  }

  /**
   * Same flow as the plain load scenario, but with a secondary index on the table; the
   * table status files of both the main table and the index table are deleted.
   */
  private def testSIWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    // The suite expects 6 index rows for the partitioned table and 4 otherwise.
    val expectedSiCount = if (isPartition) 6 else 4

    createTable(sortscope, sortcolumns, isPartition)
    createSI()
    loadSampleData()
    mockStaleDataByRemoveTablestatus(tableName)
    mockStaleDataByRemoveTablestatus(siName)
    loadSampleData()
    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
    verifyThereIsNoSameContentInDifferentIndexes(siName, "0")
    checkRowCount(tableName, 6)
    checkRowCount(siName, expectedSiCount)
    sql(s"clean files for table $tableName")
    checkRowCount(tableName, 6)
    checkRowCount(siName, expectedSiCount)
  }

  /**
   * Inserts a row, deletes the table status file, inserts another row, and checks that
   * exactly one row is visible before and after `clean files`.
   */
  private def testInsertIntoWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    createTable(sortscope, sortcolumns, isPartition)
    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
    mockStaleDataByRemoveTablestatus(tableName)
    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
    checkRowCount(tableName, 1)
    sql(s"clean files for table $tableName")
    checkRowCount(tableName, 1)
  }

  /**
   * Checks that an UPDATE issued after stale data was left behind only touches the row
   * written by the second insert.
   */
  private def testUpdateWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    createTable(sortscope, sortcolumns, isPartition)
    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
    mockStaleDataByRemoveTablestatus(tableName)
    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
    sql(
      s"""update $tableName d set
         | (d.id) = (d.id + 1) where d.name = 'a'""".stripMargin).collect()
    checkAnswer(sql(s"select * from $tableName"),
      Seq(Row(2, "a", "c", 2)))
  }

  /**
   * Checks that deleting the row written by the second insert leaves the table empty,
   * i.e. the row from the stale first insert does not show up.
   */
  private def testDeleteWithStaleData(sortscope: String, sortcolumns: String,
      isPartition: Boolean): Unit = {
    createTable(sortscope, sortcolumns, isPartition)
    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
    mockStaleDataByRemoveTablestatus(tableName)
    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
    sql(s"delete from $tableName d where d.city = 'c'").collect()
    checkAnswer(sql(s"select * from $tableName"), Seq())
  }

  override def afterAll(): Unit = {
    sql(s"DROP TABLE IF EXISTS $tableName")
  }

  /** Recreates the main table, partitioned by `age` when `isPartition` is true. */
  private def createTable(sortscope: String, sortcolumns: String, isPartition: Boolean): Unit = {
    if (isPartition) {
      createPartitionTable(sortscope, sortcolumns)
    } else {
      createNonPartitionTable(sortscope, sortcolumns)
    }
  }

  /** Drops the main table if present and recreates it without partitions. */
  private def createNonPartitionTable(sortscope: String, sortcolumns: String): Unit = {
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(
      s"""
         | CREATE TABLE $tableName(id int, name string, city string, age int)
         | STORED AS carbondata
         | TBLPROPERTIES('sort_scope'='$sortscope','sort_columns'='$sortcolumns')
       """.stripMargin)
  }

  /** Drops the main table if present and recreates it partitioned by `age`. */
  private def createPartitionTable(sortscope: String, sortcolumns: String): Unit = {
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(
      s"""
         | CREATE TABLE $tableName(id int, name string, city string)
         | PARTITIONED BY(age int)
         | STORED AS carbondata
         | TBLPROPERTIES('sort_scope'='$sortscope','sort_columns'='$sortcolumns')
       """.stripMargin)
  }

  /** Creates the secondary index on the `city` column of the main table. */
  private def createSI(): Unit = {
    sql(s"create index $siName on table $tableName (city) AS 'carbondata'")
  }

  /**
   * Deletes the table status file of `tableName` in the `default` database. The files
   * already written by earlier loads stay on disk, which is what this suite treats as
   * stale data.
   */
  private def mockStaleDataByRemoveTablestatus(tableName: String): Unit = {
    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
    val tableStatusFile = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    FileFactory.getCarbonFile(tableStatusFile).delete()
  }

  /**
   * Reads every index and merge-index file found under the scanned location and asserts
   * that no entry key of the index maps shows up more than once across those files.
   *
   * @param tableName table whose index files are inspected
   * @param segment   segment id to scan; not used for hive partition tables, where the
   *                  whole table path is scanned recursively instead
   */
  private def verifyThereIsNoSameContentInDifferentIndexes(tableName: String,
      segment: String): Unit = {
    val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
    val tablePath = table.getAbsoluteTableIdentifier.getTablePath
    val path = if (table.isHivePartitionTable) {
      tablePath
    } else {
      CarbonTablePath.getSegmentPath(tablePath, segment)
    }

    val indexFileFilter = new CarbonFileFilter {
      override def accept(file: CarbonFile): Boolean = {
        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) ||
        file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT)
      }
    }

    // The list keeps every key, the set keeps distinct keys; differing sizes mean that
    // the same key was found in more than one index file.
    val allIndexFilesSet = new util.HashSet[String]()
    val allIndexFilesList = new util.ArrayList[String]()
    FileFactory.getCarbonFile(path).listFiles(true, indexFileFilter).asScala.foreach {
      indexFile =>
        val indexStore = new SegmentIndexFileStore()
        if (indexFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
          indexStore.readMergeFile(indexFile.getAbsolutePath)
        } else {
          indexStore.readIndexFile(indexFile)
        }
        val indexKeys = indexStore.getCarbonIndexMapWithFullPath.keySet()
        allIndexFilesSet.addAll(indexKeys)
        allIndexFilesList.addAll(indexKeys)
    }
    assert(allIndexFilesList.size() == allIndexFilesSet.size(),
      s"duplicate index entries found under $path")
  }
}