blob: 11b93a297468faad5793aba30bbe2a89f3a4ecdd [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.impl.node;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.rep.MasterTransferFailureException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.elections.Elections;
import com.sleepycat.je.rep.elections.Learner;
import com.sleepycat.je.rep.elections.MasterValue;
import com.sleepycat.je.rep.elections.Proposer.Proposal;
import com.sleepycat.je.rep.elections.Proposer.WinningProposal;
import com.sleepycat.je.rep.elections.TimebasedProposalGenerator;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.utilint.RepUtils.ExceptionAwareBlockingQueue;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.VLSN;
/**
* A Master Transfer operation.
* <p>
* Each Master Transfer operation uses a separate instance of this class.
* There is usually no more than one instance in the lifetime of a master node,
* because if the transfer succeeds, the old master node environment becomes
* invalid and must be closed. However, if an operation times out, another
* operation can try again later. Or, a second operation can "forcibly"
* supersede an existing operation in progress.
*
* @see ReplicatedEnvironment#transferMaster(Set, int, TimeUnit)
*/
public class MasterTransfer {

    /** Node names of the candidate replicas that may become the new master. */
    final private Set<String> replicas;

    /** Wall-clock time (ms) at which this operation was started. */
    final private long startTimeMs;

    /** Requested duration of the operation, in milliseconds. */
    final private long timeout;

    /** Wall-clock time (ms) after which the operation is considered timed out. */
    final private long deadlineTimeMs;

    final private RepNode repNode;

    /**
     * Candidate replicas that have reported progress while in phase 2, mapped
     * to the latest VLSN each has reported. Only touched from
     * {@link #chooseReplica}, i.e. by the thread running {@link #transfer}.
     */
    final private Map<String, VLSN> readyReplicas;

    /**
     * Latch handed to {@code blockTxnCompletion} when entering phase 2.
     * Non-null exactly while we are in phase 2; counting it down releases
     * any commit/abort threads it blocked.
     */
    volatile private CountDownLatch blocker;

    /**
     * Flag that indicates we've reached the point where we're committed to
     * proceeding with the transfer: we've completed phase 2, chosen a winner,
     * and are now notifying everyone of the new (fake) election result. Once
     * we get to this point, we can't allow a new Master Transfer operation
     * attempt to supersede us.
     */
    volatile private boolean done;

    /**
     * Queue which communicates key events of interest from Feeders regarding
     * the progress of their efforts to catch up with the end of the log. The
     * existence of this object signifies that (1) the owning Master Transfer
     * object is viable (hasn't been superseded by a later, "forcing" MT
     * operation); and (2) we have not yet discovered a winner. Once we have
     * chosen a winner we disallow any future attempt to supersede this
     * operation.
     * <p>
     * Guarded by this object's monitor: every read and write happens while
     * holding {@code this}.
     *
     * @see #abort
     * @see RepNode#setUpTransfer
     */
    private ExceptionAwareBlockingQueue<VLSNProgress> eventQueue;

    final private Logger logger = LoggerUtils.getLogger(getClass());

    /**
     * Creates a new operation and logs its start.
     *
     * @param replicas node names of the candidate target replicas
     * @param timeout duration of the operation, in milliseconds
     * @param repNode the (current master) node performing the transfer
     */
    MasterTransfer(Set<String> replicas, long timeout, RepNode repNode) {
        this.replicas = replicas;
        this.timeout = timeout;
        startTimeMs = System.currentTimeMillis();
        deadlineTimeMs = startTimeMs + timeout;
        this.repNode = repNode;
        LoggerUtils.info(logger, repNode.getRepImpl(),
                         "Start Master Transfer for " +
                         timeout + " msec, targeting: " +
                         Arrays.toString(replicas.toArray()));
        readyReplicas = new HashMap<>(replicas.size());
        eventQueue = new ExceptionAwareBlockingQueue<>
            (repNode.getRepImpl(), new VLSNProgress(null, null));
    }

    /**
     * Aborts an existing, in-progress Master Transfer operation, if it hasn't
     * reached the point of no return.
     *
     * @param e the exception to be delivered to the thread waiting in
     * {@link #chooseReplica}; must not be null
     * @return true, if the operation was cancelled, false if it's too late for
     * a clean cancellation.
     */
    synchronized public boolean abort(Exception e) {
        assert (e != null);
        if (done) {
            return false;
        }
        final ExceptionAwareBlockingQueue<VLSNProgress> queue = getQueue();
        if (queue != null) {
            queue.releasePoll(e);
        }
        return true;
    }

    /**
     * Accepts a Progress event and posts it to our queue for processing by the
     * Master Transfer operation thread. The event is silently dropped once the
     * queue has been retired (winner chosen, or operation finished).
     */
    synchronized void noteProgress(VLSNProgress p) {
        final ExceptionAwareBlockingQueue<VLSNProgress> queue = getQueue();
        if (queue != null) {
            queue.add(p);
        }
    }

    /**
     * Informs this Master Transfer operation that the named Feeder is shutting
     * down, because its replica connection has been lost. This of course
     * means that we can't expect this Feeder to soon catch up with our VLSN.
     * In particular, if we have reached Phase 2 on the strength of the
     * progress of only this one Feeder, then we must revert back to Phase 1.
     * <p>
     * Actually all we do here is post a special kind of "progress" event to
     * our queue; it gets processed for real in the {@code chooseReplica()}
     * thread, along with all the other events.
     *
     * @see #chooseReplica
     */
    void giveUp(String replicaNodeName) {
        noteProgress(VLSNProgress.makeFeederDeathEvent(replicaNodeName));
    }

    /** Returns the event queue, or null once it has been retired. */
    synchronized private ExceptionAwareBlockingQueue<VLSNProgress> getQueue() {
        return eventQueue;
    }

    /**
     * Performs the core processing of a Master Transfer operation. We first
     * wait for one of the candidate target replica nodes to become completely
     * synchronized. We then send a message to all nodes in the group
     * (including ourselves) announcing which node is to become the new
     * master.
     * <p>
     * If the operation fails we release any transaction commit/abort threads
     * that may have been blocked during phase 2 of the wait. However, in the
     * success case the release of any such transaction threads is done as a
     * natural by-product of the transition of the environment from master to
     * replica status.
     *
     * @return the node name of the chosen replica
     * @throws MasterTransferFailureException if the operation times out
     * @throws ThreadInterruptedException if the waiting thread is interrupted
     */
    String transfer() {
        try {
            final String result = chooseReplica();
            if (result == null) {
                throw new MasterTransferFailureException(getTimeoutMsg());
            }

            /*
             * Point of no return. Setting the flag and retiring the queue in
             * a single critical section makes the transition atomic with
             * respect to the synchronized abort() method.
             */
            synchronized (this) {
                done = true;
                eventQueue = null;
            }
            announceWinner(result);
            return result;
        } catch (MasterTransferFailureException e) {
            LoggerUtils.warning(logger, repNode.getRepImpl(),
                                "Master Transfer operation failed: " + e);
            throw e;
        } catch (InterruptedException ie) {
            throw new ThreadInterruptedException(repNode.getRepImpl(), ie);
        } finally {
            /* eventQueue is guarded by this object's monitor. */
            synchronized (this) {
                eventQueue = null;
            }
            /* On failure, release any txns blocked while in phase 2. */
            if (!done && blocker != null) {
                blocker.countDown();
            }
        }
    }

    /**
     * Prepares for a Master Transfer operation by waiting for one of the
     * nominated candidate target replica nodes to catch up with the master,
     * in two phases, as described in
     * {@link ReplicatedEnvironment#transferMaster(Set, int, TimeUnit)}.
     * <p>
     * This method works by observing events generated by Feeder threads and
     * passed to us via a queue.
     *
     * @return the node name of the first replica to complete phase 2 of the
     * preparation, or {@code null} if the operation times out.
     */
    private String chooseReplica() throws InterruptedException {
        final ExceptionAwareBlockingQueue<VLSNProgress> queue = getQueue();
        if (queue == null) {
            return null;
        }

        /* Hook the already-connected candidate Feeders up to this operation. */
        final FeederManager feederManager = repNode.feederManager();
        final Map<String, Feeder> activeReplicas =
            feederManager.activeReplicasMap();
        for (String nodeName : replicas) {
            final Feeder feeder = activeReplicas.get(nodeName);
            if (feeder != null) {
                feeder.setMasterTransfer(this);
            }
        }

        /*
         * Phase 1 could last a long time, if all of our candidate replicas are
         * still catching up (or not even connected); so we allow new
         * transactions to be written. But once we get to phase 2 we block
         * commit/abort operations for a final (quicker) catch-up. Thus we can
         * tell whether we're in phase 2 by whether we have a non-null blocker.
         */
        String result = null;
        for (;;) {
            final long pollTimeout =
                deadlineTimeMs - System.currentTimeMillis();
            final VLSNProgress event =
                queue.pollOrException(pollTimeout, TimeUnit.MILLISECONDS);
            if (event == null) {
                return null;
            }
            final VLSN endVLSN = repNode.getCurrentTxnEndVLSN();
            Level level = Level.INFO;
            if (event.isFeederDeathEvent()) {
                readyReplicas.remove(event.replicaNodeName);
                if (blocker != null && readyReplicas.isEmpty()) {

                    /*
                     * Must revert back to phase 1. The latch will still
                     * exist, because we've passed it to repImpl; and this is
                     * exactly what we want, so that blocked txns can proceed,
                     * and new ones won't get blocked for now.
                     */
                    blocker.countDown();
                    blocker = null;
                }
            } else if (blocker == null) { /* phase 1 */
                assert readyReplicas.isEmpty();
                readyReplicas.put(event.replicaNodeName, event.vlsn);
                blocker = new CountDownLatch(1);
                repNode.getRepImpl().blockTxnCompletion(blocker);

                /*
                 * >= comparison, here and below, since currentTxnEndVLSN can
                 * lag the latest txnEndVLSN actually written to the log.
                 */
                if (event.getVLSN().compareTo(endVLSN) >= 0) {
                    result = event.replicaNodeName;
                }
            } else { /* phase 2 */
                if (event.getVLSN().compareTo(endVLSN) >= 0) {
                    result = event.replicaNodeName;
                } else {

                    /*
                     * The present VLSN does not match the ultimate target
                     * VLSN, so we're not done yet. Since there could be a few
                     * events of this type, only log all of them at the
                     * {@code FINE} level.
                     */
                    readyReplicas.put(event.replicaNodeName, event.vlsn);
                    level = Level.FINE;
                }
            }

            /* Emit log message after the fact */
            LoggerUtils.logMsg(logger, repNode.getRepImpl(), level,
                               "Master Transfer progress: " +
                               event.replicaNodeName + ", " + event.vlsn +
                               ", phase: " + (blocker == null ? 1 : 2) +
                               ", endVLSN: " + endVLSN);
            if (result != null) {
                return result;
            }
        }
    }

    /**
     * Broadcasts a fake election result message. This does a couple things:
     * (1) prods the chosen replica to become the new master; and (2) forces
     * the old master to notice and shut down with a master-replica transition
     * exception.
     *
     * @param nodeName name of the replica chosen to become the new master
     */
    private void announceWinner(String nodeName) {
        final RepGroupImpl group = repNode.getGroup();
        final RepNodeImpl node = group.getNode(nodeName);
        final MasterValue newMaster = new MasterValue
            (node.getSocketAddress().getHostName(),
             node.getSocketAddress().getPort(),
             node.getNameIdPair());
        final Proposal proposal =
            new TimebasedProposalGenerator().nextProposal();
        final Elections elections = repNode.getElections();

        /*
         * NOTE(review): the return value is discarded; the call is retained
         * only in case the accessor has side effects -- confirm and remove.
         */
        elections.getLearner();
        Learner.informLearners
            (group.getAllLearnerSockets(),
             new WinningProposal(proposal, newMaster, null),
             elections.getProtocol(),
             elections.getThreadPool(),
             elections.getLogger(),
             repNode.getRepImpl(),
             null);
    }

    /**
     * Enables the given {@code Feeder} to contribute to this Master Transfer
     * operation. Called from the {@code FeederManager} when a new {@code
     * Feeder} is established during the time when a Master Transfer operation
     * is already in progress. Feeders for nodes that are not candidates are
     * ignored.
     */
    void addFeeder(Feeder f) {
        final String name = f.getReplicaNameIdPair().getName();
        if (replicas.contains(name)) {
            LoggerUtils.info(logger, repNode.getRepImpl(),
                             "Add node " + name +
                             " to existing Master Transfer");
            f.setMasterTransfer(this);
        }
    }

    /**
     * Returns the wall-clock time, in milliseconds, at which this operation
     * was started.
     */
    public long getStartTime() {
        return startTimeMs;
    }

    /**
     * Generates a detailed error message for the case when the operation times
     * out.
     */
    private String getTimeoutMsg() {
        return "Timed out: started at " + new Date(startTimeMs) +
            " for " + timeout + " milliseconds\n" +
            "master's VLSN: " + repNode.getCurrentTxnEndVLSN() +
            repNode.dumpAckFeederState();
    }

    /**
     * An event of interest in the pursuit of our goal of completing the Master
     * Transfer. Generally it indicates that the named replica has received
     * and processed the transaction identified by the given VLSN. As a
     * special case, an event representing the death of a Feeder is represented
     * by a {@code null} VLSN.
     */
    static class VLSNProgress {
        final VLSN vlsn;
        final String replicaNodeName;

        VLSNProgress(VLSN vlsn, String replicaNodeName) {
            this.vlsn = vlsn;
            this.replicaNodeName = replicaNodeName;
        }

        /** Creates the special event signalling that a Feeder has died. */
        static VLSNProgress makeFeederDeathEvent(String nodeName) {
            return new VLSNProgress(null, nodeName);
        }

        /** Returns the reported VLSN; must not be called on a death event. */
        VLSN getVLSN() {
            assert vlsn != null;
            return vlsn;
        }

        /** Returns true if this event signals that a Feeder has died. */
        boolean isFeederDeathEvent() {
            return vlsn == null;
        }
    }
}