blob: 25d6f4d953efa12c7aa356499dd89dfcb3c445f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.localizer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A set of resources that we can look at to see which ones we retain and which ones should be
* removed.
*/
public class LocalizedResourceRetentionSet {
public static final Logger LOG = LoggerFactory.getLogger(LocalizedResourceRetentionSet.class);
@VisibleForTesting
final SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> noReferences;
private long currentSize;
// targetSize in Bytes
private long targetSize;
LocalizedResourceRetentionSet(long targetSize) {
this(targetSize, new LRUComparator());
}
LocalizedResourceRetentionSet(long targetSize, Comparator<? super LocallyCachedBlob> cmp) {
this(targetSize, new TreeMap<>(cmp));
}
LocalizedResourceRetentionSet(long targetSize,
SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> retain) {
this.noReferences = retain;
this.targetSize = targetSize;
}
// for testing
protected int getSizeWithNoReferences() {
return noReferences.size();
}
/**
* Add blobs to be checked if they can be deleted.
* @param blobs a map of blob name to the blob object. The blobs in this map will be deleted from the map
* if they are deleted on disk too.
*/
public void addResources(ConcurrentMap<String, ? extends LocallyCachedBlob> blobs) {
for (LocallyCachedBlob b : blobs.values()) {
currentSize += b.getSizeOnDisk();
if (b.isUsed()) {
LOG.debug("NOT going to clean up {}, {} depends on it", b.getKey(), b.getDependencies());
// always retain resources in use
continue;
}
LOG.debug("Possibly going to clean up {} ts {} size {}", b.getKey(), b.getLastUsed(), b.getSizeOnDisk());
noReferences.put(b, blobs);
}
}
/**
* Actually cleanup the blobs to try and get below the target cache size.
* @param store the blobs store client used to check if the blob has been deleted from the blobstore. If it has, the blob will be
* deleted even if the cache is not over the target size.
*/
public void cleanup(ClientBlobStore store) {
LOG.debug("cleanup target size: {} current size is: {}", targetSize, currentSize);
long bytesOver = currentSize - targetSize;
if (bytesOver <= 0) { // no need to query remote files
return;
}
//First delete everything that no longer exists...
for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
i.hasNext(); ) {
Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next();
LocallyCachedBlob resource = rsrc.getKey();
try {
if (!store.isRemoteBlobExists(resource.getKey())) {
//The key was removed so we should delete it too.
Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();
if (removeBlob(resource, set)) {
bytesOver -= resource.getSizeOnDisk();
LOG.info("Deleted blob: {} (REMOVED FROM CLUSTER).", resource.getKey());
i.remove();
}
}
} catch (AuthorizationException e) {
//Ignored
}
}
for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
bytesOver > 0 && i.hasNext(); ) {
Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next();
LocallyCachedBlob resource = rsrc.getKey();
Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();
if (removeBlob(resource, set)) {
bytesOver -= resource.getSizeOnDisk();
LOG.info("Deleted blob: {} (OVER SIZE LIMIT).", resource.getKey());
i.remove();
}
}
}
private boolean removeBlob(LocallyCachedBlob blob, Map<String, ? extends LocallyCachedBlob> blobs) {
synchronized (blob) {
if (!blob.isUsed()) {
try {
blob.completelyRemove();
} catch (Exception e) {
LOG.warn("Tried to remove {} but failed with", blob, e);
}
blobs.remove(blob.getKey());
return true;
}
return false;
}
}
@Override
public String toString() {
return "Cache: " + currentSize;
}
static class LRUComparator implements Comparator<LocallyCachedBlob> {
@Override
public int compare(LocallyCachedBlob r1, LocallyCachedBlob r2) {
long ret = r1.getLastUsed() - r2.getLastUsed();
if (0 == ret) {
return System.identityHashCode(r1) - System.identityHashCode(r2);
}
return ret > 0 ? 1 : -1;
}
}
}