/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.cq.internal;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqState;
import org.apache.geode.cache.query.CqStatistics;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.internal.CompiledIteratorDef;
import org.apache.geode.cache.query.internal.CompiledRegion;
import org.apache.geode.cache.query.internal.CompiledSelect;
import org.apache.geode.cache.query.internal.CompiledValue;
import org.apache.geode.cache.query.internal.CqQueryVsdStats;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.QueryExecutionContext;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogService;

/**
 * Represents the CqQuery object. Implements CqQuery API and CqAttributeMutator.
 *
 * @since GemFire 5.5
 */
@SuppressWarnings("deprecation")
public abstract class CqQueryImpl implements InternalCqQuery {
  private static final Logger logger = LogService.getLogger();

  protected String cqName;

  protected String queryString;

  static final Object TOKEN = new Object();

  LocalRegion cqBaseRegion;

  protected Query query = null;

  InternalLogWriter securityLogWriter;

  protected CqServiceImpl cqService;

  protected String regionName;

  protected boolean isDurable = false;

  /** Stats counters */
  private CqStatisticsImpl cqStats;

  protected CqQueryVsdStats stats;

  protected final CqStateImpl cqState = new CqStateImpl();

  private ExecutionContext queryExecutionContext = null;

  public static TestHook testHook = null;

  /**
   * Constructor.
   */
  public CqQueryImpl() {}

  public CqQueryImpl(CqServiceImpl cqService, String cqName, String queryString,
      boolean isDurable) {
    this.cqName = cqName;
    this.queryString = queryString;
    this.securityLogWriter = (InternalLogWriter) cqService.getCache().getSecurityLoggerI18n();
    this.cqService = cqService;
    this.isDurable = isDurable;
  }

  /**
   * returns CQ name
   */
  @Override
  public String getName() {
    return this.cqName;
  }

  @Override
  public void setName(String cqName) {
    this.cqName = cqName;
  }

  @Override
  public void setCqService(CqService cqService) {
    this.cqService = (CqServiceImpl) cqService;
  }

  /**
   * get the region name in CQ query
   */
  @Override
  public String getRegionName() {
    return this.regionName;
  }

  void updateCqCreateStats() {
    // Initialize the VSD statistics
    StatisticsFactory factory = cqService.getCache().getDistributedSystem();
    this.stats = new CqQueryVsdStats(factory, getServerCqName());
    this.cqStats = new CqStatisticsImpl(this);

    // Update statistics with CQ creation.
    this.cqService.stats().incCqsStopped();
    this.cqService.stats().incCqsCreated();
    this.cqService.stats().incCqsOnClient();
  }

  /**
   * Validates the CQ. Checks for cq constraints. Also sets the base region name.
   */
  void validateCq() {
    InternalCache cache = cqService.getInternalCache();
    DefaultQuery locQuery = (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString);
    this.query = locQuery;
    // assert locQuery != null;

    // validate Query.
    Object[] parameters = new Object[0]; // parameters are not permitted

    // check that it is only a SELECT statement (possibly with IMPORTs)
    CompiledSelect select = locQuery.getSimpleSelect();
    if (select == null) {
      throw new UnsupportedOperationException(
          "CQ queries must be a select statement only");
    }

    // must not be a DISTINCT select
    if (select.isDistinct()) {
      throw new UnsupportedOperationException(
          "select DISTINCT queries not supported in CQ");
    }

    // get the regions referenced in this query
    Set regionsInQuery = locQuery.getRegionsInQuery(parameters);
    // check for more than one region is referenced in the query
    // (though it could still be one region referenced multiple times)
    if (regionsInQuery.size() > 1 || regionsInQuery.size() < 1) {
      throw new UnsupportedOperationException(
          "CQ queries must reference one and only one region");
    }
    this.regionName = (String) regionsInQuery.iterator().next();

    // make sure the where clause references no regions
    Set regions = new HashSet();
    CompiledValue whereClause = select.getWhereClause();
    if (whereClause != null) {
      whereClause.getRegionsInQuery(regions, parameters);
      if (!regions.isEmpty()) {
        throw new UnsupportedOperationException(
            "The WHERE clause in CQ queries cannot refer to a region");
      }
    }
    List fromClause = select.getIterators();
    // cannot have more than one iterator in FROM clause
    if (fromClause.size() > 1) {
      throw new UnsupportedOperationException(
          "CQ queries cannot have more than one iterator in the FROM clause");
    }

    // the first iterator in the FROM clause must be just a CompiledRegion
    CompiledIteratorDef itrDef = (CompiledIteratorDef) fromClause.get(0);
    // By process of elimination, we know that the first iterator contains a reference
    // to the region. Check to make sure it is only a CompiledRegion
    if (!(itrDef.getCollectionExpr() instanceof CompiledRegion)) {
      throw new UnsupportedOperationException(
          "CQ queries must have a region path only as the first iterator in the FROM clause");
    }

    // must not have any projections
    List projs = select.getProjectionAttributes();
    if (projs != null) {
      throw new UnsupportedOperationException(
          "CQ queries do not support projections");
    }

    // check the orderByAttrs, not supported
    List orderBys = select.getOrderByAttrs();
    if (orderBys != null) {
      throw new UnsupportedOperationException(
          "CQ queries do not support ORDER BY");
    }

    // Set Query ExecutionContext, that will be used in later execution.
    this.setQueryExecutionContext(
        new QueryExecutionContext(null, (InternalCache) this.cqService.getCache()));
  }

