blob: dc2b3a2d94e363ed24cfd5bfa93d3aaf11fc6e95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CopyException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FindVersionTagOperation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.NonTXEntry;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.VersionTagHolder;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.GemFireSecurityException;
public abstract class BaseCommand implements Command {
protected static final Logger logger = LogService.getLogger();
@Immutable
private static final byte[] OK_BYTES = new byte[] {0};
public static final int MAXIMUM_CHUNK_SIZE =
Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100);
/** Whether to suppress logging of IOExceptions */
private static final boolean SUPPRESS_IO_EXCEPTION_LOGGING =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "bridge.suppressIOExceptionLogging");
/**
* Maximum number of concurrent incoming client message bytes that a cache server will allow.
* Once a server is working on this number additional incoming client messages will wait until one
* of them completes or fails. The bytes are computed based in the size sent in the incoming msg
* header.
*/
private static final int MAX_INCOMING_DATA =
Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1);
/**
* Maximum number of concurrent incoming client messages that a cache server will allow. Once a
* server is working on this number additional incoming client messages will wait until one of
* them completes or fails.
*/
private static final int MAX_INCOMING_MESSAGES =
Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1);
@MakeNotStatic
private static final Semaphore INCOMING_DATA_LIMITER;
@MakeNotStatic
private static final Semaphore INCOMING_MSG_LIMITER;
static {
Semaphore semaphore;
if (MAX_INCOMING_DATA > 0) {
// backport requires that this is fair since we inc by values > 1
semaphore = new Semaphore(MAX_INCOMING_DATA, true);
} else {
semaphore = null;
}
INCOMING_DATA_LIMITER = semaphore;
if (MAX_INCOMING_MESSAGES > 0) {
// unfair for best performance
semaphore = new Semaphore(MAX_INCOMING_MESSAGES, false);
} else {
semaphore = null;
}
INCOMING_MSG_LIMITER = semaphore;
}
protected static byte[] okBytes() {
return OK_BYTES;
}
protected boolean setLastResultReceived(
ServerToClientFunctionResultSender resultSender) {
if (resultSender != null) {
synchronized (resultSender) {
if (resultSender.isLastResultReceived()) {
return false;
} else {
resultSender.setLastResultReceived(true);
}
}
}
return true;
}
@Override
public void execute(Message clientMessage, ServerConnection serverConnection,
SecurityService securityService) {
// Read the request and update the statistics
long start = DistributionStats.getStatTime();
if (EntryLogger.isEnabled() && serverConnection != null) {
EntryLogger.setSource(serverConnection.getMembershipID(), "c2s");
}
boolean shouldMasquerade = shouldMasqueradeForTx(clientMessage, serverConnection);
try {
if (shouldMasquerade) {
InternalCache cache = serverConnection.getCache();
InternalDistributedMember member =
(InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
TXManagerImpl txMgr = cache.getTxManager();
TXStateProxy tx = null;
try {
tx = txMgr.masqueradeAs(clientMessage, member, false);
cmdExecute(clientMessage, serverConnection, securityService, start);
tx.updateProxyServer(txMgr.getMemberId());
} finally {
txMgr.unmasquerade(tx);
}
} else {
cmdExecute(clientMessage, serverConnection, securityService, start);
}
} catch (TransactionException | CopyException | SerializationException | CacheWriterException
| CacheLoaderException | GemFireSecurityException | PartitionOfflineException
| MessageTooLargeException e) {
handleExceptionNoDisconnect(clientMessage, serverConnection, e);
} catch (EOFException eof) {
BaseCommand.handleEOFException(clientMessage, serverConnection, eof);
} catch (InterruptedIOException e) { // Solaris only
BaseCommand.handleInterruptedIOException(serverConnection, e);
} catch (IOException e) {
BaseCommand.handleIOException(clientMessage, serverConnection, e);
} catch (DistributedSystemDisconnectedException e) {
BaseCommand.handleShutdownException(clientMessage, serverConnection, e);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
BaseCommand.handleThrowable(clientMessage, serverConnection, e);
} finally {
EntryLogger.clearSource();
}
}
/**
* checks to see if this thread needs to masquerade as a transactional thread. clients after
* GFE_66 should be able to start a transaction.
*
* @return true if thread should masquerade as a transactional thread.
*/
protected boolean shouldMasqueradeForTx(Message clientMessage,
ServerConnection serverConnection) {
return serverConnection.getClientVersion().compareTo(Version.GFE_66) >= 0
&& clientMessage.getTransactionId() > TXManagerImpl.NOTX;
}
/**
* If an operation is retried then some server may have seen it already. We cannot apply this
* operation to the cache without knowing whether a version tag has already been created for it.
* Otherwise caches that have seen the event already will reject it but others will not, but will
* have no version tag with which to perform concurrency checks.
* <p>
* The client event should have the event identifier from the client and the region affected by
* the operation.
*/
public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) {
InternalRegion r = clientEvent.getRegion();
VersionTag tag = r.findVersionTagForEvent(clientEvent.getEventId());
if (tag == null) {
if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
// TODO this could be optimized for partitioned regions by sending the key
// so that the PR could look at an individual bucket for the event
tag = FindVersionTagOperation.findVersionTag(r, clientEvent.getEventId(), false);
}
}
if (tag != null) {
if (logger.isDebugEnabled()) {
logger.debug("recovered version tag {} for replayed operation {}", tag,
clientEvent.getEventId());
}
clientEvent.setVersionTag(tag);
}
return tag != null;
}
/**
* If an operation is retried then some server may have seen it already. We cannot apply this
* operation to the cache without knowing whether a version tag has already been created for it.
* Otherwise caches that have seen the event already will reject it but others will not, but will
* have no version tag with which to perform concurrency checks.
* <p>
* The client event should have the event identifier from the client and the region affected by
* the operation.
*/
protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) {
VersionTag tag = region.findVersionTagForClientBulkOp(eventID);
if (tag != null) {
if (logger.isDebugEnabled()) {
logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
}
return tag;
}
if (region instanceof DistributedRegion || region instanceof PartitionedRegion) {
// TODO this could be optimized for partitioned regions by sending the key
// so that the PR could look at an individual bucket for the event
tag = FindVersionTagOperation.findVersionTag(region, eventID, true);
}
if (tag != null) {
if (logger.isDebugEnabled()) {
logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
}
}
return tag;
}
public abstract void cmdExecute(final Message clientMessage,
final ServerConnection serverConnection, final SecurityService securityService,
final long start) throws IOException, ClassNotFoundException, InterruptedException;
protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException {
Message replyMsg = serverConnection.getReplyMessage();
serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(1);
replyMsg.setTransactionId(origMsg.getTransactionId());
replyMsg.addBytesPart(okBytes());
replyMsg.send(serverConnection);
if (logger.isTraceEnabled()) {
logger.trace("{}: rpl tx: {}", serverConnection.getName(), origMsg.getTransactionId());
}
}
protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
Message replyMsg = serverConnection.getReplyMessage();
serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(1);
replyMsg.setTransactionId(origMsg.getTransactionId());
replyMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop});
replyMsg.send(serverConnection);
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
logger.trace("{}: rpl with REFRESH_METADATA tx: {}", serverConnection.getName(),
origMsg.getTransactionId());
}
}
private static void handleEOFException(Message msg, ServerConnection serverConnection,
Exception eof) {
CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
CacheServerStats stats = serverConnection.getCacheServerStats();
boolean potentialModification = serverConnection.getPotentialModification();
if (!crHelper.isShutdown()) {
if (potentialModification) {
stats.incAbandonedWriteRequests();
} else {
stats.incAbandonedReadRequests();
}
if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (potentialModification) {
int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(
"{}: EOFException during a write operation on region : {} key: {} messageId: {}",
new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId});
} else {
logger.debug("EOF exception", eof);
logger.warn("{}: connection disconnect detected by EOF.",
serverConnection.getName());
}
}
}
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(eof);
}
private static void handleInterruptedIOException(ServerConnection serverConnection, Exception e) {
CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
if (!crHelper.isShutdown() && serverConnection.isOpen()) {
if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (logger.isDebugEnabled())
logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e);
}
}
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(e);
}
private static void handleIOException(Message msg, ServerConnection serverConnection,
Exception e) {
CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
boolean potentialModification = serverConnection.getPotentialModification();
if (!crHelper.isShutdown() && serverConnection.isOpen()) {
if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (potentialModification) {
int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(String.format(
"%s: Unexpected IOException during operation for region: %s key: %s messId: %s",
new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId}),
e);
} else {
logger.warn(String.format("%s: Unexpected IOException: ",
serverConnection.getName()), e);
}
}
}
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(e);
}
private static void handleShutdownException(Message msg, ServerConnection serverConnection,
Exception e) {
CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
boolean potentialModification = serverConnection.getPotentialModification();
if (!crHelper.isShutdown()) {
if (potentialModification) {
int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(String.format(
"%s: Unexpected ShutdownException during operation on region: %s key: %s messageId: %s",
new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId}),
e);
} else {
logger.warn(String.format("%s: Unexpected ShutdownException: ",
serverConnection.getName()),
e);
}
}
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(e);
}
private static void handleExceptionNoDisconnect(Message msg, ServerConnection serverConnection,
Exception e) {
boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE);
boolean responded = serverConnection.getTransientFlag(RESPONDED);
boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
boolean potentialModification = serverConnection.getPotentialModification();
try {
boolean wroteExceptionResponse = false;
try {
if (requiresResponse && !responded) {
if (requiresChunkedResponse) {
writeChunkedException(msg, e, serverConnection);
} else {
writeException(msg, e, false, serverConnection);
}
wroteExceptionResponse = true;
serverConnection.setAsTrue(RESPONDED);
}
} finally { // inner try-finally to ensure proper ordering of logging
if (potentialModification) {
int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
if (!wroteExceptionResponse) {
logger.warn(String.format(
"%s: Unexpected Exception during operation on region: %s key: %s messageId: %s",
new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId}),
e);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}",
serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId, e);
}
}
} else {
if (!wroteExceptionResponse) {
logger.warn(String.format("%s: Unexpected Exception",
serverConnection.getName()), e);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: Exception: {}", serverConnection.getName(), e.getMessage(), e);
}
}
}
}
} catch (IOException ioe) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(),
ioe.getMessage(), ioe);
}
}
}
private static void handleThrowable(Message msg, ServerConnection serverConnection,
Throwable th) {
boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE);
boolean responded = serverConnection.getTransientFlag(RESPONDED);
boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
boolean potentialModification = serverConnection.getPotentialModification();
try {
try {
if (th instanceof Error) {
logger.fatal(String.format("%s : Unexpected Error on server",
serverConnection.getName()),
th);
}
if (requiresResponse && !responded) {
if (requiresChunkedResponse) {
writeChunkedException(msg, th, serverConnection);
} else {
writeException(msg, th, false, serverConnection);
}
serverConnection.setAsTrue(RESPONDED);
}
} finally { // inner try-finally to ensure proper ordering of logging
if (!(th instanceof Error || th instanceof CacheLoaderException)) {
if (potentialModification) {
int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(String.format(
"%s: Unexpected Exception during operation on region: %s key: %s messageId: %s",
new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
serverConnection.getModKey(), transId}),
th);
} else {
logger.warn(String.format("%s: Unexpected Exception",
serverConnection.getName()), th);
}
}
}
} catch (IOException ioe) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(),
ioe.getMessage(), ioe);
}
} finally {
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(th);
}
}
protected static void writeChunkedException(Message origMsg, Throwable e,
ServerConnection serverConnection) throws IOException {
writeChunkedException(origMsg, e, serverConnection,
serverConnection.getChunkedResponseMessage());
}
protected static void writeChunkedException(Message origMsg, Throwable e,
ServerConnection serverConnection, ChunkedMessage originalResponse) throws IOException {
writeChunkedException(origMsg, e, serverConnection, originalResponse, 2);
}
private static void writeChunkedException(Message origMsg, Throwable exception,
ServerConnection serverConnection, ChunkedMessage originalResponse, int numOfParts)
throws IOException {
Throwable e = getClientException(serverConnection, exception);
ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
chunkedResponseMsg.setServerConnection(serverConnection);
if (originalResponse.headerHasBeenSent()) {
chunkedResponseMsg.setNumberOfParts(numOfParts);
chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
chunkedResponseMsg.addObjPart(e);
if (numOfParts == 2) {
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk while reply in progress: {}",
serverConnection.getName(), e.getMessage(), e);
}
} else {
chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
chunkedResponseMsg.setNumberOfParts(numOfParts);
chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
chunkedResponseMsg.addObjPart(e);
if (numOfParts == 2) {
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
e);
}
}
chunkedResponseMsg.sendChunk(serverConnection);
}
// Get the exception stacktrace for native clients
public static String getExceptionTrace(Throwable ex) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
ex.printStackTrace(pw);
pw.close();
return sw.toString();
}
protected static void writeException(Message origMsg, Throwable e, boolean isSevere,
ServerConnection serverConnection) throws IOException {
writeException(origMsg, MessageType.EXCEPTION, e, isSevere, serverConnection);
}
private static Throwable getClientException(ServerConnection serverConnection, Throwable e) {
InternalCache cache = serverConnection.getCache();
if (cache != null) {
OldClientSupportService svc = cache.getService(OldClientSupportService.class);
if (svc != null) {
return svc.getThrowable(e, serverConnection.getClientVersion());
}
}
return e;
}
protected static void writeException(Message origMsg, int msgType, Throwable e, boolean isSevere,
ServerConnection serverConnection) throws IOException {
Throwable theException = getClientException(serverConnection, e);
Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(msgType);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
if (isSevere) {
String msg = theException.getMessage();
if (msg == null) {
msg = theException.toString();
}
logger.fatal("Severe cache exception : {}", msg);
}
errorMsg.addObjPart(theException);
errorMsg.addStringPart(getExceptionTrace(theException));
errorMsg.send(serverConnection);
if (logger.isDebugEnabled()) {
logger.debug("{}: Wrote exception: {}", serverConnection.getName(), e.getMessage(), e);
}
if (e instanceof MessageTooLargeException) {
throw (IOException) e;
}
}
protected static void writeErrorResponse(Message origMsg, int messageType,
ServerConnection serverConnection) throws IOException {
Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(messageType);
errorMsg.setNumberOfParts(1);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addStringPart(
"Invalid data received. Please see the cache server log file for additional details.");
errorMsg.send(serverConnection);
}
protected static void writeErrorResponse(Message origMsg, int messageType, String msg,
ServerConnection serverConnection) throws IOException {
Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(messageType);
errorMsg.setNumberOfParts(1);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addStringPart(msg);
errorMsg.send(serverConnection);
}
protected static void writeRegionDestroyedEx(Message msg, String regionName, String title,
ServerConnection serverConnection) throws IOException {
String reason = serverConnection.getName() + ": Region named " + regionName + title;
RegionDestroyedException ex = new RegionDestroyedException(reason, regionName);
if (serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) {
writeChunkedException(msg, ex, serverConnection);
} else {
writeException(msg, ex, false, serverConnection);
}
}
protected static void writeResponse(Object data, Object callbackArg, Message origMsg,
boolean isObject, ServerConnection serverConnection) throws IOException {
Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
if (callbackArg == null) {
responseMsg.setNumberOfParts(1);
} else {
responseMsg.setNumberOfParts(2);
}
if (data instanceof byte[]) {
responseMsg.addRawPart((byte[]) data, isObject);
} else {
Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]");
responseMsg.addObjPart(data, false);
}
if (callbackArg != null) {
responseMsg.addObjPart(callbackArg);
}
serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
responseMsg.send(serverConnection);
origMsg.clearParts();
}
protected static void writeResponseWithRefreshMetadata(Object data, Object callbackArg,
Message origMsg, boolean isObject, ServerConnection serverConnection, PartitionedRegion pr,
byte nwHop) throws IOException {
Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
if (callbackArg == null) {
responseMsg.setNumberOfParts(2);
} else {
responseMsg.setNumberOfParts(3);
}
if (data instanceof byte[]) {
responseMsg.addRawPart((byte[]) data, isObject);
} else {
Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]");
responseMsg.addObjPart(data, false);
}
if (callbackArg != null) {
responseMsg.addObjPart(callbackArg);
}
responseMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop});
serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
responseMsg.send(serverConnection);
origMsg.clearParts();
}
protected static void writeResponseWithFunctionAttribute(byte[] data, Message origMsg,
ServerConnection serverConnection) throws IOException {
Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
responseMsg.setNumberOfParts(1);
responseMsg.addBytesPart(data);
serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
responseMsg.send(serverConnection);
origMsg.clearParts();
}
protected static void checkForInterrupt(ServerConnection serverConnection, Exception e)
throws InterruptedException, InterruptedIOException {
serverConnection.getCachedRegionHelper().checkCancelInProgress(e);
if (e instanceof InterruptedException) {
throw (InterruptedException) e;
}
if (e instanceof InterruptedIOException) {
throw (InterruptedIOException) e;
}
}
static void writeQueryResponseChunk(Object queryResponseChunk, CollectionType collectionType,
boolean lastChunk, ServerConnection serverConnection) throws IOException {
ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage();
queryResponseMsg.setNumberOfParts(2);
queryResponseMsg.setLastChunk(lastChunk);
queryResponseMsg.addObjPart(collectionType, false);
queryResponseMsg.addObjPart(queryResponseChunk, false);
queryResponseMsg.sendChunk(serverConnection);
}
protected static void writeQueryResponseException(Message origMsg, Throwable exception,
ServerConnection serverConnection) throws IOException {
Throwable e = getClientException(serverConnection, exception);
ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage();
ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (queryResponseMsg.headerHasBeenSent()) {
// fix for bug 35442
// This client is expecting 2 parts in this message so send 2 parts
queryResponseMsg.setServerConnection(serverConnection);
queryResponseMsg.setNumberOfParts(2);
queryResponseMsg.setLastChunkAndNumParts(true, 2);
queryResponseMsg.addObjPart(e);
queryResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk while reply in progress: {}",
serverConnection.getName(), e.getMessage(), e);
}
queryResponseMsg.sendChunk(serverConnection);
} else {
chunkedResponseMsg.setServerConnection(serverConnection);
chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
chunkedResponseMsg.setNumberOfParts(2);
chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
chunkedResponseMsg.addObjPart(e);
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
e);
}
chunkedResponseMsg.sendChunk(serverConnection);
}
}
protected static void writeChunkedErrorResponse(Message origMsg, int messageType, String message,
ServerConnection serverConnection) throws IOException {
// Send chunked response header identifying error message
ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending error message header type: {} transaction: {}",
serverConnection.getName(), messageType, origMsg.getTransactionId());
}
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
// Send actual error
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending error message chunk: {}", serverConnection.getName(), message);
}
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(true);
chunkedResponseMsg.addStringPart(message);
chunkedResponseMsg.sendChunk(serverConnection);
}
protected static void writeFunctionResponseException(Message origMsg, int messageType,
ServerConnection serverConnection, Throwable exception) throws IOException {
Throwable e = getClientException(serverConnection, exception);
ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage();
ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (functionResponseMsg.headerHasBeenSent()) {
functionResponseMsg.setServerConnection(serverConnection);
functionResponseMsg.setNumberOfParts(2);
functionResponseMsg.setLastChunkAndNumParts(true, 2);
functionResponseMsg.addObjPart(e);
functionResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk while reply in progress: {}",
serverConnection.getName(), e.getMessage(), e);
}
functionResponseMsg.sendChunk(serverConnection);
} else {
chunkedResponseMsg.setServerConnection(serverConnection);
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setNumberOfParts(2);
chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
chunkedResponseMsg.addObjPart(e);
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
e);
}
chunkedResponseMsg.sendChunk(serverConnection);
}
}
protected static void writeFunctionResponseError(Message origMsg, int messageType, String message,
ServerConnection servConn) throws IOException {
ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
if (functionResponseMsg.headerHasBeenSent()) {
functionResponseMsg.setNumberOfParts(1);
functionResponseMsg.setLastChunk(true);
functionResponseMsg.addStringPart(message);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending Error chunk while reply in progress: {}", servConn.getName(),
message);
}
functionResponseMsg.sendChunk(servConn);
} else {
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(true);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
chunkedResponseMsg.addStringPart(message);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending Error chunk: {}", servConn.getName(), message);
}
chunkedResponseMsg.sendChunk(servConn);
}
}
protected static void writeKeySetErrorResponse(Message origMsg, int messageType, String message,
ServerConnection servConn) throws IOException {
// Send chunked response header identifying error message
ChunkedMessage chunkedResponseMsg = servConn.getKeySetResponseMessage();
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending error message header type: {} transaction: {}", servConn.getName(),
messageType, origMsg.getTransactionId());
}
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
chunkedResponseMsg.sendHeader();
// Send actual error
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
}
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(true);
chunkedResponseMsg.addStringPart(message);
chunkedResponseMsg.sendChunk(servConn);
}
static Message readRequest(ServerConnection servConn) {
Message requestMsg = null;
try {
requestMsg = servConn.getRequestMessage();
requestMsg.receive(servConn, MAX_INCOMING_DATA, INCOMING_DATA_LIMITER, INCOMING_MSG_LIMITER);
return requestMsg;
} catch (EOFException eof) {
handleEOFException(null, servConn, eof);
// TODO: Check if there is any need for explicitly returning
} catch (InterruptedIOException e) { // Solaris only
handleInterruptedIOException(servConn, e);
} catch (IOException e) {
handleIOException(null, servConn, e);
} catch (DistributedSystemDisconnectedException e) {
handleShutdownException(null, servConn, e);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
SystemFailure.checkFailure();
handleThrowable(null, servConn, e);
}
return requestMsg;
}
protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey,
int interestType, InterestResultPolicy policy, ServerConnection servConn) throws IOException {
fillAndSendRegisterInterestResponseChunks(region, riKey, interestType, false, policy, servConn);
}
/**
* serializeValues is unused for clients < GFE_80
*/
protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey,
int interestType, boolean serializeValues, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
// Client is not interested.
if (policy.isNone()) {
sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true, servConn);
return;
}
if (policy.isKeysValues() && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) {
handleKeysValuesPolicy(region, riKey, interestType, serializeValues, servConn);
return;
}
if (riKey instanceof List) {
handleList(region, (List) riKey, policy, servConn);
return;
}
if (!(riKey instanceof String)) {
handleSingleton(region, riKey, policy, servConn);
return;
}
switch (interestType) {
case InterestType.OQL_QUERY:
// Not supported yet
throw new InternalGemFireError(
"not yet supported");
case InterestType.FILTER_CLASS:
throw new InternalGemFireError(
"not yet supported");
case InterestType.REGULAR_EXPRESSION:
String regEx = (String) riKey;
if (regEx.equals(".*")) {
handleAllKeys(region, policy, servConn);
} else {
handleRegEx(region, regEx, policy, servConn);
}
break;
case InterestType.KEY:
if (riKey.equals("ALL_KEYS")) {
handleAllKeys(region, policy, servConn);
} else {
handleSingleton(region, riKey, policy, servConn);
}
break;
default:
throw new InternalGemFireError(
"unknown interest type");
}
}
private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, int interestType,
boolean serializeValues, ServerConnection servConn) throws IOException {
if (riKey instanceof List) {
handleKVList(region, (List) riKey, serializeValues, servConn);
return;
}
if (!(riKey instanceof String)) {
handleKVSingleton(region, riKey, serializeValues, servConn);
return;
}
switch (interestType) {
case InterestType.OQL_QUERY:
throw new InternalGemFireError(
"not yet supported");
case InterestType.FILTER_CLASS:
throw new InternalGemFireError(
"not yet supported");
case InterestType.REGULAR_EXPRESSION:
String regEx = (String) riKey;
if (regEx.equals(".*")) {
handleKVAllKeys(region, null, serializeValues, servConn);
} else {
handleKVAllKeys(region, regEx, serializeValues, servConn);
}
break;
case InterestType.KEY:
if (riKey.equals("ALL_KEYS")) {
handleKVAllKeys(region, null, serializeValues, servConn);
} else {
handleKVSingleton(region, riKey, serializeValues, servConn);
}
break;
default:
throw new InternalGemFireError(
"unknown interest type");
}
}
/**
* @param list is a List of entry keys
*/
private static void sendRegisterInterestResponseChunk(Region region, Object riKey, List list,
boolean lastChunk, ServerConnection servConn) throws IOException {
ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
chunkedResponseMsg.addObjPart(list, false);
String regionName = region == null ? " null " : region.getFullPath();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>",
servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg);
}
chunkedResponseMsg.sendChunk(servConn);
}
/**
* Determines whether keys for destroyed entries (tombstones) should be sent to clients in
* register-interest results.
*
* @return true if tombstones should be sent to the client
*/
private static boolean sendTombstonesInRIResults(ServerConnection servConn,
InterestResultPolicy policy) {
return policy == InterestResultPolicy.KEYS_VALUES
&& servConn.getClientVersion().compareTo(Version.GFE_80) >= 0;
}
/**
* Process an interest request involving a list of keys
*
* @param region the region
* @param keyList the list of keys
* @param policy the policy
*/
private static void handleList(LocalRegion region, List keyList, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
if (region instanceof PartitionedRegion) {
// too bad java doesn't provide another way to do this...
handleListPR((PartitionedRegion) region, keyList, policy, servConn);
return;
}
List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
// Handle list of keys
if (region != null) {
for (Object entryKey : keyList) {
if (region.containsKey(entryKey)
|| sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
appendInterestResponseKey(region, keyList, entryKey, newKeyList, servConn);
}
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn);
}
/**
* Handles both RR and PR cases
*/
@SuppressWarnings(value = "NP_NULL_PARAM_DEREF",
justification = "Null value handled in sendNewRegisterInterestResponseChunk()")
private static void handleKVSingleton(LocalRegion region, Object entryKey,
boolean serializeValues, ServerConnection servConn) throws IOException {
VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
if (region != null) {
if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
VersionTag vt = versionHolder.getVersionTag();
updateValues(values, entryKey, data, vt);
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendNewRegisterInterestResponseChunk(region, entryKey, values, true, servConn);
}
/**
* Process an interest request consisting of a single key
*
* @param region the region
* @param entryKey the key
* @param policy the policy
*/
private static void handleSingleton(LocalRegion region, Object entryKey,
InterestResultPolicy policy, ServerConnection servConn) throws IOException {
List keyList = new ArrayList(1);
if (region != null) {
if (region.containsKey(entryKey)
|| sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
appendInterestResponseKey(region, entryKey, entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, entryKey, keyList, true, servConn);
}
/**
* Process an interest request of type ALL_KEYS
*
* @param region the region
* @param policy the policy
*/
private static void handleAllKeys(LocalRegion region, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
if (region != null) {
for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) {
appendInterestResponseKey(region, "ALL_KEYS", entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true, servConn);
}
private static void handleKVAllKeys(LocalRegion region, String regex, boolean serializeValues,
ServerConnection servConn) throws IOException {
if (region instanceof PartitionedRegion) {
handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn);
return;
}
VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
if (region != null) {
Pattern keyPattern = null;
if (regex != null) {
keyPattern = Pattern.compile(regex);
}
for (Object key : region.keySet(true)) {
VersionTagHolder versionHolder = createVersionTagHolder();
if (keyPattern != null) {
if (!(key instanceof String)) {
// key is not a String, cannot apply regex to this entry
continue;
}
if (!keyPattern.matcher((String) key).matches()) {
// key does not match the regex, this entry should not be
// returned.
continue;
}
}
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
Object data = region.get(key, null, true, true, true, id, versionHolder, true);
VersionTag versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
if (values.size() == MAXIMUM_CHUNK_SIZE) {
sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values,
false, servConn);
values.clear();
}
} // for
} // if
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, true,
servConn);
}
private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo,
boolean serializeValues, ServerConnection servConn) throws IOException {
VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
region.getConcurrencyChecksEnabled(), serializeValues);
if (keyInfo instanceof List) {
HashMap<Integer, HashSet> bucketKeys = new HashMap<>();
for (Object key : (List) keyInfo) {
int id = PartitionedRegionHelper.getHashKey(region, null, key, null, null);
if (bucketKeys.containsKey(id)) {
bucketKeys.get(id).add(key);
} else {
HashSet<Object> keys = new HashSet<>();
keys.add(key);
bucketKeys.put(id, keys);
}
}
region.fetchEntries(bucketKeys, values, servConn);
} else { // keyInfo is a String
region.fetchEntries((String) keyInfo, values, servConn);
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendNewRegisterInterestResponseChunk(region, keyInfo != null ? keyInfo : "ALL_KEYS", values,
true, servConn);
}
/**
* Copied from Get70.getValueAndIsObject(), except a minor change. (Make the method static instead
* of copying it here?)
*/
private static void updateValues(VersionedObjectList values, Object key, Object value,
VersionTag versionTag) {
boolean isObject = true;
// If the value in the VM is a CachedDeserializable,
// get its value. If it is Token.REMOVED, Token.DESTROYED,
// Token.INVALID, or Token.LOCAL_INVALID
// set it to null. If it is NOT_AVAILABLE, get the value from
// disk. If it is already a byte[], set isObject to false.
boolean wasInvalid = false;
if (value instanceof CachedDeserializable) {
value = ((CachedDeserializable) value).getValue();
} else if (isRemovalToken(value)) {
value = null;
} else if (value == Token.INVALID || value == Token.LOCAL_INVALID) {
value = null; // fix for bug 35884
wasInvalid = true;
} else if (value instanceof byte[]) {
isObject = false;
}
boolean keyNotPresent = !wasInvalid && (value == null || value == Token.TOMBSTONE);
if (keyNotPresent) {
values.addObjectPartForAbsentKey(key, value, versionTag);
} else {
values.addObjectPart(key, value, isObject, versionTag);
}
}
private static boolean isRemovalToken(final Object value) {
return value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2
|| value == Token.DESTROYED || value == Token.TOMBSTONE;
}
public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
throws IOException {
ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
for (Object key : keySet) {
VersionTagHolder versionHolder = createVersionTagHolder();
Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
updateValues(values, key, value, versionHolder.getVersionTag());
if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
// values.setKeys(null); // Now we need to send keys too.
sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values,
false, servConn);
values.clear();
}
} // for
}
public static void appendNewRegisterInterestResponseChunk(LocalRegion region,
VersionedObjectList values, Object riKeys, Set<Map.Entry> set, ServerConnection servConn)
throws IOException {
for (Entry entry : set) {
if (entry instanceof Region.Entry) { // local entries
VersionTag vt;
Object key;
Object value;
if (entry instanceof EntrySnapshot) {
vt = ((EntrySnapshot) entry).getVersionTag();
key = ((EntrySnapshot) entry).getRegionEntry().getKey();
value = ((EntrySnapshot) entry).getRegionEntry().getValue(null);
updateValues(values, key, value, vt);
} else {
VersionStamp vs = ((NonTXEntry) entry).getRegionEntry().getVersionStamp();
vt = vs == null ? null : vs.asVersionTag();
key = entry.getKey();
value = ((NonTXEntry) entry).getRegionEntry().getValueRetain(region, true);
try {
updateValues(values, key, value, vt);
} finally {
OffHeapHelper.release(value);
}
}
} else { // Map.Entry (remote entries)
List list = (List) entry.getValue();
Object value = list.get(0);
VersionTag tag = (VersionTag) list.get(1);
updateValues(values, entry.getKey(), value, tag);
}
if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values,
false, servConn);
values.clear();
}
} // for
}
public static void sendNewRegisterInterestResponseChunk(LocalRegion region, Object riKey,
VersionedObjectList list, boolean lastChunk, ServerConnection servConn) throws IOException {
ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
chunkedResponseMsg.addObjPart(list, false);
String regionName = region == null ? " null " : region.getFullPath();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>",
servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg);
}
chunkedResponseMsg.sendChunk(servConn);
}
/**
* Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
*/
private static void handleRegEx(LocalRegion region, String regex, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
if (region instanceof PartitionedRegion) {
// too bad java doesn't provide another way to do this...
handleRegExPR((PartitionedRegion) region, regex, policy, servConn);
return;
}
List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
// Handle the regex pattern
if (region != null) {
Pattern keyPattern = Pattern.compile(regex);
for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) {
if (!(entryKey instanceof String)) {
// key is not a String, cannot apply regex to this entry
continue;
}
if (!keyPattern.matcher((String) entryKey).matches()) {
// key does not match the regex, this entry should not be returned.
continue;
}
appendInterestResponseKey(region, regex, entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
}
/**
* Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
*/
private static void handleRegExPR(final PartitionedRegion region, final String regex,
final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
final List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy),
new PartitionedRegion.SetCollector() {
@Override
public void receiveSet(Set theSet) throws IOException {
appendInterestResponseKeys(region, regex, theSet, keyList, servConn);
}
});
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
}
/**
* Process an interest request involving a list of keys
*/
private static void handleListPR(final PartitionedRegion region, final List keyList,
final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy),
new PartitionedRegion.SetCollector() {
@Override
public void receiveSet(Set theSet) throws IOException {
appendInterestResponseKeys(region, keyList, theSet, newKeyList, servConn);
}
});
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn);
}
private static void handleKVList(final LocalRegion region, final List keyList,
boolean serializeValues, final ServerConnection servConn) throws IOException {
if (region instanceof PartitionedRegion) {
handleKVKeysPR((PartitionedRegion) region, keyList, serializeValues, servConn);
return;
}
VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
// Handle list of keys
if (region != null) {
for (Object key : keyList) {
if (region.containsKey(key) || region.containsTombstone(key)) {
VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
Object data = region.get(key, null, true, true, true, id, versionHolder, true);
VersionTag versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
// values.setKeys(null); // Now we need to send keys too.
sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn);
values.clear();
}
}
}
}
// Send the last chunk (the only chunk for individual and list keys)
// always send it back, even if the list is of zero size.
sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
}
private static VersionTagHolder createVersionTagHolder() {
VersionTagHolder versionHolder = new VersionTagHolder();
versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST);
return versionHolder;
}
/**
* Append an interest response
*
* @param region the region (for debugging)
* @param riKey the registerInterest "key" (what the client is interested in)
* @param entryKey key we're responding to
* @param list list to append to
*/
private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey,
List list, ServerConnection servConn) throws IOException {
list.add(entryKey);
if (logger.isDebugEnabled()) {
logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}",
servConn.getName(), entryKey, list.size(), region.getFullPath());
}
if (list.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
sendRegisterInterestResponseChunk(region, riKey, list, false, servConn);
list.clear();
}
}
private static void appendInterestResponseKeys(LocalRegion region, Object riKey,
Collection entryKeys, List collector, ServerConnection servConn) throws IOException {
for (final Object entryKey : entryKeys) {
appendInterestResponseKey(region, riKey, entryKey, collector, servConn);
}
}
}