blob: 2db55c396b8594f33e1eeeb2dff89a645b08196c [file] [log] [blame]
/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.CacheRuntimeException;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver;
import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext;
import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
import com.gemstone.gemfire.cache.query.internal.QueryObserver;
import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
import com.gemstone.gemfire.cache.query.types.ObjectType;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.PartitionedRegionQueryEvaluator.PRQueryResultCollector;
import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
/**
* This class takes the responsibility of executing the query on a data store
* for the buckets specified in bucketList. It contains a
* <code>PRQueryExecutor</code> thread-pool executor that takes a
* <code>Callable</code> task identified by <code>PartitionedRegion</code>,
* queryString and bucketId.
*
* The QueryTasks add their results directly to the shared result collection.
* The BucketQueryResult is used to indicate completion, and holds an exception if one occurred while
* processing a query.
*
* @author rreja
* @author Eric Zoerner
*/
public class PRQueryProcessor
{
private static final Logger logger = LogService.getLogger();

/** Seconds to wait on each bucket task's future when executing with the thread pool. */
static final int BUCKET_QUERY_TIMEOUT = 60;

/**
 * Size of the bucket-query thread pool, read from the system property
 * {@code gemfire.PRQueryProcessor.numThreads}. The default of 1 means buckets are
 * queried sequentially on the calling thread.
 */
public static final int NUM_THREADS = Integer.getInteger("gemfire.PRQueryProcessor.numThreads", 1).intValue();

/** Test hook: a value greater than 1 forces thread-pool execution with that many threads. */
public static int TEST_NUM_THREADS = 0;

// Populated from the constructor arguments; the PartitionedRegion-based constructor leaves _prds null.
private PartitionedRegionDataStore _prds;
private PartitionedRegion pr;

private final DefaultQuery query;
private final Object[] parameters;
private final List<Integer> _bucketsToQuery;

// Number of buckets whose results (and end-of-bucket token) have been added; read/written via synchronized accessors.
private volatile int numBucketsProcessed = 0;
// Element type of the last bucket result set seen; executeQuery reports whether it is a struct type.
private volatile ObjectType resultType = null;
// Whether the last local bucket query used an index (see isIndexUsed()).
private boolean isIndexUsedForLocalQuery = false;
/**
 * Creates a processor that queries the given buckets of a local data store.
 *
 * @param prDS the local data store hosting the buckets; its partitioned region is also
 *        recorded so that code paths reading {@code pr} work for this constructor too
 * @param query the compiled query to run
 * @param parameters bind parameters for the query
 * @param buckets ids of the buckets to query; must not be empty
 */
public PRQueryProcessor(PartitionedRegionDataStore prDS,
    DefaultQuery query, Object[] parameters, List<Integer> buckets) {
  Assert.assertTrue(!buckets.isEmpty(), "bucket list can not be empty. ");
  this._prds = prDS;
  // executeSequentially, executeQueryOnBuckets and QueryTask read this.pr; leaving it null
  // made every query built through this constructor fail with a NullPointerException.
  this.pr = prDS.partitionedRegion;
  this._bucketsToQuery = buckets;
  // NOTE(review): the return value is discarded; presumably invoked for its side effect of
  // initializing the cache's local query service — confirm before removing.
  ((GemFireCacheImpl)prDS.partitionedRegion.getCache()).getLocalQueryService();
  this.query = query;
  this.parameters = parameters;
  PRQueryExecutor.initializeExecutorService();
}
/**
 * Creates a processor that queries the given buckets of a partitioned region.
 * The data-store reference ({@code _prds}) is not set by this constructor.
 *
 * @param pr the partitioned region being queried
 * @param query the compiled query to run
 * @param parameters bind parameters for the query
 * @param buckets ids of the buckets to query; must not be empty
 */
public PRQueryProcessor(PartitionedRegion pr,
    DefaultQuery query, Object[] parameters, List buckets) {
  Assert.assertTrue(!buckets.isEmpty(), "bucket list can not be empty. ");
  this.query = query;
  this.parameters = parameters;
  this.pr = pr;
  this._bucketsToQuery = buckets;
  // Make sure a live executor exists in case the thread-pool path is taken.
  PRQueryExecutor.initializeExecutorService();
}
/** Records that one more bucket has been completely processed. */
private synchronized void incNumBucketsProcessed() {
  numBucketsProcessed += 1;
}
/** @return how many buckets have been completely processed so far */
private synchronized int getNumBucketsProcessed() {
  final int processed = numBucketsProcessed;
  return processed;
}
/**
 * Executes the pre-compiled query on this data store's buckets, either sequentially or
 * through the shared thread pool (when {@link #NUM_THREADS} or {@link #TEST_NUM_THREADS}
 * is greater than 1). Each bucket's result collection is added to {@code resultCollector}.
 *
 * @param resultCollector receives the per-bucket (or merged, for ordered queries) results
 * @return true if the result elements are of a struct type; false otherwise, including when
 *         no result type was recorded (e.g. the executor was shut down before any bucket finished)
 * @throws QueryException if the query fails on a bucket
 * @throws InterruptedException if the calling thread is interrupted
 * @throws ForceReattemptException if the query should be tried again (e.g. a bucket moved)
 */
public boolean executeQuery(Collection<Collection> resultCollector)
    throws QueryException, InterruptedException, ForceReattemptException {
  if (NUM_THREADS > 1 || TEST_NUM_THREADS > 1) {
    executeWithThreadPool(resultCollector);
  } else {
    executeSequentially(resultCollector, this._bucketsToQuery);
  }
  // resultType is only assigned once a bucket produced a result set; guard against it
  // still being null instead of failing with a NullPointerException.
  final ObjectType type = this.resultType;
  return type != null && type.isStructType();
}
/**
 * Runs one {@link QueryTask} per bucket on the shared executor and waits for all of them.
 * Results are added to {@code resultCollector} by the tasks themselves; for ORDER BY /
 * GROUP BY queries the per-bucket collections are then merged into a single collection.
 *
 * @throws QueryException if any bucket task failed with a query error
 * @throws ForceReattemptException if any bucket task asked for the query to be retried
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void executeWithThreadPool(Collection<Collection> resultCollector)
    throws QueryException, InterruptedException, ForceReattemptException {
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  // Each constructor supplies either pr or _prds; resolve the region from whichever is set.
  final PartitionedRegion region = (this.pr != null) ? this.pr : this._prds.partitionedRegion;
  // Overall deadline for invokeAll; tasks still running afterwards are cancelled.
  final int allBucketsTimeoutSecs = 300;
  List callableTasks = buildCallableTaskList(resultCollector);
  ExecutorService execService = PRQueryExecutor.getExecutorService();
  // First reattempt request reported by any bucket; rethrown once all futures are drained.
  ForceReattemptException fre = null;
  if (callableTasks != null && !callableTasks.isEmpty()) {
    // A RejectedExecutionException (e.g. executor shut down) propagates to the caller unchanged.
    List futures = execService.invokeAll(callableTasks, allBucketsTimeoutSecs, TimeUnit.SECONDS);
    if (futures != null) {
      Iterator itr = futures.iterator();
      while (itr.hasNext() && !execService.isShutdown() && !execService.isTerminated()) {
        Future fut = (Future) itr.next();
        QueryTask.BucketQueryResult bqr;
        try {
          bqr = (QueryTask.BucketQueryResult) fut.get(BUCKET_QUERY_TIMEOUT, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          throw new InternalGemFireException(LocalizedStrings.PRQueryProcessor_TIMED_OUT_WHILE_EXECUTING_QUERY_TIME_EXCEEDED_0.toLocalizedString(
              Integer.valueOf(BUCKET_QUERY_TIMEOUT)), e);
        } catch (CancellationException e) {
          // invokeAll cancelled this task because the overall deadline expired.
          throw new InternalGemFireException(LocalizedStrings.PRQueryProcessor_TIMED_OUT_WHILE_EXECUTING_QUERY_TIME_EXCEEDED_0.toLocalizedString(
              Integer.valueOf(allBucketsTimeoutSecs)), e);
        } catch (ExecutionException ee) {
          Throwable cause = ee.getCause();
          if (cause instanceof QueryException) {
            throw (QueryException) cause;
          }
          throw new InternalGemFireException(LocalizedStrings.PRQueryProcessor_GOT_UNEXPECTED_EXCEPTION_WHILE_EXECUTING_QUERY_ON_PARTITIONED_REGION_BUCKET.toLocalizedString(),
              cause);
        }
        // Rethrows a captured QueryException / CacheRuntimeException; on success the task
        // has already added its results to resultCollector.
        bqr.handleAndThrowException();
        // handleAndThrowException does not rethrow ForceReattemptException, so record it here
        // instead of silently returning incomplete results.
        if (fre == null) {
          if (bqr.isReattemptNeeded()) {
            fre = (ForceReattemptException) bqr.getException();
          } else if (bqr.retry) {
            fre = new ForceReattemptException("Query on bucket " + bqr.getBucketId() + " must be retried");
          }
        }
      }
      CompiledSelect cs = this.query.getSimpleSelect();
      if (cs != null && (cs.isOrderBy() || cs.isGroupBy())) {
        // Each task produced an independent per-bucket collection; merge them into one.
        ExecutionContext context = new QueryExecutionContext(this.parameters, region.getCache());
        int limit = this.query.getLimit(parameters);
        Collection mergedResults = coalesceOrderedResults(resultCollector, context, cs, limit);
        resultCollector.clear();
        resultCollector.add(mergedResults);
      }
    }
  }
  // If the executor went away mid-query, surface the region/cache state to the caller.
  if (execService == null || execService.isShutdown()
      || execService.isTerminated()) {
    region.checkReadiness();
  }
  if (fre != null) {
    throw fre;
  }
}
/**
* Runs the query against a single locally hosted bucket and puts each result row into
* {@code rq}, followed by an {@link EndOfBucket} token for that bucket.
* No active caller of this method is visible in this file.
*
* @param bId id of the bucket to query
* @param prds local data store expected to host the bucket
* @param query the query to execute
* @param params bind parameters for the query
* @param rq collector that receives result rows and the end-of-bucket token
* @throws ForceReattemptException
* if bucket was moved so caller should try query again
* @throws RegionDestroyedException if the partitioned region was destroyed or closed
* @throws QueryException if the query itself fails
*/
private void doBucketQuery(final Integer bId,
final PartitionedRegionDataStore prds,
final DefaultQuery query,
final Object[] params,
final PRQueryResultCollector rq)
throws QueryException, ForceReattemptException, InterruptedException {
// May be null when the bucket is not hosted in this data store.
final BucketRegion bukRegion = (BucketRegion)prds.localBucket2RegionMap.get(bId);
final PartitionedRegion pr = prds.getPartitionedRegion();
try {
pr.checkReadiness();
if (bukRegion == null) {
// Missing bucket: a destroyed/closed PR is reported as such, otherwise ask the caller to retry.
if (pr.isLocallyDestroyed || pr.isClosed) {
throw new RegionDestroyedException("PR destroyed during query", pr.getFullPath());
} else {
throw new ForceReattemptException("Bucket id "
+ pr.bucketStringForLogs(bId.intValue())
+ " not found on VM "
+ pr.getMyId());
}
}
bukRegion.waitForData();
SelectResults results = null;
// If the query has LIMIT and is not order by, apply the limit while building the result set.
int limit = -1;
if (query.getSimpleSelect().getOrderByAttrs() == null) {
limit = query.getLimit(params);
}
if (!bukRegion.isBucketDestroyed()) {
// If the result queue has reached the limit, no need to
// execute the query. Handle the bucket destroy condition
// and add the end bucket token.
// rq also holds one EndOfBucket token per finished bucket, so the count of finished
// buckets is subtracted from rq.size() to approximate the number of data rows.
int numBucketsProcessed = getNumBucketsProcessed();
if (limit < 0 || (rq.size() - numBucketsProcessed) < limit) {
results = (SelectResults) query.prExecuteOnBucket(params, pr,
bukRegion);
this.resultType = results.getCollectionType().getElementType();
}
// Re-check: the bucket may have been destroyed (moved) while the query ran.
if (!bukRegion.isBucketDestroyed()) {
// someday, when queries can return objects as a stream, the entire results set won't need to be manifested
// here before we can start adding to the results queue
if (results != null) {
for (Object r : results){
if (r == null) { // Blocking queue does not support adding null.
rq.put(DefaultQuery.NULL_RESULT);
} else {
// Count from each bucket should be > 0 otherwise limit makes the final result wrong.
// Avoid if query is distinct as this Integer could be a region value.
if (!query.getSimpleSelect().isDistinct() &&
query.getSimpleSelect().isCount() && r instanceof Integer) {
if (((Integer) r).intValue() != 0 ) {
rq.put(r);
}
} else {
rq.put(r);
}
}
// Check if limit is satisfied.
if (limit >= 0 && (rq.size() - numBucketsProcessed) >= limit) {
break;
}
}
}
rq.put(new EndOfBucket(bId.intValue()));
this.incNumBucketsProcessed();
return; // success
}
}
// if we get here then the bucket must have been moved
checkForBucketMoved(bId, bukRegion, pr);
Assert.assertTrue(false, "checkForBucketMoved should have thrown ForceReattemptException");
} catch (RegionDestroyedException rde) {
// NOTE(review): bukRegion may be null here (e.g. pr.checkReadiness() threw before the null
// check); confirm checkForBucketMoved tolerates a null bucket.
checkForBucketMoved(bId, bukRegion, pr);
throw rde;
} catch (QueryException qe) {
// Prefer reporting a moved bucket (retry) over the raw query failure.
checkForBucketMoved(bId, bukRegion, pr);
throw qe;
}
}
/**
 * Converts a destroyed or missing bucket into the appropriate exception. Returns normally
 * only when {@code br} is non-null and not destroyed.
 *
 * @param bId id of the bucket being checked (used in the exception message)
 * @param br the local bucket region, or null if it is not hosted here
 * @param pr the partitioned region that owns the bucket
 * @throws ForceReattemptException if it detects that the given bucket moved
 * @throws RegionDestroyedException if the given pr was destroyed
 */
private static void checkForBucketMoved(Integer bId, BucketRegion br, PartitionedRegion pr)
    throws ForceReattemptException, RegionDestroyedException {
  // A null bucket is not hosted here at all; treat it like a destroyed (moved) bucket rather
  // than failing with a NullPointerException from the exception handlers that call this.
  if (br == null || br.isBucketDestroyed()) {
    // see if the pr is destroyed
    if (pr.isLocallyDestroyed || pr.isClosed) {
      throw new RegionDestroyedException("PR destroyed during query", pr.getFullPath());
    }
    pr.checkReadiness();
    throw new ForceReattemptException("Bucket id "
        + pr.bucketStringForLogs(bId.intValue())
        + " not found on VM "
        + pr.getMyId());
  }
}
/**
 * Executes the query on the calling thread over the given buckets and adds the result
 * collection(s) to {@code resultCollector}.
 *
 * For ORDER BY queries each bucket is queried on its own and the per-bucket collections are
 * then N-way merged (honouring DISTINCT and LIMIT) into a single collection; otherwise all
 * buckets are queried in one execution.
 *
 * @param resultCollector receives the results
 * @param buckets ids (Integer) of the buckets to query
 */
private void executeSequentially(Collection<Collection> resultCollector, List buckets)
    throws QueryException, InterruptedException, ForceReattemptException {
  // Each constructor supplies either pr or _prds; resolve the region from whichever is set.
  final PartitionedRegion region = (this.pr != null) ? this.pr : this._prds.partitionedRegion;
  ExecutionContext context = new QueryExecutionContext(this.parameters, region.getCache(), this.query);
  CompiledSelect cs = this.query.getSimpleSelect();
  int limit = this.query.getLimit(parameters);
  if (cs != null && cs.isOrderBy()) {
    // Use the buckets argument in both branches (previously this branch read the field).
    for (Object bucket : buckets) {
      List<Integer> singleBucket = Collections.singletonList((Integer) bucket);
      context.setBucketList(singleBucket);
      executeQueryOnBuckets(resultCollector, context);
    }
    Collection mergedResults = coalesceOrderedResults(resultCollector, context, cs, limit);
    resultCollector.clear();
    resultCollector.add(mergedResults);
  } else {
    context.setBucketList(buckets);
    executeQueryOnBuckets(resultCollector, context);
  }
}
/**
 * Merges already-collected per-bucket results into one {@link NWayMergeResults} collection,
 * applying the select's DISTINCT flag, the given limit and its ORDER BY attributes.
 * Elements of {@code results} that are not collections are ignored.
 *
 * @param results per-bucket result collections
 * @param context execution context used by the merge
 * @param cs the simple select being executed
 * @param limit row limit for the merged result
 * @return the merged collection
 */
private Collection coalesceOrderedResults(Collection<Collection> results,
    ExecutionContext context, CompiledSelect cs, int limit) {
  List<Collection> perBucket = new ArrayList<Collection>(results.size());
  //TODO :Asif : Deal with UNDEFINED
  Iterator<Collection> it = results.iterator();
  while (it.hasNext()) {
    Object candidate = it.next();
    if (candidate instanceof Collection) {
      perBucket.add((Collection) candidate);
    }
  }
  return new NWayMergeResults(perBucket, cs.isDistinct(), limit,
      cs.getOrderByAttrs(), context, cs.getElementTypeForOrderByQueries());
}
/**
 * Executes the query with the given context (whose bucket list is already set) and adds the
 * resulting {@link SelectResults} to {@code resultCollector}. Also records the result element
 * type and whether an index was used. The thread is registered with the cache's
 * {@link QueryMonitor}, if any, for the duration of the execution.
 *
 * @throws ForceReattemptException if a targeted bucket moved or the local region went away
 * @throws QueryInvocationTargetException if the region was destroyed during execution
 * @throws QueryException for any other query failure
 */
private void executeQueryOnBuckets(Collection<Collection> resultCollector,
    ExecutionContext context) throws ForceReattemptException,
    QueryInvocationTargetException, QueryException {
  context.setCqQueryContext(query.isCqQuery());
  // Read the cache singleton once: a second getInstance() call could return null if the
  // cache is closed concurrently.
  final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
  final QueryMonitor queryMonitor = (cache != null) ? cache.getQueryMonitor() : null;
  try {
    if (queryMonitor != null) {
      // Add current thread to be monitored by QueryMonitor.
      queryMonitor.monitorQueryThread(Thread.currentThread(), query);
    }
    SelectResults results = (SelectResults) query.executeUsingContext(context);
    // Several QueryTasks may add to the same collector concurrently.
    synchronized (resultCollector) {
      this.resultType = results.getCollectionType().getElementType();
      resultCollector.add(results);
    }
    isIndexUsedForLocalQuery = ((QueryExecutionContext) context).isIndexUsed();
  } catch (BucketMovedException bme) {
    if (logger.isDebugEnabled()) {
      logger.debug("Query targeted local bucket not found. {}", bme.getMessage(), bme);
    }
    throw new ForceReattemptException("Query targeted local bucket not found." + bme.getMessage(), bme);
  } catch (RegionDestroyedException rde) {
    throw new QueryInvocationTargetException("The Region on which query is executed may have been destroyed." +
        rde.getMessage(), rde);
  } catch (QueryException qe) {
    // Check if PR is locally destroyed; either constructor supplies pr or _prds.
    final PartitionedRegion region = (pr != null) ? pr : _prds.partitionedRegion;
    if (region.isLocallyDestroyed || region.isClosed) {
      // Keep the original failure as the cause so it is not lost.
      throw new ForceReattemptException("Local Partition Region or the targeted bucket has been moved", qe);
    }
    throw qe;
  } finally {
    if (queryMonitor != null) {
      queryMonitor.stopMonitoringQueryThread(Thread.currentThread(), query);
    }
  }
}
/**
 * Builds one {@link QueryTask} per bucket in {@code _bucketsToQuery}, all sharing
 * {@code resultsColl} as their output collection.
 *
 * @param resultsColl collection the tasks add their results to
 * @return the tasks, in bucket-list order
 */
private List buildCallableTaskList(Collection<Collection> resultsColl)
{
  List tasks = new ArrayList();
  for (Integer bucketId : this._bucketsToQuery) {
    tasks.add(new QueryTask(this.query, this.parameters, this._prds, bucketId, resultsColl));
  }
  return tasks;
}
/**
 * @return whether the most recent local bucket query executed by this processor used an index
 */
public boolean isIndexUsed() {
  return this.isIndexUsedForLocalQuery;
}
/** Initiates an orderly shutdown of the shared bucket-query executor, if one exists. */
public static void shutdown() {
  PRQueryProcessor.PRQueryExecutor.shutdown();
}
/** Attempts to stop the shared bucket-query executor immediately, if one exists. */
public static void shutdownNow() {
  PRQueryProcessor.PRQueryExecutor.shutdownNow();
}
/**
 * Holder for the process-wide fixed-size thread pool used to run per-bucket query tasks.
 * All access goes through static synchronized methods.
 *
 * @author rreja
 */
static class PRQueryExecutor {
  private static ExecutorService execService = null;

