blob: 480dd1de96a2054d51bf3867de0de3c904665c64 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.yarn;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.base.Optional;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.packing.roundrobin.RoundRobinPacking;
import org.apache.heron.scheduler.SchedulerMain;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.utils.PackingTestUtils;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.types.NamedParameterNode;
import org.apache.reef.wake.time.event.StartTime;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("jdk.internal.reflect.*")
public class HeronMasterDriverTest {
private EvaluatorRequestor mockRequestor;
private HeronMasterDriver driver;
private HeronMasterDriver spyDriver;
/**
 * Prepares the fixtures shared by every test: a mocked {@link EvaluatorRequestor}, a real
 * {@link HeronMasterDriver} built on top of it, and a Mockito spy of that driver whose
 * {@code getComponentRamMap()} is stubbed to return an empty string.
 *
 * @throws IOException if setting up the driver fails
 */
@Before
public void createMocks() throws IOException {
  mockRequestor = mock(EvaluatorRequestor.class);
  final REEFFileNames reefFileNames = new REEFFileNames();
  driver = new HeronMasterDriver(
      mockRequestor, reefFileNames,
      "yarn", "heron", "testTopology", "env",
      "jar", "package", "core",
      0, false);

  // Tests interact with the spy so that individual driver methods can be stubbed or verified.
  spyDriver = spy(driver);
  doReturn("").when(spyDriver).getComponentRamMap();
}
/**
 * Verifies that requesting a container for a worker submits, exactly once, the evaluator
 * request produced by {@code createEvaluatorRequest} for that worker's cores and memory.
 */
@Test
public void requestContainerForWorkerSubmitsValidRequest() {
  final ByteAmount ram = ByteAmount.fromMegabytes(786);
  // Build a real request once, then pin the spy to return this same instance so the object
  // handed to the requestor can be matched below.
  final EvaluatorRequest expectedRequest = spyDriver.createEvaluatorRequest(5, ram);
  doReturn(expectedRequest).when(spyDriver).createEvaluatorRequest(5, ram);

  spyDriver.requestContainerForWorker(3, new HeronMasterDriver.HeronWorker(3, 5, ram));

  verify(mockRequestor).submit(expectedRequest);
}
/**
 * Verifies that scheduling a packing plan with two containers submits two evaluator requests,
 * one per container id, each created from that container's required CPU and RAM.
 */
@Test
public void scheduleHeronWorkersRequestsContainersForPacking() throws Exception {
  final PackingPlan.ContainerPlan firstPlan = PackingTestUtils.testContainerPlan(1, 1, 2);
  final PackingPlan.ContainerPlan secondPlan = PackingTestUtils.testContainerPlan(2, 1, 2, 3);
  final Set<PackingPlan.ContainerPlan> plans = new HashSet<>();
  plans.add(firstPlan);
  plans.add(secondPlan);
  final PackingPlan packingPlan = new PackingPlan("packingId", plans);

  spyDriver.scheduleHeronWorkers(packingPlan);

  verify(mockRequestor, times(2)).submit(any(EvaluatorRequest.class));
  verify(spyDriver).requestContainerForWorker(eq(1), anyHeronWorker());
  verify(spyDriver).requestContainerForWorker(eq(2), anyHeronWorker());
  verify(spyDriver).createEvaluatorRequest(getCpu(firstPlan), getRam(firstPlan));
  verify(spyDriver).createEvaluatorRequest(getCpu(secondPlan), getRam(secondPlan));
}
/**
 * Verifies that killing the topology closes every allocated evaluator, drops its
 * evaluator-id mapping from the driver, and kills the TManager exactly once.
 */
@Test
public void onKillClosesContainersKillsTManager() throws Exception {
  HeronMasterDriver.TManager mockTManager = mock(HeronMasterDriver.TManager.class);
  // Stub the spy with doReturn(..).when(..). The when(spy.method(..)) form would execute the
  // real buildTManager (receiving the matcher's null placeholder) while setting up the stub.
  doReturn(mockTManager).when(spyDriver).buildTManager(any(ExecutorService.class));

  int numContainers = 3;
  AllocatedEvaluator[] mockEvaluators = createApplicationWithContainers(numContainers);
  spyDriver.launchTManager();

  spyDriver.killTopology();

  for (int id = 0; id < numContainers; id++) {
    verify(mockEvaluators[id]).close();
    assertFalse(spyDriver.lookupByEvaluatorId("e" + id).isPresent());
  }
  verify(mockTManager, times(1)).killTManager();
}
/**
 * Tests that restarting the topology closes every allocated evaluator and requests a
 * replacement container for each worker.
 */
@Test
public void restartTopologyClosesAndStartsContainers() throws Exception {
  final AllocatedEvaluator[] evaluators = createApplicationWithContainers(3);
  // The simulated allocations must not have triggered any container request on their own.
  verify(spyDriver, never()).requestContainerForWorker(anyInt(), anyHeronWorker());

  spyDriver.restartTopology();

  for (int workerId = 0; workerId < evaluators.length; workerId++) {
    verify(spyDriver).requestContainerForWorker(eq(workerId), anyHeronWorker());
    verify(evaluators[workerId]).close();
  }
}
/**
 * Verifies that restarting a single worker closes and re-requests only that worker's
 * container, leaving the other evaluators open and still mapped to their worker ids.
 */
@Test
public void restartWorkerRestartsSpecificWorker() throws Exception {
  final int restartedId = 1;
  final AllocatedEvaluator[] evaluators = createApplicationWithContainers(3);
  verify(spyDriver, never()).requestContainerForWorker(anyInt(), anyHeronWorker());

  spyDriver.restartWorker(restartedId);

  for (int workerId = 0; workerId < evaluators.length; workerId++) {
    final String evaluatorId = "e" + workerId;
    if (workerId != restartedId) {
      // untouched workers keep their evaluator and their mapping
      verify(evaluators[workerId], never()).close();
      assertEquals(Integer.valueOf(workerId),
          spyDriver.lookupByEvaluatorId(evaluatorId).get());
    } else {
      verify(spyDriver).requestContainerForWorker(eq(workerId), anyHeronWorker());
      verify(evaluators[workerId]).close();
      assertFalse(spyDriver.lookupByEvaluatorId(evaluatorId).isPresent());
    }
  }
}
/**
 * Verifies that a failed-evaluator event for evaluator "e1" makes the driver request a new
 * container for worker 1 and forget the failed evaluator, while the remaining evaluators
 * stay open and mapped.
 */
@Test
public void onNextFailedEvaluatorRestartsContainer() throws Exception {
  final int failedWorkerId = 1;
  final AllocatedEvaluator[] evaluators = createApplicationWithContainers(3);

  final FailedEvaluator failedEvaluator = mock(FailedEvaluator.class);
  when(failedEvaluator.getId()).thenReturn("e1");
  verify(spyDriver, never()).requestContainerForWorker(anyInt(), anyHeronWorker());

  spyDriver.new FailedContainerHandler().onNext(failedEvaluator);

  for (int workerId = 0; workerId < evaluators.length; workerId++) {
    final String evaluatorId = "e" + workerId;
    if (workerId != failedWorkerId) {
      verify(evaluators[workerId], never()).close();
      assertEquals(Integer.valueOf(workerId),
          spyDriver.lookupByEvaluatorId(evaluatorId).get());
    } else {
      verify(spyDriver).requestContainerForWorker(eq(workerId), anyHeronWorker());
      assertFalse(spyDriver.lookupByEvaluatorId(evaluatorId).isPresent());
    }
  }
}
/**
 * Verifies that the context configuration created for worker 4 contains a named parameter
 * called after {@link ContextIdentifier} whose value is the string "4".
 */
@Test
public void createContextConfigCreatesForGivenWorkerId() {
  final Configuration contextConfig = driver.createContextConfig(4);
  final String identifierName = ContextIdentifier.class.getSimpleName();

  boolean identifierSeen = false;
  for (NamedParameterNode<?> node : contextConfig.getNamedParameters()) {
    if (!node.getName().equals(identifierName)) {
      continue;
    }
    assertEquals("4", contextConfig.getNamedParameter(node));
    identifierSeen = true;
  }

  assertTrue("ContextIdentifier didn't exist.", identifierSeen);
}
/**
 * Verifies that scheduling a container plan whose id is already scheduled is rejected with
 * {@link HeronMasterDriver.ContainerAllocationException}.
 *
 * <p>The expectation is scoped to the duplicate request only. With a method-wide
 * {@code @Test(expected = ...)} the test would also pass if the initial, valid scheduling
 * call threw the exception.
 */
@Test
public void scheduleHeronWorkersFailsOnDuplicateRequest() throws Exception {
  // Setup: the first scheduling round must succeed and request container 1.
  PackingPlan packingPlan = PackingTestUtils.testPackingPlan("test", new RoundRobinPacking());
  spyDriver.scheduleHeronWorkers(packingPlan);
  verify(spyDriver, times(1)).requestContainerForWorker(eq(1), anyHeronWorker());
  verify(mockRequestor, times(1)).submit(any(EvaluatorRequest.class));

  // Ask for container id 1 a second time.
  PackingPlan.ContainerPlan duplicatePlan = PackingTestUtils.testContainerPlan(1);
  Set<PackingPlan.ContainerPlan> toBeAddedContainerPlans = new HashSet<>();
  toBeAddedContainerPlans.add(duplicatePlan);

  boolean duplicateRejected = false;
  try {
    spyDriver.scheduleHeronWorkers(toBeAddedContainerPlans);
  } catch (HeronMasterDriver.ContainerAllocationException expected) {
    duplicateRejected = true;
  }
  assertTrue("Duplicate container request should raise ContainerAllocationException",
      duplicateRejected);
}
/**
 * Verifies that containers scheduled after the initial packing plan are requested in
 * addition to the existing one: one request for the initial plan, then one more per added
 * container plan.
 */
@Test
public void scheduleHeronWorkersAddsContainers() throws Exception {
  final PackingPlan initialPlan =
      PackingTestUtils.testPackingPlan("test", new RoundRobinPacking());
  spyDriver.scheduleHeronWorkers(initialPlan);
  verify(spyDriver).requestContainerForWorker(eq(1), anyHeronWorker());
  verify(mockRequestor).submit(any(EvaluatorRequest.class));

  final Set<PackingPlan.ContainerPlan> additions = new HashSet<>();
  additions.add(PackingTestUtils.testContainerPlan(2));
  additions.add(PackingTestUtils.testContainerPlan(3));
  spyDriver.scheduleHeronWorkers(additions);

  verify(spyDriver).requestContainerForWorker(eq(2), anyHeronWorker());
  verify(spyDriver).requestContainerForWorker(eq(3), anyHeronWorker());
  // one submit from the initial plan plus two for the added containers
  verify(mockRequestor, times(3)).submit(any(EvaluatorRequest.class));
}
/**
 * Verifies that killing a subset of workers (containers 2 and 3 out of 0..4) closes only
 * their evaluators and removes only their container-plan and evaluator-id mappings.
 */
@Test
public void killWorkersTerminatesSpecificContainers() throws Exception {
  final int containerCount = 5;

  // Schedule containers 0..4 and check that each one was requested and registered.
  final Set<PackingPlan.ContainerPlan> scheduled = new HashSet<>();
  for (int id = 0; id < containerCount; id++) {
    scheduled.add(PackingTestUtils.testContainerPlan(id));
  }
  final PackingPlan packingPlan = new PackingPlan("packing", scheduled);
  spyDriver.scheduleHeronWorkers(packingPlan);
  for (int id = 0; id < containerCount; id++) {
    verify(spyDriver).requestContainerForWorker(eq(id), anyHeronWorker());
    assertTrue(spyDriver.lookupByContainerPlan(id).isPresent());
  }
  verify(mockRequestor, times(containerCount)).submit(any(EvaluatorRequest.class));

  final AllocatedEvaluator[] evaluators = createApplicationWithContainers(containerCount);

  final Set<PackingPlan.ContainerPlan> doomed = new HashSet<>();
  doomed.add(PackingTestUtils.testContainerPlan(2));
  doomed.add(PackingTestUtils.testContainerPlan(3));
  spyDriver.killWorkers(doomed);

  for (int id = 0; id < containerCount; id++) {
    final boolean killed = id == 2 || id == 3;
    final String evaluatorId = "e" + id;
    if (killed) {
      verify(evaluators[id]).close();
      assertFalse(spyDriver.lookupByContainerPlan(id).isPresent());
      assertFalse(spyDriver.lookupByEvaluatorId(evaluatorId).isPresent());
    } else {
      verify(evaluators[id], never()).close();
      assertTrue(spyDriver.lookupByContainerPlan(id).isPresent());
      assertTrue(spyDriver.lookupByEvaluatorId(evaluatorId).isPresent());
    }
  }
}
/**
 * Verifies that {@code findLargestFittingWorker} selects the biggest pending worker that
 * fits within both the memory and the cores offered by an evaluator.
 */
@Test
public void findLargestFittingWorkerReturnsLargestWorker() {
  // Worker ids 1..4; each worker asks for the same number of cores and gigabytes.
  final int[] sizes = {3, 7, 5, 1};
  final Set<HeronMasterDriver.HeronWorker> workers = new HashSet<>();
  for (int i = 0; i < sizes.length; i++) {
    workers.add(
        new HeronMasterDriver.HeronWorker(i + 1, sizes[i], ByteAmount.fromGigabytes(sizes[i])));
  }

  final int roomFor7Gb = 7 * 1024 + 100;
  final int roomFor5Gb = 5 * 1024 + 100;

  // enough memory and cores to fit largest container, 2
  verifyFittingContainer(workers, roomFor7Gb, 7, 2);
  // enough to fit 3 but not container 2
  verifyFittingContainer(workers, roomFor5Gb, 6, 3);
  // enough memory but not enough cores for container 2
  verifyFittingContainer(workers, roomFor7Gb, 5, 3);
  // enough cores but not enough memory for container 2
  verifyFittingContainer(workers, roomFor5Gb, 7, 3);
}
/**
 * Asserts that, for a mocked evaluator offering the given memory and cores, the driver picks
 * the worker with the expected id out of the candidate set.
 *
 * @param containers candidate workers to choose from
 * @param ram memory value reported by the mocked evaluator descriptor
 * @param cores number of cores reported by the mocked evaluator descriptor
 * @param expectedContainer id of the worker expected to be selected
 */
private void verifyFittingContainer(Set<HeronMasterDriver.HeronWorker> containers,
                                    int ram,
                                    int cores,
                                    int expectedContainer) {
  final EvaluatorDescriptor descriptor = mock(EvaluatorDescriptor.class);
  doReturn(ram).when(descriptor).getMemory();
  doReturn(cores).when(descriptor).getNumberOfCores();

  final AllocatedEvaluator evaluator = mock(AllocatedEvaluator.class);
  doReturn(descriptor).when(evaluator).getEvaluatorDescriptor();

  final Optional<HeronMasterDriver.HeronWorker> fit =
      spyDriver.findLargestFittingWorker(evaluator, containers, false);

  assertTrue(fit.isPresent());
  assertEquals(expectedContainer, fit.get().getWorkerId());
}
/**
 * Verifies both modes of {@code findLargestFittingWorker} for a 1-core, 3 GB evaluator and a
 * 3-core, 3 GB worker: no match when the last argument is {@code false}, and the worker is
 * returned when it is {@code true}.
 */
@Test
public void fitBiggestContainerIgnoresCoresIfMissing() {
  final Set<HeronMasterDriver.HeronWorker> workers = new HashSet<>();
  workers.add(new HeronMasterDriver.HeronWorker(1, 3, ByteAmount.fromGigabytes(3)));
  final AllocatedEvaluator evaluator =
      createMockEvaluator("test", 1, ByteAmount.fromGigabytes(3));

  final Optional<HeronMasterDriver.HeronWorker> strictFit =
      spyDriver.findLargestFittingWorker(evaluator, workers, false);
  assertFalse(strictFit.isPresent());

  final Optional<HeronMasterDriver.HeronWorker> lenientFit =
      spyDriver.findLargestFittingWorker(evaluator, workers, true);
  assertTrue(lenientFit.isPresent());
  assertEquals(1, lenientFit.get().getWorkerId());
}
/**
 * Verifies that an allocated evaluator matching the resources of scheduled container 1 is
 * bound to worker 1 and receives exactly one context submission.
 */
@Test
public void onNextAllocatedEvaluatorStartsWorker() throws Exception {
  final PackingPlan plan = PackingTestUtils.testPackingPlan("test", new RoundRobinPacking());
  spyDriver.scheduleHeronWorkers(plan);
  assertTrue(spyDriver.lookupByContainerPlan(1).isPresent());

  // Offer an evaluator sized exactly like the scheduled container.
  final PackingPlan.ContainerPlan scheduledPlan = spyDriver.lookupByContainerPlan(1).get();
  final int cores = getCpu(scheduledPlan);
  final ByteAmount ram = getRam(scheduledPlan);
  final AllocatedEvaluator evaluator = createMockEvaluator("test", cores, ram);
  assertFalse(spyDriver.lookupByEvaluatorId("test").isPresent());

  spyDriver.new ContainerAllocationHandler().onNext(evaluator);

  assertTrue(spyDriver.lookupByEvaluatorId("test").isPresent());
  assertEquals(Integer.valueOf(1), spyDriver.lookupByEvaluatorId("test").get());
  verify(evaluator).submitContext(any(Configuration.class));
}
/**
 * Verifies that an evaluator allocated while no worker has been scheduled is not registered
 * with the driver and never receives a context.
 */
@Test
public void onNextAllocatedEvaluatorDiscardsExtraWorker() throws Exception {
  final ByteAmount ram = ByteAmount.fromMegabytes(123);
  final AllocatedEvaluator surplusEvaluator = createMockEvaluator("test", 1, ram);
  assertFalse(spyDriver.lookupByEvaluatorId("test").isPresent());

  spyDriver.new ContainerAllocationHandler().onNext(surplusEvaluator);

  assertFalse(spyDriver.lookupByEvaluatorId("test").isPresent());
  verify(surplusEvaluator, never()).submitContext(any(Configuration.class));
}
/**
 * Verifies that launching a TManager submits the TManager itself to its executor service
 * exactly once.
 */
@Test
public void tManagerLaunchLaunchesExecutorForTManager() throws Exception {
  final ExecutorService mockExecutor = mock(ExecutorService.class);
  final HeronMasterDriver.TManager tManager = spyDriver.buildTManager(mockExecutor);
  final Future<?> pendingResult = mock(Future.class);
  doReturn(pendingResult).when(mockExecutor).submit(tManager);

  tManager.launch();

  verify(mockExecutor).submit(tManager);
}
/**
 * Verifies that killing a launched TManager cancels the future returned by the executor
 * (with interruption) and shuts the executor service down immediately.
 */
@Test
public void tManagerKillTerminatesTManager() throws Exception {
  final ExecutorService executor = mock(ExecutorService.class);
  final Future<?> runningTManager = mock(Future.class);
  final HeronMasterDriver.TManager tManager = spyDriver.buildTManager(executor);
  doReturn(runningTManager).when(executor).submit(tManager);

  tManager.launch();
  tManager.killTManager();

  verify(runningTManager).cancel(true);
  verify(executor).shutdownNow();
}
/**
 * Verifies that the TManager executor task is started again after it terminates: the first
 * run is held on a latch, and once released {@code startExecutor} is expected to have been
 * invoked three times in total.
 *
 * <p>The single-thread executor is real, so it is shut down in a {@code finally} block.
 * Otherwise its worker thread would outlive the test, and would stay blocked on the latch
 * forever if the first verification failed.
 */
@Test
public void tManagerLaunchRestartsTManagerOnFailure() throws Exception {
  final ExecutorService executor = Executors.newSingleThreadExecutor();
  final CountDownLatch testLatch = new CountDownLatch(1);
  try {
    HeronMasterDriver.TManager tManager = spy(spyDriver.buildTManager(executor));
    HeronExecutorTask mockTask = mock(HeronExecutorTask.class);

    // Block the first startExecutor() call until the test releases the latch.
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        testLatch.await();
        return null;
      }
    }).when(mockTask).startExecutor();
    doReturn(mockTask).when(tManager).getTManagerExecutorTask();

    tManager.launch();
    verify(mockTask, timeout(1000).times(1)).startExecutor();

    testLatch.countDown();
    //retries if tmanager ends for some reason
    verify(mockTask, timeout(1000).times(3)).startExecutor();
  } finally {
    // Release any task still waiting on the latch and stop the worker thread.
    testLatch.countDown();
    executor.shutdownNow();
  }
}
/**
 * Verifies that handling a {@link StartTime} event through {@code HeronSchedulerLauncher}
 * runs the scheduler exactly once.
 *
 * <p>{@code HeronReefUtils} and {@code SchedulerMain} are prepared for PowerMock so that the
 * package extraction is skipped and {@code createInstance} hands back a mocked scheduler.
 */
