blob: dee2a205fb370a974d6b1b4ab14e4777a804b641 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test cases for {@link DistributedScheduler}.
*/
public class TestDistributedScheduler {
@Test
public void testDistributedScheduler() throws Exception {
Configuration conf = new Configuration();
DistributedScheduler distributedScheduler = new DistributedScheduler();
RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"),
RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"),
RemoteNode.newInstance(NodeId.newInstance("c", 3), "http://c:3"),
RemoteNode.newInstance(NodeId.newInstance("d", 4), "http://d:4")));
final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(DistributedSchedulingAllocateRequest.class)))
.thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
@Override
public DistributedSchedulingAllocateResponse answer(
InvocationOnMock invocationOnMock) throws Throwable {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("c", 3), "http://c:3"),
RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4"),
RemoteNode.newInstance(
NodeId.newInstance("e", 5), "http://e:5"),
RemoteNode.newInstance(
NodeId.newInstance("f", 6), "http://f:6")));
} else {
return createAllocateResponse(Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("f", 6), "http://f:6"),
RemoteNode.newInstance(
NodeId.newInstance("e", 5), "http://e:5"),
RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4"),
RemoteNode.newInstance(
NodeId.newInstance("c", 3), "http://c:3")));
}
}
});
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq =
createResourceRequest(ExecutionType.GUARANTEED, 5, "*");
ResourceRequest opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse =
distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify equal distribution on hosts a, b, c and d, and none on e / f
// NOTE: No more than 1 container will be allocated on a node in the
// top k list per allocate call.
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("b", 2)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("e", 5)));
Assert.assertNull(allocs.get(NodeId.newInstance("f", 6)));
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
allocateResponse = distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify new containers are equally distribution on hosts c and d,
// and none on a or b
allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("e", 5)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
// Ensure the DistributedScheduler respects the list order..
// The first request should be allocated to "c" since it is ranked higher
// The second request should be allocated to "f" since the ranking is
// flipped on every allocate response.
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size());
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
}
private void registerAM(DistributedScheduler distributedScheduler,
RequestInterceptor finalReqIntcptr, List<RemoteNode> nodeList)
throws Exception {
RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
distSchedRegisterResponse.setMaxContainerResource(
Resource.newInstance(1024, 4));
distSchedRegisterResponse.setMinContainerResource(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(nodeList);
Mockito.when(
finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
distributedScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
}
private RequestInterceptor setup(Configuration conf,
DistributedScheduler distributedScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
NMContainerTokenSecretManager nmContainerTokenSecretManager = new
NMContainerTokenSecretManager(conf);
MasterKey mKey = new MasterKey() {
@Override
public int getKeyId() {
return 1;
}
@Override
public void setKeyId(int keyId) {}
@Override
public ByteBuffer getBytes() {
return ByteBuffer.allocate(8);
}
@Override
public void setBytes(ByteBuffer bytes) {}
};
nmContainerTokenSecretManager.setMasterKey(mKey);
OpportunisticContainerAllocator containerAllocator =
new OpportunisticContainerAllocator(nmContainerTokenSecretManager);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
nmTokenSecretManagerInNM.setMasterKey(mKey);
distributedScheduler.initLocal(1234,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
containerAllocator, nmTokenSecretManagerInNM, "test");
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
distributedScheduler.setNextInterceptor(finalReqIntcptr);
return finalReqIntcptr;
}
private ResourceRequest createResourceRequest(ExecutionType execType,
int numContainers, String resourceName) {
ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
opportunisticReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(execType, true));
opportunisticReq.setNumContainers(numContainers);
opportunisticReq.setCapability(Resource.newInstance(1024, 4));
opportunisticReq.setPriority(Priority.newInstance(100));
opportunisticReq.setRelaxLocality(true);
opportunisticReq.setResourceName(resourceName);
return opportunisticReq;
}
private DistributedSchedulingAllocateResponse createAllocateResponse(
List<RemoteNode> nodes) {
DistributedSchedulingAllocateResponse distSchedAllocateResponse =
Records.newRecord(DistributedSchedulingAllocateResponse.class);
distSchedAllocateResponse
.setAllocateResponse(Records.newRecord(AllocateResponse.class));
distSchedAllocateResponse.setNodesForScheduling(nodes);
return distSchedAllocateResponse;
}
private Map<NodeId, List<ContainerId>> mapAllocs(
AllocateResponse allocateResponse, int expectedSize) throws Exception {
Assert.assertEquals(expectedSize,
allocateResponse.getAllocatedContainers().size());
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
for (Container c : allocateResponse.getAllocatedContainers()) {
ContainerTokenIdentifier cTokId = BuilderUtils
.newContainerTokenIdentifier(c.getContainerToken());
Assert.assertEquals(
c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
cTokId.getNmHostAddress());
List<ContainerId> cIds = allocs.get(c.getNodeId());
if (cIds == null) {
cIds = new ArrayList<>();
allocs.put(c.getNodeId(), cIds);
}
cIds.add(c.getId());
}
return allocs;
}
}