DRILL-8155: Introduce New Plugin Authentication Modes (#2516)

* Do not set the read-only hint on JDBC connections.

* Outline of different auth modes in storage-jdbc.

In this commit, a new `authMode` storage config supporting three new auth modes
is defined: shared user (default), user translation (user is translated to some
other user from the external storage) and impersonation (the external storage
and JDBC driver provide support for impersonating the Drill query user).

The JdbcStoragePlugin is enhanced to be able to work with a lookup table of
connection pools, where a pool is dedicated to each query user except when
in shared user auth mode.

Planning and execution time APIs are also enhanced to transmit a user
credentials object for the query user, instead of just its username.
This allows for the expansion of the UserCredentials protobuf type to
include some optional extra credentials, e.g. in an array of byte arrays.
These credentials may be relevant in the user tranlsation mode when a
credential provider must be accessed in order to obtain the creds to be used
for the external system.

* Rebased to current master and build fixes

* Build works, cred stuff added

* Credentials being saved and pushed down to storage plugin

* UI now closing properly

* User Translation working for HTTP plugin

* HTTP unit tests passing

* WIP

* Fixed import

* User Credentials now being stored in credential provider

* Working

* Fixed TPCH Unit Tests

* Fix CredProvider SerDe Test

* Added unit tests for JDBC

* Code cleanup

* Fix LGTM alerts

* Correct username now populating Group Scan

* Username to Subscan

* Remove PerUserUsernamePasswordCredentials class.

* Remove getUserCredentials from CredentialsProvider.

* Planning errors fixed

* Removed unused imports

* Fixed minor issues

* Unit test fixes

* WIP.

* Fix CodeQL Alert

* Ignore LGTM False Positive

* Fix tainted string LGTM alert

* Revert LGTM Comment

* Addressed review comments

* Use fixed size Guava caches in JDBC convetion and dialect factories.

These replace Maps with no size limit that might have grown without
bound. LRU eviction begins when the cache size limit is reached.

* Add a TTL to the JDBC dialect and convention caches.

Co-authored-by: James Turton <james@somecomputer.xyz>
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index 73c5ffb..d4c08f3 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -101,8 +101,12 @@
       String tableName = mapRDBFormatPlugin.getTableName(selection);
       TableProperties props = mapRDBFormatPlugin.getMaprFS().getTableProperties(new Path(tableName));
       if (props.getAttr().getJson()) {
-        return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
-            new FormatSelection(mapRDBFormatPlugin.getConfig(), selection));
+        return new DynamicDrillTable(
+          fsPlugin,
+          storageEngineName,
+          schemaConfig.getQueryUserCredentials().getUserName(),
+          new FormatSelection(mapRDBFormatPlugin.getConfig(), selection)
+        );
       } else {
         FormatSelection formatSelection = new FormatSelection(mapRDBFormatPlugin.getConfig(), selection);
         return new MapRDBBinaryTable(storageEngineName, fsPlugin, mapRDBFormatPlugin, formatSelection);
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
index 26ed2a0..d6a51cd 100644
--- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -21,7 +21,8 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -29,9 +30,10 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(CassandraStorageConfig.NAME)
-public class CassandraStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class CassandraStorageConfig extends CredentialedStoragePluginConfig {
   public static final String NAME = "cassandra";
 
   private final String host;
@@ -59,33 +61,35 @@
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
-    }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
-    }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
   public Map<String, Object> toConfigMap() {
-    UsernamePasswordCredentials credentials = getUsernamePasswordCredentials();
+    Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials();
 
     Map<String, Object> result = new HashMap<>();
     result.put("host", host);
     result.put("port", port);
-    result.put("username", credentials.getUsername());
-    result.put("password", credentials.getPassword());
+    if (credentials.isPresent()) {
+      result.put("username", credentials.get().getUsername());
+      result.put("password", credentials.get().getPassword());
+    }
     return result;
   }
 
@@ -106,4 +110,9 @@
   public int hashCode() {
     return Objects.hash(host, credentialsProvider);
   }
+
+  @Override
+  public CassandraStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
index 9e84635..c03f525 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
@@ -24,7 +24,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -33,9 +33,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(ElasticsearchStorageConfig.NAME)
-public class ElasticsearchStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class ElasticsearchStorageConfig extends CredentialedStoragePluginConfig {
   public static final String NAME = "elastic";
 
   private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(List.class);
@@ -57,22 +58,28 @@
   }
 
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonIgnore
@@ -102,4 +109,9 @@
   public int hashCode() {
     return Objects.hash(hosts, credentialsProvider);
   }
+
+  @Override
+  public CredentialedStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
index c9f7e31..9263c0d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
@@ -34,12 +34,14 @@
 
   private final HttpStoragePlugin plugin;
   private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+  private final String queryUserName;
 
   public HttpAPIConnectionSchema(HttpSchema parent,
                                  String name,
-                                 HttpStoragePlugin plugin) {
+                                 HttpStoragePlugin plugin, String queryUserName) {
     super(parent.getSchemaPath(), name);
     this.plugin = plugin;
+    this.queryUserName = queryUserName;
   }
 
   @Override
@@ -63,11 +65,10 @@
       // Return the found table
       return table;
     } else {
-
       // Register a new table
       return registerTable(tableName, new DynamicDrillTable(plugin, plugin.getName(),
         new HttpScanSpec(plugin.getName(), name, tableName,
-              plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), plugin.getRegistry())));
+              plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), queryUserName, plugin.getRegistry())));
     }
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index 5da4d15..581b61c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -30,6 +30,7 @@
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @JsonDeserialize(builder = HttpApiConfig.HttpApiConfigBuilder.class)
@@ -335,18 +337,22 @@
 
   @JsonProperty
   public String userName() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty
   public String password() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
@@ -365,8 +371,18 @@
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @JsonIgnore
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(username)
+      .build();
   }
 
   @JsonProperty
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index f6651d7..b99ec57 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -38,8 +38,8 @@
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
-import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@
 import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
@@ -242,13 +243,15 @@
         .fromConfigForURL(drillConfig, url.toString());
     final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
       builder
         .type(config.proxyType())
         .host(config.proxyHost())
-        .port(config.proxyPort())
-        .username(credentials.getUsername())
-        .password(credentials.getPassword());
+        .port(config.proxyPort());
+
+      Optional<UsernamePasswordWithProxyCredentials> credentials = config.getUsernamePasswordCredentials();
+      if (credentials.isPresent()) {
+        builder.username(credentials.get().getUsername()).password(credentials.get().getPassword());
+      }
     }
     return builder.build();
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 619858d..5eab823 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -49,6 +49,7 @@
   private final ScanStats scanStats;
   private final double filterSelectivity;
   private final int maxRecords;
+  private final String username;
 
   // Used only in planner, not serialized
   private int hashCode;
@@ -57,8 +58,9 @@
    * Creates a new group scan from the storage plugin.
    */
   public HttpGroupScan (HttpScanSpec scanSpec) {
-    super("no-user");
+    super(scanSpec.queryUserName());
     this.httpScanSpec = scanSpec;
+    this.username = scanSpec.queryUserName();
     this.columns = ALL_COLUMNS;
     this.filters = null;
     this.filterSelectivity = 0.0;
@@ -76,6 +78,7 @@
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.maxRecords = that.maxRecords;
+    this.username = that.username;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
@@ -96,6 +99,7 @@
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.scanStats = computeScanStats();
+    this.username = that.username;
     this.maxRecords = that.maxRecords;
   }
 
@@ -107,6 +111,7 @@
     super(that);
     this.columns = that.columns;
     this.httpScanSpec = that.httpScanSpec;
+    this.username = that.username;
 
     // Applies a filter.
     this.filters = filters;
@@ -122,6 +127,7 @@
     super(that);
     this.columns = that.columns;
     this.httpScanSpec = that.httpScanSpec;
+    this.username = that.username;
 
     // Applies a filter.
     this.filters = that.filters;
@@ -143,9 +149,10 @@
     @JsonProperty("filterSelectivity") double selectivity,
     @JsonProperty("maxRecords") int maxRecords
   ) {
-    super("no-user");
+    super(httpScanSpec.queryUserName());
     this.columns = columns;
     this.httpScanSpec = httpScanSpec;
+    this.username = httpScanSpec.queryUserName();
     this.filters = filters;
     this.filterSelectivity = selectivity;
     this.scanStats = computeScanStats();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
index e43490c..151532c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
@@ -35,6 +35,7 @@
   private final HttpStoragePluginConfig config;
   private final StoragePluginRegistry registry;
   private final PersistentTokenTable tokenTable;
+  private final String queryUserName;
 
   @JsonCreator
   public HttpScanSpec(@JsonProperty("pluginName") String pluginName,
@@ -42,6 +43,7 @@
                       @JsonProperty("tableName") String tableName,
                       @JsonProperty("config") HttpStoragePluginConfig config,
                       @JsonProperty("tokenTable") PersistentTokenTable tokenTable,
+                      @JsonProperty("queryUserName") String queryUserName,
                       @JacksonInject StoragePluginRegistry engineRegistry) {
     this.pluginName = pluginName;
     this.connectionName = connectionName;
@@ -49,6 +51,7 @@
     this.config = config;
     this.registry = engineRegistry;
     this.tokenTable = tokenTable;
+    this.queryUserName = queryUserName;
   }
 
   @JsonProperty("pluginName")
@@ -71,6 +74,11 @@
     return config;
   }
 
+  @JsonProperty("queryUserName")
+  public String queryUserName() {
+    return queryUserName;
+  }
+
   @JsonIgnore
   public PersistentTokenTable getTokenTable() {
     return tokenTable;
@@ -98,6 +106,7 @@
       .field("database", connectionName)
       .field("tableName", tableName)
       .field("config", config)
+      .field("queryUserName", queryUserName)
       .toString();
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
index 661cbef..ef6368d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
@@ -44,7 +44,7 @@
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
-    HttpSchema schema = new HttpSchema(plugin);
+    HttpSchema schema = new HttpSchema(plugin, schemaConfig.getUserName());
     logger.debug("Registering {} {}", schema.getName(), schema.toString());
 
     SchemaPlus schemaPlus = parent.add(getName(), schema);
@@ -57,15 +57,17 @@
     private final Map<String, HttpAPIConnectionSchema> subSchemas = CaseInsensitiveMap.newHashMap();
     private final Map<String, HttpApiConfig> tables = CaseInsensitiveMap.newHashMap();
     private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+    private final String queryUserName;
 
-    public HttpSchema(HttpStoragePlugin plugin) {
+    public HttpSchema(HttpStoragePlugin plugin, String queryUserName) {
       super(Collections.emptyList(), plugin.getName());
+      this.queryUserName = queryUserName;
       this.plugin = plugin;
       for (Entry<String, HttpApiConfig> entry : plugin.getConfig().connections().entrySet()) {
         String configName = entry.getKey();
         HttpApiConfig config = entry.getValue();
         if (config.requireTail()) {
-          subSchemas.put(configName, new HttpAPIConnectionSchema(this, configName, plugin));
+          subSchemas.put(configName, new HttpAPIConnectionSchema(this, configName, plugin, queryUserName));
         } else {
           tables.put(configName, config);
         }
@@ -104,7 +106,7 @@
         // Register a new table
         return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
             new HttpScanSpec(plugin.getName(), name, null,
-                plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), plugin.getRegistry())));
+                plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), queryUserName, plugin.getRegistry())));
       } else {
         return null; // Unknown table
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 721db6a..3af4ed2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -20,7 +20,7 @@
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -36,11 +36,12 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 
 @JsonTypeName(HttpStoragePluginConfig.NAME)
-public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
   private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
   public static final String NAME = "http";
 
@@ -68,18 +69,21 @@
                                  @JsonProperty("proxyUsername") String proxyUsername,
                                  @JsonProperty("proxyPassword") String proxyPassword,
                                  @JsonProperty("oAuthConfig") HttpOAuthConfig oAuthConfig,
-                                 @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider
+                                 @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+                                 @JsonProperty("authMode") String authMode
                                  ) {
     super(CredentialProviderUtils.getCredentialsProvider(
-        getClientID(new OAuthTokenCredentials(credentialsProvider)),
-        getClientSecret(new OAuthTokenCredentials(credentialsProvider)),
-        getTokenURL(new OAuthTokenCredentials(credentialsProvider)),
+        null,
+        null,
+        null,
         normalize(username),
         normalize(password),
         normalize(proxyUsername),
         normalize(proxyPassword),
         credentialsProvider),
-        credentialsProvider == null);
+        credentialsProvider == null,
+        AuthMode.parseOrDefault(authMode)
+    );
     this.cacheResults = cacheResults != null && cacheResults;
 
     this.connections = CaseInsensitiveMap.newHashMap();
@@ -110,6 +114,17 @@
     }
   }
 
+  private HttpStoragePluginConfig(HttpStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(credentialsProvider, credentialsProvider == null, that.authMode);
+    this.cacheResults = that.cacheResults;
+    this.connections = that.connections;
+    this.timeout = that.timeout;
+    this.proxyHost = that.proxyHost;
+    this.proxyPort = that.proxyPort;
+    this.proxyType = that.proxyType;
+    this.oAuthConfig = that.oAuthConfig;
+  }
+
   /**
    * Clone constructor used for updating OAuth tokens
    * @param that The current HTTP Plugin Config
@@ -141,12 +156,22 @@
    * The copy is used in the query plan to avoid including unnecessary information.
    */
   public HttpStoragePluginConfig copyForPlan(String connectionName) {
+    Optional<UsernamePasswordWithProxyCredentials> creds = getUsernamePasswordCredentials();
     return new HttpStoragePluginConfig(
-        cacheResults, configFor(connectionName), timeout,
-      getUsernamePasswordCredentials().getUsername(),
-      getUsernamePasswordCredentials().getPassword(),
-        proxyHost, proxyPort, proxyType, getUsernamePasswordCredentials().getProxyUsername(),
-      getUsernamePasswordCredentials().getProxyPassword(), oAuthConfig, credentialsProvider);
+      cacheResults,
+      configFor(connectionName),
+      timeout,
+      username(),
+      password(),
+      proxyHost,
+      proxyPort,
+      proxyType,
+      proxyUsername(),
+      proxyPassword(),
+      oAuthConfig,
+      credentialsProvider,
+      authMode.name()
+    );
   }
 
   private Map<String, HttpApiConfig> configFor(String connectionName) {
@@ -164,12 +189,13 @@
     }
     HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
     return Objects.equals(connections, thatConfig.connections) &&
-           Objects.equals(cacheResults, thatConfig.cacheResults) &&
-           Objects.equals(proxyHost, thatConfig.proxyHost) &&
-           Objects.equals(proxyPort, thatConfig.proxyPort) &&
-           Objects.equals(proxyType, thatConfig.proxyType) &&
-           Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
-           Objects.equals(credentialsProvider, thatConfig.credentialsProvider);
+      Objects.equals(cacheResults, thatConfig.cacheResults) &&
+      Objects.equals(proxyHost, thatConfig.proxyHost) &&
+      Objects.equals(proxyPort, thatConfig.proxyPort) &&
+      Objects.equals(proxyType, thatConfig.proxyType) &&
+      Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
+      Objects.equals(credentialsProvider, thatConfig.credentialsProvider) &&
+      Objects.equals(authMode, thatConfig.authMode);
   }
 
   @Override
@@ -183,13 +209,14 @@
       .field("credentialsProvider", credentialsProvider)
       .field("oauthConfig", oAuthConfig)
       .field("proxyType", proxyType)
