NIFI-7443 Corrected SFTP Keep Alive behavior
- Configured SSHJ Keep Alive Interval of 5 seconds
- Updated Send Keep Alive On Timeout property description
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5223.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index ee12c82..fd77991 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -90,6 +90,8 @@
import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
public class SFTPTransfer implements FileTransfer {
+ private static final int KEEP_ALIVE_INTERVAL_SECONDS = 5;
+
private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
private static final Set<String> DEFAULT_CIPHER_NAMES;
private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
@@ -162,7 +164,7 @@
.build();
public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
.name("Send Keep Alive On Timeout")
- .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+ .description("Send a Keep Alive message every 5 seconds up to 5 times for an overall timeout of 25 seconds.")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
@@ -570,6 +572,20 @@
}
};
+ private static final KeepAliveProvider DEFAULT_KEEP_ALIVE_PROVIDER = new KeepAliveProvider() {
+ @Override
+ public KeepAlive provide(final ConnectionImpl connection) {
+ final KeepAlive keepAlive = KeepAliveProvider.KEEP_ALIVE.provide(connection);
+ keepAlive.setKeepAliveInterval(KEEP_ALIVE_INTERVAL_SECONDS);
+ return keepAlive;
+ }
+ };
+
+ protected KeepAliveProvider getKeepAliveProvider() {
+ final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
+ return useKeepAliveOnTimeout ? DEFAULT_KEEP_ALIVE_PROVIDER : NO_OP_KEEP_ALIVE;
+ }
+
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
// If the client is already initialized then compare the host that the client is connected to with the current
// host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
@@ -586,16 +602,8 @@
}
// Initialize a new SSHClient...
-
- // If use keep-alive is set then set the provider which sends max of 5 keep-alives, otherwise set the no-op provider
final DefaultConfig sshClientConfig = new DefaultConfig();
- final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
- if (useKeepAliveOnTimeout) {
- sshClientConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
- } else {
- sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
- }
-
+ sshClientConfig.setKeepAliveProvider(getKeepAliveProvider());
updateConfigAlgorithms(sshClientConfig);
final SSHClient sshClient = new SSHClient(sshClientConfig);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index f164487..d8fa728 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,12 +16,16 @@
*/
package org.apache.nifi.processors.standard.util;
+import net.schmizz.keepalive.KeepAlive;
+import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.common.Factory;
+import net.schmizz.sshj.connection.ConnectionImpl;
import net.schmizz.sshj.sftp.Response;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.Transport;
import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
import net.schmizz.sshj.userauth.method.AuthMethod;
import net.schmizz.sshj.userauth.method.AuthPassword;
@@ -50,6 +54,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -331,4 +336,35 @@
final SSHClient sshClient = new SSHClient();
assertThrows(ProcessException.class, () -> sftpTransfer.getAuthMethods(sshClient, null));
}
+
+ @Test
+ public void testGetKeepAliveProviderEnabled() {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new MockPropertyValue(Boolean.TRUE.toString()));
+
+ final KeepAlive keepAlive = getKeepAlive(processContext);
+ assertNotSame("Keep Alive Interval not configured", 0, keepAlive.getKeepAliveInterval());
+ }
+
+ @Test
+ public void testGetKeepAliveProviderDisabled() {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT)).thenReturn(new MockPropertyValue(Boolean.FALSE.toString()));
+
+ final KeepAlive keepAlive = getKeepAlive(processContext);
+ assertEquals("Keep Alive Interval configured", 0, keepAlive.getKeepAliveInterval());
+ }
+
+ private KeepAlive getKeepAlive(final ProcessContext processContext) {
+ final SFTPClient sftpClient = mock(SFTPClient.class);
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
+
+ final Transport transport = mock(Transport.class);
+ when(transport.getConfig()).thenReturn(new DefaultConfig());
+ final KeepAliveProvider mockKeepAliveProvider = mock(KeepAliveProvider.class);
+ final ConnectionImpl connection = new ConnectionImpl(transport, mockKeepAliveProvider);
+
+ final KeepAliveProvider keepAliveProvider = sftpTransfer.getKeepAliveProvider();
+ return keepAliveProvider.provide(connection);
+ }
}