blob: 0a9b16607c6c8450051fa2f37adcbc4a4fbfebda [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/*
* IndexManager.java
*
* Created on February 15, 2005, 11:49 AM
*/
package com.gemstone.gemfire.cache.query.internal.index;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.query.AmbiguousNameException;
import com.gemstone.gemfire.cache.query.Index;
import com.gemstone.gemfire.cache.query.IndexExistsException;
import com.gemstone.gemfire.cache.query.IndexInvalidException;
import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
import com.gemstone.gemfire.cache.query.IndexNameConflictException;
import com.gemstone.gemfire.cache.query.IndexStatistics;
import com.gemstone.gemfire.cache.query.IndexType;
import com.gemstone.gemfire.cache.query.MultiIndexCreationException;
import com.gemstone.gemfire.cache.query.NameResolutionException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.TypeMismatchException;
import com.gemstone.gemfire.cache.query.internal.CompiledPath;
import com.gemstone.gemfire.cache.query.internal.CompiledValue;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
import com.gemstone.gemfire.cache.query.internal.MapIndexable;
import com.gemstone.gemfire.cache.query.internal.NullToken;
import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
import com.gemstone.gemfire.cache.query.internal.QueryObserver;
import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
import com.gemstone.gemfire.cache.query.internal.index.AbstractIndex.InternalIndexStatistics;
import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.CachePerfStats;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
/**
* @author vaibhav
* @author asif
*/
public class IndexManager {
private static final Logger logger = LogService.getLogger();
public static final int ADD_ENTRY = 1;
public static final int UPDATE_ENTRY = 2;
public static final int REMOVE_ENTRY = 3;
//Asif : This action is to rerun Index creation after
//clear is called on the region
public static final int RECREATE_INDEX = 4;
protected final Region region;
private final boolean isOverFlowToDisk;
private final boolean offHeap;
private final boolean indexMaintenanceSynchronous;
private int numCreators = 0;
private int numUpdatersInProgress = 0;
private int numUpdatersInWaiting = 0;
private int iternameCounter = 0;
/*
* Map containing <IndexTask, FutureTask<IndexTask> or Index>. IndexTask
* represents an index thats completely created or one thats in create phase.
* This is done in order to avoid synchronization on the indexes.
*/
private final ConcurrentMap indexes = new ConcurrentHashMap();
//TODO Asif : Fix the appropriate size of the Map & the concurrency level
private ConcurrentMap canonicalizedIteratorNameMap = new ConcurrentHashMap();
private IndexUpdaterThread updater;
// Threshold for Queue.
private final int INDEX_MAINTENANCE_BUFFER = Integer.getInteger("gemfire.AsynchIndexMaintenanceThreshold", -1).intValue();
// Added for test purposes only.
public static boolean INPLACE_OBJECT_MODIFICATION_FOR_TEST = false;
//Added for testing only
public static boolean IS_TEST_LDM = false;
public static boolean IS_TEST_EXPANSION = false;
/**
* System property to maintain the ReverseMap to take care in-place modification of the
* objects by the application.
* In case of in-place modification the EntryEvent will not have the old-value, without this
* the old-values are not removed from the index-maps thus resulting in inconsistent results.
*/
public static final boolean INPLACE_OBJECT_MODIFICATION =
Boolean.valueOf(System.getProperty("gemfire.index.INPLACE_OBJECT_MODIFICATION", "false")).booleanValue();
/**
* System property to turn-off the compact-index support.
*/
public static final boolean RANGEINDEX_ONLY =
Boolean.valueOf(System.getProperty("gemfire.index.RANGEINDEX_ONLY", "false")).booleanValue();
/** For test purpose only */
public static boolean TEST_RANGEINDEX_ONLY = false;
public static final String INDEX_ELEMARRAY_THRESHOLD_PROP = "index_elemarray_threshold";
public static final String INDEX_ELEMARRAY_SIZE_PROP = "index_elemarray_size";
public static final int INDEX_ELEMARRAY_THRESHOLD = Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_THRESHOLD_PROP,"100"));
public static final int INDEX_ELEMARRAY_SIZE = Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_SIZE_PROP,"5"));
public final static AtomicLong SAFE_QUERY_TIME = new AtomicLong(0);
/** The NULL constant */
public static final Object NULL = new NullToken();
public static TestHook testHook;
//private int numCreatorsInWaiting = 0;
// @todo ericz
// there should be a read/write lock PER INDEX in order to maximize
// the concurrency of query execution.
public IndexManager(Region region) {
this.region = region;
// must be a SortedMap to ensure the indexes are iterated over in fixed
// order
// to avoid deadlocks when acquiring locks
//indexes = Collections.synchronizedSortedMap(new TreeMap());
indexMaintenanceSynchronous = region.getAttributes()
.getIndexMaintenanceSynchronous();
isOverFlowToDisk = region.getAttributes().getEvictionAttributes()
.getAction().isOverflowToDisk();
this.offHeap = region.getAttributes().getOffHeap();
if (!indexMaintenanceSynchronous) {
final LoggingThreadGroup group =
LoggingThreadGroup.createThreadGroup("QueryMonitor Thread Group", logger);
updater = new IndexUpdaterThread(group, this.INDEX_MAINTENANCE_BUFFER,
"OqlIndexUpdater:" + region.getFullPath());
updater.start();
}
}
/**
* Stores the largest combination of current time + delta
* If there is a large delta/hiccup in timings, this allows us to calculate the
* correct results for a query but, reevaluate more aggressively.
* But the large hiccup will eventually be rolled off as time is always increasing
* This is a fix for #47475
*
* @param operationTime the last modified time from version tag
* @param currentCacheTime
*/
public static boolean setIndexBufferTime(long operationTime, long currentCacheTime) {
long timeDifference = currentCacheTime - operationTime;
return setNewLargestValue(SAFE_QUERY_TIME, currentCacheTime + timeDifference);
}
/** only for test purposes
* This should not be called from any product code. Calls from product code will
* possibly cause continous reevaluation (performance issue) OR
* incorrect query results (functional issue)
**/
public static void resetIndexBufferTime() {
SAFE_QUERY_TIME.set(0);
}
/**
* Calculates whether we need to reevluate the key for the region entry
* We added a way to determine whether to reevaluate an entry for query execution
* The method is to keep track of the delta and current time in a single long value
* The value is then used by the query to determine if a region entry needs to be reevaluated,
* based on subtracting the value with the query execution time. This provides a delta + some false positive time (dts)
* If the dts + last modified time of the region entry is > query start time,
* we can assume that it needs to be reevaluated
*
* This is to fix bug 47475, where references to region entries can be held
* by the executing query either directly or indirectly (iterators can hold
* references for next) but the values underneath could change.
*
* Small amounts of false positives are ok as it will have a slight impact on performance
* @param queryStartTime
* @param lastModifiedTime
*/
public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) {
return queryStartTime <= SAFE_QUERY_TIME.get() - queryStartTime + lastModifiedTime;
}
/**
*
* @param value
* @param newValue
*/
private static boolean setNewLargestValue(AtomicLong value, long newValue) {
boolean done = false;
while (!done) {
long oldValue = value.get();
if (oldValue < newValue ) {
return value.compareAndSet(oldValue, newValue);
}
else {
done = true;
}
}
return false;
}
/** Test Hook */
public interface TestHook {
public void hook(final int spot) throws RuntimeException;
}
/**
* The Region this IndexManager is associated with
*
* @return the Region for this IndexManager
*/
public Region getRegion() {
return region;
}
/**
* Used by tests to access the updater thread to determine its progress
*/
public IndexUpdaterThread getUpdaterThread() {
return this.updater;
}
// @todo need more specific list of exceptions
/**
* Create an index that can be used when executing queries.
*
* @param indexName the name of this index, used for statistics collection
* @param indexType the type of index
* @param origIndexedExpression the expression to index on, a function
* dependent on region entries individually.
* @param origFromClause expression that evaluates to the collection(s) that
* will be queried over, must contain one and only one region path.
* @return the newly created Index
*/
public Index createIndex(String indexName, IndexType indexType,
String origIndexedExpression, String origFromClause, String imports,
ExecutionContext externalContext, PartitionedIndex prIndex, boolean loadEntries)
throws IndexNameConflictException, IndexExistsException,
IndexInvalidException {
if (QueryMonitor.isLowMemory()) {
throw new IndexInvalidException(LocalizedStrings.IndexCreationMsg_CANCELED_DUE_TO_LOW_MEMORY.toLocalizedString());
}
boolean oldReadSerialized = DefaultQuery.getPdxReadSerialized();
DefaultQuery.setPdxReadSerialized(this.region.getCache(), true);
TXStateProxy tx = null;
if (!((GemFireCacheImpl)this.region.getCache()).isClient()) {
tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalSuspend();
}
try {
String projectionAttributes = "*"; // for now this is the only option
if (getIndex(indexName) != null) {
throw new IndexNameConflictException(LocalizedStrings.
IndexManager_INDEX_NAMED_0_ALREADY_EXISTS.toLocalizedString(indexName));
}
IndexCreationHelper helper = null;
boolean isCompactOrHash = false;
if (indexType != IndexType.PRIMARY_KEY) {
helper = new FunctionalIndexCreationHelper(origFromClause,
origIndexedExpression, projectionAttributes, imports, region.getCache(),
externalContext, this);
//Asif: For now support Map index as non compact .expand later
//The limitation for compact range index also apply to hash index for now
isCompactOrHash = shouldCreateCompactIndex((FunctionalIndexCreationHelper)helper);
} else if (indexType == IndexType.PRIMARY_KEY) {
helper = new PrimaryKeyIndexCreationHelper(origFromClause,
origIndexedExpression, projectionAttributes, region.getCache(),
externalContext, this);
} else {
throw new AssertionError("Don't know how to set helper for " + indexType);
}
if (!isCompactOrHash && indexType != IndexType.PRIMARY_KEY) {
if (indexType == IndexType.HASH ) {
if (!isIndexMaintenanceTypeSynchronous()) {
throw new UnsupportedOperationException(LocalizedStrings.DefaultQueryService_HASH_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_ASYNC_MAINTENANCE.toLocalizedString());
}
throw new UnsupportedOperationException(LocalizedStrings.DefaultQueryService_HASH_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_MULTIPLE_ITERATORS.toLocalizedString());
}
// Overflow is not supported with range index.
if(isOverFlowRegion()) {
throw new UnsupportedOperationException(LocalizedStrings.DefaultQueryService_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_REGIONS_WHICH_OVERFLOW_TO_DISK_THE_REGION_INVOLVED_IS_0.toLocalizedString(region.getFullPath()));
}
// OffHeap is not supported with range index.
if(isOffHeap()) {
if (!isIndexMaintenanceTypeSynchronous()) {
throw new UnsupportedOperationException(LocalizedStrings.DefaultQueryService_OFF_HEAP_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_ASYNC_MAINTENANCE_THE_REGION_IS_0.toLocalizedString(region.getFullPath()));
}
throw new UnsupportedOperationException(LocalizedStrings.DefaultQueryService_OFF_HEAP_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_MULTIPLE_ITERATORS_THE_REGION_IS_0.toLocalizedString(region.getFullPath()));
}
}
if (logger.isDebugEnabled()) {
logger.debug("Started creating index with indexName: {} On region: {}", indexName, region.getFullPath());
}
if (IndexManager.testHook != null) {
if (logger.isDebugEnabled()) {
logger.debug("IndexManager TestHook is set.");
}
if (((LocalRegion)this.region).isInitialized()) {
testHook.hook(1);
} else {
testHook.hook(0);
}
}
IndexTask indexTask = new IndexTask(indexName, indexType, origFromClause,
origIndexedExpression, helper, isCompactOrHash, prIndex, loadEntries);
FutureTask<Index> indexFutureTask = new FutureTask<Index>(indexTask);
Object oldIndex = this.indexes.putIfAbsent(indexTask, indexFutureTask);
Index index = null;
boolean interrupted = false;
try {
if (oldIndex == null){
// Initialize index.
indexFutureTask.run();
// Set the index.
index = (Index)indexFutureTask.get();
} else {
// Index with same name or characteristic already exists.
// Check if index creation is complete.
if (!(oldIndex instanceof Index)){
// Some other thread is creating the same index.
// Wait for index to be initialized from other thread.
((FutureTask)oldIndex).get();
}
// The Index is successfully created, throw appropriate error message
// from this thread.
if (getIndex(indexName) != null) {
throw new IndexNameConflictException(LocalizedStrings.
IndexManager_INDEX_NAMED_0_ALREADY_EXISTS.toLocalizedString(indexName));
} else {
throw new IndexExistsException(LocalizedStrings.
IndexManager_SIMILAR_INDEX_EXISTS.toLocalizedString());
}
}
} catch (InterruptedException ie){
interrupted = true;
} catch (ExecutionException ee){
Throwable c = ee.getCause();
if (c instanceof IndexNameConflictException) {
throw (IndexNameConflictException)c;
} else if (c instanceof IndexExistsException){
throw (IndexExistsException)c;
} else if (c instanceof IMQException) {
throw new IndexInvalidException(c.getMessage());
}
throw new IndexInvalidException(ee);
} finally {
// If the index is not successfully created, remove IndexTask from
// the map.
if (oldIndex == null && index == null){
Object ind = this.indexes.get(indexTask);
if (ind != null && !(ind instanceof Index)){
this.indexes.remove(indexTask);
}
}
if (interrupted){
Thread.currentThread().interrupt();
}
}
assert (index != null);
if (logger.isDebugEnabled()) {
logger.debug("Completed creating index with indexName: {} On region: {}", indexName, region.getFullPath());
}
return index;
} finally {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), oldReadSerialized);
if (tx != null) {
((TXManagerImpl) this.region.getCache().getCacheTransactionManager())
.resume(tx);
}
}
}
/**
* Return true if we should create CompactRangeIndex
* Required conditions: indexedExpression is a path expression,
* fromClause has only one iterator and it is directly on the
* region values.
* Currently we have to use the "fat" implementation when asynchronous
* index updates are on.
*/
private boolean shouldCreateCompactIndex(FunctionalIndexCreationHelper helper) {
if (RANGEINDEX_ONLY || TEST_RANGEINDEX_ONLY){
return false;
}
// compact range index is not supported on asynchronous index maintenance.
// since compact range index maintains reference to region entry, in case of
// asynchronous updates the window between cache operation updating the
// index increases causing query thread to return new value before doing
// index evaluation (resulting in wrong value. There is still a small window
// which can be addressed by the sys property:
// gemfire.index.acquireCompactIndexLocksWithRegionEntryLocks
if (!getRegion().getAttributes().getIndexMaintenanceSynchronous()) {
return false;
}
// indexedExpression requirement
CompiledValue cv = helper.getCompiledIndexedExpression();
int nodeType;
do {
nodeType = cv.getType();
if (nodeType == CompiledValue.PATH) {
cv = ((CompiledPath)cv).getReceiver();
}
} while (nodeType == CompiledValue.PATH);
// end of path, nodeType at this point should be an Identifier
if (nodeType != OQLLexerTokenTypes.Identifier && nodeType != OQLLexerTokenTypes.METHOD_INV) {
if (nodeType == OQLLexerTokenTypes.TOK_LBRACK && !helper.isMapTypeIndex() && helper.modifiedIndexExpr instanceof MapIndexable) {
if (((MapIndexable)helper.modifiedIndexExpr).getIndexingKeys().size() == 1) {
}
else {
return false;
}
}
else {
return false;
}
}
// fromClause requirement
List iterators = helper.getIterators();
if (iterators.size() != 1) {
return false;
}
// "missing link" must be "value". Later to support key, entry, etc.
CompiledValue missingLink = helper.missingLink;
if (helper.isFirstIteratorRegionEntry) {
return true;
} else if (!(missingLink instanceof CompiledPath)) {
return false;
}
String tailId = ((CompiledPath)missingLink).getTailID();
if (!(tailId.equals("value") || tailId.equals("key"))) {
return false;
}
return true;
}
public Index getIndex(String indexName) {
IndexTask indexTask = new IndexTask(indexName);
Object ind = this.indexes.get(indexTask);
// Check if the returned value is instance of Index (this means
// the index is not in create phase, its created successfully).
if (ind instanceof Index){
return (Index)ind;
}
return null;
}
public void addIndex(String indexName, Index index) {
IndexTask indexTask = new IndexTask(indexName);
this.indexes.put(indexTask, index);
}
/**
* Get the Index with the specified indexType, fromClause, indexedExpression
* TODO: Asif :Check if synchronization is needed while obtaining Array of
* Indexes as similar to what we have used during index updates. This function
* will get the exact index , if available, else will return null
*
* @param indexType the type of index
* @param definitions the String array containing the required defintions of the
* fromClause of the index
* @param indexedExpression the indexedExpression for the index
* @param context ExecutionContext
* @return the sole index of the region with these parameters, or null if
* there isn't one
* @throws NameResolutionException
* @throws TypeMismatchException
* @throws AmbiguousNameException
*/
public IndexData getIndex(IndexType indexType, String[] definitions,
CompiledValue indexedExpression, ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
IndexData indxData = null;
int qItrSize = definitions.length;
Iterator it = this.indexes.values().iterator();
StringBuffer sb = new StringBuffer();
indexedExpression.generateCanonicalizedExpression(sb, context);
String indexExprStr = sb.toString();
while (it.hasNext()) {
int mapping[] = new int[qItrSize];
Object ind = it.next();
// Check if the returned value is instance of Index (this means
// the index is not in create phase, its created successfully).
if (!(ind instanceof Index)){
continue;
}
Index index = (Index)ind;
if( !((IndexProtocol)ind).isMatchingWithIndexExpression(indexedExpression,
indexExprStr, context) || index.getType() != indexType) {
continue;
}
int matchLevel = getMatchLevel(definitions,
((IndexProtocol)index).getCanonicalizedIteratorDefinitions(), mapping);
if (matchLevel == 0){
indxData = new IndexData((IndexProtocol) index, 0/*Exact Match*/, mapping);
break;
}
}
return indxData;
}
public int compareIndexData(IndexType indexType, String[] indexDefinitions,
String indexExpression, IndexType otherType, String[] otherDefinitions,
String otherExpression, int mapping[]) {
int matchLevel = -2;
if (indexExpression.equals(otherExpression) && indexType == otherType) {
/* Asif : A match level of zero means exact match. */
matchLevel = getMatchLevel(otherDefinitions, indexDefinitions, mapping);
}
return matchLevel;
}
/**
* Asif : Returns the best available Index based on the available iterators in
* the Group
*
* TODO: Asif :Check if synchronization is needed while obtaining Array of
* Indexes as similar to what we have used during index updates
*
* @param indexType Primary or Range Index
* @param definitions String array containing the canonicalized definitions of
* the Iterators of the Group
* @param indexedExpression Index Expression path(CompiledValue) on which
* index needs to be created
* @param context ExecutionContext object
* @return IndexData object
* @throws NameResolutionException
* @throws TypeMismatchException
* @throws AmbiguousNameException
*/
public IndexData getBestMatchIndex(IndexType indexType, String[] definitions,
CompiledValue indexedExpression, ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
Index bestIndex = null;
Index bestPRIndex = null;
int[] bestMapping = null;
int qItrSize = definitions.length;
int bestIndexMatchLevel = qItrSize;
Iterator iter = this.indexes.values().iterator();
StringBuffer sb = new StringBuffer();
indexedExpression.generateCanonicalizedExpression(sb, context);
String indexExprStr = sb.toString();
PartitionedIndex prIndex = null;
Index prevBestPRIndex = null;
Index prevBestIndex = null;
Index index;
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask){
continue;
}
//If the index is still empty
if(!((AbstractIndex)ind).isPopulated()) {
continue;
}
index = (Index)ind;
if (index instanceof PartitionedIndex) {
prIndex = (PartitionedIndex)index;
// Get one of the bucket index. This index will be
// available on all the buckets.
index = prIndex.getBucketIndex();
if (index == null) {
continue;
}
}
//System.out.println(" Index = "+index);
//Use simple strategy just pick first compatible index
if ( ((IndexProtocol)index).isMatchingWithIndexExpression(indexedExpression,indexExprStr,
context) && index.getType() == indexType ){
// For PR the matched index needs to be available on all the query buckets.
if (prIndex != null) {
try {
// Protect the PartitionedIndex from being removed when it is being used.
if (!prIndex.acquireIndexReadLockForRemove()) {
continue;
}
prIndex.verifyAndCreateMissingIndex(context.getBucketList());
} catch (Exception ex) {
// Index is not there on all buckets.
// ignore this index.
prIndex.releaseIndexReadLockForRemove();
prIndex = null;
continue;
}
} else {
// For index on replicated regions
if (!((AbstractIndex)index).acquireIndexReadLockForRemove()) {
continue;
}
}
/*
* Asif : A match level of zero means exact match. A match level greater
* than 0 means the query from clauses have extra iterators as compared
* to Index resultset ( This does not neccessarily mean that Index
* resultset is not having extra fields. It is just that the criteria
* for match level is the absence or presence of extra iterators. The
* order of preference will be 0 , <0 , > 0 for first cut.
*/
String indexDefinitions[] = ((IndexProtocol) index).getCanonicalizedIteratorDefinitions();
int mapping[] = new int[qItrSize];
int matchLevel = getMatchLevel(definitions, indexDefinitions, mapping);
if (matchLevel == 0) {
prevBestPRIndex = bestPRIndex;
bestPRIndex = prIndex;
prevBestIndex = bestIndex;
bestIndex = index;
bestIndexMatchLevel = matchLevel;
bestMapping = mapping;
// If we chose new index we should release lock on previous index
// chosen as bestIndex.
if (prIndex != null && prevBestPRIndex != null && prevBestPRIndex instanceof PartitionedIndex) {
((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove();
prevBestPRIndex = null;
} else if (prevBestIndex != null) {
((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove();
prevBestIndex = null;
}
break;
} else if ((bestIndexMatchLevel > 0 && matchLevel < bestIndexMatchLevel)
|| (bestIndexMatchLevel < 0 && matchLevel < 0 && matchLevel > bestIndexMatchLevel)) {
prevBestPRIndex = bestPRIndex;
bestPRIndex = prIndex;
prevBestIndex = bestIndex;
bestIndex = index;
bestIndexMatchLevel = matchLevel;
bestMapping = mapping;
}
// release the lock if this index is not chosen as bestIndex.
if (prIndex != null && bestPRIndex != prIndex) {
prIndex.releaseIndexReadLockForRemove();
prIndex = null;
} else if(bestIndex != index) {
((AbstractIndex)index).releaseIndexReadLockForRemove();
index = null;
}
// If we chose new index we should release lock on previous index
// chosen as bestIndex.
if (prevBestPRIndex != null && prevBestPRIndex instanceof PartitionedIndex) {
((PartitionedIndex) prevBestPRIndex).releaseIndexReadLockForRemove();
prevBestPRIndex = null;
} else if (prevBestIndex != null) {
((AbstractIndex) prevBestIndex).releaseIndexReadLockForRemove();
prevBestIndex = null;
}
}
}
if (bestIndex != null) {
if (logger.isDebugEnabled()) {
logger.debug("The best index found for index expression: {} is: {} with Match-level: {} and mapping: {}", indexExprStr, bestIndex, bestIndexMatchLevel, Arrays.toString(bestMapping));
}
}
return bestIndex != null ? new IndexData((IndexProtocol) bestIndex,
bestIndexMatchLevel, bestMapping) : null;
}
/*
* Asif : This function returns the best match index. The crietria used to
* identify best match index is based currently , relative to the query from
* clause. If the iterators of query from clause exactly match the index from
* clause , then match level is zero & is the best match. If the query from
* clause contain extra iterators , not available in index from clause, then
* mach level is > 0 & is not the best. If the match level is < 0 that means
* Index from clause have some extra iterators as compared to query. The int
* array gives the mapping of query iterator's position relative to the index
* resultset fields . The position is '1' based index. That is for the first
* query iterator ( 0 position), the mapping will contain 1 which can be
* thought of as Index ResultSet value at the field with name index_iter1. If
* the second query iterator has a value 3 , that means for (1 position)
* iterator , the field will have name index_iter3
*/
private static int getMatchLevel(String[] queryDefintions,
String[] indexDefinitions, int[] mapping) {
int qItrLen = queryDefintions.length;
int indxItrLen = indexDefinitions.length;
// Asif : We know that because index expressions have matched that
// itself indicates that the first iterator, which is regon iterator
// has matched. So the index field position of the first RuntimeIterator
// of the Query group is 1
mapping[0] = 1;
int matchLevel = qItrLen - 1;
for (int i = 1; i < qItrLen; ++i) {
for (int j = 1; j < indxItrLen; ++j) {
if (queryDefintions[i].equals(indexDefinitions[j])) {
mapping[i] = ++j;
--matchLevel;
break;
}
}
}
if (matchLevel == 0 && indxItrLen > qItrLen) {
matchLevel = qItrLen - indxItrLen;
}
return matchLevel;
}
/*
* private static int getMatchLevel(String fromClause, String iFromClause) {
* if (fromClause.equals(iFromClause)) return 0; if
* (fromClause.startsWith(iFromClause)) { int cnt = -1; int index =
* fromClause.indexOf(',', iFromClause.length() + 1); while (index > 0) {
* cnt--; index = fromClause.indexOf(',', index + 1); } return cnt; } else if
* (iFromClause.startsWith(fromClause)) { int cnt = 1; int index =
* iFromClause.indexOf(',', fromClause.length() + 1); while (index > 0) {
* cnt++; index = iFromClause.indexOf(',', index + 1); } return cnt; } //No
* compatible return Integer.MAX_VALUE; }
*/
/**
* Get a collection of all the indexes. If the IndexType is specified
* returns only the matching indexes.
* @param indexType the type of indexes to get. Currently must be
* Indexable.FUNCTIONAL_SORTED
* @return the collection of indexes for the specified region and type
*/
public Collection getIndexes(IndexType indexType) {
ArrayList list = new ArrayList();
Iterator it = this.indexes.values().iterator();
while (it.hasNext()) {
Object ind = it.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask){
continue;
}
Index index = (Index)ind;
// Check if indexType needs to be matched.
if (indexType == null) { // No type check.
list.add(index);
} else if(index.getType() == indexType) {
list.add(index);
}
}
return list;
}
/**
* Get a collection of all the indexes managed by IndexManager
*
* @return the collection of indexes on the specified region
*/
public Collection getIndexes() {
return getIndexes(null);
}
// @todo need more specific list of exceptions
/**
* Remove the specified index.
*
* @param index the Index to remove
*/
public void removeIndex(Index index) {
if (index.getRegion() != this.region)
{
throw new IllegalArgumentException(LocalizedStrings.
IndexManager_INDEX_DOES_NOT_BELONG_TO_THIS_INDEXMANAGER.toLocalizedString());
}
//Asif: We will just remove the Index from the map. Since the
// TreeMap is synchronized & the operation of adding a newly created
// index is in synch there will not be any situation where the unintended
// Index gets removed( in case of same Index Name scenario).
// If query obtains the Index handle which is getting removed , that
// is OK as we are not clearing data maps . The destroy though marks
// the index invalid , that is OK. Because of this flag a query
// may or may not use the Index
IndexTask indexTask = new IndexTask(index.getName());
if (this.indexes.remove(indexTask) != null) {
AbstractIndex indexHandle = (AbstractIndex) index;
indexHandle.destroy();
}
}
// @todo need more specific list of exceptions
/**
* Remove all the indexes managed by IndexManager
*/
public int removeIndexes() {
// Remove indexes which are available (create completed).
int numIndexes = 0;
Iterator it = this.indexes.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry)it.next();
Object ind = entry.getValue();
// Check if the returned value is instance of Index (this means
// the index is not in create phase, its created successfully).
if (!(ind instanceof Index)){
continue;
}
numIndexes++;
IndexTask indexTask = (IndexTask)entry.getKey();
this.indexes.remove(indexTask);
}
return numIndexes;
}
/**
* Asif : This function is invoked during clear operation on Region. It causes
* re execution of Index Initialization query on the region & before doing
* this it makes theexisting data maps null. This is needed so that index does
* not miss any entry being put in the region when the Region.clear is in
* progress
*
* @throws QueryException
*/
public void rerunIndexCreationQuery() throws QueryException {
try {
QueryObserver observer = QueryObserverHolder.getInstance();
observer.beforeRerunningIndexCreationQuery();
}
catch (Exception e) {
//Asif Ignore any exception as this should not hamper normal code flow
if (logger.isDebugEnabled()) {
logger.debug("IndexMananger::rerunIndexCreationQuery: Exception in callback beforeRerunningIndexcreationQuery", e);
}
}
if (isIndexMaintenanceTypeSynchronous()) {
recreateAllIndexesForRegion();
}
else {
//System.out.println("Aynchronous update");
updater.addTask(RECREATE_INDEX, null, IndexProtocol.OTHER_OP);
}
}
/**
* populates all the indexes in the region
*/
public void populateIndexes(Collection<Index> indexSet) throws MultiIndexCreationException {
waitBeforeUpdate();
if(region.getCache().getLogger().infoEnabled()) {
region.getCache().getLogger().info("Populating indexes for region " + region.getName());
}
boolean throwException = false;
HashMap<String, Exception> exceptionsMap = new HashMap<String, Exception>();
try {
Iterator entryIter = ((LocalRegion) region).getBestIterator(true);
while (entryIter.hasNext()) {
RegionEntry entry = (RegionEntry) entryIter.next();
if (entry == null || entry.isInvalidOrRemoved()) {
continue;
}
// Fault in the value once before index update so that every index
// update does not have
// to read the value from disk every time.
// TODO OFFHEAP: this optimization (calling getValue to make sure it is faulted in to disk) has a performance problem.
// It also decompresses and deserializes the value and then throws that away. In the case of a heap region the deserialized
// value would be cached in a VMCachedDeserializable. But for compression and/or off-heap the decompression and/or deserialization
// this call does is lost and has to be done again. We could just add a method to RegionEntry that faults the value in without returning it.
// Even better (but more work): could we create a wrapper around RegionEntry that we create here to wrap "entry" and pass the wrapper to addIndexMapping?
// Any indexes that store a reference to the RegionEntry would need to ask the wrapper for the real one but any of them
// that want the value could get it from the wrapper. The first time the wrapper is asked for the value it could get it from
// the real RegionEntry it wraps and cache a reference to that value. I think that gives us the best of both worlds.
entry.getValue((LocalRegion)this.region);
Iterator<Index> indexSetIterator = indexSet.iterator();
while(indexSetIterator.hasNext()) {
AbstractIndex index = (AbstractIndex) indexSetIterator.next();
if (!index.isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
if (logger.isDebugEnabled()) {
logger.debug("Adding to index :{}{} value :{}", index.getName(), this.region.getFullPath(), entry.getKey());
}
long start = ((AbstractIndex) index).updateIndexUpdateStats();
try {
index.addIndexMapping(entry);
} catch (IMQException e) {
if(logger.isDebugEnabled()) {
logger.debug("Adding to index failed for: {}, {}", index.getName(), e.getMessage(), e);
}
exceptionsMap.put(index.indexName, e);
indexSetIterator.remove();
throwException = true;
}
((AbstractIndex) index).updateIndexUpdateStats(start);
}
}
}
setPopulateFlagForIndexes(indexSet);
if (throwException) {
throw new MultiIndexCreationException(exceptionsMap);
}
} finally {
notifyAfterUpdate();
}
}
/**
* Sets the {@link AbstractIndex#isPopulated} after
* populating all the indexes in this region
*/
public void setPopulateFlagForIndexes(Collection<Index> indexSet) {
for (Object ind : indexSet) {
AbstractIndex index = (AbstractIndex) ind;
if (!index.isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
index.setPopulated(true);
}
}
}
public void updateIndexes(RegionEntry entry, int action, int opCode) throws QueryException {
updateIndexes(entry, action, opCode, false);
}
// @todo need more specific list of exceptions
/**
* Callback for IndexManager to update indexes Called from AbstractRegionMap.
*
* @param entry the RegionEntry being updated
* @param action action to be taken (IndexManager.ADD_ENTRY,
* IndexManager.UPDATE_ENTRY, IndexManager.REMOVE_ENTRY)
* @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP.
* @throws com.gemstone.gemfire.cache.query.IndexMaintenanceException
*/
public void updateIndexes(RegionEntry entry, int action, int opCode, boolean isDiskRecoveryInProgress)
throws QueryException {
if(isDiskRecoveryInProgress) {
assert !((LocalRegion)this.region).isInitialized();
} else {
assert Assert.assertHoldsLock(entry,true);
}
if(logger.isDebugEnabled()) {
logger.debug("IndexManager.updateIndexes {} + action: {}", entry.getKey(), action);
}
if (entry == null) return;
if (isIndexMaintenanceTypeSynchronous()) {
//System.out.println("Synchronous update");
processAction(entry, action, opCode);
}
else {
//System.out.println("Aynchronous update");
updater.addTask(action, entry, opCode);
}
}
/**
* @param opCode one of IndexProtocol.OTHER_OP, BEFORE_UPDATE_OP, AFTER_UPDATE_OP.
*/
private void processAction(RegionEntry entry, int action, int opCode)
throws QueryException {
final long startPA = getCachePerfStats().startIndexUpdate();
DefaultQuery.setPdxReadSerialized(this.region.getCache(), true);
TXStateProxy tx = null;
if (!((GemFireCacheImpl)this.region.getCache()).isClient()) {
tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalSuspend();
}
try {
//Asif: Allow the thread to update iff there is no current index
//creator thread in progress. There will not be any issue if
// allow the updater thread to proceed if there is any index
//creator thread in waiting , but that can cause starvation
//for index creator thread. So we will give priorityto index
//creation thread
if (IndexManager.testHook != null) {
if (logger.isDebugEnabled()) {
logger.debug("IndexManager TestHook is set.");
}
testHook.hook(6); //ConcurrentIndexInitOnOverflowRegionDUnitTest
}
long start = 0;
boolean indexLockAcquired = false;
switch (action) {
case ADD_ENTRY: {
if (IndexManager.testHook != null) {
if (logger.isDebugEnabled()) {
logger.debug("IndexManager TestHook in ADD_ENTRY.");
}
testHook.hook(5);
}
// this action is only called after update
assert opCode == IndexProtocol.OTHER_OP;
//Asif The behaviour can arise if an index creation has already
// acted upon a newly added entry , but by the time callback
// occurs , the index is added to the map & thus
// the add operation will now have an effect of update.
// so we need to remove the mapping even if it is an Add action
// as otherwise the new results will get added into the
// old results instead of replacement
Iterator iter = this.indexes.values().iterator();
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask){
continue;
}
IndexProtocol index = (IndexProtocol)ind;
if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
// Asif : If the current Index contains an entry inspite
// of add operation , this can only mean that Index
// has already acted on it during creation, so do not
// apply IMQ on it
if (!index.containsEntry(entry)) {
if (logger.isDebugEnabled()) {
logger.debug("Adding to index: {}{} value: {}", index.getName(), this.region.getFullPath(), entry.getKey());
}
start = ((AbstractIndex) index).updateIndexUpdateStats();
index.addIndexMapping(entry);
((AbstractIndex) index).updateIndexUpdateStats(start);
}
}
}
break;
}
case UPDATE_ENTRY: {
if (IndexManager.testHook != null) {
if (logger.isDebugEnabled()) {
logger.debug("IndexManager TestHook in UPDATE_ENTRY.");
}
testHook.hook(5);
testHook.hook(9); //QueryDataInconsistencyDUnitTest
}
// this action is only called with opCode AFTER_UPDATE_OP
assert opCode == IndexProtocol.AFTER_UPDATE_OP;
Iterator iter = this.indexes.values().iterator();
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask){
continue;
}
IndexProtocol index = (IndexProtocol)ind;
if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
if (logger.isDebugEnabled()) {
logger.debug("Updating index: {}{} value: ", index.getName(), this.region.getFullPath(), entry.getKey());
}
start = ((AbstractIndex) index).updateIndexUpdateStats();
index.addIndexMapping(entry);
((AbstractIndex) index).updateIndexUpdateStats(start);
}
}
break;
}
case REMOVE_ENTRY: {
if (IndexManager.testHook != null) {
if (logger.isDebugEnabled()) {
logger.debug("IndexManager TestHook in REMOVE_ENTRY.");
}
testHook.hook(5);
testHook.hook(10);
}
Iterator iter = this.indexes.values().iterator();
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask) {
continue;
}
IndexProtocol index = (IndexProtocol) ind;
if (((AbstractIndex) index).isPopulated() && index.getType() != IndexType.PRIMARY_KEY) {
AbstractIndex abstractIndex = (AbstractIndex) index;
if (logger.isDebugEnabled()) {
logger.debug("Removing from index: {}{} value: {}", index.getName(), this.region.getFullPath(), entry.getKey());
}
start = ((AbstractIndex) index).updateIndexUpdateStats();
index.removeIndexMapping(entry, opCode);
((AbstractIndex) index).updateIndexUpdateStats(start);
}
}
break;
}
default: {
throw new IndexMaintenanceException(LocalizedStrings.IndexManager_INVALID_ACTION.toLocalizedString());
}
}
}
finally {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), false);
if (tx != null) {
((TXManagerImpl) this.region.getCache().getCacheTransactionManager())
.resume(tx);
}
getCachePerfStats().endIndexUpdate(startPA);
}
}
private void waitBeforeUpdate() {
synchronized (indexes) {
++numCreators;
// Asif : If there exists any updater thread in progress
// we should not allow index creation to proceed.
while (numUpdatersInProgress > 0) {
((LocalRegion) getRegion()).getCancelCriterion()
.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
indexes.wait();
} catch (InterruptedException ie) {
interrupted = true;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
}
}
private void notifyAfterUpdate() {
synchronized (indexes) {
--numCreators;
// ASIF: If the creator is in progress , this itself
// means that there is no Update active. The updater threads
// are either in wait state or there are no threads at all.
// Since we do not want any update to progress , if there is
// any creator thread in lock seeking mode ( meaning that it has
// entered the previous synch block) . We will not issue
// any notify till the creator count drops to zero &
// also unless there is any updater thread in waiting
if (numCreators == 0 && numUpdatersInWaiting > 0) {
indexes.notifyAll();
}
}
}
/**
* Recreates all indexes for this region. This operation blocks all updates
* on all indexes while recreate is in progress. This is required as recreate
* does NOT lock region entries before index update and hence might cause
* inconsistencies in index if concurrent region entry operations are
* going on.
*
*/
private void recreateAllIndexesForRegion() {
long start = 0;
waitBeforeUpdate();
try {
// opCode is ignored for this operation
Iterator iter = this.indexes.values().iterator();
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask) {
continue;
}
IndexProtocol index = (IndexProtocol) ind;
if (index.getType() == IndexType.FUNCTIONAL || index.getType() == IndexType.HASH) {
AbstractIndex aIndex = ((AbstractIndex) index);
start = ((AbstractIndex) index).updateIndexUpdateStats();
((AbstractIndex) index).recreateIndexData();
((AbstractIndex) index).updateIndexUpdateStats(start);
}
}
} catch (Exception e) {
throw new IndexInvalidException(e);
} finally {
notifyAfterUpdate();
}
}
/**
* Wait for index initialization before entry create, update, invalidate or destroy
* operation.
*
* Note: If the region has a disk region then we should wait for index
* initialization before getting region entry lock to avoid deadlock (#44431).
*/
public void waitForIndexInit() {
synchronized (this.indexes) {
++this.numUpdatersInWaiting;
while (this.numCreators > 0) {
((LocalRegion)this.getRegion()).getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
this.indexes.wait();
}
catch (InterruptedException ie) {
interrupted = true;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
--this.numUpdatersInWaiting;
++this.numUpdatersInProgress;
}
}
/**
* Necessary finally block call for above method.
*/
public void countDownIndexUpdaters() {
synchronized (this.indexes) {
--this.numUpdatersInProgress;
//Asif: Since Index creator threads can progress only if
// there is no update threads in progress, thus we need to issue
// notify all iff there are any creator threads in action & also
// if the upDateInProgress Count has dipped to 0
if (this.numUpdatersInProgress == 0 && this.numCreators > 0) {
this.indexes.notifyAll();
}
}
}
private CachePerfStats getCachePerfStats() {
return ((LocalRegion)this.region).getCachePerfStats();
}
/**
* Callback for destroying IndexManager Called after Region.destroy() called
*/
public void destroy() throws QueryException {
this.indexes.clear();
if (!isIndexMaintenanceTypeSynchronous()) updater.shutdown();
}
/**
* Removes indexes for a destroyed bucket region from the list of bucket indexes
* in the {@link PartitionedIndex}.
* @param prRegion the partition region that this bucket belongs to
* @throws QueryException
*/
public void removeBucketIndexes(PartitionedRegion prRegion) throws QueryException {
IndexManager parentManager = prRegion.getIndexManager();
if (parentManager != null) {
Iterator bucketIndexIterator = indexes.values().iterator();
while (bucketIndexIterator.hasNext()) {
Index bucketIndex = (Index)bucketIndexIterator.next();
Index prIndex = parentManager.getIndex(bucketIndex.getName());
if (prIndex instanceof PartitionedIndex) {
((PartitionedIndex)prIndex).removeFromBucketIndexes(this.region, bucketIndex);
}
}
}
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
Iterator iter = this.indexes.values().iterator();
while (iter.hasNext()) {
Object ind = iter.next();
// Check if the value is instance of FutureTask, this means
// the index is in create phase.
if (ind instanceof FutureTask){
continue;
}
sb.append(((Index)ind).toString()).append('\n');
}
return sb.toString();
}
public boolean isIndexMaintenanceTypeSynchronous() {
return this.indexMaintenanceSynchronous;
}
public boolean isOverFlowRegion() {
return this.isOverFlowToDisk;
}
public boolean isOffHeap() {
return this.offHeap;
}
public static boolean isObjectModificationInplace() {
return (INPLACE_OBJECT_MODIFICATION || INPLACE_OBJECT_MODIFICATION_FOR_TEST);
}
/**
* Asif : This function is used exclusively by Index Manager. It gets the
* unique Iterator name for a Iterator definition, if it already exists, else
* creates a unqiue name & also stores it in a map for subsequent use
*
* @param definition String containing definition of the iterator
* @return String containing the name of the Iterator
*/
public String putCanonicalizedIteratorNameIfAbsent(String definition) {
String str = null;
synchronized(canonicalizedIteratorNameMap) {
if ((str = (String) this.canonicalizedIteratorNameMap.get(definition)) == null) {
str = new StringBuffer("index_iter").append(this.getIncrementedCounter())
.toString();
String temp;
if ((temp = (String) this.canonicalizedIteratorNameMap.putIfAbsent(
definition, str)) != null) {
str = temp;
}
}
}
return str;
}
public void putCanonicalizedIteratorName(String definition, String name) {
synchronized(canonicalizedIteratorNameMap) {
this.canonicalizedIteratorNameMap.put(definition, name);
}
}
private synchronized int getIncrementedCounter() {
return ++this.iternameCounter;
}
/**
* Asif : Given a definition returns the canonicalized iterator name for the
* definition. If the definition does not exist , null is returned
*
* @param definition
* @return String
*/
public String getCanonicalizedIteratorName(String definition) {
return ((String) (this.canonicalizedIteratorNameMap.get(definition)));
}
////////////////////// Inner Classes //////////////////////
public class IndexUpdaterThread extends Thread {
private volatile boolean running = true;
private volatile boolean shutdownRequested = false;
private volatile BlockingQueue pendingTasks;
/**
* Creates instance of IndexUpdaterThread
* @param updateThreshold
* @param threadName
*/
IndexUpdaterThread(ThreadGroup group, int updateThreshold, String threadName) {
super(group, threadName);
// Check if threshold is set.
if (updateThreshold > 0){
// Create a bounded queue.
pendingTasks = new ArrayBlockingQueue(updateThreshold);
} else {
// Create non-bounded queue.
pendingTasks = new LinkedBlockingQueue();
}
this.setDaemon(true);
}
public void addTask(int action, RegionEntry entry, int opCode) {
Object[] task = new Object[3];
task[0] = Integer.valueOf(action);
task[1] = entry;
task[2] = Integer.valueOf(opCode); // !!!:ezoerner:20081029 change to valueOf jdk 1.5+
pendingTasks.add(task);
}
/**
* Stops this thread. Does not return until it has stopped.
*/
public void shutdown() {
if (!this.running) {
return;
}
this.shutdownRequested = true;
this.interrupt();
try {
this.join();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
// just return, we're done
}
}
@Override
public void run() {
// async writers main loop
// logger.debug("DiskRegion writer started (writer=" + this + ")");
com.gemstone.gemfire.CancelCriterion stopper = ((LocalRegion)region).getCancelCriterion();
try {
while (!this.shutdownRequested) {
// Termination checks
SystemFailure.checkFailure();
if (stopper.cancelInProgress() != null) {
break;
}
try {
Object[] task = (Object[])pendingTasks.take();
if (this.shutdownRequested) {
break;
}
updateIndexes(task);
}
catch (InterruptedException ignore) {
return; // give up (exit the thread)
}
}
}
finally {
this.running = false;
}
}
private void updateIndexes(Object[] task) {
int action = ((Integer)task[0]).intValue();
RegionEntry entry = (RegionEntry)task[1];
int opCode = ((Integer)task[2]).intValue();
//System.out.println("entry = "+entry.getKey());
if (entry != null || action == RECREATE_INDEX) {
try {
if (action == RECREATE_INDEX) {
recreateAllIndexesForRegion();
} else {
if (entry != null) {
entry.setUpdateInProgress(true);
}
processAction(entry, action, opCode);
}
}
catch (Exception e) {
e.printStackTrace();
} finally {
if (entry != null && action != RECREATE_INDEX) {
entry.setUpdateInProgress(false);
}
}
}
}
/**
* Used by tests to determine if the updater thread has finished updating
* its indexes. The list is cleared without synchronization, which makes
* this methods somewhat unsafe from a threading point of view.
*/
public synchronized boolean isDone() {
return this.pendingTasks.size() == 0;
}
}
/**
* Index Task used to create the index. This is used along with the
* FutureTask to take care of, same index creation request from multiple
* threads. At any time only one thread succeeds and other threads waits
* for the completion of the index creation. This avoids usage of
* synchronization which could block any index creation.
*/
public class IndexTask implements Callable<Index> {
public String indexName;
public IndexType indexType;
public IndexCreationHelper helper;
public String origFromClause;
public String origIndexedExpression;
public boolean isCompactOrHash = false;
public boolean isLDM = false;
public PartitionedIndex prIndex;
public boolean loadEntries;
IndexTask (String indexName, IndexType type, String origFromClause, String origIndexedExpression, IndexCreationHelper helper, boolean isCompactOrHash, PartitionedIndex prIndex, boolean loadEntries){
this.indexName = indexName;
this.indexType = type;
this.origFromClause = origFromClause;
this.origIndexedExpression = origIndexedExpression;
this.helper = helper;
this.isCompactOrHash = isCompactOrHash;
this.prIndex = prIndex;
this.loadEntries = loadEntries;
}
/* For name based index search */
IndexTask (String indexName){
this.indexName = indexName;
}
@Override
public boolean equals (Object other){
if (other == null) {
return false;
}
IndexTask otherIndexTask = (IndexTask) other;
// compare indexName.
if (this.indexName.equals(otherIndexTask.indexName)){
return true;
}
if (otherIndexTask.helper == null || this.helper == null) {
return false;
}
String[] indexDefinitions = this.helper.getCanonicalizedIteratorDefinitions();
int[] mapping = new int[indexDefinitions.length];
// compare index based on its type, expression and definition.
if(compareIndexData(this.indexType, indexDefinitions,
this.helper.getCanonicalizedIndexedExpression(), otherIndexTask.indexType,
otherIndexTask.helper.getCanonicalizedIteratorDefinitions(),
otherIndexTask.helper.getCanonicalizedIndexedExpression(), mapping) == 0)
{
return true;
}
return false;
}
public int hashCode(){
// It returns a constant number as the equality check is based on
// the OR condition between the indexName and its characteristics
// (involving type, expression and definition), because of this
// its not possible to come-up with an accurate hash code.
return 99;
}
/*
* Creates and initializes the index.
*/
public Index call() {
Index index = null;
String indexedExpression = helper.getCanonicalizedIndexedExpression();
String fromClause = helper.getCanonicalizedFromClause();
String projectionAttributes = helper.getCanonicalizedProjectionAttributes();
String[] definitions = helper.getCanonicalizedIteratorDefinitions();
IndexStatistics stats = null;
this.isLDM = IndexManager.IS_TEST_LDM;
if (this.prIndex != null) {
stats = this.prIndex.getStatistics();
}
//Hash index not supported for overflow but we "thought" we were so let's maintain backwards compatibility
//and create a regular compact range index instead
if (indexType == IndexType.HASH && isOverFlowRegion()) {
indexType = IndexType.FUNCTIONAL;
}
if (indexType == IndexType.PRIMARY_KEY) {
index = new PrimaryKeyIndex(indexName, region, fromClause,indexedExpression,
projectionAttributes, origFromClause,
origIndexedExpression, definitions, stats);
logger.info("Using Primary Key index implementation for '{}' on region {}", indexName, region.getFullPath());
} else if (indexType == IndexType.HASH){
index = new HashIndex(indexName, region, fromClause,
indexedExpression, projectionAttributes, origFromClause,
origIndexedExpression, definitions, stats);
logger.info("Using Hash index implementation for '{}' on region {}", indexName, region.getFullPath());
}
else {
//boolean isCompact = !helper.isMapTypeIndex() && shouldCreateCompactIndex((FunctionalIndexCreationHelper)helper);
if (this.isCompactOrHash || this.isLDM) {
if (indexType == IndexType.FUNCTIONAL && !helper.isMapTypeIndex()) {
index = new CompactRangeIndex(indexName, region, fromClause,
indexedExpression, projectionAttributes, origFromClause,
origIndexedExpression, definitions, stats);
logger.info("Using Compact Range index implementation for '{}' on region {}", indexName, region.getFullPath());
}
else {
FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper)helper;
index = new CompactMapRangeIndex(indexName, region, fromClause, indexedExpression,
projectionAttributes, origFromClause,
origIndexedExpression, definitions, fich.isAllKeys(), fich.multiIndexKeysPattern,
fich.mapKeys, stats);
logger.info("Using Compact Map Range index implementation for '{}' on region {}", indexName, region.getFullPath());
}
}
else {
assert indexType == IndexType.FUNCTIONAL;
if ( !helper.isMapTypeIndex() ) {
index = new RangeIndex(indexName, region, fromClause,
indexedExpression, projectionAttributes, origFromClause,
origIndexedExpression, definitions, stats);
logger.info("Using Non-Compact index implementation for '{}' on region {}", indexName, region.getFullPath());
}
else {
FunctionalIndexCreationHelper fich = (FunctionalIndexCreationHelper)helper;
index = new MapRangeIndex(indexName, region, fromClause, indexedExpression,
projectionAttributes, origFromClause,
origIndexedExpression, definitions, fich.isAllKeys(), fich.multiIndexKeysPattern,
fich.mapKeys, stats);
logger.info("Using Non-Compact Map index implementation for '{}' on region {}", indexName, region.getFullPath());
}
}
}
((AbstractIndex)index).setPRIndex(prIndex);
if (index.getType() != IndexType.PRIMARY_KEY) {
AbstractIndex aIndex = ((AbstractIndex) index);
aIndex.instantiateEvaluator(helper);
waitBeforeUpdate();
boolean indexCreatedSuccessfully = false;
try {
((LocalRegion)region).setFlagForIndexCreationThread(true);
aIndex.initializeIndex(loadEntries);
logger.info((loadEntries ? "Initialized and loaded entries into the index "
: "Initialized but entries not yet loaded into the index "
+ indexName + " on region: " + region.getFullPath()));
aIndex.markValid(true);
indexCreatedSuccessfully = true;
if(loadEntries) {
aIndex.setPopulated(true);
if (this.prIndex != null) {
((AbstractIndex)this.prIndex).setPopulated(true);
}
}
indexes.put(this, index);
if (region instanceof BucketRegion && prIndex != null) {
prIndex.addToBucketIndexes(region, index);
prIndex.incNumBucketIndexes();
}
} catch (Exception e) {
throw new IndexInvalidException(e);
}
finally {
notifyAfterUpdate();
((LocalRegion)region).setFlagForIndexCreationThread(false);
if (!indexCreatedSuccessfully){
((InternalIndexStatistics)index.getStatistics()).close();
}
}
} else {
// For PrimaryKey index
((AbstractIndex)index).setPopulated(true);
indexes.put(this, index);
if (region instanceof BucketRegion && prIndex != null) {
prIndex.addToBucketIndexes(region, index);
}
}
return index;
}
}
}