blob: 0e882996d4298c6d0f3d3804392893355d7c394a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
/**
* Test Placement Constraints and Scheduling Requests.
*/
public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
@Test(timeout=60000)
public void testAMRMClientWithPlacementConstraints()
throws Exception {
// we have to create a new instance of MiniYARNCluster to avoid SASL qop
// mismatches between client and server
teardown();
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
AMRMClient<AMRMClient.ContainerRequest> amClient =
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
final List<Container> allocatedContainers = new ArrayList<>();
final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
new ArrayList<>();
AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000,
new AMRMClientAsync.AbstractCallbackHandler() {
@Override
public void onContainersAllocated(List<Container> containers) {
allocatedContainers.addAll(containers);
}
@Override
public void onRequestsRejected(
List<RejectedSchedulingRequest> rejReqs) {
rejectedSchedulingRequests.addAll(rejReqs);
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {}
@Override
public void onContainersUpdated(List<UpdatedContainer> containers) {}
@Override
public void onShutdownRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public void onError(Throwable e) {}
@Override
public float getProgress() {
return 0.1f;
}
});
asyncClient.init(conf);
asyncClient.start();
Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
pcMapping.put(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
pcMapping.put(Collections.singleton("bar"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
// Send two types of requests - 4 with source tag "foo" have numAlloc = 1
// and 1 with source tag "bar" and has numAlloc = 4. Both should be
// handled similarly. i.e: Since there are only 3 nodes,
// 2 schedulingRequests - 1 with source tag "foo" on one with source
// tag "bar" should get rejected.
asyncClient.addSchedulingRequests(
Arrays.asList(
// 4 reqs with numAlloc = 1
schedulingRequest(1, 1, 1, 1, 512, "foo"),
schedulingRequest(1, 1, 2, 1, 512, "foo"),
schedulingRequest(1, 1, 3, 1, 512, "foo"),
schedulingRequest(1, 1, 4, 1, 512, "foo"),
// 1 req with numAlloc = 4
schedulingRequest(4, 1, 5, 1, 512, "bar")));
// kick the scheduler
waitForContainerAllocation(allocatedContainers,
rejectedSchedulingRequests, 6, 2);
Assert.assertEquals(6, allocatedContainers.size());
Map<NodeId, List<Container>> containersPerNode =
allocatedContainers.stream().collect(
Collectors.groupingBy(Container::getNodeId));
// Ensure 2 containers allocated per node.
// Each node should have a "foo" and a "bar" container.
Assert.assertEquals(3, containersPerNode.entrySet().size());
HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
containersPerNode.entrySet().forEach(
x ->
Assert.assertEquals(
srcTags,
x.getValue()
.stream()
.map(y -> y.getAllocationTags().iterator().next())
.collect(Collectors.toSet()))
);
// Ensure 2 rejected requests - 1 of "foo" and 1 of "bar"
Assert.assertEquals(2, rejectedSchedulingRequests.size());
Assert.assertEquals(srcTags,
rejectedSchedulingRequests
.stream()
.map(x -> x.getRequest().getAllocationTags().iterator().next())
.collect(Collectors.toSet()));
asyncClient.stop();
}
private static void waitForContainerAllocation(
List<Container> allocatedContainers,
List<RejectedSchedulingRequest> rejectedRequests,
int containerNum, int rejNum) throws Exception {
int maxCount = 10;
while (maxCount >= 0 &&
(allocatedContainers.size() < containerNum ||
rejectedRequests.size() < rejNum)) {
maxCount--;
sleep(1000);
}
}
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(numAllocations, priority, allocReqId, cores, mem,
ExecutionType.GUARANTEED, tags);
}
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem,
ExecutionType execType, String... tags) {
return SchedulingRequest.newBuilder()
.priority(Priority.newInstance(priority))
.allocationRequestId(allocReqId)
.allocationTags(new HashSet<>(Arrays.asList(tags)))
.executionType(ExecutionTypeRequest.newInstance(execType, true))
.resourceSizing(
ResourceSizing.newInstance(numAllocations,
Resource.newInstance(mem, cores)))
.build();
}
}