| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.wan.serial; |
| |
| import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE; |
| import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES; |
| |
| import java.util.ArrayList; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Predicate; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.cache.AttributesMutator; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.EvictionAction; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.TransactionId; |
| import org.apache.geode.cache.asyncqueue.AsyncEvent; |
| import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.cache.CachedDeserializable; |
| import org.apache.geode.internal.cache.Conflatable; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalRegionArguments; |
| import org.apache.geode.internal.cache.InternalRegionFactory; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.RegionQueue; |
| import org.apache.geode.internal.cache.Token; |
| import org.apache.geode.internal.cache.event.EventTracker; |
| import org.apache.geode.internal.cache.event.NonDistributedEventTracker; |
| import org.apache.geode.internal.cache.versions.RegionVersionVector; |
| import org.apache.geode.internal.cache.versions.VersionSource; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySender; |
| import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; |
| import org.apache.geode.internal.cache.wan.GatewaySenderStats; |
| import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.management.ManagementService; |
| import org.apache.geode.management.internal.beans.AsyncEventQueueMBean; |
| import org.apache.geode.management.internal.beans.GatewaySenderMBean; |
| import org.apache.geode.pdx.internal.PeerTypeRegistration; |
| import org.apache.geode.util.internal.GeodeGlossary; |
| |
| /** |
| * @since GemFire 7.0 |
| */ |
| public class SerialGatewaySenderQueue implements RegionQueue { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * The key into the <code>Region</code> used when taking entries from the queue. This value is |
| * either set when the queue is instantiated or read from the <code>Region</code> in the case |
| * where this queue takes over where a previous one left off. |
| */ |
| private long headKey = -1; |
| |
| /** |
| * The key into the <code>Region</code> used when putting entries onto the queue. This value is |
| * either set when the queue is instantiated or read from the <code>Region</code> in the case |
| * where this queue takes over where a previous one left off. |
| */ |
| private final AtomicLong tailKey = new AtomicLong(); |
| |
| /** |
| * Last key peeked from the queue excluding the keys peeked |
| * to complete transactions when group-transaction-events is enabled. |
| */ |
| private final AtomicLong lastPeekedId = new AtomicLong(-1); |
| |
| private final Deque<Long> peekedIds = new LinkedBlockingDeque<Long>(); |
| |
| /** |
| * Contains the set of peekedIds that were peeked to complete a transaction |
| * inside a batch when groupTransactionEvents is set. |
| */ |
| private final Set<Long> extraPeekedIds = ConcurrentHashMap.newKeySet(); |
| |
| /** |
| * Contains the set of peekedIds that were peeked to complete a transaction |
| * inside a batch when groupTransactionEvents is set and that have |
| * been sent in a batch but have not yet been removed. |
| */ |
| private final Set<Long> extraPeekedIdsSentNotRemoved = ConcurrentHashMap.newKeySet(); |
| |
| /** |
| * The name of the <code>Region</code> backing this queue |
| */ |
| private final String regionName; |
| |
| /** |
| * The <code>Region</code> backing this queue |
| */ |
| private Region<Long, AsyncEvent> region; |
| |
| /** |
| * The name of the <code>DiskStore</code> to overflow this queue |
| */ |
| private String diskStoreName; |
| |
| /** |
| * The maximum number of entries in a batch. |
| */ |
| private int batchSize; |
| |
| /** |
| * The maximum amount of memory (MB) to allow in the queue before overflowing entries to disk |
| */ |
| private int maximumQueueMemory; |
| |
| /** |
| * Whether conflation is enabled for this queue. |
| */ |
| private boolean enableConflation; |
| |
| /** |
| * Whether persistence is enabled for this queue. |
| */ |
| private boolean enablePersistence; |
| |
| private final boolean cleanQueues; |
| |
| /** |
| * Whether write to disk is synchronous. |
| */ |
| private boolean isDiskSynchronous; |
| |
| /** |
| * The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast |
| * updating of entries in the queue for conflation. |
| */ |
| private final Map<String, Map<Object, Long>> indexes; |
| |
| private final GatewaySenderStats stats; |
| |
| /** |
| * The maximum allowed key before the keys are rolled over |
| */ |
| private static final long MAXIMUM_KEY = Long.MAX_VALUE; |
| |
| /** |
| * Whether the <code>Gateway</code> queue should be no-ack instead of ack. |
| */ |
| private static final boolean NO_ACK = |
| Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "gateway-queue-no-ack"); |
| |
| private volatile long lastDispatchedKey = -1; |
| |
| private volatile long lastDestroyedKey = -1; |
| |
| public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 1; |
| |
| @Immutable |
| private static final int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL; |
| |
| private BatchRemovalThread removalThread = null; |
| |
| private AbstractGatewaySender sender = null; |
| |
| private MetaRegionFactory metaRegionFactory; |
| |
| public SerialGatewaySenderQueue(AbstractGatewaySender abstractSender, String regionName, |
| CacheListener listener, boolean cleanQueues) { |
| this(abstractSender, regionName, listener, cleanQueues, new MetaRegionFactory()); |
| } |
| |
| public SerialGatewaySenderQueue(AbstractGatewaySender abstractSender, String regionName, |
| CacheListener listener, boolean cleanQueues, MetaRegionFactory metaRegionFactory) { |
| // The queue starts out with headKey and tailKey equal to -1 to force |
| // them to be initialized from the region. |
| this.regionName = regionName; |
| this.cleanQueues = cleanQueues; |
| this.metaRegionFactory = metaRegionFactory; |
| this.headKey = -1; |
| this.tailKey.set(-1); |
| this.indexes = new HashMap<String, Map<Object, Long>>(); |
| this.enableConflation = abstractSender.isBatchConflationEnabled(); |
| this.diskStoreName = abstractSender.getDiskStoreName(); |
| this.batchSize = abstractSender.getBatchSize(); |
| this.enablePersistence = abstractSender.isPersistenceEnabled(); |
| if (this.enablePersistence) { |
| this.isDiskSynchronous = abstractSender.isDiskSynchronous(); |
| } else { |
| this.isDiskSynchronous = false; |
| } |
| this.maximumQueueMemory = abstractSender.getMaximumMemeoryPerDispatcherQueue(); |
| this.stats = abstractSender.getStatistics(); |
| initializeRegion(abstractSender, listener); |
| // Increment queue size. Fix for bug 51988. |
| this.stats.incQueueSize(this.region.size()); |
| this.removalThread = new BatchRemovalThread(abstractSender.getCache()); |
| this.removalThread.start(); |
| this.sender = abstractSender; |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Contains {} elements", this, size()); |
| } |
| } |
| |
| @Override |
| public Region<Long, AsyncEvent> getRegion() { |
| return this.region; |
| } |
| |
| public void destroy() { |
| getRegion().localDestroyRegion(); |
| } |
| |
| @Override |
| public synchronized boolean put(Object event) throws CacheException { |
| GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event; |
| final Region r = eventImpl.getRegion(); |
| final boolean isPDXRegion = |
| (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME)); |
| final boolean isWbcl = |
| this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX); |
| if (!(isPDXRegion && isWbcl)) { |
| putAndGetKey(event); |
| return true; |
| } |
| return false; |
| } |
| |
| private long putAndGetKey(Object object) throws CacheException { |
| // Get the tail key |
| Long key = Long.valueOf(getTailKey()); |
| // Put the object into the region at that key |
| this.region.put(key, (AsyncEvent) object); |
| |
| // Increment the tail key |
| // It is important that we increment the tail |
| // key after putting in the region, this is the |
| // signal that a new object is available. |
| incrementTailKey(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Inserted {} -> {}", this, key, object); |
| } |
| if (object instanceof Conflatable) { |
| removeOldEntry((Conflatable) object, key); |
| } |
| return key.longValue(); |
| } |
| |
| |
| @Override |
| public AsyncEvent take() throws CacheException { |
| // Unsupported since we have no callers. |
| // If we do want to support it then each caller needs |
| // to call freeOffHeapResources and the returned GatewaySenderEventImpl |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public List<AsyncEvent> take(int batchSize) throws CacheException { |
| // This method has no callers. |
| // If we do want to support it then the callers |
| // need to call freeOffHeapResources on each returned GatewaySenderEventImpl |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * This method removes the last entry. However, it will only let the user remove entries that they |
| * have peeked. If the entry was not peeked, this method will silently return. |
| */ |
| @Override |
| public synchronized void remove() throws CacheException { |
| if (peekedIds.isEmpty()) { |
| return; |
| } |
| Long key = peekedIds.remove(); |
| boolean isExtraPeeked = extraPeekedIds.remove(key); |
| try { |
| // Increment the head key |
| if (!isExtraPeeked) { |
| updateHeadKey(key.longValue()); |
| } |
| removeIndex(key); |
| // Remove the entry at that key with a callback arg signifying it is |
| // a WAN queue so that AbstractRegionEntry.destroy can get the value |
| // even if it has been evicted to disk. In the normal case, the |
| // AbstractRegionEntry.destroy only gets the value in the VM. |
| this.region.localDestroy(key, WAN_QUEUE_TOKEN); |
| this.stats.decQueueSize(); |
| |
| } catch (EntryNotFoundException ok) { |
| // this is acceptable because the conflation can remove entries |
| // out from underneath us. |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", |
| this, key); |
| } |
| } |
| |
| boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; |
| if (!isExtraPeeked) { |
| this.lastDispatchedKey = key; |
| // Remove also the extraPeekedIds right after this one so that |
| // they do not stay in the secondary's queue forever |
| long tmpKey = key; |
| while (extraPeekedIdsSentNotRemoved.contains(tmpKey = inc(tmpKey))) { |
| extraPeekedIdsSentNotRemoved.remove(tmpKey); |
| this.lastDispatchedKey = tmpKey; |
| updateHeadKey(tmpKey); |
| } |
| } else { |
| extraPeekedIdsSentNotRemoved.add(key); |
| // Remove if previous key was already dispatched so that it does |
| // not stay in the secondary's queue forever |
| long tmpKey = dec(key); |
| if (this.lastDispatchedKey == tmpKey) { |
| this.lastDispatchedKey = key; |
| updateHeadKey(key); |
| } |
| } |
| if (wasEmpty) { |
| synchronized (this) { |
| notifyAll(); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}", |
| this, key, this.lastDispatchedKey, this.lastDestroyedKey); |
| } |
| } |
| |
| /** |
| * This method removes batchSize entries from the queue. It will only remove entries that were |
| * previously peeked. |
| * |
| * @param size the number of entries to remove |
| */ |
| @Override |
| public void remove(int size) throws CacheException { |
| for (int i = 0; i < size; i++) { |
| remove(); |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Removed a batch of {} entries", this, size); |
| } |
| } |
| |
| @Override |
| public Object peek() throws CacheException { |
| KeyAndEventPair object = peekAhead(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Peeked {} -> {}", this, peekedIds, object); |
| } |
| |
| return object.event; |
| // OFFHEAP returned object only used to see if queue is empty |
| // so no need to worry about off-heap refCount. |
| } |
| |
| @Override |
| public List<AsyncEvent<?, ?>> peek(int size) throws CacheException { |
| return peek(size, -1); |
| } |
| |
| @Override |
| public List<AsyncEvent<?, ?>> peek(int size, int timeToWait) throws CacheException { |
| final boolean isTraceEnabled = logger.isTraceEnabled(); |
| |
| long start = System.currentTimeMillis(); |
| long end = start + timeToWait; |
| if (isTraceEnabled) { |
| logger.trace("{}: Peek start time={} end time={} time to wait={}", this, start, end, |
| timeToWait); |
| } |
| |
| List<AsyncEvent<?, ?>> batch = |
| new ArrayList<>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size); |
| long lastKey = -1; |
| while (size == BATCH_BASED_ON_TIME_ONLY || batch.size() < size) { |
| KeyAndEventPair pair = peekAhead(); |
| // Conflate here |
| if (pair != null) { |
| AsyncEvent<?, ?> object = pair.event; |
| lastKey = pair.key; |
| batch.add(object); |
| } else { |
| // If time to wait is -1 (don't wait) or time interval has elapsed |
| long currentTime = System.currentTimeMillis(); |
| if (isTraceEnabled) { |
| logger.trace("{}: Peek current time: {}", this, currentTime); |
| } |
| if (timeToWait == -1 || (end <= currentTime)) { |
| if (isTraceEnabled) { |
| logger.trace("{}: Peek breaking", this); |
| } |
| break; |
| } |
| |
| if (isTraceEnabled) { |
| logger.trace("{}: Peek continuing", this); |
| } |
| // Sleep a bit before trying again. |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| break; |
| } |
| continue; |
| } |
| } |
| if (batch.size() > 0) { |
| peekEventsFromIncompleteTransactions(batch, lastKey); |
| } |
| |
| if (isTraceEnabled) { |
| logger.trace("{}: Peeked a batch of {} entries", this, batch.size()); |
| } |
| return batch; |
| // OFFHEAP: all returned AsyncEvent end up being removed from queue after the batch is sent |
| // so no need to worry about off-heap refCount. |
| } |
| |
| private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) { |
| if (!mustGroupTransactionEvents()) { |
| return; |
| } |
| |
| Set<TransactionId> incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch); |
| if (incompleteTransactionIdsInBatch.size() == 0) { |
| return; |
| } |
| |
| for (TransactionId transactionId : incompleteTransactionIdsInBatch) { |
| boolean areAllEventsForTransactionInBatch = false; |
| int retries = 0; |
| long lastKeyForTransaction = lastKey; |
| while (!areAllEventsForTransactionInBatch |
| && retries++ < GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES) { |
| EventsAndLastKey eventsAndKey = |
| peekEventsWithTransactionId(transactionId, lastKeyForTransaction); |
| |
| for (Object object : eventsAndKey.events) { |
| GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; |
| batch.add(event); |
| areAllEventsForTransactionInBatch = event.isLastEventInTransaction(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Peeking extra event: {}, isLastEventInTransaction: {}, batch size: {}", |
| event.getKey(), event.isLastEventInTransaction(), batch.size()); |
| } |
| } |
| lastKeyForTransaction = eventsAndKey.lastKey; |
| } |
| if (!areAllEventsForTransactionInBatch) { |
| logger.warn("Not able to retrieve all events for transaction {} after {} tries", |
| transactionId, retries); |
| } |
| } |
| } |
| |
| protected boolean mustGroupTransactionEvents() { |
| return sender.mustGroupTransactionEvents(); |
| } |
| |
| private Set<TransactionId> getIncompleteTransactionsInBatch(List<AsyncEvent<?, ?>> batch) { |
| Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>(); |
| for (Object object : batch) { |
| if (object instanceof GatewaySenderEventImpl) { |
| GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; |
| if (event.getTransactionId() != null) { |
| if (event.isLastEventInTransaction()) { |
| incompleteTransactionsInBatch.remove(event.getTransactionId()); |
| } else { |
| incompleteTransactionsInBatch.add(event.getTransactionId()); |
| } |
| } |
| } |
| } |
| return incompleteTransactionsInBatch; |
| } |
| |
| @Override |
| public String toString() { |
| return "SerialGatewaySender queue :" + this.regionName; |
| } |
| |
| @Override |
| public int size() { |
| int size = ((LocalRegion) this.region).entryCount(); |
| return size + this.sender.getTmpQueuedEventSize(); |
| } |
| |
| @Override |
| @SuppressWarnings("rawtypes") |
| public void addCacheListener(CacheListener listener) { |
| AttributesMutator mutator = this.region.getAttributesMutator(); |
| mutator.addCacheListener(listener); |
| } |
| |
| @Override |
| @SuppressWarnings("rawtypes") |
| public void removeCacheListener() { |
| AttributesMutator mutator = this.region.getAttributesMutator(); |
| CacheListener[] listeners = this.region.getAttributes().getCacheListeners(); |
| for (CacheListener listener : listeners) { |
| if (listener instanceof SerialSecondaryGatewayListener) { |
| mutator.removeCacheListener(listener); |
| break; |
| } |
| } |
| } |
| |
| private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| boolean keepOldEntry = true; |
| |
| // Determine whether conflation is enabled for this queue and object |
| // Conflation is enabled iff: |
| // - this queue has conflation enabled |
| // - the object can be conflated |
| if (this.enableConflation && object.shouldBeConflated()) { |
| if (isDebugEnabled) { |
| logger.debug("{}: Conflating {} at queue index={} queue size={} head={} tail={}", this, |
| object, tailKey, size(), this.headKey, tailKey); |
| } |
| |
| // Determine whether this region / key combination is already indexed. |
| // If so, it is already in the queue. Update the value in the queue and |
| // set the shouldAddToQueue flag accordingly. |
| String rName = object.getRegionToConflate(); |
| Object key = object.getKeyToConflate(); |
| Long previousIndex; |
| |
| synchronized (this) { |
| Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName); |
| if (latestIndexesForRegion == null) { |
| latestIndexesForRegion = new HashMap<>(); |
| this.indexes.put(rName, latestIndexesForRegion); |
| } |
| |
| previousIndex = latestIndexesForRegion.put(key, tailKey); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("{}: Adding index key={}->index={} for {} head={} tail={}", this, key, tailKey, |
| object, this.headKey, tailKey); |
| } |
| // Test if the key is contained in the latest indexes map. If the key is |
| // not contained in the latest indexes map, then it should be added to |
| // the queue. |
| // |
| // It no longer matters if we remove an entry that is going out in the |
| // current |
| // batch, because we already put the latest value on the tail of the |
| // queue, and |
| // peekedIds list prevents us from removing an entry that was not peeked. |
| if (previousIndex != null) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "{}: Indexes contains index={} for key={} head={} tail={} and it can be used.", this, |
| previousIndex, key, this.headKey, tailKey); |
| } |
| keepOldEntry = false; |
| } else { |
| if (isDebugEnabled) { |
| logger.debug("{}: No old entry for key={} head={} tail={} not removing old entry.", this, |
| key, this.headKey, tailKey); |
| } |
| this.stats.incConflationIndexesMapSize(); |
| keepOldEntry = true; |
| } |
| |
| // Replace the object's value into the queue if necessary |
| if (!keepOldEntry) { |
| Conflatable previous = (Conflatable) this.region.remove(previousIndex); |
| this.stats.decQueueSize(1); |
| if (isDebugEnabled) { |
| logger.debug("{}: Previous conflatable at key={} head={} tail={}: {}", this, |
| previousIndex, this.headKey, tailKey, previous); |
| logger.debug("{}: Current conflatable at key={} head={} tail={}: {}", this, tailKey, |
| this.headKey, tailKey, object); |
| if (previous != null) { |
| logger.debug( |
| "{}: Removed {} and added {} for key={} head={} tail={} in queue for region={} old event={}", |
| this, previous.getValueToConflate(), object.getValueToConflate(), key, this.headKey, |
| tailKey, rName, previous); |
| } |
| } |
| } |
| } else { |
| if (isDebugEnabled) { |
| logger.debug("{}: Not conflating {} queue size: {} head={} tail={}", this, object, size(), |
| this.headKey, tailKey); |
| } |
| } |
| return keepOldEntry; |
| } |
| |
| /** |
| * Does a get that gets the value without fault values in from disk. |
| */ |
| private AsyncEvent optimalGet(Long k) { |
| // Get the object at that key (to remove the index). |
| LocalRegion lr = (LocalRegion) this.region; |
| Object o = null; |
| try { |
| o = lr.getValueInVMOrDiskWithoutFaultIn(k); |
| if (o instanceof CachedDeserializable) { |
| o = ((CachedDeserializable) o).getDeserializedValue(lr, lr.getRegionEntry(k)); |
| } |
| } catch (EntryNotFoundException ok) { |
| // just return null; |
| } |
| // bug #46023 do not return a destroyed entry marker |
| if (o == Token.TOMBSTONE) { |
| o = null; |
| } |
| return (AsyncEvent) o; |
| } |
| |
| /* |
| * this must be invoked under synchronization |
| */ |
| private void removeIndex(Long qkey) { |
| // Determine whether conflation is enabled for this queue and object |
| if (this.enableConflation) { |
| // only call get after checking enableConflation for bug 40508 |
| Object o = optimalGet(qkey); |
| if (o instanceof Conflatable) { |
| Conflatable object = (Conflatable) o; |
| if (object.shouldBeConflated()) { |
| // Otherwise, remove the index from the indexes map. |
| String rName = object.getRegionToConflate(); |
| Object key = object.getKeyToConflate(); |
| |
| Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName); |
| if (latestIndexesForRegion != null) { |
| // Remove the index. |
| Long index = latestIndexesForRegion.remove(key); |
| // Decrement the size if something was removed. This check is for |
| // the case where failover has occurred and the entry was not put |
| // into the map initially. |
| if (index != null) { |
| this.stats.decConflationIndexesMapSize(); |
| } |
| if (logger.isDebugEnabled()) { |
| if (index != null) { |
| logger.debug("{}: Removed index {} for {}", this, index, object); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * returns true if key a is before key b. This test handles keys that have wrapped around |
| * |
| */ |
| private boolean before(long a, long b) { |
| // a is before b if a < b or a>b and a MAXIMUM_KEY/2 larger than b |
| // (indicating we have wrapped) |
| return a < b ^ a - b > (MAXIMUM_KEY / 2); |
| } |
| |
| private long inc(long value) { |
| long val = value + 1; |
| val = val == MAXIMUM_KEY ? 0 : val; |
| return val; |
| } |
| |
| private long dec(long value) { |
| long val = value - 1; |
| val = val == -1 ? MAXIMUM_KEY - 1 : val; |
| return val; |
| } |
| |
| |
| /** |
| * Clear the list of peeked keys. The next peek will start again at the head key. |
| */ |
| public void resetLastPeeked() { |
| peekedIds.clear(); |
| extraPeekedIds.clear(); |
| lastPeekedId.set(-1); |
| } |
| |
| /** |
| * Finds the next object after the last key peeked |
| * |
| */ |
| private Long getCurrentKey() { |
| long currentKey; |
| if (lastPeekedId.get() == -1) { |
| currentKey = getHeadKey(); |
| } else { |
| currentKey = inc(lastPeekedId.get()); |
| } |
| return currentKey; |
| } |
| |
| private AsyncEvent getObjectInSerialSenderQueue(Long currentKey) { |
| AsyncEvent object = optimalGet(currentKey); |
| if ((null != object) && logger.isDebugEnabled()) { |
| logger.debug("{}: Peeked {}->{}", this, currentKey, object); |
| } |
| if (object != null && object instanceof GatewaySenderEventImpl) { |
| GatewaySenderEventImpl copy = ((GatewaySenderEventImpl) object).makeHeapCopyIfOffHeap(); |
| if (copy == null) { |
| logger.debug( |
| "Unable to make heap copy and will not be added to peekedIds for object" + " : {} ", |
| object.toString()); |
| } |
| object = copy; |
| } |
| return object; |
| } |
| |
| @VisibleForTesting |
| static class KeyAndEventPair { |
| public final long key; |
| public final AsyncEvent event; |
| |
| KeyAndEventPair(Long key, AsyncEvent event) { |
| this.key = key; |
| this.event = event; |
| } |
| } |
| |
| @VisibleForTesting |
| public KeyAndEventPair peekAhead() throws CacheException { |
| AsyncEvent object = null; |
| Long currentKey = getCurrentKey(); |
| if (currentKey == null) { |
| return null; |
| } |
| |
| // It's important here that we check where the current key |
| // is in relation to the tail key before we check to see if the |
| // object exists. The reason is that the tail key is basically |
| // the synchronization between this thread and the putter thread. |
| // The tail key will not be incremented until the object is put in the |
| // region |
| // If we check for the object, and then check the tail key, we could |
| // skip objects. |
| |
| // TODO: don't do a get which updates the lru, instead just get the value |
| // w/o modifying the LRU. |
| // Note: getting the serialized form here (if it has overflowed to disk) |
| // does not save anything since GatewayBatchOp needs to GatewayEventImpl |
| // in object form. |
| while (before(currentKey, getTailKey())) { |
| if (!extraPeekedIds.contains(currentKey)) { |
| object = getObjectInSerialSenderQueue(currentKey); |
| if (object != null) { |
| break; |
| } |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Trying head key + offset: {}", this, currentKey); |
| } |
| currentKey = inc(currentKey); |
| // When mustGroupTransactionEvents is true, conflation cannot be enabled. |
| // Therefore, if we reach here, it would not be due to a conflated event |
| // but rather to an extra peeked event already sent. |
| if (!mustGroupTransactionEvents() && this.stats != null) { |
| this.stats.incEventsNotQueuedConflated(); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Peeked {}->{}", this, currentKey, object); |
| } |
| |
| if (object != null) { |
| peekedIds.add(currentKey); |
| lastPeekedId.set(currentKey); |
| return new KeyAndEventPair(currentKey, object); |
| } |
| return null; |
| } |
| |
| private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId, long lastKey) { |
| Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = |
| x -> transactionId.equals(x.getTransactionId()); |
| Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = |
| x -> x.isLastEventInTransaction(); |
| |
| return getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate, |
| lastKey); |
| } |
| |
| static class EventsAndLastKey { |
| public final List<Object> events; |
| public final long lastKey; |
| |
| EventsAndLastKey(List<Object> events, long lastKey) { |
| this.events = events; |
| this.lastKey = lastKey; |
| } |
| } |
| |
| EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopCondition, long lastKey) { |
| Object object; |
| List elementsMatching = new ArrayList<>(); |
| |
| long currentKey = lastKey; |
| |
| while ((currentKey = inc(currentKey)) != getTailKey()) { |
| if (extraPeekedIds.contains(currentKey)) { |
| continue; |
| } |
| object = optimalGet(currentKey); |
| if (object == null) { |
| continue; |
| } |
| |
| if (condition.test(object)) { |
| elementsMatching.add(object); |
| peekedIds.add(currentKey); |
| extraPeekedIds.add(currentKey); |
| lastKey = currentKey; |
| |
| if (stopCondition.test(object)) { |
| break; |
| } |
| } |
| } |
| |
| return new EventsAndLastKey(elementsMatching, lastKey); |
| } |
| |
| /** |
| * Returns the value of the tail key. The tail key points to an empty where the next queue entry |
| * will be stored. |
| * |
| * @return the value of the tail key |
| */ |
| private long getTailKey() throws CacheException { |
| long tlKey; |
| // Test whether tailKey = -1. If so, the queue has just been created. |
| // Go into the region to get the value of TAIL_KEY. If it is null, then |
| // this is the first attempt to access this queue. Set the tailKey and |
| // tailKey appropriately (to 0). If there is a value in the region, then |
| // this queue has been accessed before and this instance is taking up where |
| // a previous one left off. Set the tailKey to the value in the region. |
| // From now on, this queue will use the value of tailKey in the VM to |
| // determine the tailKey. If the tailKey != -1, set the tailKey |
| // to the value of the tailKey. |
| initializeKeys(); |
| |
| tlKey = this.tailKey.get(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Determined tail key: {}", this, tlKey); |
| } |
| return tlKey; |
| } |
| |
| /** |
| * Increments the value of the tail key by one. |
| * |
| */ |
| private void incrementTailKey() throws CacheException { |
| this.tailKey.set(inc(this.tailKey.get())); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Incremented TAIL_KEY for region {} to {}", this, this.region.getName(), |
| this.tailKey); |
| } |
| } |
| |
| /** |
| * If the keys are not yet initialized, initialize them from the region . |
| * |
| * TODO - We could initialize the indexes maps at the time here. However, that would require |
| * iterating over the values of the region rather than the keys, which could be much more |
| * expensive if the region has overflowed to disk. |
| * |
| * We do iterate over the values of the region in SerialGatewaySender at the time of failover. see |
| * SerialGatewaySender.handleFailover. So there's a possibility we can consolidate that code with |
| * this method and iterate over the region once. |
| * |
| */ |
| private void initializeKeys() throws CacheException { |
| if (tailKey.get() != -1) { |
| return; |
| } |
| synchronized (this) { |
| long largestKey = -1; |
| long largestKeyLessThanHalfMax = -1; |
| long smallestKey = -1; |
| long smallestKeyGreaterThanHalfMax = -1; |
| |
| Set<Long> keySet = this.region.keySet(); |
| for (Long key : keySet) { |
| long k = key.longValue(); |
| if (k > largestKey) { |
| largestKey = k; |
| } |
| if (k > largestKeyLessThanHalfMax && k < MAXIMUM_KEY / 2) { |
| largestKeyLessThanHalfMax = k; |
| } |
| |
| if (k < smallestKey || smallestKey == -1) { |
| smallestKey = k; |
| } |
| if ((k < smallestKeyGreaterThanHalfMax || smallestKeyGreaterThanHalfMax == -1) |
| && k > MAXIMUM_KEY / 2) { |
| smallestKeyGreaterThanHalfMax = k; |
| } |
| } |
| |
| // Test to see if the current set of keys has keys that are |
| // both before and after we wrapped around the MAXIMUM_KEY |
| // If we do have keys that wrapped, the |
| // head key should be something close to MAXIMUM_KEY |
| // and the tail key should be something close to 0. |
| // Here, I'm guessing that the head key should be greater than |
| // MAXIMUM_KEY/2 |
| // and the head key - tail key > MAXIMUM/2. |
| if (smallestKeyGreaterThanHalfMax != -1 && largestKeyLessThanHalfMax != -1 |
| && (smallestKeyGreaterThanHalfMax - largestKeyLessThanHalfMax) > MAXIMUM_KEY / 2) { |
| this.headKey = smallestKeyGreaterThanHalfMax; |
| this.tailKey.set(inc(largestKeyLessThanHalfMax)); |
| logger.info("{}: During failover, detected that keys have wrapped tailKey={} headKey={}", |
| new Object[] {this, this.tailKey, Long.valueOf(this.headKey)}); |
| } else { |
| this.headKey = smallestKey == -1 ? 0 : smallestKey; |
| this.tailKey.set(inc(largestKey)); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey, |
| this.headKey); |
| } |
| } |
| } |
| |
| /** |
| * Returns the value of the head key. The head key points to the next entry to be removed from the |
| * queue. |
| * |
| * @return the value of the head key |
| */ |
| private long getHeadKey() throws CacheException { |
| long hKey; |
| // Test whether _headKey = -1. If so, the queue has just been created. |
| // Go into the region to get the value of HEAD_KEY. If it is null, then |
| // this is the first attempt to access this queue. Set the _headKey and |
| // headKey appropriately (to 0). If there is a value in the region, then |
| // this queue has been accessed before and this instance is taking up where |
| // a previous one left off. Set the _headKey to the value in the region. |
| // From now on, this queue will use the value of _headKey in the VM to |
| // determine the headKey. If the _headKey != -1, set the headKey |
| // to the value of the _headKey. |
| initializeKeys(); |
| hKey = this.headKey; |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Determined head key: {}", this, hKey); |
| } |
| return hKey; |
| } |
| |
| /** |
| * Increments the value of the head key by one. |
| * |
| */ |
| private void updateHeadKey(long destroyedKey) throws CacheException { |
| this.headKey = inc(destroyedKey); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Incremented HEAD_KEY for region {} to {}", this, this.region.getName(), |
| this.headKey); |
| } |
| } |
| |
| /** |
| * Initializes the <code>Region</code> backing this queue. The <code>Region</code>'s scope is |
| * DISTRIBUTED_NO_ACK and mirror type is KEYS_VALUES and is set to overflow to disk based on the |
| * <code>GatewayQueueAttributes</code>. |
| * |
| * @param sender The GatewaySender <code>SerialGatewaySenderImpl</code> |
| * @param listener The GemFire <code>CacheListener</code>. The <code>CacheListener</code> can be |
| * null. |
| */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private void initializeRegion(AbstractGatewaySender sender, CacheListener listener) { |
| final InternalCache gemCache = sender.getCache(); |
| this.region = gemCache.getRegion(this.regionName); |
| if (this.region == null) { |
| RegionShortcut regionShortcut; |
| if (enablePersistence) { |
| regionShortcut = RegionShortcut.REPLICATE_PERSISTENT; |
| } else { |
| regionShortcut = RegionShortcut.REPLICATE; |
| } |
| InternalRegionFactory<Long, AsyncEvent> factory = |
| gemCache.createInternalRegionFactory(regionShortcut); |
| if (NO_ACK) { |
| factory.setScope(Scope.DISTRIBUTED_NO_ACK); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("The policy of region is {}", |
| (this.enablePersistence ? DataPolicy.PERSISTENT_REPLICATE : DataPolicy.REPLICATE)); |
| } |
| // Set listener if it is not null. The listener will be non-null |
| // when the user of this queue is a secondary VM. |
| if (listener != null) { |
| factory.addCacheListener(listener); |
| } |
| // allow for no overflow directory |
| EvictionAttributes ea = EvictionAttributes.createLIFOMemoryAttributes(this.maximumQueueMemory, |
| EvictionAction.OVERFLOW_TO_DISK); |
| |
| factory.setEvictionAttributes(ea); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| factory.setDiskStoreName(this.diskStoreName); |
| |
| // In case of persistence write to disk sync and in case of eviction write in async |
| factory.setDiskSynchronous(this.isDiskSynchronous); |
| |
| // Create the region |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Attempting to create queue region: {}", this, this.regionName); |
| } |
| final RegionAttributes<Long, AsyncEvent> ra = factory.getCreateAttributes(); |
| try { |
| SerialGatewaySenderQueueMetaRegion meta = |
| metaRegionFactory.newMetaRegion(gemCache, this.regionName, ra, sender); |
| factory |
| .setInternalMetaRegion(meta) |
| .setDestroyLockFlag(true) |
| .setSnapshotInputStream(null) |
| .setImageTarget(null) |
| .setIsUsedForSerialGatewaySenderQueue(true) |
| .setInternalRegion(true) |
| .setSerialGatewaySender(sender); |
| region = factory.create(regionName); |
| // Add overflow statistics to the mbean |
| addOverflowStatisticsToMBean(gemCache, sender); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Created queue region: {}", this, this.region); |
| } |
| } catch (CacheException e) { |
| logger.fatal(String.format("%s: The queue region named %s could not be created", |
| new Object[] {this, this.regionName}), |
| e); |
| } |
| } else { |
| if (listener != null) { |
| addCacheListener(listener); |
| } |
| } |
| if ((this.region != null) && this.cleanQueues) { |
| this.region.clear(); |
| } |
| } |
| |
| @VisibleForTesting |
| protected void addOverflowStatisticsToMBean(Cache cache, AbstractGatewaySender sender) { |
| // Get the appropriate mbean and add the overflow stats to it |
| LocalRegion lr = (LocalRegion) this.region; |
| ManagementService service = ManagementService.getManagementService(cache); |
| if (sender.getId().contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)) { |
| AsyncEventQueueMBean bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean( |
| AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(sender.getId())); |
| |
| if (bean != null) { |
| // Add the eviction stats |
| bean.getBridge().addOverflowStatistics(lr.getEvictionStatistics()); |
| |
| // Add the disk region stats |
| bean.getBridge().addOverflowStatistics(lr.getDiskRegion().getStats().getStats()); |
| } |
| } else { |
| GatewaySenderMBean bean = |
| (GatewaySenderMBean) service.getLocalGatewaySenderMXBean(sender.getId()); |
| |
| if (bean != null) { |
| // Add the eviction stats |
| bean.getBridge().addOverflowStatistics(lr.getEvictionStatistics()); |
| |
| // Add the disk region stats |
| bean.getBridge().addOverflowStatistics(lr.getDiskRegion().getStats().getStats()); |
| } |
| } |
| } |
| |
| public void cleanUp() { |
| if (this.removalThread != null) { |
| this.removalThread.shutdown(); |
| } |
| } |
| |
| public boolean isRemovalThreadAlive() { |
| if (this.removalThread != null) { |
| return this.removalThread.isAlive(); |
| } |
| return false; |
| } |
| |
| @Override |
| public void close() { |
| Region r = getRegion(); |
| if (r != null && !r.isDestroyed()) { |
| try { |
| r.close(); |
| } catch (RegionDestroyedException e) { |
| } |
| } |
| |
| } |
| |
| private class BatchRemovalThread extends Thread { |
| /** |
| * boolean to make a shutdown request |
| */ |
| private volatile boolean shutdown = false; |
| |
| private final InternalCache cache; |
| |
| /** |
| * Constructor : Creates and initializes the thread |
| * |
| */ |
| public BatchRemovalThread(InternalCache c) { |
| this.setDaemon(true); |
| this.cache = c; |
| } |
| |
| private boolean checkCancelled() { |
| if (shutdown) { |
| return true; |
| } |
| if (cache.getCancelCriterion().isCancelInProgress()) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void run() { |
| InternalDistributedSystem ids = cache.getInternalDistributedSystem(); |
| |
| try { // ensure exit message is printed |
| // Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000); |
| for (;;) { |
| try { // be somewhat tolerant of failures |
| if (checkCancelled()) { |
| break; |
| } |
| |
| // TODO : make the thread running time configurable |
| boolean interrupted = Thread.interrupted(); |
| try { |
| synchronized (this) { |
| this.wait(messageSyncInterval * 1000L); |
| } |
| } catch (InterruptedException e) { |
| interrupted = true; |
| if (checkCancelled()) { |
| break; |
| } |
| |
| break; // desperation...we must be trying to shut down...? |
| } finally { |
| // Not particularly important since we're exiting the thread, |
| // but following the pattern is still good practice... |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread about to send the last Dispatched key {}", |
| lastDispatchedKey); |
| } |
| |
| long temp; |
| synchronized (SerialGatewaySenderQueue.this) { |
| temp = lastDispatchedKey; |
| boolean wasEmpty = temp == lastDestroyedKey; |
| while (lastDispatchedKey == lastDestroyedKey) { |
| SerialGatewaySenderQueue.this.wait(); |
| temp = lastDispatchedKey; |
| } |
| if (wasEmpty) |
| continue; |
| } |
| // release not needed since disallowOffHeapValues called |
| EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY, |
| (lastDestroyedKey + 1), null/* newValue */, null, false, cache.getMyId()); |
| event.disallowOffHeapValues(); |
| event.setTailKey(temp); |
| |
| BatchDestroyOperation op = new BatchDestroyOperation(event); |
| op.distribute(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread completed destroy of keys from {} to {}", |
| lastDestroyedKey, temp); |
| } |
| lastDestroyedKey = temp; |
| |
| } // be somewhat tolerant of failures |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread is exiting due to cancellation"); |
| } |
| break; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| if (checkCancelled()) { |
| break; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread: ignoring exception", t); |
| } |
| } |
| } // for |
| } // ensure exit message is printed |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BatchRemovalThread exiting due to cancellation: " + e); |
| } |
| } finally { |
| logger.info("The QueueRemovalThread is done."); |
| } |
| } |
| |
| /** |
| * shutdown this thread and the caller thread will join this thread |
| */ |
| public void shutdown() { |
| this.shutdown = true; |
| this.interrupt(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.join(15 * 1000); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (this.isAlive()) { |
| logger.warn("QueueRemovalThread ignored cancellation"); |
| } |
| } |
| } |
| |
| public static class SerialGatewaySenderQueueMetaRegion extends DistributedRegion { |
| AbstractGatewaySender sender = null; |
| |
| protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, |
| LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender, |
| StatisticsClock statisticsClock) { |
| super(regionName, attrs, parentRegion, cache, |
| new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) |
| .setSnapshotInputStream(null).setImageTarget(null) |
| .setIsUsedForSerialGatewaySenderQueue(true).setSerialGatewaySender(sender), |
| statisticsClock); |
| this.sender = sender; |
| } |
| |
| // Prevent this region from using concurrency checks |
| @Override |
| protected boolean supportsConcurrencyChecks() { |
| return false; |
| } |
| |
| @Override |
| public boolean isCopyOnRead() { |
| return false; |
| } |
| |
| // Prevent this region from participating in a TX, bug 38709 |
| @Override |
| public boolean isSecret() { |
| return true; |
| } |
| |
| // @override event tracker not needed for this type of region |
| @Override |
| public EventTracker createEventTracker() { |
| return NonDistributedEventTracker.getInstance(); |
| } |
| |
| @Override |
| public boolean shouldNotifyBridgeClients() { |
| return false; |
| } |
| |
| @Override |
| public boolean generateEventID() { |
| return false; |
| } |
| |
| @Override |
| protected boolean isUsedForSerialGatewaySenderQueue() { |
| return true; |
| } |
| |
| @Override |
| public AbstractGatewaySender getSerialGatewaySender() { |
| return sender; |
| } |
| |
| @Override |
| public void closeEntries() { |
| OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { |
| @Override |
| public void run() { |
| SerialGatewaySenderQueueMetaRegion.super.closeEntries(); |
| } |
| }); |
| } |
| |
| @Override |
| public Set<VersionSource> clearEntries(final RegionVersionVector rvv) { |
| final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>(); |
| OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { |
| @Override |
| public void run() { |
| result.set(SerialGatewaySenderQueueMetaRegion.super.clearEntries(rvv)); |
| } |
| }); |
| return result.get(); |
| } |
| |
| @Override |
| public void basicDestroy(final EntryEventImpl event, final boolean cacheWrite, |
| Object expectedOldValue) |
| throws EntryNotFoundException, CacheWriterException, TimeoutException { |
| try { |
| super.basicDestroy(event, cacheWrite, expectedOldValue); |
| } finally { |
| GatewaySenderEventImpl.release(event.getRawOldValue()); |
| } |
| } |
| |
| @Override |
| public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, |
| Object expectedOldValue, boolean requireOldValue, long lastModified, |
| boolean overwriteDestroyed, boolean invokeCallbacks, boolean throwConcurrentModificaiton) |
| throws TimeoutException, CacheWriterException { |
| try { |
| boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue, |
| lastModified, overwriteDestroyed, invokeCallbacks, throwConcurrentModificaiton); |
| if (!success) { |
| // release offheap reference if GatewaySenderEventImpl is not put into |
| // the region queue |
| GatewaySenderEventImpl.release(event.getRawNewValue()); |
| } |
| return success; |
| } finally { |
| // GatewaySenderQueue probably only adding new events into the queue. |
| // Add the finally block just in case if there actually is an update |
| // in the sender queue or occurs in the the future. |
| GatewaySenderEventImpl.release(event.getRawOldValue()); |
| } |
| } |
| } |
| |
| static class MetaRegionFactory { |
| SerialGatewaySenderQueueMetaRegion newMetaRegion(InternalCache cache, |
| final String regionName, |
| final RegionAttributes ra, |
| AbstractGatewaySender sender) { |
| SerialGatewaySenderQueueMetaRegion meta = |
| new SerialGatewaySenderQueueMetaRegion(regionName, ra, null, cache, sender, |
| sender.getStatisticsClock()); |
| return meta; |
| } |
| } |
| |
| public String displayContent() { |
| return this.region.keySet().toString(); |
| } |
| } |