| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.versions; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.ConcurrentModificationException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.DataSerializableFixedID; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.Version; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.persistence.DiskStoreID; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| |
| /** |
| * RegionVersionVector tracks the highest region-level version number of operations applied to a |
| * region for each member that has the region. |
| * <p> |
| * |
| */ |
| public abstract class RegionVersionVector<T extends VersionSource<?>> |
| implements DataSerializableFixedID, MembershipListener { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| // TODO:LOG:CONVERT: REMOVE THIS |
| public static final boolean DEBUG = |
| Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "VersionVector.VERBOSE"); |
| |
| |
| |
| //////////////////// The following statics exist for unit testing. //////////////////////////// |
| |
| /** maximum ms wait time while waiting for dominance to be achieved */ |
| public static final long MAX_DOMINANCE_WAIT_TIME = |
| Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "max-dominance-wait-time", 5000); |
| |
| /** maximum ms pause time while waiting for dominance to be achieved */ |
| public static final long DOMINANCE_PAUSE_TIME = |
| Math.min(Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "dominance-pause-time", 300), |
| MAX_DOMINANCE_WAIT_TIME); |
| |
| private static final int INITIAL_CAPACITY = 2; |
| private static final int CONCURRENCY_LEVEL = 2; |
| private static final float LOAD_FACTOR = 0.75f; |
| //////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| |
| |
| /** map of member to version h older. This is the actual version "vector" */ |
| private ConcurrentHashMap<T, RegionVersionHolder<T>> memberToVersion; |
| |
| /** current version in the local region for generating next version */ |
| private AtomicLong localVersion = new AtomicLong(0); |
| /** |
| * The list of exceptions for the local member. The version held in this RegionVersionHolder may |
| * not be accurate, but the exception list is. We can have exceptions for our own id if we recover |
| * from disk or GII from a peer that has exceptions from us. |
| * |
| * The version held in this object can lag behind the localVersion atomic long, because that long |
| * is incremented without obtaining a lock. Operations that use the localException list are |
| * responsible for updating the version of the local exceptions under lock. |
| */ |
| private RegionVersionHolder<T> localExceptions; |
| |
| /** highest reaped tombstone region-version for this member */ |
| private AtomicLong localGCVersion = new AtomicLong(0); |
| |
| /** the member that this version vector applies to */ |
| private T myId; |
| |
| |
| |
| /** |
| * a flag stating whether this vector contains only the version information for a single member. |
| * This is used when a member crashed to transmit only the version information for that member. |
| */ |
| private boolean singleMember; |
| |
| /** a flag to prevent accidental serialization of a live member */ |
| private transient boolean isLiveVector; |
| |
| private transient LocalRegion region; |
| |
| private ConcurrentHashMap<T, Long> memberToGCVersion; |
| |
| /** map of canonical IDs for this RVV that are not in the memberToVersion map */ |
| @SuppressWarnings("unchecked") |
| private transient Map<T, T> canonicalIds = Collections.EMPTY_MAP; |
| |
| private final Object canonicalIdLock = new Object(); |
| |
| /** is recording disabled? */ |
| private transient boolean recordingDisabled; |
| |
| /** is this a vector in a client cache? */ |
| private transient boolean clientVector; |
| |
| /** |
| * this read/write lock is used to stop generation of new versions by the vector while a |
| * region-level operation is underway. The locking scheme assumes that only one region-level RVV |
| * operation is allowed at a time on a region across the distributed system. If that changes then |
| * the locking scheme here may need additional work. |
| */ |
| private final transient ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(); |
| private transient volatile boolean locked; // this is only modified by the version locking thread |
| private transient volatile boolean doUnlock; // this is only modified by the version locking |
| // thread |
| private transient InternalDistributedMember lockOwner; // guarded by lockWaitSync |
| |
| private final transient Object clearLockSync = new Object(); // sync for coordinating thread |
| // startup and lockOwner setting |
| |
| /** |
| * constructor used to create a cloned vector |
| */ |
| protected RegionVersionVector(T ownerId, ConcurrentHashMap<T, RegionVersionHolder<T>> vector, |
| long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean singleMember, |
| RegionVersionHolder<T> localExceptions) { |
| this.myId = ownerId; |
| this.memberToVersion = vector; |
| this.memberToGCVersion = gcVersions; |
| this.localGCVersion.set(gcVersion); |
| this.localVersion.set(version); |
| this.singleMember = singleMember; |
| this.localExceptions = localExceptions; |
| } |
| |
| /** |
| * deserialize a cloned vector |
| */ |
| public RegionVersionVector() { |
| this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY, |
| LOAD_FACTOR, CONCURRENCY_LEVEL); |
| this.memberToGCVersion = |
| new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); |
| } |
| |
| /** |
| * create a live version vector for a region |
| */ |
| public RegionVersionVector(T ownerId) { |
| this(ownerId, null); |
| } |
| |
| /** |
| * create a live version vector for a region |
| */ |
| public RegionVersionVector(T ownerId, LocalRegion owner) { |
| this(ownerId, owner, 0); |
| } |
| |
| @VisibleForTesting |
| RegionVersionVector(T ownerId, LocalRegion owner, long version) { |
| this.myId = ownerId; |
| this.isLiveVector = true; |
| this.region = owner; |
| this.localExceptions = new RegionVersionHolder<T>(0); |
| this.memberToVersion = |
| new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); |
| this.memberToGCVersion = |
| new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); |
| this.localVersion.set(version); |
| } |
| |
| /** |
| * Retrieve a vector that can be sent to another member. This clones all of the version |
| * information to protect against concurrent modification during serialization |
| */ |
| public RegionVersionVector<T> getCloneForTransmission() { |
| Map<T, RegionVersionHolder<T>> liveHolders; |
| liveHolders = new HashMap<T, RegionVersionHolder<T>>(this.memberToVersion); |
| ConcurrentHashMap<T, RegionVersionHolder<T>> clonedHolders = |
| new ConcurrentHashMap<T, RegionVersionHolder<T>>(liveHolders.size(), LOAD_FACTOR, |
| CONCURRENCY_LEVEL); |
| for (Map.Entry<T, RegionVersionHolder<T>> entry : liveHolders.entrySet()) { |
| clonedHolders.put(entry.getKey(), entry.getValue().clone()); |
| } |
| ConcurrentHashMap<T, Long> gcVersions = new ConcurrentHashMap<T, Long>( |
| this.memberToGCVersion.size(), LOAD_FACTOR, CONCURRENCY_LEVEL); |
| gcVersions.putAll(this.memberToGCVersion); |
| RegionVersionHolder<T> clonedLocalHolder; |
| clonedLocalHolder = this.localExceptions.clone(); |
| // Make sure the holder that we send to the peer does |
| // have an accurate RegionVersionHolder for our local version |
| return createCopy(this.myId, clonedHolders, this.localVersion.get(), gcVersions, |
| this.localGCVersion.get(), false, clonedLocalHolder); |
| } |
| |
| protected abstract RegionVersionVector<T> createCopy(T ownerId, |
| ConcurrentHashMap<T, RegionVersionHolder<T>> vector, long version, |
| ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean singleMember, |
| RegionVersionHolder<T> clonedLocalHolder); |
| |
| |
| /** |
| * Retrieve a vector that can be sent to another member. This clones only the version information |
| * for the given ID. |
| * <p> |
| * The clone returned by this method does not have distributed garbage-collection information. |
| */ |
| public RegionVersionVector<T> getCloneForTransmission(T mbr) { |
| Map<T, RegionVersionHolder<T>> liveHolders; |
| liveHolders = new HashMap<T, RegionVersionHolder<T>>(this.memberToVersion); |
| RegionVersionHolder<T> holder = liveHolders.get(mbr); |
| if (holder == null) { |
| if (mbr.isDiskStoreId() && mbr.equals(myId)) { |
| // For region recovered from disk, we may have local exceptions needs to be |
| // brought back during region synchronization |
| holder = localExceptions.clone(); |
| holder.setVersion(localVersion.get()); |
| } else { |
| holder = new RegionVersionHolder<T>(-1); |
| } |
| } else { |
| holder = holder.clone(); |
| } |
| return createCopy(this.myId, |
| new ConcurrentHashMap<T, RegionVersionHolder<T>>(Collections.singletonMap(mbr, holder)), 0, |
| new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL), 0, true, |
| new RegionVersionHolder<T>(-1)); |
| } |
| |
| |
| /** |
| * Retrieve a collection of tombstone GC region-versions |
| */ |
| public Map<T, Long> getTombstoneGCVector() { |
| Map<T, Long> result; |
| synchronized (memberToGCVersion) { |
| result = new HashMap<T, Long>(this.memberToGCVersion); |
| } |
| if (this.localGCVersion.get() != 0) { |
| result.put(this.myId, this.localGCVersion.get()); |
| } |
| return result; |
| } |
| |
| |
| /** returns true if all of the GC versions in the given map have already been processed here */ |
| public boolean containsTombstoneGCVersions(Map<T, Long> regionGCVersions) { |
| Long myVersion = regionGCVersions.get(this.myId); |
| if (myVersion != null) { |
| if (this.localGCVersion.get() < myVersion.longValue()) { |
| return false; |
| } |
| } |
| synchronized (this.memberToGCVersion) { |
| for (Map.Entry<T, Long> entry : regionGCVersions.entrySet()) { |
| Long version = this.memberToGCVersion.get(entry.getKey()); |
| if (version == null || version.longValue() < entry.getValue().longValue()) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * locks against new version generation and returns the current region version number |
| * |
| */ |
| public long lockForClear(String regionPath, DistributionManager dm, |
| InternalDistributedMember locker) { |
| lockVersionGeneration(regionPath, dm, locker); |
| return this.localVersion.get(); |
| } |
| |
| /** unlocks version generation for clear() operations */ |
| public void unlockForClear(InternalDistributedMember locker) { |
| synchronized (this.clearLockSync) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Unlocking for clear, from member {} RVV {}", locker, |
| System.identityHashCode(this)); |
| } |
| if (this.lockOwner != null && !locker.equals(this.lockOwner)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("current clear lock owner was {} not unlocking", lockOwner); |
| } |
| // this method is invoked by memberDeparted events and may not be for the current lock owner |
| return; |
| } |
| unlockVersionGeneration(locker); |
| } |
| } |
| |
| /** |
| * This schedules a thread that owns the version-generation write-lock for this vector. The method |
| * unlockVersionGeneration notifies the thread to release the lock and terminate its run. |
| * |
| * @param dm the distribution manager - used to obtain an executor to hold the thread |
| * @param locker the member requesting the lock (currently not used) |
| */ |
| private void lockVersionGeneration(final String regionPath, final DistributionManager dm, |
| final InternalDistributedMember locker) { |
| final CountDownLatch acquiredLock = new CountDownLatch(1); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Locking version generation for {} region {} RVV {}", locker, regionPath, |
| System.identityHashCode(this)); |
| } |
| // this could block for a while if a limit has been set on the waiting-thread-pool |
| dm.getWaitingThreadPool().execute(new Runnable() { |
| @Override |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings( |
| value = {"UL_UNRELEASED_LOCK", "IMSE_DONT_CATCH_IMSE"}) |
| public void run() { |
| boolean haveLock = false; |
| synchronized (clearLockSync) { |
| try { |
| // TODO Following code does not seem necessary as dlock has been taken |
| // so no two threads will try to enter in this code section. |
| while (locked && dm.isCurrentMember(locker)) { |
| try { |
| clearLockSync.wait(); |
| } catch (InterruptedException e) { |
| // okay to ignore - release the lock and exit |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Waiting thread is now locking version generation for {} region {} RVV {}", |
| locker, regionPath, System.identityHashCode(this)); |
| } |
| try { |
| versionLock.writeLock().lock(); |
| lockOwner = locker; |
| doUnlock = false; |
| locked = true; |
| haveLock = true; |
| acquiredLock.countDown(); |
| } catch (IllegalMonitorStateException e) { |
| // dlock on the clear() operation should prevent this from happening |
| logger.fatal( |
| "Request from {} to block operations found that operations are already blocked by member {}.", |
| new Object[] {locker, lockOwner}); |
| return; |
| } |
| |
| while (!doUnlock && dm.isCurrentMember(locker)) { |
| try { |
| clearLockSync.wait(250); |
| } catch (InterruptedException e) { |
| // okay to ignore - release the lock and exit |
| } |
| } |
| } finally { |
| if (haveLock) { |
| locked = false; // this must be clear when the writeLock is released |
| // so we don't get warnings about it still being locked |
| versionLock.writeLock().unlock(); |
| doUnlock = false; |
| clearLockSync.notifyAll(); |
| // leave lockOwner set so we can see who the last lock request came from |
| } |
| acquiredLock.countDown(); |
| } |
| } |
| } |
| }); |
| boolean interrupted = false; |
| while (dm.isCurrentMember(locker)) { |
| try { |
| if (acquiredLock.await(250, TimeUnit.MILLISECONDS)) { |
| break; |
| } |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } |
| } |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Done locking"); |
| } |
| |
| } |
| |
| private void unlockVersionGeneration(final InternalDistributedMember locker) { |
| synchronized (clearLockSync) { |
| this.doUnlock = true; |
| this.clearLockSync.notifyAll(); |
| } |
| } |
| |
| /** |
| * return the next local version number |
| */ |
| public long getNextVersion() { |
| return getNextVersion(true); |
| } |
| |
| /** |
| * return the next local version number for a clear() operation, bypassing lock checks |
| */ |
| public long getNextVersionWhileLocked() { |
| return getNextVersion(false); |
| } |
| |
| /** |
| * return the next local version number |
| */ |
| private long getNextVersion(boolean checkLocked) { |
| if (checkLocked && this.locked) { |
| // this should never be the case. If version generation is locked and we get here |
| // then the path to this point is not protected by getting the version generation |
| // lock from the RVV but it should be |
| if (logger.isDebugEnabled()) { |
| logger.debug("Generating a version tag when version generation is locked by {}", |
| this.lockOwner); |
| } |
| } |
| long new_version = localVersion.incrementAndGet(); |
| // since there could be special exception, we have to use recordVersion() |
| recordVersion(getOwnerId(), new_version); |
| return new_version; |
| } |
| |
| /** obtain a lock to prevent concurrent clear() from happening */ |
| public void lockForCacheModification(LocalRegion owner) { |
| if (owner.getServerProxy() == null) { |
| this.versionLock.readLock().lock(); |
| } |
| } |
| |
| /** release the lock preventing concurrent clear() from happening */ |
| public void releaseCacheModificationLock(LocalRegion owner) { |
| if (owner.getServerProxy() == null) { |
| this.versionLock.readLock().unlock(); |
| } |
| } |
| |
| /** obtain a lock to prevent concurrent clear() from happening */ |
| public void lockForCacheModification() { |
| this.versionLock.readLock().lock(); |
| } |
| |
| /** release the lock preventing concurrent clear() from happening */ |
| public void releaseCacheModificationLock() { |
| this.versionLock.readLock().unlock(); |
| } |
| |
| private void syncLocalVersion() { |
| long v = localVersion.get(); |
| synchronized (localExceptions) { |
| if (v != localExceptions.version) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adjust localExceptions.version {} to equal localVersion {}", |
| localExceptions.version, localVersion.get()); |
| } |
| localExceptions.version = v; |
| } |
| } |
| } |
| |
| /** |
| * return the current version for this member |
| */ |
| public long getCurrentVersion() { |
| synchronized (localExceptions) { |
| syncLocalVersion(); |
| return localExceptions.getVersion(); |
| } |
| } |
| |
| /** |
| * return the current version for this member |
| */ |
| public RegionVersionHolder<T> getLocalExceptions() { |
| return localExceptions; |
| } |
| |
| /** |
| * return version holder for this member |
| */ |
| public RegionVersionHolder<T> getHolderForMember(T id) { |
| if (id.equals(this.myId)) { |
| return localExceptions; |
| } else { |
| return this.memberToVersion.get(id); |
| } |
| } |
| |
| /** |
| * returns the ID of the member that owns this version vector |
| */ |
| public T getOwnerId() { |
| return this.myId; |
| } |
| |
| |
| /** |
| * turns off recording of versions for this vector. This can be used when recording of versions is |
| * not necessary, as in an empty region |
| */ |
| public void turnOffRecordingForEmptyRegion() { |
| this.recordingDisabled = true; |
| } |
| |
| /** |
| * client version vectors only record GC numbers and don't keep exceptions, etc, because there |
| * could be MANY of them |
| */ |
| public void setIsClientVector() { |
| this.clientVector = true; |
| } |
| |
| /** |
| * record all of the version information from an initial image provider |
| */ |
| public void recordVersions(RegionVersionVector<T> otherVector) { |
| synchronized (this.memberToVersion) { |
| for (Map.Entry<T, RegionVersionHolder<T>> entry : otherVector.getMemberToVersion() |
| .entrySet()) { |
| T mbr = entry.getKey(); |
| RegionVersionHolder<T> otherHolder = entry.getValue(); |
| |
| initializeVersionHolder(mbr, otherHolder); |
| } |
| // Get the set of local exceptions from the other vector |
| // before directly accessing localExceptions, should sync its this.version with localVersion |
| otherVector.syncLocalVersion(); |
| initializeVersionHolder(otherVector.getOwnerId(), otherVector.localExceptions); |
| |
| if (otherVector.getCurrentVersion() > 0 |
| && !this.memberToVersion.containsKey(otherVector.getOwnerId())) { |
| recordVersion(otherVector.getOwnerId(), otherVector.getCurrentVersion()); |
| } |
| |
| // check if I have updates from members that the otherVector does not have |
| // If yes, these are unfinished ops and should be cleaned |
| for (T mbr : this.memberToVersion.keySet()) { |
| if (!otherVector.memberToVersion.containsKey(mbr) |
| && !mbr.equals(otherVector.getOwnerId())) { |
| RegionVersionHolder holder = this.memberToVersion.get(mbr); |
| initializeVersionHolder(mbr, new RegionVersionHolder(0)); |
| } |
| } |
| if (!otherVector.memberToVersion.containsKey(myId) |
| && !myId.equals(otherVector.getOwnerId())) { |
| initializeVersionHolder(myId, new RegionVersionHolder(0)); |
| } |
| |
| synchronized (this.memberToGCVersion) { |
| for (Map.Entry<T, Long> entry : otherVector.getMemberToGCVersion().entrySet()) { |
| T member = entry.getKey(); |
| Long value = entry.getValue(); |
| if (member.equals(myId)) { |
| // If this entry is for our id, update our local GC version |
| long currentValue; |
| while ((currentValue = localGCVersion.get()) < value) { |
| localGCVersion.compareAndSet(currentValue, value); |
| } |
| } else { |
| // Update the memberToGCVersionMap. |
| Long myVersion = this.memberToGCVersion.get(entry.getKey()); |
| if (myVersion == null || myVersion < entry.getValue()) { |
| this.memberToGCVersion.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| public void initializeVersionHolder(T mbr, RegionVersionHolder<T> otherHolder) { |
| RegionVersionHolder<T> h = this.memberToVersion.get(mbr); |
| if (h == null) { |
| if (!mbr.equals(this.myId)) { |
| h = otherHolder.clone(); |
| h.makeReadyForRecording(); |
| this.memberToVersion.put(mbr, h); |
| } else { |
| RegionVersionHolder<T> vh = otherHolder; |
| long version = vh.version; |
| updateLocalVersion(version); |
| this.localExceptions.initializeFrom(vh); |
| } |
| } else { |
| // holders must be modified under synchronization |
| h.initializeFrom(otherHolder); |
| } |
| } |
| |
| void updateLocalVersion(long newVersion) { |
| boolean needToTrySetAgain; |
| do { |
| needToTrySetAgain = false; |
| long currentVersion = this.localVersion.get(); |
| if (currentVersion < newVersion) { |
| needToTrySetAgain = !compareAndSetVersion(currentVersion, newVersion); |
| } |
| } while (needToTrySetAgain); |
| } |
| |
| boolean compareAndSetVersion(long currentVersion, long newVersion) { |
| return this.localVersion.compareAndSet(currentVersion, newVersion); |
| } |
| |
| /** |
| * Records a received region-version. These are transmitted in VersionTags in messages between |
| * peers and from servers to clients. |
| * |
| * @param tag the version information |
| */ |
| public void recordVersion(T mbr, VersionTag<T> tag) { |
| tag.setRecorded(); |
| assert tag.isRecorded(); |
| T member = tag.getMemberID(); |
| if (member == null) { |
| member = mbr; |
| } |
| |
| if (this.myId.equals(member)) { |
| // We can be asked to record a version for the local member if a persistent |
| // member is restarted and an event is replayed after the persistent member |
| // recovers. So we can only assert that the local member has already seen |
| // the replayed event. |
| synchronized (localExceptions) { |
| if (this.localVersion.get() < tag.getRegionVersion() && region != null |
| && region.isInitialized() && region.getDataPolicy().withPersistence()) { |
| Assert.fail( |
| "recordVersion invoked for a local version tag that is higher than our local version. rvv=" |
| + this + ", tag=" + tag + " " + region.getName()); |
| } |
| } |
| } |
| |
| recordVersion(member, tag.getRegionVersion()); |
| } |
| |
| /** |
| * Records a received region-version. These are transmitted in VersionTags in messages between |
| * peers and from servers to clients. In general you should use recordVersion(mbr, versionTag) so |
| * that the tag is marked as having been recorded. This will keep |
| * DistributedCacheOperation.basicProcess() from trying to record it again. |
| * |
| * This method is also called for versions which have been recovered from disk. |
| * |
| * @param member the peer that performed the operation |
| * @param version the version of the peers region that reflects the operation |
| */ |
| public void recordVersion(T member, long version) { |
| T mbr = member; |
| |
| if (this.recordingDisabled || clientVector) { |
| return; |
| } |
| |
| RegionVersionHolder<T> holder; |
| |
| if (mbr.equals(this.myId)) { |
| // If we are recording a version for the local member, |
| // use the local exception list. |
| holder = this.localExceptions; |
| |
| synchronized (holder) { |
| // Advance the version held in the local |
| // exception list to match the atomic long |
| // we using for the local version. |
| holder.version = this.localVersion.get(); |
| } |
| updateLocalVersion(version); |
| } else { |
| // Find the version holder object |
| holder = memberToVersion.get(mbr); |
| if (holder == null) { |
| synchronized (memberToVersion) { |
| // Look for the holder under lock |
| holder = memberToVersion.get(mbr); |
| if (holder == null) { |
| mbr = getCanonicalId(mbr); |
| holder = new RegionVersionHolder<T>(mbr); |
| memberToVersion.put(holder.id, holder); |
| } |
| } |
| } |
| } |
| |
| // Update the version holder |
| if (logger.isTraceEnabled(LogMarker.RVV_VERBOSE)) { |
| logger.trace(LogMarker.RVV_VERBOSE, "Recording rv{} for {}", version, mbr); |
| } |
| holder.recordVersion(version); |
| } |
| |
| |
| |
| /** |
| * Records a version holder that we have recovered from disk. This version holder replaces the |
| * current version holder if it dominates the version holder we already have. This method will |
| * called once for each oplog we recover. |
| * |
| */ |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings( |
| value = "ML_SYNC_ON_FIELD_TO_GUARD_CHANGING_THAT_FIELD", |
| justification = "sync on localExceptions guards concurrent modification but this is a replacement") |
| public void initRecoveredVersion(T member, RegionVersionHolder<T> v, boolean latestOplog) { |
| RegionVersionHolder<T> recovered = v.clone(); |
| |
| if (member == null || member.equals(myId)) { |
| // if this is the version for the local member, update our local info |
| |
| // update the local exceptions |
| synchronized (localExceptions) { |
| // Fix for 45622 - We only take the version holder from the latest |
| // oplog. There may be more than one RVV in the latest oplog, in which |
| // case we want to end up with the last RVV from the latest oplog |
| if (latestOplog || localVersion.get() == 0) { |
| localExceptions = recovered; |
| if (logger.isTraceEnabled(LogMarker.RVV_VERBOSE)) { |
| logger.trace(LogMarker.RVV_VERBOSE, "initRecoveredVersion setting local version to {}", |
| recovered.version); |
| } |
| localVersion.set(recovered.version); |
| } |
| } |
| } else { |
| // If this is not a local member, update the member to version map |
| Long gcVersion = memberToGCVersion.get(member); |
| |
| synchronized (memberToVersion) { |
| RegionVersionHolder<T> oldVersion = memberToVersion.get(member); |
| |
| // Fix for 45622 - We only take the version holder from the latest |
| // oplog. There may be more than one RVV in the latest oplog, in which |
| // case we want to end up with the last RVV from the latest oplog |
| if (latestOplog || oldVersion == null || oldVersion.version == 0) { |
| if (gcVersion != null) { |
| recovered.removeExceptionsOlderThan(gcVersion); |
| } |
| memberToVersion.put(member, recovered); |
| } |
| } |
| } |
| } |
| |
| /** |
| * get the last recorded region version number for the given member |
| */ |
| public long getVersionForMember(T mbr) { |
| RegionVersionHolder<T> holder = this.memberToVersion.get(mbr); |
| if (holder == null) { |
| if (mbr.equals(myId)) { |
| return getCurrentVersion(); |
| } else { |
| return 0; |
| } |
| } else { |
| return holder.getVersion(); |
| } |
| } |
| |
| /** |
| * Returns a list of the members that have been marked as having left the system. |
| */ |
| public Set<T> getDepartedMembersSet() { |
| synchronized (this.memberToVersion) { |
| Set<T> result = new HashSet<T>(); |
| for (RegionVersionHolder<T> h : this.memberToVersion.values()) { |
| if (h.isDepartedMember) { |
| result.add((T) h.id); |
| } |
| } |
| return result; |
| } |
| } |
| |
| |
| /** |
| * Test to see if this vector has seen the given version. |
| * |
| * @return true if this vector has seen the given version |
| */ |
| public boolean contains(T id, long version) { |
| RegionVersionHolder<T> holder = this.memberToVersion.get(id); |
| // For region synchronization. |
| if (isForSynchronization()) { |
| if (holder == null) { |
| // we only care about missing changes from a particular member, and this |
| // vector is known to contain that member's version holder |
| return true; |
| } |
| if (id.equals(this.myId)) { |
| if (!myId.isDiskStoreId()) { |
| // a sync vector only has one holder if not recovered from persistence, |
| // no valid version for the vector's owner |
| return true; |
| } |
| } |
| return holder.contains(version); |
| } |
| |
| // Regular GII |
| if (id.equals(this.myId)) { |
| if (getCurrentVersion() < version) { |
| return false; |
| } else { |
| return !localExceptions.hasExceptionFor(version); |
| } |
| } |
| if (holder == null) { |
| return false; |
| } else { |
| return holder.contains(version); |
| } |
| } |
| |
| /** |
| * Removes departed members not in the given collection of IDs from the version vector |
| * |
| * @param idsToKeep collection of the kind of IDs appropriate for this vector |
| */ |
| public void removeOldMembers(Set<VersionSource<T>> idsToKeep) { |
| synchronized (this.memberToVersion) { |
| for (Iterator<Map.Entry<T, RegionVersionHolder<T>>> it = |
| this.memberToVersion.entrySet().iterator(); it.hasNext();) { |
| Map.Entry<T, RegionVersionHolder<T>> entry = it.next(); |
| if (entry.getValue().isDepartedMember) { |
| if (!idsToKeep.contains(entry.getKey())) { |
| it.remove(); |
| this.memberToGCVersion.remove(entry.getKey()); |
| synchronized (this.canonicalIdLock) { |
| this.canonicalIds.remove(entry.getKey()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| |
| |
| /** |
| * Test hook - does this vector hold an entry for the given ID? |
| */ |
| public boolean containsMember(T id) { |
| if (this.memberToVersion.containsKey(id)) |
| return true; |
| if (this.memberToGCVersion.containsKey(id)) |
| return true; |
| if (this.canonicalIds.containsKey(id)) |
| return true; |
| return false; |
| } |
| |
| /** |
| * This marks the given entry as departed, making it eligible to be removed during an operation |
| * like DistributedRegion.synchronizeWith() |
| * |
| */ |
| protected void markDepartedMember(T id) { |
| synchronized (this.memberToVersion) { |
| RegionVersionHolder<T> holder = this.memberToVersion.get(id); |
| if (holder != null) { |
| holder.isDepartedMember = true; |
| } |
| } |
| } |
| |
| /** |
| * check to see if tombstone removal in this RVV indicates that tombstones have been removed from |
| * its Region that have not been removed from the argument's Region. If this is the case, then a |
| * delta GII may leave entries in the other RVV's Region that should be deleted. |
| * |
| * @return true if there have been tombstone removals in this vector's Region that were not done |
| * in the argument's region |
| */ |
| public boolean hasHigherTombstoneGCVersions(RegionVersionVector<T> other) { |
| if (this.localGCVersion.get() > 0) { |
| Long version = other.memberToGCVersion.get(this.myId); |
| if (version == null) { |
| return true; // this vector has removed locally created tombstones that the other hasn't |
| // reaped |
| } else if (this.localGCVersion.get() > version.longValue()) { |
| return true; |
| } |
| } |
| // see if I have members with GC versions that the other vector doesn't have |
| for (T mbr : this.memberToGCVersion.keySet()) { |
| if (!other.memberToGCVersion.containsKey(mbr)) { |
| if (!mbr.equals(other.getOwnerId())) { |
| return true; |
| } |
| } |
| } |
| // see if the other vector has members that have been removed from this |
| // vector. If this happens we don't know if tombstones were removed |
| for (T id : other.memberToGCVersion.keySet()) { |
| if (!id.equals(this.myId) && !this.memberToGCVersion.containsKey(id)) { |
| return true; |
| } |
| } |
| // now see if I have anything newer for things we have in common |
| for (Map.Entry<T, Long> entry : other.memberToGCVersion.entrySet()) { |
| Long version = this.memberToGCVersion.get(entry.getKey()); |
| if (version != null) { |
| Long otherVersion = entry.getValue(); |
| if (version.longValue() > otherVersion.longValue()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Test to see if this vector's region may be able to provide updates that the given vector has |
| * not seen. This method assumes that the argument is not a live vector and requires no |
| * synchronization. |
| */ |
| public boolean isNewerThanOrCanFillExceptionsFor(RegionVersionVector<T> other) { |
| if (other.singleMember) { |
| // do the diff for only a single member. This is typically a member that |
| // recently crashed. |
| Map.Entry<T, RegionVersionHolder<T>> entry = |
| other.memberToVersion.entrySet().iterator().next(); |
| RegionVersionHolder<T> holder = this.memberToVersion.get(entry.getKey()); |
| if (holder == null) { |
| return false; |
| } else { |
| RegionVersionHolder<T> otherHolder = entry.getValue(); |
| return holder.isNewerThanOrCanFillExceptionsFor(otherHolder); |
| } |
| } |
| // check my own updates |
| if (getCurrentVersion() > 0) { |
| RegionVersionHolder<T> otherHolder = other.memberToVersion.get(this.myId); |
| if (otherHolder == null) { |
| return true; |
| } |
| if (localExceptions.isNewerThanOrCanFillExceptionsFor(otherHolder)) { |
| return true; |
| } |
| } |
| // now see if I have anything newer for things we have in common |
| for (Map.Entry<T, RegionVersionHolder<T>> entry : this.memberToVersion.entrySet()) { |
| T mbr = entry.getKey(); |
| RegionVersionHolder<T> holder = entry.getValue(); |
| RegionVersionHolder<T> otherHolder = other.memberToVersion.get(mbr); |
| if (otherHolder != null) { |
| if (holder.isNewerThanOrCanFillExceptionsFor(otherHolder)) { |
| return true; |
| } |
| } else if (mbr.equals(other.getOwnerId())) { |
| if (holder.isNewerThanOrCanFillExceptionsFor(other.localExceptions)) { |
| return true; |
| } |
| } else { |
| // mbr = entry.getKey() does not exist in other |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean isGCVersionDominatedByOtherHolder(Long gcVersion, |
| RegionVersionHolder<T> otherHolder) { |
| if (gcVersion == null || gcVersion.longValue() == 0) { |
| return true; |
| } else { |
| RegionVersionHolder<T> holder = new RegionVersionHolder<T>(gcVersion.longValue()); |
| return !holder.isNewerThanOrCanFillExceptionsFor(otherHolder); |
| } |
| } |
| |
| /** |
| * See if this vector's rvvgc has updates that has not seen. |
| */ |
| public synchronized boolean isRVVGCDominatedBy(RegionVersionVector<T> requesterRVV) { |
| if (requesterRVV.singleMember) { |
| // do the diff for only a single member. This is typically a member that |
| // recently crashed. |
| Map.Entry<T, RegionVersionHolder<T>> entry = |
| requesterRVV.memberToVersion.entrySet().iterator().next(); |
| |
| Long gcVersion = this.memberToGCVersion.get(entry.getKey()); |
| return isGCVersionDominatedByOtherHolder(gcVersion, entry.getValue()); |
| } |
| |
| boolean isDominatedByRemote = true; |
| long localgcversion = this.localGCVersion.get(); |
| if (localgcversion > 0) { |
| RegionVersionHolder<T> otherHolder = requesterRVV.memberToVersion.get(this.myId); |
| isDominatedByRemote = isGCVersionDominatedByOtherHolder(localgcversion, otherHolder); |
| if (isDominatedByRemote == false) { |
| return false; |
| } |
| } |
| |
| for (Map.Entry<T, Long> entry : this.memberToGCVersion.entrySet()) { |
| T mbr = entry.getKey(); |
| Long gcVersion = entry.getValue(); |
| RegionVersionHolder<T> otherHolder = null; |
| if (mbr.equals(requesterRVV.getOwnerId())) { |
| otherHolder = requesterRVV.localExceptions; |
| } else { |
| otherHolder = requesterRVV.memberToVersion.get(mbr); |
| } |
| isDominatedByRemote = isGCVersionDominatedByOtherHolder(gcVersion, otherHolder); |
| if (isDominatedByRemote == false) { |
| return false; |
| } |
| } |
| return isDominatedByRemote; |
| } |
| |
| /** |
| * wait for this vector to dominate the given vector. This means that the receiver has seen all |
| * version changes that the given vector has seen. |
| * |
| * @param otherVector the vector, usually from another member, that we want to dominate |
| * @param region the region owning this vector |
| * @return true if dominance was achieved |
| */ |
| public boolean waitToDominate(RegionVersionVector<T> otherVector, LocalRegion region) { |
| if (otherVector == this) { |
| return true; |
| } |
| boolean result = false; |
| long waitTimeRemaining = 0; |
| long startTime = System.currentTimeMillis(); |
| boolean interrupted = false; |
| CancelCriterion stopper = region.getCancelCriterion(); |
| try { |
| do { |
| stopper.checkCancelInProgress(null); |
| result = dominates(otherVector); |
| if (!result) { |
| long now = System.currentTimeMillis(); |
| waitTimeRemaining = MAX_DOMINANCE_WAIT_TIME - (now - startTime); |
| if (logger.isTraceEnabled()) { |
| logger.trace("Waiting up to {} ms to achieve dominance", waitTimeRemaining); |
| } |
| if (waitTimeRemaining > 0) { |
| long waitTime = Math.min(DOMINANCE_PAUSE_TIME, waitTimeRemaining); |
| try { |
| Thread.sleep(waitTime); |
| } catch (InterruptedException e) { |
| stopper.checkCancelInProgress(e); |
| interrupted = true; |
| break; |
| } |
| } |
| } |
| } while (waitTimeRemaining > 0 && !result); |
| } finally { |
| if (interrupted) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("waitForDominance has been interrupted"); |
| } |
| Thread.currentThread().interrupt(); |
| } |
| } |
| return result; |
| } |
| |
| |
| /** return true if this vector has seen all version changes that the other vector has seen */ |
| public boolean dominates(RegionVersionVector<T> other) { |
| return !other.isNewerThanOrCanFillExceptionsFor(this); |
| } |
| |
| /** |
| * Remove any exceptions for the given member that are older than the given version. This is used |
| * after a synchronization operation to get rid of unneeded history. |
| * |
| */ |
| public void removeExceptionsFor(DistributedMember mbr, long version) { |
| RegionVersionHolder<T> holder = this.memberToVersion.get(mbr); |
| if (holder != null) { |
| synchronized (holder) { |
| holder.removeExceptionsOlderThan(version); |
| } |
| } |
| } |
| |
| /** |
| * This is used by clear() while version generation is locked to remove old exceptions and update |
| * the GC vector to be the same as the current version vector |
| */ |
| public void removeOldVersions() { |
| synchronized (this.memberToVersion) { |
| long currentVersion = getCurrentVersion(); |
| for (Map.Entry<T, RegionVersionHolder<T>> entry : this.memberToVersion.entrySet()) { |
| RegionVersionHolder<T> holder = entry.getValue(); |
| T id = entry.getKey(); |
| holder.removeExceptionsOlderThan(holder.version); |
| this.memberToGCVersion.put(id, Long.valueOf(holder.version)); |
| } |
| this.localGCVersion.set(getCurrentVersion()); |
| if (this.localExceptions != null) { |
| synchronized (this.localExceptions) { |
| this.localExceptions.removeExceptionsOlderThan(currentVersion); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Test hook |
| */ |
| public int getExceptionCount(T mbr) { |
| if (mbr.equals(this.myId)) { |
| return localExceptions.getExceptionCount(); |
| } |
| RegionVersionHolder<T> h = this.memberToVersion.get(mbr); |
| if (h == null) { |
| throw new IllegalStateException("there should be a holder for " + mbr); |
| } |
| return h.getExceptionCount(); |
| } |
| |
| /** |
| * after deserializing a version tag or RVV the IDs in it should be replaced with references to |
| * IDs returned by this method. This vastly reduces the memory footprint of tags/stamps/rvvs |
| * |
| * @return the canonical reference |
| */ |
| public T getCanonicalId(T id) { |
| if (id == null) { |
| return null; |
| } else if (id.equals(myId)) { |
| return myId; |
| } else { |
| T can = id; |
| T cId = this.canonicalIds.get(can); |
| if (cId != null) { |
| return cId; |
| } |
| if (!id.isDiskStoreId()) { |
| InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance(); |
| if (system != null) { |
| can = (T) system.getDistributionManager().getCanonicalId((InternalDistributedMember) id); |
| } |
| } |
| synchronized (this.canonicalIdLock) { |
| HashMap<T, T> tmp = new HashMap<T, T>(this.canonicalIds); |
| tmp.put(can, can); |
| this.canonicalIds = tmp; |
| } |
| return can; |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.DataSerializableFixedID#toData(java.io.DataOutput) |
| */ |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| if (this.isLiveVector) { |
| throw new IllegalStateException("serialization of this object is not allowed"); |
| } |
| writeMember(this.myId, out); |
| int flags = 0; |
| if (this.singleMember) { |
| flags |= 0x01; |
| } |
| out.writeInt(flags); |
| out.writeLong(this.localVersion.get()); |
| out.writeLong(this.localGCVersion.get()); |
| out.writeInt(this.memberToVersion.size()); |
| for (Map.Entry<T, RegionVersionHolder<T>> entry : this.memberToVersion.entrySet()) { |
| writeMember(entry.getKey(), out); |
| InternalDataSerializer.invokeToData(entry.getValue(), out); |
| } |
| out.writeInt(this.memberToGCVersion.size()); |
| for (Map.Entry<T, Long> entry : this.memberToGCVersion.entrySet()) { |
| writeMember(entry.getKey(), out); |
| out.writeLong(entry.getValue()); |
| } |
| InternalDataSerializer.invokeToData(this.localExceptions, out); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.DataSerializableFixedID#fromData(java.io.DataInput) |
| */ |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.myId = readMember(in); |
| int flags = in.readInt(); |
| this.singleMember = ((flags & 0x01) == 0x01); |
| this.localVersion.set(in.readLong()); |
| this.localGCVersion.set(in.readLong()); |
| int numHolders = in.readInt(); |
| for (int i = 0; i < numHolders; i++) { |
| T key = readMember(in); |
| RegionVersionHolder<T> holder = new RegionVersionHolder<T>(in); |
| holder.id = key; |
| this.memberToVersion.put(key, holder); |
| } |
| int numGCVersions = in.readInt(); |
| for (int i = 0; i < numGCVersions; i++) { |
| T key = readMember(in); |
| RegionVersionHolder<T> holder = this.memberToVersion.get(key); |
| if (holder != null) { |
| key = holder.id; |
| } // else it could go in canonicalIds, but that's not used in copies of RVVs |
| long value = in.readLong(); |
| this.memberToGCVersion.put(key, value); |
| } |
| this.localExceptions = new RegionVersionHolder<T>(in); |
| } |
| |
| protected abstract void writeMember(T member, DataOutput out) throws IOException; |
| |
| protected abstract T readMember(DataInput in) throws IOException, ClassNotFoundException; |
| |
| /** |
| * When a tombstone is removed from the entry map this method must be called to record the max |
| * region-version of any tombstone reaped. Any older versions are then immediately eligible for |
| * reaping. |
| * |
| */ |
| public void recordGCVersion(T mbr, long regionVersion) { |
| if (mbr == null) { |
| mbr = this.myId; |
| } |
| // record the GC version to make sure we know we have seen this version |
| // during recovery, this will prevent us from recording exceptions |
| // for entries less than the GC version. |
| recordVersion(mbr, regionVersion); |
| if (mbr == null || mbr.equals(this.myId)) { |
| boolean succeeded; |
| do { |
| long v = localGCVersion.get(); |
| if (v > regionVersion) { |
| break; |
| } |
| succeeded = localGCVersion.compareAndSet(v, regionVersion); |
| } while (!succeeded); |
| } else { |
| synchronized (this.memberToGCVersion) { |
| Long holder = this.memberToGCVersion.get(mbr); |
| if (holder != null) { |
| this.memberToGCVersion.put(mbr, Math.max(regionVersion, holder)); |
| } else { |
| this.memberToGCVersion.put(mbr, regionVersion); |
| } |
| } |
| } |
| } |
| |
| /** |
| * record all of the GC versions in the given vector |
| * |
| */ |
| public void recordGCVersions(RegionVersionVector<T> other) { |
| assert other.memberToGCVersion != null : "incoming gc version set is null"; |
| recordGCVersion(other.myId, other.localGCVersion.get()); |
| for (Map.Entry<T, Long> entry : other.memberToGCVersion.entrySet()) { |
| recordGCVersion(entry.getKey(), entry.getValue().longValue()); |
| } |
| } |
| |
| /** |
| * returns true if tombstones newer than the given version have already been reaped. This means |
| * that a clear or GC has been received that should have wiped out the operation this version |
| * stamp represents, but this operation had not yet been received |
| * |
| * @return true if the given version should be rejected |
| */ |
| public boolean isTombstoneTooOld(T mbr, long gcVersion) { |
| Long newestReapedVersion; |
| if (mbr == null || mbr.equals(myId)) { |
| newestReapedVersion = this.localGCVersion.get(); |
| } else { |
| newestReapedVersion = this.memberToGCVersion.get(mbr); |
| } |
| if (newestReapedVersion != null) { |
| return (newestReapedVersion.longValue() >= gcVersion); |
| } |
| return false; |
| } |
| |
| /** |
| * returns the highest region-version of any tombstone owned by the given member that was reaped |
| * in this vector's region |
| */ |
| public long getGCVersion(T mbr) { |
| if (mbr == null || mbr.equals(this.myId)) { |
| return localGCVersion.get(); |
| } else { |
| synchronized (this.memberToGCVersion) { |
| Long holder = this.memberToGCVersion.get(mbr); |
| if (holder != null) { |
| return holder.longValue(); |
| } |
| return -1; |
| } |
| } |
| } |
| |
| /** |
| * Get a map of the member to the version and exception list for that member, including the local |
| * member. |
| */ |
| public Map<T, RegionVersionHolder<T>> getMemberToVersion() { |
| RegionVersionHolder<T> myExceptions; |
| myExceptions = this.localExceptions.clone(); |
| |
| HashMap<T, RegionVersionHolder<T>> results = |
| new HashMap<T, RegionVersionHolder<T>>(memberToVersion); |
| |
| results.put(getOwnerId(), myExceptions); |
| return results; |
| } |
| |
| /** |
| * Get a map of member to the GC version of that member, including the local member. |
| */ |
| public synchronized Map<T, Long> getMemberToGCVersion() { |
| HashMap<T, Long> results = new HashMap<T, Long>(memberToGCVersion); |
| if (localGCVersion.get() > 0) { |
| results.put(getOwnerId(), localGCVersion.get()); |
| } |
| return results; |
| } |
| |
| /** |
| * Remove an exceptions that are older than the current GC version for each member in the RVV. |
| */ |
| public void pruneOldExceptions() { |
| Set<T> members; |
| members = new HashSet<T>(memberToGCVersion.keySet()); |
| |
| for (T member : members) { |
| Long gcVersion = memberToGCVersion.get(member); |
| |
| RegionVersionHolder<T> holder; |
| holder = memberToVersion.get(member); |
| |
| if (holder != null && gcVersion != null) { |
| synchronized (holder) { |
| holder.removeExceptionsOlderThan(gcVersion); |
| } |
| } |
| } |
| |
| localExceptions.removeExceptionsOlderThan(localGCVersion.get()); |
| } |
| |
| @Override |
| public String toString() { |
| if (this.isLiveVector) { |
| return "RegionVersionVector{rv" + this.localVersion + " gc" + this.localGCVersion + "}@" |
| + System.identityHashCode(this); |
| } else { |
| return fullToString(); |
| } |
| } |
| |
| /** this toString method is not thread-safe */ |
| public String fullToString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("RegionVersionVector[").append(this.myId).append("={rv") |
| .append(this.localExceptions.version).append(" gc" + this.localGCVersion) |
| .append(" localVersion=" + this.localVersion); |
| try { |
| sb.append(" local exceptions=" + this.localExceptions.exceptionsToString()); |
| } catch (ConcurrentModificationException c) { |
| sb.append(" (unable to access local exceptions)"); |
| } |
| sb.append("} others="); |
| String mbrVersions = ""; |
| try { |
| mbrVersions = this.memberToVersion.toString(); |
| } catch (ConcurrentModificationException e) { |
| mbrVersions = "(unable to access)"; |
| } |
| sb.append(mbrVersions); |
| if (this.memberToGCVersion != null) { |
| try { |
| mbrVersions = this.memberToGCVersion.toString(); |
| } catch (ConcurrentModificationException e) { |
| mbrVersions = "(unable to access)"; |
| } |
| sb.append(", gc=").append(mbrVersions); |
| } |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| |
| @Override |
| public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {} |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) {} |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} |
| |
| /* |
| * (non-Javadoc) this ensures that the version generation lock is released |
| * |
| * @see org.apache.geode.distributed.internal.MembershipListener#memberDeparted(org.apache.geode. |
| * distributed.internal.membership.InternalDistributedMember, boolean) |
| */ |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| final InternalDistributedMember id, boolean crashed) { |
| // since unlockForClear uses synchronization we need to try to execute it in another |
| // thread so that membership events aren't blocked |
| if (distributionManager != null) { |
| distributionManager.getWaitingThreadPool().execute(new Runnable() { |
| @Override |
| public void run() { |
| unlockForClear(id); |
| } |
| }); |
| } else { |
| unlockForClear(id); |
| } |
| } |
| |
| public static RegionVersionVector<?> create(boolean persistent, DataInput in) |
| throws IOException, ClassNotFoundException { |
| RegionVersionVector<?> rvv; |
| if (persistent) { |
| rvv = new DiskRegionVersionVector(); |
| } else { |
| rvv = new VMRegionVersionVector(); |
| } |
| InternalDataSerializer.invokeFromData(rvv, in); |
| return rvv; |
| } |
| |
| public static RegionVersionVector<?> create(VersionSource<?> versionMember, LocalRegion owner) { |
| if (versionMember.isDiskStoreId()) { |
| return new DiskRegionVersionVector((DiskStoreID) versionMember, owner); |
| } else { |
| return new VMRegionVersionVector((InternalDistributedMember) versionMember, owner); |
| } |
| } |
| |
| /** |
| * For test purposes, see if two RVVs have seen the same events and GC version vectors |
| * |
| * @return true if the RVVs are the same. |
| */ |
| public boolean sameAs(RegionVersionVector<T> other) { |
| // Compare the version version vectors |
| Map<T, RegionVersionHolder<T>> myMemberToVersion = getMemberToVersion(); |
| Map<T, RegionVersionHolder<T>> otherMemberToVersion = other.getMemberToVersion(); |
| |
| if (!myMemberToVersion.keySet().equals(otherMemberToVersion.keySet())) { |
| return false; |
| } |
| for (Iterator<T> it = myMemberToVersion.keySet().iterator(); it.hasNext();) { |
| T key = it.next(); |
| if (!myMemberToVersion.get(key).sameAs(otherMemberToVersion.get(key))) { |
| return false; |
| } |
| } |
| |
| Map<T, Long> myGCVersion = getMemberToGCVersion(); |
| Map<T, Long> otherGCVersion = other.getMemberToGCVersion(); |
| |
| if (!myGCVersion.equals(otherGCVersion)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Note: test only. If put in production will cause ConcurrentModificationException see if two |
| * RVVs have seen the same events and GC version vectors This will treat member with null version |
| * the same as member with version=0 |
| * |
| * @return true if the RVVs are the same logically. |
| */ |
| public boolean logicallySameAs(RegionVersionVector<T> other) { |
| return (this.dominates(other) && other.dominates(this)); |
| } |
| |
| |
| /** |
| * @return true if this vector represents the entry for a single member to be used in |
| * synchronizing caches for that member in the case of a crash. Otherwise return false. |
| */ |
| public boolean isForSynchronization() { |
| return this.singleMember; |
| } |
| |
| /** |
| * Test hook - see if a member is marked as "departed" |
| */ |
| public boolean isDepartedMember(VersionSource<T> mbr) { |
| RegionVersionHolder<T> h = memberToVersion.get(mbr); |
| return (h != null) && h.isDepartedMember; |
| } |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| return null; |
| } |
| |
| // /** |
| // * This class will wrap DM member IDs to provide integers that can be stored |
| // * on disk and be timed out in the vector. |
| // * |
| // * |
| // */ |
| // static class RVVMember implements Comparable { |
| // private static AtomicLong NextId = new AtomicLong(); |
| // T memberId; |
| // long timeAdded; |
| // long internalId; |
| // |
| // RVVMember(T m, long timeAdded, long internalId) { |
| // this.memberId = m; |
| // this.timeAdded = timeAdded; |
| // this.internalId = internalId; |
| // if (NextId.get() < internalId) { |
| // NextId.set(internalId); |
| // } |
| // } |
| // |
| // RVVMember(T m) { |
| // this.memberId = m; |
| // this.timeAdded = System.currentTimeMillis(); |
| // this.internalId = NextId.incrementAndGet(); |
| // } |
| // |
| // public int compareTo(Object o) { |
| // if (o instanceof T) { |
| // return -((T)o).compareTo(this.memberId); |
| // } else { |
| // return this.memberId.compareTo(((RVVMember)o).memberId); |
| // } |
| // } |
| // |
| // @Override |
| // public int hashCode() { |
| // return this.memberId.hashCode(); |
| // } |
| // |
| // @Override |
| // public boolean equals(Object o) { |
| // if (o instanceof T) { |
| // return ((T)o).equals(this.memberId); |
| // } else { |
| // return this.memberId.equals(((RVVMember)o).memberId); |
| // } |
| // } |
| // |
| // @Override |
| // public String toString() { |
| // return "vID(#"+this.internalId+"; time="+this.timeAdded+"; id="+this.memberId+")"; |
| // } |
| // } |
| |
| |
| |
| } |