/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.rpc.user;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.user.UserClient.UserToBitConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;

import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Encapsulates the future management of query submissions.  This entails a
 * potential race condition.  Normal ordering is:
 * <ul>
 *   <li>1.  Submit query to be executed. </li>
 *   <li>2.  Receive QueryHandle for buffer management. </li>
 *   <li>3.  Start receiving results batches for query. </li>
 * </ul>
 * However, 3 could potentially occur before 2.   Because of that, we need to
 * handle this case and then do a switcheroo.
 */
public class QueryResultHandler {
  private static final Logger logger =
      LoggerFactory.getLogger(QueryResultHandler.class);

  /**
   * Current listener for results, for each active query.
   * <p>
   *   Concurrency:  Access by SubmissionLister for query-ID message vs.
   *   access by batchArrived is not otherwise synchronized.
   * </p>
   */
  private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
      Maps.newConcurrentMap();

  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
    return new SubmissionListener(resultsListener);
  }

  public RpcConnectionHandler<UserToBitConnection> getWrappedConnectionHandler(
      RpcConnectionHandler<UserToBitConnection> handler) {
    return new ChannelClosedHandler(handler);
  }

  /**
   * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
   * handles data result messages
   */
  public void resultArrived(ByteBuf pBody) throws RpcException {
    QueryResult queryResult = RpcBus.get(pBody, QueryResult.PARSER);

    QueryId queryId = queryResult.getQueryId();
    QueryState queryState = queryResult.getQueryState();

    if (logger.isDebugEnabled()) {
      logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
          QueryIdHelper.getQueryId(queryId));
    }

    assert queryResult.hasQueryState() : "received query result without QueryState";

    boolean isFailureResult = QueryState.FAILED == queryState;
    // CANCELED queries are handled the same way as COMPLETED
    boolean isTerminalResult;
    switch (queryState) {
      case FAILED:
      case CANCELED:
      case COMPLETED:
        isTerminalResult = true;
        break;
      default:
        logger.error("Unexpected/unhandled QueryState " + queryState
          + " (for query " + queryId +  ")");
        isTerminalResult = false;
        break;
    }

    assert isFailureResult || queryResult.getErrorCount() == 0
      : "Error count for the query batch is non-zero but QueryState != FAILED";

    UserResultsListener resultsListener = newUserResultsListener(queryId);

    try {
      if (isFailureResult) {

        // Failure case--pass on via submissionFailed(...).
        resultsListener.submissionFailed(new UserRemoteException(queryResult.getError(0)));
        // Note: Listener is removed in finally below.
      } else if (isTerminalResult) {

        // A successful completion/canceled case--pass on via resultArrived
        try {
          resultsListener.queryCompleted(queryState);
        } catch (Exception e) {
          resultsListener.submissionFailed(UserException.systemError(e).build(logger));
        }
      } else {
        logger.warn("queryState {} was ignored", queryState);
      }
    } finally {
      if (isTerminalResult) {
        // TODO:  What exactly are we checking for?  How should we really check
        // for it?
        if ((! (resultsListener instanceof BufferingResultsListener)
          || ((BufferingResultsListener) resultsListener).output != null)) {
          queryIdToResultsListenersMap.remove(queryId, resultsListener);
        }
      }
    }
  }

  /**
   * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
   * handles query data messages
   */
  public void batchArrived(ConnectionThrottle throttle,
                            ByteBuf pBody, ByteBuf dBody) throws RpcException {
    QueryData queryData = RpcBus.get(pBody, QueryData.PARSER);
    // Current batch coming in.
    DrillBuf drillBuf = (DrillBuf) dBody;
    QueryDataBatch batch = new QueryDataBatch(queryData, drillBuf);

    QueryId queryId = queryData.getQueryId();

    if (logger.isDebugEnabled()) {
      logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
    }
    logger.trace("batchArrived: batch = {}", batch);

    UserResultsListener resultsListener = newUserResultsListener(queryId);

    // A data case--pass on via dataArrived
    try {
      resultsListener.dataArrived(batch, throttle);
      // That releases batch if successful.
    } catch (Exception e) {
      try {
        batch.release();
      } catch (IllegalStateException e2) {
        // Ignore, released twice
      }
      resultsListener.submissionFailed(UserException.systemError(e).build(logger));
    }
  }

  /**
   * Return {@link UserResultsListener} associated with queryId. Will create a
   * new {@link BufferingResultsListener} if no listener found.
   *
   * @param queryId
   *          queryId we are getting the listener for
   * @return {@link UserResultsListener} associated with queryId
   */
  private UserResultsListener newUserResultsListener(QueryId queryId) {
    UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
    logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
    if (null == resultsListener) {

      // WHO?? didn't get query ID response and set submission listener yet,
      // so install a buffering listener for now
      BufferingResultsListener bl = new BufferingResultsListener();
      resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
      // If we had a successful insertion, use that reference.  Otherwise, just
      // throw away the new buffering listener.
      if (null == resultsListener) {
        resultsListener = bl;
      }
    }
    return resultsListener;
  }

  private static class BufferingResultsListener implements UserResultsListener {

    private final ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
    private volatile UserException ex;
    private volatile QueryState queryState;
    private volatile UserResultsListener output;
    private volatile ConnectionThrottle throttle;

    public boolean transferTo(UserResultsListener l) {
      synchronized (this) {
        output = l;
        for (QueryDataBatch r : results) {
          l.dataArrived(r, throttle);
        }
        if (ex != null) {
          l.submissionFailed(ex);
          return true;
        } else if (queryState != null) {
          l.queryCompleted(queryState);
          return true;
        }

        return false;
      }
    }

    @Override
    public void queryCompleted(QueryState state) {
      assert queryState == null;
      this.queryState = state;
      synchronized (this) {
        if (output != null) {
          output.queryCompleted(state);
        }
      }
    }

    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
      this.throttle = throttle;

      synchronized (this) {
        if (output == null) {
          this.results.add(result);
        } else {
          output.dataArrived(result, throttle);
        }
      }
    }

    @Override
    public void submissionFailed(UserException ex) {
      assert queryState == null;
      // there is one case when submissionFailed() is called even though the
      // query didn't fail on the server side
      // it happens when UserResultsListener.batchArrived() throws an exception
      // that will be passed to
      // submissionFailed() by QueryResultHandler.dataArrived()
      queryState = QueryState.FAILED;
      synchronized (this) {
        if (output == null) {
          this.ex = ex;
        } else{
          output.submissionFailed(ex);
        }
      }
    }

    @Override
    public void queryIdArrived(QueryId queryId) { }
  }

  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {

    private final UserResultsListener resultsListener;
    private final AtomicBoolean isTerminal = new AtomicBoolean(false);

    public SubmissionListener(UserResultsListener resultsListener) {
      this.resultsListener = resultsListener;
    }

    @Override
    public void failed(RpcException ex) {
      if (!isTerminal.compareAndSet(false, true)) {
        logger.warn("Received multiple responses to run query request.");
        return;
      }

      // Although query submission failed, results might have arrived for this query.
      // However, the results could not be transferred to this resultListener because
      // there is no query id mapped to this resultListener. Look out for the warning
      // message from ChannelClosedHandler in the client logs.
      // TODO(DRILL-4586)
      resultsListener.submissionFailed(UserException.systemError(ex)
          .addContext("Query submission to Drillbit failed.")
          .build(logger));
    }

    @Override
    public void success(QueryId queryId, ByteBuf buf) {
      if (!isTerminal.compareAndSet(false, true)) {
        logger.warn("Received multiple responses to run query request.");
        return;
      }

      resultsListener.queryIdArrived(queryId);
      if (logger.isDebugEnabled()) {
        logger.debug("Received QueryId {} successfully. Adding results listener {}.",
          QueryIdHelper.getQueryId(queryId), resultsListener);
      }
      UserResultsListener oldListener =
          queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);

      // We need to deal with the situation where we already received results by
      // the time we got the query id back.  In that case, we'll need to
      // transfer the buffering listener over, grabbing a lock against reception
      // of additional results during the transition.
      if (oldListener != null) {
        logger.debug("Unable to place user results listener, buffering listener was already in place.");
        if (oldListener instanceof BufferingResultsListener) {
          boolean all = ((BufferingResultsListener) oldListener).transferTo(this.resultsListener);
          // simply remove the buffering listener if we already have the last response.
          if (all) {
            queryIdToResultsListenersMap.remove(queryId);
          } else {
            boolean replaced = queryIdToResultsListenersMap.replace(queryId, oldListener, resultsListener);
            if (!replaced) {
              throw new IllegalStateException(); // TODO: Say what the problem is!
            }
          }
        } else {
          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
        }
      }
    }

    @Override
    public void interrupted(InterruptedException ex) {
      if (!isTerminal.compareAndSet(false, true)) {
        logger.warn("Received multiple responses to run query request.");
        return;
      }

      // TODO(DRILL-4586)
      resultsListener.submissionFailed(UserException.systemError(ex)
          .addContext("The client had been asked to wait as the Drillbit is potentially being over-utilized." +
              " But the client was interrupted while waiting.")
          .build(logger));
    }
  }

  /**
   * When a {@link UserToBitConnection connection} to a server is successfully
   * created, this handler adds a listener to that connection that listens to
   * connection closure. If the connection is closed, all active
   * {@link UserResultsListener result listeners} are failed.
   */
  private class ChannelClosedHandler implements RpcConnectionHandler<UserToBitConnection> {

    private final RpcConnectionHandler<UserToBitConnection> parentHandler;

    public ChannelClosedHandler(RpcConnectionHandler<UserToBitConnection> parentHandler) {
      this.parentHandler = parentHandler;
    }

    @Override
    public void connectionSucceeded(UserToBitConnection connection) {
      connection.getChannel().closeFuture().addListener(
          new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future)
                throws Exception {
              for (UserResultsListener listener : queryIdToResultsListenersMap.values()) {
                listener.submissionFailed(UserException.connectionError()
                    .message("Connection %s closed unexpectedly. Drillbit down?",
                        connection.getName())
                    .build(logger));
                if (listener instanceof BufferingResultsListener) {
                  // the appropriate listener will be failed by SubmissionListener#failed
                  logger.warn("Buffering listener failed before results were transferred to the actual listener.");
                }
              }
            }
          });
      parentHandler.connectionSucceeded(connection);
    }

    @Override
    public void connectionFailed(FailureType type, Throwable t) {
      parentHandler.connectionFailed(type, t);
    }
  }
}
