blob: d82ddd55a3d3e0e40aeda04d187139b3d2eb3ee7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ResourceSpecification;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Tests {@link org.apache.nemo.runtime.master.resource.ContainerManager}.
 *
 * <p>Every REEF-facing collaborator used here (evaluator requestor, message environment,
 * evaluators, contexts and descriptors) is a Mockito mock, and a fresh {@link ContainerManager}
 * is injected before each test.
 */
public final class ContainerManagerTest {
  private static final ResourceSpecification RESOURCE_SPEC_A =
      new ResourceSpecification(ResourcePriorityProperty.COMPUTE, 1, 1024);
  private static final ResourceSpecification RESOURCE_SPEC_B =
      new ResourceSpecification(ResourcePriorityProperty.TRANSIENT, 2, 2048);
  private static final ResourceSpecification RESOURCE_SPEC_C =
      new ResourceSpecification(ResourcePriorityProperty.RESERVED, 3, 3072);

  private ContainerManager containerManager;

  // Single counter shared by all id generators, so every generated id carries a distinct number.
  private final AtomicInteger testIdNumber = new AtomicInteger(0);

  /**
   * @return a node name of the form {@code NODE-<n>} that has not been handed out before.
   */
  private String getNodeName() {
    return "NODE-" + testIdNumber.incrementAndGet();
  }

  /**
   * @return an evaluator id of the form {@code EVALUATOR-<n>} that has not been handed out before.
   */
  private String getEvaluatorId() {
    return "EVALUATOR-" + testIdNumber.incrementAndGet();
  }

  /**
   * @return an executor id of the form {@code EXECUTOR-<n>} that has not been handed out before.
   */
  private String getExecutorId() {
    return "EXECUTOR-" + testIdNumber.incrementAndGet();
  }

  /**
   * Builds the {@link ContainerManager} under test through Tang injection, with the
   * {@link EvaluatorRequestor} and {@link MessageEnvironment} replaced by mocks.
   *
   * @throws InjectionException if Tang fails to instantiate the {@link ContainerManager}.
   */
  @Before
  public void setUp() throws InjectionException {
    final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class);
    when(mockMsgEnv.asyncConnect(anyString(), anyString())).thenReturn(mock(Future.class));
    final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(JobConf.ScheduleSerThread.class, "1")
        .build();
    final Injector injector = Tang.Factory.getTang().newInjector(configuration);
    injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class));
    injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv);
    containerManager = injector.getInstance(ContainerManager.class);
  }

  /**
   * Requests containers for each specification, then walks every container through
   * allocation and launch, checking that the resulting {@link ExecutorRepresenter}
   * reflects the specification and the node it was launched on.
   */
  @Test
  public void testRequestAllocateLaunch() {
    // Create 2 of A, 2 of B and 1 of C.
    // The map is keyed by specification rather than by count: two specifications may share the
    // same count, and a count-keyed map would silently drop one of them on the second put().
    final Map<ResourceSpecification, Integer> specToNum = new HashMap<>();
    specToNum.put(RESOURCE_SPEC_A, 2);
    specToNum.put(RESOURCE_SPEC_B, 2);
    specToNum.put(RESOURCE_SPEC_C, 1);

    // Request -> Allocate -> Launch
    for (final Map.Entry<ResourceSpecification, Integer> entry : specToNum.entrySet()) {
      final ResourceSpecification spec = entry.getKey();
      final int num = entry.getValue();
      containerManager.requestContainer(num, spec);

      for (int i = 0; i < num; i++) {
        final String evaluatorId = getEvaluatorId();
        final String executorId = getExecutorId();
        final EvaluatorDescriptor descriptor = createDescriptor(spec);

        containerManager.onContainerAllocated(
            executorId,
            createMockEvaluator(evaluatorId, descriptor),
            createMockConfiguration());
        final ExecutorRepresenter executorRepresenter =
            containerManager.onContainerLaunched(createMockContext(executorId, descriptor)).get();

        assertEquals(spec.getContainerType(), executorRepresenter.getContainerType());
        assertEquals(spec.getCapacity(), executorRepresenter.getExecutorCapacity());
        assertEquals(descriptor.getNodeDescriptor().getName(), executorRepresenter.getNodeName());
      }
    }
  }

  /**
   * A container that fails after allocation but before launch must report the
   * specification it was requested with.
   */
  @Test
  public void testFailureBeforeLaunch() {
    containerManager.requestContainer(1, RESOURCE_SPEC_A);
    final String evaluatorId = getEvaluatorId();

    containerManager.onContainerAllocated(
        getExecutorId(),
        createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)),
        createMockConfiguration());
    assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
  }

  /**
   * A container that fails after it has been launched must report the
   * specification it was requested with.
   */
  @Test
  public void testFailureAfterLaunch() {
    containerManager.requestContainer(1, RESOURCE_SPEC_A);
    final String evaluatorId = getEvaluatorId();
    final String executorId = getExecutorId();
    final EvaluatorDescriptor descriptor = createDescriptor(RESOURCE_SPEC_A);

    containerManager.onContainerAllocated(
        executorId,
        createMockEvaluator(evaluatorId, descriptor),
        createMockConfiguration());
    containerManager.onContainerLaunched(createMockContext(executorId, descriptor));
    assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
  }

  /**
   * Creates a mock {@link EvaluatorDescriptor} whose memory and core count mirror the given
   * specification and whose node descriptor carries a freshly generated node name.
   *
   * @param spec the specification supplying the memory and capacity values.
   * @return the mocked descriptor.
   */
  private EvaluatorDescriptor createDescriptor(final ResourceSpecification spec) {
    final EvaluatorDescriptor descriptor = mock(EvaluatorDescriptor.class);
    when(descriptor.getMemory()).thenReturn(spec.getMemory());
    when(descriptor.getNumberOfCores()).thenReturn(spec.getCapacity());

    final NodeDescriptor node = mock(NodeDescriptor.class);
    when(node.getName()).thenReturn(getNodeName());
    when(descriptor.getNodeDescriptor()).thenReturn(node);
    return descriptor;
  }

  /**
   * Creates a mock {@link AllocatedEvaluator} that reports the given id and descriptor.
   *
   * @param id         the evaluator id to report.
   * @param descriptor the descriptor to report.
   * @return the mocked evaluator.
   */
  private AllocatedEvaluator createMockEvaluator(final String id,
                                                 final EvaluatorDescriptor descriptor) {
    final AllocatedEvaluator evaluator = mock(AllocatedEvaluator.class);
    when(evaluator.getId()).thenReturn(id);
    when(evaluator.getEvaluatorDescriptor()).thenReturn(descriptor);
    return evaluator;
  }

  /**
   * Creates a mock {@link ActiveContext} that reports the given id and descriptor.
   *
   * @param id         the context id to report (the tests pass the executor id here).
   * @param descriptor the descriptor to report.
   * @return the mocked context.
   */
  private ActiveContext createMockContext(final String id,
                                          final EvaluatorDescriptor descriptor) {
    final ActiveContext mockedContext = mock(ActiveContext.class);
    when(mockedContext.getId()).thenReturn(id);
    when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor);
    return mockedContext;
  }

  /**
   * @return an empty Tang {@link Configuration}, used as a placeholder executor configuration.
   */
  private Configuration createMockConfiguration() {
    return Tang.Factory.getTang().newConfigurationBuilder().build();
  }
}