DRILL-8407: Add Support for SFTP File Systems (#2770)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 0f0f353..3a9260c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -35,6 +35,7 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -61,6 +62,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +107,7 @@
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.dropbox.impl", DropboxFileSystem.class.getName());
+ fsConf.set("fs.sftp.impl", SFTPFileSystem.class.getName());
fsConf.set("fs.box.impl", BoxFileSystem.class.getName());
fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
CredentialsProvider credentialsProvider = config.getCredentialsProvider();
@@ -116,6 +119,8 @@
if (isS3Connection(fsConf)) {
handleS3Credentials(fsConf);
+ } else if (isSFTP(fsConf) && config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+ handleSFTPCredentials(credentialsProvider);
} else if (config.oAuthConfig() != null && config.getAuthMode() == AuthMode.SHARED_USER) {
initializeOauthTokenTable(null);
}
@@ -177,6 +182,11 @@
return uri.getScheme().equals("s3a");
}
+ private boolean isSFTP(Configuration conf) {
+ URI uri = FileSystem.getDefaultUri(conf);
+ return uri.getScheme().equals("sftp");
+ }
+
/**
* Retrieve secret and access keys from configured (with
* {@link org.apache.hadoop.security.alias.CredentialProviderFactory#CREDENTIAL_PROVIDER_PATH} property)
@@ -199,6 +209,27 @@
}
}
+ private void handleSFTPCredentials(CredentialsProvider credentialsProvider) {
+ handleSFTPCredentials(credentialsProvider, null);
+ }
+
+ private void handleSFTPCredentials(CredentialsProvider credentialsProvider, String username) {
+ String[] credentialKeys = {"fs.s3a.secret.key", "fs.s3a.access.key"};
+ Map<String, String> creds;
+ if (credentialsProvider != null) {
+ // Get credentials from credential provider if present
+ URI uri = FileSystem.getDefaultUri(fsConf);
+ if (StringUtils.isEmpty(username)) {
+ creds = credentialsProvider.getCredentials();
+ } else {
+ // Handle user translation
+ creds = credentialsProvider.getUserCredentials(username);
+ }
+ fsConf.set(SFTPFileSystem.FS_SFTP_USER_PREFIX + uri.getHost(), creds.get("username"));
+ fsConf.set(SFTPFileSystem.FS_SFTP_PASSWORD_PREFIX + uri.getHost() + "." + creds.get("username"), creds.get("password"));
+ }
+ }
+
@VisibleForTesting
public void initializeOauthTokenTable(String username) {
OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
@@ -285,7 +316,12 @@
// active username in the constructor. Removing it from the constructor makes
// it difficult to test, so we do the check and leave it in both places.
if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
- initializeOauthTokenTable(schemaConfig.getUserName());
+ // If the file system uses OAuth, populate the OAuth tokens
+ if (config.oAuthConfig() != null) {
+ initializeOauthTokenTable(schemaConfig.getUserName());
+ } else if (isSFTP(fsConf)) {
+ handleSFTPCredentials(config.getCredentialsProvider(), schemaConfig.getUserName());
+ }
}
schemaFactory.registerSchemas(schemaConfig, parent);
}