blob: 7a20e6d06e244daf053ade0a177551c140526b9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipInputStream;

import static org.junit.Assert.assertEquals;
public class TestMergeContent {
@BeforeClass
// Raises the slf4j simple-logger level for the standard-processors package to DEBUG
// so processor log output is visible when diagnosing failures in this suite.
public static void setup() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
}
/**
 * Verifies that a FlowFile larger than the bin's Max Size is placed into its own bin
 * and is immediately processed as a complete bin by itself.
 */
@Test
public void testFlowFileLargerThanBin() {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
    runner.setProperty(MergeContent.MIN_ENTRIES, "2");
    runner.setProperty(MergeContent.MAX_ENTRIES, "2");
    runner.setProperty(MergeContent.MIN_SIZE, "1 KB");
    runner.setProperty(MergeContent.MAX_SIZE, "5 KB");

    runner.enqueue(new byte[1026]);     // fits within the bin limits
    runner.enqueue(new byte[1024 * 6]); // larger than the bin's Max Size

    // Two iterations give the processor a chance to create two bins, but only the
    // oversized FlowFile's bin should fill and merge.
    runner.run(2);

    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 1);
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    final MockFlowFile original = runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).get(0);
    assertEquals(merged.getAttribute(CoreAttributes.UUID.key()), original.getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE));
    assertEquals(1024 * 6, merged.getSize());

    // The smaller FlowFile is never binned; the @OnStopped logic returns it to the
    // input queue, so the queue must not be empty.
    runner.assertQueueNotEmpty();
}
/**
 * Merges three single-record Avro files that share one schema and verifies that the
 * merged data file contains all three records.
 */
