blob: 0497c2c71fa005425d55d7c151a12b20b32fe082 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.geode.CancelException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.operations.DestroyOperationContext;
import org.apache.geode.cache.operations.PutOperationContext;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.EventIDHolder;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.pdx.PdxConfigurationException;
import org.apache.geode.pdx.PdxRegistryMismatchException;
import org.apache.geode.pdx.internal.EnumId;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
public class GatewayReceiverCommand extends BaseCommand {
@Immutable
private static final GatewayReceiverCommand SINGLETON = new GatewayReceiverCommand();
public static Command getCommand() {
return SINGLETON;
}
private GatewayReceiverCommand() {
// nothing
}
private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) {
InternalCache cache = servConn.getCachedRegionHelper().getCacheForGatewayCommand();
if (cache != null && cache.isCacheAtShutdownAll()) {
throw cache.getCacheClosedException("Shutdown occurred during message processing");
}
String reason = String.format("Region %s was not found during batch create request %s",
regionName, batchId);
throw new RegionDestroyedException(reason, regionName);
}
@Override
public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
final SecurityService securityService, long start) throws IOException, InterruptedException {
CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats();
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadProcessBatchRequestTime(start - oldStart);
}
stats.incBatchSize(clientMessage.getPayloadLength());
// Retrieve the number of events
Part numberOfEventsPart = clientMessage.getPart(0);
int numberOfEvents = numberOfEventsPart.getInt();
stats.incEventsReceived(numberOfEvents);
// Retrieve the batch id
Part batchIdPart = clientMessage.getPart(1);
int batchId = batchIdPart.getInt();
// If this batch has already been seen, do not reply.
// Instead, drop the batch and continue.
if (batchId <= serverConnection.getLatestBatchIdReplied()) {
if (GatewayReceiver.APPLY_RETRIES) {
// Do nothing!!!
logger.warn(
"Received process batch request {} that has already been or is being processed. gemfire.gateway.ApplyRetries is set, so this batch will be processed anyway.",
batchId);
} else {
logger.warn(
"Received process batch request {} that has already been or is being processed. This process batch request is being ignored.",
batchId);
writeReply(clientMessage, serverConnection, batchId, numberOfEvents);
return;
}
stats.incDuplicateBatchesReceived();
}
// Verify the batches arrive in order
if (batchId != serverConnection.getLatestBatchIdReplied() + 1) {
logger.warn(
"Received process batch request {} out of order. The id of the last batch processed was {}. This batch request will be processed, but some messages may have been lost.",
batchId, serverConnection.getLatestBatchIdReplied());
stats.incOutoforderBatchesReceived();
}
if (logger.isDebugEnabled()) {
logger.debug("Received process batch request {} that will be processed.", batchId);
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}",
serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
"normal", serverConnection.getSocketString());
}
// Retrieve the events from the message parts. The '2' below
// represents the number of events (part0) and the batchId (part1)
int partNumber = 2;
int dsid = clientMessage.getPart(partNumber++).getInt();
boolean removeOnException = clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1;
// event received in batch also have PDX events at the start of the batch,to
// represent correct index on which the exception occurred, number of PDX
// events need to be subtracted.
int indexWithoutPDXEvent = -1; //
Part valuePart = null;
Throwable fatalException = null;
List<BatchException70> exceptions = new ArrayList<>();
for (int i = 0; i < numberOfEvents; i++) {
indexWithoutPDXEvent++;
Part actionTypePart = clientMessage.getPart(partNumber);
int actionType = actionTypePart.getInt();
boolean callbackArgExists = false;
try {
boolean isPdxEvent = false;
boolean retry = true;
do {
if (isPdxEvent) {
// This is a retried event. Reset the PDX event index.
indexWithoutPDXEvent++;
}
isPdxEvent = false;
Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
byte[] possibleDuplicatePartBytes;
try {
possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
break;
}
boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
// Make sure instance variables are null before each iteration
String regionName = null;
Object key = null;
Object callbackArg = null;
// Retrieve the region name from the message parts
Part regionNamePart = clientMessage.getPart(partNumber + 2);
regionName = regionNamePart.getCachedString();
if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
indexWithoutPDXEvent--;
isPdxEvent = true;
}
// Retrieve the event id from the message parts
// This was going to be used to determine possible
// duplication of events, but it is unused now. In
// fact the event id is overridden by the FROM_GATEWAY
// token.
Part eventIdPart = clientMessage.getPart(partNumber + 3);
eventIdPart.setVersion(serverConnection.getClientVersion());
// String eventId = eventIdPart.getString();
EventID eventId;
try {
eventId = (EventID) eventIdPart.getObject();
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
break;
}
// Retrieve the key from the message parts
Part keyPart = clientMessage.getPart(partNumber + 4);
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
break;
}
int index;
Part callbackArgPart;
EventIDHolder clientEvent;
long versionTimeStamp;
Part callbackArgExistsPart;
LocalRegion region;
switch (actionType) {
case 0: // Create
try {
/*
* CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
* System.out.println("Processing new key: " + key); if
* (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
* .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
* )); }
*/
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
// try {
// logger.warn(getName() + ": Creating key " + key + " value " +
// valuePart.getObject());
// } catch (Exception e) {}
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 6;
callbackArgExistsPart = clientMessage.getPart(index++);
{
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger
.warn(String.format(
"%s: Caught exception processing batch create request %s for %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
throw e;
}
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
serverConnection.getName(), batchId, serverConnection.getSocketString(),
regionName, key, valuePart, callbackArg, eventId);
}
versionTimeStamp = clientMessage.getPart(index++).getLong();
// Process the create request
if (key == null || regionName == null) {
String message = null;
if (key == null) {
message = "%s: The input key for the batch create request %s is null";
}
if (regionName == null) {
message = "%s: The input region name for the batch create request %s is null";
}
String s = String.format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
tag.setVersionTimeStamp(versionTimeStamp);
tag.setDistributedSystemId(dsid);
clientEvent.setVersionTag(tag);
}
clientEvent.setPossibleDuplicate(possibleDuplicate);
handleMessageRetry(region, clientEvent);
byte[] value = valuePart.getSerializedForm();
boolean isObject = valuePart.isObject();
// [sumedh] This should be done on client while sending
// since that is the WAN gateway
AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
PutOperationContext putContext =
authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg);
value = putContext.getSerializedValue();
isObject = putContext.isObject();
}
// Attempt to create the entry
boolean result = false;
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
result = region.basicBridgeCreate(key, value, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent, false);
// If the create fails (presumably because it already exists),
// attempt to update the entry
if (!result) {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
}
}
if (result || clientEvent.isConcurrencyConflict()) {
serverConnection.setModificationInfo(true, regionName, key);
stats.incCreateRequest();
retry = false;
} else {
// This exception will be logged in the catch block below
throw new Exception(
String.format(
"%s: Failed to create or update entry for region %s key %s value %s callbackArg %s",
serverConnection.getName(), regionName, key, valuePart, callbackArg));
}
}
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch create request %s for %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
}
break;
case 1: // Update
try {
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
// try {
// logger.warn(getName() + ": Updating key " + key + " value " +
// valuePart.getObject());
// } catch (Exception e) {}
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 6;
callbackArgExistsPart = clientMessage.getPart(index++);
{
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger
.warn(
String.format(
"%s: Caught exception processing batch update request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
throw e;
}
}
versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
serverConnection.getName(), batchId, serverConnection.getSocketString(),
regionName, key, valuePart, callbackArg);
}
// Process the update request
if (key == null || regionName == null) {
String message = null;
if (key == null) {
message = "%s: The input key for the batch update request %s is null";
}
if (regionName == null) {
message = "%s: The input region name for the batch update request %s is null";
}
String s = String.format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
tag.setVersionTimeStamp(versionTimeStamp);
tag.setDistributedSystemId(dsid);
clientEvent.setVersionTag(tag);
}
clientEvent.setPossibleDuplicate(possibleDuplicate);
handleMessageRetry(region, clientEvent);
byte[] value = valuePart.getSerializedForm();
boolean isObject = valuePart.isObject();
AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
PutOperationContext putContext = authzRequest.putAuthorize(regionName, key,
value, isObject, callbackArg, PutOperationContext.UPDATE);
value = putContext.getSerializedValue();
isObject = putContext.isObject();
}
boolean result = false;
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
}
if (result || clientEvent.isConcurrencyConflict()) {
serverConnection.setModificationInfo(true, regionName, key);
stats.incUpdateRequest();
retry = false;
} else {
final String message =
"%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s";
String s = String.format(message, serverConnection.getName(), regionName,
key, valuePart, callbackArg);
logger.info(s);
throw new Exception(s);
}
}
} catch (Exception e) {
// Preserve the connection under all circumstances
logger.warn(String.format(
"%s: Caught exception processing batch update request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
}
break;
case 2: // Destroy
try {
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 5;
callbackArgExistsPart = clientMessage.getPart(index++);
{
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger
.warn(
String.format(
"%s: Caught exception processing batch destroy request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
throw e;
}
}
versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
serverConnection.getName(), batchId, serverConnection.getSocketString(),
regionName, key);
}
// Process the destroy request
if (key == null || regionName == null) {
String message = null;
if (key == null) {
message =
"%s: The input key for the batch destroy request %s is null";
}
if (regionName == null) {
message =
"%s: The input region name for the batch destroy request %s is null";
}
String s = String.format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
tag.setVersionTimeStamp(versionTimeStamp);
tag.setDistributedSystemId(dsid);
clientEvent.setVersionTag(tag);
}
handleMessageRetry(region, clientEvent);
// Destroy the entry
AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
DestroyOperationContext destroyContext =
authzRequest.destroyAuthorize(regionName, key, callbackArg);
callbackArg = destroyContext.getCallbackArg();
}
try {
region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(),
false, clientEvent);
serverConnection.setModificationInfo(true, regionName, key);
} catch (EntryNotFoundException e) {
logger.info("{}: during batch destroy no entry was found for key {}",
serverConnection.getName(), key);
}
stats.incDestroyRequest();
retry = false;
}
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch destroy request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
handleException(removeOnException, stats, e);
}
break;
case 3: // Update Time-stamp for a RegionEntry
try {
// Region name
regionNamePart = clientMessage.getPart(partNumber + 2);
regionName = regionNamePart.getCachedString();
// Retrieve the event id from the message parts
eventIdPart = clientMessage.getPart(partNumber + 3);
eventId = (EventID) eventIdPart.getObject();
// Retrieve the key from the message parts
keyPart = clientMessage.getPart(partNumber + 4);
key = keyPart.getStringOrObject();
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 5;
callbackArgExistsPart = clientMessage.getPart(index++);
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
if (callbackArgExists) {
callbackArgPart = clientMessage.getPart(index++);
callbackArg = callbackArgPart.getObject();
}
versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
serverConnection.getName(), batchId, serverConnection.getSocketString(),
regionName, key, valuePart, callbackArg);
}
// Process the update time-stamp request
if (key == null || regionName == null) {
String message =
"%s: Caught exception processing batch update version request request %s containing %s events";
String s = String.format(message, serverConnection.getName(),
batchId, numberOfEvents);
logger.warn(s);
throw new Exception(s);
} else {
region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
tag.setVersionTimeStamp(versionTimeStamp);
tag.setDistributedSystemId(dsid);
clientEvent.setVersionTag(tag);
}
// Update the version tag
try {
region.basicBridgeUpdateVersionStamp(key, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
} catch (EntryNotFoundException e) {
logger.info(
"Entry for key {} was not found in Region {} during ProcessBatch for Update Entry Version",
serverConnection.getName(), key);
}
retry = false;
}
}
} catch (Exception e) {
logger.warn(String.format(
"%s: Caught exception processing batch update version request request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
}
break;
default:
logger.fatal("{}: Unknown action type ({}) for batch from {}",
serverConnection.getName(), actionType, serverConnection.getSocketString());
stats.incUnknowsOperationsReceived();
}
} while (retry);
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"{} ignoring message of type {} from client {} because shutdown occurred during message processing.",
serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()),
serverConnection.getProxyID());
}
serverConnection.setFlagProcessMessagesAsFalse();
serverConnection.setClientDisconnectedException(e);
return;
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
checkForInterrupt(serverConnection, e);
// If we have an issue with the PDX registry, stop processing more data
if (e.getCause() instanceof PdxRegistryMismatchException) {
fatalException = e.getCause();
logger.fatal(String.format(
"This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.",
serverConnection.getMembershipID()), e.getCause());
break;
}
// Increment the batch id unless the received batch id is -1 (a
// failover batch)
DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem();
String exceptionMessage = String.format(
"Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s",
((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
ds.getDistributedMember());
BatchException70 be =
new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId);
exceptions.add(be);
} finally {
// Increment the partNumber
if (actionType == 0 /* create */ || actionType == 1 /* update */) {
if (callbackArgExists) {
partNumber += 9;
} else {
partNumber += 8;
}
} else if (actionType == 2 /* destroy */) {
if (callbackArgExists) {
partNumber += 8;
} else {
partNumber += 7;
}
} else if (actionType == 3 /* update-version */) {
if (callbackArgExists) {
partNumber += 8;
} else {
partNumber += 7;
}
}
}
}
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incProcessBatchTime(start - oldStart);
}
if (fatalException != null) {
serverConnection.incrementLatestBatchIdReplied(batchId);
writeFatalException(clientMessage, fatalException, serverConnection, batchId);
serverConnection.setAsTrue(RESPONDED);
} else if (!exceptions.isEmpty()) {
serverConnection.incrementLatestBatchIdReplied(batchId);
writeBatchException(clientMessage, exceptions, serverConnection, batchId);
serverConnection.setAsTrue(RESPONDED);
} else {
// Increment the batch id unless the received batch id is -1 (a failover
// batch)
serverConnection.incrementLatestBatchIdReplied(batchId);
writeReply(clientMessage, serverConnection, batchId, numberOfEvents);
serverConnection.setAsTrue(RESPONDED);
stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime() - start);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}",
serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
"normal", serverConnection.getSocketString());
}
}
}
private boolean addPdxType(CachedRegionHelper crHelper, Object key, Object value)
throws Exception {
if (key instanceof EnumId) {
EnumId enumId = (EnumId) key;
value = BlobHelper.deserializeBlob((byte[]) value);
crHelper.getCacheForGatewayCommand().getPdxRegistry().addRemoteEnum(enumId.intValue(),
(EnumInfo) value);
} else {
value = BlobHelper.deserializeBlob((byte[]) value);
crHelper.getCacheForGatewayCommand().getPdxRegistry().addRemoteType((int) key,
(PdxType) value);
}
return true;
}
private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e)
throws Exception {
if (shouldThrowException(removeOnException, e)) {
throw e;
} else {
stats.incEventsRetried();
Thread.sleep(500);
}
}
private boolean shouldThrowException(boolean removeOnException, Exception e) {
// Split out in case specific exceptions would short-circuit retry logic.
// Currently it just considers the boolean.
return removeOnException;
}
private void handleMessageRetry(LocalRegion region, EntryEventImpl clientEvent) {
if (clientEvent.isPossibleDuplicate()) {
if (region.getAttributes().getConcurrencyChecksEnabled()) {
// recover the version tag from other servers
clientEvent.setRegion(region);
if (!recoverVersionTagForRetriedOperation(clientEvent)) {
// no-one has seen this event
clientEvent.setPossibleDuplicate(false);
}
}
}
}
private void writeReply(Message msg, ServerConnection servConn, int batchId, int numberOfEvents)
throws IOException {
Message replyMsg = servConn.getResponseMessage();
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setTransactionId(msg.getTransactionId());
replyMsg.setNumberOfParts(2);
replyMsg.addIntPart(batchId);
replyMsg.addIntPart(numberOfEvents);
replyMsg.setTransactionId(msg.getTransactionId());
replyMsg.send(servConn);
servConn.setAsTrue(Command.RESPONDED);
if (logger.isDebugEnabled()) {
logger.debug("{}: rpl tx: {} batchId {} numberOfEvents: {}", servConn.getName(),
msg.getTransactionId(), batchId, numberOfEvents);
}
}
private static void writeBatchException(Message origMsg, List<BatchException70> exceptions,
ServerConnection servConn, int batchId) throws IOException {
Message errorMsg = servConn.getErrorResponseMessage();
errorMsg.setMessageType(MessageType.EXCEPTION);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addObjPart(exceptions);
// errorMsg.addStringPart(be.toString());
errorMsg.send(servConn);
for (Exception e : exceptions) {
((GatewayReceiverStats) servConn.getCacheServerStats()).incExceptionsOccurred();
}
for (Exception be : exceptions) {
if (logger.isWarnEnabled()) {
logger.warn(servConn.getName() + ": Wrote batch exception: ",
be);
}
}
}
private static void writeFatalException(Message origMsg, Throwable exception,
ServerConnection servConn, int batchId) throws IOException {
Message errorMsg = servConn.getErrorResponseMessage();
errorMsg.setMessageType(MessageType.EXCEPTION);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
// For older gateway senders, we need to send back an exception
// they can deserialize.
if ((servConn.getClientVersion() == null
|| servConn.getClientVersion().compareTo(Version.GFE_80) < 0)
&& exception instanceof PdxRegistryMismatchException) {
PdxConfigurationException newException =
new PdxConfigurationException(exception.getMessage());
newException.setStackTrace(exception.getStackTrace());
exception = newException;
}
errorMsg.addObjPart(exception);
// errorMsg.addStringPart(be.toString());
errorMsg.send(servConn);
logger.warn(servConn.getName() + ": Wrote batch exception: ",
exception);
}
}