[HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987)


Co-authored-by: Bhavani Sudha Saktheeswaran <bhavanisudhas@gmail.com>
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index 0209fe8..25a52fe 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -40,7 +41,6 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -61,7 +61,8 @@
   private final TimeUnit timeUnit;
   private final TimestampType timestampType;
   private final String outputDateFormat;
-  private DateTimeFormatter inputFormatter;
+  private transient Option<DateTimeFormatter> inputFormatter;
+  private transient DateTimeFormatter partitionFormatter;
   private final HoodieDateTimeParser parser;
 
   // TimeZone detailed settings reference
@@ -108,13 +109,8 @@
     this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
     this.outputDateTimeZone = parser.getOutputDateTimeZone();
     this.outputDateFormat = parser.getOutputDateFormat();
-    this.inputFormatter = parser.getInputFormatter();
     this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
 
-    if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
-      this.inputFormatter = parser.getInputFormatter();
-    }
-
     switch (this.timestampType) {
       case EPOCHMILLISECONDS:
         timeUnit = MILLISECONDS;
@@ -147,17 +143,28 @@
   }
 
   /**
+   * The function takes care of lazily initialising dateTimeFormatter variables only once.
+   */
+  private void initIfNeeded() {
+    if (this.inputFormatter == null) {
+      this.inputFormatter = parser.getInputFormatter();
+    }
+    if (this.partitionFormatter == null) {
+      this.partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
+      if (this.outputDateTimeZone != null) {
+        partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
+      }
+    }
+  }
+
+  /**
    * Parse and fetch partition path based on data type.
    *
    * @param partitionVal partition path object value fetched from record/row
    * @return the parsed partition path based on data type
-   * @throws ParseException on any parse exception
    */
-  private String getPartitionPath(Object partitionVal) throws ParseException {
-    DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
-    if (this.outputDateTimeZone != null) {
-      partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
-    }
+  private String getPartitionPath(Object partitionVal) {
+    initIfNeeded();
     long timeMs;
     if (partitionVal instanceof Double) {
       timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
@@ -166,13 +173,16 @@
     } else if (partitionVal instanceof Long) {
       timeMs = convertLongTimeToMillis((Long) partitionVal);
     } else if (partitionVal instanceof CharSequence) {
-      DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
+      if (!inputFormatter.isPresent()) {
+        throw new HoodieException("Missing inputformatter. Ensure " +  Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+      }
+      DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
       if (this.outputDateTimeZone == null) {
         // Use the timezone that came off the date that was passed in, if it had one
         partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
       }
 
-      timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
+      timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
     } else {
       throw new HoodieNotSupportedException(
           "Unexpected type for partition field: " + partitionVal.getClass().getName());
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
index 3550193..6612f4c 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
@@ -17,10 +17,13 @@
 
 package org.apache.hudi.keygen.parser;
 
+import org.apache.hudi.common.util.Option;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormatter;
 
-public interface HoodieDateTimeParser {
+import java.io.Serializable;
+
+public interface HoodieDateTimeParser extends Serializable {
 
   /**
    * Returns the output date format in which the partition paths will be created for the hudi dataset.
@@ -32,7 +35,7 @@
    * Returns input formats in which datetime based values might be coming in incoming records.
    * @return
    */
-  DateTimeFormatter getInputFormatter();
+  Option<DateTimeFormatter> getInputFormatter();
 
   /**
    * Returns the datetime zone one should expect the incoming values into.
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 933e1af..11790cb 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -19,6 +19,7 @@
 
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
 import org.joda.time.DateTimeZone;
@@ -37,7 +38,6 @@
   private String configInputDateFormatList;
   private final String configInputDateFormatDelimiter;
   private final TypedProperties config;
-  private DateTimeFormatter inputFormatter;
 
   // TimeZone detailed settings reference
   // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
@@ -48,14 +48,6 @@
     DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
     this.inputDateTimeZone = getInputDateTimeZone();
     this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
-
-    TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
-    if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
-      DataSourceUtils.checkRequiredProperties(config,
-          Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
-      this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
-      inputFormatter = getInputDateFormatter();
-    }
   }
 
   private String getConfigInputDateFormatDelimiter() {
@@ -94,8 +86,16 @@
   }
 
   @Override
-  public DateTimeFormatter getInputFormatter() {
-    return this.inputFormatter;
+  public Option<DateTimeFormatter> getInputFormatter() {
+    TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
+    if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
+      DataSourceUtils.checkRequiredProperties(config,
+          Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
+      this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
+      return Option.of(getInputDateFormatter());
+    }
+
+    return Option.empty();
   }
 
   @Override