blob: 43221de36acac0a96a9bfe84763801ab7ea89d86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.processor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
* Manages execution of {@link FrameProcessor} in an {@link ExecutorService}.
*
* If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a
* same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks.
*/
public class FrameProcessorExecutor
{
private static final Logger log = new Logger(FrameProcessorExecutor.class);
// Executor service that processor runnables are submitted to.
private final ListeningExecutorService exec;
// Monitor guarding the collections below. The private "cancel" method also waits on it, and it is notified
// (notifyAll) whenever a processor is removed from runningProcessors.
private final Object lock = new Object();
// Futures that are active and therefore cancelable, keyed by cancellationId.
// Does not include return futures: those are in cancelableReturnFutures.
@GuardedBy("lock")
private final SetMultimap<String, ListenableFuture<?>> cancelableFutures =
    Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);
// Return futures that are active and therefore cancelable, keyed by cancellationId. Kept separately from
// cancelableFutures because "cancel" cancels these last.
@GuardedBy("lock")
private final SetMultimap<String, ListenableFuture<?>> cancelableReturnFutures =
    Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);
// Processors that are active and therefore cancelable, keyed by cancellationId. They may not currently be running
// on an actual thread. Whoever removes a processor from this map takes over responsibility for its cleanup.
@GuardedBy("lock")
private final SetMultimap<String, FrameProcessor<?>> cancelableProcessors =
    Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);
// Processors that are currently running on a thread, mapped to that thread so "cancel" can interrupt it.
// The "lock" is notified when processors are removed from runningProcessors.
@GuardedBy("lock")
private final Map<FrameProcessor<?>, Thread> runningProcessors = new IdentityHashMap<>();
/**
 * Creates an executor that runs processors on the provided executor service.
 *
 * @param exec executor service that processor work is submitted to; must not be null
 *
 * @throws NullPointerException if {@code exec} is null
 */
public FrameProcessorExecutor(final ListeningExecutorService exec)
{
  // Fail fast. With a null executor, runFully would only fail at its first "exec.submit" call, after it has already
  // registered the return future and the processor for cancellation, leaving stale entries behind.
  this.exec = Preconditions.checkNotNull(exec, "exec");
}
/**
 * Provides access to the executor service that this object submits processor work to.
 *
 * @return the executor service supplied at construction time
 */
public ListeningExecutorService getExecutorService()
{
  return this.exec;
}
/**
 * Runs a processor until it is done, and returns a future that resolves when execution is complete.
 *
 * If "cancellationId" is provided, it can be used with the {@link #cancel(String)} method to cancel all processors
 * currently running with the same cancellationId.
 *
 * The processor is driven by a runnable that is submitted to the executor service once per call to
 * {@link FrameProcessor#runIncrementally}. Between calls, the runnable is resubmitted either immediately or after
 * the output channels become writable and the awaited input channels become readable.
 *
 * @param processor      processor to run
 * @param cancellationId optional cancellation id. When null, neither the processor nor its futures are registered
 *                       for cancellation.
 *
 * @return future that resolves to the processor's return value, or to an exception if the processor or its
 * cleanup routine fails
 */
