blob: 1211df6b74b0aa707bc9c2e1322fefb0aab30982 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.driver;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryMetricsManager;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import org.apache.tsfile.read.common.block.TsBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.queryengine.execution.operator.Operator.NOT_BLOCKED;
import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS;
/**
 * Base implementation of {@link IDriver}: pulls {@link TsBlock}s from a root {@link Operator} and
 * sends them to an {@link ISink}.
 *
 * <p>Processing, finish checks and destruction all run under {@code exclusiveLock}; the lifecycle
 * is tracked by {@code state}. Subclasses supply initialization ({@code init}) and resource
 * cleanup ({@code releaseResource}).
 */
public abstract class Driver implements IDriver {
protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
// metrics sink used to record per-operator cost and call count when the driver is destroyed
protected static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
// metrics sink used to record the cost of each internal processing step
protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
QueryExecutionMetricSet.getInstance();
// context of this driver; also the source of the sink and of the driver task id
protected final DriverContext driverContext;
// root operator of the pipeline driven by this driver
protected final Operator root;
// destination of every non-empty TsBlock produced by the root operator
protected final ISink sink;
// future returned to callers while the driver is blocked; starts out already completed and is
// replaced each time processing hits a blocked source or a full sink
protected final AtomicReference<SettableFuture<?>> driverBlockedFuture = new AtomicReference<>();
// lifecycle state, initially ALIVE
protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
// non-reentrant lock serializing processing, finish checks and destruction
protected final DriverLock exclusiveLock = new DriverLock();
// scheduling hint exposed through isHighestPriority()/setHighestPriority(); not read elsewhere
// in this class
private boolean isHighestPriority;
/**
 * Lifecycle of a driver. Within this class the state only moves forward: {@code ALIVE} to {@code
 * NEED_DESTRUCTION} to {@code DESTROYED}.
 */
protected enum State {
// initial state; the only state in which processFor does any processing
ALIVE,
// set by close(), or by the finish check once the driver is detected as finished; operators
// have not been closed yet
NEED_DESTRUCTION,
// set by destroyIfNecessary() right before operators are closed and resources are released
DESTROYED
}
/**
 * Creates a driver that moves the output of {@code root} into the sink of {@code driverContext}.
 *
 * @param root root operator of the pipeline, must not be null
 * @param driverContext context of this driver; the sink it provides must not be null
 * @throws NullPointerException if {@code root} or the sink is null
 */
protected Driver(Operator root, DriverContext driverContext) {
  this.root = checkNotNull(root, "root Operator should not be null");
  checkNotNull(driverContext.getSink(), "Sink should not be null");
  this.driverContext = driverContext;
  this.sink = driverContext.getSink();

  // A fresh driver is not blocked, so publish a future that has already completed.
  SettableFuture<Void> alreadyCompleted = SettableFuture.create();
  alreadyCompleted.set(null);
  driverBlockedFuture.set(alreadyCompleted);
}
/**
 * Reports whether this driver has finished.
 *
 * <p>Must not be called by a thread that already holds the driver lock.
 *
 * @return the result of the full finish check when the lock could be taken immediately; otherwise
 *     {@code true} if the state is no longer {@code ALIVE} or the driver context is done
 */
@Override
public boolean isFinished() {
  checkLockNotHeld("Cannot check finished status while holding the driver lock");

  // With the lock we can run the complete check (which also triggers a clean shutdown);
  // without it, the current lock holder is responsible for shutting down.
  Optional<Boolean> checkedUnderLock = tryWithLockUnInterruptibly(this::isFinishedInternal);
  if (checkedUnderLock.isPresent()) {
    return checkedUnderLock.get();
  }
  return state.get() != State.ALIVE || driverContext.isDone();
}
/**
 * Returns the context this driver was created with.
 *
 * @return the driver context
 */
public DriverContext getDriverContext() {
  return this.driverContext;
}
/**
 * Performs the initialization of this driver.
 *
 * <p>{@link #processFor(Duration)} calls this while holding the driver lock, before any
 * processing step, on every call that finds the driver {@code ALIVE}. When it returns {@code
 * false}, {@code processFor} skips processing and returns {@code blockedFuture} to its caller.
 * NOTE(review): because it runs on every such call, implementations presumably return quickly
 * once initialization is complete; confirm against the subclasses.
 *
 * @param blockedFuture the driver-blocked future read at the start of {@code processFor}
 * @return true if init succeed, false otherwise
 */
protected abstract boolean init(SettableFuture<?> blockedFuture);
/**
 * Releases the resources this driver used.
 *
 * <p>Called from the {@code finally} clause of {@code destroyIfNecessary()}, so it runs after
 * the attempt to close the operators, whether or not that attempt failed.
 */
protected abstract void releaseResource();
/**
 * Runs this driver for roughly {@code duration}.
 *
 * <p>The time budget is only checked between processing steps, so a single step may overrun it.
 *
 * @param duration soft upper bound on the time spent processing in this call
 * @return the pending driver-blocked future when the driver is (or becomes) blocked or
 *     initialization did not succeed; otherwise {@code NOT_BLOCKED}, which is also returned when
 *     the driver lock could not be obtained within 100 ms
 */
@Override
public ListenableFuture<?> processFor(Duration duration) {
  SettableFuture<?> blockedFuture = driverBlockedFuture.get();
  // A still-pending blocked future means there is nothing to do yet.
  if (!blockedFuture.isDone()) {
    return blockedFuture;
  }

  long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
  Optional<ListenableFuture<?>> outcome =
      tryWithLock(
          100, TimeUnit.MILLISECONDS, false, () -> processWhileAlive(blockedFuture, maxRuntime));
  return outcome.orElse(NOT_BLOCKED);
}

/**
 * Processing loop of {@code processFor}; must run while the driver lock is held.
 *
 * @param blockedFuture the driver-blocked future read at the start of {@code processFor}
 * @param maxRuntime time budget in nanoseconds
 * @return a future telling the caller whether the driver is blocked
 */
@GuardedBy("exclusiveLock")
private ListenableFuture<?> processWhileAlive(SettableFuture<?> blockedFuture, long maxRuntime) {
  // Only keep processing the query while the driver is still alive.
  if (state.get() != State.ALIVE) {
    return NOT_BLOCKED;
  }

  long start = System.nanoTime();
  // Initialization may be time-consuming, so it is done here rather than up front. It runs
  // only after the lock is held to avoid racing with close(). In the normal case it cannot
  // deadlock and finishes quickly; anything else is a critical bug.
  if (!init(blockedFuture)) {
    return blockedFuture;
  }

  do {
    ListenableFuture<?> stepResult = processInternal();
    if (!stepResult.isDone()) {
      return updateDriverBlockedFuture(stepResult);
    }
  } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());

