blob: b977c5fe11175de5b4e4f791eddba115710974e0 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.elections;
import static com.sleepycat.je.rep.impl.RepParams.ELECTIONS_REBROADCAST_PERIOD;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.config.IntConfigParam;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentFailureReason;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.QuorumPolicy;
import com.sleepycat.je.rep.elections.Proposer.ExitElectionException;
import com.sleepycat.je.rep.elections.Proposer.Proposal;
import com.sleepycat.je.rep.elections.Protocol.Value;
import com.sleepycat.je.rep.elections.Utils.FutureTrackingCompService;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.TextProtocol.MessageExchange;
import com.sleepycat.je.rep.impl.node.ElectionQuorum;
import com.sleepycat.je.rep.impl.node.FeederManager;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.net.DataChannelFactory;
import com.sleepycat.je.rep.utilint.ReplicationFormatter;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.StoppableThreadFactory;
/**
* Represents the environment in which elections are run on behalf of a node.
* There is exactly one instance of an Elections for each node. Elections are
* initiated via this class.
*
* One of the primary goals of this interface is to keep Elections as free
* standing as possible, so that we can change how elections are held, or
* aspects of the election infrastructure with minimal impact on replication.
* For example, elections currently used tcp for communication of election
* messages but may want to switch over to udp. Such a change should be
* confined to just the Elections module. Other changes might include changes
* to the strategy used to suggest Values and the weight associated with a
* suggested Value.
*
* The following are the principal points of interaction between Elections and
* Replication:
*
* 1) The initiation of elections via the initiateElections() method.
*
* 2) The suggestion of nodes as masters and the ranking of the
* suggestion. This is done via the Acceptor.SuggestionGenerator interface. An
* instance of this interface is supplied when the Elections class is
* instantiated. Note that the implementation must also initiate a freeze of
* VLSNs to ensure that the ranking does not change as the election
* progresses. The VLSN can make progress when the node is informed via its
* Listener that an election with a higher Proposal number (than the one in the
* Propose request) has finished.
*
* 3) Obtaining the result of an election initiated in step 1. This is done via
* the Learner.Listener interface. An instance of this class is supplied when
* the Election class is first instantiated.
*
*/
public class Elections {
/* Describes all nodes of the group. */
private RepGroupImpl repGroup;
/*
* A unique identifier for this election agent. It's used by all the
* agents that comprise Elections.
*/
private final NameIdPair nameIdPair;
/*
* A repNode is kept for error propagation if this election belongs to a
* replicated environment. Elections are dependent on the RepNode to track
* the number of members currently in a group and to deal with changing
* quorum requirements when a node is acting as a Primary.
* Note that repNode may be null if the creator of this Elections object
* does not initiate an election and if the node can never be a master.
* The Arbiter uses it this way.
*/
private final RepNode repNode;
private final ElectionsConfig config;
private final RepImpl envImpl;
/*
* Shutdown can only be executed once. The shutdown field protects against
* multiple invocations.
*/
private final AtomicBoolean shutdown = new AtomicBoolean(false);
/* The three agents involved in the elections run by this class. */
private Proposer proposer;
private Acceptor acceptor;
private Learner learner;
/* The thread pool used to manage the threads used by the Proposer. */
private final ExecutorService pool;
/* Components of the agents. */
final private Acceptor.SuggestionGenerator suggestionGenerator;
final private Learner.Listener listener;
/*
* The protocol used to run the elections. All three agents use this
* instance of the protocol
*/
private final Protocol protocol;
/*
* The thread used to run the proposer during the current election. It's
* volatile to ensure that shutdown can perform an unsynchronized access
* to the iv even if an election is in progress.
*/
private volatile ElectionThread electionThread = null;
/* The listener used to indicate completion of an election. */
private ElectionListener electionListener = null;
/**
* The timer task that re-broadcasts election results from a master. It's
* null in unit tests.
*/
private final RebroadcastTask rebroadcastTask;
/* The number of elections that were held. */
private int nElections = 0;
private final Logger logger;
private final Formatter formatter;
/**
* Creates an instance of Elections. There should be just one instance per
* node. Note that the creation does not result in the election threads
* being started, that is, the instance does not participate in elections.
* This call is typically followed up with a call to startLearner that lets
* it both learn about and supply elections results, and, if applicable, by
* a subsequent call to participate to let it vote in elections.
* The RepNode parameter is null when the Elections object is used by
* the Arbiter. The Arbiter is a Learner and Acceptor. It will never
* initiate an election (the RepNode must be non-null) and never
* become Master.
*
* @param config elections configuration
* @param listener the Listener invoked when results are available
* @param suggestionGenerator used by the Acceptor
*/
public Elections(ElectionsConfig config,
Learner.Listener listener,
Acceptor.SuggestionGenerator suggestionGenerator) {
envImpl = config.getRepImpl();
this.repNode = config.getRepNode();
this.config = config;
this.nameIdPair = config.getNameIdPair();
DataChannelFactory channelFactory;
if (repNode != null && repNode.getRepImpl() != null) {
logger = LoggerUtils.getLogger(getClass());
final DbConfigManager configManager = envImpl.getConfigManager();
int rebroadcastPeriod = configManager.
getDuration(ELECTIONS_REBROADCAST_PERIOD);
rebroadcastTask = new RebroadcastTask(rebroadcastPeriod);
} else {
logger = LoggerUtils.getLoggerFormatterNeeded(getClass());
rebroadcastTask = null;
}
channelFactory = config.getServiceDispatcher().getChannelFactory();
formatter = new ReplicationFormatter(nameIdPair);
protocol = new Protocol(TimebasedProposalGenerator.getParser(),
MasterValue.getParser(),
config.getGroupName(),
nameIdPair,
config.getRepImpl(),
channelFactory);
this.suggestionGenerator = suggestionGenerator;
this.listener = listener;
pool = Executors.newCachedThreadPool
(new StoppableThreadFactory("JE Elections Factory " + nameIdPair,
logger));
}
/* The thread pool used to allocate threads used during elections. */
public ExecutorService getThreadPool() {
return pool;
}
public ServiceDispatcher getServiceDispatcher() {
return config.getServiceDispatcher();
}
public ElectionQuorum getElectionQuorum() {
return repNode.getElectionQuorum();
}
public RepNode getRepNode() {
return repNode;
}
public Logger getLogger() {
return logger;
}
/* Get repImpl for Proposer to set up loggers. */
public RepImpl getRepImpl() {
return config.getRepImpl();
}
/**
* Starts a Learner agent. Note that the underlying Protocol instance it
* uses must have a current picture of the replication group otherwise it
* will reject messages from nodes that it does not think are currently
* part of the replication group.
*/
public void startLearner() {
// repNode used for thread name but can be null here
learner = new Learner(config.getRepImpl(),
protocol,
config.getServiceDispatcher());
learner.start();
learner.addListener(listener);
electionListener = new ElectionListener();
learner.addListener(electionListener);
if (rebroadcastTask != null) {
repNode.getTimer().schedule(rebroadcastTask,
rebroadcastTask.getPeriod(),
rebroadcastTask.getPeriod());
}
}
/**
* Permits the Election agent to start participating in elections held
* by the replication group, or initiate elections on behalf of this node.
* Participation in elections is initiated only after a node has current
* information about group membership.
*/
public void participate() {
proposer = new RankingProposer(this, nameIdPair);
startAcceptor();
}
public void startAcceptor() {
acceptor = new Acceptor(protocol, config, suggestionGenerator);
acceptor.start();
}
/**
* Returns the Acceptor associated with this node.
* @return the Acceptor
*/
public Acceptor getAcceptor() {
return acceptor;
}
/**
* Returns a current set of acceptor sockets.
*/
public Set<InetSocketAddress> getAcceptorSockets() {
if (repGroup == null) {
throw EnvironmentFailureException.unexpectedState
("No rep group was configured");
}
return repGroup.getAllAcceptorSockets();
}
public Protocol getProtocol() {
return protocol;
}
/**
* Returns the Learner associated with this node
* @return the Learner
*/
public Learner getLearner() {
return learner;
}
/**
* The number of elections that have been held. Used for testing.
*
* @return total elections initiated by this node.
*/
public int getElectionCount() {
return nElections;
}
/**
* Initiates an election. Note that this may just be one of many possible
* elections that are in progress in a replication group. The method does
* not wait for this election to complete, but instead returns as soon as
* any election result (including one initiated by some other Proposer)
* becomes available via the Learner.
*
* A proposal submitted as part of this election may lose out to other
* concurrent elections, or there may not be a sufficient number of
* Acceptor agents active or reachable to reach a quorum. In such cases,
* the election will not produce a result. That is, there will be no
* notification to the Learners. Note that only one election can be
* initiated at a time at a node If a new election is initiated while one
* is already in progress, then the method will wait until it completes
* before starting a new one.
*
* The results of this and any other elections that may have been initiated
* concurrently by other nodes are made known to the Learner agents. Note
* that this method does not return a result, since the concurrent arrival
* of results could invalidate the result even before it's returned.
*
* @param newGroup the definition of the group to be used for this election
* @param quorumPolicy the policy to be used to reach a quorum.
* @param maxRetries the max number of times a proposal may be retried
* @throws InterruptedException
*/
public synchronized void initiateElection(RepGroupImpl newGroup,
QuorumPolicy quorumPolicy,
int maxRetries)
throws InterruptedException {
updateRepGroup(newGroup);
long startTime = System.currentTimeMillis();
nElections++;
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Election initiated; election #" + nElections);
if (electionInProgress()) {
/*
* The factor of four used below to arrive at a timeout value is a
* heuristic: A factor of two to cover any pending message exchange
* and another factor of two as a grace period. We really don't
* expect to hit this timeout in the absence of networking issues,
* hence the thread dump to understand the reason in case there's
* some bug.
*/
final int waitMs = protocol.getReadTimeout() * 4;
// A past election request, wait until the election has quiesced
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Election in progress. Waiting ... for " +
waitMs + "ms");
electionThread.join(waitMs);
if (electionThread.isAlive()) {
/* Dump out threads for future analysis if it did not quit. */
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Election did not finish as expected." +
" resorting to shutdown");
LoggerUtils.fullThreadDump(logger, envImpl, Level.INFO);
electionThread.shutdown();
}
final Exception exception =
electionThread.getSavedShutdownException();
if (exception != null) {
throw new EnvironmentFailureException
(envImpl,
EnvironmentFailureReason.UNEXPECTED_EXCEPTION,
exception);
}
}
CountDownLatch countDownLatch = null;
synchronized (electionListener) {
// Watch for any election results from this point forward
countDownLatch = electionListener.setLatch();
}
RetryPredicate retryPredicate =
new RetryPredicate(repNode, maxRetries, countDownLatch);
electionThread = new ElectionThread(quorumPolicy, retryPredicate,
envImpl,
(envImpl == null) ? null :
envImpl.getName());
electionThread.start();
try {
/* Wait until we hear of some "new" election result */
countDownLatch.await();
if (retryPredicate.pendingRetries <= 0) {
/* Ran out of retries -- a test situation */
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Retry count exhausted: " +
retryPredicate.maxRetries);
}
/*
* Note that the election thread continues to run past this point
* and may be active upon re-entry
*/
} catch (InterruptedException e) {
LoggerUtils.logMsg(logger, envImpl, formatter, Level.WARNING,
"Election initiation interrupted");
shutdown();
throw e;
}
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Election finished. Elapsed time: " +
(System.currentTimeMillis() - startTime) + "ms");
}
/**
* The standard method for requesting and election, we normally want to run
* elections until we hear of an election result. Once initiated, elections
* run until there is a successful conclusion, that is, a new master has
* been elected. Since a successful conclusion requires the participation
* of at least a simple majority, this may take a while if a sufficient
* number of nodes are not available.
*
* The above method is used mainly for testing.
*
* @throws InterruptedException
*
* @see #initiateElection
*/
public synchronized void initiateElection(RepGroupImpl newGroup,
QuorumPolicy quorumPolicy)
throws InterruptedException {
initiateElection(newGroup, quorumPolicy, Integer.MAX_VALUE);
}
/**
* Updates elections notion of the rep group, so that acceptors are aware
* of the current state of the group, even in the absence of an election
* conducted by the node itself.
*
* This method should be invoked each time a node becomes aware of a group
* membership change.
*
* @param newRepGroup defines the new group
*/
public void updateRepGroup(RepGroupImpl newRepGroup) {
repGroup = newRepGroup;
protocol.updateNodeIds(newRepGroup.getAllElectionMemberIds());
}
/**
* Updates elections notion of the rep group, so that acceptors are aware
* of the current state of the group, even in the absence of an election
* conducted by the node itself. However this method does not update the
* members in the protocol so checks are not made for the member id.
*
* This method should be invoked each time a node becomes aware of a group
* membership change.
*
* @param newRepGroup defines the new group
*/
public void updateRepGroupOnly(RepGroupImpl newRepGroup) {
repGroup = newRepGroup;
}
/**
* Predicate to determine whether an election is currently in progress.
*/
public synchronized boolean electionInProgress() {
return (electionThread != null) && electionThread.isAlive();
}
/**
* Statistics used during testing.
*/
public synchronized StatGroup getStats() {
if (electionInProgress()) {
throw EnvironmentFailureException.unexpectedState
("Election in progress");
}
return electionThread.getStats();
}
/**
* For INTERNAL TESTING ONLY. Ensures that the initiated election has
* reached a conclusion that can be tested.
*
* @throws InterruptedException
*/
public synchronized void waitForElection()
throws InterruptedException {
assert(electionThread != null);
electionThread.join();
}
/**
* Shutdown all acceptor and learner agents by broadcasting a Shutdown
* message. It waits until reachable agents have acknowledged the message
* and the local learner and acceptor threads have exited.
*
* This is method is intended for use during testing only.
*
* @throws InterruptedException
*/
public void shutdownAcceptorsLearners
(Set<InetSocketAddress> acceptorSockets,
Set<InetSocketAddress> learnerSockets)
throws InterruptedException {
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Elections being shutdown");
FutureTrackingCompService<MessageExchange> compService =
Utils.broadcastMessage(acceptorSockets,
Acceptor.SERVICE_NAME,
protocol.new Shutdown(),
pool);
/* The 60 seconds is just a reasonable timeout for use in tests */
Utils.checkFutures(compService, 60, TimeUnit.SECONDS,
logger, envImpl, formatter);
compService = Utils.broadcastMessage(learnerSockets,
Learner.SERVICE_NAME,
protocol.new Shutdown(),
pool);
Utils.checkFutures(compService, 60, TimeUnit.SECONDS,
logger, envImpl, formatter);
if (learner != null) {
learner.join();
}
if (acceptor != null) {
acceptor.join();
}
}
/**
* Shuts down just the election support at this node. That is the Acceptor,
* and Learner associated with this Elections as well as any pending
* election running in its thread is terminated.
*
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
if (!shutdown.compareAndSet(false, true)) {
return;
}
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Elections shutdown initiated");
if (acceptor != null) {
acceptor.shutdown();
}
if (learner != null) {
learner.shutdown();
}
if (electionThread != null) {
electionThread.shutdown();
}
if (proposer != null) {
proposer.shutdown();
}
if (rebroadcastTask != null) {
rebroadcastTask.cancel();
}
pool.shutdown();
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Elections shutdown completed");
}
public boolean isShutdown() {
return shutdown.get();
}
/**
* Used to short-circuit Proposal retries if a new election has completed
* since the time this election was initiated.
*/
static class ElectionListener implements Learner.Listener {
/*
* The election latch that is shared by the RetryPredicate. It's
* counted down either when some election result becomes available or
* when elections that are in progress need to be shutdown.
*/
private CountDownLatch electionLatch = null;
ElectionListener() {
this.electionLatch = null;
}
/**
* Returns a new latch to be associated with the RetryPredicate.
*/
public synchronized CountDownLatch setLatch() {
electionLatch = new CountDownLatch(1);
return electionLatch;
}
/**
* Used during shutdown only
*
* @return the latch on which elections wait
*/
public CountDownLatch getElectionLatch() {
return electionLatch;
}
/**
* The Listener protocol announcing election results.
*/
@Override
public synchronized void notify(Proposal proposal, Value value) {
// Free up the retry predicate if its waiting
if (electionLatch != null) {
electionLatch.countDown();
}
}
}
/**
* Implements the retry policy
*/
static class RetryPredicate implements Proposer.RetryPredicate {
private final RepNode repNode;
private final int maxRetries;
private int pendingRetries;
/* The latch that is activated by the Listener. */
private final CountDownLatch electionLatch;
/*
* The number of time to retry an election before trying to activate
* the primary.
*/
private final int primaryRetries;
/*
* The backoff period returned by the next call to backoffWaitTime()
*/
private int nextBackoffSec = 0;
/* Defines the range for nextBackoffSec */
private static final int BACKOFF_SLEEP_MIN = 1;
private static final int BACKOFF_SLEEP_MAX = 32;
RetryPredicate(RepNode repNode,
int maxRetries,
CountDownLatch electionLatch) {
this.repNode = repNode;
this.maxRetries = maxRetries;
pendingRetries = maxRetries;
this.electionLatch = electionLatch;
final RepImpl repImpl = repNode.getRepImpl();
final IntConfigParam retriesParam =
RepParams.ELECTIONS_PRIMARY_RETRIES;
primaryRetries = (repImpl != null) ?
repImpl.getConfigManager().getInt(retriesParam) :
Integer.parseInt(retriesParam.getDefault());
}
/**
* Returns the time to backoff before a retry. The backoff is
* non-linear: each call returns double the previous value starting
* with a value of zero and proceeding from 1 to 32sec.
*
* @return the time to backoff in ms
*/
private int backoffWaitTime() {
final int currBackOffSec = nextBackoffSec;
nextBackoffSec = (nextBackoffSec == 0) ?
BACKOFF_SLEEP_MIN :
Math.min(BACKOFF_SLEEP_MAX, nextBackoffSec * 2);
return currBackOffSec * 1000;
}
/**
* Implements the protocol
*/
@Override
public boolean retry() throws InterruptedException {
if ((maxRetries - pendingRetries) >= primaryRetries) {
if ((repNode != null) &&
repNode.getArbiter().activateArbitration()) {
pendingRetries = maxRetries;
return true;
}
}
if (pendingRetries-- <= 0) {
/* Free up the main election thread */
electionLatch.countDown();
return false;
}
/*
* Return true if a Listener was informed of a completed election,
* false if no such election concluded within the timeout period.
*/
return !electionLatch.await(backoffWaitTime(), TimeUnit.MILLISECONDS);
}
/**
* The number of times a retry was attempted
*/
@Override
public int retries() {
return (maxRetries-pendingRetries);
}
@Override
public boolean electionRoundConcluded() {
return electionLatch.getCount() <= 0;
}
}
/**
* The thread that actually runs an election. The thread exits either after
* it has successfully had its proposal accepted and after it has informed
* all learners, or if it gives up after some number of retries.
*/
private class ElectionThread extends StoppableThread {
final private QuorumPolicy quorumPolicy;
/* Non-null on termination if a proposal was issued and accepted. */
Proposer.WinningProposal winningProposal;
/* Non-null at termination if no proposal was accepted. */
ExitElectionException maxRetriesException;
final private RetryPredicate retryPredicate;
private ElectionThread(QuorumPolicy quorumPolicy,
RetryPredicate retryPredicate,
EnvironmentImpl envImpl,
String envName) {
super(envImpl, "ElectionThread_" + envName);
this.quorumPolicy = quorumPolicy;
this.retryPredicate = retryPredicate;
}
/**
* Carries out an election and informs learners of the results. Any
* uncaught exception will invalidate the environment if this is
* being executed on behalf of a replicated node.
*/
@Override
public void run() {
try {
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Started election thread " + new Date());
winningProposal =
proposer.issueProposal(quorumPolicy, retryPredicate);
/*
* TODO: Consider adding an optimization to inform SECONDARY
* nodes of election results, but continuing to only wait for
* the completion of notifications to ELECTABLE nodes. That
* change would increase the chance that SECONDARY nodes have
* up-to-date information about the master, but would avoid
* adding sensitivity to potentially longer network delays in
* communicating with secondary nodes.
*/
Learner.informLearners(repGroup.getAllLearnerSockets(),
winningProposal,
protocol,
pool,
logger,
config.getRepImpl(),
null);
} catch (ExitElectionException mre) {
maxRetriesException = mre;
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Exiting election after " +
retryPredicate.retries() + " retries");
return;
} catch (InterruptedException e) {
pool.shutdownNow();
LoggerUtils.logMsg(logger, envImpl, formatter, Level.INFO,
"Election thread interrupted");
} catch (Exception e) {
saveShutdownException(e);
} finally {
cleanup();
LoggerUtils.logMsg
(logger, envImpl, formatter, Level.INFO,
"Election thread exited. Group master: " +
((repNode != null) ?
repNode.getMasterStatus().getGroupMasterNameId() :
Integer.MAX_VALUE));
}
}
public void shutdown() {
if (shutdownDone(logger)) {
return;
}
shutdownThread(logger);
}
@Override
protected int initiateSoftShutdown() {
final CountDownLatch electionLatch =
electionListener.getElectionLatch();
if (electionLatch != null) {
/*
* Unblock any initiated elections waiting for a result as
* well as this thread.
*/
electionLatch.countDown();
}
/*
* Wait roughly for the time it would take for a read to timeout.
* since the delay in testing the latch is probably related to
* some slow network event
*/
return protocol.getReadTimeout();
}
/**
* Statistics from the election. Should only be invoked after the run()
* method has exited.
*
* @return statistics generated by the proposer
*/
StatGroup getStats() {
return (winningProposal != null) ?
winningProposal.proposerStats :
maxRetriesException.proposerStats;
}
/**
* @see StoppableThread#getLogger
*/
@Override
protected Logger getLogger() {
return logger;
}
}
/**
* Used to propagate the results of an election to any monitors. Note that
* monitors are informed of results redundantly, both from the node that
* concludes the election and via this re-propagation. The use of multiple
* network paths increases the likelihood that the result will reach the
* monitor via some functioning network path.
*
* The method returns immediately after queuing the operation in the
* thread pool.
*/
public void asyncInformMonitors(Proposal proposal, Value value) {
final Set<InetSocketAddress> monitorSockets =
repGroup.getAllMonitorSockets();
if (monitorSockets.size() == 0) {
return;
}
LoggerUtils.logMsg
(logger, envImpl, formatter, Level.INFO,
String.format("Propagating election results to %d monitors\n",
monitorSockets.size()));
pool.execute(new InformLearners
(monitorSockets,
new Proposer.WinningProposal(proposal, value, null)));
}
/**
* Provides the underpinnings of the async mechanism used to deliver
* election results to the monitors.
*/
private class InformLearners implements Runnable {
final Set<InetSocketAddress> learners;
final Proposer.WinningProposal winningProposal;
InformLearners(Set<InetSocketAddress> learners,
Proposer.WinningProposal winningProposal) {
this.learners = learners;
this.winningProposal = winningProposal;
}
@Override
public void run() {
Learner.informLearners(learners,
winningProposal,
protocol,
pool,
logger,
config.getRepImpl(),
null);
}
}
/**
* Task to re-inform learners of election results by re-broadcasting the
* results of an election from the master. This re-broadcast is intended to
* help in network partition situations. See [#20220] for details.
*/
private class RebroadcastTask extends TimerTask {
/* Lock to ensure that async executions don't overlap. */
private final ReentrantLock lock = new ReentrantLock();
private int acquireFailCount = 0;
private final int periodMs;
public RebroadcastTask(int periodMs) {
this.periodMs = periodMs;
}
public int getPeriod() {
return periodMs;
}
/**
* If the node is a master, it broadcasts election results to nodes
* that are not currently connected to it via feeders.
*
* It's worth noting that since this is a timer task method it must be
* be lightweight. So the actual broadcast is done in an asynchronous
* method using a thread from the election thread pool.
*/
@Override
public void run() {
try {
if (!lock.tryLock()) {
if ((++acquireFailCount % 100) == 0) {
LoggerUtils.logMsg(logger, envImpl, formatter,
Level.WARNING,
"Failed to acquire lock after " +
acquireFailCount + " retries");
}
return;
}
acquireFailCount = 0;
if (!repNode.getMasterStatus().isGroupMaster()) {
return;
}
/*
* Re-informing when the node is a master is just an
* optimization, it does not impact correctness. Further
* minimize network traffic by trying just the nodes that are
* currently disconnected.
*/
final FeederManager feederManager = repNode.feederManager();
final Set<String> active = feederManager.activeReplicas();
active.add(repNode.getNodeName());
final Set<InetSocketAddress> learners =
new HashSet<>();
for (final RepNodeImpl rn : repGroup.getAllLearnerMembers()) {
if (!active.contains(rn.getName())) {
learners.add(rn.getSocketAddress());
}
}
if (learners.size() == 0) {
return;
}
LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
"informing learners:" +
Arrays.toString(learners.toArray()) +
" active: " +
Arrays.toString(active.toArray()));
pool.execute(new Runnable() {
@Override
public void run() {
learner.reinformLearners(learners, pool);
}
});
} catch (Exception e) {
LoggerUtils.logMsg(logger, envImpl, formatter, Level.SEVERE,
"Unexpected exception:" + e.getMessage());
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
}