blob: 64df6c4895f1d94aff7ebf1a3cfbbecebe255a23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.foreman;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.FailureUtils;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
* is the driving/root node.
*
* The flow is as follows:
* <ul>
* <li>While Foreman is initialized query is in preparing state.</li>
* <li>Foreman is submitted as a runnable.</li>
* <li>Runnable does query planning.</li>
* <li>Runnable submits query to be enqueued.</li>
* <li>The Runnable's run() completes, but the Foreman stays around to listen to state changes.</li>
* <li>Once query is enqueued, starting fragments are sent out.</li>
* <li>Status listeners are activated.</li>
* <li>Foreman listens for state change messages.</li>
* <li>State change messages can drive the state to FAILED or CANCELED, in which case
* messages are sent to running fragments to terminate.</li>
* <li>When all fragments are completed, state change messages drive the state to COMPLETED.</li>
* </ul>
*/
public class Foreman implements Runnable {
// Class logger for operational messages.
private static final Logger logger = LoggerFactory.getLogger(Foreman.class);
// Separate "query.logger" logger; receives the JSON query summary written by ForemanResult.logQuerySummary().
private static final Logger queryLogger = LoggerFactory.getLogger("query.logger");
// Used to inject checked exceptions and pauses at named sites in this class (see run() and ForemanResult.close()).
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
// When the final query profile is written relative to sending the result to the client:
// before (SYNC), after (ASYNC), or not at all (NONE). See ForemanResult.close().
public enum ProfileOption { SYNC, ASYNC, NONE }
// Serializes the query summary to JSON for the query log.
private static final ObjectMapper MAPPER = new ObjectMapper();
private final QueryId queryId;
// String form of queryId produced by QueryIdHelper.getQueryId(); used in thread names and log messages.
private final String queryIdString;
private final RunQuery queryRequest;
private final QueryContext queryContext;
private final QueryManager queryManager; // handles lower-level details of query execution
private final DrillbitContext drillbitContext;
private final UserClientConnection initiatingClient; // used to send responses
// Set by resume(); re-checked in startProcessingEvents() so that a resume received before fragment
// setup finished is replayed.
// NOTE(review): plain (non-volatile) field; confirm that writers and readers are properly synchronized.
private boolean resume;
private final QueryResourceManager queryRM;
// Outcome listener passed to initiatingClient.sendResult() when the final result is sent.
private final ResponseSendListener responseListener = new ResponseSendListener();
// Cancels the query when the client connection's closure future completes.
private final GenericFutureListener<Future<Void>> closeListener = future -> cancel();
// Closure future of the initiating client connection; closeListener is attached in the constructor
// and removed in ForemanResult.close().
private final Future<Void> closeFuture;
private final FragmentsRunner fragmentsRunner;
private final QueryStateProcessor queryStateProcessor;
// Assigned in run() from the request plan and reassigned in runPreparedStatement(); null until then.
private String queryText;
// Only created in runPhysicalPlan() when enableRuntimeFilter is true; null otherwise.
private RuntimeFilterRouter runtimeFilterRouter;
// Value of the ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY option, read once in the constructor.
private final boolean enableRuntimeFilter;
/**
 * Constructor. Sets up the Foreman, but does not initiate any execution.
 *
 * <p>Besides storing its arguments, the constructor registers a listener on the client connection's
 * closure future (so the query is cancelled when that future completes), applies the request's
 * auto-limit to the session options when it is positive, and creates the per-query collaborators
 * (query context, query manager, resource manager handle, fragments runner, state processor).
 *
 * @param bee work manager (runs fragments)
 * @param drillbitContext drillbit context
 * @param connection connection
 * @param queryId the id for the query
 * @param queryRequest the query to execute
 */
public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
this.queryId = queryId;
this.queryIdString = QueryIdHelper.getQueryId(queryId);
this.queryRequest = queryRequest;
this.drillbitContext = drillbitContext;
this.initiatingClient = connection;
// Cancel the query when the client connection's closure future completes (closeListener calls cancel()).
this.closeFuture = initiatingClient.getClosureFuture();
closeFuture.addListener(closeListener);
// Apply AutoLimit on resultSet (Usually received via REST APIs)
final int autoLimit = queryRequest.getAutolimitRowcount();
if (autoLimit > 0) {
connection.getSession().getOptions().setLocalOption(ExecConstants.QUERY_MAX_ROWS, autoLimit);
}
// The query context is created after the auto-limit option has been set on the session.
this.queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
// Note: several collaborators below receive a reference to this (not yet fully constructed) Foreman.
this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
drillbitContext.getClusterCoordinator(), this);
this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
// Read the runtime-filter switch once; it gates creation of the RuntimeFilterRouter in runPhysicalPlan().
this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
}
/**
 * Returns the id of the query this Foreman was created for.
 *
 * @return query id
 */
