| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.app.rm; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyFloat; |
| import static org.mockito.Matchers.anyInt; |
| import static org.mockito.Matchers.isA; |
| import static org.mockito.Mockito.doCallRealMethod; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; |
| import org.apache.hadoop.mapreduce.v2.app.MRApp; |
| import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; |
| import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; |
| import org.apache.hadoop.mapreduce.v2.app.client.ClientService; |
| import org.apache.hadoop.mapreduce.v2.app.job.Job; |
| import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; |
| import org.apache.hadoop.mapreduce.v2.app.job.Task; |
| import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; |
| import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; |
| import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; |
| import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.CollectorInfo; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NMToken; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.client.api.TimelineV2Client; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.records.NodeAction; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.ControlledClock; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.google.common.base.Supplier; |
| import org.mockito.InOrder; |
| |
| @SuppressWarnings("unchecked") |
| public class TestRMContainerAllocator { |
| |
| static final Log LOG = LogFactory |
| .getLog(TestRMContainerAllocator.class); |
| static final RecordFactory recordFactory = RecordFactoryProvider |
| .getRecordFactory(null); |
| |
| @Before |
| public void setup() { |
| MyContainerAllocator.getJobUpdatedNodeEvents().clear(); |
| MyContainerAllocator.getTaskAttemptKillEvents().clear(); |
| |
| // make each test create a fresh user to avoid leaking tokens between tests |
| UserGroupInformation.setLoginUser(null); |
| } |
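| |
| // Illustrative sketch of the leak this guards against (assumed UGI |
| // behavior, not code from this test): tokens attached to a cached login |
| // user survive across tests if the user object is reused. |
| // UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| // ugi.addToken(amRmToken); // added by one test... |
| // // ...and still visible to the next test unless the login user is |
| // // reset, as done above. |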
| |
| @After |
| public void tearDown() { |
| DefaultMetricsSystem.shutdown(); |
| } |
| |
| @Test |
| public void testSimple() throws Exception { |
| |
| LOG.info("Running testSimple"); |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); |
| MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); |
| rm.drainEvents(); |
| |
| // create the container request |
| ContainerRequestEvent event1 = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| |
| // send 1 more request with different resource req |
| ContainerRequestEvent event2 = createReq(jobId, 2, 1024, |
| new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); |
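| |
| // Why 4 asks: each host-level request fans out into node-local, rack-local |
| // and ANY ResourceRequests. A rough sketch of the expansion (hypothetical |
| // helper names, not this class's actual methods): |
| // for (String host : req.getHosts()) { |
| // ask(priority, host, capability); // node-local |
| // ask(priority, rackOf(host), capability); // rack-local |
| // } |
| // ask(priority, ResourceRequest.ANY, capability); // off-switch |
| // Requests on h1 and h2 share one rack, so: h1 + h2 + /default-rack + ANY |
| // = 4. The later assertion of 5 is all five entries (h1, h2, h3, rack, |
| // ANY), presumably re-sent with zeroed counts once containers are assigned. |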
| |
| // send another request with different resource and priority |
| ContainerRequestEvent event3 = createReq(jobId, 3, 1024, |
| new String[] { "h3" }); |
| allocator.sendRequest(event3); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); |
| |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| nodeManager2.nodeHeartbeat(true); // Node heartbeat |
| nodeManager3.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); |
| checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, |
| assigned, false); |
| |
| // check that the assigned container requests are cancelled |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); |
| } |
| |
| @Test |
| public void testMapNodeLocality() throws Exception { |
| // test checks that ordering of allocated containers list from the RM does |
| // not affect the map->container assignment done by the AM. If there is a |
| // node local container available for a map then it should be assigned to |
| // that container and not a rack-local container that happened to be seen |
| // earlier in the allocated containers list from the RM. |
| // Regression test for MAPREDUCE-4893 |
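| // The preference being exercised, in sketch form (assumed shape of the |
| // matching logic, not a verbatim excerpt of the allocator): |
| // for (Container c : allocatedContainers) { |
| // attempt = nodeLocalMaps(c.getNodeId().getHost()).poll(); // node first |
| // if (attempt == null) attempt = rackLocalMaps(c).poll(); // then rack |
| // if (attempt == null) attempt = anyMaps.poll(); // then ANY |
| // } |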
| LOG.info("Running testMapNodeLocality"); |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps |
| rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map |
| rm.drainEvents(); |
| |
| // create the container requests for maps |
| ContainerRequestEvent event1 = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| ContainerRequestEvent event2 = createReq(jobId, 2, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event2); |
| ContainerRequestEvent event3 = createReq(jobId, 3, 1024, |
| new String[] { "h2" }); |
| allocator.sendRequest(event3); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // update resources in scheduler |
| // Node heartbeat from rack-local first. This makes node h3 the first in the |
| // list of allocated containers but it should not be assigned to task1. |
| nodeManager3.nodeHeartbeat(true); |
| // Node heartbeat from node-local next. This allocates 2 node local |
| // containers for task1 and task2. These should be matched with those tasks. |
| nodeManager1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, |
| assigned, false); |
| // remove the rack-local assignment that should have happened for task3 |
| for (Iterator<TaskAttemptContainerAssignedEvent> i = assigned.iterator(); |
| i.hasNext();) { |
| TaskAttemptContainerAssignedEvent event = i.next(); |
| if (event.getTaskAttemptID().equals(event3.getAttemptID())) { |
| Assert.assertEquals("h3", event.getContainer().getNodeId().getHost()); |
| i.remove(); // remove through the iterator to avoid concurrent modification |
| break; |
| } |
| } |
| checkAssignments(new ContainerRequestEvent[] { event1, event2}, |
| assigned, true); |
| } |
| |
| @Test |
| public void testResource() throws Exception { |
| |
| LOG.info("Running testResource"); |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); |
| MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); |
| rm.drainEvents(); |
| |
| // create the container request |
| ContainerRequestEvent event1 = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| |
| // send 1 more request with different resource req |
| ContainerRequestEvent event2 = createReq(jobId, 2, 2048, |
| new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| nodeManager2.nodeHeartbeat(true); // Node heartbeat |
| nodeManager3.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| checkAssignments(new ContainerRequestEvent[] { event1, event2 }, |
| assigned, false); |
| } |
| |
| @Test(timeout = 30000) |
| public void testReducerRampdownDiagnostics() throws Exception { |
| LOG.info("Running tesReducerRampdownDiagnostics"); |
| |
| final Configuration conf = new Configuration(); |
| conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| final RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| final String host = "host1"; |
| final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048); |
| nm.nodeHeartbeat(true); |
| rm.drainEvents(); |
| final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| final JobId jobId = MRBuilderUtils |
| .newJobId(appAttemptId.getApplicationId(), 0); |
| final Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, SystemClock.getInstance()); |
| // add resources to scheduler |
| rm.drainEvents(); |
| |
| // create the container request |
| final String[] locations = new String[] { host }; |
| allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); |
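| // Drive the allocate/heartbeat loop until the reduce container is assigned. |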
| for (int i = 0; i < 1;) { |
| rm.drainEvents(); |
| i += allocator.schedule().size(); |
| nm.nodeHeartbeat(true); |
| } |
| |
| allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); |
| while (allocator.getTaskAttemptKillEvents().size() == 0) { |
| rm.drainEvents(); |
| allocator.schedule().size(); |
| nm.nodeHeartbeat(true); |
| } |
| final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0) |
| .getMessage(); |
| Assert.assertTrue("No reducer rampDown preemption message", |
| killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); |
| } |
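| |
| // The rampdown path exercised above, in sketch form (assumed behavior, not |
| // the allocator's literal code): when pending maps need resources that |
| // running reducers hold, a reducer is killed with the rampdown diagnostic: |
| // eventHandler.handle(new TaskAttemptKillEvent(reduceAttemptId, |
| // RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); |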
| |
| @Test(timeout = 30000) |
| public void testPreemptReducers() throws Exception { |
| LOG.info("Running testPreemptReducers"); |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, SystemClock.getInstance()); |
| allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); |
| allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); |
| RMContainerAllocator.AssignedRequests assignedRequests = |
| allocator.getAssignedRequests(); |
| RMContainerAllocator.ScheduledRequests scheduledRequests = |
| allocator.getScheduledRequests(); |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); |
| scheduledRequests.maps.put(mock(TaskAttemptId.class), |
| new RMContainerRequestor.ContainerRequest(event1, null,null)); |
| assignedRequests.reduces.put(mock(TaskAttemptId.class), |
| mock(Container.class)); |
| |
| allocator.preemptReducesIfNeeded(); |
| Assert.assertEquals("The reducer is not preempted", |
| 1, assignedRequests.preemptionWaitingReduces.size()); |
| } |
| |
| @Test(timeout = 30000) |
| public void testNonAggressivelyPreemptReducers() throws Exception { |
| LOG.info("Running testNonAggressivelyPreemptReducers"); |
| |
| final int preemptThreshold = 2; //sec |
| Configuration conf = new Configuration(); |
| conf.setInt( |
| MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, |
| preemptThreshold); |
| |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| ControlledClock clock = new ControlledClock(null); |
| clock.setTime(1); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, clock); |
| allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); |
| allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); |
| RMContainerAllocator.AssignedRequests assignedRequests = |
| allocator.getAssignedRequests(); |
| RMContainerAllocator.ScheduledRequests scheduledRequests = |
| allocator.getScheduledRequests(); |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); |
| scheduledRequests.maps.put(mock(TaskAttemptId.class), |
| new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); |
| assignedRequests.reduces.put(mock(TaskAttemptId.class), |
| mock(Container.class)); |
| |
| clock.setTime(clock.getTime() + 1); |
| allocator.preemptReducesIfNeeded(); |
| Assert.assertEquals("The reducer is aggressively preeempted", 0, |
| assignedRequests.preemptionWaitingReduces.size()); |
| |
| clock.setTime(clock.getTime() + (preemptThreshold) * 1000); |
| allocator.preemptReducesIfNeeded(); |
| Assert.assertEquals("The reducer is not preeempted", 1, |
| assignedRequests.preemptionWaitingReduces.size()); |
| } |
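| |
| // Timing worked out: the map request above is stamped at clock time 1 ms. |
| // At 2 ms the 2 s preempt delay has not elapsed, so nothing is preempted; |
| // advancing by preemptThreshold * 1000 = 2000 ms crosses the threshold and |
| // exactly one reducer is marked for preemption. |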
| |
| @Test(timeout = 30000) |
| public void testUnconditionalPreemptReducers() throws Exception { |
| LOG.info("Running testForcePreemptReducers"); |
| |
| int forcePreemptThresholdSecs = 2; |
| Configuration conf = new Configuration(); |
| conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, |
| 2 * forcePreemptThresholdSecs); |
| conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, |
| forcePreemptThresholdSecs); |
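| |
| // Two delays are in play: the conditional preempt delay (4 s here) and the |
| // unconditional one (2 s). The unconditional window elapses first, so the |
| // reducer should be preempted once 2 s pass even though the conditional |
| // delay has not yet expired. |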
| |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8)); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| ControlledClock clock = new ControlledClock(null); |
| clock.setTime(1); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, clock); |
| allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); |
| allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); |
| RMContainerAllocator.AssignedRequests assignedRequests = |
| allocator.getAssignedRequests(); |
| RMContainerAllocator.ScheduledRequests scheduledRequests = |
| allocator.getScheduledRequests(); |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); |
| scheduledRequests.maps.put(mock(TaskAttemptId.class), |
| new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); |
| assignedRequests.reduces.put(mock(TaskAttemptId.class), |
| mock(Container.class)); |
| |
| clock.setTime(clock.getTime() + 1); |
| allocator.preemptReducesIfNeeded(); |
| Assert.assertEquals("The reducer is preeempted too soon", 0, |
| assignedRequests.preemptionWaitingReduces.size()); |
| |
| clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs); |
| allocator.preemptReducesIfNeeded(); |
| Assert.assertEquals("The reducer is not preeempted", 1, |
| assignedRequests.preemptionWaitingReduces.size()); |
| } |
| |
| @Test(timeout = 30000) |
| public void testExcessReduceContainerAssign() throws Exception { |
| final Configuration conf = new Configuration(); |
| conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); |
| final MyResourceManager2 rm = new MyResourceManager2(conf); |
| rm.start(); |
| final RMApp app = rm.submitApp(2048); |
| rm.drainEvents(); |
| final String host = "host1"; |
| final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096); |
| nm.nodeHeartbeat(true); |
| rm.drainEvents(); |
| final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| final JobId jobId = MRBuilderUtils |
| .newJobId(appAttemptId.getApplicationId(), 0); |
| final Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, SystemClock.getInstance()); |
| |
| // request to allocate two reduce priority containers |
| final String[] locations = new String[] { host }; |
| allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); |
| allocator.scheduleAllReduces(); |
| allocator.makeRemoteRequest(); |
| nm.nodeHeartbeat(true); |
| rm.drainEvents(); |
| allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); |
| |
| int assignedContainer; |
| for (assignedContainer = 0; assignedContainer < 1;) { |
| assignedContainer += allocator.schedule().size(); |
| nm.nodeHeartbeat(true); |
| rm.drainEvents(); |
| } |
| // only 1 allocated container should be assigned |
| Assert.assertEquals(1, assignedContainer); |
| } |
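| |
| // MyResourceManager2 wires in ExcessReduceContainerAllocateScheduler, which |
| // by construction returns more reduce-priority containers than were asked |
| // for; the assertion above checks that the allocator assigns only one of |
| // them to the lone reduce attempt instead of double-assigning. |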
| |
| @Test |
| public void testMapReduceAllocationWithNodeLabelExpression() throws Exception { |
| |
| LOG.info("Running testMapReduceAllocationWithNodeLabelExpression"); |
| Configuration conf = new Configuration(); |
| conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f); |
| conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes"); |
| conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId appAttemptId = |
| ApplicationAttemptId.newInstance(appId, 1); |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| final MockScheduler mockScheduler = new MockScheduler(appAttemptId); |
| MyContainerAllocator allocator = |
| new MyContainerAllocator(null, conf, appAttemptId, mockJob, |
| SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| }; |
| |
| // create some map requests |
| ContainerRequestEvent reqMapEvents; |
| reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); |
| allocator.sendRequests(Arrays.asList(reqMapEvents)); |
| |
| // create some reduce requests |
| ContainerRequestEvent reqReduceEvents; |
| reqReduceEvents = |
| createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); |
| allocator.sendRequests(Arrays.asList(reqReduceEvents)); |
| allocator.schedule(); |
| // verify all of the host-specific asks were sent plus one for the |
| // default rack and one for the ANY request |
| Assert.assertEquals(3, mockScheduler.lastAsk.size()); |
| // verify ResourceRequest sent for MAP have appropriate node |
| // label expression as per the configuration |
| validateLabelsRequests(mockScheduler.lastAsk.get(0), false); |
| validateLabelsRequests(mockScheduler.lastAsk.get(1), false); |
| validateLabelsRequests(mockScheduler.lastAsk.get(2), false); |
| |
| // assign the map container; the remaining asks should now be for the |
| // reduce request |
| mockScheduler.assignContainer("map", false); |
| allocator.schedule(); |
| // one host-specific ask plus the default rack and the ANY request |
| Assert.assertEquals(3, mockScheduler.lastAsk.size()); |
| // verify the reduce asks carry the reduce node label expression |
| validateLabelsRequests(mockScheduler.lastAsk.get(0), true); |
| validateLabelsRequests(mockScheduler.lastAsk.get(1), true); |
| validateLabelsRequests(mockScheduler.lastAsk.get(2), true); |
| |
| allocator.close(); |
| } |
| |
| private void validateLabelsRequests(ResourceRequest resourceRequest, |
| boolean isReduce) { |
| switch (resourceRequest.getResourceName()) { |
| case "map": |
| case "reduce": |
| case NetworkTopology.DEFAULT_RACK: |
| Assert.assertNull(resourceRequest.getNodeLabelExpression()); |
| break; |
| case "*": |
| Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes", |
| resourceRequest.getNodeLabelExpression()); |
| break; |
| default: |
| Assert.fail("Invalid resource location " |
| + resourceRequest.getResourceName()); |
| } |
| } |
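| |
| // Label wiring these assertions rely on, as configured in the test above: |
| // conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes"); |
| // conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); |
| // Only the ANY ("*") ResourceRequest carries the label expression; host- |
| // and rack-level requests leave it null, which is exactly what |
| // validateLabelsRequests checks for each resource name. |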
| |
| @Test |
| public void testUpdateCollectorInfo() throws Exception { |
| LOG.info("Running testUpdateCollectorInfo"); |
| Configuration conf = new Configuration(); |
| conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); |
| conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); |
| JobId jobId = MRBuilderUtils.newJobId(appId, 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| String localAddr = "localhost:1234"; |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| |
| // Generate a timeline delegation token. |
| TimelineDelegationTokenIdentifier ident = |
| new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()), |
| new Text("renewer"), null); |
| ident.setSequenceNumber(1); |
| Token<TimelineDelegationTokenIdentifier> collectorToken = |
| new Token<TimelineDelegationTokenIdentifier>(ident.getBytes(), |
| new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME, |
| new Text(localAddr)); |
| org.apache.hadoop.yarn.api.records.Token token = |
| org.apache.hadoop.yarn.api.records.Token.newInstance( |
| collectorToken.getIdentifier(), collectorToken.getKind().toString(), |
| collectorToken.getPassword(), |
| collectorToken.getService().toString()); |
| CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token); |
| // Mock scheduler to serve the Allocate request. |
| final MockSchedulerForTimelineCollector mockScheduler = |
| new MockSchedulerForTimelineCollector(collectorInfo); |
| MyContainerAllocator allocator = |
| new MyContainerAllocator(null, conf, attemptId, mockJob, |
| SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| }; |
| // Initially UGI should have no tokens. |
| ArrayList<Token<? extends TokenIdentifier>> tokens = |
| new ArrayList<>(ugi.getTokens()); |
| assertEquals(0, tokens.size()); |
| TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId)); |
| client.init(conf); |
| when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()). |
| thenReturn(client); |
| |
| // Send allocate request to RM and fetch collector address and token. |
| allocator.schedule(); |
| verify(client).setTimelineCollectorInfo(collectorInfo); |
| // Verify if token has been updated in UGI. |
| tokens = new ArrayList<>(ugi.getTokens()); |
| assertEquals(1, tokens.size()); |
| assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME, |
| tokens.get(0).getKind()); |
| assertEquals(collectorToken.decodeIdentifier(), |
| tokens.get(0).decodeIdentifier()); |
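| |
| // What the allocator is assumed to do with the CollectorInfo delivered in |
| // the AllocateResponse (sketch, not the literal implementation): |
| // context.getTimelineV2Client().setTimelineCollectorInfo(collectorInfo); |
| // ugi.addToken(collectorDelegationToken); // hypothetical variable; lets |
| // // later timeline writes authenticate |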
| |
| // Generate new collector token, send allocate request to RM and fetch the |
| // new token. |
| ident.setSequenceNumber(100); |
| Token<TimelineDelegationTokenIdentifier> collectorToken1 = |
| new Token<TimelineDelegationTokenIdentifier>(ident.getBytes(), |
| new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME, |
| new Text(localAddr)); |
| token = org.apache.hadoop.yarn.api.records.Token.newInstance( |
| collectorToken1.getIdentifier(), collectorToken1.getKind().toString(), |
| collectorToken1.getPassword(), collectorToken1.getService().toString()); |
| collectorInfo = CollectorInfo.newInstance(localAddr, token); |
| mockScheduler.updateCollectorInfo(collectorInfo); |
| allocator.schedule(); |
| verify(client).setTimelineCollectorInfo(collectorInfo); |
| // Verify if new token has been updated in UGI. |
| tokens = new ArrayList<>(ugi.getTokens()); |
| assertEquals(1, tokens.size()); |
| assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME, |
| tokens.get(0).getKind()); |
| assertEquals(collectorToken1.decodeIdentifier(), |
| tokens.get(0).decodeIdentifier()); |
| allocator.close(); |
| } |
| |
| @Test |
| public void testMapReduceScheduling() throws Exception { |
| |
| LOG.info("Running testMapReduceScheduling"); |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob, SystemClock.getInstance()); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); |
| MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); |
| rm.drainEvents(); |
| |
| // create the container request |
| // send MAP request |
| ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { |
| "h1", "h2" }, true, false); |
| allocator.sendRequest(event1); |
| |
| // send REDUCE request |
| ContainerRequestEvent event2 = createReq(jobId, 2, 3000, |
| new String[] { "h1" }, false, true); |
| allocator.sendRequest(event2); |
| |
| // send MAP request |
| ContainerRequestEvent event3 = createReq(jobId, 3, 2048, |
| new String[] { "h3" }, false, false); |
| allocator.sendRequest(event3); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| nodeManager2.nodeHeartbeat(true); // Node heartbeat |
| nodeManager3.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| checkAssignments(new ContainerRequestEvent[] { event1, event3 }, |
| assigned, false); |
| |
| // validate that no container is assigned to h1 as it doesn't have 2048 |
| for (TaskAttemptContainerAssignedEvent assig : assigned) { |
| Assert.assertFalse("Assigned count not correct", "h1".equals(assig |
| .getContainer().getNodeId().getHost())); |
| } |
| } |
| |
| private static class MyResourceManager extends MockRM { |
| |
| private static long fakeClusterTimeStamp = System.currentTimeMillis(); |
| |
| public MyResourceManager(Configuration conf) { |
| super(conf); |
| } |
| |
| public MyResourceManager(Configuration conf, RMStateStore store) { |
| super(conf, store); |
| } |
| |
| @Override |
| public void serviceStart() throws Exception { |
| super.serviceStart(); |
| // Ensure that the application attempt IDs for all the tests are the same; |
| // the application attempt IDs are used as the login user names |
| MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); |
| } |
| |
| @Override |
| protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { |
| // Dispatch inline for test sanity |
| return new EventHandler<SchedulerEvent>() { |
| @Override |
| public void handle(SchedulerEvent event) { |
| scheduler.handle(event); |
| } |
| }; |
| } |
| @Override |
| protected ResourceScheduler createScheduler() { |
| return new MyFifoScheduler(this.getRMContext()); |
| } |
| |
| MyFifoScheduler getMyFifoScheduler() { |
| return (MyFifoScheduler) scheduler; |
| } |
| } |
| |
| private static class MyResourceManager2 extends MyResourceManager { |
| public MyResourceManager2(Configuration conf) { |
| super(conf); |
| } |
| |
| @Override |
| protected ResourceScheduler createScheduler() { |
| return new ExcessReduceContainerAllocateScheduler(this.getRMContext()); |
| } |
| } |
| |
| @Test |
| public void testReportedAppProgress() throws Exception { |
| |
| LOG.info("Running testReportedAppProgress"); |
| |
| Configuration conf = new Configuration(); |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() |
| .getDispatcher(); |
| |
| // Submit the application |
| RMApp rmApp = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 21504); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( |
| appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { |
| @Override |
| protected Dispatcher createDispatcher() { |
| return new DrainDispatcher(); |
| } |
| @Override |
| protected ContainerAllocator createContainerAllocator( |
| ClientService clientService, AppContext context) { |
| return new MyContainerAllocator(rm, appAttemptId, context); |
| } |
| }; |
| |
| Assert.assertEquals(0.0, rmApp.getProgress(), 0.0); |
| |
| mrApp.submit(conf); |
| Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() |
| .getValue(); |
| |
| DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); |
| |
| MyContainerAllocator allocator = (MyContainerAllocator) mrApp |
| .getContainerAllocator(); |
| |
| mrApp.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING); |
| |
| amDispatcher.await(); |
| // Wait till all map-attempts request for containers |
| for (Task t : job.getTasks().values()) { |
| if (t.getType() == TaskType.MAP) { |
| mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() |
| .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); |
| } |
| } |
| amDispatcher.await(); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Wait for all map-tasks to be running |
| for (Task t : job.getTasks().values()) { |
| if (t.getType() == TaskType.MAP) { |
| mrApp.waitForState(t, TaskState.RUNNING); |
| } |
| } |
| |
| allocator.schedule(); // Send heartbeat |
| rm.drainEvents(); |
| Assert.assertEquals(0.05f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); |
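| |
| // Progress model behind these numbers for a job with 10 maps and 10 |
| // reduces: 0.05 for job setup, 0.45 spread across maps, 0.45 across |
| // reduces, and 0.05 held back for cleanup. One finished map therefore |
| // gives 0.05 + 0.45 * (1/10) = 0.095, which is asserted next. |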
| |
| // Finish off 1 map. |
| Iterator<Task> it = job.getTasks().values().iterator(); |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.095f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); |
| |
| // Finish off 7 more so that map-progress is 80% |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7); |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.41f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); |
| |
| // Finish off the 2 remaining maps |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Wait for all reduce-tasks to be running |
| for (Task t : job.getTasks().values()) { |
| if (t.getType() == TaskType.REDUCE) { |
| mrApp.waitForState(t, TaskState.RUNNING); |
| } |
| } |
| |
| // Finish off 2 reduces |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.59f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); |
| |
| // Finish off the remaining 8 reduces. |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8); |
| allocator.schedule(); |
| rm.drainEvents(); |
| // Remaining is JobCleanup |
| Assert.assertEquals(0.95f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); |
| } |
| |
| private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node, |
| MRApp mrApp, Iterator<Task> it, int nextN) throws Exception { |
| Task task; |
| for (int i=0; i<nextN; i++) { |
| task = it.next(); |
| finishTask(rmDispatcher, node, mrApp, task); |
| } |
| } |
| |
| private void finishTask(DrainDispatcher rmDispatcher, MockNM node, |
| MRApp mrApp, Task task) throws Exception { |
| TaskAttempt attempt = task.getAttempts().values().iterator().next(); |
| List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1); |
| contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(), |
| ContainerState.COMPLETE, "", 0)); |
| Map<ApplicationId,List<ContainerStatus>> statusUpdate = |
| new HashMap<ApplicationId,List<ContainerStatus>>(1); |
| statusUpdate.put(mrApp.getAppID(), contStatus); |
| node.nodeHeartbeat(statusUpdate, true); |
| rmDispatcher.await(); |
| mrApp.getContext().getEventHandler().handle( |
| new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); |
| mrApp.waitForState(task, TaskState.SUCCEEDED); |
| } |
| |
| @Test |
| public void testReportedAppProgressWithOnlyMaps() throws Exception { |
| |
| LOG.info("Running testReportedAppProgressWithOnlyMaps"); |
| |
| Configuration conf = new Configuration(); |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() |
| .getDispatcher(); |
| |
| // Submit the application |
| RMApp rmApp = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 11264); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( |
| appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { |
| @Override |
| protected Dispatcher createDispatcher() { |
| return new DrainDispatcher(); |
| } |
| @Override |
| protected ContainerAllocator createContainerAllocator( |
| ClientService clientService, AppContext context) { |
| return new MyContainerAllocator(rm, appAttemptId, context); |
| } |
| }; |
| |
| Assert.assertEquals(0.0, rmApp.getProgress(), 0.0); |
| |
| mrApp.submit(conf); |
| Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() |
| .getValue(); |
| |
| DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); |
| |
| MyContainerAllocator allocator = (MyContainerAllocator) mrApp |
| .getContainerAllocator(); |
| |
| mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING); |
| |
| amDispatcher.await(); |
| // Wait till all map-attempts request for containers |
| for (Task t : job.getTasks().values()) { |
| mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() |
| .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); |
| } |
| amDispatcher.await(); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Wait for all map-tasks to be running |
| for (Task t : job.getTasks().values()) { |
| mrApp.waitForState(t, TaskState.RUNNING); |
| } |
| |
| allocator.schedule(); // Send heartbeat |
| rm.drainEvents(); |
| Assert.assertEquals(0.05f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); |
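| |
| // With no reducers, the full 0.9 weight shifts to the maps: one finished |
| // map gives 0.05 + 0.9 * (1/10) = 0.14, six give 0.05 + 0.9 * 0.6 = 0.59, |
| // and all ten leave only the 0.05 cleanup share outstanding (0.95). |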
| |
| Iterator<Task> it = job.getTasks().values().iterator(); |
| |
| // Finish off 1 map so that map-progress is 10% |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.14f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); |
| |
| // Finish off 5 more map so that map-progress is 60% |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5); |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.59f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); |
| |
| // Finish off remaining map so that map-progress is 100% |
| finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4); |
| allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0.95f, job.getProgress(), 0.001f); |
| Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); |
| } |
| |
| @Test |
| public void testUpdatedNodes() throws Exception { |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nm1 = rm.registerNode("h1:1234", 10240); |
| MockNM nm2 = rm.registerNode("h2:1234", 10240); |
| rm.drainEvents(); |
| |
| // create the map container request |
| ContainerRequestEvent event = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event); |
| TaskAttemptId attemptId = event.getAttemptID(); |
| |
| TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); |
| when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); |
| Task mockTask = mock(Task.class); |
| when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt); |
| when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask); |
| |
| // this tells the scheduler about the requests |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| |
| nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); |
| Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); |
| allocator.getJobUpdatedNodeEvents().clear(); |
| // get the assignment |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(1, assigned.size()); |
| Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId()); |
| // no updated nodes reported |
| Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); |
| Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); |
| |
| // mark nodes bad |
| nm1.nodeHeartbeat(false); |
| nm2.nodeHeartbeat(false); |
| rm.drainEvents(); |
| |
| // schedule response returns updated nodes |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0, assigned.size()); |
| // updated nodes are reported |
| Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); |
| Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); |
| Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); |
| Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); |
| allocator.getJobUpdatedNodeEvents().clear(); |
| allocator.getTaskAttemptKillEvents().clear(); |
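| |
| // The kill above is driven by the node transition: the attempt was pinned |
| // to nm1 via the mocked getNodeId(), so when nm1 reports unhealthy the |
| // allocator emits a TaskAttemptKillEvent for the attempt running there. |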
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals(0, assigned.size()); |
| // no updated nodes reported |
| Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); |
| Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); |
| } |
| |
| @Test |
| public void testBlackListedNodes() throws Exception { |
| |
| LOG.info("Running testBlackListedNodes"); |
| |
| Configuration conf = new Configuration(); |
| conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); |
| conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); |
| conf.setInt( |
| MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); |
| |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); |
| MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); |
| rm.drainEvents(); |
| |
| // create the container request |
| ContainerRequestEvent event1 = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| |
| // send 1 more request with different resource req |
| ContainerRequestEvent event2 = createReq(jobId, 2, 1024, |
| new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| |
| // send another request with different resource and priority |
| ContainerRequestEvent event3 = createReq(jobId, 3, 1024, |
| new String[] { "h3" }); |
| allocator.sendRequest(event3); |
| |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // Send events to blacklist nodes h1 and h2 |
| ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); |
| allocator.sendFailure(f1); |
| ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); |
| allocator.sendFailure(f2); |
| |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| nodeManager2.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| assertBlacklistAdditionsAndRemovals(2, 0, rm); |
| |
| // mark h1/h2 as bad nodes |
| nodeManager1.nodeHeartbeat(false); |
| nodeManager2.nodeHeartbeat(false); |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| nodeManager3.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| |
| Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); |
| |
| // validate that all containers are assigned to h3 |
| for (TaskAttemptContainerAssignedEvent assig : assigned) { |
| Assert.assertTrue("Assigned container host not correct", "h3".equals(assig |
| .getContainer().getNodeId().getHost())); |
| } |
| } |
| |
| @Test |
| public void testIgnoreBlacklisting() throws Exception { |
| LOG.info("Running testIgnoreBlacklisting"); |
| |
| Configuration conf = new Configuration(); |
| conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); |
| conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); |
| conf.setInt( |
| MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33); |
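| |
| // With the threshold at 33%, blacklisting is ignored whenever |
| // blacklistedNodes / knownNodes exceeds 0.33: 1/1, 1/2 and 1/3 all do, |
| // 1/4 does not. That progression is what the assertions below walk through |
| // as node managers are registered one at a time. |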
| |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM[] nodeManagers = new MockNM[10]; |
| int nmNum = 0; |
| List<TaskAttemptContainerAssignedEvent> assigned = null; |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| nodeManagers[0].nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = |
| app.getCurrentAppAttempt().getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = |
| new MyContainerAllocator(rm, conf, appAttemptId, mockJob); |
| |
| // Known=1, blacklisted=0, ignore should be false - assign first container |
| assigned = |
| getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| LOG.info("Failing container _1 on H1 (Node should be blacklisted and" |
| + " ignore blacklisting enabled"); |
| // Send events to blacklist nodes h1 and h2 |
| ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); |
| allocator.sendFailure(f1); |
| |
| // Test single node. |
| // Known=1, blacklisted=1, ignore should be true - assign 0. |
| // The current call only reports blacklisted node "h1" to the RM; |
| // makeRemoteRequest will not act on the ignore state until the next call. |
| assigned = |
| getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 1, 0, 0, 1, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // Known=1, blacklisted=1, ignore should be true - assign 1 |
| assigned = |
| getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. |
| assigned = |
| getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, |
| nodeManagers[1], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. |
| assigned = |
| getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, |
| nodeManagers[2], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| // Known=3, blacklisted=1, ignore should be true - assign 1 |
| assigned = |
| getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| // Known=4, blacklisted=1, ignore should be false - assign 1 anyway |
| assigned = |
| getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, |
| nodeManagers[3], allocator, 0, 0, 1, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| // Test blacklisting re-enabled. |
| // Known=4, blacklisted=1, ignore should be false - no assignment on h1 |
| assigned = |
| getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| // RMContainerRequestor would have created a replacement request. |
| |
| // Blacklist h2 |
| ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false); |
| allocator.sendFailure(f2); |
| |
| // Test ignore blacklisting re-enabled |
| // Known=4, blacklisted=2, ignore should be true. Should assign 0 |
| // containers for the same reason as above. |
| assigned = |
| getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 1, 0, 0, 2, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // Known=4, blacklisted=2, ignore should be true. Should assign 2 |
| // containers. |
| assigned = |
| getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, |
| nodeManagers[0], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); |
| |
| // Known=4, blacklisted=2, ignore should be true. |
| assigned = |
| getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, |
| nodeManagers[1], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| // Test blacklist while ignore blacklisting enabled |
| ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false); |
| allocator.sendFailure(f3); |
| |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| // Known=5, blacklisted=3, ignore should be true. |
| assigned = |
| getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, |
| nodeManagers[2], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| // Assign on 5 more nodes - to re-enable blacklisting |
| for (int i = 0; i < 5; i++) { |
| nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); |
| assigned = |
| getContainerOnHost(jobId, 11 + i, 1024, |
| new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], |
| allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| } |
| |
| // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. |
| assigned = |
| getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, |
| nodeManagers[2], allocator, 0, 0, 0, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| } |
| |
| private MockNM registerNodeManager(int i, MyResourceManager rm) |
| throws Exception { |
| MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240); |
| rm.drainEvents(); |
| return nm; |
| } |
| |
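| // Sends a container request for the given hosts, schedules (sending the |
| // ask and any blacklist updates to the RM), heartbeats the given NM, and |
| // schedules again; asserts the blacklist additions/removals observed by |
| // the scheduler on each of the two schedule calls. |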
| private |
| List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId, |
| int taskAttemptId, int memory, String[] hosts, MockNM mockNM, |
| MyContainerAllocator allocator, |
| int expectedAdditions1, int expectedRemovals1, |
| int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) |
| throws Exception { |
| ContainerRequestEvent reqEvent = |
| createReq(jobId, taskAttemptId, memory, hosts); |
| allocator.sendRequest(reqEvent); |
| |
| // Send the request to the RM |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals( |
| expectedAdditions1, expectedRemovals1, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // Heartbeat from the required nodeManager |
| mockNM.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals( |
| expectedAdditions2, expectedRemovals2, rm); |
| return assigned; |
| } |
| |
| @Test |
| public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { |
| LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); |
| |
| Configuration conf = new Configuration(); |
| conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); |
| conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); |
| conf.setInt( |
| MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); |
| |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // add resources to scheduler |
| MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); |
| MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); |
| rm.drainEvents(); |
| |
| LOG.info("Requesting 1 Containers _1 on H1"); |
| // create the container request |
| ContainerRequestEvent event1 = createReq(jobId, 1, 1024, |
| new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| |
| LOG.info("RM Heartbeat (to send the container requests)"); |
| // this tells the scheduler about the requests |
| // as nodes are not added, no allocations |
| List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); |
| rm.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| LOG.info("h1 Heartbeat (To actually schedule the containers)"); |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| LOG.info("RM Heartbeat (To process the scheduled containers)"); |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); |
| |
| LOG.info("Failing container _1 on H1 (should blacklist the node)"); |
| // Send events to blacklist nodes h1 and h2 |
| ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); |
| allocator.sendFailure(f1); |
| |
| // At this stage a request should be created for a fast-fail map: |
| // create a FAST_FAIL request for the previously failed map. |
| ContainerRequestEvent event1f = createReq(jobId, 1, 1024, |
| new String[] { "h1" }, true, false); |
| allocator.sendRequest(event1f); |
| |
| //Update the Scheduler with the new requests. |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(1, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // send another request with different resource and priority |
| ContainerRequestEvent event3 = createReq(jobId, 3, 1024, |
| new String[] { "h1", "h3" }); |
| allocator.sendRequest(event3); |
| |
| //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container. |
| //RM is only aware of the prio:5 container |
| |
| LOG.info("h1 Heartbeat (To actually schedule the containers)"); |
| // update resources in scheduler |
| nodeManager1.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| LOG.info("RM Heartbeat (To process the scheduled containers)"); |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| //RMContainerAllocator gets assigned a p:5 on a blacklisted node. |
| |
| //Send a release for the p:5 container + another request. |
| LOG.info("RM Heartbeat (To process the re-scheduled containers)"); |
| assigned = allocator.schedule(); |
| rm.drainEvents(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); |
| |
| // Heartbeat from h3 to schedule on this host. |
| LOG.info("h3 Heartbeat (To re-schedule the containers)"); |
| nodeManager3.nodeHeartbeat(true); // Node heartbeat |
| rm.drainEvents(); |
| |
| LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); |
| assigned = allocator.schedule(); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm); |
| rm.drainEvents(); |
| |
| // For debugging |
| for (TaskAttemptContainerAssignedEvent assig : assigned) { |
| LOG.info(assig.getTaskAttemptID() + |
| " assgined to " + assig.getContainer().getId() + |
| " with priority " + assig.getContainer().getPriority()); |
| } |
| |
| Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); |
| |
| // validate that all containers are assigned to h3 |
| for (TaskAttemptContainerAssignedEvent assig : assigned) { |
| Assert.assertEquals("Assigned container " + assig.getContainer().getId() |
| + " host not correct", "h3", assig.getContainer().getNodeId().getHost()); |
| } |
| } |
| |
| private static void assertBlacklistAdditionsAndRemovals( |
| int expectedAdditions, int expectedRemovals, MyResourceManager rm) { |
| Assert.assertEquals(expectedAdditions, |
| rm.getMyFifoScheduler().lastBlacklistAdditions.size()); |
| Assert.assertEquals(expectedRemovals, |
| rm.getMyFifoScheduler().lastBlacklistRemovals.size()); |
| } |
| |
| private static void assertAsksAndReleases(int expectedAsk, |
| int expectedRelease, MyResourceManager rm) { |
| Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size()); |
| Assert.assertEquals(expectedRelease, |
| rm.getMyFifoScheduler().lastRelease.size()); |
| } |
| |
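| // FifoScheduler wrapper that records the most recent ask, release and |
| // blacklist lists passed to allocate() so that tests can assert on them. |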
| private static class MyFifoScheduler extends FifoScheduler { |
| |
| public MyFifoScheduler(RMContext rmContext) { |
| super(); |
| try { |
| Configuration conf = new Configuration(); |
| reinitialize(conf, rmContext); |
| } catch (IOException ie) { |
| LOG.error("reinitialize failed", ie); |
| Assert.fail("reinitialize failed: " + ie.getMessage()); |
| } |
| } |
| |
| List<ResourceRequest> lastAsk = null; |
| List<ContainerId> lastRelease = null; |
| List<String> lastBlacklistAdditions; |
| List<String> lastBlacklistRemovals; |
| Resource forceResourceLimit = null; |
| |
| // Override to copy the ask objects; otherwise FifoScheduler updates |
| // numContainers in the same ResourceRequest objects that the |
| // RMContainerAllocator still holds. |
| @Override |
| public synchronized Allocation allocate( |
| ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, |
| List<ContainerId> release, List<String> blacklistAdditions, |
| List<String> blacklistRemovals, |
| ContainerUpdates updateRequests) { |
| List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); |
| for (ResourceRequest req : ask) { |
| ResourceRequest reqCopy = ResourceRequest.newInstance(req |
| .getPriority(), req.getResourceName(), req.getCapability(), req |
| .getNumContainers(), req.getRelaxLocality()); |
| askCopy.add(reqCopy); |
| } |
| SecurityUtil.setTokenServiceUseIp(false); |
| lastAsk = ask; |
| lastRelease = release; |
| lastBlacklistAdditions = blacklistAdditions; |
| lastBlacklistRemovals = blacklistRemovals; |
| Allocation allocation = super.allocate( |
| applicationAttemptId, askCopy, release, blacklistAdditions, |
| blacklistRemovals, updateRequests); |
| if (forceResourceLimit != null) { |
| // Test wants to force the non-default resource limit |
| allocation.setResourceLimit(forceResourceLimit); |
| } |
| return allocation; |
| } |
| |
| public void forceResourceLimit(Resource resource) { |
| this.forceResourceLimit = resource; |
| } |
| } |
| |
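| // FifoScheduler variant that appends one extra mocked reduce-priority |
| // container to every non-empty allocation, exercising the allocator's |
| // handling of excess reduce containers. |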
| private static class ExcessReduceContainerAllocateScheduler extends FifoScheduler { |
| |
| public ExcessReduceContainerAllocateScheduler(RMContext rmContext) { |
| super(); |
| try { |
| Configuration conf = new Configuration(); |
| reinitialize(conf, rmContext); |
| } catch (IOException ie) { |
| LOG.error("reinitialize failed", ie); |
| Assert.fail("reinitialize failed: " + ie.getMessage()); |
| } |
| } |
| |
| @Override |
| public synchronized Allocation allocate( |
| ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, |
| List<ContainerId> release, List<String> blacklistAdditions, |
| List<String> blacklistRemovals, |
| ContainerUpdates updateRequests) { |
| List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); |
| for (ResourceRequest req : ask) { |
| ResourceRequest reqCopy = ResourceRequest.newInstance(req |
| .getPriority(), req.getResourceName(), req.getCapability(), req |
| .getNumContainers(), req.getRelaxLocality()); |
| askCopy.add(reqCopy); |
| } |
| SecurityUtil.setTokenServiceUseIp(false); |
| Allocation normalAlloc = super.allocate( |
| applicationAttemptId, askCopy, release, |
| blacklistAdditions, blacklistRemovals, updateRequests); |
| List<Container> containers = normalAlloc.getContainers(); |
| if (containers.size() > 0) { |
| // allocate excess container |
| FiCaSchedulerApp application = super.getApplicationAttempt(applicationAttemptId); |
| ContainerId containerId = BuilderUtils.newContainerId(application |
| .getApplicationAttemptId(), application.getNewContainerId()); |
| Container excessC = mock(Container.class); |
| when(excessC.getId()).thenReturn(containerId); |
| when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE); |
| Resource mockR = mock(Resource.class); |
| when(mockR.getMemorySize()).thenReturn(2048L); |
| when(excessC.getResource()).thenReturn(mockR); |
| NodeId nId = mock(NodeId.class); |
| when(nId.getHost()).thenReturn("local"); |
| when(excessC.getNodeId()).thenReturn(nId); |
| containers.add(excessC); |
| } |
| Allocation excessAlloc = mock(Allocation.class); |
| when(excessAlloc.getContainers()).thenReturn(containers); |
| return excessAlloc; |
| } |
| } |
| |
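| // Convenience factories for container request events. The |
| // earlierFailedAttempt flag produces a request for a previously failed |
| // container via createContainerRequestEventForFailedContainer. |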
| private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, |
| int memory, String[] hosts) { |
| return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false); |
| } |
| |
| private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, |
| int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { |
| return createReq(jobId, taskAttemptId, mem, |
| 1, hosts, earlierFailedAttempt, reduce); |
| } |
| |
| private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, |
| int memory, int vcore, String[] hosts, boolean earlierFailedAttempt, |
| boolean reduce) { |
| TaskId taskId; |
| if (reduce) { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); |
| } else { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); |
| } |
| TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, |
| taskAttemptId); |
| Resource containerNeed = Resource.newInstance(memory, vcore); |
| if (earlierFailedAttempt) { |
| return ContainerRequestEvent |
| .createContainerRequestEventForFailedContainer(attemptId, |
| containerNeed); |
| } |
| return new ContainerRequestEvent(attemptId, containerNeed, hosts, |
| new String[] { NetworkTopology.DEFAULT_RACK }); |
| } |
| |
| private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, |
| String host, boolean reduce) { |
| TaskId taskId; |
| if (reduce) { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); |
| } else { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); |
| } |
| TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, |
| taskAttemptId); |
| return new ContainerFailedEvent(attemptId, host); |
| } |
| |
| private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, |
| int taskAttemptId, boolean reduce) { |
| TaskId taskId; |
| if (reduce) { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); |
| } else { |
| taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); |
| } |
| TaskAttemptId attemptId = |
| MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); |
| return new ContainerAllocatorEvent(attemptId, |
| ContainerAllocator.EventType.CONTAINER_DEALLOCATE); |
| } |
| |
| private void checkAssignments(ContainerRequestEvent[] requests, |
| List<TaskAttemptContainerAssignedEvent> assignments, |
| boolean checkHostMatch) { |
| Assert.assertNotNull("Container not assigned", assignments); |
| Assert.assertEquals("Assigned count not correct", requests.length, |
| assignments.size()); |
| |
| // check for uniqueness of containerIDs |
| Set<ContainerId> containerIds = new HashSet<ContainerId>(); |
| for (TaskAttemptContainerAssignedEvent assigned : assignments) { |
| containerIds.add(assigned.getContainer().getId()); |
| } |
| Assert.assertEquals("Assigned containers must be different", assignments |
| .size(), containerIds.size()); |
| |
| // check for all assignment |
| for (ContainerRequestEvent req : requests) { |
| TaskAttemptContainerAssignedEvent assigned = null; |
| for (TaskAttemptContainerAssignedEvent ass : assignments) { |
| if (ass.getTaskAttemptID().equals(req.getAttemptID())) { |
| assigned = ass; |
| break; |
| } |
| } |
| checkAssignment(req, assigned, checkHostMatch); |
| } |
| } |
| |
| private void checkAssignment(ContainerRequestEvent request, |
| TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { |
| Assert.assertNotNull("Nothing assigned to attempt " |
| + request.getAttemptID(), assigned); |
| Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), |
| assigned.getTaskAttemptID()); |
| if (checkHostMatch) { |
| Assert.assertTrue("Not assigned to requested host", Arrays.asList( |
| request.getHosts()).contains( |
| assigned.getContainer().getNodeId().getHost())); |
| } |
| } |
| |
| // Mock RMContainerAllocator that talks to the local MyResourceManager |
| // instead of a remote scheduler. |
| private static class MyContainerAllocator extends RMContainerAllocator { |
| static final List<TaskAttemptContainerAssignedEvent> events |
| = new ArrayList<TaskAttemptContainerAssignedEvent>(); |
| static final List<TaskAttemptKillEvent> taskAttemptKillEvents |
| = new ArrayList<TaskAttemptKillEvent>(); |
| static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents |
| = new ArrayList<JobUpdatedNodesEvent>(); |
| static final List<JobEvent> jobEvents = new ArrayList<JobEvent>(); |
| private MyResourceManager rm; |
| private boolean isUnregistered = false; |
| private AllocateResponse allocateResponse; |
| private static AppContext createAppContext( |
| ApplicationAttemptId appAttemptId, Job job) { |
| AppContext context = mock(RunningAppContext.class); |
| ApplicationId appId = appAttemptId.getApplicationId(); |
| when(context.getApplicationID()).thenReturn(appId); |
| when(context.getApplicationAttemptId()).thenReturn(appAttemptId); |
| when(context.getJob(isA(JobId.class))).thenReturn(job); |
| when(context.getClock()).thenReturn(new ControlledClock()); |
| when(context.getClusterInfo()).thenReturn( |
| new ClusterInfo(Resource.newInstance(10240, 1))); |
| when(context.getEventHandler()).thenReturn(new EventHandler() { |
| @Override |
| public void handle(Event event) { |
| // Only capture interesting events. |
| if (event instanceof TaskAttemptContainerAssignedEvent) { |
| events.add((TaskAttemptContainerAssignedEvent) event); |
| } else if (event instanceof TaskAttemptKillEvent) { |
| taskAttemptKillEvents.add((TaskAttemptKillEvent)event); |
| } else if (event instanceof JobUpdatedNodesEvent) { |
| jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); |
| } else if (event instanceof JobEvent) { |
| jobEvents.add((JobEvent)event); |
| } |
| } |
| }); |
| return context; |
| } |
| |
| private static AppContext createAppContext( |
| ApplicationAttemptId appAttemptId, Job job, Clock clock) { |
| AppContext context = createAppContext(appAttemptId, job); |
| when(context.getClock()).thenReturn(clock); |
| return context; |
| } |
| |
| private static ClientService createMockClientService() { |
| ClientService service = mock(ClientService.class); |
| when(service.getBindAddress()).thenReturn( |
| NetUtils.createSocketAddr("localhost:4567")); |
| when(service.getHttpPort()).thenReturn(890); |
| return service; |
| } |
| |
| // Use this constructor when using a real job. |
| MyContainerAllocator(MyResourceManager rm, |
| ApplicationAttemptId appAttemptId, AppContext context) { |
| super(createMockClientService(), context); |
| this.rm = rm; |
| } |
| |
| // Use this constructor when you are using a mocked job. |
| public MyContainerAllocator(MyResourceManager rm, Configuration conf, |
| ApplicationAttemptId appAttemptId, Job job) { |
| super(createMockClientService(), createAppContext(appAttemptId, job)); |
| this.rm = rm; |
| super.init(conf); |
| super.start(); |
| } |
| |
| public MyContainerAllocator(MyResourceManager rm, Configuration conf, |
| ApplicationAttemptId appAttemptId, Job job, Clock clock) { |
| super(createMockClientService(), |
| createAppContext(appAttemptId, job, clock)); |
| this.rm = rm; |
| super.init(conf); |
| super.start(); |
| } |
| |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return this.rm.getApplicationMasterService(); |
| } |
| |
| @Override |
| protected void register() { |
| ApplicationAttemptId attemptId = getContext().getApplicationAttemptId(); |
| Token<AMRMTokenIdentifier> token = |
| rm.getRMContext().getRMApps().get(attemptId.getApplicationId()) |
| .getRMAppAttempt(attemptId).getAMRMToken(); |
| try { |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| ugi.addTokenIdentifier(token.decodeIdentifier()); |
| } catch (IOException e) { |
| throw new YarnRuntimeException(e); |
| } |
| super.register(); |
| } |
| |
| @Override |
| protected void unregister() { |
| isUnregistered=true; |
| } |
| |
| @Override |
| protected Resource getMaxContainerCapability() { |
| return Resource.newInstance(10240, 1); |
| } |
| |
| public void sendRequest(ContainerRequestEvent req) { |
| sendRequests(Arrays.asList(req)); |
| } |
| |
| public void sendRequests(List<ContainerRequestEvent> reqs) { |
| for (ContainerRequestEvent req : reqs) { |
| super.handleEvent(req); |
| } |
| } |
| |
| public void sendFailure(ContainerFailedEvent f) { |
| super.handleEvent(f); |
| } |
| |
| public void sendDeallocate(ContainerAllocatorEvent f) { |
| super.handleEvent(f); |
| } |
| |
| // API to be used by tests |
| public List<TaskAttemptContainerAssignedEvent> schedule() |
| throws Exception { |
| // before doing a heartbeat with the RM, drain all outstanding events to |
| // ensure all requests sent before this heartbeat are handled |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| public Boolean get() { |
| return eventQueue.isEmpty(); |
| } |
| }, 100, 10000); |
| // run the scheduler |
| super.heartbeat(); |
| |
| List<TaskAttemptContainerAssignedEvent> result |
| = new ArrayList<TaskAttemptContainerAssignedEvent>(events); |
| events.clear(); |
| return result; |
| } |
| |
| static List<TaskAttemptKillEvent> getTaskAttemptKillEvents() { |
| return taskAttemptKillEvents; |
| } |
| |
| static List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() { |
| return jobUpdatedNodeEvents; |
| } |
| |
| @Override |
| protected void startAllocatorThread() { |
| // override to NOT start thread |
| } |
| |
| @Override |
| protected boolean isApplicationMasterRegistered() { |
| return super.isApplicationMasterRegistered(); |
| } |
| |
| public boolean isUnregistered() { |
| return isUnregistered; |
| } |
| |
| public void updateSchedulerProxy(MyResourceManager rm) { |
| scheduler = rm.getApplicationMasterService(); |
| } |
| |
| @Override |
| protected AllocateResponse makeRemoteRequest() throws IOException, |
| YarnException { |
| allocateResponse = super.makeRemoteRequest(); |
| return allocateResponse; |
| } |
| } |
| |
| private static class MyContainerAllocator2 extends MyContainerAllocator { |
| public MyContainerAllocator2(MyResourceManager rm, Configuration conf, |
| ApplicationAttemptId appAttemptId, Job job) { |
| super(rm, conf, appAttemptId, job); |
| } |
| @Override |
| protected AllocateResponse makeRemoteRequest() throws IOException, |
| YarnException { |
| throw new IOException("for testing"); |
| } |
| } |
| |
| @Test |
| public void testReduceScheduling() throws Exception { |
| int totalMaps = 10; |
| int succeededMaps = 1; |
| int scheduledMaps = 10; |
| int scheduledReduces = 0; |
| int assignedMaps = 2; |
| int assignedReduces = 0; |
| Resource mapResourceReqt = BuilderUtils.newResource(1024, 1); |
| Resource reduceResourceReqt = BuilderUtils.newResource(2 * 1024, 1); |
| int numPendingReduces = 4; |
| float maxReduceRampupLimit = 0.5f; |
| float reduceSlowStart = 0.2f; |
| |
| RMContainerAllocator allocator = mock(RMContainerAllocator.class); |
| doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(), |
| anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class), |
| any(Resource.class), anyInt(), anyFloat(), anyFloat()); |
| doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator) |
| .getSchedulerResourceTypes(); |
| |
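| // With reduceSlowStart = 0.2 and 10 total maps, reduces must not start |
| // until at least 2 maps have succeeded; only 1 has succeeded so far. |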
| // Test slow-start |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| scheduledMaps, scheduledReduces, |
| assignedMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator, never()).setIsReduceStarted(true); |
| |
| // verify slow-start still in effect when no more maps need to |
| // be scheduled but some have yet to complete |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| 0, scheduledReduces, |
| totalMaps - succeededMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator, never()).setIsReduceStarted(true); |
| verify(allocator, never()).scheduleAllReduces(); |
| |
| succeededMaps = 3; |
| doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit(); |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| scheduledMaps, scheduledReduces, |
| assignedMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator, times(1)).setIsReduceStarted(true); |
| |
| // Test reduce ramp-up |
| doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator) |
| .getResourceLimit(); |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| scheduledMaps, scheduledReduces, |
| assignedMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator).rampUpReduces(anyInt()); |
| verify(allocator, never()).rampDownReduces(anyInt()); |
| |
| // Test reduce ramp-down |
| scheduledReduces = 3; |
| doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) |
| .getResourceLimit(); |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| scheduledMaps, scheduledReduces, |
| assignedMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator).rampDownReduces(anyInt()); |
| |
| // Test reduce ramp-down when there are still scheduled maps. |
| // rampDownReduces should now have been invoked twice in total. |
| scheduledMaps = 2; |
| assignedReduces = 2; |
| doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) |
| .getResourceLimit(); |
| allocator.scheduleReduces( |
| totalMaps, succeededMaps, |
| scheduledMaps, scheduledReduces, |
| assignedMaps, assignedReduces, |
| mapResourceReqt, reduceResourceReqt, |
| numPendingReduces, |
| maxReduceRampupLimit, reduceSlowStart); |
| verify(allocator, times(2)).rampDownReduces(anyInt()); |
| |
| doReturn( |
| EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU)) |
| .when(allocator).getSchedulerResourceTypes(); |
| |
| // Test ramp-down when enough memory but not enough cpu resource |
| scheduledMaps = 10; |
| assignedReduces = 0; |
| doReturn(BuilderUtils.newResource(100 * 1024, 5 * 1)).when(allocator) |
| .getResourceLimit(); |
| allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps, |
| scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, |
| reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, |
| reduceSlowStart); |
| verify(allocator, times(3)).rampDownReduces(anyInt()); |
| |
| // Test ramp-down when enough cpu but not enough memory resource |
| doReturn(BuilderUtils.newResource(10 * 1024, 100 * 1)).when(allocator) |
| .getResourceLimit(); |
| allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps, |
| scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, |
| reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, |
| reduceSlowStart); |
| verify(allocator, times(4)).rampDownReduces(anyInt()); |
| } |
| |
| private static class RecalculateContainerAllocator extends MyContainerAllocator { |
| public boolean recalculatedReduceSchedule = false; |
| |
| public RecalculateContainerAllocator(MyResourceManager rm, |
| Configuration conf, ApplicationAttemptId appAttemptId, Job job) { |
| super(rm, conf, appAttemptId, job); |
| } |
| |
| @Override |
| public void scheduleReduces(int totalMaps, int completedMaps, |
| int scheduledMaps, int scheduledReduces, int assignedMaps, |
| int assignedReduces, Resource mapResourceReqt, Resource reduceResourceReqt, |
| int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { |
| recalculatedReduceSchedule = true; |
| } |
| } |
| |
| @Test |
| public void testCompletedTasksRecalculateSchedule() throws Exception { |
| LOG.info("Running testCompletedTasksRecalculateSchedule"); |
| |
| Configuration conf = new Configuration(); |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| // Register a node so that the AM can be launched. |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job job = mock(Job.class); |
| when(job.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| doReturn(10).when(job).getTotalMaps(); |
| doReturn(10).when(job).getTotalReduces(); |
| doReturn(0).when(job).getCompletedMaps(); |
| RecalculateContainerAllocator allocator = |
| new RecalculateContainerAllocator(rm, conf, appAttemptId, job); |
| allocator.schedule(); |
| |
| allocator.recalculatedReduceSchedule = false; |
| allocator.schedule(); |
| Assert.assertFalse("Unexpected recalculate of reduce schedule", |
| allocator.recalculatedReduceSchedule); |
| |
| doReturn(1).when(job).getCompletedMaps(); |
| allocator.schedule(); |
| Assert.assertTrue("Expected recalculate of reduce schedule", |
| allocator.recalculatedReduceSchedule); |
| } |
| |
| @Test |
| public void testHeartbeatHandler() throws Exception { |
| LOG.info("Running testHeartbeatHandler"); |
| |
| Configuration conf = new Configuration(); |
| conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); |
| ControlledClock clock = new ControlledClock(); |
| AppContext appContext = mock(AppContext.class); |
| when(appContext.getClock()).thenReturn(clock); |
| when(appContext.getApplicationID()).thenReturn( |
| ApplicationId.newInstance(1, 1)); |
| |
| RMContainerAllocator allocator = new RMContainerAllocator( |
| mock(ClientService.class), appContext) { |
| @Override |
| protected void register() { |
| } |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mock(ApplicationMasterProtocol.class); |
| } |
| @Override |
| protected synchronized void heartbeat() throws Exception { |
| } |
| }; |
| allocator.init(conf); |
| allocator.start(); |
| |
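| // Advance the controlled clock and wait (up to 5 seconds) for the |
| // allocator's heartbeat thread to observe each new time. |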
| clock.setTime(5); |
| int timeToWaitMs = 5000; |
| while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) { |
| Thread.sleep(10); |
| timeToWaitMs -= 10; |
| } |
| Assert.assertEquals(5, allocator.getLastHeartbeatTime()); |
| clock.setTime(7); |
| timeToWaitMs = 5000; |
| while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) { |
| Thread.sleep(10); |
| timeToWaitMs -= 10; |
| } |
| Assert.assertEquals(7, allocator.getLastHeartbeatTime()); |
| |
| final AtomicBoolean callbackCalled = new AtomicBoolean(false); |
| allocator.runOnNextHeartbeat(new Runnable() { |
| @Override |
| public void run() { |
| callbackCalled.set(true); |
| } |
| }); |
| clock.setTime(8); |
| timeToWaitMs = 5000; |
| while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) { |
| Thread.sleep(10); |
| timeToWaitMs -= 10; |
| } |
| Assert.assertEquals(8, allocator.getLastHeartbeatTime()); |
| Assert.assertTrue(callbackCalled.get()); |
| } |
| |
| @Test |
| public void testCompletedContainerEvent() { |
| RMContainerAllocator allocator = new RMContainerAllocator( |
| mock(ClientService.class), mock(AppContext.class)); |
| |
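| // ABORTED and PREEMPTED container exit statuses should map to TA_KILL |
| // events rather than TA_CONTAINER_COMPLETED. |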
| TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( |
| MRBuilderUtils.newTaskId( |
| MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); |
| ApplicationId applicationId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( |
| applicationId, 1); |
| ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1); |
| ContainerStatus status = ContainerStatus.newInstance( |
| containerId, ContainerState.RUNNING, "", 0); |
| |
| ContainerStatus abortedStatus = ContainerStatus.newInstance( |
| containerId, ContainerState.RUNNING, "", |
| ContainerExitStatus.ABORTED); |
| |
| TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, |
| attemptId); |
| Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| event.getType()); |
| |
| TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( |
| abortedStatus, attemptId); |
| Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); |
| |
| ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2); |
| ContainerStatus status2 = ContainerStatus.newInstance(containerId2, |
| ContainerState.RUNNING, "", 0); |
| |
| ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2, |
| ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED); |
| |
| TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2, |
| attemptId); |
| Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| event2.getType()); |
| |
| TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent( |
| preemptedStatus, attemptId); |
| Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); |
| } |
| |
| @Test |
| public void testUnregistrationOnlyIfRegistered() throws Exception { |
| Configuration conf = new Configuration(); |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp rmApp = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| final ApplicationAttemptId appAttemptId = |
| rmApp.getCurrentAppAttempt().getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| MRApp mrApp = |
| new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10, |
| 0, false, this.getClass().getName(), true, 1) { |
| @Override |
| protected Dispatcher createDispatcher() { |
| return new DrainDispatcher(); |
| } |
| |
| protected ContainerAllocator createContainerAllocator( |
| ClientService clientService, AppContext context) { |
| return new MyContainerAllocator(rm, appAttemptId, context); |
| }; |
| }; |
| |
| mrApp.submit(conf); |
| DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); |
| MyContainerAllocator allocator = |
| (MyContainerAllocator) mrApp.getContainerAllocator(); |
| amDispatcher.await(); |
| Assert.assertTrue(allocator.isApplicationMasterRegistered()); |
| mrApp.stop(); |
| Assert.assertTrue(allocator.isUnregistered()); |
| } |
| |
| // Step-1 : AM sends allocate request for 2 ContainerRequests and 1 |
| // blacklisted node |
| // Step-2 : 2 containers are allocated by RM. |
| // Step-3 : AM sends 1 containerRequest(event3) and 1 releaseRequest to |
| // RM |
| // Step-4 : On RM restart, AM (which does not know the RM restarted) |
| // sends an additional containerRequest(event4) and blacklisted nodes. |
| // In turn, the RM sends a resync command |
| // Step-5 : On resync, AM sends all outstanding asks, releases, |
| // blacklist additions, and another containerRequest(event5) |
| // Step-6 : RM allocates containers, i.e. event3, event4 and event5 |
| @Test |
| public void testRMContainerAllocatorResendsRequestsOnRMRestart() |
| throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); |
| conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); |
| conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); |
| conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); |
| conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); |
| conf.setInt( |
| MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); |
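| // The settings above enable RM recovery with an in-memory state store |
| // and work-preserving recovery, so the AM's resync-on-restart path is |
| // exercised. |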
| |
| MyResourceManager rm1 = new MyResourceManager(conf); |
| rm1.start(); |
| |
| // Submit the application |
| RMApp app = rm1.submitApp(1024); |
| rm1.drainEvents(); |
| |
| MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| nm1.nodeHeartbeat(true); // Node heartbeat |
| rm1.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = |
| app.getCurrentAppAttempt().getAppAttemptId(); |
| rm1.sendAMLaunched(appAttemptId); |
| rm1.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = |
| new MyContainerAllocator(rm1, conf, appAttemptId, mockJob); |
| |
| // Step-1 : AM sends allocate request for 2 ContainerRequests and 1 |
| // blacklisted node |
| // create the container request |
| // send MAP request |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 1024, new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| |
| ContainerRequestEvent event2 = |
| createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); |
| allocator.sendRequest(event2); |
| |
| // Send events to blacklist h2 |
| ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false); |
| allocator.sendFailure(f1); |
| |
| // send allocate request and 1 blacklisted nodes |
| List<TaskAttemptContainerAssignedEvent> assignedContainers = |
| allocator.schedule(); |
| rm1.drainEvents(); |
| Assert.assertEquals("No of assignments must be 0", 0, |
| assignedContainers.size()); |
| // Why is the ask 3, not 4? The ask for blacklisted node h2 is removed |
| assertAsksAndReleases(3, 0, rm1); |
| assertBlacklistAdditionsAndRemovals(1, 0, rm1); |
| |
| nm1.nodeHeartbeat(true); // Node heartbeat |
| rm1.drainEvents(); |
| |
| // Step-2 : 2 containers are allocated by RM. |
| assignedContainers = allocator.schedule(); |
| rm1.drainEvents(); |
| Assert.assertEquals("No of assignments must be 2", 2, |
| assignedContainers.size()); |
| assertAsksAndReleases(0, 0, rm1); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm1); |
| |
| assignedContainers = allocator.schedule(); |
| Assert.assertEquals("No of assignments must be 0", 0, |
| assignedContainers.size()); |
| assertAsksAndReleases(3, 0, rm1); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm1); |
| |
| // Step-3 : AM sends 1 containerRequest(event3) and 1 releaseRequest to |
| // RM |
| // send container request |
| ContainerRequestEvent event3 = |
| createReq(jobId, 3, 1000, new String[] { "h1" }); |
| allocator.sendRequest(event3); |
| |
| // send deallocate request |
| ContainerAllocatorEvent deallocate1 = |
| createDeallocateEvent(jobId, 1, false); |
| allocator.sendDeallocate(deallocate1); |
| |
| assignedContainers = allocator.schedule(); |
| Assert.assertEquals("No of assignments must be 0", 0, |
| assignedContainers.size()); |
| assertAsksAndReleases(3, 1, rm1); |
| assertBlacklistAdditionsAndRemovals(0, 0, rm1); |
| |
| // Phase-2 start 2nd RM is up |
| MyResourceManager rm2 = new MyResourceManager(conf, rm1.getRMStateStore()); |
| rm2.start(); |
| nm1.setResourceTrackerService(rm2.getResourceTrackerService()); |
| allocator.updateSchedulerProxy(rm2); |
| |
| // NM should be told to resync on heartbeat, even on its first heartbeat to rm2 |
| NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); |
| |
| // new NM to represent NM re-register |
| nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); |
| nm1.registerNode(); |
| nm1.nodeHeartbeat(true); |
| rm2.drainEvents(); |
| |
| // Step-4 : On RM restart, AM (which does not know the RM restarted) |
| // sends an additional containerRequest(event4) and blacklisted nodes. |
| // In turn, the RM sends a resync command |
| |
| // send deallocate request, release=1 |
| ContainerAllocatorEvent deallocate2 = |
| createDeallocateEvent(jobId, 2, false); |
| allocator.sendDeallocate(deallocate2); |
| |
| // Send events to blacklist nodes h3 |
| ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false); |
| allocator.sendFailure(f2); |
| |
| ContainerRequestEvent event4 = |
| createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); |
| allocator.sendRequest(event4); |
| |
| // send allocate request to 2nd RM and get resync command |
| allocator.schedule(); |
| rm2.drainEvents(); |
| |
| // Step-5 : On resync, AM sends all outstanding asks, releases, |
| // blacklist additions, and another containerRequest(event5) |
| ContainerRequestEvent event5 = |
| createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); |
| allocator.sendRequest(event5); |
| |
| // send all outstanding request again. |
| assignedContainers = allocator.schedule(); |
| rm2.drainEvents(); |
| assertAsksAndReleases(3, 2, rm2); |
| assertBlacklistAdditionsAndRemovals(2, 0, rm2); |
| |
| nm1.nodeHeartbeat(true); |
| rm2.drainEvents(); |
| |
| // Step-6 : RM allocates containers, i.e. event3, event4 and event5 |
| assignedContainers = allocator.schedule(); |
| rm2.drainEvents(); |
| |
| Assert.assertEquals("Number of container should be 3", 3, |
| assignedContainers.size()); |
| |
| for (TaskAttemptContainerAssignedEvent assig : assignedContainers) { |
| Assert.assertTrue("Assigned count not correct", |
| "h1".equals(assig.getContainer().getNodeId().getHost())); |
| } |
| |
| rm1.stop(); |
| rm2.stop(); |
| |
| } |
| |
| @Test |
| public void testUnsupportedMapContainerRequirement() throws Exception { |
| final Resource maxContainerSupported = Resource.newInstance(1, 1); |
| |
| final ApplicationId appId = ApplicationId.newInstance(1, 1); |
| final ApplicationAttemptId appAttemptId = |
| ApplicationAttemptId.newInstance(appId, 1); |
| final JobId jobId = |
| MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| |
| final MockScheduler mockScheduler = new MockScheduler(appAttemptId); |
| final Configuration conf = new Configuration(); |
| |
| final MyContainerAllocator allocator = new MyContainerAllocator(null, |
| conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| @Override |
| protected Resource getMaxContainerCapability() { |
| return maxContainerSupported; |
| } |
| }; |
| |
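| // Ask for more memory than the maximum container capability; the |
| // allocator should reject the request, leaving the ANY ask for maps at 0. |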
| ContainerRequestEvent mapRequestEvt = createReq(jobId, 0, |
| (int) (maxContainerSupported.getMemorySize() + 10), |
| maxContainerSupported.getVirtualCores(), |
| new String[0], false, false); |
| allocator.sendRequests(Arrays.asList(mapRequestEvt)); |
| allocator.schedule(); |
| |
| Assert.assertEquals(0, mockScheduler.lastAnyAskMap); |
| } |
| |
| @Test |
| public void testUnsupportedReduceContainerRequirement() throws Exception { |
| final Resource maxContainerSupported = Resource.newInstance(1, 1); |
| |
| final ApplicationId appId = ApplicationId.newInstance(1, 1); |
| final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 1); |
| final JobId jobId = |
| MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| |
| final MockScheduler mockScheduler = new MockScheduler(appAttemptId); |
| final Configuration conf = new Configuration(); |
| |
| final MyContainerAllocator allocator = new MyContainerAllocator(null, |
| conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| @Override |
| protected Resource getMaxContainerCapability() { |
| return maxContainerSupported; |
| } |
| }; |
| |
| ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0, |
| (int) (maxContainerSupported.getMemorySize() + 10), |
| maxContainerSupported.getVirtualCores(), |
| new String[0], false, true); |
| allocator.sendRequests(Arrays.asList(reduceRequestEvt)); |
| // Reduce container requests are added to the pending queue upon request; |
| // schedule all reduces here so that we can observe whether the reduce |
| // requests are forwarded to the RM by RMContainerAllocator. |
| allocator.scheduleAllReduces(); |
| allocator.schedule(); |
| |
| Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); |
| } |
| |
| @Test |
| public void testRMUnavailable() |
| throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setInt( |
| MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0); |
| MyResourceManager rm1 = new MyResourceManager(conf); |
| rm1.start(); |
| |
| RMApp app = rm1.submitApp(1024); |
| rm1.drainEvents(); |
| |
| MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); |
| nm1.registerNode(); |
| nm1.nodeHeartbeat(true); |
| rm1.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = |
| app.getCurrentAppAttempt().getAppAttemptId(); |
| rm1.sendAMLaunched(appAttemptId); |
| rm1.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator2 allocator = |
| new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob); |
| allocator.jobEvents.clear(); |
| try { |
| allocator.schedule(); |
| Assert.fail("Should Have Exception"); |
| } catch (RMContainerAllocationException e) { |
| Assert.assertTrue(e.getMessage().contains("Could not contact RM after")); |
| } |
| rm1.drainEvents(); |
| Assert.assertEquals("Should Have 1 Job Event", 1, |
| allocator.jobEvents.size()); |
| JobEvent event = allocator.jobEvents.get(0); |
| Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); |
| } |
| |
| @Test(timeout=60000) |
| public void testAMRMTokenUpdate() throws Exception { |
| LOG.info("Running testAMRMTokenUpdate"); |
| |
| final String rmAddr = "somermaddress:1234"; |
| final Configuration conf = new YarnConfiguration(); |
| conf.setLong( |
| YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 8); |
| conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 2000); |
| conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, rmAddr); |
| |
| final MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| AMRMTokenSecretManager secretMgr = |
| rm.getRMContext().getAMRMTokenSecretManager(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| final ApplicationId appId = app.getApplicationId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| final Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| |
| final Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps() |
| .get(appId).getRMAppAttempt(appAttemptId).getAMRMToken(); |
| Assert.assertNotNull("app should have a token", oldToken); |
| UserGroupInformation testUgi = UserGroupInformation.createUserForTesting( |
| "someuser", new String[0]); |
| Token<AMRMTokenIdentifier> newToken = testUgi.doAs( |
| new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() { |
| @Override |
| public Token<AMRMTokenIdentifier> run() throws Exception { |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // Keep heartbeating until RM thinks the token has been updated |
| Token<AMRMTokenIdentifier> currentToken = oldToken; |
| long startTime = Time.monotonicNow(); |
| while (currentToken == oldToken) { |
| if (Time.monotonicNow() - startTime > 20000) { |
| Assert.fail("Took to long to see AMRM token change"); |
| } |
| Thread.sleep(100); |
| allocator.schedule(); |
| currentToken = rm.getRMContext().getRMApps().get(appId) |
| .getRMAppAttempt(appAttemptId).getAMRMToken(); |
| } |
| |
| return currentToken; |
| } |
| }); |
| |
| // verify there is only one AMRM token in the UGI and it matches the |
| // updated token from the RM |
| int tokenCount = 0; |
| Token<? extends TokenIdentifier> ugiToken = null; |
| for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) { |
| if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) { |
| ugiToken = token; |
| ++tokenCount; |
| } |
| } |
| |
| Assert.assertEquals("too many AMRM tokens", 1, tokenCount); |
| Assert.assertArrayEquals("token identifier not updated", |
| newToken.getIdentifier(), ugiToken.getIdentifier()); |
| Assert.assertArrayEquals("token password not updated", |
| newToken.getPassword(), ugiToken.getPassword()); |
| Assert.assertEquals("AMRM token service not updated", |
| new Text(rmAddr), ugiToken.getService()); |
| } |
| |
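| // When the configured running-task limits are not below the task counts, |
| // limiting is effectively disabled and setRequestLimit() must never be |
| // invoked (the override below fails the test if it is). |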
| @Test |
| public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception { |
| final int MAP_COUNT = 1; |
| final int REDUCE_COUNT = 1; |
| final int MAP_LIMIT = 1; |
| final int REDUCE_LIMIT = 1; |
| Configuration conf = new Configuration(); |
| conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT); |
| conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT); |
| conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId appAttemptId = |
| ApplicationAttemptId.newInstance(appId, 1); |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT); |
| when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT); |
| |
| final MockScheduler mockScheduler = new MockScheduler(appAttemptId); |
| MyContainerAllocator allocator = |
| new MyContainerAllocator(null, conf, appAttemptId, mockJob, |
| SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| |
| @Override |
| protected void setRequestLimit(Priority priority, |
| Resource capability, int limit) { |
| Assert.fail("setRequestLimit() should not be invoked"); |
| } |
| }; |
| |
| // create some map requests |
| ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; |
| for (int i = 0; i < reqMapEvents.length; ++i) { |
| reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); |
| } |
| allocator.sendRequests(Arrays.asList(reqMapEvents)); |
| // create some reduce requests |
| ContainerRequestEvent[] reqReduceEvents = |
| new ContainerRequestEvent[REDUCE_COUNT]; |
| for (int i = 0; i < reqReduceEvents.length; ++i) { |
| reqReduceEvents[i] = |
| createReq(jobId, i, 1024, new String[] {}, false, true); |
| } |
| allocator.sendRequests(Arrays.asList(reqReduceEvents)); |
| allocator.schedule(); |
| allocator.schedule(); |
| allocator.schedule(); |
| allocator.close(); |
| } |
| |
| @Test |
| public void testConcurrentTaskLimits() throws Exception { |
| final int MAP_COUNT = 5; |
| final int REDUCE_COUNT = 2; |
| final int MAP_LIMIT = 3; |
| final int REDUCE_LIMIT = 1; |
| LOG.info("Running testConcurrentTaskLimits"); |
| Configuration conf = new Configuration(); |
| conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT); |
| conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT); |
| conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( |
| appId, 1); |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT); |
| when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT); |
| |
| final MockScheduler mockScheduler = new MockScheduler(appAttemptId); |
| MyContainerAllocator allocator = new MyContainerAllocator(null, conf, |
| appAttemptId, mockJob, SystemClock.getInstance()) { |
| @Override |
| protected void register() { |
| } |
| |
| @Override |
| protected ApplicationMasterProtocol createSchedulerProxy() { |
| return mockScheduler; |
| } |
| }; |
| |
| // create some map requests |
| ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; |
| for (int i = 0; i < reqMapEvents.length; ++i) { |
| reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); |
| } |
| allocator.sendRequests(Arrays.asList(reqMapEvents)); |
| // create some reduce requests |
| ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT]; |
| for (int i = 0; i < reqReduceEvents.length; ++i) { |
| reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, |
| false, true); |
| } |
| allocator.sendRequests(Arrays.asList(reqReduceEvents)); |
| allocator.schedule(); |
| |
| // verify all of the host-specific asks were sent plus one for the |
| // default rack and one for the ANY request |
| Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size()); |
| |
| // verify AM is only asking for the map limit overall |
| Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap); |
| |
| // assign a map task and verify the map ask drops to two (the limit |
| // minus the one running map) |
| ContainerId cid0 = mockScheduler.assignContainer("h0", false); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(2, mockScheduler.lastAnyAskMap); |
| |
| // complete the map task and verify that we ask for one more |
| mockScheduler.completeContainer(cid0); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(3, mockScheduler.lastAnyAskMap); |
| |
| // assign three more maps and verify we ask for no more maps |
| ContainerId cid1 = mockScheduler.assignContainer("h1", false); |
| ContainerId cid2 = mockScheduler.assignContainer("h2", false); |
| ContainerId cid3 = mockScheduler.assignContainer("h3", false); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskMap); |
| |
| // complete two containers and verify we only asked for one more |
| // since at that point all maps should be scheduled/completed |
| mockScheduler.completeContainer(cid2); |
| mockScheduler.completeContainer(cid3); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(1, mockScheduler.lastAnyAskMap); |
| |
| // complete the first container and allocate the last one, then verify |
| // there are no more map asks. |
| mockScheduler.completeContainer(cid1); |
| ContainerId cid4 = mockScheduler.assignContainer("h4", false); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskMap); |
| |
| // complete the last map |
| mockScheduler.completeContainer(cid4); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskMap); |
| |
| // verify AM is only asking for the reduce limit overall |
| Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce); |
| |
| // assign a reducer and verify ask goes to zero |
| cid0 = mockScheduler.assignContainer("h0", true); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); |
| |
| // complete the reducer and verify we ask for another |
| mockScheduler.completeContainer(cid0); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(1, mockScheduler.lastAnyAskReduce); |
| |
| // assign a reducer and verify ask goes to zero |
| cid0 = mockScheduler.assignContainer("h0", true); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); |
| |
| // complete the reducer and verify no more reducers |
| mockScheduler.completeContainer(cid0); |
| allocator.schedule(); |
| allocator.schedule(); |
| Assert.assertEquals(0, mockScheduler.lastAnyAskReduce); |
| allocator.close(); |
| } |
| |
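| /** |
| * Expects an RMContainerAllocationException when the allocator heartbeats |
| * on behalf of an application attempt the RM no longer tracks; here the |
| * application is killed before the first schedule() call. |
| */ |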
| @Test(expected = RMContainerAllocationException.class) |
| public void testAttemptNotFoundCausesRMCommunicatorException() |
| throws Exception { |
| |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| // Now kill the application |
| rm.killApp(app.getApplicationId()); |
| rm.waitForState(app.getApplicationId(), RMAppState.KILLED); |
| allocator.schedule(); |
| } |
| |
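| /** |
| * Verifies that when all reduces are ramped down (the RM reports zero |
| * headroom while maps are still pending), the scheduled reduces move back |
| * to pending and the outstanding reduce asks are re-sent to the RM with |
| * zero containers. |
| */ |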
| @Test |
| public void testUpdateAskOnRampDownAllReduces() throws Exception { |
| LOG.info("Running testUpdateAskOnRampDownAllReduces"); |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| // Use a controlled clock to advance time for the test. |
| ControlledClock clock = (ControlledClock)allocator.getContext().getClock(); |
| clock.setTime(System.currentTimeMillis()); |
| |
| // Register nodes to RM. |
| MockNM nodeManager = rm.registerNode("h1:1234", 1024); |
| rm.drainEvents(); |
| |
| // Request 2 maps and 1 reducer (some on nodes which are not registered). |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 1024, new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| ContainerRequestEvent event2 = |
| createReq(jobId, 2, 1024, new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| ContainerRequestEvent event3 = |
| createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); |
| allocator.sendRequest(event3); |
| |
| // This will tell the scheduler about the requests, but there will be no |
| // allocations until the nodes heartbeat. |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Advance clock so that maps can be considered as hanging. |
| clock.setTime(System.currentTimeMillis() + 500000L); |
| |
| // Request another reducer on h3, which has not registered. |
| ContainerRequestEvent event4 = |
| createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); |
| allocator.sendRequest(event4); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Update resources in scheduler through node heartbeat from h1. |
| nodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // One map is assigned. |
| Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); |
| // Send deallocate request for map so that no maps are assigned after this. |
| ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false); |
| allocator.sendDeallocate(deallocate); |
| // Now one reducer should be scheduled and one should be pending. |
| Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size()); |
| Assert.assertEquals(1, allocator.getNumOfPendingReduces()); |
| // No map should be assigned and one should be scheduled. |
| Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); |
| Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); |
| |
| Assert.assertEquals(6, allocator.getAsk().size()); |
| for (ResourceRequest req : allocator.getAsk()) { |
| boolean isReduce = |
| req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); |
| if (isReduce) { |
| // 1 reducer each asked on h2, * and default-rack |
| Assert.assertTrue((req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack") || |
| req.getResourceName().equals("h2")) && req.getNumContainers() == 1); |
| } else { //map |
| // 0 mappers asked on h1 and 1 each on * and default-rack |
| Assert.assertTrue(((req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack")) && |
| req.getNumContainers() == 1) || (req.getResourceName().equals("h1") |
| && req.getNumContainers() == 0)); |
| } |
| } |
| // On next allocate request to scheduler, headroom reported will be 0. |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| // After the allocate response from the scheduler, all scheduled reduces |
| // are ramped down and moved to pending. The 3 reduce asks are also |
| // updated with 0 containers to signal the ramp-down to the scheduler. |
| Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); |
| Assert.assertEquals(2, allocator.getNumOfPendingReduces()); |
| Assert.assertEquals(3, allocator.getAsk().size()); |
| for (ResourceRequest req : allocator.getAsk()) { |
| Assert.assertEquals( |
| RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); |
| Assert.assertTrue(req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack") || |
| req.getResourceName().equals("h2")); |
| Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability()); |
| Assert.assertEquals(0, req.getNumContainers()); |
| } |
| } |
| |
| /** |
| * MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the |
| * right order while processing finished containers. |
| */ |
| @Test |
| public void testHandlingFinishedContainers() { |
| EventHandler eventHandler = mock(EventHandler.class); |
| |
| AppContext context = mock(MRAppMaster.RunningAppContext.class); |
| when(context.getClock()).thenReturn(new ControlledClock()); |
| when(context.getClusterInfo()).thenReturn( |
| new ClusterInfo(Resource.newInstance(10240, 1))); |
| when(context.getEventHandler()).thenReturn(eventHandler); |
| RMContainerAllocator containerAllocator = |
| new RMContainerAllocatorForFinishedContainer(null, context); |
| |
| ContainerStatus finishedContainer = ContainerStatus.newInstance( |
| mock(ContainerId.class), ContainerState.COMPLETE, "", 0); |
| containerAllocator.processFinishedContainer(finishedContainer); |
| |
| InOrder inOrder = inOrder(eventHandler); |
| inOrder.verify(eventHandler).handle( |
| isA(TaskAttemptDiagnosticsUpdateEvent.class)); |
| inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class)); |
| inOrder.verifyNoMoreInteractions(); |
| } |
| |
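| /** |
| * Allocator whose AssignedRequests is mocked so any ContainerId resolves |
| * to a task attempt, letting processFinishedContainer() be exercised |
| * without a real assignment. |
| */ |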
| private static class RMContainerAllocatorForFinishedContainer |
| extends RMContainerAllocator { |
| public RMContainerAllocatorForFinishedContainer(ClientService clientService, |
| AppContext context) { |
| super(clientService, context); |
| } |
| @Override |
| protected AssignedRequests createAssignedRequests() { |
| AssignedRequests assignedReqs = mock(AssignedRequests.class); |
| TaskAttemptId taskAttempt = mock(TaskAttemptId.class); |
| when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt); |
| return assignedReqs; |
| } |
| } |
| |
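| /** |
| * Verifies that once reducer preemption is required (the clock is pushed |
| * past the preemption delay while a map remains unassignable), the |
| * allocator ramps scheduled reduces back to pending and zeroes the reduce |
| * asks instead of requesting more reducers. |
| */ |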
| @Test |
| public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() |
| throws Exception { |
| LOG.info("Running testAvoidAskMoreReducersWhenReducerPreemptionIsRequired"); |
| Configuration conf = new Configuration(); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| // Use a controlled clock to advance time for the test. |
| ControlledClock clock = (ControlledClock)allocator.getContext().getClock(); |
| clock.setTime(System.currentTimeMillis()); |
| |
| // Register nodes to RM. |
| MockNM nodeManager = rm.registerNode("h1:1234", 1024); |
| rm.drainEvents(); |
| |
| // Request 2 maps and 1 reducer (some on nodes which are not registered). |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 1024, new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| ContainerRequestEvent event2 = |
| createReq(jobId, 2, 1024, new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| ContainerRequestEvent event3 = |
| createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); |
| allocator.sendRequest(event3); |
| |
| // This will tell the scheduler about the requests, but there will be no |
| // allocations until the nodes heartbeat. |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Advance clock so that maps can be considered as hanging. |
| clock.setTime(System.currentTimeMillis() + 500000L); |
| |
| // Request another reducer on h3, which has not registered. |
| ContainerRequestEvent event4 = |
| createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); |
| allocator.sendRequest(event4); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Update resources in scheduler through node heartbeat from h1. |
| nodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // One map is assigned. |
| Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); |
| // Send deallocate request for map so that no maps are assigned after this. |
| ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false); |
| allocator.sendDeallocate(deallocate); |
| // Now one reducer should be scheduled and one should be pending. |
| Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size()); |
| Assert.assertEquals(1, allocator.getNumOfPendingReduces()); |
| // No map should be assigned and one should be scheduled. |
| Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); |
| Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); |
| |
| Assert.assertEquals(6, allocator.getAsk().size()); |
| for (ResourceRequest req : allocator.getAsk()) { |
| boolean isReduce = |
| req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); |
| if (isReduce) { |
| // 1 reducer each asked on h2, * and default-rack |
| Assert.assertTrue((req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack") || |
| req.getResourceName().equals("h2")) && req.getNumContainers() == 1); |
| } else { //map |
| // 0 mappers asked on h1 and 1 each on * and default-rack |
| Assert.assertTrue(((req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack")) && |
| req.getNumContainers() == 1) || (req.getResourceName().equals("h1") |
| && req.getNumContainers() == 0)); |
| } |
| } |
| |
| clock.setTime(System.currentTimeMillis() + 500000L + 10 * 60 * 1000); |
| |
| // On next allocate request to scheduler, headroom reported will be 2048. |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| // After the allocate response from the scheduler, all scheduled reduces |
| // are ramped down and moved to pending. The 3 reduce asks are also |
| // updated with 0 containers to signal the ramp-down to the scheduler. |
| Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); |
| Assert.assertEquals(2, allocator.getNumOfPendingReduces()); |
| Assert.assertEquals(3, allocator.getAsk().size()); |
| for (ResourceRequest req : allocator.getAsk()) { |
| Assert.assertEquals( |
| RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); |
| Assert.assertTrue(req.getResourceName().equals("*") || |
| req.getResourceName().equals("/default-rack") || |
| req.getResourceName().equals("h2")); |
| Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability()); |
| Assert.assertEquals(0, req.getNumContainers()); |
| } |
| } |
| |
| /** |
| * Tests whether the resources of scheduled reducers are deducted from the |
| * headroom when computing the resources still available to the job. |
| */ |
| @Test |
| public void testExcludeSchedReducesFromHeadroom() throws Exception { |
| LOG.info("Running testExcludeSchedReducesFromHeadroom"); |
| Configuration conf = new Configuration(); |
| conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1); |
| MyResourceManager rm = new MyResourceManager(conf); |
| rm.start(); |
| |
| // Submit the application |
| RMApp app = rm.submitApp(1024); |
| rm.drainEvents(); |
| |
| MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); |
| amNodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| rm.sendAMLaunched(appAttemptId); |
| rm.drainEvents(); |
| |
| JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); |
| Job mockJob = mock(Job.class); |
| when(mockJob.getReport()).thenReturn( |
| MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, |
| 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); |
| Task mockTask = mock(Task.class); |
| TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); |
| when(mockJob.getTask((TaskId)any())).thenReturn(mockTask); |
| when(mockTask.getAttempt((TaskAttemptId)any())).thenReturn(mockTaskAttempt); |
| when(mockTaskAttempt.getProgress()).thenReturn(0.01f); |
| MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, |
| appAttemptId, mockJob); |
| |
| MockNM nodeManager = rm.registerNode("h1:1234", 4096); |
| rm.drainEvents(); |
| // Register nodes to RM. |
| MockNM nodeManager2 = rm.registerNode("h2:1234", 1024); |
| rm.drainEvents(); |
| |
| // Request 2 maps and 1 reducer (some on nodes which are not registered). |
| ContainerRequestEvent event1 = |
| createReq(jobId, 1, 1024, new String[] { "h1" }); |
| allocator.sendRequest(event1); |
| ContainerRequestEvent event2 = |
| createReq(jobId, 2, 1024, new String[] { "h2" }); |
| allocator.sendRequest(event2); |
| ContainerRequestEvent event3 = |
| createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); |
| allocator.sendRequest(event3); |
| |
| // This will tell the scheduler about the requests, but there will be no |
| // allocations until the nodes heartbeat. |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Request another reducer on h3, which has not registered. |
| ContainerRequestEvent event4 = |
| createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); |
| allocator.sendRequest(event4); |
| |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Update resources in scheduler through node heartbeat from h1. |
| nodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // Two maps are assigned. |
| Assert.assertEquals(2, allocator.getAssignedRequests().maps.size()); |
| // Send deallocate request for map so that no maps are assigned after this. |
| ContainerAllocatorEvent deallocate1 = createDeallocateEvent(jobId, 1, false); |
| allocator.sendDeallocate(deallocate1); |
| ContainerAllocatorEvent deallocate2 = createDeallocateEvent(jobId, 2, false); |
| allocator.sendDeallocate(deallocate2); |
| // No map should be assigned. |
| Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); |
| |
| nodeManager.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| |
| // h2 heartbeats. |
| nodeManager2.nodeHeartbeat(true); |
| rm.drainEvents(); |
| |
| // Send request for one more mapper. |
| ContainerRequestEvent event5 = |
| createReq(jobId, 5, 1024, new String[] { "h1" }); |
| allocator.sendRequest(event5); |
| |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| // One reducer is assigned and one map is scheduled |
| Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); |
| Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size()); |
| // The headroom is enough to run a mapper if taken as-is, but won't be |
| // enough once the resources of scheduled reducers are deducted. |
| rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2)); |
| allocator.schedule(); |
| rm.drainEvents(); |
| // After allocate response, the one assigned reducer is preempted and killed |
| Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size()); |
| Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC, |
| MyContainerAllocator.getTaskAttemptKillEvents().get(0).getMessage()); |
| Assert.assertEquals(1, allocator.getNumOfPendingReduces()); |
| } |
| |
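| /** |
| * Minimal ApplicationMasterProtocol stub that records the most recent ask |
| * list (tracking the ANY asks for maps and reduces separately) and lets |
| * tests hand-feed container assignments and completions into the next |
| * allocate() response. |
| */ |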
| private static class MockScheduler implements ApplicationMasterProtocol { |
| ApplicationAttemptId attemptId; |
| long nextContainerId = 10; |
| List<ResourceRequest> lastAsk = null; |
| int lastAnyAskMap = 0; |
| int lastAnyAskReduce = 0; |
| List<ContainerStatus> containersToComplete = |
| new ArrayList<ContainerStatus>(); |
| List<Container> containersToAllocate = new ArrayList<Container>(); |
| |
| public MockScheduler(ApplicationAttemptId attemptId) { |
| this.attemptId = attemptId; |
| } |
| |
| @Override |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| RegisterApplicationMasterRequest request) throws YarnException, |
| IOException { |
| return RegisterApplicationMasterResponse.newInstance( |
| Resource.newInstance(512, 1), |
| Resource.newInstance(512000, 1024), |
| Collections.<ApplicationAccessType,String>emptyMap(), |
| ByteBuffer.wrap("fake_key".getBytes()), |
| Collections.<Container>emptyList(), |
| "default", |
| Collections.<NMToken>emptyList()); |
| } |
| |
| @Override |
| public FinishApplicationMasterResponse finishApplicationMaster( |
| FinishApplicationMasterRequest request) throws YarnException, |
| IOException { |
| return FinishApplicationMasterResponse.newInstance(false); |
| } |
| |
| @Override |
| public AllocateResponse allocate(AllocateRequest request) |
| throws YarnException, IOException { |
| lastAsk = request.getAskList(); |
| for (ResourceRequest req : lastAsk) { |
| if (ResourceRequest.ANY.equals(req.getResourceName())) { |
| Priority priority = req.getPriority(); |
| if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) { |
| lastAnyAskMap = req.getNumContainers(); |
| } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){ |
| lastAnyAskReduce = req.getNumContainers(); |
| } |
| } |
| } |
| AllocateResponse response = AllocateResponse.newInstance( |
| request.getResponseId(), |
| containersToComplete, containersToAllocate, |
| Collections.<NodeReport>emptyList(), |
| Resource.newInstance(512000, 1024), null, 10, null, |
| Collections.<NMToken>emptyList()); |
| // RM will always ensure that a default priority is sent to AM |
| response.setApplicationPriority(Priority.newInstance(0)); |
| containersToComplete.clear(); |
| containersToAllocate.clear(); |
| return response; |
| } |
| |
| public ContainerId assignContainer(String nodeName, boolean isReduce) { |
| ContainerId containerId = |
| ContainerId.newContainerId(attemptId, nextContainerId++); |
| Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE |
| : RMContainerAllocator.PRIORITY_MAP; |
| Container container = Container.newInstance(containerId, |
| NodeId.newInstance(nodeName, 1234), nodeName + ":5678", |
| Resource.newInstance(1024, 1), priority, null); |
| containersToAllocate.add(container); |
| return containerId; |
| } |
| |
| public void completeContainer(ContainerId containerId) { |
| containersToComplete.add(ContainerStatus.newInstance(containerId, |
| ContainerState.COMPLETE, "", 0)); |
| } |
| } |
| |
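| /** |
| * ApplicationMasterProtocol stub whose allocate() responses carry a |
| * configurable CollectorInfo, used for testing how timeline collector |
| * updates are propagated to the AM. |
| */ |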
| private final static class MockSchedulerForTimelineCollector |
| implements ApplicationMasterProtocol { |
| private CollectorInfo collectorInfo; |
| |
| private MockSchedulerForTimelineCollector(CollectorInfo info) { |
| this.collectorInfo = info; |
| } |
| |
| private void updateCollectorInfo(CollectorInfo info) { |
| collectorInfo = info; |
| } |
| |
| @Override |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| RegisterApplicationMasterRequest request) throws YarnException, |
| IOException { |
| return Records.newRecord(RegisterApplicationMasterResponse.class); |
| } |
| |
| @Override |
| public FinishApplicationMasterResponse finishApplicationMaster( |
| FinishApplicationMasterRequest request) throws YarnException, |
| IOException { |
| return FinishApplicationMasterResponse.newInstance(false); |
| } |
| |
| @Override |
| public AllocateResponse allocate(AllocateRequest request) |
| throws YarnException, IOException { |
| AllocateResponse response = AllocateResponse.newInstance( |
| request.getResponseId(), Collections.<ContainerStatus>emptyList(), |
| Collections.<Container>emptyList(), |
| Collections.<NodeReport>emptyList(), |
| Resource.newInstance(512000, 1024), null, 10, null, |
| Collections.<NMToken>emptyList()); |
| response.setCollectorInfo(collectorInfo); |
| return response; |
| } |
| } |
| |
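| /** |
| * Allows running a subset of the tests standalone from the command line. |
| */ |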
| public static void main(String[] args) throws Exception { |
| TestRMContainerAllocator t = new TestRMContainerAllocator(); |
| t.testSimple(); |
| t.testResource(); |
| t.testMapReduceScheduling(); |
| t.testReportedAppProgress(); |
| t.testReportedAppProgressWithOnlyMaps(); |
| t.testBlackListedNodes(); |
| t.testCompletedTasksRecalculateSchedule(); |
| t.testAMRMTokenUpdate(); |
| } |
| |
| } |