/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadState;

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.ClientSession;
import org.apache.geode.cache.DynamicRegionFactory;
import org.apache.geode.cache.InterestRegistrationEvent;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.internal.RegisterInterestTracker;
import org.apache.geode.cache.operations.DestroyOperationContext;
import org.apache.geode.cache.operations.InvalidateOperationContext;
import org.apache.geode.cache.operations.OperationContext;
import org.apache.geode.cache.operations.PutOperationContext;
import org.apache.geode.cache.operations.RegionClearOperationContext;
import org.apache.geode.cache.operations.RegionCreateOperationContext;
import org.apache.geode.cache.operations.RegionDestroyOperationContext;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.cache.CacheDistributionAdvisee;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.InterestRegistrationEventImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.StateFlushOperation;
import org.apache.geode.internal.cache.ha.HAContainerWrapper;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.ha.HARegionQueueAttributes;
import org.apache.geode.internal.cache.ha.HARegionQueueStats;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
import org.apache.geode.internal.cache.tier.sockets.command.Get70;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.LogWriterImpl;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.security.AuthorizeRequestPP;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.AccessControl;

/**
 * Class <code>CacheClientProxy</code> represents the server side of the {@link CacheClientUpdater}.
 * It queues messages to be sent from the server to the client. It then reads those messages from
 * the queue and sends them to the client.
 *
 * @since GemFire 4.2
 */
@SuppressWarnings("synthetic-access")
public class CacheClientProxy implements ClientSession {
  private static final Logger logger = LogService.getLogger();

  /**
   * The socket between the server and the client
   */
  protected Socket _socket;

  private final AtomicBoolean _socketClosed = new AtomicBoolean();

  /**
   * A communication buffer used by each message we send to the client
   */
  protected ByteBuffer _commBuffer;

  /**
   * The remote host's IP address string (cached for convenience)
   */
  protected String _remoteHostAddress;

  /**
   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
   */
  protected volatile boolean isMarkedForRemoval = false;

  /**
   * @see #isMarkedForRemoval
   */
  protected final Object isMarkedForRemovalLock = new Object();

  /**
   * The proxy id of the client represented by this proxy
   */
  protected ClientProxyMembershipID proxyID;

  /**
   * The GemFire cache
   */
  protected final InternalCache _cache;

  /**
   * The list of keys that the client represented by this proxy is interested in (stored by region)
   */
  protected final ClientInterestList[] cils = new ClientInterestList[2];

  /**
   * A thread that dispatches messages to the client
   */
  protected volatile MessageDispatcher _messageDispatcher;

  /**
   * The statistics for this proxy
   */
  protected final CacheClientProxyStats _statistics;

  protected final AtomicReference _durableExpirationTask = new AtomicReference();

  protected SystemTimer durableTimer;

  /**
   * Whether this dispatcher is paused
   */
  protected volatile boolean _isPaused = true;

  /**
   * True if we are connected to a client.
   */
  private volatile boolean connected = false;

  /**
   * True if a marker message is still in the ha queue.
   */
  private boolean markerEnqueued = false;

  /**
   * The number of times to peek on shutdown before giving up and shutting down
   */
  protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue();

  /**
   * The number of milliseconds to wait for an offering to the message queue
   */
  protected static final int MESSAGE_OFFER_TIME = 0;

  /**
   * The default maximum message queue size
   */
  // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;

  /** The message queue size */
  protected final int _maximumMessageCount;

  /**
   * The time (in seconds ) after which a message in the client queue will expire.
   */
  protected final int _messageTimeToLive;

  /**
   * The <code>CacheClientNotifier</code> registering this proxy.
   */
  protected final CacheClientNotifier _cacheClientNotifier;

  /**
   * Defaults to true; meaning do some logging of dropped client notification messages. Set the
   * system property to true to cause dropped messages to NOT be logged.
   */
  protected static final boolean LOG_DROPPED_MSGS =
      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings");

  /**
   * for testing purposes, delays the start of the dispatcher thread
   */
  @MutableForTesting
  public static boolean isSlowStartForTesting = false;

  /**
   * Default value for slow starting time of dispatcher
   */
  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;

  /**
   * Key in the system property from which the slow starting time value will be retrieved
   */
  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";

  private boolean isPrimary;

  /** @since GemFire 5.7 */
  protected byte clientConflation = Handshake.CONFLATION_DEFAULT;

  /**
   * Flag to indicate whether to keep a durable client's queue alive
   */
  boolean keepalive = false;

  private AccessControl postAuthzCallback;
  private Subject subject;

  /**
   * Used in a multi-user environment.
   */
  private ClientUserAuths clientUserAuths;

  private final Object clientUserAuthsLock = new Object();

  /**
   * The version of the client
   */
  private Version clientVersion;

  /**
   * A map of region name as key and integer as its value. Basically, it stores the names of the
   * regions with <code>DataPolicy</code> as EMPTY. If an event's region name is present in this
   * map, its full value (and not delta) is sent to the client represented by this proxy.
   *
   * @since GemFire 6.1
   */
  private volatile Map regionsWithEmptyDataPolicy = new HashMap();

  /**
   * A debug flag used for testing Backward compatibility
   */
  @MutableForTesting
  public static boolean AFTER_MESSAGE_CREATION_FLAG = false;

  /**
   * Notify the region when a client interest registration occurs. This tells the region to update
   * access time when an update is to be pushed to a client. It is enabled only for
   * <code>PartitionedRegion</code>s currently.
   */
  protected static final boolean NOTIFY_REGION_ON_INTEREST =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");

  /**
   * The AcceptorImpl identifier to which the proxy is connected.
   */
  private final long _acceptorId;

  /** acceptor's setting for notifyBySubscription */
  private final boolean notifyBySubscription;

  /**
   * A counter that keeps track of how many task iterations that have occurred since the last ping
   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
   */
  private final AtomicInteger pingCounter = new AtomicInteger();


  /** Date on which this instance was created */
  private Date creationDate;

  /**
   * true when the durable client associated with this proxy is being restarted and prevents cqs
   * from being closed and drained
   **/
  private boolean drainLocked = false;
  private final Object drainLock = new Object();

  /** number of cq drains that are currently in progress **/
  private int numDrainsInProgress = 0;
  private final Object drainsInProgressLock = new Object();

  private final SecurityService securityService;
  private final StatisticsClock statisticsClock;

