| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.tx; |
| |
| import java.util.Collections; |
| import java.util.Map; |
| |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RemoteTransactionException; |
| import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException; |
| import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; |
| import com.gemstone.gemfire.cache.TransactionException; |
| import com.gemstone.gemfire.cache.Region.Entry; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation; |
| import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.KeyInfo; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionException; |
| import com.gemstone.gemfire.internal.cache.RemoteContainsKeyValueMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteDestroyMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteFetchEntryMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteGetMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteInvalidateMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteOperationException; |
| import com.gemstone.gemfire.internal.cache.RemotePutAllMessage; |
| import com.gemstone.gemfire.internal.cache.RemotePutMessage; |
| import com.gemstone.gemfire.internal.cache.RemoteRemoveAllMessage; |
| import com.gemstone.gemfire.internal.cache.TXStateStub; |
| import com.gemstone.gemfire.internal.cache.RemoteContainsKeyValueMessage.RemoteContainsKeyValueResponse; |
| import com.gemstone.gemfire.internal.cache.RemoteOperationMessage.RemoteOperationResponse; |
| import com.gemstone.gemfire.internal.cache.RemotePutMessage.PutResult; |
| import com.gemstone.gemfire.internal.cache.RemotePutMessage.RemotePutResponse; |
| import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock; |
| |
| public class DistributedTXRegionStub extends AbstractPeerTXRegionStub { |
| |
| private final LocalRegion region; |
| |
| public DistributedTXRegionStub(TXStateStub txstate,LocalRegion r) { |
| super(txstate,r); |
| this.region = r; |
| } |
| |
| |
| public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite, |
| Object expectedOldValue) { |
| // TODO Auto-generated method stub |
| //this.prStats.incPartitionMessagesSent(); |
| try { |
| RemoteOperationResponse response = RemoteDestroyMessage.send(state.getTarget(), |
| event.getLocalRegion(), |
| event, |
| expectedOldValue, DistributionManager.PARTITIONED_REGION_EXECUTOR, true, false); |
| response.waitForCacheException(); |
| } |
| catch (EntryNotFoundException enfe) { |
| throw enfe; |
| }catch (TransactionDataNotColocatedException enfe) { |
| throw enfe; |
| } |
| catch (CacheException ce) { |
| throw new PartitionedRegionException(LocalizedStrings.PartitionedRegion_DESTROY_OF_ENTRY_ON_0_FAILED.toLocalizedString(state.getTarget()), ce); |
| } |
| catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| |
| |
| public Entry getEntry(KeyInfo keyInfo, boolean allowTombstone) { |
| try { |
| // TODO change RemoteFetchEntryMessage to allow tombstones to be returned |
| RemoteFetchEntryMessage.FetchEntryResponse res = RemoteFetchEntryMessage.send((InternalDistributedMember)state.getTarget(), region, keyInfo.getKey()); |
| //this.prStats.incPartitionMessagesSent(); |
| return res.waitForResponse(); |
| } catch (EntryNotFoundException enfe) { |
| return null; |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch (TransactionException e) { |
| RuntimeException re = new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(keyInfo.getKey())); |
| re.initCause(e); |
| throw re; |
| } catch (RemoteOperationException e) { |
| throw new TransactionDataNodeHasDepartedException(e); |
| } |
| } |
| |
| |
| public void invalidateExistingEntry(EntryEventImpl event, |
| boolean invokeCallbacks, boolean forceNewEntry) { |
| try { |
| RemoteOperationResponse response = RemoteInvalidateMessage.send(state.getTarget(), |
| event.getRegion(), event, |
| DistributionManager.PARTITIONED_REGION_EXECUTOR, true, false); |
| response.waitForCacheException(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| |
| |
| public boolean containsKey(KeyInfo keyInfo) { |
| try { |
| RemoteContainsKeyValueResponse response = RemoteContainsKeyValueMessage.send((InternalDistributedMember)state.getTarget(), |
| region, keyInfo.getKey(), false); |
| return response.waitForContainsResult(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| |
| |
| public boolean containsValueForKey(KeyInfo keyInfo) { |
| try { |
| RemoteContainsKeyValueResponse response = RemoteContainsKeyValueMessage.send((InternalDistributedMember)state.getTarget(), |
| region, keyInfo.getKey(), true); |
| return response.waitForContainsResult(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| |
| |
| public Object findObject(KeyInfo keyInfo, boolean isCreate, |
| boolean generateCallbacks, Object value, boolean preferCD, |
| ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS) { |
| Object retVal = null; |
| final Object key = keyInfo.getKey(); |
| final Object callbackArgument = keyInfo.getCallbackArg(); |
| try { |
| RemoteGetMessage.RemoteGetResponse response = RemoteGetMessage.send((InternalDistributedMember)state.getTarget(), region, key, |
| callbackArgument, requestingClient); |
| retVal = response.waitForResponse(preferCD); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| return retVal; |
| } |
| |
| |
| public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone) { |
| return getEntry(keyInfo, allowTombstone); |
| } |
| |
| |
| public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld, |
| Object expectedOldValue, boolean requireOldValue, long lastModified, |
| boolean overwriteDestroyed) { |
| boolean retVal = false; |
| final LocalRegion r = event.getLocalRegion(); |
| |
| try { |
| RemotePutResponse response = RemotePutMessage.txSend(state.getTarget(),r,event,lastModified,ifNew,ifOld,expectedOldValue,requireOldValue); |
| PutResult result = response.waitForResult(); |
| event.setOldValue(result.oldValue, true/*force*/); |
| retVal = result.returnValue; |
| }catch (TransactionDataNotColocatedException enfe) { |
| throw enfe; |
| } |
| catch (CacheException ce) { |
| throw new PartitionedRegionException(LocalizedStrings.PartitionedRegion_DESTROY_OF_ENTRY_ON_0_FAILED.toLocalizedString(state.getTarget()), ce); |
| } |
| catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch(RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| return retVal; |
| } |
| |
| |
| public int entryCount() { |
| try { |
| RemoteSizeMessage.SizeResponse response = RemoteSizeMessage.send(Collections.singleton(state.getTarget()), region); |
| return response.waitForSize(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch (Exception e) { |
| throw new TransactionException(e); |
| } |
| } |
| |
| |
| public void postPutAll(DistributedPutAllOperation putallOp, |
| VersionedObjectList successfulPuts, LocalRegion region) { |
| try { |
| RemotePutAllMessage.PutAllResponse response = RemotePutAllMessage.send(state.getTarget(), putallOp.getBaseEvent(), putallOp.getPutAllEntryData(), putallOp.getPutAllEntryData().length, true, DistributionManager.PARTITIONED_REGION_EXECUTOR, false); |
| response.waitForCacheException(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch (RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| @Override |
| public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps, LocalRegion region) { |
| try { |
| RemoteRemoveAllMessage.RemoveAllResponse response = RemoteRemoveAllMessage.send(state.getTarget(), op.getBaseEvent(), op.getRemoveAllEntryData(), op.getRemoveAllEntryData().length, true, DistributionManager.PARTITIONED_REGION_EXECUTOR, false); |
| response.waitForCacheException(); |
| } catch (RegionDestroyedException rde) { |
| throw new TransactionDataNotColocatedException(LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION |
| .toLocalizedString(rde.getRegionFullPath()), rde); |
| } catch (RemoteOperationException roe) { |
| throw new TransactionDataNodeHasDepartedException(roe); |
| } |
| } |
| |
| @Override |
| public void cleanup() { |
| } |
| } |