blob: f0bf9e998508380b6f84ecab496721c97bef6690 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.GemFireException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker.RegionInterestEntry;
import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListener;
import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListenerAdapter;
import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
import com.gemstone.gemfire.cache.query.internal.cq.ClientCQ;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.BridgeObserver;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientUpdater;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.security.GemFireSecurityException;
import com.gemstone.org.jgroups.util.StringId;
/**
* Manages Client Queues. Responsible for creating callback connections and
* satisfying redundancy requirements.
*
* @author dsmith
* @since 5.7
*
*/
public class QueueManagerImpl implements QueueManager {
private static final Logger logger = LogService.getLogger();
// private static final long SERVER_LOCATION_TIMEOUT = Long.getLong(
// "gemfire.QueueManagerImpl.SERVER_LOCATION_TIMEOUT", 120000).longValue();
// Orders candidate servers by their reported queue size (see QSizeComparator).
private static final Comparator QSIZE_COMPARATOR = new QSizeComparator();
// Delay (ms) before retrying when redundancy is not satisfied.
protected final long redundancyRetryInterval;
private final EndpointManager endpointManager;
// Listener installed on the endpoint manager so we hear about crashed endpoints.
private final EndpointManager.EndpointListenerAdapter endpointListener;
// Discovers servers that can host subscription queues.
private final ConnectionSource source;
// Configured number of redundant (backup) queues; -1 means queue on all servers.
private final int redundancyLevel;
protected final ConnectionFactory factory;
private final InternalLogWriter securityLogger;
// Identity of this client, passed along during server discovery.
private final ClientProxyMembershipID proxyId;
protected final InternalPool pool;
private final QueueStateImpl state;
// One-shot flags so recurring recovery failures are logged only once
// until the corresponding condition recovers.
private boolean printPrimaryNotFoundError;
private boolean printRedundancyNotSatisfiedError;
private boolean printRecoveringPrimary;
private boolean printRecoveringRedundant;
// Servers that recently failed and should be avoided for a while.
protected final ServerBlackList blackList;
// Lock which guards updates to queueConnections.
// Also threads calling getAllConnections will wait on this
// lock until there is a primary.
protected final Object lock = new Object();
// Released by start() once the initial connection attempt has completed.
protected final CountDownLatch initializedLatch = new CountDownLatch(1);
// Single-threaded executor for redundancy recovery tasks; null until start().
private ScheduledThreadPoolExecutor recoveryThread;
// True once readyForEvents has been invoked by the client.
private volatile boolean sentClientReady;
// queueConnections is maintained by using copy-on-write
protected volatile ConnectionList queueConnections = new ConnectionList();
// Currently scheduled recovery task, if any.
protected volatile RedundancySatisfierTask redundancySatisfierTask = null;
private volatile boolean shuttingDown;
/**
 * Creates a queue manager for the given pool.
 *
 * @param pool the pool this manager maintains subscription queues for
 * @param endpointManager used to learn about endpoint crashes
 * @param source used to discover servers that can host queues
 * @param factory used to create client-to-server and server-to-client connections
 * @param queueRedundancyLevel number of backup queues to maintain, or -1 for all servers
 * @param redundancyRetryInterval delay (ms) between redundancy recovery attempts
 * @param securityLogger logger for security-related messages
 * @param proxyId identity of this client, used during server discovery
 */
public QueueManagerImpl(
    InternalPool pool,
    EndpointManager endpointManager,
    ConnectionSource source,
    ConnectionFactory factory,
    int queueRedundancyLevel,
    long redundancyRetryInterval,
    InternalLogWriter securityLogger,
    ClientProxyMembershipID proxyId) {
  // arm all "log once" flags
  this.printPrimaryNotFoundError = true;
  this.printRedundancyNotSatisfiedError = true;
  this.printRecoveringRedundant = true;
  this.printRecoveringPrimary = true;
  this.pool = pool;
  this.endpointManager = endpointManager;
  this.source = source;
  this.factory = factory;
  this.redundancyLevel = queueRedundancyLevel;
  this.securityLogger = securityLogger;
  this.proxyId = proxyId;
  this.redundancyRetryInterval = redundancyRetryInterval;
  blackList = new ServerBlackList(redundancyRetryInterval);
  // delegate endpoint-crash notifications to this manager
  this.endpointListener = new EndpointManager.EndpointListenerAdapter() {
    @Override
    public void endpointCrashed(Endpoint endpoint) {
      QueueManagerImpl.this.endpointCrashed(endpoint);
    }
  };
  this.state = new QueueStateImpl(this);
}
/** Returns the pool that owns this queue manager. */
public InternalPool getPool() {
  return this.pool;
}
/**
 * Reports whether the primary queue connection's updater thread is alive.
 * Returns false when there is no primary or the primary has no updater.
 */
public boolean isPrimaryUpdaterAlive() {
  QueueConnectionImpl primaryConnection =
      (QueueConnectionImpl) queueConnections.getPrimary();
  if (primaryConnection == null) {
    return false;
  }
  ClientUpdater updater = primaryConnection.getUpdater();
  return updater != null && ((CacheClientUpdater) updater).isAlive();
}
/**
 * Returns the current snapshot of queue connections without waiting for a
 * primary to be discovered.
 */
public QueueConnections getAllConnectionsNoWait() {
  return queueConnections;
}
/**
 * Returns the current set of queue connections, blocking until a primary is
 * available, primary discovery fails, or the pool shuts down.
 *
 * @throws NoSubscriptionServersAvailableException if no primary could be
 *         discovered and no more specific cause is recorded
 * @throws ServerConnectivityException if primary discovery failed with some
 *         other underlying exception
 */
public QueueConnections getAllConnections() {
  ConnectionList snapshot = queueConnections;
  if (snapshot.getPrimary() == null) {
    // wait for a new primary to become available.
    synchronized (lock) {
      snapshot = queueConnections;
      while (snapshot.getPrimary() == null
          && !snapshot.primaryDiscoveryFailed() && !shuttingDown && pool.getPoolOrCacheCancelInProgress() == null) {
        try {
          lock.wait();
        } catch (InterruptedException e) {
          // preserve the interrupt status and stop waiting
          Thread.currentThread().interrupt();
          break;
        }
        // re-read the copy-on-write list after each wakeup
        snapshot = queueConnections;
      }
    }
  }
  if (snapshot.getPrimary() == null) {
    // prefer the pool's cancellation exception if shutdown is the real cause
    pool.getCancelCriterion().checkCancelInProgress(null);
    GemFireException exception = snapshot.getPrimaryDiscoveryException();
    if (exception == null || exception instanceof NoSubscriptionServersAvailableException) {
      exception = new NoSubscriptionServersAvailableException(exception);
    }
    else {
      exception = new ServerConnectivityException(exception.getMessage(), exception);
    }
    throw exception;
  }
  return snapshot;
}
/** Returns the logger used for security-related messages. */
public InternalLogWriter getSecurityLogger() {
  return securityLogger;
}
/**
 * Shuts down queue management: stops listening for endpoint crashes, cancels
 * any pending redundancy-recovery task, waits for the recovery thread to
 * finish, and closes the primary and backup queue connections.
 *
 * @param keepAlive if true, ask the servers to keep this client's
 *        subscription queues alive (durable client shutdown)
 */
public void close(boolean keepAlive) {
  endpointManager.removeListener(endpointListener);
  synchronized (lock) {
    shuttingDown = true;
    if (redundancySatisfierTask != null) {
      redundancySatisfierTask.cancel();
    }
    // wake up any threads blocked in getAllConnections waiting for a primary
    lock.notifyAll();
  }
  // recoveryThread is null if start() was never called; the two separate
  // null checks from the original are merged since nothing nulls the field.
  if (recoveryThread != null) {
    recoveryThread.shutdown();
    try {
      if (!recoveryThread.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
        logger.warn(LocalizedMessage.create(
            LocalizedStrings.QueueManagerImpl_TIMEOUT_WAITING_FOR_RECOVERY_THREAD_TO_COMPLETE));
      }
    } catch (InterruptedException e1) {
      Thread.currentThread().interrupt();
      logger.debug("Interrupted waiting for recovery thread termination");
    }
  }
  QueueConnectionImpl primary = (QueueConnectionImpl) queueConnections.getPrimary();
  if (logger.isDebugEnabled()) {
    logger.debug("QueueManagerImpl - closing connections with keepAlive={}", keepAlive);
  }
  if (primary != null) {
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("QueueManagerImpl - closing primary {}", primary);
      }
      primary.internalClose(keepAlive);
    } catch (Exception e) {
      logger.warn(LocalizedMessage.create(
          LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_PRIMARY_CONNECTION_TO_0,
          primary.getEndpoint()), e);
    }
  }
  // close each backup, logging (not propagating) failures so the rest close too
  for (Object o : queueConnections.getBackups()) {
    QueueConnectionImpl backup = (QueueConnectionImpl) o;
    if (backup != null) {
      try {
        if (logger.isDebugEnabled()) {
          logger.debug("QueueManagerImpl - closing backup {}", backup);
        }
        backup.internalClose(keepAlive);
      } catch (Exception e) {
        logger.warn(LocalizedMessage.create(
            LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_BACKUP_CONNECTION_TO_0,
            backup.getEndpoint()), e);
      }
    }
  }
}
/**
 * Last-ditch close used during system failure: marks the manager as shutting
 * down and emergency-closes the primary and all backup connections.
 * Fix: guard against a null primary — getPrimary() can legitimately return
 * null (see getAllConnections / recoverPrimary), which previously caused an
 * NPE in this emergency path.
 */
