NIFI-7443 Corrected SFTP Keep Alive behavior

- Configured SSHJ Keep Alive Interval of 5 seconds
- Updated Send Keep Alive On Timeout property description

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5223.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index ee12c82..fd77991 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -90,6 +90,8 @@
 import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
 
 public class SFTPTransfer implements FileTransfer {
+    private static final int KEEP_ALIVE_INTERVAL_SECONDS = 5;
+
     private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
     private static final Set<String> DEFAULT_CIPHER_NAMES;
     private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
@@ -162,7 +164,7 @@
         .build();
     public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
         .name("Send Keep Alive On Timeout")
-        .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        .description("Send a Keep Alive message every 5 seconds up to 5 times for an overall timeout of 25 seconds.")
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
@@ -570,6 +572,20 @@
         }
     };
 
+    private static final KeepAliveProvider DEFAULT_KEEP_ALIVE_PROVIDER = new KeepAliveProvider() {
+        @Override
+        public KeepAlive provide(final ConnectionImpl connection) {
+            final KeepAlive keepAlive = KeepAliveProvider.KEEP_ALIVE.provide(connection);
+            keepAlive.setKeepAliveInterval(KEEP_ALIVE_INTERVAL_SECONDS);
+            return keepAlive;
+        }
+    };
+
+    protected KeepAliveProvider getKeepAliveProvider() {
+        final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
+        return useKeepAliveOnTimeout ? DEFAULT_KEEP_ALIVE_PROVIDER : NO_OP_KEEP_ALIVE;
+    }
+
     protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
         // If the client is already initialized then compare the host that the client is connected to with the current
         // host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
@@ -586,16 +602,8 @@
         }
 
         // Initialize a new SSHClient...
-
-        // If use keep-alive is set then set the provider which sends max of 5 keep-alives, otherwise set the no-op provider
         final DefaultConfig sshClientConfig = new DefaultConfig();
-        final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
-        if (useKeepAliveOnTimeout) {
-            sshClientConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
-        } else {
-            sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
-        }
-
+        sshClientConfig.setKeepAliveProvider(getKeepAliveProvider());
         updateConfigAlgorithms(sshClientConfig);
 
         final SSHClient sshClient = new SSHClient(sshClientConfig);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index f164487..d8fa728 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,12 +16,16 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import net.schmizz.keepalive.KeepAlive;
+import net.schmizz.keepalive.KeepAliveProvider;
 import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.common.Factory;
+import net.schmizz.sshj.connection.ConnectionImpl;
 import net.schmizz.sshj.sftp.Response;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.Transport;
 import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
 import net.schmizz.sshj.userauth.method.AuthMethod;
 import net.schmizz.sshj.userauth.method.AuthPassword;
@@ -50,6 +54,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -331,4 +336,35 @@
         final SSHClient sshClient = new SSHClient();
         assertThrows(ProcessException.class, () -> sftpTransfer.getAuthMethods(sshClient, null));
     }
+
+    @Test
+    public void testGetKeepAliveProviderEnabled() {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new MockPropertyValue(Boolean.TRUE.toString()));
+
+        final KeepAlive keepAlive = getKeepAlive(processContext);
+        assertNotSame("Keep Alive Interval not configured", 0, keepAlive.getKeepAliveInterval());
+    }
+
+    @Test
+    public void testGetKeepAliveProviderDisabled() {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new MockPropertyValue(Boolean.FALSE.toString()));
+
+        final KeepAlive keepAlive = getKeepAlive(processContext);
+        assertEquals("Keep Alive Interval configured", 0, keepAlive.getKeepAliveInterval());
+    }
+
+    private KeepAlive getKeepAlive(final ProcessContext processContext) {
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
+
+        final Transport transport = mock(Transport.class);
+        when(transport.getConfig()).thenReturn(new DefaultConfig());
+        final KeepAliveProvider mockKeepAliveProvider = mock(KeepAliveProvider.class);
+        final ConnectionImpl connection = new ConnectionImpl(transport, mockKeepAliveProvider);
+
+        final KeepAliveProvider keepAliveProvider = sftpTransfer.getKeepAliveProvider();
+        return keepAliveProvider.provide(connection);
+    }
 }