/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.work.foreman;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.FailureUtils;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Date;
import java.util.List;

import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;

/**
 * Foreman manages all the fragments (local and remote) for a single query where this
 * is the driving/root node.
 *
 * The flow is as follows:
 * <ul>
 * <li>While Foreman is initialized query is in preparing state.</li>
 * <li>Foreman is submitted as a runnable.</li>
 * <li>Runnable does query planning.</li>
 * <li>Runnable submits query to be enqueued.</li>
 * <li>The Runnable's run() completes, but the Foreman stays around to listen to state changes.</li>
 * <li>Once query is enqueued, starting fragments are sent out.</li>
 * <li>Status listener are activated</li>
 * <li>Foreman listens for state change messages.</li>
 * <li>State change messages can drive the state to FAILED or CANCELED, in which case
 *   messages are sent to running fragments to terminate.</li>
 * <li>When all fragments is completed, state change messages drive the state to COMPLETED.</li>
 * </ul>
 */

public class Foreman implements Runnable {
  private static final Logger logger = LoggerFactory.getLogger(Foreman.class);
  private static final Logger queryLogger = LoggerFactory.getLogger("query.logger");
  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);

  public enum ProfileOption { SYNC, ASYNC, NONE }

  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final QueryId queryId;
  private final String queryIdString;
  private final RunQuery queryRequest;
  private final QueryContext queryContext;
  private final QueryManager queryManager; // handles lower-level details of query execution
  private final DrillbitContext drillbitContext;
  private final UserClientConnection initiatingClient; // used to send responses
  private boolean resume;

  private final QueryResourceManager queryRM;

  private final ResponseSendListener responseListener = new ResponseSendListener();
  private final GenericFutureListener<Future<Void>> closeListener = future -> cancel();
  private final Future<Void> closeFuture;
  private final FragmentsRunner fragmentsRunner;
  private final QueryStateProcessor queryStateProcessor;

  private String queryText;

  private RuntimeFilterRouter runtimeFilterRouter;
  private final boolean enableRuntimeFilter;

  /**
   * Constructor. Sets up the Foreman, but does not initiate any execution.
   *
   * @param bee work manager (runs fragments)
   * @param drillbitContext drillbit context
   * @param connection connection
   * @param queryId the id for the query
   * @param queryRequest the query to execute
   */
  public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
      final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
    this.queryId = queryId;
    this.queryIdString = QueryIdHelper.getQueryId(queryId);
    this.queryRequest = queryRequest;
    this.drillbitContext = drillbitContext;
    this.initiatingClient = connection;
    this.closeFuture = initiatingClient.getClosureFuture();
    closeFuture.addListener(closeListener);

    // Apply AutoLimit on resultSet (Usually received via REST APIs)
    final int autoLimit = queryRequest.getAutolimitRowcount();
    if (autoLimit > 0) {
      connection.getSession().getOptions().setLocalOption(ExecConstants.QUERY_MAX_ROWS, autoLimit);
    }
    this.queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
    this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
        drillbitContext.getClusterCoordinator(), this);
    this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
    this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
    this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
    this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
  }

  /**
   * @return query id
   */
  public QueryId getQueryId() {
    return queryId;
  }

  /**
   * @return current query state
   */
  public QueryState getState() {
    return queryStateProcessor.getState();
  }

  /**
   * @return sql query text of the query request
   */
  public String getQueryText() {
    return queryText;
  }

  /**
   * Get the QueryContext created for the query.
   *
   * @return the QueryContext
   */
  public QueryContext getQueryContext() {
    return queryContext;
  }

  /**
   * Get the QueryManager created for the query.
   *
   * @return the QueryManager
   */
  public QueryManager getQueryManager() {
    return queryManager;
  }

  /**
   * Cancel the query (move query in cancellation requested state).
   * Query execution will be canceled once possible.
   */
  public void cancel() {
    logger.debug("Cancel Foreman");
    queryStateProcessor.cancel();
  }

  /**
   * Adds query status in the event queue to process it when foreman is ready.
   *
   * @param state new query state
   * @param exception exception if failure has occurred
   */
  public void addToEventQueue(QueryState state, Exception exception) {
    queryStateProcessor.addToEventQueue(state, exception);
  }

  /**
   * Resume the query. Regardless of the current state, this method sends a resume signal to all fragments.
   * This method can be called multiple times.
   */
  public void resume() {
    resume = true;
    // resume all pauses through query context
    queryContext.getExecutionControls().unpauseAll();
    // resume all pauses through all fragment contexts
    queryManager.unpauseExecutingFragments(drillbitContext);
  }

  /**
   * Called by execution pool to do query setup, and kick off remote execution.
   *
   * <p>Note that completion of this function is not the end of the Foreman's role
   * in the query's lifecycle.
   */
  @Override
  public void run() {
    // rename the thread we're using for debugging purposes
    final Thread currentThread = Thread.currentThread();
    final String originalName = currentThread.getName();
    currentThread.setName(queryIdString + ":foreman");
    try {
      /*
       Check if the foreman is ONLINE. If not don't accept any new queries.
       */
      if (!drillbitContext.isForemanOnline()) {
        throw new ForemanException("Query submission failed since Foreman is shutting down.");
      }
    } catch (ForemanException e) {
      logger.debug("Failure while submitting query", e);
      queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
    }

    queryText = queryRequest.getPlan();
    try {
      queryStateProcessor.moveToState(QueryState.PLANNING, null);
      injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);

      // convert a run query request into action
      switch (queryRequest.getType()) {
      case LOGICAL:
        parseAndRunLogicalPlan(queryRequest.getPlan());
        break;
      case PHYSICAL:
        parseAndRunPhysicalPlan(queryRequest.getPlan());
        break;
      case SQL:
        final String sql = queryRequest.getPlan();
        // log query id, username and query text before starting any real work. Also, put
        // them together such that it is easy to search based on query id
        logger.info("Query text for query with id {} issued by {}: {}", queryIdString,
            queryContext.getQueryUserName(), sql);
        runSQL(sql);
        break;
      case EXECUTION:
        runFragment(queryRequest.getFragmentsList());
        break;
      case PREPARED_STATEMENT:
        runPreparedStatement(queryRequest.getPreparedStatementHandle());
        break;
      default:
        throw new IllegalStateException();
      }
      injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
    } catch (ForemanException | UserException e) {
      queryStateProcessor.moveToState(QueryState.FAILED, e);
    } catch (OutOfMemoryError | OutOfMemoryException e) {
      if (FailureUtils.isDirectMemoryOOM(e)) {
        queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
      } else {
        /*
         * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
         * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
         * them, which might not work under these conditions.
         */
        FailureUtils.unrecoverableFailure(e, "Unable to handle out of memory condition in Foreman.", EXIT_CODE_HEAP_OOM);
      }
    } catch (Throwable ex) {
      queryStateProcessor.moveToState(QueryState.FAILED,
          new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
    } finally {
      // restore the thread's original name
      currentThread.setName(originalName);
    }

    /*
     * Note that despite the run() completing, the Foreman continues to exist, and receives
     * events (indirectly, through the QueryManager's use of stateListener), about fragment
     * completions. It won't go away until everything is completed, failed, or cancelled.
     */
  }

  /**
   * While one fragments where sanding out, other might have been completed. We don't want to process completed / failed
   * events until all fragments are sent out. This method triggers events processing when all fragments were sent out.
   */
  public void startProcessingEvents() {
    queryStateProcessor.startProcessingEvents();

    // If we received the resume signal before fragments are setup, the first call does not actually resume the
    // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
    if (resume) {
      resume();
    }
  }

  private ProfileOption getProfileOption(QueryContext queryContext) {
    if (queryContext.isSkipProfileWrite()) {
      return ProfileOption.NONE;
    }
    OptionSet options = queryContext.getOptions();
    if (!options.getBoolean(ExecConstants.ENABLE_QUERY_PROFILE_OPTION)) {
      return ProfileOption.NONE;
    }
    if (options.getBoolean(ExecConstants.QUERY_PROFILE_DEBUG_OPTION)) {
      return ProfileOption.SYNC;
    } else {
      return ProfileOption.ASYNC;
    }
  }

  private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
    LogicalPlan logicalPlan;
    try {
      logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
    } catch (final IOException e) {
      throw new ForemanException("Failure parsing logical plan.", e);
    }

    if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) {
      throw new ForemanException(
          "Failure running plan.  You requested a result mode of LOGICAL and submitted a logical plan.  In this case you're output mode must be PHYSICAL or EXEC.");
    }

    log(logicalPlan);

    final PhysicalPlan physicalPlan = convert(logicalPlan);

    if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
      returnPhysical(physicalPlan);
      return;
    }

    log(physicalPlan);
    runPhysicalPlan(physicalPlan);
  }

  private void log(final LogicalPlan plan) {
    if (logger.isDebugEnabled()) {
      logger.debug("Logical {}", plan.unparse(queryContext.getLpPersistence()));
    }
  }

  private void log(final PhysicalPlan plan) {
    if (logger.isDebugEnabled()) {
      try {
        final String planText = queryContext.getLpPersistence().getMapper().writeValueAsString(plan);
        logger.debug("Physical {}", planText);
      } catch (final IOException e) {
        logger.warn("Error while attempting to log physical plan.", e);
      }
    }
  }

  private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
    final String jsonPlan = plan.unparse(queryContext.getLpPersistence().getMapper().writer());
    runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(jsonPlan)));
  }

  public static class PhysicalFromLogicalExplain {
    public final String json;

    public PhysicalFromLogicalExplain(final String json) {
      this.json = json;
    }
  }

  private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
    try {
      final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
      runPhysicalPlan(plan);
    } catch (final IOException e) {
      throw new ForemanSetupException("Failure while parsing physical plan.", e);
    }
  }

  private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
    runPhysicalPlan(plan, null);
  }

  private void runPhysicalPlan(final PhysicalPlan plan, Pointer<String> textPlan) throws ExecutionSetupException {
    validatePlan(plan);

    queryRM.visitAbstractPlan(plan);
    final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
    if (enableRuntimeFilter) {
      runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
      runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
    }
    if (textPlan != null) {
      queryManager.setPlanText(textPlan.value);
      queryManager.setPlanProperties(plan.getProperties());
    }
    queryRM.visitPhysicalPlan(work);
    queryRM.setCost(plan.totalCost());
    queryManager.setTotalCost(plan.totalCost());
    work.applyPlan(drillbitContext.getPlanReader());
    logWorkUnit(work);

    fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());

    startQueryProcessing();
  }

  /**
   * This is a helper method to run query based on the list of PlanFragment that were planned
   * at some point of time
   * @param fragmentsList fragment list
   * @throws ExecutionSetupException
   */
  private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetupException {
    // need to set QueryId, MinorFragment for incoming Fragments
    PlanFragment rootFragment = null;
    boolean isFirst = true;
    final List<PlanFragment> planFragments = Lists.newArrayList();
    for (PlanFragment myFragment : fragmentsList) {
      final FragmentHandle handle = myFragment.getHandle();
      // though we have new field in the FragmentHandle - parentQueryId
      // it can not be used until every piece of code that creates handle is using it, as otherwise
      // comparisons on that handle fail that causes fragment runtime failure
      final FragmentHandle newFragmentHandle = FragmentHandle.newBuilder().setMajorFragmentId(handle.getMajorFragmentId())
          .setMinorFragmentId(handle.getMinorFragmentId()).setQueryId(queryId)
          .build();
      final PlanFragment newFragment = PlanFragment.newBuilder(myFragment).setHandle(newFragmentHandle).build();
      if (isFirst) {
        rootFragment = newFragment;
        isFirst = false;
      } else {
        planFragments.add(newFragment);
      }
    }

    assert rootFragment != null;

    final FragmentRoot rootOperator;
    try {
      rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
    } catch (IOException e) {
      throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
    }
    queryRM.setCost(rootOperator.getCost().getOutputRowCount());

    fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);

    startQueryProcessing();
  }

  /**
   * Enqueues the query and once enqueued, starts sending out query fragments for further execution.
   * Moves query to RUNNING state.
   */
  private void startQueryProcessing() {
    enqueue();
    runFragments();
    queryStateProcessor.moveToState(QueryState.RUNNING, null);
  }

  /**
   * Move query to ENQUEUED state. Enqueues query if queueing is enabled.
   * Foreman run will be blocked until query is enqueued.
   * In case of failures (ex: queue timeout exception) will move query to FAILED state.
   */
  private void enqueue() {
    queryStateProcessor.moveToState(QueryState.ENQUEUED, null);

    try {
      queryRM.admit();
      queryStateProcessor.moveToState(QueryState.STARTING, null);
    } catch (QueueTimeoutException | QueryQueueException e) {
      queryStateProcessor.moveToState(QueryState.FAILED, e);
    } finally {
      String queueName = queryRM.queueName();
      queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
    }
  }

  private void runFragments() {
    try {
      fragmentsRunner.submit();
    } catch (Exception e) {
      queryStateProcessor.moveToState(QueryState.FAILED, e);
    } finally {
      /*
       * Begin accepting external events.
       *
       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
       * event delivery call.
       *
       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
       * make sure that we can't make things any worse as those events are delivered, but allow
       * any necessary remaining cleanup to proceed.
       *
       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
       * to accept events.
       */
      startProcessingEvents();
    }
  }

  /**
   * Helper method to execute the query in prepared statement. Current implementation takes the query from opaque
   * object of the <code>preparedStatement</code> and submits as a new query.
   *
   * @param preparedStatementHandle prepared statement handle
   * @throws ExecutionSetupException
   */
  private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
      throws ExecutionSetupException {
    final ServerPreparedStatementState serverState;

    try {
      serverState =
          ServerPreparedStatementState.PARSER.parseFrom(preparedStatementHandle.getServerInfo());
    } catch (final InvalidProtocolBufferException ex) {
      throw UserException.parseError(ex)
          .message("Failed to parse the prepared statement handle. " +
              "Make sure the handle is same as one returned from create prepared statement call.")
          .build(logger);
    }

    queryText = serverState.getSqlQuery();
    logger.info("Prepared statement query for QueryId {} : {}", queryId, queryText);
    runSQL(queryText);

  }

  private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
    if (plan.getProperties().resultMode != ResultMode.EXEC) {
      throw new ForemanSetupException(String.format(
          "Failure running plan.  You requested a result mode of %s and a physical plan can only be output as EXEC",
          plan.getProperties().resultMode));
    }
  }

  private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan,
                                         final QueryResourceManager rm) throws ExecutionSetupException {

    final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();

    final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);

    return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(),
                                                                        queryContext.getCurrentEndpoint(),
                                                                        queryId, queryContext.getOnlineEndpoints(),
                                                                        rootFragment, initiatingClient.getSession(),
                                                                        queryContext.getQueryContextInfo());
  }

  private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
    if (! logger.isTraceEnabled()) {
      return;
    }
    logger.trace(String.format("PlanFragments for query %s \n%s",
        queryId, queryWorkUnit.stringifyFragments()));
  }

  private void runSQL(final String sql) throws ExecutionSetupException {
    final Pointer<String> textPlan = new Pointer<>();
    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
    runPhysicalPlan(plan, textPlan);
  }

  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
    if (logger.isDebugEnabled()) {
      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
    }
    return new BasicOptimizer(queryContext, initiatingClient).optimize(
        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
  }

  public RuntimeFilterRouter getRuntimeFilterRouter() {
    return runtimeFilterRouter;
  }

  /**
   * Manages the end-state processing for Foreman.
   *
   * End-state processing is tricky, because even if a query appears to succeed, but
   * we then encounter a problem during cleanup, we still want to mark the query as
   * failed. So we have to construct the successful result we would send, and then
   * clean up before we send that result, possibly changing that result if we encounter
   * a problem during cleanup. We only send the result when there is nothing left to
   * do, so it will account for any possible problems.
   *
   * The idea here is to make close()ing the ForemanResult do the final cleanup and
   * sending. Closing the result must be the last thing that is done by Foreman.
   */
  public class ForemanResult implements AutoCloseable {
    private QueryState resultState = null;
    private volatile Exception resultException = null;
    private boolean isClosed = false;

    /**
     * Set up the result for a COMPLETED or CANCELED state.
     *
     * <p>Note that before sending this result, we execute cleanup steps that could
     * result in this result still being changed to a FAILED state.
     *
     * @param queryState one of COMPLETED or CANCELED
     */
    public void setCompleted(final QueryState queryState) {
      Preconditions.checkArgument((queryState == QueryState.COMPLETED) || (queryState == QueryState.CANCELED));
      Preconditions.checkState(!isClosed);
      Preconditions.checkState(resultState == null);

      resultState = queryState;
    }

    /**
     * Set up the result for a FAILED state.
     *
     * <p>Failures that occur during cleanup processing will be added as suppressed
     * exceptions.
     *
     * @param exception the exception that led to the FAILED state
     */
    public void setFailed(final Exception exception) {
      Preconditions.checkArgument(exception != null);
      Preconditions.checkState(!isClosed);
      Preconditions.checkState(resultState == null);

      resultState = QueryState.FAILED;
      resultException = exception;
    }

    /**
     * Ignore the current status and force the given failure as current status.
     * NOTE: Used only for testing purposes. Shouldn't be used in production.
     */
    public void setForceFailure(final Exception exception) {
      Preconditions.checkArgument(exception != null);
      Preconditions.checkState(!isClosed);

      resultState = QueryState.FAILED;
      resultException = exception;
    }

    /**
     * Add an exception to the result. All exceptions after the first become suppressed
     * exceptions hanging off the first.
     *
     * @param exception the exception to add
     */
    private void addException(final Exception exception) {
      assert exception != null;

      if (resultException == null) {
        resultException = exception;
      } else {
        resultException.addSuppressed(exception);
      }
    }

    /**
     * Expose the current exception (if it exists). This is useful for secondary reporting to the query profile.
     *
     * @return the current Foreman result exception or null.
     */
    public Exception getException() {
      return resultException;
    }

    /**
     * Close the given resource, catching and adding any caught exceptions via {@link #addException(Exception)}. If an
     * exception is caught, it will change the result state to FAILED, regardless of what its current value.
     *
     * @param autoCloseable
     *          the resource to close
     */
    private void suppressingClose(final AutoCloseable autoCloseable) {
      Preconditions.checkState(!isClosed);
      Preconditions.checkState(resultState != null);

      if (autoCloseable == null) {
        return;
      }

      try {
        autoCloseable.close();
      } catch(final Exception e) {
        /*
         * Even if the query completed successfully, we'll still report failure if we have
         * problems cleaning up.
         */
        resultState = QueryState.FAILED;
        addException(e);
      }
    }

    private void logQuerySummary() {
      try {
        LoggedQuery q = new LoggedQuery(
            queryIdString,
            queryContext.getQueryContextInfo().getDefaultSchemaName(),
            queryText,
            new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
            new Date(System.currentTimeMillis()),
            queryStateProcessor.getState(),
            queryContext.getQueryUserCredentials().getUserName(),
            initiatingClient.getRemoteAddress());
        queryLogger.info(MAPPER.writeValueAsString(q));
      } catch (Exception e) {
        logger.error("Failure while recording query information to query log.", e);
      }
    }

    @Override
    public void close() {
      Preconditions.checkState(!isClosed);
      Preconditions.checkState(resultState != null);

      logger.debug(queryIdString + ": cleaning up.");
      injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
      if (enableRuntimeFilter && runtimeFilterRouter != null) {
        runtimeFilterRouter.waitForComplete();
      }
      // remove the channel disconnected listener (doesn't throw)
      closeFuture.removeListener(closeListener);

      // log the query summary
      logQuerySummary();

      // These are straight forward removals from maps, so they won't throw.
      drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());

      suppressingClose(queryContext);

      /*
       * We do our best to write the latest state, but even that could fail. If it does, we can't write
       * the (possibly newly failing) state, so we continue on anyway.
       *
       * We only need to do this if the resultState differs from the last recorded state
       */
      if (resultState != queryStateProcessor.getState()) {
        suppressingClose(() -> queryStateProcessor.recordNewState(resultState));
      }

      // set query end time before writing final profile
      queryStateProcessor.close();

      /*
       * Construct the response based on the latest resultState. The builder shouldn't fail.
       */
      final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
          .setQueryId(queryId)
          .setQueryState(resultState);
      final UserException uex;
      if (resultException != null) {
        final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
        if (resultException instanceof UserException) {
          uex = (UserException) resultException;
        } else {
          uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build(logger);
        }
        resultBuilder.addError(uex.getOrCreatePBError(verbose));
      } else {
        uex = null;
      }

      // Debug option: write query profile before sending final results so that
      // the client can be certain the profile exists.
      ProfileOption profileOption = getProfileOption(queryContext);
      if (profileOption == ProfileOption.SYNC) {
        queryManager.writeFinalProfile(uex);
      }

      /*
       * If sending the result fails, we don't really have any way to modify the result we tried to send;
       * it is possible it got sent but the result came from a later part of the code path. It is also
       * possible the connection has gone away, so this is irrelevant because there's nowhere to
       * send anything to.
       */
      try {
        // send whatever result we ended up with
        initiatingClient.sendResult(responseListener, resultBuilder.build());
      } catch(final Exception e) {
        addException(e);
        logger.warn("Exception sending result to client", resultException);
      }

      // Store the final result here so we can capture any error/errorId in the
      // profile for later debugging.
      // Normal behavior is to write the query profile AFTER sending results to the user.
      // The observed
      // user behavior is a possible time-lag between query return and appearance
      // of the query profile in persistent storage. Also, the query might
      // succeed, but the profile never appear if the profile write fails. This
      // behavior is acceptable for an eventually-consistent distributed system.
      // The key benefit is that the client does not wait for the persistent
      // storage write; query completion occurs in parallel with profile
      // persistence.

      if (profileOption == ProfileOption.ASYNC) {
        queryManager.writeFinalProfile(uex);
      }

      // Remove the Foreman from the running query list.
      fragmentsRunner.getBee().retireForeman(Foreman.this);

      try {
        queryContext.close();
      } catch (Exception e) {
        logger.error("Unable to close query context for query {}", QueryIdHelper.getQueryId(queryId), e);
      }

      try {
        queryManager.close();
      } catch (final Exception e) {
        logger.warn("unable to close query manager", e);
      }

      try {
        queryRM.exit();
      } finally {
        isClosed = true;
      }
    }
  }

  /**
   * Listens for the status of the RPC response sent to the user for the query.
   */
  private static class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
    @Override
    public void failed(final RpcException ex) {
      logger.info("Failure while trying communicate query result to initiating client. " +
          "This would happen if a client is disconnected before response notice can be sent.", ex);
    }

    @Override
    public void interrupted(final InterruptedException e) {
      logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
    }
  }

}
