| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.CopyHelper; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.DeltaSerializationException; |
| import com.gemstone.gemfire.GemFireIOException; |
| import com.gemstone.gemfire.InternalGemFireError; |
| import com.gemstone.gemfire.InvalidDeltaException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.CacheWriter; |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.CustomEvictionAttributes; |
| import com.gemstone.gemfire.cache.DiskAccessException; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.EvictionAction; |
| import com.gemstone.gemfire.cache.EvictionAlgorithm; |
| import com.gemstone.gemfire.cache.EvictionAttributes; |
| import com.gemstone.gemfire.cache.EvictionCriteria; |
| import com.gemstone.gemfire.cache.ExpirationAction; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.TimeoutException; |
| import com.gemstone.gemfire.cache.hdfs.HDFSIOException; |
| import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; |
| import com.gemstone.gemfire.cache.partition.PartitionListener; |
| import com.gemstone.gemfire.cache.query.internal.IndexUpdater; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.AtomicLongWithTerminalState; |
| import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor; |
| import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; |
| import com.gemstone.gemfire.distributed.internal.DistributionStats; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.HeapDataOutputStream; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile; |
| import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo; |
| import com.gemstone.gemfire.internal.cache.control.MemoryEvent; |
| import com.gemstone.gemfire.internal.cache.delta.Delta; |
| import com.gemstone.gemfire.internal.cache.partitioned.Bucket; |
| import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.LockObject; |
| import com.gemstone.gemfire.internal.cache.partitioned.PRTombstoneMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.PutMessage; |
| import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientTombstoneMessage; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage; |
| import com.gemstone.gemfire.internal.cache.versions.VersionSource; |
| import com.gemstone.gemfire.internal.cache.versions.VersionStamp; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; |
| import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; |
| import com.gemstone.gemfire.internal.concurrent.Atomics; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.logging.log4j.LogMarker; |
| import com.gemstone.gemfire.internal.offheap.StoredObject; |
| import com.gemstone.gemfire.internal.offheap.annotations.Unretained; |
| import com.gemstone.gemfire.internal.concurrent.AtomicLong5; |
| |
| |
| /** |
| * The storage used for a Partitioned Region. |
| * This class asserts distributed scope as well as a replicate data policy. |
| * It does not support transactions. |
| * |
| * Primary election for a BucketRegion can be found in the |
| * {@link com.gemstone.gemfire.internal.cache.BucketAdvisor} class |
| * |
| * @author Mitch Thomas |
| * @since 5.1 |
| * |
| */ |
| public class BucketRegion extends DistributedRegion |
| implements Bucket |
| { |
| private static final Logger logger = LogService.getLogger(); |
| |
| public static final RawValue NULLVALUE = new RawValue(null); |
| public static final RawValue REQUIRES_ENTRY_LOCK = new RawValue(null); |
| /** |
| * A special value for the bucket size indicating that this bucket |
| * has been destroyed. |
| */ |
| private static final long BUCKET_DESTROYED = Long.MIN_VALUE; |
| private AtomicLong counter = new AtomicLong(); |
| private AtomicLong limit; |
| private final AtomicLong numOverflowOnDisk = new AtomicLong(); |
| private final AtomicLong numOverflowBytesOnDisk = new AtomicLong(); |
| private final AtomicLong numEntriesInVM = new AtomicLong(); |
| private final AtomicLong evictions = new AtomicLong(); |
| |
| /** |
| * Contains the size in bytes of the values stored |
| * in the entry map. Sizes are tallied during put and remove operations. |
| */ |
| private final AtomicLongWithTerminalState bytesInMemory = new AtomicLongWithTerminalState(); |
| |
| public static final class RawValue { |
| private final Object rawValue; |
| public RawValue(Object rawVal) { |
| this.rawValue = rawVal; |
| } |
| |
| public final boolean isValueByteArray() { |
| return this.rawValue instanceof byte[]; |
| } |
| |
| public Object getRawValue() { |
| return this.rawValue; |
| } |
| |
| public void writeAsByteArray(DataOutput out) throws IOException { |
| if (isValueByteArray()) { |
| DataSerializer.writeByteArray((byte[]) this.rawValue, out); |
| } else if (this.rawValue instanceof CachedDeserializable) { |
| ((CachedDeserializable)this.rawValue).writeValueAsByteArray(out); |
| } else if (Token.isInvalid(this.rawValue)) { |
| DataSerializer.writeByteArray(null, out); |
| } else if (this.rawValue == Token.TOMBSTONE) { |
| DataSerializer.writeByteArray(null, out); |
| } else { |
| DataSerializer.writeObjectAsByteArray(this.rawValue, out); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "RawValue("+this.rawValue+")"; |
| } |
| |
| /** |
| * Return the de-serialized value without changing the stored form |
| * in the heap. Local access therefore creates a de-serialized copy (extra work) |
| * in favor of keeping values in serialized form, which is important because |
| * it makes remote access more efficient. The assumption is that remote |
| * access is much more frequent. |
| * TODO Unused, but kept for a potential performance boost when local Bucket |
| * access de-serializes the entry (which could hurt perf.) |
| * |
| * @return the de-serialized value |
| */ |
| public Object getDeserialized(boolean copyOnRead) { |
| if (isValueByteArray()) { |
| if (copyOnRead) { |
| // TODO move this code to CopyHelper.copy? |
| byte[] src = (byte[])this.rawValue; |
| byte[] dest = new byte[src.length]; |
| System.arraycopy(this.rawValue, 0, dest, 0, dest.length); |
| return dest; |
| } else { |
| return this.rawValue; |
| } |
| } else if (this.rawValue instanceof CachedDeserializable) { |
| if (copyOnRead) { |
| return ((CachedDeserializable)this.rawValue).getDeserializedWritableCopy(null, null); |
| } else { |
| return ((CachedDeserializable)this.rawValue).getDeserializedForReading(); |
| } |
| } else if (Token.isInvalid(this.rawValue)) { |
| return null; |
| } else { |
| if (copyOnRead) { |
| return CopyHelper.copy(this.rawValue); |
| } else { |
| return this.rawValue; |
| } |
| } |
| } |
| } |
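| |
| // Illustrative sketch (not part of any runtime path) of how a caller might |
| // stream a RawValue onward without forcing deserialization; the argument |
| // values passed to getSerialized here are assumptions for illustration only: |
| // |
| // RawValue rv = bucket.getSerialized(keyInfo, true, false, null, false, true); |
| // HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); |
| // rv.writeAsByteArray(hdos); // streams the stored form; INVALID/TOMBSTONE |
| // // go out as a null byte array |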
| |
| private static final long serialVersionUID = 1L; |
| |
| private final int redundancy; |
| |
| /** the partitioned region to which this bucket belongs */ |
| private final PartitionedRegion partitionedRegion; |
| private final Map<Object, ExpiryTask> pendingSecondaryExpires = new HashMap<Object, ExpiryTask>(); |
| |
| /* one map per bucket region */ |
| public HashMap allKeysMap = new HashMap(); |
| |
| static final boolean FORCE_LOCAL_LISTENERS_INVOCATION = |
| Boolean.getBoolean("gemfire.BucketRegion.alwaysFireLocalListeners"); |
| // enable with -Dgemfire.BucketRegion.alwaysFireLocalListeners=true |
| |
| private volatile AtomicLong5 eventSeqNum = null; |
| |
| public AtomicLong5 getEventSeqNum() { |
| return eventSeqNum; |
| } |
| |
| protected final AtomicReference<HoplogOrganizer> hoplog = new AtomicReference<HoplogOrganizer>(); |
| |
| public BucketRegion(String regionName, RegionAttributes attrs, |
| LocalRegion parentRegion, GemFireCacheImpl cache, |
| InternalRegionArguments internalRegionArgs) { |
| super(regionName, attrs, parentRegion, cache, internalRegionArgs); |
| if(PartitionedRegion.DISABLE_SECONDARY_BUCKET_ACK) { |
| Assert.assertTrue(attrs.getScope().isDistributedNoAck()); |
| } |
| else { |
| Assert.assertTrue(attrs.getScope().isDistributedAck()); |
| } |
| Assert.assertTrue(attrs.getDataPolicy().withReplication()); |
| Assert.assertTrue( ! attrs.getEarlyAck()); |
| Assert.assertTrue(isUsedForPartitionedRegionBucket()); |
| Assert.assertTrue( ! isUsedForPartitionedRegionAdmin()); |
| Assert.assertTrue(internalRegionArgs.getBucketAdvisor() != null); |
| Assert.assertTrue(internalRegionArgs.getPartitionedRegion() != null); |
| this.redundancy = internalRegionArgs.getPartitionedRegionBucketRedundancy(); |
| this.partitionedRegion = internalRegionArgs.getPartitionedRegion(); |
| } |
| |
| // Attempt to direct the GII process to the primary first |
| @Override |
| protected void initialize(InputStream snapshotInputStream, |
| InternalDistributedMember imageTarget, |
| InternalRegionArguments internalRegionArgs) |
| throws TimeoutException, IOException, ClassNotFoundException |
| { |
| // Set this region in the ProxyBucketRegion early so that profile exchange will |
| // perform the correct fillInProfile method |
| getBucketAdvisor().getProxyBucketRegion().setBucketRegion(this); |
| boolean success = false; |
| try { |
| if (this.partitionedRegion.isShadowPR() |
| && this.partitionedRegion.getColocatedWith() != null) { |
| PartitionedRegion parentPR = ColocationHelper |
| .getLeaderRegion(this.partitionedRegion); |
| BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById( |
| getId()); |
| // needs to be set only once. |
| if (parentBucket.eventSeqNum == null) { |
| parentBucket.eventSeqNum = new AtomicLong5(getId()); |
| } |
| } |
| if (this.partitionedRegion.getColocatedWith() == null) { |
| this.eventSeqNum = new AtomicLong5(getId()); |
| } else { |
| PartitionedRegion parentPR = ColocationHelper |
| .getLeaderRegion(this.partitionedRegion); |
| BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById( |
| getId()); |
| if (parentBucket == null && logger.isDebugEnabled()) { |
| logger.debug("The parentBucket of region {} bucketId {} is NULL", this.partitionedRegion.getFullPath(), getId()); |
| } |
| Assert.assertTrue(parentBucket != null); |
| this.eventSeqNum = parentBucket.eventSeqNum; |
| } |
| |
| final InternalDistributedMember primaryHolder = |
| getBucketAdvisor().basicGetPrimaryMember(); |
| if (primaryHolder != null && ! primaryHolder.equals(getMyId())) { |
| // Ignore the provided image target, use an existing primary (if any) |
| super.initialize(snapshotInputStream, primaryHolder, internalRegionArgs); |
| } else { |
| super.initialize(snapshotInputStream, imageTarget, internalRegionArgs); |
| } |
| |
| success = true; |
| } finally { |
| if(!success) { |
| removeFromPeersAdvisors(false); |
| getBucketAdvisor().getProxyBucketRegion().clearBucketRegion(this); |
| } |
| } |
| } |
| |
| |
| |
| @Override |
| public void initialized() { |
| //announce that the bucket is ready |
| //setHosting performs a profile exchange, so there |
| //is no need to call super.initialized() here. |
| } |
| |
| @Override |
| protected DiskStoreImpl findDiskStore(RegionAttributes ra, InternalRegionArguments internalRegionArgs) { |
| return internalRegionArgs.getPartitionedRegion().getDiskStore(); |
| } |
| |
| @Override |
| public void createEventTracker() { |
| this.eventTracker = new EventTracker(this); |
| this.eventTracker.start(); |
| } |
| |
| @Override |
| protected CacheDistributionAdvisor createDistributionAdvisor( |
| InternalRegionArguments internalRegionArgs){ |
| return internalRegionArgs.getBucketAdvisor(); |
| } |
| |
| public BucketAdvisor getBucketAdvisor() { |
| return (BucketAdvisor) getDistributionAdvisor(); |
| } |
| |
| public boolean isHosting() { |
| return getBucketAdvisor().isHosting(); |
| } |
| |
| @Override |
| protected EventID distributeTombstoneGC(Set<Object> keysRemoved) { |
| EventID eventId = super.distributeTombstoneGC(keysRemoved); |
| if (keysRemoved != null && keysRemoved.size() > 0 && getFilterProfile() != null) { |
| // send the GC to members that don't have the bucket but have the PR so they |
| // can forward the event to clients |
| PRTombstoneMessage.send(this, keysRemoved, eventId); |
| } |
| return eventId; |
| } |
| |
| @Override |
| protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object> removedKeys, EventID eventID, FilterInfo routing) { |
| if (CacheClientNotifier.getInstance() != null) { |
| // Only route the event to clients interested in the partitioned region. |
| // We do this by constructing a region-level event and then use it to |
| // have the filter profile ferret out all of the clients that have interest |
| // in this region |
| FilterProfile fp = getFilterProfile(); |
| if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients |
| && (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients |
| RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId()); |
| FilterInfo clientRouting = routing; |
| if (clientRouting == null) { |
| clientRouting = fp.getLocalFilterRouting(regionEvent); |
| } |
| regionEvent.setLocalFilterInfo(clientRouting); |
| |
| ClientUpdateMessage clientMessage = ClientTombstoneMessage.gc(getPartitionedRegion(), removedKeys, |
| eventID); |
| CacheClientNotifier.notifyClients(regionEvent, clientMessage); |
| } |
| } |
| } |
| |
| /** |
| * Search the CM (allKeysMap) for the given keys. If any of them is found, |
| * return the first one found; otherwise, save all the keys into the CM and |
| * return null. The thread acquires the map's lock before searching. |
| * |
| * @param keys the keys to search for and, if absent, register |
| * @return the first key found in the CM, or null if none was found |
| */ |
| private LockObject searchAndLock(Object keys[]) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| LockObject foundLock = null; |
| |
| synchronized(allKeysMap) { |
| // check if there's any key in map |
| for (int i=0; i<keys.length; i++) { |
| if (allKeysMap.containsKey(keys[i])) { |
| foundLock = (LockObject)allKeysMap.get(keys[i]); |
| if (isDebugEnabled) { |
| logger.debug("LockKeys: found key: {}:{}", keys[i], foundLock.lockedTimeStamp); |
| } |
| break; |
| } |
| } |
| |
| // save the keys when still locked |
| if (foundLock == null) { |
| for (int i=0; i<keys.length; i++) { |
| LockObject lockValue = new LockObject(keys[i], isDebugEnabled?System.currentTimeMillis():0); |
| allKeysMap.put(keys[i], lockValue); |
| if (isDebugEnabled) { |
| logger.debug("LockKeys: add key: {}:{}", keys[i], lockValue.lockedTimeStamp); |
| } |
| } |
| } |
| } |
| |
| return foundLock; |
| } |
| |
| /** |
| * After the keys have been processed, this method removes them from the CM |
| * and calls notifyAll for each key. The thread acquires the CM's lock first. |
| * |
| * @param keys the keys to remove and whose waiters to notify |
| */ |
| public void removeAndNotifyKeys(Object keys[]) { |
| final boolean isTraceEnabled = logger.isTraceEnabled(); |
| |
| synchronized(allKeysMap) { |
| for (int i=0; i<keys.length; i++) { |
| LockObject lockValue = (LockObject)allKeysMap.remove(keys[i]); |
| if (lockValue != null) { |
| // let current thread become the monitor of the key object |
| synchronized (lockValue) { |
| lockValue.setRemoved(); |
| if (isTraceEnabled) { |
| long waitTime = System.currentTimeMillis()-lockValue.lockedTimeStamp; |
| logger.trace("LockKeys: remove key {}, notifyAll for {}. It waited", keys[i], lockValue, waitTime); |
| } |
| lockValue.notifyAll(); |
| } |
| } |
| } // for |
| } |
| } |
| |
| /** |
| * Keep checking whether the CM contains any of the given keys. If it does, |
| * wait to be notified and retry. This method can block the current thread for |
| * a long time; it only returns once the thread has successfully saved its keys |
| * into the CM. |
| * |
| * @param keys the keys to lock |
| */ |
| public void waitUntilLocked(Object keys[]) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| final String title = "BucketRegion.waitUntilLocked:"; |
| while (true) { |
| LockObject foundLock = searchAndLock(keys); |
| |
| if (foundLock != null) { |
| synchronized(foundLock) { |
| try { |
| while (!foundLock.isRemoved()) { |
| this.partitionedRegion.checkReadiness(); |
| foundLock.wait(1000); |
| // primary could be changed by prRebalancing while waiting here |
| checkForPrimary(); |
| } |
| } |
| catch (InterruptedException e) { |
| // TODO this isn't a localizable string |
| if (isDebugEnabled) { |
| logger.debug("{} interrupted while waiting for {}: {}", title, foundLock, e.getMessage()); |
| } |
| } |
| if (isDebugEnabled) { |
| long waitTime = System.currentTimeMillis()-foundLock.lockedTimeStamp; |
| logger.debug("{} waited {} ms to lock", title, waitTime, foundLock); |
| } |
| } |
| } else { |
| // now the keys have been locked by this thread |
| break; |
| } // to lock and process |
| } // while |
| } |
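| |
| // How the three key-lock helpers above fit together (sketch; the guarded |
| // work is a hypothetical placeholder). beginLocalWrite/endLocalWrite below |
| // follow exactly this pattern for single-key writes: |
| // |
| // Object[] keys = new Object[] { event.getKey() }; |
| // waitUntilLocked(keys); // blocks until this thread owns the keys |
| // try { |
| // // ... apply the write while holding the keys ... |
| // } finally { |
| // removeAndNotifyKeys(keys); // wake any threads waiting on these keys |
| // } |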
| |
| // Entry (Put/Create) rules |
| // If this is a primary for the bucket |
| // 1) apply op locally, aka update or create entry |
| // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry |
| // 3) cache listener with synchrony on entry |
| // Else not a primary |
| // 1) apply op locally |
| // 2) update local bs, gateway |
| @Override |
| protected |
| boolean virtualPut(EntryEventImpl event, |
| boolean ifNew, |
| boolean ifOld, |
| Object expectedOldValue, |
| boolean requireOldValue, |
| long lastModified, |
| boolean overwriteDestroyed) |
| throws TimeoutException, |
| CacheWriterException { |
| beginLocalWrite(event); |
| |
| try { |
| if (this.partitionedRegion.isParallelWanEnabled()) { |
| handleWANEvent(event); |
| } |
| if (!hasSeenEvent(event)) { |
| forceSerialized(event); |
| RegionEntry oldEntry = this.entries |
| .basicPut(event, lastModified, ifNew, ifOld, expectedOldValue, |
| requireOldValue, overwriteDestroyed); |
| return oldEntry != null; |
| } |
| if (event.getDeltaBytes() != null && event.getRawNewValue() == null) { |
| // This means that this event has delta bytes but no full value. |
| // Request the full value of this event. |
| // The value in this vm may not be same as this event's value. |
| throw new InvalidDeltaException( |
| "Cache encountered replay of event containing delta bytes for key " |
| + event.getKey()); |
| } |
| // Forward the operation and event messages |
| // to members with bucket copies that may not have seen the event. Their |
| // EventTrackers will keep them from applying the event a second time if |
| // they've already seen it. |
| if (logger.isTraceEnabled(LogMarker.DM)) { |
| logger.trace(LogMarker.DM, "BR.virtualPut: this cache has already seen this event {}", event); |
| } |
| distributeUpdateOperation(event, lastModified); |
| return true; |
| } finally { |
| endLocalWrite(event); |
| } |
| } |
| |
| |
| public long generateTailKey() { |
| long key = this.eventSeqNum.addAndGet(this.partitionedRegion |
| .getTotalNumberOfBuckets()); |
| if (key < 0 |
| || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) { |
| logger |
| .error(LocalizedMessage |
| .create( |
| LocalizedStrings.GatewaySender_SEQUENCENUMBER_GENERATED_FOR_EVENT_IS_INVALID, |
| new Object[] { key, getId() })); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("WAN: On primary bucket {}, setting the seq number as {}", |
| getId(), this.eventSeqNum.get()); |
| } |
| return eventSeqNum.get(); |
| } |
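| |
| // Tail keys stride by the total bucket count from an initial value equal to |
| // the bucket id, so each bucket owns a disjoint residue class. Illustrative |
| // numbers: with 113 total buckets, bucket 5 generates 118, 231, 344, ... and |
| // key % 113 == 5 always holds, which is the invariant checked above. |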
| |
| public void handleWANEvent(EntryEventImpl event) { |
| if (this.eventSeqNum == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("The bucket corresponding to this user bucket is not created yet. This event will not go to remote wan site. Event: {}", event); |
| } |
| } |
| |
| if (!(this instanceof AbstractBucketRegionQueue)) { |
| if (getBucketAdvisor().isPrimary()) { |
| long key = this.eventSeqNum.addAndGet(this.partitionedRegion.getTotalNumberOfBuckets()); |
| if (key < 0 |
| || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) { |
| logger.error(LocalizedMessage.create(LocalizedStrings.GatewaySender_SEQUENCENUMBER_GENERATED_FOR_EVENT_IS_INVALID, |
| new Object[] { key, getId() })); |
| } |
| event.setTailKey(key); |
| if (logger.isDebugEnabled()) { |
| logger.debug("WAN: On primary bucket {}, setting the seq number as {}", getId(), this.eventSeqNum.get()); |
| } |
| } else { |
| // Can there be a race here? E.g. one thread has done the put on the primary, |
| // but its update arrives here later; in that case it's possible that a tail |
| // key is missed. We handle that by only ever increasing the tailKey and never |
| // setting it below the current value. |
| Atomics.setIfGreater(this.eventSeqNum, event.getTailKey()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("WAN: On secondary bucket {}, setting the seq number as {}", getId(), event.getTailKey()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Fix for Bug#45917. |
| * We update the seqNumber so that new seqNumbers are |
| * generated starting from the latest in the system. |
| * |
| * @param l the latest sequence number observed |
| */ |
| public void updateEventSeqNum(long l) { |
| Atomics.setIfGreater(this.eventSeqNum, l); |
| if (logger.isDebugEnabled()) { |
| logger.debug("WAN: On bucket {}, setting the seq number as {} before GII", getId(), l); |
| } |
| } |
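| |
| // Atomics.setIfGreater only ever advances the counter, so a stale, smaller |
| // tail key arriving late can never rewind the sequence. Illustrative effect: |
| // with eventSeqNum at 344, setIfGreater(231) is a no-op, while |
| // setIfGreater(457) advances it to 457. |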
| |
| protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) { |
| if (!event.isOriginRemote() |
| && !event.isNetSearch() |
| && getBucketAdvisor().isPrimary()) { |
| if (event.isBulkOpInProgress()) { |
| // consolidate the UpdateOperation for each entry into a PutAllMessage; |
| // since we did not call basicPutPart3(), we have to explicitly addEntry here |
| event.getPutAllOperation().addEntry(event, this.getId()); |
| } else { |
| new UpdateOperation(event, lastModified).distribute(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("sent update operation : for region : {}: with event: {}", this.getName(), event); |
| } |
| } |
| } |
| if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later |
| event.invokeCallbacks(this, true, true); |
| } |
| } |
| |
| /** |
| * distribute the operation in basicPutPart2 so the region entry lock is |
| * held |
| */ |
| @Override |
| protected long basicPutPart2(EntryEventImpl event, RegionEntry entry, boolean isInitialized, |
| long lastModified, boolean clearConflict) { |
| // Assumed this is called with entry synchrony |
| |
| // Typically UpdateOperation is called with the |
| // timestamp returned from basicPutPart2, but as a bucket we want to do |
| // distribution *before* we do basicPutPart2. |
| final long modifiedTime = event.getEventTime(lastModified); |
| // Update the get stats if necessary. |
| if (this.partitionedRegion.getDataStore().hasClientInterest(event)) { |
| updateStatsForGet(entry, true); |
| } |
| if (!event.isOriginRemote()) { |
| if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) { |
| boolean eventHasDelta = event.getDeltaBytes() != null; |
| VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event); |
| if (v != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("generated version tag {} in region {}", v, this.getName()); |
| } |
| } |
| } |
| |
| // This code assumes it is safe to ignore token mode (GII in progress) |
| // because it assumes that when the origin of the event is local, |
| // the GII has completed and the region is initialized and open for local |
| // ops |
| if (!event.isBulkOpInProgress()) { |
| long start = this.partitionedRegion.getPrStats().startSendReplication(); |
| try { |
| UpdateOperation op = new UpdateOperation(event, modifiedTime); |
| op.distribute(); |
| } finally { |
| this.partitionedRegion.getPrStats().endSendReplication(start); |
| } |
| } else { |
| // consolidate the UpdateOperation for each entry into a PutAllMessage |
| // basicPutPart3 takes care of this |
| } |
| } |
| |
| return super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict); |
| } |
| |
| protected void notifyGatewaySender(EnumListenerEvent operation, |
| EntryEventImpl event) { |
| // We don't need to clone the event for new Gateway Senders. |
| // Preserve the bucket reference for resetting it later. |
| LocalRegion bucketRegion = event.getRegion(); |
| try { |
| event.setRegion(this.partitionedRegion); |
| this.partitionedRegion.notifyGatewaySender(operation, event); |
| } |
| finally { |
| // reset the event region back to the bucket region. |
| // This works because the gateway queue creates a GatewaySenderEvent for |
| // queueing. |
| event.setRegion(bucketRegion); |
| } |
| } |
| |
| public void checkForPrimary() { |
| final boolean isp = getBucketAdvisor().isPrimary(); |
| if (! isp){ |
| this.partitionedRegion.checkReadiness(); |
| checkReadiness(); |
| InternalDistributedMember primaryHolder = getBucketAdvisor().basicGetPrimaryMember(); |
| throw new PrimaryBucketException("Bucket " + getName() |
| + " is not primary. Current primary holder is "+primaryHolder); |
| } |
| } |
| |
| /** |
| * Checks to make sure that this node is primary, and locks the bucket |
| * to make sure the bucket stays the primary bucket while the write |
| * is in progress. Any call to this method must be followed with a call |
| * to endLocalWrite(). |
| * @param event |
| */ |
| private boolean beginLocalWrite(EntryEventImpl event) { |
| if(!needWriteLock(event)) { |
| return false; |
| } |
| |
| if (cache.isCacheAtShutdownAll()) { |
| throw new CacheClosedException("Cache is shutting down"); |
| } |
| |
| Object keys[] = new Object[1]; |
| keys[0] = event.getKey(); |
| waitUntilLocked(keys); // it might wait for long time |
| |
| boolean lockedForPrimary = false; |
| try { |
| doLockForPrimary(false); |
| return lockedForPrimary = true; |
| } finally { |
| if (!lockedForPrimary) { |
| removeAndNotifyKeys(keys); |
| } |
| } |
| } |
| |
| /** |
| * lock this bucket and, if present, its colocated "parent" |
| * @param tryLock - whether to use tryLock (true) or a blocking lock (false) |
| * @return true if locks were obtained and are still held |
| */ |
| public boolean doLockForPrimary(boolean tryLock) { |
| boolean locked = lockPrimaryStateReadLock(tryLock); |
| if(!locked) { |
| return false; |
| } |
| |
| boolean isPrimary = false; |
| try { |
| // Throw a PrimaryBucketException if this VM is assumed to be the |
| // primary but isn't, preventing update and distribution |
| checkForPrimary(); |
| |
| if (cache.isCacheAtShutdownAll()) { |
| throw new CacheClosedException("Cache is shutting down"); |
| } |
| |
| isPrimary = true; |
| } finally { |
| if(!isPrimary) { |
| doUnlockForPrimary(); |
| } |
| } |
| |
| return true; |
| } |
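| |
| // Sketch of the tryLock flavor for a hypothetical caller that must not |
| // block; the blocking flavor (tryLock == false) is what beginLocalWrite |
| // uses: |
| // |
| // if (bucket.doLockForPrimary(true)) { |
| // try { |
| // // ... primary-only work ... |
| // } finally { |
| // bucket.doUnlockForPrimary(); |
| // } |
| // } // else this VM is, or is becoming, non-primary; back off and retry |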
| |
| private boolean lockPrimaryStateReadLock(boolean tryLock) { |
| Lock activeWriteLock = this.getBucketAdvisor().getActiveWriteLock(); |
| Lock parentLock = this.getBucketAdvisor().getParentActiveWriteLock(); |
| for (;;) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| //Get the lock. If we have to wait here, it's because |
| //this VM is actively becoming "not primary". We don't want |
| //to throw an exception until this VM is actually no longer |
| //primary, so we wait here for not primary to complete. See bug #39963 |
| if (parentLock != null) { |
| if (tryLock) { |
| boolean locked = parentLock.tryLock(); |
| if (!locked) { |
| return false; |
| } |
| } else { |
| parentLock.lockInterruptibly(); |
| } |
| if (tryLock) { |
| boolean locked = activeWriteLock.tryLock(); |
| if (!locked) { |
| parentLock.unlock(); |
| return false; |
| } |
| } else { |
| activeWriteLock.lockInterruptibly(); |
| } |
| } |
| else { |
| if (tryLock) { |
| boolean locked = activeWriteLock.tryLock(); |
| if (!locked) { |
| return false; |
| } |
| } else { |
| activeWriteLock.lockInterruptibly(); |
| } |
| } |
| break; // success |
| } catch (InterruptedException e) { |
| interrupted = true; |
| cache.getCancelCriterion().checkCancelInProgress(null); |
| // don't throw InternalGemFireError to fix bug 40102 |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| public void doUnlockForPrimary() { |
| Lock activeWriteLock = this.getBucketAdvisor().getActiveWriteLock(); |
| activeWriteLock.unlock(); |
| Lock parentLock = this.getBucketAdvisor().getParentActiveWriteLock(); |
| if (parentLock != null) { |
| parentLock.unlock(); |
| } |
| } |
| |
| /** |
| * Release the lock on the bucket that makes the bucket |
| * stay the primary during a write. |
| */ |
| private void endLocalWrite(EntryEventImpl event) { |
| if(!needWriteLock(event)) { |
| return; |
| } |
| |
| |
| doUnlockForPrimary(); |
| |
| Object keys[] = new Object[1]; |
| keys[0] = event.getKey(); |
| removeAndNotifyKeys(keys); |
| } |
| |
| protected boolean needWriteLock(EntryEventImpl event) { |
| return !(event.isOriginRemote() |
| || event.isNetSearch() |
| || event.getOperation().isLocal() |
| || event.getOperation().isPutAll() |
| || event.getOperation().isRemoveAll() |
| || (event.isExpiration() && isEntryEvictDestroyEnabled()) |
| || event.isPendingSecondaryExpireDestroy()); |
| } |
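| |
| // In other words, only local-origin, single-entry, non-eviction operations |
| // take the per-key lock here; bulk ops (putAll/removeAll) and remote-origin |
| // events are presumed to be synchronized along their own code paths (e.g. a |
| // bulk PR message can lock its whole key set up front via waitUntilLocked). |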
| |
| // this is stubbed out because distribution is done in basicPutPart2 while |
| // the region entry is still locked |
| @Override |
| protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) { |
| } |
| |
| // Entry Invalidation rules |
| // If this is a primary for the bucket |
| // 1) apply op locally, aka update entry |
| // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry |
| // 3) cache listener with synchrony on entry |
| // 4) update local bs, gateway |
| // Else not a primary |
| // 1) apply op locally |
| // 2) update local bs, gateway |
| @Override |
| void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException |
| { |
| basicInvalidate(event, isInitialized(), false); |
| } |
| |
| @Override |
| void basicInvalidate(final EntryEventImpl event, boolean invokeCallbacks, |
| boolean forceNewEntry) |
| throws EntryNotFoundException { |
| // disallow local invalidation |
| Assert.assertTrue(! event.isLocalInvalid()); |
| Assert.assertTrue(!isTX()); |
| Assert.assertTrue(event.getOperation().isDistributed()); |
| |
| beginLocalWrite(event); |
| try { |
| // increment the tailKey so that invalidate operations are written to HDFS |
| if (this.partitionedRegion.hdfsStoreName != null) { |
| /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */ |
| //assert this.partitionedRegion.isLocalParallelWanEnabled(); |
| handleWANEvent(event); |
| } |
| // This call should invoke the AbstractRegionMap (aka ARM) invalidate method, |
| // which performs the local op. |
| // The ARM then calls basicInvalidatePart2 with the entry synchronized. |
| if ( !hasSeenEvent(event) ) { |
| if (event.getOperation().isExpiration()) { // bug 39905 - invoke listeners for expiration |
| DistributedSystem sys = cache.getDistributedSystem(); |
| EventID newID = new EventID(sys); |
| event.setEventId(newID); |
| event.setInvokePRCallbacks(getBucketAdvisor().isPrimary()); |
| } |
| boolean forceCallbacks = isEntryEvictDestroyEnabled(); |
| boolean done = this.entries.invalidate(event, invokeCallbacks, forceNewEntry, forceCallbacks); |
| ExpirationAction expirationAction = getEntryExpirationAction(); |
| if (done && !getBucketAdvisor().isPrimary() && expirationAction != null |
| && expirationAction.isInvalidate()) { |
| synchronized(pendingSecondaryExpires) { |
| pendingSecondaryExpires.remove(event.getKey()); |
| } |
| } |
| return; |
| } |
| else { |
| if (logger.isTraceEnabled(LogMarker.DM)) { |
| logger.trace(LogMarker.DM, "LR.basicInvalidate: this cache has already seen this event {}", event); |
| } |
| if (!event.isOriginRemote() |
| && getBucketAdvisor().isPrimary()) { |
| // This cache has processed the event, forward operation |
| // and event messages to backup buckets |
| new InvalidateOperation(event).distribute(); |
| } |
| event.invokeCallbacks(this,true, false); |
| return; |
| } |
| } finally { |
| endLocalWrite(event); |
| } |
| } |
| |
| @Override |
| void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, |
| boolean clearConflict, boolean invokeCallbacks) |
| { |
| // Assumed this is called with the entry synchronized |
| if (!event.isOriginRemote()) { |
| if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) { |
| VersionTag v = re.generateVersionTag(null, false, this, event); |
| if (logger.isDebugEnabled() && v != null) { |
| logger.debug("generated version tag {} in region {}", v, this.getName()); |
| } |
| event.setVersionTag(v); |
| } |
| |
| // This code assumes it is safe to ignore token mode (GII in progress) |
| // because it assumes that when the origin of the event is local, |
| // the GII has completed and the region is initialized and open for local |
| // ops |
| |
| // This code assumes that this bucket is primary |
| // distribute op to bucket secondaries and event to other listeners |
| InvalidateOperation op = new InvalidateOperation(event); |
| op.distribute(); |
| } |
| super.basicInvalidatePart2(re, event, clearConflict /*Clear conflict occurred */, invokeCallbacks); |
| } |
| |
| @Override |
| void distributeInvalidate(EntryEventImpl event) { |
| } |
| |
| @Override |
| protected void distributeInvalidateRegion(RegionEventImpl event) { |
| // switch region in event so that we can have distributed region |
| // send InvalidateRegion message. |
| event.region = this; |
| super.distributeInvalidateRegion(event); |
| event.region = this.partitionedRegion; |
| } |
| |
| @Override |
| protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) { |
| return getBucketAdvisor().isPrimary(); |
| } |
| |
| @Override |
| protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) { |
| if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag |
| return false; |
| } |
| return this.concurrencyChecksEnabled && ((event.getVersionTag() == null) || event.getVersionTag().isGatewayTag()); |
| } |
| |
| @Override |
| void expireDestroy(EntryEventImpl event, boolean cacheWrite) { |
| |
| /* Early out before we throw a PrimaryBucketException because we're not primary */ |
| if(needWriteLock(event) && !getBucketAdvisor().isPrimary()) { |
| return; |
| } |
| try { |
| super.expireDestroy(event, cacheWrite); |
| return; |
| } catch(PrimaryBucketException e) { |
| //must have concurrently removed the primary |
| return; |
| } |
| } |
| |
| @Override |
| void expireInvalidate(EntryEventImpl event) { |
| if(!getBucketAdvisor().isPrimary()) { |
| return; |
| } |
| try { |
| super.expireInvalidate(event); |
| } catch (PrimaryBucketException e) { |
| //must have concurrently removed the primary |
| } |
| } |
| |
| @Override |
| final void performExpiryTimeout(ExpiryTask p_task) throws CacheException |
| { |
| ExpiryTask task = p_task; |
| boolean isEvictDestroy = isEntryEvictDestroyEnabled(); |
| //Fix for bug 43805 - get the primary lock before |
| //synchronizing on pendingSecondaryExpires, to match the lock |
| //ordering in other place (like acquiredPrimaryLock) |
| lockPrimaryStateReadLock(false); |
| try { |
| // Why do we care if evict destroy is configured? |
| // See bug 41096 for the answer. |
| if(!getBucketAdvisor().isPrimary() && !isEvictDestroy) { |
| synchronized (this.pendingSecondaryExpires) { |
| if (task.isPending()) { |
| Object key = task.getKey(); |
| if (key != null) { |
| this.pendingSecondaryExpires.put(key, task); |
| } |
| } |
| } |
| } else { |
| super.performExpiryTimeout(task); |
| } |
| } finally { |
| doUnlockForPrimary(); |
| } |
| } |
| |
| protected boolean isEntryEvictDestroyEnabled() { |
| return getEvictionAttributes() != null && EvictionAction.LOCAL_DESTROY.equals(getEvictionAttributes().getAction()); |
| } |
| |
| protected final void processPendingSecondaryExpires() |
| { |
| ExpiryTask[] tasks; |
| while (true) { |
| // note we just keep looping until no more pendingExpires exist |
| synchronized (this.pendingSecondaryExpires) { |
| if (this.pendingSecondaryExpires.isEmpty()) { |
| return; |
| } |
| tasks = new ExpiryTask[this.pendingSecondaryExpires.size()]; |
| tasks = this.pendingSecondaryExpires.values().toArray(tasks); |
| this.pendingSecondaryExpires.clear(); |
| } |
| try { |
| if (isCacheClosing() || isClosed() || this.isDestroyed) { |
| return; |
| } |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| for (int i = 0; i < tasks.length; i++) { |
| try { |
| if (isDebugEnabled) { |
| logger.debug("{} fired at {}", tasks[i], System.currentTimeMillis()); |
| } |
| tasks[i].basicPerformTimeout(true); |
| if (isCacheClosing() || isClosed() || isDestroyed()) { |
| return; |
| } |
| } |
| catch (EntryNotFoundException ignore) { |
| // ignore and try the next expiry task |
| } |
| } |
| } |
| catch (RegionDestroyedException re) { |
| // Ignore - our job is done |
| } |
| catch (CancelException ex) { |
| // ignore |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable ex) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_IN_EXPIRATION_TASK), ex); |
| } |
| } |
| } |
| |
| /** |
| * Creates an event for the EVICT_DESTROY operation so that events will fire |
| * for Partitioned Regions. |
| * @param key - the key that this event is related to |
| * @return an event for EVICT_DESTROY |
| */ |
| @Override |
| protected EntryEventImpl generateEvictDestroyEvent(Object key) { |
| EntryEventImpl event = super.generateEvictDestroyEvent(key); |
| event.setInvokePRCallbacks(true); //see bug 40797 |
| return event; |
| } |
| |
| // Entry Destruction rules |
| // If this is a primary for the bucket |
| // 1) apply op locally, aka destroy entry (REMOVED token) |
| // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry |
| // 3) cache listener with synchrony on local entry |
| // 4) update local bs, gateway |
| // Else not a primary |
| // 1) apply op locally |
| // 2) update local bs, gateway |
| @Override |
| protected |
| void basicDestroy(final EntryEventImpl event, |
| final boolean cacheWrite, |
| Object expectedOldValue) |
| throws EntryNotFoundException, CacheWriterException, TimeoutException { |
| |
| Assert.assertTrue(!isTX()); |
| Assert.assertTrue(event.getOperation().isDistributed()); |
| |
| beginLocalWrite(event); |
| try { |
| // increment the tailKey for the destroy event |
| if (this.partitionedRegion.isParallelWanEnabled()) { |
| handleWANEvent(event); |
| } |
| // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry |
| // from memory, allow the destroy to proceed. fixes #49784 |
| if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Put the destory event in HDFS queue on secondary " |
| + "and return as event is HDFS loaded " + event); |
| } |
| notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); |
| return; |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Going ahead with the destroy on GemFire system"); |
| } |
| } |
| // This call should invoke AbstractRegionMap (aka ARM) destroy method |
| // which calls the CacheWriter, then performs the local op. |
| // The ARM then calls basicDestroyPart2 with the entry synchronized. |
| if ( !hasSeenEvent(event) ) { |
| if (event.getOperation().isExpiration()) { // bug 39905 - invoke listeners for expiration |
| DistributedSystem sys = cache.getDistributedSystem(); |
| if (event.getEventId() == null) { // Fix for #47388 |
| EventID newID = new EventID(sys); |
| event.setEventId(newID); |
| } |
| event.setInvokePRCallbacks(getBucketAdvisor().isPrimary()); |
| } |
| boolean done = mapDestroy(event, |
| cacheWrite, |
| false, // isEviction //merge44610: In cheetah instead of false event.getOperation().isEviction() is used. We kept the cedar change as it is. |
| expectedOldValue); |
| if(done && !getBucketAdvisor().isPrimary() && isEntryExpiryPossible()) { |
| synchronized(pendingSecondaryExpires) { |
| pendingSecondaryExpires.remove(event.getKey()); |
| } |
| } |
| return; |
| } |
| else { |
| distributeDestroyOperation(event); |
| return; |
| } |
| } finally { |
| endLocalWrite(event); |
| } |
| } |
| |
| protected void distributeDestroyOperation (EntryEventImpl event) { |
| if (logger.isTraceEnabled(LogMarker.DM)) { |
| logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}", event); |
| } |
| if (!event.isOriginRemote() |
| && getBucketAdvisor().isPrimary()) { |
| if (event.isBulkOpInProgress()) { |
| // consolidate the DestroyOperation for each entry into a RemoveAllMessage |
| event.getRemoveAllOperation().addEntry(event, this.getId()); |
| } else { |
| // This cache has processed the event, forward operation |
| // and event messages to backup buckets |
| event.setOldValueFromRegion(); |
| new DestroyOperation(event).distribute(); |
| } |
| } |
| |
| if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later |
| event.invokeCallbacks(this,true, false); |
| } |
| } |
| |
| @Override |
| protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) { |
| // Assumed this is called with entry synchrony |
| if (!event.isOriginRemote() |
| && !event.isBulkOpInProgress() |
| && !event.getOperation().isLocal() |
| && !Operation.EVICT_DESTROY.equals(event.getOperation()) |
| && !(event.isExpiration() && isEntryEvictDestroyEnabled())) { |
| |
| if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) { |
| VersionTag v = entry.generateVersionTag(null, false, this, event); |
| if (logger.isDebugEnabled() && v != null) { |
| logger.debug("generated version tag {} in region {}", v, this.getName()); |
| } |
| } |
| |
| // This code assumes it is safe to ignore token mode (GII in progress) |
| // because it assumes that when the origin of the event is local, |
| // the GII has completed (the region has been completely initialized) |
| |
| // This code assumes that this bucket is primary |
| new DestroyOperation(event).distribute(); |
| } |
| super.basicDestroyBeforeRemoval(entry, event); |
| } |
| |
| @Override |
| void distributeDestroy(EntryEventImpl event, Object expectedOldValue) { |
| } |
| |
| |
| // impl removed - not needed for listener invocation alterations |
| // void basicDestroyPart2(RegionEntry re, EntryEventImpl event, boolean inTokenMode, boolean invokeCallbacks) |
| |
| @Override |
| protected void validateArguments(Object key, Object value, Object aCallbackArgument) |
| { |
| Assert.assertTrue(!isTX()); |
| super.validateArguments(key, value, aCallbackArgument); |
| } |
| |
| public void forceSerialized(EntryEventImpl event) { |
| event.makeSerializedNewValue(); |
| // Object obj = event.getRawNewValue(); |
| // if (obj instanceof byte[] |
| // || obj == null |
| // || obj instanceof CachedDeserializable |
| // || obj == NotAvailable.NOT_AVAILABLE |
| // || Token.isInvalidOrRemoved(obj)) { |
| // // already serialized |
| // return; |
| // } |
| // throw new InternalGemFireError("event did not force serialized: " + event); |
| } |
| |
| /** |
| * This method is called when a miss from a get ends up |
| * finding an object through a cache loader or from a server. |
| * In that case we want to make sure that we don't move |
| * this bucket while putting the value in the cache. |
| * @see LocalRegion#basicPutEntry(EntryEventImpl, long) |
| */ |
| @Override |
| protected RegionEntry basicPutEntry(final EntryEventImpl event, |
| final long lastModified) throws TimeoutException, |
| CacheWriterException { |
| beginLocalWrite(event); |
| try { |
| event.setInvokePRCallbacks(true); |
| forceSerialized(event); |
| return super.basicPutEntry(event, lastModified); |
| } finally { |
| endLocalWrite(event); |
| } |
| } |
| |
| @Override |
| void basicUpdateEntryVersion(EntryEventImpl event) |
| throws EntryNotFoundException { |
| |
| Assert.assertTrue(!isTX()); |
| Assert.assertTrue(event.getOperation().isDistributed()); |
| |
| beginLocalWrite(event); |
| try { |
| |
| if (!hasSeenEvent(event)) { |
| this.entries.updateEntryVersion(event); |
| } else { |
| if (logger.isTraceEnabled(LogMarker.DM)) { |
| logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache has already seen this event {}", event); |
| } |
| } |
| if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) { |
| // This cache has processed the event, forward operation |
| // and event messages to backup buckets |
| new UpdateEntryVersionOperation(event).distribute(); |
| } |
| return; |
| } finally { |
| endLocalWrite(event); |
| } |
| } |
| |
| public int getRedundancyLevel() |
| { |
| return this.redundancy; |
| } |
| |
| public boolean isPrimary() { |
| throw new UnsupportedOperationException(LocalizedStrings.BucketRegion_THIS_SHOULD_NEVER_BE_CALLED_ON_0.toLocalizedString(getClass())); |
| } |
| |
| @Override |
| public boolean isDestroyed() { |
| //TODO prpersist - Added this null check for the partitioned region |
| // because we create the disk store for a bucket *before* the constructor |
| // for local region completes, which is before this final field is assigned. This is why |
| // we shouldn't do so much work in the constructors! This is a temporary |
| // hack until I move most of the constructor code to region.initialize. |
| return isBucketDestroyed() |
| || (this.partitionedRegion != null |
| && this.partitionedRegion.isLocallyDestroyed && !isInDestroyingThread()); |
| } |
| |
| /** |
| * Return true if this bucket has been destroyed. |
| * Don't bother checking to see if the PR that owns this bucket was destroyed; |
| * that has already been checked. |
| * @since 6.0 |
| */ |
| public boolean isBucketDestroyed() { |
| return super.isDestroyed(); |
| } |
| |
| @Override |
| public boolean isHDFSRegion() { |
| return this.partitionedRegion.isHDFSRegion(); |
| } |
| |
| @Override |
| public boolean isHDFSReadWriteRegion() { |
| return this.partitionedRegion.isHDFSReadWriteRegion(); |
| } |
| |
| @Override |
| protected boolean isHDFSWriteOnly() { |
| return this.partitionedRegion.isHDFSWriteOnly(); |
| } |
| |
| @Override |
| public int sizeEstimate() { |
| if (isHDFSReadWriteRegion()) { |
| try { |
| checkForPrimary(); |
| ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); |
| if (q == null) return 0; |
| int hdfsBucketRegionSize = q.getBucketRegionQueue( |
| partitionedRegion, getId()).size(); |
| int hoplogEstimate = (int) getHoplogOrganizer().sizeEstimate(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("for bucket " + getName() + " estimateSize returning " |
| + (hdfsBucketRegionSize + hoplogEstimate)); |
| } |
| return hdfsBucketRegionSize + hoplogEstimate; |
| } catch (ForceReattemptException e) { |
| throw new PrimaryBucketException(e.getLocalizedMessage(), e); |
| } |
| } |
| return size(); |
| } |
| |
| @Override |
| public void checkReadiness() |
| { |
| super.checkReadiness(); |
| if (isDestroyed()) { |
| throw new RegionDestroyedException(toString(), getFullPath()); |
| } |
| } |
| |
| @Override |
| public PartitionedRegion getPartitionedRegion(){ |
| return this.partitionedRegion; |
| } |
| |
| /** |
| * Is the current thread involved in destroying the PR that |
| * owns this region? |
| */ |
| private final boolean isInDestroyingThread() { |
| return this.partitionedRegion.locallyDestroyingThread |
| == Thread.currentThread(); |
| } |
| // public int getSerialNumber() { |
| // String s = "This should never be called on " + getClass(); |
| // throw new UnsupportedOperationException(s); |
| // } |
| |
| @Override |
| public void fillInProfile(Profile profile) { |
| super.fillInProfile(profile); |
| BucketProfile bp = (BucketProfile) profile; |
| bp.isInitializing = this.initializationLatchAfterGetInitialImage.getCount() > 0; |
| } |
| |
| /** check to see if the partitioned region is locally destroyed or closed */ |
| public boolean isPartitionedRegionOpen() { |
| return !this.partitionedRegion.isLocallyDestroyed && |
| !this.partitionedRegion.isClosed && !this.partitionedRegion.isDestroyed(); |
| } |
| |
| /** |
| * Horribly plagiarized from the similar method in LocalRegion |
| * |
| * @param key |
| * @param updateStats |
| * @param clientEvent holder for client version tag |
| * @param returnTombstones whether Token.TOMBSTONE should be returned for destroyed entries |
| * @return the serialized form if present; NULLVALUE if the entry is not in the |
| * cache; or a RawValue wrapping INVALID or LOCAL_INVALID if the entry is a miss (invalid) |
| * @throws IOException |
| * if there is a serialization problem |
| * @see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean, boolean, EntryEventImpl, boolean, boolean, boolean) |
| */ |
| private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) |
| throws EntryNotFoundException, IOException { |
| RegionEntry re = null; |
| if (allowReadFromHDFS) { |
| re = this.entries.getEntry(key); |
| } else { |
| re = this.entries.getOperationalEntryInVM(key); |
| } |
| if (re == null) { |
| return NULLVALUE; |
| } |
| if (re.isTombstone() && !returnTombstones) { |
| return NULLVALUE; |
| } |
| Object v = null; |
| |
| try { |
| v = re.getValue(this); // TODO OFFHEAP: v ends up in a RawValue. For now this can be a copy of the offheap value onto the heap. But it might be easy to track the lifetime of RawValue. |
| if(doNotLockEntry) { |
| if(v == Token.NOT_AVAILABLE || v == null) { |
| return REQUIRES_ENTRY_LOCK; |
| } |
| } |
| if (clientEvent != null) { |
| VersionStamp stamp = re.getVersionStamp(); |
| if (stamp != null) { |
| clientEvent.setVersionTag(stamp.asVersionTag()); |
| } |
| } |
| } catch (DiskAccessException dae) { |
| this.handleDiskAccessException(dae); |
| throw dae; |
| } |
| |
| if (v == null) { |
| return NULLVALUE; |
| } else { |
| if (updateStats) { |
| updateStatsForGet(re, true); |
| } |
| return new RawValue(v); |
| } |
| } |
| |
| /** |
| * Return serialized form of an entry |
| * <p> |
| * Horribly plagiarized from the similar method in LocalRegion |
| * |
| * @param keyInfo |
| * @param generateCallbacks |
| * @param clientEvent holder for the entry's version information |
| * @param returnTombstones TODO |
| * @return serialized (byte) form |
| * @throws IOException if the result is not serializable |
| * @see LocalRegion#get(Object, Object, boolean, EntryEventImpl) |
| */ |
| public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException { |
| checkReadiness(); |
| checkForNoAccess(); |
| CachePerfStats stats = getCachePerfStats(); |
| long start = stats.startGet(); |
| |
| boolean miss = true; |
| try { |
| RawValue valueBytes = NULLVALUE; |
| boolean isCreate = false; |
| RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS); |
| isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones); |
| miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue())); |
| if (miss) { |
| // if scope is local and there is no loader, then |
| // don't go further to try and get value |
| if (hasServerProxy() || |
| basicGetLoader() != null) { |
| if(doNotLockEntry) { |
| return REQUIRES_ENTRY_LOCK; |
| } |
| // TODO OFFHEAP: optimize |
| Object value = nonTxnFindObject(keyInfo, isCreate, |
| generateCallbacks, result.getRawValue(), true, true, clientEvent, false, allowReadFromHDFS); |
| if (value != null) { |
| result = new RawValue(value); |
| } |
| } |
| else { // local scope with no loader, still might need to update stats |
| if (isCreate) { |
| recordMiss(null, keyInfo.getKey()); |
| } |
| } |
| } |
| return result; // changed in 7.0 to return RawValue(Token.INVALID) if the entry is invalid |
| } |
| finally { |
| stats.endGet(start, miss); |
| } |
| |
| } // getSerialized |
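| |
| // Sketch of the doNotLockEntry contract for a hypothetical caller: a fast |
| // path that refuses to block on an entry must check for the |
| // REQUIRES_ENTRY_LOCK sentinel and fall back to a blocking retry: |
| // |
| // RawValue rv = br.getSerialized(keyInfo, true, true /*doNotLockEntry*/, |
| // null, false, true); |
| // if (rv == BucketRegion.REQUIRES_ENTRY_LOCK) { |
| // rv = br.getSerialized(keyInfo, true, false, null, false, true); |
| // } |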
| |
| @Override |
| public String toString() |
| { |
| return new StringBuilder() |
| .append("BucketRegion") |
| .append("[path='").append(getFullPath()) |
| .append(";serial=").append(getSerialNumber()) |
| .append(";primary=").append(getBucketAdvisor().getProxyBucketRegion().isPrimary()) |
| .append(";indexUpdater=").append(getIndexUpdater()) |
| .append("]") |
| .toString(); |
| } |
| |
| @Override |
| protected void distributedRegionCleanup(RegionEventImpl event) |
| { |
| // No need to close the advisor; assume it's already closed. |
| // However we need to remove our listener from the advisor (see bug 43950). |
| this.distAdvisor.removeMembershipListener(this.advisorListener); |
| } |
| |
| /** |
| * Tell the peers that this VM has destroyed the region. |
| * |
| * Also marks the local disk files to be deleted before |
| * sending the message to peers. |
| * |
| * @param rebalance true if this is due to a rebalance removing the bucket |
| */ |
| public void removeFromPeersAdvisors(boolean rebalance) { |
| if(getPersistenceAdvisor() != null) { |
| getPersistenceAdvisor().releaseTieLock(); |
| } |
| |
| DiskRegion diskRegion = getDiskRegion(); |
| |
| //Tell our peers whether we are destroying this region |
| //or just closing it. |
| boolean shouldDestroy = rebalance || diskRegion == null |
| || !diskRegion.isRecreated(); |
| Operation op = shouldDestroy ? Operation.REGION_LOCAL_DESTROY |
| : Operation.REGION_CLOSE; |
| |
| RegionEventImpl event = new RegionEventImpl(this, op, null, false, |
| getMyId(), generateEventID()/* generate EventID */); |
| // When destroying the whole partitioned region, there's no need to |
| // distribute the region closure/destruction, the PR RegionAdvisor.close() |
| // has taken care of it |
| if (isPartitionedRegionOpen()) { |
| |
| |
| //Only delete the files on the local disk if |
| //this is a rebalance, or we are creating the bucket |
| //for the first time |
| if (diskRegion != null && shouldDestroy) { |
| diskRegion.beginDestroyDataStorage(); |
| } |
| |
| //Send out the destroy op to peers |
| new DestroyRegionOperation(event, true).distribute(); |
| } |
| } |
| |
| @Override |
| protected void distributeDestroyRegion(RegionEventImpl event, |
| boolean notifyOfRegionDeparture) { |
| //No need to do this when we actually destroy the region, |
| //we already distributed this info. |
| } |
| |
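| /** |
| * Creates a copy of the given bucket event, retargeted at the enclosing |
| * partitioned region, for delivering PR-level callbacks. On success the |
| * caller is responsible for releasing the returned event. |
| */ |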
| EntryEventImpl createEventForPR(EntryEventImpl sourceEvent) { |
| EntryEventImpl e2 = new EntryEventImpl(sourceEvent); |
| boolean returned = false; |
| try { |
| e2.setRegion(this.partitionedRegion); |
| if (FORCE_LOCAL_LISTENERS_INVOCATION) { |
| e2.setInvokePRCallbacks(true); |
| } |
| else { |
| e2.setInvokePRCallbacks(sourceEvent.getInvokePRCallbacks()); |
| } |
| DistributedMember dm = this.getDistributionManager().getDistributionManagerId(); |
| e2.setOriginRemote(!e2.getDistributedMember().equals(dm)); |
| returned = true; |
| return e2; |
| } finally { |
| if (!returned) { |
| e2.release(); |
| } |
| } |
| } |
| |
| |
| |
| @Override |
| public void invokeTXCallbacks( |
| final EnumListenerEvent eventType, final EntryEventImpl event, |
| final boolean callDispatchListenerEvent) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BR.invokeTXCallbacks for event {}", event); |
| } |
| // bucket events may make it to this point even though the bucket is still |
| // initializing. We can't block while initializing or a GII state flush |
| // may hang, so we avoid notifying the bucket |
| if (this.isInitialized()) { |
| boolean callThem = callDispatchListenerEvent; |
| if (event.isPossibleDuplicate() |
| && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { |
| callThem = false; |
| } |
| super.invokeTXCallbacks(eventType, event, callThem); |
| } |
| final EntryEventImpl prevent = createEventForPR(event); |
| try { |
| this.partitionedRegion.invokeTXCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() && callDispatchListenerEvent); |
| } finally { |
| prevent.release(); |
| } |
| } |
| |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokeDestroyCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean) |
| */ |
| @Override |
| public void invokeDestroyCallbacks( |
| final EnumListenerEvent eventType, final EntryEventImpl event, |
| final boolean callDispatchListenerEvent, boolean notifyGateways) |
| { |
| // bucket events may make it to this point even though the bucket is still |
| // initializing. We can't block while initializing or a GII state flush |
| // may hang, so we avoid notifying the bucket |
| if (this.isInitialized()) { |
| boolean callThem = callDispatchListenerEvent; |
| if (event.isPossibleDuplicate() |
| && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { |
| callThem = false; |
| } |
| super.invokeDestroyCallbacks(eventType, event, callThem, notifyGateways); |
| } |
| final EntryEventImpl prevent = createEventForPR(event); |
| try { |
| this.partitionedRegion.invokeDestroyCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() && callDispatchListenerEvent, false); |
| } finally { |
| prevent.release(); |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokeInvalidateCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean) |
| */ |
| @Override |
| public void invokeInvalidateCallbacks( |
| final EnumListenerEvent eventType, final EntryEventImpl event, |
| final boolean callDispatchListenerEvent) |
| { |
| // bucket events may make it to this point even though the bucket is still |
| // initializing. We can't block while initializing or a GII state flush |
| // may hang, so we avoid notifying the bucket |
| if (this.isInitialized()) { |
| boolean callThem = callDispatchListenerEvent; |
| if (event.isPossibleDuplicate() |
| && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { |
| callThem = false; |
| } |
| super.invokeInvalidateCallbacks(eventType, event, callThem); |
| } |
| final EntryEventImpl prevent = createEventForPR(event); |
| try { |
| this.partitionedRegion.invokeInvalidateCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() && callDispatchListenerEvent); |
| } finally { |
| prevent.release(); |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokePutCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean) |
| */ |
| @Override |
| public void invokePutCallbacks( |
| final EnumListenerEvent eventType, final EntryEventImpl event, |
| final boolean callDispatchListenerEvent, boolean notifyGateways) |
| { |
| if (logger.isTraceEnabled()) { |
| logger.trace("invoking put callbacks on bucket for event {}", event); |
| } |
| // bucket events may make it to this point even though the bucket is still |
| // initializing. We can't block while initializing or a GII state flush |
| // may hang, so we avoid notifying the bucket |
| if (this.isInitialized()) { |
| boolean callThem = callDispatchListenerEvent; |
| if (callThem && event.isPossibleDuplicate() |
| && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { |
| callThem = false; |
| } |
| super.invokePutCallbacks(eventType, event, callThem, notifyGateways); |
| } |
| |
| final EntryEventImpl prevent = createEventForPR(event); |
| try { |
| this.partitionedRegion.invokePutCallbacks(eventType, prevent, |
| this.partitionedRegion.isInitialized() && callDispatchListenerEvent, false); |
| } finally { |
| prevent.release(); |
| } |
| } |
| |
| /** |
| * Performs adjunct messaging for the given operation and returns the set of |
| * members that should be attached to the operation's reply processor (if any). |
| * @param event the event causing this messaging |
| * @param cacheOpRecipients the set of recipients that received the cache update operation |
| * @param adjunctRecipients recipients that must unconditionally get the event |
| * @param filterRoutingInfo routing information for all members having the region |
| * @param processor the reply processor, or null if there isn't one |
| * @return the set of failed recipients |
| */ |
| protected Set performAdjunctMessaging(EntryEventImpl event, |
| Set cacheOpRecipients, Set adjunctRecipients, |
| FilterRoutingInfo filterRoutingInfo, |
| DirectReplyProcessor processor, |
| boolean calculateDelta, |
| boolean sendDeltaWithFullValue) { |
| |
| Set failures = Collections.EMPTY_SET; |
| PartitionMessage msg = event.getPartitionMessage(); |
| if (calculateDelta) { |
| setDeltaIfNeeded(event); |
| } |
| if (msg != null) { |
| // The primary bucket member is being modified remotely by a GemFire |
| // thread via a received PartitionMessage. |
| // Asif: Some of the adjunct recipients are members that host a |
| // sqlFabricHub and need the old value along with the new one. |
| msg = msg.getMessageForRelayToListeners(event, adjunctRecipients); |
| msg.setSender(this.partitionedRegion.getDistributionManager() |
| .getDistributionManagerId()); |
| msg.setSendDeltaWithFullValue(sendDeltaWithFullValue); |
| |
| failures = msg.relayToListeners(cacheOpRecipients, adjunctRecipients, |
| filterRoutingInfo, event, this.partitionedRegion, processor); |
| } |
| else { |
| // The primary bucket is being modified locally by an application thread |
| Operation op = event.getOperation(); |
| if (op.isCreate() || op.isUpdate()) { |
| // note that at this point ifNew/ifOld have been used to update the |
| // local store, and the event operation should be correct |
| failures = PutMessage.notifyListeners(cacheOpRecipients, |
| adjunctRecipients, filterRoutingInfo, this.partitionedRegion, |
| event, op.isCreate(), !op.isCreate(), processor, |
| sendDeltaWithFullValue); |
| } |
| else if (op.isDestroy()) { |
| failures = DestroyMessage.notifyListeners(cacheOpRecipients, |
| adjunctRecipients, filterRoutingInfo, |
| this.partitionedRegion, event, processor); |
| } |
| else if (op.isInvalidate()) { |
| failures = InvalidateMessage.notifyListeners(cacheOpRecipients, |
| adjunctRecipients, filterRoutingInfo, |
| this.partitionedRegion, event, processor); |
| } |
| else { |
| failures = adjunctRecipients; |
| } |
| } |
| return failures; |
| } |
| |
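| /** |
| * If delta propagation is enabled and this update's new value is a |
| * serialized Delta with changes pending, serializes the delta into the |
| * event so that peers can be sent just the delta bytes. |
| */ |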
| private void setDeltaIfNeeded(EntryEventImpl event) { |
| if (this.partitionedRegion.getSystem().getConfig().getDeltaPropagation() |
| && event.getOperation().isUpdate() && event.getDeltaBytes() == null) { |
| @Unretained Object rawNewValue = event.getRawNewValue(); |
| if (!(rawNewValue instanceof CachedDeserializable)) { |
| return; |
| } |
| if (rawNewValue instanceof StoredObject && !((StoredObject) rawNewValue).isSerialized()) { |
| // it is a byte[]; not a Delta |
| return; |
| } |
| Object instance = ((CachedDeserializable)rawNewValue).getValue(); |
| if (instance instanceof com.gemstone.gemfire.Delta |
| && ((com.gemstone.gemfire.Delta)instance).hasDelta()) { |
| try { |
| HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); |
| long start = DistributionStats.getStatTime(); |
| ((com.gemstone.gemfire.Delta)instance).toDelta(hdos); |
| event.setDeltaBytes(hdos.toByteArray()); |
| this.partitionedRegion.getCachePerfStats().endDeltaPrepared(start); |
| } |
| catch (RuntimeException re) { |
| throw re; |
| } |
| catch (Exception e) { |
| throw new DeltaSerializationException( |
| LocalizedStrings.DistributionManager_CAUGHT_EXCEPTION_WHILE_SENDING_DELTA |
| .toLocalizedString(), e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * create a PutAllPRMessage for notify-only and send it to all adjunct nodes. |
| * return a set of members that should be attached to the operation's reply processor (if any) |
| * @param dpao DistributedPutAllOperation object for PutAllMessage |
| * @param cacheOpRecipients the set of recipients that received the cache update operation |
| * @param adjunctRecipients recipients that must unconditionally get the event |
| * @param filterRoutingInfo routing information for all members having the region |
| * @param processor the reply processor, or null if there isn't one |
| * @return the set of failed recipients |
| */ |
| public Set performPutAllAdjunctMessaging(DistributedPutAllOperation dpao, |
| Set cacheOpRecipients, Set adjunctRecipients, FilterRoutingInfo filterRoutingInfo, |
| DirectReplyProcessor processor) { |
| // create a PutAllPRMessage out of PutAllMessage to send to adjunct nodes |
| PutAllPRMessage prMsg = dpao.createPRMessagesNotifyOnly(getId()); |
| prMsg.initMessage(this.partitionedRegion, adjunctRecipients, true, processor); |
| prMsg.setSender(this.partitionedRegion.getDistributionManager() |
| .getDistributionManagerId()); |
| |
| // find members who have clients subscribed to this event and add them |
| // to the recipients list. Also determine if there are any FilterInfo |
| // routing tables for any of the receivers |
| // boolean anyWithRouting = false; |
| Set recipients = null; |
| Set membersWithRouting = filterRoutingInfo.getMembers(); |
| for (Iterator it=membersWithRouting.iterator(); it.hasNext(); ) { |
| Object mbr = it.next(); |
| if (!cacheOpRecipients.contains(mbr)) { |
| // anyWithRouting = true; |
| if (!adjunctRecipients.contains(mbr)) { |
| if (recipients == null) { |
| recipients = new HashSet(); |
| } |
| recipients.add(mbr); |
| } |
| } |
| } |
| if (recipients == null) { |
| recipients = adjunctRecipients; |
| } else { |
| recipients.addAll(adjunctRecipients); |
| } |
| |
| // Set failures = Collections.EMPTY_SET; |
| |
| // if (!anyWithRouting) { |
| Set failures = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg); |
| |
| // } else { |
| // // Send message to each member. We set a FilterRoutingInfo serialization |
| // // target so that serialization of the PutAllData objects held in the |
| // // message will only serialize the routing entry for the message recipient |
| // Iterator rIter = recipients.iterator(); |
| // failures = new HashSet(); |
| // while (rIter.hasNext()){ |
| // InternalDistributedMember member = (InternalDistributedMember)rIter.next(); |
| // FilterRoutingInfo.setSerializationTarget(member); |
| // try { |
| // prMsg.resetRecipients(); |
| // prMsg.setRecipient(member); |
| // Set fs = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg); |
| // if (fs != null && !fs.isEmpty()) { |
| // failures.addAll(fs); |
| // } |
| // } finally { |
| // FilterRoutingInfo.clearSerializationTarget(); |
| // } |
| // } |
| // } |
| |
| return failures; |
| } |
| |
| /** |
| * create a RemoveAllPRMessage for notify-only and send it to all adjunct nodes. |
| * return a set of members that should be attached to the operation's reply processor (if any) |
| * @param op DistributedRemoveAllOperation object for RemoveAllMessage |
| * @param cacheOpRecipients the set of recipients that received the cache update operation |
| * @param adjunctRecipients recipients that must unconditionally get the event |
| * @param filterRoutingInfo routing information for all members having the region |
| * @param processor the reply processor, or null if there isn't one |
| * @return the set of failed recipients |
| */ |
| public Set performRemoveAllAdjunctMessaging(DistributedRemoveAllOperation op, |
| Set cacheOpRecipients, Set adjunctRecipients, FilterRoutingInfo filterRoutingInfo, |
| DirectReplyProcessor processor) { |
| // create a RemoveAllPRMessage out of RemoveAllMessage to send to adjunct nodes |
| RemoveAllPRMessage prMsg = op.createPRMessagesNotifyOnly(getId()); |
| prMsg.initMessage(this.partitionedRegion, adjunctRecipients, true, processor); |
| prMsg.setSender(this.partitionedRegion.getDistributionManager() |
| .getDistributionManagerId()); |
| |
| // find members who have clients subscribed to this event and add them |
| // to the recipients list. Also determine if there are any FilterInfo |
| // routing tables for any of the receivers |
| Set recipients = null; |
| Set membersWithRouting = filterRoutingInfo.getMembers(); |
| for (Iterator it=membersWithRouting.iterator(); it.hasNext(); ) { |
| Object mbr = it.next(); |
| if (!cacheOpRecipients.contains(mbr)) { |
| // anyWithRouting = true; |
| if (!adjunctRecipients.contains(mbr)) { |
| if (recipients == null) { |
| recipients = new HashSet(); |
| } |
| recipients.add(mbr); |
| } |
| } |
| } |
| if (recipients == null) { |
| recipients = adjunctRecipients; |
| } else { |
| recipients.addAll(adjunctRecipients); |
| } |
| |
| Set failures = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg); |
| return failures; |
| } |
| |
| /** |
| * return the set of recipients for adjunct operations |
| */ |
| protected Set getAdjunctReceivers(EntryEventImpl event, Set cacheOpReceivers, |
| Set twoMessages, FilterRoutingInfo routing) { |
| Operation op = event.getOperation(); |
| if (op.isUpdate() || op.isCreate() || op.isDestroy() || op.isInvalidate()) { |
| // this method can safely assume that the operation is being distributed from |
| // the primary bucket holder to other nodes |
| Set r = this.partitionedRegion.getRegionAdvisor() |
| .adviseRequiresNotification(event); |
| |
| if (r.size() > 0) { |
| r.removeAll(cacheOpReceivers); |
| } |
| |
| // buckets that are initializing may transition out of token mode during |
| // message transmission and need both cache-op and adjunct messages to |
| // ensure that listeners are invoked |
| if (twoMessages.size() > 0) { |
| if (r.size() == 0) { // can't add to Collections.EMPTY_SET |
| r = twoMessages; |
| } |
| else { |
| r.addAll(twoMessages); |
| } |
| } |
| if (routing != null) { |
| // add adjunct messages to members with client routings |
| for (InternalDistributedMember id: routing.getMembers()) { |
| if (!cacheOpReceivers.contains(id)) { |
| if (r.isEmpty()) { |
| r = new HashSet(); |
| } |
| r.add(id); |
| } |
| } |
| } |
| return r; |
| } |
| else { |
| return Collections.EMPTY_SET; |
| } |
| } |
| |
| public final int getId() { |
| return getBucketAdvisor().getProxyBucketRegion().getId(); |
| } |
| |
| @Override |
| protected void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients, |
| CacheWriter localWriter, |
| boolean requireOldValue, Object expectedOldValue) |
| throws CacheWriterException, TimeoutException { |
| |
| boolean origRemoteState = false; |
| try { |
| if (event.getPartitionMessage() != null || event.hasClientOrigin()) { |
| origRemoteState=event.isOriginRemote(); |
| event.setOriginRemote(true); |
| } |
| event.setRegion(this.partitionedRegion); |
| this.partitionedRegion.cacheWriteBeforePut(event, netWriteRecipients, |
| localWriter, requireOldValue, expectedOldValue); |
| } finally { |
| if (event.getPartitionMessage() != null || event.hasClientOrigin()) { |
| event.setOriginRemote(origRemoteState); |
| } |
| event.setRegion(this); |
| } |
| } |
| |
| @Override |
| boolean cacheWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue) |
| throws CacheWriterException, EntryNotFoundException, TimeoutException { |
| |
| boolean origRemoteState = false; |
| boolean ret = false; |
| try { |
| if (event.getPartitionMessage() != null || event.hasClientOrigin()) { |
| origRemoteState=event.isOriginRemote(); |
| event.setOriginRemote(true); |
| } |
| event.setRegion(this.partitionedRegion); |
| ret = this.partitionedRegion.cacheWriteBeforeDestroy(event, expectedOldValue); |
| } finally { |
| if (event.getPartitionMessage() != null || event.hasClientOrigin()) { |
| event.setOriginRemote(origRemoteState); |
| } |
| event.setRegion(this); |
| } |
| return ret; |
| // return super.cacheWriteBeforeDestroy(event); |
| } |
| |
| @Override |
| public CacheWriter basicGetWriter() { |
| return this.partitionedRegion.basicGetWriter(); |
| } |
| @Override |
| void cleanUpOnIncompleteOp(EntryEventImpl event, RegionEntry re, |
| boolean eventRecorded, boolean updateStats, boolean isReplace) { |
| |
| |
| if(!eventRecorded || isReplace) { |
| //No indexes updated so safe to remove. |
| this.entries.removeEntry(event.getKey(), re, updateStats) ; |
| }/*else { |
| // If eventRecorded is true, then as far as the event tracker is |
| // concerned the entry is in the system, and as far as sqlfabric is |
| // concerned the indexes have been updated. What has not been done is |
| // basicPutPart2 (distribution etc.), so we do nothing: the PR's |
| // re-attempt will do the required basicPutPart2. If we removed the |
| // entry here, the event tracker would not allow re-insertion. So either |
| // we do nothing, or if we remove, then we also have to update the |
| // sqlfabric indexes and undo the recording of the event. |
| // TODO: OQL indexes? Hopefully they get updated during retry. The issue |
| // is that OQL indexes get updated after distribution, so it is entirely |
| // possible that they are not updated. What if the retry fails? |
| }*/ |
| } |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.internal.cache.partitioned.Bucket#getBucketOwners() |
| * @since gemfire59poc |
| */ |
| public Set getBucketOwners() { |
| return getBucketAdvisor().getProxyBucketRegion().getBucketOwners(); |
| } |
| |
| public long getCounter() { |
| return counter.get(); |
| } |
| |
| public void setCounter(AtomicLong counter) { |
| this.counter = counter; |
| } |
| |
| public void updateCounter(long delta) { |
| if (delta != 0) { |
| this.counter.getAndAdd(delta); |
| } |
| } |
| |
| public void resetCounter() { |
| if (this.counter.get() != 0) { |
| this.counter.set(0); |
| } |
| } |
| |
| public long getLimit() { |
| if (this.limit == null) { |
| return 0; |
| } |
| return limit.get(); |
| } |
| |
| public void setLimit(long limit) { |
| // This method can be called before this object is fully initialized |
| if (this.limit == null) { |
| this.limit = new AtomicLong(); |
| } |
| this.limit.set(limit); |
| } |
| |
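| /** |
| * Estimates the in-memory size, in bytes, of a stored value. Gateway |
| * sender events report their serialized value size, nulls and tokens |
| * count as zero, and other supported types are sized through |
| * CachedDeserializableFactory. |
| */ |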
| static int calcMemSize(Object value) { |
| if (value != null && (value instanceof GatewaySenderEventImpl)) { |
| return ((GatewaySenderEventImpl)value).getSerializedValueSize(); |
| } |
| if (value == null || value instanceof Token) { |
| return 0; |
| } |
| if (!(value instanceof byte[]) && !(value instanceof CachedDeserializable) |
| && !(value instanceof com.gemstone.gemfire.Delta) && !(value instanceof Delta)) { |
| // ezoerner:20090401 it's possible this value is a Delta |
| throw new InternalGemFireError("DEBUG: calcMemSize: weird value (class " |
| + value.getClass() + "): " + value); |
| } |
| |
| try { |
| return CachedDeserializableFactory.calcMemSize(value); |
| } catch (IllegalArgumentException e) { |
| return 0; |
| } |
| } |
| |
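| /** true while this bucket's disk storage is being destroyed; checked in updateSizeOnClearRegion */ |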
| boolean isDestroyingDiskRegion; |
| |
| @Override |
| protected void updateSizeOnClearRegion(int sizeBeforeClear) { |
| // This method is only called when the bucket is destroyed. If we |
| // start supporting clear of partitioned regions, this logic needs to change. |
| // We can't just set these counters to zero, because there could be |
| // concurrent operations that are also updating these stats. For example, |
| // a destroy could already have been applied to the map and then update |
| // the stat after we reset it, making the stat negative. |
| |
| final PartitionedRegionDataStore prDs = this.partitionedRegion.getDataStore(); |
| long oldMemValue; |
| |
| if(this.isDestroyed || this.isDestroyingDiskRegion) { |
| //If this region is destroyed, mark the stat as destroyed. |
| oldMemValue = this.bytesInMemory.getAndSet(BUCKET_DESTROYED); |
| |
| } else if(!this.isInitialized()) { |
| //This case is rather special. We clear the region if the GII failed. |
| //In the case of bucket regions, we know that there will be no concurrent |
| //operations if GII has failed, because there is no primary. So it's safe |
| //to set these counters to 0. |
| oldMemValue = this.bytesInMemory.getAndSet(0); |
| } |
| // Gemfire PRs don't support clear. allowing it via a hack for tests |
| else if (LocalRegion.simulateClearForTests) { |
| oldMemValue = this.bytesInMemory.getAndSet(0); |
| } |
| else { |
| throw new InternalGemFireError("Trying to clear a bucket region that was not destroyed or in initialization."); |
| } |
| if(oldMemValue != BUCKET_DESTROYED) { |
| this.partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear); |
| prDs.updateMemoryStats(-oldMemValue); |
| } |
| } |
| |
| @Override |
| public int calculateValueSize(Object val) { |
| // Only needed by BucketRegion |
| return calcMemSize(val); |
| } |
| @Override |
| public int calculateRegionEntryValueSize(RegionEntry re) { |
| return calcMemSize(re._getValue()); // OFFHEAP _getValue ok |
| } |
| |
| @Override |
| void updateSizeOnPut(Object key, int oldSize, int newSize) { |
| updateBucket2Size(oldSize, newSize, SizeOp.UPDATE); |
| } |
| |
| @Override |
| void updateSizeOnCreate(Object key, int newSize) { |
| this.partitionedRegion.getPrStats().incDataStoreEntryCount(1); |
| updateBucket2Size(0, newSize, SizeOp.CREATE); |
| } |
| |
| @Override |
| void updateSizeOnRemove(Object key, int oldSize) { |
| this.partitionedRegion.getPrStats().incDataStoreEntryCount(-1); |
| updateBucket2Size(oldSize, 0, SizeOp.DESTROY); |
| } |
| |
| @Override |
| int updateSizeOnEvict(Object key, int oldSize) { |
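| // on eviction the entry's full in-memory size becomes its on-disk size |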
| int newDiskSize = oldSize; |
| updateBucket2Size(oldSize, newDiskSize, SizeOp.EVICT); |
| return newDiskSize; |
| } |
| |
| @Override |
| public void updateSizeOnFaultIn(Object key, int newMemSize, int oldDiskSize) { |
| updateBucket2Size(oldDiskSize, newMemSize, SizeOp.FAULT_IN); |
| } |
| |
| @Override |
| public void initializeStats(long numEntriesInVM, long numOverflowOnDisk, |
| long numOverflowBytesOnDisk) { |
| super.initializeStats(numEntriesInVM, numOverflowOnDisk, numOverflowBytesOnDisk); |
| incNumEntriesInVM(numEntriesInVM); |
| incNumOverflowOnDisk(numOverflowOnDisk); |
| incNumOverflowBytesOnDisk(numOverflowBytesOnDisk); |
| } |
| |
| @Override |
| protected void setMemoryThresholdFlag(MemoryEvent event) { |
| Assert.assertTrue(false); |
| //Bucket regions are not registered with ResourceListener, |
| //and should not get this event |
| } |
| |
| @Override |
| public void initialCriticalMembers(boolean localHeapIsCritical, |
| Set<InternalDistributedMember> criticalMembers) { |
| // The owner Partitioned Region handles critical threshold events |
| } |
| |
| @Override |
| protected void closeCallbacksExceptListener() { |
| //closeCacheCallback(getCacheLoader()); - fix bug 40228 - do NOT close loader |
| closeCacheCallback(getCacheWriter()); |
| closeCacheCallback(getEvictionController()); |
| } |
| |
| public long getTotalBytes() { |
| long result = this.bytesInMemory.get(); |
| if(result == BUCKET_DESTROYED) { |
| return 0; |
| } |
| result += getNumOverflowBytesOnDisk(); |
| return result; |
| } |
| |
| public void preDestroyBucket(int bucketId) { |
| final IndexUpdater indexUpdater = getIndexUpdater(); |
| if (indexUpdater != null) { |
| indexUpdater.clearIndexes(this, bucketId); |
| } |
| } |
| @Override |
| public void cleanupFailedInitialization() |
| { |
| this.preDestroyBucket(this.getId()); |
| super.cleanupFailedInitialization(); |
| } |
| |
| protected void invokePartitionListenerAfterBucketRemoved() { |
| PartitionListener[] partitionListeners = getPartitionedRegion().getPartitionListeners(); |
| if (partitionListeners == null || partitionListeners.length == 0) { |
| return; |
| } |
| for (int i = 0; i < partitionListeners.length; i++) { |
| PartitionListener listener = partitionListeners[i]; |
| if (listener != null) { |
| listener.afterBucketRemoved(getId(), keySet()); |
| } |
| } |
| } |
| |
| protected void invokePartitionListenerAfterBucketCreated() { |
| PartitionListener[] partitionListeners = getPartitionedRegion().getPartitionListeners(); |
| if (partitionListeners == null || partitionListeners.length == 0) { |
| return; |
| } |
| for (int i = 0; i < partitionListeners.length; i++) { |
| PartitionListener listener = partitionListeners[i]; |
| if (listener != null) { |
| listener.afterBucketCreated(getId(), keySet()); |
| } |
| } |
| } |
| |
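| /** |
| * The entry-size transitions tracked by this bucket. CREATE and FAULT_IN |
| * add the new size, DESTROY and EVICT subtract the old size, and UPDATE |
| * applies the difference between the two. |
| */ |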
| enum SizeOp { |
| UPDATE, CREATE, DESTROY, EVICT, FAULT_IN; |
| |
| int computeMemoryDelta(int oldSize, int newSize) { |
| switch (this) { |
| case CREATE: |
| return newSize; |
| case DESTROY: |
| return - oldSize; |
| case UPDATE: |
| return newSize - oldSize; |
| case EVICT: |
| return - oldSize; |
| case FAULT_IN: |
| return newSize; |
| default: |
| throw new AssertionError("unhandled sizeOp: " + this); |
| } |
| } |
| } |
| |
| /** |
| * Updates this bucket's in-memory size statistics for the given size |
| * transition. For example (illustrative only): an UPDATE from 100 bytes |
| * to 60 bytes applies a delta of -40, while an EVICT of a 100-byte entry |
| * applies a delta of -100. |
| */ |
| void updateBucket2Size(int oldSize, int newSize, |
| SizeOp op) { |
| |
| final int memoryDelta = op.computeMemoryDelta(oldSize, newSize); |
| |
| if (memoryDelta == 0) return; |
| // do the bigger one first to keep the sum > 0 |
| updateBucketMemoryStats(memoryDelta); |
| } |
| |
| void updateBucketMemoryStats(final int memoryDelta) { |
| if (memoryDelta != 0) { |
| |
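| // atomically apply the delta unless the counter has reached the |
| // BUCKET_DESTROYED terminal state, in which case the update is dropped |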
| final long bSize = bytesInMemory.compareAddAndGet(BUCKET_DESTROYED, memoryDelta); |
| if(bSize == BUCKET_DESTROYED) { |
| return; |
| } |
| |
| if (bSize < 0 && getCancelCriterion().cancelInProgress() == null) { |
| throw new InternalGemFireError("Bucket " + this + " size (" + |
| bSize + ") negative after applying delta of " + memoryDelta); |
| } |
| } |
| |
| final PartitionedRegionDataStore prDS = this.partitionedRegion.getDataStore(); |
| prDS.updateMemoryStats(memoryDelta); |
| } |
| |
| /** |
| * Returns the current number of entries whose value has been |
| * overflowed to disk by this bucket. This value will decrease when a |
| * value is faulted in. |
| */ |
| public long getNumOverflowOnDisk() { |
| return this.numOverflowOnDisk.get(); |
| } |
| |
| public long getNumOverflowBytesOnDisk() { |
| return this.numOverflowBytesOnDisk.get(); |
| } |
| |
| /** |
| * Returns the current number of entries whose value resides in the |
| * VM for this bucket. This value will decrease when the entry is overflowed to |
| * disk. |
| */ |
| public long getNumEntriesInVM() { |
| return this.numEntriesInVM.get(); |
| } |
| |
| /** |
| * Increments the current number of entries whose value has been |
| * overflowed to disk by this bucket, by a given amount. |
| */ |
| void incNumOverflowOnDisk(long delta) { |
| this.numOverflowOnDisk.addAndGet(delta); |
| } |
| |
| void incNumOverflowBytesOnDisk(long delta) { |
| if (delta == 0) return; |
| this.numOverflowBytesOnDisk.addAndGet(delta); |
| // The following could be re-enabled at a future time. |
| // It is deadcoded for now to make sure it doesn't break |
| // the last 6.5 regression. |
| // It is possible that numOverflowBytesOnDisk might go negative |
| // for a short period of time if a decrement ever happens before |
| // its corresponding increment. |
| // if (res < 0) { |
| // throw new IllegalStateException("numOverflowBytesOnDisk < 0 " + res); |
| // } |
| } |
| |
| /** |
| * Increments the current number of entries whose value resides in the |
| * VM for this bucket, by a given amount. |
| */ |
| void incNumEntriesInVM(long delta) { |
| this.numEntriesInVM.addAndGet(delta); |
| } |
| |
| public void incEvictions(long delta ) { |
| this.evictions.getAndAdd(delta); |
| } |
| |
| public long getEvictions( ) { |
| return this.evictions.get(); |
| } |
| |
| @Override |
| protected boolean isMemoryThresholdReachedForLoad() { |
| return getBucketAdvisor().getProxyBucketRegion().isBucketSick(); |
| } |
| public int getSizeForEviction() { |
| EvictionAttributes ea = this.getAttributes().getEvictionAttributes(); |
| if (ea == null) |
| return 0; |
| EvictionAlgorithm algo = ea.getAlgorithm(); |
| if (!algo.isLRUHeap()) |
| return 0; |
| EvictionAction action = ea.getAction(); |
| int size = action.isLocalDestroy() ? this.getRegionMap().sizeInVM() : (int)this |
| .getNumEntriesInVM(); |
| return size; |
| } |
| @Override |
| public HashMap getDestroyedSubregionSerialNumbers() { |
| return new HashMap(0); |
| } |
| |
| @Override |
| public FilterProfile getFilterProfile(){ |
| return this.partitionedRegion.getFilterProfile(); |
| } |
| |
| @Override |
| protected void generateLocalFilterRouting(InternalCacheEvent event) { |
| if (event.getLocalFilterInfo() == null) { |
| super.generateLocalFilterRouting(event); |
| } |
| } |
| |
| public void beforeAcquiringPrimaryState() { |
| try { |
| createHoplogOrganizer(); |
| } catch (IOException e) { |
| // 48990: when HDFS was down, gemfirexd should still start normally |
| logger.warn(LocalizedStrings.HOPLOG_NOT_STARTED_YET, e); |
| } catch(Throwable e) { |
| /*MergeGemXDHDFSToGFE changed this code to checkReadiness*/ |
| // SystemFailure.checkThrowable(e); |
| this.checkReadiness(); |
| //49333 - no matter what, we should elect a primary. |
| logger.error(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION, e); |
| } |
| } |
| |
| public HoplogOrganizer<?> createHoplogOrganizer() throws IOException { |
| if (getPartitionedRegion().isHDFSRegion()) { |
| HoplogOrganizer<?> organizer = hoplog.get(); |
| if (organizer != null) { |
| // hoplog was recreated by another thread |
| return organizer; |
| } |
| |
| HoplogOrganizer hdfs = hoplog.getAndSet(getPartitionedRegion().hdfsManager.create(getId())); |
| assert hdfs == null; |
| return hoplog.get(); |
| } else { |
| return null; |
| } |
| } |
| |
| public void afterAcquiringPrimaryState() { |
| |
| } |
| /** |
| * Invoked when a primary bucket is demoted. |
| */ |
| public void beforeReleasingPrimaryLockDuringDemotion() { |
| releaseHoplogOrganizer(); |
| } |
| |
| protected void releaseHoplogOrganizer() { |
| // release resources during a clean transition |
| HoplogOrganizer hdfs = hoplog.getAndSet(null); |
| if (hdfs != null) { |
| getPartitionedRegion().hdfsManager.close(getId()); |
| } |
| } |
| |
| public HoplogOrganizer<?> getHoplogOrganizer() throws HDFSIOException { |
| HoplogOrganizer<?> organizer = hoplog.get(); |
| if (organizer == null) { |
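| // lazily create the organizer, double-checking under the bucket advisor |
| // lock and verifying that this member is still the primary |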
| synchronized (getBucketAdvisor()) { |
| checkForPrimary(); |
| try { |
| organizer = createHoplogOrganizer(); |
| } catch (IOException e) { |
| throw new HDFSIOException("Failed to create Hoplog organizer due to ", e); |
| } |
| if (organizer == null) { |
| throw new HDFSIOException("Hoplog organizer is not available for " + this); |
| } |
| } |
| } |
| return organizer; |
| } |
| |
| @Override |
| public RegionAttributes getAttributes() { |
| return this; |
| } |
| |
| @Override |
| public void hdfsCalled(Object key) { |
| this.partitionedRegion.hdfsCalled(key); |
| } |
| |
| @Override |
| protected void clearHDFSData() { |
| //clear the HDFS data if present |
| if (getPartitionedRegion().isHDFSReadWriteRegion()) { |
| // Clear the queue |
| ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); |
| if (q == null) return; |
| q.clear(getPartitionedRegion(), this.getId()); |
| HoplogOrganizer organizer = hoplog.get(); |
| if (organizer != null) { |
| try { |
| organizer.clear(); |
| } catch (IOException e) { |
| throw new GemFireIOException(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA.toLocalizedString(), e); |
| } |
| } |
| } |
| } |
| |
| public EvictionCriteria getEvictionCriteria() { |
| return this.partitionedRegion.getEvictionCriteria(); |
| } |
| |
| public CustomEvictionAttributes getCustomEvictionAttributes() { |
| return this.partitionedRegion.getCustomEvictionAttributes(); |
| } |
| |
| /** |
| * @return true if the evict destroy was done; false if it was not needed |
| */ |
| public boolean customEvictDestroy(Object key) |
| { |
| checkReadiness(); |
| final EntryEventImpl event = |
| generateCustomEvictDestroyEvent(key); |
| event.setCustomEviction(true); |
| boolean locked = false; |
| try { |
| locked = beginLocalWrite(event); |
| return mapDestroy(event, |
| false, // cacheWrite |
| true, // isEviction |
| null); // expectedOldValue |
| } |
| catch (CacheWriterException error) { |
| throw new Error(LocalizedStrings.LocalRegion_CACHE_WRITER_SHOULD_NOT_HAVE_BEEN_CALLED_FOR_EVICTDESTROY.toLocalizedString(), error); |
| } |
| catch (TimeoutException anotherError) { |
| throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_EVICTDESTROY.toLocalizedString(), anotherError); |
| } |
| catch (EntryNotFoundException yetAnotherError) { |
| throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), yetAnotherError); |
| } finally { |
| if (locked) { |
| endLocalWrite(event); |
| } |
| event.release(); |
| } |
| } |
| |
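| /** |
| * Pings the other members hosting this bucket to check that they are |
| * responsive. Returns true if there are no secondaries or all of them |
| * respond to the ping. |
| */ |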
| public boolean areSecondariesPingable() { |
| |
| Set<InternalDistributedMember> hostingservers = this.partitionedRegion.getRegionAdvisor() |
| .getBucketOwners(this.getId()); |
| hostingservers.remove(cache.getDistributedSystem().getDistributedMember()); |
| |
| if (cache.getLoggerI18n().fineEnabled()) |
| cache.getLoggerI18n().fine("Pinging secondaries of bucket " + this.getId() + " on servers " + hostingservers); |
| |
| if (hostingservers.size() == 0) |
| return true; |
| |
| return ServerPingMessage.send(cache, hostingservers); |
| |
| } |
| |
| @Override |
| public boolean notifiesMultipleSerialGateways() { |
| return getPartitionedRegion().notifiesMultipleSerialGateways(); |
| } |
| |
| } |
| |