@Test
@PrepareForTest({HeronReefUtils.class, SchedulerMain.class})
public void onNextStartTimeStartsSchedulerTManager() throws Exception {
// Partially mock the static utilities: extractPackageInSandbox becomes a no-op.
PowerMockito.spy(HeronReefUtils.class);
PowerMockito.doNothing().when(HeronReefUtils.class,
"extractPackageInSandbox",
anyString(),
anyString(),
anyString());
SchedulerMain mockScheduler = mock(SchedulerMain.class);
// SchedulerMain.createInstance is stubbed to return the mock when called with any five
// strings followed by 0 and false.
PowerMockito.spy(SchedulerMain.class);
PowerMockito.doReturn(mockScheduler).when(SchedulerMain.class,
"createInstance",
anyString(),
anyString(),
anyString(),
anyString(),
anyString(),
eq(0),
eq(false));
// Fire the start event at the launcher handler of the spied driver.
spyDriver.new HeronSchedulerLauncher().onNext(new StartTime(System.currentTimeMillis()));
verify(mockScheduler, times(1)).runScheduler();
}
/**
 * Simulates the allocation of {@code numContainers} evaluators named "e0", "e1", ... for
 * workers 0, 1, ..., then checks that each evaluator is mapped to its worker id, received
 * exactly one context and has not been closed.
 *
 * @param numContainers number of evaluators/workers to simulate
 * @return the mocked evaluators, indexed by worker id
 */