public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, @Nullable final String cancellationId)
{
  final List<ReadableFrameChannel> inputChannels = processor.inputChannels();
  final List<WritableFrameChannel> outputChannels = processor.outputChannels();
  // The future handed back to the caller. Registered as a "return" future (isReturn = true).
  final SettableFuture<T> finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId);
  // One instance is created per runFully call. Each execution of run() performs at most one call to
  // runIncrementally and then arranges for the next execution.
  class ExecutorRunnable implements Runnable
  {
    private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels);
    @Override
    public void run()
    {
      try {
        final List<ListenableFuture<?>> allWritabilityFutures = gatherWritabilityFutures();
        final List<ListenableFuture<?>> writabilityFuturesToWaitFor =
            allWritabilityFutures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
        logProcessorStatusString(processor, finished, allWritabilityFutures);
        // Do not run the processor until every output channel's writability future is done. Come back later.
        if (!writabilityFuturesToWaitFor.isEmpty()) {
          runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor));
          return;
        }
        final Optional<ReturnOrAwait<T>> maybeResult = runProcessorNow();
        if (!maybeResult.isPresent()) {
          // Processor exited abnormally. Just exit; cleanup would have been handled elsewhere.
          return;
        }
        final ReturnOrAwait<T> result = maybeResult.get();
        logProcessorStatusString(processor, finished, null);
        if (result.isReturn()) {
          succeed(result.value());
        } else {
          // Don't retain a reference to this set: it may be mutated the next time the processor runs.
          final IntSet await = result.awaitSet();
          if (await.isEmpty()) {
            // Nothing to wait for: schedule the next run right away.
            exec.submit(ExecutorRunnable.this);
          } else if (result.isAwaitAll() || await.size() == 1) {
            // Await all (a single awaited channel is handled the same way): collect readability futures only for
            // channels that are neither finished nor currently readable.
            final List<ListenableFuture<?>> readabilityFutures = new ArrayList<>();
            for (final int channelNumber : await) {
              final ReadableFrameChannel channel = inputChannels.get(channelNumber);
              if (!channel.isFinished() && !channel.canRead()) {
                readabilityFutures.add(channel.readabilityFuture());
              }
            }
            if (readabilityFutures.isEmpty()) {
              // Every awaited channel is already finished or readable.
              exec.submit(ExecutorRunnable.this);
            } else {
              runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
            }
          } else {
            // Await any.
            runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await));
          }
        }
      }
      catch (Throwable e) {
        // Covers exceptions thrown by the processor (rethrown by runProcessorNow) as well as scheduling failures.
        fail(e);
      }
    }
    /**
     * Returns the writability futures of all output channels, in output channel order. Includes futures that are
     * already done.
     */
    private List<ListenableFuture<?>> gatherWritabilityFutures()
    {
      final List<ListenableFuture<?>> futures = new ArrayList<>();
      for (final WritableFrameChannel channel : outputChannels) {
        futures.add(channel.writabilityFuture());
      }
      return futures;
    }
    /**
     * Executes {@link FrameProcessor#runIncrementally} on the currently-readable inputs, while respecting
     * cancellation. Returns an empty Optional if the processor exited abnormally (canceled or failed). Returns a
     * present Optional if the processor ran successfully. Throws an exception if the processor does.
     */
    private Optional<ReturnOrAwait<T>> runProcessorNow()
    {
      // Inputs passed to the processor: channels that are finished or that can be read right now.
      final IntSet readableInputs = new IntOpenHashSet(inputChannels.size());
      for (int i = 0; i < inputChannels.size(); i++) {
        final ReadableFrameChannel channel = inputChannels.get(i);
        if (channel.isFinished() || channel.canRead()) {
          readableInputs.add(i);
        }
      }
      if (cancellationId != null) {
        // After this synchronized block, our thread may be interrupted by cancellations, because "cancel"
        // checks "runningProcessors".
        synchronized (lock) {
          if (cancelableProcessors.containsEntry(cancellationId, processor)) {
            runningProcessors.put(processor, Thread.currentThread());
          } else {
            // Processor has been canceled. We don't need to handle cleanup, because someone else did it.
            return Optional.empty();
          }
        }
      }
      boolean canceled = false;
      Either<Throwable, ReturnOrAwait<T>> retVal;
      try {
        // Thread.interrupted() clears the interrupt flag; a pending interrupt is converted into an exception
        // rather than running the processor.
        if (Thread.interrupted()) {
          throw new InterruptedException();
        }
        retVal = Either.value(processor.runIncrementally(readableInputs));
      }
      catch (Throwable e) {
        // Catch InterruptedException too: interrupt was meant for the processor, not us.
        retVal = Either.error(e);
      }
      finally {
        if (cancellationId != null) {
          // After this synchronized block, our thread will no longer be interrupted by cancellations,
          // because "cancel" checks "runningProcessors".
          synchronized (lock) {
            // Called for its side effect of clearing the interrupt flag.
            if (Thread.interrupted()) {
              // ignore: interrupt was meant for the processor, but came after the processor already exited.
            }
            runningProcessors.remove(processor);
            // Wake up any "cancel" call waiting for this processor to stop running.
            lock.notifyAll();
            if (!cancelableProcessors.containsEntry(cancellationId, processor)) {
              // Processor has been canceled by one of the "cancel" methods. They will handle cleanup.
              canceled = true;
            }
          }
        }
      }
      if (canceled) {
        // Any result or exception captured in retVal is dropped here.
        return Optional.empty();
      } else {
        return Optional.of(retVal.valueOrThrow());
      }
    }
    /**
     * Resubmits this runnable to the executor service once the provided future resolves successfully. The future
     * is registered as a (non-return) cancelable future first. If the future fails for a reason other than
     * cancellation, the processor is failed with that error.
     */
    private <V> void runProcessorAfterFutureResolves(final ListenableFuture<V> future)
    {
      final ListenableFuture<V> cancelableFuture = registerCancelableFuture(future, false, cancellationId);
      Futures.addCallback(
          cancelableFuture,
          new FutureCallback<V>()
          {
            @Override
            public void onSuccess(final V ignored)
            {
              try {
                exec.submit(ExecutorRunnable.this);
              }
              catch (Throwable e) {
                fail(e);
              }
            }
            @Override
            public void onFailure(Throwable t)
            {
              // Ignore cancellation.
              if (!cancelableFuture.isCancelled()) {
                fail(t);
              }
            }
          }
      );
    }
    /**
     * Called when a processor succeeds.
     *
     * Runs the cleanup routine and sets the finished future to a particular value. If cleanup fails, sets the
     * finished future to an error.
     */
    private void succeed(T value)
    {
      try {
        doProcessorCleanup();
      }
      catch (Throwable e) {
        finished.setException(e);
        return;
      }
      finished.set(value);
    }
    /**
     * Called when a processor fails.
     *
     * Cancels output channels, runs the cleanup routine, and sets the finished future to an error.
     */
    private void fail(Throwable e)
    {
      for (final WritableFrameChannel outputChannel : outputChannels) {
        try {
          outputChannel.fail(e);
        }
        catch (Throwable e1) {
          // Keep going: secondary errors are attached to the original error instead of replacing it.
          e.addSuppressed(e1);
        }
      }
      try {
        doProcessorCleanup();
      }
      catch (Throwable e1) {
        e.addSuppressed(e1);
      }
      finished.setException(e);
    }
    /**
     * Called when a processor exits via {@link #succeed} or {@link #fail}. Not called when a processor
     * is canceled.
     */
    void doProcessorCleanup() throws IOException
    {
      final boolean doCleanup;
      if (cancellationId != null) {
        synchronized (lock) {
          // Skip cleanup if the processor is no longer in cancelableProcessors. This means one of the "cancel"
          // methods is going to do the cleanup.
          doCleanup = cancelableProcessors.remove(cancellationId, processor);
        }
      } else {
        // Processors without a cancellationId are never registered, so cleanup always happens here.
        doCleanup = true;
      }
      if (doCleanup) {
        processor.cleanup();
      }
    }
  }
  final ExecutorRunnable runnable = new ExecutorRunnable();
  // Runs when "finished" completes for any reason (value, exception, or cancellation). Uses a direct executor, so
  // the listener is not handed off to the executor service.
  finished.addListener(
      () -> {
        logProcessorStatusString(processor, finished, null);
        // If the future was canceled, and the processor is cancelable, then cancel the processor too.
        if (finished.isCancelled() && cancellationId != null) {
          boolean didRemoveFromCancelableProcessors;
          synchronized (lock) {
            didRemoveFromCancelableProcessors = cancelableProcessors.remove(cancellationId, processor);
          }
          // Only the caller that actually removed the processor cancels it, so cleanup is not done twice.
          if (didRemoveFromCancelableProcessors) {
            try {
              cancel(Collections.singleton(processor));
            }
            catch (InterruptedException e) {
              // Listeners cannot throw checked exceptions; restore the interrupt flag instead.
              Thread.currentThread().interrupt();
            }
          }
        }
      },
      Execs.directExecutor()
  );
  logProcessorStatusString(processor, finished, null);
  // Register before the first submit: when a cancellationId is set, runProcessorNow declines to run a processor
  // that is not present in cancelableProcessors.
  registerCancelableProcessor(processor, cancellationId);
  exec.submit(runnable);
  return finished;
}
/**
 * Runs a sequence of processors and returns a future that resolves when execution is complete. Returns a value
 * accumulated using the provided {@code accumulateFn}.
 *
 * This method only constructs a {@link RunAllFullyWidget} with the provided arguments plus this executor, and
 * returns the result of its {@code run()} method.
 *
 * @param processors               sequence of processors to run
 * @param initialResult            initial value for result accumulation. If there are no processors at all, this
 *                                 is returned.
 * @param accumulateFn             result accumulator. Applied in a critical section, so it should be something
 *                                 that executes quickly. It gets {@code initialResult} for the initial accumulation.
 * @param maxOutstandingProcessors maximum number of processors to run at once
 * @param bouncer                  additional limiter on outstanding processors, beyond maxOutstandingProcessors.
 *                                 Useful when there is some finite resource being shared against multiple different
 *                                 calls to this method.
 * @param cancellationId           optional cancellation id for {@link #runFully}.
 *
 * @return future that resolves to the accumulated result when execution is complete
 */
