| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.applications.distributedshell; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.client.api.AMRMClient; |
| import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; |
| import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.Matchers; |
| import org.mockito.Mockito; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| /** |
| * A bunch of tests to make sure that the container allocations |
| * and releases occur correctly. |
| */ |
| public class TestDSAppMaster { |
| |
| static class TestAppMaster extends ApplicationMaster { |
| private int threadsLaunched = 0; |
| public List<String> yarnShellIds = new ArrayList<String>(); |
| |
| @Override |
| protected Thread createLaunchContainerThread(Container allocatedContainer, |
| String shellId) { |
| threadsLaunched++; |
| launchedContainers.add(allocatedContainer.getId()); |
| yarnShellIds.add(shellId); |
| return new Thread(); |
| } |
| |
| void setNumTotalContainers(int numTotalContainers) { |
| this.numTotalContainers = numTotalContainers; |
| } |
| |
| int getAllocatedContainers() { |
| return this.numAllocatedContainers.get(); |
| } |
| |
| @Override |
| void startTimelineClient(final Configuration conf) throws YarnException, |
| IOException, InterruptedException { |
| timelineClient = null; |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test |
| public void testDSAppMasterAllocateHandler() throws Exception { |
| |
| TestAppMaster master = new TestAppMaster(); |
| int targetContainers = 2; |
| AMRMClientAsync mockClient = Mockito.mock(AMRMClientAsync.class); |
| master.setAmRMClient(mockClient); |
| master.setNumTotalContainers(targetContainers); |
| Mockito.doNothing().when(mockClient) |
| .addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); |
| |
| ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler(); |
| |
| List<Container> containers = new ArrayList<>(1); |
| ContainerId id1 = BuilderUtils.newContainerId(1, 1, 1, 1); |
| containers.add(generateContainer(id1)); |
| |
| master.numRequestedContainers.set(targetContainers); |
| |
| // first allocate a single container, everything should be fine |
| handler.onContainersAllocated(containers); |
| Assert.assertEquals("Wrong container allocation count", 1, |
| master.getAllocatedContainers()); |
| Mockito.verifyZeroInteractions(mockClient); |
| Assert.assertEquals("Incorrect number of threads launched", 1, |
| master.threadsLaunched); |
| Assert.assertEquals("Incorrect YARN Shell IDs", |
| Arrays.asList("1"), master.yarnShellIds); |
| |
| // now send 3 extra containers |
| containers.clear(); |
| ContainerId id2 = BuilderUtils.newContainerId(1, 1, 1, 2); |
| containers.add(generateContainer(id2)); |
| ContainerId id3 = BuilderUtils.newContainerId(1, 1, 1, 3); |
| containers.add(generateContainer(id3)); |
| ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4); |
| containers.add(generateContainer(id4)); |
| handler.onContainersAllocated(containers); |
| Assert.assertEquals("Wrong final container allocation count", 4, |
| master.getAllocatedContainers()); |
| |
| Assert.assertEquals("Incorrect number of threads launched", 4, |
| master.threadsLaunched); |
| |
| Assert.assertEquals("Incorrect YARN Shell IDs", |
| Arrays.asList("1", "2", "3", "4"), master.yarnShellIds); |
| |
| // make sure we handle completion events correctly |
| List<ContainerStatus> status = new ArrayList<>(); |
| status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS)); |
| status.add(generateContainerStatus(id2, ContainerExitStatus.SUCCESS)); |
| status.add(generateContainerStatus(id3, ContainerExitStatus.ABORTED)); |
| status.add(generateContainerStatus(id4, ContainerExitStatus.ABORTED)); |
| handler.onContainersCompleted(status); |
| |
| Assert.assertEquals("Unexpected number of completed containers", |
| targetContainers, master.getNumCompletedContainers()); |
| Assert.assertTrue("Master didn't finish containers as expected", |
| master.getDone()); |
| |
| // test for events from containers we know nothing about |
| // these events should be ignored |
| status = new ArrayList<>(); |
| ContainerId id5 = BuilderUtils.newContainerId(1, 1, 1, 5); |
| status.add(generateContainerStatus(id5, ContainerExitStatus.ABORTED)); |
| Assert.assertEquals("Unexpected number of completed containers", |
| targetContainers, master.getNumCompletedContainers()); |
| Assert.assertTrue("Master didn't finish containers as expected", |
| master.getDone()); |
| status.add(generateContainerStatus(id5, ContainerExitStatus.SUCCESS)); |
| Assert.assertEquals("Unexpected number of completed containers", |
| targetContainers, master.getNumCompletedContainers()); |
| Assert.assertTrue("Master didn't finish containers as expected", |
| master.getDone()); |
| } |
| |
| private Container generateContainer(ContainerId cid) { |
| return Container.newInstance(cid, NodeId.newInstance("host", 5000), |
| "host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null); |
| } |
| |
| private ContainerStatus |
| generateContainerStatus(ContainerId id, int exitStatus) { |
| return ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", |
| exitStatus); |
| } |
| |
| @Test |
| public void testTimelineClientInDSAppMasterV1() throws Exception { |
| runTimelineClientInDSAppMaster(true, false); |
| } |
| |
| @Test |
| public void testTimelineClientInDSAppMasterV2() throws Exception { |
| runTimelineClientInDSAppMaster(false, true); |
| } |
| |
| @Test |
| public void testTimelineClientInDSAppMasterV1V2() throws Exception { |
| runTimelineClientInDSAppMaster(true, true); |
| } |
| |
| @Test |
| public void testTimelineClientInDSAppMasterDisabled() throws Exception { |
| runTimelineClientInDSAppMaster(false, false); |
| } |
| |
| private void runTimelineClientInDSAppMaster(boolean v1Enabled, |
| boolean v2Enabled) throws Exception { |
| ApplicationMaster appMaster = createAppMasterWithStartedTimelineService( |
| v1Enabled, v2Enabled); |
| validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster); |
| } |
| |
| private void validateAppMasterTimelineService(boolean v1Enabled, |
| boolean v2Enabled, ApplicationMaster appMaster) { |
| if (v1Enabled) { |
| Assert.assertEquals(appMaster.appSubmitterUgi, |
| ((TimelineClientImpl)appMaster.timelineClient).getUgi()); |
| } else { |
| Assert.assertNull(appMaster.timelineClient); |
| } |
| if (v2Enabled) { |
| Assert.assertNotNull(appMaster.timelineV2Client); |
| } else { |
| Assert.assertNull(appMaster.timelineV2Client); |
| } |
| } |
| |
| private ApplicationMaster createAppMasterWithStartedTimelineService( |
| boolean v1Enabled, boolean v2Enabled) throws Exception { |
| ApplicationMaster appMaster = new ApplicationMaster(); |
| appMaster.appSubmitterUgi = UserGroupInformation |
| .createUserForTesting("foo", new String[] {"bar"}); |
| Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled); |
| ApplicationId appId = ApplicationId.newInstance(1L, 1); |
| appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1); |
| appMaster.startTimelineClient(conf); |
| return appMaster; |
| } |
| |
| private Configuration getTimelineServiceConf(boolean v1Enabled, |
| boolean v2Enabled) { |
| Configuration conf = new YarnConfiguration(new Configuration(false)); |
| Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf)); |
| |
| if (v1Enabled || v2Enabled) { |
| conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); |
| } |
| |
| if (v1Enabled) { |
| conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); |
| } |
| |
| if (v2Enabled) { |
| conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); |
| conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, |
| FileSystemTimelineWriterImpl.class, TimelineWriter.class); |
| } |
| |
| if (v1Enabled && v2Enabled) { |
| conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); |
| conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); |
| } |
| return conf; |
| } |
| } |