NIFI-8442 Add tests for date, timestamp and time values supplied as strings, and improve handling of date, time and timestamp types
NIFI-8442 Make the DateTimeFormatter instances static and add comments explaining why ZoneOffset.UTC is required
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5014.
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 6d5cb58..b1a4117 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -126,6 +126,9 @@
<exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
<exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
+ <exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
+ <exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
+ <exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
</excludes>
</configuration>
</plugin>
@@ -149,4 +152,4 @@
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
index 98457a3..d8d9b3d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -18,10 +18,17 @@
package org.apache.nifi.processors.gcp.bigquery;
import java.io.InputStream;
-import java.util.ArrayList;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
@@ -93,6 +100,9 @@
.defaultValue("false")
.build();
+ private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+ private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
+
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
@@ -191,6 +201,18 @@
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
}
result.put(key, lmapr);
+ } else if (obj instanceof Timestamp) {
+ // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
+ // the local system time zone to the GMT time zone
+ LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
+ result.put(key, dateTime.format(timestampFormatter));
+ } else if (obj instanceof Time) {
+ // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
+ // the local system time zone to the GMT time zone
+ LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
+ result.put(key, dateTime.format(timeFormatter) );
+ } else if (obj instanceof Date) {
+ result.put(key, obj.toString());
} else {
result.put(key, obj);
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
index 6bed8be..fe233b6 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
@@ -21,12 +21,18 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
@@ -70,8 +76,13 @@
Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
+ Field date = Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Mode.NULLABLE).build();
+ Field time = Field.newBuilder("time", LegacySQLTypeName.TIME).setMode(Mode.NULLABLE).build();
+ Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Mode.NULLABLE).build();
+ Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Mode.NULLABLE).build();
+
// Table schema definition
- schema = Schema.of(id, name, alias, addresses, job);
+ schema = Schema.of(id, name, alias, addresses, job, birth);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
@@ -181,4 +192,116 @@
deleteTable(tableName);
}
-}
+ @Test
+ public void PutBigQueryStreamingNoErrorWithDate() throws Exception {
+ String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ createTable(tableName);
+
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+ runner.enableControllerService(jsonReader);
+
+ runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+ runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date.json"));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+ TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+ Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+ FieldValueList firstElt = iterator.next();
+ FieldValueList sndElt = iterator.next();
+ assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+ assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+ FieldValueList john;
+ FieldValueList jane;
+ john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+ jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+ assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+ assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+ assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+ long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
+ DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
+ assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
+ assertEquals((john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000), timestampRecordJohn);
+
+ long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
+ DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
+ assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
+ assertEquals((jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000) , timestampRecordJane);
+
+ deleteTable(tableName);
+ }
+
+ @Test
+ public void PutBigQueryStreamingNoErrorWithDateFormat() throws Exception {
+ String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ createTable(tableName);
+
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+ runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "MM/dd/yyyy");
+ runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
+ runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss");
+ runner.enableControllerService(jsonReader);
+
+ runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+ runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json"));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+ TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+ Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+ FieldValueList firstElt = iterator.next();
+ FieldValueList sndElt = iterator.next();
+ assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+ assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+ FieldValueList john;
+ FieldValueList jane;
+ john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+ jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+ assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+ assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+ assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+ long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
+ DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
+ assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
+ assertEquals(john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJohn);
+
+ long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
+ DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
+ assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
+ assertEquals(jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJane);
+
+ deleteTable(tableName);
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
new file mode 100644
index 0000000..f27f5ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
@@ -0,0 +1,92 @@
+{
+ "namespace": "nifi.example",
+ "name": "streaming_correct_data_with_schema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "name",
+ "type": ["null", "string"]
+ },
+ {
+ "name": "alias",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "addresses",
+ "type": {
+ "type": "array",
+ "items": {
+ "namespace": "nifi.example.address",
+ "name": "address",
+ "type": "record",
+ "fields": [
+ {
+ "name": "zip",
+ "type": ["null", "string"]
+ },
+ {
+ "name": "city",
+ "type": ["null", "string"]
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name": "job",
+ "type": ["null", {
+ "namespace": "nifi.example.job",
+ "name": "job",
+ "type": "record",
+ "fields": [
+ {
+ "name": "position",
+ "type": ["null", "string"]
+ },
+ {
+ "name": "company",
+ "type": ["null", "string"]
+ }
+ ]
+ } ]
+ },
+ {
+ "name": "birth",
+ "type": {
+ "namespace": "nifi.example.birth",
+ "name": "job",
+ "type": "record",
+ "fields": [
+ {
+ "name": "date",
+ "type": ["null", {
+ "type": "int",
+ "logicalType": "date"
+ } ]
+ },
+ {
+ "name": "time",
+ "type": ["null", {
+ "type": "int",
+ "logicalType": "time-millis"
+ } ]
+ },
+ {
+ "name": "full",
+ "type": ["null", {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ } ]
+ }
+ ]
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
new file mode 100644
index 0000000..3d21842
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
@@ -0,0 +1,46 @@
+[
+ {
+ "id": 1,
+ "name": "John Doe",
+ "alias": ["john", "jd"],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ },
+ {
+ "zip": "2000",
+ "city": "Bar"
+ }
+ ],
+ "job": {
+ "position": "Manager",
+ "company": "ASF"
+ },
+ "birth": {
+ "date": "07/18/2021",
+ "time": "12:35:24",
+ "full": "07-18-2021 12:35:24"
+ }
+ },
+ {
+ "id": 2,
+ "name": "Jane Doe",
+ "alias": [],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ }
+ ],
+ "job": {
+ "position": "Director",
+ "company": "ASF"
+ },
+ "birth": {
+ "date": "01/01/1992",
+ "time": "00:00:00",
+ "full": "01-01-1992 00:00:00"
+ }
+ }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
new file mode 100644
index 0000000..ffe4050
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
@@ -0,0 +1,46 @@
+[
+ {
+ "id": 1,
+ "name": "John Doe",
+ "alias": ["john", "jd"],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ },
+ {
+ "zip": "2000",
+ "city": "Bar"
+ }
+ ],
+ "job": {
+ "position": "Manager",
+ "company": "ASF"
+ },
+ "birth": {
+ "date": 1626611724000,
+ "time": 1626611724000,
+ "full": 1626611724000
+ }
+ },
+ {
+ "id": 2,
+ "name": "Jane Doe",
+ "alias": [],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ }
+ ],
+ "job": {
+ "position": "Director",
+ "company": "ASF"
+ },
+ "birth": {
+ "date": 694224000000,
+ "time": 694224000000,
+ "full": 694224000000
+ }
+ }
+]
\ No newline at end of file