public QueryId getQueryId() {
  return this.queryId;
}
/**
 * Returns the query state as currently reported by the query state processor.
 *
 * @return current query state
 */
public QueryState getState() {
  final QueryState currentState = queryStateProcessor.getState();
  return currentState;
}
/**
 * Returns the text of the query. The value is assigned when the Foreman starts running,
 * so it is {@code null} before that.
 *
 * @return sql query text of the query request
 */
public String getQueryText() {
  return this.queryText;
}
/**
 * Returns the QueryContext that was created for this query in the constructor.
 *
 * @return the QueryContext
 */
public QueryContext getQueryContext() {
  return this.queryContext;
}
/**
 * Returns the QueryManager that was created for this query in the constructor.
 *
 * @return the QueryManager
 */
public QueryManager getQueryManager() {
  return this.queryManager;
}
/**
 * Requests cancellation of the query (moves it into the cancellation-requested state).
 * The actual cancellation happens as soon as it becomes possible.
 */
public void cancel() {
  this.queryStateProcessor.cancel();
}
/**
 * Queues a query status event so that it is processed once the Foreman is ready for it.
 *
 * @param state new query state
 * @param exception exception if failure has occurred
 */
public void addToEventQueue(QueryState state, Exception exception) {
  this.queryStateProcessor.addToEventQueue(state, exception);
}
/**
 * Resumes the query: whatever the current state is, a resume signal is sent to all fragments.
 * Calling this method more than once is allowed.
 */
public void resume() {
  // remember the request so that startProcessingEvents() can replay it after fragment setup
  this.resume = true;

  // release every pause held through the query context ...
  queryContext.getExecutionControls().unpauseAll();

  // ... and every pause held through the fragment contexts
  queryManager.unpauseExecutingFragments(drillbitContext);
}
/**
 * Invoked by the execution pool to set the query up and to kick off remote execution.
 *
 * <p>Returning from this method is not the end of the Foreman's role in the query's lifecycle.
 */
@Override
public void run() {
  // Tag the worker thread with the query id for debugging; the name is put back in the finally block.
  final Thread worker = Thread.currentThread();
  final String savedThreadName = worker.getName();
  worker.setName(queryIdString + ":foreman");

  // A Foreman that is not ONLINE does not accept new queries: queue a FAILED event for this one.
  // NOTE(review): execution continues below after queueing the event — confirm this is intended.
  if (!drillbitContext.isForemanOnline()) {
    final ForemanException shutdownFailure =
        new ForemanException("Query submission failed since Foreman is shutting down.");
    logger.debug("Failure while submitting query", shutdownFailure);
    queryStateProcessor.addToEventQueue(QueryState.FAILED, shutdownFailure);
  }

  queryText = queryRequest.getPlan();
  queryStateProcessor.moveToState(QueryState.PLANNING, null);

  try {
    injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);

    // turn the submitted request into action, depending on what kind of request it is
    switch (queryRequest.getType()) {
      case LOGICAL:
        parseAndRunLogicalPlan(queryRequest.getPlan());
        break;

      case PHYSICAL:
        parseAndRunPhysicalPlan(queryRequest.getPlan());
        break;

      case SQL: {
        final String sqlText = queryRequest.getPlan();
        // Log query id, user name and query text together, before any real work starts,
        // so that the text is easy to find when searching the log by query id.
        logger.info("Query text for query with id {} issued by {}: {}", queryIdString,
            queryContext.getQueryUserName(), sqlText);
        runSQL(sqlText);
        break;
      }

      case EXECUTION:
        runFragment(queryRequest.getFragmentsList());
        break;

      case PREPARED_STATEMENT:
        runPreparedStatement(queryRequest.getPreparedStatementHandle());
        break;

      default:
        throw new IllegalStateException();
    }

    injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
  } catch (ForemanException | UserException setupFailure) {
    queryStateProcessor.moveToState(QueryState.FAILED, setupFailure);
  } catch (OutOfMemoryError | OutOfMemoryException oom) {
    if (FailureUtils.isDirectMemoryOOM(oom)) {
      queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(oom).build(logger));
    } else {
      /*
       * FragmentExecutors watch for the death of their query's Foreman through a DrillbitStatusListener.
       * If we die here they should be notified and cancel themselves, so there is no need to try to
       * notify them ourselves, which might not even work under these conditions.
       */
      FailureUtils.unrecoverableFailure(oom, "Unable to handle out of memory condition in Foreman.", EXIT_CODE_HEAP_OOM);
    }
  } catch (Throwable unexpected) {
    final ForemanException wrapped = new ForemanException(
        "Unexpected exception during fragment initialization: " + unexpected.getMessage(), unexpected);
    queryStateProcessor.moveToState(QueryState.FAILED, wrapped);
  } finally {
    // give the thread its original name back
    worker.setName(savedThreadName);
  }

  /*
   * Even though run() is done, the Foreman lives on and keeps receiving events about fragment
   * completions (indirectly, through the QueryManager's use of stateListener). It only goes away
   * once everything is completed, failed, or cancelled.
   */
}
/**
 * While some fragments were being sent out, others might already have completed. Completed / failed
 * events must not be processed until all fragments have been sent out; this method triggers event
 * processing once that is the case.
 */