private AllocatedEvaluator[] createApplicationWithContainers(int numContainers) {
  final AllocatedEvaluator[] evaluators = new AllocatedEvaluator[numContainers];
  for (int workerId = 0; workerId < evaluators.length; workerId++) {
    final ByteAmount ram = ByteAmount.fromMegabytes(123);
    evaluators[workerId] = simulateContainerAllocation("e" + workerId, 1, ram, workerId);
  }

  // Check the resulting state only after every allocation has been simulated.
  for (int workerId = 0; workerId < evaluators.length; workerId++) {
    final AllocatedEvaluator evaluator = evaluators[workerId];
    assertEquals(Integer.valueOf(workerId),
        spyDriver.lookupByEvaluatorId("e" + workerId).get());
    verify(evaluator).submitContext(anyConfiguration());
    verify(evaluator, never()).close();
  }
  return evaluators;
}
/**
 * Feeds a mocked evaluator into the driver's {@code ContainerAllocationHandler} as if it had
 * been allocated for the given worker. The spy is stubbed so that the worker is the only one
 * awaiting allocation and is the one chosen for this evaluator.
 *
 * @param evaluatorId id reported by the mocked evaluator
 * @param cores number of cores of both the evaluator and the worker
 * @param ram memory of both the evaluator and the worker
 * @param workerId id of the worker the evaluator is allocated for
 * @return the mocked evaluator that was handed to the allocation handler
 */
