blob: 7670ef9d34065d938f35ab3564bef94e9e6ab003 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.ContainsKeyOp.MODE;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.cache.util.BridgeClient;
import com.gemstone.gemfire.cache.util.BridgeLoader;
import com.gemstone.gemfire.cache.util.BridgeWriter;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.cache.AbstractRegion;
import com.gemstone.gemfire.internal.cache.BridgeObserver;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.TXCommitMessage;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList.Iterator;
import com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub;
import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* Used to send region operations from a client to a server
* @author darrel
* @since 5.7
*/
@SuppressWarnings("deprecation")
public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAccess {
private static final Logger logger = LogService.getLogger();
private final LocalRegion region;
private final String regionName;
/**
* Creates a server region proxy for the given region.
* @param r the region
* @throws IllegalStateException if the region does not have a pool
*/
public ServerRegionProxy(Region r) {
super(calcPool(r));
assert r instanceof LocalRegion;
this.region = (LocalRegion)r;
this.regionName = r.getFullPath();
}
/**
* Used by tests to create proxies for "fake" regions.
* Also, used by ClientStatsManager for admin region.
*/
public ServerRegionProxy(String regionName, PoolImpl pool) {
super(pool);
this.region = null;
this.regionName = regionName;
}
private static InternalPool calcPool(Region r) {
String poolName = r.getAttributes().getPoolName();
if (poolName == null || "".equals(poolName)) {
final CacheLoader cl = r.getAttributes().getCacheLoader();
final CacheWriter cw = r.getAttributes().getCacheWriter();
if (AbstractRegion.isBridgeLoader(cl) || AbstractRegion.isBridgeWriter(cw)) {
Object loaderPool = null;
Object writerPool = null;
if (AbstractRegion.isBridgeLoader(cl)) {
if (cl instanceof BridgeLoader) {
loaderPool = ((BridgeLoader)cl).getConnectionProxy();
} else {
loaderPool = ((BridgeClient)cl).getConnectionProxy();
}
}
if (AbstractRegion.isBridgeWriter(cw)) {
writerPool = ((BridgeWriter)cw).getConnectionProxy();
}
if (loaderPool != writerPool && loaderPool != null && writerPool != null) {
throw new IllegalStateException("The region " + r.getFullPath()
+ " has a BridgeLoader and a BridgeWriter/BridgeClient "
+ " that are configured with different connection pools. "
+ " This is not allowed. Instead create a single BridgeClient and install it as both the loader and the writer."
+ " loaderPool="+loaderPool + " writerPool=" + writerPool);
}
InternalPool result = (InternalPool)loaderPool;
if (result == null) {
result = (InternalPool)writerPool;
}
return result;
} else {
throw new IllegalStateException("The region " + r.getFullPath()
+ " did not have a client pool configured.");
}
} else {
InternalPool pool = (InternalPool)PoolManager.find(poolName);
if (pool == null) {
throw new IllegalStateException("The pool " + poolName
+ " does not exist.");
}
return pool;
}
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#get(java.lang.Object, java.lang.Object)
*/
public Object get(Object key, Object callbackArg, EntryEventImpl clientEvent) {
recordTXOperation(ServerRegionOperation.GET, key, callbackArg);
return GetOp.execute(this.pool, this.region, key, callbackArg, this.pool.getPRSingleHopEnabled(), clientEvent);
}
public int size() {
return SizeOp.execute(this.pool, this.regionName);
}
/**
* Do not call this method if the value is Delta instance. Exclicitly passing
* <code>Operation.CREATE</code> to the <code>PutOp.execute()</code>
* method as the caller of this method does not put Delta instances as value.
*
* @param key
* @param value
* @param event
* @param callbackArg
*/
public Object putForMetaRegion(Object key,
Object value,
byte[] deltaBytes,
EntryEventImpl event,
Object callbackArg,
boolean isMetaRegionPutOp)
{
if (this.region == null) {
return PutOp.execute(this.pool, this.regionName, key, value, deltaBytes, event,
Operation.CREATE,
false, null,
callbackArg, this.pool.getPRSingleHopEnabled(),
isMetaRegionPutOp);
} else {
return PutOp.execute(this.pool, this.region, key, value, deltaBytes,
event, Operation.CREATE, false, null, callbackArg, this.pool
.getPRSingleHopEnabled());
}
}
public Object put(Object key,
Object value,
byte[] deltaBytes,
EntryEventImpl event,
Operation op,
boolean requireOldValue, Object expectedOldValue,
Object callbackArg,
boolean isCreate)
{
recordTXOperation(ServerRegionOperation.PUT, key, value, deltaBytes,
event.getEventId(), op, Boolean.valueOf(requireOldValue), expectedOldValue,
callbackArg, Boolean.valueOf(isCreate));
Operation operation = op;
if (!isCreate && this.region.getDataPolicy() == DataPolicy.EMPTY
&& op.isCreate() && op != Operation.PUT_IF_ABSENT) {
operation = Operation.UPDATE;
}
if (this.region == null) {
return PutOp.execute(this.pool, this.regionName, key, value, deltaBytes,
event, operation, requireOldValue, expectedOldValue, callbackArg,
this.pool.getPRSingleHopEnabled(), false);
}
else {
return PutOp.execute(this.pool, this.region, key, value, deltaBytes,
event, operation, requireOldValue, expectedOldValue, callbackArg,
this.pool.getPRSingleHopEnabled());
}
}
/**
* Does a region put on the server using the given connection.
* @param con the connection to use to send to the server
* @param key the entry key to do the put on
* @param value the entry value to put
* @param eventId the event ID for this put
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void putOnForTestsOnly(Connection con,
Object key,
Object value,
EventID eventId,
Object callbackArg)
{
EntryEventImpl event = new EntryEventImpl(eventId);
PutOp.execute(con, this.pool, this.regionName, key, value, event, callbackArg, this.pool.getPRSingleHopEnabled());
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#destroy(java.lang.Object, java.lang.Object, com.gemstone.gemfire.cache.Operation, com.gemstone.gemfire.internal.cache.EventID, java.lang.Object)
*/
public Object destroy(Object key,
Object expectedOldValue,
Operation operation,
EntryEventImpl event,
Object callbackArg)
{
if (event.isBulkOpInProgress()) {
// this is a removeAll, ignore this!
return null;
}
recordTXOperation(ServerRegionOperation.DESTROY, key, expectedOldValue, operation, event.getEventId(), callbackArg);
return DestroyOp.execute(this.pool, this.region, key, expectedOldValue,
operation, event, callbackArg, this.pool.getPRSingleHopEnabled());
}
public void invalidate(EntryEventImpl event) {
recordTXOperation(ServerRegionOperation.INVALIDATE, event.getKey(), event);
InvalidateOp.execute(this.pool, this.region.getFullPath(),
event);
}
/**
* Does a region entry destroy on the server using the given connection.
* @param con the connection to use to send to the server
* @param key the entry key to do the destroy on
* @param expectedOldValue the value that the entry must have to perform the operation, or null
* @param operation the operation being performed (Operation.DESTROY, Operation.REMOVE)
* @param event the event for this destroy operation
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyOnForTestsOnly(Connection con,
Object key,
Object expectedOldValue,
Operation operation,
EntryEventImpl event,
Object callbackArg)
{
DestroyOp.execute(con, this.pool, this.regionName, key,
expectedOldValue, operation, event, callbackArg);
}
/**
* Does a region destroy on the server
* @param eventId the event id for this destroy
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyRegion(EventID eventId,
Object callbackArg)
{
DestroyRegionOp.execute(this.pool, this.regionName, eventId, callbackArg);
}
/**
* Does a region destroy on the server using the given connection.
* @param con the connection to use to send to the server
* @param eventId the event id for this destroy
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyRegionOnForTestsOnly(Connection con,
EventID eventId,
Object callbackArg)
{
DestroyRegionOp.execute(con, this.pool, this.regionName, eventId, callbackArg);
}
public TXCommitMessage commit(int txId) {
TXCommitMessage tx = CommitOp.execute(this.pool,txId);
return tx;
}
public void rollback(int txId) {
RollbackOp.execute(this.pool, txId);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#clear(com.gemstone.gemfire.internal.cache.EventID, java.lang.Object)
*/
public void clear(EventID eventId,
Object callbackArg)
{
ClearOp.execute(this.pool, this.regionName, eventId, callbackArg);
}
/**
* Does a region clear on the server using the given connection.
* @param con the connection to use to send to the server
* @param eventId the event id for this clear
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void clearOnForTestsOnly(Connection con,
EventID eventId,
Object callbackArg)
{
ClearOp.execute(con, this.pool, this.regionName, eventId, callbackArg);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
public boolean containsKey(Object key) {
recordTXOperation(ServerRegionOperation.CONTAINS_KEY, key);
return ContainsKeyOp.execute(this.pool, this.regionName, key,MODE.KEY);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
public boolean containsValueForKey(Object key) {
recordTXOperation(ServerRegionOperation.CONTAINS_VALUE_FOR_KEY, key);
return ContainsKeyOp.execute(this.pool, this.regionName, key,MODE.VALUE_FOR_KEY);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
public boolean containsValue(Object value) {
recordTXOperation(ServerRegionOperation.CONTAINS_VALUE, null, value);
return ContainsKeyOp.execute(this.pool, this.regionName, value,MODE.VALUE);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#keySet()
*/
public Set keySet() {
recordTXOperation(ServerRegionOperation.KEY_SET, null);
return KeySetOp.execute(this.pool, this.regionName);
}
/**
* Does a region registerInterest on a server
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterest(final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final byte regionDataPolicy)
{
return registerInterest(key, interestType, policy, isDurable, false, regionDataPolicy);
}
/**
* Does a region registerInterest on a server
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterest(final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final boolean receiveUpdatesAsInvalidates,
final byte regionDataPolicy)
{
if (interestType == InterestType.KEY
&& key instanceof List) {
return registerInterestList((List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
} else {
final RegisterInterestTracker rit = this.pool.getRITracker();
List result = null;
boolean finished = false;
try {
// register with the tracker early
rit.addSingleInterest(this.region, key, interestType, policy, isDurable, receiveUpdatesAsInvalidates);
result = RegisterInterestOp.execute(this.pool, this.regionName, key,
interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
//////// TEST PURPOSE ONLY ///////////
if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.afterInterestRegistration();
}
/////////////////////////////////////////
finished = true;
return result;
}
finally {
if (!finished) {
rit.removeSingleInterest(this.region, key, interestType,
isDurable, receiveUpdatesAsInvalidates);
}
}
}
}
/**
* Support for server-side interest registration
*/
public void addSingleInterest(Object key, int interestType,
InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates) {
RegisterInterestTracker rit = this.pool.getRITracker();
boolean finished = false;
try {
rit.addSingleInterest(this.region, key, interestType, pol,
isDurable, receiveUpdatesAsInvalidates);
finished = true;
}
finally {
if (!finished) {
rit.removeSingleInterest(this.region, key, interestType,
isDurable, receiveUpdatesAsInvalidates);
}
}
}
public void addListInterest(List keys, InterestResultPolicy pol,
boolean isDurable, boolean receiveUpdatesAsInvalidates) {
RegisterInterestTracker rit = this.pool.getRITracker();
boolean finished = false;
try {
rit.addInterestList(this.region, keys, pol, isDurable,
receiveUpdatesAsInvalidates);
finished = true;
}
finally {
if (!finished) {
rit.removeInterestList(this.region, keys, isDurable,
receiveUpdatesAsInvalidates);
}
}
}
/**
* Support for server-side interest registration
*/
public void removeSingleInterest(Object key, int interestType,
boolean isDurable, boolean receiveUpdatesAsInvalidates) {
this.pool.getRITracker()
.removeSingleInterest(this.region, key, interestType,
isDurable, receiveUpdatesAsInvalidates);
}
public void removeListInterest(List keys, boolean isDurable,
boolean receiveUpdatesAsInvalidates) {
this.pool.getRITracker().removeInterestList(this.region, keys, isDurable,
receiveUpdatesAsInvalidates);
}
/**
* Does a region registerInterest on a server described by the given server location
* <p>Note that this call by-passes the RegisterInterestTracker.
* @param sl the server to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestOn(ServerLocation sl,
final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final byte regionDataPolicy)
{
return registerInterestOn(sl, key, interestType, policy, isDurable, false, regionDataPolicy);
}
/**
* Does a region registerInterest on a server described by the given server location
* <p>Note that this call by-passes the RegisterInterestTracker.
* @param sl the server to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestOn(ServerLocation sl,
final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final boolean receiveUpdatesAsInvalidates,
final byte regionDataPolicy)
{
if (interestType == InterestType.KEY
&& key instanceof List) {
return RegisterInterestListOp.executeOn(sl, this.pool, this.regionName,
(List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
} else {
return RegisterInterestOp.executeOn(sl, this.pool, this.regionName, key,
interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
}
}
/**
* Does a region registerInterest on a server described by the given connection
* <p>Note that this call by-passes the RegisterInterestTracker.
* @param conn the connection to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestOn(Connection conn,
final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final byte regionDataPolicy)
{
return registerInterestOn(conn, key, interestType, policy, isDurable, false, regionDataPolicy);
}
/**
* Does a region registerInterest on a server described by the given connection
* <p>Note that this call by-passes the RegisterInterestTracker.
* @param conn the connection to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestOn(Connection conn,
final Object key,
final int interestType,
final InterestResultPolicy policy,
final boolean isDurable,
final boolean receiveUpdatesAsInvalidates,
final byte regionDataPolicy)
{
if (interestType == InterestType.KEY
&& key instanceof List) {
return RegisterInterestListOp.executeOn(conn, this.pool, this.regionName,
(List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
} else {
return RegisterInterestOp.executeOn(conn, this.pool, this.regionName,
key, interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
}
}
/**
* Does a region registerInterestList on a server
* @param keys list of keys we are interested in
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestList(List keys,
InterestResultPolicy policy,
boolean isDurable,
boolean receiveUpdatesAsInvalidates,
final byte regionDataPolicy)
{
final RegisterInterestTracker rit = this.pool.getRITracker();
List result = null;
boolean finished = false;
try {
// register with the tracker early
rit.addInterestList(this.region, keys, policy, isDurable, receiveUpdatesAsInvalidates);
result = RegisterInterestListOp.execute(this.pool, this.regionName, keys, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
finished = true;
//////// TEST PURPOSE ONLY ///////////
if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.afterInterestRegistration();
}
/////////////////////////////////////////
return result;
}
finally {
if (!finished) {
rit.removeInterestList(this.region, keys, isDurable, receiveUpdatesAsInvalidates);
}
}
}
/**
* Does a region unregisterInterest on a server
* @param key describes what we are no longer interested in
* @param interestType the {@link InterestType} for this unregister
* @param isClosing true if this unregister is done by a close
* @param keepAlive true if this unregister should not undo a durable registration
*/
public void unregisterInterest(Object key,
int interestType,
boolean isClosing,
boolean keepAlive)
{
if (interestType == InterestType.KEY
&& key instanceof List) {
unregisterInterestList((List)key, isClosing, keepAlive);
} else {
RegisterInterestTracker rit = this.pool.getRITracker();
boolean removed =
rit.removeSingleInterest(this.region, key, interestType, false, false) ||
rit.removeSingleInterest(this.region, key, interestType, true, false) ||
rit.removeSingleInterest(this.region, key, interestType, false, true) ||
rit.removeSingleInterest(this.region, key, interestType, true, true);
if (removed) {
UnregisterInterestOp.execute(this.pool, this.regionName, key, interestType, isClosing, keepAlive);
}
}
}
/**
* Does a region unregisterInterestList on a server
* @param keys list of keys we are interested in
* @param isClosing true if this unregister is done by a close
* @param keepAlive true if this unregister should not undo a durable registration
*/
public void unregisterInterestList(List keys,
boolean isClosing,
boolean keepAlive)
{
RegisterInterestTracker rit = this.pool.getRITracker();
boolean removed =
rit.removeInterestList(this.region, keys, false, true) ||
rit.removeInterestList(this.region, keys, false, false) ||
rit.removeInterestList(this.region, keys, true, true) ||
rit.removeInterestList(this.region, keys, true, false);
if (removed) {
UnregisterInterestListOp.execute(this.pool, this.regionName, keys, isClosing, keepAlive);
}
}
public List getInterestList(int interestType) {
return this.pool.getRITracker().getInterestList(this.regionName,
interestType);
}
@Override
public VersionedObjectList putAll(Map map, EventID eventId, boolean skipCallbacks, Object callbackArg) {
recordTXOperation(ServerRegionOperation.PUT_ALL, null, map, eventId);
int txID = TXManagerImpl.getCurrentTXUniqueId();
if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
return PutAllOp.execute(this.pool, this.region, map, eventId, skipCallbacks, this.pool.getRetryAttempts(), callbackArg);
}
else {
return PutAllOp.execute(this.pool, this.region, map, eventId, skipCallbacks, false, callbackArg);
}
}
@Override
public VersionedObjectList removeAll(Collection<Object> keys, EventID eventId, Object callbackArg) {
recordTXOperation(ServerRegionOperation.REMOVE_ALL, null, keys, eventId);
int txID = TXManagerImpl.getCurrentTXUniqueId();
if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
return RemoveAllOp.execute(this.pool, this.region, keys, eventId, this.pool.getRetryAttempts(), callbackArg);
}
else {
return RemoveAllOp.execute(this.pool, this.region, keys, eventId, false, callbackArg);
}
}
@Override
public VersionedObjectList getAll(List keys, Object callback) {
recordTXOperation(ServerRegionOperation.GET_ALL, null, keys);
int txID = TXManagerImpl.getCurrentTXUniqueId();
VersionedObjectList result;
if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
result = GetAllOp.execute(this.pool, this.region, keys,this.pool.getRetryAttempts(), callback);
}
else {
result = GetAllOp.execute(this.pool, this.regionName, keys, callback);
}
if (result != null) {
for (Iterator it=result.iterator(); it.hasNext(); ) {
VersionedObjectList.Entry entry = it.next();
Object key = entry.getKey();
Object value = entry.getValue();
boolean isOnServer = entry.isKeyNotOnServer();
if (!isOnServer) {
if (value instanceof Throwable) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.GetAll_0_CAUGHT_THE_FOLLOWING_EXCEPTION_ATTEMPTING_TO_GET_VALUE_FOR_KEY_1,
new Object[]{value, key}), (Throwable)value);
}
}
}
}
return result;
}
/**
* Release use of this pool
*/
public void detach(boolean keepalive) {
this.pool.getRITracker().unregisterRegion(this, keepalive);
super.detach();
}
public String getRegionName() {
return this.regionName;
}
public Region getRegion() {
return this.region;
}
public void executeFunction(String rgnName, Function function,
ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector, byte hasResult, boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(1),
function, serverRegionExecutor, resultCollector,
Byte.valueOf(hasResult));
int retryAttempts = pool.getRetryAttempts();
if (this.pool.getPRSingleHopEnabled()) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
if (cms.isMetadataStable()) {
if (serverRegionExecutor.getFilter().isEmpty()) {
HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = cms
.groupByServerToAllBuckets(this.region,
function.optimizeForWrite());
if (serverToBuckets == null || serverToBuckets.isEmpty()) {
ExecuteRegionFunctionOp
.execute(this.pool, rgnName, function, serverRegionExecutor,
resultCollector, hasResult, retryAttempts);
cms.scheduleGetPRMetaData(region, false);
} else {
ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
function, serverRegionExecutor, resultCollector, hasResult,
serverToBuckets, retryAttempts, true);
}
} else {
boolean isBucketFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
Map<ServerLocation, HashSet> serverToFilterMap = cms
.getServerToFilterMap(serverRegionExecutor.getFilter(), region,
function.optimizeForWrite(), isBucketFilter);
if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
ExecuteRegionFunctionOp
.execute(this.pool, rgnName, function, serverRegionExecutor,
resultCollector, hasResult, retryAttempts);
cms.scheduleGetPRMetaData(region, false);
} else {
ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
function, serverRegionExecutor, resultCollector, hasResult,
serverToFilterMap, retryAttempts, isBucketFilter);
}
}
} else {
cms.scheduleGetPRMetaData(region, false);
ExecuteRegionFunctionOp.execute(this.pool, rgnName, function,
serverRegionExecutor, resultCollector, hasResult, retryAttempts);
}
}
else {
ExecuteRegionFunctionOp.execute(this.pool, rgnName, function,
serverRegionExecutor, resultCollector, hasResult, retryAttempts);
}
}
public void executeFunction(String rgnName, String functionId,
ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector, byte hasResult, boolean isHA, boolean optimizeForWrite,
boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(2),
functionId, serverRegionExecutor, resultCollector, Byte.valueOf(hasResult),
Boolean.valueOf(isHA), Boolean.valueOf(optimizeForWrite));
int retryAttempts = pool.getRetryAttempts();
if (this.pool.getPRSingleHopEnabled()) {
ClientMetadataService cms = this.region.getCache()
.getClientMetadataService();
if (cms.isMetadataStable()) {
if (serverRegionExecutor.getFilter().isEmpty()) {
HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = cms
.groupByServerToAllBuckets(this.region, optimizeForWrite);
if (serverToBuckets == null || serverToBuckets.isEmpty()) {
ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
serverRegionExecutor, resultCollector, hasResult,
retryAttempts, isHA, optimizeForWrite);
cms.scheduleGetPRMetaData(this.region, false);
} else {
ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
functionId, serverRegionExecutor, resultCollector, hasResult,
serverToBuckets, retryAttempts, true, isHA, optimizeForWrite);
}
} else {
boolean isBucketsAsFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
Map<ServerLocation, HashSet> serverToFilterMap = cms
.getServerToFilterMap(serverRegionExecutor.getFilter(), region,
optimizeForWrite, isBucketsAsFilter);
if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
serverRegionExecutor, resultCollector, hasResult,
retryAttempts, isHA, optimizeForWrite);
cms.scheduleGetPRMetaData(region, false);
} else {
ExecuteRegionFunctionSingleHopOp
.execute(this.pool, this.region, functionId,
serverRegionExecutor, resultCollector, hasResult,
serverToFilterMap, retryAttempts, false, isHA,
optimizeForWrite);
}
}
} else {
cms.scheduleGetPRMetaData(region, false);
ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
serverRegionExecutor, resultCollector, hasResult, retryAttempts,
isHA, optimizeForWrite);
}
}
else {
ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
serverRegionExecutor, resultCollector, hasResult, retryAttempts, isHA, optimizeForWrite);
}
}
public void executeFunctionNoAck(String rgnName, Function function,
ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(3),
function, serverRegionExecutor, Byte.valueOf(hasResult));
ExecuteRegionFunctionNoAckOp.execute(this.pool, rgnName, function,
serverRegionExecutor, hasResult);
}
public void executeFunctionNoAck(String rgnName, String functionId,
ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult,
boolean isHA, boolean optimizeForWrite, boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(4),
functionId, serverRegionExecutor, Byte.valueOf(hasResult));
ExecuteRegionFunctionNoAckOp.execute(this.pool, rgnName, functionId,
serverRegionExecutor, hasResult, isHA, optimizeForWrite);
}
public Entry getEntry(Object key) {
recordTXOperation(ServerRegionOperation.GET_ENTRY, key);
return (Entry) GetEntryOp.execute(pool, region, key);
}
/**
* Transaction synchronization notification to the servers
* @see com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub#beforeCompletion()
*/
public void beforeCompletion(int txId) {
TXSynchronizationOp.execute(pool, 0, txId, TXSynchronizationOp.CompletionType.BEFORE_COMPLETION);
}
/**
* Transaction synchronization notification to the servers
* @param status
* @return the server's TXCommitMessage
* @see com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub#afterCompletion(int)
*/
public TXCommitMessage afterCompletion(int status, int txId) {
return TXSynchronizationOp.execute(pool, status, txId, TXSynchronizationOp.CompletionType.AFTER_COMPLETION);
}
public byte[] getFunctionAttributes(String functionId){
return (byte[])GetFunctionAttributeOp.execute(this.pool, functionId);
}
/** test hook */
private void recordTXOperation(ServerRegionOperation op, Object key, Object... arguments) {
if (ClientTXStateStub.transactionRecordingEnabled()) {
TXStateProxy tx = TXManagerImpl.getCurrentTXState();
if (tx == null) {
return;
}
tx.recordTXOperation(this, op, key, arguments);
}
}
}