blob: 00f708e9792c279c4f7e94e21e833f9227c8b934 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import java.io.IOException;
import java.io.NotSerializableException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.CopyException;
import org.apache.geode.GemFireException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.SerializationException;
import org.apache.geode.cache.CacheRuntimeException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.SubscriptionNotEnabledException;
import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
import org.apache.geode.cache.client.internal.QueueManager.QueueConnections;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.PoolManagerImpl;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.tier.BatchException;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
/**
* Called from the client and execute client to server requests against servers. Handles retrying to
* different servers, and marking servers dead if we get exception from them.
*
* @since GemFire 5.7
*/
public class OpExecutorImpl implements ExecutablePool {
private static final Logger logger = LogService.getLogger();
private static final boolean TRY_SERVERS_ONCE =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PoolImpl.TRY_SERVERS_ONCE");
static final int TX_RETRY_ATTEMPT =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt", 500);
private final ConnectionManager connectionManager;
private final int retryAttempts;
private final long serverTimeout;
private final EndpointManager endpointManager;
private final RegisterInterestTracker riTracker;
private final QueueManager queueManager;
private final CancelCriterion cancelCriterion;
private /* final */ PoolImpl pool;
private final ThreadLocal<Boolean> serverAffinity = ThreadLocal.withInitial(() -> Boolean.FALSE);
private boolean serverAffinityFailover = false;
private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<>();
private final ThreadLocal<Integer> affinityRetryCount = ThreadLocal.withInitial(() -> 0);
public OpExecutorImpl(ConnectionManager connectionManager, QueueManager queueManager,
EndpointManager endpointManager, RegisterInterestTracker riTracker, int retryAttempts,
long serverTimeout, CancelCriterion cancelCriterion,
PoolImpl pool) {
this.connectionManager = connectionManager;
this.queueManager = queueManager;
this.endpointManager = endpointManager;
this.riTracker = riTracker;
this.retryAttempts = retryAttempts;
this.serverTimeout = serverTimeout;
this.cancelCriterion = cancelCriterion;
this.pool = pool;
}
@Override
public Object execute(Op op) {
return execute(op, retryAttempts);
}
@Override
public Object execute(Op op, int retries) {
if (serverAffinity.get()) {
ServerLocation loc = affinityServerLocation.get();
if (loc == null) {
loc = getNextOpServerLocation();
affinityServerLocation.set(loc);
if (logger.isDebugEnabled()) {
logger.debug("setting server affinity to {}", affinityServerLocation.get());
}
}
return executeWithServerAffinity(loc, op);
}
Connection conn = connectionManager.borrowConnection(serverTimeout);
try {
Set<ServerLocation> attemptedServers = null;
for (int attempt = 0; true; attempt++) {
// when an op is retried we may need to try to recover the previous
// attempt's version stamp
if (attempt == 1 && (op instanceof AbstractOp)) {
AbstractOp absOp = (AbstractOp) op;
absOp.getMessage().setIsRetry();
}
try {
authenticateIfRequired(conn, op);
return executeWithPossibleReAuthentication(conn, op);
} catch (MessageTooLargeException e) {
throw new GemFireIOException("unable to transmit message to server", e);
} catch (Exception e) {
handleException(e, conn, attempt, attempt >= retries && retries != -1);
if (null == attemptedServers) {
// don't allocate this until we need it
attemptedServers = new HashSet<>();
}
attemptedServers.add(conn.getServer());
try {
conn = connectionManager.exchangeConnection(conn, attemptedServers);
} catch (NoAvailableServersException nse) {
// if retries is -1, don't try again after the last server has failed
if (retries == -1 || TRY_SERVERS_ONCE) {
handleException(e, conn, attempt, true);
} else {
// try one of the failed servers again, until we exceed the retry attempts.
attemptedServers.clear();
try {
conn = connectionManager.exchangeConnection(conn, attemptedServers);
} catch (NoAvailableServersException nse2) {
handleException(e, conn, attempt, true);
}
}
}
}
}
} finally {
connectionManager.returnConnection(conn);
}
}
/**
* execute the given op on the given server. If the server cannot be reached, sends a
* TXFailoverOp, then retries the given op
*
* @param loc the server to execute the op on
* @param op the op to execute
* @return the result of execution
*/
Object executeWithServerAffinity(ServerLocation loc, Op op) {
final int initialRetryCount = getAffinityRetryCount();
try {
try {
return executeOnServer(loc, op, true, false);
} catch (ServerConnectivityException e) {
if (logger.isDebugEnabled()) {
logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e);
}
if (!serverAffinityFailover || e instanceof ServerOperationException) {
throw e;
}
int retryCount = getAffinityRetryCount();
if ((retryAttempts != -1 && retryCount >= retryAttempts)
|| retryCount > TX_RETRY_ATTEMPT) {
// prevent stack overflow
throw e;
}
setAffinityRetryCount(retryCount + 1);
}
affinityServerLocation.set(null);
if (logger.isDebugEnabled()) {
logger.debug("reset server affinity: attempting txFailover");
}
// send TXFailoverOp, so that new server can
// do bootstrapping, then re-execute original op
AbstractOp absOp = (AbstractOp) op;
absOp.getMessage().setIsRetry();
int transactionId = absOp.getMessage().getTransactionId();
// for CommitOp we do not have transactionId in AbstractOp
// so set it explicitly for TXFailoverOp
TXFailoverOp.execute(pool, transactionId);
if (op instanceof ExecuteRegionFunctionOpImpl) {
op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op, (byte) 1,
new HashSet<>());
((ExecuteRegionFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
} else if (op instanceof ExecuteFunctionOpImpl) {
op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* isReExecute */);
((ExecuteFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
}
return pool.execute(op);
} finally {
if (initialRetryCount == 0) {
setAffinityRetryCount(0);
}
}
}
@Override
public void setupServerAffinity(boolean allowFailover) {
if (logger.isDebugEnabled()) {
logger.debug("setting up server affinity");
}
serverAffinityFailover = allowFailover;
serverAffinity.set(Boolean.TRUE);
}
@Override
public void releaseServerAffinity() {
if (logger.isDebugEnabled()) {
logger.debug("reset server affinity");
}
serverAffinity.set(Boolean.FALSE);
affinityServerLocation.set(null);
}
@Override
public ServerLocation getServerAffinityLocation() {
return affinityServerLocation.get();
}
int getAffinityRetryCount() {
return affinityRetryCount.get();
}
void setAffinityRetryCount(int retryCount) {
affinityRetryCount.set(retryCount);
}
@Override
public void setServerAffinityLocation(ServerLocation serverLocation) {
assert affinityServerLocation.get() == null;
affinityServerLocation.set(serverLocation);
}
public ServerLocation getNextOpServerLocation() {
Connection conn = connectionManager.borrowConnection(serverTimeout);
try {
return conn.getServer();
} finally {
connectionManager.returnConnection(conn);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.cache.client.internal.OpExecutor#executeOn(org.apache.geode.distributed.
* internal.ServerLocation, org.apache.geode.cache.client.internal.Op)
*/
@Override
public Object executeOn(ServerLocation server, Op op) {
return executeOn(server, op, true, false);
}
@Override
public Object executeOn(ServerLocation p_server, Op op, boolean accessed,
boolean onlyUseExistingCnx) {
ServerLocation server = p_server;
if (serverAffinity.get()) {
ServerLocation affinityserver = affinityServerLocation.get();
if (affinityserver != null) {
server = affinityserver;
} else {
affinityServerLocation.set(server);
}
// redirect to executeWithServerAffinity so that we
// can send a TXFailoverOp.
return executeWithServerAffinity(server, op);
}
return executeOnServer(server, op, accessed, onlyUseExistingCnx);
}
protected Object executeOnServer(ServerLocation p_server, Op op, boolean accessed,
boolean onlyUseExistingCnx) {
boolean returnCnx = true;
boolean pingOp = (op instanceof PingOp.PingOpImpl);
Connection conn = null;
if (pingOp) {
// currently for pings we prefer to queue clientToServer cnx so that we will
// not create a pooled cnx when all we have is queue cnxs.
if (queueManager != null) {
// see if our QueueManager has a connection to this server that we can send
// the ping on.
Endpoint ep = endpointManager.getEndpointMap().get(p_server);
if (ep != null) {
QueueConnections qcs = queueManager.getAllConnectionsNoWait();
conn = qcs.getConnection(ep);
if (conn != null) {
// we found one to do the ping on
returnCnx = false;
}
}
}
}
if (conn == null) {
conn = connectionManager.borrowConnection(p_server, onlyUseExistingCnx);
}
try {
return executeWithPossibleReAuthentication(conn, op);
} catch (Exception e) {
handleException(e, conn, 0, true);
// this shouldn't actually be reached, handle exception will throw something
throw new ServerConnectivityException("Received error connecting to server", e);
} finally {
if (serverAffinity.get() && affinityServerLocation.get() == null) {
if (logger.isDebugEnabled()) {
logger.debug("setting server affinity to {} server:{}", conn.getEndpoint().getMemberId(),
conn.getServer());
}
affinityServerLocation.set(conn.getServer());
}
if (returnCnx) {
connectionManager.returnConnection(conn, accessed);
}
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ExecutablePool#executeOnPrimary(org.apache.geode.cache.
* client.internal.Op)
*/
@Override
public Object executeOnPrimary(Op op) {
if (queueManager == null) {
throw new SubscriptionNotEnabledException();
}
HashSet<ServerLocation> attemptedPrimaries = new HashSet<>();
while (true) {
Connection primary = queueManager.getAllConnections().getPrimary();
try {
return executeWithPossibleReAuthentication(primary, op);
} catch (Exception e) {
boolean finalAttempt = !attemptedPrimaries.add(primary.getServer());
handleException(e, primary, 0, finalAttempt);
// we shouldn't reach this code, but just in case
if (finalAttempt) {
throw new ServerConnectivityException("Tried the same primary server twice.", e);
}
}
}
}
@Override
public void executeOnAllQueueServers(Op op) {
if (queueManager == null) {
throw new SubscriptionNotEnabledException();
}
RuntimeException lastException = null;
QueueConnections connections = queueManager.getAllConnectionsNoWait();
Connection primary = connections.getPrimary();
if (primary != null) {
try {
executeWithPossibleReAuthentication(primary, op);
} catch (Exception e) {
try {
handleException(e, primary, 0, false);
} catch (RuntimeException e2) {
lastException = e2;
}
}
}
List<Connection> backups = connections.getBackups();
for (Connection conn : backups) {
try {
executeWithPossibleReAuthentication(conn, op);
} catch (Exception e) {
try {
handleException(e, conn, 0, false);
} catch (RuntimeException e2) {
lastException = e2;
}
}
}
if (lastException != null) {
throw lastException;
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ExecutablePool#executeOnAllQueueServers(org.apache.geode
* .cache.client.internal.Op)
*/
@Override
public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
if (queueManager == null) {
throw new SubscriptionNotEnabledException();
}
QueueConnections connections = queueManager.getAllConnections();
List backups = connections.getBackups();
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "sending {} to backups: {}", op, backups);
}
for (int i = backups.size() - 1; i >= 0; i--) {
Connection conn = (Connection) backups.get(i);
try {
executeWithPossibleReAuthentication(conn, op);
} catch (Exception e) {
handleException(e, conn, 0, false);
}
}
Connection primary = connections.getPrimary();
HashSet<ServerLocation> attemptedPrimaries = new HashSet<>();
while (true) {
try {
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "sending {} to primary: {}", op, primary);
}
return executeWithPossibleReAuthentication(primary, op);
} catch (Exception e) {
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "caught exception sending to primary {}",
e.getMessage(), e);
}
boolean finalAttempt = !attemptedPrimaries.add(primary.getServer());
handleException(e, primary, 0, finalAttempt);
primary = queueManager.getAllConnections().getPrimary();
// we shouldn't reach this code, but just in case
if (finalAttempt) {
throw new ServerConnectivityException("Tried the same primary server twice.", e);
}
}
}
}
/**
* Used by GatewayBatchOp
*/
@Override
public Object executeOn(Connection conn, Op op, boolean timeoutFatal) {
try {
return executeWithPossibleReAuthentication(conn, op);
} catch (Exception e) {
handleException(op, e, conn, 0, true, timeoutFatal);
// this shouldn't actually be reached, handle exception will throw something
throw new ServerConnectivityException("Received error connecting to server", e);
}
}
/**
* This is used by unit tests
*/
@Override
public Object executeOn(Connection conn, Op op) {
return executeOn(conn, op, false);
}
@Override
public RegisterInterestTracker getRITracker() {
return riTracker;
}
/**
* This method will throw an exception if we need to stop the connection manager if there are
* failures.
*/
protected void handleException(Throwable e, Connection conn, int retryCount,
boolean finalAttempt) {
handleException(e, conn, retryCount, finalAttempt, false/* timeoutFatal */);
}
protected void handleException(Op op, Throwable e, Connection conn, int retryCount,
boolean finalAttempt, boolean timeoutFatal) throws CacheRuntimeException {
if (op instanceof AuthenticateUserOp.AuthenticateUserOpImpl) {
if (e instanceof GemFireSecurityException) {
throw (GemFireSecurityException) e;
} else if (e instanceof ServerRefusedConnectionException) {
throw (ServerRefusedConnectionException) e;
}
}
handleException(e, conn, retryCount, finalAttempt, timeoutFatal);
}
protected void handleException(Throwable e, Connection conn, int retryCount, boolean finalAttempt,
boolean timeoutFatal) throws CacheRuntimeException {
GemFireException exToThrow = null;
String title;
boolean invalidateServer = true;
boolean warn = true;
boolean forceThrow = false;
Throwable cause = e;
cancelCriterion.checkCancelInProgress(e);
if (logger.isDebugEnabled() && !(e instanceof java.io.EOFException)) {
if (e instanceof java.net.SocketTimeoutException) {
logger.debug("OpExecutor.handleException on Connection to {} read timed out",
conn.getServer());
} else {
logger.debug("OpExecutor.handleException on Connection to {}", conn.getServer(), e);
}
}
// first take care of all exceptions that should not invalidate the
// connection and do not need to be logged
if (e instanceof MessageTooLargeException) {
title = null;
exToThrow = new GemFireIOException("message is too large to transmit", e);
} else if (e instanceof NotSerializableException) {
title = null; // no message
exToThrow = new SerializationException("Pool message failure", e);
} else if (e instanceof BatchException || e instanceof BatchException70) {
title = null; // no message
exToThrow = new ServerOperationException(e);
} else if (e instanceof RegionDestroyedException) {
invalidateServer = false;
title = null;
exToThrow = (RegionDestroyedException) e;
} else if (e instanceof GemFireSecurityException) {
title = null;
exToThrow = new ServerOperationException(e);
} else if (e instanceof SerializationException) {
title = null; // no message
exToThrow = new ServerOperationException(e);
} else if (e instanceof CopyException) {
title = null; // no message
exToThrow = new ServerOperationException(e);
} else if (e instanceof ClassNotFoundException) {
title = null; // no message
exToThrow = new ServerOperationException(e);
} else if (e instanceof TransactionException) {
title = null; // no message
exToThrow = (TransactionException) e;
invalidateServer = false;
} else if (e instanceof SynchronizationCommitConflictException) {
title = null;
exToThrow = (SynchronizationCommitConflictException) e;
invalidateServer = false;
} else if (e instanceof SocketException) {
if ("Socket closed".equals(e.getMessage()) || "Connection reset".equals(e.getMessage())
|| "Connection refused: connect".equals(e.getMessage())
|| "Connection refused".equals(e.getMessage())) {
title = e.getMessage();
} else {
title = "SocketException";
}
} else if (e instanceof SocketTimeoutException) {
invalidateServer = timeoutFatal;
title = "socket timed out on client";
cause = null;
} else if (e instanceof ConnectionDestroyedException) {
invalidateServer = false;
title = "connection was asynchronously destroyed";
cause = null;
} else if (e instanceof java.io.EOFException) {
title = "closed socket on server";
} else if (e instanceof IOException) {
title = "IOException";
} else if (e instanceof BufferUnderflowException) {
title = "buffer underflow reading from server";
} else if (e instanceof CancelException) {
title = "Cancelled";
warn = false;
} else if (e instanceof InternalFunctionInvocationTargetException) {
// In this case, function will be re executed
title = null;
exToThrow = (InternalFunctionInvocationTargetException) e;
} else if (e instanceof FunctionInvocationTargetException) {
// in this case function will not be re executed
title = null;
exToThrow = (GemFireException) e;
} else if (e instanceof PutAllPartialResultException) {
title = null;
exToThrow = (PutAllPartialResultException) e;
invalidateServer = false;
} else {
Throwable t = e.getCause();
if ((t instanceof IOException)
|| (t instanceof SerializationException) || (t instanceof CopyException)
|| (t instanceof GemFireSecurityException) || (t instanceof ServerOperationException)
|| (t instanceof TransactionException) || (t instanceof CancelException)) {
handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
return;
} else if (e instanceof ServerOperationException) {
title = null; // no message
exToThrow = (ServerOperationException) e;
invalidateServer = false; // fix for bug #42225
} else if (e instanceof FunctionException) {
if (t instanceof InternalFunctionInvocationTargetException) {
// Client server to re-execute for node failure
handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
return;
} else {
title = null; // no message
exToThrow = (FunctionException) e;
}
} else if (e instanceof ServerConnectivityException
&& e.getMessage().equals("Connection error while authenticating user")) {
title = null;
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
}
} else {
title = e.toString();
forceThrow = true;
}
}
if (title != null) {
conn.destroy();
if (invalidateServer) {
endpointManager.serverCrashed(conn.getEndpoint());
}
boolean logEnabled = warn ? logger.isWarnEnabled() : logger.isDebugEnabled();
boolean msgNeeded = logEnabled || finalAttempt;
if (msgNeeded) {
final StringBuffer sb = getExceptionMessage(title, retryCount, finalAttempt, conn);
final String msg = sb.toString();
if (logEnabled) {
if (warn) {
logger.warn(msg);
} else {
logger.debug(msg, e);
}
}
if (forceThrow || finalAttempt) {
exToThrow = new ServerConnectivityException(msg, cause);
}
}
}
if (exToThrow != null) {
throw exToThrow;
}
}
private StringBuffer getExceptionMessage(String exceptionName, int retryCount,
boolean finalAttempt, Connection connection) {
StringBuffer message = new StringBuffer(200);
message.append("Pool unexpected ").append(exceptionName);
if (connection != null) {
message.append(" connection=").append(connection);
}
if (retryCount > 0) {
message.append(" attempt=").append(retryCount + 1);
}
message.append(')');
if (finalAttempt) {
message.append(". Server unreachable: could not connect after ").append(retryCount + 1)
.append(" attempts");
}
return message;
}
private void authenticateIfRequired(Connection conn, Op op) {
if (!conn.getServer().getRequiresCredentials()) {
return;
}
if (pool == null) {
PoolImpl poolImpl =
(PoolImpl) PoolManagerImpl.getPMI().find(endpointManager.getPoolName());
if (poolImpl == null) {
return;
}
pool = poolImpl;
}
if (pool.getMultiuserAuthentication()) {
if (((AbstractOp) op).needsUserId()) {
UserAttributes ua = UserAttributes.userAttributes.get();
if (ua != null) {
if (!ua.getServerToId().containsKey(conn.getServer())) {
authenticateMultiuser(pool, conn, ua);
}
}
}
} else if (((AbstractOp) op).needsUserId()) {
// This should not be reached, but keeping this code here in case it is
// reached.
if (conn.getServer().getUserId() == -1) {
Connection connImpl = conn.getWrappedConnection();
conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, pool));
if (logger.isDebugEnabled()) {
logger.debug(
"OpExecutorImpl.execute() - single user mode - authenticated this user on {}", conn);
}
}
}
}
private void authenticateMultiuser(PoolImpl pool, Connection conn, UserAttributes ua) {
try {
Long userId =
(Long) AuthenticateUserOp.executeOn(conn.getServer(), pool, ua.getCredentials());
if (userId != null) {
ua.setServerToId(conn.getServer(), userId);
if (logger.isDebugEnabled()) {
logger.debug("OpExecutorImpl.execute() - multiuser mode - authenticated this user on {}",
conn);
}
}
} catch (ServerConnectivityException sce) {
Throwable cause = sce.getCause();
if (cause instanceof IOException || cause instanceof BufferUnderflowException
|| cause instanceof CancelException
|| (sce.getMessage() != null
&& (sce.getMessage().contains("Could not create a new connection to server")
|| sce.getMessage().contains("socket timed out on client")
|| sce.getMessage().contains("connection was asynchronously destroyed")))) {
throw new ServerConnectivityException("Connection error while authenticating user");
} else {
throw sce;
}
}
}
private Object executeWithPossibleReAuthentication(Connection conn, Op op) throws Exception {
try {
return conn.execute(op);
} catch (ServerConnectivityException sce) {
Throwable cause = sce.getCause();
if ((cause instanceof AuthenticationRequiredException
&& "User authorization attributes not found.".equals(cause.getMessage()))
|| sce.getMessage().contains("Connection error while authenticating user")) {
// (ashetkar) Need a cleaner way of doing above check.
// 2nd exception-message above is from AbstractOp.sendMessage()
PoolImpl pool =
(PoolImpl) PoolManagerImpl.getPMI().find(endpointManager.getPoolName());
if (!pool.getMultiuserAuthentication()) {
Connection connImpl = conn.getWrappedConnection();
conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this));
return conn.execute(op);
} else {
UserAttributes ua = UserAttributes.userAttributes.get();
if (ua != null) {
authenticateMultiuser(pool, conn, ua);
}
return conn.execute(op);
}
} else {
throw sce;
}
}
}
}