NIFI-8764 Refactored UnpackContent to use both Commons Compress and Zip4j

- UnpackContent uses Zip4j when configured with a password property
- UnpackContent uses Commons Compress when a password is not specified

NIFI-8764 Updated Password property description mentioning disabled algorithms

NIFI-8764 Adjusted Password property description

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5201
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 4beaa1f..1c7629c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -36,10 +36,14 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import net.lingala.zip4j.io.inputstream.ZipInputStream;
 import net.lingala.zip4j.model.LocalFileHeader;
+import net.lingala.zip4j.model.enums.EncryptionMethod;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -67,7 +71,6 @@
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -75,7 +78,6 @@
 import org.apache.nifi.util.FlowFileUnpackagerV1;
 import org.apache.nifi.util.FlowFileUnpackagerV2;
 import org.apache.nifi.util.FlowFileUnpackagerV3;
-import net.lingala.zip4j.io.inputstream.ZipInputStream;
 
 @EventDriven
 @SideEffectFree
@@ -154,7 +156,7 @@
     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("Password")
             .displayName("Password")
-            .description("Password used for decrypting archive entries. Supports Zip files encrypted with ZipCrypto or AES")
+            .description("Password used for decrypting Zip archives encrypted with ZipCrypto or AES. Configuring a password disables support for alternative Zip compression algorithms.")
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -239,7 +241,7 @@
             packagingFormat = null;
             final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
             if (mimeType == null) {
-                logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile});
+                logger.error("No mime.type attribute set for {}; routing to failure", flowFile);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
             }
@@ -250,7 +252,7 @@
                 }
             }
             if (packagingFormat == null) {
-                logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
+                logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", flowFile, mimeType);
                 session.transfer(flowFile, REL_SUCCESS);
                 return;
             }
@@ -291,7 +293,7 @@
         try {
             unpacker.unpack(session, flowFile, unpacked);
             if (unpacked.isEmpty()) {
-                logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile});
+                logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", flowFile);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
             }
@@ -304,20 +306,20 @@
             flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
             session.transfer(flowFile, REL_ORIGINAL);
             session.getProvenanceReporter().fork(flowFile, unpacked);
-            logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
+            logger.info("Unpacked {} into {} and transferred to success", flowFile, unpacked);
         } catch (final Exception e) {
-            logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e});
+            logger.error("Unable to unpack {}; routing to failure", flowFile, e);
             session.transfer(flowFile, REL_FAILURE);
             session.remove(unpacked);
         }
     }
 
     private static abstract class Unpacker {
-        private Pattern fileFilter = null;
+        protected Pattern fileFilter = null;
 
         public Unpacker() {}
 
-        public Unpacker(Pattern fileFilter) {
+        public Unpacker(final Pattern fileFilter) {
             this.fileFilter = fileFilter;
         }
 
@@ -340,54 +342,45 @@
         @Override
         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
             final String fragmentId = UUID.randomUUID().toString();
-            session.read(source, new InputStreamCallback() {
+            session.read(source, inputStream -> {
+                int fragmentCount = 0;
+                try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inputStream))) {
+                    TarArchiveEntry tarEntry;
+                    while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+                        if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
+                            continue;
+                        }
+                        final File file = new File(tarEntry.getName());
+                        final Path filePath = file.toPath();
+                        String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
+                        final Path absPath = filePath.toAbsolutePath();
+                        final String absPathString = absPath.getParent().toString() + "/";
 
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    int fragmentCount = 0;
-                    try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) {
-                        TarArchiveEntry tarEntry;
-                        while ((tarEntry = tarIn.getNextTarEntry()) != null) {
-                            if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
-                                continue;
-                            }
-                            final File file = new File(tarEntry.getName());
-                            final Path filePath = file.toPath();
-                            String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
-                            final Path absPath = filePath.toAbsolutePath();
-                            final String absPathString = absPath.getParent().toString() + "/";
+                        FlowFile unpackedFile = session.create(source);
+                        try {
+                            final Map<String, String> attributes = new HashMap<>();
+                            attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                            attributes.put(CoreAttributes.PATH.key(), filePathString);
+                            attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
 
-                            FlowFile unpackedFile = session.create(source);
-                            try {
-                                final Map<String, String> attributes = new HashMap<>();
-                                attributes.put(CoreAttributes.FILENAME.key(), file.getName());
-                                attributes.put(CoreAttributes.PATH.key(), filePathString);
-                                attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+                            attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
+                            attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
+                            attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
 
-                                attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
-                                attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
-                                attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
+                            final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+                            attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
+                            attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
 
-                                final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
-                                attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
-                                attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
+                            attributes.put(FRAGMENT_ID, fragmentId);
+                            attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
 
-                                attributes.put(FRAGMENT_ID, fragmentId);
-                                attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+                            unpackedFile = session.putAllAttributes(unpackedFile, attributes);
 
-                                unpackedFile = session.putAllAttributes(unpackedFile, attributes);
-
-                                final long fileSize = tarEntry.getSize();
-                                unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream out) throws IOException {
-                                        StreamUtils.copy(tarIn, out, fileSize);
-                                    }
-                                });
-                            } finally {
-                                unpacked.add(unpackedFile);
-                            }
+                            final long fileSize = tarEntry.getSize();
+                            unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
+                        } finally {
+                            unpacked.add(unpackedFile);
                         }
                     }
                 }
