blob: 836fdf3e3951828aeb119922d3b0ce26e505179a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* An implementation of {@link AsyncChecker} that skips checking recently
* checked objects. It will enforce at least minMsBetweenChecks
* milliseconds between two successive checks of any one object.
*
* It is assumed that the total number of Checkable objects in the system
* is small, (not more than a few dozen) since the checker uses O(Checkables)
* storage and also potentially O(Checkables) threads.
*
* minMsBetweenChecks should be configured reasonably
* by the caller to avoid spinning up too many threads frequently.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
public static final Logger LOG =
LoggerFactory.getLogger(ThrottledAsyncChecker.class);
private final Timer timer;
/**
* The ExecutorService used to schedule asynchronous checks.
*/
private final ListeningExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
/**
* The minimum gap in milliseconds between two successive checks
* of the same object. This is the throttle.
*/
private final long minMsBetweenChecks;
private final long diskCheckTimeout;
/**
* Map of checks that are currently in progress. Protected by the object
* lock.
*/
private final Map<Checkable, ListenableFuture<V>> checksInProgress;
/**
* Maps Checkable objects to a future that can be used to retrieve
* the results of the operation.
* Protected by the object lock.
*/
private final Map<Checkable, ThrottledAsyncChecker.LastCheckResult<V>>
completedChecks;
public ThrottledAsyncChecker(final Timer timer,
final long minMsBetweenChecks,
final long diskCheckTimeout,
final ExecutorService executorService) {
this.timer = timer;
this.minMsBetweenChecks = minMsBetweenChecks;
this.diskCheckTimeout = diskCheckTimeout;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.checksInProgress = new HashMap<>();
this.completedChecks = new WeakHashMap<>();
if (this.diskCheckTimeout > 0) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
ScheduledThreadPoolExecutor(1);
this.scheduledExecutorService = MoreExecutors
.getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
} else {
this.scheduledExecutorService = null;
}
}
/**
* See {@link AsyncChecker#schedule}
*
* If the object has been checked recently then the check will
* be skipped. Multiple concurrent checks for the same object
* will receive the same Future.
*/
@Override
public Optional<ListenableFuture<V>> schedule(
Checkable<K, V> target, K context) {
if (checksInProgress.containsKey(target)) {
return Optional.empty();
}
if (completedChecks.containsKey(target)) {
final ThrottledAsyncChecker.LastCheckResult<V> result =
completedChecks.get(target);
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
if (msSinceLastCheck < minMsBetweenChecks) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipped checking {}. Time since last check {}ms " +
"is less than the min gap {}ms.",
target, msSinceLastCheck, minMsBetweenChecks);
}
return Optional.empty();
}
}
LOG.info("Scheduling a check for {}", target);
final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
() -> target.check(context));
final ListenableFuture<V> lf;
if (diskCheckTimeout > 0) {
lf = TimeoutFuture
.create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
scheduledExecutorService);
} else {
lf = lfWithoutTimeout;
}
checksInProgress.put(target, lf);
addResultCachingCallback(target, lf);
return Optional.of(lf);
}
/**
* Register a callback to cache the result of a check.
* @param target
* @param lf
*/
private void addResultCachingCallback(
Checkable<K, V> target, ListenableFuture<V> lf) {
Futures.addCallback(lf, new FutureCallback<V>() {
@Override
public void onSuccess(@Nullable V result) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
result, timer.monotonicNow()));
}
}
@Override
public void onFailure(@Nonnull Throwable t) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
t, timer.monotonicNow()));
}
}
});
}
/**
* {@inheritDoc}.
*
* The results of in-progress checks are not useful during shutdown,
* so we optimize for faster shutdown by interrupt all actively
* executing checks.
*/
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(timeout, timeUnit);
}
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
/**
* Status of running a check. It can either be a result or an
* exception, depending on whether the check completed or threw.
*/
private static final class LastCheckResult<V> {
/**
* Timestamp at which the check completed.
*/
private final long completedAt;
/**
* Result of running the check if it completed. null if it threw.
*/
@Nullable
private final V result;
/**
* Exception thrown by the check. null if it returned a result.
*/
private final Throwable exception; // null on success.
/**
* Initialize with a result.
* @param result
*/
private LastCheckResult(V result, long completedAt) {
this.result = result;
this.exception = null;
this.completedAt = completedAt;
}
/**
* Initialize with an exception.
* @param completedAt
* @param t
*/
private LastCheckResult(Throwable t, long completedAt) {
this.result = null;
this.exception = t;
this.completedAt = completedAt;
}
}
}