[HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987)
Co-authored-by: Bhavani Sudha Saktheeswaran <bhavanisudhas@gmail.com>
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index 0209fe8..25a52fe 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -22,6 +22,7 @@
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -40,7 +41,6 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -61,7 +61,8 @@
private final TimeUnit timeUnit;
private final TimestampType timestampType;
private final String outputDateFormat;
- private DateTimeFormatter inputFormatter;
+ private transient Option<DateTimeFormatter> inputFormatter;
+ private transient DateTimeFormatter partitionFormatter;
private final HoodieDateTimeParser parser;
// TimeZone detailed settings reference
@@ -108,13 +109,8 @@
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
this.outputDateTimeZone = parser.getOutputDateTimeZone();
this.outputDateFormat = parser.getOutputDateFormat();
- this.inputFormatter = parser.getInputFormatter();
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
- if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
- this.inputFormatter = parser.getInputFormatter();
- }
-
switch (this.timestampType) {
case EPOCHMILLISECONDS:
timeUnit = MILLISECONDS;
@@ -147,17 +143,28 @@
}
/**
+ * The function takes care of lazily initialising dateTimeFormatter variables only once.
+ */
+ private void initIfNeeded() {
+ if (this.inputFormatter == null) {
+ this.inputFormatter = parser.getInputFormatter();
+ }
+ if (this.partitionFormatter == null) {
+ this.partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
+ if (this.outputDateTimeZone != null) {
+ partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
+ }
+ }
+ }
+
+ /**
* Parse and fetch partition path based on data type.
*
* @param partitionVal partition path object value fetched from record/row
* @return the parsed partition path based on data type
- * @throws ParseException on any parse exception
*/
- private String getPartitionPath(Object partitionVal) throws ParseException {
- DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
- if (this.outputDateTimeZone != null) {
- partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
- }
+ private String getPartitionPath(Object partitionVal) {
+ initIfNeeded();
long timeMs;
if (partitionVal instanceof Double) {
timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
@@ -166,13 +173,16 @@
} else if (partitionVal instanceof Long) {
timeMs = convertLongTimeToMillis((Long) partitionVal);
} else if (partitionVal instanceof CharSequence) {
- DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
+ if (!inputFormatter.isPresent()) {
+ throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+ }
+ DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
if (this.outputDateTimeZone == null) {
// Use the timezone that came off the date that was passed in, if it had one
partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
}
- timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
+ timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
} else {
throw new HoodieNotSupportedException(
"Unexpected type for partition field: " + partitionVal.getClass().getName());
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
index 3550193..6612f4c 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
@@ -17,10 +17,13 @@
package org.apache.hudi.keygen.parser;
+import org.apache.hudi.common.util.Option;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
-public interface HoodieDateTimeParser {
+import java.io.Serializable;
+
+public interface HoodieDateTimeParser extends Serializable {
/**
* Returns the output date format in which the partition paths will be created for the hudi dataset.
@@ -32,7 +35,7 @@
* Returns input formats in which datetime based values might be coming in incoming records.
* @return
*/
- DateTimeFormatter getInputFormatter();
+ Option<DateTimeFormatter> getInputFormatter();
/**
* Returns the datetime zone one should expect the incoming values into.
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 933e1af..11790cb 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -19,6 +19,7 @@
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
import org.joda.time.DateTimeZone;
@@ -37,7 +38,6 @@
private String configInputDateFormatList;
private final String configInputDateFormatDelimiter;
private final TypedProperties config;
- private DateTimeFormatter inputFormatter;
// TimeZone detailed settings reference
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
@@ -48,14 +48,6 @@
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
this.inputDateTimeZone = getInputDateTimeZone();
this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
-
- TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
- if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
- DataSourceUtils.checkRequiredProperties(config,
- Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
- this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
- inputFormatter = getInputDateFormatter();
- }
}
private String getConfigInputDateFormatDelimiter() {
@@ -94,8 +86,16 @@
}
@Override
- public DateTimeFormatter getInputFormatter() {
- return this.inputFormatter;
+ public Option<DateTimeFormatter> getInputFormatter() {
+ TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
+ if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
+ DataSourceUtils.checkRequiredProperties(config,
+ Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
+ this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
+ return Option.of(getInputDateFormatter());
+ }
+
+ return Option.empty();
}
@Override