blob: c9e6d0b78ac11de22c7f2a20fba0e19881f94293 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
/**
* Utility class for parallel execution, takes closures for the various
* actions.
* There is no retry logic: it is expected to be handled by the closures.
* From {@code org.apache.hadoop.fs.s3a.commit.Tasks} which came from
* the Netflix committer patch.
* Apache Iceberg has its own version of this, with a common ancestor
* at some point in its history.
* A key difference with this class is that the iterator is always,
* internally, an {@link RemoteIterator}.
* This is to allow tasks to be scheduled while incremental operations
* such as paged directory listings are still collecting in results.
*
* While awaiting completion, this thread spins and sleeps a time of
* {@link #SLEEP_INTERVAL_AWAITING_COMPLETION}, which, being a
* busy-wait, is inefficient.
* There's an implicit assumption that remote IO is being performed, and
* so this is not impacting throughput/performance.
*
* History:
* This class came with the Netflix contributions to the S3A committers
* in HADOOP-13786.
* It was moved into hadoop-common for use in the manifest committer and
* anywhere else it is needed, and renamed in the process as
* "Tasks" has too many meanings in the hadoop source.
* The iterator was then changed from a normal java iterable
* to a hadoop {@link org.apache.hadoop.fs.RemoteIterator}.
* This allows a task pool to be supplied with incremental listings
* from object stores, scheduling work as pages of listing
* results come in, rather than blocking until the entire
* directory/directory tree etc has been enumerated.
*
* There is a variant of this in Apache Iceberg in
* {@code org.apache.iceberg.util.Tasks}
* That is not derived from any version in the hadoop codebase, it
* just shares a common ancestor somewhere in the Netflix codebase.
* It is the more sophisticated version.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class TaskPool {
private static final Logger LOG =
LoggerFactory.getLogger(TaskPool.class);
/**
* Interval in milliseconds to await completion.
*/
private static final int SLEEP_INTERVAL_AWAITING_COMPLETION = 10;
private TaskPool() {
}
/**
* Callback invoked to process an item.
* @param <I> item type being processed
* @param <E> exception class which may be raised
*/
@FunctionalInterface
public interface Task<I, E extends Exception> {
void run(I item) throws E;
}
/**
* Callback invoked on a failure.
* @param <I> item type being processed
* @param <E> exception class which may be raised
*/
@FunctionalInterface
public interface FailureTask<I, E extends Exception> {
/**
* process a failure.
* @param item item the task is processing
* @param exception the exception which was raised.
* @throws E Exception of type E
*/
void run(I item, Exception exception) throws E;
}
/**
* Builder for task execution.
* @param <I> item type
*/
public static class Builder<I> {
private final RemoteIterator<I> items;
private Submitter service = null;
private FailureTask<I, ?> onFailure = null;
private boolean stopOnFailure = false;
private boolean suppressExceptions = false;
private Task<I, ?> revertTask = null;
private boolean stopRevertsOnFailure = false;
private Task<I, ?> abortTask = null;
private boolean stopAbortsOnFailure = false;
private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;
/**
* IOStatisticsContext to switch to in all threads
* taking part in the commit operation.
* This ensures that the IOStatistics collected in the
* worker threads will be aggregated into the total statistics
* of the thread calling the committer commit/abort methods.
*/
private IOStatisticsContext ioStatisticsContext = null;
/**
* Create the builder.
* @param items items to process
*/
Builder(RemoteIterator<I> items) {
this.items = requireNonNull(items, "items");
}
/**
* Create the builder.
* @param items items to process
*/
Builder(Iterable<I> items) {
this(remoteIteratorFromIterable(items));
}
/**
* Declare executor service: if null, the tasks are executed in a single
* thread.
* @param submitter service to schedule tasks with.
* @return this builder.
*/
public Builder<I> executeWith(@Nullable Submitter submitter) {
this.service = submitter;
return this;
}
/**
* Task to invoke on failure.
* @param task task
* @return the builder
*/
public Builder<I> onFailure(FailureTask<I, ?> task) {
this.onFailure = task;
return this;
}
public Builder<I> stopOnFailure() {
this.stopOnFailure = true;
return this;
}
/**
* Suppress exceptions from tasks.
* RemoteIterator exceptions are not suppressable.
* @return the builder.
*/
public Builder<I> suppressExceptions() {
return suppressExceptions(true);
}
/**
* Suppress exceptions from tasks.
* RemoteIterator exceptions are not suppressable.
* @param suppress new value
* @return the builder.
*/
public Builder<I> suppressExceptions(boolean suppress) {
this.suppressExceptions = suppress;
return this;
}
/**
* Task to revert with after another task failed.
* @param task task to execute
* @return the builder
*/
public Builder<I> revertWith(Task<I, ?> task) {
this.revertTask = task;
return this;
}
/**
* Stop trying to revert if one operation fails.
* @return the builder
*/
public Builder<I> stopRevertsOnFailure() {
this.stopRevertsOnFailure = true;
return this;
}
/**
* Task to abort with after another task failed.
* @param task task to execute
* @return the builder
*/
public Builder<I> abortWith(Task<I, ?> task) {
this.abortTask = task;
return this;
}
/**
* Stop trying to abort if one operation fails.
* @return the builder
*/
public Builder<I> stopAbortsOnFailure() {
this.stopAbortsOnFailure = true;
return this;
}
/**
* Set the sleep interval.
* @param value new value
* @return the builder
*/
public Builder<I> sleepInterval(final int value) {
sleepInterval = value;
return this;
}
/**
* Execute the task across the data.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
public <E extends Exception> boolean run(Task<I, E> task) throws E, IOException {
requireNonNull(items, "items");
if (!items.hasNext()) {
// if there are no items, return without worrying about
// execution pools, errors etc.
return true;
}
if (service != null) {
// thread pool, so run in parallel
return runParallel(task);
} else {
// single threaded execution.
return runSingleThreaded(task);
}
}
/**
* Single threaded execution.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
throws E, IOException {
List<I> succeeded = new ArrayList<>();
List<Exception> exceptions = new ArrayList<>();
RemoteIterator<I> iterator = items;
boolean threw = true;
try {
while (iterator.hasNext()) {
I item = iterator.next();
try {
task.run(item);
succeeded.add(item);
} catch (Exception e) {
exceptions.add(e);
if (onFailure != null) {
try {
onFailure.run(item, e);
} catch (Exception failException) {
LOG.error("Failed to clean up on failure", e);
// keep going
}
}
if (stopOnFailure) {
break;
}
}
}
threw = false;
} catch (IOException iteratorIOE) {
// an IOE is reaised here during iteration
LOG.debug("IOException when iterating through {}", iterator, iteratorIOE);
throw iteratorIOE;
} finally {
// threw handles exceptions that were *not* caught by the catch block,
// and exceptions that were caught and possibly handled by onFailure
// are kept in exceptions.
if (threw || !exceptions.isEmpty()) {
if (revertTask != null) {
boolean failed = false;
for (I item : succeeded) {
try {
revertTask.run(item);
} catch (Exception e) {
LOG.error("Failed to revert task", e);
failed = true;
// keep going
}
if (stopRevertsOnFailure && failed) {
break;
}
}
}
if (abortTask != null) {
boolean failed = false;
while (iterator.hasNext()) {
try {
abortTask.run(iterator.next());
} catch (Exception e) {
failed = true;
LOG.error("Failed to abort task", e);
// keep going
}
if (stopAbortsOnFailure && failed) {
break;
}
}
}
}
}
if (!suppressExceptions && !exceptions.isEmpty()) {
TaskPool.<E>throwOne(exceptions);
}
return exceptions.isEmpty();
}
/**
* Parallel execution.
* All tasks run within the same IOStatisticsContext as the
* thread calling this method.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
private <E extends Exception> boolean runParallel(final Task<I, E> task)
throws E, IOException {
final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
final AtomicBoolean taskFailed = new AtomicBoolean(false);
final AtomicBoolean abortFailed = new AtomicBoolean(false);
final AtomicBoolean revertFailed = new AtomicBoolean(false);
List<Future<?>> futures = new ArrayList<>();
ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext();
IOException iteratorIOE = null;
final RemoteIterator<I> iterator = this.items;
try {
while (iterator.hasNext()) {
final I item = iterator.next();
// submit a task for each item that will either run or abort the task
futures.add(service.submit(() -> {
setStatisticsContext();
try {
if (!(stopOnFailure && taskFailed.get())) {
// prepare and run the task
boolean threw = true;
try {
LOG.debug("Executing task");
task.run(item);
succeeded.add(item);
LOG.debug("Task succeeded");
threw = false;
} catch (Exception e) {
taskFailed.set(true);
exceptions.add(e);
LOG.info("Task failed {}", e.toString());
LOG.debug("Task failed", e);
if (onFailure != null) {
try {
onFailure.run(item, e);
} catch (Exception failException) {
LOG.warn("Failed to clean up on failure", e);
// swallow the exception
}
}
} finally {
if (threw) {
taskFailed.set(true);
}
}
} else if (abortTask != null) {
// abort the task instead of running it
if (stopAbortsOnFailure && abortFailed.get()) {
return;
}
boolean failed = true;
try {
LOG.info("Aborting task");
abortTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to abort task", e);
// swallow the exception
} finally {
if (failed) {
abortFailed.set(true);
}
}
}
} finally {
resetStatisticsContext();
}
}));
}
} catch (IOException e) {
// iterator failure.
LOG.debug("IOException when iterating through {}", iterator, e);
iteratorIOE = e;
// mark as a task failure so all submitted tasks will halt/abort
taskFailed.set(true);
}
// let the above tasks complete (or abort)
waitFor(futures, sleepInterval);
int futureCount = futures.size();
futures.clear();
if (taskFailed.get() && revertTask != null) {
// at least one task failed, revert any that succeeded
LOG.info("Reverting all {} succeeded tasks from {} futures",
succeeded.size(), futureCount);
for (final I item : succeeded) {
futures.add(service.submit(() -> {
if (stopRevertsOnFailure && revertFailed.get()) {
return;
}
boolean failed = true;
setStatisticsContext();
try {
revertTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to revert task", e);
// swallow the exception
} finally {
if (failed) {
revertFailed.set(true);
}
resetStatisticsContext();
}
}));
}
// let the revert tasks complete
waitFor(futures, sleepInterval);
}
// give priority to execution exceptions over
// iterator exceptions.
if (!suppressExceptions && !exceptions.isEmpty()) {
// there's an exception list to build up, cast and throw.
TaskPool.<E>throwOne(exceptions);
}
// raise any iterator exception.
// this can not be suppressed.
if (iteratorIOE != null) {
throw iteratorIOE;
}
// return true if all tasks succeeded.
return !taskFailed.get();
}
/**
* Set the statistics context for this thread.
*/
private void setStatisticsContext() {
if (ioStatisticsContext != null) {
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
}
}
/**
* Reset the statistics context if it was set earlier.
* This unbinds the current thread from any statistics
* context.
*/
private void resetStatisticsContext() {
if (ioStatisticsContext != null) {
IOStatisticsContext.setThreadIOStatisticsContext(null);
}
}
}
/**
* Wait for all the futures to complete; there's a small sleep between
* each iteration; enough to yield the CPU.
* @param futures futures.
* @param sleepInterval Interval in milliseconds to await completion.
*/
private static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream().filter(Future::isDone).count();
if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}
if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* Create a task builder for the iterable.
* @param items item source.
* @param <I> type of result.
* @return builder.
*/
public static <I> Builder<I> foreach(Iterable<I> items) {
return new Builder<>(requireNonNull(items, "items"));
}
/**
* Create a task builder for the remote iterator.
* @param items item source.
* @param <I> type of result.
* @return builder.
*/
public static <I> Builder<I> foreach(RemoteIterator<I> items) {
return new Builder<>(items);
}
public static <I> Builder<I> foreach(I[] items) {
return new Builder<>(Arrays.asList(requireNonNull(items, "items")));
}
/**
* Throw one exception, adding the others as suppressed
* exceptions attached to the one thrown.
* This method never completes normally.
* @param exceptions collection of exceptions
* @param <E> class of exceptions
* @throws E an extracted exception.
*/
private static <E extends Exception> void throwOne(
Collection<Exception> exceptions)
throws E {
Iterator<Exception> iter = exceptions.iterator();
Exception e = iter.next();
Class<? extends Exception> exceptionClass = e.getClass();
while (iter.hasNext()) {
Exception other = iter.next();
if (!exceptionClass.isInstance(other)) {
e.addSuppressed(other);
}
}
TaskPool.<E>castAndThrow(e);
}
/**
* Raise an exception of the declared type.
* This method never completes normally.
* @param e exception
* @param <E> class of exceptions
* @throws E a recast exception.
*/
@SuppressWarnings("unchecked")
private static <E extends Exception> void castAndThrow(Exception e) throws E {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw (E) e;
}
/**
* Interface to whatever lets us submit tasks.
*/
public interface Submitter {
/**
* Submit work.
* @param task task to execute
* @return the future of the submitted task.
*/
Future<?> submit(Runnable task);
}
}