NIFI-9328: Transfer cleanup and reuse added to FetchFileTransfer in case of FileNotFound and PermissionDenied exceptions.

- Corrects connection handling in FetchFTP and FetchSFTP

This closes #5478

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index e7ea491..2d38116 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -254,31 +254,30 @@
             transfer = transferWrapper.getFileTransfer();
         }
 
-        boolean closeConnection = false;
         try {
             // Pull data from remote system.
             try {
                 flowFile = transfer.getRemoteFile(filename, flowFile, session);
 
             } catch (final FileNotFoundException e) {
-                closeConnection = false;
                 getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
-                        new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
+                        flowFile, filename, host, REL_NOT_FOUND.getName());
                 session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
                 session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+                cleanupTransfer(transfer, false, transferQueue, host, port);
                 return;
             } catch (final PermissionDeniedException e) {
-                closeConnection = false;
                 getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
-                        new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
+                        flowFile, filename, host, REL_PERMISSION_DENIED.getName());
                 session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
                 session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
+                cleanupTransfer(transfer, false, transferQueue, host, port);
                 return;
             } catch (final ProcessException | IOException e) {
-                closeConnection = true;
                 getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
                         new Object[]{flowFile, filename, host, port, e.toString()}, e);
                 session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
+                cleanupTransfer(transfer, true, transferQueue, host, port);
                 return;
             }
 
@@ -306,22 +305,17 @@
             // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
             // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
             // result in data loss! If we commit the session first, we are safe.
-            final boolean close = closeConnection;
             final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
-            final Runnable cleanupTask = () -> cleanupTransfer(transfer, close, queue, host, port);
+            final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port);
 
             final FlowFile flowFileReceived = flowFile;
             session.commitAsync(() -> {
                 performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
                 cleanupTask.run();
-            }, t -> {
-                cleanupTask.run();
-            });
+            }, t -> cleanupTask.run());
         } catch (final Throwable t) {
             getLogger().error("Failed to fetch file", t);
-            if (transfer != null) {
-                cleanupTransfer(transfer, closeConnection, transferQueue, host, port);
-            }
+            cleanupTransfer(transfer, true, transferQueue, host, port);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
index 34c26c0..1ee337d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
@@ -91,7 +91,7 @@
 
         runner.run(1, false, false);
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
-        assertFalse(proc.closed);
+        assertFalse(proc.isClosed);
         runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
     }
 
@@ -101,7 +101,7 @@
 
         runner.run(1, false, false);
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
-        assertFalse(proc.closed);
+        assertFalse(proc.isClosed);
         MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0);
         transferredFlowFile.assertContentEquals("world");
         transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key());
@@ -138,6 +138,42 @@
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
     }
 
+    @Test
+    public void testInsufficientPermissionsDoesNotCloseConnection() {
+        addFileAndEnqueue("hello1.txt");
+        addFileAndEnqueue("hello2.txt");
+        proc.allowAccess = false;
+
+        runner.run(2, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2);
+
+        assertEquals(1, proc.numberOfFileTransfers);
+        assertFalse(proc.isClosed);
+    }
+
+    @Test
+    public void testFileNotFoundDoesNotCloseConnection() {
+        addFileAndEnqueue("hello1.txt");
+        addFileAndEnqueue("hello2.txt");
+        proc.isFileNotFound = true;
+
+        runner.run(2, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2);
+
+        assertEquals(1, proc.numberOfFileTransfers);
+        assertFalse(proc.isClosed);
+    }
+
+    @Test
+    public void testCommunicationFailureClosesConnection() {
+        addFileAndEnqueue("hello.txt");
+        proc.isCommFailure = true;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
+
+        assertTrue(proc.isClosed);
+    }
 
     @Test
     public void testMoveFileWithNoTrailingSlashDirName() {
@@ -230,7 +266,10 @@
         private boolean allowDelete = true;
         private boolean allowCreateDir = true;
         private boolean allowRename = true;
-        private boolean closed = false;
+        private boolean isClosed = false;
+        private boolean isFileNotFound = false;
+        private boolean isCommFailure = false;
+        private int numberOfFileTransfers = 0;
         private final Map<String, byte[]> fileContents = new HashMap<>();
         private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class);
 
@@ -254,6 +293,7 @@
 
         @Override
         protected FileTransfer createFileTransfer(final ProcessContext context) {
+            numberOfFileTransfers++;
             return new FTPTransfer(context, getLogger()) {
 
                 @Override
@@ -266,7 +306,12 @@
                     if (!allowAccess) {
                         throw new PermissionDeniedException("test permission denied");
                     }
-
+                    if (isFileNotFound) {
+                        throw new FileNotFoundException("test file not found");
+                    }
+                    if (isCommFailure) {
+                        throw new IOException("test communication failure");
+                    }
                     return super.getRemoteFile(remoteFileName, flowFile, session);
                 }
 
@@ -303,6 +348,12 @@
                         throw new PermissionDeniedException("test permission denied");
                     }
                 }
+
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    isClosed = true;
+                }
             };
         }
     }