NIFI-9328: Transfer cleanup and reuse added to FetchFileTransfer in case of FileNotFound and PermissionDenied exceptions.
- Corrects connection handling in FetchFTP and FetchSFTP
This closes #5478
Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index e7ea491..2d38116 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -254,31 +254,30 @@
transfer = transferWrapper.getFileTransfer();
}
- boolean closeConnection = false;
try {
// Pull data from remote system.
try {
flowFile = transfer.getRemoteFile(filename, flowFile, session);
} catch (final FileNotFoundException e) {
- closeConnection = false;
getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
- new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
+ flowFile, filename, host, REL_NOT_FOUND.getName());
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+ cleanupTransfer(transfer, false, transferQueue, host, port);
return;
} catch (final PermissionDeniedException e) {
- closeConnection = false;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
- new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
+ flowFile, filename, host, REL_PERMISSION_DENIED.getName());
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
+ cleanupTransfer(transfer, false, transferQueue, host, port);
return;
} catch (final ProcessException | IOException e) {
- closeConnection = true;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
new Object[]{flowFile, filename, host, port, e.toString()}, e);
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
+ cleanupTransfer(transfer, true, transferQueue, host, port);
return;
}
@@ -306,22 +305,17 @@
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
// result in data loss! If we commit the session first, we are safe.
- final boolean close = closeConnection;
final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
- final Runnable cleanupTask = () -> cleanupTransfer(transfer, close, queue, host, port);
+ final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port);
final FlowFile flowFileReceived = flowFile;
session.commitAsync(() -> {
performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
cleanupTask.run();
- }, t -> {
- cleanupTask.run();
- });
+ }, t -> cleanupTask.run());
} catch (final Throwable t) {
getLogger().error("Failed to fetch file", t);
- if (transfer != null) {
- cleanupTransfer(transfer, closeConnection, transferQueue, host, port);
- }
+ cleanupTransfer(transfer, true, transferQueue, host, port);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
index 34c26c0..1ee337d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
@@ -91,7 +91,7 @@
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
- assertFalse(proc.closed);
+ assertFalse(proc.isClosed);
runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
}
@@ -101,7 +101,7 @@
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
- assertFalse(proc.closed);
+ assertFalse(proc.isClosed);
MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0);
transferredFlowFile.assertContentEquals("world");
transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key());
@@ -138,6 +138,42 @@
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
}
+ @Test
+ public void testInsufficientPermissionsDoesNotCloseConnection() {
+ addFileAndEnqueue("hello1.txt");
+ addFileAndEnqueue("hello2.txt");
+ proc.allowAccess = false;
+
+ runner.run(2, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2);
+
+ assertEquals(1, proc.numberOfFileTransfers);
+ assertFalse(proc.isClosed);
+ }
+
+ @Test
+ public void testFileNotFoundDoesNotCloseConnection() {
+ addFileAndEnqueue("hello1.txt");
+ addFileAndEnqueue("hello2.txt");
+ proc.isFileNotFound = true;
+
+ runner.run(2, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2);
+
+ assertEquals(1, proc.numberOfFileTransfers);
+ assertFalse(proc.isClosed);
+ }
+
+ @Test
+ public void testCommunicationFailureClosesConnection() {
+ addFileAndEnqueue("hello.txt");
+ proc.isCommFailure = true;
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
+
+ assertTrue(proc.isClosed);
+ }
@Test
public void testMoveFileWithNoTrailingSlashDirName() {
@@ -230,7 +266,10 @@
private boolean allowDelete = true;
private boolean allowCreateDir = true;
private boolean allowRename = true;
- private boolean closed = false;
+ private boolean isClosed = false;
+ private boolean isFileNotFound = false;
+ private boolean isCommFailure = false;
+ private int numberOfFileTransfers = 0;
private final Map<String, byte[]> fileContents = new HashMap<>();
private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class);
@@ -254,6 +293,7 @@
@Override
protected FileTransfer createFileTransfer(final ProcessContext context) {
+ numberOfFileTransfers++;
return new FTPTransfer(context, getLogger()) {
@Override
@@ -266,7 +306,12 @@
if (!allowAccess) {
throw new PermissionDeniedException("test permission denied");
}
-
+ if (isFileNotFound) {
+ throw new FileNotFoundException("test file not found");
+ }
+ if (isCommFailure) {
+ throw new IOException("test communication failure");
+ }
return super.getRemoteFile(remoteFileName, flowFile, session);
}
@@ -303,6 +348,12 @@
throw new PermissionDeniedException("test permission denied");
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ isClosed = true;
+ }
};
}
}