/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.foreman;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile.Builder;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.predicates.IntObjectPredicate;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Each Foreman holds its own QueryManager. This manages the events associated
* with execution of a particular query across all fragments.
*/
public class QueryManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);
private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
private final QueryId queryId;
private final String stringQueryId;
private final RunQuery runQuery;
private final Foreman foreman;
/*
* Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then
* accessed by multiple threads for reads only.
*/
private final IntObjectHashMap<IntObjectHashMap<FragmentData>> fragmentDataMap =
new IntObjectHashMap<>();
private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
private final PersistentStore<QueryProfile> completedProfileStore;
private final TransientStore<QueryInfo> runningProfileStore;
// the following mutable variables are used to capture ongoing query status
private String planText;
private PlanProperties planProps;
private long startTime = System.currentTimeMillis();
private long endTime;
private long planningEndTime;
private long queueWaitEndTime;
// How many nodes have finished their execution. Query is complete when all nodes are complete.
private final AtomicInteger finishedNodes = new AtomicInteger(0);
// How many fragments have finished their execution.
private final AtomicInteger finishedFragments = new AtomicInteger(0);
// Is the query saved in transient store
private boolean inTransientStore;
/**
* Total query cost. This value is used to place the query into a queue
* and so has meaning to the user who wants to predict queue placement.
*/
private double totalCost;
private String queueName;
public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
final ClusterCoordinator coordinator, final Foreman foreman) {
this.queryId = queryId;
this.runQuery = runQuery;
this.foreman = foreman;
stringQueryId = QueryIdHelper.getQueryId(queryId);
this.completedProfileStore = foreman.getQueryContext().getProfileStoreContext().getCompletedProfileStore();
this.runningProfileStore = foreman.getQueryContext().getProfileStoreContext().getRunningProfileStore();
}
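/**
* Returns true if the given fragment state is terminal (FAILED, FINISHED or CANCELLED), meaning no further state
* transitions are expected for that fragment.
*/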
private static boolean isTerminal(final FragmentState state) {
return state == FragmentState.FAILED
|| state == FragmentState.FINISHED
|| state == FragmentState.CANCELLED;
}
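/**
* Records the latest status for the fragment identified by the given FragmentStatus.
*
* @return true if the update represents a state change; false otherwise (including when the fragment was already
* in a terminal state and the update was ignored)
*/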
private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
final int majorFragmentId = fragmentHandle.getMajorFragmentId();
final int minorFragmentId = fragmentHandle.getMinorFragmentId();
final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
final FragmentState oldState = data.getState();
final boolean inTerminalState = isTerminal(oldState);
final FragmentState currentState = fragmentStatus.getProfile().getState();
if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState))) {
// Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED. This shouldn't happen.
logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState));
return false;
}
data.setStatus(fragmentStatus);
return oldState != currentState;
}
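/**
* Handles a status report for a fragment that has reached a terminal state. If the report represents a state
* change, the owning node's completion count and the overall finished-fragment count are incremented.
*/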
private void fragmentDone(final FragmentStatus status) {
final boolean stateChanged = updateFragmentStatus(status);
if (stateChanged) {
// since we're in the fragment done clause and this was a change from previous
final NodeTracker node = nodeMap.get(status.getProfile().getEndpoint());
node.fragmentComplete();
finishedFragments.incrementAndGet();
}
}
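/**
* Registers the given FragmentData in the per-major/minor fragment map and in the flat list of all fragments
* belonging to this query.
*/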
private void addFragment(final FragmentData fragmentData) {
final FragmentHandle fragmentHandle = fragmentData.getHandle();
final int majorFragmentId = fragmentHandle.getMajorFragmentId();
final int minorFragmentId = fragmentHandle.getMinorFragmentId();
IntObjectHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
if (minorMap == null) {
minorMap = new IntObjectHashMap<>();
fragmentDataMap.put(majorFragmentId, minorMap);
}
minorMap.put(minorFragmentId, fragmentData);
fragmentDataSet.add(fragmentData);
}
public String getFragmentStatesAsString() {
return fragmentDataMap.toString();
}
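/**
* Starts tracking the given plan fragment: registers it with the NodeTracker of its assigned Drillbit (creating
* the tracker if necessary) and records its FragmentData.
*/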
void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
final DrillbitEndpoint assignment = fragment.getAssignment();
NodeTracker tracker = nodeMap.get(assignment);
if (tracker == null) {
tracker = new NodeTracker(assignment);
nodeMap.put(assignment, tracker);
}
tracker.addFragment();
addFragment(new FragmentData(fragment.getHandle(), assignment, isRoot));
}
/**
* Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
*
* For the actual cancel calls for intermediate and leaf fragments, see
* {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
* (1) Root fragment: pending or running, send the cancel signal through a tunnel.
* (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
* fragments). The actual cancel is done by delegating the cancel to the work bus.
* (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
switch(data.getState()) {
case SENDING:
case AWAITING_ALLOCATION:
case RUNNING:
final FragmentHandle handle = data.getHandle();
final DrillbitEndpoint endpoint = data.getEndpoint();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.CANCEL), handle);
break;
case FINISHED:
case CANCELLATION_REQUESTED:
case CANCELLED:
case FAILED:
// nothing to do
break;
}
}
}
/**
* Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
* sending any message. Resume all fragments through the control tunnel.
*/
void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
final DrillbitEndpoint endpoint = data.getEndpoint();
final FragmentHandle handle = data.getHandle();
controller.getTunnel(endpoint).unpauseFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.UNPAUSE), handle);
}
}
@Override
public void close() throws Exception { }
/*
* This assumes that the FragmentStatusListener implementation takes action when it hears
* that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
* but log messages.
*/
private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
/**
* An enum of possible signals that {@link SignalListener} listens to.
*/
public enum Signal { CANCEL, UNPAUSE }
private final Signal signal;
public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
super(endpoint, handle);
this.signal = signal;
}
@Override
public void failed(final RpcException ex) {
logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", signal, value, endpoint, ex);
}
@Override
public void success(final Ack ack, final ByteBuf buf) {
if (!ack.getOk()) {
logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
ack);
}
}
@Override
public void interrupted(final InterruptedException ex) {
logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
"Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
}
}
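/**
* Keeps the transient (ephemeral) store in sync with the query's lifecycle: while the query is in flight its
* QueryInfo is written to the running profile store, and the entry is removed once the query reaches a terminal
* state.
*/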
void updateEphemeralState(final QueryState queryState) {
// If the query is already in the ZooKeeper transient store, ignore the transient state update option;
// otherwise the entry would never be removed from the transient store when the query completes.
if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
return;
}
switch (queryState) {
case PREPARING:
case PLANNING:
case ENQUEUED:
case STARTING:
case RUNNING:
case CANCELLATION_REQUESTED:
try {
runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
inTransientStore = true;
} catch (IllegalArgumentException e) {
throw UserException.executionError(e)
.message("Failed to persist query info. Query length is too big.")
.build(logger);
}
break;
case COMPLETED:
case CANCELED:
case FAILED:
try {
runningProfileStore.remove(stringQueryId);
inTransientStore = false;
} catch (final Exception e) {
logger.warn("Failure while trying to delete the stored profile for the query [{}]", stringQueryId, e);
}
break;
default:
throw new IllegalStateException("unrecognized queryState " + queryState);
}
}
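/**
* Persists the final query profile to the completed profile store, including error details when the query failed.
*/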
void writeFinalProfile(UserException ex) {
try {
// TODO(DRILL-2362) when do these ever get deleted?
completedProfileStore.put(stringQueryId, getQueryProfile(ex));
} catch (Exception e) {
logger.error("Failure while storing Query Profile", e);
}
}
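/**
* Builds the lightweight QueryInfo message that is stored in the transient store for a running query.
*/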
private QueryInfo getQueryInfo() {
final String queryText = foreman.getQueryText();
QueryInfo.Builder queryInfoBuilder = QueryInfo.newBuilder()
.setState(foreman.getState())
.setUser(foreman.getQueryContext().getQueryUserName())
.setForeman(foreman.getQueryContext().getCurrentEndpoint())
.setStart(startTime)
.setTotalCost(totalCost)
.setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (queryText != null) {
queryInfoBuilder.setQuery(queryText);
}
return queryInfoBuilder.build();
}
public QueryProfile getQueryProfile() {
return getQueryProfile(null);
}
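/**
* Builds the full query profile, including per-fragment profiles and, if an exception is supplied, the error
* details of the failure.
*/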
private QueryProfile getQueryProfile(UserException ex) {
QueryContext queryCtx = foreman.getQueryContext();
final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
.setUser(queryCtx.getQueryUserName())
.setType(runQuery.getType())
.setId(queryId)
.setQueryId(QueryIdHelper.getQueryId(queryId))
.setState(foreman.getState())
.setForeman(queryCtx.getCurrentEndpoint())
.setStart(startTime)
.setEnd(endTime)
.setPlanEnd(planningEndTime)
.setQueueWaitEnd(queueWaitEndTime)
.setTotalFragments(fragmentDataSet.size())
.setFinishedFragments(finishedFragments.get())
.setTotalCost(totalCost)
.setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (ex != null) {
profileBuilder.setError(ex.getMessage(false));
profileBuilder.setVerboseError(ex.getVerboseMessage(false));
profileBuilder.setErrorId(ex.getErrorId());
if (ex.getErrorLocation() != null) {
profileBuilder.setErrorNode(ex.getErrorLocation());
}
}
if (planText != null) {
profileBuilder.setPlan(planText);
}
final String queryText = foreman.getQueryText();
if (queryText != null) {
profileBuilder.setQuery(queryText);
}
if (planProps != null && planProps.scannedPluginNames != null ) {
profileBuilder.addAllScannedPlugins(planProps.scannedPluginNames);
}
int autoLimitRowCount = queryCtx.getOptions().getOption(ExecConstants.QUERY_MAX_ROWS).num_val.intValue();
if (autoLimitRowCount > 0) {
profileBuilder.setAutoLimit(autoLimitRowCount);
logger.debug("The query's resultset was limited to {} rows", autoLimitRowCount);
}
fragmentDataMap.forEach(new OuterIter(profileBuilder));
return profileBuilder.build();
}
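/**
* Serializes the query's option list to a JSON string for inclusion in the profile.
*/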
private String getQueryOptionsAsJson() {
try {
OptionList optionList = foreman.getQueryContext().getOptions().getOptionList();
return foreman.getQueryContext().getLpPersistence().getMapper().writeValueAsString(optionList);
} catch (JsonProcessingException e) {
throw new DrillRuntimeException("Error while trying to convert option list to json string", e);
}
}
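/**
* Visits each major fragment in fragmentDataMap and adds a MajorFragmentProfile, populated via InnerIter, to the
* query profile builder.
*/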
private class OuterIter implements IntObjectPredicate<IntObjectHashMap<FragmentData>> {
private final QueryProfile.Builder profileBuilder;
public OuterIter(Builder profileBuilder) {
this.profileBuilder = profileBuilder;
}
@Override
public boolean apply(final int majorFragmentId, final IntObjectHashMap<FragmentData> minorMap) {
final MajorFragmentProfile.Builder builder = MajorFragmentProfile.newBuilder().setMajorFragmentId(majorFragmentId);
minorMap.forEach(new InnerIter(builder));
profileBuilder.addFragmentProfile(builder);
return true;
}
}
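/**
* Adds each minor fragment's profile to the enclosing MajorFragmentProfile builder.
*/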
private class InnerIter implements IntObjectPredicate<FragmentData> {
private final MajorFragmentProfile.Builder builder;
public InnerIter(MajorFragmentProfile.Builder fb) {
this.builder = fb;
}
@Override
public boolean apply(int key, FragmentData data) {
builder.addMinorFragmentProfile(data.getProfile());
return true;
}
}
void setPlanText(final String planText) {
this.planText = planText;
}
void setPlanProperties(PlanProperties planProps) {
this.planProps = planProps;
}
void markStartTime() {
startTime = System.currentTimeMillis();
}
void markEndTime() {
endTime = System.currentTimeMillis();
}
void markPlanningEndTime() {
planningEndTime = System.currentTimeMillis();
}
void markQueueWaitEndTime() {
queueWaitEndTime = System.currentTimeMillis();
}
public void setTotalCost(double totalCost) {
this.totalCost = totalCost;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
/**
* Internal class used to track the number of pending completion messages required from a particular node. This
* lets us know, for each node that is part of this query, how many fragments are still outstanding. In the case
* of a node failure, we can then correctly track how many outstanding messages will never arrive.
*/
private class NodeTracker {
private final AtomicInteger totalFragments = new AtomicInteger(0);
private final AtomicInteger completedFragments = new AtomicInteger(0);
public NodeTracker(final DrillbitEndpoint endpoint) { }
/**
* Increments the number of fragments this node is running.
*/
public void addFragment() {
totalFragments.incrementAndGet();
}
/**
* Increments the number of fragments completed on this node. Once the number of completed fragments equals the
* number of fragments running on this node, the node is marked as finished and finishedNodes is incremented.
*
* If the number of remaining nodes then drops to zero, the query can move to a completed state.
*/
public void fragmentComplete() {
if (totalFragments.get() == completedFragments.incrementAndGet()) {
nodeComplete();
}
}
/**
* Marks this node as dead by incrementing the completed-fragment count until the node is marked complete. Note
* that this uses the internal fragmentComplete() method, so whether we have failure or success, the nodeComplete
* event will only occur once. (Two threads could be incrementing the completed-fragment count at the same time,
* since this will likely come from an external event.)
*
* @return true if the node still had fragments in a pending (non-terminal) state; false if all fragments running
* on this node had already terminated.
*/
public boolean nodeDead() {
if (completedFragments.get() == totalFragments.get()) {
return false;
}
while (completedFragments.get() < totalFragments.get()) {
fragmentComplete();
}
return true;
}
}
/**
* Increments the number of complete nodes. If there are no more pending nodes, moves the query to a terminal
* state.
*/
private void nodeComplete() {
final int finishedNodes = this.finishedNodes.incrementAndGet();
final int totalNodes = nodeMap.size();
Preconditions.checkArgument(finishedNodes <= totalNodes, "The finished node count exceeds the total node count");
final int remaining = totalNodes - finishedNodes;
if (remaining == 0) {
// this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
foreman.addToEventQueue(QueryState.COMPLETED, null);
} else {
logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
this.fragmentDataSet.size() - finishedFragments.get());
}
}
public FragmentStatusListener getFragmentStatusListener(){
return fragmentStatusListener;
}
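/**
* Listener that receives fragment status updates from executing Drillbits and routes them into the state-tracking
* logic above; failed fragments additionally move the query to the FAILED state.
*/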
private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener() {
@Override
public void statusUpdate(final FragmentStatus status) {
logger.debug("New fragment status was provided to QueryManager of {}", status);
switch(status.getProfile().getState()) {
case AWAITING_ALLOCATION:
case RUNNING:
case CANCELLATION_REQUESTED:
updateFragmentStatus(status);
break;
case FAILED:
foreman.addToEventQueue(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
// fall-through.
case FINISHED:
case CANCELLED:
fragmentDone(status);
break;
default:
throw new UnsupportedOperationException(String.format("Received status of %s", status));
}
}
};
public DrillbitStatusListener getDrillbitStatusListener() {
return drillbitStatusListener;
}
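/**
* Listener that reacts to changes in Drillbit cluster membership; if a Drillbit that is still running fragments
* of this query unregisters, the query is failed.
*/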
private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() {
@Override
public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
}
@Override
public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
final StringBuilder failedNodeList = new StringBuilder();
boolean atLeastOneFailure = false;
for (final DrillbitEndpoint ep : unregisteredDrillbits) {
final NodeTracker tracker = nodeMap.get(ep);
if (tracker == null) {
continue; // fragments were not assigned to this Drillbit
}
// mark node as dead.
if (!tracker.nodeDead()) {
continue; // fragments assigned to this Drillbit completed
}
// fragments were running on the Drillbit, capture node name for exception or logging message
if (atLeastOneFailure) {
failedNodeList.append(", ");
} else {
atLeastOneFailure = true;
}
failedNodeList.append(ep.getAddress());
failedNodeList.append(":");
failedNodeList.append(ep.getUserPort());
}
if (atLeastOneFailure) {
logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}",
failedNodeList, QueryIdHelper.getQueryId(queryId));
foreman.addToEventQueue(QueryState.FAILED,
new ForemanException(String.format("One or more nodes lost connectivity during the query. Identified nodes were [%s].",
failedNodeList)));
}
}
};
}