blob: cefc623b1ce8e80d5f256c47792ded1fa4668346 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
* File comment
*/
package com.gemstone.gemfire.internal.cache;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.ReliableReplyException;
import com.gemstone.gemfire.distributed.internal.ReliableReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.TXRegionStub;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
* TXStateStub lives on the accessor node when we are remoting
* a transaction. It is a stub for {@link TXState}.
*
* @author gregp, sbawaska
*
*/
public abstract class TXStateStub implements TXStateInterface {
protected final DistributedMember target;
protected final TXStateProxy proxy;
protected Runnable internalAfterSendRollback;
protected Runnable internalAfterSendCommit;
Map<Region<?,?>,TXRegionStub> regionStubs = new HashMap<Region<?,?>,TXRegionStub>();
/**
* @param stateProxy
* @param target
*/
protected TXStateStub(TXStateProxy stateProxy, DistributedMember target) {
this.target = target;
this.proxy = stateProxy;
this.internalAfterSendRollback = null;
this.internalAfterSendCommit = null;
}
@Override
public void precommit() throws CommitConflictException,
UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
.toLocalizedString("precommit"));
}
/**
* Implemented in subclasses for Peer vs. Client
*/
public abstract void commit() throws CommitConflictException;
protected abstract void validateRegionCanJoinTransaction(LocalRegion region) throws TransactionException;
protected abstract TXRegionStub generateRegionStub(LocalRegion region);
public abstract void rollback();
public abstract void afterCompletion(int status);
public void beforeCompletion() {
// note that this class must do distribution as it is used as the stub class in some situations
ReliableReplyProcessor21 response = JtaBeforeCompletionMessage.send(proxy.getCache(),
proxy.getTxId().getUniqId(),getOriginatingMember(), target);
try {
try {
response.waitForReliableDelivery();
} catch (ReliableReplyException e) {
throw new TransactionDataNodeHasDepartedException(e);
} catch (ReplyException e) {
e.handleAsUnexpected();
} catch (InterruptedException e) {
}
} catch (SynchronizationCommitConflictException e) {
throw e;
} catch (CommitConflictException cce) {
throw cce;
} catch(TransactionException te) {
throw te;
}
}
/**
* Get or create a TXRegionStub for the given region.
* For regions that are new to the tx, we validate their eligibility.
*
* @param region The region to involve in the tx.
* @return existing or new stub for region
*/
protected final TXRegionStub getTXRegionStub(LocalRegion region) {
TXRegionStub stub = regionStubs.get(region);
if(stub==null) {
/*
* validate whether this region is legit or not
*/
validateRegionCanJoinTransaction(region);
stub = generateRegionStub(region);
regionStubs.put(region,stub);
}
return stub;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(this.getClass())
.append("@")
.append(System.identityHashCode(this))
.append(" target node: ")
.append(target);
return builder.toString();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, java.lang.Object)
*/
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws EntryNotFoundException {
if (event.getOperation().isLocal()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.TXStateStub_LOCAL_DESTROY_NOT_ALLOWED_IN_TRANSACTION.toLocalizedString());
}
TXRegionStub rs = getTXRegionStub(event.getRegion());
rs.destroyExistingEntry(event,cacheWrite,expectedOldValue);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getBeginTime()
*/
public long getBeginTime() {
// TODO Auto-generated method stub
return 0;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getCache()
*/
public Cache getCache() {
return this.proxy.getTxMgr().getCache();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getChanges()
*/
public int getChanges() {
// TODO Auto-generated method stub
return 0;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
// We never have a local value if we are a stub...
return null;
}
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
// We never have a local value if we are a stub...
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEntry(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public Entry getEntry(KeyInfo keyInfo, LocalRegion r, boolean allowTombstones) {
return getTXRegionStub(r).getEntry(keyInfo, allowTombstones);
// Entry retVal = null;
// if (r.getPartitionAttributes() != null) {
// PartitionedRegion pr = (PartitionedRegion)r;
// try {
// retVal = pr.getEntryRemotely((InternalDistributedMember)target,
// keyInfo.getBucketId(), keyInfo.getKey(), allowTombstones);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEvent()
*/
public TXEvent getEvent() {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getEvents()
*/
public List getEvents() {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getRegions()
*/
public Collection<LocalRegion> getRegions() {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getTransactionId()
*/
public TransactionId getTransactionId() {
return this.proxy.getTxId();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
*/
public void invalidateExistingEntry(EntryEventImpl event,
boolean invokeCallbacks, boolean forceNewEntry) {
if (event.getOperation().isLocal()) {
throw new UnsupportedOperationInTransactionException(
LocalizedStrings.TXStateStub_LOCAL_INVALIDATE_NOT_ALLOWED_IN_TRANSACTION.toLocalizedString());
}
getTXRegionStub(event.getRegion()).invalidateExistingEntry(event,invokeCallbacks,forceNewEntry);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#isInProgress()
*/
public boolean isInProgress() {
return this.proxy.isInProgress();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#isInProgressAndSameAs(com.gemstone.gemfire.internal.cache.TXStateInterface)
*/
public boolean isInProgressAndSameAs(TXStateInterface state) {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#needsLargeModCount()
*/
public boolean needsLargeModCount() {
// TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#nextModSerialNum()
*/
public int nextModSerialNum() {
// TODO Auto-generated method stub
return 0;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#readRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public TXRegionState readRegion(LocalRegion r) {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#rmRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public void rmRegion(LocalRegion r) {
throw new UnsupportedOperationException();
}
public void setAfterSendRollback(Runnable afterSend) {
// TODO Auto-generated method stub
internalAfterSendRollback = afterSend;
}
public void setAfterSendCommit(Runnable afterSend) {
// TODO Auto-generated method stub
internalAfterSendCommit = afterSend;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#txPutEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean, boolean)
*/
public boolean txPutEntry(EntryEventImpl event, boolean ifNew,
boolean requireOldValue, boolean checkResources, Object expectedOldValue) {
return false;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#txReadEntry(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
public TXEntryState txReadEntry(KeyInfo entryKey, LocalRegion localRegion,
boolean rememberRead,boolean createTxEntryIfAbsent) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#txReadRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public TXRegionState txReadRegion(LocalRegion localRegion) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#txWriteRegion(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object)
*/
public TXRegionState txWriteRegion(LocalRegion localRegion, KeyInfo entryKey) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#writeRegion(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public TXRegionState writeRegion(LocalRegion r) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#containsKey(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) {
return getTXRegionStub(localRegion).containsKey(keyInfo);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#containsValueForKey(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion localRegion) {
return getTXRegionStub(localRegion).containsValueForKey( keyInfo);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#entryCount(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public int entryCount(LocalRegion localRegion) {
return getTXRegionStub(localRegion).entryCount();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent, allowReadFromHDFS);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getAdditionalKeysForIterator(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public Set getAdditionalKeysForIterator(LocalRegion currRgn) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getEntryForIterator(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, boolean)
*/
public Object getEntryForIterator(KeyInfo keyInfo, LocalRegion currRgn,
boolean rememberReads, boolean allowTombstones) {
return getTXRegionStub(currRgn).getEntryForIterator(keyInfo, allowTombstones);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getKeyForIterator(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn,
boolean rememberReads, boolean allowTombstones) {
return keyInfo.getKey();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getValueInVM(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion,
boolean rememberRead) {
// TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#isDeferredStats()
*/
public boolean isDeferredStats() {
return true;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#putEntry(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
*/
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
return getTXRegionStub(event.getRegion()).putEntry(event,ifNew,ifOld,expectedOldValue,requireOldValue,lastModified,overwriteDestroyed);
}
/*
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
throw new UnsupportedOperationException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
*/
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
boolean ifOld, Object expectedOldValue, boolean requireOldValue,
long lastModified, boolean overwriteDestroyed)
throws DataLocationException {
throw new IllegalStateException();
}
public boolean isFireCallbacks() {
return false;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote(java.lang.Integer, com.gemstone.gemfire.internal.cache.EntryEventImpl, java.lang.Object)
*/
public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws DataLocationException {
throw new IllegalStateException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote(com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
*/
public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) throws DataLocationException {
throw new IllegalStateException();
}
public void checkSupportsRegionDestroy()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_DESTROY_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
public void checkSupportsRegionInvalidate()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_INVALIDATE_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
@Override
public void checkSupportsRegionClear()
throws UnsupportedOperationInTransactionException {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.TXState_REGION_CLEAR_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getBucketKeys(com.gemstone.gemfire.internal.cache.LocalRegion, int)
*/
public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
PartitionedRegion pr = (PartitionedRegion)localRegion;
/*
* txtodo: what does this mean for c/s
*/
return pr.getBucketKeys(bucketId, allowTombstones);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getEntryOnRemote(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public Entry getEntryOnRemote(KeyInfo key, LocalRegion localRegion, boolean allowTombstones)
throws DataLocationException {
throw new IllegalStateException();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getSemaphore()
*/
public ReentrantLock getLock() {
return proxy.getLock();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getRegionKeysForIteration(com.gemstone.gemfire.internal.cache.LocalRegion)
*/
public Set getRegionKeysForIteration(LocalRegion currRegion) {
return getTXRegionStub(currRegion).getRegionKeysForIteration(currRegion);
}
public boolean isRealDealLocal()
{
return false;
}
public DistributedMember getTarget() {
return target;
}
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,LocalRegion region) {
getTXRegionStub(region).postPutAll(putallOp,successfulPuts,region);
}
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps, LocalRegion region) {
getTXRegionStub(region).postRemoveAll(op, successfulOps, region);
}
public Entry accessEntry(KeyInfo keyInfo, LocalRegion localRegion) {
return getEntry(keyInfo, localRegion, false);
}
@Override
public void updateEntryVersion(EntryEventImpl event)
throws EntryNotFoundException {
throw new UnsupportedOperationException();
}
@Override
public void close() {
// nothing needed
}
@Override
public boolean isTxState() {
return false;
}
@Override
public boolean isTxStateStub() {
return true;
}
@Override
public boolean isTxStateProxy() {
return false;
}
@Override
public boolean isDistTx() {
return false;
}
@Override
public boolean isCreatedOnDistTxCoordinator() {
return false;
}
}