public <T, ResultType> ListenableFuture<ResultType> runAllFully(
    final Sequence<? extends FrameProcessor<T>> processors,
    final ResultType initialResult,
    final BiFunction<ResultType, T, ResultType> accumulateFn,
    final int maxOutstandingProcessors,
    final Bouncer bouncer,
    @Nullable final String cancellationId
)
{
  // Logic resides in a separate class in order to keep this one simpler.
  return new RunAllFullyWidget<>(
      processors,
      this,
      initialResult,
      accumulateFn,
      maxOutstandingProcessors,
      bouncer,
      cancellationId
  ).run();
}
/**
 * Cancels everything registered under the given cancellationId: pending (non-return) futures, processors, and
 * return futures, in that order. Blocks until the affected processors are no longer running.
 *
 * @param cancellationId cancellation id; must not be null
 *
 * @throws InterruptedException if interrupted while waiting for processors to stop running
 */
public void cancel(final String cancellationId) throws InterruptedException
{
  Preconditions.checkNotNull(cancellationId, "cancellationId");

  final Set<ListenableFuture<?>> pendingFutures;
  final Set<FrameProcessor<?>> processors;
  final Set<ListenableFuture<?>> resultFutures;

  // Atomically take ownership of everything registered under this id.
  synchronized (lock) {
    pendingFutures = cancelableFutures.removeAll(cancellationId);
    processors = cancelableProcessors.removeAll(cancellationId);
    resultFutures = cancelableReturnFutures.removeAll(cancellationId);
  }

  // Phase 1: non-return futures. Going first prevents runAllFully from scheduling new processors.
  pendingFutures.forEach(pending -> pending.cancel(true));

  // Phase 2: the processors themselves, all at once. Doing this ahead of the return futures gives a cleaner
  // cancellation than canceling continuations and return futures one-by-one.
  cancel(processors);

  // Phase 3: return futures.
  resultFutures.forEach(result -> result.cancel(true));
}
/**
 * Tracks a future so that it is canceled when {@link #cancel(String)} is called with the same
 * {@code cancellationId}. The future is untracked automatically once it completes. Does nothing when
 * {@code cancellationId} is null.
 *
 * @param future         cancelable future
 * @param isReturn       whether this is a return future for a processor; these are canceled last
 * @param cancellationId cancellation id, or null to skip registration
 *
 * @return the same future that was passed in
 */
