| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.util; |
| |
| import static org.apache.hadoop.fs.CreateFlag.CREATE; |
| import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assume.assumeTrue; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.jar.JarEntry; |
| import java.util.jar.JarOutputStream; |
| import java.util.jar.Manifest; |
| import java.util.zip.GZIPOutputStream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipOutputStream; |
| |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.junit.Assert; |
| |
| import org.apache.commons.compress.archivers.tar.TarArchiveEntry; |
| import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.junit.AfterClass; |
| import org.junit.Test; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| |
| public class TestFSDownload { |
| |
| private static final Log LOG = LogFactory.getLog(TestFSDownload.class); |
| private static AtomicLong uniqueNumberGenerator = |
| new AtomicLong(System.currentTimeMillis()); |
| private enum TEST_FILE_TYPE { |
| TAR, JAR, ZIP, TGZ |
| }; |
| |
| @AfterClass |
| public static void deleteTestDir() throws IOException { |
| FileContext fs = FileContext.getLocalFSFileContext(); |
| fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true); |
| } |
| |
| static final RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| static LocalResource createFile(FileContext files, Path p, int len, |
| Random r, LocalResourceVisibility vis) throws IOException { |
| createFile(files, p, len, r); |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(p)); |
| ret.setSize(len); |
| ret.setType(LocalResourceType.FILE); |
| ret.setVisibility(vis); |
| ret.setTimestamp(files.getFileStatus(p).getModificationTime()); |
| return ret; |
| } |
| |
| static void createFile(FileContext files, Path p, int len, Random r) |
| throws IOException { |
| FSDataOutputStream out = null; |
| try { |
| byte[] bytes = new byte[len]; |
| out = files.create(p, EnumSet.of(CREATE, OVERWRITE)); |
| r.nextBytes(bytes); |
| out.write(bytes); |
| } finally { |
| if (out != null) out.close(); |
| } |
| } |
| |
| static LocalResource createJar(FileContext files, Path p, |
| LocalResourceVisibility vis) throws IOException { |
| LOG.info("Create jar file " + p); |
| File jarFile = new File((files.makeQualified(p)).toUri()); |
| FileOutputStream stream = new FileOutputStream(jarFile); |
| LOG.info("Create jar out stream "); |
| JarOutputStream out = new JarOutputStream(stream, new Manifest()); |
| LOG.info("Done writing jar stream "); |
| out.close(); |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(p)); |
| FileStatus status = files.getFileStatus(p); |
| ret.setSize(status.getLen()); |
| ret.setTimestamp(status.getModificationTime()); |
| ret.setType(LocalResourceType.PATTERN); |
| ret.setVisibility(vis); |
| ret.setPattern("classes/.*"); |
| return ret; |
| } |
| |
| static LocalResource createTarFile(FileContext files, Path p, int len, |
| Random r, LocalResourceVisibility vis) throws IOException, |
| URISyntaxException { |
| byte[] bytes = new byte[len]; |
| r.nextBytes(bytes); |
| |
| File archiveFile = new File(p.toUri().getPath() + ".tar"); |
| archiveFile.createNewFile(); |
| TarArchiveOutputStream out = new TarArchiveOutputStream( |
| new FileOutputStream(archiveFile)); |
| TarArchiveEntry entry = new TarArchiveEntry(p.getName()); |
| entry.setSize(bytes.length); |
| out.putArchiveEntry(entry); |
| out.write(bytes); |
| out.closeArchiveEntry(); |
| out.close(); |
| |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(new Path(p.toString() |
| + ".tar"))); |
| ret.setSize(len); |
| ret.setType(LocalResourceType.ARCHIVE); |
| ret.setVisibility(vis); |
| ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar")) |
| .getModificationTime()); |
| return ret; |
| } |
| |
| static LocalResource createTgzFile(FileContext files, Path p, int len, |
| Random r, LocalResourceVisibility vis) throws IOException, |
| URISyntaxException { |
| byte[] bytes = new byte[len]; |
| r.nextBytes(bytes); |
| |
| File gzipFile = new File(p.toUri().getPath() + ".tar.gz"); |
| gzipFile.createNewFile(); |
| TarArchiveOutputStream out = new TarArchiveOutputStream( |
| new GZIPOutputStream(new FileOutputStream(gzipFile))); |
| TarArchiveEntry entry = new TarArchiveEntry(p.getName()); |
| entry.setSize(bytes.length); |
| out.putArchiveEntry(entry); |
| out.write(bytes); |
| out.closeArchiveEntry(); |
| out.close(); |
| |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(new Path(p.toString() |
| + ".tar.gz"))); |
| ret.setSize(len); |
| ret.setType(LocalResourceType.ARCHIVE); |
| ret.setVisibility(vis); |
| ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar.gz")) |
| .getModificationTime()); |
| return ret; |
| } |
| |
| static LocalResource createJarFile(FileContext files, Path p, int len, |
| Random r, LocalResourceVisibility vis) throws IOException, |
| URISyntaxException { |
| byte[] bytes = new byte[len]; |
| r.nextBytes(bytes); |
| |
| File archiveFile = new File(p.toUri().getPath() + ".jar"); |
| archiveFile.createNewFile(); |
| JarOutputStream out = new JarOutputStream( |
| new FileOutputStream(archiveFile)); |
| out.putNextEntry(new JarEntry(p.getName())); |
| out.write(bytes); |
| out.closeEntry(); |
| out.close(); |
| |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(new Path(p.toString() |
| + ".jar"))); |
| ret.setSize(len); |
| ret.setType(LocalResourceType.ARCHIVE); |
| ret.setVisibility(vis); |
| ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar")) |
| .getModificationTime()); |
| return ret; |
| } |
| |
| static LocalResource createZipFile(FileContext files, Path p, int len, |
| Random r, LocalResourceVisibility vis) throws IOException, |
| URISyntaxException { |
| byte[] bytes = new byte[len]; |
| r.nextBytes(bytes); |
| |
| File archiveFile = new File(p.toUri().getPath() + ".ZIP"); |
| archiveFile.createNewFile(); |
| ZipOutputStream out = new ZipOutputStream( |
| new FileOutputStream(archiveFile)); |
| out.putNextEntry(new ZipEntry(p.getName())); |
| out.write(bytes); |
| out.closeEntry(); |
| out.close(); |
| |
| LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); |
| ret.setResource(URL.fromPath(new Path(p.toString() |
| + ".ZIP"))); |
| ret.setSize(len); |
| ret.setType(LocalResourceType.ARCHIVE); |
| ret.setVisibility(vis); |
| ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".ZIP")) |
| .getModificationTime()); |
| return ret; |
| } |
| |
| @Test (timeout=10000) |
| public void testDownloadBadPublic() throws IOException, URISyntaxException, |
| InterruptedException { |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| final Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| Map<LocalResource, LocalResourceVisibility> rsrcVis = |
| new HashMap<LocalResource, LocalResourceVisibility>(); |
| |
| Random rand = new Random(); |
| long sharedSeed = rand.nextLong(); |
| rand.setSeed(sharedSeed); |
| System.out.println("SEED: " + sharedSeed); |
| |
| Map<LocalResource,Future<Path>> pending = |
| new HashMap<LocalResource,Future<Path>>(); |
| ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); |
| LocalDirAllocator dirs = |
| new LocalDirAllocator(TestFSDownload.class.getName()); |
| int size = 512; |
| LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC; |
| Path path = new Path(basedir, "test-file"); |
| LocalResource rsrc = createFile(files, path, size, rand, vis); |
| rsrcVis.put(rsrc, vis); |
| Path destPath = dirs.getLocalPathForWrite( |
| basedir.toString(), size, conf); |
| destPath = new Path (destPath, |
| Long.toString(uniqueNumberGenerator.incrementAndGet())); |
| FSDownload fsd = |
| new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, |
| destPath, rsrc); |
| pending.put(rsrc, exec.submit(fsd)); |
| exec.shutdown(); |
| while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS)); |
| Assert.assertTrue(pending.get(rsrc).isDone()); |
| |
| try { |
| for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) { |
| p.getValue().get(); |
| Assert.fail("We localized a file that is not public."); |
| } |
| } catch (ExecutionException e) { |
| Assert.assertTrue(e.getCause() instanceof IOException); |
| } |
| } |
| |
| @Test (timeout=60000) |
| public void testDownloadPublicWithStatCache() throws IOException, |
| URISyntaxException, InterruptedException, ExecutionException { |
| final Configuration conf = new Configuration(); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| |
| // if test directory doesn't have ancestor permission, skip this test |
| FileSystem f = basedir.getFileSystem(conf); |
| assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null)); |
| |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| int size = 512; |
| |
| final ConcurrentMap<Path,AtomicInteger> counts = |
| new ConcurrentHashMap<Path,AtomicInteger>(); |
| final CacheLoader<Path,Future<FileStatus>> loader = |
| FSDownload.createStatusCacheLoader(conf); |
| final LoadingCache<Path,Future<FileStatus>> statCache = |
| CacheBuilder.newBuilder().build(new CacheLoader<Path,Future<FileStatus>>() { |
| public Future<FileStatus> load(Path path) throws Exception { |
| // increment the count |
| AtomicInteger count = counts.get(path); |
| if (count == null) { |
| count = new AtomicInteger(0); |
| AtomicInteger existing = counts.putIfAbsent(path, count); |
| if (existing != null) { |
| count = existing; |
| } |
| } |
| count.incrementAndGet(); |
| |
| // use the default loader |
| return loader.load(path); |
| } |
| }); |
| |
| // test FSDownload.isPublic() concurrently |
| final int fileCount = 3; |
| List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); |
| for (int i = 0; i < fileCount; i++) { |
| Random rand = new Random(); |
| long sharedSeed = rand.nextLong(); |
| rand.setSeed(sharedSeed); |
| System.out.println("SEED: " + sharedSeed); |
| final Path path = new Path(basedir, "test-file-" + i); |
| createFile(files, path, size, rand); |
| final FileSystem fs = path.getFileSystem(conf); |
| final FileStatus sStat = fs.getFileStatus(path); |
| tasks.add(new Callable<Boolean>() { |
| public Boolean call() throws IOException { |
| return FSDownload.isPublic(fs, path, sStat, statCache); |
| } |
| }); |
| } |
| |
| ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount); |
| try { |
| List<Future<Boolean>> futures = exec.invokeAll(tasks); |
| // files should be public |
| for (Future<Boolean> future: futures) { |
| assertTrue(future.get()); |
| } |
| // for each path exactly one file status call should be made |
| for (AtomicInteger count: counts.values()) { |
| assertSame(count.get(), 1); |
| } |
| } finally { |
| exec.shutdown(); |
| } |
| } |
| |
| @Test (timeout=10000) |
| public void testDownload() throws IOException, URISyntaxException, |
| InterruptedException { |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| final Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| Map<LocalResource, LocalResourceVisibility> rsrcVis = |
| new HashMap<LocalResource, LocalResourceVisibility>(); |
| |
| Random rand = new Random(); |
| long sharedSeed = rand.nextLong(); |
| rand.setSeed(sharedSeed); |
| System.out.println("SEED: " + sharedSeed); |
| |
| Map<LocalResource,Future<Path>> pending = |
| new HashMap<LocalResource,Future<Path>>(); |
| ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); |
| LocalDirAllocator dirs = |
| new LocalDirAllocator(TestFSDownload.class.getName()); |
| int[] sizes = new int[10]; |
| for (int i = 0; i < 10; ++i) { |
| sizes[i] = rand.nextInt(512) + 512; |
| LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; |
| if (i%2 == 1) { |
| vis = LocalResourceVisibility.APPLICATION; |
| } |
| Path p = new Path(basedir, "" + i); |
| LocalResource rsrc = createFile(files, p, sizes[i], rand, vis); |
| rsrcVis.put(rsrc, vis); |
| Path destPath = dirs.getLocalPathForWrite( |
| basedir.toString(), sizes[i], conf); |
| destPath = new Path (destPath, |
| Long.toString(uniqueNumberGenerator.incrementAndGet())); |
| FSDownload fsd = |
| new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, |
| destPath, rsrc); |
| pending.put(rsrc, exec.submit(fsd)); |
| } |
| |
| exec.shutdown(); |
| while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS)); |
| for (Future<Path> path: pending.values()) { |
| Assert.assertTrue(path.isDone()); |
| } |
| |
| try { |
| for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) { |
| Path localized = p.getValue().get(); |
| assertEquals(sizes[Integer.parseInt(localized.getName())], p.getKey() |
| .getSize()); |
| |
| FileStatus status = files.getFileStatus(localized.getParent()); |
| FsPermission perm = status.getPermission(); |
| assertEquals("Cache directory permissions are incorrect", |
| new FsPermission((short)0755), perm); |
| |
| status = files.getFileStatus(localized); |
| perm = status.getPermission(); |
| System.out.println("File permission " + perm + |
| " for rsrc vis " + p.getKey().getVisibility().name()); |
| assert(rsrcVis.containsKey(p.getKey())); |
| Assert.assertTrue("Private file should be 500", |
| perm.toShort() == FSDownload.PRIVATE_FILE_PERMS.toShort()); |
| } |
| } catch (ExecutionException e) { |
| throw new IOException("Failed exec", e); |
| } |
| } |
| |
| private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, |
| URISyntaxException, InterruptedException{ |
| Configuration conf = new Configuration(); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| final Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| Random rand = new Random(); |
| long sharedSeed = rand.nextLong(); |
| rand.setSeed(sharedSeed); |
| System.out.println("SEED: " + sharedSeed); |
| |
| Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>(); |
| ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); |
| LocalDirAllocator dirs = new LocalDirAllocator( |
| TestFSDownload.class.getName()); |
| |
| int size = rand.nextInt(512) + 512; |
| LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; |
| Path p = new Path(basedir, "" + 1); |
| String strFileName = ""; |
| LocalResource rsrc = null; |
| switch (fileType) { |
| case TAR: |
| rsrc = createTarFile(files, p, size, rand, vis); |
| break; |
| case JAR: |
| rsrc = createJarFile(files, p, size, rand, vis); |
| rsrc.setType(LocalResourceType.PATTERN); |
| break; |
| case ZIP: |
| rsrc = createZipFile(files, p, size, rand, vis); |
| strFileName = p.getName() + ".ZIP"; |
| break; |
| case TGZ: |
| rsrc = createTgzFile(files, p, size, rand, vis); |
| break; |
| } |
| Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf); |
| destPath = new Path (destPath, |
| Long.toString(uniqueNumberGenerator.incrementAndGet())); |
| FSDownload fsd = new FSDownload(files, |
| UserGroupInformation.getCurrentUser(), conf, destPath, rsrc); |
| pending.put(rsrc, exec.submit(fsd)); |
| exec.shutdown(); |
| while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS)); |
| try { |
| pending.get(rsrc).get(); // see if there was an Exception during download |
| FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( |
| basedir); |
| for (FileStatus filestatus : filesstatus) { |
| if (filestatus.isDirectory()) { |
| FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( |
| filestatus.getPath()); |
| for (FileStatus childfile : childFiles) { |
| if(strFileName.endsWith(".ZIP") && |
| childfile.getPath().getName().equals(strFileName) && |
| !childfile.isDirectory()) { |
| Assert.fail("Failure...After unzip, there should have been a" + |
| " directory formed with zip file name but found a file. " |
| + childfile.getPath()); |
| } |
| if (childfile.getPath().getName().startsWith("tmp")) { |
| Assert.fail("Tmp File should not have been there " |
| + childfile.getPath()); |
| } |
| } |
| } |
| } |
| }catch (Exception e) { |
| throw new IOException("Failed exec", e); |
| } |
| } |
| |
| @Test (timeout=10000) |
| public void testDownloadArchive() throws IOException, URISyntaxException, |
| InterruptedException { |
| downloadWithFileType(TEST_FILE_TYPE.TAR); |
| } |
| |
| @Test (timeout=10000) |
| public void testDownloadPatternJar() throws IOException, URISyntaxException, |
| InterruptedException { |
| downloadWithFileType(TEST_FILE_TYPE.JAR); |
| } |
| |
| @Test (timeout=10000) |
| public void testDownloadArchiveZip() throws IOException, URISyntaxException, |
| InterruptedException { |
| downloadWithFileType(TEST_FILE_TYPE.ZIP); |
| } |
| |
| /* |
| * To test fix for YARN-3029 |
| */ |
| @Test (timeout=10000) |
| public void testDownloadArchiveZipWithTurkishLocale() throws IOException, |
| URISyntaxException, InterruptedException { |
| Locale defaultLocale = Locale.getDefault(); |
| // Set to Turkish |
| Locale turkishLocale = new Locale("tr", "TR"); |
| Locale.setDefault(turkishLocale); |
| downloadWithFileType(TEST_FILE_TYPE.ZIP); |
| // Set the locale back to original default locale |
| Locale.setDefault(defaultLocale); |
| } |
| |
| @Test (timeout=10000) |
| public void testDownloadArchiveTgz() throws IOException, URISyntaxException, |
| InterruptedException { |
| downloadWithFileType(TEST_FILE_TYPE.TGZ); |
| } |
| |
| private void verifyPermsRecursively(FileSystem fs, |
| FileContext files, Path p, |
| LocalResourceVisibility vis) throws IOException { |
| FileStatus status = files.getFileStatus(p); |
| if (status.isDirectory()) { |
| if (vis == LocalResourceVisibility.PUBLIC) { |
| Assert.assertTrue(status.getPermission().toShort() == |
| FSDownload.PUBLIC_DIR_PERMS.toShort()); |
| } |
| else { |
| Assert.assertTrue(status.getPermission().toShort() == |
| FSDownload.PRIVATE_DIR_PERMS.toShort()); |
| } |
| if (!status.isSymlink()) { |
| FileStatus[] statuses = fs.listStatus(p); |
| for (FileStatus stat : statuses) { |
| verifyPermsRecursively(fs, files, stat.getPath(), vis); |
| } |
| } |
| } |
| else { |
| if (vis == LocalResourceVisibility.PUBLIC) { |
| Assert.assertTrue(status.getPermission().toShort() == |
| FSDownload.PUBLIC_FILE_PERMS.toShort()); |
| } |
| else { |
| Assert.assertTrue(status.getPermission().toShort() == |
| FSDownload.PRIVATE_FILE_PERMS.toShort()); |
| } |
| } |
| } |
| |
| @Test (timeout=10000) |
| public void testDirDownload() throws IOException, InterruptedException { |
| Configuration conf = new Configuration(); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| final Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| Map<LocalResource, LocalResourceVisibility> rsrcVis = |
| new HashMap<LocalResource, LocalResourceVisibility>(); |
| |
| Random rand = new Random(); |
| long sharedSeed = rand.nextLong(); |
| rand.setSeed(sharedSeed); |
| System.out.println("SEED: " + sharedSeed); |
| |
| Map<LocalResource,Future<Path>> pending = |
| new HashMap<LocalResource,Future<Path>>(); |
| ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); |
| LocalDirAllocator dirs = |
| new LocalDirAllocator(TestFSDownload.class.getName()); |
| for (int i = 0; i < 5; ++i) { |
| LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; |
| if (i%2 == 1) { |
| vis = LocalResourceVisibility.APPLICATION; |
| } |
| |
| Path p = new Path(basedir, "dir" + i + ".jar"); |
| LocalResource rsrc = createJar(files, p, vis); |
| rsrcVis.put(rsrc, vis); |
| Path destPath = dirs.getLocalPathForWrite( |
| basedir.toString(), conf); |
| destPath = new Path (destPath, |
| Long.toString(uniqueNumberGenerator.incrementAndGet())); |
| FSDownload fsd = |
| new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, |
| destPath, rsrc); |
| pending.put(rsrc, exec.submit(fsd)); |
| } |
| |
| exec.shutdown(); |
| while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS)); |
| for (Future<Path> path: pending.values()) { |
| Assert.assertTrue(path.isDone()); |
| } |
| |
| try { |
| |
| for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) { |
| Path localized = p.getValue().get(); |
| FileStatus status = files.getFileStatus(localized); |
| |
| System.out.println("Testing path " + localized); |
| assert(status.isDirectory()); |
| assert(rsrcVis.containsKey(p.getKey())); |
| |
| verifyPermsRecursively(localized.getFileSystem(conf), |
| files, localized, rsrcVis.get(p.getKey())); |
| } |
| } catch (ExecutionException e) { |
| throw new IOException("Failed exec", e); |
| } |
| } |
| |
| @Test (timeout=10000) |
| public void testUniqueDestinationPath() throws Exception { |
| Configuration conf = new Configuration(); |
| FileContext files = FileContext.getLocalFSFileContext(conf); |
| final Path basedir = files.makeQualified(new Path("target", |
| TestFSDownload.class.getSimpleName())); |
| files.mkdir(basedir, null, true); |
| conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); |
| |
| ExecutorService singleThreadedExec = HadoopExecutors |
| .newSingleThreadExecutor(); |
| |
| LocalDirAllocator dirs = |
| new LocalDirAllocator(TestFSDownload.class.getName()); |
| Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf); |
| destPath = |
| new Path(destPath, Long.toString(uniqueNumberGenerator |
| .incrementAndGet())); |
| |
| Path p = new Path(basedir, "dir" + 0 + ".jar"); |
| LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; |
| LocalResource rsrc = createJar(files, p, vis); |
| FSDownload fsd = |
| new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, |
| destPath, rsrc); |
| Future<Path> rPath = singleThreadedExec.submit(fsd); |
| singleThreadedExec.shutdown(); |
| while (!singleThreadedExec.awaitTermination(1000, TimeUnit.MILLISECONDS)); |
| Assert.assertTrue(rPath.isDone()); |
| // Now FSDownload will not create a random directory to localize the |
| // resource. Therefore the final localizedPath for the resource should be |
| // destination directory (passed as an argument) + file name. |
| Assert.assertEquals(destPath, rPath.get().getParent()); |
| } |
| } |