NIFI-15001 Fixed PutElasticsearchRecord to avoid recursive FlowFile reads (#10334)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index 6479e0a..bc9ea06 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -56,7 +56,7 @@
 import java.util.stream.Stream;
 
 public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
-    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
             .build();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 8675480..c05ca2c 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -61,6 +61,7 @@
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -79,6 +80,8 @@
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -420,22 +423,30 @@
         try (final InputStream inStream = session.read(input);
             final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
             final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
+            final RecordSchema recordSchema = reader.getSchema();
             final List<IndexOperationRequest> operationList = new ArrayList<>();
-            final List<Record> originals = new ArrayList<>();
+            final List<Record> processedRecords = new ArrayList<>();
+            final List<Record> originalRecords = new ArrayList<>();
 
             Record record;
             while ((record = recordSet.next()) != null) {
-                addOperation(operationList, record, indexOperationParameters, indices, types);
-                originals.add(record);
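+                // keep two deep copies: one that addOperation may modify while building the bulk
+                // request, and an untouched copy used when writing error responses back out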
+                final Record originalRecord = cloneRecord(record);
+                final Record processedRecord = cloneRecord(record);
+
+                addOperation(operationList, processedRecord, indexOperationParameters, indices, types);
+                processedRecords.add(processedRecord);
+                originalRecords.add(originalRecord);
 
                 if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
-                    operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+                    operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
                     batches++;
                 }
             }
 
             if (!operationList.isEmpty()) {
-                operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+                operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
                 batches++;
             }
         } catch (final ElasticsearchException ese) {
@@ -508,20 +517,21 @@
         operationList.add(new IndexOperationRequest(index, type, id, contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, bulkHeaderFields));
     }
 
-    private void operate(final List<IndexOperationRequest> operationList, final List<Record> originals, final RecordReader reader,
-                         final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
-                         final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords, final int batch)
+    private void operate(final List<IndexOperationRequest> operationList, final List<Record> processedRecords, final List<Record> originalRecords,
+                         final RecordSchema recordSchema, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
+                         final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords)
             throws IOException, SchemaNotFoundException, MalformedRecordException {
 
-        final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
-        final ResponseDetails responseDetails = indexDocuments(bundle, session, input, indexOperationParameters, batch);
+        final BulkOperation bundle = new BulkOperation(operationList, processedRecords, recordSchema);
+        final ResponseDetails responseDetails = indexDocuments(bundle, originalRecords, session, input, indexOperationParameters);
 
         successfulRecords.getAndAdd(responseDetails.successCount());
         erroredRecords.getAndAdd(responseDetails.errorCount());
         resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());
 
         operationList.clear();
-        originals.clear();
+        processedRecords.clear();
+        originalRecords.clear();
     }
 
     private void removeResultRecordFlowFiles(final List<FlowFile> results, final ProcessSession session) {
@@ -532,8 +542,8 @@
         results.clear();
     }
 
-    private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
-                                           final IndexOperationParameters indexOperationParameters, final int batch)
+    private ResponseDetails indexDocuments(final BulkOperation bundle, final List<Record> originalRecords, final ProcessSession session, final FlowFile input,
+                                           final IndexOperationParameters indexOperationParameters)
             throws IOException, SchemaNotFoundException, MalformedRecordException {
         final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());
 
@@ -546,16 +556,9 @@
         final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
         final Map<String, Output> outputs = new HashMap<>();
 
-        try (final InputStream inStream = session.read(input);
-             final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {
-
-            // if there are errors present, skip through the input FlowFile to the current batch of records
-            if (numErrors > 0) {
-                for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
-                    inputReader.nextRecord();
-                }
-            }
-
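+        // error records are written from the in-memory copies captured while building the bulk
+        // request, so the incoming FlowFile no longer has to be re-read (a recursive session read)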
+        try {
             for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
                 final String type;
                 final Relationship relationship;
@@ -574,7 +575,7 @@
                     } else {
                         type = OUTPUT_TYPE_ERROR;
                     }
-                    outputRecord = inputReader.nextRecord();
+                    outputRecord = originalRecords.get(o);
                     recordSchema = outputRecord.getSchema();
                 } else {
                     relationship = REL_SUCCESSFUL;
@@ -582,10 +583,6 @@
                     type = OUTPUT_TYPE_SUCCESS;
                     outputRecord = bundle.getOriginalRecords().get(o);
                     recordSchema = bundle.getSchema();
-                    // skip the associated Input Record for this successful Record
-                    if (numErrors > 0) {
-                        inputReader.nextRecord();
-                    }
                 }
                 final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
                 output.write(outputRecord, error);
@@ -594,7 +591,7 @@
             for (final Output output : outputs.values()) {
                 output.transfer(session);
             }
-        } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
+        } catch (final IOException | SchemaNotFoundException ex) {
             getLogger().error("Unable to write error/successful records", ex);
             outputs.values().forEach(o -> {
                 try {
@@ -773,6 +770,49 @@
                 : stringValue;
     }
 
+    private Record cloneRecord(final Record record) {
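+        // deep-copy the record values so nested structures are not shared with the source record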
+        final Map<String, Object> valuesCopy = cloneValues(record.toMap());
+        return new MapRecord(record.getSchema(), valuesCopy, record.isTypeChecked(), record.isDropUnknownFields());
+    }
+
+    private Map<String, Object> cloneValues(final Map<String, Object> values) {
+        final Map<String, Object> cloned = new LinkedHashMap<>(values.size());
+        for (final Map.Entry<String, Object> entry : values.entrySet()) {
+            cloned.put(entry.getKey(), cloneValue(entry.getValue()));
+        }
+        return cloned;
+    }
+
+    private Object cloneValue(final Object value) {
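+        // recursively copy nested Records, Maps, Collections, and Object arrays; return all other values as-is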
+        if (value instanceof Record recordValue) {
+            return cloneRecord(recordValue);
+        }
+        if (value instanceof Map<?, ?> mapValue) {
+            final Map<Object, Object> clonedMap = new LinkedHashMap<>(mapValue.size());
+            for (final Map.Entry<?, ?> entry : mapValue.entrySet()) {
+                clonedMap.put(entry.getKey(), cloneValue(entry.getValue()));
+            }
+            return clonedMap;
+        }
+        if (value instanceof Collection<?> collectionValue) {
+            final List<Object> clonedList = new ArrayList<>(collectionValue.size());
+            for (final Object element : collectionValue) {
+                clonedList.add(cloneValue(element));
+            }
+            return clonedList;
+        }
+        if (value instanceof Object[] arrayValue) {
+            final Object[] clonedArray = new Object[arrayValue.length];
+            for (int i = 0; i < arrayValue.length; i++) {
+                clonedArray[i] = cloneValue(arrayValue[i]);
+            }
+            return clonedArray;
+        }
+        return value;
+    }
+
     private class Output {
         private FlowFile flowFile;
         private final RecordSetWriter writer;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
index 4eab3b3..e1e9006 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
@@ -28,6 +28,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -142,6 +145,31 @@
     }
 
     @Test
+    void testMultipleBatchesWithoutRecursiveReads() {
+        final String json = generateRecordsJson(2000);
+
+        runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-multiple-batches-no-recursive");
+        runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "100");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
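+        // any nested read of the incoming FlowFile during processing should now fail the test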
+        runner.setAllowRecursiveReads(false);
+
+        runner.enqueue(json);
+        runner.run();
+
+        runner.assertTransferCount(ElasticsearchRestProcessor.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+
+        final List<MockFlowFile> successful = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL);
+        assertEquals(20, successful.size());
+        final int totalRecordCount = successful.stream()
+                .mapToInt(flowFile -> Integer.parseInt(flowFile.getAttribute("record.count")))
+                .sum();
+        assertEquals(2000, totalRecordCount);
+    }
+
+    @Test
     void testUpdateError() {
         final String json = """
                 {"id": "123", "foo": "bar"}
@@ -200,4 +227,17 @@
         assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
         assertTrue(errorContent.contains("\"script\":{"), errorContent);
     }
+
+    private String generateRecordsJson(final int recordCount) {
+        final StringBuilder builder = new StringBuilder(recordCount * 32);
+        builder.append('[');
+        for (int i = 0; i < recordCount; i++) {
+            builder.append("{\"id\":\"").append(i).append("\",\"value\":").append(i).append('}');
+            if (i < recordCount - 1) {
+                builder.append(',');
+            }
+        }
+        builder.append(']');
+        return builder.toString();
+    }
 }
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index dad1fd1..b2ab74b 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -108,7 +108,7 @@
         <profile>
             <id>elasticsearch8</id>
             <properties>
-                <elasticsearch_docker_image>8.18.2</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.19.4</elasticsearch_docker_image>
             </properties>
         </profile>
         <profile>