blob: 658b0ba72529275464942b9b5211827094373aa9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.deadlock.MessageDependencyMonitor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.serialization.DSFIDNotFoundException;
import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.util.Breadcrumbs;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This class processes responses to {@link DistributionMessage}s. It handles the generic case of
* simply waiting for responses from all members. It is intended to be subclassed for special cases.
*
* <P>
*
* Note that, unlike the reply processors in versions previous to GemFire 2.1, this reply processor
* is kept entirely in JOM.
*
* <p>
*
* Recommended usage pattern in subclass...
*
* <pre>
*
* public void process(DistributionMessage msg) {
* try {
* ...custom code for subclass goes here...
* }
* finally {
* super.process(msg);
* }
* }
*
* </pre>
*
* The above usage pattern causes the waitForReplies latch to not be released until after the
* message has been processed. In addition, it is guaranteed to be released even if the custom
* subclass code throws a runtime exception.
*
* <p>
*
* @see MessageWithReply
* @since GemFire 2.1
*/
public class ReplyProcessor21 implements MembershipListener {
/** Shared logger for all reply processors. */
private static final Logger logger = LogService.getLogger();
/**
* Value of the boolean system property "ack-threshold-exception", read once at class load via
* {@link Boolean#getBoolean}. NOTE(review): not referenced in the visible part of this class;
* presumably consulted by the ack-wait timeout handling to throw InternalGemFireException instead
* of only alerting - confirm.
*/
public static final boolean THROW_EXCEPTION_ON_TIMEOUT =
Boolean.getBoolean("ack-threshold-exception");
/**
* The ratio by which ack-severe-alert-threshold is lowered when waiting for a BucketRegion
* operation. Assigned in this class's static initializer from the system property
* DistributionConfig.GEMFIRE_PREFIX + "ack-severe-alert-reduction-ratio" (default 0.80).
*/
public static final double PR_SEVERE_ALERT_RATIO;
/**
* All live reply processors in this VM. Processors are added by register() under the id the
* keeper hands out, and removed again by cleanup().
*/
@MakeNotStatic
protected static final ProcessorKeeper21 keeper = new ProcessorKeeper21();
//////////////////// Instance Fields ////////////////////
/**
* The members that haven't replied yet. Slots are set to null as replies arrive or members
* depart; the array itself keeps its construction-time length.
*
* Concurrency: protected by synchronization of itself
*/
protected final InternalDistributedMember[] members;
/**
* Set to true in preWait, set to false in postWait. Used to avoid removing membership listener in
* Runnable in postWait if we've called waitForReplies again.
*/
protected volatile boolean waiting = false;
/**
* The first <code>Exception</code> reported in a reply (later ones are only logged), or a
* synthesized "aborted due to shutdown" exception once stillWaiting() observes shutdown.
* <p>
* Since this is set by the executor and read by the initiating thread, this is a volatile.
*
* @see ReplyMessage#getException
*/
protected volatile ReplyException exception;
/** Have we heard back from everyone? Set once, under the instance lock, by finished(). */
private volatile boolean done;
/** Whether cleanup() has already removed this processor from the keeper. */
private boolean keeperCleanedUp;
/** Have we been aborted due to shutdown? Checked first by stillWaiting(). */
protected volatile boolean shutdown;
/** Latch (initial count 1) counted down by finished(); waiting threads block on it. */
private final StoppableCountDownLatch latch;
/** The id of this processor, as assigned by the keeper in register(). */
protected int processorId;
/**
* Used to get the ack wait threshold (which might change at runtime), etc.
*/
protected final InternalDistributedSystem system;
/**
* The distribution manager supplied at construction. getDistributionManager() prefers the
* system's own manager and only falls back to this one when the system has none; the wait
* methods use it directly for cancellation checks.
*/
protected final DistributionManager dmgr;
/** Start time for replyWait stat, in nanos (obtained from startReplyWait() in preWait). */
long statStart;
/** Start time for ack-wait-threshold, in millis (System.currentTimeMillis() at construction). */
private long initTime;
/**
* whether this reply processor should perform severe-alert processing for the message being ack'd.
* NOTE(review): not read in the visible part of this class - presumably used by
* isSevereAlertProcessingEnabled(); confirm.
*/
private boolean severeAlertEnabled;
/**
* whether the severe-alert timeout has been reset. This can happen if a member we're waiting for
* is waiting on a suspect member, for instance. Set by memberSuspect(); basicWait() clears it
* before each severe-alert wait and waits again while it keeps getting set.
*/
private volatile boolean severeAlertTimerReset;
/**
* whether this reply processor should shorten severe-alert processing due to another vm waiting
* on this one. This is a thread-local so that lower level comm layers can tell that the interval
* should be shortened
*/
private static final ThreadLocal<Boolean> severeAlertShorten =
ThreadLocal.withInitial(() -> Boolean.FALSE);
/**
* whether the next replyProcessor for the current thread should perform severe-alert processing
*/
private static final ThreadLocal<Boolean> forceSevereAlertProcessing =
ThreadLocal.withInitial(() -> Boolean.FALSE);
////////////////////// Static Methods /////////////////////
// Resolves PR_SEVERE_ALERT_RATIO from the system property, falling back to 0.80 (with a note on
// stderr) when the configured value is not a parseable double.
static {
  final String configuredRatio = System
      .getProperty(DistributionConfig.GEMFIRE_PREFIX + "ack-severe-alert-reduction-ratio", ".80");
  double parsedRatio = 0.80;
  try {
    parsedRatio = Double.parseDouble(configuredRatio);
  } catch (NumberFormatException badRatio) {
    // keep the 0.80 default, but tell the operator their setting was ignored
    System.err.println(
        "Unable to parse gemfire.ack-severe-alert-reduction-ratio setting of \"" + configuredRatio
            + "\"");
  }
  PR_SEVERE_ALERT_RATIO = parsedRatio;
}
/**
 * Looks up a live reply processor by the id it was registered under.
 *
 * @param processorId the id handed out when the processor registered itself
 * @return the matching processor, or {@code null} if no processor with that id is registered any
 *         more (for example because {@link #cleanup()} has already removed it)
 * @see #getProcessorId
 */
public static ReplyProcessor21 getProcessor(int processorId) {
  final Object registered = keeper.retrieve(processorId);
  return (ReplyProcessor21) registered;
}
/////////////////////// Constructors //////////////////////
/**
* Creates a new <code>ReplyProcessor</code> that wants a reply from a single member of a
* distributed system. The system's distribution manager (and its cancel criterion) is used, and
* the new processor is registered with the processor keeper.
*
* @param system the DistributedSystem connection
* @param member the member this processor wants a reply from
*/
public ReplyProcessor21(InternalDistributedSystem system, InternalDistributedMember member) {
this(system, Collections.singleton(member));
}
/**
* Creates a new <code>ReplyProcessor</code> that wants a reply from a single member of a
* distributed system, using the system's distribution manager. The new processor is registered
* with the processor keeper.
*
* @param system the DistributedSystem connection
* @param member the member this processor wants a reply from
* @param cancelCriterion optional CancelCriterion to use; will use the DistributionManager if
* null
*/
public ReplyProcessor21(InternalDistributedSystem system, InternalDistributedMember member,
CancelCriterion cancelCriterion) {
this(system, Collections.singleton(member), cancelCriterion);
}
/**
* Creates a new <code>ReplyProcessor</code> that wants a reply from a single member of a
* distributed system. The distributed system is taken from {@code dm}, the dm's cancel criterion
* is used, and the new processor is registered with the processor keeper.
*
* @param dm the DistributionManager to use for messaging and membership
* @param member the member this processor wants a reply from
*/
public ReplyProcessor21(DistributionManager dm, InternalDistributedMember member) {
this(dm, Collections.singleton(member));
}
/**
* Creates a new <code>ReplyProcessor</code> that wants replies from some number of members of a
* distributed system. Call this method with
* {@link ClusterDistributionManager#getDistributionManagerIds} if you want replies from all DMs
* including the one hosted in this VM. Note that a collection containing this VM's own id fails
* an assertion unless the subclass overrides {@link #allowReplyFromSender()} to return true.
*
* @param dm the DistributionManager to use for messaging and membership; its cancel criterion is
* used
* @param initMembers the collection of members this processor wants replies from
*/
public ReplyProcessor21(DistributionManager dm, Collection initMembers) {
this(dm, initMembers, null);
}
/**
* Creates a new <code>ReplyProcessor</code> that wants replies from some number of members of a
* distributed system. Call this method with
* {@link ClusterDistributionManager#getDistributionManagerIds} if you want replies from all DMs
* including the one hosted in this VM. Note that a collection containing this VM's own id fails
* an assertion unless the subclass overrides {@link #allowReplyFromSender()} to return true.
* The distributed system is taken from {@code dm.getSystem()}.
*
* @param dm the DistributionManager to use for messaging and membership
* @param initMembers the collection of members this processor wants replies from
* @param cancelCriterion optional CancelCriterion to use; will use the dm if null
*/
public ReplyProcessor21(DistributionManager dm, Collection initMembers,
CancelCriterion cancelCriterion) {
this(dm, dm.getSystem(), initMembers, cancelCriterion);
}
/**
* Creates a new <code>ReplyProcessor</code> that wants replies from some number of members of a
* distributed system. Call this method with
* {@link ClusterDistributionManager#getDistributionManagerIds} if you want replies from all DMs
* including the one hosted in this VM. Note that a collection containing this VM's own id fails
* an assertion unless the subclass overrides {@link #allowReplyFromSender()} to return true.
* The system's distribution manager and its cancel criterion are used.
*
* @param system the DistributedSystem connection
* @param initMembers the collection of members this processor wants replies from
*/
public ReplyProcessor21(InternalDistributedSystem system, Collection initMembers) {
this(system.getDistributionManager(), system, initMembers, null);
}
/**
* Creates a new <code>ReplyProcessor</code> that wants replies from some number of members of a
* distributed system. Call this method with
* {@link ClusterDistributionManager#getDistributionManagerIds} if you want replies from all DMs
* including the one hosted in this VM. Note that a collection containing this VM's own id fails
* an assertion unless the subclass overrides {@link #allowReplyFromSender()} to return true.
*
* @param system the DistributedSystem connection; its distribution manager is used
* @param initMembers the collection of members this processor wants replies from
* @param cancelCriterion optional CancelCriterion to use; will use the DistributedSystem's
* DistributionManager if null
*/
public ReplyProcessor21(InternalDistributedSystem system, Collection initMembers,
CancelCriterion cancelCriterion) {
this(system.getDistributionManager(), system, initMembers, cancelCriterion);
}
/**
* Construct new ReplyProcessor21 and always register it with the processor keeper.
*
* @param dm the DistributionManager to use for messaging and membership
* @param system the DistributedSystem connection
* @param initMembers the collection of members this processor wants replies from
* @param cancelCriterion optional CancelCriterion to use; will use the dm if null
*/
private ReplyProcessor21(DistributionManager dm, InternalDistributedSystem system,
Collection initMembers, CancelCriterion cancelCriterion) {
this(dm, system, initMembers, cancelCriterion, true);
}
/**
 * Construct new ReplyProcessor21.
 *
 * <p>When {@link #allowReplyFromSender()} returns false (the default), the arguments are asserted
 * to be non-null and the local member ({@code dm.getId()}) must not appear in
 * {@code initMembers}. Note that {@code allowReplyFromSender()} is invoked from this constructor,
 * before any subclass fields are initialized.
 *
 * <p>The processor is registered with the static processor keeper only after every field has
 * been initialized. Registration publishes {@code this} to other threads through
 * {@link #getProcessor(int)}, so it must not happen while the object is half-built.
 *
 * @param dm the DistributionManager to use for messaging and membership
 * @param system the DistributedSystem connection
 * @param initMembers the collection of members this processor wants replies from; every element
 *        must be an {@link InternalDistributedMember}
 * @param cancelCriterion optional CancelCriterion to use; will use the dm if null
 * @param register whether to register with the processor keeper (and so obtain a processor id)
 */
protected ReplyProcessor21(DistributionManager dm, InternalDistributedSystem system,
    Collection initMembers, CancelCriterion cancelCriterion, boolean register) {
  if (!allowReplyFromSender()) {
    Assert.assertTrue(initMembers != null, "null initMembers");
    Assert.assertTrue(system != null, "null system");
    if (dm != null) {
      Assert.assertTrue(!initMembers.contains(dm.getId()),
          "dm present in initMembers but reply from sender is not allowed");
    }
  }
  this.system = system;
  this.dmgr = dm;
  if (cancelCriterion == null) {
    cancelCriterion = dm.getCancelCriterion();
  }
  this.latch = new StoppableCountDownLatch(cancelCriterion, 1);
  this.members = new InternalDistributedMember[initMembers.size()];
  int i = 0;
  for (Object member : initMembers) {
    this.members[i++] = (InternalDistributedMember) member;
  }
  this.done = false;
  this.shutdown = false;
  this.exception = null;
  this.keeperCleanedUp = false;
  this.initTime = System.currentTimeMillis();
  // Register last: register() puts "this" into the shared static keeper where other threads can
  // retrieve it, so all state (including initTime/keeperCleanedUp) must already be written.
  if (register) {
    register();
  }
}
/**
 * Registers this processor with the static processor keeper so that replies can find it by id.
 *
 * @return the id assigned by the keeper, which is also stored as this processor's id
 */
protected int register() {
  final int assignedId = keeper.put(this);
  this.processorId = assignedId;
  return assignedId;
}
///////////////////// Instance Methods /////////////////////
/**
 * Returns the distribution manager this processor should use. The distributed system's current
 * distribution manager is preferred. If the system has none, the manager supplied at construction
 * time is used instead, and it is asserted to be non-null.
 *
 * @return the distribution manager to use
 * @throws DistributedSystemDisconnectedException if the system reports an
 *         {@link IllegalStateException} while looking up its manager; the original exception is
 *         kept as the cause
 */
public DistributionManager getDistributionManager() {
  try {
    DistributionManager result = this.system.getDistributionManager();
    if (result == null) {
      result = this.dmgr;
      Assert.assertTrue(result != null, "null DistributionManager");
    }
    return result;
  } catch (IllegalStateException ex) {
    // fix for bug 35000: report the failure as a disconnect. The cancel criterion is consulted
    // first so that a cancellation already in progress is reported as such.
    this.system.getCancelCriterion().checkCancelInProgress(null);
    throw new DistributedSystemDisconnectedException(ex.getMessage(), ex);
  }
}
/**
* Override and return true if processor should wait for reply from sender.
* <p>
* This is invoked from the constructor, before subclass fields are initialized. A false result
* makes the constructor assert that the local member is not among the expected repliers, so
* overrides must not depend on subclass state.
* <p>
* NOTE: the communication layer does not reliably support sending a message to oneself, so other
* means must be used to execute the message in this VM. Typically you would set the sender of the
* message and then invoke its process() method in another thread.
*/
protected boolean allowReplyFromSender() {
return false;
}
/**
* The first time a reply is received that contains an exception, the ReplyProcessor will save
* that exception to be passed along to the waiting client thread. By default, any exception
* encountered after that will be logged (except those caused by a
* ConcurrentCacheModificationException, which are never logged). Subclasses can reimplement this
* method to disable logging of multiple exceptions.
*
* @return true to log exceptions received after the first one
*/
protected boolean logMultipleExceptions() {
return true;
}
/**
* Makes note of a reply from a member. If all members have replied, the waiting thread is
* signaled. This method can be overridden to provide customized functionality, however the
* overridden method should always invoke <code>super.process()</code>. Replies that were not
* expected are logged as warnings.
*
* @param msg the reply that arrived
*/
public void process(DistributionMessage msg) {
process(msg, true);
}
/**
 * Records a reply from {@code msg.getSender()}.
 *
 * <p>Any exception carried by a {@link ReplyMessage} is handed to {@code processException}. The
 * sender is then crossed off the list of members still owed a reply, and waiting threads are
 * released if nothing more is expected.
 *
 * @param msg the reply that arrived
 * @param warn whether to log a warning when the sender was not one of the members this processor
 *        was still waiting on, while it is still a known member, multicast is off, and it is in
 *        the current view
 */
protected void process(DistributionMessage msg, boolean warn) {
  if (logger.isDebugEnabled()) {
    logger.debug("{} got process({}) from {}", this, msg, msg.getSender());
  }
  if (msg instanceof ReplyMessage) {
    final ReplyException replyException = ((ReplyMessage) msg).getException();
    if (replyException != null) {
      final Throwable cause = replyException.getCause();
      if (cause instanceof DSFIDNotFoundException) {
        // the remote member does not implement this message type
        processException(msg, (DSFIDNotFoundException) cause);
      } else {
        processException(msg, replyException);
      }
    }
  }
  final InternalDistributedMember sender = msg.getSender();
  final boolean wasExpected = removeMember(sender, false);
  if (warn && !wasExpected) {
    // if the member hasn't left the system, something is wrong
    final DistributionManager dm = getDistributionManager(); // fix for bug 33253
    final Set knownIds = getDistributionManagerIds();
    final boolean senderKnown = knownIds == null || knownIds.contains(sender);
    if (senderKnown) {
      final List viewMembers = dm.getViewMembers();
      // with multicast we could get responses from everyone, so only complain when it is off
      final boolean unexpected = system.getConfig().getMcastPort() == 0
          && (viewMembers == null || viewMembers.contains(sender));
      if (unexpected) {
        logger.warn(
            "Received reply from member {} but was not expecting one. More than one reply may have been received. The reply that was not expected is: {}",
            new Object[] {sender, msg});
      }
    }
  }
  checkIfDone();
}
/**
* Records an exception carried by the reply {@code msg}. This is a hook that also receives the
* message; the default implementation ignores it and delegates to processException(ReplyException).
*
* @param msg the reply that carried the exception
* @param ex the exception reported by the remote member
*/
protected synchronized void processException(DistributionMessage msg, ReplyException ex) {
processException(ex);
}
/**
 * Records {@code ex} for the waiting thread if it is the first exception seen.
 *
 * <p>Later exceptions are logged at fatal level when {@link #logMultipleExceptions()} allows it.
 * Exceptions caused by a {@link ConcurrentCacheModificationException} are never logged.
 *
 * @param ex the exception carried by a reply
 */
protected synchronized void processException(ReplyException ex) {
  if (this.exception == null) {
    // only keep first exception
    this.exception = ex;
    return;
  }
  if (!logMultipleExceptions()) {
    return;
  }
  if (ex.getCause() instanceof ConcurrentCacheModificationException) {
    return;
  }
  logger.fatal(
      "Exception received in ReplyMessage. Only one exception is passed back to caller. This exception is logged only.",
      ex);
}
/**
 * Handles a {@link DSFIDNotFoundException}, which indicates that a message type is not implemented
 * on another server (for example due to a different product version). The default implementation
 * only logs the problem at fatal level and moves on; the exception is not recorded for the waiting
 * thread.
 *
 * <p>Rationale for the default handling: new operations may already have made changes on JVMs
 * running newer versions that cannot be reverted, so ignoring the exception is the conservative
 * choice. It is up to individual processors to handle this differently by overriding this method.
 *
 * @param msg the reply that reported the missing DSFID
 * @param ex the exception naming the unknown DSFID and the remote product version ordinal
 */
protected synchronized void processException(DistributionMessage msg, DSFIDNotFoundException ex) {
  final short versionOrdinal = ex.getProductVersionOrdinal();
  String versionText;
  try {
    versionText = Version.fromOrdinal(versionOrdinal).toString();
  } catch (UnsupportedSerializationVersionException ignored) {
    // unknown to this member: fall back to reporting the raw ordinal below
    versionText = null;
  }
  if (versionText == null) {
    versionText = "Ordinal=" + versionOrdinal;
  }
  final String message = String.format(
      "Exception received due to missing DSFID %s on remote node %s running version %s.",
      ex.getUnknownDSFID(), msg.getSender(), versionText);
  logger.fatal(message, ex);
}
/**
* Membership callback for a joining member; this processor takes no action.
*/
@Override
public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {
// Nothing to do - member wasn't sent the operation, anyway.
}
/**
* Membership callback for quorum loss; this processor takes no action.
*/
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
/**
 * Membership callback invoked when {@code whoSuspected} raises suspicion against {@code id}.
 *
 * <p>If severe-alert processing is enabled and this processor is still waiting on the member that
 * raised the suspicion, the severe-alert timer is flagged for reset. That member may itself be
 * blocked on the suspect, so we do not want to be hasty about kicking it out of the distributed
 * system.
 */
@Override
public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
    InternalDistributedMember whoSuspected, String reason) {
  if (!isSevereAlertProcessingEnabled()) {
    return;
  }
  synchronized (this.members) {
    for (InternalDistributedMember pending : this.members) {
      if (pending != null && pending.equals(whoSuspected)) {
        this.severeAlertTimerReset = true;
        break;
      }
    }
  }
}
/**
* Membership callback: a departed member will never reply, so every occurrence of it is removed
* from the members still owed a reply, and waiting threads are released if nothing else is
* outstanding.
*/
@Override
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, final boolean crashed) {
removeMember(id, true);
checkIfDone();
}
/**
 * Waits, with no overall time limit, until all expected acks have been returned or an exception
 * has come in, then cleans up this processor. Waiting also ends early once the necessary
 * conditions have been met (see {@link #canStopWaiting()}).
 *
 * @throws InternalGemFireException if ack-threshold was exceeded and system property
 *         "ack-threshold-exception" is set to true
 * @throws InterruptedException thrown if the wait is interrupted
 * @throws ReplyException an exception passed back in reply
 * @see #canStopWaiting()
 */
