Merge pull request #3291 from kishorvpatil/STORM-3655
STORM-3655: Worker should die if its assignment has changed
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index db0c0e5..8e5af6a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -168,7 +168,6 @@
this.autoCredentials = autoCredentials;
this.conf = conf;
this.supervisorIfaceSupplier = supervisorIfaceSupplier;
- this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
this.topologyId = topologyId;
this.assignmentId = assignmentId;
@@ -176,6 +175,8 @@
this.workerId = workerId;
this.stateStorage = stateStorage;
this.stormClusterState = stormClusterState;
+ this.localExecutors =
+ new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
this.isWorkerActive = new CountDownLatch(1);
this.isTopologyActive = new AtomicBoolean(false);
this.stormComponentToDebug = new AtomicReference<>();
@@ -374,7 +375,23 @@
public SmartThread makeTransferThread() {
return workerTransfer.makeTransferThread();
}
-
+
+ /** Runs suicideCallback (unless in local mode) if {@code assignment} yields a different executor set for this worker than localExecutors; null is ignored. */
+ public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
+ if (assignment == null) {
+ return; // nothing to compare against, so keep running
+ }
+ Set<List<Long>> assignedExecutors = new HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+ if (!localExecutors.equals(assignedExecutors)) {
+ LOG.warn("Found conflicting assignments. We shouldn't be alive! Assigned: {}, Current: {}", assignedExecutors, localExecutors);
+ if (!ConfigUtils.isLocalMode(conf)) {
+ suicideCallback.run();
+ } else {
+ LOG.info("Local worker tried to commit suicide!");
+ }
+ }
+ }
+
public void refreshConnections() {
Assignment assignment = null;
try {
@@ -383,6 +400,7 @@
LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
}
+ suicideIfLocalAssignmentsChanged(assignment);
Set<NodeInfo> neededConnections = new HashSet<>();
Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
if (null != assignment) {
@@ -631,13 +649,11 @@
return this.autoCredentials;
}
- private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
- int port) {
- LOG.info("Reading assignments");
+ private List<List<Long>> readWorkerExecutors(String assignmentId, int port, Assignment assignment) {
List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
Map<List<Long>, NodeInfo> executorToNodePort =
- getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
+ assignment.get_executor_node_port();
for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
NodeInfo nodeInfo = entry.getValue();
if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {