| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.uam; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; |
| import org.apache.hadoop.yarn.server.AMRMClientRelayer; |
| import org.apache.hadoop.yarn.server.MockResourceManagerFacade; |
| import org.apache.hadoop.yarn.util.AsyncCallback; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Unit test for UnmanagedApplicationManager. |
| */ |
| public class TestUnmanagedApplicationManager { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestUnmanagedApplicationManager.class); |
| |
| private TestableUnmanagedApplicationManager uam; |
| private Configuration conf = new YarnConfiguration(); |
| private CountingCallback callback; |
| |
| private ApplicationAttemptId attemptId; |
| |
| @Before |
| public void setup() { |
| conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId"); |
| callback = new CountingCallback(); |
| |
| attemptId = |
| ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); |
| |
| uam = new TestableUnmanagedApplicationManager(conf, |
| attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, |
| "rm"); |
| } |
| |
| protected void waitForCallBackCountAndCheckZeroPending( |
| CountingCallback callBack, int expectCallBackCount) { |
| synchronized (callBack) { |
| while (callBack.callBackCount != expectCallBackCount) { |
| try { |
| callBack.wait(); |
| } catch (InterruptedException e) { |
| } |
| } |
| Assert.assertEquals( |
| "Non zero pending requests when number of allocate callbacks reaches " |
| + expectCallBackCount, |
| 0, callBack.requestQueueSize); |
| } |
| } |
| |
| @Test(timeout = 10000) |
| public void testBasicUsage() |
| throws YarnException, IOException, InterruptedException { |
| |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| // Wait for outstanding async allocate callback |
| waitForCallBackCountAndCheckZeroPending(callback, 1); |
| |
| finishApplicationMaster( |
| FinishApplicationMasterRequest.newInstance(null, null, null), |
| attemptId); |
| |
| while (uam.isHeartbeatThreadAlive()) { |
| LOG.info("waiting for heartbeat thread to finish"); |
| Thread.sleep(100); |
| } |
| } |
| |
| /* |
| * Test re-attaching of an existing UAM. This is for HA of UAM client. |
| */ |
| @Test(timeout = 5000) |
| public void testUAMReAttach() |
| throws YarnException, IOException, InterruptedException { |
| |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| // Wait for outstanding async allocate callback |
| waitForCallBackCountAndCheckZeroPending(callback, 1); |
| |
| MockResourceManagerFacade rmProxy = uam.getRMProxy(); |
| uam = new TestableUnmanagedApplicationManager(conf, |
| attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, |
| "rm"); |
| uam.setRMProxy(rmProxy); |
| |
| reAttachUAM(null, attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| // Wait for outstanding async allocate callback |
| waitForCallBackCountAndCheckZeroPending(callback, 2); |
| |
| finishApplicationMaster( |
| FinishApplicationMasterRequest.newInstance(null, null, null), |
| attemptId); |
| } |
| |
| @Test(timeout = 5000) |
| public void testReRegister() |
| throws YarnException, IOException, InterruptedException { |
| |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| |
| uam.setShouldReRegisterNext(); |
| |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| // Wait for outstanding async allocate callback |
| waitForCallBackCountAndCheckZeroPending(callback, 1); |
| |
| uam.setShouldReRegisterNext(); |
| |
| finishApplicationMaster( |
| FinishApplicationMasterRequest.newInstance(null, null, null), |
| attemptId); |
| } |
| |
| /** |
| * If register is slow, async allocate requests in the meanwhile should not |
| * throw or be dropped. |
| */ |
| @Test(timeout = 5000) |
| public void testSlowRegisterCall() |
| throws YarnException, IOException, InterruptedException { |
| |
| // Register with wait() in RM in a separate thread |
| Thread registerAMThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 1001, null), |
| attemptId); |
| } catch (Exception e) { |
| LOG.info("Register thread exception", e); |
| } |
| } |
| }); |
| |
| // Sync obj from mock RM |
| Object syncObj = MockResourceManagerFacade.getRegisterSyncObj(); |
| |
| // Wait for register call in the thread get into RM and then wake us |
| synchronized (syncObj) { |
| LOG.info("Starting register thread"); |
| registerAMThread.start(); |
| try { |
| LOG.info("Test main starts waiting"); |
| syncObj.wait(); |
| LOG.info("Test main wait finished"); |
| } catch (Exception e) { |
| LOG.info("Test main wait interrupted", e); |
| } |
| } |
| |
| // First allocate before register succeeds |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| // Notify the register thread |
| synchronized (syncObj) { |
| syncObj.notifyAll(); |
| } |
| |
| LOG.info("Test main wait for register thread to finish"); |
| registerAMThread.join(); |
| LOG.info("Register thread finished"); |
| |
| // Second allocate, normal case |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| // Both allocate before should respond |
| waitForCallBackCountAndCheckZeroPending(callback, 2); |
| |
| finishApplicationMaster( |
| FinishApplicationMasterRequest.newInstance(null, null, null), |
| attemptId); |
| |
| // Allocates after finishAM should be ignored |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| |
| Assert.assertEquals(0, callback.requestQueueSize); |
| |
| // A short wait just in case the allocates get executed |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| } |
| |
| Assert.assertEquals(2, callback.callBackCount); |
| } |
| |
| @Test(expected = Exception.class) |
| public void testAllocateWithoutRegister() |
| throws YarnException, IOException, InterruptedException { |
| allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, |
| attemptId); |
| } |
| |
| @Test(expected = Exception.class) |
| public void testFinishWithoutRegister() |
| throws YarnException, IOException, InterruptedException { |
| finishApplicationMaster( |
| FinishApplicationMasterRequest.newInstance(null, null, null), |
| attemptId); |
| } |
| |
| @Test(timeout = 10000) |
| public void testForceKill() |
| throws YarnException, IOException, InterruptedException { |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| uam.forceKillApplication(); |
| |
| while (uam.isHeartbeatThreadAlive()) { |
| LOG.info("waiting for heartbeat thread to finish"); |
| Thread.sleep(100); |
| } |
| |
| try { |
| uam.forceKillApplication(); |
| Assert.fail("Should fail because application is already killed"); |
| } catch (YarnException t) { |
| } |
| } |
| |
| @Test(timeout = 10000) |
| public void testShutDownConnections() |
| throws YarnException, IOException, InterruptedException { |
| launchUAM(attemptId); |
| registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); |
| uam.shutDownConnections(); |
| while (uam.isHeartbeatThreadAlive()) { |
| LOG.info("waiting for heartbeat thread to finish"); |
| Thread.sleep(100); |
| } |
| } |
| |
| protected UserGroupInformation getUGIWithToken( |
| ApplicationAttemptId appAttemptId) { |
| UserGroupInformation ugi = |
| UserGroupInformation.createRemoteUser(appAttemptId.toString()); |
| AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1); |
| ugi.addTokenIdentifier(token); |
| return ugi; |
| } |
| |
| protected Token<AMRMTokenIdentifier> launchUAM( |
| ApplicationAttemptId appAttemptId) |
| throws IOException, InterruptedException { |
| return getUGIWithToken(appAttemptId) |
| .doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() { |
| @Override |
| public Token<AMRMTokenIdentifier> run() throws Exception { |
| return uam.launchUAM(); |
| } |
| }); |
| } |
| |
| protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken, |
| ApplicationAttemptId appAttemptId) |
| throws IOException, InterruptedException { |
| getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Token<AMRMTokenIdentifier> run() throws Exception { |
| uam.reAttachUAM(uamToken); |
| return null; |
| } |
| }); |
| } |
| |
| protected RegisterApplicationMasterResponse registerApplicationMaster( |
| final RegisterApplicationMasterRequest request, |
| ApplicationAttemptId appAttemptId) |
| throws YarnException, IOException, InterruptedException { |
| return getUGIWithToken(appAttemptId).doAs( |
| new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() { |
| @Override |
| public RegisterApplicationMasterResponse run() |
| throws YarnException, IOException { |
| return uam.registerApplicationMaster(request); |
| } |
| }); |
| } |
| |
| protected void allocateAsync(final AllocateRequest request, |
| final AsyncCallback<AllocateResponse> callBack, |
| ApplicationAttemptId appAttemptId) |
| throws YarnException, IOException, InterruptedException { |
| getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Object run() throws YarnException { |
| uam.allocateAsync(request, callBack); |
| return null; |
| } |
| }); |
| } |
| |
| protected FinishApplicationMasterResponse finishApplicationMaster( |
| final FinishApplicationMasterRequest request, |
| ApplicationAttemptId appAttemptId) |
| throws YarnException, IOException, InterruptedException { |
| return getUGIWithToken(appAttemptId) |
| .doAs(new PrivilegedExceptionAction<FinishApplicationMasterResponse>() { |
| @Override |
| public FinishApplicationMasterResponse run() |
| throws YarnException, IOException { |
| FinishApplicationMasterResponse response = |
| uam.finishApplicationMaster(request); |
| return response; |
| } |
| }); |
| } |
| |
| protected class CountingCallback implements AsyncCallback<AllocateResponse> { |
| private int callBackCount; |
| private int requestQueueSize; |
| |
| @Override |
| public void callback(AllocateResponse response) { |
| synchronized (this) { |
| callBackCount++; |
| requestQueueSize = uam.getRequestQueueSize(); |
| this.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Testable UnmanagedApplicationManager that talks to a mock RM. |
| */ |
| public class TestableUnmanagedApplicationManager |
| extends UnmanagedApplicationManager { |
| |
| private MockResourceManagerFacade rmProxy; |
| |
| public TestableUnmanagedApplicationManager(Configuration conf, |
| ApplicationId appId, String queueName, String submitter, |
| String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, |
| String rmName) { |
| super(conf, appId, queueName, submitter, appNameSuffix, |
| keepContainersAcrossApplicationAttempts, rmName); |
| } |
| |
| @Override |
| protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler( |
| Configuration config, ApplicationId appId, |
| AMRMClientRelayer rmProxyRelayer) { |
| return new TestableAMRequestHandlerThread(config, appId, rmProxyRelayer); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| protected <T> T createRMProxy(final Class<T> protocol, Configuration config, |
| UserGroupInformation user, Token<AMRMTokenIdentifier> token) { |
| if (rmProxy == null) { |
| rmProxy = new MockResourceManagerFacade(config, 0); |
| } |
| return (T) rmProxy; |
| } |
| |
| public void setShouldReRegisterNext() { |
| if (rmProxy != null) { |
| rmProxy.setShouldReRegisterNext(); |
| } |
| } |
| |
| public MockResourceManagerFacade getRMProxy() { |
| return rmProxy; |
| } |
| |
| public void setRMProxy(MockResourceManagerFacade proxy) { |
| this.rmProxy = proxy; |
| } |
| } |
| |
| /** |
| * Wrap the handler thread so it calls from the same user. |
| */ |
| public class TestableAMRequestHandlerThread |
| extends AMHeartbeatRequestHandler { |
| public TestableAMRequestHandlerThread(Configuration conf, |
| ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) { |
| super(conf, applicationId, rmProxyRelayer); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| getUGIWithToken(attemptId) |
| .doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Object run() { |
| TestableAMRequestHandlerThread.super.run(); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("Exception running TestableAMRequestHandlerThread", e); |
| } |
| } |
| } |
| |
| } |