  return NOT_BLOCKED;
}
/**
 * Returns the task id stored in the driver context.
 *
 * @return the id of the driver task
 */
@Override
public DriverTaskId getDriverTaskId() {
  return this.driverContext.getDriverTaskID();
}
/**
 * Stores the given task id in the driver context.
 *
 * @param driverTaskId the id of the driver task
 */
@Override
public void setDriverTaskId(DriverTaskId driverTaskId) {
  driverContext.setDriverTaskID(driverTaskId);
}
/**
 * Returns the priority flag last set through {@code setHighestPriority}.
 *
 * @return {@code true} if this driver is flagged as highest priority
 */
@Override
public boolean isHighestPriority() {
  return this.isHighestPriority;
}
/**
 * Sets the priority flag returned by {@code isHighestPriority()}.
 *
 * @param isHighestPriority {@code true} to flag this driver as highest priority
 */
@Override
public void setHighestPriority(boolean isHighestPriority) {
// plain field write; the field is neither volatile nor guarded by the driver lock
this.isHighestPriority = isHighestPriority;
}
/**
 * Marks this driver for destruction and destroys it right away if the driver lock is free.
 *
 * <p>Only the first call that moves the state from {@code ALIVE} to {@code NEED_DESTRUCTION} has
 * any effect; later calls return immediately.
 */
@Override
public void close() {
  boolean markedForDestruction = state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
  if (markedForDestruction) {
    exclusiveLock.interruptCurrentOwner();
    // Taking the lock runs the destruction logic of tryWithLock. If the lock is busy, its
    // current holder performs the shutdown instead.
    tryWithLockUnInterruptibly(() -> TRUE);
  }
}
/**
 * Forwards a failure to the driver context.
 *
 * @param t the cause of the failure
 */
