blob: 7939b77e7aee2c562829d0347120637c0dca43f4 [file] [log] [blame]
package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeSet;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
* TxState on TX coordinator, created when coordinator is also a data node
*
* @author shirishd
*
*/
public final class DistTXStateOnCoordinator extends DistTXState implements
DistTXCoordinatorInterface {
/**
 * Operations recorded by addPrimaryTransactionalOperations. The list is
 * created once in the constructor and never replaced.
 */
private final ArrayList<DistTxEntryEvent> primaryTransactionalOperations;
/**
 * Operations queued by addSecondaryTransactionalOperations and handed to
 * applyOpsOnRedundantCopy in precommit(). The list is created once in the
 * constructor and never replaced.
 */
private final ArrayList<DistTxEntryEvent> secondaryTransactionalOperations;
/** Result of the most recent precommit(); false until precommit() has completed. */
private boolean preCommitResponse = false;
/** Set to true once rollback() has returned from the superclass without throwing. */
private boolean rollbackResponse = false;

/**
 * Creates the state with empty primary and secondary operation lists.
 *
 * @param proxy
 *          forwarded unchanged to the superclass constructor
 * @param onBehalfOfRemoteStub
 *          forwarded unchanged to the superclass constructor
 */
public DistTXStateOnCoordinator(TXStateProxy proxy,
    boolean onBehalfOfRemoteStub) {
  super(proxy, onBehalfOfRemoteStub);
  this.primaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
  this.secondaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
}
/**
 * Returns the list of primary operations recorded so far.
 * <p>
 * The returned object is the internal list itself, not a copy.
 *
 * @return the primary transactional operations
 */
public final ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
    throws UnsupportedOperationInTransactionException {
  return this.primaryTransactionalOperations;
}
/**
 * Records {@code dtop} as a primary operation unless
 * isUpdatingTxStateDuringPreCommit() reports true, in which case the
 * operation is only logged.
 *
 * @param dtop
 *          the operation to record
 */
private final void addPrimaryTransactionalOperations(DistTxEntryEvent dtop) {
  if (logger.isDebugEnabled()) {
    // [DISTTX] TODO Remove these
    final StringBuilder before = new StringBuilder(
        "DistTXStateOnCoordinator.addPrimaryTransactionalOperations add ");
    before.append(dtop);
    before.append(" ,stub before=");
    before.append(this);
    before.append(" ,isUpdatingTxStateDuringPreCommit=");
    before.append(isUpdatingTxStateDuringPreCommit());
    logger.debug(before.toString());
  }

  final boolean updatingDuringPreCommit = isUpdatingTxStateDuringPreCommit();
  if (!updatingDuringPreCommit) {
    this.primaryTransactionalOperations.add(dtop);
    // [DISTTX] TODO Remove this
    if (logger.isDebugEnabled()) {
      logger.debug("DistTXStateOnCoordinator.addPrimaryTransactionalOperations "
          + " add primary op = {}", dtop);
    }
  }

  if (logger.isDebugEnabled()) {
    // [DISTTX] TODO Remove these
    final StringBuilder after = new StringBuilder(
        "DistTXStateOnCoordinator.addPrimaryTransactionalOperations stub after add = ");
    after.append(this);
    logger.debug(after.toString());
  }
}
/**
 * Appends {@code dtop} to the secondary operation list. Unlike the primary
 * list, no precommit check is made before adding.
 *
 * @param dtop
 *          the operation to queue
 */
public final void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
    throws UnsupportedOperationInTransactionException {
  this.secondaryTransactionalOperations.add(dtop);
}
/**
 * Applies the queued secondary operations via applyOpsOnRedundantCopy and,
 * only if that reports success, runs the superclass precommit. The result of
 * applyOpsOnRedundantCopy is stored as the precommit response; if anything
 * throws, the stored response is left untouched.
 */
@Override
public void precommit() {
  final boolean redundantOpsApplied = applyOpsOnRedundantCopy(
      this.proxy.getCache().getDistributedSystem().getDistributedMember(),
      this.secondaryTransactionalOperations);

  if (redundantOpsApplied) {
    super.precommit();
  }

  // Reached only when no exception was thrown above.
  this.preCommitResponse = redundantOpsApplied;
}
/**
 * Rolls back through the superclass and then marks the rollback response as
 * successful. If super.rollback() throws, the assignment is skipped and
 * rollbackResponse keeps its previous value.
 */
@Override
public void rollback() {
super.rollback();
this.rollbackResponse = true; // True if no exception
// Cleanup is called next
}
/**
 * Performs the put through the superclass and then records the event as a
 * primary transactional operation.
 * <p>
 * Events whose operation is part of a putAll are not recorded here; the
 * putAll is recorded once as a whole in postPutAll.
 *
 * @return whatever the superclass putEntry returned
 */
@Override
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
    Object expectedOldValue, boolean requireOldValue, long lastModified,
    boolean overwriteDestroyed) {
  if (logger.isDebugEnabled()) {
    // [DISTTX] TODO Remove throwable
    final Object key = event.getKeyInfo().getKey();
    logger.debug("DistTXStateOnCoordinator.putEntry " + key, new Throwable());
  }

  final boolean putResult = super.putEntry(event, ifNew, ifOld,
      expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);

  // Individual events of a putAll are skipped: postPutAll already recorded
  // the putAll itself.
  final boolean partOfPutAll = event.getOperation().isPutAll();
  if (!partOfPutAll) {
    final DistTxEntryEvent recorded = new DistTxEntryEvent(event);
    addPrimaryTransactionalOperations(recorded);
  }
  return putResult;
}
/**
 * Performs the remote-originated put through the superclass and then records
 * the event as a primary transactional operation.
 * <p>
 * Events whose operation is part of a putAll are not recorded here; the
 * putAll is recorded once as a whole in postPutAll.
 *
 * @return whatever the superclass putEntryOnRemote returned
 * @throws DataLocationException
 *           propagated from the superclass
 */
