NIFI-8609: Optimized AvroTypeUtil Record creation and conversion

Added unit test that is ignored so that it can be manually run for testing performance before/after changes to AvroTypeUtil. Updated AvroTypeUtil to be more efficient by not using Record.getValue() and instead iterating over the Map of values directly. getValue() is less efficient here because we know the RecordField's we are iterating over exist in the schema since they are retrieved from there directly; as a result, any null values still have be looked up by aliaases, but that step can be skipped in this situation. Also avoided looking for fields that exist in Avro Schema and not in RecordSchema just to set default values on GenericRecord - there's no need to set them if they are default values.

This closes #5080

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2d5ee94..6003e19 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -585,27 +585,37 @@
         final GenericRecord rec = new GenericData.Record(avroSchema);
         final RecordSchema recordSchema = record.getSchema();
 
-        for (final RecordField recordField : recordSchema.getFields()) {
-            final Object rawValue = record.getValue(recordField);
-
-            Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
-            final String fieldName = fieldPair.getLeft();
-            final Field field = fieldPair.getRight();
-            if (field == null) {
+        final Map<String, Object> recordValues = record.toMap();
+        for (final Map.Entry<String, Object> entry : recordValues.entrySet()) {
+            final Object rawValue = entry.getValue();
+            if (rawValue == null) {
                 continue;
             }
 
-            final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
-            rec.put(field.name(), converted);
-        }
-
-        // see if the Avro schema has any fields that aren't in the RecordSchema, and if those fields have a default
-        // value then we want to populate it in the GenericRecord being produced
-        for (final Field field : avroSchema.getFields()) {
-            final Optional<RecordField> recordField = recordSchema.getField(field.name());
-            if (!recordField.isPresent() && rec.get(field.name()) == null && field.defaultVal() != null) {
-                rec.put(field.name(), field.defaultVal());
+            final String rawFieldName = entry.getKey();
+            final Optional<RecordField> optionalRecordField = recordSchema.getField(rawFieldName);
+            if (!optionalRecordField.isPresent()) {
+                continue;
             }
+
+            final RecordField recordField = optionalRecordField.get();
+
+            final Field field;
+            final Field avroField = avroSchema.getField(rawFieldName);
+            if (avroField == null) {
+                final Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
+                field = fieldPair.getRight();
+
+                if (field == null) {
+                    continue;
+                }
+            } else {
+                field = avroField;
+            }
+
+            final String fieldName = field.name();
+            final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
+            rec.put(fieldName, converted);
         }
 
         return rec;
@@ -850,6 +860,10 @@
                     throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
                 }
             case STRING:
+                if (rawValue instanceof String) {
+                    return rawValue;
+                }
+
                 return DataTypeUtils.toString(rawValue, (String) null, charset);
         }
 
@@ -913,12 +927,19 @@
         // we will have two possible types, and one of them will be null. When this happens, we can be much more efficient by simply
         // determining the non-null type and converting to that.
         final List<Schema> schemaTypes = fieldSchema.getTypes();
-        if (schemaTypes.size() == 2 && (schemaTypes.get(0).getType() == Type.NULL || schemaTypes.get(1).getType() == Type.NULL)) {
-            final Schema nonNullType = schemaTypes.get(0).getType() == Type.NULL ? schemaTypes.get(1) : schemaTypes.get(0);
-            return conversion.apply(nonNullType);
+        if (schemaTypes.size() == 2) {
+            final Schema firstSchema = schemaTypes.get(0);
+            final Schema secondSchema = schemaTypes.get(1);
+
+            if (firstSchema.getType() == Type.NULL) {
+                return conversion.apply(secondSchema);
+            }
+            if (secondSchema.getType() == Type.NULL) {
+                return conversion.apply(firstSchema);
+            }
         }
 
-        Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+        final Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
                 originalValue,
                 getNonNullSubSchemas(fieldSchema),
                 AvroTypeUtil::determineDataType
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index ccbfff4..b0e4a23 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -40,6 +40,7 @@
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -71,6 +72,41 @@
 public class TestAvroTypeUtil {
 
     @Test
+    @Ignore("Performance test meant for manually testing only before/after changes in order to measure performance difference caused by changes.")
+    public void testCreateAvroRecordPerformance() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        for (int i=0; i < 100; i++) {
+            fields.add(new RecordField("field" + i, RecordFieldType.STRING.getDataType(), true));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+
+        final Map<String, Object> values = new HashMap<>();
+        for (int i=0; i < 100; i++) {
+            // Leave half of the values null
+            if (i % 2 == 0) {
+                values.put("field" + i, String.valueOf(i));
+            }
+        }
+
+        final MapRecord record = new MapRecord(recordSchema, values);
+
+        final int iterations = 1_000_000;
+
+        for (int j=0; j < 1_000; j++) {
+            final long start = System.currentTimeMillis();
+
+            for (int i = 0; i < iterations; i++) {
+                AvroTypeUtil.createAvroRecord(record, avroSchema);
+            }
+
+            final long millis = System.currentTimeMillis() - start;
+            System.out.println(millis);
+        }
+    }
+
+    @Test
     public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));