NIFI-8711 Added SFTP support for keyboard-interactive authentication

- Included debug logging of configured Authentication Methods

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5166.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 01cc6cf..ee12c82 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -22,7 +22,6 @@
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.common.Factory;
-import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.connection.ConnectionImpl;
 import net.schmizz.sshj.sftp.FileAttributes;
 import net.schmizz.sshj.sftp.FileMode;
@@ -32,12 +31,15 @@
 import net.schmizz.sshj.sftp.Response;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.sftp.SFTPException;
-import net.schmizz.sshj.transport.TransportException;
 import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+import net.schmizz.sshj.userauth.keyprovider.KeyFormat;
 import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
+import net.schmizz.sshj.userauth.keyprovider.KeyProviderUtil;
+import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
 import net.schmizz.sshj.userauth.method.AuthMethod;
 import net.schmizz.sshj.userauth.method.AuthPassword;
 import net.schmizz.sshj.userauth.method.AuthPublickey;
+import net.schmizz.sshj.userauth.method.PasswordResponseProvider;
 import net.schmizz.sshj.userauth.password.PasswordFinder;
 import net.schmizz.sshj.userauth.password.PasswordUtils;
 import net.schmizz.sshj.xfer.FilePermission;
@@ -430,7 +432,7 @@
         final SFTPClient sftpClient = getSFTPClient(origFlowFile);
         RemoteFile rf = null;
         RemoteFile.ReadAheadRemoteFileInputStream rfis = null;
-        FlowFile resultFlowFile = null;
+        FlowFile resultFlowFile;
         try {
             rf = sftpClient.open(remoteFileName);
             rfis = rf.new ReadAheadRemoteFileInputStream(16);
@@ -539,10 +541,10 @@
         if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
             ensureDirectoryExists(flowFile, directoryName.getParentFile());
         }
-        logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
+        logger.debug("Remote Directory {} does not exist; creating it", remoteDirectory);
         try {
             sftpClient.mkdir(remoteDirectory);
-            logger.debug("Created {}", new Object[] {remoteDirectory});
+            logger.debug("Created {}", remoteDirectory);
         } catch (final SFTPException e) {
             throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + getMessage(e), e);
         }
@@ -556,12 +558,12 @@
         }
     }
 
-    private static KeepAliveProvider NO_OP_KEEP_ALIVE = new KeepAliveProvider() {
+    private static final KeepAliveProvider NO_OP_KEEP_ALIVE = new KeepAliveProvider() {
         @Override
         public KeepAlive provide(final ConnectionImpl connection) {
             return new KeepAlive(connection, "no-op-keep-alive") {
                 @Override
-                protected void doKeepAlive() throws TransportException, ConnectionException {
+                protected void doKeepAlive() {
                     // do nothing;
                 }
             };
@@ -668,28 +670,11 @@
 
         // Connect to the host and port
         final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
-        final int port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue();
+        final int port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger();
         sshClient.connect(hostname, port);
 
         // Setup authentication methods...
-        final List<AuthMethod> authMethods = new ArrayList<>();
-
-        // Add public-key auth if a private key is specified
-        final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        if (privateKeyFile != null) {
-            final String privateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
-            final KeyProvider keyProvider = privateKeyPassphrase == null ? sshClient.loadKeys(privateKeyFile) : sshClient.loadKeys(privateKeyFile, privateKeyPassphrase);
-            final AuthMethod publicKeyAuth = new AuthPublickey(keyProvider);
-            authMethods.add(publicKeyAuth);
-        }
-
-        // Add password auth if a password is specified
-        final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
-        if (password != null) {
-            final PasswordFinder passwordFinder = PasswordUtils.createOneOff(password.toCharArray());
-            final AuthMethod passwordAuth = new AuthPassword(passwordFinder);
-            authMethods.add(passwordAuth);
-        }
+        final List<AuthMethod> authMethods = getAuthMethods(sshClient, flowFile);
 
         // Authenticate...
         final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -711,7 +696,7 @@
             this.homeDir = "";
             // For some combination of server configuration and user home directory, getHome() can fail with "2: File not found"
             // Since  homeDir is only used tor SEND provenance event transit uri, this is harmless. Log and continue.
-            logger.debug("Failed to retrieve {} home directory due to {}", new Object[]{username, e.getMessage()});
+            logger.debug("Failed to retrieve {} home directory due to {}", username, e.getMessage());
         }
 
         return sftpClient;
@@ -793,7 +778,6 @@
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public FileInfo getRemoteFileInfo(final FlowFile flowFile, final String path, String filename) throws IOException {
         final SFTPClient sftpClient = getSFTPClient(flowFile);
 
@@ -840,7 +824,7 @@
         }
         final String tempPath = (path == null) ? tempFilename : (path.endsWith("/")) ? path + tempFilename : path + "/" + tempFilename;
 
-        int perms = 0;
+        int perms;
         final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
         if (permissions == null || permissions.trim().isEmpty()) {
             sftpClient.getFileTransfer().setPreserveAttributes(false); //We will accept whatever the default permissions are of the destination
@@ -872,7 +856,7 @@
 
                 sftpClient.setattr(tempPath, modifiedAttributes);
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", tempPath, lastModifiedTime, e);
             }
         }
 
@@ -881,7 +865,7 @@
             try {
                 sftpClient.chown(tempPath, Integer.parseInt(owner));
             } catch (final Exception e) {
-                logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
+                logger.error("Failed to set owner on {} to {} due to {}", tempPath, owner, e);
             }
         }
 
@@ -890,7 +874,7 @@
             try {
                 sftpClient.chgrp(tempPath, Integer.parseInt(group));
             } catch (final Exception e) {
-                logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
+                logger.error("Failed to set group on {} to {} due to {}", tempPath, group, e);
             }
         }
 