@Override
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
    boolean ifOld, Object expectedOldValue, boolean requireOldValue,
    long lastModified, boolean overwriteDestroyed)
    throws DataLocationException {
  if (logger.isDebugEnabled()) {
    // [DISTTX] TODO Remove throwable
    final Object key = event.getKeyInfo().getKey();
    logger.debug("DistTXStateOnCoordinator.putEntryOnRemote " + key,
        new Throwable());
  }

  final boolean putResult = super.putEntryOnRemote(event, ifNew, ifOld,
      expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);

  // Individual events of a putAll are skipped: postPutAll already recorded
  // the putAll itself.
  final boolean partOfPutAll = event.getOperation().isPutAll();
  if (!partOfPutAll) {
    final DistTxEntryEvent recorded = new DistTxEntryEvent(event);
    addPrimaryTransactionalOperations(recorded);
  }
  return putResult;
}
/**
 * Destroys the entry through the superclass and then records the event as a
 * primary transactional operation.
 * <p>
 * Events whose operation is part of a removeAll are not recorded here; the
 * removeAll is recorded once as a whole in postRemoveAll.
 *
 * @throws EntryNotFoundException
 *           propagated from the superclass; nothing is recorded in that case
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry
 *      (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
 *      java.lang.Object)
 */
@Override
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
    Object expectedOldValue) throws EntryNotFoundException {
  super.destroyExistingEntry(event, cacheWrite, expectedOldValue);
  // removeAll event is already added in postRemoveAll, don't add individual
  // events from the removeAll operation again
  if (!event.getOperation().isRemoveAll()) {
    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
  }
}
/**
 * Performs the remote-originated destroy through the superclass and then
 * records the event as a primary transactional operation.
 * <p>
 * Events whose operation is part of a removeAll are not recorded here; the
 * removeAll is recorded once as a whole in postRemoveAll.
 *
 * @throws DataLocationException
 *           propagated from the superclass; nothing is recorded in that case
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote
 *      (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
 *      java.lang.Object)
 */
@Override
public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
    Object expectedOldValue) throws DataLocationException {
  super.destroyOnRemote(event, cacheWrite, expectedOldValue);
  // removeAll event is already added in postRemoveAll, don't add individual
  // events from the removeAll operation again
  if (!event.getOperation().isRemoveAll()) {
    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
  }
}
/**
 * Invalidates the entry through the superclass and then records the event as
 * a primary transactional operation.
 *
 * @see com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry
 *      (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
 */
@Override
public void invalidateExistingEntry(EntryEventImpl event,
    boolean invokeCallbacks, boolean forceNewEntry) {
  super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
  addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
}
/**
 * Invalidates the entry for a remote-originated request and then records the
 * event as a primary transactional operation.
 *
 * @throws DataLocationException
 *           declared by the overridden method
 * @see com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote
 *      (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
 */
@Override
public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
    boolean forceNewEntry) throws DataLocationException {
  // NOTE(review): this delegates to super.invalidateExistingEntry, not
  // super.invalidateOnRemote. Presumably intentional so the operation is
  // recorded only once below - confirm against the superclass.
  super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
  addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
}
/**
 * Runs the superclass postPutAll and then records the whole putAll as a
 * single primary transactional operation.
 * <p>
 * The recorded operation wraps a PUTALL_CREATE event built from the key,
 * value and event id of the putAll's base event, and carries a reference to
 * {@code putallOp} itself.
 *
 * @param putallOp
 *          the putAll being completed
 * @param successfulPuts
 *          forwarded to the superclass only
 * @param region
 *          the region the putAll targets
 */
