/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexExistsException;
import org.apache.geode.cache.query.IndexInvalidException;
import org.apache.geode.cache.query.IndexNameConflictException;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.MultiIndexCreationException;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * A wrapper class over an actual QueryService instance. This is used when the
 * multiuser-authentication attribute is set to true.
 *
 * @see ProxyCache
 * @since GemFire 6.5
 */
public class ProxyQueryService implements QueryService {
  private static final Logger logger = LogService.getLogger();

  ProxyCache proxyCache;
  QueryService realQueryService;
  List<String> cqNames = new ArrayList<String>();

  public ProxyQueryService(ProxyCache proxyCache, QueryService realQueryService) {
    this.proxyCache = proxyCache;
    this.realQueryService = realQueryService;
  }

  @Override
  public void closeCqs() {
    closeCqs(false);
  }

  public void closeCqs(boolean keepAlive) {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getCqService().closeAllCqs(!keepAlive,
          Arrays.asList(this.getCqs()), keepAlive);
      this.cqNames.clear();
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to closeAll Cqs. Error: {}", cqe.getMessage(), cqe);
      }
    }
  }

  @Override
  public Index createHashIndex(String indexName, String indexedExpression, String fromClause)
      throws IndexInvalidException, IndexNameConflictException, IndexExistsException,
      RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createHashIndex(String indexName, String indexedExpression, String fromClause,
      String imports) throws IndexInvalidException, IndexNameConflictException,
      IndexExistsException, RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createIndex(String indexName, IndexType indexType, String indexedExpression,
      String fromClause) throws IndexInvalidException, IndexNameConflictException,
      IndexExistsException, RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createIndex(String indexName, IndexType indexType, String indexedExpression,
      String fromClause, String imports) throws IndexInvalidException, IndexNameConflictException,
      IndexExistsException, RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createIndex(String indexName, String indexedExpression, String fromClause)
      throws IndexInvalidException, IndexNameConflictException, IndexExistsException,
      RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createIndex(String indexName, String indexedExpression, String fromClause,
      String imports) throws IndexInvalidException, IndexNameConflictException,
      IndexExistsException, RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public Index createKeyIndex(String indexName, String indexedExpression, String fromClause)
      throws IndexInvalidException, IndexNameConflictException, IndexExistsException,
      RegionNotFoundException, UnsupportedOperationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }


  @Override
  public void defineKeyIndex(String indexName, String indexedExpression, String fromClause)
      throws RegionNotFoundException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public void defineHashIndex(String indexName, String indexedExpression, String regionPath)
      throws RegionNotFoundException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public void defineHashIndex(String indexName, String indexedExpression, String regionPath,
      String imports) throws RegionNotFoundException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public void defineIndex(String indexName, String indexedExpression, String regionPath)
      throws RegionNotFoundException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public void defineIndex(String indexName, String indexedExpression, String regionPath,
      String imports) throws RegionNotFoundException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public List<Index> createDefinedIndexes() throws MultiIndexCreationException {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public boolean clearDefinedIndexes() {
    throw new UnsupportedOperationException(
        "Index creation on the server is not supported from the client.");
  }

  @Override
  public void executeCqs() throws CqException {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getCqService()
          .executeCqs(Arrays.asList(this.getCqs()));
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to execute cqs. Error: {}", cqe.getMessage(), cqe);
      }
    }
  }

  @Override
  public void executeCqs(String regionName) throws CqException {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getCqService()
          .executeCqs(Arrays.asList(this.getCqs(regionName)));
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to execute cqs on the specified region. Error: {}", cqe.getMessage(),
            cqe);
      }
    }
  }

  @Override
  public CqQuery getCq(String cqName) {
    preOp();
    if (this.cqNames.contains(cqName)) {
      return this.realQueryService.getCq(cqName);
    } else {
      return null;
    }
  }

  @Override
  public CqServiceStatistics getCqStatistics() {
    throw new UnsupportedOperationException();
  }

  @Override
  public ClientCQ[] getCqs() {
    preOp();
    ClientCQ[] cqs = null;
    try {
      ArrayList<InternalCqQuery> cqList = new ArrayList<InternalCqQuery>();
      for (String name : this.cqNames) {
        cqList.add(((DefaultQueryService) realQueryService).getCqService().getCq(name));
      }
      cqs = new ClientCQ[cqList.size()];
      cqList.toArray(cqs);
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to get Cqs. Error: {}", cqe.getMessage(), cqe);
      }
    }
    return cqs;
  }

  @Override
  public ClientCQ[] getCqs(String regionName) throws CqException {
    preOp();
    Collection<? extends InternalCqQuery> cqs = null;
    try {
      ArrayList<CqQuery> cqList = new ArrayList<CqQuery>();
      cqs = ((DefaultQueryService) realQueryService).getCqService().getAllCqs(regionName);
      for (InternalCqQuery cq : cqs) {
        if (this.cqNames.contains(cq.getName())) {
          cqList.add((CqQuery) cq);
        }
      }
      ClientCQ[] results = new ClientCQ[cqList.size()];
      cqList.toArray(results);
      return results;
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to get Cqs. Error: {}", cqe.getMessage(), cqe);
      }
      throw cqe;
    }
  }

  @Override
  public Index getIndex(Region<?, ?> region, String indexName) {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public Collection<Index> getIndexes() {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public Collection<Index> getIndexes(Region<?, ?> region) {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public Collection<Index> getIndexes(Region<?, ?> region, IndexType indexType) {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public CqQuery newCq(String queryString, CqAttributes cqAttributes)
      throws QueryInvalidException, CqException {
    preOp(true);
    ClientCQ cq = null;
    try {
      cq = ((DefaultQueryService) realQueryService).getCqService().newCq(null, queryString,
          cqAttributes, ((DefaultQueryService) realQueryService).getPool(), false);
      cq.setProxyCache(this.proxyCache);
      this.cqNames.add(cq.getName());
    } catch (CqExistsException cqe) {
      // Should not throw in here.
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to createCq. Error: {}", cqe.getMessage(), cqe);
      }
    } finally {
      postOp();
    }
    return cq;
  }

  @Override
  public CqQuery newCq(String queryString, CqAttributes cqAttributes, boolean isDurable)
      throws QueryInvalidException, CqException {
    preOp(true);
    ClientCQ cq = null;
    try {
      cq = ((DefaultQueryService) realQueryService).getCqService().newCq(null, queryString,
          cqAttributes, ((DefaultQueryService) realQueryService).getPool(), isDurable);
      cq.setProxyCache(this.proxyCache);
      this.cqNames.add(cq.getName());
    } catch (CqExistsException cqe) {
      // Should not throw in here.
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to createCq. Error: {}", cqe.getMessage(), cqe);
      }
    } finally {
      postOp();
    }
    return cq;
  }

  @Override
  public CqQuery newCq(String cqName, String queryString, CqAttributes cqAttributes)
      throws QueryInvalidException, CqExistsException, CqException {
    preOp(true);
    try {
      if (cqName == null) {
        throw new IllegalArgumentException(
            "cqName must not be null");
      }
      ClientCQ cq = ((DefaultQueryService) realQueryService).getCqService().newCq(cqName,
          queryString, cqAttributes, ((DefaultQueryService) realQueryService).getPool(), false);
      cq.setProxyCache(proxyCache);
      this.cqNames.add(cq.getName());
      return cq;
    } finally {
      postOp();
    }
  }

  @Override
  public CqQuery newCq(String cqName, String queryString, CqAttributes cqAttributes,
      boolean isDurable) throws QueryInvalidException, CqExistsException, CqException {
    preOp(true);
    try {
      if (cqName == null) {
        throw new IllegalArgumentException(
            "cqName must not be null");
      }
      ClientCQ cq = ((DefaultQueryService) realQueryService).getCqService().newCq(cqName,
          queryString, cqAttributes, ((DefaultQueryService) realQueryService).getPool(), isDurable);
      cq.setProxyCache(proxyCache);
      this.cqNames.add(cq.getName());
      return cq;
    } finally {
      postOp();
    }
  }

  @Override
  public Query newQuery(String queryString) {
    preOp();
    return ((DefaultQueryService) realQueryService).newQuery(queryString, this.proxyCache);
  }

  @Override
  public void removeIndex(Index index) {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public void removeIndexes() {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public void removeIndexes(Region<?, ?> region) {
    throw new UnsupportedOperationException(
        "Index operation on the server is not supported from the client.");
  }

  @Override
  public void stopCqs() throws CqException {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getCqService().stopCqs(Arrays.asList(this.getCqs()));
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to stop all CQs. Error: {}", cqe.getMessage(), cqe);
      }
    }
  }

  @Override
  public void stopCqs(String regionName) throws CqException {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getCqService()
          .stopCqs(Arrays.asList(this.getCqs(regionName)));
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to stop cqs on the specified region. Error: {}", cqe.getMessage(),
            cqe);
      }
    }
  }

  @Override
  public List<String> getAllDurableCqsFromServer() throws CqException {
    preOp();
    try {
      ((DefaultQueryService) realQueryService).getAllDurableCqsFromServer();
    } catch (CqException cqe) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to get all durablec client cqs on the specified region. Error: {}",
            cqe.getMessage(), cqe);
      }
    }
    return Collections.EMPTY_LIST;
  }

  private void preOp() {
    preOp(false);
  }

  private void preOp(boolean setTL) {
    if (this.proxyCache.isClosed()) {
      throw proxyCache.getCacheClosedException("Cache is closed for this user.");
    }
    if (setTL) {
      UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
    }
  }

  private void postOp() {
    UserAttributes.userAttributes.set(null);
  }

}
