DRILL-8357: Add new config options to the Splunk storage plugin (#2705)

diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml
index e71c893..140bcb3 100644
--- a/contrib/storage-splunk/pom.xml
+++ b/contrib/storage-splunk/pom.xml
@@ -42,7 +42,7 @@
     <dependency>
       <groupId>com.splunk</groupId>
       <artifactId>splunk</artifactId>
-      <version>1.7.1</version>
+      <version>1.9.1</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.maven.plugins</groupId>
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index cac75b8..f3a1e45 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.store.splunk;
 
-import com.splunk.ConfCollection;
 import com.splunk.EntityCollection;
 import com.splunk.HttpService;
 import com.splunk.Index;
@@ -42,9 +41,20 @@
   private static final Logger logger = LoggerFactory.getLogger(SplunkConnection.class);
 
   private final Optional<UsernamePasswordCredentials> credentials;
+  private final String scheme;
   private final String hostname;
-  private final int port;
-  private final String queryUserName;
+  private final Integer port;
+  // Whether the Splunk client will validate the server's SSL cert.
+  private final boolean validateCertificates;
+  // The application context of the service.
+  private final String app;
+  // The owner context of the service.
+  private final String owner;
+  // A Splunk authentication token to use for the session.
+  private final String token;
+  // A valid login cookie.
+  private final String cookie;
+
   private Service service;
   private int connectionAttempts;
 
@@ -54,12 +64,16 @@
     } else {
       this.credentials = config.getUsernamePasswordCredentials();
     }
+    this.scheme = config.getScheme();
     this.hostname = config.getHostname();
-    this.queryUserName = queryUserName;
     this.port = config.getPort();
+    this.app = config.getApp();
+    this.owner = config.getOwner();
+    this.token = config.getToken();
+    this.cookie = config.getCookie();
+    this.validateCertificates = Optional.ofNullable(config.getValidateCertificates()).orElse(false);
     this.connectionAttempts = config.getReconnectRetries();
     service = connect();
-    ConfCollection confs = service.getConfs();
   }
 
   /**
@@ -71,9 +85,14 @@
     } else {
       this.credentials = config.getUsernamePasswordCredentials();
     }
+    this.scheme = config.getScheme();
     this.hostname = config.getHostname();
     this.port = config.getPort();
-    this.queryUserName = queryUserName;
+    this.app = config.getApp();
+    this.owner = config.getOwner();
+    this.token = config.getToken();
+    this.cookie = config.getCookie();
+    this.validateCertificates = config.getValidateCertificates();
     this.service = service;
   }
 
@@ -83,12 +102,29 @@
    */
   public Service connect() {
     HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
+    HttpService.setValidateCertificates(validateCertificates);
+
     ServiceArgs loginArgs = new ServiceArgs();
-    loginArgs.setHost(hostname);
-    loginArgs.setPort(port);
+    if (scheme != null) {
+      // Fall back to the Splunk SDK default if our value is null by not setting
+      loginArgs.setScheme(scheme);
+    }
+    if (hostname != null) {
+      // Fall back to the Splunk SDK default if our value is null by not setting
+      loginArgs.setHost(hostname);
+    }
+    if (port != null) {
+      // Fall back to the Splunk SDK default if our value is null by not setting
+      loginArgs.setPort(port);
+    }
     loginArgs.setPassword(credentials.map(UsernamePasswordCredentials::getPassword).orElse(null));
     loginArgs.setUsername(credentials.map(UsernamePasswordCredentials::getUsername).orElse(null));
-    try {
+    loginArgs.setApp(app);
+    loginArgs.setOwner(owner);
+    loginArgs.setToken(token);
+    loginArgs.setCookie(cookie);
+
+     try {
       connectionAttempts--;
       service = Service.connect(loginArgs);
     } catch (Exception e) {
@@ -101,12 +137,11 @@
         return connect();
       }
       throw UserException
-        .connectionError()
+        .connectionError(e)
         .message("Unable to connect to Splunk at %s:%s", hostname, port)
-        .addContext(e.getMessage())
         .build(logger);
     }
-    logger.debug("Successfully connected to {} on port {}", hostname, port);
+    logger.info("Successfully connected to {} on port {}", hostname, port);
     return service;
   }
 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index c6e574f..579edaf 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -28,8 +28,6 @@
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -37,22 +35,32 @@
 @JsonTypeName(SplunkPluginConfig.NAME)
 public class SplunkPluginConfig extends StoragePluginConfig {
 
-  private static final Logger logger = LoggerFactory.getLogger(SplunkPluginConfig.class);
-
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
 
+  private final String scheme;
   private final String hostname;
   private final String earliestTime;
   private final String latestTime;
-  private final int port;
+  private final Integer port;
+  private final String app;
+  private final String owner;
+  private final String token;
+  private final String cookie;
+  private final Boolean validateCertificates;
   private final Integer reconnectRetries;
 
   @JsonCreator
   public SplunkPluginConfig(@JsonProperty("username") String username,
                             @JsonProperty("password") String password,
+                            @JsonProperty("scheme") String scheme,
                             @JsonProperty("hostname") String hostname,
-                            @JsonProperty("port") int port,
+                            @JsonProperty("port") Integer port,
+                            @JsonProperty("app") String app,
+                            @JsonProperty("owner") String owner,
+                            @JsonProperty("token") String token,
+                            @JsonProperty("cookie") String cookie,
+                            @JsonProperty("validateCertificates") Boolean validateCertificates,
                             @JsonProperty("earliestTime") String earliestTime,
                             @JsonProperty("latestTime") String latestTime,
                             @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
@@ -60,8 +68,14 @@
                             @JsonProperty("authMode") String authMode) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
+    this.scheme = scheme;
     this.hostname = hostname;
     this.port = port;
+    this.app = app;
+    this.owner = owner;
+    this.token = token;
+    this.cookie = cookie;
+    this.validateCertificates = validateCertificates;
     this.earliestTime = earliestTime;
     this.latestTime = latestTime == null ? "now" : latestTime;
     this.reconnectRetries = reconnectRetries;
@@ -69,8 +83,14 @@
 
   private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
     super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.scheme = that.scheme;
     this.hostname = that.hostname;
     this.port = that.port;
+    this.app = that.app;
+    this.owner = that.owner;
+    this.token = that.token;
+    this.cookie = that.cookie;
+    this.validateCertificates = that.validateCertificates;
     this.earliestTime = that.earliestTime;
     this.latestTime = that.latestTime;
     this.reconnectRetries = that.reconnectRetries;
@@ -119,6 +139,11 @@
       .orElse(null);
   }
 
