blob: 92476d77ddb446308877d684380f307641ed4450 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Interface for class that can tell estimate much space
* is used in a directory.
* <p>
* The implementor is fee to cache space used. As such there
* are methods to update the cached value with any known changes.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
static final Logger LOG = LoggerFactory.getLogger(CachingGetSpaceUsed.class);
protected final AtomicLong used = new AtomicLong();
private final AtomicBoolean running = new AtomicBoolean(true);
private final long refreshInterval;
private final long jitter;
private final String dirPath;
private Thread refreshUsed;
/**
* This is the constructor used by the builder.
* All overriding classes should implement this.
*/
public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
throws IOException {
this(builder.getPath(),
builder.getInterval(),
builder.getJitter(),
builder.getInitialUsed());
}
/**
* Keeps track of disk usage.
*
* @param path the path to check disk usage in
* @param interval refresh the disk usage at this interval
* @param jitter randomize the refresh interval timing by this amount;
* the actual interval will be chosen uniformly between
* {@code interval-jitter} and {@code interval+jitter}
* @param initialUsed use this value until next refresh
* @throws IOException if we fail to refresh the disk usage
*/
CachingGetSpaceUsed(File path,
long interval,
long jitter,
long initialUsed) throws IOException {
this.dirPath = path.getCanonicalPath();
this.refreshInterval = interval;
this.jitter = jitter;
this.used.set(initialUsed);
}
void init() {
if (used.get() < 0) {
used.set(0);
refresh();
}
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
} else {
running.set(false);
refreshUsed = null;
}
}
protected abstract void refresh();
/**
* @return an estimate of space used in the directory path.
*/
@Override public long getUsed() throws IOException {
return Math.max(used.get(), 0);
}
/**
* @return The directory path being monitored.
*/
public String getDirPath() {
return dirPath;
}
/**
* Increment the cached value of used space.
*/
public void incDfsUsed(long value) {
used.addAndGet(value);
}
/**
* Is the background thread running.
*/
boolean running() {
return running.get();
}
/**
* How long in between runs of the background refresh.
*/
long getRefreshInterval() {
return refreshInterval;
}
/**
* Reset the current used data amount. This should be called
* when the cached value is re-computed.
*
* @param usedValue new value that should be the disk usage.
*/
protected void setUsed(long usedValue) {
this.used.set(usedValue);
}
@Override
public void close() throws IOException {
running.set(false);
if (refreshUsed != null) {
refreshUsed.interrupt();
}
}
private static final class RefreshThread implements Runnable {
final CachingGetSpaceUsed spaceUsed;
RefreshThread(CachingGetSpaceUsed spaceUsed) {
this.spaceUsed = spaceUsed;
}
@Override
public void run() {
while (spaceUsed.running()) {
try {
long refreshInterval = spaceUsed.refreshInterval;
if (spaceUsed.jitter > 0) {
long jitter = spaceUsed.jitter;
// add/subtract the jitter.
refreshInterval +=
ThreadLocalRandom.current()
.nextLong(-jitter, jitter);
}
// Make sure that after the jitter we didn't end up at 0.
refreshInterval = Math.max(refreshInterval, 1);
Thread.sleep(refreshInterval);
// update the used variable
spaceUsed.refresh();
} catch (InterruptedException e) {
LOG.warn("Thread Interrupted waiting to refresh disk information: "
+ e.getMessage());
Thread.currentThread().interrupt();
}
}
}
}
}