  /**
   * Removes the CQ from CQ repository.
   */
  void removeFromCqMap() throws CqException {
    try {
      cqService.removeCq(this.getServerCqName());
    } catch (Exception ex) {
      String errMsg =
          "Failed to remove Continuous Query From the repository. CqName: %s Error : %s";
      Object[] errMsgArgs = new Object[] {cqName, ex.getLocalizedMessage()};
      String msg = String.format(errMsg, errMsgArgs);
      logger.error(msg);
      throw new CqException(msg, ex);
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Removed CQ from the CQ repository. CQ Name: {}", this.cqName);
    }
  }

  /**
   * Returns the QueryString of this CQ.
   */
  @Override
  public String getQueryString() {
    return queryString;
  }

  /**
   * Return the query after replacing region names with parameters
   *
   * @return the Query for the query string
   */
  @Override
  public Query getQuery() {
    return query;
  }

  @Override
  public CqStatistics getStatistics() {
    return cqStats;
  }

  @Override
  public LocalRegion getCqBaseRegion() {
    return this.cqBaseRegion;
  }

  protected abstract void cleanup() throws CqException;

  /**
   * @return Returns the Region name on which this cq is created.
   */
  String getBaseRegionName() {

    return this.regionName;
  }

  @Override
  public abstract String getServerCqName();

  /**
   * Return the state of this query. Should not modify this state without first locking it.
   *
   * @return STOPPED RUNNING or CLOSED
   */
  @Override
  public CqState getState() {
    return this.cqState;
  }

  @Override
  public void setCqState(int state) {
    if (this.isClosed()) {
      throw new CqClosedException(
          String.format("CQ is closed, CqName : %s", this.cqName));
    }

    synchronized (cqState) {
      if (state == CqStateImpl.RUNNING) {
        this.cqState.setState(CqStateImpl.RUNNING);
        this.cqService.stats().incCqsActive();
        this.cqService.stats().decCqsStopped();
      } else if (state == CqStateImpl.STOPPED) {
        this.cqState.setState(CqStateImpl.STOPPED);
        this.cqService.stats().incCqsStopped();
        this.cqService.stats().decCqsActive();
      } else if (state == CqStateImpl.CLOSING) {
        this.cqState.setState(state);
      }
    }
  }

  /**
   * Update CQ stats
   *
   * @param cqEvent object
   */
  void updateStats(CqEvent cqEvent) {
    this.stats.updateStats(cqEvent); // Stats for VSD
  }

  /**
   * Return true if the CQ is in running state
   *
   * @return true if running, false otherwise
   */
  @Override
  public boolean isRunning() {
    return this.cqState.isRunning();
  }

  /**
   * Return true if the CQ is in stopped state
   *
   * @return true if stopped, false otherwise
   */
  @Override
  public boolean isStopped() {
    return this.cqState.isStopped();
  }

  /**
   * Return true if the CQ is closed
   *
   * @return true if closed, false otherwise
   */
  @Override
  public boolean isClosed() {
    return this.cqState.isClosed();
  }

  /**
   * Return true if the CQ is in closing state.
   *
   * @return true if close in progress, false otherwise
   */
  public boolean isClosing() {
    return this.cqState.isClosing();
  }

  /**
   * Return true if the CQ is durable
   *
   * @return true if durable, false otherwise
   */
  @Override
  public boolean isDurable() {
    return this.isDurable;
  }

  /**
   * Returns a reference to VSD stats of the CQ
   *
   * @return VSD stats of the CQ
   */
  @Override
  public CqQueryVsdStats getVsdStats() {
    return stats;
  }

  ExecutionContext getQueryExecutionContext() {
    return queryExecutionContext;
  }

  private void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
    this.queryExecutionContext = queryExecutionContext;
  }

  /** Test Hook */
  public interface TestHook {
    void pauseUntilReady();

    void ready();

    int numQueuedEvents();

    void setEventCount(int count);
  }
}