@@ -396,7 +389,7 @@
     }
 
     private static class ZipUnpacker extends Unpacker {
-        private char[] password;
+        private final char[] password;
 
         public ZipUnpacker(final Pattern fileFilter, final char[] password) {
             super(fileFilter);
@@ -406,50 +399,118 @@
         @Override
         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
             final String fragmentId = UUID.randomUUID().toString();
-            session.read(source, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    int fragmentCount = 0;
-                    try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(in), password)) {
-                        LocalFileHeader zipEntry;
-                        while ((zipEntry = zipIn.getNextEntry()) != null) {
-                            final String zipEntryName = zipEntry.getFileName();
-                            if (zipEntry.isDirectory() || !fileMatches(zipEntryName)) {
-                                continue;
-                            }
-                            final File file = new File(zipEntryName);
-                            final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
-                            final Path absPath = file.toPath().toAbsolutePath();
-                            final String absPathString = absPath.getParent().toString() + "/";
+            if (password == null) {
+                session.read(source, new CompressedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId));
+            } else {
+                session.read(source, new EncryptedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, password));
+            }
+        }
 
-                            FlowFile unpackedFile = session.create(source);
-                            try {
-                                final Map<String, String> attributes = new HashMap<>();
-                                attributes.put(CoreAttributes.FILENAME.key(), file.getName());
-                                attributes.put(CoreAttributes.PATH.key(), parentDirectory);
-                                attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+        private abstract static class ZipInputStreamCallback implements InputStreamCallback {
+            private static final String PATH_SEPARATOR = "/";
 
-                                attributes.put(FRAGMENT_ID, fragmentId);
-                                attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+            private final Pattern fileFilter;
 
-                                final String encryptionMethod = zipEntry.getEncryptionMethod().toString();
-                                attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod);
+            private final ProcessSession session;
 
-                                unpackedFile = session.putAllAttributes(unpackedFile, attributes);
-                                unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream out) throws IOException {
-                                        StreamUtils.copy(zipIn, out);
-                                    }
-                                });
-                            } finally {
-                                unpacked.add(unpackedFile);
-                            }
-                        }
+            private final FlowFile sourceFlowFile;
+
+            private final List<FlowFile> unpacked;
+
+            private final String fragmentId;
+
+            private int fragmentIndex;
+
+            private ZipInputStreamCallback(
+                    final Pattern fileFilter,
+                    final ProcessSession session,
+                    final FlowFile sourceFlowFile,
+                    final List<FlowFile> unpacked,
+                    final String fragmentId
+            ) {
+                this.fileFilter = fileFilter;
+                this.session = session;
+                this.sourceFlowFile = sourceFlowFile;
+                this.unpacked = unpacked;
+                this.fragmentId = fragmentId;
+            }
+
+            protected boolean isFileEntryMatched(final boolean directory, final String fileName) {
+                return !directory && (fileFilter == null || fileFilter.matcher(fileName).find());
+            }
+
+            protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) {
+                if (isFileEntryMatched(directory, zipEntryName)) {
+                    final File file = new File(zipEntryName);
+                    final String parentDirectory = (file.getParent() == null) ? PATH_SEPARATOR : file.getParent();
+
+                    FlowFile unpackedFile = session.create(sourceFlowFile);
+                    try {
+                        final Map<String, String> attributes = new HashMap<>();
+                        attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                        attributes.put(CoreAttributes.PATH.key(), parentDirectory);
+                        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), file.toPath().toAbsolutePath() + PATH_SEPARATOR);
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+                        attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
+
+                        attributes.put(FRAGMENT_ID, fragmentId);
+                        attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex));
+
+                        unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                        unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream));
+                    } finally {
+                        unpacked.add(unpackedFile);
                     }
                 }