public void startProcessingEvents() {
  queryStateProcessor.startProcessingEvents();

  // A resume signal received before the fragments were set up could not actually resume them.
  // Setup is done now, so all fragments must have been delivered to remote nodes: replay the resume.
  if (!resume) {
    return;
  }
  resume();
}
/**
 * Decides whether, and when, the final query profile is written.
 *
 * @param queryContext context whose skip-profile flag and options are consulted
 * @return {@link ProfileOption#NONE} when the context asks to skip the profile write or profiles are
 *         disabled by option; otherwise {@link ProfileOption#SYNC} when the profile debug option is
 *         set and {@link ProfileOption#ASYNC} when it is not
 */
private ProfileOption getProfileOption(QueryContext queryContext) {
  if (queryContext.isSkipProfileWrite()) {
    return ProfileOption.NONE;
  }

  final OptionSet optionSet = queryContext.getOptions();
  final boolean profilesEnabled = optionSet.getBoolean(ExecConstants.ENABLE_QUERY_PROFILE_OPTION);
  if (!profilesEnabled) {
    return ProfileOption.NONE;
  }

  return optionSet.getBoolean(ExecConstants.QUERY_PROFILE_DEBUG_OPTION)
      ? ProfileOption.SYNC
      : ProfileOption.ASYNC;
}
/**
 * Parses a logical plan from JSON, converts it to a physical plan and either returns that
 * physical plan to the client (result mode PHYSICAL) or executes it.
 *
 * @param json logical plan as JSON text
 * @throws ExecutionSetupException if the plan cannot be parsed, if the requested result mode is
 *         LOGICAL, or if conversion / execution setup fails
 */
private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
  final LogicalPlan logicalPlan;
  try {
    logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
  } catch (final IOException parseFailure) {
    throw new ForemanException("Failure parsing logical plan.", parseFailure);
  }

  // returning a logical plan for a submitted logical plan makes no sense
  if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) {
    throw new ForemanException(
        "Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.");
  }

  log(logicalPlan);
  final PhysicalPlan converted = convert(logicalPlan);

  if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
    // the client only wants to see the physical plan
    returnPhysical(converted);
  } else {
    log(converted);
    runPhysicalPlan(converted);
  }
}
/**
 * Writes the unparsed logical plan to the debug log; does nothing unless debug logging is enabled.
 *
 * @param plan logical plan to log
 */
private void log(final LogicalPlan plan) {
  if (!logger.isDebugEnabled()) {
    return;
  }
  logger.debug("Logical {}", plan.unparse(queryContext.getLpPersistence()));
}
/**
 * Writes the physical plan, serialized as JSON, to the debug log; does nothing unless debug
 * logging is enabled. A serialization failure is logged as a warning and otherwise ignored.
 *
 * @param plan physical plan to log
 */
