blob: 4d93f9bb67e92034f67bce121199b2c319ed9539 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tx;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.RemoteOperationException;
import org.apache.geode.internal.cache.TXStateStub;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemoteContainsKeyValueMessage.RemoteContainsKeyValueResponse;
import org.apache.geode.internal.cache.tx.RemoteOperationMessage.RemoteOperationResponse;
import org.apache.geode.internal.cache.tx.RemotePutMessage.PutResult;
import org.apache.geode.internal.cache.tx.RemotePutMessage.RemotePutResponse;
public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
private final DistributedRegion region;
public DistributedTXRegionStub(TXStateStub txstate, DistributedRegion r) {
super(txstate);
this.region = r;
}
@Override
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) {
try {
RemoteOperationResponse response = RemoteDestroyMessage.send(state.getTarget(),
event.getRegion(), event, expectedOldValue, true, false);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public Entry getEntry(KeyInfo keyInfo, boolean allowTombstone) {
try {
// TODO change RemoteFetchEntryMessage to allow tombstones to be returned
RemoteFetchEntryMessage.FetchEntryResponse res = RemoteFetchEntryMessage
.send((InternalDistributedMember) state.getTarget(), region, keyInfo.getKey());
return res.waitForResponse();
} catch (EntryNotFoundException enfe) {
return null;
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (TransactionException e) {
throw e;
} catch (CacheException | RemoteOperationException e) {
throw new TransactionDataNodeHasDepartedException(e);
}
}
@Override
public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) {
try {
RemoteOperationResponse response =
RemoteInvalidateMessage.send(state.getTarget(), event.getRegion(), event, true, false);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public boolean containsKey(KeyInfo keyInfo) {
try {
RemoteContainsKeyValueResponse response = RemoteContainsKeyValueMessage
.send((InternalDistributedMember) state.getTarget(), region, keyInfo.getKey(), false);
return response.waitForContainsResult();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public boolean containsValueForKey(KeyInfo keyInfo) {
try {
RemoteContainsKeyValueResponse response = RemoteContainsKeyValueMessage
.send((InternalDistributedMember) state.getTarget(), region, keyInfo.getKey(), true);
return response.waitForContainsResult();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public Object findObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
Object value, boolean preferCD, ClientProxyMembershipID requestingClient,
EntryEventImpl clientEvent) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
try {
RemoteGetMessage.RemoteGetResponse response =
RemoteGetMessage.send((InternalDistributedMember) state.getTarget(), region, key,
callbackArgument, requestingClient);
retVal = response.waitForResponse(preferCD);
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
return retVal;
}
@Override
public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone) {
return getEntry(keyInfo, allowTombstone);
}
@Override
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
boolean retVal = false;
final InternalRegion r = event.getRegion();
try {
RemotePutResponse response = RemotePutMessage.txSend(state.getTarget(), r, event,
lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
PutResult result = response.waitForResult();
event.setOldValue(result.oldValue, true/* force */);
retVal = result.returnValue;
} catch (TransactionDataNotColocatedException enfe) {
throw enfe;
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (CacheException | RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
return retVal;
}
@Override
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
InternalRegion region) {
try {
RemotePutAllMessage.PutAllResponse response =
RemotePutAllMessage.send(state.getTarget(), putallOp.getBaseEvent(),
putallOp.getPutAllEntryData(), putallOp.getPutAllEntryData().length, true, false);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
InternalRegion region) {
try {
RemoteRemoveAllMessage.RemoveAllResponse response =
RemoteRemoveAllMessage.send(state.getTarget(), op.getBaseEvent(),
op.getRemoveAllEntryData(), op.getRemoveAllEntryData().length, true, false);
response.waitForRemoteResponse();
} catch (RegionDestroyedException rde) {
throw new TransactionDataNotColocatedException(
String.format("Region %s not colocated with other regions in transaction",
rde.getRegionFullPath()),
rde);
} catch (RemoteOperationException roe) {
throw new TransactionDataNodeHasDepartedException(roe);
}
}
@Override
public void cleanup() {}
@Override
protected InternalRegion getRegion() {
return this.region;
}
}