blob: 859cf43c9c9eb660a6fdeccae33e1f98b4080170 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.tx;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
import com.gemstone.gemfire.internal.cache.DataLocationException;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.KeyInfo;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper;
import com.gemstone.gemfire.internal.cache.PartitionedRegionStats;
import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
import com.gemstone.gemfire.internal.cache.TXStateStub;
import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
/**
 * Bucket ids recorded by trackBucketForTx for this stub. The set of ids is
 * consulted by getTransactionException to distinguish between
 * TransactionDataNotColocated and TransactionDataRebalanced exceptions.
 * A Map is used rather than a Set, as a HashSet is backed by a HashMap
 * (avoids one "new" call); every value stored is Boolean.TRUE.
 * The reference is never reassigned, hence final.
 */
private final Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
/**
 * Creates the stub by handing both arguments to the superclass constructor.
 *
 * @param txstate the transaction state stub this region stub belongs to
 * @param r the region this stub operates on
 */
public PartitionedTXRegionStub(TXStateStub txstate, LocalRegion r) {
  super(txstate, r);
}
/**
 * Destroys the entry described by {@code event} on the transaction's target
 * member and, on success, records the event's bucket for this transaction.
 *
 * @param event the destroy event; its local region must be a PartitionedRegion
 * @param cacheWrite not used by this implementation
 * @param expectedOldValue forwarded unchanged to the remote destroy
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or
 *         ForceReattemptException raised by the remote destroy
 */
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
    Object expectedOldValue) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) event.getLocalRegion();
  try {
    partitionedRegion.destroyRemotely(state.getTarget(), event.getKeyInfo().getBucketId(),
        event, expectedOldValue);
  } catch (TransactionException txFailure) {
    // the translated exception is chained to the cause of the caught one,
    // not to the caught exception itself
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    final RuntimeException failure = isBucketNotFoundException(reattempt)
        ? new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString())
        : new TransactionDataNodeHasDepartedException(LocalizedStrings.PartitionedRegion_TRANSACTION_DATA_NODE_0_HAS_DEPARTED_TO_PROCEED_ROLLBACK_THIS_TRANSACTION_AND_BEGIN_A_NEW_ONE.toLocalizedString(state.getTarget()));
    failure.initCause(reattempt);
    waitToRetry();
    throw failure;
  }
  // only reached when the remote destroy completed without an exception
  trackBucketForTx(event.getKeyInfo());
}
/**
 * Chooses the transaction exception to report for a failed remote operation.
 * The checks are made in this order:
 * <ol>
 * <li>the region's cancel criterion is consulted with {@code cause};</li>
 * <li>a CacheClosedException anywhere in the cause chain yields a
 * TransactionDataNodeHasDepartedException carrying its message;</li>
 * <li>if buckets have been tracked and the key's bucket is not among them,
 * a TransactionDataNotColocatedException is returned;</li>
 * <li>a PrimaryBucketException anywhere in the cause chain yields a
 * TransactionDataRebalancedException;</li>
 * <li>otherwise a TransactionDataNodeHasDepartedException carrying the
 * localized message of {@code cause} is returned.</li>
 * </ol>
 * The returned exception has no cause set; callers attach one themselves.
 *
 * @param keyInfo key of the failed operation, or null to skip the colocation check
 * @param cause the exception that was caught
 * @return the exception the caller should throw
 */
private RuntimeException getTransactionException(KeyInfo keyInfo,
    Throwable cause) {
  region.getCancelCriterion().checkCancelInProgress(cause); // fixes bug 44567

  for (Throwable link = cause; link != null; link = link.getCause()) {
    if (link instanceof CacheClosedException) {
      return new TransactionDataNodeHasDepartedException(link.getMessage());
    }
  }

  // for parent region if previous ops were successful and for child colocated regions
  // where the bucketId was not previously encountered
  final boolean bucketNotSeenBefore = keyInfo != null
      && !buckets.isEmpty()
      && !buckets.containsKey(keyInfo.getBucketId());
  if (bucketNotSeenBefore) {
    return new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION
        .toLocalizedString(keyInfo.getKey()));
  }

  for (Throwable link = cause; link != null; link = link.getCause()) {
    if (link instanceof PrimaryBucketException) {
      return new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
          .toLocalizedString());
    }
  }

  return new TransactionDataNodeHasDepartedException(cause.getLocalizedMessage());
}
/**
 * Waits for bucket recovery; called by this class's handlers after a
 * ForceReattemptException has been caught.
 */
