blob: 51dbcaa3b31997c51968f9935a84739cb5eb17ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import com.google.common.annotations.VisibleForTesting;
/**
* A collection of {@link LocalizedResource}s all of same
* {@link LocalResourceVisibility}.
*
*/
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
private static final String RANDOM_DIR_REGEX = "-?\\d+";
private static final Pattern RANDOM_DIR_PATTERN = Pattern
.compile(RANDOM_DIR_REGEX);
private final String user;
private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
private LocalDirsHandlerService dirsHandler;
/*
* This flag controls whether this resource tracker uses hierarchical
* directories or not. For PRIVATE and PUBLIC resource trackers it
* will be set whereas for APPLICATION resource tracker it would
* be false.
*/
private final boolean useLocalCacheDirectoryManager;
private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
/*
* It is used to keep track of resource into hierarchical directory
* while it is getting downloaded. It is useful for reference counting
* in case resource localization fails.
*/
private ConcurrentHashMap<LocalResourceRequest, Path>
inProgressLocalResourcesMap;
/*
* starting with 10 to accommodate 0-9 directories created as a part of
* LocalCacheDirectoryManager. So there will be one unique number generator
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
private NMStateStoreService stateStore;
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
Configuration conf, NMStateStoreService stateStore) {
this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
useLocalCacheDirectoryManager, conf, stateStore, null);
}
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
Configuration conf, NMStateStoreService stateStore,
LocalDirsHandlerService dirHandler) {
this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
useLocalCacheDirectoryManager, conf, stateStore, dirHandler);
}
LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc,
boolean useLocalCacheDirectoryManager, Configuration conf,
NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
if (this.useLocalCacheDirectoryManager) {
directoryManagers =
new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
inProgressLocalResourcesMap =
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
this.stateStore = stateStore;
this.dirsHandler = dirHandler;
}
/*
* Synchronizing this method for avoiding races due to multiple ResourceEvent's
* coming to LocalResourcesTracker from Public/Private localizer and
* Resource Localization Service.
*/
@Override
public synchronized void handle(ResourceEvent event) {
LocalResourceRequest req = event.getLocalResourceRequest();
LocalizedResource rsrc = localrsrc.get(req);
switch (event.getType()) {
case LOCALIZED:
if (useLocalCacheDirectoryManager) {
inProgressLocalResourcesMap.remove(req);
}
break;
case REQUEST:
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
removeResource(req);
rsrc = null;
}
if (null == rsrc) {
rsrc = new LocalizedResource(req, dispatcher);
localrsrc.put(req, rsrc);
}
break;
case RELEASE:
if (null == rsrc) {
// The container sent a release event on a resource which
// 1) Failed
// 2) Removed for some reason (ex. disk is no longer accessible)
ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
LOG.info("Container " + relEvent.getContainer()
+ " sent RELEASE event on a resource request " + req
+ " not present in cache.");
return;
}
break;
case LOCALIZATION_FAILED:
/*
* If resource localization fails then Localized resource will be
* removed from local cache.
*/
removeResource(req);
break;
case RECOVERED:
if (rsrc != null) {
LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
return;
}
rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
localrsrc.put(req, rsrc);
break;
}
rsrc.handle(event);
if (event.getType() == ResourceEventType.LOCALIZED) {
if (rsrc.getLocalPath() != null) {
try {
stateStore.finishResourceLocalization(user, appId,
buildLocalizedResourceProto(rsrc));
} catch (IOException ioe) {
LOG.error("Error storing resource state for " + rsrc, ioe);
}
} else {
LOG.warn("Resource " + rsrc + " localized without a location");
}
}
}
private LocalizedResource recoverResource(LocalResourceRequest req,
ResourceRecoveredEvent event) {
// unique number for a resource is the directory of the resource
Path localDir = event.getLocalPath().getParent();
long rsrcId = Long.parseLong(localDir.getName());
// update ID generator to avoid conflicts with existing resources
while (true) {
long currentRsrcId = uniqueNumberGenerator.get();
long nextRsrcId = Math.max(currentRsrcId, rsrcId);
if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
break;
}
}
incrementFileCountForLocalCacheDirectory(localDir.getParent());
return new LocalizedResource(req, dispatcher);
}
private LocalizedResourceProto buildLocalizedResourceProto(
LocalizedResource rsrc) {
return LocalizedResourceProto.newBuilder()
.setResource(buildLocalResourceProto(rsrc.getRequest()))
.setLocalPath(rsrc.getLocalPath().toString())
.setSize(rsrc.getSize())
.build();
}
private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
LocalResourcePBImpl lrpb;
if (!(lr instanceof LocalResourcePBImpl)) {
lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
lr.getPattern());
}
lrpb = (LocalResourcePBImpl) lr;
return lrpb.getProto();
}
public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
if (useLocalCacheDirectoryManager) {
Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
cacheDir);
if (cacheRoot != null) {
LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
if (dir == null) {
dir = new LocalCacheDirectoryManager(conf);
LocalCacheDirectoryManager otherDir =
directoryManagers.putIfAbsent(cacheRoot, dir);
if (otherDir != null) {
dir = otherDir;
}
}
if (cacheDir.equals(cacheRoot)) {
dir.incrementFileCountForPath("");
} else {
String dirStr = cacheDir.toUri().getRawPath();
String rootStr = cacheRoot.toUri().getRawPath();
dir.incrementFileCountForPath(
dirStr.substring(rootStr.length() + 1));
}
}
}
}
/*
* Update the file-count statistics for a local cache-directory.
* This will retrieve the localized path for the resource from
* 1) inProgressRsrcMap if the resource was under localization and it
* failed.
* 2) LocalizedResource if the resource is already localized.
* From this path it will identify the local directory under which the
* resource was localized. Then rest of the path will be used to decrement
* file count for the HierarchicalSubDirectory pointing to this relative
* path.
*/
private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
LocalizedResource rsrc) {
if ( useLocalCacheDirectoryManager) {
Path rsrcPath = null;
if (inProgressLocalResourcesMap.containsKey(req)) {
// This happens when localization of a resource fails.
rsrcPath = inProgressLocalResourcesMap.remove(req);
} else if (rsrc != null && rsrc.getLocalPath() != null) {
rsrcPath = rsrc.getLocalPath().getParent().getParent();
}
if (rsrcPath != null) {
Path parentPath = new Path(rsrcPath.toUri().getRawPath());
while (!directoryManagers.containsKey(parentPath)) {
parentPath = parentPath.getParent();
if ( parentPath == null) {
return;
}
}
if ( parentPath != null) {
String parentDir = parentPath.toUri().getRawPath().toString();
LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
String rsrcDir = rsrcPath.toUri().getRawPath();
if (rsrcDir.equals(parentDir)) {
dir.decrementFileCountForPath("");
} else {
dir.decrementFileCountForPath(
rsrcDir.substring(
parentDir.length() + 1));
}
}
}
}
}
/**
* This module checks if the resource which was localized is already present
* or not
*
* @param rsrc
* @return true/false based on resource is present or not
*/
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
File file = new File(rsrc.getLocalPath().toUri().getRawPath().
toString());
if (!file.exists()) {
ret = false;
} else if (dirsHandler != null) {
ret = checkLocalResource(rsrc);
}
}
return ret;
}
/**
* Check if the rsrc is Localized on a good dir.
*
* @param rsrc
* @return
*/
@VisibleForTesting
boolean checkLocalResource(LocalizedResource rsrc) {
List<String> localDirs = dirsHandler.getLocalDirsForRead();
for (String dir : localDirs) {
if (isParent(rsrc.getLocalPath().toUri().getPath(), dir)) {
return true;
} else {
continue;
}
}
return false;
}
/**
* @param path
* @param parentdir
* @return true if parentdir is parent of path else false.
*/
private boolean isParent(String path, String parentdir) {
// Add separator if not present.
if (path.charAt(path.length() - 1) != File.separatorChar) {
path += File.separator;
}
return path.startsWith(parentdir);
}
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest()
+ " from " + getUser());
return true;
}
if (rsrc.getRefCount() > 0
|| ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
// internal error
LOG.error("Attempt to remove resource: " + rsrc
+ " with non-zero refcount");
return false;
} else { // ResourceState is LOCALIZED or INIT
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
private void removeResource(LocalResourceRequest req) {
LocalizedResource rsrc = localrsrc.remove(req);
decrementFileCountForLocalCacheDirectory(req, rsrc);
if (rsrc != null) {
Path localPath = rsrc.getLocalPath();
if (localPath != null) {
try {
stateStore.removeLocalizedResource(user, appId, localPath);
} catch (IOException e) {
LOG.error("Unable to remove resource " + rsrc + " from state store",
e);
}
}
}
}
/**
* Returns the path up to the random directory component.
*/
private Path getPathToDelete(Path localPath) {
Path delPath = localPath.getParent();
String name = delPath.getName();
Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
if (matcher.matches()) {
return delPath;
} else {
LOG.warn("Random directory component did not match. " +
"Deleting localized path only");
return localPath;
}
}
@Override
public String getUser() {
return user;
}
@Override
public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator();
}
/**
* @return {@link Path} absolute path for localization which includes local
* directory path and the relative hierarchical path (if use local
* cache directory manager is enabled)
*
* @param {@link LocalResourceRequest} Resource localization request to
* localize the resource.
* @param {@link Path} local directory path
* @param {@link DeletionService} Deletion Service to delete existing
* path for localization.
*/
@Override
public Path getPathForLocalization(LocalResourceRequest req,
Path localDirPath, DeletionService delService) {
Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
directoryManagers.putIfAbsent(localDirPath,
new LocalCacheDirectoryManager(conf));
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
if (!hierarchicalPath.isEmpty()) {
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
}
while (true) {
Path uniquePath = new Path(rPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
File file = new File(uniquePath.toUri().getRawPath());
if (!file.exists()) {
rPath = uniquePath;
break;
}
// If the directory already exists, delete it and move to next one.
LOG.warn("Directory " + uniquePath + " already exists, " +
"try next one.");
if (delService != null) {
delService.delete(getUser(), uniquePath);
}
}
Path localPath = new Path(rPath, req.getPath().getName());
LocalizedResource rsrc = localrsrc.get(req);
rsrc.setLocalPath(localPath);
LocalResource lr = LocalResource.newInstance(req.getResource(),
req.getType(), req.getVisibility(), req.getSize(),
req.getTimestamp());
try {
stateStore.startResourceLocalization(user, appId,
((LocalResourcePBImpl) lr).getProto(), localPath);
} catch (IOException e) {
LOG.error("Unable to record localization start for " + rsrc, e);
}
return rPath;
}
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
@VisibleForTesting
LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
LocalCacheDirectoryManager mgr = null;
if (useLocalCacheDirectoryManager) {
mgr = directoryManagers.get(localDirPath);
}
return mgr;
}
}