  /**
   * Closes the executor service. This is called from
   * {@link PartitionedRegion#afterRegionsClosedByCacheClose(GemFireCacheImpl)}
   */
  static synchronized void shutdown() {
    if (execService == null) {
      return;
    }
    execService.shutdown();
  }

  /** Attempts to stop all running tasks of the executor, if it exists. */
  static synchronized void shutdownNow() {
    if (execService == null) {
      return;
    }
    execService.shutdownNow();
  }

  /**
   * Returns the executor, creating it if it has never been created. A previously shut-down
   * executor is returned as-is (it is only replaced by {@link #initializeExecutorService()}).
   */
  static synchronized ExecutorService getExecutorService() {
    if (execService == null) {
      initializeExecutorService();
    }
    assert execService != null;
    return execService;
  }

  /**
   * Creates the Executor Service, replacing any existing one that has been shut down or
   * terminated. The pool size is TEST_NUM_THREADS when greater than 1, else NUM_THREADS.
   */
  static synchronized void initializeExecutorService() {
    final boolean usable = execService != null && !execService.isShutdown()
        && !execService.isTerminated();
    if (!usable) {
      final int poolSize = TEST_NUM_THREADS > 1 ? TEST_NUM_THREADS : NUM_THREADS;
      execService = Executors.newFixedThreadPool(poolSize);
    }
  }
}
/**
 * Status token placed in a results stream after the rows of one bucket, marking that the
 * bucket's results are complete. Serialized as a fixed-id object carrying only the bucket id.
 */
public static final class EndOfBucket implements DataSerializableFixedID {
  private int bucketId;

