blob: 71321e3f51d534675f135bc86c1d199adce54feb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
* specifying OPPORTUNISTIC containers in its resource requests,
* the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
* on the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingAMService running on the RM.
*/
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
private static final Log LOG =
LogFactory.getLog(TestDistributedScheduling.class);
protected MiniYARNCluster cluster;
protected YarnClient rmClient;
protected ApplicationMasterProtocol client;
protected Configuration conf;
protected Configuration yarnConf;
protected ApplicationAttemptId attemptId;
protected ApplicationId appId;
@Before
public void doBefore() throws Exception {
cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
cluster.init(conf);
cluster.start();
yarnConf = cluster.getConfig();
// the client has to connect to AMRMProxy
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
// Submit application
attemptId = createApp(rmClient, cluster, conf);
appId = attemptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
}
@After
public void doAfter() throws Exception {
if (client != null) {
try {
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
rmClient.killApplication(attemptId.getApplicationId());
attemptId = null;
} catch (Exception e) {
}
}
if (rmClient != null) {
try {
rmClient.stop();
} catch (Exception e) {
}
}
if (cluster != null) {
try {
cluster.stop();
} catch (Exception e) {
}
}
}
/**
* Validates if Allocate Requests containing only OPPORTUNISTIC container
* requests are satisfied instantly.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
// Wait until the RM has been updated and verify
Map<ApplicationId, RMApp> rmApps =
cluster.getResourceManager().getRMContext().getRMApps();
boolean rmUpdated = false;
for (int i=0; i<10 && !rmUpdated; i++) {
sleep(100);
RMApp rmApp = rmApps.get(appId);
if (rmApp.getState() == RMAppState.RUNNING) {
rmUpdated = true;
}
}
RMApp rmApp = rmApps.get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
// Replace 'ANY' requests with OPPORTUNISTIC aks and remove
// everything else
List<ResourceRequest> newAskList = new ArrayList<>();
for (ResourceRequest rr : request.getAskList()) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
// Check that the RM sees OPPORTUNISTIC containers
ResourceScheduler scheduler = cluster.getResourceManager()
.getResourceScheduler();
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerId containerId = allocatedContainer.getId();
RMContainer rmContainer = scheduler.getRMContainer(containerId);
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
rmContainer.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
* Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
* container requests works as expected.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testMixedExecutionTypeRequestE2E() throws Exception {
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
List<ResourceRequest> askList = request.getAskList();
List<ResourceRequest> newAskList = new ArrayList<>(askList);
// Duplicate all ANY requests marking them as opportunistic
for (ResourceRequest rr : askList) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
request.setAskList(new ArrayList<ResourceRequest>());
request.setResponseId(request.getResponseId() + 1);
Thread.sleep(1000);
// RM should allocate GUARANTEED containers within 2 calls to allocate()
allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are GUARANTEED
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.GUARANTEED,
containerTokenIdentifier.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
* Validates if AMRMClient can be used with Distributed Scheduling turned on.
*
* @throws Exception
*/
@Test(timeout = 120000)
@SuppressWarnings("unchecked")
public void testAMRMClient() throws Exception {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
try {
Priority priority = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
Resource capability = Resource.newInstance(1024, 1);
List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[]{node};
String[] racks = new String[]{rack};
// start am rm client
amClient = new AMRMClientImpl(client);
amClient.init(yarnConf);
amClient.start();
amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(1, oppContainersRequestedAny);
assertEquals(4, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 10;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
while (allocatedContainerCount <
(containersRequestedAny + oppContainersRequestedAny)
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount += allocResponse.getAllocatedContainers()
.size();
for (Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(allocatedContainerCount,
containersRequestedAny + oppContainersRequestedAny);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
assertEquals(3, amClient.release.size());
assertEquals(0, amClient.ask.size());
// need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
// test RPC exception handling
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
final AMRMClient amc = amClient;
ApplicationMasterProtocol realRM = amClient.rmClient;
try {
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
.class);
when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
new Answer<AllocateResponse>() {
public AllocateResponse answer(InvocationOnMock invocation)
throws Exception {
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes,
racks, priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks,
priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null,
priority2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
throw new Exception();
}
});
amClient.rmClient = mockRM;
amClient.allocate(0.1f);
} catch (Exception ioe) {
} finally {
amClient.rmClient = realRM;
}
assertEquals(3, amClient.release.size());
assertEquals(6, amClient.ask.size());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers
while (iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
assertEquals(0, allocResponse.getAllocatedContainers().size());
if (allocResponse.getCompletedContainersStatuses().size() > 0) {
for (ContainerStatus cStatus : allocResponse
.getCompletedContainersStatuses()) {
if (releases.contains(cStatus.getContainerId())) {
assertEquals(cStatus.getState(), ContainerState.COMPLETE);
assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == Service.STATE
.STARTED) {
amClient.stop();
}
}
}
/**
* Check if an AM can ask for opportunistic containers and get them.
* @throws Exception
*/
@Test
public void testAMOpportunistic() throws Exception {
// Basic container to request
Resource capability = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(1);
// Get the cluster topology
List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[]{node};
String[] racks = new String[]{rack};
// Create an AM to request resources
AMRMClient<AMRMClient.ContainerRequest> amClient = null;
try {
amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
amClient.init(yarnConf);
amClient.start();
amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
// AM requests an opportunistic container
ExecutionTypeRequest execTypeRequest =
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
capability, nodes, racks, priority, true, null, execTypeRequest);
amClient.addContainerRequest(containerRequest);
// Wait until the container is allocated
ContainerId opportunisticContainerId = null;
for (int i=0; i<10 && opportunisticContainerId == null; i++) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<Container> allocatedContainers =
allocResponse.getAllocatedContainers();
for (Container allocatedContainer : allocatedContainers) {
// Check that this is the container we required
assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainer.getExecutionType());
opportunisticContainerId = allocatedContainer.getId();
}
sleep(100);
}
assertNotNull(opportunisticContainerId);
// The RM sees the container as OPPORTUNISTIC
ResourceScheduler scheduler = cluster.getResourceManager()
.getResourceScheduler();
RMContainer rmContainer = scheduler.getRMContainer(
opportunisticContainerId);
assertEquals(ExecutionType.OPPORTUNISTIC,
rmContainer.getExecutionType());
// Release the opportunistic container
amClient.releaseAssignedContainer(opportunisticContainerId);
// Wait for the release container to appear
boolean released = false;
for (int i=0; i<10 && !released; i++) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
List<ContainerStatus> completedContainers =
allocResponse.getCompletedContainersStatuses();
for (ContainerStatus completedContainer : completedContainers) {
ContainerId completedContainerId =
completedContainer.getContainerId();
assertEquals(completedContainerId, opportunisticContainerId);
released = true;
}
if (!released) {
sleep(100);
}
}
assertTrue(released);
// The RM shouldn't see the container anymore
rmContainer = scheduler.getRMContainer(opportunisticContainerId);
assertNull(rmContainer);
// Clean the AM
amClient.unregisterApplicationMaster(
FinalApplicationStatus.SUCCEEDED, null, null);
} finally {
if (amClient != null &&
amClient.getServiceState() == Service.STATE.STARTED) {
amClient.close();
}
}
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}