private void log(final PhysicalPlan plan) {
  if (!logger.isDebugEnabled()) {
    return;
  }

  try {
    final String serialized = queryContext.getLpPersistence().getMapper().writeValueAsString(plan);
    logger.debug("Physical {}", serialized);
  } catch (final IOException serializationFailure) {
    logger.warn("Error while attempting to log physical plan.", serializationFailure);
  }
}
/**
 * Returns a physical plan to the client instead of executing it: the plan is unparsed to JSON,
 * wrapped in a {@link PhysicalFromLogicalExplain} and run as a direct plan.
 *
 * @param plan physical plan to hand back
 * @throws ExecutionSetupException if running the direct plan fails
 */
private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
  final String planAsJson = plan.unparse(queryContext.getLpPersistence().getMapper().writer());
  final PhysicalFromLogicalExplain explainResult = new PhysicalFromLogicalExplain(planAsJson);
  final PhysicalPlan directPlan = DirectPlan.createDirectPlan(queryContext, explainResult);
  runPhysicalPlan(directPlan);
}
/**
 * Immutable holder for a piece of JSON text. Within this file it carries the JSON form of a
 * physical plan that is returned to the client rather than executed.
 */
public static class PhysicalFromLogicalExplain {

  /** The JSON text, exactly as passed to the constructor. */
  public final String json;

  /**
   * @param json JSON text to hold
   */
  public PhysicalFromLogicalExplain(final String json) {
    this.json = json;
  }
}
/**
 * Parses a physical plan from JSON and executes it.
 *
 * @param json physical plan as JSON text
 * @throws ExecutionSetupException if the plan cannot be parsed or execution setup fails
 */
private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
  try {
    final PhysicalPlan physicalPlan = drillbitContext.getPlanReader().readPhysicalPlan(json);
    runPhysicalPlan(physicalPlan);
  } catch (final IOException parseFailure) {
    throw new ForemanSetupException("Failure while parsing physical plan.", parseFailure);
  }
}
/**
 * Runs a physical plan for which no textual plan is available.
 *
 * @param plan physical plan to run
 * @throws ExecutionSetupException if execution setup fails
 */
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
  final Pointer<String> noTextPlan = null;
  runPhysicalPlan(plan, noTextPlan);
}
/**
 * Validates a physical plan, turns it into a work unit of fragments and starts query processing.
 *
 * @param plan physical plan to run; must have result mode EXEC (checked by validatePlan)
 * @param textPlan holder of the textual plan to record on the query manager, or null when there is none
 * @throws ExecutionSetupException if validation, parallelization or plan application fails
 */
private void runPhysicalPlan(final PhysicalPlan plan, Pointer<String> textPlan) throws ExecutionSetupException {
validatePlan(plan);
// Let the resource manager look at the plan before it is parallelized.
queryRM.visitAbstractPlan(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
// The runtime filter router is only created when the option read in the constructor enables it.
if (enableRuntimeFilter) {
runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
}
if (textPlan != null) {
queryManager.setPlanText(textPlan.value);
}
queryRM.visitPhysicalPlan(work);
// The plan's total cost is reported to both the resource manager and the query manager.
queryRM.setCost(plan.totalCost());
queryManager.setTotalCost(plan.totalCost());
work.applyPlan(drillbitContext.getPlanReader());
logWorkUnit(work);
// Hand the fragments to the runner, then enqueue the query and send the fragments out.
fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());
startQueryProcessing();
}
/**
 * Runs a query from a list of {@link PlanFragment}s that were planned at some earlier point in time.
 *
 * <p>Every incoming fragment gets a new handle that carries this Foreman's query id (the major and
 * minor fragment ids are kept). The first fragment of the list is treated as the root fragment; all
 * remaining fragments are handed to the fragments runner as non-root fragments.
 *
 * @param fragmentsList fragment list; its first element is the root fragment
 * @throws ExecutionSetupException if the list is empty or the root fragment's JSON cannot be parsed
 */
