blob: 8dcac2894e874c65c485ed7563d6073c95c4b537 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.CountDownLatchInjection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decorator class to hide multiple Partitioner existence from the caller since
* this class involves multithreaded processing of incoming batches as well as
* flushing it needs special handling of OperatorStats - stats since stats are
* not suitable for use in multithreaded environment The algorithm to figure out
* processing versus wait time is based on following formula: totalWaitTime =
* totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
*/
public final class PartitionerDecorator {
private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
private final List<Partitioner> partitioners;
private final OperatorStats stats;
private final ExecutorService executor;
private final FragmentContext context;
private final Thread thread;
private final boolean enableParallelTaskExecution;
PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
this(partitioners, stats, context, partitioners.size() > 1);
}
PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context, boolean enableParallelTaskExecution) {
this.partitioners = partitioners;
this.stats = stats;
this.context = context;
this.enableParallelTaskExecution = enableParallelTaskExecution;
executor = enableParallelTaskExecution ? context.getExecutor() : MoreExecutors.newDirectExecutorService();
thread = Thread.currentThread();
}
/**
* partitionBatch - decorator method to call real Partitioner(s) to process incoming batch
* uses either threading or not threading approach based on number Partitioners
* @param incoming
* @throws ExecutionException
*/
public void partitionBatch(final RecordBatch incoming) throws ExecutionException {
executeMethodLogic(new PartitionBatchHandlingClass(incoming));
}
/**
* flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches
* @param isLastBatch
* @param schemaChanged
* @throws ExecutionException
*/
public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws ExecutionException {
executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged));
}
/**
* decorator method to call multiple Partitioners initialize()
*/
public void initialize() {
for (Partitioner part : partitioners ) {
part.initialize();
}
}
/**
* decorator method to call multiple Partitioners clear()
*/
public void clear() {
for (Partitioner part : partitioners ) {
part.clear();
}
}
/**
* Helper method to get PartitionOutgoingBatch based on the index
* since we may have more then one Partitioner
* As number of Partitioners should be very small AND this method it used very rarely,
* so it is OK to loop in order to find right partitioner
* @param index - index of PartitionOutgoingBatch
* @return PartitionOutgoingBatch
*/
public PartitionOutgoingBatch getOutgoingBatches(int index) {
for (Partitioner part : partitioners ) {
PartitionOutgoingBatch outBatch = part.getOutgoingBatch(index);
if ( outBatch != null ) {
return outBatch;
}
}
return null;
}
List<Partitioner> getPartitioners() {
return partitioners;
}
/**
* Helper to execute the different methods wrapped into same logic
* @param iface
* @throws ExecutionException
*/
@VisibleForTesting
void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException {
// To simulate interruption of main fragment thread and interrupting the partition threads, create a
// CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or
// interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
testCountDownLatch.initialize(1);
final AtomicInteger count = new AtomicInteger();
List<PartitionerTask> partitionerTasks = new ArrayList<>(partitioners.size());
ExecutionException executionException = null;
// start waiting on main stats to adjust by sum(max(processing)) at the end
startWait();
try {
partitioners.forEach(partitioner -> createAndExecute(iface, testCountDownLatch, count, partitionerTasks, partitioner));
// Wait for main fragment interruption.
injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
testCountDownLatch.countDown();
} catch (InterruptedException e) {
logger.warn("fragment thread interrupted", e);
throw new QueryCancelledException();
} catch (RejectedExecutionException e) {
logger.warn("Failed to execute partitioner tasks. Execution service down?", e);
executionException = new ExecutionException(e);
} finally {
await(count, partitionerTasks);
stopWait();
processPartitionerTasks(partitionerTasks, executionException);
}
}
}
private void createAndExecute(GeneralExecuteIface iface, CountDownLatchInjection testCountDownLatch, AtomicInteger count,
List<PartitionerTask> partitionerTasks, Partitioner partitioner) {
PartitionerTask partitionerTask = new PartitionerTask(this, iface, partitioner, count, testCountDownLatch);
executor.execute(partitionerTask);
partitionerTasks.add(partitionerTask);
count.incrementAndGet();
}
/**
* Wait for completion of all partitioner tasks.
* @param count current number of task not yet completed
* @param partitionerTasks list of partitioner tasks submitted for execution
*/
private void await(AtomicInteger count, List<PartitionerTask> partitionerTasks) {
boolean cancelled = false;
while (count.get() > 0) {
if (context.getExecutorState().shouldContinue() || cancelled) {
LockSupport.park();
} else {
logger.warn("Cancelling fragment {} partitioner tasks...", context.getFragIdString());
partitionerTasks.forEach(partitionerTask -> partitionerTask.cancel(true));
cancelled = true;
}
}
}
private void startWait() {
if (enableParallelTaskExecution) {
stats.startWait();
}
}
private void stopWait() {
if (enableParallelTaskExecution) {
stats.stopWait();
}
}
private void processPartitionerTasks(List<PartitionerTask> partitionerTasks, ExecutionException executionException) throws ExecutionException {
long maxProcessTime = 0l;
for (PartitionerTask partitionerTask : partitionerTasks) {
ExecutionException e = partitionerTask.getException();
if (e != null) {
if (executionException == null) {
executionException = e;
} else {
executionException.getCause().addSuppressed(e.getCause());
}
}
if (executionException == null) {
final OperatorStats localStats = partitionerTask.getStats();
// find out max Partitioner processing time
if (enableParallelTaskExecution) {
long currentProcessingNanos = localStats.getProcessingNanos();
maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime;
} else {
maxProcessTime += localStats.getWaitNanos();
}
stats.mergeMetrics(localStats);
}
}
if (executionException != null) {
throw executionException;
}
// scale down main stats wait time based on calculated processing time
// since we did not wait for whole duration of above execution
if (enableParallelTaskExecution) {
stats.adjustWaitNanos(-maxProcessTime);
} else {
stats.adjustWaitNanos(maxProcessTime);
}
}
/**
* Helper interface to generalize functionality executed in the thread
* since it is absolutely the same for partitionBatch and flushOutgoingBatches
* protected is for testing purposes
*/
protected interface GeneralExecuteIface {
void execute(Partitioner partitioner) throws IOException;
}
/**
* Class to handle running partitionBatch method
*
*/
private static class PartitionBatchHandlingClass implements GeneralExecuteIface {
private final RecordBatch incoming;
PartitionBatchHandlingClass(RecordBatch incoming) {
this.incoming = incoming;
}
@Override
public void execute(Partitioner part) throws IOException {
part.partitionBatch(incoming);
}
}
/**
* Class to handle running flushOutgoingBatches method
*
*/
private static class FlushBatchesHandlingClass implements GeneralExecuteIface {
private final boolean isLastBatch;
private final boolean schemaChanged;
public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
this.isLastBatch = isLastBatch;
this.schemaChanged = schemaChanged;
}
@Override
public void execute(Partitioner part) throws IOException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
* Helper class to wrap Runnable with cancellation and waiting for completion support
*
*/
private static class PartitionerTask implements Runnable {
private enum STATE {
NEW,
COMPLETING,
NORMAL,
EXCEPTIONAL,
CANCELLED,
INTERRUPTING,
INTERRUPTED
}
private final AtomicReference<STATE> state;
private final AtomicReference<Thread> runner;
private final PartitionerDecorator partitionerDecorator;
private final AtomicInteger count;
private final GeneralExecuteIface iface;
private final Partitioner partitioner;
private final CountDownLatchInjection testCountDownLatch;
private volatile ExecutionException exception;
public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
state = new AtomicReference<>(STATE.NEW);
runner = new AtomicReference<>();
this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
this.partitioner = partitioner;
this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
final Thread thread = Thread.currentThread();
if (runner.compareAndSet(null, thread)) {
final String name = thread.getName();
thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
final OperatorStats localStats = partitioner.getStats();
localStats.clear();
localStats.startProcessing();
ExecutionException executionException = null;
try {
// Test only - Pause until interrupted by fragment thread
testCountDownLatch.await();
if (state.get() == STATE.NEW) {
iface.execute(partitioner);
}
} catch (InterruptedException e) {
if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
logger.warn("Partitioner Task interrupted during the run", e);
}
} catch (Throwable t) {
executionException = new ExecutionException(t);
} finally {
if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
if (executionException == null) {
localStats.stopProcessing();
state.lazySet(STATE.NORMAL);
} else {
exception = executionException;
state.lazySet(STATE.EXCEPTIONAL);
}
}
if (count.decrementAndGet() == 0) {
LockSupport.unpark(partitionerDecorator.thread);
}
thread.setName(name);
while (state.get() == STATE.INTERRUPTING) {
Thread.yield();
}
// Clear interrupt flag
Thread.interrupted();
}
}
}
void cancel(boolean mayInterruptIfRunning) {
Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName()));
if (runner.compareAndSet(null, partitionerDecorator.thread)) {
if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
}
count.decrementAndGet();
} else {
if (mayInterruptIfRunning) {
if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) {
try {
runner.get().interrupt();
} finally {
state.lazySet(STATE.INTERRUPTED);
}
}
} else {
state.compareAndSet(STATE.NEW, STATE.CANCELLED);
}
}
}
public ExecutionException getException() {
return exception;
}
public OperatorStats getStats() {
return partitioner.getStats();
}
}
}