[CARBONDATA-3955] Fix load failures due to daylight saving time changes

Why is this PR needed?
1. Fix load failures due to daylight saving time changes.
2. During load, date/timestamp year values with >4 digit should fail or
be null according to bad records action property.

What changes were proposed in this PR?
New property added to setLeniet as true and parse timestampformat.
Added validation for timestamp range values.

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3896
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a271da6..1a19b86 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1592,6 +1592,16 @@
 
   public static final String CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT = "false";
 
+  // Property to enable parsing the timestamp/date data with setLenient = true in load
+  // flow if it fails with parse invalid timestamp data. (example: 1941-03-15 00:00:00
+  // is valid time in Asia/Calcutta zone and is invalid and will fail to parse in Asia/Shanghai
+  // zone as DST is observed and clocks were turned forward 1 hour to 1941-03-15 01:00:00)
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String
+      CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE = "carbon.load.dateformat.setlenient.enable";
+
+  public static final String CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE_DEFAULT = "false";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Constant value start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 1cd181d..cf339d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2076,4 +2076,11 @@
   public static void setAuditEnabled(boolean enabled) {
     getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_AUDIT, String.valueOf(enabled));
   }
+
+  public boolean isSetLenientEnabled() {
+    String configuredValue =
+        getProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE,
+            CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE_DEFAULT);
+    return Boolean.parseBoolean(configuredValue);
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index dd362d6..fca2639 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -34,6 +34,7 @@
 import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -435,7 +436,8 @@
 
   private static Object parseTimestamp(String dimensionValue, String dateFormat) {
     Date dateToStr;
-    DateFormat dateFormatter;
+    DateFormat dateFormatter = null;
+    long timeValue;
     try {
       if (null != dateFormat && !dateFormat.trim().isEmpty()) {
         dateFormatter = new SimpleDateFormat(dateFormat);
@@ -444,9 +446,40 @@
         dateFormatter = timestampFormatter.get();
       }
       dateToStr = dateFormatter.parse(dimensionValue);
-      return dateToStr.getTime();
+      timeValue = dateToStr.getTime();
+      validateTimeStampRange(timeValue);
+      return timeValue;
     } catch (ParseException e) {
-      throw new NumberFormatException(e.getMessage());
+      // If the parsing fails, try to parse again with setLenient to true if the property is set
+      // (example: 1941-03-15 00:00:00 is invalid data and will fail to parse in Asia/Shanghai zone
+      // as DST is observed and clocks were turned forward 1 hour to 1941-03-15 01:00:00)
+      if (CarbonProperties.getInstance().isSetLenientEnabled()) {
+        try {
+          dateFormatter.setLenient(true);
+          dateToStr = dateFormatter.parse(dimensionValue);
+          timeValue = dateToStr.getTime();
+          validateTimeStampRange(timeValue);
+          LOGGER.info("Parsed data with lenience as true, setting back to default mode");
+          return timeValue;
+        } catch (ParseException ex) {
+          LOGGER.info("Failed to parse data with lenience as true, setting back to default mode");
+          throw new NumberFormatException(ex.getMessage());
+        } finally {
+          dateFormatter.setLenient(false);
+        }
+      } else {
+        throw new NumberFormatException(e.getMessage());
+      }
+    }
+  }
+
+  private static void validateTimeStampRange(Long timeValue) {
+    if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE
+        || timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
+      throw new NumberFormatException(
+          "timestamp column data value: " + timeValue + "is not in valid range of: "
+              + DateDirectDictionaryGenerator.MIN_VALUE + " and "
+              + DateDirectDictionaryGenerator.MAX_VALUE);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 6848626..7a9d547 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -32,6 +32,7 @@
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_MV;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT;
@@ -153,6 +154,7 @@
       case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
       case ENABLE_AUTO_LOAD_MERGE:
       case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR:
+      case CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE:
       case CARBON_ENABLE_INDEX_SERVER:
       case CARBON_QUERY_STAGE_INPUT:
       case CARBON_ENABLE_MV:
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8074119..95c6941 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -55,7 +55,7 @@
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonUtil, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
@@ -816,10 +816,13 @@
       val partitionByRdd = keyRDD.partitionBy(
         new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))
 
+      val carbonSessionInfoBroadcast = sqlContext.sparkSession.sparkContext
+        .broadcast(ThreadLocalSessionInfo.getCarbonSessionInfo)
       // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
       val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
       partitionByRdd.map(_._2).mapPartitions { partition =>
+        ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfoBroadcast.value)
         ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()
         val segIdIndex = partitionId / segmentUpdateParallelism
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index 27abdc6..72cf525 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -19,6 +19,8 @@
 
 import java.io.File
 
