/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.cache.client.internal;

import java.io.IOException;
import java.io.NotSerializableException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.CopyException;
import org.apache.geode.GemFireException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.SerializationException;
import org.apache.geode.cache.CacheRuntimeException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.SubscriptionNotEnabledException;
import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
import org.apache.geode.cache.client.internal.QueueManager.QueueConnections;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.PoolManagerImpl;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.tier.BatchException;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;

/**
 * Executes client-to-server requests on behalf of the client. Handles retrying against different
 * servers and marking servers dead when an exception is received from them.
 *
 * @since GemFire 5.7
 */
public class OpExecutorImpl implements ExecutablePool {
  /** Logger used for the debug, trace and warning output of this executor. */
  private static final Logger logger = LogService.getLogger();

  /**
   * Value of the boolean system property {@code GEMFIRE_PREFIX + "PoolImpl.TRY_SERVERS_ONCE"}.
   * When set, {@link #execute(Op, int)} treats running out of untried servers as the final attempt
   * instead of cycling back to servers that already failed.
   */
  private static final boolean TRY_SERVERS_ONCE =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PoolImpl.TRY_SERVERS_ONCE");
  /**
   * Value of the integer system property {@code GEMFIRE_PREFIX + "txRetryAttempt"} (500 when
   * unset). Caps the number of failover retries performed by {@link #executeWithServerAffinity}.
   */
  static final int TX_RETRY_ATTEMPT =
      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt", 500);

  /** Source of pooled connections; connections are borrowed, exchanged and returned through it. */
  private final ConnectionManager connectionManager;
  /** Default retry limit used by {@link #execute(Op)}; -1 disables the attempt-count limit. */
  private final int retryAttempts;
  /** Passed to the connection manager when borrowing a connection (unit not visible here). */
  private final long serverTimeout;
  /** Used to look up endpoints, to report crashed servers and to obtain the pool name. */
  private final EndpointManager endpointManager;
  /** Exposed unchanged through {@link #getRITracker()}. */
  private final RegisterInterestTracker riTracker;
  /** May be null; the queue based operations then throw SubscriptionNotEnabledException. */
  private final QueueManager queueManager;
  /** Consulted at the start of exception handling so a cancellation in progress wins. */
  private final CancelCriterion cancelCriterion;
  // not final: authenticateIfRequired resolves it lazily through PoolManagerImpl when it is null
  private /* final */ PoolImpl pool;
  /** Per-thread flag: true while the calling thread has server affinity set up. */
  private final ThreadLocal<Boolean> serverAffinity = ThreadLocal.withInitial(() -> Boolean.FALSE);
  // NOTE(review): plain instance field, unlike the thread-local affinity state around it, so the
  // value written by setupServerAffinity is shared by all threads - confirm this is intended
  private boolean serverAffinityFailover = false;
  /** Per-thread server that affinity operations are pinned to; null until one is chosen. */
  private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<>();

  /** Per-thread count of failover retries made by {@link #executeWithServerAffinity}. */
  private final ThreadLocal<Integer> affinityRetryCount = ThreadLocal.withInitial(() -> 0);

  /**
   * Creates an executor wired to the given collaborators. All arguments are stored as-is; nothing
   * is validated or contacted here.
   *
   * @param connectionManager supplies the pooled connections operations run on
   * @param queueManager supplies subscription queue connections, or null when there are none
   * @param endpointManager used for endpoint lookups and for reporting crashed servers
   * @param riTracker tracker handed back by {@link #getRITracker()}
   * @param retryAttempts default retry limit for {@link #execute(Op)}; -1 means no fixed limit
   * @param serverTimeout value passed to the connection manager when borrowing a connection
   * @param cancelCriterion checked before an exception is handled
   * @param pool owning pool; may be null, in which case it is looked up when first needed
   */
  public OpExecutorImpl(ConnectionManager connectionManager, QueueManager queueManager,
      EndpointManager endpointManager, RegisterInterestTracker riTracker, int retryAttempts,
      long serverTimeout, CancelCriterion cancelCriterion,
      PoolImpl pool) {
    // collaborators
    this.pool = pool;
    this.connectionManager = connectionManager;
    this.endpointManager = endpointManager;
    this.queueManager = queueManager;
    this.riTracker = riTracker;
    this.cancelCriterion = cancelCriterion;

    // tuning values
    this.serverTimeout = serverTimeout;
    this.retryAttempts = retryAttempts;
  }

  /**
   * Executes the op using the retry limit this executor was constructed with.
   *
   * @param op the op to execute
   * @return whatever {@link #execute(Op, int)} returns for the op
   */
  @Override
  public Object execute(Op op) {
    return execute(op, retryAttempts);
  }