public void emergencyClose() {
  shuttingDown = true;
  Connection primary = queueConnections.getPrimary();
  if (primary != null) {
    primary.emergencyClose();
  }
  List backups = queueConnections.getBackups();
  for (int i = 0; i < backups.size(); i++) {
    Connection backup = (Connection) backups.get(i);
    backup.emergencyClose();
  }
}
/**
 * Starts queue management: starts the blacklist, registers the endpoint
 * listener, creates the recovery executor, creates the initial queue
 * connections, and schedules redundancy recovery. Always counts down
 * {@link #initializedLatch}, even if initialization throws.
 *
 * @param background shared executor used for blacklist and queue-state tasks
 */
public void start(ScheduledExecutorService background) {
  try {
    blackList.start(background);
    endpointManager.addListener(endpointListener);
    // Use a separate timer for queue management tasks
    // We don't want primary recovery (and therefore user threads) to wait for
    // things like pinging connections for health checks.
    final String name = "queueTimer-" + this.pool.getName();
    this.recoveryThread = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
      public Thread newThread(Runnable r) {
        Thread result = new Thread(r, name);
        result.setDaemon(true);
        return result;
      }
    });
    // pending recovery tasks should not run once we shut down
    recoveryThread.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    // TODO - use yet another Timer or the like for these tasks? We know
    // we don't want them in the recoveryThread, because the ThreadIdToSequenceIdExpiryTask
    // will wait for primary recovery.
    getState().start(background, getPool().getSubscriptionAckInterval());
    // initialize connections
    initializeConnections();
    scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
    // When a server is removed from the blacklist, try again
    // to establish redundancy (if we need to)
    BlackListListener blackListListener = new BlackListListenerAdapter() {
      @Override
      public void serverRemoved(ServerLocation location) {
        QueueManagerImpl.this.scheduleRedundancySatisfierIfNeeded(0);
      }
    };
    blackList.addListener(blackListListener);
    factory.getBlackList().addListener(blackListListener);
  } finally {
    // release anyone waiting on initialization, even on failure
    initializedLatch.countDown();
  }
}
/**
 * Marks this client as ready for events and sends the ready-for-events
 * message to the primary queue server, retrying against a new primary if the
 * send fails. Gives up if no subscription servers are available.
 *
 * @param system the distributed system (not read by this implementation)
 */
public void readyForEvents(InternalDistributedSystem system) {
  synchronized (lock) {
    // recorded so that later-promoted primaries also get the message
    // (see promoteBackupCnxToPrimary / createNewPrimary)
    this.sentClientReady = true;
  }
  QueueConnectionImpl primary = null;
  while (primary == null) {
    try {
      primary = (QueueConnectionImpl) getAllConnections().getPrimary();
    } catch (NoSubscriptionServersAvailableException e) {
      // no primary will become available; give up
      primary = null;
      break;
    }
    if (primary.sendClientReady()) {
      try {
        logger.info(LocalizedMessage.create(
            LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0,
            primary));
        ReadyForEventsOp.execute(pool, primary);
      } catch (Exception e) {
        if (logger.isDebugEnabled()) {
          logger.debug("Error sending ready for events to {}", primary, e);
        }
        // destroy this primary and loop to wait for a replacement
        primary.destroy();
        primary = null;
      }
    }
  }
}
/**
 * Sends the ready-for-events message to a newly promoted primary after a
 * failover. On failure the new primary is destroyed (another recovery pass
 * will pick a replacement).
 */
public void readyForEventsAfterFailover(QueueConnectionImpl primary) {
  try {
    logger.info(LocalizedMessage.create(
        LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0, primary));
    ReadyForEventsOp.execute(pool, primary);
  } catch (Exception failure) {
    final boolean isDebugEnabled = logger.isDebugEnabled();
    if (isDebugEnabled) {
      logger.debug("Error sending ready for events to {}", primary, failure);
    }
    primary.destroy();
  }
}
/**
 * Called when a single queue connection fails. The endpoint itself has not
 * crashed, but endpointCrashed does all the work we need, so delegate to it.
 */
void connectionCrashed(Connection con) {
  endpointCrashed(con.getEndpoint());
}
/**
 * Handles a crashed endpoint: if we hold a queue connection to it, remove
 * that connection from the copy-on-write list, destroy it, and schedule
 * redundancy recovery. Endpoints without a queue are ignored.
 */
