blob: 5a9579111955d1db6b9eacb2f6b67e4330d4514e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.qourum;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {
public enum BACKUP_ACTIVATION {
FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
}
private QuorumManager quorumManager;
private String targetServerID = "";
private final NodeManager nodeManager;
private final StorageManager storageManager;
private final ScheduledExecutorService scheduledPool;
private CountDownLatch latch;
private ClientSessionFactoryInternal sessionFactory;
private CoreRemotingConnection connection;
/**
* This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
* with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
* {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#FAIL_OVER}.
* <p>
* If the second message does come within this dead line, we fail over anyway.
*/
public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60;
public SharedNothingBackupQuorum(StorageManager storageManager,
NodeManager nodeManager,
ScheduledExecutorService scheduledPool) {
this.storageManager = storageManager;
this.scheduledPool = scheduledPool;
this.latch = new CountDownLatch(1);
this.nodeManager = nodeManager;
}
private volatile BACKUP_ACTIVATION signal;
/**
* safety parameter to make _sure_ we get out of await()
*/
private static final int LATCH_TIMEOUT = 30;
private static final int RECONNECT_ATTEMPTS = 5;
private final Object decisionGuard = new Object();
@Override
public String getName() {
return "SharedNothingBackupQuorum";
}
public void decideOnAction(Topology topology) {
//we may get called via multiple paths so need to guard
synchronized (decisionGuard) {
if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
return;
}
if (!isLiveDown()) {
try {
// no point in repeating all the reconnection logic
sessionFactory.connect(RECONNECT_ATTEMPTS, false);
return;
}
catch (ActiveMQException e) {
if (e.getType() != ActiveMQExceptionType.NOT_CONNECTED)
ActiveMQServerLogger.LOGGER.errorReConnecting(e);
}
}
// live is assumed to be down, backup fails-over
signal = BACKUP_ACTIVATION.FAIL_OVER;
}
latch.countDown();
}
public void liveIDSet(String liveID) {
targetServerID = liveID;
nodeManager.setNodeID(liveID);
//now we are replicating we can start waiting for disconnect notifications so we can fail over
// sessionFactory.addFailureListener(this);
}
@Override
public void setQuorumManager(QuorumManager quorumManager) {
this.quorumManager = quorumManager;
}
/**
* if the node going down is the node we are replicating from then decide on an action.
*
* @param topology
* @param eventUID
* @param nodeID
*/
@Override
public void nodeDown(Topology topology, long eventUID, String nodeID) {
if (targetServerID.equals(nodeID)) {
decideOnAction(topology);
}
}
@Override
public void nodeUp(Topology topology) {
//noop
}
/**
* if the connection to our replicated live goes down then decide on an action
*/
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
decideOnAction(sessionFactory.getServerLocator().getTopology());
}
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
connectionFailed(me, failedOver);
}
@Override
public void beforeReconnect(ActiveMQException exception) {
//noop
}
@Override
public void close() {
causeExit(BACKUP_ACTIVATION.STOP);
}
/**
* @param sessionFactory the session factory used to connect to the live server
*/
public void setSessionFactory(final ClientSessionFactoryInternal sessionFactory) {
this.sessionFactory = sessionFactory;
this.connection = (CoreRemotingConnection) sessionFactory.getConnection();
connection.addFailureListener(this);
//belts and braces, there are circumstances where the connection listener doesn't get called but the session does.
sessionFactory.addFailureListener(this);
}
/**
* Releases the latch, causing the backup activation thread to fail-over.
* <p>
* The use case is for when the 'live' has an orderly shutdown, in which case it informs the
* backup that it should fail-over.
*/
public synchronized void failOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
removeListener();
signal = BACKUP_ACTIVATION.FAIL_OVER;
if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER) {
latch.countDown();
}
if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED) {
final CountDownLatch localLatch = latch;
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
localLatch.countDown();
}
}, WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
}
}
public void notifyRegistrationFailed() {
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
latch.countDown();
}
public void notifyAlreadyReplicating() {
signal = BACKUP_ACTIVATION.ALREADY_REPLICATING;
latch.countDown();
}
private void removeListener() {
if (connection != null) {
connection.removeFailureListener(this);
}
}
/**
* Called by the replicating backup (i.e. "SharedNothing" backup) to wait for the signal to
* fail-over or to stop.
*
* @return signal, indicating whether to stop or to fail-over
*/
public BACKUP_ACTIVATION waitForStatusChange() {
try {
latch.await();
}
catch (InterruptedException e) {
return BACKUP_ACTIVATION.STOP;
}
return signal;
}
/**
* Cause the Activation thread to exit and the server to be stopped.
*
* @param explicitSignal the state we want to set the quorum manager to return
*/
public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) {
removeListener();
this.signal = explicitSignal;
latch.countDown();
}
public synchronized void reset() {
latch = new CountDownLatch(1);
}
/**
* we need to vote on the quorum to decide if we should become live
*
* @return the voting decision
*/
private boolean isLiveDown() {
// we use 1 less than the max cluste size as we arent bothered about the replicated live node
int size = quorumManager.getMaxClusterSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager);
quorumManager.vote(quorumVote);
try {
quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException interruption) {
// No-op. The best the quorum can do now is to return the latest number it has
}
quorumManager.voteComplete(quorumVote);
return quorumVote.getDecision();
}
}