  /** Required by DataSerializer */
  public EndOfBucket() {
  }

  /** @param bucketId id of the bucket whose results are complete */
  public EndOfBucket(int bucketId) {
    this.bucketId = bucketId;
  }

  /** @return id of the bucket whose results are complete */
  public int getBucketId() {
    return bucketId;
  }

  public int getDSFID() {
    return END_OF_BUCKET;
  }

  public void toData(DataOutput out) throws IOException {
    out.writeInt(bucketId);
  }

  public void fromData(DataInput in)
      throws IOException, ClassNotFoundException {
    bucketId = in.readInt();
  }

  @Override
  public Version[] getSerializationVersions() {
    // NOTE(review): null presumably means no version-specific serialization variants exist
    // for this class — confirm against the DataSerializableFixedID contract.
    return null;
  }

  @Override
  public String toString() {
    return "EndOfBucket(" + bucketId + ")";
  }
}
/**
 * Implementation of call-able task to execute query on a bucket region. This
 * task will be generated by the PRQueryProcessor.
 *
 * The task adds its results to the shared result collection and returns a
 * {@link BucketQueryResult} that carries any ForceReattemptException, QueryException or
 * CacheRuntimeException raised while querying its bucket.
 *
 * @author rreja
 */
@SuppressWarnings("synthetic-access")
private final class QueryTask implements Callable {
  private final DefaultQuery query;
  private final Object[] parameters;
  private final PartitionedRegionDataStore _prDs;
  private final Integer _bucketId;
  private final Collection<Collection> resultColl;

