blob: 1563c51f1f9993d10a275879b02849fb66a605a2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.locks;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockGrantor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Sends <code>TXOriginatorRecoveryMessage</code> to all participants of a given transaction when
 * the originator departs. The participants delay reply until the commit has finished. Once all
 * replies have come in, the transaction lock (<code>TXLockId</code>) will be released.
 */
public class TXOriginatorRecoveryProcessor extends ReplyProcessor21 {
// Log4j logger for this processor; the nested TXOriginatorRecoveryMessage class logs through it
// as well.
private static final Logger logger = LogService.getLogger();
/**
 * Notifies the participants of a transaction that its originator has departed and then releases
 * the transaction lock batch on the grantor.
 *
 * <p>
 * A {@code TXOriginatorRecoveryMessage} carrying {@code txLockId} and the id of a freshly created
 * reply processor is built for {@code members}. If this member is itself contained in
 * {@code members}, the message is also processed locally. Finally
 * {@code grantor.releaseLockBatch(txLockId, originator)} is called.
 *
 * <p>
 * NOTE(review): several statements appear to be missing from this copy of the method (removing
 * this member from {@code recipients}, the actual send, the body of the first {@code try}, both
 * {@code catch} bodies and all closing braces) - confirm against the full file before editing.
 *
 * @param members the transaction participants to notify
 * @param originator the departed originator; passed on to {@code releaseLockBatch}
 * @param txLockId the transaction lock that is recovered and released
 * @param grantor the lock grantor on which the lock batch is released
 * @param dm the distribution manager used to create the reply processor and to identify this
 *        member
 */
static void sendMessage(Set members, InternalDistributedMember originator, TXLockId txLockId,
DLockGrantor grantor, DistributionManager dm) {
TXOriginatorRecoveryProcessor processor = new TXOriginatorRecoveryProcessor(dm, members);
TXOriginatorRecoveryMessage msg = new TXOriginatorRecoveryMessage();
// replies are routed back to the processor created above via its id
msg.processorId = processor.getProcessorId();
msg.txLockId = txLockId;
// send msg to all members EXCEPT this member...
// (a private copy is taken so the caller's set is not modified)
Set recipients = new HashSet(members);
if (logger.isDebugEnabled()) {
logger.debug("Sending TXOriginatorRecoveryMessage: {}", msg);
// process msg and reply directly if this VM is a participant...
if (members.contains(dm.getId())) {
// NOTE(review): the statement guarded by this null check is not visible in this copy;
// presumably the sender is set to this member before processing - confirm.
if (msg.getSender() == null)
msg.process((ClusterDistributionManager) dm);
// keep waiting even if interrupted
// for() loop removed for bug 36983 - you can't loop on waitForReplies()
// NOTE(review): the try body is empty in this copy; the wait for replies described above is
// presumably performed here - confirm.
try {
} catch (ReplyException e) {
// release txLockId...
if (logger.isDebugEnabled()) {
logger.debug("TXOriginatorRecoveryProcessor releasing: {}", txLockId);
// dtls.release(txLockId);
try {
grantor.releaseLockBatch(txLockId, originator);
// NOTE(review): no handling of the InterruptedException is visible in this copy - confirm the
// interrupt status is restored in the full file.
} catch (InterruptedException e) {
// -------------------------------------------------------------------------
// Constructors
// -------------------------------------------------------------------------
/**
 * Creates a new instance of TXOriginatorRecoveryProcessor. Private: the only visible caller is
 * {@code sendMessage}.
 *
 * @param dm the distribution manager handed to the superclass constructor
 * @param members the members handed to the superclass constructor (presumably the members whose
 *        replies are awaited - confirm against ReplyProcessor21)
 */
private TXOriginatorRecoveryProcessor(DistributionManager dm, Set members) {
super(dm, members);
/**
 * Always returns {@code true}.
 *
 * <p>
 * NOTE(review): presumably a ReplyProcessor21 hook that lets a reply from this member be
 * accepted ({@code sendMessage} processes the message locally when this member is a
 * participant) - confirm against the superclass.
 */
protected boolean allowReplyFromSender() {
return true;
/**
 * IllegalStateException is an anticipated reply exception. Receiving multiple replies with this
 * exception is normal.
 */
protected boolean logMultipleExceptions() {
// Always false. Presumably a ReplyProcessor21 hook that suppresses logging when several
// replies carry an exception - confirm against the superclass.
return false;
// -------------------------------------------------------------------------
// TXOriginatorRecoveryMessage
// -------------------------------------------------------------------------
/**
 * Message built by {@code TXOriginatorRecoveryProcessor.sendMessage} for the participants of a
 * transaction. It carries the transaction lock id and the id of the reply processor that the
 * replies are routed to.
 */
public static class TXOriginatorRecoveryMessage extends PooledDistributionMessage
implements MessageWithReply {
/** The transaction lock that was orphaned when the originator departed */
protected TXLockId txLockId;
/** The reply processor to route replies to */
protected int processorId;
/** Returns the id of the reply processor that replies to this message are routed to. */
public int getProcessorId() {
return this.processorId;
/**
 * Hands this message to the waiting thread pool obtained from {@code dm.getExecutors()}, where
 * {@code processTXOriginatorRecoveryMessage} is run for it.
 *
 * <p>
 * If the pool rejects the task, the rejection is logged at debug level; no other handling is
 * visible in this copy.
 *
 * @param dm the distribution manager whose waiting thread pool runs the work
 */
protected void process(final ClusterDistributionManager dm) {
// captured so the anonymous Runnable below can refer to this message
final TXOriginatorRecoveryMessage msg = this;
try {
// the submitted work waits for the transaction to finish (see
// processTXOriginatorRecoveryMessage), so it runs on the waiting pool and not in-line
dm.getExecutors().getWaitingThreadPool().execute(new Runnable() {
public void run() {
processTXOriginatorRecoveryMessage(dm, msg);
// NOTE(review): the lines closing the Runnable and the execute(...) call are not visible in
// this copy.
} catch (RejectedExecutionException e) {
logger.debug("Rejected processing of <{}>", msg, e);
/**
 * Waits for the transaction identified by {@code msg.txLockId} to finish processing and then
 * prepares a {@code TXOriginatorRecoveryReplyMessage}.
 *
 * <p>
 * A {@code RuntimeException} thrown while waiting is logged as a warning and wrapped in a
 * {@code ReplyException}. The reply is created in the {@code finally} block, which distinguishes
 * a sender equal to {@code dm.getId()} (reply processed in-line in this VM) from any other
 * sender (reply sent).
 *
 * <p>
 * NOTE(review): this copy of the method looks incomplete: the declaration line and the two
 * branches of the final {@code if} end in stray string fragments, the {@code logger.warn(...)}
 * call is not terminated, and the statements that populate and dispatch the reply are not
 * visible. Confirm against the full file before changing anything here.
 *
 * @param dm the distribution manager passed to the wait and used to identify this member
 * @param msg the recovery message being processed
 */
protected void processTXOriginatorRecoveryMessage(final ClusterDistributionManager dm,
final TXOriginatorRecoveryMessage msg) {
// NOTE(review): the string fragment after the declaration looks like the tail of a lost
// logging call.
ReplyException replyException = null;"[processTXOriginatorRecoveryMessage]");
try {
// Wait for the transaction associated with this lockid to finish processing
TXCommitMessage.getTracker().waitToProcess(msg.txLockId, dm);
// when the grantor receives reply it will release txLock...
* TODO: implement waitToReleaseTXLockId here testTXOriginatorRecoveryProcessor in
* org.apache.geode.internal.cache.locks.TXLockServiceTest should be expanded upon also...
} catch (RuntimeException t) {
logger.warn("[processTXOriginatorRecoveryMessage] throwable:",
// if (replyException == null) (can only be null)
replyException = new ReplyException(t);
// else {
// log.warning(LocalizedStrings.TXOriginatorRecoveryProcessor_MORE_THAN_ONE_EXCEPTION_THROWN_IN__0,
// this, t);
// }
// }
// catch (VirtualMachineError err) {
// SystemFailure.initiateFailure(err);
// // If this ever returns, rethrow the error. We're poisoned
// // now, so don't let this thread continue.
// throw err;
// }
// catch (Throwable t) {
// // Whenever you catch Error or Throwable, you must also
// // catch VirtualMachineError (see above). However, there is
// // _still_ a possibility that you are dealing with a cascading
// // error condition, so you also need to check to see if the JVM
// // is still usable:
// SystemFailure.checkFailure();
// if (replyException == null) {
// replyException = new ReplyException(t);
// }
// }
// catch (VirtualMachineError err) {
// SystemFailure.initiateFailure(err);
// // If this ever returns, rethrow the error. We're poisoned
// // now, so don't let this thread continue.
// throw err;
// }
// catch (Throwable t) {
// // Whenever you catch Error or Throwable, you must also
// // catch VirtualMachineError (see above). However, there is
// // _still_ a possibility that you are dealing with a cascading
// // error condition, so you also need to check to see if the JVM
// // is still usable:
// SystemFailure.checkFailure();
// if (replyException == null) {
// replyException = new ReplyException(t);
// }
// }
} finally {
// the reply is built in finally, i.e. whether or not the wait above threw
TXOriginatorRecoveryReplyMessage replyMsg = new TXOriginatorRecoveryReplyMessage();
// reads this message's own txLockId field; the reply class marks it as used for toString only
replyMsg.txLockId = txLockId;
if (getSender().equals(dm.getId())) {
// process in-line in this VM"[processTXOriginatorRecoveryMessage] locally process reply");
} else {"[processTXOriginatorRecoveryMessage] send reply");
// NOTE(review): the return statement is not visible in this copy; presumably this returns the
// fixed serialization id of TXOriginatorRecoveryMessage - confirm which constant is returned.
public int getDSFID() {
/**
 * Restores this message from {@code in}: the superclass state first, then {@code txLockId}
 * (read as an object) and {@code processorId} (read as an int).
 *
 * <p>
 * NOTE(review): {@code DataInput} and {@code IOException} are not among the imports visible at
 * the top of this file - confirm that {@code java.io} imports are present in the full file.
 *
 * @param in the stream to read from
 * @param context the deserialization context, passed through to the superclass
 * @throws IOException declared for failures while reading from {@code in}
 * @throws ClassNotFoundException declared for the object read of {@code txLockId}
 */
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.txLockId = (TXLockId) DataSerializer.readObject(in);
this.processorId = in.readInt();
/**
 * Writes this message to {@code out}: the superclass state first, then {@code txLockId} as an
 * object.
 *
 * <p>
 * NOTE(review): {@code fromData} also reads an int {@code processorId} after {@code txLockId};
 * the matching write is not visible in this copy - confirm it is present in the full file,
 * otherwise the two methods are out of step.
 *
 * @param out the stream to write to
 * @param context the serialization context, passed through to the superclass
 * @throws IOException declared for failures while writing to {@code out}
 */
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeObject(this.txLockId, out);
/**
 * Builds a description that starts with {@code TXOriginatorRecoveryMessage (txLockId='} followed
 * by {@code '; processorId=}.
 *
 * <p>
 * NOTE(review): the appends of the actual {@code txLockId} and {@code processorId} values and of
 * the closing text are not visible in this copy - confirm against the full file.
 */
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("TXOriginatorRecoveryMessage (txLockId='");
buff.append("'; processorId=");
return buff.toString();
// -------------------------------------------------------------------------
// TXOriginatorRecoveryReplyMessage
// -------------------------------------------------------------------------
/**
 * Reply to a {@code TXOriginatorRecoveryMessage}; an instance is created at the end of
 * {@code processTXOriginatorRecoveryMessage}.
 *
 * <p>
 * NOTE(review): the method bodies below look truncated in this copy (no return statement in
 * {@code getDSFID}, no closing braces) - confirm against the full file.
 */
public static class TXOriginatorRecoveryReplyMessage extends ReplyMessage {
/** The transaction lock that was orphaned when the originator departed */
protected TXLockId txLockId; // only for the toString
// Presumably returns the fixed serialization id of this reply type; the return statement is not
// visible in this copy - confirm.
public int getDSFID() {
// Delegates to the superclass; as far as visible here txLockId is not read from the stream.
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
// Delegates to the superclass; as far as visible here txLockId is not written to the stream.
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
// Describes the reply by the superclass processor id, the txLockId and the sender.
public String toString() {
return "TXOriginatorRecoveryReplyMessage (processorId=" + super.processorId + "; txLockId="
+ this.txLockId + "; sender=" + getSender() + ")";