public void waitForReplies() throws InterruptedException, ReplyException {
  Assert.assertTrue(waitForReplies(0), "failed but no exception thrown");
}
/**
* Registers this processor as a membership listener and returns a set of the current members.
* Called from preWait while holding the lock on {@code members}.
*
* @return a Set of the current members
* @since GemFire 5.7
*/
protected Set addListenerAndGetMembers() {
return getDistributionManager().addMembershipListenerAndGetDistributionManagerIds(this);
}
/**
 * Unregisters this processor as a membership listener. A
 * {@link DistributedSystemDisconnectedException} raised along the way is deliberately ignored.
 *
 * @since GemFire 5.7
 */
protected void removeListener() {
  try {
    final DistributionManager manager = getDistributionManager();
    manager.removeMembershipListener(this);
  } catch (DistributedSystemDisconnectedException ignored) {
    // already disconnected: nothing left to unregister from
  }
}
/**
* Returns the set of members that this processor should care about. Used by process() to decide
* whether an unexpected reply deserves a warning, and by the timeout alert to list the current
* membership.
*
* @return a Set of the current members
* @since GemFire 5.7
*/
protected Set getDistributionManagerIds() {
return getDistributionManager().getDistributionManagerIds();
}
/**
 * Prepares for a wait. It marks this processor as waiting and starts the reply-wait statistic.
 * Then, while holding the {@code members} lock, it registers as a membership listener and
 * reconciles the expected members against the current membership.
 */
