blob: 8c3b5cbf12d98c7af4c40dbbbbf2088e02e73aea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.spi.fs;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* A {@link PreferredVolumeChooser} that takes remaining HDFS space into account when making a
* volume choice rather than a simpler round robin. The list of volumes to use can be limited using
* the same properties as {@link PreferredVolumeChooser}
*
* @since 2.1.0
*/
public class SpaceAwareVolumeChooser extends PreferredVolumeChooser {
public static final String RECOMPUTE_INTERVAL = "spaceaware.volume.chooser.recompute.interval";
// Default time to wait in ms. Defaults to 5 min
private long defaultComputationCacheDuration = 300000;
private LoadingCache<Set<String>,WeightedRandomCollection> choiceCache = null;
private static final Logger log = LoggerFactory.getLogger(SpaceAwareVolumeChooser.class);
private Configuration conf = new Configuration();
protected double getFreeSpace(String uri) throws IOException {
FileSystem pathFs = new Path(uri).getFileSystem(conf);
FsStatus optionStatus = pathFs.getStatus();
return ((double) optionStatus.getRemaining() / optionStatus.getCapacity());
}
@Override
public String choose(VolumeChooserEnvironment env, Set<String> options) {
try {
return getCache(env).get(getPreferredVolumes(env, options)).next();
} catch (ExecutionException e) {
throw new IllegalStateException("Execution exception when attempting to cache choice", e);
}
}
private synchronized LoadingCache<Set<String>,WeightedRandomCollection>
getCache(VolumeChooserEnvironment env) {
if (choiceCache == null) {
String propertyValue = env.getServiceEnv().getConfiguration().getCustom(RECOMPUTE_INTERVAL);
long computationCacheDuration = StringUtils.isNotBlank(propertyValue)
? Long.parseLong(propertyValue) : defaultComputationCacheDuration;
choiceCache = CacheBuilder.newBuilder()
.expireAfterWrite(computationCacheDuration, TimeUnit.MILLISECONDS)
.build(new CacheLoader<>() {
@Override
public WeightedRandomCollection load(Set<String> key) {
return new WeightedRandomCollection(key, env, random);
}
});
}
return choiceCache;
}
private class WeightedRandomCollection {
private final NavigableMap<Double,String> map = new TreeMap<>();
private final Random random;
private double total = 0;
public WeightedRandomCollection(Set<String> options, VolumeChooserEnvironment env,
Random random) {
this.random = random;
if (options.size() < 1) {
throw new IllegalStateException("Options was empty! No valid volumes to choose from.");
}
// Compute percentage space available on each volume
for (String option : options) {
try {
double percentFree = getFreeSpace(option);
add(percentFree, option);
} catch (IOException e) {
log.error("Unable to get file system status for" + option, e);
}
}
if (map.size() < 1) {
throw new IllegalStateException(
"Weighted options was empty! Could indicate an issue getting file system status or "
+ "no free space on any volume");
}
}
public WeightedRandomCollection add(double weight, String result) {
if (weight <= 0) {
log.info("Weight was 0. Not adding " + result);
return this;
}
total += weight;
map.put(total, result);
return this;
}
public String next() {
double value = random.nextDouble() * total;
return map.higherEntry(value).getValue();
}
}
}