| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.spark.testsuite.badrecordloger |
| |
| import java.io.File |
| |
| import scala.collection.mutable.ListBuffer |
| import org.apache.commons.io.FileUtils |
| import org.apache.spark.sql.Row |
| import org.apache.spark.sql.test.util.QueryTest |
| import org.scalatest.BeforeAndAfterEach |
| |
| import org.apache.carbondata.common.constants.LoggerAction |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.spark.util.BadRecordUtil |
| |
/**
 * Checks how a CSV load behaves for each `bad_records_action` value (FORCE, FAIL, IGNORE and
 * REDIRECT), for default tables, NO_SORT tables, bucketed tables and complex data types.
 *
 * Most tests load `badrecords/datasample.csv`; they expect 6 rows when bad records are forced
 * into the table and 2 rows when bad records are ignored or redirected.
 */
class BadRecordActionTest extends QueryTest {

  val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
  val badRecordFilePath = new File(currentPath + "/target/test/badRecords")

  // Column list shared by every table that is loaded from datasample.csv.
  private val salesColumns =
    "ID BigInt, date Timestamp, country String, " +
    "actual_price Double, Quantity int, sold_price Decimal(19,2)"

  initCarbonProperties()

  /**
   * Applies the default test configuration, sets the bad record action property to FORCE and
   * creates the directory used by the tests that pass an explicit 'BAD_RECORD_PATH'.
   */
  private def initCarbonProperties(): Unit = {
    defaultConfig()
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
    badRecordFilePath.mkdirs()
  }

  /**
   * Drops `table` if it exists and creates it again with the shared sales columns.
   *
   * @param table           name of the table to (re)create
   * @param tableProperties comma separated 'key'='value' pairs for the TBLPROPERTIES clause;
   *                        when empty no TBLPROPERTIES clause is generated
   */
  private def recreateSalesTable(table: String, tableProperties: String = ""): Unit = {
    sql(s"drop table if exists ${table}")
    val propertiesClause =
      if (tableProperties.isEmpty) "" else s" TBLPROPERTIES (${tableProperties})"
    sql(s"CREATE TABLE IF NOT EXISTS ${table}(${salesColumns}) " +
        s"STORED AS carbondata${propertiesClause}")
  }

  /**
   * Loads datasample.csv into `table` with the given bad record action.
   *
   * @param table         target table
   * @param action        value of the 'bad_records_action' load option, passed through verbatim
   * @param badRecordPath when defined, it is passed as the 'BAD_RECORD_PATH' load option
   */
  private def loadSampleCsv(
      table: String,
      action: String,
      badRecordPath: Option[String] = None): Unit = {
    val badRecordPathOption = badRecordPath.fold("")(path => s" 'BAD_RECORD_PATH'='${path}',")
    // Triple quotes keep the literal double quote of QUOTECHAR free of escape sequences.
    sql(
      s"""LOAD DATA local inpath '${csvFilePath}' INTO TABLE ${table} OPTIONS""" +
      s"""('bad_records_action'='${action}', 'DELIMITER'= ',', 'QUOTECHAR'= '"',""" +
      s"""${badRecordPathOption}'timestampformat'='yyyy/MM/dd')""")
  }

  /** Asserts that `select count(*)` on `table` returns exactly `expected`. */
  private def assertRowCount(table: String, expected: Int): Unit = {
    checkAnswer(sql(s"select count(*) from ${table}"), Seq(Row(expected)))
  }

  /**
   * Asserts that loading datasample.csv into `table` with `action` throws an exception whose
   * message contains `expectedMessage`.
   */
  private def assertLoadFails(table: String, action: String, expectedMessage: String): Unit = {
    val exception = intercept[Exception] {
      loadSampleCsv(table, action)
    }
    assert(exception.getMessage.contains(expectedMessage))
  }

  /** Error text the tests expect when `column` holds a value that is not a valid timestamp. */
  private def invalidTimestampMessage(column: String): String = {
    s"Data load failed due to bad record: The value with column name ${column} and column data " +
    "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
    " the detail reason"
  }

  /**
   * Runs `body` with the carbon property `key` set to `value` and puts the previous value back
   * afterwards, even when `body` throws.
   *
   * When the property had no previous value (getProperty returned null) nothing is written
   * back, so the override stays in place.
   */
  private def withCarbonProperty(key: String, value: String)(body: => Unit): Unit = {
    val properties = CarbonProperties.getInstance()
    val previous = Option(properties.getProperty(key))
    properties.addProperty(key, value)
    try {
      body
    } finally {
      previous.foreach(properties.addProperty(key, _))
    }
  }