private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetupException {
  // need to set QueryId, MinorFragment for incoming Fragments
  PlanFragment rootFragment = null;
  final List<PlanFragment> planFragments = Lists.newArrayList();
  for (PlanFragment myFragment : fragmentsList) {
    final FragmentHandle handle = myFragment.getHandle();
    // though we have new field in the FragmentHandle - parentQueryId
    // it can not be used until every piece of code that creates handle is using it, as otherwise
    // comparisons on that handle fail that causes fragment runtime failure
    final FragmentHandle newFragmentHandle = FragmentHandle.newBuilder()
        .setMajorFragmentId(handle.getMajorFragmentId())
        .setMinorFragmentId(handle.getMinorFragmentId())
        .setQueryId(queryId)
        .build();
    final PlanFragment newFragment = PlanFragment.newBuilder(myFragment).setHandle(newFragmentHandle).build();
    if (rootFragment == null) {
      // the first fragment is the root
      rootFragment = newFragment;
    } else {
      planFragments.add(newFragment);
    }
  }

  // An empty list used to be guarded only by an assert, i.e. it ended in a NullPointerException
  // whenever assertions were disabled. Fail with a descriptive setup exception instead.
  if (rootFragment == null) {
    throw new ExecutionSetupException(
        "Unable to run fragments: the fragment list is empty, so there is no root fragment.");
  }

  final FragmentRoot rootOperator;
  try {
    rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
  } catch (IOException e) {
    // keep the parse failure as the cause so that the reason is not lost
    throw new ExecutionSetupException(
        String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()), e);
  }

  queryRM.setCost(rootOperator.getCost().getOutputRowCount());
  fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
  startQueryProcessing();
}
/**
 * Enqueues the query, then sends the query fragments out for execution, and finally requests
 * the move to the RUNNING state.
 */
private void startQueryProcessing() {
  // step 1: admission (blocks until the query is enqueued)
  enqueue();

  // step 2: send out fragments and start accepting events
  runFragments();

  // step 3: request the RUNNING state
  queryStateProcessor.moveToState(QueryState.RUNNING, null);
}
/**
 * Moves the query to the ENQUEUED state and asks the resource manager to admit it; the Foreman's
 * run is blocked until admission returns. On successful admission the query moves to STARTING; on
 * an admission failure (for example a queue timeout) it moves to FAILED. In both cases the queue
 * name is recorded on the query manager afterwards.
 */
private void enqueue() {
  queryStateProcessor.moveToState(QueryState.ENQUEUED, null);

  try {
    queryRM.admit();
    queryStateProcessor.moveToState(QueryState.STARTING, null);
  } catch (QueueTimeoutException | QueryQueueException admissionFailure) {
    queryStateProcessor.moveToState(QueryState.FAILED, admissionFailure);
  } finally {
    // record the queue name whether or not admission succeeded
    final String assignedQueue = queryRM.queueName();
    if (assignedQueue == null) {
      queryManager.setQueueName("Unknown");
    } else {
      queryManager.setQueueName(assignedQueue);
    }
  }
}
/**
 * Submits the fragments for execution and, no matter how that goes, starts accepting external
 * events afterwards. A submission failure moves the query to the FAILED state.
 */
private void runFragments() {
  try {
    fragmentsRunner.submit();
  } catch (Exception submitFailure) {
    queryStateProcessor.moveToState(QueryState.FAILED, submitFailure);
  } finally {
    /*
     * Start accepting external events.
     *
     * This lives in the finally clause so that it is guaranteed to happen. If it were skipped because
     * of an exception somewhere during setup, any event produced by the partial setup that did take
     * place (for instance by the FragmentSubmitListener, by the ResponseSendListener, or by an
     * external call to cancel()) would hang the thread that makes the event delivery call.
     *
     * When setup did throw and we already moved to QueryState.FAILED, all we have to ensure is that
     * delivering those events cannot make things any worse, while still letting any necessary
     * remaining cleanup proceed.
     *
     * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected,
     * because Foreman would wait on the cancelling thread to signal a resume and the cancelling
     * thread would wait on the Foreman to accept events.
     */
    startProcessingEvents();
  }
}
/**
 * Helper method to execute the query of a prepared statement. The current implementation takes the
 * SQL text from the opaque server state carried by the handle and submits it as a new query.
 *
 * @param preparedStatementHandle prepared statement handle
 * @throws ExecutionSetupException if running the extracted SQL fails; a handle that cannot be
 *         parsed is reported as a {@link UserException} parse error
 */
private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
    throws ExecutionSetupException {
  final ServerPreparedStatementState serverState;
  try {
    serverState =
        ServerPreparedStatementState.PARSER.parseFrom(preparedStatementHandle.getServerInfo());
  } catch (final InvalidProtocolBufferException ex) {
    throw UserException.parseError(ex)
        .message("Failed to parse the prepared statement handle. " +
            "Make sure the handle is same as one returned from create prepared statement call.")
        .build(logger);
  }

  queryText = serverState.getSqlQuery();
  // Log the helper-formatted query id string, as the other log statements of this class do, rather
  // than the raw QueryId object, so the entry can be found by searching for the query id.
  logger.info("Prepared statement query for QueryId {} : {}", queryIdString, queryText);
  runSQL(queryText);
}
/**
 * Checks that a physical plan asks for the only result mode a physical plan can produce, EXEC.
 *
 * @param plan physical plan to check
 * @throws ForemanSetupException if the plan's result mode is anything other than EXEC
 */
