| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.wan.parallel; |
| |
| import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.AttributesMutator; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.CacheListener; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.EvictionAction; |
| import com.gemstone.gemfire.cache.EvictionAttributes; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue; |
| import com.gemstone.gemfire.internal.cache.BucketNotFoundException; |
| import com.gemstone.gemfire.internal.cache.BucketRegion; |
| import com.gemstone.gemfire.internal.cache.BucketRegionQueue; |
| import com.gemstone.gemfire.internal.cache.ColocationHelper; |
| import com.gemstone.gemfire.internal.cache.Conflatable; |
| import com.gemstone.gemfire.internal.cache.DiskRegionStats; |
| import com.gemstone.gemfire.internal.cache.DistributedRegion; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl; |
| import com.gemstone.gemfire.internal.cache.ForceReattemptException; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.InternalRegionArguments; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.PrimaryBucketException; |
| import com.gemstone.gemfire.internal.cache.RegionQueue; |
| import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; |
| import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; |
| import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; |
| import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.size.SingleObjectSizer; |
| import com.gemstone.gemfire.internal.util.concurrent.StoppableCondition; |
| import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock; |
| |
| public class ParallelGatewaySenderQueue implements RegionQueue { |
| |
| protected static final Logger logger = LogService.getLogger(); |
| |
| protected final Map<String, PartitionedRegion> userRegionNameToshadowPRMap = new ConcurrentHashMap<String, PartitionedRegion>(); |
| |
| // Raw map; the intended type parameters are <PartitionedRegion, Map<Integer, List<Object>>> |
| private static final Map regionToDispatchedKeysMap = new ConcurrentHashMap(); |
| |
| protected static StoppableReentrantLock buckToDispatchLock; |
| |
| private static StoppableCondition regionToDispatchedKeysMapEmpty; |
| |
| protected final StoppableReentrantLock queueEmptyLock; |
| |
| private volatile boolean isQueueEmpty = true; |
| |
| /** |
| * Condition created from {@link #queueEmptyLock} in the constructor. It is |
| * signalled by {@link #notifyEventProcessorIfRequired()} when |
| * {@link #isQueueEmpty} is flipped from true to false. |
| * <p> |
| * A false signal is fine on this condition, as the processor will loop again |
| * and find out that it was a false signal. However, make sure that every |
| * scenario that can cause an entry to be peeked also signals the processor so |
| * that it unblocks. |
| */ |
| private StoppableCondition queueEmptyCondition; |
| |
| protected final GatewaySenderStats stats; |
| |
| protected volatile boolean resetLastPeeked = false; |
| |
| |
| |
| /** |
| * Temporary per-bucket queues, keyed by bucket id (fix for Bug#45917). |
| * put() parks an event here when the BucketRegionQueue of its bucket is not |
| * available locally and the shadow bucket is not marked destroyed. |
| * <p> |
| * Related background: there is one shadow PR for each PartitionedRegion that |
| * has added the GatewaySender; those are kept in userRegionNameToshadowPRMap. |
| */ |
| private final ConcurrentMap<Integer, BlockingQueue<GatewaySenderEventImpl>> bucketToTempQueueMap = new ConcurrentHashMap<Integer, BlockingQueue<GatewaySenderEventImpl>>(); |
| |
| /** |
| * The default frequency (in milliseconds) at which a message will be sent by the |
| * primary to all the secondary nodes to remove the events which have already |
| * been dispatched from the queue. |
| */ |
| public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 10; |
| //TODO:REF: how to change the message sync interval ? should it be common for serial and parallel |
| protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL; |
| //TODO:REF: name change for thread, as it appears in the log |
| private static BatchRemovalThread removalThread = null; |
| |
| protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>(); |
| |
| public final AbstractGatewaySender sender ; |
| |
| public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10; |
| |
| public static final String QSTRING = "_PARALLEL_GATEWAY_SENDER_QUEUE"; |
| |
| /** |
| * Fixed size thread pool for conflating the events in the queue. It is |
| * created lazily by initializeConflationThreadPool() and its size is set to |
| * the number of processors available to the JVM. |
| * <p> |
| * NOTE(review): the field is static, so a single pool is shared by all |
| * instances of this class rather than one pool per ParallelGatewaySender as |
| * this comment used to state - confirm which one is intended. |
| */ |
| private static volatile ExecutorService conflationExecutor; |
| |
| /** |
| * Task that carries out the actual removal of a conflated (superseded) event |
| * from the shadow queue region (QPR). The entry stored under |
| * {@code previousTailKey} in bucket {@code bucketId} is destroyed in |
| * {@link #run()}, which is meant to be executed by one of the threads of the |
| * conflation thread pool. |
| */ |
| private class ConflationHandler implements Runnable { |
|   /** Event whose older version is removed; run() casts it to GatewaySenderEventImpl. */ |
|   Conflatable conflatableObject; |
| |
|   /** Key of the superseded queue entry that has to be destroyed. */ |
|   Long previousTailKeyTobeRemoved; |
| |
|   /** Id of the bucket that holds the superseded entry. */ |
|   int bucketId; |
| |
|   /** |
|    * @param conflatableObject the event that replaces the older entry |
|    * @param bId id of the bucket holding the older entry |
|    * @param previousTailKey key of the older entry that is to be destroyed |
|    */ |
|   public ConflationHandler(Conflatable conflatableObject, int bId, |
|       Long previousTailKey) { |
|     this.conflatableObject = conflatableObject; |
|     this.previousTailKeyTobeRemoved = previousTailKey; |
|     this.bucketId = bId; |
|   } |
| |
|   /** |
|    * Looks up the shadow queue region of the event's leader region and |
|    * destroys the superseded entry from it. Success is only reported (at |
|    * debug level) when the entry was really destroyed. |
|    */ |
|   @Override |
|   public void run() { |
|     PartitionedRegion prQ = null; |
|     final GatewaySenderEventImpl event = (GatewaySenderEventImpl)conflatableObject; |
|     try { |
|       final String regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath(); |
|       prQ = userRegionNameToshadowPRMap.get(regionPath); |
|       if (prQ == null) { |
|         // No shadow queue is registered for this region, so there is |
|         // nothing to destroy; previously this ended in a NullPointerException. |
|         if (logger.isDebugEnabled()) { |
|           logger.debug("{}: Not conflating {} as no queue region is registered for region={}", |
|               this, conflatableObject.getKeyToConflate(), regionPath); |
|         } |
|         return; |
|       } |
|       destroyEventFromQueue(prQ, bucketId, previousTailKeyTobeRemoved); |
|     } catch (EntryNotFoundException e) { |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("{}: Not conflating {} due to EntryNotFoundException", this, conflatableObject.getKeyToConflate()); |
|       } |
|       // Nothing was removed, so do not fall through to the "Conflated" message. |
|       return; |
|     } |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("{}: Conflated {} for key={} in queue for region={}", |
|           this, conflatableObject.getValueToConflate(), conflatableObject.getKeyToConflate(), prQ.getName()); |
|     } |
|   } |
| |
|   /** |
|    * Debugging aid: deserializes the argument when it is a byte array. |
|    * |
|    * @param serializedBytes a value that may be a serialized byte[] |
|    * @return the deserialized object, or the argument itself when it is not a |
|    *         byte[] or cannot be deserialized |
|    */ |
|   private Object deserialize(Object serializedBytes) { |
|     Object deserializedObject = serializedBytes; |
|     if (serializedBytes instanceof byte[]) { |
|       byte[] serializedBytesCast = (byte[])serializedBytes; |
|       try { |
|         deserializedObject = EntryEventImpl.deserialize(serializedBytesCast); |
|       } catch (Exception ignored) { |
|         // Deliberately ignored: this is a debugging method, so failures such |
|         // as ClassNotFoundException simply leave the raw value in place. |
|       } |
|     } |
|     return deserializedObject; |
|   } |
| } |
| |
| final protected int index; |
| final protected int nDispatcher; |
| |
| /** |
| * A transient queue to maintain the eventSeqNum of the events that are to be |
| * sent to remote site. It is cleared when the queue is cleared. |
| */ |
| //private final BlockingQueue<Long> eventSeqNumQueue; |
| |
| /** |
|  * Creates the queue for one dispatcher of a parallel gateway sender. A shadow |
|  * PartitionedRegion is set up for every user region (processed in order of |
|  * their full path), the shared static lock/condition and the batch removal |
|  * thread are created on first use, and the conflation thread pool is |
|  * initialized when batch conflation is enabled on the sender. |
|  * |
|  * @param sender the sender this queue belongs to |
|  * @param userRegions the user regions the sender is attached to |
|  * @param idx index of this queue among the sender's dispatchers |
|  * @param nDispatcher total number of dispatchers of the sender |
|  * @throws AsyncEventQueueConfigurationException if a region is not a |
|  *         PartitionedRegion and the sender id contains the async event queue prefix |
|  * @throws GatewaySenderConfigurationException if a region is not a |
|  *         PartitionedRegion (any other sender) |
|  */ |
| public ParallelGatewaySenderQueue(AbstractGatewaySender sender, |
|     Set<Region> userRegions, int idx, int nDispatcher) { |
|   this.index = idx; |
|   this.nDispatcher = nDispatcher; |
|   this.stats = sender.getStatistics(); |
|   this.sender = sender; |
| |
|   List<Region> listOfRegions = new ArrayList<Region>(userRegions); |
|   //eventSeqNumQueue = new LinkedBlockingQueue<Long>(); |
|   // Process the regions in a deterministic order (sorted by full path). |
|   Collections.sort(listOfRegions, new Comparator<Region>() { |
|     @Override |
|     public int compare(Region o1, Region o2) { |
|       return o1.getFullPath().compareTo(o2.getFullPath()); |
|     } |
|   }); |
| |
|   for (Region userRegion : listOfRegions) { |
|     if (userRegion instanceof PartitionedRegion) { |
|       addShadowPartitionedRegionForUserPR((PartitionedRegion)userRegion); |
|     } |
|     else { |
|       //Fix for Bug#51491. Once decided to support this configuration we have call addShadowPartitionedRegionForUserRR |
|       if (this.sender.getId().contains( |
|           AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)) { |
|         throw new AsyncEventQueueConfigurationException( |
|             LocalizedStrings.ParallelAsyncEventQueue_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1.toLocalizedString(new Object[] { |
|                 AsyncEventQueueImpl |
|                     .getAsyncEventQueueIdFromSenderId(this.sender.getId()), |
|                 userRegion.getFullPath() })); |
|       } |
|       throw new GatewaySenderConfigurationException( |
|           LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1 |
|               .toLocalizedString(new Object[] { this.sender.getId(), |
|                   userRegion.getFullPath() })); |
|     } |
|   } |
| |
|   queueEmptyLock = new StoppableReentrantLock(sender.getCancelCriterion()); |
|   queueEmptyCondition = queueEmptyLock.newCondition(); |
| |
|   // All lazily created static state is initialized under the class lock so |
|   // that two queues constructed concurrently cannot each create their own |
|   // lock, condition or removal thread. The lock and condition are created |
|   // before the removal thread is started. |
|   synchronized (ParallelGatewaySenderQueue.class) { |
|     if (buckToDispatchLock == null) { |
|       buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion()); |
|     } |
|     if (regionToDispatchedKeysMapEmpty == null) { |
|       regionToDispatchedKeysMapEmpty = buckToDispatchLock.newCondition(); |
|     } |
|     if (removalThread == null) { |
|       removalThread = new BatchRemovalThread( |
|           (GemFireCacheImpl)sender.getCache(), this); |
|       removalThread.start(); |
|     } |
|   } |
| |
|   //initialize the conflation thread pool if conflation is enabled |
|   if (sender.isBatchConflationEnabled()) { |
|     initializeConflationThreadPool(); |
|   } |
| } |
| |
| public void addShadowPartitionedRegionForUserRR( |
| DistributedRegion userRegion) { |
| this.sender.lifeCycleLock.writeLock().lock(); |
| PartitionedRegion prQ = null; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("addShadowPartitionedRegionForUserRR: Going to create shadowpr for userRegion {}", userRegion.getFullPath()); |
| } |
| |
| try { |
| String regionName = userRegion.getFullPath(); |
| |
| if (this.userRegionNameToshadowPRMap.containsKey(regionName)) |
| return; |
| |
| GemFireCacheImpl cache = (GemFireCacheImpl)sender.getCache(); |
| final String prQName = getQueueName(sender.getId(), userRegion.getFullPath()); |
| prQ = (PartitionedRegion)cache.getRegion(prQName); |
| if (prQ == null) { |
| // TODO:REF:Avoid deprecated apis |
| AttributesFactory fact = new AttributesFactory(); |
| //Fix for 48621 - don't enable concurrency checks |
| //for queue buckets., event with persistence |
| fact.setConcurrencyChecksEnabled(false); |
| PartitionAttributesFactory pfact = new PartitionAttributesFactory(); |
| pfact.setTotalNumBuckets(sender.getMaxParallelismForReplicatedRegion()); |
| int localMaxMemory = userRegion.getDataPolicy().withStorage() ? sender |
| .getMaximumQueueMemory() : 0; |
| pfact.setLocalMaxMemory(localMaxMemory); |
| pfact.setRedundantCopies(3); //TODO:Kishor : THis need to be handled nicely |
| pfact.setPartitionResolver(new RREventIDResolver()); |
| if (sender.isPersistenceEnabled()) { |
| fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| } |
| |
| fact.setDiskStoreName(sender.getDiskStoreName()); |
| |
| // if persistence is enabled, set the diskSyncronous to whatever user |
| // has set |
| // else set it to false |
| //optimize with above check of enable persistence |
| if (sender.isPersistenceEnabled()) |
| fact.setDiskSynchronous(sender.isDiskSynchronous()); |
| else { |
| fact.setDiskSynchronous(false); |
| } |
| |
| // allow for no overflow directory |
| EvictionAttributes ea = EvictionAttributes.createLIFOMemoryAttributes( |
| sender.getMaximumQueueMemory(), EvictionAction.OVERFLOW_TO_DISK); |
| |
| fact.setEvictionAttributes(ea); |
| fact.setPartitionAttributes(pfact.create()); |
| |
| final RegionAttributes ra = fact.create(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Attempting to create queue region: {}", this, prQName); |
| } |
| |
| ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion( |
| prQName, ra, null, cache, sender); |
| |
| try { |
| prQ = (PartitionedRegion)cache.createVMRegion(prQName, ra, |
| new InternalRegionArguments().setInternalMetaRegion(meta) |
| .setDestroyLockFlag(true).setSnapshotInputStream(null) |
| .setImageTarget(null)); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Region created : {} partition Attributes : {}", prQ, prQ.getPartitionAttributes()); |
| } |
| |
| // Suranjan: TODO This should not be set on the PR but on the |
| // GatewaySender |
| prQ.enableConflation(sender.isBatchConflationEnabled()); |
| |
| // Before going ahead, make sure all the buckets of shadowPR are |
| // loaded |
| // and primary nodes have been decided. |
| // This is required in case of persistent PR and sender. |
| if (prQ.getLocalMaxMemory() != 0) { |
| Iterator<Integer> itr = prQ.getRegionAdvisor().getBucketSet() |
| .iterator(); |
| while (itr.hasNext()) { |
| itr.next(); |
| } |
| } |
| // In case of Replicated Region it may not be necessary. |
| |
| // if (sender.isPersistenceEnabled()) { |
| // //Kishor: I need to write a test for this code. |
| // Set<Integer> allBucketsClone = new HashSet<Integer>(); |
| // // allBucketsClone.addAll(allBuckets);*/ |
| // for (int i = 0; i < sender.getMaxParallelismForReplicatedRegion(); i++) |
| // allBucketsClone.add(i); |
| // |
| // while (!(allBucketsClone.size() == 0)) { |
| // Iterator<Integer> itr = allBucketsClone.iterator(); |
| // while (itr.hasNext()) { |
| // InternalDistributedMember node = prQ.getNodeForBucketWrite( |
| // itr.next(), null); |
| // if (node != null) { |
| // itr.remove(); |
| // } |
| // } |
| // // after the iteration is over, sleep for sometime before trying |
| // // again |
| // try { |
| // Thread.sleep(WAIT_CYCLE_SHADOW_BUCKET_LOAD); |
| // } |
| // catch (InterruptedException e) { |
| // logger.error(e); |
| // } |
| // } |
| // } |
| } |
| catch (IOException veryUnLikely) { |
| logger.fatal(LocalizedMessage.create( |
| LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, |
| this.getClass()), veryUnLikely); |
| } |
| catch (ClassNotFoundException alsoUnlikely) { |
| logger.fatal(LocalizedMessage.create( |
| LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, |
| this.getClass()), alsoUnlikely); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Created queue region: {}", this, prQ); |
| } |
| } |
| else { |
| // in case shadowPR exists already (can be possible when sender is |
| // started from stop operation) |
| if(this.index == 0) //HItesh: for first processor only |
| handleShadowPRExistsScenario(cache, prQ); |
| } |
| /* |
| * Here, enqueTempEvents need to be invoked when a sender is already |
| * running and userPR is created later. When the flow comes here through |
| * start() method of sender i.e. userPR already exists and sender is |
| * started later, the enqueTempEvents is done in the start() method of |
| * ParallelGatewaySender |
| */ |
| if ((this.index == this.nDispatcher - 1) && this.sender.isRunning()) { |
| ((AbstractGatewaySender)sender).enqueTempEvents(); |
| } |
| } |
| finally { |
| if (prQ != null) { |
| this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ); |
| } |
| this.sender.lifeCycleLock.writeLock().unlock(); |
| } |
| } |
| |
| private static String convertPathToName(String fullPath) { |
| //return fullPath.replaceAll("/", "_"); |
| return ""; |
| } |
| |
| /** |
|  * Creates (or re-attaches to) the shadow PartitionedRegion that backs this |
|  * queue for a partitioned user region and registers it in |
|  * userRegionNameToshadowPRMap. The shadow PR is colocated with the leader |
|  * region of the user region's colocation chain; when {@code userPR} is not |
|  * the leader itself, the call is delegated to the leader (if that one is not |
|  * registered yet) and nothing else is created. The whole operation runs under |
|  * the sender's life cycle write lock, which is released even when the |
|  * follow-up work (enqueTempEvents / afterRegionAdd) fails. |
|  * |
|  * @param userPR the partitioned user region the sender is attached to |
|  * @throws GatewaySenderException if a non-persistent sender (not used for |
|  *         HDFS) is attached to a persistent region |
|  */ |
| public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR) { |
|   if (logger.isDebugEnabled()) { |
|     logger.debug("{} addShadowPartitionedRegionForUserPR: Attempting to create queue region: {}", this, userPR.getDisplayName()); |
|   } |
|   PartitionedRegion prQ = null; |
|   this.sender.lifeCycleLock.writeLock().lock(); |
|   try { |
|     String regionName = userPR.getFullPath(); |
|     // Find if there is any parent region for this userPR |
|     // if there is then no need to add another q for the same |
|     final PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(userPR); |
|     String leaderRegionName = leaderRegion.getFullPath(); |
|     if (!regionName.equals(leaderRegionName)) { |
|       //Fix for defect #50364. Allow user to attach GatewaySender to child PR (without attaching to leader PR) |
|       //though, internally, colocate the GatewaySender's shadowPR with the leader PR in colocation chain |
|       if (!this.userRegionNameToshadowPRMap.containsKey(leaderRegionName)) { |
|         addShadowPartitionedRegionForUserPR(leaderRegion); |
|       } |
|       return; |
|     } |
| |
|     if (this.userRegionNameToshadowPRMap.containsKey(regionName)) { |
|       return; |
|     } |
| |
|     if (!isUsedForHDFS() && userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()) { |
|       throw new GatewaySenderException( |
|           LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1 |
|               .toLocalizedString(new Object[] { this.sender.getId(), |
|                   userPR.getFullPath() })); |
|     } |
| |
|     GemFireCacheImpl cache = (GemFireCacheImpl)sender.getCache(); |
|     boolean isAccessor = (userPR.getLocalMaxMemory() == 0); |
| |
|     final String prQName = sender.getId() |
|         + QSTRING + convertPathToName(userPR.getFullPath()); |
|     prQ = (PartitionedRegion)cache.getRegion(prQName); |
|     if (prQ == null) { |
|       //TODO:REF:Avoid deprecated apis |
| |
|       AttributesFactory fact = new AttributesFactory(); |
|       fact.setConcurrencyChecksEnabled(false); |
|       PartitionAttributesFactory pfact = new PartitionAttributesFactory(); |
|       pfact.setTotalNumBuckets(userPR.getTotalNumberOfBuckets()); |
|       pfact.setRedundantCopies(userPR.getRedundantCopies()); |
|       pfact.setColocatedWith(regionName); |
|       // EITHER set localMaxMemory to 0 for accessor node |
|       // OR override shadowPRs default local max memory with the sender's max |
|       // queue memory (Fix for bug#44254) |
|       int localMaxMemory = isAccessor ? 0 : sender |
|           .getMaximumQueueMemory(); |
|       pfact.setLocalMaxMemory(localMaxMemory); |
| |
|       pfact.setStartupRecoveryDelay(userPR.getPartitionAttributes().getStartupRecoveryDelay()); |
|       pfact.setRecoveryDelay(userPR.getPartitionAttributes().getRecoveryDelay()); |
| |
|       if (sender.isPersistenceEnabled() && !isAccessor) { |
|         fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
|       } |
| |
|       fact.setDiskStoreName(sender.getDiskStoreName()); |
| |
|       //if persistence is enabled, set the diskSyncronous to whatever user has set |
|       //else set it to false |
|       if (sender.isPersistenceEnabled()) { |
|         fact.setDiskSynchronous(sender.isDiskSynchronous()); |
|       } else { |
|         fact.setDiskSynchronous(false); |
|       } |
| |
|       // allow for no overflow directory |
|       EvictionAttributes ea = EvictionAttributes.createLIFOMemoryAttributes( |
|           sender.getMaximumQueueMemory(), EvictionAction.OVERFLOW_TO_DISK); |
| |
|       fact.setEvictionAttributes(ea); |
|       fact.setPartitionAttributes(pfact.create()); |
| |
|       final RegionAttributes ra = fact.create(); |
| |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("{}: Attempting to create queue region: {}", this, prQName); |
|       } |
| |
|       ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion( |
|           prQName, ra, null, cache, sender, isUsedForHDFS()); |
| |
|       try { |
|         prQ = (PartitionedRegion)cache |
|             .createVMRegion(prQName, ra, new InternalRegionArguments() |
|                 .setInternalMetaRegion(meta).setDestroyLockFlag(true) |
|                 .setSnapshotInputStream(null).setImageTarget(null)); |
|         // at this point we should be able to assert prQ == meta; |
| |
|         //Suranjan: TODO This should not be set on the PR but on the GatewaySender |
|         prQ.enableConflation(sender |
|             .isBatchConflationEnabled()); |
|         if (isAccessor) { |
|           return; // return from here if accessor node |
|         } |
| |
|         // if the current node is marked uninitialized (SQLF DDL replay in |
|         // progress) then we cannot wait for buckets to recover, because |
|         // bucket creation has been disabled until DDL replay is complete. |
|         if (!prQ.getCache().isUnInitializedMember(prQ.getDistributionManager().getId())) { |
|           //Wait for buckets to be recovered. |
|           prQ.shadowPRWaitForBucketRecovery(); |
|         } |
| |
|       } catch (IOException veryUnLikely) { |
|         logger.fatal(LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, |
|             this.getClass()), veryUnLikely); |
|       } catch (ClassNotFoundException alsoUnlikely) { |
|         logger.fatal(LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, |
|             this.getClass()), alsoUnlikely); |
|       } |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("{}: Created queue region: {}", this, prQ); |
|       } |
|     } else { |
|       if (isAccessor) { |
|         return; // return from here if accessor node |
|       } |
|       // in case shadowPR exists already (can be possible when sender is |
|       // started from stop operation) |
|       if (this.index == 0) { //HItesh:for first parallelGatewaySenderQueue only |
|         handleShadowPRExistsScenario(cache, prQ); |
|       } |
|     } |
| |
|   } finally { |
|     // The follow-up work can throw; the nested finally guarantees that the |
|     // write lock is released in that case too. |
|     try { |
|       if (prQ != null) { |
|         this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ); |
|       } |
|       /* |
|        * Here, enqueTempEvents need to be invoked when a sender is already |
|        * running and userPR is created later. When the flow comes here through |
|        * start() method of sender i.e. userPR already exists and sender is |
|        * started later, the enqueTempEvents is done in the start() method of |
|        * ParallelGatewaySender |
|        */ |
|       if ((this.index == this.nDispatcher - 1) && this.sender.isRunning()) { |
|         this.sender.enqueTempEvents(); |
|       } |
|       afterRegionAdd(userPR); |
|     } finally { |
|       this.sender.lifeCycleLock.writeLock().unlock(); |
|     } |
|   } |
| } |
| |
| /** |
| * This will be case when the sender is started again after stop operation. |
| */ |
| private void handleShadowPRExistsScenario(Cache cache, PartitionedRegion prQ) { |
| //Note: The region will not be null if the sender is started again after stop operation |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: No need to create the region as the region has been retrieved: {}", this, prQ); |
| } |
| // now, clean up the shadowPR's buckets on this node (primary as well as |
| // secondary) for a fresh start |
| Set<BucketRegion> localBucketRegions = prQ.getDataStore() |
| .getAllLocalBucketRegions(); |
| for (BucketRegion bucketRegion : localBucketRegions) { |
| bucketRegion.clear(); |
| } |
| } |
| protected boolean isUsedForHDFS() |
| { |
| return false; |
| } |
| /** |
|  * Hook invoked at the end of addShadowPartitionedRegionForUserPR, while the |
|  * life cycle write lock is still held. This implementation does nothing. |
|  * |
|  * @param userPR the user region that was passed to addShadowPartitionedRegionForUserPR |
|  */ |
| protected void afterRegionAdd(PartitionedRegion userPR) { |
|   // intentionally empty |
| } |
| /** |
|  * Creates the shared conflation thread pool if it does not exist yet. The |
|  * pool has as many daemon threads as there are processors available to the |
|  * JVM. The method is synchronized (on the class) so that concurrent callers |
|  * cannot both see a null executor and create two pools, one of which would |
|  * never be shut down. |
|  */ |
| private static synchronized void initializeConflationThreadPool() { |
|   if (conflationExecutor != null) { |
|     return; |
|   } |
|   final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup |
|       .createThreadGroup("WAN Queue Conflation Logger Group", logger); |
| |
|   final ThreadFactory threadFactory = new ThreadFactory() { |
|     @Override |
|     public Thread newThread(final Runnable task) { |
|       final Thread thread = new Thread(loggingThreadGroup, task, |
|           "WAN Queue Conflation Thread"); |
|       thread.setDaemon(true); |
|       return thread; |
|     } |
|   }; |
| |
|   conflationExecutor = Executors.newFixedThreadPool(Runtime.getRuntime() |
|       .availableProcessors(), threadFactory); |
| } |
| |
| /** |
| * Cleans up the conflation thread pool. |
| * Initially, shutdown is done to avoid accepting any newly submitted tasks. |
| * Wait a while for existing tasks to terminate. If the existing tasks still don't |
| * complete, cancel them by calling shutdownNow. |
| */ |
| private void cleanupConflationThreadPool() { |
| conflationExecutor.shutdown();// Disable new tasks from being submitted |
| |
| try { |
| if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) { |
| conflationExecutor.shutdownNow(); // Cancel currently executing tasks |
| // Wait a while for tasks to respond to being cancelled |
| if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) |
| logger.warn(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL, this.sender)); |
| } |
| } catch (InterruptedException e) { |
| // (Re-)Cancel if current thread also interrupted |
| conflationExecutor.shutdownNow(); |
| // Preserve interrupt status |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
|  * Adds an event to the bucket queue of the shadow PR that belongs to the |
|  * event's region (for partitioned regions: the leader of its colocation |
|  * chain). |
|  * <ul> |
|  * <li>The event is released and dropped (with a warning) when no shadow PR is |
|  * registered for the region, and dropped silently when a PR event carries |
|  * the shadow key -1.</li> |
|  * <li>When the bucket queue is not hosted locally it is looked up by path in |
|  * the cache; if it still cannot be found, a PR event is parked in the |
|  * bucket's temporary queue unless the shadow bucket is marked destroyed, |
|  * while a DR event is released.</li> |
|  * <li>Once the bucket lookup has been attempted, the event processor is |
|  * notified on the way out, whatever the outcome.</li> |
|  * </ul> |
|  * |
|  * @param object the event to enqueue; must be a GatewaySenderEventImpl |
|  */ |
| public void put(Object object) throws InterruptedException, CacheException { |
|   final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
|   //Suranjan : Can this region ever be null? Should we work with regionName and not with region instance. |
|   // It can't be as put is happeing on the region and its still under process |
|   GatewaySenderEventImpl value = (GatewaySenderEventImpl)object; |
|   boolean isDREvent = isDREvent(value); |
| |
|   Region region = value.getRegion(); |
|   String regionPath = null; |
|   if (isDREvent) { |
|     regionPath = region.getFullPath(); |
|   } |
|   else { |
|     regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion)region) |
|         .getFullPath(); |
|   } |
|   if (isDebugEnabled) { |
|     logger.debug("Put is for the region {}", region); |
|   } |
| |
|   // Single lookup: a containsKey() followed by get() could observe the |
|   // mapping being removed in between and continue with a null region. |
|   final PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath); |
|   if (prQ == null) { |
|     if (isDebugEnabled) { |
|       logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToshadowPRMap); |
|     } |
|     logger.warn(LocalizedMessage.create(LocalizedStrings.NOT_QUEUING_AS_USERPR_IS_NOT_YET_CONFIGURED, value)); |
|     value.release(); |
|     return; |
|   } |
| |
|   int bucketId = value.getBucketId(); |
|   Object key = null; |
|   if (!isDREvent) { |
|     key = value.getShadowKey(); |
| |
|     if ((Long)key == -1) { |
|       // In case of parallel we don't expect |
|       // the key to be not set. If it is the case then the event must be coming |
|       // through listener, so return. |
|       if (isDebugEnabled) { |
|         logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key, value); |
|       } |
|       value.release(); |
|       return; |
|     } |
|   } else { |
|     key = value.getEventId(); |
|   } |
| |
|   if (isDebugEnabled) { |
|     logger.debug("ParallelGatewaySenderOrderedQueue putting key {} : Value : {}", key, value); |
|   } |
|   AbstractBucketRegionQueue brq = (AbstractBucketRegionQueue)prQ.getDataStore() |
|       .getLocalBucketById(bucketId); |
| |
|   try { |
|     if (brq == null) { |
|       // Set the threadInitLevel to BEFORE_INITIAL_IMAGE. |
|       int oldLevel = LocalRegion |
|           .setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); |
|       try { |
|         // Full path of the bucket: |
|         final String bucketFullPath = Region.SEPARATOR |
|             + PartitionedRegionHelper.PR_ROOT_REGION_NAME + Region.SEPARATOR |
|             + prQ.getBucketName(bucketId); |
| |
|         brq = (AbstractBucketRegionQueue)prQ.getCache().getRegionByPath( |
|             bucketFullPath); |
|         if (isDebugEnabled) { |
|           logger.debug("ParallelGatewaySenderOrderedQueue : The bucket in the cache is bucketRegionName : {} bucket : {}", |
|               bucketFullPath, brq); |
|         } |
|         if (brq != null) { |
|           putHoldingInitializationLock(brq, key, value); |
|         } else if (isDREvent) { |
|           // in case of DR with PGS, if shadow bucket is not found event after |
|           // above search then it means that bucket is not intended for this |
|           // node. So lets not add this event in temp queue event as we are |
|           // doing it for PRevent |
|           value.release(); |
|         } else if (shadowBucketMarkedDestroyed(prQ, bucketId)) { |
|           // We have to handle the case where brq is null because the |
|           // colocation chain is getting destroyed one by one starting from |
|           // child region i.e this bucket due to moveBucket operation |
|           // In that case we don't want to store this event. |
|           if (isDebugEnabled) { |
|             logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.", |
|                 key, value); |
|           } |
|           value.release(); |
|         } else { |
|           // This is to prevent data loss, in the scenario when bucket is |
|           // not available in the cache but we know that it will be created. |
|           final BlockingQueue<GatewaySenderEventImpl> tempQueue = tempQueueForBucket(bucketId); |
| |
|           synchronized (tempQueue) { |
|             // Look again under the temp queue's monitor: the bucket may have |
|             // been created in the meantime. |
|             brq = (AbstractBucketRegionQueue)prQ.getCache() |
|                 .getRegionByPath(bucketFullPath); |
|             if (brq != null) { |
|               putHoldingInitializationLock(brq, key, value); |
|             } else { |
|               tempQueue.add(value); |
|               // TODO OFFHEAP is value refCount ok here? |
|               // For debugging purpose. |
|               if (isDebugEnabled) { |
|                 logger.debug("The value {} is enqueued to the tempQueue for the BucketRegionQueue.", value); |
|               } |
|             } |
|           } |
|         } |
|       } finally { |
|         LocalRegion.setThreadInitLevelRequirement(oldLevel); |
|       } |
|     } else { |
|       boolean thisbucketDestroyed = false; |
| |
|       if (!isDREvent) { |
|         thisbucketDestroyed = shadowBucketMarkedDestroyed(prQ, bucketId) |
|             || brq.isDestroyed(); |
|       } else { |
|         thisbucketDestroyed = brq.isDestroyed(); |
|       } |
| |
|       if (!thisbucketDestroyed) { |
|         putIntoBucketRegionQueue(brq, key, value); |
|       } else { |
|         if (isDebugEnabled) { |
|           logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.", |
|               key, value); |
|         } |
|         value.release(); |
|       } |
|     } |
|   } finally { |
|     notifyEventProcessorIfRequired(); |
|   } |
| } |
| |
| /** Adds the event to the bucket queue while holding the bucket's initialization read lock. */ |
| private void putHoldingInitializationLock(AbstractBucketRegionQueue brq, |
|     Object key, GatewaySenderEventImpl value) { |
|   brq.getInitializationLock().readLock().lock(); |
|   try { |
|     putIntoBucketRegionQueue(brq, key, value); |
|   } finally { |
|     brq.getInitializationLock().readLock().unlock(); |
|   } |
| } |
| |
| /** Asks the bucket advisor of the region the shadow PR is colocated with whether the shadow bucket is destroyed. */ |
| private static boolean shadowBucketMarkedDestroyed(PartitionedRegion prQ, int bucketId) { |
|   return ((PartitionedRegion)prQ.getColocatedWithRegion()) |
|       .getRegionAdvisor().getBucketAdvisor(bucketId) |
|       .getShadowBucketDestroyed(); |
| } |
| |
| /** Returns the temporary queue of the bucket, creating and registering it under the map's monitor if absent. */ |
| private BlockingQueue<GatewaySenderEventImpl> tempQueueForBucket(int bucketId) { |
|   synchronized (this.bucketToTempQueueMap) { |
|     BlockingQueue<GatewaySenderEventImpl> tempQueue = this.bucketToTempQueueMap.get(bucketId); |
|     if (tempQueue == null) { |
|       tempQueue = new LinkedBlockingQueue<GatewaySenderEventImpl>(); |
|       this.bucketToTempQueueMap.put(bucketId, tempQueue); |
|     } |
|     return tempQueue; |
|   } |
| } |
| |
| public void notifyEventProcessorIfRequired() { |
| //putter thread should not take lock every time |
| if(isQueueEmpty) { |
| queueEmptyLock.lock(); |
| try { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Going to notify, isQueueEmpty {}", isQueueEmpty); |
| } |
| if (isQueueEmpty) { |
| isQueueEmpty = false; |
| queueEmptyCondition.signal(); |
| } |
| } finally { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Notified!, isQueueEmpty {}", isQueueEmpty); |
| } |
| queueEmptyLock.unlock(); |
| } |
| } |
| } |
| |
| private void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object key, |
| GatewaySenderEventImpl value) { |
| boolean addedValueToQueue = false; |
| try { |
| if (brq != null) { |
| addedValueToQueue = brq.addToQueue(key, value); |
| // TODO : Kishor : During merge, ParallelWANstats test failed. On |
| // comment below code test passed. cheetha does not have below code. |
| // need to find out from hcih revision this code came |
| // if (brq.getBucketAdvisor().isPrimary()) { |
| // this.stats.incQueueSize(); |
| // } |
| } |
| } catch (BucketNotFoundException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("For bucket {} the current bucket redundancy is {}", brq.getId(), |
| brq.getPartitionedRegion().getRegionAdvisor().getBucketAdvisor(brq.getId()).getBucketRedundancy()); |
| } |
| } catch (ForceReattemptException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("getInitializedBucketForId: Got ForceReattemptException for {} for bucket = {}", this, brq.getId()); |
| } |
| } finally { |
| if (!addedValueToQueue) { |
| value.release(); |
| } |
| } |
| } |
| |
| /** |
| * This returns queueRegion if there is only one PartitionedRegion using the GatewaySender |
| * Otherwise it returns null. |
| */ |
| public Region getRegion() { |
| return this.userRegionNameToshadowPRMap.size() == 1 ? (Region)this.userRegionNameToshadowPRMap |
| .values().toArray()[0] : null; |
| } |
| |
| public PartitionedRegion getRegion(String fullpath) { |
| return this.userRegionNameToshadowPRMap.get(fullpath); |
| } |
| |
| public PartitionedRegion removeShadowPR(String fullpath) { |
| try { |
| this.sender.lifeCycleLock.writeLock().lock(); |
| this.sender.setEnqueuedAllTempQueueEvents(false); |
| return this.userRegionNameToshadowPRMap.remove(fullpath); |
| } |
| finally { |
| sender.lifeCycleLock.writeLock().unlock(); |
| } |
| } |
| |
| public ExecutorService getConflationExecutor() { |
| return this.conflationExecutor; |
| } |
| |
| /** |
| * Returns the set of shadowPR backign this queue. |
| */ |
| public Set<PartitionedRegion> getRegions() { |
| return new HashSet(this.userRegionNameToshadowPRMap.values()); |
| } |
| |
| // TODO: Suranjan Find optimal way to get Random shadow pr as this will be called in each put and peek. |
| protected PartitionedRegion getRandomShadowPR() { |
| PartitionedRegion prQ = null; |
| if (this.userRegionNameToshadowPRMap.values().size() > 0) { |
| int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size()); |
| prQ = (PartitionedRegion)this.userRegionNameToshadowPRMap.values().toArray()[randomIndex]; |
| } |
| // if (this.userPRToshadowPRMap.values().size() > 0 |
| // && (prQ == null)) { |
| // prQ = getRandomShadowPR(); |
| // } |
| return prQ; |
| } |
| |
| private boolean isDREvent(GatewaySenderEventImpl event){ |
| return (event.getRegion() instanceof DistributedRegion) ? true : false; |
| } |
| /** |
| * Take will choose a random BucketRegionQueue which is primary and will take the head element |
| * from it. |
| */ |
| @Override |
| public Object take() throws CacheException, InterruptedException { |
| //merge42180. |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * TODO: Optimization needed. We are creating 1 array list for each peek!! |
| * @return BucketRegionQueue |
| */ |
| private final BucketRegionQueue getRandomBucketRegionQueue() { |
| PartitionedRegion prQ = getRandomShadowPR(); |
| if( prQ != null) { |
| final PartitionedRegionDataStore ds = prQ.getDataStore(); |
| final List<Integer> buckets = new ArrayList<Integer>( |
| ds.getAllLocalPrimaryBucketIds()); |
| if (buckets.isEmpty()) |
| return null; |
| final int index = new Random().nextInt(buckets.size()); |
| final int brqId = buckets.get(index); |
| final BucketRegionQueue brq = (BucketRegionQueue)ds |
| .getLocalBucketById(brqId); |
| if (brq.isReadyForPeek()) { |
| return brq; |
| } |
| } |
| return null; |
| } |
| |
| private boolean areLocalBucketQueueRegionsPresent() { |
| boolean bucketsAvailable = false; |
| for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { |
| if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0) |
| return true; |
| } |
| return false; |
| } |
| |
| private boolean areLocalBucketQueueRegionsPresent(PartitionedRegion prQ) { |
| return prQ.getDataStore().isLocalBucketRegionPresent(); |
| } |
| |
| private int pickBucketId; |
| |
| protected int getRandomPrimaryBucket(PartitionedRegion prQ) { |
| if (prQ != null) { |
| Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets(); |
| List<Integer> thisProcessorBuckets = new ArrayList<Integer>(); |
| |
| for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) { |
| BucketRegion bucket = bucketEntry.getValue(); |
| if (bucket.getBucketAdvisor().isPrimary()) { |
| int bId = bucket.getId(); |
| if (bId % this.nDispatcher == this.index) { |
| thisProcessorBuckets.add(bId); |
| } |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), thisProcessorBuckets.size()); |
| } |
| |
| int nTry = thisProcessorBuckets.size(); |
| |
| while(nTry-- > 0) { |
| if(pickBucketId >= thisProcessorBuckets.size()) |
| pickBucketId = 0; |
| BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore() |
| .getLocalBucketById(thisProcessorBuckets.get(pickBucketId++)); |
| if (br != null && br.isReadyForPeek()) { |
| return br.getId(); |
| } |
| } |
| |
| // TODO:REF: instead of shuffle use random number, in this method we are |
| // returning id instead we should return BRQ itself |
| /*Collections.shuffle(thisProcessorBuckets); |
| for (Integer bucketId : thisProcessorBuckets) { |
| BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore() |
| .getLocalBucketById(bucketId); |
| |
| if (br != null && br.isReadyForPeek()) { |
| return br.getId(); |
| } |
| }*/ |
| } |
| return -1; |
| } |
| |
| @Override |
| public List take(int batchSize) throws CacheException, InterruptedException { |
| //merge42180 |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void remove() throws CacheException { |
| if (!this.peekedEvents.isEmpty()) { |
| |
| GatewaySenderEventImpl event = this.peekedEvents.remove(); |
| try { |
| // PartitionedRegion prQ = this.userPRToshadowPRMap.get(ColocationHelper |
| // .getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath()); |
| // |
| PartitionedRegion prQ = null; |
| int bucketId = -1; |
| Object key = null; |
| if (event.getRegion() != null) { |
| if (isDREvent(event)) { |
| prQ = this.userRegionNameToshadowPRMap.get(event.getRegion() |
| .getFullPath()); |
| bucketId = event.getEventId().getBucketID(); |
| key = event.getEventId(); |
| } else { |
| prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper |
| .getLeaderRegion((PartitionedRegion)event.getRegion()) |
| .getFullPath()); |
| bucketId = event.getBucketId(); |
| key = event.getShadowKey(); |
| } |
| } else { |
| String regionPath = event.getRegionPath(); |
| GemFireCacheImpl cache = (GemFireCacheImpl)this.sender.getCache(); |
| Region region = (PartitionedRegion)cache.getRegion(regionPath); |
| if (region != null && !region.isDestroyed()) { |
| // TODO: Suranjan We have to get colocated parent region for this |
| // region |
| if (region instanceof DistributedRegion) { |
| prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath()); |
| event.getBucketId(); |
| key = event.getEventId(); |
| } else { |
| prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper |
| .getLeaderRegion((PartitionedRegion)region).getFullPath()); |
| event.getBucketId(); |
| key = event.getShadowKey(); |
| } |
| } |
| } |
| |
| if (prQ != null) { |
| destroyEventFromQueue(prQ, bucketId, key); |
| } |
| } finally { |
| event.release(); |
| } |
| } |
| } |
| |
| private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, |
| Object key) { |
| boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId) |
| .isPrimary(); |
| if (isPrimary) { |
| BucketRegionQueue brq = (BucketRegionQueue)prQ.getDataStore() |
| .getLocalBucketById(bucketId); |
| // TODO : Kishor : Make sure we dont need to initalize a bucket |
| // before destroying a key from it |
| try { |
| if (brq != null) { |
| brq.destroyKey(key); |
| } |
| stats.decQueueSize(); |
| } catch (EntryNotFoundException e) { |
| if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) { |
| logger.debug("ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", |
| key, this, bucketId, this.sender); |
| } |
| } catch (ForceReattemptException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket :{} moved to other member", bucketId); |
| } |
| } catch (PrimaryBucketException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Primary bucket :{} moved to other member", bucketId); |
| } |
| } |
| addRemovedEvent(prQ, bucketId, key); |
| } |
| } |
| |
| public void resetLastPeeked() { |
| this.resetLastPeeked = true; |
| } |
| |
| // Need to improve here.If first peek returns NULL then look in another bucket. |
| @Override |
| public Object peek() throws InterruptedException, CacheException { |
| Object object = null; |
| |
| int bucketId = -1; |
| PartitionedRegion prQ = getRandomShadowPR(); |
| if (prQ != null && prQ.getDataStore().getAllLocalBucketRegions() |
| .size() > 0 |
| && ((bucketId = getRandomPrimaryBucket(prQ)) != -1)) { |
| BucketRegionQueue brq; |
| try { |
| brq = ((BucketRegionQueue)prQ.getDataStore() |
| .getInitializedBucketForId(null, bucketId)); |
| object = brq.peek(); |
| } catch (BucketRegionQueueUnavailableException e) { |
| return object;//since this is not set, it would be null |
| } catch (ForceReattemptException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Remove: Got ForceReattemptException for {} for bucke = {}", this, bucketId); |
| } |
| } |
| } |
| return object; // OFFHEAP: ok since only callers uses it to check for empty queue |
| } |
| |
| // This method may need synchronization in case it is used by |
| // ConcurrentParallelGatewaySender |
| protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { |
| buckToDispatchLock.lock(); |
| boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); |
| try { |
| Map bucketIdToDispatchedKeys = (Map)regionToDispatchedKeysMap.get(prQ.getFullPath()); |
| if (bucketIdToDispatchedKeys == null) { |
| bucketIdToDispatchedKeys = new ConcurrentHashMap(); |
| regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys); |
| } |
| addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key); |
| if (wasEmpty) { |
| regionToDispatchedKeysMapEmpty.signal(); |
| } |
| } |
| finally { |
| buckToDispatchLock.unlock(); |
| } |
| } |
| |
| private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, |
| Object key) { |
| List dispatchedKeys = (List)bucketIdToDispatchedKeys.get(bucketId); |
| if (dispatchedKeys == null) { |
| dispatchedKeys = new ArrayList<Object>(); |
| bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys); |
| } |
| dispatchedKeys.add(key); |
| } |
| |
| protected void addRemovedEvents(PartitionedRegion prQ, int bucketId, |
| List<Object> shadowKeys) { |
| buckToDispatchLock.lock(); |
| boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); |
| try { |
| Map bucketIdToDispatchedKeys = (Map)regionToDispatchedKeysMap.get(prQ.getFullPath()); |
| if (bucketIdToDispatchedKeys == null) { |
| bucketIdToDispatchedKeys = new ConcurrentHashMap(); |
| regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys); |
| } |
| addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys); |
| if (wasEmpty) { |
| regionToDispatchedKeysMapEmpty.signal(); |
| } |
| } |
| finally { |
| buckToDispatchLock.unlock(); |
| } |
| } |
| |
| protected void addRemovedEvents(String prQPath, int bucketId, |
| List<Object> shadowKeys) { |
| buckToDispatchLock.lock(); |
| boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); |
| try { |
| Map bucketIdToDispatchedKeys = (Map)regionToDispatchedKeysMap.get(prQPath); |
| if (bucketIdToDispatchedKeys == null) { |
| bucketIdToDispatchedKeys = new ConcurrentHashMap(); |
| regionToDispatchedKeysMap.put(prQPath, bucketIdToDispatchedKeys); |
| } |
| addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys); |
| if (wasEmpty) { |
| regionToDispatchedKeysMapEmpty.signal(); |
| } |
| } |
| finally { |
| buckToDispatchLock.unlock(); |
| } |
| } |
| |
| private void addRemovedEventsToMap(Map bucketIdToDispatchedKeys, |
| int bucketId, List keys) { |
| List dispatchedKeys = (List)bucketIdToDispatchedKeys.get(bucketId); |
| if (dispatchedKeys == null) { |
| dispatchedKeys = keys == null ? new ArrayList<Object>() : keys; |
| } else { |
| dispatchedKeys.addAll(keys); |
| } |
| bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys); |
| } |
| |
| public List peek(int batchSize) throws InterruptedException, CacheException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public List peek(int batchSize, int timeToWait) throws InterruptedException, |
| CacheException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| PartitionedRegion prQ = getRandomShadowPR(); |
| List batch = new ArrayList(); |
| if (prQ == null || prQ.getLocalMaxMemory() == 0) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| blockProcesorThreadIfRequired(); |
| return batch; |
| } |
| |
| long start = System.currentTimeMillis(); |
| long end = start + timeToWait; |
| |
| if (this.resetLastPeeked) { |
| batch.addAll(peekedEvents); |
| this.resetLastPeeked = false; |
| if (isDebugEnabled) { |
| StringBuffer buffer = new StringBuffer(); |
| for (GatewaySenderEventImpl ge : peekedEvents) { |
| buffer.append("event :"); |
| buffer.append(ge); |
| } |
| logger.debug("Adding already peeked events to the batch {}", buffer); |
| } |
| } |
| |
| int bId = -1; |
| while (batch.size() < batchSize) { |
| if (areLocalBucketQueueRegionsPresent() |
| && ((bId = getRandomPrimaryBucket(prQ)) != -1)) { |
| GatewaySenderEventImpl object = (GatewaySenderEventImpl) peekAhead(prQ, bId); |
| if (object != null) { |
| GatewaySenderEventImpl copy = object.makeHeapCopyIfOffHeap(); |
| if (copy == null) { |
| continue; |
| } |
| object = copy; |
| } |
| // Conflate here |
| if (object != null) { |
| if (isDebugEnabled) { |
| logger.debug("The gatewayEventImpl in peek is {}", object); |
| } |
| batch.add(object); |
| peekedEvents.add(object); |
| BucketRegionQueue brq = ((BucketRegionQueue)prQ |
| .getDataStore().getLocalBucketById(bId)); |
| |
| //brq.doLockForPrimary(false); |
| |
| } else { |
| // If time to wait is -1 (don't wait) or time interval has elapsed |
| long currentTime = System.currentTimeMillis(); |
| if (isDebugEnabled) { |
| logger.debug("{}: Peeked object was null. Peek current time: {}", this, currentTime); |
| } |
| if (timeToWait == -1 || (end <= currentTime)) { |
| if (isDebugEnabled) { |
| logger.debug("{}: Peeked object was null.. Peek breaking", this); |
| } |
| break; |
| } |
| if (isDebugEnabled) { |
| logger.debug("{}: Peeked object was null. Peek continuing", this); |
| } |
| continue; |
| } |
| } else { |
| // If time to wait is -1 (don't wait) or time interval has elapsed |
| long currentTime = System.currentTimeMillis(); |
| if (isDebugEnabled) { |
| logger.debug("{}: Peek current time: {}", this, currentTime); |
| } |
| if (timeToWait == -1 || (end <= currentTime)) { |
| if (isDebugEnabled) { |
| logger.debug("{}: Peek breaking", this); |
| } |
| break; |
| } |
| if (isDebugEnabled) { |
| logger.debug("{}: Peek continuing", this); |
| } |
| // Sleep a bit before trying again. |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| break; |
| } |
| continue; |
| } |
| } |
| if (isDebugEnabled) { |
| logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}", |
| this, batch.size(), size(), localSize()); |
| } |
| if (batch.size() == 0) { |
| blockProcesorThreadIfRequired(); |
| } |
| return batch; |
| } |
| |
| protected void blockProcesorThreadIfRequired() throws InterruptedException { |
| queueEmptyLock.lock(); |
| try { |
| //while (isQueueEmpty) { |
| if(isQueueEmpty) { //merge44610: this if condition came from cheetah 44610 |
| if (logger.isDebugEnabled()) { |
| logger.debug("Going to wait, till notified."); |
| } |
| // merge44610: this time waiting came from cheetah 44610. In cedar 1000 |
| // is assumed as milliseconds. In cheetah TimeUnitParamter Millisecond |
| // is used. In cheetah stoppable has method to consider timeunit |
| // parameter but cedar does not have such corresponding method |
| queueEmptyCondition.await(1000); |
| //merge44610: this time waiting came from cheetah 44610 |
| //isQueueEmpty = this.localSize() == 0; |
| } |
| // update the flag so that next time when we come we will block. |
| isQueueEmpty = this.localSizeForProcessor() == 0; |
| } finally { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Going to unblock. isQueueEmpty {}", isQueueEmpty); |
| } |
| queueEmptyLock.unlock(); |
| } |
| } |
| |
| protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws CacheException { |
| Object object = null; |
| BucketRegionQueue brq = ((BucketRegionQueue)prQ |
| .getDataStore().getLocalBucketById(bucketId)); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Peekahead for the bucket {}",this, bucketId); |
| } |
| try { |
| object = brq.peek(); |
| } catch (BucketRegionQueueUnavailableException e) { |
| //BucketRegionQueue unavailable. Can be due to the BucketRegionQueue being destroyed. |
| return object;//this will be null |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Peeked object from bucket {} object: {}",this, bucketId, object); |
| } |
| |
| if (object == null) { |
| if (this.stats != null) { |
| this.stats.incEventsNotQueuedConflated(); |
| } |
| } |
| return object; // OFFHEAP: ok since callers are careful to do destroys on region queue after finished with peeked object. |
| } |
| |
| |
| public int localSize() { |
| int size = 0; |
| for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { |
| if(((PartitionedRegion)prQ.getRegion()).getDataStore() != null) { |
| size += ((PartitionedRegion)prQ.getRegion()).getDataStore() |
| .getSizeOfLocalPrimaryBuckets(); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), size); |
| } |
| } |
| return size /*+ sender.getTmpQueuedEventSize()*/; |
| } |
| |
| public int localSizeForProcessor() { |
| int size = 0; |
| for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { |
| if(((PartitionedRegion)prQ.getRegion()).getDataStore() != null) { |
| Set<BucketRegion> primaryBuckets = ((PartitionedRegion)prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions(); |
| |
| for (BucketRegion br : primaryBuckets) { |
| if(br.getId() % this.nDispatcher == this.index) |
| size += br.size(); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), size); |
| } |
| } |
| return size /*+ sender.getTmpQueuedEventSize()*/; |
| } |
| |
| @Override |
| public int size() { |
| int size = 0; |
| for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}", |
| prQ.getName(), prQ.size(), prQ.keys().size()); |
| } |
| size += prQ.size(); |
| } |
| |
| return size + sender.getTmpQueuedEventSize(); |
| } |
| |
| @Override |
| public void addCacheListener(CacheListener listener) { |
| for(PartitionedRegion prQ: this.userRegionNameToshadowPRMap.values()) { |
| AttributesMutator mutator = prQ.getAttributesMutator(); |
| mutator.addCacheListener(listener); |
| } |
| } |
| |
| @Override |
| public void removeCacheListener() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| |
| @Override |
| public void remove(int batchSize) throws CacheException { |
| for (int i = 0; i < batchSize; i++) { |
| remove(); |
| } |
| } |
| |
| public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) { |
| ConflationHandler conflationHandler = new ConflationHandler( |
| conflatableObject, bucketId, tailKey); |
| conflationExecutor.execute(conflationHandler); |
| } |
| |
| public long getNumEntriesOverflowOnDiskTestOnly() { |
| long numEntriesOnDisk = 0; |
| for(PartitionedRegion prQ: this.userRegionNameToshadowPRMap.values()) { |
| DiskRegionStats diskStats = prQ.getDiskRegionStats(); |
| if (diskStats == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: DiskRegionStats for shadow PR is null. Returning the numEntriesOverflowOnDisk as 0", this); |
| } |
| return 0; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesOverflowOnDisk obtained from DiskRegionStats", this); |
| } |
| numEntriesOnDisk += diskStats.getNumOverflowOnDisk(); |
| } |
| return numEntriesOnDisk; |
| } |
| |
| public long getNumEntriesInVMTestOnly() { |
| long numEntriesInVM = 0; |
| for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { |
| DiskRegionStats diskStats = prQ.getDiskRegionStats(); |
| if (diskStats == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: DiskRegionStats for shadow PR is null. Returning the numEntriesInVM as 0", this); |
| } |
| return 0; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesInVM obtained from DiskRegionStats", this); |
| } |
| numEntriesInVM += diskStats.getNumEntriesInVM(); |
| } |
| return numEntriesInVM; |
| } |
| |
| /** |
| * This method does the cleanup of any threads, sockets, connection that are held up |
| * by the queue. Note that this cleanup doesn't clean the data held by the queue. |
| */ |
| public void cleanUp() { |
| if(buckToDispatchLock != null){ |
| this.buckToDispatchLock = null; |
| } |
| if(regionToDispatchedKeysMapEmpty != null) { |
| this.regionToDispatchedKeysMapEmpty = null; |
| } |
| this.regionToDispatchedKeysMap.clear(); |
| synchronized (ParallelGatewaySenderQueue.class) { |
| if (this.removalThread != null) { |
| this.removalThread.shutdown(); |
| this.removalThread = null; |
| } |
| } |
| if (conflationExecutor != null) { |
| cleanupConflationThreadPool(); |
| this.conflationExecutor = null; |
| } |
| } |
| |
| @Override |
| public void close() { |
| // Because of bug 49060 do not close the regions of a parallel queue |
| // for (Region r: getRegions()) { |
| // if (r != null && !r.isDestroyed()) { |
| // try { |
| // r.close(); |
| // } catch (RegionDestroyedException e) { |
| // } |
| // } |
| // } |
| } |
| |
| /** |
| * @return the bucketToTempQueueMap |
| */ |
| public Map<Integer, BlockingQueue<GatewaySenderEventImpl>> getBucketToTempQueueMap() { |
| return this.bucketToTempQueueMap; |
| } |
| |
| public static boolean isParallelQueue(String regionName) { |
| return regionName.contains(QSTRING); |
| } |
| |
| public static String getQueueName(String senderId, String regionPath) { |
| return senderId + QSTRING + convertPathToName(regionPath); |
| } |
| |
| public static String getSenderId(String regionName) { |
| int queueStringStart = regionName.indexOf(QSTRING); |
| //The queue id is everything after the leading / and before the QSTRING |
| return regionName.substring(1, queueStringStart); |
| } |
| |
| //TODO:REF: Name for this class should be appropriate? |
| private static class BatchRemovalThread extends Thread { |
| /** |
| * boolean to make a shutdown request |
| */ |
| private volatile boolean shutdown = false; |
| |
| private final GemFireCacheImpl cache; |
| |
| private final ParallelGatewaySenderQueue parallelQueue; |
| |
| /** |
| * Constructor : Creates and initializes the thread |
| */ |
| public BatchRemovalThread(GemFireCacheImpl c, ParallelGatewaySenderQueue queue) { |
| super("BatchRemovalThread"); |
| //TODO:REF: Name for this thread ? |
| this.setDaemon(true); |
| this.cache = c; |
| this.parallelQueue = queue; |
| } |
| |
| private boolean checkCancelled() { |
| if (shutdown) { |
| return true; |
| } |
| if (cache.getCancelCriterion().cancelInProgress() != null) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| InternalDistributedSystem ids = cache.getDistributedSystem(); |
| DM dm = ids.getDistributionManager(); |
| for (;;) { |
| try { // be somewhat tolerant of failures |
| if (checkCancelled()) { |
| break; |
| } |
| |
| // TODO : make the thread running time configurable |
| boolean interrupted = Thread.interrupted(); |
| try { |
| synchronized (this) { |
| this.wait(messageSyncInterval); |
| } |
| } catch (InterruptedException e) { |
| interrupted = true; |
| if (checkCancelled()) { |
| break; |
| } |
| break; // desperation...we must be trying to shut |
| // down...? |
| } finally { |
| // Not particularly important since we're exiting |
| // the thread, |
| // but following the pattern is still good |
| // practice... |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| buckToDispatchLock.lock(); |
| try { |
| logger.debug("BatchRemovalThread about to query the batch removal map {}", regionToDispatchedKeysMap); |
| } |
| finally { |
| buckToDispatchLock.unlock(); |
| } |
| } |
| |
| final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>(); |
| buckToDispatchLock.lock(); |
| try { |
| boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); |
| while (regionToDispatchedKeysMap.isEmpty()) { |
| regionToDispatchedKeysMapEmpty.await(); |
| } |
| if (wasEmpty) continue; |
| // TODO: This should be optimized. |
| temp.putAll(regionToDispatchedKeysMap); |
| regionToDispatchedKeysMap.clear(); |
| } |
| finally { |
| buckToDispatchLock.unlock(); |
| } |
| // Get all the data-stores wherever userPRs are present |
| Set<InternalDistributedMember> recipients = getAllRecipients(cache, |
| temp); |
| cache.getDistributionManager().removeMembersWithSameOrNewerVersion( |
| recipients, Version.GFE_80); |
| if (!recipients.isEmpty()) { |
| for (Map.Entry<String, Map<Integer, List>> mapEntry : temp |
| .entrySet()) { |
| Set<InternalDistributedMember> tempOldVersionMembers = recipients; |
| PartitionedRegion prQ = (PartitionedRegion)cache |
| .getRegion(mapEntry.getKey()); |
| Set<InternalDistributedMember> memberForPRQ = prQ |
| .getRegionAdvisor().adviseDataStore(); |
| memberForPRQ.retainAll(tempOldVersionMembers); |
| ParallelQueueBatchRemovalResponse response = ParallelQueueBatchRemovalMessage |
| .send(memberForPRQ, prQ, mapEntry.getValue()); |
| try { |
| response.waitForResponse(); |
| } |
| catch (ForceReattemptException e) { |
| // put temp back again in the map |
| for (Integer bId : mapEntry.getValue().keySet()) { |
| this.parallelQueue.addRemovedEvents(prQ, bId, mapEntry.getValue().get(bId)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("ParallelQueueBatchRemovalMessage got ForceReattemptException. Will continue."); |
| } |
| } |
| } |
| } |
| recipients = getAllRecipients(cache, temp); |
| cache.getDistributionManager().retainMembersWithSameOrNewerVersion( |
| recipients, Version.GFE_80); |
| if (!recipients.isEmpty()) { |
| ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage( |
| temp); |
| pqrm.setRecipients(recipients); |
| dm.putOutgoing(pqrm); |
| } |
| |
| } // be somewhat tolerant of failures |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread is exiting due to cancellation"); |
| } |
| break; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| Error err; |
| if (t instanceof Error |
| && SystemFailure.isJVMFailureError(err = (Error)t)) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're |
| // poisoned now, so don't let this thread continue. |
| throw err; |
| } |
| // Whenever you catch Error or Throwable, you must also |
| // check for fatal JVM error (see above). However, there |
| // is _still_ a possibility that you are dealing with a |
| // cascading error condition, so you also need to check to see if |
| // the JVM is still usable: |
| SystemFailure.checkFailure(); |
| if (checkCancelled()) { |
| break; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread: ignoring exception", t); |
| } |
| } |
| } // for |
| } // ensure exit message is printed |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread exiting due to cancellation: " + e); |
| } |
| } finally { |
| logger.info(LocalizedMessage.create(LocalizedStrings.HARegionQueue_THE_QUEUEREMOVALTHREAD_IS_DONE)); |
| } |
| } |
| |
| private Set<InternalDistributedMember> getAllRecipients( |
| GemFireCacheImpl cache, Map map) { |
| Set recipients = new ObjectOpenHashSet(); |
| for (Object pr : map.keySet()) { |
| recipients.addAll(((PartitionedRegion)(cache.getRegion((String)pr))) |
| .getRegionAdvisor().adviseDataStore()); |
| } |
| return recipients; |
| } |
| |
| /** |
| * shutdown this thread and the caller thread will join this thread |
| */ |
| public void shutdown() { |
| this.shutdown = true; |
| this.interrupt(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.join(15 * 1000); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (this.isAlive()) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.HARegionQueue_QUEUEREMOVALTHREAD_IGNORED_CANCELLATION)); |
| } |
| } |
| } |
| |
| protected class ParallelGatewaySenderQueueMetaRegion extends |
| PartitionedRegion { |
| |
| AbstractGatewaySender sender = null; |
| public ParallelGatewaySenderQueueMetaRegion(String regionName, |
| RegionAttributes attrs, LocalRegion parentRegion, |
| GemFireCacheImpl cache, AbstractGatewaySender pgSender) { |
| this( regionName, attrs, parentRegion, cache, pgSender, false); |
| } |
| public ParallelGatewaySenderQueueMetaRegion(String regionName, |
| RegionAttributes attrs, LocalRegion parentRegion, |
| GemFireCacheImpl cache, AbstractGatewaySender pgSender, boolean isUsedForHDFS) { |
| super(regionName, attrs, parentRegion, cache, |
| new InternalRegionArguments().setDestroyLockFlag(true) |
| .setRecreateFlag(false).setSnapshotInputStream(null) |
| .setImageTarget(null) |
| .setIsUsedForParallelGatewaySenderQueue(true) |
| .setParallelGatewaySender((AbstractGatewaySender)pgSender) |
| .setIsUsedForHDFSParallelGatewaySenderQueue(isUsedForHDFS)); |
| this.sender = (AbstractGatewaySender)pgSender; |
| |
| } |
| |
| @Override |
| protected boolean isCopyOnRead() { |
| return false; |
| } |
| |
| // Prevent this region from participating in a TX, bug 38709 |
| @Override |
| final public boolean isSecret() { |
| return true; |
| } |
| |
| //Prevent this region from using concurrency checks |
| @Override |
| final public boolean supportsConcurrencyChecks() { |
| return false; |
| } |
| |
| @Override |
| final protected boolean shouldNotifyBridgeClients() { |
| return false; |
| } |
| |
| @Override |
| final public boolean generateEventID() { |
| return false; |
| } |
| |
| final public boolean isUsedForParallelGatewaySenderQueue() { |
| return true; |
| } |
| |
| final public AbstractGatewaySender getParallelGatewaySender(){ |
| return this.sender; |
| } |
| } |
| |
| public long estimateMemoryFootprint(SingleObjectSizer sizer) { |
| return sizer.sizeof(this) + sizer.sizeof(regionToDispatchedKeysMap) |
| + sizer.sizeof(userRegionNameToshadowPRMap) |
| + sizer.sizeof(bucketToTempQueueMap) + sizer.sizeof(peekedEvents) |
| + sizer.sizeof(conflationExecutor); |
| } |
| public void clear(PartitionedRegion pr, int bucketId) { |
| throw new RuntimeException("This method(clear)is not supported by ParallelGatewaySenderQueue"); |
| } |
| |
| public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException{ |
| throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue"); |
| } |
| } |