/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache.wan.serial;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackArgument;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender.EventWrapper;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
/**
* @author Suranjan Kumar
* @author Yogesh Mahajan
* @since 7.0
*
*/
public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
private static final Logger logger = LogService.getLogger();
private final Object unprocessedEventsLock = new Object();
protected static final int RANDOM_SLEEP_TIME = 1000;
/**
* A <code>Map</code> of events that have not been processed by the primary
* yet. This map is created and used by a secondary <code>GatewaySender</code> to
* keep track of events that have been received by the secondary but not yet
* processed by the primary. Once an event has been processed by the primary,
* it is removed from this map. This map will only be used in the event that
* this <code>GatewaySender</code> becomes the primary. Any events contained in this
* map will need to be sent to other <code>GatewayReceiver</code>s. Note:
* unprocessedEventsLock MUST be synchronized before using this map.
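* <p>
* A minimal illustrative sketch (not a verbatim excerpt of this class; the
* {@code eventId} local is assumed for illustration) of the locking
* discipline every access to this map follows:
* <pre>{@code
* synchronized (unprocessedEventsLock) {
*   EventWrapper ew = unprocessedEvents.remove(eventId);
*   if (ew != null) {
*     // the wrapped event may reference off-heap memory, so release it
*     ew.event.release();
*   }
* }
* }</pre>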
*/
private Map<EventID, EventWrapper> unprocessedEvents;
/**
* A <code>Map</code> of tokens (expiry timestamps) for events that the primary
* has processed but that this secondary has not yet seen. Each value is the
* time (now + {@link AbstractGatewaySender#TOKEN_TIMEOUT}) after which the
* token may be reaped. This map is created and used by a secondary
* <code>GatewaySender</code> to keep track of such events. Note:
* unprocessedEventsLock MUST be synchronized before using this map. This is
* not a cut and paste error; the same unprocessedEventsLock guards both
* unprocessedEvents and unprocessedTokens.
*/
private Map<EventID, Long> unprocessedTokens;
private ExecutorService executor;
private final Object listenerObjectLock = new Object();
private boolean failoverCompleted = false;
private final Object failoverCompletedLock = new Object();
/**
* When the number of events processed since the last check exceeds this
* threshold, and the number of entries in the token (or unprocessed event)
* map also exceeds this threshold, a check is done for expired entries.
*/
private static final int REAP_THRESHOLD = 1000;
/*
* How many events have happened without a reap check being done?
*/
private int uncheckedCount = 0;
public SerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id) {
super(LoggingThreadGroup.createThreadGroup(
"Event Processor for GatewaySender_" + id,
logger), "Event Processor for GatewaySender_"
+ id, sender);
this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
this.unprocessedTokens = new LinkedHashMap<EventID, Long>();
initializeMessageQueue(id);
setDaemon(true);
}
@Override
protected void initializeMessageQueue(String id) {
// Create the region name
String regionName = id + "_SERIAL_GATEWAY_SENDER_QUEUE";
CacheListener listener = null;
if (!this.sender.isPrimary()) {
listener = new SerialSecondaryGatewayListener(this);
initializeListenerExecutor();
}
// Create the region queue
this.queue = new SerialGatewaySenderQueue(sender, regionName, listener);
if (logger.isDebugEnabled()) {
logger.debug("Created queue: {}", this.queue);
}
}
/**
* @return false on failure
*/
protected boolean waitForPrimary() {
try {
this.sender.getSenderAdvisor().waitToBecomePrimary();
} catch (InterruptedException e) {
// No need to set the interrupt bit, we're exiting the thread.
if (!stopped()) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayImpl_AN_INTERRUPTEDEXCEPTION_OCCURRED_THE_THREAD_WILL_EXIT), e);
}
shutdownListenerExecutor();
return false;
}
try {
shutdownListenerExecutor();
// Don't allow conserve-sockets = false so that ordering is preserved.
DistributedSystem.setThreadsSocketPolicy(true);
// Once notification has occurred, handle failover
if (!stopped()) {
handleFailover();
} else {
return false;
}
} catch (RegionDestroyedException e) {
// This happens during handleFailover
// because the region on _eventQueue can be closed.
if (!stopped()) {
logger.debug("Terminating due to {}", e.getMessage(), e);
}
return false;
} catch (CancelException e) {
if (!stopped()) {
logger.debug("Terminating due to {}", e.getMessage(), e);
}
return false;
} finally {
// This is a good thing even on termination, because it ends waits
// on the part of other threads.
completeFailover();
}
return true;
}
@Override
public void run() {
try {
setRunningStatus();
// If this is not a primary gateway, wait for notification
if (!sender.isPrimary()) {
if (!waitForPrimary()) {
return;
}
} else {
// we are the primary so mark failover as being completed
completeFailover();
}
// Begin to process the message queue after becoming primary
if (logger.isDebugEnabled()) {
logger.debug("Beginning to process the message queue");
}
if (!sender.isPrimary()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_ABOUT_TO_PROCESS_THE_MESSAGE_QUEUE_BUT_NOT_THE_PRIMARY));
}
// Sleep for a bit. The random is so that if several of these are
// started at once, they will stagger a bit.
try {
Thread.sleep(new Random().nextInt(RANDOM_SLEEP_TIME));
} catch (InterruptedException e) {
// no need to set the interrupt bit or throw an exception; just exit.
return;
}
processQueue();
} catch (CancelException e) {
if (!this.isStopped()) {
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_A_CANCELLATION_OCCURRED_STOPPING_THE_DISPATCHER));
setIsStopped(true);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayImpl_MESSAGE_DISPATCH_FAILED_DUE_TO_UNEXPECTED_EXCEPTION), e);
}
}
@Override
protected void rebalance() {
// No reason to rebalance a serial sender since all connections are to the same server.
throw new UnsupportedOperationException();
}
/**
* Handle failover. This method is called when a secondary
* <code>GatewaySender</code> becomes a primary <code>GatewaySender</code>.
*
* Once this secondary becomes the primary, it must:
* <ul>
* <li>Remove the queue's CacheListener
* <li>Process the map of unprocessed events (those this secondary has seen
* but the previous primary had not yet processed before it failed). These
* will include both queued and unqueued events. Events already present in
* the queue are dropped from the map; the remainder are appended to the end
* of the queue
* <li>Clear the unprocessed events map
* <li>Mark every event remaining in the queue as a possible duplicate
* </ul>
*/
protected void handleFailover() {
/*
* We must hold this lock while we're processing these maps to prevent us
* from handling a secondary event while failover occurs. See enqueueEvent
*/
synchronized (this.unprocessedEventsLock) {
// Remove the queue's CacheListener
this.queue.removeCacheListener();
this.unprocessedTokens = null;
// Process the map of unprocessed events
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_GATEWAY_FAILOVER_INITIATED_PROCESSING_0_UNPROCESSED_EVENTS,
this.unprocessedEvents.size()));
GatewaySenderStats statistics = this.sender.getStatistics();
if (!this.unprocessedEvents.isEmpty()) {
// do a reap for bug 37603
reapOld(statistics, true); // to get rid of timed out events
// now iterate over the region queue to figure out what unprocessed
// events are already in the queue
{
Iterator it = this.queue.getRegion().values().iterator();
while (it.hasNext() && !stopped()) {
Object o = it.next();
if (o != null && o instanceof GatewaySenderEventImpl) {
GatewaySenderEventImpl ge = (GatewaySenderEventImpl)o;
EventWrapper unprocessedEvent = this.unprocessedEvents.remove(ge.getEventId());
if (unprocessedEvent != null) {
unprocessedEvent.event.release();
if (this.unprocessedEvents.isEmpty()) {
break;
}
}
}
}
}
// now for every unprocessed event add it to the end of the queue
{
Iterator<Map.Entry<EventID, EventWrapper>> it = this.unprocessedEvents.entrySet().iterator();
while (it.hasNext()) {
if (stopped()) break;
Map.Entry<EventID, EventWrapper> me = it.next();
EventWrapper ew = me.getValue();
GatewaySenderEventImpl gatewayEvent = ew.event;
// Initialize each gateway event. This initializes the key,
// value
// and callback arg based on the EntryEvent.
// TODO:wan70, remove dependencies from old code
gatewayEvent.initialize();
// Verify that the GatewayEventCallbackArgument is initialized.
// If not, initialize it. It won't be initialized if a client to
// this GatewayHub VM was the creator of this event. This Gateway
// will be the first one to process it. It will be initialized if
// this event was sent to this Gateway from another GatewayHub
// (either directly or indirectly).
GatewaySenderEventCallbackArgument seca = gatewayEvent
.getSenderCallbackArgument();
if (seca.getOriginatingDSId() == GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
seca.setOriginatingDSId(sender.getMyDSId());
seca.initializeReceipientDSIds(Collections.singletonList(sender
.getRemoteDSId()));
}
it.remove();
boolean queuedEvent = false;
try {
queuePrimaryEvent(gatewayEvent);
queuedEvent = true;
} catch (IOException ex) {
if (!stopped()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_DROPPED_DURING_FAILOVER_0, gatewayEvent), ex);
}
} catch (CacheException ex) {
if (!stopped()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_DROPPED_DURING_FAILOVER_0, gatewayEvent), ex);
}
} finally {
if (!queuedEvent) {
gatewayEvent.release();
}
}
}
}
// Clear the unprocessed events map
statistics.clearUnprocessedMaps();
}
// Iterate the entire queue and mark all events as possible
// duplicates
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0__MARKING__1__EVENTS_AS_POSSIBLE_DUPLICATES,
new Object[] {getSender(), Integer.valueOf(this.queue.size())}));
Iterator it = this.queue.getRegion().values().iterator();
while (it.hasNext() && !stopped()) {
Object o = it.next();
if (o != null && o instanceof GatewaySenderEventImpl) {
GatewaySenderEventImpl ge = (GatewaySenderEventImpl)o;
ge.setPossibleDuplicate(true);
}
}
releaseUnprocessedEvents();
} // synchronized
}
private void releaseUnprocessedEvents() {
synchronized (this.unprocessedEventsLock) {
Map<EventID, EventWrapper> m = this.unprocessedEvents;
if (m != null) {
for (EventWrapper ew: m.values()) {
GatewaySenderEventImpl gatewayEvent = ew.event;
gatewayEvent.release();
}
this.unprocessedEvents = null;
}
}
}
@Override
public void closeProcessor() {
try {
super.closeProcessor();
} finally {
releaseUnprocessedEvents();
}
}
/**
* Add the input object to the event queue
*/
@Override
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
Object substituteValue) throws IOException, CacheException {
// There is a case where the event is serialized for processing. The region
// is not serialized along with the event since it is a transient field. An
// intermediate object (GatewaySenderEventImpl) is used to avoid this, since
// the region name is used in the sendBatch method and cannot be null. See
// EntryEventImpl for details.
GatewaySenderEventImpl senderEvent = null;
boolean isPrimary = sender.isPrimary();
if (!isPrimary) {
// Fix for #40615. We need to check if we've now become the primary
// while holding the unprocessedEventsLock. This prevents us from failing
// over while we're processing an event as a secondaryEvent.
synchronized (unprocessedEventsLock) {
// Test whether this gateway is the primary.
if (sender.isPrimary()) {
isPrimary = true;
} else {
// If it is not, create an uninitialized GatewayEventImpl and
// put it into the map of unprocessed events.
senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP ok
handleSecondaryEvent(senderEvent);
}
}
}
if (isPrimary) {
Region region = event.getRegion();
boolean isPDXRegion = (region instanceof DistributedRegion && region.getName().equals(PeerTypeRegistration.REGION_NAME));
if (!isPDXRegion) {
waitForFailoverCompletion();
}
// If it is, create and enqueue an initialized GatewayEventImpl
senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue); // OFFHEAP ok
queuePrimaryEvent(senderEvent);
}
}
private void queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
throws IOException, CacheException {
// Queue the event
GatewaySenderStats statistics = this.sender.getStatistics();
if (logger.isDebugEnabled()) {
logger.debug("{}: Queueing event ({}): {}", sender.getId(), (statistics.getEventsQueued() + 1), gatewayEvent);
}
if (!sender.beforeEnque(gatewayEvent)) {
if (logger.isDebugEnabled()) {
logger.debug("Event {} is not added to queue.", gatewayEvent);
}
statistics.incEventsFiltered();
return;
}
long start = statistics.startTime();
try {
this.queue.put(gatewayEvent);
} catch (InterruptedException e) {
// Asif: Not expected from SingleWriteSingleReadRegionQueue as it does not
// throw InterruptedException. But since both HARegionQueue and
// SingleReadSingleWriteRegionQueue extend RegionQueue, it has to handle
// InterruptedException.
Thread.currentThread().interrupt();
getSender().getCancelCriterion().checkCancelInProgress(e);
}
statistics.endPut(start);
if (logger.isDebugEnabled()) {
logger.debug("{}: Queued event ({}): {}", sender.getId(), (statistics.getEventsQueued()), gatewayEvent);
}
// this._logger.warning(getGateway() + ": Queued event (" +
// (statistics.getEventsQueued()) + "): " + gatewayEvent + " queue size: "
// + this._eventQueue.size());
/*
* FAILOVER TESTING CODE System.out.println(getName() + ": Queued event (" +
* (statistics.getEventsQueued()) + "): " + gatewayEvent.getId());
*/
int queueSize = eventQueueSize();
statistics.incQueueSize(1);
if (!this.eventQueueSizeWarning
&& queueSize >= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0_THE_EVENT_QUEUE_SIZE_HAS_REACHED_THE_THRESHOLD_1,
new Object[] { sender.getId(), Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) }));
this.eventQueueSizeWarning = true;
}
}
protected void waitForFailoverCompletion() {
synchronized (this.failoverCompletedLock) {
if (this.failoverCompleted) {
return;
}
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0__WAITING_FOR_FAILOVER_COMPLETION, this));
try {
while (!this.failoverCompleted) {
this.failoverCompletedLock.wait();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
this.sender.getCache().getCancelCriterion().checkCancelInProgress(ex);
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0_DID_NOT_WAIT_FOR_FAILOVER_COMPLETION_DUE_TO_INTERRUPTION, this));
}
}
}
protected void completeFailover() {
synchronized (this.failoverCompletedLock) {
this.failoverCompleted = true;
this.failoverCompletedLock.notifyAll();
}
}
/**
* Update an unprocessed event in the unprocessed events map. This method is
* called by a secondary <code>GatewaySender</code> to store a gateway event until
* it is processed by a primary <code>GatewaySender</code>. The complexity of this
* method is the fact that the event could be processed first by either the
* primary or secondary <code>GatewaySender</code>.
*
* If the primary processes the event first, the unprocessed tokens map will
* already contain a token for the event (added by {@link #handlePrimaryEvent},
* which is invoked through
* {@link com.gemstone.gemfire.internal.cache.wan.serial.SerialSecondaryGatewayListener#afterCreate}
* ). When the secondary processes the event, it removes that token instead of
* storing the event.
*
* If the secondary processes the event first, it adds the event to the
* unprocessed events map. When the primary processes the event (again through
* {@link #handlePrimaryEvent}), the entry is removed from the map.
*
* @param senderEvent
* The event being processed
*/
protected void handleSecondaryEvent(GatewaySenderEventImpl senderEvent) {
basicHandleSecondaryEvent(senderEvent);
}
/**
* Update an unprocessed event in the unprocessed events map. This method is
* called by a primary <code>Gateway</code> (through
* {@link com.gemstone.gemfire.internal.cache.wan.serial.SerialSecondaryGatewayListener#afterCreate}
* ) to notify the secondary <code>Gateway</code> that an event has been added
* to the queue. Once an event has been added to the queue, the secondary no
* longer needs to keep track of it in the unprocessed events map. The
* complexity of this method is the fact that the event could be processed
* first by either the primary or secondary <code>Gateway</code>.
*
* If the primary processes the event first, the unprocessed events map will
* not contain an entry for the event. In that case a token (an expiry
* timestamp) is added to the unprocessed tokens map so that when the
* secondary processes the event, it knows the primary has already handled it
* and the event can be safely dropped.
*
* If the secondary processes the event first, the unprocessed events map will
* already contain an entry for the event. In that case, the entry is removed
* from the map and its event released.
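* <p>
* A minimal sketch of that bookkeeping (see {@link #basicHandlePrimaryEvent}
* for the actual implementation):
* <pre>{@code
* synchronized (unprocessedEventsLock) {
*   EventWrapper ew = unprocessedEvents.remove(gatewayEvent.getEventId());
*   if (ew == null) {
*     // primary saw the event first: leave a token that expires later
*     unprocessedTokens.put(gatewayEvent.getEventId(),
*         System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT);
*   } else {
*     // secondary saw the event first: the stored copy is no longer needed
*     ew.event.release();
*   }
* }
* }</pre>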
*
* @param gatewayEvent
* The event being processed
*/
protected void handlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
Executor my_executor = this.executor;
synchronized (listenerObjectLock) {
if (my_executor == null) {
// should mean we are now primary
return;
}
try {
my_executor.execute(new Runnable() {
public void run() {
basicHandlePrimaryEvent(gatewayEvent);
}
});
}
catch (RejectedExecutionException ex) {
throw ex;
}
}
}
/**
* Called when the primary removes an event from the queue. This method was
* added to fix bug 37603.
*/
protected void handlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) {
Executor my_executor = this.executor;
synchronized (listenerObjectLock) {
if (my_executor == null) {
// should mean we are now primary
return;
}
try {
my_executor.execute(new Runnable() {
public void run() {
basicHandlePrimaryDestroy(gatewayEvent);
}
});
}
catch (RejectedExecutionException ex) {
throw ex;
}
}
}
/**
* Just remove the event from the unprocessed events map if it is present.
* This method was added to fix bug 37603.
*/
protected void basicHandlePrimaryDestroy(
final GatewaySenderEventImpl gatewayEvent) {
if (this.sender.isPrimary()) {
// no need to do anything if we have become the primary
return;
}
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
synchronized (unprocessedEventsLock) {
if (this.unprocessedEvents == null)
return;
// now we can safely use the unprocessedEvents field
EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
if (ew != null) {
ew.event.release();
statistics.incUnprocessedEventsRemovedByPrimary();
}
}
}
protected void basicHandlePrimaryEvent(
final GatewaySenderEventImpl gatewayEvent) {
if (this.sender.isPrimary()) {
// no need to do anything if we have become the primary
return;
}
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
synchronized (unprocessedEventsLock) {
if (this.unprocessedEvents == null)
return;
// now we can safely use the unprocessedEvents field
EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
if (ew == null) {
// first time for the event
if (logger.isTraceEnabled()) {
logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true));
}
{
Long mapValue = Long.valueOf(System.currentTimeMillis()
+ AbstractGatewaySender.TOKEN_TIMEOUT);
Long oldv = this.unprocessedTokens.put(gatewayEvent.getEventId(), mapValue);
if (oldv == null) {
statistics.incUnprocessedTokensAddedByPrimary();
} else {
// it's ok for oldv to be non-null
// this shouldn't happen anymore @todo add an assertion here
}
}
} else {
// already added by the secondary (i.e. hub): the secondary
// gateway has already seen this event, so it can be safely
// removed (it already was, above)
if (logger.isTraceEnabled()) {
logger.trace("{}: Primary create/update event {}:{}->{} remove from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true));
}
ew.event.release();
statistics.incUnprocessedEventsRemovedByPrimary();
}
reapOld(statistics, false);
}
}
private void basicHandleSecondaryEvent(
final GatewaySenderEventImpl gatewayEvent) {
boolean freeGatewayEvent = true;
try {
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
if (!getSender().getGatewayEventFilters().isEmpty()) {
try {
gatewayEvent.initialize();
}
catch (Exception e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_FAILED_TO_BE_INITIALIZED_0, gatewayEvent), e);
}
if (!sender.beforeEnque(gatewayEvent)) {
statistics.incEventsFiltered();
return;
}
}
Assert.assertHoldsLock(unprocessedEventsLock, true);
Assert.assertTrue(unprocessedEvents != null);
// @todo add an assertion that !getPrimary()
// now we can safely use the unprocessedEvents field
Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
if (v == null) {
// first time for the event
if (logger.isTraceEnabled()) {
logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true));
}
{
EventWrapper mapValue = new EventWrapper(gatewayEvent);
EventWrapper oldv = this.unprocessedEvents.put(gatewayEvent.getEventId(), mapValue);
if (oldv == null) {
freeGatewayEvent = false;
statistics.incUnprocessedEventsAddedBySecondary();
} else {
// put old one back in
this.unprocessedEvents.put(gatewayEvent.getEventId(), oldv);
// already added by secondary (i.e. hub)
logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewayImpl_0_THE_UNPROCESSED_EVENTS_MAP_ALREADY_CONTAINED_AN_EVENT_FROM_THE_HUB_1_SO_IGNORING_NEW_EVENT_2,
new Object[] { sender.getId(), v, gatewayEvent }));
}
}
} else {
// token was already added by the primary; it has now been removed
if (logger.isTraceEnabled()) {
logger.trace("{}: Secondary created event {}:{}->{} removed from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true));
}
statistics.incUnprocessedTokensRemovedBySecondary();
}
reapOld(statistics, false);
} finally {
if (freeGatewayEvent) {
gatewayEvent.release();
}
}
}
/**
* Checks whether a cleanup of expired tokens and unprocessed events is due
* and, if so, performs it. The check only runs once every
* {@link #REAP_THRESHOLD} invocations.
*/
private void reapOld(final GatewaySenderStats statistics, boolean forceEventReap) {
synchronized (this.unprocessedEventsLock) {
if (uncheckedCount > REAP_THRESHOLD) { // only check every X events
uncheckedCount = 0;
long now = System.currentTimeMillis();
if (!forceEventReap && this.unprocessedTokens.size() > REAP_THRESHOLD) {
Iterator<Map.Entry<EventID, Long>> it = this.unprocessedTokens.entrySet().iterator();
int count = 0;
while (it.hasNext()) {
Map.Entry<EventID, Long> me = it.next();
long meValue = me.getValue().longValue();
if (meValue <= now) {
// @todo log fine level message here
// it has expired so remove it
it.remove();
count++;
} else {
// not expired yet; entries are in insertion order, so stop and try again later
break;
}
}
if (count > 0) {
//statistics.incUnprocessedTokensRemovedByTimeout(count);
}
}
if (forceEventReap || this.unprocessedEvents.size() > REAP_THRESHOLD) {
Iterator<Map.Entry<EventID, EventWrapper>> it = this.unprocessedEvents.entrySet().iterator();
int count = 0;
while (it.hasNext()) {
Map.Entry<EventID, EventWrapper> me = it.next();
EventWrapper ew = me.getValue();
if (ew.timeout <= now) {
// @todo log fine level message here
// it has expired so remove it
it.remove();
ew.event.release();
count++;
} else {
// not expired yet; entries are in insertion order, so stop and try again later
break;
}
}
if (count > 0) {
//statistics.incUnprocessedEventsRemovedByTimeout(count);
}
}
} else {
uncheckedCount++;
}
}
}
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("GatewayEventProcessor[").append("gatewaySenderId=")
.append(sender.getId())
.append(";remoteDSId=")
.append(getSender().getRemoteDSId())
.append(";batchSize=")
.append(getSender().getBatchSize());
buffer.append("]");
return buffer.toString();
}
/**
* Initialize the Executor that handles listener events. Only used by
* non-primary gateway senders
*/
private void initializeListenerExecutor() {
// Create the ThreadGroup
final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
"Gateway Listener Group", logger);
// Create the Executor
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(Runnable command) {
Thread thread = new Thread(loggerGroup, command,
"Queued Gateway Listener Thread");
thread.setDaemon(true);
return thread;
}
};
LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(1, 1/* max unused */, 120,
TimeUnit.SECONDS, q, tf);
}
private void shutdownListenerExecutor() {
synchronized (listenerObjectLock) {
if (this.executor != null) {
this.executor.shutdown();
this.executor = null;
}
}
}
@Override
public void removeCacheListener() {
this.queue.removeCacheListener();
}
@Override
public void initializeEventDispatcher() {
if (logger.isDebugEnabled()) {
logger.debug(" Creating the GatewayEventCallbackDispatcher");
}
this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
}
}