private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
  if (plan.getProperties().resultMode == ResultMode.EXEC) {
    return;
  }

  final String problem = String.format(
      "Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC",
      plan.getProperties().resultMode);
  throw new ForemanSetupException(problem);
}
/**
 * Builds the work unit for a physical plan: the plan's first sorted operator is turned into a
 * fragment tree, which the resource manager's parallelizer then converts into a work unit.
 *
 * @param plan physical plan to parallelize
 * @param rm resource manager that supplies the parallelizer
 * @return the generated work unit
 * @throws ExecutionSetupException if fragment creation or parallelization fails
 */
private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan,
                                       final QueryResourceManager rm) throws ExecutionSetupException {
  final PhysicalOperator planRoot = plan.getSortedOperators(false).iterator().next();
  final Fragment fragmentTree = planRoot.accept(MakeFragmentsVisitor.INSTANCE, null);

  return rm.getParallelizer(plan.getProperties().hasResourcePlan)
      .generateWorkUnit(
          queryContext.getOptions().getOptionList(),
          queryContext.getCurrentEndpoint(),
          queryId,
          queryContext.getOnlineEndpoints(),
          fragmentTree,
          initiatingClient.getSession(),
          queryContext.getQueryContextInfo());
}
/**
 * Writes the stringified fragments of a work unit to the trace log; does nothing unless trace
 * logging is enabled.
 *
 * @param queryWorkUnit work unit whose fragments are logged
 */
private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
  if (!logger.isTraceEnabled()) {
    return;
  }
  // Parameterized logging instead of String.format, and the helper-formatted query id string
  // (as used by the other log statements of this class) instead of the raw QueryId object.
  logger.trace("PlanFragments for query {} \n{}", queryIdString, queryWorkUnit.stringifyFragments());
}
/**
 * Plans a SQL statement and runs the resulting physical plan, passing along the textual plan
 * holder that was handed to the planner.
 *
 * @param sql SQL text to plan and run
 * @throws ExecutionSetupException if planning or execution setup fails
 */
private void runSQL(final String sql) throws ExecutionSetupException {
  final Pointer<String> planTextHolder = new Pointer<>();
  final PhysicalPlan physicalPlan = DrillSqlWorker.getPlan(queryContext, sql, planTextHolder);
  runPhysicalPlan(physicalPlan, planTextHolder);
}
/**
 * Converts a logical plan into a physical plan using the {@link BasicOptimizer}.
 *
 * @param plan logical plan to convert
 * @return the physical plan produced by the optimizer
 * @throws OptimizerException if the optimizer fails
 */
private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
  if (logger.isDebugEnabled()) {
    logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
  }

  final BasicOptimizer optimizer = new BasicOptimizer(queryContext, initiatingClient);
  final BasicOptimizer.BasicOptimizationContext optimizationContext =
      new BasicOptimizer.BasicOptimizationContext(queryContext);
  return optimizer.optimize(optimizationContext, plan);
}
/**
 * Returns the runtime filter router of this query.
 *
 * @return the router, or {@code null} when none has been created (it is only created while running
 *         a physical plan with the runtime filter option enabled)
 */
public RuntimeFilterRouter getRuntimeFilterRouter() {
  return this.runtimeFilterRouter;
}
/**
* Manages the end-state processing for Foreman.
*
* End-state processing is tricky, because even if a query appears to succeed, but
* we then encounter a problem during cleanup, we still want to mark the query as
* failed. So we have to construct the successful result we would send, and then
* clean up before we send that result, possibly changing that result if we encounter
* a problem during cleanup. We only send the result when there is nothing left to
* do, so it will account for any possible problems.
*
* The idea here is to make close()ing the ForemanResult do the final cleanup and
* sending. Closing the result must be the last thing that is done by Foreman.
*/
public class ForemanResult implements AutoCloseable {
// Final state to report; null until one of setCompleted / setFailed / setForceFailure has been called.
// May still be switched to FAILED by suppressingClose() when cleanup throws.
private QueryState resultState = null;
// First exception recorded for the result; later ones are attached to it as suppressed exceptions.
private volatile Exception resultException = null;
// Set at the very end of close(); guards against using the result after it has been closed.
private boolean isClosed = false;
/**
 * Prepares the result for a COMPLETED or CANCELED outcome.
 *
 * <p>Cleanup steps run before this result is sent, and they may still turn it into a FAILED result.
 *
 * @param queryState one of COMPLETED or CANCELED
 */
public void setCompleted(final QueryState queryState) {
  final boolean isAcceptedState =
      queryState == QueryState.COMPLETED || queryState == QueryState.CANCELED;
  Preconditions.checkArgument(isAcceptedState);
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState == null);

