blob: e4e764c9abf40aa6c174a826b1a4162d41dbc7a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import static java.util.Collections.emptySet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.ContainsKeyOp.MODE;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.EventIDHolder;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList.Iterator;
import org.apache.geode.internal.cache.tx.ClientTXStateStub;
import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import org.apache.geode.internal.logging.LogService;
/**
* Used to send region operations from a client to a server
*
* @since GemFire 5.7
*/
@SuppressWarnings("deprecation")
public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAccess {
private static final Logger logger = LogService.getLogger();
private final LocalRegion region;
private final String regionName;
/**
* Creates a server region proxy for the given region.
*
* @param r the region
* @throws IllegalStateException if the region does not have a pool
*/
public ServerRegionProxy(Region r) {
super(calcPool(r));
assert r instanceof LocalRegion;
region = (LocalRegion) r;
regionName = r.getFullPath();
}
/**
* Used by tests to create proxies for "fake" regions. Also, used by ClientStatsManager for admin
* region.
*/
public ServerRegionProxy(String regionName, PoolImpl pool) {
super(pool);
region = null;
this.regionName = regionName;
}
private static InternalPool calcPool(Region r) {
String poolName = r.getAttributes().getPoolName();
if (poolName == null || "".equals(poolName)) {
throw new IllegalStateException(
"The region " + r.getFullPath() + " did not have a client pool configured.");
} else {
InternalPool pool = (InternalPool) PoolManager.find(poolName);
if (pool == null) {
throw new IllegalStateException("The pool " + poolName + " does not exist.");
}
return pool;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.cache.client.internal.ServerRegionDataAccess#get(java.lang.Object,
* java.lang.Object)
*/
@Override
public Object get(Object key, Object callbackArg, EntryEventImpl clientEvent) {
recordTXOperation(ServerRegionOperation.GET, key, callbackArg);
return GetOp.execute(pool, region, key, callbackArg,
pool.getPRSingleHopEnabled(), clientEvent);
}
@Override
public int size() {
return SizeOp.execute(pool, regionName);
}
/**
* Do not call this method if the value is Delta instance. Exclicitly passing
* <code>Operation.CREATE</code> to the <code>PutOp.execute()</code> method as the caller of this
* method does not put Delta instances as value.
*
*/
public Object putForMetaRegion(Object key, Object value, byte[] deltaBytes, EntryEventImpl event,
Object callbackArg) {
if (region == null) {
return PutOp.execute(pool, regionName, key, value, deltaBytes, event,
Operation.CREATE, false, null, callbackArg, pool.getPRSingleHopEnabled());
} else {
return PutOp.execute(pool, region, key, value, deltaBytes, event, Operation.CREATE,
false, null, callbackArg, pool.getPRSingleHopEnabled());
}
}
@Override
public Object put(Object key, Object value, byte[] deltaBytes, EntryEventImpl event, Operation op,
boolean requireOldValue, Object expectedOldValue, Object callbackArg, boolean isCreate) {
recordTXOperation(ServerRegionOperation.PUT, key, value, deltaBytes, event.getEventId(), op,
requireOldValue, expectedOldValue, callbackArg, isCreate);
Operation operation = op;
if (!isCreate && region.getDataPolicy() == DataPolicy.EMPTY && op.isCreate()
&& op != Operation.PUT_IF_ABSENT) {
operation = Operation.UPDATE;
}
if (region == null) {
return PutOp.execute(pool, regionName, key, value, deltaBytes, event, operation,
requireOldValue, expectedOldValue, callbackArg, pool.getPRSingleHopEnabled());
} else {
return PutOp.execute(pool, region, key, value, deltaBytes, event, operation,
requireOldValue, expectedOldValue, callbackArg, pool.getPRSingleHopEnabled());
}
}
/**
* Does a region put on the server using the given connection.
*
* @param con the connection to use to send to the server
* @param key the entry key to do the put on
* @param value the entry value to put
* @param eventId the event ID for this put
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void putOnForTestsOnly(Connection con, Object key, Object value, EventID eventId,
Object callbackArg) {
EventIDHolder event = new EventIDHolder(eventId);
PutOp.execute(con, pool, regionName, key, value, event, callbackArg,
pool.getPRSingleHopEnabled());
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.cache.client.internal.ServerRegionDataAccess#destroy(java.lang.Object,
* java.lang.Object, org.apache.geode.cache.Operation, org.apache.geode.internal.cache.EventID,
* java.lang.Object)
*/
@Override
public Object destroy(Object key, Object expectedOldValue, Operation operation,
EntryEventImpl event, Object callbackArg) {
if (event.isBulkOpInProgress()) {
// this is a removeAll, ignore this!
return null;
}
recordTXOperation(ServerRegionOperation.DESTROY, key, expectedOldValue, operation,
event.getEventId(), callbackArg);
return DestroyOp.execute(pool, region, key, expectedOldValue, operation, event,
callbackArg, pool.getPRSingleHopEnabled());
}
@Override
public void invalidate(EntryEventImpl event) {
recordTXOperation(ServerRegionOperation.INVALIDATE, event.getKey(), event);
InvalidateOp.execute(pool, region.getFullPath(), event);
}
/**
* Does a region entry destroy on the server using the given connection.
*
* @param con the connection to use to send to the server
* @param key the entry key to do the destroy on
* @param expectedOldValue the value that the entry must have to perform the operation, or null
* @param operation the operation being performed (Operation.DESTROY, Operation.REMOVE)
* @param event the event for this destroy operation
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyOnForTestsOnly(Connection con, Object key, Object expectedOldValue,
Operation operation, EntryEventImpl event, Object callbackArg) {
DestroyOp.execute(con, pool, regionName, key, expectedOldValue, operation, event,
callbackArg);
}
/**
* Does a region destroy on the server
*
* @param eventId the event id for this destroy
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyRegion(EventID eventId, Object callbackArg) {
DestroyRegionOp.execute(pool, regionName, eventId, callbackArg);
}
/**
* Does a region destroy on the server using the given connection.
*
* @param con the connection to use to send to the server
* @param eventId the event id for this destroy
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void destroyRegionOnForTestsOnly(Connection con, EventID eventId, Object callbackArg) {
DestroyRegionOp.execute(con, pool, regionName, eventId, callbackArg);
}
public TXCommitMessage commit(int txId) {
return CommitOp.execute(pool, txId);
}
public void rollback(int txId) {
RollbackOp.execute(pool, txId);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ServerRegionDataAccess#clear(org.apache.geode.internal.
* cache.EventID, java.lang.Object)
*/
@Override
public void clear(EventID eventId, Object callbackArg) {
ClearOp.execute(pool, regionName, eventId, callbackArg);
}
/**
* Does a region clear on the server using the given connection.
*
* @param con the connection to use to send to the server
* @param eventId the event id for this clear
* @param callbackArg an optional callback arg to pass to any cache callbacks
*/
public void clearOnForTestsOnly(Connection con, EventID eventId, Object callbackArg) {
ClearOp.execute(con, pool, regionName, eventId, callbackArg);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
@Override
public boolean containsKey(Object key) {
recordTXOperation(ServerRegionOperation.CONTAINS_KEY, key);
return ContainsKeyOp.execute(pool, regionName, key, MODE.KEY);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
@Override
public boolean containsValueForKey(Object key) {
recordTXOperation(ServerRegionOperation.CONTAINS_VALUE_FOR_KEY, key);
return ContainsKeyOp.execute(pool, regionName, key, MODE.VALUE_FOR_KEY);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
*/
@Override
public boolean containsValue(Object value) {
recordTXOperation(ServerRegionOperation.CONTAINS_VALUE, null, value);
return ContainsKeyOp.execute(pool, regionName, value, MODE.VALUE);
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.cache.client.internal.ServerRegionDataAccess#keySet()
*/
@Override
public Set keySet() {
recordTXOperation(ServerRegionOperation.KEY_SET, null);
return KeySetOp.execute(pool, regionName);
}
/**
* Does a region registerInterest on a server
*
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterest(final Object key, final int interestType,
final InterestResultPolicy policy, final boolean isDurable, final byte regionDataPolicy) {
return registerInterest(key, interestType, policy, isDurable, false, regionDataPolicy);
}
/**
* Does a region registerInterest on a server
*
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterest(final Object key, final int interestType,
final InterestResultPolicy policy, final boolean isDurable,
final boolean receiveUpdatesAsInvalidates, final byte regionDataPolicy) {
if (interestType == InterestType.KEY && key instanceof List) {
logger.warn(
"Usage of registerInterest(List) has been deprecated. Please use registerInterestForKeys(Iterable)");
return registerInterestList((List) key, policy, isDurable, receiveUpdatesAsInvalidates,
regionDataPolicy);
} else {
final RegisterInterestTracker rit = pool.getRITracker();
List result;
boolean finished = false;
try {
// register with the tracker early
rit.addSingleInterest(region, key, interestType, policy, isDurable,
receiveUpdatesAsInvalidates);
result = RegisterInterestOp.execute(pool, regionName, key, interestType, policy,
isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
//////// TEST PURPOSE ONLY ///////////
if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterInterestRegistration();
}
/////////////////////////////////////////
finished = true;
return result;
} finally {
if (!finished) {
rit.removeSingleInterest(region, key, interestType, isDurable,
receiveUpdatesAsInvalidates);
}
}
}
}
/**
* Support for server-side interest registration
*/
public void addSingleInterest(Object key, int interestType, InterestResultPolicy pol,
boolean isDurable, boolean receiveUpdatesAsInvalidates) {
RegisterInterestTracker rit = pool.getRITracker();
boolean finished = false;
try {
rit.addSingleInterest(region, key, interestType, pol, isDurable,
receiveUpdatesAsInvalidates);
finished = true;
} finally {
if (!finished) {
rit.removeSingleInterest(region, key, interestType, isDurable,
receiveUpdatesAsInvalidates);
}
}
}
public void addListInterest(List keys, InterestResultPolicy pol, boolean isDurable,
boolean receiveUpdatesAsInvalidates) {
RegisterInterestTracker rit = pool.getRITracker();
boolean finished = false;
try {
rit.addInterestList(region, keys, pol, isDurable, receiveUpdatesAsInvalidates);
finished = true;
} finally {
if (!finished) {
rit.removeInterestList(region, keys, isDurable, receiveUpdatesAsInvalidates);
}
}
}
/**
* Support for server-side interest registration
*/
public void removeSingleInterest(Object key, int interestType, boolean isDurable,
boolean receiveUpdatesAsInvalidates) {
pool.getRITracker().removeSingleInterest(region, key, interestType, isDurable,
receiveUpdatesAsInvalidates);
}
public void removeListInterest(List keys, boolean isDurable,
boolean receiveUpdatesAsInvalidates) {
pool.getRITracker().removeInterestList(region, keys, isDurable,
receiveUpdatesAsInvalidates);
}
/**
* Does a region registerInterest on a server described by the given server location
* <p>
* Note that this call by-passes the RegisterInterestTracker.
*
* @param sl the server to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestOn(ServerLocation sl, final Object key, final int interestType,
final InterestResultPolicy policy, final boolean isDurable, final byte regionDataPolicy) {
return registerInterestOn(sl, key, interestType, policy, isDurable, false, regionDataPolicy);
}
/**
* Does a region registerInterest on a server described by the given server location
* <p>
* Note that this call by-passes the RegisterInterestTracker.
*
* @param sl the server to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
private List registerInterestOn(ServerLocation sl, final Object key, final int interestType,
final InterestResultPolicy policy, final boolean isDurable,
final boolean receiveUpdatesAsInvalidates, final byte regionDataPolicy) {
if (interestType == InterestType.KEY && key instanceof List) {
return RegisterInterestListOp.executeOn(sl, pool, regionName, (List) key, policy,
isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
} else {
return RegisterInterestOp.executeOn(sl, pool, regionName, key, interestType, policy,
isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
}
}
/**
* Does a region registerInterest on a server described by the given connection
* <p>
* Note that this call by-passes the RegisterInterestTracker.
*
* @param conn the connection to do the register interest on.
* @param key describes what we are interested in
* @param interestType the {@link InterestType} for this registration
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
List registerInterestOn(Connection conn, final Object key, final int interestType,
final InterestResultPolicy policy, final boolean isDurable,
final boolean receiveUpdatesAsInvalidates, final byte regionDataPolicy) {
if (interestType == InterestType.KEY && key instanceof List) {
return RegisterInterestListOp.executeOn(conn, pool, regionName, (List) key, policy,
isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
} else {
return RegisterInterestOp.executeOn(conn, pool, regionName, key, interestType,
policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
}
}
/**
* Does a region registerInterestList on a server
*
* @param keys list of keys we are interested in
* @param policy the interest result policy for this registration
* @param isDurable true if this registration is durable
* @param regionDataPolicy the data policy ordinal of the region
* @return list of keys
*/
public List registerInterestList(List keys, InterestResultPolicy policy, boolean isDurable,
boolean receiveUpdatesAsInvalidates, final byte regionDataPolicy) {
final RegisterInterestTracker rit = pool.getRITracker();
List result;
boolean finished = false;
try {
// register with the tracker early
rit.addInterestList(region, keys, policy, isDurable, receiveUpdatesAsInvalidates);
result = RegisterInterestListOp.execute(pool, regionName, keys, policy, isDurable,
receiveUpdatesAsInvalidates, regionDataPolicy);
finished = true;
//////// TEST PURPOSE ONLY ///////////
if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterInterestRegistration();
}
/////////////////////////////////////////
return result;
} finally {
if (!finished) {
rit.removeInterestList(region, keys, isDurable, receiveUpdatesAsInvalidates);
}
}
}
/**
* Does a region unregisterInterest on a server
*
* @param key describes what we are no longer interested in
* @param interestType the {@link InterestType} for this unregister
* @param isClosing true if this unregister is done by a close
* @param keepAlive true if this unregister should not undo a durable registration
*/
public void unregisterInterest(Object key, int interestType, boolean isClosing,
boolean keepAlive) {
if (interestType == InterestType.KEY && key instanceof List) {
unregisterInterestList((List) key, isClosing, keepAlive);
} else {
RegisterInterestTracker rit = pool.getRITracker();
boolean removed = rit.removeSingleInterest(region, key, interestType, false, false)
|| rit.removeSingleInterest(region, key, interestType, true, false)
|| rit.removeSingleInterest(region, key, interestType, false, true)
|| rit.removeSingleInterest(region, key, interestType, true, true);
if (removed) {
UnregisterInterestOp.execute(pool, regionName, key, interestType, isClosing,
keepAlive);
}
}
}
/**
* Does a region unregisterInterestList on a server
*
* @param keys list of keys we are interested in
* @param isClosing true if this unregister is done by a close
* @param keepAlive true if this unregister should not undo a durable registration
*/
public void unregisterInterestList(List keys, boolean isClosing, boolean keepAlive) {
RegisterInterestTracker rit = pool.getRITracker();
boolean removed = rit.removeInterestList(region, keys, false, true)
|| rit.removeInterestList(region, keys, false, false)
|| rit.removeInterestList(region, keys, true, true)
|| rit.removeInterestList(region, keys, true, false);
if (removed) {
UnregisterInterestListOp.execute(pool, regionName, keys, isClosing, keepAlive);
}
}
public List getInterestList(int interestType) {
return pool.getRITracker().getInterestList(regionName, interestType);
}
@Override
public VersionedObjectList putAll(Map map, EventID eventId, boolean skipCallbacks,
Object callbackArg) {
recordTXOperation(ServerRegionOperation.PUT_ALL, null, map, eventId);
int txID = TXManagerImpl.getCurrentTXUniqueId();
if (pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
return PutAllOp.execute(pool, region, map, eventId, skipCallbacks,
pool.getRetryAttempts(), callbackArg);
} else {
return PutAllOp.execute(pool, region, map, eventId, skipCallbacks, false,
callbackArg);
}
}
@Override
public VersionedObjectList removeAll(Collection<Object> keys, EventID eventId,
Object callbackArg) {
recordTXOperation(ServerRegionOperation.REMOVE_ALL, null, keys, eventId);
int txID = TXManagerImpl.getCurrentTXUniqueId();
if (pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
return RemoveAllOp.execute(pool, region, keys, eventId,
pool.getRetryAttempts(), callbackArg);
} else {
return RemoveAllOp.execute(pool, region, keys, eventId, false, callbackArg);
}
}
@Override
public VersionedObjectList getAll(List keys, Object callback) {
recordTXOperation(ServerRegionOperation.GET_ALL, null, keys);
int txID = TXManagerImpl.getCurrentTXUniqueId();
VersionedObjectList result;
if (pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
result =
GetAllOp.execute(pool, region, keys, pool.getRetryAttempts(), callback);
} else {
result = GetAllOp.execute(pool, regionName, keys, callback);
}
if (result != null) {
for (Iterator it = result.iterator(); it.hasNext();) {
VersionedObjectList.Entry entry = it.next();
Object key = entry.getKey();
Object value = entry.getValue();
boolean isOnServer = entry.isKeyNotOnServer();
if (!isOnServer) {
if (value instanceof Throwable) {
logger.warn(String.format(
"%s: Caught the following exception attempting to get value for key=%s", value,
key),
(Throwable) value);
}
}
}
}
return result;
}
/**
* Release use of this pool
*/
public void detach(boolean keepalive) {
pool.getRITracker().unregisterRegion(this, keepalive);
super.detach();
}
@Override
public String getRegionName() {
return regionName;
}
@Override
public Region getRegion() {
return region;
}
public void executeFunction(Function function,
ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector,
byte hasResult, final int timeoutMs) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, 1, function,
serverRegionExecutor, resultCollector, hasResult);
int retryAttempts = pool.getRetryAttempts();
boolean inTransaction = TXManagerImpl.getCurrentTXState() != null;
final Supplier<AbstractOp> executeRegionFunctionOpSupplier =
() -> new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(region.getFullPath(),
function, serverRegionExecutor, resultCollector, timeoutMs);
if (pool.getPRSingleHopEnabled() && !inTransaction) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
if (cms.isMetadataStable()) {
if (serverRegionExecutor.getFilter().isEmpty()) {
HashMap<ServerLocation, HashSet<Integer>> serverToBuckets =
cms.groupByServerToAllBuckets(region, function.optimizeForWrite());
if (serverToBuckets == null || serverToBuckets.isEmpty()) {
ExecuteRegionFunctionOp.execute(pool, resultCollector, retryAttempts, function.isHA(),
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
cms.scheduleGetPRMetaData(region, false);
} else {
final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction =
executor -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
region.getFullPath(), function, executor, resultCollector,
hasResult, emptySet(), true, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor,
resultCollector, serverToBuckets, retryAttempts, function.isHA(),
regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
} else {
boolean isBucketFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
Map<ServerLocation, HashSet> serverToFilterMap =
cms.getServerToFilterMap(serverRegionExecutor.getFilter(), region,
function.optimizeForWrite(), isBucketFilter);
if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
ExecuteRegionFunctionOp.execute(pool, resultCollector, retryAttempts, function.isHA(),
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
cms.scheduleGetPRMetaData(region, false);
} else {
final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction =
executor -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
region.getFullPath(), function, executor, resultCollector,
hasResult, emptySet(), isBucketFilter, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
function.isHA(), regionFunctionSingleHopOpFunction,
executeRegionFunctionOpSupplier);
}
}
} else {
cms.scheduleGetPRMetaData(region, false);
ExecuteRegionFunctionOp.execute(pool, resultCollector, retryAttempts, function.isHA(),
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
}
} else {
ExecuteRegionFunctionOp.execute(pool,
resultCollector, retryAttempts, function.isHA(),
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
}
}
public void executeFunction(String functionId,
ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector,
byte hasResult, boolean isHA, boolean optimizeForWrite,
final int timeoutMs) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, 2, functionId,
serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite);
int retryAttempts = pool.getRetryAttempts();
boolean inTransaction = TXManagerImpl.getCurrentTXState() != null;
final Supplier<AbstractOp> executeRegionFunctionOpSupplier =
() -> new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(region.getFullPath(),
functionId, serverRegionExecutor, resultCollector, hasResult, isHA,
optimizeForWrite, true, timeoutMs);
if (pool.getPRSingleHopEnabled() && !inTransaction) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
if (cms.isMetadataStable()) {
if (serverRegionExecutor.getFilter().isEmpty()) {
HashMap<ServerLocation, HashSet<Integer>> serverToBuckets =
cms.groupByServerToAllBuckets(region, optimizeForWrite);
if (serverToBuckets == null || serverToBuckets.isEmpty()) {
ExecuteRegionFunctionOp.execute(pool, resultCollector, retryAttempts, isHA,
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
cms.scheduleGetPRMetaData(region, false);
} else {
final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction =
executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
region.getFullPath(), functionId, executor1, resultCollector, hasResult,
emptySet(), true, isHA, optimizeForWrite, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA,
regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
} else {
boolean isBucketsAsFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
Map<ServerLocation, HashSet> serverToFilterMap = cms.getServerToFilterMap(
serverRegionExecutor.getFilter(), region, optimizeForWrite, isBucketsAsFilter);
if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
ExecuteRegionFunctionOp.execute(pool, resultCollector, retryAttempts, isHA,
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
cms.scheduleGetPRMetaData(region, false);
} else {
final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction =
executor -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
region.getFullPath(), functionId, executor, resultCollector, hasResult,
emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
}
} else {
cms.scheduleGetPRMetaData(region, false);
ExecuteRegionFunctionOp.execute(pool,
resultCollector, retryAttempts, isHA,
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
}
} else {
ExecuteRegionFunctionOp.execute(pool,
resultCollector, retryAttempts, isHA,
(ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl) executeRegionFunctionOpSupplier
.get(),
false, emptySet());
}
}
public void executeFunctionNoAck(String rgnName, Function function,
ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, 3, function,
serverRegionExecutor, hasResult);
ExecuteRegionFunctionNoAckOp.execute(pool, rgnName, function, serverRegionExecutor,
hasResult);
}
public void executeFunctionNoAck(String rgnName, String functionId,
ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean isHA,
boolean optimizeForWrite, boolean replaying) {
recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, 4, functionId,
serverRegionExecutor, hasResult);
ExecuteRegionFunctionNoAckOp.execute(pool, rgnName, functionId, serverRegionExecutor,
hasResult, isHA, optimizeForWrite);
}
@Override
public Entry getEntry(Object key) {
recordTXOperation(ServerRegionOperation.GET_ENTRY, key);
return (Entry) GetEntryOp.execute(pool, region, key);
}
/**
* Transaction synchronization notification to the servers
*
* @see org.apache.geode.internal.cache.tx.ClientTXStateStub#beforeCompletion()
*/
public void beforeCompletion(int txId) {
TXSynchronizationOp.execute(pool, 0, txId,
TXSynchronizationOp.CompletionType.BEFORE_COMPLETION);
}
/**
* Transaction synchronization notification to the servers
*
* @return the server's TXCommitMessage
* @see org.apache.geode.internal.cache.tx.ClientTXStateStub#afterCompletion(int)
*/
public TXCommitMessage afterCompletion(int status, int txId) {
return TXSynchronizationOp.execute(pool, status, txId,
TXSynchronizationOp.CompletionType.AFTER_COMPLETION);
}
public byte[] getFunctionAttributes(String functionId) {
return (byte[]) GetFunctionAttributeOp.execute(pool, functionId);
}
/** test hook */
private void recordTXOperation(ServerRegionOperation op, Object key, Object... arguments) {
if (ClientTXStateStub.transactionRecordingEnabled()) {
TXStateProxy tx = TXManagerImpl.getCurrentTXState();
if (tx == null) {
return;
}
tx.recordTXOperation(this, op, key, arguments);
}
}
}