| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.ha; |
| |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.THREAD_ID_EXPIRY_TIME_PROPERTY; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.getProductIntegerProperty; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.ConcurrentModificationException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.StatisticsFactory; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.CustomExpiry; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.ExpirationAction; |
| import org.apache.geode.cache.ExpirationAttributes; |
| import org.apache.geode.cache.MirrorType; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.query.internal.CqQueryVsdStats; |
| import org.apache.geode.cache.query.internal.cq.CqService; |
| import org.apache.geode.cache.query.internal.cq.InternalCqQuery; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.DataSerializableFixedID; |
| import org.apache.geode.internal.Version; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.CachedDeserializable; |
| import org.apache.geode.internal.cache.Conflatable; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.HARegion; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.RegionQueue; |
| import org.apache.geode.internal.cache.VMCachedDeserializable; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp; |
| import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; |
| import org.apache.geode.internal.cache.tier.sockets.Handshake; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.LoggingThread; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.internal.util.concurrent.StoppableCondition; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantLock; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock; |
| |
| /** |
| * An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be |
| * a counter(long) and value will be the offered Object in the queue. For example, an entry in this |
| * region (key = 5; value = obj1) would mean that the Object obj1 is at the 5th position in the |
| * queue. |
| * |
| * This class has a field idsAvailable which is guarded by a ReentrantReadWriteLock. The peek |
| * threads, which do not modify the idsAvailable LinkedHashSet, take the read lock, thereby |
| * increasing the concurrency of peek operations. Threads performing take, remove, QRM, put and |
| * expiry take the write lock while operating on the set. <BR> |
| * <B>This class is performant for multiple dispatchers that are trying to do non blocking peek </B> |
| * <br> |
| * For blocking operations the object should be of type BlockingHARegionQueue. That class has just |
| * a ReentrantLock. The Condition object of this lock is used to signal the waiting peek and take |
| * threads when a put occurs. As a result, concurrent peek operations are not possible there; that |
| * class is optimized for a single peek thread. |
| * |
| * 30 May 2008: 5.7 onwards the underlying GemFire Region will continue to have key as counter(long) |
| * but the value will be a wrapper object(HAEventWrapper) which will be a key in a separate data |
| * structure called haContainer (an implementation of Map). The value against this wrapper will be |
| * the offered object in the queue. The purpose of this modification is to allow multiple |
| * ha-region-queues share their offered values without storing separate copies in memory, upon GII. |
| * |
| * (See BlockingHARegionQueue) |
| * |
| * @since GemFire 4.3 |
| */ |
| public class HARegionQueue implements RegionQueue { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** The {@code Region} backing this queue */ |
| protected HARegion region; |
| |
| /** |
| * The key into the {@code Region} used when putting entries onto the queue. The counter uses |
| * incrementAndGet so counter will always be started from 1 |
| */ |
| protected final AtomicLong tailKey = new AtomicLong(0); |
| |
| /** |
| * Map whose key is the ThreadIdentifier object value is the LastDispatchedAndCurrentEvents |
| * object. Every add operation will be identified by the ThreadIdentifier object & the position |
| * recorded in the LastDispatchedAndCurrentEvents object. |
| */ |
| protected final ConcurrentMap eventsMap = new ConcurrentHashMap(); |
| |
| /** |
| * The {@code Map} mapping the regionName->key to the queue key. This index allows fast updating |
| * of entries in the queue for conflation. |
| */ |
| protected volatile Map indexes = Collections.unmodifiableMap(new HashMap()); |
| |
| private StoppableReentrantReadWriteLock rwLock; |
| |
| private StoppableReentrantReadWriteLock.StoppableReadLock readLock; |
| |
| private StoppableWriteLock writeLock; |
| |
| /** The name of the {@code Region} backing this queue */ |
| private String regionName; |
| |
| /** The ClientProxyMembershipID associated with the ha queue */ |
| private ClientProxyMembershipID clientProxyID; |
| |
| /** |
| * The statistics for this queue |
| */ |
| public HARegionQueueStats stats; |
| |
| /** |
| * Accesses to this set must be protected via the rwLock. |
| */ |
| protected LinkedHashSet idsAvailable; |
| |
| /** |
| * Map of HA queue region-name and value as a MapWrapper object (whose underlying map contains |
| * ThreadIdentifier as key & value as the last dispatched sequence ID) |
| */ |
| @MakeNotStatic |
| static ConcurrentMap dispatchedMessagesMap; |
| |
| /** |
| * A MapWrapper object whose underlying map contains ThreadIdentifier as key & value as the last |
| * dispatched sequence ID |
| */ |
| protected MapWrapper threadIdToSeqId; |
| |
| /** |
| * A sequence violation can occur , if an HARegionQueue receives events thru GII & the same event |
| * also arrives via Gemfire Put in that local VM. If the HARegionQueue does not receive any data |
| * via GII , then there should not be any violation. If there is data arriving thru GII, such |
| * violations can be expected , but should be analyzed thoroughly. |
| */ |
| protected boolean puttingGIIDataInQueue; |
| |
| /** |
| * flag indicating whether interest has been registered for this queue. This is used to prevent |
| * race conditions between secondaries and primary for interest registration. |
| */ |
| private boolean hasRegisteredInterest; |
| |
| /** |
| * a thread local to store the counters corresponding to the events peeked by a particular thread. |
| * When {@code remove()} will be called, these events stored in thread-local will be destroyed. |
| */ |
| protected static final ThreadLocal peekedEventsContext = new ThreadLocal(); |
| |
| /** |
| * Thread which creates the {@code QueueRemovalMessage} and sends it to other nodes in the system |
| */ |
| @MakeNotStatic |
| private static QueueRemovalThread qrmThread; |
| |
| /** protects from modification during GII chunking */ |
| private StoppableReentrantReadWriteLock giiLock; |
| |
| /** the number of concurrent GII requests being served */ |
| private volatile int giiCount; |
| |
| /** queue to hold events during GII transfer so we do not modify the queue during chunking */ |
| private Queue giiQueue = new ConcurrentLinkedQueue(); |
| |
| /** |
| * Constant used to indicate the instance of BlockingHARegionQueue. The static function used for |
| * creating the queue instance should be passed this as parameter for creating |
| * BlockingHARegionQueue |
| */ |
| public static final int BLOCKING_HA_QUEUE = 1; |
| |
| /** |
| * Constant used to indicate the instance of HARegionQueue. The static function used for creating |
| * the queue instance should be passed this as parameter for creating HARegionQueue |
| */ |
| public static final int NON_BLOCKING_HA_QUEUE = 2; |
| |
| public static final String HA_EVICTION_POLICY_NONE = "none"; |
| |
| public static final String HA_EVICTION_POLICY_MEMORY = "mem"; |
| |
| public static final String HA_EVICTION_POLICY_ENTRY = "entry"; |
| |
| public static final long INIT_OF_SEQUENCEID = -1L; |
| |
| /** |
| * The default frequency (in seconds) at which a message will be sent by the primary to all the |
| * secondary nodes to remove the events which have already been dispatched from the queue. |
| */ |
| public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 1; |
| |
| /** |
| * The frequency (in seconds) at which a message will be sent by the primary to all the secondary |
| * nodes to remove the events which have already been dispatched from the queue. |
| */ |
| @MakeNotStatic |
| protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL; |
| |
| /** |
| * The underlying map (may hold reference to a Region or a ConcurrentHashMap) for this |
| * HARegionQueue instance (and also shared by all the HARegionQueue instances associated with the |
| * same CacheClientNotifier). |
| */ |
| protected Map haContainer; |
| |
| /** |
| * Boolean to indicate whether this HARegionQueue is having active dispatcher or not( primary node |
| * or the secondary node). This will be used to prevent the events from expiry if the node is |
| * primary. This was introduced for fixing the bug#36853 |
| */ |
| private volatile boolean isPrimary = false; |
| |
| /** Boolean to indicate whether destruction of queue is in progress. */ |
| protected volatile boolean destroyInProgress = false; |
| |
| private CancelCriterion stopper; |
| |
| /** @since GemFire 5.7 */ |
| protected byte clientConflation = Handshake.CONFLATION_DEFAULT; |
| |
| /** |
| * Boolean to indicate whether client is a slow receiver |
| * |
| * @since GemFire 6.0 |
| */ |
| public boolean isClientSlowReceiver = false; |
| |
| /** |
| * initialization flag - when true the queue has fully initialized |
| */ |
| final AtomicBoolean initialized = new AtomicBoolean(); |
| |
| /** |
| * Test hooks for periodic ack |
| * |
| * @since GemFire 6.0 |
| */ |
| @MutableForTesting |
| static boolean testMarkerMessageReceived = false; |
| @MutableForTesting |
| static boolean isUsedByTest = false; |
| |
| /** |
| * Used by durable queues to maintain acked events by client |
| */ |
| protected Map ackedEvents; |
| |
| /** |
| * Indicates how many times this ha queue has hit the max queue size limit. |
| */ |
| protected long maxQueueSizeHitCount = 0; |
| |
| /** |
| * Processes the given string and returns a string which is allowed for region names |
| * |
| * @return legal region name |
| */ |
| public static String createRegionName(String regionName) { |
| return regionName.replace('/', '#'); |
| } |
| |
/**
 * Creates a queue around an already constructed {@link HARegion}, with every collaborator (stats,
 * locks, cancel criterion) supplied by the caller. All state is installed by
 * {@code initializeHARegionQueue}.
 *
 * @param regionName name recorded for the backing region
 * @param haRegion the region backing this queue
 * @param cache accepted for signature compatibility; this constructor does not read it
 * @param haContainer shared map stored as this queue's {@code haContainer}
 * @param clientProxyId id of the client this queue belongs to
 * @param clientConflation conflation setting handed to {@code setClientConflation}
 * @param isPrimary whether this is the primary queue for a client
 * @param stats statistics object for this queue
 * @param giiLock lock protecting the queue during GII chunking
 * @param rwLock lock whose read/write halves are stored on this queue
 * @param cancelCriterion stored as this queue's stopper
 * @param puttingGIIDataInQueue when {@code true}, {@code putGIIDataInRegion()} runs during
 *        initialization
 */
HARegionQueue(String regionName, HARegion haRegion, InternalCache cache, Map haContainer,
    ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
    HARegionQueueStats stats, StoppableReentrantReadWriteLock giiLock,
    StoppableReentrantReadWriteLock rwLock, CancelCriterion cancelCriterion,
    boolean puttingGIIDataInQueue)
    throws IOException, ClassNotFoundException, CacheException, InterruptedException {
  // Pure delegation: every argument except 'cache' is forwarded unchanged.
  initializeHARegionQueue(
      regionName, haRegion, haContainer, clientProxyId, clientConflation, isPrimary,
      stats, giiLock, rwLock, cancelCriterion, puttingGIIDataInQueue);
}
| |
| private void initializeHARegionQueue(String regionName, HARegion haRegion, Map haContainer, |
| ClientProxyMembershipID clientProxyId, byte clientConflation, boolean isPrimary, |
| HARegionQueueStats stats, StoppableReentrantReadWriteLock giiLock, |
| StoppableReentrantReadWriteLock rwLock, CancelCriterion cancelCriterion, |
| boolean putGIIDataInQueue) throws InterruptedException { |
| this.regionName = regionName; |
| this.region = haRegion; |
| this.threadIdToSeqId = new MapWrapper(); |
| this.idsAvailable = new LinkedHashSet(); |
| setClientConflation(clientConflation); |
| this.isPrimary = isPrimary; |
| // Initialize the statistics |
| this.stats = stats; |
| this.haContainer = haContainer; |
| this.giiLock = giiLock; |
| this.clientProxyID = clientProxyId; |
| |
| this.stopper = cancelCriterion; |
| this.rwLock = rwLock; |
| this.readLock = this.rwLock.readLock(); |
| this.writeLock = this.rwLock.writeLock(); |
| |
| // false specifically set in tests only |
| if (putGIIDataInQueue) { |
| putGIIDataInRegion(); |
| } |
| if (this.getClass() == HARegionQueue.class) { |
| initialized.set(true); |
| } |
| } |
| |
| /** |
| * @param isPrimary whether this is the primary queue for a client |
| */ |
| protected HARegionQueue(String regionName, InternalCache cache, Map haContainer, |
| ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| |
| String processedRegionName = createRegionName(regionName); |
| |
| // Initialize the statistics |
| StatisticsFactory factory = cache.getDistributedSystem(); |
| createHARegion(processedRegionName, cache); |
| |
| initializeHARegionQueue(processedRegionName, this.region, haContainer, clientProxyId, |
| clientConflation, isPrimary, new HARegionQueueStats(factory, processedRegionName), |
| new StoppableReentrantReadWriteLock(cache.getCancelCriterion()), |
| new StoppableReentrantReadWriteLock(region.getCancelCriterion()), |
| this.region.getCancelCriterion(), true); |
| } |
| |
| private void createHARegion(String processedRegionName, InternalCache cache) |
| throws IOException, ClassNotFoundException { |
| AttributesFactory af = new AttributesFactory(); |
| af.setMirrorType(MirrorType.KEYS_VALUES); |
| af.addCacheListener(createCacheListenerForHARegion()); |
| af.setStatisticsEnabled(true); |
| RegionAttributes ra = af.create(); |
| this.region = HARegion.getInstance(processedRegionName, cache, this, ra); |
| |
| if (isPrimary) {// fix for 41878 |
| // since it's primary queue, we will disable the EntryExpiryTask |
| // this should be done after region creation |
| disableEntryExpiryTasks(); |
| } |
| } |
| |
| /** |
| * reinitialize the queue, presumably pulling current information from seconaries |
| */ |
| public void reinitializeRegion() { |
| InternalCache cache = this.region.getCache(); |
| String regionName = this.region.getName(); |
| this.region.destroyRegion(); |
| Exception problem = null; |
| try { |
| createHARegion(regionName, cache); |
| } catch (IOException | ClassNotFoundException e) { |
| problem = e; |
| } |
| if (problem != null) { |
| throw new InternalGemFireException("Problem recreating region queue '" + regionName + "'"); |
| } |
| try { |
| this.putGIIDataInRegion(); |
| } catch (InterruptedException e) { |
| cache.getCancelCriterion().checkCancelInProgress(e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| |
| |
| /** |
| * install DACE information from an initial image provider |
| */ |
| @SuppressWarnings("synthetic-access") |
| public void recordEventState(InternalDistributedMember sender, Map eventState) { |
| StringBuffer sb = null; |
| final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE); |
| if (isDebugEnabled_BS) { |
| sb = new StringBuffer(500); |
| sb.append("Recording initial event state for ").append(this.regionName).append(" from ") |
| .append(sender); |
| } |
| for (Iterator it = eventState.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| DispatchedAndCurrentEvents giiDace = (DispatchedAndCurrentEvents) entry.getValue(); |
| if (giiDace != null) { // fix for bug #41789: npe during state transfer |
| giiDace.owningQueue = this; |
| giiDace.isGIIDace = true; |
| if (giiDace.QRM_LOCK == null) { |
| // this has happened a number of times with JDK 1.6, even when QRM_LOCK |
| // was a "final" field |
| giiDace.QRM_LOCK = new Object(); |
| } |
| if (isDebugEnabled_BS) { |
| sb.append("\n ").append(((ThreadIdentifier) entry.getKey()).expensiveToString()) |
| .append("; sequenceID=").append(giiDace.lastSequenceIDPut); |
| } |
| // use putIfAbsent to avoid overwriting newer dispatch information |
| Object o = this.eventsMap.putIfAbsent(entry.getKey(), giiDace); |
| if (o != null && isDebugEnabled_BS) { |
| sb.append(" -- could not store. found ").append(o); |
| } |
| } |
| } |
| if (isDebugEnabled_BS) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, sb.toString()); |
| } |
| } |
| |
| /** |
| * Repopulates the HARegion after the GII is over so as to reset the counters and populate the |
| * DACE objects for the thread identifiers . This method should be invoked as the last method in |
| * the constructor . Thus while creating BlockingQueue this method should be invoked lastly in the |
| * derived class constructor , after the HARegionQueue contructor is complete. Otherwise, the |
| * ReentrantLock will be null. |
| */ |
| void putGIIDataInRegion() throws CacheException, InterruptedException { |
| Set entrySet = this.region.entrySet(false); |
| // check if the region is not empty. if there is |
| // data, then the relevant data structures have to |
| // be populated |
| if (!entrySet.isEmpty()) { |
| this.puttingGIIDataInQueue = true; |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| try { |
| Region.Entry entry = null; |
| Map orderedMap = new TreeMap(); |
| Iterator iterator = entrySet.iterator(); |
| Object key = null; |
| while (iterator.hasNext()) { |
| entry = (Region.Entry) iterator.next(); |
| key = entry.getKey(); |
| if (isDebugEnabled) { |
| logger.debug("{} processing queue key {} and value {}", this.regionName, key, |
| entry.getValue()); |
| } |
| if (key instanceof Long) { |
| if (!(entry.getValue() instanceof ClientMarkerMessageImpl)) { |
| orderedMap.put(key, entry.getValue()); |
| } |
| } |
| this.region.localDestroy(key); |
| } |
| long max = 0; |
| long counterInRegion = 0; |
| entrySet = orderedMap.entrySet(); |
| if (!entrySet.isEmpty()) { |
| Map.Entry mapEntry = null; |
| iterator = entrySet.iterator(); |
| while (iterator.hasNext()) { |
| mapEntry = (Map.Entry) iterator.next(); |
| Conflatable val = (Conflatable) mapEntry.getValue(); |
| if (val != null && val.getEventId() != null) { |
| counterInRegion = ((Long) mapEntry.getKey()).intValue(); |
| // TODO: remove this assertion |
| Assert.assertTrue(counterInRegion > max); |
| max = counterInRegion; |
| this.put(val); |
| } else if (isDebugEnabled) { |
| logger.debug( |
| "{} bug 44959 encountered: HARegion.putGIIDataInRegion found null eventId in {}", |
| this.regionName, val); |
| } |
| } |
| } |
| this.tailKey.set(max); |
| } finally { |
| this.puttingGIIDataInQueue = false; |
| if (isDebugEnabled) { |
| logger.debug("{} done putting GII data into queue", this); |
| } |
| } |
| } |
| // TODO:Asif: Avoid invocation of this method |
| startHAServices(this.region.getCache()); |
| } |
| |
| /** |
| * Puts the GII'd entry into the ha region, if it was GII'd along with its ClientUpdateMessageImpl |
| * instance. |
| * |
| * @since GemFire 5.7 |
| */ |
| protected void putInQueue(Object val) throws InterruptedException { |
| if (val instanceof HAEventWrapper && ((HAEventWrapper) val).getClientUpdateMessage() == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{} HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.", |
| this.regionName, ((Conflatable) val).getKeyToConflate()); |
| } |
| } else { |
| this.put(val); |
| } |
| } |
| |
| /** |
| * Check whether to conflate an event |
| * |
| * @since GemFire 5.7 |
| */ |
| protected boolean shouldBeConflated(Conflatable event) { |
| boolean retVal = event.shouldBeConflated(); |
| // don't apply the client conflation override on durable markers |
| if (event instanceof ClientMarkerMessageImpl) { |
| return retVal; |
| } |
| switch (this.clientConflation) { |
| case Handshake.CONFLATION_OFF: |
| return false; // always disable |
| case Handshake.CONFLATION_ON: |
| if (event instanceof HAEventWrapper) { |
| ClientUpdateMessage cum = (ClientUpdateMessage) this.haContainer.get(event); |
| if (cum != null) { |
| retVal = cum.isUpdate(); |
| } |
| break; |
| } |
| if (event instanceof ClientUpdateMessage) { |
| // Does this ever happen now? |
| retVal = ((ClientUpdateMessage) event).isUpdate(); |
| break; |
| } |
| // Oddness |
| break; |
| case Handshake.CONFLATION_DEFAULT: |
| return retVal; |
| default: |
| throw new InternalGemFireError("Invalid clientConflation"); |
| } |
| return retVal; |
| } |
| |
| /** |
| * Adds an object at the queue's tail. The implementation supports concurrent put operations in a |
| * performant manner. This is done in following steps: <br> |
| * 1)Check if the Event being added has a sequence ID less than the Last Dispatched SequenceId. If |
| * yes do not add the Event to the Queue <br> |
| * 2)If no then Do a "put-if-absent" on the eventsMap ,inserting the |
| * LastDispatchedAndCurrentEvents object or adding the position to the existing object for the |
| * ThreadIdentifier |
| * |
| * It is possible that DispatchedAnCurrentEvents object just retrieved by the put thread has |
| * expired thus a one level recursion can occur to do a valid put |
| * |
| * The operation is thread safe & is guarded by taking a lock on LastDispatchedAndCurrentEvents |
| * object & SIZE Lock |
| * |
| * @param object object to put onto the queue |
| */ |
| @Override |
| public boolean put(Object object) throws CacheException, InterruptedException { |
| this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event |
| try { |
| if (this.giiCount > 0) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName, |
| giiQueue.size(), object); |
| } |
| HAEventWrapper haContainerKey = null; |
| |
| if (object instanceof HAEventWrapper) { |
| HAEventWrapper wrapper = (HAEventWrapper) object; |
| wrapper.incrementPutInProgressCounter("GII queue"); |
| } |
| |
| this.giiQueue.add(object); |
| } else { |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: adding message to HA queue: {}", this.regionName, object); |
| } |
| basicPut(object); |
| } |
| } finally { |
| this.giiLock.readLock().unlock(); |
| } |
| |
| // basicPut() invokes dace.putObject() to put onto HARegionQueue |
| // However, dace.putObject could return true even though |
| // the event is not put onto the HARegionQueue due to eliding events etc. |
| // So it is not reliable to be used whether offheap ref ownership is passed over to |
| // the queue (if and when HARegionQueue uses offheap). The probable |
| // solution could be that to let dace.putObject() to increase offheap REF count |
| // when it puts the event onto the region queue. Also always release (dec) |
| // the offheap REF count from the caller. |
| return true; |
| } |
| |
| |
| private void basicPut(Object object) throws CacheException, InterruptedException { |
| // optmistically decrease the put count |
| try { |
| this.checkQueueSizeConstraint(); |
| // this.region.checkReadiness(); // throws CacheClosed or RegionDestroyed |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| this.region.getCache().getCancelCriterion().checkCancelInProgress(ie); |
| } |
| |
| Conflatable event = (Conflatable) object; |
| // Get the EventID object & from it obtain the ThreadIdentifier |
| EventID eventId = event.getEventId(); |
| ThreadIdentifier ti = getThreadIdentifier(eventId); |
| long sequenceID = eventId.getSequenceID(); |
| // Check from Events Map if the put operation should proceed or not |
| DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(ti); |
| if (dace != null && dace.isGIIDace && this.puttingGIIDataInQueue) { |
| // we only need to retain DACE for which there are no entries in the queue. |
| // for other thread identifiers we build up a new DACE |
| dace = null; |
| } |
| if (dace != null) { |
| // check the last dispatched sequence Id |
| if (this.puttingGIIDataInQueue || (sequenceID > dace.lastDispatchedSequenceId)) { |
| // Asif:Insert the Event into the Region with proper locking. |
| // It is possible that by the time put operation proceeds , the |
| // Last dispatched id has changed so it is possible that the object at |
| // this point |
| // also does not get added |
| if (!dace.putObject(event, sequenceID)) { |
| // dace encountered a DESTROYED token - stop adding GII data |
| if (!this.puttingGIIDataInQueue) { |
| this.put(object); |
| } |
| } else { |
| if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "{}: Adding message to queue: {}", this, |
| object); |
| } |
| } |
| |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: This queue has already seen this event. The highest sequence number in the queue for {} is {}, but this event's sequence number is {}", |
| this.regionName, ti, dace.lastDispatchedSequenceId, sequenceID); |
| } |
| incrementTakeSidePutPermits(); |
| } |
| } else { |
| dace = new DispatchedAndCurrentEvents(this); |
| DispatchedAndCurrentEvents oldDace = |
| (DispatchedAndCurrentEvents) this.eventsMap.putIfAbsent(ti, dace); |
| if (oldDace != null) { |
| dace = oldDace; |
| } else { |
| // Add the recently added ThreadIdentifier to the RegionQueue for expiry |
| this.region.put(ti, dace.lastDispatchedSequenceId); |
| // update the stats |
| this.stats.incThreadIdentifiers(); |
| } |
| if (!dace.putObject(event, sequenceID)) { |
| this.put(object); |
| } else { |
| if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "{}: Adding message to queue: {}", this, |
| object); |
| } |
| } |
| } |
| } |
| |
| /** |
| * while serving a GII request we queue put()s in a list so that the GII image is not modified. |
| * This keeps the chunking iteration from sometimes seeing a later event but missing an older |
| * event. Bug #41681 |
| */ |
| public void startGiiQueueing() { |
| this.giiLock.writeLock().lock(); |
| this.giiCount++; // TODO: non-atomic operation on volatile! |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: startGiiQueueing count is now {}", this.regionName, this.giiCount); |
| } |
| this.giiLock.writeLock().unlock(); |
| } |
| |
| /** |
| * at the end of a GII image request we decrement the in-process count and, if it falls to zero we |
| * empty the list of messages that put() has been building. This is done with the lock held to |
| * prevent newer events from being queued before the list is drained. |
| */ |
| public void endGiiQueueing() { |
| this.giiLock.writeLock().lock(); |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| try { |
| this.giiCount--; // TODO: non-atomic operation on volatile! |
| if (isDebugEnabled) { |
| logger.debug("{}: endGiiQueueing count is now {}", this.regionName, this.giiCount); |
| } |
| if (this.giiCount < 0) { |
| if (isDebugEnabled) { |
| logger.debug("{} found giiCount to be {}", this.regionName, this.giiCount); |
| } |
| this.giiCount = 0; |
| } |
| if (this.giiCount == 0) { |
| if (isDebugEnabled) { |
| logger.debug("{} all GII requests completed - draining {} messages", this.regionName, |
| this.giiQueue.size()); |
| } |
| boolean interrupted = false; |
| int expectedCount = this.giiQueue.size(); |
| int actualCount = 0; |
| while (true) { |
| Object value; |
| try { |
| value = this.giiQueue.remove(); |
| } catch (NoSuchElementException ignore) { |
| break; |
| } |
| actualCount++; |
| try { |
| if (isDebugEnabled) { |
| logger.debug("{} draining #{}: {}", this.regionName, (actualCount + 1), value); |
| } |
| if (value instanceof HAEventWrapper) { |
| if (((HAEventWrapper) value).getClientUpdateMessage() == null) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "{} ATTENTION: found gii queued event with null event message. Please see bug #44852: {}", |
| this.regionName, value); |
| } |
| continue; |
| } |
| } |
| |
| basicPut(value); |
| |
| // The HAEventWrapper putInProgressCounter must be decremented because it was |
| // incremented when it was queued in giiQueue. |
| if (value instanceof HAEventWrapper) { |
| ((HAEventWrapper) value).decrementPutInProgressCounter(); |
| } |
| } catch (NoSuchElementException ignore) { |
| break; |
| } catch (InterruptedException ignore) { |
| // complete draining while holding the write-lock so nothing else |
| // can get into the queue |
| interrupted = true; |
| } |
| } |
| if (interrupted) { |
| this.region.getCache().getCancelCriterion() |
| .checkCancelInProgress(new InterruptedException()); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } catch (RuntimeException t) { |
| logger.fatal("endGiiQueueing terminating due to uncaught runtime exception", t); |
| throw t; |
| } catch (Error t) { |
| logger.fatal("endGiiQueueing terminating due to uncaught error", t); |
| throw t; |
| } finally { |
| if (logger.isTraceEnabled()) { |
| logger.trace("{} endGiiQueueing completed", this.regionName); |
| } |
| this.giiLock.writeLock().unlock(); |
| } |
| } |
| |
| |
| /** |
| * this method is for transmission of DACE information with initial image state in HARegions. It |
| * should not be used for other purposes. The map contains only those entries that have no items |
| * queued. This is used to prevent replay of events in the new queue that have already been |
| * removed from this queue. |
| */ |
| public Map getEventMapForGII() { |
| // fix for bug #41621 - concurrent modification exception while serializing event map |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| do { |
| HashMap result = new HashMap(); |
| try { |
| for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : ((Map<ThreadIdentifier, DispatchedAndCurrentEvents>) this.eventsMap) |
| .entrySet()) { |
| if (entry.getValue().isCountersEmpty()) { |
| result.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| return result; |
| } catch (ConcurrentModificationException ignore) { |
| // TODO:WTF: bad practice but eventsMap is ConcurrentHashMap |
| if (isDebugEnabled) { |
| logger.debug( |
| "HARegion encountered concurrent modification exception while analysing event state - will try again"); |
| } |
| } |
| } while (true); |
| } |
| |
| /** |
| * Implementation in BlockingHARegionQueue class |
| */ |
| void checkQueueSizeConstraint() throws InterruptedException { |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| } |
| |
| /** |
| * Implementation in BlokcingHARegionQueue class |
| * |
| */ |
| void incrementTakeSidePutPermitsWithoutNotify() { |
| |
| } |
| |
| /** |
| * Creates the static dispatchedMessagesMap (if not present) and starts the QueuRemovalThread if |
| * not running |
| */ |
| static synchronized void startHAServices(InternalCache cache) { |
| if (qrmThread == null) { |
| dispatchedMessagesMap = new ConcurrentHashMap(); |
| qrmThread = new QueueRemovalThread(cache); |
| qrmThread.setName("Queue Removal Thread"); |
| qrmThread.start(); |
| } |
| } |
| |
| /** |
| * Clears the dispatchedMessagesMap and shuts down the QueueRemovalThread. |
| * |
| */ |
| public static synchronized void stopHAServices() { |
| if (qrmThread != null) { |
| qrmThread.shutdown(); |
| qrmThread = null; |
| dispatchedMessagesMap.clear(); |
| dispatchedMessagesMap = null; |
| |
| } |
| } |
| |
| /** |
| * Used for testing purposes only |
| * |
| * @return the frequency (in seconds) at which a message will be sent by the primary to all the |
| * secondary nodes to remove the events which have already been dispatched from the queue. |
| */ |
| public static int getMessageSyncInterval() { |
| return messageSyncInterval; |
| } |
| |
| /** |
| * |
| * The internal method used for setting the message synch interval time via the Cache API |
| * |
| * @param seconds time to wait between two synch messages |
| */ |
| public static void setMessageSyncInterval(int seconds) { |
| messageSyncInterval = seconds; |
| } |
| |
| /** |
| * Returns the previous counter if any for the Conflatable object |
| * |
| * @param event Object to be conflated |
| * @param newPosition New Conflatable object's position |
| * @return Long object denoting the position of the previous conflatable object |
| */ |
| protected Long addToConflationMap(Conflatable event, Long newPosition) { |
| String r = event.getRegionToConflate(); |
| ConcurrentMap latestIndexesForRegion = (ConcurrentMap) this.indexes.get(r); |
| if (latestIndexesForRegion == null) { |
| synchronized (HARegionQueue.this) { |
| if ((latestIndexesForRegion = (ConcurrentMap) this.indexes.get(r)) == null) { |
| latestIndexesForRegion = createConcurrentMap(); |
| Map newMap = new HashMap(this.indexes); |
| newMap.put(r, latestIndexesForRegion); |
| this.indexes = Collections.unmodifiableMap(newMap); |
| } |
| } |
| } |
| Object key = event.getKeyToConflate(); |
| return (Long) latestIndexesForRegion.put(key, newPosition); |
| } |
| |
| /** |
| * Creates and returns a ConcurrentMap. This method is over-ridden in test classes to test some |
| * functionality |
| * |
| * @return new ConcurrentMap |
| */ |
| ConcurrentMap createConcurrentMap() { |
| return new ConcurrentHashMap(); |
| } |
| |
| /** |
| * |
| * @return CacheListener object having the desired functionility of expiry of Thread Identifier & |
| * Events for the HAregion. This method is appropriately over ridden in the test classes. |
| */ |
| CacheListener createCacheListenerForHARegion() { |
| return new CacheListenerAdapter() { |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| try { |
| // Fix 39175 |
| // Fix for #48879. As the expiry-tasks are disabled either in |
| // constructor or when the queue becomes primary, we no longer need to |
| // check if the queue is primary here. It'll allow ThreadIdentifiers |
| // to expire. |
| // if (!HARegionQueue.this.isPrimary()) { |
| HARegionQueue.this.expireTheEventOrThreadIdentifier(event); |
| // } |
| } catch (CancelException ignore) { |
| // ignore, we're done |
| } catch (CacheException ce) { |
| if (!destroyInProgress) { |
| logger.error("HAREgionQueue::createCacheListner::Exception in the expiry thread", |
| ce); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * This function is invoked by the createCacheListenerForHARegion for theHARegionQueue & also by |
| * overridden function createCacheListenerForHARegion of the HARegionQueueJUnitTest class for the |
| * test testConcurrentEventExpiryAndTake. This function provides the meaningful functionality for |
| * expiry of the Event object as well as ThreadIdentifier |
| * |
| * @param event event object representing the data being expired |
| */ |
| void expireTheEventOrThreadIdentifier(EntryEvent event) throws CacheException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug( |
| "HARegionQueue::afterInvalidate. Entry Event being invalidated:{}, isPrimaryQueue:{}", |
| event, HARegionQueue.this.isPrimary()); |
| } |
| Object key = event.getKey(); |
| if (key instanceof ThreadIdentifier) { |
| // Check if the sequenceID present as value against this key is same |
| // as |
| // the last dispatched sequence & the size of set containing the |
| // counters |
| // is 0. If yes the Dace should be removed |
| // Get DACE |
| |
| DispatchedAndCurrentEvents dace = |
| (DispatchedAndCurrentEvents) HARegionQueue.this.eventsMap.get(key); |
| Assert.assertTrue(dace != null); |
| Long expirySequenceID = (Long) event.getOldValue(); |
| boolean expired = dace.expireOrUpdate(expirySequenceID, (ThreadIdentifier) key); |
| if (isDebugEnabled) { |
| logger.debug( |
| "HARegionQueue::afterInvalidate:Size of the region after expiring or updating the ThreadIdentifier={}", |
| HARegionQueue.this.region.keys().size()); |
| logger.debug("HARegionQueue::afterInvalidate:ThreadIdentifier expired={}", expired); |
| } |
| } else if (key instanceof Long) { |
| // if (destroyFromAvailableIDsAndRegion((Long)key)) { |
| destroyFromQueue(key); |
| Conflatable cf = (Conflatable) event.getOldValue(); |
| EventID id = cf.getEventId(); |
| byte[] memID = id.getMembershipID(); |
| long threadId = id.getThreadID(); |
| DispatchedAndCurrentEvents dace = |
| (DispatchedAndCurrentEvents) eventsMap.get(new ThreadIdentifier(memID, threadId)); |
| if (shouldBeConflated(cf)) { |
| dace.destroy((Long) key, cf.getKeyToConflate(), cf.getRegionToConflate()); |
| } else { |
| dace.destroy((Long) key); |
| } |
| // } |
| } else { |
| // unexpected condition, throw exception? |
| } |
| |
| } |
| |
| /** |
| * This method adds the position of newly added object to the List of available IDs so that it is |
| * available for peek or take. This method is called from DispatchedAndCurrentEvents object. This |
| * method is invoked in a write lock for non blocking queue & in a reentrant lock in a blocking |
| * queue. In case of blocking queue , this method also signals the waiting take & peek threads to |
| * awake. |
| * |
| * @param position The Long position of the object which has been added |
| */ |
| void publish(Long position) throws InterruptedException { |
| acquireWriteLock(); |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding position " + position + " to available IDs. Region: " + regionName); |
| } |
| |
| this.idsAvailable.add(position); |
| // Notify the waiting peek threads or take threads of blocking queue |
| // A void operation for the non blocking queue operations |
| notifyPeekAndTakeThreads(); |
| } finally { |
| releaseWriteLock(); |
| } |
| } |
| |
| protected boolean removeFromOtherLists(Long position) { |
| return false; |
| } |
| |
| /** |
| * @param position Long value present in the Available IDs map against which Event object is |
| * present in HARegion. This function is directly invoked from the basicInvalidate function |
| * where |
| * expiry is aborted if this function returns false |
| * @return boolean true if the position could be removed from the Set |
| * @throws InterruptedException * |
| */ |
| public boolean destroyFromAvailableIDs(Long position) throws InterruptedException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removing position " + position + " from available IDs. Region: " + regionName); |
| } |
| |
| boolean removedOK = false; |
| acquireWriteLock(); |
| try { |
| removedOK = this.idsAvailable.remove(position); |
| if (!removedOK) { |
| removedOK = this.removeFromOtherLists(position); |
| } |
| if (removedOK) { |
| this.incrementTakeSidePutPermits(); |
| } |
| } finally { |
| releaseWriteLock(); |
| } |
| return removedOK; |
| } |
| |
| /** |
| * Destroys the entry at the position from the Region. It checks for the presence of the position |
| * in the AvailableID Set. If the position existed in the Set, then only it is removed from the |
| * Set & the underlying Region |
| * |
| * @param position Long position counter for entry in the Region |
| * @return true if the entry with <br> |
| * position <br> |
| * specified was removed from the Set |
| */ |
| protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removing position " + position + " from available IDs and region. Region: " |
| + this.regionName); |
| } |
| |
| boolean removedOK = this.destroyFromAvailableIDs(position); |
| |
| if (removedOK) { |
| try { |
| this.destroyFromQueue(position); |
| } catch (EntryNotFoundException ignore) { |
| if (!HARegionQueue.this.destroyInProgress) { |
| if (!this.region.isDestroyed()) { |
| Assert.assertTrue(false, "HARegionQueue::remove: The position " + position |
| + "existed in availableIDs set but not in Region object is not expected"); |
| } |
| } |
| } |
| } |
| return removedOK; |
| } |
| |
| /* |
| * Removes from the local region and decrements stats GII does not call this because it messes |
| * with the cq stats Expiry can call this for now as durable client expires would shut down the cq |
| * anyways if anything goes wrong |
| * |
| */ |
| private void destroyFromQueue(Object key) { |
| Object event = this.region.get(key); |
| this.region.localDestroy(key); |
| |
| maintainCqStats(event, -1); |
| } |
| |
| /** Returns the {@code toString} for this RegionQueue object */ |
| @Override |
| public String toString() { |
| return "RegionQueue on " + this.regionName + "(" + (this.isPrimary ? "primary" : "backup") |
| + ")"; |
| } |
| |
| /** |
| * Returns the underlying region that backs this queue. |
| */ |
| @Override |
| public HARegion getRegion() { |
| return this.region; |
| } |
| |
| |
| /** |
| * This method is invoked by the take function . For non blocking queue it returns null or a valid |
| * long position while for blocking queue it waits for data in the queue or throws Exception if |
| * the thread encounters exception while waiting. |
| */ |
| protected Long getAndRemoveNextAvailableID() throws InterruptedException { |
| Long next = null; |
| acquireWriteLock(); |
| try { |
| if (this.idsAvailable.isEmpty()) { |
| if (waitForData()) { |
| Iterator itr = this.idsAvailable.iterator(); |
| next = (Long) itr.next(); |
| itr.remove(); |
| this.incrementTakeSidePutPermits(); |
| } |
| } else { |
| Iterator itr = this.idsAvailable.iterator(); |
| next = (Long) itr.next(); |
| itr.remove(); |
| this.incrementTakeSidePutPermits(); |
| } |
| } finally { |
| releaseWriteLock(); |
| } |
| return next; |
| } |
| |
| /** |
| * Returns the next position counter present in idsAvailable set. This method is invoked by the |
| * peek function. In case of BlockingQueue, this method waits till a valid ID is available. |
| * |
| * @return valid Long poistion or null depending upon the nature of the queue |
| * @throws TimeoutException if operation is interrupted (unfortunately) |
| */ |
| private Long getNextAvailableID() throws InterruptedException { |
| Long next = null; |
| acquireReadLock(); |
| try { |
| if (this.idsAvailable.isEmpty()) { |
| // Asif:Wait in case it is a blocking thread |
| if (waitForData()) { |
| next = (Long) this.idsAvailable.iterator().next(); |
| } |
| } else { |
| next = (Long) this.idsAvailable.iterator().next(); |
| } |
| } finally { |
| releaseReadLock(); |
| } |
| return next; |
| } |
| |
| /** |
| * For non blocking queue , this method either returns null or an Object. For blocking queue it |
| * will always return with an Object or wait for queue to be populated. |
| * |
| * @throws CacheException The exception can be thrown by BlockingQueue if it encounters |
| * InterruptedException while waiting for data |
| */ |
| @Override |
| public Object take() throws CacheException, InterruptedException { |
| Conflatable object = null; |
| Long next = null; |
| if ((next = this.getAndRemoveNextAvailableID()) != null) { |
| object = (Conflatable) this.region.get(next); |
| Assert.assertTrue(object != null); |
| |
| object = this.getAndRemoveFromHAContainer(object); |
| Assert.assertTrue(object != null); |
| EventID eventid = object.getEventId(); |
| long sequenceId = eventid.getSequenceID(); |
| ThreadIdentifier threadid = getThreadIdentifier(eventid); |
| DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(threadid); |
| Assert.assertTrue(dace != null); |
| Object keyToConflate = null; |
| if (shouldBeConflated(object)) { |
| keyToConflate = object.getKeyToConflate(); |
| } |
| dace.removeEventAndSetSequenceID( |
| new RemovedEventInfo(next, object.getRegionToConflate(), keyToConflate), sequenceId); |
| |
| // Periodic ack from the client will add to the addDispatchMessage Map. |
| // This method gets called from cacheClientNotifier upon receiving the ack from client. |
| // addDispatchedMessage(threadid, sequenceId); |
| |
| // update the stats |
| // if (logger.isDebugEnabled()) { |
| this.stats.incEventsTaken(); |
| // } |
| } |
| // since size is zero, return null |
| if (object == null && logger.isDebugEnabled()) { |
| logger.debug("RegionQueue is EMPTY, returning null for take()"); |
| } |
| return object; |
| } |
| |
| @Override |
| public List take(int batchSize) throws CacheException, InterruptedException { |
| List batch = new ArrayList(batchSize * 2); |
| for (int i = 0; i < batchSize; i++) { |
| Object obj = take(); |
| if (obj != null) { |
| batch.add(obj); |
| } else { |
| break; |
| } |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Took a batch of {} entries", this, batch.size()); |
| } |
| return batch; |
| } |
| |
| protected boolean checkPrevAcks() { |
| // ARB: Implemented in DurableHARegionQueue. |
| return true; |
| } |
| |
| protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid, long sequenceId) { |
| return true; |
| } |
| |
| protected void setPeekedEvents() { |
| HARegionQueue.peekedEventsContext.set(null); |
| } |
| |
| /** |
| * Removes the events that were peeked by this thread. The events are destroyed from the queue and |
| * conflation map and DispatchedAndCurrentEvents are updated accordingly. |
| */ |
| @Override |
| public void remove() throws InterruptedException { |
| List peekedIds = (List) HARegionQueue.peekedEventsContext.get(); |
| |
| if (peekedIds == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Remove() called before peek(), nothing to remove."); |
| } |
| return; |
| } |
| |
| if (!this.checkPrevAcks()) { |
| return; |
| } |
| |
| Map groupedThreadIDs = new HashMap(); |
| |
| for (Iterator iter = peekedIds.iterator(); iter.hasNext();) { |
| Long counter = (Long) iter.next(); |
| |
| Conflatable event = (Conflatable) this.region.get(counter); |
| if (event != null) { |
| EventID eventid = event.getEventId(); |
| long sequenceId = eventid.getSequenceID(); |
| ThreadIdentifier threadid = getThreadIdentifier(eventid); |
| |
| if (!checkEventForRemoval(counter, threadid, sequenceId)) { |
| continue; |
| } |
| |
| Object key = null; |
| String r = null; |
| if (shouldBeConflated(event)) { |
| key = event.getKeyToConflate(); |
| r = event.getRegionToConflate(); |
| } |
| RemovedEventInfo info = new RemovedEventInfo(counter, r, key); |
| |
| List countersList; |
| if ((countersList = (List) groupedThreadIDs.get(threadid)) != null) { |
| countersList.add(info); |
| countersList.set(0, sequenceId); |
| } else { |
| countersList = new ArrayList(); |
| countersList.add(sequenceId); |
| countersList.add(info); |
| groupedThreadIDs.put(threadid, countersList); |
| } |
| event = null; |
| info = null; |
| } else { |
| // if (logger.isDebugEnabled()) { |
| HARegionQueue.this.stats.incNumVoidRemovals(); |
| // } |
| } |
| } |
| |
| for (Iterator iter = groupedThreadIDs.entrySet().iterator(); iter.hasNext();) { |
| Map.Entry element = (Map.Entry) iter.next(); |
| ThreadIdentifier tid = (ThreadIdentifier) element.getKey(); |
| List removedEvents = (List) element.getValue(); |
| long lastDispatchedId = (Long) removedEvents.remove(0); |
| DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(tid); |
| if (dace != null && dace.lastDispatchedSequenceId < lastDispatchedId) { |
| try { |
| dace.setLastDispatchedIDAndRemoveEvents(removedEvents, lastDispatchedId); |
| } catch (CacheException e) { |
| // ignore and log |
| logger.error("Exception occurred while trying to set the last dispatched id", |
| e); |
| } |
| } |
| |
| // Periodic ack from the client will add to the addDispatchMessage Map. |
| // This method gets called from cacheClientNotifier upon receiving the ack from client. |
| // addDispatchedMessage(tid, lastDispatchedId); |
| } |
| groupedThreadIDs = null; |
| // removed the events from queue, now clear the peekedEventsContext |
| |
| setPeekedEvents(); |
| } |
| |
| protected Object getNextAvailableIDFromList() throws InterruptedException { |
| return this.getNextAvailableID(); |
| } |
| |
| protected void storePeekedID(Long id) { |
| // ARB: Implemented in DurableHARegionQueue. |
| } |
| |
| @Override |
| public Object peek() throws InterruptedException { |
| if (Thread.interrupted()) { |
| throw new InterruptedException(); |
| } |
| Conflatable object = null; |
| Long next = null; |
| |
| while (true) { |
| try { |
| next = (Long) this.getNextAvailableIDFromList(); |
| if (next == null) { |
| break; |
| } |
| } catch (TimeoutException ignore) { |
| throw new InterruptedException(); |
| } |
| |
| object = (Conflatable) this.region.get(next); |
| |
| // It is possible for the object to be null if a queue removal |
| // occurred between getting the next available ID and getting the object |
| // from the region. If this happens, on the next iteration of this loop we will |
| // get a different available ID to process |
| if (object != null) { |
| object = (object instanceof HAEventWrapper) ? (Conflatable) this.haContainer.get(object) |
| : object; |
| |
| if (object != null) { |
| List peekedEvents; |
| if ((peekedEvents = (List) HARegionQueue.peekedEventsContext.get()) != null) { |
| peekedEvents.add(next); |
| } else { |
| peekedEvents = new LinkedList(); |
| peekedEvents.add(next); |
| HARegionQueue.peekedEventsContext.set(peekedEvents); |
| } |
| this.storePeekedID(next); |
| break; |
| } |
| } |
| } |
| // since size is zero, return null |
| if (logger.isTraceEnabled()) { |
| logger.trace("HARegionQueue::peek: Returning object from head = {}", object); |
| } |
| return object; |
| } |
| |
| @Override |
| public List peek(int batchSize) throws InterruptedException { |
| return peek(batchSize, -1); |
| } |
| |
| /** |
| * Return a batch of minimum specified size |
| * |
| * @param minSize minimum number to return |
| * @return null if minimum was not present |
| */ |
| private List doReturn(int minSize, int maxSize) { |
| acquireReadLock(); |
| try { |
| int numToReturn = this.idsAvailable.size(); |
| if (numToReturn < minSize) { |
| return null; |
| } |
| if (numToReturn > maxSize) { |
| numToReturn = maxSize; |
| } |
| return getBatchAndUpdateThreadContext(numToReturn); |
| } finally { |
| releaseReadLock(); |
| } |
| |
| } |
| |
| /** |
| * Peeks either a batchSize number of elements from the queue or until timeToWait milliseconds |
| * have elapsed (whichever comes first). If it has peeked batchSize number of elements from the |
| * queue before timeToWait milliseconds have elapsed, it stops peeking. If timeToWait milliseconds |
| * elapse before batchSize number of elements has been peeked, it stops. All the counters peeked |
| * for the batch are added to the thread-context, so that upon calling of remove(), all the peeked |
| * events of the batch are removed from the queue. |
| * |
| * If the Queue is non blocking multiple peek operations can proceed but if it is of type non |
| * blocking only one peek operation will proceed as the blocking queue does not use ReadWriteLock |
| * |
| * @param batchSize The number of objects to peek from the queue |
| * @param timeToWait The number of milliseconds to attempt to peek |
| * |
| * @return The list of events peeked |
| */ |
| @Override |
| public List peek(int batchSize, int timeToWait) throws InterruptedException { |
| long start = System.currentTimeMillis(); |
| long end = start + timeToWait; |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Peek start time={} end time={} time to wait={}", this, start, end, |
| timeToWait); |
| } |
| |
| // If we're full, just return the results. |
| List result = doReturn(batchSize, batchSize); |
| if (result != null) { |
| return result; |
| } |
| |
| // Need to wait for a while. |
| for (;;) { |
| region.getCache().getCancelCriterion().checkCancelInProgress(null); |
| result = doReturn(batchSize, batchSize); |
| if (result != null) { |
| return result; // full now |
| } |
| |
| // If time to wait is -1 (don't wait) or time interval has elapsed |
| long currentTime = System.currentTimeMillis(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Peek current time: {}", this, currentTime); |
| } |
| if (timeToWait == -1 || currentTime >= end) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Peek timed out", this); |
| } |
| // Return whatever's there. |
| result = doReturn(0, batchSize); |
| Assert.assertTrue(result != null); |
| return result; |
| } |
| |
| // Sleep a bit before trying again. |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Peek continuing", this); |
| } |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(50); // TODO this seems kinda busy IMNSHO -- jason |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| this.region.getCancelCriterion().checkCancelInProgress(null); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| |
| /** |
| * This method prepares the batch of events and updates the thread-context with corresponding |
| * counters, so that when remove is called by this thread, these events are destroyed from the |
| * queue.This method should always be invoked within the {@code rwLock}. |
| * |
| * @param batchSize - number of events to be peeked |
| * @return - list of events peeked |
| */ |
| private List getBatchAndUpdateThreadContext(int batchSize) { |
| Iterator itr = this.idsAvailable.iterator(); |
| int currSize = this.idsAvailable.size(); |
| int limit = currSize >= batchSize ? batchSize : currSize; |
| List batch = new ArrayList(limit); |
| |
| List peekedEventsThreadContext; |
| if ((peekedEventsThreadContext = (List) HARegionQueue.peekedEventsContext.get()) == null) { |
| peekedEventsThreadContext = new LinkedList(); |
| } |
| for (int i = 0; i < limit; i++) { |
| Long counter = (Long) itr.next(); |
| Object eventOrWrapper = this.region.get(counter); |
| Object event; |
| if (eventOrWrapper instanceof HAEventWrapper) { |
| event = haContainer.get(eventOrWrapper); |
| if (event == null) { |
| event = ((HAEventWrapper) eventOrWrapper).getClientUpdateMessage(); |
| } |
| } else { |
| event = eventOrWrapper; |
| } |
| if (event != null) { |
| batch.add(event); |
| } |
| peekedEventsThreadContext.add(counter); |
| } |
| |
| HARegionQueue.peekedEventsContext.set(peekedEventsThreadContext); |
| return batch; |
| } |
| |
| @Override |
| public void addCacheListener(CacheListener listener) { |
| // nothing |
| } |
| |
| @Override |
| public void removeCacheListener() { |
| // nothing |
| } |
| |
| /** |
| * It adds the entry to a static data structure dispatchedMessagesMap which is periodically |
| * operated upon by the QRM thread. |
| * |
| * @param tid - the ThreadIdentifier object for this event |
| * @param sequenceId - the sequence id for this event |
| */ |
| public void addDispatchedMessage(ThreadIdentifier tid, long sequenceId) { |
| Long lastSequenceNumber = sequenceId; |
| boolean wasEmpty = false; |
| Long oldvalue = null; |
| Map internalMap = null; |
| while (true) { |
| internalMap = this.threadIdToSeqId.map; |
| synchronized (internalMap) { |
| if (internalMap != this.threadIdToSeqId.map) { |
| continue; |
| } else { |
| wasEmpty = internalMap.isEmpty(); |
| oldvalue = (Long) internalMap.put(tid, lastSequenceNumber); |
| if (ackedEvents != null) { |
| ackedEvents.put(tid, lastSequenceNumber); |
| } |
| if (oldvalue != null && oldvalue.compareTo(lastSequenceNumber) > 0) { |
| internalMap.put(tid, oldvalue); |
| if (ackedEvents != null) { |
| ackedEvents.put(tid, oldvalue); |
| } |
| } |
| break; |
| } |
| } |
| } |
| if (wasEmpty) { |
| // Asif: If the wasEmpty flag is true this would mean that the |
| // dispatchedMessagesMap |
| // should not contain the threadIdToSeqID Map as it should have been |
| // removed by QRM thread |
| // Also because unless the map is actually put by the thread experiencing |
| // the |
| // wasEmpty flag true, the QRM thread cannot find it & hence cannot toggle |
| // it. |
| // Thus a condition where the internalMap being put by the dispatcher |
| // thread is stale ( that |
| // is already replaced by QRM thread ) cannot arise. Once the dispatcher |
| // thread |
| // has put the Map , then the QRM thread may empty it & replace it with a |
| // new Map |
| // Assert.assertTrue(!dispatchedMessagesMap.containsKey(this.regionName)); |
| // dispatchedMessagesMap.put(this.regionName, this.threadIdToSeqId); |
| Map tempDispatchedMessagesMap = dispatchedMessagesMap; |
| if (tempDispatchedMessagesMap != null) { |
| Object old = ((ConcurrentMap) tempDispatchedMessagesMap).putIfAbsent(this.regionName, |
| this.threadIdToSeqId); |
| if (isUsedByTest) { |
| testMarkerMessageReceived = true; |
| if (logger.isDebugEnabled()) { |
| logger.debug("testIsAckReceived: {}", testMarkerMessageReceived); |
| } |
| } |
| Assert.assertTrue(old == null); |
| } |
| |
| } |
| } |
| |
| public void createAckedEventsMap() { |
| |
| } |
| |
| public void setAckedEvents() { |
| |
| } |
| |
| /** |
| * Used for testing purposes only |
| * |
| * @return Map object |
| */ |
| public static Map getDispatchedMessagesMapForTesting() { |
| return Collections.unmodifiableMap(dispatchedMessagesMap); |
| } |
| |
| /** |
| * Used for testing purposes only |
| * |
| * @return Map object |
| */ |
| Map getConflationMapForTesting() { |
| return Collections.unmodifiableMap(this.indexes); |
| } |
| |
| public HARegionQueueStats getStatistics() { |
| return this.stats; |
| } |
| |
| /** |
| * Used for testing purposes only |
| * |
| * @return Map object containing DispatchedAndCurrentEvents object for a ThreadIdentifier |
| */ |
| Map getEventsMapForTesting() { |
| return Collections.unmodifiableMap(this.eventsMap); |
| } |
| |
| |
| |
| /** |
| * Used for testing purposes only. Returns the set of current counters for the given |
| * ThreadIdentifier |
| * |
| * @param id - the EventID object |
| * @return - the current counters set |
| */ |
| Set getCurrentCounterSet(EventID id) { |
| Set counters = null; |
| ThreadIdentifier tid = getThreadIdentifier(id); |
| DispatchedAndCurrentEvents wrapper = (DispatchedAndCurrentEvents) this.eventsMap.get(tid); |
| if (wrapper != null) { |
| synchronized (wrapper) { |
| if (wrapper.isCountersEmpty()) { |
| counters = Collections.emptySet(); |
| } else { |
| counters = Collections.unmodifiableSet(wrapper.counters.keySet()); |
| } |
| } |
| } |
| return counters; |
| } |
| |
| /** |
| * Used for testing purposes only. Returns the last dispatched sequenceId for the given |
| * ThreadIdentifier |
| * |
| * @param id - the EventID object |
| * @return - the current counters set |
| */ |
| long getLastDispatchedSequenceId(EventID id) { |
| ThreadIdentifier tid = getThreadIdentifier(id); |
| DispatchedAndCurrentEvents wrapper = (DispatchedAndCurrentEvents) this.eventsMap.get(tid); |
| return wrapper.lastDispatchedSequenceId; |
| } |
| |
| /** |
| * Used for testing purposes only |
| * |
| */ |
| Set getAvailableIds() { |
| acquireReadLock(); |
| try { |
| return Collections.unmodifiableSet(this.idsAvailable); |
| } finally { |
| releaseReadLock(); |
| } |
| } |
| |
| /** |
| * This method is invoked by the QRM to remove the IDs which have already been dispatched by the |
| * primary node. It sets the last dispatched sequence ID in the DACE iff the sequenceID received |
| * is more than the current. If the renoval emssage arrives before the DACE was created, it |
| * creates a DACE. Only one QRM operates at a time on a DACE & any other mesasge will be waiting |
| * for the current thread to exit. This is accomplished by taking a lock on QRM_LOCK object in the |
| * DACE. |
| * |
| * @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence |
| * Id |
| */ |
| protected void removeDispatchedEvents(EventID lastDispatched) |
| throws CacheException, InterruptedException { |
| ThreadIdentifier ti = getThreadIdentifier(lastDispatched); |
| long sequenceID = lastDispatched.getSequenceID(); |
| // get the DispatchedAndCurrentEvents object for this threadID |
| DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(ti); |
| if (dace != null && dace.lastDispatchedSequenceId < sequenceID) { |
| dace.setLastDispatchedIDAndRemoveEvents(sequenceID); |
| } else if (dace == null) { |
| dace = new DispatchedAndCurrentEvents(this); |
| dace.lastDispatchedSequenceId = sequenceID; |
| DispatchedAndCurrentEvents oldDace = |
| (DispatchedAndCurrentEvents) this.eventsMap.putIfAbsent(ti, dace); |
| if (oldDace != null) { |
| dace = oldDace; |
| if (dace.lastDispatchedSequenceId < sequenceID) { |
| dace.setLastDispatchedIDAndRemoveEvents(sequenceID); |
| } |
| } else { |
| // Add the recently added ThreadIdentifier to the RegionQueue for |
| // expiry |
| this.region.put(ti, dace.lastDispatchedSequenceId); |
| // update the stats |
| this.stats.incThreadIdentifiers(); |
| } |
| } |
| } |
| |
| /** |
| * Returns the size of the queue |
| * |
| * @return the size of the queue |
| */ |
| @Override |
| public int size() { |
| acquireReadLock(); |
| try { |
| return this.idsAvailable.size(); |
| } finally { |
| releaseReadLock(); |
| } |
| } |
| |
| void incrementTakeSidePutPermits() { |
| // nothing |
| } |
| |
| // called from dace on a put. |
| // Increment ha queue stats and cq stats |
| void entryEnqueued(Conflatable event) { |
| stats.incEventsEnqued(); |
| maintainCqStats(event, 1); |
| } |
| |
| private void maintainCqStats(Object event, long incrementAmount) { |
| CqService cqService = region.getGemFireCache().getCqService(); |
| if (cqService != null) { |
| try { |
| if (event instanceof HAEventWrapper) { |
| HAEventWrapper hw = (HAEventWrapper) event; |
| if (hw.getClientUpdateMessage() != null) { |
| event = hw.getClientUpdateMessage(); |
| } else { |
| event = (Conflatable) this.haContainer.get(event); |
| } |
| |
| |
| if (event instanceof ClientUpdateMessage) { |
| if (((ClientUpdateMessage) event).hasCqs() |
| && ((ClientUpdateMessage) event).hasCqs(clientProxyID)) { |
| CqNameToOp cqNames = ((ClientUpdateMessage) event).getClientCq(clientProxyID); |
| if (cqNames != null) { |
| for (String cqName : cqNames.getNames()) { |
| InternalCqQuery cq = |
| ((InternalCqQuery) cqService.getClientCqFromServer(clientProxyID, cqName)); |
| CqQueryVsdStats cqStats = cq.getVsdStats(); |
| if (cq != null && cqStats != null) { |
| cqStats.incNumHAQueuedEvents(incrementAmount); |
| } |
| } |
| } |
| } |
| } |
| } |
| } catch (Exception e) { |
| // catch exceptions that arise due to maintaining cq stats |
| // as maintaining cq stats should not affect the system. |
| if (logger.isTraceEnabled()) { |
| logger.trace("Exception while maintaining cq events stats.", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Caller must hold the rwLock. |
| * |
| * @return true if the queue contains objects |
| */ |
| boolean internalIsEmpty() { |
| return this.idsAvailable.isEmpty(); |
| } |
| |
| /** |
| * test hook to see if the queue is empty |
| * |
| * @return true if the queue is empty, false if not |
| */ |
| public boolean isEmpty() { |
| acquireReadLock(); |
| try { |
| return internalIsEmpty(); |
| } finally { |
| releaseReadLock(); |
| } |
| } |
| |
| /** |
| * Acquires the write Lock for the non blocking class. This method is overridden in the |
| * BlockingHARegionQueue class which acquires the lock on a ReentrantLock instead of |
| * ReentrantReadWriteLock of this class. A write lock is aquired by any thread which intends to |
| * modify the idsAvailable HashSet , which can be either a put ,remove , take , QRM message or |
| * expiry thread |
| * |
| * All invocations of this method need to have {@link #releaseWriteLock()} in a matching finally |
| * block. |
| * |
| * <p> |
| * author Asif |
| */ |
| void acquireWriteLock() { |
| this.writeLock.lock(); |
| } |
| |
| /** |
| * Acquires the read Lock for the non blocking class. This method is overridden in the |
| * BlockingHARegionQueue class which acquires the lock on a ReentrantLock instead of |
| * ReentrantReadWriteLock of this class. A read lock is aquired by a non blocking peek while |
| * operating on the idsAvailable LinkedHashSet without structurally modifying it. |
| * |
| * All invocations of this method must have {@link #releaseReadLock()} in a matching finally |
| * block. |
| * |
| * <p> |
| * author Asif |
| */ |
| void acquireReadLock() { |
| // TODO should this be interruptible? |
| this.readLock.lock(); |
| } |
| |
| /** |
| * Releases the Read lock. Overridden in the BlockingHARegionQueue class. |
| * |
| */ |
| void releaseReadLock() { |
| this.readLock.unlock(); |
| } |
| |
| /** |
| * Releases the write lock. Overridden in the BlockingHARegionQueue class. |
| * |
| */ |
| void releaseWriteLock() { |
| this.writeLock.unlock(); |
| } |
| |
| /** |
| * A no op. Overridden in the BlockingHARegionQueue class. |
| * |
| * <p> |
| * author Asif |
| */ |
| void notifyPeekAndTakeThreads() { |
| // NO Op for blocking queue |
| } |
| |
| /** |
| * Always returns false for a HARegionQueue class. Suitably overridden in BlockingHARegionQueue |
| * class. |
| * |
| * @return false for HAREgionQueue as this is a non blocking class |
| */ |
| boolean waitForData() throws InterruptedException { |
| return false; |
| } |
| |
| /** |
| * Utility method which extracts ThreadIdentifier from an EventID object |
| * |
| * @param eventId EventID object |
| * @return ThreadIdentifier object |
| */ |
| protected static ThreadIdentifier getThreadIdentifier(EventID eventId) { |
| return new ThreadIdentifier(eventId.getMembershipID(), eventId.getThreadID()); |
| } |
| |
| /** |
| * get the qrm thread for testing purposes |
| * |
| */ |
| static void stopQRMThread() { |
| qrmThread.shutdown(); |
| } |
| |
| /** |
| * Calls the createMessageList method of QueueRemovalThread for testing purposes |
| * |
| * @return message list for testing |
| */ |
| static List createMessageListForTesting() { |
| return qrmThread.createMessageList(); |
| } |
| |
| /** |
| * Creates a HARegionQueue object with default attributes Used by tests |
| * |
| * @param regionName uniquely identifies the HARegionQueue in the VM.For HARegionQueues across the |
| * VM to communicate with each other , the name should be identical |
| * @param cache Gemfire Cache instance |
| * @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking |
| * @return an instance of HARegionQueue |
| */ |
| public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, |
| final int haRgnQType, final boolean isDurable) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| Map container = null; |
| if (haRgnQType == HARegionQueue.BLOCKING_HA_QUEUE) { |
| container = new HAContainerMap(new ConcurrentHashMap()); |
| } else { |
| // Should actually be HAContainerRegion, but ok if only JUnits using this |
| // method. |
| container = new HashMap(); |
| } |
| |
| return getHARegionQueueInstance(regionName, cache, |
| HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haRgnQType, isDurable, container, null, |
| Handshake.CONFLATION_DEFAULT, false, Boolean.FALSE); |
| } |
| |
| /** |
| * Creates a HARegionQueue object with default attributes |
| * |
| * @param regionName uniquely identifies the HARegionQueue in the VM.For HARegionQueues across the |
| * VM to communicate with each other , the name should be identical |
| * @param cache Gemfire Cache instance |
| * @param hrqa HARegionQueueAttribute instance used for configuring the HARegionQueue |
| * @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking |
| * @param isPrimary whether this is the primary queue for the client |
| * @param canHandleDelta boolean indicating whether the HARegionQueue can handle delta or not |
| * @return an instance of HARegionQueue |
| */ |
| public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, |
| HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable, Map haContainer, |
| ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary, |
| boolean canHandleDelta) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| |
| HARegionQueue hrq = null; |
| switch (haRgnQType) { |
| case BLOCKING_HA_QUEUE: |
| if (!isDurable && !canHandleDelta) { |
| hrq = new BlockingHARegionQueue(regionName, cache, hrqa, haContainer, clientProxyId, |
| clientConflation, isPrimary); |
| } else { |
| hrq = new DurableHARegionQueue(regionName, cache, hrqa, haContainer, clientProxyId, |
| clientConflation, isPrimary); |
| } |
| break; |
| case NON_BLOCKING_HA_QUEUE: |
| hrq = new HARegionQueue(regionName, cache, haContainer, clientProxyId, clientConflation, |
| isPrimary); |
| break; |
| default: |
| throw new IllegalArgumentException( |
| String.format("haRgnQType can either be BLOCKING ( %s ) or NON BLOCKING ( %s )", |
| new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE})); |
| } |
| if (!isDurable) { |
| Optional<Integer> expiryTime = |
| getProductIntegerProperty(HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY); |
| hrqa.setExpiryTime(expiryTime.orElseGet(hrqa::getExpiryTime)); |
| ExpirationAttributes ea = |
| new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE); |
| hrq.region.getAttributesMutator().setEntryTimeToLive(ea); |
| } |
| return hrq; |
| } |
| |
| /** |
| * Creates a HARegionQueue object with default attributes. used by tests |
| * |
| * @return an instance of HARegionQueue |
| * @since GemFire 5.7 |
| */ |
| public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, |
| HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| Map container = null; |
| if (haRgnQType == HARegionQueue.BLOCKING_HA_QUEUE) { |
| container = new HAContainerMap(new ConcurrentHashMap()); |
| } else { |
| // Should actually be HAContainerRegion, but ok if only JUnits using this |
| // method. |
| container = new HashMap(); |
| } |
| |
| return getHARegionQueueInstance(regionName, cache, hrqa, haRgnQType, isDurable, container, null, |
| Handshake.CONFLATION_DEFAULT, false, Boolean.FALSE); |
| } |
| |
| public boolean isEmptyAckList() { |
| synchronized (this.threadIdToSeqId.list) { |
| return this.threadIdToSeqId.list.isEmpty(); |
| } |
| } |
| |
| public void closeClientCq(ClientProxyMembershipID clientId, InternalCqQuery cqToClose) { |
| acquireReadLock(); |
| try { |
| // Get all available Ids for the HA Region Queue |
| Object[] availableIds = this.availableIDsArray(); |
| int currSize = availableIds.length; |
| |
| Object event = null; |
| for (int i = 0; i < currSize; i++) { |
| Long counter = (Long) availableIds[i]; |
| event = this.region.get(counter); |
| HAEventWrapper wrapper = null; |
| if (event instanceof HAEventWrapper) { |
| wrapper = (HAEventWrapper) event; |
| event = this.haContainer.get(event); |
| } |
| |
| // Since this method is invoked in a readlock , the entry in HARegion |
| // cannot be null |
| if (event == null) { |
| Assert.assertTrue(this.destroyInProgress, |
| "Got event null when queue was not being destroyed"); |
| } |
| |
| if (event instanceof ClientUpdateMessageImpl) { |
| ClientUpdateMessageImpl updateEvent = (ClientUpdateMessageImpl) event; |
| updateEvent.removeClientCq(clientId, cqToClose); |
| // If no more interest and no more cqs remove from available ids and backing region |
| if (!updateEvent.hasCqs(clientId) && !updateEvent.isClientInterested(clientId)) { |
| if (wrapper != null) { |
| try { |
| if (this.destroyFromAvailableIDsAndRegion(counter)) { |
| stats.incEventsRemoved(); |
| } |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| } finally { |
| releaseReadLock(); |
| } |
| } |
| |
| public Object updateHAEventWrapper(InternalDistributedMember sender, |
| CachedDeserializable newValueCd, String regionName) { |
| Object inputValue; |
| try { |
| inputValue = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(), |
| sender.getVersionObject(), null); |
| newValueCd = new VMCachedDeserializable(inputValue, newValueCd.getSizeInBytes()); |
| } catch (IOException | ClassNotFoundException e) { |
| throw new RuntimeException("Unable to deserialize HA event for region " + regionName); |
| } |
| if (inputValue instanceof HAEventWrapper) { |
| HAEventWrapper inputHaEventWrapper = (HAEventWrapper) inputValue; |
| // Key was removed at sender side so not putting it into the HARegion |
| if (inputHaEventWrapper.getClientUpdateMessage() == null) { |
| return null; |
| } |
| // Getting the instance from singleton CCN..This assumes only one bridge |
| // server in the VM |
| HAContainerWrapper haContainer = |
| (HAContainerWrapper) CacheClientNotifier.getInstance().getHaContainer(); |
| if (haContainer == null) { |
| return null; |
| } |
| HAEventWrapper entryHaEventWrapper = null; |
| do { |
| ClientUpdateMessageImpl entryMessage = (ClientUpdateMessageImpl) haContainer |
| .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage()); |
| if (entryMessage != null) { |
| entryHaEventWrapper = (HAEventWrapper) haContainer.getKey(inputHaEventWrapper); |
| if (entryHaEventWrapper == null) { |
| continue; |
| } |
| synchronized (entryHaEventWrapper) { |
| if (entryHaEventWrapper == (HAEventWrapper) haContainer.getKey(entryHaEventWrapper)) { |
| entryHaEventWrapper.incAndGetReferenceCount(); |
| addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer, |
| regionName); |
| inputHaEventWrapper.setClientUpdateMessage(null); |
| newValueCd = |
| new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("GII Update of Event ID hash code: " + entryHaEventWrapper.hashCode() |
| + "; System ID hash code: " + System.identityHashCode(entryHaEventWrapper) |
| + "; Wrapper details: " + entryHaEventWrapper); |
| } |
| } else { |
| entryHaEventWrapper = null; |
| } |
| } |
| } else { // putIfAbsent successful |
| synchronized (inputHaEventWrapper) { |
| inputHaEventWrapper.incAndGetReferenceCount(); |
| inputHaEventWrapper.setHAContainer(haContainer); |
| inputHaEventWrapper.setClientUpdateMessage(null); |
| if (logger.isDebugEnabled()) { |
| logger.debug("GII Add of Event ID hash code: " + inputHaEventWrapper.hashCode() |
| + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper) |
| + "; Wrapper details: " + entryHaEventWrapper); |
| } |
| } |
| break; |
| } |
| // try until we either get a reference to HAEventWrapper from |
| // HAContainer or successfully put one into it. |
| } while (entryHaEventWrapper == null); |
| } |
| return newValueCd; |
| } |
| |
| /** |
| * This is an implementation of RegionQueue where peek() & take () are blocking operation and will |
| * not return unless it gets some legitimate value The Lock object used by this class is a |
| * ReentrantLock & not a ReadWriteLock as in the base class. This reduces the concurrency of peek |
| * operations, but it enables the condition object of the ReentrantLock used to guard the |
| * idsAvailable Set for notifying blocking peek & take operations. Previously a separate Lock |
| * object was used by the BlockingQueue for wait notify. This class will be performant if there is |
| * a single peek thread. |
| */ |
| private static class BlockingHARegionQueue extends HARegionQueue { |
| /** |
| * Guards the Put permits |
| */ |
| private final Object putGuard = new Object(); |
| |
| private final int capacity; |
| |
| /** |
| * Current put permits available |
| */ |
| private int putPermits; |
| |
| /** |
| * Current take permits available |
| */ |
| private int takeSidePutPermits = 0; |
| |
| /** |
| * Lock on which the put thread waits for permit & on which take/remove thread issue notify |
| */ |
| private final Object permitMon = new Object(); |
| |
| // Lock on which the take & remove threads block awaiting data from put |
| // operations |
| private final StoppableReentrantLock lock; |
| |
| /** |
| * Condition object on which peek & take threads will block |
| */ |
| protected final StoppableCondition blockCond; |
| |
| /** |
| * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can |
| * be set |
| * @param isPrimary whether this is the primary queue for a client |
| */ |
| protected BlockingHARegionQueue(String regionName, InternalCache cache, |
| HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, |
| final byte clientConflation, boolean isPrimary) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| super(regionName, cache, haContainer, clientProxyId, clientConflation, isPrimary); |
| this.capacity = hrqa.getBlockingQueueCapacity(); |
| this.putPermits = this.capacity; |
| this.lock = new StoppableReentrantLock(this.region.getCancelCriterion()); |
| this.blockCond = lock.newCondition(); |
| |
| super.putGIIDataInRegion(); |
| if (this.getClass() == BlockingHARegionQueue.class) { |
| initialized.set(true); |
| } |
| } |
| |
| @Override |
| public void destroy() throws CacheWriterException { |
| try { |
| super.destroy(); |
| } finally { |
| synchronized (this.permitMon) { |
| this.permitMon.notifyAll(); // fix for bug 37581 |
| } |
| } |
| } |
| |
| /** |
| * Checks whether a put operation should block or proceed based on the capacity constraint of |
| * the Queue. Initially it checks only via Put Side Put Permits. If it is exhausted it checks |
| * the take side put permits using the appropriate lock. If the queue's capacity is exhausted |
| * then the thread goes into a finite wait state holding the Put Guard lock for the duration of |
| * EVENT_ENQUEUE_WAIT_TIME after which the thread proceeds to enqueue the event, allowing the |
| * putPermits go negative, if needed. Thus, no put thread is blocked for more than |
| * EVENT_ENQUEUE_WAIT_TIME. |
| * <p> |
| * This effectively makes the blocking queue behave like a non-blocking queue which throttles |
| * puts if it reaches its capacity. This was changed in 8.1, see #51400. This function is NOOP |
| * in the HARegionQueue. |
| */ |
| @Override |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT") |
| void checkQueueSizeConstraint() throws InterruptedException { |
| if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| synchronized (this.putGuard) { |
| if (putPermits <= 0) { |
| synchronized (this.permitMon) { |
| if (reconcilePutPermits() <= 0) { |
| if (region.getSystem().getConfig().getRemoveUnresponsiveClient()) { |
| isClientSlowReceiver = true; |
| } else { |
| try { |
| long logFrequency = CacheClientNotifier.DEFAULT_LOG_FREQUENCY; |
| CacheClientNotifier ccn = CacheClientNotifier.getInstance(); |
| if (ccn != null) { // check needed for junit tests |
| logFrequency = ccn.getLogFrequency(); |
| } |
| if ((this.maxQueueSizeHitCount % logFrequency) == 0) { |
| logger.warn("Client queue for {} client is full.", |
| new Object[] {region.getName()}); |
| this.maxQueueSizeHitCount = 0; |
| } |
| ++this.maxQueueSizeHitCount; |
| this.region.checkReadiness(); // fix for bug 37581 |
| // TODO: wait called while holding two locks |
| this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime); |
| this.region.checkReadiness(); // fix for bug 37581 |
| // Fix for #51400. Allow the queue to grow beyond its |
| // capacity/maxQueueSize, if it is taking a long time to |
| // drain the queue, either due to a slower client or the |
| // deadlock scenario mentioned in the ticket. |
| reconcilePutPermits(); |
| if ((this.maxQueueSizeHitCount % logFrequency) == 1) { |
| logger.info("Resuming with processing puts ..."); |
| } |
| } catch (InterruptedException ex) { |
| // TODO: The line below is meaningless. Comment it out later |
| this.permitMon.notifyAll(); |
| throw ex; |
| } |
| } |
| } |
| } // synchronized (this.permitMon) |
| } // if (putPermits <= 0) |
| --putPermits; |
| } // synchronized (this.putGuard) |
| } |
| } |
| |
| /** |
| * This function should always be called under a lock on putGuard & permitMon obejct |
| * |
| * @return int current Put permits |
| */ |
| private int reconcilePutPermits() { |
| putPermits += takeSidePutPermits; |
| takeSidePutPermits = 0; |
| return putPermits; |
| } |
| |
| /** |
| * Implemented to reduce contention between concurrent take/remove operations and put . The |
| * reconciliation between take side put permits & put side put permits happens only if theput |
| * side put permits are exhausted. In HARehionQueue base class this is a NOOP function. This was |
| * added in case a put operation which has reduced the put permit optmistically but due to some |
| * reason ( most likely because of duplicate event) was not added in the queue. In such case it |
| * will increment take side permit without notifying any waiting thread |
| */ |
| @Override |
| void incrementTakeSidePutPermitsWithoutNotify() { |
| synchronized (this.permitMon) { |
| ++this.takeSidePutPermits; |
| } |
| } |
| |
| /** |
| * Implemented to reduce contention between concurrent take/remove operations and put . The |
| * reconciliation between take side put permits & put side put permits happens only if theput |
| * side put permits are exhausted. In HARehionQueue base class this is a NOOP function |
| */ |
| @Override |
| void incrementTakeSidePutPermits() { |
| if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 |
| synchronized (this.permitMon) { |
| ++this.takeSidePutPermits; |
| this.permitMon.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Identical to the acquireReadLock as there is only one type of Lock object in this class. |
| */ |
| @Override |
| void acquireWriteLock() { |
| this.lock.lock(); |
| } |
| |
| /** |
| * Identical to the acquireWriteLock as there is only one type of Lock object in this class. |
| */ |
| @Override |
| void acquireReadLock() { |
| this.lock.lock(); |
| } |
| |
| /** |
| * This method is called by the publish method when a valid Long position is added to the |
| * idsAvailable set. It should always be called after acquiring the ReentrantLock. It notifies |
| * the waiting peek & take threads. |
| * |
| * <p> |
| * author Asif |
| */ |
| @Override |
| void notifyPeekAndTakeThreads() { |
| blockCond.signalAll(); |
| } |
| |
| /** |
| * Returns true if data is available in the queue. This method should always be invoked after |
| * acquiring the lock on ReentrantLock object. It blocks the thread if the queue is empty or |
| * returns true otherwise . This will always return true indicating that data is available for |
| * retrieval or throw an Exception.It can never return false. |
| */ |
| @Override |
| boolean waitForData() throws InterruptedException { |
| while (this.internalIsEmpty()) { |
| region.getCache().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.currentThread().isInterrupted(); |
| try { |
| blockCond.await(StoppableCondition.TIME_TO_WAIT); |
| } catch (InterruptedException ie) { |
| interrupted = true; |
| region.getCache().getCancelCriterion().checkCancelInProgress(ie); |
| throw new TimeoutException(ie); |
| } finally { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Noop method to prevent HARegionQueue population before the constructor of BlockingQueue is |
| * complete. |
| * |
| * <p> |
| * author Asif |
| */ |
| @Override |
| void putGIIDataInRegion() { |
| |
| } |
| |
| /** |
| * Identical to the releaseWriteLock as there is only one type of Lock object in this class. |
| * |
| */ |
| |
| @Override |
| void releaseReadLock() { |
| this.lock.unlock(); |
| } |
| |
| /** |
| * Identical to the releaseReadLock as there is only one type of Lock object in this class. |
| * |
| * <p> |
| * author Asif |
| * |
| */ |
| |
| @Override |
| void releaseWriteLock() { |
| this.lock.unlock(); |
| } |
| |
| } |
| |
| private static class DurableHARegionQueue extends BlockingHARegionQueue { |
| |
| private LinkedHashSet durableIDsList = null; |
| LinkedList unremovedElements = null; |
| HashMap currDurableMap = null; |
| |
| protected DurableHARegionQueue(String regionName, InternalCache cache, |
| HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, |
| final byte clientConflation, boolean isPrimary) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary); |
| |
| this.threadIdToSeqId.keepPrevAcks = true; |
| this.durableIDsList = new LinkedHashSet(); |
| this.ackedEvents = new HashMap(); |
| this.initialized.set(true); |
| |
| } |
| |
| @Override |
| boolean waitForData() throws InterruptedException { |
| region.getCache().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.currentThread().isInterrupted(); |
| try { |
| blockCond.await(StoppableCondition.TIME_TO_WAIT); |
| } catch (InterruptedException ie) { |
| interrupted = true; |
| region.getCache().getCancelCriterion().checkCancelInProgress(ie); |
| throw new TimeoutException(ie); |
| } finally { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| return !this.internalIsEmpty(); |
| } |
| |
| @Override |
| protected Object getNextAvailableIDFromList() throws InterruptedException { |
| return this.getAndRemoveNextAvailableID(); |
| } |
| |
| /** |
| * It is different from its super implementation only in not invoking |
| * incrementTakeSidePutPermits(). Fix for #41521. |
| */ |
| @Override |
| protected Long getAndRemoveNextAvailableID() throws InterruptedException { |
| Long next = null; |
| acquireWriteLock(); |
| try { |
| if (this.idsAvailable.isEmpty()) { |
| if (waitForData()) { |
| Iterator itr = this.idsAvailable.iterator(); |
| next = (Long) itr.next(); |
| itr.remove(); |
| } |
| } else { |
| Iterator itr = this.idsAvailable.iterator(); |
| next = (Long) itr.next(); |
| itr.remove(); |
| } |
| } finally { |
| releaseWriteLock(); |
| } |
| return next; |
| } |
| |
| @Override |
| protected void storePeekedID(Long id) { |
| acquireWriteLock(); |
| try { |
| this.durableIDsList.add(id); |
| } finally { |
| releaseWriteLock(); |
| } |
| } |
| |
| @Override |
| protected boolean checkPrevAcks() { |
| this.unremovedElements = new LinkedList(); |
| this.currDurableMap = new HashMap(); |
| synchronized (this.threadIdToSeqId.list) { |
| // ARB: Construct map of threadIdToSeqId from the list of maps made by |
| // QRM. |
| // This list of maps is combined into one map for efficiency. |
| while (!this.threadIdToSeqId.list.isEmpty()) { |
| this.currDurableMap.putAll((Map) this.threadIdToSeqId.list.remove(0)); |
| } |
| } |
| return !this.currDurableMap.isEmpty(); |
| } |
| |
| @Override |
| protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid, |
| long sequenceId) { |
| if (this.currDurableMap.isEmpty()) { |
| // ARB: If no acked events are found, do not remove peeked counter from |
| // HARegion. |
| this.unremovedElements.add(counter); |
| return false; |
| } |
| |
| Long seqId = (Long) this.currDurableMap.get(threadid); |
| if (seqId != null) { |
| if ((Long.valueOf(sequenceId)).compareTo(seqId) > 0) { |
| // ARB: The acked event is older than the current peeked event. |
| this.unremovedElements.add(counter); |
| return false; |
| } |
| } else { |
| // ARB: No events from current threadID were acked. |
| this.unremovedElements.add(counter); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| @Override |
| protected void setPeekedEvents() { |
| // ARB: Set peeked events list to the events that were not acked by client. |
| HARegionQueue.peekedEventsContext.set(unremovedElements.isEmpty() ? null : unremovedElements); |
| this.unremovedElements = null; |
| this.currDurableMap = null; |
| } |
| |
| /** |
| * Caller must hold the rwLock |
| */ |
| @Override |
| protected boolean removeFromOtherLists(Long position) { |
| return this.durableIDsList.remove(position); |
| } |
| |
| @Override |
| public void initializeTransients() { |
| // ARB: Durable client specific data structures for dispatcher. |
| // durableIDsList: list of counters from which dispatcher peeks. Used instead of availableIds |
| // list, |
| // because the iterator for peeks requires a list that is not modified during iteration. |
| // threadIdToSeqId.list: list of threadIdToSeqId maps. These maps are constructed from the |
| // acks that are sent by client - each map corresponds to an ack message. |
| |
| if (!this.durableIDsList.isEmpty()) { |
| this.acquireWriteLock(); |
| try { |
| |
| // ARB: The following addAll() op is expensive. @todo |
| // Can be optimized by removing addAll and changing usage of idsAvailable: |
| // Use durableIDsList before idsAvailable, i.e. first peek() from durableIDsList, |
| // followed by idsAvailable. Similarly, destroy() operations should remove id from |
| // either durableIDsList or idsAvailable. |
| |
| long start = System.currentTimeMillis(); |
| this.durableIDsList.addAll(this.idsAvailable); |
| this.idsAvailable = this.durableIDsList; |
| this.durableIDsList = new LinkedHashSet(); |
| long end = System.currentTimeMillis(); |
| if ((end - start) > 3000) { |
| logger.warn("Durable client queue initialization took {} ms.", |
| Long.toString(end - start)); |
| } |
| } finally { |
| this.releaseWriteLock(); |
| } |
| } |
| /* |
| * Setting this threadlocal variable to null has no use as the current thread never uses it. |
| * Instead it should really be set null by message dispatcher thread while starting or |
| * resuming. This was added in revision 20914. Need to check if it really needs to be thread |
| * local. |
| */ |
| peekedEventsContext.set(null); |
| this.threadIdToSeqId.list.clear(); |
| } |
| |
| /** |
| * Returns the size of the idsAvailable Set |
| * |
| * Caller must hold the rwLock |
| */ |
| @Override |
| protected int availableIDsSize() { |
| return this.idsAvailable.size() + this.durableIDsList.size(); |
| } |
| |
| /** |
| * Caller must hold rwLock |
| */ |
| @Override |
| protected Object[] availableIDsArray() { |
| // ARB: potentially expensive operation. |
| LinkedList retVal = new LinkedList(); |
| retVal.addAll(this.durableIDsList); |
| retVal.addAll(idsAvailable); |
| return retVal.toArray(); |
| } |
| |
| @Override |
| public int size() { |
| acquireReadLock(); |
| try { |
| return this.idsAvailable.size() + this.durableIDsList.size(); |
| } finally { |
| releaseReadLock(); |
| } |
| } |
| |
| @Override |
| public void createAckedEventsMap() { |
| ackedEvents = new HashMap(); |
| } |
| |
| @Override |
| public void setAckedEvents() { |
| if (threadIdToSeqId.keepPrevAcks) { |
| synchronized (threadIdToSeqId.list) { |
| threadIdToSeqId.list.add(ackedEvents); |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * Use caution while using it! |
| */ |
| public void clearPeekedIDs() { |
| peekedEventsContext.set(null); |
| } |
| |
| /** |
| * A static class which is created only for for testing prposes as some existing tests extend the |
| * HARegionQueue. Since the constructors of HAregionQueue are private , this class can act as a |
| * bridge between the user defined HARegionQueue class & the actual class. This class object will |
| * be buggy as it will tend to publish the Object o QRM thread & the expiry thread before the |
| * complete creation of the HARegionQueue instance |
| */ |
| static class TestOnlyHARegionQueue extends HARegionQueue { |
| /** |
| * Overloaded constructor to accept haContainer. |
| * |
| * @since GemFire 5.7 |
| */ |
| TestOnlyHARegionQueue(String regionName, InternalCache cache, Map haContainer) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haContainer, |
| Handshake.CONFLATION_DEFAULT, false); |
| this.initialized.set(true); |
| } |
| |
| TestOnlyHARegionQueue(String regionName, InternalCache cache) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, new HashMap(), |
| Handshake.CONFLATION_DEFAULT, false); |
| } |
| |
| TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, |
| Map haContainer, final byte clientConflation, boolean isPrimary) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| super(regionName, cache, haContainer, null, clientConflation, isPrimary); |
| ExpirationAttributes ea = |
| new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE); |
| this.region.setOwner(this); |
| this.region.getAttributesMutator().setEntryTimeToLive(ea); |
| this.initialized.set(true); |
| } |
| |
| /** |
| * Overloaded constructor to pass an {@code HashMap} instance as a haContainer. |
| * |
| * @since GemFire 5.7 |
| */ |
| TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| this(regionName, cache, hrqa, new HashMap(), Handshake.CONFLATION_DEFAULT, false); |
| } |
| } |
| |
| /** |
| * This thread will check for messages which have been dispatched. After a configurable time or |
| * size is reached, it will create a new {@code QueueRemovalMessage} and send it to all the nodes |
| * in the DistributedSystem |
| */ |
| private static class QueueRemovalThread extends Thread { |
| |
| /** |
| * boolean to make a shutdown request |
| */ |
| private volatile boolean shutdown = false; |
| |
| private final InternalCache cache; |
| |
| /** |
| * Constructor : Creates and initializes the thread |
| */ |
| public QueueRemovalThread(InternalCache cache) { |
| this.setDaemon(true); |
| this.cache = cache; |
| } |
| |
| private boolean checkCancelled() { |
| if (shutdown) { |
| return true; |
| } |
| if (cache.getCancelCriterion().isCancelInProgress()) { |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * The thread will check the dispatchedMessages map for messages that have been dispatched. It |
| * will create a new {@code QueueRemovalMessage} and send it to the other nodes |
| */ |
| @Override |
| public void run() { |
| InternalDistributedSystem ids = cache.getInternalDistributedSystem(); |
| DistributionManager dm = ids.getDistributionManager(); |
| |
| try { // ensure exit message is printed |
| // Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000); |
| for (;;) { |
| try { // be somewhat tolerant of failures |
| if (checkCancelled()) { |
| break; |
| } |
| |
| // TODO : make the thread running time configurable |
| boolean interrupted = Thread.interrupted(); |
| try { |
| synchronized (this) { |
| this.wait(messageSyncInterval * 1000); |
| } |
| } catch (InterruptedException e) { |
| interrupted = true; |
| if (checkCancelled()) { |
| break; |
| } |
| logger.warn("InterruptedException occurred in QueueRemovalThread while waiting ", |
| e); |
| break; // desperation...we must be trying to shut down...? |
| } finally { |
| // Not particularly important since we're exiting the thread, |
| // but following the pattern is still good practice... |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("QueueRemovalThread about to query the message list"); |
| } |
| List queueRemovalMessageList = this.createMessageList(); |
| if (queueRemovalMessageList != null && !queueRemovalMessageList.isEmpty()) { // messages |
| // exist |
| QueueRemovalMessage qrm = new QueueRemovalMessage(); |
| qrm.resetRecipients(); |
| List<CacheServer> servers = this.cache.getCacheServers(); |
| List<DistributedMember> recipients = new LinkedList(); |
| for (CacheServer server : servers) { |
| recipients.addAll(CacheServerImpl.class.cast(server).getCacheServerAdvisor() |
| .adviseBridgeServers()); |
| } |
| qrm.setRecipients(recipients); |
| qrm.setMessagesList(queueRemovalMessageList); |
| dm.putOutgoing(qrm); |
| } // messages exist |
| } // be somewhat tolerant of failures |
| catch (CancelException ignore) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("QueueRemovalThread is exiting due to cancellation"); |
| } |
| break; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| if (checkCancelled()) { |
| break; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("QueueRemovalThread: ignoring exception", t); |
| } |
| } |
| } // for |
| } // ensure exit message is printed |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("QueueRemovalThread exiting due to cancellation: ", e); |
| } |
| } finally { |
| logger.info("The QueueRemovalThread is done."); |
| } |
| } |
| |
| /** |
| * Creates a list containing the eventIds that have been dispatched by the clients. The QRM |
| * thread while operating on the MapWrapper for a given region , sets a new map object so that |
| * put operations are not blocked while the QRM thread is iterating over the map contained in |
| * MapWrapper & the put operations will continue using the new internal Ma. |
| * |
| */ |
| protected List createMessageList() { |
| Map.Entry entry = null; |
| Map.Entry internalEntry = null; |
| MapWrapper threadIdToSequenceIdMap = null; |
| String regionName = null; |
| ThreadIdentifier tid = null; |
| Long sequenceId = null; |
| EventID eventId = null; |
| List queueRemovalMessageList = new LinkedList(); |
| Iterator internalIterator = null; |
| Iterator iterator = dispatchedMessagesMap.entrySet().iterator(); |
| while (iterator.hasNext()) { |
| entry = (Map.Entry) iterator.next(); |
| regionName = (String) entry.getKey(); // key will be the string |
| // containing |
| // the region name |
| queueRemovalMessageList.add(regionName);// add region name to the list |
| // then add the number of event ids that will follow and finally the |
| // event |
| // ids themselves |
| threadIdToSequenceIdMap = (MapWrapper) entry.getValue(); |
| // take a lock since dispatcher can be adding at the same time |
| // Asif: After taking the lock , change the underlying Map object |
| // So that the dispatcher thread does not see the old Map object |
| Map internalMap = threadIdToSequenceIdMap.map; |
| synchronized (internalMap) { |
| // remove the current threadID toSequenceMap entry from the |
| // dispatchedEvenstMap |
| // within the lock |
| iterator.remove(); |
| threadIdToSequenceIdMap.map = new HashMap(); |
| } |
| // ARB: Add map to removal list for durable client queues. |
| /* |
| * if (threadIdToSequenceIdMap.keepPrevAcks) { synchronized (threadIdToSequenceIdMap.list) { |
| * threadIdToSequenceIdMap.list.add(internalMap); } } |
| */ |
| // first add the size within the lock |
| queueRemovalMessageList.add(internalMap.size()); |
| internalIterator = internalMap.entrySet().iterator(); |
| // then add the event ids to the message list within the lock |
| while (internalIterator.hasNext()) { |
| internalEntry = (Map.Entry) internalIterator.next(); |
| tid = (ThreadIdentifier) internalEntry.getKey(); |
| sequenceId = (Long) internalEntry.getValue(); |
| eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId); |
| queueRemovalMessageList.add(eventId); |
| } |
| } |
| return queueRemovalMessageList; |
| } |
| |
| /** |
| * shutdown this thread and the caller thread will join this thread |
| * |
| */ |
| public void shutdown() { |
| this.shutdown = true; |
| this.interrupt(); |
| |
| // Asif:Do not exit till QRM thread is dead , else the QRM thread |
| // may operate on Null dispatchedMessagesMap //Bug 37046 |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.join(15 * 1000); |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (this.isAlive()) { |
| logger.warn("QueueRemovalThread ignored cancellation"); |
| } |
| } |
| } |
| |
| /** |
| * Class which keeps track of the positions ( keys) of underlying Region object for the events |
| * placed in the Queue. It also keeps track of the last sequence ID dispatched. Thus all the |
| * events with sequence ID less than that dispatched are eligible for removal |
| */ |
| public static class DispatchedAndCurrentEvents implements DataSerializableFixedID, Serializable { |
| /** |
| * Keeps the track of last dispatched sequence ID. This field should be updated by the |
| * Dispatcher thread or the QRM message |
| */ |
| protected transient volatile long lastDispatchedSequenceId = -1L; |
| |
| /** Indicates if this object has expired* */ |
| private static final int TOKEN_DESTROYED = -2; |
| |
| /** |
| * Counters corresponding to this ThreadIdentifier. Note that LinkedHashMap is used instead of |
| * LinkedHashSet to save some memory. All we really put into this map is the key. This field is |
| * null until the first add. |
| */ |
| protected transient LinkedHashMap<Long, Object> counters; |
| |
| private transient volatile Object QRM_LOCK = new Object(); |
| |
| /** the queue that owns this DACE */ |
| transient HARegionQueue owningQueue; |
| |
| /** set to true if this was transferred from another VM during GII */ |
| transient boolean isGIIDace; |
| |
| public DispatchedAndCurrentEvents(HARegionQueue owner) { |
| this.owningQueue = owner; |
| } |
| |
| /** |
| * for deserialization. To be usable, the owningQueue of a DACE created with this method must be |
| * established. That isn't done by deserialization. |
| */ |
| public DispatchedAndCurrentEvents() {} |
| |
| /** |
| * Used for debugging purpose to ensure that in no situation , for a given ThreadIdentifier the |
| * order gets violated |
| */ |
| protected volatile long lastSequenceIDPut = INIT_OF_SEQUENCEID; |
| |
| /** |
| * This method adds to the conflation map & counters set if the add operation has not yet been |
| * dispatched. Also it is the responsibility of this method to remove the old conflated entry. |
| * As such we can assume only one thread can enter this function at a time because this DACE |
| * object gets created for every ThreadIdentifier . And a given thread ( corresponding to this |
| * thread identifier) is doing operation in sequence & a new add operation in DACE cannot happen |
| * till the old one is done. |
| * |
| * @param event Object to be added to the queue |
| * @param sequenceID Sequence ID of the event originating from a unqiue thread identified by its |
| * ThreadIdentifier |
| */ |
| protected boolean putObject(Conflatable event, long sequenceID) |
| throws CacheException, InterruptedException { |
| Long oldPosition = null; |
| final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE); |
| if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID |
| && !owningQueue.puttingGIIDataInQueue) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, |
| "HARegionQueue::DACE:putObject: Given sequence ID is already present ({}).\nThis may be a recovered operation via P2P or a GetInitialImage.\nlastSequenceIDPut = {} ; event = {};\n", |
| sequenceID, lastSequenceIDPut, event); |
| } |
| |
| boolean rejected = false; |
| Conflatable eventInHARegion = null; |
| |
| synchronized (this) { |
| if (sequenceID > this.lastSequenceIDPut) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("HARegionQueue.putObject: adding {}", event); |
| } |
| this.lastSequenceIDPut = sequenceID; |
| } else if (!owningQueue.puttingGIIDataInQueue) { |
| if (isDebugEnabled_BS) { |
| logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, |
| "{} eliding event with ID {}, because it is not greater than the last sequence ID ({}). The rejected event has key <{}> and value <{}>", |
| this, event.getEventId(), this.lastSequenceIDPut, event.getKeyToConflate(), |
| event.getValueToConflate()); |
| } |
| owningQueue.stats.incNumSequenceViolated(); |
| |
| // increase take side put permits instead of increasing put side permits |
| owningQueue.incrementTakeSidePutPermits();// WithoutNotify(); |
| CacheClientNotifier ccn = CacheClientNotifier.getInstance(); |
| if (ccn != null) { |
| ccn.getClientProxy(owningQueue.clientProxyID).getStatistics().incMessagesFailedQueued(); |
| } |
| return true; |
| } |
| |
| if (lastDispatchedSequenceId == TOKEN_DESTROYED) { |
| return false; |
| } |
| |
| if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) { |
| // Insert the object into the Region |
| Long position = owningQueue.tailKey.incrementAndGet(); |
| |
| eventInHARegion = owningQueue.putEventInHARegion(event, position); |
| |
| // Add the position counter to the LinkedHashSet |
| if (this.counters == null) { |
| this.counters = new LinkedHashMap<Long, Object>(); |
| } |
| this.counters.put(position, null); |
| |
| // Check if the event is conflatable |
| if (owningQueue.shouldBeConflated(eventInHARegion)) { |
| // Add to the conflation map & get the position of the |
| // old conflatable entry. The old entry may have inserted by the |
| // same |
| // ThreadIdentifier or different one. |
| oldPosition = owningQueue.addToConflationMap(eventInHARegion, position); |
| } |
| |
| // Take the size lock & add to the list of availabelIds |
| // TODO: Asif : To implement blocking peek & take , ideally notify |
| // should be issued |
| // from this block & hence this function should be appropriately |
| // overridden |
| owningQueue.publish(position); |
| } else { |
| rejected = true; |
| } |
| } |
| if (rejected) { |
| owningQueue.incrementTakeSidePutPermits();// WithoutNotify(); |
| CacheClientNotifier ccn = CacheClientNotifier.getInstance(); |
| if (ccn != null) { |
| ccn.getClientProxy(owningQueue.clientProxyID).getStatistics().incMessagesFailedQueued(); |
| } |
| } else { |
| owningQueue.entryEnqueued(eventInHARegion); |
| } |
| // Remove the old conflated position |
| if (oldPosition != null) { |
| // Obtain the DispatchedAndCurrentEvents object |
| Conflatable old = (Conflatable) owningQueue.region.get(oldPosition); |
| if (old != null) { |
| ThreadIdentifier oldTi = HARegionQueue.getThreadIdentifier(old.getEventId()); |
| DispatchedAndCurrentEvents oldDace = |
| (DispatchedAndCurrentEvents) owningQueue.eventsMap.get(oldTi); |
| if (oldDace != null) { |
| oldDace.removeOldConflatedEntry(oldPosition); |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Destroys the the old entry ( which got replaced by the new entry due to conflation) from the |
| * availableIDs , Region & Counters set. Since this is executed within a sync block by the new |
| * entry thread, it is guaranteed that the old entry thread will exit first , placing the |
| * position etc in the available IDs set. Also the new entry thread & old entry thread are |
| * belonging to different ThreadIdentifier objects & hence hold different |
| * DispatchedAndCurrentEvents object. |
| */ |
| private void removeOldConflatedEntry(Long oldPosition) |
| throws CacheException, InterruptedException { |
| synchronized (this) { |
| Conflatable conflatable = (Conflatable) owningQueue.region.get(oldPosition); |
| if (owningQueue.destroyFromAvailableIDsAndRegion(oldPosition)) { |
| if (this.counters != null) { |
| this.counters.remove(oldPosition); |
| } |
| // <HA overflow> |
| if (conflatable instanceof HAEventWrapper) { |
| owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable, |
| "Remove Old Conflated Entry"); |
| } |
| // </HA overflow> |
| // update statistics |
| |
| // Fix for bug 39291: |
| // Since markers are always conflated regardless of the conflation |
| // setting and they are not normal (are internal) events, we should |
| // not bump the events-conflated stat for markers. |
| if (!(conflatable instanceof ClientMarkerMessageImpl)) { |
| owningQueue.stats.incEventsConflated(); |
| } else { |
| owningQueue.stats.incMarkerEventsConflated(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Removes the Entry from the Counters Set contained in DACE & from the conflation Map. This |
| * method should be invoked only if the removal from available ID set returns true. |
| * |
| * @param position Long position to be removed from the Counter Set |
| * @param key Object used as the key in the conflation Map |
| * @param rName String region name against which the conflation map is stored |
| */ |
| protected void destroy(Long position, Object key, String rName) { |
| this.destroy(position); |
| // Remove from conflation Map if the position in the conflation map is the |
| // position |
| // that is passed |
| ConcurrentMap conflationMap = (ConcurrentMap) owningQueue.indexes.get(rName); |
| Assert.assertTrue(conflationMap != null); |
| conflationMap.remove(key, position); |
| } |
| |
| /** |
| * Removes the Entry from the Counters Set contained in DACE |
| */ |
| protected synchronized void destroy(Long position) { |
| if (this.counters != null) { |
| this.counters.remove(position); |
| } |
| } |
| |
| private synchronized boolean isCountersEmpty() { |
| return this.counters == null || this.counters.isEmpty(); |
| } |
| |
| /** |
| * Invoked by the Cache Listner attached on the HARegion when the entries experience expiry. |
| * This callabck is used to check whether DispatchedAndCurrentEvens object for a |
| * ThreadIdentifier is eligible for removal or not. A DACE object is removed if the last |
| * dispatched sequenec Id matches the expVal & the size of the counters set is 0 |
| * |
| * @param expVal long value indicating the sequence with which the ThreadIdentifier was last |
| * updated for expiry. |
| * @param ti ThreadIdentifier object corresponding to the thread which is being expired ( whose |
| * DispatchedAndCurrent Events object is being removed) |
| * @return boolean true if the ThreadIdentifier object for a given DACE was expired . |
| */ |
| protected boolean expireOrUpdate(long expVal, ThreadIdentifier ti) { |
| // Check if the object is a candidate for expiry |
| boolean expired = false; |
| synchronized (this) { |
| if (expVal == this.lastDispatchedSequenceId && isCountersEmpty()) { |
| try { |
| // Remove the ThreadIdentifier from the Region which was added for |
| // expiry |
| owningQueue.destroyFromQueue(ti); |
| this.lastDispatchedSequenceId = TOKEN_DESTROYED; |
| owningQueue.eventsMap.remove(ti); |
| expired = true; |
| this.owningQueue.getStatistics().decThreadIdentifiers(); |
| } catch (RegionDestroyedException e) { |
| if (!owningQueue.destroyInProgress && logger.isDebugEnabled()) { |
| logger.debug( |
| "DispatchedAndCurrentEvents::expireOrUpdate: Queue found destroyed while removing expiry entry for ThreadIdentifier={} and expiry value={}", |
| ti, expVal, e); |
| } |
| } catch (EntryNotFoundException enfe) { |
| if (!owningQueue.destroyInProgress) { |
| logger.error( |
| "DispatchedAndCurrentEvents::expireOrUpdate: Unexpectedly encountered exception while removing expiry entry for ThreadIdentifier={} and expiry value={}", |
| new Object[] {ti, expVal, enfe}); |
| } |
| } |
| } |
| } |
| if (!expired) { |
| try { |
| // Update the entry with latest sequence ID |
| owningQueue.region.put(ti, this.lastDispatchedSequenceId); |
| } catch (CancelException e) { |
| throw e; |
| } catch (Exception e) { |
| if (!owningQueue.destroyInProgress) { |
| logger.error(String.format( |
| "DispatchedAndCurrentEvents::expireOrUpdate: Unexpectedly encountered exception while updating expiry ID for ThreadIdentifier=%s", |
| ti), |
| e); |
| } |
| } |
| } |
| return expired; |
| } |
| |
| /** |
| * Invoked by the by the QRM message . This method sets the LastDispatched sequence ID in the |
| * DACE & destroys all the sequence Ids which are less than the last dispatched sequence ID. At |
| * a time only one thread operates on it which is accomplished by the QRM_LOCK. The lock on the |
| * DACE is minimized by copying the Counters Set & then identifying the positions which need to |
| * be removed |
| * |
| * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE |
| */ |
| protected void setLastDispatchedIDAndRemoveEvents(long lastDispatchedSeqId) |
| throws CacheException, InterruptedException { |
| Long[] countersCopy = null; |
| synchronized (this.QRM_LOCK) { |
| synchronized (this) { |
| if (this.lastDispatchedSequenceId > lastDispatchedSeqId) { |
| // If the current last dispatched ID is greater than the new id , |
| // then do not set it |
| return; |
| } |
| this.lastDispatchedSequenceId = lastDispatchedSeqId; |
| if (this.counters != null) { |
| countersCopy = new Long[this.counters.size()]; |
| countersCopy = this.counters.keySet().toArray(countersCopy); |
| } |
| } // synchronized this |
| |
| if (countersCopy != null) { |
| for (int i = 0; i < countersCopy.length; i++) { |
| Long counter = countersCopy[i]; |
| Conflatable event = (Conflatable) owningQueue.region.get(counter); |
| if (event == null) { |
| // this.destroy(counter); event already destroyed? |
| continue; |
| } |
| |
| long seqId = event.getEventId().getSequenceID(); |
| if (seqId > this.lastDispatchedSequenceId) { |
| break; // we're done |
| } |
| |
| if (!owningQueue.destroyFromAvailableIDsAndRegion(counter)) { |
| continue; // still valid |
| } |
| |
| if (event instanceof HAEventWrapper) { |
| if (((HAEventWrapper) event).getReferenceCount() == 0 && logger.isDebugEnabled()) { |
| logger.debug("Reference count is already zero for event {}", event.getEventId()); |
| } |
| |
| owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) event, |
| "Queue Removal Message"); |
| } |
| |
| // At this point we know we're going to remove the event, |
| // so increment the statistic |
| owningQueue.stats.incEventsRemovedByQrm(); |
| |
| if (!owningQueue.shouldBeConflated(event)) { |
| // Just update the counters set |
| this.destroy(counter); |
| continue; // we're done |
| } |
| |
| Object key = event.getKeyToConflate(); |
| String r = event.getRegionToConflate(); |
| // Update the counters set and the conflation map. |
| this.destroy(counter, key, r); |
| } // for |
| } |
| } // synchronized |
| } |
| |
| /** |
| * Invoked by the by the remove method . This method sets the LastDispatched sequence ID in the |
| * DACE & destroys all the sequence Ids which are less than the last dispatched sequence ID. The |
| * lock on the DACE is minimized by copying the Counters Set & then identifying the positions |
| * which need to be removed |
| * |
| * @param removedEventInfoList List containing objects of RemovedEventInfo class representing |
| * Events which have been peeked & are now candidate for removal. It has to be guaranteed |
| * that the sequence IDs of all the other counters is less than the last dispatched |
| * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE |
| */ |
| protected void setLastDispatchedIDAndRemoveEvents(List removedEventInfoList, |
| long lastDispatchedSeqId) throws CacheException, InterruptedException { |
| |
| synchronized (this) { |
| if (this.lastDispatchedSequenceId > lastDispatchedSeqId) { |
| // If the current last dispatched ID is greater than the new id , |
| // then do not set it |
| return; |
| } |
| this.lastDispatchedSequenceId = lastDispatchedSeqId; |
| |
| } |
| Iterator it = removedEventInfoList.iterator(); |
| while (it.hasNext()) { |
| RemovedEventInfo info = (RemovedEventInfo) it.next(); |
| Long counter = info.counter; |
| Object key = info.key; |
| String r = info.regionName; |
| Conflatable wrapper = (Conflatable) owningQueue.region.get(counter); |
| if (owningQueue.destroyFromAvailableIDsAndRegion(counter)) { |
| if (key != null) { |
| this.destroy(counter, key, r); |
| } else { |
| this.destroy(counter); |
| } |
| // <HA overflow> |
| if (wrapper instanceof HAEventWrapper) { |
| owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) wrapper, "Message Dispatcher"); |
| } |
| // </HA overflow> |
| owningQueue.stats.incEventsRemoved(); |
| } else { |
| owningQueue.stats.incNumVoidRemovals(); |
| } |
| } |
| } |
| |
| /** |
| * This method is invoked by the take function. Before invoking it , the take has already |
| * removed the poistion from the available IDs set. |
| * |
| * @param info Data object of type RemovedEventInfo which contains info like position countre, |
| * key & region name |
| * @param sequenceID sequence ID of the event being removed from HARegionQueue |
| */ |
| protected void removeEventAndSetSequenceID(RemovedEventInfo info, long sequenceID) { |
| synchronized (this) { |
| if (this.lastDispatchedSequenceId < sequenceID) { |
| this.lastDispatchedSequenceId = sequenceID; |
| } |
| } |
| |
| Long counter = info.counter; |
| Object key = info.key; |
| String r = info.regionName; |
| try { |
| owningQueue.destroyFromQueue(counter); |
| } catch (EntryNotFoundException enfe) { |
| if (!owningQueue.destroyInProgress) { |
| logger.error( |
| "DACE::removeEventAndSetSequenceID: Since the event was successuly removed by a take operation, it should have existed in the region", |
| enfe); |
| } |
| } |
| if (key == null) { |
| this.destroy(counter); |
| } else { |
| this.destroy(counter, key, r); |
| } |
| |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.DataSerializableFixedID#fromData(java.io.DataInput) |
| */ |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| synchronized (this) { |
| this.lastDispatchedSequenceId = in.readLong(); |
| this.lastSequenceIDPut = in.readLong(); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.DataSerializableFixedID#getDSFID() |
| */ |
| @Override |
| public int getDSFID() { |
| return DISPATCHED_AND_CURRENT_EVENTS; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.DataSerializableFixedID#toData(java.io.DataOutput) |
| */ |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| synchronized (this) { // fix for bug #41621 |
| out.writeLong(this.lastDispatchedSequenceId); |
| out.writeLong(this.lastSequenceIDPut); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "DACE(put=" + this.lastSequenceIDPut + "sent=" + this.lastDispatchedSequenceId + ")"; |
| } |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| // TODO Auto-generated method stub |
| return null; |
| } |
| } |
| |
| // TODO:Asif : Remove this method |
| @Override |
| public void remove(int top) { |
| throw new UnsupportedOperationException( |
| "HARegionQueue and its derived class do not support this operation "); |
| |
| } |
| |
| /** |
| * destroys the underlying HARegion and removes its reference from the dispatched messages map |
| */ |
| public void destroy() throws CacheWriterException { |
| this.destroyInProgress = true; |
| Map tempDispatchedMessagesMap = dispatchedMessagesMap; |
| if (tempDispatchedMessagesMap != null) { |
| tempDispatchedMessagesMap.remove(this.regionName); |
| } |
| try { |
| try { |
| updateHAContainer(); |
| } catch (RegionDestroyedException ignore) { |
| // keep going |
| } catch (CancelException ignore) { |
| // keep going |
| if (logger.isDebugEnabled()) { |
| logger.debug("HARegionQueue#destroy: ignored cancellation!!!!"); |
| } |
| } |
| |
| try { |
| this.region.destroyRegion(); |
| } catch (RegionDestroyedException | CancelException ignore) { |
| // keep going |
| } |
| ((HAContainerWrapper) haContainer).removeProxy(regionName); |
| } finally { |
| this.stats.close(); |
| } |
| } |
| |
| /** |
| * If the event is an instance of HAEventWrapper, put it into the haContainer and then into the ha |
| * region. Otherwise, simply put it into the ha region. |
| * |
| * @since GemFire 5.7 |
| */ |
| protected Conflatable putEventInHARegion(Conflatable event, Long position) { |
| if (event instanceof HAEventWrapper) { |
| HAEventWrapper inputHaEventWrapper = (HAEventWrapper) event; |
| HAEventWrapper haContainerKey = null; |
| |
| if (this.isQueueInitialized()) { |
| haContainerKey = putEntryConditionallyIntoHAContainer(inputHaEventWrapper); |
| } else { |
| haContainerKey = inputHaEventWrapper; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("adding haContainerKey to HARegion at " + position + ":" |
| + haContainerKey + " for " + this.regionName); |
| } |
| this.region.put(position, haContainerKey); |
| |
| return haContainerKey; |
| } else { // (event instanceof ClientMarkerMessageImpl OR ConflatableObject OR |
| // ClientInstantiatorMessage) |
| if (logger.isDebugEnabled()) { |
| logger.debug("adding ClientUpdateMessage to HARegion at " + position + ":" + event + " for " |
| + this.regionName); |
| } |
| this.region.put(position, event); |
| |
| return event; |
| } |
| } |
| |
| private void addClientCQsAndInterestList(ClientUpdateMessageImpl msg, |
| HAEventWrapper haEventWrapper, Map haContainer, String regionName) { |
| |
| ClientProxyMembershipID proxyID = ((HAContainerWrapper) haContainer).getProxyID(regionName); |
| if (haEventWrapper.getClientCqs() != null) { |
| CqNameToOp clientCQ = haEventWrapper.getClientCqs().get(proxyID); |
| if (clientCQ != null) { |
| msg.addClientCqs(proxyID, clientCQ); |
| } |
| } |
| |
| // This is a remote HAEventWrapper. |
| // Add new Interested client lists. |
| ClientUpdateMessageImpl clientMsg = |
| (ClientUpdateMessageImpl) haEventWrapper.getClientUpdateMessage(); |
| if (clientMsg != null) { |
| if (clientMsg.isClientInterestedInUpdates(proxyID)) { |
| msg.addClientInterestList(proxyID, true); |
| } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) { |
| msg.addClientInterestList(proxyID, false); |
| } |
| } |
| } |
| |
| /** |
| * If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its |
| * clientUpdateMessage into the haContainer as <key, value>. |
| * |
| * @param inputHaEventWrapper An instance of {@code HAEventWrapper} |
| * @since GemFire 5.7 |
| */ |
| protected HAEventWrapper putEntryConditionallyIntoHAContainer( |
| HAEventWrapper inputHaEventWrapper) { |
| HAEventWrapper haContainerKey = null; |
| |
| while (haContainerKey == null) { |
| ClientUpdateMessageImpl haContainerEntry = |
| (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer) |
| .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage()); |
| |
| if (haContainerEntry != null) { |
| haContainerKey = (HAEventWrapper) ((HAContainerWrapper) this.haContainer) |
| .getKey(inputHaEventWrapper); |
| |
| // Key was already removed from the container, so continue |
| if (haContainerKey == null) { |
| continue; |
| } |
| |
| synchronized (haContainerKey) { |
| // assert the entry is still present and we still have the same reference |
| if (haContainerKey == ((HAContainerWrapper) this.haContainer).getKey(haContainerKey)) { |
| haContainerKey.incAndGetReferenceCount(); |
| |
| addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper, |
| this.haContainer, this.regionName); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Putting updated event in haContainer with Event ID hash code: " |
| + haContainerKey.hashCode() + "; System ID hash code: " |
| + System.identityHashCode(haContainerKey) |
| + "; Wrapper details: " + haContainerKey); |
| } |
| } else { |
| haContainerKey = null; |
| } |
| } |
| } else { |
| synchronized (inputHaEventWrapper) { |
| inputHaEventWrapper.incAndGetReferenceCount(); |
| inputHaEventWrapper.setHAContainer(this.haContainer); |
| |
| if (!inputHaEventWrapper.getPutInProgress()) { |
| // This means that this is a GII'ed event. Hence we must |
| // explicitly set 'clientUpdateMessage' to null. |
| inputHaEventWrapper.setClientUpdateMessage(null); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Putting new event in haContainer with Event ID hash code: " |
| + inputHaEventWrapper.hashCode() |
| + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper) |
| + "; Wrapper details: " + inputHaEventWrapper); |
| } |
| } |
| |
| haContainerKey = inputHaEventWrapper; |
| } |
| } |
| |
| return haContainerKey; |
| } |
| |
| /** |
| * Caller must hold the rwLock |
| * |
| * @return size of idsAvailable |
| */ |
| protected int availableIDsSize() { |
| return this.idsAvailable.size(); |
| } |
| |
| /** |
| * Caller must hold the rwLock |
| * |
| * @return the idsAvailable Set as an array |
| */ |
| protected Object[] availableIDsArray() { |
| return this.idsAvailable.toArray(); |
| } |
| |
| /** |
| * whether the primary queue for the client has registered interest |
| */ |
| public boolean noPrimaryOrHasRegisteredInterest() { |
| return this.region.noPrimaryOrHasRegisteredInterest(); |
| } |
| |
| /** |
| * set whether this queue has had interest registered for it |
| */ |
| public void setHasRegisteredInterest(boolean flag) { |
| boolean old = this.hasRegisteredInterest; |
| this.hasRegisteredInterest = flag; |
| if (old != flag) { |
| this.region.sendProfileUpdate(); |
| } |
| } |
| |
| /** |
| * hs this queue had interest registered for it? |
| */ |
| public boolean getHasRegisteredInterest() { |
| return this.hasRegisteredInterest; |
| } |
| |
| /** |
| * Called from destroy(), this method decrements the referenceCount of all the HAEventWrapper |
| * instances held by this queue. Also, removes those instances whose referenceCount becomes zero. |
| * |
| * @since GemFire 5.7 |
| */ |
| private void updateHAContainer() { |
| try { |
| Object[] availableIdsArray = null; |
| acquireReadLock(); |
| try { |
| if (this.availableIDsSize() != 0) { |
| availableIdsArray = this.availableIDsArray(); |
| } |
| } finally { |
| releaseReadLock(); |
| } |
| if (availableIdsArray != null) { |
| final Set wrapperSet = new HashSet(); |
| |
| for (int i = 0; i < availableIdsArray.length; i++) { |
| if (destroyFromAvailableIDs((long) availableIdsArray[i])) { |
| wrapperSet.add(this.region.get(availableIdsArray[i])); |
| } |
| } |
| |
| // Start a new thread which will update the clientMessagesRegion for |
| // each of the HAEventWrapper instances present in the wrapperSet |
| Thread regionCleanupTask = |
| new LoggingThread("HA Region Cleanup for " + regionName, false, () -> { |
| try { |
| Iterator iter = wrapperSet.iterator(); |
| while (iter.hasNext()) { |
| Conflatable conflatable = (Conflatable) iter.next(); |
| if (conflatable instanceof HAEventWrapper) { |
| decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy"); |
| } |
| } |
| } catch (CancelException ignore) { |
| return; // we're done |
| } catch (Exception e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Exception in regionCleanupTask thread of HARegionQueue.updateHAContainer$run()", |
| e); |
| } |
| } |
| }); |
| regionCleanupTask.start(); |
| } |
| } catch (CancelException e) { |
| throw e; |
| } catch (RegionDestroyedException e) { |
| // TODO does this really generate a CancelException, or do we still |
| // get a warning in the logs? |
| |
| // Odds are we're shutting down... |
| this.getRegion().getCache().getCancelCriterion().checkCancelInProgress(e); |
| |
| // If we get back, this is Really Weird. Treat it like the |
| // Exception case below. |
| logger.warn("HARegionQueue.updateHAContainer: underlying region has been destroyed", e); |
| } catch (Exception e) { |
| logger.warn( |
| "Exception in HARegionQueue.updateHAContainer(). The task to decrement the ref count by one for all the HAEventWrapper instances of this queue present in the haContainer may not have been started", |
| e); |
| } |
| } |
| |
| /** |
| * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present |
| * in the haContainer, then decrements its reference count. If the decremented ref count is zero |
| * and put is not in progress, removes the entry from the haContainer, before returning the |
| * {@code ClientUpdateMessage} instance. |
| * |
| * @return An instance of {@code ClientUpdateMessage} |
| * @since GemFire 5.7 |
| */ |
| public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) { |
| Conflatable msg = null; |
| if (conflatable instanceof HAEventWrapper) { |
| HAEventWrapper wrapper = (HAEventWrapper) conflatable; |
| msg = (Conflatable) HARegionQueue.this.haContainer.get(wrapper); |
| if (msg != null) { |
| decAndRemoveFromHAContainer(wrapper, "GetAndRemoveFromHAContainer"); |
| } |
| } else { |
| msg = conflatable; |
| } |
| return msg; |
| } |
| |
| /** |
| * Decrements reference count for the wrapper in the container by one. If the decremented ref |
| * count is zero and put is not in progress, removes the entry from the haContainer. |
| * |
| * @since GemFire 5.7 |
| */ |
| public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) { |
| decAndRemoveFromHAContainer(wrapper, ""); |
| } |
| |
| public void decAndRemoveFromHAContainer(HAEventWrapper wrapper, String caller) { |
| boolean decAndRemovePerformed = false; |
| |
| while (!decAndRemovePerformed) { |
| HAEventWrapper haContainerKey = |
| (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper); |
| |
| if (haContainerKey == null) { |
| break; |
| } |
| |
| synchronized (haContainerKey) { |
| if (haContainerKey == (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug(caller + " decremented Event ID hash code: " + haContainerKey.hashCode() |
| + "; System ID hash code: " + System.identityHashCode(haContainerKey) |
| + "; Wrapper details: " + haContainerKey); |
| } |
| if (haContainerKey.decAndGetReferenceCount() == 0L) { |
| HARegionQueue.this.haContainer.remove(haContainerKey); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| caller + " removed Event ID hash code: " + haContainerKey.hashCode() |
| + "; System ID hash code: " |
| + System.identityHashCode(haContainerKey) |
| + "; Wrapper details: " + haContainerKey); |
| } |
| } |
| decAndRemovePerformed = true; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns true if the dispatcher for this HARegionQueue is active. |
| * |
| * @return the true if dispatcher for this HARegionQueue is active(primary node) |
| */ |
| public boolean isPrimary() { |
| return isPrimary; |
| } |
| |
| /** |
| * returns true if the queue has been fully initialized |
| */ |
| public boolean isQueueInitialized() { |
| return this.initialized.get(); |
| } |
| |
| /** |
| * Set whether the dispatcher of this node is active or not (i.e. primary or secondary node). If |
| * {@code flag} is set to {@code true}, disables Entry Expiry Tasks. |
| * |
| * @param flag the value to set isPrimary to |
| */ |
| public void setPrimary(boolean flag) { |
| boolean old = this.isPrimary; |
| this.isPrimary = flag; |
| if (flag) {// fix for #41878 |
| // since it's primary queue now, we will disable the EntryExpiryTask |
| disableEntryExpiryTasks(); |
| } |
| if (old != isPrimary) { |
| this.region.sendProfileUpdate(); |
| } |
| } |
| |
| /** |
| * Disables EntryExpiryTask for the HARegion ({@code this.region}). |
| * |
| */ |
| private void disableEntryExpiryTasks() { |
| int oldTimeToLive = this.region.getEntryTimeToLive().getTimeout(); |
| if (oldTimeToLive > 0) { |
| ExpirationAttributes ea = new ExpirationAttributes(0, // disables expiration |
| ExpirationAction.LOCAL_INVALIDATE); |
| this.region.setEntryTimeToLive(ea); |
| this.region.setCustomEntryTimeToLive(new ThreadIdentifierCustomExpiry()); |
| logger.info( |
| "Entry expiry tasks disabled because the queue became primary. Old messageTimeToLive was: {}", |
| oldTimeToLive); |
| } |
| } |
| |
| /** |
| * Set client conflation override |
| * |
| * @since GemFire 5.7 |
| */ |
| public void setClientConflation(byte value) { |
| if (value != Handshake.CONFLATION_OFF && value != Handshake.CONFLATION_ON |
| && value != Handshake.CONFLATION_DEFAULT) { |
| throw new IllegalArgumentException("illegal conflation value"); |
| } |
| this.clientConflation = value; |
| } |
| |
| public void initializeTransients() {} |
| |
| public static boolean isTestMarkerMessageReceived() { |
| return testMarkerMessageReceived; |
| } |
| |
| public static void setUsedByTest(boolean isUsedByTest) { |
| HARegionQueue.isUsedByTest = isUsedByTest; |
| if (!isUsedByTest) { |
| HARegionQueue.testMarkerMessageReceived = isUsedByTest; |
| } |
| } |
| |
| public boolean isClientSlowReceiver() { |
| return isClientSlowReceiver; |
| } |
| |
| @Override |
| public void close() { |
| Region r = getRegion(); |
| if (r != null && !r.isDestroyed()) { |
| try { |
| r.close(); |
| } catch (RegionDestroyedException ignore) { |
| } |
| } |
| } |
| |
| /** |
| * A simple check to validate that the peek() method has been executed as it initializes some |
| * structures used by other methods. |
| * |
| * @return true if peeking returns a non-null value |
| */ |
| public boolean isPeekInitialized() { |
| return HARegionQueue.peekedEventsContext.get() != null; |
| } |
| |
| /** |
| * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is |
| * operating on it. This wrapper acts as a means of communication between the QRM thread & the |
| * MapWrapper object contained in the HARegionQueue |
| */ |
| static class MapWrapper { |
| Map map; |
| |
| List list; |
| |
| boolean keepPrevAcks = false; |
| |
| public MapWrapper() { |
| super(); |
| map = new HashMap(); |
| list = new LinkedList(); |
| } |
| |
| void put(Object key, Object o) { |
| synchronized (this.map) { |
| this.map.put(key, o); |
| } |
| } |
| } |
| |
| /** |
| * A wrapper class that has counter, key and the region-name for an event which was peeked and |
| * needs to be removed. The key and regionName fields will be set only if conflation is true for |
| * the event. |
| */ |
| static class RemovedEventInfo { |
| Long counter; |
| |
| String regionName; |
| |
| Object key; |
| |
| public RemovedEventInfo(Long counter, String regionName, Object key) { |
| this.counter = counter; |
| this.regionName = regionName; |
| this.key = key; |
| } |
| } |
| |
| /** this is used to expire thread identifiers, even in primary queues */ |
| static class ThreadIdentifierCustomExpiry implements CustomExpiry { |
| |
| /** |
| * expiry time for ThreadIdentifiers. In seconds. |
| */ |
| static final int DEFAULT_THREAD_ID_EXPIRY_TIME = 300; |
| |
| @Immutable |
| private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = |
| new ExpirationAttributes(DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE); |
| |
| @MutableForTesting |
| private static volatile ExpirationAttributes testExpAtts; |
| |
| private final int expTime; |
| |
| ThreadIdentifierCustomExpiry() { |
| expTime = calculateThreadIdExpiryTime(); |
| } |
| |
| @Override |
| public ExpirationAttributes getExpiry(Region.Entry entry) { |
| // Use key to determine expiration. |
| Object key = entry.getKey(); |
| if (key instanceof ThreadIdentifier) { |
| // TODO: inject subclass of ThreadIdentifierCustomExpiry and move next block to test class |
| if (expTime != DEFAULT_THREAD_ID_EXPIRY_TIME) { |
| // This should only happen in unit test code |
| ExpirationAttributes result = testExpAtts; |
| if (result == null || result.getTimeout() != expTime) { |
| result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE); |
| // save the expiration attributes in a static to prevent tests from creating lots of |
| // instances. |
| testExpAtts = result; |
| } |
| return result; |
| } else { |
| return DEFAULT_THREAD_ID_EXP_ATTS; |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public void close() { |
| // nothing |
| } |
| |
| private static int calculateThreadIdExpiryTime() { |
| Optional<Integer> expiryTime = getProductIntegerProperty(THREAD_ID_EXPIRY_TIME_PROPERTY); |
| return expiryTime.orElse(DEFAULT_THREAD_ID_EXPIRY_TIME); |
| } |
| } |
| |
| public Queue getGiiQueue() { |
| return this.giiQueue; |
| } |
| } |