NIFI-8768 Added toLocalDate() for convertType() handling of DATE fields
- Updated PutKudu to use DataTypeUtils.toLocalDate() for DATE fields
- Updated PutDatabaseRecord to remove convertDateToLocalTZ() since convertType() uses toLocalDate()
- Updated PutElasticsearchHttpRecord to use default time zone format for DATE fields
- Updated WriteXMLResult to use default time zone format for DATE fields
- Updated WriteJsonResult to use default time zone format for DATE fields
- Updated AvroTypeUtil to use toLocalDate() for logical DATE fields
- Updated JdbcCommon to avoid conversion to UTC for logical DATE fields
- Updated Processor and RecordReader unit tests for consistency in DATE comparison
Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #5210
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 20fc472..3331d81 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -50,9 +50,11 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
+import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -110,7 +112,7 @@
private static final Pattern FLOATING_POINT_PATTERN = Pattern.compile(doubleRegex);
private static final Pattern DECIMAL_PATTERN = Pattern.compile(decimalRegex);
- private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
+ private static final TimeZone GMT_TIME_ZONE = TimeZone.getTimeZone("GMT");
private static final Supplier<DateFormat> DEFAULT_DATE_FORMAT = () -> getDateFormat(RecordFieldType.DATE.getDefaultFormat());
private static final Supplier<DateFormat> DEFAULT_TIME_FORMAT = () -> getDateFormat(RecordFieldType.TIME.getDefaultFormat());
@@ -189,7 +191,7 @@
case CHAR:
return toCharacter(value, fieldName);
case DATE:
- return toDate(value, dateFormat, fieldName);
+ return convertTypeToDate(value, dateFormat, fieldName);
case DECIMAL:
return toBigDecimal(value, fieldName);
case DOUBLE:
@@ -1108,6 +1110,133 @@
}
/**
+ * Get Date Time Formatter using Zone Identifier
+ *
+ * @param pattern Date Format Pattern
+ * @param zoneId Time Zone Identifier
+ * @return Date Time Formatter or null when provided pattern is null
+ */
+ public static DateTimeFormatter getDateTimeFormatter(final String pattern, final ZoneId zoneId) {
+ if (pattern == null || zoneId == null) {
+ return null;
+ }
+ return DateTimeFormatter.ofPattern(pattern).withZone(zoneId);
+ }
+
+ /**
+ * Convert value to Local Date with support for conversion from numbers or formatted strings
+ *
+ * @param value Value to be converted
+ * @param formatter Supplier for Date Time Formatter can be null when string parsing is not necessary
+ * @param fieldName Field Name for value to be converted
+ * @return Local Date or null when value to be converted is null
+ * @throws IllegalTypeConversionException Thrown when conversion from string fails or unsupported value provided
+ */
+ public static LocalDate toLocalDate(final Object value, final Supplier<DateTimeFormatter> formatter, final String fieldName) {
+ LocalDate localDate;
+
+ if (value == null) {
+ return null;
+ } else if (value instanceof LocalDate) {
+ localDate = (LocalDate) value;
+ } else if (value instanceof java.sql.Date) {
+ final java.sql.Date date = (java.sql.Date) value;
+ localDate = date.toLocalDate();
+ } else if (value instanceof java.util.Date) {
+ final java.util.Date date = (java.util.Date) value;
+ localDate = parseLocalDateEpochMillis(date.getTime());
+ } else if (value instanceof Number) {
+ final long epochMillis = ((Number) value).longValue();
+ localDate = parseLocalDateEpochMillis(epochMillis);
+ } else if (value instanceof String) {
+ try {
+ localDate = parseLocalDate((String) value, formatter);
+ } catch (final RuntimeException e) {
+ final String message = String.format("Failed Conversion of Field [%s] from String [%s] to LocalDate", fieldName, value);
+ throw new IllegalTypeConversionException(message, e);
+ }
+ } else {
+ final String message = String.format("Failed Conversion of Field [%s] from Value [%s] Type [%s] to LocalDate", fieldName, value, value.getClass());
+ throw new IllegalTypeConversionException(message);
+ }
+
+ return localDate;
+ }
+
+ /**
+ * Convert value to java.sql.Date using java.time.LocalDate parsing and conversion from DateFormat to DateTimeFormatter
+ *
+ * Transitional method supporting conversion from legacy java.text.DateFormat to java.time.DateTimeFormatter
+ *
+ * @param value Value object to be converted
+ * @param format Supplier function for java.text.DateFormat when necessary for parsing
+ * @param fieldName Field name being parsed
+ * @return java.sql.Date or null when value is null
+ */
+ private static Date convertTypeToDate(final Object value, final Supplier<DateFormat> format, final String fieldName) {
+ if (value == null) {
+ return null;
+ } else {
+ final LocalDate localDate = toLocalDate(value, () -> {
+ final SimpleDateFormat dateFormat = (SimpleDateFormat) format.get();
+ return dateFormat == null ? null : DateTimeFormatter.ofPattern(dateFormat.toPattern());
+ }, fieldName);
+ return Date.valueOf(localDate);
+ }
+ }
+
+ /**
+ * Parse Local Date from String using Date Time Formatter when supplied
+ *
+ * @param value String not null containing either formatted string or number of epoch milliseconds
+ * @param formatter Supplier for Date Time Formatter
+ * @return Local Date or null when provided value is empty
+ */
+ private static LocalDate parseLocalDate(final String value, final Supplier<DateTimeFormatter> formatter) {
+ LocalDate localDate = null;
+
+ final String normalized = value.trim();
+ if (!normalized.isEmpty()) {
+ if (formatter == null) {
+ localDate = parseLocalDateEpochMillis(normalized);
+ } else {
+ final DateTimeFormatter dateTimeFormatter = formatter.get();
+ if (dateTimeFormatter == null) {
+ localDate = parseLocalDateEpochMillis(normalized);
+ } else {
+ localDate = LocalDate.parse(normalized, dateTimeFormatter);
+ }
+ }
+ }
+
+ return localDate;
+ }
+
+
+ /**
+ * Parse Local Date from string expected to contain number of epoch milliseconds
+ *
+ * @param number Number string expected to contain epoch milliseconds
+ * @return Local Date converted from epoch milliseconds
+ */
+ private static LocalDate parseLocalDateEpochMillis(final String number) {
+ final long epochMillis = Long.parseLong(number);
+ return parseLocalDateEpochMillis(epochMillis);
+ }
+
+ /**
+ * Parse Local Date from epoch milliseconds using System Default Zone Offset
+ *
+ * @param epochMillis Epoch milliseconds
+ * @return Local Date converted from epoch milliseconds
+ */
+ private static LocalDate parseLocalDateEpochMillis(final long epochMillis) {
+ final Instant instant = Instant.ofEpochMilli(epochMillis);
+ final ZonedDateTime zonedDateTime = instant.atZone(ZoneOffset.systemDefault());
+ return zonedDateTime.toLocalDate();
+ }
+
+ /**
* Converts a java.sql.Date object in local time zone (typically coming from a java.sql.ResultSet and having 00:00:00 time part)
* to UTC normalized form (storing the epoch corresponding to the UTC time with the same date/time as the input).
*
@@ -1120,19 +1249,6 @@
return new Date(zdtUTC.toInstant().toEpochMilli());
}
- /**
- * Converts a java.sql.Date object in UTC normalized form
- * to local time zone (storing the epoch corresponding to the local time with the same date/time as the input).
- *
- * @param dateUTC java.sql.Date in UTC normalized form
- * @return java.sql.Date in local time zone
- */
- public static Date convertDateToLocalTZ(Date dateUTC) {
- ZonedDateTime zdtUTC = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneOffset.UTC);
- ZonedDateTime zdtLocalTZ = zdtUTC.withZoneSameLocal(ZoneId.systemDefault());
- return new Date(zdtLocalTZ.toInstant().toEpochMilli());
- }
-
public static boolean isDateTypeCompatible(final Object value, final String format) {
if (value == null) {
return false;
@@ -1212,22 +1328,42 @@
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName);
}
- public static DateFormat getDateFormat(final String format) {
- if (format == null) {
+ /**
+ * Get Date Format using GMT Time Zone
+ *
+ * This Date Format can produce unexpected results when the system default Time Zone is not GMT
+ *
+ * @param pattern Date Format Pattern used for new SimpleDateFormat()
+ * @return Date Format or null when pattern not provided
+ */
+ public static DateFormat getDateFormat(final String pattern) {
+ if (pattern == null) {
return null;
}
- final DateFormat df = new SimpleDateFormat(format);
- df.setTimeZone(gmt);
- return df;
+ return getDateFormat(pattern, GMT_TIME_ZONE);
}
- public static DateFormat getDateFormat(final String format, final String timezoneID) {
- if (format == null || timezoneID == null) {
+ /**
+ * Get Date Format using specified Time Zone to adjust Date during processing
+ *
+ * @param pattern Date Format Pattern used for new SimpleDateFormat()
+ * @param timeZoneId Time Zone Identifier used for TimeZone.getTimeZone()
+ * @return Date Format or null when input parameters not provided
+ */
+ public static DateFormat getDateFormat(final String pattern, final String timeZoneId) {
+ if (pattern == null || timeZoneId == null) {
return null;
}
- final DateFormat df = new SimpleDateFormat(format);
- df.setTimeZone(TimeZone.getTimeZone(timezoneID));
- return df;
+ return getDateFormat(pattern, TimeZone.getTimeZone(timeZoneId));
+ }
+
+ private static DateFormat getDateFormat(final String pattern, final TimeZone timeZone) {
+ if (pattern == null) {
+ return null;
+ }
+ final DateFormat dateFormat = new SimpleDateFormat(pattern);
+ dateFormat.setTimeZone(timeZone);
+ return dateFormat;
}
public static boolean isTimeTypeCompatible(final Object value, final String format) {
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index a1aae73..2115e67 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -30,10 +30,15 @@
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.Types;
+import java.text.DateFormat;
import java.time.Instant;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -42,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.function.Function;
@@ -56,6 +62,16 @@
import static org.junit.Assert.assertTrue;
public class TestDataTypeUtils {
+ private static final ZoneId SYSTEM_DEFAULT_ZONE_ID = ZoneOffset.systemDefault();
+
+ private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01";
+
+ private static final String CUSTOM_MONTH_DAY_YEAR = "01-01-2000";
+
+ private static final String CUSTOM_MONTH_DAY_YEAR_PATTERN = "MM-dd-yyyy";
+
+ private static final String DATE_FIELD = "date";
+
/**
* This is a unit test to verify conversion java Date objects to Timestamps. Support for this was
* required in order to help the MongoDB packages handle date/time logical types in the Record API.
@@ -923,23 +939,73 @@
assertEquals(0, zdt.getNano());
}
+ /**
+ * Convert String to java.sql.Date using implicit default DateFormat with GMT Time Zone
+ *
+ * Running this method on a system with a time zone other than GMT should return the same year-month-day
+ */
@Test
- public void testConvertDateToLocalTZ() {
- int year = 2021;
- int month = 1;
- int dayOfMonth = 25;
+ public void testConvertTypeStringToDateDefaultTimeZoneFormat() {
+ final Object converted = DataTypeUtils.convertType(ISO_8601_YEAR_MONTH_DAY, RecordFieldType.DATE.getDataType(), DATE_FIELD);
+ assertTrue("Converted value is not java.sql.Date", converted instanceof java.sql.Date);
+ assertEquals(ISO_8601_YEAR_MONTH_DAY, converted.toString());
+ }
- Date dateUTC = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.of("UTC")).toInstant().toEpochMilli());
+ /**
+ * Convert String to java.sql.Date using custom pattern DateFormat with configured GMT Time Zone
+ */
+ @Test
+ public void testConvertTypeStringToDateConfiguredTimeZoneFormat() {
+ final DateFormat dateFormat = DataTypeUtils.getDateFormat(CUSTOM_MONTH_DAY_YEAR_PATTERN, "GMT");
+ final Object converted = DataTypeUtils.convertType(CUSTOM_MONTH_DAY_YEAR, RecordFieldType.DATE.getDataType(), () -> dateFormat, null, null,"date");
+ assertTrue("Converted value is not java.sql.Date", converted instanceof java.sql.Date);
+ assertEquals(ISO_8601_YEAR_MONTH_DAY, converted.toString());
+ }
- Date dateLocalTZ = DataTypeUtils.convertDateToLocalTZ(dateUTC);
+ /**
+ * Convert String to java.sql.Date using custom pattern DateFormat with system default Time Zone
+ */
+ @Test
+ public void testConvertTypeStringToDateConfiguredSystemDefaultTimeZoneFormat() {
+ final DateFormat dateFormat = DataTypeUtils.getDateFormat(CUSTOM_MONTH_DAY_YEAR_PATTERN, TimeZone.getDefault().getID());
+ final Object converted = DataTypeUtils.convertType(CUSTOM_MONTH_DAY_YEAR, RecordFieldType.DATE.getDataType(), () -> dateFormat, null, null,"date");
+ assertTrue("Converted value is not java.sql.Date", converted instanceof java.sql.Date);
+ assertEquals(ISO_8601_YEAR_MONTH_DAY, converted.toString());
+ }
- ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
- assertEquals(year, zdt.getYear());
- assertEquals(month, zdt.getMonthValue());
- assertEquals(dayOfMonth, zdt.getDayOfMonth());
- assertEquals(0, zdt.getHour());
- assertEquals(0, zdt.getMinute());
- assertEquals(0, zdt.getSecond());
- assertEquals(0, zdt.getNano());
+ @Test
+ public void testToLocalDateFromString() {
+ assertToLocalDateEquals(ISO_8601_YEAR_MONTH_DAY, ISO_8601_YEAR_MONTH_DAY);
+ }
+
+ @Test
+ public void testToLocalDateFromSqlDate() {
+ assertToLocalDateEquals(ISO_8601_YEAR_MONTH_DAY, java.sql.Date.valueOf(ISO_8601_YEAR_MONTH_DAY));
+ }
+
+ @Test
+ public void testToLocalDateFromUtilDate() {
+ final LocalDate localDate = LocalDate.parse(ISO_8601_YEAR_MONTH_DAY);
+ final long epochMillis = toEpochMilliSystemDefaultZone(localDate);
+ assertToLocalDateEquals(ISO_8601_YEAR_MONTH_DAY, new java.util.Date(epochMillis));
+ }
+
+ @Test
+ public void testToLocalDateFromNumberEpochMillis() {
+ final LocalDate localDate = LocalDate.parse(ISO_8601_YEAR_MONTH_DAY);
+ final long epochMillis = toEpochMilliSystemDefaultZone(localDate);
+ assertToLocalDateEquals(ISO_8601_YEAR_MONTH_DAY, epochMillis);
+ }
+
+ private long toEpochMilliSystemDefaultZone(final LocalDate localDate) {
+ final LocalTime localTime = LocalTime.of(0, 0);
+ final Instant instantSystemDefaultZone = ZonedDateTime.of(localDate, localTime, SYSTEM_DEFAULT_ZONE_ID).toInstant();
+ return instantSystemDefaultZone.toEpochMilli();
+ }
+
+ private void assertToLocalDateEquals(final String expected, final Object value) {
+ final DateTimeFormatter systemDefaultZoneFormatter = DataTypeUtils.getDateTimeFormatter(RecordFieldType.DATE.getDefaultFormat(), SYSTEM_DEFAULT_ZONE_ID);
+ final LocalDate localDate = DataTypeUtils.toLocalDate(value, () -> systemDefaultZoneFormatter, DATE_FIELD);
+ assertEquals(String.format("Value Class [%s] to LocalDate not matched", value.getClass()), expected, localDate.toString());
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index fd28b49..b9f2e27 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -78,6 +78,7 @@
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -645,7 +646,8 @@
switch (chosenDataType.getFieldType()) {
case DATE: {
- final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
+ // Use SimpleDateFormat with system default time zone for string conversion
+ final String stringValue = DataTypeUtils.toString(coercedValue, () -> new SimpleDateFormat(dateFormat));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index bea9995..c5a621b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -818,7 +818,7 @@
parser.addSchemaField("amount", RecordFieldType.DECIMAL);
for(int i=1; i<=numRecords; i++) {
- parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
+ parser.addRecord(i, "reç" + i, 100 + i, Date.valueOf("2018-12-20"), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index ed22b69..9862383 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -398,7 +398,14 @@
} else {
rec.put(i-1, value);
}
-
+ } else if (javaSqlType == DATE) {
+ if (options.useLogicalTypes) {
+ // Handle SQL DATE fields using system default time zone without conversion
+ rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
+ } else {
+ // As string for backward compatibility.
+ rec.put(i - 1, value.toString());
+ }
} else if (value instanceof java.sql.Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
index d4e5e41..c72ff8e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
@@ -62,22 +62,12 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAccessor;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -751,7 +741,7 @@
testConvertToAvroStreamForDateTime(options,
(record, date) -> {
- final int expectedDaysSinceEpoch = (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date.toLocalDate());
+ final int expectedDaysSinceEpoch = (int) date.toLocalDate().toEpochDay();
final int actualDaysSinceEpoch = (int) record.get("date");
LOGGER.debug("comparing days since epoch, expecting '{}', actual '{}'", expectedDaysSinceEpoch, actualDaysSinceEpoch);
assertEquals(expectedDaysSinceEpoch, actualDaysSinceEpoch);
@@ -781,48 +771,22 @@
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
- // create a ZonedDateTime (UTC) given a formatting pattern and a date/time string
- BiFunction<String, String, ZonedDateTime> toZonedDateTime = (format, dateStr) -> {
- DateTimeFormatterBuilder dateTimeFormatterBuilder = new DateTimeFormatterBuilder().appendPattern(format);
- TemporalAccessor temporalAccessor = DateTimeFormatter.ofPattern(format).parse(dateStr);
- if (!temporalAccessor.isSupported(ChronoField.EPOCH_DAY)) {
- ZonedDateTime utcNow = LocalDateTime.now().atZone(ZoneId.systemDefault());
- dateTimeFormatterBuilder.parseDefaulting(ChronoField.DAY_OF_MONTH, utcNow.getDayOfMonth())
- .parseDefaulting(ChronoField.MONTH_OF_YEAR, utcNow.getMonthValue())
- .parseDefaulting(ChronoField.YEAR, utcNow.getYear());
-
- }
- if (!temporalAccessor.isSupported(ChronoField.MILLI_OF_SECOND)) {
- dateTimeFormatterBuilder.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
- .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
- .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0);
- }
- DateTimeFormatter formatter = dateTimeFormatterBuilder.toFormatter();
- LocalDateTime dateTime = LocalDateTime.parse(dateStr, formatter);
- ZonedDateTime zonedDateTime = dateTime.atZone(ZoneOffset.UTC).withZoneSameInstant(ZoneOffset.UTC);
- LOGGER.debug("calculated ZonedDateTime '{}' from format '{}', date/time string '{}'", zonedDateTime, format, dateStr);
- return zonedDateTime;
- };
-
when(metadata.getColumnCount()).thenReturn(3);
when(metadata.getTableName(anyInt())).thenReturn("table");
when(metadata.getColumnType(1)).thenReturn(Types.DATE);
when(metadata.getColumnName(1)).thenReturn("date");
- ZonedDateTime parsedDate = toZonedDateTime.apply("yyyy/MM/dd", "2017/05/10");
- final java.sql.Date date = java.sql.Date.valueOf(parsedDate.toLocalDate());
+ final java.sql.Date date = java.sql.Date.valueOf("2017-05-10");
when(rs.getObject(1)).thenReturn(date);
when(metadata.getColumnType(2)).thenReturn(Types.TIME);
when(metadata.getColumnName(2)).thenReturn("time");
- ZonedDateTime parsedTime = toZonedDateTime.apply("HH:mm:ss.SSS", "12:34:56.789");
- final Time time = Time.valueOf(parsedTime.toLocalTime());
+ final Time time = Time.valueOf("12:34:56");
when(rs.getObject(2)).thenReturn(time);
when(metadata.getColumnType(3)).thenReturn(Types.TIMESTAMP);
when(metadata.getColumnName(3)).thenReturn("timestamp");
- ZonedDateTime parsedDateTime = toZonedDateTime.apply("yyyy/MM/dd HH:mm:ss.SSS", "2017/05/11 19:59:39.123");
- final Timestamp timestamp = Timestamp.valueOf(parsedDateTime.toLocalDateTime());
+ final Timestamp timestamp = Timestamp.valueOf("2017-05-11 19:59:39");
when(rs.getObject(3)).thenReturn(timestamp);
final AtomicInteger counter = new AtomicInteger(1);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 6003e19..286b418 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -62,7 +62,8 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
-import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.ArrayList;
@@ -680,8 +681,8 @@
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
- final java.sql.Date date = DataTypeUtils.toDate(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
- return (int) ChronoUnit.DAYS.between(Instant.EPOCH, Instant.ofEpochMilli(date.getTime()));
+ final LocalDate localDate = DataTypeUtils.toLocalDate(rawValue, () -> DataTypeUtils.getDateTimeFormatter(format, ZoneId.systemDefault()), fieldName);
+ return (int) localDate.toEpochDay();
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final Time time = DataTypeUtils.toTime(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
@@ -1032,7 +1033,7 @@
final String logicalName = logicalType.getName();
if (LOGICAL_TYPE_DATE.equals(logicalName)) {
// date logical name means that the value is number of days since Jan 1, 1970
- return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
+ return java.sql.Date.valueOf(LocalDate.ofEpochDay((int) value));
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) {
// time-millis logical name means that the value is number of milliseconds since midnight.
return new java.sql.Time((int) value);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index b0e4a23..bc2e65f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -52,13 +52,11 @@
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -521,17 +519,15 @@
@Test
public void testDateConversion() {
- final Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- c.set(2019, Calendar.JANUARY, 1, 0, 0, 0);
- c.set(Calendar.MILLISECOND, 0);
- final long epochMillis = c.getTimeInMillis();
+ final String date = "2019-01-01";
final LogicalTypes.Date dateType = LogicalTypes.date();
final Schema fieldSchema = Schema.create(Type.INT);
dateType.addToSchema(fieldSchema);
- final Object convertedValue = AvroTypeUtil.convertToAvroObject(new Date(epochMillis), fieldSchema);
+ final Object convertedValue = AvroTypeUtil.convertToAvroObject(Date.valueOf(date), fieldSchema);
assertTrue(convertedValue instanceof Integer);
- assertEquals(LocalDate.of(2019, 1, 1).toEpochDay(), (int) convertedValue);
+ final int epochDay = (int) LocalDate.parse(date).toEpochDay();
+ assertEquals(epochDay, convertedValue);
}
@Test
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 774430e..8724254 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -57,8 +57,8 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -427,17 +427,8 @@
* @return Date object or null when value is null
*/
private Date getDate(final Object value, final String recordFieldName, final String format) {
- return DataTypeUtils.toDate(value, () -> getDateFormat(format), recordFieldName);
- }
-
- /**
- * Get Date Format using Date Record Field default pattern and system time zone to avoid unnecessary conversion
- *
- * @param format Date Format Pattern
- * @return Date Format used to parsing date fields
- */
- private DateFormat getDateFormat(final String format) {
- return new SimpleDateFormat(format);
+ final LocalDate localDate = DataTypeUtils.toLocalDate(value, () -> DataTypeUtils.getDateTimeFormatter(format, ZoneId.systemDefault()), recordFieldName);
+ return Date.valueOf(localDate);
}
/**
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 1d397d9..f02917f 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -62,6 +62,8 @@
import java.io.InputStream;
import java.math.BigDecimal;
import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -78,7 +80,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -89,6 +91,10 @@
public static final String SKIP_HEAD_LINE = "false";
public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal,dateVal";
+ private static final String DATE_FIELD = "created";
+ private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01";
+ private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd";
+
private TestRunner testRunner;
private MockPutKudu processor;
@@ -98,13 +104,13 @@
private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
@Before
- public void setUp() throws InitializationException {
+ public void setUp() {
processor = new MockPutKudu();
testRunner = TestRunners.newTestRunner(processor);
setUpTestRunner(testRunner);
}
- private void setUpTestRunner(TestRunner testRunner) throws InitializationException {
+ private void setUpTestRunner(TestRunner testRunner) {
testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
@@ -130,7 +136,7 @@
readerFactory.addSchemaField("dateVal", RecordFieldType.DATE);
for (int i = 0; i < numOfRecord; i++) {
readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i,
- new BigDecimal(111.111D).add(BigDecimal.valueOf(i)), today);
+ new BigDecimal("111.111").add(BigDecimal.valueOf(i)), today);
}
testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -164,7 +170,7 @@
}
@Test
- public void testWriteKuduWithDefaults() throws IOException, InitializationException {
+ public void testWriteKuduWithDefaults() throws InitializationException {
createRecordReader(100);
final String filename = "testWriteKudu-" + System.currentTimeMillis();
@@ -251,7 +257,7 @@
}
@Test
- public void testValidSchemaShouldBeSuccessful() throws InitializationException, IOException {
+ public void testValidSchemaShouldBeSuccessful() throws InitializationException {
createRecordReader(10);
final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis();
@@ -266,9 +272,8 @@
}
@Test
- public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException, IOException {
- // given
- processor.setTableSchema(new Schema(Arrays.asList()));
+ public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException {
+ processor.setTableSchema(new Schema(Collections.emptyList()));
createRecordReader(5);
final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis();
@@ -278,10 +283,8 @@
testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
testRunner.enqueue("trigger", flowFileAttributes);
- // when
testRunner.run();
- // then
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
}
@@ -311,7 +314,7 @@
}
@Test
- public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
+ public void testReadAsStringAndWriteAsInt() throws InitializationException {
createRecordReader(0);
// add the favorite color as a string
readerFactory.addRecord(1, "name0", "0", "89.89", "111.111", today);
@@ -327,11 +330,11 @@
}
@Test
- public void testMissingColumInReader() throws InitializationException, IOException {
+ public void testMissingColumnInReader() throws InitializationException {
createRecordReader(0);
readerFactory.addRecord("name0", "0", "89.89"); //missing id
- final String filename = "testMissingColumInReader-" + System.currentTimeMillis();
+ final String filename = "testMissingColumnInReader-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
@@ -345,7 +348,7 @@
@Test
public void testInsertManyFlowFiles() throws Exception {
createRecordReader(50);
- final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"valu11\" }";
+ final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }";
final String content2 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }";
final String content3 = "{ \"field1\" : \"value3\", \"field2\" : \"value33\" }";
@@ -475,33 +478,45 @@
// Comparing string representations of dates, because java.sql.Date does not override
// java.util.Date.equals method and therefore compares milliseconds instead of
// comparing dates, even though java.sql.Date is supposed to ignore time
- Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today.toString(), row.getDate("sql_date").toString()),
+ Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today, row.getDate("sql_date").toString()),
row.getDate("sql_date").toString(), today.toString());
}
@Test
- public void testBuildPartialRowWithDateString() {
- final String dateFieldName = "created";
- final String dateFieldValue = "2000-01-01";
+ public void testBuildPartialRowWithDateDefaultTimeZone() throws ParseException {
+ final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_YEAR_MONTH_DAY_PATTERN);
+ final java.util.Date dateFieldValue = dateFormat.parse(ISO_8601_YEAR_MONTH_DAY);
+ assertPartialRowDateFieldEquals(dateFieldValue);
+ }
+
+ @Test
+ public void testBuildPartialRowWithDateString() {
+ assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
+ }
+
+ private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
+ final PartialRow row = buildPartialRowDateField(dateFieldValue);
+ final java.sql.Date rowDate = row.getDate(DATE_FIELD);
+ assertEquals("Partial Row Date Field not matched", ISO_8601_YEAR_MONTH_DAY, rowDate.toString());
+ }
+
+ private PartialRow buildPartialRowDateField(final Object dateFieldValue) {
final Schema kuduSchema = new Schema(Collections.singletonList(
- new ColumnSchema.ColumnSchemaBuilder(dateFieldName, Type.DATE).nullable(true).build()
+ new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, Type.DATE).nullable(true).build()
));
final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
- new RecordField(dateFieldName, RecordFieldType.DATE.getDataType())
+ new RecordField(DATE_FIELD, RecordFieldType.DATE.getDataType())
));
final Map<String, Object> values = new HashMap<>();
- values.put(dateFieldName, dateFieldValue);
+ values.put(DATE_FIELD, dateFieldValue);
final MapRecord record = new MapRecord(schema, values);
final PartialRow row = kuduSchema.newPartialRow();
-
processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true);
-
- final java.sql.Date rowDate = row.getDate(dateFieldName);
- assertEquals("Partial Row Date Field not matched", dateFieldValue, rowDate.toString());
+ return row;
}
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 4626766..bc8a1f0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -67,7 +67,6 @@
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -758,11 +757,6 @@
}
}
- if (sqlType == Types.DATE && currentValue instanceof Date) {
- // convert Date from the internal UTC normalized form to local time zone needed by database drivers
- currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
- }
-
// If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 623e8d1..d7817ff 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -248,14 +248,12 @@
parser.addSchemaField("dt", RecordFieldType.DATE)
LocalDate testDate1 = LocalDate.of(2021, 1, 26)
- Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26)
- Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in URC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
- parser.addRecord(1, 'rec1', 101, nifiDate1)
- parser.addRecord(2, 'rec2', 102, nifiDate2)
+ parser.addRecord(1, 'rec1', 101, jdbcDate1)
+ parser.addRecord(2, 'rec2', 102, jdbcDate2)
parser.addRecord(3, 'rec3', 103, null)
parser.addRecord(4, 'rec4', 104, null)
parser.addRecord(5, null, 105, null)
@@ -275,12 +273,12 @@
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
- assertEquals(jdbcDate1, rs.getDate(4))
+ assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3))
- assertEquals(jdbcDate2, rs.getDate(4))
+ assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
@@ -314,14 +312,12 @@
parser.addSchemaField("dt", RecordFieldType.DATE)
LocalDate testDate1 = LocalDate.of(2021, 1, 26)
- Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26)
- Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in URC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
- parser.addRecord(1, 'rec1', nifiDate1)
- parser.addRecord(2, 'rec2', nifiDate2)
+ parser.addRecord(1, 'rec1', jdbcDate1)
+ parser.addRecord(2, 'rec2', jdbcDate2)
parser.addRecord(3, 'rec3', null)
parser.addRecord(4, 'rec4', null)
parser.addRecord(5, null, null)
@@ -342,12 +338,12 @@
assertEquals('rec1', rs.getString(2))
// Zero value because of the constraint
assertEquals(0, rs.getInt(3))
- assertEquals(jdbcDate1, rs.getDate(4))
+ assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(0, rs.getInt(3))
- assertEquals(jdbcDate2, rs.getDate(4))
+ assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
@@ -1274,11 +1270,12 @@
parser.addSchemaField("dt", RecordFieldType.BIGINT)
LocalDate testDate1 = LocalDate.of(2021, 1, 26)
- BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+ BigInteger nifiDate1 = jdbcDate1.getTime() // in local TZ
+
LocalDate testDate2 = LocalDate.of(2021, 7, 26)
- BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+ BigInteger nifiDate2 = jdbcDate2.getTime() // in local TZ
parser.addRecord(1, 'rec1', 101, nifiDate1)
parser.addRecord(2, 'rec2', 102, nifiDate2)
@@ -1301,12 +1298,12 @@
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
- assertEquals(jdbcDate1, rs.getDate(4))
+ assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3))
- assertEquals(jdbcDate2, rs.getDate(4))
+ assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 80c8512..8d6fe93 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -44,6 +44,7 @@
import java.io.OutputStream;
import java.math.BigInteger;
import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -80,7 +81,8 @@
this.outputGrouping = outputGrouping;
this.mimeType = mimeType;
- final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+ // Use DateFormat with default TimeZone to avoid unexpected conversion of year-month-day
+ final DateFormat df = dateFormat == null ? null : new SimpleDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
index dc40e75..3903ca2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
@@ -41,6 +41,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
@@ -105,7 +106,8 @@
this.allowWritingMultipleRecords = !(this.rootTagName == null);
hasWrittenRecord = false;
- final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+ // Use DateFormat with default TimeZone to avoid unexpected conversion of year-month-day
+ final DateFormat df = dateFormat == null ? null : new SimpleDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index d310d4f..1afedab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -32,6 +32,7 @@
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,7 +48,6 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
@@ -61,13 +61,13 @@
@Test
- public void testLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
+ public void testLogicalTypes() throws IOException, ParseException, MalformedRecordException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
testLogicalTypes(schema);
}
@Test
- public void testNullableLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
+ public void testNullableLogicalTypes() throws IOException, ParseException, MalformedRecordException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types-nullable.avsc"));
testLogicalTypes(schema);
}
@@ -75,6 +75,7 @@
private void testLogicalTypes(Schema schema) throws ParseException, IOException, MalformedRecordException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final int epochDay = 17260;
final String expectedTime = "2017-04-04 14:20:33.000";
final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
@@ -95,7 +96,7 @@
record.put("timeMicros", millisSinceMidnight * 1000L);
record.put("timestampMillis", timeLong);
record.put("timestampMicros", timeLong * 1000L);
- record.put("date", 17260);
+ record.put("date", epochDay);
record.put("decimal", ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()));
writer.append(record);
@@ -120,15 +121,15 @@
assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMicros"));
assertEquals(new java.sql.Timestamp(timeLong), record.getValue("timestampMillis"));
assertEquals(new java.sql.Timestamp(timeLong), record.getValue("timestampMicros"));
- final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
- noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
- assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
+
+ final Object date = record.getValue("date");
+ assertEquals(LocalDate.ofEpochDay(epochDay).toString(), date.toString());
assertEquals(bigDecimal, record.getValue("decimal"));
}
}
@Test
- public void testDataTypes() throws IOException, MalformedRecordException, SchemaNotFoundException {
+ public void testDataTypes() throws IOException, MalformedRecordException {
final List<Field> accountFields = new ArrayList<>();
accountFields.add(new Field("accountId", Schema.create(Type.LONG), null, (Object) null));
accountFields.add(new Field("accountName", Schema.create(Type.STRING), null, (Object) null));
@@ -291,7 +292,7 @@
}
@Test
- public void testMultipleTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
+ public void testMultipleTypes() throws IOException, MalformedRecordException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/multiple-types.avsc"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 33f350c..99fcd3c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -37,14 +37,12 @@
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
-import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
@@ -92,6 +90,7 @@
@Test
public void testDate() throws IOException, MalformedRecordException {
+ final String dateValue = "1983-11-30";
final String text = "date\n11/30/1983";
final List<RecordField> fields = new ArrayList<>();
@@ -104,13 +103,8 @@
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord(coerceTypes, false);
- final java.sql.Date date = (Date) record.getValue("date");
- final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
- calendar.setTimeInMillis(date.getTime());
-
- assertEquals(1983, calendar.get(Calendar.YEAR));
- assertEquals(10, calendar.get(Calendar.MONTH));
- assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ final Object date = record.getValue("date");
+ assertEquals(java.sql.Date.valueOf(dateValue), date);
}
}
}
@@ -137,6 +131,7 @@
@Test
public void testDateNoCoersionExpectedFormat() throws IOException, MalformedRecordException {
+ final String dateValue = "1983-11-30";
final String text = "date\n11/30/1983";
final List<RecordField> fields = new ArrayList<>();
@@ -148,13 +143,8 @@
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord(false, false);
- final java.sql.Date date = (Date) record.getValue("date");
- final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
- calendar.setTimeInMillis(date.getTime());
-
- assertEquals(1983, calendar.get(Calendar.YEAR));
- assertEquals(10, calendar.get(Calendar.MONTH));
- assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ final Object date = record.getValue("date");
+ assertEquals(java.sql.Date.valueOf(dateValue), date);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
index 8026da0..c984464 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
@@ -35,11 +35,8 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.sql.Date;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.List;
-import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -85,6 +82,7 @@
@Test
public void testDate() throws IOException, MalformedRecordException {
+ final String dateValue = "1983-11-30";
final String text = "date\n11/30/1983";
final List<RecordField> fields = new ArrayList<>();
@@ -96,13 +94,8 @@
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord();
- final Date date = (Date) record.getValue("date");
- final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
- calendar.setTimeInMillis(date.getTime());
-
- assertEquals(1983, calendar.get(Calendar.YEAR));
- assertEquals(10, calendar.get(Calendar.MONTH));
- assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ final Object date = record.getValue("date");
+ assertEquals(java.sql.Date.valueOf(dateValue), date);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index e93cd39..eacff95 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -44,6 +44,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -450,6 +451,26 @@
}
}
+ @Test
+ public void testDateCoercedFromString() throws IOException, MalformedRecordException {
+ final String dateField = "date";
+ final List<RecordField> recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+ final String date = "2000-01-01";
+ final String datePattern = "yyyy-MM-dd";
+ final String json = String.format("{ \"%s\": \"%s\" }", dateField, date);
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, timeFormat, timestampFormat)) {
+
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final Object value = record.getValue(dateField);
+ assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Date", value instanceof java.sql.Date);
+ assertEquals(date, value.toString());
+ }
+ }
+ }
@Test
public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index e77896f..b1f0582 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -96,7 +96,7 @@
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
valueMap.put("decimal", BigDecimal.valueOf(8.1D));
- valueMap.put("date", new Date(time));
+ valueMap.put("date", Date.valueOf("2017-01-01"));
valueMap.put("time", new Time(time));
valueMap.put("timestamp", new Timestamp(time));
valueMap.put("record", null);
@@ -166,7 +166,8 @@
final Map<String, Object> values = new HashMap<>();
values.put("timestamp", new java.sql.Timestamp(37293723L));
values.put("time", new java.sql.Time(37293723L));
- values.put("date", new java.sql.Date(37293723L));
+ final java.sql.Date date = java.sql.Date.valueOf("1970-01-01");
+ values.put("date", date);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
@@ -186,7 +187,7 @@
final byte[] data = baos.toByteArray();
- final String expected = "[{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}]";
+ final String expected = String.format("[{\"timestamp\":37293723,\"time\":37293723,\"date\":%d}]", date.getTime());
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
@@ -442,7 +443,9 @@
final Map<String, Object> values1 = new HashMap<>();
values1.put("timestamp", new java.sql.Timestamp(37293723L));
values1.put("time", new java.sql.Time(37293723L));
- values1.put("date", new java.sql.Date(37293723L));
+
+ final java.sql.Date date = java.sql.Date.valueOf("1970-01-01");
+ values1.put("date", date);
final List<RecordField> fields1 = new ArrayList<>();
fields1.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
@@ -456,7 +459,7 @@
final Map<String, Object> values2 = new HashMap<>();
values2.put("timestamp", new java.sql.Timestamp(37293999L));
values2.put("time", new java.sql.Time(37293999L));
- values2.put("date", new java.sql.Date(37293999L));
+ values2.put("date", date);
final Record record2 = new MapRecord(schema, values2);
@@ -471,7 +474,8 @@
final byte[] data = baos.toByteArray();
- final String expected = "{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}\n{\"timestamp\":37293999,\"time\":37293999,\"date\":37293999}";
+ final long dateTime = date.getTime();
+ final String expected = String.format("{\"timestamp\":37293723,\"time\":37293723,\"date\":%d}\n{\"timestamp\":37293999,\"time\":37293999,\"date\":%d}", dateTime, dateTime);
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
index da3adef..b7521f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
@@ -173,6 +173,7 @@
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long time = df.parse("2017/01/01 17:00:00.000").getTime();
+ final String date = "2017-01-01";
final Map<String, Object> map = new LinkedHashMap<>();
map.put("height", 48);
@@ -190,7 +191,7 @@
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
valueMap.put("decimal", 8.1D);
- valueMap.put("date", new Date(time));
+ valueMap.put("date", Date.valueOf(date));
valueMap.put("time", new Time(time));
valueMap.put("timestamp", new Timestamp(time));
valueMap.put("record", null);