| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.TimeoutException; |
| import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; |
| import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; |
| import com.gemstone.gemfire.internal.cache.lru.LRUStatistics; |
| import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; |
| import com.gemstone.gemfire.internal.cache.versions.VersionSource; |
| import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; |
| import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; |
| import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; |
| import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper; |
| |
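/**
 * Abstract base for the shadow bucket regions that back a parallel gateway
 * sender queue (for example BucketRegionQueue and HDFSBucketRegionQueue).
 * It disables concurrency checks and distribution for queue operations,
 * optionally throttles puts when the queue grows too large, and coordinates
 * with ParallelQueueRemovalMessage processing via
 * failedBatchRemovalMessageKeys.
 */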
| public abstract class AbstractBucketRegionQueue extends BucketRegion { |
| protected static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * The maximum size of this single queue before we start blocking puts |
| * The system property is in megabytes. |
| */ |
| private final long maximumSize = 1024 * 1024 * Long.getLong("gemfire.GATEWAY_QUEUE_THROTTLE_SIZE_MB", -1); |
| private final long throttleTime = Long.getLong("gemfire.GATEWAY_QUEUE_THROTTLE_TIME_MS", 100); |
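  // For illustration (assumed deployment usage): throttling is off by default
  // (-1); starting the JVM with -Dgemfire.GATEWAY_QUEUE_THROTTLE_SIZE_MB=100
  // would cap this queue at 100 MB and make puts wait up to
  // gemfire.GATEWAY_QUEUE_THROTTLE_TIME_MS (default 100) milliseconds per
  // check while the queue is over that size.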
| |
| private final LRUStatistics stats; |
| |
| private final ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock(); |
| |
| private final GatewaySenderStats gatewaySenderStats; |
| |
| protected volatile boolean initialized = false; |
| |
| /** |
| * Holds keys for those events that were not found in BucketRegionQueue during |
| * processing of ParallelQueueRemovalMessage. This can occur due to the scenario |
| * mentioned in #49196. |
| */ |
| private final ConcurrentHashSet<Object> failedBatchRemovalMessageKeys = |
| new ConcurrentHashSet<Object>(); |
| |
| public AbstractBucketRegionQueue(String regionName, RegionAttributes attrs, |
| LocalRegion parentRegion, GemFireCacheImpl cache, |
| InternalRegionArguments internalRegionArgs) { |
| super(regionName, attrs, parentRegion, cache, internalRegionArgs); |
| this.stats = ((AbstractLRURegionMap) getRegionMap()).getLRUStatistics(); |
| gatewaySenderStats = this.getPartitionedRegion().getParallelGatewaySender() |
| .getStatistics(); |
| } |
| |
  // Prevent this region from using concurrency checks
| @Override |
| public boolean supportsConcurrencyChecks() { |
| return false; |
| } |
| |
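  /**
   * Blocks the calling thread for up to throttleTime milliseconds when the
   * queue has exceeded maximumSize. Because the wait is time-bounded, a
   * missed notification from {@link #notifyEntriesRemoved()} only delays the
   * put; it cannot block it indefinitely.
   */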
| protected void waitIfQueueFull() { |
| if (maximumSize <= 0) { |
| return; |
| } |
| |
| // Make the put block if the queue has reached the maximum size |
| // If the queue is over the maximum size, the put will wait for |
| // the given throttle time until there is space in the queue |
| if (stats.getCounter() > maximumSize) { |
| try { |
| synchronized (this.stats) { |
| this.stats.wait(throttleTime); |
| } |
| } catch (InterruptedException e) { |
| // If the thread is interrupted, just continue on |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
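  /**
   * Wakes up any threads blocked in {@link #waitIfQueueFull()} once entries
   * have been removed from the queue.
   */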
| protected void notifyEntriesRemoved() { |
| if (maximumSize > 0) { |
| synchronized (this.stats) { |
| this.stats.notifyAll(); |
| } |
| } |
| } |
| |
| @Override |
| protected void distributeUpdateOperation(EntryEventImpl event, |
| long lastModified) { |
| /** |
| * no-op as there is no need to distribute this operation. |
| */ |
| } |
| |
  // TODO: Kishor: While merging, the method below was defined as
  // skipWriteLock, but Cedar actually uses needWriteLock, so it has been
  // changed to needWriteLock. We have to make this consistent: either
  // skipWriteLock or needWriteLock.
| // /** |
| // * In case of update we need not take lock as we are doing local |
| // * operation. |
| // * After BatchRemovalThread: We don't need lock for destroy as well. |
| // * @param event |
| // * @return if we can skip taking the lock or not |
| // */ |
| // |
| // protected boolean skipWriteLock(EntryEventImpl event) { |
| // return true; |
| // } |
| |
| |
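  /**
   * Updates on this queue are local operations, and after the
   * BatchRemovalThread change destroys do not need the lock either (see the
   * commented-out skipWriteLock above), so no write lock is required.
   */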
| protected boolean needWriteLock(EntryEventImpl event) { |
| return false; |
| } |
| |
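  /**
   * Overridden to skip the superclass's part-2 bookkeeping (this region does
   * not distribute its operations); only the current time is needed as the
   * entry's last-modified time.
   */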
| @Override |
| protected long basicPutPart2(EntryEventImpl event, RegionEntry entry, |
| boolean isInitialized, long lastModified, boolean clearConflict) { |
| return System.currentTimeMillis(); |
| } |
| |
| @Override |
| protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) { |
| /** |
| * We are doing local destroy on this bucket. No need to send destroy |
| * operation to remote nodes. |
| */ |
| if (logger.isDebugEnabled()) { |
| logger.debug("For Key {}, BasicDestroyBeforeRemoval: no need to send destroy operation to remote nodes. This will be done using BatchRemoval Message.", |
| event.getKey()); |
| } |
| } |
| |
| @Override |
| protected void distributeDestroyOperation(EntryEventImpl event) { |
| /** |
| * no-op as there is no need to distribute this operation. |
| */ |
| } |
| |
| /** |
| * Overridden to allow clear operation on ShadowBucketRegion. Do nothing here, |
| * so clear operations proceeds smoothly. |
| */ |
| @Override |
| protected void updateSizeOnClearRegion(int sizeBeforeClear) { |
| |
| } |
| |
| /** |
| * @return the initializationLock |
| */ |
| public ReentrantReadWriteLock getInitializationLock() { |
| return initializationLock; |
| } |
| /** |
| * Does a get that attempts to not fault values in from disk or make the entry |
| * the most recent in the LRU. |
| */ |
| /* protected Object optimalGet(Object k) { |
| // Get the object at that key (to remove the index). |
| Object object = null; |
| try { |
| object = getValueInVM(k); // OFFHEAP deserialize |
| if (object == null) { |
| // must be on disk |
| // fault it in w/o putting it back in the region |
| object = getValueOnDiskOrBuffer(k); |
| if (object == null) { |
| // try memory one more time in case it was already faulted back in |
| object = getValueInVM(k); // OFFHEAP deserialize |
| if (object == null) { |
| // if we get this far give up and just do a get |
| object = get(k); |
| } else { |
| if (object instanceof CachedDeserializable) { |
| object = ((CachedDeserializable)object).getDeserializedValue( |
| this, this.getRegionEntry(k)); |
| } |
| } |
| } |
| } else { |
| if (object instanceof CachedDeserializable) { |
| object = ((CachedDeserializable)object).getDeserializedValue(this, |
| this.getRegionEntry(k)); |
| } |
| } |
| } catch (EntryNotFoundException ok) { |
| // just return null; |
| } |
| if (object == Token.TOMBSTONE) { |
| object = null; |
| } |
| |
| return object; |
| }*/ |
| |
| public void destroyKey(Object key) throws ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug(" destroying primary key {}", key); |
| } |
| EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key, |
| null); |
| event.setEventId(new EventID(cache.getSystem())); |
| try { |
| event.setRegion(this); |
| basicDestroy(event, true, null); |
| checkReadiness(); |
| } catch (EntryNotFoundException enf) { |
| if (getPartitionedRegion().isDestroyed()) { |
| getPartitionedRegion().checkReadiness(); |
| if (isBucketDestroyed()) { |
| throw new ForceReattemptException( |
| "Bucket moved", |
| new RegionDestroyedException( |
| LocalizedStrings.PartitionedRegionDataStore_REGION_HAS_BEEN_DESTROYED |
| .toLocalizedString(), getPartitionedRegion() |
| .getFullPath())); |
| } |
| } |
| throw enf; |
| } catch (RegionDestroyedException rde) { |
| getPartitionedRegion().checkReadiness(); |
| if (isBucketDestroyed()) { |
| throw new ForceReattemptException("Bucket moved while destroying key " |
| + key, rde); |
| } |
| } finally { |
| event.release(); |
| } |
| |
| this.notifyEntriesRemoved(); |
| } |
| |
| public void decQueueSize(int size) { |
| this.gatewaySenderStats.decQueueSize(size); |
| } |
| |
| public void decQueueSize() { |
| this.gatewaySenderStats.decQueueSize(); |
| } |
| |
| public void incQueueSize(int size) { |
| this.gatewaySenderStats.incQueueSize(size); |
| } |
| |
| public void incQueueSize() { |
| this.gatewaySenderStats.incQueueSize(); |
| } |
| |
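  /**
   * Drains the sender's temporary queue for this bucket into the queue
   * region. The drain runs under the initialization write lock so that a
   * concurrent ParallelQueueRemovalMessage cannot miss the key in both the
   * temp queue and the bucket; see the in-line comment below. Any events left
   * in the temp queue on failure are released and discarded.
   */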
| protected void loadEventsFromTempQueue() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("For bucket {} about to load events from the temp queue...", getId()); |
| } |
| Set queues = this.getPartitionedRegion().getParallelGatewaySender() |
| .getQueues(); |
| if (queues != null) { |
| ConcurrentParallelGatewaySenderQueue prq = (ConcurrentParallelGatewaySenderQueue)queues |
| .toArray()[0]; |
| // synchronized (prq.getBucketToTempQueueMap()) { |
| BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(getId()); |
| // .getBucketToTempQueueMap().get(getId()); |
| if (tempQueue != null && !tempQueue.isEmpty()) { |
| synchronized (tempQueue) { |
| try { |
              // ParallelQueueRemovalMessage checks for the key in the
              // BucketRegionQueue and, if it is not found there, removes it
              // from the tempQueue. While the tempQueue is being drained into
              // the BucketRegionQueue, the message may find the key in
              // neither place. To fix this race, load the events under the
              // write lock.
| getInitializationLock().writeLock().lock(); |
| // add the events from tempQueue to the region |
| GatewaySenderEventImpl event; |
| while ((event = tempQueue.poll()) != null) { |
| try { |
| event.setPossibleDuplicate(true); |
| if (this.addToQueue(event.getShadowKey(), event)) { |
| event = null; |
| } |
| } |
| catch (ForceReattemptException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("For bucket {} , enqueing event {} caused exception", getId(), event, e); |
| } |
| } finally { |
| if (event != null) { |
| event.release(); |
| } |
| } |
| } |
| } finally { |
| if (!tempQueue.isEmpty()) { |
| for (GatewaySenderEventImpl e: tempQueue) { |
| e.release(); |
| } |
| tempQueue.clear(); |
| } |
| getInitializationLock().writeLock().unlock(); |
| } |
| } |
| } |
| |
| // } |
| } |
| } |
| |
| /** |
| * Marks batchSize number of events in the iterator as duplicate |
| */ |
| protected void markEventsAsDuplicate(int batchSize, Iterator itr) { |
| int i = 0; |
    // Mark a number of events equal to batchSize with possibleDuplicate set
    // to true before this bucket becomes primary on this node.
| while (i < batchSize && itr.hasNext()) { |
| Object key = itr.next(); |
| Object senderEvent = |
| getNoLRU(key, true, false, false); |
| |
| if (senderEvent != null) { |
| ((GatewaySenderEventImpl)senderEvent).setPossibleDuplicate(true); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Set possibleDuplicate to true on event: {}", senderEvent); |
| } |
| } |
| i++; |
| } |
| } |
| |
| @Override |
| public void forceSerialized(EntryEventImpl event) { |
| // NOOP since we want the value in the region queue to stay in object form. |
| } |
| |
| @Override |
| protected boolean virtualPut(EntryEventImpl event, boolean ifNew, |
| boolean ifOld, Object expectedOldValue, boolean requireOldValue, |
| long lastModified, boolean overwriteDestroyed) throws TimeoutException, |
| CacheWriterException { |
| boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, |
| requireOldValue, lastModified, overwriteDestroyed); |
| if (success) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Key : ----> {}", event.getKey()); |
| } |
| //@Unretained Object ov = event.getRawOldValue(); |
| //if (ov instanceof GatewaySenderEventImpl) { |
| // ((GatewaySenderEventImpl)ov).release(); |
| //} |
| GatewaySenderEventImpl.release(event.getRawOldValue()); |
| } |
| return success; |
| |
| } |

  @Override
| protected void basicDestroy(final EntryEventImpl event, |
| final boolean cacheWrite, Object expectedOldValue) |
| throws EntryNotFoundException, CacheWriterException, TimeoutException { |
| super.basicDestroy(event, cacheWrite, expectedOldValue); |
| //@Unretained Object rov = event.getRawOldValue(); |
| //if (rov instanceof GatewaySenderEventImpl) { |
| // ((GatewaySenderEventImpl) rov).release(); |
| //} |
| GatewaySenderEventImpl.release(event.getRawOldValue()); |
| } |
| |
| |
| /** |
| * Return all of the user PR buckets for this bucket region queue. |
| */ |
| public Collection<BucketRegion> getCorrespondingUserPRBuckets() { |
| List<BucketRegion> userPRBuckets = new ArrayList<BucketRegion>(4); |
| Map<String, PartitionedRegion> colocatedPRs = ColocationHelper |
| .getAllColocationRegions(getPartitionedRegion()); |
| for (PartitionedRegion colocatedPR : colocatedPRs.values()) { |
| if (!colocatedPR.isShadowPR() && isThisSenderAttached(colocatedPR)) { |
| BucketRegion parentBucket = colocatedPR.getDataStore() |
| .getLocalBucketById(getId()); |
| if (parentBucket != null) |
| userPRBuckets.add(parentBucket); |
| } |
| } |
| return userPRBuckets; |
| } |
| |
| private boolean isThisSenderAttached(PartitionedRegion pr) { |
| return pr.getParallelGatewaySenderIds().contains( |
| getPartitionedRegion().getParallelGatewaySender().getId()); |
| } |
| |
| /** |
| * It should be an atomic operation. If the key has been added to the |
| * eventSeqNumQueue then make sure that the value is in the Bucket before the |
| * eventSeqNumQueue is available for peek/remove/take from other thread. |
| * |
| * @param key |
| * @param value |
| * @return boolean which shows whether the operation was successful or not. |
| * @throws ForceReattemptException |
| */ |
| public boolean addToQueue(Object key, Object value) |
| throws ForceReattemptException { |
| |
    // If the key exists in failedBatchRemovalMessageKeys, remove it from
    // there and return without enqueueing. This handles the scenario
    // described in #49196.
| if (failedBatchRemovalMessageKeys.remove(key)) { |
| return false; |
| } |
| |
| boolean didPut = false; |
| long startPut = CachePerfStats.getStatTime(); |
    // The value will always be an instance of GatewaySenderEventImpl, which
    // is never stored off-heap, so this EntryEventImpl's value will never be
    // off-heap. However, the GatewaySenderEventImpl that ends up stored in
    // this region may itself reference a value stored off-heap.
| EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key, |
| value, null, false, getMyId()); |
    // Avoid unnecessary validations of the key and value here; the readiness
    // check will be handled in virtualPut. Also avoid extractDelta, as this
    // will be a new entry every time.
| // EntryEventImpl event = getPartitionedRegion().newUpdateEntryEvent(key, |
| // value, null); |
| event.copyOffHeapToHeap(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Value : {}", event.getRawNewValue()); |
| } |
| waitIfQueueFull(); |
| |
| int sizeOfHdfsEvent = -1; |
| try { |
| if (this instanceof HDFSBucketRegionQueue) { |
        // Need to fetch the size before the event is inserted in the queue.
        // Fix for #50016.
| if (this.getBucketAdvisor().isPrimary()) { |
| HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue(); |
| sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted); |
| } |
| } |
| |
| didPut = virtualPut(event, false, false, null, false, startPut, true); |
| |
| checkReadiness(); |
| } catch (RegionDestroyedException rde) { |
| // this can now happen due to a re-balance removing a bucket |
| getPartitionedRegion().checkReadiness(); |
| if (isBucketDestroyed()) { |
| throw new ForceReattemptException("Bucket moved", rde); |
| } |
| } finally { |
| if (!didPut) { |
| GatewaySenderEventImpl.release(value); |
| } |
| } |
| |
    // Check again whether the key exists in failedBatchRemovalMessageKeys;
    // if so, remove it and destroy the key from the BucketRegionQueue. This
    // reduces the window of the race condition described by Darrel in #49196.
| if (failedBatchRemovalMessageKeys.remove(key) && didPut) { |
| destroyKey(key); |
| didPut = false; |
| } else { |
| addToEventQueue(key, didPut, event, sizeOfHdfsEvent); |
| } |
| return didPut; |
| } |
| @Override |
| public void closeEntries() { |
| OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { |
| @Override |
| public void run() { |
| AbstractBucketRegionQueue.super.closeEntries(); |
| } |
| }); |
| clearQueues(); |
| |
| } |
| |
| @Override |
| public Set<VersionSource> clearEntries(final RegionVersionVector rvv) { |
| final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>(); |
| OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { |
| @Override |
| public void run() { |
| result.set(AbstractBucketRegionQueue.super.clearEntries(rvv)); |
| } |
| }); |
| clearQueues(); |
| return result.get(); |
| } |
| |
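  /**
   * Hook for subclasses to clear their in-memory queue structures (for
   * example, the event sequence number queue) when the region's entries are
   * closed or cleared.
   */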
| protected abstract void clearQueues(); |
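
  /**
   * Hook for subclasses to record a successfully enqueued event in their
   * in-memory queue structures; sizeOfHdfsEvent is -1 unless this is a
   * primary HDFS bucket (see addToQueue).
   */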
| protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, |
| int sizeOfHdfsEvent); |
| |
| @Override |
| public void afterAcquiringPrimaryState() { |
| super.afterAcquiringPrimaryState(); |
| notifyEventProcessor(); |
| } |
| |
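  /**
   * Notifies the sender's event processor that this bucket has become
   * primary so that dispatching for the bucket can resume.
   */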
| protected void notifyEventProcessor() { |
| AbstractGatewaySender sender = getPartitionedRegion().getParallelGatewaySender(); |
| if (sender != null) { |
| AbstractGatewaySenderEventProcessor ep = sender.getEventProcessor(); |
| if (ep != null) { |
| ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("notifyEventProcessor : {} event processor {} queue {}", sender, ep, queue); |
| } |
| queue.notifyEventProcessorIfRequired(this.getId()); |
| } |
| } |
| } |
| |
| public boolean isInitialized() { |
| return this.initialized; |
| } |
| |
| /** |
| * |
| * @param key |
| */ |
| public void addToFailedBatchRemovalMessageKeys(Object key) { |
| failedBatchRemovalMessageKeys.add(key); |
| } |
| |
| public ConcurrentHashSet<Object> getFailedBatchRemovalMessageKeys() { |
| return this.failedBatchRemovalMessageKeys; |
| } |
| |
| } |