Merge pull request #3291 from kishorvpatil/STORM-3655

STORM-3655: Worker should die if its assignment has changed
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index db0c0e5..8e5af6a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -168,7 +168,6 @@
         this.autoCredentials = autoCredentials;
         this.conf = conf;
         this.supervisorIfaceSupplier = supervisorIfaceSupplier;
-        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
@@ -176,6 +175,8 @@
         this.workerId = workerId;
         this.stateStorage = stateStorage;
         this.stormClusterState = stormClusterState;
+        this.localExecutors =
+            new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
         this.isWorkerActive = new CountDownLatch(1);
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
@@ -374,7 +375,23 @@
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-    
+
+    public void suicideIfLocalAssignmentsChanged(Assignment assignment) { // runs suicideCallback on executor mismatch
+        if (assignment != null) { // null assignment: nothing to compare, so do nothing
+            Set<List<Long>> assignedExecutors = new HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+            if (!localExecutors.equals(assignedExecutors)) { // set equality; any difference counts as a conflict
+                LOG.info("Found conflicting assignments. We shouldn't be alive!"
+                         + " Assigned: " + assignedExecutors + ", Current: "
+                         + localExecutors);
+                if (!ConfigUtils.isLocalMode(conf)) { // the callback is only run outside local mode
+                    suicideCallback.run();
+                } else { // local mode: only log, do not run the callback
+                    LOG.info("Local worker tried to commit suicide!");
+                }
+            }
+        }
+    }
+
     public void refreshConnections() {
         Assignment assignment = null;
         try {
@@ -383,6 +400,7 @@
             LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
         }
 
+        suicideIfLocalAssignmentsChanged(assignment);
         Set<NodeInfo> neededConnections = new HashSet<>();
         Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
         if (null != assignment) {
@@ -631,13 +649,11 @@
         return this.autoCredentials;
     }
 
-    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
-                                                 int port) {
-        LOG.info("Reading assignments");
+    private List<List<Long>> readWorkerExecutors(String assignmentId, int port, Assignment assignment) {
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
         Map<List<Long>, NodeInfo> executorToNodePort = 
-            getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
+            assignment.get_executor_node_port();
         for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             NodeInfo nodeInfo = entry.getValue();
             if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {