Fix container id mismatch between the packing algorithm and Aurora (#2582)

* fix mismatch between packing container ids and Aurora container ids

* fix import

* fix import

* fix import 2

* add import

* add import 3

* fix unit tests

* fix style

* fix style 2

* fix style 3

* add fix for aurora json parser

* debug

* update debug

* add debug

* fix Optional.orNull usage

* add debug

* add debug print

* update container diff calc

* fix stderr

* fix update packing plan proto

* clean code

* clean shell util

* address comment and fix unit test

* address comments

* fix unit test

* add import

* update SPI according to Ashvin's suggestion

* fix style

* fix style 2

* address Ning's comment

* address Ning's comment
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index ff60879..6ee7033 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -41,6 +41,7 @@
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.packing.PackingPlanProtoDeserializer;
+import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
 import com.twitter.heron.spi.scheduler.IScalable;
 import com.twitter.heron.spi.statemgr.IStateManager;
 import com.twitter.heron.spi.statemgr.Lock;
@@ -152,15 +153,30 @@
       deactivateTopology(stateManager, topology, proposedPackingPlan);
     }
 
+    Set<PackingPlan.ContainerPlan> updatedContainers =
+        new HashSet<>(proposedPackingPlan.getContainers());
     // request new resources if necessary. Once containers are allocated we should make the changes
     // to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
     if (newContainerCount > 0 && scalableScheduler.isPresent()) {
+      Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
+      Set<PackingPlan.ContainerPlan> containersAdded =
+          scalableScheduler.get().addContainers(containersToAdd);
+      // Replace the proposed entries with the containers the scheduler actually added
+      if (containersAdded != null) { // null: keep the proposed container ids
+        updatedContainers.removeAll(containersToAdd);
+        updatedContainers.addAll(containersAdded);
+      }
     }
 
+    PackingPlan updatedPackingPlan =
+        new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
+    PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
+    PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
+    LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);
+
     // update packing plan to trigger the scaling event
     logInfo("Update new PackingPlan: %s",
-        stateManager.updatePackingPlan(proposedProtoPackingPlan, topologyName));
+        stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));
 
     // reactivate topology
     if (initiallyRunning) {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
index 1d018c2..e217fdf 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
@@ -16,10 +16,12 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -111,29 +113,70 @@
   }
 
   @Override
-  public void addContainers(Integer count) {
+  public Set<Integer> addContainers(Integer count) {
     //aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count>
     //clone instance 0
     List<String> auroraCmd = new ArrayList<>(Arrays.asList(
-        "aurora", "job", "add", "--wait-until", "RUNNING", jobSpec + "/0", count.toString()));
-
-    if (isVerbose) {
-      auroraCmd.add("--verbose");
-    }
+        "aurora", "job", "add", "--wait-until", "RUNNING",
+        jobSpec + "/0", count.toString(), "--verbose"));
 
     LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd));
-    if (!runProcess(auroraCmd)) {
+    StringBuilder stderr = new StringBuilder();
+    if (!runProcess(auroraCmd, null, stderr)) {
       throw new RuntimeException("Failed to create " + count + " new aurora instances");
     }
+
+    if (stderr.length() <= 0) { // no container was added
+      throw new RuntimeException("empty output by Aurora");
+    }
+    return extractContainerIds(stderr.toString());
+  }
+
+  /**
+   * Parses the ids of the newly added containers out of the {@code aurora job add} output.
+   *
+   * <p>The verbose output is expected to contain a fragment such as
+   * {@code Querying instance statuses: [1, 2, 3]}.
+   *
+   * @param auroraOutputStr output captured while running the aurora command
+   * @return the parsed ids; an empty set if the fragment is missing or malformed
+   */
+  private Set<Integer> extractContainerIds(String auroraOutputStr) {
+    String pattern = "Querying instance statuses: [";
+    int idx1 = auroraOutputStr.indexOf(pattern);
+    if (idx1 < 0) { // no container was added
+      LOG.info("stdout & stderr by Aurora " + auroraOutputStr);
+      return new HashSet<Integer>();
+    }
+    idx1 += pattern.length();
+    int idx2 = auroraOutputStr.indexOf("]", idx1);
+    if (idx2 < 0) { // truncated output: the id list is never closed
+      LOG.warning("Unterminated container id list in Aurora output " + auroraOutputStr);
+      return new HashSet<Integer>();
+    }
+    String containerIdStr = auroraOutputStr.substring(idx1, idx2).trim();
+    LOG.info("container IDs returned by Aurora " + containerIdStr);
+    if (containerIdStr.isEmpty()) { // "[]": the id list is empty
+      return new HashSet<Integer>();
+    }
+    try {
+      return Arrays.stream(containerIdStr.split(","))
+          .map(String::trim)
+          .map(Integer::valueOf)
+          .collect(Collectors.toSet());
+    } catch (NumberFormatException e) {
+      LOG.warning("Non-numeric container id in Aurora output " + containerIdStr);
+      return new HashSet<Integer>();
+    }
   }
 
   // Utils method for unit tests
   @VisibleForTesting
