blob: 3c2b46f89e04cf87005a25b5bbeb7cffe49d8eef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.dataretention
import java.text.SimpleDateFormat
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.commons.lang3.time.DateUtils
import org.apache.spark.sql.{CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
/**
 * Test cases for the data retention feature: deleting segments by load start time or by
 * segment id, rejecting malformed delete commands, and the behaviour of delete / clean files
 * commands while table locks are held.
 *
 * The tests share the tables created in beforeAll, and several of them rely on the segments
 * left behind by the tests declared before them (for example, segment 2 is loaded by one test
 * and deleted by the next), so the declaration order of the tests must be preserved.
 */
class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {

  // Identifier of the "retentionlock" table; the lock objects below are created for it.
  var absoluteTableIdentifierForLock: AbsoluteTableIdentifier = null
  // Identifier of the "DataRetentionTable" table.
  var absoluteTableIdentifierForRetention: AbsoluteTableIdentifier = null
  // Despite its name this holds the *metadata* path of DataRetentionTable.
  var carbonTablePath: String = null
  var carbonDateFormat = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
  var defaultDateFormat = new SimpleDateFormat(
    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  var carbonTableStatusLock: ICarbonLock = null
  var carbonDeleteSegmentLock: ICarbonLock = null
  var carbonCleanFilesLock: ICarbonLock = null
  var carbonMetadataLock: ICarbonLock = null

  // Aggregation used by most tests to find out which countries are still visible.
  private val countryCountQuery =
    "SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" +
    " IN ('china','ind','aus','eng') GROUP BY country"

  /**
   * Loads a comma delimited csv file from the test resources directory into a table.
   *
   * @param csvFileName file name relative to resourcesPath
   * @param tableName   target table of the load
   */
  private def loadRetentionCsv(csvFileName: String, tableName: String): Unit = {
    sql(
      s"LOAD DATA LOCAL INPATH '$resourcesPath/$csvFileName' INTO TABLE $tableName " +
      "OPTIONS('DELIMITER' = ',')")
  }

  /**
   * Runs the country count aggregation on DataRetentionTable and compares it with the
   * expected rows.
   *
   * @param expected the rows the aggregation must return
   */
  private def checkCountryCounts(expected: Seq[Row]): Unit = {
    checkAnswer(sql(countryCountQuery), expected)
  }

  /**
   * Creates both test tables, builds the lock objects for "retentionlock" and performs the
   * initial loads: one load into "retentionlock" and two loads into "DataRetentionTable".
   */
  override def beforeAll(): Unit = {
    sql("drop table if exists DataRetentionTable")
    sql("drop table if exists retentionlock")

    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK, "1")
    properties.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "1")
    properties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    // both tables share the same schema
    Seq("DataRetentionTable", "retentionlock").foreach { tableName =>
      sql(
        s"CREATE table $tableName (ID int, date String, country String, name " +
        "String," +
        "phonetype String, serialname String, salary int) STORED AS carbondata")
    }

    val lockTable = CarbonEnv.getCarbonTable(
      Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
      "retentionlock")(sqlContext.sparkSession)
    absoluteTableIdentifierForLock = lockTable.getAbsoluteTableIdentifier

    val retentionTable = CarbonEnv.getCarbonTable(
      Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
      "dataRetentionTable")(sqlContext.sparkSession)
    absoluteTableIdentifierForRetention = retentionTable.getAbsoluteTableIdentifier
    carbonTablePath =
      CarbonTablePath.getMetadataPath(absoluteTableIdentifierForRetention.getTablePath)

    carbonTableStatusLock = CarbonLockFactory
      .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.TABLE_STATUS_LOCK)
    carbonDeleteSegmentLock = CarbonLockFactory
      .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.DELETE_SEGMENT_LOCK)
    carbonCleanFilesLock = CarbonLockFactory
      .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.CLEAN_FILES_LOCK)
    carbonMetadataLock = CarbonLockFactory
      .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.METADATA_LOCK)

    loadRetentionCsv("dataretention1.csv", "retentionlock")
    loadRetentionCsv("dataretention1.csv", "DataRetentionTable")
    loadRetentionCsv("dataretention2.csv", "DataRetentionTable")
  }

  /**
   * Restores the timestamp format property and drops the test tables.
   *
   * NOTE(review): the MAX_TIMEOUT_FOR_CARBON_LOCK and MAX_QUERY_EXECUTION_TIME overrides set
   * in beforeAll are not restored here and stay in effect for whatever runs afterwards.
   */
  override def afterAll(): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    sql("drop table if exists DataRetentionTable")
    sql("drop table if exists retentionlock")
  }

  /**
   * Builds the timestamp literal used by the "delete ... where segment.starttime before"
   * command for a given segment.
   *
   * @param segments  load metadata entries of the table
   * @param segmentId index into segments of the segment whose start time is wanted
   * @return the load start time of that segment plus one minute, rendered with
   *         defaultDateFormat
   */
  private def getSegmentStartTime(segments: Array[LoadMetadataDetails],
      segmentId: Integer): String = {
    val loadStartTime = segments(segmentId).getLoadStartTime()
    // round trip through carbonDateFormat (format, then parse) before shifting the time
    val parsedLoadTime = carbonDateFormat.parse(carbonDateFormat.format(loadStartTime))
    // add one minute so that a "before" delete issued with this value covers the segment
    defaultDateFormat.format(DateUtils.addMinutes(parsedLoadTime, 1))
  }

  test("RetentionTest_withoutDelete") {
    checkCountryCounts(Seq(Row("aus", 9), Row("ind", 9)))
  }

  test("RetentionTest_DeleteSegmentsByLoadTime") {
    val segments: Array[LoadMetadataDetails] =
      SegmentStatusManager.readLoadMetadata(carbonTablePath)
    // beforeAll performed exactly two loads into DataRetentionTable
    assert(segments.length == 2)
    val actualValue: String = getSegmentStartTime(segments, 1)
    // delete segments (0,1) which contain ind, aus
    sql(s"delete from table DataRetentionTable where segment.starttime before '$actualValue'")
    // load segment 2 which contains eng
    loadRetentionCsv("dataretention3.csv", "DataRetentionTable")
    checkCountryCounts(Seq(Row("eng", 9)))
  }

  test("RetentionTest3_DeleteByLoadId") {
    // delete segment 2 and load the ind segment
    sql("delete from table DataRetentionTable where segment.id in (2)")
    loadRetentionCsv("dataretention1.csv", "DataRetentionTable")
    checkCountryCounts(Seq(Row("ind", 9)))
    // these queries should execute without any error.
    sql("show segments for table DataRetentionTable")
    sql("clean files for table DataRetentionTable")
  }

  test("RetentionTest4_DeleteByInvalidLoadId") {
    val e = intercept[MalformedCarbonCommandException] {
      // delete segment with no id
      sql("delete from table DataRetentionTable where segment.id in ()")
    }
    assert(e.getMessage.contains("should not be empty"))
  }

  test("test delete segments by load date with case-insensitive table name") {
    sql(
      """
        |CREATE TABLE IF NOT EXISTS carbon_table_1
        |(ID Int, date Timestamp, country String,
        |name String, phonetype String, serialname String, salary Int)
        |STORED AS carbondata
        |""".stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/emptyDimensionData.csv' into table carbon_table_1")
    checkAnswer(sql("select count(*) from carbon_table_1"), Seq(Row(20)))
    sql("delete from table carbon_table_1 where segment.starttime " +
        " before '2099-07-28 11:00:00'")
    checkAnswer(sql("select count(*) from carbon_table_1"), Seq(Row(0)))
    sql("DROP TABLE carbon_table_1")
  }

  test("RetentionTest_DeleteSegmentsByLoadTimeValiadtion") {
    // a non numeric year must be rejected
    val e = intercept[MalformedCarbonCommandException] {
      sql(
        "delete from table DataRetentionTable where segment.starttime before" +
        " 'abcd-01-01 00:00:00'")
    }
    assert(e.getMessage.contains("Invalid load start time format"))

    // a date written with ':' separators must be rejected as well
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        "delete from table DataRetentionTable where segment.starttime before" +
        " '2099:01:01 00:00:00'")
    }
    assert(ex.getMessage.contains("Invalid load start time format"))

    // the rejected commands must not have removed anything
    checkCountryCounts(Seq(Row("ind", 9)))

    sql("delete from table DataRetentionTable where segment.starttime before '2099-01-01'")
    checkCountryCounts(Seq())
  }

  test("RetentionTest_InvalidDeleteCommands") {
    // All these queries should fail.
    intercept[Exception] {
      sql("DELETE LOADS FROM TABLE DataRetentionTable where STARTTIME before '2099-01-01'")
    }
    intercept[Exception] {
      sql("DELETE LOAD 2 FROM TABLE DataRetentionTable")
    }
    intercept[Exception] {
      sql("show loads for table DataRetentionTable")
    }
  }

  test("RetentionTest_Locks") {
    loadRetentionCsv("dataretention1.csv", "retentionlock")

    carbonDeleteSegmentLock.lockWithRetries()
    carbonTableStatusLock.lockWithRetries()
    carbonCleanFilesLock.lockWithRetries()
    // release the locks even when one of the expectations below fails, so that a failure
    // here cannot leave them held for afterAll or for later suites
    try {
      // delete segment 0, it should fail
      intercept[Exception] {
        sql("delete from table retentionlock where segment.id in (0)")
      }
      // it should fail
      intercept[Exception] {
        sql("delete from table retentionlock where segment.starttime before " +
            "'2099-01-01 00:00:00.0'")
      }
      // it should fail
      intercept[Exception] {
        sql("clean files for table retentionlock")
      }
      sql("SHOW SEGMENTS FOR TABLE retentionlock").show()
    } finally {
      carbonTableStatusLock.unlock()
      carbonCleanFilesLock.unlock()
      carbonDeleteSegmentLock.unlock()
    }

    // with the locks released the same delete goes through
    sql("delete from table retentionlock where segment.id in (0)")

    // deleting a segment is expected to succeed while the metadata lock is held
    carbonMetadataLock.lockWithRetries()
    try {
      sql("delete from table retentionlock where segment.id in (1)")
    } finally {
      carbonMetadataLock.unlock()
    }
  }
}