private void waitToRetry() {
  // this is what PR operations do. The 2000ms is not used
  final RetryTimeKeeper keeper = new RetryTimeKeeper(2000);
  keeper.waitForBucketsRecovery();
}
/**
 * Fetches the entry for the given key from the transaction's target member
 * and records the key's bucket for this transaction when the fetch succeeds.
 *
 * @param keyInfo supplies the key and bucket id to look up
 * @param allowTombstones forwarded unchanged to the remote lookup
 * @return the remote entry, or null when the remote lookup raises
 *         EntryNotFoundException
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or
 *         ForceReattemptException raised by the remote lookup
 */
public Entry getEntry(KeyInfo keyInfo, boolean allowTombstones) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) region;
  try {
    final Entry found = partitionedRegion.getEntryRemotely(
        (InternalDistributedMember) state.getTarget(),
        keyInfo.getBucketId(),
        keyInfo.getKey(),
        false,
        allowTombstones);
    trackBucketForTx(keyInfo);
    return found;
  } catch (EntryNotFoundException notFound) {
    // a missing entry is reported as null; the bucket is not tracked in this case
    return null;
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(keyInfo, txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(keyInfo, notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    final RuntimeException failure = isBucketNotFoundException(reattempt)
        ? new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString())
        : new TransactionDataNodeHasDepartedException(LocalizedStrings.PartitionedRegion_TRANSACTION_DATA_NODE_0_HAS_DEPARTED_TO_PROCEED_ROLLBACK_THIS_TRANSACTION_AND_BEGIN_A_NEW_ONE.toLocalizedString(state.getTarget()));
    failure.initCause(reattempt);
    waitToRetry();
    throw failure;
  }
}
/**
 * Records the bucket id of {@code keyInfo} in {@code buckets}. The id is
 * logged at fine level when that level is enabled; negative ids are logged
 * but not recorded.
 *
 * @param keyInfo supplies the bucket id to record
 */
private void trackBucketForTx(KeyInfo keyInfo) {
  if (region.getCache().getLoggerI18n().fineEnabled()) {
    region.getCache().getLoggerI18n().fine(
        "adding bucket:" + keyInfo.getBucketId() + " for tx:" + state.getTransactionId());
  }
  final int bucketId = keyInfo.getBucketId();
  if (bucketId < 0) {
    return;
  }
  buckets.put(bucketId, Boolean.TRUE);
}
/**
 * Invalidates the entry described by {@code event} on the transaction's
 * target member and, on success, records the event's bucket for this
 * transaction.
 *
 * @param event the invalidate event; its local region must be a PartitionedRegion
 * @param invokeCallbacks not used by this implementation
 * @param forceNewEntry not used by this implementation
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or
 *         ForceReattemptException raised by the remote invalidate
 */
public void invalidateExistingEntry(EntryEventImpl event,
    boolean invokeCallbacks, boolean forceNewEntry) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) event.getLocalRegion();
  try {
    partitionedRegion.invalidateRemotely(state.getTarget(), event.getKeyInfo().getBucketId(),
        event);
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    final RuntimeException failure = isBucketNotFoundException(reattempt)
        ? new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString())
        : new TransactionDataNodeHasDepartedException(LocalizedStrings.PartitionedRegion_TRANSACTION_DATA_NODE_0_HAS_DEPARTED_TO_PROCEED_ROLLBACK_THIS_TRANSACTION_AND_BEGIN_A_NEW_ONE.toLocalizedString(state.getTarget()));
    failure.initCause(reattempt);
    waitToRetry();
    throw failure;
  }
  // only reached when the remote invalidate completed without an exception
  trackBucketForTx(event.getKeyInfo());
}
/**
 * Asks the transaction's target member whether it holds the given key and
 * records the key's bucket for this transaction when the query succeeds.
 *
 * @param keyInfo supplies the key and bucket id to query
 * @return the remote answer, or false when the query fails with a
 *         ForceReattemptException that isBucketNotFoundException accepts
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or any other
 *         ForceReattemptException raised by the remote query
 */
