blob: c7a5ed33d4e3a6197ce64489ee7a6d7d14dfed85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.local.driver;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
import org.apache.reef.runtime.local.client.parameters.RackNames;
import org.apache.reef.runtime.local.client.parameters.RootFolder;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.mockito.Mockito.*;
/**
 * Unit tests for the local runtime's {@link ResourceManager} and the
 * {@link ContainerManager} it allocates from.
 *
 * Every test follows the same shape: optionally bind rack / evaluator-count
 * parameters on the injector, obtain a {@link ContainerManager}, optionally
 * publish its node descriptors, build a {@link ResourceManager} around mocked
 * event handlers, submit one {@link ResourceRequestEvent} and then verify how
 * often the mocked handlers were invoked.
 *
 * Across the tests below the runtime status handler is expected to be called
 * once more than the number of allocations.
 * NOTE(review): presumably one status update for the request itself plus one
 * per allocation - confirm against ResourceManager.
 */
public class ResourceManagerTest {

  private static final double JVM_HEAP_SLACK = 0.1;

  private Injector injector;

  private ResourceManager resourceManager;

  private RemoteManager remoteManager;

  private EventHandler<ResourceStatusEvent> mockRuntimeResourceStatusHandler;

  private EventHandler<NodeDescriptorEvent> mockNodeDescriptorHandler;

  private EventHandler<ResourceAllocationEvent> mockResourceAllocationHandler;

  private EventHandler<RuntimeStatusEvent> mockRuntimeStatusHandler;

  private REEFFileNames filenames;

  private ContainerManager containerManager;

  private ConfigurationSerializer configurationSerializer;

  private LoggingScopeFactory loggingScopeFactory;

  /**
   * Builds a fresh injector rooted at {@code target/REEF_LOCAL_RUNTIME}, binds
   * newly created mock handlers for the four runtime event types and resolves
   * the collaborators that the {@link ResourceManager} constructor needs.
   *
   * @throws InjectionException if any of the instances cannot be injected.
   */
  @SuppressWarnings("unchecked")
  @Before
  public void setUp() throws InjectionException {
    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
    cb.bindNamedParameter(RootFolder.class, "target/REEF_LOCAL_RUNTIME");
    injector = Tang.Factory.getTang().newInjector(cb.build());
    remoteManager = injector.getInstance(RemoteManager.class);

    // mock(EventHandler.class) yields a raw type, hence the unchecked suppression on this method.
    mockRuntimeResourceStatusHandler = mock(EventHandler.class);
    injector.bindVolatileParameter(RuntimeParameters.ResourceStatusHandler.class, mockRuntimeResourceStatusHandler);
    mockNodeDescriptorHandler = mock(EventHandler.class);
    injector.bindVolatileParameter(RuntimeParameters.NodeDescriptorHandler.class, mockNodeDescriptorHandler);
    mockResourceAllocationHandler = mock(EventHandler.class);
    injector.bindVolatileParameter(RuntimeParameters.ResourceAllocationHandler.class, mockResourceAllocationHandler);
    mockRuntimeStatusHandler = mock(EventHandler.class);
    injector.bindVolatileParameter(RuntimeParameters.RuntimeStatusHandler.class, mockRuntimeStatusHandler);

    configurationSerializer = injector.getInstance(ConfigurationSerializer.class);
    filenames = injector.getInstance(REEFFileNames.class);
    loggingScopeFactory = injector.getInstance(LoggingScopeFactory.class);
  }

  /**
   * Intentionally empty: the mocks are recreated by {@link #setUp()} before
   * every test, so there is nothing to reset here.
   */
  @After
  public void tearDown() {
    // no need to reset mocks, they are created again in the setup
  }

  /**
   * Helper method to call the private {@code sendNodeDescriptors} method of
   * the current {@code containerManager} via reflection. Per the tests below,
   * this is what makes containers available for allocation in each rack.
   *
   * @throws RuntimeException wrapping the reflective failure if the method
   *     cannot be found, accessed or if it throws.
   */
  private void sendNodeDescriptors() {
    try {
      final Method method = ContainerManager.class.getDeclaredMethod("sendNodeDescriptors");
      method.setAccessible(true);
      method.invoke(containerManager);
    } catch (final ReflectiveOperationException exc) {
      // Only reflective failures are wrapped; unchecked exceptions propagate unchanged.
      throw new RuntimeException("Unable to invoke ContainerManager.sendNodeDescriptors()", exc);
    }
  }

