| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| /** |
| * |
| */ |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Semaphore; |
| import java.util.regex.Pattern; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.CopyException; |
| import com.gemstone.gemfire.InternalGemFireError; |
| import com.gemstone.gemfire.SerializationException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.CacheLoaderException; |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.InterestResultPolicy; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException; |
| import com.gemstone.gemfire.cache.TransactionDataRebalancedException; |
| import com.gemstone.gemfire.cache.TransactionException; |
| import com.gemstone.gemfire.cache.operations.QueryOperationContext; |
| import com.gemstone.gemfire.cache.persistence.PartitionOfflineException; |
| import com.gemstone.gemfire.cache.query.Query; |
| import com.gemstone.gemfire.cache.query.QueryException; |
| import com.gemstone.gemfire.cache.query.QueryInvalidException; |
| import com.gemstone.gemfire.cache.query.SelectResults; |
| import com.gemstone.gemfire.cache.query.Struct; |
| import com.gemstone.gemfire.cache.query.internal.CqEntry; |
| import com.gemstone.gemfire.cache.query.internal.DefaultQuery; |
| import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl; |
| import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; |
| import com.gemstone.gemfire.cache.query.types.CollectionType; |
| import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; |
| import com.gemstone.gemfire.distributed.internal.DistributionStats; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.i18n.LogWriterI18n; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.CachedDeserializable; |
| import com.gemstone.gemfire.internal.cache.DistributedRegion; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl; |
| import com.gemstone.gemfire.internal.cache.EntrySnapshot; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.internal.cache.FindVersionTagOperation; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.TXManagerImpl; |
| import com.gemstone.gemfire.internal.cache.TXStateProxy; |
| import com.gemstone.gemfire.internal.cache.Token; |
| import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.tier.Command; |
| import com.gemstone.gemfire.internal.cache.tier.InterestType; |
| import com.gemstone.gemfire.internal.cache.tier.MessageType; |
| import com.gemstone.gemfire.internal.cache.versions.VersionStamp; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.offheap.OffHeapHelper; |
| import com.gemstone.gemfire.internal.security.AuthorizeRequestPP; |
| import com.gemstone.gemfire.internal.sequencelog.EntryLogger; |
| import com.gemstone.gemfire.security.GemFireSecurityException; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| /** |
| * @author ashahid |
| * |
| */ |
| public abstract class BaseCommand implements Command { |
| protected static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Whether zipped values are being passed to/from the client. Can be modified |
| * using the system property Message.ZIP_VALUES ? This does not appear to |
| * happen anywhere |
| */ |
| protected static final boolean zipValues = false; |
| |
| protected static final boolean APPLY_RETRIES = Boolean |
| .getBoolean("gemfire.gateway.ApplyRetries"); |
| |
| public static final byte[] OK_BYTES = new byte[]{0}; |
| |
| public static final int maximumChunkSize = Integer.getInteger( |
| "BridgeServer.MAXIMUM_CHUNK_SIZE", 100).intValue(); |
| |
| /** Maximum number of entries in each chunked response chunk */ |
| |
| /** Whether to suppress logging of IOExceptions */ |
| private static boolean suppressIOExceptionLogging = Boolean |
| .getBoolean("gemfire.bridge.suppressIOExceptionLogging"); |
| |
| /** |
| * Maximum number of concurrent incoming client message bytes that a bridge |
| * server will allow. Once a server is working on this number additional |
| * incoming client messages will wait until one of them completes or fails. |
| * The bytes are computed based in the size sent in the incoming msg header. |
| */ |
| private static final int MAX_INCOMING_DATA = Integer.getInteger( |
| "BridgeServer.MAX_INCOMING_DATA", -1).intValue(); |
| |
| /** |
| * Maximum number of concurrent incoming client messages that a bridge server |
| * will allow. Once a server is working on this number additional incoming |
| * client messages will wait until one of them completes or fails. |
| */ |
| private static final int MAX_INCOMING_MSGS = Integer.getInteger( |
| "BridgeServer.MAX_INCOMING_MSGS", -1).intValue(); |
| |
| private static final Semaphore incomingDataLimiter; |
| |
| private static final Semaphore incomingMsgLimiter; |
| static { |
| Semaphore tmp; |
| if (MAX_INCOMING_DATA > 0) { |
| // backport requires that this is fair since we inc by values > 1 |
| tmp = new Semaphore(MAX_INCOMING_DATA, true); |
| } |
| else { |
| tmp = null; |
| } |
| incomingDataLimiter = tmp; |
| if (MAX_INCOMING_MSGS > 0) { |
| tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best |
| // performance |
| } |
| else { |
| tmp = null; |
| } |
| incomingMsgLimiter = tmp; |
| |
| } |
| |
| final public void execute(Message msg, ServerConnection servConn) { |
| // Read the request and update the statistics |
| long start = DistributionStats.getStatTime(); |
| //servConn.resetTransientData(); |
| if(EntryLogger.isEnabled() && servConn != null) { |
| EntryLogger.setSource(servConn.getMembershipID(), "c2s"); |
| } |
| boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn); |
| try { |
| if (shouldMasquerade) { |
| GemFireCacheImpl cache = (GemFireCacheImpl)servConn.getCache(); |
| InternalDistributedMember member = (InternalDistributedMember)servConn.getProxyID().getDistributedMember(); |
| TXManagerImpl txMgr = cache.getTxManager(); |
| TXStateProxy tx = null; |
| try { |
| tx = txMgr.masqueradeAs(msg, member, false); |
| cmdExecute(msg, servConn, start); |
| } finally { |
| txMgr.unmasquerade(tx); |
| } |
| } else { |
| cmdExecute(msg, servConn, start); |
| } |
| |
| } |
| catch (EOFException eof) { |
| BaseCommand.handleEOFException(msg, servConn, eof); |
| // TODO:Asif: Check if there is any need for explicitly returning |
| return; |
| } |
| catch (InterruptedIOException e) { // Solaris only |
| BaseCommand.handleInterruptedIOException(msg, servConn, e); |
| return; |
| } |
| catch (IOException e) { |
| BaseCommand.handleIOException(msg, servConn, e); |
| return; |
| } |
| catch (DistributedSystemDisconnectedException e) { |
| BaseCommand.handleShutdownException(msg, servConn, e); |
| return; |
| } |
| catch (PartitionOfflineException e) { // fix for bug #42225 |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } |
| catch (GemFireSecurityException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } |
| catch (CacheLoaderException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } |
| catch (CacheWriterException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } catch (SerializationException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } catch (CopyException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } catch (TransactionException e) { |
| handleExceptionNoDisconnect(msg, servConn, e); |
| } |
| |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable e) { |
| BaseCommand.handleThrowable(msg, servConn, e); |
| } finally { |
| EntryLogger.clearSource(); |
| } |
| /* |
| * finally { // Keep track of the fact that a message is no longer being // |
| * processed. servConn.setNotProcessingMessage(); |
| * servConn.clearRequestMsg(); } |
| */ |
| } |
| |
| /** |
| * checks to see if this thread needs to masquerade as a transactional thread. |
| * clients after GFE_66 should be able to start a transaction. |
| * @param msg |
| * @param servConn |
| * @return true if thread should masquerade as a transactional thread. |
| */ |
| protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) { |
| if (servConn.getClientVersion().compareTo(Version.GFE_66) >= 0 |
| && msg.getTransactionId() > TXManagerImpl.NOTX) { |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * If an operation is retried then some server may have seen it already. |
| * We cannot apply this operation to the cache without knowing whether a |
| * version tag has already been created for it. Otherwise caches that have |
| * seen the event already will reject it but others will not, but will have |
| * no version tag with which to perform concurrency checks. |
| * <p>The client event should have the event identifier from the client and |
| * the region affected by the operation. |
| * @param clientEvent |
| */ |
| public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) { |
| LocalRegion r = clientEvent.getRegion(); |
| VersionTag tag = null; |
| if ((clientEvent.getVersionTag() != null) && (clientEvent.getVersionTag().isGatewayTag())) { |
| tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId()); |
| } |
| else { |
| tag = r.findVersionTagForClientEvent(clientEvent.getEventId()); |
| } |
| if (tag == null) { |
| if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { |
| // TODO this could be optimized for partitioned regions by sending the key |
| // so that the PR could look at an individual bucket for the event |
| tag = FindVersionTagOperation.findVersionTag(r, clientEvent.getEventId(), false); |
| } |
| } |
| if (tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed operation {}", tag, clientEvent.getEventId()); |
| } |
| clientEvent.setVersionTag(tag); |
| } |
| return (tag != null); |
| } |
| |
| /** |
| * If an operation is retried then some server may have seen it already. |
| * We cannot apply this operation to the cache without knowing whether a |
| * version tag has already been created for it. Otherwise caches that have |
| * seen the event already will reject it but others will not, but will have |
| * no version tag with which to perform concurrency checks. |
| * <p>The client event should have the event identifier from the client and |
| * the region affected by the operation. |
| */ |
| protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion r, EventID eventID) { |
| VersionTag tag = r.findVersionTagForClientBulkOp(eventID); |
| if(tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); |
| } |
| return tag; |
| } |
| if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { |
| // TODO this could be optimized for partitioned regions by sending the key |
| // so that the PR could look at an individual bucket for the event |
| tag = FindVersionTagOperation.findVersionTag(r, eventID, true); |
| } |
| if (tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); |
| } |
| } |
| return tag; |
| } |
| |
| abstract public void cmdExecute(Message msg, ServerConnection servConn, |
| long start) throws IOException, ClassNotFoundException, InterruptedException; |
| |
| protected void writeReply(Message origMsg, ServerConnection servConn) |
| throws IOException { |
| Message replyMsg = servConn.getReplyMessage(); |
| servConn.getCache().getCancelCriterion().checkCancelInProgress(null); |
| replyMsg.setMessageType(MessageType.REPLY); |
| replyMsg.setNumberOfParts(1); |
| replyMsg.setTransactionId(origMsg.getTransactionId()); |
| replyMsg.addBytesPart(OK_BYTES); |
| replyMsg.send(servConn); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: rpl tx: {}", servConn.getName(), origMsg.getTransactionId()); |
| } |
| } |
| protected void writeReplyWithRefreshMetadata(Message origMsg, |
| ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException { |
| Message replyMsg = servConn.getReplyMessage(); |
| servConn.getCache().getCancelCriterion().checkCancelInProgress(null); |
| replyMsg.setMessageType(MessageType.REPLY); |
| replyMsg.setNumberOfParts(1); |
| replyMsg.setTransactionId(origMsg.getTransactionId()); |
| replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); |
| replyMsg.send(servConn); |
| pr.getPrStats().incPRMetaDataSentCount(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), origMsg.getTransactionId()); |
| } |
| } |
| |
| private static void handleEOFException(Message msg, |
| ServerConnection servConn, Exception eof) { |
| CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); |
| CacheServerStats stats = servConn.getCacheServerStats(); |
| boolean potentialModification = servConn.getPotentialModification(); |
| if (!crHelper.isShutdown()) { |
| if (potentialModification) { |
| stats.incAbandonedWriteRequests(); |
| } |
| else { |
| stats.incAbandonedReadRequests(); |
| } |
| if (!suppressIOExceptionLogging) { |
| if (potentialModification) { |
| int transId = (msg != null) ? msg.getTransactionId() |
| : Integer.MIN_VALUE; |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_EOFEXCEPTION_DURING_A_WRITE_OPERATION_ON_REGION__1_KEY_2_MESSAGEID_3, |
| new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)})); |
| } |
| else { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_CONNECTION_DISCONNECT_DETECTED_BY_EOF, |
| servConn.getName())); |
| } |
| } |
| } |
| servConn.setFlagProcessMessagesAsFalse(); |
| } |
| |
| private static void handleInterruptedIOException(Message msg, |
| ServerConnection servConn, Exception e) { |
| CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); |
| if (!crHelper.isShutdown() && servConn.isOpen()) { |
| if (!suppressIOExceptionLogging) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e); |
| } |
| } |
| servConn.setFlagProcessMessagesAsFalse(); |
| } |
| |
| private static void handleIOException(Message msg, ServerConnection servConn, |
| Exception e) { |
| CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); |
| boolean potentialModification = servConn.getPotentialModification(); |
| |
| if (!crHelper.isShutdown() && servConn.isOpen()) { |
| if (!suppressIOExceptionLogging) { |
| if (potentialModification) { |
| int transId = (msg != null) ? msg.getTransactionId() |
| : Integer.MIN_VALUE; |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION_DURING_OPERATION_FOR_REGION_1_KEY_2_MESSID_3, |
| new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e); |
| } |
| else { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION, |
| servConn.getName()), e); |
| } |
| } |
| } |
| servConn.setFlagProcessMessagesAsFalse(); |
| } |
| |
| private static void handleShutdownException(Message msg, |
| ServerConnection servConn, Exception e) { |
| CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); |
| boolean potentialModification = servConn.getPotentialModification(); |
| |
| if (!crHelper.isShutdown()) { |
| if (potentialModification) { |
| int transId = (msg != null) ? msg.getTransactionId() |
| : Integer.MIN_VALUE; |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, |
| new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e); |
| } |
| else { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION, |
| servConn.getName()),e); |
| } |
| } |
| servConn.setFlagProcessMessagesAsFalse(); |
| } |
| |
| // Handle GemfireSecurityExceptions separately since the connection should not |
| // be terminated (by setting processMessages to false) unlike in |
| // handleThrowable. Fixes bugs #38384 and #39392. |
| // private static void handleGemfireSecurityException(Message msg, |
| // ServerConnection servConn, GemFireSecurityException e) { |
| // |
| // boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); |
| // boolean responded = servConn.getTransientFlag(RESPONDED); |
| // boolean requiresChunkedResponse = servConn |
| // .getTransientFlag(REQUIRES_CHUNKED_RESPONSE); |
| // boolean potentialModification = servConn.getPotentialModification(); |
| // |
| // try { |
| // try { |
| // if (requiresResponse && !responded) { |
| // if (requiresChunkedResponse) { |
| // writeChunkedException(msg, e, false, servConn); |
| // } |
| // else { |
| // writeException(msg, e, false, servConn); |
| // } |
| // servConn.setAsTrue(RESPONDED); |
| // } |
| // } |
| // finally { // inner try-finally to ensure proper ordering of logging |
| // if (potentialModification) { |
| // int transId = (msg != null) ? msg.getTransactionId() |
| // : Integer.MIN_VALUE; |
| // } |
| // } |
| // } |
| // catch (IOException ioe) { |
| // if (logger.isDebugEnabled()) { |
| // logger.fine(servConn.getName() |
| // + ": Unexpected IOException writing security exception: ", ioe); |
| // } |
| // } |
| // } |
| |
| private static void handleExceptionNoDisconnect(Message msg, |
| ServerConnection servConn, Exception e) { |
| boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); |
| boolean responded = servConn.getTransientFlag(RESPONDED); |
| boolean requiresChunkedResponse = servConn |
| .getTransientFlag(REQUIRES_CHUNKED_RESPONSE); |
| boolean potentialModification = servConn.getPotentialModification(); |
| boolean wroteExceptionResponse = false; |
| |
| try { |
| try { |
| if (requiresResponse && !responded) { |
| if (requiresChunkedResponse) { |
| writeChunkedException(msg, e, false, servConn); |
| } |
| else { |
| writeException(msg, e, false, servConn); |
| } |
| wroteExceptionResponse = true; |
| servConn.setAsTrue(RESPONDED); |
| } |
| } |
| finally { // inner try-finally to ensure proper ordering of logging |
| if (potentialModification) { |
| int transId = (msg != null) ? msg.getTransactionId() |
| : Integer.MIN_VALUE; |
| if (!wroteExceptionResponse) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, |
| new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}", servConn.getName(), |
| servConn.getModRegion(), servConn.getModKey(), transId, e); |
| } |
| } |
| } |
| else { |
| if (!wroteExceptionResponse) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, |
| servConn.getName()), e); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Exception: {}", servConn.getName(), e.getMessage(), e); |
| } |
| } |
| } |
| } |
| } |
| catch (IOException ioe) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe); |
| } |
| } |
| } |
| |
| private static void handleThrowable(Message msg, ServerConnection servConn, |
| Throwable th) { |
| boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); |
| boolean responded = servConn.getTransientFlag(RESPONDED); |
| boolean requiresChunkedResponse = servConn |
| .getTransientFlag(REQUIRES_CHUNKED_RESPONSE); |
| boolean potentialModification = servConn.getPotentialModification(); |
| |
| try { |
| try { |
| if (th instanceof Error) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER, |
| servConn.getName()), th); |
| } |
| if (requiresResponse && !responded) { |
| if (requiresChunkedResponse) { |
| writeChunkedException(msg, th, false, servConn); |
| } |
| else { |
| writeException(msg, th, false, servConn); |
| } |
| servConn.setAsTrue(RESPONDED); |
| } |
| } |
| finally { // inner try-finally to ensure proper ordering of logging |
| if (th instanceof Error) { |
| // log nothing |
| } else if (th instanceof CancelException) { |
| // log nothing |
| } else { |
| if (potentialModification) { |
| int transId = (msg != null) ? msg.getTransactionId() |
| : Integer.MIN_VALUE; |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, |
| new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), th); |
| } |
| else { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, |
| servConn.getName()), th); |
| } |
| } |
| } |
| } catch (IOException ioe) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe); |
| } |
| } finally { |
| servConn.setFlagProcessMessagesAsFalse(); |
| } |
| } |
| |
| protected static void writeChunkedException(Message origMsg, Throwable e, |
| boolean isSevere, ServerConnection servConn) throws IOException { |
| writeChunkedException(origMsg, e, isSevere, servConn, servConn.getChunkedResponseMessage()); |
| } |
| |
| protected static void writeChunkedException(Message origMsg, Throwable e, |
| boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse) throws IOException { |
| writeChunkedException(origMsg, e, isSevere, servConn, originalReponse, 2); |
| } |
| |
| protected static void writeChunkedException(Message origMsg, Throwable e, |
| boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse, int numOfParts) throws IOException { |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| chunkedResponseMsg.setServerConnection(servConn); |
| if (originalReponse.headerHasBeenSent()) { |
| //chunkedResponseMsg = originalReponse; |
| // fix for bug 35442 |
| chunkedResponseMsg.setNumberOfParts(numOfParts); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts); |
| chunkedResponseMsg.addObjPart(e); |
| if (numOfParts == 2) { |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e); |
| } |
| } |
| else { |
| chunkedResponseMsg.setMessageType(MessageType.EXCEPTION); |
| chunkedResponseMsg.setNumberOfParts(numOfParts); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| if (numOfParts == 2) { |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); |
| } |
| } |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| // Get the exception stacktrace for native clients |
| public static String getExceptionTrace(Throwable ex) { |
| StringWriter sw = new StringWriter(); |
| PrintWriter pw = new PrintWriter(sw); |
| ex.printStackTrace(pw); |
| pw.close(); |
| return sw.toString(); |
| } |
| |
| protected static void writeException(Message origMsg, Throwable e, |
| boolean isSevere, ServerConnection servConn) throws IOException { |
| writeException(origMsg, MessageType.EXCEPTION, e, isSevere, servConn); |
| } |
| |
| protected static void writeException(Message origMsg, int msgType, Throwable e, |
| boolean isSevere, ServerConnection servConn) throws IOException { |
| Message errorMsg = servConn.getErrorResponseMessage(); |
| errorMsg.setMessageType(msgType); |
| errorMsg.setNumberOfParts(2); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| if (isSevere) { |
| String msg = e.getMessage(); |
| if (msg == null) { |
| msg = e.toString(); |
| } |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_SEVERE_CACHE_EXCEPTION_0, msg)); |
| } |
| errorMsg.addObjPart(e); |
| errorMsg.addStringPart(getExceptionTrace(e)); |
| errorMsg.send(servConn); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Wrote exception: {}", servConn.getName(), e.getMessage(), e); |
| } |
| } |
| |
| protected static void writeErrorResponse(Message origMsg, int messageType, |
| ServerConnection servConn) throws IOException { |
| Message errorMsg = servConn.getErrorResponseMessage(); |
| errorMsg.setMessageType(messageType); |
| errorMsg.setNumberOfParts(1); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| errorMsg |
| .addStringPart(LocalizedStrings.BaseCommand_INVALID_DATA_RECEIVED_PLEASE_SEE_THE_CACHE_SERVER_LOG_FILE_FOR_ADDITIONAL_DETAILS.toLocalizedString()); |
| errorMsg.send(servConn); |
| } |
| |
| protected static void writeErrorResponse(Message origMsg, int messageType, |
| String msg, ServerConnection servConn) throws IOException { |
| Message errorMsg = servConn.getErrorResponseMessage(); |
| errorMsg.setMessageType(messageType); |
| errorMsg.setNumberOfParts(1); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| errorMsg.addStringPart(msg); |
| errorMsg.send(servConn); |
| } |
| |
| protected static void writeRegionDestroyedEx(Message msg, String regionName, |
| String title, ServerConnection servConn) throws IOException { |
| String reason = servConn.getName() + ": Region named " + regionName + title; |
| RegionDestroyedException ex = new RegionDestroyedException(reason, |
| regionName); |
| if (servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) { |
| writeChunkedException(msg, ex, false, servConn); |
| } |
| else { |
| writeException(msg, ex, false, servConn); |
| } |
| } |
| |
| protected static void writeResponse(Object data, Object callbackArg, |
| Message origMsg, boolean isObject, ServerConnection servConn) |
| throws IOException { |
| Message responseMsg = servConn.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| |
| if (callbackArg == null) { |
| responseMsg.setNumberOfParts(1); |
| } |
| else { |
| responseMsg.setNumberOfParts(2); |
| } |
| if (data instanceof byte[]) { |
| responseMsg.addRawPart((byte[])data, isObject); |
| } |
| else { |
| Assert.assertTrue(isObject, |
| "isObject should be true when value is not a byte[]"); |
| responseMsg.addObjPart(data, zipValues); |
| } |
| if (callbackArg != null) { |
| responseMsg.addObjPart(callbackArg); |
| } |
| servConn.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(servConn); |
| origMsg.clearParts(); |
| } |
| |
| protected static void writeResponseWithRefreshMetadata(Object data, |
| Object callbackArg, Message origMsg, boolean isObject, |
| ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException { |
| Message responseMsg = servConn.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| if (callbackArg == null) { |
| responseMsg.setNumberOfParts(2); |
| } |
| else { |
| responseMsg.setNumberOfParts(3); |
| } |
| |
| if (data instanceof byte[]) { |
| responseMsg.addRawPart((byte[])data, isObject); |
| } |
| else { |
| Assert.assertTrue(isObject, |
| "isObject should be true when value is not a byte[]"); |
| responseMsg.addObjPart(data, zipValues); |
| } |
| if (callbackArg != null) { |
| responseMsg.addObjPart(callbackArg); |
| } |
| responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop}); |
| servConn.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(servConn); |
| origMsg.clearParts(); |
| } |
| |
| protected static void writeResponseWithFunctionAttribute(byte[] data, |
| Message origMsg, ServerConnection servConn) throws IOException { |
| Message responseMsg = servConn.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| responseMsg.setNumberOfParts(1); |
| responseMsg.addBytesPart(data); |
| servConn.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(servConn); |
| origMsg.clearParts(); |
| } |
| |
| static protected void checkForInterrupt(ServerConnection servConn, Exception e) |
| throws InterruptedException, InterruptedIOException { |
| servConn.getCachedRegionHelper().checkCancelInProgress(e); |
| if (e instanceof InterruptedException) { |
| throw (InterruptedException)e; |
| } |
| if (e instanceof InterruptedIOException) { |
| throw (InterruptedIOException)e; |
| } |
| } |
| |
| protected static void writeQueryResponseChunk(Object queryResponseChunk, |
| CollectionType collectionType, boolean lastChunk, |
| ServerConnection servConn) throws IOException { |
| ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage(); |
| queryResponseMsg.setNumberOfParts(2); |
| queryResponseMsg.setLastChunk(lastChunk); |
| queryResponseMsg.addObjPart(collectionType, zipValues); |
| queryResponseMsg.addObjPart(queryResponseChunk, zipValues); |
| queryResponseMsg.sendChunk(servConn); |
| } |
| |
| protected static void writeQueryResponseException(Message origMsg, |
| Throwable e, boolean isSevere, ServerConnection servConn) |
| throws IOException { |
| ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| if (queryResponseMsg.headerHasBeenSent()) { |
| // fix for bug 35442 |
| // This client is expecting 2 parts in this message so send 2 parts |
| queryResponseMsg.setServerConnection(servConn); |
| queryResponseMsg.setNumberOfParts(2); |
| queryResponseMsg.setLastChunkAndNumParts(true, 2); |
| queryResponseMsg.addObjPart(e); |
| queryResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e); |
| } |
| queryResponseMsg.sendChunk(servConn); |
| } |
| else { |
| chunkedResponseMsg.setServerConnection(servConn); |
| chunkedResponseMsg.setMessageType(MessageType.EXCEPTION); |
| chunkedResponseMsg.setNumberOfParts(2); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, 2); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); |
| } |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| } |
| |
| protected static void writeChunkedErrorResponse(Message origMsg, |
| int messageType, String message, ServerConnection servConn) |
| throws IOException { |
| // Send chunked response header identifying error message |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug(servConn.getName() + ": Sending error message header type: " |
| + messageType + " transaction: " + origMsg.getTransactionId()); |
| } |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| |
| // Send actual error |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message); |
| } |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.addStringPart(message); |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| protected static void writeFunctionResponseException(Message origMsg, |
| int messageType, String message, ServerConnection servConn, Throwable e) |
| throws IOException { |
| ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| if (functionResponseMsg.headerHasBeenSent()) { |
| functionResponseMsg.setServerConnection(servConn); |
| functionResponseMsg.setNumberOfParts(2); |
| functionResponseMsg.setLastChunkAndNumParts(true,2); |
| functionResponseMsg.addObjPart(e); |
| functionResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e); |
| } |
| functionResponseMsg.sendChunk(servConn); |
| } |
| else { |
| chunkedResponseMsg.setServerConnection(servConn); |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setNumberOfParts(2); |
| chunkedResponseMsg.setLastChunkAndNumParts(true,2); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); |
| } |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| } |
| |
| protected static void writeFunctionResponseError(Message origMsg, |
| int messageType, String message, ServerConnection servConn) |
| throws IOException { |
| ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| if (functionResponseMsg.headerHasBeenSent()) { |
| functionResponseMsg.setNumberOfParts(1); |
| functionResponseMsg.setLastChunk(true); |
| functionResponseMsg.addStringPart(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending Error chunk while reply in progress: {}", servConn.getName(), message); |
| } |
| functionResponseMsg.sendChunk(servConn); |
| } |
| else { |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addStringPart(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending Error chunk: {}", servConn.getName(), message); |
| } |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| } |
| |
| protected static void writeKeySetErrorResponse(Message origMsg, |
| int messageType, String message, ServerConnection servConn) |
| throws IOException { |
| // Send chunked response header identifying error message |
| ChunkedMessage chunkedResponseMsg = servConn.getKeySetResponseMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message header type: {} transaction: {}", |
| servConn.getName(), messageType, origMsg.getTransactionId()); |
| } |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| // Send actual error |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message); |
| } |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.addStringPart(message); |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| static Message readRequest(ServerConnection servConn) { |
| Message requestMsg = null; |
| try { |
| requestMsg = servConn.getRequestMessage(); |
| requestMsg.recv(servConn, MAX_INCOMING_DATA, incomingDataLimiter, |
| MAX_INCOMING_MSGS, incomingMsgLimiter); |
| return requestMsg; |
| } |
| catch (EOFException eof) { |
| handleEOFException(null, servConn, eof); |
| // TODO:Asif: Check if there is any need for explicitly returning |
| |
| } |
| catch (InterruptedIOException e) { // Solaris only |
| handleInterruptedIOException(null, servConn, e); |
| |
| } |
| catch (IOException e) { |
| handleIOException(null, servConn, e); |
| |
| } |
| catch (DistributedSystemDisconnectedException e) { |
| handleShutdownException(null, servConn, e); |
| |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| handleThrowable(null, servConn, e); |
| } |
| return requestMsg; |
| } |
| |
| protected static void fillAndSendRegisterInterestResponseChunks( |
| LocalRegion region, Object riKey, int interestType, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| fillAndSendRegisterInterestResponseChunks(region, riKey, interestType, |
| false, policy, servConn); |
| } |
| |
| /* |
| * serializeValues is unused for clients < GFE_80 |
| */ |
| protected static void fillAndSendRegisterInterestResponseChunks( |
| LocalRegion region, Object riKey, int interestType, boolean serializeValues, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| // Client is not interested. |
| if (policy.isNone()) { |
| sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true, |
| servConn); |
| return; |
| } |
| if (policy.isKeysValues() |
| && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) { |
| handleKeysValuesPolicy(region, riKey, interestType, serializeValues, servConn); |
| return; |
| } |
| if (riKey instanceof List) { |
| handleList(region, (List)riKey, policy, servConn); |
| return; |
| } |
| if (!(riKey instanceof String)) { |
| handleSingleton(region, riKey, policy, servConn); |
| return; |
| } |
| |
| switch (interestType) { |
| case InterestType.OQL_QUERY: |
| // Not supported yet |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); |
| case InterestType.FILTER_CLASS: |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); |
| // handleFilter(region, (String)riKey, policy); |
| // break; |
| case InterestType.REGULAR_EXPRESSION: { |
| String regEx = (String)riKey; |
| if (regEx.equals(".*")) { |
| handleAllKeys(region, policy, servConn); |
| } |
| else { |
| handleRegEx(region, regEx, policy, servConn); |
| } |
| } |
| break; |
| case InterestType.KEY: |
| if (riKey.equals("ALL_KEYS")) { |
| handleAllKeys(region, policy, servConn); |
| } |
| else { |
| handleSingleton(region, riKey, policy, servConn); |
| } |
| break; |
| default: |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString()); |
| } |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, |
| int interestType, boolean serializeValues, ServerConnection servConn) |
| throws IOException { |
| if (riKey instanceof List) { |
| handleKVList(region, (List)riKey, serializeValues, servConn); |
| return; |
| } |
| if (!(riKey instanceof String)) { |
| handleKVSingleton(region, riKey, serializeValues, servConn); |
| return; |
| } |
| |
| switch (interestType) { |
| case InterestType.OQL_QUERY: |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); |
| case InterestType.FILTER_CLASS: |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); |
| case InterestType.REGULAR_EXPRESSION: |
| String regEx = (String)riKey; |
| if (regEx.equals(".*")) { |
| handleKVAllKeys(region, null, serializeValues, servConn); |
| } else { |
| handleKVAllKeys(region, regEx, serializeValues, servConn); |
| } |
| break; |
| case InterestType.KEY: |
| if (riKey.equals("ALL_KEYS")) { |
| handleKVAllKeys(region, null, serializeValues, servConn); |
| } else { |
| handleKVSingleton(region, riKey, serializeValues, servConn); |
| } |
| break; |
| default: |
| throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString()); |
| } |
| } |
| |
| /** |
| * @param list |
| * is a List of entry keys |
| */ |
| protected static void sendRegisterInterestResponseChunk(Region region, |
| Object riKey, ArrayList list, boolean lastChunk, ServerConnection servConn) |
| throws IOException { |
| ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(lastChunk); |
| chunkedResponseMsg.addObjPart(list, zipValues); |
| String regionName = (region == null) ? " null " : region.getFullPath(); |
| if (logger.isDebugEnabled()) { |
| String str = servConn.getName() + ": Sending" |
| + (lastChunk ? " last " : " ") |
| + "register interest response chunk for region: " + regionName |
| + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">"; |
| logger.debug(str); |
| } |
| |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| /** |
| * Determines whether keys for destroyed entries (tombstones) should be sent |
| * to clients in register-interest results. |
| * |
| * @param servConn |
| * @param policy |
| * @return true if tombstones should be sent to the client |
| */ |
| private static boolean sendTombstonesInRIResults(ServerConnection servConn, InterestResultPolicy policy) { |
| return (policy == InterestResultPolicy.KEYS_VALUES) |
| && (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0); |
| } |
| |
| /** |
| * Process an interest request involving a list of keys |
| * |
| * @param region |
| * the region |
| * @param keyList |
| * the list of keys |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleList(LocalRegion region, List keyList, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| if (region instanceof PartitionedRegion) { |
| // too bad java doesn't provide another way to do this... |
| handleListPR((PartitionedRegion)region, keyList, policy, servConn); |
| return; |
| } |
| ArrayList newKeyList = new ArrayList(maximumChunkSize); |
| // Handle list of keys |
| if (region != null) { |
| for (Iterator it = keyList.iterator(); it.hasNext();) { |
| Object entryKey = it.next(); |
| if (region.containsKey(entryKey) |
| || (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) { |
| |
| appendInterestResponseKey(region, keyList, entryKey, newKeyList, |
| "list", servConn); |
| } |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, |
| servConn); |
| } |
| |
| /** |
| * Handles both RR and PR cases |
| */ |
| @SuppressWarnings("rawtypes") |
| @SuppressFBWarnings(value="NP_NULL_PARAM_DEREF", justification="Null value handled in sendNewRegisterInterestResponseChunk()") |
| private static void handleKVSingleton(LocalRegion region, Object entryKey, |
| boolean serializeValues, ServerConnection servConn) |
| throws IOException { |
| VersionedObjectList values = new VersionedObjectList(maximumChunkSize, |
| true, region == null ? true : region.getAttributes() |
| .getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (region != null) { |
| if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) { |
| EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder(); |
| ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); |
| // From Get70.getValueAndIsObject() |
| Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false); |
| VersionTag vt = versionHolder.getVersionTag(); |
| |
| updateValues(values, entryKey, data, vt); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, entryKey, values, true, servConn); |
| } |
| |
| /** |
| * Process an interest request consisting of a single key |
| * |
| * @param region |
| * the region |
| * @param entryKey |
| * the key |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleSingleton(LocalRegion region, Object entryKey, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| ArrayList keyList = new ArrayList(1); |
| if (region != null) { |
| if (region.containsKey(entryKey) || |
| (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) { |
| appendInterestResponseKey(region, entryKey, entryKey, keyList, |
| "individual", servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, entryKey, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request of type ALL_KEYS |
| * |
| * @param region |
| * the region |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleAllKeys(LocalRegion region, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| ArrayList keyList = new ArrayList(maximumChunkSize); |
| if (region != null) { |
| for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) { |
| appendInterestResponseKey(region, "ALL_KEYS", it.next(), keyList, |
| "ALL_KEYS", servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true, |
| servConn); |
| } |
| |
| /** |
| * @param region |
| * @param regex |
| * @param serializeValues |
| * @param servConn |
| * @throws IOException |
| */ |
| private static void handleKVAllKeys(LocalRegion region, String regex, |
| boolean serializeValues, ServerConnection servConn) throws IOException { |
| |
| if (region != null && region instanceof PartitionedRegion) { |
| handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn); |
| return; |
| } |
| |
| VersionedObjectList values = new VersionedObjectList(maximumChunkSize, |
| true, region == null ? true : region.getAttributes() |
| .getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (region != null) { |
| |
| VersionTag versionTag = null; |
| Object data = null; |
| |
| Pattern keyPattern = null; |
| if (regex != null) { |
| keyPattern = Pattern.compile(regex); |
| } |
| |
| for (Object key : region.keySet(true)) { |
| EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder(); |
| if (keyPattern != null) { |
| if (!(key instanceof String)) { |
| // key is not a String, cannot apply regex to this entry |
| continue; |
| } |
| if (!keyPattern.matcher((String) key).matches()) { |
| // key does not match the regex, this entry should not be |
| // returned. |
| continue; |
| } |
| } |
| |
| ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); |
| data = region.get(key, null, true, true, true, id, versionHolder, true, false); |
| versionTag = versionHolder.getVersionTag(); |
| updateValues(values, key, data, versionTag); |
| |
| if (values.size() == maximumChunkSize) { |
| sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, false, servConn); |
| values.clear(); |
| } |
| } // for |
| } // if |
| |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, true, servConn); |
| } |
| |
| private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo, |
| boolean serializeValues, ServerConnection servConn) throws IOException { |
| int id = 0; |
| HashMap<Integer, HashSet> bucketKeys = null; |
| |
| VersionedObjectList values = new VersionedObjectList(maximumChunkSize, |
| true, region.getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (keyInfo != null && keyInfo instanceof List) { |
| bucketKeys = new HashMap<Integer, HashSet>(); |
| for (Object key : (List) keyInfo) { |
| id = PartitionedRegionHelper.getHashKey(region, null, key, null, null); |
| if (bucketKeys.containsKey(id)) { |
| bucketKeys.get(id).add(key); |
| } else { |
| HashSet<Object> keys = new HashSet<Object>(); |
| keys.add(key); |
| bucketKeys.put(id, keys); |
| } |
| } |
| region.fetchEntries(bucketKeys, values, servConn); |
| } else { // keyInfo is a String |
| region.fetchEntries((String)keyInfo, values, servConn); |
| } |
| |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, keyInfo != null ? keyInfo : "ALL_KEYS", values, true, servConn); |
| } |
| |
| /** |
| * Copied from Get70.getValueAndIsObject(), except a minor change. (Make the |
| * method static instead of copying it here?) |
| * |
| * @param value |
| */ |
| private static void updateValues(VersionedObjectList values, Object key, Object value, VersionTag versionTag) { |
| boolean isObject = true; |
| |
| // If the value in the VM is a CachedDeserializable, |
| // get its value. If it is Token.REMOVED, Token.DESTROYED, |
| // Token.INVALID, or Token.LOCAL_INVALID |
| // set it to null. If it is NOT_AVAILABLE, get the value from |
| // disk. If it is already a byte[], set isObject to false. |
| boolean wasInvalid = false; |
| if (value instanceof CachedDeserializable) { |
| value = ((CachedDeserializable)value).getValue(); |
| } |
| else if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2 || value == Token.DESTROYED || value == Token.TOMBSTONE) { |
| value = null; |
| } |
| else if (value == Token.INVALID || value == Token.LOCAL_INVALID) { |
| value = null; // fix for bug 35884 |
| wasInvalid = true; |
| } |
| else if (value instanceof byte[]) { |
| isObject = false; |
| } |
| boolean keyNotPresent = !wasInvalid && (value == null || value == Token.TOMBSTONE); |
| |
| if (keyNotPresent) { |
| values.addObjectPartForAbsentKey(key, value, versionTag); |
| } else { |
| values.addObjectPart(key, value, isObject, versionTag); |
| } |
| } |
| |
| public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region, |
| VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn) |
| throws IOException { |
| Object key = null; |
| EntryEventImpl versionHolder = null; |
| ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID(); |
| for (Iterator it = keySet.iterator(); it.hasNext();) { |
| key = it.next(); |
| versionHolder = EntryEventImpl.createVersionTagHolder(); |
| |
| Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false); |
| |
| updateValues(values, key, value, versionHolder.getVersionTag()); |
| |
| if (values.size() == maximumChunkSize) { |
| // Send the chunk and clear the list |
| // values.setKeys(null); // Now we need to send keys too. |
| sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn); |
| values.clear(); |
| } |
| } // for |
| } |
| |
| /** |
| * |
| * @param region |
| * @param values {@link VersionedObjectList} |
| * @param riKeys |
| * @param set set of entries |
| * @param servConn |
| * @throws IOException |
| */ |
| public static void appendNewRegisterInterestResponseChunk(LocalRegion region, |
| VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn) |
| throws IOException { |
| for (Iterator<Map.Entry> it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = it.next(); // Region.Entry or Map.Entry |
| if (entry instanceof Region.Entry) { // local entries |
| VersionTag vt = null; |
| Object key = null; |
| Object value = null; |
| if (entry instanceof EntrySnapshot) { |
| vt = ((EntrySnapshot) entry).getVersionTag(); |
| key = ((EntrySnapshot) entry).getRegionEntry().getKey(); |
| value = ((EntrySnapshot) entry).getRegionEntry().getValue(null); |
| updateValues(values, key, value, vt); |
| } else { |
| VersionStamp vs = ((NonTXEntry)entry).getRegionEntry().getVersionStamp(); |
| vt = vs == null ? null : vs.asVersionTag(); |
| key = entry.getKey(); |
| value = ((NonTXEntry)entry).getRegionEntry()._getValueRetain(region, true); |
| try { |
| updateValues(values, key, value, vt); |
| } finally { |
| // TODO OFFHEAP: in the future we might want to delay this release |
| // until the "values" VersionedObjectList is released. |
| // But for now "updateValues" copies the off-heap value to the heap. |
| OffHeapHelper.release(value); |
| } |
| } |
| } else { // Map.Entry (remote entries) |
| ArrayList list = (ArrayList)entry.getValue(); |
| Object value = list.get(0); |
| VersionTag tag = (VersionTag)list.get(1); |
| updateValues(values, entry.getKey(), value, tag); |
| } |
| if (values.size() == maximumChunkSize) { |
| // Send the chunk and clear the list |
| // values.setKeys(null); // Now we need to send keys too. |
| sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn); |
| values.clear(); |
| } |
| } // for |
| } |
| |
| public static void sendNewRegisterInterestResponseChunk(LocalRegion region, |
| Object riKey, VersionedObjectList list, boolean lastChunk, ServerConnection servConn) |
| throws IOException { |
| ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(lastChunk); |
| chunkedResponseMsg.addObjPart(list, zipValues); |
| String regionName = (region == null) ? " null " : region.getFullPath(); |
| if (logger.isDebugEnabled()) { |
| String str = servConn.getName() + ": Sending" |
| + (lastChunk ? " last " : " ") |
| + "register interest response chunk for region: " + regionName |
| + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">"; |
| logger.debug(str); |
| } |
| |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| /** |
| * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} |
| * |
| * @param region |
| * the region |
| * @param regex |
| * the regex |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleRegEx(LocalRegion region, String regex, |
| InterestResultPolicy policy, ServerConnection servConn) |
| throws IOException { |
| if (region instanceof PartitionedRegion) { |
| // too bad java doesn't provide another way to do this... |
| handleRegExPR((PartitionedRegion)region, regex, policy, servConn); |
| return; |
| } |
| ArrayList keyList = new ArrayList(maximumChunkSize); |
| // Handle the regex pattern |
| Pattern keyPattern = Pattern.compile(regex); |
| if (region != null) { |
| for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) { |
| Object entryKey = it.next(); |
| if (!(entryKey instanceof String)) { |
| // key is not a String, cannot apply regex to this entry |
| continue; |
| } |
| if (!keyPattern.matcher((String)entryKey).matches()) { |
| // key does not match the regex, this entry should not be returned. |
| continue; |
| } |
| |
| appendInterestResponseKey(region, regex, entryKey, keyList, "regex", |
| servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} |
| * |
| * @param region |
| * the region |
| * @param regex |
| * the regex |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleRegExPR(final PartitionedRegion region, |
| final String regex, final InterestResultPolicy policy, |
| final ServerConnection servConn) throws IOException { |
| final ArrayList keyList = new ArrayList(maximumChunkSize); |
| region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() { |
| public void receiveSet(Set theSet) throws IOException { |
| appendInterestResponseKeys(region, regex, theSet, keyList, "regex", |
| servConn); |
| } |
| }); |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request involving a list of keys |
| * |
| * @param region |
| * the region |
| * @param keyList |
| * the list of keys |
| * @param policy |
| * the policy |
| * @throws IOException |
| */ |
| private static void handleListPR(final PartitionedRegion region, |
| final List keyList, final InterestResultPolicy policy, |
| final ServerConnection servConn) throws IOException { |
| final ArrayList newKeyList = new ArrayList(maximumChunkSize); |
| region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() { |
| public void receiveSet(Set theSet) throws IOException { |
| appendInterestResponseKeys(region, keyList, theSet, newKeyList, "list", |
| servConn); |
| } |
| }); |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, |
| servConn); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static void handleKVList(final LocalRegion region, |
| final List keyList, boolean serializeValues, |
| final ServerConnection servConn) throws IOException { |
| |
| if (region != null && region instanceof PartitionedRegion) { |
| handleKVKeysPR((PartitionedRegion)region, keyList, serializeValues, servConn); |
| return; |
| } |
| VersionedObjectList values = new VersionedObjectList(maximumChunkSize, |
| true, region == null ? true : region.getAttributes() |
| .getConcurrencyChecksEnabled(), serializeValues); |
| |
| // Handle list of keys |
| if (region != null) { |
| VersionTag versionTag = null; |
| Object data = null; |
| |
| for (Iterator it = keyList.iterator(); it.hasNext();) { |
| Object key = it.next(); |
| if (region.containsKey(key) || region.containsTombstone(key)) { |
| EntryEventImpl versionHolder = EntryEventImpl |
| .createVersionTagHolder(); |
| |
| ClientProxyMembershipID id = servConn == null ? null : servConn |
| .getProxyID(); |
| data = region.get(key, null, true, true, true, id, versionHolder, |
| true, false); |
| versionTag = versionHolder.getVersionTag(); |
| updateValues(values, key, data, versionTag); |
| |
| if (values.size() == maximumChunkSize) { |
| // Send the chunk and clear the list |
| // values.setKeys(null); // Now we need to send keys too. |
| sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn); |
| values.clear(); |
| } |
| } |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn); |
| } |
| |
| /** |
| * Append an interest response |
| * |
| * @param region |
| * the region (for debugging) |
| * @param riKey |
| * the registerInterest "key" (what the client is interested |
| * in) |
| * @param entryKey |
| * key we're responding to |
| * @param list |
| * list to append to |
| * @param kind |
| * for debugging |
| */ |
| private static void appendInterestResponseKey(LocalRegion region, |
| Object riKey, Object entryKey, ArrayList list, String kind, |
| ServerConnection servConn) throws IOException { |
| list.add(entryKey); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}", |
| servConn.getName(), entryKey, list.size(), region.getFullPath()); |
| } |
| if (list.size() == maximumChunkSize) { |
| // Send the chunk and clear the list |
| sendRegisterInterestResponseChunk(region, riKey, list, false, servConn); |
| list.clear(); |
| } |
| } |
| |
| protected static void appendInterestResponseKeys(LocalRegion region, |
| Object riKey, Collection entryKeys, ArrayList collector, String riDescr, |
| ServerConnection servConn) throws IOException { |
| for (Iterator it = entryKeys.iterator(); it.hasNext();) { |
| appendInterestResponseKey(region, riKey, it.next(), collector, riDescr, |
| servConn); |
| } |
| } |
| } |