@Override
public void postPutAll(DistributedPutAllOperation putallOp,
    VersionedObjectList successfulPuts, LocalRegion region) {
  super.postPutAll(putallOp, successfulPuts, region);
  EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
      Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
          .getBaseEvent().getValue());
  event.setEventId(putallOp.getBaseEvent().getEventId());
  DistTxEntryEvent dtop = new DistTxEntryEvent(event);
  dtop.setPutAllOperation(putallOp);
  addPrimaryTransactionalOperations(dtop);
}
/**
 * Runs the superclass postRemoveAll and then records the whole removeAll as
 * a single primary transactional operation.
 * <p>
 * The recorded operation wraps a removeAll event built from the key and
 * event id of the removeAll's base event, and carries a reference to
 * {@code removeAllOp} itself.
 *
 * @param removeAllOp
 *          the removeAll being completed
 * @param successfulOps
 *          forwarded to the superclass only
 * @param region
 *          the region the removeAll targets
 */
@Override
public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
    VersionedObjectList successfulOps, LocalRegion region) {
  super.postRemoveAll(removeAllOp, successfulOps, region);
  EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
      region, removeAllOp.getBaseEvent().getKey());
  event.setEventId(removeAllOp.getBaseEvent().getEventId());
  DistTxEntryEvent dtop = new DistTxEntryEvent(event);
  dtop.setRemoveAllOperation(removeAllOp);
  addPrimaryTransactionalOperations(dtop);
}
/**
 * Returns the value stored by the last completed precommit(); false if
 * precommit() has not completed. This implementation only reads a field and
 * does not itself throw.
 */
@Override
public boolean getPreCommitResponse()
throws UnsupportedOperationInTransactionException {
return this.preCommitResponse;
}
/**
 * Returns true once rollback() has completed without throwing; false
 * otherwise. This implementation only reads a field and does not itself
 * throw.
 */
@Override
public boolean getRollbackResponse()
throws UnsupportedOperationInTransactionException {
return this.rollbackResponse;
}
/**
 * Not supported on this state: always throws.
 *
 * @throws UnsupportedOperationInTransactionException
 *           always
 */
@Override
public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
    throws UnsupportedOperationInTransactionException {
  final String reason = LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
      .toLocalizedString("setPrecommitMessage");
  throw new UnsupportedOperationInTransactionException(reason);
}
/**
 * Not supported on this state: always throws.
 *
 * @throws UnsupportedOperationInTransactionException
 *           always
 */
@Override
public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
    throws UnsupportedOperationInTransactionException {
  // NOTE(review): the PRECOMMIT message id is used for a commit-message
  // setter; looks like a copy/paste - confirm whether a commit-specific id
  // exists in LocalizedStrings.
  final String reason = LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
      .toLocalizedString("setCommitMessage");
  throw new UnsupportedOperationInTransactionException(reason);
}
/**
 * Not supported on this state: always throws.
 *
 * @throws UnsupportedOperationInTransactionException
 *           always
 */
@Override
public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
    throws UnsupportedOperationInTransactionException {
  final String reason = LocalizedStrings.Dist_TX_ROLLBACK_NOT_SUPPORTED_IN_A_TRANSACTION
      .toLocalizedString("setRollbackMessage");
  throw new UnsupportedOperationInTransactionException(reason);
}
/**
 * Adds to {@code regionSet} the region of every recorded operation in the
 * selected lists. Primary operations are visited before secondary ones.
 *
 * @param regionSet
 *          receives the regions; existing content is kept
 * @param includePrimaryRegions
 *          whether to visit the primary operation list
 * @param includeRedundantRegions
 *          whether to visit the secondary operation list
 */
@Override
public void gatherAffectedRegions(HashSet<LocalRegion> regionSet,
    boolean includePrimaryRegions, boolean includeRedundantRegions)
    throws UnsupportedOperationInTransactionException {
  // Collect the lists to walk first, preserving primary-then-secondary order.
  final ArrayList<ArrayList<DistTxEntryEvent>> selected =
      new ArrayList<ArrayList<DistTxEntryEvent>>(2);
  if (includePrimaryRegions) {
    selected.add(this.primaryTransactionalOperations);
  }
  if (includeRedundantRegions) {
    selected.add(this.secondaryTransactionalOperations);
  }

  for (ArrayList<DistTxEntryEvent> ops : selected) {
    for (DistTxEntryEvent op : ops) {
      regionSet.add(op.getRegion());
    }
  }
}
/**
 * Adds to {@code sortedRegionName} the region (or bucket) path of every
 * recorded operation in the selected lists, using the static
 * gatherAffectedRegions helper.
 *
 * @param sortedRegionName
 *          receives the paths; existing content is kept
 * @param includePrimaryRegions
 *          whether to include the primary operation list
 * @param includeRedundantRegions
 *          whether to include the secondary operation list
 */
