| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE; |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.function.Function; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CopyHelper; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.DeltaSerializationException; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.SerializationException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.EntryOperation; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.SerializedCacheValue; |
| import org.apache.geode.cache.TransactionId; |
| import org.apache.geode.cache.query.IndexMaintenanceException; |
| import org.apache.geode.cache.query.QueryException; |
| import org.apache.geode.cache.query.internal.index.IndexManager; |
| import org.apache.geode.cache.query.internal.index.IndexProtocol; |
| import org.apache.geode.cache.query.internal.index.IndexUtils; |
| import org.apache.geode.cache.util.TimestampedEntryEvent; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.DSFIDFactory; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.Sendable; |
| import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; |
| import org.apache.geode.internal.cache.entries.OffHeapRegionEntry; |
| import org.apache.geode.internal.cache.partitioned.PartitionMessage; |
| import org.apache.geode.internal.cache.partitioned.PutMessage; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerHelper; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tx.DistTxKeyInfo; |
| import org.apache.geode.internal.cache.tx.RemoteOperationMessage; |
| import org.apache.geode.internal.cache.tx.RemotePutMessage; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument; |
| import org.apache.geode.internal.lang.StringUtils; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.offheap.OffHeapHelper; |
| import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper; |
| import org.apache.geode.internal.offheap.ReferenceCountHelper; |
| import org.apache.geode.internal.offheap.Releasable; |
| import org.apache.geode.internal.offheap.StoredObject; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.offheap.annotations.Retained; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.internal.serialization.ByteArrayDataInput; |
| import org.apache.geode.internal.serialization.DataSerializableFixedID; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.size.Sizeable; |
| import org.apache.geode.internal.util.ArrayUtils; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.pdx.internal.PeerTypeRegistration; |
| |
| /** |
| * Implementation of an entry event |
| * |
| * must be public for DataSerializableFixedID |
| */ |
| public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, |
| DataSerializableFixedID, EntryOperation, Releasable { |
| private static final Logger logger = LogService.getLogger(); |
| |
| // PACKAGE FIELDS // |
| private transient InternalRegion region; |
| |
| private transient RegionEntry re; |
| |
| protected KeyInfo keyInfo; |
| |
| /** the event's id. Scoped by distributedMember. */ |
| protected EventID eventID; |
| |
| private Object newValue = null; |
| |
| /** |
| * If we ever serialize the new value then it should be stored in this field in case we need the |
| * serialized form again later. This was added to fix bug 43781. Note that we also have the |
| * "newValueBytes" field. But it is only non-null if setSerializedNewValue was called. |
| */ |
| private byte[] cachedSerializedNewValue = null; |
| |
| @Retained(ENTRY_EVENT_OLD_VALUE) |
| private Object oldValue = null; |
| |
| protected short eventFlags = 0x0000; |
| |
| protected TXId txId = null; |
| |
| protected Operation op; |
| |
| /* To store the operation/modification type */ |
| private transient EnumListenerEvent eventType; |
| |
| /** |
| * This field will be null unless this event is used for a putAll operation. |
| * |
| * @since GemFire 5.0 |
| */ |
| protected transient DistributedPutAllOperation putAllOp; |
| |
| /** |
| * This field will be null unless this event is used for a removeAll operation. |
| * |
| * @since GemFire 8.1 |
| */ |
| protected transient DistributedRemoveAllOperation removeAllOp; |
| |
| /** |
| * The member that originated this event |
| * |
| * @since GemFire 5.0 |
| */ |
| protected DistributedMember distributedMember; |
| |
| /** |
| * transient storage for the message that caused the event |
| */ |
| transient DistributionMessage causedByMessage; |
| |
| /** |
| * The originating membershipId of this event. |
| * |
| * @since GemFire 5.1 |
| */ |
| protected ClientProxyMembershipID context = null; |
| |
| /** |
| * this holds the bytes representing the change in value effected by this event. It is used when |
| * the value implements the Delta interface. |
| */ |
| private byte[] deltaBytes = null; |
| |
| /** routing information for cache clients for this event */ |
| private FilterInfo filterInfo; |
| |
| /** new value stored in serialized form */ |
| protected byte[] newValueBytes; |
| |
| /** old value stored in serialized form */ |
| private byte[] oldValueBytes; |
| |
| /** version tag for concurrency checks */ |
| protected VersionTag versionTag; |
| |
| /** boolean to indicate that the RegionEntry for this event has been evicted */ |
| private transient boolean isEvicted = false; |
| |
| private transient boolean isPendingSecondaryExpireDestroy = false; |
| |
| private transient boolean hasRetried = false; |
| |
| public static final Object SUSPECT_TOKEN = new Object(); |
| |
| public EntryEventImpl() { |
| this.offHeapLock = null; |
| } |
| |
| /** |
| * Reads the contents of this message from the given input. |
| */ |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| this.eventID = (EventID) context.getDeserializer().readObject(in); |
| Object key = context.getDeserializer().readObject(in); |
| Object value = context.getDeserializer().readObject(in); |
| this.keyInfo = new KeyInfo(key, value, null); |
| this.op = Operation.fromOrdinal(in.readByte()); |
| this.eventFlags = in.readShort(); |
| this.keyInfo.setCallbackArg(context.getDeserializer().readObject(in)); |
| this.txId = (TXId) context.getDeserializer().readObject(in); |
| |
| if (in.readBoolean()) { // isDelta |
| assert false : "isDelta should never be true"; |
| } else { |
| // OFFHEAP Currently values are never deserialized to off heap memory. If that changes then |
| // this code needs to change. |
| if (in.readBoolean()) { // newValueSerialized |
| this.newValueBytes = DataSerializer.readByteArray(in); |
| this.cachedSerializedNewValue = this.newValueBytes; |
| this.newValue = null; // set later in generateNewValueFromBytesIfNeeded |
| } else { |
| this.newValueBytes = null; |
| this.cachedSerializedNewValue = null; |
| this.newValue = context.getDeserializer().readObject(in); |
| } |
| } |
| |
| // OFFHEAP Currently values are never deserialized to off heap memory. If that changes then this |
| // code needs to change. |
| if (in.readBoolean()) { // oldValueSerialized |
| this.oldValueBytes = DataSerializer.readByteArray(in); |
| this.oldValue = null; // set later in basicGetOldValue |
| } else { |
| this.oldValueBytes = null; |
| this.oldValue = context.getDeserializer().readObject(in); |
| } |
| this.distributedMember = DSFIDFactory.readInternalDistributedMember(in); |
| this.context = ClientProxyMembershipID.readCanonicalized(in); |
| this.tailKey = DataSerializer.readLong(in); |
| } |
| |
| @Retained |
| protected EntryEventImpl(InternalRegion region, Operation op, Object key, boolean originRemote, |
| DistributedMember distributedMember, boolean generateCallbacks, boolean fromRILocalDestroy) { |
| this.region = region; |
| InternalDistributedSystem ds = |
| (InternalDistributedSystem) region.getCache().getDistributedSystem(); |
| if (ds.getOffHeapStore() != null) { |
| this.offHeapLock = new Object(); |
| } else { |
| this.offHeapLock = null; |
| } |
| this.op = op; |
| this.keyInfo = region.getKeyInfo(key); |
| setOriginRemote(originRemote); |
| setGenerateCallbacks(generateCallbacks); |
| this.distributedMember = distributedMember; |
| setFromRILocalDestroy(fromRILocalDestroy); |
| } |
| |
| /** |
| * Doesn't specify oldValue as this will be filled in later as part of an operation on the region, |
| * or lets it default to null. |
| */ |
| @Retained |
| protected EntryEventImpl(final InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote, |
| DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) { |
| |
| this.region = region; |
| InternalDistributedSystem ds = |
| (InternalDistributedSystem) region.getCache().getDistributedSystem(); |
| if (ds.getOffHeapStore() != null) { |
| this.offHeapLock = new Object(); |
| } else { |
| this.offHeapLock = null; |
| } |
| this.op = op; |
| this.keyInfo = region.getKeyInfo(key, newVal, callbackArgument); |
| |
| if (!Token.isInvalid(newVal)) { |
| basicSetNewValue(newVal, false); |
| } |
| |
| this.txId = region.getTXId(); |
| /* |
| * this might set txId for events done from a thread that has a tx even though the op is non-tx. |
| * For example region ops. |
| */ |
| if (newVal == Token.LOCAL_INVALID) { |
| setLocalInvalid(true); |
| } |
| setOriginRemote(originRemote); |
| setGenerateCallbacks(generateCallbacks); |
| this.distributedMember = distributedMember; |
| } |
| |
| /** |
| * Called by BridgeEntryEventImpl to use existing EventID |
| */ |
| @Retained |
| protected EntryEventImpl(InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, |
| boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, |
| EventID eventID) { |
| this(region, op, key, newValue, callbackArgument, originRemote, distributedMember, |
| generateCallbacks, true /* initializeId */); |
| Assert.assertTrue(eventID != null || !(region instanceof PartitionedRegion)); |
| this.setEventId(eventID); |
| } |
| |
| /** |
| * create an entry event from another entry event |
| */ |
| @Retained |
| public EntryEventImpl( |
| @Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other) { |
| this(other, true); |
| } |
| |
| @Retained |
| public EntryEventImpl( |
| @Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other, |
| boolean setOldValue) { |
| setRegion(other.getRegion()); |
| if (other.offHeapLock != null) { |
| this.offHeapLock = new Object(); |
| } else { |
| this.offHeapLock = null; |
| } |
| |
| this.eventID = other.eventID; |
| basicSetNewValue(other.basicGetNewValue(), false); |
| this.newValueBytes = other.newValueBytes; |
| this.cachedSerializedNewValue = other.cachedSerializedNewValue; |
| this.re = other.re; |
| if (setOldValue) { |
| retainAndSetOldValue(other.basicGetOldValue()); |
| this.oldValueBytes = other.oldValueBytes; |
| } |
| this.eventFlags = other.eventFlags; |
| setEventFlag(EventFlags.FLAG_CALLBACKS_INVOKED, false); |
| txId = other.txId; |
| op = other.op; |
| distributedMember = other.distributedMember; |
| this.filterInfo = other.filterInfo; |
| this.keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo) |
| : new KeyInfo(other.keyInfo); |
| if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) { |
| this.keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument( |
| (GatewaySenderEventCallbackArgument) other.getRawCallbackArgument()))); |
| } |
| this.context = other.context; |
| this.deltaBytes = other.deltaBytes; |
| this.tailKey = other.tailKey; |
| this.versionTag = other.versionTag; |
| // set possible duplicate |
| this.setPossibleDuplicate(other.isPossibleDuplicate()); |
| } |
| |
| @Retained |
| public EntryEventImpl(Object key2, boolean isOffHeap) { |
| this.keyInfo = new KeyInfo(key2, null, null); |
| if (isOffHeap) { |
| this.offHeapLock = new Object(); |
| } else { |
| this.offHeapLock = null; |
| } |
| } |
| |
| /** |
| * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the EntryEventImpl |
| * if the region parameter is a PartitionedRegion. |
| */ |
| @Retained |
| public static EntryEventImpl create(InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, |
| boolean originRemote, DistributedMember distributedMember) { |
| return create(region, op, key, newValue, callbackArgument, originRemote, distributedMember, |
| true, true); |
| } |
| |
| /** |
| * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the EntryEventImpl |
| * if the region parameter is a PartitionedRegion. |
| */ |
| @Retained |
| public static EntryEventImpl create(InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, |
| boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks) { |
| return create(region, op, key, newValue, callbackArgument, originRemote, distributedMember, |
| generateCallbacks, true); |
| } |
| |
| /** |
| * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the EntryEventImpl |
| * if the region parameter is a PartitionedRegion. |
| * |
| * Called by BridgeEntryEventImpl to use existing EventID |
| */ |
| @Retained |
| public static EntryEventImpl create(InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, |
| boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, |
| EventID eventID) { |
| return new EntryEventImpl(region, op, key, newValue, callbackArgument, originRemote, |
| distributedMember, generateCallbacks, eventID); |
| } |
| |
| /** |
| * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the EntryEventImpl |
| * if the region parameter is a PartitionedRegion. |
| */ |
| @Retained |
| public static EntryEventImpl create(InternalRegion region, Operation op, Object key, |
| boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, |
| boolean fromRILocalDestroy) { |
| return new EntryEventImpl(region, op, key, originRemote, distributedMember, generateCallbacks, |
| fromRILocalDestroy); |
| } |
| |
| /** |
| * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the EntryEventImpl |
| * if the region parameter is a PartitionedRegion. |
| * |
| * This creator does not specify the oldValue as this will be filled in later as part of an |
| * operation on the region, or lets it default to null. |
| */ |
| @Retained |
| public static EntryEventImpl create(final InternalRegion region, Operation op, Object key, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote, |
| DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) { |
| return new EntryEventImpl(region, op, key, newVal, callbackArgument, originRemote, |
| distributedMember, generateCallbacks, initializeId); |
| } |
| |
| /** |
| * Creates a PutAllEvent given the distributed operation, the region, and the entry data. |
| * |
| * @since GemFire 5.0 |
| */ |
| @Retained |
| static EntryEventImpl createPutAllEvent(DistributedPutAllOperation putAllOp, |
| InternalRegion region, Operation entryOp, Object entryKey, |
| @Retained(ENTRY_EVENT_NEW_VALUE) Object entryNewValue) { |
| @Retained |
| EntryEventImpl e; |
| if (putAllOp != null) { |
| EntryEventImpl event = putAllOp.getBaseEvent(); |
| if (event.isBridgeEvent()) { |
| e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, |
| event.getRawCallbackArgument(), false, event.distributedMember, |
| event.isGenerateCallbacks()); |
| e.setContext(event.getContext()); |
| } else { |
| e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, |
| event.getCallbackArgument(), false, region.getMyId(), event.isGenerateCallbacks()); |
| } |
| |
| } else { |
| e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, null, false, |
| region.getMyId(), true); |
| } |
| |
| e.putAllOp = putAllOp; |
| return e; |
| } |
| |
| @Retained |
| protected static EntryEventImpl createRemoveAllEvent(DistributedRemoveAllOperation op, |
| InternalRegion region, Object entryKey) { |
| @Retained |
| EntryEventImpl e; |
| final Operation entryOp = Operation.REMOVEALL_DESTROY; |
| if (op != null) { |
| EntryEventImpl event = op.getBaseEvent(); |
| if (event.isBridgeEvent()) { |
| e = EntryEventImpl.create(region, entryOp, entryKey, null, event.getRawCallbackArgument(), |
| false, event.distributedMember, event.isGenerateCallbacks()); |
| e.setContext(event.getContext()); |
| } else { |
| e = EntryEventImpl.create(region, entryOp, entryKey, null, event.getCallbackArgument(), |
| false, region.getMyId(), event.isGenerateCallbacks()); |
| } |
| |
| } else { |
| e = EntryEventImpl.create(region, entryOp, entryKey, null, null, false, region.getMyId(), |
| true); |
| } |
| |
| e.removeAllOp = op; |
| return e; |
| } |
| |
| public boolean isBulkOpInProgress() { |
| return getPutAllOperation() != null || getRemoveAllOperation() != null; |
| } |
| |
| /** return the putAll operation for this event, if any */ |
| public DistributedPutAllOperation getPutAllOperation() { |
| return this.putAllOp; |
| } |
| |
| public DistributedPutAllOperation setPutAllOperation(DistributedPutAllOperation nv) { |
| DistributedPutAllOperation result = this.putAllOp; |
| if (nv != null && nv.getBaseEvent() != null) { |
| setCallbackArgument(nv.getBaseEvent().getCallbackArgument()); |
| } |
| this.putAllOp = nv; |
| return result; |
| } |
| |
| public DistributedRemoveAllOperation getRemoveAllOperation() { |
| return this.removeAllOp; |
| } |
| |
| public DistributedRemoveAllOperation setRemoveAllOperation(DistributedRemoveAllOperation nv) { |
| DistributedRemoveAllOperation result = this.removeAllOp; |
| if (nv != null && nv.getBaseEvent() != null) { |
| setCallbackArgument(nv.getBaseEvent().getCallbackArgument()); |
| } |
| this.removeAllOp = nv; |
| return result; |
| } |
| |
| private boolean testEventFlag(short mask) { |
| return EventFlags.isSet(this.eventFlags, mask); |
| } |
| |
| private void setEventFlag(short mask, boolean on) { |
| this.eventFlags = EventFlags.set(this.eventFlags, mask, on); |
| } |
| |
| @Override |
| public DistributedMember getDistributedMember() { |
| return this.distributedMember; |
| } |
| |
| /////////////////////// INTERNAL BOOLEAN SETTERS |
| public void setOriginRemote(boolean b) { |
| setEventFlag(EventFlags.FLAG_ORIGIN_REMOTE, b); |
| } |
| |
| public void setLocalInvalid(boolean b) { |
| setEventFlag(EventFlags.FLAG_LOCAL_INVALID, b); |
| } |
| |
| public void setGenerateCallbacks(boolean b) { |
| setEventFlag(EventFlags.FLAG_GENERATE_CALLBACKS, b); |
| } |
| |
| /** set the the flag telling whether callbacks should be invoked for a partitioned region */ |
| public void setInvokePRCallbacks(boolean b) { |
| setEventFlag(EventFlags.FLAG_INVOKE_PR_CALLBACKS, b); |
| } |
| |
| /** get the flag telling whether callbacks should be invoked for a partitioned region */ |
| public boolean getInvokePRCallbacks() { |
| return testEventFlag(EventFlags.FLAG_INVOKE_PR_CALLBACKS); |
| } |
| |
| public boolean getInhibitDistribution() { |
| return testEventFlag(EventFlags.FLAG_INHIBIT_DISTRIBUTION); |
| } |
| |
| public void setInhibitDistribution(boolean b) { |
| setEventFlag(EventFlags.FLAG_INHIBIT_DISTRIBUTION, b); |
| } |
| |
| /** was the entry destroyed or missing and allowed to be destroyed again? */ |
| public boolean getIsRedestroyedEntry() { |
| return testEventFlag(EventFlags.FLAG_REDESTROYED_TOMBSTONE); |
| } |
| |
| public void setIsRedestroyedEntry(boolean b) { |
| setEventFlag(EventFlags.FLAG_REDESTROYED_TOMBSTONE, b); |
| } |
| |
| public void isConcurrencyConflict(boolean b) { |
| setEventFlag(EventFlags.FLAG_CONCURRENCY_CONFLICT, b); |
| } |
| |
| public boolean isConcurrencyConflict() { |
| return testEventFlag(EventFlags.FLAG_CONCURRENCY_CONFLICT); |
| } |
| |
| /** set the DistributionMessage that caused this event */ |
| public void setCausedByMessage(DistributionMessage msg) { |
| this.causedByMessage = msg; |
| } |
| |
| /** |
| * get the PartitionMessage that caused this event, or null if the event was not caused by a |
| * PartitionMessage |
| */ |
| public PartitionMessage getPartitionMessage() { |
| if (this.causedByMessage != null && this.causedByMessage instanceof PartitionMessage) { |
| return (PartitionMessage) this.causedByMessage; |
| } |
| return null; |
| } |
| |
| /** |
| * get the RemoteOperationMessage that caused this event, or null if the event was not caused by a |
| * RemoteOperationMessage |
| */ |
| public RemoteOperationMessage getRemoteOperationMessage() { |
| if (this.causedByMessage != null && this.causedByMessage instanceof RemoteOperationMessage) { |
| return (RemoteOperationMessage) this.causedByMessage; |
| } |
| return null; |
| } |
| |
| /////////////// BOOLEAN GETTERS |
| public boolean isLocalLoad() { |
| return this.op.isLocalLoad(); |
| } |
| |
| public boolean isNetSearch() { |
| return this.op.isNetSearch(); |
| } |
| |
| public boolean isNetLoad() { |
| return this.op.isNetLoad(); |
| } |
| |
| public boolean isDistributed() { |
| return this.op.isDistributed(); |
| } |
| |
| public boolean isExpiration() { |
| return this.op.isExpiration(); |
| } |
| |
| public boolean isEviction() { |
| return this.op.isEviction(); |
| } |
| |
| public void setEvicted() { |
| this.isEvicted = true; |
| } |
| |
| public boolean isEvicted() { |
| return this.isEvicted; |
| } |
| |
| public boolean hasRetried() { |
| return hasRetried; |
| } |
| |
| public void setRetried(boolean retried) { |
| hasRetried = retried; |
| } |
| |
| public boolean isPendingSecondaryExpireDestroy() { |
| return this.isPendingSecondaryExpireDestroy; |
| } |
| |
| public void setPendingSecondaryExpireDestroy(boolean value) { |
| this.isPendingSecondaryExpireDestroy = value; |
| } |
| |
| // Note that isOriginRemote is sometimes set to false even though the event |
| // was received from a peer. This is done to force distribution of the |
| // message to peers and to cause concurrency version stamping to be performed. |
| // This is done by all one-hop operations, like RemoteInvalidateMessage. |
| @Override |
| public boolean isOriginRemote() { |
| return testEventFlag(EventFlags.FLAG_ORIGIN_REMOTE); |
| } |
| |
| /* return whether this event originated from a WAN gateway and carries a WAN version tag */ |
| public boolean isFromWANAndVersioned() { |
| return (this.versionTag != null && this.versionTag.isGatewayTag()); |
| } |
| |
| /* return whether this event originated in a client and carries a version tag */ |
| public boolean isFromBridgeAndVersioned() { |
| return (this.context != null) && (this.versionTag != null); |
| } |
| |
| @Override |
| public boolean isGenerateCallbacks() { |
| return testEventFlag(EventFlags.FLAG_GENERATE_CALLBACKS); |
| } |
| |
| public void setNewEventId(DistributedSystem sys) { |
| Assert.assertTrue(this.eventID == null, "Double setting event id"); |
| EventID newID = new EventID(sys); |
| if (this.eventID != null) { |
| if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Replacing event ID with {} in event {}", |
| newID, this); |
| } |
| } |
| this.eventID = newID; |
| } |
| |
| public void reserveNewEventId(DistributedSystem sys, int count) { |
| Assert.assertTrue(this.eventID == null, "Double setting event id"); |
| this.eventID = new EventID(sys); |
| if (count > 1) { |
| this.eventID.reserveSequenceId(count - 1); |
| } |
| } |
| |
| public void setEventId(EventID id) { |
| this.eventID = id; |
| } |
| |
| /** |
| * Return the event id, if any |
| * |
| * @return null if no event id has been set |
| */ |
| @Override |
| public EventID getEventId() { |
| return this.eventID; |
| } |
| |
| @Override |
| public boolean isBridgeEvent() { |
| return hasClientOrigin(); |
| } |
| |
| @Override |
| public boolean hasClientOrigin() { |
| return getContext() != null; |
| } |
| |
| /** |
| * sets the ID of the client that initiated this event |
| */ |
| public void setContext(ClientProxyMembershipID contx) { |
| Assert.assertTrue(contx != null); |
| this.context = contx; |
| } |
| |
| /** |
| * gets the ID of the client that initiated this event. Null if a server-initiated event |
| */ |
| @Override |
| public ClientProxyMembershipID getContext() { |
| return this.context; |
| } |
| |
| public boolean isLocalInvalid() { |
| return testEventFlag(EventFlags.FLAG_LOCAL_INVALID); |
| } |
| |
| ///////////////////////////////////////////////// |
| |
| /** |
| * Returns the key. |
| * |
| * @return the key. |
| */ |
| @Override |
| public Object getKey() { |
| return keyInfo.getKey(); |
| } |
| |
| /** |
| * Returns the value in the cache prior to this event. When passed to an event handler after an |
| * event occurs, this value reflects the value that was in the cache in this VM, not necessarily |
| * the value that was in the cache VM that initiated the operation. |
| * |
| * @return the value in the cache prior to this event. |
| */ |
| @Override |
| public Object getOldValue() { |
| try { |
| if (isOriginRemote() && getRegion().isProxy()) { |
| return null; |
| } |
| @Unretained |
| Object ov = handleNotAvailableOldValue(); |
| if (ov != null) { |
| boolean doCopyOnRead = getRegion().isCopyOnRead(); |
| if (ov instanceof CachedDeserializable) { |
| return callWithOffHeapLock((CachedDeserializable) ov, oldValueCD -> { |
| if (doCopyOnRead) { |
| return oldValueCD.getDeserializedWritableCopy(getRegion(), this.re); |
| } else { |
| return oldValueCD.getDeserializedValue(getRegion(), this.re); |
| } |
| }); |
| } else { |
| if (doCopyOnRead) { |
| return CopyHelper.copy(ov); |
| } else { |
| return ov; |
| } |
| } |
| } |
| return null; |
| } catch (IllegalArgumentException i) { |
| IllegalArgumentException iae = new IllegalArgumentException(String.format("%s", |
| "Error while deserializing value for key=" + getKey())); |
| iae.initCause(i); |
| throw iae; |
| } |
| } |
| |
| /** |
| * returns the old value after handling one this is NOT_AVAILABLE. If the old value is |
| * NOT_AVAILABLE then it may try to read it from disk. If it can't read an unavailable old value |
| * from disk then it will return null instead of NOT_AVAILABLE. |
| */ |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| private Object handleNotAvailableOldValue() { |
| @Unretained |
| Object result = basicGetOldValue(); |
| if (result != Token.NOT_AVAILABLE) { |
| return result; |
| } |
| if (getReadOldValueFromDisk()) { |
| try { |
| result = getRegion().getValueInVMOrDiskWithoutFaultIn(getKey()); |
| } catch (EntryNotFoundException ex) { |
| result = null; |
| } |
| } |
| result = AbstractRegion.handleNotAvailable(result); |
| return result; |
| } |
| |
| /** |
| * If true then when getOldValue is called if the NOT_AVAILABLE is found then an attempt will be |
| * made to read the old value from disk without faulting it in. Should only be set to true when |
| * product is calling a method on a CacheWriter. |
| */ |
| private boolean readOldValueFromDisk; |
| |
| public boolean getReadOldValueFromDisk() { |
| return this.readOldValueFromDisk; |
| } |
| |
| public void setReadOldValueFromDisk(boolean v) { |
| this.readOldValueFromDisk = v; |
| } |
| |
| /** |
| * Like getRawNewValue except that if the result is an off-heap reference then copy it to the |
| * heap. Note: to prevent the heap copy use getRawNewValue instead |
| */ |
| public Object getRawNewValueAsHeapObject() { |
| Object result = basicGetNewValue(); |
| if (mayHaveOffHeapReferences()) { |
| result = OffHeapHelper.copyIfNeeded(result, getRegion().getCache()); |
| } |
| return result; |
| } |
| |
| /** |
| * If new value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE). Its |
| * refcount is not inced by this call and the returned object can only be safely used for the |
| * lifetime of the EntryEventImpl instance that returned the value. Else return the raw form. |
| */ |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| public Object getRawNewValue() { |
| return basicGetNewValue(); |
| } |
| |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| public Object getValue() { |
| return basicGetNewValue(); |
| } |
| |
| @Released(ENTRY_EVENT_NEW_VALUE) |
| protected void basicSetNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object v, |
| boolean clearCachedSerializedAndBytes) { |
| if (v == this.newValue) |
| return; |
| if (mayHaveOffHeapReferences()) { |
| if (this.offHeapOk) { |
| OffHeapHelper.releaseAndTrackOwner(this.newValue, this); |
| } |
| if (StoredObject.isOffHeapReference(v)) { |
| ReferenceCountHelper.setReferenceCountOwner(this); |
| if (!((StoredObject) v).retain()) { |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| this.newValue = null; |
| return; |
| } |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } |
| } |
| this.newValue = v; |
| if (clearCachedSerializedAndBytes) { |
| this.newValueBytes = null; |
| this.cachedSerializedNewValue = null; |
| } |
| } |
| |
| private void generateNewValueFromBytesIfNeeded() { |
| if (this.newValue != null) { |
| // no need to generate a new value |
| return; |
| } |
| byte[] bytes = this.newValueBytes; |
| if (bytes != null) { |
| this.newValue = CachedDeserializableFactory.create(bytes, getRegion().getCache()); |
| } |
| } |
| |
| @Override |
| @Unretained |
| public Object basicGetNewValue() { |
| generateNewValueFromBytesIfNeeded(); |
| Object result = this.newValue; |
| if (!this.offHeapOk && isOffHeapReference(result)) { |
| // this.region.getCache().getLogger().info("DEBUG new value already freed " + |
| // System.identityHashCode(result)); |
| throw new IllegalStateException( |
| "Attempt to access off heap value after the EntryEvent was released."); |
| } |
| return result; |
| } |
| |
| private boolean isOffHeapReference(Object ref) { |
| return mayHaveOffHeapReferences() && StoredObject.isOffHeapReference(ref); |
| } |
| |
| private class OldValueOwner { |
| private EntryEventImpl getEvent() { |
| return EntryEventImpl.this; |
| } |
| |
| @Override |
| public int hashCode() { |
| return getEvent().hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof OldValueOwner) { |
| return getEvent().equals(((OldValueOwner) obj).getEvent()); |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "OldValueOwner " + getEvent().toString(); |
| } |
| } |
| |
| /** |
| * Note if v might be an off-heap reference that you did not retain for this EntryEventImpl then |
| * call retainsAndSetOldValue instead of this method. |
| * |
| * @param v the caller should have already retained this off-heap reference. |
| */ |
| @Released(ENTRY_EVENT_OLD_VALUE) |
| void basicSetOldValue(@Unretained(ENTRY_EVENT_OLD_VALUE) Object v) { |
| @Released |
| final Object curOldValue = this.oldValue; |
| if (v == curOldValue) { |
| return; |
| } |
| if (this.offHeapOk && mayHaveOffHeapReferences()) { |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| OffHeapHelper.releaseAndTrackOwner(curOldValue, new OldValueOwner()); |
| } else { |
| OffHeapHelper.release(curOldValue); |
| } |
| } |
| |
| this.oldValue = v; |
| this.oldValueBytes = null; |
| } |
| |
| @Released(ENTRY_EVENT_OLD_VALUE) |
| private void retainAndSetOldValue(@Retained(ENTRY_EVENT_OLD_VALUE) Object v) { |
| if (v == this.oldValue) { |
| return; |
| } |
| if (isOffHeapReference(v)) { |
| StoredObject so = (StoredObject) v; |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner()); |
| boolean couldNotRetain = (!so.retain()); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| if (couldNotRetain) { |
| this.oldValue = null; |
| this.oldValueBytes = null; |
| return; |
| } |
| } else { |
| if (!so.retain()) { |
| this.oldValue = null; |
| this.oldValueBytes = null; |
| return; |
| } |
| } |
| } |
| basicSetOldValue(v); |
| } |
| |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| Object basicGetOldValue() { |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| Object result = this.oldValue; |
| if (result == null) { |
| byte[] bytes = this.oldValueBytes; |
| if (bytes != null) { |
| result = CachedDeserializableFactory.create(bytes, getRegion().getCache()); |
| this.oldValue = result; |
| } |
| } |
| if (!this.offHeapOk && isOffHeapReference(result)) { |
| // this.region.getCache().getLogger().info("DEBUG old value already freed " + |
| // System.identityHashCode(result)); |
| throw new IllegalStateException( |
| "Attempt to access off heap value after the EntryEvent was released."); |
| } |
| return result; |
| } |
| |
| /** |
| * Like getRawOldValue except that if the result is an off-heap reference then copy it to the |
| * heap. To avoid the heap copy use getRawOldValue instead. |
| */ |
| public Object getRawOldValueAsHeapObject() { |
| Object result = basicGetOldValue(); |
| if (mayHaveOffHeapReferences()) { |
| result = OffHeapHelper.copyIfNeeded(result, getRegion().getCache()); |
| } |
| return result; |
| } |
| |
| /* |
| * If the old value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE). Its |
| * refcount is not inced by this call and the returned object can only be safely used for the |
| * lifetime of the EntryEventImpl instance that returned the value. Else return the raw form. |
| */ |
| @Unretained |
| public Object getRawOldValue() { |
| return basicGetOldValue(); |
| } |
| |
| /** |
| * Just like getRawOldValue except if the raw old value is off-heap deserialize it. |
| */ |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| public Object getOldValueAsOffHeapDeserializedOrRaw() { |
| Object result = basicGetOldValue(); |
| if (mayHaveOffHeapReferences() && result instanceof StoredObject) { |
| result = ((CachedDeserializable) result).getDeserializedForReading(); |
| } |
| return AbstractRegion.handleNotAvailable(result); // fixes 49499 |
| } |
| |
| /** |
| * Added this function to expose isCopyOnRead function to the child classes of EntryEventImpl |
| * |
| */ |
| protected boolean isRegionCopyOnRead() { |
| return getRegion().isCopyOnRead(); |
| } |
| |
| /** |
| * Returns the value in the cache after this event. |
| * |
| * @return the value in the cache after this event. |
| */ |
| @Override |
| public Object getNewValue() { |
| |
| boolean doCopyOnRead = getRegion().isCopyOnRead(); |
| Object nv = basicGetNewValue(); |
| if (nv != null) { |
| if (nv == Token.NOT_AVAILABLE) { |
| // I'm not sure this can even happen |
| return AbstractRegion.handleNotAvailable(nv); |
| } |
| if (nv instanceof CachedDeserializable) { |
| return callWithOffHeapLock((CachedDeserializable) nv, newValueCD -> { |
| Object v = null; |
| if (doCopyOnRead) { |
| v = newValueCD.getDeserializedWritableCopy(getRegion(), this.re); |
| } else { |
| v = newValueCD.getDeserializedValue(getRegion(), this.re); |
| } |
| assert !(v instanceof CachedDeserializable) : "for key " + this.getKey() |
| + " found nested CachedDeserializable"; |
| return v; |
| }); |
| } else { |
| if (doCopyOnRead) { |
| return CopyHelper.copy(nv); |
| } else { |
| return nv; |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Invoke the given function with a lock if the given value is offheap. |
| * |
| * @return the value returned from invoking the function |
| */ |
| private <T, R> R callWithOffHeapLock(T value, Function<T, R> function) { |
| if (isOffHeapReference(value)) { |
| synchronized (this.offHeapLock) { |
| if (!this.offHeapOk) { |
| throw new IllegalStateException( |
| "Attempt to access off heap value after the EntryEvent was released."); |
| } |
| return function.apply(value); |
| } |
| } else { |
| return function.apply(value); |
| } |
| } |
| |
| private final Object offHeapLock; |
| |
| public String getNewValueStringForm() { |
| return StringUtils.forceToString(basicGetNewValue()); |
| } |
| |
| public String getOldValueStringForm() { |
| return StringUtils.forceToString(basicGetOldValue()); |
| } |
| |
| /** Set a deserialized value */ |
| public void setNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object obj) { |
| basicSetNewValue(obj, true); |
| } |
| |
| @Override |
| public TransactionId getTransactionId() { |
| return this.txId; |
| } |
| |
| public void setTransactionId(TransactionId txId) { |
| this.txId = (TXId) txId; |
| } |
| |
| /** |
| * Answer true if this event resulted from a loader. |
| * |
| * @return true if isLocalLoad or isNetLoad |
| */ |
| public boolean isLoad() { |
| return this.op.isLoad(); |
| } |
| |
| public void setRegion(InternalRegion r) { |
| this.region = r; |
| } |
| |
| /** |
| * @see org.apache.geode.cache.CacheEvent#getRegion() |
| */ |
| @Override |
| public InternalRegion getRegion() { |
| return region; |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return this.op; |
| } |
| |
| public void setOperation(Operation op) { |
| this.op = op; |
| PartitionMessage prm = getPartitionMessage(); |
| if (prm != null) { |
| prm.setOperation(this.op); |
| } |
| } |
| |
| /** |
| * @see org.apache.geode.cache.CacheEvent#getCallbackArgument() |
| */ |
| @Override |
| public Object getCallbackArgument() { |
| Object result = this.keyInfo.getCallbackArg(); |
| while (result instanceof WrappedCallbackArgument) { |
| WrappedCallbackArgument wca = (WrappedCallbackArgument) result; |
| result = wca.getOriginalCallbackArg(); |
| } |
| if (result == Token.NOT_AVAILABLE) { |
| result = AbstractRegion.handleNotAvailable(result); |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean isCallbackArgumentAvailable() { |
| return this.getRawCallbackArgument() != Token.NOT_AVAILABLE; |
| } |
| |
| /** |
| * Returns the value of the EntryEventImpl field. This is for internal use only. Customers should |
| * always call {@link #getCallbackArgument} |
| * |
| * @since GemFire 5.5 |
| */ |
| public Object getRawCallbackArgument() { |
| return this.keyInfo.getCallbackArg(); |
| } |
| |
| /** |
| * Sets the value of raw callback argument field. |
| */ |
| public void setRawCallbackArgument(Object newCallbackArgument) { |
| this.keyInfo.setCallbackArg(newCallbackArgument); |
| } |
| |
| public void setCallbackArgument(Object newCallbackArgument) { |
| if (this.keyInfo.getCallbackArg() instanceof WrappedCallbackArgument) { |
| ((WrappedCallbackArgument) this.keyInfo.getCallbackArg()) |
| .setOriginalCallbackArgument(newCallbackArgument); |
| } else { |
| this.keyInfo.setCallbackArg(newCallbackArgument); |
| } |
| } |
| |
| /** |
| * @return null if new value is not serialized; otherwise returns a SerializedCacheValueImpl |
| * containing the new value. |
| */ |
| @Override |
| public SerializedCacheValue<?> getSerializedNewValue() { |
| // In the case where there is a delta that has not been applied yet, |
| // do not apply it here since it would not produce a serialized new |
| // value (return null instead to indicate the new value is not |
| // in serialized form). |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| final Object tmp = basicGetNewValue(); |
| if (tmp instanceof CachedDeserializable) { |
| CachedDeserializable cd = (CachedDeserializable) tmp; |
| if (!cd.isSerialized()) { |
| return null; |
| } |
| byte[] bytes = this.newValueBytes; |
| if (bytes == null) { |
| bytes = this.cachedSerializedNewValue; |
| } |
| return new SerializedCacheValueImpl(this, getRegion(), this.re, cd, bytes); |
| } else { |
| // Note we return null even if cachedSerializedNewValue is not null. |
| // This is because some callers of this method use it to indicate |
| // that a CacheDeserializable should be created during deserialization. |
| return null; |
| } |
| } |
| |
| /** |
| * Implement this interface if you want to call {@link #exportNewValue}. |
| * |
| * |
| */ |
| public interface NewValueImporter { |
| /** |
| * @return true if the importer prefers the value to be in serialized form. |
| */ |
| boolean prefersNewSerialized(); |
| |
| /** |
| * Only return true if the importer can use the value before the event that exported it is |
| * released. If false is returned then off-heap values will be copied to the heap for the |
| * importer. |
| * |
| * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE. |
| */ |
| boolean isUnretainedNewReferenceOk(); |
| |
| /** |
| * Import a new value that is currently in object form. |
| * |
| * @param nv the new value to import; unretained if isUnretainedNewReferenceOk returns true |
| * @param isSerialized true if the imported new value represents data that needs to be |
| * serialized; false if the imported new value is a simple sequence of bytes. |
| */ |
| void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized); |
| |
| /** |
| * Import a new value that is currently in byte array form. |
| * |
| * @param nv the new value to import |
| * @param isSerialized true if the imported new value represents data that needs to be |
| * serialized; false if the imported new value is a simple sequence of bytes. |
| */ |
| void importNewBytes(byte[] nv, boolean isSerialized); |
| } |
| |
| /** |
| * Export the event's new value to the given importer. |
| */ |
| public void exportNewValue(NewValueImporter importer) { |
| final boolean prefersSerialized = importer.prefersNewSerialized(); |
| if (prefersSerialized) { |
| byte[] serializedNewValue = getCachedSerializedNewValue(); |
| if (serializedNewValue == null) { |
| serializedNewValue = this.newValueBytes; |
| } |
| if (serializedNewValue != null) { |
| importer.importNewBytes(serializedNewValue, true); |
| return; |
| } |
| } |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| final Object nv = getRawNewValue(); |
| if (nv instanceof StoredObject) { |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| final StoredObject so = (StoredObject) nv; |
| final boolean isSerialized = so.isSerialized(); |
| if (importer.isUnretainedNewReferenceOk()) { |
| importer.importNewObject(nv, isSerialized); |
| } else if (!isSerialized || prefersSerialized) { |
| byte[] bytes = so.getValueAsHeapByteArray(); |
| importer.importNewBytes(bytes, isSerialized); |
| if (isSerialized) { |
| setCachedSerializedNewValue(bytes); |
| } |
| } else { |
| importer.importNewObject(so.getValueAsDeserializedHeapObject(), true); |
| } |
| } else if (nv instanceof byte[]) { |
| importer.importNewBytes((byte[]) nv, false); |
| } else if (nv instanceof CachedDeserializable) { |
| CachedDeserializable cd = (CachedDeserializable) nv; |
| Object cdV = cd.getValue(); |
| if (cdV instanceof byte[]) { |
| importer.importNewBytes((byte[]) cdV, true); |
| setCachedSerializedNewValue((byte[]) cdV); |
| } else { |
| importer.importNewObject(cdV, true); |
| } |
| } else { |
| importer.importNewObject(nv, true); |
| } |
| } |
| |
| /** |
| * Implement this interface if you want to call {@link #exportOldValue}. |
| * |
| * |
| */ |
| public interface OldValueImporter { |
| /** |
| * @return true if the importer prefers the value to be in serialized form. |
| */ |
| boolean prefersOldSerialized(); |
| |
| /** |
| * Only return true if the importer can use the value before the event that exported it is |
| * released. |
| * |
| * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE. |
| */ |
| boolean isUnretainedOldReferenceOk(); |
| |
| /** |
| * @return return true if you want the old value to possibly be an instanceof |
| * CachedDeserializable; false if you want the value contained in a |
| * CachedDeserializable. |
| */ |
| boolean isCachedDeserializableValueOk(); |
| |
| /** |
| * Import an old value that is currently in object form. |
| * |
| * @param ov the old value to import; unretained if isUnretainedOldReferenceOk returns true |
| * @param isSerialized true if the imported old value represents data that needs to be |
| * serialized; false if the imported old value is a simple sequence of bytes. |
| */ |
| void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized); |
| |
| /** |
| * Import an old value that is currently in byte array form. |
| * |
| * @param ov the old value to import |
| * @param isSerialized true if the imported old value represents data that needs to be |
| * serialized; false if the imported old value is a simple sequence of bytes. |
| */ |
| void importOldBytes(byte[] ov, boolean isSerialized); |
| } |
| |
| /** |
| * Export the event's old value to the given importer. |
| */ |
| public void exportOldValue(OldValueImporter importer) { |
| final boolean prefersSerialized = importer.prefersOldSerialized(); |
| if (prefersSerialized) { |
| if (this.oldValueBytes != null) { |
| importer.importOldBytes(this.oldValueBytes, true); |
| return; |
| } |
| } |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| final Object ov = getRawOldValue(); |
| if (ov instanceof StoredObject) { |
| final StoredObject so = (StoredObject) ov; |
| final boolean isSerialized = so.isSerialized(); |
| if (importer.isUnretainedOldReferenceOk()) { |
| importer.importOldObject(ov, isSerialized); |
| } else if (!isSerialized || prefersSerialized) { |
| importer.importOldBytes(so.getValueAsHeapByteArray(), isSerialized); |
| } else { |
| importer.importOldObject(so.getValueAsDeserializedHeapObject(), true); |
| } |
| } else if (ov instanceof byte[]) { |
| importer.importOldBytes((byte[]) ov, false); |
| } else if (!importer.isCachedDeserializableValueOk() && ov instanceof CachedDeserializable) { |
| CachedDeserializable cd = (CachedDeserializable) ov; |
| Object cdV = cd.getValue(); |
| if (cdV instanceof byte[]) { |
| importer.importOldBytes((byte[]) cdV, true); |
| } else { |
| importer.importOldObject(cdV, true); |
| } |
| } else { |
| importer.importOldObject(AbstractRegion.handleNotAvailable(ov), true); |
| } |
| } |
| |
| /** |
| * Just like getRawNewValue(true) except if the raw new value is off-heap deserialize it. |
| */ |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| public Object getNewValueAsOffHeapDeserializedOrRaw() { |
| Object result = getRawNewValue(); |
| if (mayHaveOffHeapReferences() && result instanceof StoredObject) { |
| result = ((CachedDeserializable) result).getDeserializedForReading(); |
| } |
| return AbstractRegion.handleNotAvailable(result); // fixes 49499 |
| } |
| |
| /** |
| * If the new value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release). |
| * |
| * @return a retained OFF_HEAP_REFERENCE if the new value is off-heap; otherwise returns null |
| */ |
| @Retained(ENTRY_EVENT_NEW_VALUE) |
| public StoredObject getOffHeapNewValue() { |
| return convertToStoredObject(basicGetNewValue()); |
| } |
| |
| /** |
| * If the old value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release). |
| * |
| * @return a retained OFF_HEAP_REFERENCE if the old value is off-heap; otherwise returns null |
| */ |
| @Retained(ENTRY_EVENT_OLD_VALUE) |
| public StoredObject getOffHeapOldValue() { |
| return convertToStoredObject(basicGetOldValue()); |
| } |
| |
| private StoredObject convertToStoredObject(final Object tmp) { |
| if (!mayHaveOffHeapReferences()) { |
| return null; |
| } |
| if (!(tmp instanceof StoredObject)) { |
| return null; |
| } |
| StoredObject result = (StoredObject) tmp; |
| if (!result.retain()) { |
| return null; |
| } |
| return result; |
| } |
| |
| public Object getDeserializedValue() { |
| final Object val = basicGetNewValue(); |
| if (val instanceof CachedDeserializable) { |
| return ((CachedDeserializable) val).getDeserializedForReading(); |
| } else { |
| return val; |
| } |
| } |
| |
| public byte[] getSerializedValue() { |
| if (this.newValueBytes == null) { |
| final Object val; |
| val = basicGetNewValue(); |
| if (val instanceof byte[]) { |
| return (byte[]) val; |
| } else if (val instanceof CachedDeserializable) { |
| return ((CachedDeserializable) val).getSerializedValue(); |
| } |
| try { |
| return CacheServerHelper.serialize(val); |
| } catch (IOException ioe) { |
| throw new GemFireIOException("unexpected exception", ioe); |
| } |
| } else { |
| return this.newValueBytes; |
| } |
| } |
| |
| /** |
| * Forces this entry's new value to be in serialized form. |
| * |
| * @since GemFire 5.0.2 |
| */ |
| public void makeSerializedNewValue() { |
| makeSerializedNewValue(false); |
| } |
| |
| /** |
| * @param isSynced true if RegionEntry currently under synchronization |
| */ |
| private void makeSerializedNewValue(boolean isSynced) { |
| Object obj = basicGetNewValue(); |
| |
| // ezoerner:20080611 In the case where there is an unapplied |
| // delta, do not apply the delta or serialize yet unless entry is |
| // under synchronization (isSynced is true) |
| if (isSynced) { |
| this.setSerializationDeferred(false); |
| } |
| basicSetNewValue(getCachedDeserializable(obj, this), false); |
| } |
| |
| public static Object getCachedDeserializable(Object obj, EntryEventImpl ev) { |
| if (obj instanceof byte[] || obj == null || obj instanceof CachedDeserializable |
| || obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj) |
| // don't serialize delta object already serialized |
| || obj instanceof org.apache.geode.Delta) { // internal delta |
| return obj; |
| } |
| final CachedDeserializable cd; |
| // avoid unneeded serialization of byte[][] that |
| // will end up being deserialized in any case (serialization is cheap |
| // for byte[][] anyways) |
| if (obj instanceof byte[][]) { |
| int objSize = Sizeable.PER_OBJECT_OVERHEAD + 4; |
| for (byte[] bytes : (byte[][]) obj) { |
| if (bytes != null) { |
| objSize += CachedDeserializableFactory.getByteSize(bytes); |
| } else { |
| objSize += Sizeable.PER_OBJECT_OVERHEAD; |
| } |
| } |
| cd = CachedDeserializableFactory.create(obj, objSize, ev.getRegion().getCache()); |
| } else { |
| final byte[] b = serialize(obj); |
| cd = CachedDeserializableFactory.create(b, ev.getRegion().getCache()); |
| if (ev != null) { |
| ev.newValueBytes = b; |
| ev.cachedSerializedNewValue = b; |
| } |
| } |
| return cd; |
| } |
| |
| @Override |
| public void setCachedSerializedNewValue(byte[] v) { |
| this.cachedSerializedNewValue = v; |
| } |
| |
| @Override |
| public byte[] getCachedSerializedNewValue() { |
| return this.cachedSerializedNewValue; |
| } |
| |
| public void setSerializedNewValue(byte[] serializedValue) { |
| Object newVal = null; |
| if (serializedValue != null) { |
| newVal = CachedDeserializableFactory.create(serializedValue, getRegion().getCache()); |
| } |
| basicSetNewValue(newVal, false); |
| this.newValueBytes = serializedValue; |
| this.cachedSerializedNewValue = serializedValue; |
| } |
| |
| public void setSerializedOldValue(byte[] serializedOldValue) { |
| final Object ov; |
| if (serializedOldValue != null) { |
| ov = CachedDeserializableFactory.create(serializedOldValue, getRegion().getCache()); |
| } else { |
| ov = null; |
| } |
| retainAndSetOldValue(ov); |
| this.oldValueBytes = serializedOldValue; |
| } |
| |
| /** |
| * If true (the default) then preserve old values in events. If false then mark non-null values as |
| * being NOT_AVAILABLE. |
| */ |
| private static final boolean EVENT_OLD_VALUE = |
| !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disable-event-old-value"); |
| |
| protected boolean areOldValuesEnabled() { |
| return EVENT_OLD_VALUE; |
| } |
| |
| void putExistingEntry(final InternalRegion owner, RegionEntry entry) |
| throws RegionClearedException { |
| putExistingEntry(owner, entry, false, null); |
| } |
| |
| /** |
| * Put a newValue into the given, write synced, existing, region entry. Sets oldValue in event if |
| * hasn't been set yet. |
| * |
| * @param oldValueForDelta Used by Delta Propagation feature |
| */ |
| public void putExistingEntry(final InternalRegion owner, final RegionEntry reentry, |
| boolean requireOldValue, Object oldValueForDelta) throws RegionClearedException { |
| makeUpdate(); |
| // only set oldValue if it hasn't already been set to something |
| if (this.oldValue == null && this.oldValueBytes == null) { |
| if (!reentry.isInvalidOrRemoved()) { |
| if (requireOldValue || areOldValuesEnabled() || getRegion() instanceof HARegion) { |
| @Retained |
| Object ov; |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner()); |
| ov = reentry.getValueRetain(owner, true); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } else { |
| ov = reentry.getValueRetain(owner, true); |
| } |
| if (ov == null) { |
| ov = Token.NOT_AVAILABLE; |
| } |
| // ov has already been retained so call basicSetOldValue instead of retainAndSetOldValue |
| basicSetOldValue(ov); |
| } else { |
| basicSetOldValue(Token.NOT_AVAILABLE); |
| } |
| } |
| } |
| if (this.oldValue == Token.NOT_AVAILABLE) { |
| FilterProfile fp = getRegion().getFilterProfile(); |
| if (this.op.guaranteesOldValue() |
| || (fp != null /* #41532 */ && fp.entryRequiresOldValue(this.getKey()))) { |
| setOldValueForQueryProcessing(); |
| } |
| } |
| |
| // setNewValueInRegion(null); |
| setNewValueInRegion(owner, reentry, oldValueForDelta); |
| } |
| |
| /** |
| * If we are currently a create op then turn us into an update |
| * |
| * @since GemFire 5.0 |
| */ |
| public void makeUpdate() { |
| setOperation(this.op.getCorrespondingUpdateOp()); |
| } |
| |
| /** |
| * If we are currently an update op then turn us into a create |
| * |
| * @since GemFire 5.0 |
| */ |
| public void makeCreate() { |
| setOperation(this.op.getCorrespondingCreateOp()); |
| } |
| |
| /** |
| * Put a newValue into the given, write synced, new, region entry. |
| */ |
| public void putNewEntry(final InternalRegion owner, final RegionEntry reentry) |
| throws RegionClearedException { |
| if (!this.op.guaranteesOldValue()) { // preserves oldValue for CM ops in clients |
| basicSetOldValue(null); |
| } |
| makeCreate(); |
| setNewValueInRegion(owner, reentry, null); |
| } |
| |
| @Override |
| public void setRegionEntry(RegionEntry re) { |
| this.re = re; |
| } |
| |
| RegionEntry getRegionEntry() { |
| return this.re; |
| } |
| |
| @Retained(ENTRY_EVENT_NEW_VALUE) |
| private void setNewValueInRegion(final InternalRegion owner, final RegionEntry reentry, |
| Object oldValueForDelta) throws RegionClearedException { |
| |
| boolean wasTombstone = reentry.isTombstone(); |
| |
| // put in newValue |
| |
| // If event contains new value, then it may mean that the delta bytes should |
| // not be applied. This is possible if the event originated locally. |
| if (this.deltaBytes != null && this.newValue == null && this.newValueBytes == null) { |
| processDeltaBytes(oldValueForDelta); |
| } |
| |
| if (owner != null) { |
| owner.generateAndSetVersionTag(this, reentry); |
| } else { |
| getRegion().generateAndSetVersionTag(this, reentry); |
| } |
| |
| generateNewValueFromBytesIfNeeded(); |
| Object v = this.newValue; |
| if (v == null) { |
| v = isLocalInvalid() ? Token.LOCAL_INVALID : Token.INVALID; |
| } else { |
| getRegion().setRegionInvalid(false); |
| } |
| |
| reentry.setValueResultOfSearch(this.op.isNetSearch()); |
| |
| // dsmith:20090524 |
| // This is a horrible hack, but we need to get the size of the object |
| // When we store an entry. This code is only used when we do a put |
| // in the primary. |
| if (v instanceof org.apache.geode.Delta && getRegion().isUsedForPartitionedRegionBucket()) { |
| int vSize; |
| Object ov = basicGetOldValue(); |
| if (ov instanceof CachedDeserializable && !GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) { |
| vSize = ((CachedDeserializable) ov).getValueSizeInBytes(); |
| } else { |
| vSize = CachedDeserializableFactory.calcMemSize(v, getRegion().getObjectSizer(), false); |
| } |
| v = CachedDeserializableFactory.create(v, vSize, getRegion().getCache()); |
| basicSetNewValue(v, true); |
| } |
| |
| Object preparedV = reentry.prepareValueForCache(getRegion(), v, this, false); |
| if (preparedV != v) { |
| v = preparedV; |
| if (v instanceof StoredObject) { |
| if (!((StoredObject) v).isCompressed()) { // fix bug 52109 |
| // If we put it off heap and it is not compressed then remember that value. |
| // Otherwise we want to remember the decompressed value in the event. |
| basicSetNewValue(v, false); |
| } |
| } |
| } |
| boolean isTombstone = (v == Token.TOMBSTONE); |
| boolean success = false; |
| boolean calledSetValue = false; |
| try { |
| setNewValueBucketSize(owner, v); |
| |
| // ezoerner:20081030 |
| // last possible moment to do index maintenance with old value in |
| // RegionEntry before new value is set. |
| // As part of an update, this is a remove operation as prelude to an add that |
| // will come after the new value is set. |
| // If this is an "update" from INVALID state, treat this as a create instead |
| // for the purpose of index maintenance since invalid entries are not |
| // indexed. |
| |
| if ((this.op.isUpdate() && !reentry.isInvalid()) || this.op.isInvalidate()) { |
| IndexManager idxManager = |
| IndexUtils.getIndexManager(getRegion().getCache(), getRegion(), false); |
| if (idxManager != null) { |
| try { |
| idxManager.updateIndexes(reentry, IndexManager.REMOVE_ENTRY, |
| this.op.isUpdate() ? IndexProtocol.BEFORE_UPDATE_OP : IndexProtocol.OTHER_OP); |
| } catch (QueryException e) { |
| throw new IndexMaintenanceException(e); |
| } |
| } |
| } |
| calledSetValue = true; |
| reentry.setValueWithTombstoneCheck(v, this); // already called prepareValueForCache |
| success = true; |
| } finally { |
| if (!success && reentry instanceof OffHeapRegionEntry && v instanceof StoredObject) { |
| if (!calledSetValue) { |
| OffHeapHelper.release(v); |
| } else { |
| OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (StoredObject) v); |
| } |
| } |
| } |
| if (logger.isTraceEnabled()) { |
| if (v instanceof CachedDeserializable) { |
| logger.trace("EntryEventImpl.setNewValueInRegion: put CachedDeserializable({},{})", |
| this.getKey(), ((CachedDeserializable) v).getStringForm()); |
| } else { |
| logger.trace("EntryEventImpl.setNewValueInRegion: put({},{})", this.getKey(), |
| StringUtils.forceToString(v)); |
| } |
| } |
| |
| if (!isTombstone && wasTombstone) { |
| owner.unscheduleTombstone(reentry); |
| } |
| } |
| |
| /** |
| * The size the new value contributes to a pr bucket. Note if this event is not on a pr then this |
| * value will be 0. |
| */ |
| private transient int newValueBucketSize; |
| |
| public int getNewValueBucketSize() { |
| return this.newValueBucketSize; |
| } |
| |
| private void setNewValueBucketSize(InternalRegion lr, Object v) { |
| if (lr == null) { |
| lr = getRegion(); |
| } |
| this.newValueBucketSize = lr.calculateValueSize(v); |
| } |
| |
| private void processDeltaBytes(Object oldValueInVM) { |
| if (!getRegion().hasSeenEvent(this)) { |
| if (oldValueInVM == null || Token.isInvalidOrRemoved(oldValueInVM)) { |
| getRegion().getCachePerfStats().incDeltaFailedUpdates(); |
| throw new InvalidDeltaException("Old value not found for key " + this.keyInfo.getKey()); |
| } |
| FilterProfile fp = getRegion().getFilterProfile(); |
| // If compression is enabled then we've already gotten a new copy due to the |
| // serializaion and deserialization that occurs. |
| boolean copy = getRegion().getCompressor() == null && (getRegion().isCopyOnRead() |
| || getRegion().getCloningEnabled() || (fp != null && fp.getCqCount() > 0)); |
| Object value = oldValueInVM; |
| boolean wasCD = false; |
| if (value instanceof CachedDeserializable) { |
| wasCD = true; |
| if (copy) { |
| value = ((CachedDeserializable) value).getDeserializedWritableCopy(getRegion(), re); |
| } else { |
| value = ((CachedDeserializable) value).getDeserializedValue(getRegion(), re); |
| } |
| } else { |
| if (copy) { |
| value = CopyHelper.copy(value); |
| } |
| } |
| boolean deltaBytesApplied = false; |
| try { |
| long start = getRegion().getCachePerfStats().getTime(); |
| ((org.apache.geode.Delta) value) |
| .fromDelta(new ByteArrayDataInput(getDeltaBytes())); |
| getRegion().getCachePerfStats().endDeltaUpdate(start); |
| deltaBytesApplied = true; |
| } catch (RuntimeException rte) { |
| throw rte; |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| SystemFailure.checkFailure(); |
| throw new DeltaSerializationException("Exception while deserializing delta bytes.", t); |
| } finally { |
| if (!deltaBytesApplied) { |
| getRegion().getCachePerfStats().incDeltaFailedUpdates(); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Delta has been applied for key {}", getKey()); |
| } |
| // assert event.getNewValue() == null; |
| if (wasCD) { |
| CachedDeserializable old = (CachedDeserializable) oldValueInVM; |
| int valueSize; |
| if (GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) { |
| valueSize = |
| CachedDeserializableFactory.calcMemSize(value, getRegion().getObjectSizer(), false); |
| } else { |
| valueSize = old.getValueSizeInBytes(); |
| } |
| value = CachedDeserializableFactory.create(value, valueSize, getRegion().getCache()); |
| } |
| setNewValue(value); |
| if (this.causedByMessage != null && this.causedByMessage instanceof PutMessage) { |
| ((PutMessage) this.causedByMessage).setDeltaValObj(value); |
| } |
| } else { |
| getRegion().getCachePerfStats().incDeltaFailedUpdates(); |
| throw new InvalidDeltaException( |
| "Cache encountered replay of event containing delta bytes for key " |
| + this.keyInfo.getKey()); |
| } |
| } |
| |
| void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) { |
| if (Token.isInvalidOrRemoved(oldVal)) { |
| oldVal = null; |
| } else { |
| if (mustBeAvailable || oldVal == null || areOldValuesEnabled()) { |
| // set oldValue to oldVal |
| } else { |
| oldVal = Token.NOT_AVAILABLE; |
| } |
| } |
| retainAndSetOldValue(oldVal); |
| } |
| |
| void putValueTXEntry(final TXEntryState tx) { |
| Object v = basicGetNewValue(); |
| if (v == null) { |
| if (deltaBytes != null) { |
| // since newValue is null, and we have deltaBytes |
| // there must be a nearSidePendingValue |
| processDeltaBytes(tx.getNearSidePendingValue()); |
| v = basicGetNewValue(); |
| } else { |
| v = isLocalInvalid() ? Token.LOCAL_INVALID : Token.INVALID; |
| } |
| } |
| |
| if (this.op != Operation.LOCAL_INVALIDATE && this.op != Operation.LOCAL_DESTROY) { |
| // fix for bug 34387 |
| Object pv = v; |
| if (mayHaveOffHeapReferences()) { |
| pv = OffHeapHelper.copyIfNeeded(v, getRegion().getCache()); |
| } |
| tx.setPendingValue(pv); |
| } |
| tx.setCallbackArgument(getCallbackArgument()); |
| } |
| |
| public void setOldValueFromRegion() { |
| try { |
| RegionEntry re = getRegion().getRegionEntry(getKey()); |
| if (re == null) { |
| return; |
| } |
| ReferenceCountHelper.skipRefCountTracking(); |
| Object v = re.getValueRetain(getRegion(), true); |
| if (v == null) { |
| v = Token.NOT_AVAILABLE; |
| } |
| ReferenceCountHelper.unskipRefCountTracking(); |
| try { |
| setOldValue(v); |
| } finally { |
| if (mayHaveOffHeapReferences()) { |
| OffHeapHelper.releaseWithNoTracking(v); |
| } |
| } |
| } catch (EntryNotFoundException ignore) { |
| } |
| } |
| |
| /** Return true if old value is the DESTROYED token */ |
| boolean oldValueIsDestroyedToken() { |
| return this.oldValue == Token.DESTROYED || this.oldValue == Token.TOMBSTONE; |
| } |
| |
| public void setOldValueDestroyedToken() { |
| basicSetOldValue(Token.DESTROYED); |
| } |
| |
| public void setOldValue(Object v) { |
| setOldValue(v, false); |
| } |
| |
| |
| /** |
| * @param force true if the old value should be forcibly set, methods like putIfAbsent, etc., |
| * where the old value must be available. |
| */ |
| public void setOldValue(Object v, boolean force) { |
| if (v != null) { |
| if (Token.isInvalidOrRemoved(v)) { |
| v = null; |
| } else if (shouldOldValueBeUnavailable(v, force)) { |
| v = Token.NOT_AVAILABLE; |
| } |
| } |
| retainAndSetOldValue(v); |
| } |
| |
| private boolean shouldOldValueBeUnavailable(Object v, boolean force) { |
| if (force) { |
| return false; |
| } |
| if (areOldValuesEnabled()) { |
| return false; |
| } |
| if (getRegion() instanceof HARegion) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * sets the old value for concurrent map operation results received from a server. |
| */ |
| public void setConcurrentMapOldValue(Object v) { |
| if (Token.isRemoved(v)) { |
| return; |
| } else { |
| if (Token.isInvalid(v)) { |
| v = null; |
| } |
| retainAndSetOldValue(v); |
| } |
| } |
| |
| /** Return true if new value available */ |
| public boolean hasNewValue() { |
| if (this.newValueBytes != null) { |
| return true; |
| } |
| Object tmp = this.newValue; |
| return tmp != null && tmp != Token.NOT_AVAILABLE; |
| } |
| |
| public boolean hasOldValue() { |
| if (this.oldValueBytes != null) { |
| return true; |
| } |
| return this.oldValue != null && this.oldValue != Token.NOT_AVAILABLE; |
| } |
| |
| public boolean isOldValueAToken() { |
| return this.oldValue instanceof Token; |
| } |
| |
| @Override |
| public boolean isOldValueAvailable() { |
| if (isOriginRemote() && getRegion().isProxy()) { |
| return false; |
| } else { |
| return basicGetOldValue() != Token.NOT_AVAILABLE; |
| } |
| } |
| |
| public void oldValueNotAvailable() { |
| basicSetOldValue(Token.NOT_AVAILABLE); |
| } |
| |
| public static Object deserialize(byte[] bytes) { |
| return deserialize(bytes, null, null); |
| } |
| |
| public static Object deserialize(byte[] bytes, Version version, ByteArrayDataInput in) { |
| if (bytes == null) |
| return null; |
| try { |
| return BlobHelper.deserializeBlob(bytes, version, in); |
| } catch (IOException e) { |
| throw new SerializationException( |
| "An IOException was thrown while deserializing", |
| e); |
| } catch (ClassNotFoundException e) { |
| // fix for bug 43602 |
| throw new SerializationException( |
| "A ClassNotFoundException was thrown while trying to deserialize cached value.", |
| e); |
| } |
| } |
| |
| /** |
| * If a PdxInstance is returned then it will have an unretained reference to the StoredObject's |
| * off-heap address. |
| */ |
| public static @Unretained Object deserializeOffHeap(StoredObject bytes) { |
| if (bytes == null) |
| return null; |
| try { |
| return BlobHelper.deserializeOffHeapBlob(bytes); |
| } catch (IOException e) { |
| throw new SerializationException( |
| "An IOException was thrown while deserializing", |
| e); |
| } catch (ClassNotFoundException e) { |
| // fix for bug 43602 |
| throw new SerializationException( |
| "A ClassNotFoundException was thrown while trying to deserialize cached value.", |
| e); |
| } |
| } |
| |
| /** |
| * Serialize an object into a <code>byte[]</code> |
| * |
| * @throws IllegalArgumentException If <code>obj</code> should not be serialized |
| */ |
| public static byte[] serialize(Object obj) { |
| return serialize(obj, null); |
| } |
| |
| /** |
| * Serialize an object into a <code>byte[]</code> |
| * |
| * @throws IllegalArgumentException If <code>obj</code> should not be serialized |
| */ |
| public static byte[] serialize(Object obj, Version version) { |
| if (obj == null || obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj)) |
| throw new IllegalArgumentException( |
| String.format("Must not serialize %s in this context.", |
| obj)); |
| try { |
| return BlobHelper.serializeToBlob(obj, version); |
| } catch (IOException e) { |
| throw new SerializationException( |
| "An IOException was thrown while serializing.", |
| e); |
| } |
| } |
| |
| |
| /** |
| * Serialize an object into a <code>byte[]</code> . If the byte array provided by the wrapper is |
| * sufficient to hold the data, it is used otherwise a new byte array gets created & its reference |
| * is stored in the wrapper. The User Bit is also appropriately set as Serialized |
| * |
| * @param wrapper Object of type BytesAndBitsForCompactor which is used to fetch the serialized |
| * data. The byte array of the wrapper is used if possible else a the new byte array |
| * containing the data is set in the wrapper. |
| * @throws IllegalArgumentException If <code>obj</code> should not be serialized |
| */ |
| public static void fillSerializedValue(BytesAndBitsForCompactor wrapper, Object obj, |
| byte userBits) { |
| if (obj == null || obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj)) |
| throw new IllegalArgumentException( |
| String.format("Must not serialize %s in this context.", obj)); |
| try { |
| HeapDataOutputStream hdos = null; |
| if (wrapper.getBytes().length < 32) { |
| hdos = new HeapDataOutputStream(Version.CURRENT); |
| } else { |
| hdos = new HeapDataOutputStream(wrapper.getBytes()); |
| } |
| DataSerializer.writeObject(obj, hdos); |
| // return hdos.toByteArray(); |
| hdos.sendTo(wrapper, userBits); |
| } catch (IOException e) { |
| RuntimeException e2 = new IllegalArgumentException( |
| "An IOException was thrown while serializing."); |
| e2.initCause(e); |
| throw e2; |
| } |
| } |
| |
| protected String getShortClassName() { |
| String cname = getClass().getName(); |
| return cname.substring(getClass().getPackage().getName().length() + 1); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder buf = new StringBuilder(); |
| buf.append(getShortClassName()); |
| buf.append("["); |
| |
| buf.append("op="); |
| buf.append(getOperation()); |
| buf.append(";region="); |
| buf.append(getRegion().getFullPath()); |
| buf.append(";key="); |
| buf.append(this.getKey()); |
| if (Boolean.getBoolean("gemfire.insecure-logvalues")) { |
| buf.append(";oldValue="); |
| if (mayHaveOffHeapReferences()) { |
| synchronized (this.offHeapLock) { |
| try { |
| ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf); |
| } catch (IllegalStateException ignore) { |
| buf.append("OFFHEAP_VALUE_FREED"); |
| } |
| } |
| } else { |
| ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf); |
| } |
| |
| buf.append(";newValue="); |
| if (mayHaveOffHeapReferences()) { |
| synchronized (this.offHeapLock) { |
| try { |
| ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf); |
| } catch (IllegalStateException ignore) { |
| buf.append("OFFHEAP_VALUE_FREED"); |
| } |
| } |
| } else { |
| ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf); |
| } |
| } |
| buf.append(";callbackArg="); |
| buf.append(this.getRawCallbackArgument()); |
| buf.append(";originRemote="); |
| buf.append(isOriginRemote()); |
| buf.append(";originMember="); |
| buf.append(getDistributedMember()); |
| if (this.isPossibleDuplicate()) { |
| buf.append(";posDup"); |
| } |
| if (callbacksInvoked()) { |
| buf.append(";callbacksInvoked"); |
| } |
| if (inhibitCacheListenerNotification()) { |
| buf.append(";inhibitCacheListenerNotification"); |
| } |
| if (this.versionTag != null) { |
| buf.append(";version=").append(this.versionTag); |
| } |
| if (getContext() != null) { |
| buf.append(";context="); |
| buf.append(getContext()); |
| } |
| if (this.eventID != null) { |
| buf.append(";id="); |
| buf.append(this.eventID); |
| } |
| if (this.deltaBytes != null) { |
| buf.append(";[").append(this.deltaBytes.length).append(" deltaBytes]"); |
| } |
| if (this.filterInfo != null) { |
| buf.append(";routing="); |
| buf.append(this.filterInfo); |
| } |
| if (this.isFromServer()) { |
| buf.append(";isFromServer"); |
| } |
| if (this.isConcurrencyConflict()) { |
| buf.append(";isInConflict"); |
| } |
| if (this.getInhibitDistribution()) { |
| buf.append(";inhibitDistribution"); |
| } |
| if (this.tailKey != -1) { |
| buf.append(";tailKey=" + tailKey); |
| } |
| buf.append("]"); |
| return buf.toString(); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return ENTRY_EVENT; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| context.getSerializer().writeObject(this.eventID, out); |
| context.getSerializer().writeObject(this.getKey(), out); |
| context.getSerializer().writeObject(this.keyInfo.getValue(), out); |
| out.writeByte(this.op.ordinal); |
| out.writeShort(this.eventFlags & EventFlags.FLAG_TRANSIENT_MASK); |
| context.getSerializer().writeObject(this.getRawCallbackArgument(), out); |
| context.getSerializer().writeObject(this.txId, out); |
| |
| { |
| out.writeBoolean(false); |
| { |
| Object nv = basicGetNewValue(); |
| boolean newValueSerialized = nv instanceof CachedDeserializable; |
| if (newValueSerialized) { |
| newValueSerialized = ((CachedDeserializable) nv).isSerialized(); |
| } |
| out.writeBoolean(newValueSerialized); |
| if (newValueSerialized) { |
| if (this.newValueBytes != null) { |
| DataSerializer.writeByteArray(this.newValueBytes, out); |
| } else if (this.cachedSerializedNewValue != null) { |
| DataSerializer.writeByteArray(this.cachedSerializedNewValue, out); |
| } else { |
| CachedDeserializable cd = (CachedDeserializable) nv; |
| DataSerializer.writeObjectAsByteArray(cd.getValue(), out); |
| } |
| } else { |
| context.getSerializer().writeObject(nv, out); |
| } |
| } |
| } |
| |
| { |
| Object ov = basicGetOldValue(); |
| boolean oldValueSerialized = ov instanceof CachedDeserializable; |
| if (oldValueSerialized) { |
| oldValueSerialized = ((CachedDeserializable) ov).isSerialized(); |
| } |
| out.writeBoolean(oldValueSerialized); |
| if (oldValueSerialized) { |
| if (this.oldValueBytes != null) { |
| DataSerializer.writeByteArray(this.oldValueBytes, out); |
| } else { |
| CachedDeserializable cd = (CachedDeserializable) ov; |
| DataSerializer.writeObjectAsByteArray(cd.getValue(), out); |
| } |
| } else { |
| ov = AbstractRegion.handleNotAvailable(ov); |
| context.getSerializer().writeObject(ov, out); |
| } |
| } |
| InternalDataSerializer.invokeToData((InternalDistributedMember) this.distributedMember, out); |
| context.getSerializer().writeObject(getContext(), out); |
| DataSerializer.writeLong(tailKey, out); |
| } |
| |
| private abstract static class EventFlags { |
| private static final short FLAG_ORIGIN_REMOTE = 0x01; |
| // localInvalid: true if a null new value should be treated as a local |
| // invalid. |
| private static final short FLAG_LOCAL_INVALID = 0x02; |
| private static final short FLAG_GENERATE_CALLBACKS = 0x04; |
| private static final short FLAG_POSSIBLE_DUPLICATE = 0x08; |
| private static final short FLAG_INVOKE_PR_CALLBACKS = 0x10; |
| private static final short FLAG_CONCURRENCY_CONFLICT = 0x20; |
| private static final short FLAG_INHIBIT_LISTENER_NOTIFICATION = 0x40; |
| private static final short FLAG_CALLBACKS_INVOKED = 0x80; |
| private static final short FLAG_ISCREATE = 0x100; |
| private static final short FLAG_SERIALIZATION_DEFERRED = 0x200; |
| private static final short FLAG_FROM_SERVER = 0x400; |
| private static final short FLAG_FROM_RI_LOCAL_DESTROY = 0x800; |
| private static final short FLAG_INHIBIT_DISTRIBUTION = 0x1000; |
| private static final short FLAG_REDESTROYED_TOMBSTONE = 0x2000; |
| private static final short FLAG_INHIBIT_ALL_NOTIFICATIONS = 0x4000; |
| |
| /** mask for clearing transient flags when serializing */ |
| private static final short FLAG_TRANSIENT_MASK = ~(FLAG_CALLBACKS_INVOKED | FLAG_ISCREATE |
| | FLAG_INHIBIT_LISTENER_NOTIFICATION | FLAG_SERIALIZATION_DEFERRED | FLAG_FROM_SERVER |
| | FLAG_FROM_RI_LOCAL_DESTROY | FLAG_INHIBIT_DISTRIBUTION | FLAG_REDESTROYED_TOMBSTONE); |
| |
| protected static boolean isSet(short flags, short mask) { |
| return (flags & mask) != 0; |
| } |
| |
| /** WARNING: Does not set the bit in place, returns new short with bit set */ |
| protected static short set(short flags, short mask, boolean on) { |
| return (short) (on ? (flags | mask) : (flags & ~mask)); |
| } |
| } |
| |
| /** |
| * @return null if old value is not serialized; otherwise returns a SerializedCacheValueImpl |
| * containing the old value. |
| */ |
| @Override |
| public SerializedCacheValue<?> getSerializedOldValue() { |
| @Unretained(ENTRY_EVENT_OLD_VALUE) |
| final Object tmp = basicGetOldValue(); |
| if (tmp instanceof CachedDeserializable) { |
| CachedDeserializable cd = (CachedDeserializable) tmp; |
| if (!cd.isSerialized()) { |
| return null; |
| } |
| return new SerializedCacheValueImpl(this, getRegion(), this.re, cd, this.oldValueBytes); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Compute an estimate of the size of the new value for a PR. Since PR's always store values in a |
| * cached deserializable we need to compute its size as a blob. |
| * |
| * @return the size of serialized bytes for the new value |
| */ |
| public int getNewValSizeForPR() { |
| int newSize = 0; |
| Object v = basicGetNewValue(); |
| if (v != null) { |
| try { |
| newSize = CachedDeserializableFactory.calcSerializedSize(v) |
| + CachedDeserializableFactory.overhead(); |
| } catch (IllegalArgumentException iae) { |
| logger.warn("DataStore failed to calculate size of new value", |
| iae); |
| newSize = 0; |
| } |
| } |
| return newSize; |
| } |
| |
| /** |
| * Compute an estimate of the size of the old value |
| * |
| * @return the size of serialized bytes for the old value |
| */ |
| public int getOldValSize() { |
| int oldSize = 0; |
| if (hasOldValue()) { |
| try { |
| oldSize = CachedDeserializableFactory.calcMemSize(basicGetOldValue()); |
| } catch (IllegalArgumentException iae) { |
| logger.warn("DataStore failed to calculate size of old value", |
| iae); |
| oldSize = 0; |
| } |
| } |
| return oldSize; |
| } |
| |
| @Override |
| public EnumListenerEvent getEventType() { |
| return this.eventType; |
| } |
| |
| /** |
| * Sets the operation type. |
| */ |
| @Override |
| public void setEventType(EnumListenerEvent eventType) { |
| this.eventType = eventType; |
| } |
| |
| /** |
| * set this to true after dispatching the event to a cache listener |
| */ |
| public void callbacksInvoked(boolean dispatched) { |
| setEventFlag(EventFlags.FLAG_CALLBACKS_INVOKED, dispatched); |
| } |
| |
| /** |
| * has this event been dispatched to a cache listener? |
| */ |
| public boolean callbacksInvoked() { |
| return testEventFlag(EventFlags.FLAG_CALLBACKS_INVOKED); |
| } |
| |
| /** |
| * set this to true to inhibit application cache listener notification during event dispatching |
| */ |
| public void inhibitCacheListenerNotification(boolean inhibit) { |
| setEventFlag(EventFlags.FLAG_INHIBIT_LISTENER_NOTIFICATION, inhibit); |
| } |
| |
| /** |
| * are events being inhibited from dispatch to application cache listeners for this event? |
| */ |
| public boolean inhibitCacheListenerNotification() { |
| return testEventFlag(EventFlags.FLAG_INHIBIT_LISTENER_NOTIFICATION); |
| } |
| |
| |
| /** |
| * dispatch listener events for this event |
| * |
| * @param notifyGateways pass the event on to WAN queues |
| */ |
| public void invokeCallbacks(InternalRegion rgn, boolean skipListeners, boolean notifyGateways) { |
| if (!callbacksInvoked()) { |
| callbacksInvoked(true); |
| if (this.op.isUpdate()) { |
| rgn.invokePutCallbacks(EnumListenerEvent.AFTER_UPDATE, this, !skipListeners, |
| notifyGateways); // gateways are notified in part2 processing |
| } else if (this.op.isCreate()) { |
| rgn.invokePutCallbacks(EnumListenerEvent.AFTER_CREATE, this, !skipListeners, |
| notifyGateways); |
| } else if (this.op.isDestroy()) { |
| rgn.invokeDestroyCallbacks(EnumListenerEvent.AFTER_DESTROY, this, !skipListeners, |
| notifyGateways); |
| } else if (this.op.isInvalidate()) { |
| rgn.invokeInvalidateCallbacks(EnumListenerEvent.AFTER_INVALIDATE, this, !skipListeners); |
| } |
| } |
| } |
| |
| private void setFromRILocalDestroy(boolean on) { |
| setEventFlag(EventFlags.FLAG_FROM_RI_LOCAL_DESTROY, on); |
| } |
| |
| public boolean isFromRILocalDestroy() { |
| return testEventFlag(EventFlags.FLAG_FROM_RI_LOCAL_DESTROY); |
| } |
| |
| protected Long tailKey = -1L; |
| |
| /** |
| * Used to store next region version generated for a change on this entry by phase-1 commit on the |
| * primary. |
| * |
| * Not to be used in fromData and toData |
| */ |
| protected transient long nextRegionVersion = -1L; |
| |
| public void setNextRegionVersion(long regionVersion) { |
| this.nextRegionVersion = regionVersion; |
| } |
| |
| public long getNextRegionVersion() { |
| return this.nextRegionVersion; |
| } |
| |
| /** |
| * Return true if this event came from a server by the client doing a get. |
| * |
| * @since GemFire 5.7 |
| */ |
| public boolean isFromServer() { |
| return testEventFlag(EventFlags.FLAG_FROM_SERVER); |
| } |
| |
| /** |
| * Sets the fromServer flag to v. This must be set to true if an event comes from a server while |
| * the affected region entry is not locked. Among other things it causes version conflict checks |
| * to be performed to protect against overwriting a newer version of the entry. |
| * |
| * @since GemFire 5.7 |
| */ |
| public void setFromServer(boolean v) { |
| setEventFlag(EventFlags.FLAG_FROM_SERVER, v); |
| } |
| |
| /** |
| * If true, the region associated with this event had already applied the operation it |
| * encapsulates when an attempt was made to apply the event. |
| * |
| * @return the possibleDuplicate |
| */ |
| public boolean isPossibleDuplicate() { |
| return testEventFlag(EventFlags.FLAG_POSSIBLE_DUPLICATE); |
| } |
| |
| /** |
| * If the operation encapsulated by this event has already been seen by the region to which it |
| * pertains, this flag should be set to true. |
| * |
| * @param possibleDuplicate the possibleDuplicate to set |
| */ |
| public void setPossibleDuplicate(boolean possibleDuplicate) { |
| setEventFlag(EventFlags.FLAG_POSSIBLE_DUPLICATE, possibleDuplicate); |
| } |
| |
| |
| /** |
| * are events being inhibited from dispatch to to gateway/async queues, client queues, cache |
| * listener and cache write. If set, sending notifications for the data that is read from a |
| * persistent store (HDFS) and is being reinserted in the cache is skipped. |
| */ |
| public boolean inhibitAllNotifications() { |
| return testEventFlag(EventFlags.FLAG_INHIBIT_ALL_NOTIFICATIONS); |
| |
| } |
| |
| /** |
| * set this to true to inhibit notifications that are sent to gateway/async queues, client queues, |
| * cache listener and cache write. This is used to skip sending notifications for the data that is |
| * read from a persistent store (HDFS) and is being reinserted in the cache |
| */ |
| public void setInhibitAllNotifications(boolean inhibit) { |
| setEventFlag(EventFlags.FLAG_INHIBIT_ALL_NOTIFICATIONS, inhibit); |
| } |
| |
| /** |
| * sets the routing information for cache clients |
| */ |
| @Override |
| public void setLocalFilterInfo(FilterInfo info) { |
| this.filterInfo = info; |
| } |
| |
| /** |
| * retrieves the routing information for cache clients in this VM |
| */ |
| @Override |
| public FilterInfo getLocalFilterInfo() { |
| return this.filterInfo; |
| } |
| |
| /** |
| * This method returns the delta bytes used in Delta Propagation feature. <B>For internal delta, |
| * see getRawNewValue().</B> |
| * |
| * @return delta bytes |
| */ |
| public byte[] getDeltaBytes() { |
| return deltaBytes; |
| } |
| |
| /** |
| * This method sets the delta bytes used in Delta Propagation feature. <B>For internal delta, see |
| * setNewValue().</B> |
| */ |
| public void setDeltaBytes(byte[] deltaBytes) { |
| this.deltaBytes = deltaBytes; |
| } |
| |
| // TODO (ashetkar) Can this.op.isCreate() be used instead? |
| public boolean isCreate() { |
| return testEventFlag(EventFlags.FLAG_ISCREATE); |
| } |
| |
| /** |
| * this is used to distinguish an event that merely has Operation.CREATE from one that originated |
| * from Region.create() for delta processing purposes. |
| */ |
| public EntryEventImpl setCreate(boolean isCreate) { |
| setEventFlag(EventFlags.FLAG_ISCREATE, isCreate); |
| return this; |
| } |
| |
| /** |
| * @return the keyInfo |
| */ |
| public KeyInfo getKeyInfo() { |
| return keyInfo; |
| } |
| |
| public void setKeyInfo(KeyInfo keyInfo) { |
| this.keyInfo = keyInfo; |
| } |
| |
| /** |
| * establish the old value in this event as the current cache value, whether in memory or on disk |
| */ |
| public void setOldValueForQueryProcessing() { |
| RegionEntry reentry = getRegion().getRegionMap().getEntry(this.getKey()); |
| if (reentry != null) { |
| @Retained |
| Object v = reentry.getValueOffHeapOrDiskWithoutFaultIn(getRegion()); |
| if (!(v instanceof Token)) { |
| // v has already been retained. |
| basicSetOldValue(v); |
| // this event now owns the retention of v. |
| } |
| } |
| } |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| return null; |
| } |
| |
| /** |
| * @param versionTag the versionTag to set |
| */ |
| public void setVersionTag(VersionTag versionTag) { |
| this.versionTag = versionTag; |
| } |
| |
| /** |
| * @return the concurrency versioning tag for this event, if any |
| */ |
| @Override |
| public VersionTag getVersionTag() { |
| return this.versionTag; |
| } |
| |
| /** |
| * @return if there's no valid version tag for this event |
| */ |
| public boolean hasValidVersionTag() { |
| return this.versionTag != null && this.versionTag.hasValidVersion(); |
| } |
| |
| /** |
| * this method joins together version tag timestamps and the "lastModified" timestamps generated |
| * and stored in entries. If a change does not already carry a lastModified timestamp |
| * |
| * @return the timestamp to store in the entry |
| */ |
| public long getEventTime(long suggestedTime) { |
| long result = suggestedTime; |
| if (this.versionTag != null && getRegion().getConcurrencyChecksEnabled()) { |
| if (suggestedTime != 0) { |
| this.versionTag.setVersionTimeStamp(suggestedTime); |
| } else { |
| result = this.versionTag.getVersionTimeStamp(); |
| } |
| } |
| if (result <= 0) { |
| InternalRegion region = this.getRegion(); |
| if (region != null) { |
| result = region.cacheTimeMillis(); |
| } else { |
| result = System.currentTimeMillis(); |
| } |
| } |
| return result; |
| } |
| |
| public static class SerializedCacheValueImpl |
| implements SerializedCacheValue, CachedDeserializable, Sendable { |
| private final EntryEventImpl event; |
| @Unretained |
| private final CachedDeserializable cd; |
| private final Region r; |
| private final RegionEntry re; |
| private final byte[] serializedValue; |
| |
| SerializedCacheValueImpl(EntryEventImpl event, Region r, RegionEntry re, |
| @Unretained CachedDeserializable cd, byte[] serializedBytes) { |
| if (event.isOffHeapReference(cd)) { |
| this.event = event; |
| } else { |
| this.event = null; |
| } |
| this.r = r; |
| this.re = re; |
| this.cd = cd; |
| this.serializedValue = serializedBytes; |
| } |
| |
| @Override |
| public byte[] getSerializedValue() { |
| if (this.serializedValue != null) { |
| return this.serializedValue; |
| } |
| return callWithOffHeapLock(cd -> { |
| return cd.getSerializedValue(); |
| }); |
| } |
| |
| private CachedDeserializable getCd() { |
| if (this.event != null && !this.event.offHeapOk) { |
| throw new IllegalStateException( |
| "Attempt to access off heap value after the EntryEvent was released."); |
| } |
| return this.cd; |
| } |
| |
| /** |
| * The only methods that need to use this method are those on the external SerializedCacheValue |
| * interface and any other method that a customer could call that may access the off-heap |
| * values. For example if toString was implemented on this class to access the value then it |
| * would need to use this method. |
| */ |
| private <R> R callWithOffHeapLock(Function<CachedDeserializable, R> function) { |
| if (this.event != null) { |
| // this call does not use getCd() to access this.cd |
| // because the check for offHeapOk is done by event.callWithOffHeapLock |
| return this.event.callWithOffHeapLock(this.cd, function); |
| } else { |
| return function.apply(getCd()); |
| } |
| } |
| |
| @Override |
| public Object getDeserializedValue() { |
| return getDeserializedValue(this.r, this.re); |
| } |
| |
| @Override |
| public Object getDeserializedForReading() { |
| return getCd().getDeserializedForReading(); |
| } |
| |
| @Override |
| public Object getDeserializedWritableCopy(Region rgn, RegionEntry entry) { |
| return getCd().getDeserializedWritableCopy(rgn, entry); |
| } |
| |
| @Override |
| public Object getDeserializedValue(Region rgn, RegionEntry reentry) { |
| return callWithOffHeapLock(cd -> { |
| return cd.getDeserializedValue(rgn, reentry); |
| }); |
| } |
| |
| @Override |
| public Object getValue() { |
| if (this.serializedValue != null) { |
| return this.serializedValue; |
| } |
| return getCd().getValue(); |
| } |
| |
| @Override |
| public void writeValueAsByteArray(DataOutput out) throws IOException { |
| if (this.serializedValue != null) { |
| DataSerializer.writeByteArray(this.serializedValue, out); |
| } else { |
| getCd().writeValueAsByteArray(out); |
| } |
| } |
| |
| @Override |
| public void fillSerializedValue(BytesAndBitsForCompactor wrapper, byte userBits) { |
| if (this.serializedValue != null) { |
| wrapper.setData(this.serializedValue, userBits, this.serializedValue.length, |
| false /* Not Reusable as it refers to underlying value */); |
| } else { |
| getCd().fillSerializedValue(wrapper, userBits); |
| } |
| } |
| |
| @Override |
| public int getValueSizeInBytes() { |
| return getCd().getValueSizeInBytes(); |
| } |
| |
| @Override |
| public int getSizeInBytes() { |
| return getCd().getSizeInBytes(); |
| } |
| |
| @Override |
| public String getStringForm() { |
| return getCd().getStringForm(); |
| } |
| |
| @Override |
| public void sendTo(DataOutput out) throws IOException { |
| DataSerializer.writeObject(getCd(), out); |
| } |
| |
| @Override |
| public boolean isSerialized() { |
| return getCd().isSerialized(); |
| } |
| |
| @Override |
| public boolean usesHeapForStorage() { |
| return getCd().usesHeapForStorage(); |
| } |
| } |
| ////////////////////////////////////////////////////////////////////////////////////////// |
| |
| public void setTailKey(Long tailKey) { |
| this.tailKey = tailKey; |
| } |
| |
| public Long getTailKey() { |
| return this.tailKey; |
| } |
| |
| private Thread invokeCallbacksThread; |
| |
| /** |
| * Mark this event as having its callbacks invoked by the current thread. Note this is done just |
| * before the actual invocation of the callbacks. |
| */ |
| public void setCallbacksInvokedByCurrentThread() { |
| this.invokeCallbacksThread = Thread.currentThread(); |
| } |
| |
| /** |
| * Return true if this event was marked as having its callbacks invoked by the current thread. |
| */ |
| public boolean getCallbacksInvokedByCurrentThread() { |
| if (this.invokeCallbacksThread == null) |
| return false; |
| return Thread.currentThread().equals(this.invokeCallbacksThread); |
| } |
| |
| /** |
| * Returns whether this event is on the PDX type region. |
| * |
| * @return whether this event is on the PDX type region |
| */ |
| public boolean isOnPdxTypeRegion() { |
| return PeerTypeRegistration.REGION_FULL_PATH.equals(getRegion().getFullPath()); |
| } |
| |
| /** |
| * returns true if it is okay to process this event even though it has a null version |
| */ |
| public boolean noVersionReceivedFromServer() { |
| return versionTag == null && getRegion().getConcurrencyChecksEnabled() |
| && getRegion().getServerProxy() != null && !op.isLocal() && !isOriginRemote(); |
| } |
| |
| /** returns a copy of this event with the additional fields for WAN conflict resolution */ |
| @Retained |
| public TimestampedEntryEvent getTimestampedEvent(final int newDSID, final int oldDSID, |
| final long newTimestamp, final long oldTimestamp) { |
| return new TimestampedEntryEventImpl(this, newDSID, oldDSID, newTimestamp, oldTimestamp); |
| } |
| |
| private void setSerializationDeferred(boolean serializationDeferred) { |
| setEventFlag(EventFlags.FLAG_SERIALIZATION_DEFERRED, serializationDeferred); |
| } |
| |
| private boolean isSerializationDeferred() { |
| return testEventFlag(EventFlags.FLAG_SERIALIZATION_DEFERRED); |
| } |
| |
| public boolean isSingleHop() { |
| return (this.causedByMessage != null && this.causedByMessage instanceof RemoteOperationMessage); |
| } |
| |
| public boolean isSingleHopPutOp() { |
| return (this.causedByMessage != null && this.causedByMessage instanceof RemotePutMessage); |
| } |
| |
| /** |
| * True if it is ok to use old/new values that are stored off heap. False if an exception should |
| * be thrown if an attempt is made to access old/new offheap values. |
| */ |
| transient boolean offHeapOk = true; |
| |
| @Override |
| @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) |
| public void release() { |
| // noop if already freed or values can not be off-heap |
| if (!this.offHeapOk) |
| return; |
| if (!mayHaveOffHeapReferences()) { |
| return; |
| } |
| synchronized (this.offHeapLock) { |
| // Note that this method does not set the old/new values to null but |
| // leaves them set to the off-heap value so that future calls to getOld/NewValue |
| // will fail with an exception. |
| testHookReleaseInProgress(); |
| Object ov = basicGetOldValue(); |
| Object nv = basicGetNewValue(); |
| this.offHeapOk = false; |
| |
| if (ov instanceof StoredObject) { |
| // this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + |
| // System.identityHashCode(ov)); |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner()); |
| ((Releasable) ov).release(); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } else { |
| ((Releasable) ov).release(); |
| } |
| } |
| OffHeapHelper.releaseAndTrackOwner(nv, this); |
| } |
| } |
| |
| /** |
| * Return true if this EntryEvent may have off-heap references. |
| */ |
| private boolean mayHaveOffHeapReferences() { |
| if (this.offHeapLock == null) { |
| return false; |
| } |
| |
| InternalRegion lr = getRegion(); |
| if (lr != null) { |
| return lr.getOffHeap(); |
| } |
| // if region field is null it is possible that we have off-heap values |
| return true; |
| } |
| |
| void testHookReleaseInProgress() { |
| // unit test can mock or override this method |
| } |
| |
| /** |
| * Make sure that this event will never own an off-heap value. Once this is called on an event it |
| * does not need to have release called. |
| */ |
| public void disallowOffHeapValues() { |
| if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) { |
| throw new IllegalStateException("This event already has off-heap values"); |
| } |
| if (mayHaveOffHeapReferences()) { |
| synchronized (this.offHeapLock) { |
| this.offHeapOk = false; |
| } |
| } else { |
| this.offHeapOk = false; |
| } |
| |
| } |
| |
| /** |
| * This copies the off-heap new and/or old value to the heap. As a result the current off-heap |
| * new/old will be released. |
| */ |
| @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) |
| public void copyOffHeapToHeap() { |
| if (!mayHaveOffHeapReferences()) { |
| this.offHeapOk = false; |
| return; |
| } |
| synchronized (this.offHeapLock) { |
| Object ov = basicGetOldValue(); |
| if (StoredObject.isOffHeapReference(ov)) { |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner()); |
| this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov, getRegion().getCache()); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } else { |
| this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov, getRegion().getCache()); |
| } |
| } |
| Object nv = basicGetNewValue(); |
| if (StoredObject.isOffHeapReference(nv)) { |
| ReferenceCountHelper.setReferenceCountOwner(this); |
| this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv, getRegion().getCache()); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } |
| if (StoredObject.isOffHeapReference(this.newValue) |
| || StoredObject.isOffHeapReference(this.oldValue)) { |
| throw new IllegalStateException( |
| "event's old/new value still off-heap after calling copyOffHeapToHeap"); |
| } |
| this.offHeapOk = false; |
| } |
| } |
| |
| public boolean isOldValueOffHeap() { |
| return isOffHeapReference(this.oldValue); |
| } |
| |
| /** |
| * If region is currently a bucket |
| * then change it to be the partitioned region that owns that bucket. |
| * Otherwise do nothing. |
| */ |
| public void changeRegionToBucketsOwner() { |
| if (getRegion().isUsedForPartitionedRegionBucket()) { |
| setRegion(getRegion().getPartitionedRegion()); |
| } |
| } |
| } |