| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueState; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| /** |
| * Test Queue States. |
| */ |
| public class TestQueueState { |
| |
| private static final String Q1 = "q1"; |
| private static final String Q2 = "q2"; |
| private static final String Q3 = "q3"; |
| |
| private final static String Q1_PATH = |
| CapacitySchedulerConfiguration.ROOT + "." + Q1; |
| private final static String Q2_PATH = |
| Q1_PATH + "." + Q2; |
| private final static String Q3_PATH = |
| Q1_PATH + "." + Q3; |
| private CapacityScheduler cs; |
| private YarnConfiguration conf; |
| |
| @Test (timeout = 15000) |
| public void testQueueState() throws IOException { |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); |
| csConf.setQueues(Q1_PATH, new String[] {Q2}); |
| |
| csConf.setCapacity(Q1_PATH, 100); |
| csConf.setCapacity(Q2_PATH, 100); |
| |
| conf = new YarnConfiguration(csConf); |
| cs = new CapacityScheduler(); |
| |
| RMContext rmContext = TestUtils.getMockRMContext(); |
| cs.setConf(conf); |
| cs.setRMContext(rmContext); |
| cs.init(conf); |
| |
| //by default, the state of both queues should be RUNNING |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); |
| |
| // Change the state of Q1 to STOPPED, and re-initiate the CS |
| csConf.setState(Q1_PATH, QueueState.STOPPED); |
| conf = new YarnConfiguration(csConf); |
| cs.reinitialize(conf, rmContext); |
| // The state of Q1 and its child: Q2 should be STOPPED |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); |
| |
| // Change the state of Q1 to RUNNING, and change the state of Q2 to STOPPED |
| csConf.setState(Q1_PATH, QueueState.RUNNING); |
| csConf.setState(Q2_PATH, QueueState.STOPPED); |
| conf = new YarnConfiguration(csConf); |
| // reinitialize the CS, the operation should be successful |
| cs.reinitialize(conf, rmContext); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); |
| |
| // Change the state of Q1 to STOPPED, and change the state of Q2 to RUNNING |
| csConf.setState(Q1_PATH, QueueState.STOPPED); |
| csConf.setState(Q2_PATH, QueueState.RUNNING); |
| conf = new YarnConfiguration(csConf); |
| // reinitialize the CS, the operation should be failed. |
| try { |
| cs.reinitialize(conf, rmContext); |
| Assert.fail("Should throw an Exception."); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getCause().getMessage().contains( |
| "The parent queue:q1 state is STOPPED, " |
| + "child queue:q2 state cannot be RUNNING.")); |
| } |
| } |
| |
| @Test(timeout = 15000) |
| public void testQueueStateTransit() throws Exception { |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); |
| csConf.setQueues(Q1_PATH, new String[] {Q2, Q3}); |
| |
| csConf.setCapacity(Q1_PATH, 100); |
| csConf.setCapacity(Q2_PATH, 50); |
| csConf.setCapacity(Q3_PATH, 50); |
| |
| conf = new YarnConfiguration(csConf); |
| cs = new CapacityScheduler(); |
| |
| RMContext rmContext = TestUtils.getMockRMContext(); |
| cs.setConf(conf); |
| cs.setRMContext(rmContext); |
| cs.init(conf); |
| |
| //by default, the state of ALL queues should be RUNNING |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); |
| |
| // submit an application to Q2 |
| ApplicationId appId = ApplicationId.newInstance( |
| System.currentTimeMillis(), 1); |
| String userName = "testUser"; |
| cs.getQueue(Q2).submitApplication(appId, userName, Q2); |
| FiCaSchedulerApp app = getMockApplication(appId, userName, |
| Resources.createResource(4, 0)); |
| cs.getQueue(Q2).submitApplicationAttempt(app, userName); |
| |
| // set Q2 state to stop and do reinitialize. |
| csConf.setState(Q2_PATH, QueueState.STOPPED); |
| conf = new YarnConfiguration(csConf); |
| cs.reinitialize(conf, rmContext); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); |
| Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); |
| |
| // set Q1 state to stop and do reinitialize. |
| csConf.setState(Q1_PATH, QueueState.STOPPED); |
| conf = new YarnConfiguration(csConf); |
| cs.reinitialize(conf, rmContext); |
| Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); |
| |
| // Active Q3, should fail |
| csConf.setState(Q3_PATH, QueueState.RUNNING); |
| conf = new YarnConfiguration(csConf); |
| try { |
| cs.reinitialize(conf, rmContext); |
| Assert.fail("Should throw an Exception."); |
| } catch (Exception ex) { |
| // Do Nothing |
| } |
| |
| // stop the app running in q2 |
| cs.getQueue(Q2).finishApplicationAttempt(app, Q2); |
| cs.getQueue(Q2).finishApplication(appId, userName); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); |
| Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); |
| |
| } |
| |
| private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, |
| Resource amResource) { |
| FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); |
| ApplicationAttemptId applicationAttemptId = |
| ApplicationAttemptId.newInstance(appId, 0); |
| doReturn(applicationAttemptId.getApplicationId()). |
| when(application).getApplicationId(); |
| doReturn(applicationAttemptId).when(application).getApplicationAttemptId(); |
| doReturn(user).when(application).getUser(); |
| doReturn(amResource).when(application).getAMResource(); |
| doReturn(Priority.newInstance(0)).when(application).getPriority(); |
| doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) |
| .getAppAMNodePartitionName(); |
| doReturn(amResource).when(application).getAMResource( |
| CommonNodeLabelsManager.NO_LABEL); |
| when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) |
| .thenCallRealMethod(); |
| return application; |
| } |
| } |