blob: f306ffe22b41f3a8929ed0491952451fdb17cdb6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
/**
* Test cases for volume processor.
*/
public class TestVolumeProcessor {
private static final int GB = 1024;
private YarnConfiguration conf;
private RMNodeLabelsManager mgr;
private MockRM rm;
private MockNM[] mockNMS;
private RMNode[] rmNodes;
private static final int NUM_OF_NMS = 4;
// resource-types.xml will be created under target/test-classes/ dir,
// it must be deleted after the test is done, to avoid it from reading
// by other UT classes.
private File resourceTypesFile = null;
private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume";
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
resourceTypesFile = new File(conf.getClassLoader()
.getResource(".").getPath(), "resource-types.xml");
writeTmpResourceTypesFile(resourceTypesFile);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
conf.set("yarn.scheduler.capacity.resource-calculator",
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
conf.set(CapacitySchedulerConfiguration.PREFIX
+ CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
"fair");
// this is required to enable volume processor
conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
VolumeAMSProcessor.class.getName());
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
rm = new MockRM(conf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
mockNMS = new MockNM[NUM_OF_NMS];
rmNodes = new RMNode[NUM_OF_NMS];
for (int i = 0; i < 4; i++) {
mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB);
rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId());
}
}
@After
public void tearDown() {
if (resourceTypesFile != null && resourceTypesFile.exists()) {
resourceTypesFile.delete();
}
}
private void writeTmpResourceTypesFile(File tmpFile) throws IOException {
Configuration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME);
yarnConf.set("yarn.resource-types."
+ VOLUME_RESOURCE_NAME + ".units", "Mi");
yarnConf.set("yarn.resource-types."
+ VOLUME_RESOURCE_NAME + ".tags",
CsiConstants.CSI_VOLUME_RESOURCE_TAG);
try (FileWriter fw = new FileWriter(tmpFile)) {
yarnConf.writeXml(fw);
}
}
@Test (timeout = 10000L)
public void testVolumeProvisioning() throws Exception {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
Resource resource = Resource.newInstance(1024, 1);
ResourceInformation volumeResource = ResourceInformation
.newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
ImmutableMap.of(
CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
CsiConstants.CSI_DRIVER_NAME, "hostpath",
CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
)
);
resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
SchedulingRequest sc = SchedulingRequest
.newBuilder().allocationRequestId(0L)
.resourceSizing(ResourceSizing.newInstance(1, resource))
.build();
AllocateRequest ar = AllocateRequest.newBuilder()
.schedulingRequests(Arrays.asList(sc))
.build();
// inject adaptor client for testing
CsiAdaptorProtocol mockedClient = Mockito
.mock(CsiAdaptorProtocol.class);
rm.getRMContext().getVolumeManager()
.registerCsiDriverAdaptor("hostpath", mockedClient);
// simulate validation succeed
doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
.when(mockedClient)
.validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
am1.allocate(ar);
VolumeStates volumeStates =
rm.getRMContext().getVolumeManager().getVolumeStates();
Assert.assertNotNull(volumeStates);
VolumeState volumeState = VolumeState.NEW;
while (volumeState != VolumeState.NODE_READY) {
Volume volume = volumeStates
.getVolume(new VolumeId("test-vol-000001"));
if (volume != null) {
volumeState = volume.getVolumeState();
}
am1.doHeartbeat();
mockNMS[0].nodeHeartbeat(true);
Thread.sleep(500);
}
rm.stop();
}
@Test (timeout = 30000L)
public void testInvalidRequest() throws Exception {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
Resource resource = Resource.newInstance(1024, 1);
ResourceInformation volumeResource = ResourceInformation
.newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
ImmutableMap.of(
// volume ID is missing...
CsiConstants.CSI_VOLUME_NAME, "test-vol-000001",
CsiConstants.CSI_DRIVER_NAME, "hostpath",
CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
)
);
resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
SchedulingRequest sc = SchedulingRequest
.newBuilder().allocationRequestId(0L)
.resourceSizing(ResourceSizing.newInstance(1, resource))
.build();
AllocateRequest ar = AllocateRequest.newBuilder()
.schedulingRequests(Arrays.asList(sc))
.build();
try {
am1.allocate(ar);
Assert.fail("allocate should fail because invalid request received");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidVolumeException);
}
rm.stop();
}
@Test (timeout = 30000L)
public void testProvisioningFailures() throws Exception {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
CsiAdaptorProtocol mockedClient = Mockito
.mock(CsiAdaptorProtocol.class);
// inject adaptor client
rm.getRMContext().getVolumeManager()
.registerCsiDriverAdaptor("hostpath", mockedClient);
doThrow(new VolumeException("failed"))
.when(mockedClient)
.validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
Resource resource = Resource.newInstance(1024, 1);
ResourceInformation volumeResource = ResourceInformation
.newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
ImmutableMap.of(
CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
CsiConstants.CSI_DRIVER_NAME, "hostpath",
CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
)
);
resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
SchedulingRequest sc = SchedulingRequest
.newBuilder().allocationRequestId(0L)
.resourceSizing(ResourceSizing.newInstance(1, resource))
.build();
AllocateRequest ar = AllocateRequest.newBuilder()
.schedulingRequests(Arrays.asList(sc))
.build();
try {
am1.allocate(ar);
Assert.fail("allocate should fail");
} catch (Exception e) {
Assert.assertTrue(e instanceof VolumeProvisioningException);
}
rm.stop();
}
@Test (timeout = 10000L)
public void testVolumeResourceAllocate() throws Exception {
RMApp app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
Resource resource = Resource.newInstance(1024, 1);
ResourceInformation volumeResource = ResourceInformation
.newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
ImmutableMap.of(
CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
CsiConstants.CSI_DRIVER_NAME, "hostpath",
CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
)
);
resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
SchedulingRequest sc = SchedulingRequest
.newBuilder().allocationRequestId(0L)
.resourceSizing(ResourceSizing.newInstance(1, resource))
.build();
// inject adaptor client for testing
CsiAdaptorProtocol mockedClient = Mockito
.mock(CsiAdaptorProtocol.class);
rm.getRMContext().getVolumeManager()
.registerCsiDriverAdaptor("hostpath", mockedClient);
// simulate validation succeed
doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
.when(mockedClient)
.validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
am1.addSchedulingRequest(ImmutableList.of(sc));
List<Container> allocated = new ArrayList<>();
while (allocated.size() != 1) {
AllocateResponse response = am1.schedule();
mockNMS[0].nodeHeartbeat(true);
allocated.addAll(response.getAllocatedContainers());
Thread.sleep(500);
}
Assert.assertEquals(1, allocated.size());
Container alloc = allocated.get(0);
assertThat(alloc.getResource().getMemorySize()).isEqualTo(1024);
assertThat(alloc.getResource().getVirtualCores()).isEqualTo(1);
ResourceInformation allocatedVolume =
alloc.getResource().getResourceInformation(VOLUME_RESOURCE_NAME);
Assert.assertNotNull(allocatedVolume);
assertThat(allocatedVolume.getValue()).isEqualTo(1024);
assertThat(allocatedVolume.getUnits()).isEqualTo("Mi");
rm.stop();
}
}