NIFI-8523 Added SFTP algorithm and cipher properties

- Updated secure FTP processors to configure which algorithms, ciphers and message authentication codes are allowed to be used by the SSH Client
- Included Expression Language Variable Registry support for properties

This closes #5061

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 2124b68..69281e5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -87,6 +87,10 @@
         properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
         properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
         properties.add(FILE_NOT_FOUND_LOG_LEVEL);
+        properties.add(SFTPTransfer.CIPHERS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_EXCHANGE_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.MESSAGE_AUTHENTICATION_CODES_ALLOWED);
         return properties;
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
index 5a3d667..0e105a8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
@@ -89,6 +89,10 @@
         properties.add(FTPTransfer.PROXY_PORT);
         properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
         properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        properties.add(SFTPTransfer.CIPHERS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_EXCHANGE_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.MESSAGE_AUTHENTICATION_CODES_ALLOWED);
         this.properties = Collections.unmodifiableList(properties);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index c345b3b..704cebc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -116,6 +116,10 @@
         properties.add(ListFile.MAX_AGE);
         properties.add(ListFile.MIN_SIZE);
         properties.add(ListFile.MAX_SIZE);
+        properties.add(SFTPTransfer.CIPHERS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_EXCHANGE_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.MESSAGE_AUTHENTICATION_CODES_ALLOWED);
         return properties;
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
index 35bb174..e431042 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
@@ -80,6 +80,10 @@
         properties.add(FTPTransfer.PROXY_PORT);
         properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
         properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        properties.add(SFTPTransfer.CIPHERS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.KEY_EXCHANGE_ALGORITHMS_ALLOWED);
+        properties.add(SFTPTransfer.MESSAGE_AUTHENTICATION_CODES_ALLOWED);
         this.properties = Collections.unmodifiableList(properties);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index e7980cd..01cc6cf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -18,8 +18,10 @@
 
 import net.schmizz.keepalive.KeepAlive;
 import net.schmizz.keepalive.KeepAliveProvider;
+import net.schmizz.sshj.Config;
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.common.Factory;
 import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.connection.ConnectionImpl;
 import net.schmizz.sshj.sftp.FileAttributes;