+  @JsonProperty("scheme")
+  public String getScheme() {
+    return scheme;
+  }
+
   @JsonProperty("hostname")
   public String getHostname() {
     return hostname;
@@ -129,6 +154,31 @@
     return port;
   }
 
+  @JsonProperty("app")
+  public String getApp() {
+    return app;
+  }
+
+  @JsonProperty("owner")
+  public String getOwner() {
+    return owner;
+  }
+
+  @JsonProperty("token")
+  public String getToken() {
+    return token;
+  }
+
+  @JsonProperty("cookie")
+  public String getCookie() {
+    return cookie;
+  }
+
+  @JsonProperty("validateCertificates")
+  public Boolean getValidateCertificates() {
+    return validateCertificates;
+  }
+
   @JsonProperty("earliestTime")
   public String getEarliestTime() {
     return earliestTime;
@@ -157,8 +207,14 @@
     }
     SplunkPluginConfig thatConfig = (SplunkPluginConfig) that;
     return Objects.equals(credentialsProvider, thatConfig.credentialsProvider) &&
+      Objects.equals(scheme, thatConfig.scheme) &&
       Objects.equals(hostname, thatConfig.hostname) &&
       Objects.equals(port, thatConfig.port) &&
+      Objects.equals(app, thatConfig.app) &&
+      Objects.equals(owner, thatConfig.owner) &&
+      Objects.equals(token, thatConfig.token) &&
+      Objects.equals(cookie, thatConfig.cookie) &&
+      Objects.equals(validateCertificates, thatConfig.validateCertificates) &&
       Objects.equals(earliestTime, thatConfig.earliestTime) &&
       Objects.equals(latestTime, thatConfig.latestTime) &&
       Objects.equals(authMode, thatConfig.authMode);
@@ -166,15 +222,34 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime, authMode);
+    return Objects.hash(
+      credentialsProvider,
+      scheme,
+      hostname,
+      port,
+      app,
+      owner,
+      token,
+      cookie,
+      validateCertificates,
+      earliestTime,
+      latestTime,
+      authMode
+    );
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("credentialsProvider", credentialsProvider)
+      .field("scheme", scheme)
       .field("hostname", hostname)
       .field("port", port)
