blob: 5fe767144c0f9d679d76a6605b96234d999e7df8 [file] [log] [blame]
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.concurrent;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
import static com.google.common.collect.Maps.newHashMap;
import static org.jclouds.util.Throwables2.containsThrowable;
import static org.jclouds.util.Throwables2.propagateAuthorizationOrOriginalException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.logging.Logger;
import org.jclouds.rest.AuthorizationException;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.inject.Inject;
/**
* functions related to or replacing those in {@link com.google.common.collect.Iterables} dealing with Futures
*
* @author Adrian Cole
*/
@Beta
public class FutureIterables {
@Resource
private static Logger logger = Logger.CONSOLE;
@Inject(optional = true)
@Named(Constants.PROPERTY_MAX_RETRIES)
private static int maxRetries = 5;
@Inject(optional = true)
@Named(Constants.PROPERTY_RETRY_DELAY_START)
private static long delayStart = 50L;
@Inject(optional = true)
private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE;
public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
final Function<? super F, Future<T>> function) {
return transformParallel(fromIterable, function, org.jclouds.concurrent.MoreExecutors.sameThreadExecutor(), null);
}
public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime) {
return transformParallel(fromIterable, function, exec, maxTime, logger, "transforming");
}
public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
String logPrefix) {
return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries);
}
@SuppressWarnings("unchecked")
public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable,
Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) {
Map<F, Exception> exceptions = newHashMap();
Map<F, Future<T>> responses = newHashMap();
for (int i = 0; i < maxRetries; i++) {
for (F from : fromIterable) {
responses.put(from, function.apply(from));
}
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) {
fromIterable = exceptions.keySet();
retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions));
} else {
break;
}
}
//make sure we propagate any authorization exception so that we don't lock out accounts
if (exceptions.size() > 0)
return propagateAuthorizationOrOriginalException(new TransformParallelException((Map) responses, exceptions,
logPrefix));
return unwrap(responses.values());
}
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
@Nullable Long maxTime, final Logger logger, final String logPrefix) {
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
if (responses.size() == 0)
return errorMap;
final int total = responses.size();
final CountDownLatch doneSignal = new CountDownLatch(total);
final AtomicInteger complete = new AtomicInteger(0);
final AtomicInteger errors = new AtomicInteger(0);
final long start = System.currentTimeMillis();
for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) {
Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() {
@Override
public void run() {
try {
future.getValue().get();
complete.incrementAndGet();
} catch (Exception e) {
errors.incrementAndGet();
logException(logger, logPrefix, total, complete.get(), errors.get(), start, e);
errorMap.put(future.getKey(), e);
}
doneSignal.countDown();
}
@Override
public String toString() {
return "callGetOnFuture(" + future.getKey() + "," + future.getValue() + ")";
}
}, exec);
}
try {
if (maxTime != null)
doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
else
doneSignal.await();
if (errors.get() > 0) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
RuntimeException exception = new RuntimeException(message);
logger.error(exception, message);
}
if (logger.isTraceEnabled()) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
logger.trace(message);
}
} catch (InterruptedException e) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException exception = new TimeoutException(message);
logger.error(exception, message);
propagate(exception);
}
return errorMap;
}
public static <T> Iterable<T> unwrap(Iterable<Future<T>> values) {
return transform(values, new Function<Future<T>, T>() {
@Override
public T apply(Future<T> from) {
try {
return from.get();
} catch (InterruptedException e) {
propagate(e);
} catch (ExecutionException e) {
propagate(e);
}
return null;
}
@Override
public String toString() {
return "callGetOnFuture()";
}
});
}
private static void logException(Logger logger, String logPrefix, int total, int complete, int errors, long start,
Exception e) {
String message = message(logPrefix, total, complete, errors, start);
logger.error(e, message);
}
private static String message(String prefix, int size, int complete, int errors, long start) {
return String.format("%s, completed: %d/%d, errors: %d, rate: %dms/op", prefix, complete, size, errors,
(long) ((System.currentTimeMillis() - start) / ((double) size)));
}
protected static boolean timeOut(long start, Long maxTime) {
return maxTime != null ? System.currentTimeMillis() < start + maxTime : false;
}
}