DRILL-8276: Add Support for User Translation for Splunk Plugin (#2618)
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
index 9932801..cba9e3d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
@@ -81,7 +81,7 @@
this.subScan = subScan;
this.projectedColumns = subScan.getColumns();
this.subScanSpec = subScan.getScanSpec();
- SplunkConnection connection = new SplunkConnection(config);
+ SplunkConnection connection = new SplunkConnection(config, subScan.getUserName());
this.splunkService = connection.connect();
this.csvSettings = new CsvParserSettings();
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index c92738e..cac75b8 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -26,6 +26,7 @@
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,12 +44,18 @@
private final Optional<UsernamePasswordCredentials> credentials;
private final String hostname;
private final int port;
+ private final String queryUserName;
private Service service;
private int connectionAttempts;
- public SplunkConnection(SplunkPluginConfig config) {
- this.credentials = config.getUsernamePasswordCredentials();
+ public SplunkConnection(SplunkPluginConfig config, String queryUserName) {
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+ } else {
+ this.credentials = config.getUsernamePasswordCredentials();
+ }
this.hostname = config.getHostname();
+ this.queryUserName = queryUserName;
this.port = config.getPort();
this.connectionAttempts = config.getReconnectRetries();
service = connect();
@@ -58,10 +65,15 @@
/**
* This constructor is used for testing only
*/
- public SplunkConnection(SplunkPluginConfig config, Service service) {
- this.credentials = config.getUsernamePasswordCredentials();
+ public SplunkConnection(SplunkPluginConfig config, Service service, String queryUserName) {
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+ } else {
+ this.credentials = config.getUsernamePasswordCredentials();
+ }
this.hostname = config.getHostname();
this.port = config.getPort();
+ this.queryUserName = queryUserName;
this.service = service;
}
@@ -80,11 +92,11 @@
connectionAttempts--;
service = Service.connect(loginArgs);
} catch (Exception e) {
- if(connectionAttempts > 0) {
+ if (connectionAttempts > 0) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException interruptedException) {
- logger.error("Unable to wait 2 secs before next connection trey to Splunk");
+ logger.error("Unable to wait 2 secs before next connection try to Splunk");
}
return connect();
}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
index 69ed312..47ad693 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -30,10 +31,13 @@
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.base.filter.ExprNode;
import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -50,17 +54,20 @@
private int hashCode;
+ private MetadataProviderManager metadataProviderManager;
+
/**
* Creates a new group scan from the storage plugin.
*/
- public SplunkGroupScan (SplunkScanSpec scanSpec) {
- super("no-user");
+ public SplunkGroupScan (SplunkScanSpec scanSpec, MetadataProviderManager metadataProviderManager) {
+ super(scanSpec.queryUserName());
this.splunkScanSpec = scanSpec;
this.config = scanSpec.getConfig();
this.columns = ALL_COLUMNS;
this.filters = null;
this.filterSelectivity = 0.0;
this.maxRecords = -1;
+ this.metadataProviderManager = metadataProviderManager;
this.scanStats = computeScanStats();
}
@@ -76,10 +83,12 @@
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
this.scanStats = that.scanStats;
+ this.hashCode = that.hashCode;
}
/**
@@ -97,8 +106,8 @@
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
-
}
/**
@@ -115,6 +124,7 @@
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
}
@@ -208,6 +218,25 @@
return new SplunkGroupScan(this, maxRecords);
}
+ public TupleMetadata getSchema() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ try {
+ return metadataProviderManager.getSchemaProvider().read().getSchema();
+ } catch (IOException | NullPointerException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public TableMetadataProvider getMetadataProvider() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ return metadataProviderManager.getTableMetadataProvider();
+ }
+
@Override
public String getDigest() {
return toString();
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 41d134a..c6e574f 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -28,6 +28,8 @@
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.store.security.CredentialProviderUtils;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Optional;
@@ -35,13 +37,14 @@
@JsonTypeName(SplunkPluginConfig.NAME)
public class SplunkPluginConfig extends StoragePluginConfig {
+ private static final Logger logger = LoggerFactory.getLogger(SplunkPluginConfig.class);
+
public static final String NAME = "splunk";
public static final int DISABLED_RECONNECT_RETRIES = 1;
private final String hostname;
private final String earliestTime;
private final String latestTime;
-
private final int port;
private final Integer reconnectRetries;
@@ -56,7 +59,7 @@
@JsonProperty("reconnectRetries") Integer reconnectRetries,
@JsonProperty("authMode") String authMode) {
super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
- credentialsProvider == null);
+ credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
this.hostname = hostname;
this.port = port;
this.earliestTime = earliestTime;
@@ -73,6 +76,10 @@
this.reconnectRetries = that.reconnectRetries;
}
+ /**
+ * Gets the credentials. This method is used when user translation is not enabled.
+ * @return An {@link Optional} containing {@link UsernamePasswordCredentials} from the config.
+ */
@JsonIgnore
public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
return new UsernamePasswordCredentials.Builder()
@@ -80,12 +87,24 @@
.build();
}
+ /**
+ * Gets the credentials. This method is used when user translation is enabled.
+ * @return An {@link Optional} containing {@link UsernamePasswordCredentials} from the config.
+ */
+ @JsonIgnore
+ public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String username) {
+ return new UsernamePasswordCredentials.Builder()
+ .setCredentialsProvider(credentialsProvider)
+ .setQueryUser(username)
+ .build();
+ }
+
@JsonProperty("username")
public String getUsername() {
if (!directCredentials) {
return null;
}
- return getUsernamePasswordCredentials()
+ return getUsernamePasswordCredentials(null)
.map(UsernamePasswordCredentials::getUsername)
.orElse(null);
}
@@ -95,7 +114,7 @@
if (!directCredentials) {
return null;
}
- return getUsernamePasswordCredentials()
+ return getUsernamePasswordCredentials(null)
.map(UsernamePasswordCredentials::getPassword)
.orElse(null);
}
@@ -141,12 +160,13 @@
Objects.equals(hostname, thatConfig.hostname) &&
Objects.equals(port, thatConfig.port) &&
Objects.equals(earliestTime, thatConfig.earliestTime) &&
- Objects.equals(latestTime, thatConfig.latestTime);
+ Objects.equals(latestTime, thatConfig.latestTime) &&
+ Objects.equals(authMode, thatConfig.authMode);
}
@Override
public int hashCode() {
- return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime);
+ return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime, authMode);
}
@Override
@@ -157,6 +177,7 @@
.field("port", port)
.field("earliestTime", earliestTime)
.field("latestTime", latestTime)
+ .field("Authentication Mode", authMode)
.toString();
}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
index 2d736bb..bd4a774 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
@@ -29,14 +29,17 @@
private final String pluginName;
private final String indexName;
private final SplunkPluginConfig config;
+ private final String queryUserName;
@JsonCreator
public SplunkScanSpec(@JsonProperty("pluginName") String pluginName,
@JsonProperty("indexName") String indexName,
- @JsonProperty("config") SplunkPluginConfig config) {
+ @JsonProperty("config") SplunkPluginConfig config,
+ @JsonProperty("queryUserName") String queryUserName) {
this.pluginName = pluginName;
this.indexName = indexName;
this.config = config;
+ this.queryUserName = queryUserName;
}
@JsonProperty("pluginName")
@@ -48,12 +51,18 @@
@JsonProperty("config")
public SplunkPluginConfig getConfig() { return config; }
+ @JsonProperty("queryUserName")
+ public String queryUserName() {
+ return queryUserName;
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("config", config)
.field("schemaName", pluginName)
.field("indexName", indexName)
+ .field("queryUserName", queryUserName)
.toString();
}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
index bdd71f9..8270c45 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
@@ -20,6 +20,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractSchemaFactory;
@@ -38,6 +39,7 @@
private static final Logger logger = LoggerFactory.getLogger(SplunkSchemaFactory.class);
private static final String SPL_TABLE_NAME = "spl";
private final SplunkStoragePlugin plugin;
+ private String queryUserName;
public SplunkSchemaFactory(SplunkStoragePlugin plugin) {
super(plugin.getName());
@@ -46,18 +48,23 @@
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
- SplunkSchema schema = new SplunkSchema(plugin);
+ this.queryUserName = schemaConfig.getUserName();
+ SplunkSchema schema = new SplunkSchema(plugin, queryUserName);
SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
}
- class SplunkSchema extends AbstractSchema {
+ static class SplunkSchema extends AbstractSchema {
private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
private final SplunkStoragePlugin plugin;
+ private final String queryUserName;
- public SplunkSchema(SplunkStoragePlugin plugin) {
+ public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
super(Collections.emptyList(), plugin.getName());
this.plugin = plugin;
+ this.queryUserName = queryUserName;
+
+
registerIndexes();
}
@@ -70,7 +77,7 @@
} else {
// Register the table
return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
- new SplunkScanSpec(plugin.getName(), name, plugin.getConfig())));
+ new SplunkScanSpec(plugin.getName(), name, plugin.getConfig(), queryUserName)));
}
}
@@ -95,19 +102,29 @@
}
private void registerIndexes() {
+ // Verify that the connection is successful. If not, don't register any indexes,
+ // and throw an exception.
+ SplunkPluginConfig config = plugin.getConfig();
+ SplunkConnection connection;
+ try {
+ connection = new SplunkConnection(config, queryUserName);
+ connection.connect();
+ } catch (Exception e) {
+ // Catch any connection errors that may happen.
+ throw UserException.connectionError()
+ .message("Unable to connect to Splunk: " + plugin.getName() + " " + e.getMessage())
+ .build(logger);
+ }
+
// Add default "spl" table to index list.
registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
- new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig())));
+ new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
// Retrieve and add all other Splunk indexes
- SplunkPluginConfig config = plugin.getConfig();
- SplunkConnection connection = new SplunkConnection(config);
- connection.connect();
-
for (String indexName : connection.getIndexes().keySet()) {
logger.debug("Registering {}", indexName);
registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
- new SplunkScanSpec(plugin.getName(), indexName, config)));
+ new SplunkScanSpec(plugin.getName(), indexName, config, queryUserName)));
}
}
}
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
index dc011a6..f04e25d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
@@ -22,20 +22,30 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
public class SplunkStoragePlugin extends AbstractStoragePlugin {
+ private static final Logger logger = LoggerFactory.getLogger(SplunkStoragePlugin.class);
private final SplunkPluginConfig config;
private final SplunkSchemaFactory schemaFactory;
@@ -57,13 +67,51 @@
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ // Check to see if user translation is enabled. If so, and creds are
+ // not present, then do not register any schemata. This prevents
+ // info schema errors.
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ Optional<UsernamePasswordCredentials> userCreds = config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+ if (! userCreds.isPresent()) {
+ logger.debug(
+ "No schemas will be registered in {} for query user {}.",
+ getName(), schemaConfig.getUserName()
+ );
+ return;
+ }
+ }
schemaFactory.registerSchemas(schemaConfig, parent);
}
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, metadataProviderManager);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ List<SchemaPath> columns) throws IOException {
+ return getPhysicalScan(userName, selection, columns, null, null);
+ }
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+ MetadataProviderManager metadataProviderManager) throws IOException {
SplunkScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<SplunkScanSpec>() {});
- return new SplunkGroupScan(scanSpec);
+ return new SplunkGroupScan(scanSpec, metadataProviderManager);
}
@Override
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
index 21b1237..e90bd2a 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
@@ -52,7 +52,7 @@
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
@JsonProperty("maxRecords") int maxRecords) {
- super("user");
+ super(splunkScanSpec.queryUserName());
this.config = config;
this.splunkScanSpec = splunkScanSpec;
this.columns = columns;
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 4e010b2..22cc0a5 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -36,7 +36,7 @@
@Test
public void testConnection() {
- SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+ SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
sc.connect();
}
@@ -54,7 +54,7 @@
SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
StoragePluginConfig.AuthMode.SHARED_USER.name()
);
- SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
+ SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
sc.connect();
fail();
} catch (UserException e) {
@@ -64,7 +64,7 @@
@Test
public void testGetIndexes() {
- SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+ SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
EntityCollection<Index> indexes = sc.getIndexes();
assertEquals(9, indexes.size());
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 7d52008..2818018 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -311,7 +311,7 @@
.thenThrow(new RuntimeException("Fail second connection to Splunk"))
.thenThrow(new RuntimeException("Fail third connection to Splunk"))
.thenReturn(new Service(loginArgs)); // fourth connection is successful
- new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // it will fail, in case "reconnectRetries": 1 is specified in configs
+ new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null); // it will fail, in case "reconnectRetries": 1 is specified in configs
splunk.verify(
() -> Service.connect(loginArgs),
times(4)
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 0abf92a..2c07ac4 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -20,8 +20,11 @@
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -34,8 +37,12 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+
@RunWith(Suite.class)
@Suite.SuiteClasses({
@@ -44,7 +51,8 @@
SplunkLimitPushDownTest.class,
SplunkIndexesTest.class,
SplunkPluginTest.class,
- SplunkTestSplunkUtils.class
+ SplunkTestSplunkUtils.class,
+ TestSplunkUserTranslation.class
})
@Category({SlowTest.class})
@@ -52,6 +60,8 @@
private static final Logger logger = LoggerFactory.getLogger(SplunkTestSuite.class);
protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG = null;
+
+ protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = null;
public static final String SPLUNK_LOGIN = "admin";
public static final String SPLUNK_PASS = "password";
@@ -67,7 +77,12 @@
public static void initSplunk() throws Exception {
synchronized (SplunkTestSuite.class) {
if (initCount.get() == 0) {
- startCluster(ClusterFixture.builder(dirTestWatcher));
+ ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+ .configProperty(ExecConstants.HTTP_ENABLE, true)
+ .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+ .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+ startCluster(builder);
+
splunk.start();
String hostname = splunk.getHost();
Integer port = splunk.getFirstMappedPort();
@@ -79,6 +94,19 @@
runningSuite = true;
logger.info("Take a time to ready more Splunk events (10 sec)...");
Thread.sleep(10000);
+
+
+ PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+ // Add authorized user
+ credentialsProvider.setUserCredentials(SPLUNK_LOGIN, SPLUNK_PASS, TEST_USER_1);
+ // Add unauthorized user
+ credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
+
+ SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new SplunkPluginConfig(null, null, hostname, port, "1", "now",
+ credentialsProvider, 4, AuthMode.USER_TRANSLATION.name());
+ SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
+ pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
+
}
initCount.incrementAndGet();
runningSuite = true;
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
new file mode 100644
index 0000000..7e24e0c
--- /dev/null
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Category({SlowTest.class})
+public class TestSplunkUserTranslation extends SplunkBaseTest {
+
+ @Test
+ public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+ // This test validates that the correct credentials are sent down to Splunk.
+ // This user should not see the ut_splunk because they do not have valid credentials
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk%'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+ }
+
+ @Test
+ public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(2, results.rowCount());
+ }
+
+ @Test
+ public void testSplunkQueryWithUserTranslation() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "SELECT acceleration_id, action, add_offset, add_timestamp FROM ut_splunk._audit LIMIT 2";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(2, results.rowCount());
+ }
+
+ @Test
+ public void testSplunkQueryWithUserTranslationAndInvalidCredentials() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "SELECT acceleration_id, action, add_offset, add_timestamp FROM ut_splunk._audit LIMIT 2";
+ try {
+ client.queryBuilder().sql(sql).rowSet();
+ fail();
+ } catch (UserRemoteException e) {
+ assertTrue(e.getMessage().contains("Schema [[ut_splunk]] is not valid"));
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 16193cb..9786377 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,7 @@
<commons.cli.version>1.4</commons.cli.version>
<snakeyaml.version>1.26</snakeyaml.version>
<commons.lang3.version>3.10</commons.lang3.version>
- <testcontainers.version>1.16.3</testcontainers.version>
+ <testcontainers.version>1.17.3</testcontainers.version>
<typesafe.config.version>1.4.2</typesafe.config.version>
<commons.codec.version>1.14</commons.codec.version>
<xerces.version>2.12.2</xerces.version>