+      .field("app", app)
+      .field("owner", owner)
+      .field("token", token)
+      .field("cookie", cookie)
+      .field("validateCertificates", validateCertificates)
       .field("earliestTime", earliestTime)
       .field("latestTime", latestTime)
       .field("Authentication Mode", authMode)
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 22cc0a5..370c072 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -44,15 +44,21 @@
   public void testConnectionFail() {
     try {
       SplunkPluginConfig invalidSplunkConfig = new SplunkPluginConfig(
-              "hacker",
-              "hacker",
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname(),
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
-              null,
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
-              StoragePluginConfig.AuthMode.SHARED_USER.name()
+        "hacker",
+        "hacker",
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getScheme(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getApp(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getOwner(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getToken(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getCookie(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getValidateCertificates(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
+        null,
+        SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
+        StoragePluginConfig.AuthMode.SHARED_USER.name()
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
       sc.connect();
@@ -66,10 +72,11 @@
   public void testGetIndexes() {
     SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
     EntityCollection<Index> indexes = sc.getIndexes();
-    assertEquals(9, indexes.size());
+    assertEquals(10, indexes.size());
 
     List<String> expectedIndexNames = new ArrayList<>();
     expectedIndexNames.add("_audit");
+    expectedIndexNames.add("_configtracker");
     expectedIndexNames.add("_internal");
     expectedIndexNames.add("_introspection");
     expectedIndexNames.add("_telemetry");
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
index 9904ffd..fcdce8d 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
@@ -43,6 +43,7 @@
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
       .addRow("splunk", "summary")
       .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_configtracker")
       .addRow("splunk", "_thefishbucket")
       .addRow("splunk", "_audit")
       .addRow("splunk", "_internal")
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 2818018..ecc00c7 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -39,6 +39,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.times;
 
 @FixMethodOrder(MethodSorters.JVM)
@@ -76,6 +77,7 @@
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
       .addRow("splunk", "summary")
       .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_configtracker")
       .addRow("splunk", "_thefishbucket")
       .addRow("splunk", "_audit")
       .addRow("splunk", "_internal")
@@ -302,11 +304,16 @@
   public void testReconnectRetries() {
     try (MockedStatic<Service> splunk = Mockito.mockStatic(Service.class)) {
       ServiceArgs loginArgs = new ServiceArgs();
+      loginArgs.setScheme(SPLUNK_STORAGE_PLUGIN_CONFIG.getScheme());
       loginArgs.setHost(SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname());
       loginArgs.setPort(SPLUNK_STORAGE_PLUGIN_CONFIG.getPort());
+      loginArgs.setApp(SPLUNK_STORAGE_PLUGIN_CONFIG.getApp());
+      loginArgs.setOwner(SPLUNK_STORAGE_PLUGIN_CONFIG.getOwner());
+      loginArgs.setToken(SPLUNK_STORAGE_PLUGIN_CONFIG.getToken());
+      loginArgs.setCookie(SPLUNK_STORAGE_PLUGIN_CONFIG.getCookie());
       loginArgs.setPassword(SPLUNK_STORAGE_PLUGIN_CONFIG.getPassword());
       loginArgs.setUsername(SPLUNK_STORAGE_PLUGIN_CONFIG.getUsername());
-      splunk.when(() -> Service.connect(loginArgs))
+      splunk.when(() -> Service.connect(any()))
           .thenThrow(new RuntimeException("Fail first connection to Splunk"))
           .thenThrow(new RuntimeException("Fail second connection to Splunk"))
           .thenThrow(new RuntimeException("Fail third connection to Splunk"))
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 2c07ac4..faa716c 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -68,10 +68,13 @@
   private static volatile boolean runningSuite = true;
   private static AtomicInteger initCount = new AtomicInteger(0);
   @ClassRule
-  public static GenericContainer<?> splunk = new GenericContainer<>(DockerImageName.parse("splunk/splunk:8.1"))
-          .withExposedPorts(8089, 8089)
-          .withEnv("SPLUNK_START_ARGS", "--accept-license")
-          .withEnv("SPLUNK_PASSWORD", SPLUNK_PASS);
+  public static GenericContainer<?> splunk = new GenericContainer<>(
+    DockerImageName.parse("splunk/splunk:9.0.2")
+  )
+    .withExposedPorts(8089, 8089)
+    .withEnv("SPLUNK_START_ARGS", "--accept-license")
+    .withEnv("SPLUNK_PASSWORD", SPLUNK_PASS)
+    .withEnv("SPLUNKD_SSL_ENABLE", "false");
 
   @BeforeClass
   public static void initSplunk() throws Exception {
@@ -87,8 +90,15 @@
         String hostname = splunk.getHost();
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
-        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now",
-                null, 4, StoragePluginConfig.AuthMode.SHARED_USER.name());
+        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(
+          SPLUNK_LOGIN, SPLUNK_PASS,
+          "http", hostname, port,
+          null, null, null, null, null, // app, owner, token, cookie, validateCertificates
+          "1", "now",
+          null,
+          4,
+          StoragePluginConfig.AuthMode.SHARED_USER.name()
+        );
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
         runningSuite = true;
@@ -102,8 +112,15 @@
         // Add unauthorized user
         credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
 
-        SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new SplunkPluginConfig(null, null, hostname, port, "1", "now",
-          credentialsProvider, 4, AuthMode.USER_TRANSLATION.name());
+        SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new SplunkPluginConfig(
+          null, null, // username, password
+          "http", hostname, port,
+          null, null, null, null, null, // app, owner, token, cookie, validateCertificates
+          "1", "now",
+          credentialsProvider,
+          4,
+          AuthMode.USER_TRANSLATION.name()
+        );
         SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
         pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
 
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a1dbdeb..5468f26 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -757,7 +757,7 @@
     storage.list_files_recursively: false,
     storage.plugin_retry_attempts: 1,
     storage.plugin_retry_attempt_delay: 2000,
-    storage.plugin_auto_disable: true,
+    storage.plugin_auto_disable: false,
     # ============ index plan related options ==============
     planner.use_simple_optimizer: false,
     planner.enable_index_planning: true,