| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| |
| package com.sleepycat.je.rep.impl; |
| |
| import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY; |
| import static com.sleepycat.je.rep.impl.RepParams.NODE_NAME; |
| import static com.sleepycat.je.rep.impl.RepParams.REPLAY_FREE_DISK_PERCENT; |
| import static com.sleepycat.je.rep.impl.RepParams.REPLICA_RECEIVE_BUFFER_SIZE; |
| import static com.sleepycat.je.rep.impl.RepParams.REPSTREAM_OPEN_TIMEOUT; |
| import static com.sleepycat.je.rep.impl.RepParams.TEST_JE_VERSION; |
| import static com.sleepycat.je.rep.impl.RepParams.VLSN_MAX_DIST; |
| import static com.sleepycat.je.rep.impl.RepParams.VLSN_MAX_MAP; |
| import static com.sleepycat.je.rep.impl.RepParams.VLSN_STRIDE; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.logging.Formatter; |
| import java.util.logging.Level; |
| |
| import com.sleepycat.je.CheckpointConfig; |
| import com.sleepycat.je.Database; |
| import com.sleepycat.je.DatabaseConfig; |
| import com.sleepycat.je.DatabaseException; |
| import com.sleepycat.je.DatabaseNotFoundException; |
| import com.sleepycat.je.Durability; |
| import com.sleepycat.je.Durability.ReplicaAckPolicy; |
| import com.sleepycat.je.Durability.SyncPolicy; |
| import com.sleepycat.je.Environment; |
| import com.sleepycat.je.EnvironmentConfig; |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.EnvironmentLockedException; |
| import com.sleepycat.je.EnvironmentNotFoundException; |
| import com.sleepycat.je.JEVersion; |
| import com.sleepycat.je.ProgressListener; |
| import com.sleepycat.je.ReplicaConsistencyPolicy; |
| import com.sleepycat.je.StatsConfig; |
| import com.sleepycat.je.ThreadInterruptedException; |
| import com.sleepycat.je.TransactionConfig; |
| import com.sleepycat.je.TransactionTimeoutException; |
| import com.sleepycat.je.cleaner.ExtinctionScanner; |
| import com.sleepycat.je.dbi.DatabaseId; |
| import com.sleepycat.je.dbi.DatabaseImpl; |
| import com.sleepycat.je.dbi.DbConfigManager; |
| import com.sleepycat.je.dbi.DbTree; |
| import com.sleepycat.je.dbi.DbType; |
| import com.sleepycat.je.dbi.EnvironmentFailureReason; |
| import com.sleepycat.je.dbi.EnvironmentImpl; |
| import com.sleepycat.je.dbi.RepConfigProxy; |
| import com.sleepycat.je.dbi.StartupTracker.Phase; |
| import com.sleepycat.je.dbi.TTL; |
| import com.sleepycat.je.log.LogEntryHeader; |
| import com.sleepycat.je.log.LogEntryType; |
| import com.sleepycat.je.log.LogItem; |
| import com.sleepycat.je.log.entry.LogEntry; |
| import com.sleepycat.je.log.entry.RestoreRequired; |
| import com.sleepycat.je.recovery.RecoveryInfo; |
| import com.sleepycat.je.recovery.VLSNRecoveryProxy; |
| import com.sleepycat.je.rep.DatabasePreemptedException; |
| import com.sleepycat.je.rep.InsufficientAcksException; |
| import com.sleepycat.je.rep.InsufficientLogException; |
| import com.sleepycat.je.rep.InsufficientReplicasException; |
| import com.sleepycat.je.rep.LockPreemptedException; |
| import com.sleepycat.je.rep.LogFileRewriteListener; |
| import com.sleepycat.je.rep.LogOverwriteException; |
| import com.sleepycat.je.rep.QuorumPolicy; |
| import com.sleepycat.je.rep.RepInternal; |
| import com.sleepycat.je.rep.RepStatManager; |
| import com.sleepycat.je.rep.ReplicaConsistencyException; |
| import com.sleepycat.je.rep.ReplicaWriteException; |
| import com.sleepycat.je.rep.ReplicatedEnvironment; |
| import com.sleepycat.je.rep.ReplicatedEnvironmentStats; |
| import com.sleepycat.je.rep.ReplicationConfig; |
| import com.sleepycat.je.rep.ReplicationMutableConfig; |
| import com.sleepycat.je.rep.ReplicationNetworkConfig; |
| import com.sleepycat.je.rep.RestartRequiredException; |
| import com.sleepycat.je.rep.RollbackException; |
| import com.sleepycat.je.rep.StateChangeEvent; |
| import com.sleepycat.je.rep.StateChangeListener; |
| import com.sleepycat.je.rep.SyncupProgress; |
| import com.sleepycat.je.rep.UnknownMasterException; |
| import com.sleepycat.je.rep.impl.node.Feeder; |
| import com.sleepycat.je.rep.impl.node.MasterTransfer; |
| import com.sleepycat.je.rep.impl.node.NameIdPair; |
| import com.sleepycat.je.rep.impl.node.NodeState; |
| import com.sleepycat.je.rep.impl.node.RepNode; |
| import com.sleepycat.je.rep.impl.node.Replay; |
| import com.sleepycat.je.rep.impl.node.cbvlsn.LocalCBVLSNUpdater; |
| import com.sleepycat.je.rep.net.DataChannelFactory; |
| import com.sleepycat.je.rep.net.DataChannelFactory.ConnectOptions; |
| import com.sleepycat.je.rep.stream.ArbiterFeederSource; |
| import com.sleepycat.je.rep.stream.FeederFilter; |
| import com.sleepycat.je.rep.stream.FeederReader; |
| import com.sleepycat.je.rep.stream.FeederTxns; |
| import com.sleepycat.je.rep.subscription.StreamAuthenticator; |
| import com.sleepycat.je.rep.txn.MasterThreadLocker; |
| import com.sleepycat.je.rep.txn.MasterTxn; |
| import com.sleepycat.je.rep.txn.ReadonlyTxn; |
| import com.sleepycat.je.rep.txn.ReplayTxn; |
| import com.sleepycat.je.rep.txn.ReplicaThreadLocker; |
| import com.sleepycat.je.rep.utilint.HostPortPair; |
| import com.sleepycat.je.rep.utilint.RepUtils; |
| import com.sleepycat.je.rep.utilint.ReplicationFormatter; |
| import com.sleepycat.je.rep.utilint.StatCaptureRepDefinitions; |
| import com.sleepycat.je.rep.utilint.net.DataChannelFactoryBuilder; |
| import com.sleepycat.je.rep.vlsn.VLSNIndex; |
| import com.sleepycat.je.rep.vlsn.VLSNRange; |
| import com.sleepycat.je.rep.vlsn.VLSNRecoveryTracker; |
| import com.sleepycat.je.statcap.StatManager; |
| import com.sleepycat.je.txn.Locker; |
| import com.sleepycat.je.txn.ThreadLocker; |
| import com.sleepycat.je.txn.Txn; |
| import com.sleepycat.je.txn.TxnEnd; |
| import com.sleepycat.je.txn.VersionedWriteTxnEnd; |
| import com.sleepycat.je.util.DbBackup; |
| import com.sleepycat.je.utilint.BooleanStat; |
| import com.sleepycat.je.utilint.DbLsn; |
| import com.sleepycat.je.utilint.LoggerUtils; |
| import com.sleepycat.je.utilint.StatGroup; |
| import com.sleepycat.je.utilint.StringStat; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| public class RepImpl |
| extends EnvironmentImpl |
| implements RepEnvConfigObserver { |
| |
| private VLSNIndex vlsnIndex; |
| /* VLSNIndexAccess coordinates the closing of the vlsn index */ |
| private final VLSNIndexAccess vlsnIndexAccess = new VLSNIndexAccess(); |
| |
| private final FeederTxns feederTxns; |
| |
| /* |
| * The repNode is only non-null when the replicated environment has joined |
| * a group. It's null otherwise. |
| */ |
| private volatile RepNode repNode; |
| private Replay replay; |
| |
| /* |
| * This is the canonical nameIdPair instance used by the node. The internal |
| * Id part of the pair will be updated when the node actually joins the |
| * group. |
| */ |
| private NameIdPair nameIdPair; |
| |
| private final NodeState nodeState; |
| |
| /* |
| * The clockskew used by this environment in ms. It's only used by testing |
| * to inject clock skew between ReplicatedEnvironments. |
| */ |
| private static int clockSkewMs = 0; |
| |
| /* |
| * A handle to the group database. This handle is initialized lazily when |
| * the contents of the database are first required. It's set to null upon |
| * shutdown. The handle must be initialized lazily because the database is |
| * created by the master, and we only know master identity later. The |
| * RepImpl manages the rep group database, so that the lifetime of the |
| * databaseImpl handle can be managed more easily to mesh with the opening |
| * and closing of the RepImpl. |
| */ |
| private DatabaseImpl groupDbImpl = null; |
| |
| /* The status presents whether this replica is doing rollback. */ |
| private boolean backupProhibited = false; |
| |
| /* |
| * Represents whether this Environment is allowed to convert a |
| * non-replicated Environment to replicated. |
| */ |
| private boolean allowConvert = false; |
| |
| /** Config params for preserving and caching the VLSN. */ |
| private boolean preserveVLSN; |
| private boolean cacheVLSN; |
| |
| /* |
| * True if TTL or record extinction is available. Volatile is not used, |
| * since checking more than once is idempotent. |
| */ |
| private boolean isTTLAvailable = false; |
| private boolean isRecordExtinctionAvailable = false; |
| |
| /* Keep an eye on the ongoing DbBackups. */ |
| private final Set<DbBackup> backups = new HashSet<>(); |
| |
| /* |
| * The list of observers who are notified when a mutable rep param changes. |
| */ |
| private final List<RepEnvConfigObserver> repConfigObservers; |
| |
| /* |
| * Lock used to control access and lazy initialization of groupDbImpl, |
| * ensuring that there is exactly one database made. A mutex is used rather |
| * than synchronization to allow us to probe for contention on the |
| * groupDbImpl. |
| */ |
| private final ReentrantLock groupDbLock = new ReentrantLock(); |
| |
| private int replicaAckTimeout; |
| private int arbiterAckTimeout; |
| private int insufficientReplicasTimeout; |
| private int replayTxnTimeout; |
| private ReplicaConsistencyPolicy defaultConsistencyPolicy; |
| private boolean allowArbiterAck; |
| |
| /* |
| * Arbiters, subscribers and networkBackup use RepImpls which are read |
| * only and have some daemon functionality disabled. |
| */ |
| private boolean isArbiter; |
| private boolean isSubscriber; |
| private boolean isNetworkBackup; |
| |
| /* |
| * NodeStats are currently not public, but we may want to evaluate |
| * and decide if they would be useful, perhaps as a debugging aid. |
| */ |
| private final StatGroup nodeStats; |
| private final BooleanStat hardRecoveryStat; |
| private final StringStat hardRecoveryInfoStat; |
| |
| /* |
| * Used to block transaction commit/abort execution just before completing |
| * a Master Transfer operation. |
| */ |
| private volatile CountDownLatch blockTxnLatch = new CountDownLatch(0); |
| |
| /** |
| * A lock used to coordinate access to {@link #blockTxnLatch}. |
| * <p> |
| * When a Master Transfer operation completes Phase 1, it sets a new {@code |
| * CountDownLatch} in order to block the completion of transactions at the |
| * commit or abort stage. We must avoid having it do so at an awkward |
| * moment. There are two (unrelated) cases: |
| * <ol> |
| * <li>There is a brief period between the time a transaction "awaits" the |
| * latch (in {@code checkBlock()}) and the time it publishes its VLSN. We |
| * must avoid having Master Transfer read its "ultimate goal" VLSN during |
| * that interval. |
| * <li>The Feeder input thread occasionally updates the GroupDB, upon |
| * receiving a Heartbeat response. That happens in a transaction, like any |
| * other, so it could be subject to the normal blockage in Phase 2. But |
| * the Feeder input thread is of course also the thread that we rely on for |
| * making progress towards the goal of Master Transfer; so blocking it is |
| * counterproductive. |
| * </ol> |
| * |
| * @see MasterTransfer |
| * @see ReplicatedEnvironment#transferMaster |
| */ |
| private final ReentrantReadWriteLock blockLatchLock = |
| new ReentrantReadWriteLock(true); |
| |
| /* application listener for syncups. */ |
| private final ProgressListener<SyncupProgress> syncupProgressListener; |
| |
| /* Application callback to be notified before we overwrite log files. */ |
| private final LogFileRewriteListener logRewriteListener; |
| |
| /* Configuration for ServiceDispatcher communication */ |
| private final ReplicationNetworkConfig repNetConfig; |
| |
| /* |
| * Factory for creating channel instances. Not available until |
| * initializeChannelFactory is called. |
| */ |
| private volatile DataChannelFactory channelFactory; |
| |
| /** |
| * Used for testing, to create log files with |
| * VLSN.UNINITIALIZED_VLSN_SEQUENCE as the value for the dtvlsn |
| */ |
| private static boolean simulatePreDTVLSNMaster = false; |
| |
| /* |
| * Used to verify VLSN invariants as they are written to the log on the |
| * master |
| */ |
| private long prevLoggedVLSN = VLSN.NULL_VLSN_SEQUENCE; |
| private long prevLoggedDTVLSN = VLSN.NULL_VLSN_SEQUENCE; |
| |
| /* |
| * The filter transmitted to a Feeder so that records can be filtered at |
| * the source |
| */ |
| private final FeederFilter feederFilter; |
| |
| /** |
| * The feeder authenticator generator is instantiated by the replicated |
| * environment creator. |
| */ |
| private volatile StreamAuthenticator authenticator = null; |
| |
| public RepImpl(File envHome, |
| EnvironmentConfig envConfig, |
| EnvironmentImpl sharedCacheEnv, |
| RepConfigProxy repConfigProxy) |
| throws EnvironmentNotFoundException, EnvironmentLockedException { |
| |
| super(envHome, envConfig, sharedCacheEnv, repConfigProxy); |
| |
| allowConvert = |
| RepInternal.getAllowConvert(((ReplicationConfig) repConfigProxy)); |
| |
| repConfigObservers = new ArrayList<>(); |
| addRepConfigObserver(this); |
| |
| repNetConfig = |
| ((ReplicationConfig)repConfigProxy).getRepNetConfig(); |
| nodeState = new NodeState(nameIdPair, this); |
| |
| if (isArbiter || isSubscriber || isNetworkBackup ) { |
| nodeStats = null; |
| syncupProgressListener = null; |
| logRewriteListener = null; |
| hardRecoveryStat = null; |
| hardRecoveryInfoStat = null; |
| feederTxns = null; |
| feederFilter = null; |
| return; |
| } |
| |
| feederTxns = new FeederTxns(this); |
| replay = new Replay(this, nameIdPair); |
| |
| nodeStats = new StatGroup(RepImplStatDefinition.GROUP_NAME, |
| RepImplStatDefinition.GROUP_DESC); |
| hardRecoveryStat = new BooleanStat(nodeStats, |
| RepImplStatDefinition.HARD_RECOVERY); |
| hardRecoveryInfoStat = |
| new StringStat(nodeStats, RepImplStatDefinition.HARD_RECOVERY_INFO, |
| "This node did not incur a hard recovery."); |
| |
| syncupProgressListener = |
| ((ReplicationConfig)repConfigProxy).getSyncupProgressListener(); |
| logRewriteListener = |
| ((ReplicationConfig)repConfigProxy).getLogFileRewriteListener(); |
| |
| feederFilter = |
| ((ReplicationConfig)repConfigProxy).getFeederFilter(); |
| |
| authenticator = ((ReplicationConfig)repConfigProxy).getAuthenticator(); |
| } |
| |
| /** |
| * Called by the EnvironmentImpl constructor. Some rep params, |
| * preserveVLSN for example, are accessed by the EnvironmentImpl via |
| * methods (getPreserveVLSN for example), so they need to be initialized |
| * early. |
| */ |
| @Override |
| protected void initConfigParams(EnvironmentConfig envConfig, |
| RepConfigProxy repConfigProxy) { |
| |
| /* Init standalone config params first. */ |
| super.initConfigParams(envConfig, repConfigProxy); |
| |
| /* Init rep config params. */ |
| replicaAckTimeout = |
| configManager.getDuration(RepParams.REPLICA_ACK_TIMEOUT); |
| insufficientReplicasTimeout = |
| configManager.getDuration(RepParams.INSUFFICIENT_REPLICAS_TIMEOUT); |
| replayTxnTimeout = |
| configManager.getDuration(RepParams.REPLAY_TXN_LOCK_TIMEOUT); |
| defaultConsistencyPolicy = RepUtils.getReplicaConsistencyPolicy |
| (configManager.get(RepParams.CONSISTENCY_POLICY)); |
| preserveVLSN = |
| configManager.getBoolean(RepParams.PRESERVE_RECORD_VERSION); |
| cacheVLSN = |
| configManager.getBoolean(RepParams.CACHE_RECORD_VERSION); |
| allowArbiterAck = |
| configManager.getBoolean(RepParams.ALLOW_ARBITER_ACK); |
| isArbiter = |
| configManager.getBoolean(RepParams.ARBITER_USE); |
| isSubscriber = |
| configManager.getBoolean(RepParams.SUBSCRIBER_USE); |
| isNetworkBackup = |
| configManager.getBoolean(RepParams.NETWORKBACKUP_USE); |
| arbiterAckTimeout = |
| configManager.getDuration(RepParams.ARBITER_ACK_TIMEOUT); |
| } |
| |
| @Override |
| protected Formatter initFormatter() { |
| |
| /* |
| * The nameIdPair field is assigned here rather than in the constructor |
| * because of base class/subclass dependencies. initFormatter() is |
| * called by the base class constructor, and nameIdPair must be |
| * available at that time. |
| */ |
| nameIdPair = new NameIdPair(configManager.get(NODE_NAME)); |
| return new ReplicationFormatter(nameIdPair); |
| } |
| |
| @Override |
| public String getMonitorClassName() { |
| return "com.sleepycat.je.rep.jmx.RepJEMonitor"; |
| } |
| |
| @Override |
| public String getDiagnosticsClassName() { |
| return "com.sleepycat.je.rep.jmx.RepJEDiagnostics"; |
| } |
| |
| @Override |
| protected DbConfigManager |
| initConfigManager(EnvironmentConfig envConfig, |
| RepConfigProxy repConfigProxy) { |
| return new RepConfigManager(envConfig, repConfigProxy); |
| } |
| |
| @Override |
| public boolean getAllowRepConvert() { |
| return allowConvert; |
| } |
| |
| @Override |
| protected DbConfigManager resetConfigManager(EnvironmentConfig newConfig) { |
| /* Save all the replication related properties. */ |
| RepConfigManager repConfigManager = (RepConfigManager) configManager; |
| ReplicationConfig repConfig = repConfigManager.makeReplicationConfig(); |
| return new RepConfigManager(newConfig, repConfig); |
| } |
| |
| public ReplicationConfig cloneRepConfig() { |
| RepConfigManager repConfigManager = (RepConfigManager) configManager; |
| return repConfigManager.makeReplicationConfig(); |
| } |
| |
| /* Make an ReplicatedEnvironment handle for this RepImpl. */ |
| public ReplicatedEnvironment makeEnvironment() { |
| return new ReplicatedEnvironment(getEnvironmentHome(), |
| cloneRepConfig(), |
| cloneConfig()); |
| } |
| |
| public ReplicationMutableConfig cloneRepMutableConfig() { |
| RepConfigManager repConfigManager = (RepConfigManager) configManager; |
| return repConfigManager.makeReplicationConfig(); |
| } |
| |
| public void setRepMutableConfig(ReplicationMutableConfig config) |
| throws DatabaseException { |
| |
| /* Clone the current config. */ |
| RepConfigManager repConfigManager = (RepConfigManager) configManager; |
| ReplicationConfig newConfig = repConfigManager.makeReplicationConfig(); |
| |
| /* Copy in the mutable props. */ |
| config.copyMutablePropsTo(newConfig); |
| repConfigManager = new RepConfigManager |
| (configManager.getEnvironmentConfig(), newConfig); |
| |
| /* |
| * Update the current config and notify observers. The config manager |
| * is replaced with a new instance that uses the new configuration. |
| * This avoids synchronization issues: other threads that have a |
| * reference to the old configuration object are not impacted. |
| * |
| * Notify listeners in reverse order of registration so that the |
| * environment listener is notified last and can start daemon threads |
| * after they are configured. |
| */ |
| for (int i = repConfigObservers.size() - 1; i >= 0; i -= 1) { |
| RepEnvConfigObserver o = repConfigObservers.get(i); |
| o.repEnvConfigUpdate(repConfigManager, newConfig); |
| } |
| } |
| |
| @Override |
| public void repEnvConfigUpdate(RepConfigManager configMgr, |
| ReplicationMutableConfig newConfig) |
| throws DatabaseException { |
| |
| allowArbiterAck = |
| configMgr.getBoolean(RepParams.ALLOW_ARBITER_ACK); |
| |
| if (repNode == null) { |
| return; |
| } |
| |
| repNode.getArbiter().processConfigChange(newConfig); |
| |
| repNode.getElectionQuorum().setElectableGroupSizeOverride |
| (newConfig.getElectableGroupSizeOverride()); |
| |
| /* Account for mutation of deprecated HA LogFlusher params. */ |
| getLogFlusher().configFlushTask(configMgr); |
| |
| repNode.getReplica().getDbCache().setConfig(configMgr); |
| } |
| |
| public synchronized void addRepConfigObserver(RepEnvConfigObserver o) { |
| repConfigObservers.add(o); |
| } |
| |
| /** |
| * The VLSNIndex must be created, merged and flushed before the recovery |
| * checkpoint. This method should be called even if there is no recovery |
| * checkpoint, because it sets up needed data structures. |
| * |
| * On the face of it, it seems that one could flush the VLSNIndex cache |
| * after the recovery checkpoint, before the Replicator constructor returns |
| * and before any user level HA operations can start. That's not sufficient |
| * because the recovery checkpoint is shortening the recovery interval for |
| * future recoveries, and any information that has been garnered must be |
| * persisted. Here's an example of what might happen after a series of |
| * recoveries if we fail to flush VLSNIndex as part of the recovery |
| * checkpoint: |
| * |
| * Environment recovers for first time, brand new environment |
| * recovery did not find any VLSNs in log, because log is brand new |
| * recovery logs ckpt 1start |
| * recovery logs ckpt 1 end |
| * |
| * VLSN 1 logged |
| * VLSN 2 logged |
| * VLSN 3 logged |
| * |
| * crash .... Environment recovers |
| * recovery crawls log from ckpt 1 start onward, finds VLSNs 1-3 |
| * recovery logs ckpt 2 start |
| * recovery logs ckpt 2 end |
| * VLSN index instantiated, VLSNs 1-3 added in but not written too disk |
| * |
| * crash ... Environment recovers |
| * recovery crawls log from ckpt start 2 start onward, finds no VLSNs. |
| * |
| * Instead, the flushed VLSN has to be logged before the checkpoint end |
| * record that is used for the next recovery. |
| */ |
| @Override |
| public void preRecoveryCheckpointInit(RecoveryInfo recoveryInfo) { |
| |
| int stride = configManager.getInt(VLSN_STRIDE); |
| int maxMappings = configManager.getInt(VLSN_MAX_MAP); |
| int maxDist = configManager.getInt(VLSN_MAX_DIST); |
| |
| /* |
| * Our local nameIdPair field isn't set yet because we haven't finished |
| * our initialization, so get it from the config manager. |
| */ |
| NameIdPair useNameIdPair = |
| new NameIdPair(configManager.get(NODE_NAME)); |
| |
| vlsnIndex = new VLSNIndex(this, DbType.VLSN_MAP.getInternalName(), |
| useNameIdPair, stride, maxMappings, maxDist, |
| recoveryInfo); |
| replay.preRecoveryCheckpointInit(recoveryInfo); |
| } |
| |
| /** |
| * Returns the current state associated with this ReplicatedEnvironment |
| * |
| * @return the externally visible ReplicatedEnvironment state |
| */ |
| public ReplicatedEnvironment.State getState() { |
| return nodeState.getRepEnvState(); |
| } |
| |
| /** |
| * Returns the state change event that transitioned the |
| * ReplicatedEnviroment to its current state. |
| */ |
| public StateChangeEvent getStateChangeEvent() { |
| return nodeState.getStateChangeEvent(); |
| } |
| |
| public NodeState getNodeState() { |
| return nodeState; |
| } |
| |
| /** |
| * Wait for this node to join a replication group and return whether it is |
| * a MASTER or REPLICA. Note that any method that creates or clears the |
| * repNode field must be synchronized. |
| */ |
| public synchronized ReplicatedEnvironment.State |
| joinGroup(ReplicaConsistencyPolicy consistency, |
| QuorumPolicy initialElectionPolicy) |
| throws ReplicaConsistencyException, DatabaseException { |
| |
| startupTracker.start(Phase.TOTAL_JOIN_GROUP); |
| try { |
| if (repNode == null) { |
| repNode = new RepNode(this, replay, nodeState); |
| } |
| |
| return repNode.joinGroup(consistency, initialElectionPolicy); |
| } catch (IOException ioe) { |
| throw EnvironmentFailureException.unexpectedException |
| (this, "Problem attempting to join on " + getSocket(), ioe); |
| } finally { |
| startupTracker.stop(Phase.TOTAL_JOIN_GROUP); |
| } |
| } |
| |
| /** |
| * Initialize the DataChannelFactory in our configuration for use. |
| * This is public to allow access by the ReplicatedEnvironment constructor. |
| * @throws IllegalArgumentException if the ReplicationNetworkConfig |
| * is invalid. |
| */ |
| public void initializeChannelFactory() { |
| if (channelFactory != null) { |
| return; |
| } |
| |
| synchronized (this) { |
| if (channelFactory == null) { |
| channelFactory = |
| DataChannelFactoryBuilder.construct( |
| repNetConfig, |
| DataChannelFactoryBuilder.makeLoggerFactory(this)); |
| } |
| } |
| } |
| |
| @Override |
| protected Environment createInternalEnvironment() { |
| return new InternalReplicatedEnvironment |
| (getEnvironmentHome(), cloneRepConfig(), cloneConfig(), this); |
| } |
| |
| /** |
| * @see EnvironmentImpl#setupClose |
| * Release all replication resources that can be released before the |
| * checkpoint. Note that any method that creates or clears the repNode |
| * field must be called from a synchronized caller. |
| * |
| * Note that the vlsnIndex is closed as a callback, from |
| * postCheckpointPreEnvClose() |
| * @throws DatabaseException |
| * |
| */ |
| @Override |
| protected synchronized void setupClose(PrintWriter errors) |
| throws DatabaseException { |
| |
| if (groupDbImpl != null) { |
| getDbTree().releaseDb(groupDbImpl); |
| groupDbImpl = null; |
| LoggerUtils.fine |
| (envLogger, this, "Group member database shutdown"); |
| } |
| |
| try { |
| if (repNode != null) { |
| repNode.shutdown(); |
| repNode = null; |
| } |
| } catch (InterruptedException e) { |
| appendException(errors, e, "shutting down node " + nameIdPair); |
| } |
| } |
| |
| /** |
| * Close any resources that need to be closed after the closing checkpoint. |
| * Note that since Replay.close closes open transactions, it must be |
| * invoked after the checkpoint has been completed, so that the checkpoint |
| * operation can correctly account for the open transactions. |
| */ |
| @Override |
| protected synchronized void postCheckpointClose(boolean checkpointed) |
| throws DatabaseException { |
| |
| if (replay != null) { |
| replay.close(); |
| replay = null; |
| } |
| |
| vlsnIndexAccess.closeVLSNIndex(checkpointed); |
| } |
| |
| /** |
| * @see EnvironmentImpl#setupClose |
| * |
| * Note: this conversion process will iterate over all user created |
| * databases in the environment, which could be potentially be a costly |
| * affair. However, let's opt for simplicity and defer any optimizations |
| * until we see whether this is an important use case. |
| */ |
| @Override |
| protected void postRecoveryConversion() { |
| |
| super.postRecoveryConversion(); |
| |
| if (needRepConvert) { |
| /* Set NameDb to replicated. */ |
| DatabaseImpl nameDb = null; |
| try { |
| nameDb = dbMapTree.getDb(DbTree.NAME_DB_ID); |
| if (!nameDb.isReplicated()) { |
| nameDb.setIsReplicatedBit(); |
| nameDb.setDirty(); |
| } |
| } finally { |
| if (nameDb != null) { |
| dbMapTree.releaseDb(nameDb); |
| } |
| } |
| |
| /* Set user defined databases to replicated. */ |
| Map<DatabaseId, String> idNameMap = dbMapTree.getDbNamesAndIds(); |
| for (DatabaseId id : idNameMap.keySet()) { |
| DatabaseImpl db = null; |
| try { |
| db = dbMapTree.getDb(id); |
| if (db != null && |
| !DbTree.isReservedDbName(idNameMap.get(id))) { |
| |
| db.setIsReplicatedBit(); |
| db.setDirty(); |
| } |
| } finally { |
| if (db != null) { |
| dbMapTree.releaseDb(db); |
| } |
| } |
| } |
| |
| /* |
| * Do a checkpointer to flush dirty datbaseImpls that are converted |
| * to replicated and write the current VLSNRange to the log. |
| */ |
| CheckpointConfig ckptConfig = new CheckpointConfig(); |
| ckptConfig.setForce(true); |
| ckptConfig.setMinimizeRecoveryTime(true); |
| invokeCheckpoint(ckptConfig, "Environment conversion"); |
| } |
| } |
| |
| /* |
| * Close enough resources to support reopening the environment in the same |
| * JVM. |
| * @see EnvironmentImpl#doCloseAfterInvalid() |
| */ |
| @Override |
| public synchronized void doCloseAfterInvalid() { |
| |
| try { |
| /* Release the repNode, in order to release sockets. */ |
| if (repNode != null) { |
| repNode.shutdown(); |
| repNode = null; |
| } |
| } catch (Exception ignore) { |
| } |
| |
| super.doCloseAfterInvalid(); |
| } |
| |
| /** |
| * Used by error handling to forcibly close an environment, and by tests to |
| * close an environment to simulate a crash. Database handles do not have |
| * to be closed before calling this method. A checkpoint is not performed. |
| * The various thread pools will be shutdown abruptly. |
| * |
| * @throws DatabaseException |
| */ |
| @Override |
| public void abnormalClose() |
| throws DatabaseException { |
| |
| /* |
| * Shutdown the daemons, and the checkpointer in particular, before |
| * nulling out the vlsnIndex. |
| */ |
| shutdownDaemons(); |
| |
| try { |
| if (repNode != null) { |
| |
| /* |
| * Don't fire a LeaveGroupEvent if it's an abnormal close, |
| * otherwise an EnvironmentFailureException would be thrown |
| * because daemons of this Environment have been shutdown. |
| */ |
| repNode.getMonitorEventManager().disableLeaveGroupEvent(); |
| repNode.shutdown(); |
| repNode = null; |
| } |
| } catch (InterruptedException ignore) { |
| /* ignore */ |
| } |
| |
| try { |
| vlsnIndexAccess.abnormalCloseVLSNIndex(); |
| } catch (DatabaseException ignore) { |
| /* ignore */ |
| } |
| |
| try { |
| super.abnormalClose(); |
| } catch (DatabaseException ignore) { |
| /* ignore */ |
| } |
| } |
| |
| /** |
| * A replicated log entry has been written on this node. Update the |
| * {@literal VLSN->LSN} mapping. Called outside the log write latch. |
| * @throws DatabaseException |
| */ |
| @Override |
| public void registerVLSN(LogItem logItem) { |
| LogEntryHeader header = logItem.header; |
| VLSN vlsn = header.getVLSN(); |
| |
| /* |
| * Although the very first replicated entry of the system is never a |
| * syncable log entry type, the first GlobalCBVLSN of the system must |
| * start at 1. If we only track the first syncable entry, the |
| * GlobalCBVLSN will start a a value > 1, and replicas that are |
| * starting up from VLSN 1 will be caught in spurious network restores |
| * because VLSN 1 < the GlobalCBVLSN. Therefore treat the VLSN 1 as a |
| * syncable entry for the sake of the GlobalCBVLSN. |
| */ |
| if (LogEntryType.isSyncPoint(header.getType()) || |
| VLSN.FIRST_VLSN.equals(vlsn)) { |
| repNode.trackSyncableVLSN(vlsn, logItem.lsn); |
| } |
| vlsnIndex.put(logItem); |
| } |
| |
| /** |
| * {@literal |
| * Generate the next VLSN and update the DTVLSN value in the item. Note |
| * that this method is only invoked when the node is in the Master state, |
| * since the master assigns new VLSNs and DTVLSNs, and the replicas simply |
| * preserve them. |
| * |
| * The DTVLSN value must be calculated under the same latch as the updating |
| * of the VLSN to ensure that the following invariants are maintained: |
| * |
| * lsn1 > lsn2 ==> VLSN(lsn1) > VLSN(lsn2) |
| * vlsn2 > vlsn1 ==> DTVLSN(vlsn2) >= DTVLSN(vlsn1) |
| * |
| * where vlsn2 and vlsn1 are transaction commit or abort records. |
| * |
| * Replicas, when replaying their stream, verify that this invariant is |
| * maintained. |
| * |
| * Commit/Abort records for Replication groups that have a single electable |
| * and durable node have their dtvlsn written as the associated VLSN, that |
| * is, DTVLSN(vlsn) == vlsn. For all other RG configurations, DTVLSN(vlsn) |
| * < vlsn. |
| * |
| * Commit/Abort Log records that are created by replaying an HA stream from |
| * a pre DTVLSN feeder, will have their dtvlsns set to |
| * VLSN.UNINITIALIZED_VLSN_SEQUENCE during replica replay. They do not |
| * follow this code path. |
| * } |
| * |
| * @param entry the log entry with which the VLSN will be associated. If |
| * the log entry represents a commit or abort entry, its DTVLSN is modified |
| * so that it's correct when it's serialized out. |
| */ |
| @Override |
| public VLSN assignVLSNs(LogEntry entry) { |
| final VLSN vlsn = vlsnIndex.bump(); |
| |
| final byte itemType = entry.getLogType().getTypeNum(); |
| if (itemType != LogEntryType.LOG_TXN_COMMIT.getTypeNum() && |
| itemType != LogEntryType.LOG_TXN_ABORT.getTypeNum()) { |
| return vlsn; |
| } |
| |
| /* |
| * A commit or abort record. Compute the DTVLSN value to associate with |
| * the commit record and store it into the buffer at the appropriate |
| * position. |
| */ |
| final long dtvlsn; |
| |
| if (simulatePreDTVLSNMaster) { |
| dtvlsn = VLSN.UNINITIALIZED_VLSN_SEQUENCE; |
| } else if (repNode.isNeedsAcks()) { |
| /* |
| * Use the dtvlsn value being tracked via acknowledgments from |
| * replicas when replication is being used for durability. |
| */ |
| dtvlsn = getRepNode().getDTVLSN(); |
| } else { |
| /* |
| * Replicated environment, but replication is not being used for |
| * durability. That is, the commit is self-acknowledged, set dtvlsn |
| * == vlsn. |
| */ |
| dtvlsn = getRepNode().updateDTVLSN(vlsn.getSequence()); |
| } |
| |
| final VersionedWriteTxnEnd txnEnd = |
| (VersionedWriteTxnEnd)entry.getMainItem(); |
| |
| /* |
| * As a cheap sanity check, commits/aborts on the master are created |
| * with VLSN.NULL values, so they can be verified here. |
| */ |
| final long checkDTVLSN = txnEnd.getDTVLSN(); |
| if (checkDTVLSN != VLSN.NULL_VLSN_SEQUENCE) { |
| throw new IllegalStateException("NULL DTVLSN expected at VLSN:" + |
| vlsn + " not " + checkDTVLSN); |
| } |
| |
| txnEnd.setDTVLSN(dtvlsn); |
| |
| /* Verify invariant */ |
| if (prevLoggedVLSN > vlsn.getSequence()) { |
| if (dtvlsn < prevLoggedDTVLSN) { |
| String msg = |
| "DTVLSNs must be in ascending order in the stream. " + |
| " prev DTVLSN:" + prevLoggedDTVLSN + |
| " next DTVLSN:" + dtvlsn + " at VLSN: " + vlsn; |
| throw EnvironmentFailureException.unexpectedState(this, msg); |
| } |
| } |
| prevLoggedVLSN = vlsn.getSequence(); |
| prevLoggedDTVLSN = dtvlsn; |
| |
| return vlsn; |
| } |
| |
| /** |
| * Flush any information that needs to go out at checkpoint. Specifically, |
| * write any in-memory {@literal VLSN->LSN} mappings to the VLSNIndex |
| * database so we are guaranteed that the VLSNIndex database will recover |
| * properly. This must be committed with noSync because |
| * - the ensuing checkpoint end record will be logged with an fsync and |
| * will effectively force this out |
| * - it's important to minmize lock contention on the vlsn index and |
| * any fsync done during a checkpoint will be expensive, as there may |
| * be quite a lot to push to disk. We don't want to incur that cost |
| * while holding locks on the vlsn index. [#20702] |
| */ |
| @Override |
| public void preCheckpointEndFlush() |
| throws DatabaseException { |
| |
| if (vlsnIndex != null) { |
| vlsnIndex.flushToDatabase(Durability.COMMIT_NO_SYNC); |
| } |
| } |
| |
| @Override |
| public boolean isMaster() { |
| |
| /* |
| * The volatile repNode field might be modified by joinGroup(), |
| * leaveGroup, or close(), which are synchronized. Keep this method |
| * unsynchronized, assign to a temporary field to guard against a |
| * change. |
| */ |
| RepNode useNode = repNode; |
| if (useNode == null) { |
| return false; |
| } |
| return useNode.isMaster(); |
| } |
| |
| public void setChangeListener(StateChangeListener listener) { |
| StateChangeListener prevListener = nodeState.getChangeListener(); |
| nodeState.setChangeListener(listener); |
| |
| /* |
| * Call back so that it's aware of the last state change event and |
| * the application can initialize itself correctly as a master or |
| * replica. |
| */ |
| final StateChangeEvent stateChangeEvent = |
| nodeState.getStateChangeEvent(); |
| try { |
| /* Invoke application code and handle any app exceptions. */ |
| listener.stateChange(stateChangeEvent); |
| } catch (Exception e) { |
| /* Revert the change. */ |
| nodeState.setChangeListener(prevListener); |
| LoggerUtils.severe |
| (envLogger, this, |
| "State Change listener exception: " + e.getMessage()); |
| /* An application error. */ |
| throw new EnvironmentFailureException |
| (this, EnvironmentFailureReason.LISTENER_EXCEPTION, e); |
| } |
| } |
| |
| public StateChangeListener getChangeListener() { |
| return nodeState.getChangeListener(); |
| } |
| |
| public VLSNIndex getVLSNIndex() { |
| return vlsnIndex; |
| } |
| |
| public FeederTxns getFeederTxns() { |
| return feederTxns; |
| } |
| |
| public ReplicatedEnvironmentStats getStats(StatsConfig config) { |
| return getStats(config, statKey); |
| } |
| |
| @Override |
| public Collection<StatGroup> getRepStatGroups(StatsConfig config, |
| Integer statKey1) { |
| ReplicatedEnvironmentStats res = getStats(config, statKey1); |
| return (res == null) ? null : res.getStatGroups(); |
| } |
| |
| @Override |
| public SortedSet<String> getStatCaptureProjections() { |
| return new StatCaptureRepDefinitions().getStatisticProjections(); |
| } |
| |
| @Override |
| public StatManager createStatManager() { |
| return new RepStatManager(this); |
| } |
| |
| public FeederFilter getFeederFilter() { |
| return feederFilter; |
| } |
| |
| public ReplicatedEnvironmentStats getStatsInternal(StatsConfig config) { |
| if (repNode == null) { |
| return null; |
| } |
| return repNode.getStats(config); |
| } |
| |
| public ReplicatedEnvironmentStats getStats( |
| StatsConfig config, |
| Integer contextKey) { |
| return ((RepStatManager)statManager).getRepStats(config, contextKey); |
| } |
| |
| public Replay getReplay() { |
| return replay; |
| } |
| |
| /** |
| * Ensures that the environment is currently a Master before proceeding |
| * with an operation that requires it to be the master. |
| * |
| * @throws UnknownMasterException if the node is disconnected |
| * @throws ReplicaWriteException if the node is currently a replica |
| */ |
| public void checkIfMaster(Locker locker) |
| throws UnknownMasterException, ReplicaWriteException { |
| |
| final StateChangeEvent event = nodeState.getStateChangeEvent(); |
| |
| switch (nodeState.getRepEnvState()) { |
| case MASTER: |
| break; |
| |
| case REPLICA: |
| throw new ReplicaWriteException(locker, event); |
| |
| case UNKNOWN: |
| throw new UnknownMasterException(locker, event); |
| |
| case DETACHED: |
| throw new UnknownMasterException(locker, event); |
| |
| default: |
| throw EnvironmentFailureException.unexpectedState |
| ("Unexpected state: " + nodeState.getRepEnvState()); |
| } |
| } |
| |
| /** |
| * @return the repNode. May return null. |
| */ |
| public RepNode getRepNode() { |
| return repNode; |
| } |
| |
| /** |
| * Create an appropriate type of ThreadLocker. Specifically, it creates an |
| * MasterThreadLocker if the node is currently a Master, and a |
| * ReplicaThreadLocker otherwise, that is, if the node is a Replica, or |
| * it's currently in a DETACHED state. |
| * |
| * @return an instance of MasterThreadLocker or ReplicaThreadLocker |
| */ |
| @Override |
| public ThreadLocker createRepThreadLocker() { |
| return (isMaster() ? |
| new MasterThreadLocker(this) : |
| new ReplicaThreadLocker(this)); |
| } |
| |
| /** |
| * Create an appropriate type of Replicated transaction. Specifically, |
| * it creates a MasterTxn, if the node is currently a Master, a ReadonlyTxn |
| * otherwise, that is, if the node is a Replica, or it's currently in a |
| * DETACHED state. |
| * |
| * Note that a ReplicaTxn, used for transaction replay on a Replica is not |
| * created on this path. It's created explicitly in the Replay loop by a |
| * Replica. |
| * |
| * @param config the transaction configuration |
| * |
| * @return an instance of MasterTxn or ReadonlyTxn |
| * @throws DatabaseException |
| */ |
| @Override |
| public Txn createRepUserTxn(TransactionConfig config) |
| throws DatabaseException { |
| |
| return (isMaster() && |
| !config.getReadOnly() && |
| !config.getLocalWrite()) ? |
| MasterTxn.create(this, config, nameIdPair) : |
| new ReadonlyTxn(this, config); |
| } |
| |
| /** |
| * Ensure that a sufficient number of feeders are available before |
| * proceeding with a master transaction begin. |
| * |
| * @param txn the master transaction being initiated. |
| * |
| * @throws InterruptedException |
| * @throws DatabaseException if there were insufficient Replicas after the |
| * timeout period. |
| */ |
| public void txnBeginHook(MasterTxn txn) |
| throws InterruptedException, |
| DatabaseException { |
| |
| checkIfInvalid(); |
| final long txnTimeout = txn.getTxnTimeout(); |
| int timeout = insufficientReplicasTimeout; |
| |
| if ((txnTimeout != 0) && (txnTimeout < insufficientReplicasTimeout)) { |
| timeout = (int) txnTimeout; |
| } |
| |
| repNode.getDurabilityQuorum().ensureReplicasForCommit(txn, timeout); |
| } |
| |
| /** |
| * Installs the commit-blocking latch that is used to halt the commit/abort |
| * of transactions, in the final phase of a master transfer. |
| * |
| * @see #updateCBVLSN(LocalCBVLSNUpdater) |
| */ |
| public void blockTxnCompletion(CountDownLatch blocker) |
| throws InterruptedException { |
| |
| ReentrantReadWriteLock.WriteLock lock = blockLatchLock.writeLock(); |
| lock.lockInterruptibly(); |
| try { |
| blockTxnLatch = blocker; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * If the GlobalCBVLSN is not defunct, updates the CBVLSN on behalf of a |
| * Feeder input thread (or FeederManager running in the RepNode thread). |
| * If the GlobalCBVLSN is defunct, does nothing. |
| * <p> |
| * Does the update while avoiding the possibility that any resulting |
| * GroupDB update may get blocked behind the final phase of a master |
| * transfer. |
| * <p> |
| * We skip the update if we're at the point of blocking new transactions |
| * for a master transfer. And we use a read/write lock in order to be able |
| * to examine that state safely. |
| */ |
| public void updateCBVLSN(LocalCBVLSNUpdater updater) { |
| |
| if (repNode.isGlobalCBVLSNDefunct()) { |
| return; |
| } |
| |
| ReentrantReadWriteLock.ReadLock lock = blockLatchLock.readLock(); |
| lock.lock(); |
| try { |
| if (blockTxnLatch.getCount() > 0) { |
| return; |
| } |
| updater.update(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Releases the transaction block latch. |
| */ |
| public void unblockTxnCompletion() { |
| LoggerUtils.info(envLogger, this, "Releasing commit block latch"); |
| blockTxnLatch.countDown(); |
| } |
| |
| /** |
| * This hook is used primarily to perform the final checks before allowing |
| * the commit operation to proceed. The following checks are performed |
| * here: |
| * |
| * 1) Check for master |
| * 2) Check for sufficient Feeder connections to ensure that the commit |
| * policy could be implemented. There is no guarantee that they will all |
| * ack the commit request. |
| * |
| * The method also associates a latch with the transaction. The latch is |
| * used to delay the commit operation until a sufficient number of commits |
| * have been received. |
| * |
| * In addition, when mastership transfers are done, and this node is the |
| * original master, commits and aborts are blocked so as to avoid hard |
| * recovery after electing a new master, see [#18081]. |
| * |
| * @param txn the master transaction being committed |
| * |
| * @throws InsufficientReplicasException if the feeder is not in contact |
| * with enough replicas. |
| * @throws RestartRequiredException if the environment is invalid. |
| * @throws UnknownMasterException if the current master is unknown. |
| * @throws ReplicaWriteException if the node transitioned to a Replica |
| * after the transaction was initiated. |
| */ |
| public void preLogCommitHook(MasterTxn txn) |
| throws InsufficientReplicasException, |
| RestartRequiredException, |
| UnknownMasterException, |
| ReplicaWriteException, |
| EnvironmentFailureException { |
| |
| checkIfInvalid(); |
| checkIfMaster(txn); |
| checkBlock(txn); |
| |
| /* Still a master, check for a sufficient number of connections */ |
| int activeReplicaCount = |
| repNode.feederManager().activeAckReplicaCount(); |
| ReplicaAckPolicy ackPolicy = |
| txn.getCommitDurability().getReplicaAck(); |
| int requiredAckCount = txn.getRequiredAckCount(); |
| |
| if (envLogger.isLoggable(Level.FINE)) { |
| LoggerUtils.fine(envLogger, this, |
| "Txn " + txn.getId() + " requires: " + |
| requiredAckCount + " active: " + |
| activeReplicaCount + |
| " replica acks. Commit Policy: " + ackPolicy); |
| } |
| |
| if (requiredAckCount > activeReplicaCount) { |
| /* Check for possible activation of Primary */ |
| if (ackPolicy.equals(ReplicaAckPolicy.SIMPLE_MAJORITY) && |
| repNode.getArbiter().activateArbitration()) { |
| txn.resetRequiredAckCount(); |
| } else if (useArbiter(txn)) { |
| /* |
| * Note we could change the check to allow a degraded |
| * write from any group size. Limit is place at rep group |
| * size of two due to the priority of requirements |
| * and lower cost of testing. |
| */ |
| txn.setArbiterAck(true); |
| } else { |
| /* |
| * Capture the set to ensure it's consistent with the exception |
| * message. |
| */ |
| final boolean includeArbiters = |
| !ackPolicy.equals(ReplicaAckPolicy.ALL); |
| final Set<String> activeAckRepSet = |
| repNode.feederManager().activeAckReplicas(includeArbiters); |
| |
| if (requiredAckCount > activeAckRepSet.size()) { |
| /* No change in window, throw exception */ |
| InsufficientReplicasException ire = |
| new InsufficientReplicasException |
| (txn, ackPolicy, requiredAckCount, activeAckRepSet); |
| LoggerUtils.info(envLogger, this, ire.getMessage()); |
| throw ire; |
| } |
| |
| /** |
| * A new replica became active in the window between the |
| * first fast check and the second slower check, just continue |
| */ |
| } |
| } |
| feederTxns.setupForAcks(txn); |
| } |
| |
| /* |
| * Block transaction commits/aborts if this node is the original master |
| * and we're doing Master Transfer. |
| */ |
| private void checkBlock(MasterTxn txn) { |
| try { |
| |
| /* |
| * Lock out the setting of the block latch by Master Transfer in |
| * the interval between waiting on the latch and setting the VLSN |
| * for the commit: Master Transfer needs to get a coherent idea of |
| * the final VLSN when it sets the latch. This lock will be |
| * released by the {@code postLogXxxHook()} functions, one of which |
| * is guaranteed to be called, unless an Environment-invalidating |
| * exception occurs. |
| */ |
| if (txn.lockOnce()) { |
| blockLatchLock.readLock().lockInterruptibly(); |
| } |
| |
| if (blockTxnLatch.getCount() > 0) { |
| LoggerUtils.info(envLogger, this, |
| "Block transaction: " + txn.getId() + |
| " pending master transfer. Write locks = " + |
| txn.getWriteLockIds()); |
| } |
| |
| final long txnTimeout = txn.getTxnTimeout(); |
| if (txnTimeout <= 0) { |
| blockTxnLatch.await(); |
| } else if (! blockTxnLatch.await(txnTimeout, |
| TimeUnit.MILLISECONDS)) { |
| |
| final String message = |
| "Timed out waiting for master transfer. " + |
| "Configured transaction timeout:" + txnTimeout + "ms"; |
| |
| throw new TransactionTimeoutException(txn, message); |
| } |
| |
| checkIfInvalid(); |
| |
| /* |
| * Check again, after the block! The block may be a result of a |
| * master->replica transfer, and if this node transitions from |
| * master to replica, this node will be disqualified from being |
| * able to commit transactions. |
| */ |
| checkIfMaster(txn); |
| |
| } catch (InterruptedException e) { |
| throw new ThreadInterruptedException(this, e); |
| } |
| } |
| |
| /** |
| * It ensures that the feeder obtains the requisite number of |
| * acknowledgments required for a successful commit. |
| * |
| * @param txn The MasterTxn that was committed locally. |
| * |
| * @throws InterruptedException if the thread was interrupted while |
| * waiting for acknowledgments. |
| * @throws InsufficientAcksException if the master received an insufficient |
| * number of commit acknowledgments within the replica commit timeout |
| * period. |
| * @throws EnvironmentFailureException |
| */ |
| public void postLogCommitHook(MasterTxn txn, LogItem commitItem) |
| throws InsufficientAcksException, |
| InterruptedException, |
| EnvironmentFailureException { |
| final long txnTimeout = txn.getTxnTimeout(); |
| int timeout = replicaAckTimeout; |
| |
| if ((txnTimeout != 0) && (txnTimeout < replicaAckTimeout)) { |
| timeout = (int) txnTimeout; |
| } |
| postLogCommitHookInternal(txn, commitItem, timeout); |
| } |
| |
| |
| private void postLogCommitHookInternal(MasterTxn txn, LogItem commitItem, |
| int ackTimeout) |
| throws InsufficientAcksException, |
| InterruptedException, |
| EnvironmentFailureException { |
| if (txn.unlockOnce()) { |
| blockLatchLock.readLock().unlock(); |
| } |
| |
| if (!isValid()) { |
| final int currentRequiredAckCount = repNode.getDurabilityQuorum(). |
| getCurrentRequiredAckCount(txn.getCommitDurability(). |
| getReplicaAck()); |
| if (currentRequiredAckCount > 0) { |
| /* Throw a more actionable and accurate exception than EFE */ |
| final String msg = |
| "Acks could not be obtained because the environment" + |
| "was invalidated"; |
| LoggerUtils.info(envLogger, this, msg); |
| throw new InsufficientAcksException(msg); |
| } |
| /* No acks are required, transaction is complete. */ |
| return; |
| } |
| |
| /* Check if using Arbiter for transaction. */ |
| if (txn.getArbiterAck()) { |
| // get the arbiter acker source and add txn id to its queue. |
| Feeder arbFeeder = repNode.feederManager().getArbiterFeeder(); |
| if (arbFeeder != null) { |
| ArbiterFeederSource as = arbFeeder.getArbiterFeederSource(); |
| as.addCommit(commitItem); |
| } |
| } |
| |
| /* Don't do master check, the transaction has already been committed */ |
| try { |
| feederTxns.awaitReplicaAcks(txn, ackTimeout); |
| } catch (InsufficientAcksException e) { |
| if (txn.getArbiterAck() == false && useArbiter(txn)) { |
| txn.setArbiterAck(true); |
| postLogCommitHookInternal(txn, commitItem, arbiterAckTimeout); |
| return; |
| } |
| LoggerUtils.info(envLogger, this, e.getMessage()); |
| throw e; |
| } |
| } |
| |
| /** |
| * Invoked before aborting a MasterTxn, this happens when the master is |
| * going to be a replica because of mastership transfer. We do this to make |
| * sure that the replica going to be the master has the most recent log and |
| * no hard recovery would happen after its election, see SR [#18081]. |
| * |
| * @param txn The MasterTxn that was aborted locally. |
| * |
| * @throws ReplicaWriteException if the node transitioned to a Replica |
| * after the transaction was initiated. |
| * @throws UnknownMasterException if the current master is unknown. |
| * @throws EnvironmentFailureException |
| */ |
| public void preLogAbortHook(MasterTxn txn) |
| throws EnvironmentFailureException, |
| ReplicaWriteException, |
| UnknownMasterException { |
| |
| checkIfInvalid(); |
| checkIfMaster(txn); |
| checkBlock(txn); |
| } |
| |
| /** |
| * Releases the block latch lock, if held. This hook is called in the |
| * normal course of Txn.abort(), once the abort log record has been written |
| * and the associated VLSN stored. |
| */ |
| public void postLogAbortHook(MasterTxn txn) { |
| if (txn.unlockOnce()) { |
| blockLatchLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Removes any pending acknowledgments that were registered by the |
| * preLogCommitHook. This hook is called only when a {@code commit()} |
| * fails and therefore must be aborted. |
| */ |
| public void postLogCommitAbortHook(MasterTxn txn) { |
| LoggerUtils.info(envLogger, this, |
| "post log abort hook for txn: " + txn.getId()); |
| if (txn.unlockOnce()) { |
| blockLatchLock.readLock().unlock(); |
| } |
| feederTxns.clearTransactionAcks(txn); |
| } |
| |
| /** |
| * Create a ReplayTxn for recovery processing. |
| */ |
| @Override |
| public Txn createReplayTxn(long txnId) |
| throws DatabaseException { |
| |
| return |
| new ReplayTxn(this, TransactionConfig.DEFAULT, txnId, envLogger); |
| } |
| |
| /** |
| * Used by environment recovery to get a tracker to collect VLSN-LSN |
| * mappings that are within the recovery part of the log. These might |
| * not be reflected in the persistent mapping db. |
| */ |
| @Override |
| public VLSNRecoveryProxy getVLSNProxy() { |
| int stride = configManager.getInt(RepParams.VLSN_STRIDE); |
| int maxMappings = configManager.getInt(RepParams.VLSN_MAX_MAP); |
| int maxDist = configManager.getInt(RepParams.VLSN_MAX_DIST); |
| |
| return new VLSNRecoveryTracker(this, stride, maxMappings, maxDist); |
| } |
| |
| public UUID getUUID() { |
| return repNode.getUUID(); |
| } |
| |
| /** |
| * Used during testing to introduce artificial clock skews. |
| */ |
| public static void setSkewMs(int skewMs) { |
| clockSkewMs = skewMs; |
| } |
| |
| public static int getClockSkewMs() { |
| return clockSkewMs; |
| } |
| |
| /** |
| * Truncate the head of the VLSNIndex to allow file deletion, if possible. |
| */ |
| @Override |
| public boolean tryVlsnHeadTruncate(long bytesNeeded) { |
| return vlsnIndex.tryTruncateFromHead(bytesNeeded); |
| } |
| |
| /** |
| * Truncate the head of the VLSNIndex to unprotect the given file, if |
| * possible. |
| */ |
| @Override |
| public boolean tryTruncateVlsnHead(VLSN deleteEnd, long deleteFileNum) { |
| return vlsnIndex.tryTruncateFromHead(deleteEnd, deleteFileNum); |
| } |
| |
| /** |
| * Clears the cached checksum for a file when it may be overwritten |
| * (e.g., entries may be erased). |
| */ |
| @Override |
| public void clearedCachedFileChecksum(String fileName) { |
| final RepNode rn = getRepNode(); |
| if (rn != null) { |
| rn.clearedCachedFileChecksum(fileName); |
| } |
| } |
| |
| public int getNodeId() { |
| return nameIdPair.getId(); |
| } |
| |
| public NameIdPair getNameIdPair() { |
| return nameIdPair; |
| } |
| |
| @Override |
| public long getReplayTxnTimeout() { |
| return replayTxnTimeout; |
| } |
| |
| /* Return the default consistency policy. */ |
| @Override |
| public ReplicaConsistencyPolicy getDefaultConsistencyPolicy() { |
| return defaultConsistencyPolicy; |
| } |
| |
| @Override |
| public ReplicaConsistencyPolicy getConsistencyPolicy(String propValue) { |
| return RepUtils.getReplicaConsistencyPolicy(propValue); |
| } |
| |
| /** |
| * The default consistency is not currently mutable in the API, but can be |
| * set for testing purposes. |
| * |
| * TODO: Make it mutable in the API, since Durability is mutable. |
| */ |
| public void setDefaultConsistencyPolicy(ReplicaConsistencyPolicy policy) { |
| defaultConsistencyPolicy = policy; |
| } |
| |
| /* Returns the on disk LSN for VLSN. */ |
| private long getLsnForVLSN(VLSN vlsn, int readBufferSize) { |
| /* Returns the file number which is nearest to the vlsn. */ |
| long fileNumber = vlsnIndex.getLTEFileNumber(vlsn); |
| |
| /* Start reading from the nearest file. */ |
| FeederReader feederReader = |
| new FeederReader(this, |
| vlsnIndex, |
| DbLsn.makeLsn(fileNumber, 0), |
| readBufferSize); |
| |
| try { |
| feederReader.initScan(vlsn); |
| |
| /* |
| * Go on scan the log until FeederReader find the target VLSN, |
| * thrown out an EnvironmentFailureException if it can't be found. |
| */ |
| if (!feederReader.readNextEntry()) { |
| throw EnvironmentFailureException.unexpectedState |
| ("VLSN not found: " + vlsn); |
| } |
| } catch (IOException e) { |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| |
| return feederReader.getLastLsn(); |
| } |
| |
| /** |
| * Returns the logged durable txn VLSN. The logged DTVLSN is part of the |
| * last txn commit or abort entry. |
| * |
| * @return the persistent DTVLSN. The value may be |
| * VLSN.UNINITIALIZED_VLSN_SEQUENCE if the environment was newly created, |
| * that is, it has no transactions in it as yet, or if the last entry was |
| * created by a pre-DTVLSN master. |
| * |
| * @throws FileNotFoundException if the file containing the last txn commit |
| * or abort entry does not exist |
| */ |
| public long getLoggedDTVLSN() |
| throws FileNotFoundException { |
| |
| final VLSN lastTxnEnd = getLastTxnEnd(); |
| |
| if (lastTxnEnd.isNull()) { |
| /* A brand new environment with no transactions in it. */ |
| return VLSN.UNINITIALIZED_VLSN_SEQUENCE; |
| } |
| |
| final long lsn = getLsnForVLSN(lastTxnEnd, |
| 1024 /* buffer size for txn end */); |
| final TxnEnd txnEnd = (TxnEnd) getLogManager(). |
| getLogEntryHandleNotFound(lsn).getMainItem(); |
| |
| long dtvlsn = txnEnd.getDTVLSN(); |
| if (dtvlsn != VLSN.UNINITIALIZED_VLSN_SEQUENCE) { |
| return dtvlsn; |
| } |
| |
| /* A JE version <= 7.1 log entry. */ |
| LoggerUtils.logMsg(envLogger, this, Level.INFO, |
| "Pre DTVLSN log, starting with zero dtvlsn"); |
| |
| return dtvlsn; |
| } |
| |
| /* Returns the end of the log. */ |
| @Override |
| public long getEndOfLog() { |
| return vlsnIndex.getRange().getLast().getSequence(); |
| } |
| |
| /** |
| * Returns true if the VLSN is preserved as the record version. |
| */ |
| @Override |
| public boolean getPreserveVLSN() { |
| return preserveVLSN; |
| } |
| |
| /** |
| * Returns true if the VLSN is both preserved and cached. |
| */ |
| @Override |
| public boolean getCacheVLSN() { |
| return preserveVLSN && cacheVLSN; |
| } |
| |
| /** |
| * @see EnvironmentImpl#getName |
| */ |
| @Override |
| public String getName() { |
| return nameIdPair + ":" + super.getName(); |
| } |
| |
| @Override |
| public String getNodeName() { |
| return nameIdPair.getName(); |
| } |
| |
| @Override |
| public boolean getIsMaster() { |
| return getState().isMaster(); |
| } |
| |
| /** |
| * Return true if this environment is part of a replication group. |
| */ |
| @Override |
| public boolean isReplicated() { |
| return true; |
| } |
| |
| /** |
| * Return true if this environment is used as an Arbiter. |
| */ |
| @Override |
| public boolean isArbiter() { |
| return isArbiter; |
| } |
| |
| /** |
| * Check whether this environment can be opened on an existing environment |
| * directory. |
| */ |
| @Override |
| public void checkRulesForExistingEnv(boolean dbTreeReplicatedBit, |
| boolean dbTreePreserveVLSN) |
| throws UnsupportedOperationException { |
| |
| if (!dbTreeReplicatedBit) { |
| |
| /* |
| * We are attempting to open an existing, non-replicated |
| * environment. |
| */ |
| throw new UnsupportedOperationException |
| ("This environment must be converted for replication." + |
| " using com.sleepycat.je.rep.util.DbEnableReplication."); |
| } |
| |
| /* The preserveVLSN setting is forever immutable. */ |
| if (dbTreePreserveVLSN != getPreserveVLSN()) { |
| throw new IllegalArgumentException |
| (RepParams.PRESERVE_RECORD_VERSION.getName() + |
| " parameter may not be changed." + |
| " Previous value: " + dbTreePreserveVLSN + |
| " New value: " + getPreserveVLSN()); |
| } |
| } |
| |
| /** |
| * Returns the hostname associated with this node. |
| * |
| * @return the hostname |
| */ |
| public String getHostName() { |
| String hostAndPort = configManager.get(RepParams.NODE_HOST_PORT); |
| int colonToken = hostAndPort.indexOf(":"); |
| return (colonToken >= 0) ? |
| hostAndPort.substring(0, colonToken) : |
| hostAndPort; |
| } |
| |
| /** |
| * Returns the socket address associated with this node, with a port of zero |
| * suitable for binding locally to any free port associated with the |
| * address. |
| */ |
| public InetSocketAddress getHostAddress() { |
| return new InetSocketAddress(getHostName(), 0); |
| } |
| |
| /** |
| * Returns the options used to configure the feeder channel connection. |
| * |
| * Note that soTimeout is not set since it's a blocking channel and |
| * setSoTimeout has no effect on a blocking nio channel. |
| * |
| * Push responses out rapidly, they are small (heart beat or commit |
| * response) and need timely delivery to the master. (tcpNoDelay = true) |
| * |
| */ |
| public ConnectOptions getFeederConnectOptions() { |
| return new ConnectOptions(). |
| setTcpNoDelay(true). |
| setReceiveBufferSize(configManager. |
| getInt(REPLICA_RECEIVE_BUFFER_SIZE)). |
| setOpenTimeout(configManager.getDuration(REPSTREAM_OPEN_TIMEOUT)). |
| setBlocking(true); |
| } |
| |
| /** |
| * Returns the port used by the replication node. |
| * |
| * @return the port number |
| */ |
| public int getPort() { |
| String hostAndPort = configManager.get(RepParams.NODE_HOST_PORT); |
| int colonToken = hostAndPort.indexOf(":"); |
| return (colonToken >= 0) ? |
| Integer.parseInt(hostAndPort.substring(colonToken + 1)) : |
| configManager.getInt(RepParams.DEFAULT_PORT); |
| } |
| |
| /* Convenience method for returning replication sockets. */ |
| public InetSocketAddress getSocket() { |
| return new InetSocketAddress(getHostName(), getPort()); |
| } |
| |
| /** |
| * Returns the JE version that is currently running on this node, |
| * consulting the TEST_JE_VERSION configuration parameter for a test |
| * override. |
| */ |
| public JEVersion getCurrentJEVersion() { |
| final String testJEVersion = configManager.get(TEST_JE_VERSION); |
| return testJEVersion.isEmpty() ? |
| JEVersion.CURRENT_VERSION : |
| new JEVersion(testJEVersion); |
| } |
| |
| /** |
| * Returns the set of sockets associated with helper nodes. |
| * |
| * @return the set of helper sockets, returns an empty set if there |
| * are no helpers. |
| */ |
| public Set<InetSocketAddress> getHelperSockets() { |
| String helperHosts = configManager.get(RepParams.HELPER_HOSTS); |
| return HostPortPair.getSockets(helperHosts); |
| } |
| |
| /** |
| * Called when a node has identified itself as the master, which is when |
| * the RepNode.selfElect is called. The database should not exist at |
| * this point. |
| * |
| * Lock hierarchy: {@literal GroupDbLock -> sync on EnvironmentImpl} |
| * @throws DatabaseException |
| */ |
| public DatabaseImpl createGroupDb() |
| throws DatabaseException { |
| |
| assert isMaster(); |
| |
| try { |
| groupDbLock.lockInterruptibly(); |
| } catch (InterruptedException e) { |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| |
| try { |
| if (groupDbImpl != null) { |
| throw EnvironmentFailureException.unexpectedState |
| ("GroupDb should not exist."); |
| } |
| |
| DatabaseImpl newDbImpl = null; |
| Txn txn = null; |
| try { |
| TransactionConfig txnConfig = new TransactionConfig(); |
| txnConfig.setDurability(new Durability(SyncPolicy.SYNC, |
| SyncPolicy.SYNC, |
| ReplicaAckPolicy.NONE)); |
| txnConfig.setConsistencyPolicy(NO_CONSISTENCY); |
| txn = new MasterTxn(this, |
| txnConfig, |
| getNameIdPair()); |
| |
| /* Database should not exist yet, create it now */ |
| DatabaseConfig dbConfig = new DatabaseConfig(); |
| dbConfig.setAllowCreate(true); |
| dbConfig.setTransactional(true); |
| dbConfig.setExclusiveCreate(true); |
| dbConfig.setReplicated(true); |
| |
| newDbImpl = getDbTree().createInternalDb |
| (txn, DbType.REP_GROUP.getInternalName(), dbConfig); |
| txn.commit(); |
| txn = null; |
| } finally { |
| if (txn != null) { |
| txn.abort(); |
| } |
| } |
| |
| groupDbImpl = newDbImpl; |
| } finally { |
| groupDbLock.unlock(); |
| } |
| return groupDbImpl; |
| } |
| |
| /** |
| * Open the group db, which should exist already, using NO_CONSISTENCY. |
| */ |
| public DatabaseImpl getGroupDb() |
| throws DatabaseNotFoundException, |
| DatabaseException { |
| |
| return openGroupDb(false /* doLockProbe */); |
| } |
| |
| /** |
| * Open the group db, which should exist already, using NO_CONSISTENCY. Do |
| * not wait on the group db lock, return null if the databaseImpl hasn't |
| * been created and we can't obtain it. |
| * |
| * Lock hierarchy: {@literal GroupDbLock -> sync on EnvironmentImpl} |
| */ |
| public DatabaseImpl probeGroupDb() |
| throws DatabaseException { |
| |
| try { |
| return openGroupDb(true /* doLockProbe */); |
| } catch (DatabaseNotFoundException e) { |
| /* Should never happen, DB should exist. */ |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| } |
| |
| /** |
| * Do the work of creating the lock and then assigning the groupDbImpl |
| * field, using NO_CONSISTENCY. |
| * |
| * @throws DatabaseException |
| * @throws DatabaseNotFoundException |
| */ |
| private DatabaseImpl openGroupDb(final boolean doLockProbe) |
| throws DatabaseNotFoundException, DatabaseException { |
| |
| /* Acquire the lock. */ |
| try { |
| if (doLockProbe) { |
| if (!groupDbLock.tryLock(1, TimeUnit.MILLISECONDS)) { |
| /* Contention, try later. */ |
| return null; |
| } |
| } else { |
| groupDbLock.lockInterruptibly(); |
| } |
| } catch(InterruptedException e) { |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| |
| Txn txn = null; |
| try { |
| if (groupDbImpl != null) { |
| return groupDbImpl; |
| } |
| |
| DatabaseImpl newDbImpl = null; |
| TransactionConfig txnConfig = new TransactionConfig(); |
| txnConfig.setConsistencyPolicy(NO_CONSISTENCY); |
| txn = new ReadonlyTxn(this, txnConfig); |
| |
| newDbImpl = getDbTree().getDb(txn, |
| DbType.REP_GROUP.getInternalName(), |
| null /* databaseHandle */, |
| false); |
| if (newDbImpl == null) { |
| throw new DatabaseNotFoundException |
| (DbType.REP_GROUP.getInternalName()); |
| } |
| txn.commit(); |
| txn = null; |
| |
| groupDbImpl = newDbImpl; |
| return groupDbImpl; |
| } finally { |
| if (txn != null) { |
| txn.abort(); |
| } |
| groupDbLock.unlock(); |
| } |
| } |
| |
| /** |
| * Return true if the node has been configured as a Designated Primary. |
| * This does not necessarily mean that the node is actively operating in |
| * designated primary mode. See |
| * {@link com.sleepycat.je.rep.arbitration.Arbiter#isActive} |
| */ |
| public boolean isDesignatedPrimary() { |
| return getConfigManager().getBoolean(RepParams.DESIGNATED_PRIMARY); |
| } |
| |
| @Override |
| public boolean addDbBackup(DbBackup backup) { |
| synchronized (backups) { |
| if (backupProhibited) { |
| return false; |
| } |
| boolean added = backups.add(backup); |
| assert added; |
| } |
| |
| super.addDbBackup(backup); |
| return true; |
| } |
| |
| @Override |
| public void removeDbBackup(DbBackup backup) { |
| synchronized (backups) { |
| boolean removed = backups.remove(backup); |
| assert removed; |
| } |
| super.removeDbBackup(backup); |
| } |
| |
| /* Invalidate all the on going DbBackups, used in Replay.rollback(). */ |
| public void invalidateBackups(long fileNumber) { |
| synchronized (backups) { |
| for (DbBackup backup : backups) { |
| backup.invalidate(fileNumber); |
| } |
| } |
| } |
| |
| /* Set the backupProhibited status, used in Replay.rollback(). */ |
| public void setBackupProhibited(boolean backupProhibited) { |
| synchronized (backups) { |
| this.backupProhibited = backupProhibited; |
| } |
| } |
| |
| /* For creating a rep exception from standalone code. */ |
| @Override |
| public LockPreemptedException |
| createLockPreemptedException(Locker locker, Throwable cause) { |
| return new LockPreemptedException(locker, cause); |
| } |
| |
| /* For creating a rep exception from standalone code. */ |
| @Override |
| public DatabasePreemptedException |
| createDatabasePreemptedException(String msg, |
| String dbName, |
| Database db) { |
| return new DatabasePreemptedException(msg, dbName, db); |
| } |
| |
| /* For creating a rep exception from standalone code. */ |
| @Override |
| public LogOverwriteException createLogOverwriteException(String msg) { |
| return new LogOverwriteException(msg); |
| } |
| |
| @Override |
| public int getReplayFreeDiskPercent() { |
| return getConfigManager().getInt(REPLAY_FREE_DISK_PERCENT); |
| } |
| |
| /** |
| * Sets up the environment for group shutdown when the environment is |
| * closed. |
| * |
| * @see ReplicatedEnvironment#shutdownGroup(long, TimeUnit) |
| */ |
| public void shutdownGroupSetup(long timeoutMs) { |
| final int openCount = getAppOpenCount(); |
| if (openCount > 1) { |
| throw new IllegalStateException |
| ("Environment has " + (openCount - 1) + |
| " additional open handles."); |
| } |
| |
| final int backupCount = getBackupCount(); |
| if (backupCount > 0) { |
| throw new IllegalStateException |
| ("Environment has " + backupCount + |
| " DbBackups in progress."); |
| } |
| |
| repNode.shutdownGroupOnClose(timeoutMs); |
| } |
| |
| public String transferMaster(Set<String> replicas, |
| long timeout, |
| boolean force) { |
| return repNode.transferMaster(replicas, timeout, force); |
| } |
| |
| /** |
| * Dump interesting aspects of the node's state. Currently for debugging |
| * use, possibly useful for field support. |
| */ |
| public String dumpState() { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append(getNameIdPair()); |
| sb.append("[").append(getState()).append("] " ); |
| |
| if (repNode != null) { |
| sb.append(repNode.dumpState()); |
| } |
| |
| if (vlsnIndex != null) { |
| sb.append("vlsnRange="); |
| sb.append(vlsnIndex.getRange()).append("\n"); |
| } |
| |
| if (replay != null) { |
| sb.append(replay.dumpState()); |
| } |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Dumps the state associated with all active Feeders that supply |
| * acknowledgments, along with identifying information about the node and |
| * its current HA state. |
| */ |
| public String dumpAckFeederState() { |
| return getNameIdPair() + "[" + getState() + "]" + |
| repNode.dumpAckFeederState() ; |
| } |
| |
| /** |
| * If this node was started with a hard recovery, preserve that |
| * information. |
| */ |
| public void setHardRecoveryInfo(RollbackException e) { |
| hardRecoveryStat.set(true); |
| hardRecoveryInfoStat.set(e.getMessage()); |
| } |
| |
| public StatGroup getNodeStats() { |
| return nodeStats; |
| } |
| |
| /** |
| * Ensure that the in-memory vlsn index encompasses all logged entries |
| * before it is flushed to disk. A No-Op for non-replicated systems. |
| * [#19754] |
| */ |
| @Override |
| public void awaitVLSNConsistency() { |
| vlsnIndex.awaitConsistency(); |
| } |
| |
| public void setSyncupProgress(SyncupProgress progress) { |
| setSyncupProgress(progress, 0, -1); |
| } |
| |
| public void setSyncupProgress(SyncupProgress progress, long n, long total) { |
| if (syncupProgressListener == null) { |
| return; |
| } |
| |
| if (!(syncupProgressListener.progress(progress, n, total))) { |
| throw new EnvironmentFailureException |
| (this, EnvironmentFailureReason.PROGRESS_LISTENER_HALT, |
| "ReplicatedEnvironmentConfig.syncupProgressListener: "); |
| } |
| } |
| |
| /** |
| * Test method to create pre-DTVLSN logs. When this is turned on in a test |
| * environment the dtvlsn value in the log is written as a |
| * UNINITIALIZED_VLSN_SEQUENCE (zero), which is the value that |
| * deserialization would assign to it if a new replica came across an older |
| * version commit or abort record. |
| */ |
| public static void setSimulatePreDTVLSNMaster( |
| boolean simulatePreDTVLSNMaster) { |
| |
| RepImpl.simulatePreDTVLSNMaster = simulatePreDTVLSNMaster; |
| } |
| |
| public static boolean isSimulatePreDTVLSNMaster() { |
| return simulatePreDTVLSNMaster; |
| } |
| |
| public LogFileRewriteListener getLogRewriteListener() { |
| return logRewriteListener; |
| } |
| |
| public ReplicationNetworkConfig getRepNetConfig() { |
| return repNetConfig; |
| } |
| |
| public DataChannelFactory getChannelFactory() { |
| initializeChannelFactory(); |
| return channelFactory; |
| } |
| |
| @Override |
| public void invalidate(EnvironmentFailureException e) { |
| super.invalidate(e); |
| unblockTxnCompletion(); |
| } |
| |
| public VLSN getLastTxnEnd() { |
| return vlsnIndexAccess.getLastTxnEnd(); |
| } |
| |
| /** |
| * Private class to prevent used of the close() method by the application |
| * on an internal handle. |
| */ |
| private static class InternalReplicatedEnvironment |
| extends ReplicatedEnvironment { |
| |
| public InternalReplicatedEnvironment(File environmentHome, |
| ReplicationConfig cloneRepConfig, |
| EnvironmentConfig cloneConfig, |
| RepImpl envImpl) { |
| super(environmentHome, cloneRepConfig, cloneConfig, |
| null /*consistencyPolicy*/, null /*initialElectionPolicy*/, |
| false /*joinGroup*/, envImpl); |
| } |
| |
| @Override |
| protected boolean isInternalHandle() { |
| return true; |
| } |
| |
| @Override |
| public synchronized void close() { |
| throw EnvironmentFailureException.unexpectedState |
| ("close() not permitted on an internal environment handle"); |
| } |
| } |
| |
| /** |
| * Peruse the environment wide transaction table, and return a set of |
| * all existing MasterTxns. |
| */ |
| public Set<MasterTxn> getExistingMasterTxns() { |
| return getTxnManager().getTxns(MasterTxn.class); |
| } |
| |
| /** |
| * RepImpl supplies the last txn abort or commit vlsn for use cases such as |
| * determining how caught up a feeder or master transfer is. This info is |
| * usually obtained from the VLSNRange via the VLSNIndex, but in some types |
| * of environment shutdowns, the VLSNIndex may need to be nulled out. When |
| * that happens, VLSNIndexAccess will switch over from using the VLSNIndex |
| * to obtain the range, to using a reference to the last known |
| * VLSNRange. Note that the VLSNRange instance held within VLSNIndex is |
| * constantly being replaced when the replication stream is active., and |
| * that's why LastTxnEndAccess generally obtains the range via the |
| * VLSNIndex, rather keeping a reference to a VLSNRange instance. |
| */ |
| private class VLSNIndexAccess { |
| |
| private VLSNRange savedRange; |
| |
| synchronized VLSN getLastTxnEnd() { |
| if (vlsnIndex != null) { |
| return vlsnIndex.getRange().getLastTxnEnd(); |
| } |
| return savedRange.getLastTxnEnd(); |
| } |
| |
| /** |
| * Save the last range so the lastTxnEnd value can continue |
| * to be available, and null out the vlsnIndex. |
| */ |
| synchronized void closeVLSNIndex(boolean checkpointed) { |
| if (vlsnIndex != null) { |
| vlsnIndex.close(checkpointed); |
| savedRange = vlsnIndex.getRange(); |
| vlsnIndex = null; |
| } |
| } |
| |
| /** |
| * Save the last range so the lastTxnEnd value can continue |
| * to be available, and null out the vlsnIndex. |
| */ |
| synchronized void abnormalCloseVLSNIndex() { |
| if (vlsnIndex != null) { |
| vlsnIndex.abnormalClose(); |
| savedRange = vlsnIndex.getRange(); |
| vlsnIndex = null; |
| } |
| } |
| } |
| |
| /** |
| * Checks that writing records with a TTL is allowed. |
| * |
| * @throws IllegalStateException if any node in the group is less than |
| * JE_TTL_VERSION. |
| */ |
| @Override |
| public void checkTTLAvailable() { |
| if (isTTLAvailable) { |
| return; |
| } |
| checkFeatureAvailable("TTL", TTL.getMinJEVersion()); |
| isTTLAvailable = true; |
| } |
| |
| @Override |
| public void checkRecordExtinctionAvailable() { |
| if (isRecordExtinctionAvailable) { |
| return; |
| } |
| checkFeatureAvailable( |
| "Record Extinction", ExtinctionScanner.getMinJEVersion()); |
| isRecordExtinctionAvailable = true; |
| } |
| |
| private void checkFeatureAvailable(final String name, |
| final JEVersion requiredJEVersion) { |
| try { |
| repNode.setMinJEVersion(requiredJEVersion); |
| } catch (MinJEVersionUnsupportedException e) { |
| if (e.nodeVersion == null) { |
| throw new IllegalStateException( |
| name + " is not currently supported." + |
| " The version running on node " + e.nodeName + |
| " could not be determined," + |
| " but this feature requires version " + |
| requiredJEVersion.getNumericVersionString() + |
| " or later."); |
| } |
| throw new IllegalStateException( |
| name + " is not currently supported." + |
| " Node " + e.nodeName + " is running version " + |
| e.nodeVersion.getNumericVersionString() + |
| ", but this feature requires version " + |
| requiredJEVersion.getNumericVersionString() + |
| " or later."); |
| } |
| } |
| |
| /** |
| * Recovery encountered a RestoreRequired marker. |
| */ |
| @Override |
| public void handleRestoreRequired(RestoreRequired restoreRequired) { |
| |
| switch (restoreRequired.getFailureType()) { |
| case NETWORK_RESTORE: |
| /* |
| * A network restore must be done to get a coherent copy of |
| * the log files into this environment's directory. |
| */ |
| throw new InsufficientLogException( |
| restoreRequired.getProperties(), |
| configManager.get(RepParams.HELPER_HOSTS), |
| repNetConfig); |
| default: |
| /* Not a type we can handle, go to the default behavior */ |
| super.handleRestoreRequired(restoreRequired); |
| } |
| } |
| |
| private boolean useArbiter(MasterTxn txn) { |
| if (allowArbiterAck && |
| repNode.getGroup().getAckGroupSize() == 2 && |
| repNode.feederManager().activeAckArbiterCount() > 0 && |
| txn.getCommitDurability().getReplicaAck() == |
| ReplicaAckPolicy.SIMPLE_MAJORITY) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void setReplicaLatestVLSNSeq(long seq) { |
| vlsnIndex.setReplicaLatestVLSNSeq(seq); |
| } |
| |
| public void setAuthenticator(StreamAuthenticator authenticator) { |
| this.authenticator = authenticator; |
| } |
| |
| public StreamAuthenticator getAuthenticator() { |
| return authenticator; |
| } |
| } |