private AllocatedEvaluator simulateContainerAllocation(String evaluatorId,
                                                       int cores,
                                                       ByteAmount ram,
                                                       int workerId) {
  final AllocatedEvaluator allocated = createMockEvaluator(evaluatorId, cores, ram);
  final HeronMasterDriver.HeronWorker pendingWorker =
      new HeronMasterDriver.HeronWorker(workerId, cores, ram);
  final Set<HeronMasterDriver.HeronWorker> awaitingAllocation = new HashSet<>();
  awaitingAllocation.add(pendingWorker);

  doReturn(awaitingAllocation).when(spyDriver).getWorkersAwaitingAllocation();
  doReturn(Optional.of(pendingWorker))
      .when(spyDriver)
      .findLargestFittingWorker(eq(allocated), eq(awaitingAllocation), eq(false));

  spyDriver.new ContainerAllocationHandler().onNext(allocated);
  return allocated;
}
/**
 * Creates a mocked {@link AllocatedEvaluator} that reports the given id and whose descriptor
 * reports the given number of cores and the given memory converted to whole megabytes.
 *
 * @param evaluatorId id returned by {@code getId()}
 * @param cores value returned by the descriptor's {@code getNumberOfCores()}
 * @param mem memory whose megabyte value is returned by the descriptor's {@code getMemory()}
 * @return the configured evaluator mock
 */