@Test
public void testSimpleAvroConcat() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);

    final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

    final GenericRecord alyssa = new GenericData.Record(schema);
    alyssa.put("name", "Alyssa");
    alyssa.put("favorite_number", 256);

    final GenericRecord ben = new GenericData.Record(schema);
    ben.put("name", "Ben");
    ben.put("favorite_number", 7);
    ben.put("favorite_color", "red");

    final GenericRecord john = new GenericData.Record(schema);
    john.put("name", "John");
    john.put("favorite_number", 5);
    john.put("favorite_color", "blue");

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    runner.enqueue(serializeAvroRecord(schema, alyssa, datumWriter).toByteArray());
    runner.enqueue(serializeAvroRecord(schema, ben, datumWriter).toByteArray());
    runner.enqueue(serializeAvroRecord(schema, john, datumWriter).toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

    // Read the merged content back and confirm every record made it into the bundle.
    final Map<String, GenericRecord> users = getGenericRecordMap(runner.getContentAsByteArray(merged), schema, "name");
    Assert.assertEquals(3, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
    Assert.assertTrue(users.containsKey("Ben"));
    Assert.assertTrue(users.containsKey("John"));
}
/**
 * Enqueues two records of one schema and one record of a different schema; the
 * mismatched record must be routed to failure while the matching pair is merged.
 */
@Test
public void testAvroConcatWithDifferentSchemas() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);

    final Schema userSchema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
    final Schema placeSchema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/place.avsc"));

    final GenericRecord alyssa = new GenericData.Record(userSchema);
    alyssa.put("name", "Alyssa");
    alyssa.put("favorite_number", 256);

    // This record uses a different schema and therefore cannot merge with the users.
    final GenericRecord somePlace = new GenericData.Record(placeSchema);
    somePlace.put("name", "Some Place");

    final GenericRecord john = new GenericData.Record(userSchema);
    john.put("name", "John");
    john.put("favorite_number", 5);
    john.put("favorite_color", "blue");

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(userSchema);
    runner.enqueue(serializeAvroRecord(userSchema, alyssa, datumWriter).toByteArray());
    runner.enqueue(serializeAvroRecord(placeSchema, somePlace, datumWriter).toByteArray());
    runner.enqueue(serializeAvroRecord(userSchema, john, datumWriter).toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
    final Map<String, GenericRecord> users = getGenericRecordMap(runner.getContentAsByteArray(merged), userSchema, "name");
    Assert.assertEquals(2, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
    Assert.assertTrue(users.containsKey("John"));

    final MockFlowFile failed = runner.getFlowFilesForRelationship(MergeContent.REL_FAILURE).get(0);
    final Map<String, GenericRecord> places = getGenericRecordMap(runner.getContentAsByteArray(failed), placeSchema, "name");
    Assert.assertEquals(1, places.size());
    Assert.assertTrue(places.containsKey("Some Place"));
}
/**
 * With the Do Not Merge metadata strategy, files whose Avro metadata differs from the
 * first binned file are not merged: one FlowFile merges, two route to failure.
 */
@Test
public void testAvroConcatWithDifferentMetadataDoNotMerge() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
    runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_DO_NOT_MERGE);

    final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

    final GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Alyssa");
    user1.put("favorite_number", 256);
    // Plain maps instead of double-brace initialization, which creates an anonymous
    // inner class that captures the enclosing test instance.
    final Map<String, String> userMeta1 = new HashMap<>();
    userMeta1.put("test_metadata1", "Test 1");

    final GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Ben");
    user2.put("favorite_number", 7);
    user2.put("favorite_color", "red");
    final Map<String, String> userMeta2 = new HashMap<>();
    userMeta2.put("test_metadata1", "Test 2"); // Test non-matching values

    final GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "John");
    user3.put("favorite_number", 5);
    user3.put("favorite_color", "blue");
    final Map<String, String> userMeta3 = new HashMap<>();
    userMeta3.put("test_metadata1", "Test 1");
    userMeta3.put("test_metadata2", "Test"); // Test unique

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
    final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
    final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);

    runner.enqueue(out1.toByteArray());
    runner.enqueue(out2.toByteArray());
    runner.enqueue(out3.toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

    // Only the first record should survive in the merged content.
    final byte[] data = runner.getContentAsByteArray(bundle);
    final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
    Assert.assertEquals(1, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
}
/**
 * With the Ignore metadata strategy, differing Avro metadata does not prevent merging:
 * all three records merge and nothing routes to failure.
 */
@Test
public void testAvroConcatWithDifferentMetadataIgnore() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
    runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_IGNORE);

    final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

    final GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Alyssa");
    user1.put("favorite_number", 256);
    // Plain maps instead of double-brace initialization, which creates an anonymous
    // inner class that captures the enclosing test instance.
    final Map<String, String> userMeta1 = new HashMap<>();
    userMeta1.put("test_metadata1", "Test 1");

    final GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Ben");
    user2.put("favorite_number", 7);
    user2.put("favorite_color", "red");
    final Map<String, String> userMeta2 = new HashMap<>();
    userMeta2.put("test_metadata1", "Test 2"); // Test non-matching values

    final GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "John");
    user3.put("favorite_number", 5);
    user3.put("favorite_color", "blue");
    final Map<String, String> userMeta3 = new HashMap<>();
    userMeta3.put("test_metadata1", "Test 1");
    userMeta3.put("test_metadata2", "Test"); // Test unique

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
    final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
    final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);

    runner.enqueue(out1.toByteArray());
    runner.enqueue(out2.toByteArray());
    runner.enqueue(out3.toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

    // All three records should be present in the merged content.
    final byte[] data = runner.getContentAsByteArray(bundle);
    final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
    Assert.assertEquals(3, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
    Assert.assertTrue(users.containsKey("Ben"));
    Assert.assertTrue(users.containsKey("John"));
}
/**
 * With the Use First metadata strategy, the first file's metadata is applied to the
 * bundle and all three records merge with no failures.
 */
@Test
public void testAvroConcatWithDifferentMetadataUseFirst() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
    runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST);

    final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

    final GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Alyssa");
    user1.put("favorite_number", 256);
    // Plain maps instead of double-brace initialization, which creates an anonymous
    // inner class that captures the enclosing test instance.
    final Map<String, String> userMeta1 = new HashMap<>();
    userMeta1.put("test_metadata1", "Test 1");

    final GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Ben");
    user2.put("favorite_number", 7);
    user2.put("favorite_color", "red");
    final Map<String, String> userMeta2 = new HashMap<>();
    userMeta2.put("test_metadata1", "Test 2"); // Test non-matching values

    final GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "John");
    user3.put("favorite_number", 5);
    user3.put("favorite_color", "blue");
    final Map<String, String> userMeta3 = new HashMap<>();
    userMeta3.put("test_metadata1", "Test 1");
    userMeta3.put("test_metadata2", "Test"); // Test unique

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
    final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
    final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);

    runner.enqueue(out1.toByteArray());
    runner.enqueue(out2.toByteArray());
    runner.enqueue(out3.toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

    // All three records should be present in the merged content.
    final byte[] data = runner.getContentAsByteArray(bundle);
    final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
    Assert.assertEquals(3, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
    Assert.assertTrue(users.containsKey("Ben"));
    Assert.assertTrue(users.containsKey("John"));
}
/**
 * With the Keep Only Common Metadata strategy, the file whose metadata value conflicts
 * (Ben) routes to failure while the other two records merge.
 */
@Test
public void testAvroConcatWithDifferentMetadataKeepCommon() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "3");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
    runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_ALL_COMMON);

    final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

    final GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Alyssa");
    user1.put("favorite_number", 256);
    // Plain maps instead of double-brace initialization, which creates an anonymous
    // inner class that captures the enclosing test instance.
    final Map<String, String> userMeta1 = new HashMap<>();
    userMeta1.put("test_metadata1", "Test 1");

    final GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Ben");
    user2.put("favorite_number", 7);
    user2.put("favorite_color", "red");
    final Map<String, String> userMeta2 = new HashMap<>();
    userMeta2.put("test_metadata1", "Test 2"); // Test non-matching values

    final GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "John");
    user3.put("favorite_number", 5);
    user3.put("favorite_color", "blue");
    final Map<String, String> userMeta3 = new HashMap<>();
    userMeta3.put("test_metadata1", "Test 1");
    userMeta3.put("test_metadata2", "Test"); // Test unique

    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
    final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
    final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);

    runner.enqueue(out1.toByteArray());
    runner.enqueue(out2.toByteArray());
    runner.enqueue(out3.toByteArray());
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

    // Only the two records with compatible metadata should be in the merged content.
    final byte[] data = runner.getContentAsByteArray(bundle);
    final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
    Assert.assertEquals(2, users.size());
    Assert.assertTrue(users.containsKey("Alyssa"));
    Assert.assertTrue(users.containsKey("John"));
}
/**
 * Reads an Avro data file from {@code data} and returns its records keyed by the
 * string value of the given field.
 *
 * @param data   serialized Avro data-file bytes (e.g. merged FlowFile content)
 * @param schema reader schema for the records
 * @param key    name of the record field to use as the map key
 * @return map of field value to record
 * @throws IOException if the bytes cannot be read as an Avro data file
 */
