| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| |
| package com.sleepycat.je.rep.arbiter.impl; |
| |
| import static com.sleepycat.je.log.LogEntryType.LOG_TXN_COMMIT; |
| import static com.sleepycat.je.rep.arbiter.impl.ArbiterStatDefinition.ARB_MASTER; |
| import static com.sleepycat.je.rep.arbiter.impl.ArbiterStatDefinition.ARB_N_ACKS; |
| import static com.sleepycat.je.rep.arbiter.impl.ArbiterStatDefinition.ARB_N_REPLAY_QUEUE_OVERFLOW; |
| |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.nio.channels.ClosedByInterruptException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.sleepycat.je.DatabaseException; |
| import com.sleepycat.je.Durability.SyncPolicy; |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.StatsConfig; |
| import com.sleepycat.je.dbi.DbConfigManager; |
| import com.sleepycat.je.log.entry.LogEntry; |
| import com.sleepycat.je.rep.GroupShutdownException; |
| import com.sleepycat.je.rep.NodeType; |
| import com.sleepycat.je.rep.ReplicatedEnvironment; |
| import com.sleepycat.je.rep.UnknownMasterException; |
| import com.sleepycat.je.rep.impl.RepGroupImpl; |
| import com.sleepycat.je.rep.impl.RepImpl; |
| import com.sleepycat.je.rep.impl.RepParams; |
| import com.sleepycat.je.rep.impl.node.FeederManager; |
| import com.sleepycat.je.rep.impl.node.NameIdPair; |
| import com.sleepycat.je.rep.impl.node.ReplicaOutputThread; |
| import com.sleepycat.je.rep.impl.node.ReplicaOutputThreadBase; |
| import com.sleepycat.je.rep.net.DataChannel; |
| import com.sleepycat.je.rep.stream.BaseProtocol.ShutdownRequest; |
| import com.sleepycat.je.rep.stream.InputWireRecord; |
| import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException; |
| import com.sleepycat.je.rep.stream.Protocol; |
| import com.sleepycat.je.rep.stream.ReplicaFeederHandshake; |
| import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig; |
| import com.sleepycat.je.rep.utilint.BinaryProtocol.Message; |
| import com.sleepycat.je.rep.utilint.BinaryProtocol.MessageOp; |
| import com.sleepycat.je.rep.utilint.NamedChannel; |
| import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout; |
| import com.sleepycat.je.rep.utilint.RepUtils; |
| import com.sleepycat.je.rep.utilint.RepUtils.Clock; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher.Response; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; |
| import com.sleepycat.je.txn.TxnCommit; |
| import com.sleepycat.je.utilint.LoggerUtils; |
| import com.sleepycat.je.utilint.LongStat; |
| import com.sleepycat.je.utilint.StatGroup; |
| import com.sleepycat.je.utilint.StoppableThread; |
| import com.sleepycat.je.utilint.StringStat; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| /** |
| * The ArbiterAcker is used to acknowledge transactions. A feeder |
| * connection is established with the current master. Commit and Heartbeat |
| * messages are sent by the master. The ArbiterAcker responds and persistently |
| * tracks the high VLSN of the commit messages that it acknowledges. |
| * |
| * The following configuration parameters are used: |
| * RepParams.REPLICA_MESSAGE_QUEUE_SIZE - the replay (request) queue size; |
| * also used in computing the output queue size. |
| * RepParams.REPLICA_TIMEOUT - the Arbiter feeder channel timeout. |
| * RepParams.PRE_HEARTBEAT_TIMEOUT - the Arbiter feeder channel timeout |
| * before the first heartbeat is sent. |
| * RepParams.REPLICA_RECEIVE_BUFFER_SIZE - the data channel buffer size. |
| * RepParams.REPSTREAM_OPEN_TIMEOUT - the data channel open timeout. |
| * RepParams.MAX_CLOCK_DELTA - the maximum clock delta permitted by the |
| * ReplicaFeederHandshake. |
| * RepParams.HEARTBEAT_INTERVAL - the heartbeat interval in millis. |
| * RepParams.ENABLE_GROUP_ACKS - enables output thread ack grouping. |
| * |
| * The main Arbiter thread reads messages from the feeder channel and queues |
| * the message on the request queue. The request thread reads entries |
| * from the request queue. The request thread may queue an entry on the |
| * output queue. The ArbiterOutputThread reads from the output queue and |
| * writes to the network channel. |
| * read from network -> ArbiterAcker main thread -> requestQueue |
| * requestQueue -> RequestThread -> outputQueue |
| * outputQueue -> ArbiterOutputThread -> writes to network |
| */ |
| class ArbiterAcker { |
| |
| /* |
| * Defines the possible types of exits that can be requested from the |
| * RequestThread. |
| */ |
| private enum RequestExitType { |
| IMMEDIATE, /* An immediate exit; ignore queued requests. */ |
| SOFT /* Process pending requests in queue, then exit */ |
| } |
| /* Number of times to retry on a network connection failure. */ |
| private static final int NETWORK_RETRIES = 2; |
| |
| /* |
| * Service unavailable retries. These are typically the result of service |
| * request being made before the node is ready to provide them. For |
| * example, the feeder service is only available after a node has |
| * transitioned to becoming the master. |
| */ |
| private static final int SERVICE_UNAVAILABLE_RETRIES = 10; |
| |
| /* |
| * The number of ms to wait between above retries, allowing time for the |
| * master to assume its role, and start listening on its port. |
| */ |
| private static final int CONNECT_RETRY_SLEEP_MS = 1000; |
| |
| /* The queue poll interval, 1 second */ |
| private static final long QUEUE_POLL_INTERVAL_NS = 1000000000L; |
| |
| /* The exception that provoked the ArbiterAcker exit. */ |
| private Exception shutdownException = null; |
| |
| private final RepImpl repImpl; |
| private final Logger logger; |
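| /* |
| * The channel, clock and protocol used to communicate with the current |
| * master's feeder. The channel and protocol are re-established each time |
| * the Arbiter connects to a master. |
| */ |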
| private NamedChannelWithTimeout arbiterFeederChannel; |
| private final Clock clock; |
| private Protocol protocol; |
| private final ArbiterImpl arbiterImpl; |
| |
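| /* |
| * The queue of acknowledgments (transaction ids, or the special |
| * heartbeat/shutdown ack values) to be written to the network by the |
| * ArbiterOutputThread. |
| */ |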
| private final BlockingQueue<Long> outputQueue; |
| |
| /* |
| * The message queue used for communications between the network read |
| * thread and the request thread. |
| */ |
| private final BlockingQueue<Message> requestQueue; |
| |
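| /* |
| * The threads that write acks to the network and process queued |
| * requests; both are created anew for each feeder connection. |
| */ |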
| private ArbiterOutputThread arbiterOutputThread; |
| private RequestThread requestThread; |
| |
| /* |
| * The last commit entry acknowledged. |
| */ |
| private volatile VLSN lastReplayedVLSN = null; |
| |
| /* The in-memory DTVLSN maintained by the Arbiter. */ |
| private long dtvlsn = VLSN.NULL_VLSN_SEQUENCE; |
| |
| /* Statistics */ |
| private final StatGroup stats; |
| |
| /* |
| * The number of times a message entry could not be inserted into |
| * the queue within the poll period and had to be retried. |
| */ |
| private final LongStat nReplayQueueOverflow; |
| /* Number of transactions acknowledged */ |
| private final LongStat nAcks; |
| /* Current or last master that was connected */ |
| private final StringStat masterStat; |
| |
| /* |
| * The maximum number of entries pulled out of the request queue that |
| * are grouped together. There is at most one write to the data file for |
| * this group. |
| */ |
| private static final int N_MAX_GROUP_XACT = 100; |
| private final List<Message> groupMessages = new ArrayList<>(); |
| private final List<Long> groupXact = new ArrayList<>(); |
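| /* |
| * The VLSN tracker data is fsynced at least once every FSYNC_INTERVAL |
| * ms, even if no commit in the batch required SyncPolicy.SYNC. |
| * lastFSyncTime records the time of the last fsync. |
| */ |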
| private static final long FSYNC_INTERVAL = 1000; |
| private long lastFSyncTime; |
| |
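| /** |
| * Sets up the request and output queues, the clock and the statistics |
| * used by the acker. |
| */ |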
| ArbiterAcker(ArbiterImpl arbiterImpl, |
| RepImpl repImpl) { |
| this.arbiterImpl = arbiterImpl; |
| this.repImpl = repImpl; |
| logger = repImpl.getLogger(); |
| |
| clock = new Clock(RepImpl.getClockSkewMs()); |
| /* Set up the request queue. */ |
| final int requestQueueSize = repImpl.getConfigManager(). |
| getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE); |
| |
| requestQueue = new ArrayBlockingQueue<>(requestQueueSize); |
| |
| /* |
| * The factor of 2 below is somewhat arbitrary. It should be > 1X so |
| * that the RequestThread can completely process the buffered |
| * messages in the face of a network drop; 2X allows for additional |
| * headroom and minimizes the chances that an offer to the queue |
| * might block due to the limited queue length. |
| */ |
| final int outputQueueSize = 2 * |
| repImpl.getConfigManager().getInt( |
| RepParams.REPLICA_MESSAGE_QUEUE_SIZE); |
| outputQueue = new ArrayBlockingQueue<>(outputQueueSize); |
| |
| stats = new StatGroup(ArbiterStatDefinition.GROUP_NAME, |
| ArbiterStatDefinition.GROUP_DESC); |
| nReplayQueueOverflow = |
| new LongStat(stats, ARB_N_REPLAY_QUEUE_OVERFLOW); |
| nAcks = new LongStat(stats, ARB_N_ACKS); |
| masterStat = new StringStat(stats, ARB_MASTER); |
| } |
| |
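| /** |
| * Establishes the connection to the master's feeder: creates the |
| * channel, performs the replica/feeder handshake, acks the initial |
| * heartbeat and signals that the Arbiter has joined the group. |
| */ |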
| private void initializeConnection() |
| throws ConnectRetryException, |
| IOException { |
| createArbiterFeederChannel(); |
| arbiterImpl.refreshCachedGroup(); |
| ReplicaFeederHandshake handshake = |
| new ReplicaFeederHandshake(new RepFeederHandshakeConfig()); |
| protocol = handshake.execute(); |
| |
| arbiterImpl.refreshCachedGroup(); |
| |
| /* read heartbeat and respond */ |
| protocol.read(arbiterFeederChannel.getChannel(), |
| Protocol.Heartbeat.class); |
| queueAck(ReplicaOutputThread.HEARTBEAT_ACK); |
| |
| /* decrement latch to indicate we are connected */ |
| arbiterImpl.getReadyLatch().countDown(); |
| arbiterImpl.notifyJoinGroup(); |
| } |
| |
| /** |
| * The core Arbiter control loop. The loop exits when it |
| * encounters one of the following possible conditions: |
| * |
| * 1) The connection to the master can no longer be maintained, due to |
| * connectivity issues, or because the master has explicitly shutdown its |
| * connections due to an election. |
| * |
| * 2) The node becomes aware of a change in master, that is, assertSync() |
| * fails. |
| * |
| * 3) The loop is interrupted, which is interpreted as a request to |
| * shutdown the Arbiter node as a whole. |
| * |
| * 4) It fails to establish its node information in the master as it |
| * attempts to join the replication group for the first time. |
| * |
| * Normal exit from this run loop results in the Arbiter node retrying |
| * finding the group master. |
| * A thrown exception, on the other hand, results in the Arbiter |
| * node as a whole terminating its operation and no longer participating in |
| * the replication group, that is, it enters the DETACHED state. |
| * |
| * @throws InterruptedException |
| * @throws DatabaseException if the environment cannot be closed or |
| * re-initialized |
| * @throws GroupShutdownException |
| */ |
| void runArbiterAckLoop() |
| throws InterruptedException, |
| DatabaseException, |
| GroupShutdownException { |
| |
| Class<? extends RetryException> retryExceptionClass = null; |
| int retryCount = 0; |
| try { |
| |
| while (true) { |
| try { |
| runArbiterAckLoopInternal(); |
| /* Normal exit */ |
| break; |
| } catch (RetryException e) { |
| if (!arbiterImpl.getMasterStatus().inSync()) { |
| LoggerUtils.fine(logger, repImpl, |
| "Retry terminated, out of sync."); |
| break; |
| } |
| if ((e.getClass() == retryExceptionClass) || |
| (e.retries == 0)) { |
| if (++retryCount >= e.retries) { |
| /* Exit replica retry elections */ |
| LoggerUtils.info |
| (logger, repImpl, |
| "Failed to recover from exception: " + |
| e.getMessage() + ", despite " + e.retries + |
| " retries.\n" + |
| LoggerUtils.getStackTrace(e)); |
| break; |
| } |
| } else { |
| retryCount = 0; |
| retryExceptionClass = e.getClass(); |
| } |
| LoggerUtils.fine(logger, repImpl, "Retry #: " + |
| retryCount + "/" + e.retries + |
| " Will retry Arbiter loop after " + |
| e.retrySleepMs + "ms. "); |
| Thread.sleep(e.retrySleepMs); |
| if (!arbiterImpl.getMasterStatus().inSync()) { |
| break; |
| } |
| } |
| } |
| } finally { |
| arbiterImpl.resetReadyLatch(shutdownException); |
| } |
| /* Exit; use elections to try a different master. */ |
| } |
| |
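| /** |
| * Shuts down the request and output threads, if any, and closes the |
| * feeder channel. |
| */ |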
| void shutdown() { |
| if (requestThread != null) { |
| try { |
| requestThread.shutdownThread(logger); |
| } catch (Exception e) { |
| /* Ignore so shutdown can continue */ |
| LoggerUtils.info(logger, repImpl, |
| "Request thread error shutting down." + e); |
| } |
| } |
| if (arbiterOutputThread != null) { |
| arbiterOutputThread.shutdownThread(logger); |
| try { |
| arbiterOutputThread.join(); |
| } catch(InterruptedException e) { |
| /* Ignore; we will clean up by killing the IO channel anyway. */ |
| } |
| } |
| RepUtils.shutdownChannel(arbiterFeederChannel); |
| } |
| |
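| /** |
| * A single pass of the ack loop: connect to the current master, ack |
| * commits until the connection is lost or the node shuts down, and |
| * translate failures into retry, return or rethrow as appropriate. |
| */ |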
| private void runArbiterAckLoopInternal() |
| throws InterruptedException, |
| RetryException { |
| |
| shutdownException = null; |
| LoggerUtils.info(logger, repImpl, |
| "Arbiter loop started with master: " + |
| arbiterImpl.getMasterStatus().getNodeMasterNameId()); |
| try { |
| initializeConnection(); |
| arbiterImpl.setState(ReplicatedEnvironment.State.REPLICA); |
| doRunArbiterLoopInternalWork(); |
| arbiterImpl.setState(ReplicatedEnvironment.State.UNKNOWN); |
| } catch (ClosedByInterruptException closedByInterruptException) { |
| if (arbiterImpl.isShutdown()) { |
| LoggerUtils.info(logger, repImpl, |
| "Arbiter loop interrupted for shutdown."); |
| return; |
| } |
| LoggerUtils.warning(logger, repImpl, |
| "Arbiter loop unexpected interrupt."); |
| throw new InterruptedException |
| (closedByInterruptException.getMessage()); |
| } catch (IOException | UnknownMasterException e) { |
| /* |
| * Master may have changed with the master shutting down its |
| * connection as a result. Or there may be a lack of quorum |
| * preventing selection of a master. Normal course of events, log |
| * it and return to the outer node level loop. |
| */ |
| LoggerUtils.fine(logger, repImpl, |
| "Arbiter exception: " + e.getMessage() + |
| "\n" + LoggerUtils.getStackTrace(e)); |
| } catch (RetryException e) { |
| /* Propagate it outwards. Node does not need to shutdown. */ |
| throw e; |
| } catch (GroupShutdownException e) { |
| shutdownException = e; |
| throw e; |
| } catch (RuntimeException e) { |
| shutdownException = e; |
| LoggerUtils.severe(logger, repImpl, |
| "Arbiter unexpected exception " + e + |
| " " + LoggerUtils.getStackTrace(e)); |
| throw e; |
| } catch (MasterSyncException e) { |
| /* expected change in masters from an election. */ |
| LoggerUtils.fine(logger, repImpl, e.getMessage()); |
| } catch (Exception e) { |
| shutdownException = e; |
| LoggerUtils.severe(logger, repImpl, |
| "Arbiter unexpected exception " + e + |
| " " + LoggerUtils.getStackTrace(e)); |
| throw EnvironmentFailureException.unexpectedException(e); |
| } finally { |
| loopExitCleanup(); |
| } |
| } |
| |
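| /** |
| * Starts the output and request threads and then reads messages from |
| * the feeder channel, queuing them for the RequestThread. Propagates |
| * any exception raised by the request or output threads. |
| */ |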
| protected void doRunArbiterLoopInternalWork() |
| throws Exception { |
| |
| final int timeoutMs = repImpl.getConfigManager(). |
| getDuration(RepParams.REPLICA_TIMEOUT); |
| arbiterFeederChannel.setTimeoutMs(timeoutMs); |
| |
| requestQueue.clear(); |
| outputQueue.clear(); |
| |
| arbiterOutputThread = |
| new ArbiterOutputThread(repImpl, |
| outputQueue, |
| protocol, |
| arbiterFeederChannel.getChannel(), |
| arbiterImpl.getArbiterVLSNTracker()); |
| arbiterOutputThread.start(); |
| |
| requestThread = new RequestThread(); |
| requestThread.start(); |
| |
| long maxPending = 0; |
| |
| try { |
| while (true) { |
| Message message = protocol.read(arbiterFeederChannel); |
| |
| if (arbiterImpl.isShutdownOrInvalid() || (message == null)) { |
| return; |
| } |
| |
| while (!requestQueue. |
| offer(message, |
| QUEUE_POLL_INTERVAL_NS, |
| TimeUnit.NANOSECONDS)) { |
| /* Offer timed out. */ |
| if (!requestThread.isAlive()) { |
| return; |
| } |
| /* Retry the offer */ |
| nReplayQueueOverflow.increment(); |
| } |
| |
| final int pending = requestQueue.size(); |
| if (pending > maxPending) { |
| maxPending = pending; |
| LoggerUtils.fine(logger, repImpl, |
| "Max pending request log items:" + |
| maxPending); |
| } |
| } |
| } catch (IOException ioe) { |
| |
| /* |
| * Make sure messages in the queue are processed. Ensure, in |
| * particular, that shutdown requests are processed and not ignored |
| * due to the IOException resulting from a closed connection. |
| */ |
| requestThread.exitRequest = RequestExitType.SOFT; |
| } finally { |
| |
| if (requestThread.exitRequest == RequestExitType.SOFT) { |
| |
| /* |
| * Drain all queued messages, exceptions may be generated |
| * in the process. They logically precede IO exceptions. |
| */ |
| requestThread.join(); |
| } |
| |
| try { |
| |
| if (requestThread.exception != null) { |
| /* request thread is dead or exiting. */ |
| throw requestThread.exception; |
| } |
| |
| if (arbiterOutputThread.getException() != null) { |
| throw arbiterOutputThread.getException(); |
| } |
| } finally { |
| |
| /* Ensure thread has exited in all circumstances */ |
| requestThread.exitRequest = RequestExitType.IMMEDIATE; |
| requestThread.join(); |
| arbiterOutputThread.shutdownThread(logger); |
| } |
| } |
| } |
| |
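| /** |
| * Returns a copy of the Arbiter acker statistics, refreshing the master |
| * stat and clearing the stats if so configured. |
| */ |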
| StatGroup loadStats(StatsConfig config) |
| throws DatabaseException { |
| masterStat.set( |
| arbiterImpl.getMasterStatus().getNodeMasterNameId().toString()); |
| StatGroup copyStats = stats.cloneGroup(config.getClear()); |
| return copyStats; |
| } |
| |
| /** |
| * Performs the cleanup actions upon exit from the internal arbiter loop. |
| * |
| */ |
| private void loopExitCleanup() { |
| |
| if (shutdownException != null) { |
| if (shutdownException instanceof RetryException) { |
| LoggerUtils.fine(logger, repImpl, |
| "Retrying connection to feeder. Message: " + |
| shutdownException.getMessage()); |
| } else if (shutdownException instanceof GroupShutdownException) { |
| LoggerUtils.info(logger, repImpl, |
| "Exiting inner Arbiter loop." + |
| " Master requested shutdown."); |
| } else { |
| LoggerUtils.warning |
| (logger, repImpl, |
| "Exiting inner Arbiter loop with exception " + |
| shutdownException + "\n" + |
| LoggerUtils.getStackTrace(shutdownException)); |
| } |
| } else { |
| LoggerUtils.fine(logger, repImpl, "Exiting inner Arbiter loop." ); |
| } |
| |
| shutdown(); |
| } |
| |
| /** |
| * Creates the channel used by the Arbiter to connect to the Feeder. The |
| * socket is configured with a read timeout that's a multiple of the |
| * heartbeat interval to help detect, or initiate, a change in master. |
| * |
| * @throws IOException |
| * @throws ConnectRetryException |
| */ |
| private void createArbiterFeederChannel() |
| throws IOException, ConnectRetryException { |
| |
| DataChannel dataChannel = null; |
| |
| final DbConfigManager configManager = repImpl.getConfigManager(); |
| final int timeoutMs = configManager. |
| getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT); |
| |
| try { |
| dataChannel = |
| repImpl.getChannelFactory(). |
| connect(arbiterImpl.getMasterStatus().getNodeMaster(), |
| repImpl.getHostAddress(), |
| repImpl.getFeederConnectOptions()); |
| |
| arbiterFeederChannel = |
| new NamedChannelWithTimeout(repImpl, |
| logger, |
| arbiterImpl.getChannelTimeoutTask(), |
| dataChannel, |
| timeoutMs); |
| |
| ServiceDispatcher.doServiceHandshake |
| (dataChannel, FeederManager.FEEDER_SERVICE); |
| } catch (ConnectException e) { |
| |
| /* |
| * A network problem, or the node went down between the time we |
| * learned it was the master and we tried to connect. |
| */ |
| throw new ConnectRetryException(e.getMessage(), |
| NETWORK_RETRIES, |
| CONNECT_RETRY_SLEEP_MS); |
| } catch (ServiceConnectFailedException e) { |
| |
| /* |
| * The feeder may not have established the Feeder Service |
| * as yet. For example, the transition to the master may not have |
| * been completed. Wait longer. |
| */ |
| if (e.getResponse() == Response.UNKNOWN_SERVICE) { |
| throw new ConnectRetryException(e.getMessage(), |
| SERVICE_UNAVAILABLE_RETRIES, |
| CONNECT_RETRY_SLEEP_MS); |
| } |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| } |
| |
| /** |
| * Queues an acknowledgment for transmission to the master by the |
| * output thread. |
| * |
| * @param xid the transaction id being acknowledged, or one of the |
| * special heartbeat/shutdown ack values |
| * @throws IOException if the wait for queue space is interrupted |
| */ |
| private void queueAck(Long xid) |
| throws IOException { |
| try { |
| outputQueue.put(xid); |
| } catch (InterruptedException ie) { |
| |
| /* |
| * Have the higher levels treat it like an IOE and |
| * exit the thread. |
| */ |
| throw new IOException("Ack I/O interrupted", ie); |
| } |
| } |
| |
| /** |
| * Process the shutdown message from the master and return the |
| * GroupShutdownException that must be thrown to exit the Arbiter loop. |
| * |
| * @return the GroupShutdownException |
| */ |
| private GroupShutdownException processShutdown(ShutdownRequest shutdown) |
| throws IOException { |
| |
| /* |
| * Acknowledge the shutdown message right away, since the checkpoint |
| * operation can take a long time to complete. Long enough to exceed |
| * the feeder timeout on the master. The master only needs to know that |
| * the replica has received the message. |
| */ |
| queueAck(ReplicaOutputThreadBase.SHUTDOWN_ACK); |
| |
| /* |
| * Turn off network timeouts on the replica, since we don't want the |
| * replica to timeout the connection. The connection itself is no |
| * longer used past this point and will be reclaimed as part of normal |
| * replica exit cleanup. |
| */ |
| arbiterFeederChannel.setTimeoutMs(Integer.MAX_VALUE); |
| final String masterHostName = |
| arbiterImpl.getMasterStatus().getGroupMaster().getHostName(); |
| return new GroupShutdownException( |
| logger, |
| repImpl, |
| masterHostName, |
| arbiterImpl.getArbiterVLSNTracker().get(), |
| shutdown.getShutdownTimeMs()); |
| } |
| |
| @SuppressWarnings("serial") |
| abstract static class RetryException extends Exception { |
| final int retries; |
| final int retrySleepMs; |
| |
| RetryException(String message, |
| int retries, |
| int retrySleepMs) { |
| super(message); |
| this.retries = retries; |
| this.retrySleepMs = retrySleepMs; |
| } |
| |
| @Override |
| public String getMessage() { |
| return "Failed after retries: " + retries + |
| " with retry interval: " + retrySleepMs + "ms."; |
| } |
| } |
| |
| /** |
| * Replay a batch of messages drained from the request queue: queue acks |
| * for commit and heartbeat entries, track the highest acknowledged VLSN, |
| * and return the shutdown message if one was received, otherwise null. |
| */ |
| private Message replayEntries(Message firstMessage) throws IOException { |
| boolean doSync = false; |
| long highVLSN = 0; |
| Message shutdownMessage = null; |
| groupXact.clear(); |
| groupMessages.clear(); |
| groupMessages.add(firstMessage); |
| requestQueue.drainTo(groupMessages, N_MAX_GROUP_XACT); |
| for (int i = 0; i < groupMessages.size(); i++) { |
| final Message message = groupMessages.get(i); |
| |
| final MessageOp messageOp = message.getOp(); |
| |
| if (messageOp == Protocol.SHUTDOWN_REQUEST) { |
| shutdownMessage = message; |
| } else if (messageOp == Protocol.HEARTBEAT) { |
| groupXact.add(ReplicaOutputThreadBase.HEARTBEAT_ACK); |
| } else { |
| InputWireRecord wireRecord = |
| ((Protocol.Entry) message).getWireRecord(); |
| final byte entryType = wireRecord.getEntryType(); |
| lastReplayedVLSN = wireRecord.getVLSN(); |
| |
| if (LOG_TXN_COMMIT.equalsType(entryType)) { |
| Protocol.Commit commitEntry = (Protocol.Commit) message; |
| if (commitEntry.getReplicaSyncPolicy() == |
| SyncPolicy.SYNC) { |
| doSync = true; |
| } |
| |
| LogEntry logEntry = wireRecord.getLogEntry(); |
| if (lastReplayedVLSN.getSequence() > highVLSN) { |
| highVLSN = lastReplayedVLSN.getSequence(); |
| } |
| final TxnCommit masterCommit = |
| (TxnCommit) logEntry.getMainItem(); |
| |
| long nextDTVLSN = masterCommit.getDTVLSN(); |
| if (nextDTVLSN == VLSN.UNINITIALIZED_VLSN_SEQUENCE) { |
| /* Pre-DTVLSN log commit record. */ |
| nextDTVLSN = wireRecord.getVLSN().getSequence(); |
| } |
| /* |
| * The Arbiter, unlike Replicas, does not receive commits |
| * in ascending VLSN order, so discard lower DTVLSNs. |
| */ |
| dtvlsn = nextDTVLSN > dtvlsn ? nextDTVLSN : dtvlsn; |
| groupXact.add(logEntry.getTransactionId()); |
| nAcks.increment(); |
| if (logger.isLoggable(Level.FINEST)) { |
| LoggerUtils.finest(logger, repImpl, |
| "Arbiter ack commit record " + |
| wireRecord); |
| } |
| } else { |
| String errMsg = "Illegal message type recieved by " + |
| " Arbiter. [" + wireRecord + "]"; |
| throw new IllegalStateException(errMsg); |
| } |
| } |
| } |
| |
| if (doSync || |
| (lastFSyncTime + FSYNC_INTERVAL) <= System.currentTimeMillis()) { |
| doSync = true; |
| lastFSyncTime = System.currentTimeMillis(); |
| } |
| |
| arbiterImpl.getArbiterVLSNTracker().write( |
| new VLSN(highVLSN), |
| new VLSN(dtvlsn), |
| arbiterImpl.getMasterStatus().getGroupMasterNameId().getId(), |
| doSync); |
| for (int i = 0; i < groupXact.size(); i++) { |
| queueAck(groupXact.get(i)); |
| } |
| return shutdownMessage; |
| } |
| |
| @SuppressWarnings("serial") |
| static class ConnectRetryException extends RetryException { |
| |
| ConnectRetryException(String message, |
| int retries, |
| int retrySleepMs) { |
| super(message, retries, retrySleepMs); |
| } |
| } |
| |
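| /** |
| * The thread that drains the request queue, acking commit and heartbeat |
| * messages via replayEntries() and reacting to master shutdown requests. |
| */ |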
| class RequestThread extends StoppableThread { |
| |
| private volatile Exception exception; |
| |
| /* |
| * Set asynchronously when a shutdown is being requested. |
| */ |
| volatile RequestExitType exitRequest = null; |
| |
| /* The queue poll interval, 1 second */ |
| private static final long REQUEST_QUEUE_POLL_INTERVAL_NS = 1000000000L; |
| |
| protected RequestThread() { |
| super(repImpl, "RequestThread"); |
| } |
| |
| @Override |
| protected int initiateSoftShutdown() { |
| /* Use immediate, since the stream will continue to be read. */ |
| exitRequest = RequestExitType.IMMEDIATE; |
| return 0; |
| } |
| |
| @Override |
| public void run() { |
| |
| LoggerUtils.fine(logger, repImpl, |
| "Request thread started. Message queue size:" + |
| requestQueue.remainingCapacity()); |
| try { |
| while (true) { |
| final Message message = |
| requestQueue.poll(REQUEST_QUEUE_POLL_INTERVAL_NS, |
| TimeUnit.NANOSECONDS); |
| |
| if ((exitRequest == RequestExitType.IMMEDIATE) || |
| ((exitRequest == RequestExitType.SOFT) && |
| (message == null)) || |
| arbiterImpl.isShutdownOrInvalid()) { |
| return; |
| } |
| arbiterImpl.getMasterStatus().assertSync(); |
| if (message == null) { |
| /* Timeout on poll. */ |
| continue; |
| } |
| Message shutdownMessage = replayEntries(message); |
| if (shutdownMessage != null) { |
| throw processShutdown( |
| (ShutdownRequest) shutdownMessage); |
| } |
| } |
| } catch (Exception e) { |
| exception = e; |
| |
| /* |
| * Bring it to the attention of the main thread by freeing |
| * up the "offer" wait right away. |
| */ |
| requestQueue.clear(); |
| |
| /* |
| * Get the attention of the main arbiter thread in case it's |
| * waiting in a read on the socket channel. |
| */ |
| LoggerUtils.fine(logger, repImpl, |
| "closing arbiterFeederChannel = " + |
| arbiterFeederChannel); |
| RepUtils.shutdownChannel(arbiterFeederChannel); |
| |
| LoggerUtils.info(logger, repImpl, |
| "ArbiterAcker thread exiting with exception:" + |
| e.getMessage()); |
| } |
| } |
| |
| @Override |
| protected Logger getLogger() { |
| return logger; |
| } |
| } |
| |
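| /** |
| * Supplies the Arbiter's identity, group and channel information to the |
| * replica/feeder handshake. |
| */ |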
| private class RepFeederHandshakeConfig |
| implements ReplicaFeederHandshakeConfig { |
| |
| @Override |
| public RepImpl getRepImpl() { |
| return repImpl; |
| } |
| |
| @Override |
| public NameIdPair getNameIdPair() { |
| return arbiterImpl.getNameIdPair(); |
| } |
| |
| @Override |
| public Clock getClock() { |
| return clock; |
| } |
| |
| @Override |
| public NodeType getNodeType() { |
| return NodeType.ARBITER; |
| } |
| |
| @Override |
| public RepGroupImpl getGroup() { |
| return arbiterImpl.getGroup(); |
| } |
| |
| @Override |
| public NamedChannel getNamedChannel() { |
| return arbiterFeederChannel; |
| } |
| } |
| } |