  /**
   * Creates the {@link ResourceManager} under test from the current
   * {@code containerManager}, the mocked allocation and status handlers and
   * the collaborators resolved in {@link #setUp()}.
   *
   * @return a new ResourceManager wired to the test fixtures.
   */
  private ResourceManager createResourceManager() {
    return new ResourceManager(containerManager, mockResourceAllocationHandler, mockRuntimeStatusHandler,
        JVM_HEAP_SLACK, configurationSerializer, remoteManager, filenames, loggingScopeFactory);
  }

  /**
   * Binding a rack name that contains a wildcard as an available rack must
   * make the injection of the {@link ContainerManager} fail.
   * NOTE(review): presumably wildcards are only legal in requests, not in the
   * configured rack names - confirm against ContainerManager.
   */
  @Test(expected = InjectionException.class)
  public void testInvalidRacksConfigured() throws InjectionException {
    // Given
    final Set<String> availableRacks = new HashSet<>(Arrays.asList("/rack1/*"));
    injector.bindVolatileParameter(RackNames.class, availableRacks);
    // When
    containerManager = injector.getInstance(ContainerManager.class);
    // Then
    // expect the exception to be thrown
  }

  /**
   * A request for a single resource without rack or node preferences yields
   * exactly one allocation event and two runtime status events.
   */
  @Test
  public void testOneAllocationsInDefaultRack() throws InjectionException {
    // Given
    containerManager = injector.getInstance(ContainerManager.class);
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(1)
        .setVirtualCores(1)
        .setMemorySize(64)
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    verify(mockResourceAllocationHandler, times(1)).onNext(any(ResourceAllocationEvent.class));
    verify(mockRuntimeStatusHandler, times(2)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * When the node descriptors were never sent, a request for two resources
   * yields no allocation event and a single runtime status event.
   */
  @Test
  public void testZeroAllocationsDueToContainersNotAvailableAndRelaxLocalityDisabled() throws InjectionException {
    // Given
    containerManager = injector.getInstance(ContainerManager.class);
    // not sending notifications, there are no available free slots in the container manager
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(2)
        .setVirtualCores(1)
        .setMemorySize(64)
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    verify(mockResourceAllocationHandler, times(0)).onNext(any(ResourceAllocationEvent.class));
    verify(mockRuntimeStatusHandler, times(1)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * With two racks and two evaluators configured, a request naming both racks
   * yields one allocation in {@code /rack1} followed by one in {@code /rack2}.
   */
  @Test
  public void testTwoAllocationsInDifferentRacks() throws InjectionException {
    // Given
    final List<String> availableRacks = Arrays.asList("/rack1", "/rack2");
    final Set<String> availableRacksSet = new HashSet<>(availableRacks);
    injector.bindVolatileParameter(RackNames.class, availableRacksSet); // 2 available racks
    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 2); // 1 evaluator per rack
    containerManager = injector.getInstance(ContainerManager.class); // inject containerManager with this updated info
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(2)
        .setVirtualCores(1)
        .setMemorySize(64)
        .addRackName(availableRacks.get(0))
        .addRackName(availableRacks.get(1))
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    final ArgumentCaptor<ResourceAllocationEvent> argument = ArgumentCaptor.forClass(ResourceAllocationEvent.class);
    verify(mockResourceAllocationHandler, times(2)).onNext(argument.capture());
    final List<ResourceAllocationEvent> actualResourceAllocationEvent = argument.getAllValues();
    Assert.assertEquals("/rack1", actualResourceAllocationEvent.get(0).getRackName().get());
    Assert.assertEquals("/rack2", actualResourceAllocationEvent.get(1).getRackName().get());
    verify(mockRuntimeStatusHandler, times(3)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * With no rack or evaluator-count overrides, a request for two resources
   * yields two allocation events and three runtime status events.
   * NOTE(review): the "four containers" in the test name presumably refers to
   * the default value of MaxNumberOfEvaluators - confirm.
   */
  @Test
  public void testTwoAllocationsOnFourContainersAvailableInDefaultRack() throws InjectionException {
    // Given
    containerManager = injector.getInstance(ContainerManager.class);
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(2)
        .setVirtualCores(1)
        .setMemorySize(64)
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    verify(mockResourceAllocationHandler, times(2)).onNext(any(ResourceAllocationEvent.class));
    verify(mockRuntimeStatusHandler, times(3)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * With one rack in {@code /dc1} and two racks in {@code /dc2}, a request
   * for three resources naming {@code /dc1/*} and {@code /dc2/*} yields one
   * allocation under {@code /dc1} followed by two under {@code /dc2}.
   */
  @Test
  public void testOneAllocationInRack1AndTwoInDatacenter2() throws InjectionException {
    // Given
    final List<String> availableRacks = Arrays.asList("/dc1/rack1", "/dc2/rack1", "/dc2/rack2");
    final Set<String> availableRacksSet = new HashSet<>(availableRacks);
    injector.bindVolatileParameter(RackNames.class, availableRacksSet); // 3 available racks
    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 3); // 1 evaluator per rack
    containerManager = injector.getInstance(ContainerManager.class);
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    // Both wildcards are slash-rooted so that they line up with the configured rack names above.
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(3)
        .setVirtualCores(1)
        .setMemorySize(64)
        .addRackName("/dc1/*")
        .addRackName("/dc2/*")
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    final ArgumentCaptor<ResourceAllocationEvent> argument = ArgumentCaptor.forClass(ResourceAllocationEvent.class);
    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
    final List<ResourceAllocationEvent> actualResourceAllocationEvent = argument.getAllValues();
    Assert.assertTrue(actualResourceAllocationEvent.get(0).getRackName().get().contains("/dc1"));
    Assert.assertTrue(actualResourceAllocationEvent.get(1).getRackName().get().contains("/dc2"));
    Assert.assertTrue(actualResourceAllocationEvent.get(2).getRackName().get().contains("/dc2"));
    verify(mockRuntimeStatusHandler, times(4)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * With eight evaluators configured and no rack overrides, a request for
   * three resources naming {@code Node-8} yields a first allocation on
   * {@code Node-8} and two further allocations on other nodes, all of them
   * reporting {@link RackNames#DEFAULT_RACK_NAME} as their rack.
   */
  @Test
  public void testAllocateNode8AndTwoRandomOnesInDefaultRack() throws InjectionException {
    // Given
    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 8); // 8 evaluator in the default rack
    containerManager = injector.getInstance(ContainerManager.class);
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(3)
        .setVirtualCores(1)
        .setMemorySize(64)
        .addNodeName("Node-8")
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    final ArgumentCaptor<ResourceAllocationEvent> argument = ArgumentCaptor.forClass(ResourceAllocationEvent.class);
    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
    final List<ResourceAllocationEvent> actualResourceAllocationEvent = argument.getAllValues();
    Assert.assertEquals("Node-8", actualResourceAllocationEvent.get(0).getNodeId());
    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, actualResourceAllocationEvent.get(0).getRackName().get());
    Assert.assertNotEquals("Node-8", actualResourceAllocationEvent.get(1).getNodeId());
    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, actualResourceAllocationEvent.get(1).getRackName().get());
    Assert.assertNotEquals("Node-8", actualResourceAllocationEvent.get(2).getNodeId());
    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, actualResourceAllocationEvent.get(2).getRackName().get());
    verify(mockRuntimeStatusHandler, times(4)).onNext(any(RuntimeStatusEvent.class));
  }

  /**
   * With three racks in three datacenters and three evaluators configured, a
   * request for three resources naming {@code /dc3/rack1} and the catch-all
   * {@code /*} yields a first allocation in {@code /dc3/rack1} and two further
   * allocations in racks other than {@code /dc3/rack1}.
   */
  @Test
  public void testOneAllocationInRack1AndTwoInDifferentRacksDueToRelaxLocality() throws InjectionException {
    // Given
    final List<String> availableRacks = Arrays.asList("/dc1/rack1", "/dc2/rack1", "/dc3/rack1");
    final Set<String> availableRacksSet = new HashSet<>(availableRacks);
    injector.bindVolatileParameter(RackNames.class, availableRacksSet);
    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 3); // 3 evaluators in three different racks
    containerManager = injector.getInstance(ContainerManager.class);
    sendNodeDescriptors();
    resourceManager = createResourceManager();
    final ResourceRequestEvent request = ResourceRequestEventImpl.newBuilder()
        .setResourceCount(3)
        .setVirtualCores(1)
        .setMemorySize(64)
        .addRackName("/dc3/rack1")
        .addRackName("/*")
        .build();
    // When
    resourceManager.onResourceRequest(request);
    // Then
    final ArgumentCaptor<ResourceAllocationEvent> argument = ArgumentCaptor.forClass(ResourceAllocationEvent.class);
    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
    final List<ResourceAllocationEvent> actualResourceAllocationEvent = argument.getAllValues();
    Assert.assertEquals("/dc3/rack1", actualResourceAllocationEvent.get(0).getRackName().get());
    Assert.assertNotEquals("/dc3/rack1", actualResourceAllocationEvent.get(1).getRackName().get());
    Assert.assertNotEquals("/dc3/rack1", actualResourceAllocationEvent.get(2).getRackName().get());
    verify(mockRuntimeStatusHandler, times(4)).onNext(any(RuntimeStatusEvent.class));
  }
}