-  boolean runProcess(List<String> auroraCmd) {
-    StringBuilder stdout = new StringBuilder();
-    StringBuilder stderr = new StringBuilder();
+  boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) {
+    // NOTE: stdout is unused; only the stderr buffer is handed to ShellUtils.runProcess
     int status =
-        ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]), stderr);
+        ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]),
+            stderr != null ? stderr : new StringBuilder());
 
     if (status != 0) {
       LOG.severe(String.format(
@@ -142,6 +185,12 @@
     return status == 0;
   }
 
+  // Utils method for unit tests
+  @VisibleForTesting
+  boolean runProcess(List<String> auroraCmd) {
+    return runProcess(auroraCmd, null, null);
+  }
+
   private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) {
     StringBuilder ids = new StringBuilder();
     for (PackingPlan.ContainerPlan containerPlan : containersToRemove) {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
index fef0a1c..1f6b20f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
@@ -33,5 +33,5 @@
   boolean restart(Integer containerId);
 
   void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove);
-  void addContainers(Integer count);
+  Set<Integer> addContainers(Integer count); // returns the ids of the added containers
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index daf28c1..51917b7 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -121,7 +121,7 @@
   }
 
   @Override
-  public void addContainers(Integer count) {
-    cliController.addContainers(count);
+  public Set<Integer> addContainers(Integer count) {
+    return cliController.addContainers(count); // pass through the ids the CLI controller reports
   }
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
index 18aa52a..e51dcf8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
@@ -16,6 +16,8 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -183,8 +185,27 @@
   }
 
   @Override
-  public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
-    controller.addContainers(containersToAdd.size());
+  public Set<PackingPlan.ContainerPlan> addContainers(
+      Set<PackingPlan.ContainerPlan> containersToAdd) {
+    // The controller is only told how many containers to add, so Aurora chooses their ids,
+    // which may differ from the ids the packing algorithm proposed in containersToAdd.
+    Set<Integer> newAddedContainerIds = controller.addContainers(containersToAdd.size());
+    if (newAddedContainerIds.size() != containersToAdd.size()) {
+      throw new RuntimeException(
+          "Aurora returned different container count " + newAddedContainerIds.size()
+          + "; input count was " + containersToAdd.size());
+    }
+    // Do the remapping: each proposed container keeps its instances and resources but takes
+    // one of the ids Aurora assigned. Pairing order is arbitrary; every id is used once.
+    Iterator<Integer> newIds = newAddedContainerIds.iterator();
+    Set<PackingPlan.ContainerPlan> remapping = new HashSet<>();
+    for (PackingPlan.ContainerPlan cp : containersToAdd) {
+      remapping.add(new PackingPlan.ContainerPlan(
+          newIds.next(), cp.getInstances(),
+          cp.getRequiredResource(), cp.getScheduledResource().orNull()));
+    }
+    LOG.info("The remapping structure: " + remapping);
+    return remapping;
   }
 
   @Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
index 7d95059..5ad785f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
@@ -127,7 +127,8 @@
   }
 
   @Override
-  public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+  public Set<PackingPlan.ContainerPlan>
+      addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
     final V1beta1StatefulSet statefulSet;
     try {
       statefulSet = getStatefulSet();
@@ -147,6 +148,8 @@
       throw new TopologyRuntimeManagementException(
           ae.getMessage() + "\netails\n" + ae.getResponseBody());
     }
