blob: e2b6b9ba508b1917cb9a2b970ae6aa132576a6f6 [file] [log] [blame]
package com.gemstone.gemfire.internal.cache.tier.sockets;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.operations.QueryOperationContext;
import com.gemstone.gemfire.cache.query.Query;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.QueryInvalidException;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.cache.query.internal.CqEntry;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl;
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
import com.gemstone.gemfire.cache.query.types.CollectionType;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
import com.gemstone.gemfire.internal.cache.tier.MessageType;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
public abstract class BaseCommandQuery extends BaseCommand {
/**
* Process the give query and sends the resulset back to the client.
*
* @param msg
* @param query
* @param queryString
* @param regionNames
* @param start
* @param cqQuery
* @param queryContext
* @param servConn
* @return true if successful execution
* false in case of failure.
* @throws IOException
*/
protected static boolean processQuery(Message msg, Query query,
String queryString, Set regionNames, long start, ServerCQ cqQuery,
QueryOperationContext queryContext, ServerConnection servConn,
boolean sendResults)
throws IOException, InterruptedException {
return processQueryUsingParams(msg, query, queryString,
regionNames, start, cqQuery, queryContext, servConn, sendResults, null);
}
/**
* Process the give query and sends the resulset back to the client.
*
* @param msg
* @param query
* @param queryString
* @param regionNames
* @param start
* @param cqQuery
* @param queryContext
* @param servConn
* @return true if successful execution
* false in case of failure.
* @throws IOException
*/
protected static boolean processQueryUsingParams(Message msg, Query query,
String queryString, Set regionNames, long start, ServerCQ cqQuery,
QueryOperationContext queryContext, ServerConnection servConn,
boolean sendResults, Object[] params)
throws IOException, InterruptedException {
ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
CacheServerStats stats = servConn.getCacheServerStats();
CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadQueryRequestTime(start - oldStart);
}
// from 7.0, set flag to indicate a remote query irrespective of the
// object type
if (servConn.getClientVersion().compareTo(Version.GFE_70) >= 0) {
((DefaultQuery) query).setRemoteQuery(true);
}
// Process the query request
try {
// Execute query
// startTime = GenericStats.getTime();
// startTime = System.currentTimeMillis();
// For now we assume the results are a SelectResults
// which is the only possibility now, but this may change
// in the future if we support arbitrary queries
Object result = null;
if (params != null) {
result = query.execute(params);
} else {
result = query.execute();
}
//Asif : Before conditioning the results check if any
//of the regions involved in the query have been destroyed
//or not. If yes, throw an Exception.
//This is a workaround/fix for Bug 36969
Iterator itr = regionNames.iterator();
while(itr.hasNext()) {
String regionName = (String)itr.next();
if(crHelper.getRegion(regionName) == null) {
throw new RegionDestroyedException(
LocalizedStrings.BaseCommand_REGION_DESTROYED_DURING_THE_EXECUTION_OF_THE_QUERY.toLocalizedString(), regionName);
}
}
AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest();
if (postAuthzRequest != null) {
if (cqQuery == null) {
queryContext = postAuthzRequest.queryAuthorize(queryString,
regionNames, result, queryContext, params);
}
else {
queryContext = postAuthzRequest.executeCQAuthorize(cqQuery.getName(),
queryString, regionNames, result, queryContext);
}
result = queryContext.getQueryResult();
}
// endTime = System.currentTimeMillis();
// System.out.println("Query executed in: " + (endTime-startTime) + "ms");
// GenericStats.endTime0(startTime);
if (result instanceof SelectResults) {
SelectResults selectResults = (SelectResults)result;
if (logger.isDebugEnabled()) {
logger.debug("Query Result size for : {} is {}", query.getQueryString(), selectResults.size());
}
CollectionType collectionType = null;
boolean sendCqResultsWithKey = true;
boolean isStructs = false;
// check if resultset has serialized objects, so that they could be sent
// as ObjectPartList
boolean hasSerializedObjects = ((DefaultQuery) query)
.isKeepSerialized();
if (logger.isDebugEnabled()) {
logger.debug("Query Result for :{} has serialized objects: {}", query.getQueryString(), hasSerializedObjects);
}
// Don't convert to a Set, there might be duplicates now
// The results in a StructSet are stored in Object[]s
// Get them as Object[]s for the objs[] in order to avoid duplicating
// the StructTypes
// Object[] objs = new Object[selectResults.size()];
// Get the collection type (which includes the element type)
// (used to generate the appropriate instance on the client)
// Get the collection type (which includes the element type)
// (used to generate the appropriate instance on the client)
collectionType = selectResults.getCollectionType();
isStructs = collectionType.getElementType().isStructType();
// Check if the Query is from CQ execution.
if (cqQuery != null){
// Check if the key can be sent to the client based on its version.
sendCqResultsWithKey = sendCqResultsWithKey(servConn);
if (sendCqResultsWithKey){
// Update the collection type to include key info.
collectionType = new CollectionTypeImpl(Collection.class,
new StructTypeImpl(new String[]{"key", "value"}));
isStructs = collectionType.getElementType().isStructType();
}
}
int numberOfChunks = (int)Math.ceil(selectResults.size() * 1.0
/ maximumChunkSize);
if (logger.isTraceEnabled()) {
logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}",
servConn.getName(), selectResults.size(), maximumChunkSize, numberOfChunks);
}
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incProcessQueryTime(start - oldStart);
if(sendResults){
queryResponseMsg.setMessageType(MessageType.RESPONSE);
queryResponseMsg.setTransactionId(msg.getTransactionId());
queryResponseMsg.sendHeader();
}
if (sendResults && numberOfChunks == 0) {
// Send 1 empty chunk
if (logger.isTraceEnabled()) {
logger.trace("{}: Creating chunk: 0", servConn.getName() );
}
writeQueryResponseChunk(new Object[0], collectionType, true, servConn);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sent chunk (1 of 1) of query response for query {}", servConn.getName(), queryString);
}
}
else {
// Send response to client.
// from 7.0, if the object is in the form of serialized byte array,
// send it as a part of ObjectPartList
if (hasSerializedObjects) {
sendResultsAsObjectPartList(numberOfChunks, servConn,
selectResults.asList(), isStructs, collectionType,
queryString, cqQuery, sendCqResultsWithKey, sendResults);
} else {
sendResultsAsObjectArray(selectResults, numberOfChunks, servConn,
isStructs, collectionType, queryString, cqQuery, sendCqResultsWithKey, sendResults);
}
}
if(cqQuery != null){
// Set the CQ query result cache initialized flag.
cqQuery.setCqResultsCacheInitialized();
}
}
else if (result instanceof Integer) {
if (sendResults) {
queryResponseMsg.setMessageType(MessageType.RESPONSE);
queryResponseMsg.setTransactionId(msg.getTransactionId());
queryResponseMsg.sendHeader();
writeQueryResponseChunk(result, null, true, servConn);
}
}
else {
throw new QueryInvalidException(LocalizedStrings.BaseCommand_UNKNOWN_RESULT_TYPE_0.toLocalizedString(result.getClass()));
}
msg.clearParts();
}
catch (QueryInvalidException e) {
// Handle this exception differently since it can contain
// non-serializable objects.
// java.io.NotSerializableException: antlr.CommonToken
// Log a warning to show stack trace and create a new
// QueryInvalidEsception on the original one's message (not cause).
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_UNEXPECTED_QUERYINVALIDEXCEPTION_WHILE_PROCESSING_QUERY_0, queryString), e);
QueryInvalidException qie = new QueryInvalidException(LocalizedStrings.BaseCommand_0_QUERYSTRING_IS_1
.toLocalizedString(new Object[] {e.getLocalizedMessage(), queryString}));
writeQueryResponseException(msg, qie, false, servConn);
return false;
}
catch (DistributedSystemDisconnectedException se) {
if (msg != null && logger.isDebugEnabled()) {
logger.debug("{}: ignoring message of type {} from client {} because shutdown occurred during message processing.",
servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getProxyID());
}
servConn.setFlagProcessMessagesAsFalse();
return false;
}
catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
checkForInterrupt(servConn, e);
// Otherwise, write a query response and continue
// Check if query got canceled from QueryMonitor.
DefaultQuery defaultQuery = (DefaultQuery)query;
if ((defaultQuery).isCanceled()){
e = new QueryException(defaultQuery.getQueryCanceledException().getMessage(), e.getCause());
}
writeQueryResponseException(msg, e, false, servConn);
return false;
} finally {
// Since the query object is being shared in case of bind queries,
// resetting the flag may cause inconsistency.
// Also since this flag is only being set in code path executed by
// remote query execution, resetting it is not required.
//((DefaultQuery)query).setRemoteQuery(false);
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Sent query response for query {}", servConn.getName(), queryString);
}
stats.incWriteQueryResponseTime(DistributionStats.getStatTime() - start);
return true;
}
private static boolean sendCqResultsWithKey(ServerConnection servConn) {
Version clientVersion = servConn.getClientVersion();
if (clientVersion.compareTo(Version.GFE_65) >= 0) {
return true;
}
return false;
}
protected static void sendCqResponse(int msgType, String msgStr, int txId,
Throwable e, ServerConnection servConn) throws IOException {
ChunkedMessage cqMsg = servConn.getChunkedResponseMessage();
if (logger.isDebugEnabled()) {
logger.debug("CQ Response message :{}", msgStr);
}
switch (msgType) {
case MessageType.REPLY:
cqMsg.setNumberOfParts(1);
break;
case MessageType.CQDATAERROR_MSG_TYPE:
logger.warn(msgStr);
cqMsg.setNumberOfParts(1);
break;
case MessageType.CQ_EXCEPTION_TYPE:
String exMsg = "";
if (e != null) {
exMsg = e.getLocalizedMessage();
}
logger.info(msgStr + exMsg, e);
msgStr += exMsg; // fixes bug 42309
cqMsg.setNumberOfParts(1);
break;
default:
msgType = MessageType.CQ_EXCEPTION_TYPE;
cqMsg.setNumberOfParts(1);
msgStr += LocalizedStrings.BaseCommand_UNKNOWN_QUERY_EXCEPTION.toLocalizedString();
break;
}
cqMsg.setMessageType(msgType);
cqMsg.setTransactionId(txId);
cqMsg.sendHeader();
cqMsg.addStringPart(msgStr);
cqMsg.setLastChunk(true);
cqMsg.sendChunk(servConn);
cqMsg.setLastChunk(true);
if (logger.isDebugEnabled()) {
logger.debug("CQ Response sent successfully");
}
}
private static void sendResultsAsObjectArray(SelectResults selectResults,
int numberOfChunks, ServerConnection servConn,
boolean isStructs, CollectionType collectionType, String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults)
throws IOException {
int resultIndex = 0;
// For CQ only as we dont want CQEntries which have null values.
int cqResultIndex = 0;
Object[] objs = selectResults.toArray();
for (int j = 0; j < numberOfChunks; j++) {
boolean incompleteArray = false;
if (logger.isTraceEnabled()) {
logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
}
Object[] results = new Object[maximumChunkSize];
for (int i = 0; i < maximumChunkSize; i++) {
if ((resultIndex) == selectResults.size()) {
incompleteArray = true;
break;
}
if (logger.isTraceEnabled()) {
logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName(), resultIndex, objs[resultIndex]);
}
if (cqQuery != null){
CqEntry e = (CqEntry)objs[resultIndex];
// The value may have become null because of entry invalidation.
if (e.getValue() == null) {
resultIndex++;
// i will get incremented anyway so we need to decrement it back so
// that results[i] is not null.
i--;
continue;
}
// Add the key into CQ results cache.
// For PR the Result caching is not yet supported.
// cqQuery.cqResultsCacheInitialized is added to take care
// of CQ execute requests that are re-sent. In that case no
// need to update the Results cache.
if (!cqQuery.isPR()) {
cqQuery.addToCqResultKeys(e.getKey());
}
// Add to the Results object array.
if (sendCqResultsWithKey) {
results[i] = e.getKeyValuePair();
} else {
results[i] = e.getValue();
}
} else {
// instance check added to fix bug 40516.
if (isStructs && (objs[resultIndex] instanceof Struct)) {
results[i] = ((Struct) objs[resultIndex]).getFieldValues();
} else {
results[i] = objs[resultIndex];
}
}
resultIndex++;
cqResultIndex++;
}
// Shrink array if necessary. This will occur if the number
// of entries in the chunk does not divide evenly into the
// number of entries in the result set.
if (incompleteArray) {
Object[] newResults;
if (cqQuery != null) {
newResults = new Object[cqResultIndex % maximumChunkSize];
} else {
newResults = new Object[resultIndex % maximumChunkSize];
}
for (int i = 0; i < newResults.length; i++) {
newResults[i] = results[i];
}
results = newResults;
}
if (sendResults) {
writeQueryResponseChunk(results, collectionType,
(resultIndex == selectResults.size()), servConn);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}",
servConn.getName(), (j + 1), numberOfChunks, queryString);
}
}
// If we have reached the last element of SelectResults then we should
// break out of loop here only.
if (resultIndex == selectResults.size()) {
break;
}
}
}
private static void sendResultsAsObjectPartList(int numberOfChunks,
ServerConnection servConn, List objs, boolean isStructs,
CollectionType collectionType, String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults)
throws IOException {
int resultIndex = 0;
Object result = null;
for (int j = 0; j < numberOfChunks; j++) {
if (logger.isTraceEnabled()) {
logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
}
ObjectPartList serializedObjs = new ObjectPartList(maximumChunkSize,
false);
for (int i = 0; i < maximumChunkSize; i++) {
if ((resultIndex) == objs.size()) {
break;
}
if (logger.isTraceEnabled()) {
logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName() , resultIndex, objs.get(resultIndex));
}
if (cqQuery != null){
CqEntry e = (CqEntry)objs.get(resultIndex);
// The value may have become null because of entry invalidation.
if (e.getValue() == null) {
resultIndex++;
continue;
}
// Add the key into CQ results cache.
// For PR the Result caching is not yet supported.
// cqQuery.cqResultsCacheInitialized is added to take care
// of CQ execute requests that are re-sent. In that case no
// need to update the Results cache.
if (!cqQuery.isPR()) {
cqQuery.addToCqResultKeys(e.getKey());
}
// Add to the Results object array.
if (sendCqResultsWithKey) {
result = e.getKeyValuePair();
} else {
result = e.getValue();
}
}
else {
result = objs.get(resultIndex);
}
if (sendResults) {
addToObjectPartList(serializedObjs, result, collectionType, false,
servConn, isStructs);
}
resultIndex++;
}
if (sendResults) {
writeQueryResponseChunk(serializedObjs, collectionType,
((j + 1) == numberOfChunks), servConn);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}",
servConn.getName(), (j + 1), numberOfChunks, queryString);
}
}
}
}
private static void addToObjectPartList(ObjectPartList serializedObjs,
Object res, CollectionType collectionType, boolean lastChunk,
ServerConnection servConn, boolean isStructs) throws IOException {
if (isStructs && (res instanceof Struct)) {
Object[] values = ((Struct) res).getFieldValues();
// create another ObjectPartList for the struct
ObjectPartList serializedValueObjs = new ObjectPartList(values.length,
false);
for (Object value : values) {
if (value instanceof CachedDeserializable) {
serializedValueObjs.addPart(null,
((CachedDeserializable) value).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else {
addDeSerializedObjectToObjectPartList(serializedValueObjs, value);
}
}
serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT,
null);
} else if (res instanceof Object[]) {// for CQ key-value pairs
Object[] values = ((Object[]) res);
// create another ObjectPartList for the Object[]
ObjectPartList serializedValueObjs = new ObjectPartList(values.length,
false);
for (Object value : values) {
if (value instanceof CachedDeserializable) {
serializedValueObjs.addPart(null,
((CachedDeserializable) value).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else {
addDeSerializedObjectToObjectPartList(serializedValueObjs, value);
}
}
serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT,
null);
} else if (res instanceof CachedDeserializable) {
serializedObjs.addPart(null,
((CachedDeserializable) res).getSerializedValue(),
ObjectPartList.OBJECT, null);
} else { // for deserialized objects
addDeSerializedObjectToObjectPartList(serializedObjs, res);
}
}
private static void addDeSerializedObjectToObjectPartList(
ObjectPartList objPartList, Object obj) {
if (obj instanceof byte[]) {
objPartList.addPart(null, obj, ObjectPartList.BYTES, null);
} else {
objPartList.addPart(null, obj, ObjectPartList.OBJECT, null);
}
}
}