private AllocatedEvaluator createMockEvaluator(String evaluatorId, int cores, ByteAmount mem) {
  final int memoryMb = Long.valueOf(mem.asMegabytes()).intValue();

  final EvaluatorDescriptor evaluatorDescriptor = mock(EvaluatorDescriptor.class);
  when(evaluatorDescriptor.getMemory()).thenReturn(memoryMb);
  when(evaluatorDescriptor.getNumberOfCores()).thenReturn(cores);

  final AllocatedEvaluator evaluator = mock(AllocatedEvaluator.class);
  when(evaluator.getEvaluatorDescriptor()).thenReturn(evaluatorDescriptor);
  when(evaluator.getId()).thenReturn(evaluatorId);
  return evaluator;
}
/**
 * Returns the RAM of the container plan's required resource.
 *
 * @param container container plan to inspect
 * @return the required RAM
 */
private ByteAmount getRam(PackingPlan.ContainerPlan container) {
  final ByteAmount requiredRam = container.getRequiredResource().getRam();
  return requiredRam;
}
/**
 * Returns the CPU of the container plan's required resource, rounded up to a whole number.
 *
 * @param container container plan to inspect
 * @return the required CPU, rounded up
 */
private int getCpu(PackingPlan.ContainerPlan container) {
  final double requiredCpu = container.getRequiredResource().getCpu();
  return (int) Math.ceil(requiredCpu);
}
/**
 * Registers a Mockito matcher accepting any {@link Configuration}; meant to be called in
 * argument position of a stubbed or verified call.
 *
 * @return the placeholder value produced by the matcher
 */
private Configuration anyConfiguration() {
  return any(Configuration.class);
}
/**
 * Registers a Mockito matcher accepting any {@link HeronMasterDriver.HeronWorker}; meant to
 * be called in argument position of a stubbed or verified call.
 *
 * @return the placeholder value produced by the matcher
 */
private HeronMasterDriver.HeronWorker anyHeronWorker() {
  return Mockito.any(HeronMasterDriver.HeronWorker.class);
}
}