| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import static org.apache.hadoop.fs.CreateFlag.CREATE; |
| import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.isA; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.BufferedWriter; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FsStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ConfigurationException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; |
| import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; |
| import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; |
| import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; |
| import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; |
| import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestDefaultContainerExecutor { |
| |
| private static Path BASE_TMP_PATH = new Path("target", |
| TestDefaultContainerExecutor.class.getSimpleName()); |
| |
| private YarnConfiguration yarnConfiguration; |
| |
| private DefaultContainerExecutor containerExecutor; |
| |
| private Container mockContainer; |
| |
| private NumaResourceAllocator numaResourceAllocator; |
| |
| @AfterClass |
| public static void deleteTmpFiles() throws IOException { |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| try { |
| lfs.delete(BASE_TMP_PATH, true); |
| } catch (FileNotFoundException e) { |
| } |
| } |
| |
| byte[] createTmpFile(Path dst, Random r, int len) |
| throws IOException { |
| // use unmodified local context |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| dst = lfs.makeQualified(dst); |
| lfs.mkdir(dst.getParent(), null, true); |
| byte[] bytes = new byte[len]; |
| FSDataOutputStream out = null; |
| try { |
| out = lfs.create(dst, EnumSet.of(CREATE, OVERWRITE)); |
| r.nextBytes(bytes); |
| out.write(bytes); |
| } finally { |
| if (out != null) out.close(); |
| } |
| return bytes; |
| } |
| |
| @Test |
| public void testDirPermissions() throws Exception { |
| deleteTmpFiles(); |
| |
| final String user = "somebody"; |
| final String appId = "app_12345_123"; |
| final FsPermission userCachePerm = new FsPermission( |
| DefaultContainerExecutor.USER_PERM); |
| final FsPermission appCachePerm = new FsPermission( |
| DefaultContainerExecutor.APPCACHE_PERM); |
| final FsPermission fileCachePerm = new FsPermission( |
| DefaultContainerExecutor.FILECACHE_PERM); |
| final FsPermission appDirPerm = new FsPermission( |
| DefaultContainerExecutor.APPDIR_PERM); |
| |
| List<String> localDirs = new ArrayList<String>(); |
| localDirs.add(new Path(BASE_TMP_PATH, "localDirA").toString()); |
| localDirs.add(new Path(BASE_TMP_PATH, "localDirB").toString()); |
| List<String> logDirs = new ArrayList<String>(); |
| logDirs.add(new Path(BASE_TMP_PATH, "logDirA").toString()); |
| logDirs.add(new Path(BASE_TMP_PATH, "logDirB").toString()); |
| |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| FileContext lfs = FileContext.getLocalFSFileContext(conf); |
| DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs); |
| executor.setConf(conf); |
| executor.init(null); |
| |
| try { |
| executor.createUserLocalDirs(localDirs, user); |
| executor.createUserCacheDirs(localDirs, user); |
| executor.createAppDirs(localDirs, user, appId); |
| |
| for (String dir : localDirs) { |
| FileStatus stats = lfs.getFileStatus( |
| new Path(new Path(dir, ContainerLocalizer.USERCACHE), user)); |
| Assert.assertEquals(userCachePerm, stats.getPermission()); |
| } |
| |
| for (String dir : localDirs) { |
| Path userCachePath = new Path( |
| new Path(dir, ContainerLocalizer.USERCACHE), user); |
| Path appCachePath = new Path(userCachePath, |
| ContainerLocalizer.APPCACHE); |
| FileStatus stats = lfs.getFileStatus(appCachePath); |
| Assert.assertEquals(appCachePerm, stats.getPermission()); |
| stats = lfs.getFileStatus( |
| new Path(userCachePath, ContainerLocalizer.FILECACHE)); |
| Assert.assertEquals(fileCachePerm, stats.getPermission()); |
| stats = lfs.getFileStatus(new Path(appCachePath, appId)); |
| Assert.assertEquals(appDirPerm, stats.getPermission()); |
| } |
| |
| String[] permissionsArray = { "000", "111", "555", "710", "777" }; |
| |
| for (String perm : permissionsArray ) { |
| conf.set(YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS, perm); |
| executor.clearLogDirPermissions(); |
| FsPermission logDirPerm = new FsPermission( |
| executor.getLogDirPermissions()); |
| executor.createAppLogDirs(appId, logDirs, user); |
| |
| for (String dir : logDirs) { |
| FileStatus stats = lfs.getFileStatus(new Path(dir, appId)); |
| Assert.assertEquals(logDirPerm, stats.getPermission()); |
| lfs.delete(new Path(dir, appId), true); |
| } |
| } |
| } finally { |
| deleteTmpFiles(); |
| } |
| } |
| |
| private void writeStringToRelativePath(FileContext fc, Path p, String str) |
| throws IOException { |
| p = p.makeQualified(fc.getDefaultFileSystem().getUri(), |
| new Path(new File(".").getAbsolutePath())); |
| try (FSDataOutputStream os = fc.create(p).build()) { |
| os.writeUTF(str); |
| } |
| } |
| |
| private String readStringFromPath(FileContext fc, Path p) throws IOException { |
| try (FSDataInputStream is = fc.open(p)) { |
| return is.readUTF(); |
| } |
| } |
| |
| @Test |
| public void testLaunchContainerCopyFilesWithoutHTTPS() throws Exception { |
| testLaunchContainerCopyFiles(false); |
| } |
| |
| @Test |
| public void testLaunchContainerCopyFilesWithHTTPS() throws Exception { |
| testLaunchContainerCopyFiles(true); |
| } |
| |
| private void testLaunchContainerCopyFiles(boolean https) throws Exception { |
| if (Shell.WINDOWS) { |
| BASE_TMP_PATH = |
| new Path(new File("target").getAbsolutePath(), |
| TestDefaultContainerExecutor.class.getSimpleName()); |
| } |
| |
| Path localDir = new Path(BASE_TMP_PATH, "localDir"); |
| List<String> localDirs = new ArrayList<String>(); |
| localDirs.add(localDir.toString()); |
| List<String> logDirs = new ArrayList<String>(); |
| Path logDir = new Path(BASE_TMP_PATH, "logDir"); |
| logDirs.add(logDir.toString()); |
| |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString()); |
| conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); |
| |
| FileContext lfs = FileContext.getLocalFSFileContext(conf); |
| deleteTmpFiles(); |
| lfs.mkdir(BASE_TMP_PATH, FsPermission.getDefault(), true); |
| DefaultContainerExecutor dce = new DefaultContainerExecutor(lfs); |
| dce.setConf(conf); |
| |
| Container container = mock(Container.class); |
| ContainerId cId = mock(ContainerId.class); |
| ContainerLaunchContext context = mock(ContainerLaunchContext.class); |
| HashMap<String, String> env = new HashMap<String, String>(); |
| env.put("LANG", "C"); |
| |
| String appSubmitter = "nobody"; |
| String appId = "APP_ID"; |
| String containerId = "CONTAINER_ID"; |
| |
| when(container.getContainerId()).thenReturn(cId); |
| when(container.getLaunchContext()).thenReturn(context); |
| when(cId.toString()).thenReturn(containerId); |
| when(cId.getApplicationAttemptId()).thenReturn( |
| ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0)); |
| when(context.getEnvironment()).thenReturn(env); |
| |
| Path scriptPath = new Path(BASE_TMP_PATH, "script"); |
| Path tokensPath = new Path(BASE_TMP_PATH, "tokens"); |
| Path keystorePath = new Path(BASE_TMP_PATH, "keystore"); |
| Path truststorePath = new Path(BASE_TMP_PATH, "truststore"); |
| writeStringToRelativePath(lfs, scriptPath, "script"); |
| writeStringToRelativePath(lfs, tokensPath, "tokens"); |
| if (https) { |
| writeStringToRelativePath(lfs, keystorePath, "keystore"); |
| writeStringToRelativePath(lfs, truststorePath, "truststore"); |
| } |
| |
| Path workDir = localDir; |
| Path pidFile = new Path(workDir, "pid.txt"); |
| |
| dce.init(null); |
| dce.activateContainer(cId, pidFile); |
| ContainerStartContext.Builder ctxBuilder = |
| new ContainerStartContext.Builder() |
| .setContainer(container) |
| .setNmPrivateContainerScriptPath(scriptPath) |
| .setNmPrivateTokensPath(tokensPath) |
| .setUser(appSubmitter) |
| .setAppId(appId) |
| .setContainerWorkDir(workDir) |
| .setLocalDirs(localDirs) |
| .setLogDirs(logDirs); |
| if (https) { |
| ctxBuilder.setNmPrivateTruststorePath(truststorePath) |
| .setNmPrivateKeystorePath(keystorePath); |
| } |
| ContainerStartContext ctx = ctxBuilder.build(); |
| |
| // #launchContainer will copy a number of files to this directory. |
| // Ensure that it doesn't exist first |
| lfs.delete(workDir, true); |
| try { |
| lfs.getFileStatus(workDir); |
| Assert.fail("Expected FileNotFoundException on " + workDir); |
| } catch (FileNotFoundException e) { |
| // expected |
| } |
| |
| dce.launchContainer(ctx); |
| |
| Path finalScriptPath = new Path(workDir, |
| ContainerLaunch.CONTAINER_SCRIPT); |
| Path finalTokensPath = new Path(workDir, |
| ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); |
| Path finalKeystorePath = new Path(workDir, |
| ContainerLaunch.KEYSTORE_FILE); |
| Path finalTrustorePath = new Path(workDir, |
| ContainerLaunch.TRUSTSTORE_FILE); |
| |
| Assert.assertTrue(lfs.getFileStatus(workDir).isDirectory()); |
| Assert.assertTrue(lfs.getFileStatus(finalScriptPath).isFile()); |
| Assert.assertTrue(lfs.getFileStatus(finalTokensPath).isFile()); |
| if (https) { |
| Assert.assertTrue(lfs.getFileStatus(finalKeystorePath).isFile()); |
| Assert.assertTrue(lfs.getFileStatus(finalTrustorePath).isFile()); |
| } else { |
| try { |
| lfs.getFileStatus(finalKeystorePath); |
| Assert.fail("Expected FileNotFoundException on " + finalKeystorePath); |
| } catch (FileNotFoundException e) { |
| // expected |
| } |
| try { |
| lfs.getFileStatus(finalTrustorePath); |
| Assert.fail("Expected FileNotFoundException on " + finalKeystorePath); |
| } catch (FileNotFoundException e) { |
| // expected |
| } |
| } |
| |
| Assert.assertEquals("script", readStringFromPath(lfs, finalScriptPath)); |
| Assert.assertEquals("tokens", readStringFromPath(lfs, finalTokensPath)); |
| if (https) { |
| Assert.assertEquals("keystore", readStringFromPath(lfs, |
| finalKeystorePath)); |
| Assert.assertEquals("truststore", readStringFromPath(lfs, |
| finalTrustorePath)); |
| } |
| } |
| |
| @Test |
| public void testContainerLaunchError() |
| throws IOException, InterruptedException, ConfigurationException { |
| |
| if (Shell.WINDOWS) { |
| BASE_TMP_PATH = |
| new Path(new File("target").getAbsolutePath(), |
| TestDefaultContainerExecutor.class.getSimpleName()); |
| } |
| |
| Path localDir = new Path(BASE_TMP_PATH, "localDir"); |
| List<String> localDirs = new ArrayList<String>(); |
| localDirs.add(localDir.toString()); |
| List<String> logDirs = new ArrayList<String>(); |
| Path logDir = new Path(BASE_TMP_PATH, "logDir"); |
| logDirs.add(logDir.toString()); |
| |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString()); |
| conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); |
| |
| FileContext lfs = FileContext.getLocalFSFileContext(conf); |
| DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs)); |
| mockExec.setConf(conf); |
| doAnswer( |
| new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) |
| throws Throwable { |
| String diagnostics = (String) invocationOnMock.getArguments()[0]; |
| assertTrue("Invalid Diagnostics message: " + diagnostics, |
| diagnostics.contains("No such file or directory")); |
| return null; |
| } |
| } |
| ).when(mockExec).logOutput(any(String.class)); |
| |
| String appSubmitter = "nobody"; |
| String appId = "APP_ID"; |
| String containerId = "CONTAINER_ID"; |
| Container container = mock(Container.class); |
| ContainerId cId = mock(ContainerId.class); |
| ContainerLaunchContext context = mock(ContainerLaunchContext.class); |
| HashMap<String, String> env = new HashMap<String, String>(); |
| env.put("LANG", "C"); |
| |
| when(container.getContainerId()).thenReturn(cId); |
| when(container.getLaunchContext()).thenReturn(context); |
| try { |
| doAnswer(new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) |
| throws Throwable { |
| ContainerDiagnosticsUpdateEvent event = |
| (ContainerDiagnosticsUpdateEvent) invocationOnMock |
| .getArguments()[0]; |
| assertTrue("Invalid Diagnostics message: " |
| + event.getDiagnosticsUpdate(), |
| event.getDiagnosticsUpdate().contains("No such file or directory") |
| ); |
| return null; |
| } |
| }).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class)); |
| |
| when(cId.toString()).thenReturn(containerId); |
| when(cId.getApplicationAttemptId()).thenReturn( |
| ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0)); |
| |
| when(context.getEnvironment()).thenReturn(env); |
| |
| mockExec.createUserLocalDirs(localDirs, appSubmitter); |
| mockExec.createUserCacheDirs(localDirs, appSubmitter); |
| mockExec.createAppDirs(localDirs, appSubmitter, appId); |
| mockExec.createAppLogDirs(appId, logDirs, appSubmitter); |
| |
| Path scriptPath = new Path("file:///bin/echo"); |
| Path tokensPath = new Path("file:///dev/null"); |
| Path keystorePath = new Path("file:///dev/null"); |
| Path truststorePath = new Path("file:///dev/null"); |
| if (Shell.WINDOWS) { |
| File tmp = new File(BASE_TMP_PATH.toString(), "test_echo.cmd"); |
| BufferedWriter output = new BufferedWriter(new FileWriter(tmp)); |
| output.write("Exit 1"); |
| output.write("Echo No such file or directory 1>&2"); |
| output.close(); |
| scriptPath = new Path(tmp.getAbsolutePath()); |
| tmp = new File(BASE_TMP_PATH.toString(), "tokens"); |
| tmp.createNewFile(); |
| tokensPath = new Path(tmp.getAbsolutePath()); |
| } |
| Path workDir = localDir; |
| Path pidFile = new Path(workDir, "pid.txt"); |
| |
| mockExec.init(null); |
| mockExec.activateContainer(cId, pidFile); |
| int ret = mockExec.launchContainer(new ContainerStartContext.Builder() |
| .setContainer(container) |
| .setNmPrivateContainerScriptPath(scriptPath) |
| .setNmPrivateTokensPath(tokensPath) |
| .setNmPrivateKeystorePath(keystorePath) |
| .setNmPrivateTruststorePath(truststorePath) |
| .setUser(appSubmitter) |
| .setAppId(appId) |
| .setContainerWorkDir(workDir) |
| .setLocalDirs(localDirs) |
| .setLogDirs(logDirs) |
| .build()); |
| Assert.assertNotSame(0, ret); |
| } finally { |
| mockExec.deleteAsUser(new DeletionAsUserContext.Builder() |
| .setUser(appSubmitter) |
| .setSubDir(localDir) |
| .build()); |
| mockExec.deleteAsUser(new DeletionAsUserContext.Builder() |
| .setUser(appSubmitter) |
| .setSubDir(logDir) |
| .build()); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testStartLocalizer() throws IOException, InterruptedException, |
| YarnException { |
| |
| final Path firstDir = new Path(BASE_TMP_PATH, "localDir1"); |
| List<String> localDirs = new ArrayList<String>(); |
| final Path secondDir = new Path(BASE_TMP_PATH, "localDir2"); |
| List<String> logDirs = new ArrayList<String>(); |
| final Path logDir = new Path(BASE_TMP_PATH, "logDir"); |
| final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir"); |
| FsPermission perms = new FsPermission((short)0770); |
| |
| Configuration conf = new Configuration(); |
| |
| final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf)); |
| final FileContext.Util mockUtil = spy(mockLfs.util()); |
| doAnswer(new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) |
| throws Throwable { |
| return mockUtil; |
| } |
| }).when(mockLfs).util(); |
| doAnswer(new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) |
| throws Throwable { |
| Path dest = (Path) invocationOnMock.getArguments()[1]; |
| if (dest.toString().contains(firstDir.toString())) { |
| // throw an Exception when copy token to the first local dir |
| // to simulate no space on the first drive |
| throw new IOException("No space on this drive " + |
| dest.toString()); |
| } else { |
| // copy token to the second local dir |
| DataOutputStream tokenOut = null; |
| try { |
| Credentials credentials = new Credentials(); |
| tokenOut = mockLfs.create(dest, |
| EnumSet.of(CREATE, OVERWRITE)); |
| credentials.writeTokenStorageToStream(tokenOut); |
| } finally { |
| if (tokenOut != null) { |
| tokenOut.close(); |
| } |
| } |
| } |
| return null; |
| }}).when(mockUtil).copy(any(Path.class), any(Path.class), |
| anyBoolean(), anyBoolean()); |
| |
| doAnswer(new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) |
| throws Throwable { |
| Path p = (Path) invocationOnMock.getArguments()[0]; |
| // let second local directory return more free space than |
| // first local directory |
| if (p.toString().contains(firstDir.toString())) { |
| return new FsStatus(2000, 2000, 0); |
| } else { |
| return new FsStatus(1000, 0, 1000); |
| } |
| } |
| }).when(mockLfs).getFsStatus(any(Path.class)); |
| |
| DefaultContainerExecutor mockExec = |
| spy(new DefaultContainerExecutor(mockLfs) { |
| @Override |
| public ContainerLocalizer createContainerLocalizer(String user, |
| String appId, String locId, String tokenFileName, |
| List<String> localDirs, FileContext localizerFc) |
| throws IOException { |
| |
| // Spy on the localizer and make it return valid heart-beat |
| // responses even though there is no real NodeManager. |
| ContainerLocalizer localizer = |
| super.createContainerLocalizer(user, appId, locId, |
| tokenFileName, localDirs, localizerFc); |
| ContainerLocalizer spyLocalizer = spy(localizer); |
| LocalizationProtocol nmProxy = mock(LocalizationProtocol.class); |
| try { |
| when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenReturn( |
| new MockLocalizerHeartbeatResponse(LocalizerAction.DIE, |
| new ArrayList<ResourceLocalizationSpec>())); |
| } catch (YarnException e) { |
| throw new IOException(e); |
| } |
| when(spyLocalizer.getProxy(any())) |
| .thenReturn(nmProxy); |
| |
| return spyLocalizer; |
| } |
| }); |
| mockExec.setConf(conf); |
| localDirs.add(mockLfs.makeQualified(firstDir).toString()); |
| localDirs.add(mockLfs.makeQualified(secondDir).toString()); |
| logDirs.add(mockLfs.makeQualified(logDir).toString()); |
| conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, |
| localDirs.toArray(new String[localDirs.size()])); |
| conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); |
| mockLfs.mkdir(tokenDir, perms, true); |
| Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens"); |
| String appSubmitter = "nobody"; |
| String appId = "APP_ID"; |
| String locId = "LOC_ID"; |
| |
| LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); |
| when(dirsHandler.getLocalDirs()).thenReturn(localDirs); |
| when(dirsHandler.getLogDirs()).thenReturn(logDirs); |
| |
| try { |
| mockExec.startLocalizer(new LocalizerStartContext.Builder() |
| .setNmPrivateContainerTokens(nmPrivateCTokensPath) |
| .setNmAddr(null) |
| .setUser(appSubmitter) |
| .setAppId(appId) |
| .setLocId(locId) |
| .setDirsHandler(dirsHandler) |
| .build()); |
| |
| } catch (IOException e) { |
| Assert.fail("StartLocalizer failed to copy token file: " |
| + StringUtils.stringifyException(e)); |
| } finally { |
| mockExec.deleteAsUser(new DeletionAsUserContext.Builder() |
| .setUser(appSubmitter) |
| .setSubDir(firstDir) |
| .build()); |
| mockExec.deleteAsUser(new DeletionAsUserContext.Builder() |
| .setUser(appSubmitter) |
| .setSubDir(secondDir) |
| .build()); |
| mockExec.deleteAsUser(new DeletionAsUserContext.Builder() |
| .setUser(appSubmitter) |
| .setSubDir(logDir) |
| .build()); |
| deleteTmpFiles(); |
| } |
| |
| // Verify that the calls happen the expected number of times |
| verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class), |
| anyBoolean(), anyBoolean()); |
| verify(mockLfs, times(2)).getFsStatus(any(Path.class)); |
| } |
| |
| @Test |
| public void testPickDirectory() throws Exception { |
| Configuration conf = new Configuration(); |
| FileContext lfs = FileContext.getLocalFSFileContext(conf); |
| DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs); |
| |
| long[] availableOnDisk = new long[2]; |
| availableOnDisk[0] = 100; |
| availableOnDisk[1] = 100; |
| assertEquals(0, executor.pickDirectory(0L, availableOnDisk)); |
| assertEquals(0, executor.pickDirectory(99L, availableOnDisk)); |
| assertEquals(1, executor.pickDirectory(100L, availableOnDisk)); |
| assertEquals(1, executor.pickDirectory(101L, availableOnDisk)); |
| assertEquals(1, executor.pickDirectory(199L, availableOnDisk)); |
| |
| long[] availableOnDisk2 = new long[5]; |
| availableOnDisk2[0] = 100; |
| availableOnDisk2[1] = 10; |
| availableOnDisk2[2] = 400; |
| availableOnDisk2[3] = 200; |
| availableOnDisk2[4] = 350; |
| assertEquals(0, executor.pickDirectory(0L, availableOnDisk2)); |
| assertEquals(0, executor.pickDirectory(99L, availableOnDisk2)); |
| assertEquals(1, executor.pickDirectory(100L, availableOnDisk2)); |
| assertEquals(1, executor.pickDirectory(105L, availableOnDisk2)); |
| assertEquals(2, executor.pickDirectory(110L, availableOnDisk2)); |
| assertEquals(2, executor.pickDirectory(259L, availableOnDisk2)); |
| assertEquals(3, executor.pickDirectory(700L, availableOnDisk2)); |
| assertEquals(4, executor.pickDirectory(710L, availableOnDisk2)); |
| assertEquals(4, executor.pickDirectory(910L, availableOnDisk2)); |
| } |
| |
| // @Test |
| // public void testInit() throws IOException, InterruptedException { |
| // Configuration conf = new Configuration(); |
| // AbstractFileSystem spylfs = |
| // spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); |
| // // don't actually create dirs |
| // //doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(), |
| // // Matchers.<FsPermission>anyObject(), anyBoolean()); |
| // FileContext lfs = FileContext.getFileContext(spylfs, conf); |
| // |
| // Path basedir = new Path("target", |
| // TestDefaultContainerExecutor.class.getSimpleName()); |
| // List<String> localDirs = new ArrayList<String>(); |
| // List<Path> localPaths = new ArrayList<Path>(); |
| // for (int i = 0; i < 4; ++i) { |
| // Path p = new Path(basedir, i + ""); |
| // lfs.mkdir(p, null, true); |
| // localPaths.add(p); |
| // localDirs.add(p.toString()); |
| // } |
| // final String user = "yak"; |
| // final String appId = "app_RM_0"; |
| // final Path logDir = new Path(basedir, "logs"); |
| // final Path nmLocal = new Path(basedir, "nmPrivate/" + user + "/" + appId); |
| // final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 8040); |
| // System.out.println("NMLOCAL: " + nmLocal); |
| // Random r = new Random(); |
| // |
| // /* |
| // // XXX FileContext cannot be reasonably mocked to do this |
| // // mock jobFiles copy |
| // long fileSeed = r.nextLong(); |
| // r.setSeed(fileSeed); |
| // System.out.println("SEED: " + seed); |
| // Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE); |
| // DataOutputBuffer fileCacheBytes = mockStream(spylfs, fileCachePath, r, 512); |
| // |
| // // mock jobTokens copy |
| // long jobSeed = r.nextLong(); |
| // r.setSeed(jobSeed); |
| // System.out.println("SEED: " + seed); |
| // Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE); |
| // DataOutputBuffer jobTokenBytes = mockStream(spylfs, jobTokenPath, r, 512); |
| // */ |
| // |
| // // create jobFiles |
| // long fileSeed = r.nextLong(); |
| // r.setSeed(fileSeed); |
| // System.out.println("SEED: " + fileSeed); |
| // Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE); |
| // byte[] fileCacheBytes = createTmpFile(fileCachePath, r, 512); |
| // |
| // // create jobTokens |
| // long jobSeed = r.nextLong(); |
| // r.setSeed(jobSeed); |
| // System.out.println("SEED: " + jobSeed); |
| // Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE); |
| // byte[] jobTokenBytes = createTmpFile(jobTokenPath, r, 512); |
| // |
| // DefaultContainerExecutor dce = new DefaultContainerExecutor(lfs); |
| // Localization mockLocalization = mock(Localization.class); |
| // ApplicationLocalizer spyLocalizer = |
| // spy(new ApplicationLocalizer(lfs, user, appId, logDir, |
| // localPaths)); |
| // // ignore cache localization |
| // doNothing().when(spyLocalizer).localizeFiles( |
| // Matchers.<Localization>anyObject(), Matchers.<Path>anyObject()); |
| // Path workingDir = lfs.getWorkingDirectory(); |
| // dce.initApplication(spyLocalizer, nmLocal, mockLocalization, localPaths); |
| // lfs.setWorkingDirectory(workingDir); |
| // |
| // for (Path localdir : localPaths) { |
| // Path userdir = lfs.makeQualified(new Path(localdir, |
| // new Path(ApplicationLocalizer.USERCACHE, user))); |
| // // $localdir/$user |
| // verify(spylfs).mkdir(userdir, |
| // new FsPermission(ApplicationLocalizer.USER_PERM), true); |
| // // $localdir/$user/appcache |
| // Path jobdir = new Path(userdir, ApplicationLocalizer.appcache); |
| // verify(spylfs).mkdir(jobdir, |
| // new FsPermission(ApplicationLocalizer.appcache_PERM), true); |
| // // $localdir/$user/filecache |
| // Path filedir = new Path(userdir, ApplicationLocalizer.FILECACHE); |
| // verify(spylfs).mkdir(filedir, |
| // new FsPermission(ApplicationLocalizer.FILECACHE_PERM), true); |
| // // $localdir/$user/appcache/$appId |
| // Path appdir = new Path(jobdir, appId); |
| // verify(spylfs).mkdir(appdir, |
| // new FsPermission(ApplicationLocalizer.APPDIR_PERM), true); |
| // // $localdir/$user/appcache/$appId/work |
| // Path workdir = new Path(appdir, ApplicationLocalizer.WORKDIR); |
| // verify(spylfs, atMost(1)).mkdir(workdir, FsPermission.getDefault(), true); |
| // } |
| // // $logdir/$appId |
| // Path logdir = new Path(lfs.makeQualified(logDir), appId); |
| // verify(spylfs).mkdir(logdir, |
| // new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true); |
| // } |
| |
| @Before |
| public void setUp() throws IOException, YarnException { |
| yarnConfiguration = new YarnConfiguration(); |
| setNumaConfig(); |
| Context mockContext = createAndGetMockContext(); |
| NMStateStoreService nmStateStoreService = |
| mock(NMStateStoreService.class); |
| when(mockContext.getNMStateStore()).thenReturn(nmStateStoreService); |
| numaResourceAllocator = new NumaResourceAllocator(mockContext) { |
| @Override |
| public String executeNGetCmdOutput(Configuration config) |
| throws YarnRuntimeException { |
| return getNumaCmdOutput(); |
| } |
| }; |
| |
| numaResourceAllocator.init(yarnConfiguration); |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| containerExecutor = new DefaultContainerExecutor(lfs) { |
| @Override |
| public Configuration getConf() { |
| return yarnConfiguration; |
| } |
| }; |
| containerExecutor.setNumaResourceAllocator(numaResourceAllocator); |
| mockContainer = mock(Container.class); |
| } |
| |
| private void setNumaConfig() { |
| yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_ENABLED, "true"); |
| yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, "true"); |
| yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, "/usr/bin/numactl"); |
| } |
| |
| |
| private String getNumaCmdOutput() { |
| // architecture of 8 cpu cores |
| // randomly picked size of memory |
| return "available: 2 nodes (0-1)\n\t" |
| + "node 0 cpus: 0 2 4 6\n\t" |
| + "node 0 size: 73717 MB\n\t" |
| + "node 0 free: 73717 MB\n\t" |
| + "node 1 cpus: 1 3 5 7\n\t" |
| + "node 1 size: 73717 MB\n\t" |
| + "node 1 free: 73717 MB\n\t" |
| + "node distances:\n\t" |
| + "node 0 1\n\t" |
| + "0: 10 20\n\t" |
| + "1: 20 10"; |
| } |
| |
| private Context createAndGetMockContext() { |
| Context mockContext = mock(Context.class); |
| @SuppressWarnings("unchecked") |
| ConcurrentHashMap<ContainerId, Container> mockContainers = mock( |
| ConcurrentHashMap.class); |
| mockContainer = mock(Container.class); |
| when(mockContainer.getResourceMappings()) |
| .thenReturn(new ResourceMappings()); |
| when(mockContainers.get(any())).thenReturn(mockContainer); |
| when(mockContext.getContainers()).thenReturn(mockContainers); |
| when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2)); |
| return mockContext; |
| } |
| |
| private void testAllocateNumaResource(String containerId, Resource resource, |
| String memNodes, String cpuNodes) throws Exception { |
| when(mockContainer.getContainerId()) |
| .thenReturn(ContainerId.fromString(containerId)); |
| when(mockContainer.getResource()).thenReturn(resource); |
| NumaResourceAllocation numaResourceAllocation = |
| numaResourceAllocator.allocateNumaNodes(mockContainer); |
| containerExecutor.setNumactl(containerExecutor.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, |
| YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD)); |
| String[] commands = containerExecutor.getNumaCommands(numaResourceAllocation); |
| assertEquals(Arrays.asList(commands), Arrays.asList("/usr/bin/numactl", |
| "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes)); |
| } |
| |
| @Test |
| public void testAllocateNumaMemoryResource() throws Exception { |
| // keeping cores constant for testing memory resources |
| |
| // allocates node 0 for memory and cpu |
| testAllocateNumaResource("container_1481156246874_0001_01_000001", |
| Resource.newInstance(2048, 2), "0", "0"); |
| |
| // allocates node 1 for memory and cpu since allocator uses round robin assignment |
| testAllocateNumaResource("container_1481156246874_0001_01_000002", |
| Resource.newInstance(60000, 2), "1", "1"); |
| |
| // allocates node 0,1 for memory since there is no sufficient memory in any one node |
| testAllocateNumaResource("container_1481156246874_0001_01_000003", |
| Resource.newInstance(80000, 2), "0,1", "0"); |
| |
| // returns null since there are no sufficient resources available for the request |
| when(mockContainer.getContainerId()).thenReturn( |
| ContainerId.fromString("container_1481156246874_0001_01_000004")); |
| when(mockContainer.getResource()) |
| .thenReturn(Resource.newInstance(80000, 2)); |
| Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); |
| |
| // allocates node 1 for memory and cpu |
| testAllocateNumaResource("container_1481156246874_0001_01_000005", |
| Resource.newInstance(1024, 2), "1", "1"); |
| } |
| |
| @Test |
| public void testAllocateNumaCpusResource() throws Exception { |
| // keeping memory constant |
| |
| // allocates node 0 for memory and cpu |
| testAllocateNumaResource("container_1481156246874_0001_01_000001", |
| Resource.newInstance(2048, 2), "0", "0"); |
| |
| // allocates node 1 for memory and cpu since allocator uses round robin assignment |
| testAllocateNumaResource("container_1481156246874_0001_01_000002", |
| Resource.newInstance(2048, 2), "1", "1"); |
| |
| // allocates node 0,1 for cpus since there is are no sufficient cpus available in any one node |
| testAllocateNumaResource("container_1481156246874_0001_01_000003", |
| Resource.newInstance(2048, 3), "0", "0,1"); |
| |
| // returns null since there are no sufficient resources available for the request |
| when(mockContainer.getContainerId()).thenReturn( |
| ContainerId.fromString("container_1481156246874_0001_01_000004")); |
| when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2)); |
| Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); |
| |
| // allocates node 1 for memory and cpu |
| testAllocateNumaResource("container_1481156246874_0001_01_000005", |
| Resource.newInstance(2048, 1), "1", "1"); |
| } |
| |
| @Test |
| public void testReacquireContainer() throws Exception { |
| @SuppressWarnings("unchecked") |
| ConcurrentHashMap<ContainerId, Container> mockContainers = mock( |
| ConcurrentHashMap.class); |
| Context mockContext = mock(Context.class); |
| NMStateStoreService mock = mock(NMStateStoreService.class); |
| when(mockContext.getNMStateStore()).thenReturn(mock); |
| ResourceMappings resourceMappings = new ResourceMappings(); |
| AssignedResources assignedRscs = new AssignedResources(); |
| when(mockContainer.getResource()) |
| .thenReturn(Resource.newInstance(147434, 2)); |
| ContainerId cid = ContainerId.fromString("container_1481156246874_0001_01_000001"); |
| when(mockContainer.getContainerId()).thenReturn(cid); |
| NumaResourceAllocation numaResourceAllocation = |
| numaResourceAllocator.allocateNumaNodes(mockContainer); |
| assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation)); |
| resourceMappings.addAssignedResources("numa", assignedRscs); |
| when(mockContainer.getResourceMappings()).thenReturn(resourceMappings); |
| when(mockContainers.get(any())).thenReturn(mockContainer); |
| when(mockContext.getContainers()).thenReturn(mockContainers); |
| |
| // recovered numa resources should be added to the used resources and |
| // remaining will be available for further allocation. |
| |
| ContainerReacquisitionContext containerReacquisitionContext = |
| new ContainerReacquisitionContext.Builder() |
| .setContainerId(cid) |
| .setUser("user") |
| .setContainer(mockContainer) |
| .build(); |
| |
| containerExecutor.reacquireContainer(containerReacquisitionContext); |
| |
| // reacquireContainer recovers all the numa resources , |
| // that should be free to use next |
| testAllocateNumaResource("container_1481156246874_0001_01_000001", |
| Resource.newInstance(147434, 2), "0,1", "1"); |
| when(mockContainer.getContainerId()).thenReturn( |
| ContainerId.fromString("container_1481156246874_0001_01_000004")); |
| when(mockContainer.getResource()) |
| .thenReturn(Resource.newInstance(1024, 2)); |
| |
| // returns null since there are no sufficient resources available for the request |
| Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); |
| } |
| |
| @Test |
| public void testConcatStringCommands() { |
| // test one array of string as null |
| assertEquals(containerExecutor.concatStringCommands(null, new String[]{"hello"})[0], |
| new String[]{"hello"}[0]); |
| // test both array of string as null |
| Assert.assertNull(containerExecutor.concatStringCommands(null, null)); |
| // test case when both arrays are not null and of equal length |
| String[] res = containerExecutor.concatStringCommands(new String[]{"one"}, |
| new String[]{"two"}); |
| assertEquals(res[0]+res[1], "one" + "two"); |
| // test both array of different length |
| res = containerExecutor.concatStringCommands(new String[]{"one"}, |
| new String[]{"two", "three"}); |
| assertEquals(res[0] + res[1] + res[2], "one" + "two" + "three"); |
| |
| } |
| |
| |
| } |