  public QueryTask(DefaultQuery query, Object[] parameters, PartitionedRegionDataStore prDS,
      Integer bucketId, final Collection<Collection> rColl) {
    this.query = query;
    this._prDs = prDS;
    this._bucketId = bucketId;
    this.resultColl = rColl;
    this.parameters = parameters;
  }

  /**
   * Queries this task's single bucket.
   *
   * @return a BucketQueryResult holding the captured exception, if any
   */
  public Object call() throws Exception {
    BucketQueryResult bukResult = new BucketQueryResult(this._bucketId);
    try {
      // The enclosing processor supplies either pr or a data store; use whichever is set.
      final PartitionedRegion region = (pr != null) ? pr : this._prDs.partitionedRegion;
      List<Integer> bucketList = Collections.singletonList(this._bucketId);
      ExecutionContext context = new QueryExecutionContext(this.parameters, region.getCache(), this.query);
      context.setBucketList(bucketList);
      executeQueryOnBuckets(this.resultColl, context);
    } catch (ForceReattemptException fre) {
      bukResult.setException(fre);
    } catch (QueryException e) {
      bukResult.setException(e);
    } catch (CacheRuntimeException cre) {
      bukResult.setException(cre);
    }
    return bukResult;
  }

  /**
   * Encapsulates the result for the query on the bucket.
   *
   * @author rreja
   */
  private final class BucketQueryResult {
    private int _buk;
    private Exception _ex = null;
    // Explicit retry request; never set within this file, but read by the caller.
    public boolean retry = false;

