blob: abc0108582f23a6ab3bbc93c18c32fe85ff05475 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.size.SingleObjectSizer;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
/**
* @author Suranjan Kumar
*
*/
public class ParallelGatewaySenderEventProcessor extends
AbstractGatewaySenderEventProcessor {
private static final Logger logger = LogService.getLogger();
final int index;
final int nDispatcher;
protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) {
super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_"
+ sender.getId(), logger),
"Event Processor for GatewaySender_" + sender.getId(), sender);
this.index = 0;
this.nDispatcher = 1;
initializeMessageQueue(sender.getId());
setDaemon(true);
}
/**
* use in concurrent scenario where queue is to be shared among all the processors.
*/
protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender, Set<Region> userRegions, int id, int nDispatcher) {
super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_"
+ sender.getId(), logger),
"Event Processor for GatewaySender_" + sender.getId()+"_"+ id, sender);
this.index = id;
this.nDispatcher = nDispatcher;
//this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
initializeMessageQueue(sender.getId());
setDaemon(true);
}
@Override
protected void initializeMessageQueue(String id ) {
Set<Region> targetRs = new HashSet<Region>();
for (LocalRegion region : ((GemFireCacheImpl)((AbstractGatewaySender)sender).getCache())
.getApplicationRegions()) {
if (region.getAllGatewaySenderIds().contains(id)) {
targetRs.add(region);
}
}
if (logger.isDebugEnabled()) {
logger.debug("The target Regions are(PGSEP) {}", targetRs);
}
if (sender.getIsHDFSQueue())
this.queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
else
this.queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
if(((ParallelGatewaySenderQueue)queue).localSize() > 0) {
((ParallelGatewaySenderQueue)queue).notifyEventProcessorIfRequired();
}
}
@Override
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
Object substituteValue) throws IOException, CacheException {
GatewaySenderEventImpl gatewayQueueEvent = null;
Region region = event.getRegion();
if (!(region instanceof DistributedRegion) && ((EntryEventImpl)event).getTailKey() == -1) {
// In case of parallel sender, we don't expect the key to be not set.
// If it is the case then the event must be coming from notificationOnly message.
// Don't enqueue the event and return from here only.
// Fix for #49081 and EntryDestroyedException in #49367.
if (logger.isDebugEnabled()) {
logger.debug("ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}", event);
}
return;
}
//TODO : Kishor : Looks like for PDX region bucket id is set to -1.
// int bucketId = -1;
// if (!(region instanceof DistributedRegion && ((DistributedRegion)region)
// .isPdxTypesRegion())) {
// bucketId = PartitionedRegionHelper.getHashKey(event);
// }
try {
EventID eventID = ((EntryEventImpl)event).getEventId();
// while merging 42004, kept substituteValue as it is(it is barry's
// change 42466). bucketID is merged with eventID.getBucketID
if (!sender.getIsHDFSQueue())
gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, true, eventID.getBucketID());
else
gatewayQueueEvent = new HDFSGatewayEventImpl(operation,
event, substituteValue, true, eventID.getBucketID());
if (getSender().beforeEnque(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
try {
this.queue.put(gatewayQueueEvent);
gatewayQueueEvent = null;
}
catch (InterruptedException e) {
e.printStackTrace();
}
getSender().getStatistics().endPut(start);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("The Event {} is filtered.", gatewayQueueEvent);
}
getSender().getStatistics().incEventsFiltered();
}
}
finally {
if (gatewayQueueEvent != null) {
// it was not queued for some reason
gatewayQueueEvent.release();
}
}
}
public void clear(PartitionedRegion pr, int bucketId) {
((ParallelGatewaySenderQueue)this.queue).clear(pr, bucketId);
}
/*public int size(PartitionedRegion pr, int bucketId)
throws ForceReattemptException {
return ((ParallelGatewaySenderQueue)this.queue).size(pr, bucketId);
}*/
public void notifyEventProcessorIfRequired(int bucketId) {
((ParallelGatewaySenderQueue)this.queue).notifyEventProcessorIfRequired();
}
public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
return ((ParallelGatewaySenderQueue)this.queue).getBucketToTempQueueMap().get(bucketId);
}
public PartitionedRegion getRegion(String prRegionName) {
return ((ParallelGatewaySenderQueue)this.queue).getRegion(prRegionName);
}
public void removeShadowPR(String prRegionName) {
((ParallelGatewaySenderQueue)this.queue).removeShadowPR(prRegionName);
}
public void conflateEvent(Conflatable conflatableObject, int bucketId,
Long tailKey) {
((ParallelGatewaySenderQueue)this.queue).conflateEvent(conflatableObject, bucketId, tailKey);
}
public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
int bucketId) throws ForceReattemptException {
return ((HDFSParallelGatewaySenderQueue)this.queue).get(region, regionKey, bucketId);
}
public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
int bucketId) throws ForceReattemptException {
return ((HDFSParallelGatewaySenderQueue)this.queue).getBucketRegionQueue(region, bucketId);
}
public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
// TODO Auto-generated method stub
((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserPR(pr);
}
public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
// TODO Auto-generated method stub
((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserRR(userRegion);
}
@Override
protected void rebalance() {
// No operation for AsyncEventQueuerProcessor
}
@Override
public void initializeEventDispatcher() {
if (logger.isDebugEnabled()) {
logger.debug(" Creating the GatewayEventCallbackDispatcher");
}
this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
}
}