| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.deadlock.MessageDependencyMonitor; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; |
| import org.apache.geode.internal.serialization.DSFIDNotFoundException; |
| import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.Breadcrumbs; |
| import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
 * This class processes responses to {@link DistributionMessage}s. It handles the generic case of
| * simply waiting for responses from all members. It is intended to be subclassed for special cases. |
| * |
| * <P> |
| * |
| * Note that, unlike the reply processors in versions previous to GemFire 2.1, this reply processor |
| * is kept entirely in JOM. |
| * |
| * <p> |
| * |
| * Recommended usage pattern in subclass... |
| * |
| * <pre> |
| * |
| * public void process(DistributionMessage msg) { |
| * try { |
| * ...custom code for subclass goes here... |
| * } |
| * finally { |
| * super.process(msg); |
| * } |
| * } |
| * |
| * </pre> |
| * |
| * The above usage pattern causes the waitForReplies latch to not be released until after the |
| * message has been processed. In addition, it is guaranteed to be released even if the custom |
| * subclass code throws a runtime exception. |
| * |
| * <p> |
| * |
| * @see MessageWithReply |
| * @since GemFire 2.1 |
| */ |
| public class ReplyProcessor21 implements MembershipListener { |
| |
/** Class-wide logger. */
private static final Logger logger = LogService.getLogger();

/**
 * Read once, at class initialization, from the system property {@code ack-threshold-exception}
 * via {@link Boolean#getBoolean(String)}, so it is true only when that property is "true"
 * (ignoring case). The Javadoc of {@link #waitForReplies()} describes its effect: an
 * InternalGemFireException when the ack-wait threshold is exceeded. NOTE(review): the code that
 * consults this flag is presumably the timeout handling — confirm.
 */
public static final boolean THROW_EXCEPTION_ON_TIMEOUT =
    Boolean.getBoolean("ack-threshold-exception");

/**
 * the ratio by which ack-severe-alert-threshold is lowered when waiting for a BucketRegion
 * operation
 * <p>
 * Assigned in the static initializer from the system property
 * {@code DistributionConfig.GEMFIRE_PREFIX + "ack-severe-alert-reduction-ratio"}. It defaults to
 * 0.80 when that property is absent or cannot be parsed as a double.
 */
public static final double PR_SEVERE_ALERT_RATIO;

/**
 * All live reply processors in this VM, keyed by processor id. {@link #register()} adds a
 * processor and {@link #getProcessor(int)} looks one up; {@link #cleanup()} removes it.
 */
@MakeNotStatic
protected static final ProcessorKeeper21 keeper = new ProcessorKeeper21();
| |
//////////////////// Instance Fields ////////////////////
| |
/**
 * The members that haven't replied yet
 *
 * Concurrency: protected by synchronization of itself
 * <p>
 * Sized from the initial member collection in the constructor. Entries may be null; the loops in
 * this class skip null entries. NOTE(review): entries are presumably nulled out by removeMember
 * as replies arrive or members depart — confirm.
 */
protected final InternalDistributedMember[] members;

/**
 * Set to true in preWait, set to false in postWait. Used to avoid removing membership listener in
 * Runnable in postWait if we've called waitForReplies again.
 */
protected volatile boolean waiting = false;

/**
 * An <code>Exception</code> that occurred when processing a reply
 * <p>
 * Since this is set by the executor and read by the initiating thread, this is a volatile.
 * processException keeps only the first exception received. stillWaiting also sets this field
 * once the processor has been shut down. The waitForReplies methods throw it to their caller.
 *
 * @see ReplyMessage#getException
 */
protected volatile ReplyException exception;

/** Have we heard back from everyone? */
private volatile boolean done;

/**
 * Set by cleanup() once this processor has been removed from the keeper. The waitForReplies
 * methods throw IllegalStateException if it is already set.
 */
private boolean keeperCleanedUp;

/** Have we been aborted due to shutdown? When set, stillWaiting() records a ReplyException. */
protected volatile boolean shutdown;

/** Semaphore used for wait/notify; created in the constructor with a count of 1. */
private final StoppableCountDownLatch latch;

/** The id of this processor, as assigned by the keeper in register(). */
protected int processorId;

/**
 * Used to get the ack wait threshold (which might change at runtime), etc.
 */
protected final InternalDistributedSystem system;

/**
 * The distribution manager supplied at construction. getDistributionManager() prefers the
 * system's own distribution manager and falls back to this one only when the system has none.
 */
protected final DistributionManager dmgr;

/** Start time for replyWait stat, in nanos; set in preWait from the stats' startReplyWait(). */
long statStart;

/**
 * Start time for ack-wait-threshold, in millis (System.currentTimeMillis()), recorded during
 * construction.
 */
private long initTime;

/**
 * whether this reply processor should perform severe-alert processing for the message being ack'd
 */
private boolean severeAlertEnabled;

/**
 * whether the severe-alert timeout has been reset. This can happen if a member we're waiting for
 * is waiting on a suspect member, for instance. Set by memberSuspect; basicWait restarts its
 * severe-alert wait for as long as this keeps getting set.
 */
private volatile boolean severeAlertTimerReset;

/**
 * whether this reply processor should shorten severe-alert processing due to another vm waiting
 * on this one. This is a thread-local so that lower level comm layers can tell that the interval
 * should be shortened
 */
private static final ThreadLocal<Boolean> severeAlertShorten =
    ThreadLocal.withInitial(() -> Boolean.FALSE);

/**
 * whether the next replyProcessor for the current thread should perform severe-alert processing
 */
private static final ThreadLocal<Boolean> forceSevereAlertProcessing =
    ThreadLocal.withInitial(() -> Boolean.FALSE);
| |
| ////////////////////// Static Methods ///////////////////// |
| |
| static { |
| String str = System |
| .getProperty(DistributionConfig.GEMFIRE_PREFIX + "ack-severe-alert-reduction-ratio", ".80"); |
| double ratio; |
| try { |
| ratio = Double.parseDouble(str); |
| } catch (NumberFormatException e) { |
| System.err.println( |
| "Unable to parse gemfire.ack-severe-alert-reduction-ratio setting of \"" + str + "\""); |
| ratio = 0.80; |
| } |
| PR_SEVERE_ALERT_RATIO = ratio; |
| } |
| |
| /** |
| * Returns the <code>ReplyProcessor</code> with the given id, or <code>null</code> if it no longer |
| * exists. |
| * |
| * @param processorId The id of the processor to get |
| * |
| * @see #getProcessorId |
| */ |
| public static ReplyProcessor21 getProcessor(int processorId) { |
| return (ReplyProcessor21) keeper.retrieve(processorId); |
| } |
| |
| /////////////////////// Constructors ////////////////////// |
| |
/**
 * Builds a processor that expects a reply from exactly one member.
 *
 * @param system the DistributedSystem connection; its DistributionManager supplies messaging,
 *        membership and cancellation
 * @param member the single member whose reply is awaited
 */
public ReplyProcessor21(InternalDistributedSystem system, InternalDistributedMember member) {
  this(system, Collections.<InternalDistributedMember>singleton(member));
}

/**
 * Builds a processor that expects a reply from exactly one member, with an explicit cancellation
 * source.
 *
 * @param system the DistributedSystem connection
 * @param member the single member whose reply is awaited
 * @param cancelCriterion the CancelCriterion to use, or null to use the system's
 *        DistributionManager
 */
public ReplyProcessor21(InternalDistributedSystem system, InternalDistributedMember member,
    CancelCriterion cancelCriterion) {
  this(system, Collections.<InternalDistributedMember>singleton(member), cancelCriterion);
}

/**
 * Builds a processor that expects a reply from exactly one member, using the given manager.
 *
 * @param dm the DistributionManager to use for messaging, membership and cancellation
 * @param member the single member whose reply is awaited
 */
public ReplyProcessor21(DistributionManager dm, InternalDistributedMember member) {
  this(dm, Collections.<InternalDistributedMember>singleton(member));
}
| |
/**
 * Builds a processor that expects replies from each member of {@code initMembers}. Pass
 * {@link ClusterDistributionManager#getDistributionManagerIds} to wait on every distribution
 * manager, including the one hosted in this VM.
 *
 * @param dm the DistributionManager to use for messaging, membership and cancellation
 * @param initMembers the members this processor wants replies from
 */
public ReplyProcessor21(DistributionManager dm, Collection initMembers) {
  this(dm, initMembers, null);
}

/**
 * Builds a processor that expects replies from each member of {@code initMembers}, with an
 * explicit cancellation source. The distributed system is taken from {@code dm}.
 *
 * @param dm the DistributionManager to use for messaging and membership
 * @param initMembers the members this processor wants replies from
 * @param cancelCriterion the CancelCriterion to use, or null to use {@code dm}'s
 */
public ReplyProcessor21(DistributionManager dm, Collection initMembers,
    CancelCriterion cancelCriterion) {
  this(dm, dm.getSystem(), initMembers, cancelCriterion);
}

/**
 * Builds a processor that expects replies from each member of {@code initMembers}, using the
 * system's DistributionManager for messaging, membership and cancellation.
 *
 * @param system the DistributedSystem connection
 * @param initMembers the members this processor wants replies from
 */
public ReplyProcessor21(InternalDistributedSystem system, Collection initMembers) {
  this(system.getDistributionManager(), system, initMembers, null);
}

/**
 * Builds a processor that expects replies from each member of {@code initMembers}, with an
 * explicit cancellation source.
 *
 * @param system the DistributedSystem connection; supplies the DistributionManager
 * @param initMembers the members this processor wants replies from
 * @param cancelCriterion the CancelCriterion to use, or null to use the system's
 *        DistributionManager
 */
public ReplyProcessor21(InternalDistributedSystem system, Collection initMembers,
    CancelCriterion cancelCriterion) {
  this(system.getDistributionManager(), system, initMembers, cancelCriterion);
}

/**
 * Common funnel for the public constructors; always registers the new processor with the keeper.
 *
 * @param dm the DistributionManager to use for messaging and membership
 * @param system the DistributedSystem connection
 * @param initMembers the members this processor wants replies from
 * @param cancelCriterion the CancelCriterion to use, or null to use {@code dm}'s
 */
private ReplyProcessor21(DistributionManager dm, InternalDistributedSystem system,
    Collection initMembers, CancelCriterion cancelCriterion) {
  this(dm, system, initMembers, cancelCriterion, true);
}
| |
| /** |
| * Construct new ReplyProcessor21. |
| * |
| * @param dm the DistributionManager to use for messaging and membership |
| * @param system the DistributedSystem connection |
| * @param initMembers the collection of members this processor wants replies from |
| * @param cancelCriterion optional CancelCriterion to use; will use the dm if null |
| */ |
| protected ReplyProcessor21(DistributionManager dm, InternalDistributedSystem system, |
| Collection initMembers, CancelCriterion cancelCriterion, boolean register) { |
| if (!allowReplyFromSender()) { |
| Assert.assertTrue(initMembers != null, "null initMembers"); |
| Assert.assertTrue(system != null, "null system"); |
| if (dm != null) { |
| Assert.assertTrue(!initMembers.contains(dm.getId()), |
| "dm present in initMembers but reply from sender is not allowed"); |
| } |
| } |
| this.system = system; |
| this.dmgr = dm; |
| if (cancelCriterion == null) { |
| cancelCriterion = dm.getCancelCriterion(); |
| } |
| this.latch = new StoppableCountDownLatch(cancelCriterion, 1); |
| int sz = initMembers.size(); |
| this.members = new InternalDistributedMember[sz]; |
| if (sz > 0) { |
| int i = 0; |
| for (Iterator it = initMembers.iterator(); it.hasNext(); i++) { |
| this.members[i] = (InternalDistributedMember) it.next(); |
| } |
| } |
| this.done = false; |
| this.shutdown = false; |
| this.exception = null; |
| if (register) { |
| register(); |
| } |
| this.keeperCleanedUp = false; |
| this.initTime = System.currentTimeMillis(); |
| } |
| |
| protected int register() { |
| this.processorId = keeper.put(this); |
| return this.processorId; |
| } |
| |
| ///////////////////// Instance Methods ///////////////////// |
| |
| /** |
| * get the distribution manager for this processor. If the distributed system has a distribution |
| * manager, it is used. Otherwise, we expect a distribution manager has been set with |
| * setDistributionManager and we'll use that |
| */ |
| public DistributionManager getDistributionManager() { |
| try { |
| DistributionManager result = this.system.getDistributionManager(); |
| if (result == null) { |
| result = this.dmgr; |
| Assert.assertTrue(result != null, "null DistributionManager"); |
| } |
| return result; |
| } catch (IllegalStateException ex) { |
| // fix for bug 35000 |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| throw new DistributedSystemDisconnectedException(ex.getMessage()); |
| } |
| } |
| |
| |
| /** |
| * Override and return true if processor should wait for reply from sender. |
| * <p> |
| * NOTE: the communication layer does not reliably support sending a message to oneself, so other |
| * means must be used to execute the message in this VM. Typically you would set the sender of the |
| * message and then invoke its process() method in another thread. |
| */ |
| protected boolean allowReplyFromSender() { |
| return false; |
| } |
| |
| /** |
| * The first time a reply is received that contains an exception, the ReplyProcessor will save |
| * that exception to be passed along to the waiting client thread. By default, any exception |
| * encountered after that will be logged. Subclasses can reimplement this method to disable |
| * logging of multiple exceptions. |
| */ |
| protected boolean logMultipleExceptions() { |
| return true; |
| } |
| |
| |
| /** |
| * Makes note of a reply from a member. If all members have replied, the waiting thread is |
| * signaled. This method can be overridden to provide customized functionality, however the |
| * overriden method should always invoke <code>super.process()</code>. |
| */ |
| public void process(DistributionMessage msg) { |
| process(msg, true); |
| } |
| |
| protected void process(DistributionMessage msg, boolean warn) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} got process({}) from {}", this, msg, msg.getSender()); |
| } |
| if (msg instanceof ReplyMessage) { |
| ReplyException ex = ((ReplyMessage) msg).getException(); |
| if (ex != null) { |
| if (ex.getCause() instanceof DSFIDNotFoundException) { |
| processException(msg, (DSFIDNotFoundException) ex.getCause()); |
| } else { |
| processException(msg, ex); |
| } |
| } |
| } |
| |
| final InternalDistributedMember sender = msg.getSender(); |
| if (!removeMember(sender, false) && warn) { |
| // if the member hasn't left the system, something is wrong |
| final DistributionManager dm = getDistributionManager(); // fix for bug 33253 |
| Set ids = getDistributionManagerIds(); |
| if (ids == null || ids.contains(sender)) { |
| List viewMembers = dm.getViewMembers(); |
| if (system.getConfig().getMcastPort() == 0 // could be using multicast & will get responses |
| // from everyone |
| && (viewMembers == null || viewMembers.contains(sender))) { |
| logger.warn( |
| "Received reply from member {} but was not expecting one. More than one reply may have been received. The reply that was not expected is: {}", |
| new Object[] {sender, msg}); |
| } |
| } |
| } |
| checkIfDone(); |
| } |
| |
| |
| protected synchronized void processException(DistributionMessage msg, ReplyException ex) { |
| processException(ex); |
| } |
| |
| protected synchronized void processException(ReplyException ex) { |
| if (this.exception == null) { // only keep first exception |
| this.exception = ex; |
| |
| } else if (logMultipleExceptions()) { |
| if (!(ex.getCause() instanceof ConcurrentCacheModificationException)) { |
| logger.fatal( |
| "Exception received in ReplyMessage. Only one exception is passed back to caller. This exception is logged only.", |
| ex); |
| } |
| } |
| } |
| |
| /** |
| * Handle a {@link DSFIDNotFoundException} indicating a message type is not implemented on another |
| * server (for example due to different product version). Default implementation logs the |
| * exception as severe and moves on. |
| * |
| * Rationale for default handling: New operations can have caused changes to other newer versioned |
| * GFE JVMs that cannot be reverted. So ignoring exceptions is a conservative way considering such |
| * scenarios. It will be upto individual messages to handle differently by overriding the above |
| * method. |
| */ |
| protected synchronized void processException(DistributionMessage msg, DSFIDNotFoundException ex) { |
| final short versionOrdinal = ex.getProductVersionOrdinal(); |
| String versionStr = null; |
| try { |
| Version version = Version.fromOrdinal(versionOrdinal); |
| versionStr = version.toString(); |
| } catch (UnsupportedSerializationVersionException e) { |
| } |
| if (versionStr == null) { |
| versionStr = "Ordinal=" + versionOrdinal; |
| } |
| logger.fatal(String.format( |
| "Exception received due to missing DSFID %s on remote node %s running version %s.", |
| new Object[] {ex.getUnknownDSFID(), msg.getSender(), versionStr}), ex); |
| } |
| |
| @Override |
| public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) { |
| // Nothing to do - member wasn't sent the operation, anyway. |
| } |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) { |
| if (isSevereAlertProcessingEnabled()) { |
| // if we're waiting for the member that initiated suspicion, we don't |
| // want to be hasty about kicking it out of the distributed system |
| synchronized (this.members) { |
| int cells = this.members.length; |
| for (int i = 0; i < cells; i++) { |
| InternalDistributedMember e = this.members[i]; |
| if (e != null && e.equals(whoSuspected)) { |
| this.severeAlertTimerReset = true; |
| } |
| } |
| } // synchronized |
| } |
| } |
| |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| final InternalDistributedMember id, final boolean crashed) { |
| removeMember(id, true); |
| checkIfDone(); |
| } |
| |
| /** |
| * Wait for all expected acks to be returned or an exception to come in. This method will return |
| * whether a) we have received replies from all desired members or b) the necessary conditions |
| * have been met so that we don't need to wait anymore. |
| * |
| * @throws InternalGemFireException if ack-threshold was exceeded and system property |
| * "ack-threshold-exception" is set to true |
| * @throws InterruptedException thrown if the wait is interrupted |
| * @see #canStopWaiting() |
| */ |
| public void waitForReplies() throws InterruptedException, ReplyException { |
| boolean result = waitForReplies(0); |
| Assert.assertTrue(result, "failed but no exception thrown"); |
| } |
| |
| |
| /** |
| * Registers this processor as a membership listener and returns a set of the current members. |
| * |
| * @return a Set of the current members |
| * @since GemFire 5.7 |
| */ |
| protected Set addListenerAndGetMembers() { |
| return getDistributionManager().addMembershipListenerAndGetDistributionManagerIds(this); |
| } |
| |
| /** |
| * Unregisters this processor as a membership listener |
| * |
| * @since GemFire 5.7 |
| */ |
| protected void removeListener() { |
| try { |
| getDistributionManager().removeMembershipListener(this); |
| } catch (DistributedSystemDisconnectedException e) { |
| // ignore |
| } |
| } |
| |
| /** |
| * Returns the set of members that this processor should care about. |
| * |
| * @return a Set of the current members |
| * @since GemFire 5.7 |
| */ |
| protected Set getDistributionManagerIds() { |
| return getDistributionManager().getDistributionManagerIds(); |
| } |
| |
| protected void preWait() { |
| waiting = true; |
| DistributionManager mgr = getDistributionManager(); |
| statStart = mgr.getStats().startReplyWait(); |
| synchronized (this.members) { |
| Set activeMembers = addListenerAndGetMembers(); |
| processActiveMembers(activeMembers); |
| } |
| } |
| |
| /** |
| * perform initial membership processing while under synchronization of this.members |
| * |
| * @param activeMembers the DM's current membership set |
| */ |
| protected void processActiveMembers(Set activeMembers) { |
| for (int i = 0; i < this.members.length; i++) { |
| if (this.members[i] != null) { |
| if (!activeMembers.contains(this.members[i])) { |
| memberDeparted(getDistributionManager(), this.members[i], false); |
| } |
| } |
| } |
| } |
| |
| private void postWait() { |
| waiting = false; |
| removeListener(); |
| final DistributionManager mgr = getDistributionManager(); |
| mgr.getStats().endReplyWait(this.statStart, this.initTime); |
| mgr.getCancelCriterion().checkCancelInProgress(null); |
| } |
| |
| |
| /** |
| * Wait a given number of milliseconds for the expected acks to be received. If <code>msecs</code> |
| * milliseconds pass before all acknowlegdements are received, <code>false</code> is returned. |
| * |
| * @param msecs the number of milliseconds to wait for replies |
| * @throws InterruptedException if interrupted while waiting on latch |
| * @throws ReplyException an exception passed back in reply |
| * @throws InternalGemFireException if ack-threshold was exceeded and system property |
| * "ack-threshold-exception" is set to true |
| * @throws IllegalStateException if the processor is not registered to receive messages |
| * |
| * @return Whether or not we received all of the replies in the given amount of time. |
| */ |
| public boolean waitForReplies(long msecs) throws InterruptedException, ReplyException { |
| return waitForReplies(msecs, getLatch(), true); |
| } |
| |
| public boolean waitForReplies(long msecs, StoppableCountDownLatch latch, boolean doCleanUp) |
| throws InterruptedException, ReplyException { |
| if (this.keeperCleanedUp) { |
| throw new IllegalStateException( |
| "This reply processor has already been removed from the processor keeper"); |
| } |
| boolean result = true; |
| boolean interrupted = Thread.interrupted(); |
| MessageDependencyMonitor.waitingForReply(this); |
| try { |
| // do the interrupted check inside the try so that cleanup is called |
| if (interrupted) |
| throw new InterruptedException(); |
| if (stillWaiting()) { |
| preWait(); |
| try { |
| result = basicWait(msecs, latch); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (doCleanUp) { |
| postWait(); |
| } |
| } |
| } |
| if (this.exception != null) { |
| throw this.exception; |
| } |
| } finally { |
| if (doCleanUp) { |
| try { |
| cleanup(); |
| } finally { |
| if (interrupted) |
| throw new InterruptedException(); |
| } |
| } |
| MessageDependencyMonitor.doneWaiting(this); |
| } |
| return result; |
| } |
| |
/**
 * basicWait occurs after preWait and before postWait. Attempts to acquire the latch are made.
 * <p>
 * When {@code msecs} is zero there is no caller-imposed limit. The method first waits until the
 * ack-wait threshold (measured from initTime) and then calls {@code timeout(...)}. If severe-alert
 * processing is enabled, it waits the severe-alert interval, restarting that wait whenever
 * {@code severeAlertTimerReset} gets set. A second {@code timeout(...)} call follows. After a
 * further 3x interval a fatal message is logged, and the method then waits indefinitely.
 * <p>
 * When {@code msecs} exceeds the ack-wait threshold, {@code timeout(...)} is called at the
 * threshold and the remaining time is then waited. Otherwise the latch is simply awaited for
 * {@code msecs}.
 *
 * @param msecs the number of milliseconds to wait for replies; zero means no caller-imposed limit
 * @param latch the latch to wait on; normally this processor's own latch
 * @return whether or not we received all of the replies in the given amount of time
 * @throws InterruptedException if the thread is interrupted before or while waiting
 * @throws ReplyException the exception received in a reply, when stopBecauseOfExceptions() says
 *         to stop
 */
