/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImpl;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImplDummy;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class ClusterOperationExecutors implements OperationExecutors {
  private static final Logger logger = LogService.getLogger();

  /**
   * maximum time, in milliseconds, to wait for all threads to exit
   */
  static final int MAX_STOP_TIME = 20000;

  /**
   * Time to sleep, in milliseconds, while polling to see if threads have finished
   */
  static final int STOP_PAUSE_TIME = 1000;

  /**
   * Flag indicating whether to use single Serial-Executor thread or Multiple Serial-executor
   * thread,
   */
  private static final boolean MULTI_SERIAL_EXECUTORS =
      !Boolean.getBoolean("DistributionManager.singleSerialExecutor");

  private static final int MAX_WAITING_THREADS =
      Integer.getInteger("DistributionManager.MAX_WAITING_THREADS", Integer.MAX_VALUE);

  private static final int MAX_PR_META_DATA_CLEANUP_THREADS =
      Integer.getInteger("DistributionManager.MAX_PR_META_DATA_CLEANUP_THREADS", 1);

  private static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
      Math.max(Runtime.getRuntime().availableProcessors() * 32, 200));

  private static final int INCOMING_QUEUE_LIMIT =
      Integer.getInteger("DistributionManager.INCOMING_QUEUE_LIMIT", 80000);

  /** Throttling based on the Queue byte size */
  private static final double THROTTLE_PERCENT = (double) (Integer
      .getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE_PERCENT", 75)) / 100;

  private static final int SERIAL_QUEUE_BYTE_LIMIT = Integer
      .getInteger("DistributionManager.SERIAL_QUEUE_BYTE_LIMIT", (40 * (1024 * 1024)));

  private static final int SERIAL_QUEUE_THROTTLE =
      Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE",
          (int) (SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT));

  private static final int TOTAL_SERIAL_QUEUE_BYTE_LIMIT =
      Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_BYTE_LIMIT", (80 * (1024 * 1024)));

  private static final int TOTAL_SERIAL_QUEUE_THROTTLE =
      Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_THROTTLE",
          (int) (SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT));

  /** Throttling based on the Queue item size */
  private static final int SERIAL_QUEUE_SIZE_LIMIT =
      Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_LIMIT", 20000);

  private static final int SERIAL_QUEUE_SIZE_THROTTLE =
      Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_THROTTLE",
          (int) (SERIAL_QUEUE_SIZE_LIMIT * THROTTLE_PERCENT));

  /** Max number of serial Queue executors, in case of multi-serial-queue executor */
  private static final int MAX_SERIAL_QUEUE_THREAD =
      Integer.getInteger("DistributionManager.MAX_SERIAL_QUEUE_THREAD", 20);

  // 76 not in use


  /**
   * Executor for view related messages
   *
   * @see ViewAckMessage
   */

  private final InternalDistributedSystem system;

  private final DistributionStats stats;


  /** Message processing thread pool */
  private final ExecutorService threadPool;

  /**
   * High Priority processing thread pool, used for initializing messages such as UpdateAttributes
   * and CreateRegion messages
   */
  private final ExecutorService highPriorityPool;

  /**
   * Waiting Pool, used for messages that may have to wait on something. Use this separate pool with
   * an unbounded queue so that waiting runnables don't get in the way of other processing threads.
   * Used for threads that will most likely have to wait for a region to be finished initializing
   * before it can proceed
   */
  private final ExecutorService waitingPool;

  private final ExecutorService prMetaDataCleanupThreadPool;

  /**
   * Thread used to decouple {@link org.apache.geode.internal.cache.partitioned.PartitionMessage}s
   * from {@link org.apache.geode.internal.cache.DistributedCacheOperation}s </b>
   *
   * @see #SERIAL_EXECUTOR
   */
  private ExecutorService partitionedRegionThread;
  private ExecutorService partitionedRegionPool;

  /** Function Execution executors */
  private ExecutorService functionExecutionThread;
  private ExecutorService functionExecutionPool;

  /** Message processing executor for serial, ordered, messages. */
  private final ExecutorService serialThread;

  /**
   * If using a throttling queue for the serialThread, we cache the queue here so we can see if
   * delivery would block
   */
  private ThrottlingMemLinkedQueueWithDMStats<Runnable> serialQueue;

  /**
   * Thread Monitor mechanism to monitor system threads
   *
   * @see org.apache.geode.internal.monitoring.ThreadsMonitoring
   */
  private final ThreadsMonitoring threadMonitor;

  private SerialQueuedExecutorPool serialQueuedExecutorPool;

  @MutableForTesting
  public static final AtomicInteger maxPrThreadsForTest = new AtomicInteger(-1);

  ClusterOperationExecutors(DistributionStats stats,
      InternalDistributedSystem system) {

    this.stats = stats;
    this.system = system;

    DistributionConfig config = system.getConfig();

    threadMonitor = config.getThreadMonitorEnabled() ? new ThreadsMonitoringImpl(system,
        config.getThreadMonitorInterval(), config.getThreadMonitorTimeLimit())
        : new ThreadsMonitoringImplDummy();


    if (MULTI_SERIAL_EXECUTORS) {
      if (logger.isInfoEnabled(LogMarker.DM_MARKER)) {
        logger.info(LogMarker.DM_MARKER,
            "Serial Queue info :" + " THROTTLE_PERCENT: " + THROTTLE_PERCENT
                + " SERIAL_QUEUE_BYTE_LIMIT :" + SERIAL_QUEUE_BYTE_LIMIT
                + " SERIAL_QUEUE_THROTTLE :" + SERIAL_QUEUE_THROTTLE
                + " TOTAL_SERIAL_QUEUE_BYTE_LIMIT :" + TOTAL_SERIAL_QUEUE_BYTE_LIMIT
                + " TOTAL_SERIAL_QUEUE_THROTTLE :" + TOTAL_SERIAL_QUEUE_THROTTLE
                + " SERIAL_QUEUE_SIZE_LIMIT :" + SERIAL_QUEUE_SIZE_LIMIT
                + " SERIAL_QUEUE_SIZE_THROTTLE :" + SERIAL_QUEUE_SIZE_THROTTLE);
      }
      // when TCP/IP is disabled we can't throttle the serial queue or we run the risk of
      // distributed deadlock when we block the UDP reader thread
      boolean throttlingDisabled = system.getConfig().getDisableTcp();
      serialQueuedExecutorPool =
          new SerialQueuedExecutorPool(stats, throttlingDisabled, threadMonitor);
    }

    {
      BlockingQueue<Runnable> poolQueue;
      if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
        poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
      } else {
        serialQueue =
            new ThrottlingMemLinkedQueueWithDMStats<>(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
                TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE,
                stats.getSerialQueueHelper());
        poolQueue = serialQueue;
      }
      serialThread = CoreLoggingExecutors.newSerialThreadPool(poolQueue, "Serial Message Processor",
          thread -> stats.incSerialThreadStarts(),
          this::doSerialThread, stats.getSerialProcessorHelper(),
          threadMonitor);

    }

    threadPool =
        CoreLoggingExecutors.newThreadPoolWithFeedStatistics(MAX_THREADS,
            INCOMING_QUEUE_LIMIT, stats.getOverflowQueueHelper(), "Pooled Message Processor ",
            thread -> stats.incProcessingThreadStarts(), this::doProcessingThread,
            stats.getNormalPoolHelper(),
            threadMonitor);

    highPriorityPool = CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
        MAX_THREADS, INCOMING_QUEUE_LIMIT, stats.getHighPriorityQueueHelper(),
        "Pooled High Priority Message Processor ", thread -> stats.incHighPriorityThreadStarts(),
        this::doHighPriorityThread, stats.getHighPriorityPoolHelper(),
        threadMonitor);

    {
      BlockingQueue<Runnable> poolQueue;
      if (MAX_WAITING_THREADS == Integer.MAX_VALUE) {
        // no need for a queue since we have infinite threads
        poolQueue = new SynchronousQueue<>();
      } else {
        poolQueue = new OverflowQueueWithDMStats<>(stats.getWaitingQueueHelper());
      }
      waitingPool = CoreLoggingExecutors.newThreadPool(MAX_WAITING_THREADS, poolQueue,
          "Pooled Waiting Message Processor ",
          thread -> stats.incWaitingThreadStarts(), this::doWaitingThread,
          stats.getWaitingPoolHelper(), threadMonitor);
    }

    // should this pool using the waiting pool stats?
    prMetaDataCleanupThreadPool =
        CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
            MAX_PR_META_DATA_CLEANUP_THREADS, 0, stats.getWaitingQueueHelper(),
            "PrMetaData cleanup Message Processor ", thread -> stats.incWaitingThreadStarts(),
            this::doWaitingThread, stats.getWaitingPoolHelper(),
            threadMonitor);

    int maxPrThreads = maxPrThreadsForTest.get() > 0 ? maxPrThreadsForTest.get() : MAX_PR_THREADS;
    if (maxPrThreads > 1) {
      partitionedRegionPool =
          CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
              maxPrThreads, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
              "PartitionedRegion Message Processor",
              thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
              stats.getPartitionedRegionPoolHelper(),
              threadMonitor);
    } else {
      partitionedRegionThread = CoreLoggingExecutors.newSerialThreadPoolWithFeedStatistics(
          INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
          "PartitionedRegion Message Processor",
          thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
          stats.getPartitionedRegionPoolHelper(), threadMonitor);
    }
    if (MAX_FE_THREADS > 1) {
      functionExecutionPool =
          CoreLoggingExecutors.newFunctionThreadPoolWithFeedStatistics(
              MAX_FE_THREADS, INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper(),
              FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX,
              thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
              stats.getFunctionExecutionPoolHelper(),
              threadMonitor);
    } else {
      functionExecutionThread =
          CoreLoggingExecutors.newSerialThreadPoolWithFeedStatistics(
              INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper(),
              FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX,
              thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
              stats.getFunctionExecutionPoolHelper(), threadMonitor);
    }
  }

  /**
   * Returns the executor for the given type of processor.
   */
  @Override
  public Executor getExecutor(int processorType, InternalDistributedMember sender) {
    switch (processorType) {
      case STANDARD_EXECUTOR:
        return getThreadPool();
      case SERIAL_EXECUTOR:
        return getSerialExecutor(sender);
      case HIGH_PRIORITY_EXECUTOR:
        return getHighPriorityThreadPool();
      case WAITING_POOL_EXECUTOR:
        return getWaitingThreadPool();
      case PARTITIONED_REGION_EXECUTOR:
        return getPartitionedRegionExecutor();
      case REGION_FUNCTION_EXECUTION_EXECUTOR:
        return getFunctionExecutor();
      default:
        throw new InternalGemFireError(String.format("unknown processor type %s",
            processorType));
    }
  }

  @Override
  public ExecutorService getThreadPool() {
    return threadPool;
  }

  @Override
  public ExecutorService getHighPriorityThreadPool() {
    return highPriorityPool;
  }

  @Override
  public ExecutorService getWaitingThreadPool() {
    return waitingPool;
  }

  @Override
  public ExecutorService getPrMetaDataCleanupThreadPool() {
    return prMetaDataCleanupThreadPool;
  }

  private Executor getPartitionedRegionExecutor() {
    if (partitionedRegionThread != null) {
      return partitionedRegionThread;
    } else {
      return partitionedRegionPool;
    }
  }


  @Override
  public Executor getFunctionExecutor() {
    if (functionExecutionThread != null) {
      return functionExecutionThread;
    } else {
      return functionExecutionPool;
    }
  }

  private Executor getSerialExecutor(InternalDistributedMember sender) {
    if (MULTI_SERIAL_EXECUTORS) {
      return serialQueuedExecutorPool.getThrottledSerialExecutor(sender);
    } else {
      return serialThread;
    }
  }

  /** returns the serialThread's queue if throttling is being used, null if not */
  @Override
  public OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
    if (MULTI_SERIAL_EXECUTORS) {
      return serialQueuedExecutorPool.getSerialQueue(sender);
    } else {
      return serialQueue;
    }
  }

  public ThreadsMonitoring getThreadMonitoring() {
    return threadMonitor;
  }



  private void doFunctionExecutionThread(Runnable command) {
    stats.incFunctionExecutionThreads(1);
    FunctionExecutionPooledExecutor.setIsFunctionExecutionThread(Boolean.TRUE);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incFunctionExecutionThreads(-1);
      FunctionExecutionPooledExecutor.setIsFunctionExecutionThread(Boolean.FALSE);
    }
  }

  private void doProcessingThread(Runnable command) {
    stats.incNumProcessingThreads(1);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incNumProcessingThreads(-1);
    }
  }

  private void doHighPriorityThread(Runnable command) {
    stats.incHighPriorityThreads(1);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incHighPriorityThreads(-1);
    }
  }

  private void doWaitingThread(Runnable command) {
    stats.incWaitingThreads(1);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incWaitingThreads(-1);
    }
  }

  private void doPartitionRegionThread(Runnable command) {
    stats.incPartitionedRegionThreads(1);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incPartitionedRegionThreads(-1);
    }
  }

  private void doSerialThread(Runnable command) {
    stats.incNumSerialThreads(1);
    try {
      ConnectionTable.threadWantsSharedResources();
      runUntilShutdown(command);
    } finally {
      ConnectionTable.releaseThreadsSockets();
      stats.incNumSerialThreads(-1);
    }
  }

  private void runUntilShutdown(Runnable r) {
    try {
      r.run();
    } catch (CancelException e) {
      if (logger.isTraceEnabled()) {
        logger.trace("Caught shutdown exception", e);
      }
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      SystemFailure.checkFailure();
      if (!system.isDisconnecting()) {
        logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t);
      } else {
        logger.warn("Task failed with exception", t);
      }
    }
  }

  void askThreadsToStop() {
    // Stop executors after they have finished
    ExecutorService es;
    threadMonitor.close();
    es = serialThread;
    if (es != null) {
      es.shutdown();
    }
    if (serialQueuedExecutorPool != null) {
      serialQueuedExecutorPool.shutdown();
    }
    es = functionExecutionThread;
    if (es != null) {
      es.shutdown();
    }
    es = functionExecutionPool;
    if (es != null) {
      es.shutdown();
    }
    es = partitionedRegionThread;
    if (es != null) {
      es.shutdown();
    }
    es = partitionedRegionPool;
    if (es != null) {
      es.shutdown();
    }
    es = highPriorityPool;
    if (es != null) {
      es.shutdown();
    }
    es = waitingPool;
    if (es != null) {
      es.shutdown();
    }
    es = prMetaDataCleanupThreadPool;
    if (es != null) {
      es.shutdown();
    }
    es = threadPool;
    if (es != null) {
      es.shutdown();
    }
  }

  void waitForThreadsToStop(long timeInMillis) throws InterruptedException {
    long start = System.currentTimeMillis();
    long remaining = timeInMillis;

    ExecutorService[] allExecutors = new ExecutorService[] {serialThread,
        functionExecutionThread, functionExecutionPool, partitionedRegionThread,
        partitionedRegionPool, highPriorityPool, waitingPool,
        prMetaDataCleanupThreadPool, threadPool};
    for (ExecutorService es : allExecutors) {
      if (es != null) {
        es.awaitTermination(remaining, TimeUnit.MILLISECONDS);
      }
      remaining = timeInMillis - (System.currentTimeMillis() - start);
      if (remaining <= 0) {
        return;
      }
    }

    serialQueuedExecutorPool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
  }

  private boolean executorAlive(ExecutorService tpe, String name) {
    if (tpe == null) {
      return false;
    } else {
      int ac = ((ThreadPoolExecutor) tpe).getActiveCount();
      // boolean result = tpe.getActiveCount() > 0;
      if (ac > 0) {
        if (logger.isDebugEnabled()) {
          logger.debug("Still waiting for {} threads in '{}' pool to exit", ac, name);
        }
        return true;
      } else {
        return false;
      }
    }
  }


  /**
   * Wait for the ancillary queues to exit. Kills them if they are still around.
   *
   */
  void forceThreadsToStop() {
    long endTime = System.currentTimeMillis() + MAX_STOP_TIME;
    StringBuilder culprits;
    for (;;) {
      boolean stillAlive = false;
      culprits = new StringBuilder();
      if (executorAlive(serialThread, "serial thread")) {
        stillAlive = true;
        culprits.append(" serial thread;");
      }
      if (executorAlive(partitionedRegionThread, "partitioned region thread")) {
        stillAlive = true;
        culprits.append(" partitioned region thread;");
      }
      if (executorAlive(partitionedRegionPool, "partitioned region pool")) {
        stillAlive = true;
        culprits.append(" partitioned region pool;");
      }
      if (executorAlive(highPriorityPool, "high priority pool")) {
        stillAlive = true;
        culprits.append(" high priority pool;");
      }
      if (executorAlive(waitingPool, "waiting pool")) {
        stillAlive = true;
        culprits.append(" waiting pool;");
      }
      if (executorAlive(prMetaDataCleanupThreadPool, "prMetaDataCleanupThreadPool")) {
        stillAlive = true;
        culprits.append(" special waiting pool;");
      }
      if (executorAlive(threadPool, "thread pool")) {
        stillAlive = true;
        culprits.append(" thread pool;");
      }

      if (!stillAlive) {
        return;
      }

      long now = System.currentTimeMillis();
      if (now >= endTime) {
        break;
      }

      try {
        Thread.sleep(STOP_PAUSE_TIME);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        // Desperation, the shutdown thread is being killed. Don't
        // consult a CancelCriterion.
        logger.warn("Interrupted during shutdown", e);
        break;
      }
    } // for

    logger.warn("Daemon threads are slow to stop; culprits include: {}",
        culprits);

    // Kill with no mercy
    if (serialThread != null) {
      serialThread.shutdownNow();
    }
    if (functionExecutionThread != null) {
      functionExecutionThread.shutdownNow();
    }
    if (functionExecutionPool != null) {
      functionExecutionPool.shutdownNow();
    }
    if (partitionedRegionThread != null) {
      partitionedRegionThread.shutdownNow();
    }
    if (partitionedRegionPool != null) {
      partitionedRegionPool.shutdownNow();
    }
    if (highPriorityPool != null) {
      highPriorityPool.shutdownNow();
    }
    if (waitingPool != null) {
      waitingPool.shutdownNow();
    }
    if (prMetaDataCleanupThreadPool != null) {
      prMetaDataCleanupThreadPool.shutdownNow();
    }
    if (threadPool != null) {
      threadPool.shutdownNow();
    }
  }

  public void handleManagerDeparture(InternalDistributedMember theId) {
    if (serialQueuedExecutorPool != null) {
      serialQueuedExecutorPool.handleMemberDeparture(theId);
    }
  }

  /**
   * This class is used for DM's multi serial executor. The serial messages are managed/executed by
   * multiple serial thread. This class takes care of executing messages related to a sender using
   * the same thread.
   */
  private static class SerialQueuedExecutorPool {
    /** To store the serial threads */
    final ConcurrentMap<Integer, ExecutorService> serialQueuedExecutorMap =
        new ConcurrentHashMap<>(MAX_SERIAL_QUEUE_THREAD);

    /** To store the queue associated with thread */
    final Map<Integer, OverflowQueueWithDMStats<Runnable>> serialQueuedMap =
        new HashMap<>(MAX_SERIAL_QUEUE_THREAD);

    /** Holds mapping between sender to the serial thread-id */
    final Map<InternalDistributedMember, Integer> senderToSerialQueueIdMap = new HashMap<>();

    /**
     * Holds info about unused thread, a thread is marked unused when the member associated with it
     * has left distribution system.
     */
    final ArrayList<Integer> threadMarkedForUse = new ArrayList<>();

    final DistributionStats stats;

    final boolean throttlingDisabled;

    final ThreadsMonitoring threadMonitoring;

    SerialQueuedExecutorPool(DistributionStats stats,
        boolean throttlingDisabled, ThreadsMonitoring tMonitoring) {
      this.stats = stats;
      this.throttlingDisabled = throttlingDisabled;
      threadMonitoring = tMonitoring;
    }

    /*
     * Returns an id of the thread in serialQueuedExecutorMap, that's mapped to the given sender.
     *
     *
     * @param createNew boolean flag to indicate whether to create a new id, if id does not exist.
     */
    private Integer getQueueId(InternalDistributedMember sender, boolean createNew) {
      // Create a new Id.
      Integer queueId;

      synchronized (senderToSerialQueueIdMap) {
        // Check if there is a executor associated with this sender.
        queueId = senderToSerialQueueIdMap.get(sender);

        if (!createNew || queueId != null) {
          return queueId;
        }

        // Create new.
        // Check if any threads are available that is marked for Use.
        if (!threadMarkedForUse.isEmpty()) {
          queueId = threadMarkedForUse.remove(0);
        }
        // If Map is full, use the threads in round-robin fashion.
        if (queueId == null) {
          queueId = (serialQueuedExecutorMap.size() + 1) % MAX_SERIAL_QUEUE_THREAD;
        }
        senderToSerialQueueIdMap.put(sender, queueId);
      }
      return queueId;
    }

    /*
     * Returns the queue associated with this sender. Used in FlowControl for throttling (based on
     * queue size).
     */
    OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
      Integer queueId = getQueueId(sender, false);
      if (queueId == null) {
        return null;
      }
      return serialQueuedMap.get(queueId);
    }

    /*
     * Returns the serial queue executor, before returning the thread this applies throttling, based
     * on the total serial queue size (total - sum of all the serial queue size). The throttling is
     * applied during put event, this doesnt block the extract operation on the queue.
     *
     */
    ExecutorService getThrottledSerialExecutor(
        InternalDistributedMember sender) {
      ExecutorService executor = getSerialExecutor(sender);

      // Get the total serial queue size.
      long totalSerialQueueMemSize = stats.getInternalSerialQueueBytes();

      // for tcp socket reader threads, this code throttles the thread
      // to keep the sender-side from overwhelming the receiver.
      // UDP readers are throttled in the FC protocol, which queries
      // the queue to see if it should throttle
      if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE
          && !DistributionMessage.isMembershipMessengerThread()) {
        do {
          boolean interrupted = Thread.interrupted();
          try {
            float throttlePercent = (float) (totalSerialQueueMemSize - TOTAL_SERIAL_QUEUE_THROTTLE)
                / (float) (TOTAL_SERIAL_QUEUE_BYTE_LIMIT - TOTAL_SERIAL_QUEUE_THROTTLE);
            int sleep = (int) (100.0 * throttlePercent);
            sleep = Math.max(sleep, 1);
            Thread.sleep(sleep);
          } catch (InterruptedException ex) {
            interrupted = true;
            // FIXME-InterruptedException
            // Perhaps we should return null here?
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
          stats.getSerialQueueHelper().incThrottleCount();
        } while (stats.getInternalSerialQueueBytes() >= TOTAL_SERIAL_QUEUE_BYTE_LIMIT);
      }
      return executor;
    }

    /*
     * Returns the serial queue executor for the given sender.
     */
    ExecutorService getSerialExecutor(InternalDistributedMember sender) {
      ExecutorService executor;
      Integer queueId = getQueueId(sender, true);
      if ((executor =
          serialQueuedExecutorMap.get(queueId)) != null) {
        return executor;
      }
      // If executor doesn't exists for this sender, create one.
      executor = createSerialExecutor(queueId);

      serialQueuedExecutorMap.put(queueId, executor);

      if (logger.isDebugEnabled()) {
        logger.debug(
            "Created Serial Queued Executor With queueId {}. Total number of live Serial Threads :{}",
            queueId, serialQueuedExecutorMap.size());
      }
      stats.incSerialPooledThread();
      return executor;
    }

    /*
     * Creates a serial queue executor.
     */
    private ExecutorService createSerialExecutor(final Integer id) {

      OverflowQueueWithDMStats<Runnable> poolQueue;

      if (SERIAL_QUEUE_BYTE_LIMIT == 0 || throttlingDisabled) {
        poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
      } else {
        poolQueue = new ThrottlingMemLinkedQueueWithDMStats<>(SERIAL_QUEUE_BYTE_LIMIT,
            SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE,
            stats.getSerialQueueHelper());
      }

      serialQueuedMap.put(id, poolQueue);

      return CoreLoggingExecutors.newSerialThreadPool(poolQueue,
          "Pooled Serial Message Processor" + id + "-",
          thread -> stats.incSerialPooledThreadStarts(), this::doSerialPooledThread,
          stats.getSerialPooledProcessorHelper(), threadMonitoring);
    }

    private void doSerialPooledThread(Runnable command) {
      ConnectionTable.threadWantsSharedResources();
      try {
        command.run();
      } finally {
        ConnectionTable.releaseThreadsSockets();
      }
    }

    /*
     * Does cleanup relating to this member. And marks the serial executor associated with this
     * member for re-use.
     */
    private void handleMemberDeparture(InternalDistributedMember member) {
      Integer queueId = getQueueId(member, false);
      if (queueId == null) {
        return;
      }

      boolean isUsed = false;

      synchronized (senderToSerialQueueIdMap) {
        senderToSerialQueueIdMap.remove(member);

        // Check if any other members are using the same executor.
        for (Integer value : senderToSerialQueueIdMap.values()) {
          if (value.equals(queueId)) {
            isUsed = true;
            break;
          }
        }

        // If not used mark this as unused.
        if (!isUsed) {
          if (logger.isInfoEnabled(LogMarker.DM_MARKER)) {
            logger.info(LogMarker.DM_MARKER,
                "Marking the SerialQueuedExecutor with id : {} used by the member {} to be unused.",
                new Object[] {queueId, member});
          }

          threadMarkedForUse.add(queueId);
        }
      }
    }

    private void awaitTermination(long time, TimeUnit unit) throws InterruptedException {
      long remainingNanos = unit.toNanos(time);
      long start = System.nanoTime();
      for (ExecutorService executor : serialQueuedExecutorMap.values()) {
        executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
        remainingNanos = (System.nanoTime() - start);
        if (remainingNanos <= 0) {
          return;
        }
      }
    }

    private void shutdown() {
      for (ExecutorService executor : serialQueuedExecutorMap
          .values()) {
        executor.shutdown();
      }
    }
  }

}
