blob: c38f2ffe7f6e509ad3d80b0a34cb5228f95d80f6 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.MessageWithReply;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ReplySender;
import com.gemstone.gemfire.distributed.internal.SerialDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
/**
 * Base class for serially processed messages that carry out an operation on
 * behalf of a transaction and (normally) reply to the sender.
 * <p>
 * {@link #process(DistributionManager)} masquerades as the transaction
 * identified by this message, delegates the actual work to
 * {@link #operateOnTx(TXId, DistributionManager)} and then sends a
 * {@link ReplyMessage}, wrapping any failure in a {@link ReplyException}.
 *
 * @author sbawaska
 */
public abstract class TXMessage extends SerialDistributionMessage
    implements MessageWithReply, TransactionMessage {

  private static final Logger logger = LogService.getLogger();

  /** Id of the reply processor on the sender; 0 when no processor was supplied. */
  private int processorId;

  /** Unique id of the transaction this message operates on. */
  private int txUniqId;

  /**
   * Member on whose behalf the transaction runs; when {@code null} the sender
   * of the message is used instead (see {@link #getMemberToMasqueradeAs()}).
   */
  private InternalDistributedMember txMemberId = null;

  /** No-arg constructor; fields are populated later by {@link #fromData(DataInput)}. */
  public TXMessage() {
  }

  /**
   * Creates a message for the given transaction.
   *
   * @param txUniqueId unique id of the transaction
   * @param onBehalfOfMember member the transaction is executed on behalf of,
   *        may be {@code null}
   * @param processor reply processor waiting for the answer, or {@code null}
   *        in which case the processor id is set to 0
   */
  public TXMessage(int txUniqueId, InternalDistributedMember onBehalfOfMember, ReplyProcessor21 processor) {
    this.txUniqId = txUniqueId;
    this.txMemberId = onBehalfOfMember;
    this.processorId = processor == null ? 0 : processor.getProcessorId();
  }

  /**
   * @return {@code false} in this base implementation
   */
  public boolean canStartRemoteTransaction() {
    return false;
  }

  /**
   * Processes this message: masquerades as the transaction, invokes
   * {@link #operateOnTx(TXId, DistributionManager)} and finally sends a reply
   * unless the operation (or a shutdown) requested otherwise.
   *
   * @param dm the distribution manager processing this message
   */
  @Override
  protected void process(final DistributionManager dm) {
    // Failure to report back to the sender, if any.
    Throwable thr = null;
    boolean sendReply = true;
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("processing {}", this);
      }
      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
      // checkCacheClosing() short-circuits on a null cache, so the
      // dereference in the second operand is safe.
      if (checkCacheClosing(cache) || checkDSClosing(cache.getDistributedSystem())) {
        thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
        // The outer finally block still runs and replies with this exception.
        return;
      }
      TXManagerImpl txMgr = cache.getTXMgr();
      TXStateProxy tx = null;
      try {
        assert this.txUniqId != TXManagerImpl.NOTX;
        TXId txId = new TXId(getMemberToMasqueradeAs(), this.txUniqId);
        tx = txMgr.masqueradeAs(this);
        sendReply = operateOnTx(txId, dm);
      } finally {
        // Always undo the masquerade, even when the operation failed.
        txMgr.unmasquerade(tx);
      }
    } catch (CommitConflictException cce) {
      thr = cce;
    } catch (DistributedSystemDisconnectedException se) {
      // The system is going away; do not attempt to reply.
      sendReply = false;
      if (logger.isDebugEnabled()) {
        logger.debug("shutdown caught, abandoning message: " + se);
      }
    } catch (RegionDestroyedException rde) {
      thr = new ForceReattemptException(LocalizedStrings.PartitionMessage_REGION_IS_DESTROYED_IN_0.toLocalizedString(dm.getDistributionManagerId()), rde);
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      if (sendReply) {
        thr = t;
      }
    } finally {
      ReplySender rs = getReplySender(dm);
      // Reply only when a processor is waiting on the sender, or when a
      // reply sender other than the distribution manager itself is in use.
      if (sendReply && (this.processorId != 0 || (rs != dm))) {
        ReplyException rex = null;
        if (thr != null) {
          rex = new ReplyException(thr);
        }
        sendReply(getSender(), this.processorId, dm, rex);
      }
    }
  }

  /**
   * @return {@code true} if the distributed system is absent or disconnecting
   */
  private boolean checkDSClosing(InternalDistributedSystem distributedSystem) {
    return distributedSystem == null || distributedSystem.isDisconnecting();
  }

  /**
   * @return {@code true} if the cache is absent or already closed
   */
  private boolean checkCacheClosing(GemFireCacheImpl cache) {
    return cache == null || cache.isClosed();
  }

  /**
   * Sends a {@link ReplyMessage} to the given recipient using the reply
   * sender obtained from {@code getReplySender(dm)}.
   *
   * @param recipient member to reply to
   * @param processorId2 id of the reply processor on the recipient
   * @param dm the distribution manager processing this message
   * @param rex exception to report, or {@code null} on success
   */
  private void sendReply(InternalDistributedMember recipient, int processorId2,
      DistributionManager dm, ReplyException rex) {
    ReplyMessage.send(recipient, processorId2, rex, getReplySender(dm));
  }

  /**
   * Returns a description consisting of the class name (shortened to the part
   * following {@link PartitionMessage#PN_TOKEN} when that token occurs in it),
   * the transaction id, the transaction member, the sender, the processor id
   * and whatever {@link #appendFields(StringBuffer)} contributes.
   */
  @Override
  public String toString() {
    StringBuffer buff = new StringBuffer();
    String className = getClass().getName();
    int tokenIdx = className.indexOf(PartitionMessage.PN_TOKEN);
    if (tokenIdx >= 0) {
      buff.append(className.substring(tokenIdx + PartitionMessage.PN_TOKEN.length()));
    } else {
      // The token is not part of this class name; using the unchecked index
      // would truncate the name arbitrarily or throw, so print it in full.
      buff.append(className);
    }
    buff.append("(txId=").append(this.txUniqId)
        .append("; txMbr=").append(this.txMemberId)
        .append("; sender=").append(getSender())
        .append("; processorId=").append(this.processorId);
    appendFields(buff);
    buff.append(")");
    return buff.toString();
  }

  /**
   * Hook for subclasses to add their own fields to {@link #toString()}.
   * The base implementation appends nothing.
   *
   * @param buff buffer the description is being built in
   */
  public void appendFields(StringBuffer buff) {
  }

  /**
   * Transaction operations override this method to do actual work
   *
   * @param txId The transaction Id to operate on
   * @param dm the distribution manager processing this message
   * @return true if {@link TXMessage} should send a reply false otherwise
   * @throws RemoteOperationException declared for implementations; when thrown
   *         it is reported to the sender by {@link #process(DistributionManager)}
   */
  protected abstract boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException;

  /**
   * @return the unique id of the transaction this message operates on
   */
  public int getTXUniqId() {
    return this.txUniqId;
  }

  /**
   * Writes the superclass state followed by the processor id, the
   * transaction id and the transaction member.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    super.toData(out);
    out.writeInt(this.processorId);
    out.writeInt(this.txUniqId);
    DataSerializer.writeObject(this.txMemberId, out);
  }

  /**
   * Reads the state in the same order {@link #toData(DataOutput)} wrote it.
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    super.fromData(in);
    this.processorId = in.readInt();
    this.txUniqId = in.readInt();
    this.txMemberId = DataSerializer.readObject(in);
  }

  /**
   * @return the transaction member if one was set, otherwise the sender of
   *         this message
   */
  public final InternalDistributedMember getMemberToMasqueradeAs() {
    if (txMemberId == null) {
      return getSender();
    }
    return txMemberId;
  }

  /**
   * @return the id of the reply processor on the sender, 0 if none
   */
  @Override
  public int getProcessorId() {
    return this.processorId;
  }

  /**
   * @return the transaction member carried by this message, possibly
   *         {@code null}
   */
  public InternalDistributedMember getTXOriginatorClient() {
    return this.txMemberId;
  }

  /**
   * @return {@code true} in this base implementation
   */
  @Override
  public boolean canParticipateInTransaction() {
    return true;
  }

  /**
   * @return {@code false} in this base implementation
   */
  @Override
  public boolean isTransactionDistributed() {
    return false;
  }
}