blob: 143adb41d93324d074837fca24d267383ca8e7b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.util.Records;
/**
* This class can submit an application to {@link MockRM}.
*/
public class MockRMAppSubmitter {
public static RMApp submitWithMemory(long memory, MockRM mockRM)
throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(memory);
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
.createWithResource(resource, mockRM).build();
return MockRMAppSubmitter.submit(mockRM, data);
}
public static RMApp submit(MockRM mockRM, MockRMAppSubmissionData data)
throws Exception {
ApplicationId appId =
data.isAppIdProvided() ? data.getApplicationId() : null;
ApplicationClientProtocol client = mockRM.getClientRMService();
if (!data.isAppIdProvided()) {
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
appId = resp.getApplicationId();
}
SubmitApplicationRequest req = Records
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
sub.setKeepContainersAcrossApplicationAttempts(data.isKeepContainers());
sub.setApplicationId(appId);
sub.setApplicationName(data.getName());
sub.setMaxAppAttempts(data.getMaxAppAttempts());
if (data.getApplicationTags() != null) {
sub.setApplicationTags(data.getApplicationTags());
}
if (data.getApplicationTimeouts() != null
&& data.getApplicationTimeouts().size() > 0) {
sub.setApplicationTimeouts(data.getApplicationTimeouts());
}
if (data.isUnmanaged()) {
sub.setUnmanagedAM(true);
}
if (data.getQueue() != null) {
sub.setQueue(data.getQueue());
}
if (data.getPriority() != null) {
sub.setPriority(data.getPriority());
}
if (data.getAppNodeLabel() != null) {
sub.setNodeLabelExpression(data.getAppNodeLabel());
}
sub.setApplicationType(data.getAppType());
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
clc.setApplicationACLs(data.getAcls());
if (data.getCredentials() != null
&& UserGroupInformation.isSecurityEnabled()) {
DataOutputBuffer dob = new DataOutputBuffer();
data.getCredentials().writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setTokens(securityTokens);
clc.setTokensConf(data.getTokensConf());
}
sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(
data.getAttemptFailuresValidityInterval());
if (data.getLogAggregationContext() != null) {
sub.setLogAggregationContext(data.getLogAggregationContext());
}
sub.setCancelTokensWhenComplete(data.isCancelTokensWhenComplete());
Priority priority = data.getPriority();
if (priority == null) {
priority = Priority.newInstance(0);
}
List<ResourceRequest> amResourceRequests = data.getAmResourceRequests();
if (amResourceRequests == null || amResourceRequests.isEmpty()) {
ResourceRequest amResReq = ResourceRequest.newInstance(
priority, ResourceRequest.ANY, data.getResource(), 1);
amResourceRequests = Collections.singletonList(amResReq);
}
if (data.getAmLabel() != null && !data.getAmLabel().isEmpty()) {
for (ResourceRequest amResourceRequest : amResourceRequests) {
amResourceRequest.setNodeLabelExpression(data.getAmLabel().trim());
}
}
sub.setAMContainerResourceRequests(amResourceRequests);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser = UserGroupInformation
.createUserForTesting(data.getUser(), new String[] { "someGroup" });
PrivilegedExceptionAction<SubmitApplicationResponse> action =
new SubmitApplicationResponsePrivilegedExceptionAction()
.setClientReq(client, req);
fakeUser.doAs(action);
// make sure app is immediately available after submit
if (data.isWaitForAccepted()) {
mockRM.waitForState(appId, RMAppState.ACCEPTED);
}
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
// unmanaged AM won't go to RMAppAttemptState.SCHEDULED.
if (data.isWaitForAccepted() && !data.isUnmanaged()) {
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
}
((AbstractYarnScheduler)mockRM.getResourceScheduler()).update();
return rmApp;
}
private static class SubmitApplicationResponsePrivilegedExceptionAction
implements PrivilegedExceptionAction<SubmitApplicationResponse> {
ApplicationClientProtocol client;
SubmitApplicationRequest req;
@Override
public SubmitApplicationResponse run() throws IOException, YarnException {
try {
return client.submitApplication(req);
} catch (YarnException | IOException e) {
e.printStackTrace();
throw e;
}
}
PrivilegedExceptionAction<SubmitApplicationResponse> setClientReq(
ApplicationClientProtocol client, SubmitApplicationRequest req) {
this.client = client;
this.req = req;
return this;
}
}
}