private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    final Map<String, GenericRecord> records = new HashMap<>();
    // try-with-resources: the original never closed the DataFileReader (resource leak).
    try (final DataFileReader<GenericRecord> dataFileReader =
            new DataFileReader<>(new SeekableByteArrayInput(data), datumReader)) {
        while (dataFileReader.hasNext()) {
            final GenericRecord record = dataFileReader.next();
            records.put(record.get(key).toString(), record);
        }
    }
    return records;
}
/**
 * Serializes a single record into an Avro data file with no custom metadata.
 */
private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord record, DatumWriter<GenericRecord> datumWriter) throws IOException {
    return serializeAvroRecord(schema, record, datumWriter, null);
}
/**
 * Serializes a single record into an in-memory Avro data file, optionally embedding
 * custom file metadata (set before {@code create}, as the Avro API requires).
 *
 * @param schema      schema written into the data file header
 * @param record      the record to append
 * @param datumWriter writer used to encode the record
 * @param metadata    optional metadata key/value pairs; may be null
 * @return the serialized data file bytes
 * @throws IOException on serialization failure
 */
private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord record, DatumWriter<GenericRecord> datumWriter, Map<String, String> metadata) throws IOException {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    // try-with-resources guarantees the writer is closed even if create/append throws.
    try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
        if (metadata != null) {
            metadata.forEach(dataFileWriter::setMeta);
        }
        dataFileWriter.create(schema, out);
        dataFileWriter.append(record);
    }
    return out;
}
/**
 * Concatenates three FlowFiles into "Hello, World!" and verifies that each original
 * FlowFile carries the merged bundle's UUID in its merge.uuid attribute.
 */
@Test
public void testSimpleBinaryConcat() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException of getBytes(String).
    bundle.assertContentEquals("Hello, World!".getBytes(StandardCharsets.UTF_8));
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");

    // List.forEach directly; the stream() wrapper was redundant.
    runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).forEach(
        ff -> assertEquals(bundle.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE)));
}
/**
 * Same concat scenario as testSimpleBinaryConcat, but with the processor restricted
 * to a single bin; all three FlowFiles must still merge.
 */
@Test
public void testSimpleBinaryConcatSingleBin() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.MAX_BIN_COUNT, "1");

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertContentEquals("Hello, World!".getBytes("UTF-8"));
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
/**
 * Concatenates with literal text delimiters: header '@', demarcator '#', footer '$'.
 */