@@ -969,4 +953,54 @@
         return number;
     }
 
+    protected List<AuthMethod> getAuthMethods(final SSHClient client, final FlowFile flowFile) {
+        final List<AuthMethod> authMethods = new ArrayList<>();
+
+        final String privateKeyPath = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        if (privateKeyPath != null) {
+            final String privateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
+            final KeyProvider keyProvider = getKeyProvider(client, privateKeyPath, privateKeyPassphrase);
+            final AuthMethod authPublicKey = new AuthPublickey(keyProvider);
+            authMethods.add(authPublicKey);
+        }
+
+        final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+        if (password != null) {
+            final AuthMethod authPassword = new AuthPassword(getPasswordFinder(password));
+            authMethods.add(authPassword);
+
+            final PasswordResponseProvider passwordProvider = new PasswordResponseProvider(getPasswordFinder(password));
+            final AuthMethod authKeyboardInteractive = new AuthKeyboardInteractive(passwordProvider);
+            authMethods.add(authKeyboardInteractive);
+        }
+
+        if (logger.isDebugEnabled()) {
+            final List<String> methods = authMethods.stream().map(AuthMethod::getName).collect(Collectors.toList());
+            logger.debug("Authentication Methods Configured {}", methods);
+        }
+        return authMethods;
+    }
+
+    private KeyProvider getKeyProvider(final SSHClient client, final String privateKeyLocation, final String privateKeyPassphrase) {
+        final KeyFormat keyFormat = getKeyFormat(privateKeyLocation);
+        logger.debug("Loading Private Key File [{}] Format [{}]", privateKeyLocation, keyFormat);
+        try {
+            return privateKeyPassphrase == null ? client.loadKeys(privateKeyLocation) : client.loadKeys(privateKeyLocation, privateKeyPassphrase);
+        } catch (final IOException e) {
+            throw new ProcessException(String.format("Loading Private Key File [%s] Format [%s] Failed", privateKeyLocation, keyFormat), e);
+        }
+    }
+
+    private KeyFormat getKeyFormat(final String privateKeyLocation) {
+        try {
+            final File privateKeyFile = new File(privateKeyLocation);
+            return KeyProviderUtil.detectKeyFileFormat(privateKeyFile);
+        } catch (final IOException e) {
+            throw new ProcessException(String.format("Reading Private Key File [%s] Format Failed", privateKeyLocation), e);
+        }
+    }
+
+    private PasswordFinder getPasswordFinder(final String password) {
+        return PasswordUtils.createOneOff(password.toCharArray());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index e4307aa..f164487 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,16 +16,21 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.common.Factory;
 import net.schmizz.sshj.sftp.Response;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import net.schmizz.sshj.userauth.method.AuthPassword;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.mock.MockComponentLogger;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockPropertyContext;
 import org.apache.nifi.util.MockPropertyValue;
@@ -37,10 +42,16 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -58,14 +69,14 @@
         final ComponentLog componentLog = mock(ComponentLog.class);
         return new SFTPTransfer(processContext, componentLog) {
             @Override
-            protected SFTPClient getSFTPClient(FlowFile flowFile) throws IOException {
+            protected SFTPClient getSFTPClient(FlowFile flowFile) {
                 return sftpClient;
             }
         };
     }
 
     @Test
-    public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsAlreadyExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         final SFTPClient sftpClient = mock(SFTPClient.class);
         final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
@@ -78,7 +89,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsFailedToStat() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsFailedToStat() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         final SFTPClient sftpClient = mock(SFTPClient.class);
         // stat for the parent was successful, simulating that dir2 exists, but no dir3.
@@ -99,7 +110,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsNotExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsNotExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         final SFTPClient sftpClient = mock(SFTPClient.class);
         // stat for the parent was successful, simulating that dir2 exists, but no dir3.
@@ -117,7 +128,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsParentNotExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         final SFTPClient sftpClient = mock(SFTPClient.class);
 
@@ -139,7 +150,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         final SFTPClient sftpClient = mock(SFTPClient.class);
 
@@ -165,7 +176,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
@@ -181,7 +192,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
@@ -211,7 +222,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
@@ -230,7 +241,7 @@
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SFTPException {
+    public void testEnsureDirectoryExistsBlindlyFailed() throws IOException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
@@ -282,4 +293,42 @@
         assertEquals(allowedKeyExchangeAlgorithm, defaultConfig.getKeyExchangeFactories().get(0).getName());
         assertEquals(allowedMac, defaultConfig.getMACFactories().get(0).getName());
     }
+
+    @Test
+    public void testGetAuthMethodsPassword() {
+        final String password = UUID.randomUUID().toString();
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.PASSWORD)).thenReturn(new MockPropertyValue(password));
+        when(processContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH)).thenReturn(new MockPropertyValue(null));
+
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
+
+        final SSHClient sshClient = new SSHClient();
+        final List<AuthMethod> authMethods = sftpTransfer.getAuthMethods(sshClient, null);
+        assertFalse("Authentication Methods not found", authMethods.isEmpty());
+
+        final Optional<AuthMethod> authPassword = authMethods.stream().filter(authMethod -> authMethod instanceof AuthPassword).findFirst();
+        assertTrue("Password Authentication not found", authPassword.isPresent());
+
+        final Optional<AuthMethod> authKeyboardInteractive = authMethods.stream().filter(authMethod -> authMethod instanceof AuthKeyboardInteractive).findFirst();
+        assertTrue("Keyboard Interactive Authentication not found", authKeyboardInteractive.isPresent());
+    }
+
+    @Test
+    public void testGetAuthMethodsPrivateKeyLoadFailed() throws IOException {
+        final File privateKeyFile = File.createTempFile(TestSFTPTransfer.class.getSimpleName(), ".key");
+        privateKeyFile.deleteOnExit();
+
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.PASSWORD)).thenReturn(new MockPropertyValue(null));
+        when(processContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH)).thenReturn(new MockPropertyValue(privateKeyFile.getAbsolutePath()));
+        when(processContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE)).thenReturn(new MockPropertyValue(null));
+
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
+
+        final SSHClient sshClient = new SSHClient();
+        assertThrows(ProcessException.class, () -> sftpTransfer.getAuthMethods(sshClient, null));
+    }
 }