Merge pull request #3181 from marjoral/STORM-3552
STORM-3552 Fix update log level from command line
diff --git a/docs/Setting-up-a-Storm-cluster.md b/docs/Setting-up-a-Storm-cluster.md
index ea05da4..b57639e 100644
--- a/docs/Setting-up-a-Storm-cluster.md
+++ b/docs/Setting-up-a-Storm-cluster.md
@@ -30,7 +30,7 @@
Next you need to install Storm's dependencies on Nimbus and the worker machines. These are:
1. Java 8+ (Apache Storm 2.x is tested through travis ci against a java 8 JDK)
-2. Python 2.6.6 (Python 3.x should work too, but is not tested as part of our CI enviornment)
+2. Python 2.7.x or Python 3.x
These are the versions of the dependencies that have been tested with Storm. Storm may or may not work with different versions of Java and/or Python.
diff --git a/docs/flux.md b/docs/flux.md
index 27d28bb..000270f 100644
--- a/docs/flux.md
+++ b/docs/flux.md
@@ -47,7 +47,7 @@
If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
on your system:
-* Python 2.6.x or later
+* Python 2.7.x or later
* Node.js 0.10.x or later
#### Building with unit tests enabled:
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
index c4d15fc..31425d4 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -23,7 +23,6 @@
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -55,26 +54,16 @@
SaslUtils.KERBEROS);
LOG.info("Creating Kerberos Client.");
-
- Configuration loginConf;
- try {
- loginConf = ClientAuthUtils.getConfiguration(topoConf);
- } catch (Throwable t) {
- LOG.error("Failed to get loginConf: ", t);
- throw t;
- }
LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+ String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
+
subject = null;
try {
- LOG.debug("Setting Configuration to login_config: {}", loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
- //now login
- LOG.debug("Trying to login.");
- Login login = new Login(jaasSection, ch);
+ LOG.debug("Trying to login using {}.", jaasConfFile);
+ Login login = new Login(jaasSection, ch, jaasConfFile);
subject = login.getSubject();
LOG.debug("Got Subject: {}", subject.toString());
} catch (LoginException ex) {
@@ -88,12 +77,12 @@
throw new RuntimeException("Fail to verify user principal with section \""
+ jaasSection
+ "\" in login configuration file "
- + loginConf);
+ + jaasConfFile);
}
String serviceName = null;
try {
- serviceName = ClientAuthUtils.get(loginConf, jaasSection, "serviceName");
+ serviceName = ClientAuthUtils.get(topoConf, jaasSection, "serviceName");
} catch (IOException e) {
LOG.error("Failed to get service name.", e);
throw new RuntimeException(e);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
index ee410d6..fefcdc6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -25,7 +25,6 @@
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
@@ -49,28 +48,17 @@
KerberosSaslNettyServer(Map<String, Object> topoConf, String jaasSection, List<String> authorizedUsers) {
this.authorizedUsers = authorizedUsers;
- LOG.debug("Getting Configuration.");
- Configuration loginConf;
- try {
- loginConf = ClientAuthUtils.getConfiguration(topoConf);
- } catch (Throwable t) {
- LOG.error("Failed to get loginConf: ", t);
- throw t;
- }
LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
+ String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
//login our principal
subject = null;
try {
- LOG.debug("Setting Configuration to login_config: {}", loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
- //now login
- LOG.debug("Trying to login.");
- Login login = new Login(jaasSection, ch);
+ LOG.debug("Trying to login using {}.", jaasConfFile);
+ Login login = new Login(jaasSection, ch, jaasConfFile);
subject = login.getSubject();
LOG.debug("Got Subject: {}", subject.toString());
} catch (LoginException ex) {
@@ -84,7 +72,7 @@
throw new RuntimeException("Fail to verify user principal with section \""
+ jaasSection
+ "\" in login configuration file "
- + loginConf);
+ + jaasConfFile);
}
try {
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index 6a50d84..d6a345e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -19,6 +19,9 @@
* does not die.
*/
+import java.io.File;
+import java.net.URI;
+import java.security.URIParameter;
import java.util.Date;
import java.util.Random;
import java.util.Set;
@@ -31,6 +34,7 @@
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.log4j.Logger;
+import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.shade.org.apache.zookeeper.Shell;
import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
@@ -57,12 +61,9 @@
private Thread thread = null;
private boolean isKrbTicket = false;
private boolean isUsingTicketCache = false;
- private boolean isUsingKeytab = false;
private LoginContext login = null;
private String loginContextName = null;
- private String keytabFile = null;
private String principal = null;
-
private long lastLogin = 0;
/**
@@ -77,14 +78,14 @@
* @throws javax.security.auth.login.LoginException
* Thrown if authentication fails.
*/
- public Login(final String loginContextName, CallbackHandler callbackHandler)
+ public Login(final String loginContextName, CallbackHandler callbackHandler, String jaasConfFile)
throws LoginException {
this.callbackHandler = callbackHandler;
- login = login(loginContextName);
+ login = login(loginContextName, jaasConfFile);
this.loginContextName = loginContextName;
subject = login.getSubject();
isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
- AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+ AppConfigurationEntry[] entries = this.getConfiguration(jaasConfFile).getAppConfigurationEntry(loginContextName);
for (AppConfigurationEntry entry : entries) {
// there will only be a single entry, so this for() loop will only be iterated through once.
if (entry.getOptions().get("useTicketCache") != null) {
@@ -93,10 +94,6 @@
isUsingTicketCache = true;
}
}
- if (entry.getOptions().get("keyTab") != null) {
- keytabFile = (String) entry.getOptions().get("keyTab");
- isUsingKeytab = true;
- }
if (entry.getOptions().get("principal") != null) {
principal = (String) entry.getOptions().get("principal");
}
@@ -251,6 +248,19 @@
thread.setDaemon(true);
}
+ private Configuration getConfiguration(String jaasConfFile) {
+ File configFile = new File(jaasConfFile);
+ if (!configFile.canRead()) {
+ throw new RuntimeException("File " + jaasConfFile + " cannot be read.");
+ }
+ try {
+ URI configUri = configFile.toURI();
+ return Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to get configuration for " + jaasConfFile, ex);
+ }
+ }
+
public void startThreadIfNeeded() {
// thread object 'thread' will be null if a refresh thread is not needed.
if (thread != null) {
@@ -277,7 +287,7 @@
return loginContextName;
}
- private synchronized LoginContext login(final String loginContextName) throws LoginException {
+ private synchronized LoginContext login(final String loginContextName, String jaasConfFile) throws LoginException {
if (loginContextName == null) {
throw new LoginException("loginContext name (JAAS file section header) was null. "
+ "Please check your java.security.login.auth.config (="
@@ -285,9 +295,10 @@
+ ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(="
+ System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
}
- LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
+ Configuration configuration = this.getConfiguration(jaasConfFile);
+ LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
loginContext.login();
- LOG.info("successfully logged in.");
+ LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
return loginContext;
}
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index b77e003..30faf1f 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -73,9 +73,8 @@
switch (auth) {
case "DIGEST":
- Configuration loginConf = ClientAuthUtils.getConfiguration(config);
authMethod = ThriftNettyClientCodec.AuthMethod.DIGEST;
- secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+ secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
if (secret == null) {
LOG.error("Can't start pacemaker server without digest secret.");
throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 9fb0e4b..6a9b703 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -60,6 +60,10 @@
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
+ public static String getJaasConf(Map<String, Object> topoConf) {
+ return (String) topoConf.get("java.security.auth.login.config");
+ }
+
/**
* Construct a JAAS configuration object per storm configuration file.
*
@@ -70,7 +74,7 @@
Configuration loginConf = null;
//find login file configuration from Storm configuration
- String loginConfigurationFile = (String) topoConf.get("java.security.auth.login.config");
+ String loginConfigurationFile = getJaasConf(topoConf);
if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) {
File configFile = new File(loginConfigurationFile);
if (!configFile.canRead()) {
@@ -111,12 +115,13 @@
/**
* Pull a set of keys out of a Configuration.
*
- * @param configuration The config to pull the key/value pairs out of.
+ * @param topoConf The config containing the jaas conf file.
* @param section The app configuration entry name to get stuff from.
* @return Return a map of the configs in conf.
*/
- public static SortedMap<String, ?> pullConfig(Configuration configuration,
+ public static SortedMap<String, ?> pullConfig(Map<String, Object> topoConf,
String section) throws IOException {
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
if (configurationEntries == null) {
@@ -138,12 +143,17 @@
/**
* Pull a the value given section and key from Configuration.
*
- * @param configuration The config to pull the key/value pairs out of.
+ * @param topoConf The config containing the jaas conf file.
* @param section The app configuration entry name to get stuff from.
* @param key The key to look up inside of the section
* @return Return a the String value of the configuration value
*/
- public static String get(Configuration configuration, String section, String key) throws IOException {
+ public static String get(Map<String, Object> topoConf, String section, String key) throws IOException {
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
+ return get(configuration, section, key);
+ }
+
+ static String get(Configuration configuration, String section, String key) throws IOException {
AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
if (configurationEntries == null) {
@@ -456,22 +466,22 @@
/**
* Construct a transport plugin per storm configuration.
*/
- public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+ public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf) {
try {
String transportPluginClassName = type.getTransportPlugin(topoConf);
ITransportPlugin transportPlugin = ReflectionUtils.newInstance(transportPluginClassName);
- transportPlugin.prepare(type, topoConf, loginConf);
+ transportPlugin.prepare(type, topoConf);
return transportPlugin;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- public static String makeDigestPayload(Configuration loginConfig, String configSection) {
+ public static String makeDigestPayload(Map<String, Object> topoConf, String configSection) {
String username = null;
String password = null;
try {
- Map<String, ?> results = ClientAuthUtils.pullConfig(loginConfig, configSection);
+ Map<String, ?> results = ClientAuthUtils.pullConfig(topoConf, configSection);
username = (String) results.get(USERNAME);
password = (String) results.get(PASSWORD);
} catch (Exception e) {
@@ -492,6 +502,8 @@
}
}
+
+
public static byte[] serializeKerberosTicket(KerberosTicket tgt) throws Exception {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bao);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index 6bf3b0b..19ec374 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -29,9 +29,8 @@
*
* @param type the type of connection this will process.
* @param topoConf Storm configuration
- * @param loginConf login configuration
*/
- void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf);
+ void prepare(ThriftConnectionType type, Map<String, Object> topoConf);
/**
* Create a server associated with a given port, service handler, and purpose.
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 21a6ce1..0605385 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -48,14 +48,12 @@
private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
protected ThriftConnectionType type;
protected Map<String, Object> topoConf;
- protected Configuration loginConf;
private int port;
@Override
- public void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+ public void prepare(ThriftConnectionType type, Map<String, Object> topoConf) {
this.type = type;
this.topoConf = topoConf;
- this.loginConf = loginConf;
}
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index a9d9129..88d70ec 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -83,11 +83,8 @@
socket.setTimeout(timeout);
}
- //locate login configuration
- Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
-
//construct a transport plugin
- ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf, loginConf);
+ ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf);
//TODO get this from type instead of hardcoding to Nimbus.
//establish client-server transport via plugin
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index f6102bc..9938b7c 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,10 +12,8 @@
package org.apache.storm.security.auth;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-import javax.security.auth.login.Configuration;
import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
import org.apache.storm.thrift.TProcessor;
import org.apache.storm.thrift.server.TServer;
@@ -29,7 +27,6 @@
private final Map<String, Object> conf; //storm configuration
private final ThriftConnectionType type;
private TServer server;
- private Configuration loginConf;
private int port;
private boolean areWorkerTokensSupported;
private ITransportPlugin transportPlugin;
@@ -40,14 +37,8 @@
this.type = type;
try {
- //retrieve authentication configuration
- loginConf = ClientAuthUtils.getConfiguration(this.conf);
- } catch (Exception x) {
- LOG.error(x.getMessage(), x);
- }
- try {
//locate our thrift transport plugin
- transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+ transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf);
//server
server = transportPlugin.getServer(this.processor);
port = transportPlugin.getPort();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index e3e4497..5a77d51 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -16,6 +16,8 @@
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
import org.apache.storm.generated.WorkerToken;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
@@ -44,7 +46,7 @@
//create an authentication callback handler
CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
workerTokenAuthorizer,
- new JassPasswordProvider(loginConf));
+ new JassPasswordProvider(conf));
//create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
@@ -60,7 +62,11 @@
WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
if (token != null) {
clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
- } else if (loginConf != null) {
+ } else {
+ Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
+ if (loginConf == null) {
+ throw new IOException("Could not find any way to authenticate with the server.");
+ }
AppConfigurationEntry[] configurationEntries = loginConf.getAppConfigurationEntry(ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
if (configurationEntries == null) {
String errorMessage = "Could not find a '" + ClientAuthUtils.LOGIN_CONTEXT_CLIENT
@@ -76,8 +82,6 @@
password = (String) options.getOrDefault("password", password);
}
clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password);
- } else {
- throw new IOException("Could not find any way to authenticate with the server.");
}
TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
index 5c0d7a4..1d5c7e3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -36,10 +36,12 @@
/**
* Constructor.
*
- * @param configuration the jaas configuration to get the credentials out of.
+ * @param topoConf the configuration containing the jaas conf to use.
* @throws IOException if we could not read the Server section in the jaas conf.
*/
- public JassPasswordProvider(Configuration configuration) throws IOException {
+ public JassPasswordProvider(Map<String, Object> topoConf) throws IOException {
+
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index c4f9f91..6936555 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -121,11 +121,10 @@
//Log the user in and get the TGT
try {
Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
- ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
+ ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
//login our user
- Configuration.setConfiguration(loginConf);
- LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+ LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, null, clientCallbackHandler, loginConf);
try {
lc.login();
final Subject subject = lc.getSubject();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
index 64673ce..f9a8e09 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -13,6 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import java.io.IOException;
+import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -36,7 +37,8 @@
*
* <p>For digest, you should have a pair of user name and password defined in this figgure.
*/
- public ClientCallbackHandler(Configuration configuration) throws IOException {
+ public ClientCallbackHandler(Map<String, Object> topoConf) throws IOException {
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 915473b..5b42886 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -58,15 +58,15 @@
workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
}
//create an authentication callback handler
- CallbackHandler serverCallbackHandler = new ServerCallbackHandler(loginConf, impersonationAllowed);
+ CallbackHandler serverCallbackHandler = new ServerCallbackHandler(conf, impersonationAllowed);
+
+ String jaasConfFile = ClientAuthUtils.getJaasConf(conf);
//login our principal
Subject subject = null;
try {
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
//now login
- Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler);
+ Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler, jaasConfFile);
subject = login.getSubject();
login.startThreadIfNeeded();
} catch (LoginException ex) {
@@ -77,10 +77,10 @@
//check the credential of our principal
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
throw new RuntimeException("Fail to verify user principal with section \""
- + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf);
+ + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + jaasConfFile);
}
- String principal = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+ String principal = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
LOG.debug("principal:" + principal);
KerberosName serviceKerberosName = new KerberosName(principal);
String serviceName = serviceKerberosName.getServiceName();
@@ -107,11 +107,9 @@
private Login mkLogin() throws IOException {
try {
//create an authentication callback handler
- ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
- //specify a configuration object to be used
- Configuration.setConfiguration(loginConf);
+ ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
//now login
- Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+ Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler, ClientAuthUtils.getJaasConf(conf));
login.startThreadIfNeeded();
return login;
} catch (LoginException ex) {
@@ -142,7 +140,7 @@
private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException {
//login our user
- SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
+ SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
if (authConf == null) {
throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
}
@@ -180,11 +178,11 @@
final Subject subject = login.getSubject();
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
throw new RuntimeException("Fail to verify user principal with section \""
- + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + loginConf);
+ + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + ClientAuthUtils.getJaasConf(conf));
}
final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
- String serviceName = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+ String serviceName = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
if (serviceName == null) {
serviceName = ClientAuthUtils.SERVICE;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 05630e9..daa5fbf 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -13,6 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import java.io.IOException;
+import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -35,8 +36,10 @@
private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
private final boolean impersonationAllowed;
- public ServerCallbackHandler(Configuration configuration, boolean impersonationAllowed) throws IOException {
+ public ServerCallbackHandler(Map<String, Object> topoConf, boolean impersonationAllowed) throws IOException {
this.impersonationAllowed = impersonationAllowed;
+
+ Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
if (configuration == null) {
return;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index b204c1a..49f5b82 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -49,14 +49,12 @@
public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
protected ThriftConnectionType type;
protected Map<String, Object> conf;
- protected Configuration loginConf;
private int port;
@Override
- public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) {
+ public void prepare(ThriftConnectionType type, Map<String, Object> conf) {
this.type = type;
this.conf = conf;
- this.loginConf = loginConf;
}
@Override
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
index 9f09ee4..1aa854b 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
@@ -95,8 +95,9 @@
@Test
public void objGettersReturnNullWithNullConfigTest() throws IOException {
- Assert.assertNull(ClientAuthUtils.pullConfig(null, "foo"));
- Assert.assertNull(ClientAuthUtils.get(null, "foo", "bar"));
+ Map<String, Object> topoConf = new HashMap<>();
+ Assert.assertNull(ClientAuthUtils.pullConfig(topoConf, "foo"));
+ Assert.assertNull(ClientAuthUtils.get(topoConf, "foo", "bar"));
Assert.assertNull(ClientAuthUtils.getConfiguration(Collections.emptyMap()));
}
@@ -141,39 +142,6 @@
Mockito.verify(autoCred, Mockito.times(1)).populateSubject(subject, cred);
}
- @Test
- public void makeDigestPayloadTest() throws NoSuchAlgorithmException {
- String section = "user-pass-section";
- Map<String, String> optionMap = new HashMap<String, String>();
- String user = "user";
- String pass = "pass";
- optionMap.put("username", user);
- optionMap.put("password", pass);
- AppConfigurationEntry entry = Mockito.mock(AppConfigurationEntry.class);
-
- Mockito.<Map<String, ?>>when(entry.getOptions()).thenReturn(optionMap);
- Configuration mockConfig = Mockito.mock(Configuration.class);
- Mockito.when(mockConfig.getAppConfigurationEntry(section))
- .thenReturn(new AppConfigurationEntry[]{ entry });
-
- MessageDigest digest = MessageDigest.getInstance("SHA-512");
- byte[] output = digest.digest((user + ":" + pass).getBytes());
- String sha = Hex.encodeHexString(output);
-
- // previous code used this method to generate the string, ensure the two match
- StringBuilder builder = new StringBuilder();
- for (byte b : output) {
- builder.append(String.format("%02x", b));
- }
- String stringFormatMethod = builder.toString();
-
- Assert.assertEquals(
- ClientAuthUtils.makeDigestPayload(mockConfig, "user-pass-section"),
- sha);
-
- Assert.assertEquals(sha, stringFormatMethod);
- }
-
@Test(expected = RuntimeException.class)
public void invalidConfigResultsInIOException() throws RuntimeException {
HashMap<String, Object> conf = new HashMap<>();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index df419b9..85e5f9a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -12,8 +12,10 @@
package org.apache.storm.daemon.supervisor;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -36,13 +38,13 @@
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.BlobChangingCallback;
import org.apache.storm.localizer.GoodToGo;
import org.apache.storm.localizer.LocallyCachedBlob;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.EnumUtil;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
@@ -174,9 +176,80 @@
}
/**
- * Decide the equivalence of two local assignments, ignoring the order of executors
- * This is different from #equal method.
- * @param first Local assignment A
+ * This method compares WorkerResources while considering any resources are NULL to be 0.0
+ *
+ * @param first WorkerResources A
+ * @param second WorkerResources B
+ * @return True if A and B are equivalent, treating the absent resources as 0.0
+ */
+ @VisibleForTesting
+ static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) {
+ if (first == null) {
+ return false;
+ }
+ if (first == second) {
+ return true;
+ }
+ if (first.equals(second)) {
+ return true;
+ }
+
+ if (first.get_cpu() != second.get_cpu()) {
+ return false;
+ }
+ if (first.get_mem_on_heap() != second.get_mem_on_heap()) {
+ return false;
+ }
+ if (first.get_mem_off_heap() != second.get_mem_off_heap()) {
+ return false;
+ }
+ if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) {
+ return false;
+ }
+ if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) {
+ return false;
+ }
+ if (!customResourceMapEquality(first.get_resources(), second.get_resources())) {
+ return false;
+ }
+ if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method compares Resource Maps while considering any resources are NULL to be 0.0
+ *
+ * @param firstMap Resource Map A
+ * @param secondMap Resource Map B
+ * @return True if A and B are equivalent, treating the absent resources as 0.0
+ */
+ private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) {
+ if (firstMap == null && secondMap == null) {
+ return true;
+ }
+ if (firstMap == null) {
+ firstMap = new HashMap<>();
+ }
+ if (secondMap == null) {
+ secondMap = new HashMap<>();
+ }
+
+ Set<String> keys = new HashSet<>(firstMap.keySet());
+ keys.addAll(secondMap.keySet());
+ for (String key : keys) {
+ if (firstMap.getOrDefault(key, 0.0).doubleValue() != secondMap.getOrDefault(key, 0.0).doubleValue()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Decide the equivalence of two local assignments, ignoring the order of executors This is different from #equal method.
+ *
+ * @param first Local assignment A
* @param second Local assignment B
* @return True if A and B are equivalent, ignoring the order of the executors
*/
@@ -196,9 +269,9 @@
return true;
}
if (firstHasResources && secondHasResources) {
- if (first.get_resources().equals(second.get_resources())) {
- return true;
- }
+ WorkerResources firstResources = first.get_resources();
+ WorkerResources secondResources = second.get_resources();
+ return customWorkerResourcesEquality(firstResources, secondResources);
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index fdd130b..48ad7e8 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -262,22 +262,26 @@
while (!done) {
try {
synchronized (blob) {
- long localVersion = blob.getLocalVersion();
- long remoteVersion = blob.getRemoteVersion(blobStore);
- if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
- if (blob.isFullyDownloaded()) {
- //Avoid case of different blob version
- // when blob is not downloaded (first time download)
- numBlobUpdateVersionChanged.mark();
+ if (blob.isUsed()) {
+ long localVersion = blob.getLocalVersion();
+ long remoteVersion = blob.getRemoteVersion(blobStore);
+ if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
+ if (blob.isFullyDownloaded()) {
+ //Avoid case of different blob version
+ // when blob is not downloaded (first time download)
+ numBlobUpdateVersionChanged.mark();
+ }
+ Timer.Context t = singleBlobLocalizationDuration.time();
+ try {
+ long newVersion = blob.fetchUnzipToTemp(blobStore);
+ blob.informReferencesAndCommitNewVersion(newVersion);
+ t.stop();
+ } finally {
+ blob.cleanupOrphanedData();
+ }
}
- Timer.Context t = singleBlobLocalizationDuration.time();
- try {
- long newVersion = blob.fetchUnzipToTemp(blobStore);
- blob.informReferencesAndCommitNewVersion(newVersion);
- t.stop();
- } finally {
- blob.cleanupOrphanedData();
- }
+ } else {
+ LOG.debug("Skipping update of unused blob {}", blob);
}
}
done = true;
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index d8364dc..19e6cbe 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -63,9 +63,8 @@
switch (auth) {
case "DIGEST":
- Configuration loginConf = ClientAuthUtils.getConfiguration(config);
authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
- this.secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+ this.secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
if (this.secret == null) {
LOG.error("Can't start pacemaker server without digest secret.");
throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 9dfe2d7..1cfea8a 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -15,10 +15,13 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -77,6 +80,14 @@
return resources;
}
+ static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap, Map<String, Double> resources) {
+ WorkerResources workerResources = mkWorkerResources(cpu, mem_on_heap, mem_off_heap);
+ if (resources != null) {
+ workerResources.set_resources(resources);
+ }
+ return workerResources;
+ }
+
static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) {
LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
ret.set_topology_id(id);
@@ -108,12 +119,64 @@
}
@Test
- public void testEquivilant() {
+ public void testWorkerResourceEquality() {
+ WorkerResources resourcesRNull = mkWorkerResources(100.0, 100.0, 100.0, null);
+ WorkerResources resourcesREmpty = mkWorkerResources(100.0, 100.0, 100.0, Maps.newHashMap());
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesRNull,resourcesREmpty));
+
+ Map resources = new HashMap<String, Double>();
+ resources.put("network.resource.units", 0.0);
+ WorkerResources resourcesRNetwork = mkWorkerResources(100.0, 100.0, 100.0,resources);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetwork));
+
+
+ Map resourcesNetwork = new HashMap<String, Double>();
+ resourcesNetwork.put("network.resource.units", 50.0);
+ WorkerResources resourcesRNetworkNonZero = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetwork);
+ assertFalse(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetworkNonZero));
+
+ Map resourcesNetworkOne = new HashMap<String, Double>();
+ resourcesNetworkOne.put("network.resource.units", 50.0);
+ WorkerResources resourcesRNetworkOne = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkOne);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkNonZero));
+
+ Map resourcesNetworkTwo = new HashMap<String, Double>();
+ resourcesNetworkTwo.put("network.resource.units", 100.0);
+ WorkerResources resourcesRNetworkTwo = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkTwo);
+ assertFalse(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkTwo));
+
+ WorkerResources resourcesCpuNull = mkWorkerResources(null, 100.0,100.0);
+ WorkerResources resourcesCPUZero = mkWorkerResources(0.0, 100.0,100.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesCpuNull, resourcesCPUZero));
+
+ WorkerResources resourcesOnHeapMemNull = mkWorkerResources(100.0, null,100.0);
+ WorkerResources resourcesOnHeapMemZero = mkWorkerResources(100.0, 0.0,100.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesOnHeapMemNull, resourcesOnHeapMemZero));
+
+ WorkerResources resourcesOffHeapMemNull = mkWorkerResources(100.0, 100.0,null);
+ WorkerResources resourcesOffHeapMemZero = mkWorkerResources(100.0, 100.0,0.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesOffHeapMemNull, resourcesOffHeapMemZero));
+
+ }
+
+ @Test
+ public void testEquivalent() {
LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0));
LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
+ LocalAssignment c = mkLocalAssignment("C", mkExecutorInfoList(188, 261),mkWorkerResources(400.0,10000.0,0.0));
+
+ WorkerResources workerResources = mkWorkerResources(400.0, 10000.0, 0.0);
+ Map<String, Double> additionalResources = workerResources.get_resources();
+ if( additionalResources == null) additionalResources = new HashMap<>();
+ additionalResources.put("network.resource.units", 0.0);
+
+ workerResources.set_resources(additionalResources);
+ LocalAssignment cReordered = mkLocalAssignment("C", mkExecutorInfoList(188, 261), workerResources);
+
+ assertTrue(Slot.equivalent(c,cReordered));
assertTrue(Slot.equivalent(null, null));
assertTrue(Slot.equivalent(a, a));
assertTrue(Slot.equivalent(b, bReordered));
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index d3c1d74..f97c868 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -126,18 +126,21 @@
when(jarBlob.getLocalVersion()).thenReturn(-1L);
when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+ when(jarBlob.isUsed()).thenReturn(true);
LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(codeBlob).when(victim).getTopoCode(topoId, localAssignment.get_owner());
when(codeBlob.getLocalVersion()).thenReturn(-1L);
when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+ when(codeBlob.isUsed()).thenReturn(true);
LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(confBlob).when(victim).getTopoConf(topoId, localAssignment.get_owner());
when(confBlob.getLocalVersion()).thenReturn(-1L);
when(confBlob.getRemoteVersion(any())).thenReturn(300L);
when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+ when(confBlob.isUsed()).thenReturn(true);
when(mockedReflectionUtils.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
index 49a4641..7802390 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
@@ -576,7 +576,7 @@
public void getTransportPluginThrowsRunimeTest() {
Map<String, Object> conf = ConfigUtils.readStormConfig();
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
- ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
+ ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf);
}
@Test