/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work;
import com.codahale.metrics.Gauge;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Manages the running fragments in a Drillbit. Periodically gathers run-time stats
 * from locally running fragments and reports them to each fragment's Foreman.
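 *
 * <p>A minimal lifecycle sketch (hypothetical variable names; the constructor and
 * {@link #start} arguments are assumed to be supplied by the enclosing Drillbit):</p>
 * <pre>{@code
 * WorkManager workManager = new WorkManager(bootStrapContext);
 * workManager.start(endpoint, controller, dataConnectionCreator, clusterCoordinator,
 *     storeProvider, profileStoreProvider);
 * // ... serve queries ...
 * workManager.waitToExit(true); // bounded wait for running work to drain
 * workManager.close();
 * }</pre>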
*/
public class WorkManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
private static final int EXIT_TIMEOUT_MS = 5000;
/*
 * We use a {@link java.util.concurrent.ConcurrentHashMap} because it promises never to throw a
 * {@link java.util.ConcurrentModificationException}; we need that because the statusThread may
 * iterate over the map while other threads add FragmentExecutors via the {@link WorkerBee}.
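 *
 * For example, the StatusThread may iterate
 *   for (final FragmentExecutor e : runningFragments.values()) { ... }
 * while a WorkerBee thread concurrently calls runningFragments.put(...); the values()
 * view of a ConcurrentHashMap is weakly consistent rather than fail-fast, so both
 * proceed safely.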
*/
private final ConcurrentMap<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
private final BootStrapContext bContext;
private DrillbitContext dContext;
private final ControlMessageHandler controlMessageWorker;
private final UserWorker userWorker;
private final WorkerBee bee;
private final WorkEventBus workBus;
private final Executor executor;
private final StatusThread statusThread;
private final Lock isEmptyLock = new ReentrantLock();
private Condition isEmptyCondition;
/**
* How often the StatusThread collects statistics about running fragments.
*/
private static final int STATUS_PERIOD_SECONDS = 5;
public WorkManager(final BootStrapContext context) {
this.bContext = context;
bee = new WorkerBee(); // TODO should this just be an interface?
workBus = new WorkEventBus(); // TODO should this just be an interface?
executor = context.getExecutor();
// TODO references to this escape here (via WorkerBee) before construction is done
controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
statusThread = new StatusThread();
}
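/**
 * Wires this WorkManager into a live Drillbit: builds the {@link DrillbitContext},
 * starts the StatusThread and registers a gauge that reports the number of running
 * fragments.
 */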
public void start(
final DrillbitEndpoint endpoint,
final Controller controller,
final DataConnectionCreator data,
final ClusterCoordinator coord,
final PersistentStoreProvider provider,
final PersistentStoreProvider profilesProvider) {
dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, profilesProvider);
statusThread.start();
DrillMetrics.register("drill.fragments.running", (Gauge<Integer>) runningFragments::size);
}
public Executor getExecutor() {
return executor;
}
public WorkEventBus getWorkBus() {
return workBus;
}
public ControlMessageHandler getControlMessageHandler() {
return controlMessageWorker;
}
public UserWorker getUserWorker() {
return userWorker;
}
public WorkerBee getBee() {
return bee;
}
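/**
 * Interrupts the StatusThread, warns about any fragments that are still running and
 * closes the DrillbitContext.
 */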
@Override
public void close() throws Exception {
statusThread.interrupt();
final long numRunningFragments = runningFragments.size();
if (numRunningFragments != 0) {
logger.warn("Closing WorkManager but there are {} running fragments.", numRunningFragments);
if (logger.isDebugEnabled()) {
for (final FragmentHandle handle : runningFragments.keySet()) {
logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(handle),
runningFragments.get(handle).getStatus());
}
}
}
if (getContext() != null) {
getContext().close();
}
}
public DrillbitContext getContext() {
return dContext;
}
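/**
 * Blocks until all queries and fragments on this Drillbit have completed. When
 * {@code forcefulShutdown} is set, waits at most {@link #EXIT_TIMEOUT_MS} milliseconds
 * before giving up and logging whatever is still running; otherwise waits indefinitely.
 *
 * @param forcefulShutdown whether to give up after EXIT_TIMEOUT_MS
 */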
public void waitToExit(final boolean forcefulShutdown) {
isEmptyLock.lock();
isEmptyCondition = isEmptyLock.newCondition();
try {
if (forcefulShutdown) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + EXIT_TIMEOUT_MS;
long currentTime;
while (!areQueriesAndFragmentsEmpty() && (currentTime = System.currentTimeMillis()) < endTime) {
try {
if (!isEmptyCondition.await(endTime - currentTime, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
logger.error("Interrupted while waiting to exit");
}
}
if (!areQueriesAndFragmentsEmpty()) {
logger.warn("Timed out after {} millis. Shutting down before all fragments and foremen " +
"have completed.", EXIT_TIMEOUT_MS);
for (QueryId queryId: queries.keySet()) {
logger.warn("Query {} is still running.", QueryIdHelper.getQueryId(queryId));
}
for (FragmentHandle fragmentHandle: runningFragments.keySet()) {
logger.warn("Fragment {} is still running.", QueryIdHelper.getQueryIdentifier(fragmentHandle));
}
}
} else {
while (!areQueriesAndFragmentsEmpty()) {
isEmptyCondition.awaitUninterruptibly();
}
}
} finally {
isEmptyLock.unlock();
}
}
private boolean areQueriesAndFragmentsEmpty() {
return queries.isEmpty() && runningFragments.isEmpty();
}
/**
* A thread calling the {@link #waitToExit(boolean)} method is notified when a foreman is retired.
*/
private void indicateIfSafeToExit() {
isEmptyLock.lock();
try {
if (isEmptyCondition != null) {
logger.info("Waiting for {} running queries before shutting down.", queries.size());
logger.info("Waiting for {} running fragments before shutting down.", runningFragments.size());
if (areQueriesAndFragmentsEmpty()) {
isEmptyCondition.signal();
}
}
} finally {
isEmptyLock.unlock();
}
}
/**
 * Gets the number of queries and fragments currently running on this Drillbit.
 * Primarily used to monitor remaining work after a shutdown request is triggered.
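 *
 * <p>For example, the returned map might look like
 * {@code {queriesCount=2, fragmentsCount=7}} (illustrative values).</p>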
*/
public synchronized Map<String, Integer> getRemainingQueries() {
Map<String, Integer> queriesInfo = new HashMap<>();
queriesInfo.put("queriesCount", queries.size());
queriesInfo.put("fragmentsCount", runningFragments.size());
return queriesInfo;
}
/**
* Narrowed interface to WorkManager that is made available to tasks it is managing.
*/
public class WorkerBee {
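/**
 * Registers a Foreman under its query id and submits it to the executor. The Foreman
 * is expected to remove itself via {@link #retireForeman(Foreman)} when it finishes.
 */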
public void addNewForeman(final Foreman foreman) {
queries.put(foreman.getQueryId(), foreman);
// We're relying on the Foreman to clean itself up with retireForeman().
executor.execute(foreman);
}
/**
 * Add self-contained runnable work to the executor service.
*
* @param runnable runnable to execute
*/
public void addNewWork(final Runnable runnable) {
executor.execute(runnable);
}
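/**
 * Asynchronously cancels the Foreman for the given query id, after verifying that the
 * given principal (if any) is allowed to manage the query owner's queries.
 *
 * @return false if no Foreman is registered for the query id, true if a cancellation
 *         task was submitted
 */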
public boolean cancelForeman(final QueryId queryId, DrillUserPrincipal principal) {
Preconditions.checkNotNull(queryId);
final Foreman foreman = queries.get(queryId);
if (foreman == null) {
return false;
}
final String queryIdString = QueryIdHelper.getQueryId(queryId);
if (principal != null && !principal.canManageQueryOf(foreman.getQueryContext().getQueryUserName())) {
throw UserException.permissionError()
.message("Not authorized to cancel the query '%s'", queryIdString)
.build(logger);
}
executor.execute(() -> {
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
try {
currentThread.setName(queryIdString + ":foreman:cancel");
logger.debug("Canceling foreman. Thread: {}", originalName);
foreman.cancel();
} catch (Throwable t) {
logger.warn("Exception while canceling foreman", t);
} finally {
currentThread.setName(originalName);
}
});
return true;
}
/**
* Remove the given Foreman from the running query list.
*
* <p>The running query list is a bit of a misnomer, because it doesn't
* necessarily mean that {@link org.apache.drill.exec.work.foreman.Foreman#run()}
* is executing. That only lasts for the duration of query setup, after which
* the Foreman instance survives as a state machine that reacts to events
* from the local root fragment as well as RPC responses from remote Drillbits.</p>
*
* @param foreman the Foreman to retire
*/
public void retireForeman(final Foreman foreman) {
Preconditions.checkNotNull(foreman);
final QueryId queryId = foreman.getQueryId();
final boolean wasRemoved = queries.remove(queryId, foreman);
if (!wasRemoved) {
logger.warn("Couldn't find retiring Foreman for query " + queryId);
}
indicateIfSafeToExit();
}
public Foreman getForemanForQueryId(final QueryId queryId) {
return queries.get(queryId);
}
public DrillbitContext getContext() {
return dContext;
}
/**
 * Currently used to start a root fragment that is not blocked on data, as well as leaf fragments.
* @param fragmentExecutor the executor to run
*/
public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
runningFragments.put(fragmentHandle, fragmentExecutor);
executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
indicateIfSafeToExit();
}
});
}
/**
* Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
 * called when the first batch arrives.
*
* @param fragmentManager the manager for the fragment
*/
public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
final FragmentHandle fragmentHandle = fragmentManager.getHandle();
final FragmentExecutor fragmentExecutor = fragmentManager.getRunnable();
if (fragmentExecutor == null) {
// the fragment was most likely cancelled
return;
}
runningFragments.put(fragmentHandle, fragmentExecutor);
executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
if (!fragmentManager.isCancelled()) {
workBus.removeFragmentManager(fragmentHandle, false);
}
indicateIfSafeToExit();
}
});
}
public FragmentExecutor getFragmentRunner(final FragmentHandle handle) {
return runningFragments.get(handle);
}
/**
 * Receives a RuntimeFilter arriving over the wire and routes it either to this
 * Drillbit's Foreman or to the probe-side scan fragment identified by the embedded
 * major/minor fragment ids.
 * @param runtimeFilter the runtime filter to route
*/
public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
boolean toForeman = runtimeFilterDef.getToForeman();
QueryId queryId = runtimeFilterDef.getQueryId();
String queryIdStr = QueryIdHelper.getQueryId(queryId);
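// Take an extra reference on the underlying buffers for the asynchronous consumer;
// the consumer is responsible for releasing it (e.g. close() in the Foreman branch below).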
runtimeFilter.retainBuffers(1);
// route to the Foreman
if (toForeman) {
Foreman foreman = queries.get(queryId);
if (foreman != null) {
executor.execute(() -> {
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
try {
foreman.getRuntimeFilterRouter().register(runtimeFilter);
} catch (Exception e) {
logger.warn("Exception while registering the RuntimeFilter", e);
} finally {
currentThread.setName(originalName);
runtimeFilter.close();
}
});
}
} else {
// route to the probe-side scan node
int majorId = runtimeFilterDef.getMajorFragmentId();
int minorId = runtimeFilterDef.getMinorFragmentId();
ExecProtos.FragmentHandle fragmentHandle = ExecProtos.FragmentHandle.newBuilder().setMajorFragmentId(majorId)
.setMinorFragmentId(minorId)
.setQueryId(queryId).build();
FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle);
if (fragmentExecutor != null) {
fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
}
}
}
}
/**
 * Periodically gathers current statistics. {@link org.apache.drill.exec.work.foreman.QueryManager} uses a
 * FragmentStatusListener to track state changes, so its view of fragment state is current. However, we
 * also want current statistics about RUNNING queries, such as memory consumption and number of rows
 * processed; because the listener only reacts to state changes, the statistics it holds go stale. This
 * thread probes the running fragments for current values.
 *
 * For each running fragment, if the Foreman is the local Drillbit, the status is submitted locally,
 * bypassing the Control Tunnel; for a remote Foreman, it is sent over the Control Tunnel.
*/
private class StatusThread extends Thread {
StatusThread() {
// this thread inherits daemon status from its creator, which is assumed to be a non-daemon thread
setName("WorkManager.StatusThread");
}
@Override
public void run() {
// Get the controller and localBitEndPoint outside the loop since these will not change once the Drillbit
// and its StatusThread have started.
final Controller controller = dContext.getController();
final DrillbitEndpoint localBitEndPoint = dContext.getEndpoint();
while (true) {
final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
for (final FragmentExecutor fragmentExecutor : runningFragments.values()) {
final FragmentStatus status = fragmentExecutor.getStatus();
if (status == null) {
continue;
}
final DrillbitEndpoint foremanEndpoint = fragmentExecutor.getContext().getForemanEndpoint();
// If local endpoint is the Foreman for this running fragment, then submit the status locally bypassing the
// Control Tunnel
if (localBitEndPoint.equals(foremanEndpoint)) {
workBus.statusUpdate(status);
} else { // else send the status to remote Foreman over Control Tunnel
futures.add(controller.getTunnel(foremanEndpoint).sendFragmentStatus(status));
}
}
for (final DrillRpcFuture<Ack> future : futures) {
try {
future.checkedGet();
} catch (final RpcException ex) {
logger.info("Failure while sending intermediate fragment status to Foreman", ex);
}
}
try {
Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
} catch (final InterruptedException e) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
// exit status thread on interrupt.
break;
}
}
}
}
}