Merge pull request #3176 from agresch/agresch_storm_3549
STORM-3549 allow use of custom jaas conf files for Kafka
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
index c4d15fc..31425d4 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -23,7 +23,6 @@
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -55,26 +54,16 @@
SaslUtils.KERBEROS);
LOG.info("Creating Kerberos Client.");
-
- Configuration loginConf;
- try {
- loginConf = ClientAuthUtils.getConfiguration(topoConf);
- } catch (Throwable t) {
- LOG.error("Failed to get loginConf: ", t);
- throw t;
- }
LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+ String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
+
subject = null;
try {
- LOG.debug("Setting Configuration to login_config: {}", loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
- //now login
- LOG.debug("Trying to login.");
- Login login = new Login(jaasSection, ch);
+ LOG.debug("Trying to login using {}.", jaasConfFile);
+ Login login = new Login(jaasSection, ch, jaasConfFile);
subject = login.getSubject();
LOG.debug("Got Subject: {}", subject.toString());
} catch (LoginException ex) {
@@ -88,12 +77,12 @@
throw new RuntimeException("Fail to verify user principal with section \""
+ jaasSection
+ "\" in login configuration file "
- + loginConf);
+ + jaasConfFile);
}
String serviceName = null;
try {
- serviceName = ClientAuthUtils.get(loginConf, jaasSection, "serviceName");
+ serviceName = ClientAuthUtils.get(topoConf, jaasSection, "serviceName");
} catch (IOException e) {
LOG.error("Failed to get service name.", e);
throw new RuntimeException(e);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
index ee410d6..fefcdc6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -25,7 +25,6 @@
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
@@ -49,28 +48,17 @@
KerberosSaslNettyServer(Map<String, Object> topoConf, String jaasSection, List<String> authorizedUsers) {
this.authorizedUsers = authorizedUsers;
- LOG.debug("Getting Configuration.");
- Configuration loginConf;
- try {
- loginConf = ClientAuthUtils.getConfiguration(topoConf);
- } catch (Throwable t) {
- LOG.error("Failed to get loginConf: ", t);
- throw t;
- }
LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
+ String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
//login our principal
subject = null;
try {
- LOG.debug("Setting Configuration to login_config: {}", loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
- //now login
- LOG.debug("Trying to login.");
- Login login = new Login(jaasSection, ch);
+ LOG.debug("Trying to login using {}.", jaasConfFile);
+ Login login = new Login(jaasSection, ch, jaasConfFile);
subject = login.getSubject();
LOG.debug("Got Subject: {}", subject.toString());
} catch (LoginException ex) {
@@ -84,7 +72,7 @@
throw new RuntimeException("Fail to verify user principal with section \""
+ jaasSection
+ "\" in login configuration file "
- + loginConf);
+ + jaasConfFile);
}
try {
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index 6a50d84..d6a345e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -19,6 +19,9 @@
* does not die.
*/
+import java.io.File;
+import java.net.URI;
+import java.security.URIParameter;
import java.util.Date;
import java.util.Random;
import java.util.Set;
@@ -31,6 +34,7 @@
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.log4j.Logger;
+import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.shade.org.apache.zookeeper.Shell;
import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
@@ -57,12 +61,9 @@
private Thread thread = null;
private boolean isKrbTicket = false;
private boolean isUsingTicketCache = false;
- private boolean isUsingKeytab = false;
private LoginContext login = null;
private String loginContextName = null;
- private String keytabFile = null;
private String principal = null;
-
private long lastLogin = 0;
/**
@@ -77,14 +78,14 @@
* @throws javax.security.auth.login.LoginException
* Thrown if authentication fails.
*/
- public Login(final String loginContextName, CallbackHandler callbackHandler)
+ public Login(final String loginContextName, CallbackHandler callbackHandler, String jaasConfFile)
throws LoginException {
this.callbackHandler = callbackHandler;
- login = login(loginContextName);
+ login = login(loginContextName, jaasConfFile);
this.loginContextName = loginContextName;
subject = login.getSubject();
isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
- AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+ AppConfigurationEntry[] entries = this.getConfiguration(jaasConfFile).getAppConfigurationEntry(loginContextName);
for (AppConfigurationEntry entry : entries) {
// there will only be a single entry, so this for() loop will only be iterated through once.
if (entry.getOptions().get("useTicketCache") != null) {
@@ -93,10 +94,6 @@
isUsingTicketCache = true;
}
}
- if (entry.getOptions().get("keyTab") != null) {
- keytabFile = (String) entry.getOptions().get("keyTab");
- isUsingKeytab = true;
- }
if (entry.getOptions().get("principal") != null) {
principal = (String) entry.getOptions().get("principal");
}
@@ -251,6 +248,19 @@
thread.setDaemon(true);
}
+    private Configuration getConfiguration(String jaasConfFile) {
+        // jaasConfFile may be null (no JAAS file configured); new File(null) would throw a bare NPE, so guard first.
+        if (jaasConfFile == null || !new File(jaasConfFile).canRead()) {
+            throw new RuntimeException("File " + jaasConfFile + " cannot be read.");
+        }
+        try {
+            URI configUri = new File(jaasConfFile).toURI();
+            return Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get configuration for " + jaasConfFile, ex);
+        }
+    }
+
public void startThreadIfNeeded() {
// thread object 'thread' will be null if a refresh thread is not needed.
if (thread != null) {
@@ -277,7 +287,7 @@
return loginContextName;
}
- private synchronized LoginContext login(final String loginContextName) throws LoginException {
+ private synchronized LoginContext login(final String loginContextName, String jaasConfFile) throws LoginException {
if (loginContextName == null) {
throw new LoginException("loginContext name (JAAS file section header) was null. "
+ "Please check your java.security.login.auth.config (="
@@ -285,9 +295,10 @@
+ ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(="
+ System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
}
- LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
+ Configuration configuration = this.getConfiguration(jaasConfFile);
+ LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
loginContext.login();
- LOG.info("successfully logged in.");
+ LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
return loginContext;
}
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index b77e003..30faf1f 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -73,9 +73,8 @@
switch (auth) {
case "DIGEST":
- Configuration loginConf = ClientAuthUtils.getConfiguration(config);
authMethod = ThriftNettyClientCodec.AuthMethod.DIGEST;
- secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+ secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
if (secret == null) {
LOG.error("Can't start pacemaker server without digest secret.");
throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 9fb0e4b..6a9b703 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -60,6 +60,10 @@
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
+    public static String getJaasConf(Map<String, Object> topoConf) {
+        return String.class.cast(topoConf.get("java.security.auth.login.config")); // null if unset
+    }
+
/**
* Construct a JAAS configuration object per storm configuration file.
*
@@ -70,7 +74,7 @@
Configuration loginConf = null;
//find login file configuration from Storm configuration
- String loginConfigurationFile = (String) topoConf.get("java.security.auth.login.config");
+ String loginConfigurationFile = getJaasConf(topoConf);
if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) {
File configFile = new File(loginConfigurationFile);
if (!configFile.canRead()) {
@@ -111,12 +115,13 @@
/**
* Pull a set of keys out of a Configuration.
*
- * @param configuration The config to pull the key/value pairs out of.
+ * @param topoConf The config containing the jaas conf file.
* @param section The app configuration entry name to get stuff from.
* @return Return a map of the configs in conf.
*/
- public static SortedMap<String, ?> pullConfig(Configuration configuration,
+ public static SortedMap<String, ?> pullConfig(Map<String, Object> topoConf,
String section) throws IOException {
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
if (configurationEntries == null) {
@@ -138,12 +143,17 @@
/**
* Pull a the value given section and key from Configuration.
*
- * @param configuration The config to pull the key/value pairs out of.
+ * @param topoConf The config containing the jaas conf file.
* @param section The app configuration entry name to get stuff from.
* @param key The key to look up inside of the section
* @return Return a the String value of the configuration value
*/
- public static String get(Configuration configuration, String section, String key) throws IOException {
+    public static String get(Map<String, Object> topoConf, String section, String key) throws IOException {
+        // Build the JAAS Configuration named by topoConf, then delegate to the Configuration-based lookup.
+        return get(ClientAuthUtils.getConfiguration(topoConf), section, key);
+    }
+
+ static String get(Configuration configuration, String section, String key) throws IOException {
AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
if (configurationEntries == null) {
@@ -456,22 +466,22 @@
/**
* Construct a transport plugin per storm configuration.
*/
- public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+ public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf) {
try {
String transportPluginClassName = type.getTransportPlugin(topoConf);
ITransportPlugin transportPlugin = ReflectionUtils.newInstance(transportPluginClassName);
- transportPlugin.prepare(type, topoConf, loginConf);
+ transportPlugin.prepare(type, topoConf);
return transportPlugin;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- public static String makeDigestPayload(Configuration loginConfig, String configSection) {
+ public static String makeDigestPayload(Map<String, Object> topoConf, String configSection) {
String username = null;
String password = null;
try {
- Map<String, ?> results = ClientAuthUtils.pullConfig(loginConfig, configSection);
+ Map<String, ?> results = ClientAuthUtils.pullConfig(topoConf, configSection);
username = (String) results.get(USERNAME);
password = (String) results.get(PASSWORD);
} catch (Exception e) {
@@ -492,6 +502,8 @@
}
}
+
+
public static byte[] serializeKerberosTicket(KerberosTicket tgt) throws Exception {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bao);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index 6bf3b0b..19ec374 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -29,9 +29,8 @@
*
* @param type the type of connection this will process.
* @param topoConf Storm configuration
- * @param loginConf login configuration
*/
- void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf);
+ void prepare(ThriftConnectionType type, Map<String, Object> topoConf);
/**
* Create a server associated with a given port, service handler, and purpose.
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 21a6ce1..0605385 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -48,14 +48,12 @@
private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
protected ThriftConnectionType type;
protected Map<String, Object> topoConf;
- protected Configuration loginConf;
private int port;
@Override
- public void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+ public void prepare(ThriftConnectionType type, Map<String, Object> topoConf) {
this.type = type;
this.topoConf = topoConf;
- this.loginConf = loginConf;
}
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index a9d9129..88d70ec 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -83,11 +83,8 @@
socket.setTimeout(timeout);
}
- //locate login configuration
- Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
-
//construct a transport plugin
- ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf, loginConf);
+ ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf);
//TODO get this from type instead of hardcoding to Nimbus.
//establish client-server transport via plugin
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index f6102bc..9938b7c 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,10 +12,8 @@
package org.apache.storm.security.auth;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-import javax.security.auth.login.Configuration;
import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
import org.apache.storm.thrift.TProcessor;
import org.apache.storm.thrift.server.TServer;
@@ -29,7 +27,6 @@
private final Map<String, Object> conf; //storm configuration
private final ThriftConnectionType type;
private TServer server;
- private Configuration loginConf;
private int port;
private boolean areWorkerTokensSupported;
private ITransportPlugin transportPlugin;
@@ -40,14 +37,8 @@
this.type = type;
try {
- //retrieve authentication configuration
- loginConf = ClientAuthUtils.getConfiguration(this.conf);
- } catch (Exception x) {
- LOG.error(x.getMessage(), x);
- }
- try {
//locate our thrift transport plugin
- transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+ transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf);
//server
server = transportPlugin.getServer(this.processor);
port = transportPlugin.getPort();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index e3e4497..5a77d51 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -16,6 +16,8 @@
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
import org.apache.storm.generated.WorkerToken;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
@@ -44,7 +46,7 @@
//create an authentication callback handler
CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
workerTokenAuthorizer,
- new JassPasswordProvider(loginConf));
+ new JassPasswordProvider(conf));
//create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
@@ -60,7 +62,11 @@
WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
if (token != null) {
clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
- } else if (loginConf != null) {
+ } else {
+ Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
+ if (loginConf == null) {
+ throw new IOException("Could not find any way to authenticate with the server.");
+ }
AppConfigurationEntry[] configurationEntries = loginConf.getAppConfigurationEntry(ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
if (configurationEntries == null) {
String errorMessage = "Could not find a '" + ClientAuthUtils.LOGIN_CONTEXT_CLIENT
@@ -76,8 +82,6 @@
password = (String) options.getOrDefault("password", password);
}
clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password);
- } else {
- throw new IOException("Could not find any way to authenticate with the server.");
}
TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
index 5c0d7a4..1d5c7e3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -36,10 +36,12 @@
/**
* Constructor.
*
- * @param configuration the jaas configuration to get the credentials out of.
+ * @param topoConf the configuration containing the jaas conf to use.
* @throws IOException if we could not read the Server section in the jaas conf.
*/
- public JassPasswordProvider(Configuration configuration) throws IOException {
+ public JassPasswordProvider(Map<String, Object> topoConf) throws IOException {
+
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index c4f9f91..6936555 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -121,11 +121,10 @@
//Log the user in and get the TGT
try {
Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
- ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
+ ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
//login our user
- Configuration.setConfiguration(loginConf);
- LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+ LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, null, clientCallbackHandler, loginConf);
try {
lc.login();
final Subject subject = lc.getSubject();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
index 64673ce..f9a8e09 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -13,6 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import java.io.IOException;
+import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -36,7 +37,8 @@
*
* <p>For digest, you should have a pair of user name and password defined in this figgure.
*/
- public ClientCallbackHandler(Configuration configuration) throws IOException {
+ public ClientCallbackHandler(Map<String, Object> topoConf) throws IOException {
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 915473b..5b42886 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -58,15 +58,15 @@
workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
}
//create an authentication callback handler
- CallbackHandler serverCallbackHandler = new ServerCallbackHandler(loginConf, impersonationAllowed);
+ CallbackHandler serverCallbackHandler = new ServerCallbackHandler(conf, impersonationAllowed);
+
+ String jaasConfFile = ClientAuthUtils.getJaasConf(conf);
//login our principal
Subject subject = null;
try {
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
//now login
- Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler);
+ Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler, jaasConfFile);
subject = login.getSubject();
login.startThreadIfNeeded();
} catch (LoginException ex) {
@@ -77,10 +77,10 @@
//check the credential of our principal
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
throw new RuntimeException("Fail to verify user principal with section \""
- + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf);
+ + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + jaasConfFile);
}
- String principal = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+ String principal = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
LOG.debug("principal:" + principal);
KerberosName serviceKerberosName = new KerberosName(principal);
String serviceName = serviceKerberosName.getServiceName();
@@ -107,11 +107,9 @@
private Login mkLogin() throws IOException {
try {
//create an authentication callback handler
- ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
+ ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
//now login
- Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+ Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler, ClientAuthUtils.getJaasConf(conf));
login.startThreadIfNeeded();
return login;
} catch (LoginException ex) {
@@ -142,7 +140,7 @@
private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException {
//login our user
- SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
+ SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
if (authConf == null) {
throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
}
@@ -180,11 +178,11 @@
final Subject subject = login.getSubject();
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
throw new RuntimeException("Fail to verify user principal with section \""
- + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + loginConf);
+ + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + ClientAuthUtils.getJaasConf(conf));
}
final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
- String serviceName = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+ String serviceName = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
if (serviceName == null) {
serviceName = ClientAuthUtils.SERVICE;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 05630e9..daa5fbf 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -13,6 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import java.io.IOException;
+import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -35,8 +36,10 @@
private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
private final boolean impersonationAllowed;
- public ServerCallbackHandler(Configuration configuration, boolean impersonationAllowed) throws IOException {
+ public ServerCallbackHandler(Map<String, Object> topoConf, boolean impersonationAllowed) throws IOException {
this.impersonationAllowed = impersonationAllowed;
+
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index b204c1a..49f5b82 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -49,14 +49,12 @@
public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
protected ThriftConnectionType type;
protected Map<String, Object> conf;
- protected Configuration loginConf;
private int port;
@Override
- public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) {
+ public void prepare(ThriftConnectionType type, Map<String, Object> conf) {
this.type = type;
this.conf = conf;
- this.loginConf = loginConf;
}
@Override
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
index 9f09ee4..1aa854b 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
@@ -95,8 +95,9 @@
@Test
public void objGettersReturnNullWithNullConfigTest() throws IOException {
- Assert.assertNull(ClientAuthUtils.pullConfig(null, "foo"));
- Assert.assertNull(ClientAuthUtils.get(null, "foo", "bar"));
+ Map<String, Object> topoConf = new HashMap<>();
+ Assert.assertNull(ClientAuthUtils.pullConfig(topoConf, "foo"));
+ Assert.assertNull(ClientAuthUtils.get(topoConf, "foo", "bar"));
Assert.assertNull(ClientAuthUtils.getConfiguration(Collections.emptyMap()));
}
@@ -141,39 +142,6 @@
Mockito.verify(autoCred, Mockito.times(1)).populateSubject(subject, cred);
}
- @Test
- public void makeDigestPayloadTest() throws NoSuchAlgorithmException {
- String section = "user-pass-section";
- Map<String, String> optionMap = new HashMap<String, String>();
- String user = "user";
- String pass = "pass";
- optionMap.put("username", user);
- optionMap.put("password", pass);
- AppConfigurationEntry entry = Mockito.mock(AppConfigurationEntry.class);
-
- Mockito.<Map<String, ?>>when(entry.getOptions()).thenReturn(optionMap);
- Configuration mockConfig = Mockito.mock(Configuration.class);
- Mockito.when(mockConfig.getAppConfigurationEntry(section))
- .thenReturn(new AppConfigurationEntry[]{ entry });
-
- MessageDigest digest = MessageDigest.getInstance("SHA-512");
- byte[] output = digest.digest((user + ":" + pass).getBytes());
- String sha = Hex.encodeHexString(output);
-
- // previous code used this method to generate the string, ensure the two match
- StringBuilder builder = new StringBuilder();
- for (byte b : output) {
- builder.append(String.format("%02x", b));
- }
- String stringFormatMethod = builder.toString();
-
- Assert.assertEquals(
- ClientAuthUtils.makeDigestPayload(mockConfig, "user-pass-section"),
- sha);
-
- Assert.assertEquals(sha, stringFormatMethod);
- }
-
@Test(expected = RuntimeException.class)
public void invalidConfigResultsInIOException() throws RuntimeException {
HashMap<String, Object> conf = new HashMap<>();
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index d8364dc..19e6cbe 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -63,9 +63,8 @@
switch (auth) {
case "DIGEST":
- Configuration loginConf = ClientAuthUtils.getConfiguration(config);
authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
- this.secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+ this.secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
if (this.secret == null) {
LOG.error("Can't start pacemaker server without digest secret.");
throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
index 49a4641..7802390 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
@@ -576,7 +576,7 @@
public void getTransportPluginThrowsRunimeTest() {
Map<String, Object> conf = ConfigUtils.readStormConfig();
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
- ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
+ ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf);
}
@Test