blob: 94af7c3daa04df0c699ccfc27f76cbb0bffdbbb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.badrecordloger
import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.{CarbonCommonConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.BadRecordUtil
import org.apache.spark.sql.test.util.QueryTest
/**
* Test Class for detailed query on timestamp datatypes
*
*
*/
class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
defaultConfig()
try {
sql("drop table IF EXISTS sales")
sql("drop table IF EXISTS serializable_values")
sql("drop table IF EXISTS serializable_values_false")
sql("drop table IF EXISTS insufficientColumn")
sql("drop table IF EXISTS insufficientColumn_false")
sql("drop table IF EXISTS emptyColumnValues")
sql("drop table IF EXISTS emptyColumnValues_false")
sql("drop table IF EXISTS empty_timestamp")
sql("drop table IF EXISTS empty_timestamp_false")
sql("drop table IF EXISTS dataloadOptionTests")
sql("drop table IF EXISTS sales_test")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata""")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
var csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
+
"('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"')");
// 1.0 "\N" which should be treated as NULL
// 1.1 Time stamp "\N" which should be treated as NULL
csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
sql(
"""CREATE TABLE IF NOT EXISTS serializable_values(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values OPTIONS"
+
"('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable false
sql(
"""CREATE TABLE IF NOT EXISTS serializable_values_false(ID BigInt, date Timestamp,
country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
sql(
"LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// 2. insufficient columns - Bad records/Null value based on configuration
sql(
"""CREATE TABLE IF NOT EXISTS insufficientColumn(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn OPTIONS"
+
"('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable false
sql(
"""CREATE TABLE IF NOT EXISTS insufficientColumn_false(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// 3. empty data for string data type - take empty value
// 4. empty data for non-string data type - Bad records/Null value based on configuration
//table should have only two records.
sql(
"""CREATE TABLE IF NOT EXISTS emptyColumnValues(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+
"('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true'," +
" 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable to false
sql(
"""CREATE TABLE IF NOT EXISTS emptyColumnValues_false(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// 4.1 Time stamp empty data - Bad records/Null value based on configuration
// 5. non-parsable data - Bad records/Null value based on configuration
// 6. empty line(check current one) - Bad records/Null value based on configuration
// only one value should be loadded.
sql(
"""CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+
"('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true' ," +
"'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable to false
sql(
"""CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
} catch {
case x: Throwable => CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
}
test("select count(*) from sales") {
checkAnswer(
sql("select count(*) from sales"),
Seq(Row(2)
)
)
}
test("select count(*) from serializable_values") {
checkAnswer(
sql("select count(*) from serializable_values"),
Seq(Row(2)
)
)
}
test("select count(*) from serializable_values_false") {
checkAnswer(
sql("select count(*) from serializable_values_false"),
Seq(Row(2)
)
)
}
test("select count(*) from empty_timestamp") {
checkAnswer(
sql("select count(*) from empty_timestamp"),
Seq(Row(1)
)
)
}
test("select count(*) from insufficientColumn") {
checkAnswer(
sql("select count(*) from insufficientColumn"),
Seq(Row(3)
)
)
}
test("select count(*) from insufficientColumn_false") {
checkAnswer(
sql("select count(*) from insufficientColumn_false"),
Seq(Row(3)
)
)
}
test("select count(*) from emptyColumnValues") {
checkAnswer(
sql("select count(*) from emptyColumnValues"),
Seq(Row(2)
)
)
}
test("select count(*) from emptyColumnValues_false") {
checkAnswer(
sql("select count(*) from emptyColumnValues_false"),
Seq(Row(7)
)
)
}
test("select count(*) from empty_timestamp_false") {
checkAnswer(
sql("select count(*) from empty_timestamp_false"),
Seq(Row(7)
)
)
}
test("test load ddl command") {
sql(
"""CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
try {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+ "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
} catch {
case ex: Exception =>
assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT or FAIL"
.equals(ex.getMessage))
}
}
test("validate redirected data") {
BadRecordUtil.cleanBadRecordPath("default", "sales_test")
val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql(
"""CREATE TABLE IF NOT EXISTS sales_test(ID BigInt, date long, country int,
actual_price Double, Quantity String, sold_price Decimal(19,2)) STORED AS carbondata""")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
try {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_test OPTIONS" +
"('bad_records_logger_enable'='false','bad_records_action'='redirect', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"')");
} catch {
case e: Exception => {
assert(true)
}
}
val redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "sales_test", "0", "0")
assert(BadRecordUtil.checkRedirectedCsvContentAvailableInSource(csvFilePath, redirectCsvPath))
}
test("test load ddl command with improper value") {
sql("drop table IF EXISTS dataLoadOptionTests")
sql(
s"""
| CREATE TABLE IF NOT EXISTS dataLoadOptionTests(
| ID BigInt,
| date Timestamp,
| country String,
| actual_price Double,
| Quantity int,
| sold_price Decimal(19,2)
| ) STORED AS carbondata
""".stripMargin.trim)
val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
try {
sql(
s"""
| LOAD DATA local inpath '" + $csvFilePath + "' INTO TABLE dataLoadOptionTests
| OPTIONS(
| 'bad_records_logger_enable'='fals',
| 'DELIMITER'= ',',
| 'QUOTECHAR'= '\"'
| )""".stripMargin.trim);
assert(false)
} catch {
case ex: Exception =>
assert(ex.getMessage.contains(
"option BAD_RECORDS_LOGGER_ENABLE can have only either TRUE or FALSE, " +
"It shouldn't be fals"
))
} finally {
sql("drop table IF EXISTS dataLoadOptionTests")
}
}
override def afterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
sql("drop table IF EXISTS sales")
sql("drop table IF EXISTS sales_test")
sql("drop table IF EXISTS serializable_values")
sql("drop table IF EXISTS serializable_values_false")
sql("drop table IF EXISTS insufficientColumn")
sql("drop table IF EXISTS insufficientColumn_false")
sql("drop table IF EXISTS emptyColumnValues")
sql("drop table IF EXISTS emptyColumnValues_false")
sql("drop table IF EXISTS empty_timestamp")
sql("drop table IF EXISTS empty_timestamp_false")
sql("drop table IF EXISTS dataloadOptionTests")
sql("drop table IF EXISTS loadIssue")
}
}