/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.util.FSDownload;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.hadoop.util.Shell.getAllShells;

| public class ContainerLocalizer { |
| |
| static final Logger LOG = |
| LoggerFactory.getLogger(ContainerLocalizer.class); |
| |
| public static final String FILECACHE = "filecache"; |
| public static final String APPCACHE = "appcache"; |
| public static final String USERCACHE = "usercache"; |
| public static final String OUTPUTDIR = "output"; |
| public static final String TOKEN_FILE_NAME_FMT = "%s.tokens"; |
| public static final String WORKDIR = "work"; |
| private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs"; |
| private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs"; |
| private static final FsPermission FILECACHE_PERMS = |
| new FsPermission((short)0710); |
| private static final FsPermission USERCACHE_FOLDER_PERMS = |
| new FsPermission((short) 0755); |
| |
| private final String user; |
| private final String appId; |
| private final List<Path> localDirs; |
| private final String localizerId; |
| private final FileContext lfs; |
| private final Configuration conf; |
| private final RecordFactory recordFactory; |
| private final Map<LocalResource,Future<Path>> pendingResources; |
| private final String appCacheDirContextName; |
| private final DiskValidator diskValidator; |
| |
| private Set<Thread> localizingThreads = |
| Collections.synchronizedSet(new HashSet<Thread>()); |
| |
| public ContainerLocalizer(FileContext lfs, String user, String appId, |
| String localizerId, List<Path> localDirs, |
| RecordFactory recordFactory) throws IOException { |
| if (null == user) { |
| throw new IOException("Cannot initialize for null user"); |
| } |
| if (null == localizerId) { |
| throw new IOException("Cannot initialize for null containerId"); |
| } |
| this.lfs = lfs; |
| this.user = user; |
| this.appId = appId; |
| this.localDirs = localDirs; |
| this.localizerId = localizerId; |
| this.recordFactory = recordFactory; |
| this.conf = new YarnConfiguration(); |
| this.diskValidator = DiskValidatorFactory.getInstance( |
| conf.get(YarnConfiguration.DISK_VALIDATOR, |
| YarnConfiguration.DEFAULT_DISK_VALIDATOR)); |
| LOG.info("Disk Validator: " + YarnConfiguration.DISK_VALIDATOR + |
| " is loaded."); |
| this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId); |
| this.pendingResources = new HashMap<LocalResource,Future<Path>>(); |
| } |
| |
| @Private |
| @VisibleForTesting |
| public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { |
| YarnRPC rpc = YarnRPC.create(conf); |
| return (LocalizationProtocol) |
| rpc.getProxy(LocalizationProtocol.class, nmAddr, conf); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public void runLocalization(final InetSocketAddress nmAddr) |
| throws IOException, InterruptedException { |
| // load credentials |
| initDirs(conf, user, appId, lfs, localDirs); |
| final Credentials creds = new Credentials(); |
| DataInputStream credFile = null; |
| try { |
| // assume credentials in cwd |
| // TODO: Fix |
| Path tokenPath = |
| new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId)); |
| credFile = lfs.open(tokenPath); |
| creds.readTokenStorageStream(credFile); |
| // Explicitly deleting token file. |
| lfs.delete(tokenPath, false); |
| } finally { |
| if (credFile != null) { |
| credFile.close(); |
| } |
| } |
| // create localizer context |
| UserGroupInformation remoteUser = |
| UserGroupInformation.createRemoteUser(user); |
| remoteUser.addToken(creds.getToken(LocalizerTokenIdentifier.KIND)); |
| final LocalizationProtocol nodeManager = |
| remoteUser.doAs(new PrivilegedAction<LocalizationProtocol>() { |
| @Override |
| public LocalizationProtocol run() { |
| return getProxy(nmAddr); |
| } |
| }); |
| |
| // create user context |
| UserGroupInformation ugi = |
| UserGroupInformation.createRemoteUser(user); |
| for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) { |
| ugi.addToken(token); |
| } |
| |
| ExecutorService exec = null; |
| try { |
| exec = createDownloadThreadPool(); |
| CompletionService<Path> ecs = createCompletionService(exec); |
| localizeFiles(nodeManager, ecs, ugi); |
| } catch (Throwable e) { |
| throw new IOException(e); |
| } finally { |
| try { |
| if (exec != null) { |
| exec.shutdown(); |
| destroyShellProcesses(getAllShells()); |
| exec.awaitTermination(10, TimeUnit.SECONDS); |
| } |
| LocalDirAllocator.removeContext(appCacheDirContextName); |
| } finally { |
| closeFileSystems(ugi); |
| } |
| } |
| } |
| |
| ExecutorService createDownloadThreadPool() { |
| return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder() |
| .setNameFormat("ContainerLocalizer Downloader").build()); |
| } |
| |
| CompletionService<Path> createCompletionService(ExecutorService exec) { |
| return new ExecutorCompletionService<Path>(exec); |
| } |
| |
| class FSDownloadWrapper extends FSDownload { |
| |
| FSDownloadWrapper(FileContext files, UserGroupInformation ugi, |
| Configuration conf, Path destDirPath, LocalResource resource) { |
| super(files, ugi, conf, destDirPath, resource); |
| } |
| |
| @Override |
| public Path call() throws Exception { |
| Thread currentThread = Thread.currentThread(); |
| localizingThreads.add(currentThread); |
| try { |
| return doDownloadCall(); |
| } finally { |
| localizingThreads.remove(currentThread); |
| } |
| } |
| |
| Path doDownloadCall() throws Exception { |
| return super.call(); |
| } |
| |
| } |
| |
| Callable<Path> download(Path destDirPath, LocalResource rsrc, |
| UserGroupInformation ugi) throws IOException { |
| // For private localization FsDownload creates folder in destDirPath. Parent |
| // directories till user filecache folder is created here. |
| if (rsrc.getVisibility() == LocalResourceVisibility.PRIVATE) { |
| createParentDirs(destDirPath); |
| } |
| diskValidator.checkStatus(new File(destDirPath.toUri().getRawPath())); |
| return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc); |
| } |
| |
| private void createParentDirs(Path destDirPath) throws IOException { |
| Path parent = destDirPath.getParent(); |
| Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(parent); |
| Stack<Path> dirs = new Stack<Path>(); |
| while (!parent.equals(cacheRoot)) { |
| dirs.push(parent); |
| parent = parent.getParent(); |
| } |
| // Create directories with user cache permission |
| while (!dirs.isEmpty()) { |
| createDir(lfs, dirs.pop(), USERCACHE_FOLDER_PERMS); |
| } |
| } |
| |
| static long getEstimatedSize(LocalResource rsrc) { |
| if (rsrc.getSize() < 0) { |
| return -1; |
| } |
| switch (rsrc.getType()) { |
| case ARCHIVE: |
| case PATTERN: |
| return 5 * rsrc.getSize(); |
| case FILE: |
| default: |
| return rsrc.getSize(); |
| } |
| } |
| |
| void sleep(int duration) throws InterruptedException { |
| TimeUnit.SECONDS.sleep(duration); |
| } |
| |
| protected void closeFileSystems(UserGroupInformation ugi) { |
| try { |
| FileSystem.closeAllForUGI(ugi); |
| } catch (IOException e) { |
| LOG.warn("Failed to close filesystems: ", e); |
| } |
| } |
| |
| protected void localizeFiles(LocalizationProtocol nodemanager, |
| CompletionService<Path> cs, UserGroupInformation ugi) |
| throws IOException, YarnException { |
| while (true) { |
| try { |
| LocalizerStatus status = createStatus(); |
| LocalizerHeartbeatResponse response = nodemanager.heartbeat(status); |
| switch (response.getLocalizerAction()) { |
| case LIVE: |
| List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs(); |
| for (ResourceLocalizationSpec newRsrc : newRsrcs) { |
| if (!pendingResources.containsKey(newRsrc.getResource())) { |
| pendingResources.put(newRsrc.getResource(), cs.submit(download( |
| new Path(newRsrc.getDestinationDirectory().getFile()), |
| newRsrc.getResource(), ugi))); |
| } |
| } |
| break; |
| case DIE: |
| // killall running localizations |
| for (Future<Path> pending : pendingResources.values()) { |
| pending.cancel(true); |
| } |
| status = createStatus(); |
| // ignore response while dying. |
| try { |
| nodemanager.heartbeat(status); |
| } catch (YarnException e) { |
| // Cannot do anything about this during death stage, let's just log |
| // it. |
| e.printStackTrace(System.out); |
| LOG.error("Heartbeat failed while dying: ", e); |
| } |
| return; |
| } |
| cs.poll(1000, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| return; |
| } catch (YarnException e) { |
| // TODO cleanup |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Create the payload for the HeartBeat. Mainly the list of |
| * {@link LocalResourceStatus}es |
| * |
| * @return a {@link LocalizerStatus} that can be sent via heartbeat. |
| * @throws InterruptedException |
| */ |
| private LocalizerStatus createStatus() throws InterruptedException { |
| final List<LocalResourceStatus> currentResources = |
| new ArrayList<LocalResourceStatus>(); |
| // TODO: Synchronization?? |
| for (Iterator<LocalResource> i = pendingResources.keySet().iterator(); |
| i.hasNext();) { |
| LocalResource rsrc = i.next(); |
| LocalResourceStatus stat = |
| recordFactory.newRecordInstance(LocalResourceStatus.class); |
| stat.setResource(rsrc); |
| Future<Path> fPath = pendingResources.get(rsrc); |
| if (fPath.isDone()) { |
| try { |
| Path localPath = fPath.get(); |
| stat.setLocalPath( |
| URL.fromPath(localPath)); |
| stat.setLocalSize( |
| FileUtil.getDU(new File(localPath.getParent().toUri()))); |
| stat.setStatus(ResourceStatusType.FETCH_SUCCESS); |
| } catch (ExecutionException e) { |
| stat.setStatus(ResourceStatusType.FETCH_FAILURE); |
| stat.setException(SerializedException.newInstance(e.getCause())); |
| } catch (CancellationException e) { |
| stat.setStatus(ResourceStatusType.FETCH_FAILURE); |
| stat.setException(SerializedException.newInstance(e)); |
| } |
| // TODO shouldn't remove until ACK |
| i.remove(); |
| } else { |
| stat.setStatus(ResourceStatusType.FETCH_PENDING); |
| } |
| currentResources.add(stat); |
| } |
| LocalizerStatus status = |
| recordFactory.newRecordInstance(LocalizerStatus.class); |
| status.setLocalizerId(localizerId); |
| status.addAllResources(currentResources); |
| return status; |
| } |
| |
| /** |
| * Returns the JVM options to to launch the resource localizer. |
| * @param conf the configuration properties to launch the resource localizer. |
| */ |
| public static List<String> getJavaOpts(Configuration conf) { |
| String opts = conf.get(YarnConfiguration.NM_CONTAINER_LOCALIZER_JAVA_OPTS_KEY, |
| YarnConfiguration.NM_CONTAINER_LOCALIZER_JAVA_OPTS_DEFAULT); |
| return Arrays.asList(opts.split(" ")); |
| } |
| |
| /** |
| * Adds the ContainerLocalizer arguments for a @{link ShellCommandExecutor}, |
| * as expected by ContainerLocalizer.main |
| * @param command the current ShellCommandExecutor command line |
| * @param user localization user |
| * @param appId localized app id |
| * @param locId localizer id |
| * @param nmAddr nodemanager address |
| * @param localDirs list of local dirs |
| */ |
| public static void buildMainArgs(List<String> command, |
| String user, String appId, String locId, |
| InetSocketAddress nmAddr, List<String> localDirs, Configuration conf) { |
| |
| String logLevel = conf.get(YarnConfiguration. |
| NM_CONTAINER_LOCALIZER_LOG_LEVEL, |
| YarnConfiguration.NM_CONTAINER_LOCALIZER_LOG_LEVEL_DEFAULT); |
| addLog4jSystemProperties(logLevel, command); |
| command.add(ContainerLocalizer.class.getName()); |
| command.add(user); |
| command.add(appId); |
| command.add(locId); |
| command.add(nmAddr.getHostName()); |
| command.add(Integer.toString(nmAddr.getPort())); |
| for(String dir : localDirs) { |
| command.add(dir); |
| } |
| } |
| |
| private static void addLog4jSystemProperties( |
| String logLevel, List<String> command) { |
| command.add("-Dlog4j.configuration=container-log4j.properties"); |
| command.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + |
| ApplicationConstants.LOG_DIR_EXPANSION_VAR); |
| command.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=0"); |
| command.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); |
| command.add("-Dhadoop.root.logfile=container-localizer-syslog"); |
| } |
| |
| public static void main(String[] argv) throws Throwable { |
| Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); |
| int nRet = 0; |
| // usage: $0 user appId locId host port app_log_dir user_dir [user_dir]* |
| // let $x = $x/usercache for $local.dir |
| // MKDIR $x/$user/appcache/$appid |
| // MKDIR $x/$user/appcache/$appid/output |
| // MKDIR $x/$user/appcache/$appid/filecache |
| // LOAD $x/$user/appcache/$appid/appTokens |
| try { |
| String user = argv[0]; |
| String appId = argv[1]; |
| String locId = argv[2]; |
| InetSocketAddress nmAddr = |
| new InetSocketAddress(argv[3], Integer.parseInt(argv[4])); |
| String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length); |
| ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length); |
| for (String sLocaldir : sLocaldirs) { |
| localDirs.add(new Path(sLocaldir)); |
| } |
| |
| final String uid = |
| UserGroupInformation.getCurrentUser().getShortUserName(); |
| if (!user.equals(uid)) { |
| // TODO: fail localization |
| LOG.warn("Localization running as " + uid + " not " + user); |
| } |
| |
| ContainerLocalizer localizer = |
| new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, |
| appId, locId, localDirs, |
| RecordFactoryProvider.getRecordFactory(null)); |
| localizer.runLocalization(nmAddr); |
| return; |
| } catch (Throwable e) { |
| // Print traces to stdout so that they can be logged by the NM address |
| // space in both DefaultCE and LCE cases |
| e.printStackTrace(System.out); |
| LOG.error("Exception in main:", e); |
| nRet = -1; |
| } finally { |
| System.exit(nRet); |
| } |
| } |
| |
| private static void initDirs(Configuration conf, String user, String appId, |
| FileContext lfs, List<Path> localDirs) throws IOException { |
| if (null == localDirs || 0 == localDirs.size()) { |
| throw new IOException("Cannot initialize without local dirs"); |
| } |
| String[] appsFileCacheDirs = new String[localDirs.size()]; |
| String[] usersFileCacheDirs = new String[localDirs.size()]; |
| for (int i = 0, n = localDirs.size(); i < n; ++i) { |
| // $x/usercache/$user |
| Path base = lfs.makeQualified( |
| new Path(new Path(localDirs.get(i), USERCACHE), user)); |
| // $x/usercache/$user/filecache |
| Path userFileCacheDir = new Path(base, FILECACHE); |
| usersFileCacheDirs[i] = userFileCacheDir.toString(); |
| createDir(lfs, userFileCacheDir, FILECACHE_PERMS); |
| // $x/usercache/$user/appcache/$appId |
| Path appBase = new Path(base, new Path(APPCACHE, appId)); |
| // $x/usercache/$user/appcache/$appId/filecache |
| Path appFileCacheDir = new Path(appBase, FILECACHE); |
| appsFileCacheDirs[i] = appFileCacheDir.toString(); |
| createDir(lfs, appFileCacheDir, FILECACHE_PERMS); |
| } |
| conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs); |
| conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs); |
| } |
| |
| private static void createDir(FileContext lfs, Path dirPath, |
| FsPermission perms) throws IOException { |
| lfs.mkdir(dirPath, perms, false); |
| if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { |
| lfs.setPermission(dirPath, perms); |
| } |
| } |
| |
| private void destroyShellProcesses(Set<Shell> shells) { |
| for (Shell shell : shells) { |
| if(localizingThreads.contains(shell.getWaitingThread())) { |
| shell.getProcess().destroy(); |
| } |
| } |
| } |
| } |