/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.localizer;
import org.apache.storm.Config;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.utils.ShellUtils.ExitCodeException;
import org.apache.storm.utils.ShellUtils.ShellCommandExecutor;
import org.apache.storm.utils.Utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.apache.storm.utils.Utils.OR;
/**
* Class to download and manage files from the blobstore. It uses an LRU cache
* to determine which files to keep so they can be reused and which files to delete.
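 *
 * <p>A minimal lifecycle sketch (the conf map and base directory shown are placeholders):
 * <pre>{@code
 * Localizer localizer = new Localizer(conf, supervisorLocalDir);
 * localizer.startCleaner();   // periodic LRU cache cleanup
 * // fetch blobs with getBlob()/getBlobs() and refresh them with updateBlobs()
 * localizer.shutdown();
 * }</pre>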
*/
public class Localizer {
public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
public static final String USERCACHE = "usercache";
public static final String FILECACHE = "filecache";
// sub directories to store either files or uncompressed archives respectively
public static final String FILESDIR = "files";
public static final String ARCHIVESDIR = "archives";
private static final String TO_UNCOMPRESS = "_tmp_";
private final Map<String, Object> _conf;
private final int _threadPoolSize;
// thread pool for initial download
private final ExecutorService _execService;
// thread pool for updates
private final ExecutorService _updateExecService;
private final int _blobDownloadRetries;
// track resources - user to resourceSet
  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc =
      new ConcurrentHashMap<>();
private final String _localBaseDir;
// cleanup
private long _cacheTargetSize;
private long _cacheCleanupPeriod;
private ScheduledExecutorService _cacheCleanupService;
public Localizer(Map<String, Object> conf, String baseDir) {
_conf = conf;
_localBaseDir = baseDir;
// default cache size 10GB, converted to Bytes
_cacheTargetSize = Utils.getInt(_conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
10 * 1024).longValue() << 20;
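    // e.g. the default of 10 * 1024 MB shifted left by 20 bits (multiplied by 1024 * 1024)
    // gives 10 * 1024 * 1024 * 1024 bytes, i.e. 10GB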
// default 10 minutes.
_cacheCleanupPeriod = Utils.getInt(_conf.get(
Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
    // if needed, a separate config could be added for the update thread pool size
_threadPoolSize = Utils.getInt(_conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
_blobDownloadRetries = Utils.getInt(_conf.get(
Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
_execService = Executors.newFixedThreadPool(_threadPoolSize);
_updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
reconstructLocalizedResources();
}
  // For testing: allows setting the target cache size in bytes
protected void setTargetCacheSize(long size) {
_cacheTargetSize = size;
}
  // For testing only; note this returns the live map, not a clone
ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
return _userRsrc;
}
public void startCleaner() {
_cacheCleanupService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("Localizer Cache Cleanup")
.build());
_cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
handleCacheCleanup();
}
}, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
}
public void shutdown() {
if (_cacheCleanupService != null) {
_cacheCleanupService.shutdown();
}
if (_execService != null) {
_execService.shutdown();
}
if (_updateExecService != null) {
_updateExecService.shutdown();
}
}
// baseDir/supervisor/usercache/
protected File getUserCacheDir() {
return new File(_localBaseDir, USERCACHE);
}
// baseDir/supervisor/usercache/user1/
protected File getLocalUserDir(String userName) {
return new File(getUserCacheDir(), userName);
}
// baseDir/supervisor/usercache/user1/filecache
public File getLocalUserFileCacheDir(String userName) {
return new File(getLocalUserDir(userName), FILECACHE);
}
// baseDir/supervisor/usercache/user1/filecache/files
protected File getCacheDirForFiles(File dir) {
return new File(dir, FILESDIR);
}
// get the directory to put uncompressed archives in
// baseDir/supervisor/usercache/user1/filecache/archives
protected File getCacheDirForArchives(File dir) {
return new File(dir, ARCHIVESDIR);
}
protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
boolean uncompress) {
File[] lrsrcs = readCurrentBlobs(dir);
if (lrsrcs != null) {
for (File rsrc : lrsrcs) {
LOG.info("add localized in dir found: " + rsrc);
/// strip off .suffix
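        // e.g. ".../filecache/files/key1.current" yields path ".../filecache/files/key1",
        // whose base name ("key1") becomes the resource key below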
String path = rsrc.getPath();
int p = path.lastIndexOf('.');
if (p > 0) {
path = path.substring(0, p);
}
LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
uncompress);
lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
}
}
}
protected File[] readDirContents(String location) {
File dir = new File(location);
File[] files = null;
if (dir.exists()) {
files = dir.listFiles();
}
return files;
}
// Looks for files in the directory with .current suffix
protected File[] readCurrentBlobs(String location) {
File dir = new File(location);
File[] files = null;
if (dir.exists()) {
files = dir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
}
});
}
return files;
}
// Check to see if there are any existing files already localized.
protected void reconstructLocalizedResources() {
try {
LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
File[] users = readDirContents(getUserCacheDir().getPath());
if (users != null) {
for (File userDir : users) {
String user = userDir.getName();
LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
LocalizedResourceSet newSet = new LocalizedResourceSet(user);
LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
if (lrsrcSet == null) {
lrsrcSet = newSet;
}
addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
lrsrcSet, false);
addLocalizedResourceInDir(
getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
lrsrcSet, true);
}
} else {
LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
}
} catch (Exception e) {
LOG.error("ERROR reconstructing localized resources", e);
}
}
// ignores invalid user/topo/key
public synchronized void removeBlobReference(String key, String user, String topo,
boolean uncompress) throws AuthorizationException, KeyNotFoundException {
LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
if (lrsrcSet != null) {
LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
if (lrsrc != null) {
LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
lrsrc.removeReference(topo);
} else {
LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
" topo: " + topo);
}
} else {
LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
+ key + " topo: " + topo);
}
}
public synchronized void addReferences(List<LocalResource> localresource, String user,
String topo) {
LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
if (lrsrcSet != null) {
for (LocalResource blob : localresource) {
LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
if (lrsrc != null) {
lrsrc.addReference(topo);
LOG.debug("added reference for topo: {} key: {}", topo, blob);
} else {
LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
}
}
} else {
LOG.warn("trying to add reference to non-existent local resource set, " +
"user: " + user + " topo: " + topo);
}
}
/**
 * This function either returns the blob from the existing cache or, if it doesn't exist in the
 * cache, downloads the blob and blocks until the download is complete.
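 *
 * <p>Illustrative call (a sketch; the LocalResource constructor shown here is an assumption,
 * not the authoritative API):
 * <pre>{@code
 * LocalizedResource rsrc = localizer.getBlob(new LocalResource("myKey", false),
 *     "user1", "topo-1", localizer.getLocalUserFileCacheDir("user1"));
 * }</pre>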
*/
public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
arr.add(localResource);
List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
    if (results.size() != 1) {
throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
", topo: " + topo);
}
return results.get(0);
}
protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
File versionFile = new File(lrsrc.getVersionFilePath());
return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
}
protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
String localFile = lrsrc.getFilePath();
long nimbusBlobVersion = Utils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
long currentBlobVersion = Utils.localVersionOfBlob(localFile);
return (nimbusBlobVersion == currentBlobVersion);
}
protected ClientBlobStore getClientBlobStore() {
return Utils.getClientBlobStoreForSupervisor(_conf);
}
/**
 * This function updates blobs on the supervisor. It uses a separate thread pool and runs
 * asynchronously from the initial downloads and deletions.
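 *
 * <p>Illustrative call (a sketch; the LocalResource constructor shown here is an assumption):
 * <pre>{@code
 * List<LocalizedResource> updated = localizer.updateBlobs(
 *     Arrays.asList(new LocalResource("myKey", false)), "user1");
 * }</pre>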
*/
public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
String user) throws AuthorizationException, KeyNotFoundException, IOException {
LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
ArrayList<LocalizedResource> results = new ArrayList<>();
ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
if (lrsrcSet == null) {
// resource set must have been removed
return results;
}
ClientBlobStore blobstore = null;
try {
blobstore = getClientBlobStore();
for (LocalResource localResource: localResources) {
String key = localResource.getBlobName();
LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
if (lrsrc == null) {
LOG.warn("blob requested for update doesn't exist: {}", key);
continue;
} else if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) {
LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
continue;
} else {
// update it if either the version isn't the latest or if any local blob files are missing
if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
!isLocalizedResourceDownloaded(lrsrc)) {
LOG.debug("updating blob: {}", key);
updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
lrsrc.isUncompressed(), true));
}
}
}
} finally {
      if (blobstore != null) {
blobstore.shutdown();
}
}
try {
List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
for (Future<LocalizedResource> futureRsrc : futures) {
try {
LocalizedResource lrsrc = futureRsrc.get();
          // re-insert the resource in case it was removed concurrently by the cleaner
LocalizedResourceSet newSet = new LocalizedResourceSet(user);
LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
if (newlrsrcSet == null) {
newlrsrcSet = newSet;
}
newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
results.add(lrsrc);
        } catch (ExecutionException e) {
LOG.error("Error updating blob: ", e);
if (e.getCause() instanceof AuthorizationException) {
throw (AuthorizationException)e.getCause();
}
if (e.getCause() instanceof KeyNotFoundException) {
throw (KeyNotFoundException)e.getCause();
}
}
}
} catch (RejectedExecutionException re) {
LOG.error("Error updating blobs : ", re);
} catch (InterruptedException ie) {
throw new IOException("Interrupted Exception", ie);
}
return results;
}
/**
 * This function either returns the blobs from the existing cache or, if they don't exist in the
 * cache, downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT threads)
 * and blocks until all of them have been downloaded.
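 *
 * <p>Illustrative call mixing a plain file and an archive to uncompress (a sketch; the
 * LocalResource constructor shown here is an assumption):
 * <pre>{@code
 * List<LocalizedResource> localized = localizer.getBlobs(
 *     Arrays.asList(new LocalResource("conf-key", false), new LocalResource("archive-key", true)),
 *     "user1", "topo-1", localizer.getLocalUserFileCacheDir("user1"));
 * }</pre>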
*/
public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
String user, String topo, File userFileDir)
throws AuthorizationException, KeyNotFoundException, IOException {
if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) {
throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
}
LocalizedResourceSet newSet = new LocalizedResourceSet(user);
LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
if (lrsrcSet == null) {
lrsrcSet = newSet;
}
ArrayList<LocalizedResource> results = new ArrayList<>();
ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
ClientBlobStore blobstore = null;
try {
blobstore = getClientBlobStore();
for (LocalResource localResource: localResources) {
String key = localResource.getBlobName();
boolean uncompress = localResource.shouldUncompress();
LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
boolean isUpdate = false;
if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
(isLocalizedResourceDownloaded(lrsrc))) {
if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
LOG.debug("blob already exists: {}", key);
lrsrc.addReference(topo);
results.add(lrsrc);
continue;
}
LOG.debug("blob exists but isn't up to date: {}", key);
isUpdate = true;
}
// go off to blobstore and get it
// assume dir passed in exists and has correct permission
LOG.debug("fetching blob: {}", key);
File downloadDir = getCacheDirForFiles(userFileDir);
File localFile = new File(downloadDir, key);
if (uncompress) {
// for compressed file, download to archives dir
downloadDir = getCacheDirForArchives(userFileDir);
localFile = new File(downloadDir, key);
}
downloadDir.mkdir();
downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
isUpdate));
}
} finally {
      if (blobstore != null) {
blobstore.shutdown();
}
}
try {
List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
for (Future<LocalizedResource> futureRsrc: futures) {
LocalizedResource lrsrc = futureRsrc.get();
lrsrc.addReference(topo);
lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
results.add(lrsrc);
}
} catch (ExecutionException e) {
      if (e.getCause() instanceof AuthorizationException) {
        throw (AuthorizationException) e.getCause();
      } else if (e.getCause() instanceof KeyNotFoundException) {
throw (KeyNotFoundException)e.getCause();
} else {
throw new IOException("Error getting blobs", e);
}
} catch (RejectedExecutionException re) {
throw new IOException("RejectedExecutionException: ", re);
} catch (InterruptedException ie) {
throw new IOException("Interrupted Exception", ie);
}
return results;
}
static class DownloadBlob implements Callable<LocalizedResource> {
private Localizer _localizer;
private Map _conf;
private String _key;
private File _localFile;
private String _user;
private boolean _uncompress;
private boolean _isUpdate;
public DownloadBlob(Localizer localizer, Map conf, String key, File localFile,
String user, boolean uncompress, boolean update) {
_localizer = localizer;
_conf = conf;
_key = key;
_localFile = localFile;
_user = user;
_uncompress = uncompress;
_isUpdate = update;
}
@Override
public LocalizedResource call()
throws AuthorizationException, KeyNotFoundException, IOException {
return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
_isUpdate);
}
}
private LocalizedResource downloadBlob(Map conf, String key, File localFile,
String user, boolean uncompress, boolean isUpdate)
throws AuthorizationException, KeyNotFoundException, IOException {
ClientBlobStore blobstore = null;
try {
blobstore = getClientBlobStore();
long nimbusBlobVersion = Utils.nimbusVersionOfBlob(key, blobstore);
long oldVersion = Utils.localVersionOfBlob(localFile.toString());
FileOutputStream out = null;
PrintWriter writer = null;
int numTries = 0;
String localizedPath = localFile.toString();
String localFileWithVersion = Utils.constructBlobWithVersionFileName(localFile.toString(),
nimbusBlobVersion);
String localVersionFile = Utils.constructVersionFileName(localFile.toString());
String downloadFile = localFileWithVersion;
if (uncompress) {
// we need to download to temp file and then unpack into the one requested
downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
}
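      // Files managed below (names are built by the Utils helper methods above):
      //   localFileWithVersion - "<localFile>.<nimbusBlobVersion>", the blob data (or unpacked archive)
      //   localVersionFile     - the marker file recording which blob version was downloaded
      //   current symlink      - "<localFile>.current", atomically repointed at the new version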
while (numTries < _blobDownloadRetries) {
out = new FileOutputStream(downloadFile);
numTries++;
try {
if (!Utils.canUserReadBlob(blobstore.getBlobMeta(key), user)) {
throw new AuthorizationException(user + " does not have READ access to " + key);
}
InputStreamWithMeta in = blobstore.getBlob(key);
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) >= 0) {
out.write(buffer, 0, len);
}
out.close();
in.close();
if (uncompress) {
Utils.unpack(new File(downloadFile), new File(localFileWithVersion));
LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
}
// Next write the version.
LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
nimbusBlobVersion + " local version was: " + oldVersion);
// The false parameter ensures overwriting the version file, not appending
writer = new PrintWriter(
new BufferedWriter(new FileWriter(localVersionFile, false)));
writer.println(nimbusBlobVersion);
writer.close();
try {
setBlobPermissions(conf, user, localFileWithVersion);
setBlobPermissions(conf, user, localVersionFile);
// Update the key.current symlink. First create tmp symlink and do
// move of tmp to current so that the operation is atomic.
String tmp_uuid_local = java.util.UUID.randomUUID().toString();
LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
"linking to: " + localFile + "." + nimbusBlobVersion);
File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
Files.createSymbolicLink(uuid_symlink.toPath(),
Paths.get(Utils.constructBlobWithVersionFileName(localFile.toString(),
nimbusBlobVersion)));
File current_symlink = new File(Utils.constructBlobCurrentSymlinkName(
localFile.toString()));
Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
} catch (IOException e) {
// if we fail after writing the version file but before we move current link we need to
// restore the old version to the file
try {
PrintWriter restoreWriter = new PrintWriter(
new BufferedWriter(new FileWriter(localVersionFile, false)));
restoreWriter.println(oldVersion);
restoreWriter.close();
} catch (IOException ignore) {}
throw e;
}
String oldBlobFile = localFile + "." + oldVersion;
try {
// Remove the old version. Note that if a number of processes have that file open,
// the OS will keep the old blob file around until they all close the handle and only
// then deletes it. No new process will open the old blob, since the users will open the
// blob through the "blob.current" symlink, which always points to the latest version of
// a blob. Remove the old version after the current symlink is updated as to not affect
// anyone trying to read it.
if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
LOG.info("Removing an old blob file:" + oldBlobFile);
Files.delete(Paths.get(oldBlobFile));
}
} catch (IOException e) {
            // At this point we have downloaded everything and moved the symlink. If removing the
            // old version fails, just log an error.
            LOG.error("Exception removing old blob version: " + oldBlobFile, e);
}
break;
} catch (AuthorizationException ae) {
            // we consider this a non-retriable exception
if (out != null) {
out.close();
}
new File(downloadFile).delete();
throw ae;
} catch (IOException | KeyNotFoundException e) {
if (out != null) {
out.close();
}
if (writer != null) {
writer.close();
}
new File(downloadFile).delete();
if (uncompress) {
try {
FileUtils.deleteDirectory(new File(localFileWithVersion));
} catch (IOException ignore) {}
}
if (!isUpdate) {
              // don't remove the existing version file if it's an update
new File(localVersionFile).delete();
}
if (numTries < _blobDownloadRetries) {
LOG.error("Failed to download blob, retrying", e);
} else {
throw e;
}
}
}
return new LocalizedResource(key, localizedPath, uncompress);
} finally {
      if (blobstore != null) {
blobstore.shutdown();
}
}
}
public void setBlobPermissions(Map conf, String user, String path)
throws IOException {
if (!Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
return;
}
String wlCommand = Utils.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
if (wlCommand.isEmpty()) {
String stormHome = System.getProperty("storm.home");
wlCommand = stormHome + "/bin/worker-launcher";
}
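    // resulting command shape: <worker-launcher> <user> blob <path>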
List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
try {
shExec.execute();
LOG.debug("output: {}", shExec.getOutput());
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
LOG.debug("output: {}", shExec.getOutput());
throw new IOException("Setting blob permissions failed" +
" (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
}
}
public synchronized void handleCacheCleanup() {
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
    // build one large set of all resources and then clean it via LRU
for (LocalizedResourceSet t : _userRsrc.values()) {
toClean.addResources(t);
LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
}
toClean.cleanup();
LOG.debug("Resource cleanup: {}", toClean);
for (LocalizedResourceSet t : _userRsrc.values()) {
if (t.getSize() == 0) {
String user = t.getUser();
LOG.debug("removing empty set: {}", user);
File userFileCacheDir = getLocalUserFileCacheDir(user);
getCacheDirForFiles(userFileCacheDir).delete();
getCacheDirForArchives(userFileCacheDir).delete();
getLocalUserFileCacheDir(user).delete();
boolean dirsRemoved = getLocalUserDir(user).delete();
// to catch race with update thread
if (dirsRemoved) {
_userRsrc.remove(user);
}
}
}
}
}