blob: 24d20ae7a00dda9932232698b0f650b995b62459 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Executes Function with FunctionService#onRegion(Region region) in client server mode.
 *
 * @see FunctionService#onRegion(Region)
 * @since GemFire 5.8 LA
 */
public class ServerRegionFunctionExecutor extends AbstractExecution {
// Class-wide logger; used for the debug trace written when a server region proxy is found.
private static final Logger logger = LogService.getLogger();
// Region this executor targets; assigned exactly once by every constructor.
private final LocalRegion region;
// True on executors created through withBucketFilter; withFilter resets it to false.
// Exposed through getExecuteOnBucketSetFlag().
private boolean executeOnBucketSet = false;
/**
 * Creates an executor bound to the given region.
 *
 * @param r the target region; must not be null, and is cast to {@link LocalRegion}
 * @param proxyCache stored in the {@code proxyCache} field; the execute paths treat a null value
 *        as "no per-user cache"
 * @throws IllegalArgumentException if {@code r} is null
 */
ServerRegionFunctionExecutor(Region r, ProxyCache proxyCache) {
if (r == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
// NOTE(review): the String.format argument and the closing braces of the null check are absent
// from this extraction; restore them from the upstream file before compiling.
region = (LocalRegion) r;
this.proxyCache = proxyCache;
/**
 * Copy constructor selected by {@code setArguments}: reuses the source executor's region and
 * bucket-set flag and installs the new function arguments.
 * NOTE(review): no closing brace is visible and other lines may have been dropped from this
 * extraction (nothing else of the source executor's state is copied here); confirm against the
 * upstream file.
 *
 * @param serverRegionFunctionExecutor the executor to copy from
 * @param args the function arguments for the new executor
 */
private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
Object args) {
region = serverRegionFunctionExecutor.region;
this.args = args;
executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
/**
 * Copy constructor selected by {@code withMemberMappedArgument}: reuses the source executor's
 * region and bucket-set flag and installs the member-mapped argument.
 * NOTE(review): no closing brace is visible and other lines may have been dropped from this
 * extraction; confirm against the upstream file.
 *
 * @param serverRegionFunctionExecutor the executor to copy from
 * @param memberMapargs the member-mapped argument for the new executor
 */
private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
MemberMappedArgument memberMapargs) {
region = serverRegionFunctionExecutor.region;
memberMappedArg = memberMapargs;
executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
/**
 * Copy constructor selected by {@code withCollector}: reuses the source executor's region and
 * bucket-set flag and installs the result collector.
 * NOTE(review): no closing brace is visible and other lines may have been dropped from this
 * extraction; confirm against the upstream file.
 *
 * @param serverRegionFunctionExecutor the executor to copy from
 * @param rc the caller's collector; may be null, in which case no collector is stored
 */
private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
ResultCollector rc) {
region = serverRegionFunctionExecutor.region;
// A non-null collector is wrapped in SynchronizedResultCollector before being stored.
this.rc = rc != null ? new SynchronizedResultCollector(rc) : null;
executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
/**
 * Copy constructor selected by {@code withFilter}: reuses the source executor's region and
 * bucket-set flag.
 * NOTE(review): {@code filter2} is never read in the lines visible here and no closing brace is
 * present, so the statements that apply the filter appear to have been dropped from this
 * extraction; confirm against the upstream file.
 *
 * @param serverRegionFunctionExecutor the executor to copy from
 * @param filter2 the filter requested by the caller
 */
private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
Set filter2) {
region = serverRegionFunctionExecutor.region;
executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
/**
 * Copy constructor selected by {@code withBucketFilter}: reuses the source executor's region and
 * takes the bucket-set flag from the caller instead of from the source executor.
 * NOTE(review): {@code bucketsAsFilter} is never read in the lines visible here and no closing
 * brace is present, so the statements that apply the bucket ids appear to have been dropped from
 * this extraction; confirm against the upstream file.
 *
 * @param serverRegionFunctionExecutor the executor to copy from
 * @param bucketsAsFilter the bucket ids requested by the caller
 * @param executeOnBucketSet value stored in the new executor's bucket-set flag
 */
private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
Set<Integer> bucketsAsFilter, boolean executeOnBucketSet) {
region = serverRegionFunctionExecutor.region;
this.executeOnBucketSet = executeOnBucketSet;
/**
 * Returns a new executor derived from this one for the given filter.
 *
 * @param fltr the filter set; must not be null
 * @return a new {@code ServerRegionFunctionExecutor} built from this executor and {@code fltr}
 * @throws FunctionException if {@code fltr} is null
 */
public Execution withFilter(Set fltr) {
if (fltr == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
// NOTE(review): the String.format argument and the closing braces of the null check are absent
// from this extraction; restore them from the upstream file before compiling.
// The flag is cleared on THIS executor first; the copy constructor then reads it from its source,
// so the returned executor also carries false. Note that this mutates the receiver.
executeOnBucketSet = false;
return new ServerRegionFunctionExecutor(this, fltr);
public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
if (bucketIDs == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"buckets as filter"));
return new ServerRegionFunctionExecutor(this, bucketIDs, true /* execute on bucketset */);
/**
 * Executes the given function object on the server.
 *
 * <p>When the function reports {@code hasResult()}, the call goes through
 * {@code executeOnServer} with either the collector configured on this executor or a new
 * {@code DefaultResultCollector}; otherwise it goes through {@code executeOnServerNoAck} and a
 * {@code NoResult} is returned.
 *
 * NOTE(review): this extraction has lost the closing braces and the body of the {@code finally}
 * clause; confirm the per-user setup and cleanup around the call against the upstream file.
 *
 * @param function the function to execute
 * @param timeout the timeout value, converted to milliseconds with {@code TimeoutHelper.toMillis}
 * @param unit the unit of {@code timeout}
 * @return the collector receiving results, or a {@code NoResult} for functions without results
 */
protected ResultCollector executeFunction(final Function function, long timeout, TimeUnit unit) {
byte hasResult = 0;
try {
// A non-null proxyCache that is already closed aborts the call with the cache's own exception.
if (proxyCache != null) {
if (proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
if (function.hasResult()) { // the function returns results
final int timeoutMs = TimeoutHelper.toMillis(timeout, unit);
hasResult = 1;
if (rc == null) { // no collector configured: use a default one
ResultCollector defaultCollector = new DefaultResultCollector();
return executeOnServer(function, defaultCollector, hasResult, timeoutMs);
} else { // collector configured on this executor
return executeOnServer(function, rc, hasResult, timeoutMs);
} else { // the function returns no results
executeOnServerNoAck(function, hasResult);
return new NoResult();
} finally {
/**
 * Executes a function identified only by its id on the server, using attributes supplied by the
 * caller instead of a local function object.
 *
 * <p>When {@code resultReq} is true, the call goes through {@code executeOnServer} with either
 * the collector configured on this executor or a new {@code DefaultResultCollector}; otherwise it
 * goes through {@code executeOnServerNoAck} and a {@code NoResult} is returned.
 *
 * NOTE(review): this extraction has lost the closing braces, the tail of the first
 * {@code executeOnServer} call, and the body of the {@code finally} clause; confirm against the
 * upstream file.
 *
 * @param functionId the id of the function to execute
 * @param resultReq whether the function returns results
 * @param isHA forwarded unchanged to the server call
 * @param optimizeForWrite forwarded unchanged to the server call
 * @param timeout the timeout value, converted to milliseconds with {@code TimeoutHelper.toMillis}
 * @param unit the unit of {@code timeout}
 * @return the collector receiving results, or a {@code NoResult} when no result is requested
 */
protected ResultCollector executeFunction(final String functionId, boolean resultReq,
boolean isHA, boolean optimizeForWrite, long timeout, TimeUnit unit) {
try {
// A non-null proxyCache that is already closed aborts the call with the cache's own exception.
if (proxyCache != null) {
if (proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
byte hasResult = 0;
if (resultReq) { // the function returns results
hasResult = 1;
final int timeoutMs = TimeoutHelper.toMillis(timeout, unit);
if (rc == null) { // no collector configured: use a default one
ResultCollector defaultCollector = new DefaultResultCollector();
return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite,
// NOTE(review): the final argument of the call above is missing from this extraction; the
// sibling call below passes timeoutMs in that position.
} else { // collector configured on this executor
return executeOnServer(functionId, rc, hasResult, isHA, optimizeForWrite, timeoutMs);
} else { // the function returns no results
executeOnServerNoAck(functionId, hasResult, isHA, optimizeForWrite);
return new NoResult();
} finally {
private ResultCollector executeOnServer(Function function, ResultCollector collector,
byte hasResult, int timeoutMs) throws FunctionException {
ServerRegionProxy srp = getServerRegionProxy();
FunctionStats stats =
FunctionStatsManager.getFunctionStats(function.getId(), region.getSystem());
long start = stats.startFunctionExecution(true);
try {
validateExecution(function, null);
srp.executeFunction(function, this, collector, hasResult,
stats.endFunctionExecution(start, true);
return collector;
} catch (FunctionException functionException) {
stats.endFunctionExecutionWithException(start, true);
throw functionException;
} catch (Exception exception) {
stats.endFunctionExecutionWithException(start, true);
throw new FunctionException(exception);
private ResultCollector executeOnServer(String functionId, ResultCollector collector,
byte hasResult, boolean isHA, boolean optimizeForWrite, int timeoutMs)
throws FunctionException {
ServerRegionProxy srp = getServerRegionProxy();
FunctionStats stats = FunctionStatsManager.getFunctionStats(functionId, region.getSystem());
long start = stats.startFunctionExecution(true);
try {
validateExecution(null, null);
srp.executeFunction(functionId, this, collector, hasResult, isHA,
optimizeForWrite, timeoutMs);
stats.endFunctionExecution(start, true);
return collector;
} catch (FunctionException functionException) {
stats.endFunctionExecutionWithException(start, true);
throw functionException;
} catch (Exception exception) {
stats.endFunctionExecutionWithException(start, true);
throw new FunctionException(exception);
private void executeOnServerNoAck(Function function, byte hasResult) throws FunctionException {
ServerRegionProxy srp = getServerRegionProxy();
FunctionStats stats =
FunctionStatsManager.getFunctionStats(function.getId(), region.getSystem());
long start = stats.startFunctionExecution(false);
try {
validateExecution(function, null);
srp.executeFunctionNoAck(region.getFullPath(), function, this, hasResult, false);
stats.endFunctionExecution(start, false);
} catch (FunctionException functionException) {
stats.endFunctionExecutionWithException(start, false);
throw functionException;
} catch (Exception exception) {
stats.endFunctionExecutionWithException(start, false);
throw new FunctionException(exception);
private void executeOnServerNoAck(String functionId, byte hasResult, boolean isHA,
boolean optimizeForWrite) throws FunctionException {
ServerRegionProxy srp = getServerRegionProxy();
FunctionStats stats = FunctionStatsManager.getFunctionStats(functionId, region.getSystem());
long start = stats.startFunctionExecution(false);
try {
validateExecution(null, null);
srp.executeFunctionNoAck(region.getFullPath(), functionId, this, hasResult, isHA,
optimizeForWrite, false);
stats.endFunctionExecution(start, false);
} catch (FunctionException functionException) {
stats.endFunctionExecutionWithException(start, false);
throw functionException;
} catch (Exception exception) {
stats.endFunctionExecutionWithException(start, false);
throw new FunctionException(exception);
private ServerRegionProxy getServerRegionProxy() throws FunctionException {
ServerRegionProxy srp = region.getServerProxy();
if (srp != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found server region proxy on region. RegionName: {}", region.getName());
return srp;
} else {
String message = srp + ": "
+ "No available connection was found. Server Region Proxy is not available for this region "
+ region.getName();
throw new FunctionException(message);
public LocalRegion getRegion() {
return region;
public String toString() {
return "[ ServerRegionExecutor:" + "args=" + args + " ;filter=" + filter + " ;region="
+ region.getName() + "]";
/**
 * Returns a new executor derived from this one that carries the given function arguments.
 *
 * @param args the function arguments; must not be null
 * @return a new {@code ServerRegionFunctionExecutor} built from this executor and {@code args}
 * @throws FunctionException if {@code args} is null
 */
public Execution setArguments(Object args) {
if (args == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
// NOTE(review): the String.format argument and the closing braces of the null check are absent
// from this extraction; restore them from the upstream file before compiling.
return new ServerRegionFunctionExecutor(this, args);
public Execution withArgs(Object args) {
return setArguments(args);
public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"Result Collector"));
return new ServerRegionFunctionExecutor(this, rs);
/**
 * Returns a new executor derived from this one that carries the given member-mapped argument.
 *
 * @param argument the member-mapped argument; must not be null
 * @return a new {@code ServerRegionFunctionExecutor} built from this executor and
 *         {@code argument}
 * @throws FunctionException if {@code argument} is null
 */
public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
if (argument == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
// NOTE(review): the String.format argument and the closing braces of the null check are absent
// from this extraction; restore them from the upstream file before compiling.
return new ServerRegionFunctionExecutor(this, argument);
/**
 * Touches the current transaction, if any, before a function is sent to the server: when a cache
 * instance exists and its transaction manager reports a current TX state, that state is cast to
 * {@link TXStateProxyImpl} and {@code getRealDeal(null, region)} is invoked on it.
 *
 * NOTE(review): closing braces are absent from this extraction and further statements may have
 * been dropped with them; confirm the body against the upstream file.
 *
 * @param function not read by the visible body
 * @param targetMembers not read by the visible body
 */
public void validateExecution(Function function, Set targetMembers) {
InternalCache cache = GemFireCacheImpl.getInstance();
// Nothing is done when there is no cache or no transaction state on the current thread.
if (cache != null && cache.getTxManager().getTXState() != null) {
TXStateProxyImpl tx = (TXStateProxyImpl) cache.getTxManager().getTXState();
tx.getRealDeal(null, region);
public ResultCollector execute(final String functionName) {
return execute(functionName, getTimeoutMs(), TimeUnit.MILLISECONDS);
/**
 * Executes the function registered under the given name.
 *
 * <p>If {@code FunctionService.getFunction} finds a function object, that object is executed.
 * Otherwise the function's attribute bytes are taken from {@code getFunctionAttributes}, or
 * fetched from the server proxy and stored with {@code addFunctionAttributes} when none are
 * known yet, and the function is executed by id with the decoded flags.
 *
 * NOTE(review): this extraction has lost the closing braces and the body of the {@code finally}
 * clause; confirm the per-user setup and cleanup around the server call against the upstream
 * file.
 *
 * @param functionName the name of the function to execute; must not be null
 * @param timeout the timeout value forwarded to {@code executeFunction}
 * @param unit the unit of {@code timeout}
 * @return the collector produced by the selected {@code executeFunction} overload
 * @throws FunctionException if {@code functionName} is null
 */
public ResultCollector execute(final String functionName, long timeout, TimeUnit unit) {
if (functionName == null) {
throw new FunctionException(
"The input function for the execute function request is null");
// NOTE(review): timeoutInMs is never read in the visible body; the raw timeout and unit are what
// get forwarded below.
int timeoutInMs = (int) TimeUnit.MILLISECONDS.convert(timeout, unit);
isFnSerializationReqd = false;
Function functionObject = FunctionService.getFunction(functionName);
if (functionObject == null) {
// Not available as a local function object: fall back to the attribute bytes known for this name.
byte[] functionAttributes = getFunctionAttributes(functionName);
if (functionAttributes == null) {
// GEODE-5618: Set authentication properties before executing the internal function.
try {
if (proxyCache != null) {
if (proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
ServerRegionProxy srp = getServerRegionProxy();
Object obj = srp.getFunctionAttributes(functionName);
functionAttributes = (byte[]) obj;
// Store the fetched attributes under the function name.
addFunctionAttributes(functionName, functionAttributes);
} finally {
// Attribute layout as read here: index 0 = hasResult, index 1 = isHA, index 2 = optimizeForWrite;
// a value of 1 means true.
boolean isHA = functionAttributes[1] == 1;
boolean hasResult = functionAttributes[0] == 1;
boolean optimizeForWrite = functionAttributes[2] == 1;
return executeFunction(functionName, hasResult, isHA, optimizeForWrite, timeout, unit);
} else {
return executeFunction(functionObject, timeout, unit);
public boolean getExecuteOnBucketSetFlag() {
return executeOnBucketSet;