+import scala.collection.mutable.ListBuffer
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
@@ -26,6 +28,7 @@
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.BadRecordUtil
 
 class BadRecordActionTest extends QueryTest {
 
@@ -270,6 +273,24 @@
     }
   }
 
+  test("test bad record FAIL with invalid timestamp range") {
+    val csvPath = s"$resourcesPath/badrecords/invalidTimeStampRange.csv"
+    val rows = new ListBuffer[Array[String]]
+    rows += Array("ID", "date", "time")
+    rows += Array("1", "2016-7-24", "342016-7-24 01:02:30")
+    BadRecordUtil.createCSV(rows, csvPath)
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    val exception = intercept[Exception] {
+      sql(s" LOAD DATA LOCAL INPATH '$resourcesPath/badrecords/invalidTimeStampRange.csv' " +
+          s"into table test_time options ('bad_records_action'='fail')")
+    }
+    assert(exception.getMessage.contains("Data load failed due to bad record: The value with column name time and column data" +
+        " type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know the detail reason"))
+    sql("DROP TABLE IF EXISTS test_time")
+    FileUtils.forceDelete(new File(csvPath))
+  }
 
   private def currentPath: String = {
     new File(this.getClass.getResource("/").getPath + "../../")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index 57c8f4b..319ad4e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -17,9 +17,13 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import java.io.File
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.util.TimeZone
 
+import scala.collection.mutable.ListBuffer
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
@@ -29,10 +33,15 @@
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.BadRecordUtil
 
 class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterAll {
 
+  val defaultTimeZone = TimeZone.getDefault
+  val csvPath = s"$resourcesPath/differentZoneTimeStamp.csv"
+
   override def beforeAll {
+    generateCSVFile()
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
 
@@ -306,7 +315,51 @@
     }
   }
 
+  test("test load, update data with setlenient carbon property for daylight " +
+       "saving time from different timezone") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE, "true")
+    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    sql(s" LOAD DATA LOCAL INPATH '$resourcesPath/differentZoneTimeStamp.csv' into table test_time")
+    sql(s"insert into test_time select 11, '2016-7-24', '1941-3-15 00:00:00' ")
+    sql("update test_time set (time) = ('1941-3-15 00:00:00') where ID='2'")
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 1"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 11"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 2"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    sql("DROP TABLE test_time")
+    CarbonProperties.getInstance().removeProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE)
+  }
+
+  test("test load, update data with setlenient session level property for daylight " +
+       "saving time from different timezone") {
+    sql("set carbon.load.dateformat.setlenient.enable = true")
+    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    sql(s" LOAD DATA LOCAL INPATH '$resourcesPath/differentZoneTimeStamp.csv' into table test_time")
+    sql(s"insert into test_time select 11, '2016-7-24', '1941-3-15 00:00:00' ")
+    sql("update test_time set (time) = ('1941-3-15 00:00:00') where ID='2'")
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 1"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 11"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 2"), Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    sql("DROP TABLE test_time")
+    defaultConfig()
+  }
+
+  def generateCSVFile(): Unit = {
+    val rows = new ListBuffer[Array[String]]
+    rows += Array("ID", "date", "time")
+    rows += Array("1", "1941-3-15", "1941-3-15 00:00:00")
+    rows += Array("2", "2016-7-24", "2016-7-24 01:02:30")
+    BadRecordUtil.createCSV(rows, csvPath)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
+    FileUtils.forceDelete(new File(csvPath))
+    TimeZone.setDefault(defaultTimeZone)
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
index cc7b152..3f4fa60 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
@@ -17,7 +17,9 @@
 
 package org.apache.carbondata.spark.util
 
-import java.io.{File, FileFilter}
+import java.io.{BufferedWriter, File, FileFilter, FileWriter}
+import scala.collection.mutable.ListBuffer
+import au.com.bytecode.opencsv.CSVWriter
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -68,4 +70,17 @@
     badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
   }
+
+  def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
+    val out = new BufferedWriter(new FileWriter(csvPath))
+    val writer: CSVWriter = new CSVWriter(out)
+    try {
+      for (row <- rows) {
+        writer.writeNext(row)
+      }
+    } finally {
+      out.close()
+      writer.close()
+    }
+  }
 }