NIFI-8442 Add a new test with date, timestamp and time as string & New management of date, time and timestamp
NIFI-8442 Put DateTimeFormatter as static and Add comments to explain why ZoneOffset.UTC is required

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5014.
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 6d5cb58..b1a4117 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -126,6 +126,9 @@
                         <exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
                         <exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
                         <exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
+                        <exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
@@ -149,4 +152,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
index 98457a3..d8d9b3d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -18,10 +18,17 @@
 package org.apache.nifi.processors.gcp.bigquery;
 
 import java.io.InputStream;
-import java.util.ArrayList;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
@@ -93,6 +100,9 @@
             .defaultValue("false")
             .build();
 
+    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+    private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
+
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor> builder()
@@ -191,6 +201,18 @@
                     lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
                 }
                 result.put(key, lmapr);
+            } else if (obj instanceof Timestamp) {
+                // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
+                // the local system time zone to the GMT time zone
+                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
+                result.put(key, dateTime.format(timestampFormatter));
+            } else if (obj instanceof Time) {
+                // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
+                // the local system time zone to the GMT time zone
+                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
+                result.put(key, dateTime.format(timeFormatter) );
+            } else if (obj instanceof Date) {
+                result.put(key, obj.toString());
             } else {
                 result.put(key, obj);
             }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
index 6bed8be..fe233b6 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
@@ -21,12 +21,18 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Iterator;
 
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,8 +76,13 @@
         Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
         Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
 
+        Field date = Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Mode.NULLABLE).build();
+        Field time = Field.newBuilder("time", LegacySQLTypeName.TIME).setMode(Mode.NULLABLE).build();
+        Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Mode.NULLABLE).build();
+        Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Mode.NULLABLE).build();
+
         // Table schema definition
-        schema = Schema.of(id, name, alias, addresses, job);
+        schema = Schema.of(id, name, alias, addresses, job, birth);
         TableDefinition tableDefinition = StandardTableDefinition.of(schema);
         TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
 
@@ -181,4 +192,116 @@
         deleteTable(tableName);
     }
 
-}
+    @Test
+    public void PutBigQueryStreamingNoErrorWithDate() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        FieldValueList sndElt = iterator.next();
+        assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+        assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+        FieldValueList john;
+        FieldValueList jane;
+        john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+        jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+        assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+        assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+        assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+        long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
+                DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
+        assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
+        assertEquals((john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000), timestampRecordJohn);
+
+        long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
+                DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
+        assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
+        assertEquals((jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000) , timestampRecordJane);
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void PutBigQueryStreamingNoErrorWithDateFormat() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+        runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "MM/dd/yyyy");
+        runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss");
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        FieldValueList sndElt = iterator.next();
+        assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+        assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+        FieldValueList john;
+        FieldValueList jane;
+        john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+        jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+        assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+        assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+        assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+        long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
+                DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
+        assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
+        assertEquals(john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJohn);
+
+        long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
+                DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
+        assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
+        assertEquals(jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJane);
+
+        deleteTable(tableName);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
new file mode 100644
index 0000000..f27f5ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
@@ -0,0 +1,92 @@
+{
+    "namespace": "nifi.example",
+    "name": "streaming_correct_data_with_schema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "type": "int"
+        },
+        {
+            "name": "name",
+            "type": ["null", "string"]
+        },
+        {
+            "name": "alias",
+            "type": {
+                "type": "array",
+                "items": "string"
+            }
+        },
+        {
+            "name": "addresses",
+            "type": {
+                "type": "array",
+                "items": {
+                    "namespace": "nifi.example.address",
+                    "name": "address",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "name": "zip",
+                            "type": ["null", "string"]
+                        },
+                        {
+                            "name": "city",
+                            "type": ["null", "string"]
+                        }
+                    ]
+                }
+            }
+        },
+        {
+            "name": "job",
+            "type": ["null", {
+                "namespace": "nifi.example.job",
+                "name": "job",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "position",
+                        "type": ["null", "string"]
+                    },
+                    {
+                        "name": "company",
+                        "type": ["null", "string"]
+                    }
+                ]
+            } ]
+        },
+        {
+            "name": "birth",
+            "type": {
+                "namespace": "nifi.example.birth",
+                "name": "job",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "date",
+                        "type": ["null", {
+                            "type": "int",
+                            "logicalType": "date"
+                        } ]
+                    },
+                    {
+                        "name": "time",
+                        "type": ["null", {
+                            "type": "int",
+                            "logicalType": "time-millis"
+                        } ]
+                    },
+                    {
+                        "name": "full",
+                        "type": ["null", {
+                            "type": "long",
+                            "logicalType": "timestamp-millis"
+                        } ]
+                    }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
new file mode 100644
index 0000000..3d21842
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
@@ -0,0 +1,46 @@
+[
+  {
+    "id": 1,
+    "name": "John Doe",
+    "alias": ["john", "jd"],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      },
+      {
+        "zip": "2000",
+        "city": "Bar"
+      }
+    ],
+    "job": {
+      "position": "Manager",
+      "company": "ASF"
+    },
+    "birth": {
+      "date": "07/18/2021",
+      "time": "12:35:24",
+      "full": "07-18-2021 12:35:24"
+    }
+  },
+  {
+    "id": 2,
+    "name": "Jane Doe",
+    "alias": [],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      }
+    ],
+    "job": {
+      "position": "Director",
+      "company": "ASF"
+    },
+    "birth": {
+      "date": "01/01/1992",
+      "time": "00:00:00",
+      "full": "01-01-1992 00:00:00"
+    }
+  }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
new file mode 100644
index 0000000..ffe4050
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
@@ -0,0 +1,46 @@
+[
+  {
+    "id": 1,
+    "name": "John Doe",
+    "alias": ["john", "jd"],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      },
+      {
+        "zip": "2000",
+        "city": "Bar"
+      }
+    ],
+    "job": {
+      "position": "Manager",
+      "company": "ASF"
+    },
+    "birth": {
+      "date": 1626611724000,
+      "time": 1626611724000,
+      "full": 1626611724000
+    }
+  },
+  {
+    "id": 2,
+    "name": "Jane Doe",
+    "alias": [],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      }
+    ],
+    "job": {
+      "position": "Director",
+      "company": "ASF"
+    },
+    "birth": {
+      "date": 694224000000,
+      "time": 694224000000,
+      "full": 694224000000
+    }
+  }
+]
\ No newline at end of file