public boolean containsKey(KeyInfo keyInfo) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) region;
  try {
    final boolean present = partitionedRegion.containsKeyRemotely(
        (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
    trackBucketForTx(keyInfo);
    return present;
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(keyInfo, txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(keyInfo, notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    if (isBucketNotFoundException(reattempt)) {
      return false;
    }
    waitToRetry();
    throw (RuntimeException) new TransactionDataNodeHasDepartedException(LocalizedStrings.PartitionedRegion_TRANSACTION_DATA_NODE_0_HAS_DEPARTED_TO_PROCEED_ROLLBACK_THIS_TRANSACTION_AND_BEGIN_A_NEW_ONE.toLocalizedString(state.getTarget()))
        .initCause(reattempt);
  }
}
/**
 * Follows the cause chain of {@code e} for as long as each cause is itself a
 * ForceReattemptException, and inspects the last exception reached.
 *
 * @param e the ForceReattemptException to inspect
 * @return true if the last ForceReattemptException in that chain is a
 *         BucketNotFoundException
 */
private boolean isBucketNotFoundException(ForceReattemptException e) {
  ForceReattemptException innermost = e;
  // instanceof is false for null, so the walk also stops at the end of the chain
  for (Throwable next = innermost.getCause();
      next instanceof ForceReattemptException;
      next = innermost.getCause()) {
    innermost = (ForceReattemptException) next;
  }
  return innermost instanceof BucketNotFoundException;
}
/**
 * Asks the transaction's target member whether it holds a value for the given
 * key and records the key's bucket for this transaction when the query
 * succeeds.
 *
 * @param keyInfo supplies the key and bucket id to query
 * @return the remote answer, or false when the query fails with a
 *         ForceReattemptException that isBucketNotFoundException accepts
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or any other
 *         ForceReattemptException raised by the remote query
 */
public boolean containsValueForKey(KeyInfo keyInfo) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) region;
  try {
    final boolean hasValue = partitionedRegion.containsValueForKeyRemotely(
        (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
    trackBucketForTx(keyInfo);
    return hasValue;
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(keyInfo, txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(keyInfo, notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    if (isBucketNotFoundException(reattempt)) {
      return false;
    }
    waitToRetry();
    throw (RuntimeException) new TransactionDataNodeHasDepartedException(LocalizedStrings.PartitionedRegion_TRANSACTION_DATA_NODE_0_HAS_DEPARTED_TO_PROCEED_ROLLBACK_THIS_TRANSACTION_AND_BEGIN_A_NEW_ONE.toLocalizedString(state.getTarget()))
        .initCause(reattempt);
  }
}
/**
 * Reads the value for the given key from the transaction's target member and
 * records the key's bucket for this transaction when the read succeeds.
 *
 * @param keyInfo supplies the key, callback argument and bucket id
 * @param isCreate not used by this implementation
 * @param generateCallbacks not used by this implementation
 * @param value not used by this implementation
 * @param peferCD forwarded unchanged to the remote get
 * @param requestingClient forwarded unchanged to the remote get
 * @param clientEvent forwarded unchanged to the remote get
 * @param allowReadFromHDFS forwarded unchanged to the remote get
 * @return the value returned by the remote get, or null when the get fails
 *         with a ForceReattemptException that isBucketNotFoundException accepts
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or any other
 *         ForceReattemptException raised by the remote get
 */
public Object findObject(KeyInfo keyInfo, boolean isCreate,
    boolean generateCallbacks, Object value, boolean peferCD,
    ClientProxyMembershipID requestingClient,
    EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
  final Object key = keyInfo.getKey();
  final Object callbackArgument = keyInfo.getCallbackArg();
  final PartitionedRegion partitionedRegion = (PartitionedRegion) region;
  final Object fetched;
  try {
    fetched = partitionedRegion.getRemotely(
        (InternalDistributedMember) state.getTarget(),
        keyInfo.getBucketId(),
        key,
        callbackArgument,
        peferCD,
        requestingClient,
        clientEvent,
        false,
        allowReadFromHDFS);
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(keyInfo, txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(keyInfo, notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    if (isBucketNotFoundException(reattempt)) {
      return null;
    }
    waitToRetry();
    throw (RuntimeException) getTransactionException(keyInfo, reattempt)
        .initCause(reattempt);
  }
  // only reached when the remote get completed without an exception
  trackBucketForTx(keyInfo);
  return fetched;
}
/**
 * Returns the entry to hand to an iterator for the given key. When the
 * primary of the key's bucket equals the transaction's target, the entry is
 * read through {@link #getEntry(KeyInfo, boolean)}; otherwise it is read from
 * the region's shared data view.
 *
 * @param keyInfo supplies the key and bucket id
 * @param allowTombstones forwarded unchanged to whichever lookup is used
 * @return the entry obtained from the chosen lookup
 */
public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstones) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) region;
  final InternalDistributedMember bucketPrimary =
      partitionedRegion.getBucketPrimary(keyInfo.getBucketId());
  if (!bucketPrimary.equals(state.getTarget())) {
    return partitionedRegion.getSharedDataView().getEntry(keyInfo, partitionedRegion,
        allowTombstones);
  }
  return getEntry(keyInfo, allowTombstones);
}
/**
 * Applies the put described by {@code event} on the transaction's target
 * member and, on success, records the event's bucket for this transaction.
 *
 * @param event the put event; its local region must be a PartitionedRegion
 * @param ifNew forwarded unchanged to the remote put
 * @param ifOld forwarded unchanged to the remote put
 * @param expectedOldValue forwarded unchanged to the remote put
 * @param requireOldValue forwarded unchanged to the remote put
 * @param lastModified not used by this implementation
 * @param overwriteDestroyed not used by this implementation
 * @return the result of the remote put
 * @throws RuntimeException a transaction exception translated from the
 *         TransactionException, PrimaryBucketException or
 *         ForceReattemptException raised by the remote put
 */
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
    Object expectedOldValue, boolean requireOldValue, long lastModified,
    boolean overwriteDestroyed) {
  final PartitionedRegion partitionedRegion = (PartitionedRegion) event.getLocalRegion();
  final boolean applied;
  try {
    applied = partitionedRegion.putRemotely(state.getTarget(), event, ifNew, ifOld,
        expectedOldValue, requireOldValue);
  } catch (TransactionException txFailure) {
    // chained to the cause of the caught exception, not to the exception itself
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), txFailure)
        .initCause(txFailure.getCause());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), notPrimary)
        .initCause(notPrimary);
  } catch (ForceReattemptException reattempt) {
    waitToRetry();
    throw (RuntimeException) getTransactionException(event.getKeyInfo(), reattempt)
        .initCause(reattempt);
  }
  // only reached when the remote put completed without an exception
  trackBucketForTx(event.getKeyInfo());
  return applied;
}
/**
 * Sends a RemoteSizeMessage for this region to the transaction's target
 * member and waits for the reply.
 *
 * @return the size carried by the reply
 * @throws RuntimeException the transaction exception that
 *         getTransactionException selects for any failure; the original
 *         failure is attached as its cause
 */
