/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.internal.index;

import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.Logger;

import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.query.AmbiguousNameException;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexExistsException;
import org.apache.geode.cache.query.IndexInvalidException;
import org.apache.geode.cache.query.IndexMaintenanceException;
import org.apache.geode.cache.query.IndexNameConflictException;
import org.apache.geode.cache.query.IndexStatistics;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.MultiIndexCreationException;
import org.apache.geode.cache.query.NameResolutionException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.TypeMismatchException;
import org.apache.geode.cache.query.internal.CompiledPath;
import org.apache.geode.cache.query.internal.CompiledValue;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.MapIndexable;
import org.apache.geode.cache.query.internal.NullToken;
import org.apache.geode.cache.query.internal.QueryMonitor;
import org.apache.geode.cache.query.internal.QueryObserver;
import org.apache.geode.cache.query.internal.QueryObserverHolder;
import org.apache.geode.cache.query.internal.index.AbstractIndex.InternalIndexStatistics;
import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.HasCachePerfStats;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;

public class IndexManager {
  private static final Logger logger = LogService.getLogger();

  // Action codes. NOTE(review): the consumers of these values are outside this section of the
  // file; presumably they are passed to the index update path - confirm against callers.
  public static final int ADD_ENTRY = 1;
  public static final int UPDATE_ENTRY = 2;
  public static final int REMOVE_ENTRY = 3;
  // This action is used to rerun index creation after clear is called on the region.
  public static final int RECREATE_INDEX = 4;
  private final InternalCache cache;
  protected final Region region;

  // The three flags below are read once from the region attributes in the constructor.
  private final boolean isOverFlowToDisk;
  private final boolean offHeap;
  private final boolean indexMaintenanceSynchronous;
  private int numCreators = 0;
  private int numUpdatersInProgress = 0;
  private int numUpdatersInWaiting = 0;
  private int iternameCounter = 0;
  /*
   * Map containing <IndexTask, FutureTask<Index> or Index>. An IndexTask key represents an index
   * that is either completely created (the value is the Index) or still being created (the value
   * is the FutureTask running the creation). This is done in order to avoid synchronization on
   * the indexes.
   */
  private final ConcurrentMap indexes = new ConcurrentHashMap();
  // TODO: Fix the appropriate size of the Map & the concurrency level
  private ConcurrentMap canonicalizedIteratorNameMap = new ConcurrentHashMap();
  // Only created (and started) by the constructor when index maintenance is asynchronous;
  // otherwise it stays null.
  private IndexUpdaterThread updater;

