blob: eed1594ee946983481749bc1ef6c836154828f23 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.persistence.query.mock.ByteComparator;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.concurrent.Atomics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
/**
* @author Suranjan Kumar
*
*/
public class BucketRegionQueue extends AbstractBucketRegionQueue {
private static final Logger logger = LogService.getLogger();
/**
* The <code>Map</code> mapping the regionName->key to the queue key. This
* index allows fast updating of entries in the queue for conflation. This is
* necesaary for Colocated regions and if any of the regions use same key for
* data.
*/
private final Map indexes;
/**
* A transient queue to maintain the eventSeqNum of the events that are to be
* sent to remote site. It is cleared when the queue is cleared.
*/
private final BlockingQueue<Object> eventSeqNumQueue = new LinkedBlockingQueue<Object>();
//private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new LinkedBlockingQueue<EventID>();
private long lastKeyRecovered;
/**
* @param regionName
* @param attrs
* @param parentRegion
* @param cache
* @param internalRegionArgs
*/
public BucketRegionQueue(String regionName, RegionAttributes attrs,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
super(regionName, attrs, parentRegion, cache, internalRegionArgs);
this.keySet();
indexes = new ConcurrentHashMap<Object, Long>();
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.BucketRegion#initialize(java.io.InputStream
* ,
* com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
* , com.gemstone.gemfire.internal.cache.InternalRegionArguments)
*/
@Override
protected void initialize(InputStream snapshotInputStream,
InternalDistributedMember imageTarget,
InternalRegionArguments internalRegionArgs) throws TimeoutException,
IOException, ClassNotFoundException {
super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
//take initialization writeLock inside the method after synchronizing on tempQueue
loadEventsFromTempQueue();
getInitializationLock().writeLock().lock();
try {
if (!this.keySet().isEmpty()) {
if (getPartitionedRegion().getColocatedWith() == null) {
List<EventID> keys = new ArrayList<EventID>(this.keySet());
Collections.sort(keys, new Comparator<EventID>() {
@Override
public int compare(EventID o1, EventID o2) {
int compareMem = new ByteComparator().compare(
o1.getMembershipID(), o2.getMembershipID());
if (compareMem == 1) {
return 1;
} else if (compareMem == -1) {
return -1;
} else {
if (o1.getThreadID() > o2.getThreadID()) {
return 1;
} else if (o1.getThreadID() < o2.getThreadID()) {
return -1;
} else {
return o1.getSequenceID() < o2.getSequenceID() ? -1 : o1
.getSequenceID() == o2.getSequenceID() ? 0 : 1;
}
}
}
});
for (EventID eventID : keys) {
eventSeqNumQueue.add(eventID);
}
} else {
TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
//although the empty check for this.keySet() is done above,
//do the same for sortedKeys as well because the keySet() might have become
//empty since the above check was made (keys might have been destroyed through BatchRemoval)
//fix for #49679 NoSuchElementException thrown from BucketRegionQueue.initialize
if (!sortedKeys.isEmpty()) {
for (Long key : sortedKeys) {
eventSeqNumQueue.add(key);
}
lastKeyRecovered = sortedKeys.last();
if (this.getEventSeqNum() != null) {
Atomics.setIfGreater(getEventSeqNum(), lastKeyRecovered);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("For bucket {} ,total keys recovered are : {} last key recovered is : {} and the seqNo is ",
getId(), eventSeqNumQueue.size(), lastKeyRecovered, getEventSeqNum());
}
}
this.initialized = true;
//Now, the bucket is initialized. Destroy the failedBatchRemovalKeys.
destroyFailedBatchRemovalMessageKeys();
}
finally {
notifyEventProcessor();
getInitializationLock().writeLock().unlock();
}
}
/**
* When the GII was going, some of BatchRemoval messages (especially for events destroyed due to conflation)
* might have failed since the event might not be found in the BucketRegionQueue.
* Those messages are added to failedBatchRemovalMessageKeys map in ParallelQueueRemovalMessage.
* Events found in the map need to be destroyed once GII is done. Fix for #47431.
* This method is invoked deliberately after this.initialized is set to true to fix defect #50316.
*/
private void destroyFailedBatchRemovalMessageKeys() {
final boolean isDebugEnabled = logger.isDebugEnabled();
Iterator<Object> itr = getFailedBatchRemovalMessageKeys().iterator();
while (itr.hasNext()) {
//at this point, failedBatchRemovalMessageKeys contains messages failed during bucket
//initialization only. Iterate over failedBatchRemovalMessageKeys and clear it completely.
Object key = itr.next();
itr.remove();
if (isDebugEnabled) {
logger.debug("key from failedBatchRemovalMessageKeys is: {}", key);
}
if (containsKey(key)) {
try {
destroyKey(key);
if (isDebugEnabled) {
logger.debug("Destroyed {} from bucket: ", key, getId());
}
} catch (ForceReattemptException fe) {
if (isDebugEnabled) {
logger.debug("Bucket :{} moved to other member", getId());
}
}
}
}
}
@Override
public void beforeAcquiringPrimaryState() {
int batchSize = this.getPartitionedRegion().getParallelGatewaySender()
.getBatchSize();
Iterator<Object> itr = eventSeqNumQueue.iterator();
markEventsAsDuplicate(batchSize, itr);
}
@Override
public void closeEntries() {
OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
@Override
public void run() {
BucketRegionQueue.super.closeEntries();
}
});
this.indexes.clear();
this.eventSeqNumQueue.clear();
}
@Override
public Set<VersionSource> clearEntries(final RegionVersionVector rvv) {
final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>();
OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
@Override
public void run() {
result.set(BucketRegionQueue.super.clearEntries(rvv));
}
});
this.eventSeqNumQueue.clear();
return result.get();
}
@Override
public void forceSerialized(EntryEventImpl event) {
// NOOP since we want the value in the region queue to stay in object form.
}
protected void clearQueues(){
getInitializationLock().writeLock().lock();
try {
this.indexes.clear();
this.eventSeqNumQueue.clear();
}
finally {
getInitializationLock().writeLock().unlock();
}
}
@Override
protected boolean virtualPut(EntryEventImpl event, boolean ifNew,
boolean ifOld, Object expectedOldValue, boolean requireOldValue,
long lastModified, boolean overwriteDestroyed) throws TimeoutException,
CacheWriterException {
boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
requireOldValue, lastModified, overwriteDestroyed);
if (success) {
Object ov = event.getRawOldValue();
if (ov instanceof GatewaySenderEventImpl) {
((GatewaySenderEventImpl) ov).release();
}
if (getPartitionedRegion().getColocatedWith() == null) {
return success;
}
if (getPartitionedRegion().isConflationEnabled() && this.getBucketAdvisor().isPrimary()) {
Object object = event.getNewValue();
Long key = (Long)event.getKey();
if (object instanceof Conflatable) {
if (logger.isDebugEnabled()) {
logger.debug("Key :{} , Object : {} is conflatable", key, object);
}
// TODO: TO optimize by destroying on primary and secondary separately
// in case of conflation
conflateOldEntry((Conflatable)object, key);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Object : {} is not conflatable", object);
}
}
}
}
return success;
}
private void conflateOldEntry(Conflatable object, Long tailKey) {
PartitionedRegion region = this.getPartitionedRegion();
Conflatable conflatableObject = object;
if (region.isConflationEnabled() && conflatableObject.shouldBeConflated()) {
Object keyToConflate = conflatableObject.getKeyToConflate();
String rName = object.getRegionToConflate();
if (logger.isDebugEnabled()) {
logger.debug(" The region name is : {}", rName);
}
Map latestIndexesForRegion = (Map)this.indexes.get(rName);
if (latestIndexesForRegion == null) {
latestIndexesForRegion = new ConcurrentHashMap();
this.indexes.put(rName, latestIndexesForRegion);
}
Long previousTailKey = (Long)latestIndexesForRegion.put(keyToConflate,
tailKey);
if (previousTailKey != null) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Conflating {} at queue index={} and previousTailKey: ", this, object, tailKey, previousTailKey);
}
AbstractGatewaySenderEventProcessor ep = region.getParallelGatewaySender().getEventProcessor();
if (ep == null) return;
ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
// Give the actual conflation work to another thread.
// ParallelGatewaySenderQueue takes care of maintaining a thread pool.
queue.conflateEvent(conflatableObject, getId(), previousTailKey);
} else {
region.getParallelGatewaySender().getStatistics()
.incConflationIndexesMapSize();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: Not conflating {}", this, object);
}
}
}
// No need to synchronize because it is called from a synchronized method
private void removeIndex(Long qkey) {
// Determine whether conflation is enabled for this queue and object
Object o = getNoLRU(qkey, true, false, false);
if (o instanceof Conflatable) {
Conflatable object = (Conflatable)o;
if (object.shouldBeConflated()) {
// Otherwise, remove the index from the indexes map.
String rName = object.getRegionToConflate();
Object key = object.getKeyToConflate();
Map latestIndexesForRegion = (Map)this.indexes.get(rName);
if (latestIndexesForRegion != null) {
// Remove the index.
Long index = (Long)latestIndexesForRegion.remove(key);
if (index != null) {
this.getPartitionedRegion().getParallelGatewaySender()
.getStatistics().decConflationIndexesMapSize();
if (logger.isDebugEnabled()) {
logger.debug("{}: Removed index {} for {}", this, index, object);
}
}
}
}
}
}
@Override
protected void basicDestroy(final EntryEventImpl event,
final boolean cacheWrite, Object expectedOldValue)
throws EntryNotFoundException, CacheWriterException, TimeoutException {
if (getPartitionedRegion().isConflationEnabled()) {
removeIndex((Long)event.getKey());
}
super.basicDestroy(event, cacheWrite, expectedOldValue);
Object rov = event.getRawOldValue();
if (rov instanceof GatewaySenderEventImpl) {
((GatewaySenderEventImpl) rov).release();
}
// Primary buckets should already remove the key while peeking
if (!this.getBucketAdvisor().isPrimary()) {
if (logger.isDebugEnabled()) {
logger.debug(" removing the key {} from eventSeqNumQueue", event.getKey());
}
this.eventSeqNumQueue.remove(event.getKey());
}
}
/**
* Does a get that gets the value without fault values in from disk.
*/
private Object optimalGet(Object k) {
// Get the object at that key (to remove the index).
Object object = null;
try {
object = getValueInVMOrDiskWithoutFaultIn(k);
if (object != null && object instanceof CachedDeserializable) {
object = ((CachedDeserializable)object).getDeserializedValue(this, this.getRegionEntry(k));
}
} catch (EntryNotFoundException ok) {
// just return null;
}
if (object == Token.TOMBSTONE) {
object = null;
}
return object; // OFFHEAP: ok since callers are careful to do destroys on region queue after finished with peeked object.
}
public Object peek() {
Object key = null;
Object object = null;
//doing peek in initializationLock because during region destroy, the clearQueues
//clears the eventSeqNumQueue and can cause data inconsistency (defect #48984)
getInitializationLock().readLock().lock();
try {
if (this.getPartitionedRegion().isDestroyed()) {
throw new BucketRegionQueueUnavailableException();
}
key = this.eventSeqNumQueue.peek();
if (key != null) {
object = optimalGet(key);
if (object == null && !this.getPartitionedRegion().isConflationEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("The value against key {} in the bucket region queue with id {} is NULL for the GatewaySender {}",
key, getId(), this.getPartitionedRegion().getParallelGatewaySender());
}
}
// In case of conflation and a race where bucket recovers
// key-value from other bucket while put has come to this bucket.
// if (object != null) {
// ParallelGatewaySenderQueue queue =
// (ParallelGatewaySenderQueue)getPartitionedRegion()
// .getParallelGatewaySender().getQueues().toArray(new
// RegionQueue[1])[0];
// //queue.addToPeekedKeys(key);
// }
this.eventSeqNumQueue.remove(key);
}
return object; // OFFHEAP: ok since callers are careful to do destroys on
// region queue after finished with peeked object.
}
finally {
getInitializationLock().readLock().unlock();
}
}
protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
if (didPut) {
if (this.initialized) {
this.eventSeqNumQueue.add(key);
}
if (logger.isDebugEnabled()) {
logger.debug("Put successfully in the queue : {} was initialized: {}", event.getRawNewValue(), this.initialized);
}
}
if (this.getBucketAdvisor().isPrimary()) {
incQueueSize(1);
}
}
/**
* It removes the first key from the queue.
*
* @return Returns the key for which value was destroyed.
* @throws ForceReattemptException
*/
public Object remove() throws ForceReattemptException {
Object key = this.eventSeqNumQueue.remove();
if (key != null) {
destroyKey(key);
}
return key;
}
/**
* It removes the first key from the queue.
*
* @return Returns the value.
* @throws InterruptedException
* @throws ForceReattemptException
*/
public Object take() throws InterruptedException, ForceReattemptException {
throw new UnsupportedOperationException();
// Currently has no callers.
// To support this callers need to call freeOffHeapResources on the returned GatewaySenderEventImpl.
// Object key = this.eventSeqNumQueue.remove();
// Object object = null;
// if (key != null) {
// object = PartitionRegionHelper
// .getLocalPrimaryData(getPartitionedRegion()).get(key);
// /**
// * TODO: For the time being this is same as peek. To do a batch peek we
// * need to remove the head key. We will destroy the key once the event is
// * delivered to the GatewayReceiver.
// */
// destroyKey(key);
// }
// return object; // TODO OFFHEAP: see what callers do with the returned GatewaySenderEventImpl. We need to inc its refcount before we do the destroyKey.
}
/**
* Overriding this method from AbstractBucketRegionQueue in order to remove
* the event from eventSeqNumQueue if EntryNotFoundException is encountered
* during basicDestroy.
* This change is done during selective merge from r41425 from gemfire701X_maint.
*/
public void destroyKey(Object key) throws ForceReattemptException {
if (logger.isDebugEnabled()) {
logger.debug(" destroying primary key {}", key);
}
EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key,
null);
event.setEventId(new EventID(cache.getSystem()));
try {
event.setRegion(this);
basicDestroy(event, true, null);
checkReadiness();
} catch (EntryNotFoundException enf) {
if (getPartitionedRegion().isDestroyed()) {
getPartitionedRegion().checkReadiness();
if (isBucketDestroyed()) {
throw new ForceReattemptException(
"Bucket moved",
new RegionDestroyedException(
LocalizedStrings.PartitionedRegionDataStore_REGION_HAS_BEEN_DESTROYED
.toLocalizedString(), getPartitionedRegion()
.getFullPath()));
}
}
throw enf;
} catch (RegionDestroyedException rde) {
getPartitionedRegion().checkReadiness();
if (isBucketDestroyed()) {
throw new ForceReattemptException("Bucket moved while destroying key "
+ key, rde);
}
} finally {
//merge42180: are we considering offheap in cedar. Comment freeOffHeapReference intentionally
//event.freeOffHeapReferences();
event.release();
}
this.notifyEntriesRemoved();
}
public boolean isReadyForPeek() {
return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty() && !this.eventSeqNumQueue.isEmpty()
&& getBucketAdvisor().isPrimary();
}
}