  /**
   * Executes the op, moving on to another server after a failure.
   *
   * <p>
   * When the calling thread has server affinity the op is routed to the affinity server (one is
   * picked first if none is set) and {@code retries} is not consulted. Otherwise a connection is
   * borrowed and the op is attempted in a loop; every failure is passed to
   * {@code handleException}, which either throws or lets the loop continue on a connection to a
   * server that has not been tried yet.
   *
   * @param op the op to execute
   * @param retries number of retries after which a failure is final; -1 means there is no fixed
   *        number and the attempt becomes final once no untried server is left
   * @return the result of the op
   */
  @Override
  public Object execute(Op op, int retries) {
    if (serverAffinity.get()) {
      ServerLocation affinityTarget = affinityServerLocation.get();
      if (affinityTarget == null) {
        affinityTarget = getNextOpServerLocation();
        affinityServerLocation.set(affinityTarget);
        if (logger.isDebugEnabled()) {
          logger.debug("setting server affinity to {}", affinityServerLocation.get());
        }
      }
      return executeWithServerAffinity(affinityTarget, op);
    }

    Connection connection = connectionManager.borrowConnection(serverTimeout);
    try {
      // created lazily, only once an attempt has actually failed
      Set<ServerLocation> triedServers = null;
      int attempt = 0;

      while (true) {
        // the first retry flags the message so that the previous attempt's version stamp may be
        // recovered
        if (attempt == 1 && op instanceof AbstractOp) {
          ((AbstractOp) op).getMessage().setIsRetry();
        }
        try {
          authenticateIfRequired(connection, op);
          return executeWithPossibleReAuthentication(connection, op);
        } catch (MessageTooLargeException tooLarge) {
          throw new GemFireIOException("unable to transmit message to server", tooLarge);
        } catch (Exception failure) {
          final boolean retriesExhausted = retries != -1 && attempt >= retries;
          handleException(failure, connection, attempt, retriesExhausted);

          if (triedServers == null) {
            triedServers = new HashSet<>();
          }
          triedServers.add(connection.getServer());

          try {
            connection = connectionManager.exchangeConnection(connection, triedServers);
          } catch (NoAvailableServersException noUntriedServer) {
            if (retries == -1 || TRY_SERVERS_ONCE) {
              // every server has been tried once and we were asked not to go around again
              handleException(failure, connection, attempt, true);
            } else {
              // allow the servers that already failed to be picked again
              triedServers.clear();
              try {
                connection = connectionManager.exchangeConnection(connection, triedServers);
              } catch (NoAvailableServersException noServerAtAll) {
                handleException(failure, connection, attempt, true);
              }
            }
          }
        }
        attempt++;
      }
    } finally {
      connectionManager.returnConnection(connection);
    }
  }

