| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.lang.time.DateUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestContainerResourceUsage { |
| |
| private YarnConfiguration conf; |
| |
| @Before |
| public void setup() throws UnknownHostException { |
| Logger rootLogger = LogManager.getRootLogger(); |
| rootLogger.setLevel(Level.DEBUG); |
| conf = new YarnConfiguration(); |
| UserGroupInformation.setConfiguration(conf); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| } |
| |
| @After |
| public void tearDown() { |
| } |
| |
| @Test (timeout = 120000) |
| public void testUsageWithOneAttemptAndOneContainer() throws Exception { |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm = |
| new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); |
| nm.registerNode(); |
| |
| RMApp app0 = rm.submitApp(200); |
| |
| RMAppMetrics rmAppMetrics = app0.getRMAppMetrics(); |
| Assert.assertTrue( |
| "Before app submittion, memory seconds should have been 0 but was " |
| + rmAppMetrics.getMemorySeconds(), |
| rmAppMetrics.getMemorySeconds() == 0); |
| Assert.assertTrue( |
| "Before app submission, vcore seconds should have been 0 but was " |
| + rmAppMetrics.getVcoreSeconds(), |
| rmAppMetrics.getVcoreSeconds() == 0); |
| |
| RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); |
| |
| nm.nodeHeartbeat(true); |
| MockAM am0 = rm.sendAMLaunched(attempt0.getAppAttemptId()); |
| am0.registerAppAttempt(); |
| |
| RMContainer rmContainer = |
| rm.getResourceScheduler() |
| .getRMContainer(attempt0.getMasterContainer().getId()); |
| |
| // Allow metrics to accumulate. |
| int sleepInterval = 1000; |
| int cumulativeSleepTime = 0; |
| while (rmAppMetrics.getMemorySeconds() <= 0 && cumulativeSleepTime < 5000) { |
| Thread.sleep(sleepInterval); |
| cumulativeSleepTime += sleepInterval; |
| } |
| |
| rmAppMetrics = app0.getRMAppMetrics(); |
| Assert.assertTrue( |
| "While app is running, memory seconds should be >0 but is " |
| + rmAppMetrics.getMemorySeconds(), |
| rmAppMetrics.getMemorySeconds() > 0); |
| Assert.assertTrue( |
| "While app is running, vcore seconds should be >0 but is " |
| + rmAppMetrics.getVcoreSeconds(), |
| rmAppMetrics.getVcoreSeconds() > 0); |
| |
| MockRM.finishAMAndVerifyAppState(app0, rm, nm, am0); |
| |
| AggregateAppResourceUsage ru = calculateContainerResourceMetrics(rmContainer); |
| rmAppMetrics = app0.getRMAppMetrics(); |
| |
| Assert.assertEquals("Unexpected MemorySeconds value", |
| ru.getMemorySeconds(), rmAppMetrics.getMemorySeconds()); |
| Assert.assertEquals("Unexpected VcoreSeconds value", |
| ru.getVcoreSeconds(), rmAppMetrics.getVcoreSeconds()); |
| |
| rm.stop(); |
| } |
| |
| @Test (timeout = 120000) |
| public void testUsageWithMultipleContainersAndRMRestart() throws Exception { |
| // Set max attempts to 1 so that when the first attempt fails, the app |
| // won't try to start a new one. |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); |
| conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); |
| conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); |
| MockRM rm0 = new MockRM(conf); |
| rm0.start(); |
| MockMemoryRMStateStore memStore = |
| (MockMemoryRMStateStore) rm0.getRMStateStore(); |
| MockNM nm = |
| new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService()); |
| nm.registerNode(); |
| |
| RMApp app0 = rm0.submitApp(200); |
| |
| rm0.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); |
| RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); |
| ApplicationAttemptId attemptId0 = attempt0.getAppAttemptId(); |
| rm0.waitForState(attemptId0, RMAppAttemptState.SCHEDULED); |
| |
| nm.nodeHeartbeat(true); |
| rm0.waitForState(attemptId0, RMAppAttemptState.ALLOCATED); |
| MockAM am0 = rm0.sendAMLaunched(attempt0.getAppAttemptId()); |
| am0.registerAppAttempt(); |
| |
| int NUM_CONTAINERS = 2; |
| am0.allocate("127.0.0.1" , 1000, NUM_CONTAINERS, |
| new ArrayList<ContainerId>()); |
| nm.nodeHeartbeat(true); |
| List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers(); |
| while (conts.size() != NUM_CONTAINERS) { |
| nm.nodeHeartbeat(true); |
| conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers()); |
| Thread.sleep(500); |
| } |
| |
| // launch the 2nd and 3rd containers. |
| for (Container c : conts) { |
| nm.nodeHeartbeat(attempt0.getAppAttemptId(), |
| c.getId().getContainerId(), ContainerState.RUNNING); |
| rm0.waitForState(nm, c.getId(), RMContainerState.RUNNING); |
| } |
| |
| // Get the RMContainers for all of the live containers, to be used later |
| // for metrics calculations and comparisons. |
| Collection<RMContainer> rmContainers = |
| rm0.scheduler |
| .getSchedulerAppInfo(attempt0.getAppAttemptId()) |
| .getLiveContainers(); |
| |
| // Allow metrics to accumulate. |
| int sleepInterval = 1000; |
| int cumulativeSleepTime = 0; |
| while (app0.getRMAppMetrics().getMemorySeconds() <= 0 |
| && cumulativeSleepTime < 5000) { |
| Thread.sleep(sleepInterval); |
| cumulativeSleepTime += sleepInterval; |
| } |
| |
| // Stop all non-AM containers |
| for (Container c : conts) { |
| if (c.getId().getContainerId() == 1) continue; |
| nm.nodeHeartbeat(attempt0.getAppAttemptId(), |
| c.getId().getContainerId(), ContainerState.COMPLETE); |
| rm0.waitForState(nm, c.getId(), RMContainerState.COMPLETED); |
| } |
| |
| // After all other containers have completed, manually complete the master |
| // container in order to trigger a save to the state store of the resource |
| // usage metrics. This will cause the attempt to fail, and, since the max |
| // attempt retries is 1, the app will also fail. This is intentional so |
| // that all containers will complete prior to saving. |
| ContainerId cId = ContainerId.newContainerId(attempt0.getAppAttemptId(), 1); |
| nm.nodeHeartbeat(attempt0.getAppAttemptId(), |
| cId.getContainerId(), ContainerState.COMPLETE); |
| rm0.waitForState(nm, cId, RMContainerState.COMPLETED); |
| |
| // Check that the container metrics match those from the app usage report. |
| long memorySeconds = 0; |
| long vcoreSeconds = 0; |
| for (RMContainer c : rmContainers) { |
| AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c); |
| memorySeconds += ru.getMemorySeconds(); |
| vcoreSeconds += ru.getVcoreSeconds(); |
| } |
| |
| RMAppMetrics metricsBefore = app0.getRMAppMetrics(); |
| Assert.assertEquals("Unexpected MemorySeconds value", |
| memorySeconds, metricsBefore.getMemorySeconds()); |
| Assert.assertEquals("Unexpected VcoreSeconds value", |
| vcoreSeconds, metricsBefore.getVcoreSeconds()); |
| |
| // create new RM to represent RM restart. Load up the state store. |
| MockRM rm1 = new MockRM(conf, memStore); |
| rm1.start(); |
| RMApp app0After = |
| rm1.getRMContext().getRMApps().get(app0.getApplicationId()); |
| |
| // Compare container resource usage metrics from before and after restart. |
| RMAppMetrics metricsAfter = app0After.getRMAppMetrics(); |
| Assert.assertEquals("Vcore seconds were not the same after RM Restart", |
| metricsBefore.getVcoreSeconds(), metricsAfter.getVcoreSeconds()); |
| Assert.assertEquals("Memory seconds were not the same after RM Restart", |
| metricsBefore.getMemorySeconds(), metricsAfter.getMemorySeconds()); |
| |
| rm0.stop(); |
| rm0.close(); |
| rm1.stop(); |
| rm1.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testUsageAfterAMRestartWithMultipleContainers() throws Exception { |
| amRestartTests(false); |
| } |
| |
| @Test(timeout = 60000) |
| public void testUsageAfterAMRestartKeepContainers() throws Exception { |
| amRestartTests(true); |
| } |
| |
| private void amRestartTests(boolean keepRunningContainers) |
| throws Exception { |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| |
| RMApp app = |
| rm.submitApp(200, "name", "user", |
| new HashMap<ApplicationAccessType, String>(), false, "default", -1, |
| null, "MAPREDUCE", false, keepRunningContainers); |
| MockNM nm = |
| new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); |
| nm.registerNode(); |
| |
| MockAM am0 = MockRM.launchAndRegisterAM(app, rm, nm); |
| int NUM_CONTAINERS = 1; |
| // allocate NUM_CONTAINERS containers |
| am0.allocate("127.0.0.1", 1024, NUM_CONTAINERS, |
| new ArrayList<ContainerId>()); |
| nm.nodeHeartbeat(true); |
| |
| // wait for containers to be allocated. |
| List<Container> containers = |
| am0.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers(); |
| while (containers.size() != NUM_CONTAINERS) { |
| nm.nodeHeartbeat(true); |
| containers.addAll(am0.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers()); |
| Thread.sleep(200); |
| } |
| |
| // launch the 2nd container. |
| ContainerId containerId2 = |
| ContainerId.newContainerId(am0.getApplicationAttemptId(), 2); |
| nm.nodeHeartbeat(am0.getApplicationAttemptId(), |
| containerId2.getContainerId(), ContainerState.RUNNING); |
| rm.waitForState(nm, containerId2, RMContainerState.RUNNING); |
| |
| // Capture the containers here so the metrics can be calculated after the |
| // app has completed. |
| Collection<RMContainer> rmContainers = |
| rm.scheduler |
| .getSchedulerAppInfo(am0.getApplicationAttemptId()) |
| .getLiveContainers(); |
| |
| // fail the first app attempt by sending CONTAINER_FINISHED event without |
| // registering. |
| ContainerId amContainerId = |
| app.getCurrentAppAttempt().getMasterContainer().getId(); |
| nm.nodeHeartbeat(am0.getApplicationAttemptId(), |
| amContainerId.getContainerId(), ContainerState.COMPLETE); |
| rm.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED); |
| rm.drainEvents(); |
| long memorySeconds = 0; |
| long vcoreSeconds = 0; |
| |
| // Calculate container usage metrics for first attempt. |
| if (keepRunningContainers) { |
| // Only calculate the usage for the one container that has completed. |
| for (RMContainer c : rmContainers) { |
| if (c.getContainerId().equals(amContainerId)) { |
| AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c); |
| memorySeconds += ru.getMemorySeconds(); |
| vcoreSeconds += ru.getVcoreSeconds(); |
| } else { |
| // The remaining container should be RUNNING. |
| Assert.assertTrue("After first attempt failed, remaining container " |
| + "should still be running. ", |
| c.getContainerState().equals(ContainerState.RUNNING)); |
| } |
| } |
| } else { |
| // If keepRunningContainers is false, all live containers should now |
| // be completed. Calculate the resource usage metrics for all of them. |
| for (RMContainer c : rmContainers) { |
| waitforContainerCompletion(rm, nm, amContainerId, c); |
| AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c); |
| memorySeconds += ru.getMemorySeconds(); |
| vcoreSeconds += ru.getVcoreSeconds(); |
| } |
| } |
| |
| // wait for app to start a new attempt. |
| rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); |
| |
| // assert this is a new AM. |
| RMAppAttempt attempt2 = app.getCurrentAppAttempt(); |
| Assert.assertFalse(attempt2.getAppAttemptId() |
| .equals(am0.getApplicationAttemptId())); |
| |
| rm.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.SCHEDULED); |
| nm.nodeHeartbeat(true); |
| MockAM am1 = rm.sendAMLaunched(attempt2.getAppAttemptId()); |
| am1.registerAppAttempt(); |
| rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.RUNNING); |
| // allocate NUM_CONTAINERS containers |
| am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, |
| new ArrayList<ContainerId>()); |
| nm.nodeHeartbeat(true); |
| |
| // wait for containers to be allocated. |
| containers = |
| am1.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers(); |
| while (containers.size() != NUM_CONTAINERS) { |
| nm.nodeHeartbeat(true); |
| containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), |
| new ArrayList<ContainerId>()).getAllocatedContainers()); |
| Thread.sleep(200); |
| } |
| |
| rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); |
| |
| // Capture running containers for later use by metrics calculations. |
| rmContainers = rm.scheduler.getSchedulerAppInfo(attempt2.getAppAttemptId()) |
| .getLiveContainers(); |
| |
| // complete container by sending the container complete event which has |
| // earlier attempt's attemptId |
| amContainerId = app.getCurrentAppAttempt().getMasterContainer().getId(); |
| nm.nodeHeartbeat(am0.getApplicationAttemptId(), |
| amContainerId.getContainerId(), ContainerState.COMPLETE); |
| |
| MockRM.finishAMAndVerifyAppState(app, rm, nm, am1); |
| |
| // Calculate container usage metrics for second attempt. |
| for (RMContainer c : rmContainers) { |
| waitforContainerCompletion(rm, nm, amContainerId, c); |
| AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c); |
| memorySeconds += ru.getMemorySeconds(); |
| vcoreSeconds += ru.getVcoreSeconds(); |
| } |
| |
| RMAppMetrics rmAppMetrics = app.getRMAppMetrics(); |
| |
| Assert.assertEquals("Unexpected MemorySeconds value", |
| memorySeconds, rmAppMetrics.getMemorySeconds()); |
| Assert.assertEquals("Unexpected VcoreSeconds value", |
| vcoreSeconds, rmAppMetrics.getVcoreSeconds()); |
| |
| rm.stop(); |
| return; |
| } |
| |
| private void waitforContainerCompletion(MockRM rm, MockNM nm, |
| ContainerId amContainerId, RMContainer container) throws Exception { |
| ContainerId containerId = container.getContainerId(); |
| if (null != rm.scheduler.getRMContainer(containerId)) { |
| if (containerId.equals(amContainerId)) { |
| rm.waitForState(nm, containerId, RMContainerState.COMPLETED); |
| } else { |
| rm.waitForState(nm, containerId, RMContainerState.KILLED); |
| } |
| } else { |
| rm.drainEvents(); |
| } |
| } |
| |
| private AggregateAppResourceUsage calculateContainerResourceMetrics( |
| RMContainer rmContainer) { |
| Resource resource = rmContainer.getContainer().getResource(); |
| long usedMillis = |
| rmContainer.getFinishTime() - rmContainer.getCreationTime(); |
| long memorySeconds = resource.getMemorySize() |
| * usedMillis / DateUtils.MILLIS_PER_SECOND; |
| long vcoreSeconds = resource.getVirtualCores() |
| * usedMillis / DateUtils.MILLIS_PER_SECOND; |
| Map<String, Long> map = new HashMap<>(); |
| map.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds); |
| map.put(ResourceInformation.VCORES.getName(), vcoreSeconds); |
| return new AggregateAppResourceUsage(map); |
| } |
| } |