  /**
   * Creates the server-side proxy for a single client.
   *
   * @param ccn the <code>CacheClientNotifier</code> registering this proxy
   * @param socket the socket between the server and the client
   * @param proxyID the membership id representing the connection proxy of the client
   * @param isPrimary whether this proxy is primary
   * @param clientConflation the conflation setting recorded for the client
   * @param clientVersion the version of the client
   * @param acceptorId the identifier of the acceptor to which the proxy is connected
   * @param notifyBySubscription the acceptor's setting for notifyBySubscription
   * @param securityService the security service retained by this proxy
   * @param subject the subject retained by this proxy
   * @param statisticsClock the statistics clock retained by this proxy
   * @throws CacheException declared by the signature; this body contains no explicit throw
   */
  protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
      ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
      Version clientVersion, long acceptorId, boolean notifyBySubscription,
      SecurityService securityService, Subject subject, StatisticsClock statisticsClock)
      throws CacheException {

    // Must run first: the statistics name below is built from the proxy id and the
    // remote host address that this call populates.
    initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);

    _cacheClientNotifier = ccn;
    _cache = ccn.getCache();
    this.securityService = securityService;
    _maximumMessageCount = ccn.getMaximumMessageCount();
    _messageTimeToLive = ccn.getMessageTimeToLive();
    _acceptorId = acceptorId;
    this.notifyBySubscription = notifyBySubscription;

    final StatisticsFactory statisticsFactory =
        _cache.getInternalDistributedSystem().getStatisticsManager();
    this.statisticsClock = statisticsClock;
    final String statisticsName =
        "id_" + this.proxyID.getDistributedMember().getId() + "_at_" + _remoteHostAddress;
    _statistics = new CacheClientProxyStats(statisticsFactory, statisticsName);
    this.subject = subject;

    // One interest list keyed by the proxy id, one keyed by the durable id.
    cils[RegisterInterestTracker.interestListIndex] = new ClientInterestList(this, this.proxyID);
    cils[RegisterInterestTracker.durableInterestListIndex] =
        new ClientInterestList(this, getDurableId());

    postAuthzCallback = null;
    _cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
    creationDate = new Date();
    initializeClientAuths();
  }

  /**
   * Looks up the client user authorizations for this proxy's id, but only when
   * <code>AcceptorImpl</code> reports that a post-authorization callback is present.
   */
  private void initializeClientAuths() {
    if (!AcceptorImpl.isPostAuthzCallbackPresent()) {
      return;
    }
    clientUserAuths = ServerConnection.getClientUserAuths(proxyID);
  }

  /**
   * Replaces the current client user authorizations with a freshly looked-up instance, after
   * filling the new instance from the previous one via <code>fillPreviousCQAuth</code>. Does
   * nothing when there are no current authorizations or no post-authorization callback is present.
   */
  private void reinitializeClientAuths() {
    if (clientUserAuths == null || !AcceptorImpl.isPostAuthzCallbackPresent()) {
      return;
    }
    synchronized (clientUserAuthsLock) {
      ClientUserAuths replacement = ServerConnection.getClientUserAuths(proxyID);
      replacement.fillPreviousCQAuth(clientUserAuths);
      clientUserAuths = replacement;
    }
  }

  /**
   * Installs a new post-authorization callback, closing the one it replaces (if any).
   *
   * @param authzCallback the callback to install
   */
  public void setPostAuthzCallback(AccessControl authzCallback) {
    // TODO: revisit synchronization
    synchronized (clientUserAuthsLock) {
      final AccessControl previous = postAuthzCallback;
      if (previous != null) {
        previous.close();
      }
      postAuthzCallback = authzCallback;
    }
  }

  /**
   * Installs a new subject, logging out the one it replaces (if any).
   *
   * @param subject the subject to install
   */
  public void setSubject(Subject subject) {
    // TODO: revisit synchronization
    synchronized (clientUserAuthsLock) {
      final Subject previous = this.subject;
      if (previous != null) {
        previous.logout();
      }
      this.subject = subject;
    }
  }

  /**
   * Records the user authorization attributes for a CQ. This applies only in the multi-user case,
   * i.e. when no post-authorization callback is installed on this proxy.
   *
   * @param cqName the name of the CQ
   * @param uniqueId the unique id passed through to the client user authorizations
   * @param isDurable whether the CQ is durable
   */
  public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable) {
    if (postAuthzCallback != null) {
      // single-user case: nothing to record
      return;
    }
    // Read the field once. close(boolean, boolean) sets it to null without holding a lock, so a
    // separate null check followed by a second read could dereference null.
    final ClientUserAuths userAuths = this.clientUserAuths;
    if (userAuths != null) {
      userAuths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable);
    }
  }

  /**
   * Sets the per-connection state of this proxy: socket, proxy id, connected flag, communication
   * buffer, cached remote address, and the primary/conflation/version settings.
   *
   * @param socket the socket between the server and the client
   * @param pid the proxy id of the client
   * @param ip whether this proxy is primary
   * @param cc the client conflation setting
   * @param vers the version of the client
   */
  private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
      byte cc, Version vers) {
    _socket = socket;
    proxyID = pid;
    connected = true;

    // Size the buffer from the socket's send buffer, but never below 1024 bytes. If the
    // socket cannot report its size, fall back to that minimum.
    final int minimumBufferSize = 1024;
    int sendBufferSize;
    try {
      sendBufferSize = Math.max(minimumBufferSize, _socket.getSendBufferSize());
    } catch (SocketException ignore) {
      sendBufferSize = minimumBufferSize;
    }
    _commBuffer = ServerConnection.allocateCommBuffer(sendBufferSize, socket);

    _remoteHostAddress = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    isPrimary = ip;
    clientConflation = cc;
    clientVersion = vers;
  }

  /**
   * Returns the marker-enqueued flag.
   *
   * @return true if a marker message is still in the ha queue
   */
  public boolean isMarkerEnqueued() {
    return this.markerEnqueued;
  }

  /**
   * Sets the marker-enqueued flag.
   *
   * @param bool the new value of the flag
   */
  public void setMarkerEnqueued(boolean bool) {
    this.markerEnqueued = bool;
  }

  /**
   * Returns the identifier of the acceptor to which this proxy is connected.
   *
   * @return the acceptor identifier
   */
  public long getAcceptorId() {
    return _acceptorId;
  }

  /**
   * Returns the acceptor's setting for notifyBySubscription.
   *
   * @return the notifyBySubscription setting
   */
  public boolean isNotifyBySubscription() {
    return notifyBySubscription;
  }


  /**
   * Returns the proxy id of the client represented by this proxy.
   *
   * @return the client's proxy membership id
   */
  public ClientProxyMembershipID getProxyID() {
    return proxyID;
  }

  /**
   * Tests whether this proxy's id equals the given id.
   *
   * @param memberId the id to compare against
   * @return true if this proxy's id equals <code>memberId</code>
   */
  protected boolean isMember(ClientProxyMembershipID memberId) {
    return proxyID.equals(memberId);
  }

  /**
   * Delegates to <code>isSameDSMember</code> on this proxy's id.
   *
   * @param memberId the id to compare against
   * @return the result of the proxy id's <code>isSameDSMember</code> check
   */
  protected boolean isSameDSMember(ClientProxyMembershipID memberId) {
    return proxyID.isSameDSMember(memberId);
  }

  /**
   * Sets the queue keepalive option.
   *
   * @param option whether to keep the durable client's queue alive
   */
  protected void setKeepAlive(boolean option) {
    keepalive = option;
  }

  /**
   * Returns the socket between the server and the client.
   *
   * @return the socket between the server and the client
   */
  protected Socket getSocket() {
    return _socket;
  }

  /**
   * Returns the host address of the socket's remote address.
   *
   * @return the host address string reported by the socket's <code>InetAddress</code>
   */
  public String getSocketHost() {
    return _socket.getInetAddress().getHostAddress();
  }

  /**
   * Returns the communication buffer used by each message sent to the client.
   *
   * @return the communication buffer
   */
  protected ByteBuffer getCommBuffer() {
    return _commBuffer;
  }

  /**
   * Returns the cached remote host address string.
   *
   * @return the remote host's address string
   */
  protected String getRemoteHostAddress() {
    return _remoteHostAddress;
  }

  /**
   * Returns the remote port reported by the socket.
   *
   * @return the remote host's port
   */
  public int getRemotePort() {
    return _socket.getPort();
  }

  /**
   * Returns whether the proxy is connected to a remote client.
   *
   * @return true if this proxy is connected to a client
   */
  public boolean isConnected() {
    return connected;
  }

  /**
   * Marks the receiver as needing removal.
   *
   * @return true if it was already marked for removal before this call
   */
  protected boolean startRemoval() {
    synchronized (isMarkedForRemovalLock) {
      final boolean alreadyMarked = isMarkedForRemoval;
      isMarkedForRemoval = true;
      return alreadyMarked;
    }
  }

  /**
   * Blocks until the receiver is no longer marked for removal.
   *
   * @return true if the proxy was marked for removal when this method was entered
   */
  protected boolean waitRemoval() {
    synchronized (isMarkedForRemovalLock) {
      final boolean initiallyMarked = isMarkedForRemoval;
      boolean wasInterrupted = false;
      try {
        while (isMarkedForRemoval) {
          if (logger.isDebugEnabled()) {
            logger.debug("Waiting for CacheClientProxy removal: {}", this);
          }
          try {
            isMarkedForRemovalLock.wait();
          } catch (InterruptedException e) {
            // Remember the interrupt and keep waiting unless the cache is being cancelled.
            wasInterrupted = true;
            _cache.getCancelCriterion().checkCancelInProgress(e);
          }
        }
      } finally {
        // Restore the interrupt status that the wait loop consumed.
        if (wasInterrupted) {
          Thread.currentThread().interrupt();
        }
      }
      return initiallyMarked;
    }
  }

  /**
   * Indicates that removal has completed on this instance: clears the marked-for-removal flag and
   * wakes every thread waiting on the removal lock.
   */
  protected void notifyRemoval() {
    final Object removalLock = isMarkedForRemovalLock;
    synchronized (removalLock) {
      isMarkedForRemoval = false;
      removalLock.notifyAll();
    }
  }

  /**
   * Returns the cache this proxy belongs to.
   *
   * @return the GemFire cache
   */
  public InternalCache getCache() {
    return _cache;
  }

  /**
   * Collects the region names held by every interest list of this proxy.
   *
   * @return a new set containing the union of each interest list's regions
   */
  public Set<String> getInterestRegisteredRegions() {
    final HashSet<String> registeredRegions = new HashSet<String>();
    int index = 0;
    while (index < cils.length) {
      if (!cils[index].regions.isEmpty()) {
        registeredRegions.addAll(cils[index].regions);
      }
      index++;
    }
    return registeredRegions;
  }

  /**
   * Returns the statistics for this proxy.
   *
   * @return the proxy's statistics
   */
  public CacheClientProxyStats getStatistics() {
    return _statistics;
  }

  /**
   * Returns the <code>CacheClientNotifier</code> that registered this proxy.
   *
   * @return this proxy's <code>CacheClientNotifier</code>
   */
  protected CacheClientNotifier getCacheClientNotifier() {
    return _cacheClientNotifier;
  }

  /**
   * Returns the size of the queue for heuristic purposes. This size may be changing concurrently
   * if puts/gets are occurring at the same time.
   *
   * @return the dispatcher's queue size, or 0 when there is no message dispatcher
   */
  public int getQueueSize() {
    if (_messageDispatcher == null) {
      return 0;
    }
    return _messageDispatcher.getQueueSize();
  }

  /**
   * Returns the queue size calculated through stats.
   *
   * @return the dispatcher's queue size statistic, or 0 when there is no message dispatcher
   */
  public int getQueueSizeStat() {
    if (_messageDispatcher == null) {
      return 0;
    }
    return _messageDispatcher.getQueueSizeStat();
  }


  /**
   * Reports whether any CQ drain is currently in progress.
   *
   * @return true if the number of drains in progress is greater than zero
   */
  public boolean drainInProgress() {
    final boolean inProgress;
    synchronized (this.drainsInProgressLock) {
      inProgress = this.numDrainsInProgress > 0;
    }
    return inProgress;
  }

  /**
   * Attempts to acquire the drain lock. Called from CacheClientNotifier when attempting to restart
   * a paused proxy. Locking the drain lock requires that no drains are in progress when the lock
   * is acquired.
   *
   * @return true if the drain lock was acquired by this call; false if a drain is in progress or
   *         the drain lock is already held
   */
  public boolean lockDrain() {
    synchronized (drainsInProgressLock) {
      if (drainInProgress()) {
        return false;
      }
      synchronized (drainLock) {
        if (testHook != null) {
          testHook.doTestHook("PRE_ACQUIRE_DRAIN_LOCK_UNDER_SYNC");
        }
        // prevent multiple lockings of drain lock
        if (drainLocked) {
          return false;
        }
        drainLocked = true;
        return true;
      }
    }
  }

  /**
   * Releases the drain lock. Called from CacheClientNotifier when the restart of the proxy has
   * completed.
   */
  public void unlockDrain() {
    if (testHook != null) {
      testHook.doTestHook("PRE_RELEASE_DRAIN_LOCK");
    }
    synchronized (this.drainLock) {
      this.drainLocked = false;
    }
  }

  /**
   * Closes the named client CQ and drains its events from the message dispatcher. The CQ is only
   * closed if the proxy is paused and not connected, and no one is attempting to restart the
   * proxy.
   *
   * @param clientCQName the client's name for the CQ to close
   * @return true whenever the method returns normally
   * @throws CqException if the drain lock is held, if the proxy is not both paused and
   *         disconnected, or if the CQ cannot be found
   */
  public boolean closeClientCq(String clientCQName) throws CqException {
    if (testHook != null) {
      testHook.doTestHook("PRE_DRAIN_IN_PROGRESS");
    }
    synchronized (drainsInProgressLock) {
      numDrainsInProgress++;
    }
    if (testHook != null) {
      testHook.doTestHook("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK");
    }
    try {
      // If the drain lock was acquired, the other thread did so before we could bump up
      // the numDrainsInProgress. That means we need to stop.
      if (drainLocked) {
        // someone is trying to restart a paused proxy
        final String message = String.format(
            "CacheClientProxy: Could not drain cq %s due to client proxy id %s reconnecting.",
            clientCQName, proxyID.getDurableId());
        logger.info(message);
        throw new CqException(message);
      }

      // isConnected is to protect against the case where a durable client has reconnected
      // but has not yet sent a ready for events message
      // we can probably remove the isPaused check
      if (!isPaused() || isConnected()) {
        final String message = String.format(
            "CacheClientProxy: Could not drain cq %s because client proxy id %s is connected.",
            clientCQName, proxyID.getDurableId());
        logger.info(message);
        throw new CqException(message);
      }

      final CqService cqService = getCache().getCqService();
      if (cqService == null) {
        return true;
      }

      final InternalCqQuery cqToClose =
          cqService.getCq(cqService.constructServerCqName(clientCQName, proxyID));
      if (cqToClose == null) {
        final String message =
            String.format("CQ Not found, Failed to close the specified CQ %s", clientCQName);
        logger.info(message);
        throw new CqException(message);
      }

      // close and drain
      cqService.closeCq(clientCQName, proxyID);
      _messageDispatcher.drainClientCqEvents(proxyID, cqToClose);
      return true;
    } finally {
      synchronized (drainsInProgressLock) {
        numDrainsInProgress--;
      }
      if (testHook != null) {
        testHook.doTestHook("DRAIN_COMPLETE");
      }
    }
  }


  /**
   * Returns whether the proxy is alive. It is alive if it has a message dispatcher and that
   * dispatcher is not stopped.
   *
   * @return whether the proxy is alive
   */
  protected boolean isAlive() {
    return _messageDispatcher != null && !_messageDispatcher.isStopped();
  }

  /**
   * Returns whether the proxy is paused. This only applies to durable clients.
   *
   * @return the current value of the paused flag
   *
   * @since GemFire 5.5
   */
  public boolean isPaused() {
    return _isPaused;
  }

  /**
   * Sets the paused flag directly.
   *
   * @param isPaused the new value of the paused flag
   */
  protected void setPaused(boolean isPaused) {
    _isPaused = isPaused;
  }

  /**
   * Closes the proxy. Equivalent to <code>close(true, false)</code>: the message queue is checked
   * for unprocessed messages and the client is treated as not having stopped normally.
   *
   * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
   */
  protected void close() {
    final boolean checkQueue = true;
    final boolean stoppedNormally = false;
    close(checkQueue, stoppedNormally);
  }

  /**
   * Set to true once this proxy starts being closed. Remains true for the rest of its existence.
   */
  private final AtomicBoolean closing = new AtomicBoolean(false);

  /**
   * Close the <code>CacheClientProxy</code>. A durable client's proxy is paused instead of
   * terminated when the client did not stop normally, or when it stopped normally but is
   * configured to be kept alive.
   *
   * @param checkQueue Whether to check the message queue and process any contained messages (up
   *        to MAXIMUM_SHUTDOWN_PEEKS).
   * @param stoppedNormally Whether client stopped normally
   *
   * @return whether to keep this <code>CacheClientProxy</code>; true exactly when the proxy was
   *         paused rather than terminated
   * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
   */
  protected boolean close(boolean checkQueue, boolean stoppedNormally) {
    // Durable and either (a) not stopped normally or (b) stopped normally but kept alive.
    final boolean pauseDurable = isDurable() && (!stoppedNormally || getDurableKeepAlive());

    if (pauseDurable) {
      pauseDispatching();
    } else {
      terminateDispatching(checkQueue);
      closeTransientFields();
    }

    connected = false;

    // Close the Authorization callback (if any); a paused proxy keeps its callbacks.
    if (!pauseDurable) {
      try {
        if (postAuthzCallback != null) { // for single user
          postAuthzCallback.close();
          postAuthzCallback = null;
        } else if (clientUserAuths != null) { // for multiple users
          clientUserAuths.cleanup(true);
          clientUserAuths = null;
        }
      } catch (Exception ex) {
        if (_cache.getSecurityLogger().warningEnabled()) {
          _cache.getSecurityLogger().warning(String.format("%s : %s", this, ex));
        }
      }
    }

    // The caller keeps the proxy only when it was paused.
    return pauseDurable;
  }

  /**
   * Pauses message dispatching: sets the paused flag, interrupts the dispatcher when this proxy is
   * primary and was not already paused, closes the transient fields and schedules the durable
   * expiration task. Does nothing when there is no message dispatcher.
   */
  protected void pauseDispatching() {
    if (_messageDispatcher == null) {
      return;
    }

    if (logger.isDebugEnabled()) {
      logger.debug("{}: Pausing processing", this);
    }

    // BUGFIX for BUG#38234
    final boolean previouslyPaused = testAndSetPaused(true);
    if (!previouslyPaused && isPrimary && _messageDispatcher != Thread.currentThread()) {
      // don't interrupt ourself to fix bug 40611
      _messageDispatcher.interrupt();
    }

    try {
      closeTransientFields();
    } finally {
      // make sure the timer is started even if closeTransientFields throws; see bug 40611
      scheduleDurableExpirationTask();
    }
  }

  /**
   * Sets the paused flag to <code>newValue</code> under the dispatcher's paused lock and wakes all
   * threads waiting on that lock, whether or not the flag changed.
   *
   * @param newValue the paused state to set
   * @return the paused state that was in effect before this call
   */
  private boolean testAndSetPaused(boolean newValue) {
    synchronized (_messageDispatcher._pausedLock) {
      final boolean changed = _isPaused != newValue;
      if (changed) {
        _isPaused = newValue;
      }
      _messageDispatcher._pausedLock.notifyAll();
      // When the flag changed, the previous state is the opposite of the current one.
      return changed ? !_isPaused : _isPaused;
    }
  }

  /**
   * Terminates message dispatching for this proxy: clears the interest lists, stops the message
   * dispatcher, destroys its queue and, on the way out, closes the statistics and the transient
   * fields. Does nothing when there is no message dispatcher.
   *
   * <p>
   * Only the first caller to flip the <code>closing</code> flag performs the full shutdown; later
   * callers return early. The statistics and transient fields are still closed for them, because
   * that cleanup sits in the outer <code>finally</code> block. A call made by the dispatcher
   * thread itself takes a shorter path that neither tests nor sets <code>closing</code>.
   *
   * @param checkQueue passed through to the dispatcher's <code>stopDispatching</code>
   */
  protected void terminateDispatching(boolean checkQueue) {
    if (this._messageDispatcher == null) {
      return;
    }

    // Records whether this method already closed the socket, so the outer finally block
    // knows whether it only has to close the remaining transient fields.
    boolean closedSocket = false;
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Terminating processing", this);
      }
      if (this._messageDispatcher == Thread.currentThread()) {
        // I'm not even sure this is possible but if the dispatcher
        // calls us then at least call stopDispatching
        // the old code did this (I'm not even sure it is safe to do).
        // This needs to be done without testing OR setting "closing".
        this._messageDispatcher.stopDispatching(checkQueue);
        this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
        this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();
        // VJR: bug 37487 fix
        destroyRQ();
        return;
      }

      if (!this.closing.compareAndSet(false, true)) {
        // must already be closing so just return
        // this is part of the fix for 37684
        return;
      }
      // Unregister interest in all interests (if necessary)
      this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
      this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();

      // If the message dispatcher is paused, unpause it. The next bit of
      // code will interrupt the waiter.
      if (this.testAndSetPaused(false)) {
        if (logger.isDebugEnabled()) {
          logger.debug("{}: Paused but terminating processing", this);
        }
        // Cancel the expiration task
        cancelDurableExpirationTask(false);
      }

      boolean alreadyDestroyed = false;
      // Remember and clear any pending interrupt; it is restored in the inner finally block.
      boolean gotInterrupt = Thread.interrupted(); // clears the flag
      try {
        // Stop the message dispatcher
        this._messageDispatcher.stopDispatching(checkQueue);

        gotInterrupt |= Thread.interrupted(); // clears the flag

        // to fix bug 37684
        // 1. check to see if dispatcher is still alive
        if (this._messageDispatcher.isAlive()) {
          closedSocket = closeSocket();
          destroyRQ();
          alreadyDestroyed = true;
          this._messageDispatcher.interrupt();
          if (this._messageDispatcher.isAlive()) {
            try {
              // give the dispatcher thread up to one second to exit
              this._messageDispatcher.join(1000);
            } catch (InterruptedException ex) {
              gotInterrupt = true;
            }
            // if it is still alive then warn and move on
            if (this._messageDispatcher.isAlive()) {
              // org.apache.geode.internal.OSProcess.printStacks(org.apache.geode.internal.OSProcess.getId());
              logger.warn("{}: Could not stop message dispatcher thread.",
                  this);
            }
          }
        }
      } finally {
        // Restore the interrupt status that was cleared above.
        if (gotInterrupt) {
          Thread.currentThread().interrupt();
        }
        // Destroy the queue here unless the still-alive path above already did.
        if (!alreadyDestroyed) {
          destroyRQ();
        }
      }
    } finally {
      // Close the statistics
      this._statistics.close(); // fix for bug 40105
      if (closedSocket) {
        closeOtherTransientFields();
      } else {
        closeTransientFields(); // make sure this happens
      }
    }
  }

  /**
   * Hands the socket to the socket closer for asynchronous closing and decrements the current
   * queue connection count. Only the first caller to flip the socket-closed flag does this.
   *
   * @return true if this call closed the socket; false if the socket was already marked closed or
   *         the remote host address is null
   */
  private boolean closeSocket() {
    final String address = _remoteHostAddress;
    // The flag is flipped before the address is examined.
    final boolean firstToClose = _socketClosed.compareAndSet(false, true);
    if (!firstToClose || address == null) {
      return false;
    }
    // Only one thread is expected to close the socket
    this._cacheClientNotifier.getSocketCloser().asyncClose(_socket, address, null);
    getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
    return true;
  }

  /**
   * Closes the socket and, if this call was the one that closed it, the remaining transient
   * fields as well.
   */
  private void closeTransientFields() {
    // The thread who closed the socket is responsible for
    // releaseResourcesForAddress and clearClientInterestList
    if (closeSocket()) {
      closeOtherTransientFields();
    }
  }

  /**
   * Releases the transient state other than the socket itself: the comm buffer, the socket
   * closer's resources for the remote host address (which is then nulled), the non-durable
   * interest list, the non-durable CQs and, if present, the logged-in subject.
   */
  private void closeOtherTransientFields() {
    releaseCommBuffer();

    final String hostAddress = this._remoteHostAddress;
    if (hostAddress != null) {
      this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(hostAddress);
      this._remoteHostAddress = null;
    }

    try {
      this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
    } catch (CancelException ignored) {
      // the cache is shutting down; nothing left to clear
    }

    // clientVersion is intentionally not reset here (bug 40259)
    closeNonDurableCqs();

    // Log the subject out, if there is one
    if (this.subject == null) {
      return;
    }
    this.subject.logout();
  }

  /**
   * Clears the comm buffer field and hands the buffer, if there was one, back through
   * {@code ServerConnection.releaseCommBuffer}.
   */
  private void releaseCommBuffer() {
    final ByteBuffer buffer = this._commBuffer;
    if (buffer == null) {
      return;
    }
    this._commBuffer = null;
    ServerConnection.releaseCommBuffer(buffer);
  }

  /**
   * Asks the cache's CQ service, when one exists, to close the non-durable CQs of this proxy's
   * client. A {@code CqException} is logged as a warning and not propagated.
   */
  private void closeNonDurableCqs() {
    final CqService service = getCache().getCqService();
    if (service == null) {
      return;
    }
    try {
      service.closeNonDurableClientCqs(getProxyID());
    } catch (CqException e) {
      logger.warn("CqException while closing non durable Cqs. {}", e.getLocalizedMessage());
    }
  }

  /**
   * Destroys the message dispatcher's {@code HARegionQueue}, if a dispatcher exists.
   * {@code RegionDestroyedException} and {@code CancelException} are ignored; any other exception
   * is logged as a warning.
   */
  private void destroyRQ() {
    if (this._messageDispatcher != null) {
      try {
        // The queue's own destroy() is used because, per the original author's note, it is
        // modified in HARegion so as not to distribute, whereas for normal regions even
        // localDestroyRegion propagates.
        final HARegionQueue queue = this._messageDispatcher._messageQueue;
        queue.destroy();
      } catch (RegionDestroyedException alreadyDestroyed) {
        // deliberately not rethrown
      } catch (CancelException cancelled) {
        // deliberately not rethrown
      } catch (Exception e) {
        final String text = String.format(
            "%s: Exception in closing the underlying HARegion of the HARegionQueue", this);
        logger.warn(text, e);
      }
    }
  }

  /**
   * Registers interest in the keys matching a regular expression, with values delivered to the
   * client (delegates to the four-argument overload with {@code receiveValues == true}).
   */
  @Override
  public void registerInterestRegex(String regionName, String regex, boolean isDurable) {
    final boolean receiveValues = true;
    registerInterestRegex(regionName, regex, isDurable, receiveValues);
  }

  /**
   * Registers interest in the keys matching a regular expression and notifies the secondaries and
   * the client of the change.
   *
   * @throws IllegalStateException if this proxy is not the primary for the client
   */
  @Override
  public void registerInterestRegex(String regionName, String regex, boolean isDurable,
      boolean receiveValues) {
    if (!this.isPrimary) {
      throw new IllegalStateException(
          "This process is not the primary server for the given client");
    }
    // Tell all secondaries and the client about the change in interest
    notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE, isDurable,
        receiveValues, InterestType.REGULAR_EXPRESSION);
  }

  /**
   * Registers interest in a key with values delivered to the client (delegates to the
   * five-argument overload with {@code receiveValues == true}).
   */
  @Override
  public void registerInterest(String regionName, Object keyOfInterest, InterestResultPolicy policy,
      boolean isDurable) {
    final boolean receiveValues = true;
    registerInterest(regionName, keyOfInterest, policy, isDurable, receiveValues);
  }

  /**
   * Registers interest in a key, a list of keys, or (for the string {@code "ALL_KEYS"}) every key
   * of the region.
   *
   * <p>
   * {@code "ALL_KEYS"} is translated into a regex registration for {@code ".*"}. For a single
   * (non-list) key registered with {@code KEYS_VALUES}, the key's initial value is also enqueued
   * for the client.
   *
   * @throws IllegalStateException if this proxy is not the primary for the client
   */
  @Override
  public void registerInterest(String regionName, Object keyOfInterest, InterestResultPolicy policy,
      boolean isDurable, boolean receiveValues) {
    if ("ALL_KEYS".equals(keyOfInterest)) {
      registerInterestRegex(regionName, ".*", isDurable, receiveValues);
      return;
    }

    if (!this.isPrimary) {
      throw new IllegalStateException(
          "This process is not the primary server for the given client");
    }

    // Notify all secondaries and client of change in interest
    notifySecondariesAndClient(regionName, keyOfInterest, policy, isDurable, receiveValues,
        InterestType.KEY);

    // Only a single key (not a list) gets its initial value enqueued, and only when requested
    final boolean singleKey = !(keyOfInterest instanceof List);
    if (singleKey && policy == InterestResultPolicy.KEYS_VALUES) {
      enqueueInitialValue(null, regionName, keyOfInterest);
    }
  }

  /**
   * Builds a REGISTER interest message and, in this order, delivers it to the secondary proxies,
   * applies the registration to this proxy's interest lists, and enqueues the message for the
   * client.
   */
  private void notifySecondariesAndClient(String regionName, Object keyOfInterest,
      InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
    final boolean sendUpdatesAsInvalidates = !receiveValues;

    final EventID eventId = new EventID(this._cache.getDistributedSystem());
    final ClientInterestMessageImpl registration =
        new ClientInterestMessageImpl(eventId, regionName, keyOfInterest, interestType,
            policy.getOrdinal(), isDurable, sendUpdatesAsInvalidates,
            ClientInterestMessageImpl.REGISTER);

    // Secondaries first
    notifySecondariesOfInterestChange(registration);

    // Then the local interest lists
    if (keyOfInterest instanceof List) {
      registerClientInterestList(regionName, (List) keyOfInterest, isDurable,
          sendUpdatesAsInvalidates, true);
    } else {
      registerClientInterest(regionName, keyOfInterest, interestType, isDurable,
          sendUpdatesAsInvalidates, true);
    }

    // Finally the client itself
    enqueueInterestRegistrationMessage(registration);
  }

  /**
   * Looks up the current value of {@code keyOfInterest} in the named region and, when it is not
   * null, routes an AFTER_CREATE update message carrying that value to this proxy's client.
   *
   * @param clientInterestMessage the interest message that triggered the lookup, or {@code null};
   *        when non-null the update's event id is derived from the message's event id
   * @param regionName name of the region holding the key
   * @param keyOfInterest the key whose initial value is sent
   */
  private void enqueueInitialValue(ClientInterestMessageImpl clientInterestMessage,
      String regionName, Object keyOfInterest) {
    final Get70 getCommand = (Get70) Get70.getCommand();
    final LocalRegion region = (LocalRegion) this._cache.getRegion(regionName);
    final Get70.Entry entry = getCommand.getValueAndIsObject(region, keyOfInterest, null, null);

    // Nothing to send when there is no initial value
    if (entry.value == null) {
      return;
    }

    final byte valueIsObject = entry.isObject ? (byte) 0x01 : (byte) 0x00;

    byte[] serialized = null;
    if (entry.value instanceof byte[]) {
      serialized = (byte[]) entry.value;
    } else {
      try {
        serialized = CacheServerHelper.serialize(entry.value);
      } catch (IOException e) {
        // the update is still sent below, with a null value
        logger.warn(
            String.format("The following exception occurred while attempting to serialize %s",
                entry.value),
            e);
      }
    }
    final VersionTag versionTag = entry.versionTag;

    // Without a triggering message a brand-new event id is used. With one, the id is based on
    // the message's id (fix for GEM-794) so that the update created here has the same event id
    // as the one created in the primary.
    final EventID eventId = clientInterestMessage == null
        ? new EventID(this._cache.getDistributedSystem())
        : new EventID(clientInterestMessage.getEventId(), 1);

    final ClientUpdateMessage initialValueMessage =
        new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE, region, keyOfInterest,
            serialized, null, valueIsObject, null, this.proxyID, eventId, versionTag);
    CacheClientNotifier.routeSingleClientMessage(initialValueMessage, this.proxyID);
  }

  /**
   * Enqueues an interest (un)registration message on the message dispatcher.
   *
   * <p>
   * Clients older than 7.0.1 do not support a {@code ClientInterestMessageImpl} that carries a
   * list, so for such a client a list key is expanded into one message per list entry.
   */
  private void enqueueInterestRegistrationMessage(ClientInterestMessageImpl message) {
    final boolean expandList = Version.GFE_701.compareTo(this.clientVersion) > 0
        && message.getKeyOfInterest() instanceof List;

    if (!expandList) {
      this._messageDispatcher.enqueueMessage(message);
      return;
    }

    for (Object singleKey : (List<?>) message.getKeyOfInterest()) {
      this._messageDispatcher.enqueueMessage(
          new ClientInterestMessageImpl(getCache().getDistributedSystem(), message, singleKey));
    }
  }

  /**
   * Unregisters interest in the keys matching a regular expression (delegates to the
   * four-argument overload with {@code receiveValues == true}).
   */
  @Override
  public void unregisterInterestRegex(String regionName, String regex, boolean isDurable) {
    final boolean receiveValues = true;
    unregisterInterestRegex(regionName, regex, isDurable, receiveValues);
  }

  /**
   * Unregisters interest in the keys matching a regular expression and notifies the secondaries
   * and the client of the change.
   *
   * @throws IllegalStateException if this proxy is not the primary for the client
   */
  @Override
  public void unregisterInterestRegex(String regionName, String regex, boolean isDurable,
      boolean receiveValues) {
    if (!this.isPrimary) {
      throw new IllegalStateException(
          "This process is not the primary server for the given client");
    }
    notifySecondariesAndClient(regionName, regex, isDurable, receiveValues,
        InterestType.REGULAR_EXPRESSION);
  }

  /**
   * Unregisters interest in a key (delegates to the four-argument overload with
   * {@code receiveValues == true}).
   */
  @Override
  public void unregisterInterest(String regionName, Object keyOfInterest, boolean isDurable) {
    final boolean receiveValues = true;
    unregisterInterest(regionName, keyOfInterest, isDurable, receiveValues);
  }

  /**
   * Unregisters interest in a key or, for the string {@code "ALL_KEYS"}, in the regex
   * {@code ".*"}.
   *
   * @throws IllegalStateException if this proxy is not the primary for the client
   */
  @Override
  public void unregisterInterest(String regionName, Object keyOfInterest, boolean isDurable,
      boolean receiveValues) {
    if ("ALL_KEYS".equals(keyOfInterest)) {
      unregisterInterestRegex(regionName, ".*", isDurable, receiveValues);
      return;
    }
    if (!this.isPrimary) {
      throw new IllegalStateException(
          "This process is not the primary server for the given client");
    }
    notifySecondariesAndClient(regionName, keyOfInterest, isDurable, receiveValues,
        InterestType.KEY);
  }

  /**
   * Builds an UNREGISTER interest message and, in this order, delivers it to the secondary
   * proxies, removes the interest from this proxy's interest lists, and enqueues the message for
   * the client.
   */
  private void notifySecondariesAndClient(String regionName, Object keyOfInterest,
      boolean isDurable, boolean receiveValues, int interestType) {
    final EventID eventId = new EventID(this._cache.getDistributedSystem());
    final ClientInterestMessageImpl unregistration =
        new ClientInterestMessageImpl(eventId, regionName, keyOfInterest, interestType, (byte) 0,
            isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER);

    // Secondaries first
    notifySecondariesOfInterestChange(unregistration);

    // Then the local interest lists (explicit unregister, so isClosing is false)
    if (keyOfInterest instanceof List) {
      unregisterClientInterest(regionName, (List) keyOfInterest, false);
    } else {
      unregisterClientInterest(regionName, keyOfInterest, interestType, false);
    }

    // Finally the client itself
    enqueueInterestRegistrationMessage(unregistration);
  }

  /**
   * Hands the interest message to the cache client notifier for delivery on behalf of this
   * proxy's id, after logging a description of the change at debug level.
   */
  protected void notifySecondariesOfInterestChange(ClientInterestMessageImpl message) {
    if (logger.isDebugEnabled()) {
      final String action;
      if (message.isRegister()) {
        action = "register " + (message.getIsDurable() ? "" : "non-") + "durable interest in ";
      } else {
        action = "unregister interest in ";
      }
      logger.debug(this + ": Notifying secondary proxies to " + action + message.getRegionName()
          + "->" + message.getKeyOfInterest() + "->"
          + InterestType.getString(message.getInterestType()));
    }
    this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
  }

  /*
   * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try {
   * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch
   * (RegionDestroyedException e) {
   * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" +
   * keyOfInterest, e); } }
   */

  /**
   * Registers interest in the input region name and key.
   *
   * @param regionName The fully-qualified name of the region in which to register interest
   * @param keyOfInterest The key in which to register interest
   * @param interestType The interest type, passed through to the interest list
   * @param isDurable Selects which interest list (durable or non-durable) receives the entry
   * @param sendUpdatesAsInvalidates Passed through to the interest list
   * @param flushState Whether to flush the region for interest registration afterwards
   */
  protected void registerClientInterest(String regionName, Object keyOfInterest, int interestType,
      boolean isDurable, boolean sendUpdatesAsInvalidates, boolean flushState) {
    final int listIndex = RegisterInterestTracker.getInterestLookupIndex(isDurable, false);
    this.cils[listIndex].registerClientInterest(regionName, keyOfInterest, interestType,
        sendUpdatesAsInvalidates);

    if (flushState) {
      final DistributedMember self = this._cache.getDistributedSystem().getDistributedMember();
      flushForInterestRegistration(regionName, self);
    }

    // The queue is null during initialization
    final HARegionQueue haQueue = getHARegionQueue();
    if (haQueue == null) {
      return;
    }
    haQueue.setHasRegisteredInterest(true);
  }

  /**
   * Flush other regions to the given target. This is usually the member that is registering the
   * interest. During queue creation it is the queue's image provider.
   *
   * <p>
   * Nothing is flushed when the region cannot be found or its scope is not distributed. If the
   * flush is interrupted, the thread's interrupt flag is restored.
   *
   * @param regionName name of the region to flush
   * @param target the member passed to the state flush operation as its target
   */
  public void flushForInterestRegistration(String regionName, DistributedMember target) {
    final Region region = this._cache.getRegion(regionName);
    if (region == null) {
      if (logger.isDebugEnabled()) {
        logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
      }
      return;
    }
    if (!region.getAttributes().getScope().isDistributed()) {
      return;
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Flushing region '{}' for interest registration", regionName);
    }
    final CacheDistributionAdvisee advisee = (CacheDistributionAdvisee) region;

    final StateFlushOperation flushOperation;
    if (region instanceof PartitionedRegion) {
      // All buckets have to be flushed. SFO should be changed to target buckets belonging to a
      // particular PR, but it doesn't have that option right now.
      flushOperation = new StateFlushOperation(
          this._cache.getInternalDistributedSystem().getDistributionManager());
    } else {
      flushOperation = new StateFlushOperation((DistributedRegion) region);
    }

    try {
      // bug 41681 - any member that may have a cache operation in progress must be flushed so
      // that the changes are received there before this method returns
      final InitialImageAdvice advice =
          advisee.getCacheDistributionAdvisor().adviseInitialImage(null);
      final HashSet recipients = new HashSet(advice.getReplicates());
      recipients.addAll(advice.getUninitialized());
      recipients.addAll(advice.getEmpties());
      recipients.addAll(advice.getPreloaded());
      recipients.addAll(advice.getOthers());
      flushOperation.flush(recipients, target, OperationExecutors.HIGH_PRIORITY_EXECUTOR, true);
    } catch (InterruptedException interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Unregisters interest in the input region name and key.
   *
   * <p>
   * Non-durable interest is always removed. Durable interest is removed on an explicit unregister,
   * or on close when durable keep-alive is off.
   *
   * @param regionName The fully-qualified name of the region in which to unregister interest
   * @param keyOfInterest The key in which to unregister interest
   * @param interestType The interest type, passed through to the interest lists
   * @param isClosing Whether the caller is closing
   */
  protected void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType,
      boolean isClosing) {
    final boolean explicitUnregister = !isClosing;
    final boolean dropDurable = explicitUnregister || !getDurableKeepAlive();
    if (dropDurable) {
      final ClientInterestList durableList =
          this.cils[RegisterInterestTracker.durableInterestListIndex];
      durableList.unregisterClientInterest(regionName, keyOfInterest, interestType);
    }

    final ClientInterestList nonDurableList = this.cils[RegisterInterestTracker.interestListIndex];
    nonDurableList.unregisterClientInterest(regionName, keyOfInterest, interestType);
  }

  /**
   * Registers interest in the input region name and list of keys.
   *
   * <p>
   * The flush and the has-registered-interest marking happen only when an HA region queue is
   * available.
   *
   * @param regionName The fully-qualified name of the region in which to register interest
   * @param keysOfInterest The list of keys in which to register interest
   * @param isDurable Selects which interest list (durable or non-durable) receives the entries
   * @param sendUpdatesAsInvalidates Passed through to the interest list
   * @param flushState Whether to flush the region for interest registration afterwards
   */
  protected void registerClientInterestList(String regionName, List keysOfInterest,
      boolean isDurable, boolean sendUpdatesAsInvalidates, boolean flushState) {
    // Only two interest lists exist, mapping the non-durable and durable identifiers to their
    // interest settings, so the lookup always passes false for sendUpdatesAsInvalidates.
    final int listIndex = RegisterInterestTracker.getInterestLookupIndex(isDurable, false);
    this.cils[listIndex].registerClientInterestList(regionName, keysOfInterest,
        sendUpdatesAsInvalidates);

    if (getHARegionQueue() == null) {
      return;
    }
    if (flushState) {
      final DistributedMember self = this._cache.getDistributedSystem().getDistributedMember();
      flushForInterestRegistration(regionName, self);
    }
    getHARegionQueue().setHasRegisteredInterest(true);
  }

  /**
   * Unregisters interest in the input region name and list of keys.
   *
   * <p>
   * Non-durable interest is always removed. Durable interest is removed on an explicit unregister,
   * or on close when durable keep-alive is off.
   *
   * @param regionName The fully-qualified name of the region in which to unregister interest
   * @param keysOfInterest The list of keys in which to unregister interest
   * @param isClosing Whether the caller is closing
   */
  protected void unregisterClientInterest(String regionName, List keysOfInterest,
      boolean isClosing) {
    final boolean explicitUnregister = !isClosing;
    final boolean dropDurable = explicitUnregister || !getDurableKeepAlive();
    if (dropDurable) {
      final ClientInterestList durableList =
          this.cils[RegisterInterestTracker.durableInterestListIndex];
      durableList.unregisterClientInterestList(regionName, keysOfInterest);
    }

    final ClientInterestList nonDurableList = this.cils[RegisterInterestTracker.interestListIndex];
    nonDurableList.unregisterClientInterestList(regionName, keysOfInterest);
  }


  /**
   * Sent by the cache client notifier when there is an interest registration change.
   *
   * <p>
   * Applies the registration or unregistration to this proxy's interest lists, enqueues the
   * interest message for the client and, for a KEYS_VALUES registration of a single (non-list)
   * key, also enqueues the key's initial value.
   */
  protected void processInterestMessage(ClientInterestMessageImpl message) {
    final int interestType = message.getInterestType();
    final String regionName = message.getRegionName();
    final Object key = message.getKeyOfInterest();
    final boolean keyIsList = key instanceof List;

    if (!message.isRegister()) {
      // Unregister interest in this region->key
      if (keyIsList) {
        unregisterClientInterest(regionName, (List) key, false);
      } else {
        unregisterClientInterest(regionName, key, interestType, false);
      }

      if (logger.isDebugEnabled()) {
        logger.debug(this + ": Interest listener unregistered interest in "
            + message.getRegionName() + "->" + message.getKeyOfInterest() + "->"
            + InterestType.getString(message.getInterestType()));
      }
    } else {
      // Register interest in this region->key
      if (keyIsList) {
        registerClientInterestList(regionName, (List) key, message.getIsDurable(),
            message.getForUpdatesAsInvalidates(), true);
      } else {
        registerClientInterest(regionName, key, interestType, message.getIsDurable(),
            message.getForUpdatesAsInvalidates(), true);
      }

      if (logger.isDebugEnabled()) {
        logger.debug(this + ": Interest listener registered "
            + (message.getIsDurable() ? "" : "non-") + "durable interest in "
            + message.getRegionName() + "->" + message.getKeyOfInterest() + "->"
            + InterestType.getString(message.getInterestType()));
      }
    }

    // Enqueue the interest message in this secondary proxy (fix for bug #52088)
    enqueueInterestRegistrationMessage(message);

    // Enqueue the initial value if the message is a register on a key that is not a list (fix for
    // bug #52088)
    if (message.isRegister() && message.getInterestType() == InterestType.KEY && !keyIsList
        && InterestResultPolicy
            .fromOrdinal(message.getInterestResultPolicy()) == InterestResultPolicy.KEYS_VALUES) {
      enqueueInitialValue(message, regionName, key);
    }
  }

  /**
   * Runs the post-process authorization check on a client update message before it is added to
   * the queue for dispatching.
   *
   * <p>
   * Three situations are distinguished:
   * <ul>
   * <li>Authentication is required, this proxy has no post-authorization callback of its own, and
   * the acceptor reports a post-authorization callback (multiuser mode): each CQ attached to the
   * message for this proxy is authorized individually and unauthorized CQs are removed from the
   * message. The message is rejected if no operation context can be built, or if no CQs remain
   * for this proxy afterwards.</li>
   * <li>This proxy has its own post-authorization callback: the operation is authorized once
   * through that callback.</li>
   * <li>Otherwise the message passes.</li>
   * </ul>
   *
   * @param clientMessage the message about to be queued
   * @return {@code true} if the message may be queued, {@code false} if it must be dropped
   */
  private boolean postDeliverAuthCheckPassed(ClientUpdateMessage clientMessage) {
    if (AcceptorImpl.isAuthenticationRequired() && this.postAuthzCallback == null
        && AcceptorImpl.isPostAuthzCallbackPresent()) {
      // security is on and callback is null: it means multiuser mode.
      ClientUpdateMessageImpl cumi = (ClientUpdateMessageImpl) clientMessage;

      CqNameToOp clientCq = cumi.getClientCq(this.proxyID);

      if (clientCq != null && !clientCq.isEmpty()) {
        if (logger.isDebugEnabled()) {
          logger.debug("CCP clientCq size before processing auth {}", clientCq.size());
        }
        String[] regionNameHolder = new String[1];
        OperationContext opctxt = getOperationContext(clientMessage, regionNameHolder);
        if (opctxt == null) {
          logger.warn(
              "{}: Not Adding message to queue: {} because the operation context object could not be obtained for this client message.",
              this, clientMessage);
          return false;
        }

        String[] cqNames = clientCq.getNames();
        if (logger.isDebugEnabled()) {
          logger.debug("CCP clientCq names array size {}", cqNames.length);
        }
        for (String cqName : cqNames) {
          try {
            if (logger.isDebugEnabled()) {
              logger.debug("CCP clientCq name {}", cqName);
            }
            boolean isAuthorized = false;

            if (this.proxyID.isDurable() && this.getDurableKeepAlive() && this._isPaused) {
              // need to take lock as we may be reinitializing proxy cache
              synchronized (this.clientUserAuthsLock) {
                AuthorizeRequestPP postAuthCallback =
                    this.clientUserAuths.getUserAuthAttributes(cqName).getPostAuthzRequest();
                if (logger.isDebugEnabled() && postAuthCallback == null) {
                  logger.debug("CCP clientCq post callback is null");
                }
                if (postAuthCallback != null && postAuthCallback.getPostAuthzCallback()
                    .authorizeOperation(regionNameHolder[0], opctxt)) {
                  isAuthorized = true;
                }
              }
            } else {
              UserAuthAttributes userAuthAttributes =
                  this.clientUserAuths.getUserAuthAttributes(cqName);

              AuthorizeRequestPP postAuthCallback = userAuthAttributes.getPostAuthzRequest();
              if (postAuthCallback == null && logger.isDebugEnabled()) {
                logger.debug("CCP clientCq post callback is null");
              }
              if (postAuthCallback != null && postAuthCallback.getPostAuthzCallback()
                  .authorizeOperation(regionNameHolder[0], opctxt)) {
                isAuthorized = true;
              }
            }

            if (!isAuthorized) {
              logger.warn("{}: Not Adding CQ message to queue {} because authorization failed.",
                  this, clientMessage);
              clientCq.delete(cqName);
            }
          } catch (Exception ex) {
            // Best effort: a failure while authorizing one CQ must not abort the remaining CQs.
            // NOTE(review): when an exception is thrown the CQ is NOT removed from the message,
            // so it stays deliverable - confirm that this is the intended outcome.
            if (logger.isDebugEnabled()) {
              logger.debug(
                  String.format("%s: Exception while authorizing CQ %s; CQ left unchanged", this,
                      cqName),
                  ex);
            }
          }
          if (logger.isDebugEnabled()) {
            logger.debug("CCP clientCq size after processing auth {}", clientCq.size());
          }
        }
        // again need to check as there may be no CQ available
        if (!clientMessage.hasCqs(this.proxyID)) {
          this._statistics.incMessagesNotQueuedNotInterested();
          if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
            // Both placeholders need an argument: the proxy first, then the message.
            logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
                "{}: Not adding message to queue. It is not interested in this region and key: {}",
                this, clientMessage);
          }
          return false;
        }
      }
    } else if (this.postAuthzCallback != null) {
      String[] regionNameHolder = new String[1];
      boolean isAuthorize = false;
      OperationContext opctxt = getOperationContext(clientMessage, regionNameHolder);
      if (opctxt == null) {
        logger.warn(
            "{}: Not Adding message to queue: {} because the operation context object could not be obtained for this client message.",
            this, clientMessage);
        return false;
      }
      if (logger.isTraceEnabled()) {
        logger.trace("{}: Invoking authorizeOperation for message: {}", this, clientMessage);
      }

      if (this.proxyID.isDurable() && this.getDurableKeepAlive() && this._isPaused) {
        // Same lock as the per-CQ branch takes in the paused durable case
        synchronized (this.clientUserAuthsLock) {
          isAuthorize = this.postAuthzCallback.authorizeOperation(regionNameHolder[0], opctxt);
        }
      } else {
        isAuthorize = this.postAuthzCallback.authorizeOperation(regionNameHolder[0], opctxt);
      }
      if (!isAuthorize) {
        logger.warn("{}: Not Adding message to queue {} because authorization failed.",
            this, clientMessage);
        return false;
      }
    }

    return true;
  }

  /**
   * Delivers the message to the client representing this client proxy.
   *
   * <p>
   * The proxy's subject is bound to the current thread for the duration of the call. The message
   * is post-processed when the security service requires it, checked for post-delivery
   * authorization unless the message needs no check, and then enqueued on the message dispatcher.
   * A message that fails authorization, or that arrives while there is no dispatcher, is counted
   * as failed-to-queue and dropped.
   *
   * <p>
   * The thread state returned by {@code bindSubject} is cleared in a {@code finally} block, so the
   * subject does not stay bound to the calling thread when post-processing, authorization or
   * enqueueing throws.
   *
   * @param conflatable either an {@code HAEventWrapper} around a client update message or the
   *        client update message itself
   */
  protected void deliverMessage(Conflatable conflatable) {
    ThreadState state = this.securityService.bindSubject(this.subject);
    try {
      ClientUpdateMessage clientMessage;
      if (conflatable instanceof HAEventWrapper) {
        clientMessage = ((HAEventWrapper) conflatable).getClientUpdateMessage();
      } else {
        clientMessage = (ClientUpdateMessage) conflatable;
      }

      this._statistics.incMessagesReceived();

      // post process
      if (this.securityService.needPostProcess()) {
        Object oldValue = clientMessage.getValue();
        Object newValue = securityService.postProcess(clientMessage.getRegionName(),
            clientMessage.getKeyOfInterest(), oldValue, clientMessage.valueIsObject());
        clientMessage.setLatestValue(newValue);
      }

      if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
        if (this._messageDispatcher != null) {
          // The original conflatable (possibly the wrapper) is what gets enqueued
          this._messageDispatcher.enqueueMessage(conflatable);
        } else {
          this._statistics.incMessagesFailedQueued();

          if (logger.isDebugEnabled()) {
            logger.debug(
                "Message was not added to the queue. Message dispatcher was null for proxy: " + this
                    + ". Event ID hash code: " + conflatable.hashCode() + "; System ID hash code: "
                    + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
                        .toString());
          }
        }
      } else {
        if (logger.isDebugEnabled()) {
          logger.debug(
              "Message was not added to the queue. Event ID hash code: " + conflatable.hashCode()
                  + "; System ID hash code: "
                  + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
                      .toString());
        }

        this._statistics.incMessagesFailedQueued();
      }
    } finally {
      // Always unbind the subject from this thread, even on failure
      if (state != null) {
        state.clear();
      }
    }
  }

  /**
   * Sends the message straight to the client through the message dispatcher, bypassing the queue,
   * provided a dispatcher exists and the socket is present and not closed. Otherwise only the ping
   * counter is reset.
   */
  protected void sendMessageDirectly(ClientMessage message) {
    if (logger.isDebugEnabled()) {
      logger.debug("About to send message directly to {}", this);
    }

    final boolean connectionUsable =
        this._messageDispatcher != null && this._socket != null && !this._socket.isClosed();

    if (!connectionUsable) {
      // No usable connection: just reset the ping counter
      resetPingCounter();
      if (logger.isDebugEnabled()) {
        logger.debug("Skipped sending message directly to {}", this);
      }
      return;
    }

    // The socket is open, so the message goes to it
    this._messageDispatcher.sendMessageDirectly(message);
    if (logger.isDebugEnabled()) {
      logger.debug("Sent message directly to {}", this);
    }
  }

  /**
   * Builds the authorization {@code OperationContext} matching the kind of update carried by the
   * message.
   *
   * <p>
   * {@code regionNameHolder[0]} is set to the message's region name, except for create, update
   * and destroy messages on the dynamic-region list region, where it is set to the message's key
   * of interest instead.
   *
   * @param cmsg the client message; it is cast to {@code ClientUpdateMessageImpl}
   * @param regionNameHolder one-element array that receives the region name to authorize against
   * @return the operation context, or {@code null} when the message is none of create, update,
   *         destroy, destroy-region, invalidate or clear-region
   */
  private OperationContext getOperationContext(ClientMessage cmsg, String[] regionNameHolder) {
    final ClientUpdateMessageImpl update = (ClientUpdateMessageImpl) cmsg;
    // TODO SW: Special handling for DynamicRegions; this should be reworked
    // when DynamicRegion API is deprecated
    final String regionName = update.getRegionName();
    regionNameHolder[0] = regionName;

    if (update.isCreate()) {
      if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
        regionNameHolder[0] = (String) update.getKeyOfInterest();
        return new RegionCreateOperationContext(true);
      }
      final PutOperationContext create = new PutOperationContext(update.getKeyOfInterest(),
          update.getValue(), update.valueIsObject(), PutOperationContext.CREATE, true);
      create.setCallbackArg(update.getCallbackArgument());
      return create;
    }

    if (update.isUpdate()) {
      if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
        regionNameHolder[0] = (String) update.getKeyOfInterest();
        return new RegionCreateOperationContext(true);
      }
      final PutOperationContext put = new PutOperationContext(update.getKeyOfInterest(),
          update.getValue(), update.valueIsObject(), PutOperationContext.UPDATE, true);
      put.setCallbackArg(update.getCallbackArgument());
      return put;
    }

    if (update.isDestroy()) {
      if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
        regionNameHolder[0] = (String) update.getKeyOfInterest();
        return new RegionDestroyOperationContext(true);
      }
      final DestroyOperationContext destroy =
          new DestroyOperationContext(update.getKeyOfInterest(), true);
      destroy.setCallbackArg(update.getCallbackArgument());
      return destroy;
    }

    if (update.isDestroyRegion()) {
      return new RegionDestroyOperationContext(true);
    }

    if (update.isInvalidate()) {
      final InvalidateOperationContext invalidate =
          new InvalidateOperationContext(update.getKeyOfInterest(), true);
      invalidate.setCallbackArg(update.getCallbackArgument());
      return invalidate;
    }

    if (update.isClearRegion()) {
      final RegionClearOperationContext clear = new RegionClearOperationContext(true);
      clear.setCallbackArg(update.getCallbackArgument());
      return clear;
    }

    return null;
  }

  /**
   * Initializes the message dispatcher thread. The <code>MessageDispatcher</code> processes the
   * message queue.
   *
   * <p>
   * If creating the dispatcher fails, this proxy's statistics are closed before the exception is
   * rethrown.
   */
  public void initializeMessageDispatcher() throws CacheException {
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Initializing message dispatcher with capacity of {} entries", this,
            _maximumMessageCount);
      }
      String dispatcherName =
          "Client Message Dispatcher for " + getProxyID().getDistributedMember();
      if (isDurable()) {
        // durable clients carry their durable id in the thread name
        dispatcherName += " (" + getDurableId() + ")";
      }
      this._messageDispatcher = createMessageDispatcher(dispatcherName);
    } catch (final Exception e) {
      this._statistics.close();
      throw e;
    }
  }

  /**
   * Creates the message dispatcher for this proxy.
   *
   * @param name the name passed to the {@code MessageDispatcher} constructor
   * @return a new dispatcher built from this proxy, the name and the statistics clock
   */
  MessageDispatcher createMessageDispatcher(String name) {
    final MessageDispatcher dispatcher = new MessageDispatcher(this, name, statisticsClock);
    return dispatcher;
  }

  /**
   * Starts the message dispatcher, or resumes it if it is paused. Does nothing unless this proxy
   * is the primary.
   *
   * @param processedMarker when {@code false}, a marker message is enqueued on the dispatcher
   *        first
   */
  protected void startOrResumeMessageDispatcher(boolean processedMarker) {
    // Only the primary starts or resumes its dispatcher
    if (!this.isPrimary) {
      return;
    }

    // Add the marker to the queue
    if (!processedMarker) {
      final EventID markerId = new EventID(this._cache.getDistributedSystem());
      this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(markerId));
    }

    // Set the message queue to primary.
    this._messageDispatcher._messageQueue.setPrimary(true);

    // Start or resume the dispatcher while holding its paused lock
    synchronized (this._messageDispatcher._pausedLock) {
      if (isPaused()) {
        // Paused: un-pause, then either start a stopped dispatcher or resume a running one
        setPaused(false);
        if (this._messageDispatcher.isStopped()) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}: Starting dispatcher", this);
          }
          this._messageDispatcher.start();
        } else {
          // ARB: Initialize transient fields.
          this._messageDispatcher.initializeTransients();
          if (logger.isDebugEnabled()) {
            logger.debug("{}: Resuming dispatcher", this);
          }
          this._messageDispatcher.resumeDispatching();
        }
      } else if (!this._messageDispatcher.isAlive()) {
        // Not paused and not running yet
        if (logger.isDebugEnabled()) {
          logger.debug("{}: Starting dispatcher", this);
        }
        this._messageDispatcher.start();
      }
    }
  }

  /**
   * Returns whether the client represented by this <code>CacheClientProxy</code> has registered
   * interest in anything.
   *
   * @return <code>true</code> if the interest list at
   *         <code>RegisterInterestTracker.interestListIndex</code> or the one at
   *         <code>RegisterInterestTracker.durableInterestListIndex</code> reports interest
   */
  protected boolean hasRegisteredInterested() {
    if (cils[RegisterInterestTracker.interestListIndex].hasInterest()) {
      return true;
    }
    return cils[RegisterInterestTracker.durableInterestListIndex].hasInterest();
  }

  /**
   * Returns a short string representation of the proxy: its id, the socket's port, the primary
   * flag and the client version.
   */
  @Override
  public String toString() {
    return "CacheClientProxy[" + this.proxyID + "; port=" + this._socket.getPort()
        + "; primary=" + isPrimary + "; version=" + clientVersion + "]";
  }

  /**
   * Returns a detailed description of this proxy's state. When a message dispatcher exists and
   * the proxy is alive, the dispatcher's stack trace is appended.
   *
   * @return the state description
   */
  public String getState() {
    StringBuilder state = new StringBuilder("CacheClientProxy[");
    state.append(this.proxyID);
    state.append("; port=").append(this._socket.getPort());
    state.append("; primary=").append(isPrimary);
    state.append("; version=").append(clientVersion);
    state.append("; paused=").append(isPaused());
    state.append("; alive=").append(isAlive());
    state.append("; connected=").append(isConnected());
    state.append("; isMarkedForRemoval=").append(isMarkedForRemoval);
    state.append("]");

    if (_messageDispatcher != null && isAlive()) {
      state.append(LogWriterImpl.getStackTrace(_messageDispatcher));
    }

    return state.toString();
  }

  /**
   * Returns the current value of this proxy's primary flag.
   */
  @Override
  public boolean isPrimary() {
    return this.isPrimary;
  }

  /**
   * Returns the current value of this proxy's primary flag.
   */
  protected boolean basicIsPrimary() {
    final boolean primary = this.isPrimary;
    return primary;
  }

  /**
   * Sets this proxy's primary flag. Only the flag is changed here; the message queue is not
   * updated by this method.
   *
   * @param isPrimary the new value of the primary flag
   */
  protected void setPrimary(boolean isPrimary) {
    // The parameter shadows the field, so the qualifier is required.
    this.isPrimary = isPrimary;
  }

  /**
   * Returns this client's HA region queue.
   *
   * @return the <code>HARegionQueue</code> held by the message dispatcher, or <code>null</code>
   *         if there is no message dispatcher
   */
  public HARegionQueue getHARegionQueue() {
    return this._messageDispatcher == null ? null : _messageDispatcher._messageQueue;
  }


  /**
   * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
   *
   * @param socket The socket between the server and the client
   * @param proxyId the membership id passed on to the transient-field initialization
   * @param cache not used by this method
   * @param ip whether this proxy represents the primary
   * @param cc the client conflation setting applied to the message queue
   * @param ver the version passed on to the transient-field initialization
   */
  protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId, Cache cache,
      boolean ip, byte cc, Version ver) {
    // Re-initialize transient fields and count the new queue connection.
    initializeTransientFields(socket, proxyId, ip, cc, ver);
    getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections();

    // Cancel the pending expiration task, logging that the client has reconnected.
    cancelDurableExpirationTask(true);

    // Update the queue's primary flag (this could go from primary to secondary) and its
    // conflation setting.
    final HARegionQueue queue = this._messageDispatcher._messageQueue;
    queue.setPrimary(ip);
    queue.setClientConflation(cc);

    // Reset the _socketClosed AtomicBoolean to false if it is currently true.
    this._socketClosed.compareAndSet(true, false);

    reinitializeClientAuths();
    this.creationDate = new Date();
    if (logger.isDebugEnabled()) {
      logger.debug("{}: Has been reinitialized", this);
    }
  }

  /**
   * Returns whether this proxy's membership id reports the client as durable.
   */
  protected boolean isDurable() {
    final boolean durable = getProxyID().isDurable();
    return durable;
  }

  /**
   * Returns the durable id reported by this proxy's membership id.
   */
  protected String getDurableId() {
    final String durableId = getProxyID().getDurableId();
    return durableId;
  }

  /**
   * Returns the durable timeout reported by this proxy's membership id.
   * NOTE(review): presumably in seconds, since the expiration task multiplies it by 1000 before
   * scheduling; confirm against the membership id.
   */
  protected int getDurableTimeout() {
    final int durableTimeout = getProxyID().getDurableTimeout();
    return durableTimeout;
  }

  /**
   * Returns the value of this proxy's <code>keepalive</code> flag.
   */
  private boolean getDurableKeepAlive() {
    return keepalive;
  }

  /**
   * Returns the HA region name reported by this proxy's membership id.
   */
  protected String getHARegionName() {
    final String haRegionName = getProxyID().getHARegionName();
    return haRegionName;
  }

  /**
   * Returns the region backing the message dispatcher's queue. Unlike
   * <code>getHARegionQueue()</code>, this method does not check for a missing dispatcher.
   */
  public Region getHARegion() {
    final Region haRegion = this._messageDispatcher._messageQueue.getRegion();
    return haRegion;
  }

  /**
   * Returns the client version held by this proxy.
   */
  public Version getVersion() {
    return clientVersion;
  }

  /**
   * Returns the <code>Subject</code> held by this proxy.
   */
  @VisibleForTesting
  protected Subject getSubject() {
    return subject;
  }

  /**
   * Schedules a timer task that terminates this proxy after the durable timeout. The task is
   * scheduled only if no expiration task is currently registered in
   * <code>_durableExpirationTask</code>.
   */
  protected void scheduleDurableExpirationTask() {
    final SystemTimer.SystemTimerTask expirationTask = new SystemTimer.SystemTimerTask() {
      @Override
      public void run2() {
        // Unregister this task if it is still the registered one.
        _durableExpirationTask.compareAndSet(this, null);
        logger.warn("{} : The expiration task has fired, so this proxy is being terminated.",
            CacheClientProxy.this);
        // Remove the proxy from the CacheClientNotifier's registry
        getCacheClientNotifier().removeClientProxy(CacheClientProxy.this);
        getCacheClientNotifier().durableClientTimedOut(CacheClientProxy.this.proxyID);

        // Close the proxy
        terminateDispatching(false);
        _cacheClientNotifier.statistics.incQueueDroppedCount();

        /*
         * Setting the expiration task to null again and cancelling existing one, if any. See
         * #50894.
         *
         * The message dispatcher may again set the expiry task in below path:
         * org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.scheduleDurableExpirationTask(CacheClientProxy.java:2020)
         * org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.pauseDispatching(CacheClientProxy.java:924)
         * org.apache.geode.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.pauseOrUnregisterProxy(CacheClientProxy.java:2813)
         * org.apache.geode.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.run(CacheClientProxy.java:2692)
         *
         * This is because message dispatcher may get an IOException with "Proxy closing due to
         * socket being closed locally" during/after terminateDispatching(false) above.
         */
        Object rescheduled = _durableExpirationTask.getAndSet(null);
        if (rescheduled != null && ((SystemTimerTask) rescheduled).cancel()) {
          _cache.purgeCCPTimer();
        }
      }

    };
    // Another expiration task is already registered; leave it in place.
    if (!this._durableExpirationTask.compareAndSet(null, expirationTask)) {
      return;
    }
    _cache.getCCPTimer().schedule(expirationTask, getDurableTimeout() * 1000L);
  }

  /**
   * Clears the registered durable expiration task, if any, and cancels it. The timer is purged
   * when the cancellation succeeds.
   *
   * @param logMessage whether to log that the task is being cancelled because the client has
   *        reconnected
   */
  protected void cancelDurableExpirationTask(boolean logMessage) {
    final SystemTimer.SystemTimerTask pending =
        (SystemTimerTask) _durableExpirationTask.getAndSet(null);
    if (pending == null) {
      return;
    }
    if (logMessage) {
      logger.info("{}: Cancelling expiration task since the client has reconnected.",
          this);
    }
    if (pending.cancel()) {
      _cache.purgeCCPTimer();
    }
  }

  /**
   * Class <code>ClientInterestList</code> provides a convenient interface for manipulating client
   * interest information. It tracks the names of the regions in which the client identified by
   * <code>id</code> has interest and forwards registrations to each region's
   * <code>FilterProfile</code>.
   */
  protected static class ClientInterestList {

    /** The proxy that owns this interest list */
    final CacheClientProxy ccp;

    /** The id under which interest is registered in the filter profiles */
    final Object id;

    /**
     * An object used for synchronizing the interest lists
     */
    private final Object interestListLock = new Object();

    /**
     * Regions that this client is interested in
     */
    protected final Set<String> regions = new HashSet<String>();

    /**
     * Constructor.
     *
     * @param ccp the owning proxy
     * @param interestID the id under which interest is registered
     */
    protected ClientInterestList(CacheClientProxy ccp, Object interestID) {
      this.ccp = ccp;
      this.id = interestID;
    }

    /**
     * Registers interest in the input region name and key
     *
     * @param regionName the name of the region in which to register interest
     * @param keyOfInterest the key in which to register interest
     * @param interestType the interest type, forwarded to the filter profile
     * @param sendUpdatesAsInvalidates forwarded to the filter profile
     * @throws RegionDestroyedException if the region cannot be found
     * @throws IllegalArgumentException if the region is not a
     *         <code>CacheDistributionAdvisee</code>
     */
    protected void registerClientInterest(String regionName, Object keyOfInterest, int interestType,
        boolean sendUpdatesAsInvalidates) {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest);
      }
      Set keysRegistered = null;
      synchronized (this.interestListLock) {
        LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true);
        if (r == null) {
          throw new RegionDestroyedException("Region could not be found for interest registration",
              regionName);
        }
        if (!(r instanceof CacheDistributionAdvisee)) {
          throw new IllegalArgumentException("region " + regionName
              + " is not distributed and does not support interest registration");
        }
        FilterProfile p = r.getFilterProfile();
        keysRegistered =
            p.registerClientInterest(id, keyOfInterest, interestType, sendUpdatesAsInvalidates);
        regions.add(regionName);
      }
      // Perform actions if any keys were registered
      if ((keysRegistered != null) && containsInterestRegistrationListeners()
          && !keysRegistered.isEmpty()) {
        handleInterestEvent(regionName, keysRegistered, interestType, true);
      }
    }


    /**
     * Looks up the filter profile of the given region.
     *
     * @param regionName the name of the region
     * @return the profile returned by the cache, or <code>null</code> if the lookup was cancelled
     */
    protected FilterProfile getProfile(String regionName) {
      try {
        return this.ccp._cache.getFilterProfile(regionName);
      } catch (CancelException e) {
        return null;
      }
    }

    /**
     * Unregisters interest in the input region name and key
     *
     * @param regionName The fully-qualified name of the region in which to unregister interest
     * @param keyOfInterest The key in which to unregister interest
     * @param interestType the interest type, forwarded to the filter profile
     */
    protected void unregisterClientInterest(String regionName, Object keyOfInterest,
        int interestType) {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName,
            keyOfInterest);
      }
      FilterProfile p = getProfile(regionName);
      Set keysUnregistered = null;
      synchronized (this.interestListLock) {
        if (p != null) {
          keysUnregistered = p.unregisterClientInterest(id, keyOfInterest, interestType);
          if (!p.hasInterestFor(id)) {
            this.regions.remove(regionName);
          }
        } else {
          // No profile available: just forget the region.
          this.regions.remove(regionName);
        }
      }
      if (keysUnregistered != null && !keysUnregistered.isEmpty()) {
        handleInterestEvent(regionName, keysUnregistered, interestType, false);
      }
    }

    /**
     * Registers interest in the input region name and list of keys
     *
     * @param regionName The fully-qualified name of the region in which to register interest
     * @param keysOfInterest The list of keys in which to register interest
     * @param sendUpdatesAsInvalidates forwarded to the filter profile
     * @throws RegionDestroyedException if no filter profile is available for the region
     */
    protected void registerClientInterestList(String regionName, List keysOfInterest,
        boolean sendUpdatesAsInvalidates) {
      FilterProfile p = getProfile(regionName);
      if (p == null) {
        throw new RegionDestroyedException("Region not found during client interest registration",
            regionName);
      }
      Set keysRegistered = null;
      synchronized (this.interestListLock) {
        keysRegistered = p.registerClientInterestList(id, keysOfInterest, sendUpdatesAsInvalidates);
        regions.add(regionName);
      }
      // Perform actions if any keys were registered. The null guard mirrors
      // registerClientInterest so that a null result cannot cause a NullPointerException.
      if ((keysRegistered != null) && containsInterestRegistrationListeners()
          && !keysRegistered.isEmpty()) {
        handleInterestEvent(regionName, keysRegistered, InterestType.KEY, true);
      }
    }

    /**
     * Unregisters interest in the input region name and list of keys
     *
     * @param regionName The fully-qualified name of the region in which to unregister interest
     * @param keysOfInterest The list of keys in which to unregister interest
     */
    protected void unregisterClientInterestList(String regionName, List keysOfInterest) {
      FilterProfile p = getProfile(regionName);
      Set keysUnregistered = null;
      synchronized (this.interestListLock) {
        if (p != null) {
          keysUnregistered = p.unregisterClientInterestList(id, keysOfInterest);
          if (!p.hasInterestFor(id)) {
            regions.remove(regionName);
          }
        } else {
          // No profile available: just forget the region.
          regions.remove(regionName);
        }
      }
      // Perform actions if any keys were unregistered. keysUnregistered stays null when no
      // profile was available, so it must be null-checked before use.
      if (keysUnregistered != null && !keysUnregistered.isEmpty()) {
        handleInterestEvent(regionName, keysUnregistered, InterestType.KEY, false);
      }
    }

    /**
     * Returns whether this interest list has any keys, patterns or filters of interest. It answers
     * the question: Are any clients being notified because of this interest list?
     *
     * @return whether at least one region is recorded in this interest list
     */
    protected boolean hasInterest() {
      return !regions.isEmpty();
    }

    /**
     * Clears this client's interest in every recorded region. Unless the cache is closed,
     * unregister events are raised for all-keys interest, keys of interest and patterns of
     * interest before each region's interest is cleared.
     */
    protected void clearClientInterestList() {
      boolean isClosed = ccp.getCache().isClosed();

      synchronized (this.interestListLock) {
        for (String regionName : regions) {
          FilterProfile p = getProfile(regionName);
          if (p == null) {
            continue;
          }
          if (!isClosed) {
            if (p.hasAllKeysInterestFor(id)) {
              // All-keys interest is reported as the ".*" regular expression.
              Set allKeys = new HashSet();
              allKeys.add(".*");
              allKeys = Collections.unmodifiableSet(allKeys);
              handleInterestEvent(regionName, allKeys, InterestType.REGULAR_EXPRESSION, false);
            }
            Set keysOfInterest = p.getKeysOfInterestFor(id);
            if (keysOfInterest != null && keysOfInterest.size() > 0) {
              handleInterestEvent(regionName, keysOfInterest, InterestType.KEY, false);
            }
            Map<String, Pattern> patternsOfInterest = p.getPatternsOfInterestFor(id);
            if (patternsOfInterest != null && patternsOfInterest.size() > 0) {
              handleInterestEvent(regionName, patternsOfInterest.keySet(),
                  InterestType.REGULAR_EXPRESSION, false);
            }
          }
          p.clearInterestFor(id);
        }
        regions.clear();
      }
    }


    /**
     * Raises an interest registration event.
     *
     * @param regionName the region the event applies to
     * @param keysOfInterest the keys (or patterns) that were registered or unregistered
     * @param interestType the interest type of the keys
     * @param isRegister <code>true</code> for a registration, <code>false</code> for an
     *        unregistration
     */
    private void handleInterestEvent(String regionName, Set keysOfInterest, int interestType,
        boolean isRegister) {
      // Notify the region about this register interest event if:
      // - the application has requested it
      // - this is a primary CacheClientProxy (otherwise multiple notifications
      // may occur)
      // - it is a key interest type (regex is currently not supported)
      InterestRegistrationEvent event = null;
      if (NOTIFY_REGION_ON_INTEREST && this.ccp.isPrimary() && interestType == InterestType.KEY) {
        event = new InterestRegistrationEventImpl(this.ccp, regionName, keysOfInterest,
            interestType, isRegister);
        try {
          notifyRegionOfInterest(event);
        } catch (Exception e) {
          // Best effort: a failed region notification must not stop listener notification.
          logger.warn("Region notification of interest failed", e);
        }
      }
      // Invoke interest registration listeners
      if (containsInterestRegistrationListeners()) {
        if (event == null) {
          event = new InterestRegistrationEventImpl(this.ccp, regionName, keysOfInterest,
              interestType, isRegister);
        }
        notifyInterestRegistrationListeners(event);
      }
    }

    /** Hands the event to the notifier's <code>handleInterestEvent</code>. */
    private void notifyRegionOfInterest(InterestRegistrationEvent event) {
      this.ccp.getCacheClientNotifier().handleInterestEvent(event);
    }

    /** Hands the event to the notifier's interest registration listeners. */
    private void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
      this.ccp.getCacheClientNotifier().notifyInterestRegistrationListeners(event);
    }

    /** Returns whether the notifier reports any interest registration listeners. */
    private boolean containsInterestRegistrationListeners() {
      return this.ccp.getCacheClientNotifier().containsInterestRegistrationListeners();
    }
  }


  /**
   * Class <code>MessageDispatcher</code> is a <code>Thread</code> that processes messages bound for
   * the client by taking messages from the message queue and sending them to the client over the
   * socket.
   */
  static class MessageDispatcher extends LoggingThread {

    /**
     * The queue of messages to be sent to the client. Created in the constructor.
     */
    protected final HARegionQueue _messageQueue;

    // /**
    // * An int used to keep track of the number of messages dropped for logging
    // * purposes. If greater than zero then a warning has been logged about
    // * messages being dropped.
    // */
    // private int _numberOfMessagesDropped = 0;

    /**
     * The proxy for which this dispatcher is processing messages
     */
    private final CacheClientProxy _proxy;

    // /**
    // * The conflator facilitates message conflation
    // */
    // protected BridgeEventConflator _eventConflator;

    /**
     * Whether the dispatcher is stopped. Initially <code>true</code>; <code>run()</code> sets it
     * to <code>false</code> when the thread begins.
     */
    private volatile boolean _isStopped = true;

    // Disabled field, formerly documented as guarded by _pausedLock:
    // boolean _isPausedDispatcher = false;

    /**
     * A lock object used to control pausing this dispatcher
     */
    protected final Object _pausedLock = new Object();

    /**
     * An object used to protect when dispatching is being stopped. <code>run()</code> holds it
     * while handling an IOException.
     */
    private final Object _stopDispatchingLock = new Object();

    /**
     * Read/write lock from which <code>socketWriteLock</code> is taken.
     * NOTE(review): presumably guards use of the client socket; confirm against its users.
     */
    private final ReadWriteLock socketLock = new ReentrantReadWriteLock();

    /**
     * The write lock of <code>socketLock</code>.
     */
    private final Lock socketWriteLock = socketLock.writeLock();
    // /**
    // * A boolean verifying whether a warning has already been issued if the
    // * message queue has reached its capacity.
    // */
    // private boolean _messageQueueCapacityReachedWarning = false;

    /**
     * Constructor. Registers the proxy with the HA container and creates the message queue this
     * dispatcher will process.
     *
     * @param proxy The <code>CacheClientProxy</code> for which this dispatcher is processing
     *        messages
     * @param name thread name for this dispatcher
     * @param statisticsClock the clock handed to the message queue when it is created
     * @throws CacheException if the message queue cannot be created
     */
    protected MessageDispatcher(CacheClientProxy proxy, String name,
        StatisticsClock statisticsClock) throws CacheException {
      super(name);

      this._proxy = proxy;

      // Create the message queue
      try {
        // Capacity and expiry of the queue come from the proxy.
        HARegionQueueAttributes queueAttributes = new HARegionQueueAttributes();
        queueAttributes.setBlockingQueueCapacity(proxy._maximumMessageCount);
        queueAttributes.setExpiryTime(proxy._messageTimeToLive);

        // Register the proxy in the HA container under the queue's region name.
        ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
            .putProxy(HARegionQueue.createRegionName(proxy.getHARegionName()), proxy);

        boolean durableQueue = proxy.proxyID.isDurable();
        // Delta is possible only for GFE_61 or later clients, with delta propagation enabled,
        // and with client conflation not switched on.
        boolean deltaCapable = proxy.clientVersion.compareTo(Version.GFE_61) >= 0
            && InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
            && proxy.clientConflation != Handshake.CONFLATION_ON;
        if (logger.isDebugEnabled() && (durableQueue || deltaCapable)) {
          logger.debug("Creating a {} subscription queue for {}",
              durableQueue ? "durable" : "non-durable",
              proxy.getProxyID());
        }

        this._messageQueue = HARegionQueue.getHARegionQueueInstance(proxy.getHARegionName(),
            proxy.getCache(), queueAttributes, HARegionQueue.BLOCKING_HA_QUEUE, durableQueue,
            proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
            proxy.clientConflation, proxy.isPrimary(), deltaCapable, statisticsClock);

        // Check if interests were registered during HARegion GII.
        if (proxy.hasRegisteredInterested()) {
          this._messageQueue.setHasRegisteredInterest(true);
        }
      } catch (CancelException | RegionExistsException passThrough) {
        // These two are propagated unchanged rather than wrapped.
        throw passThrough;
      } catch (Exception cause) {
        proxy.getCache().getCancelCriterion().checkCancelInProgress(cause);
        throw new CacheException(
            "Exception occurred while trying to create a message queue.",
            cause) {
          private static final long serialVersionUID = 0L;
        };
      }
    }

    /**
     * Returns the proxy for which this dispatcher is processing messages.
     */
    private CacheClientProxy getProxy() {
      return _proxy;
    }

    /**
     * Returns the cache of the proxy this dispatcher belongs to.
     */
    private InternalCache getCache() {
      return this._proxy.getCache();
    }

    /**
     * Returns the socket of the proxy this dispatcher belongs to.
     */
    private Socket getSocket() {
      return this._proxy.getSocket();
    }

    /**
     * Returns the communication buffer of the proxy this dispatcher belongs to.
     */
    private ByteBuffer getCommBuffer() {
      return this._proxy.getCommBuffer();
    }

    /**
     * Returns the statistics of the proxy this dispatcher belongs to.
     */
    private CacheClientProxyStats getStatistics() {
      return this._proxy.getStatistics();
    }

    /**
     * Flags this dispatcher as stopped. The thread is not interrupted here.
     */
    private void basicStopDispatching() {
      final boolean debugEnabled = logger.isDebugEnabled();
      if (debugEnabled) {
        logger.debug("{}: notified dispatcher to stop", this);
      }
      _isStopped = true;
      // Deliberately no interrupt() here; close(boolean) is left to do that.
    }

    /**
     * Returns the string representation of the proxy this dispatcher belongs to.
     */
    @Override
    public String toString() {
      return this._proxy.toString();
    }

    /**
     * Notifies the dispatcher to stop dispatching. Returns immediately if the dispatcher is
     * already stopped.
     *
     * @param checkQueue Whether to check the message queue for any unprocessed messages and wait
     *        for them to be processed, for at most MAXIMUM_SHUTDOWN_PEEKS peeks, before flagging
     *        the dispatcher as stopped.
     *
     * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
     */
    protected synchronized void stopDispatching(boolean checkQueue) {
      if (isStopped()) {
        return;
      }

      if (logger.isDebugEnabled()) {
        logger.debug("{}: Stopping dispatching", this);
      }
      if (!checkQueue) {
        basicStopDispatching();
        return;
      }

      // Stay alive until the queue is empty or a number of peeks is reached.
      try {
        for (int peeksDone = 0; peeksDone < MAXIMUM_SHUTDOWN_PEEKS; peeksDone++) {
          // Clear any pending interrupt for this round; it is restored in the finally block.
          boolean wasInterrupted = Thread.interrupted();
          try {
            List pending = this._messageQueue.peek(1, -1);
            if (pending == null || pending.isEmpty()) {
              break;
            }
            if (logger.isDebugEnabled()) {
              logger.debug("Waiting for client to drain queue: {}", _proxy.proxyID);
            }
            Thread.sleep(500);
          } catch (InterruptedException e) {
            // Remember the interrupt and keep waiting for the queue to drain.
            wasInterrupted = true;
          } catch (CancelException e) {
            break;
          } catch (CacheException e) {
            if (logger.isDebugEnabled()) {
              logger.debug("{}: Exception occurred while trying to stop dispatching", this, e);
            }
          } finally {
            if (wasInterrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }
      } finally {
        // Whatever happened above, the dispatcher ends up flagged as stopped.
        basicStopDispatching();
      }
    }

    /**
     * Returns the current value of the stopped flag.
     *
     * @return whether the dispatcher is stopped
     */
    protected boolean isStopped() {
      return _isStopped;
    }

    /**
     * Returns the size of the queue for heuristic purposes. This size may be changing concurrently
     * if puts / gets are occurring at the same time.
     *
     * @return the size of the queue, or 0 if there is no queue
     */
    protected int getQueueSize() {
      if (this._messageQueue == null) {
        return 0;
      }
      return this._messageQueue.size();
    }

    /**
     * Returns the size of the queue calculated through stats. This includes events that have been
     * dispatched but have not yet been removed.
     *
     * @return the enqueued count minus the removed, conflated, marker-conflated, expired,
     *         QRM-removed, taken and void-removal counts, or 0 if there is no queue
     */
    protected int getQueueSizeStat() {
      if (this._messageQueue == null) {
        return 0;
      }
      HARegionQueueStats queueStats = this._messageQueue.getStatistics();
      return ((int) (queueStats.getEventsEnqued() - queueStats.getEventsRemoved()
          - queueStats.getEventsConflated() - queueStats.getMarkerEventsConflated()
          - queueStats.getEventsExpired() - queueStats.getEventsRemovedByQrm()
          - queueStats.getEventsTaken() - queueStats.getNumVoidRemovals()));
    }

    /**
     * Asks the message queue to close the given client CQ.
     *
     * @param clientId the id of the client owning the CQ
     * @param cqToClose the CQ to close
     */
    protected void drainClientCqEvents(ClientProxyMembershipID clientId,
        InternalCqQuery cqToClose) {
      _messageQueue.closeClientCq(clientId, cqToClose);
    }

    /**
     * Runs the dispatcher by taking a message from the queue and sending it to the client attached
     * to this proxy.
     */
    @Override
    public void run() {
      boolean exceptionOccurred = false;
      this._isStopped = false;

      if (logger.isDebugEnabled()) {
        logger.debug("{}: Beginning to process events", this);
      }
      // for testing purposes
      if (isSlowStartForTesting) {
        long slowStartTimeForTesting =
            Long.getLong(KEY_SLOW_START_TIME_FOR_TESTING, DEFAULT_SLOW_STARTING_TIME).longValue();
        long elapsedTime = 0;
        long startTime = System.currentTimeMillis();
        while ((slowStartTimeForTesting > elapsedTime) && isSlowStartForTesting) {
          try {
            Thread.sleep(500);
          } catch (InterruptedException ignore) {
            if (logger.isDebugEnabled()) {
              logger.debug("Slow start for testing interrupted");
            }
            break;
          }
          elapsedTime = System.currentTimeMillis() - startTime;
        }
        if (slowStartTimeForTesting < elapsedTime) {
          isSlowStartForTesting = false;
        }
      }

      ClientMessage clientMessage = null;
      while (!isStopped()) {
        // SystemFailure.checkFailure(); DM's stopper does this
        if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
          break;
        }
        try {
          // If paused, wait to be told to resume (or interrupted if stopped)
          if (getProxy().isPaused()) {
            // ARB: Before waiting for resumption, process acks from client.
            // This will reduce the number of duplicates that a client receives after
            // reconnecting.
            synchronized (_pausedLock) {
              try {
                logger.info("available ids = " + this._messageQueue.size() + " , isEmptyAckList ="
                    + this._messageQueue.isEmptyAckList() + ", peekInitialized = "
                    + this._messageQueue.isPeekInitialized());
                while (!this._messageQueue.isEmptyAckList()
                    && this._messageQueue.isPeekInitialized()) {
                  this._messageQueue.remove();
                }
              } catch (InterruptedException ex) {
                logger.warn("{}: sleep interrupted.", this);
              }
            }
            waitForResumption();
          }
          try {
            clientMessage = (ClientMessage) this._messageQueue.peek();
          } catch (RegionDestroyedException skipped) {
            break;
          }
          getStatistics().setQueueSize(this._messageQueue.size());
          if (isStopped()) {
            break;
          }
          if (clientMessage != null) {
            // Process the message
            long start = getStatistics().startTime();
            //// BUGFIX for BUG#38206 and BUG#37791
            boolean isDispatched = dispatchMessage(clientMessage);
            getStatistics().endMessage(start);
            if (isDispatched) {
              this._messageQueue.remove();
              if (clientMessage instanceof ClientMarkerMessageImpl) {
                getProxy().markerEnqueued = false;
              }
            }
          } else {
            this._messageQueue.remove();
          }
          clientMessage = null;
        } catch (MessageTooLargeException e) {
          logger.warn("Message too large to send to client: {}, {}", clientMessage, e.getMessage());
        } catch (IOException e) {
          // Added the synchronization below to ensure that exception handling
          // does not occur while stopping the dispatcher and vice versa.
          synchronized (this._stopDispatchingLock) {
            // An IOException occurred while sending a message to the
            // client. If the processor is not already stopped, assume
            // the client is dead and stop processing.
            if (!isStopped() && !getProxy().isPaused()) {
              if ("Broken pipe".equals(e.getMessage())) {
                logger.warn("{}: Proxy closing due to unexpected broken pipe on socket connection.",
                    this);
              } else if ("Connection reset".equals(e.getMessage())) {
                logger.warn("{}: Proxy closing due to unexpected reset on socket connection.",
                    this);
              } else if ("Connection reset by peer".equals(e.getMessage())) {
                logger.warn(
                    "{}: Proxy closing due to unexpected reset by peer on socket connection.",
                    this);
              } else if ("Socket is closed".equals(e.getMessage())
                  || "Socket Closed".equals(e.getMessage())) {
                logger.info("{}: Proxy closing due to socket being closed locally.",
                    this);
              } else {
                logger.warn(String.format(
                    "%s: An unexpected IOException occurred so the proxy will be closed.",
                    this),
                    e);
              }
              // Let the CacheClientNotifier discover the proxy is not alive.
              // See isAlive().
              // getProxy().close(false);

              pauseOrUnregisterProxy(e);
            } // _isStopped
          } // synchronized
          exceptionOccurred = true;
        } // IOException
        catch (InterruptedException e) {
          // If the thread is paused, ignore the InterruptedException and
          // continue. The proxy is null if stopDispatching has been called.
          if (getProxy().isPaused()) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "{}: interrupted because it is being paused. It will continue and wait for resumption.",
                  this);
            }
            Thread.interrupted();
            continue;
          }

          // no need to reset the bit; we're exiting
          if (logger.isDebugEnabled()) {
            logger.debug("{}: interrupted", this);
          }
          break;
        } catch (CancelException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}: shutting down due to cancellation", this);
          }
          exceptionOccurred = true; // message queue is defunct, don't try to read it.
          break;
        } catch (RegionDestroyedException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}: shutting down due to loss of message queue", this);
          }
          exceptionOccurred = true; // message queue is defunct, don't try to read it.
          break;
        } catch (Exception e) {
          // An exception occurred while processing a message. Since it
          // is not an IOException, the client may still be alive, so
          // continue processing.
          if (!isStopped()) {
            logger.fatal(String.format("%s : An unexpected Exception occurred", this),
                e);
          }
        }
      }

      // Processing gets here if isStopped=true. What is this code below doing?
      List list = null;
      if (!exceptionOccurred) {
        try {
          // Clear the interrupt status if any,
          Thread.interrupted();
          int size = this._messageQueue.size();
          list = this._messageQueue.peek(size);
          if (logger.isDebugEnabled()) {
            logger.debug(
                "{}: After flagging the dispatcher to stop , the residual List of messages to be dispatched={} size={}",
                this, list, list.size());
          }
          if (list.size() > 0) {
            long start = getStatistics().startTime();
            Iterator itr = list.iterator();
            while (itr.hasNext()) {
              dispatchMessage((ClientMessage) itr.next());
              getStatistics().endMessage(start);
              // @todo asif: shouldn't we call itr.remove() since the current msg
              // has been sent? That way list will be more accurate
              // if we have an exception.
            }
            this._messageQueue.remove();
          }
        } catch (CancelException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("CacheClientNotifier stopped due to cancellation");
          }
        } catch (Exception ignore) {
          // if (logger.isInfoEnabled()) {
          String extraMsg = null;

          if ("Broken pipe".equals(ignore.getMessage())) {
            extraMsg = "Problem caused by broken pipe on socket.";
          } else if (ignore instanceof RegionDestroyedException) {
            extraMsg =
                "Problem caused by message queue being closed.";
          }
          final Object[] msgArgs = new Object[] {((!isStopped()) ? this.toString() + ": " : ""),
              ((list == null) ? 0 : list.size())};
          if (extraMsg != null) {
            // Don't print exception details, but add on extraMsg
            logger.info(
                String.format(
                    "%s Possibility of not being able to send some or all of the messages to clients. Total messages currently present in the list %s.",
                    msgArgs));
            logger.info(extraMsg);
          } else {
            // Print full stacktrace
            logger.info(String.format(
                "%s Possibility of not being able to send some or all of the messages to clients. Total messages currently present in the list %s.",
                msgArgs),
                ignore);
          }
        }

        if (list != null && logger.isTraceEnabled()) {
          logger.trace("Messages remaining in the list are: {}", list);
        }

        // }
      }
      if (logger.isTraceEnabled()) {
        logger.trace("{}: Dispatcher thread is ending", this);
      }

    }

    /**
     * Reacts to a failure on the client connection. A durable proxy has its dispatching paused; for
     * a non-durable proxy this dispatcher is flagged as stopped. Afterwards, if a
     * {@link ClientHealthMonitor} instance exists, the client's connections are removed through it
     * and a non-durable client is additionally unregistered from the cache client notifier.
     *
     * @param t the failure that triggered this call; handed on to the health monitor
     */
    private void pauseOrUnregisterProxy(Throwable t) {
      if (!getProxy().isDurable()) {
        this._isStopped = true;
      } else {
        try {
          getProxy().pauseDispatching();
        } catch (Exception pauseFailure) {
          // See bug 40611: Exception is caught because an InterruptedException was once seen
          // here. A failure to pause is only reported at debug level.
          if (logger.isDebugEnabled()) {
            logger.debug("{}: {}", this, pauseFailure);
          }
        }
      }

      // Stopping the ServerConnections forces the client to server communication to close.
      ClientHealthMonitor healthMonitor = ClientHealthMonitor.getInstance();
      if (healthMonitor == null) {
        return;
      }

      // Historical note: an extra null test on the proxy used to be made here to prevent bug
      // 35801. Now that _proxy is final, the _isStopped check should be sufficient.
      ClientProxyMembershipID clientId = getProxy().proxyID;
      healthMonitor.removeAllConnectionsAndUnregisterClient(clientId, t);
      if (!getProxy().isDurable()) {
        getProxy().getCacheClientNotifier().unregisterClient(clientId, false);
      }
    }

    /**
     * Builds the wire message for the given client message and, unless the proxy is paused, sends
     * it to the client attached to this proxy.
     *
     * @param clientMessage The <code>ClientMessage</code> to send to the client
     * @return <code>true</code> if the proxy was not paused and the message was handed to
     *         <code>sendMessage</code>; <code>false</code> if the proxy was paused and nothing was
     *         sent
     * @throws IOException if one is raised while the message is created or sent
     */
    protected boolean dispatchMessage(ClientMessage clientMessage) throws IOException {
      if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
        logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Dispatching {}", clientMessage);
      }

      final Message message;
      if (clientMessage instanceof ClientUpdateMessage) {
        ClientUpdateMessage updateMessage = (ClientUpdateMessage) clientMessage;
        byte[] valueBytes = (byte[]) updateMessage.getValue();

        if (logger.isTraceEnabled()) {
          StringBuilder traceText = new StringBuilder(100);
          traceText.append(this);
          traceText.append(": Using latest value: ");
          traceText.append(Arrays.toString(valueBytes));
          if (updateMessage.valueIsObject()) {
            if (valueBytes != null) {
              // Show the deserialized form next to the raw bytes.
              traceText.append(" (").append(deserialize(valueBytes)).append(")");
            }
            traceText.append(" for ").append(clientMessage);
          }
          logger.trace(traceText.toString());
        }

        message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), valueBytes);

        if (AFTER_MESSAGE_CREATION_FLAG) {
          ClientServerObserver observer = ClientServerObserverHolder.getInstance();
          observer.afterMessageCreation(message);
        }
      } else {
        message = clientMessage.getMessage(getProxy(), true /* notify */);
      }

      if (this._proxy.isPaused()) {
        // Nothing is sent while the proxy is paused.
        if (logger.isDebugEnabled()) {
          logger.debug("Message Dispatcher of a Paused CCProxy is trying to dispatch message");
        }
        return false;
      }

      sendMessage(message);
      if (logger.isTraceEnabled()) {
        logger.trace("{}: Dispatched {}", this, clientMessage);
      }
      this._messageQueue.getStatistics().incEventsDispatched();
      return true;
    }

    /**
     * Sends the given message while holding the socket write lock and then resets the proxy's ping
     * counter. A <code>null</code> message is ignored.
     *
     * @param message the message to send; may be <code>null</code>
     * @throws IOException if one is raised while the message is sent
     */
    private void sendMessage(Message message) throws IOException {
      if (message != null) {
        this.socketWriteLock.lock();
        try {
          // Attach the socket, buffer and statistics to the message before sending it.
          message.setComms(getSocket(), getCommBuffer(), getStatistics());
          message.send();
          getProxy().resetPingCounter();
        } finally {
          this.socketWriteLock.unlock();
        }

        if (logger.isTraceEnabled()) {
          logger.trace("{}: Sent {}", this, message);
        }
      }
    }

    /**
     * Puts the given client message on the message queue. When the proxy is both paused and
     * durable, the "enqueued while client away" statistic is incremented as well.
     *
     * <p>
     * A {@link CancelException} is rethrown. Any other exception is counted as a failed enqueue
     * and logged at fatal level, unless this dispatcher is already stopped.
     *
     * @param clientMessage The <code>Conflatable</code> to add to the queue
     */
    protected void enqueueMessage(Conflatable clientMessage) {
      try {
        _messageQueue.put(clientMessage);

        boolean durableClientAway = _proxy.isPaused() && _proxy.isDurable();
        if (durableClientAway) {
          _proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
          if (logger.isDebugEnabled()) {
            logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
          }
        }
      } catch (CancelException cancelled) {
        throw cancelled;
      } catch (Exception failure) {
        if (isStopped()) {
          return;
        }
        _proxy._statistics.incMessagesFailedQueued();
        logger.fatal(
            String.format("%s: Exception occurred while attempting to add message to queue",
                this),
            failure);
      }
    }


    /**
     * Puts the given marker message on the message queue, logging the queue size before and after
     * at debug level.
     *
     * <p>
     * A {@link CancelException} is rethrown. Any other exception is logged at fatal level unless
     * this dispatcher is already stopped.
     *
     * @param message the marker message to add to the queue
     */
    protected void enqueueMarker(ClientMessage message) {
      try {
        if (logger.isDebugEnabled()) {
          logger.debug("{}: Queueing marker message. <{}>. The queue contains {} entries.", this,
              message, getQueueSize());
        }

        _messageQueue.put(message);

        if (logger.isDebugEnabled()) {
          logger.debug("{}: Queued marker message. The queue contains {} entries.", this,
              getQueueSize());
        }
      } catch (CancelException cancelled) {
        throw cancelled;
      } catch (Exception failure) {
        if (isStopped()) {
          return;
        }
        logger.fatal(
            String.format("%s : Exception occurred while attempting to add message to queue",
                this),
            failure);
      }
    }

    /**
     * Converts the given client message and sends it straight away through
     * <code>sendMessage</code>, without putting it on the message queue.
     *
     * <p>
     * No exception leaves this method: a too-large message is logged as a warning, an
     * {@link IOException} pauses or unregisters the proxy (unless it is already stopped or
     * paused), and any other exception is logged at fatal level unless the dispatcher is stopped.
     *
     * @param clientMessage the message to send
     */
    private void sendMessageDirectly(ClientMessage clientMessage) {
      // The exception handling below was modeled after the MessageDispatcher run method.
      try {
        if (logger.isDebugEnabled()) {
          logger.debug("{}: Dispatching directly: {}", this, clientMessage);
        }
        final Message wireMessage = clientMessage.getMessage(getProxy(), true);
        sendMessage(wireMessage);
        if (logger.isDebugEnabled()) {
          logger.debug("{}: Dispatched directly: {}", this, clientMessage);
        }
      } catch (MessageTooLargeException tooLarge) {
        logger.warn("Message too large to send to client: {}, {}", clientMessage,
            tooLarge.getMessage());
      } catch (IOException ioFailure) {
        synchronized (this._stopDispatchingLock) {
          if (isStopped() || getProxy().isPaused()) {
            // Already stopped or paused: nothing left to do.
            return;
          }
          logger.fatal(String.format("%s : An unexpected Exception occurred", this), ioFailure);
          pauseOrUnregisterProxy(ioFailure);
        }
      } catch (Exception unexpected) {
        if (isStopped()) {
          return;
        }
        logger.fatal(String.format("%s : An unexpected Exception occurred", this), unexpected);
      }
    }

    /**
     * Blocks the calling thread on the paused lock for as long as the proxy reports itself paused.
     * If the proxy is not paused on entry, the method returns right after logging. After a wait has
     * finished, the queue's peeked IDs are cleared.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected void waitForResumption() throws InterruptedException {
      synchronized (_pausedLock) {
        logger.info("{} : Pausing processing", this);
        if (getProxy().isPaused()) {
          // Re-check in a loop so that a wake-up while still paused goes back to waiting.
          while (getProxy().isPaused()) {
            _pausedLock.wait();
          }
          // Fix for #48571
          _messageQueue.clearPeekedIDs();
        }
      }
    }

    /**
     * Logs that processing resumes and wakes every thread waiting on the paused lock.
     *
     * <p>
     * NOTE(review): <code>Object.notifyAll()</code> requires the calling thread to own the
     * object's monitor, and this method does not acquire the paused lock itself. Callers
     * presumably invoke it while synchronized on that lock - confirm.
     */
    protected void resumeDispatching() {
      logger.info("{} : Resuming processing", this);

      // Wake up the thread(s) blocked in waitForResumption.
      _pausedLock.notifyAll();
    }

    /**
     * Best-effort deserialization used for debug output.
     *
     * @param serializedBytes the serialized form of an object
     * @return the object read from the bytes, or <code>serializedBytes</code> itself if reading
     *         fails with an exception
     */
    protected Object deserialize(byte[] serializedBytes) {
      try {
        return DataSerializer.readObject(new ByteArrayDataInput(serializedBytes));
      } catch (Exception ignored) {
        // This is a debugging method, so every exception (ClassNotFoundException for example)
        // is deliberately ignored and the raw bytes are returned instead.
        return serializedBytes;
      }
    }

    /**
     * Removes entries from the message queue for as long as its ack list is not empty and its peek
     * state is initialized, then asks the queue to initialize its transient state.
     *
     * <p>
     * An interrupt received during a removal does not abort the draining: it is logged, the
     * removal is retried, and the thread's interrupt status is restored before this method returns
     * so that callers can still observe it.
     */
    protected void initializeTransients() {
      boolean interrupted = false;
      try {
        while (!this._messageQueue.isEmptyAckList() && this._messageQueue.isPeekInitialized()) {
          try {
            this._messageQueue.remove();
          } catch (InterruptedException e) {
            // Remember the interrupt instead of losing it; the flag is re-set in the finally
            // block, after the queue has been dealt with.
            interrupted = true;
            logger.warn("{}: Interrupted while removing messages from the queue; retrying", this,
                e);
          }
        }
        this._messageQueue.initializeTransients();
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /**
   * Returns the CQ count currently recorded in this proxy's statistics, read while holding this
   * proxy's monitor.
   *
   * @return int the current count of CQs for this client
   */
  public int getCqCount() {
    final int count;
    synchronized (this) {
      count = _statistics.getCqCount();
    }
    return count;
  }

  /**
   * Increments the CQ count recorded in this proxy's statistics while holding this proxy's
   * monitor.
   */
  public void incCqCount() {
    synchronized (this) {
      _statistics.incCqCount();
    }
  }

  /**
   * Decrements the CQ count recorded in this proxy's statistics while holding this proxy's
   * monitor.
   *
   * <p>
   * The monitor is taken once, through the <code>synchronized (this)</code> block, in the same way
   * as the other CQ count methods. A <code>synchronized</code> method modifier on top of that
   * block would only re-acquire the same monitor.
   */
  public void decCqCount() {
    synchronized (this) {
      this._statistics.decCqCount();
    }
  }

  /**
   * Tells whether the CQ count recorded in this proxy's statistics is exactly one. The count is
   * read while holding this proxy's monitor.
   *
   * @return true if the client has one CQ
   */
  public boolean hasOneCq() {
    synchronized (this) {
      final int count = _statistics.getCqCount();
      return count == 1;
    }
  }

  /**
   * Tells whether the CQ count recorded in this proxy's statistics is zero. The count is read
   * while holding this proxy's monitor.
   *
   * @return true if the client has no CQs
   */
  public boolean hasNoCq() {
    synchronized (this) {
      final int count = _statistics.getCqCount();
      return count == 0;
    }
  }

  /**
   * Gets the map of regions with empty data policy.
   *
   * @return the map held by this proxy; the map itself is returned, not a copy
   * @since GemFire 6.1
   */
  public Map<String, Integer> getRegionsWithEmptyDataPolicy() {
    return this.regionsWithEmptyDataPolicy;
  }

  /**
   * Increments the ping counter and returns the new value.
   *
   * @return the ping counter value after the increment
   */
  public int incrementAndGetPingCounter() {
    return pingCounter.incrementAndGet();
  }

  /**
   * Sets the ping counter back to zero.
   */
  public void resetPingCounter() {
    pingCounter.set(0);
  }

  /**
   * Returns the number of whole seconds that have elapsed since this client proxy's creation
   * date.
   *
   * @return the elapsed time in seconds, truncated towards zero
   * @since GemFire 7.0
   */
  public long getUpTime() {
    final long elapsedMillis = System.currentTimeMillis() - creationDate.getTime();
    return elapsedMillis / 1000;
  }

  /**
   * Callback interface with a single method, used as the type of the static <code>testHook</code>
   * field of this class.
   */
  public interface TestHook {
    /**
     * Invoked with a string argument.
     *
     * @param spot NOTE(review): presumably identifies the place in the code from which the hook
     *        is being called - confirm against the call sites
     */
    void doTestHook(String spot);
  }

  /**
   * Statically shared {@link TestHook}. The field is public, non-final and annotated as mutable
   * for testing. NOTE(review): presumably left unset outside of tests - confirm against usages.
   */
  @MutableForTesting
  public static TestHook testHook;
}