  // Threshold for the updater queue; handed to the IndexUpdaterThread constructor. Read from the
  // system property <GEMFIRE_PREFIX>AsynchIndexMaintenanceThreshold, defaulting to -1.
  private final int INDEX_MAINTENANCE_BUFFER =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "AsynchIndexMaintenanceThreshold", -1);

  // True unless the system property <GEMFIRE_PREFIX>index.DisableJoinOptimization is "true".
  public static final boolean JOIN_OPTIMIZATION =
      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "index.DisableJoinOptimization");

  @MutableForTesting
  public static boolean INPLACE_OBJECT_MODIFICATION_FOR_TEST = false;

  @MutableForTesting
  public static boolean IS_TEST_LDM = false;

  @MutableForTesting
  public static boolean IS_TEST_EXPANSION = false;



  /**
   * System property to maintain the ReverseMap, which takes care of in-place modification of the
   * objects by the application. In case of in-place modification the EntryEvent will not have the
   * old value; without this the old values are not removed from the index maps, resulting in
   * inconsistent results. Defaults to {@code false}.
   */
  public static final boolean INPLACE_OBJECT_MODIFICATION = Boolean.valueOf(System.getProperty(
      DistributionConfig.GEMFIRE_PREFIX + "index.INPLACE_OBJECT_MODIFICATION", "false"));

  /**
   * System property to turn off the compact-index support. Defaults to {@code false}.
   */
  public static final boolean RANGEINDEX_ONLY = Boolean.valueOf(
      System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "index.RANGEINDEX_ONLY", "false"));

  // Test-only counterpart of RANGEINDEX_ONLY; either flag disables compact index creation.
  @MutableForTesting
  public static boolean TEST_RANGEINDEX_ONLY = false;
  // Names of the system properties backing the two INDEX_ELEMARRAY_* values below.
  public static final String INDEX_ELEMARRAY_THRESHOLD_PROP = "index_elemarray_threshold";
  public static final String INDEX_ELEMARRAY_SIZE_PROP = "index_elemarray_size";
  // Defaults to 100 when the property is not set.
  public static final int INDEX_ELEMARRAY_THRESHOLD =
      Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_THRESHOLD_PROP, "100"));
  // Defaults to 5 when the property is not set.
  public static final int INDEX_ELEMARRAY_SIZE =
      Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_SIZE_PROP, "5"));
  // Written by setIndexBufferTime/resetIndexBufferTime and read by needsRecalculation.
  @MakeNotStatic
  public static final AtomicLong SAFE_QUERY_TIME = new AtomicLong(0);
  // When false, needsRecalculation always returns false.
  @MutableForTesting
  public static boolean ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION = true;
  /** The NULL constant */
  public static final Object NULL = new NullToken();

  // When non-null, invoked from createIndex just before the index task is built.
  @MutableForTesting
  public static TestHook testHook;

  // private int numCreatorsInWaiting = 0;
  // @todo ericz
  // there should be a read/write lock PER INDEX in order to maximize
  // the concurrency of query execution.
  public IndexManager(InternalCache cache, Region region) {
    this.cache = cache;
    this.region = region;
    // must be a SortedMap to ensure the indexes are iterated over in fixed
    // order
    // to avoid deadlocks when acquiring locks
    // indexes = Collections.synchronizedSortedMap(new TreeMap());
    indexMaintenanceSynchronous = region.getAttributes().getIndexMaintenanceSynchronous();
    isOverFlowToDisk =
        region.getAttributes().getEvictionAttributes().getAction().isOverflowToDisk();
    this.offHeap = region.getAttributes().getOffHeap();
    if (!indexMaintenanceSynchronous) {
      updater = new IndexUpdaterThread(this.INDEX_MAINTENANCE_BUFFER,
          "OqlIndexUpdater:" + region.getFullPath());
      updater.start();
    }
  }

  /**
   * Stores the largest combination of current time + delta seen so far, where delta is
   * {@code currentCacheTime - lastModifiedTime}. If there is a large delta/hiccup in timings, this
   * allows us to calculate the correct results for a query but reevaluate more aggressively. The
   * large hiccup will eventually be rolled off as time is always increasing. This is a fix for
   * #47475.
   *
   * <p>
   * The stored value only ever grows: the update is retried until either this call's value has
   * been published or a value at least as large is already in place, so a concurrent writer with
   * a smaller value cannot cause a larger one to be dropped.
   *
   * @param lastModifiedTime the last modified time from version tag
   * @param currentCacheTime the current cache time at the moment of the update
   */
  public static void setIndexBufferTime(long lastModifiedTime, long currentCacheTime) {
    final long newValue = currentCacheTime + currentCacheTime - lastModifiedTime;
    long oldValue = SAFE_QUERY_TIME.get();
    // A failed compareAndSet means another thread changed the value since we read it; re-read
    // and try again while our value is still the larger one.
    while (oldValue < newValue && !SAFE_QUERY_TIME.compareAndSet(oldValue, newValue)) {
      oldValue = SAFE_QUERY_TIME.get();
    }
  }

  /**
   * Resets the safe query time back to zero. For test purposes only: this should not be called
   * from any product code. Calls from product code will possibly cause continuous reevaluation
   * (performance issue) OR incorrect query results (functional issue).
   */
  public static void resetIndexBufferTime() {
    SAFE_QUERY_TIME.set(0L);
  }

  /**
   * Decides whether the key for a region entry has to be reevaluated during query execution.
   *
   * <p>
   * The safe query time keeps track of "current time + delta" in a single long value. Subtracting
   * the query start time from it yields the delta plus some false-positive slack (dts). If
   * {@code lastModifiedTime + dts} is not before the query start time, the entry is assumed to
   * need reevaluation.
   *
   * <p>
   * This is to fix bug 47475, where references to region entries can be held by the executing
   * query either directly or indirectly (iterators can hold references for next) but the values
   * underneath could change. Small amounts of false positives are ok as they only have a slight
   * impact on performance.
   *
   * @param queryStartTime the time at which the query started executing
   * @param lastModifiedTime the last modified time of the region entry
   * @return {@code true} if the entry should be reevaluated; always {@code false} when
   *         {@link #ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION} is off
   */
  public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) {
    final long slack = SAFE_QUERY_TIME.get() - queryStartTime;
    final boolean modifiedWithinWindow = queryStartTime <= (lastModifiedTime + slack);
    return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION && modifiedWithinWindow;
  }

  /**
   * Test hook. When the static {@code testHook} field is set, {@code createIndex} calls
   * {@link #hook(int)} once before building the index task, passing {@code 1} if the region is
   * already initialized and {@code 0} otherwise.
   */
  public interface TestHook {
    void hook(final int spot) throws RuntimeException;
  }

  /**
   * Returns the Region this IndexManager was created for.
   *
   * @return the Region for this IndexManager
   */
  public Region getRegion() {
    return this.region;
  }

  /**
   * Used by tests to access the updater thread to determine its progress.
   *
   * @return the updater thread, or {@code null} if none was created (the constructor only creates
   *         one when index maintenance is asynchronous)
   */
  public IndexUpdaterThread getUpdaterThread() {
    return updater;
  }

  // @todo need more specific list of exceptions
  /**
   * Create an index that can be used when executing queries.
   *
   * <p>
   * The index is registered in the {@code indexes} map under an {@code IndexTask} key. The thread
   * that wins the {@code putIfAbsent} runs the creation task itself; any other thread asking for
   * the same task waits for that creation to finish and then fails with a conflict/exists
   * exception. For the duration of the call the PDX read-serialized override is forced on and, on
   * non-client caches, the current transaction is paused; both are restored before returning.
   *
   * @param indexName the name of this index, used for statistics collection
   * @param indexType the type of index; a HASH request on an overflow region is silently turned
   *        into FUNCTIONAL
   * @param origIndexedExpression the expression to index on, a function dependent on region entries
   *        individually.
   * @param origFromClause expression that evaluates to the collection(s) that will be queried over,
   *        must contain one and only one region path.
   * @param imports passed through to the functional index creation helper
   * @param externalContext passed through to the index creation helper
   * @param prIndex passed through to the index task
   * @param loadEntries passed through to the index task; presumably controls whether existing
   *        region entries are loaded into the new index - TODO confirm against IndexTask
   * @return the newly created Index
   * @throws IndexNameConflictException if an index with this name already exists
   * @throws IndexExistsException if an equivalent index already exists under a different name
   * @throws IndexInvalidException if memory is low or the creation task fails
   * @throws UnsupportedOperationException if the requested index cannot be created as a compact
   *         index and the index type/region configuration does not support the alternative
   */
  public Index createIndex(String indexName, IndexType indexType, String origIndexedExpression,
      String origFromClause, String imports, ExecutionContext externalContext,
      PartitionedIndex prIndex, boolean loadEntries)
      throws IndexNameConflictException, IndexExistsException, IndexInvalidException {

    if (QueryMonitor.isLowMemory()) {
      throw new IndexInvalidException(
          "Index creation canceled due to low memory");
    }

    // Remember the current override so it can be restored in the outer finally block.
    boolean oldReadSerialized = this.cache.getPdxReadSerializedOverride();
    this.cache.setPdxReadSerializedOverride(true);

    // The transaction is only paused on non-client caches; tx stays null on a client.
    // NOTE(review): the override set above is not rolled back if pauseTransaction throws,
    // because the try block has not been entered yet.
    TXStateProxy tx = null;
    if (!((InternalCache) this.cache).isClient()) {
      tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction();
    }

    try {
      String projectionAttributes = "*"; // for now this is the only option

      // Fast-path name check; the putIfAbsent below is what actually guards against races.
      if (getIndex(indexName) != null) {
        throw new IndexNameConflictException(
            String.format("Index named ' %s ' already exists.",
                indexName));
      }

      IndexCreationHelper helper = null;
      boolean isCompactOrHash = false;
      // Hash index not supported for overflow but we "thought" we were so let's maintain backwards
      // compatibility
      // and create a regular compact range index instead. This is due to having to reload entries
      // from overflow just
      // to recalculate the index key for the entry for comparisons during query.
      if (indexType == IndexType.HASH && isOverFlowRegion()) {
        indexType = IndexType.FUNCTIONAL;
      }
      if (indexType != IndexType.PRIMARY_KEY) {
        helper = new FunctionalIndexCreationHelper(origFromClause, origIndexedExpression,
            projectionAttributes, imports, (InternalCache) region.getCache(), externalContext,
            this);
        // For now support Map index as non compact; expand later.
        // The limitation for compact range index also apply to hash index for now
        isCompactOrHash = shouldCreateCompactIndex((FunctionalIndexCreationHelper) helper);
      } else if (indexType == IndexType.PRIMARY_KEY) {
        helper = new PrimaryKeyIndexCreationHelper(origFromClause, origIndexedExpression,
            projectionAttributes, (InternalCache) region.getCache(), externalContext, this);
      } else {
        // NOTE(review): unreachable - the two conditions above are exact complements.
        throw new AssertionError("Don't know how to set helper for " + indexType);
      }
      // A non-primary-key index that cannot be compact is only allowed in some configurations;
      // each check below rejects one unsupported combination.
      if (!isCompactOrHash && indexType != IndexType.PRIMARY_KEY) {

        if (indexType == IndexType.HASH) {
          if (!isIndexMaintenanceTypeSynchronous()) {
            throw new UnsupportedOperationException(
                "Hash index is currently not supported for regions with Asynchronous index maintenance.");
          }
          throw new UnsupportedOperationException(
              "Hash Index is not supported with from clause having multiple iterators(collections).");
        }
        // Overflow is not supported with range index.
        if (isOverFlowRegion()) {
          throw new UnsupportedOperationException(
              String.format(
                  "The specified index conditions are not supported for regions which overflow to disk. The region involved is %s",
                  region.getFullPath()));
        }
        // OffHeap is not supported with range index.
        if (isOffHeap()) {
          if (!isIndexMaintenanceTypeSynchronous()) {
            throw new UnsupportedOperationException(
                String.format(
                    "Asynchronous index maintenance is currently not supported for off-heap regions. The off-heap region is %s",
                    region.getFullPath()));
          }
          throw new UnsupportedOperationException(
              String.format(
                  "From clauses having multiple iterators(collections) are not supported for off-heap regions. The off-heap region is %s",
                  region.getFullPath()));
        }
      }

      if (logger.isDebugEnabled()) {
        logger.debug("Started creating index with indexName: {} On region: {}", indexName,
            region.getFullPath());
      }

      // Test hook: spot 1 when the region is already initialized, spot 0 otherwise.
      if (IndexManager.testHook != null) {
        if (logger.isDebugEnabled()) {
          logger.debug("IndexManager TestHook is set.");
        }

        if (((LocalRegion) this.region).isInitialized()) {
          testHook.hook(1);
        } else {
          testHook.hook(0);
        }
      }

      // Publish the not-yet-run creation task. oldIndex is null when this thread won the race,
      // otherwise it is the Index or the in-flight Future registered by someone else.
      IndexTask indexTask = new IndexTask(cache, indexName, indexType, origFromClause,
          origIndexedExpression, helper, isCompactOrHash, prIndex, loadEntries);
      FutureTask<Index> indexFutureTask = new FutureTask<Index>(indexTask);
      Object oldIndex = this.indexes.putIfAbsent(indexTask, indexFutureTask);

      Index index = null;

      boolean interrupted = false;
      try {
        if (oldIndex == null) {
          // Initialize index.
          indexFutureTask.run();
          // Set the index.
          index = (Index) indexFutureTask.get();
        } else {
          // Index with same name or characteristic already exists.
          // Check if index creation is complete.
          if (!(oldIndex instanceof Index)) {
            // Some other thread is creating the same index.
            // Wait for index to be initialized from other thread.
            ((Future) oldIndex).get();
          }

          // The Index is successfully created, throw appropriate error message
          // from this thread.
          if (getIndex(indexName) != null) {
            throw new IndexNameConflictException(
                String.format("Index named ' %s ' already exists.",
                    indexName));
          } else {
            throw new IndexExistsException(
                "Similar Index Exists");
          }
        }
      } catch (InterruptedException ignored) {
        // Remember the interrupt; the flag is restored on the thread in the finally block.
        interrupted = true;
      } catch (ExecutionException ee) {
        // Unwrap failures raised inside the creation task and map them onto the declared
        // exception types.
        Throwable c = ee.getCause();
        if (c instanceof IndexNameConflictException) {
          throw (IndexNameConflictException) c;
        } else if (c instanceof IndexExistsException) {
          throw (IndexExistsException) c;
        } else if (c instanceof IMQException) {
          // NOTE(review): only the message is propagated here; the original cause is dropped.
          throw new IndexInvalidException(c.getMessage());
        }
        throw new IndexInvalidException(ee);

      } finally {
        // If the index is not successfully created, remove IndexTask from
        // the map.
        if (oldIndex == null && index == null) {
          Object ind = this.indexes.get(indexTask);
          if (ind != null && !(ind instanceof Index)) {
            this.indexes.remove(indexTask);
          }
        }
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
      // NOTE(review): after an InterruptedException index is still null at this point; with
      // assertions disabled the method then returns null.
      assert (index != null);
      if (logger.isDebugEnabled()) {
        logger.debug("Completed creating index with indexName: {} On region: {}", indexName,
            region.getFullPath());
      }
      return index;

    } finally {
      this.cache.setPdxReadSerializedOverride(oldReadSerialized);
      // NOTE(review): tx is null when the cache is a client; confirm unpauseTransaction
      // tolerates a null argument.
      ((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx);

    }
  }

  /**
   * Decides whether the index described by {@code helper} can be built as a compact index.
   *
   * <p>
   * All of the following must hold:
   * <ul>
   * <li>range-index-only mode is not forced (neither by system property nor by the test flag);</li>
   * <li>the region uses synchronous index maintenance - the "fat" implementation is required when
   * asynchronous index updates are on;</li>
   * <li>the indexed expression, after stripping its chain of path receivers, ends in an
   * identifier or a method invocation, or is a bracket expression on a non-map-type index whose
   * modified index expression is a {@code MapIndexable} with exactly one indexing key;</li>
   * <li>the from clause has exactly one iterator;</li>
   * <li>that iterator is directly on the region entry, or the "missing link" is a path whose tail
   * is {@code value} or {@code key}.</li>
   * </ul>
   *
   * @param helper the creation helper holding the compiled index definition
   * @return {@code true} if a compact index should be created
   */
  private boolean shouldCreateCompactIndex(FunctionalIndexCreationHelper helper) {
    if (RANGEINDEX_ONLY || TEST_RANGEINDEX_ONLY) {
      return false;
    }

    // A compact range index keeps a reference to the region entry. With asynchronous updates the
    // window between the cache operation and the index update grows, so a query thread could see
    // the new value before the index has been re-evaluated (resulting in a wrong value). There is
    // still a small window which can be addressed by the sys property:
    // gemfire.index.acquireCompactIndexLocksWithRegionEntryLocks
    if (!getRegion().getAttributes().getIndexMaintenanceSynchronous()) {
      return false;
    }

    // Indexed-expression requirement: walk down the receivers until the node is no longer a path.
    CompiledValue current = helper.getCompiledIndexedExpression();
    int nodeType = current.getType();
    while (nodeType == CompiledValue.PATH) {
      current = ((CompiledPath) current).getReceiver();
      nodeType = current.getType();
    }

    final boolean endsInIdentifierOrMethod =
        nodeType == OQLLexerTokenTypes.Identifier || nodeType == OQLLexerTokenTypes.METHOD_INV;
    if (!endsInIdentifierOrMethod) {
      // The only other accepted shape is a bracket expression indexing a single map key.
      final boolean singleKeyBracketExpression = nodeType == OQLLexerTokenTypes.TOK_LBRACK
          && !helper.isMapTypeIndex()
          && helper.modifiedIndexExpr instanceof MapIndexable
          && ((MapIndexable) helper.modifiedIndexExpr).getIndexingKeys().size() == 1;
      if (!singleKeyBracketExpression) {
        return false;
      }
    }

    // From-clause requirement: exactly one iterator.
    final List iterators = helper.getIterators();
    if (iterators.size() != 1) {
      return false;
    }

    if (helper.isFirstIteratorRegionEntry) {
      return true;
    }

    // Otherwise the "missing link" must be a path ending in "value" or "key".
    final CompiledValue missingLink = helper.missingLink;
    if (!(missingLink instanceof CompiledPath)) {
      return false;
    }
    final String tailId = ((CompiledPath) missingLink).getTailID();
    return tailId.equals("value") || tailId.equals("key");
  }

  /**
   * Looks up a fully created index by name.
   *
   * @param indexName the name of the index
   * @return the index, or {@code null} if there is no entry for that name or the entry is not an
   *         {@code Index} (which means the index is still in its create phase)
   */
  public Index getIndex(String indexName) {
    final Object registered = this.indexes.get(new IndexTask(cache, indexName));
    return registered instanceof Index ? (Index) registered : null;
  }

  /**
   * Registers an already created index under the given name, replacing any existing entry for
   * that name.
   *
   * @param indexName the name to register the index under
   * @param index the index to register
   */
  public void addIndex(String indexName, Index index) {
    final IndexTask key = new IndexTask(cache, indexName);
    indexes.put(key, index);
  }

  /**
   * Get the Index with the specified indexType, fromClause and indexedExpression. Only an exact
   * match is returned: the index must match the indexed expression, have the requested type, and
   * its iterator definitions must match the given definitions with match level zero. Indexes that
   * are still in their create phase are skipped.
   *
   * <p>
   * TODO: Check if synchronization is needed while obtaining the indexes, similar to what is used
   * during index updates.
   *
   * @param indexType the type of index
   * @param definitions the String array containing the required definitions of the fromClause of
   *        the index
   * @param indexedExpression the indexedExpression for the index
   * @param context ExecutionContext
   * @return the first exactly matching index of the region with these parameters, or null if
   *         there isn't one
   */
  public IndexData getIndex(IndexType indexType, String[] definitions,
      CompiledValue indexedExpression, ExecutionContext context)
      throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
    final int iteratorCount = definitions.length;
    final Iterator candidates = this.indexes.values().iterator();

    final StringBuilder canonical = new StringBuilder();
    indexedExpression.generateCanonicalizedExpression(canonical, context);
    final String canonicalExpression = canonical.toString();

    while (candidates.hasNext()) {
      final Object candidate = candidates.next();
      // Anything that is not an Index is a creation task still in progress.
      if (!(candidate instanceof Index)) {
        continue;
      }

      final IndexProtocol protocol = (IndexProtocol) candidate;
      final boolean sameExpressionAndType =
          protocol.isMatchingWithIndexExpression(indexedExpression, canonicalExpression, context)
              && ((Index) candidate).getType() == indexType;
      if (!sameExpressionAndType) {
        continue;
      }

      final int[] mapping = new int[iteratorCount];
      final int matchLevel =
          getMatchLevel(definitions, protocol.getCanonicalizedIteratorDefinitions(), mapping);
      if (matchLevel == 0) {
        return new IndexData(protocol, 0/* Exact Match */, mapping);
      }
    }
    return null;
  }

  /**
   * Compares two index descriptions. When both the index expressions and the index types are
   * equal, the iterator definitions are compared and their match level is returned (zero means
   * exact match) with {@code mapping} filled in accordingly; otherwise {@code -2} is returned and
   * {@code mapping} is left untouched.
   *
   * @param indexType type of the first index
   * @param indexDefinitions canonicalized iterator definitions of the first index
   * @param indexExpression indexed expression of the first index
   * @param otherType type of the second index
   * @param otherDefinitions canonicalized iterator definitions of the second index
   * @param otherExpression indexed expression of the second index
   * @param mapping output array receiving the iterator position mapping
   * @return the match level, or {@code -2} if expression or type differ
   */
  public int compareIndexData(IndexType indexType, String[] indexDefinitions,
      String indexExpression, IndexType otherType, String[] otherDefinitions,
      String otherExpression, int mapping[]) {
    final boolean sameExpressionAndType =
        indexExpression.equals(otherExpression) && indexType == otherType;
    if (!sameExpressionAndType) {
      return -2;
    }
    return getMatchLevel(otherDefinitions, indexDefinitions, mapping);
  }

  /**
   * Returns the best available Index based on the available iterators in the Group.
   *
   * <p>
   * Every fully created, populated index whose indexed expression and type match is considered.
   * For a {@code PartitionedIndex} the comparison is done on one of its bucket indexes. A
   * candidate's read lock (for remove) is acquired before it is compared and released again if
   * the candidate is not kept as the best one. An exact match (match level zero) ends the search
   * immediately.
   *
   * <p>
   * NOTE(review): the read lock taken on the index that is finally returned is not released in
   * this method; presumably the caller is responsible for releasing it - confirm against callers.
   *
   * <p>
   * TODO: Check if synchronization is needed while obtaining the indexes, similar to what is used
   * during index updates.
   *
   * @param indexType Primary or Range Index
   * @param definitions String array containing the canonicalized definitions of the Iterators of
   *        the Group
   * @param indexedExpression Index Expression path(CompiledValue) on which index needs to be
   *        created
   * @param context ExecutionContext object
   * @return IndexData object describing the chosen index, its match level and the iterator
   *         mapping, or {@code null} if no index qualifies
   */
  public IndexData getBestMatchIndex(IndexType indexType, String[] definitions,
      CompiledValue indexedExpression, ExecutionContext context)
      throws AmbiguousNameException, TypeMismatchException, NameResolutionException {

    Index bestIndex = null;
    Index bestPRIndex = null;
    int[] bestMapping = null;

    int qItrSize = definitions.length;
    // Start above any level getMatchLevel can return for this query, so that the first
    // positive-level candidate is accepted.
    int bestIndexMatchLevel = qItrSize;
    Iterator iter = this.indexes.values().iterator();
    StringBuilder sb = new StringBuilder();
    indexedExpression.generateCanonicalizedExpression(sb, context);
    String indexExprStr = sb.toString();
    // NOTE(review): prIndex is declared outside the loop and is not reset at the top of each
    // iteration, so it can carry over from a previous candidate.
    PartitionedIndex prIndex = null;
    Index prevBestPRIndex = null;
    Index prevBestIndex = null;

    Index index;
    while (iter.hasNext()) {
      Object ind = iter.next();
      // Check if the value is instance of FutureTask, this means
      // the index is in create phase.
      if (ind instanceof FutureTask) {
        continue;
      }

      // If the index is still empty
      if (!((AbstractIndex) ind).isPopulated()) {
        continue;
      }

      index = (Index) ind;

      if (index instanceof PartitionedIndex) {
        prIndex = (PartitionedIndex) index;
        // Get one of the bucket index. This index will be
        // available on all the buckets.
        index = prIndex.getBucketIndex();
        if (index == null) {
          continue;
        }
      }

      // Use simple strategy just pick first compatible index
      if (((IndexProtocol) index).isMatchingWithIndexExpression(indexedExpression, indexExprStr,
          context) && index.getType() == indexType) {

        // For PR the matched index needs to be available on all the query buckets.
        if (prIndex != null) {
          try {

            // Protect the PartitionedIndex from being removed when it is being used.
            if (!prIndex.acquireIndexReadLockForRemove()) {
              continue;
            }

            prIndex.verifyAndCreateMissingIndex(context.getBucketList());
          } catch (Exception ignored) {
            // Index is not there on all buckets.
            // ignore this index.
            prIndex.releaseIndexReadLockForRemove();
            prIndex = null;
            continue;
          }
        } else {
          // For index on replicated regions
          if (!((AbstractIndex) index).acquireIndexReadLockForRemove()) {
            continue;
          }
        }

        /*
         * A match level of zero means exact match. A match level greater than 0 means the query
         * from clauses have extra iterators as compared to Index resultset (this does not
         * necessarily mean that Index resultset is not having extra fields; it is just that the
         * criteria for match level is the absence or presence of extra iterators). The order of
         * preference will be 0 , <0 , > 0 for first cut.
         */
        String indexDefinitions[] = ((IndexProtocol) index).getCanonicalizedIteratorDefinitions();

        int mapping[] = new int[qItrSize];
        int matchLevel = getMatchLevel(definitions, indexDefinitions, mapping);

        if (matchLevel == 0) {
          // Exact match: take it, release the lock of the previously chosen index and stop.
          prevBestPRIndex = bestPRIndex;
          bestPRIndex = prIndex;
          prevBestIndex = bestIndex;
          bestIndex = index;
          bestIndexMatchLevel = matchLevel;
          bestMapping = mapping;

          // If we chose new index we should release lock on previous index
          // chosen as bestIndex.
          if (prIndex != null && prevBestPRIndex != null
              && prevBestPRIndex instanceof PartitionedIndex) {
            ((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove();
            prevBestPRIndex = null;
          } else if (prevBestIndex != null) {
            ((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove();
            prevBestIndex = null;
          }
          break;
        } else if ((bestIndexMatchLevel > 0 && matchLevel < bestIndexMatchLevel)
            || (bestIndexMatchLevel < 0 && matchLevel < 0 && matchLevel > bestIndexMatchLevel)) {
          // Better than the current best: either the current best is positive and this level is
          // lower, or both are negative and this one is closer to zero.
          prevBestPRIndex = bestPRIndex;
          bestPRIndex = prIndex;
          prevBestIndex = bestIndex;
          bestIndex = index;
          bestIndexMatchLevel = matchLevel;
          bestMapping = mapping;
        }

        // release the lock if this index is not chosen as bestIndex.
        if (prIndex != null && bestPRIndex != prIndex) {
          prIndex.releaseIndexReadLockForRemove();
          prIndex = null;
        } else if (bestIndex != index) {
          ((AbstractIndex) index).releaseIndexReadLockForRemove();
          index = null;
        }

        // If we chose new index we should release lock on previous index
        // chosen as bestIndex.
        if (prevBestPRIndex != null && prevBestPRIndex instanceof PartitionedIndex) {
          ((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove();
          prevBestPRIndex = null;
        } else if (prevBestIndex != null) {
          ((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove();
          prevBestIndex = null;
        }
      }
    }
    if (bestIndex != null) {
      if (logger.isDebugEnabled()) {
        logger.debug(
            "The best index found for index expression: {} is: {} with Match-level: {} and mapping: {}",
            indexExprStr, bestIndex, bestIndexMatchLevel, Arrays.toString(bestMapping));
      }
    }
    return bestIndex != null
        ? new IndexData((IndexProtocol) bestIndex, bestIndexMatchLevel, bestMapping) : null;
  }

  /**
   * Computes how well the iterators of a query from clause match the iterators of an index from
   * clause.
   *
   * <p>
   * The first iterator of both is taken to be the (already matched) region iterator, so
   * {@code mapping[0]} is always set to 1. Each further query iterator is searched for among the
   * remaining index iterators; when found, its entry in {@code mapping} receives the 1-based
   * position of the matching index iterator (e.g. a value of 3 refers to the index result set
   * field named index_iter3). Entries of query iterators that are not found are left untouched.
   *
   * <p>
   * The returned match level is:
   * <ul>
   * <li>0 - every query iterator was found and the index has no extra iterators (best match);</li>
   * <li>&gt; 0 - the number of query iterators not available in the index;</li>
   * <li>&lt; 0 - every query iterator was found but the index has that many extra iterators.</li>
   * </ul>
   *
   * @param queryDefintions canonicalized iterator definitions of the query from clause
   * @param indexDefinitions canonicalized iterator definitions of the index from clause
   * @param mapping output array, one slot per query iterator
   * @return the match level as described above
   */
  private static int getMatchLevel(String[] queryDefintions, String[] indexDefinitions,
      int[] mapping) {
    final int queryCount = queryDefintions.length;
    final int indexCount = indexDefinitions.length;

    // The index expressions have already matched, which implies the first (region) iterator
    // matches too, so the first query iterator maps to index field position 1.
    mapping[0] = 1;

    int unmatched = queryCount - 1;
    for (int q = 1; q < queryCount; q++) {
      final String wanted = queryDefintions[q];
      int pos = 1;
      while (pos < indexCount && !wanted.equals(indexDefinitions[pos])) {
        pos++;
      }
      if (pos < indexCount) {
        mapping[q] = pos + 1;
        unmatched--;
      }
    }

    if (unmatched == 0 && indexCount > queryCount) {
      return queryCount - indexCount;
    }
    return unmatched;
  }

  /*
   * private static int getMatchLevel(String fromClause, String iFromClause) { if
   * (fromClause.equals(iFromClause)) return 0; if (fromClause.startsWith(iFromClause)) { int cnt =
   * -1; int index = fromClause.indexOf(',', iFromClause.length() + 1); while (index > 0) { cnt--;
   * index = fromClause.indexOf(',', index + 1); } return cnt; } else if
   * (iFromClause.startsWith(fromClause)) { int cnt = 1; int index = iFromClause.indexOf(',',
   * fromClause.length() + 1); while (index > 0) { cnt++; index = iFromClause.indexOf(',', index +
   * 1); } return cnt; } //No compatible return Integer.MAX_VALUE; }
   */
  /**
   * Collects the fully created indexes held by this manager, optionally restricted to one index
   * type. Map values that are still {@link FutureTask}s (indexes whose creation is in progress)
   * are left out.
   *
   * @param indexType the type to filter on, or {@code null} to accept indexes of every type
   * @return a new list containing the matching indexes
   */
  public Collection getIndexes(IndexType indexType) {
    final boolean acceptAnyType = indexType == null;
    final ArrayList<Index> matches = new ArrayList<>();
    for (Object value : this.indexes.values()) {
      if (value instanceof FutureTask) {
        // Creation of this index has not completed yet.
        continue;
      }
      final Index candidate = (Index) value;
      if (acceptAnyType || candidate.getType() == indexType) {
        matches.add(candidate);
      }
    }
    return matches;
  }

  /**
   * Returns every fully created index held by this manager, regardless of type.
   *
   * @return a collection of the created indexes
   */
  public Collection getIndexes() {
    // A null type means "do not filter by type".
    final IndexType anyType = null;
    return getIndexes(anyType);
  }

  /**
   * Removes the given index from this manager and destroys it.
   *
   * @param index the index to remove; it must have been created on this manager's region
   * @throws IllegalArgumentException if the index belongs to a different region
   */
  public void removeIndex(Index index) {
    final boolean ownedByThisManager = index.getRegion() == this.region;
    if (!ownedByThisManager) {
      throw new IllegalArgumentException("Index does not belong to this IndexManager");
    }
    // Only the map entry is dropped here; the lookup key is a name-only IndexTask. A query that
    // already holds a handle to this index can keep it because no data maps are cleared by this
    // method. Per the original notes, destroy() marks the index invalid, so such a query may or
    // may not end up using the index.
    final Object removed = this.indexes.remove(new IndexTask(cache, index.getName()));
    if (removed == null) {
      return;
    }
    ((AbstractIndex) index).destroy();
  }

  // @todo need more specific list of exceptions
  /**
   * Removes every fully created index from this manager. Entries whose value is not an
   * {@link Index} (for example an index still being created) are left in place.
   *
   * @return the number of map entries that held a created index when they were visited
   */
  public int removeIndexes() {
    int removedCount = 0;
    for (Object element : this.indexes.entrySet()) {
      final Map.Entry entry = (Map.Entry) element;
      if (entry.getValue() instanceof Index) {
        removedCount++;
        this.indexes.remove((IndexTask) entry.getKey());
      }
    }
    return removedCount;
  }


  /**
   * Rebuilds the data of this region's indexes. According to the original notes this is invoked
   * while a region clear is in progress, so that indexes do not miss entries put concurrently.
   *
   * <p>
   * The registered {@link QueryObserver} is notified first; any exception it throws is logged at
   * debug level and otherwise ignored. With synchronous index maintenance the indexes are
   * recreated on the calling thread; otherwise a {@code RECREATE_INDEX} task is handed to the
   * updater thread.
   *
   * @throws QueryException declared for callers; not thrown directly by the code in this method
   */
  public void rerunIndexCreationQuery() throws QueryException {
    try {
      QueryObserverHolder.getInstance().beforeRerunningIndexCreationQuery();
    } catch (Exception e) {
      // A failing observer callback must not disturb the normal flow.
      if (logger.isDebugEnabled()) {
        logger.debug(
            "IndexMananger::rerunIndexCreationQuery: Exception in callback beforeRerunningIndexcreationQuery",
            e);
      }
    }
    if (!isIndexMaintenanceTypeSynchronous()) {
      updater.addTask(RECREATE_INDEX, null, IndexProtocol.OTHER_OP);
      return;
    }
    recreateAllIndexesForRegion();
  }

  /**
   * Populates the given indexes by walking every entry of the region once and adding it to each
   * index that is not yet populated. Primary-key indexes are skipped.
   *
   * <p>
   * An index whose update fails with an {@link IMQException} is removed from {@code indexSet} and
   * its failure is remembered; the remaining indexes continue to be populated. After the walk the
   * surviving indexes are flagged as populated, and if any index failed a
   * {@link MultiIndexCreationException} carrying all failures (keyed by index name) is thrown.
   *
   * <p>
   * The PDX read-serialized override is forced to {@code true} for the duration of the walk and
   * restored afterwards.
   *
   * @param indexSet the indexes to populate; failed indexes are removed from this collection
   * @throws MultiIndexCreationException if at least one index could not be populated
   */
  public void populateIndexes(Collection<Index> indexSet) throws MultiIndexCreationException {
    waitBeforeUpdate();
    if (region.getCache().getLogger().infoEnabled()) {
      region.getCache().getLogger().info("Populating indexes for region " + region.getName());
    }
    boolean anyIndexFailed = false;
    final HashMap<String, Exception> failuresByIndexName = new HashMap<>();
    final boolean previousReadSerialized = this.cache.getPdxReadSerializedOverride();
    this.cache.setPdxReadSerializedOverride(true);
    try {
      final LocalRegion localRegion = (LocalRegion) region;
      for (Iterator entries = localRegion.getBestIterator(true); entries.hasNext();) {
        final RegionEntry entry = (RegionEntry) entries.next();
        if (entry == null || entry.isInvalidOrRemoved()) {
          continue;
        }
        // Fault the value in once up front so that the individual index updates below do not
        // each have to read it from disk.
        entry.getValue(localRegion);
        for (Iterator<Index> candidates = indexSet.iterator(); candidates.hasNext();) {
          final AbstractIndex index = (AbstractIndex) candidates.next();
          if (index.isPopulated() || index.getType() == IndexType.PRIMARY_KEY) {
            continue;
          }
          if (logger.isDebugEnabled()) {
            logger.debug("Adding to index :{}{} value :{}", index.getName(),
                this.region.getFullPath(), entry.getKey());
          }
          final long start = index.updateIndexUpdateStats();
          try {
            index.addIndexMapping(entry);
          } catch (IMQException e) {
            if (logger.isDebugEnabled()) {
              logger.debug("Adding to index failed for: {}, {}", index.getName(), e.getMessage(),
                  e);
            }
            failuresByIndexName.put(index.indexName, e);
            // Stop feeding further entries to an index that already failed.
            candidates.remove();
            anyIndexFailed = true;
          }
          index.updateIndexUpdateStats(start);
        }
      }
      setPopulateFlagForIndexes(indexSet);
      if (anyIndexFailed) {
        throw new MultiIndexCreationException(failuresByIndexName);
      }
    } finally {
      this.cache.setPdxReadSerializedOverride(previousReadSerialized);
      notifyAfterUpdate();
    }
  }

  /**
   * Marks every index in the given collection as populated (see
   * {@link AbstractIndex#isPopulated}); indexes already flagged are left alone.
   *
   * @param indexSet the indexes to flag; each element must be an {@link AbstractIndex}
   */
  public void setPopulateFlagForIndexes(Collection<Index> indexSet) {
    for (Index candidate : indexSet) {
      final AbstractIndex abstractIndex = (AbstractIndex) candidate;
      if (abstractIndex.isPopulated()) {
        continue;
      }
      abstractIndex.setPopulated(true);
    }
  }

  /**
   * Updates the indexes for an entry operation that is not part of disk recovery. Delegates to the
   * four-argument overload with {@code isDiskRecoveryInProgress} set to {@code false}.
   *
   * @param entry the RegionEntry being updated
   * @param action the index action to apply
   * @param opCode the operation code passed through to the indexes
   */
  public void updateIndexes(RegionEntry entry, int action, int opCode) throws QueryException {
    final boolean isDiskRecoveryInProgress = false;
    updateIndexes(entry, action, opCode, isDiskRecoveryInProgress);
  }

  /**
   * Callback for IndexManager to update indexes. Called from AbstractRegionMap.
   *
   * <p>
   * A {@code null} entry is ignored. With synchronous index maintenance the action is applied on
   * the calling thread; otherwise it is queued for the updater thread.
   *
   * @param entry the RegionEntry being updated; may be {@code null}, in which case nothing happens
   * @param action action to be taken (IndexManager.ADD_ENTRY, IndexManager.UPDATE_ENTRY,
   *        IndexManager.REMOVE_ENTRY)
   * @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP.
   * @param isDiskRecoveryInProgress whether the call happens during disk recovery; it only selects
   *        which sanity assertion is checked
   * @throws QueryException if the synchronous index update fails
   */
  public void updateIndexes(RegionEntry entry, int action, int opCode,
      boolean isDiskRecoveryInProgress) throws QueryException {
    // Nothing to maintain without an entry. The guard must run before anything touches the entry:
    // the debug message below calls entry.getKey() and would otherwise throw a
    // NullPointerException whenever debug logging is enabled.
    if (entry == null) {
      return;
    }
    if (isDiskRecoveryInProgress) {
      assert !((LocalRegion) this.region).isInitialized();
    } else {
      assert Assert.assertHoldsLock(entry, true);
    }
    if (logger.isDebugEnabled()) {
      logger.debug("IndexManager.updateIndexes {} + action: {}", entry.getKey(), action);
    }
    if (isIndexMaintenanceTypeSynchronous()) {
      processAction(entry, action, opCode);
    } else {
      updater.addTask(action, entry, opCode);
    }
  }

  /**
   * Applies one entry operation to the indexes of this region.
   *
   * <p>
   * For every value in the index map that is a created index (values that are still
   * {@link FutureTask}s are skipped), the entry is added, re-added or removed depending on
   * {@code action}. Only populated indexes that are not primary-key indexes are touched;
   * {@code ADD_ENTRY} additionally requires the index to be valid and not to contain the entry
   * already.
   *
   * <p>
   * While the operation runs the PDX read-serialized override is forced to {@code true}, and on a
   * non-client cache the current transaction is paused. Both are undone in the {@code finally}
   * block, which also closes the index-update statistic sample started on entry.
   *
   * <p>
   * NOTE(review): this method does not coordinate with index-creator threads itself; presumably
   * callers bracket it with {@code waitForIndexInit()} / {@code countDownIndexUpdaters()} —
   * confirm against the callers.
   *
   * @param entry the region entry the operation refers to
   * @param action one of {@code ADD_ENTRY}, {@code UPDATE_ENTRY}, {@code REMOVE_ENTRY}
   * @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP.
   * @throws IndexMaintenanceException if {@code action} is none of the supported actions
   * @throws QueryException if removing the entry from an index fails
   */
  private void processAction(RegionEntry entry, int action, int opCode) throws QueryException {
    final long startPA = getCachePerfStats().startIndexUpdate();
    final boolean initialPdxReadSerialized = this.cache.getPdxReadSerializedOverride();
    this.cache.setPdxReadSerializedOverride(true);
    TXStateProxy tx = null;
    if (!this.cache.isClient()) {
      tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction();
    }

    try {
      if (IndexManager.testHook != null) {
        if (logger.isDebugEnabled()) {
          logger.debug("IndexManager TestHook is set.");
        }
        testHook.hook(6); // ConcurrentIndexInitOnOverflowRegionDUnitTest
      }

      switch (action) {
        case ADD_ENTRY: {
          if (IndexManager.testHook != null) {
            if (logger.isDebugEnabled()) {
              logger.debug("IndexManager TestHook in ADD_ENTRY.");
            }
            testHook.hook(5);
          }
          // this action is only called after update
          assert opCode == IndexProtocol.OTHER_OP;

          // An index that is being created may already have picked up a newly added entry by the
          // time this callback runs. Such an index reports that it contains the entry and is
          // skipped, so that the new results are not added on top of the ones it already holds.
          for (Object ind : this.indexes.values()) {
            // A FutureTask value means the index is still in its create phase.
            if (ind instanceof FutureTask) {
              continue;
            }
            final IndexProtocol index = (IndexProtocol) ind;

            if (index.isValid() && ((AbstractIndex) index).isPopulated()
                && index.getType() != IndexType.PRIMARY_KEY) {
              if (!index.containsEntry(entry)) {
                if (logger.isDebugEnabled()) {
                  logger.debug("Adding to index: {}{} value: {}", index.getName(),
                      this.region.getFullPath(), entry.getKey());
                }
                final AbstractIndex abstractIndex = (AbstractIndex) index;
                final long start = abstractIndex.updateIndexUpdateStats();
                addIndexMapping(entry, index);
                abstractIndex.updateIndexUpdateStats(start);
              }
            }
          }
          break;
        }
        case UPDATE_ENTRY: {
          if (IndexManager.testHook != null) {
            if (logger.isDebugEnabled()) {
              logger.debug("IndexManager TestHook in UPDATE_ENTRY.");
            }
            testHook.hook(5);
            testHook.hook(9); // QueryDataInconsistencyDUnitTest
          }

          // this action is only called with opCode AFTER_UPDATE_OP
          assert opCode == IndexProtocol.AFTER_UPDATE_OP;
          for (Object ind : this.indexes.values()) {
            // A FutureTask value means the index is still in its create phase.
            if (ind instanceof FutureTask) {
              continue;
            }
            final IndexProtocol index = (IndexProtocol) ind;

            if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
              if (logger.isDebugEnabled()) {
                logger.debug("Updating index: {}{} value: {}", index.getName(),
                    this.region.getFullPath(), entry.getKey());
              }
              final AbstractIndex abstractIndex = (AbstractIndex) index;
              final long start = abstractIndex.updateIndexUpdateStats();
              addIndexMapping(entry, index);
              abstractIndex.updateIndexUpdateStats(start);
            }
          }
          break;
        }
        case REMOVE_ENTRY: {
          if (IndexManager.testHook != null) {
            if (logger.isDebugEnabled()) {
              logger.debug("IndexManager TestHook in REMOVE_ENTRY.");
            }
            testHook.hook(5);
            testHook.hook(10);
          }
          for (Object ind : this.indexes.values()) {
            // A FutureTask value means the index is still in its create phase.
            if (ind instanceof FutureTask) {
              continue;
            }
            final IndexProtocol index = (IndexProtocol) ind;

            if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
              if (logger.isDebugEnabled()) {
                logger.debug("Removing from index: {}{} value: {}", index.getName(),
                    this.region.getFullPath(), entry.getKey());
              }
              final AbstractIndex abstractIndex = (AbstractIndex) index;
              final long start = abstractIndex.updateIndexUpdateStats();
              index.removeIndexMapping(entry, opCode);
              abstractIndex.updateIndexUpdateStats(start);
            }
          }
          break;
        }
        default: {
          throw new IndexMaintenanceException(
              "Invalid action");
        }
      }
    } finally {
      this.cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
      // tx is null when the cache is a client; it is handed to unpauseTransaction as is.
      ((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx);

      getCachePerfStats().endIndexUpdate(startPA);
    }
  }

  /**
   * Adds the entry to the given index. Any exception raised by the index is contained here: the
   * index (and its partitioned parent index, if one is set) is marked invalid and a warning is
   * logged, so the failure does not propagate to the caller.
   *
   * @param entry the region entry to add
   * @param index the index to update; must be an {@link AbstractIndex}
   */
  private void addIndexMapping(RegionEntry entry, IndexProtocol index) throws IMQException {
    try {
      index.addIndexMapping(entry);
    } catch (Exception exception) {
      index.markValid(false);
      final AbstractIndex corrupted = (AbstractIndex) index;
      setPRIndexAsInvalid(corrupted);
      final String message = "Put operation for the entry corrupted the index : "
          + corrupted.indexName + " with the exception : \n " + exception;
      logger.warn(message);
    }
  }

  /**
   * Marks the partitioned parent index of the given index as invalid, if a parent is set.
   *
   * @param index the index whose {@code prIndex} should be invalidated
   */
  private void setPRIndexAsInvalid(AbstractIndex index) {
    if (index.prIndex == null) {
      return;
    }
    ((AbstractIndex) index.prIndex).markValid(false);
  }

  /**
   * Registers the calling thread as an index creator and blocks it until no updater is in
   * progress.
   *
   * <p>
   * Under the {@code indexes} monitor, {@code numCreators} is incremented first and the thread
   * then waits while {@code numUpdatersInProgress} is greater than zero. The wait is woken by
   * {@code countDownIndexUpdaters()}, which calls {@code notifyAll()} on the same monitor. Every
   * call is expected to be paired with {@code notifyAfterUpdate()}, which decrements
   * {@code numCreators} again.
   *
   * <p>
   * Interrupts do not abort the wait: the interrupt flag is cleared before waiting and restored
   * afterwards, and the loop condition is evaluated again.
   *
   * <p>
   * NOTE(review): {@code checkCancelInProgress(null)} presumably throws when the region is being
   * shut down; in that case {@code numCreators} is not decremented by this method — confirm that
   * this is acceptable.
   */
  private void waitBeforeUpdate() {
    synchronized (indexes) {
      ++numCreators;
      // Index creation must not proceed while any updater thread is in progress.
      while (numUpdatersInProgress > 0) {
        ((LocalRegion) getRegion()).getCancelCriterion().checkCancelInProgress(null);
        // Clear (and remember) a pending interrupt so that wait() below does not throw at once.
        boolean interrupted = Thread.interrupted();
        try {
          indexes.wait();
        } catch (InterruptedException ignored) {
          interrupted = true;
        } finally {
          // Re-assert the interrupt for the caller; the loop itself keeps waiting.
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // while
    }
  }

  /**
   * Unregisters the calling thread as an index creator; the counterpart of
   * {@code waitBeforeUpdate()}.
   *
   * <p>
   * Under the {@code indexes} monitor, {@code numCreators} is decremented. Waiting threads are
   * woken only when the creator count has dropped to zero and at least one updater is waiting, so
   * updaters stay blocked for as long as any creator is still registered.
   */
  private void notifyAfterUpdate() {
    synchronized (indexes) {
      numCreators--;
      final boolean lastCreatorFinished = numCreators == 0;
      if (lastCreatorFinished && numUpdatersInWaiting > 0) {
        indexes.notifyAll();
      }
    }
  }

  /**
   * Recreates the data of all functional and hash indexes of this region. Index values that are
   * still {@link FutureTask}s (creation in progress) and indexes of other types are skipped.
   *
   * <p>
   * The calling thread registers as an index creator for the duration of the rebuild, which blocks
   * index updates. Per the original notes this is required because the rebuild does not lock
   * region entries and concurrent entry operations could otherwise leave the indexes
   * inconsistent.
   *
   * @throws IndexInvalidException wrapping any exception raised while rebuilding
   */
  private void recreateAllIndexesForRegion() {
    waitBeforeUpdate();
    try {
      for (Object value : this.indexes.values()) {
        if (value instanceof FutureTask) {
          // Creation of this index has not completed yet.
          continue;
        }
        final IndexProtocol index = (IndexProtocol) value;
        if (index.getType() != IndexType.FUNCTIONAL && index.getType() != IndexType.HASH) {
          continue;
        }
        final AbstractIndex aIndex = (AbstractIndex) index;
        final long start = aIndex.updateIndexUpdateStats();
        aIndex.recreateIndexData();
        aIndex.updateIndexUpdateStats(start);
      }
    } catch (Exception e) {
      throw new IndexInvalidException(e);
    } finally {
      notifyAfterUpdate();
    }
  }

  /**
   * Wait for index initialization before entry create, update, invalidate or destroy operation.
   *
   * <p>
   * Under the {@code indexes} monitor, the calling thread is counted as a waiting updater and
   * blocks while {@code numCreators} is greater than zero. Once no creator is registered it is
   * moved from the waiting count to the in-progress count. The in-progress count is decremented
   * again by {@link #countDownIndexUpdaters()}, which callers are expected to invoke in a
   * {@code finally} block.
   *
   * <p>
   * Interrupts do not abort the wait: the interrupt flag is cleared before waiting and restored
   * afterwards, and the loop condition is evaluated again.
   *
   * <p>
   * Note: If the region has a disk region then we should wait for index initialization before
   * getting region entry lock to avoid deadlock (#44431).
   *
   * <p>
   * NOTE(review): {@code checkCancelInProgress(null)} presumably throws when the region is being
   * shut down; in that case {@code numUpdatersInWaiting} is not decremented by this method —
   * confirm that this is acceptable.
   */
  public void waitForIndexInit() {
    synchronized (this.indexes) {
      ++this.numUpdatersInWaiting;
      // Updates must not proceed while any index creator is registered.
      while (this.numCreators > 0) {
        ((LocalRegion) this.getRegion()).getCancelCriterion().checkCancelInProgress(null);
        // Clear (and remember) a pending interrupt so that wait() below does not throw at once.
        boolean interrupted = Thread.interrupted();
        try {
          this.indexes.wait();
        } catch (InterruptedException ignored) {
          interrupted = true;
        } finally {
          // Re-assert the interrupt for the caller; the loop itself keeps waiting.
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // while
      // No creator is registered any more: this thread now counts as an updater in progress.
      --this.numUpdatersInWaiting;
      ++this.numUpdatersInProgress;
    }
  }

  /**
   * Counterpart of {@link #waitForIndexInit()}, to be called from a {@code finally} block.
   *
   * <p>
   * Under the {@code indexes} monitor, {@code numUpdatersInProgress} is decremented. Index creator
   * threads only proceed when no updater is in progress, so waiting threads are woken once the
   * in-progress count has reached zero and at least one creator is registered.
   */
  public void countDownIndexUpdaters() {
    synchronized (this.indexes) {
      this.numUpdatersInProgress--;
      final boolean lastUpdaterFinished = this.numUpdatersInProgress == 0;
      if (lastUpdaterFinished && this.numCreators > 0) {
        this.indexes.notifyAll();
      }
    }
  }

  /**
   * Returns the cache performance statistics of this manager's region.
   *
   * @return the statistics object exposed by the region
   */
  private CachePerfStats getCachePerfStats() {
    final HasCachePerfStats statsOwner = (HasCachePerfStats) this.region;
    return statsOwner.getCachePerfStats();
  }

  /**
   * Callback for destroying this IndexManager; called after Region.destroy().
   *
   * <p>
   * Clears the index map and, when index maintenance is asynchronous, shuts the updater thread
   * down.
   */
  public void destroy() throws QueryException {
    this.indexes.clear();
    if (isIndexMaintenanceTypeSynchronous()) {
      // No updater thread to stop in synchronous mode.
      return;
    }
    updater.shutdown();
  }

  /**
   * Removes indexes for a destroyed bucket region from the list of bucket indexes in the
   * {@link PartitionedIndex}.
   *
   * <p>
   * For every created index of this manager, the index of the same name is looked up in the
   * partitioned region's index manager; if that index is a {@link PartitionedIndex}, this bucket's
   * index is removed from it. Map values that are not yet an {@link Index} (an index whose
   * creation is still in progress is stored as a {@link FutureTask}) are skipped, as in the other
   * iterations over the index map. Nothing happens if the partitioned region has no index
   * manager.
   *
   * @param prRegion the partition region that this bucket belongs to
   */
  public void removeBucketIndexes(PartitionedRegion prRegion) throws QueryException {
    final IndexManager parentManager = prRegion.getIndexManager();
    if (parentManager == null) {
      return;
    }
    for (Object value : indexes.values()) {
      // Casting a FutureTask value to Index would throw a ClassCastException.
      if (!(value instanceof Index)) {
        continue;
      }
      final Index bucketIndex = (Index) value;
      final Index prIndex = parentManager.getIndex(bucketIndex.getName());
      if (prIndex instanceof PartitionedIndex) {
        ((PartitionedIndex) prIndex).removeFromBucketIndexes(this.region, bucketIndex);
      }
    }
  }

  /**
   * Returns the string forms of all created indexes, one per line. Indexes whose creation is still
   * in progress (stored as {@link FutureTask}) are omitted.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder();
    for (Object value : this.indexes.values()) {
      if (value instanceof FutureTask) {
        continue;
      }
      text.append(value);
      text.append(getLineSeparator());
    }
    return text.toString();
  }

  /**
   * @return the value of the {@code indexMaintenanceSynchronous} flag; when {@code true}, index
   *         updates are applied on the calling thread instead of being queued for the updater
   */
  public boolean isIndexMaintenanceTypeSynchronous() {
    return indexMaintenanceSynchronous;
  }

  /**
   * @return the value of the {@code isOverFlowToDisk} flag of this manager
   */
  public boolean isOverFlowRegion() {
    return isOverFlowToDisk;
  }

  /**
   * @return the value of the {@code offHeap} flag of this manager
   */
  public boolean isOffHeap() {
    return offHeap;
  }

  /**
   * @return {@code true} if either {@code INPLACE_OBJECT_MODIFICATION} or
   *         {@code INPLACE_OBJECT_MODIFICATION_FOR_TEST} is set
   */
  public static boolean isObjectModificationInplace() {
    if (INPLACE_OBJECT_MODIFICATION) {
      return true;
    }
    return INPLACE_OBJECT_MODIFICATION_FOR_TEST;
  }

  /**
   * Returns the canonical iterator name registered for an iterator definition, creating and
   * registering a new one when the definition is not known yet. New names have the form
   * {@code index_iter<N>}, where {@code N} comes from this manager's incrementing counter.
   *
   * @param definition String containing definition of the iterator
   * @return String containing the name of the Iterator
   */
  public String putCanonicalizedIteratorNameIfAbsent(String definition) {
    synchronized (canonicalizedIteratorNameMap) {
      final String existing = (String) this.canonicalizedIteratorNameMap.get(definition);
      if (existing != null) {
        return existing;
      }
      final String generated = "index_iter" + this.getIncrementedCounter();
      final String raced =
          (String) this.canonicalizedIteratorNameMap.putIfAbsent(definition, generated);
      // putIfAbsent returns a non-null value only if a name was registered in the meantime.
      return raced != null ? raced : generated;
    }
  }

  /**
   * Registers the given canonical name for an iterator definition, replacing any name already
   * registered for it.
   *
   * @param definition String containing definition of the iterator
   * @param name the canonical iterator name to associate with the definition
   */
  public void putCanonicalizedIteratorName(String definition, String name) {
    synchronized (canonicalizedIteratorNameMap) {
      canonicalizedIteratorNameMap.put(definition, name);
    }
  }

  /**
   * Increments the iterator-name counter and returns the new value. Synchronized on the enclosing
   * instance.
   *
   * @return the counter value after the increment
   */
  private synchronized int getIncrementedCounter() {
    this.iternameCounter += 1;
    return this.iternameCounter;
  }

  /**
   * Looks up the canonical iterator name registered for an iterator definition.
   *
   * @param definition String containing definition of the iterator
   * @return the registered name, or {@code null} if the definition is not known
   */
  public String getCanonicalizedIteratorName(String definition) {
    final Object name = this.canonicalizedIteratorNameMap.get(definition);
    return (String) name;
  }

  ////////////////////// Inner Classes //////////////////////

  /**
   * Background thread that applies queued index maintenance tasks one at a time. Tasks are added
   * with {@link #addTask(int, RegionEntry, int)} and consumed by {@link #run()} until shutdown is
   * requested, the region's cancel criterion reports a cancellation, or the thread is interrupted.
   */
  public class IndexUpdaterThread extends LoggingThread {

    /** Set to {@code false} when {@link #run()} exits. */
    private volatile boolean running = true;

    /** Set by {@link #shutdown()} to make the main loop stop. */
    private volatile boolean shutdownRequested = false;

    /** Pending tasks; each task is the triple {@code {action, entry, opCode}}. */
    private final BlockingQueue<Object[]> pendingTasks;

    /**
     * Creates instance of IndexUpdaterThread
     *
     * @param updateThreshold capacity of the task queue; a value of zero or less selects an
     *        unbounded queue
     * @param threadName name given to the thread
     */
    IndexUpdaterThread(int updateThreshold, String threadName) {
      super(threadName);
      if (updateThreshold > 0) {
        // Bounded queue.
        pendingTasks = new ArrayBlockingQueue<>(updateThreshold);
      } else {
        // Unbounded queue.
        pendingTasks = new LinkedBlockingQueue<>();
      }
    }

    /**
     * Queues a task for the updater thread.
     *
     * @param action the index action, or {@code RECREATE_INDEX}
     * @param entry the region entry the action refers to; may be {@code null}
     * @param opCode the operation code passed through to the indexes
     * @throws IllegalStateException if the queue is bounded and currently full
     */
    public void addTask(int action, RegionEntry entry, int opCode) {
      final Object[] task = {action, entry, opCode};
      pendingTasks.add(task);
    }

    /**
     * Stops this thread. Does not return until it has stopped, unless the calling thread is
     * interrupted while waiting, in which case the interrupt flag is restored and the method
     * returns.
     */
    public void shutdown() {
      if (!this.running) {
        return;
      }
      this.shutdownRequested = true;
      this.interrupt();
      try {
        this.join();
      } catch (InterruptedException ignore) {
        Thread.currentThread().interrupt();
        // just return, we're done
      }
    }

    /**
     * Main loop: takes tasks from the queue and applies them until the thread is asked to stop.
     */
    @Override
    public void run() {
      org.apache.geode.CancelCriterion stopper = ((LocalRegion) region).getCancelCriterion();
      try {
        while (!this.shutdownRequested) {
          // Termination checks
          SystemFailure.checkFailure();
          if (stopper.isCancelInProgress()) {
            break;
          }
          try {
            final Object[] task = pendingTasks.take();
            if (this.shutdownRequested) {
              break;
            }
            updateIndexes(task);
          } catch (InterruptedException ignore) {
            return; // give up (exit the thread)
          }
        }
      } finally {
        this.running = false;
      }
    }

    /**
     * Applies a single queued task. A task without an entry is ignored unless it is a
     * {@code RECREATE_INDEX} task. For entry tasks the entry is flagged as "update in progress"
     * while the indexes are being updated. Exceptions are logged and do not stop the thread.
     */
    private void updateIndexes(Object[] task) {
      final int action = (Integer) task[0];
      final RegionEntry entry = (RegionEntry) task[1];
      final int opCode = (Integer) task[2];
      final boolean recreate = action == RECREATE_INDEX;
      if (entry == null && !recreate) {
        return;
      }
      try {
        if (recreate) {
          recreateAllIndexesForRegion();
        } else {
          // entry is non-null here: a null entry is only let through for RECREATE_INDEX.
          entry.setUpdateInProgress(true);
          processAction(entry, action, opCode);
        }
      } catch (Exception e) {
        // Report through the logger instead of writing the stack trace to stderr.
        logger.warn("Exception while updating indexes asynchronously", e);
      } finally {
        if (entry != null && !recreate) {
          entry.setUpdateInProgress(false);
        }
      }
    }

    /**
     * Used by tests to determine if the updater thread has finished updating its indexes. Only
     * reports whether the queue is empty at the time of the call: a task that has been taken from
     * the queue but is still being applied is not counted.
     */
    public synchronized boolean isDone() {
      return this.pendingTasks.isEmpty();
    }

  }

  /**
   * Index task used to create an index. It is used together with {@link FutureTask} to handle
   * requests from multiple threads to create the same index: at any time only one thread performs
   * the creation, and the other threads wait for it to complete. This avoids coarse
   * synchronization that could block every index creation.
   */
  public class IndexTask implements Callable<Index> {

    public String indexName;

    public IndexType indexType;

    public IndexCreationHelper helper;

    public String origFromClause;

    public String origIndexedExpression;

    public boolean isCompactOrHash = false;

    public boolean isLDM = false;

    public PartitionedIndex prIndex;

    public boolean loadEntries;

    private final InternalCache cache;

    /**
     * Creates a task that carries everything needed to build an index.
     *
     * @param cache the cache the index is created in
     * @param indexName name of the index
     * @param type type of the index
     * @param origFromClause the from clause as originally supplied
     * @param origIndexedExpression the indexed expression as originally supplied
     * @param helper creation helper providing the canonicalized expressions and definitions
     * @param isCompactOrHash stored in the field of the same name; consulted when the index
     *        implementation is chosen
     * @param prIndex the partitioned parent index, or {@code null}
     * @param loadEntries whether entries are loaded into the index when it is initialized
     */
    IndexTask(InternalCache cache, String indexName, IndexType type, String origFromClause,
        String origIndexedExpression, IndexCreationHelper helper, boolean isCompactOrHash,
        PartitionedIndex prIndex, boolean loadEntries) {
      // Identity of the index.
      this.cache = cache;
      this.indexName = indexName;
      this.indexType = type;
      this.prIndex = prIndex;
      // What is indexed.
      this.helper = helper;
      this.origFromClause = origFromClause;
      this.origIndexedExpression = origIndexedExpression;
      // How the index is built.
      this.isCompactOrHash = isCompactOrHash;
      this.loadEntries = loadEntries;
    }

    /**
     * Creates a name-only task, used as a lookup key when an index is searched by name. All fields
     * other than the cache and the name keep their defaults.
     */
    IndexTask(InternalCache cache, String indexName) {
      this.indexName = indexName;
      this.cache = cache;
    }

    /**
     * Two index tasks are equal if they have the same index name, or, when both carry a creation
     * helper, if their index type, canonicalized indexed expression and canonicalized iterator
     * definitions compare as equal via {@code compareIndexData}.
     *
     * <p>
     * Because equality is an OR of these two conditions, no precise hash code can be derived from
     * it; {@link #hashCode()} therefore returns a constant.
     *
     * @param other the object to compare with
     * @return {@code true} if {@code other} is an {@code IndexTask} describing the same index;
     *         {@code false} for {@code null} and for objects of any other class
     */
    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      // Also covers null; without this check a foreign object caused a ClassCastException.
      if (!(other instanceof IndexTask)) {
        return false;
      }
      final IndexTask otherIndexTask = (IndexTask) other;
      // compare indexName.
      if (this.indexName.equals(otherIndexTask.indexName)) {
        return true;
      }

      // A name-only task has no helper, so there is nothing further to compare.
      if (otherIndexTask.helper == null || this.helper == null) {
        return false;
      }

      final String[] indexDefinitions = this.helper.getCanonicalizedIteratorDefinitions();
      // TODO: avoid object creation in equals
      final int[] mapping = new int[indexDefinitions.length];
      // compare index based on its type, expression and definition.
      return compareIndexData(this.indexType, indexDefinitions,
          this.helper.getCanonicalizedIndexedExpression(), otherIndexTask.indexType,
          otherIndexTask.helper.getCanonicalizedIteratorDefinitions(),
          otherIndexTask.helper.getCanonicalizedIndexedExpression(), mapping) == 0;
    }

    /**
     * Returns a constant. Equality is based on an OR condition between the index name and the
     * index characteristics (type, expression and definition), so no more selective hash code can
     * be derived that stays consistent with {@link #equals(Object)}.
     *
     * @return always 99
     */
    @Override
    public int hashCode() {
      return 99;
    }

    /*
     * Creates and initializes the index.
     */
    @Override
    public Index call() {
      Index index = null;
      String indexedExpression = helper.getCanonicalizedIndexedExpression();
      String fromClause = helper.getCanonicalizedFromClause();
      String projectionAttributes = helper.getCanonicalizedProjectionAttributes();
      String[] definitions = helper.getCanonicalizedIteratorDefinitions();
      IndexStatistics stats = null;
      this.isLDM = IndexManager.IS_TEST_LDM;

      if (this.prIndex != null) {
        stats = this.prIndex.getStatistics();
      }
      if (indexType == IndexType.PRIMARY_KEY) {
        index = new PrimaryKeyIndex(cache, indexName, region, fromClause, indexedExpression,
            projectionAttributes, origFromClause, origIndexedExpression, definitions, stats);
        logger.info("Using Primary Key index implementation for '{}' on region {}", indexName,
            region.getFullPath());
      } else if (indexType == IndexType.HASH) {
        index = new HashIndex(cache, indexName, region, fromClause, indexedExpression,
            projectionAttributes, origFromClause, origIndexedExpression, definitions, stats);

        logger.info("Using Hash index implementation for '{}' on region {}", indexName,
            region.getFullPath());
      } else {
        // boolean isCompact = !helper.isMapTypeIndex() &&
        // shouldCreateCompactIndex((FunctionalIndexCreationHelper)helper);
        if (this.isCompactOrHash || this.isLDM) {
          if (indexType == IndexType.FUNCTIONAL && !helper.isMapTypeIndex()) {
            index = new CompactRangeIndex(cache, indexName, region, fromClause, indexedExpression,
                projectionAttributes, origFromClause, origIndexedExpression, definitions, stats);
            logger.info("Using Compact Range index implementation for '{}' on region {}", indexName,
                region.getFullPath());
          } else {
            FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper) helper;
            index = new CompactMapRangeIndex(cache, indexName, region, fromClause,
                indexedExpression, projectionAttributes, origFromClause, origIndexedExpression,
                definitions, fich.isAllKeys(), fich.multiIndexKeysPattern, fich.mapKeys, stats);
            logger.info("Using Compact Map Range index implementation for '{}' on region {}",
                indexName, region.getFullPath());
          }
        } else {
          assert indexType == IndexType.FUNCTIONAL;
          if (!helper.isMapTypeIndex()) {
            index = new RangeIndex(cache, indexName, region, fromClause, indexedExpression,
                projectionAttributes, origFromClause, origIndexedExpression, definitions, stats);
            logger.info("Using Non-Compact index implementation for '{}' on region {}", indexName,
                region.getFullPath());
          } else {
            FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper) helper;
            index = new MapRangeIndex(cache, indexName, region, fromClause, indexedExpression,
                projectionAttributes, origFromClause, origIndexedExpression, definitions,
                fich.isAllKeys(), fich.multiIndexKeysPattern, fich.mapKeys, stats);
            logger.info("Using Non-Compact Map index implementation for '{}' on region {}",
                indexName, region.getFullPath());
          }
        }
      }
      ((AbstractIndex) index).setPRIndex(prIndex);

      if (index.getType() != IndexType.PRIMARY_KEY) {
        AbstractIndex aIndex = ((AbstractIndex) index);
        aIndex.instantiateEvaluator(helper);
        waitBeforeUpdate();
        boolean indexCreatedSuccessfully = false;
        try {
          ((LocalRegion) region).setFlagForIndexCreationThread(true);
          aIndex.initializeIndex(loadEntries);
          logger.info((loadEntries ? "Initialized and loaded entries into the index "
              : "Initialized but entries not yet loaded into the index " + indexName
                  + " on region: " + region.getFullPath()));
          aIndex.markValid(true);
          indexCreatedSuccessfully = true;
          if (loadEntries) {
            aIndex.setPopulated(true);
            if (this.prIndex != null) {
              ((AbstractIndex) this.prIndex).setPopulated(true);
            }
          }
          indexes.put(this, index);
          if (region instanceof BucketRegion && prIndex != null) {
            prIndex.addToBucketIndexes(region, index);
            prIndex.incNumBucketIndexes();
          }
        } catch (Exception e) {
          throw new IndexInvalidException(e);
        } finally {
          notifyAfterUpdate();
          ((LocalRegion) region).setFlagForIndexCreationThread(false);
          if (!indexCreatedSuccessfully) {
            ((InternalIndexStatistics) index.getStatistics()).close();
          }
        }
      } else {
        // For PrimaryKey index
        ((AbstractIndex) index).setPopulated(true);
        indexes.put(this, index);
        if (region instanceof BucketRegion && prIndex != null) {
          prIndex.addToBucketIndexes(region, index);
        }
        if (this.prIndex != null) {
          ((AbstractIndex) this.prIndex).setPopulated(true);
        }
      }
      return index;
    }
  }
}
