Merge pull request #3208 from Ethanlm/STORM-3579
[STORM-3579] use the topo conf for thrift client in Worker code
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
private final Map<String, Object> conf;
+ private final Map<String, Object> topologyConf;
private final IContext context;
private final String topologyId;
private final String assignmentId;
@@ -93,7 +93,6 @@
private Collection<IAutoCredentials> autoCreds;
private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
-
/**
* TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
* what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@
* @param port - port on which the worker runs
* @param workerId - worker id
*/
-
public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
- int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+ int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+ throws IOException {
this.conf = conf;
this.context = context;
this.topologyId = topologyId;
@@ -118,7 +117,25 @@
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
this.metricRegistry = new StormMetricRegistry();
- this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+ this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+ if (supervisorIfaceSupplier == null) {
+ this.supervisorIfaceSupplier = () -> {
+ try {
+ return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ };
+ } else {
+ this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+ }
+ }
+
+ public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId) throws IOException {
+ this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
}
public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
- Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
- try {
- return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
- } catch (UnknownHostException e) {
- throw Utils.wrapInRuntime(e);
- }
- };
- Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
- Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
worker.start();
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@
FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
Charset.forName("UTF-8"));
}
- final Map<String, Object> topologyConf =
- ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@
subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
- () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+ () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
);
}
- private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+ private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@
List<Executor> execs = new ArrayList<>();
for (List<Long> e : workerState.getLocalExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
+ if (ConfigUtils.isLocalMode(conf)) {
Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
execs.add(executor);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@
} catch (Exception tr1) {
//If any error/exception thrown, report directly to nimbus.
LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
- try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+ try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
} catch (Exception tr2) {
//if any error/exception thrown, just ignore.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index d6a345e..27b356a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -34,7 +34,6 @@
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.log4j.Logger;
-import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.shade.org.apache.zookeeper.Shell;
import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
@@ -296,8 +295,14 @@
+ System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
}
Configuration configuration = this.getConfiguration(jaasConfFile);
- LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
- loginContext.login();
+ LoginContext loginContext;
+ try {
+ loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
+ loginContext.login();
+ } catch (LoginException e) {
+ LOG.error("Login using jaas conf " + jaasConfFile + " failed");
+ throw e;
+ }
LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
return loginContext;
}