public int entryCount() {
  try {
    RemoteSizeMessage.SizeResponse response = RemoteSizeMessage.send(Collections.singleton(state.getTarget()), region);
    return response.waitForSize();
  } catch (Exception e) {
    RuntimeException re = getTransactionException(null, e);
    // keep the original failure in the chain, as the other handlers in this class do
    re.initCause(e);
    throw re;
  }
}
/**
 * Create PutAllPRMsgs for each bucket, and send them.
 * Failures for individual buckets are collected rather than aborting the
 * loop; after every message has been attempted, a collected failure is
 * logged and rethrown.
 *
 * @param putallO
 *          DistributedPutAllOperation object.
 * @param successfulPuts
 *          cleared on entry and rebuilt from the versions returned for each
 *          bucket message that was sent without an exception.
 * @param r
 *          the region being operated on; must be a PartitionedRegion.
 * @throws CacheClosedException
 *           if the cache reports that it is at shutdown-all.
 * @throws RuntimeException
 *           if any bucket failed: for a bridge operation the recorded
 *           CancelException or else a PutAllPartialResultException; otherwise
 *           the recorded failure itself, wrapped in a RuntimeException when
 *           it is not one already.
 */
public void postPutAll(DistributedPutAllOperation putallO, VersionedObjectList successfulPuts, LocalRegion r) throws TransactionException {
  if (r.getCache().isCacheAtShutdownAll()) {
    throw new CacheClosedException("Cache is shutting down");
  }

  final PartitionedRegion partitionedRegion = (PartitionedRegion) r;
  final long startTime = PartitionedRegionStats.startTime();

  // build all the msgs by bucketid
  final HashMap messagesByBucket = putallO.createPRMessages();
  final PutAllPartialResult partialKeys = new PutAllPartialResult(putallO.putAllDataSize);
  successfulPuts.clear(); // this is rebuilt by this method

  for (Object rawEntry : messagesByBucket.entrySet()) {
    final Map.Entry mapEntry = (Map.Entry) rawEntry;
    final Integer bucketId = (Integer) mapEntry.getKey();
    final PutAllPRMessage bucketMessage = (PutAllPRMessage) mapEntry.getValue();
    partitionedRegion.checkReadiness();
    try {
      final VersionedObjectList versions = sendMsgByBucket(bucketId, bucketMessage, partitionedRegion);
      partialKeys.addKeysAndVersions(versions);
      successfulPuts.addAll(versions);
    } catch (PutAllPartialResultException partial) {
      // sendMsgByBucket applied partial keys
      partialKeys.consolidate(partial.getResult());
    } catch (Exception other) {
      // any other failure is recorded against the first key of this bucket's message
      final EntryEventImpl firstEvent = bucketMessage.getFirstEvent(partitionedRegion);
      try {
        partialKeys.saveFailedKey(firstEvent.getKey(), other);
      } finally {
        firstEvent.release();
      }
    }
  }
  partitionedRegion.prStats.endPutAll(startTime);

  if (!partialKeys.hasFailure()) {
    return;
  }

  partitionedRegion.getCache().getLoggerI18n().info(
      LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1,
      new Object[] {partitionedRegion.getFullPath(), partialKeys});

  if (putallO.isBridgeOperation()) {
    if (partialKeys.getFailure() instanceof CancelException) {
      throw (CancelException) partialKeys.getFailure();
    }
    throw new PutAllPartialResultException(partialKeys);
  }
  if (partialKeys.getFailure() instanceof RuntimeException) {
    throw (RuntimeException) partialKeys.getFailure();
  }
  throw new RuntimeException(partialKeys.getFailure());
}
/**
 * Creates a RemoveAllPRMessage for each bucket and sends them.
 * Failures for individual buckets are collected rather than aborting the
 * loop; after every message has been attempted, a collected failure is
 * logged and rethrown.
 *
 * @param op
 *          the DistributedRemoveAllOperation supplying the per-bucket messages.
 * @param successfulOps
 *          cleared on entry and rebuilt from the versions returned for each
 *          bucket message that was sent without an exception.
 * @param r
 *          the region being operated on; must be a PartitionedRegion.
 * @throws CacheClosedException
 *           if the cache reports that it is at shutdown-all.
 * @throws RuntimeException
 *           if any bucket failed: for a bridge operation the recorded
 *           CancelException or else a PutAllPartialResultException; otherwise
 *           the recorded failure itself, wrapped in a RuntimeException when
 *           it is not one already.
 */
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps, LocalRegion r) {
  if (r.getCache().isCacheAtShutdownAll()) {
    throw new CacheClosedException("Cache is shutting down");
  }
  PartitionedRegion pr = (PartitionedRegion)r;
  final long startTime = PartitionedRegionStats.startTime();
  // build all the msgs by bucketid
  HashMap<Integer, RemoveAllPRMessage> prMsgMap = op.createPRMessages();
  PutAllPartialResult partialKeys = new PutAllPartialResult(op.removeAllDataSize);
  successfulOps.clear(); // this is rebuilt by this method
  Iterator<Map.Entry<Integer, RemoveAllPRMessage>> itor = prMsgMap.entrySet().iterator();
  while (itor.hasNext()) {
    Map.Entry<Integer, RemoveAllPRMessage> mapEntry = itor.next();
    Integer bucketId = mapEntry.getKey();
    RemoveAllPRMessage prMsg = mapEntry.getValue();
    pr.checkReadiness();
    try {
      VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg,pr);
      partialKeys.addKeysAndVersions(versions);
      successfulOps.addAll(versions);
    } catch (PutAllPartialResultException pre) {
      // sendMsgByBucket applied partial keys
      partialKeys.consolidate(pre.getResult());
    } catch (Exception ex) {
      // If failed at other exception
      EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
      try {
        partialKeys.saveFailedKey(firstEvent.getKey(), ex);
      } finally {
        // release the event once its key has been read, exactly as postPutAll does
        firstEvent.release();
      }
    }
  }
  pr.prStats.endRemoveAll(startTime);
  if (partialKeys.hasFailure()) {
    pr.getCache().getLoggerI18n().info(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_0_1,
        new Object[] {pr.getFullPath(), partialKeys});
    if (op.isBridgeOperation()) {
      if (partialKeys.getFailure() instanceof CancelException) {
        throw (CancelException)partialKeys.getFailure();
      } else {
        throw new PutAllPartialResultException(partialKeys);
      }
    } else {
      if (partialKeys.getFailure() instanceof RuntimeException) {
        throw (RuntimeException)partialKeys.getFailure();
      } else {
        throw new RuntimeException(partialKeys.getFailure());
      }
    }
  }
}
/**
 * Sends one bucket's PutAllPRMessage to the member that
 * getOrCreateNodeForBucketWrite returns for the bucket, provided that member
 * is the transaction's target.
 *
 * @param bucketId id of the bucket the message is for
 * @param prMsg the message to send
 * @param pr the partitioned region being operated on
 * @return the versions returned by the send
 * @throws TransactionDataNotColocatedException if the bucket's member is not
 *         the transaction's target, or the send raises ForceReattemptException
 * @throws TransactionDataRebalancedException if the send raises
 *         PrimaryBucketException
 * @throws TransactionException if the send raises any other
 *         DataLocationException
 */