@Override
public void gatherAffectedRegionsName(TreeSet<String> sortedRegionName,
    boolean includePrimaryRegions, boolean includeRedundantRegions)
    throws UnsupportedOperationInTransactionException {
  if (includePrimaryRegions) {
    DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
        this.primaryTransactionalOperations);
  }
  if (includeRedundantRegions) {
    DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
        this.secondaryTransactionalOperations);
  }
}
/**
 * Adds one path per operation in {@code regionOps} to
 * {@code sortedRegionName}: the bucket full path (from the operation's
 * bucket id) when the operation's region is a PartitionedRegion, otherwise
 * the region's own full path.
 *
 * @param sortedRegionName
 *          receives the paths; existing content is kept
 * @param regionOps
 *          the operations to inspect
 */
public static void gatherAffectedRegions(TreeSet<String> sortedRegionName,
    ArrayList<DistTxEntryEvent> regionOps) {
  for (DistTxEntryEvent op : regionOps) {
    final LocalRegion region = op.getRegion();
    final String path = (region instanceof PartitionedRegion)
        ? PartitionedRegionHelper.getBucketFullPath(region.getFullPath(),
            op.getKeyInfo().getBucketId())
        : region.getFullPath();
    sortedRegionName.add(path);
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Dispatches on the operation type of {@code dtop}: update/create goes to
 * postPutAll (for a putAll) or putEntry, destroy goes to postRemoveAll (for
 * a removeAll) or destroyExistingEntry, and invalidate goes to
 * invalidateExistingEntry. Only the putEntry path can produce a result other
 * than true.
 */
@Override
protected boolean applyIndividualOp(DistTxEntryEvent dtop)
    throws DataLocationException {
  if (dtop.op.isUpdate() || dtop.op.isCreate()) {
    if (!dtop.op.isPutAll()) {
      return putEntry(dtop,
          false /* ifNew */,
          dtop.hasDelta() /* ifOld */,
          null /* expectedOldValue */,
          false /* requireOldValue */,
          0L /* lastModified */,
          true /* overwriteDestroyed *not* used */);
    }
    assert (dtop.getPutAllOperation() != null);
    // [DISTTX] TODO what do with versions next?
    final VersionedObjectList putAllVersions = new VersionedObjectList(
        dtop.getPutAllOperation().putAllDataSize, true,
        dtop.region.concurrencyChecksEnabled);
    postPutAll(dtop.getPutAllOperation(), putAllVersions, dtop.region);
    return true;
  }

  if (dtop.op.isDestroy()) {
    if (!dtop.op.isRemoveAll()) {
      destroyExistingEntry(dtop, false /* TODO [DISTTX] */,
          null /* TODO [DISTTX] */);
      return true;
    }
    assert (dtop.getRemoveAllOperation() != null);
    // [DISTTX] TODO what do with versions next?
    final VersionedObjectList removeAllVersions = new VersionedObjectList(
        dtop.getRemoveAllOperation().removeAllDataSize, true,
        dtop.region.concurrencyChecksEnabled);
    postRemoveAll(dtop.getRemoveAllOperation(), removeAllVersions,
        dtop.region);
    return true;
  }

  if (dtop.op.isInvalidate()) {
    invalidateExistingEntry(dtop, true /* TODO [DISTTX] */,
        false /* TODO [DISTTX] */);
    return true;
  }

  // Unsupported operation type. With assertions disabled the assert below is
  // a no-op, so this path still reports true.
  logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}",
      dtop);
  assert (false);
  return true;
}
/**
 * Returns the superclass description followed by both operation lists and
 * the precommit and rollback responses.
 */
@Override
public String toString() {
  return super.toString()
      + " ,primary txOps=" + this.primaryTransactionalOperations
      + " ,secondary txOps=" + this.secondaryTransactionalOperations
      + " ,preCommitResponse=" + this.preCommitResponse
      + " ,rollbackResponse=" + this.rollbackResponse;
}
/**
 * Always returns true for this class.
 */
@Override
public boolean isCreatedOnDistTxCoordinator() {
return true;
}
/**
 * Delegates directly to cleanup().
 */
@Override
public void finalCleanup() {
cleanup();
}
}