| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.internal.index; |
| |
| import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.FutureTask; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.query.AmbiguousNameException; |
| import org.apache.geode.cache.query.Index; |
| import org.apache.geode.cache.query.IndexExistsException; |
| import org.apache.geode.cache.query.IndexInvalidException; |
| import org.apache.geode.cache.query.IndexMaintenanceException; |
| import org.apache.geode.cache.query.IndexNameConflictException; |
| import org.apache.geode.cache.query.IndexStatistics; |
| import org.apache.geode.cache.query.IndexType; |
| import org.apache.geode.cache.query.MultiIndexCreationException; |
| import org.apache.geode.cache.query.NameResolutionException; |
| import org.apache.geode.cache.query.QueryException; |
| import org.apache.geode.cache.query.TypeMismatchException; |
| import org.apache.geode.cache.query.internal.CompiledPath; |
| import org.apache.geode.cache.query.internal.CompiledValue; |
| import org.apache.geode.cache.query.internal.ExecutionContext; |
| import org.apache.geode.cache.query.internal.MapIndexable; |
| import org.apache.geode.cache.query.internal.NullToken; |
| import org.apache.geode.cache.query.internal.QueryMonitor; |
| import org.apache.geode.cache.query.internal.QueryObserver; |
| import org.apache.geode.cache.query.internal.QueryObserverHolder; |
| import org.apache.geode.cache.query.internal.index.AbstractIndex.InternalIndexStatistics; |
| import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.BucketRegion; |
| import org.apache.geode.internal.cache.CachePerfStats; |
| import org.apache.geode.internal.cache.HasCachePerfStats; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.RegionEntry; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.TXStateProxy; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class IndexManager { |
| private static final Logger logger = LogService.getLogger(); |
| |
| public static final int ADD_ENTRY = 1; |
| public static final int UPDATE_ENTRY = 2; |
| public static final int REMOVE_ENTRY = 3; |
| // Asif : This action is to rerun Index creation after |
| // clear is called on the region |
| public static final int RECREATE_INDEX = 4; |
| private final InternalCache cache; |
| protected final Region region; |
| |
| private final boolean isOverFlowToDisk; |
| private final boolean offHeap; |
| private final boolean indexMaintenanceSynchronous; |
| private int numCreators = 0; |
| private int numUpdatersInProgress = 0; |
| private int numUpdatersInWaiting = 0; |
| private int iternameCounter = 0; |
| /* |
| * Map containing <IndexTask, FutureTask<IndexTask> or Index>. IndexTask represents an index thats |
| * completely created or one thats in create phase. This is done in order to avoid synchronization |
| * on the indexes. |
| */ |
| private final ConcurrentMap indexes = new ConcurrentHashMap(); |
| // TODO Asif : Fix the appropriate size of the Map & the concurrency level |
| private ConcurrentMap canonicalizedIteratorNameMap = new ConcurrentHashMap(); |
| private IndexUpdaterThread updater; |
| |
| // Threshold for Queue. |
| private final int INDEX_MAINTENANCE_BUFFER = |
| Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "AsynchIndexMaintenanceThreshold", -1); |
| |
| public static final boolean JOIN_OPTIMIZATION = |
| !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "index.DisableJoinOptimization"); |
| |
| @MutableForTesting |
| public static boolean INPLACE_OBJECT_MODIFICATION_FOR_TEST = false; |
| |
| @MutableForTesting |
| public static boolean IS_TEST_LDM = false; |
| |
| @MutableForTesting |
| public static boolean IS_TEST_EXPANSION = false; |
| |
| |
| |
| /** |
| * System property to maintain the ReverseMap to take care in-place modification of the objects by |
| * the application. In case of in-place modification the EntryEvent will not have the old-value, |
| * without this the old-values are not removed from the index-maps thus resulting in inconsistent |
| * results. |
| */ |
| public static final boolean INPLACE_OBJECT_MODIFICATION = Boolean.valueOf(System.getProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "index.INPLACE_OBJECT_MODIFICATION", "false")); |
| |
| /** |
| * System property to turn-off the compact-index support. |
| */ |
| public static final boolean RANGEINDEX_ONLY = Boolean.valueOf( |
| System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "index.RANGEINDEX_ONLY", "false")); |
| |
| @MutableForTesting |
| public static boolean TEST_RANGEINDEX_ONLY = false; |
| public static final String INDEX_ELEMARRAY_THRESHOLD_PROP = "index_elemarray_threshold"; |
| public static final String INDEX_ELEMARRAY_SIZE_PROP = "index_elemarray_size"; |
| public static final int INDEX_ELEMARRAY_THRESHOLD = |
| Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_THRESHOLD_PROP, "100")); |
| public static final int INDEX_ELEMARRAY_SIZE = |
| Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_SIZE_PROP, "5")); |
| @MakeNotStatic |
| public static final AtomicLong SAFE_QUERY_TIME = new AtomicLong(0); |
| @MutableForTesting |
| public static boolean ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION = true; |
| /** The NULL constant */ |
| public static final Object NULL = new NullToken(); |
| |
| @MutableForTesting |
| public static TestHook testHook; |
| |
| // private int numCreatorsInWaiting = 0; |
| // @todo ericz |
| // there should be a read/write lock PER INDEX in order to maximize |
| // the concurrency of query execution. |
| public IndexManager(InternalCache cache, Region region) { |
| this.cache = cache; |
| this.region = region; |
| // must be a SortedMap to ensure the indexes are iterated over in fixed |
| // order |
| // to avoid deadlocks when acquiring locks |
| // indexes = Collections.synchronizedSortedMap(new TreeMap()); |
| indexMaintenanceSynchronous = region.getAttributes().getIndexMaintenanceSynchronous(); |
| isOverFlowToDisk = |
| region.getAttributes().getEvictionAttributes().getAction().isOverflowToDisk(); |
| this.offHeap = region.getAttributes().getOffHeap(); |
| if (!indexMaintenanceSynchronous) { |
| updater = new IndexUpdaterThread(this.INDEX_MAINTENANCE_BUFFER, |
| "OqlIndexUpdater:" + region.getFullPath()); |
| updater.start(); |
| } |
| } |
| |
| /** |
| * Stores the largest combination of current time + delta If there is a large delta/hiccup in |
| * timings, this allows us to calculate the correct results for a query but, reevaluate more |
| * aggressively. But the large hiccup will eventually be rolled off as time is always increasing |
| * This is a fix for #47475 |
| * |
| * @param lastModifiedTime the last modified time from version tag |
| */ |
| public static void setIndexBufferTime(long lastModifiedTime, long currentCacheTime) { |
| long oldValue = SAFE_QUERY_TIME.get(); |
| long newValue = currentCacheTime + currentCacheTime - lastModifiedTime; |
| if (oldValue < newValue) { |
| // use compare and set in case the value has been changed since we got the old value |
| SAFE_QUERY_TIME.compareAndSet(oldValue, newValue); |
| } |
| } |
| |
| /** |
| * only for test purposes This should not be called from any product code. Calls from product code |
| * will possibly cause continous reevaluation (performance issue) OR incorrect query results |
| * (functional issue) |
| **/ |
| public static void resetIndexBufferTime() { |
| SAFE_QUERY_TIME.set(0); |
| } |
| |
| /** |
| * Calculates whether we need to reevluate the key for the region entry We added a way to |
| * determine whether to reevaluate an entry for query execution The method is to keep track of the |
| * delta and current time in a single long value The value is then used by the query to determine |
| * if a region entry needs to be reevaluated, based on subtracting the value with the query |
| * execution time. This provides a delta + some false positive time (dts) If the dts + last |
| * modified time of the region entry is > query start time, we can assume that it needs to be |
| * reevaluated |
| * |
| * This is to fix bug 47475, where references to region entries can be held by the executing query |
| * either directly or indirectly (iterators can hold references for next) but the values |
| * underneath could change. |
| * |
| * Small amounts of false positives are ok as it will have a slight impact on performance |
| */ |
| public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) { |
| boolean needsRecalculate = |
| (queryStartTime <= (lastModifiedTime + (SAFE_QUERY_TIME.get() - queryStartTime))); |
| return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION && needsRecalculate; |
| } |
| |
| /** Test Hook */ |
| public interface TestHook { |
| void hook(final int spot) throws RuntimeException; |
| } |
| |
| /** |
| * The Region this IndexManager is associated with |
| * |
| * @return the Region for this IndexManager |
| */ |
| public Region getRegion() { |
| return region; |
| } |
| |
| /** |
| * Used by tests to access the updater thread to determine its progress |
| */ |
| public IndexUpdaterThread getUpdaterThread() { |
| return this.updater; |
| } |
| |
| // @todo need more specific list of exceptions |
| /** |
| * Create an index that can be used when executing queries. |
| * |
| * @param indexName the name of this index, used for statistics collection |
| * @param indexType the type of index |
| * @param origIndexedExpression the expression to index on, a function dependent on region entries |
| * individually. |
| * @param origFromClause expression that evaluates to the collection(s) that will be queried over, |
| * must contain one and only one region path. |
| * @return the newly created Index |
| */ |
| public Index createIndex(String indexName, IndexType indexType, String origIndexedExpression, |
| String origFromClause, String imports, ExecutionContext externalContext, |
| PartitionedIndex prIndex, boolean loadEntries) |
| throws IndexNameConflictException, IndexExistsException, IndexInvalidException { |
| |
| if (QueryMonitor.isLowMemory()) { |
| throw new IndexInvalidException( |
| "Index creation canceled due to low memory"); |
| } |
| |
| boolean oldReadSerialized = this.cache.getPdxReadSerializedOverride(); |
| this.cache.setPdxReadSerializedOverride(true); |
| |
| TXStateProxy tx = null; |
| if (!((InternalCache) this.cache).isClient()) { |
| tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction(); |
| } |
| |
| try { |
| String projectionAttributes = "*"; // for now this is the only option |
| |
| if (getIndex(indexName) != null) { |
| throw new IndexNameConflictException( |
| String.format("Index named ' %s ' already exists.", |
| indexName)); |
| } |
| |
| IndexCreationHelper helper = null; |
| boolean isCompactOrHash = false; |
| // Hash index not supported for overflow but we "thought" we were so let's maintain backwards |
| // compatibility |
| // and create a regular compact range index instead. This is due to having to reload entries |
| // from overflow just |
| // to recalculate the index key for the entry for comparisons during query. |
| if (indexType == IndexType.HASH && isOverFlowRegion()) { |
| indexType = IndexType.FUNCTIONAL; |
| } |
| if (indexType != IndexType.PRIMARY_KEY) { |
| helper = new FunctionalIndexCreationHelper(origFromClause, origIndexedExpression, |
| projectionAttributes, imports, (InternalCache) region.getCache(), externalContext, |
| this); |
| // Asif: For now support Map index as non compact .expand later |
| // The limitation for compact range index also apply to hash index for now |
| isCompactOrHash = shouldCreateCompactIndex((FunctionalIndexCreationHelper) helper); |
| } else if (indexType == IndexType.PRIMARY_KEY) { |
| helper = new PrimaryKeyIndexCreationHelper(origFromClause, origIndexedExpression, |
| projectionAttributes, (InternalCache) region.getCache(), externalContext, this); |
| } else { |
| throw new AssertionError("Don't know how to set helper for " + indexType); |
| } |
| if (!isCompactOrHash && indexType != IndexType.PRIMARY_KEY) { |
| |
| if (indexType == IndexType.HASH) { |
| if (!isIndexMaintenanceTypeSynchronous()) { |
| throw new UnsupportedOperationException( |
| "Hash index is currently not supported for regions with Asynchronous index maintenance."); |
| } |
| throw new UnsupportedOperationException( |
| "Hash Index is not supported with from clause having multiple iterators(collections)."); |
| } |
| // Overflow is not supported with range index. |
| if (isOverFlowRegion()) { |
| throw new UnsupportedOperationException( |
| String.format( |
| "The specified index conditions are not supported for regions which overflow to disk. The region involved is %s", |
| region.getFullPath())); |
| } |
| // OffHeap is not supported with range index. |
| if (isOffHeap()) { |
| if (!isIndexMaintenanceTypeSynchronous()) { |
| throw new UnsupportedOperationException( |
| String.format( |
| "Asynchronous index maintenance is currently not supported for off-heap regions. The off-heap region is %s", |
| region.getFullPath())); |
| } |
| throw new UnsupportedOperationException( |
| String.format( |
| "From clauses having multiple iterators(collections) are not supported for off-heap regions. The off-heap region is %s", |
| region.getFullPath())); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Started creating index with indexName: {} On region: {}", indexName, |
| region.getFullPath()); |
| } |
| |
| if (IndexManager.testHook != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager TestHook is set."); |
| } |
| |
| if (((LocalRegion) this.region).isInitialized()) { |
| testHook.hook(1); |
| } else { |
| testHook.hook(0); |
| } |
| } |
| |
| IndexTask indexTask = new IndexTask(cache, indexName, indexType, origFromClause, |
| origIndexedExpression, helper, isCompactOrHash, prIndex, loadEntries); |
| FutureTask<Index> indexFutureTask = new FutureTask<Index>(indexTask); |
| Object oldIndex = this.indexes.putIfAbsent(indexTask, indexFutureTask); |
| |
| Index index = null; |
| |
| boolean interrupted = false; |
| try { |
| if (oldIndex == null) { |
| // Initialize index. |
| indexFutureTask.run(); |
| // Set the index. |
| index = (Index) indexFutureTask.get(); |
| } else { |
| // Index with same name or characteristic already exists. |
| // Check if index creation is complete. |
| if (!(oldIndex instanceof Index)) { |
| // Some other thread is creating the same index. |
| // Wait for index to be initialized from other thread. |
| ((Future) oldIndex).get(); |
| } |
| |
| // The Index is successfully created, throw appropriate error message |
| // from this thread. |
| if (getIndex(indexName) != null) { |
| throw new IndexNameConflictException( |
| String.format("Index named ' %s ' already exists.", |
| indexName)); |
| } else { |
| throw new IndexExistsException( |
| "Similar Index Exists"); |
| } |
| } |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } catch (ExecutionException ee) { |
| Throwable c = ee.getCause(); |
| if (c instanceof IndexNameConflictException) { |
| throw (IndexNameConflictException) c; |
| } else if (c instanceof IndexExistsException) { |
| throw (IndexExistsException) c; |
| } else if (c instanceof IMQException) { |
| throw new IndexInvalidException(c.getMessage()); |
| } |
| throw new IndexInvalidException(ee); |
| |
| } finally { |
| // If the index is not successfully created, remove IndexTask from |
| // the map. |
| if (oldIndex == null && index == null) { |
| Object ind = this.indexes.get(indexTask); |
| if (ind != null && !(ind instanceof Index)) { |
| this.indexes.remove(indexTask); |
| } |
| } |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| assert (index != null); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Completed creating index with indexName: {} On region: {}", indexName, |
| region.getFullPath()); |
| } |
| return index; |
| |
| } finally { |
| this.cache.setPdxReadSerializedOverride(oldReadSerialized); |
| ((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx); |
| |
| } |
| } |
| |
| /** |
| * Return true if we should create CompactRangeIndex Required conditions: indexedExpression is a |
| * path expression, fromClause has only one iterator and it is directly on the region values. |
| * Currently we have to use the "fat" implementation when asynchronous index updates are on. |
| */ |
| private boolean shouldCreateCompactIndex(FunctionalIndexCreationHelper helper) { |
| if (RANGEINDEX_ONLY || TEST_RANGEINDEX_ONLY) { |
| return false; |
| } |
| |
| // compact range index is not supported on asynchronous index maintenance. |
| // since compact range index maintains reference to region entry, in case of |
| // asynchronous updates the window between cache operation updating the |
| // index increases causing query thread to return new value before doing |
| // index evaluation (resulting in wrong value. There is still a small window |
| // which can be addressed by the sys property: |
| // gemfire.index.acquireCompactIndexLocksWithRegionEntryLocks |
| if (!getRegion().getAttributes().getIndexMaintenanceSynchronous()) { |
| return false; |
| } |
| |
| // indexedExpression requirement |
| CompiledValue cv = helper.getCompiledIndexedExpression(); |
| int nodeType; |
| do { |
| nodeType = cv.getType(); |
| if (nodeType == CompiledValue.PATH) { |
| cv = ((CompiledPath) cv).getReceiver(); |
| } |
| } while (nodeType == CompiledValue.PATH); |
| // end of path, nodeType at this point should be an Identifier |
| if (nodeType != OQLLexerTokenTypes.Identifier && nodeType != OQLLexerTokenTypes.METHOD_INV) { |
| if (nodeType == OQLLexerTokenTypes.TOK_LBRACK && !helper.isMapTypeIndex() |
| && helper.modifiedIndexExpr instanceof MapIndexable) { |
| if (((MapIndexable) helper.modifiedIndexExpr).getIndexingKeys().size() == 1) { |
| |
| } else { |
| return false; |
| } |
| } else { |
| return false; |
| } |
| } |
| |
| // fromClause requirement |
| List iterators = helper.getIterators(); |
| if (iterators.size() != 1) { |
| return false; |
| } |
| // "missing link" must be "value". Later to support key, entry, etc. |
| CompiledValue missingLink = helper.missingLink; |
| if (helper.isFirstIteratorRegionEntry) { |
| return true; |
| } else if (!(missingLink instanceof CompiledPath)) { |
| return false; |
| |
| } |
| String tailId = ((CompiledPath) missingLink).getTailID(); |
| if (!(tailId.equals("value") || tailId.equals("key"))) { |
| return false; |
| } |
| return true; |
| } |
| |
| public Index getIndex(String indexName) { |
| IndexTask indexTask = new IndexTask(cache, indexName); |
| Object ind = this.indexes.get(indexTask); |
| // Check if the returned value is instance of Index (this means |
| // the index is not in create phase, its created successfully). |
| if (ind instanceof Index) { |
| return (Index) ind; |
| } |
| return null; |
| } |
| |
| public void addIndex(String indexName, Index index) { |
| IndexTask indexTask = new IndexTask(cache, indexName); |
| this.indexes.put(indexTask, index); |
| } |
| |
| /** |
| * Get the Index with the specified indexType, fromClause, indexedExpression TODO: Asif :Check if |
| * synchronization is needed while obtaining Array of Indexes as similar to what we have used |
| * during index updates. This function will get the exact index , if available, else will return |
| * null |
| * |
| * @param indexType the type of index |
| * @param definitions the String array containing the required defintions of the fromClause of the |
| * index |
| * @param indexedExpression the indexedExpression for the index |
| * @param context ExecutionContext |
| * @return the sole index of the region with these parameters, or null if there isn't one |
| */ |
| public IndexData getIndex(IndexType indexType, String[] definitions, |
| CompiledValue indexedExpression, ExecutionContext context) |
| throws AmbiguousNameException, TypeMismatchException, NameResolutionException { |
| IndexData indxData = null; |
| int qItrSize = definitions.length; |
| Iterator it = this.indexes.values().iterator(); |
| StringBuilder sb = new StringBuilder(); |
| indexedExpression.generateCanonicalizedExpression(sb, context); |
| String indexExprStr = sb.toString(); |
| while (it.hasNext()) { |
| int mapping[] = new int[qItrSize]; |
| Object ind = it.next(); |
| // Check if the returned value is instance of Index (this means |
| // the index is not in create phase, its created successfully). |
| if (!(ind instanceof Index)) { |
| continue; |
| } |
| Index index = (Index) ind; |
| if (!((IndexProtocol) ind).isMatchingWithIndexExpression(indexedExpression, indexExprStr, |
| context) || index.getType() != indexType) { |
| continue; |
| } |
| |
| int matchLevel = getMatchLevel(definitions, |
| ((IndexProtocol) index).getCanonicalizedIteratorDefinitions(), mapping); |
| |
| if (matchLevel == 0) { |
| indxData = new IndexData((IndexProtocol) index, 0/* Exact Match */, mapping); |
| break; |
| } |
| |
| } |
| return indxData; |
| } |
| |
| public int compareIndexData(IndexType indexType, String[] indexDefinitions, |
| String indexExpression, IndexType otherType, String[] otherDefinitions, |
| String otherExpression, int mapping[]) { |
| |
| int matchLevel = -2; |
| |
| if (indexExpression.equals(otherExpression) && indexType == otherType) { |
| /* Asif : A match level of zero means exact match. */ |
| matchLevel = getMatchLevel(otherDefinitions, indexDefinitions, mapping); |
| } |
| return matchLevel; |
| } |
| |
| /** |
| * Asif : Returns the best available Index based on the available iterators in the Group |
| * |
| * TODO: Asif :Check if synchronization is needed while obtaining Array of Indexes as similar to |
| * what we have used during index updates |
| * |
| * @param indexType Primary or Range Index |
| * @param definitions String array containing the canonicalized definitions of the Iterators of |
| * the Group |
| * @param indexedExpression Index Expression path(CompiledValue) on which index needs to be |
| * created |
| * @param context ExecutionContext object |
| * @return IndexData object |
| */ |
| public IndexData getBestMatchIndex(IndexType indexType, String[] definitions, |
| CompiledValue indexedExpression, ExecutionContext context) |
| throws AmbiguousNameException, TypeMismatchException, NameResolutionException { |
| |
| Index bestIndex = null; |
| Index bestPRIndex = null; |
| int[] bestMapping = null; |
| |
| int qItrSize = definitions.length; |
| int bestIndexMatchLevel = qItrSize; |
| Iterator iter = this.indexes.values().iterator(); |
| StringBuilder sb = new StringBuilder(); |
| indexedExpression.generateCanonicalizedExpression(sb, context); |
| String indexExprStr = sb.toString(); |
| PartitionedIndex prIndex = null; |
| Index prevBestPRIndex = null; |
| Index prevBestIndex = null; |
| |
| Index index; |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| |
| // If the index is still empty |
| if (!((AbstractIndex) ind).isPopulated()) { |
| continue; |
| } |
| |
| index = (Index) ind; |
| |
| if (index instanceof PartitionedIndex) { |
| prIndex = (PartitionedIndex) index; |
| // Get one of the bucket index. This index will be |
| // available on all the buckets. |
| index = prIndex.getBucketIndex(); |
| if (index == null) { |
| continue; |
| } |
| } |
| |
| // System.out.println(" Index = "+index); |
| // Use simple strategy just pick first compatible index |
| if (((IndexProtocol) index).isMatchingWithIndexExpression(indexedExpression, indexExprStr, |
| context) && index.getType() == indexType) { |
| |
| // For PR the matched index needs to be available on all the query buckets. |
| if (prIndex != null) { |
| try { |
| |
| // Protect the PartitionedIndex from being removed when it is being used. |
| if (!prIndex.acquireIndexReadLockForRemove()) { |
| continue; |
| } |
| |
| prIndex.verifyAndCreateMissingIndex(context.getBucketList()); |
| } catch (Exception ignored) { |
| // Index is not there on all buckets. |
| // ignore this index. |
| prIndex.releaseIndexReadLockForRemove(); |
| prIndex = null; |
| continue; |
| } |
| } else { |
| // For index on replicated regions |
| if (!((AbstractIndex) index).acquireIndexReadLockForRemove()) { |
| continue; |
| } |
| } |
| |
| /* |
| * Asif : A match level of zero means exact match. A match level greater than 0 means the |
| * query from clauses have extra iterators as compared to Index resultset ( This does not |
| * neccessarily mean that Index resultset is not having extra fields. It is just that the |
| * criteria for match level is the absence or presence of extra iterators. The order of |
| * preference will be 0 , <0 , > 0 for first cut. |
| */ |
| String indexDefinitions[] = ((IndexProtocol) index).getCanonicalizedIteratorDefinitions(); |
| |
| int mapping[] = new int[qItrSize]; |
| int matchLevel = getMatchLevel(definitions, indexDefinitions, mapping); |
| |
| if (matchLevel == 0) { |
| prevBestPRIndex = bestPRIndex; |
| bestPRIndex = prIndex; |
| prevBestIndex = bestIndex; |
| bestIndex = index; |
| bestIndexMatchLevel = matchLevel; |
| bestMapping = mapping; |
| |
| // If we chose new index we should release lock on previous index |
| // chosen as bestIndex. |
| if (prIndex != null && prevBestPRIndex != null |
| && prevBestPRIndex instanceof PartitionedIndex) { |
| ((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove(); |
| prevBestPRIndex = null; |
| } else if (prevBestIndex != null) { |
| ((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove(); |
| prevBestIndex = null; |
| } |
| break; |
| } else if ((bestIndexMatchLevel > 0 && matchLevel < bestIndexMatchLevel) |
| || (bestIndexMatchLevel < 0 && matchLevel < 0 && matchLevel > bestIndexMatchLevel)) { |
| prevBestPRIndex = bestPRIndex; |
| bestPRIndex = prIndex; |
| prevBestIndex = bestIndex; |
| bestIndex = index; |
| bestIndexMatchLevel = matchLevel; |
| bestMapping = mapping; |
| } |
| |
| // release the lock if this index is not chosen as bestIndex. |
| if (prIndex != null && bestPRIndex != prIndex) { |
| prIndex.releaseIndexReadLockForRemove(); |
| prIndex = null; |
| } else if (bestIndex != index) { |
| ((AbstractIndex) index).releaseIndexReadLockForRemove(); |
| index = null; |
| } |
| |
| // If we chose new index we should release lock on previous index |
| // chosen as bestIndex. |
| if (prevBestPRIndex != null && prevBestPRIndex instanceof PartitionedIndex) { |
| ((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove(); |
| prevBestPRIndex = null; |
| } else if (prevBestIndex != null) { |
| ((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove(); |
| prevBestIndex = null; |
| } |
| } |
| } |
| if (bestIndex != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "The best index found for index expression: {} is: {} with Match-level: {} and mapping: {}", |
| indexExprStr, bestIndex, bestIndexMatchLevel, Arrays.toString(bestMapping)); |
| } |
| } |
| return bestIndex != null |
| ? new IndexData((IndexProtocol) bestIndex, bestIndexMatchLevel, bestMapping) : null; |
| } |
| |
| /* |
| * Asif : This function returns the best match index. The crietria used to identify best match |
| * index is based currently , relative to the query from clause. If the iterators of query from |
| * clause exactly match the index from clause , then match level is zero & is the best match. If |
| * the query from clause contain extra iterators , not available in index from clause, then mach |
| * level is > 0 & is not the best. If the match level is < 0 that means Index from clause have |
| * some extra iterators as compared to query. The int array gives the mapping of query iterator's |
| * position relative to the index resultset fields . The position is '1' based index. That is for |
| * the first query iterator ( 0 position), the mapping will contain 1 which can be thought of as |
| * Index ResultSet value at the field with name index_iter1. If the second query iterator has a |
| * value 3 , that means for (1 position) iterator , the field will have name index_iter3 |
| */ |
| private static int getMatchLevel(String[] queryDefintions, String[] indexDefinitions, |
| int[] mapping) { |
| int qItrLen = queryDefintions.length; |
| int indxItrLen = indexDefinitions.length; |
| // Asif : We know that because index expressions have matched that |
| // itself indicates that the first iterator, which is regon iterator |
| // has matched. So the index field position of the first RuntimeIterator |
| // of the Query group is 1 |
| mapping[0] = 1; |
| int matchLevel = qItrLen - 1; |
| for (int i = 1; i < qItrLen; ++i) { |
| for (int j = 1; j < indxItrLen; ++j) { |
| if (queryDefintions[i].equals(indexDefinitions[j])) { |
| mapping[i] = ++j; |
| --matchLevel; |
| break; |
| } |
| } |
| } |
| if (matchLevel == 0 && indxItrLen > qItrLen) { |
| matchLevel = qItrLen - indxItrLen; |
| } |
| return matchLevel; |
| } |
| |
| /* |
| * private static int getMatchLevel(String fromClause, String iFromClause) { if |
| * (fromClause.equals(iFromClause)) return 0; if (fromClause.startsWith(iFromClause)) { int cnt = |
| * -1; int index = fromClause.indexOf(',', iFromClause.length() + 1); while (index > 0) { cnt--; |
| * index = fromClause.indexOf(',', index + 1); } return cnt; } else if |
| * (iFromClause.startsWith(fromClause)) { int cnt = 1; int index = iFromClause.indexOf(',', |
| * fromClause.length() + 1); while (index > 0) { cnt++; index = iFromClause.indexOf(',', index + |
| * 1); } return cnt; } //No compatible return Integer.MAX_VALUE; } |
| */ |
| /** |
| * Get a collection of all the indexes. If the IndexType is specified returns only the matching |
| * indexes. |
| * |
| * @param indexType the type of indexes to get. Currently must be Indexable.FUNCTIONAL_SORTED |
| * @return the collection of indexes for the specified region and type |
| */ |
| public Collection getIndexes(IndexType indexType) { |
| ArrayList<Index> list = new ArrayList<>(); |
| Iterator it = this.indexes.values().iterator(); |
| while (it.hasNext()) { |
| Object ind = it.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| Index index = (Index) ind; |
| |
| // Check if indexType needs to be matched. |
| if (indexType == null || index.getType() == indexType) { |
| // No type check. |
| list.add(index); |
| } |
| } |
| return list; |
| } |
| |
| /** |
| * Get a collection of all the indexes managed by IndexManager |
| * |
| * @return the collection of indexes on the specified region |
| */ |
| public Collection getIndexes() { |
| return getIndexes(null); |
| } |
| |
| /** |
| * Remove the specified index. |
| * |
| * @param index the Index to remove |
| */ |
| public void removeIndex(Index index) { |
| if (index.getRegion() != this.region) { |
| throw new IllegalArgumentException( |
| "Index does not belong to this IndexManager"); |
| } |
| // Asif: We will just remove the Index from the map. Since the |
| // TreeMap is synchronized & the operation of adding a newly created |
| // index is in synch there will not be any situation where the unintended |
| // Index gets removed( in case of same Index Name scenario). |
| // If query obtains the Index handle which is getting removed , that |
| // is OK as we are not clearing data maps . The destroy though marks |
| // the index invalid , that is OK. Because of this flag a query |
| // may or may not use the Index |
| IndexTask indexTask = new IndexTask(cache, index.getName()); |
| if (this.indexes.remove(indexTask) != null) { |
| AbstractIndex indexHandle = (AbstractIndex) index; |
| indexHandle.destroy(); |
| } |
| } |
| |
| // @todo need more specific list of exceptions |
| /** |
| * Remove all the indexes managed by IndexManager |
| */ |
| public int removeIndexes() { |
| // Remove indexes which are available (create completed). |
| int numIndexes = 0; |
| Iterator it = this.indexes.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Object ind = entry.getValue(); |
| // Check if the returned value is instance of Index (this means |
| // the index is not in create phase, its created successfully). |
| if (!(ind instanceof Index)) { |
| continue; |
| } |
| numIndexes++; |
| IndexTask indexTask = (IndexTask) entry.getKey(); |
| this.indexes.remove(indexTask); |
| } |
| return numIndexes; |
| } |
| |
| |
| /** |
| * Asif : This function is invoked during clear operation on Region. It causes re execution of |
| * Index Initialization query on the region & before doing this it makes theexisting data maps |
| * null. This is needed so that index does not miss any entry being put in the region when the |
| * Region.clear is in progress |
| */ |
| public void rerunIndexCreationQuery() throws QueryException { |
| try { |
| QueryObserver observer = QueryObserverHolder.getInstance(); |
| observer.beforeRerunningIndexCreationQuery(); |
| } catch (Exception e) { |
| // Asif Ignore any exception as this should not hamper normal code flow |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "IndexMananger::rerunIndexCreationQuery: Exception in callback beforeRerunningIndexcreationQuery", |
| e); |
| } |
| } |
| if (isIndexMaintenanceTypeSynchronous()) { |
| recreateAllIndexesForRegion(); |
| } else { |
| // System.out.println("Aynchronous update"); |
| updater.addTask(RECREATE_INDEX, null, IndexProtocol.OTHER_OP); |
| } |
| } |
| |
| /** |
| * populates all the indexes in the region |
| */ |
| public void populateIndexes(Collection<Index> indexSet) throws MultiIndexCreationException { |
| waitBeforeUpdate(); |
| if (region.getCache().getLogger().infoEnabled()) { |
| region.getCache().getLogger().info("Populating indexes for region " + region.getName()); |
| } |
| boolean throwException = false; |
| HashMap<String, Exception> exceptionsMap = new HashMap<String, Exception>(); |
| boolean oldReadSerialized = this.cache.getPdxReadSerializedOverride(); |
| this.cache.setPdxReadSerializedOverride(true); |
| try { |
| Iterator entryIter = ((LocalRegion) region).getBestIterator(true); |
| while (entryIter.hasNext()) { |
| RegionEntry entry = (RegionEntry) entryIter.next(); |
| if (entry == null || entry.isInvalidOrRemoved()) { |
| continue; |
| } |
| // Fault in the value once before index update so that every index |
| // update does not have |
| // to read the value from disk every time. |
| entry.getValue((LocalRegion) this.region); |
| Iterator<Index> indexSetIterator = indexSet.iterator(); |
| while (indexSetIterator.hasNext()) { |
| AbstractIndex index = (AbstractIndex) indexSetIterator.next(); |
| if (!index.isPopulated() && index.getType() != IndexType.PRIMARY_KEY) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding to index :{}{} value :{}", index.getName(), |
| this.region.getFullPath(), entry.getKey()); |
| } |
| long start = ((AbstractIndex) index).updateIndexUpdateStats(); |
| try { |
| index.addIndexMapping(entry); |
| } catch (IMQException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding to index failed for: {}, {}", index.getName(), e.getMessage(), |
| e); |
| } |
| exceptionsMap.put(index.indexName, e); |
| indexSetIterator.remove(); |
| throwException = true; |
| } |
| ((AbstractIndex) index).updateIndexUpdateStats(start); |
| } |
| } |
| } |
| setPopulateFlagForIndexes(indexSet); |
| if (throwException) { |
| throw new MultiIndexCreationException(exceptionsMap); |
| } |
| } finally { |
| this.cache.setPdxReadSerializedOverride(oldReadSerialized); |
| notifyAfterUpdate(); |
| } |
| } |
| |
| /** |
| * Sets the {@link AbstractIndex#isPopulated} after populating all the indexes in this region |
| */ |
| public void setPopulateFlagForIndexes(Collection<Index> indexSet) { |
| for (Object ind : indexSet) { |
| AbstractIndex index = (AbstractIndex) ind; |
| if (!index.isPopulated()) { |
| index.setPopulated(true); |
| } |
| } |
| } |
| |
| public void updateIndexes(RegionEntry entry, int action, int opCode) throws QueryException { |
| updateIndexes(entry, action, opCode, false); |
| } |
| |
| /** |
| * Callback for IndexManager to update indexes Called from AbstractRegionMap. |
| * |
| * @param entry the RegionEntry being updated |
| * @param action action to be taken (IndexManager.ADD_ENTRY, IndexManager.UPDATE_ENTRY, |
| * IndexManager.REMOVE_ENTRY) |
| * @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP. |
| */ |
| public void updateIndexes(RegionEntry entry, int action, int opCode, |
| boolean isDiskRecoveryInProgress) throws QueryException { |
| if (isDiskRecoveryInProgress) { |
| assert !((LocalRegion) this.region).isInitialized(); |
| } else { |
| assert Assert.assertHoldsLock(entry, true); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager.updateIndexes {} + action: {}", entry.getKey(), action); |
| } |
| if (entry == null) |
| return; |
| if (isIndexMaintenanceTypeSynchronous()) { |
| // System.out.println("Synchronous update"); |
| processAction(entry, action, opCode); |
| } else { |
| // System.out.println("Aynchronous update"); |
| updater.addTask(action, entry, opCode); |
| } |
| } |
| |
| /** |
| * @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP. |
| */ |
| private void processAction(RegionEntry entry, int action, int opCode) throws QueryException { |
| final long startPA = getCachePerfStats().startIndexUpdate(); |
| Boolean initialPdxReadSerialized = this.cache.getPdxReadSerializedOverride(); |
| this.cache.setPdxReadSerializedOverride(true); |
| TXStateProxy tx = null; |
| if (!this.cache.isClient()) { |
| tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction(); |
| } |
| |
| try { |
| // Asif: Allow the thread to update iff there is no current index |
| // creator thread in progress. There will not be any issue if |
| // allow the updater thread to proceed if there is any index |
| // creator thread in waiting , but that can cause starvation |
| // for index creator thread. So we will give priorityto index |
| // creation thread |
| if (IndexManager.testHook != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager TestHook is set."); |
| } |
| testHook.hook(6); // ConcurrentIndexInitOnOverflowRegionDUnitTest |
| } |
| |
| long start = 0; |
| boolean indexLockAcquired = false; |
| switch (action) { |
| case ADD_ENTRY: { |
| if (IndexManager.testHook != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager TestHook in ADD_ENTRY."); |
| } |
| testHook.hook(5); |
| } |
| // this action is only called after update |
| assert opCode == IndexProtocol.OTHER_OP; |
| |
| // Asif The behaviour can arise if an index creation has already |
| // acted upon a newly added entry , but by the time callback |
| // occurs , the index is added to the map & thus |
| // the add operation will now have an effect of update. |
| // so we need to remove the mapping even if it is an Add action |
| // as otherwise the new results will get added into the |
| // old results instead of replacement |
| Iterator iter = this.indexes.values().iterator(); |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| IndexProtocol index = (IndexProtocol) ind; |
| |
| if (index.isValid() && ((AbstractIndex) index).isPopulated() |
| && index.getType() != IndexType.PRIMARY_KEY) { |
| // Asif : If the current Index contains an entry inspite |
| // of add operation , this can only mean that Index |
| // has already acted on it during creation, so do not |
| // apply IMQ on it |
| if (!index.containsEntry(entry)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding to index: {}{} value: {}", index.getName(), |
| this.region.getFullPath(), entry.getKey()); |
| } |
| start = ((AbstractIndex) index).updateIndexUpdateStats(); |
| addIndexMapping(entry, index); |
| ((AbstractIndex) index).updateIndexUpdateStats(start); |
| } |
| } |
| } |
| break; |
| } |
| case UPDATE_ENTRY: { |
| |
| if (IndexManager.testHook != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager TestHook in UPDATE_ENTRY."); |
| } |
| testHook.hook(5); |
| testHook.hook(9); // QueryDataInconsistencyDUnitTest |
| } |
| |
| // this action is only called with opCode AFTER_UPDATE_OP |
| assert opCode == IndexProtocol.AFTER_UPDATE_OP; |
| Iterator iter = this.indexes.values().iterator(); |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| IndexProtocol index = (IndexProtocol) ind; |
| |
| if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Updating index: {}{} value: {}", index.getName(), |
| this.region.getFullPath(), entry.getKey()); |
| } |
| start = ((AbstractIndex) index).updateIndexUpdateStats(); |
| |
| addIndexMapping(entry, index); |
| |
| ((AbstractIndex) index).updateIndexUpdateStats(start); |
| } |
| } |
| break; |
| } |
| case REMOVE_ENTRY: { |
| |
| if (IndexManager.testHook != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexManager TestHook in REMOVE_ENTRY."); |
| } |
| testHook.hook(5); |
| testHook.hook(10); |
| } |
| Iterator iter = this.indexes.values().iterator(); |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| IndexProtocol index = (IndexProtocol) ind; |
| |
| if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) { |
| AbstractIndex abstractIndex = (AbstractIndex) index; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removing from index: {}{} value: {}", index.getName(), |
| this.region.getFullPath(), entry.getKey()); |
| } |
| start = ((AbstractIndex) index).updateIndexUpdateStats(); |
| |
| index.removeIndexMapping(entry, opCode); |
| |
| ((AbstractIndex) index).updateIndexUpdateStats(start); |
| } |
| } |
| break; |
| } |
| default: { |
| throw new IndexMaintenanceException( |
| "Invalid action"); |
| } |
| } |
| } finally { |
| this.cache.setPdxReadSerializedOverride(initialPdxReadSerialized); |
| ((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx); |
| |
| getCachePerfStats().endIndexUpdate(startPA); |
| } |
| } |
| |
| private void addIndexMapping(RegionEntry entry, IndexProtocol index) throws IMQException { |
| try { |
| index.addIndexMapping(entry); |
| } catch (Exception exception) { |
| index.markValid(false); |
| setPRIndexAsInvalid((AbstractIndex) index); |
| logger.warn("Put operation for the entry corrupted the index : " |
| + ((AbstractIndex) index).indexName + " with the exception : \n " + exception); |
| } |
| } |
| |
| private void setPRIndexAsInvalid(AbstractIndex index) { |
| if (index.prIndex != null) { |
| AbstractIndex prIndex = (AbstractIndex) index.prIndex; |
| prIndex.markValid(false); |
| } |
| } |
| |
| private void waitBeforeUpdate() { |
| synchronized (indexes) { |
| ++numCreators; |
| // Asif : If there exists any updater thread in progress |
| // we should not allow index creation to proceed. |
| while (numUpdatersInProgress > 0) { |
| ((LocalRegion) getRegion()).getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| indexes.wait(); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // while |
| } |
| } |
| |
| private void notifyAfterUpdate() { |
| synchronized (indexes) { |
| --numCreators; |
| // ASIF: If the creator is in progress , this itself |
| // means that there is no Update active. The updater threads |
| // are either in wait state or there are no threads at all. |
| // Since we do not want any update to progress , if there is |
| // any creator thread in lock seeking mode ( meaning that it has |
| // entered the previous synch block) . We will not issue |
| // any notify till the creator count drops to zero & |
| // also unless there is any updater thread in waiting |
| if (numCreators == 0 && numUpdatersInWaiting > 0) { |
| indexes.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Recreates all indexes for this region. This operation blocks all updates on all indexes while |
| * recreate is in progress. This is required as recreate does NOT lock region entries before index |
| * update and hence might cause inconsistencies in index if concurrent region entry operations are |
| * going on. |
| * |
| */ |
| private void recreateAllIndexesForRegion() { |
| |
| long start = 0; |
| waitBeforeUpdate(); |
| try { |
| // opCode is ignored for this operation |
| Iterator iter = this.indexes.values().iterator(); |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| IndexProtocol index = (IndexProtocol) ind; |
| if (index.getType() == IndexType.FUNCTIONAL || index.getType() == IndexType.HASH) { |
| AbstractIndex aIndex = ((AbstractIndex) index); |
| start = ((AbstractIndex) index).updateIndexUpdateStats(); |
| ((AbstractIndex) index).recreateIndexData(); |
| ((AbstractIndex) index).updateIndexUpdateStats(start); |
| |
| } |
| } |
| } catch (Exception e) { |
| throw new IndexInvalidException(e); |
| } finally { |
| notifyAfterUpdate(); |
| } |
| } |
| |
| /** |
| * Wait for index initialization before entry create, update, invalidate or destroy operation. |
| * |
| * Note: If the region has a disk region then we should wait for index initialization before |
| * getting region entry lock to avoid deadlock (#44431). |
| */ |
| public void waitForIndexInit() { |
| synchronized (this.indexes) { |
| ++this.numUpdatersInWaiting; |
| while (this.numCreators > 0) { |
| ((LocalRegion) this.getRegion()).getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.indexes.wait(); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // while |
| --this.numUpdatersInWaiting; |
| ++this.numUpdatersInProgress; |
| } |
| } |
| |
| /** |
| * Necessary finally block call for above method. |
| */ |
| public void countDownIndexUpdaters() { |
| synchronized (this.indexes) { |
| --this.numUpdatersInProgress; |
| // Asif: Since Index creator threads can progress only if |
| // there is no update threads in progress, thus we need to issue |
| // notify all iff there are any creator threads in action & also |
| // if the upDateInProgress Count has dipped to 0 |
| if (this.numUpdatersInProgress == 0 && this.numCreators > 0) { |
| this.indexes.notifyAll(); |
| } |
| } |
| } |
| |
| private CachePerfStats getCachePerfStats() { |
| return ((HasCachePerfStats) this.region).getCachePerfStats(); |
| } |
| |
| /** |
| * Callback for destroying IndexManager Called after Region.destroy() called |
| */ |
| public void destroy() throws QueryException { |
| this.indexes.clear(); |
| if (!isIndexMaintenanceTypeSynchronous()) |
| updater.shutdown(); |
| } |
| |
| /** |
| * Removes indexes for a destroyed bucket region from the list of bucket indexes in the |
| * {@link PartitionedIndex}. |
| * |
| * @param prRegion the partition region that this bucket belongs to |
| */ |
| public void removeBucketIndexes(PartitionedRegion prRegion) throws QueryException { |
| IndexManager parentManager = prRegion.getIndexManager(); |
| if (parentManager != null) { |
| Iterator bucketIndexIterator = indexes.values().iterator(); |
| while (bucketIndexIterator.hasNext()) { |
| Index bucketIndex = (Index) bucketIndexIterator.next(); |
| Index prIndex = parentManager.getIndex(bucketIndex.getName()); |
| if (prIndex instanceof PartitionedIndex) { |
| ((PartitionedIndex) prIndex).removeFromBucketIndexes(this.region, bucketIndex); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| Iterator iter = this.indexes.values().iterator(); |
| while (iter.hasNext()) { |
| Object ind = iter.next(); |
| // Check if the value is instance of FutureTask, this means |
| // the index is in create phase. |
| if (ind instanceof FutureTask) { |
| continue; |
| } |
| sb.append(ind).append(getLineSeparator()); |
| } |
| return sb.toString(); |
| } |
| |
| public boolean isIndexMaintenanceTypeSynchronous() { |
| return this.indexMaintenanceSynchronous; |
| } |
| |
| public boolean isOverFlowRegion() { |
| return this.isOverFlowToDisk; |
| } |
| |
| public boolean isOffHeap() { |
| return this.offHeap; |
| } |
| |
| public static boolean isObjectModificationInplace() { |
| return (INPLACE_OBJECT_MODIFICATION || INPLACE_OBJECT_MODIFICATION_FOR_TEST); |
| } |
| |
| /** |
| * Asif : This function is used exclusively by Index Manager. It gets the unique Iterator name for |
| * a Iterator definition, if it already exists, else creates a unqiue name & also stores it in a |
| * map for subsequent use |
| * |
| * @param definition String containing definition of the iterator |
| * @return String containing the name of the Iterator |
| */ |
| public String putCanonicalizedIteratorNameIfAbsent(String definition) { |
| String str = null; |
| synchronized (canonicalizedIteratorNameMap) { |
| if ((str = (String) this.canonicalizedIteratorNameMap.get(definition)) == null) { |
| str = new StringBuilder("index_iter").append(this.getIncrementedCounter()).toString(); |
| String temp; |
| if ((temp = |
| (String) this.canonicalizedIteratorNameMap.putIfAbsent(definition, str)) != null) { |
| str = temp; |
| } |
| } |
| } |
| return str; |
| } |
| |
| public void putCanonicalizedIteratorName(String definition, String name) { |
| synchronized (canonicalizedIteratorNameMap) { |
| this.canonicalizedIteratorNameMap.put(definition, name); |
| } |
| } |
| |
| private synchronized int getIncrementedCounter() { |
| return ++this.iternameCounter; |
| } |
| |
| /** |
| * Asif : Given a definition returns the canonicalized iterator name for the definition. If the |
| * definition does not exist , null is returned |
| * |
| */ |
| public String getCanonicalizedIteratorName(String definition) { |
| return ((String) (this.canonicalizedIteratorNameMap.get(definition))); |
| } |
| |
| ////////////////////// Inner Classes ////////////////////// |
| |
| public class IndexUpdaterThread extends LoggingThread { |
| |
| private volatile boolean running = true; |
| |
| private volatile boolean shutdownRequested = false; |
| |
| private volatile BlockingQueue pendingTasks; |
| |
| /** |
| * Creates instance of IndexUpdaterThread |
| */ |
| IndexUpdaterThread(int updateThreshold, String threadName) { |
| super(threadName); |
| // Check if threshold is set. |
| if (updateThreshold > 0) { |
| // Create a bounded queue. |
| pendingTasks = new ArrayBlockingQueue(updateThreshold); |
| } else { |
| // Create non-bounded queue. |
| pendingTasks = new LinkedBlockingQueue(); |
| } |
| } |
| |
| public void addTask(int action, RegionEntry entry, int opCode) { |
| Object[] task = new Object[3]; |
| task[0] = action; |
| task[1] = entry; |
| task[2] = opCode; |
| pendingTasks.add(task); |
| } |
| |
| /** |
| * Stops this thread. Does not return until it has stopped. |
| */ |
| public void shutdown() { |
| if (!this.running) { |
| return; |
| } |
| this.shutdownRequested = true; |
| this.interrupt(); |
| try { |
| this.join(); |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| // just return, we're done |
| } |
| } |
| |
| @Override |
| public void run() { |
| // async writers main loop |
| // logger.debug("DiskRegion writer started (writer=" + this + ")"); |
| org.apache.geode.CancelCriterion stopper = ((LocalRegion) region).getCancelCriterion(); |
| try { |
| while (!this.shutdownRequested) { |
| // Termination checks |
| SystemFailure.checkFailure(); |
| if (stopper.isCancelInProgress()) { |
| break; |
| } |
| try { |
| Object[] task = (Object[]) pendingTasks.take(); |
| if (this.shutdownRequested) { |
| break; |
| } |
| updateIndexes(task); |
| } catch (InterruptedException ignore) { |
| return; // give up (exit the thread) |
| } |
| } |
| } finally { |
| this.running = false; |
| } |
| } |
| |
| private void updateIndexes(Object[] task) { |
| int action = (Integer) task[0]; |
| RegionEntry entry = (RegionEntry) task[1]; |
| int opCode = (Integer) task[2]; |
| // System.out.println("entry = "+entry.getKey()); |
| if (entry != null || action == RECREATE_INDEX) { |
| try { |
| if (action == RECREATE_INDEX) { |
| recreateAllIndexesForRegion(); |
| } else { |
| if (entry != null) { |
| entry.setUpdateInProgress(true); |
| } |
| processAction(entry, action, opCode); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| if (entry != null && action != RECREATE_INDEX) { |
| entry.setUpdateInProgress(false); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Used by tests to determine if the updater thread has finished updating its indexes. The list |
| * is cleared without synchronization, which makes this methods somewhat unsafe from a threading |
| * point of view. |
| */ |
| public synchronized boolean isDone() { |
| return this.pendingTasks.size() == 0; |
| } |
| |
| } |
| |
| /** |
| * Index Task used to create the index. This is used along with the FutureTask to take care of, |
| * same index creation request from multiple threads. At any time only one thread succeeds and |
| * other threads waits for the completion of the index creation. This avoids usage of |
| * synchronization which could block any index creation. |
| */ |
| public class IndexTask implements Callable<Index> { |
| |
| public String indexName; |
| |
| public IndexType indexType; |
| |
| public IndexCreationHelper helper; |
| |
| public String origFromClause; |
| |
| public String origIndexedExpression; |
| |
| public boolean isCompactOrHash = false; |
| |
| public boolean isLDM = false; |
| |
| public PartitionedIndex prIndex; |
| |
| public boolean loadEntries; |
| |
| private final InternalCache cache; |
| |
| IndexTask(InternalCache cache, String indexName, IndexType type, String origFromClause, |
| String origIndexedExpression, IndexCreationHelper helper, boolean isCompactOrHash, |
| PartitionedIndex prIndex, boolean loadEntries) { |
| this.cache = cache; |
| this.indexName = indexName; |
| this.indexType = type; |
| this.origFromClause = origFromClause; |
| this.origIndexedExpression = origIndexedExpression; |
| this.helper = helper; |
| this.isCompactOrHash = isCompactOrHash; |
| this.prIndex = prIndex; |
| this.loadEntries = loadEntries; |
| } |
| |
| /* For name based index search */ |
| IndexTask(InternalCache cache, String indexName) { |
| this.cache = cache; |
| this.indexName = indexName; |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| // TODO: equals should check the class of its parameter |
| if (other == null) { |
| return false; |
| } |
| IndexTask otherIndexTask = (IndexTask) other; |
| // compare indexName. |
| if (this.indexName.equals(otherIndexTask.indexName)) { |
| return true; |
| } |
| |
| if (otherIndexTask.helper == null || this.helper == null) { |
| return false; |
| } |
| |
| String[] indexDefinitions = this.helper.getCanonicalizedIteratorDefinitions(); |
| // TODO: avoid object creation in equals |
| int[] mapping = new int[indexDefinitions.length]; |
| // compare index based on its type, expression and definition. |
| if (compareIndexData(this.indexType, indexDefinitions, |
| this.helper.getCanonicalizedIndexedExpression(), otherIndexTask.indexType, |
| otherIndexTask.helper.getCanonicalizedIteratorDefinitions(), |
| otherIndexTask.helper.getCanonicalizedIndexedExpression(), mapping) == 0) { |
| return true; |
| } |
| return false; |
| } |
| |
| public int hashCode() { |
| // It returns a constant number as the equality check is based on |
| // the OR condition between the indexName and its characteristics |
| // (involving type, expression and definition), because of this |
| // its not possible to come-up with an accurate hash code. |
| return 99; |
| } |
| |
| /* |
| * Creates and initializes the index. |
| */ |
| @Override |
| public Index call() { |
| Index index = null; |
| String indexedExpression = helper.getCanonicalizedIndexedExpression(); |
| String fromClause = helper.getCanonicalizedFromClause(); |
| String projectionAttributes = helper.getCanonicalizedProjectionAttributes(); |
| String[] definitions = helper.getCanonicalizedIteratorDefinitions(); |
| IndexStatistics stats = null; |
| this.isLDM = IndexManager.IS_TEST_LDM; |
| |
| if (this.prIndex != null) { |
| stats = this.prIndex.getStatistics(); |
| } |
| if (indexType == IndexType.PRIMARY_KEY) { |
| index = new PrimaryKeyIndex(cache, indexName, region, fromClause, indexedExpression, |
| projectionAttributes, origFromClause, origIndexedExpression, definitions, stats); |
| logger.info("Using Primary Key index implementation for '{}' on region {}", indexName, |
| region.getFullPath()); |
| } else if (indexType == IndexType.HASH) { |
| index = new HashIndex(cache, indexName, region, fromClause, indexedExpression, |
| projectionAttributes, origFromClause, origIndexedExpression, definitions, stats); |
| |
| logger.info("Using Hash index implementation for '{}' on region {}", indexName, |
| region.getFullPath()); |
| } else { |
| // boolean isCompact = !helper.isMapTypeIndex() && |
| // shouldCreateCompactIndex((FunctionalIndexCreationHelper)helper); |
| if (this.isCompactOrHash || this.isLDM) { |
| if (indexType == IndexType.FUNCTIONAL && !helper.isMapTypeIndex()) { |
| index = new CompactRangeIndex(cache, indexName, region, fromClause, indexedExpression, |
| projectionAttributes, origFromClause, origIndexedExpression, definitions, stats); |
| logger.info("Using Compact Range index implementation for '{}' on region {}", indexName, |
| region.getFullPath()); |
| } else { |
| FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper) helper; |
| index = new CompactMapRangeIndex(cache, indexName, region, fromClause, |
| indexedExpression, projectionAttributes, origFromClause, origIndexedExpression, |
| definitions, fich.isAllKeys(), fich.multiIndexKeysPattern, fich.mapKeys, stats); |
| logger.info("Using Compact Map Range index implementation for '{}' on region {}", |
| indexName, region.getFullPath()); |
| } |
| } else { |
| assert indexType == IndexType.FUNCTIONAL; |
| if (!helper.isMapTypeIndex()) { |
| index = new RangeIndex(cache, indexName, region, fromClause, indexedExpression, |
| projectionAttributes, origFromClause, origIndexedExpression, definitions, stats); |
| logger.info("Using Non-Compact index implementation for '{}' on region {}", indexName, |
| region.getFullPath()); |
| } else { |
| FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper) helper; |
| index = new MapRangeIndex(cache, indexName, region, fromClause, indexedExpression, |
| projectionAttributes, origFromClause, origIndexedExpression, definitions, |
| fich.isAllKeys(), fich.multiIndexKeysPattern, fich.mapKeys, stats); |
| logger.info("Using Non-Compact Map index implementation for '{}' on region {}", |
| indexName, region.getFullPath()); |
| } |
| } |
| } |
| ((AbstractIndex) index).setPRIndex(prIndex); |
| |
| if (index.getType() != IndexType.PRIMARY_KEY) { |
| AbstractIndex aIndex = ((AbstractIndex) index); |
| aIndex.instantiateEvaluator(helper); |
| waitBeforeUpdate(); |
| boolean indexCreatedSuccessfully = false; |
| try { |
| ((LocalRegion) region).setFlagForIndexCreationThread(true); |
| aIndex.initializeIndex(loadEntries); |
| logger.info((loadEntries ? "Initialized and loaded entries into the index " |
| : "Initialized but entries not yet loaded into the index " + indexName |
| + " on region: " + region.getFullPath())); |
| aIndex.markValid(true); |
| indexCreatedSuccessfully = true; |
| if (loadEntries) { |
| aIndex.setPopulated(true); |
| if (this.prIndex != null) { |
| ((AbstractIndex) this.prIndex).setPopulated(true); |
| } |
| } |
| indexes.put(this, index); |
| if (region instanceof BucketRegion && prIndex != null) { |
| prIndex.addToBucketIndexes(region, index); |
| prIndex.incNumBucketIndexes(); |
| } |
| } catch (Exception e) { |
| throw new IndexInvalidException(e); |
| } finally { |
| notifyAfterUpdate(); |
| ((LocalRegion) region).setFlagForIndexCreationThread(false); |
| if (!indexCreatedSuccessfully) { |
| ((InternalIndexStatistics) index.getStatistics()).close(); |
| } |
| } |
| } else { |
| // For PrimaryKey index |
| ((AbstractIndex) index).setPopulated(true); |
| indexes.put(this, index); |
| if (region instanceof BucketRegion && prIndex != null) { |
| prIndex.addToBucketIndexes(region, index); |
| } |
| if (this.prIndex != null) { |
| ((AbstractIndex) this.prIndex).setPopulated(true); |
| } |
| } |
| return index; |
| } |
| } |
| } |