private VersionedObjectList sendMsgByBucket(final Integer bucketId, PutAllPRMessage prMsg,PartitionedRegion pr)
{
  final InternalDistributedMember currentTarget =
      pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);

  if (!currentTarget.equals(this.state.getTarget())) {
    // the first event is fetched only for its key and is released before throwing
    final EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
    final String notColocatedMessage;
    try {
      notColocatedMessage = LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(firstEvent.getKey());
    } finally {
      firstEvent.release();
    }
    throw new TransactionDataNotColocatedException(notColocatedMessage);
  }

  try {
    return pr.tryToSendOnePutAllMessage(prMsg, currentTarget);
  } catch (ForceReattemptException reattempt) {
    pr.checkReadiness();
    throw new TransactionDataNotColocatedException(reattempt.getMessage());
  } catch (PrimaryBucketException notPrimary) {
    throw (RuntimeException) new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString())
        .initCause(notPrimary);
  } catch (DataLocationException locationFailure) {
    throw new TransactionException(locationFailure);
  }
}
/**
 * Sends one bucket's RemoveAllPRMessage to the member that
 * getOrCreateNodeForBucketWrite returns for the bucket, provided that member
 * is the transaction's target.
 *
 * @param bucketId id of the bucket the message is for
 * @param prMsg the message to send
 * @param pr the partitioned region being operated on
 * @return the versions returned by the send
 * @throws TransactionDataNotColocatedException if the bucket's member is not
 *         the transaction's target, or the send raises ForceReattemptException
 * @throws TransactionDataRebalancedException if the send raises
 *         PrimaryBucketException
 * @throws TransactionException if the send raises any other
 *         DataLocationException
 */
private VersionedObjectList sendMsgByBucket(final Integer bucketId, RemoveAllPRMessage prMsg,PartitionedRegion pr)
{
  InternalDistributedMember currentTarget = pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);
  if(!currentTarget.equals(this.state.getTarget())) {
    EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
    try {
      throw new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(firstEvent.getKey()));
    } finally {
      // the event is fetched only for its key; release it as the PutAll overload does
      firstEvent.release();
    }
  }
  try {
    return pr.tryToSendOneRemoveAllMessage(prMsg,currentTarget);
  }
  catch (ForceReattemptException prce) {
    pr.checkReadiness();
    throw new TransactionDataNotColocatedException(prce.getMessage());
  }
  catch (PrimaryBucketException notPrimary) {
    RuntimeException re = new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
    re.initCause(notPrimary);
    throw re;
  }
  catch (DataLocationException dle) {
    throw new TransactionException(dle);
  }
}
/**
 * Does nothing in this implementation.
 */
@Override
public void cleanup() {
  // no-op
}
}