blob: 7719d801a23f497bff1bc1b18f45dc4518c15156 [file] [log] [blame]
package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
* DistPeerTXStateStub lives on the transaction coordinator for a distributed
* transaction
* </br>1. It forwards TX operations to primary or a selected replica (in case of RR)
* for each op
* </br>2.It also records those transactional operations in order to send those to
* secondaries/replicas(in one batch) at commit time.
*
* @author shirishd
*
*/
public final class DistPeerTXStateStub extends PeerTXStateStub implements
DistTXCoordinatorInterface {
private ArrayList<DistTxEntryEvent> primaryTransactionalOperations = null;
private ArrayList<DistTxEntryEvent> secondaryTransactionalOperations = null;
private DistTXPrecommitMessage precommitDistTxMsg = null;
private DistTXCommitMessage commitDistTxMsg = null;
private DistTXRollbackMessage rollbackDistTxMsg = null;
private DM dm = null;
public DistPeerTXStateStub(TXStateProxy stateProxy, DistributedMember target,
InternalDistributedMember onBehalfOfClient) {
super(stateProxy, target, onBehalfOfClient);
primaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
secondaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
}
@Override
public void precommit() throws CommitConflictException {
if (logger.isDebugEnabled()) {
logger.debug("DistPeerTXStateStub.precommit target=" + target
+ " ,primaryTransactionalOperations="
+ primaryTransactionalOperations
+ " ,secondaryTransactionalOperations="
+ secondaryTransactionalOperations);
}
assert target != null;
assert primaryTransactionalOperations != null
|| secondaryTransactionalOperations != null;
// [DISTTX] TODO Handle Stats
this.precommitDistTxMsg
.setSecondaryTransactionalOperations(secondaryTransactionalOperations);
final Set<DistributedMember> recipients = Collections.singleton(target);
this.precommitDistTxMsg.setRecipients(recipients);
this.dm.putOutgoing(this.precommitDistTxMsg);
this.precommitDistTxMsg.resetRecipients();
// TODO [DISTTX] any precommit hooks
}
@Override
public void commit() throws CommitConflictException {
if (logger.isDebugEnabled()) {
logger.debug("DistPeerTXStateStub.commit target=" + target);
}
// [DISTTX] TODO Handle Stats
this.dm.getStats().incSentCommitMessages(1L);
final Set<DistributedMember> recipients = Collections.singleton(target);
this.commitDistTxMsg.setRecipients(recipients);
this.dm.putOutgoing(this.commitDistTxMsg);
this.commitDistTxMsg.resetRecipients();
}
@Override
public void rollback() {
if (logger.isDebugEnabled()) {
logger.debug("DistPeerTXStateStub.rollback target=" + target);
}
// [DISTTX] TODO Handle callbacks
// if (this.internalAfterSendRollback != null) {
// this.internalAfterSendRollback.run();
// }
final Set<DistributedMember> recipients = Collections.singleton(target);
this.rollbackDistTxMsg.setRecipients(recipients);
this.dm.putOutgoing(this.rollbackDistTxMsg);
this.rollbackDistTxMsg.resetRecipients();
}
@Override
public final ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
throws UnsupportedOperationInTransactionException {
return primaryTransactionalOperations;
}
private final void addPrimaryTransactionalOperations(DistTxEntryEvent dtop) {
if (logger.isDebugEnabled()) {
// [DISTTX] TODO Remove these
logger.debug("DistPeerTXStateStub.addPrimaryTransactionalOperations add "
+ dtop + " ,stub before=" + this);
}
primaryTransactionalOperations.add(dtop);
if (logger.isDebugEnabled()) {
// [DISTTX] TODO Remove these
logger
.debug("DistPeerTXStateStub.addPrimaryTransactionalOperations stub after add = "
+ this);
}
}
@Override
public final void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
throws UnsupportedOperationInTransactionException {
secondaryTransactionalOperations.add(dtop);
}
@Override
protected void cleanup() {
super.cleanup();
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.TXStateStub#putEntry(com.gemstone.gemfire
* .internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object,
* boolean, long, boolean)
*/
@Override
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
if (logger.isDebugEnabled()) {
// [DISTTX] TODO Remove throwable
logger.debug("DistPeerTXStateStub.putEntry "
+ event.getKeyInfo().getKey(), new Throwable());
}
boolean returnValue = super.putEntry(event, ifNew, ifOld, expectedOldValue,
requireOldValue, lastModified, overwriteDestroyed);
addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
return returnValue;
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com
* .gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean,
* java.lang.Object, boolean, long, boolean)
*/
@Override
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
boolean ifOld, Object expectedOldValue, boolean requireOldValue,
long lastModified, boolean overwriteDestroyed)
throws DataLocationException {
if (logger.isDebugEnabled()) {
// [DISTTX] TODO Remove throwable
logger.debug("DistPeerTXStateStub.putEntryOnRemote "
+ event.getKeyInfo().getKey(), new Throwable());
}
boolean returnValue = super.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue,
requireOldValue, lastModified, overwriteDestroyed);
addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
return returnValue;
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry
* (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
* java.lang.Object)
*/
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws EntryNotFoundException {
// logger.debug("DistPeerTXStateStub.destroyExistingEntry", new Throwable());
this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
super.destroyExistingEntry(event, cacheWrite, expectedOldValue);
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote(java
* .lang.Integer, com.gemstone.gemfire.internal.cache.EntryEventImpl,
* java.lang.Object)
*/
public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws DataLocationException {
// logger.debug("DistPeerTXStateStub.destroyOnRemote", new Throwable());
super.destroyOnRemote(event, cacheWrite, expectedOldValue);
this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry
* (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
*/
public void invalidateExistingEntry(EntryEventImpl event,
boolean invokeCallbacks, boolean forceNewEntry) {
// logger
// .debug("DistPeerTXStateStub.invalidateExistingEntry", new Throwable());
super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
}
/*
* (non-Javadoc)
*
* @see
* com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote
* (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
*/
public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) throws DataLocationException {
// logger.debug("DistPeerTXStateStub.invalidateOnRemote", new Throwable());
super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
}
public void postPutAll(DistributedPutAllOperation putallOp,
VersionedObjectList successfulPuts, LocalRegion region) {
super.postPutAll(putallOp, successfulPuts, region);
EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
.getBaseEvent().getValue());
event.setEventId(putallOp.getBaseEvent().getEventId());
DistTxEntryEvent dtop = new DistTxEntryEvent(event);
dtop.setPutAllOperation(putallOp);
this.primaryTransactionalOperations.add(dtop);
}
public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
VersionedObjectList successfulOps, LocalRegion region) {
super.postRemoveAll(removeAllOp, successfulOps, region);
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
region, removeAllOp.getBaseEvent().getKey());
event.setEventId(removeAllOp.getBaseEvent().getEventId());
DistTxEntryEvent dtop = new DistTxEntryEvent(event);
dtop.setRemoveAllOperation(removeAllOp);
this.primaryTransactionalOperations.add(dtop);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(super.toString());
builder.append(" ,primary txOps=").append(this.primaryTransactionalOperations);
builder.append(" ,secondary txOps=").append(this.secondaryTransactionalOperations);
return builder.toString();
}
@Override
public boolean getPreCommitResponse()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
.toLocalizedString("getPreCommitResponse"));
}
@Override
public boolean getRollbackResponse()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.Dist_TX_ROLLBACK_NOT_SUPPORTED_IN_A_TRANSACTION
.toLocalizedString("getRollbackResponse"));
}
@Override
public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
throws UnsupportedOperationInTransactionException {
this.precommitDistTxMsg = precommitMsg;
this.dm = dm;
}
@Override
public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
throws UnsupportedOperationInTransactionException {
this.commitDistTxMsg = commitMsg;
this.dm = dm;
}
@Override
public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
throws UnsupportedOperationInTransactionException {
this.rollbackDistTxMsg = rollbackMsg;
this.dm = dm;
}
@Override
public void gatherAffectedRegions(HashSet<LocalRegion> regionSet,
boolean includePrimaryRegions, boolean includeRedundantRegions)
throws UnsupportedOperationInTransactionException {
if (includePrimaryRegions) {
for (DistTxEntryEvent dtos : this.primaryTransactionalOperations) {
regionSet.add(dtos.getRegion());
}
}
if (includeRedundantRegions) {
for (DistTxEntryEvent dtos : this.secondaryTransactionalOperations) {
regionSet.add(dtos.getRegion());
}
}
}
@Override
public void gatherAffectedRegionsName(TreeSet<String> sortedRegionName,
boolean includePrimaryRegions, boolean includeRedundantRegions)
throws UnsupportedOperationInTransactionException {
if (includePrimaryRegions) {
DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
this.primaryTransactionalOperations);
}
if (includeRedundantRegions) {
DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
this.secondaryTransactionalOperations);
}
}
@Override
public boolean isDistTx() {
return true;
}
@Override
public boolean isCreatedOnDistTxCoordinator() {
return true;
}
@Override
public void finalCleanup() {
cleanup();
}
}