| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tx; |
| |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.EntryExistsException; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.TransactionDataNotColocatedException; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DirectReplyProcessor; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.ReplySender; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.NanoTimer; |
| import org.apache.geode.internal.cache.AbstractRegion; |
| import org.apache.geode.internal.cache.CachedDeserializable; |
| import org.apache.geode.internal.cache.DataLocationException; |
| import org.apache.geode.internal.cache.DistributedCacheOperation; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.InternalRegion; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.RemoteOperationException; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.versions.DiskVersionTag; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * A class that specifies a destroy operation. Used by ReplicateRegions. Note: The reason for |
| * different classes for Destroy and Invalidate is to prevent sending an extra bit for every |
| * RemoteDestroyMessage to differentiate an invalidate versus a destroy. The assumption is that |
| * these operations are used frequently, if they are not then it makes sense to fold the destroy and |
| * the invalidate into the same message and use an extra bit to differentiate. |
| * |
| * This message is used by transactions to destroy an entry on a transaction hosted on a remote |
| * member. It is also used by non-transactional region destroys that need to generate a VersionTag |
| * on a remote member. |
| * |
| * @since GemFire 6.5 |
| * |
| */ |
| public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply |
| implements OldValueImporter { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** The key associated with the value that must be sent */ |
| private Object key; |
| |
| /** The callback arg */ |
| private Object cbArg; |
| |
| /** The operation performed on the sender */ |
| private Operation op; |
| |
| /** |
| * An additional object providing context for the operation, e.g., for BridgeServer notification |
| */ |
| ClientProxyMembershipID bridgeContext; |
| |
| /** event identifier */ |
| EventID eventId; |
| |
| /** for relayed messages, this is the original sender of the message */ |
| InternalDistributedMember originalSender; |
| |
| /** whether the message has old value */ |
| private boolean hasOldValue = false; |
| |
| /** whether old value is serialized */ |
| private boolean oldValueIsSerialized = false; |
| |
| private Object expectedOldValue; |
| |
| private byte[] oldValBytes; |
| |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| private transient Object oldValObj; |
| |
| boolean useOriginRemote; |
| |
| protected boolean possibleDuplicate; |
| |
| VersionTag<?> versionTag; |
| |
| // additional bitmask flags used for serialization/deserialization |
| |
| protected static final short USE_ORIGIN_REMOTE = UNRESERVED_FLAGS_START; |
| protected static final short HAS_OLD_VALUE = (USE_ORIGIN_REMOTE << 1); |
| protected static final short CACHE_WRITE = (HAS_OLD_VALUE << 1); |
| protected static final short HAS_BRIDGE_CONTEXT = (CACHE_WRITE << 1); |
| protected static final short HAS_ORIGINAL_SENDER = (HAS_BRIDGE_CONTEXT << 1); |
| protected static final int HAS_VERSION_TAG = (HAS_ORIGINAL_SENDER << 1); |
| |
| /** |
| * Empty constructor to satisfy {@link DataSerializer} requirements |
| */ |
| public RemoteDestroyMessage() {} |
| |
| protected RemoteDestroyMessage(DistributedMember recipient, String regionPath, |
| DirectReplyProcessor processor, EntryEventImpl event, Object expectedOldValue, |
| boolean useOriginRemote, boolean possibleDuplicate) { |
| super((InternalDistributedMember) recipient, regionPath, processor); |
| this.expectedOldValue = expectedOldValue; |
| this.key = event.getKey(); |
| this.cbArg = event.getRawCallbackArgument(); |
| this.op = event.getOperation(); |
| this.bridgeContext = event.getContext(); |
| this.eventId = event.getEventId(); |
| this.useOriginRemote = useOriginRemote; |
| this.possibleDuplicate = possibleDuplicate; |
| this.versionTag = event.getVersionTag(); |
| Assert.assertTrue(this.eventId != null); |
| |
| // added for old value if available sent over the wire for cache servers. |
| if (event.hasOldValue()) { |
| this.hasOldValue = true; |
| event.exportOldValue(this); |
| } |
| } |
| |
| private void setOldValBytes(byte[] valBytes) { |
| this.oldValBytes = valBytes; |
| } |
| |
| private void setOldValObj(@Unretained(ENTRY_EVENT_OLD_VALUE) Object o) { |
| this.oldValObj = o; |
| } |
| |
| public byte[] getOldValueBytes() { |
| return this.oldValBytes; |
| } |
| |
| private Object getOldValObj() { |
| return this.oldValObj; |
| } |
| |
| protected boolean getHasOldValue() { |
| return this.hasOldValue; |
| } |
| |
| protected boolean getOldValueIsSerialized() { |
| return this.oldValueIsSerialized; |
| } |
| |
| /** |
| * Set the old value for this message, only used if there are cqs registered on one of the bridge |
| * servers. |
| * |
| * @param event underlying event. |
| * @since GemFire 5.5 |
| */ |
| public void setOldValue(EntryEventImpl event) { |
| if (event.hasOldValue()) { |
| this.hasOldValue = true; |
| CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue(); |
| if (cd != null) { |
| if (!cd.isSerialized()) { |
| // it is a byte[] |
| this.oldValueIsSerialized = false; |
| setOldValBytes((byte[]) cd.getDeserializedForReading()); |
| } else { |
| this.oldValueIsSerialized = true; |
| Object o = cd.getValue(); |
| if (o instanceof byte[]) { |
| setOldValBytes((byte[]) o); |
| } else { |
| // Defer serialization until toData is called. |
| setOldValObj(o); |
| } |
| } |
| } else { |
| Object old = event.getRawOldValue(); |
| if (old instanceof byte[]) { |
| this.oldValueIsSerialized = false; |
| setOldValBytes((byte[]) old); |
| } else { |
| this.oldValueIsSerialized = true; |
| setOldValObj(AbstractRegion.handleNotAvailable(old)); |
| } |
| } |
| } |
| } |
| |
| |
| @SuppressWarnings("unchecked") |
| public static boolean distribute(EntryEventImpl event, Object expectedOldValue, |
| boolean onlyPersistent) { |
| boolean successful = false; |
| DistributedRegion r = (DistributedRegion) event.getRegion(); |
| Collection<InternalDistributedMember> replicates = onlyPersistent |
| ? r.getCacheDistributionAdvisor().adviseInitializedPersistentMembers().keySet() |
| : r.getCacheDistributionAdvisor().adviseInitializedReplicates(); |
| if (replicates.isEmpty()) { |
| return false; |
| } |
| if (replicates.size() > 1) { |
| ArrayList<InternalDistributedMember> l = new ArrayList<>(replicates); |
| Collections.shuffle(l); |
| replicates = l; |
| } |
| int attempts = 0; |
| for (InternalDistributedMember replicate : replicates) { |
| try { |
| attempts++; |
| final boolean posDup = (attempts > 1); |
| RemoteDestroyReplyProcessor processor = |
| send(replicate, event.getRegion(), event, expectedOldValue, false, posDup); |
| processor.waitForRemoteResponse(); |
| VersionTag<?> versionTag = processor.getVersionTag(); |
| if (versionTag != null) { |
| event.setVersionTag(versionTag); |
| if (event.getRegion().getVersionVector() != null) { |
| event.getRegion().getVersionVector().recordVersion(versionTag.getMemberID(), |
| versionTag); |
| } |
| } |
| event.setInhibitDistribution(true); |
| return true; |
| |
| } catch (EntryNotFoundException e) { |
| throw new EntryNotFoundException("" + event.getKey()); |
| |
| } catch (TransactionDataNotColocatedException enfe) { |
| throw enfe; |
| |
| } catch (CancelException e) { |
| event.getRegion().getCancelCriterion().checkCancelInProgress(e); |
| |
| } catch (CacheException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("RemoteDestroyMessage caught CacheException during distribution", e); |
| } |
| successful = true; // not a cancel-exception, so don't complain any more about it |
| |
| } catch (RegionDestroyedException | RemoteOperationException e) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, |
| "RemoteDestroyMessage caught an exception during distribution; retrying to another member", |
| e); |
| } |
| } |
| } |
| return successful; |
| } |
| |
| /** |
| * Sends a RemoteDestroyMessage {@link org.apache.geode.cache.Region#destroy(Object)}message to |
| * the recipient |
| * |
| * @param recipient the recipient of the message |
| * @param r the ReplicateRegion for which the destroy was performed |
| * @param event the event causing this message |
| * @return the processor used to await the potential {@link org.apache.geode.cache.CacheException} |
| */ |
| public static RemoteDestroyReplyProcessor send(DistributedMember recipient, InternalRegion r, |
| EntryEventImpl event, Object expectedOldValue, boolean useOriginRemote, |
| boolean possibleDuplicate) throws RemoteOperationException { |
| RemoteDestroyReplyProcessor p = |
| new RemoteDestroyReplyProcessor(r.getSystem(), recipient, false); |
| p.requireResponse(); |
| RemoteDestroyMessage m = new RemoteDestroyMessage(recipient, r.getFullPath(), p, event, |
| expectedOldValue, useOriginRemote, possibleDuplicate); |
| Set<?> failures = r.getDistributionManager().putOutgoing(m); |
| if (failures != null && failures.size() > 0) { |
| throw new RemoteOperationException( |
| String.format("Failed sending < %s >", m)); |
| } |
| return p; |
| } |
| |
| /** |
| * This method is called upon receipt and make the desired changes to the region. Note: It is very |
| * important that this message does NOT cause any deadlocks as the sender will wait indefinitely |
| * for the acknowledgement |
| */ |
| @Override |
| protected boolean operateOnRegion(ClusterDistributionManager dm, LocalRegion r, long startTime) |
| throws EntryExistsException, RemoteOperationException { |
| InternalDistributedMember eventSender = originalSender; |
| if (eventSender == null) { |
| eventSender = getSender(); |
| } |
| @Released |
| EntryEventImpl event = null; |
| try { |
| if (this.bridgeContext != null) { |
| event = EntryEventImpl.create(r, getOperation(), getKey(), null/* newValue */, |
| getCallbackArg(), false/* originRemote */, eventSender, true/* generateCallbacks */); |
| event.setContext(this.bridgeContext); |
| |
| // for cq processing and client notification by BS. |
| if (this.hasOldValue) { |
| if (this.oldValueIsSerialized) { |
| event.setSerializedOldValue(getOldValueBytes()); |
| } else { |
| event.setOldValue(getOldValueBytes()); |
| } |
| } |
| } // bridgeContext != null |
| else { |
| event = EntryEventImpl.create(r, getOperation(), getKey(), null, /* newValue */ |
| getCallbackArg(), this.useOriginRemote, eventSender, true/* generateCallbacks */, |
| false/* initializeId */); |
| } |
| |
| event.setCausedByMessage(this); |
| |
| if (this.versionTag != null) { |
| this.versionTag.replaceNullIDs(getSender()); |
| event.setVersionTag(this.versionTag); |
| } |
| // for cq processing and client notification by BS. |
| if (this.hasOldValue) { |
| if (this.oldValueIsSerialized) { |
| event.setSerializedOldValue(getOldValueBytes()); |
| } else { |
| event.setOldValue(getOldValueBytes()); |
| } |
| } |
| |
| Assert.assertTrue(eventId != null); |
| event.setEventId(eventId); |
| |
| event.setPossibleDuplicate(this.possibleDuplicate); |
| |
| try { |
| r.getDataView().destroyOnRemote(event, true, this.expectedOldValue); |
| sendReply(dm, event.getVersionTag()); |
| } catch (CacheWriterException cwe) { |
| sendReply(getSender(), this.processorId, dm, new ReplyException(cwe), r, startTime); |
| return false; |
| } catch (EntryNotFoundException eee) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("operateOnRegion caught EntryNotFoundException", eee); |
| } |
| ReplyMessage.send(getSender(), getProcessorId(), new ReplyException(eee), |
| getReplySender(dm), r.isInternalRegion()); |
| } catch (DataLocationException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("operateOnRegion caught DataLocationException"); |
| } |
| ReplyMessage.send(getSender(), getProcessorId(), new ReplyException(e), getReplySender(dm), |
| r.isInternalRegion()); |
| } |
| return false; |
| } finally { |
| if (event != null) { |
| event.release(); |
| } |
| } |
| } |
| |
| @Override |
| public int getDSFID() { |
| return R_DESTROY_MESSAGE; |
| } |
| |
| private void sendReply(DistributionManager dm, VersionTag<?> versionTag) { |
| DestroyReplyMessage.send(this.getSender(), getReplySender(dm), this.processorId, versionTag); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| setKey(DataSerializer.readObject(in)); |
| this.cbArg = DataSerializer.readObject(in); |
| this.op = Operation.fromOrdinal(in.readByte()); |
| if ((flags & HAS_BRIDGE_CONTEXT) != 0) { |
| this.bridgeContext = DataSerializer.readObject(in); |
| } |
| if ((flags & HAS_ORIGINAL_SENDER) != 0) { |
| this.originalSender = DataSerializer.readObject(in); |
| } |
| this.eventId = DataSerializer.readObject(in); |
| |
| // for old values for CQs |
| if (this.hasOldValue) { |
| // out.writeBoolean(this.hasOldValue); |
| // below boolean is not strictly required, but this is for compatibility |
| in.readByte(); |
| setOldValBytes(DataSerializer.readByteArray(in)); |
| } |
| this.expectedOldValue = DataSerializer.readObject(in); |
| // to prevent bug 51024 always call readObject for versionTag |
| // since toData always calls writeObject for versionTag. |
| this.versionTag = DataSerializer.readObject(in); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| DataSerializer.writeObject(getKey(), out); |
| DataSerializer.writeObject(this.cbArg, out); |
| out.writeByte(this.op.ordinal); |
| if (this.bridgeContext != null) { |
| DataSerializer.writeObject(this.bridgeContext, out); |
| } |
| if (this.originalSender != null) { |
| DataSerializer.writeObject(this.originalSender, out); |
| } |
| DataSerializer.writeObject(this.eventId, out); |
| |
| // this will be on wire for cqs old value generations. |
| if (this.hasOldValue) { |
| out.writeByte(this.oldValueIsSerialized ? 1 : 0); |
| byte policy = DistributedCacheOperation.valueIsToDeserializationPolicy(oldValueIsSerialized); |
| DistributedCacheOperation.writeValue(policy, getOldValObj(), getOldValueBytes(), out); |
| } |
| DataSerializer.writeObject(this.expectedOldValue, out); |
| DataSerializer.writeObject(this.versionTag, out); |
| } |
| |
| @Override |
| protected void setFlags(short flags, DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.setFlags(flags, in, context); |
| this.hasOldValue = (flags & HAS_OLD_VALUE) != 0; |
| this.useOriginRemote = (flags & USE_ORIGIN_REMOTE) != 0; |
| this.possibleDuplicate = (flags & POS_DUP) != 0; |
| } |
| |
| @Override |
| protected short computeCompressedShort() { |
| short s = super.computeCompressedShort(); |
| // this will be on wire for cqs old value generations. |
| if (this.hasOldValue) |
| s |= HAS_OLD_VALUE; |
| if (this.useOriginRemote) |
| s |= USE_ORIGIN_REMOTE; |
| if (this.possibleDuplicate) |
| s |= POS_DUP; |
| if (this.bridgeContext != null) |
| s |= HAS_BRIDGE_CONTEXT; |
| if (this.originalSender != null) |
| s |= HAS_ORIGINAL_SENDER; |
| if (this.versionTag != null) |
| s |= HAS_VERSION_TAG; |
| return s; |
| } |
| |
| @Override |
| public EventID getEventID() { |
| return this.eventId; |
| } |
| |
| /** |
| * Assists the toString method in reporting the contents of this message |
| * |
| */ |
| @Override |
| protected void appendFields(StringBuffer buff) { |
| super.appendFields(buff); |
| buff.append("; key=").append(getKey()); |
| if (originalSender != null) { |
| buff.append("; originalSender=").append(originalSender); |
| } |
| if (bridgeContext != null) { |
| buff.append("; bridgeContext=").append(bridgeContext); |
| } |
| if (eventId != null) { |
| buff.append("; eventId=").append(eventId); |
| } |
| buff.append("; hasOldValue= ").append(this.hasOldValue); |
| } |
| |
| protected Object getKey() { |
| return this.key; |
| } |
| |
| private void setKey(Object key) { |
| this.key = key; |
| } |
| |
| public Operation getOperation() { |
| return this.op; |
| } |
| |
| protected Object getCallbackArg() { |
| return this.cbArg; |
| } |
| |
| @Override |
| public boolean prefersOldSerialized() { |
| return true; |
| } |
| |
| @Override |
| public boolean isUnretainedOldReferenceOk() { |
| return true; |
| } |
| |
| @Override |
| public boolean isCachedDeserializableValueOk() { |
| return false; |
| } |
| |
| private void setOldValueIsSerialized(boolean isSerialized) { |
| if (isSerialized) { |
| // Defer serialization until toData is called. |
| this.oldValueIsSerialized = true; // VALUE_IS_SERIALIZED_OBJECT; |
| } else { |
| this.oldValueIsSerialized = false; // VALUE_IS_BYTES; |
| } |
| } |
| |
| @Override |
| public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) { |
| setOldValueIsSerialized(isSerialized); |
| // Defer serialization until toData is called. |
| setOldValObj(ov); |
| } |
| |
| @Override |
| public void importOldBytes(byte[] ov, boolean isSerialized) { |
| setOldValueIsSerialized(isSerialized); |
| setOldValBytes(ov); |
| } |
| |
| public static class DestroyReplyMessage extends ReplyMessage { |
| |
| private static final byte HAS_VERSION = 0x01; |
| private static final byte PERSISTENT = 0x02; |
| |
| private VersionTag<?> versionTag; |
| |
| /** DSFIDFactory constructor */ |
| public DestroyReplyMessage() {} |
| |
| static void send(InternalDistributedMember recipient, ReplySender dm, int procId, |
| VersionTag<?> versionTag) { |
| Assert.assertTrue(recipient != null, "DestroyReplyMessage NULL recipient"); |
| DestroyReplyMessage m = new DestroyReplyMessage(recipient, procId, versionTag); |
| dm.putOutgoing(m); |
| } |
| |
| DestroyReplyMessage(InternalDistributedMember recipient, int procId, VersionTag<?> versionTag) { |
| this.setProcessorId(procId); |
| this.setRecipient(recipient); |
| this.versionTag = versionTag; |
| } |
| |
| @Override |
| public boolean getInlineProcess() { |
| return true; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return R_DESTROY_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void process(final DistributionManager dm, final ReplyProcessor21 rp) { |
| final long startTime = getTimestamp(); |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, |
| "DestroyReplyMessage process invoking reply processor with processorId:{}", |
| this.processorId); |
| } |
| if (rp == null) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "DestroyReplyMessage processor not found"); |
| } |
| return; |
| } |
| if (this.versionTag != null) { |
| this.versionTag.replaceNullIDs(getSender()); |
| } |
| if (rp instanceof RemoteDestroyReplyProcessor) { |
| RemoteDestroyReplyProcessor processor = (RemoteDestroyReplyProcessor) rp; |
| processor.setResponse(this.versionTag); |
| } |
| rp.process(this); |
| |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", rp, this); |
| } |
| dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| byte b = 0; |
| if (this.versionTag != null) { |
| b |= HAS_VERSION; |
| } |
| if (this.versionTag instanceof DiskVersionTag) { |
| b |= PERSISTENT; |
| } |
| out.writeByte(b); |
| if (this.versionTag != null) { |
| InternalDataSerializer.invokeToData(this.versionTag, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| byte b = in.readByte(); |
| boolean hasTag = (b & HAS_VERSION) != 0; |
| boolean persistentTag = (b & PERSISTENT) != 0; |
| if (hasTag) { |
| this.versionTag = VersionTag.create(persistentTag, in); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = super.getStringBuilder(); |
| sb.append(getShortClassName()); |
| sb.append(" processorId="); |
| sb.append(this.processorId); |
| if (this.versionTag != null) { |
| sb.append(" version=").append(this.versionTag); |
| } |
| sb.append(" from "); |
| sb.append(this.getSender()); |
| ReplyException ex = getException(); |
| if (ex != null) { |
| sb.append(" with exception "); |
| sb.append(ex); |
| } |
| return sb.toString(); |
| } |
| |
| } |
| static class RemoteDestroyReplyProcessor extends RemoteOperationResponse { |
| VersionTag<?> versionTag; |
| |
| RemoteDestroyReplyProcessor(InternalDistributedSystem ds, DistributedMember recipient, |
| Object key) { |
| super(ds, (InternalDistributedMember) recipient, false); |
| } |
| |
| void setResponse(VersionTag<?> versionTag) { |
| this.versionTag = versionTag; |
| } |
| |
| VersionTag<?> getVersionTag() { |
| return this.versionTag; |
| } |
| } |
| } |