blob: 0c51e6593662116dced5d37d98a407f80c74ff54 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.GemFireException;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* Parallel processor which constitutes of multiple {@link ParallelGatewaySenderEventProcessor}.
* Each of the {@link ParallelGatewaySenderEventProcessor} is responsible of dispatching events from
* a set of shadowPR or buckets.
* Once the buckets/shadowPRs are assigned to a processor it should not change to avoid any event
* ordering issue.
*
* The {@link ParallelGatewaySenderQueue} should be shared among all the {@link ParallelGatewaySenderEventProcessor}s.
*
* @author Suranjan Kumar
*
*/
public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
protected static final Logger logger = LogService.getLogger();
protected ParallelGatewaySenderEventProcessor processors[];
//private final List<ConcurrentParallelGatewaySenderQueue> concurrentParallelQueues;
private GemFireException ex = null;
final int nDispatcher;
public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) {
super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_"
+ sender.getId()),
"Event Processor for GatewaySender_" + sender.getId(), sender);
// initializeMessageQueue(sender.getId());
logger.info("ConcurrentParallelGatewaySenderEventProcessor: dispatcher threads {}", sender.getDispatcherThreads());
nDispatcher= sender.getDispatcherThreads();
/**
* We have to divide the buckets/shadowPRs here.
* So that the individual processors can start with a set of events to deal with
* In case of shadowPR getting created it will have to attach itself to one of the
* processors when they are created.
*/
// We have to do static partitioning of buckets and region attached.
// We should remember that this partitioning may change in future as new shadowPRs
// get created.
// Static partitioning is as follows
// for each of the shadowPR:
// each of the processor gets : 0 .. totalNumBuckets/totalDispatcherThreads and last processor gets the remaining
// bucket
Set<Region> targetRs = new HashSet<Region>();
for (LocalRegion pr : ((GemFireCacheImpl)((AbstractGatewaySender)sender)
.getCache()).getApplicationRegions()) {
if (pr.getAllGatewaySenderIds().contains(sender.getId())) {
targetRs.add(pr);
}
}
if (logger.isDebugEnabled()) {
logger.debug("The target PRs are {} Dispatchers: {}", targetRs, nDispatcher);
}
createProcessors(sender.getDispatcherThreads(), targetRs);
// this.queue = parallelQueue;
this.queue = new ConcurrentParallelGatewaySenderQueue(this.processors);
setDaemon(true);
}
protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
processors = new ParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
if (logger.isDebugEnabled()) {
logger.debug("Creating AsyncEventProcessor");
}
for (int i = 0; i < sender.getDispatcherThreads(); i++) {
processors[i] = new ParallelGatewaySenderEventProcessor(sender,
targetRs, i, sender.getDispatcherThreads());
}
}
@Override
protected void initializeMessageQueue(String id) {
/* Set<Region> targetRs = new HashSet<Region>();
for (LocalRegion pr : ((GemFireCacheImpl)((ParallelGatewaySenderImpl)sender)
.getCache()).getApplicationRegions()) {
if (pr.getAllGatewaySenderIds().contains(id)) {
targetRs.add(pr);
}
}
*/
// this.parallelQueue = new ParallelGatewaySenderQueue(this.sender, targetRs);
/*if (sender.getIsHDFSQueue())
this.parallelQueue = new HDFSParallelGatewaySenderQueue(this.sender,
targetRs);
else
this.parallelQueue = new ParallelGatewaySenderQueue(this.sender, targetRs);*/
}
@Override
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
Object substituteValue) throws IOException, CacheException {
Region region = event.getRegion();
//int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation)event);
int bucketId = ((EntryEventImpl)event).getEventId().getBucketID();
if( bucketId < 0) {
return;
}
int pId = bucketId % this.nDispatcher;
this.processors[pId].enqueueEvent(operation, event, substituteValue);
/* if (getSender().beforeEnque(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
try {
this.parallelQueue.put(gatewayQueueEvent);
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (gatewayQueueEvent != null) {
gatewayQueueEvent.release();
}
getSender().getStatistics().endPut(start);
}
else {
getSender().getStatistics().incEventsFiltered();
}*/
}
@Override
public void run() {
final boolean isDebugEnabled = logger.isDebugEnabled();
for(int i = 0; i < this.processors.length; i++){
if (isDebugEnabled) {
logger.debug("Starting the ParallelProcessors {}", i);
}
this.processors[i].start();
}
try {
waitForRunningStatus();
} catch (GatewaySenderException e) {
this.ex = e;
}
synchronized (this.runningStateLock) {
if (ex != null) {
this.setException(ex);
setIsStopped(true);
} else {
setIsStopped(false);
}
this.runningStateLock.notifyAll();
}
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
try {
parallelProcessor.join();
} catch (InterruptedException e) {
if(isDebugEnabled) {
logger.debug("Got InterruptedException while waiting for child threads to finish.");
Thread.currentThread().interrupt();
}
}
}
}
private void waitForRunningStatus() {
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
synchronized (parallelProcessor.runningStateLock) {
while (parallelProcessor.getException() == null
&& parallelProcessor.isStopped()) {
try {
parallelProcessor.runningStateLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Exception ex = parallelProcessor.getException();
if (ex != null) {
throw new GatewaySenderException(
LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1
.toLocalizedString(new Object[] { this.getId(),
ex.getMessage() }), ex.getCause());
}
}
}
}
@Override
public void stopProcessing() {
if (!this.isAlive()) {
return;
}
final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
.createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger);
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(final Runnable task) {
final Thread thread = new Thread(loggingThreadGroup, task,
"ConcurrentParallelGatewaySenderEventProcessor Stopper Thread");
thread.setDaemon(true);
return thread;
}
};
List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>();
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
stopperCallables.add(new SenderStopperCallable(parallelProcessor));
}
ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory);
try {
List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables);
for(Future<Boolean> f: futures) {
try {
Boolean b = f.get();
if (logger.isDebugEnabled()) {
logger.debug("ConcurrentParallelGatewaySenderEventProcessor: {} stopped dispatching: {}",
(b ? "Successfully" : "Unsuccesfully"), this);
}
} catch (ExecutionException e) {
// we don't expect any exception but if caught then eat it and log warning
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_0_CAUGHT_EXCEPTION_WHILE_STOPPING_1, sender), e.getCause());
}
}
} catch (InterruptedException e) {
throw new InternalGemFireException(e.getMessage());
} catch (RejectedExecutionException rejectedExecutionEx) {
throw rejectedExecutionEx;
}
setIsStopped(true);
stopperService.shutdown();
closeProcessor();
if (logger.isDebugEnabled()) {
logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Stopped dispatching: {}", this);
}
}
@Override
public void closeProcessor() {
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
parallelProcessor.closeProcessor();
}
}
@Override
public void pauseDispatching(){
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
parallelProcessor.pauseDispatching();
}
super.pauseDispatching();
if (logger.isDebugEnabled()) {
logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Paused dispatching: {}", this);
}
}
@Override
public void waitForDispatcherToPause() {
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
parallelProcessor.waitForDispatcherToPause();
}
// super.waitForDispatcherToPause();
}
@Override
public void resumeDispatching() {
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
parallelProcessor.resumeDispatching();
}
super.resumeDispatching();
if (logger.isDebugEnabled()) {
logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Resumed dispatching: {}", this);
}
}
@Override
protected void waitForResumption() throws InterruptedException {
// TODO Auto-generated method stub
super.waitForResumption();
}
/**
* Test only methods for verification purpose.
*/
public List<ParallelGatewaySenderEventProcessor> getProcessors() {
List<ParallelGatewaySenderEventProcessor> l = new LinkedList<ParallelGatewaySenderEventProcessor>();
for (int i = 0; i < processors.length; i++) {
l.add(processors[i]);
}
return l;
}
/*
public List<ConcurrentParallelGatewaySenderQueue> getConcurrentParallelQueues() {
return concurrentParallelQueues;
}*/
@Override
public RegionQueue getQueue() {
return this.queue;
}
/* public Set<PartitionedRegion> getRegions() {
return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getRegions();
}
public int localSize() {
return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).localSize();
}*/
@Override
public GatewaySenderEventDispatcher getDispatcher() {
return this.processors[0].getDispatcher();//Suranjan is that fine??
}
@Override
protected void rebalance() {
// no op for AsyncEventProcessor
}
@Override
public void initializeEventDispatcher() {
// no op for AsyncEventProcessor
}
}