| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.Semaphore; |
| import java.util.regex.Pattern; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressWarnings; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CopyException; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.SerializationException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.CacheLoaderException; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.TransactionException; |
| import org.apache.geode.cache.persistence.PartitionOfflineException; |
| import org.apache.geode.cache.query.types.CollectionType; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.DistributionStats; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.CachedDeserializable; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EntrySnapshot; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.FindVersionTagOperation; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalRegion; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.NonTXEntry; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionHelper; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.TXStateProxy; |
| import org.apache.geode.internal.cache.Token; |
| import org.apache.geode.internal.cache.VersionTagHolder; |
| import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; |
| import org.apache.geode.internal.cache.tier.CachedRegionHelper; |
| import org.apache.geode.internal.cache.tier.Command; |
| import org.apache.geode.internal.cache.tier.InterestType; |
| import org.apache.geode.internal.cache.tier.MessageType; |
| import org.apache.geode.internal.cache.versions.VersionStamp; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.offheap.OffHeapHelper; |
| import org.apache.geode.internal.security.SecurityService; |
| import org.apache.geode.internal.sequencelog.EntryLogger; |
| import org.apache.geode.internal.serialization.KnownVersion; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.security.GemFireSecurityException; |
| import org.apache.geode.util.internal.GeodeGlossary; |
| |
| public abstract class BaseCommand implements Command { |
| protected static final Logger logger = LogService.getLogger(); |
| |
| @Immutable |
| private static final byte[] OK_BYTES = new byte[] {0}; |
| |
| public static final int MAXIMUM_CHUNK_SIZE = |
| Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100); |
| |
| /** Whether to suppress logging of IOExceptions */ |
| private static final boolean SUPPRESS_IO_EXCEPTION_LOGGING = |
| Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "bridge.suppressIOExceptionLogging"); |
| |
| /** |
| * Maximum number of concurrent incoming client message bytes that a cache server will allow. |
| * Once a server is working on this number additional incoming client messages will wait until one |
| * of them completes or fails. The bytes are computed based in the size sent in the incoming msg |
| * header. |
| */ |
| private static final int MAX_INCOMING_DATA = |
| Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1); |
| |
| /** |
| * Maximum number of concurrent incoming client messages that a cache server will allow. Once a |
| * server is working on this number additional incoming client messages will wait until one of |
| * them completes or fails. |
| */ |
| private static final int MAX_INCOMING_MESSAGES = |
| Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1); |
| |
| @MakeNotStatic |
| private static final Semaphore INCOMING_DATA_LIMITER; |
| |
| @MakeNotStatic |
| private static final Semaphore INCOMING_MSG_LIMITER; |
| |
static {
  // A limiter exists only when its configured maximum is positive; otherwise it stays null.

  // backport requires that this is fair since we inc by values > 1
  INCOMING_DATA_LIMITER =
      MAX_INCOMING_DATA > 0 ? new Semaphore(MAX_INCOMING_DATA, true) : null;

  // unfair for best performance
  INCOMING_MSG_LIMITER =
      MAX_INCOMING_MESSAGES > 0 ? new Semaphore(MAX_INCOMING_MESSAGES, false) : null;
}
| |
| protected static byte[] okBytes() { |
| return OK_BYTES; |
| } |
| |
| protected boolean setLastResultReceived( |
| ServerToClientFunctionResultSender resultSender) { |
| |
| if (resultSender != null) { |
| synchronized (resultSender) { |
| if (resultSender.isLastResultReceived()) { |
| return false; |
| } else { |
| resultSender.setLastResultReceived(true); |
| } |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public void execute(Message clientMessage, ServerConnection serverConnection, |
| SecurityService securityService) { |
| // Read the request and update the statistics |
| long start = DistributionStats.getStatTime(); |
| if (EntryLogger.isEnabled() && serverConnection != null) { |
| EntryLogger.setSource(serverConnection.getMembershipID(), "c2s"); |
| } |
| boolean shouldMasquerade = shouldMasqueradeForTx(clientMessage, serverConnection); |
| try { |
| if (shouldMasquerade) { |
| InternalCache cache = serverConnection.getCache(); |
| InternalDistributedMember member = |
| (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); |
| TXManagerImpl txMgr = cache.getTxManager(); |
| TXStateProxy tx = null; |
| try { |
| tx = txMgr.masqueradeAs(clientMessage, member, false); |
| cmdExecute(clientMessage, serverConnection, securityService, start); |
| tx.updateProxyServer(txMgr.getMemberId()); |
| } finally { |
| txMgr.unmasquerade(tx); |
| } |
| } else { |
| cmdExecute(clientMessage, serverConnection, securityService, start); |
| } |
| |
| } catch (TransactionException | CopyException | SerializationException | CacheWriterException |
| | CacheLoaderException | GemFireSecurityException | PartitionOfflineException |
| | MessageTooLargeException e) { |
| handleExceptionNoDisconnect(clientMessage, serverConnection, e); |
| } catch (EOFException eof) { |
| BaseCommand.handleEOFException(clientMessage, serverConnection, eof); |
| } catch (InterruptedIOException e) { // Solaris only |
| BaseCommand.handleInterruptedIOException(serverConnection, e); |
| } catch (IOException e) { |
| BaseCommand.handleIOException(clientMessage, serverConnection, e); |
| } catch (DistributedSystemDisconnectedException e) { |
| BaseCommand.handleShutdownException(clientMessage, serverConnection, e); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable e) { |
| BaseCommand.handleThrowable(clientMessage, serverConnection, e); |
| } finally { |
| EntryLogger.clearSource(); |
| } |
| } |
| |
| /** |
| * checks to see if this thread needs to masquerade as a transactional thread. clients after |
| * GFE_66 should be able to start a transaction. |
| * |
| * @return true if thread should masquerade as a transactional thread. |
| */ |
| protected boolean shouldMasqueradeForTx(Message clientMessage, |
| ServerConnection serverConnection) { |
| return serverConnection.getClientVersion().isNotOlderThan(KnownVersion.GFE_66) |
| && clientMessage.getTransactionId() > TXManagerImpl.NOTX; |
| } |
| |
| /** |
| * If an operation is retried then some server may have seen it already. We cannot apply this |
| * operation to the cache without knowing whether a version tag has already been created for it. |
| * Otherwise caches that have seen the event already will reject it but others will not, but will |
| * have no version tag with which to perform concurrency checks. |
| * <p> |
| * The client event should have the event identifier from the client and the region affected by |
| * the operation. |
| */ |
| public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) { |
| InternalRegion r = clientEvent.getRegion(); |
| VersionTag tag = r.findVersionTagForEvent(clientEvent.getEventId()); |
| if (tag == null) { |
| if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { |
| // TODO this could be optimized for partitioned regions by sending the key |
| // so that the PR could look at an individual bucket for the event |
| tag = FindVersionTagOperation.findVersionTag(r, clientEvent.getEventId(), false); |
| } |
| } |
| if (tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed operation {}", tag, |
| clientEvent.getEventId()); |
| } |
| clientEvent.setVersionTag(tag); |
| } |
| return tag != null; |
| } |
| |
| /** |
| * If an operation is retried then some server may have seen it already. We cannot apply this |
| * operation to the cache without knowing whether a version tag has already been created for it. |
| * Otherwise caches that have seen the event already will reject it but others will not, but will |
| * have no version tag with which to perform concurrency checks. |
| * <p> |
| * The client event should have the event identifier from the client and the region affected by |
| * the operation. |
| */ |
| protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) { |
| VersionTag tag = region.findVersionTagForClientBulkOp(eventID); |
| if (tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); |
| } |
| return tag; |
| } |
| if (region instanceof DistributedRegion || region instanceof PartitionedRegion) { |
| // TODO this could be optimized for partitioned regions by sending the key |
| // so that the PR could look at an individual bucket for the event |
| tag = FindVersionTagOperation.findVersionTag(region, eventID, true); |
| } |
| if (tag != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); |
| } |
| } |
| return tag; |
| } |
| |
| public abstract void cmdExecute(final Message clientMessage, |
| final ServerConnection serverConnection, final SecurityService securityService, |
| final long start) throws IOException, ClassNotFoundException, InterruptedException; |
| |
| protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { |
| Message replyMsg = serverConnection.getReplyMessage(); |
| serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); |
| replyMsg.setMessageType(MessageType.REPLY); |
| replyMsg.setNumberOfParts(1); |
| replyMsg.setTransactionId(origMsg.getTransactionId()); |
| replyMsg.addBytesPart(okBytes()); |
| replyMsg.send(serverConnection); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: rpl tx: {}", serverConnection.getName(), origMsg.getTransactionId()); |
| } |
| } |
| |
| protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, |
| PartitionedRegion pr, byte nwHop) throws IOException { |
| Message replyMsg = serverConnection.getReplyMessage(); |
| serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); |
| replyMsg.setMessageType(MessageType.REPLY); |
| replyMsg.setNumberOfParts(1); |
| replyMsg.setTransactionId(origMsg.getTransactionId()); |
| replyMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); |
| replyMsg.send(serverConnection); |
| pr.getPrStats().incPRMetaDataSentCount(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: rpl with REFRESH_METADATA tx: {}", serverConnection.getName(), |
| origMsg.getTransactionId()); |
| } |
| } |
| |
| private static void handleEOFException(Message msg, ServerConnection serverConnection, |
| Exception eof) { |
| CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); |
| CacheServerStats stats = serverConnection.getCacheServerStats(); |
| boolean potentialModification = serverConnection.getPotentialModification(); |
| if (!crHelper.isShutdown()) { |
| if (potentialModification) { |
| stats.incAbandonedWriteRequests(); |
| } else { |
| stats.incAbandonedReadRequests(); |
| } |
| if (!SUPPRESS_IO_EXCEPTION_LOGGING) { |
| if (potentialModification) { |
| int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; |
| logger.warn( |
| "{}: EOFException during a write operation on region : {} key: {} messageId: {}", |
| new Object[] {serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId}); |
| } else { |
| logger.debug("EOF exception", eof); |
| logger.warn("{}: connection disconnect detected by EOF.", |
| serverConnection.getName()); |
| } |
| } |
| } |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(eof); |
| } |
| |
| private static void handleInterruptedIOException(ServerConnection serverConnection, Exception e) { |
| CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); |
| if (!crHelper.isShutdown() && serverConnection.isOpen()) { |
| if (!SUPPRESS_IO_EXCEPTION_LOGGING) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e); |
| } |
| } |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(e); |
| } |
| |
| private static void handleIOException(Message msg, ServerConnection serverConnection, |
| Exception e) { |
| CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); |
| boolean potentialModification = serverConnection.getPotentialModification(); |
| |
| if (!crHelper.isShutdown() && serverConnection.isOpen()) { |
| if (!SUPPRESS_IO_EXCEPTION_LOGGING) { |
| if (potentialModification) { |
| int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; |
| logger.warn(String.format( |
| "%s: Unexpected IOException during operation for region: %s key: %s messId: %s", |
| new Object[] {serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId}), |
| e); |
| } else { |
| logger.warn(String.format("%s: Unexpected IOException: ", |
| serverConnection.getName()), e); |
| } |
| } |
| } |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(e); |
| } |
| |
| private static void handleShutdownException(Message msg, ServerConnection serverConnection, |
| Exception e) { |
| CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); |
| boolean potentialModification = serverConnection.getPotentialModification(); |
| |
| if (!crHelper.isShutdown()) { |
| if (potentialModification) { |
| int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; |
| logger.warn(String.format( |
| "%s: Unexpected ShutdownException during operation on region: %s key: %s messageId: %s", |
| new Object[] {serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId}), |
| e); |
| } else { |
| logger.warn(String.format("%s: Unexpected ShutdownException: ", |
| serverConnection.getName()), |
| e); |
| } |
| } |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(e); |
| } |
| |
| private static void handleExceptionNoDisconnect(Message msg, ServerConnection serverConnection, |
| Exception e) { |
| boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE); |
| boolean responded = serverConnection.getTransientFlag(RESPONDED); |
| boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); |
| boolean potentialModification = serverConnection.getPotentialModification(); |
| |
| try { |
| boolean wroteExceptionResponse = false; |
| try { |
| if (requiresResponse && !responded) { |
| if (requiresChunkedResponse) { |
| writeChunkedException(msg, e, serverConnection); |
| } else { |
| writeException(msg, e, false, serverConnection); |
| } |
| wroteExceptionResponse = true; |
| serverConnection.setAsTrue(RESPONDED); |
| } |
| } finally { // inner try-finally to ensure proper ordering of logging |
| if (potentialModification) { |
| int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; |
| if (!wroteExceptionResponse) { |
| logger.warn(String.format( |
| "%s: Unexpected Exception during operation on region: %s key: %s messageId: %s", |
| new Object[] {serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId}), |
| e); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}", |
| serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId, e); |
| } |
| } |
| } else { |
| if (!wroteExceptionResponse) { |
| logger.warn(String.format("%s: Unexpected Exception", |
| serverConnection.getName()), e); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Exception: {}", serverConnection.getName(), e.getMessage(), e); |
| } |
| } |
| } |
| } |
| } catch (IOException ioe) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(), |
| ioe.getMessage(), ioe); |
| } |
| } |
| } |
| |
| private static void handleThrowable(Message msg, ServerConnection serverConnection, |
| Throwable th) { |
| boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE); |
| boolean responded = serverConnection.getTransientFlag(RESPONDED); |
| boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); |
| boolean potentialModification = serverConnection.getPotentialModification(); |
| |
| try { |
| try { |
| if (th instanceof Error) { |
| logger.fatal(String.format("%s : Unexpected Error on server", |
| serverConnection.getName()), |
| th); |
| } |
| if (requiresResponse && !responded) { |
| if (requiresChunkedResponse) { |
| writeChunkedException(msg, th, serverConnection); |
| } else { |
| writeException(msg, th, false, serverConnection); |
| } |
| serverConnection.setAsTrue(RESPONDED); |
| } |
| } finally { // inner try-finally to ensure proper ordering of logging |
| if (!(th instanceof Error || th instanceof CacheLoaderException)) { |
| if (potentialModification) { |
| int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; |
| logger.warn(String.format( |
| "%s: Unexpected Exception during operation on region: %s key: %s messageId: %s", |
| new Object[] {serverConnection.getName(), serverConnection.getModRegion(), |
| serverConnection.getModKey(), transId}), |
| th); |
| } else { |
| logger.warn(String.format("%s: Unexpected Exception", |
| serverConnection.getName()), th); |
| } |
| } |
| } |
| } catch (IOException ioe) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(), |
| ioe.getMessage(), ioe); |
| } |
| } finally { |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(th); |
| } |
| } |
| |
| protected static void writeChunkedException(Message origMsg, Throwable e, |
| ServerConnection serverConnection) throws IOException { |
| writeChunkedException(origMsg, e, serverConnection, |
| serverConnection.getChunkedResponseMessage()); |
| } |
| |
| protected static void writeChunkedException(Message origMsg, Throwable e, |
| ServerConnection serverConnection, ChunkedMessage originalResponse) throws IOException { |
| writeChunkedException(origMsg, e, serverConnection, originalResponse, 2); |
| } |
| |
| private static void writeChunkedException(Message origMsg, Throwable exception, |
| ServerConnection serverConnection, ChunkedMessage originalResponse, int numOfParts) |
| throws IOException { |
| Throwable e = getClientException(serverConnection, exception); |
| ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); |
| chunkedResponseMsg.setServerConnection(serverConnection); |
| if (originalResponse.headerHasBeenSent()) { |
| chunkedResponseMsg.setNumberOfParts(numOfParts); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts); |
| chunkedResponseMsg.addObjPart(e); |
| if (numOfParts == 2) { |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", |
| serverConnection.getName(), e.getMessage(), e); |
| } |
| } else { |
| chunkedResponseMsg.setMessageType(MessageType.EXCEPTION); |
| chunkedResponseMsg.setNumberOfParts(numOfParts); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| if (numOfParts == 2) { |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), |
| e); |
| } |
| } |
| chunkedResponseMsg.sendChunk(serverConnection); |
| } |
| |
| // Get the exception stacktrace for native clients |
| public static String getExceptionTrace(Throwable ex) { |
| StringWriter sw = new StringWriter(); |
| PrintWriter pw = new PrintWriter(sw); |
| ex.printStackTrace(pw); |
| pw.close(); |
| return sw.toString(); |
| } |
| |
| protected static void writeException(Message origMsg, Throwable e, boolean isSevere, |
| ServerConnection serverConnection) throws IOException { |
| writeException(origMsg, MessageType.EXCEPTION, e, isSevere, serverConnection); |
| } |
| |
| private static Throwable getClientException(ServerConnection serverConnection, Throwable e) { |
| InternalCache cache = serverConnection.getCache(); |
| if (cache != null) { |
| OldClientSupportService svc = cache.getService(OldClientSupportService.class); |
| if (svc != null) { |
| return svc.getThrowable(e, serverConnection.getClientVersion()); |
| } |
| } |
| return e; |
| } |
| |
| protected static void writeException(Message origMsg, int msgType, Throwable e, boolean isSevere, |
| ServerConnection serverConnection) throws IOException { |
| Throwable theException = getClientException(serverConnection, e); |
| Message errorMsg = serverConnection.getErrorResponseMessage(); |
| errorMsg.setMessageType(msgType); |
| errorMsg.setNumberOfParts(2); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| if (isSevere) { |
| String msg = theException.getMessage(); |
| if (msg == null) { |
| msg = theException.toString(); |
| } |
| logger.fatal("Severe cache exception : {}", msg); |
| } |
| errorMsg.addObjPart(theException); |
| errorMsg.addStringPart(getExceptionTrace(theException)); |
| errorMsg.send(serverConnection); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Wrote exception: {}", serverConnection.getName(), e.getMessage(), e); |
| } |
| if (e instanceof MessageTooLargeException) { |
| throw (IOException) e; |
| } |
| } |
| |
| protected static void writeErrorResponse(Message origMsg, int messageType, |
| ServerConnection serverConnection) throws IOException { |
| Message errorMsg = serverConnection.getErrorResponseMessage(); |
| errorMsg.setMessageType(messageType); |
| errorMsg.setNumberOfParts(1); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| errorMsg.addStringPart( |
| "Invalid data received. Please see the cache server log file for additional details."); |
| errorMsg.send(serverConnection); |
| } |
| |
| protected static void writeErrorResponse(Message origMsg, int messageType, String msg, |
| ServerConnection serverConnection) throws IOException { |
| Message errorMsg = serverConnection.getErrorResponseMessage(); |
| errorMsg.setMessageType(messageType); |
| errorMsg.setNumberOfParts(1); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| errorMsg.addStringPart(msg); |
| errorMsg.send(serverConnection); |
| } |
| |
| protected static void writeRegionDestroyedEx(Message msg, String regionName, String reason, |
| ServerConnection serverConnection) throws IOException { |
| String exceptionMessage = serverConnection.getName() + ": Region named " + regionName + reason; |
| RegionDestroyedException ex = new RegionDestroyedException(exceptionMessage, regionName); |
| if (serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) { |
| writeChunkedException(msg, ex, serverConnection); |
| } else { |
| writeException(msg, ex, false, serverConnection); |
| } |
| } |
| |
| protected static void writeResponse(Object data, Object callbackArg, Message origMsg, |
| boolean isObject, ServerConnection serverConnection) throws IOException { |
| Message responseMsg = serverConnection.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| |
| if (callbackArg == null) { |
| responseMsg.setNumberOfParts(1); |
| } else { |
| responseMsg.setNumberOfParts(2); |
| } |
| if (data instanceof byte[]) { |
| responseMsg.addRawPart((byte[]) data, isObject); |
| } else { |
| Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]"); |
| responseMsg.addObjPart(data, false); |
| } |
| if (callbackArg != null) { |
| responseMsg.addObjPart(callbackArg); |
| } |
| serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(serverConnection); |
| origMsg.clearParts(); |
| } |
| |
| protected static void writeResponseWithRefreshMetadata(Object data, Object callbackArg, |
| Message origMsg, boolean isObject, ServerConnection serverConnection, PartitionedRegion pr, |
| byte nwHop) throws IOException { |
| Message responseMsg = serverConnection.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| if (callbackArg == null) { |
| responseMsg.setNumberOfParts(2); |
| } else { |
| responseMsg.setNumberOfParts(3); |
| } |
| |
| if (data instanceof byte[]) { |
| responseMsg.addRawPart((byte[]) data, isObject); |
| } else { |
| Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]"); |
| responseMsg.addObjPart(data, false); |
| } |
| if (callbackArg != null) { |
| responseMsg.addObjPart(callbackArg); |
| } |
| responseMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); |
| serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(serverConnection); |
| origMsg.clearParts(); |
| } |
| |
| protected static void writeResponseWithFunctionAttribute(byte[] data, Message origMsg, |
| ServerConnection serverConnection) throws IOException { |
| Message responseMsg = serverConnection.getResponseMessage(); |
| responseMsg.setMessageType(MessageType.RESPONSE); |
| responseMsg.setTransactionId(origMsg.getTransactionId()); |
| responseMsg.setNumberOfParts(1); |
| responseMsg.addBytesPart(data); |
| serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); |
| responseMsg.send(serverConnection); |
| origMsg.clearParts(); |
| } |
| |
| protected static void checkForInterrupt(ServerConnection serverConnection, Exception e) |
| throws InterruptedException, InterruptedIOException { |
| serverConnection.getCachedRegionHelper().checkCancelInProgress(e); |
| if (e instanceof InterruptedException) { |
| throw (InterruptedException) e; |
| } |
| if (e instanceof InterruptedIOException) { |
| throw (InterruptedIOException) e; |
| } |
| } |
| |
| static void writeQueryResponseChunk(Object queryResponseChunk, CollectionType collectionType, |
| boolean lastChunk, ServerConnection serverConnection) throws IOException { |
| ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage(); |
| queryResponseMsg.setNumberOfParts(2); |
| queryResponseMsg.setLastChunk(lastChunk); |
| queryResponseMsg.addObjPart(collectionType, false); |
| queryResponseMsg.addObjPart(queryResponseChunk, false); |
| queryResponseMsg.sendChunk(serverConnection); |
| } |
| |
| protected static void writeQueryResponseException(Message origMsg, Throwable exception, |
| ServerConnection serverConnection) throws IOException { |
| Throwable e = getClientException(serverConnection, exception); |
| ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); |
| if (queryResponseMsg.headerHasBeenSent()) { |
| // fix for bug 35442 |
| // This client is expecting 2 parts in this message so send 2 parts |
| queryResponseMsg.setServerConnection(serverConnection); |
| queryResponseMsg.setNumberOfParts(2); |
| queryResponseMsg.setLastChunkAndNumParts(true, 2); |
| queryResponseMsg.addObjPart(e); |
| queryResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", |
| serverConnection.getName(), e.getMessage(), e); |
| } |
| queryResponseMsg.sendChunk(serverConnection); |
| } else { |
| chunkedResponseMsg.setServerConnection(serverConnection); |
| chunkedResponseMsg.setMessageType(MessageType.EXCEPTION); |
| chunkedResponseMsg.setNumberOfParts(2); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, 2); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), |
| e); |
| } |
| chunkedResponseMsg.sendChunk(serverConnection); |
| } |
| } |
| |
| protected static void writeChunkedErrorResponse(Message origMsg, int messageType, String message, |
| ServerConnection serverConnection) throws IOException { |
| // Send chunked response header identifying error message |
| ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message header type: {} transaction: {}", |
| serverConnection.getName(), messageType, origMsg.getTransactionId()); |
| } |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| |
| // Send actual error |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message chunk: {}", serverConnection.getName(), message); |
| } |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.addStringPart(message); |
| chunkedResponseMsg.sendChunk(serverConnection); |
| } |
| |
| protected static void writeFunctionResponseException(Message origMsg, int messageType, |
| ServerConnection serverConnection, Throwable exception) throws IOException { |
| Throwable e = getClientException(serverConnection, exception); |
| ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); |
| if (functionResponseMsg.headerHasBeenSent()) { |
| functionResponseMsg.setServerConnection(serverConnection); |
| functionResponseMsg.setNumberOfParts(2); |
| functionResponseMsg.setLastChunkAndNumParts(true, 2); |
| functionResponseMsg.addObjPart(e); |
| functionResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk while reply in progress: {}", |
| serverConnection.getName(), e.getMessage(), e); |
| } |
| functionResponseMsg.sendChunk(serverConnection); |
| } else { |
| chunkedResponseMsg.setServerConnection(serverConnection); |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setNumberOfParts(2); |
| chunkedResponseMsg.setLastChunkAndNumParts(true, 2); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addObjPart(e); |
| chunkedResponseMsg.addStringPart(getExceptionTrace(e)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), |
| e); |
| } |
| chunkedResponseMsg.sendChunk(serverConnection); |
| } |
| } |
| |
| protected static void writeFunctionResponseError(Message origMsg, int messageType, String message, |
| ServerConnection servConn) throws IOException { |
| ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); |
| ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); |
| if (functionResponseMsg.headerHasBeenSent()) { |
| functionResponseMsg.setNumberOfParts(1); |
| functionResponseMsg.setLastChunk(true); |
| functionResponseMsg.addStringPart(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending Error chunk while reply in progress: {}", servConn.getName(), |
| message); |
| } |
| functionResponseMsg.sendChunk(servConn); |
| } else { |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| chunkedResponseMsg.addStringPart(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending Error chunk: {}", servConn.getName(), message); |
| } |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| } |
| |
| protected static void writeKeySetErrorResponse(Message origMsg, int messageType, String message, |
| ServerConnection servConn) throws IOException { |
| // Send chunked response header identifying error message |
| ChunkedMessage chunkedResponseMsg = servConn.getKeySetResponseMessage(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message header type: {} transaction: {}", servConn.getName(), |
| messageType, origMsg.getTransactionId()); |
| } |
| chunkedResponseMsg.setMessageType(messageType); |
| chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); |
| chunkedResponseMsg.sendHeader(); |
| // Send actual error |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message); |
| } |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(true); |
| chunkedResponseMsg.addStringPart(message); |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| /** |
|  * Obtains the connection's request message and fills it by receiving from the client. |
|  * |
|  * <p> |
|  * Failures are routed to the matching {@code handle*} helper rather than propagated; only a |
|  * {@link VirtualMachineError} is rethrown, after the system failure has been initiated. When a |
|  * handler returns normally, whatever message reference was obtained so far is still returned. |
|  * |
|  * @param servConn the connection to read from |
|  * @return the request message; {@code null} if it could not even be obtained before a failure |
|  *         was handled |
|  */ |
| static Message readRequest(ServerConnection servConn) { |
|   Message request = null; |
|   try { |
|     request = servConn.getRequestMessage(); |
|     request.receive(servConn, MAX_INCOMING_DATA, INCOMING_DATA_LIMITER, INCOMING_MSG_LIMITER); |
|   } catch (EOFException endOfStream) { |
|     handleEOFException(null, servConn, endOfStream); |
|     // TODO: Check if there is any need for explicitly returning |
|   } catch (InterruptedIOException interrupted) { // Solaris only |
|     handleInterruptedIOException(servConn, interrupted); |
|   } catch (IOException ioFailure) { |
|     handleIOException(null, servConn, ioFailure); |
|   } catch (DistributedSystemDisconnectedException shutdown) { |
|     handleShutdownException(null, servConn, shutdown); |
|   } catch (VirtualMachineError vmError) { |
|     SystemFailure.initiateFailure(vmError); |
|     // Should initiateFailure ever return, propagate the error: this JVM is poisoned and the |
|     // thread must not carry on. |
|     throw vmError; |
|   } catch (Throwable unexpected) { |
|     SystemFailure.checkFailure(); |
|     handleThrowable(null, servConn, unexpected); |
|   } |
|   // Single exit point for both the success path and every handled failure |
|   return request; |
| } |
| |
| protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey, |
| int interestType, InterestResultPolicy policy, ServerConnection servConn) throws IOException { |
| fillAndSendRegisterInterestResponseChunks(region, riKey, interestType, false, policy, servConn); |
| } |
| |
| /** |
| * serializeValues is unused for clients < GFE_80 |
| */ |
| protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey, |
| int interestType, boolean serializeValues, InterestResultPolicy policy, |
| ServerConnection servConn) throws IOException { |
| // Client is not interested. |
| if (policy.isNone()) { |
| sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true, servConn); |
| return; |
| } |
| if (policy.isKeysValues() && servConn.getClientVersion().isNotOlderThan(KnownVersion.GFE_80)) { |
| handleKeysValuesPolicy(region, riKey, interestType, serializeValues, servConn); |
| return; |
| } |
| if (riKey instanceof List) { |
| handleList(region, (List) riKey, policy, servConn); |
| return; |
| } |
| if (!(riKey instanceof String)) { |
| handleSingleton(region, riKey, policy, servConn); |
| return; |
| } |
| |
| switch (interestType) { |
| case InterestType.OQL_QUERY: |
| // Not supported yet |
| throw new InternalGemFireError( |
| "not yet supported"); |
| |
| case InterestType.FILTER_CLASS: |
| throw new InternalGemFireError( |
| "not yet supported"); |
| |
| case InterestType.REGULAR_EXPRESSION: |
| String regEx = (String) riKey; |
| if (regEx.equals(".*")) { |
| handleAllKeys(region, policy, servConn); |
| } else { |
| handleRegEx(region, regEx, policy, servConn); |
| } |
| break; |
| |
| case InterestType.KEY: |
| if (riKey.equals("ALL_KEYS")) { |
| handleAllKeys(region, policy, servConn); |
| } else { |
| handleSingleton(region, riKey, policy, servConn); |
| } |
| break; |
| |
| default: |
| throw new InternalGemFireError( |
| "unknown interest type"); |
| } |
| } |
| |
| private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, int interestType, |
| boolean serializeValues, ServerConnection servConn) throws IOException { |
| if (riKey instanceof List) { |
| handleKVList(region, (List) riKey, serializeValues, servConn); |
| return; |
| } |
| if (!(riKey instanceof String)) { |
| handleKVSingleton(region, riKey, serializeValues, servConn); |
| return; |
| } |
| |
| switch (interestType) { |
| case InterestType.OQL_QUERY: |
| throw new InternalGemFireError( |
| "not yet supported"); |
| |
| case InterestType.FILTER_CLASS: |
| throw new InternalGemFireError( |
| "not yet supported"); |
| |
| case InterestType.REGULAR_EXPRESSION: |
| String regEx = (String) riKey; |
| if (regEx.equals(".*")) { |
| handleKVAllKeys(region, null, serializeValues, servConn); |
| } else { |
| handleKVAllKeys(region, regEx, serializeValues, servConn); |
| } |
| break; |
| |
| case InterestType.KEY: |
| if (riKey.equals("ALL_KEYS")) { |
| handleKVAllKeys(region, null, serializeValues, servConn); |
| } else { |
| handleKVSingleton(region, riKey, serializeValues, servConn); |
| } |
| break; |
| |
| default: |
| throw new InternalGemFireError( |
| "unknown interest type"); |
| } |
| } |
| |
| /** |
| * @param list is a List of entry keys |
| */ |
| private static void sendRegisterInterestResponseChunk(Region region, Object riKey, List list, |
| boolean lastChunk, ServerConnection servConn) throws IOException { |
| ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); |
| chunkedResponseMsg.setNumberOfParts(1); |
| chunkedResponseMsg.setLastChunk(lastChunk); |
| chunkedResponseMsg.addObjPart(list, false); |
| String regionName = region == null ? " null " : region.getFullPath(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>", |
| servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg); |
| } |
| |
| chunkedResponseMsg.sendChunk(servConn); |
| } |
| |
| /** |
|  * Decides whether keys of destroyed entries (tombstones) belong in the register-interest result |
|  * sent to a client. |
|  * |
|  * <p> |
|  * That is the case only when the policy is {@code KEYS_VALUES} and the client is not older than |
|  * GFE_80. |
|  * |
|  * @param servConn the connection whose client version is checked |
|  * @param policy the interest result policy requested by the client |
|  * @return true if tombstones should be sent to the client |
|  */ |
| private static boolean sendTombstonesInRIResults(ServerConnection servConn, |
|     InterestResultPolicy policy) { |
|   if (policy != InterestResultPolicy.KEYS_VALUES) { |
|     return false; |
|   } |
|   return servConn.getClientVersion().isNotOlderThan(KnownVersion.GFE_80); |
| } |
| |
| /** |
| * Process an interest request involving a list of keys |
| * |
| * @param region the region |
| * @param keyList the list of keys |
| * @param policy the policy |
| */ |
| private static void handleList(LocalRegion region, List keyList, InterestResultPolicy policy, |
| ServerConnection servConn) throws IOException { |
| if (region instanceof PartitionedRegion) { |
| // too bad java doesn't provide another way to do this... |
| handleListPR((PartitionedRegion) region, keyList, policy, servConn); |
| return; |
| } |
| List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); |
| // Handle list of keys |
| if (region != null) { |
| for (Object entryKey : keyList) { |
| if (region.containsKey(entryKey) |
| || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) { |
| |
| appendInterestResponseKey(region, keyList, entryKey, newKeyList, servConn); |
| } |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn); |
| } |
| |
| /** |
| * Handles both RR and PR cases |
| */ |
| @SuppressWarnings(value = "NP_NULL_PARAM_DEREF", |
| justification = "Null value handled in sendNewRegisterInterestResponseChunk()") |
| private static void handleKVSingleton(LocalRegion region, Object entryKey, |
| boolean serializeValues, ServerConnection servConn) throws IOException { |
| VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, |
| region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (region != null) { |
| if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) { |
| VersionTagHolder versionHolder = createVersionTagHolder(); |
| ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); |
| // From Get70.getValueAndIsObject() |
| Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true); |
| VersionTag vt = versionHolder.getVersionTag(); |
| |
| updateValues(values, entryKey, data, vt); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, entryKey, values, true, servConn); |
| } |
| |
| /** |
| * Process an interest request consisting of a single key |
| * |
| * @param region the region |
| * @param entryKey the key |
| * @param policy the policy |
| */ |
| private static void handleSingleton(LocalRegion region, Object entryKey, |
| InterestResultPolicy policy, ServerConnection servConn) throws IOException { |
| List keyList = new ArrayList(1); |
| if (region != null) { |
| if (region.containsKey(entryKey) |
| || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) { |
| appendInterestResponseKey(region, entryKey, entryKey, keyList, servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, entryKey, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request of type ALL_KEYS |
| * |
| * @param region the region |
| * @param policy the policy |
| */ |
| private static void handleAllKeys(LocalRegion region, InterestResultPolicy policy, |
| ServerConnection servConn) throws IOException { |
| List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); |
| if (region != null) { |
| for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) { |
| appendInterestResponseKey(region, "ALL_KEYS", entryKey, keyList, servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true, servConn); |
| } |
| |
| private static void handleKVAllKeys(LocalRegion region, String regex, boolean serializeValues, |
| ServerConnection servConn) throws IOException { |
| |
| if (region instanceof PartitionedRegion) { |
| handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn); |
| return; |
| } |
| |
| VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, |
| region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (region != null) { |
| |
| Pattern keyPattern = null; |
| if (regex != null) { |
| keyPattern = Pattern.compile(regex); |
| } |
| |
| for (Object key : region.keySet(true)) { |
| VersionTagHolder versionHolder = createVersionTagHolder(); |
| if (keyPattern != null) { |
| if (!(key instanceof String)) { |
| // key is not a String, cannot apply regex to this entry |
| continue; |
| } |
| if (!keyPattern.matcher((String) key).matches()) { |
| // key does not match the regex, this entry should not be |
| // returned. |
| continue; |
| } |
| } |
| |
| ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); |
| Object data = region.get(key, null, true, true, true, id, versionHolder, true); |
| VersionTag versionTag = versionHolder.getVersionTag(); |
| updateValues(values, key, data, versionTag); |
| |
| if (values.size() == MAXIMUM_CHUNK_SIZE) { |
| sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, |
| false, servConn); |
| values.clear(); |
| } |
| } // for |
| } // if |
| |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, true, |
| servConn); |
| } |
| |
| private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo, |
| boolean serializeValues, ServerConnection servConn) throws IOException { |
| |
| VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, |
| region.getConcurrencyChecksEnabled(), serializeValues); |
| |
| if (keyInfo instanceof List) { |
| HashMap<Integer, HashSet> bucketKeys = new HashMap<>(); |
| for (Object key : (List) keyInfo) { |
| int id = PartitionedRegionHelper.getHashKey(region, null, key, null, null); |
| if (bucketKeys.containsKey(id)) { |
| bucketKeys.get(id).add(key); |
| } else { |
| HashSet<Object> keys = new HashSet<>(); |
| keys.add(key); |
| bucketKeys.put(id, keys); |
| } |
| } |
| region.fetchEntries(bucketKeys, values, servConn); |
| } else { // keyInfo is a String |
| region.fetchEntries((String) keyInfo, values, servConn); |
| } |
| |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, keyInfo != null ? keyInfo : "ALL_KEYS", values, |
| true, servConn); |
| } |
| |
| /** |
| * Copied from Get70.getValueAndIsObject(), except a minor change. (Make the method static instead |
| * of copying it here?) |
| */ |
| private static void updateValues(VersionedObjectList values, Object key, Object value, |
| VersionTag versionTag) { |
| boolean isObject = true; |
| |
| // If the value in the VM is a CachedDeserializable, |
| // get its value. If it is Token.REMOVED, Token.DESTROYED, |
| // Token.INVALID, or Token.LOCAL_INVALID |
| // set it to null. If it is NOT_AVAILABLE, get the value from |
| // disk. If it is already a byte[], set isObject to false. |
| boolean wasInvalid = false; |
| if (value instanceof CachedDeserializable) { |
| value = ((CachedDeserializable) value).getValue(); |
| } else if (isRemovalToken(value)) { |
| value = null; |
| } else if (value == Token.INVALID || value == Token.LOCAL_INVALID) { |
| value = null; // fix for bug 35884 |
| wasInvalid = true; |
| } else if (value instanceof byte[]) { |
| isObject = false; |
| } |
| boolean keyNotPresent = !wasInvalid && (value == null || value == Token.TOMBSTONE); |
| |
| if (keyNotPresent) { |
| values.addObjectPartForAbsentKey(key, value, versionTag); |
| } else { |
| values.addObjectPart(key, value, isObject, versionTag); |
| } |
| } |
| |
| /** |
|  * Tells whether {@code value} is one of the tokens marking a removed entry: |
|  * {@code Token.REMOVED_PHASE1}, {@code Token.REMOVED_PHASE2}, {@code Token.DESTROYED} or |
|  * {@code Token.TOMBSTONE}. |
|  * |
|  * @param value the value to inspect; compared by identity |
|  * @return {@code true} if {@code value} is one of those tokens |
|  */ |
| private static boolean isRemovalToken(final Object value) { |
|   if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2) { |
|     return true; |
|   } |
|   return value == Token.DESTROYED || value == Token.TOMBSTONE; |
| } |
| |
| /** |
|  * Reads the value of every key in {@code keySet} from {@code region}, appends it to |
|  * {@code values} and sends a non-final chunk each time the list reaches |
|  * {@code MAXIMUM_CHUNK_SIZE}, clearing the list afterwards. |
|  * |
|  * <p> |
|  * Entries still in {@code values} on return have not been sent; this method never sends a final |
|  * chunk. |
|  * |
|  * @param region the region to read from |
|  * @param values the accumulating list of entries |
|  * @param riKeys the register-interest key reported with each chunk; {@code null} is reported as |
|  *        {@code "ALL_KEYS"} |
|  * @param keySet the keys to read |
|  * @param servConn the connection chunks are written to; may be {@code null} only if no chunk |
|  *        needs to be sent |
|  */ |
| public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region, |
|     VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn) |
|     throws IOException { |
|   final ClientProxyMembershipID requester = servConn == null ? null : servConn.getProxyID(); |
|   final Object reportedKey = riKeys != null ? riKeys : "ALL_KEYS"; |
| |
|   for (Object key : keySet) { |
|     final VersionTagHolder tagHolder = createVersionTagHolder(); |
|     final Object fetched = region.get(key, null, true, true, true, requester, tagHolder, true); |
|     updateValues(values, key, fetched, tagHolder.getVersionTag()); |
| |
|     if (values.size() != MAXIMUM_CHUNK_SIZE) { |
|       continue; |
|     } |
|     // Chunk is full: flush it and start over. Keys are sent along with the values. |
|     sendNewRegisterInterestResponseChunk(region, reportedKey, values, false, servConn); |
|     values.clear(); |
|   } |
| } |
| |
| /** |
|  * Appends every entry of {@code set} to {@code values} and sends a non-final chunk each time |
|  * the list reaches {@code MAXIMUM_CHUNK_SIZE}, clearing the list afterwards. |
|  * |
|  * <p> |
|  * Three kinds of entries are understood: |
|  * <ul> |
|  * <li>plain {@code Map.Entry} (remote entries): the entry value is a list holding the value at |
|  * index 0 and the version tag at index 1;</li> |
|  * <li>{@code EntrySnapshot}: version tag, key and value are read from the snapshot;</li> |
|  * <li>any other {@code Region.Entry}: cast to {@code NonTXEntry}; its value is obtained with |
|  * {@code getValueRetain} and handed to {@code OffHeapHelper.release} once it has been |
|  * appended.</li> |
|  * </ul> |
|  * This method never sends a final chunk. |
|  * |
|  * @param region the region the entries belong to |
|  * @param values the accumulating list of entries |
|  * @param riKeys the register-interest key reported with each chunk; {@code null} is reported as |
|  *        {@code "ALL_KEYS"} |
|  * @param set the entries to append |
|  * @param servConn the connection chunks are written to |
|  */ |
| public static void appendNewRegisterInterestResponseChunk(LocalRegion region, |
|     VersionedObjectList values, Object riKeys, Set<Map.Entry> set, ServerConnection servConn) |
|     throws IOException { |
|   for (Entry entry : set) { |
|     if (!(entry instanceof Region.Entry)) { |
|       // Map.Entry (remote entries): [value, version tag] |
|       final List valueAndTag = (List) entry.getValue(); |
|       final Object remoteValue = valueAndTag.get(0); |
|       final VersionTag remoteTag = (VersionTag) valueAndTag.get(1); |
|       updateValues(values, entry.getKey(), remoteValue, remoteTag); |
|     } else if (entry instanceof EntrySnapshot) { |
|       final EntrySnapshot snapshot = (EntrySnapshot) entry; |
|       final VersionTag snapshotTag = snapshot.getVersionTag(); |
|       final Object snapshotKey = snapshot.getRegionEntry().getKey(); |
|       final Object snapshotValue = snapshot.getRegionEntry().getValue(null); |
|       updateValues(values, snapshotKey, snapshotValue, snapshotTag); |
|     } else { |
|       final NonTXEntry localEntry = (NonTXEntry) entry; |
|       final VersionStamp stamp = localEntry.getRegionEntry().getVersionStamp(); |
|       final VersionTag localTag = stamp == null ? null : stamp.asVersionTag(); |
|       final Object localKey = entry.getKey(); |
|       final Object retained = localEntry.getRegionEntry().getValueRetain(region, true); |
|       try { |
|         updateValues(values, localKey, retained, localTag); |
|       } finally { |
|         // Pairs with getValueRetain above |
|         OffHeapHelper.release(retained); |
|       } |
|     } |
| |
|     if (values.size() == MAXIMUM_CHUNK_SIZE) { |
|       // Chunk is full: flush it and start over |
|       sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, |
|           false, servConn); |
|       values.clear(); |
|     } |
|   } |
| } |
| |
| /** |
|  * Sends one chunk of a keys-and-values register-interest response. |
|  * |
|  * @param region the region the entries belong to; may be {@code null} (only used for logging) |
|  * @param riKey the register-interest key being answered (only used for logging) |
|  * @param list the entries sent as the single object part of the chunk |
|  * @param lastChunk whether this chunk terminates the response |
|  * @param servConn the connection the chunk is written to |
|  */ |
| public static void sendNewRegisterInterestResponseChunk(LocalRegion region, Object riKey, |
|     VersionedObjectList list, boolean lastChunk, ServerConnection servConn) throws IOException { |
|   final ChunkedMessage reply = servConn.getRegisterInterestResponseMessage(); |
|   reply.setNumberOfParts(1); |
|   reply.setLastChunk(lastChunk); |
|   reply.addObjPart(list, false); |
| |
|   final String regionName; |
|   if (region == null) { |
|     regionName = " null "; |
|   } else { |
|     regionName = region.getFullPath(); |
|   } |
|   if (logger.isDebugEnabled()) { |
|     final String chunkLabel = lastChunk ? " last " : " "; |
|     logger.debug( |
|         "{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>", |
|         servConn.getName(), chunkLabel, regionName, riKey, reply); |
|   } |
|   reply.sendChunk(servConn); |
| } |
| |
| /** |
| * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} |
| */ |
| private static void handleRegEx(LocalRegion region, String regex, InterestResultPolicy policy, |
| ServerConnection servConn) throws IOException { |
| if (region instanceof PartitionedRegion) { |
| // too bad java doesn't provide another way to do this... |
| handleRegExPR((PartitionedRegion) region, regex, policy, servConn); |
| return; |
| } |
| List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); |
| // Handle the regex pattern |
| if (region != null) { |
| Pattern keyPattern = Pattern.compile(regex); |
| for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) { |
| if (!(entryKey instanceof String)) { |
| // key is not a String, cannot apply regex to this entry |
| continue; |
| } |
| if (!keyPattern.matcher((String) entryKey).matches()) { |
| // key does not match the regex, this entry should not be returned. |
| continue; |
| } |
| |
| appendInterestResponseKey(region, regex, entryKey, keyList, servConn); |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} |
| */ |
| private static void handleRegExPR(final PartitionedRegion region, final String regex, |
| final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { |
| final List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); |
| region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), |
| new PartitionedRegion.SetCollector() { |
| @Override |
| public void receiveSet(Set theSet) throws IOException { |
| appendInterestResponseKeys(region, regex, theSet, keyList, servConn); |
| } |
| }); |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn); |
| } |
| |
| /** |
| * Process an interest request involving a list of keys |
| */ |
| private static void handleListPR(final PartitionedRegion region, final List keyList, |
| final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { |
| final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); |
| region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), |
| new PartitionedRegion.SetCollector() { |
| @Override |
| public void receiveSet(Set theSet) throws IOException { |
| appendInterestResponseKeys(region, keyList, theSet, newKeyList, servConn); |
| } |
| }); |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn); |
| } |
| |
| private static void handleKVList(final LocalRegion region, final List keyList, |
| boolean serializeValues, final ServerConnection servConn) throws IOException { |
| |
| if (region instanceof PartitionedRegion) { |
| handleKVKeysPR((PartitionedRegion) region, keyList, serializeValues, servConn); |
| return; |
| } |
| VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, |
| region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); |
| |
| // Handle list of keys |
| if (region != null) { |
| |
| for (Object key : keyList) { |
| if (region.containsKey(key) || region.containsTombstone(key)) { |
| VersionTagHolder versionHolder = createVersionTagHolder(); |
| |
| ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); |
| Object data = region.get(key, null, true, true, true, id, versionHolder, true); |
| VersionTag versionTag = versionHolder.getVersionTag(); |
| updateValues(values, key, data, versionTag); |
| |
| if (values.size() == MAXIMUM_CHUNK_SIZE) { |
| // Send the chunk and clear the list |
| // values.setKeys(null); // Now we need to send keys too. |
| sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn); |
| values.clear(); |
| } |
| } |
| } |
| } |
| // Send the last chunk (the only chunk for individual and list keys) |
| // always send it back, even if the list is of zero size. |
| sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn); |
| } |
| |
| private static VersionTagHolder createVersionTagHolder() { |
| VersionTagHolder versionHolder = new VersionTagHolder(); |
| versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST); |
| return versionHolder; |
| } |
| |
| /** |
|  * Adds one key to the pending interest response and, once the list holds |
|  * {@code MAXIMUM_CHUNK_SIZE} keys, sends it as a non-final chunk and empties it. |
|  * |
|  * @param region the region (for debugging) |
|  * @param riKey the registerInterest "key" (what the client is interested in) |
|  * @param entryKey key we're responding to |
|  * @param list list to append to |
|  * @param servConn the connection chunks are written to |
|  */ |
| private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey, |
|     List list, ServerConnection servConn) throws IOException { |
|   list.add(entryKey); |
|   if (logger.isDebugEnabled()) { |
|     logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}", |
|         servConn.getName(), entryKey, list.size(), region.getFullPath()); |
|   } |
| |
|   if (list.size() != MAXIMUM_CHUNK_SIZE) { |
|     // Still room in the current chunk |
|     return; |
|   } |
|   // Chunk is full: flush it and start over |
|   sendRegisterInterestResponseChunk(region, riKey, list, false, servConn); |
|   list.clear(); |
| } |
| |
| /** |
|  * Adds every key of {@code entryKeys} to the pending interest response, one at a time, via |
|  * {@code appendInterestResponseKey}. |
|  * |
|  * @param region the region (for debugging) |
|  * @param riKey the registerInterest "key" (what the client is interested in) |
|  * @param entryKeys the keys to append |
|  * @param collector list the keys are appended to |
|  * @param servConn the connection chunks are written to |
|  */ |
| private static void appendInterestResponseKeys(LocalRegion region, Object riKey, |
|     Collection entryKeys, List collector, ServerConnection servConn) throws IOException { |
|   for (final Object nextKey : entryKeys) { |
|     appendInterestResponseKey(region, riKey, nextKey, collector, servConn); |
|   } |
| } |
| } |