@Test
public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
    runner.setProperty(MergeContent.HEADER, "@");
    runner.setProperty(MergeContent.DEMARCATOR, "#");
    runner.setProperty(MergeContent.FOOTER, "$");

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertContentEquals("@Hello#, #World!$".getBytes("UTF-8"));
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
/**
 * Text delimiter strategy with only a header configured; demarcator and footer are
 * left unset, so the merged content is just the header plus the concatenation.
 */
@Test
public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
    runner.setProperty(MergeContent.HEADER, "@");

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertContentEquals("@Hello, World!".getBytes("UTF-8"));
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
/**
 * Filename delimiter strategy: header/demarcator/footer paths are resolved from
 * FlowFile attributes and the delimiter bytes are read from those resource files.
 */
@Test
public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
    runner.setProperty(MergeContent.HEADER, "${header}");
    runner.setProperty(MergeContent.DEMARCATOR, "${demarcator}");
    runner.setProperty(MergeContent.FOOTER, "${footer}");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    attributes.put("header", "src/test/resources/TestMergeContent/head");
    attributes.put("demarcator", "src/test/resources/TestMergeContent/demarcate");
    attributes.put("footer", "src/test/resources/TestMergeContent/foot");

    for (final String chunk : new String[] {"Hello", ", ", "World!"}) {
        runner.enqueue(chunk.getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    // Expected delimiters come from the head/demarcate/foot resource files.
    merged.assertContentEquals("(|)Hello***, ***World!___".getBytes("UTF-8"));
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
/**
 * Empty Header/Demarcator/Footer values are invalid with the Text delimiter strategy;
 * validation must report one "cannot be empty" failure for each of the three properties.
 */
@Test
public void testTextDelimitersValidation() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
    runner.setProperty(MergeContent.HEADER, "");
    runner.setProperty(MergeContent.DEMARCATOR, "");
    runner.setProperty(MergeContent.FOOTER, "");

    Collection<ValidationResult> results = new HashSet<>();
    final ProcessContext context = runner.getProcessContext();
    if (context instanceof MockProcessContext) {
        results = ((MockProcessContext) context).validate();
    }

    Assert.assertEquals(3, results.size());
    for (final ValidationResult result : results) {
        Assert.assertTrue(result.toString().contains("cannot be empty"));
    }
}
/**
 * Filename delimiter strategy pointing all three delimiter properties at a missing
 * file; validation must report a "does not exist" failure for each property.
 */
@Test
public void testFileDelimitersValidation() throws IOException, InterruptedException {
    final String doesNotExistFile = "src/test/resources/TestMergeContent/does_not_exist";
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
    runner.setProperty(MergeContent.HEADER, doesNotExistFile);
    runner.setProperty(MergeContent.DEMARCATOR, doesNotExistFile);
    runner.setProperty(MergeContent.FOOTER, doesNotExistFile);

    Collection<ValidationResult> results = new HashSet<>();
    final ProcessContext context = runner.getProcessContext();
    if (context instanceof MockProcessContext) {
        results = ((MockProcessContext) context).validate();
    }

    Assert.assertEquals(3, results.size());
    final String expectedMessage = "is invalid because File " + new File(doesNotExistFile).toString() + " does not exist";
    for (final ValidationResult result : results) {
        Assert.assertTrue(result.toString().contains(expectedMessage));
    }
}
/**
 * When binned FlowFiles carry conflicting mime.type attributes, the merged bundle's
 * mime.type must fall back to application/octet-stream.
 */
@Test
public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);

    createFlowFiles(runner);
    // A fourth, empty FlowFile whose mime.type conflicts with the others.
    final Map<String, String> zipAttributes = new HashMap<>();
    zipAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
    runner.enqueue(new byte[0], zipAttributes);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 4);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertContentEquals("Hello, World!".getBytes("UTF-8"));
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/octet-stream");
}
@Test
public void testOldestBinIsExpired() throws IOException, InterruptedException {
// Exercises bin eviction: create 49 correlation groups of 5 FlowFiles each. With
// MIN/MAX entries of 10, no group can fill a bin, so no bin completes on the first run.
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 day");
runner.setProperty(MergeContent.MAX_BIN_COUNT, "50");
runner.setProperty(MergeContent.MIN_ENTRIES, "10");
runner.setProperty(MergeContent.MAX_ENTRIES, "10");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "correlationId");
final Map<String, String> attrs = new HashMap<>();
// 49 distinct correlationId values, 5 FlowFiles per value (245 FlowFiles total).
for (int i = 0; i < 49; i++) {
attrs.put("correlationId", String.valueOf(i));
for (int j = 0; j < 5; j++) {
runner.enqueue(new byte[0], attrs);
}
}
runner.run();
// Nothing merged and nothing transferred: the sessions are rolled back, so the
// FlowFiles remain on the input queue.
runner.assertQueueNotEmpty(); // sessions rolled back.
runner.assertTransferCount(MergeContent.REL_MERGED, 0);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
// The next FlowFile will have no correlationId attribute at all.
attrs.remove("correlationId");
runner.clearTransferState();
// Run a single iteration but do not perform the @OnStopped action because
// we do not want to purge our Bin Manager. This causes some bins to get
// created. We then enqueue a FlowFile with no correlation id. We do it this
// way because if we just run a single iteration, then all FlowFiles will be
// pulled in at once, and we don't know if the first bin to be created will
// have 5 FlowFiles or 1 FlowFile, since this one that we are about to enqueue
// will be in a separate bin.
runner.run(1, false, true);
runner.enqueue(new byte[0], attrs);
runner.run();
// The uncorrelated FlowFile forces a 51st bin; exceeding MAX_BIN_COUNT evicts the
// oldest bin (5 FlowFiles from the first correlation group), which is then merged.
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 5);
}
/**
 * With a Min Size far larger than the enqueued content, the processor must not merge
 * anything: all transfer counts stay at zero.
 */