+      .field("authMode", authMode)
       .toString();
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(connections, cacheResults, timeout,
-        proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider);
+        proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider, authMode);
   }
 
   @JsonProperty("cacheResults")
@@ -201,7 +228,7 @@
   @JsonProperty("timeout")
   public int timeout() { return timeout;}
 
-  @JsonProperty("proxyHost")
+   @JsonProperty("proxyHost")
   public String proxyHost() { return proxyHost; }
 
   @JsonProperty("proxyPort")
@@ -214,34 +241,42 @@
 
   @JsonProperty("username")
   public String username() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty("password")
   public String password() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("proxyUsername")
   public String proxyUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getProxyUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getProxyUsername)
+      .orElse(null);
   }
 
   @JsonProperty("proxyPassword")
   public String proxyPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getProxyPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getProxyPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
@@ -268,12 +303,29 @@
   }
 
   @JsonIgnore
-  public UsernamePasswordWithProxyCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordWithProxyCredentials(credentialsProvider);
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonIgnore
-  public static OAuthTokenCredentials getOAuthCredentials(CredentialsProvider credentialsProvider) {
-    return new OAuthTokenCredentials(credentialsProvider);
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(username)
+      .build();
+  }
+
+  @JsonIgnore
+  public static Optional<OAuthTokenCredentials> getOAuthCredentials(CredentialsProvider credentialsProvider) {
+    return new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @Override
+  public HttpStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new HttpStoragePluginConfig(this, credentialsProvider);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index e517259..7e7dc33 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -51,7 +51,7 @@
     @JsonProperty("filters") Map<String, String> filters,
     @JsonProperty("maxRecords") int maxRecords
     ) {
-    super("user-if-needed");
+    super(tableSpec.queryUserName());
     this.tableSpec = tableSpec;
     this.columns = columns;
     this.filters = filters;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
index 3c7b06c..48ce501 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
@@ -59,7 +59,10 @@
     accessToken = tokenTable.getAccessToken();
     refreshToken = tokenTable.getRefreshToken();
 
-    this.credentials = new OAuthTokenCredentials(credentialsProvider, tokenTable);
+    this.credentials = new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setTokenTable(tokenTable)
+      .build().get();
 
     // Add proxy info
     SimpleHttp.addProxyInfo(builder, proxyConfig);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index f38f764..0b11030 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -32,6 +32,7 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -55,7 +56,7 @@
 import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
 import org.apache.drill.exec.store.http.oauth.AccessTokenRepository;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
-import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.jetbrains.annotations.NotNull;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -84,6 +85,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -117,6 +119,7 @@
   private int responseCode;
   private String responseProtocol;
   private String responseURL;
+  private String username;
 
 
   public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
@@ -125,6 +128,7 @@
     this.pluginConfig = scanDefn.tableSpec().config();
     this.connection = scanDefn.tableSpec().connection();
     this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
+    this.username = scanDefn.getUserName();
     this.filters = scanDefn.filters();
     this.url = url;
     this.tempDir = tempDir;
@@ -208,8 +212,19 @@
       // If the API uses basic authentication add the authentication code.  Use the global credentials unless there are credentials
       // for the specific endpoint.
       logger.debug("Adding Interceptor");
-      UsernamePasswordCredentials credentials = getCredentials();
-      builder.addInterceptor(new BasicAuthInterceptor(credentials.getUsername(), credentials.getPassword()));
+      Optional<UsernamePasswordWithProxyCredentials> credentials;
+      if (pluginConfig.getAuthMode() == AuthMode.USER_TRANSLATION) {
+        credentials = getCredentials(username);
+        if (!credentials.isPresent() || StringUtils.isEmpty(credentials.get().getUsername()) || StringUtils.isEmpty(credentials.get().getPassword())) {
+          throw UserException.connectionError()
+            .message("You do not have valid credentials for this API.  Please provide your credentials.")
+            .addContext(errorContext)
+            .build(logger);
+        }
+      } else {
+        credentials = getCredentials();
+      }
+      builder.addInterceptor(new BasicAuthInterceptor(credentials.get().getUsername(), credentials.get().getPassword()));
     }
 
     // Set timeouts
@@ -415,13 +430,17 @@
       .fromConfigForURL(drillConfig, url.toString());
     final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
       builder
         .type(config.proxyType())
         .host(config.proxyHost())
-        .port(config.proxyPort())
-        .username(credentials.getUsername())
-        .password(credentials.getPassword());
+        .port(config.proxyPort());
+
+      Optional<UsernamePasswordWithProxyCredentials> credentials = config.getUsernamePasswordCredentials();
+
+      if (credentials.isPresent()) {
+        builder.username(credentials.get().getUsername())
+          .password(credentials.get().getPassword());
+      }
     }
     return builder.build();
   }
@@ -445,31 +464,18 @@
   }
 
   /**
-   * Logic to determine whether the API connection has global credentials or credentials specific for the
-   * API endpoint.
-   * @param endpointConfig The API endpoint configuration
-   * @return True if the endpoint has credentials, false if not.
-   */
-  private boolean hasEndpointCredentials(HttpApiConfig endpointConfig) {
-    UsernamePasswordCredentials credentials = endpointConfig.getUsernamePasswordCredentials();
-    if (StringUtils.isNotEmpty(credentials.getUsername()) &&
-    StringUtils.isNotEmpty(credentials.getPassword())) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
    * If the user has defined username/password for the specific API endpoint, pass the API endpoint credentials.
    * Otherwise, use the global connection credentials.
    * @return A UsernamePasswordCredentials collection with the correct username/password
    */
-  private UsernamePasswordCredentials getCredentials() {
-    if (hasEndpointCredentials(apiConfig)) {
-      return apiConfig.getUsernamePasswordCredentials();
-    } else {
-      return pluginConfig.getUsernamePasswordCredentials();
-    }
+  private Optional<UsernamePasswordWithProxyCredentials> getCredentials() {
+    Optional<UsernamePasswordWithProxyCredentials> apiCreds = apiConfig.getUsernamePasswordCredentials();
+    return apiCreds.isPresent() ? apiCreds : pluginConfig.getUsernamePasswordCredentials();
+  }
+
+  private Optional<UsernamePasswordWithProxyCredentials> getCredentials(String queryUser) {
+    Optional<UsernamePasswordWithProxyCredentials> apiCreds = apiConfig.getUsernamePasswordCredentials();
+    return apiCreds.isPresent() ? apiCreds : pluginConfig.getUsernamePasswordCredentials(queryUser);
   }
 
   /**
@@ -971,6 +977,7 @@
     private HttpOAuthConfig oAuthConfig;
     private Map<String,String> filters;
     private String connection;
+    private String username;
 
     public SimpleHttpBuilder scanDefn(HttpSubScan scanDefn) {
       this.scanDefn = scanDefn;
@@ -979,6 +986,7 @@
       this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
       this.tokenTable = scanDefn.tableSpec().getTokenTable();
       this.filters = scanDefn.filters();
+      this.username = scanDefn.getUserName();
       return this;
     }
 
@@ -992,6 +1000,11 @@
       return this;
     }
 
+    public SimpleHttpBuilder username(String username) {
+      this.username = username;
+      return this;
+    }
+
     public SimpleHttpBuilder proxyConfig(HttpProxyConfig proxyConfig) {
       this.proxyConfig = proxyConfig;
       return this;
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index dd1f13d..918d2ed 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -22,6 +22,7 @@
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -135,7 +136,7 @@
     configs.put("pokemon", pokemonConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-        new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
   }
@@ -343,7 +344,7 @@
         new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
           80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
           UsernamePasswordCredentials.USERNAME, "globaluser",
-          UsernamePasswordCredentials.PASSWORD, "globalpass")));
+          UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 8f17b26..9e32116 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -21,6 +21,7 @@
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -72,7 +73,7 @@
       new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
-        UsernamePasswordCredentials.PASSWORD, "globalpass")));
+        UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index 521dcb2..11d61f5 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -23,6 +23,7 @@
 import okhttp3.Response;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -110,7 +111,7 @@
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
       new HttpStoragePluginConfig(false, configs, TIMEOUT, null, null, "", 80, "", "", "",
-        oAuthConfig, credentialsProvider);
+        oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
index dbadcd1..81739d5 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
@@ -24,6 +24,7 @@
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.ExecConstants;
@@ -90,7 +91,7 @@
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
       new HttpStoragePluginConfig(false, configs, TIMEOUT,null, null, "", 80, "", "", "",
-        oAuthConfig, credentialsProvider);
+        oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index 45f6263..b8c5972 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -21,6 +21,7 @@
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.DrillFileUtils;
@@ -115,7 +116,8 @@
     configs.put("github", githubConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
   }
@@ -195,7 +197,8 @@
     configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
new file mode 100644
index 0000000..2256393
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.Cookie;
+import okhttp3.CookieJar;
+import okhttp3.FormBody;
+import okhttp3.Headers;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.net.util.Base64;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class TestUserTranslationInHttpPlugin extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 47775;
+
+  private static final int TIMEOUT = 30;
+  private final OkHttpClient httpClient = new OkHttpClient.Builder()
+    .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .cookieJar(new TestCookieJar())
+    .build();
+
+  private static String TEST_JSON_RESPONSE_WITH_DATATYPES;
+  private static int portNumber;
+
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @After
+  public void cleanup() throws Exception {
+    FileUtils.cleanDirectory(dirTestWatcher.getStoreDir());
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_JSON_RESPONSE_WITH_DATATYPES = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response2.json"), Charsets.UTF_8).read();
+
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+      .configProperty(ExecConstants.HTTP_ENABLE, true)
+      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+    startCluster(builder);
+
+    portNumber = cluster.drillbit().getWebServerPort();
+
+    HttpApiConfig testEndpoint = HttpApiConfig.builder()
+      .url(makeUrl("http://localhost:%d/json"))
+      .method("GET")
+      .requireTail(false)
+      .authType("basic")
+      .errorOn400(true)
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("sharedEndpoint", testEndpoint);
+
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "user2user");
+    credentials.put("password", "user2pass");
+
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(TEST_USER_2, credentials);
+
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 2, null, null, "",
+        80, "", "", "", null, credentialsProvider, AuthMode.USER_TRANSLATION.name());
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void testEmptyUserCredentials() throws Exception {
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // First verify that the user has no credentials
+    StoragePluginRegistry registry = cluster.storageRegistry();
+    StoragePlugin plugin = registry.getPlugin("local");
+    PlainCredentialsProvider credentialsProvider = (PlainCredentialsProvider)((CredentialedStoragePluginConfig)plugin.getConfig()).getCredentialsProvider();
+    Map<String, String> credentials = credentialsProvider.getCredentials(TEST_USER_1);
+    assertNotNull(credentials);
+    assertNull(credentials.get("username"));
+    assertNull(credentials.get("password"));
+  }
+
+  @Test
+  public void testQueryWithValidCredentials() throws Exception {
+    // This test validates that the correct credentials are sent down to the HTTP API.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse()
+        .setResponseCode(200)
+        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+
+      String sql = "SELECT * FROM local.sharedEndpoint";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertEquals(results.rowCount(), 2);
+      results.clear();
+
+      // Verify correct username/password from endpoint configuration
+      RecordedRequest recordedRequest = server.takeRequest();
+      Headers headers = recordedRequest.getHeaders();
+      assertEquals(headers.get("Authorization"), createEncodedText("user2user", "user2pass") );
+    }
+  }
+
+  @Test
+  public void testQueryWithMissingCredentials() throws Exception {
+    // This test validates that the correct credentials are sent down to the HTTP API.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse()
+        .setResponseCode(200)
+        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+
+      String sql = "SELECT * FROM local.sharedEndpoint";
+      try {
+        client.queryBuilder().sql(sql).run();
+        fail();
+      } catch (UserException e) {
+        assertTrue(e.getMessage().contains("You do not have valid credentials for this API."));
+      }
+    }
+  }
+
+  private boolean makeLoginRequest(String username, String password) throws IOException {
+    String loginURL =  "http://localhost:" + portNumber + "/j_security_check";
+
+    RequestBody formBody = new FormBody.Builder()
+      .add("j_username", username)
+      .add("j_password", password)
+      .build();
+
+    Request request = new Request.Builder()
+      .url(loginURL)
+      .post(formBody)
+      .addHeader("Content-Type", "application/x-www-form-urlencoded")
+      .addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
+      .build();
+
+    Response response = httpClient.newCall(request).execute();
+    return response.code() == 200;
+  }
+
+  @Test
+  public void testUnrelatedQueryWithUser() throws Exception {
+    // This test verifies that a query with a user that does NOT have credentials
+    // for a plugin using user translation will still execute.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    String sql = "SHOW FILES IN dfs";
+    QuerySummary result = client.queryBuilder().sql(sql).run();
+    assertTrue(result.succeeded());
+  }
+
+  /**
+   * Helper function to start the MockHTTPServer
+   * @return Started Mock server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  public static MockWebServer startServer () throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+
+  public static String makeUrl(String url) {
+    return String.format(url, MOCK_SERVER_PORT);
+  }
+
+  private static String createEncodedText(String username, String password) {
+    String pair = username + ":" + password;
+    byte[] encodedBytes = Base64.encodeBase64(pair.getBytes());
+    return "Basic " + new String(encodedBytes);
+  }
+
+  public static class TestCookieJar implements CookieJar {
+
+    private List<Cookie> cookies;
+
+    @Override
+    public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
+      this.cookies =  cookies;
+    }
+
+    @Override
+    public List<Cookie> loadForRequest(HttpUrl url) {
+      if (cookies != null) {
+        return cookies;
+      }
+      return new ArrayList<>();
+    }
+  }
+
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 1d7503f..09b0987 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -112,7 +112,7 @@
 
   @Override
   public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
-    if (! plugin.getConfig().isWritable()) {
+    if (plugin.getConfig().isWritable() == null || (! plugin.getConfig().isWritable())) {
       throw UserException
         .dataWriteError()
         .message(plugin.getName() + " is not writable.")
@@ -145,7 +145,8 @@
 
     String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
     String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
-    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+    SqlDialect dialect = plugin.getDialect(inner.getDataSource());
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, dialect);
 
     try (Connection conn = inner.getDataSource().getConnection();
          Statement stmt = conn.createStatement()) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
index 64c0ce3..74769d2 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
@@ -26,25 +26,42 @@
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SubsetRemover;
 
-public class DefaultJdbcDialect implements JdbcDialect {
-  private final JdbcStoragePlugin plugin;
+import java.util.Optional;
+import javax.sql.DataSource;
 
-  public DefaultJdbcDialect(JdbcStoragePlugin plugin) {
+public class DefaultJdbcDialect implements JdbcDialect {
+
+  private final JdbcStoragePlugin plugin;
+  private final SqlDialect dialect;
+
+  public DefaultJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
     this.plugin = plugin;
+    this.dialect = dialect;
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-      JdbcCatalogSchema schema = new JdbcCatalogSchema(plugin.getName(),
-        plugin.getDataSource(), plugin.getDialect(), plugin.getConvention(),
-        !plugin.getConfig().areTableNamesCaseInsensitive());
-      SchemaPlus holder = parent.add(plugin.getName(), schema);
-      schema.setHolder(holder);
+    Optional<DataSource> dataSource = plugin.getDataSource(config.getQueryUserCredentials());
+
+    if (!dataSource.isPresent()) {
+      return;
+    }
+
+    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+
+    JdbcCatalogSchema schema = new JdbcCatalogSchema(
+      plugin.getName(),
+      dataSource.get(),
+      dialect,
+      convention,
+      !plugin.getConfig().areTableNamesCaseInsensitive()
+    );
+    SchemaPlus holder = parent.add(plugin.getName(), schema);
+    schema.setHolder(holder);
   }
 
   @Override
   public String generateSql(RelOptCluster cluster, RelNode input) {
-    final SqlDialect dialect = plugin.getDialect();
     final JdbcImplementor jdbcImplementor = new JdbcImplementor(dialect,
         (JavaTypeFactory) cluster.getTypeFactory());
     final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 55ac145..07b12b7 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -55,10 +55,12 @@
 
   private final ImmutableSet<RelOptRule> rules;
   private final JdbcStoragePlugin plugin;
+  private final String username;
 
-  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin) {
+  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
     super(dialect, ConstantUntypedNull.INSTANCE, name);
     this.plugin = plugin;
+    this.username = username;
     List<RelOptRule> calciteJdbcRules = JdbcRules.rules(this, DrillRelFactories.LOGICAL_BUILDER).stream()
         .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
         .collect(Collectors.toList());
@@ -69,7 +71,7 @@
 
     ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
       .addAll(calciteJdbcRules)
-      .add(new JdbcIntermediatePrelConverterRule(this))
+      .add(new JdbcIntermediatePrelConverterRule(this, username))
       .add(new VertexDrelConverterRule(this))
       .add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
       .add(RuleInstance.PROJECT_REMOVE_RULE);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
new file mode 100644
index 0000000..3a774e0
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class JdbcConventionFactory {
+  public static final int CACHE_SIZE = 100;
+  public static final Duration CACHE_TTL = Duration.ofHours(1);
+
+  private final Cache<SqlDialect, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
+      .maximumSize(CACHE_SIZE)
+      .expireAfterAccess(CACHE_TTL)
+      .build();
+
+  public DrillJdbcConvention getJdbcConvention(
+      JdbcStoragePlugin plugin,
+      SqlDialect dialect,
+      String username) {
+    try {
+      return cache.get(dialect, new Callable<DrillJdbcConvention>() {
+        @Override
+        public DrillJdbcConvention call() {
+          return new DrillJdbcConvention(dialect, plugin.getName(), plugin, username);
+        }
+      });
+    } catch (ExecutionException ex) {
+      throw new DrillRuntimeException("Cannot load the requested DrillJdbcConvention", ex);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
index 9c10cf7..dfc2073 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
@@ -17,16 +17,39 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
+import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 public class JdbcDialectFactory {
   public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
+  public static final int CACHE_SIZE = 100;
+  public static final Duration CACHE_TTL = Duration.ofHours(1);
 
-  public static JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, String url) {
-    if (url.startsWith(JDBC_CLICKHOUSE_PREFIX)) {
-      return new ClickhouseJdbcDialect(plugin);
-    } else {
-      return new DefaultJdbcDialect(plugin);
+  private final Cache<SqlDialect, JdbcDialect> cache = CacheBuilder.newBuilder()
+      .maximumSize(CACHE_SIZE)
+      .expireAfterAccess(CACHE_TTL)
+      .build();
+
+  public JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
+    try {
+      return cache.get(dialect, new Callable<JdbcDialect>() {
+        @Override
+        public JdbcDialect call() {
+          return plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
+              ? new ClickhouseJdbcDialect(plugin, dialect)
+              : new DefaultJdbcDialect(plugin, dialect);
+        }
+      });
+    } catch (ExecutionException ex) {
+      throw new DrillRuntimeException("Cannot load the requested JdbcDialect", ex);
     }
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
index 4fcd14d..8ef3061 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -18,10 +18,11 @@
 package org.apache.drill.exec.store.jdbc;
 
 import java.util.List;
+import java.util.Objects;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
@@ -42,36 +43,43 @@
   private final List<SchemaPath> columns;
   private final JdbcStoragePlugin plugin;
   private final double rows;
+  private int hashCode;
 
   @JsonCreator
   public JdbcGroupScan(
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("config") StoragePluginConfig config,
+      @JsonProperty("config") JdbcStorageConfig config,
       @JsonProperty("rows") double rows,
+      @JsonProperty("username") String username,
       @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
-    super("");
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
     this.rows = rows;
   }
 
-  JdbcGroupScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, double rows) {
-    super("");
+  JdbcGroupScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, double rows, String username) {
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugin;
     this.rows = rows;
   }
 
+  @JsonProperty("config")
+  public JdbcStorageConfig config() {
+    return plugin.getConfig();
+  }
+
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {
   }
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new JdbcSubScan(sql, columns, plugin);
+    return new JdbcSubScan(sql, columns, plugin, getUserName());
   }
 
   @Override
@@ -88,11 +96,13 @@
         1);
   }
 
+  @JsonProperty("sql")
   public String getSql() {
     return sql;
   }
 
   @Override
+  @JsonProperty("columns")
   public List<SchemaPath> getColumns() {
     return columns;
   }
@@ -102,12 +112,50 @@
     return sql + plugin.getConfig();
   }
 
-  public StoragePluginConfig getConfig() {
+  public JdbcStorageConfig getConfig() {
     return plugin.getConfig();
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new JdbcGroupScan(sql, columns, plugin, rows);
+    return new JdbcGroupScan(sql, columns, plugin, rows, userName);
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    JdbcGroupScan that = (JdbcGroupScan) obj;
+    return Objects.equals(sql, that.sql) &&
+      Objects.equals(columns, that.columns) &&
+      Objects.equals(rows, that.rows) &&
+      Objects.equals(plugin.getName(), that.plugin.getName()) &&
+      Objects.equals(config(), that.getConfig());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash code is cached since Calcite calls this method many times.
+    if (hashCode == 0) {
+      // Don't include cost; it is derived.
+      hashCode = Objects.hash(sql, columns, plugin.getConfig(), rows, plugin.getName());
+    }
+    return hashCode;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("sql", sql)
+      .field("columns", columns)
+      .field("jdbcConfig", plugin.getConfig())
+      .field("rows", rows)
+      .toString();
+  }
+
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
index baaa0b5..f21b986 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -35,9 +35,11 @@
  * before execution can happen.
  */
 public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
+  private final String username;
 
-  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, String username) {
     super(cluster, traits, child);
+    this.username = username;
   }
 
   @Override
@@ -47,7 +49,7 @@
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput());
+    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), username);
   }
 
   @Override