<T, FutureType extends ListenableFuture<T>> FutureType registerCancelableFuture(
    final FutureType future,
    final boolean isReturn,
    @Nullable final String cancellationId
)
{
  if (cancellationId == null) {
    return future;
  }

  synchronized (lock) {
    final SetMultimap<String, ListenableFuture<?>> registry;
    if (isReturn) {
      registry = cancelableReturnFutures;
    } else {
      registry = cancelableFutures;
    }

    registry.put(cancellationId, future);

    // Drops the entry again as soon as the future completes, whatever the outcome.
    final Runnable deregister = () -> {
      synchronized (lock) {
        registry.remove(cancellationId, future);
      }
    };
    future.addListener(deregister, Execs.directExecutor());
  }

  return future;
}
/**
 * Reports the current size of the cancelable-processor registry, read under the lock.
 */
@VisibleForTesting
int cancelableProcessorCount()
{
  final int registered;
  synchronized (lock) {
    registered = cancelableProcessors.size();
  }
  return registered;
}
/**
 * Stops the given processors, waits until none of them is running, then fails their output channels and calls
 * {@link FrameProcessor#cleanup()} on each.
 *
 * Callers must remove processors from {@link #cancelableProcessors} before calling this method.
 *
 * Exceptions thrown while failing output channels or running {@link FrameProcessor#cleanup()} are logged, not
 * thrown.
 *
 * @throws InterruptedException if interrupted while waiting for processors to stop running
 */
