Merge pull request #3251 from agresch/agresch_storm_3623

STORM-3623 executors should only report their metrics for V2 metric tick
diff --git a/external/storm-kafka-migration/pom.xml b/external/storm-kafka-migration/pom.xml
index ca0891d..610a960 100644
--- a/external/storm-kafka-migration/pom.xml
+++ b/external/storm-kafka-migration/pom.xml
@@ -36,6 +36,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
         </dependency>
 
         <dependency>
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index d391784..f782272 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -252,8 +252,8 @@
         try {
             String serConf = JSONValue.toJSONString(topoConf);
             try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
-                if (isTopologyNameAllowed(name, client)) {
-                    throw new RuntimeException("Topology with name `" + name + "` is either not allowed or it already exists on cluster");
+                if (!isTopologyNameAllowed(name, client)) {
+                    throw new RuntimeException("Topology name " + name + " is either not allowed or it already exists on the cluster");
                 }
 
                 // Dependency uploading only makes sense for distributed mode
@@ -438,7 +438,7 @@
 
     private static boolean isTopologyNameAllowed(String name, NimbusClient client) {
         try {
-            return !client.getClient().isTopologyNameAllowed(name);
+            return client.getClient().isTopologyNameAllowed(name);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 7337927..503b44a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -247,7 +247,7 @@
         Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
         if (nodeInfoWorkerResourcesMap != null) {
             for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
-                if (entry.getKey().get_node().equals(assignmentId)) {
+                if (entry.getKey().get_node().startsWith(assignmentId)) {
                     Set<Long> ports = entry.getKey().get_port();
                     for (Long port : ports) {
                         slotsResources.put(port, entry.getValue());
@@ -267,7 +267,7 @@
         Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
         if (executorNodePort != null) {
             for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
-                if (entry.getValue().get_node().equals(assignmentId)) {
+                if (entry.getValue().get_node().startsWith(assignmentId)) {
                     for (Long port : entry.getValue().get_port()) {
                         LocalAssignment localAssignment = portTasks.get(port.intValue());
                         if (localAssignment == null) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
index 3960ce9..62f73f2 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -12,12 +12,19 @@
 
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
+import org.apache.storm.ServerConstants;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.ReadClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.SupervisorAssignments;
 import org.apache.storm.thrift.TException;
 import org.apache.storm.utils.ConfigUtils;
@@ -49,14 +56,21 @@
         this.readClusterState = readClusterState;
     }
 
-    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
-        if (null == assignments) {
+    private static void assignedAssignmentsToLocal(IStormClusterState clusterState,
+                                                   List<SupervisorAssignments> supervisorAssignments) {
+        if (null == supervisorAssignments || supervisorAssignments.isEmpty()) {
             //unknown error, just skip
             return;
         }
         Map<String, byte[]> serAssignments = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
-            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+        for (SupervisorAssignments supervisorAssignment : supervisorAssignments) {
+            if (supervisorAssignment == null) {
+                // unknown error for this element: skip the null entry and keep merging the remaining ones
+                continue;
+            }
+            for (Map.Entry<String, Assignment> entry : supervisorAssignment.get_storm_assignment().entrySet()) {
+                serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue())); // same key: later entry replaces earlier
+            }
         }
         clusterState.syncRemoteAssignments(serAssignments);
     }
@@ -67,7 +81,7 @@
         if (null == assignments) {
             getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
         } else {
-            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
+            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), Collections.singletonList(assignments));
         }
         this.readClusterState.run();
     }
@@ -81,7 +95,7 @@
         while (!success) {
             try (NimbusClient master = NimbusClient.getConfiguredClient(supervisor.getConf())) {
                 SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
-                assignedAssignmentsToLocal(supervisor.getStormClusterState(), assignments);
+                assignedAssignmentsToLocal(supervisor.getStormClusterState(), Collections.singletonList(assignments));
                 success = true;
             } catch (Exception t) {
                 // just ignore the exception
@@ -99,6 +113,24 @@
 
     }
 
+    /**
+     * Collects assignments for {@code node} and for each NUMA supervisor id derived from it.
+     * A NUMA supervisor id is {@code node + ServerConstants.NUMA_ID_SEPARATOR + numaId}.
+     * @param nimbus nimbus interface used to fetch the assignments
+     * @param node supervisor id; also the prefix of every NUMA supervisor id that is queried
+     * @return one result per key of the NUMA map read from the supervisor conf, then the result for {@code node}
+     * @throws TException if any nimbus call fails
+     */
+    public List<SupervisorAssignments> getAllAssignmentsFromNumaSupervisors(Nimbus.Iface nimbus, String node) throws TException {
+        List<SupervisorAssignments> supervisorAssignmentsList = new ArrayList<>();
+        Map<String, Object> validatedNumaMap = SupervisorUtils.getNumaMap(supervisor.getConf());
+        for (String numaId : validatedNumaMap.keySet()) {
+            supervisorAssignmentsList.add(nimbus.getSupervisorAssignments(node + ServerConstants.NUMA_ID_SEPARATOR + numaId));
+        }
+        supervisorAssignmentsList.add(nimbus.getSupervisorAssignments(node));
+        return supervisorAssignmentsList;
+    }
+
     /**
      * Used by {@link Supervisor} to fetch assignments when start up.
      * @param conf config
@@ -108,16 +140,19 @@
     public void getAssignmentsFromMaster(Map conf, IStormClusterState clusterState, String node) {
         if (ConfigUtils.isLocalMode(conf)) {
             try {
-                SupervisorAssignments assignments = this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
-                assignedAssignmentsToLocal(clusterState, assignments);
+                List<SupervisorAssignments> supervisorAssignmentsList =
+                        getAllAssignmentsFromNumaSupervisors(
+                                this.supervisor.getLocalNimbus(), node
+                        );
+                assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
             } catch (TException e) {
                 LOG.error("Get assignments from local master exception", e);
             }
         } else {
             try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
-                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(node);
-                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", assignments);
-                assignedAssignmentsToLocal(clusterState, assignments);
+                List<SupervisorAssignments> supervisorAssignmentsList = getAllAssignmentsFromNumaSupervisors(master.getClient(), node);
+                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", supervisorAssignmentsList);
+                assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
             } catch (Exception t) {
                 LOG.error("Get assignments from master exception", t);
             }