  resultState = queryState;
}
/**
 * Prepares the result for a FAILED outcome.
 *
 * <p>Failures that happen later, during cleanup processing, are attached as suppressed exceptions.
 *
 * @param exception the exception that led to the FAILED state; must not be null
 */
public void setFailed(final Exception exception) {
  final boolean hasFailureCause = exception != null;
  Preconditions.checkArgument(hasFailureCause);
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState == null);

  // state first, then the exception
  resultState = QueryState.FAILED;
  resultException = exception;
}
/**
 * Overrides whatever status is currently set and forces the given failure as the status.
 * NOTE: Meant for testing purposes only. Shouldn't be used in production.
 *
 * @param exception the failure to force; must not be null
 */
public void setForceFailure(final Exception exception) {
  final boolean hasFailureCause = exception != null;
  Preconditions.checkArgument(hasFailureCause);
  Preconditions.checkState(!isClosed);

  // unlike setFailed(), an already assigned result state is not rejected here
  resultState = QueryState.FAILED;
  resultException = exception;
}
/**
 * Records an exception on the result. The first recorded exception becomes the result exception;
 * every later one is attached to it as a suppressed exception.
 *
 * @param exception the exception to add
 */
private void addException(final Exception exception) {
  assert exception != null;

  if (resultException != null) {
    resultException.addSuppressed(exception);
    return;
  }
  resultException = exception;
}
/**
 * Exposes the exception currently recorded on the result, if any. Useful for secondary
 * reporting to the query profile.
 *
 * @return the current Foreman result exception or null.
 */
public Exception getException() {
  final Exception current = resultException;
  return current;
}
/**
 * Closes the given resource. Any exception thrown while closing is recorded via
 * {@link #addException(Exception)} and switches the result state to FAILED, whatever the state
 * was before. A null resource is ignored.
 *
 * @param autoCloseable
 *          the resource to close
 */
private void suppressingClose(final AutoCloseable autoCloseable) {
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState != null);

  if (autoCloseable != null) {
    try {
      autoCloseable.close();
    } catch (final Exception closeFailure) {
      /*
       * Problems during cleanup are reported as a failure, even when the query itself
       * completed successfully.
       */
      resultState = QueryState.FAILED;
      addException(closeFailure);
    }
  }
}
/**
 * Writes a one-line JSON summary of the query (id, default schema, text, start and end time,
 * state, user and remote address) to the query logger. Any failure while doing so is logged as
 * an error and otherwise ignored.
 */
private void logQuerySummary() {
  try {
    final LoggedQuery summary = new LoggedQuery(
        queryIdString,
        queryContext.getQueryContextInfo().getDefaultSchemaName(),
        queryText,
        new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
        new Date(System.currentTimeMillis()),
        queryStateProcessor.getState(),
        queryContext.getSession().getCredentials().getUserName(),
        initiatingClient.getRemoteAddress());
    final String summaryJson = MAPPER.writeValueAsString(summary);
    queryLogger.info(summaryJson);
  } catch (Exception loggingFailure) {
    logger.error("Failure while recording query information to query log.", loggingFailure);
  }
}
/**
 * Performs the final cleanup for the query and sends the final result to the client.
 *
 * <p>In order: waits for the runtime filter router (if any), detaches the connection-closed listener,
 * logs the query summary, removes the status listeners, closes the query context, records the final
 * state if it differs from the last recorded one, builds and sends the result (writing the final
 * profile before or after sending depending on the profile option), retires the Foreman, and closes
 * the remaining resources. A result state must already have been set, and the result must not have
 * been closed before.
 */
