| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import java.io.InputStream; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Random; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.AbstractFileSystem; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Options.CreateOpts; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; |
| |
| import static org.apache.hadoop.fs.CreateFlag.*; |
| |
| |
| import org.junit.AfterClass; |
| import org.junit.Test; |
| import static org.junit.Assert.*; |
| import org.mockito.ArgumentMatcher; |
| import org.mockito.Matchers; |
| import static org.mockito.Mockito.*; |
| |
| public class TestDefaultContainerExecutor { |
| |
| /* |
| // XXX FileContext cannot be mocked to do this |
| static FSDataInputStream getRandomStream(Random r, int len) |
| throws IOException { |
| byte[] bytes = new byte[len]; |
| r.nextBytes(bytes); |
| DataInputBuffer buf = new DataInputBuffer(); |
| buf.reset(bytes, 0, bytes.length); |
| return new FSDataInputStream(new FakeFSDataInputStream(buf)); |
| } |
| |
| class PathEndsWith extends ArgumentMatcher<Path> { |
| final String suffix; |
| PathEndsWith(String suffix) { |
| this.suffix = suffix; |
| } |
| @Override |
| public boolean matches(Object o) { |
| return |
| suffix.equals(((Path)o).getName()); |
| } |
| } |
| |
| DataOutputBuffer mockStream( |
| AbstractFileSystem spylfs, Path p, Random r, int len) |
| throws IOException { |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| doReturn(getRandomStream(r, len)).when(spylfs).open(p); |
| doReturn(new FileStatus(len, false, -1, -1L, -1L, p)).when( |
| spylfs).getFileStatus(argThat(new PathEndsWith(p.getName()))); |
| doReturn(new FSDataOutputStream(dob)).when(spylfs).createInternal( |
| argThat(new PathEndsWith(p.getName())), |
| eq(EnumSet.of(OVERWRITE)), |
| Matchers.<FsPermission>anyObject(), anyInt(), anyShort(), anyLong(), |
| Matchers.<Progressable>anyObject(), anyInt(), anyBoolean()); |
| return dob; |
| } |
| */ |
| |
| @AfterClass |
| public static void deleteTmpFiles() throws IOException { |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| lfs.delete(new Path("target", |
| TestDefaultContainerExecutor.class.getSimpleName()), true); |
| } |
| |
| byte[] createTmpFile(Path dst, Random r, int len) |
| throws IOException { |
| // use unmodified local context |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| dst = lfs.makeQualified(dst); |
| lfs.mkdir(dst.getParent(), null, true); |
| byte[] bytes = new byte[len]; |
| FSDataOutputStream out = null; |
| try { |
| out = lfs.create(dst, EnumSet.of(CREATE, OVERWRITE)); |
| r.nextBytes(bytes); |
| out.write(bytes); |
| } finally { |
| if (out != null) out.close(); |
| } |
| return bytes; |
| } |
| |
| // @Test |
| // public void testInit() throws IOException, InterruptedException { |
| // Configuration conf = new Configuration(); |
| // AbstractFileSystem spylfs = |
| // spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); |
| // // don't actually create dirs |
| // //doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(), |
| // // Matchers.<FsPermission>anyObject(), anyBoolean()); |
| // FileContext lfs = FileContext.getFileContext(spylfs, conf); |
| // |
| // Path basedir = new Path("target", |
| // TestDefaultContainerExecutor.class.getSimpleName()); |
| // List<String> localDirs = new ArrayList<String>(); |
| // List<Path> localPaths = new ArrayList<Path>(); |
| // for (int i = 0; i < 4; ++i) { |
| // Path p = new Path(basedir, i + ""); |
| // lfs.mkdir(p, null, true); |
| // localPaths.add(p); |
| // localDirs.add(p.toString()); |
| // } |
| // final String user = "yak"; |
| // final String appId = "app_RM_0"; |
| // final Path logDir = new Path(basedir, "logs"); |
| // final Path nmLocal = new Path(basedir, "nmPrivate/" + user + "/" + appId); |
| // final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344); |
| // System.out.println("NMLOCAL: " + nmLocal); |
| // Random r = new Random(); |
| // |
| // /* |
| // // XXX FileContext cannot be reasonably mocked to do this |
| // // mock jobFiles copy |
| // long fileSeed = r.nextLong(); |
| // r.setSeed(fileSeed); |
| // System.out.println("SEED: " + seed); |
| // Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE); |
| // DataOutputBuffer fileCacheBytes = mockStream(spylfs, fileCachePath, r, 512); |
| // |
| // // mock jobTokens copy |
| // long jobSeed = r.nextLong(); |
| // r.setSeed(jobSeed); |
| // System.out.println("SEED: " + seed); |
| // Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE); |
| // DataOutputBuffer jobTokenBytes = mockStream(spylfs, jobTokenPath, r, 512); |
| // */ |
| // |
| // // create jobFiles |
| // long fileSeed = r.nextLong(); |
| // r.setSeed(fileSeed); |
| // System.out.println("SEED: " + fileSeed); |
| // Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE); |
| // byte[] fileCacheBytes = createTmpFile(fileCachePath, r, 512); |
| // |
| // // create jobTokens |
| // long jobSeed = r.nextLong(); |
| // r.setSeed(jobSeed); |
| // System.out.println("SEED: " + jobSeed); |
| // Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE); |
| // byte[] jobTokenBytes = createTmpFile(jobTokenPath, r, 512); |
| // |
| // DefaultContainerExecutor dce = new DefaultContainerExecutor(lfs); |
| // Localization mockLocalization = mock(Localization.class); |
| // ApplicationLocalizer spyLocalizer = |
| // spy(new ApplicationLocalizer(lfs, user, appId, logDir, |
| // localPaths)); |
| // // ignore cache localization |
| // doNothing().when(spyLocalizer).localizeFiles( |
| // Matchers.<Localization>anyObject(), Matchers.<Path>anyObject()); |
| // Path workingDir = lfs.getWorkingDirectory(); |
| // dce.initApplication(spyLocalizer, nmLocal, mockLocalization, localPaths); |
| // lfs.setWorkingDirectory(workingDir); |
| // |
| // for (Path localdir : localPaths) { |
| // Path userdir = lfs.makeQualified(new Path(localdir, |
| // new Path(ApplicationLocalizer.USERCACHE, user))); |
| // // $localdir/$user |
| // verify(spylfs).mkdir(userdir, |
| // new FsPermission(ApplicationLocalizer.USER_PERM), true); |
| // // $localdir/$user/appcache |
| // Path jobdir = new Path(userdir, ApplicationLocalizer.appcache); |
| // verify(spylfs).mkdir(jobdir, |
| // new FsPermission(ApplicationLocalizer.appcache_PERM), true); |
| // // $localdir/$user/filecache |
| // Path filedir = new Path(userdir, ApplicationLocalizer.FILECACHE); |
| // verify(spylfs).mkdir(filedir, |
| // new FsPermission(ApplicationLocalizer.FILECACHE_PERM), true); |
| // // $localdir/$user/appcache/$appId |
| // Path appdir = new Path(jobdir, appId); |
| // verify(spylfs).mkdir(appdir, |
| // new FsPermission(ApplicationLocalizer.APPDIR_PERM), true); |
| // // $localdir/$user/appcache/$appId/work |
| // Path workdir = new Path(appdir, ApplicationLocalizer.WORKDIR); |
| // verify(spylfs, atMost(1)).mkdir(workdir, FsPermission.getDefault(), true); |
| // } |
| // // $logdir/$appId |
| // Path logdir = new Path(lfs.makeQualified(logDir), appId); |
| // verify(spylfs).mkdir(logdir, |
| // new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true); |
| // } |
| |
| } |