@@ -62,7 +64,7 @@
 
   @Override
   public Prel finalizeRel() {
-    return new JdbcPrel(getCluster(), getTraitSet(), this);
+    return new JdbcPrel(getCluster(), getTraitSet(), this, username);
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index a66888f..8995aab 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -32,8 +32,9 @@
 
   private final RelTrait inTrait;
   private final RelTrait outTrait;
+  private final String username;
 
-  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention) {
+  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, String username) {
     super(
         RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
             RelOptHelper.any(RelNode.class, jdbcConvention)),
@@ -41,6 +42,7 @@
 
     this.inTrait = DrillRel.DRILL_LOGICAL;
     this.outTrait = Prel.DRILL_PHYSICAL;
+    this.username = username;
   }
 
   @Override
@@ -49,7 +51,7 @@
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
         in.getTraitSet().replace(outTrait),
-        in.getInput(0));
+        in.getInput(0), username);
     call.transformTo(jdbcIntermediatePrel);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index 487c8cc..b126d94 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -43,13 +43,16 @@
   private final String sql;
   private final double rows;
   private final DrillJdbcConvention convention;
+  private final String username;
 
-  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel) {
+  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, String username) {
     super(cluster, traitSet);
     final RelNode input = prel.getInput();
+    this.username = username;
     rows = input.estimateRowCount(cluster.getMetadataQuery());
     convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
-    sql = convention.getPlugin().getJdbcDialect().generateSql(getCluster(), input);
+    JdbcDialect jdbcDialect = convention.getPlugin().getJdbcDialect(convention.dialect);
+    sql = jdbcDialect.generateSql(getCluster(), input);
     rowType = input.getRowType();
   }
 
@@ -71,7 +74,7 @@
     for (String col : rowType.getFieldNames()) {
       columns.add(SchemaPath.getSimplePath(col));
     }
-    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows);
+    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, username);
     return creator.addMetadata(this, output);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index fe83a2e..e403e4f 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -120,7 +120,7 @@
   public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
     this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
     this.rowList = new ArrayList<>();
-    this.dialect = config.getPlugin().getDialect();
+    this.dialect = config.getPlugin().getDialect(source);
     this.config = config;
     this.rawTableName = name;
     this.fields = new ArrayList<>();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
index 6a727579..01ee329 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
@@ -27,15 +27,17 @@
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
 
+import javax.sql.DataSource;
+
 public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
 
   @Override
@@ -44,7 +46,7 @@
     Preconditions.checkArgument(children.isEmpty());
 
     try {
-      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      ScanFrameworkBuilder builder = createBuilder(context, subScan);
       return builder.buildScanOperator(context, subScan);
     } catch (UserException e) {
       // Rethrow user exceptions directly
@@ -55,14 +57,22 @@
     }
   }
 
-  private ScanFrameworkBuilder createBuilder(OptionManager options, JdbcSubScan subScan) {
-    JdbcStorageConfig config = subScan.getConfig();
+  private ScanFrameworkBuilder createBuilder(ExecutorFragmentContext context, JdbcSubScan subScan) {
     ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
     builder.projection(subScan.getColumns());
     builder.setUserName(subScan.getUserName());
     JdbcStoragePlugin plugin = subScan.getPlugin();
-    List<ManagedReader<SchemaNegotiator>> readers =
-      Collections.singletonList(new JdbcBatchReader(plugin.getDataSource(), subScan.getSql(), subScan.getColumns()));
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    DataSource ds = plugin.getDataSource(userCreds)
+      .orElseThrow(() -> UserException.permissionError().message(
+        "Query user %s could not obtain a connection to %s, missing credentials?",
+        userCreds.getUserName(),
+        plugin.getName()
+      ).build(JdbcStoragePlugin.logger));
+
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(
+      new JdbcBatchReader(ds, subScan.getSql(), subScan.getColumns())
+    );
 
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
     builder.setReaderFactory(readerFactory);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
index df2d789..d6deb64 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -20,6 +20,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import com.fasterxml.jackson.annotation.JsonFilter;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -28,14 +29,21 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @JsonTypeName(JdbcStorageConfig.NAME)
 @JsonFilter("passwordFilter")
-public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class JdbcStorageConfig extends CredentialedStoragePluginConfig {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcStorageConfig.class);
 
   public static final String NAME = "jdbc";
   public static final int DEFAULT_MAX_WRITER_BATCH_SIZE = 10000;
@@ -43,7 +51,7 @@
   private final String driver;
   private final String url;
   private final boolean caseInsensitiveTableNames;
-  private final boolean writable;
+  private final Boolean writable;
   private final Map<String, Object> sourceParameters;
   private final int writerBatchSize;
 
@@ -54,17 +62,59 @@
       @JsonProperty("username") String username,
       @JsonProperty("password") String password,
       @JsonProperty("caseInsensitiveTableNames") boolean caseInsensitiveTableNames,
-      @JsonProperty("writable") boolean writable,
+      @JsonProperty("writable") Boolean writable,
       @JsonProperty("sourceParameters") Map<String, Object> sourceParameters,
       @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+      @JsonProperty("authMode") String authMode,
       @JsonProperty("writerBatchSize") int writerBatchSize) {
-    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
+    super(
+      CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+      credentialsProvider == null,
+      AuthMode.parseOrDefault(authMode)
+    );
     this.driver = driver;
     this.url = url;
     this.writable = writable;
     this.caseInsensitiveTableNames = caseInsensitiveTableNames;
     this.sourceParameters = sourceParameters == null ? Collections.emptyMap() : sourceParameters;
-    this.writerBatchSize = writerBatchSize == 0 ? writerBatchSize = DEFAULT_MAX_WRITER_BATCH_SIZE : writerBatchSize;
+    this.writerBatchSize = writerBatchSize == 0 ? DEFAULT_MAX_WRITER_BATCH_SIZE : writerBatchSize;
+  }
+
+
+  private JdbcStorageConfig(JdbcStorageConfig that, CredentialsProvider credentialsProvider) {
+    super(credentialsProvider, credentialsProvider == null, that.authMode);
+    this.driver = that.driver;
+    this.url = that.url;
+    this.writable = that.writable;
+    this.caseInsensitiveTableNames = that.caseInsensitiveTableNames;
+    this.sourceParameters = that.sourceParameters;
+    this.writerBatchSize = that.writerBatchSize;
+  }
+
+  @JsonProperty("userName")
+  public String getUsername() {
+    if (!directCredentials) {
+      return null;
+    }
+    return getUsernamePasswordCredentials(null)
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
+  }
+
+  @JsonIgnore
+  @JsonProperty("password")
+  public String getPassword() {
+    if (!directCredentials) {
+      return null;
+    }
+    return getUsernamePasswordCredentials(null)
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
+  }
+
+  @Override
+  public JdbcStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new JdbcStorageConfig(this, credentialsProvider);
   }
 
   public String getDriver() {
@@ -75,24 +125,10 @@
     return url;
   }
 
-  public boolean isWritable() { return writable; }
+  public Boolean isWritable() { return writable; }
 
   public int getWriterBatchSize() { return writerBatchSize; }
 
