/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.standardpartition

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
* Test class for bad record handling (log, redirect, ignore) during data loads
* into standard partitioned carbon tables.
*/
class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
var hiveContext: HiveContext = _
override def beforeAll {
drop()
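// the timestamp values in the bad-record CSV test files are expected in yyyy/MM/dd format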
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
}
test("test partition redirect") {
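// 'redirect' should write the bad records to a file under the BAD_RECORD_PATH set in
// TBLPROPERTIES instead of loading them; only the valid rows end up in the table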
sql(
s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED AS carbondata TBLPROPERTIES('BAD_RECORD_PATH'='$warehouse')""")
val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
+
"('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from sales"),
Seq(Row(2)
)
)
}
test("test partition serializable_values") {
// 1.0 "\N" should be treated as NULL
// 1.1 Timestamp "\N" should be treated as NULL
val csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
sql(
"""CREATE TABLE IF NOT EXISTS serializable_values(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED AS carbondata
""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values OPTIONS"
+
"('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from serializable_values"),
Seq(Row(2)
)
)
}
test("test partition serializable_values_false") {
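// same input file as above, but with the bad records logger disabled;
// the "\N" null markers still load as nulls, so both rows should be present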
val csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
// load with bad_records_logger_enable false
sql(
"""CREATE TABLE IF NOT EXISTS serializable_values_false(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED AS carbondata
""")
sql(
"LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from serializable_values_false"),
Seq(Row(2)
)
)
}
test("test partition with empty_timestamp") {
// 4.1 Timestamp empty data - Bad records/Null value based on configuration
// 5. non-parsable data - Bad records/Null value based on configuration
// 6. empty line (check current one) - Bad records/Null value based on configuration
// only one row should be loaded.
sql(
"""CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+
"('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true' ," +
"'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from empty_timestamp"),
Seq(Row(1)
)
)
}
test("test partition with insufficientColumn") {
// 2. insufficient columns - Bad records/Null value based on configuration
sql(
"""CREATE TABLE IF NOT EXISTS insufficientColumn(date Timestamp,country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn OPTIONS"
+
"('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from insufficientColumn"),
Seq(Row(3)
)
)
}
test("test partition with insufficientColumn_false") {
// load with bad_records_logger_enable false
sql(
"""CREATE TABLE IF NOT EXISTS insufficientColumn_false(date Timestamp,country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from insufficientColumn_false"),
Seq(Row(3)
)
)
}
test("test partition with emptyColumnValues") {
// 3. empty data for string data type - take empty value
// 4. empty data for non-string data type - Bad records/Null value based on configuration
// the table should have only two records.
sql(
"""CREATE TABLE IF NOT EXISTS emptyColumnValues(date Timestamp,country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+
"('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true'," +
" 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from emptyColumnValues"),
Seq(Row(2)
)
)
}
test("test partition with emptyColumnValues_false") {
// load with bad_records_logger_enable set to false
sql(
"""CREATE TABLE IF NOT EXISTS emptyColumnValues_false(date Timestamp,country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (ID BigInt) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from emptyColumnValues_false"),
Seq(Row(7)
)
)
}
test("test partition with empty_timestamp_false") {
// load with bad_records_logger_enable set to false
sql(
"""CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+ "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
checkAnswer(
sql("select count(*) from empty_timestamp_false"),
Seq(Row(7)
)
)
}
test("test load ddl command") {
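// 'FORCA' is not a valid BAD_RECORDS_ACTION; the load should fail with a message
// listing the allowed values (FORCE, IGNORE, REDIRECT, FAIL)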
sql(
"""CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata
""")
val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
try {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+ "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
} catch {
case ex: Exception =>
assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT or FAIL"
.equals(ex.getMessage))
}
}
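// drops every table created by this suite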
def drop(): Unit = {
sql("drop table IF EXISTS sales")
sql("drop table IF EXISTS serializable_values")
sql("drop table IF EXISTS serializable_values_false")
sql("drop table IF EXISTS insufficientColumn")
sql("drop table IF EXISTS insufficientColumn_false")
sql("drop table IF EXISTS emptyColumnValues")
sql("drop table IF EXISTS emptyColumnValues_false")
sql("drop table IF EXISTS empty_timestamp")
sql("drop table IF EXISTS empty_timestamp_false")
sql("drop table IF EXISTS dataloadOptionTests")
}
override def afterAll {
drop()
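// restore the default timestamp format so other test suites are not affected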
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
}
}