void endpointCrashed(Endpoint endpoint) {
  QueueConnectionImpl deadConnection = null;
  // We must hold the lock while checking whether we have a queue connection
  // for this endpoint, to prevent a race between adding a queue connection
  // to the list and that connection's endpoint crashing.
  synchronized (lock) {
    deadConnection = queueConnections.getConnection(endpoint);
    if (deadConnection != null) {
      queueConnections = queueConnections.removeConnection(deadConnection);
    }
  }
  if (deadConnection != null) {
    logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_ENDPOINT_CRASHED_SCHEDULING_RECOVERY,
        new Object[] { deadConnection.getUpdater() != null ? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant") : "Queue", endpoint }));
    scheduleRedundancySatisfierIfNeeded(0);
    deadConnection.internalDestroy();
  } else {
    if (logger.isDebugEnabled()) {
      logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
    }
  }
}
/**
 * Checks whether a queue connection exists on this endpoint. If one does, and
 * it still belongs to the given client updater, the connection is destroyed,
 * since that client updater thread is no longer reading from it.
 */
public void checkEndpoint(ClientUpdater ccu, Endpoint endpoint)
{
  QueueConnectionImpl deadConnection = null;
  synchronized (lock) {
    // nothing to recover during shutdown
    if (shuttingDown)
      return;
    // Only remove the connection if it still belongs to the same client
    // updater; a new updater/connection may already exist on this endpoint.
    deadConnection = queueConnections.getConnection(endpoint);
    if (deadConnection != null && ccu.equals(deadConnection.getUpdater())) {
      queueConnections = queueConnections.removeConnection(deadConnection);
      deadConnection.internalDestroy();
    }
  }
  logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_CACHE_CLIENT_UPDATER_FOR_ON_ENDPOINT_EXITING_SCHEDULING_RECOVERY,
      new Object[] { (deadConnection != null && deadConnection.getUpdater() != null) ? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant") : "Queue", endpoint }));
  scheduleRedundancySatisfierIfNeeded(0); // one more chance
}
/**
 * Discovers subscription servers and creates the initial primary and backup
 * queue connections, preferring servers that already host this client's
 * queue (largest queue first, per the old ConnectionProxyImpl ordering).
 * Notifies the CQ service whether a primary could be established and
 * schedules recovery if redundancy is not met.
 *
 * Fixes: the "no queue servers" warning previously passed the StringId
 * directly to logger.warn instead of wrapping it in LocalizedMessage.create
 * like every other localized log call in this file; several log-message
 * typos corrected ("intitializing", "Intial", "Error connected").
 */
private void initializeConnections() {
  final boolean isDebugEnabled = logger.isDebugEnabled();
  if (isDebugEnabled) {
    logger.debug("SubscriptionManager - initializing connections");
  }
  // -1 means create a queue on every server; otherwise primary + backups
  int queuesNeeded = redundancyLevel == -1 ? -1 : redundancyLevel + 1;
  Set excludedServers = new HashSet(blackList.getBadServers());
  List servers = findQueueServers(excludedServers, queuesNeeded, true, false, null);
  if (servers == null || servers.isEmpty()) {
    logger.warn(LocalizedMessage.create(
        LocalizedStrings.QueueManagerImpl_COULD_NOT_CREATE_A_QUEUE_NO_QUEUE_SERVERS_AVAILABLE));
    scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
    synchronized (lock) {
      queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
      lock.notifyAll();
    }
    return;
  }
  if (isDebugEnabled) {
    logger.debug("SubscriptionManager - discovered subscription servers {}", servers);
  }
  // servers that already host a queue for us, ordered by reported queue size
  SortedMap/* <ServerQueueStatus,Connection> */ oldQueueServers = new TreeMap(QSIZE_COMPARATOR);
  List nonRedundantServers = new ArrayList();
  for (Iterator itr = servers.iterator(); itr.hasNext();) {
    ServerLocation server = (ServerLocation) itr.next();
    Connection connection = null;
    try {
      connection = factory.createClientToServerConnection(server, true);
    } catch (GemFireSecurityException e) {
      throw e;
    } catch (Exception e) {
      if (isDebugEnabled) {
        logger.debug("SubscriptionManager - Error connecting to server: {}", server, e);
      }
    }
    if (connection != null) {
      ServerQueueStatus status = connection.getQueueStatus();
      if (status.isRedundant() || status.isPrimary()) {
        oldQueueServers.put(status, connection);
      } else {
        nonRedundantServers.add(connection);
      }
    }
  }
  // This ordering was determined from the old ConnectionProxyImpl code
  //
  // initialization order of the new redundant and primary server is
  // old redundant w/ second largest queue
  // old redundant w/ third largest queue
  // ...
  // old primary
  // non redundants in no particular order
  //
  // The primary is then chosen as
  // redundant with the largest queue
  // primary if there are no redundants
  // a non redundant
  // if the redundant with the largest queue fails, then we go and
  // make a new server a primary.
  Connection newPrimary = null;
  if (!oldQueueServers.isEmpty()) {
    newPrimary = (Connection) oldQueueServers.remove(oldQueueServers.lastKey());
  } else if (!nonRedundantServers.isEmpty()) {
    newPrimary = (Connection) nonRedundantServers.remove(0);
  }
  nonRedundantServers.addAll(0, oldQueueServers.values());
  for (Iterator itr = nonRedundantServers.iterator(); itr.hasNext();) {
    Connection connection = (Connection) itr.next();
    QueueConnectionImpl queueConnection = initializeQueueConnection(connection, false, null);
    if (queueConnection != null) {
      addToConnectionList(queueConnection, false);
    }
  }
  QueueConnectionImpl primaryQueue = null;
  if (newPrimary != null) {
    primaryQueue = initializeQueueConnection(newPrimary, true, null);
    if (primaryQueue == null) {
      newPrimary.destroy();
    } else {
      if (!addToConnectionList(primaryQueue, true)) {
        primaryQueue = null;
      }
    }
  }
  excludedServers.addAll(servers);
  // Make sure we have enough redundant copies. Some of the connections may
  // have failed above.
  if (redundancyLevel != -1 && getCurrentRedundancy() < redundancyLevel) {
    if (isDebugEnabled) {
      logger.debug("SubscriptionManager - Some initial connections failed. Trying to create redundant queues");
    }
    recoverRedundancy(excludedServers, false);
  }
  if (redundancyLevel != -1 && primaryQueue == null) {
    if (isDebugEnabled) {
      logger.debug("SubscriptionManager - Initial primary creation failed. Trying to create a new primary");
    }
    while (primaryQueue == null) {
      primaryQueue = createNewPrimary(excludedServers);
      if (primaryQueue == null) {
        // couldn't find a server to make primary
        break;
      }
      if (!addToConnectionList(primaryQueue, true)) {
        excludedServers.add(primaryQueue.getServer());
        primaryQueue = null;
      }
    }
  }
  if (primaryQueue == null) {
    if (isDebugEnabled) {
      logger.debug("SubscriptionManager - Unable to create a new primary queue, using one of the redundant queues");
    }
    while (primaryQueue == null) {
      primaryQueue = promoteBackupToPrimary(queueConnections.getBackups());
      if (primaryQueue == null) {
        // no backup servers available
        break;
      }
      if (!addToConnectionList(primaryQueue, true)) {
        synchronized (lock) {
          // make sure we don't retry this same connection.
          queueConnections = queueConnections.removeConnection(primaryQueue);
        }
        primaryQueue = null;
      }
    }
  }
  if (primaryQueue == null) {
    logger.error(LocalizedMessage.create(
        LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE));
    synchronized (lock) {
      queueConnections = queueConnections.setPrimaryDiscoveryFailed(
          new NoSubscriptionServersAvailableException(
              LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE
                  .toLocalizedString()));
      lock.notifyAll();
    }
    cqsDisconnected();
  } else {
    cqsConnected();
  }
  if (getCurrentRedundancy() < redundancyLevel) {
    logger.warn(LocalizedMessage.create(
        LocalizedStrings.QueueManagerImpl_UNABLE_TO_INITIALIZE_ENOUGH_REDUNDANT_QUEUES_ON_STARTUP_THE_REDUNDANCY_COUNT_IS_CURRENTLY_0,
        getCurrentRedundancy()));
  }
}
/**
 * Notifies the CQ service that this pool has a primary queue again, so any
 * affected CQs can be alerted. No-op when there is no cache instance.
 */
private void cqsConnected() {
  GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
  if (cache == null) {
    return;
  }
  cache.getCqService().cqsConnected(pool);
}
/**
 * Notifies the CQ service that this pool has lost its primary queue, so any
 * affected CQs can be alerted. No-op when there is no cache instance.
 */
private void cqsDisconnected() {
  GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
  if (cache == null) {
    return;
  }
  cache.getCqService().cqsDisconnected(pool);
}
/**
 * Returns the number of live backup queue connections (the primary is not
 * counted).
 */
private int getCurrentRedundancy() {
  return queueConnections.getBackups().size();
}
/**
* Make sure that we have enough backup servers.
*
* Add any servers we fail to connect to to the excluded servers list.
*/
protected boolean recoverRedundancy(Set excludedServers, boolean recoverInterest) {
  // nothing to do if the pool or cache is already shutting down
  if (pool.getPoolOrCacheCancelInProgress() != null) {
    return true;
  }
  int additionalBackups;
  // keep creating backups until redundancy is met (or, for -1, until no
  // servers remain), unless shutdown begins
  while (pool.getPoolOrCacheCancelInProgress() == null
      && ((additionalBackups = redundancyLevel - getCurrentRedundancy()) > 0
          || redundancyLevel == -1)) {
    if (redundancyLevel != -1 && printRecoveringRedundant) {
      logger.info(LocalizedMessage.create(
          LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_REDUNDANT_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
      printRecoveringRedundant = false;
    }
    List servers = findQueueServers(excludedServers,
        redundancyLevel == -1 ? -1 : additionalBackups, false,
        (redundancyLevel == -1 ? false : printRedundancyNotSatisfiedError),
        LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_REDUNDANT_CLIENT_QUEUE);
    if (servers == null || servers.isEmpty()) {
      if (redundancyLevel != -1) {
        if (printRedundancyNotSatisfiedError) {
          logger.info(LocalizedMessage.create(
              LocalizedStrings.QueueManagerImpl_REDUNDANCY_LEVEL_0_IS_NOT_SATISFIED_BUT_THERE_ARE_NO_MORE_SERVERS_AVAILABLE_REDUNDANCY_IS_CURRENTLY_1,
              new Object[] { Integer.valueOf(redundancyLevel), Integer.valueOf(getCurrentRedundancy()) }));
        }
      }
      printRedundancyNotSatisfiedError = false; // printed above
      return false;
    }
    excludedServers.addAll(servers);
    final boolean isDebugEnabled = logger.isDebugEnabled();
    for (Iterator itr = servers.iterator(); itr.hasNext();) {
      ServerLocation server = (ServerLocation) itr.next();
      Connection connection = null;
      try {
        connection = factory.createClientToServerConnection(server, true);
      } catch (GemFireSecurityException e) {
        throw e;
      } catch (Exception e) {
        if (isDebugEnabled) {
          // fixed placeholder: was "()" which never substituted the server
          logger.debug("SubscriptionManager - Error connecting to server: {}", server, e);
        }
      }
      if (connection == null) {
        continue;
      }
      QueueConnectionImpl queueConnection = initializeQueueConnection(connection, false, null);
      if (queueConnection != null) {
        boolean isFirstNewConnection = false;
        synchronized (lock) {
          if (recoverInterest && queueConnections.getPrimary() == null
              && queueConnections.getBackups().isEmpty()) {
            // We lost our queue entirely at some point, so interest must be
            // recovered. This server will be made primary after this method
            // finishes, because whoever killed the primary when this method
            // started should have scheduled a task to recover the primary.
            isFirstNewConnection = true;
            // TODO - Actually, we need a better check than the above. There's
            // still a chance that we haven't realized that the primary has
            // died but it is already gone. We should get some information
            // from the queue server about whether it was able to copy the
            // queue from another server and decide if we need to recover our
            // interest based on that information.
          }
        }
        boolean promotionFailed = false;
        if (isFirstNewConnection) {
          if (!promoteBackupCnxToPrimary(queueConnection)) {
            promotionFailed = true;
          }
        }
        if (!promotionFailed) {
          if (addToConnectionList(queueConnection, isFirstNewConnection)) {
            // redundancy satisfied; re-arm the "log once" flags
            printRedundancyNotSatisfiedError = true;
            printRecoveringRedundant = true;
            if (logger.isDebugEnabled()) {
              logger.debug("SubscriptionManager redundancy satisfier - created a queue on server {}",
                  queueConnection.getEndpoint());
            }
            // Even though the new redundant queue will usually recover
            // subscription information (see bug #39014) from its initial
            // image provider, in bug #42280 we found that this is not always
            // the case, so clients must always register interest with the new
            // redundant server.
            if (recoverInterest) {
              recoverInterest(queueConnection, isFirstNewConnection);
            }
          }
        }
      }
    }
  }
  return true;
}
/**
 * Walks the backup list in order and promotes the first backup that accepts
 * the make-primary request.
 *
 * @return the promoted connection, or null if no backup could be promoted
 */
private QueueConnectionImpl promoteBackupToPrimary(List backups) {
  for (Iterator itr = backups.iterator(); itr.hasNext();) {
    QueueConnectionImpl candidate = (QueueConnectionImpl) itr.next();
    if (promoteBackupCnxToPrimary(candidate)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Attempts to make the given backup connection the primary by sending it the
 * make-primary message (preceded by client-ready if readyForEvents has
 * already been called). Test hooks fire before/after via BridgeObserver.
 *
 * @return true if the connection was successfully made primary
 */
private boolean promoteBackupCnxToPrimary(QueueConnectionImpl cnx) {
  boolean result = false;
  if (PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
    BridgeObserver bo = BridgeObserverHolder.getInstance();
    bo.beforePrimaryIdentificationFromBackup();
  }
  try {
    boolean haveSentClientReady = this.sentClientReady;
    if (haveSentClientReady) {
      cnx.sendClientReady();
    }
    ClientUpdater updater = cnx.getUpdater();
    if (updater == null) {
      // the backup died before it could be promoted; report failure
      if (logger.isDebugEnabled()) {
        logger.debug("backup connection was destroyed before it could become the primary.");
      }
      Assert.assertTrue(cnx.isDestroyed());
    } else {
      // carry over any state from the failed primary's updater
      updater.setFailedUpdater(queueConnections.getFailedUpdater());
      MakePrimaryOp.execute(pool, cnx, haveSentClientReady);
      result = true;
      if (PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
        BridgeObserver bo = BridgeObserverHolder.getInstance();
        bo.afterPrimaryIdentificationFromBackup(cnx.getServer());
      }
    }
  } catch (Exception e) {
    // promotion failures are expected during shutdown; only log otherwise
    if (pool.getPoolOrCacheCancelInProgress() == null && logger.isDebugEnabled()) {
      logger.debug("Error making a backup server the primary server for client subscriptions", e);
    }
  }
  return result;
}
/**
* Create a new primary server from a non-redundant server.
*
* Add any failed servers to the excludedServers set.
*/
private QueueConnectionImpl createNewPrimary(Set excludedServers) {
  QueueConnectionImpl primary = null;
  while (primary == null && pool.getPoolOrCacheCancelInProgress() == null) {
    List servers = findQueueServers(excludedServers, 1, false,
        printPrimaryNotFoundError,
        LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_PRIMARY_CLIENT_QUEUE);
    printPrimaryNotFoundError = false; // printed above
    if (servers == null || servers.isEmpty()) {
      break;
    }
    Connection connection = null;
    try {
      connection = factory
          .createClientToServerConnection((ServerLocation) servers.get(0), true);
    } catch (GemFireSecurityException e) {
      throw e;
    } catch (Exception e) {
      if (logger.isDebugEnabled()) {
        // fix: include the caught exception so the failure cause and stack
        // trace appear in the debug log (it was previously dropped)
        logger.debug("SubscriptionManagerImpl - error creating a connection to server {}",
            servers.get(0), e);
      }
    }
    if (connection != null) {
      primary = initializeQueueConnection(connection, true, queueConnections.getFailedUpdater());
    }
    // never retry a server we already tried, whether it worked or not
    excludedServers.addAll(servers);
  }
  // if the client already declared ready-for-events, the new primary needs it too
  if (primary != null && sentClientReady && primary.sendClientReady()) {
    readyForEventsAfterFailover(primary);
  }
  return primary;
}
/**
 * Asks the connection source for servers that can host this client's
 * subscription queue.
 *
 * @param excludedServers servers to skip (blacklisted or already tried)
 * @param count number of servers wanted, or -1 for all available
 * @param findDurable whether to locate servers already hosting this durable client's queue
 * @param printErrorMessage if true, log msgId when the search comes up empty
 * @param msgId localized message logged when no servers are found
 * @return the discovered servers, or null if the pool is shutting down or discovery failed
 * @throws GemFireSecurityException propagated immediately from the source
 */
private List findQueueServers(Set excludedServers, int count,
    boolean findDurable, boolean printErrorMessage, StringId msgId) {
  List servers = null;
  Exception ex = null;
  try {
    if (pool.getPoolOrCacheCancelInProgress() != null) {
      return null;
    }
    servers = source.findServersForQueue(excludedServers, count, proxyId, findDurable);
  } catch (GemFireSecurityException e) {
    // propagate the security exception immediately.
    throw e;
  } catch (Exception e) {
    ex = e;
    if (logger.isDebugEnabled()) {
      // fix: pass the exception as the throwable argument (not as a "{}"
      // parameter) so the stack trace is actually logged
      logger.debug("SubscriptionManager - Error getting the list of servers", e);
    }
  }
  if (printErrorMessage && (servers == null || servers.isEmpty())) {
    logger.error(LocalizedMessage.create(msgId,
        new Object[] { (excludedServers != null ? excludedServers.size() : 0),
            (ex != null ? ex.getMessage() : "no exception") }));
  }
  return servers;
}
/**
 * Finds a new primary, adding any failed servers we encounter to the
 * excluded-servers set.
 *
 * First we try to make a backup server the primary; if we run out of backup
 * servers we try to find a new server to make primary.
 */
protected void recoverPrimary(Set excludedServers) {
  // skip recovery entirely during shutdown
  if (pool.getPoolOrCacheCancelInProgress() != null) {
    return;
  }
  final boolean isDebugEnabled = logger.isDebugEnabled();
  if (queueConnections.getPrimary() != null) {
    if (isDebugEnabled) {
      logger.debug("Primary recovery not needed");
    }
    return;
  }
  if (isDebugEnabled) {
    logger.debug("SubscriptionManager redundancy satisfier - primary endpoint has been lost. Attempting to recover");
  }
  if (printRecoveringPrimary) {
    logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_PRIMARY_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
    printRecoveringPrimary = false;
  }
  QueueConnectionImpl newPrimary = null;
  // Phase 1: try to promote an existing backup to primary.
  while (newPrimary == null && pool.getPoolOrCacheCancelInProgress() == null) {
    List backups = queueConnections.getBackups();
    newPrimary = promoteBackupToPrimary(backups);
    // Hitesh now lets say that server crashed
    if (newPrimary == null) {
      // could not find a backup to promote
      break;
    }
    if (!addToConnectionList(newPrimary, true)) {
      synchronized (lock) {
        // make sure we don't retry the same backup server
        queueConnections = queueConnections.removeConnection(newPrimary);
      }
      newPrimary = null;
    }
  }
  if (newPrimary != null) {
    if (isDebugEnabled) {
      logger.debug("SubscriptionManager redundancy satisfier - Switched backup server to primary: {}", newPrimary.getEndpoint());
    }
    if (PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
      BridgeObserver bo = BridgeObserverHolder.getInstance();
      bo.afterPrimaryRecovered(newPrimary.getServer());
    }
    // new primary from backup server was found, alert affected cqs if necessary
    cqsConnected();
    printPrimaryNotFoundError = true;
    printRecoveringPrimary = true;
    return;
  }
  // Phase 2: no backup could be promoted; create a brand-new primary.
  // NOTE(review): this loop body ends in an unconditional return, so it runs
  // at most one full iteration — if addToConnectionList or recoverInterest
  // fails, we return without retrying another server. Confirm this is the
  // intended behavior (the scheduled RedundancySatisfierTask presumably
  // retries later).
  while (newPrimary == null) {
    newPrimary = createNewPrimary(excludedServers);
    if (newPrimary == null) {
      // could not find a new primary to create
      break;
    }
    if (!addToConnectionList(newPrimary, true)) {
      excludedServers.add(newPrimary.getServer());
      newPrimary = null;
    }
    if (newPrimary != null) {
      if (isDebugEnabled) {
        logger.debug("SubscriptionManager redundancy satisfier - Non backup server was made primary. Recovering interest {}", newPrimary.getEndpoint());
      }
      if (!recoverInterest(newPrimary, true)) {
        excludedServers.add(newPrimary.getServer());
        newPrimary = null;
      }
      // New primary queue was found from a non backup, alert the affected cqs
      cqsConnected();
    }
    if (newPrimary != null && PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
      BridgeObserver bo = BridgeObserverHolder.getInstance();
      bo.afterPrimaryRecovered(newPrimary.getServer());
    }
    printPrimaryNotFoundError = true;
    printRecoveringPrimary = true;
    return;
  }
  // No primary queue was found, alert the affected cqs
  cqsDisconnected();
  if (isDebugEnabled) {
    logger.debug("SubscriptionManager redundancy satisfier - Could not recover a new primary");
  }
  synchronized (lock) {
    queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
    lock.notifyAll();
  }
}
/**
 * Wraps a client-to-server connection in a QueueConnectionImpl by attaching
 * a server-to-client updater. On any failure the connection is destroyed and
 * a failure is recorded against the server's failure tracker (feeding the
 * blacklist).
 *
 * @param connection the client-to-server connection to wrap
 * @param isPrimary whether this queue connection should act as the primary
 * @param failedUpdater the updater from a failed primary, if any, passed to
 *        the new updater so it can carry over state
 * @return the new queue connection, or null if the updater could not be created
 */
private QueueConnectionImpl initializeQueueConnection(Connection connection,
    boolean isPrimary, ClientUpdater failedUpdater) {
  QueueConnectionImpl queueConnection = null;
  FailureTracker failureTracker = blackList.getFailureTracker(connection.getServer());
  try {
    ClientUpdater updater = factory.createServerToClientConnection(connection
        .getEndpoint(), this, isPrimary, failedUpdater);
    if (updater != null) {
      queueConnection = new QueueConnectionImpl(this, connection, updater, failureTracker);
    } else {
      logger.warn(LocalizedMessage.create(
          LocalizedStrings.QueueManagerImpl_UNABLE_TO_CREATE_A_SUBSCRIPTION_CONNECTION_TO_SERVER_0,
          connection.getEndpoint()));
    }
  } catch (Exception e) {
    if (logger.isDebugEnabled()) {
      logger.debug("error creating subscription connection to server {}", connection.getEndpoint(), e);
    }
  }
  if (queueConnection == null) {
    // count the failure and release the underlying connection
    failureTracker.addFailure();
    connection.destroy();
  }
  return queueConnection;
}
// Publishing a connection requires a flag saying whether the primary was created
// from a backup. For a backup queue, the endpoint may crash just before we add the
// connection, in which case the CCU may already have died when the endpoint closed.
// So before publishing the connection we must detect any such crash and be able to
// recover from it (see checkEndpoint, which runs under the same lock).
/**
 * Atomically publishes a new queue connection as primary or backup.
 * Returns false without adding if the connection's updater is already dead;
 * destroys the connection and returns false if its endpoint has closed or
 * we are shutting down.
 *
 * @return true if the connection was added to the connection list
 */
private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
  boolean isBadConnection;
  synchronized (lock) {
    ClientUpdater cu = connection.getUpdater();
    if (cu == null || (!cu.isAlive()) || (!cu.isProcessing()))
      return false; // don't add
    // The CCU can still die after this check, but then it will run
    // checkEndpoint under this same lock, remove the connection, and
    // reschedule recovery.
    if (connection.getEndpoint().isClosed() || shuttingDown || pool.getPoolOrCacheCancelInProgress() != null) {
      isBadConnection = true;
    } else {
      isBadConnection = false;
      if (isPrimary) {
        queueConnections = queueConnections.setPrimary(connection);
        // wake threads blocked in getAllConnections waiting for a primary
        lock.notifyAll();
      } else {
        queueConnections = queueConnections.addBackup(connection);
      }
    }
  }
  // destroy outside the lock to avoid holding it during cleanup
  if (isBadConnection) {
    if (logger.isDebugEnabled()) {
      logger.debug("Endpoint {} crashed while creating a connection. The connection will be destroyed", connection.getEndpoint());
    }
    connection.internalDestroy();
  }
  return !isBadConnection;
}
/**
 * Schedules the redundancy satisfier task to run after the given delay, but
 * only when recovery is actually needed (no primary, redundancy below the
 * configured level, redundancy level of -1, or a failed primary discovery).
 * If a task is already scheduled to run at least as soon, this is a no-op;
 * a later-scheduled task is cancelled and replaced.
 *
 * @param delay milliseconds to wait before running the satisfier
 */
protected void scheduleRedundancySatisfierIfNeeded(long delay) {
  if (shuttingDown) {
    return;
  }
  synchronized (lock) {
    if (shuttingDown) { // re-check under the lock
      return;
    }
    boolean recoveryNeeded = queueConnections.getPrimary() == null
        || getCurrentRedundancy() < redundancyLevel
        || redundancyLevel == -1
        || queueConnections.primaryDiscoveryFailed();
    if (!recoveryNeeded) {
      return;
    }
    if (redundancySatisfierTask != null) {
      if (redundancySatisfierTask.getRemainingDelay() <= delay) {
        // an equally-soon or sooner run is already queued
        return;
      }
      redundancySatisfierTask.cancel();
    }
    redundancySatisfierTask = new RedundancySatisfierTask();
    try {
      ScheduledFuture future = recoveryThread.schedule(redundancySatisfierTask,
          delay, TimeUnit.MILLISECONDS);
      redundancySatisfierTask.setFuture(future);
    } catch (RejectedExecutionException ignored) {
      // the timer has been cancelled, which means we're shutting down
    }
  }
}
/**
 * Re-registers all interest (keys, regex, filters, OQL, CQs) on a newly
 * created queue connection and clears its failure history on success.
 *
 * @param newConnection the queue connection to recover interest onto
 * @param isFirstNewConnection true if this connection is becoming the primary
 *        (affects local-cache clearing/refreshing during registration)
 * @return true on success or when shutdown makes recovery moot; false when
 *         registration failed — in that case the connection is destroyed and
 *         a failure is recorded against its server
 */
private boolean recoverInterest(final QueueConnectionImpl newConnection,
    final boolean isFirstNewConnection) {
  // if the pool or cache is closing there is nothing useful to do
  if (pool.getPoolOrCacheCancelInProgress() != null) {
    return true;
  }
  // recover interest
  try {
    recoverAllInterestTypes(newConnection, isFirstNewConnection);
    // success: this server is healthy again as far as the black list cares
    newConnection.getFailureTracker().reset();
    return true;
  }
  catch (CancelException ignore) {
    return true;
    // ok to ignore we are being shutdown
  } catch (VirtualMachineError err) {
    SystemFailure.initiateFailure(err);
    // If this ever returns, rethrow the error. We're poisoned
    // now, so don't let this thread continue.
    throw err;
  } catch (Throwable t) {
    // catch-all must follow the VirtualMachineError catch (see above);
    // verify the JVM is still usable before treating this as recoverable
    SystemFailure.checkFailure();
    pool.getCancelCriterion().checkCancelInProgress(t);
    logger.warn(LocalizedMessage.create(
        LocalizedStrings.QueueManagerImpl_QUEUEMANAGERIMPL_FAILED_TO_RECOVER_INTEREST_TO_SERVER_0,
        newConnection.getServer()), t);
    newConnection.getFailureTracker().addFailure();
    newConnection.destroy();
    return false;
  }
}
/**
 * Returns the queue state tracker for this manager.
 *
 * @return the current QueueState instance
 */
public QueueState getState() {
  return state;
}
/**
 * Recovers every region's registrations for one interest type on the given
 * connection, consulting the register-interest tracker for what was
 * previously registered.
 *
 * @param interestType one of the InterestType constants
 * @param recoveredConnection the connection to re-register on
 * @param isDurable whether to recover the durable registrations
 * @param receiveValues whether to recover the receive-values registrations
 *        (the tracker is keyed by the inverse, receive-updates-as-invalidates)
 * @param isFirstNewConnection true when the recovered endpoint becomes primary
 */
private void recoverSingleList(int interestType, Connection recoveredConnection,
    boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
  for (Object value : getPool().getRITracker()
      .getRegionToInterestsMap(interestType, isDurable, !receiveValues).values()) {
    // each entry holds one region plus its interest registrations
    RegionInterestEntry entry = (RegionInterestEntry) value;
    recoverSingleRegion(entry.getRegion(), entry.getInterests(), interestType,
        recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
  }
}
/**
 * Re-creates all previously registered continuous queries on the recovered
 * connection. For multi-user pools the per-CQ user attributes are installed
 * on the thread for the duration of each createOn call and always cleared
 * afterwards.
 *
 * @param recoveredConnection the connection to re-create CQs on
 * @param isDurable whether to recreate the CQs as durable
 */
private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
  Map cqs = getPool().getRITracker().getCqsMap();
  for (Object element : cqs.entrySet()) {
    Map.Entry entry = (Map.Entry) element;
    ClientCQ cq = (ClientCQ) entry.getKey();
    String cqName = cq.getName();
    if (pool.getMultiuserAuthentication()) {
      UserAttributes.userAttributes.set(((DefaultQueryService) pool
          .getQueryService()).getUserAttributes(cqName));
    }
    try {
      // CQs still in INIT state were never running; don't recreate those
      if (((CqStateImpl) cq.getState()).getState() != CqStateImpl.INIT) {
        cq.createOn(recoveredConnection, isDurable);
      }
    } finally {
      UserAttributes.userAttributes.set(null);
    }
  }
}
// TODO this is distressingly similar to LocalRegion#processSingleInterest
/**
 * Recovers one region's interest registrations of a single interest type.
 * Plain key interests (InterestType.KEY) are batched per result policy so
 * each policy is re-registered with one call; every other interest type is
 * re-registered entry by entry.
 *
 * @param r the region whose interest is being recovered
 * @param keys map of interest key/expression to its InterestResultPolicy
 * @param interestType one of the InterestType constants
 * @param recoveredConnection the connection to re-register on
 * @param isDurable whether these are durable registrations
 * @param receiveValues whether values (vs. invalidates) are received
 * @param isFirstNewConnection true when the recovered endpoint becomes primary
 */
private void recoverSingleRegion(LocalRegion r, Map keys, int interestType,
    Connection recoveredConnection, boolean isDurable,
    boolean receiveValues, boolean isFirstNewConnection) {
  if (logger.isDebugEnabled()) {
    logger.debug("{}.recoverSingleRegion starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
  }
  // batches of keys grouped by their result policy (KEY interest only)
  HashMap policyToKeys = new HashMap();
  for (Object element : keys.entrySet()) {
    Map.Entry entry = (Map.Entry) element;
    Object key = entry.getKey();
    InterestResultPolicy policy = (InterestResultPolicy) entry.getValue();
    if (interestType != InterestType.KEY) {
      // non-KEY interest types are re-registered one at a time
      recoverSingleKey(r, key, policy, interestType, recoveredConnection,
          isDurable, receiveValues, isFirstNewConnection);
    } else {
      // consolidate plain keys into one list per policy
      LinkedList batch = (LinkedList) policyToKeys.get(policy);
      if (batch == null) {
        batch = new LinkedList();
        policyToKeys.put(policy, batch);
      }
      batch.add(key);
    }
  }
  // now register each consolidated KEY batch with its policy
  for (Object element : policyToKeys.entrySet()) {
    Map.Entry entry = (Map.Entry) element;
    recoverSingleKey(r, entry.getValue(), (InterestResultPolicy) entry.getKey(),
        interestType, recoveredConnection, isDurable, receiveValues,
        isFirstNewConnection);
  }
}
/**
 * Re-registers one interest entry on the recovered connection. For
 * InterestType.KEY the {@code keys} argument is a LinkedList of keys sharing
 * one policy; for other types it is a single key/expression.
 *
 * When the recovered endpoint is becoming the primary, matching entries are
 * first cleared from the local cache and then refreshed from the server's
 * registration response; a backup endpoint registers without touching the
 * local cache.
 *
 * @param r the region being recovered
 * @param keys a single key/expression, or a LinkedList of keys (KEY type)
 * @param policy the result policy originally used for this registration
 * @param interestType one of the InterestType constants
 * @param recoveredConnection the connection to register on
 * @param isDurable whether this is a durable registration
 * @param receiveValues whether values (vs. invalidates) are received
 * @param isFirstNewConnection true when the recovered endpoint becomes primary
 */
private void recoverSingleKey(LocalRegion r, Object keys,
    InterestResultPolicy policy, int interestType, Connection recoveredConnection,
    boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
  r.startRegisterInterest();
  try {
    // Remove all matching values from local cache
    if (isFirstNewConnection) { // only if this recoveredEP
      // becomes primaryEndpoint
      r.clearKeysOfInterest(keys, interestType, policy);
      if (logger.isDebugEnabled()) {
        logger.debug("{}.recoverSingleRegion :Endpoint recovered is primary so clearing the keys of interest starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
      }
    }
    // Register interest, get new values back
    List serverKeys;
    if (policy != InterestResultPolicy.KEYS_VALUES) {
      serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
          keys, interestType, policy, isDurable, !receiveValues,
          r.getAttributes().getDataPolicy().ordinal);
      // Restore keys based on server's response
      if (isFirstNewConnection) {
        // only if this recoveredEP becomes primaryEndpoint
        r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
      }
    } else {
      if (!isFirstNewConnection) {
        // InterestResultPolicy.KEYS_VALUES now fetches values in
        // RegisterInterestOp's response itself and in this case
        // refreshEntriesFromServerKeys(...) does not explicitly fetch values
        // but only updates keys-values to region. To not fetch values, we
        // need to use policy NONE or KEYS.
        serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
            keys, interestType, InterestResultPolicy.NONE, isDurable, !receiveValues,
            r.getAttributes().getDataPolicy().ordinal);
      } else {
        // becoming primary: register with KEYS_VALUES (response carries the
        // values) and push the returned keys/values into the region
        serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
            keys, interestType, policy, isDurable, !receiveValues,
            r.getAttributes().getDataPolicy().ordinal);
        r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
      }
    }
  } finally {
    r.finishRegisterInterest();
  }
}
/**
 * Recovers the four non-CQ interest list types (keys, regular expressions,
 * filter classes, OQL queries) on the given connection. CQ recovery is done
 * separately in recoverAllInterestTypes() so it runs once per connection
 * instead of once per receiveValues flag value.
 *
 * @param recoveredConnection the connection to re-register on
 * @param durable whether to recover the durable registrations
 * @param receiveValues whether to recover the receive-values registrations
 * @param isFirstNewConnection true when the recovered endpoint becomes primary
 */
private void recoverInterestList(final Connection recoveredConnection,
    boolean durable, boolean receiveValues, boolean isFirstNewConnection) {
  final int[] listTypes = { InterestType.KEY, InterestType.REGULAR_EXPRESSION,
      InterestType.FILTER_CLASS, InterestType.OQL_QUERY };
  for (int type : listTypes) {
    recoverSingleList(type, recoveredConnection, durable, receiveValues,
        isFirstNewConnection);
  }
}
/**
 * Recovers every interest registration (lists and CQs) on a recovered
 * connection: non-durable registrations always, durable ones only for a
 * durable client. Each list pass runs twice, once for receive-values and
 * once for receive-invalidates registrations.
 *
 * @param recoveredConnection the connection to re-register on
 * @param isFirstNewConnection true when the recovered endpoint becomes primary
 */
protected void recoverAllInterestTypes(final Connection recoveredConnection,
    boolean isFirstNewConnection) {
  // test hook fired just before interest recovery begins
  if (PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG) {
    BridgeObserverHolder.getInstance().beforeInterestRecovery();
  }
  recoverInterestList(recoveredConnection, false, true, isFirstNewConnection);
  recoverInterestList(recoveredConnection, false, false, isFirstNewConnection);
  recoverCqs(recoveredConnection, false);
  if (getPool().isDurableClient()) {
    recoverInterestList(recoveredConnection, true, true, isFirstNewConnection);
    recoverInterestList(recoveredConnection, true, false, isFirstNewConnection);
    recoverCqs(recoveredConnection, true);
  }
}
/**
 * A comparator which sorts queue elements in the order of primary first,
 * then redundant with smallest queue size ... redundant with largest queue
 * size. Ties on queue size are broken by member id so the ordering is total
 * and deterministic.
 *
 * @author dsmith
 */
protected static class QSizeComparator implements java.util.Comparator {
  public int compare(Object o1, Object o2) {
    ServerQueueStatus s1 = (ServerQueueStatus) o1;
    ServerQueueStatus s2 = (ServerQueueStatus) o2;
    // sort primaries to the front of the list
    if (s1.isPrimary() && !s2.isPrimary()) {
      return -1;
    }
    if (!s1.isPrimary() && s2.isPrimary()) {
      return 1;
    }
    // Integer.compare avoids the int-overflow that plain subtraction of the
    // two queue sizes could produce for extreme values, which would violate
    // the comparator contract.
    int diff = Integer.compare(s1.getServerQueueSize(), s2.getServerQueueSize());
    if (diff != 0) {
      return diff;
    }
    // equal sizes: fall back to member id for a stable total order
    return s1.getMemberId().compareTo(s2.getMemberId());
  }
}
/**
 * A data structure for holding the current set of connections. The
 * queueConnections reference should be maintained by making a copy of this
 * data structure for each change (copy-on-write: every "mutator" below
 * returns a new ConnectionList; instances are never modified after
 * construction, so readers need no locking).
 *
 * Note the the order of the backups is significant. The first backup in the
 * list is the first server that will be become primary after the primary
 * fails, etc.
 *
 * The order of backups in this list is the reverse of the order or endpoints
 * from the old ConnectionProxyImpl .
 */
public class ConnectionList implements QueueConnections {
  // the current primary queue connection, or null if there is none
  private final QueueConnectionImpl primary;
  // every connection (primary + backups) indexed by its endpoint
  private final Map/* <Endpoint, QueueConnection> */connectionMap;
  // ordered backups; first entry is next in line to become primary
  private final List/* <QueueConnection> */backups;
  /**
   * The primaryDiscoveryException flag is stronger than just not having any
   * queue connections It also means we tried all of the possible queue
   * servers and we'ren't able to connect.
   */
  private final GemFireException primaryDiscoveryException;
  // the most recently removed primary; kept so its updater can seed recovery
  private final QueueConnectionImpl failedPrimary;
  /** Creates an empty list: no primary, no backups, no discovery failure. */
  public ConnectionList() {
    primary = null;
    connectionMap = Collections.EMPTY_MAP;
    backups = Collections.EMPTY_LIST;
    primaryDiscoveryException = null;
    failedPrimary = null;
  }
  /**
   * Builds an immutable snapshot from the given primary and backups.
   * Side effect: publishes the new connection count to the pool's
   * subscription statistics.
   */
  private ConnectionList(QueueConnectionImpl primary, List backups,
      GemFireException discoveryException, QueueConnectionImpl failedPrimary) {
    this.primary = primary;
    Map allConnectionsTmp = new HashMap();
    for (Iterator itr = backups.iterator(); itr.hasNext();) {
      QueueConnectionImpl nextConnection = (QueueConnectionImpl) itr.next();
      allConnectionsTmp.put(nextConnection.getEndpoint(), nextConnection);
    }
    if (primary != null) {
      allConnectionsTmp.put(primary.getEndpoint(), primary);
    }
    this.connectionMap = Collections.unmodifiableMap(allConnectionsTmp);
    this.backups = Collections.unmodifiableList(new ArrayList(backups));
    pool.getStats().setSubscriptionCount(connectionMap.size());
    this.primaryDiscoveryException = discoveryException;
    this.failedPrimary = failedPrimary;
  }
  /**
   * Returns a new list with the given connection as primary; if it was a
   * backup it is removed from the backup list. Clears any previous discovery
   * failure and failed-primary record.
   */
  public ConnectionList setPrimary(QueueConnectionImpl newPrimary) {
    List newBackups = backups;
    if (backups.contains(newPrimary)) {
      newBackups = new ArrayList(backups);
      newBackups.remove(newPrimary);
    }
    return new ConnectionList(newPrimary, newBackups, null, null);
  }
  /**
   * Returns a new list recording that primary discovery failed. A null
   * argument is replaced by a generic NoSubscriptionServersAvailableException.
   */
  public ConnectionList setPrimaryDiscoveryFailed(
      GemFireException p_discoveryException) {
    GemFireException discoveryException = p_discoveryException;
    if(discoveryException == null) {
      discoveryException = new NoSubscriptionServersAvailableException("Primary discovery failed.");
    }
    return new ConnectionList(primary, backups, discoveryException, failedPrimary);
  }
  /** Returns a new list with the given connection appended to the backups. */
  public ConnectionList addBackup(QueueConnectionImpl queueConnection) {
    ArrayList newBackups = new ArrayList(backups);
    newBackups.add(queueConnection);
    return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
  }
  /**
   * Returns a new list without the given connection. Removing the primary
   * remembers it as the failed primary so its updater can be reused during
   * the next primary's recovery (see getFailedUpdater()).
   */
  public ConnectionList removeConnection(QueueConnectionImpl connection) {
    if (primary == connection) {
      return new ConnectionList(null, backups, primaryDiscoveryException, primary);
    } else {
      ArrayList newBackups = new ArrayList(backups);
      newBackups.remove(connection);
      return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
    }
  }
  /** Returns the primary queue connection, or null if there is none. */
  public Connection getPrimary() {
    return primary;
  }
  /** Returns the (unmodifiable) ordered list of backup connections. */
  public List/* <QueueConnection> */getBackups() {
    return backups;
  }
  /**
   * Return the cache client updater from the previously
   * failed primary
   * @return the previous updater or null if there is no previous updater
   */
  public ClientUpdater getFailedUpdater() {
    if(failedPrimary != null) {
      return failedPrimary.getUpdater();
    } else {
      return null;
    }
  }
  /** Returns true if a full primary-discovery attempt has failed. */
  public boolean primaryDiscoveryFailed() {
    return primaryDiscoveryException != null;
  }
  /** Returns the exception from the failed discovery attempt, or null. */
  public GemFireException getPrimaryDiscoveryException() {
    return primaryDiscoveryException;
  }
  /** Returns the queue connection for the given endpoint, or null. */
  public QueueConnectionImpl getConnection(Endpoint endpoint) {
    return (QueueConnectionImpl) connectionMap.get(endpoint);
  }
  /** return a copy of the list of all server locations */
  public Set/* <ServerLocation> */getAllLocations() {
    HashSet locations = new HashSet();
    for (Iterator itr = connectionMap.keySet().iterator(); itr.hasNext();) {
      com.gemstone.gemfire.cache.client.internal.Endpoint endpoint = (com.gemstone.gemfire.cache.client.internal.Endpoint) itr.next();
      locations.add(endpoint.getLocation());
    }
    return locations;
  }
}
/**
 * Logs an error, routing security failures to the dedicated security logger
 * and everything else to the regular logger.
 *
 * @param message the localized message id to log
 * @param t the throwable that triggered the error
 */
protected void logError(StringId message, Throwable t) {
  if (t instanceof GemFireSecurityException) {
    securityLogger.error(message, t);
    return;
  }
  logger.error(message, t);
}
/**
 * Asynchronous task which tries to restablish a primary connection and
 * satisfy redundant requirements.
 *
 * This task should only be running in a single thread at a time. This task is
 * the only way that new queue servers will be added, and the only way that a
 * backup server can transistion to a primary server.
 *
 */
protected class RedundancySatisfierTask extends PoolTask {
  // guarded by lock; once true the task body becomes a no-op
  private boolean isCancelled;
  // the scheduled handle; used to read the remaining delay and to cancel
  private ScheduledFuture future;
  /** Records the ScheduledFuture produced when this task was scheduled. */
  public void setFuture(ScheduledFuture future) {
    this.future = future;
  }
  /** Milliseconds until this task is due to run. */
  public long getRemainingDelay() {
    return future.getDelay(TimeUnit.MILLISECONDS);
  }
  @Override
  public void run2() {
    try {
      // wait until the queue manager finished its initial connection setup
      initializedLatch.await();
      synchronized (lock) {
        if (isCancelled) {
          return;
        } else {
          // detach ourselves so a new task can be scheduled while we run
          redundancySatisfierTask = null;
        }
        if (pool.getPoolOrCacheCancelInProgress() != null) {
          /* wake up waiters so they can detect cancel */
          lock.notifyAll();
          return;
        }
      }
      // don't retry servers we are already connected to or that recently failed
      Set excludedServers = queueConnections.getAllLocations();
      excludedServers.addAll(blackList.getBadServers());
      excludedServers.addAll(factory.getBlackList().getBadServers());
      recoverPrimary(excludedServers);
      recoverRedundancy(excludedServers, true);
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    }
    catch (CancelException e) {
      // shutdown in progress; propagate so the task dies quietly
      throw e;
    }
    catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      synchronized (lock) {
        // record the failure so waiters see primaryDiscoveryFailed()
        if (t instanceof GemFireSecurityException) {
          queueConnections = queueConnections.setPrimaryDiscoveryFailed((GemFireSecurityException) t);
        } else {
          queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
        }
        lock.notifyAll();
        pool.getCancelCriterion().checkCancelInProgress(t);
        logError(LocalizedStrings.QueueManagerImpl_ERROR_IN_REDUNDANCY_SATISFIER, t);
      }
    }
    // always reschedule; the scheduler itself re-checks whether recovery
    // is still needed before queueing another run
    scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
  }
  /**
   * Cancels this task.
   * @return true if this call cancelled it; false if it was already cancelled
   */
  public boolean cancel() {
    synchronized (lock) {
      if (isCancelled) {
        return false;
      }
      isCancelled = true;
      future.cancel(false);
      redundancySatisfierTask = null;
      return true;
    }
  }
}
/**
 * Eagerly loads the classes needed during emergency shutdown by delegating
 * to QueueConnectionImpl. NOTE(review): presumably part of the SystemFailure
 * emergency-class preloading protocol (this class imports SystemFailure),
 * but only the delegation itself is visible here — confirm against
 * SystemFailure.loadEmergencyClasses callers.
 */
public static void loadEmergencyClasses() {
  QueueConnectionImpl.loadEmergencyClasses();
}
}