blob: b23a923513c9c68869971c0b464d2edf0b7158d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestNMClient {
Configuration conf = null;
MiniYARNCluster yarnCluster = null;
YarnClientImpl yarnClient = null;
AMRMClientImpl<ContainerRequest> rmClient = null;
NMClientImpl nmClient = null;
List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
NMTokenCache nmTokenCache = null;
@Before
public void setup() throws YarnException, IOException {
// start minicluster
conf = new YarnConfiguration();
yarnCluster =
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
assertNotNull(yarnCluster);
assertEquals(STATE.STARTED, yarnCluster.getServiceState());
// start rm client
yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
assertNotNull(yarnClient);
assertEquals(STATE.STARTED, yarnClient.getServiceState());
// get node info
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
// submit new app
ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// set the application name
appContext.setApplicationName("Test");
// Set the priority for the application master
Priority pri = Priority.newInstance(0);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue("default");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer);
// unmanaged AM
appContext.setUnmanagedAM(true);
// Create the request to send to the applications manager
SubmitApplicationRequest appRequest = Records
.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
// Submit the application to the applications manager
yarnClient.submitApplication(appContext);
// wait for app to start
int iterationsLeft = 30;
RMAppAttempt appAttempt = null;
while (iterationsLeft > 0) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
appAttempt =
yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
sleep(1000);
--iterationsLeft;
}
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
//creating an instance NMTokenCase
nmTokenCache = new NMTokenCache();
// start am rm client
rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
//setting an instance NMTokenCase
rmClient.setNMTokenCache(nmTokenCache);
rmClient.init(conf);
rmClient.start();
assertNotNull(rmClient);
assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client
nmClient = (NMClientImpl) NMClient.createNMClient();
//propagating the AMRMClient NMTokenCache instance
nmClient.setNMTokenCache(rmClient.getNMTokenCache());
nmClient.init(conf);
nmClient.start();
assertNotNull(nmClient);
assertEquals(STATE.STARTED, nmClient.getServiceState());
}
@After
public void tearDown() {
rmClient.stop();
yarnClient.stop();
yarnCluster.stop();
}
private void stopNmClient(boolean stopContainers) {
assertNotNull("Null nmClient", nmClient);
// leave one unclosed
assertEquals(1, nmClient.startedContainers.size());
// default true
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.cleanupRunningContainersOnStop(stopContainers);
assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
@Test (timeout = 180000)
public void testNMClientNoCleanupOnStop()
throws YarnException, IOException {
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// don't stop the running containers
stopNmClient(false);
assertFalse(nmClient.startedContainers.isEmpty());
//now cleanup
nmClient.cleanupRunningContainers();
assertEquals(0, nmClient.startedContainers.size());
}
@Test (timeout = 200000)
public void testNMClient()
throws YarnException, IOException {
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// stop the running containers on close
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainersOnStop(true);
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
private Set<Container> allocateContainers(
AMRMClientImpl<ContainerRequest> rmClient, int num)
throws YarnException, IOException {
// setup container request
Resource capability = Resource.newInstance(1024, 0);
Priority priority = Priority.newInstance(0);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[] {node};
String[] racks = new String[] {rack};
for (int i = 0; i < num; ++i) {
rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority));
}
int containersRequestedAny = rmClient.getTable(0)
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
capability).remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 2;
Set<Container> containers = new TreeSet<Container>();
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft > 0) {
AllocateResponse allocResponse = rmClient.allocate(0.1f);
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
}
if (!allocResponse.getNMTokens().isEmpty()) {
for (NMToken token : allocResponse.getNMTokens()) {
rmClient.getNMTokenCache().setToken(token.getNodeId().toString(),
token.getToken());
}
}
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
--iterationsLeft;
}
return containers;
}
private void testContainerManagement(NMClientImpl nmClient,
Set<Container> containers) throws YarnException, IOException {
int size = containers.size();
int i = 0;
for (Container container : containers) {
// getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.getContainerStatus(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// upadateContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.updateContainerResource(container);
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// restart shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.restartContainer(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// rollback shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.rollbackLastReInitialization(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// commit shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.commitLastReInitialization(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
nmClient.stopContainer(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
if (!e.getMessage()
.contains("is not handled by this NodeManager")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(
e));
}
}
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
if (Shell.WINDOWS) {
clc.setCommands(
Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul"));
} else {
clc.setCommands(Arrays.asList("sleep", "10"));
}
clc.setTokens(securityTokens);
try {
nmClient.startContainer(container, clc);
} catch (YarnException e) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(e));
}
// leave one container unclosed
if (++i < size) {
// NodeManager may still need some time to make the container started
testGetContainerStatus(container, i, ContainerState.RUNNING, "",
Arrays.asList(new Integer[] {-1000}));
// Test increase container API and make sure requests can reach NM
testIncreaseContainerResource(container);
testRestartContainer(container.getId());
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Restarted", Arrays.asList(new Integer[] {-1000}));
if (i % 2 == 0) {
testReInitializeContainer(container.getId(), clc, false);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
testRollbackContainer(container.getId(), false);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
testCommitContainer(container.getId(), true);
testReInitializeContainer(container.getId(), clc, false);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
testCommitContainer(container.getId(), false);
} else {
testReInitializeContainer(container.getId(), clc, true);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
testRollbackContainer(container.getId(), true);
testCommitContainer(container.getId(), true);
}
try {
nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
// getContainerStatus can be called after stopContainer
try {
// O is possible if CLEANUP_CONTAINER is executed too late
// -105 is possible if the container is not terminated but killed
testGetContainerStatus(container, i, ContainerState.COMPLETE,
"Container killed by the ApplicationMaster.", Arrays.asList(
new Integer[] {ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.SUCCESS}));
} catch (YarnException e) {
// The exception is possible because, after the container is stopped,
// it may be removed from NM's context.
if (!e.getMessage()
.contains("was recently stopped on node manager")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(
e));
}
}
}
}
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void testGetContainerStatus(Container container, int index,
ContainerState state, String diagnostics, List<Integer> exitStatuses)
throws YarnException, IOException {
while (true) {
sleep(250);
ContainerStatus status = nmClient.getContainerStatus(
container.getId(), container.getNodeId());
// NodeManager may still need some time to get the stable
// container status
if (status.getState() == state) {
assertEquals(container.getId(), status.getContainerId());
assertTrue("" + index + ": " + status.getDiagnostics(),
status.getDiagnostics().contains(diagnostics));
assertTrue("Exit Statuses are supposed to be in: " + exitStatuses +
", but the actual exit status code is: " +
status.getExitStatus(),
exitStatuses.contains(status.getExitStatus()));
break;
}
}
}
@SuppressWarnings("deprecation")
private void testIncreaseContainerResource(Container container)
throws YarnException, IOException {
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
// NM container increase container resource should fail without a version
// increase action to fail.
if (!e.getMessage().contains(
container.getId() + " has update version ")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
private void testRestartContainer(ContainerId containerId)
throws YarnException, IOException {
try {
sleep(250);
nmClient.restartContainer(containerId);
sleep(250);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
private void testRollbackContainer(ContainerId containerId,
boolean notRollbackable) throws YarnException, IOException {
try {
sleep(250);
nmClient.rollbackLastReInitialization(containerId);
if (notRollbackable) {
fail("Should not be able to rollback..");
}
sleep(250);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (notRollbackable) {
Assert.assertTrue(e.getMessage().contains(
"Nothing to rollback to"));
} else {
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
}
private void testCommitContainer(ContainerId containerId,
boolean notCommittable) throws YarnException, IOException {
try {
nmClient.commitLastReInitialization(containerId);
if (notCommittable) {
fail("Should not be able to commit..");
}
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (notCommittable) {
Assert.assertTrue(e.getMessage().contains(
"Nothing to Commit"));
} else {
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
}
private void testReInitializeContainer(ContainerId containerId,
ContainerLaunchContext clc, boolean autoCommit)
throws YarnException, IOException {
try {
nmClient.reInitializeContainer(containerId, clc, autoCommit);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
}