| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; |
| import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; |
| import org.apache.hadoop.yarn.api.records.UpdatedContainer; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; |
| import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; |
| import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; |
| |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| |
| /** |
| * Test cases for {@link OpportunisticContainerAllocatorAMService}. |
| */ |
| public class TestOpportunisticContainerAllocatorAMService { |
| |
| private static final int GB = 1024; |
| |
| private MockRM rm; |
| private DrainDispatcher dispatcher; |
| |
| private OpportunisticContainersStatus oppContainersStatus = |
| getOpportunisticStatus(); |
| |
| @Before |
| public void createAndStartRM() { |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| conf.setBoolean( |
| YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); |
| conf.setInt( |
| YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); |
| startRM(conf); |
| } |
| |
| public void createAndStartRMWithAutoUpdateContainer() { |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| YarnConfiguration conf = new YarnConfiguration(csConf); |
| conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| conf.setBoolean( |
| YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); |
| conf.setInt( |
| YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); |
| startRM(conf); |
| } |
| |
| private void startRM(final YarnConfiguration conf) { |
| dispatcher = new DrainDispatcher(); |
| rm = new MockRM(conf) { |
| @Override |
| protected Dispatcher createDispatcher() { |
| return dispatcher; |
| } |
| }; |
| rm.start(); |
| } |
| |
| @After |
| public void stopRM() { |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test(timeout = 600000) |
| public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception { |
| HashMap<NodeId, MockNM> nodes = new HashMap<>(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm1.getNodeId(), nm1); |
| MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm2.getNodeId(), nm2); |
| MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm3.getNodeId(), nm3); |
| MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm4.getNodeId(), nm4); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| nm3.registerNode(); |
| nm4.registerNode(); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| nm3.nodeHeartbeat(oppContainersStatus, true); |
| nm4.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| // All nodes 1 - 4 will be applicable for scheduling. |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| nm3.nodeHeartbeat(oppContainersStatus, true); |
| nm4.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100); |
| |
| QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() |
| .getMetrics(); |
| |
| // Verify Metrics |
| verifyMetrics(metrics, 15360, 15, 1024, 1, 1); |
| |
| AllocateResponse allocateResponse = am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest.newInstance( |
| ExecutionType.OPPORTUNISTIC, true))), |
| null); |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Assert.assertEquals(2, allocatedContainers.size()); |
| Container container = allocatedContainers.get(0); |
| MockNM allocNode = nodes.get(container.getNodeId()); |
| MockNM sameHostDiffNode = null; |
| for (NodeId n : nodes.keySet()) { |
| if (n.getHost().equals(allocNode.getNodeId().getHost()) && |
| n.getPort() != allocNode.getNodeId().getPort()) { |
| sameHostDiffNode = nodes.get(n); |
| } |
| } |
| |
| // Verify Metrics After OPP allocation (Nothing should change) |
| verifyMetrics(metrics, 15360, 15, 1024, 1, 1); |
| |
| am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(0, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| // Node on same host should not result in allocation |
| sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true); |
| rm.drainEvents(); |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); |
| |
| // Wait for scheduler to process all events |
| dispatcher.waitForEventThreadToWait(); |
| rm.drainEvents(); |
| // Verify Metrics After OPP allocation (Nothing should change again) |
| verifyMetrics(metrics, 15360, 15, 1024, 1, 1); |
| |
| // Send Promotion req again... this should result in update error |
| allocateResponse = am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(0, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); |
| Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); |
| Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", |
| allocateResponse.getUpdateErrors().get(0).getReason()); |
| Assert.assertEquals(container.getId(), |
| allocateResponse.getUpdateErrors().get(0) |
| .getUpdateContainerRequest().getContainerId()); |
| |
| // Send Promotion req again with incorrect version... |
| // this should also result in update error |
| allocateResponse = am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(1, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| |
| Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); |
| Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); |
| Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR", |
| allocateResponse.getUpdateErrors().get(0).getReason()); |
| Assert.assertEquals(0, |
| allocateResponse.getUpdateErrors().get(0) |
| .getCurrentContainerVersion()); |
| Assert.assertEquals(container.getId(), |
| allocateResponse.getUpdateErrors().get(0) |
| .getUpdateContainerRequest().getContainerId()); |
| |
| // Ensure after correct node heartbeats, we should get the allocation |
| allocNode.nodeHeartbeat(oppContainersStatus, true); |
| rm.drainEvents(); |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| Container uc = |
| allocateResponse.getUpdatedContainers().get(0).getContainer(); |
| Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); |
| Assert.assertEquals(uc.getId(), container.getId()); |
| Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); |
| |
| // Verify Metrics After OPP allocation : |
| // Allocated cores+mem should have increased, available should decrease |
| verifyMetrics(metrics, 14336, 14, 2048, 2, 2); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| nm3.nodeHeartbeat(oppContainersStatus, true); |
| nm4.nodeHeartbeat(oppContainersStatus, true); |
| rm.drainEvents(); |
| |
| // Verify that the container is still in ACQUIRED state wrt the RM. |
| RMContainer rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); |
| Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); |
| |
| // Now demote the container back.. |
| allocateResponse = am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(), |
| uc.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, |
| null, ExecutionType.OPPORTUNISTIC))); |
| // This should happen in the same heartbeat.. |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| uc = allocateResponse.getUpdatedContainers().get(0).getContainer(); |
| Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType()); |
| Assert.assertEquals(uc.getId(), container.getId()); |
| Assert.assertEquals(uc.getVersion(), container.getVersion() + 2); |
| |
| // Wait for scheduler to finish processing events |
| dispatcher.waitForEventThreadToWait(); |
| rm.drainEvents(); |
| // Verify Metrics After OPP allocation : |
| // Everything should have reverted to what it was |
| verifyMetrics(metrics, 15360, 15, 1024, 1, 1); |
| } |
| |
| @Test(timeout = 60000) |
| public void testContainerPromoteAfterContainerStart() throws Exception { |
| HashMap<NodeId, MockNM> nodes = new HashMap<>(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm1.getNodeId(), nm1); |
| MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm2.getNodeId(), nm2); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| // All nodes 1 to 2 will be applicable for scheduling. |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); |
| |
| QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() |
| .getMetrics(); |
| |
| // Verify Metrics |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| |
| AllocateResponse allocateResponse = am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest.newInstance( |
| ExecutionType.OPPORTUNISTIC, true))), |
| null); |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Assert.assertEquals(2, allocatedContainers.size()); |
| Container container = allocatedContainers.get(0); |
| MockNM allocNode = nodes.get(container.getNodeId()); |
| |
| // Start Container in NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| // Verify that container is actually running wrt the RM.. |
| RMContainer rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| container.getId().getApplicationAttemptId()).getRMContainer( |
| container.getId()); |
| Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); |
| |
| // Verify Metrics After OPP allocation (Nothing should change) |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| |
| am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(0, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| |
| // Verify Metrics After OPP allocation (Nothing should change again) |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| |
| // Send Promotion req again... this should result in update error |
| allocateResponse = am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(0, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); |
| Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); |
| Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", |
| allocateResponse.getUpdateErrors().get(0).getReason()); |
| Assert.assertEquals(container.getId(), |
| allocateResponse.getUpdateErrors().get(0) |
| .getUpdateContainerRequest().getContainerId()); |
| |
| // Start Container in NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| Container uc = |
| allocateResponse.getUpdatedContainers().get(0).getContainer(); |
| Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); |
| Assert.assertEquals(uc.getId(), container.getId()); |
| Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); |
| |
| // Verify that the Container is still in RUNNING state wrt RM.. |
| rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); |
| Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); |
| |
| // Verify Metrics After OPP allocation : |
| // Allocated cores+mem should have increased, available should decrease |
| verifyMetrics(metrics, 6144, 6, 2048, 2, 2); |
| } |
| |
| @Test(timeout = 600000) |
| public void testContainerPromoteAfterContainerComplete() throws Exception { |
| HashMap<NodeId, MockNM> nodes = new HashMap<>(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm1.getNodeId(), nm1); |
| MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm2.getNodeId(), nm2); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| |
| // All nodes 1 to 2 will be applicable for scheduling. |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); |
| |
| QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() |
| .getMetrics(); |
| |
| // Verify Metrics |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| |
| AllocateResponse allocateResponse = am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest.newInstance( |
| ExecutionType.OPPORTUNISTIC, true))), |
| null); |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Assert.assertEquals(2, allocatedContainers.size()); |
| Container container = allocatedContainers.get(0); |
| MockNM allocNode = nodes.get(container.getNodeId()); |
| |
| // Start Container in NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| // Verify that container is actually running wrt the RM.. |
| RMContainer rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| container.getId().getApplicationAttemptId()).getRMContainer( |
| container.getId()); |
| Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); |
| |
| // Container Completed in the NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| // Verify that container has been removed.. |
| rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| container.getId().getApplicationAttemptId()).getRMContainer( |
| container.getId()); |
| Assert.assertNull(rmContainer); |
| |
| // Verify Metrics After OPP allocation (Nothing should change) |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| |
| // Send Promotion req... this should result in update error |
| // Since the container doesn't exist anymore.. |
| allocateResponse = am1.sendContainerUpdateRequest( |
| Arrays.asList(UpdateContainerRequest.newInstance(0, |
| container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED))); |
| |
| Assert.assertEquals(1, |
| allocateResponse.getCompletedContainersStatuses().size()); |
| Assert.assertEquals(container.getId(), |
| allocateResponse.getCompletedContainersStatuses().get(0) |
| .getContainerId()); |
| Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); |
| Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); |
| Assert.assertEquals("INVALID_CONTAINER_ID", |
| allocateResponse.getUpdateErrors().get(0).getReason()); |
| Assert.assertEquals(container.getId(), |
| allocateResponse.getUpdateErrors().get(0) |
| .getUpdateContainerRequest().getContainerId()); |
| |
| // Verify Metrics After OPP allocation (Nothing should change again) |
| verifyMetrics(metrics, 7168, 7, 1024, 1, 1); |
| } |
| |
| @Test(timeout = 600000) |
| public void testContainerAutoUpdateContainer() throws Exception { |
| rm.stop(); |
| createAndStartRMWithAutoUpdateContainer(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100); |
| |
| AllocateResponse allocateResponse = am1.allocate(Arrays.asList( |
| ResourceRequest.newInstance(Priority.newInstance(1), "*", |
| Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest |
| .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); |
| List<Container> allocatedContainers = |
| allocateResponse.getAllocatedContainers(); |
| allocatedContainers.addAll( |
| am1.allocate(null, null).getAllocatedContainers()); |
| Assert.assertEquals(2, allocatedContainers.size()); |
| Container container = allocatedContainers.get(0); |
| // Start Container in NM |
| nm1.nodeHeartbeat(Arrays.asList(ContainerStatus |
| .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, |
| ContainerState.RUNNING, "", 0)), true); |
| rm.drainEvents(); |
| |
| // Verify that container is actually running wrt the RM.. |
| RMContainer rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt(container.getId().getApplicationAttemptId()) |
| .getRMContainer(container.getId()); |
| Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); |
| |
| // Send Promotion req... this should result in update error |
| // Since the container doesn't exist anymore.. |
| allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( |
| UpdateContainerRequest.newInstance(0, container.getId(), |
| ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, |
| ExecutionType.GUARANTEED))); |
| |
| nm1.nodeHeartbeat(Arrays.asList(ContainerStatus |
| .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, |
| ContainerState.RUNNING, "", 0)), true); |
| rm.drainEvents(); |
| // Get the update response on next allocate |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| // Check the update response from YARNRM |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0); |
| Assert.assertEquals(container.getId(), uc.getContainer().getId()); |
| Assert.assertEquals(ExecutionType.GUARANTEED, |
| uc.getContainer().getExecutionType()); |
| // Check that the container is updated in NM through NM heartbeat response |
| NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(1, response.getContainersToUpdate().size()); |
| Container containersFromNM = response.getContainersToUpdate().get(0); |
| Assert.assertEquals(container.getId(), containersFromNM.getId()); |
| Assert.assertEquals(ExecutionType.GUARANTEED, |
| containersFromNM.getExecutionType()); |
| |
| //Increase resources |
| allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( |
| UpdateContainerRequest.newInstance(1, container.getId(), |
| ContainerUpdateType.INCREASE_RESOURCE, |
| Resources.createResource(2 * GB, 1), null))); |
| response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus |
| .newInstance(container.getId(), ExecutionType.GUARANTEED, |
| ContainerState.RUNNING, "", 0)), true); |
| |
| rm.drainEvents(); |
| if (allocateResponse.getUpdatedContainers().size() == 0) { |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| } |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| uc = allocateResponse.getUpdatedContainers().get(0); |
| Assert.assertEquals(container.getId(), uc.getContainer().getId()); |
| Assert.assertEquals(Resource.newInstance(2 * GB, 1), |
| uc.getContainer().getResource()); |
| rm.drainEvents(); |
| |
| // Check that the container resources are increased in |
| // NM through NM heartbeat response |
| if (response.getContainersToUpdate().size() == 0) { |
| response = nm1.nodeHeartbeat(true); |
| } |
| Assert.assertEquals(1, response.getContainersToUpdate().size()); |
| Assert.assertEquals(Resource.newInstance(2 * GB, 1), |
| response.getContainersToUpdate().get(0).getResource()); |
| |
| //Decrease resources |
| allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( |
| UpdateContainerRequest.newInstance(2, container.getId(), |
| ContainerUpdateType.DECREASE_RESOURCE, |
| Resources.createResource(1 * GB, 1), null))); |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| rm.drainEvents(); |
| |
| // Check that the container resources are decreased |
| // in NM through NM heartbeat response |
| response = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(1, response.getContainersToUpdate().size()); |
| Assert.assertEquals(Resource.newInstance(1 * GB, 1), |
| response.getContainersToUpdate().get(0).getResource()); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| // DEMOTE the container |
| allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( |
| UpdateContainerRequest.newInstance(3, container.getId(), |
| ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null, |
| ExecutionType.OPPORTUNISTIC))); |
| |
| response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus |
| .newInstance(container.getId(), ExecutionType.GUARANTEED, |
| ContainerState.RUNNING, "", 0)), true); |
| rm.drainEvents(); |
| if (allocateResponse.getUpdatedContainers().size() == 0) { |
| // Get the update response on next allocate |
| allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); |
| } |
| // Check the update response from YARNRM |
| Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); |
| uc = allocateResponse.getUpdatedContainers().get(0); |
| Assert.assertEquals(ExecutionType.OPPORTUNISTIC, |
| uc.getContainer().getExecutionType()); |
| // Check that the container is updated in NM through NM heartbeat response |
| if (response.getContainersToUpdate().size() == 0) { |
| response = nm1.nodeHeartbeat(oppContainersStatus, true); |
| } |
| Assert.assertEquals(1, response.getContainersToUpdate().size()); |
| Assert.assertEquals(ExecutionType.OPPORTUNISTIC, |
| response.getContainersToUpdate().get(0).getExecutionType()); |
| } |
| |
| private void verifyMetrics(QueueMetrics metrics, long availableMB, |
| int availableVirtualCores, long allocatedMB, |
| int allocatedVirtualCores, int allocatedContainers) { |
| Assert.assertEquals(availableMB, metrics.getAvailableMB()); |
| Assert.assertEquals(availableVirtualCores, metrics.getAvailableVirtualCores()); |
| Assert.assertEquals(allocatedMB, metrics.getAllocatedMB()); |
| Assert.assertEquals(allocatedVirtualCores, metrics.getAllocatedVirtualCores()); |
| Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOpportunisticSchedulerMetrics() throws Exception { |
| HashMap<NodeId, MockNM> nodes = new HashMap<>(); |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm1.getNodeId(), nm1); |
| MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nodes.put(nm2.getNodeId(), nm2); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticSchedulerMetrics metrics = |
| OpportunisticSchedulerMetrics.getMetrics(); |
| |
| int allocContainers = metrics.getAllocatedContainers(); |
| long aggrAllocatedContainers = metrics.getAggregatedAllocatedContainers(); |
| long aggrOffSwitchContainers = metrics.getAggregatedOffSwitchContainers(); |
| long aggrReleasedContainers = metrics.getAggregatedReleasedContainers(); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .build()); |
| ApplicationAttemptId attemptId = |
| app1.getCurrentAppAttempt().getAppAttemptId(); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| |
| // All nodes 1 to 2 will be applicable for scheduling. |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); |
| |
| AllocateResponse allocateResponse = am1.allocate(Arrays.asList( |
| ResourceRequest.newInstance(Priority.newInstance(1), "*", |
| Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest |
| .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); |
| |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Assert.assertEquals(2, allocatedContainers.size()); |
| |
| Assert.assertEquals(allocContainers + 2, metrics.getAllocatedContainers()); |
| Assert.assertEquals(aggrAllocatedContainers + 2, |
| metrics.getAggregatedAllocatedContainers()); |
| Assert.assertEquals(aggrOffSwitchContainers + 2, |
| metrics.getAggregatedOffSwitchContainers()); |
| |
| Container container = allocatedContainers.get(0); |
| MockNM allocNode = nodes.get(container.getNodeId()); |
| |
| // Start Container in NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| // Verify that container is actually running wrt the RM.. |
| RMContainer rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| container.getId().getApplicationAttemptId()).getRMContainer( |
| container.getId()); |
| Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); |
| |
| // Container Completed in the NM |
| allocNode.nodeHeartbeat(Arrays.asList( |
| ContainerStatus.newInstance(container.getId(), |
| ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), |
| true); |
| rm.drainEvents(); |
| |
| // Verify that container has been removed.. |
| rmContainer = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt( |
| container.getId().getApplicationAttemptId()).getRMContainer( |
| container.getId()); |
| Assert.assertNull(rmContainer); |
| |
| Assert.assertEquals(allocContainers + 1, metrics.getAllocatedContainers()); |
| Assert.assertEquals(aggrReleasedContainers + 1, |
| metrics.getAggregatedReleasedContainers()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAMCrashDuringAllocate() throws Exception { |
| MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService()); |
| nm.registerNode(); |
| |
| RMApp app = MockRMAppSubmitter.submit(rm, |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .build()); |
| ApplicationAttemptId attemptId0 = |
| app.getCurrentAppAttempt().getAppAttemptId(); |
| MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); |
| |
| //simulate AM crash by replacing the current attempt |
| //Do not use rm.failApplicationAttempt, the bug will skip due to |
| //ApplicationDoesNotExistInCacheException |
| CapacityScheduler scheduler= ((CapacityScheduler) rm.getRMContext(). |
| getScheduler()); |
| SchedulerApplication<FiCaSchedulerApp> schApp = |
| (SchedulerApplication<FiCaSchedulerApp>)scheduler. |
| getSchedulerApplications().get(attemptId0.getApplicationId()); |
| final ApplicationAttemptId appAttemptId1 = TestUtils. |
| getMockApplicationAttemptId(1, 1); |
| schApp.setCurrentAppAttempt(new FiCaSchedulerApp(appAttemptId1, |
| null, scheduler.getQueue("default"), null, rm.getRMContext())); |
| |
| //start to allocate |
| am.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2)), null); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNodeRemovalDuringAllocate() throws Exception { |
| MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); |
| MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); |
| nm1.registerNode(); |
| nm2.registerNode(); |
| |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| OpportunisticContainerAllocatorAMService amservice = |
| (OpportunisticContainerAllocatorAMService) rm |
| .getApplicationMasterService(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .withUnmanagedAM(false) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| ApplicationAttemptId attemptId = |
| app1.getCurrentAppAttempt().getAppAttemptId(); |
| MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); |
| |
| OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) |
| .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); |
| |
| // Both node 1 and node 2 will be applicable for scheduling. |
| nm1.nodeHeartbeat(oppContainersStatus, true); |
| nm2.nodeHeartbeat(oppContainersStatus, true); |
| |
| for (int i = 0; i < 10; i++) { |
| am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2)), |
| null); |
| if (ctxt.getNodeMap().size() == 2) { |
| break; |
| } |
| Thread.sleep(50); |
| } |
| Assert.assertEquals(2, ctxt.getNodeMap().size()); |
| // Remove node from scheduler but not from AM Service. |
| scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1)); |
| // After removal of node 1, only 1 node will be applicable for scheduling. |
| for (int i = 0; i < 10; i++) { |
| try { |
| am1.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2)), |
| null); |
| } catch (Exception e) { |
| Assert.fail("Allocate request should be handled on node removal"); |
| } |
| if (ctxt.getNodeMap().size() == 1) { |
| break; |
| } |
| Thread.sleep(50); |
| } |
| Assert.assertEquals(1, ctxt.getNodeMap().size()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAppAttemptRemovalAfterNodeRemoval() throws Exception { |
| MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService()); |
| |
| nm.registerNode(); |
| nm.nodeHeartbeat(oppContainersStatus, true); |
| |
| RMApp app = MockRMAppSubmitter.submit(rm, |
| MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| .withAppName("app") |
| .withUser("user") |
| .withAcls(null) |
| .withQueue("default") |
| .build()); |
| ApplicationAttemptId attemptId = |
| app.getCurrentAppAttempt().getAppAttemptId(); |
| MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); |
| ResourceScheduler scheduler = rm.getResourceScheduler(); |
| SchedulerApplicationAttempt schedulerAttempt = |
| ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId); |
| RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId()); |
| |
| nm.nodeHeartbeat(oppContainersStatus, true); |
| |
| GenericTestUtils.waitFor(() -> |
| scheduler.getNumClusterNodes() == 1, 10, 200 * 100); |
| |
| AllocateResponse allocateResponse = am.allocate( |
| Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), |
| "*", Resources.createResource(1 * GB), 2, true, null, |
| ExecutionTypeRequest.newInstance( |
| ExecutionType.OPPORTUNISTIC, true))), |
| null); |
| List<Container> allocatedContainers = allocateResponse |
| .getAllocatedContainers(); |
| Container container = allocatedContainers.get(0); |
| scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1)); |
| |
| GenericTestUtils.waitFor(() -> |
| scheduler.getNumClusterNodes() == 0, 10, 200 * 100); |
| |
| //test YARN-9165 |
| RMContainer rmContainer = null; |
| rmContainer = SchedulerUtils.createOpportunisticRmContainer( |
| rm.getRMContext(), container, true); |
| if (rmContainer == null) { |
| rmContainer = new RMContainerImpl(container, |
| SchedulerRequestKey.extractFrom(container), |
| schedulerAttempt.getApplicationAttemptId(), container.getNodeId(), |
| schedulerAttempt.getUser(), rm.getRMContext(), true); |
| } |
| //test YARN-9164 |
| schedulerAttempt.addRMContainer(container.getId(), rmContainer); |
| scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId, |
| RMAppAttemptState.FAILED, false)); |
| } |
| |
| private OpportunisticContainersStatus getOpportunisticStatus() { |
| return getOppurtunisticStatus(-1, 100, 1000); |
| } |
| |
| private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, |
| int queueLength, int queueCapacity) { |
| OpportunisticContainersStatus status = |
| OpportunisticContainersStatus.newInstance(); |
| status.setEstimatedQueueWaitTime(waitTime); |
| status.setOpportQueueCapacity(queueCapacity); |
| status.setWaitQueueLength(queueLength); |
| return status; |
| } |
| |
| // Test if the OpportunisticContainerAllocatorAMService can handle both |
| // DSProtocol as well as AMProtocol clients |
| @Test |
| public void testRPCWrapping() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class |
| .getName()); |
| YarnRPC rpc = YarnRPC.create(conf); |
| String bindAddr = "localhost:0"; |
| InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); |
| conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr); |
| final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null); |
| final RMContext rmContext = new RMContextImpl() { |
| @Override |
| public AMLivelinessMonitor getAMLivelinessMonitor() { |
| return null; |
| } |
| |
| @Override |
| public Configuration getYarnConfiguration() { |
| return new YarnConfiguration(); |
| } |
| |
| @Override |
| public RMContainerTokenSecretManager getContainerTokenSecretManager() { |
| return new RMContainerTokenSecretManager(conf); |
| } |
| |
| @Override |
| public ResourceScheduler getScheduler() { |
| return new FifoScheduler(); |
| } |
| }; |
| Container c = factory.newRecordInstance(Container.class); |
| c.setExecutionType(ExecutionType.OPPORTUNISTIC); |
| c.setId( |
| ContainerId.newContainerId( |
| ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(12345, 1), 2), 3)); |
| AllocateRequest allReq = |
| (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); |
| allReq.setAskList(Arrays.asList( |
| ResourceRequest.newInstance(Priority.UNDEFINED, "a", |
| Resource.newInstance(1, 2), 1, true, "exp", |
| ExecutionTypeRequest.newInstance( |
| ExecutionType.OPPORTUNISTIC, true)))); |
| OpportunisticContainerAllocatorAMService service = |
| createService(factory, rmContext, c); |
| conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); |
| Server server = service.getServer(rpc, conf, addr, null); |
| server.start(); |
| |
| // Verify that the OpportunisticContainerAllocatorAMSercvice can handle |
| // vanilla ApplicationMasterProtocol clients |
| RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, |
| ProtobufRpcEngine2.class); |
| ApplicationMasterProtocolPB ampProxy = |
| RPC.getProxy(ApplicationMasterProtocolPB |
| .class, 1, NetUtils.getConnectAddress(server), conf); |
| RegisterApplicationMasterResponse regResp = |
| new RegisterApplicationMasterResponsePBImpl( |
| ampProxy.registerApplicationMaster(null, |
| ((RegisterApplicationMasterRequestPBImpl)factory |
| .newRecordInstance( |
| RegisterApplicationMasterRequest.class)).getProto())); |
| Assert.assertEquals("dummyQueue", regResp.getQueue()); |
| FinishApplicationMasterResponse finishResp = |
| new FinishApplicationMasterResponsePBImpl( |
| ampProxy.finishApplicationMaster(null, |
| ((FinishApplicationMasterRequestPBImpl)factory |
| .newRecordInstance( |
| FinishApplicationMasterRequest.class)).getProto() |
| )); |
| Assert.assertEquals(false, finishResp.getIsUnregistered()); |
| AllocateResponse allocResp = |
| new AllocateResponsePBImpl( |
| ampProxy.allocate(null, |
| ((AllocateRequestPBImpl)factory |
| .newRecordInstance(AllocateRequest.class)).getProto()) |
| ); |
| List<Container> allocatedContainers = allocResp.getAllocatedContainers(); |
| Assert.assertEquals(1, allocatedContainers.size()); |
| Assert.assertEquals(ExecutionType.OPPORTUNISTIC, |
| allocatedContainers.get(0).getExecutionType()); |
| Assert.assertEquals(12345, allocResp.getNumClusterNodes()); |
| |
| |
| // Verify that the DistrubutedSchedulingService can handle the |
| // DistributedSchedulingAMProtocol clients as well |
| RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class, |
| ProtobufRpcEngine2.class); |
| DistributedSchedulingAMProtocolPB dsProxy = |
| RPC.getProxy(DistributedSchedulingAMProtocolPB |
| .class, 1, NetUtils.getConnectAddress(server), conf); |
| |
| RegisterDistributedSchedulingAMResponse dsRegResp = |
| new RegisterDistributedSchedulingAMResponsePBImpl( |
| dsProxy.registerApplicationMasterForDistributedScheduling(null, |
| ((RegisterApplicationMasterRequestPBImpl)factory |
| .newRecordInstance(RegisterApplicationMasterRequest.class)) |
| .getProto())); |
| Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); |
| Assert.assertEquals(4, |
| dsRegResp.getMaxContainerResource().getVirtualCores()); |
| Assert.assertEquals(1024, |
| dsRegResp.getMinContainerResource().getMemorySize()); |
| Assert.assertEquals(2, |
| dsRegResp.getIncrContainerResource().getVirtualCores()); |
| |
| DistributedSchedulingAllocateRequestPBImpl distAllReq = |
| (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance( |
| DistributedSchedulingAllocateRequest.class); |
| distAllReq.setAllocateRequest(allReq); |
| distAllReq.setAllocatedContainers(Arrays.asList(c)); |
| DistributedSchedulingAllocateResponse dsAllocResp = |
| new DistributedSchedulingAllocateResponsePBImpl( |
| dsProxy.allocateForDistributedScheduling(null, |
| distAllReq.getProto())); |
| Assert.assertEquals( |
| "h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost()); |
| Assert.assertEquals( |
| "l1", dsAllocResp.getNodesForScheduling().get(1).getNodePartition()); |
| |
| FinishApplicationMasterResponse dsfinishResp = |
| new FinishApplicationMasterResponsePBImpl( |
| dsProxy.finishApplicationMaster(null, |
| ((FinishApplicationMasterRequestPBImpl) factory |
| .newRecordInstance(FinishApplicationMasterRequest.class)) |
| .getProto())); |
| Assert.assertEquals( |
| false, dsfinishResp.getIsUnregistered()); |
| } |
| |
| private OpportunisticContainerAllocatorAMService createService( |
| final RecordFactory factory, final RMContext rmContext, |
| final Container c) { |
| return new OpportunisticContainerAllocatorAMService(rmContext, null) { |
| @Override |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| RegisterApplicationMasterRequest request) throws |
| YarnException, IOException { |
| RegisterApplicationMasterResponse resp = factory.newRecordInstance( |
| RegisterApplicationMasterResponse.class); |
| // Dummy Entry to Assert that we get this object back |
| resp.setQueue("dummyQueue"); |
| return resp; |
| } |
| |
| @Override |
| public FinishApplicationMasterResponse finishApplicationMaster( |
| FinishApplicationMasterRequest request) throws YarnException, |
| IOException { |
| FinishApplicationMasterResponse resp = factory.newRecordInstance( |
| FinishApplicationMasterResponse.class); |
| // Dummy Entry to Assert that we get this object back |
| resp.setIsUnregistered(false); |
| return resp; |
| } |
| |
| @Override |
| public AllocateResponse allocate(AllocateRequest request) throws |
| YarnException, IOException { |
| AllocateResponse response = factory.newRecordInstance( |
| AllocateResponse.class); |
| response.setNumClusterNodes(12345); |
| response.setAllocatedContainers(Arrays.asList(c)); |
| return response; |
| } |
| |
| @Override |
| public RegisterDistributedSchedulingAMResponse |
| registerApplicationMasterForDistributedScheduling( |
| RegisterApplicationMasterRequest request) |
| throws YarnException, IOException { |
| RegisterDistributedSchedulingAMResponse resp = factory |
| .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); |
| resp.setContainerIdStart(54321L); |
| resp.setMaxContainerResource(Resource.newInstance(4096, 4)); |
| resp.setMinContainerResource(Resource.newInstance(1024, 1)); |
| resp.setIncrContainerResource(Resource.newInstance(2048, 2)); |
| return resp; |
| } |
| |
| @Override |
| public DistributedSchedulingAllocateResponse |
| allocateForDistributedScheduling( |
| DistributedSchedulingAllocateRequest request) |
| throws YarnException, IOException { |
| List<ResourceRequest> askList = |
| request.getAllocateRequest().getAskList(); |
| List<Container> allocatedContainers = request.getAllocatedContainers(); |
| Assert.assertEquals(1, allocatedContainers.size()); |
| Assert.assertEquals(ExecutionType.OPPORTUNISTIC, |
| allocatedContainers.get(0).getExecutionType()); |
| Assert.assertEquals(1, askList.size()); |
| Assert.assertTrue(askList.get(0) |
| .getExecutionTypeRequest().getEnforceExecutionType()); |
| DistributedSchedulingAllocateResponse resp = factory |
| .newRecordInstance(DistributedSchedulingAllocateResponse.class); |
| RemoteNode remoteNode1 = RemoteNode.newInstance( |
| NodeId.newInstance("h1", 1234), "http://h1:4321"); |
| RemoteNode remoteNode2 = RemoteNode.newInstance( |
| NodeId.newInstance("h2", 1234), "http://h2:4321"); |
| remoteNode2.setNodePartition("l1"); |
| resp.setNodesForScheduling( |
| Arrays.asList(remoteNode1, remoteNode2)); |
| return resp; |
| } |
| }; |
| } |
| } |