private void cancel(final Set<FrameProcessor<?>> processorsToCancel)
    throws InterruptedException
{
  synchronized (lock) {
    // Interrupt whichever threads are running these processors right now. This happens under lock so the
    // interrupt cannot land shortly *before* or *after* the thread runs the processor.
    for (final FrameProcessor<?> target : processorsToCancel) {
      final Thread runner = runningProcessors.get(target);
      if (runner != null) {
        runner.interrupt();
      }
    }

    // Sleep on the lock until none of the targets is running. The lock is notified each time a processor is
    // removed from runningProcessors.
    while (true) {
      boolean stillRunning = false;
      for (final FrameProcessor<?> target : processorsToCancel) {
        if (runningProcessors.containsKey(target)) {
          stillRunning = true;
          break;
        }
      }

      if (!stillRunning) {
        break;
      }

      lock.wait();
    }
  }

  // The targets are not running and will not run again, because the caller removed them from
  // cancelableProcessors (which runProcessorNow checks). Clean them up outside the critical section.
  for (final FrameProcessor<?> target : processorsToCancel) {
    // Output channels are failed first, then cleanup is called.
    for (final WritableFrameChannel out : target.outputChannels()) {
      try {
        out.fail(new CancellationException("Canceled"));
      }
      catch (Throwable t) {
        log.debug(t, "Exception encountered while marking output channel failed for processor [%s]", target);
      }
    }

    try {
      target.cleanup();
    }
    catch (Throwable t) {
      log.noStackTrace().warn(t, "Exception encountered while canceling processor [%s]", target);
    }
  }
}
/**
 * Records a processor under the given cancellationId so that it can later be canceled. Does nothing when
 * {@code cancellationId} is null.
 */
private <T> void registerCancelableProcessor(final FrameProcessor<T> processor, @Nullable final String cancellationId)
{
  if (cancellationId == null) {
    return;
  }

  synchronized (lock) {
    cancelableProcessors.put(cancellationId, processor);
  }
}
/**
 * Emits a one-line debug summary of a processor: one character per input channel (R = readable, D = done,
 * ~ = waiting), optionally one character per output channel (W = writable, ~ = waiting), and whether the finished
 * future is canceled and/or done. Does nothing unless debug logging is enabled.
 *
 * @param processor          processor to describe
 * @param finishedFuture     the processor's return future
 * @param writabilityFutures writability futures of the output channels, or null to omit the "out" section
 */
private static <T> void logProcessorStatusString(
    final FrameProcessor<T> processor,
    final ListenableFuture<?> finishedFuture,
    @Nullable final List<ListenableFuture<?>> writabilityFutures
)
{
  if (!log.isDebugEnabled()) {
    return;
  }

  final StringBuilder status = new StringBuilder();
  status.append("Processor [");
  status.append(processor);
  status.append("]; in=[");

  for (ReadableFrameChannel input : processor.inputChannels()) {
    // Readability is checked first; "done" is reported only for channels that cannot be read.
    status.append(input.canRead() ? "R" : (input.isFinished() ? "D" : "~"));
  }
  status.append("]");

  if (writabilityFutures != null) {
    status.append("; out=[");
    for (final ListenableFuture<?> writable : writabilityFutures) {
      status.append(writable.isDone() ? "W" : "~");
    }
    status.append("]");
  }

  final String cancelFlag = finishedFuture.isCancelled() ? "y" : "n";
  status.append("; cancel=").append(cancelFlag);
  final String doneFlag = finishedFuture.isDone() ? "y" : "n";
  status.append("; done=").append(doneFlag);

  log.debug(StringUtils.encodeForFormat(status.toString()));
}
}