DRILL-8276: Add Support for User Translation for Splunk Plugin (#2618)
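
When authMode is set to USER_TRANSLATION, the Splunk connection is opened with
credentials resolved for the query user from the plugin's credentials provider
instead of the shared username/password. The query user name is now carried
through SplunkScanSpec, SplunkGroupScan and SplunkSubScan so the batch reader
can connect as that user, and schemas are only registered for users that have
stored credentials.

A minimal sketch of the per-user credential flow, mirroring the pattern used in
SplunkTestSuite below (host, port and user names here are hypothetical):

    import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
    import org.apache.drill.common.logical.security.PlainCredentialsProvider;
    import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
    import java.util.HashMap;
    import java.util.Optional;

    // Store Splunk credentials keyed by the Drill query user.
    PlainCredentialsProvider credentialsProvider =
        new PlainCredentialsProvider(new HashMap<>());
    credentialsProvider.setUserCredentials("splunkUser", "splunkPass", "drillUser1");

    // Build a plugin config with user translation enabled.
    SplunkPluginConfig config = new SplunkPluginConfig(null, null, "localhost", 8089,
        "1", "now", credentialsProvider, 4, AuthMode.USER_TRANSLATION.name());

    // At connect time, SplunkConnection resolves the credentials of the query user.
    Optional<UsernamePasswordCredentials> creds =
        config.getUsernamePasswordCredentials("drillUser1");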

diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
index 9932801..cba9e3d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
@@ -81,7 +81,7 @@
     this.subScan = subScan;
     this.projectedColumns = subScan.getColumns();
     this.subScanSpec = subScan.getScanSpec();
-    SplunkConnection connection = new SplunkConnection(config);
+    SplunkConnection connection = new SplunkConnection(config, subScan.getUserName());
     this.splunkService = connection.connect();
 
     this.csvSettings = new CsvParserSettings();
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index c92738e..cac75b8 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -26,6 +26,7 @@
 import com.splunk.Service;
 import com.splunk.ServiceArgs;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,12 +44,18 @@
   private final Optional<UsernamePasswordCredentials> credentials;
   private final String hostname;
   private final int port;
+  private final String queryUserName;
   private Service service;
   private int connectionAttempts;
 
-  public SplunkConnection(SplunkPluginConfig config) {
-    this.credentials = config.getUsernamePasswordCredentials();
+  public SplunkConnection(SplunkPluginConfig config, String queryUserName) {
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+    } else {
+      this.credentials = config.getUsernamePasswordCredentials();
+    }
     this.hostname = config.getHostname();
+    this.queryUserName = queryUserName;
     this.port = config.getPort();
     this.connectionAttempts = config.getReconnectRetries();
     service = connect();
@@ -58,10 +65,15 @@
   /**
    * This constructor is used for testing only
    */
-  public SplunkConnection(SplunkPluginConfig config, Service service) {
-    this.credentials = config.getUsernamePasswordCredentials();
+  public SplunkConnection(SplunkPluginConfig config, Service service, String queryUserName) {
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+    } else {
+      this.credentials = config.getUsernamePasswordCredentials();
+    }
     this.hostname = config.getHostname();
     this.port = config.getPort();
+    this.queryUserName = queryUserName;
     this.service = service;
   }
 
@@ -80,11 +92,11 @@
       connectionAttempts--;
       service = Service.connect(loginArgs);
     } catch (Exception e) {
-      if(connectionAttempts > 0) {
+      if (connectionAttempts > 0) {
         try {
           TimeUnit.SECONDS.sleep(2);
         } catch (InterruptedException interruptedException) {
-          logger.error("Unable to wait 2 secs before next connection trey to Splunk");
+          logger.error("Unable to wait 2 secs before next connection try to Splunk");
         }
         return connect();
       }
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
index 69ed312..47ad693 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -30,10 +31,13 @@
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.base.filter.ExprNode;
 import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -50,17 +54,20 @@
 
   private int hashCode;
 
+  private MetadataProviderManager metadataProviderManager;
+
   /**
    * Creates a new group scan from the storage plugin.
    */
-  public SplunkGroupScan (SplunkScanSpec scanSpec) {
-    super("no-user");
+  public SplunkGroupScan (SplunkScanSpec scanSpec, MetadataProviderManager metadataProviderManager) {
+    super(scanSpec.queryUserName());
     this.splunkScanSpec = scanSpec;
     this.config = scanSpec.getConfig();
     this.columns = ALL_COLUMNS;
     this.filters = null;
     this.filterSelectivity = 0.0;
     this.maxRecords = -1;
+    this.metadataProviderManager = metadataProviderManager;
     this.scanStats = computeScanStats();
 
   }
@@ -76,10 +83,12 @@
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.maxRecords = that.maxRecords;
+    this.metadataProviderManager = that.metadataProviderManager;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
     this.scanStats = that.scanStats;
+    this.hashCode = that.hashCode;
   }
 
   /**
@@ -97,8 +106,8 @@
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.maxRecords = that.maxRecords;
+    this.metadataProviderManager = that.metadataProviderManager;
     this.scanStats = computeScanStats();
-
   }
 
   /**
@@ -115,6 +124,7 @@
     this.filters = filters;
     this.filterSelectivity = filterSelectivity;
     this.maxRecords = that.maxRecords;
+    this.metadataProviderManager = that.metadataProviderManager;
     this.scanStats = computeScanStats();
   }
 
@@ -208,6 +218,25 @@
     return new SplunkGroupScan(this, maxRecords);
   }
 
+  public TupleMetadata getSchema() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    try {
+      return metadataProviderManager.getSchemaProvider().read().getSchema();
+    } catch (IOException | NullPointerException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public TableMetadataProvider getMetadataProvider() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    return metadataProviderManager.getTableMetadataProvider();
+  }
+
   @Override
   public String getDigest() {
     return toString();
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 41d134a..c6e574f 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -28,6 +28,8 @@
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -35,13 +37,14 @@
 @JsonTypeName(SplunkPluginConfig.NAME)
 public class SplunkPluginConfig extends StoragePluginConfig {
 
+  private static final Logger logger = LoggerFactory.getLogger(SplunkPluginConfig.class);
+
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
 
   private final String hostname;
   private final String earliestTime;
   private final String latestTime;
-
   private final int port;
   private final Integer reconnectRetries;
 
@@ -56,7 +59,7 @@
                             @JsonProperty("reconnectRetries") Integer reconnectRetries,
                             @JsonProperty("authMode") String authMode) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
-        credentialsProvider == null);
+        credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
     this.hostname = hostname;
     this.port = port;
     this.earliestTime = earliestTime;
@@ -73,6 +76,10 @@
     this.reconnectRetries = that.reconnectRetries;
   }
 
+  /**
+   * Gets the credentials. This method is used when user translation is not enabled.
+   * @return An {@link Optional} containing {@link UsernamePasswordCredentials} from the config.
+   */
   @JsonIgnore
   public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
     return new UsernamePasswordCredentials.Builder()
@@ -80,12 +87,24 @@
       .build();
   }
 
+  /**
+   * Gets the credentials. This method is used when user translation is enabled.
+   * @return An {@link Optional} containing the {@link UsernamePasswordCredentials} for the given query user.
+   */
+  @JsonIgnore
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(username)
+      .build();
+  }
+
   @JsonProperty("username")
   public String getUsername() {
     if (!directCredentials) {
       return null;
     }
-    return getUsernamePasswordCredentials()
+    return getUsernamePasswordCredentials(null)
       .map(UsernamePasswordCredentials::getUsername)
       .orElse(null);
   }
@@ -95,7 +114,7 @@
     if (!directCredentials) {
       return null;
     }
-    return getUsernamePasswordCredentials()
+    return getUsernamePasswordCredentials(null)
       .map(UsernamePasswordCredentials::getPassword)
       .orElse(null);
   }
@@ -141,12 +160,13 @@
       Objects.equals(hostname, thatConfig.hostname) &&
       Objects.equals(port, thatConfig.port) &&
       Objects.equals(earliestTime, thatConfig.earliestTime) &&
-      Objects.equals(latestTime, thatConfig.latestTime);
+      Objects.equals(latestTime, thatConfig.latestTime) &&
+      Objects.equals(authMode, thatConfig.authMode);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime);
+    return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime, authMode);
   }
 
   @Override
@@ -157,6 +177,7 @@
       .field("port", port)
       .field("earliestTime", earliestTime)
       .field("latestTime", latestTime)
+      .field("Authentication Mode", authMode)
       .toString();
   }
 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
index 2d736bb..bd4a774 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
@@ -29,14 +29,17 @@
   private final String pluginName;
   private final String indexName;
   private final SplunkPluginConfig config;
+  private final String queryUserName;
 
   @JsonCreator
   public SplunkScanSpec(@JsonProperty("pluginName") String pluginName,
                         @JsonProperty("indexName") String indexName,
-                        @JsonProperty("config") SplunkPluginConfig config) {
+                        @JsonProperty("config") SplunkPluginConfig config,
+                        @JsonProperty("queryUserName") String queryUserName) {
     this.pluginName = pluginName;
     this.indexName = indexName;
     this.config = config;
+    this.queryUserName = queryUserName;
   }
 
   @JsonProperty("pluginName")
@@ -48,12 +51,18 @@
   @JsonProperty("config")
   public SplunkPluginConfig getConfig() { return config; }
 
+  @JsonProperty("queryUserName")
+  public String queryUserName() {
+    return queryUserName;
+  }
+
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("config", config)
       .field("schemaName", pluginName)
       .field("indexName", indexName)
+      .field("queryUserName", queryUserName)
       .toString();
   }
 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
index bdd71f9..8270c45 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
@@ -20,6 +20,7 @@
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
@@ -38,6 +39,7 @@
   private static final Logger logger = LoggerFactory.getLogger(SplunkSchemaFactory.class);
   private static final String SPL_TABLE_NAME = "spl";
   private final SplunkStoragePlugin plugin;
+  private String queryUserName;
 
   public SplunkSchemaFactory(SplunkStoragePlugin plugin) {
     super(plugin.getName());
@@ -46,18 +48,21 @@
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
-    SplunkSchema schema = new SplunkSchema(plugin);
+    this.queryUserName = schemaConfig.getUserName();
+    SplunkSchema schema = new SplunkSchema(plugin, queryUserName);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
   }
 
-  class SplunkSchema extends AbstractSchema {
+  static class SplunkSchema extends AbstractSchema {
 
     private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
     private final SplunkStoragePlugin plugin;
+    private final String queryUserName;
 
-    public SplunkSchema(SplunkStoragePlugin plugin) {
+    public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
       super(Collections.emptyList(), plugin.getName());
       this.plugin = plugin;
+      this.queryUserName = queryUserName;
       registerIndexes();
     }
 
@@ -70,7 +75,7 @@
       } else {
         // Register the table
         return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
-          new SplunkScanSpec(plugin.getName(), name, plugin.getConfig())));
+          new SplunkScanSpec(plugin.getName(), name, plugin.getConfig(), queryUserName)));
       }
     }
 
@@ -95,19 +100,29 @@
     }
 
     private void registerIndexes() {
+      // Verify that the connection is successful.  If not, don't register any indexes,
+      // and throw an exception.
+      SplunkPluginConfig config = plugin.getConfig();
+      SplunkConnection connection;
+      try {
+        connection = new SplunkConnection(config, queryUserName);
+        connection.connect();
+      } catch (Exception e) {
+        // Catch any connection errors that may happen.
+        throw UserException.connectionError()
+          .message("Unable to connect to Splunk: " +  plugin.getName() + " " + e.getMessage())
+          .build(logger);
+      }
+
       // Add default "spl" table to index list.
       registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
-        new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig())));
+        new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
 
       // Retrieve and add all other Splunk indexes
-      SplunkPluginConfig config = plugin.getConfig();
-      SplunkConnection connection = new SplunkConnection(config);
-      connection.connect();
-
       for (String indexName : connection.getIndexes().keySet()) {
         logger.debug("Registering {}", indexName);
         registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
-          new SplunkScanSpec(plugin.getName(), indexName, config)));
+          new SplunkScanSpec(plugin.getName(), indexName, config, queryUserName)));
       }
     }
   }
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
index dc011a6..f04e25d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
@@ -22,20 +22,30 @@
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public class SplunkStoragePlugin extends AbstractStoragePlugin {
 
+  private static final Logger logger = LoggerFactory.getLogger(SplunkStoragePlugin.class);
   private final SplunkPluginConfig config;
   private final SplunkSchemaFactory schemaFactory;
 
@@ -57,13 +67,51 @@
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    // Check to see if user translation is enabled.  If so, and creds are
+    // not present, then do not register any schemata.  This prevents
+    // info schema errors.
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      Optional<UsernamePasswordCredentials> userCreds = config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+      if (!userCreds.isPresent()) {
+        logger.debug(
+          "No schemas will be registered in {} for query user {}.",
+          getName(), schemaConfig.getUserName()
+        );
+        return;
+      }
+    }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, metadataProviderManager);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null, null);
+  }
 
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+                                           MetadataProviderManager metadataProviderManager) throws IOException {
     SplunkScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<SplunkScanSpec>() {});
-    return new SplunkGroupScan(scanSpec);
+    return new SplunkGroupScan(scanSpec, metadataProviderManager);
   }
 
   @Override
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
index 21b1237..e90bd2a 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
@@ -52,7 +52,7 @@
     @JsonProperty("columns") List<SchemaPath> columns,
     @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
     @JsonProperty("maxRecords") int maxRecords) {
-      super("user");
+      super(splunkScanSpec.queryUserName());
       this.config = config;
       this.splunkScanSpec = splunkScanSpec;
       this.columns = columns;
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 4e010b2..22cc0a5 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -36,7 +36,7 @@
 
   @Test
   public void testConnection() {
-    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
     sc.connect();
   }
 
@@ -54,7 +54,7 @@
               SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
               StoragePluginConfig.AuthMode.SHARED_USER.name()
       );
-      SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
+      SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
       sc.connect();
       fail();
     } catch (UserException e) {
@@ -64,7 +64,7 @@
 
   @Test
   public void testGetIndexes() {
-    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+    SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
     EntityCollection<Index> indexes = sc.getIndexes();
     assertEquals(9, indexes.size());
 
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 7d52008..2818018 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -311,7 +311,7 @@
           .thenThrow(new RuntimeException("Fail second connection to Splunk"))
           .thenThrow(new RuntimeException("Fail third connection to Splunk"))
           .thenReturn(new Service(loginArgs)); // fourth connection is successful
-      new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // it will fail, in case "reconnectRetries": 1 is specified in configs
+      new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null); // this would fail if "reconnectRetries": 1 were specified in the config
       splunk.verify(
           () -> Service.connect(loginArgs),
           times(4)
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 0abf92a..2c07ac4 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -20,8 +20,11 @@
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -34,8 +37,12 @@
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
@@ -44,7 +51,8 @@
   SplunkLimitPushDownTest.class,
   SplunkIndexesTest.class,
   SplunkPluginTest.class,
-  SplunkTestSplunkUtils.class
+  SplunkTestSplunkUtils.class,
+  TestSplunkUserTranslation.class
 })
 
 @Category({SlowTest.class})
@@ -52,6 +60,8 @@
   private static final Logger logger = LoggerFactory.getLogger(SplunkTestSuite.class);
 
   protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG = null;
+
+  protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = null;
   public static final String SPLUNK_LOGIN = "admin";
   public static final String SPLUNK_PASS = "password";
 
@@ -67,7 +77,12 @@
   public static void initSplunk() throws Exception {
     synchronized (SplunkTestSuite.class) {
       if (initCount.get() == 0) {
-        startCluster(ClusterFixture.builder(dirTestWatcher));
+        ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+          .configProperty(ExecConstants.HTTP_ENABLE, true)
+          .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+          .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+        startCluster(builder);
+
         splunk.start();
         String hostname = splunk.getHost();
         Integer port = splunk.getFirstMappedPort();
@@ -79,6 +94,17 @@
         runningSuite = true;
         logger.info("Take a time to ready more Splunk events (10 sec)...");
         Thread.sleep(10000);
+
+        PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+        // Add authorized user
+        credentialsProvider.setUserCredentials(SPLUNK_LOGIN, SPLUNK_PASS, TEST_USER_1);
+        // Add unauthorized user
+        credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
+
+        SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new SplunkPluginConfig(null, null, hostname, port, "1", "now",
+          credentialsProvider, 4, AuthMode.USER_TRANSLATION.name());
+        SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
+        pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
       }
       initCount.incrementAndGet();
       runningSuite = true;
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
new file mode 100644
index 0000000..7e24e0c
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Category({SlowTest.class})
+public class TestSplunkUserTranslation extends SplunkBaseTest {
+
+  @Test
+  public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+    // This test validates that the correct credentials are sent down to Splunk.
+    // This user should not see the ut_splunk schema because they do not have valid credentials.
+    ClientFixture client = cluster
+      .clientBuilder()
+      .property(DrillProperties.USER, ADMIN_USER)
+      .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+      .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk%'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(1, results.rowCount());
+  }
+
+  @Test
+  public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+    ClientFixture client = cluster
+      .clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(2, results.rowCount());
+  }
+
+  @Test
+  public void testSplunkQueryWithUserTranslation() throws Exception {
+    ClientFixture client = cluster
+      .clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    String sql = "SELECT acceleration_id, action, add_offset, add_timestamp FROM ut_splunk._audit LIMIT 2";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(2, results.rowCount());
+  }
+
+  @Test
+  public void testSplunkQueryWithUserTranslationAndInvalidCredentials() throws Exception {
+    ClientFixture client = cluster
+      .clientBuilder()
+      .property(DrillProperties.USER, ADMIN_USER)
+      .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+      .build();
+
+    String sql = "SELECT acceleration_id, action, add_offset, add_timestamp FROM ut_splunk._audit LIMIT 2";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains("Schema [[ut_splunk]] is not valid"));
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 16193cb..9786377 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,7 @@
     <commons.cli.version>1.4</commons.cli.version>
     <snakeyaml.version>1.26</snakeyaml.version>
     <commons.lang3.version>3.10</commons.lang3.version>
-    <testcontainers.version>1.16.3</testcontainers.version>
+    <testcontainers.version>1.17.3</testcontainers.version>
     <typesafe.config.version>1.4.2</typesafe.config.version>
     <commons.codec.version>1.14</commons.codec.version>
     <xerces.version>2.12.2</xerces.version>