Merge pull request #3214 from RuiLi8080/MINOR-Nimbus
[MINOR] remove some unused fields and correct comment typo
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
private final Map<String, Object> conf;
+ private final Map<String, Object> topologyConf;
private final IContext context;
private final String topologyId;
private final String assignmentId;
@@ -93,7 +93,6 @@
private Collection<IAutoCredentials> autoCreds;
private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
-
/**
* TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
* what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@
* @param port - port on which the worker runs
* @param workerId - worker id
*/
-
public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
- int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+ int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+ throws IOException {
this.conf = conf;
this.context = context;
this.topologyId = topologyId;
@@ -118,7 +117,25 @@
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
this.metricRegistry = new StormMetricRegistry();
- this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+ this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+ if (supervisorIfaceSupplier == null) {
+ this.supervisorIfaceSupplier = () -> {
+ try {
+ return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ };
+ } else {
+ this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+ }
+ }
+
+ public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId) throws IOException {
+ this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
}
public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
- Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
- try {
- return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
- } catch (UnknownHostException e) {
- throw Utils.wrapInRuntime(e);
- }
- };
- Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
- Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
worker.start();
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@
FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
Charset.forName("UTF-8"));
}
- final Map<String, Object> topologyConf =
- ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@
subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
- () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+ () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
);
}
- private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+ private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@
List<Executor> execs = new ArrayList<>();
for (List<Long> e : workerState.getLocalExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
+ if (ConfigUtils.isLocalMode(conf)) {
Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
execs.add(executor);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@
} catch (Exception tr1) {
//If any error/exception thrown, report directly to nimbus.
LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
- try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+ try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
} catch (Exception tr2) {
//if any error/exception thrown, just ignore.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index d6a345e..27b356a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -34,7 +34,6 @@
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.log4j.Logger;
-import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.shade.org.apache.zookeeper.Shell;
import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
@@ -296,8 +295,14 @@
+ System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
}
Configuration configuration = this.getConfiguration(jaasConfFile);
- LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
- loginContext.login();
+ LoginContext loginContext;
+ try {
+ loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
+ loginContext.login();
+ } catch (LoginException e) {
+ LOG.error("Login using jaas conf " + jaasConfFile + " failed");
+ throw e;
+ }
LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
return loginContext;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 6a9b703..3db487d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -409,9 +409,17 @@
Set<Object> creds = subject.getPrivateCredentials();
synchronized (creds) {
WorkerToken previous = findWorkerToken(subject, type);
- creds.add(token);
- if (previous != null) {
- creds.remove(previous);
+ boolean notAlreadyContained = creds.add(token);
+ if (notAlreadyContained) {
+ if (previous != null) {
                    //the new token differs from the previous one, so remove the stale previous token
+ creds.remove(previous);
+ LOG.info("Replaced WorkerToken for service type {}", type);
+ } else {
+ LOG.info("Added new WorkerToken for service type {}", type);
+ }
+ } else {
+ LOG.info("The new WorkerToken for service type {} is the same as the previous token", type);
}
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index 6936555..668d4b3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -172,6 +172,7 @@
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) {
+ LOG.info("Populating TGT from credentials");
KerberosTicket tgt = getTGT(credentials);
if (tgt != null) {
clearCredentials(subject, tgt);
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ccb5feb..734d660 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -138,7 +138,7 @@
try {
curator.delete().forPath(path);
} catch (KeeperException.NoNodeException nne) {
- LOG.warn("Path {} already deleted.");
+ LOG.warn("Path {} already deleted.", path);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
index c71a58c..564f68d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
@@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
@@ -144,7 +145,7 @@
ret = new VersionInfoImpl(info);
break;
} catch (IOException e) {
- LOG.error("Skipping {} get an error while trying to parse the file.", part, e);
+ LOG.error("Skipping {}; got an error while trying to parse the file.", part, e);
}
}
} else if (part.toLowerCase().endsWith(".jar")
@@ -164,10 +165,27 @@
}
}
} catch (IOException e) {
- LOG.error("Skipping {} get an error while trying to parse the jar file.", part, e);
+ LOG.error("Skipping {}; got an error while trying to parse the jar file.", part, e);
+ }
+ } else if (p.endsWith("*")) {
+ //for a path like /<parent-path>/*
+ try {
+ Path parent = p.getParent();
+ List<String> children = new ArrayList<>();
+ Files.list(parent)
+ //avoid infinite recursion
+ .filter(path -> !path.endsWith("*"))
+ .forEach(path -> children.add(path.toString()));
+ IVersionInfo resFromChildren = getFromClasspath(children, propFileName);
+ if (resFromChildren != null) {
+ ret = resFromChildren;
+ break;
+ }
+ } catch (NullPointerException | IOException e) {
+ LOG.error("Skipping {}; got an error while trying to parse it", part, e);
}
} else {
- LOG.warn("Skipping {} don't know what to do with it.", part);
+ LOG.warn("Skipping {}; don't know what to do with it.", part);
}
}
return ret;
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f52df59..eb4ce7d 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -79,7 +79,7 @@
throw new RuntimeException(e);
}
}
- LOG.debug("Will use {} for validation", ret);
+ LOG.info("Will use {} for validation", ret);
configClasses = ret;
}
return configClasses;
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
index 92dcc7b..ffaefda 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -20,6 +20,7 @@
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
+import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,15 @@
}
Map<String, Object> confOverrides = (Map<String, Object>) cl.get("t");
+ Map<String, Object> jvmOpts = Utils.readCommandLineOpts(); // values in -Dstorm.options (originally -c in storm.py)
+ if (jvmOpts != null && !jvmOpts.isEmpty()) {
+ if (confOverrides == null) {
+ confOverrides = jvmOpts;
+ } else {
+ confOverrides.putAll(jvmOpts); // override with values obtained from -Dstorm.options
+ }
+ LOG.info("Rebalancing topology with overrides {}", JSONObject.toJSONString(confOverrides));
+ }
if (null != confOverrides) {
rebalanceOptions.set_topology_conf_overrides(JSONValue.toJSONString(confOverrides));