| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.versions; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.nio.BufferUnderflowException; |
| import java.util.Objects; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.cache.persistence.DiskStoreID; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DataSerializableFixedID; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.size.ReflectionSingleObjectSizer; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * VersionTags are sent with distribution messages and carry version info for the operation. |
| * <p/> |
| * Note that on the receiving end the membership IDs in a version tag will not be references to |
| * canonical IDs and should be made so before storing them for any length of time. |
| * <p/> |
| * This class implements java.io.Serializable for dunit testing. It should not otherwise be |
| * serialized with that mechanism. |
| * |
| */ |
| public abstract class VersionTag<T extends VersionSource> |
| implements DataSerializableFixedID, java.io.Serializable, VersionHolder<T> { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final long serialVersionUID = 9098338414308465271L; |
| |
| // tag_size represents the tag, but does not count member ID sizes since they are |
| // interned in the region version vectors |
| public static final int TAG_SIZE = |
| ReflectionSingleObjectSizer.OBJECT_SIZE + ReflectionSingleObjectSizer.REFERENCE_SIZE * 2 + 23; |
| |
| /** |
| * A timestamp that cannot exist due to range restrictions. This is used to mark a timestamp as |
| * not being real |
| */ |
| public static final long ILLEGAL_VERSION_TIMESTAMP = 0x8000000000000000l; |
| |
| |
| // flags for serialization |
| static final int HAS_MEMBER_ID = 0x01; |
| static final int HAS_PREVIOUS_MEMBER_ID = 0x02; |
| static final int VERSION_TWO_BYTES = 0x04; |
| static final int DUPLICATE_MEMBER_IDS = 0x08; |
| static final int HAS_RVV_HIGH_BYTE = 0x10; |
| |
| private static final int BITS_POSDUP = 0x01; |
| private static final int BITS_RECORDED = 0x02; // has the rvv recorded this? |
| private static final int BITS_HAS_PREVIOUS_ID = 0x04; |
| private static final int BITS_GATEWAY_TAG = 0x08; |
| private static final int BITS_IS_REMOTE_TAG = 0x10; |
| private static final int BITS_TIMESTAMP_APPLIED = 0x20; |
| |
| private static final int BITS_ALLOWED_BY_RESOLVER = 0x40; |
| // Note: the only valid BITS_* are 0xFFFF. |
| |
| /** |
| * the per-entry version number for the operation |
| */ |
| private int entryVersion; |
| |
| /** |
| * high byte for large region version numbers |
| */ |
| private short regionVersionHighBytes; |
| |
| /** |
| * low bytes for region version numbers |
| */ |
| private int regionVersionLowBytes; |
| |
| /** |
| * time stamp |
| */ |
| private long timeStamp; |
| |
| /** |
| * distributed system ID |
| */ |
| private byte distributedSystemId; |
| |
| // In GEODE-1252 we found that the bits field |
| // was concurrently modified by calls to |
| // setPreviousMemberID and setRecorded. |
| // So bits has been changed to volatile and |
| // all modification to it happens using AtomicIntegerFieldUpdater. |
| private static final AtomicIntegerFieldUpdater<VersionTag> bitsUpdater = |
| AtomicIntegerFieldUpdater.newUpdater(VersionTag.class, "bits"); |
| /** |
| * boolean bits Note: this is an int field so it has 32 bits BUT only the lower 16 bits are |
| * serialized. So all our code should treat this an an unsigned short field. |
| */ |
| private volatile int bits; |
| |
| /** |
| * the initiator of the operation. If null, the initiator was the sender of the operation |
| */ |
| private T memberID; |
| |
| /** |
| * for Delta operations, the ID of the version stamp on which the delta is based. The version |
| * number for that stamp is getEntryVersion()-1 |
| */ |
| private T previousMemberID; |
| |
| public boolean isFromOtherMember() { |
| return (this.bits & BITS_IS_REMOTE_TAG) != 0; |
| } |
| |
| /** was the timestamp of this tag used to update the cache's timestamp? */ |
| public boolean isTimeStampUpdated() { |
| return (this.bits & BITS_TIMESTAMP_APPLIED) != 0; |
| } |
| |
| /** record that the timestamp from this tag was applied to the cache */ |
| public void setTimeStampApplied(boolean isTimeStampUpdated) { |
| if (isTimeStampUpdated) { |
| setBits(BITS_TIMESTAMP_APPLIED); |
| } else { |
| clearBits(~BITS_TIMESTAMP_APPLIED); |
| } |
| } |
| |
| /** |
| * @return true if this is a gateway timestamp holder rather than a full version tag |
| */ |
| public boolean isGatewayTag() { |
| return (this.bits & BITS_GATEWAY_TAG) != 0; |
| } |
| |
| public void setEntryVersion(int version) { |
| this.entryVersion = version; |
| } |
| |
| @Override |
| public int getEntryVersion() { |
| return this.entryVersion; |
| } |
| |
| public void setVersionTimeStamp(long timems) { |
| this.timeStamp = timems; |
| } |
| |
| public void setIsGatewayTag(boolean isGateway) { |
| if (isGateway) { |
| setBits(BITS_GATEWAY_TAG); |
| } else { |
| clearBits(~BITS_GATEWAY_TAG); |
| } |
| } |
| |
| public void setRegionVersion(long version) { |
| this.regionVersionHighBytes = (short) ((version & 0xFFFF00000000L) >> 32); |
| this.regionVersionLowBytes = (int) (version & 0xFFFFFFFFL); |
| } |
| |
| @Override |
| public long getRegionVersion() { |
| return (((long) regionVersionHighBytes) << 32) | (regionVersionLowBytes & 0x00000000FFFFFFFFL); |
| } |
| |
| /** |
| * set rvv internal bytes. Used by region entries |
| */ |
| public void setRegionVersion(short highBytes, int lowBytes) { |
| this.regionVersionHighBytes = highBytes; |
| this.regionVersionLowBytes = lowBytes; |
| } |
| |
| /** |
| * get rvv internal high byte. Used by region entries for transferring to storage |
| */ |
| @Override |
| public short getRegionVersionHighBytes() { |
| return this.regionVersionHighBytes; |
| } |
| |
| /** |
| * get rvv internal low bytes. Used by region entries for transferring to storage |
| */ |
| @Override |
| public int getRegionVersionLowBytes() { |
| return this.regionVersionLowBytes; |
| } |
| |
| /** |
| * set that this tag has been recorded in a receiver's RVV |
| */ |
| public void setRecorded() { |
| setBits(BITS_RECORDED); |
| } |
| |
| /** |
| * has this tag been recorded in a receiver's RVV? |
| */ |
| public boolean isRecorded() { |
| return ((this.bits & BITS_RECORDED) != 0); |
| } |
| |
| /** |
| * Set canonical ID objects into this version tag using the DM's cache of IDs |
| * |
| */ |
| public void setCanonicalIDs(DistributionManager distributionManager) {} |
| |
| /** |
| * @return the memberID |
| */ |
| @Override |
| public T getMemberID() { |
| return this.memberID; |
| } |
| |
| /** |
| * @param memberID the memberID to set |
| */ |
| public void setMemberID(T memberID) { |
| this.memberID = memberID; |
| } |
| |
| /** |
| * @return the previousMemberID |
| */ |
| public T getPreviousMemberID() { |
| return this.previousMemberID; |
| } |
| |
| /** |
| * @param previousMemberID the previousMemberID to set |
| */ |
| public void setPreviousMemberID(T previousMemberID) { |
| setBits(BITS_HAS_PREVIOUS_ID); |
| this.previousMemberID = previousMemberID; |
| } |
| |
| /** |
| * sets the possible-duplicate flag for this tag. When a tag has this bit it means that the cache |
| * had seen the operation that was being applied to it and plucked out the current version stamp |
| * to use in propagating the event to other members and clients. A member receiving this event |
| * should not allow duplicate application of the event to the cache. |
| */ |
| public VersionTag setPosDup(boolean flag) { |
| if (flag) { |
| setBits(BITS_POSDUP); |
| } else { |
| clearBits(~BITS_POSDUP); |
| } |
| return this; |
| } |
| |
| public boolean isPosDup() { |
| return (this.bits & BITS_POSDUP) != 0; |
| } |
| |
| /** |
| * set or clear the flag that this tag was blessed by a conflict resolver |
| * |
| * @return this tag |
| */ |
| public VersionTag setAllowedByResolver(boolean flag) { |
| if (flag) { |
| setBits(BITS_ALLOWED_BY_RESOLVER); |
| } else { |
| clearBits(~BITS_ALLOWED_BY_RESOLVER); |
| } |
| return this; |
| } |
| |
| public boolean isAllowedByResolver() { |
| return (this.bits & BITS_ALLOWED_BY_RESOLVER) != 0; |
| } |
| |
| @Override |
| public int getDistributedSystemId() { |
| return this.distributedSystemId; |
| } |
| |
| public void setDistributedSystemId(int id) { |
| this.distributedSystemId = (byte) (id & 0xFF); |
| } |
| |
| /** |
| * replace null member IDs with the given identifier. This is used to incorporate version |
| * information into the cache that has been received from another VM |
| * |
| */ |
| public void replaceNullIDs(VersionSource id) { |
| if (this.memberID == null) { |
| this.memberID = (T) id; |
| } |
| if (this.previousMemberID == null && this.hasPreviousMemberID() && entryVersion > 1) { |
| this.previousMemberID = (T) id; |
| } |
| } |
| |
| /** |
| * returns true if this tag has a previous member ID for delta operation checks |
| */ |
| public boolean hasPreviousMemberID() { |
| return (this.bits & BITS_HAS_PREVIOUS_ID) != 0; |
| } |
| |
| /** |
| * returns true if entry and region version numbers are not both zero, meaning this has valid |
| * version numbers |
| */ |
| public boolean hasValidVersion() { |
| return !(this.entryVersion == 0 && this.regionVersionHighBytes == 0 |
| && this.regionVersionLowBytes == 0); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| toData(out, true); |
| } |
| |
| public void toData(DataOutput out, boolean includeMember) throws IOException { |
| int flags = 0; |
| boolean versionIsShort = false; |
| if (this.entryVersion < 0x10000) { |
| versionIsShort = true; |
| flags |= VERSION_TWO_BYTES; |
| } |
| if (this.regionVersionHighBytes != 0) { |
| flags |= HAS_RVV_HIGH_BYTE; |
| } |
| if (this.memberID != null && includeMember) { |
| flags |= HAS_MEMBER_ID; |
| } |
| boolean writePreviousMemberID = false; |
| if (this.previousMemberID != null && includeMember) { |
| flags |= HAS_PREVIOUS_MEMBER_ID; |
| if (Objects.equals(this.previousMemberID, this.memberID)) { |
| flags |= DUPLICATE_MEMBER_IDS; |
| } else { |
| writePreviousMemberID = true; |
| } |
| } |
| if (logger.isTraceEnabled(LogMarker.VERSION_TAG_VERBOSE)) { |
| logger.trace(LogMarker.VERSION_TAG_VERBOSE, "serializing {} with flags 0x{}", this.getClass(), |
| Integer.toHexString(flags)); |
| } |
| out.writeShort(flags); |
| out.writeShort(this.bits); |
| out.write(this.distributedSystemId); |
| if (versionIsShort) { |
| out.writeShort(this.entryVersion & 0xffff); |
| } else { |
| out.writeInt(this.entryVersion); |
| } |
| if (this.regionVersionHighBytes != 0) { |
| out.writeShort(this.regionVersionHighBytes); |
| } |
| out.writeInt(this.regionVersionLowBytes); |
| InternalDataSerializer.writeUnsignedVL(this.timeStamp, out); |
| if (this.memberID != null && includeMember) { |
| writeMember(this.memberID, out); |
| } |
| if (writePreviousMemberID) { |
| writeMember(this.previousMemberID, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| int flags = in.readUnsignedShort(); |
| if (logger.isTraceEnabled(LogMarker.VERSION_TAG_VERBOSE)) { |
| logger.trace(LogMarker.VERSION_TAG_VERBOSE, "deserializing {} with flags 0x{}", |
| this.getClass(), Integer.toHexString(flags)); |
| } |
| bitsUpdater.set(this, in.readUnsignedShort()); |
| this.distributedSystemId = in.readByte(); |
| if ((flags & VERSION_TWO_BYTES) != 0) { |
| this.entryVersion = in.readShort() & 0xffff; |
| } else { |
| this.entryVersion = in.readInt() & 0xffffffff; |
| } |
| if ((flags & HAS_RVV_HIGH_BYTE) != 0) { |
| this.regionVersionHighBytes = in.readShort(); |
| } |
| this.regionVersionLowBytes = in.readInt(); |
| this.timeStamp = InternalDataSerializer.readUnsignedVL(in); |
| if ((flags & HAS_MEMBER_ID) != 0) { |
| this.memberID = readMember(in); |
| } |
| if ((flags & HAS_PREVIOUS_MEMBER_ID) != 0) { |
| if ((flags & DUPLICATE_MEMBER_IDS) != 0) { |
| this.previousMemberID = this.memberID; |
| } else { |
| try { |
| this.previousMemberID = readMember(in); |
| } catch (BufferUnderflowException e) { |
| if (context.getSerializationVersion().compareTo(Version.GEODE_1_11_0) < 0) { |
| // GEODE-7219: older versions may report HAS_PREVIOUS_MEMBER_ID but not transmit it |
| logger.info("Buffer underflow encountered while reading a version tag - ignoring"); |
| } else { |
| throw e; |
| } |
| } |
| } |
| } |
| setBits(BITS_IS_REMOTE_TAG); |
| } |
| |
| /** for unit testing receipt of version tags from another member of the cluster */ |
| public void setIsRemoteForTesting() { |
| setBits(BITS_IS_REMOTE_TAG); |
| } |
| |
| public abstract T readMember(DataInput in) throws IOException, ClassNotFoundException; |
| |
| public abstract void writeMember(T memberID, DataOutput out) throws IOException; |
| |
| |
| @Override |
| public String toString() { |
| StringBuilder s = new StringBuilder(); |
| if (isGatewayTag()) { |
| s.append("{ds=").append(this.distributedSystemId).append("; time=") |
| .append(getVersionTimeStamp()).append("}"); |
| } else { |
| s.append("{v").append(this.entryVersion); |
| s.append("; rv").append(getRegionVersion()); |
| if (this.memberID != null) { |
| s.append("; mbr=").append(this.memberID); |
| } |
| if (hasPreviousMemberID()) { |
| s.append("; prev=").append(this.previousMemberID); |
| } |
| if (this.distributedSystemId >= 0) { |
| s.append("; ds=").append(this.distributedSystemId); |
| } |
| s.append("; time=").append(getVersionTimeStamp()); |
| if (isFromOtherMember()) { |
| s.append("; remote"); |
| } |
| if (this.isAllowedByResolver()) { |
| s.append("; allowed"); |
| } |
| s.append("}"); |
| } |
| return s.toString(); |
| } |
| |
| |
| /** |
| * @return the time stamp of this operation. This is an unsigned integer returned as a long |
| */ |
| @Override |
| public long getVersionTimeStamp() { |
| return this.timeStamp; |
| } |
| |
| /** |
| * Creates a version tag of the appropriate type, based on the member id |
| * |
| */ |
| public static VersionTag create(VersionSource memberId) { |
| VersionTag tag; |
| if (memberId instanceof DiskStoreID) { |
| tag = new DiskVersionTag(); |
| } else { |
| tag = new VMVersionTag(); |
| } |
| |
| tag.setMemberID(memberId); |
| |
| return tag; |
| } |
| |
| public static VersionTag create(boolean persistent, DataInput in) |
| throws IOException, ClassNotFoundException { |
| VersionTag<?> tag; |
| if (persistent) { |
| tag = new DiskVersionTag(); |
| } else { |
| tag = new VMVersionTag(); |
| } |
| InternalDataSerializer.invokeFromData(tag, in); |
| return tag; |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + entryVersion; |
| result = prime * result + ((memberID == null) ? 0 : memberID.hashCode()); |
| result = prime * result + regionVersionHighBytes; |
| result = prime * result + regionVersionLowBytes; |
| if (isGatewayTag()) { |
| result = prime * result + (int) timeStamp; |
| result = prime * result + (int) (timeStamp >>> 32); |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) |
| return true; |
| if (obj == null) |
| return false; |
| if (getClass() != obj.getClass()) |
| return false; |
| VersionTag<?> other = (VersionTag<?>) obj; |
| if (entryVersion != other.entryVersion) |
| return false; |
| if (memberID == null) { |
| if (other.memberID != null) |
| return false; |
| } else if (!memberID.equals(other.memberID)) |
| return false; |
| if (regionVersionHighBytes != other.regionVersionHighBytes) |
| return false; |
| if (regionVersionLowBytes != other.regionVersionLowBytes) |
| return false; |
| if (isGatewayTag() != other.isGatewayTag()) { |
| return false; |
| } |
| if (isGatewayTag()) { |
| if (timeStamp != other.timeStamp) { |
| return false; |
| } |
| if (distributedSystemId != other.distributedSystemId) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Set any bits in the given bitMask on the bits field |
| */ |
| private void setBits(int bitMask) { |
| int oldBits; |
| int newBits; |
| do { |
| oldBits = this.bits; |
| newBits = oldBits | bitMask; |
| } while (!bitsUpdater.compareAndSet(this, oldBits, newBits)); |
| } |
| |
| /** |
| * Clear any bits not in the given bitMask from the bits field |
| */ |
| private void clearBits(int bitMask) { |
| int oldBits; |
| int newBits; |
| do { |
| oldBits = this.bits; |
| newBits = oldBits & bitMask; |
| } while (!bitsUpdater.compareAndSet(this, oldBits, newBits)); |
| } |
| } |