/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
/**
* Parallel Gateway Sender Queue extended for HDFS functionality
*
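* A minimal construction sketch (assuming a gateway sender configured for HDFS,
* the set of user partitioned regions it serves, this dispatcher's index, and the
* total number of dispatchers; the variable names are illustrative only):
* <pre>
* HDFSParallelGatewaySenderQueue queue =
*     new HDFSParallelGatewaySenderQueue(sender, userRegions, dispatcherIndex, nDispatchers);
* </pre>
*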
* @author Hemant Bhanawat
*/
public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
private int currentBucketIndex = 0;
private int elementsPeekedAcrossBuckets = 0;
private SystemTimer rollListTimer = null;
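// How often the sorted skip lists of each bucket queue are rolled over by the
// RollSortedListsTimerTask; configurable via the system property below (default 3000 ms).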
public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
Set<Region> userPRs, int idx, int nDispatcher) {
super(sender, userPRs, idx, nDispatcher);
// Schedule the roll task only on the first dispatcher so a single timer runs per sender.
if (sender.getBucketSorted() && this.index == 0) {
rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
true);
// schedule the task to roll the skip lists
rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(),
ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
}
}
@Override
public Object peek() throws InterruptedException, CacheException {
/* Delegating to super.peek() results in the following exception,
* so this method throws an explicit UnsupportedOperationException instead.
Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
*/
throw new UnsupportedOperationException();
}
@Override
public void cleanUp() {
super.cleanUp();
cancelRollListTimer();
}
private void cancelRollListTimer() {
if (rollListTimer != null) {
rollListTimer.cancel();
rollListTimer = null;
}
}
/**
* Peeks elements from one local primary bucket per call; successive calls
* move on to the next local primary bucket, cycling through all of them.
*/
@Override
public List peek(int batchSize, int timeToWait) throws InterruptedException,
CacheException {
List batch = new ArrayList();
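// Convert batchSize (interpreted here in megabytes) to bytes so it can be compared
// against the bucket queue size in bytes below.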
int batchSizeInBytes = batchSize*1024*1024;
PartitionedRegion prQ = getRandomShadowPR();
if (prQ == null || prQ.getLocalMaxMemory() == 0) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
blockProcesorThreadIfRequired();
return batch;
}
ArrayList list = null;
ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
.getDataStore().getAllLocalPrimaryBucketIds());
ArrayList<Integer> buckets = new ArrayList<Integer>();
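// This dispatcher only handles the local primary buckets assigned to it:
// bucket id modulo the number of dispatchers must equal this dispatcher's index.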
for(Integer i : pbuckets) {
if(i % this.nDispatcher == this.index)
buckets.add(i);
}
// In case of earlier failures, peekedEvents may still hold elements;
// add them back to the batch.
if (this.resetLastPeeked) {
int previousBucketId = -1;
boolean stillPrimary = true;
Iterator<GatewaySenderEventImpl> iter = peekedEvents.iterator();
// Remove the events of any bucket that is no longer primary on this node,
// as they can no longer be persisted from this node.
while(iter.hasNext()) {
HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
if (previousBucketId != hdfsEvent.getBucketId()){
stillPrimary = buckets.contains(hdfsEvent.getBucketId());
previousBucketId = hdfsEvent.getBucketId();
}
if (stillPrimary)
batch.add(hdfsEvent);
else {
iter.remove();
}
}
this.resetLastPeeked = false;
}
if (buckets.size() == 0) {
// No local primary buckets assigned to this dispatcher; sleep a bit before trying again.
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return batch;
}
// Each call to this function returns the index of the next bucket
// to be processed, maintaining the bucket sequence across a series
// of peek calls.
// If buckets move between two consecutive calls, a bucket may be
// processed twice while another one is skipped. That is acceptable,
// because the skipped bucket will be processed in the next round.
Integer bIdIndex = getCurrentBucketIndex(buckets.size());
// If we have gone through all the buckets once and no
// elements were peeked from any of them, take a nap.
// This always sleeps on the first call, but that should be fine
// because in practical use cases timeToWait is greater than
// this 100 ms sleep.
if (bIdIndex == 0 && getAndResetElementsPeekedAcrossBuckets() == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
.getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
if (hrq == null) {
// bucket moved to another node after getAllLocalPrimaryBucketIds
// was called. Peeking not possible. return.
return batch;
}
long entriesWaitingTobePeeked = hrq.totalEntries();
if (entriesWaitingTobePeeked == 0) {
blockProcesorThreadIfRequired();
return batch;
}
long currentTimeInMillis = System.currentTimeMillis();
long bucketSizeInBytes = hrq.getQueueSizeInBytes();
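// Peek from this bucket only if it has waited at least timeToWait since its last peek,
// has accumulated more bytes than one batch, or has asked to be drained immediately.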
if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) > timeToWait)
|| ( bucketSizeInBytes > batchSizeInBytes)
|| hrq.shouldDrainImmediately()) {
// peek now
if (logger.isDebugEnabled()) {
logger.debug("Peeking queue " + hrq.getId() + ": bucketSizeInBytes " + bucketSizeInBytes
+ ": batchSizeInBytes" + batchSizeInBytes
+ ": timeToWait" + timeToWait
+ ": (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
}
list = peekAhead(buckets.get(bIdIndex), hrq);
if (list != null && list.size() != 0 ) {
for (Object object : list) {
batch.add(object);
peekedEvents.add((HDFSGatewayEventImpl)object);
}
}
}
else {
blockProcesorThreadIfRequired();
}
if (logger.isDebugEnabled() && batch.size() > 0) {
logger.debug(this + ": Peeked a batch of " + batch.size() + " entries");
}
setElementsPeekedAcrossBuckets(batch.size());
return batch;
}
/**
* This function maintains an index of the last processed bucket.
* When it is called, it returns index of the next bucket.
* @param totalBuckets
* @return current bucket index
*/
private int getCurrentBucketIndex(int totalBuckets) {
int retBucket = currentBucketIndex;
if (retBucket >= totalBuckets) {
currentBucketIndex = 0;
retBucket = 0;
}
currentBucketIndex++;
return retBucket;
}
@Override
public void remove(int batchSize) throws CacheException {
int destroyed = 0;
HDFSGatewayEventImpl event = null;
if (this.peekedEvents.size() > 0)
event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
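// Walk the peeked events in order, grouping consecutive events that belong to the
// same bucket so their keys can be destroyed from that bucket in one call.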
while (event != null && destroyed < batchSize) {
Region currentRegion = event.getRegion();
int currentBucketId = event.getBucketId();
int bucketId = event.getBucketId();
ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
// create a batch of all the entries of a bucket
while (bucketId == currentBucketId) {
listToDestroy.add(event);
destroyedSeqNum.add(event.getShadowKey());
destroyed++;
if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
event = null;
break;
}
event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
bucketId = event.getBucketId();
if (!this.sender.isRunning()){
if (logger.isDebugEnabled()) {
logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
}
return;
}
}
try {
HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
if (brq != null) {
// destroy the entries from the bucket
brq.destroyKeys(listToDestroy);
// Add the removed events to the map for the BatchRemovalMessage.
// The prQ must be supplied because a parallel gateway sender
// can now have multiple queues.
PartitionedRegion prQ = brq.getPartitionedRegion();
addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
}
} catch (ForceReattemptException e) {
if (logger.isDebugEnabled()) {
logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
+ " for bucket = " + bucketId);
}
}
catch(EntryNotFoundException e) {
if (logger.isDebugEnabled()) {
logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
+ " for bucket = " + bucketId );
}
}
}
}
/**
* Keeps track of the number of elements peeked across all buckets.
*/
private void setElementsPeekedAcrossBuckets(int peekedElements) {
this.elementsPeekedAcrossBuckets += peekedElements;
}
/**
* Returns the number of elements peeked across all buckets and resets
* the counter.
*/
private int getAndResetElementsPeekedAcrossBuckets() {
int peekedElements = this.elementsPeekedAcrossBuckets;
this.elementsPeekedAcrossBuckets = 0;
return peekedElements;
}
@Override
public void remove() throws CacheException {
throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
}
@Override
public void put(Object object) throws InterruptedException, CacheException {
super.put(object);
}
protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
if (logger.isDebugEnabled()) {
logger.debug(this + ": Peekahead for the bucket " + bucketId);
}
ArrayList list = hrq.peekABatch();
if (logger.isDebugEnabled() && list != null ) {
logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId);
}
return list;
}
@Override
public Object take() {
throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
}
protected boolean isUsedForHDFS() {
return true;
}
@Override
protected void afterRegionAdd(PartitionedRegion userPR) {
// Intentionally a no-op for HDFS queues.
}
/**
* Gets the value for the given region key from the HDFSBucketRegionQueue.
* @param region the user partitioned region
* @param regionKey the region key, as a byte array
* @param bucketId the bucket in which to look up the key
* @return the matching event, or null if the bucket or entry is not found
* @throws ForceReattemptException
*/
public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException {
try {
HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
if (brq == null)
return null;
return brq.getObjectForRegionKey(region, regionKey);
} catch(EntryNotFoundException e) {
if (logger.isDebugEnabled()) {
logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
+ " for bucket = " + bucketId);
}
}
return null;
}
@Override
public void clear(PartitionedRegion pr, int bucketId) {
HDFSBucketRegionQueue brq;
try {
brq = getBucketRegionQueue(pr, bucketId);
if (brq == null)
return;
brq.clear();
} catch (ForceReattemptException e) {
//do nothing, bucket was destroyed.
}
}
@Override
public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
return hq.size();
}
public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
int bucketId) throws ForceReattemptException {
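// The shadow PR is registered under the leader region of the colocation chain,
// so resolve the leader region before looking up the queue.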
PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
if (leader == null)
return null;
String leaderregionPath = leader.getFullPath();
PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
if (prQ == null)
return null;
HDFSBucketRegionQueue brq;
brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
.getLocalBucketById(bucketId));
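// If the bucket is not hosted locally yet, wait for local bucket storage to be
// created before fetching the initialized bucket.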
if(brq == null) {
prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
}
brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
.getInitializedBucketForId(null, bucketId));
return brq;
}
/**
* This class is responsible for rolling over the skip lists of the sorted
* event queue. Rolling the lists on a separate thread is required because
* neither the put thread nor the peek/remove thread can do it. The put
* thread cannot, because that would require synchronization with other put
* threads and the peek thread, which would hurt put latency. The peek
* thread cannot, because if the event insert rate is high enough the list
* can grow far beyond its intended size before the peek thread gets to it.
* @author hemantb
*
*/
class RollSortedListsTimerTask extends SystemTimerTask {
/**
* Ensures that if any bucket's lists have grown beyond their size limit,
* they get rolled over into new skip lists.
*/
@Override
public void run2() {
Set<PartitionedRegion> prQs = getRegions();
for (PartitionedRegion prQ : prQs) {
ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
.getDataStore().getAllLocalPrimaryBucketIds());
for (Integer bId : buckets) {
HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
.getDataStore().getLocalBucketById(bId));
if (hrq == null) {
// The bucket moved to another node after getAllLocalPrimaryBucketIds
// was called; continue with the next bucket.
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Rolling over the list for bucket id: " + bId);
}
hrq.rolloverSkipList();
}
}
}
}
}