protected void preWait() {
  this.waiting = true;
  this.statStart = getDistributionManager().getStats().startReplyWait();
  synchronized (this.members) {
    processActiveMembers(addListenerAndGetMembers());
  }
}
/**
 * Performs the initial membership processing while under synchronization of this.members. Every
 * expected member missing from {@code activeMembers} is treated as departed (not crashed).
 *
 * @param activeMembers the DM's current membership set
 */
protected void processActiveMembers(Set activeMembers) {
  for (InternalDistributedMember expected : this.members) {
    if (expected != null && !activeMembers.contains(expected)) {
      memberDeparted(getDistributionManager(), expected, false);
    }
  }
}
/**
* Undoes preWait: clears the waiting flag, unregisters the membership listener, records the
* reply-wait statistic and finally runs a cancellation check, which may throw if the distribution
* manager is being cancelled.
*/
private void postWait() {
waiting = false;
removeListener();
final DistributionManager mgr = getDistributionManager();
mgr.getStats().endReplyWait(this.statStart, this.initTime);
mgr.getCancelCriterion().checkCancelInProgress(null);
}
/**
* Wait a given number of milliseconds for the expected acks to be received. If <code>msecs</code>
* milliseconds pass before all acknowledgements are received, <code>false</code> is returned. A
* value of 0 waits without a time limit. This processor's own latch is used, and the processor is
* cleaned up (removed from the keeper) when the wait ends, so it can only be waited on once.
*
* @param msecs the number of milliseconds to wait for replies
* @throws InterruptedException if interrupted while waiting on latch
* @throws ReplyException an exception passed back in reply
* @throws InternalGemFireException if ack-threshold was exceeded and system property
* "ack-threshold-exception" is set to true
* @throws IllegalStateException if the processor is not registered to receive messages
*
* @return Whether or not we received all of the replies in the given amount of time.
*/
public boolean waitForReplies(long msecs) throws InterruptedException, ReplyException {
return waitForReplies(msecs, getLatch(), true);
}
/**
 * Waits for replies using the supplied latch.
 *
 * <p>An interrupt (present on entry or raised while waiting) is always reported as an
 * {@link InterruptedException}, whether or not {@code doCleanUp} is set. It is never reported as a
 * successful wait. {@code MessageDependencyMonitor.doneWaiting} is always called to balance the
 * earlier {@code waitingForReply}.
 *
 * @param msecs milliseconds to wait for replies; 0 means no time limit
 * @param latch the latch to wait on
 * @param doCleanUp whether to run postWait (listener removal, statistics) and cleanup (removal
 *        from the processor keeper) when the wait ends
 * @return whether all of the replies were received in the given amount of time
 * @throws InterruptedException if the thread was interrupted on entry or while waiting
 * @throws ReplyException an exception passed back in reply
 * @throws IllegalStateException if this processor has already been removed from the keeper
 */
