/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.impl.node;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_CREATED;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_SHUTDOWN;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_95_DELAY_MS_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_99_DELAY_MS_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_AVG_DELAY_MS_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_DELAY_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_LAST_COMMIT_TIMESTAMP_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_LAST_COMMIT_VLSN_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_MAX_DELAY_MS_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_VLSN_LAG_MAP;
import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.REPLICA_VLSN_RATE_MAP;
import static java.util.concurrent.TimeUnit.MINUTES;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.cbvlsn.LocalCBVLSNTracker;
import com.sleepycat.je.rep.impl.node.cbvlsn.LocalCBVLSNUpdater;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException;
import com.sleepycat.je.rep.txn.MasterTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition;
import com.sleepycat.je.rep.utilint.IntRunningTotalStat;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.SizeAwaitMap;
import com.sleepycat.je.rep.utilint.SizeAwaitMap.Predicate;
import com.sleepycat.je.utilint.AtomicLongMapStat;
import com.sleepycat.je.utilint.IntStat;
import com.sleepycat.je.utilint.LatencyPercentileMapStat;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongAvgMapStat;
import com.sleepycat.je.utilint.LongAvgRateMapStat;
import com.sleepycat.je.utilint.LongDiffMapStat;
import com.sleepycat.je.utilint.LongMaxMapStat;
import com.sleepycat.je.utilint.PollCondition;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;
/**
* FeederManager is responsible for the creation and management of the Feeders
* used to respond to connections initiated by a Replica. runFeeders() is the
* central loop that listens for replica connections and manages the lifecycle
* of individual Feeders. It's re-entered each time the node becomes a Master
* and is exited when its status changes.
*
* There is a single instance of FeederManager that is created for a
* replication node. There are many instances of Feeders per FeederManager.
* Each Feeder instance represents a connection between the node
* serving as the feeder and the replica.
*
* Note that the FeederManager and the Replica currently reuse the Replication
* node's thread of control. When we implement r2r we will need to revisit the
* thread management to provide for concurrent operation of the FeederManager
* and the Replica.
*/
final public class FeederManager {
private final RepNode repNode;
/*
* The queue into which the ServiceDispatcher queues socket channels for
* new Feeder instances.
*/
private final BlockingQueue<DataChannel> channelQueue =
new LinkedBlockingQueue<>();
/*
* Feeders are stored in either nascentFeeders or activeFeeders, and not
* both. To avoid deadlock, if locking both collections, lock
* nascentFeeders first and then activeFeeders.
*/
/*
* Nascent feeders that are starting up and are not yet active. They have
* network connections but have not synched up or completed handshakes.
* They are moved into the active feeder map, once they become active.
*/
private final Set<Feeder> nascentFeeders =
Collections.synchronizedSet(new HashSet<Feeder>());
/*
* The collection of active feeders currently feeding replicas. The map is
* indexed by the Replica's node name. Access to this map must be
* synchronized, since it's updated concurrently by the Feeders that share
* it.
*
* A feeder is considered to be active after it has completed the handshake
* sequence with its associated Replica.
*
* Note that the SizeAwaitMap will only wait for feeders that are connected
* to electable replicas, since those are the only ones participating in
* durability decisions.
*/
private final SizeAwaitMap<String, Feeder> activeFeeders;
/**
* The number of feeders currently being managed by the FeederManager. This
* includes feeders that are in the process of starting up or shutting down.
* The count is incremented when a Feeder is first created and is decremented
* after all cleanup associated with the Feeder has been completed.
*/
private final AtomicInteger managedFeederCount = new AtomicInteger(0);
/*
* The number of active ack feeders feeding electable, i.e. acking, nodes.
*/
private final AtomicInteger ackFeeders = new AtomicInteger(0);
/*
* The number of arbiter feeders; there can only be one currently. It's
* an AtomicInteger for consistency with ackFeeders.
*/
private final AtomicInteger arbiterFeeders = new AtomicInteger(0);
private String arbiterFeederName;
/*
* A test delay introduced in the feeder loop to simulate a loaded master.
* The feeder waits this amount of time after each message is sent.
*/
private int testDelayMs = 0;
/* Set to true to force a shutdown of the FeederManager. */
AtomicBoolean shutdown = new AtomicBoolean(false);
/*
* Non-null if the replication node must be shut down as well. This is
* typically the result of an unexpected exception in the feeder.
*/
private RuntimeException repNodeShutdownException;
/**
* Used to manage the flushing of the DTVLSN via a null TXN commit
* when appropriate.
*/
private final DTVLSNFlusher dtvlsnFlusher;
private final Logger logger;
/* FeederManager statistics. */
private final StatGroup stats;
private final IntStat nFeedersCreated;
private final IntStat nFeedersShutdown;
private final LongDiffMapStat replicaDelayMap;
private final LongAvgMapStat replicaAvgDelayMsMap;
private final LatencyPercentileMapStat replica95DelayMsMap;
private final LatencyPercentileMapStat replica99DelayMsMap;
private final LongMaxMapStat replicaMaxDelayMsMap;
private final AtomicLongMapStat replicaLastCommitTimestampMap;
private final AtomicLongMapStat replicaLastCommitVLSNMap;
private final LongDiffMapStat replicaVLSNLagMap;
private final LongAvgRateMapStat replicaVLSNRateMap;
/* The poll timeout used when accepting feeder connections. */
public final long pollTimeoutMs;
/* Identifies the Feeder Service. */
public static final String FEEDER_SERVICE = "Feeder";
/** The moving average period in milliseconds */
private static final long MOVING_AVG_PERIOD_MILLIS = 10000;
/*
* The total amount of time to wait for all Feeders to exit when the
* FeederManager is shutdown. The value is generously large, since we
* typically expect Feeders to exit rapidly when shutdown. If the
* FeederManager takes longer than this time period to shutdown, it's taken
* to be an indication of a problem and a full thread dump is logged to help
* identify the tardy feeder.
*/
private static final int FEEDER_SHUTDOWN_WAIT_MILLIS = 30000;
/**
* A test hook, parameterized by feeder's Name/ID pair, that delays CBVLSN
* updates if it throws an IllegalStateException.
*/
private static volatile TestHook<NameIdPair> delayCBVLSNUpdateHook;
FeederManager(RepNode repNode) {
this.repNode = repNode;
activeFeeders = new SizeAwaitMap<>(
repNode.getRepImpl(), new MatchElectableFeeders());
logger = LoggerUtils.getLogger(getClass());
stats = new StatGroup(FeederManagerStatDefinition.GROUP_NAME,
FeederManagerStatDefinition.GROUP_DESC);
nFeedersCreated = new IntRunningTotalStat(stats, N_FEEDERS_CREATED);
nFeedersShutdown = new IntRunningTotalStat(stats, N_FEEDERS_SHUTDOWN);
/*
* Treat delays and lags as valid for twice the heartbeat interval, to
* allow for minor networking delays when receiving heartbeats
*/
final long validityMillis = 2 * repNode.getHeartbeatInterval();
replicaDelayMap =
new LongDiffMapStat(stats, REPLICA_DELAY_MAP, validityMillis);
replicaAvgDelayMsMap =
new LongAvgMapStat(stats, REPLICA_AVG_DELAY_MS_MAP);
replica95DelayMsMap = new LatencyPercentileMapStat(
stats, REPLICA_95_DELAY_MS_MAP, 0.95f);
replica99DelayMsMap = new LatencyPercentileMapStat(
stats, REPLICA_99_DELAY_MS_MAP, 0.99f);
replicaMaxDelayMsMap =
new LongMaxMapStat(stats, REPLICA_MAX_DELAY_MS_MAP);
replicaLastCommitTimestampMap =
new AtomicLongMapStat(stats, REPLICA_LAST_COMMIT_TIMESTAMP_MAP);
replicaLastCommitVLSNMap =
new AtomicLongMapStat(stats, REPLICA_LAST_COMMIT_VLSN_MAP);
replicaVLSNLagMap =
new LongDiffMapStat(stats, REPLICA_VLSN_LAG_MAP, validityMillis);
replicaVLSNRateMap = new LongAvgRateMapStat(
stats, REPLICA_VLSN_RATE_MAP, MOVING_AVG_PERIOD_MILLIS, MINUTES);
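/*
* Illustrative note (added commentary, not from the original source):
* replicaVLSNRateMap reports a per-replica moving average of VLSN
* progress in VLSNs/minute, computed over the 10 second
* MOVING_AVG_PERIOD_MILLIS window above. With hypothetical numbers, a
* replica that advances by roughly 500 VLSNs during that window would
* be reported at about 500 * (60000 / 10000) = 3000 VLSNs/minute.
*/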
pollTimeoutMs = repNode.getConfigManager().
getDuration(RepParams.FEEDER_MANAGER_POLL_TIMEOUT);
dtvlsnFlusher = new DTVLSNFlusher();
}
/**
* A SizeAwaitMap predicate that matches feeders connected to electable
* replicas.
*/
private class MatchElectableFeeders implements Predicate<Feeder> {
@Override
public boolean match(final Feeder value) {
/* The replica node might be null during unit testing */
final RepNodeImpl replica = value.getReplicaNode();
return (replica != null) &&
repNode.getDurabilityQuorum().replicaAcksQualify(replica);
}
}
/**
* Returns the statistics associated with the FeederManager.
*
* @return the statistics
*/
public StatGroup getFeederManagerStats(StatsConfig config) {
synchronized (stats) {
return stats.cloneGroup(config.getClear());
}
}
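/*
* Illustrative usage sketch (added commentary; the direct call and the
* "feederManager" variable below are hypothetical). Applications
* normally see these stats indirectly, e.g. through
* ReplicatedEnvironment.getRepStats(...); the direct call is shown only
* to make the clear/clone semantics concrete:
*
*   StatsConfig config = new StatsConfig();
*   config.setClear(true);
*   StatGroup snapshot = feederManager.getFeederManagerStats(config);
*   // "snapshot" holds the current values; the counters were reset.
*/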
/* Get the protocol stats for this FeederManager. */
public StatGroup getProtocolStats(StatsConfig config) {
/* Aggregate stats that have not yet been aggregated. */
StatGroup protocolStats =
new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME,
BinaryProtocolStatDefinition.GROUP_DESC);
synchronized (activeFeeders) {
for (Feeder feeder : activeFeeders.values()) {
protocolStats.addAll(feeder.getProtocolStats(config));
}
}
return protocolStats;
}
/* Reset the stats of this FeederManager and of its active feeders. */
public void resetStats() {
synchronized (stats) {
stats.clear();
}
synchronized (activeFeeders) {
for (Feeder feeder : activeFeeders.values()) {
feeder.resetStats();
}
}
}
/**
* Accumulates statistics from a terminating feeder.
* @param feederStats stats of feeder
*/
void incStats(StatGroup feederStats) {
synchronized (stats) {
stats.addAll(feederStats);
}
}
public int getTestDelayMs() {
return testDelayMs;
}
public void setTestDelayMs(int testDelayMs) {
this.testDelayMs = testDelayMs;
}
/**
* Returns the RepNode associated with the FeederManager.
*
* @return the RepNode
*/
RepNode repNode() {
return repNode;
}
/**
* Returns the Feeder associated with the node, if such a feeder is
* currently active.
*/
public Feeder getFeeder(String nodeName) {
return activeFeeders.get(nodeName);
}
public Feeder getArbiterFeeder() {
synchronized (activeFeeders) {
return activeFeeders.get(arbiterFeederName);
}
}
/*
* For test use only.
*/
public Feeder putFeeder(String nodeName, Feeder feeder) {
/*
* Can't check for an electable node since the feeder object can be
* mocked for testing, in which case it does not have a rep node.
*/
ackFeeders.incrementAndGet();
return activeFeeders.put(nodeName, feeder);
}
LongDiffMapStat getReplicaDelayMap() {
return replicaDelayMap;
}
LongAvgMapStat getReplicaAvgDelayMsMap() {
return replicaAvgDelayMsMap;
}
LatencyPercentileMapStat getReplica95DelayMsMap() {
return replica95DelayMsMap;
}
LatencyPercentileMapStat getReplica99DelayMsMap() {
return replica99DelayMsMap;
}
LongMaxMapStat getReplicaMaxDelayMsMap() {
return replicaMaxDelayMsMap;
}
AtomicLongMapStat getReplicaLastCommitTimestampMap() {
return replicaLastCommitTimestampMap;
}
AtomicLongMapStat getReplicaLastCommitVLSNMap() {
return replicaLastCommitVLSNMap;
}
LongDiffMapStat getReplicaVLSNLagMap() {
return replicaVLSNLagMap;
}
LongAvgRateMapStat getReplicaVLSNRateMap() {
return replicaVLSNRateMap;
}
void setRepNodeShutdownException(RuntimeException rNSE) {
this.repNodeShutdownException = rNSE;
}
/**
* The number of Replicas currently "active" with this feeder. Active
* currently means they are connected. It does not make any guarantees
* about where they are in the replication stream. They may, for example,
* be too far behind to participate in timely acks.
*
* @return the active replica count
*/
public int activeReplicaCount() {
return activeFeeders.size();
}
public int activeAckReplicaCount() {
return ackFeeders.get();
}
public int activeAckArbiterCount() {
return arbiterFeeders.get();
}
/**
* Returns the set of Replicas that are currently active with this feeder.
* A replica is active if it has completed the handshake sequence.
*
* @return the set of replica node names
*/
public Set<String> activeReplicas() {
synchronized (activeFeeders) {
/*
* Create a copy to avoid inadvertent concurrency conflicts,
* since the keySet is a view of the underlying map.
*/
return new HashSet<>(activeFeeders.keySet());
}
}
/**
* Returns the set of replicas and arbiters that are currently
* active with this feeder and are supplying acknowledgments. A replica is
* active if it has completed the handshake sequence. An Arbiter is only
* returned if it's in active arbitration.
*
* @param includeArbiters include active arbiters in the list of returned
* node names if true; exclude arbiters otherwise.
*
* @return the set of replica node names, including active arbiter node
* names if includeArbiters is true
*/
public Set<String> activeAckReplicas(boolean includeArbiters) {
final Set<String> nodeNames = new HashSet<>();
synchronized (activeFeeders) {
for (final Entry<String, Feeder> entry :
activeFeeders.entrySet()) {
final Feeder feeder = entry.getValue();
/* The replica node should be non-null for an active feeder */
final RepNodeImpl replica = feeder.getReplicaNode();
if (!replica.getType().isElectable()) {
continue;
}
if (replica.getType().isArbiter()) {
if (!includeArbiters ||
!feeder.getRepNode().getArbiter().isActive()) {
/* Skip the arbiter. */
continue;
}
}
nodeNames.add(entry.getKey());
}
}
return nodeNames;
}
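/*
* Illustrative example (hypothetical node names, added commentary): if
* the active feeders serve electable replicas "rep1" and "rep2", a
* secondary "sec1", and an arbiter "arb1" that is in active
* arbitration, then activeAckReplicas(false) returns {"rep1", "rep2"}
* and activeAckReplicas(true) also includes "arb1". The secondary is
* never included since it is not electable.
*/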
public Map<String, Feeder> activeReplicasMap() {
synchronized (activeFeeders){
return new HashMap<>(activeFeeders);
}
}
/**
* Transitions a Feeder to being active, so that it can be considered in
* commit acknowledgment decisions and in decisions about choosing
* feeders based on system load.
*
* @param feeder the feeder being transitioned.
*/
void activateFeeder(Feeder feeder) {
synchronized (nascentFeeders) {
synchronized (activeFeeders) {
boolean removed = nascentFeeders.remove(feeder);
if (feeder.isShutdown()) {
return;
}
assert(removed);
String replicaName = feeder.getReplicaNameIdPair().getName();
assert(!feeder.getReplicaNameIdPair().equals(NameIdPair.NULL));
Feeder dup = activeFeeders.get(replicaName);
if ((dup != null) && !dup.isShutdown()) {
throw EnvironmentFailureException.
unexpectedState(repNode.getRepImpl(),
feeder.getReplicaNameIdPair() +
" is present in both nascent and " +
"active feeder sets");
}
activeFeeders.put(replicaName, feeder);
if (feeder.getReplicaNode().getType().isArbiter()) {
assert(arbiterFeeders.get() == 0);
arbiterFeeders.incrementAndGet();
arbiterFeederName = replicaName;
} else if (feeder.getReplicaNode().getType().isElectable()) {
ackFeeders.incrementAndGet();
}
MasterTransfer xfr = repNode.getActiveTransfer();
if (xfr != null) {
xfr.addFeeder(feeder);
}
}
}
}
/**
* Removes the feeder from the sets used to track it. Invoked when a
* feeder is shut down.
*
* @param feeder the feeder being removed
*/
void removeFeeder(Feeder feeder) {
assert(feeder.isShutdown());
final String replicaName = feeder.getReplicaNameIdPair().getName();
synchronized (nascentFeeders) {
synchronized (activeFeeders) {
nascentFeeders.remove(feeder);
if (activeFeeders.remove(replicaName) != null) {
if (arbiterFeederName != null &&
arbiterFeederName.equals(replicaName)) {
arbiterFeeders.decrementAndGet();
arbiterFeederName = null;
} else if (feeder.getReplicaNode().getType().isElectable()) {
ackFeeders.decrementAndGet();
}
}
}
}
final RepNodeImpl node = feeder.getReplicaNode();
if ((node != null) && node.getType().hasTransientId()) {
repNode.removeTransientNode(node);
}
}
/**
* Clears the channel queue and shuts down runFeeders() by inserting a
* special EOF marker value into the queue.
*/
void shutdownQueue() {
if (!repNode.isShutdown()) {
throw EnvironmentFailureException.unexpectedState
("Rep node is still active");
}
channelQueue.clear();
/* Add special entry so that the channelQueue.poll operation exits. */
channelQueue.add(RepUtils.CHANNEL_EOF_MARKER);
}
/**
* The core feeder listener loop that is run either in a Master node, or in
* a Replica that is serving as a Feeder to other Replica nodes. The core
* loop accepts connections from Replicas as they come in and establishes a
* Feeder on that connection.
*
* The loop can be terminated for one of the following reasons:
*
* 1) A change in Masters.
*
* 2) A forced shutdown, via a thread interrupt.
*
* 3) A server socket level exception.
*
* The timeout on the accept is used to ensure that the master status and
* shutdown checks are performed at least once per timeout period.
*/
void runFeeders()
throws DatabaseException {
if (shutdown.get()) {
throw EnvironmentFailureException.unexpectedState
("Feeder manager was shutdown");
}
Exception feederShutdownException = null;
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder manager accepting requests.");
/* Init GlobalCBVLSN using minJEVersion in the rep group DB. */
repNode.globalCBVLSN.init(repNode, repNode.getMinJEVersion());
/* This updater represents the master's local cbvlsn, which the master
updates directly. */
final LocalCBVLSNUpdater updater = new LocalCBVLSNUpdater(
repNode.getNameIdPair(), repNode.getNodeType(), repNode);
final LocalCBVLSNTracker tracker = repNode.getCBVLSNTracker();
try {
/*
* Ensure that the Global CBVLSN is initialized for the master when
* it first comes up; it's subsequently maintained in the loop
* below.
*/
updater.updateForMaster(tracker);
repNode.getServiceDispatcher().
register(FEEDER_SERVICE, channelQueue);
/*
* The Feeder is ready for business, indicate that the node is
* ready by counting down the latch and releasing any waiters.
*/
repNode.getReadyLatch().countDown();
while (true) {
final DataChannel feederReplicaChannel =
channelQueue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
if (feederReplicaChannel == RepUtils.CHANNEL_EOF_MARKER) {
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder manager soft shutdown.");
return;
}
try {
repNode.getMasterStatus().assertSync();
} catch (MasterSyncException mse) {
RepUtils.shutdownChannel(feederReplicaChannel);
throw mse;
}
if (feederReplicaChannel == null) {
if (repNode.isShutdownOrInvalid()) {
/* Timeout and shutdown request */
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder manager forced shutdown.");
return;
}
/*
* Simulate extending the polling period for channel input
* by delaying the CBVLSN update, to allow tests to control
* the timing.
*/
try {
assert TestHookExecute.doHookIfSet(
delayCBVLSNUpdateHook, repNode.getNameIdPair());
} catch (IllegalStateException e) {
continue;
}
/*
* Take this opportunity to update this node's CBVLSN. The
* replicas are sending in their CBVLSNs through the
* heartbeat responses, but a master does not send any
* heartbeat responses, and needs a different path to
* update its local CBVLSN.
*/
updater.updateForMaster(tracker);
/* Flush the DTVLSN if it's warranted. */
dtvlsnFlusher.flush();
/*
* Opportunistically attempt to update minJEVersion.
* This must be done while the feeder is active.
*/
repNode.globalCBVLSN.setDefunctJEVersion(repNode);
continue;
}
final Feeder feeder;
try {
feeder = new Feeder(this, feederReplicaChannel);
} catch (IOException e) {
RepUtils.shutdownChannel(feederReplicaChannel);
continue;
}
nFeedersCreated.increment();
nascentFeeders.add(feeder);
managedFeederCount.incrementAndGet();
feeder.startFeederThreads();
}
} catch (MasterSyncException e) {
LoggerUtils.info(logger, repNode.getRepImpl(),
"Master change: " + e.getMessage());
feederShutdownException = new UnknownMasterException("Node " +
repNode.getRepImpl().getName() +
" is not a master anymore");
} catch (InterruptedException e) {
if (this.repNodeShutdownException != null) {
/*
* The interrupt was issued to propagate an exception from one
* of the Feeder threads. It's not a normal exit.
*/
LoggerUtils.warning(logger, repNode.getRepImpl(),
"Feeder manager unexpected interrupt");
throw repNodeShutdownException; /* Terminate the rep node */
}
if (repNode.isShutdown()) {
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder manager interrupted for shutdown");
return;
}
feederShutdownException = e;
LoggerUtils.warning(logger, repNode.getRepImpl(),
"Feeder manager unexpected interrupt");
} finally {
repNode.resetReadyLatch(feederShutdownException);
repNode.getServiceDispatcher().cancel(FEEDER_SERVICE);
shutdownFeeders(feederShutdownException);
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder manager exited. CurrentTxnEnd VLSN: " +
repNode.getCurrentTxnEndVLSN());
}
}
/**
* Shuts down all the feeders managed by the FeederManager. It waits for
* all Feeder activity to stop before returning.
*
* @param feederShutdownException the exception provoking the shutdown.
*/
private void shutdownFeeders(Exception feederShutdownException) {
boolean changed = shutdown.compareAndSet(false, true);
if (!changed) {
return;
}
try {
/* Copy sets for safe iteration in the presence of deletes.*/
final Set<Feeder> feederSet;
synchronized (nascentFeeders) {
synchronized (activeFeeders) {
feederSet = new HashSet<>(activeFeeders.values());
feederSet.addAll(nascentFeeders);
}
}
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder Manager shutting down feeders." +
" Active and nascent feeders:" + feederSet.size() +
" Managed feeders:" + managedFeederCount.get());
for (Feeder feeder : feederSet) {
nFeedersShutdown.increment();
feeder.shutdown(feederShutdownException);
}
/*
* This extra check below is used to ensure that there is no
* premature return while feeder threads are still running. This
* could happen, for example, in the face of concurrent shutdowns
* from this method and the Feeder Input/Output threads where
* Feeder.shutdown may have removed the Feeder from the set of
* nascent and active feeders as part of the Feeder shutdown, or the
* shutdown() method call above may return immediately due to the
* ongoing shutdown initiated by the Feeder.
*/
boolean shutdownCompleted =
new PollCondition(1, FEEDER_SHUTDOWN_WAIT_MILLIS) {
@Override
protected boolean condition() {
return managedFeederCount.get() == 0;
}
}.await();
if (!shutdownCompleted) {
final String msg = this +
" Feeder Manager shutdown failed to quiesce. Feeder count:" +
managedFeederCount.get() + " after waiting for " +
FEEDER_SHUTDOWN_WAIT_MILLIS + " ms.";
LoggerUtils.severe(logger, repNode.getRepImpl(), msg);
LoggerUtils.fullThreadDump(logger, repNode.getRepImpl(),
Level.SEVERE);
throw new IllegalStateException(msg, feederShutdownException);
}
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder Manager completed shutting down " +
"feeders.");
} finally {
if (feederShutdownException == null) {
/*
* Release any threads that may have been waiting, but don't
* throw any exception
*/
activeFeeders.clear(null);
} else {
activeFeeders.clear(feederShutdownException);
}
nascentFeeders.clear();
}
}
/**
* Shuts down a specific feeder. It's typically done in response to the
* removal of a member from the group.
*/
public void shutdownFeeder(RepNodeImpl node) {
Feeder feeder = activeFeeders.get(node.getName());
if (feeder == null) {
return;
}
nFeedersShutdown.increment();
feeder.shutdown(null);
}
/**
* Block until the required number of electable feeders/replica connections
* are established. Used for establishing durability quorums. Since this is
* counting feeder/replica connections, requiredReplicaCount does not
* include the master.
*
* In the future this could be improved by also taking into account the position
* of each feeder. If a feeder is lagging far behind the master and this
* is likely to prevent commit, we may want to reject the transaction at
* the outset to reduce the number of wasted txns/aborts. A special case
* is when the replica is in out-of-disk mode and not acking at all.
*/
boolean awaitFeederReplicaConnections(
int requiredReplicaCount, long insufficientReplicasTimeout)
throws InterruptedException {
return activeFeeders.sizeAwait(requiredReplicaCount,
insufficientReplicasTimeout,
TimeUnit.MILLISECONDS);
}
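/*
* Illustrative usage sketch (hypothetical values, added commentary):
* with a SIMPLE_MAJORITY ack policy in a five-node electable group, a
* commit needs acks from two replicas besides the master, so a caller
* might wait for those connections before attempting the commit:
*
*   if (!awaitFeederReplicaConnections(2, 10000)) {
*       // timed out after 10 seconds without enough connections
*   }
*/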
/*
* For debugging help, and for expanded exception messages, dump feeder
* related state. If acksOnly is true, only include information about
* feeders for replicas that supply acknowledgments.
*/
public String dumpState(final boolean acksOnly) {
StringBuilder sb = new StringBuilder();
synchronized (activeFeeders) {
Set<Map.Entry<String, Feeder>> feeds = activeFeeders.entrySet();
if (feeds.size() == 0) {
sb.append("No feeders.");
} else {
sb.append("Current feeds:");
for (Map.Entry<String, Feeder> feedEntry : feeds) {
final Feeder feeder = feedEntry.getValue();
/*
* Ignore secondary and external nodes if we only want nodes
* that provide acknowledgments.
*/
if (acksOnly) {
final NodeType nodeType =
feeder.getReplicaNode().getType();
if (nodeType.isSecondary() || nodeType.isExternal()) {
continue;
}
}
sb.append("\n ").append(feedEntry.getKey()).append(": ");
sb.append(feeder.dumpState());
}
}
}
return sb.toString();
}
/**
* Returns a count of the number of feeders whose replicas are counted in
* durability decisions and have acknowledged txn-end VLSNs {@literal >=}
* the commitVLSN argument.
*
* @param commitVLSN the commitVLSN being checked
*/
public int getNumCurrentAckFeeders(VLSN commitVLSN) {
final DurabilityQuorum durabilityQuorum =
repNode.getDurabilityQuorum();
int count = 0;
synchronized (activeFeeders) {
for (Feeder feeder : activeFeeders.values()) {
if ((commitVLSN.compareTo(feeder.getReplicaTxnEndVLSN()) <= 0)
&& durabilityQuorum.replicaAcksQualify(
feeder.getReplicaNode())) {
count++;
}
}
return count;
}
}
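/*
* Worked example (hypothetical VLSNs, added commentary): for a
* commitVLSN of 100 and qualifying replicas whose acknowledged txn-end
* VLSNs are 99, 100 and 102, the method returns 2, since only the
* replicas at 100 and 102 have reached or passed the commit VLSN.
*/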
/**
* Update the Master's DTVLSN if we can conclude based upon the state of
* the replicas that the DTVLSN needs to be advanced.
*
* This method is invoked when a replica heartbeat reports a more recent
* txn VLSN. This (sometimes) redundant form of DTVLSN update is useful in
* circumstances when the value could not be maintained via the usual ack
* response processing:
*
* 1) The application is explicitly using no-ack transactions.
*
* 2) There were ack transaction timeouts due to network problems and the
* acks were never received or were received after the timeout had expired.
*/
public void updateDTVLSN(long heartbeatVLSN) {
final long currDTVLSN = repNode.getDTVLSN();
if (heartbeatVLSN <= currDTVLSN) {
/* Nothing to update, a lagging replica that's catching up */
return;
}
final DurabilityQuorum durabilityQuorum = repNode.getDurabilityQuorum();
final int durableAckCount = durabilityQuorum.
getCurrentRequiredAckCount(ReplicaAckPolicy.SIMPLE_MAJORITY);
long min = Long.MAX_VALUE;
synchronized (activeFeeders) {
int ackCount = 0;
for (Feeder feeder : activeFeeders.values()) {
if (!durabilityQuorum.
replicaAcksQualify(feeder.getReplicaNode())) {
continue;
}
final long replicaTxnVLSN =
feeder.getReplicaTxnEndVLSN().getSequence();
if (replicaTxnVLSN <= currDTVLSN) {
continue;
}
if (replicaTxnVLSN < min) {
min = replicaTxnVLSN;
}
if (++ackCount >= durableAckCount) {
/*
* If a majority of replicas have vlsns >= durable txn
* vlsn, advance the DTVLSN.
*/
repNode.updateDTVLSN(min);
return;
}
}
/* DTVLSN unchanged. */
return;
}
}
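/*
* Worked example (hypothetical values, added commentary): suppose the
* current DTVLSN is 100, the required ack count for a simple majority
* is 2, and the qualifying replicas report txn-end VLSNs of 150, 140
* and 90. The value 90 is skipped since it does not exceed the current
* DTVLSN; once two qualifying values have been seen, the smaller of
* them, 140, becomes the new DTVLSN, since enough replicas (in addition
* to the master) are known to have reached at least that VLSN.
*/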
/**
* Set a test hook, parameterized by feeder's Name/ID pair, that delays
* CBVLSN updates if it throws an IllegalStateException.
*/
public static void setDelayCBVLSNUpdateHook(TestHook<NameIdPair> hook) {
delayCBVLSNUpdateHook = hook;
}
/**
* Convenience constant used by the DTVLSN flusher when committing the null
* transaction.
*/
private static TransactionConfig NULL_TXN_CONFIG = new TransactionConfig();
static {
NULL_TXN_CONFIG.setDurability(new Durability(SyncPolicy.WRITE_NO_SYNC,
SyncPolicy.WRITE_NO_SYNC,
ReplicaAckPolicy.NONE));
}
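/*
* Note (added commentary, not from the original source): the relaxed
* durability above appears intentional. The null txn makes no data
* changes and exists only so that its commit record carries the current
* DTVLSN into the log, so it does not itself require replica
* acknowledgments or a sync.
*/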
/**
* Writes a null (no modifications) commit record when it detects that the
* DTVLSN is ahead of the persistent DTVLSN and needs to be updated.
*
* Note that without this mechanism, the in-memory DTVLSN would always be
* ahead of the persisted VLSN, since in general DTVLSN(vlsn) < vlsn. That
* is, the commit or abort log record containing the DTVLSN always has a
* more recent VLSN than the one it contains.
*/
private class DTVLSNFlusher {
/**
* The number of feeder ticks for which the in-memory DTVLSN must be
* stable before it's written to the log as a null TXN. We are using
* this "tick" indirection to avoid yet another call to the clock. A
* "tick" in this context is the FEEDER_MANAGER_POLL_TIMEOUT.
*/
final int targetStableTicks;
/**
* The number of ticks for which the DTVLSN has been stable.
*/
private int stableTicks = 0;
public DTVLSNFlusher() {
final int heartbeatMs = repNode.getConfigManager().
getInt(RepParams.HEARTBEAT_INTERVAL);
targetStableTicks =
(int) Math.max(1, (2 * heartbeatMs) / pollTimeoutMs);
}
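/*
* Worked example (hypothetical configuration values, added commentary):
* with a 1000 ms HEARTBEAT_INTERVAL and a 250 ms
* FEEDER_MANAGER_POLL_TIMEOUT, targetStableTicks =
* max(1, (2 * 1000) / 250) = 8, i.e. the DTVLSN must hold steady for
* roughly two heartbeat intervals' worth of poll ticks before being
* flushed via a null txn. If the poll timeout exceeds twice the
* heartbeat interval, the floor of one tick applies.
*/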
/**
* Used to track whether the DTVLSN has been stable enough to write
* out. While it's changing, application commits and aborts are writing
* it out, so no need to write it here.
*/
private long stableDTVLSN = VLSN.NULL_VLSN_SEQUENCE;
/**
* Updated each time we actually persist the DTVLSN via a null txn. It
* represents the DTVLSN that's been written out.
*/
private long persistedDTVLSN = VLSN.NULL_VLSN_SEQUENCE;
/* The commit VLSN of the null txn that was last used to persist the DTVLSN. */
private long nullTxnVLSN = VLSN.NULL_VLSN_SEQUENCE;
/**
* Persists the DTVLSN if necessary. The DTVLSN is persisted if the
* version in memory is more current than the version on disk and has
* not changed for targetStableTicks.
*/
void flush() {
final long dtvlsn = repNode.getDTVLSN();
if (dtvlsn == nullTxnVLSN) {
/* Don't save VLSN from null transaction as DTVLSN */
return;
}
if (dtvlsn > stableDTVLSN) {
stableTicks = 0;
stableDTVLSN = dtvlsn;
/* The durable DTVLSN is being actively updated. */
return;
}
if (dtvlsn < stableDTVLSN) {
/* Enforce the invariant that the DTVLSN cannot decrease. */
throw new IllegalStateException("The DTVLSN sequence cannot decrease." +
" Current DTVLSN:" + dtvlsn +
" previous DTVLSN:" + stableDTVLSN);
}
/* DTVLSN == stableDTVLSN */
if (++stableTicks <= targetStableTicks) {
/*
* The stable tick counter was just incremented; the DTVLSN has not
* been stable for long enough yet.
*/
return;
}
stableTicks = 0;
/* dtvlsn has been stable */
if (stableDTVLSN > persistedDTVLSN) {
if (repNode.getActiveTransfer() != null) {
/*
* Don't attempt to write a transaction while a transfer is in
* progress, since txns will be blocked.
*/
LoggerUtils.info(logger, repNode.getRepImpl(),
"Skipped null txn updating DTVLSN: " +
dtvlsn + " Master transfer in progress");
return;
}
final RepImpl repImpl = repNode.getRepImpl();
final MasterTxn nullTxn =
MasterTxn.createNullTxn(repImpl, NULL_TXN_CONFIG,
repImpl.getNameIdPair());
/*
* We don't want to wait for any reason; if the txn fails,
* we can try again later.
*/
nullTxn.setTxnTimeout(1);
try {
nullTxn.commit();
LoggerUtils.fine(logger, repNode.getRepImpl(),
"Persist DTVLSN: " + dtvlsn +
" at VLSN: " + nullTxn.getCommitVLSN() +
" via null transaction:" + nullTxn.getId());
nullTxnVLSN = nullTxn.getCommitVLSN().getSequence();
persistedDTVLSN = dtvlsn;
stableDTVLSN = persistedDTVLSN;
} catch (Exception e) {
nullTxn.abort();
LoggerUtils.warning(logger, repNode.getRepImpl(),
"Failed to write null txn updating DTVLSN; " +
e.getMessage());
}
}
}
}
/**
* Invoked only after a Feeder has completed its shutdown.
*/
void decrementManagedFeederCount() {
managedFeederCount.getAndDecrement();
}
}