NIFI-8764 Refactored UnpackContent to use both Commons Compress and Zip4j
- UnpackContent uses Zip4j when configured with a password property
- UnpackContent uses Commons Compress when a password is not specified
NIFI-8764 Updated Password property description mentioning disabled algorithms
NIFI-8764 Adjusted Password property description
Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #5201
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 4beaa1f..1c7629c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -36,10 +36,14 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;
+import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -67,7 +71,6 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.stream.io.StreamUtils;
@@ -75,7 +78,6 @@
import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3;
-import net.lingala.zip4j.io.inputstream.ZipInputStream;
@EventDriven
@SideEffectFree
@@ -154,7 +156,7 @@
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.displayName("Password")
- .description("Password used for decrypting archive entries. Supports Zip files encrypted with ZipCrypto or AES")
+ .description("Password used for decrypting Zip archives encrypted with ZipCrypto or AES. Configuring a password disables support for alternative Zip compression algorithms.")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -239,7 +241,7 @@
packagingFormat = null;
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
if (mimeType == null) {
- logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile});
+ logger.error("No mime.type attribute set for {}; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -250,7 +252,7 @@
}
}
if (packagingFormat == null) {
- logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
+ logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", flowFile, mimeType);
session.transfer(flowFile, REL_SUCCESS);
return;
}
@@ -291,7 +293,7 @@
try {
unpacker.unpack(session, flowFile, unpacked);
if (unpacked.isEmpty()) {
- logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile});
+ logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -304,20 +306,20 @@
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
session.transfer(flowFile, REL_ORIGINAL);
session.getProvenanceReporter().fork(flowFile, unpacked);
- logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
+ logger.info("Unpacked {} into {} and transferred to success", flowFile, unpacked);
} catch (final Exception e) {
- logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e});
+ logger.error("Unable to unpack {}; routing to failure", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
session.remove(unpacked);
}
}
private static abstract class Unpacker {
- private Pattern fileFilter = null;
+ protected Pattern fileFilter = null;
public Unpacker() {}
- public Unpacker(Pattern fileFilter) {
+ public Unpacker(final Pattern fileFilter) {
this.fileFilter = fileFilter;
}
@@ -340,54 +342,45 @@
@Override
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
final String fragmentId = UUID.randomUUID().toString();
- session.read(source, new InputStreamCallback() {
+ session.read(source, inputStream -> {
+ int fragmentCount = 0;
+ try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inputStream))) {
+ TarArchiveEntry tarEntry;
+ while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+ if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
+ continue;
+ }
+ final File file = new File(tarEntry.getName());
+ final Path filePath = file.toPath();
+ String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
+ final Path absPath = filePath.toAbsolutePath();
+ final String absPathString = absPath.getParent().toString() + "/";
- @Override
- public void process(final InputStream in) throws IOException {
- int fragmentCount = 0;
- try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) {
- TarArchiveEntry tarEntry;
- while ((tarEntry = tarIn.getNextTarEntry()) != null) {
- if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
- continue;
- }
- final File file = new File(tarEntry.getName());
- final Path filePath = file.toPath();
- String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
- final Path absPath = filePath.toAbsolutePath();
- final String absPathString = absPath.getParent().toString() + "/";
+ FlowFile unpackedFile = session.create(source);
+ try {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+ attributes.put(CoreAttributes.PATH.key(), filePathString);
+ attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+ attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
- FlowFile unpackedFile = session.create(source);
- try {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), file.getName());
- attributes.put(CoreAttributes.PATH.key(), filePathString);
- attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
- attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
+ attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
+ attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
- attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
- attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
- attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
+ final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+ attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
- final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
- attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
- attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
- attributes.put(FRAGMENT_ID, fragmentId);
- attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+ unpackedFile = session.putAllAttributes(unpackedFile, attributes);
- unpackedFile = session.putAllAttributes(unpackedFile, attributes);
-
- final long fileSize = tarEntry.getSize();
- unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- StreamUtils.copy(tarIn, out, fileSize);
- }
- });
- } finally {
- unpacked.add(unpackedFile);
- }
+ final long fileSize = tarEntry.getSize();
+ unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
+ } finally {
+ unpacked.add(unpackedFile);
}
}
}
@@ -396,7 +389,7 @@
}
private static class ZipUnpacker extends Unpacker {
- private char[] password;
+ private final char[] password;
public ZipUnpacker(final Pattern fileFilter, final char[] password) {
super(fileFilter);
@@ -406,50 +399,118 @@
@Override
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
final String fragmentId = UUID.randomUUID().toString();
- session.read(source, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- int fragmentCount = 0;
- try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(in), password)) {
- LocalFileHeader zipEntry;
- while ((zipEntry = zipIn.getNextEntry()) != null) {
- final String zipEntryName = zipEntry.getFileName();
- if (zipEntry.isDirectory() || !fileMatches(zipEntryName)) {
- continue;
- }
- final File file = new File(zipEntryName);
- final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
- final Path absPath = file.toPath().toAbsolutePath();
- final String absPathString = absPath.getParent().toString() + "/";
+ if (password == null) {
+ session.read(source, new CompressedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId));
+ } else {
+ session.read(source, new EncryptedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, password));
+ }
+ }
- FlowFile unpackedFile = session.create(source);
- try {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), file.getName());
- attributes.put(CoreAttributes.PATH.key(), parentDirectory);
- attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
- attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ private abstract static class ZipInputStreamCallback implements InputStreamCallback {
+ private static final String PATH_SEPARATOR = "/";
- attributes.put(FRAGMENT_ID, fragmentId);
- attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+ private final Pattern fileFilter;
- final String encryptionMethod = zipEntry.getEncryptionMethod().toString();
- attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod);
+ private final ProcessSession session;
- unpackedFile = session.putAllAttributes(unpackedFile, attributes);
- unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- StreamUtils.copy(zipIn, out);
- }
- });
- } finally {
- unpacked.add(unpackedFile);
- }
- }
+ private final FlowFile sourceFlowFile;
+
+ private final List<FlowFile> unpacked;
+
+ private final String fragmentId;
+
+ private int fragmentIndex;
+
+ private ZipInputStreamCallback(
+ final Pattern fileFilter,
+ final ProcessSession session,
+ final FlowFile sourceFlowFile,
+ final List<FlowFile> unpacked,
+ final String fragmentId
+ ) {
+ this.fileFilter = fileFilter;
+ this.session = session;
+ this.sourceFlowFile = sourceFlowFile;
+ this.unpacked = unpacked;
+ this.fragmentId = fragmentId;
+ }
+
+ protected boolean isFileEntryMatched(final boolean directory, final String fileName) {
+ return !directory && (fileFilter == null || fileFilter.matcher(fileName).find());
+ }
+
+ /**
+ * Creates a child FlowFile for one Zip entry when the entry is a file that matches the file filter.
+ * Directories and entries rejected by the filter are ignored without creating a FlowFile.
+ *
+ * @param zipInputStream stream the entry content is copied from
+ * @param directory whether the entry is a directory
+ * @param zipEntryName entry name from the archive, used to derive the filename and path attributes
+ * @param encryptionMethod encryption method recorded in the file encryption method attribute
+ */
+ protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) {
+ if (isFileEntryMatched(directory, zipEntryName)) {
+ final File file = new File(zipEntryName);
+ final String parentDirectory = (file.getParent() == null) ? PATH_SEPARATOR : file.getParent();
+
+ FlowFile unpackedFile = session.create(sourceFlowFile);
+ try {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+ attributes.put(CoreAttributes.PATH.key(), parentDirectory);
+ // absolute.path is the directory containing the entry, not the entry itself: resolve the parent of the
+ // absolute path, as the Tar unpacker does, so the attribute does not end with the file name
+ attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), file.toPath().toAbsolutePath().getParent() + PATH_SEPARATOR);
+ attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
+
+ // Fragment index starts at 1 and advances only for entries that pass the filter
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex));
+
+ unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+ unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream));
+ } finally {
+ // Track the FlowFile even when attribute or content handling fails so the caller can remove it
+ unpacked.add(unpackedFile);
+ }
+ }
- });
+ }
+ }
+
+ /**
+ * Input stream callback that reads Zip entries with Commons Compress ZipArchiveInputStream.
+ * ZipUnpacker selects this callback when no password is configured; every entry is reported with EncryptionMethod.NONE.
+ */
+ private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {
+ private CompressedZipInputStreamCallback(
+ final Pattern fileFilter,
+ final ProcessSession session,
+ final FlowFile sourceFlowFile,
+ final List<FlowFile> unpacked,
+ final String fragmentId
+ ) {
+ super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
+ }
+
+ @Override
+ public void process(final InputStream inputStream) throws IOException {
+ try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream))) {
+ ZipArchiveEntry zipEntry;
+ // Hand each entry to processEntry together with the archive stream the entry was read from
+ while ((zipEntry = zipInputStream.getNextZipEntry()) != null) {
+ processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE);
+ }
+ }
+ }
+ }
+
+ /**
+ * Input stream callback that reads Zip entries with Zip4j ZipInputStream using the supplied password.
+ * ZipUnpacker selects this callback when a password is configured; the encryption method of each entry
+ * is taken from its local file header.
+ */
+ private static class EncryptedZipInputStreamCallback extends ZipInputStreamCallback {
+ private final char[] password;
+
+ private EncryptedZipInputStreamCallback(
+ final Pattern fileFilter,
+ final ProcessSession session,
+ final FlowFile sourceFlowFile,
+ final List<FlowFile> unpacked,
+ final String fragmentId,
+ final char[] password
+ ) {
+ super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
+ this.password = password;
+ }
+
+ @Override
+ public void process(final InputStream inputStream) throws IOException {
+ try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password)) {
+ LocalFileHeader zipEntry;
+ // Hand each entry to processEntry together with the archive stream the entry was read from
+ while ((zipEntry = zipInputStream.getNextEntry()) != null) {
+ processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod());
+ }
+ }
+ }
}
}
@@ -463,48 +524,42 @@
@Override
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
- session.read(source, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- try (final InputStream in = new BufferedInputStream(rawIn)) {
- while (unpackager.hasMoreData()) {
- final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
- FlowFile unpackedFile = session.create(source);
- try {
- unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final OutputStream out = new BufferedOutputStream(rawOut)) {
- final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
- if (attributes == null) {
- throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
- }
- attributesRef.set(attributes);
- }
+ session.read(source, inputStream -> {
+ try (final InputStream in = new BufferedInputStream(inputStream)) {
+ while (unpackager.hasMoreData()) {
+ final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
+ FlowFile unpackedFile = session.create(source);
+ try {
+ unpackedFile = session.write(unpackedFile, outputStream -> {
+ try (final OutputStream out = new BufferedOutputStream(outputStream)) {
+ final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
+ if (attributes == null) {
+ throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
}
- });
-
- final Map<String, String> attributes = attributesRef.get();
-
- // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
- // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
- // and later unpack it -- in this case, we have two FlowFiles with the same UUID.
- attributes.remove(CoreAttributes.UUID.key());
-
- // maintain backward compatibility with legacy NiFi attribute names
- mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
- mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
- mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
- mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
-
- if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
- attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ attributesRef.set(attributes);
}
+ });
- unpackedFile = session.putAllAttributes(unpackedFile, attributes);
- } finally {
- unpacked.add(unpackedFile);
+ final Map<String, String> attributes = attributesRef.get();
+
+ // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
+ // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
+ // and later unpack it -- in this case, we have two FlowFiles with the same UUID.
+ attributes.remove(CoreAttributes.UUID.key());
+
+ // maintain backward compatibility with legacy NiFi attribute names
+ mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
+ mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
+ mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
+ mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
+
+ if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
}
+
+ unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+ } finally {
+ unpacked.add(unpackedFile);
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index f14759f..f545f07 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -46,6 +46,8 @@
public class TestUnpackContent {
+ private static final String FIRST_FRAGMENT_INDEX = "1";
+
private static final Path dataPath = Paths.get("src/test/resources/TestUnpackContent");
@Test
@@ -501,6 +503,7 @@
final MockFlowFile unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS).iterator().next();
unpacked.assertAttributeEquals(UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
+ unpacked.assertAttributeEquals(UnpackContent.FRAGMENT_INDEX, FIRST_FRAGMENT_INDEX);
final byte[] unpackedBytes = runner.getContentAsByteArray(unpacked);
final String unpackedContents = new String(unpackedBytes);