@Override
public void failed(Throwable t) {
  this.driverContext.failed(t);
}
/**
 * Returns the sink obtained from the driver context at construction time.
 *
 * @return the sink of this driver
 */
@Override
public ISink getSink() {
  return this.sink;
}
/**
 * Full finish check; the caller must hold the driver lock.
 *
 * <p>The driver counts as finished when its state is no longer {@code ALIVE}, the driver context
 * is done, the root operator is finished, or the sink is closed. A finished driver that is still
 * {@code ALIVE} is moved to {@code NEED_DESTRUCTION}.
 *
 * @return {@code true} if the driver is finished
 * @throws IllegalStateException if the current thread does not hold the driver lock
 * @throws RuntimeException wrapping any exception raised while evaluating the condition
 */
@SuppressWarnings("squid:S112")
@GuardedBy("exclusiveLock")
private boolean isFinishedInternal() {
  checkLockHeld("Lock must be held to call isFinishedInternal");

  boolean done;
  try {
    done =
        state.get() != State.ALIVE
            || driverContext.isDone()
            || root.isFinished()
            || sink.isClosed();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  if (!done) {
    return false;
  }
  // Request destruction; this is a no-op when the state has already moved past ALIVE.
  state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
  return true;
}
/**
 * Executes one processing step: if neither the root operator is blocked nor the sink is full,
 * fetches at most one {@link TsBlock} from the root operator and sends it to the sink when it is
 * non-null and non-empty.
 *
 * <p>The elapsed time of every call is recorded under {@code DRIVER_INTERNAL_PROCESS}.
 *
 * @return the pending future of the blocked operator or full sink, or {@code NOT_BLOCKED}
 * @throws RuntimeException if the step fails; the failure is reported to the driver context first
 */
@SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
  long startTimeNanos = System.nanoTime();
  try {
    ListenableFuture<?> sourceBlocked = root.isBlocked();
    if (!sourceBlocked.isDone()) {
      return sourceBlocked;
    }

    ListenableFuture<?> sinkFull = sink.isFull();
    if (!sinkFull.isDone()) {
      return sinkFull;
    }

    if (root.hasNextWithTimer()) {
      TsBlock produced = root.nextWithTimer();
      boolean hasData = produced != null && !produced.isEmpty();
      if (hasData) {
        sink.send(produced);
      }
    }
    return NOT_BLOCKED;
  } catch (Throwable t) {
    List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
    if (interrupterStack != null) {
      // Driver thread was interrupted which should only happen if the task is already
      // finished. If this becomes the actual cause of a failed query there is a bug in the
      // task state machine.
      Exception interruptedBy = new Exception("Interrupted By");
      interruptedBy.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
      RuntimeException interrupted = new RuntimeException("Driver was interrupted", interruptedBy);
      interrupted.addSuppressed(t);
      driverContext.failed(interrupted);
      throw interrupted;
    }
    // No interruption was recorded: report the original failure as-is.
    driverContext.failed(t);
    throw new RuntimeException(t);
  } finally {
    long elapsedNanos = System.nanoTime() - startTimeNanos;
    QUERY_EXECUTION_METRICS.recordExecutionCost(DRIVER_INTERNAL_PROCESS, elapsedNanos);
  }
}
/**
 * Publishes a new driver-blocked future that completes when {@code sourceBlockedFuture}
 * completes.
 *
 * @param sourceBlockedFuture the future of whatever is currently blocking the driver
 * @return the newly published driver-blocked future
 */
private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?> sourceBlockedFuture) {
  // Publish the replacement first, then hook it to the source, so the published future is
  // completed as soon as the source future is.
  SettableFuture<?> replacement = SettableFuture.create();
  driverBlockedFuture.set(replacement);
  sourceBlockedFuture.addListener(() -> replacement.set(null), directExecutor());

  // There is no memory management for operators yet. Once there is, a memory revocation
  // request may arrive before the future is replaced above, and that notification must not
  // be missed: this is the place to check for it before returning.
  return replacement;
}
/**
 * Asserts that the current thread does not hold the driver lock.
 *
 * @param message detail message of the exception
 * @throws IllegalStateException if the current thread holds the driver lock
 */