-            });
+            }
+        }
+
+        private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {
+            private CompressedZipInputStreamCallback(
+                    final Pattern fileFilter,
+                    final ProcessSession session,
+                    final FlowFile sourceFlowFile,
+                    final List<FlowFile> unpacked,
+                    final String fragmentId
+            ) {
+                super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
+            }
+
+            @Override
+            public void process(final InputStream inputStream) throws IOException {
+                try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream))) {
+                    ZipArchiveEntry zipEntry;
+                    while ((zipEntry = zipInputStream.getNextZipEntry()) != null) {
+                        processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE);
+                    }
+                }
+            }
+        }
+
+        private static class EncryptedZipInputStreamCallback extends ZipInputStreamCallback {
+            private final char[] password;
+
+            private EncryptedZipInputStreamCallback(
+                    final Pattern fileFilter,
+                    final ProcessSession session,
+                    final FlowFile sourceFlowFile,
+                    final List<FlowFile> unpacked,
+                    final String fragmentId,
+                    final char[] password
+            ) {
+                super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
+                this.password = password;
+            }
+
+            @Override
+            public void process(final InputStream inputStream) throws IOException {
+                try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password)) {
+                    LocalFileHeader zipEntry;
+                    while ((zipEntry = zipInputStream.getNextEntry()) != null) {
+                        processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod());
+                    }
+                }
+            }
         }
     }
 
@@ -463,48 +524,42 @@
 
         @Override
         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
-            session.read(source, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream rawIn) throws IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn)) {
-                        while (unpackager.hasMoreData()) {
-                            final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
-                            FlowFile unpackedFile = session.create(source);
-                            try {
-                                unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream rawOut) throws IOException {
-                                        try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                                            final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
-                                            if (attributes == null) {
-                                                throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
-                                            }
-                                            attributesRef.set(attributes);
-                                        }
+            session.read(source, inputStream -> {
+                try (final InputStream in = new BufferedInputStream(inputStream)) {
+                    while (unpackager.hasMoreData()) {
+                        final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
+                        FlowFile unpackedFile = session.create(source);
+                        try {
+                            unpackedFile = session.write(unpackedFile, outputStream -> {
+                                try (final OutputStream out = new BufferedOutputStream(outputStream)) {
+                                    final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
+                                    if (attributes == null) {
+                                        throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
                                     }
-                                });
-
-                                final Map<String, String> attributes = attributesRef.get();
-
-                                // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
-                                // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
-                                // and later unpack it -- in this case, we have two FlowFiles with the same UUID.
-                                attributes.remove(CoreAttributes.UUID.key());
-
-                                // maintain backward compatibility with legacy NiFi attribute names
-                                mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
-                                mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
-                                mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
-                                mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
-
-                                if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
-                                    attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+                                    attributesRef.set(attributes);
                                 }
+                            });
 
-                                unpackedFile = session.putAllAttributes(unpackedFile, attributes);
-                            } finally {
-                                unpacked.add(unpackedFile);
+                            final Map<String, String> attributes = attributesRef.get();
+
+                            // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
+                            // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
+                            // and later unpack it -- in this case, we have two FlowFiles with the same UUID.
+                            attributes.remove(CoreAttributes.UUID.key());
+
+                            // maintain backward compatibility with legacy NiFi attribute names
+                            mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
+                            mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
+                            mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
+                            mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
+
+                            if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
                             }
+
+                            unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                        } finally {
+                            unpacked.add(unpackedFile);
                         }
                     }
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index f14759f..f545f07 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -46,6 +46,8 @@
 
 public class TestUnpackContent {
 
+    private static final String FIRST_FRAGMENT_INDEX = "1";
+
     private static final Path dataPath = Paths.get("src/test/resources/TestUnpackContent");
 
     @Test
@@ -501,6 +503,7 @@
 
         final MockFlowFile unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS).iterator().next();
         unpacked.assertAttributeEquals(UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
+        unpacked.assertAttributeEquals(UnpackContent.FRAGMENT_INDEX, FIRST_FRAGMENT_INDEX);
 
         final byte[] unpackedBytes = runner.getContentAsByteArray(unpacked);
         final String unpackedContents = new String(unpackedBytes);