blob: cf13e647190f71836d6287ae6fd24cbed39fa7d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tx;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.geode.CancelException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.DataLocationException;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
import org.apache.geode.internal.cache.TXStateStub;
import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.offheap.annotations.Released;
public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
/**
* tracks bucketIds of transactional operations so as to distinguish between
* TransactionDataNotColocated and TransactionDataRebalanced exceptions. Map rather than set, as a
* HashSet is backed by a HashMap. (avoids one "new" call).
*/
private Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
private final PartitionedRegion region;
public PartitionedTXRegionStub(TXStateStub txstate, PartitionedRegion r) {
super(txstate);
this.region = r;
}
public Map<Integer, Boolean> getBuckets() {
return buckets;
}
@Override
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) {
PartitionedRegion pr = (PartitionedRegion) event.getRegion();
try {
pr.destroyRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event,
expectedOldValue);
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
RuntimeException re;
if (isBucketNotFoundException(e)) {
re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
} else {
re = new TransactionDataNodeHasDepartedException(
String.format(
"Transaction data node %s has departed. To proceed, rollback this transaction and begin a new one.",
state.getTarget()));
}
re.initCause(e);
waitToRetry();
throw re;
}
trackBucketForTx(event.getKeyInfo());
}
RuntimeException getTransactionException(KeyInfo keyInfo, Throwable cause) {
region.getCancelCriterion().checkCancelInProgress(cause); // fixes bug 44567
Throwable ex = cause;
while (ex != null) {
if (ex instanceof CacheClosedException) {
return new TransactionDataNodeHasDepartedException(ex.getMessage());
}
ex = ex.getCause();
}
if (isKeyInNonColocatedBucket(keyInfo)) {
return new TransactionDataNotColocatedException(
String.format("Key %s is not colocated with transaction",
keyInfo.getKey()));
}
ex = cause;
while (ex != null) {
if (ex instanceof PrimaryBucketException || ex instanceof BucketNotFoundException) {
return new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
}
ex = ex.getCause();
}
return new TransactionDataNodeHasDepartedException(cause.getLocalizedMessage());
}
// is this key in a different bucket from all the existing buckets
// of the underlying PR or its colocated PRs touched by the transaction.
private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) {
Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs();
Collection<PartitionedRegion> colcatedRegions = (Collection<PartitionedRegion>) ColocationHelper
.getAllColocationRegions(this.region).values();
// get all colocated region buckets touched in the transaction
for (PartitionedRegion colcatedRegion : colcatedRegions) {
PartitionedTXRegionStub regionStub =
(PartitionedTXRegionStub) regionStubs.get(colcatedRegion);
if (regionStub != null) {
buckets.putAll(regionStub.getBuckets());
}
}
return keyInfo != null && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId());
}
/**
* wait to retry after getting a ForceReattemptException
*/
void waitToRetry() {
// this is what PR operations do. The 2000ms is not used
(new RetryTimeKeeper(2000)).waitForBucketsRecovery();
}
@Override
public Entry getEntry(KeyInfo keyInfo, boolean allowTombstones) {
try {
Entry e = region.getEntryRemotely((InternalDistributedMember) state.getTarget(),
keyInfo.getBucketId(), keyInfo.getKey(), false, allowTombstones);
trackBucketForTx(keyInfo);
return e;
} catch (EntryNotFoundException enfe) {
return null;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
RuntimeException re;
if (isBucketNotFoundException(e)) {
re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
} else {
re = new TransactionDataNodeHasDepartedException(
String.format(
"Transaction data node %s has departed. To proceed, rollback this transaction and begin a new one.",
state.getTarget()));
}
re.initCause(e);
waitToRetry();
throw re;
}
}
void trackBucketForTx(KeyInfo keyInfo) {
if (region.getCache().getLogger().fineEnabled()) {
region.getCache().getLogger()
.fine("adding bucket:" + keyInfo.getBucketId() + " for tx:" + state.getTransactionId());
}
if (keyInfo.getBucketId() >= 0) {
buckets.put(keyInfo.getBucketId(), Boolean.TRUE);
}
}
@Override
public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) {
PartitionedRegion pr = (PartitionedRegion) event.getRegion();
try {
pr.invalidateRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event);
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
RuntimeException re;
if (isBucketNotFoundException(e)) {
re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
} else {
re = new TransactionDataNodeHasDepartedException(
String.format(
"Transaction data node %s has departed. To proceed, rollback this transaction and begin a new one.",
state.getTarget()));
}
re.initCause(e);
waitToRetry();
throw re;
}
trackBucketForTx(event.getKeyInfo());
}
@Override
public boolean containsKey(KeyInfo keyInfo) {
try {
boolean retVal = region.containsKeyRemotely((InternalDistributedMember) state.getTarget(),
keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
if (isBucketNotFoundException(e)) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
}
waitToRetry();
RuntimeException re = new TransactionDataNodeHasDepartedException(
String.format(
"Transaction data node %s has departed. To proceed, rollback this transaction and begin a new one.",
state.getTarget()));
re.initCause(e);
throw re;
}
}
/**
* @return true if the cause of the FRE is a BucketNotFoundException
*/
boolean isBucketNotFoundException(ForceReattemptException e) {
ForceReattemptException fre = e;
while (fre.getCause() != null && fre.getCause() instanceof ForceReattemptException) {
fre = (ForceReattemptException) fre.getCause();
}
return fre instanceof BucketNotFoundException;
}
@Override
public boolean containsValueForKey(KeyInfo keyInfo) {
try {
boolean retVal = region.containsValueForKeyRemotely(
(InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
if (isBucketNotFoundException(e)) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
}
waitToRetry();
RuntimeException re = new TransactionDataNodeHasDepartedException(
String.format(
"Transaction data node %s has departed. To proceed, rollback this transaction and begin a new one.",
state.getTarget()));
re.initCause(e);
throw re;
}
}
@Override
public Object findObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
Object value, boolean peferCD, ClientProxyMembershipID requestingClient,
EntryEventImpl clientEvent) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
try {
retVal =
region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
key, callbackArgument, peferCD, requestingClient, clientEvent, false);
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
if (isBucketNotFoundException(e)) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
}
waitToRetry();
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
throw re;
}
trackBucketForTx(keyInfo);
return retVal;
}
@Override
public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstones) {
InternalDistributedMember primary = region.getBucketPrimary(keyInfo.getBucketId());
if (primary.equals(state.getTarget())) {
return getEntry(keyInfo, allowTombstones);
} else {
return region.getSharedDataView().getEntry(keyInfo, region, allowTombstones);
}
}
@Override
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
boolean retVal = false;
final InternalRegion r = event.getRegion();
PartitionedRegion pr = (PartitionedRegion) r;
try {
retVal =
pr.putRemotely(state.getTarget(), event, ifNew, ifOld, expectedOldValue, requireOldValue);
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
throw re;
} catch (ForceReattemptException e) {
waitToRetry();
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
throw re;
}
trackBucketForTx(event.getKeyInfo());
return retVal;
}
/**
* Create PutAllPRMsgs for each bucket, and send them.
*
* @param putallO DistributedPutAllOperation object.
*/
@Override
public void postPutAll(DistributedPutAllOperation putallO, VersionedObjectList successfulPuts,
InternalRegion r) throws TransactionException {
if (r.getCache().isCacheAtShutdownAll()) {
throw r.getCache().getCacheClosedException("Cache is shutting down");
}
PartitionedRegion pr = (PartitionedRegion) r;
final long startTime = pr.prStats.getTime();
// build all the msgs by bucketid
HashMap prMsgMap = putallO.createPRMessages();
PutAllPartialResult partialKeys = new PutAllPartialResult(putallO.putAllDataSize);
successfulPuts.clear(); // this is rebuilt by this method
Iterator itor = prMsgMap.entrySet().iterator();
while (itor.hasNext()) {
Map.Entry mapEntry = (Map.Entry) itor.next();
Integer bucketId = (Integer) mapEntry.getKey();
PutAllPRMessage prMsg = (PutAllPRMessage) mapEntry.getValue();
pr.checkReadiness();
try {
VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg, pr);
// prMsg.saveKeySet(partialKeys);
partialKeys.addKeysAndVersions(versions);
successfulPuts.addAll(versions);
} catch (PutAllPartialResultException pre) {
// sendMsgByBucket applied partial keys
partialKeys.consolidate(pre.getResult());
} catch (Exception ex) {
// If failed at other exception
@Released
EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
} finally {
firstEvent.release();
}
}
}
pr.prStats.endPutAll(startTime);
if (partialKeys.hasFailure()) {
pr.getCache().getLogger().info(String.format("Region %s putAll: %s",
new Object[] {pr.getFullPath(), partialKeys}));
if (putallO.isBridgeOperation()) {
if (partialKeys.getFailure() instanceof CancelException) {
throw (CancelException) partialKeys.getFailure();
} else {
throw new PutAllPartialResultException(partialKeys);
}
} else {
if (partialKeys.getFailure() instanceof RuntimeException) {
throw (RuntimeException) partialKeys.getFailure();
} else {
throw new RuntimeException(partialKeys.getFailure());
}
}
}
}
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
InternalRegion r) {
if (r.getCache().isCacheAtShutdownAll()) {
throw r.getCache().getCacheClosedException("Cache is shutting down");
}
PartitionedRegion pr = (PartitionedRegion) r;
final long startTime = pr.prStats.getTime();
// build all the msgs by bucketid
HashMap<Integer, RemoveAllPRMessage> prMsgMap = op.createPRMessages();
PutAllPartialResult partialKeys = new PutAllPartialResult(op.removeAllDataSize);
successfulOps.clear(); // this is rebuilt by this method
Iterator<Map.Entry<Integer, RemoveAllPRMessage>> itor = prMsgMap.entrySet().iterator();
while (itor.hasNext()) {
Map.Entry<Integer, RemoveAllPRMessage> mapEntry = itor.next();
Integer bucketId = mapEntry.getKey();
RemoveAllPRMessage prMsg = mapEntry.getValue();
pr.checkReadiness();
try {
VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg, pr);
// prMsg.saveKeySet(partialKeys);
partialKeys.addKeysAndVersions(versions);
successfulOps.addAll(versions);
} catch (PutAllPartialResultException pre) {
// sendMsgByBucket applied partial keys
partialKeys.consolidate(pre.getResult());
} catch (Exception ex) {
// If failed at other exception
@Released
EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
} finally {
firstEvent.release();
}
}
}
pr.prStats.endRemoveAll(startTime);
if (partialKeys.hasFailure()) {
pr.getCache().getLogger().info(String.format("Region %s removeAll: %s",
new Object[] {pr.getFullPath(), partialKeys}));
if (op.isBridgeOperation()) {
if (partialKeys.getFailure() instanceof CancelException) {
throw (CancelException) partialKeys.getFailure();
} else {
throw new PutAllPartialResultException(partialKeys);
}
} else {
if (partialKeys.getFailure() instanceof RuntimeException) {
throw (RuntimeException) partialKeys.getFailure();
} else {
throw new RuntimeException(partialKeys.getFailure());
}
}
}
}
/*
* If failed after retries, it will throw PartitionedRegionStorageException, no need for return
* value
*/
private VersionedObjectList sendMsgByBucket(final Integer bucketId, PutAllPRMessage prMsg,
PartitionedRegion pr) {
// retry the put remotely until it finds the right node managing the bucket
InternalDistributedMember currentTarget =
pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);
if (!currentTarget.equals(this.state.getTarget())) {
@Released
EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
throw new TransactionDataNotColocatedException(
String.format("Key %s is not colocated with transaction",
firstEvent.getKey()));
} finally {
firstEvent.release();
}
}
try {
return pr.tryToSendOnePutAllMessage(prMsg, currentTarget);
} catch (ForceReattemptException prce) {
pr.checkReadiness();
throw new TransactionDataNotColocatedException(prce.getMessage());
} catch (PrimaryBucketException notPrimary) {
RuntimeException re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
re.initCause(notPrimary);
throw re;
} catch (DataLocationException dle) {
throw new TransactionException(dle);
}
}
/*
* If failed after retries, it will throw PartitionedRegionStorageException, no need for return
* value
*/
private VersionedObjectList sendMsgByBucket(final Integer bucketId, RemoveAllPRMessage prMsg,
PartitionedRegion pr) {
// retry the put remotely until it finds the right node managing the bucket
InternalDistributedMember currentTarget =
pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);
if (!currentTarget.equals(this.state.getTarget())) {
@Released
EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
throw new TransactionDataNotColocatedException(
String.format("Key %s is not colocated with transaction",
firstEvent.getKey()));
} finally {
firstEvent.release();
}
}
try {
return pr.tryToSendOneRemoveAllMessage(prMsg, currentTarget);
} catch (ForceReattemptException prce) {
pr.checkReadiness();
throw new TransactionDataNotColocatedException(prce.getMessage());
} catch (PrimaryBucketException notPrimary) {
RuntimeException re = new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
re.initCause(notPrimary);
throw re;
} catch (DataLocationException dle) {
throw new TransactionException(dle);
}
}
@Override
public void cleanup() {}
@Override
protected InternalRegion getRegion() {
return this.region;
}
}