blob: 677459b1d28798861678b5b9a176882266974664 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tx;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RemoteOperationException;
import org.apache.geode.internal.cache.TXStateStub;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemoteContainsKeyValueMessage.RemoteContainsKeyValueResponse;
import org.apache.geode.internal.cache.tx.RemoteOperationMessage.RemoteOperationResponse;
import org.apache.geode.internal.cache.tx.RemotePutMessage.PutResult;
import org.apache.geode.internal.cache.tx.RemotePutMessage.RemotePutResponse;
public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
private final DistributedRegion region;
public DistributedTXRegionStub(TXStateStub txstate, DistributedRegion r) {
super(txstate);
region = r;
}
@Override
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) {
try {
RemoteOperationResponse response = sendRemoteDestroyMessage(event, expectedOldValue);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
RemoteOperationResponse sendRemoteDestroyMessage(EntryEventImpl event, Object expectedOldValue)
throws RemoteOperationException {
return RemoteDestroyMessage.send(state.getTarget(), event.getRegion(), event, expectedOldValue,
true, false);
}
@Override
public Entry getEntry(KeyInfo keyInfo, boolean allowTombstone) {
try {
// TODO change RemoteFetchEntryMessage to allow tombstones to be returned
RemoteFetchEntryMessage.FetchEntryResponse res = sendRemoteFetchEntryMessage(
(InternalDistributedMember) state.getTarget(), region, keyInfo.getKey());
return res.waitForResponse();
} catch (EntryNotFoundException enfe) {
return null;
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (TransactionException e) {
throw e;
} catch (CacheException | RemoteOperationException e) {
throw new TransactionDataNodeHasDepartedException(e);
}
}
RemoteFetchEntryMessage.FetchEntryResponse sendRemoteFetchEntryMessage(
InternalDistributedMember recipient, LocalRegion region, Object key)
throws RemoteOperationException {
return RemoteFetchEntryMessage.send(recipient, region, key);
}
@Override
public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) {
try {
RemoteOperationResponse response = sendRemoteInvalidateMessage(state.getTarget(), event);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
RemoteOperationResponse sendRemoteInvalidateMessage(DistributedMember recipient,
EntryEventImpl event) throws RemoteOperationException {
return RemoteInvalidateMessage.send(recipient, event.getRegion(), event, true, false);
}
@Override
public boolean containsKey(KeyInfo keyInfo) {
try {
RemoteContainsKeyValueResponse response = sendRemoteContainsKeyValueMessage(
(InternalDistributedMember) state.getTarget(), keyInfo.getKey(), false);
return response.waitForContainsResult();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public boolean containsValueForKey(KeyInfo keyInfo) {
try {
RemoteContainsKeyValueResponse response = sendRemoteContainsKeyValueMessage(
(InternalDistributedMember) state.getTarget(), keyInfo.getKey(), true);
return response.waitForContainsResult();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
RemoteContainsKeyValueResponse sendRemoteContainsKeyValueMessage(
InternalDistributedMember recipient, Object key, boolean valueCheck)
throws RemoteOperationException {
return RemoteContainsKeyValueMessage.send(recipient, region, key, valueCheck);
}
@Override
public Object findObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
Object value, boolean preferCD, ClientProxyMembershipID requestingClient,
EntryEventImpl clientEvent) {
Object retVal;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
try {
RemoteGetMessage.RemoteGetResponse response = sendRemoteGetMessage(
(InternalDistributedMember) state.getTarget(), key, callbackArgument, requestingClient);
retVal = response.waitForResponse(preferCD);
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
return retVal;
}
RemoteGetMessage.RemoteGetResponse sendRemoteGetMessage(InternalDistributedMember recipient,
final Object key, final Object callbackArgument, ClientProxyMembershipID requestingClient)
throws RemoteOperationException {
return RemoteGetMessage.send(recipient, region, key, callbackArgument, requestingClient);
}
@Override
public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone) {
return getEntry(keyInfo, allowTombstone);
}
@Override
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
boolean retVal;
final InternalRegion r = event.getRegion();
try {
RemotePutResponse response = txSendRemotePutMessage(state.getTarget(), r, event,
lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
PutResult result = response.waitForResult();
event.setOldValue(result.oldValue, true/* force */);
retVal = result.returnValue;
} catch (TransactionDataNotColocatedException enfe) {
throw enfe;
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (CacheException | RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
return retVal;
}
RemotePutResponse txSendRemotePutMessage(DistributedMember recipient, InternalRegion r,
EntryEventImpl event, final long lastModified, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue) throws RemoteOperationException {
return RemotePutMessage.txSend(recipient, r, event, lastModified, ifNew, ifOld,
expectedOldValue, requireOldValue);
}
@Override
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
InternalRegion region) {
try {
RemotePutAllMessage.PutAllResponse response =
sendRemotePutAllMessage(state.getTarget(), putallOp.getBaseEvent(),
putallOp.getPutAllEntryData(), putallOp.getPutAllEntryData().length);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
RemotePutAllMessage.PutAllResponse sendRemotePutAllMessage(DistributedMember recipient,
EntryEventImpl event, DistributedPutAllOperation.PutAllEntryData[] putAllData,
int putAllDataCount) throws RemoteOperationException {
return RemotePutAllMessage.send(recipient, event,
putAllData, putAllDataCount, true, false);
}
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
InternalRegion region) {
try {
RemoteRemoveAllMessage.RemoveAllResponse response =
sendRemoteRemoveAllMessage(state.getTarget(), op.getBaseEvent(),
op.getRemoveAllEntryData(), op.getRemoveAllEntryData().length);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
RemoteRemoveAllMessage.RemoveAllResponse sendRemoteRemoveAllMessage(DistributedMember recipient,
EntryEventImpl event, DistributedRemoveAllOperation.RemoveAllEntryData[] removeAllData,
int removeAllDataCount) throws RemoteOperationException {
return RemoteRemoveAllMessage.send(recipient, event, removeAllData, removeAllDataCount, true,
false);
}
@Override
public void cleanup() {}
@Override
protected InternalRegion getRegion() {
return region;
}
}