| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import junit.framework.Assert; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.net.Node; |
| import org.apache.hadoop.net.NodeBase; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationState; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| /** |
| * Testing application cleanup (notifications to nodemanagers). |
| * |
| */ |
| @Ignore |
| public class TestApplicationCleanup { |
| // private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class); |
| // private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| // private AtomicInteger waitForState = new AtomicInteger(0); |
| // private ResourceScheduler scheduler; |
| // private final int memoryCapability = 1024; |
| // private ExtASM asm; |
| // private static final int memoryNeeded = 100; |
| // |
| // private final RMContext context = new RMContextImpl(new MemStore()); |
| // private ClientRMService clientService; |
| // |
| // @Before |
| // public void setUp() { |
| // new DummyApplicationTracker(); |
| // scheduler = new FifoScheduler(); |
| // context.getDispatcher().register(ApplicationTrackerEventType.class, scheduler); |
| // Configuration conf = new Configuration(); |
| // context.getDispatcher().init(conf); |
| // context.getDispatcher().start(); |
| // asm = new ExtASM(new ApplicationTokenSecretManager(), scheduler); |
| // asm.init(conf); |
| // clientService = new ClientRMService(context, |
| // asm.getAmLivelinessMonitor(), asm.getClientToAMSecretManager(), |
| // scheduler); |
| // } |
| // |
| // @After |
| // public void tearDown() { |
| // |
| // } |
| // |
| // |
| // private class DummyApplicationTracker implements EventHandler<ASMEvent |
| // <ApplicationTrackerEventType>> { |
| // |
| // public DummyApplicationTracker() { |
| // context.getDispatcher().register(ApplicationTrackerEventType.class, this); |
| // } |
| // @Override |
| // public void handle(ASMEvent<ApplicationTrackerEventType> event) { |
| // } |
| // |
| // } |
| // private class ExtASM extends ApplicationsManagerImpl { |
| // boolean schedulerCleanupCalled = false; |
| // boolean launcherLaunchCalled = false; |
| // boolean launcherCleanupCalled = false; |
| // boolean schedulerScheduleCalled = false; |
| // |
| // private class DummyApplicationMasterLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> { |
| // private AtomicInteger notify = new AtomicInteger(0); |
| // private AppAttempt application; |
| // |
| // public DummyApplicationMasterLauncher(RMContext context) { |
| // context.getDispatcher().register(AMLauncherEventType.class, this); |
| // } |
| // |
| // @Override |
| // public void handle(ASMEvent<AMLauncherEventType> appEvent) { |
| // AMLauncherEventType event = appEvent.getType(); |
| // switch (event) { |
| // case CLEANUP: |
| // launcherCleanupCalled = true; |
| // break; |
| // case LAUNCH: |
| // LOG.info("Launcher Launch called"); |
| // launcherLaunchCalled = true; |
| // application = appEvent.getApplication(); |
| // context.getDispatcher().getEventHandler().handle( |
| // new ApplicationEvent(ApplicationEventType.LAUNCHED, |
| // application.getApplicationID())); |
| // break; |
| // default: |
| // break; |
| // } |
| // } |
| // } |
| // |
| // private class DummySchedulerNegotiator implements EventHandler<ASMEvent<SNEventType>> { |
| // private AtomicInteger snnotify = new AtomicInteger(0); |
| // AppAttempt application; |
| // public DummySchedulerNegotiator(RMContext context) { |
| // context.getDispatcher().register(SNEventType.class, this); |
| // } |
| // |
| // @Override |
| // public void handle(ASMEvent<SNEventType> appEvent) { |
| // SNEventType event = appEvent.getType(); |
| // switch (event) { |
| // case RELEASE: |
| // schedulerCleanupCalled = true; |
| // break; |
| // case SCHEDULE: |
| // schedulerScheduleCalled = true; |
| // application = appEvent.getAppAttempt(); |
| // context.getDispatcher().getEventHandler().handle( |
| // new AMAllocatedEvent(application.getApplicationID(), |
| // application.getMasterContainer())); |
| // default: |
| // break; |
| // } |
| // } |
| // |
| // } |
| // public ExtASM(ApplicationTokenSecretManager applicationTokenSecretManager, |
| // YarnScheduler scheduler) { |
| // super(applicationTokenSecretManager, scheduler, context); |
| // } |
| // |
| // @Override |
| // protected EventHandler<ASMEvent<SNEventType>> createNewSchedulerNegotiator( |
| // YarnScheduler scheduler) { |
| // return new DummySchedulerNegotiator(context); |
| // } |
| // |
| // @Override |
| // protected EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher( |
| // ApplicationTokenSecretManager tokenSecretManager) { |
| // return new DummyApplicationMasterLauncher(context); |
| // } |
| // |
| // } |
| // |
| // private void waitForState(ApplicationState |
| // finalState, AppAttempt application) throws Exception { |
| // int count = 0; |
| // while(application.getState() != finalState && count < 10) { |
| // Thread.sleep(500); |
| // count++; |
| // } |
| // Assert.assertEquals(finalState, application.getState()); |
| // } |
| // |
| // |
| // private ResourceRequest createNewResourceRequest(int capability, int i) { |
| // ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); |
| // request.setCapability(recordFactory.newRecordInstance(Resource.class)); |
| // request.getCapability().setMemory(capability); |
| // request.setNumContainers(1); |
| // request.setPriority(recordFactory.newRecordInstance(Priority.class)); |
| // request.getPriority().setPriority(i); |
| // request.setHostName("*"); |
| // return request; |
| // } |
| // |
| // protected RMNode addNodes(String commonName, int i, int memoryCapability) throws IOException { |
| // NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); |
| // nodeId.setId(i); |
| // String hostName = commonName + "_" + i; |
| // Node node = new NodeBase(hostName, NetworkTopology.DEFAULT_RACK); |
| // Resource capability = recordFactory.newRecordInstance(Resource.class); |
| // capability.setMemory(memoryCapability); |
| // return new RMNodeImpl(nodeId, hostName, i, -i, node, capability); |
| // } |
| // |
| // @Test |
| // public void testApplicationCleanUp() throws Exception { |
| // ApplicationId appID = clientService.getNewApplicationId(); |
| // ApplicationSubmissionContext submissionContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); |
| // submissionContext.setApplicationId(appID); |
| // submissionContext.setQueue("queuename"); |
| // submissionContext.setUser("dummyuser"); |
| // SubmitApplicationRequest request = recordFactory |
| // .newRecordInstance(SubmitApplicationRequest.class); |
| // request.setApplicationSubmissionContext(submissionContext); |
| // clientService.submitApplication(request); |
| // waitForState(ApplicationState.LAUNCHED, context.getApplications().get( |
| // appID)); |
| // List<ResourceRequest> reqs = new ArrayList<ResourceRequest>(); |
| // ResourceRequest req = createNewResourceRequest(100, 1); |
| // reqs.add(req); |
| // reqs.add(createNewResourceRequest(memoryNeeded, 2)); |
| // List<Container> release = new ArrayList<Container>(); |
| // scheduler.allocate(appID, reqs, release); |
| // ArrayList<RMNode> nodesAdded = new ArrayList<RMNode>(); |
| // for (int i = 0; i < 10; i++) { |
| // nodesAdded.add(addNodes("localhost", i, memoryCapability)); |
| // } |
| // /* let one node heartbeat */ |
| // Map<String, List<Container>> containers = new HashMap<String, List<Container>>(); |
| // RMNode firstNode = nodesAdded.get(0); |
| // int firstNodeMemory = firstNode.getAvailableResource().getMemory(); |
| // RMNode secondNode = nodesAdded.get(1); |
| // |
| // context.getNodesCollection().updateListener(firstNode, containers); |
| // context.getNodesCollection().updateListener(secondNode, containers); |
| // LOG.info("Available resource on first node" + firstNode.getAvailableResource()); |
| // LOG.info("Available resource on second node" + secondNode.getAvailableResource()); |
| // /* only allocate the containers to the first node */ |
| // Assert.assertEquals((firstNodeMemory - (2 * memoryNeeded)), firstNode |
| // .getAvailableResource().getMemory()); |
| // context.getDispatcher().getEventHandler().handle( |
| // new ApplicationEvent(ApplicationEventType.KILL, appID)); |
| // while (asm.launcherCleanupCalled != true) { |
| // Thread.sleep(500); |
| // } |
| // Assert.assertTrue(asm.launcherCleanupCalled); |
| // Assert.assertTrue(asm.launcherLaunchCalled); |
| // Assert.assertTrue(asm.schedulerCleanupCalled); |
| // Assert.assertTrue(asm.schedulerScheduleCalled); |
| // /* check for update of completed application */ |
| // context.getNodesCollection().updateListener(firstNode, containers); |
| // NodeResponse response = firstNode.statusUpdate(containers); |
| // Assert.assertTrue(response.getFinishedApplications().contains(appID)); |
| // LOG.info("The containers to clean up " + response.getContainersToCleanUp().size()); |
| // Assert.assertEquals(2, response.getContainersToCleanUp().size()); |
| // } |
| } |