-  public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
-    }
-    return null;
-  }
-
-  public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
-    }
-    return null;
-  }
-
   @JsonProperty("caseInsensitiveTableNames")
   public boolean areTableNamesCaseInsensitive() {
     return caseInsensitiveTableNames;
@@ -103,8 +139,26 @@
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(UserCredentials userCredentials) {
+    switch (authMode) {
+      case SHARED_USER:
+        return new UsernamePasswordCredentials.Builder()
+          .setCredentialsProvider(credentialsProvider)
+          .build();
+      case USER_TRANSLATION:
+        Preconditions.checkNotNull(
+          userCredentials,
+          "A drill query user is required for user translation auth mode."
+        );
+        return new UsernamePasswordCredentials.Builder()
+          .setCredentialsProvider(credentialsProvider)
+          .setQueryUser(userCredentials.getUserName()) // lgtm [java/dereferenced-value-may-be-null]
+          .build();
+      default:
+        throw UserException.connectionError()
+          .message("This storage plugin does not support auth mode: %s", authMode)
+          .build(logger);
+    }
   }
 
   @Override
@@ -137,7 +191,9 @@
       .field("url", url)
       .field("writable", writable)
       .field("writerBatchSize", writerBatchSize)
+      .field("sourceParameters", sourceParameters)
       .field("caseInsensitiveTableNames", caseInsensitiveTableNames)
+      .field("credentialProvider", credentialsProvider)
       .toString();
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 32d4d52..6caf9dd 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -18,7 +18,10 @@
 package org.apache.drill.exec.store.jdbc;
 
 import java.util.Properties;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.zaxxer.hikari.HikariConfig;
@@ -31,11 +34,13 @@
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,39 +48,75 @@
 
 public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
-  private static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
+  static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
 
-  private final JdbcStorageConfig config;
-  private final HikariDataSource dataSource;
-  private final SqlDialect dialect;
-  private final DrillJdbcConvention convention;
-  private final JdbcDialect jdbcDialect;
+  private final JdbcStorageConfig jdbcStorageConfig;
+  private final JdbcDialectFactory dialectFactory;
+  private final JdbcConventionFactory conventionFactory;
+  // DataSources for this storage config keyed on JDBC username
+  private final Map<String, HikariDataSource> dataSources = new ConcurrentHashMap<>();
 
-  public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
+  public JdbcStoragePlugin(JdbcStorageConfig jdbcStorageConfig, DrillbitContext context, String name) {
     super(context, name);
-    this.config = config;
-    this.dataSource = initDataSource(config);
-    this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
-    this.convention = new DrillJdbcConvention(dialect, name, this);
-    this.jdbcDialect = JdbcDialectFactory.getJdbcDialect(this, config.getUrl());
+    this.jdbcStorageConfig = jdbcStorageConfig;
+    this.dialectFactory = new JdbcDialectFactory();
+    this.conventionFactory = new JdbcConventionFactory();
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    this.jdbcDialect.registerSchemas(config, parent);
+    UserCredentials userCreds = config.getQueryUserCredentials();
+    Optional<DataSource> dataSource = getDataSource(userCreds);
+    if (!dataSource.isPresent()) {
+      logger.debug(
+        "No schemas will be registered in {} for query user {}.",
+        getName(),
+        config.getUserName()
+      );
+      return;
+    }
+
+    SqlDialect dialect = getDialect(dataSource.get());
+    getJdbcDialect(dialect).registerSchemas(config, parent);
   }
 
-  public JdbcDialect getJdbcDialect() {
-    return jdbcDialect;
+  public Optional<DataSource> getDataSource(UserCredentials userCredentials) {
+    Optional<UsernamePasswordCredentials> jdbcCreds = jdbcStorageConfig.getUsernamePasswordCredentials(userCredentials);
+
+    if (!jdbcCreds.isPresent()) {
+      logger.debug(
+        "There are no {} mode credentials in {} for query user {}",
+        jdbcStorageConfig.getAuthMode(),
+        getName(),
+        userCredentials.getUserName()
+      );
+      return Optional.<DataSource>empty();
+    }
+
+    return Optional.of(dataSources.computeIfAbsent(
+      jdbcCreds.get().getUsername(),
+      ds -> initDataSource(this.jdbcStorageConfig, jdbcCreds.get())
+    ));
   }
 
-  public DrillJdbcConvention getConvention() {
-    return convention;
+  public SqlDialect getDialect(DataSource dataSource) {
+    return JdbcSchema.createDialect(
+      SqlDialectFactoryImpl.INSTANCE,
+      dataSource
+    );
+  }
+
+  public JdbcDialect getJdbcDialect(SqlDialect dialect) {
+    return dialectFactory.getJdbcDialect(this, dialect);
+  }
+
+  public DrillJdbcConvention getConvention(SqlDialect dialect, String username) {
+    return conventionFactory.getJdbcConvention(this, dialect, username);
   }
 
   @Override
   public JdbcStorageConfig getConfig() {
-    return config;
+    return jdbcStorageConfig;
   }
 
   @Override
@@ -85,25 +126,24 @@
 
   @Override
   public boolean supportsWrite() {
-    return config.isWritable();
-  }
-
-  public DataSource getDataSource() {
-    return dataSource;
-  }
-
-  public SqlDialect getDialect() {
-    return dialect;
+    return jdbcStorageConfig.isWritable();
   }
 
   @Override
   public Set<RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext context) {
-    return convention.getRules();
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    Optional<DataSource> dataSource = getDataSource(userCreds);
+
+    if (!dataSource.isPresent()) {
+      return ImmutableSet.of();
+    }
+
+    return getConvention( getDialect(dataSource.get()), userCreds.getUserName() ).getRules();
   }
 
   @Override
-  public void close() {
-    AutoCloseables.closeSilently(dataSource);
+  public void close() throws Exception {
+    AutoCloseables.close(dataSources.values());
   }
 
   /**
@@ -118,7 +158,10 @@
    * @throws UserException if unable to configure Hikari data source
    */
   @VisibleForTesting
-  static HikariDataSource initDataSource(JdbcStorageConfig config) {
+  static HikariDataSource initDataSource(
+    JdbcStorageConfig config,
+    UsernamePasswordCredentials jdbcCredentials
+  ) {
     try {
       Properties properties = new Properties();
 
@@ -127,8 +170,8 @@
       systems with connections which mostly remain idle.  A data source that is present in N
       storage configs replicated over P drillbits with a HikariCP minimumIdle value of Q will
       have N×P×Q connections made to it eagerly.
-        The trade off of lazier connections is increased latency should there be a spike in user
-      queries involving a JDBC data source.  When comparing the defaults that follow with e.g. the
+        The trade off of lazier connections is increased latency after periods of inactivity in
+      which the pool has emptied.  When comparing the defaults that follow with e.g. the
       HikariCP defaults, bear in mind that the context here is OLAP, not OLTP.  It is normal
       for queries to run for a long time and to be separated by long intermissions. Users who
       prefer eager to lazy connections remain free to overwrite the following defaults in their
@@ -153,11 +196,23 @@
 
       hikariConfig.setDriverClassName(config.getDriver());
       hikariConfig.setJdbcUrl(config.getUrl());
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
-      hikariConfig.setUsername(credentials.getUsername());
-      hikariConfig.setPassword(credentials.getPassword());
-      // this serves as a hint to the driver, which *might* enable database optimizations
-      hikariConfig.setReadOnly(!config.isWritable());
+
+      if (jdbcCredentials != null) {
+        hikariConfig.setUsername(jdbcCredentials.getUsername());
+        hikariConfig.setPassword(jdbcCredentials.getPassword());
+      }
+
+      /*
+      The following serves as a hint to the driver, which *might* enable database
+      optimizations.  Unfortunately some JDBC drivers without read-only support,
+      notably Snowflake's, fail to connect outright when this option is set even
+      though it is only a hint, so enabling it is generally problematic.
+
+      The solution is to leave that option as null.
+      */
+      if (config.isWritable() != null) {
+        hikariConfig.setReadOnly(!config.isWritable());
+      }
 
       return new HikariDataSource(hikariConfig);
     } catch (RuntimeException e) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index f08f4c1..f6ac4d0 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -31,6 +31,7 @@
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 import java.util.List;
+import java.util.Objects;
 
 @JsonTypeName("jdbc-sub-scan")
 public class JdbcSubScan extends AbstractSubScan {
@@ -46,15 +47,16 @@
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("config") StoragePluginConfig config,
+      @JsonProperty("username") String username,
       @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
-    super("");
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
   }
 
-  JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin) {
-    super("");
+  JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, String username) {
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugin;
@@ -89,4 +91,22 @@
       .field("columns", columns)
       .toString();
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sql, columns);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    JdbcSubScan other = (JdbcSubScan) obj;
+    return Objects.equals(sql, other.sql)
+      && Objects.equals(columns, other.columns);
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 8a28565..182c104 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -19,10 +19,13 @@
 
 import java.util.List;
 
+import javax.sql.DataSource;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -33,7 +36,19 @@
     throws ExecutionSetupException {
     assert children != null && children.size() == 1;
 
-    return new WriterRecordBatch(config, children.iterator().next(), context,
-      new JdbcRecordWriter (config.getPlugin().getDataSource(), null, config.getTableName(), config));
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    DataSource ds = config.getPlugin().getDataSource(userCreds)
+      .orElseThrow(() -> new ExecutionSetupException(String.format(
+        "Query user %s could obtain a connection to %s, missing credentials?",
+        userCreds.getUserName(),
+        config.getPlugin().getName()
+      )));
+
+    return new WriterRecordBatch(
+      config,
+      children.iterator().next(),
+      context,
+      new JdbcRecordWriter(ds, null, config.getTableName(), config)
+    );
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
index 9251560..4a58bfc 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
@@ -23,30 +23,48 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.jdbc.DrillJdbcConvention;
 import org.apache.drill.exec.store.jdbc.JdbcDialect;
 import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin;
 
+import java.util.Optional;
+
+import javax.sql.DataSource;
+
 public class ClickhouseJdbcDialect implements JdbcDialect {
 
   private final JdbcStoragePlugin plugin;
+  private final SqlDialect dialect;
 
-  public ClickhouseJdbcDialect(JdbcStoragePlugin plugin) {
+  public ClickhouseJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
     this.plugin = plugin;
+    this.dialect = dialect;
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(plugin.getName(),
-      plugin.getDataSource(), plugin.getDialect(), plugin.getConvention());
+    UserCredentials userCreds = config.getQueryUserCredentials();
+    Optional<DataSource> dataSource = plugin.getDataSource(userCreds);
+    if (!dataSource.isPresent()) {
+      return;
+    }
+    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+
+    ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(
+      plugin.getName(),
+      dataSource.get(),
+      dialect,
+      convention
+    );
     SchemaPlus holder = parent.add(plugin.getName(), schema);
     schema.setHolder(holder);
   }
 
   @Override
   public String generateSql(RelOptCluster cluster, RelNode input) {
-    final SqlDialect dialect = plugin.getDialect();
     final JdbcImplementor jdbcImplementor = new ClickhouseJdbcImplementor(dialect,
       (JavaTypeFactory) cluster.getTypeFactory());
     final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
index 6bab89a..be1acb8 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
@@ -44,7 +44,7 @@
     DrillLimitRelBase limit = (DrillLimitRelBase) rel;
     if (limit.getOffset() == null
       || !limit.getTraitSet().contains(RelCollations.EMPTY)
-      || !(convention.getPlugin().getDialect() instanceof MssqlSqlDialect)) {
+      || !(convention.dialect instanceof MssqlSqlDialect)) {
       return super.convert(limit);
     } else {
       // MS SQL doesn't support either OFFSET or FETCH without ORDER BY.
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
index 4b52306..da0d1f3 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
@@ -42,7 +42,7 @@
       // So do not push down only the limit with both OFFSET and FETCH but without ORDER BY.
       return sort.offset == null
         || !sort.getCollation().getFieldCollations().isEmpty()
-        || !(convention.getPlugin().getDialect() instanceof MssqlSqlDialect);
+        || !(convention.dialect instanceof MssqlSqlDialect);
     }
     return false;
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
index 5b02c4d..2e345ea 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
@@ -19,7 +19,9 @@
 
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
 import org.junit.Before;
@@ -53,8 +55,8 @@
   @Test
   public void testInitWithoutUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, null, null, false, false, null, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, null, null, false, false, null, null, AuthMode.SHARED_USER.name(), 1000);
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, null)) {
       assertEquals(DRIVER, dataSource.getDriverClassName());
       assertEquals(url, dataSource.getJdbcUrl());
       assertNull(dataSource.getUsername());
@@ -65,8 +67,9 @@
   @Test
   public void testInitWithUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, null, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, "user", "password", false, false, null, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, jdbcCreds)) {
       assertEquals("user", dataSource.getUsername());
       assertEquals("password", dataSource.getPassword());
     }
@@ -80,14 +83,17 @@
     sourceParameters.put("connectionTestQuery", "select * from information_schema.collations");
     sourceParameters.put("dataSource.cachePrepStmts", true);
     sourceParameters.put("dataSource.prepStmtCacheSize", 250);
+    sourceParameters.put("dataSource.minimumIdle", 0);
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, "user", "password", false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, jdbcCreds)) {
       assertEquals(5, dataSource.getMinimumIdle());
       assertFalse(dataSource.isAutoCommit());
       assertEquals("select * from information_schema.collations", dataSource.getConnectionTestQuery());
       assertEquals(true, dataSource.getDataSourceProperties().get("cachePrepStmts"));
       assertEquals(250, dataSource.getDataSourceProperties().get("prepStmtCacheSize"));
+      assertEquals(0, dataSource.getDataSourceProperties().get("minimumIdle"));
     }
   }
 
@@ -96,12 +102,13 @@
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("abc", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
+      DRIVER, url, "user", "password", false,  false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
-
-    JdbcStoragePlugin.initDataSource(config);
+    // Drill query user credentials are ignored and may be null for the shared user auth mode.
+    JdbcStoragePlugin.initDataSource(config, jdbcCreds);
   }
 
   @Test
@@ -109,11 +116,12 @@
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("minimumIdle", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
+      DRIVER, url, "user", "password", false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
-
-    JdbcStoragePlugin.initDataSource(config);
+    // Drill query user credentials are ignored and may be null for the shared user auth mode.
+    JdbcStoragePlugin.initDataSource(config, jdbcCreds);
   }
 }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
index 8230e4b..d2ecb83 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -36,6 +38,8 @@
 import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -66,10 +70,16 @@
       .withInitScript("clickhouse-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("ru.yandex.clickhouse.ClickHouseDriver",
-        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), null,
-        true, false,null, null, 0);
+        jdbcContainer.getJdbcUrl(), null, null,
+        true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 0);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("clickhouse", jdbcStorageConfig);
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index 4167306..b097b47 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -20,6 +20,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 
@@ -30,6 +32,7 @@
 import org.h2.tools.RunScript;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -69,10 +72,18 @@
          FileReader fileReader = new FileReader(scriptFile.getFile())) {
       RunScript.execute(connection, fileReader);
     }
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "root");
+    credentials.put("password", "root");
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     Map<String, Object> sourceParameters =  new HashMap<>();
     sourceParameters.put("minimumIdle", 1);
+    sourceParameters.put("maximumPoolSize", "1");
+
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
-        "root", "root", true, false, sourceParameters, null, 10000);
+        null, null, true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("h2", jdbcStorageConfig);
     cluster.defineStoragePlugin("h2o", jdbcStorageConfig);
@@ -189,6 +200,7 @@
   }
 
   @Test // DRILL-7340
+  @Ignore
   public void twoPluginsPredicatesPushDown() throws Exception {
     String query = "SELECT * " +
         "FROM h2.tmp.drill_h2_test.person l " +
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index d735473..3f9a5b5 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -39,6 +41,8 @@
 import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -82,9 +86,15 @@
       ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
     }
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+
     String jdbcUrl = jdbcContainer.getJdbcUrl();
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-            jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, null, null, 10000);
+            null, null, false, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
@@ -92,7 +102,7 @@
     if (osName.startsWith("linux")) {
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-              jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, false,null, null, 10000);
+              null, null, true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
     }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