@@ -72,17 +74,50 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
 
 public class SFTPTransfer implements FileTransfer {
+    private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
+    private static final Set<String> DEFAULT_CIPHER_NAMES;
+    private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
+    private static final Set<String> DEFAULT_KEY_EXCHANGE_ALGORITHM_NAMES;
+
+    static {
+        DefaultConfig defaultConfig = new DefaultConfig();
+
+        DEFAULT_KEY_ALGORITHM_NAMES = Collections.unmodifiableSet(defaultConfig.getKeyAlgorithms().stream()
+                .map(Factory.Named::getName).collect(Collectors.toSet()));
+        DEFAULT_CIPHER_NAMES = Collections.unmodifiableSet(defaultConfig.getCipherFactories().stream()
+                .map(Factory.Named::getName).collect(Collectors.toSet()));
+        DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES = Collections.unmodifiableSet(defaultConfig.getMACFactories().stream()
+                .map(Factory.Named::getName).collect(Collectors.toSet()));
+        DEFAULT_KEY_EXCHANGE_ALGORITHM_NAMES = Collections.unmodifiableSet(defaultConfig.getKeyExchangeFactories().stream()
+                .map(Factory.Named::getName).collect(Collectors.toSet()));
+    }
+
+    /**
+     * Converts a set of names into an alphabetically ordered comma separated value list.
+     *
+     * @param factorySetNames The set of names
+     * @return An alphabetically ordered comma separated value list of names
+     */
+    private static String convertFactorySetToString(Set<String> factorySetNames) {
+        return factorySetNames
+                .stream()
+                .sorted()
+                .collect(Collectors.joining(", "));
+    }
 
     public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
         .name("Private Key Path")
@@ -131,6 +166,44 @@
         .required(true)
         .build();
 
+    public static final PropertyDescriptor KEY_ALGORITHMS_ALLOWED = new PropertyDescriptor.Builder()
+            .name("Key Algorithms Allowed")
+            .displayName("Key Algorithms Allowed")
+            .description("A comma-separated list of Key Algorithms allowed for SFTP connections. Leave unset to allow all. Available options are: "
+                    + convertFactorySetToString(DEFAULT_KEY_ALGORITHM_NAMES))
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CIPHERS_ALLOWED = new PropertyDescriptor.Builder()
+            .name("Ciphers Allowed")
+            .displayName("Ciphers Allowed")
+            .description("A comma-separated list of Ciphers allowed for SFTP connections. Leave unset to allow all. Available options are: " + convertFactorySetToString(DEFAULT_CIPHER_NAMES))
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_AUTHENTICATION_CODES_ALLOWED = new PropertyDescriptor.Builder()
+            .name("Message Authentication Codes Allowed")
+            .displayName("Message Authentication Codes Allowed")
+            .description("A comma-separated list of Message Authentication Codes allowed for SFTP connections. Leave unset to allow all. Available options are: "
+                    + convertFactorySetToString(DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES))
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor KEY_EXCHANGE_ALGORITHMS_ALLOWED = new PropertyDescriptor.Builder()
+            .name("Key Exchange Algorithms Allowed")
+            .displayName("Key Exchange Algorithms Allowed")
+            .description("A comma-separated list of Key Exchange Algorithms allowed for SFTP connections. Leave unset to allow all. Available options are: "
+                    + convertFactorySetToString(DEFAULT_KEY_EXCHANGE_ALGORITHM_NAMES))
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
 
     /**
      * Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link SFTPClient#ls(String)} before calling
@@ -521,6 +594,8 @@
             sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
         }
 
+        updateConfigAlgorithms(sshClientConfig);
+
         final SSHClient sshClient = new SSHClient(sshClientConfig);
 
         // Create a Proxy if the config was specified, proxy will be null if type was NO_PROXY
@@ -642,6 +717,44 @@
         return sftpClient;
     }
 
+    void updateConfigAlgorithms(final Config config) {
+        if (ctx.getProperty(CIPHERS_ALLOWED).isSet()) {
+            Set<String> allowedCiphers = Arrays.stream(ctx.getProperty(CIPHERS_ALLOWED).evaluateAttributeExpressions().getValue().split(","))
+                    .map(String::trim)
+                    .collect(Collectors.toSet());
+            config.setCipherFactories(config.getCipherFactories().stream()
+                    .filter(cipherNamed -> allowedCiphers.contains(cipherNamed.getName()))
+                    .collect(Collectors.toList()));
+        }
+
+        if (ctx.getProperty(KEY_ALGORITHMS_ALLOWED).isSet()) {
+            Set<String> allowedKeyAlgorithms = Arrays.stream(ctx.getProperty(KEY_ALGORITHMS_ALLOWED).evaluateAttributeExpressions().getValue().split(","))
+                    .map(String::trim)
+                    .collect(Collectors.toSet());
+            config.setKeyAlgorithms(config.getKeyAlgorithms().stream()
+                    .filter(keyAlgorithmNamed -> allowedKeyAlgorithms.contains(keyAlgorithmNamed.getName()))
+                    .collect(Collectors.toList()));
+        }
+
+        if (ctx.getProperty(KEY_EXCHANGE_ALGORITHMS_ALLOWED).isSet()) {
+            Set<String> allowedKeyExchangeAlgorithms = Arrays.stream(ctx.getProperty(KEY_EXCHANGE_ALGORITHMS_ALLOWED).evaluateAttributeExpressions().getValue().split(","))
+                    .map(String::trim)
+                    .collect(Collectors.toSet());
+            config.setKeyExchangeFactories(config.getKeyExchangeFactories().stream()
+                    .filter(keyExchangeNamed -> allowedKeyExchangeAlgorithms.contains(keyExchangeNamed.getName()))
+                    .collect(Collectors.toList()));
+        }
+
+        if (ctx.getProperty(MESSAGE_AUTHENTICATION_CODES_ALLOWED).isSet()) {
+            Set<String> allowedMessageAuthenticationCodes = Arrays.stream(ctx.getProperty(MESSAGE_AUTHENTICATION_CODES_ALLOWED).evaluateAttributeExpressions().getValue().split(","))
+                    .map(String::trim)
+                    .collect(Collectors.toSet());
+            config.setMACFactories(config.getMACFactories().stream()
+                    .filter(macNamed -> allowedMessageAuthenticationCodes.contains(macNamed.getName()))
+                    .collect(Collectors.toList()));
+        }
+    }
+
     @Override
     public String getHomeDirectory(final FlowFile flowFile) throws IOException {
         getSFTPClient(flowFile);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index e9d9a0b..e4307aa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,13 +16,18 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.common.Factory;
 import net.schmizz.sshj.sftp.Response;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.sftp.SFTPException;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyContext;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -30,7 +35,10 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -244,4 +252,34 @@
         verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
     }
 
+    @Test
+    public void testRestrictSSHOptions() {
+        Map<PropertyDescriptor, String> propertyDescriptorValues = new HashMap<>();
+
+        DefaultConfig defaultConfig = new DefaultConfig();
+
+        String allowedMac = defaultConfig.getMACFactories().stream().map(Factory.Named::getName).collect(Collectors.toList()).get(0);
+        String allowedKeyAlgorithm = defaultConfig.getKeyAlgorithms().stream().map(Factory.Named::getName).collect(Collectors.toList()).get(0);
+        String allowedKeyExchangeAlgorithm = defaultConfig.getKeyExchangeFactories().stream().map(Factory.Named::getName).collect(Collectors.toList()).get(0);
+        String allowedCipher = defaultConfig.getCipherFactories().stream().map(Factory.Named::getName).collect(Collectors.toList()).get(0);
+
+        propertyDescriptorValues.put(SFTPTransfer.MESSAGE_AUTHENTICATION_CODES_ALLOWED, allowedMac);
+        propertyDescriptorValues.put(SFTPTransfer.CIPHERS_ALLOWED, allowedCipher);
+        propertyDescriptorValues.put(SFTPTransfer.KEY_ALGORITHMS_ALLOWED, allowedKeyAlgorithm);
+        propertyDescriptorValues.put(SFTPTransfer.KEY_EXCHANGE_ALGORITHMS_ALLOWED, allowedKeyExchangeAlgorithm);
+        MockPropertyContext mockPropertyContext = new MockPropertyContext(propertyDescriptorValues);
+        SFTPTransfer sftpTransfer = new SFTPTransfer(mockPropertyContext, new MockComponentLogger());
+
+        sftpTransfer.updateConfigAlgorithms(defaultConfig);
+
+        assertEquals(1, defaultConfig.getCipherFactories().size());
+        assertEquals(1, defaultConfig.getKeyAlgorithms().size());
+        assertEquals(1, defaultConfig.getKeyExchangeFactories().size());
+        assertEquals(1, defaultConfig.getMACFactories().size());
+
+        assertEquals(allowedCipher, defaultConfig.getCipherFactories().get(0).getName());
+        assertEquals(allowedKeyAlgorithm, defaultConfig.getKeyAlgorithms().get(0).getName());
+        assertEquals(allowedKeyExchangeAlgorithm, defaultConfig.getKeyExchangeFactories().get(0).getName());
+        assertEquals(allowedMac, defaultConfig.getMACFactories().get(0).getName());
+    }
 }