private synchronized void checkLockNotHeld(String message) {
  if (exclusiveLock.isHeldByCurrentThread()) {
    throw new IllegalStateException(String.valueOf(message));
  }
}
/**
 * Asserts that the current thread holds the driver lock.
 *
 * @param message detail message of the exception
 * @throws IllegalStateException if the current thread does not hold the driver lock
 */
@GuardedBy("exclusiveLock")
private synchronized void checkLockHeld(String message) {
  if (!exclusiveLock.isHeldByCurrentThread()) {
    throw new IllegalStateException(String.valueOf(message));
  }
}
/**
 * Runs {@code task} under the {@code exclusiveLock} if the lock can be taken without waiting. The
 * task is not interrupted when the {@code Driver} is closed.
 *
 * <p>Note: task cannot return null
 *
 * @param task the work to run while holding the lock
 * @return the task result, or empty if the lock was not acquired
 */
private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
  final long noWait = 0;
  final boolean interruptOnClose = false;
  return tryWithLock(noWait, TimeUnit.MILLISECONDS, interruptOnClose, task);
}
/**
 * Waits up to {@code timeout} for the {@code exclusiveLock} and, once it is held, runs {@code
 * task}. When {@code interruptOnClose} is {@code true}, closing the {@code Driver} interrupts the
 * running task.
 *
 * <p>Before the lock is released the driver is destroyed if destruction was requested, and the
 * same check is repeated once more after the release.
 *
 * <p>Note: task cannot return null
 *
 * @param timeout maximum time to wait for the lock
 * @param unit unit of {@code timeout}
 * @param interruptOnClose whether the lock owner may be interrupted by {@code close()}
 * @param task the work to run while holding the lock
 * @return the task result, or empty if the lock was not acquired (including when the wait was
 *     interrupted, in which case the interrupt flag is restored)
 */
private <T> Optional<T> tryWithLock(
    long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
  checkLockNotHeld("Lock cannot be reacquired");

  boolean locked;
  try {
    locked = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
  } catch (InterruptedException e) {
    // Keep the interrupt visible to the caller and treat the lock as not acquired.
    Thread.currentThread().interrupt();
    locked = false;
  }
  if (!locked) {
    return Optional.empty();
  }

  Optional<T> outcome;
  try {
    outcome = Optional.of(task.get());
  } finally {
    try {
      destroyIfNecessary();
    } finally {
      exclusiveLock.unlock();
    }
  }

  destroyIfMarkedWhileUnlocking(interruptOnClose);
  return outcome;
}

/**
 * Re-checks for a destruction request that arrived while the lock was being released.
 *
 * <p>Scenario: Thread-A calls {@code close()} and moves the state from ALIVE to NEED_DESTRUCTION
 * right after Thread-B ran {@code destroyIfNecessary()} but before Thread-B unlocked. Thread-A
 * then fails to take the lock (Thread-B still holds it) and exits. Without this re-check
 * Thread-B would exit too and nobody would destroy the driver.
 *
 * @param interruptOnClose whether the lock owner may be interrupted by {@code close()}
 */
private void destroyIfMarkedWhileUnlocking(boolean interruptOnClose) {
  if (state.get() != State.NEED_DESTRUCTION) {
    return;
  }
  if (!exclusiveLock.tryLock(interruptOnClose)) {
    return;
  }
  try {
    destroyIfNecessary();
  } finally {
    exclusiveLock.unlock();
  }
}
/**
 * Destroys this driver if destruction has been requested; the caller must hold the driver lock.
 *
 * <p>Only the call that moves the state from {@code NEED_DESTRUCTION} to {@code DESTROYED} does
 * the work: it closes the operators, marks the driver context as finished and finally releases
 * the driver's resources. Every other call returns immediately.
 *
 * @throws IllegalStateException if the current thread does not hold the driver lock
 * @throws Error if an {@link Error} was raised while destroying; it is rethrown once {@code
 *     releaseResource()} has run
 */