index dd11ce5..cfdd658 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -36,6 +38,8 @@
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
@@ -64,10 +68,15 @@
       .withInitScript("postgres-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
-        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, false,null, null, 100000);
+        jdbcContainer.getJdbcUrl(), null, null,
+        true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 100000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg", jdbcStorageConfig);
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java
new file mode 100644
index 0000000..2b34a28
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.ext.ScriptUtils;
+import org.testcontainers.jdbc.JdbcDatabaseDelegate;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashMap;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcUserTranslation extends ClusterTest {
+
+  private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
+  private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
+  private static JdbcDatabaseContainer<?> jdbcContainer;
+  private static final String PLUGIN_NAME = "mysql";
+
+  @BeforeClass
+  public static void initMysql() throws Exception {
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+      .configProperty(ExecConstants.HTTP_ENABLE, true)
+      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+    startCluster(builder);
+
+    String osName = System.getProperty("os.name").toLowerCase();
+    String mysqlDBName = "drill_mysql_test";
+
+    DockerImageName imageName;
+    if (osName.startsWith("linux") && "aarch64".equals(System.getProperty("os.arch"))) {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MARIADB).asCompatibleSubstituteFor("mysql");
+    } else {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MYSQL);
+    }
+
+    jdbcContainer = new MySQLContainer<>(imageName)
+      .withExposedPorts(3306)
+      .withConfigurationOverride("mysql_config_override")
+      .withUsername("mysqlUser")
+      .withPassword("mysqlPass")
+      .withDatabaseName(mysqlDBName)
+      .withUrlParam("serverTimezone", "UTC")
+      .withUrlParam("useJDBCCompliantTimezoneShift", "true")
+      .withInitScript("mysql-test-data.sql");
+    jdbcContainer.start();
+
+    if (osName.startsWith("linux")) {
+      JdbcDatabaseDelegate databaseDelegate = new JdbcDatabaseDelegate(jdbcContainer, "");
+      ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
+    }
+
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+
+    String jdbcUrl = jdbcContainer.getJdbcUrl();
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false,
+      null, credentialsProvider, "user_translation", 10000);
+    jdbcStorageConfig.setEnabled(true);
+
+    cluster.defineStoragePlugin(PLUGIN_NAME, jdbcStorageConfig);
+  }
+
+  @AfterClass
+  public static void stopMysql() {
+    if (jdbcContainer != null) {
+      jdbcContainer.stop();
+    }
+  }
+
+  @Test
+  public void testShowDatabasesWithUserWithNoCreds() throws Exception {
+    // This test verifies that a user without credentials to a JDBC data source is able to query
+    // Drill without causing various errors and NPEs.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    String sql = "SHOW DATABASES";
+    QuerySummary results = client.queryBuilder().sql(sql).run();
+    assertTrue(results.succeeded());
+    assertEquals(results.recordCount(), 7);
+  }
+
+  @Test
+  public void testShowDatabasesWithUserWithValidCreds() throws Exception {
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // Add the credentials to the user
+    JdbcStorageConfig pluginConfig = (JdbcStorageConfig) cluster.storageRegistry().getPlugin(PLUGIN_NAME).getConfig();
+    PlainCredentialsProvider credentialProvider = (PlainCredentialsProvider) pluginConfig.getCredentialsProvider();
+    credentialProvider.setUserCredentials("mysqlUser", "mysqlPass", TEST_USER_1);
+    pluginConfig.updateCredentialProvider(credentialProvider);
+
+    String sql = "SHOW DATABASES";
+    QuerySummary results = client.queryBuilder().sql(sql).run();
+    assertTrue(results.succeeded());
+    assertEquals(10, results.recordCount());
+  }
+
+  @Test
+  public void testQueryWithInvalidCredentials() {
+    // This test attempts to actually execute a query against a MySQL database with invalid credentials.
+    // The query should fail, but Drill should not crash.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    String sql = "SELECT * FROM mysql.`drill_mysql_test`.person";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Schema [[mysql, drill_mysql_test]] is not valid"));
+    }
+  }
+
+  @Test
+  public void testQueryWithValidCredentials() throws Exception {
+    // This test validates that a user can query a JDBC data source with valid credentials
+    // and user translation enabled.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // Add the credentials to the user
+    JdbcStorageConfig pluginConfig = (JdbcStorageConfig) cluster.storageRegistry().getPlugin(PLUGIN_NAME).getConfig();
+    PlainCredentialsProvider credentialProvider = (PlainCredentialsProvider) pluginConfig.getCredentialsProvider();
+    credentialProvider.setUserCredentials("mysqlUser", "mysqlPass", TEST_USER_1);
+    pluginConfig.updateCredentialProvider(credentialProvider);
+
+    String sql = "SELECT first_name, last_name FROM mysql.`drill_mysql_test`.person";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("first_name", MinorType.VARCHAR, 38)
+      .addNullable("last_name", MinorType.VARCHAR,38)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("first_name_1", "last_name_1")
+      .addRow("first_name_2", "last_name_2")
+      .addRow("first_name_3", "last_name_3")
+      .addRow(null, null)
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
index afde051..ce112b8 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
@@ -19,6 +19,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -76,14 +78,20 @@
          FileReader fileReader = new FileReader(scriptFile.getFile())) {
       RunScript.execute(connection, fileReader);
     }
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "root");
+    credentials.put("password", "root");
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     Map<String, Object> sourceParameters =  new HashMap<>();
     sourceParameters.put("minimumIdle", 1);
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
-      "root", "root", true, true, sourceParameters, null, 10000);
+      "root", "root", true, true, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("org.h2.Driver", connString,
-      "root", "root", true, false, sourceParameters, null, 10000);
+      "root", "root", true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     jdbcStorageConfigNoWrite.setEnabled(true);
 
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
index c5d5a99..6076267 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
@@ -20,6 +20,7 @@
 
 import org.apache.drill.categories.JdbcStorageTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -49,7 +50,10 @@
 import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -66,6 +70,7 @@
   private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
   private static final Logger logger = LoggerFactory.getLogger(TestJdbcWriterWithMySQL.class);
   private static JdbcDatabaseContainer<?> jdbcContainer;
+
   @BeforeClass
   public static void initMysql() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
@@ -98,16 +103,23 @@
       ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
     }
 
+    Map<String, Object> sourceParameters =  new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "1");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
     String jdbcUrl = jdbcContainer.getJdbcUrl();
     logger.debug("JDBC URL: {}", jdbcUrl);
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, null, null, 10000);
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
 
     JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, null, null, 10000);
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfigNoWrite.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql_no_write", jdbcStorageConfigNoWrite);
@@ -115,7 +127,7 @@
     if (osName.startsWith("linux")) {
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-        jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, null, null, 10000);
+        jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
     }
@@ -214,7 +226,7 @@
     // Local databases
     String localMySql = "jdbc:mysql://localhost:3306/?useJDBCCompliantTimezoneShift=true&serverTimezone=EST5EDT";
     JdbcStorageConfig localJdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", localMySql,
-      "root", "password", false, true, null, null, 10000);
+      "root", "password", false, true, null, null, AuthMode.SHARED_USER.name(), 10000);
     localJdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("localMysql", localJdbcStorageConfig);
@@ -440,25 +452,6 @@
   }
 
   @Test
-  public void testUnwritableConnection() throws Exception {
-    try {
-      String query = "CREATE TABLE IF NOT EXISTS mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
-    }
-
-    try {
-      String query = "CREATE TABLE mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
-    }
-  }
-
-  @Test
   public void testWithLargeFile() throws Exception {
     String query = "CREATE TABLE mysql.`drill_mysql_test`.test (id,first_name,last_name,email,gender,ip_address) AS " +
       "SELECT id,first_name,last_name,email,gender,ip_address FROM cp.`csv/large_csv.csvh`";
@@ -510,6 +503,30 @@
     }
   }
 
+  @Test
+  public void testUnwritableConnection() throws Exception {
+    try {
+      String query = "CREATE TABLE IF NOT EXISTS mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      System.out.println(e.getMessage());
+      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
+    }
+  }
+
+  @Test
+  public void testUnwritableConnectionWithoutIfNotExists() throws Exception {
+    try {
+      String query = "CREATE TABLE mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      System.out.println(e.getMessage());
+      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
+    }
+  }
+
   @AfterClass
   public static void stopMysql() {
     if (jdbcContainer != null) {
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
index 7815de1..2be9c3a 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
@@ -20,6 +20,7 @@
 
 import org.apache.drill.categories.JdbcStorageTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -45,7 +46,10 @@
 import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,17 +77,24 @@
       .withInitScript("postgres-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, Object> sourceParameters =  new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "16");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
         jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, true,null, null, 10000);
+        true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg", jdbcStorageConfig);
 
     JdbcStorageConfig unWritableJdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
         jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, false,null, null, 10000);
+        true, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     unWritableJdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg_unwritable", unWritableJdbcStorageConfig);
 
@@ -97,6 +108,30 @@
   }
 
   @Test
+  public void testUnwritableConnectionWithIfNotExists() throws Exception {
+    try {
+      String query = "CREATE TABLE IF NOT EXISTS pg_unwritable.public.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      System.out.println(e.getMessage());
+      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
+    }
+  }
+
+  @Test
+  public void testUnwritableConnection() throws Exception {
+    try {
+      String query = "CREATE TABLE pg_unwritable.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail();
+    } catch (UserRemoteException e) {
+      System.out.println(e.getMessage());
+      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
+    }
+  }
+
+  @Test
   public void testBasicCTAS() throws Exception {
     String query = "CREATE TABLE pg.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
     // Create the table and insert the values
@@ -427,23 +462,4 @@
       assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
     }
   }
-
-  @Test
-  public void testUnwritableConnection() throws Exception {
-    try {
-      String query = "CREATE TABLE IF NOT EXISTS pg_unwritable.public.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
-    }
-
-    try {
-      String query = "CREATE TABLE pg_unwritable.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
-    }
-  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 5c297d8..cb0fb27 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -58,6 +58,7 @@
 import java.net.URLEncoder;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -104,13 +105,13 @@
   private String addCredentialsFromCredentialsProvider(String connection, String name) {
     ConnectionString parsed = new ConnectionString(connection);
     if (parsed.getCredential() == null) {
-      UsernamePasswordCredentials credentials = getUsernamePasswordCredentials(name);
+      Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials(name);
       try {
         // The default connection has the name "mongo" but multiple connections can be added;
         // each will need their own credentials.
-        if (credentials.getUsername() != null && credentials.getPassword() != null) {
-          String username = URLEncoder.encode(credentials.getUsername(), "UTF-8");
-          String password = URLEncoder.encode(credentials.getPassword(), "UTF-8");
+        if (credentials.isPresent()) {
+          String username = URLEncoder.encode(credentials.get().getUsername(), "UTF-8");
+          String password = URLEncoder.encode(credentials.get().getPassword(), "UTF-8");
           return connection.replaceFirst("://",
               String.format("://%s:%s@", username, password));
         }
@@ -121,7 +122,7 @@
     return connection;
   }
 
-  private UsernamePasswordCredentials getUsernamePasswordCredentials(String name) {
+  private Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String name) {
     CredentialsProvider credentialsProvider = mongoConfig.getCredentialsProvider();
     // for the case if empty credentials, tries to obtain credentials using HadoopCredentialsProvider
     if (credentialsProvider == null || credentialsProvider == PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER) {
@@ -132,7 +133,9 @@
               UsernamePasswordCredentials.PASSWORD,
               DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.PASSWORD_CONFIG_SUFFIX));
     }
-    return new UsernamePasswordCredentials(credentialsProvider);
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider).
+      build();
   }
 
   @Override
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index e803512..f80f1be 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -23,7 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 
@@ -31,7 +31,7 @@
 import java.util.Objects;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
-public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class MongoStoragePluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "mongo";
 
@@ -60,6 +60,15 @@
     this.allowDiskUse = allowDiskUse;
   }
 
+  private MongoStoragePluginConfig(MongoStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.connection = that.connection;
+    this.clientURI = that.clientURI;
+    this.pluginOptimizations = that.pluginOptimizations;
+    this.batchSize = that.batchSize;
+    this.allowDiskUse = that.allowDiskUse;
+  }
+
   public MongoPluginOptimizations getPluginOptimizations() {
     return pluginOptimizations;
   }
@@ -103,6 +112,11 @@
     return Objects.hash(connection);
   }
 
+  @Override
+  public MongoStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new MongoStoragePluginConfig(this, credentialsProvider);
+  }
+
   public static class MongoPluginOptimizations {
 
     private boolean supportsProjectPushdown = true;
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
index b1fe8e2..6e73dc2 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
@@ -20,10 +20,11 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -34,7 +35,7 @@
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(PhoenixStoragePluginConfig.NAME)
-public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class PhoenixStoragePluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "phoenix";
   public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver";
@@ -62,8 +63,10 @@
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonProperty("host")
@@ -78,19 +81,23 @@
 
   @JsonProperty("userName")
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonIgnore
   @JsonProperty("password")
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("jdbcURL")
@@ -140,4 +147,9 @@
         .field("props", props)
         .toString();
   }
