blob: 88bb94451cd0d29e02da1ae5e539dc0f4a1da69f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.utils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** Functions for waiting on some events to happen while reporting progress */
public class ProgressableUtils {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ProgressableUtils.class);
/** Msecs to refresh the progress meter (one minute) */
private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
/**
* When getting results with many threads, how many milliseconds to wait
* on each when looping through them
*/
private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
/** Do not instantiate. */
private ProgressableUtils() {
}
/**
* Wait for executor tasks to terminate, while periodically reporting
* progress.
*
* @param executor Executor which we are waiting for
* @param progressable Progressable for reporting progress (Job context)
* @param msecsPeriod How often to report progress
*/
public static void awaitExecutorTermination(ExecutorService executor,
Progressable progressable, int msecsPeriod) {
waitForever(new ExecutorServiceWaitable(executor), progressable,
msecsPeriod);
}
/**
* Wait for executor tasks to terminate, while periodically reporting
* progress.
*
* @param executor Executor which we are waiting for
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitExecutorTermination(ExecutorService executor,
Progressable progressable) {
waitForever(new ExecutorServiceWaitable(executor), progressable);
}
/**
* Wait for executorgroup to terminate, while periodically reporting progress
*
* @param group ExecutorGroup whose termination we are awaiting
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitTerminationFuture(EventExecutorGroup group,
Progressable progressable) {
waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
}
/**
* Wait for the result of the future to be ready, while periodically
* reporting progress.
*
* @param <T> Type of the return value of the future
* @param future Future
* @param progressable Progressable for reporting progress (Job context)
* @return Computed result of the future.
*/
public static <T> T getFutureResult(Future<T> future,
Progressable progressable) {
return waitForever(new FutureWaitable<T>(future), progressable);
}
/**
* Wait for {@link ChannelGroupFuture} to finish, while periodically
* reporting progress.
*
* @param future ChannelGroupFuture
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitChannelGroupFuture(ChannelGroupFuture future,
Progressable progressable) {
waitForever(new ChannelGroupFutureWaitable(future), progressable);
}
/**
* Wait for {@link ChannelFuture} to finish, while periodically
* reporting progress.
*
* @param future ChannelFuture
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitChannelFuture(ChannelFuture future,
Progressable progressable) {
waitForever(new ChannelFutureWaitable(future), progressable);
}
/**
* Wait to acquire enough permits from {@link Semaphore}, while periodically
* reporting progress.
*
* @param semaphore Semaphore
* @param permits How many permits to acquire
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitSemaphorePermits(final Semaphore semaphore,
int permits, Progressable progressable) {
while (true) {
waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
// Verify permits were not taken by another thread,
// if they were keep looping
if (semaphore.tryAcquire(permits)) {
return;
}
}
}
/**
* Wait forever for waitable to finish. Periodically reports progress.
*
* @param waitable Waitable which we wait for
* @param progressable Progressable for reporting progress (Job context)
* @param <T> Result type
* @return Result of waitable
*/
private static <T> T waitForever(Waitable<T> waitable,
Progressable progressable) {
return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
}
/**
* Wait forever for waitable to finish. Periodically reports progress.
*
* @param waitable Waitable which we wait for
* @param progressable Progressable for reporting progress (Job context)
* @param msecsPeriod How often to report progress
* @param <T> Result type
* @return Result of waitable
*/
private static <T> T waitForever(Waitable<T> waitable,
Progressable progressable, int msecsPeriod) {
while (true) {
waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
if (waitable.isFinished()) {
try {
return waitable.getResult();
} catch (ExecutionException e) {
throw new IllegalStateException("waitForever: " +
"ExecutionException occurred while waiting for " + waitable, e);
} catch (InterruptedException e) {
throw new IllegalStateException("waitForever: " +
"InterruptedException occurred while waiting for " + waitable, e);
}
}
}
}
/**
* Wait for desired number of milliseconds for waitable to finish.
* Periodically reports progress.
*
* @param waitable Waitable which we wait for
* @param progressable Progressable for reporting progress (Job context)
* @param msecs Number of milliseconds to wait for
* @param msecsPeriod How often to report progress
* @param <T> Result type
* @return Result of waitable
*/
private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
int msecs, int msecsPeriod) {
long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
int currentWaitMsecs;
while (true) {
progressable.progress();
currentWaitMsecs = Math.min(msecs, msecsPeriod);
try {
waitable.waitFor(currentWaitMsecs);
if (waitable.isFinished()) {
return waitable.getResult();
}
} catch (InterruptedException e) {
throw new IllegalStateException("waitFor: " +
"InterruptedException occurred while waiting for " + waitable, e);
} catch (ExecutionException e) {
throw new IllegalStateException("waitFor: " +
"ExecutionException occurred while waiting for " + waitable, e);
}
if (LOG.isInfoEnabled()) {
LOG.info("waitFor: Waiting for " + waitable);
}
if (System.currentTimeMillis() >= timeoutTimeMsecs) {
return waitable.getTimeoutResult();
}
msecs = Math.max(0, msecs - currentWaitMsecs);
}
}
/**
* Create {#link numThreads} callables from {#link callableFactory},
* execute them and gather results.
*
* @param callableFactory Factory for Callables
* @param numThreads Number of threads to use
* @param threadNameFormat Format for thread name
* @param progressable Progressable for reporting progress
* @param <R> Type of Callable's results
* @return List of results from Callables
*/
public static <R> List<R> getResultsWithNCallables(
CallableFactory<R> callableFactory, int numThreads,
String threadNameFormat, Progressable progressable) {
ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
ThreadUtils.createThreadFactory(threadNameFormat));
HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
for (int i = 0; i < numThreads; i++) {
Callable<R> callable = callableFactory.newCallable(i);
Future<R> future = executorService.submit(
new LogStacktraceCallable<R>(callable));
futures.put(i, future);
}
executorService.shutdown();
List<R> futureResults =
new ArrayList<>(Collections.<R>nCopies(numThreads, null));
// Loop through the futures until all are finished
// We do this in order to get any exceptions from the futures early
while (!futures.isEmpty()) {
Iterator<Map.Entry<Integer, Future<R>>> iterator =
futures.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, Future<R>> entry = iterator.next();
R result;
try {
// Try to get result from the future
result = entry.getValue().get(
MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Exception occurred", e);
} catch (TimeoutException e) {
// If result is not ready yet just keep waiting
continue;
}
// Result is ready, put it to final results
futureResults.set(entry.getKey(), result);
// Remove current future since we are done with it
iterator.remove();
}
progressable.progress();
}
return futureResults;
}
/**
* Interface for waiting on a result from some operation.
*
* @param <T> Result type.
*/
private interface Waitable<T> {
/**
* Wait for desired number of milliseconds for waitable to finish.
*
* @param msecs Number of milliseconds to wait.
*/
void waitFor(int msecs) throws InterruptedException, ExecutionException;
/**
* Check if waitable is finished.
*
* @return True iff waitable finished.
*/
boolean isFinished();
/**
* Get result of waitable. Call after isFinished() returns true.
*
* @return Result of waitable.
*/
T getResult() throws ExecutionException, InterruptedException;
/**
* Get the result which we want to return in case of timeout.
*
* @return Timeout result.
*/
T getTimeoutResult();
}
/**
* abstract class for waitables which don't have the result.
*/
private abstract static class WaitableWithoutResult
implements Waitable<Void> {
@Override
public Void getResult() throws ExecutionException, InterruptedException {
return null;
}
@Override
public Void getTimeoutResult() {
return null;
}
}
/**
* {@link Waitable} for waiting on a result of a {@link Future}.
*
* @param <T> Future result type
*/
private static class FutureWaitable<T> implements Waitable<T> {
/** Future which we want to wait for */
private final Future<T> future;
/**
* Constructor
*
* @param future Future which we want to wait for
*/
public FutureWaitable(Future<T> future) {
this.future = future;
}
@Override
public void waitFor(int msecs) throws InterruptedException,
ExecutionException {
try {
future.get(msecs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (LOG.isInfoEnabled()) {
LOG.info("waitFor: Future result not ready yet " + future);
}
}
}
@Override
public boolean isFinished() {
return future.isDone();
}
@Override
public T getResult() throws ExecutionException, InterruptedException {
return future.get();
}
@Override
public T getTimeoutResult() {
return null;
}
}
/**
* {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
*/
private static class ExecutorServiceWaitable extends WaitableWithoutResult {
/** ExecutorService which we want to wait for */
private final ExecutorService executorService;
/**
* Constructor
*
* @param executorService ExecutorService which we want to wait for
*/
public ExecutorServiceWaitable(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void waitFor(int msecs) throws InterruptedException {
executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
}
@Override
public boolean isFinished() {
return executorService.isTerminated();
}
}
/**
* {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
* terminate.
*/
private static class ChannelGroupFutureWaitable extends
WaitableWithoutResult {
/** ChannelGroupFuture which we want to wait for */
private final ChannelGroupFuture future;
/**
* Constructor
*
* @param future ChannelGroupFuture which we want to wait for
*/
public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
this.future = future;
}
@Override
public void waitFor(int msecs) throws InterruptedException {
future.await(msecs, TimeUnit.MILLISECONDS);
}
@Override
public boolean isFinished() {
return future.isDone();
}
}
/**
* {@link Waitable} for waiting on a {@link ChannelFuture} to
* terminate.
*/
private static class ChannelFutureWaitable extends WaitableWithoutResult {
/** ChannelGroupFuture which we want to wait for */
private final ChannelFuture future;
/**
* Constructor
*
* @param future ChannelFuture which we want to wait for
*/
public ChannelFutureWaitable(ChannelFuture future) {
this.future = future;
}
@Override
public void waitFor(int msecs) throws InterruptedException {
future.await(msecs, TimeUnit.MILLISECONDS);
}
@Override
public boolean isFinished() {
return future.isDone();
}
}
/**
* {@link Waitable} for waiting on required number of permits in a
* {@link Semaphore} to become available.
*/
private static class SemaphoreWaitable extends WaitableWithoutResult {
/** Semaphore to wait on */
private final Semaphore semaphore;
/** How many permits to wait on */
private final int permits;
/**
* Constructor
*
* @param semaphore Semaphore to wait on
* @param permits How many permits to wait on
*/
public SemaphoreWaitable(Semaphore semaphore, int permits) {
this.semaphore = semaphore;
this.permits = permits;
}
@Override
public void waitFor(int msecs) throws InterruptedException {
boolean acquired =
semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
// Return permits if we managed to acquire them
if (acquired) {
semaphore.release(permits);
}
}
@Override
public boolean isFinished() {
return semaphore.availablePermits() >= permits;
}
}
}