NIFI-15001 Fixed recursive FlowFile reads in PutElasticsearchRecord (#10334)
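
PutElasticsearchRecord recovered the original Records for its error output by opening a
second RecordReader on the incoming FlowFile inside indexDocuments and skipping ahead to
the current batch, which is a recursive read of a FlowFile already open in the surrounding
session.read(). Records are now deep-cloned as they are read: the clone is passed to
addOperation and the untouched original is kept for the error output, so the second reader
and the batch-skipping logic are removed. REL_ORIGINAL is widened to public so integration
tests can reference it, a new integration test indexes 2000 records in 100-record batches
with recursive reads disallowed, and the Elasticsearch 8 integration-test image is updated
to 8.19.4.
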
Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index 6479e0a..bc9ea06 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -56,7 +56,7 @@
import java.util.stream.Stream;
public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
- static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
.build();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 8675480..c05ca2c 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -61,6 +61,7 @@
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -79,6 +80,8 @@
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -420,22 +423,28 @@
try (final InputStream inStream = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
+ final RecordSchema recordSchema = reader.getSchema();
final List<IndexOperationRequest> operationList = new ArrayList<>();
- final List<Record> originals = new ArrayList<>();
+ final List<Record> processedRecords = new ArrayList<>();
+ final List<Record> originalRecords = new ArrayList<>();
Record record;
while ((record = recordSet.next()) != null) {
- addOperation(operationList, record, indexOperationParameters, indices, types);
- originals.add(record);
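+ // clone twice: one copy may be modified by addOperation, the other is preserved for the error output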
+ final Record originalRecord = cloneRecord(record);
+ final Record processedRecord = cloneRecord(record);
+
+ addOperation(operationList, processedRecord, indexOperationParameters, indices, types);
+ processedRecords.add(processedRecord);
+ originalRecords.add(originalRecord);
if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
- operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+ operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
batches++;
}
}
if (!operationList.isEmpty()) {
- operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+ operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
batches++;
}
} catch (final ElasticsearchException ese) {
@@ -508,20 +517,21 @@
operationList.add(new IndexOperationRequest(index, type, id, contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, bulkHeaderFields));
}
- private void operate(final List<IndexOperationRequest> operationList, final List<Record> originals, final RecordReader reader,
- final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
- final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords, final int batch)
+ private void operate(final List<IndexOperationRequest> operationList, final List<Record> processedRecords, final List<Record> originalRecords,
+ final RecordSchema recordSchema, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
+ final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords)
throws IOException, SchemaNotFoundException, MalformedRecordException {
- final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
- final ResponseDetails responseDetails = indexDocuments(bundle, session, input, indexOperationParameters, batch);
+ final BulkOperation bundle = new BulkOperation(operationList, processedRecords, recordSchema);
+ final ResponseDetails responseDetails = indexDocuments(bundle, originalRecords, session, input, indexOperationParameters);
successfulRecords.getAndAdd(responseDetails.successCount());
erroredRecords.getAndAdd(responseDetails.errorCount());
resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());
operationList.clear();
- originals.clear();
+ processedRecords.clear();
+ originalRecords.clear();
}
private void removeResultRecordFlowFiles(final List<FlowFile> results, final ProcessSession session) {
@@ -532,8 +542,8 @@
results.clear();
}
- private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
- final IndexOperationParameters indexOperationParameters, final int batch)
+ private ResponseDetails indexDocuments(final BulkOperation bundle, final List<Record> originalRecords, final ProcessSession session, final FlowFile input,
+ final IndexOperationParameters indexOperationParameters)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());
@@ -546,16 +556,7 @@
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();
- try (final InputStream inStream = session.read(input);
- final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {
-
- // if there are errors present, skip through the input FlowFile to the current batch of records
- if (numErrors > 0) {
- for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
- inputReader.nextRecord();
- }
- }
-
+ try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
@@ -574,7 +575,7 @@
} else {
type = OUTPUT_TYPE_ERROR;
}
- outputRecord = inputReader.nextRecord();
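+ // use the Record cloned while reading the input instead of re-reading the FlowFile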
+ outputRecord = originalRecords.get(o);
recordSchema = outputRecord.getSchema();
} else {
relationship = REL_SUCCESSFUL;
@@ -582,10 +583,6 @@
type = OUTPUT_TYPE_SUCCESS;
outputRecord = bundle.getOriginalRecords().get(o);
recordSchema = bundle.getSchema();
- // skip the associated Input Record for this successful Record
- if (numErrors > 0) {
- inputReader.nextRecord();
- }
}
final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
output.write(outputRecord, error);
@@ -594,7 +591,7 @@
for (final Output output : outputs.values()) {
output.transfer(session);
}
- } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
+ } catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
@@ -773,6 +770,47 @@
: stringValue;
}
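+ // Returns a deep copy of the Record, backed by the same schema, so the copy can be
+ // modified independently of the source.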
+ private Record cloneRecord(final Record record) {
+ final Map<String, Object> valuesCopy = cloneValues(record.toMap());
+ return new MapRecord(record.getSchema(), valuesCopy, record.isTypeChecked(), record.isDropUnknownFields());
+ }
+
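+ // Copies a Record's value map entry by entry, deep-cloning each value.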
+ private Map<String, Object> cloneValues(final Map<String, Object> values) {
+ final Map<String, Object> cloned = new LinkedHashMap<>(values.size());
+ for (final Map.Entry<String, Object> entry : values.entrySet()) {
+ cloned.put(entry.getKey(), cloneValue(entry.getValue()));
+ }
+ return cloned;
+ }
+
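+ // Recursively clones nested Records, Maps, Collections and Object arrays; all other
+ // values (including primitive arrays) are returned as-is.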
+ private Object cloneValue(final Object value) {
+ if (value instanceof Record recordValue) {
+ return cloneRecord(recordValue);
+ }
+ if (value instanceof Map<?, ?> mapValue) {
+ final Map<Object, Object> clonedMap = new LinkedHashMap<>(mapValue.size());
+ for (final Map.Entry<?, ?> entry : mapValue.entrySet()) {
+ clonedMap.put(entry.getKey(), cloneValue(entry.getValue()));
+ }
+ return clonedMap;
+ }
+ if (value instanceof Collection<?> collectionValue) {
+ final List<Object> clonedList = new ArrayList<>(collectionValue.size());
+ for (final Object element : collectionValue) {
+ clonedList.add(cloneValue(element));
+ }
+ return clonedList;
+ }
+ if (value instanceof Object[] arrayValue) {
+ final Object[] clonedArray = new Object[arrayValue.length];
+ for (int i = 0; i < arrayValue.length; i++) {
+ clonedArray[i] = cloneValue(arrayValue[i]);
+ }
+ return clonedArray;
+ }
+ return value;
+ }
+
private class Output {
private FlowFile flowFile;
private final RecordSetWriter writer;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
index 4eab3b3..e1e9006 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
@@ -28,6 +28,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -142,6 +145,30 @@
}
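+ // NIFI-15001: multiple batches must be processed without a second read of the input FlowFile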
@Test
+ void testMultipleBatchesWithoutRecursiveReads() {
+ final String json = generateRecordsJson(2000);
+
+ runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-multiple-batches-no-recursive");
+ runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "100");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setAllowRecursiveReads(false);
+
+ runner.enqueue(json);
+ runner.run();
+
+ runner.assertTransferCount(ElasticsearchRestProcessor.REL_FAILURE, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+
+ final List<MockFlowFile> successful = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL);
+ assertEquals(20, successful.size());
+ final int totalRecordCount = successful.stream()
+ .mapToInt(flowFile -> Integer.parseInt(flowFile.getAttribute("record.count")))
+ .sum();
+ assertEquals(2000, totalRecordCount);
+ }
+
+ @Test
void testUpdateError() {
final String json = """
{"id": "123", "foo": "bar"}
@@ -200,4 +227,17 @@
assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
assertTrue(errorContent.contains("\"script\":{"), errorContent);
}
+
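+ // builds a JSON array of {"id": "<i>", "value": <i>} records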
+ private String generateRecordsJson(final int recordCount) {
+ final StringBuilder builder = new StringBuilder(recordCount * 32);
+ builder.append('[');
+ for (int i = 0; i < recordCount; i++) {
+ builder.append("{\"id\":\"").append(i).append("\",\"value\":").append(i).append('}');
+ if (i < recordCount - 1) {
+ builder.append(',');
+ }
+ }
+ builder.append(']');
+ return builder.toString();
+ }
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index dad1fd1..b2ab74b 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -108,7 +108,7 @@
<profile>
<id>elasticsearch8</id>
<properties>
- <elasticsearch_docker_image>8.18.2</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.19.4</elasticsearch_docker_image>
</properties>
</profile>
<profile>