    /**
     * Constructor
     *
     * @param bukId id of the bucket this result belongs to
     */
    public BucketQueryResult(int bukId) {
      this._buk = bukId;
    }

    /** @return the exception captured while querying the bucket, or null */
    public Exception getException()
    {
      return _ex;
    }

    /** @return true if an exception was captured */
    public boolean exceptionOccured()
    {
      return _ex != null;
    }

    /** Records the exception raised while querying the bucket. */
    public void setException(Exception e)
    {
      this._ex = e;
    }

    /** @return the bucket id, boxed */
    public Integer getBucketId()
    {
      return Integer.valueOf(this._buk);
    }

    /** @return true if the captured exception asks the caller to retry the query */
    public boolean isReattemptNeeded() {
      return this._ex instanceof ForceReattemptException;
    }

    /**
     * Rethrows a captured QueryException or CacheRuntimeException. Any other captured
     * exception (notably ForceReattemptException) is not thrown here; callers must check
     * {@link #isReattemptNeeded()} themselves.
     */
    public void handleAndThrowException() throws QueryException
    {
      if (_ex != null) {
        if (_ex instanceof QueryException) {
          throw (QueryException)_ex;
        }
        else if (_ex instanceof CacheRuntimeException) {
          throw (CacheRuntimeException)_ex;
        }
      }
    }
  }
}
}