+
+  @Override
+  public PhoenixStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index b262d7d..c92738e 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -31,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
+import java.util.Optional;
 
 /**
  * This class wraps the functionality of the Splunk connection for Drill.
@@ -39,7 +40,7 @@
 
   private static final Logger logger = LoggerFactory.getLogger(SplunkConnection.class);
 
-  private final UsernamePasswordCredentials credentials;
+  private final Optional<UsernamePasswordCredentials> credentials;
   private final String hostname;
   private final int port;
   private Service service;
@@ -73,8 +74,8 @@
     ServiceArgs loginArgs = new ServiceArgs();
     loginArgs.setHost(hostname);
     loginArgs.setPort(port);
-    loginArgs.setPassword(credentials.getPassword());
-    loginArgs.setUsername(credentials.getUsername());
+    loginArgs.setPassword(credentials.map(UsernamePasswordCredentials::getPassword).orElse(null));
+    loginArgs.setUsername(credentials.map(UsernamePasswordCredentials::getUsername).orElse(null));
     try {
       connectionAttempts--;
       service = Service.connect(loginArgs);
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 54c6564..0734347 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -23,15 +23,17 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(SplunkPluginConfig.NAME)
-public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
+public class SplunkPluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
@@ -51,7 +53,8 @@
                             @JsonProperty("earliestTime") String earliestTime,
                             @JsonProperty("latestTime") String latestTime,
                             @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
-                            @JsonProperty("reconnectRetries") Integer reconnectRetries) {
+                            @JsonProperty("reconnectRetries") Integer reconnectRetries,
+                            @JsonProperty("authMode") String authMode) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null);
     this.hostname = hostname;
@@ -61,25 +64,40 @@
     this.reconnectRetries = reconnectRetries;
   }
 
+  private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.hostname = that.hostname;
+    this.port = that.port;
+    this.earliestTime = that.earliestTime;
+    this.latestTime = that.latestTime;
+    this.reconnectRetries = that.reconnectRetries;
+  }
+
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonProperty("username")
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty("password")
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("hostname")
@@ -107,6 +125,10 @@
     return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES;
   }
 
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
+
   @Override
   public boolean equals(Object that) {
     if (this == that) {
@@ -137,4 +159,9 @@
       .field("latestTime", latestTime)
       .toString();
   }
+
+  @Override
+  public SplunkPluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new SplunkPluginConfig(this, credentialsProvider);
+  }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index d497c6e..4e010b2 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -21,6 +21,7 @@
 import com.splunk.EntityCollection;
 import com.splunk.Index;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -50,7 +51,8 @@
               SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
               null,
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries()
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
+              StoragePluginConfig.AuthMode.SHARED_USER.name()
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
       sc.connect();
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 23082e3..0abf92a 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.store.splunk;
 
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
@@ -72,7 +73,7 @@
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
         SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now",
-                null, 4);
+                null, 4, StoragePluginConfig.AuthMode.SHARED_USER.name());
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
         runningSuite = true;
@@ -97,4 +98,4 @@
   public static boolean isRunningSuite() {
     return runningSuite;
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
index 24fcd10..6799fa6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
@@ -27,14 +27,14 @@
  * Provides query context information (such as query start time, query user, default schema etc.) for UDFs.
  */
 public class ContextInformation {
-  private final String queryUser;
+  private final UserCredentials queryUserCredentials;
   private final String currentDefaultSchema;
   private final long queryStartTime;
   private final int rootFragmentTimeZone;
   private final String sessionId;
 
   public ContextInformation(final UserCredentials userCredentials, final QueryContextInformation queryContextInfo) {
-    this.queryUser = userCredentials.getUserName();
+    this.queryUserCredentials = userCredentials;
     this.currentDefaultSchema = queryContextInfo.getDefaultSchemaName();
     this.queryStartTime = queryContextInfo.getQueryStartTime();
     this.rootFragmentTimeZone = queryContextInfo.getTimeZone();
@@ -45,7 +45,14 @@
    * @return userName of the user who issued the current query.
    */
   public String getQueryUser() {
-    return queryUser;
+    return queryUserCredentials.getUserName();
+  }
+
+  /**
+   * @return credentials of the user who issued the current query.
+   */
+  public UserCredentials getQueryUserCredentials() {
+    return queryUserCredentials;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 0da319f..889c251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -55,6 +55,7 @@
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.rpc.RpcException;
@@ -319,7 +320,7 @@
     SchemaConfig schemaConfig = SchemaConfig
         .newBuilder(
             isImpersonationEnabled ? contextInformation.getQueryUser() : ImpersonationUtil.getProcessUserName(),
-            new FragmentSchemaConfigInfoProvider(fragmentOptions, contextInformation.getQueryUser(), context))
+            new FragmentSchemaConfigInfoProvider(fragmentOptions, contextInformation.getQueryUserCredentials(), context))
         .setIgnoreAuthErrors(isImpersonationEnabled)
         .build();
 
@@ -680,16 +681,16 @@
 
     private final OptionManager optionManager;
 
-    private final String queryUser;
+    private final UserCredentials queryUserCredentials;
 
     private final SchemaTreeProvider schemaTreeProvider;
 
     private final ViewExpansionContext viewExpansionContext;
 
     private FragmentSchemaConfigInfoProvider(OptionManager optionManager,
-        String queryUser, DrillbitContext context) {
+        UserCredentials queryUserCredentials, DrillbitContext context) {
       this.optionManager = optionManager;
-      this.queryUser = queryUser;
+      this.queryUserCredentials = queryUserCredentials;
       this.schemaTreeProvider = new SchemaTreeProvider(context);
       viewExpansionContext = new ViewExpansionContext(context.getConfig(), this);
     }
@@ -706,7 +707,12 @@
 
     @Override
     public String getQueryUserName() {
-      return queryUser;
+      return queryUserCredentials.getUserName();
+    }
+
+    @Override
+    public UserCredentials getQueryUserCredentials() {
+      return queryUserCredentials;
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 76b44ce..0b37ce1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -37,6 +37,7 @@
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -146,12 +147,23 @@
     return plannerSettings;
   }
 
-  public UserSession getSession() { return session; }
+  @Override
+  public UserCredentials getQueryUserCredentials() {
+    return session.getCredentials();
+  }
 
   @Override
-  public BufferAllocator getAllocator() { return allocator; }
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
 
-  public QueryId getQueryId( ) { return queryId; }
+  public UserSession getSession() {
+    return session;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
 
   /**
    * Return reference to default schema instance in a schema tree. Each {@link org.apache.calcite.schema.SchemaPlus}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
index 07fbae3..5499003 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
@@ -23,6 +23,7 @@
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
@@ -74,7 +75,7 @@
 
   private final SchemaConfigInfoProvider schemaConfigInfoProvider;
   private final int maxChainedUserHops;
-  private final String queryUser;
+  private final UserCredentials queryUserCredentials;
   private final ObjectIntHashMap<String> userTokens = new ObjectIntHashMap<>();
   private final boolean impersonationEnabled;
 
@@ -85,7 +86,7 @@
   public ViewExpansionContext(DrillConfig config, SchemaConfigInfoProvider schemaConfigInfoProvider) {
     this.schemaConfigInfoProvider = schemaConfigInfoProvider;
     this.maxChainedUserHops = config.getInt(IMPERSONATION_MAX_CHAINED_USER_HOPS);
-    this.queryUser = schemaConfigInfoProvider.getQueryUserName();
+    this.queryUserCredentials = schemaConfigInfoProvider.getQueryUserCredentials();
     this.impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
@@ -103,7 +104,7 @@
    */
   public ViewExpansionToken reserveViewExpansionToken(String viewOwner) {
     int totalTokens = 1;
-    if (!viewOwner.equals(queryUser)) {
+    if (!viewOwner.equals(queryUserCredentials.getUserName())) {
       // We want to track the tokens only if the "viewOwner" is not same as the "queryUser".
       if (userTokens.containsKey(viewOwner)) {
         // If the user already exists, we don't need to validate the limit on maximum user hops in chained impersonation
@@ -131,7 +132,7 @@
   private void releaseViewExpansionToken(ViewExpansionToken token) {
     final String viewOwner = token.viewOwner;
 
-    if (viewOwner.equals(queryUser)) {
+    if (viewOwner.equals(queryUserCredentials.getUserName())) {
       // If the token owner and queryUser are same, no need to track the token release.
       return;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
new file mode 100644
index 0000000..c63043e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.rest;
+
+import io.swagger.v3.oas.annotations.ExternalDocumentation;
+import io.swagger.v3.oas.annotations.Operation;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.rest.StorageResources.StoragePluginModel;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
+import org.glassfish.jersey.server.mvc.Viewable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.security.RolesAllowed;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.SecurityContext;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.AUTHENTICATED_ROLE;
+
+@Path("/")
+@RolesAllowed(AUTHENTICATED_ROLE)
+public class CredentialResources {
+  private static final Logger logger = LoggerFactory.getLogger(CredentialResources.class);
+  private static final Comparator<PluginConfigWrapper> PLUGIN_COMPARATOR =
+    Comparator.comparing(PluginConfigWrapper::getName);
+  private static final String ALL_PLUGINS = "all";
+  private static final String ENABLED_PLUGINS = "enabled";
+  private static final String DISABLED_PLUGINS = "disabled";
+  private static final String TRANSLATES_USERS = "translates_users";
+
+  @Inject
+  UserAuthEnabled authEnabled;
+
+  @Inject
+  StoragePluginRegistry storage;
+
+  @Inject
+  SecurityContext sc;
+
+  @Inject
+  HttpServletRequest request;
+
+  @GET
+  @Path("/credentials")
+  @Produces(MediaType.TEXT_HTML)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Viewable getPlugins() {
+    List<StoragePluginModel> model = getPluginsJSON().stream()
+      .map(plugin -> new StoragePluginModel(plugin, request, sc))
+      .collect(Collectors.toList());
+    // Creating an empty model with CSRF token, if there are no storage plugins
+    if (model.isEmpty()) {
+      model.add(new StoragePluginModel(null, request, sc));
+    }
+    return ViewableWithPermissions.create(authEnabled.get(), "/rest/credentials/list.ftl", sc, model);
+  }
+
+  @GET
+  @Path("/credentials.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  public List<PluginConfigWrapper> getPluginsJSON() {
+    return getConfigsFor(TRANSLATES_USERS);
+  }
+
+  @GET
+  @Path("/credentials{group: (/[^/]+?)*}-plugins.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public List<PluginConfigWrapper> getConfigsFor(@PathParam("group") String pluginGroup) {
+    PluginFilter filter;
+    switch (pluginGroup.trim()) {
+      case ALL_PLUGINS:
+        filter = PluginFilter.ALL;
+        break;
+      case ENABLED_PLUGINS:
+        filter = PluginFilter.ENABLED;
+        break;
+      case DISABLED_PLUGINS:
+        filter = PluginFilter.DISABLED;
+        break;
+      case TRANSLATES_USERS:
+        filter = PluginFilter.TRANSLATES_USERS;
+        break;
+      default:
+        return Collections.emptyList();
+    }
+    return StreamSupport.stream(
+        Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
+      .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue(), sc))
+      .sorted(PLUGIN_COMPARATOR)
+      .collect(Collectors.toList());
+  }
+
+  @POST
+  @Path("/credentials/update_credentials")
+  @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response createOrUpdateCredentials(@FormParam("plugin") String pluginName,
+                                       @FormParam("username") String username,
+                                       @FormParam("password") String password) {
+    String queryUser = sc.getUserPrincipal().getName();
+    pluginName = pluginName.trim();
+    if (pluginName.isEmpty()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+        .entity(message("A storage config name may not be empty"))
+        .build();
+    }
+
+    // Get the config
+    StoragePluginConfig rawConfig = storage.getStoredConfig(pluginName);
+    if (!(rawConfig instanceof CredentialedStoragePluginConfig)) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message(pluginName + " does not support per user credentials."))
+        .build();
+    }
+
+    CredentialedStoragePluginConfig config = (CredentialedStoragePluginConfig)rawConfig;
+
+    if (config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message(pluginName + " does not support per user translation."))
+        .build();
+    }
+
+    // Get the credential provider
+    CredentialsProvider credentialProvider = config.getCredentialsProvider();
+    credentialProvider.setUserCredentials(username, password, queryUser);
+
+    // Since the config classes are not accessible from java-exec, we have to serialize them,
+    // replace the credential provider with the updated one, and update the storage plugin registry
+    CredentialedStoragePluginConfig newConfig = config.updateCredentialProvider(credentialProvider);
+    newConfig.setEnabled(config.isEnabled());
+
+    try {
+      storage.validatedPut(pluginName, newConfig);
+      // Force re-caching
+      storage.setEnabled(pluginName, newConfig.isEnabled());
+    } catch (PluginException e) {
+      logger.error("Error while saving plugin", e);
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+        .entity(message("Error while saving plugin: %s", e.getMessage()))
+        .build();
+    }
+
+    return Response.ok().entity(message("Success")).build();
+  }
+
+  @POST
+  @Path("/credentials/{pluginName}/update_credentials.json")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response createOrUpdatePlugin(@PathParam("pluginName") String pluginName, UsernamePasswordContainer credentials) {
+    String queryUser = sc.getUserPrincipal().getName();
+    String cleanPluginName;
+    if (pluginName.isEmpty()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+        .entity(message("A storage config name may not be empty"))
+        .build();
+    }
+    cleanPluginName = pluginName.trim();
+    StoragePluginConfig config = storage.getStoredConfig(cleanPluginName);
+
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message(cleanPluginName + " does not support user translation."))
+        .build();
+    }
+
+    if (config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message(cleanPluginName + " does not have user translation enabled."))
+        .build();
+    }
+
+    CredentialedStoragePluginConfig credsConfig = (CredentialedStoragePluginConfig)config;
+    CredentialsProvider credentialProvider = credsConfig.getCredentialsProvider();
+    credentialProvider.setUserCredentials(credentials.getUsername(), credentials.getPassword(), queryUser);
+
+    // Since the config classes are not accessible from java-exec, we have to serialize them,
+    // replace the credential provider with the updated one, and update the storage plugin registry
+    CredentialedStoragePluginConfig newConfig = credsConfig.updateCredentialProvider(credentialProvider);
+    newConfig.setEnabled(credsConfig.isEnabled());
+
+    try {
+      storage.validatedPut(cleanPluginName, newConfig);
+      // Force re-caching
+      storage.setEnabled(cleanPluginName, newConfig.isEnabled());
+    } catch (PluginException e) {
+      logger.error("Error while saving plugin", e);
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+        .entity(message("Error while updating plugin credentials: %s", e.getMessage()))
+        .build();
+    }
+
+    return Response.status(Status.OK)
+      .entity("Credentials have been updated.")
+      .build();
+  }
+
+  private JsonResult message(String message, Object... args) {
+    return new JsonResult(String.format(message, args)); // lgtm [java/tainted-format-string]
+  }
+
+  @XmlRootElement
+  public static class JsonResult {
+    private final String result;
+    public JsonResult(String result) {
+      this.result = result;
+    }
+    public String getResult() {
+      return result;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 1639d58..cbe7128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -91,6 +91,7 @@
     register(StatusResources.class);
     register(StorageResources.class);
     register(ProfileResources.class);
+    register(CredentialResources.class);
     register(QueryResources.class);
     register(MetricsResources.class);
     register(ThreadsResources.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
index 6454a55..ff5170d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import java.util.Optional;
+
+import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -29,19 +32,22 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
 
 @XmlRootElement
 public class PluginConfigWrapper {
-
   private final String name;
   private final StoragePluginConfig config;
+  private final SecurityContext sc;
 
   @JsonCreator
   public PluginConfigWrapper(@JsonProperty("name") String name,
-      @JsonProperty("config") StoragePluginConfig config) {
+                             @JsonProperty("config") StoragePluginConfig config,
+                             @JacksonInject SecurityContext sc) {
     this.name = name;
     this.config = config;
+    this.sc = sc;
   }
 
   public String getName() { return name; }
@@ -52,6 +58,40 @@
     return config.isEnabled();
   }
 
+  @JsonIgnore
+  public String getUserName() {
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return null;
+    }
+
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
+    CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
+    String queryUser = sc.getUserPrincipal().getName();
+    Optional<UsernamePasswordCredentials> credentials = new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(queryUser)
+      .build();
+
+    return credentials.map(UsernamePasswordCredentials::getUsername).orElse(null);
+  }
+
+  @JsonIgnore
+  public String getPassword() {
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return null;
+    }
+
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
+    CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
+    String queryUser = sc.getUserPrincipal().getName();
+    Optional<UsernamePasswordCredentials> credentials = new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(queryUser)
+      .build();
+
+    return credentials.map(UsernamePasswordCredentials::getPassword).orElse(null);
+  }
+
   public void createOrUpdateInStorage(StoragePluginRegistry storage) throws PluginException {
     storage.validatedPut(name, config);
   }
@@ -66,17 +106,19 @@
    */
   @JsonIgnore
   public boolean isOauth() {
-    if (! (config instanceof AbstractSecuredStoragePluginConfig)) {
+    if (! (config instanceof CredentialedStoragePluginConfig)) {
       return false;
     }
-    AbstractSecuredStoragePluginConfig securedStoragePluginConfig = (AbstractSecuredStoragePluginConfig) config;
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
     CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
     if (credentialsProvider == null) {
       return false;
     }
-    OAuthTokenCredentials tokenCredentials = new OAuthTokenCredentials(credentialsProvider);
 
-    return !StringUtils.isEmpty(tokenCredentials.getClientID()) ||
-      !StringUtils.isEmpty(tokenCredentials.getClientSecret());
+    Optional<OAuthTokenCredentials> tokenCredentials = new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+
+    return tokenCredentials.map(OAuthTokenCredentials::getClientID).orElse(null) != null;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
index 5df4211..fa1fc54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
@@ -87,7 +87,7 @@
   public Viewable getQuery() {
     List<StorageResources.StoragePluginModel> enabledPlugins = sr.getConfigsFor("enabled")
       .stream()
-      .map(plugin -> new StorageResources.StoragePluginModel(plugin, request))
+      .map(plugin -> new StorageResources.StoragePluginModel(plugin, request, sc))
       .collect(Collectors.toList());
     return ViewableWithPermissions.create(
         authEnabled.get(), "/rest/query/query.ftl",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 800fd11..2d36f77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -56,7 +56,7 @@
 import okhttp3.OkHttpClient.Builder;
 import okhttp3.Request;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.oauth.OAuthTokenProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
@@ -148,11 +148,11 @@
   @Produces(MediaType.TEXT_HTML)
   public Viewable getPlugins() {
     List<StoragePluginModel> model = getPluginsJSON().stream()
-      .map(plugin -> new StoragePluginModel(plugin, request))
+      .map(plugin -> new StoragePluginModel(plugin, request, sc))
       .collect(Collectors.toList());
     // Creating an empty model with CSRF token, if there are no storage plugins
     if (model.isEmpty()) {
-      model.add(new StoragePluginModel(null, request));
+      model.add(new StoragePluginModel(null, request, sc));
     }
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/list.ftl", sc, model);
   }
@@ -163,7 +163,7 @@
   @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
   public Response getPluginConfig(@PathParam("name") String name) {
     try {
-      return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name)))
+      return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name), sc))
         .build();
     } catch (Exception e) {
       logger.error("Failure while trying to access storage config: {}", name, e);
@@ -180,7 +180,7 @@
   public Viewable getPlugin(@PathParam("name") String name) {
     StoragePluginModel model = new StoragePluginModel(
       (PluginConfigWrapper) getPluginConfig(name).getEntity(),
-      request
+      request, sc
     );
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/update.ftl", sc,
       model);
@@ -213,7 +213,7 @@
   @Produces(MediaType.APPLICATION_JSON)
   public Response updateRefreshToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -244,7 +244,7 @@
   @Produces(MediaType.APPLICATION_JSON)
   public Response updateAccessToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -276,7 +276,7 @@
   public Response updateOAuthTokens(@PathParam("name") String name,
                                     OAuthTokenContainer tokenContainer) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -307,8 +307,8 @@
   @Produces(MediaType.TEXT_HTML)
   public Response updateAuthToken(@PathParam("name") String name, @QueryParam("code") String code) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
-        AbstractSecuredStoragePluginConfig securedStoragePluginConfig = (AbstractSecuredStoragePluginConfig) storage.getPlugin(name).getConfig();
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
+        CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) storage.getPlugin(name).getConfig();
         CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
         String callbackURL = this.request.getRequestURL().toString();
 
@@ -389,7 +389,7 @@
         .build();
     }
 
-    return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name)))
+    return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name), sc))
       .header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment;filename=\"%s.%s\"", name, format))
       .build();
   }
