blob: a73a7e0676d89a91d331c952ff77163ea47bbd1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.aurora;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.commons.cli.CommandLine;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.utils.topology.TopologyTests;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.scheduler.SubmitterMain;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.common.TokenSub;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.PackingTestUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("jdk.internal.reflect.*")
@PrepareForTest({TokenSub.class, Config.class})
public class AuroraSchedulerTest {
private static final String AURORA_PATH = "path.aurora";
private static final String PACKING_PLAN_ID = "packing.plan.id";
private static final String TOPOLOGY_NAME = "topologyName";
private static final int CONTAINER_ID = 7;
private static AuroraScheduler scheduler;
@Before
public void setUp() throws Exception {
}
@After
public void after() throws Exception {
}
@BeforeClass
public static void beforeClass() throws Exception {
scheduler = Mockito.spy(AuroraScheduler.class);
doReturn(new HashMap<String, String>())
.when(scheduler).createAuroraProperties(Mockito.any(Resource.class));
}
@AfterClass
public static void afterClass() throws Exception {
scheduler.close();
}
/**
* Tests that we can schedule
*/
@Test
public void testOnSchedule() throws Exception {
AuroraController controller = Mockito.mock(AuroraController.class);
doReturn(controller).when(scheduler).getController();
SchedulerStateManagerAdaptor stateManager = mock(SchedulerStateManagerAdaptor.class);
Config runtime = Mockito.mock(Config.class);
when(runtime.get(Key.SCHEDULER_STATE_MANAGER_ADAPTOR)).thenReturn(stateManager);
when(runtime.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
Config mConfig = Mockito.mock(Config.class);
PowerMockito.mockStatic(Config.class);
when(Config.toClusterMode(mConfig)).thenReturn(mConfig);
when(mConfig.getStringValue(eq(AuroraContext.JOB_TEMPLATE),
anyString())).thenReturn(AURORA_PATH);
scheduler.initialize(mConfig, runtime);
// Fail to schedule due to null PackingPlan
Assert.assertFalse(scheduler.onSchedule(null));
PackingPlan plan = new PackingPlan(PACKING_PLAN_ID, new HashSet<PackingPlan.ContainerPlan>());
assertTrue(plan.getContainers().isEmpty());
// Fail to schedule due to PackingPlan is empty
Assert.assertFalse(scheduler.onSchedule(plan));
// Construct valid PackingPlan
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(PackingTestUtils.testContainerPlan(CONTAINER_ID));
PackingPlan validPlan = new PackingPlan(PACKING_PLAN_ID, containers);
// Failed to create job via controller
doReturn(false).when(controller)
.createJob(Matchers.anyMapOf(AuroraField.class, String.class),
Matchers.anyMapOf(String.class, String.class));
doReturn(true).when(stateManager)
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
Assert.assertFalse(scheduler.onSchedule(validPlan));
Mockito.verify(controller)
.createJob(Matchers.anyMapOf(AuroraField.class, String.class),
Matchers.anyMapOf(String.class, String.class));
Mockito.verify(stateManager)
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
// Happy path
doReturn(true).when(controller)
.createJob(Matchers.anyMapOf(AuroraField.class, String.class),
Matchers.anyMapOf(String.class, String.class));
assertTrue(scheduler.onSchedule(validPlan));
Mockito.verify(controller, Mockito.times(2))
.createJob(Matchers.anyMapOf(AuroraField.class, String.class),
Matchers.anyMapOf(String.class, String.class));
Mockito.verify(stateManager, Mockito.times(2))
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
}
@Test
public void testOnKill() throws Exception {
Config mockConfig = Mockito.mock(Config.class);
PowerMockito.mockStatic(Config.class);
when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
AuroraController controller = Mockito.mock(AuroraController.class);
doReturn(controller).when(scheduler).getController();
scheduler.initialize(mockConfig, Mockito.mock(Config.class));
// Failed to kill job via controller
doReturn(false).when(controller).killJob();
Assert.assertFalse(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
Mockito.verify(controller).killJob();
// Happy path
doReturn(true).when(controller).killJob();
assertTrue(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance()));
Mockito.verify(controller, Mockito.times(2)).killJob();
}
@Test
public void testOnRestart() throws Exception {
Config mockConfig = Mockito.mock(Config.class);
PowerMockito.mockStatic(Config.class);
when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
AuroraController controller = Mockito.mock(AuroraController.class);
doReturn(controller).when(scheduler).getController();
scheduler.initialize(mockConfig, Mockito.mock(Config.class));
// Construct the RestartTopologyRequest
int containerToRestart = 1;
Scheduler.RestartTopologyRequest restartTopologyRequest =
Scheduler.RestartTopologyRequest.newBuilder().
setTopologyName(TOPOLOGY_NAME).setContainerIndex(containerToRestart).
build();
// Failed to kill job via controller
doReturn(false).when(
controller).restart(containerToRestart);
Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
Mockito.verify(controller).restart(containerToRestart);
// Happy path
doReturn(true).when(
controller).restart(containerToRestart);
assertTrue(scheduler.onRestart(restartTopologyRequest));
Mockito.verify(controller, Mockito.times(2)).restart(containerToRestart);
}
@Test
public void testGetJobLinks() throws Exception {
final String JOB_LINK_FORMAT = "http://go/${CLUSTER}/${ROLE}/${ENVIRON}/${TOPOLOGY}";
final String SUBSTITUTED_JOB_LINK = "http://go/local/heron/test/test_topology";
Config mockConfig = Mockito.mock(Config.class);
when(mockConfig.getStringValue(AuroraContext.JOB_LINK_TEMPLATE))
.thenReturn(JOB_LINK_FORMAT);
PowerMockito.mockStatic(Config.class);
when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig);
AuroraController controller = Mockito.mock(AuroraController.class);
doReturn(controller).when(scheduler).getController();
scheduler.initialize(mockConfig, Mockito.mock(Config.class));
PowerMockito.spy(TokenSub.class);
PowerMockito.doReturn(SUBSTITUTED_JOB_LINK)
.when(TokenSub.class, "substitute", mockConfig, JOB_LINK_FORMAT);
List<String> result = scheduler.getJobLinks();
assertEquals(1, result.size());
assertTrue(result.get(0).equals(SUBSTITUTED_JOB_LINK));
}
@Test
public void testProperties() throws URISyntaxException {
TopologyAPI.Topology topology = TopologyTests.createTopology(
TOPOLOGY_NAME, new org.apache.heron.api.Config(),
"spoutName", "boltName", 1, 1);
Config runtime = mock(Config.class);
when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(topology);
when(runtime.get(Key.TOPOLOGY_PACKAGE_URI)).thenReturn(new URI("http://foo/bar"));
// This must mimic how SubmitterMain loads configs
CommandLine commandLine = mock(CommandLine.class);
when(commandLine.getOptionValue("cluster")).thenReturn("some_cluster");
when(commandLine.getOptionValue("role")).thenReturn("some_role");
when(commandLine.getOptionValue("environment")).thenReturn("some_env");
when(commandLine.getOptionValue("heron_home")).thenReturn("/some/heron/home");
when(commandLine.getOptionValue("config_path")).thenReturn("/some/config/path");
when(commandLine.getOptionValue("topology_package")).thenReturn("jar");
when(commandLine.getOptionValue("topology_defn")).thenReturn("/mock/defnFile.defn");
when(commandLine.getOptionValue("topology_bin")).thenReturn("binaryFile.jar");
Config config = Mockito.spy(SubmitterMain.loadConfig(commandLine, topology));
AuroraScheduler testScheduler = new AuroraScheduler();
testScheduler.initialize(config, runtime);
Resource containerResource =
new Resource(2.3, ByteAmount.fromGigabytes(2), ByteAmount.fromGigabytes(3));
Map<AuroraField, String> properties = testScheduler.createAuroraProperties(containerResource);
// this part is key, the conf path in the config is absolute to the install dir, but what
// aurora properties get below is the relative ./heron-conf path to be used when run remotely
assertEquals("Invalid value for key " + Key.HERON_CONF,
"/some/config/path", config.getStringValue(Key.HERON_CONF));
String expectedConf = "./heron-conf";
String expectedBin = "./heron-core/bin";
String expectedLib = "./heron-core/lib";
String expectedDist = "./heron-core/dist";
for (AuroraField field : AuroraField.values()) {
boolean asserted = false;
Object expected = null;
Object found = properties.get(field);
switch (field) {
case CORE_PACKAGE_URI:
expected = expectedDist + "/heron-core.tar.gz";
break;
case CPUS_PER_CONTAINER:
expected = Double.valueOf(containerResource.getCpu()).toString();
break;
case DISK_PER_CONTAINER:
expected = Long.valueOf(containerResource.getDisk().asBytes()).toString();
break;
case RAM_PER_CONTAINER:
expected = Long.valueOf(containerResource.getRam().asBytes()).toString();
break;
case TIER:
expected = "preemptible";
break;
case NUM_CONTAINERS:
expected = "2";
break;
case EXECUTOR_BINARY:
expected = expectedBin + "/heron-executor";
break;
case TOPOLOGY_PACKAGE_URI:
expected = "http://foo/bar";
break;
case TOPOLOGY_ARGUMENTS:
expected = "--topology-name=topologyName"
+ " --topology-id=" + topology.getId()
+ " --topology-defn-file=defnFile.defn"
+ " --state-manager-connection=null"
+ " --state-manager-root=null"
+ " --state-manager-config-file=" + expectedConf + "/statemgr.yaml"
+ " --tmanager-binary=" + expectedBin + "/heron-tmanager"
+ " --stmgr-binary=" + expectedBin + "/heron-stmgr"
+ " --metrics-manager-classpath=" + expectedLib + "/metricsmgr/*"
+ " --instance-jvm-opts=\"\""
+ " --classpath=binaryFile.jar"
+ " --heron-internals-config-file=" + expectedConf + "/heron_internals.yaml"
+ " --override-config-file=" + expectedConf + "/override.yaml"
+ " --component-ram-map=null"
+ " --component-jvm-opts=\"\""
+ " --pkg-type=jar"
+ " --topology-binary-file=binaryFile.jar"
+ " --heron-java-home=/usr/lib/jvm/default-java"
+ " --heron-shell-binary=" + expectedBin + "/heron-shell"
+ " --cluster=some_cluster"
+ " --role=some_role"
+ " --environment=some_env"
+ " --instance-classpath=" + expectedLib + "/instance/*"
+ " --metrics-sinks-config-file=" + expectedConf + "/metrics_sinks.yaml"
+ " --scheduler-classpath=" + expectedLib + "/scheduler/*:./heron-core"
+ "/lib/packing/*:" + expectedLib + "/statemgr/*"
+ " --python-instance-binary=" + expectedBin + "/heron-python-instance"
+ " --cpp-instance-binary=" + expectedBin + "/heron-cpp-instance"
+ " --metricscache-manager-classpath=" + expectedLib + "/metricscachemgr/*"
+ " --metricscache-manager-mode=disabled"
+ " --is-stateful=false"
+ " --checkpoint-manager-classpath=" + expectedLib + "/ckptmgr/*:"
+ expectedLib + "/statefulstorage/*:"
+ " --stateful-config-file=" + expectedConf + "/stateful.yaml"
+ " --checkpoint-manager-ram=1073741824"
+ " --health-manager-mode=disabled"
+ " --health-manager-classpath=" + expectedLib + "/healthmgr/*";
break;
case CLUSTER:
expected = "some_cluster";
break;
case ENVIRON:
expected = "some_env";
break;
case ROLE:
expected = "some_role";
break;
case TOPOLOGY_NAME:
expected = "topologyName";
break;
default:
fail(String.format(
"Expected value for Aurora field %s not found in test (found=%s)", field, found));
}
if (!asserted) {
assertEquals("Incorrect value found for field " + field, expected, found);
}
properties.remove(field);
}
assertTrue("The following aurora fields were not set by the scheduler: " + properties,
properties.isEmpty());
}
}