| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import junit.framework.Assert; |
| |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.UnsupportedFileSystemException; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
| import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; |
| import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; |
| import org.apache.hadoop.yarn.server.nodemanager.DeletionService; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.junit.Test; |
| |
| public class TestContainerManager extends BaseContainerManagerTest { |
| |
| public TestContainerManager() throws UnsupportedFileSystemException { |
| super(); |
| } |
| |
| static { |
| LOG = LogFactory.getLog(TestContainerManager.class); |
| } |
| |
| private ContainerId createContainerId() { |
| ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); |
| appId.setClusterTimestamp(0); |
| appId.setId(0); |
| ApplicationAttemptId appAttemptId = |
| recordFactory.newRecordInstance(ApplicationAttemptId.class); |
| appAttemptId.setApplicationId(appId); |
| appAttemptId.setAttemptId(1); |
| ContainerId containerId = |
| recordFactory.newRecordInstance(ContainerId.class); |
| containerId.setApplicationAttemptId(appAttemptId); |
| return containerId; |
| } |
| |
| @Test |
| public void testContainerManagerInitialization() throws IOException { |
| |
| containerManager.start(); |
| |
| // Just do a query for a non-existing container. |
| boolean throwsException = false; |
| try { |
| GetContainerStatusRequest request = |
| recordFactory.newRecordInstance(GetContainerStatusRequest.class); |
| ContainerId cId = createContainerId(); |
| request.setContainerId(cId); |
| containerManager.getContainerStatus(request); |
| } catch (YarnRemoteException e) { |
| throwsException = true; |
| } |
| Assert.assertTrue(throwsException); |
| } |
| |
| @Test |
| public void testContainerSetup() throws IOException, InterruptedException { |
| |
| containerManager.start(); |
| |
| // ////// Create the resources for the container |
| File dir = new File(tmpDir, "dir"); |
| dir.mkdirs(); |
| File file = new File(dir, "file"); |
| PrintWriter fileWriter = new PrintWriter(file); |
| fileWriter.write("Hello World!"); |
| fileWriter.close(); |
| |
| ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| |
| // ////// Construct the Container-id |
| ContainerId cId = createContainerId(); |
| container.setContainerId(cId); |
| |
| container.setUser(user); |
| |
| // ////// Construct the container-spec. |
| ContainerLaunchContext containerLaunchContext = |
| recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| URL resource_alpha = |
| ConverterUtils.getYarnUrlFromPath(localFS |
| .makeQualified(new Path(file.getAbsolutePath()))); |
| LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class); |
| rsrc_alpha.setResource(resource_alpha); |
| rsrc_alpha.setSize(-1); |
| rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); |
| rsrc_alpha.setType(LocalResourceType.FILE); |
| rsrc_alpha.setTimestamp(file.lastModified()); |
| String destinationFile = "dest_file"; |
| Map<String, LocalResource> localResources = |
| new HashMap<String, LocalResource>(); |
| localResources.put(destinationFile, rsrc_alpha); |
| containerLaunchContext.setLocalResources(localResources); |
| containerLaunchContext.setUser(container.getUser()); |
| containerLaunchContext.setContainerId(container.getContainerId()); |
| containerLaunchContext.setResource(recordFactory |
| .newRecordInstance(Resource.class)); |
| |
| StartContainerRequest startRequest = |
| recordFactory.newRecordInstance(StartContainerRequest.class); |
| startRequest.setContainerLaunchContext(containerLaunchContext); |
| |
| containerManager.startContainer(startRequest); |
| |
| BaseContainerManagerTest.waitForContainerState(containerManager, cId, |
| ContainerState.COMPLETE); |
| |
| // Now ascertain that the resources are localised correctly. |
| ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); |
| String appIDStr = ConverterUtils.toString(appId); |
| String containerIDStr = ConverterUtils.toString(cId); |
| File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); |
| File userDir = new File(userCacheDir, user); |
| File appCache = new File(userDir, ContainerLocalizer.APPCACHE); |
| File appDir = new File(appCache, appIDStr); |
| File containerDir = new File(appDir, containerIDStr); |
| File targetFile = new File(containerDir, destinationFile); |
| File sysDir = |
| new File(localDir, |
| ResourceLocalizationService.NM_PRIVATE_DIR); |
| File appSysDir = new File(sysDir, appIDStr); |
| File containerSysDir = new File(appSysDir, containerIDStr); |
| |
| for (File f : new File[] { localDir, sysDir, userCacheDir, appDir, |
| appSysDir, |
| containerDir, containerSysDir }) { |
| Assert.assertTrue(f.getAbsolutePath() + " doesn't exist!!", f.exists()); |
| Assert.assertTrue(f.getAbsolutePath() + " is not a directory!!", |
| f.isDirectory()); |
| } |
| Assert.assertTrue(targetFile.getAbsolutePath() + " doesn't exist!!", |
| targetFile.exists()); |
| |
| // Now verify the contents of the file |
| BufferedReader reader = new BufferedReader(new FileReader(targetFile)); |
| Assert.assertEquals("Hello World!", reader.readLine()); |
| Assert.assertEquals(null, reader.readLine()); |
| } |
| |
| @Test |
| public void testContainerLaunchAndStop() throws IOException, |
| InterruptedException { |
| containerManager.start(); |
| |
| File scriptFile = new File(tmpDir, "scriptFile.sh"); |
| PrintWriter fileWriter = new PrintWriter(scriptFile); |
| File processStartFile = |
| new File(tmpDir, "start_file.txt").getAbsoluteFile(); |
| fileWriter.write("\numask 0"); // So that start file is readable by the test |
| fileWriter.write("\necho Hello World! > " + processStartFile); |
| fileWriter.write("\necho $$ >> " + processStartFile); |
| fileWriter.write("\nexec sleep 100"); |
| fileWriter.close(); |
| |
| ContainerLaunchContext containerLaunchContext = |
| recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| |
| // ////// Construct the Container-id |
| ContainerId cId = createContainerId(); |
| containerLaunchContext.setContainerId(cId); |
| |
| containerLaunchContext.setUser(user); |
| |
| URL resource_alpha = |
| ConverterUtils.getYarnUrlFromPath(localFS |
| .makeQualified(new Path(scriptFile.getAbsolutePath()))); |
| LocalResource rsrc_alpha = |
| recordFactory.newRecordInstance(LocalResource.class); |
| rsrc_alpha.setResource(resource_alpha); |
| rsrc_alpha.setSize(-1); |
| rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); |
| rsrc_alpha.setType(LocalResourceType.FILE); |
| rsrc_alpha.setTimestamp(scriptFile.lastModified()); |
| String destinationFile = "dest_file"; |
| Map<String, LocalResource> localResources = |
| new HashMap<String, LocalResource>(); |
| localResources.put(destinationFile, rsrc_alpha); |
| containerLaunchContext.setLocalResources(localResources); |
| containerLaunchContext.setUser(containerLaunchContext.getUser()); |
| List<String> commands = new ArrayList<String>(); |
| commands.add("/bin/bash"); |
| commands.add(scriptFile.getAbsolutePath()); |
| containerLaunchContext.setCommands(commands); |
| containerLaunchContext.setResource(recordFactory |
| .newRecordInstance(Resource.class)); |
| containerLaunchContext.getResource().setMemory(100 * 1024 * 1024); |
| StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); |
| startRequest.setContainerLaunchContext(containerLaunchContext); |
| containerManager.startContainer(startRequest); |
| |
| int timeoutSecs = 0; |
| while (!processStartFile.exists() && timeoutSecs++ < 20) { |
| Thread.sleep(1000); |
| LOG.info("Waiting for process start-file to be created"); |
| } |
| Assert.assertTrue("ProcessStartFile doesn't exist!", |
| processStartFile.exists()); |
| |
| // Now verify the contents of the file |
| BufferedReader reader = |
| new BufferedReader(new FileReader(processStartFile)); |
| Assert.assertEquals("Hello World!", reader.readLine()); |
| // Get the pid of the process |
| String pid = reader.readLine().trim(); |
| // No more lines |
| Assert.assertEquals(null, reader.readLine()); |
| |
| // Now test the stop functionality. |
| |
| // Assert that the process is alive |
| Assert.assertTrue("Process is not alive!", |
| exec.signalContainer(user, |
| pid, Signal.NULL)); |
| // Once more |
| Assert.assertTrue("Process is not alive!", |
| exec.signalContainer(user, |
| pid, Signal.NULL)); |
| |
| StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class); |
| stopRequest.setContainerId(cId); |
| containerManager.stopContainer(stopRequest); |
| |
| BaseContainerManagerTest.waitForContainerState(containerManager, cId, |
| ContainerState.COMPLETE); |
| |
| GetContainerStatusRequest gcsRequest = |
| recordFactory.newRecordInstance(GetContainerStatusRequest.class); |
| gcsRequest.setContainerId(cId); |
| ContainerStatus containerStatus = |
| containerManager.getContainerStatus(gcsRequest).getStatus(); |
| Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), |
| containerStatus.getExitStatus()); |
| |
| // Assert that the process is not alive anymore |
| Assert.assertFalse("Process is still alive!", |
| exec.signalContainer(user, |
| pid, Signal.NULL)); |
| } |
| |
| private void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException { |
| |
| File scriptFile = new File(tmpDir, "scriptFile.sh"); |
| PrintWriter fileWriter = new PrintWriter(scriptFile); |
| File processStartFile = |
| new File(tmpDir, "start_file.txt").getAbsoluteFile(); |
| fileWriter.write("\numask 0"); // So that start file is readable by the test |
| fileWriter.write("\necho Hello World! > " + processStartFile); |
| fileWriter.write("\necho $$ >> " + processStartFile); |
| |
| // Have script throw an exit code at the end |
| if (exitCode != 0) { |
| fileWriter.write("\nexit "+exitCode); |
| } |
| |
| fileWriter.close(); |
| |
| ContainerLaunchContext containerLaunchContext = |
| recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| |
| // ////// Construct the Container-id |
| ContainerId cId = createContainerId(); |
| containerLaunchContext.setContainerId(cId); |
| |
| containerLaunchContext.setUser(user); |
| |
| URL resource_alpha = |
| ConverterUtils.getYarnUrlFromPath(localFS |
| .makeQualified(new Path(scriptFile.getAbsolutePath()))); |
| LocalResource rsrc_alpha = |
| recordFactory.newRecordInstance(LocalResource.class); |
| rsrc_alpha.setResource(resource_alpha); |
| rsrc_alpha.setSize(-1); |
| rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); |
| rsrc_alpha.setType(LocalResourceType.FILE); |
| rsrc_alpha.setTimestamp(scriptFile.lastModified()); |
| String destinationFile = "dest_file"; |
| Map<String, LocalResource> localResources = |
| new HashMap<String, LocalResource>(); |
| localResources.put(destinationFile, rsrc_alpha); |
| containerLaunchContext.setLocalResources(localResources); |
| containerLaunchContext.setUser(containerLaunchContext.getUser()); |
| List<String> commands = new ArrayList<String>(); |
| commands.add("/bin/bash"); |
| commands.add(scriptFile.getAbsolutePath()); |
| containerLaunchContext.setCommands(commands); |
| containerLaunchContext.setResource(recordFactory |
| .newRecordInstance(Resource.class)); |
| containerLaunchContext.getResource().setMemory(100 * 1024 * 1024); |
| |
| StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); |
| startRequest.setContainerLaunchContext(containerLaunchContext); |
| containerManager.startContainer(startRequest); |
| |
| BaseContainerManagerTest.waitForContainerState(containerManager, cId, |
| ContainerState.COMPLETE); |
| |
| GetContainerStatusRequest gcsRequest = |
| recordFactory.newRecordInstance(GetContainerStatusRequest.class); |
| gcsRequest.setContainerId(cId); |
| ContainerStatus containerStatus = |
| containerManager.getContainerStatus(gcsRequest).getStatus(); |
| |
| // Verify exit status matches exit state of script |
| Assert.assertEquals(exitCode, |
| containerStatus.getExitStatus()); |
| } |
| |
| @Test |
| public void testContainerLaunchAndExitSuccess() throws IOException, InterruptedException { |
| containerManager.start(); |
| int exitCode = 0; |
| |
| // launch context for a command that will return exit code 0 |
| // and verify exit code returned |
| testContainerLaunchAndExit(exitCode); |
| } |
| |
| @Test |
| public void testContainerLaunchAndExitFailure() throws IOException, InterruptedException { |
| containerManager.start(); |
| int exitCode = 50; |
| |
| // launch context for a command that will return exit code 0 |
| // and verify exit code returned |
| testContainerLaunchAndExit(exitCode); |
| } |
| |
| @Test |
| public void testLocalFilesCleanup() throws InterruptedException, |
| IOException { |
| // Real del service |
| delSrvc = new DeletionService(exec); |
| delSrvc.init(conf); |
| |
| containerManager = |
| new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, |
| metrics, new ApplicationACLsManager(conf), dirsHandler); |
| containerManager.init(conf); |
| containerManager.start(); |
| |
| // ////// Create the resources for the container |
| File dir = new File(tmpDir, "dir"); |
| dir.mkdirs(); |
| File file = new File(dir, "file"); |
| PrintWriter fileWriter = new PrintWriter(file); |
| fileWriter.write("Hello World!"); |
| fileWriter.close(); |
| |
| ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| |
| // ////// Construct the Container-id |
| ContainerId cId = createContainerId(); |
| ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); |
| container.setContainerId(cId); |
| |
| container.setUser(user); |
| |
| // ////// Construct the container-spec. |
| ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| // containerLaunchContext.resources = |
| // new HashMap<CharSequence, LocalResource>(); |
| URL resource_alpha = |
| ConverterUtils.getYarnUrlFromPath(FileContext.getLocalFSFileContext() |
| .makeQualified(new Path(file.getAbsolutePath()))); |
| LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class); |
| rsrc_alpha.setResource(resource_alpha); |
| rsrc_alpha.setSize(-1); |
| rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); |
| rsrc_alpha.setType(LocalResourceType.FILE); |
| rsrc_alpha.setTimestamp(file.lastModified()); |
| String destinationFile = "dest_file"; |
| Map<String, LocalResource> localResources = |
| new HashMap<String, LocalResource>(); |
| localResources.put(destinationFile, rsrc_alpha); |
| containerLaunchContext.setLocalResources(localResources); |
| containerLaunchContext.setUser(container.getUser()); |
| containerLaunchContext.setContainerId(container.getContainerId()); |
| containerLaunchContext.setResource(recordFactory |
| .newRecordInstance(Resource.class)); |
| |
| // containerLaunchContext.command = new ArrayList<CharSequence>(); |
| |
| StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); |
| request.setContainerLaunchContext(containerLaunchContext); |
| containerManager.startContainer(request); |
| |
| BaseContainerManagerTest.waitForContainerState(containerManager, cId, |
| ContainerState.COMPLETE); |
| |
| BaseContainerManagerTest.waitForApplicationState(containerManager, |
| cId.getApplicationAttemptId().getApplicationId(), |
| ApplicationState.RUNNING); |
| |
| // Now ascertain that the resources are localised correctly. |
| String appIDStr = ConverterUtils.toString(appId); |
| String containerIDStr = ConverterUtils.toString(cId); |
| File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); |
| File userDir = new File(userCacheDir, user); |
| File appCache = new File(userDir, ContainerLocalizer.APPCACHE); |
| File appDir = new File(appCache, appIDStr); |
| File containerDir = new File(appDir, containerIDStr); |
| File targetFile = new File(containerDir, destinationFile); |
| File sysDir = |
| new File(localDir, |
| ResourceLocalizationService.NM_PRIVATE_DIR); |
| File appSysDir = new File(sysDir, appIDStr); |
| File containerSysDir = new File(appSysDir, containerIDStr); |
| // AppDir should still exist |
| Assert.assertTrue("AppDir " + appDir.getAbsolutePath() |
| + " doesn't exist!!", appDir.exists()); |
| Assert.assertTrue("AppSysDir " + appSysDir.getAbsolutePath() |
| + " doesn't exist!!", appSysDir.exists()); |
| for (File f : new File[] { containerDir, containerSysDir }) { |
| Assert.assertFalse(f.getAbsolutePath() + " exists!!", f.exists()); |
| } |
| Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", |
| targetFile.exists()); |
| |
| // Simulate RM sending an AppFinish event. |
| containerManager.handle(new CMgrCompletedAppsEvent(Arrays |
| .asList(new ApplicationId[] { appId }))); |
| |
| BaseContainerManagerTest.waitForApplicationState(containerManager, |
| cId.getApplicationAttemptId().getApplicationId(), |
| ApplicationState.FINISHED); |
| |
| // Now ascertain that the resources are localised correctly. |
| for (File f : new File[] { appDir, containerDir, appSysDir, |
| containerSysDir }) { |
| // Wait for deletion. Deletion can happen long after AppFinish because of |
| // the async DeletionService |
| int timeout = 0; |
| while (f.exists() && timeout++ < 15) { |
| Thread.sleep(1000); |
| } |
| Assert.assertFalse(f.getAbsolutePath() + " exists!!", f.exists()); |
| } |
| // Wait for deletion |
| int timeout = 0; |
| while (targetFile.exists() && timeout++ < 15) { |
| Thread.sleep(1000); |
| } |
| Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", |
| targetFile.exists()); |
| } |
| } |