  test("test load for bad_record_action=force") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "force")
    assertRowCount("sales", 6)
  }

  test("test load for bad_record_action=FORCE") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "FORCE")
    assertRowCount("sales", 6)
  }

  test("test load for bad_record_action=fail") {
    recreateSalesTable("sales")
    assertLoadFails("sales", "fail", invalidTimestampMessage("date"))
  }

  test("test load for bad_record_action=FAIL") {
    recreateSalesTable("sales")
    assertLoadFails("sales", "FAIL", invalidTimestampMessage("date"))
  }

  test("test load for bad_record_action=ignore") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "ignore")
    assertRowCount("sales", 2)
  }

  test("test load for bad_record_action=IGNORE") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "IGNORE")
    assertRowCount("sales", 2)
  }

  test("test bad record REDIRECT but not having empty location in option should throw exception") {
    recreateSalesTable("sales")
    // The bad record location property is reset to its default value for the duration of the
    // load, and no 'BAD_RECORD_PATH' option is given, so the REDIRECT load is expected to fail.
    withCarbonProperty(
      CarbonCommonConstants.CARBON_BADRECORDS_LOC,
      CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL) {
      assertLoadFails(
        "sales",
        "REDIRECT",
        "Cannot redirect bad records as bad record location is not provided.")
    }
  }

  test("test bad record is REDIRECT with location in carbon properties should pass") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "REDIRECT")
    assertRowCount("sales", 2)
  }

  test("test bad record is redirect with location in option while data loading should pass") {
    recreateSalesTable("sales")
    loadSampleCsv("sales", "REDIRECT", Some(badRecordFilePath.getCanonicalPath))
    assertRowCount("sales", 2)
  }

  test("test bad record FORCE option with no_sort as sort scope ") {
    recreateSalesTable("sales_no_sort", "'sort_scope'='NO_SORT'")
    loadSampleCsv("sales_no_sort", "FORCE")
    assertRowCount("sales_no_sort", 6)
  }

  test("test bad record REDIRECT option with location and no_sort as sort scope ") {
    recreateSalesTable("sales_no_sort", "'sort_scope'='NO_SORT'")
    loadSampleCsv("sales_no_sort", "REDIRECT", Some(badRecordFilePath.getCanonicalPath))
    assertRowCount("sales_no_sort", 2)
  }

  test("test bad record IGNORE option with no_sort as sort scope ") {
    recreateSalesTable("sales_no_sort", "'sort_scope'='NO_SORT'")
    loadSampleCsv("sales_no_sort", "IGNORE")
    assertRowCount("sales_no_sort", 2)
  }

  test("test bad record with FAIL option with location and no_sort as sort scope ") {
    recreateSalesTable("sales_no_sort", "'sort_scope'='NO_SORT'")
    assertLoadFails("sales_no_sort", "FAIL", invalidTimestampMessage("date"))
  }

  test("test bad record with IGNORE option for bucketed table") {
    recreateSalesTable("sales_bucket", "'BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country'")
    loadSampleCsv("sales_bucket", "IGNORE")
    assertRowCount("sales_bucket", 2)
  }

  test("test bad record with REDIRECT option for bucketed table") {
    recreateSalesTable("sales_bucket", "'BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country'")
    loadSampleCsv("sales_bucket", "REDIRECT", Some(badRecordFilePath.getCanonicalPath))
    assertRowCount("sales_bucket", 2)
  }

  test("test bad record IGNORE with complex data types") {
    // The load runs with the default timestamp format; the previous format is restored and the
    // table is dropped even when the load or the count check fails.
    withCarbonProperty(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) {
      sql("drop table if exists complextable")
      try {
        sql("create table complextable(arrayColumn array<timestamp>, " +
            "structColumn struct<s1:int,s2:timestamp>," +
            "arraystruct array<Struct<as1:int,as2:timestamp>>) stored as carbondata")
        // The second literal is deliberately not interpolated: it contains a literal '$'.
        sql(s"LOAD DATA local inpath '$resourcesPath/badrecords/complexdata.csv' " +
            "INTO TABLE complextable OPTIONS('bad_records_action'='ignore', 'DELIMITER'=',', " +
            "'QUOTECHAR'= '\"','COMPLEX_DELIMITER_LEVEL_1'='$','COMPLEX_DELIMITER_LEVEL_2'='#')")
        checkAnswer(sql("select count(*) from complextable"), Seq(Row(5)))
      } finally {
        sql("drop table if exists complextable")
      }
    }
  }

  test("test bad record FAIL with invalid timestamp range") {
    val csvPath = s"$resourcesPath/badrecords/invalidTimeStampRange.csv"
    val rows = ListBuffer(
      Array("ID", "date", "time"),
      Array("1", "2016-7-24", "342016-7-24 01:02:30"))
    BadRecordUtil.createCSV(rows, csvPath)
    try {
      sql("DROP TABLE IF EXISTS test_time")
      sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time Timestamp) " +
          "STORED AS carbondata " +
          "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
      val exception = intercept[Exception] {
        sql(s" LOAD DATA LOCAL INPATH '$csvPath' " +
            "into table test_time options ('bad_records_action'='fail')")
      }
      assert(exception.getMessage.contains(invalidTimestampMessage("time")))
    } finally {
      // Remove the generated CSV even if dropping the table fails.
      try {
        sql("DROP TABLE IF EXISTS test_time")
      } finally {
        FileUtils.forceDelete(new File(csvPath))
      }
    }
  }

  /**
   * Canonical path of the directory two levels above the root of this class's classpath
   * resources.
   */
  private def currentPath: String = {
    new File(this.getClass.getResource("/").getPath + "../../")
      .getCanonicalPath
  }
}