blob: f545f07a2a75c0e44626dd80d81ef7e7edbaf50a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestUnpackContent {
private static final String FIRST_FRAGMENT_INDEX = "1";
private static final Path dataPath = Paths.get("src/test/resources/TestUnpackContent");
@Test
public void testTar() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.enqueue(dataPath.resolve("data.tar"));
Map<String, String> attributes = new HashMap<>(1);
Map<String, String> attributes2 = new HashMap<>(1);
attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType());
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertEquals("rw-r--r--", flowFile.getAttribute("file.permissions"));
assertEquals("jmcarey", flowFile.getAttribute("file.owner"));
assertEquals("mkpasswd", flowFile.getAttribute("file.group"));
String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime");
try {
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString);
} catch (DateTimeParseException e) {
fail();
}
String creationTimeAsString = flowFile.getAttribute("file.creationTime");
try {
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString);
} catch (DateTimeParseException e) {
fail();
}
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testTarWithFilter() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$");
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.enqueue(dataPath.resolve("data.tar"));
Map<String, String> attributes = new HashMap<>(1);
Map<String, String> attributes2 = new HashMap<>(1);
attributes.put("mime.type", "application/x-tar");
attributes2.put("mime.type", "application/tar");
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
assertEquals("date.txt", filename);
flowFile.assertContentEquals(path.toFile());
}
unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
assertEquals("cal.txt", filename);
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testZip() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.enqueue(dataPath.resolve("data.zip"));
Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testInvalidZip() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("invalid_data.zip"));
unpackRunner.enqueue(dataPath.resolve("invalid_data.zip"));
Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("invalid_data.zip"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("invalid_data.zip"), attributes);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
// final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(filename);
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testZipEncryptionZipStandard() throws IOException {
runZipEncryptionMethod(EncryptionMethod.ZIP_STANDARD);
}
@Test
public void testZipEncryptionAes() throws IOException {
runZipEncryptionMethod(EncryptionMethod.AES);
}
@Test
public void testZipEncryptionNoPasswordConfigured() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
final String password = String.class.getSimpleName();
final char[] streamPassword = password.toCharArray();
final String contents = TestRunner.class.getCanonicalName();
final byte[] zipEncrypted = createZipEncrypted(EncryptionMethod.AES, streamPassword, contents);
runner.enqueue(zipEncrypted);
runner.run();
runner.assertTransferCount(UnpackContent.REL_FAILURE, 1);
}
@Test
public void testZipWithFilter() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/date.txt$");
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$");
unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.enqueue(dataPath.resolve("data.zip"));
Map<String, String> attributes = new HashMap<>(1);
attributes.put("mime.type", "application/zip");
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes);
unpackRunner.run(2);
autoUnpackRunner.run(2);
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
assertEquals("date.txt", filename);
flowFile.assertContentEquals(path.toFile());
}
unpacked = autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
assertEquals("cal.txt", filename);
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testFlowFileStreamV3() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString());
runner.enqueue(dataPath.resolve("data.flowfilev3"));
runner.enqueue(dataPath.resolve("data.flowfilev3"));
runner.run(2);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testFlowFileStreamV2() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString());
runner.enqueue(dataPath.resolve("data.flowfilev2"));
runner.enqueue(dataPath.resolve("data.flowfilev2"));
runner.run(2);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder = flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
assertTrue(Files.exists(path));
flowFile.assertContentEquals(path.toFile());
}
}
@Test
public void testTarThenMerge() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.run();
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
assertEquals(flowFile.getAttribute(UnpackContent.SEGMENT_ORIGINAL_FILENAME), "data");
}
final TestRunner mergeRunner = TestRunners.newTestRunner(new MergeContent());
mergeRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_TAR);
mergeRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
mergeRunner.setProperty(MergeContent.KEEP_PATH, "true");
mergeRunner.enqueue(unpacked.toArray(new MockFlowFile[0]));
mergeRunner.run();
mergeRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
mergeRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 2);
mergeRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
final List<MockFlowFile> packed = mergeRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
for (final MockFlowFile flowFile : packed) {
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "data.tar");
}
}
@Test
public void testZipThenMerge() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.run();
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
assertEquals(flowFile.getAttribute(UnpackContent.SEGMENT_ORIGINAL_FILENAME), "data");
}
final TestRunner mergeRunner = TestRunners.newTestRunner(new MergeContent());
mergeRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
mergeRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
mergeRunner.setProperty(MergeContent.KEEP_PATH, "true");
mergeRunner.enqueue(unpacked.toArray(new MockFlowFile[0]));
mergeRunner.run();
mergeRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
mergeRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 2);
mergeRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
final List<MockFlowFile> packed = mergeRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
for (final MockFlowFile flowFile : packed) {
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "data.zip");
}
}
@Test
public void testZipHandlesBadData() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.tar"));
unpackRunner.run();
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 1);
}
@Test
public void testTarHandlesBadData() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString());
unpackRunner.enqueue(dataPath.resolve("data.zip"));
unpackRunner.run();
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 0);
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 1);
}
/*
* This test checks for thread safety problems when PackageFormat.AUTO_DETECT_FORMAT is used.
* It won't always fail if there is a issue with the code, but it will fail often enough to eventually be noticed.
* If this test fails at all, then it needs to be investigated.
*/
@Test
public void testThreadSafetyUsingAutoDetect() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
Map<String, String> attrsTar = new HashMap<>(1);
Map<String, String> attrsFFv3 = new HashMap<>(1);
attrsTar.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType());
attrsFFv3.put("mime.type", UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.getMimeType());
int numThreads = 50;
runner.setThreadCount(numThreads);
for (int i=0; i<numThreads; i++) {
if (i%2 == 0) {
runner.enqueue(dataPath.resolve("data.tar"), attrsTar);
} else {
runner.enqueue(dataPath.resolve("data.flowfilev3"), attrsFFv3);
}
}
runner.run(numThreads);
runner.assertTransferCount(UnpackContent.REL_SUCCESS, numThreads*2);
}
private void runZipEncryptionMethod(final EncryptionMethod encryptionMethod) throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
final String password = String.class.getSimpleName();
runner.setProperty(UnpackContent.PASSWORD, password);
final char[] streamPassword = password.toCharArray();
final String contents = TestRunner.class.getCanonicalName();
final byte[] zipEncrypted = createZipEncrypted(encryptionMethod, streamPassword, contents);
runner.enqueue(zipEncrypted);
runner.run();
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
final MockFlowFile unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS).iterator().next();
unpacked.assertAttributeEquals(UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
unpacked.assertAttributeEquals(UnpackContent.FRAGMENT_INDEX, FIRST_FRAGMENT_INDEX);
final byte[] unpackedBytes = runner.getContentAsByteArray(unpacked);
final String unpackedContents = new String(unpackedBytes);
assertEquals("Unpacked Contents not matched", contents, unpackedContents);
}
private byte[] createZipEncrypted(final EncryptionMethod encryptionMethod, final char[] password, final String contents) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream, password);
final String name = UUID.randomUUID().toString();
final ZipParameters zipParameters = new ZipParameters();
zipParameters.setEncryptionMethod(encryptionMethod);
zipParameters.setEncryptFiles(true);
zipParameters.setFileNameInZip(name);
zipOutputStream.putNextEntry(zipParameters);
zipOutputStream.write(contents.getBytes());
zipOutputStream.closeEntry();
zipOutputStream.close();
return outputStream.toByteArray();
}
}