@Override
public void close() {
Preconditions.checkState(!isClosed);
Preconditions.checkState(resultState != null);
logger.debug(queryIdString + ": cleaning up.");
injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
// Let the runtime filter router finish first, when one was created for this query.
if (enableRuntimeFilter && runtimeFilterRouter != null) {
runtimeFilterRouter.waitForComplete();
}
// remove the channel disconnected listener (doesn't throw)
closeFuture.removeListener(closeListener);
// log the query summary
logQuerySummary();
// These are straight forward removals from maps, so they won't throw.
drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
// A failure while closing the query context turns the result into FAILED (see suppressingClose).
// NOTE(review): queryContext is still used below (options lookup) after this close — confirm that is safe.
suppressingClose(queryContext);
/*
* We do our best to write the latest state, but even that could fail. If it does, we can't write
* the (possibly newly failing) state, so we continue on anyway.
*
* We only need to do this if the resultState differs from the last recorded state
*/
if (resultState != queryStateProcessor.getState()) {
suppressingClose(() -> queryStateProcessor.recordNewState(resultState));
}
// set query end time before writing final profile
queryStateProcessor.close();
/*
* Construct the response based on the latest resultState. The builder shouldn't fail.
*/
final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
.setQueryId(queryId)
.setQueryState(resultState);
// uex is the user-facing form of the result exception (null when there is none); it is attached to
// the response and also passed to the final profile write.
final UserException uex;
if (resultException != null) {
final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
if (resultException instanceof UserException) {
uex = (UserException) resultException;
} else {
// Wrap non-user exceptions as a system error tagged with this endpoint's identity.
uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build(logger);
}
resultBuilder.addError(uex.getOrCreatePBError(verbose));
} else {
uex = null;
}
// Debug option: write query profile before sending final results so that
// the client can be certain the profile exists.
ProfileOption profileOption = getProfileOption(queryContext);
if (profileOption == ProfileOption.SYNC) {
queryManager.writeFinalProfile(uex);
}
/*
* If sending the result fails, we don't really have any way to modify the result we tried to send;
* it is possible it got sent but the result came from a later part of the code path. It is also
* possible the connection has gone away, so this is irrelevant because there's nowhere to
* send anything to.
*/
try {
// send whatever result we ended up with
initiatingClient.sendResult(responseListener, resultBuilder.build());
} catch(final Exception e) {
// addException() either stores e as the result exception or attaches it as suppressed,
// so logging resultException below includes e either way.
addException(e);
logger.warn("Exception sending result to client", resultException);
}
// Store the final result here so we can capture any error/errorId in the
// profile for later debugging.
// Normal behavior is to write the query profile AFTER sending results to the user.
// The observed
// user behavior is a possible time-lag between query return and appearance
// of the query profile in persistent storage. Also, the query might
// succeed, but the profile never appear if the profile write fails. This
// behavior is acceptable for an eventually-consistent distributed system.
// The key benefit is that the client does not wait for the persistent
// storage write; query completion occurs in parallel with profile
// persistence.
if (profileOption == ProfileOption.ASYNC) {
queryManager.writeFinalProfile(uex);
}
// Remove the Foreman from the running query list.
fragmentsRunner.getBee().retireForeman(Foreman.this);
// NOTE(review): queryContext was already closed through suppressingClose(queryContext) above;
// this second close presumably relies on QueryContext.close() being safe to call twice — confirm.
try {
queryContext.close();
} catch (Exception e) {
logger.error("Unable to close query context for query {}", QueryIdHelper.getQueryId(queryId), e);
}
try {
queryManager.close();
} catch (final Exception e) {
logger.warn("unable to close query manager", e);
}
// Mark the result closed even if leaving the resource manager throws.
try {
queryRM.exit();
} finally {
isClosed = true;
}
}
}
/**
 * Listens for the status of the RPC response sent to the user for the query. Both outcomes
 * handled here are only logged; neither changes the state of the query.
 */
private static class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {

  /**
   * Logs that the final result could not be delivered to the initiating client.
   *
   * @param ex the RPC failure
   */
  @Override
  public void failed(final RpcException ex) {
    logger.info("Failure while trying communicate query result to initiating client. " +
        "This would happen if a client is disconnected before response notice can be sent.", ex);
  }

  /**
   * Logs that waiting for the RPC outcome was interrupted.
   *
   * @param e the interruption
   */
  @Override
  public void interrupted(final InterruptedException e) {
    // pass the exception to the logger so that the interruption's stack trace is not lost
    logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.", e);
  }
}
}