  /**
   * Executes the given op on the given server. If that fails with a
   * {@link ServerConnectivityException} and failover is permitted, clears the affinity server,
   * sends a TXFailoverOp and then re-executes the op through the pool.
   *
   * <p>
   * The original exception is rethrown instead when failover was not enabled by
   * {@link #setupServerAffinity(boolean)}, when it is a {@link ServerOperationException}, or when
   * the per-thread retry count has reached {@code retryAttempts} (unless that is -1) or exceeded
   * {@link #TX_RETRY_ATTEMPT}.
   *
   * @param loc the server to execute the op on
   * @param op the op to execute; cast to AbstractOp on the failover path
   * @return the result of execution
   */
  Object executeWithServerAffinity(ServerLocation loc, Op op) {
    // remembered so that only the invocation that started at zero resets the counter
    final int initialRetryCount = getAffinityRetryCount();
    try {
      try {
        return executeOnServer(loc, op, true, false);
      } catch (ServerConnectivityException e) {
        if (logger.isDebugEnabled()) {
          logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e);
        }
        if (!serverAffinityFailover || e instanceof ServerOperationException) {
          throw e;
        }
        int retryCount = getAffinityRetryCount();
        if ((retryAttempts != -1 && retryCount >= retryAttempts)
            || retryCount > TX_RETRY_ATTEMPT) {
          // prevent stack overflow (presumably pool.execute below re-enters this method, so each
          // retry nests one level deeper)
          throw e;
        }
        setAffinityRetryCount(retryCount + 1);
        // falls out of the catch block into the failover path below
      }
      affinityServerLocation.set(null);
      if (logger.isDebugEnabled()) {
        logger.debug("reset server affinity: attempting txFailover");
      }
      // send TXFailoverOp, so that new server can
      // do bootstrapping, then re-execute original op
      AbstractOp absOp = (AbstractOp) op;
      absOp.getMessage().setIsRetry();
      int transactionId = absOp.getMessage().getTransactionId();
      // for CommitOp we do not have transactionId in AbstractOp
      // so set it explicitly for TXFailoverOp
      TXFailoverOp.execute(pool, transactionId);

      // function ops are rebuilt with the re-execute flag set and keep the transaction id
      if (op instanceof ExecuteRegionFunctionOpImpl) {
        op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op, (byte) 1,
            new HashSet<>());
        ((ExecuteRegionFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
      } else if (op instanceof ExecuteFunctionOpImpl) {
        op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* isReExecute */);
        ((ExecuteFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
      }
      return pool.execute(op);
    } finally {
      if (initialRetryCount == 0) {
        setAffinityRetryCount(0);
      }
    }
  }

  /**
   * Turns server affinity on for the calling thread.
   *
   * @param allowFailover whether {@link #executeWithServerAffinity} may fail over to another
   *        server; NOTE(review): stored in a plain instance field rather than per thread
   */
  @Override
  public void setupServerAffinity(boolean allowFailover) {
    if (logger.isDebugEnabled()) {
      logger.debug("setting up server affinity");
    }
    serverAffinityFailover = allowFailover;
    serverAffinity.set(Boolean.TRUE);
  }

  /**
   * Turns server affinity off for the calling thread and forgets its affinity server. The
   * failover setting is left untouched.
   */
  @Override
  public void releaseServerAffinity() {
    if (logger.isDebugEnabled()) {
      logger.debug("reset server affinity");
    }
    serverAffinity.set(Boolean.FALSE);
    affinityServerLocation.set(null);
  }

  /**
   * @return the affinity server recorded for the calling thread, or null when none is recorded
   */
  @Override
  public ServerLocation getServerAffinityLocation() {
    return affinityServerLocation.get();
  }

  /**
   * @return the calling thread's affinity failover retry count (0 until it has been set)
   */
  int getAffinityRetryCount() {
    return affinityRetryCount.get();
  }

  /**
   * Overwrites the calling thread's affinity failover retry count.
   *
   * @param retryCount the new count
   */
  void setAffinityRetryCount(int retryCount) {
    affinityRetryCount.set(retryCount);
  }

  /**
   * Records the affinity server for the calling thread.
   *
   * @param serverLocation the server to pin the calling thread to
   */
  @Override
  public void setServerAffinityLocation(ServerLocation serverLocation) {
    // only checked when assertions are enabled: an affinity server must not already be recorded
    assert affinityServerLocation.get() == null;
    affinityServerLocation.set(serverLocation);
  }

  /**
   * Borrows a connection from the connection manager only to learn which server it points at, and
   * returns the connection again before handing back that server.
   *
   * @return the server of the connection that was borrowed
   */
  public ServerLocation getNextOpServerLocation() {
    final Connection borrowed = connectionManager.borrowConnection(serverTimeout);
    final ServerLocation location;
    try {
      location = borrowed.getServer();
    } finally {
      connectionManager.returnConnection(borrowed);
    }
    return location;
  }

  /**
   * Executes the op on the given server, delegating to
   * {@link #executeOn(ServerLocation, Op, boolean, boolean)} with {@code accessed = true} and
   * {@code onlyUseExistingCnx = false}.
   *
   * @param server the server to execute on
   * @param op the op to execute
   * @return the result of the op
   */
  @Override
  public Object executeOn(ServerLocation server, Op op) {
    return executeOn(server, op, true, false);
  }

  /**
   * Executes the op on a specific server.
   *
   * <p>
   * Without server affinity the op goes straight to {@code p_server}. With server affinity the op
   * goes to the thread's affinity server instead; if none is recorded yet, {@code p_server}
   * becomes the affinity server. In the affinity case {@code accessed} and
   * {@code onlyUseExistingCnx} are not used.
   *
   * @param p_server the requested server
   * @param op the op to execute
   * @param accessed passed on when the connection is returned to the connection manager
   * @param onlyUseExistingCnx passed on when the connection is borrowed
   * @return the result of the op
   */
  @Override
  public Object executeOn(ServerLocation p_server, Op op, boolean accessed,
      boolean onlyUseExistingCnx) {
    if (!serverAffinity.get()) {
      return executeOnServer(p_server, op, accessed, onlyUseExistingCnx);
    }

    ServerLocation target = affinityServerLocation.get();
    if (target == null) {
      target = p_server;
      affinityServerLocation.set(target);
    }
    // go through executeWithServerAffinity so that a TXFailoverOp can be sent on failure
    return executeWithServerAffinity(target, op);
  }

  /**
   * Executes the op on the given server; any failure is treated as final.
   *
   * <p>
   * A ping op is sent over an existing queue connection to that server when the queue manager has
   * one, so that no pooled connection is used for it. Every other case borrows a connection from
   * the connection manager, which is returned again afterwards.
   *
   * @param p_server the server to execute on
   * @param op the op to execute
   * @param accessed passed to the connection manager when the connection is returned
   * @param onlyUseExistingCnx passed to the connection manager when the connection is borrowed
   * @return the result of the op
   */
  protected Object executeOnServer(ServerLocation p_server, Op op, boolean accessed,
      boolean onlyUseExistingCnx) {
    Connection connection = null;
    boolean borrowedFromPool = true;

    if (op instanceof PingOp.PingOpImpl && queueManager != null) {
      // prefer a queue connection for pings so that no pooled connection gets created for them
      final Endpoint endpoint = endpointManager.getEndpointMap().get(p_server);
      if (endpoint != null) {
        connection = queueManager.getAllConnectionsNoWait().getConnection(endpoint);
        // a queue connection is not ours to hand back to the connection manager
        borrowedFromPool = (connection == null);
      }
    }
    if (connection == null) {
      connection = connectionManager.borrowConnection(p_server, onlyUseExistingCnx);
    }

    try {
      return executeWithPossibleReAuthentication(connection, op);
    } catch (Exception failure) {
      handleException(failure, connection, 0, true);
      // handleException is expected to have thrown already; this is the fallback
      throw new ServerConnectivityException("Received error connecting to server", failure);
    } finally {
      final boolean affinityWithoutServer =
          serverAffinity.get() && affinityServerLocation.get() == null;
      if (affinityWithoutServer) {
        if (logger.isDebugEnabled()) {
          logger.debug("setting server affinity to {} server:{}",
              connection.getEndpoint().getMemberId(), connection.getServer());
        }
        affinityServerLocation.set(connection.getServer());
      }
      if (borrowedFromPool) {
        connectionManager.returnConnection(connection, accessed);
      }
    }
  }

  /**
   * Executes the op on the primary queue server. After a failure the primary is fetched again from
   * the queue manager and the op is retried; the failure becomes final once it comes from a
   * primary that has already been tried.
   *
   * @param op the op to execute
   * @return the result of the op
   * @throws SubscriptionNotEnabledException if this executor has no queue manager
   */
  @Override
  public Object executeOnPrimary(Op op) {
    if (queueManager == null) {
      throw new SubscriptionNotEnabledException();
    }

    final HashSet<ServerLocation> triedPrimaries = new HashSet<>();
    for (;;) {
      final Connection primary = queueManager.getAllConnections().getPrimary();
      try {
        return executeWithPossibleReAuthentication(primary, op);
      } catch (Exception failure) {
        final boolean firstTimeOnThisServer = triedPrimaries.add(primary.getServer());
        final boolean finalAttempt = !firstTimeOnThisServer;
        handleException(failure, primary, 0, finalAttempt);
        if (finalAttempt) {
          // handleException is expected to have thrown already; this is the fallback
          throw new ServerConnectivityException("Tried the same primary server twice.", failure);
        }
      }
    }
  }

  /**
   * Executes the op on the primary queue server (if there is one) and then on every backup. A
   * failure on one server does not stop the remaining servers from being tried; the most recent
   * exception raised while handling a failure is thrown once all servers have been visited.
   *
   * @param op the op to execute
   * @throws SubscriptionNotEnabledException if this executor has no queue manager
   */
  @Override
  public void executeOnAllQueueServers(Op op) {
    if (queueManager == null) {
      throw new SubscriptionNotEnabledException();
    }

    final QueueConnections queueConnections = queueManager.getAllConnectionsNoWait();
    RuntimeException mostRecentFailure = null;

    final Connection primary = queueConnections.getPrimary();
    if (primary != null) {
      mostRecentFailure = attemptOnQueueServer(primary, op);
    }

    final List<Connection> backups = queueConnections.getBackups();
    for (Connection backup : backups) {
      final RuntimeException failure = attemptOnQueueServer(backup, op);
      if (failure != null) {
        mostRecentFailure = failure;
      }
    }

    if (mostRecentFailure != null) {
      throw mostRecentFailure;
    }
  }

  /**
   * Executes the op on one queue connection as a non-final attempt and returns, rather than
   * throws, the runtime exception that handling a failure produced (null when there was none).
   */
  private RuntimeException attemptOnQueueServer(Connection conn, Op op) {
    try {
      executeWithPossibleReAuthentication(conn, op);
    } catch (Exception failure) {
      try {
        handleException(failure, conn, 0, false);
      } catch (RuntimeException raised) {
        return raised;
      }
    }
    return null;
  }

  /**
   * Sends the op to every backup queue server, last backup first, and then to the primary,
   * returning the primary's result.
   *
   * <p>
   * Failures on a backup are handled as non-final attempts. After a failure on the primary, the
   * primary is fetched again from the queue manager and the op is retried; the failure becomes
   * final once it comes from a primary that has already been tried.
   *
   * @param op the op to execute
   * @return the result of executing the op on the primary
   * @throws SubscriptionNotEnabledException if this executor has no queue manager
   */
  @Override
  public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
    if (queueManager == null) {
      throw new SubscriptionNotEnabledException();
    }
    QueueConnections connections = queueManager.getAllConnections();

    // typed like the same call in executeOnAllQueueServers, which removes the raw type and cast
    List<Connection> backups = connections.getBackups();
    if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
      logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "sending {} to backups: {}", op, backups);
    }
    for (int i = backups.size() - 1; i >= 0; i--) {
      Connection conn = backups.get(i);
      try {
        executeWithPossibleReAuthentication(conn, op);
      } catch (Exception e) {
        handleException(e, conn, 0, false);
      }
    }

    Connection primary = connections.getPrimary();
    HashSet<ServerLocation> attemptedPrimaries = new HashSet<>();
    while (true) {
      try {
        if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
          logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "sending {} to primary: {}", op, primary);
        }
        return executeWithPossibleReAuthentication(primary, op);
      } catch (Exception e) {
        if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
          logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "caught exception sending to primary {}",
              e.getMessage(), e);
        }
        // add() returns false when this primary was already tried, which makes the attempt final
        boolean finalAttempt = !attemptedPrimaries.add(primary.getServer());
        handleException(e, primary, 0, finalAttempt);
        primary = queueManager.getAllConnections().getPrimary();
        // we shouldn't reach this code, but just in case
        if (finalAttempt) {
          throw new ServerConnectivityException("Tried the same primary server twice.", e);
        }
      }
    }
  }

  /**
   * Executes the op on the given connection; any failure is treated as final. Used by
   * GatewayBatchOp.
   *
   * @param conn the connection to execute on; it is not borrowed or returned here
   * @param op the op to execute
   * @param timeoutFatal whether a socket timeout should also mark the server as crashed
   * @return the result of the op
   */
  @Override
  public Object executeOn(Connection conn, Op op, boolean timeoutFatal) {
    try {
      return executeWithPossibleReAuthentication(conn, op);
    } catch (Exception e) {
      // the op-aware overload rethrows some exceptions unchanged for authentication ops
      handleException(op, e, conn, 0, true, timeoutFatal);
      // this shouldn't actually be reached, handle exception will throw something
      throw new ServerConnectivityException("Received error connecting to server", e);
    }
  }

  /**
   * Executes the op on the given connection with {@code timeoutFatal = false}. This is used by
   * unit tests.
   *
   * @param conn the connection to execute on
   * @param op the op to execute
   * @return the result of the op
   */
  @Override
  public Object executeOn(Connection conn, Op op) {
    return executeOn(conn, op, false);
  }

  /**
   * @return the register interest tracker this executor was constructed with
   */
  @Override
  public RegisterInterestTracker getRITracker() {
    return riTracker;
  }

  /**
   * Handles a failure with {@code timeoutFatal = false}; see the five argument overload for the
   * actual classification. Throws when the failure must stop the operation and returns normally
   * when the caller may retry.
   *
   * @param e the failure to handle
   * @param conn the connection the failure occurred on
   * @param retryCount zero based number of the attempt that failed
   * @param finalAttempt whether no further attempt will be made
   */
  protected void handleException(Throwable e, Connection conn, int retryCount,
      boolean finalAttempt) {
    handleException(e, conn, retryCount, finalAttempt, false/* timeoutFatal */);
  }

  /**
   * Op-aware variant of exception handling. When the failing op is a user authentication op,
   * security exceptions and refused connections are rethrown unchanged; every other case is
   * passed on to the general handler.
   *
   * @param op the op that failed
   * @param e the failure to handle
   * @param conn the connection the failure occurred on
   * @param retryCount zero based number of the attempt that failed
   * @param finalAttempt whether no further attempt will be made
   * @param timeoutFatal whether a socket timeout should also mark the server as crashed
   */
  protected void handleException(Op op, Throwable e, Connection conn, int retryCount,
      boolean finalAttempt, boolean timeoutFatal) throws CacheRuntimeException {
    final boolean authenticating = op instanceof AuthenticateUserOp.AuthenticateUserOpImpl;
    if (authenticating && e instanceof GemFireSecurityException) {
      throw (GemFireSecurityException) e;
    }
    if (authenticating && e instanceof ServerRefusedConnectionException) {
      throw (ServerRefusedConnectionException) e;
    }
    handleException(e, conn, retryCount, finalAttempt, timeoutFatal);
  }

  /**
   * Classifies a failure and either throws or returns normally so that the caller can retry.
   *
   * <p>
   * Each failure falls into one of two groups:
   * <ul>
   * <li>No title: the failure is rethrown as is or wrapped, without logging and without touching
   * the connection. The one exception is the "Connection error while authenticating user"
   * connectivity error, which is only debug-logged and not thrown.</li>
   * <li>With a title: the connection is destroyed, the endpoint is reported as crashed unless the
   * failure type opts out, and a message is logged. A {@link ServerConnectivityException} is
   * thrown only for the final attempt or for a failure type that is not recognised at all.</li>
   * </ul>
   * A few wrapper exceptions are unwrapped and their cause is handled instead.
   *
   * @param e the failure to handle
   * @param conn the connection the failure occurred on
   * @param retryCount zero based number of the attempt that failed; used in the message only
   * @param finalAttempt whether no further attempt will be made
   * @param timeoutFatal whether a socket timeout should also mark the server as crashed
   * @throws CacheRuntimeException declared by the original signature; the exceptions actually
   *         thrown here are GemFireException subtypes and whatever the cancel check throws
   */
  protected void handleException(Throwable e, Connection conn, int retryCount, boolean finalAttempt,
      boolean timeoutFatal) throws CacheRuntimeException {
    GemFireException exToThrow = null;
    String title;
    boolean invalidateServer = true;
    boolean warn = true;
    boolean forceThrow = false;
    Throwable cause = e;

    // a cancellation in progress takes precedence over whatever went wrong
    cancelCriterion.checkCancelInProgress(e);

    if (logger.isDebugEnabled() && !(e instanceof java.io.EOFException)) {
      if (e instanceof java.net.SocketTimeoutException) {
        logger.debug("OpExecutor.handleException on Connection to {} read timed out",
            conn.getServer());
      } else {
        logger.debug("OpExecutor.handleException on Connection to {}", conn.getServer(), e);
      }
    }

    // first take care of all exceptions that should not invalidate the
    // connection and do not need to be logged

    if (e instanceof MessageTooLargeException) {
      title = null;
      exToThrow = new GemFireIOException("message is too large to transmit", e);
    } else if (e instanceof NotSerializableException) {
      title = null; // no message
      exToThrow = new SerializationException("Pool message failure", e);
    } else if (e instanceof BatchException || e instanceof BatchException70) {
      title = null; // no message
      exToThrow = new ServerOperationException(e);
    } else if (e instanceof RegionDestroyedException) {
      invalidateServer = false;
      title = null;
      exToThrow = (RegionDestroyedException) e;
    } else if (e instanceof GemFireSecurityException) {
      title = null;
      exToThrow = new ServerOperationException(e);
    } else if (e instanceof SerializationException) {
      title = null; // no message
      exToThrow = new ServerOperationException(e);
    } else if (e instanceof CopyException) {
      title = null; // no message
      exToThrow = new ServerOperationException(e);
    } else if (e instanceof ClassNotFoundException) {
      title = null; // no message
      exToThrow = new ServerOperationException(e);
    } else if (e instanceof TransactionException) {
      title = null; // no message
      exToThrow = (TransactionException) e;
      invalidateServer = false;
    } else if (e instanceof SynchronizationCommitConflictException) {
      title = null;
      exToThrow = (SynchronizationCommitConflictException) e;
      invalidateServer = false;
    } else if (e instanceof SocketException) {
      if ("Socket closed".equals(e.getMessage()) || "Connection reset".equals(e.getMessage())
          || "Connection refused: connect".equals(e.getMessage())
          || "Connection refused".equals(e.getMessage())) {
        title = e.getMessage();
      } else {
        title = "SocketException";
      }
    } else if (e instanceof SocketTimeoutException) {
      invalidateServer = timeoutFatal;
      title = "socket timed out on client";
      cause = null;
    } else if (e instanceof ConnectionDestroyedException) {
      invalidateServer = false;
      title = "connection was asynchronously destroyed";
      cause = null;
    } else if (e instanceof java.io.EOFException) {
      title = "closed socket on server";
    } else if (e instanceof IOException) {
      title = "IOException";
    } else if (e instanceof BufferUnderflowException) {
      title = "buffer underflow reading from server";
    } else if (e instanceof CancelException) {
      title = "Cancelled";
      warn = false;
    } else if (e instanceof InternalFunctionInvocationTargetException) {
      // In this case, function will be re executed
      title = null;
      exToThrow = (InternalFunctionInvocationTargetException) e;
    } else if (e instanceof FunctionInvocationTargetException) {
      // in this case function will not be re executed
      title = null;
      exToThrow = (GemFireException) e;
    } else if (e instanceof PutAllPartialResultException) {
      title = null;
      exToThrow = (PutAllPartialResultException) e;
      invalidateServer = false;
    } else {
      Throwable t = e.getCause();
      if ((t instanceof IOException)
          || (t instanceof SerializationException) || (t instanceof CopyException)
          || (t instanceof GemFireSecurityException) || (t instanceof ServerOperationException)
          || (t instanceof TransactionException) || (t instanceof CancelException)) {
        // the wrapper carries no information of its own, so classify its cause instead
        handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
        return;
      } else if (e instanceof ServerOperationException) {
        title = null; // no message
        exToThrow = (ServerOperationException) e;
        invalidateServer = false; // fix for bug #42225
      } else if (e instanceof FunctionException) {
        if (t instanceof InternalFunctionInvocationTargetException) {
          // Client server to re-execute for node failure
          handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
          return;
        } else {
          title = null; // no message
          exToThrow = (FunctionException) e;
        }
      } else if (e instanceof ServerConnectivityException
          // constant-first equals: an exception without a message must not cause an NPE here
          && "Connection error while authenticating user".equals(e.getMessage())) {
        title = null;
        if (logger.isDebugEnabled()) {
          logger.debug(e.getMessage(), e);
        }
      } else {
        title = e.toString();
        forceThrow = true;
      }
    }
    if (title != null) {
      conn.destroy();
      if (invalidateServer) {
        endpointManager.serverCrashed(conn.getEndpoint());
      }
      boolean logEnabled = warn ? logger.isWarnEnabled() : logger.isDebugEnabled();
      // the message is also needed when we must throw; including forceThrow keeps an unrecognised
      // failure from being swallowed just because logging is switched off
      boolean msgNeeded = logEnabled || finalAttempt || forceThrow;
      if (msgNeeded) {
        final StringBuffer sb = getExceptionMessage(title, retryCount, finalAttempt, conn);
        final String msg = sb.toString();
        if (logEnabled) {
          if (warn) {
            logger.warn(msg);
          } else {
            logger.debug(msg, e);
          }
        }
        if (forceThrow || finalAttempt) {
          exToThrow = new ServerConnectivityException(msg, cause);
        }
      }
    }
    if (exToThrow != null) {
      throw exToThrow;
    }
  }

  /**
   * Builds the text that is logged and used as the exception message for a titled failure.
   *
   * <p>
   * The text has the shape {@code Pool unexpected <name>[ connection=<conn>][ attempt=<n>]} and,
   * for the final attempt, a trailing sentence saying the server could not be reached. The stray
   * closing parenthesis that used to follow the attempt number had no opening counterpart and is
   * no longer emitted.
   *
   * @param exceptionName short description of the failure
   * @param retryCount zero based attempt number; reported one based, and only when above zero
   * @param finalAttempt whether to add the "Server unreachable" sentence
   * @param connection the connection to mention, or null to leave it out
   * @return the assembled message
   */
  private StringBuffer getExceptionMessage(String exceptionName, int retryCount,
      boolean finalAttempt, Connection connection) {
    StringBuffer message = new StringBuffer(200);
    message.append("Pool unexpected ").append(exceptionName);
    if (connection != null) {
      message.append(" connection=").append(connection);
    }
    if (retryCount > 0) {
      message.append(" attempt=").append(retryCount + 1);
    }
    if (finalAttempt) {
      message.append(". Server unreachable: could not connect after ").append(retryCount + 1)
          .append(" attempts");
    }
    return message;
  }

  /**
   * Authenticates the current user on the connection's server before an op is sent, when that is
   * needed.
   *
   * <p>
   * Nothing happens when the server does not require credentials, when the pool cannot be
   * resolved, or when the op does not need a user id. In multiuser mode the thread's user
   * attributes are authenticated on the server unless they already hold an id for it; otherwise
   * the server's user id is obtained if it is still -1.
   *
   * @param conn the connection about to be used
   * @param op the op about to be sent; cast to AbstractOp
   */
  private void authenticateIfRequired(Connection conn, Op op) {
    if (!conn.getServer().getRequiresCredentials()) {
      return;
    }

    if (pool == null) {
      // resolve the pool lazily by name; without a pool there is nothing to authenticate with
      final PoolImpl resolved =
          (PoolImpl) PoolManagerImpl.getPMI().find(endpointManager.getPoolName());
      if (resolved == null) {
        return;
      }
      pool = resolved;
    }

    final boolean multiuser = pool.getMultiuserAuthentication();
    if (!((AbstractOp) op).needsUserId()) {
      return;
    }

    if (multiuser) {
      final UserAttributes ua = UserAttributes.userAttributes.get();
      if (ua != null && !ua.getServerToId().containsKey(conn.getServer())) {
        authenticateMultiuser(pool, conn, ua);
      }
      return;
    }

    // This should not be reached, but keeping this code here in case it is
    // reached.
    if (conn.getServer().getUserId() == -1) {
      final Connection wrapped = conn.getWrappedConnection();
      conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(wrapped, pool));
      if (logger.isDebugEnabled()) {
        logger.debug(
            "OpExecutorImpl.execute() - single user mode - authenticated this user on {}", conn);
      }
    }
  }

  /**
   * Authenticates the given user attributes on the connection's server and, when an id comes
   * back, records it in the attributes.
   *
   * <p>
   * A connectivity failure that looks like a broken connection (by cause type or by message text)
   * is replaced with a {@link ServerConnectivityException} carrying the fixed message
   * "Connection error while authenticating user"; any other connectivity failure is rethrown
   * unchanged.
   *
   * @param pool the pool to authenticate through
   * @param conn the connection whose server is authenticated against
   * @param ua the user attributes supplying the credentials and receiving the id
   */
  private void authenticateMultiuser(PoolImpl pool, Connection conn, UserAttributes ua) {
    final Long userId;
    try {
      userId = (Long) AuthenticateUserOp.executeOn(conn.getServer(), pool, ua.getCredentials());
    } catch (ServerConnectivityException sce) {
      final Throwable cause = sce.getCause();
      final String text = sce.getMessage();

      final boolean causeIsConnectionProblem = cause instanceof IOException
          || cause instanceof BufferUnderflowException || cause instanceof CancelException;
      final boolean textIsConnectionProblem = text != null
          && (text.contains("Could not create a new connection to server")
              || text.contains("socket timed out on client")
              || text.contains("connection was asynchronously destroyed"));

      if (causeIsConnectionProblem || textIsConnectionProblem) {
        throw new ServerConnectivityException("Connection error while authenticating user");
      }
      throw sce;
    }

    if (userId == null) {
      return;
    }
    ua.setServerToId(conn.getServer(), userId);
    if (logger.isDebugEnabled()) {
      logger.debug("OpExecutorImpl.execute() - multiuser mode - authenticated this user on {}",
          conn);
    }
  }

  /**
   * Executes the op on the connection and, when the failure indicates that the user is not (or no
   * longer) authenticated on that server, authenticates again and executes the op a second time.
   *
   * <p>
   * Re-authentication is attempted when the cause is an {@link AuthenticationRequiredException}
   * with the message "User authorization attributes not found.", or when the exception message
   * contains "Connection error while authenticating user". An exception without a message no
   * longer causes a NullPointerException in that check; it is simply rethrown.
   *
   * @param conn the connection to execute on
   * @param op the op to execute
   * @return the result of the op
   * @throws Exception whatever the execution, or the second execution, throws
   */
  private Object executeWithPossibleReAuthentication(Connection conn, Op op) throws Exception {
    try {
      return conn.execute(op);

    } catch (ServerConnectivityException sce) {
      Throwable cause = sce.getCause();
      String message = sce.getMessage();
      boolean userAttributesMissing = cause instanceof AuthenticationRequiredException
          && "User authorization attributes not found.".equals(cause.getMessage());
      // the message based check matches the exception raised while authenticating a user
      boolean authenticationConnectionError =
          message != null && message.contains("Connection error while authenticating user");
      if (!userAttributesMissing && !authenticationConnectionError) {
        throw sce;
      }
      // (ashetkar) Need a cleaner way of doing above check.

      // named so that it does not shadow the pool field
      PoolImpl poolImpl =
          (PoolImpl) PoolManagerImpl.getPMI().find(endpointManager.getPoolName());
      if (!poolImpl.getMultiuserAuthentication()) {
        Connection connImpl = conn.getWrappedConnection();
        conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this));
      } else {
        UserAttributes ua = UserAttributes.userAttributes.get();
        if (ua != null) {
          authenticateMultiuser(poolImpl, conn, ua);
        }
      }
      return conn.execute(op);
    }
  }

}
