DRILL-8407: Add Support for SFTP File Systems (#2770)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 0f0f353..3a9260c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -35,6 +35,7 @@
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -61,6 +62,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +107,7 @@
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.dropbox.impl", DropboxFileSystem.class.getName());
+      fsConf.set("fs.sftp.impl", SFTPFileSystem.class.getName());
       fsConf.set("fs.box.impl", BoxFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
       CredentialsProvider credentialsProvider = config.getCredentialsProvider();
@@ -116,6 +119,8 @@
 
       if (isS3Connection(fsConf)) {
         handleS3Credentials(fsConf);
+      } else if (isSFTP(fsConf) && config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+        handleSFTPCredentials(credentialsProvider);
       } else if (config.oAuthConfig() != null && config.getAuthMode() == AuthMode.SHARED_USER) {
         initializeOauthTokenTable(null);
       }
@@ -177,6 +182,11 @@
     return uri.getScheme().equals("s3a");
   }
 
+  private boolean isSFTP(Configuration conf) {
+    URI uri = FileSystem.getDefaultUri(conf);
+    return uri.getScheme().equals("sftp");
+  }
+
   /**
    * Retrieve secret and access keys from configured (with
    * {@link org.apache.hadoop.security.alias.CredentialProviderFactory#CREDENTIAL_PROVIDER_PATH} property)
@@ -199,6 +209,27 @@
     }
   }
 
+  private void handleSFTPCredentials(CredentialsProvider credentialsProvider) {
+    handleSFTPCredentials(credentialsProvider, null);
+  }
+
+  private void handleSFTPCredentials(CredentialsProvider credentialsProvider, String username) {
+    String[] credentialKeys = {"fs.s3a.secret.key", "fs.s3a.access.key"};
+    Map<String, String> creds;
+    if (credentialsProvider != null) {
+      // Get credentials from credential provider if present
+      URI uri = FileSystem.getDefaultUri(fsConf);
+      if (StringUtils.isEmpty(username)) {
+        creds = credentialsProvider.getCredentials();
+      } else {
+        // Handle user translation
+        creds = credentialsProvider.getUserCredentials(username);
+      }
+      fsConf.set(SFTPFileSystem.FS_SFTP_USER_PREFIX + uri.getHost(), creds.get("username"));
+      fsConf.set(SFTPFileSystem.FS_SFTP_PASSWORD_PREFIX + uri.getHost() + "." + creds.get("username"), creds.get("password"));
+    }
+  }
+
   @VisibleForTesting
   public void initializeOauthTokenTable(String username) {
     OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
@@ -285,7 +316,12 @@
     // active username in the constructor.  Removing it from the constructor makes
     // it difficult to test, so we do the check and leave it in both places.
     if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
-      initializeOauthTokenTable(schemaConfig.getUserName());
+      // If the file system uses OAuth, populate the OAuth tokens
+      if (config.oAuthConfig() != null) {
+        initializeOauthTokenTable(schemaConfig.getUserName());
+      } else if (isSFTP(fsConf)) {
+        handleSFTPCredentials(config.getCredentialsProvider(), schemaConfig.getUserName());
+      }
     }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }