| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.util.Records; |
| |
| /** |
| * This class can submit an application to {@link MockRM}. |
| */ |
| public class MockRMAppSubmitter { |
| |
| public static RMApp submitWithMemory(long memory, MockRM mockRM) |
| throws Exception { |
| Resource resource = Records.newRecord(Resource.class); |
| resource.setMemorySize(memory); |
| MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder |
| .createWithResource(resource, mockRM).build(); |
| return MockRMAppSubmitter.submit(mockRM, data); |
| } |
| |
| public static RMApp submit(MockRM mockRM, MockRMAppSubmissionData data) |
| throws Exception { |
| ApplicationId appId = |
| data.isAppIdProvided() ? data.getApplicationId() : null; |
| ApplicationClientProtocol client = mockRM.getClientRMService(); |
| if (!data.isAppIdProvided()) { |
| GetNewApplicationResponse resp = client.getNewApplication(Records |
| .newRecord(GetNewApplicationRequest.class)); |
| appId = resp.getApplicationId(); |
| } |
| SubmitApplicationRequest req = Records |
| .newRecord(SubmitApplicationRequest.class); |
| ApplicationSubmissionContext sub = Records |
| .newRecord(ApplicationSubmissionContext.class); |
| sub.setKeepContainersAcrossApplicationAttempts(data.isKeepContainers()); |
| sub.setApplicationId(appId); |
| sub.setApplicationName(data.getName()); |
| sub.setMaxAppAttempts(data.getMaxAppAttempts()); |
| if (data.getApplicationTags() != null) { |
| sub.setApplicationTags(data.getApplicationTags()); |
| } |
| if (data.getApplicationTimeouts() != null |
| && data.getApplicationTimeouts().size() > 0) { |
| sub.setApplicationTimeouts(data.getApplicationTimeouts()); |
| } |
| if (data.isUnmanaged()) { |
| sub.setUnmanagedAM(true); |
| } |
| if (data.getQueue() != null) { |
| sub.setQueue(data.getQueue()); |
| } |
| if (data.getPriority() != null) { |
| sub.setPriority(data.getPriority()); |
| } |
| if (data.getAppNodeLabel() != null) { |
| sub.setNodeLabelExpression(data.getAppNodeLabel()); |
| } |
| sub.setApplicationType(data.getAppType()); |
| ContainerLaunchContext clc = Records |
| .newRecord(ContainerLaunchContext.class); |
| clc.setApplicationACLs(data.getAcls()); |
| if (data.getCredentials() != null |
| && UserGroupInformation.isSecurityEnabled()) { |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| data.getCredentials().writeTokenStorageToStream(dob); |
| ByteBuffer securityTokens = |
| ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| clc.setTokens(securityTokens); |
| clc.setTokensConf(data.getTokensConf()); |
| } |
| sub.setAMContainerSpec(clc); |
| sub.setAttemptFailuresValidityInterval( |
| data.getAttemptFailuresValidityInterval()); |
| if (data.getLogAggregationContext() != null) { |
| sub.setLogAggregationContext(data.getLogAggregationContext()); |
| } |
| sub.setCancelTokensWhenComplete(data.isCancelTokensWhenComplete()); |
| |
| Priority priority = data.getPriority(); |
| if (priority == null) { |
| priority = Priority.newInstance(0); |
| } |
| |
| List<ResourceRequest> amResourceRequests = data.getAmResourceRequests(); |
| if (amResourceRequests == null || amResourceRequests.isEmpty()) { |
| ResourceRequest amResReq = ResourceRequest.newInstance( |
| priority, ResourceRequest.ANY, data.getResource(), 1); |
| amResourceRequests = Collections.singletonList(amResReq); |
| } |
| |
| if (data.getAmLabel() != null && !data.getAmLabel().isEmpty()) { |
| for (ResourceRequest amResourceRequest : amResourceRequests) { |
| amResourceRequest.setNodeLabelExpression(data.getAmLabel().trim()); |
| } |
| } |
| sub.setAMContainerResourceRequests(amResourceRequests); |
| |
| req.setApplicationSubmissionContext(sub); |
| UserGroupInformation fakeUser = UserGroupInformation |
| .createUserForTesting(data.getUser(), new String[] { "someGroup" }); |
| PrivilegedExceptionAction<SubmitApplicationResponse> action = |
| new SubmitApplicationResponsePrivilegedExceptionAction() |
| .setClientReq(client, req); |
| fakeUser.doAs(action); |
| // make sure app is immediately available after submit |
| if (data.isWaitForAccepted()) { |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| } |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| |
| // unmanaged AM won't go to RMAppAttemptState.SCHEDULED. |
| if (data.isWaitForAccepted() && !data.isUnmanaged()) { |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| } |
| |
| ((AbstractYarnScheduler)mockRM.getResourceScheduler()).update(); |
| |
| return rmApp; |
| } |
| |
| private static class SubmitApplicationResponsePrivilegedExceptionAction |
| implements PrivilegedExceptionAction<SubmitApplicationResponse> { |
| ApplicationClientProtocol client; |
| SubmitApplicationRequest req; |
| |
| @Override |
| public SubmitApplicationResponse run() throws IOException, YarnException { |
| try { |
| return client.submitApplication(req); |
| } catch (YarnException | IOException e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| PrivilegedExceptionAction<SubmitApplicationResponse> setClientReq( |
| ApplicationClientProtocol client, SubmitApplicationRequest req) { |
| this.client = client; |
| this.req = req; |
| return this; |
| } |
| } |
| } |