/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.wan.serial;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.pdx.internal.PeerTypeRegistration;

/**
 * @since GemFire 7.0
 *
 */
public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
  private static final Logger logger = LogService.getLogger();

  private final Object unprocessedEventsLock = new Object();

  protected static final int RANDOM_SLEEP_TIME = 1000;
  /**
   * A <code>Map</code> of events that have not been processed by the primary yet. This map is
   * created and used by a secondary <code>GatewaySender</code> to keep track of events that have
   * been received by the secondary but not yet processed by the primary. Once an event has been
   * processed by the primary, it is removed from this map. This map will only be used in the event
   * that this <code>GatewaySender</code> becomes the primary. Any events contained in this map will
   * need to be sent to other <code>GatewayReceiver</code>s. Note: unprocessedEventsLock MUST be
   * synchronized before using this map.
   */
  private Map<EventID, EventWrapper> unprocessedEvents;

  /**
   * A <code>Map</code> of tokens (i.e. longs) of entries that we have heard of from the primary but
   * not yet the secondary. This map is created and used by a secondary <code>GatewaySender</code>
   * to keep track. Note: unprocessedEventsLock MUST be synchronized before using this map. This is
   * not a cut and paste error. sync unprocessedEventsLock when using unprocessedTokens.
   */
  protected Map<EventID, Long> unprocessedTokens;

  private ExecutorService executor;

  private Object listenerObjectLock = new Object();

  private boolean failoverCompleted = false;

  private final Object failoverCompletedLock = new Object();

  /**
   * When the Number of unchecked events exceeds this threshold and the number of tokens in the map
   * exceeds this threshold then a check will be done for old tokens.
   */
  protected static final int REAP_THRESHOLD = 1000;

  /*
   * How many events have happened without a reap check being done?
   */
  private int uncheckedCount = 0;


  public SerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id,
      ThreadsMonitoring tMonitoring) {
    super("Event Processor for GatewaySender_" + id, sender, tMonitoring);

    this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
    this.unprocessedTokens = new LinkedHashMap<EventID, Long>();

    initializeMessageQueue(id);
  }

  @Override
  protected void initializeMessageQueue(String id) {
    // Create the region name
    StringBuffer regionNameBuffer = new StringBuffer();
    regionNameBuffer.append(id).append("_SERIAL_GATEWAY_SENDER_QUEUE");
    String regionName = regionNameBuffer.toString();

    CacheListener listener = null;
    if (!this.sender.isPrimary()) {
      listener = new SerialSecondaryGatewayListener(this);
      initializeListenerExecutor();
    }
    // Create the region queue
    this.queue = new SerialGatewaySenderQueue(sender, regionName, listener);

    if (logger.isDebugEnabled()) {
      logger.debug("Created queue: {}", this.queue);
    }
  }

  /**
   * @return false on failure
   */
  protected boolean waitForPrimary() {

    try {
      this.sender.getSenderAdvisor().waitToBecomePrimary(this);
    } catch (InterruptedException e) {
      // No need to set the interrupt bit, we're exiting the thread.
      if (!stopped()) {
        logger.fatal(
            "An InterruptedException occurred. The thread will exit.",
            e);
      }
      shutdownListenerExecutor();
      return false;
    }
    try {
      shutdownListenerExecutor();

      // Don't allow conserve-sockets = false so that ordering is preserved.
      DistributedSystem.setThreadsSocketPolicy(true);

      // Once notification has occurred, handle failover
      if (!stopped()) {
        handleFailover();
      } else {
        return false;
      }
    } catch (RegionDestroyedException e) {
      // This happens during handleFailover
      // because the region on _eventQueue can be closed.
      if (!stopped()) {
        logger.debug("Terminating due to {}", e.getMessage(), e);
      }
      return false;
    } catch (CancelException e) {
      if (!stopped()) {
        logger.debug("Terminating due to {}", e.getMessage(), e);
      }
      return false;
    } finally {
      // This is a good thing even on termination, because it ends waits
      // on the part of other threads.
      completeFailover();
    }
    return true;
  }



  @Override
  public void run() {
    try {
      setRunningStatus();
      // If this is not a primary gateway, wait for notification
      if (!sender.isPrimary()) {
        if (!waitForPrimary()) {
          return;
        }
      } else {
        // we are the primary so mark failover as being completed
        completeFailover();
      }
      // Begin to process the message queue after becoming primary
      if (logger.isDebugEnabled()) {
        logger.debug("Beginning to process the message queue");
      }

      if (!sender.isPrimary()) {
        logger.warn("About to process the message queue but not the primary.");
      }

      // Sleep for a bit. The random is so that if several of these are
      // started at once, they will stagger a bit.
      try {
        Thread.sleep(new Random().nextInt(RANDOM_SLEEP_TIME));
      } catch (InterruptedException e) {
        // no need to set the interrupt bit or throw an exception; just exit.
        return;
      }
      processQueue();
    } catch (CancelException e) {
      if (!this.isStopped()) {
        logger.info("A cancellation occurred. Stopping the dispatcher.");
        setIsStopped(true);
      }
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable e) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      logger.fatal(
          "Message dispatch failed due to unexpected exception..", e);
    }
  }

  @Override
  protected void rebalance() {
    // No reason to rebalance a serial sender since all connections are to the same server.
    throw new UnsupportedOperationException();
  }

  /**
   * Handle failover. This method is called when a secondary <code>GatewaySender</code> becomes a
   * primary <code>GatewaySender</code>.
   *
   * Once this secondary becomes the primary, it must:
   * <ul>
   * <li>Remove the queue's CacheListener
   * <li>Process the map of unprocessed events (those it has seen but the previous primary had not
   * yet processed before it crashed). These will include both queued and unqueued events. Remove
   * from the queue any events that were already sent
   * <li>Clear the unprocessed events map
   * </ul>
   */
  protected void handleFailover() {
    /*
     * We must hold this lock while we're processing these maps to prevent us from handling a
     * secondary event while failover occurs. See enqueueEvent
     */
    synchronized (this.unprocessedEventsLock) {
      // Remove the queue's CacheListener
      this.queue.removeCacheListener();
      this.unprocessedTokens = null;

      // Process the map of unprocessed events
      logger.info("Gateway Failover Initiated: Processing {} unprocessed events.",
          this.unprocessedEvents.size());
      GatewaySenderStats statistics = this.sender.getStatistics();
      if (!this.unprocessedEvents.isEmpty()) {
        // do a reap for bug 37603
        reapOld(statistics, true); // to get rid of timed out events
        // now iterate over the region queue to figure out what unprocessed
        // events are already in the queue
        {
          Iterator it = this.queue.getRegion().values().iterator();
          while (it.hasNext() && !stopped()) {
            Object o = it.next();
            if (o != null && o instanceof GatewaySenderEventImpl) {
              GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
              EventWrapper unprocessedEvent = this.unprocessedEvents.remove(ge.getEventId());
              if (unprocessedEvent != null) {
                unprocessedEvent.event.release();
                if (this.unprocessedEvents.isEmpty()) {
                  break;
                }
              }
            }
          }
        }
        // now for every unprocessed event add it to the end of the queue
        {
          Iterator<Map.Entry<EventID, EventWrapper>> it =
              this.unprocessedEvents.entrySet().iterator();
          while (it.hasNext()) {
            if (stopped())
              break;
            Map.Entry<EventID, EventWrapper> me = it.next();
            EventWrapper ew = me.getValue();
            GatewaySenderEventImpl gatewayEvent = ew.event;
            // Initialize each gateway event. This initializes the key,
            // value
            // and callback arg based on the EntryEvent.
            // TODO:wan70, remove dependencies from old code
            gatewayEvent.initialize();
            // Verify that they GatewayEventCallbackArgument is initialized.
            // If not, initialize it. It won't be initialized if a client to
            // this GatewayHub VM was the creator of this event. This Gateway
            // will be the first one to process it. If will be initialized if
            // this event was sent to this Gateway from another GatewayHub
            // (either directly or indirectly).
            GatewaySenderEventCallbackArgument seca = gatewayEvent.getSenderCallbackArgument();
            if (seca.getOriginatingDSId() == GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
              seca.setOriginatingDSId(sender.getMyDSId());
              seca.initializeReceipientDSIds(Collections.singletonList(sender.getRemoteDSId()));
            }
            it.remove();
            boolean queuedEvent = false;
            try {
              queuedEvent = queuePrimaryEvent(gatewayEvent);
            } catch (IOException ex) {
              if (!stopped()) {
                logger.warn(
                    String.format("Event dropped during failover: %s", gatewayEvent),
                    ex);
              }
            } catch (CacheException ex) {
              if (!stopped()) {
                logger.warn(
                    String.format("Event dropped during failover: %s", gatewayEvent),
                    ex);
              }
            } finally {
              if (!queuedEvent) {
                gatewayEvent.release();
              }
            }
          }
        }
        // Clear the unprocessed events map
        statistics.clearUnprocessedMaps();
      }

      // Iterate the entire queue and mark all events as possible
      // duplicate
      logger.info("{} : Marking  {}  events as possible duplicates",
          getSender(), Integer.valueOf(this.queue.size()));
      Iterator it = this.queue.getRegion().values().iterator();
      while (it.hasNext() && !stopped()) {
        Object o = it.next();
        if (o != null && o instanceof GatewaySenderEventImpl) {
          GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
          ge.setPossibleDuplicate(true);
        }
      }

      releaseUnprocessedEvents();
    } // synchronized
  }

  private void releaseUnprocessedEvents() {
    synchronized (this.unprocessedEventsLock) {
      Map<EventID, EventWrapper> m = this.unprocessedEvents;
      if (m != null) {
        for (EventWrapper ew : m.values()) {
          GatewaySenderEventImpl gatewayEvent = ew.event;
          if (logger.isDebugEnabled()) {
            logger.debug("releaseUnprocessedEvents:" + gatewayEvent);
          }
          gatewayEvent.release();
        }
        this.unprocessedEvents = null;
      }
    }
  }

  @Override
  public void closeProcessor() {
    try {
      super.closeProcessor();
    } finally {
      releaseUnprocessedEvents();
    }
  }

  /**
   * Add the input object to the event queue
   */
  @Override
  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue)
      throws IOException, CacheException {
    // There is a case where the event is serialized for processing. The
    // region is not
    // serialized along with the event since it is a transient field. I
    // created an
    // intermediate object (GatewayEventImpl) to avoid this since the region
    // name is
    // used in the sendBatch method, and it can't be null. See EntryEventImpl
    // for details.
    GatewaySenderEventImpl senderEvent = null;

    boolean isPrimary = sender.isPrimary();
    if (!isPrimary) {
      // Fix for #40615. We need to check if we've now become the primary
      // while holding the unprocessedEventsLock. This prevents us from failing
      // over while we're processing an event as a secondaryEvent.
      synchronized (unprocessedEventsLock) {
        // Test whether this gateway is the primary.
        if (sender.isPrimary()) {
          isPrimary = true;
        } else {
          // If it is not, create an uninitialized GatewayEventImpl and
          // put it into the map of unprocessed events, except 2 Special cases:
          // 1) UPDATE_VERSION_STAMP: only enqueue to primary
          // 2) CME && !originRemote: only enqueue to primary
          boolean isUpdateVersionStamp =
              event.getOperation().equals(Operation.UPDATE_VERSION_STAMP);
          boolean isCME_And_NotOriginRemote =
              ((EntryEventImpl) event).isConcurrencyConflict() && !event.isOriginRemote();
          if (!(isUpdateVersionStamp || isCME_And_NotOriginRemote)) {
            senderEvent =
                new GatewaySenderEventImpl(operation, event, substituteValue, false);
            handleSecondaryEvent(senderEvent);
          }
        }
      }
    }
    if (isPrimary) {
      Region region = event.getRegion();
      boolean isPDXRegion = (region instanceof DistributedRegion
          && region.getName().equals(PeerTypeRegistration.REGION_NAME));
      if (!isPDXRegion) {
        waitForFailoverCompletion();
      }
      // If it is, create and enqueue an initialized GatewayEventImpl
      senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue); // OFFHEAP ok

      boolean queuedEvent = false;
      try {
        queuedEvent = queuePrimaryEvent(senderEvent);
      } finally {
        // When queuePrimaryEvent() failed with some exception, it could
        // occur after the GatewaySenderEventImpl is put onto the queue.
        // In that case, the GatewaySenderEventImpl could be released here,
        // and IllegalStateException could be thrown if getDeserializedValue is called
        // when the event is accessed through the region queue.
        if (!queuedEvent) {
          GatewaySenderEventImpl.release(senderEvent);
        }
      }
    }
  }

  private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
      throws IOException, CacheException {
    // Queue the event
    GatewaySenderStats statistics = this.sender.getStatistics();
    if (logger.isDebugEnabled()) {
      logger.debug("{}: Queueing event ({}): {}", sender.getId(),
          (statistics.getEventsQueued() + 1), gatewayEvent);
    }
    if (!sender.beforeEnqueue(gatewayEvent)) {
      if (logger.isDebugEnabled()) {
        logger.debug("Event {} is not added to queue.", gatewayEvent);
      }
      statistics.incEventsFiltered();
      return false;
    }
    long start = statistics.startTime();
    boolean putDone = false;
    try {
      putDone = this.queue.put(gatewayEvent);
    } catch (InterruptedException e) {
      // Asif Not expected from SingleWriteSingleReadRegionQueue as it does not
      // throw
      // InterruptedException. But since both HARegionQueue and
      // SingleReadSingleWriteRegionQueue
      // extend RegionQueue , it has to handle InterruptedException
      Thread.currentThread().interrupt();
      getSender().getCancelCriterion().checkCancelInProgress(e);
    }
    statistics.endPut(start);
    if (logger.isDebugEnabled()) {
      logger.debug("{}: Queued event ({}): {}", sender.getId(), (statistics.getEventsQueued()),
          gatewayEvent);
    }
    // this._logger.warning(getGateway() + ": Queued event (" +
    // (statistics.getEventsQueued()) + "): " + gatewayEvent + " queue size: "
    // + this._eventQueue.size());
    /*
     * FAILOVER TESTING CODE System.out.println(getName() + ": Queued event (" +
     * (statistics.getEventsQueued()) + "): " + gatewayEvent.getId());
     */
    int queueSize = eventQueueSize();
    statistics.incQueueSize(1);
    if (!this.eventQueueSizeWarning && queueSize >= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) {
      logger.warn("{}: The event queue has reached {} events. Processing will continue.",
          sender.getId(),
          Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD));
      this.eventQueueSizeWarning = true;
    }
    return putDone;
  }

  protected void waitForFailoverCompletion() {
    synchronized (this.failoverCompletedLock) {
      if (this.failoverCompleted) {
        return;
      }
      logger.info("{} : Waiting for failover completion", this);
      try {
        while (!this.failoverCompleted) {
          this.failoverCompletedLock.wait();
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        this.sender.getCache().getCancelCriterion().checkCancelInProgress(ex);
        logger.info("{}: did not wait for failover completion due to interruption.",
            this);
      }
    }
  }

  protected void completeFailover() {
    synchronized (this.failoverCompletedLock) {
      this.failoverCompleted = true;
      this.failoverCompletedLock.notifyAll();
    }
  }

  /**
   * Update an unprocessed event in the unprocessed events map. This method is called by a secondary
   * <code>GatewaySender</code> to store a gateway event until it is processed by a primary
   * <code>GatewaySender</code>. The complexity of this method is the fact that the event could be
   * processed first by either the primary or secondary <code>GatewaySender</code>.
   *
   * If the primary processes the event first, the map will already contain an entry for the event
   * (through
   * {@link org.apache.geode.internal.cache.wan.serial.SerialSecondaryGatewayListener#afterDestroy}
   * ). When the secondary processes the event, it will remove it from the map.
   *
   * If the secondary processes the event first, it will add it to the map. When the primary
   * processes the event (through
   * {@link org.apache.geode.internal.cache.wan.serial.SerialSecondaryGatewayListener#afterDestroy}
   * ), it will then be removed from the map.
   *
   * @param senderEvent The event being processed
   */
  protected void handleSecondaryEvent(GatewaySenderEventImpl senderEvent) {
    basicHandleSecondaryEvent(senderEvent);
  }

  /**
   * Update an unprocessed event in the unprocessed events map. This method is called by a primary
   * <code>Gateway</code> (through
   * {@link org.apache.geode.internal.cache.wan.serial.SerialSecondaryGatewayListener#afterCreate} )
   * to notify the secondary <code>Gateway</code> that an event has been added to the queue. Once an
   * event has been added to the queue, the secondary no longer needs to keep track of it in the
   * unprocessed events map. The complexity of this method is the fact that the event could be
   * processed first by either the primary or secondary <code>Gateway</code>.
   *
   * If the primary processes the event first, the map will not contain an entry for the event. It
   * will be added to the map in this case so that when the secondary processes it, it will know
   * that the primary has already processed it, and it can be safely removed.
   *
   * If the secondary processes the event first, the map will already contain an entry for the
   * event. In this case, the event can be removed from the map.
   *
   * @param gatewayEvent The event being processed
   */
  protected void handlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
    Executor my_executor = this.executor;
    synchronized (listenerObjectLock) {
      if (my_executor == null) {
        // should mean we are now primary
        return;
      }
      my_executor.execute(new Runnable() {
        @Override
        public void run() {
          basicHandlePrimaryEvent(gatewayEvent);
        }
      });
    }
  }

  /**
   * Called when the primary gets rid of an event from the queue This method added to fix bug 37603
   */
  protected void handlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) {
    Executor my_executor = this.executor;
    synchronized (listenerObjectLock) {
      if (my_executor == null) {
        // should mean we are now primary
        return;
      }
      my_executor.execute(new Runnable() {
        @Override
        public void run() {
          basicHandlePrimaryDestroy(gatewayEvent.getEventId());
        }
      });
    }
  }

  /**
   * Just remove the event from the unprocessed events map if it is present. This method added to
   * fix bug 37603
   */
  protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
    if (this.sender.isPrimary()) {
      // no need to do anything if we have become the primary
      return false;
    }
    GatewaySenderStats statistics = this.sender.getStatistics();
    // Get the event from the map
    synchronized (unprocessedEventsLock) {
      if (this.unprocessedEvents == null)
        return false;
      // now we can safely use the unprocessedEvents field
      EventWrapper ew = this.unprocessedEvents.remove(eventId);
      if (ew != null) {
        ew.event.release();
        statistics.incUnprocessedEventsRemovedByPrimary();
        return true;
      }
    }
    return false;
  }

  protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
    if (this.sender.isPrimary()) {
      // no need to do anything if we have become the primary
      return;
    }
    GatewaySenderStats statistics = this.sender.getStatistics();
    // Get the event from the map
    synchronized (unprocessedEventsLock) {
      if (this.unprocessedEvents == null)
        return;
      // now we can safely use the unprocessedEvents field
      EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());

      if (ew == null) {
        // first time for the event
        if (logger.isTraceEnabled()) {
          logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map",
              sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
              gatewayEvent.getValueAsString(true));
        }
        {
          Long mapValue =
              Long.valueOf(System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT);
          Long oldv = this.unprocessedTokens.put(gatewayEvent.getEventId(), mapValue);
          if (oldv == null) {
            statistics.incUnprocessedTokensAddedByPrimary();
          } else {
            // its ok for oldv to be non-null
            // this shouldn't happen anymore @todo add an assertion here
          }
        }
      } else {
        // already added by secondary (i.e. hub)
        // the secondary
        // gateway has already seen this event, and it can be safely
        // removed (it already was above)
        if (logger.isTraceEnabled()) {
          logger.trace(
              "{}: Primary create/update event {}:{}->{} remove from unprocessed events map",
              sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
              gatewayEvent.getValueAsString(true));
        }
        ew.event.release();
        statistics.incUnprocessedEventsRemovedByPrimary();
      }
      reapOld(statistics, false);
    }
  }

  private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent) {
    boolean freeGatewayEvent = true;
    try {
      GatewaySenderStats statistics = this.sender.getStatistics();
      // Get the event from the map

      if (!getSender().getGatewayEventFilters().isEmpty()) {
        try {
          gatewayEvent.initialize();
        } catch (Exception e) {
          logger.warn(
              String.format("Event failed to be initialized: %s", gatewayEvent), e);
        }
        if (!sender.beforeEnqueue(gatewayEvent)) {
          statistics.incEventsFiltered();
          return;
        }
      }
      Assert.assertHoldsLock(unprocessedEventsLock, true);
      Assert.assertTrue(unprocessedEvents != null);
      // @todo add an assertion that !getPrimary()
      // now we can safely use the unprocessedEvents field
      Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());

      if (v == null) {
        // first time for the event
        if (logger.isTraceEnabled()) {
          logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map",
              sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
              gatewayEvent.getValueAsString(true));
        }
        {
          EventWrapper mapValue = new EventWrapper(gatewayEvent);
          EventWrapper oldv = this.unprocessedEvents.put(gatewayEvent.getEventId(), mapValue);
          if (oldv == null) {
            freeGatewayEvent = false;
            statistics.incUnprocessedEventsAddedBySecondary();
          } else {
            // put old one back in
            this.unprocessedEvents.put(gatewayEvent.getEventId(), oldv);
            // already added by secondary (i.e. hub)
            logger.warn(
                "{}: The secondary map already contained an event from hub {} so ignoring new event {}.",
                sender.getId(), v, gatewayEvent);
          }
        }
      } else {
        // token already added by primary already removed
        if (logger.isTraceEnabled()) {
          logger.trace("{}: Secondary created event {}:{}->{} removed from unprocessed events map",
              sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
              gatewayEvent.getValueAsString(true));
        }
        statistics.incUnprocessedTokensRemovedBySecondary();
      }
      reapOld(statistics, false);
    } finally {
      if (freeGatewayEvent) {
        gatewayEvent.release();
      }
    }
  }

  /**
   * Call to check if a cleanup of tokens needs to be done
   */
  private void reapOld(final GatewaySenderStats statistics, boolean forceEventReap) {
    synchronized (this.unprocessedEventsLock) {
      if (uncheckedCount > REAP_THRESHOLD) { // only check every X events
        uncheckedCount = 0;
        long now = System.currentTimeMillis();
        if (!forceEventReap && this.unprocessedTokens.size() > REAP_THRESHOLD) {
          Iterator<Map.Entry<EventID, Long>> it = this.unprocessedTokens.entrySet().iterator();
          int count = 0;
          while (it.hasNext()) {
            Map.Entry<EventID, Long> me = it.next();
            long meValue = me.getValue().longValue();
            if (meValue <= now) {
              // @todo log fine level message here
              // it has expired so remove it
              it.remove();
              count++;
            } else {
              // all done try again
              break;
            }
          }
          if (count > 0) {
            // statistics.incUnprocessedTokensRemovedByTimeout(count);
          }
        }
        if (forceEventReap || this.unprocessedEvents.size() > REAP_THRESHOLD) {
          Iterator<Map.Entry<EventID, EventWrapper>> it =
              this.unprocessedEvents.entrySet().iterator();
          int count = 0;
          while (it.hasNext()) {
            Map.Entry<EventID, EventWrapper> me = it.next();
            EventWrapper ew = me.getValue();
            if (ew.timeout <= now) {
              // @todo log fine level message here
              // it has expired so remove it
              it.remove();
              ew.event.release();
              count++;
            } else {
              // all done try again
              break;
            }
          }
          if (count > 0) {
            // statistics.incUnprocessedEventsRemovedByTimeout(count);
          }
        }
      } else {
        uncheckedCount++;
      }
    }
  }

  @Override
  public String toString() {
    StringBuffer buffer = new StringBuffer();
    buffer.append("GatewayEventProcessor[").append("gatewaySenderId=").append(sender.getId())
        .append(";remoteDSId=").append(getSender().getRemoteDSId()).append(";batchSize=")
        .append(getSender().getBatchSize());
    buffer.append("]");
    return buffer.toString();
  }

  /**
   * Initialize the Executor that handles listener events. Only used by non-primary gateway senders
   */
  private void initializeListenerExecutor() {
    this.executor =
        LoggingExecutors.newFixedThreadPoolWithTimeout("Queued Gateway Listener Thread", 1, 120);
  }

  private void shutdownListenerExecutor() {
    synchronized (listenerObjectLock) {
      if (this.executor != null) {
        this.executor.shutdown();
        this.executor = null;
      }
    }
  }

  @Override
  public void removeCacheListener() {
    this.queue.removeCacheListener();
  }

  @Override
  public void initializeEventDispatcher() {
    if (logger.isDebugEnabled()) {
      logger.debug(" Creating the GatewayEventCallbackDispatcher");
    }
    this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);

  }

  @Override
  protected void enqueueEvent(GatewayQueueEvent event) {
    // @TODO This API hasn't been implemented yet
    throw new UnsupportedOperationException();
  }

  public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl dropEvent, int index) {
    EntryEventImpl destroyEvent =
        EntryEventImpl.create((LocalRegion) this.queue.getRegion(), Operation.DESTROY, (long) index,
            null/* newValue */, null, false, sender.getCache().getMyId());
    destroyEvent.setEventId(dropEvent.getEventId());
    destroyEvent.disallowOffHeapValues();
    destroyEvent.setTailKey(-1L);
    if (logger.isDebugEnabled()) {
      logger.debug(
          "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}",
          destroyEvent);
    }

    try {
      BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent);
      op.distribute();
      if (logger.isDebugEnabled()) {
        logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent);
      }
    } catch (Exception ignore) {
      if (logger.isDebugEnabled()) {
        logger.debug(
            "Exception in sending dropped event could be ignored in order not to interrupt sender starting",
            ignore);
      }
    }
  }

  @Override
  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
    this.getSender().setModifiedEventId(droppedEvent);
    sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
  }

  private String printEventIdList(Set<EventID> eventIds) {
    StringBuffer sb = new StringBuffer().append("[").append(
        eventIds.stream().map(entry -> entry.expensiveToString()).collect(Collectors.joining(", ")))
        .append("]");
    return sb.toString();
  }

  @Override
  public String printUnprocessedEvents() {
    synchronized (this.unprocessedEventsLock) {
      return printEventIdList(this.unprocessedEvents.keySet());
    }
  }

  @Override
  public String printUnprocessedTokens() {
    synchronized (this.unprocessedEventsLock) {
      return printEventIdList(this.unprocessedTokens.keySet());
    }
  }
}
