blob: 604fd67fb230e88554b784e41f3ddc114cfb4590 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.mesos;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.scheduler.mesos.framework.BaseContainer;
import org.apache.heron.scheduler.mesos.framework.MesosFramework;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.mesos.SchedulerDriver;
public class MesosSchedulerTest {
private static final String TOPOLOGY_NAME = "testTopology";
private static final String ROLE = "role";
private static final long NUM_CONTAINER = 2;
private static final String TOPOLOGY_PACKAGE_URI = "topologyPackageURI";
private static final String CORE_PACKAGE_URI = "corePackageURI";
private MesosScheduler scheduler;
private MesosFramework mesosFramework;
private BaseContainer baseContainer;
@Before
public void before() throws Exception {
Config config = Mockito.mock(Config.class);
Mockito.when(config.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
Mockito.when(config.getStringValue(Key.ROLE)).thenReturn(ROLE);
Mockito.when(config.getStringValue(Key.CORE_PACKAGE_URI)).thenReturn(CORE_PACKAGE_URI);
Config runtime = Mockito.mock(Config.class);
Mockito.when(runtime.getLongValue(Key.NUM_CONTAINERS)).thenReturn(NUM_CONTAINER);
Properties properties = new Properties();
properties.put(Key.TOPOLOGY_PACKAGE_URI.value(), TOPOLOGY_PACKAGE_URI);
Mockito.when(runtime.get(Key.SCHEDULER_PROPERTIES)).thenReturn(properties);
mesosFramework = Mockito.mock(MesosFramework.class);
SchedulerDriver driver = Mockito.mock(SchedulerDriver.class);
baseContainer = Mockito.mock(BaseContainer.class);
scheduler = Mockito.spy(MesosScheduler.class);
Mockito.doReturn(mesosFramework).when(scheduler).getMesosFramework();
Mockito.doReturn(driver).when(scheduler)
.getSchedulerDriver(Mockito.anyString(), Mockito.eq(mesosFramework));
Mockito.doNothing().when(scheduler)
.startSchedulerDriver();
scheduler.initialize(config, runtime);
}
@After
public void after() throws Exception {
scheduler.close();
}
@Test
public void testOnSchedule() throws Exception {
Mockito.doReturn(baseContainer).when(scheduler)
.getBaseContainer(Mockito.anyInt(), Mockito.any(PackingPlan.class));
scheduler.onSchedule(Mockito.mock(PackingPlan.class));
Map<Integer, BaseContainer> expectedJobDef = new HashMap<>();
for (int containerIndex = 0;
containerIndex < NUM_CONTAINER;
containerIndex++) {
expectedJobDef.put(containerIndex, baseContainer);
}
Mockito.verify(mesosFramework).createJob(Mockito.eq(expectedJobDef));
}
@Test
public void testOnKill() throws Exception {
scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance());
Mockito.verify(mesosFramework).killJob();
}
@Test
public void testOnRestart() throws Exception {
Scheduler.RestartTopologyRequest request =
Scheduler.RestartTopologyRequest.newBuilder()
.setContainerIndex(-1)
.setTopologyName(TOPOLOGY_NAME).build();
scheduler.onRestart(request);
Mockito.verify(mesosFramework).restartJob(-1);
}
@Test
public void testGetBaseContainer() throws Exception {
final double CPU = 0.5;
final ByteAmount MEM = ByteAmount.fromMegabytes(100);
final ByteAmount DISK = ByteAmount.fromMegabytes(100);
Resource containerResources = new Resource(CPU, MEM, DISK);
PackingPlan.ContainerPlan containerPlan =
new PackingPlan.ContainerPlan(
0, new HashSet<PackingPlan.InstancePlan>(), containerResources);
Set<PackingPlan.ContainerPlan> containerPlans = new HashSet<>();
containerPlans.add(containerPlan);
PackingPlan packingPlan = new PackingPlan(TOPOLOGY_NAME, containerPlans);
BaseContainer container = scheduler.getBaseContainer(0, packingPlan);
// Assert we have constructed the correct BaseContainer structure
Assert.assertEquals(ROLE, container.runAsUser);
Assert.assertEquals(CPU, container.cpu, 0.01);
Assert.assertEquals(MEM, ByteAmount.fromMegabytes(((Double) container.memInMB).longValue()));
Assert.assertEquals(DISK, ByteAmount.fromMegabytes(((Double) container.diskInMB).longValue()));
Assert.assertEquals(SchedulerUtils.ExecutorPort.getRequiredPorts().size(), container.ports);
Assert.assertEquals(2, container.dependencies.size());
Assert.assertTrue(container.dependencies.contains(CORE_PACKAGE_URI));
Assert.assertTrue(container.dependencies.contains(TOPOLOGY_PACKAGE_URI));
Assert.assertTrue(container.environmentVariables.isEmpty());
Assert.assertTrue(container.name.startsWith("container_0"));
// Convert to JSON
String str = container.toString();
String json = BaseContainer.getJobDefinitionInJSON(container);
Assert.assertEquals(json, str);
// Convert the JSON back to BaseContainer
BaseContainer newContainer = BaseContainer.getJobFromJSONString(json);
Assert.assertEquals(json, BaseContainer.getJobDefinitionInJSON(newContainer));
}
}