+
+    return containersToAdd; // the requested container ids are returned unchanged
   }
 
   @Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index bdc2701..a299aab 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -138,8 +138,10 @@
    * TODO (jrcrawfo) -- (https://github.com/twitter/heron/issues/1981)
    */
   @Override
-  public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+  public Set<PackingPlan.ContainerPlan>
+      addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
     controller.addContainers(containersToAdd);
+    return containersToAdd; // the requested container ids are returned unchanged
   }
 
   /**
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index 7b598ce..27e42b8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -288,7 +288,7 @@
   }
 
   @Override
-  public void addContainers(Set<PackingPlan.ContainerPlan> containers) {
+  public Set<PackingPlan.ContainerPlan> addContainers(Set<PackingPlan.ContainerPlan> containers) {
     synchronized (processToContainer) {
       for (PackingPlan.ContainerPlan container : containers) {
         if (processToContainer.values().contains(container.getId())) {
@@ -298,6 +298,7 @@
         startExecutor(container.getId(), container.getInstances());
       }
     }
+    return containers; // executors above were started with these same ids
   }
 
   @Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
index e269f04..2eccff3 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
@@ -108,12 +108,14 @@
   }
 
   @Override
-  public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+  public Set<PackingPlan.ContainerPlan> addContainers(
+      Set<PackingPlan.ContainerPlan> containersToAdd) {
     try {
       HeronMasterDriverProvider.getInstance().scheduleHeronWorkers(containersToAdd);
     } catch (HeronMasterDriver.ContainerAllocationException e) {
       throw new RuntimeException("Failed to launch new yarn containers", e);
     }
+    return containersToAdd; // the requested container ids are returned unchanged
   }
 
   @Override
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
index 5c04597..cc357b6 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
@@ -20,6 +20,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -31,6 +32,8 @@
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.utils.PackingTestUtils;
@@ -144,9 +147,45 @@
         "aurora job add --wait-until RUNNING %s/0 %s %s",
         JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
 
-    Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
-    controller.addContainers(containersToAdd);
-    Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
+    Mockito.doAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock arg0) throws Throwable {
+        final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
+        originalArgument.append("Querying instance statuses: [1, 2, 3]");
+        return true;
+      }
+    }).when(controller).runProcess(
+                    Matchers.anyListOf(String.class),
+                    Matchers.any(StringBuilder.class),
+                    Matchers.any(StringBuilder.class));
+    Set<Integer> ret = controller.addContainers(containersToAdd);
+    Assert.assertEquals(containersToAdd.intValue(), ret.size());
+    Mockito.verify(controller)
+        .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
+  }
+
+  @Test
+  public void testAddContainersFailure() {
+    Integer containersToAdd = 3;
+    List<String> expectedCommand = asList(
+        "aurora job add --wait-until RUNNING %s/0 %s %s",
+        JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
+
+    Mockito.doAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock arg0) throws Throwable {
+        final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
+        originalArgument.append("Querying instance statuses: x"); // no id list
+        return true;
+      }
+    }).when(controller).runProcess(
+                    Matchers.anyListOf(String.class),
+                    Matchers.any(StringBuilder.class),
+                    Matchers.any(StringBuilder.class));
+    Set<Integer> ret = controller.addContainers(containersToAdd);
+    Assert.assertEquals(0, ret.size()); // unparseable output yields no ids
+    Mockito.verify(controller)
+        .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
   }
 
   private static List<String> asList(String command, Object... values) {
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
index 48c2d91..fb802be 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
@@ -72,8 +72,9 @@
       }
 
       @Override
-      public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
-
+      public Set<PackingPlan.ContainerPlan>
+          addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+        return containersToAdd; // stub: echo the request back
       }
 
       @Override
diff --git a/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java b/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
index beb7f05..a050fcc 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
@@ -30,8 +30,9 @@
    * Requests new containers for scaling a topology
    *
    * @param containersToAdd Set of containers to be added by the scheduler
+   * @return the added containers, carrying the container ids actually used by the scheduler
    */
-  void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd);
+  Set<PackingPlan.ContainerPlan> addContainers(Set<PackingPlan.ContainerPlan> containersToAdd);
 
   /**
    * Requests containers to be released for down-scaling a topology.