blob: 1c0a43b0f8dec66f5a0dc6caf586d18727b5a8ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.nomad;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.google.common.base.Optional;
import com.hashicorp.nomad.apimodel.Job;
import com.hashicorp.nomad.apimodel.Port;
import com.hashicorp.nomad.apimodel.Task;
import com.hashicorp.nomad.apimodel.TaskGroup;
import com.hashicorp.nomad.javasdk.NomadApiClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import static org.mockito.Matchers.anyVararg;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.net.ssl.*", "jdk.internal.reflect.*"})
@PrepareForTest({NomadScheduler.class, Job.class, SchedulerUtils.class})
public class NomadSchedulerTest {
private static final Logger LOG = Logger.getLogger(NomadSchedulerTest.class.getName());
private static final String TOPOLOGY_NAME = "topology-name";
private static final String TOPOLOGY_ID = "topology-id";
private static final int CONTAINER_INDEX = 1;
private static final String PACKING_PLAN_ID = "packing_plan_id";
private static final String SCHEDULER_URI = "http://127.0.0.1:4646";
private static final String[] EXECUTOR_CMD_ARGS = {"args1", "args2"};
private static final String GROUP_NAME = "group-name";
private static final String TASK_NAME = "task-name";
private static final String TOPOLOGY_DOWNLOAD_CMD = "topology-download-cmd";
private static final String HERON_NOMAD_SCRIPT = "heron_nomad_script";
private static final double CPU_RESOURCE = 100.0;
private static final ByteAmount MEMORY_RESOURCE = ByteAmount.fromMegabytes(100);
private static final ByteAmount DISK_RESOURCE = ByteAmount.fromMegabytes(1000);
private static final int HERON_NOMAD_CORE_FREQ_MAPPING = 1000;
private static final String CORE_PACKAGE_URI = "core-package-uri";
private static final Boolean USE_CORE_PACKAGE_URI = true;
private static final String EXECUTOR_BINARY = "executor-binary";
private static final String PORT_FILE = "port-file";
private static NomadScheduler scheduler;
private Config mockRuntime;
private Config mockConfig;
@Before
public void setUp() throws Exception {
Config config = Config.newBuilder()
.put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
.put(Key.TOPOLOGY_ID, TOPOLOGY_ID)
.put(NomadContext.HERON_NOMAD_SCHEDULER_URI, SCHEDULER_URI)
.put(NomadContext.HERON_NOMAD_CORE_FREQ_MAPPING, HERON_NOMAD_CORE_FREQ_MAPPING)
.put(Key.CORE_PACKAGE_URI, CORE_PACKAGE_URI)
.put(Key.USE_CORE_PACKAGE_URI, USE_CORE_PACKAGE_URI)
.put(Key.EXECUTOR_BINARY, EXECUTOR_BINARY)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.RAW_EXEC.getName())
.put(NomadContext.HERON_NOMAD_NETWORK_MODE, "default")
.build();
this.mockRuntime = config;
this.mockConfig = config;
scheduler = Mockito.spy(NomadScheduler.class);
}
@After
public void after() throws Exception {
scheduler.close();
}
@BeforeClass
public static void beforeClass() throws Exception {
}
@AfterClass
public static void afterClass() throws Exception {
}
@Test
public void testOnSchedule() throws Exception {
PowerMockito.mockStatic(NomadScheduler.class);
Mockito.doReturn(new LinkedList<>()).when(scheduler)
.getJobs(Mockito.any(PackingPlan.class));
scheduler.initialize(this.mockConfig, this.mockRuntime);
// Fail to schedule due to null PackingPlan
Assert.assertFalse(scheduler.onSchedule(null));
PackingPlan pplan =
new PackingPlan(
PACKING_PLAN_ID,
new HashSet<>()
);
Assert.assertTrue(pplan.getContainers().isEmpty());
// Fail to schedule due to PackingPlan is empty
Assert.assertFalse(scheduler.onSchedule(pplan));
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
PackingPlan validPlan =
new PackingPlan(PACKING_PLAN_ID, containers);
// Fail to submit due to client failure
Job[] jobs = {new Job(), new Job(), new Job()};
List<Job> jobList = Arrays.asList(jobs);
Mockito.doReturn(jobList).when(scheduler).getJobs(validPlan);
PowerMockito.doThrow(new RuntimeException()).when(NomadScheduler.class,
"startJobs", Mockito.any(NomadApiClient.class), anyVararg());
Assert.assertFalse(scheduler.onSchedule(validPlan));
// Succeed
Mockito.doReturn(jobList).when(scheduler).getJobs(validPlan);
PowerMockito.doNothing().when(NomadScheduler.class, "startJobs",
Mockito.any(NomadApiClient.class), anyVararg());
Assert.assertTrue(scheduler.onSchedule(validPlan));
}
@Test
public void testOnRestart() throws Exception {
PowerMockito.mockStatic(NomadScheduler.class);
// Construct RestartTopologyRequest to restart container
Scheduler.RestartTopologyRequest restartTopologyRequest =
Scheduler.RestartTopologyRequest.newBuilder()
.setTopologyName(TOPOLOGY_NAME)
.setContainerIndex(CONTAINER_INDEX)
.build();
scheduler.initialize(this.mockConfig, this.mockRuntime);
// Fail to restart
PowerMockito.when(NomadScheduler.getTopologyContainerJob(
Mockito.any(NomadApiClient.class),
Mockito.anyString(), Mockito.anyInt())).thenReturn(new Job());
PowerMockito.doThrow(new RuntimeException()).when(
NomadScheduler.class, "restartJobs",
Mockito.any(NomadApiClient.class), Mockito.any(Job.class));
Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
// Succeed to restart
PowerMockito.when(NomadScheduler.getTopologyContainerJob(
Mockito.any(NomadApiClient.class),
Mockito.anyString(), Mockito.anyInt())).thenReturn(new Job());
PowerMockito.doNothing().when(NomadScheduler.class, "restartJobs",
Mockito.any(NomadApiClient.class), Mockito.any(Job.class));
Assert.assertTrue(scheduler.onRestart(restartTopologyRequest));
// Construct RestartTopologyRequest to restart whole topology
restartTopologyRequest =
Scheduler.RestartTopologyRequest.newBuilder()
.setTopologyName(TOPOLOGY_NAME)
.setContainerIndex(-1)
.build();
scheduler.initialize(this.mockConfig, this.mockRuntime);
Job[] jobs = {new Job(), new Job(), new Job()};
List<Job> jobList = Arrays.asList(jobs);
// Fail to restart
PowerMockito.when(NomadScheduler.getTopologyJobs(Mockito.any(NomadApiClient.class),
Mockito.anyString())).thenReturn(jobList);
PowerMockito.doThrow(new RuntimeException()).when(NomadScheduler.class,
"restartJobs", Mockito.any(), anyVararg());
Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
// Succeed to restart
PowerMockito.when(NomadScheduler.getTopologyJobs(Mockito.any(NomadApiClient.class),
Mockito.anyString())).thenReturn(jobList);
PowerMockito.doNothing().when(NomadScheduler.class, "restartJobs",
Mockito.any(NomadApiClient.class), anyVararg());
Assert.assertTrue(scheduler.onRestart(restartTopologyRequest));
}
@Test
public void testOnKill() throws Exception {
PowerMockito.mockStatic(NomadScheduler.class);
Scheduler.KillTopologyRequest killTopologyRequest
= Scheduler.KillTopologyRequest.newBuilder()
.setTopologyName(TOPOLOGY_NAME)
.build();
scheduler.initialize(this.mockConfig, this.mockRuntime);
Job[] jobs = {new Job(), new Job(), new Job()};
List<Job> jobList = Arrays.asList(jobs);
// Fail to kill
PowerMockito.when(NomadScheduler.getTopologyJobs(Mockito.any(NomadApiClient.class),
Mockito.anyString())).thenReturn(jobList);
PowerMockito.doThrow(new RuntimeException()).when(NomadScheduler.class, "killJobs",
Mockito.any(NomadApiClient.class), anyVararg());
Assert.assertFalse(scheduler.onKill(killTopologyRequest));
// Succeed to kill
PowerMockito.when(NomadScheduler.getTopologyJobs(Mockito.any(NomadApiClient.class),
Mockito.anyString())).thenReturn(jobList);
PowerMockito.doNothing().when(NomadScheduler.class, "killJobs",
Mockito.any(NomadApiClient.class), anyVararg());
Assert.assertTrue(scheduler.onKill(killTopologyRequest));
}
@Test
public void testGetJobLinks() {
PowerMockito.mockStatic(NomadScheduler.class);
final String JOB_LINK = SCHEDULER_URI + "/ui/jobs";
scheduler.initialize(this.mockConfig, this.mockRuntime);
List<String> links = scheduler.getJobLinks();
Assert.assertEquals(1, links.size());
Assert.assertTrue(links.get(0).equals(JOB_LINK));
}
@Test
public void testGetJob() {
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
CONTAINER_INDEX, new HashSet<>(), Mockito.mock(Resource.class));
Optional<PackingPlan.ContainerPlan> plan = Optional.of(containerPlan);
Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
scheduler.initialize(this.mockConfig, this.mockRuntime);
Mockito.doReturn(new TaskGroup()).when(scheduler).getTaskGroup(
Mockito.anyString(), Mockito.anyInt(), Mockito.any());
Job job = scheduler.getJob(CONTAINER_INDEX, plan, resource);
LOG.info("job: " + job);
Assert.assertEquals(TOPOLOGY_ID + "-" + CONTAINER_INDEX, job.getId());
Assert.assertEquals(TOPOLOGY_NAME + "-" + CONTAINER_INDEX, job.getName());
Assert.assertArrayEquals(Arrays.asList(NomadConstants.NOMAD_DEFAULT_DATACENTER).toArray(),
job.getDatacenters().toArray());
Assert.assertNotNull(job.getTaskGroups());
}
@Test
public void testGetTaskGroup() {
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
CONTAINER_INDEX, new HashSet<>(), Mockito.mock(Resource.class));
scheduler.initialize(this.mockConfig, this.mockRuntime);
Mockito.doReturn(new Task()).when(scheduler).getTask(
Mockito.anyString(), Mockito.anyInt(), Mockito.any());
TaskGroup taskGroup = scheduler.getTaskGroup(GROUP_NAME, CONTAINER_INDEX, resource);
LOG.info("taskGroup: " + taskGroup);
Assert.assertEquals(GROUP_NAME, taskGroup.getName());
Assert.assertNotNull(taskGroup.getCount());
Assert.assertNotNull(taskGroup.getTasks());
}
@SuppressWarnings("unchecked")
@Test
public void testGetTaskRawExec() {
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
PowerMockito.mockStatic(SchedulerUtils.class);
Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
PowerMockito.when(SchedulerUtils.executorCommandArgs(
Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
.thenReturn(EXECUTOR_CMD_ARGS);
PowerMockito.mockStatic(NomadScheduler.class);
PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(TOPOLOGY_DOWNLOAD_CMD);
PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
.thenReturn(HERON_NOMAD_SCRIPT);
PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
.thenReturn((int) MEMORY_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
.thenReturn((int) DISK_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
.thenReturn(PORT_FILE);
scheduler.initialize(this.mockConfig, this.mockRuntime);
Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
LOG.info("task: " + task);
Assert.assertEquals(TASK_NAME, task.getName());
Assert.assertEquals(NomadConstants.NomadDriver.RAW_EXEC.getName(), task.getDriver());
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND));
Assert.assertEquals(NomadConstants.SHELL_CMD,
task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND));
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND_ARGS));
Assert.assertArrayEquals(Arrays.asList(NomadConstants.NOMAD_HERON_SCRIPT_NAME).toArray(),
(String[]) task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND_ARGS));
Assert.assertEquals(1, task.getTemplates().size());
Assert.assertEquals(HERON_NOMAD_SCRIPT, task.getTemplates().get(0).getEmbeddedTmpl());
Assert.assertEquals(NomadConstants.NOMAD_HERON_SCRIPT_NAME,
task.getTemplates().get(0).getDestPath());
Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING,
task.getResources().getCpu().intValue());
Assert.assertEquals((int) MEMORY_RESOURCE.asMegabytes(),
task.getResources().getMemoryMb().intValue());
Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(),
task.getResources().getDiskMb().intValue());
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_NOMAD_WORKING_DIR));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_USE_CORE_PACKAGE_URI));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_CORE_PACKAGE_URI));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_EXECUTOR_CMD));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
Assert.assertEquals(NomadKey.WORKING_DIRECTORY.getDefaultString() + "/container-"
+ String.valueOf(CONTAINER_INDEX),
task.getEnv().get(NomadConstants.HERON_NOMAD_WORKING_DIR));
Assert.assertEquals(USE_CORE_PACKAGE_URI.toString(),
task.getEnv().get(NomadConstants.HERON_USE_CORE_PACKAGE_URI));
Assert.assertEquals(CORE_PACKAGE_URI,
task.getEnv().get(NomadConstants.HERON_CORE_PACKAGE_URI));
Assert.assertEquals(TOPOLOGY_DOWNLOAD_CMD,
task.getEnv().get(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
Assert.assertEquals("./heron-core/bin/heron-executor args1 args2",
task.getEnv().get(NomadConstants.HERON_EXECUTOR_CMD));
Assert.assertEquals(PORT_FILE,
task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
}
@SuppressWarnings("unchecked")
@Test
public void testGetTaskDocker() {
this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockRuntime)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName())
.build();
this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName())
.build();
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
PowerMockito.mockStatic(SchedulerUtils.class);
Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
PowerMockito.when(SchedulerUtils.executorCommandArgs(
Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
.thenReturn(EXECUTOR_CMD_ARGS);
PowerMockito.mockStatic(NomadScheduler.class);
PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(TOPOLOGY_DOWNLOAD_CMD);
PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
.thenReturn(HERON_NOMAD_SCRIPT);
PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
.thenReturn((int) MEMORY_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
.thenReturn((int) DISK_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
.thenReturn(PORT_FILE);
scheduler.initialize(this.mockConfig, this.mockRuntime);
Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
LOG.info("task: " + task);
Assert.assertEquals(TASK_NAME, task.getName());
Assert.assertEquals(NomadConstants.NomadDriver.DOCKER.getName(), task.getDriver());
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND));
Assert.assertEquals(NomadConstants.SHELL_CMD,
task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING,
task.getResources().getCpu().intValue());
Assert.assertEquals((int) MEMORY_RESOURCE.asMegabytes(),
task.getResources().getMemoryMb().intValue());
Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(),
task.getResources().getDiskMb().intValue());
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HOST));
Assert.assertEquals(PORT_FILE,
task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
Assert.assertEquals("${attr.unique.network.ip-address}",
task.getEnv().get(NomadConstants.HOST));
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NETWORK_MODE));
Assert.assertEquals("default",
task.getConfig().get(NomadConstants.NETWORK_MODE));
Assert.assertEquals(task.getResources().getNetworks().size(), 1);
Set<String> ports = new HashSet<>();
for (Port entry : task.getResources().getNetworks().get(0).getDynamicPorts()) {
ports.add(entry.getLabel());
Assert.assertEquals(entry.getValue(), 0);
}
for (SchedulerUtils.ExecutorPort entry : NomadConstants.EXECUTOR_PORTS.keySet()) {
Assert.assertTrue(ports.contains(entry.getName().replace("-", "_")));
}
Assert.assertTrue(ports.contains(NomadConstants.METRICS_PORT));
}
@SuppressWarnings("unchecked")
@Test
public void testServiceCheck() {
this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
PowerMockito.mockStatic(SchedulerUtils.class);
Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
PowerMockito.when(SchedulerUtils.executorCommandArgs(
Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
.thenReturn(EXECUTOR_CMD_ARGS);
PowerMockito.mockStatic(NomadScheduler.class);
PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(TOPOLOGY_DOWNLOAD_CMD);
PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
.thenReturn(HERON_NOMAD_SCRIPT);
PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
.thenReturn((int) MEMORY_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
.thenReturn((int) DISK_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
.thenReturn(PORT_FILE);
PowerMockito.when(NomadScheduler.getMetricsServiceName(Mockito.any(), Mockito.anyInt()))
.thenReturn(String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
scheduler.initialize(this.mockConfig, this.mockRuntime);
Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
LOG.info("task: " + task);
Assert.assertEquals(task.getServices().size(), 1);
Assert.assertEquals(task.getServices().get(0).getName(),
String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
String[] tags = {String.format("%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX), "foo", "bar"};
Assert.assertEquals(task.getServices().get(0).getTags(), Arrays.asList(tags));
Assert.assertEquals(task.getServices().get(0).getPortLabel(), NomadConstants.METRICS_PORT);
Assert.assertEquals(task.getServices().get(0).getChecks().size(), 1);
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getPortLabel(),
NomadConstants.METRICS_PORT);
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getType(),
NomadConstants.NOMAD_SERVICE_CHECK_TYPE);
TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getInterval(),
TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS));
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getTimeout(),
TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS));
// if service registration is turned off
this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
scheduler.initialize(this.mockConfig, this.mockRuntime);
task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
LOG.info("task: " + task);
Assert.assertTrue(task.getServices() == null);
}
}