public boolean waitForReplies(long msecs, StoppableCountDownLatch latch, boolean doCleanUp)
    throws InterruptedException, ReplyException {
  if (this.keeperCleanedUp) {
    throw new IllegalStateException(
        "This reply processor has already been removed from the processor keeper");
  }
  boolean result = true;
  boolean interrupted = Thread.interrupted();
  MessageDependencyMonitor.waitingForReply(this);
  try {
    // do the interrupted check inside the try so that cleanup is called
    if (interrupted) {
      throw new InterruptedException();
    }
    if (stillWaiting()) {
      preWait();
      try {
        result = basicWait(msecs, latch);
      } catch (InterruptedException e) {
        // rethrown below, once cleanup has run
        interrupted = true;
      } finally {
        if (doCleanUp) {
          postWait();
        }
      }
    }
    if (this.exception != null) {
      throw this.exception;
    }
  } finally {
    try {
      if (doCleanUp) {
        cleanup();
      }
    } finally {
      // always balance waitingForReply(), even if cleanup fails or we report an interrupt
      MessageDependencyMonitor.doneWaiting(this);
    }
    if (interrupted) {
      // never swallow an interrupt: an interrupted wait has not received all replies
      throw new InterruptedException();
    }
  }
  return result;
}
/**
* basicWait occurs after preWait and before postWait. Attempts to acquire the latch are made.
* <p>
* Waiting strategy. Here "threshold" is the ack-wait-threshold converted to ms, and a non-positive
* value means no threshold:
* <ul>
* <li>msecs == 0: wait for whatever remains of the threshold since this processor was
* constructed, then raise an ack-wait alert. If suspect processing is enabled, next wait the
* severe-alert timeout, repeating for as long as memberSuspect() keeps resetting the timer. Then
* raise a severe alert, wait three more severe-alert timeouts, log a fatal message and finally
* wait indefinitely. Without suspect processing, wait indefinitely right after the first
* alert.</li>
* <li>msecs > threshold: wait the threshold, raise an alert, then wait the remaining
* (msecs - threshold); returns false if replies are still outstanding.</li>
* <li>otherwise: wait msecs and return false on timeout, without alerting.</li>
* </ul>
*
* @param msecs the number of milliseconds to wait for replies, or 0 for no limit
* @param latch the latch to wait on
* @return whether or not we received all of the replies in the given amount of time
* @throws InterruptedException if the thread is interrupted on entry or while waiting
* @throws ReplyException if stopBecauseOfExceptions() reports a recorded exception after waiting
*/
private boolean basicWait(long msecs, StoppableCountDownLatch latch)
throws InterruptedException, ReplyException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (stillWaiting()) {
// ack-wait-threshold is configured in seconds; convert to milliseconds
long timeout = getAckWaitThreshold() * 1000L;
// time already spent since this processor was constructed
long timeSoFar = System.currentTimeMillis() - this.initTime;
final long severeAlertTimeout = getAckSevereAlertThresholdMS();
// only start SUSPECT processing if severe alerts are enabled
final boolean doSuspectProcessing =
isSevereAlertProcessingEnabled() && (severeAlertTimeout > 0);
// a non-positive threshold means "never alert"
if (timeout <= 0) {
timeout = Long.MAX_VALUE;
}
if (msecs == 0) {
boolean timedOut = false;
if (timeout <= timeSoFar + 1) {
// threshold already (nearly) used up: allow a brief 10ms grace period before alerting.
// NOTE(review): if this short wait succeeds, the await below receives a non-positive
// timeout; that relies on await() returning true for an already-open latch - confirm.
timedOut = !latch.await(10);
}
if (timedOut || !latch.await(timeout - timeSoFar - 1)) {
// NOTE(review): this.dmgr is used directly here rather than getDistributionManager();
// a processor built with a null dm and an explicit cancel criterion would NPE - confirm
// that cannot happen.
this.dmgr.getCancelCriterion().checkCancelInProgress(null);
timeout(doSuspectProcessing, false);
// If ack-severe-alert-threshold has been set, we now
// wait for that period of time and then force the non-responding
// members from the system. Then we wait indefinitely
if (doSuspectProcessing) {
boolean wasNotUnlatched;
do {
this.severeAlertTimerReset = false; // retry if this gets set by suspect processing
// (splitbrain requirement)
wasNotUnlatched = !latch.await(severeAlertTimeout);
} while (wasNotUnlatched && this.severeAlertTimerReset);
if (wasNotUnlatched) {
this.dmgr.getCancelCriterion().checkCancelInProgress(null);
timeout(false, true);
// last resort: after three more severe-alert periods, log which members are still
// outstanding before waiting indefinitely
long suspectProcessingErrorAlertTimeout = severeAlertTimeout * 3;
if (!latch.await(suspectProcessingErrorAlertTimeout)) {
long now = System.currentTimeMillis();
long totalTimeElapsed = now - this.initTime;
String waitingOnMembers;
synchronized (members) {
waitingOnMembers = Arrays.toString(members);
}
logger.fatal("An additional " + suspectProcessingErrorAlertTimeout
+ " milliseconds have elapsed while waiting for replies. Total of "
+ totalTimeElapsed + " milliseconds elapsed (init time:" + this.initTime
+ ", now: " + now + ") Waiting for members: " + waitingOnMembers);
// for consistency, we must now wait indefinitely for a membership view
// that ejects the removed members
latch.await();
}
}
} else {
latch.await();
}
// Give an info message since timeout gave a warning.
logger.info("{} wait for replies completed", shortName());
}
} else if (msecs > timeout) {
// bounded wait longer than the threshold: alert at the threshold, then keep waiting
if (!latch.await(timeout)) {
timeout(doSuspectProcessing, false);
// after timeout alert, wait remaining time
if (!latch.await(msecs - timeout)) {
logger.info("wait for replies timing out after {} seconds",
Long.valueOf(msecs / 1000));
return false;
}
// Give an info message since timeout gave a warning.
logger.info("{} wait for replies completed", shortName());
}
} else {
// bounded wait shorter than the threshold: no alerting
if (!latch.await(msecs)) {
return false;
}
}
}
// sanity check: when waiting on this processor's own latch, nothing should still be outstanding
Assert.assertTrue(latch != this.latch || !stillWaiting(), this);
if (stopBecauseOfExceptions()) {
throw this.exception;
}
return true;
}
/**
* Wait a given number of milliseconds for the expected acks to be received. If <code>msecs</code>
* milliseconds pass before all acknowledgements are received, <code>false</code> is returned.
* This processor's own latch is used, and the processor is cleaned up when the wait ends.
* <p>
* Thread interruptions will be ignored while waiting. If interruption occurred while in this
* method, the current thread's interrupt flag will be true, but InterruptedException will not be
* thrown.
*
* @param p_msecs the number of milliseconds to wait for replies; zero means wait without a time
* limit
*
* @return whether all of the replies were received in the given amount of time
* @throws ReplyException an exception passed back in reply
*
* @throws InternalGemFireException if ack-threshold was exceeded and system property
* "ack-threshold-exception" is set to true
* @throws IllegalStateException if the processor is not registered to receive replies
*/
public boolean waitForRepliesUninterruptibly(long p_msecs) throws ReplyException {
return waitForRepliesUninterruptibly(p_msecs, getLatch(), true);
}
/**
* Waits for replies on the supplied latch, ignoring thread interrupts.
* <p>
* Each wait attempt first clears the thread's interrupt flag, so an interrupt pending on entry
* does not abort the wait. The flag is restored after the attempt. When an attempt is interrupted
* and {@code p_msecs} is positive, the time already spent is deducted; if the budget is used up,
* false is returned. With {@code p_msecs} == 0 the wait simply resumes. A cancellation check runs
* before every attempt and after every interrupt.
*
* @param p_msecs milliseconds to wait for replies; 0 means no time limit
* @param latch the latch to wait on
* @param doCleanUp whether to run postWait and cleanup (removal from the keeper) when done
* @return whether all of the replies were received in the given amount of time
* @throws ReplyException an exception passed back in reply
* @throws IllegalStateException if this processor has already been removed from the keeper
*/
public boolean waitForRepliesUninterruptibly(long p_msecs, StoppableCountDownLatch latch,
boolean doCleanUp) throws ReplyException {
if (this.keeperCleanedUp) {
throw new IllegalStateException(
"This reply processor has already been removed from the processor keeper");
}
long msecs = p_msecs; // don't overwrite parameter
boolean result = true;
MessageDependencyMonitor.waitingForReply(this);
try {
if (stillWaiting()) {
preWait();
try {
while (true) {
// cancellation check
this.dmgr.getCancelCriterion().checkCancelInProgress(null);
long startWaitTime = System.currentTimeMillis();
// clear any pending interrupt for this attempt; it is restored in the finally below
boolean interrupted = Thread.interrupted();
try {
result = basicWait(msecs, latch);
break;
} catch (InterruptedException e) {
interrupted = true; // keep looping
this.dmgr.getCancelCriterion().checkCancelInProgress(e);
if (msecs > 0) {
// deduct the time spent in this attempt from the remaining budget
final long interruptTime = System.currentTimeMillis();
msecs -= interruptTime - startWaitTime;
if (msecs <= 0) {
// NOTE(review): msecs is not read again after this break
msecs = 1;
result = false;
break;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
} finally {
if (doCleanUp) {
postWait();
}
}
} // stillWaiting
if (this.exception != null) {
throw this.exception;
}
} finally {
if (doCleanUp) {
cleanup();
}
MessageDependencyMonitor.doneWaiting(this);
}
return result;
}
/**
 * Releases the resources held by this processor by removing it from the processor keeper. Only
 * the first call has an effect; afterwards the waitForReplies* methods throw
 * IllegalStateException.
 *
 * @since GemFire 5.1
 */
public void cleanup() {
  if (this.keeperCleanedUp) {
    return;
  }
  this.keeperCleanedUp = true;
  keeper.remove(getProcessorId());
}
/**
* Wait, without a time limit, for the expected acks to be received, then clean up this processor.
* <p>
* Thread interruptions will be ignored while waiting. If interruption occurred while in this
* method, the current thread's interrupt flag will be true, but InterruptedException will not be
* thrown.
*
* @throws ReplyException an exception passed back in reply
*
* @throws InternalGemFireException if ack-threshold was exceeded and system property
* "ack-threshold-exception" is set to true
*/
public void waitForRepliesUninterruptibly() throws ReplyException {
waitForRepliesUninterruptibly(0);
}
/**
* Returns the id of this reply processor, as stored by register(). This id is often sent in
* messages, so that reply messages know which processor to notify.
*
* @return this processor's id
* @see #getProcessor
*/
public int getProcessorId() {
return processorId;
}
/**
* Returns whether or not this reply processor can stop waiting for replies. This method can be
* overridden to allow the waiter to be notified before all responses have been received. This is
* useful when we are waiting for a value from any member, or if we can stop waiting if a remote
* exception occurred. By default, this method returns <code>false</code>.
* <p>
* Consulted by stillWaiting() after the shutdown check and before the exception check.
*
* @return true if no further replies are needed
*/
protected boolean canStopWaiting() {
return false;
}
/**
 * Reports whether replies are still needed. The checks run in this order:
 * <ol>
 * <li>shutdown, which records an "aborted due to shutdown" exception and stops;</li>
 * <li>the {@link #canStopWaiting()} override;</li>
 * <li>{@link #stopBecauseOfExceptions()};</li>
 * <li>finally, whether any member is still outstanding.</li>
 * </ol>
 *
 * @return true if we are still waiting for a response
 */
protected boolean stillWaiting() {
  if (shutdown) {
    // Create the exception here, so that the call stack reflects the failed computation. If you
    // set the exception in onShutdown, the resulting stack is not of interest.
    this.exception = new ReplyException(
        new DistributedSystemDisconnectedException("aborted due to shutdown"));
    return false;
  }
  // optional subclass override, then error handling
  if (canStopWaiting() || stopBecauseOfExceptions()) {
    return false;
  }
  return numMembers() > 0;
}
/**
* Control of reply processor waiting behavior in the face of exceptions. By default, waiting
* stops as soon as any exception has been recorded.
*
* @since GemFire 5.7
* @return true to stop waiting when exceptions are present
*/
protected boolean stopBecauseOfExceptions() {
return exception != null;
}
/**
 * Calls {@link #finished()} if this processor no longer needs to wait, which releases any thread
 * blocked in a waitForReplies method.
 */
protected void checkIfDone() {
  if (!stillWaiting()) {
    finished();
  }
}
/**
 * Does the processing required when finished. The first call marks the processor done and counts
 * down the latch, which notifies threads in waitForReplies. Only that first call then runs
 * {@link #postFinish()}; later calls do nothing.
 */
protected void finished() {
  final boolean firstCompletion;
  synchronized (this) {
    firstCompletion = !this.done;
    if (firstCompletion) {
      this.done = true;
      getLatch().countDown();
    }
  }
  // postFinish runs outside the lock, exactly once
  if (firstCompletion) {
    postFinish();
  }
}
/**
* Override to execute custom code after {@link #finished}. This will be invoked only once for
* each ReplyProcessor21, outside the processor's lock. The default does nothing.
*/
protected void postFinish() {}
/**
 * Returns the runtime class name without its package prefix, for use in log messages.
 *
 * @return the unqualified class name
 */
protected String shortName() {
  final String qualified = this.getClass().getName();
  // lastIndexOf yields -1 when there is no package, so "+ 1" keeps the whole name
  return qualified.substring(qualified.lastIndexOf('.') + 1);
}
/**
 * Describes this processor: its short class name, id, number of outstanding replies, any recorded
 * exception, and the members still being waited on.
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("<")
      .append(shortName())
      .append(' ')
      .append(this.getProcessorId())
      .append(" waiting for ")
      .append(numMembers())
      .append(" replies");
  if (exception != null) {
    text.append(" exception: ").append(exception);
  }
  return text.append(" from ").append(membersToString()).append('>').toString();
}
/**
 * Crosses {@code m} off the list of members still owed a reply.
 *
 * @param m the member to be removed
 * @param departed true if it is removed due to membership, in which case every occurrence is
 *        removed; false for an ordinary reply, where only the first occurrence is removed because
 *        more than one response may be expected from the same member
 * @return true if it was in our list of members
 */
protected boolean removeMember(InternalDistributedMember m, boolean departed) {
  boolean found = false;
  synchronized (this.members) {
    for (int slot = 0; slot < this.members.length; slot++) {
      final InternalDistributedMember candidate = this.members[slot];
      if (candidate == null || !candidate.equals(m)) {
        continue;
      }
      this.members[slot] = null;
      found = true;
      if (!departed) {
        break;
      }
    }
  }
  return found;
}
/**
 * Counts the members that have not yet replied (non-null slots in {@code members}).
 *
 * @return the number of outstanding replies
 */
protected int numMembers() {
  int outstanding = 0;
  synchronized (this.members) {
    for (InternalDistributedMember member : this.members) {
      if (member != null) {
        outstanding++;
      }
    }
  }
  return outstanding;
}
/**
 * Reports whether a reply from the given member is still outstanding.
 *
 * @param id the member to look for; must not be null
 * @return true if some slot of the member array equals {@code id}
 */
protected boolean waitingOnMember(InternalDistributedMember id) {
  synchronized (this.members) {
    for (final InternalDistributedMember member : this.members) {
      if (id.equals(member)) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Returns the ack-wait-threshold, in seconds. This is how long to wait for ack replies before
 * sending an alert. The value is looked up in the system config on every call because it may change
 * at runtime.
 */
protected int getAckWaitThreshold() {
  return system.getConfig().getAckWaitThreshold();
}
/**
 * Returns the ack-severe-alert-threshold, in seconds. This is how long to wait before removing
 * unresponsive members from the distributed system, counted from the point at which the
 * ack-wait-threshold period has elapsed.
 */
protected int getSevereAlertThreshold() {
  return system.getConfig().getAckSevereAlertThreshold();
}
/**
 * Decides whether wait-timeouts should be processed at all. When this returns false, the timeout
 * handling returns immediately without logging or touching the member list. This base
 * implementation always returns true; subclasses may override it.
 */
protected boolean processTimeout() {
  return true;
}
/**
 * Processes a wait-timeout. Usually {@code suspectThem} is used for the first timeout, followed by
 * a later call with {@code severeAlert} set.
 *
 * <p>This logs the outstanding replies: at fatal level for a severe alert, otherwise at warn level.
 * It records a breadcrumb and increments the reply-timeout statistic. It also stops waiting for any
 * outstanding member that is no longer in the distribution manager's membership. Nothing is done
 * if {@link #processTimeout()} returns false.
 *
 * @param suspectThem whether to ask the membership manager to suspect the unresponsive members
 * @param severeAlert whether this is a severe alert. If so, it is logged at fatal level and the
 *        reported elapsed time includes the ack-severe-alert-threshold
 * @throws InternalGemFireException if {@code THROW_EXCEPTION_ON_TIMEOUT} is set; its cause is a
 *         {@link TimeoutException}
 */
private void timeout(boolean suspectThem, boolean severeAlert) {
  if (!this.processTimeout()) {
    return;
  }
  Set<?> activeMembers = getDistributionManagerIds();
  // an alert that will show up in the console
  long timeout = getAckWaitThreshold();
  final Object[] msgArgs =
      new Object[] {Long.valueOf(timeout + (severeAlert ? getSevereAlertThreshold() : 0)), this,
          getDistributionManager().getId(), activeMembers};
  final String msg =
      "%s seconds have elapsed while waiting for replies: %s on %s whose current membership list is: [%s]";
  if (severeAlert) {
    logger.fatal(String.format(msg, msgArgs));
  } else {
    logger.warn(String.format(msg, msgArgs));
  }
  // The breadcrumb omits the (possibly very long) membership list. Work on a copy so the
  // exception thrown below still reports the full membership list.
  final Object[] breadcrumbArgs = msgArgs.clone();
  breadcrumbArgs[3] = "(omitted)";
  Breadcrumbs.setProblem(msg, breadcrumbArgs);
  // Increment the stat
  getDistributionManager().getStats().incReplyTimeouts();

  // only collected when the membership manager will actually be asked to suspect them
  final Set<InternalDistributedMember> suspectMembers =
      suspectThem ? new HashSet<InternalDistributedMember>() : null;
  synchronized (this.members) {
    for (int i = 0; i < this.members.length; i++) {
      final InternalDistributedMember member = this.members[i];
      if (member == null) {
        continue;
      }
      if (!activeMembers.contains(member)) {
        logger.warn(
            "View no longer has {} as an active member, so we will no longer wait for it.",
            member);
        memberDeparted(getDistributionManager(), member, false);
      } else if (suspectMembers != null) {
        suspectMembers.add(member);
      }
    }
  }

  if (THROW_EXCEPTION_ON_TIMEOUT) {
    // init the cause to be a TimeoutException so catchers can determine cause
    TimeoutException cause = new TimeoutException("Timed out waiting for ACKS.");
    // formatted now (not reused from the log) so the processor state reflects departures above
    throw new InternalGemFireException(String.format(msg, msgArgs), cause);
  } else if (suspectThem && suspectMembers != null && !suspectMembers.isEmpty()) {
    // InternalDistributedMember implements DistributedMember, and the set is only read
    @SuppressWarnings("unchecked")
    final Set<DistributedMember> suspects = (Set<DistributedMember>) (Set<?>) suspectMembers;
    getDistributionManager().getMembershipManager().suspectMembers(suspects,
        "Failed to respond within ack-wait-threshold");
  }
}
/**
 * Renders the members from which replies are still outstanding as {@code [a, b, c]}, skipping
 * slots that have already been cleared.
 *
 * @return a bracketed, comma-separated list of the outstanding members
 */
protected String membersToString() {
  // method-local buffer: StringBuffer's per-call synchronization buys nothing here
  final StringBuilder sb = new StringBuilder("[");
  String separator = "";
  synchronized (this.members) {
    for (final InternalDistributedMember member : this.members) {
      if (member != null) {
        sb.append(separator).append(member);
        separator = ", ";
      }
    }
  }
  return sb.append(']').toString();
}
/**
 * Returns the live member array, not a copy. Entries are set to null as members are removed.
 * Callers must synchronize on the returned array in order to examine its contents.
 *
 * @return the members that have not replied
 */
protected InternalDistributedMember[] getMembers() {
  return members;
}
/** Returns the latch that {@link #finished()} counts down to release waiting threads. */
private StoppableCountDownLatch getLatch() {
  return latch;
}
/**
 * Turns on severe alert processing for this reply processor, provided it is also enabled in the
 * distribution config. A severe alert is issued when acks have not arrived within
 * ack-wait-threshold plus ack-severe-alert-threshold seconds.
 */
public void enableSevereAlertProcessing() {
  severeAlertEnabled = true;
}
/**
 * Chooses whether a shortened ack-severe-alert-threshold is used (see
 * {@link #getAckSevereAlertThresholdMS()}).
 *
 * @param flag true to shorten the threshold, false to use the configured value
 */
public static void setShortSevereAlertProcessing(boolean flag) {
  severeAlertShorten.set(Boolean.valueOf(flag));
}
/**
 * Reports whether the shortened ack-severe-alert-threshold is currently selected.
 *
 * @return the value last given to {@link #setShortSevereAlertProcessing(boolean)}
 */
public static boolean getShortSevereAlertProcessing() {
  return severeAlertShorten.get();
}
/** Makes reply-waits in the current thread perform severe-alert processing. */
public static void forceSevereAlertProcessing() {
  final Boolean forced = Boolean.TRUE;
  forceSevereAlertProcessing.set(forced);
}
/** Undoes {@link #forceSevereAlertProcessing()} for the current thread. */
public static void unforceSevereAlertProcessing() {
  final Boolean forced = Boolean.FALSE;
  forceSevereAlertProcessing.set(forced);
}
/**
 * Tells whether {@link #forceSevereAlertProcessing()} has been used to force reply-waits in the
 * current thread to perform severe-alert processing.
 */
public static boolean isSevereAlertProcessingForced() {
  final boolean forced = forceSevereAlertProcessing.get();
  return forced;
}
/**
 * Returns the ack-severe-alert-threshold in milliseconds. A positive threshold is scaled by
 * {@code PR_SEVERE_ALERT_RATIO} when shortening has been selected.
 */
public long getAckSevereAlertThresholdMS() {
  final long configuredMillis = getSevereAlertThreshold() * 1000L;
  // the shorten flag is consulted only for a positive threshold
  final boolean shorten = configuredMillis > 0 && severeAlertShorten.get();
  return shorten ? (long) (configuredMillis * PR_SEVERE_ALERT_RATIO) : configuredMillis;
}
/**
 * Reports whether severe alert processing applies. This is true when it was enabled on this
 * processor or forced for the current thread.
 */
public boolean isSevereAlertProcessingEnabled() {
  if (this.severeAlertEnabled) {
    return true;
  }
  return isSevereAlertProcessingForced();
}
/** Per-thread reply processor id of the message currently being read; may be unset (null). */
private static final ThreadLocal<Integer> messageId = new ThreadLocal<Integer>();
/** Sentinel id stored in {@code messageId} to mean "no reply processor id". */
private static final Integer VOID_RPID = Integer.valueOf(0);
/**
 * Records, in a thread local, the reply processor id of the message being read. Messages call
 * this so that the comms layer can still send replies even when it can't deserialize a message.
 *
 * @param id the reply processor id of the current message
 */
public static void setMessageRPId(int id) {
  final Integer boxed = Integer.valueOf(id);
  messageId.set(boxed);
}
/** Resets the current thread's message reply processor id to the "no id" sentinel (0). */
public static void initMessageRPId() {
  final Integer none = VOID_RPID;
  messageId.set(none);
}
/**
 * Clears the current thread's message reply processor id. It stores the "no id" sentinel (0)
 * rather than removing the thread-local entry.
 */
public static void clearMessageRPId() {
  final Integer none = VOID_RPID;
  messageId.set(none);
}
/**
 * Returns the reply processor id for the message currently being read by this thread.
 *
 * @return the stored id, or 0 if no id has been stored
 */
public static int getMessageRPId() {
  final Integer current = messageId.get();
  return (current == null) ? 0 : current.intValue();
}
/**
 * Cancels this reply processor on behalf of {@code sender}, so that everything waiting on it is
 * told to quit waiting and why. This fixes the hang of 42951.
 *
 * <p>The given exception is recorded wrapped in a {@link ReplyException}. One outstanding slot for
 * {@code sender} is then cleared, and completion is re-checked.
 *
 * @param sender the member whose reply will no longer be waited for
 * @param ex the reason the reply processor is being canceled
 */
public void cancel(InternalDistributedMember sender, RuntimeException ex) {
  final ReplyException reason =
      new ReplyException("Unexpected exception while processing reply message", ex);
  processException(reason);
  removeMember(sender, false);
  checkIfDone();
}
}