blob: b6de513e4296ada88149aeed21575c5fbda4b5c8 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler;
import java.util.HashMap;
import java.util.Properties;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.packing.roundrobin.RoundRobinPacking;
import com.twitter.heron.proto.system.PackingPlans;
import com.twitter.heron.scheduler.server.SchedulerServer;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
import com.twitter.heron.scheduler.utils.Shutdown;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.scheduler.IScheduler;
import com.twitter.heron.spi.statemgr.IStateManager;
import com.twitter.heron.spi.utils.PackingTestUtils;
import com.twitter.heron.spi.utils.ReflectionUtils;
import com.twitter.heron.spi.utils.TopologyTests;
import com.twitter.heron.spi.utils.TopologyUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
TopologyUtils.class, ReflectionUtils.class, SchedulerUtils.class, TopologyAPI.Topology.class})
public class SchedulerMainTest {
private static final String STATE_MANAGER_CLASS = "STATE_MANAGER_CLASS";
private static final String SCHEDULER_CLASS = "SCHEDULER_CLASS";
private static final String TOPOLOGY_NAME = "topologyName";
@Rule
public final ExpectedException exception = ExpectedException.none();
private IStateManager stateManager;
private IScheduler scheduler;
private SchedulerMain schedulerMain;
private SchedulerServer schedulerServer;
private String iTopologyName = "topologyName";
/**
* Basic setup before executing a test case
*/
@Before
public void setUp() throws Exception {
Config config = mock(Config.class);
when(config.getStringValue(Key.STATE_MANAGER_CLASS)).
thenReturn(STATE_MANAGER_CLASS);
when(config.getStringValue(Key.SCHEDULER_CLASS)).
thenReturn(SCHEDULER_CLASS);
int iSchedulerServerPort = 0;
TopologyAPI.Topology topology = TopologyTests.createTopology(
iTopologyName, new com.twitter.heron.api.Config(),
new HashMap<String, Integer>(), new HashMap<String, Integer>());
// Mock objects to be verified
stateManager = mock(IStateManager.class);
scheduler = mock(IScheduler.class);
final SettableFuture<PackingPlans.PackingPlan> future = getTestPacking();
when(stateManager.getPackingPlan(null, iTopologyName)).thenReturn(future);
// Mock ReflectionUtils stuff
PowerMockito.spy(ReflectionUtils.class);
PowerMockito.doReturn(stateManager).
when(ReflectionUtils.class, "newInstance", STATE_MANAGER_CLASS);
PowerMockito.doReturn(scheduler).
when(ReflectionUtils.class, "newInstance", SCHEDULER_CLASS);
// Mock objects to be verified
schedulerMain = spy(new SchedulerMain(
config, topology, iSchedulerServerPort, mock(Properties.class)));
schedulerServer = mock(SchedulerServer.class);
doReturn(schedulerServer).when(schedulerMain).getServer(
any(Config.class), eq(scheduler), eq(iSchedulerServerPort));
doReturn(true).when(scheduler).onSchedule(any(PackingPlan.class));
// Mock SchedulerUtils stuff
PowerMockito.spy(SchedulerUtils.class);
PowerMockito.doReturn(true).when(SchedulerUtils.class, "setSchedulerLocation",
any(Config.class), anyString(), eq(scheduler));
// Avoid infinite waiting
Shutdown shutdown = mock(Shutdown.class);
doReturn(shutdown).when(schedulerMain).getShutdown();
}
private SettableFuture<PackingPlans.PackingPlan> getTestPacking() {
PackingPlans.PackingPlan packingPlan =
PackingTestUtils.testProtoPackingPlan("testTopology", new RoundRobinPacking());
final SettableFuture<PackingPlans.PackingPlan> future = SettableFuture.create();
future.set(packingPlan);
return future;
}
// Exceptions during reflection --
// 1. should return false executing runScheduler()
// 2. Nothing should be initialized
@Test
public void testExceptionsInReflections() throws Exception {
PowerMockito.doThrow(new ClassNotFoundException("")).
when(ReflectionUtils.class, "newInstance", STATE_MANAGER_CLASS);
assertFalse(schedulerMain.runScheduler());
verify(stateManager, never()).initialize(any(Config.class));
verify(stateManager, never()).getPackingPlan(null, iTopologyName);
verify(scheduler, never()).initialize(any(Config.class), any(Config.class));
}
// Exceptions during initialize components --
// 1. should bubble up the exceptions directly executing runScheduler()
@Test
public void testExceptionsInInit() throws Exception {
PowerMockito.doReturn(stateManager).
when(ReflectionUtils.class, "newInstance", STATE_MANAGER_CLASS);
doThrow(new RuntimeException()).when(stateManager).initialize(any(Config.class));
exception.expect(RuntimeException.class);
schedulerMain.runScheduler();
// Should not be invoked; thread exited already
fail();
}
// Failed to IScheduler.onSchedule
// 1. should return false executing runScheduler()
// 2. close() should still be invoked
// 3. SchedulerServer should not start
@Test
public void testOnSchedulerFailure() throws Exception {
doNothing().when(stateManager).initialize(any(Config.class));
doReturn(false).when(scheduler).onSchedule(any(PackingPlan.class));
assertFalse(schedulerMain.runScheduler());
verify(stateManager).close();
verify(scheduler).close();
verify(schedulerServer, never()).start();
}
// Exceptions during start the server
// 1. should bubble up the exceptions executing runScheduler()
@Test
public void testExceptionsInStartingServer() throws Exception {
doThrow(new RuntimeException()).when(schedulerServer).start();
exception.expect(RuntimeException.class);
schedulerMain.runScheduler();
// Should not be invoked; thread exited already
fail();
}
// Failed to set SchedulerLocation
// 1. should return false executing runScheduler()
// 2. close() should still be invoked
// 3. SchedulerServer.stop() should be invoked
@Test
public void testSetSchedulerLocationFailure() throws Exception {
PowerMockito.doReturn(false).when(SchedulerUtils.class, "setSchedulerLocation",
any(Config.class), anyString(), eq(scheduler));
assertFalse(schedulerMain.runScheduler());
verify(stateManager).close();
verify(scheduler).close();
verify(schedulerServer).stop();
}
// Happy path
@Test
public void testRunScheduler() throws Exception {
assertTrue(schedulerMain.runScheduler());
}
@Test
public void updateNumContainersIfNeeded() {
int configuredNumContainers = 4;
int configuredNumStreamManagers = configuredNumContainers - 1;
int packingPlanSize = 1;
SubmitterMain submitterMain = new SubmitterMain(null, null);
PackingPlan packingPlan =
PackingTestUtils.testPackingPlan(TOPOLOGY_NAME, new RoundRobinPacking());
assertEquals(packingPlanSize, packingPlan.getContainers().size());
com.twitter.heron.api.Config apiConfig = new com.twitter.heron.api.Config();
apiConfig.setNumStmgrs(configuredNumStreamManagers);
TopologyAPI.Topology initialTopology = TopologyTests.createTopology(
TOPOLOGY_NAME, apiConfig, new HashMap<String, Integer>(), new HashMap<String, Integer>());
Config initialConfig = Config.newBuilder()
.put(Key.NUM_CONTAINERS, configuredNumContainers)
.put(Key.PACKING_CLASS, RoundRobinPacking.class.getName())
.put(Key.TOPOLOGY_DEFINITION, initialTopology)
.build();
// assert preconditions
assertEquals(Integer.toString(configuredNumStreamManagers),
apiConfig.get(com.twitter.heron.api.Config.TOPOLOGY_STMGRS));
assertEquals(configuredNumStreamManagers, TopologyUtils.getNumContainers(initialTopology));
assertContainerCount(configuredNumStreamManagers, initialConfig);
Config newConfig =
submitterMain.updateNumContainersIfNeeded(initialConfig, initialTopology, packingPlan);
assertContainerCount(packingPlanSize, newConfig);
}
private void assertContainerCount(int expectedNumStreamManagers, Config config) {
assertEquals(expectedNumStreamManagers + 1, config.get(Key.NUM_CONTAINERS));
assertEquals(expectedNumStreamManagers,
TopologyUtils.getNumContainers(Runtime.topology(config)));
}
}