| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets.command; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.operations.DestroyOperationContext; |
| import org.apache.geode.cache.operations.PutOperationContext; |
| import org.apache.geode.cache.wan.GatewayReceiver; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.internal.DistributionStats; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.EventIDHolder; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.tier.CachedRegionHelper; |
| import org.apache.geode.internal.cache.tier.Command; |
| import org.apache.geode.internal.cache.tier.MessageType; |
| import org.apache.geode.internal.cache.tier.sockets.BaseCommand; |
| import org.apache.geode.internal.cache.tier.sockets.Message; |
| import org.apache.geode.internal.cache.tier.sockets.Part; |
| import org.apache.geode.internal.cache.tier.sockets.ServerConnection; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.cache.wan.BatchException70; |
| import org.apache.geode.internal.cache.wan.GatewayReceiverStats; |
| import org.apache.geode.internal.security.AuthorizeRequest; |
| import org.apache.geode.internal.security.SecurityService; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.pdx.PdxConfigurationException; |
| import org.apache.geode.pdx.PdxRegistryMismatchException; |
| import org.apache.geode.pdx.internal.EnumId; |
| import org.apache.geode.pdx.internal.EnumInfo; |
| import org.apache.geode.pdx.internal.PdxType; |
| import org.apache.geode.pdx.internal.PeerTypeRegistration; |
| |
| public class GatewayReceiverCommand extends BaseCommand { |
| |
| @Immutable |
| private static final GatewayReceiverCommand SINGLETON = new GatewayReceiverCommand(); |
| |
| public static Command getCommand() { |
| return SINGLETON; |
| } |
| |
| private GatewayReceiverCommand() { |
| // nothing |
| } |
| |
| private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) { |
| InternalCache cache = servConn.getCachedRegionHelper().getCacheForGatewayCommand(); |
| if (cache != null && cache.isCacheAtShutdownAll()) { |
| throw cache.getCacheClosedException("Shutdown occurred during message processing"); |
| } |
| String reason = String.format("Region %s was not found during batch create request %s", |
| regionName, batchId); |
| throw new RegionDestroyedException(reason, regionName); |
| } |
| |
| @Override |
| public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, |
| final SecurityService securityService, long start) throws IOException, InterruptedException { |
| CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); |
| GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats(); |
| |
| { |
| long oldStart = start; |
| start = DistributionStats.getStatTime(); |
| stats.incReadProcessBatchRequestTime(start - oldStart); |
| } |
| |
| stats.incBatchSize(clientMessage.getPayloadLength()); |
| |
| // Retrieve the number of events |
| Part numberOfEventsPart = clientMessage.getPart(0); |
| int numberOfEvents = numberOfEventsPart.getInt(); |
| stats.incEventsReceived(numberOfEvents); |
| |
| // Retrieve the batch id |
| Part batchIdPart = clientMessage.getPart(1); |
| int batchId = batchIdPart.getInt(); |
| |
| // If this batch has already been seen, do not reply. |
| // Instead, drop the batch and continue. |
| if (batchId <= serverConnection.getLatestBatchIdReplied()) { |
| if (GatewayReceiver.APPLY_RETRIES) { |
| // Do nothing!!! |
| logger.warn( |
| "Received process batch request {} that has already been or is being processed. gemfire.gateway.ApplyRetries is set, so this batch will be processed anyway.", |
| batchId); |
| } else { |
| logger.warn( |
| "Received process batch request {} that has already been or is being processed. This process batch request is being ignored.", |
| batchId); |
| writeReply(clientMessage, serverConnection, batchId, numberOfEvents); |
| return; |
| } |
| stats.incDuplicateBatchesReceived(); |
| } |
| |
| // Verify the batches arrive in order |
| if (batchId != serverConnection.getLatestBatchIdReplied() + 1) { |
| logger.warn( |
| "Received process batch request {} out of order. The id of the last batch processed was {}. This batch request will be processed, but some messages may have been lost.", |
| batchId, serverConnection.getLatestBatchIdReplied()); |
| stats.incOutoforderBatchesReceived(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received process batch request {} that will be processed.", batchId); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}", |
| serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), |
| "normal", serverConnection.getSocketString()); |
| } |
| |
| // Retrieve the events from the message parts. The '2' below |
| // represents the number of events (part0) and the batchId (part1) |
| int partNumber = 2; |
| int dsid = clientMessage.getPart(partNumber++).getInt(); |
| |
| boolean removeOnException = clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1; |
| |
| // event received in batch also have PDX events at the start of the batch,to |
| // represent correct index on which the exception occurred, number of PDX |
| // events need to be subtracted. |
| int indexWithoutPDXEvent = -1; // |
| Part valuePart = null; |
| Throwable fatalException = null; |
| List<BatchException70> exceptions = new ArrayList<>(); |
| for (int i = 0; i < numberOfEvents; i++) { |
| indexWithoutPDXEvent++; |
| |
| Part actionTypePart = clientMessage.getPart(partNumber); |
| int actionType = actionTypePart.getInt(); |
| |
| boolean callbackArgExists = false; |
| |
| try { |
| boolean isPdxEvent = false; |
| boolean retry = true; |
| do { |
| if (isPdxEvent) { |
| // This is a retried event. Reset the PDX event index. |
| indexWithoutPDXEvent++; |
| } |
| isPdxEvent = false; |
| Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1); |
| byte[] possibleDuplicatePartBytes; |
| try { |
| possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject(); |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| break; |
| } |
| boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01; |
| |
| // Make sure instance variables are null before each iteration |
| String regionName = null; |
| Object key = null; |
| Object callbackArg = null; |
| |
| // Retrieve the region name from the message parts |
| Part regionNamePart = clientMessage.getPart(partNumber + 2); |
| regionName = regionNamePart.getCachedString(); |
| if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) { |
| indexWithoutPDXEvent--; |
| isPdxEvent = true; |
| } |
| |
| // Retrieve the event id from the message parts |
| // This was going to be used to determine possible |
| // duplication of events, but it is unused now. In |
| // fact the event id is overridden by the FROM_GATEWAY |
| // token. |
| Part eventIdPart = clientMessage.getPart(partNumber + 3); |
| eventIdPart.setVersion(serverConnection.getClientVersion()); |
| // String eventId = eventIdPart.getString(); |
| EventID eventId; |
| try { |
| eventId = (EventID) eventIdPart.getObject(); |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| break; |
| } |
| |
| // Retrieve the key from the message parts |
| Part keyPart = clientMessage.getPart(partNumber + 4); |
| try { |
| key = keyPart.getStringOrObject(); |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| break; |
| } |
| int index; |
| Part callbackArgPart; |
| EventIDHolder clientEvent; |
| long versionTimeStamp; |
| Part callbackArgExistsPart; |
| LocalRegion region; |
| switch (actionType) { |
| case 0: // Create |
| try { |
| |
| /* |
| * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key; |
| * System.out.println("Processing new key: " + key); if |
| * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings |
| * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER |
| * )); } |
| */ |
| |
| // Retrieve the value from the message parts (do not deserialize it) |
| valuePart = clientMessage.getPart(partNumber + 5); |
| // try { |
| // logger.warn(getName() + ": Creating key " + key + " value " + |
| // valuePart.getObject()); |
| // } catch (Exception e) {} |
| |
| // Retrieve the callbackArg from the message parts if necessary |
| index = partNumber + 6; |
| callbackArgExistsPart = clientMessage.getPart(index++); |
| { |
| byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); |
| callbackArgExists = partBytes[0] == 0x01; |
| } |
| if (callbackArgExists) { |
| callbackArgPart = clientMessage.getPart(index++); |
| try { |
| callbackArg = callbackArgPart.getObject(); |
| } catch (Exception e) { |
| logger |
| .warn(String.format( |
| "%s: Caught exception processing batch create request %s for %s events", |
| serverConnection.getName(), batchId, numberOfEvents), |
| e); |
| throw e; |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}", |
| serverConnection.getName(), batchId, serverConnection.getSocketString(), |
| regionName, key, valuePart, callbackArg, eventId); |
| } |
| versionTimeStamp = clientMessage.getPart(index++).getLong(); |
| // Process the create request |
| if (key == null || regionName == null) { |
| String message = null; |
| if (key == null) { |
| message = "%s: The input key for the batch create request %s is null"; |
| } |
| if (regionName == null) { |
| message = "%s: The input region name for the batch create request %s is null"; |
| } |
| String s = String.format(message, serverConnection.getName(), batchId); |
| logger.warn(s); |
| throw new Exception(s); |
| } |
| region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName); |
| if (region == null) { |
| handleRegionNull(serverConnection, regionName, batchId); |
| } else { |
| clientEvent = new EventIDHolder(eventId); |
| if (versionTimeStamp > 0) { |
| VersionTag tag = VersionTag.create(region.getVersionMember()); |
| tag.setIsGatewayTag(true); |
| tag.setVersionTimeStamp(versionTimeStamp); |
| tag.setDistributedSystemId(dsid); |
| clientEvent.setVersionTag(tag); |
| } |
| clientEvent.setPossibleDuplicate(possibleDuplicate); |
| handleMessageRetry(region, clientEvent); |
| byte[] value = valuePart.getSerializedForm(); |
| boolean isObject = valuePart.isObject(); |
| // [sumedh] This should be done on client while sending |
| // since that is the WAN gateway |
| AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); |
| if (authzRequest != null) { |
| PutOperationContext putContext = |
| authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg); |
| value = putContext.getSerializedValue(); |
| isObject = putContext.isObject(); |
| } |
| // Attempt to create the entry |
| boolean result = false; |
| if (isPdxEvent) { |
| result = addPdxType(crHelper, key, value); |
| } else { |
| result = region.basicBridgeCreate(key, value, isObject, callbackArg, |
| serverConnection.getProxyID(), false, clientEvent, false); |
| // If the create fails (presumably because it already exists), |
| // attempt to update the entry |
| if (!result) { |
| result = region.basicBridgePut(key, value, null, isObject, callbackArg, |
| serverConnection.getProxyID(), false, clientEvent); |
| } |
| } |
| |
| if (result || clientEvent.isConcurrencyConflict()) { |
| serverConnection.setModificationInfo(true, regionName, key); |
| stats.incCreateRequest(); |
| retry = false; |
| } else { |
| // This exception will be logged in the catch block below |
| throw new Exception( |
| String.format( |
| "%s: Failed to create or update entry for region %s key %s value %s callbackArg %s", |
| serverConnection.getName(), regionName, key, valuePart, callbackArg)); |
| } |
| } |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch create request %s for %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| } |
| break; |
| |
| case 1: // Update |
| try { |
| // Retrieve the value from the message parts (do not deserialize it) |
| valuePart = clientMessage.getPart(partNumber + 5); |
| // try { |
| // logger.warn(getName() + ": Updating key " + key + " value " + |
| // valuePart.getObject()); |
| // } catch (Exception e) {} |
| |
| // Retrieve the callbackArg from the message parts if necessary |
| index = partNumber + 6; |
| callbackArgExistsPart = clientMessage.getPart(index++); |
| { |
| byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); |
| callbackArgExists = partBytes[0] == 0x01; |
| } |
| if (callbackArgExists) { |
| callbackArgPart = clientMessage.getPart(index++); |
| try { |
| callbackArg = callbackArgPart.getObject(); |
| } catch (Exception e) { |
| logger |
| .warn( |
| String.format( |
| "%s: Caught exception processing batch update request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), |
| e); |
| throw e; |
| } |
| } |
| versionTimeStamp = clientMessage.getPart(index++).getLong(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}", |
| serverConnection.getName(), batchId, serverConnection.getSocketString(), |
| regionName, key, valuePart, callbackArg); |
| } |
| // Process the update request |
| if (key == null || regionName == null) { |
| String message = null; |
| if (key == null) { |
| message = "%s: The input key for the batch update request %s is null"; |
| } |
| if (regionName == null) { |
| message = "%s: The input region name for the batch update request %s is null"; |
| } |
| String s = String.format(message, serverConnection.getName(), batchId); |
| logger.warn(s); |
| throw new Exception(s); |
| } |
| region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName); |
| if (region == null) { |
| handleRegionNull(serverConnection, regionName, batchId); |
| } else { |
| clientEvent = new EventIDHolder(eventId); |
| if (versionTimeStamp > 0) { |
| VersionTag tag = VersionTag.create(region.getVersionMember()); |
| tag.setIsGatewayTag(true); |
| tag.setVersionTimeStamp(versionTimeStamp); |
| tag.setDistributedSystemId(dsid); |
| clientEvent.setVersionTag(tag); |
| } |
| clientEvent.setPossibleDuplicate(possibleDuplicate); |
| handleMessageRetry(region, clientEvent); |
| byte[] value = valuePart.getSerializedForm(); |
| boolean isObject = valuePart.isObject(); |
| AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); |
| if (authzRequest != null) { |
| PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, |
| value, isObject, callbackArg, PutOperationContext.UPDATE); |
| value = putContext.getSerializedValue(); |
| isObject = putContext.isObject(); |
| } |
| boolean result = false; |
| if (isPdxEvent) { |
| result = addPdxType(crHelper, key, value); |
| } else { |
| result = region.basicBridgePut(key, value, null, isObject, callbackArg, |
| serverConnection.getProxyID(), false, clientEvent); |
| } |
| if (result || clientEvent.isConcurrencyConflict()) { |
| serverConnection.setModificationInfo(true, regionName, key); |
| stats.incUpdateRequest(); |
| retry = false; |
| } else { |
| final String message = |
| "%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s"; |
| String s = String.format(message, serverConnection.getName(), regionName, |
| key, valuePart, callbackArg); |
| logger.info(s); |
| throw new Exception(s); |
| } |
| } |
| } catch (Exception e) { |
| // Preserve the connection under all circumstances |
| logger.warn(String.format( |
| "%s: Caught exception processing batch update request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| } |
| break; |
| |
| case 2: // Destroy |
| try { |
| // Retrieve the callbackArg from the message parts if necessary |
| index = partNumber + 5; |
| callbackArgExistsPart = clientMessage.getPart(index++); |
| { |
| byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); |
| callbackArgExists = partBytes[0] == 0x01; |
| } |
| if (callbackArgExists) { |
| callbackArgPart = clientMessage.getPart(index++); |
| try { |
| callbackArg = callbackArgPart.getObject(); |
| } catch (Exception e) { |
| logger |
| .warn( |
| String.format( |
| "%s: Caught exception processing batch destroy request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), |
| e); |
| throw e; |
| } |
| } |
| |
| versionTimeStamp = clientMessage.getPart(index++).getLong(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}", |
| serverConnection.getName(), batchId, serverConnection.getSocketString(), |
| regionName, key); |
| } |
| |
| // Process the destroy request |
| if (key == null || regionName == null) { |
| String message = null; |
| if (key == null) { |
| message = |
| "%s: The input key for the batch destroy request %s is null"; |
| } |
| if (regionName == null) { |
| message = |
| "%s: The input region name for the batch destroy request %s is null"; |
| } |
| String s = String.format(message, serverConnection.getName(), batchId); |
| logger.warn(s); |
| throw new Exception(s); |
| } |
| region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName); |
| if (region == null) { |
| handleRegionNull(serverConnection, regionName, batchId); |
| } else { |
| clientEvent = new EventIDHolder(eventId); |
| if (versionTimeStamp > 0) { |
| VersionTag tag = VersionTag.create(region.getVersionMember()); |
| tag.setIsGatewayTag(true); |
| tag.setVersionTimeStamp(versionTimeStamp); |
| tag.setDistributedSystemId(dsid); |
| clientEvent.setVersionTag(tag); |
| } |
| handleMessageRetry(region, clientEvent); |
| // Destroy the entry |
| AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); |
| if (authzRequest != null) { |
| DestroyOperationContext destroyContext = |
| authzRequest.destroyAuthorize(regionName, key, callbackArg); |
| callbackArg = destroyContext.getCallbackArg(); |
| } |
| try { |
| region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), |
| false, clientEvent); |
| serverConnection.setModificationInfo(true, regionName, key); |
| } catch (EntryNotFoundException e) { |
| logger.info("{}: during batch destroy no entry was found for key {}", |
| serverConnection.getName(), key); |
| } |
| stats.incDestroyRequest(); |
| retry = false; |
| } |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch destroy request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), |
| e); |
| handleException(removeOnException, stats, e); |
| } |
| break; |
| |
| case 3: // Update Time-stamp for a RegionEntry |
| try { |
| // Region name |
| regionNamePart = clientMessage.getPart(partNumber + 2); |
| regionName = regionNamePart.getCachedString(); |
| |
| // Retrieve the event id from the message parts |
| eventIdPart = clientMessage.getPart(partNumber + 3); |
| eventId = (EventID) eventIdPart.getObject(); |
| |
| // Retrieve the key from the message parts |
| keyPart = clientMessage.getPart(partNumber + 4); |
| key = keyPart.getStringOrObject(); |
| |
| // Retrieve the callbackArg from the message parts if necessary |
| index = partNumber + 5; |
| callbackArgExistsPart = clientMessage.getPart(index++); |
| |
| byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); |
| callbackArgExists = partBytes[0] == 0x01; |
| |
| if (callbackArgExists) { |
| callbackArgPart = clientMessage.getPart(index++); |
| callbackArg = callbackArgPart.getObject(); |
| } |
| |
| versionTimeStamp = clientMessage.getPart(index++).getLong(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}", |
| serverConnection.getName(), batchId, serverConnection.getSocketString(), |
| regionName, key, valuePart, callbackArg); |
| } |
| // Process the update time-stamp request |
| if (key == null || regionName == null) { |
| String message = |
| "%s: Caught exception processing batch update version request request %s containing %s events"; |
| |
| String s = String.format(message, serverConnection.getName(), |
| batchId, numberOfEvents); |
| logger.warn(s); |
| throw new Exception(s); |
| |
| } else { |
| region = (LocalRegion) crHelper.getCacheForGatewayCommand().getRegion(regionName); |
| |
| if (region == null) { |
| handleRegionNull(serverConnection, regionName, batchId); |
| } else { |
| |
| clientEvent = new EventIDHolder(eventId); |
| |
| if (versionTimeStamp > 0) { |
| VersionTag tag = VersionTag.create(region.getVersionMember()); |
| tag.setIsGatewayTag(true); |
| tag.setVersionTimeStamp(versionTimeStamp); |
| tag.setDistributedSystemId(dsid); |
| clientEvent.setVersionTag(tag); |
| } |
| |
| // Update the version tag |
| try { |
| region.basicBridgeUpdateVersionStamp(key, callbackArg, |
| serverConnection.getProxyID(), false, clientEvent); |
| } catch (EntryNotFoundException e) { |
| logger.info( |
| "Entry for key {} was not found in Region {} during ProcessBatch for Update Entry Version", |
| serverConnection.getName(), key); |
| } |
| retry = false; |
| } |
| } |
| } catch (Exception e) { |
| logger.warn(String.format( |
| "%s: Caught exception processing batch update version request request %s containing %s events", |
| serverConnection.getName(), batchId, numberOfEvents), e); |
| handleException(removeOnException, stats, e); |
| } |
| |
| break; |
| default: |
| logger.fatal("{}: Unknown action type ({}) for batch from {}", |
| serverConnection.getName(), actionType, serverConnection.getSocketString()); |
| stats.incUnknowsOperationsReceived(); |
| } |
| } while (retry); |
| } catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", |
| serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()), |
| serverConnection.getProxyID()); |
| } |
| serverConnection.setFlagProcessMessagesAsFalse(); |
| serverConnection.setClientDisconnectedException(e); |
| return; |
| } catch (Exception e) { |
| // If an interrupted exception is thrown , rethrow it |
| checkForInterrupt(serverConnection, e); |
| |
| // If we have an issue with the PDX registry, stop processing more data |
| if (e.getCause() instanceof PdxRegistryMismatchException) { |
| fatalException = e.getCause(); |
| logger.fatal(String.format( |
| "This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.", |
| serverConnection.getMembershipID()), e.getCause()); |
| break; |
| } |
| |
| // Increment the batch id unless the received batch id is -1 (a |
| // failover batch) |
| DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem(); |
| String exceptionMessage = String.format( |
| "Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s", |
| ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(), |
| ds.getDistributedMember()); |
| BatchException70 be = |
| new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId); |
| exceptions.add(be); |
| } finally { |
| // Increment the partNumber |
| if (actionType == 0 /* create */ || actionType == 1 /* update */) { |
| if (callbackArgExists) { |
| partNumber += 9; |
| } else { |
| partNumber += 8; |
| } |
| } else if (actionType == 2 /* destroy */) { |
| if (callbackArgExists) { |
| partNumber += 8; |
| } else { |
| partNumber += 7; |
| } |
| } else if (actionType == 3 /* update-version */) { |
| if (callbackArgExists) { |
| partNumber += 8; |
| } else { |
| partNumber += 7; |
| } |
| } |
| } |
| } |
| |
| { |
| long oldStart = start; |
| start = DistributionStats.getStatTime(); |
| stats.incProcessBatchTime(start - oldStart); |
| } |
| if (fatalException != null) { |
| serverConnection.incrementLatestBatchIdReplied(batchId); |
| writeFatalException(clientMessage, fatalException, serverConnection, batchId); |
| serverConnection.setAsTrue(RESPONDED); |
| } else if (!exceptions.isEmpty()) { |
| serverConnection.incrementLatestBatchIdReplied(batchId); |
| writeBatchException(clientMessage, exceptions, serverConnection, batchId); |
| serverConnection.setAsTrue(RESPONDED); |
| } else { |
| // Increment the batch id unless the received batch id is -1 (a failover |
| // batch) |
| serverConnection.incrementLatestBatchIdReplied(batchId); |
| |
| writeReply(clientMessage, serverConnection, batchId, numberOfEvents); |
| serverConnection.setAsTrue(RESPONDED); |
| stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime() - start); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", |
| serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), |
| "normal", serverConnection.getSocketString()); |
| } |
| } |
| } |
| |
| private boolean addPdxType(CachedRegionHelper crHelper, Object key, Object value) |
| throws Exception { |
| if (key instanceof EnumId) { |
| EnumId enumId = (EnumId) key; |
| value = BlobHelper.deserializeBlob((byte[]) value); |
| crHelper.getCacheForGatewayCommand().getPdxRegistry().addRemoteEnum(enumId.intValue(), |
| (EnumInfo) value); |
| } else { |
| value = BlobHelper.deserializeBlob((byte[]) value); |
| crHelper.getCacheForGatewayCommand().getPdxRegistry().addRemoteType((int) key, |
| (PdxType) value); |
| } |
| return true; |
| } |
| |
| private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e) |
| throws Exception { |
| if (shouldThrowException(removeOnException, e)) { |
| throw e; |
| } else { |
| stats.incEventsRetried(); |
| Thread.sleep(500); |
| } |
| } |
| |
| private boolean shouldThrowException(boolean removeOnException, Exception e) { |
| // Split out in case specific exceptions would short-circuit retry logic. |
| // Currently it just considers the boolean. |
| return removeOnException; |
| } |
| |
| private void handleMessageRetry(LocalRegion region, EntryEventImpl clientEvent) { |
| if (clientEvent.isPossibleDuplicate()) { |
| if (region.getAttributes().getConcurrencyChecksEnabled()) { |
| // recover the version tag from other servers |
| clientEvent.setRegion(region); |
| if (!recoverVersionTagForRetriedOperation(clientEvent)) { |
| // no-one has seen this event |
| clientEvent.setPossibleDuplicate(false); |
| } |
| } |
| } |
| } |
| |
| private void writeReply(Message msg, ServerConnection servConn, int batchId, int numberOfEvents) |
| throws IOException { |
| Message replyMsg = servConn.getResponseMessage(); |
| replyMsg.setMessageType(MessageType.REPLY); |
| replyMsg.setTransactionId(msg.getTransactionId()); |
| replyMsg.setNumberOfParts(2); |
| replyMsg.addIntPart(batchId); |
| replyMsg.addIntPart(numberOfEvents); |
| replyMsg.setTransactionId(msg.getTransactionId()); |
| replyMsg.send(servConn); |
| servConn.setAsTrue(Command.RESPONDED); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: rpl tx: {} batchId {} numberOfEvents: {}", servConn.getName(), |
| msg.getTransactionId(), batchId, numberOfEvents); |
| } |
| } |
| |
| private static void writeBatchException(Message origMsg, List<BatchException70> exceptions, |
| ServerConnection servConn, int batchId) throws IOException { |
| Message errorMsg = servConn.getErrorResponseMessage(); |
| errorMsg.setMessageType(MessageType.EXCEPTION); |
| errorMsg.setNumberOfParts(2); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| errorMsg.addObjPart(exceptions); |
| // errorMsg.addStringPart(be.toString()); |
| errorMsg.send(servConn); |
| for (Exception e : exceptions) { |
| ((GatewayReceiverStats) servConn.getCacheServerStats()).incExceptionsOccurred(); |
| } |
| for (Exception be : exceptions) { |
| if (logger.isWarnEnabled()) { |
| logger.warn(servConn.getName() + ": Wrote batch exception: ", |
| be); |
| } |
| } |
| } |
| |
| private static void writeFatalException(Message origMsg, Throwable exception, |
| ServerConnection servConn, int batchId) throws IOException { |
| Message errorMsg = servConn.getErrorResponseMessage(); |
| errorMsg.setMessageType(MessageType.EXCEPTION); |
| errorMsg.setNumberOfParts(2); |
| errorMsg.setTransactionId(origMsg.getTransactionId()); |
| |
| // For older gateway senders, we need to send back an exception |
| // they can deserialize. |
| if ((servConn.getClientVersion() == null |
| || servConn.getClientVersion().compareTo(Version.GFE_80) < 0) |
| && exception instanceof PdxRegistryMismatchException) { |
| PdxConfigurationException newException = |
| new PdxConfigurationException(exception.getMessage()); |
| newException.setStackTrace(exception.getStackTrace()); |
| exception = newException; |
| } |
| errorMsg.addObjPart(exception); |
| // errorMsg.addStringPart(be.toString()); |
| errorMsg.send(servConn); |
| logger.warn(servConn.getName() + ": Wrote batch exception: ", |
| exception); |
| } |
| } |