@SuppressWarnings({"squid:S1181", "squid:S112"})
@GuardedBy("exclusiveLock")
private void destroyIfNecessary() {
  checkLockHeld("Lock must be held to call destroyIfNecessary");

  if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
    return;
  }

  // if we get an error while closing a driver, record it and we will throw it at the end
  Throwable inFlightException = null;
  try {
    inFlightException = closeAndDestroyOperators();
    driverContext.finished();
  } catch (Throwable t) {
    // this shouldn't happen but be safe
    // The message is formatted by SLF4J, whose placeholder is "{}"; a "%s" would be printed
    // literally and the task id would be lost.
    inFlightException =
        addSuppressedException(
            inFlightException,
            t,
            "Error destroying driver for task {}",
            driverContext.getDriverTaskID());
  } finally {
    releaseResource();
  }

  if (inFlightException != null) {
    // this will always be an Error or Runtime
    throwIfUnchecked(inFlightException);
    throw new RuntimeException(inFlightException);
  }
}
/**
 * Closes the root operator, removes temporary files if the driver context says there may be any,
 * tells the sink that no more TsBlocks will arrive, and records per-operator statistics.
 *
 * <p>The interrupt flag of the current thread is cleared for the duration of the call and
 * restored afterwards; it is also set if the close itself is interrupted.
 *
 * @return the failure collected while closing, or {@code null} if there is none
 */
@SuppressWarnings("squid:S1181")
private Throwable closeAndDestroyOperators() {
  // Remember (and clear) the interrupted status; it is put back in the finally clause.
  boolean wasInterrupted = Thread.interrupted();

  Throwable inFlightException = null;
  try {
    root.close();
    if (driverContext.mayHaveTmpFile()) {
      cleanTmpFile();
    }
    sink.setNoMoreTsBlocks();
    recordOperatorMetrics();
  } catch (InterruptedException t) {
    // don't record the stack
    wasInterrupted = true;
  } catch (Throwable t) {
    inFlightException =
        addSuppressedException(
            inFlightException,
            t,
            "Error closing operator {} for driver task {}",
            root.getOperatorContext().getOperatorId(),
            driverContext.getDriverTaskID());
  } finally {
    // reset the interrupted flag
    if (wasInterrupted) {
      Thread.currentThread().interrupt();
    }
  }
  return inFlightException;
}

/**
 * Sums execution time and {@code next} call count per operator type over all operator contexts
 * of this driver and records both totals to the query metrics.
 */
private void recordOperatorMetrics() {
  // index 0: total execution time in nanoseconds, index 1: number of next calls
  Map<String, long[]> totalsByOperatorType = new HashMap<>();
  for (OperatorContext context : driverContext.getOperatorContexts()) {
    String operatorType = context.getOperatorType();
    long[] totals = totalsByOperatorType.computeIfAbsent(operatorType, k -> new long[2]);
    totals[0] += context.getTotalExecutionTimeInNanos();
    totals[1] += context.getNextCalledCount();
  }

  for (Map.Entry<String, long[]> perType : totalsByOperatorType.entrySet()) {
    long[] totals = perType.getValue();
    QUERY_METRICS.recordOperatorExecutionCost(perType.getKey(), totals[0]);
    QUERY_METRICS.recordOperatorExecutionCount(perType.getKey(), totals[1]);
  }
}
/**
 * Deletes the temporary sort directory of this pipeline, if it exists. The directory is located
 * at {@code <sortTmpDir>/<fragment instance full id>/<pipeline id>/}.
 */
private void cleanTmpFile() {
  File tmpPipeLineDir =
      new File(
          IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
              + File.separator
              + driverContext.getFragmentInstanceContext().getId().getFullId()
              + File.separator
              + driverContext.getPipelineId()
              + File.separator);
  if (tmpPipeLineDir.exists()) {
    FileUtils.deleteFileOrDirectory(tmpPipeLineDir);
  }
}
/**
 * Folds a newly caught throwable into the failure collected so far.
 *
 * <p>An {@link Error} becomes the in-flight failure, or is attached to the existing one as a
 * suppressed exception. Any other throwable is only logged, using {@code message} as an SLF4J
 * pattern ({@code {}} placeholders) filled from {@code args}.
 *
 * @param inFlightException the failure collected so far, may be {@code null}
 * @param newException the throwable that was just caught
 * @param message SLF4J message pattern used when {@code newException} is logged
 * @param args values for the placeholders of {@code message}
 * @return the failure to carry forward, may be {@code null}
 */
