| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.operations.QueryOperationContext; |
| import com.gemstone.gemfire.cache.query.Query; |
| import com.gemstone.gemfire.cache.query.QueryException; |
| import com.gemstone.gemfire.cache.query.QueryInvalidException; |
| import com.gemstone.gemfire.cache.query.SelectResults; |
| import com.gemstone.gemfire.cache.query.Struct; |
| import com.gemstone.gemfire.cache.query.internal.CqEntry; |
| import com.gemstone.gemfire.cache.query.internal.DefaultQuery; |
| import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; |
| import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ; |
| import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl; |
| import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; |
| import com.gemstone.gemfire.cache.query.types.CollectionType; |
| import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; |
| import com.gemstone.gemfire.distributed.internal.DistributionStats; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.CachedDeserializable; |
| import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.tier.MessageType; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.security.AuthorizeRequestPP; |
| |
| public abstract class BaseCommandQuery extends BaseCommand { |
| |
| /** |
| * Process the give query and sends the resulset back to the client. |
| * |
| * @param msg |
| * @param query |
| * @param queryString |
| * @param regionNames |
| * @param start |
| * @param cqQuery |
| * @param queryContext |
| * @param servConn |
| * @return true if successful execution |
| * false in case of failure. |
| * @throws IOException |
| */ |
| protected static boolean processQuery(Message msg, Query query, |
| String queryString, Set regionNames, long start, ServerCQ cqQuery, |
| QueryOperationContext queryContext, ServerConnection servConn, |
| boolean sendResults) |
| throws IOException, InterruptedException { |
| return processQueryUsingParams(msg, query, queryString, |
| regionNames, start, cqQuery, queryContext, servConn, sendResults, null); |
| } |
| |
| /** |
| * Process the give query and sends the resulset back to the client. |
| * |
| * @param msg |
| * @param query |
| * @param queryString |
| * @param regionNames |
| * @param start |
| * @param cqQuery |
| * @param queryContext |
| * @param servConn |
| * @return true if successful execution |
| * false in case of failure. |
| * @throws IOException |
| */ |
| protected static boolean processQueryUsingParams(Message msg, Query query, |
| String queryString, Set regionNames, long start, ServerCQ cqQuery, |
| QueryOperationContext queryContext, ServerConnection servConn, |
| boolean sendResults, Object[] params) |
| throws IOException, InterruptedException { |
| ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage(); |
| CacheServerStats stats = servConn.getCacheServerStats(); |
| CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); |
| |
| { |
| long oldStart = start; |
| start = DistributionStats.getStatTime(); |
| stats.incReadQueryRequestTime(start - oldStart); |
| } |
| |
| // from 7.0, set flag to indicate a remote query irrespective of the |
| // object type |
| if (servConn.getClientVersion().compareTo(Version.GFE_70) >= 0) { |
| ((DefaultQuery) query).setRemoteQuery(true); |
| } |
| // Process the query request |
| try { |
| |
| // Execute query |
| // startTime = GenericStats.getTime(); |
| // startTime = System.currentTimeMillis(); |
| |
| // For now we assume the results are a SelectResults |
| // which is the only possibility now, but this may change |
| // in the future if we support arbitrary queries |
| Object result = null; |
| |
| if (params != null) { |
| result = query.execute(params); |
| } else { |
| result = query.execute(); |
| } |
| |
| //Asif : Before conditioning the results check if any |
| //of the regions involved in the query have been destroyed |
| //or not. If yes, throw an Exception. |
| //This is a workaround/fix for Bug 36969 |
| Iterator itr = regionNames.iterator(); |
| while(itr.hasNext()) { |
| String regionName = (String)itr.next(); |
| if(crHelper.getRegion(regionName) == null) { |
| throw new RegionDestroyedException( |
| LocalizedStrings.BaseCommand_REGION_DESTROYED_DURING_THE_EXECUTION_OF_THE_QUERY.toLocalizedString(), regionName); |
| } |
| } |
| AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); |
| if (postAuthzRequest != null) { |
| if (cqQuery == null) { |
| queryContext = postAuthzRequest.queryAuthorize(queryString, |
| regionNames, result, queryContext, params); |
| } |
| else { |
| queryContext = postAuthzRequest.executeCQAuthorize(cqQuery.getName(), |
| queryString, regionNames, result, queryContext); |
| } |
| result = queryContext.getQueryResult(); |
| } |
| |
| // endTime = System.currentTimeMillis(); |
| // System.out.println("Query executed in: " + (endTime-startTime) + "ms"); |
| // GenericStats.endTime0(startTime); |
| |
| |
| if (result instanceof SelectResults) { |
| SelectResults selectResults = (SelectResults)result; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Query Result size for : {} is {}", query.getQueryString(), selectResults.size()); |
| } |
| |
| CollectionType collectionType = null; |
| boolean sendCqResultsWithKey = true; |
| boolean isStructs = false; |
| |
| // check if resultset has serialized objects, so that they could be sent |
| // as ObjectPartList |
| boolean hasSerializedObjects = ((DefaultQuery) query) |
| .isKeepSerialized(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Query Result for :{} has serialized objects: {}", query.getQueryString(), hasSerializedObjects); |
| } |
| // Don't convert to a Set, there might be duplicates now |
| // The results in a StructSet are stored in Object[]s |
| // Get them as Object[]s for the objs[] in order to avoid duplicating |
| // the StructTypes |
| |
| // Object[] objs = new Object[selectResults.size()]; |
| // Get the collection type (which includes the element type) |
| // (used to generate the appropriate instance on the client) |
| |
| // Get the collection type (which includes the element type) |
| // (used to generate the appropriate instance on the client) |
| collectionType = selectResults.getCollectionType(); |
| isStructs = collectionType.getElementType().isStructType(); |
| |
| // Check if the Query is from CQ execution. |
| if (cqQuery != null){ |
| // Check if the key can be sent to the client based on its version. |
| sendCqResultsWithKey = sendCqResultsWithKey(servConn); |
| |
| if (sendCqResultsWithKey){ |
| // Update the collection type to include key info. |
| collectionType = new CollectionTypeImpl(Collection.class, |
| new StructTypeImpl(new String[]{"key", "value"})); |
| isStructs = collectionType.getElementType().isStructType(); |
| } |
| } |
| |
| int numberOfChunks = (int)Math.ceil(selectResults.size() * 1.0 |
| / maximumChunkSize); |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}", |
| servConn.getName(), selectResults.size(), maximumChunkSize, numberOfChunks); |
| } |
| |
| long oldStart = start; |
| start = DistributionStats.getStatTime(); |
| stats.incProcessQueryTime(start - oldStart); |
| |
| if(sendResults){ |
| queryResponseMsg.setMessageType(MessageType.RESPONSE); |
| queryResponseMsg.setTransactionId(msg.getTransactionId()); |
| queryResponseMsg.sendHeader(); |
| } |
| |
| if (sendResults && numberOfChunks == 0) { |
| // Send 1 empty chunk |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Creating chunk: 0", servConn.getName() ); |
| } |
| writeQueryResponseChunk(new Object[0], collectionType, true, servConn); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sent chunk (1 of 1) of query response for query {}", servConn.getName(), queryString); |
| } |
| } |
| else { |
| // Send response to client. |
| // from 7.0, if the object is in the form of serialized byte array, |
| // send it as a part of ObjectPartList |
| if (hasSerializedObjects) { |
| sendResultsAsObjectPartList(numberOfChunks, servConn, |
| selectResults.asList(), isStructs, collectionType, |
| queryString, cqQuery, sendCqResultsWithKey, sendResults); |
| } else { |
| sendResultsAsObjectArray(selectResults, numberOfChunks, servConn, |
| isStructs, collectionType, queryString, cqQuery, sendCqResultsWithKey, sendResults); |
| } |
| } |
| |
| if(cqQuery != null){ |
| // Set the CQ query result cache initialized flag. |
| cqQuery.setCqResultsCacheInitialized(); |
| } |
| |
| } |
| else if (result instanceof Integer) { |
| if (sendResults) { |
| queryResponseMsg.setMessageType(MessageType.RESPONSE); |
| queryResponseMsg.setTransactionId(msg.getTransactionId()); |
| queryResponseMsg.sendHeader(); |
| writeQueryResponseChunk(result, null, true, servConn); |
| } |
| } |
| else { |
| throw new QueryInvalidException(LocalizedStrings.BaseCommand_UNKNOWN_RESULT_TYPE_0.toLocalizedString(result.getClass())); |
| } |
| msg.clearParts(); |
| } |
| catch (QueryInvalidException e) { |
| // Handle this exception differently since it can contain |
| // non-serializable objects. |
| // java.io.NotSerializableException: antlr.CommonToken |
| // Log a warning to show stack trace and create a new |
| // QueryInvalidEsception on the original one's message (not cause). |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_UNEXPECTED_QUERYINVALIDEXCEPTION_WHILE_PROCESSING_QUERY_0, queryString), e); |
| QueryInvalidException qie = new QueryInvalidException(LocalizedStrings.BaseCommand_0_QUERYSTRING_IS_1 |
| .toLocalizedString(new Object[] {e.getLocalizedMessage(), queryString})); |
| writeQueryResponseException(msg, qie, false, servConn); |
| return false; |
| } |
| catch (DistributedSystemDisconnectedException se) { |
| if (msg != null && logger.isDebugEnabled()) { |
| logger.debug("{}: ignoring message of type {} from client {} because shutdown occurred during message processing.", |
| servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getProxyID()); |
| } |
| servConn.setFlagProcessMessagesAsFalse(); |
| return false; |
| } |
| catch (Exception e) { |
| // If an interrupted exception is thrown , rethrow it |
| checkForInterrupt(servConn, e); |
| // Otherwise, write a query response and continue |
| // Check if query got canceled from QueryMonitor. |
| DefaultQuery defaultQuery = (DefaultQuery)query; |
| if ((defaultQuery).isCanceled()){ |
| e = new QueryException(defaultQuery.getQueryCanceledException().getMessage(), e.getCause()); |
| } |
| writeQueryResponseException(msg, e, false, servConn); |
| return false; |
| } finally { |
| // Since the query object is being shared in case of bind queries, |
| // resetting the flag may cause inconsistency. |
| // Also since this flag is only being set in code path executed by |
| // remote query execution, resetting it is not required. |
| |
| //((DefaultQuery)query).setRemoteQuery(false); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sent query response for query {}", servConn.getName(), queryString); |
| } |
| |
| stats.incWriteQueryResponseTime(DistributionStats.getStatTime() - start); |
| return true; |
| } |
| |
| private static boolean sendCqResultsWithKey(ServerConnection servConn) { |
| Version clientVersion = servConn.getClientVersion(); |
| if (clientVersion.compareTo(Version.GFE_65) >= 0) { |
| return true; |
| } |
| return false; |
| } |
| |
| protected static void sendCqResponse(int msgType, String msgStr, int txId, |
| Throwable e, ServerConnection servConn) throws IOException { |
| ChunkedMessage cqMsg = servConn.getChunkedResponseMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("CQ Response message :{}", msgStr); |
| } |
| |
| switch (msgType) { |
| case MessageType.REPLY: |
| cqMsg.setNumberOfParts(1); |
| break; |
| |
| case MessageType.CQDATAERROR_MSG_TYPE: |
| logger.warn(msgStr); |
| cqMsg.setNumberOfParts(1); |
| break; |
| |
| case MessageType.CQ_EXCEPTION_TYPE: |
| String exMsg = ""; |
| if (e != null) { |
| exMsg = e.getLocalizedMessage(); |
| } |
| logger.info(msgStr + exMsg, e); |
| |
| msgStr += exMsg; // fixes bug 42309 |
| |
| cqMsg.setNumberOfParts(1); |
| break; |
| |
| default: |
| msgType = MessageType.CQ_EXCEPTION_TYPE; |
| cqMsg.setNumberOfParts(1); |
| msgStr += LocalizedStrings.BaseCommand_UNKNOWN_QUERY_EXCEPTION.toLocalizedString(); |
| break; |
| } |
| |
| cqMsg.setMessageType(msgType); |
| cqMsg.setTransactionId(txId); |
| cqMsg.sendHeader(); |
| cqMsg.addStringPart(msgStr); |
| cqMsg.setLastChunk(true); |
| cqMsg.sendChunk(servConn); |
| cqMsg.setLastChunk(true); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("CQ Response sent successfully"); |
| } |
| } |
| |
| private static void sendResultsAsObjectArray(SelectResults selectResults, |
| int numberOfChunks, ServerConnection servConn, |
| boolean isStructs, CollectionType collectionType, String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults) |
| throws IOException { |
| int resultIndex = 0; |
| // For CQ only as we dont want CQEntries which have null values. |
| int cqResultIndex = 0; |
| Object[] objs = selectResults.toArray(); |
| for (int j = 0; j < numberOfChunks; j++) { |
| boolean incompleteArray = false; |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Creating chunk: {}", servConn.getName(), j); |
| } |
| Object[] results = new Object[maximumChunkSize]; |
| for (int i = 0; i < maximumChunkSize; i++) { |
| if ((resultIndex) == selectResults.size()) { |
| incompleteArray = true; |
| break; |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName(), resultIndex, objs[resultIndex]); |
| } |
| if (cqQuery != null){ |
| CqEntry e = (CqEntry)objs[resultIndex]; |
| // The value may have become null because of entry invalidation. |
| if (e.getValue() == null) { |
| resultIndex++; |
| // i will get incremented anyway so we need to decrement it back so |
| // that results[i] is not null. |
| i--; |
| continue; |
| } |
| // Add the key into CQ results cache. |
| // For PR the Result caching is not yet supported. |
| // cqQuery.cqResultsCacheInitialized is added to take care |
| // of CQ execute requests that are re-sent. In that case no |
| // need to update the Results cache. |
| if (!cqQuery.isPR()) { |
| cqQuery.addToCqResultKeys(e.getKey()); |
| } |
| |
| // Add to the Results object array. |
| if (sendCqResultsWithKey) { |
| results[i] = e.getKeyValuePair(); |
| } else { |
| results[i] = e.getValue(); |
| } |
| } else { |
| // instance check added to fix bug 40516. |
| if (isStructs && (objs[resultIndex] instanceof Struct)) { |
| results[i] = ((Struct) objs[resultIndex]).getFieldValues(); |
| } else { |
| results[i] = objs[resultIndex]; |
| } |
| } |
| resultIndex++; |
| cqResultIndex++; |
| } |
| // Shrink array if necessary. This will occur if the number |
| // of entries in the chunk does not divide evenly into the |
| // number of entries in the result set. |
| if (incompleteArray) { |
| Object[] newResults; |
| if (cqQuery != null) { |
| newResults = new Object[cqResultIndex % maximumChunkSize]; |
| } else { |
| newResults = new Object[resultIndex % maximumChunkSize]; |
| } |
| for (int i = 0; i < newResults.length; i++) { |
| newResults[i] = results[i]; |
| } |
| results = newResults; |
| } |
| |
| if (sendResults) { |
| writeQueryResponseChunk(results, collectionType, |
| (resultIndex == selectResults.size()), servConn); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}", |
| servConn.getName(), (j + 1), numberOfChunks, queryString); |
| } |
| } |
| // If we have reached the last element of SelectResults then we should |
| // break out of loop here only. |
| if (resultIndex == selectResults.size()) { |
| break; |
| } |
| } |
| } |
| |
| private static void sendResultsAsObjectPartList(int numberOfChunks, |
| ServerConnection servConn, List objs, boolean isStructs, |
| CollectionType collectionType, String queryString, ServerCQ cqQuery, boolean sendCqResultsWithKey, boolean sendResults) |
| throws IOException { |
| int resultIndex = 0; |
| Object result = null; |
| for (int j = 0; j < numberOfChunks; j++) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Creating chunk: {}", servConn.getName(), j); |
| } |
| ObjectPartList serializedObjs = new ObjectPartList(maximumChunkSize, |
| false); |
| for (int i = 0; i < maximumChunkSize; i++) { |
| if ((resultIndex) == objs.size()) { |
| break; |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Adding entry [{}] to query results: {}", servConn.getName() , resultIndex, objs.get(resultIndex)); |
| } |
| if (cqQuery != null){ |
| CqEntry e = (CqEntry)objs.get(resultIndex); |
| // The value may have become null because of entry invalidation. |
| if (e.getValue() == null) { |
| resultIndex++; |
| continue; |
| } |
| // Add the key into CQ results cache. |
| // For PR the Result caching is not yet supported. |
| // cqQuery.cqResultsCacheInitialized is added to take care |
| // of CQ execute requests that are re-sent. In that case no |
| // need to update the Results cache. |
| if (!cqQuery.isPR()) { |
| cqQuery.addToCqResultKeys(e.getKey()); |
| } |
| |
| // Add to the Results object array. |
| if (sendCqResultsWithKey) { |
| result = e.getKeyValuePair(); |
| } else { |
| result = e.getValue(); |
| } |
| } |
| else { |
| result = objs.get(resultIndex); |
| } |
| if (sendResults) { |
| addToObjectPartList(serializedObjs, result, collectionType, false, |
| servConn, isStructs); |
| } |
| resultIndex++; |
| } |
| |
| if (sendResults) { |
| writeQueryResponseChunk(serializedObjs, collectionType, |
| ((j + 1) == numberOfChunks), servConn); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sent chunk ({} of {}) of query response for query: {}", |
| servConn.getName(), (j + 1), numberOfChunks, queryString); |
| } |
| } |
| } |
| } |
| |
| private static void addToObjectPartList(ObjectPartList serializedObjs, |
| Object res, CollectionType collectionType, boolean lastChunk, |
| ServerConnection servConn, boolean isStructs) throws IOException { |
| |
| if (isStructs && (res instanceof Struct)) { |
| Object[] values = ((Struct) res).getFieldValues(); |
| // create another ObjectPartList for the struct |
| ObjectPartList serializedValueObjs = new ObjectPartList(values.length, |
| false); |
| for (Object value : values) { |
| if (value instanceof CachedDeserializable) { |
| serializedValueObjs.addPart(null, |
| ((CachedDeserializable) value).getSerializedValue(), |
| ObjectPartList.OBJECT, null); |
| } else { |
| addDeSerializedObjectToObjectPartList(serializedValueObjs, value); |
| } |
| } |
| serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT, |
| null); |
| } else if (res instanceof Object[]) {// for CQ key-value pairs |
| Object[] values = ((Object[]) res); |
| // create another ObjectPartList for the Object[] |
| ObjectPartList serializedValueObjs = new ObjectPartList(values.length, |
| false); |
| for (Object value : values) { |
| if (value instanceof CachedDeserializable) { |
| serializedValueObjs.addPart(null, |
| ((CachedDeserializable) value).getSerializedValue(), |
| ObjectPartList.OBJECT, null); |
| } else { |
| addDeSerializedObjectToObjectPartList(serializedValueObjs, value); |
| } |
| } |
| serializedObjs.addPart(null, serializedValueObjs, ObjectPartList.OBJECT, |
| null); |
| } else if (res instanceof CachedDeserializable) { |
| serializedObjs.addPart(null, |
| ((CachedDeserializable) res).getSerializedValue(), |
| ObjectPartList.OBJECT, null); |
| } else { // for deserialized objects |
| addDeSerializedObjectToObjectPartList(serializedObjs, res); |
| } |
| } |
| |
| private static void addDeSerializedObjectToObjectPartList( |
| ObjectPartList objPartList, Object obj) { |
| if (obj instanceof byte[]) { |
| objPartList.addPart(null, obj, ObjectPartList.BYTES, null); |
| } else { |
| objPartList.addPart(null, obj, ObjectPartList.OBJECT, null); |
| } |
| } |
| |
| } |