/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.operations.QueryOperationContext;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
import org.apache.geode.cache.query.QueryExecutionTimeoutException;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.CqEntry;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.QueryExecutionCanceledException;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.security.AuthorizeRequestPP;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.security.ResourcePermission.Operation;
import org.apache.geode.security.ResourcePermission.Resource;

public abstract class BaseCommandQuery extends BaseCommand {

  /**
   * Process the give query and sends the resulset back to the client.
   *
   * @return true if successful execution false in case of failure.
   */
  protected boolean processQuery(final Message msg,
      final Query query,
      final String queryString,
      final Set regionNames,
      final long start,
      final ServerCQ cqQuery,
      final QueryOperationContext queryContext,
      final ServerConnection servConn,
      final boolean sendResults,
      final SecurityService securityService)
      throws IOException, InterruptedException {
    return processQueryUsingParams(msg, query, queryString, regionNames, start,
        cqQuery, queryContext, servConn, sendResults, null, securityService);
  }

  /**
   * Process the give query and sends the resulset back to the client.
   *
   * @return true if successful execution false in case of failure.
   */
  protected boolean processQueryUsingParams(final Message msg,
      final Query query,
      final String queryString,
      final Set regionNames,
      long start,
      final ServerCQ cqQuery,
      QueryOperationContext queryContext,
      final ServerConnection servConn,
      final boolean sendResults,
      final Object[] params,
      final SecurityService securityService) throws IOException, InterruptedException {
    ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
    CacheServerStats stats = servConn.getCacheServerStats();
    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();

    {
      long oldStart = start;
      start = DistributionStats.getStatTime();
      stats.incReadQueryRequestTime(start - oldStart);
    }

    // from 7.0, set flag to indicate a remote query irrespective of the
    // object type
    if (servConn.getClientVersion().compareTo(Version.GFE_70) >= 0) {
      ((DefaultQuery) query).setRemoteQuery(true);
    }
    // Process the query request
    try {
      // integrated security
      for (Object regionName : regionNames) {
        securityService.authorize(Resource.DATA, Operation.READ, regionName.toString());
      }

      // Execute query
      // startTime = GenericStats.getTime();
      // startTime = System.currentTimeMillis();

      // For now we assume the results are a SelectResults
      // which is the only possibility now, but this may change
      // in the future if we support arbitrary queries
      Object result = null;

      if (params != null) {
        result = query.execute(params);
      } else {
        result = query.execute();
      }

      // Asif : Before conditioning the results check if any
      // of the regions involved in the query have been destroyed
      // or not. If yes, throw an Exception.
      // This is a workaround/fix for Bug 36969
      Iterator itr = regionNames.iterator();
      while (itr.hasNext()) {
        String regionName = (String) itr.next();
        if (crHelper.getRegion(regionName) == null) {
          throw new RegionDestroyedException(
              "Region destroyed during the execution of the query",
              regionName);
        }
      }
      AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest();
      if (postAuthzRequest != null) {
        if (cqQuery == null) {
          queryContext = postAuthzRequest.queryAuthorize(queryString, regionNames, result,
              queryContext, params);
        } else {
          queryContext = postAuthzRequest.executeCQAuthorize(cqQuery.getName(), queryString,
              regionNames, result, queryContext);
        }
        result = queryContext.getQueryResult();
      }

      if (result instanceof SelectResults) {
        SelectResults selectResults = (SelectResults) result;

        if (logger.isDebugEnabled()) {
          logger.debug("Query Result size for : {} is {}", query.getQueryString(),
              selectResults.size());
        }

        CollectionType collectionType = null;
        boolean sendCqResultsWithKey = true;
        boolean isStructs = false;

        // check if resultset has serialized objects, so that they could be sent
        // as ObjectPartList
        boolean hasSerializedObjects = ((DefaultQuery) query).isKeepSerialized();
        if (logger.isDebugEnabled()) {
          logger.debug("Query Result for :{} has serialized objects: {}", query.getQueryString(),
              hasSerializedObjects);
        }
        // Don't convert to a Set, there might be duplicates now
        // The results in a StructSet are stored in Object[]s
        // Get them as Object[]s for the objs[] in order to avoid duplicating
        // the StructTypes

        // Object[] objs = new Object[selectResults.size()];
        // Get the collection type (which includes the element type)
        // (used to generate the appropriate instance on the client)

        // Get the collection type (which includes the element type)
        // (used to generate the appropriate instance on the client)
        collectionType = getCollectionType(selectResults);
        isStructs = collectionType.getElementType().isStructType();

        // Check if the Query is from CQ execution.
        if (cqQuery != null) {
          // Check if the key can be sent to the client based on its version.
          sendCqResultsWithKey = sendCqResultsWithKey(servConn);

          if (sendCqResultsWithKey) {
            // Update the collection type to include key info.
            collectionType = new CollectionTypeImpl(Collection.class,
                new StructTypeImpl(new String[] {"key", "value"}));
            isStructs = collectionType.getElementType().isStructType();
          }
        }

        int numberOfChunks = (int) Math.ceil(selectResults.size() * 1.0 / MAXIMUM_CHUNK_SIZE);

        if (logger.isTraceEnabled()) {
          logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}",
              servConn.getName(), selectResults.size(), MAXIMUM_CHUNK_SIZE, numberOfChunks);
        }

        long oldStart = start;
        start = DistributionStats.getStatTime();
        stats.incProcessQueryTime(start - oldStart);

        if (sendResults) {
          queryResponseMsg.setMessageType(MessageType.RESPONSE);
          queryResponseMsg.setTransactionId(msg.getTransactionId());
          queryResponseMsg.sendHeader();
        }

        if (sendResults && numberOfChunks == 0) {
          // Send 1 empty chunk
          if (logger.isTraceEnabled()) {
            logger.trace("{}: Creating chunk: 0", servConn.getName());
          }
          writeQueryResponseChunk(new Object[0], collectionType, true, servConn);
          if (logger.isDebugEnabled()) {
            logger.debug("{}: Sent chunk (1 of 1) of query response for query {}",
                servConn.getName(), queryString);
          }
        } else {
          // Send response to client.
          // from 7.0, if the object is in the form of serialized byte array,
          // send it as a part of ObjectPartList
          if (hasSerializedObjects) {
            sendResultsAsObjectPartList(numberOfChunks, servConn, selectResults.asList(), isStructs,
                collectionType, queryString, cqQuery, sendCqResultsWithKey, sendResults,
                securityService);
          } else {
            sendResultsAsObjectArray(selectResults, numberOfChunks, servConn, isStructs,
                collectionType, queryString, cqQuery, sendCqResultsWithKey, sendResults);
          }
        }

        if (cqQuery != null) {
          // Set the CQ query result cache initialized flag.
          cqQuery.setCqResultsCacheInitialized();
        }

      } else if (result instanceof Integer) {
        if (sendResults) {
          queryResponseMsg.setMessageType(MessageType.RESPONSE);
          queryResponseMsg.setTransactionId(msg.getTransactionId());
          queryResponseMsg.sendHeader();
          writeQueryResponseChunk(result, null, true, servConn);
        }
      } else {
        throw new QueryInvalidException(String.format("Unknown result type: %s",
            result.getClass()));
      }
      msg.clearParts();
    } catch (QueryInvalidException e) {
      // Handle this exception differently since it can contain
      // non-serializable objects.
      // java.io.NotSerializableException: antlr.CommonToken
      // Log a warning to show stack trace and create a new
      // QueryInvalidEsception on the original one's message (not cause).
      logger.warn(String.format("Unexpected QueryInvalidException while processing query %s",
          queryString),
          e);
      QueryInvalidException qie =
          new QueryInvalidException(String.format("%s : QueryString is: %s.",
              new Object[] {e.getLocalizedMessage(), queryString}));
      writeQueryResponseException(msg, qie, servConn);
      return false;
    } catch (DistributedSystemDisconnectedException se) {
      if (msg != null && logger.isDebugEnabled()) {
        logger.debug(
            "{}: ignoring message of type {} from client {} because shutdown occurred during message processing.",
            servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getProxyID());
      }
      servConn.setFlagProcessMessagesAsFalse();
      servConn.setClientDisconnectedException(se);
      return false;
    } catch (Exception e) {
      // If an interrupted exception is thrown , rethrow it
      checkForInterrupt(servConn, e);
      // Otherwise, write a query response and continue
      // Check if query got canceled from QueryMonitor.
      if (e instanceof QueryExecutionLowMemoryException
          || e instanceof QueryExecutionTimeoutException
          || e instanceof QueryExecutionCanceledException) {
        e = new QueryException(e.getMessage(),
            e.getCause());
      }
      writeQueryResponseException(msg, e, servConn);
      return false;
    } finally {
      // Since the query object is being shared in case of bind queries,
      // resetting the flag may cause inconsistency.
      // Also since this flag is only being set in code path executed by
      // remote query execution, resetting it is not required.

      // ((DefaultQuery)query).setRemoteQuery(false);
    }

    if (logger.isDebugEnabled()) {
      logger.debug("{}: Sent query response for query {}", servConn.getName(), queryString);
    }

    stats.incWriteQueryResponseTime(DistributionStats.getStatTime() - start);
    return true;
  }

  protected CollectionType getCollectionType(SelectResults results) {
    return results.getCollectionType();
  }

  private boolean sendCqResultsWithKey(ServerConnection servConn) {
    Version clientVersion = servConn.getClientVersion();
    if (clientVersion.compareTo(Version.GFE_65) >= 0) {
      return true;
    }
    return false;
  }

  protected void sendCqResponse(int msgType, String msgStr, int txId, Throwable e,
      ServerConnection servConn) throws IOException {
    ChunkedMessage cqMsg = servConn.getChunkedResponseMessage();
    if (logger.isDebugEnabled()) {
      logger.debug("CQ Response message :{}", msgStr);
    }

    switch (msgType) {
      case MessageType.REPLY:
        cqMsg.setNumberOfParts(1);
        break;

      case MessageType.CQDATAERROR_MSG_TYPE:
        logger.warn(msgStr);
        cqMsg.setNumberOfParts(1);
        break;

      case MessageType.CQ_EXCEPTION_TYPE:
        String exMsg = "";
        if (e != null) {
          exMsg = e.getLocalizedMessage();
        }
        logger.info(msgStr + exMsg, e);

        msgStr += exMsg; // fixes bug 42309

        cqMsg.setNumberOfParts(1);
        break;

      default:
        msgType = MessageType.CQ_EXCEPTION_TYPE;
        cqMsg.setNumberOfParts(1);
        msgStr += "Uknown query Exception.";
        break;
    }

    cqMsg.setMessageType(msgType);
    cqMsg.setTransactionId(txId);
    cqMsg.sendHeader();
    cqMsg.addStringPart(msgStr);
    cqMsg.setLastChunk(true);
    cqMsg.sendChunk(servConn);
    cqMsg.setLastChunk(true);

    if (logger.isDebugEnabled()) {
      logger.debug("CQ Response sent successfully");
    }
  }

  private void sendResultsAsObjectArray(SelectResults selectResults, int numberOfChunks,
      ServerConnection servConn, boolean isStructs, CollectionType collectionType,
      String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults)
      throws IOException {
    int resultIndex = 0;
    // For CQ only as we dont want CQEntries which have null values.
    int cqResultIndex = 0;
    Object[] objs = selectResults.toArray();
    for (int j = 0; j < numberOfChunks; j++) {
      boolean incompleteArray = false;
      if (logger.isTraceEnabled()) {
        logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
      }
      Object[] results = new Object[MAXIMUM_CHUNK_SIZE];
      for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) {
        if ((resultIndex) == objs.length) {
          incompleteArray = true;
          break;
        }
        if (logger.isTraceEnabled()) {
          logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName(),
              resultIndex, objs[resultIndex]);
        }
        if (cqQuery != null) {
          CqEntry e = (CqEntry) objs[resultIndex];
          // The value may have become null because of entry invalidation.
          if (e.getValue() == null) {
            resultIndex++;
            // i will get incremented anyway so we need to decrement it back so
            // that results[i] is not null.
            i--;
            continue;
          }
          // Add the key into CQ results cache.
          // For PR the Result caching is not yet supported.
          // cqQuery.cqResultsCacheInitialized is added to take care
          // of CQ execute requests that are re-sent. In that case no
          // need to update the Results cache.
          if (!cqQuery.isPR()) {
            cqQuery.addToCqResultKeys(e.getKey());
          }

          // Add to the Results object array.
          if (sendCqResultsWithKey) {
            results[i] = e.getKeyValuePair();
          } else {
            results[i] = e.getValue();
          }
        } else {
          // instance check added to fix bug 40516.
          if (isStructs && (objs[resultIndex] instanceof Struct)) {
            results[i] = ((Struct) objs[resultIndex]).getFieldValues();
          } else {
            results[i] = objs[resultIndex];
          }
        }
        resultIndex++;
        cqResultIndex++;
      }
      // Shrink array if necessary. This will occur if the number
      // of entries in the chunk does not divide evenly into the
      // number of entries in the result set.
      if (incompleteArray) {
        Object[] newResults;
        if (cqQuery != null) {
          newResults = new Object[cqResultIndex % MAXIMUM_CHUNK_SIZE];
        } else {
          newResults = new Object[resultIndex % MAXIMUM_CHUNK_SIZE];
        }
        for (int i = 0; i < newResults.length; i++) {
          newResults[i] = results[i];
        }
        results = newResults;
      }

      if (sendResults) {
        writeQueryResponseChunk(results, collectionType, (resultIndex == objs.length),
            servConn);

        if (logger.isDebugEnabled()) {
          logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}",
              servConn.getName(), (j + 1), numberOfChunks, queryString);
        }
      }
      // If we have reached the last element of SelectResults then we should
      // break out of loop here only.
      if (resultIndex == objs.length) {
        break;
      }
    }
  }

  private void sendResultsAsObjectPartList(int numberOfChunks, ServerConnection servConn, List objs,
      boolean isStructs, CollectionType collectionType, String queryString, ServerCQ cqQuery,
      boolean sendCqResultsWithKey, boolean sendResults, final SecurityService securityService)
      throws IOException {
    int resultIndex = 0;
    Object result = null;
    for (int j = 0; j < numberOfChunks; j++) {
      if (logger.isTraceEnabled()) {
        logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
      }
      ObjectPartList serializedObjs = new ObjectPartList(MAXIMUM_CHUNK_SIZE, false);
      for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) {
        if ((resultIndex) == objs.size()) {
          break;
        }
        if (logger.isTraceEnabled()) {
          logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName(),
              resultIndex, objs.get(resultIndex));
        }
        if (cqQuery != null) {
          CqEntry e = (CqEntry) objs.get(resultIndex);
          // The value may have become null because of entry invalidation.
          if (e.getValue() == null) {
            resultIndex++;
            continue;
          }

          // Add the key into CQ results cache.
          // For PR the Result caching is not yet supported.
          // cqQuery.cqResultsCacheInitialized is added to take care
          // of CQ execute requests that are re-sent. In that case no
          // need to update the Results cache.
          if (!cqQuery.isPR()) {
            cqQuery.addToCqResultKeys(e.getKey());
          }

          // Add to the Results object array.
          if (sendCqResultsWithKey) {
            result = e.getKeyValuePair();
          } else {
            result = e.getValue();
          }
        } else {
          result = objs.get(resultIndex);
        }
        if (sendResults) {
          addToObjectPartList(serializedObjs, result, collectionType, false, servConn, isStructs,
              securityService);
        }
        resultIndex++;
      }

      if (sendResults) {
        writeQueryResponseChunk(serializedObjs, collectionType, ((j + 1) == numberOfChunks),
            servConn);

        if (logger.isDebugEnabled()) {
          logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}",
              servConn.getName(), (j + 1), numberOfChunks, queryString);
        }
      }
    }
  }

  private void addToObjectPartList(ObjectPartList serializedObjs, Object res,
      CollectionType collectionType, boolean lastChunk, ServerConnection servConn,
      boolean isStructs, final SecurityService securityService) throws IOException {
    if (isStructs && (res instanceof Struct)) {
      Object[] values = ((Struct) res).getFieldValues();
      // create another ObjectPartList for the struct
      ObjectPartList serializedValueObjs = new ObjectPartList(values.length, false);
      for (Object value : values) {
        addObjectToPartList(serializedValueObjs, null, value, securityService);
      }
      serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT, null);
    } else if (res instanceof Object[]) {// for CQ key-value pairs
      Object[] values = ((Object[]) res);
      // create another ObjectPartList for the Object[]
      ObjectPartList serializedValueObjs = new ObjectPartList(values.length, false);
      for (int i = 0; i < values.length; i += 2) {
        Object key = values[i];
        Object value = values[i + 1];
        addObjectToPartList(serializedValueObjs, key, value, securityService);
      }
      serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT, null);
    } else { // for deserialized objects
      addObjectToPartList(serializedObjs, null, res, securityService);
    }
  }

  private void addObjectToPartList(ObjectPartList objPartList, Object key, Object value,
      final SecurityService securityService) {
    Object object = value;
    boolean isObject = true;
    if (value instanceof CachedDeserializable) {
      object = ((CachedDeserializable) value).getSerializedValue();
    } else if (value instanceof byte[]) {
      isObject = false;
    }

    object = securityService.postProcess(null, key, object, isObject);
    if (key != null) {
      objPartList.addPart(null, key, ObjectPartList.OBJECT, null);
    }
    objPartList.addPart(null, object, isObject ? ObjectPartList.OBJECT : ObjectPartList.BYTES,
        null);
  }

}
