NIFI-8748 Corrected PutKudu String to java.sql.Date parsing
- Added getDateFormat() using default time zone instead of GMT time zone from DataTypeUtils.getDateFormat()
NIFI-8748 Adjusted Date Format to use DataType.getFormat()
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5194.
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index b0d4566..774430e 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -55,11 +55,15 @@
import javax.security.auth.login.LoginException;
import java.math.BigDecimal;
+import java.sql.Date;
import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
@@ -114,7 +118,7 @@
.displayName("Kudu Operation Timeout")
.description("Default timeout used for user operations (using sessions and scanners)")
.required(false)
- .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + "ms")
+ .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -124,7 +128,7 @@
.displayName("Kudu Keep Alive Period Timeout")
.description("Default timeout used for user operations")
.required(false)
- .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) + "ms")
+ .defaultValue(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS + "ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -403,7 +407,9 @@
row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
break;
case DATE:
- row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+ final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
+ final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat();
+ row.addDate(columnIndex, getDate(value, recordFieldName, format));
break;
default:
throw new IllegalStateException(String.format("unknown column type %s", colType));
@@ -413,6 +419,28 @@
}
/**
+ * Get java.sql.Date from Record Field Value with optional parsing when input value is a String
+ *
+ * @param value Record Field Value
+ * @param recordFieldName Record Field Name
+ * @param format Date Format Pattern
+ * @return Date object or null when value is null
+ */
+ private Date getDate(final Object value, final String recordFieldName, final String format) {
+ return DataTypeUtils.toDate(value, () -> getDateFormat(format), recordFieldName);
+ }
+
+ /**
+ * Get Date Format using Date Record Field default pattern and system time zone to avoid unnecessary conversion
+ *
+ * @param format Date Format Pattern
+ * @return Date Format used to parsing date fields
+ */
+ private DateFormat getDateFormat(final String format) {
+ return new SimpleDateFormat(format);
+ }
+
+ /**
* Converts a NiFi DataType to it's equivalent Kudu Type.
*/
private Type toKuduType(DataType nifiType) {
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 0c739ea..1d397d9 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -75,6 +75,7 @@
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -478,6 +479,31 @@
row.getDate("sql_date").toString(), today.toString());
}
+ @Test
+ public void testBuildPartialRowWithDateString() {
+ final String dateFieldName = "created";
+ final String dateFieldValue = "2000-01-01";
+
+ final Schema kuduSchema = new Schema(Collections.singletonList(
+ new ColumnSchema.ColumnSchemaBuilder(dateFieldName, Type.DATE).nullable(true).build()
+ ));
+
+ final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField(dateFieldName, RecordFieldType.DATE.getDataType())
+ ));
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put(dateFieldName, dateFieldValue);
+ final MapRecord record = new MapRecord(schema, values);
+
+ final PartialRow row = kuduSchema.newPartialRow();
+
+ processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true);
+
+ final java.sql.Date rowDate = row.getDate(dateFieldName);
+ assertEquals("Partial Row Date Field not matched", dateFieldValue, rowDate.toString());
+ }
+
private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
final Schema kuduSchema = new Schema(Arrays.asList(
new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),