fix mismatch container id between packing alg and aurora (#2582)
* fix packing container id and aurora container id mismatch
* fix import
* fix import
* fix import 2
* add import
* add import 3
* fix unit tests
* fix style
* fix style 2
* fix style 3
* add fix for aurora json parser
* debug
* update debug
* add debug
* fix Optional orNull
* add debug
* add debug print
* update container diff calc
* fix stderr
* fix update packing plan proto
* clean code
* clean shell util
* address comment and fix unit test
* address comments
* fix unit test
* add import
* update spi according to ashvin suggestion
* fix style
* fix style 2
* fix ning comment
* fix ning comment
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index ff60879..6ee7033 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -41,6 +41,7 @@
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoDeserializer;
+import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.statemgr.IStateManager;
import com.twitter.heron.spi.statemgr.Lock;
@@ -152,15 +153,30 @@
deactivateTopology(stateManager, topology, proposedPackingPlan);
}
+ Set<PackingPlan.ContainerPlan> updatedContainers =
+ new HashSet<>(proposedPackingPlan.getContainers());
// request new resources if necessary. Once containers are allocated we should make the changes
// to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
if (newContainerCount > 0 && scalableScheduler.isPresent()) {
- scalableScheduler.get().addContainers(containerDelta.getContainersToAdd());
+ Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
+ Set<PackingPlan.ContainerPlan> containersAdded =
+ scalableScheduler.get().addContainers(containersToAdd);
+ // Update the PackingPlan with new container-ids
+ if (containersAdded != null) {
+ updatedContainers.removeAll(containersToAdd);
+ updatedContainers.addAll(containersAdded);
+ }
}
+ PackingPlan updatedPackingPlan =
+ new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
+ PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
+ PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
+ LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);
+
// update packing plan to trigger the scaling event
logInfo("Update new PackingPlan: %s",
- stateManager.updatePackingPlan(proposedProtoPackingPlan, topologyName));
+ stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));
// reactivate topology
if (initiallyRunning) {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
index 1d018c2..e217fdf 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraCLIController.java
@@ -16,10 +16,12 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -111,29 +113,46 @@
}
@Override
- public void addContainers(Integer count) {
+ public Set<Integer> addContainers(Integer count) {
+ // Adds `count` Aurora instances and returns the instance ids parsed from the CLI output.
//aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count>
//clone instance 0
+ // --verbose is now always passed (no longer gated on isVerbose) because the new ids are
+ // parsed from the command output; presumably the id line only appears in verbose mode -- confirm.
List<String> auroraCmd = new ArrayList<>(Arrays.asList(
- "aurora", "job", "add", "--wait-until", "RUNNING", jobSpec + "/0", count.toString()));
-
- if (isVerbose) {
- auroraCmd.add("--verbose");
- }
+ "aurora", "job", "add", "--wait-until", "RUNNING",
+ jobSpec + "/0", count.toString(), "--verbose"));
LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd));
- if (!runProcess(auroraCmd)) {
+ // Only stderr is captured (stdout is passed as null); the container ids are extracted from it.
+ StringBuilder stderr = new StringBuilder();
+ if (!runProcess(auroraCmd, null, stderr)) {
throw new RuntimeException("Failed to create " + count + " new aurora instances");
}
+
+ if (stderr.length() <= 0) { // no container was added
+ throw new RuntimeException("empty output by Aurora");
+ }
+ return extractContainerIds(stderr.toString());
+ }
+
+  /**
+   * Extracts the instance ids from Aurora CLI output containing a line such as
+   * {@code Querying instance statuses: [1, 2, 3]}.
+   *
+   * @param auroraOutputStr captured output of the "aurora job add" command
+   * @return the ids found; empty if the marker is absent or the id list is empty
+   * @throws RuntimeException if the id list is not terminated by ']' or holds a non-integer
+   */
+  private Set<Integer> extractContainerIds(String auroraOutputStr) {
+    String pattern = "Querying instance statuses: [";
+    int idx1 = auroraOutputStr.indexOf(pattern);
+    if (idx1 < 0) { // no container was added
+      LOG.info("stdout & stderr by Aurora " + auroraOutputStr);
+      return new HashSet<Integer>();
+    }
+    idx1 += pattern.length();
+    int idx2 = auroraOutputStr.indexOf("]", idx1);
+    if (idx2 < 0) {
+      // Without this check substring(idx1, -1) fails with an opaque index exception.
+      throw new RuntimeException("Malformed Aurora output, unterminated id list: "
+          + auroraOutputStr);
+    }
+    String containerIdStr = auroraOutputStr.substring(idx1, idx2);
+    LOG.info("container IDs returned by Aurora " + containerIdStr);
+    // Split on bare commas and trim each token, so "[]" yields no ids (instead of
+    // Integer.valueOf("") throwing) and spacing differences in the CLI output are tolerated.
+    return Arrays.stream(containerIdStr.split(","))
+        .map(String::trim)
+        .filter(id -> !id.isEmpty())
+        .map(Integer::valueOf)
+        .collect(Collectors.toSet());
+  }
// Utils method for unit tests
@VisibleForTesting
- boolean runProcess(List<String> auroraCmd) {
- StringBuilder stdout = new StringBuilder();
- StringBuilder stderr = new StringBuilder();
+ boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) {
int status =
- ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]), stderr);
+ ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]),
+ stderr != null ? stderr : new StringBuilder());
if (status != 0) {
LOG.severe(String.format(
@@ -142,6 +161,12 @@
return status == 0;
}
+ // Convenience overload for callers that do not need the command's output: runs auroraCmd
+ // without stdout/stderr buffers. Package-private (@VisibleForTesting) so tests can stub it.
+ @VisibleForTesting
+ boolean runProcess(List<String> auroraCmd) {
+ return runProcess(auroraCmd, null, null);
+ }
+
private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) {
StringBuilder ids = new StringBuilder();
for (PackingPlan.ContainerPlan containerPlan : containersToRemove) {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
index fef0a1c..1f6b20f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraController.java
@@ -33,5 +33,5 @@
boolean restart(Integer containerId);
void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove);
- void addContainers(Integer count);
+ /**
+ * Requests {@code count} additional containers.
+ *
+ * @return the ids of the containers that were created (the CLI implementation returns an
+ * empty set when it cannot find them in Aurora's output)
+ */
+ Set<Integer> addContainers(Integer count);
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index daf28c1..51917b7 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -121,7 +121,7 @@
}
@Override
- public void addContainers(Integer count) {
- cliController.addContainers(count);
+ public Set<Integer> addContainers(Integer count) {
+ // Delegates to the CLI controller and passes its returned id set through unchanged.
+ return cliController.addContainers(count);
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
index 18aa52a..e51dcf8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
@@ -16,6 +16,8 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -183,8 +185,28 @@
}
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
- controller.addContainers(containersToAdd.size());
+  public Set<PackingPlan.ContainerPlan> addContainers(
+      Set<PackingPlan.ContainerPlan> containersToAdd) {
+    // The controller picks the ids of the new containers itself, so they can differ from the
+    // ids proposed by the packing plan. Return copies of the requested container plans
+    // re-keyed with the ids that were actually created, keeping instances and resources.
+    if (containersToAdd.isEmpty()) {
+      return new HashSet<>(); // nothing to request; avoid a needless controller call
+    }
+    // Do the actual containers adding
+    LinkedList<Integer> newAddedContainerIds = new LinkedList<>(
+        controller.addContainers(containersToAdd.size()));
+    if (newAddedContainerIds.size() != containersToAdd.size()) {
+      throw new RuntimeException(
+          "Aurora returned different container count " + newAddedContainerIds.size()
+              + "; input count was " + containersToAdd.size());
+    }
+    Set<PackingPlan.ContainerPlan> remapping = new HashSet<>();
+    // Pair each requested plan with one of the returned ids. Both collections are unordered,
+    // so the pairing is arbitrary; presumably fine since the CLI controller clones instance 0
+    // for every new container -- confirm.
+    for (PackingPlan.ContainerPlan cp : containersToAdd) {
+      PackingPlan.ContainerPlan newContainerPlan =
+          new PackingPlan.ContainerPlan(
+              newAddedContainerIds.pop(), cp.getInstances(),
+              cp.getRequiredResource(), cp.getScheduledResource().orNull());
+      remapping.add(newContainerPlan);
+    }
+    LOG.info("The remapping structure: " + remapping);
+    return remapping;
}
@Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
index 7d95059..5ad785f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/AppsV1beta1Controller.java
@@ -127,7 +127,8 @@
}
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+ public Set<PackingPlan.ContainerPlan>
+ addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
final V1beta1StatefulSet statefulSet;
try {
statefulSet = getStatefulSet();
@@ -147,6 +148,8 @@
throw new TopologyRuntimeManagementException(
ae.getMessage() + "\netails\n" + ae.getResponseBody());
}
+
+ return containersToAdd;
}
@Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index bdc2701..a299aab 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -138,8 +138,10 @@
* TODO (jrcrawfo) -- (https://github.com/twitter/heron/issues/1981)
*/
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+ public Set<PackingPlan.ContainerPlan>
+ addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
controller.addContainers(containersToAdd);
+ // Ids are not remapped here: the controller's result is ignored and the request is returned as-is.
+ return containersToAdd;
}
/**
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index 7b598ce..27e42b8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -288,7 +288,7 @@
}
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containers) {
+ public Set<PackingPlan.ContainerPlan> addContainers(Set<PackingPlan.ContainerPlan> containers) {
synchronized (processToContainer) {
for (PackingPlan.ContainerPlan container : containers) {
if (processToContainer.values().contains(container.getId())) {
@@ -298,6 +298,7 @@
startExecutor(container.getId(), container.getInstances());
}
}
+ return containers;
}
@Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
index e269f04..2eccff3 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/YarnScheduler.java
@@ -108,12 +108,14 @@
}
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+ public Set<PackingPlan.ContainerPlan> addContainers(
+ Set<PackingPlan.ContainerPlan> containersToAdd) {
try {
HeronMasterDriverProvider.getInstance().scheduleHeronWorkers(containersToAdd);
} catch (HeronMasterDriver.ContainerAllocationException e) {
throw new RuntimeException("Failed to launch new yarn containers", e);
}
+ // Container ids are not remapped here; report the requested plans back unchanged.
+ return containersToAdd;
}
@Override
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
index 5c04597..cc357b6 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraCLIControllerTest.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -31,6 +32,8 @@
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.utils.PackingTestUtils;
@@ -144,9 +147,45 @@
"aurora job add --wait-until RUNNING %s/0 %s %s",
JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
- Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
- controller.addContainers(containersToAdd);
- Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock arg0) throws Throwable {
+ final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
+ originalArgument.append("Querying instance statuses: [1, 2, 3]");
+ return true;
+ }
+ }).when(controller).runProcess(
+ Matchers.anyListOf(String.class),
+ Matchers.any(StringBuilder.class),
+ Matchers.any(StringBuilder.class));
+ Set<Integer> ret = controller.addContainers(containersToAdd);
+ Assert.assertEquals(containersToAdd.intValue(), ret.size());
+ Mockito.verify(controller)
+ .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
+ }
+
+ @Test
+ public void testAddContainersFailure() {
+ // Output lacking the "Querying instance statuses: [" marker should yield no container ids.
+ Integer containersToAdd = 3;
+ List<String> expectedCommand = asList(
+ "aurora job add --wait-until RUNNING %s/0 %s %s",
+ JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG);
+
+ // Stub runProcess to succeed while writing unparseable output into the stderr buffer
+ // (argument index 2).
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock arg0) throws Throwable {
+ final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2];
+ originalArgument.append("Querying instance statuses: x");
+ return true;
+ }
+ }).when(controller).runProcess(
+ Matchers.anyListOf(String.class),
+ Matchers.any(StringBuilder.class),
+ Matchers.any(StringBuilder.class));
+ Set<Integer> ret = controller.addContainers(containersToAdd);
+ Assert.assertEquals(0, ret.size());
+ Mockito.verify(controller)
+ .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any());
}
private static List<String> asList(String command, Object... values) {
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
index 48c2d91..fb802be 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesControllerTest.java
@@ -72,8 +72,9 @@
}
@Override
- public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
-
+ public Set<PackingPlan.ContainerPlan>
+ addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+ // Test stub: does no work and echoes the requested containers back unchanged.
+ return containersToAdd;
}
@Override
diff --git a/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java b/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
index beb7f05..a050fcc 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/scheduler/IScalable.java
@@ -30,8 +30,9 @@
* Requests new containers for scaling a topology
*
* @param containersToAdd Set of containers to be added by the scheduler
+ * @return the containers that were actually added. A scheduler may assign container ids that
+ * differ from the requested ones (e.g. Aurora), so callers should use the returned plans, e.g.
+ * when rebuilding the packing plan; schedulers that cannot remap ids return the request as-is.
*/
- void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd);
+ Set<PackingPlan.ContainerPlan> addContainers(Set<PackingPlan.ContainerPlan> containersToAdd);
/**
* Requests containers to be released for down-scaling a topology.