private boolean basicWait(long msecs, StoppableCountDownLatch latch)
    throws InterruptedException, ReplyException {
  // honor an interrupt that arrived before we started blocking
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }

  if (stillWaiting()) {
    // ack-wait threshold, converted from seconds to milliseconds
    long timeout = getAckWaitThreshold() * 1000L;
    // time already elapsed since this processor was constructed
    long timeSoFar = System.currentTimeMillis() - this.initTime;
    final long severeAlertTimeout = getAckSevereAlertThresholdMS();
    // only start SUSPECT processing if severe alerts are enabled
    final boolean doSuspectProcessing =
        isSevereAlertProcessingEnabled() && (severeAlertTimeout > 0);
    // a non-positive threshold effectively disables the ack-wait alert
    if (timeout <= 0) {
      timeout = Long.MAX_VALUE;
    }
    if (msecs == 0) {
      boolean timedOut = false;
      // threshold already (nearly) reached: allow a brief 10ms grace before treating it as hit
      if (timeout <= timeSoFar + 1) {
        timedOut = !latch.await(10);
      }
      // otherwise wait out whatever remains of the ack-wait threshold
      if (timedOut || !latch.await(timeout - timeSoFar - 1)) {
        this.dmgr.getCancelCriterion().checkCancelInProgress(null);

        timeout(doSuspectProcessing, false);

        // If ack-severe-alert-threshold has been set, we now
        // wait for that period of time and then force the non-responding
        // members from the system. Then we wait indefinitely
        if (doSuspectProcessing) {
          boolean wasNotUnlatched;
          do {
            this.severeAlertTimerReset = false; // retry if this gets set by suspect processing
            // (splitbrain requirement)
            wasNotUnlatched = !latch.await(severeAlertTimeout);
          } while (wasNotUnlatched && this.severeAlertTimerReset);
          if (wasNotUnlatched) {
            this.dmgr.getCancelCriterion().checkCancelInProgress(null);
            timeout(false, true);

            // give suspect processing three more severe-alert intervals before logging fatally
            long suspectProcessingErrorAlertTimeout = severeAlertTimeout * 3;
            if (!latch.await(suspectProcessingErrorAlertTimeout)) {
              long now = System.currentTimeMillis();
              long totalTimeElapsed = now - this.initTime;

              String waitingOnMembers;
              synchronized (members) {
                waitingOnMembers = Arrays.toString(members);
              }
              logger.fatal("An additional " + suspectProcessingErrorAlertTimeout
                  + " milliseconds have elapsed while waiting for replies. Total of "
                  + totalTimeElapsed + " milliseconds elapsed (init time:" + this.initTime
                  + ", now: " + now + ") Waiting for members: " + waitingOnMembers);

              // for consistency, we must now wait indefinitely for a membership view
              // that ejects the removed members
              latch.await();
            }
          }
        } else {
          latch.await();
        }
        // Give an info message since timeout gave a warning.
        logger.info("{} wait for replies completed", shortName());
      }
    } else if (msecs > timeout) {
      // caller's limit outlasts the ack-wait threshold: alert at the threshold, then keep waiting
      if (!latch.await(timeout)) {
        timeout(doSuspectProcessing, false);
        // after timeout alert, wait remaining time
        if (!latch.await(msecs - timeout)) {
          logger.info("wait for replies timing out after {} seconds",
              Long.valueOf(msecs / 1000));
          return false;
        }
        // Give an info message since timeout gave a warning.
        logger.info("{} wait for replies completed", shortName());
      }
    } else {
      // caller's limit expires before the ack-wait threshold: no alert processing
      if (!latch.await(msecs)) {
        return false;
      }
    }
  }
  // sanity check: this processor's own latch should only be released once nothing is awaited
  Assert.assertTrue(latch != this.latch || !stillWaiting(), this);
  if (stopBecauseOfExceptions()) {
    throw this.exception;
  }
  return true;
}
| |
| /** |
| * Wait a given number of milliseconds for the expected acks to be received. If <code>msecs</code> |
| * milliseconds pass before all acknowlegdements are received, <code>false</code> is returned. |
| * <p> |
| * Thread interruptions will be ignored while waiting. If interruption occurred while in this |
| * method, the current thread's interrupt flag will be true, but InterruptedException will not be |
| * thrown. |
| * |
| * @param p_msecs the number of milliseconds to wait for replies, zero will be interpreted as |
| * Long.MAX_VALUE |
| * |
| * @throws ReplyException an exception passed back in reply |
| * |
| * @throws InternalGemFireException if ack-threshold was exceeded and system property |
| * "ack-threshold-exception" is set to true |
| * @throws IllegalStateException if the processor is not registered to receive replies |
| */ |
| public boolean waitForRepliesUninterruptibly(long p_msecs) throws ReplyException { |
| return waitForRepliesUninterruptibly(p_msecs, getLatch(), true); |
| } |
| |
| public boolean waitForRepliesUninterruptibly(long p_msecs, StoppableCountDownLatch latch, |
| boolean doCleanUp) throws ReplyException { |
| if (this.keeperCleanedUp) { |
| throw new IllegalStateException( |
| "This reply processor has already been removed from the processor keeper"); |
| } |
| long msecs = p_msecs; // don't overwrite parameter |
| boolean result = true; |
| MessageDependencyMonitor.waitingForReply(this); |
| try { |
| if (stillWaiting()) { |
| preWait(); |
| try { |
| while (true) { |
| // cancellation check |
| this.dmgr.getCancelCriterion().checkCancelInProgress(null); |
| |
| long startWaitTime = System.currentTimeMillis(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| result = basicWait(msecs, latch); |
| break; |
| } catch (InterruptedException e) { |
| interrupted = true; // keep looping |
| this.dmgr.getCancelCriterion().checkCancelInProgress(e); |
| if (msecs > 0) { |
| final long interruptTime = System.currentTimeMillis(); |
| msecs -= interruptTime - startWaitTime; |
| if (msecs <= 0) { |
| msecs = 1; |
| result = false; |
| break; |
| } |
| } |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // while |
| } finally { |
| if (doCleanUp) { |
| postWait(); |
| } |
| } |
| } // stillWaiting |
| if (this.exception != null) { |
| throw this.exception; |
| } |
| } finally { |
| if (doCleanUp) { |
| cleanup(); |
| } |
| MessageDependencyMonitor.doneWaiting(this); |
| } |
| return result; |
| } |
| |
| /** |
| * Used to cleanup resources allocated by the processor after we are done using it. |
| * |
| * @since GemFire 5.1 |
| */ |
| public void cleanup() { |
| if (!this.keeperCleanedUp) { |
| this.keeperCleanedUp = true; |
| keeper.remove(getProcessorId()); |
| } |
| } |
| |
| /** |
| * Wait for the expected acks to be received. |
| * <p> |
| * Thread interruptions will be ignored while waiting. If interruption occurred while in this |
| * method, the current thread's interrupt flag will be true, but InterruptedException will not be |
| * thrown. |
| * |
| * @throws ReplyException an exception passed back in reply |
| * |
| * @throws InternalGemFireException if ack-threshold was exceeded and system property |
| * "ack-threshold-exception" is set to true |
| */ |
| public void waitForRepliesUninterruptibly() throws ReplyException { |
| waitForRepliesUninterruptibly(0); |
| } |
| |
| /** |
| * Returns the id of this reply processor. This id is often sent in messages, so that reply |
| * messages know which processor to notify. |
| * |
| * @see #getProcessor |
| */ |
| public int getProcessorId() { |
| return processorId; |
| } |
| |
| /** |
| * Returns whether or not this reply processor can stop waiting for replies. This method can be |
| * overridden to allow the waiter to be notified before all responses have been received. This is |
| * useful when we are waiting for a value from any member, or if we can stop waiting if a remote |
| * exception occurred. By default, this method returns <code>false</code>. |
| */ |
| protected boolean canStopWaiting() { |
| return false; |
| } |
| |
| /** |
| * We're still waiting if there is any member still left in the set and an exception hasn't been |
| * returned from anyone yet. |
| * |
| * @return true if we are still waiting for a response |
| */ |
| protected boolean stillWaiting() { |
| if (shutdown) { |
| // Create the exception here, so that the call stack reflects the |
| // failed computation. If you set the exception in onShutdown, |
| // the resulting stack is not of interest. |
| ReplyException re = new ReplyException(new DistributedSystemDisconnectedException( |
| "aborted due to shutdown")); |
| this.exception = re; |
| return false; |
| } |
| |
| // Optional override by subclass |
| if (canStopWaiting()) { |
| return false; |
| } |
| |
| // If an error has occurred, we're done. |
| if (stopBecauseOfExceptions()) { |
| return false; |
| } |
| |
| // All else is good, keep waiting if we have members to wait on. |
| return numMembers() > 0; |
| } |
| |
| /** |
| * Control of reply processor waiting behavior in the face of exceptions. |
| * |
| * @since GemFire 5.7 |
| * @return true to stop waiting when exceptions are present |
| */ |
| protected boolean stopBecauseOfExceptions() { |
| return exception != null; |
| } |
| |
| /** |
| * If this processor is not waiting for any more replies, then the waiting thread will be |
| * notified. |
| */ |
| protected void checkIfDone() { |
| boolean finished = !stillWaiting(); |
| if (finished) { |
| finished(); |
| } |
| } |
| |
| /** do processing required when finished */ |
| protected void finished() { |
| boolean isDone = false; |
| synchronized (this) { |
| if (!this.done) { // make sure only called once |
| this.done = true; |
| isDone = true; |
| // getSync().release(); // notifies threads in waitForReplies |
| getLatch().countDown(); |
| } |
| } // synchronized |
| |
| // ensure that postFinish is invoked only once |
| if (isDone) { |
| postFinish(); |
| } |
| } |
| |
| /** |
| * Override to execute custom code after {@link #finished}. This will be invoked only once for |
| * each ReplyProcessor21. |
| */ |
| protected void postFinish() {} |
| |
| protected String shortName() { |
| String base = this.getClass().getName(); |
| int dot = base.lastIndexOf('.'); |
| if (dot == -1) { |
| return base; |
| } |
| return base.substring(dot + 1); |
| } |
| |
| @Override |
| public String toString() { |
| return "<" + shortName() + " " + this.getProcessorId() + " waiting for " + numMembers() |
| + " replies" + (exception == null ? "" : (" exception: " + exception)) + " from " |
| + membersToString() + ">"; |
| } |
| |
| /** |
| * |
| * @param m the member to be removed |
| * @param departed true if it is removed due to membership |
| * @return true if it was in our list of members |
| */ |
| protected boolean removeMember(InternalDistributedMember m, boolean departed) { |
| boolean removed = false; |
| synchronized (this.members) { |
| int cells = this.members.length; |
| for (int i = 0; i < cells; i++) { |
| InternalDistributedMember e = this.members[i]; |
| if (e != null && e.equals(m)) { |
| this.members[i] = null; |
| // we may be expecting more than one response from a member. so, |
| // unless the member left, we only scrub the first occurrence of |
| // the member id from the responder list |
| if (!departed) { |
| return true; |
| } |
| removed = true; |
| } |
| } |
| } // synchronized |
| return removed; |
| } |
| |
| protected int numMembers() { |
| int sz = 0; |
| synchronized (this.members) { |
| int cells = this.members.length; |
| for (int i = 0; i < cells; i++) { |
| if (this.members[i] != null) { |
| sz++; |
| } |
| } |
| } // synchronized |
| return sz; |
| } |
| |
| protected boolean waitingOnMember(InternalDistributedMember id) { |
| synchronized (this.members) { |
| int cells = this.members.length; |
| for (int i = 0; i < cells; i++) { |
| if (id.equals(this.members[i])) { |
| return true; |
| } |
| } |
| return false; |
| } // synchronized |
| } |
| |
| |
| /** |
| * Return the time in sec to wait before sending an alert while waiting for ack replies. Note that |
| * the ack wait threshold may change at runtime, so we have to consult the system every time. |
| */ |
| protected int getAckWaitThreshold() { |
| return this.system.getConfig().getAckWaitThreshold(); |
| } |
| |
| /** |
| * Return the time in sec to wait before removing unresponsive members from the distributed |
| * system. This period starts after the ack-wait-threshold period has elapsed |
| */ |
| protected int getSevereAlertThreshold() { |
| return this.system.getConfig().getAckSevereAlertThreshold(); |
| } |
| |
| protected boolean processTimeout() { |
| return true; |
| } |
| |
| /** |
| * process a wait-timeout. Usually suspectThem would be used in the first timeout, followed by a |
| * subsequent use of disconnectThem |
| * |
| * @param suspectThem whether to ask the membership manager to suspect the unresponsive members |
| * @param severeAlert whether to ask the membership manager to disconnect the unresponseive |
| * members |
| */ |
| private void timeout(boolean suspectThem, boolean severeAlert) { |
| |
| if (!this.processTimeout()) |
| return; |
| |
| Set activeMembers = getDistributionManagerIds(); |
| |
| // an alert that will show up in the console |
| long timeout = getAckWaitThreshold(); |
| final Object[] msgArgs = |
| new Object[] {Long.valueOf(timeout + (severeAlert ? getSevereAlertThreshold() : 0)), this, |
| getDistributionManager().getId(), activeMembers}; |
| final String msg = |
| "%s seconds have elapsed while waiting for replies: %s on %s whose current membership list is: [%s]"; |
| if (severeAlert) { |
| logger.fatal(String.format(msg, msgArgs)); |
| } else { |
| logger.warn(String.format(msg, msgArgs)); |
| } |
| msgArgs[3] = "(omitted)"; |
| Breadcrumbs.setProblem(msg, msgArgs); |
| |
| // Increment the stat |
| getDistributionManager().getStats().incReplyTimeouts(); |
| |
| final Set<InternalDistributedMember> suspectMembers; |
| if (suspectThem || severeAlert) { |
| suspectMembers = new HashSet(); |
| } else { |
| suspectMembers = null; |
| } |
| |
| synchronized (this.members) { |
| for (int i = 0; i < this.members.length; i++) { |
| if (this.members[i] != null) { |
| if (!activeMembers.contains(this.members[i])) { |
| logger.warn( |
| "View no longer has {} as an active member, so we will no longer wait for it.", |
| this.members[i]); |
| memberDeparted(getDistributionManager(), this.members[i], false); |
| } else { |
| if (suspectMembers != null) { |
| suspectMembers.add(this.members[i]); |
| } |
| } |
| } |
| } |
| } |
| |
| if (THROW_EXCEPTION_ON_TIMEOUT) { |
| // init the cause to be a TimeoutException so catchers can determine cause |
| TimeoutException cause = |
| new TimeoutException("Timed out waiting for ACKS."); |
| throw new InternalGemFireException( |
| String.format( |
| "%s seconds have elapsed while waiting for replies: %s on %s whose current membership list is: [%s]", |
| msgArgs), |
| cause); |
| } else if (suspectThem) { |
| if (suspectMembers != null && suspectMembers.size() > 0) { |
| getDistributionManager().getMembershipManager().suspectMembers( |
| (Set<DistributedMember>) (Set<?>) suspectMembers, |
| "Failed to respond within ack-wait-threshold"); |
| } |
| } |
| } |
| |
| |
| protected String membersToString() { |
| StringBuffer sb = new StringBuffer("["); |
| boolean first = true; |
| synchronized (this.members) { |
| for (int i = 0; i < this.members.length; i++) { |
| InternalDistributedMember member = this.members[i]; |
| if (member != null) { |
| if (first) { |
| first = false; |
| } else { |
| sb.append(", "); |
| } |
| sb.append(member); |
| } |
| } |
| } |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| /** |
| * You must be synchronize on the result of this function in order to examine its contents. |
| * |
| * @return the members that have not replied |
| */ |
| protected InternalDistributedMember[] getMembers() { |
| return this.members; |
| } |
| |
| private StoppableCountDownLatch getLatch() { |
| return this.latch; |
| } |
| |
| /** |
| * Enables severe alert processing in this reply processor, if it has also been enabled in the |
| * distribution config. Severe alerts are issued if acks are not received within |
| * ack-wait-threshold plus ack-severe-alert-threshold seconds. |
| */ |
| public void enableSevereAlertProcessing() { |
| this.severeAlertEnabled = true; |
| } |
| |
| /** |
| * Set a shorter ack-severe-alert-threshold than normal |
| * |
| * @param flag whether to shorten the time or not |
| */ |
| public static void setShortSevereAlertProcessing(boolean flag) { |
| severeAlertShorten.set(flag); |
| } |
| |
| public static boolean getShortSevereAlertProcessing() { |
| return severeAlertShorten.get(); |
| } |
| |
| /** |
| * Force reply-waits in the current thread to perform severe-alert processing |
| */ |
| public static void forceSevereAlertProcessing() { |
| forceSevereAlertProcessing.set(Boolean.TRUE); |
| } |
| |
| /** |
| * Reset the forcing of severe-alert processing for the current thread |
| */ |
| public static void unforceSevereAlertProcessing() { |
| forceSevereAlertProcessing.set(Boolean.FALSE); |
| } |
| |
| /** |
| * Returns true if forceSevereAlertProcessing has been used to force the next reply-wait in the |
| * current thread to perform severe-alert processing. |
| */ |
| public static boolean isSevereAlertProcessingForced() { |
| return forceSevereAlertProcessing.get(); |
| } |
| |
| |
| /** |
| * Get the ack-severe-alert-threshold, in milliseconds, with shortening applied |
| */ |
| public long getAckSevereAlertThresholdMS() { |
| long disconnectTimeout = getSevereAlertThreshold() * 1000L; |
| if (disconnectTimeout > 0 && severeAlertShorten.get()) { |
| disconnectTimeout = (long) (disconnectTimeout * PR_SEVERE_ALERT_RATIO); |
| } |
| return disconnectTimeout; |
| } |
| |
| public boolean isSevereAlertProcessingEnabled() { |
| return this.severeAlertEnabled || isSevereAlertProcessingForced(); |
| } |
| |
| |
| private static final ThreadLocal<Integer> messageId = new ThreadLocal<>(); |
| |
| private static final Integer VOID_RPID = 0; |
| |
| /** |
| * Used by messages to store the id for the current message into a thread local. This allows the |
| * comms layer to still send replies even when it can't deserialize a message. |
| */ |
| public static void setMessageRPId(int id) { |
| messageId.set(id); |
| } |
| |
| public static void initMessageRPId() { |
| messageId.set(VOID_RPID); |
| } |
| |
| public static void clearMessageRPId() { |
| messageId.set(VOID_RPID); |
| // messageId.remove(); change to use remove when we drop 1.4 support |
| } |
| |
| /** |
| * Returns the reply processor id for the message currently being read. Returns 0 if no id exists. |
| */ |
| public static int getMessageRPId() { |
| int result = 0; |
| Object v = messageId.get(); |
| if (v != null) { |
| result = (Integer) v; |
| } |
| return result; |
| } |
| |
| /** |
| * To fix the hang of 42951 make sure that everything waiting on this processor are told to quit |
| * waiting |
| * and tell them why. |
| * |
| * @param ex the reason the reply processor is being canceled |
| */ |
| public void cancel(InternalDistributedMember sender, RuntimeException ex) { |
| processException(new ReplyException("Unexpected exception while processing reply message", ex)); |
| removeMember(sender, false); |
| checkIfDone(); |
| } |
| } |