@@ -468,7 +468,7 @@
   }
 
   private JsonResult message(String message, Object... args) {
-    return new JsonResult(String.format(message, args));
+    return new JsonResult(String.format(message, args));  // lgtm [java/tainted-format-string]
   }
 
   private boolean isSupported(String format) {
@@ -511,7 +511,7 @@
     pluginGroup = StringUtils.isNotEmpty(pluginGroup) ? pluginGroup.replace("/", "") : ALL_PLUGINS;
     return StreamSupport.stream(
       Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
-        .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue()))
+        .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue(), sc))
         .sorted(PLUGIN_COMPARATOR)
         .collect(Collectors.toList());
   }
@@ -561,8 +561,9 @@
     private final PluginConfigWrapper plugin;
     private final String type;
     private final String csrfToken;
+    private final SecurityContext securityContext;
 
-    public StoragePluginModel(PluginConfigWrapper plugin, HttpServletRequest request) {
+    public StoragePluginModel(PluginConfigWrapper plugin, HttpServletRequest request, SecurityContext sc) {
       this.plugin = plugin;
 
       if (plugin != null) {
@@ -571,6 +572,11 @@
         this.type = "Unknown";
       }
       csrfToken = WebUtils.getCsrfTokenFromHttpRequest(request);
+      this.securityContext = sc;
+    }
+
+    public String getActiveUser() {
+      return securityContext.getUserPrincipal().getName();
     }
 
     public String getType() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java
new file mode 100644
index 0000000..ddbe9ab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class UsernamePasswordContainer {
+  private final String username;
+  private final String password;
+
+  @JsonCreator
+  public UsernamePasswordContainer(@JsonProperty("username") String username,
+                             @JsonProperty("password") String password) {
+    this.username = username;
+    this.password = password;
+  }
+
+  public String getUsername() {
+      return username;
+    }
+
+  public String getPassword() {
+      return password;
+    }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
index 9877527..e678a7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
@@ -79,8 +79,11 @@
 
     final boolean isUserLoggedIn = AuthDynamicFeature.isUserLoggedIn(sc);
 
+    final boolean showCredentials = (authEnabled && isUserLoggedIn);
+
     final ImmutableMap.Builder<String, Object> mapBuilder = ImmutableMap.<String, Object>builder()
         .put("showStorage", isAdmin)
+        .put("showCredentials", showCredentials)
         .put("showOptions", isAdmin)
         .put("showThreads", isAdmin)
         .put("showLogs", isAdmin)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
index 2e46770..19906fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -19,6 +19,7 @@
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.options.OptionValue;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -79,6 +80,10 @@
     return userName;
   }
 
+  public UserCredentials getQueryUserCredentials() {
+    return provider.getQueryUserCredentials();
+  }
+
   /**
    * @return Should ignore if authorization errors are reported while {@link SchemaPlus}
    * instances interact with the underlying storage.
@@ -105,6 +110,8 @@
 
     String getQueryUserName();
 
+    UserCredentials getQueryUserCredentials();
+
     OptionValue getOption(String optionKey);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
index e28a898..3ab2643 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -22,6 +22,7 @@
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.calcite.jdbc.DynamicSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -77,6 +78,12 @@
       @Override public String getQueryUserName() {
         return ImpersonationUtil.getProcessUserName();
       }
+
+      @Override public UserCredentials getQueryUserCredentials() {
+        return UserCredentials.newBuilder()
+          .setUserName(ImpersonationUtil.getProcessUserName())
+          .build();
+      }
     };
 
     final SchemaConfig schemaConfig = SchemaConfig.newBuilder(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index f1a75ff..dd34052 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -217,7 +217,7 @@
    */
   Map<String, StoragePluginConfig> storedConfigs();
 
-  enum PluginFilter { ALL, ENABLED, DISABLED };
+  enum PluginFilter { ALL, ENABLED, DISABLED, TRANSLATES_USERS };
 
   /**
    * Return a possibly-filtered set of plugins from the persistent
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 2e9b2d0..64d87f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -33,6 +33,7 @@
 import org.apache.drill.common.collections.ImmutableEntry;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
@@ -107,7 +108,7 @@
  * make sure that the cache ends up agreeing with the persistent store
  * as it was at some point in time.
  * <p>
- * The {@link PluginsMap} class provides in-memory synchronization of the
+ * The {@link StoragePluginMap} class provides in-memory synchronization of the
  * name and config maps. Careful coding is needed when handling refresh
  * since another thread could make the same changes.
  * <p>
@@ -141,7 +142,7 @@
  * <h4>Caveats</h4>
  *
  * The main problem with synchronization at present is that plugins
- * provide a {@link close()} method that, if used, could render the
+ * provide a {@code close()} method that, if used, could render the
  * plugin unusable. Suppose a Cassandra plugin, say, maintains a connection
  * to a server used across multiple queries and threads. Any change to
  * the config immediately calls {@code close()} on the plugin, even though
@@ -763,6 +764,10 @@
       case DISABLED:
         include = !plugin.getValue().isEnabled();
         break;
+      case TRANSLATES_USERS:
+        include = plugin.getValue().getAuthMode() == AuthMode.USER_TRANSLATION
+          && plugin.getValue().isEnabled();
+        break;
       default:
         include = true;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4c11ab0..6fbba41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -34,7 +34,7 @@
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -43,7 +43,7 @@
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 
 @JsonTypeName(FileSystemConfig.NAME)
-public class FileSystemConfig extends AbstractSecuredStoragePluginConfig {
+public class FileSystemConfig extends CredentialedStoragePluginConfig {
   private static final List<String> FS_CREDENTIAL_KEYS =
       Arrays.asList(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
@@ -182,4 +182,9 @@
     }
     return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
+
+  @Override
+  public FileSystemConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 210d49e..2833637 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -73,6 +73,9 @@
    * @param visitedPaths set used to ensure same path won't be visited twice
    */
   private void scanSchemaImpl(String schemaPath, SchemaPlus schema, Set<String> visitedPaths) {
+    if (schema == null) {
+      return;
+    }
     Set<String> subSchemaNames = schema.getParentSchema() == null
       ? schema.unwrap(DynamicRootSchema.class).schema.getSubSchemaNames()
       : schema.getSubSchemaNames();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
index 297c193..33a4757 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
@@ -35,13 +35,13 @@
     if (credentialsProvider != null) {
       return credentialsProvider;
     }
+    if (username == null) {
+      return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+    }
+
     ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
-    if (username != null) {
-      mapBuilder.put(UsernamePasswordCredentials.USERNAME, username);
-    }
-    if (password != null) {
-      mapBuilder.put(UsernamePasswordCredentials.PASSWORD, password);
-    }
+    mapBuilder.put(UsernamePasswordCredentials.USERNAME, username);
+    mapBuilder.put(UsernamePasswordCredentials.PASSWORD, password);
     return new PlainCredentialsProvider(mapBuilder.build());
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
index c7f3d02..3799305 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
@@ -17,27 +17,63 @@
  */
 package org.apache.drill.exec.store.security;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 
 public class UsernamePasswordCredentials {
+  private static final Logger logger = LoggerFactory.getLogger(UsernamePasswordCredentials.class);
   public static final String USERNAME = "username";
   public static final String PASSWORD = "password";
 
   private final String username;
   private final String password;
 
-  public UsernamePasswordCredentials(CredentialsProvider credentialsProvider) {
-    if (credentialsProvider == null) {
-      this.username = null;
-      this.password = null;
-    } else {
-      Map<String, String> credentials = credentialsProvider.getCredentials() == null ? new HashMap<>() : credentialsProvider.getCredentials();
-      this.username = credentials.get(USERNAME);
-      this.password = credentials.get(PASSWORD);
+  /**
+   * While a builder may seem like overkill for a class that is little more than small struct,
+   * it allows us to wrap new instances in an Optional while using contructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
     }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
+    }
+
+    public Optional<UsernamePasswordCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+
+      if (credentials.size() == 0) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new UsernamePasswordCredentials(credentials.get(USERNAME), credentials.get(PASSWORD))
+      );
+    }
+  }
+
+  public UsernamePasswordCredentials(String username, String password) {
+    this.username = username;
+    this.password = password;
   }
 
   public String getUsername() {
@@ -47,4 +83,30 @@
   public String getPassword() {
     return password;
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(username, password);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UsernamePasswordCredentials that = (UsernamePasswordCredentials) o;
+    return Objects.equals(username, that.password) &&
+      Objects.equals(password, that.password);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("username", username)
+      .maskedField("password", password)
+      .toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
index 7ae56cc..d6285a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
@@ -21,23 +21,64 @@
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class UsernamePasswordWithProxyCredentials extends UsernamePasswordCredentials {
   private final String proxyUsername;
   private final String proxyPassword;
 
-  public UsernamePasswordWithProxyCredentials(CredentialsProvider credentialsProvider) {
-    super(credentialsProvider);
-    if (credentialsProvider == null || credentialsProvider.getCredentials() == null) {
-      this.proxyUsername = null;
-      this.proxyPassword = null;
-    } else {
-      Map<String, String> credentials = credentialsProvider.getCredentials() == null ? new HashMap<>() : credentialsProvider.getCredentials();
-      this.proxyUsername = credentials.get(OAuthTokenCredentials.PROXY_USERNAME);
-      this.proxyPassword = credentials.get(OAuthTokenCredentials.PROXY_PASSWORD);
+  /**
+   * While a builder may seem like overkill for a class that is little more than small struct,
+   * it allows us to wrap new instances in an Optional while using contructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
     }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
+    }
+
+    public Optional<UsernamePasswordWithProxyCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+
+      if (credentials.size() == 0) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new UsernamePasswordWithProxyCredentials(
+          credentials.get(USERNAME),
+          credentials.get(PASSWORD),
+          credentials.get(OAuthTokenCredentials.PROXY_USERNAME),
+          credentials.get(OAuthTokenCredentials.PROXY_PASSWORD)
+        )
+      );
+    }
+  }
+
+  public UsernamePasswordWithProxyCredentials(
+    String username,
+    String password,
+    String proxyUsername,
+    String proxyPassword
+  ) {
+    super(username, password);
+    this.proxyUsername = proxyUsername;
+    this.proxyPassword = proxyPassword;
   }
 
   public String getProxyUsername() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
index 516d872..66b2279 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
@@ -22,8 +22,8 @@
 import org.apache.drill.exec.oauth.PersistentTokenTable;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class OAuthTokenCredentials extends UsernamePasswordCredentials {
 
@@ -38,27 +38,71 @@
   private final String clientID;
   private final String clientSecret;
   private final String tokenURI;
-  private PersistentTokenTable tokenTable;
+  private Optional<PersistentTokenTable> tokenTable;
 
-  public OAuthTokenCredentials(CredentialsProvider credentialsProvider) {
-   super(credentialsProvider);
-   if (credentialsProvider == null) {
-     this.clientID = null;
-     this.clientSecret = null;
-     this.tokenURI = null;
-   } else {
-     Map<String, String> credentials = credentialsProvider.getCredentials() == null
-       ? new HashMap<>() : credentialsProvider.getCredentials();
+  /**
+   * While a builder may seem like overkill for a class that is little more than small struct,
+   * it allows us to wrap new instances in an Optional while using contructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+    private PersistentTokenTable tokenTable;
 
-     this.clientID = credentials.getOrDefault(CLIENT_ID, null);
-     this.clientSecret = credentials.getOrDefault(CLIENT_SECRET, null);
-     this.tokenURI = credentials.getOrDefault(TOKEN_URI, null);
-   }
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
+    }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
+    }
+
+    public Builder setTokenTable(PersistentTokenTable tokenTable) {
+      this.tokenTable = tokenTable;
+      return this;
+    }
+
+    public Optional<OAuthTokenCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+
+      if (credentials.size() == 0) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new OAuthTokenCredentials(
+          credentials.get(USERNAME),
+          credentials.get(PASSWORD),
+          credentials.get(CLIENT_ID),
+          credentials.get(CLIENT_SECRET),
+          credentials.get(TOKEN_URI),
+          tokenTable
+        )
+      );
+    }
   }
 
-  public OAuthTokenCredentials(CredentialsProvider credentialsProvider, PersistentTokenTable tokenTable) {
-    this(credentialsProvider);
-    this.tokenTable = tokenTable;
+  public OAuthTokenCredentials(
+    String username,
+    String password,
+    String clientID,
+    String clientSecret,
+    String tokenURI,
+    PersistentTokenTable tokenTable
+  ) {
+    super(username, password);
+    this.clientID = clientID;
+    this.clientSecret = clientSecret;
+    this.tokenURI = tokenURI;
+    this.tokenTable = Optional.ofNullable(tokenTable);
   }
 
   public String getClientID() {
@@ -70,17 +114,11 @@
   }
 
   public String getAccessToken() {
-    if (tokenTable == null) {
-      return null;
-    }
-    return tokenTable.getAccessToken();
+    return tokenTable.map(PersistentTokenTable::getAccessToken).orElse(null);
   }
 
   public String getRefreshToken() {
-    if (tokenTable == null) {
-      return null;
-    }
-    return tokenTable.getRefreshToken();
+    return tokenTable.map(PersistentTokenTable::getRefreshToken).orElse(null);
   }
 
   public String getTokenUri() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c12c713..f9343d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -730,7 +730,7 @@
             new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
             new Date(System.currentTimeMillis()),
             queryStateProcessor.getState(),
-            queryContext.getSession().getCredentials().getUserName(),
+            queryContext.getQueryUserCredentials().getUserName(),
             initiatingClient.getRemoteAddress());
         queryLogger.info(MAPPER.writeValueAsString(q));
       } catch (Exception e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index 08d1bab..2d2dd90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -40,6 +40,7 @@
 import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
 import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
@@ -602,6 +603,10 @@
       public String getQueryUserName() {
         return session.getCredentials().getUserName();
       }
+
+      @Override public UserCredentials getQueryUserCredentials() {
+        return session.getCredentials();
+      }
     };
   }
 
diff --git a/exec/java-exec/src/main/resources/rest/credentials/list.ftl b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
new file mode 100644
index 0000000..8872594
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
@@ -0,0 +1,117 @@
+<#--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+  <script src="/static/js/jquery.form.js"></script>
+  <!-- Ace Libraries for Syntax Formatting -->
+  <script src="/static/js/ace-code-editor/ace.js" type="text/javascript" charset="utf-8"></script>
+  <script src="/static/js/ace-code-editor/theme-eclipse.js" type="text/javascript" charset="utf-8"></script>
+  <script src="/static/js/credentialsServerMessage.js"></script>
+</#macro>
+
+<#macro page_body>
+
+<#include "*/confirmationModals.ftl">
+
+<h4 class="col-xs-6 mx-3">User Credential Management</h4>
+
+<div class="pb-2 mt-4 mb-2 border-bottom" style="margin: 5px;"></div>
+
+<div class="container-fluid">
+  <div class="row">
+    <div class="table-responsive col-sm-12 col-md-6 col-lg-5 col-xl-5">
+      <h4>Enabled Storage Plugins</h4>
+      <table class="table table-hover">
+        <tbody>
+        <#list model as pluginModel>
+          <tr>
+            <td style="border:none; max-width: 200px; overflow: hidden; text-overflow: ellipsis;">
+                ${pluginModel.getPlugin().getName()}
+            </td>
+            <td style="border:none;">
+              <button type="button" class="btn btn-primary" data-toggle="modal" data-target="#new-plugin-modal" data-plugin="${pluginModel.getPlugin().getName()}"
+                      data-username="${pluginModel.getPlugin().getUserName()}" data-password="${pluginModel.getPlugin().getPassword()}">
+                Update Credentials
+              </button>
+            </td>
+          </tr>
+        </#list>
+        </tbody>
+      </table>
+    </div>
+
+      <#--onclick="doUpdate('${pluginModel.getPlugin().getName()}')"-->
+      <#-- Modal window for creating plugin -->
+    <div class="modal fade" id="new-plugin-modal" role="dialog" aria-labelledby="configuration">
+      <div class="modal-dialog" role="document">
+        <div class="modal-content">
+          <div class="modal-header">
+            <h4 class="modal-title" id="configuration">Update Credentials</h4>
+            <button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
+          </div>
+          <div class="modal-body">
+
+            <form id="createForm" role="form" action="/credentials/update_credentials" method="POST">
+              <input type="text" class="form-control" name="username" id="usernameField" placeholder="Username" />
+              <input type="text" class="form-control" name="password" id="passwordField" placeholder="Password" />
+              <input type="hidden" name="plugin" id="plugin" />
+              <div style="text-align: right; margin: 10px">
+                <button type="button" class="btn btn-secondary" data-dismiss="modal">Close</button>
+                <button type="submit" class="btn btn-primary" onclick="doCreate()">Update Credentials</button>
+              </div>
+              <input type="hidden" name="csrfToken" value="${model[0].getCsrfToken()}">
+            </form>
+
+            <div id="message" class="d-none alert alert-info">
+            </div>
+          </div>
+        </div>
+      </div>
+    </div>
+      <#-- Modal window for creating plugin -->
+
+    <script>
+      // Populate the modal fields
+      $(function () {
+        $('#new-plugin-modal').on('show.bs.modal', function (event) {
+          var button = $(event.relatedTarget);
+          var username = button.data('username');
+          var password = button.data('password');
+          var plugin = button.data('plugin');
+
+          $('#plugin').val(plugin);
+          $('#usernameField').val(username);
+          $('#passwordField').val(password);
+        });
+      });
+
+      function doCreate() {
+        $("#createForm").ajaxForm({
+          dataType: 'json',
+          success: serverMessage,
+          error: serverMessage
+        });
+      }
+    </script>
+<#include "*/alertModals.ftl">
+</#macro>
+
+<@page_html/>
diff --git a/exec/java-exec/src/main/resources/rest/generic.ftl b/exec/java-exec/src/main/resources/rest/generic.ftl
index d91a7ed..b6c2bf0 100644
--- a/exec/java-exec/src/main/resources/rest/generic.ftl
+++ b/exec/java-exec/src/main/resources/rest/generic.ftl
@@ -67,6 +67,9 @@
             <ul class="nav navbar-nav mr-auto">
               <li class="nav-item"><a class="nav-link" href="/query">Query</a></li>
               <li class="nav-item"><a class="nav-link" href="/profiles">Profiles</a></li>
+              <#if showCredentials == true>
+              <li class="nav-item"><a class="nav-link" href="/credentials">Credentials</a></li>
+              </#if>
               <#if showStorage == true>
               <li class="nav-item"><a class="nav-link" href="/storage">Storage</a></li>
               </#if>
diff --git a/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js b/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js
new file mode 100644
index 0000000..d013f2c
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ *  license agreements. See the NOTICE file distributed with this work for additional
+ *  information regarding copyright ownership. The ASF licenses this file to
+ *  You under the Apache License, Version 2.0 (the "License"); you may not use
+ *  this file except in compliance with the License. You may obtain a copy of
+ *  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ *  by applicable law or agreed to in writing, software distributed under the
+ *  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ *  OF ANY KIND, either express or implied. See the License for the specific
+ *  language governing permissions and limitations under the License.
+ */
+
+// Shows Json message from the server
+function serverMessage(data) {
+    const messageEl = $("#message");
+    if (data.result === "Success") {
+        messageEl.removeClass("d-none")
+            .removeClass("alert-danger")
+            .addClass("alert-info")
+            .text(data.result).alert();
+        setTimeout(function() { window.location.href = "/credentials"; }, 800);
+        return true;
+    } else {
+        const errorMessage = data.errorMessage || data.responseJSON["result"];
+
+        messageEl.addClass("d-none");
+        // Wait a fraction of a second before showing the message again. This
+        // makes it clear if a second attempt gives the same error as
+        // the first that a "new" message came back from the server
+        setTimeout(function() {
+            messageEl.removeClass("d-none")
+                .removeClass("alert-info")
+                .addClass("alert-danger")
+                .text("Please retry: " + errorMessage).alert();
+        }, 200);
+        return false;
+    }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
index acfa606..a64e96f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
@@ -27,7 +27,7 @@
 import java.util.Set;
 
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -63,7 +63,7 @@
 
       // Abstract classes do not appear
       assertFalse(result.contains(StoragePluginConfig.class));
-      assertFalse(result.contains(AbstractSecuredStoragePluginConfig.class));
+      assertFalse(result.contains(CredentialedStoragePluginConfig.class));
 
       // The private plugin class does not appear
       assertFalse(result.contains(StoragePluginFixtureConfig.class));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
index 75adfbb..b254987 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
@@ -96,12 +96,11 @@
     String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(credentialsProvider);
 
     String expected =
-        "{\n" +
+      "{\n" +
         "  \"credentialsProviderType\" : \"PlainCredentialsProvider\",\n" +
-        "  \"credentials\" : {\n" +
-        "    \"username\" : \"myLogin\",\n" +
-        "    \"password\" : \"myPass\"\n" +
-        "  }\n" +
+        "  \"credentials\" : {\n" + "    \"username\" : \"myLogin\",\n" +
+        "    \"password\" : \"myPass\"\n" + "  },\n" +
+        "  \"userCredentials\" : { }\n" +
         "}";
 
     assertEquals(expected, serialized);
@@ -124,7 +123,8 @@
         "  \"credentials\" : {\n" +
         "    \"username\" : \"myLogin\",\n" +
         "    \"password\" : \"myPass\"\n" +
-        "  }\n" +
+        "  }, \n" +
+        "  \"userCredentials\" : { }\n" +
         "}";
 
     CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
diff --git a/logical/pom.xml b/logical/pom.xml
index 8713a4b..e3cbfdf 100644
--- a/logical/pom.xml
+++ b/logical/pom.xml
@@ -85,6 +85,24 @@
         <artifactId>joda-time</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 
diff --git a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
deleted file mode 100644
index ce8d8dc..0000000
--- a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.common.logical;
-
-import org.apache.drill.common.logical.security.CredentialsProvider;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
-
-public abstract class AbstractSecuredStoragePluginConfig extends StoragePluginConfig {
-
-  protected final CredentialsProvider credentialsProvider;
-  protected boolean directCredentials;
-
-  public AbstractSecuredStoragePluginConfig() {
-    this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,  true);
-  }
-
-  public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) {
-    this.credentialsProvider = credentialsProvider;
-    this.directCredentials = directCredentials;
-  }
-
-  public CredentialsProvider getCredentialsProvider() {
-    if (directCredentials) {
-      return null;
-    }
-    return credentialsProvider;
-  }
-}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
new file mode 100644
index 0000000..9549f7b
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical;
+
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CredentialedStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = LoggerFactory.getLogger(CredentialedStoragePluginConfig.class);
+  protected boolean directCredentials;
+  protected final CredentialsProvider credentialsProvider;
+
+  public CredentialedStoragePluginConfig() {
+    this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,  true);
+  }
+
+  public CredentialedStoragePluginConfig(
+    CredentialsProvider credentialsProvider,
+    boolean directCredentials
+  ) {
+    // Default auth mode for credentialed storage plugins is shared user.
+    this(credentialsProvider, directCredentials, AuthMode.SHARED_USER);
+  }
+
+  public CredentialedStoragePluginConfig(
+    CredentialsProvider credentialsProvider,
+    boolean directCredentials,
+    AuthMode authMode
+  ) {
+    this.credentialsProvider = credentialsProvider;
+    this.directCredentials = directCredentials;
+    this.authMode = authMode;
+  }
+
+  public abstract CredentialedStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider);
+
+  @Override
+  public boolean isEnabled() {
+    logger.debug("Enabled status");
+    return super.isEnabled();
+  }
+
+  public CredentialsProvider getCredentialsProvider() {
+    if (directCredentials) {
+      return null;
+    }
+    return credentialsProvider;
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 390e2a8..32154fa 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -22,13 +22,17 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public abstract class StoragePluginConfig {
 
   // DO NOT include enabled status in equality and hash
   // comparisons; doing so will break the plugin registry.
-  private Boolean enabled;
+  protected Boolean enabled;
+
+  protected AuthMode authMode;
 
   /**
    * Check for enabled status of the plugin
@@ -43,6 +47,14 @@
     this.enabled = enabled;
   }
 
+  public AuthMode getAuthMode() {
+    return authMode;
+  }
+
+  public void setAuthMode(AuthMode authMode) {
+    this.authMode = authMode;
+  }
+
   /**
    * Allows to check whether the enabled status is present in config
    *
@@ -62,4 +74,39 @@
   public String getValue(String key) {
     return null;
   }
+
+  /**
+   * The standardised authentication modes that storage plugins may offer.
+   */
+  public enum AuthMode {
+    /**
+     * Default. Connects using the identity of the Drill cluster (OS user or
+     * service principal) if the external storage is aware of said identity,
+     * otherwise connects without authentication. Unaffected by the Drill
+     * query user's identity.
+     */
+    DRILL_PROCESS,
+    /**
+     * Connects using a single set of shared credentials stored in some
+     * credential provider.  Unaffected by the Drill query user's identity.
+     */
+    SHARED_USER,
+    /**
+     * Depending on the plugin, connects using one of the two modes above then
+     * instructs the external storage to set the identity on the connection
+     * to that of the Drill query user.  User identity in the external system
+     * will match the Drill query user's identity.
+     */
+    USER_IMPERSONATION,
+    /**
+     * Connects with stored credentials looked up for (translated from)
+     * the Drill query user.  User identity in the external system will be
+     * a function of the Drill query user's identity (1-1 or *-1) .
+     */
+    USER_TRANSLATION;
+
+    public static AuthMode parseOrDefault(String authMode) {
+      return !Strings.isNullOrEmpty(authMode) ? AuthMode.valueOf(authMode.toUpperCase()) : DRILL_PROCESS;
+    }
+  }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
index 5c82edf..ba38ad0 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,15 +29,33 @@
  * Provider of authentication credentials.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
-    property = "credentialsProviderType",
-    defaultImpl = PlainCredentialsProvider.class)
+  property = "credentialsProviderType",
+  defaultImpl = PlainCredentialsProvider.class)
 public interface CredentialsProvider {
+  Logger logger = LoggerFactory.getLogger(CredentialsProvider.class);
   /**
    * Returns map with authentication credentials. Key is the credential name, for example {@code "username"}
    * and map value is corresponding credential value.
    */
-  Logger logger = LoggerFactory.getLogger(CredentialsProvider.class);
-
   @JsonIgnore
   Map<String, String> getCredentials();
+
+  /**
+   * This method returns the credentials associated with a specific user.
+   * @param username The logged in username
+   * @return A Map of the logged in user's credentials.
+   */
+  @JsonIgnore
+  default Map<String, String> getCredentials(String username) {
+    throw UserException.unsupportedError()
+      .message("%s does not support per-user credentials.", getClass())
+      .build(logger);
+  }
+
+  @JsonIgnore
+  default void setUserCredentials(String username, String password, String queryUser) {
+    throw UserException.unsupportedError()
+      .message("%s does not support per-user credentials.", getClass())
+      .build(logger);
+  }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
index 8427b63..5624b39 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
@@ -19,13 +19,19 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.drill.common.PlanStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Implementation of {@link CredentialsProvider} that holds credentials provided by user.
@@ -35,21 +41,70 @@
 public class PlainCredentialsProvider implements CredentialsProvider {
   private static final Logger logger = LoggerFactory.getLogger(PlainCredentialsProvider.class);
   public static final CredentialsProvider EMPTY_CREDENTIALS_PROVIDER =
-      new PlainCredentialsProvider(Collections.emptyMap());
+    new PlainCredentialsProvider(Collections.emptyMap());
 
   private final Map<String, String> credentials;
+  private final Map<String, Map<String, String>> userCredentials;
+
+  public PlainCredentialsProvider(Map<String, String> credentials) {
+   this(credentials, new HashMap<>());
+  }
 
   @JsonCreator
-  public PlainCredentialsProvider(@JsonProperty("credentials") Map<String, String> credentials) {
-    this.credentials = credentials;
+  public PlainCredentialsProvider(
+    @JsonProperty("credentials") Map<String, String> credentials,
+    @JsonProperty("userCredentials") Map<String, Map<String, String>> userCredentials
+  ) {
+    this.credentials = Optional.ofNullable(credentials).orElse(new HashMap<>());
+    this.userCredentials = Optional.ofNullable(userCredentials).orElse(new HashMap<>());
+  }
+
+  @JsonIgnore
+  public PlainCredentialsProvider(String username, Map<String, String> credentials) {
+    this.credentials = new HashMap<>();
+    this.userCredentials = new HashMap<>();
+    userCredentials.put(username,credentials);
   }
 
   @Override
   @JsonIgnore(false)
-  public Map<String, String> getCredentials() {
+  @JsonProperty("credentials") public Map<String, String> getCredentials() {
     return credentials;
   }
 
+  @JsonProperty("userCredentials")
+  @JsonInclude(Include.NON_NULL)
+  public Map<String, Map<String, String>> getUserCredentials() {
+    return userCredentials;
+  }
+
+  /**
+   * Returns the credentials for a given query user.  If that user does not have credentials,
+   * the function will add an entry for that user with keys username, password which are both null.
+   * @param queryUser A String of the currently logged in user
+   * @return A Map of the active user's credentials
+   */
+  @Override
+  public Map<String, String> getCredentials(String queryUser) {
+    assert queryUser != null;
+    logger.debug("Getting credentials for query user {}", queryUser);
+
+    return userCredentials.getOrDefault(queryUser, new HashMap<>());
+  }
+
+  @Override
+  public void setUserCredentials(String username, String password, String queryUser) {
+    assert queryUser != null;
+    logger.debug("Setting credentials for query user {}", queryUser);
+
+    Map<String, String> creds = userCredentials.computeIfAbsent(
+      queryUser,
+      c -> new HashMap<String, String>()
+    );
+    creds.put("username", username);
+    creds.put("password", password);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -59,11 +114,20 @@
       return false;
     }
     PlainCredentialsProvider that = (PlainCredentialsProvider) o;
-    return Objects.equals(credentials, that.credentials);
+    return Objects.equals(credentials, that.credentials) &&
+      Objects.equals(userCredentials, that.userCredentials);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(credentials);
+    return Objects.hash(credentials, userCredentials);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("credentials", credentials)
+      .field("userCredentials", userCredentials)
+      .toString();
   }
 }