private static Throwable addSuppressedException(
    Throwable inFlightException, Throwable newException, String message, Object... args) {
  if (newException instanceof Error) {
    if (inFlightException == null) {
      inFlightException = newException;
    } else if (inFlightException != newException) {
      // Self-suppression not permitted
      inFlightException.addSuppressed(newException);
    }
  } else {
    // log normal exceptions instead of rethrowing them
    // SLF4J must receive one flat argument array with the throwable as its last element.
    // Passing (args, newException) would bind the whole array to the first placeholder and
    // leave the remaining placeholders unfilled.
    Object[] logArguments = new Object[args.length + 1];
    System.arraycopy(args, 0, logArguments, 0, args.length);
    logArguments[args.length] = newException;
    LOGGER.error(message, logArguments);
  }
  return inFlightException;
}
/**
 * Non-reentrant lock that remembers its current owner so the owner can be interrupted on
 * request.
 *
 * <p>The owner bookkeeping is guarded by the monitor of this object; mutual exclusion itself is
 * provided by the wrapped {@link ReentrantLock}.
 */
private static class DriverLock {
  private final ReentrantLock lock = new ReentrantLock();

  // thread currently holding the lock, or null when it is free
  @GuardedBy("this")
  private Thread currentOwner;

  // whether the current owner agreed to be interrupted when it acquired the lock
  @GuardedBy("this")
  private boolean currentOwnerInterruptionAllowed;

  // stack of the first thread that requested an interruption; debugging aid only
  @GuardedBy("this")
  private List<StackTraceElement> interrupterStack;

  /** Returns whether the calling thread holds this lock. */
  public boolean isHeldByCurrentThread() {
    return lock.isHeldByCurrentThread();
  }

  /**
   * Tries to take the lock without waiting.
   *
   * @param currentThreadInterruptionAllowed whether the caller may be interrupted while it owns
   *     the lock
   * @return {@code true} if the lock was acquired
   * @throws IllegalStateException if the calling thread already holds the lock
   */
  public boolean tryLock(boolean currentThreadInterruptionAllowed) {
    checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
    if (!lock.tryLock()) {
      return false;
    }
    setOwner(currentThreadInterruptionAllowed);
    return true;
  }

  /**
   * Tries to take the lock, waiting up to the given timeout.
   *
   * @param timeout maximum time to wait
   * @param unit unit of {@code timeout}
   * @param currentThreadInterruptionAllowed whether the caller may be interrupted while it owns
   *     the lock
   * @return {@code true} if the lock was acquired
   * @throws InterruptedException if the wait for the lock is interrupted
   * @throws IllegalStateException if the calling thread already holds the lock
   */
  public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
      throws InterruptedException {
    checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
    if (!lock.tryLock(timeout, unit)) {
      return false;
    }
    setOwner(currentThreadInterruptionAllowed);
    return true;
  }

  /** Records the calling thread, which must hold the lock, as the current owner. */
  private synchronized void setOwner(boolean interruptionAllowed) {
    checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
    currentOwnerInterruptionAllowed = interruptionAllowed;
    currentOwner = Thread.currentThread();
    // NOTE: We do not use interrupted stack information to know that another
    // thread has attempted to interrupt the driver, and interrupt this new lock
    // owner. The interrupted stack information is for debugging purposes only.
    // In the case of interruption, the caller should (and does) have a separate
    // state to prevent further processing in the Driver.
  }

  /**
   * Clears the owner bookkeeping and releases the lock.
   *
   * @throws IllegalStateException if the calling thread does not hold the lock
   */
  public synchronized void unlock() {
    checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
    currentOwnerInterruptionAllowed = false;
    currentOwner = null;
    lock.unlock();
  }

  /**
   * Returns the stack captured by the first effective interruption request, or {@code null} if
   * there has been none.
   */
  public synchronized List<StackTraceElement> getInterrupterStack() {
    return interrupterStack;
  }

  /**
   * Interrupts the thread that currently owns the lock, provided that owner allowed
   * interruption; otherwise does nothing.
   */
  public synchronized void interruptCurrentOwner() {
    if (currentOwnerInterruptionAllowed) {
      // There is a benign race here: the lock holder can change between the attempt to get
      // the lock and entering this synchronized method, but in either case we want to
      // interrupt the lock holder thread.
      if (interrupterStack == null) {
        interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
      }
      if (currentOwner != null) {
        currentOwner.interrupt();
      }
    }
  }
}
}