@Test
public void testSimpleBinaryConcatWaitsForMin() throws IOException, InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.MIN_SIZE, "20 KB");

    createFlowFiles(runner);
    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 0);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
}
/**
 * Merges three FlowFiles into a ZIP archive and verifies the three entries' contents.
 */
@Test
public void testZip() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    try (final InputStream rawIn = new ByteArrayInputStream(runner.getContentAsByteArray(bundle)); final ZipInputStream in = new ZipInputStream(rawIn)) {
        // Assert every entry exists before reading it (the original only checked the
        // first), and use assertArrayEquals for informative failure messages.
        Assert.assertNotNull(in.getNextEntry());
        Assert.assertArrayEquals("Hello".getBytes(StandardCharsets.UTF_8), IOUtils.toByteArray(in));
        Assert.assertNotNull(in.getNextEntry());
        Assert.assertArrayEquals(", ".getBytes(StandardCharsets.UTF_8), IOUtils.toByteArray(in));
        Assert.assertNotNull(in.getNextEntry());
        Assert.assertArrayEquals("World!".getBytes(StandardCharsets.UTF_8), IOUtils.toByteArray(in));
    }
    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip");
}
@Test
public void testZipException() throws IOException {
    // All three FlowFiles share the same filename; the first becomes the single
    // ZIP entry while the two colliding duplicates are routed to failure.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    attributes.put("filename", "duplicate-filename.txt");
    for (final String content : new String[] {"Hello", ", ", "World!"}) {
        runner.enqueue(content.getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
}
@Test
public void testTar() throws IOException {
    // Merges three FlowFiles into a TAR archive. The filename lengths vary on
    // purpose — the last one exceeds the classic 100-character tar name limit —
    // and each entry's name and bytes are verified in order.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_TAR);

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    attributes.put(CoreAttributes.FILENAME.key(), "AShortFileName");
    runner.enqueue("Hello".getBytes("UTF-8"), attributes);
    attributes.put(CoreAttributes.FILENAME.key(), "ALongerrrFileName");
    runner.enqueue(", ".getBytes("UTF-8"), attributes);
    attributes.put(CoreAttributes.FILENAME.key(), "AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName");
    runner.enqueue("World!".getBytes("UTF-8"), attributes);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    try (final InputStream rawIn = new ByteArrayInputStream(runner.getContentAsByteArray(bundle)); final TarArchiveInputStream in = new TarArchiveInputStream(rawIn)) {
        // Check every getNextEntry() result before dereferencing (the 2nd and 3rd
        // were previously unchecked and would NPE on a truncated archive), and use
        // assertArrayEquals for a useful failure message.
        ArchiveEntry entry = in.getNextEntry();
        Assert.assertNotNull(entry);
        assertEquals("AShortFileName", entry.getName());
        Assert.assertArrayEquals("Hello".getBytes("UTF-8"), IOUtils.toByteArray(in));

        entry = in.getNextEntry();
        Assert.assertNotNull(entry);
        assertEquals("ALongerrrFileName", entry.getName());
        Assert.assertArrayEquals(", ".getBytes("UTF-8"), IOUtils.toByteArray(in));

        entry = in.getNextEntry();
        Assert.assertNotNull(entry);
        assertEquals("AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName", entry.getName());
        Assert.assertArrayEquals("World!".getBytes("UTF-8"), IOUtils.toByteArray(in));
    }

    bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/tar");
}
@Test
public void testFlowFileStream() throws IOException {
    // Packages exactly two files into one FlowFile Stream v3 bundle and checks
    // the resulting MIME type.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_FLOWFILE_STREAM_V3);
    runner.setProperty(MergeContent.MIN_ENTRIES, "2");
    runner.setProperty(MergeContent.MAX_ENTRIES, "2");
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("path", "folder");
    for (final String filename : new String[] {"cal.txt", "date.txt"}) {
        runner.enqueue(Paths.get("src/test/resources/TestUnpackContent/folder/" + filename), attributes);
    }
    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 2);

    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), StandardFlowFileMediaType.VERSION_3.getMediaType());
}
@Test
public void testDefragment() throws IOException {
    // Reassembles four fragments, enqueued in index order, into the original text.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final String[] fragments = {"A Man ", "A Plan ", "A Canal ", "Panama"};
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
    for (int i = 0; i < fragments.length; i++) {
        // fragment.index is 1-based
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(i + 1));
        runner.enqueue(fragments[i].getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
@Test
public void testDefragmentDuplicateFragement() throws IOException, InterruptedException {
// Defragment behavior when one fragment index arrives twice: the first trigger
// still assembles one complete copy of the content, and the leftover duplicate
// fragment is routed to failure once its bin exceeds MAX_BIN_AGE (1 sec).
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
final Map<String, String> attributes = new HashMap<>();
attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
// enqueue a duplicate fragment
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
// Run without the @OnStopped cleanup so the bin manager retains state between triggers.
runner.run(1, false);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
runner.clearTransferState();
// Sleep slightly past the 1-second bin age so the stranded duplicate's bin expires.
Thread.sleep(1_100L);
runner.run();
// The duplicate can never complete a fragment set, so it must go to failure.
runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
runner.assertTransferCount(MergeContent.REL_MERGED, 0);
}
@Test
public void testDefragmentWithTooManyFragements() throws IOException {
    // MAX_ENTRIES (3) is smaller than the declared fragment count (4); defragment
    // mode must still assemble the complete set rather than truncating at the
    // configured bin limit.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");

    final String[] fragments = {"A Man ", "A Plan ", "A Canal ", "Panama"};
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
    for (int i = 0; i < fragments.length; i++) {
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(i + 1));
        runner.enqueue(fragments[i].getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
@Test
public void testDefragmentWithTooFewFragments() throws IOException {
    // Only 4 of the declared 5 fragments ever arrive; once the bin passes
    // MAX_BIN_AGE (2 secs), all queued fragments must be routed to failure.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "2 secs");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "5");
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
    runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
    runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
    runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
    runner.enqueue("Panama".getBytes("UTF-8"), attributes);

    runner.run(1, false);

    // Wait past the bin's maximum age. The previous retry-forever loop swallowed
    // InterruptedException in an empty catch; instead, restore the interrupt flag
    // and fail the test explicitly if we are interrupted.
    try {
        Thread.sleep(3000L);
    } catch (final InterruptedException ie) {
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for the bin to expire");
    }

    runner.run(1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 4);
}
@Test
public void testDefragmentOutOfOrder() throws IOException {
    // Fragments are enqueued out of index order (1, 3, 4, 2); defragment mode
    // must still reassemble them in index order, and every original must carry
    // the merged FlowFile's UUID in its merge.uuid attribute.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final String[] fragments = {"A Man ", "A Plan ", "A Canal ", "Panama"};
    final int[] enqueueOrder = {1, 3, 4, 2};
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
    for (final int index : enqueueOrder) {
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(index));
        runner.enqueue(fragments[index - 1].getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    for (final MockFlowFile original : runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) {
        assertEquals(assembled.getAttribute(CoreAttributes.UUID.key()), original.getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE));
    }
}
@Test
public void testDefragmentMultipleMingledSegments() throws IOException {
// Two independent fragment sets (ids "1" with 4 fragments and "TWO" with 3)
// are enqueued interleaved; defragment mode must sort them into separate bins
// and emit both correctly assembled FlowFiles in a single trigger.
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final Map<String, String> attributes = new HashMap<>();
attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
// second, independent fragment set with a different identifier and count
final Map<String, String> secondAttrs = new HashMap<>();
secondAttrs.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "TWO");
secondAttrs.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
secondAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("No x ".getBytes("UTF-8"), secondAttrs);
secondAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
runner.enqueue("in ".getBytes("UTF-8"), secondAttrs);
secondAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
runner.enqueue("Nixon".getBytes("UTF-8"), secondAttrs);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run(1);
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
// NOTE(review): this assumes set "1" is always emitted before set "TWO" —
// the assertions below depend on that ordering of the merged relationship.
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
final MockFlowFile assembledTwo = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(1);
assembledTwo.assertContentEquals("No x in Nixon".getBytes("UTF-8"));
}
@Test
public void testDefragmentOldStyleAttributes() throws IOException {
    // Defragmentation must also honor the legacy "segment.*" attribute names,
    // including restoring "segment.original.filename" onto the merged FlowFile.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final String[] fragments = {"A Man ", "A Plan ", "A Canal ", "Panama"};
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("segment.identifier", "1");
    attributes.put("segment.count", "4");
    attributes.put("segment.original.filename", "originalfilename");
    for (int i = 0; i < fragments.length; i++) {
        attributes.put("segment.index", String.valueOf(i + 1));
        runner.enqueue(fragments[i].getBytes("UTF-8"), attributes);
    }
    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    assembled.assertAttributeEquals(CoreAttributes.FILENAME.key(), "originalfilename");
}
@Test
public void testDefragmentMultipleOnTriggers() throws IOException {
    // Each fragment arrives on its own trigger; the bin must survive across
    // triggers and merge once the fourth fragment completes the set.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);

    final String[] fragments = {"A Man ", "A Plan ", "A Canal ", "Panama"};
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
    for (int i = 0; i < fragments.length; i++) {
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(i + 1));
        runner.enqueue(fragments[i].getBytes("UTF-8"), attributes);
        runner.run();
    }

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
@Test
public void testMergeBasedOnCorrelation() throws IOException, InterruptedException {
    // FlowFiles are binned by the "attr" correlation attribute: the three "b"
    // FlowFiles merge together and the single "c" FlowFile merges alone.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
    runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr");
    runner.setProperty(MergeContent.MAX_ENTRIES, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "1");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("attr", "b");
    runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
    runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
    attributes.put("attr", "c");
    runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
    attributes.put("attr", "b");
    runner.enqueue("Panama".getBytes("UTF-8"), attributes);

    runner.run(1);
    runner.assertTransferCount(MergeContent.REL_MERGED, 2);

    // The emission order of the two bins is not guaranteed, so branch on which
    // correlation value came out first. Assertion style unified to the
    // statically-imported assertEquals (was a mix of Assert.assertEquals and
    // the static import within the same method).
    final List<MockFlowFile> mergedFiles = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
    final MockFlowFile merged1 = mergedFiles.get(0);
    final MockFlowFile merged2 = mergedFiles.get(1);
    final String attr1 = merged1.getAttribute("attr");
    final String attr2 = merged2.getAttribute("attr");
    if ("c".equals(attr1)) {
        assertEquals("b", attr2);
        merged1.assertContentEquals("A Canal ", "UTF-8");
        merged2.assertContentEquals("A Man A Plan Panama", "UTF-8");
    } else {
        assertEquals("b", attr1);
        assertEquals("c", attr2);
        merged1.assertContentEquals("A Man A Plan Panama", "UTF-8");
        merged2.assertContentEquals("A Canal ", "UTF-8");
    }
}
@Test
public void testMaxBinAge() throws InterruptedException {
    // A bin that never reaches MIN_ENTRIES (500) must still be merged once it
    // exceeds MAX_BIN_AGE (2 sec).
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
    runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr");
    runner.setProperty(MergeContent.MAX_BIN_AGE, "2 sec");
    runner.setProperty(MergeContent.MIN_ENTRIES, "500");
    runner.setProperty(MergeContent.MAX_ENTRIES, "500");

    for (int count = 0; count < 50; count++) {
        runner.enqueue(new byte[0]);
    }

    // 50 entries never satisfy MIN_ENTRIES, so repeated triggers emit nothing.
    runner.run(5, false);
    runner.assertTransferCount(MergeContent.REL_MERGED, 0);
    runner.clearTransferState();

    // Once the age limit has elapsed, the partial bin must be flushed out.
    Thread.sleep(3000L);
    runner.run();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
}
@Test
public void testUniqueAttributes() {
    // The "keep all unique" strategy retains attributes that are either present
    // on only one input or identical on all inputs; the conflicting "xyz"
    // attribute (123 vs 321) must be dropped.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
    runner.setProperty(MergeContent.MIN_SIZE, "2 B");
    runner.setProperty(MergeContent.MAX_SIZE, "2 B");

    final Map<String, String> firstAttrs = new HashMap<>();
    firstAttrs.put("abc", "xyz");
    firstAttrs.put("xyz", "123");
    firstAttrs.put("hello", "good-bye");
    runner.enqueue(new byte[1], firstAttrs);

    final Map<String, String> secondAttrs = new HashMap<>();
    secondAttrs.put("abc", "xyz");
    secondAttrs.put("xyz", "321");
    secondAttrs.put("world", "aaa");
    runner.enqueue(new byte[1], secondAttrs);

    runner.run();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);

    final MockFlowFile outFile = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    outFile.assertAttributeEquals("abc", "xyz");
    outFile.assertAttributeEquals("hello", "good-bye");
    outFile.assertAttributeEquals("world", "aaa");
    outFile.assertAttributeNotExists("xyz"); // values conflicted across inputs
}
@Test
public void testCommonAttributesOnly() {
    // The "keep only common" strategy retains only attributes present with the
    // same value on every input; everything else is dropped. Also verifies the
    // merged FlowFile gets a fresh UUID distinct from both originals.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_COMMON);
    runner.setProperty(MergeContent.MIN_SIZE, "2 B");
    runner.setProperty(MergeContent.MAX_SIZE, "2 B");

    final Map<String, String> firstAttrs = new HashMap<>();
    firstAttrs.put("abc", "xyz");
    firstAttrs.put("xyz", "123");
    firstAttrs.put("hello", "good-bye");
    runner.enqueue(new byte[1], firstAttrs);

    final Map<String, String> secondAttrs = new HashMap<>();
    secondAttrs.put("abc", "xyz");
    secondAttrs.put("xyz", "321");
    secondAttrs.put("world", "aaa");
    runner.enqueue(new byte[1], secondAttrs);

    runner.run();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);

    final MockFlowFile outFile = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    outFile.assertAttributeEquals("abc", "xyz"); // identical on both inputs
    outFile.assertAttributeNotExists("hello");   // only on the first input
    outFile.assertAttributeNotExists("world");   // only on the second input
    outFile.assertAttributeNotExists("xyz");     // conflicting values

    // Three distinct UUIDs: both originals plus the newly-created merged FlowFile.
    final Set<String> uuids = new HashSet<>();
    uuids.add(outFile.getAttribute(CoreAttributes.UUID.key()));
    for (final MockFlowFile original : runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) {
        uuids.add(original.getAttribute(CoreAttributes.UUID.key()));
    }
    assertEquals(3, uuids.size());
}
@Test
public void testCountAttribute() throws IOException, InterruptedException {
    // The merged FlowFile must carry merge.count (number of inputs) and a
    // merge.bin.age attribute.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

    createFlowFiles(runner);
    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

    final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
    bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
    bundle.assertAttributeEquals(MergeContent.MERGE_COUNT_ATTRIBUTE, "3");
    bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
}
@Test
public void testLeavesSmallBinUnmerged() {
    // 17 FlowFiles with exactly-5-entry bins: three full bins merge (15
    // originals) and the 2 leftovers remain queued because they never fill a bin.
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
    runner.setProperty(MergeContent.MIN_ENTRIES, "5");
    runner.setProperty(MergeContent.MAX_ENTRIES, "5");

    int enqueued = 0;
    while (enqueued < 17) {
        runner.enqueue(enqueued + "\n");
        enqueued++;
    }
    runner.run(5);

    runner.assertTransferCount(MergeContent.REL_MERGED, 3);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15);
    assertEquals(2, runner.getQueueSize().getObjectCount());
}
/**
 * Enqueues the three standard fragments ("Hello", ", ", "World!") used by the
 * concat/zip/tar tests, each with a plain-text MIME type.
 */
private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    // add 'fragment.index' attribute to ensure non-defragment mode operates correctly even when index is present
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
    for (final String content : new String[] {"Hello", ", ", "World!"}) {
        testRunner.enqueue(content.getBytes("UTF-8"), attributes);
    }
}
}