/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

/**
 * Test YARNRunner and make sure the client-side plugin works fine.
 */
public class TestYARNRunner {
private static final Logger LOG =
LoggerFactory.getLogger(TestYARNRunner.class);
  private static final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
// prefix before <LOG_DIR>/profile.out
private static final String PROFILE_PARAMS =
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
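
  /**
   * Serves an in-memory resource-types.xml that defines a single custom
   * resource type, "a-custom-resource", with the default unit "G", instead
   * of reading the file from disk.
   */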
private static class CustomResourceTypesConfigurationProvider
extends LocalConfigurationProvider {
@Override
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
String name) throws YarnException, IOException {
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
return new ByteArrayInputStream(
("<configuration>\n" +
" <property>\n" +
" <name>yarn.resource-types</name>\n" +
" <value>a-custom-resource</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
" <value>G</value>\n" +
" </property>\n" +
"</configuration>\n").getBytes());
} else {
return super.getConfigurationInputStream(bootstrapConf, name);
}
}
}
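
  /**
   * Log4j appender that records logging events so tests can assert on what
   * was logged.
   */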
private static class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
public void close() {
}
    @Override
    protected void append(LoggingEvent event) {
      logEvents.add(event);
    }
private List<LoggingEvent> getLogEvents() {
return logEvents;
}
}

  private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate;
private YarnConfiguration conf;
private ClientCache clientCache;
private ApplicationId appId;
private JobID jobId;
private File testWorkDir =
new File("target", TestYARNRunner.class.getName());
private ApplicationSubmissionContext submissionContext;
private ClientServiceDelegate clientDelegate;
private static final String failString = "Rejected job";

  @BeforeClass
public static void setupBeforeClass() {
ResourceUtils.resetResourceTypes(new Configuration());
}

  @Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
clientCache = new ClientCache(conf, resourceMgrDelegate);
clientCache = spy(clientCache);
yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
yarnRunner = spy(yarnRunner);
submissionContext = mock(ApplicationSubmissionContext.class);
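    // Stub out createApplicationSubmissionContext so that every submission
    // in these tests uses the mocked context above.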
doAnswer(
new Answer<ApplicationSubmissionContext>() {
@Override
public ApplicationSubmissionContext answer(InvocationOnMock invocation)
throws Throwable {
return submissionContext;
}
}
).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
any(String.class), any(Credentials.class));
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
jobId = TypeConverter.fromYarn(appId);
if (testWorkDir.exists()) {
FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
}
testWorkDir.mkdirs();
}

  @After
public void cleanup() {
FileUtil.fullyDelete(testWorkDir);
ResourceUtils.resetResourceTypes(new Configuration());
}

  @Test(timeout=20000)
public void testJobKill() throws Exception {
clientDelegate = mock(ClientServiceDelegate.class);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
doAnswer(
new Answer<ClientServiceDelegate>() {
@Override
public ClientServiceDelegate answer(InvocationOnMock invocation)
throws Throwable {
return clientDelegate;
}
}
).when(clientCache).getClient(any(JobID.class));
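
    // A job in PREP state is killed through the ResourceManager.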
yarnRunner.killJob(jobId);
verify(resourceMgrDelegate).killApplication(appId);
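
    // Once the job is RUNNING, the kill goes to the MR AM through the
    // client delegate.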
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
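
    // If the job status is unknown, killJob consults the application report;
    // since the application has already finished, no second kill is issued.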
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
.thenReturn(
ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
"tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
0L, 0L, 0L,
FinalApplicationStatus.SUCCEEDED, null, null, 0f,
"tmp", null));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
}

  @Test(timeout=60000)
public void testJobKillTimeout() throws Exception {
long timeToWaitBeforeHardKill =
10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
timeToWaitBeforeHardKill);
clientDelegate = mock(ClientServiceDelegate.class);
doAnswer(
new Answer<ClientServiceDelegate>() {
@Override
public ClientServiceDelegate answer(InvocationOnMock invocation)
throws Throwable {
return clientDelegate;
}
}
).when(clientCache).getClient(any(JobID.class));
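    // The job status is pinned to RUNNING, so killJob has to wait out the
    // configured hard-kill timeout before forcing the kill.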
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
long startTimeMillis = System.currentTimeMillis();
yarnRunner.killJob(jobId);
assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
+ " ms.", System.currentTimeMillis() - startTimeMillis
>= timeToWaitBeforeHardKill);
}

  @Test(timeout=20000)
public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
thenReturn(appId);
ApplicationReport report = mock(ApplicationReport.class);
when(report.getApplicationId()).thenReturn(appId);
when(report.getDiagnostics()).thenReturn(failString);
when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FAILED);
when(resourceMgrDelegate.getApplicationReport(appId)).thenReturn(report);
Credentials credentials = new Credentials();
File jobxml = new File(testWorkDir, "job.xml");
OutputStream out = new FileOutputStream(jobxml);
conf.writeXml(out);
out.close();
    try {
      yarnRunner.submitJob(jobId, testWorkDir.getAbsolutePath(), credentials);
      fail("Job submission was expected to fail with: " + failString);
    } catch (IOException io) {
      LOG.info("Logging exception:", io);
      assertTrue(io.getLocalizedMessage().contains(failString));
    }
}

  @Test(timeout=20000)
  public void testResourceMgrDelegate() throws Exception {
    /* we do not want a mock of the resource mgr delegate */
final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
@Override
protected void serviceStart() throws Exception {
assertTrue(this.client instanceof YarnClientImpl);
((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
}
};
/* make sure kill calls finish application master */
when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
.thenReturn(KillApplicationResponse.newInstance(true));
delegate.killApplication(appId);
verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
    /* make sure getAllJobs calls getApplications */
when(clientRMProtocol.getApplications(any(GetApplicationsRequest.class))).
thenReturn(recordFactory.newRecordInstance(GetApplicationsResponse.class));
delegate.getAllJobs();
verify(clientRMProtocol).getApplications(any(GetApplicationsRequest.class));
    /* make sure getApplicationReport is called */
when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
.thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
delegate.getApplicationReport(appId);
verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
    /* make sure getClusterMetrics is called */
    GetClusterMetricsResponse clusterMetricsResponse =
        recordFactory.newRecordInstance(GetClusterMetricsResponse.class);
clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
YarnClusterMetrics.class));
when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
.thenReturn(clusterMetricsResponse);
delegate.getClusterMetrics();
verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
delegate.getActiveTrackers();
verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));
GetNewApplicationResponse newAppResponse = recordFactory.newRecordInstance(
GetNewApplicationResponse.class);
newAppResponse.setApplicationId(appId);
when(clientRMProtocol.getNewApplication(any(GetNewApplicationRequest.class))).
thenReturn(newAppResponse);
delegate.getNewJobID();
verify(clientRMProtocol).getNewApplication(any(GetNewApplicationRequest.class));
GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
GetQueueInfoResponse.class);
queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
thenReturn(queueInfoResponse);
delegate.getQueues();
verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));
GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
GetQueueUserAclsInfoResponse.class);
when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
.thenReturn(aclResponse);
delegate.getQueueAclsForCurrentUser();
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
}

  @Test(timeout=20000)
public void testGetHSDelegationToken() throws Exception {
try {
Configuration conf = new Configuration();
// Setup mock service
InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
      Text rmTokenService = SecurityUtil.buildTokenService(mockRmAddress);
InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
      Text hsTokenService = SecurityUtil.buildTokenService(mockHsAddress);
// Setup mock rm token
RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
new Text("owner"), new Text("renewer"), new Text("real"));
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
          new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenService);
token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
// Setup mock history token
org.apache.hadoop.yarn.api.records.Token historyToken =
org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0],
MRDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
              hsTokenService.toString());
GetDelegationTokenResponse getDtResponse =
Records.newRecord(GetDelegationTokenResponse.class);
getDtResponse.setDelegationToken(historyToken);
// mock services
MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
any(GetDelegationTokenRequest.class));
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
      doReturn(rmTokenService).when(rmDelegate).getRMDelegationTokenService();
ClientCache clientCache = mock(ClientCache.class);
doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
Credentials creds = new Credentials();
YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
// No HS token if no RM token
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
      // No HS token if RM token present, but security disabled.
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
creds = new Credentials();
// No HS token if no RM token, security enabled
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(0)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// HS token if RM token present, security enabled
creds.addToken(new Text("rmdt"), token);
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
// No additional call to get HS token if RM and HS token present
yarnRunner.addHistoryToken(creds);
verify(mockHsProxy, times(1)).getDelegationToken(
any(GetDelegationTokenRequest.class));
} finally {
// Back to defaults.
UserGroupInformation.setConfiguration(new Configuration());
}
}

  @Test(timeout=20000)
  public void testHistoryServerToken() throws Exception {
    // Set the master principal in the config
    conf.set(YarnConfiguration.RM_PRINCIPAL, "foo@LOCAL");
final String masterPrincipal = Master.getMasterPrincipal(conf);
final MRClientProtocol hsProxy = mock(MRClientProtocol.class);
when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer(
new Answer<GetDelegationTokenResponse>() {
public GetDelegationTokenResponse answer(InvocationOnMock invocation) {
GetDelegationTokenRequest request =
(GetDelegationTokenRequest)invocation.getArguments()[0];
// check that the renewer matches the cluster's RM principal
          assertEquals(masterPrincipal, request.getRenewer());
org.apache.hadoop.yarn.api.records.Token token =
recordFactory.newRecordInstance(org.apache.hadoop.yarn.api.records.Token.class);
// none of these fields matter for the sake of the test
token.setKind("");
token.setService("");
token.setIdentifier(ByteBuffer.allocate(0));
token.setPassword(ByteBuffer.allocate(0));
GetDelegationTokenResponse tokenResponse =
recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
tokenResponse.setDelegationToken(token);
return tokenResponse;
}
});
UserGroupInformation.createRemoteUser("someone").doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
yarnRunner = new YARNRunner(conf, null, null);
yarnRunner.getDelegationTokenFromHS(hsProxy);
verify(hsProxy).
getDelegationToken(any(GetDelegationTokenRequest.class));
return null;
}
});
}

  @Test(timeout=20000)
public void testAMAdminCommandOpts() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true");
jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext submissionContext =
buildSubmitContext(yarnRunner, jobConf);
ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
List<String> commands = containerSpec.getCommands();
int index = 0;
int adminIndex = 0;
int adminPos = -1;
int userIndex = 0;
int userPos = -1;
int tmpDirPos = -1;
    for (String command : commands) {
      if (command != null) {
        assertFalse("Profiler should be disabled by default",
            command.contains(PROFILE_PARAMS));
        adminPos = command.indexOf("-Djava.net.preferIPv4Stack=true");
        if (adminPos >= 0) {
          adminIndex = index;
        }
        userPos = command.indexOf("-Xmx1024m");
        if (userPos >= 0) {
          userIndex = index;
        }
        tmpDirPos = command.indexOf("-Djava.io.tmpdir=");
      }
      index++;
    }
// Check java.io.tmpdir opts are set in the commands
assertTrue("java.io.tmpdir is not set for AM", tmpDirPos > 0);
// Check both admin java opts and user java opts are in the commands
assertTrue("AM admin command opts not in the commands.", adminPos > 0);
assertTrue("AM user command opts not in the commands.", userPos > 0);
// Check the admin java opts is before user java opts in the commands
    if (adminIndex == userIndex) {
      assertTrue("AM admin command opts is after user command opts.",
          adminPos < userPos);
    } else {
      assertTrue("AM admin command opts is after user command opts.",
          adminIndex < userIndex);
    }
}

  @Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
org.apache.log4j.Logger logger =
org.apache.log4j.Logger.getLogger(YARNRunner.class);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
Layout layout = new SimpleLayout();
Appender appender = new WriterAppender(layout, bout);
logger.addAppender(appender);
JobConf jobConf = new JobConf();
    jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
    jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS,
        "-Xmx1024m -Djava.library.path=bar");
YARNRunner yarnRunner = new YARNRunner(jobConf);
@SuppressWarnings("unused")
ApplicationSubmissionContext submissionContext =
buildSubmitContext(yarnRunner, jobConf);
    String logMsg = bout.toString();
    // Detach the appender so it does not leak into other tests.
    logger.removeAppender(appender);
assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
"yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
"longer function if hadoop native libraries are used. These values " +
"should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
"env using yarn.app.mapreduce.am.admin.user.env config settings."));
assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
"yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
"function if hadoop native libraries are used. These values should " +
"be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
"using yarn.app.mapreduce.am.env config settings."));
}

  @Test(timeout=20000)
public void testAMProfiler() throws Exception {
JobConf jobConf = new JobConf();
jobConf.setBoolean(MRJobConfig.MR_AM_PROFILE, true);
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext submissionContext =
buildSubmitContext(yarnRunner, jobConf);
ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
List<String> commands = containerSpec.getCommands();
    for (String command : commands) {
      if (command != null && command.contains(PROFILE_PARAMS)) {
        return;
      }
    }
    fail("Profiler opts not found in the AM launch commands");
}

  @Test
public void testNodeLabelExp() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx =
buildSubmitContext(yarnRunner, jobConf);
    assertEquals("GPU", appSubCtx.getNodeLabelExpression());
    assertEquals("highMem", appSubCtx.getAMContainerResourceRequests().get(0)
        .getNodeLabelExpression());
}

  @Test
public void testResourceRequestLocalityAny() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, true);
verifyResourceRequestLocality(null, null, amAnyResourceRequest);
verifyResourceRequestLocality(null, "label1", amAnyResourceRequest);
}

  @Test
public void testResourceRequestLocalityRack() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", true);
verifyResourceRequestLocality("/rack1", null, amAnyResourceRequest,
amRackResourceRequest);
verifyResourceRequestLocality("/rack1", "label1", amAnyResourceRequest,
amRackResourceRequest);
}

  @Test
public void testResourceRequestLocalityNode() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
verifyResourceRequestLocality("/rack1/node1", null, amAnyResourceRequest,
amRackResourceRequest, amNodeResourceRequest);
verifyResourceRequestLocality("/rack1/node1", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
}

  @Test
public void testResourceRequestLocalityNodeDefaultRack() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/default-rack", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
verifyResourceRequestLocality("node1", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
verifyResourceRequestLocality("node1", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
}

  @Test
public void testResourceRequestLocalityMultipleNodes() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,/rack1/node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,/rack1/node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amNode2ResourceRequest);
}

  @Test
public void testResourceRequestLocalityMultipleNodesDifferentRack()
throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amRack2ResourceRequest =
createResourceRequest("/rack2", false);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,/rack2/node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,/rack2/node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
}

  @Test
public void testResourceRequestLocalityMultipleNodesDefaultRack()
throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amRack2ResourceRequest =
createResourceRequest("/default-rack", false);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
}

  @Test
public void testResourceRequestLocalityInvalid() throws Exception {
try {
verifyResourceRequestLocality("rack/node1", null,
new ResourceRequest[]{});
fail("Should have failed due to invalid resource but did not");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid resource name"));
}
try {
verifyResourceRequestLocality("/rack/node1/blah", null,
new ResourceRequest[]{});
fail("Should have failed due to invalid resource but did not");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid resource name"));
}
}
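
  /**
   * Builds a submission context with the given strict-locality resource and
   * optional node label, then checks that the AM container resource requests
   * match the expected list.
   */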
private void verifyResourceRequestLocality(String strictResource,
String label, ResourceRequest... expectedReqs) throws Exception {
JobConf jobConf = new JobConf();
if (strictResource != null) {
jobConf.set(MRJobConfig.AM_STRICT_LOCALITY, strictResource);
}
if (label != null) {
jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, label);
for (ResourceRequest expectedReq : expectedReqs) {
expectedReq.setNodeLabelExpression(label);
}
}
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx =
buildSubmitContext(yarnRunner, jobConf);
assertEquals(Arrays.asList(expectedReqs),
appSubCtx.getAMContainerResourceRequests());
}
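
  /**
   * Creates an AM-priority ResourceRequest for the given resource name with
   * the default AM memory and vcores.
   */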
private ResourceRequest createResourceRequest(String name,
boolean relaxLocality) {
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
capability.setVirtualCores(MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
ResourceRequest req =
recordFactory.newRecordInstance(ResourceRequest.class);
req.setPriority(YARNRunner.AM_CONTAINER_PRIORITY);
req.setResourceName(name);
req.setCapability(capability);
req.setNumContainers(1);
req.setRelaxLocality(relaxLocality);
return req;
}

  @Test
public void testAMStandardEnvWithDefaultLibPath() throws Exception {
testAMStandardEnv(false, false);
}

  @Test
public void testAMStandardEnvWithCustomLibPath() throws Exception {
testAMStandardEnv(true, false);
}

  @Test
public void testAMStandardEnvWithCustomLibPathWithSeparateEnvProps()
throws Exception {
testAMStandardEnv(true, true);
}
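
  /**
   * Verifies LD_LIBRARY_PATH and SHELL in the AM environment: PWD must come
   * first in the lib path, followed by the admin and user lib paths (or by
   * the default admin env when no custom path is configured).
   */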
private void testAMStandardEnv(boolean customLibPath,
boolean useSeparateEnvProps) throws Exception {
    // The Windows behavior is different and this test currently doesn't
    // really apply; MAPREDUCE-6588 should revisit this test.
assumeNotWindows();
final String ADMIN_LIB_PATH = "foo";
final String USER_LIB_PATH = "bar";
final String USER_SHELL = "shell";
JobConf jobConf = new JobConf();
String pathKey = Environment.LD_LIBRARY_PATH.name();
if (customLibPath) {
if (useSeparateEnvProps) {
// Specify these as individual variables instead of k=v lists
jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV + "." + pathKey,
ADMIN_LIB_PATH);
jobConf.set(MRJobConfig.MR_AM_ENV + "." + pathKey, USER_LIB_PATH);
} else {
jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, pathKey + "=" +
ADMIN_LIB_PATH);
jobConf.set(MRJobConfig.MR_AM_ENV, pathKey + "=" + USER_LIB_PATH);
}
}
jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx =
buildSubmitContext(yarnRunner, jobConf);
// make sure PWD is first in the lib path
ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
Map<String, String> env = clc.getEnvironment();
String libPath = env.get(pathKey);
assertNotNull(pathKey + " not set", libPath);
String cps = jobConf.getBoolean(
MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
String expectedLibPath =
MRApps.crossPlatformifyMREnv(conf, Environment.PWD);
if (customLibPath) {
// append admin libpath and user libpath
expectedLibPath += cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH;
} else {
expectedLibPath += cps +
MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV.substring(
pathKey.length() + 1);
}
assertEquals("Bad AM " + pathKey + " setting", expectedLibPath, libPath);
// make sure SHELL is set
String shell = env.get(Environment.SHELL.name());
assertNotNull("SHELL not set", shell);
assertEquals("Bad SHELL setting", USER_SHELL, shell);
}

  @Test
public void testJobPriority() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.PRIORITY, "LOW");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx = buildSubmitContext(yarnRunner,
jobConf);
    // 2 corresponds to LOW
    assertEquals(Priority.newInstance(2), appSubCtx.getPriority());
// Set an integer explicitly
jobConf.set(MRJobConfig.PRIORITY, "12");
yarnRunner = new YARNRunner(jobConf);
    appSubCtx = buildSubmitContext(yarnRunner, jobConf);
    // Verify that 12 is set in the submission context
    assertEquals(Priority.newInstance(12), appSubCtx.getPriority());
}
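
  /**
   * Writes the job.xml and the (empty) split files that
   * createApplicationSubmissionContext expects to find in the staging
   * directory, then builds the submission context.
   */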
private ApplicationSubmissionContext buildSubmitContext(
YARNRunner yarnRunner, JobConf jobConf) throws IOException {
File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);
OutputStream out = new FileOutputStream(jobxml);
conf.writeXml(out);
out.close();
File jobsplit = new File(testWorkDir, MRJobConfig.JOB_SPLIT);
out = new FileOutputStream(jobsplit);
out.close();
File jobsplitmetainfo = new File(testWorkDir,
MRJobConfig.JOB_SPLIT_METAINFO);
out = new FileOutputStream(jobsplitmetainfo);
out.close();
return yarnRunner.createApplicationSubmissionContext(jobConf,
testWorkDir.toString(), new Credentials());
}

  // Test that configs matching the send-token-conf regex are set in the
  // ContainerLaunchContext.
  @Test
public void testSendJobConf() throws IOException {
JobConf jobConf = new JobConf();
jobConf.set("dfs.nameservices", "mycluster1,mycluster2");
jobConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1");
jobConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2");
jobConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2");
jobConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider");
jobConf.set("hadoop.tmp.dir", "testconfdir");
jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
jobConf.set("mapreduce.job.send-token-conf",
"dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$"
+ "|^dfs.client.failover.proxy.provider.*$"
+ "|dfs.namenode.kerberos.principal");
UserGroupInformation.setConfiguration(jobConf);
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext submissionContext =
buildSubmitContext(yarnRunner, jobConf);
Configuration confSent = BuilderUtils.parseTokensConf(submissionContext);
// configs that match regex should be included
Assert.assertEquals("123.0.0.1",
confSent.get("dfs.namenode.rpc-address.mycluster2.nn1"));
Assert.assertEquals("123.0.0.2",
confSent.get("dfs.namenode.rpc-address.mycluster2.nn2"));
// configs that aren't matching regex should not be included
Assert.assertTrue(confSent.get("hadoop.tmp.dir") == null || !confSent
.get("hadoop.tmp.dir").equals("testconfdir"));
UserGroupInformation.reset();
}

  @Test
public void testCustomAMRMResourceType() throws Exception {
initResourceTypes();
String customResourceName = "a-custom-resource";
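    // The provider defines "a-custom-resource" with default unit "G", so a
    // raw value of 5 should surface as 5 (in unit G) in the AM request.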
JobConf jobConf = new JobConf();
jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
customResourceName, 5);
jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
ResourceInformation resourceInformation = resourceRequest.getCapability()
.getResourceInformation(customResourceName);
Assert.assertEquals("Expecting the default unit (G)",
"G", resourceInformation.getUnits());
Assert.assertEquals(5L, resourceInformation.getValue());
Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
}

  @Test
public void testAMRMemoryRequest() throws Exception {
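    // Both accepted names for memory should be parsed and normalized to MB:
    // a request of "3 Gi" becomes 3072.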
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
long memorySize = resourceRequest.getCapability().getMemorySize();
Assert.assertEquals(3072, memorySize);
}
}

  @Test
public void testAMRMemoryRequestOverriding() throws Exception {
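    // A resource-type style memory setting should override the legacy
    // yarn.app.mapreduce.am.resource.mb value and log a WARN saying so.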
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
TestAppender testAppender = new TestAppender();
org.apache.log4j.Logger logger =
org.apache.log4j.Logger.getLogger(YARNRunner.class);
logger.addAppender(testAppender);
try {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
long memorySize = resourceRequest.getCapability().getMemorySize();
Assert.assertEquals(3072, memorySize);
assertTrue(testAppender.getLogEvents().stream().anyMatch(
e -> e.getLevel() == Level.WARN && ("Configuration " +
"yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
"overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
"configuration").equals(e.getMessage())));
} finally {
logger.removeAppender(testAppender);
}
}
}
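
  /**
   * Points the RM configuration provider at
   * CustomResourceTypesConfigurationProvider and reloads the resource types.
   */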
private void initResourceTypes() {
Configuration configuration = new Configuration();
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
CustomResourceTypesConfigurationProvider.class.getName());
ResourceUtils.resetResourceTypes(configuration);
}
}