NIFI-8698: If using a Data RecordPath and handling schema drift, ensure that the RecordPath is evaluated against the data before attempting to determine the fields to update. Also refactored to move the handling of schema drift into its own method for the purpose of clarity

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5156
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index ba6b929..49f0ca7 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -426,39 +426,19 @@
                 final RecordSet recordSet = recordReader.createRecordSet();
                 KuduTable kuduTable = kuduClient.openTable(tableName);
 
+                // Get the first record so that we can evaluate the Kudu table for Schema drift.
+                Record record = recordSet.next();
+
                 // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
                 if (handleSchemaDrift) {
-                    final Schema schema = kuduTable.getSchema();
-                    final List<RecordField> missing = recordReader.getSchema().getFields().stream()
-                            .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
-                            .collect(Collectors.toList());
+                    final boolean driftDetected = handleSchemaDrift(kuduTable, kuduClient, flowFile, record, lowercaseFields);
 
-                    if (!missing.isEmpty()) {
-                        getLogger().info("adding {} columns to table '{}' to handle schema drift", missing.size(), tableName);
-
-                        // Add each column one at a time to avoid failing if some of the missing columns
-                        // we created by a concurrent thread or application attempting to handle schema drift.
-                        for (final RecordField field : missing) {
-                            try {
-                                final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
-                                kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
-                            } catch (final KuduException e) {
-                                // Ignore the exception if the column already exists due to concurrent
-                                // threads or applications attempting to handle schema drift.
-                                if (e.getStatus().isAlreadyPresent()) {
-                                    getLogger().info("Column already exists in table '{}' while handling schema drift", tableName);
-                                } else {
-                                    throw new ProcessException(e);
-                                }
-                            }
-                        }
-
+                    if (driftDetected) {
                         // Re-open the table to get the new schema.
                         kuduTable = kuduClient.openTable(tableName);
                     }
                 }
 
-                Record record = recordSet.next();
                 recordReaderLoop: while (record != null) {
                     final OperationType operationType = operationTypeFunction.apply(record);
 
@@ -536,6 +516,67 @@
         }
     }
 
+    private boolean handleSchemaDrift(final KuduTable kuduTable, final KuduClient kuduClient, final FlowFile flowFile, final Record record, final boolean lowercaseFields) {
+        if (record == null) {
+            getLogger().debug("No Record to evaluate schema drift against for {}", flowFile);
+            return false;
+        }
+
+        final String tableName = kuduTable.getName();
+        final Schema schema = kuduTable.getSchema();
+
+        final List<RecordField> recordFields;
+        if (dataRecordPath == null) {
+            recordFields = record.getSchema().getFields();
+        } else {
+            final RecordPathResult recordPathResult = dataRecordPath.evaluate(record);
+            final List<FieldValue> fieldValues =  recordPathResult.getSelectedFields().collect(Collectors.toList());
+
+            recordFields = new ArrayList<>();
+            for (final FieldValue fieldValue : fieldValues) {
+                final RecordField recordField = fieldValue.getField();
+                if (recordField.getDataType().getFieldType() == RecordFieldType.RECORD) {
+                    final Object value = fieldValue.getValue();
+                    if (value instanceof Record) {
+                        recordFields.addAll(((Record) value).getSchema().getFields());
+                    }
+                } else {
+                    recordFields.add(recordField);
+                }
+            }
+        }
+
+        final List<RecordField> missing = recordFields.stream()
+            .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+            .collect(Collectors.toList());
+
+        if (missing.isEmpty()) {
+            getLogger().debug("No schema drift detected for {}", flowFile);
+            return false;
+        }
+
+        getLogger().info("Adding {} columns to table '{}' to handle schema drift", missing.size(), tableName);
+
+        // Add each column one at a time to avoid failing if some of the missing columns
+        // we created by a concurrent thread or application attempting to handle schema drift.
+        for (final RecordField field : missing) {
+            try {
+                final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
+            } catch (final KuduException e) {
+                // Ignore the exception if the column already exists due to concurrent
+                // threads or applications attempting to handle schema drift.
+                if (e.getStatus().isAlreadyPresent()) {
+                    getLogger().info("Column already exists in table '{}' while handling schema drift", tableName);
+                } else {
+                    throw new ProcessException(e);
+                }
+            }
+        }
+
+        return true;
+    }
+
     private void transferFlowFiles(final List<FlowFile> flowFiles,
                                    final Map<FlowFile, Integer> processedRecords,
                                    final Map<FlowFile, Object> flowFileFailures,