| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| |
| package com.sleepycat.je.rep.subscription; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Timer; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.logging.Logger; |
| |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.rep.GroupShutdownException; |
| import com.sleepycat.je.rep.InsufficientLogException; |
| import com.sleepycat.je.rep.NodeType; |
| import com.sleepycat.je.rep.RepInternal; |
| import com.sleepycat.je.rep.ReplicatedEnvironment; |
| import com.sleepycat.je.rep.ReplicationSecurityException; |
| import com.sleepycat.je.rep.impl.RepGroupImpl; |
| import com.sleepycat.je.rep.impl.RepImpl; |
| import com.sleepycat.je.rep.impl.RepParams; |
| import com.sleepycat.je.rep.impl.node.ChannelTimeoutTask; |
| import com.sleepycat.je.rep.impl.node.FeederManager; |
| import com.sleepycat.je.rep.impl.node.NameIdPair; |
| import com.sleepycat.je.rep.impl.node.ReplicaOutputThread; |
| import com.sleepycat.je.rep.net.DataChannel; |
| import com.sleepycat.je.rep.net.DataChannelFactory; |
| import com.sleepycat.je.rep.stream.Protocol; |
| import com.sleepycat.je.rep.stream.ReplicaFeederHandshake; |
| import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig; |
| import com.sleepycat.je.rep.stream.SubscriberFeederSyncup; |
| import com.sleepycat.je.rep.utilint.BinaryProtocol; |
| import com.sleepycat.je.rep.utilint.NamedChannel; |
| import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout; |
| import com.sleepycat.je.rep.utilint.RepUtils; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher; |
| import com.sleepycat.je.utilint.InternalException; |
| import com.sleepycat.je.utilint.LoggerUtils; |
| import com.sleepycat.je.utilint.StoppableThread; |
| import com.sleepycat.je.utilint.TestHook; |
| import com.sleepycat.je.utilint.TestHookExecute; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| /** |
| * Main thread created by Subscription to stream log entries from feeder |
| */ |
| class SubscriptionThread extends StoppableThread { |
| |
| /* wait time in ms in soft shut down */ |
| private final static int SOFT_SHUTDOWN_WAIT_MS = 5 * 1000; |
| |
| private final Logger logger; |
| private final SubscriptionConfig config; |
| private final SubscriptionStat stats; |
| |
| /* communication queues and working threads */ |
| private final BlockingQueue<Long> outputQueue; |
| private final BlockingQueue<Object> inputQueue; |
| private SubscriptionProcessMessageThread messageProcThread; |
| |
| /* communication channel between subscriber and feeder */ |
| private NamedChannelWithTimeout namedChannel; |
| /* task to register channel with timeout */ |
| private ChannelTimeoutTask channelTimeoutTask; |
| /* protocol used to communicate with feeder */ |
| private Protocol protocol; |
| |
| /* requested VLSN from which to stream log entries */ |
| private final VLSN reqVLSN; |
| |
| /* |
| * volatile because it can be concurrently accessed by the subscription |
| * thread itself in checkOutputThread(), and another thread trying to |
| * shut down subscription by calling shutdown() |
| */ |
| private volatile SubscriptionOutputThread outputThread; |
| |
| private volatile SubscriptionStatus status; |
| |
| /* stored exception */ |
| private volatile Exception storedException; |
| |
| /* |
| * For unit test only. The hook will be called by unit test to inject an |
| * exception into msg queue, which to be processed by the callback function |
| * defined in unit test. |
| */ |
| private TestHook<SubscriptionThread> exceptionHandlingTestHook; |
| |
| SubscriptionThread(ReplicatedEnvironment env, |
| VLSN reqVLSN, |
| SubscriptionConfig config, |
| SubscriptionStat stats, |
| Logger logger) { |
| |
| super(RepInternal.getNonNullRepImpl(env), "Subscription Main"); |
| setUncaughtExceptionHandler(new SubscriptionThreadExceptionHandler()); |
| |
| this.reqVLSN = reqVLSN; |
| this.config = config; |
| this.stats = stats; |
| this.logger = logger; |
| protocol = null; |
| namedChannel = null; |
| /* init subscription input and output queue */ |
| inputQueue = |
| new ArrayBlockingQueue<>(config.getInputMessageQueueSize()); |
| outputQueue = |
| new ArrayBlockingQueue<>(config.getOutputMessageQueueSize()); |
| |
| status = SubscriptionStatus.INIT; |
| storedException = null; |
| exceptionHandlingTestHook = null; |
| } |
| |
| /** |
| * Returns subscription status to client |
| * |
| * @return subscription status |
| */ |
| public SubscriptionStatus getStatus() { |
| return status; |
| } |
| |
| /** |
| * Returns stored exception |
| * |
| * @return stored exception |
| */ |
| Exception getStoredException() { |
| return storedException; |
| } |
| |
| @Override |
| protected Logger getLogger() { |
| return logger; |
| } |
| |
| @Override |
| protected int initiateSoftShutdown() { |
| return SOFT_SHUTDOWN_WAIT_MS; |
| } |
| |
| @Override |
| public void run() { |
| |
| LoggerUtils.info(logger, envImpl, |
| "Start subscription from VLSN " + reqVLSN + |
| " from feeder at " + |
| config.getFeederHost() + ":" + config.getFeederPort()); |
| |
| try { |
| final int maxRetry = config.getMaxConnectRetries(); |
| boolean auxThreadCreated = false; |
| int numRetry = 0; |
| |
| while (!isShutdown()) { |
| try { |
| initializeConnection(); |
| if (!auxThreadCreated) { |
| LoggerUtils.fine(logger, envImpl, |
| "Create auxiliary msg processing " + |
| "and output threads"); |
| |
| auxThreadCreated = createAuxThread(); |
| if (auxThreadCreated) { |
| /* subscription succeed, start streaming data */ |
| status = SubscriptionStatus.SUCCESS; |
| loopInternal(); |
| } else { |
| status = SubscriptionStatus.UNKNOWN_ERROR; |
| } |
| } |
| break; |
| } catch (ConnectionException e) { |
| if (numRetry == maxRetry) { |
| LoggerUtils.info(logger, envImpl, |
| "Shut down after reaching max " + |
| "retry(" + maxRetry + ") to " + |
| "connect feeder " + |
| config.getFeederHost() + |
| ", error: " + e.getMessage()); |
| LoggerUtils.fine(logger, envImpl, |
| LoggerUtils.getStackTrace(e)); |
| storedException = e; |
| status = SubscriptionStatus.CONNECTION_ERROR; |
| break; |
| } else { |
| numRetry++; |
| LoggerUtils.fine(logger, envImpl, |
| "Fail to connect feeder at " + |
| config.getFeederHost() + |
| " sleep for " + e.getRetrySleepMs() + |
| " ms and re-connect again"); |
| Thread.sleep(e.getRetrySleepMs()); |
| } |
| } |
| } |
| } catch (ReplicationSecurityException ure) { |
| storedException = ure; |
| LoggerUtils.warning(logger, envImpl, |
| "Subscription thread exited due to security " + |
| "check failure message: " + ure.getMessage()); |
| status = SubscriptionStatus.SECURITY_CHECK_ERROR; |
| } catch (GroupShutdownException e) { |
| if (messageProcThread.isAlive()) { |
| try { |
| /* let message processing thread finish up */ |
| messageProcThread.join(); |
| } catch (InterruptedException ie) { |
| /* ignore since we will shut down, just log */ |
| LoggerUtils.fine(logger, envImpl, |
| "exception in shutting down msg proc " + |
| "thread " + ie.getMessage() + |
| "\n" + LoggerUtils.getStackTrace(ie)); |
| } |
| } |
| storedException = e; |
| LoggerUtils.info(logger, envImpl, |
| "received group shutdown " + e.getMessage() + |
| "\n" + LoggerUtils.getStackTrace(e)); |
| status = SubscriptionStatus.GRP_SHUTDOWN; |
| } catch (InsufficientLogException e) { |
| storedException = e; |
| LoggerUtils.info(logger, envImpl, |
| "unable to subscribe from requested VLSN " + |
| reqVLSN + ", error: " + e.getMessage()); |
| LoggerUtils.fine(logger, envImpl, LoggerUtils.getStackTrace(e)); |
| status = SubscriptionStatus.VLSN_NOT_AVAILABLE; |
| } catch (EnvironmentFailureException e) { |
| storedException = e; |
| LoggerUtils.warning(logger, envImpl, |
| "unable to sync up with feeder due to EFE " + |
| e.getMessage()); |
| LoggerUtils.fine(logger, envImpl, LoggerUtils.getStackTrace(e)); |
| status = SubscriptionStatus.UNKNOWN_ERROR; |
| } catch (InterruptedException e) { |
| storedException = e; |
| LoggerUtils.warning(logger, envImpl, |
| "interrupted exception " + e.getMessage()); |
| LoggerUtils.fine(logger, envImpl, LoggerUtils.getStackTrace(e)); |
| status = SubscriptionStatus.UNKNOWN_ERROR; |
| } catch (InternalException e) { |
| storedException = e; |
| LoggerUtils.warning(logger, envImpl, |
| "internal exception " + e.getMessage()); |
| LoggerUtils.fine(logger, envImpl, LoggerUtils.getStackTrace(e)); |
| status = SubscriptionStatus.UNKNOWN_ERROR; |
| } finally { |
| shutdown(); |
| } |
| } |
| |
| /** |
| * For unit test |
| * |
| * @param exceptionHandlingTestHook test hook |
| */ |
| void setExceptionHandlingTestHook( |
| TestHook<SubscriptionThread> exceptionHandlingTestHook) { |
| this.exceptionHandlingTestHook = exceptionHandlingTestHook; |
| } |
| |
| /** |
| * Sets subscription status |
| * |
| * @param s subscription status |
| */ |
| void setStatus(SubscriptionStatus s) { |
| status = s; |
| } |
| |
| /** |
| * shutdown the subscriber and all auxiliary threads, close channel to |
| * the Feeder. |
| */ |
| void shutdown() { |
| |
| /* Note start of shutdown and return if already requested */ |
| if (shutdownDone(logger)) { |
| return; |
| } |
| |
| /* shutdown aux threads */ |
| if (messageProcThread != null) { |
| try { |
| messageProcThread.shutdownThread(logger); |
| LoggerUtils.fine(logger, envImpl, |
| "message processing thread has shut down."); |
| } catch (Exception e) { |
| /* Ignore so shutdown can continue */ |
| LoggerUtils.warning(logger, envImpl, |
| "error in shutdown msg proc thread: " + |
| e.getMessage() + ", continue shutdown the" + |
| " subscription thread."); |
| } finally { |
| messageProcThread = null; |
| } |
| } |
| if (outputThread != null) { |
| try { |
| outputThread.shutdownThread(logger); |
| LoggerUtils.fine(logger, envImpl, |
| "output thread has shut down."); |
| |
| } catch (Exception e) { |
| /* Ignore we will clean up via killing IO channel anyway. */ |
| LoggerUtils.warning(logger, envImpl, |
| "error in shutdown output thread: " + |
| e.getMessage() + ", continue shutdown " + |
| "subscription thread."); |
| } finally { |
| outputThread = null; |
| } |
| } |
| |
| inputQueue.clear(); |
| outputQueue.clear(); |
| |
| shutdownThread(logger); |
| |
| LoggerUtils.info(logger, envImpl, |
| "Subscription thread shut down with status: " + |
| status); |
| |
| /* finally we shut down channel to feeder */ |
| RepUtils.shutdownChannel(namedChannel); |
| if (channelTimeoutTask != null) { |
| channelTimeoutTask.cancel(); |
| } |
| LoggerUtils.fine(logger, envImpl, |
| "Channel to " + config.getFeederHost() + |
| " shut down."); |
| } |
| |
| /** |
| * Enqueue message received from feeder into input queue |
| * |
| * @param message message received from feeder |
| * |
| * @throws InterruptedException if enqueue is interrupted |
| * @throws InternalException if consumer thread is gone unexpectedly |
| */ |
| void offer(Object message) |
| throws InterruptedException, InternalException { |
| |
| /* Don't enqueue msg if thread is shutdown */ |
| if (isShutdown()) { |
| return; |
| } |
| |
| RepImpl repImpl = (RepImpl)envImpl; |
| |
| while (!inputQueue.offer(message, |
| SubscriptionConfig.QUEUE_POLL_INTERVAL_MS, |
| TimeUnit.MILLISECONDS)) { |
| /* |
| * Offer timed out. |
| * |
| * There are three cases: |
| * |
| * Case 1: This thread was shutdown (shutdown() is called) while |
| * waiting to add to the queue. Regardless of the state of the msg |
| * proc thread, we just return and let caller capture the shutdown |
| * signal and exit; |
| * |
| * Case 2: The msg proc thread is dead for some reason, this is an |
| * exception we throw IE and let caller to capture this IE and |
| * exit; |
| * |
| * Case 3: The msg proc thread is alive, try again. |
| */ |
| |
| /* Case 1 */ |
| if (isShutdown()) { |
| return; |
| } |
| |
| if (!messageProcThread.isAlive()) { |
| /* Case 2 */ |
| final String err = "Thread consuming input queue is gone, " + |
| "start shutdown process"; |
| LoggerUtils.warning(logger, repImpl, err); |
| throw new InternalException(err); |
| } else { |
| /* Case 3: count the overflow and retry */ |
| stats.getNumReplayQueueOverflow().increment(); |
| } |
| } |
| } |
| |
| /** |
| * Create connection to feeder and execute handshake |
| * |
| * @throws InternalException if unable to connect to source node due to |
| * protocol error |
| * @throws EnvironmentFailureException if fail to handshake with source, or |
| * source does not have enough log to start streaming |
| * @throws ConnectionException if unable to connect to source node |
| * @throws ReplicationSecurityException if authentication failure |
| */ |
| private void initializeConnection() throws InternalException, |
| EnvironmentFailureException, ConnectionException, |
| ReplicationSecurityException { |
| |
| /* open a channel to feeder */ |
| LoggerUtils.fine(logger, envImpl, |
| "Subscription " + config.getSubNodeName() + |
| " start open channel and handshake with feeder"); |
| |
| try { |
| |
| openChannel(); |
| ReplicaFeederHandshake handshake = |
| new ReplicaFeederHandshake(new SubFeederHandshakeConfig |
| (config.getNodeType())); |
| |
| protocol = handshake.execute(); |
| |
| /* check if negotiated protocol version is high enough */ |
| final int minReqVersion = config.getMinProtocolVersion(); |
| if (protocol.getVersion() < minReqVersion) { |
| throw new BinaryProtocol.ProtocolException( |
| "HA protocol version (" + protocol.getVersion() + ") is " + |
| "lower than minimal required version (" + minReqVersion + |
| ")"); |
| } |
| |
| LoggerUtils.fine(logger, envImpl, |
| "subscription " + config.getSubNodeName() + |
| " sync-up with feeder at vlsn: " + reqVLSN); |
| SubscriberFeederSyncup syncup = |
| new SubscriberFeederSyncup(namedChannel, protocol, |
| config.getFeederFilter(), |
| (RepImpl) envImpl, |
| config.getStreamMode(), |
| config.getPartGenDBName(), |
| logger); |
| final VLSN startVLSN = syncup.execute(reqVLSN); |
| /* after sync-up the part generation db id is ready */ |
| stats.setPartGenDBId(syncup.getPartGenDBId()); |
| LoggerUtils.fine(logger, envImpl, |
| "sync-up with feeder done, start vlsn: " + |
| startVLSN + ", partition generation db id " + |
| stats.getPartGenDBId() + " for given db name " + |
| config.getPartGenDBName()); |
| |
| if (!startVLSN.equals(VLSN.NULL_VLSN)) { |
| |
| final BinaryProtocol.Message msg = protocol.read(namedChannel); |
| final BinaryProtocol.MessageOp op = msg.getOp(); |
| |
| if (op == Protocol.HEARTBEAT) { |
| /* normally, a heartbeat */ |
| stats.setStartVLSN(startVLSN); |
| queueAck(ReplicaOutputThread.HEARTBEAT_ACK); |
| |
| LoggerUtils.info(logger, envImpl, |
| "Subscription " + config.getSubNodeName() + |
| " successfully connect to feeder at " + |
| config.getFeederHost() + ":" + |
| config.getFeederPort() + |
| ", reqVLSN: " + reqVLSN + |
| ", start VLSN: " + startVLSN); |
| |
| return; |
| } else if (op == Protocol.SECURITY_FAILURE_RESPONSE) { |
| /* feeder fails security check in syncup */ |
| final Protocol.SecurityFailureResponse resp = |
| (Protocol.SecurityFailureResponse) msg; |
| |
| LoggerUtils.warning(logger, envImpl, |
| "Receiving security check " + |
| "failure message from feeder " + |
| config.getFeederHost() + |
| ", message: " + resp.getMessage()); |
| |
| throw new ReplicationSecurityException( |
| resp.getMessage(), config.getSubNodeName(), null); |
| } |
| |
| /* unexpected message */ |
| throw new BinaryProtocol.ProtocolException( |
| msg, Protocol.Heartbeat.class); |
| |
| } else { |
| throw new InsufficientLogException((RepImpl) envImpl, reqVLSN); |
| } |
| } catch (IOException e) { |
| throw new ConnectionException("Unable to connect due to " + |
| e.getMessage() + |
| ", will retry later.", |
| config.getSleepBeforeRetryMs(), |
| e); |
| } catch (EnvironmentFailureException e) { |
| logger.warning("Fail to handshake with feeder: " + |
| e.getMessage()); |
| throw e; |
| } catch (BinaryProtocol.ProtocolException e) { |
| final String msg = ("Unable to connect to feeder " + |
| config.getFeederHost() + |
| " due to protocol exception " + |
| e.getMessage()); |
| LoggerUtils.warning(logger, envImpl, msg); |
| throw new InternalException(msg, e); |
| } |
| } |
| |
| /** |
| * Create auxiliary message processing and output thread |
| */ |
| private boolean createAuxThread() { |
| |
| RepImpl repImpl = (RepImpl)envImpl; |
| |
| inputQueue.clear(); |
| outputQueue.clear(); |
| |
| /* start output thread over data channel to send response to feeder */ |
| outputThread = |
| new SubscriptionOutputThread(this, |
| repImpl, outputQueue, protocol, |
| namedChannel.getChannel(), |
| config.getAuthenticator(), stats); |
| /* |
| * output thread can be shutdown and set to null anytime, thus |
| * use a cached copy to ensure it is alive before start it |
| */ |
| final SubscriptionOutputThread cachedOutputThread = outputThread; |
| if (cachedOutputThread != null) { |
| cachedOutputThread.start(); |
| LoggerUtils.fine(logger, envImpl, |
| "output thread created for subscription " + |
| config.getSubNodeName()); |
| /* start thread to consume data in input queue */ |
| messageProcThread = |
| new SubscriptionProcessMessageThread(repImpl, inputQueue, |
| config, |
| stats, logger); |
| messageProcThread.start(); |
| LoggerUtils.fine(logger, envImpl, |
| "message processing thread created for subscription " + |
| config.getSubNodeName()); |
| return true; |
| } else { |
| LoggerUtils.info(logger, envImpl, |
| "subscription " + config.getSubNodeName() + " " + |
| "just shut down, no need to create auxiliary " + |
| "threads"); |
| return false; |
| } |
| } |
| |
| /** |
| * Open a data channel to feeder |
| * |
| * @throws ConnectionException unable to connect due to error and need retry |
| * @throws InternalException fail to handshake with feeder |
| * @throws ReplicationSecurityException if unauthorized to stream |
| * from feeder |
| */ |
| private void openChannel() throws ConnectionException, |
| InternalException, ReplicationSecurityException { |
| |
| RepImpl repImpl = (RepImpl)envImpl; |
| |
| if (repImpl == null) { |
| throw new IllegalStateException("Replication env is unavailable."); |
| } |
| |
| try { |
| DataChannelFactory.ConnectOptions connectOpts = |
| new DataChannelFactory |
| .ConnectOptions() |
| .setTcpNoDelay(config.TCP_NO_DELAY) |
| .setReceiveBufferSize(config.getReceiveBufferSize()) |
| .setOpenTimeout((int) config |
| .getStreamOpenTimeout(TimeUnit.MILLISECONDS)) |
| .setBlocking(config.BLOCKING_MODE_CHANNEL); |
| |
| InetSocketAddress localAddr = repImpl.getHostAddress(); |
| if (localAddr.getHostName().equals( |
| SubscriptionConfig.ANY_ADDRESS.getHostName())) { |
| localAddr = SubscriptionConfig.ANY_ADDRESS; |
| } |
| LoggerUtils.fine(logger, envImpl, |
| "connect to " + config.getInetSocketAddress() + |
| " from local address " + localAddr + |
| " with connect option " + connectOpts); |
| |
| final DataChannel channel = |
| repImpl.getChannelFactory() |
| .connect(config.getInetSocketAddress(), |
| localAddr, |
| connectOpts); |
| |
| ServiceDispatcher.doServiceHandshake(channel, |
| FeederManager.FEEDER_SERVICE, |
| config.getAuthInfo()); |
| LoggerUtils.fine(logger, envImpl, |
| "channel opened to service " + |
| FeederManager.FEEDER_SERVICE + "@" + |
| config.getFeederHost() + |
| "[address: " + config.getFeederHostAddr() + |
| " port: " + config.getFeederPort() + "]"); |
| |
| final int timeoutMs = repImpl.getConfigManager(). |
| getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT); |
| |
| channelTimeoutTask = new ChannelTimeoutTask(new Timer(true)); |
| namedChannel = |
| new NamedChannelWithTimeout(repImpl, logger, channelTimeoutTask, |
| channel, timeoutMs); |
| } catch (IOException cause) { |
| /* retry if unable to connect to feeder */ |
| throw new ConnectionException("Fail to open channel to feeder " + |
| "due to " + cause.getMessage() + |
| ", will retry later", |
| config.getSleepBeforeRetryMs(), |
| cause); |
| } catch (ServiceDispatcher.ServiceConnectFailedException cause) { |
| |
| /* |
| * The feeder may not have established the Feeder Service |
| * as yet. For example, the transition to the master may not have |
| * been completed. |
| */ |
| if (cause.getResponse() == |
| ServiceDispatcher.Response.UNKNOWN_SERVICE) { |
| throw new ConnectionException("Service exception: " + |
| cause.getMessage() + |
| ", wait longer and will retry " + |
| "later", |
| config.getSleepBeforeRetryMs(), |
| cause); |
| } |
| |
| if (cause.getResponse() == |
| ServiceDispatcher.Response.INVALID) { |
| throw new ReplicationSecurityException( |
| "Security check failure:" + cause.getMessage(), |
| config.getSubNodeName(), |
| cause); |
| } |
| |
| throw new InternalException("Subscription " + |
| config.getSubNodeName() + |
| "failed to handshake for service " + |
| FeederManager.FEEDER_SERVICE + |
| " with feeder " + |
| config.getFeederHost(), |
| cause); |
| } |
| LoggerUtils.info(logger, envImpl, |
| "Subscription " + config.getSubNodeName() + |
| " has successfully created a channel to feeder at " + |
| config.getFeederHost() + ":" + config.getFeederPort()); |
| |
| } |
| |
| /** |
| * Internal loop to dequeue message from channel to the feeder, |
| * process shutdown and heartbeat messages, and relay data operations to |
| * the input queue to be consumed by input thread. |
| * |
| * @throws InternalException if error in reading messages from channel or |
| * enqueue message into input queue |
| * @throws GroupShutdownException if receive shutdown message from feeder |
| * @throws ReplicationSecurityException if output thread exits due to |
| * security check failure. In this case the main subscription need to |
| * exit without retry. |
| */ |
| private void loopInternal() throws InternalException, |
| GroupShutdownException, ReplicationSecurityException { |
| |
| RepImpl repImpl = (RepImpl)envImpl; |
| |
| try { |
| |
| LoggerUtils.info(logger, envImpl, |
| "Start reading messages from feeder " + |
| config.getFeederHost() + ":" + |
| config.getFeederPort()); |
| while (!isShutdown()) { |
| |
| checkOutputThread(); |
| |
| BinaryProtocol.Message message = protocol.read(namedChannel); |
| |
| if ((message == null)) { |
| LoggerUtils.info(logger, envImpl, |
| "Subscription " + config.getSubNodeName() + |
| " has nothing stream, exit loop."); |
| return; |
| } |
| |
| assert TestHookExecute.doHookIfSet(exceptionHandlingTestHook, |
| this); |
| |
| stats.getNumMsgReceived().increment(); |
| |
| BinaryProtocol.MessageOp messageOp = message.getOp(); |
| if (messageOp == Protocol.HEARTBEAT) { |
| LoggerUtils.finest(logger, envImpl, |
| "receive heartbeat from " + |
| namedChannel.getNameIdPair()); |
| queueAck(ReplicaOutputThread.HEARTBEAT_ACK); |
| } else if (messageOp == Protocol.SECURITY_FAILURE_RESPONSE) { |
| final Protocol.SecurityFailureResponse resp = |
| (Protocol.SecurityFailureResponse) message; |
| LoggerUtils.fine(logger, envImpl, |
| "Receiving security check " + |
| "failure message from feeder " + |
| config.getFeederHost() + |
| ", message: " + resp.getMessage()); |
| |
| throw new ReplicationSecurityException( |
| resp.getMessage(), config.getSubNodeName(), null); |
| |
| } else if (messageOp == Protocol.SHUTDOWN_REQUEST) { |
| |
| LoggerUtils.info(logger, envImpl, |
| "Receive shutdown request from feeder " + |
| config.getFeederHost() + |
| ", shutdown subscriber"); |
| |
| /* |
| * create a shutdown request, make it in the queue so |
| * client is able to see that in callback, and throw an |
| * exception. |
| * |
| * The message processing thread will exit when seeing a |
| * GroupShutdownException |
| */ |
| Protocol.ShutdownRequest req = |
| (Protocol.ShutdownRequest) message; |
| Exception exp = |
| new GroupShutdownException(logger, repImpl, |
| config.getFeederHost(), |
| stats.getHighVLSN(), |
| req.getShutdownTimeMs()); |
| offer(exp); |
| throw exp; |
| } else { |
| /* a regular data entry message */ |
| offer(message); |
| |
| final long pending = inputQueue.size(); |
| if (pending > stats.getMaxPendingInput().get()) { |
| stats.getMaxPendingInput().set(pending); |
| LoggerUtils.finest(logger, envImpl, |
| "Max pending request log items:" + |
| pending); |
| } |
| } |
| |
| } |
| } catch (IOException e) { |
| /* |
| * connection to feeder dropped, wrap with ConnectionException |
| * and the caller run() method will capture it and re-connect if |
| * necessary |
| */ |
| LoggerUtils.info(logger, envImpl, |
| "Connection to " + config.getFeederHost() + |
| " dropped with error " + e.getMessage()); |
| throw new ConnectionException("Unable to connect due to " + |
| e.getMessage() + |
| ", will retry later.", |
| config.getSleepBeforeRetryMs(), |
| e); |
| } catch (GroupShutdownException | ReplicationSecurityException exp) { |
| /* throw to caller, let caller deal with it */ |
| throw exp; |
| } catch (Exception e) { |
| /* other exception is thrown as IE */ |
| throw new InternalException(e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Checks status of output thread and propagates RSE to main |
| * loop. If output thread exited due to RSE, the main thread need to |
| * capture it to set the subscription status correctly. For other |
| * exceptions, output thread uses the traditional mechanism to notify the |
| * main subscription thread: simply shut down channel. |
| */ |
| private void checkOutputThread() |
| throws InternalException, ReplicationSecurityException { |
| |
| /* |
| * output thread can be shutdown and set to null anytime, thus |
| * use a cached copy to avoid NPE after the first check |
| */ |
| final SubscriptionOutputThread cachedOutputThread = outputThread; |
| |
| /* output thread already gone */ |
| if (cachedOutputThread == null) { |
| /* |
| * if output thread is set to null only when subscription thread |
| * shut down. If we reach here, it means the subscription thread |
| * is shut down right after isShutdown check in loopInternal(). |
| * We simply return and subscription thread would detect the shut |
| * down in next check of isShutdown in loopInternal(). |
| */ |
| LoggerUtils.fine(logger, envImpl, |
| "output thread no longer exists"); |
| return; |
| } |
| |
| if (cachedOutputThread.getException() instanceof |
| ReplicationSecurityException) { |
| final ReplicationSecurityException rse = |
| (ReplicationSecurityException) cachedOutputThread.getException(); |
| LoggerUtils.warning(logger, envImpl, |
| "Output thread exited due to security check " + |
| "failure: " + rse.getMessage()); |
| throw rse; |
| } |
| } |
| |
| /** |
| * Enqueue an ack message in output queue |
| * |
| * @param xid txn id to enqueue |
| * |
| * @throws IOException if fail to queue the msg |
| */ |
| private void queueAck(Long xid) throws IOException { |
| |
| try { |
| outputQueue.put(xid); |
| } catch (InterruptedException ie) { |
| |
| /* |
| * If interrupted while waiting, have the higher levels treat |
| * it like an IOE and exit the thread. |
| */ |
| throw new IOException("Ack I/O interrupted", ie); |
| } |
| } |
| |
| /*-----------------------------------*/ |
| /*- Inner Classes -*/ |
| /*-----------------------------------*/ |
| |
| /** |
| * Subscriber-Feeder handshake config |
| */ |
| private class SubFeederHandshakeConfig |
| implements ReplicaFeederHandshakeConfig { |
| |
| private final NodeType nodeType; |
| private final RepImpl repImpl; |
| SubFeederHandshakeConfig(NodeType nodeType) { |
| this.nodeType = nodeType; |
| repImpl = (RepImpl)envImpl; |
| } |
| |
| public RepImpl getRepImpl() { |
| return repImpl; |
| } |
| |
| public NameIdPair getNameIdPair() { |
| return getRepImpl().getNameIdPair(); |
| } |
| |
| public RepUtils.Clock getClock() { |
| return new RepUtils.Clock(RepImpl.getClockSkewMs()); |
| } |
| |
| public NodeType getNodeType() { |
| return nodeType; |
| } |
| |
| public NamedChannel getNamedChannel() { |
| return namedChannel; |
| } |
| |
| /* create a group impl from group name and group uuid */ |
| public RepGroupImpl getGroup() { |
| |
| RepGroupImpl repGroupImpl = new RepGroupImpl( |
| config.getGroupName(), |
| true, /* unknown group uuid */ |
| repImpl.getCurrentJEVersion()); |
| |
| /* use uuid if specified, otherwise unknown uuid will be used */ |
| if (config.getGroupUUID() != null) { |
| repGroupImpl.setUUID(config.getGroupUUID()); |
| } |
| return repGroupImpl; |
| } |
| } |
| |
| /** |
| * Thrown to indicate that the Subscriber must retry connecting to the same |
| * master, after some period of time. |
| */ |
| private class ConnectionException extends RuntimeException { |
| |
| private final long retrySleepMs; |
| |
| ConnectionException(String message, |
| long retrySleepMs, |
| Throwable cause) { |
| super(message, cause); |
| this.retrySleepMs = retrySleepMs; |
| } |
| |
| /** |
| * Get thread sleep time before retry |
| * |
| * @return sleep time in ms |
| */ |
| long getRetrySleepMs() { |
| return retrySleepMs; |
| } |
| |
| @Override |
| public String getMessage() { |
| final String str = super.getMessage(); |
| return ((str == null || str.isEmpty()) ? |
| "Fail to connect" : str) + ", will retry after " + |
| "sleeping " + retrySleepMs + " ms"; |
| } |
| } |
| |
| /** |
| * Handle exceptions uncaught in SubscriptionThread |
| */ |
| private class SubscriptionThreadExceptionHandler |
| implements UncaughtExceptionHandler { |
| |
| public void uncaughtException(Thread t, Throwable e) { |
| logger.severe("Error { " + e.getMessage() + |
| " } in SubscriptionThread {" + |
| t + " } was uncaught.\nstack trace:\n" + |
| LoggerUtils.getStackTrace(e)); |
| } |
| } |
| } |
| |