blob: 54bbde49488d46ad00bc0b0e984d6b333a3e06aa [file] [log] [blame]
/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache.execute;
import java.util.Set;
import com.gemstone.gemfire.cache.LowMemoryException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
* Executes Function on Distributed Regions.
*
* For DistributedRegions with DataPolicy.NORMAL, it throws
* FunctionException. <br>
* For DistributedRegions with DataPolicy.EMPTY, execute the function on any
* random member which has DataPolicy.REPLICATE <br>
* For DistributedRegions with DataPolicy.REPLICATE, execute the function
* locally.
*
* @author Yogesh Mahajan
*
* @since 5.8 LA
*
*/
public class DistributedRegionFunctionExecutor extends AbstractExecution {

  /** Region on which functions are executed; assigned exactly once by every constructor. */
  private final LocalRegion region;

  /**
   * Result sender handed to the region when a function is executed. It is only
   * assigned by the client/server constructor and by the copy constructors, so
   * it stays null for executors created through
   * {@link #DistributedRegionFunctionExecutor(Region)}.
   */
  private ServerToClientFunctionResultSender sender;

  /**
   * Creates an executor bound to the given region.
   *
   * @param r the region to execute functions on; it is cast to
   *          {@link LocalRegion}
   * @throws IllegalArgumentException if {@code r} is null
   */
  public DistributedRegionFunctionExecutor(Region r) {
    if (r == null) {
      throw new IllegalArgumentException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("region"));
    }
    this.region = (LocalRegion)r;
  }

  /**
   * Copy constructor: duplicates the region, filter and result sender of
   * {@code drfe} on top of whatever the superclass copy constructor carries
   * over.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor drfe) {
    super(drfe);
    this.region = drfe.region;
    if (drfe.filter != null) {
      this.filter.clear();
      this.filter.addAll(drfe.filter);
    }
    this.sender = drfe.sender;
  }

  /**
   * Creates an executor that is flagged as running in client/server mode.
   *
   * @param region the region to execute functions on
   * @param filter2 optional filter; when non-null its elements replace the
   *          current filter contents
   * @param args optional function arguments; when non-null they take
   *          precedence and {@code memberMappedArg} is ignored
   * @param memberMappedArg optional member-mapped argument, only used when
   *          {@code args} is null
   * @param resultSender sender passed to the region on execution
   */
  public DistributedRegionFunctionExecutor(DistributedRegion region, Set filter2, Object args,
      MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) {
    if (args != null) {
      this.args = args;
    }
    else if (memberMappedArg != null) {
      this.memberMappedArg = memberMappedArg;
      this.isMemberMappedArgument = true;
    }
    this.sender = resultSender;
    if (filter2 != null) {
      this.filter.clear();
      this.filter.addAll(filter2);
    }
    this.region = region;
    this.isClientServerMode = true;
  }

  /**
   * Copies {@code distributedRegionFunctionExecutor} and installs the given
   * member-mapped argument on the copy.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
      MemberMappedArgument argument) {
    super(distributedRegionFunctionExecutor);
    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();
    this.memberMappedArg = argument;
    this.isMemberMappedArgument = true;
  }

  /**
   * Copies {@code distributedRegionFunctionExecutor} and installs the given
   * result collector on the copy.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
      ResultCollector rs) {
    super(distributedRegionFunctionExecutor);
    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();
    this.rc = rs;
  }

  /**
   * Copies {@code distributedRegionFunctionExecutor} and installs the given
   * function arguments on the copy.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
      Object args) {
    super(distributedRegionFunctionExecutor);
    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();
    this.args = args;
  }

  /**
   * Copies {@code distributedRegionFunctionExecutor} but uses the elements of
   * {@code filter2} as the filter of the copy instead of the source's filter.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
      Set filter2) {
    super(distributedRegionFunctionExecutor);
    this.region = distributedRegionFunctionExecutor.getRegion();
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();
    this.filter.clear();
    this.filter.addAll(filter2);
  }

  /**
   * Copies {@code drfe} (region, filter, sender) and sets the re-execute flag
   * of the copy to {@code isReExecute}.
   */
  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor drfe, boolean isReExecute) {
    super(drfe);
    this.region = drfe.region;
    if (drfe.filter != null) {
      this.filter.clear();
      this.filter.addAll(drfe.filter);
    }
    this.sender = drfe.sender;
    this.isReExecute = isReExecute;
  }

  /**
   * Executes the function registered under {@code functionName}.
   *
   * @param functionName id of a function registered with
   *          {@link FunctionService}
   * @return the collector returned by the execution
   * @throws FunctionException if the name is null, no function is registered
   *           under it, or the region's data policy is NORMAL
   */
  public ResultCollector execute(final String functionName) {
    requireFunctionName(functionName);
    // Cleared before the lookup so the flag is reset even when the lookup fails.
    this.isFnSerializationReqd = false;
    final Function functionObject = findRegisteredFunction(functionName);
    rejectNormalDataPolicy();
    return executeFunction(functionObject);
  }

  /**
   * Executes the function registered under {@code functionName} after
   * verifying its attributes. This overload uses {@code hasResult} for both
   * the hasResult and the isHA attribute and assumes optimizeForWrite is
   * false; use one of the longer overloads for any other combination.
   *
   * @param functionName id of a function registered with
   *          {@link FunctionService}
   * @param hasResult expected hasResult (and isHA) attribute of the function
   * @return the collector returned by the execution
   * @throws FunctionException if the name is null, no function is registered
   *           under it, the region's data policy is NORMAL, or the registered
   *           function's attributes differ from the expected ones
   */
  public ResultCollector execute(String functionName, boolean hasResult)
      throws FunctionException {
    requireFunctionName(functionName);
    final Function functionObject = findRegisteredFunction(functionName);
    rejectNormalDataPolicy();
    requireRegisteredAttributes(functionName, functionObject, hasResult, hasResult, false);
    this.isFnSerializationReqd = false;
    return executeFunction(functionObject);
  }

  /**
   * Executes the function registered under {@code functionName} after
   * verifying its hasResult and isHA attributes; optimizeForWrite is assumed
   * to be false.
   *
   * @param functionName id of a function registered with
   *          {@link FunctionService}
   * @param hasResult expected hasResult attribute of the function
   * @param isHA expected isHA attribute of the function
   * @return the collector returned by the execution
   * @throws FunctionException if the name is null, {@code isHA} is requested
   *           without {@code hasResult}, no function is registered under the
   *           name, the region's data policy is NORMAL, or the registered
   *           function's attributes differ from the expected ones
   */
  public ResultCollector execute(String functionName, boolean hasResult,
      boolean isHA) throws FunctionException {
    requireFunctionName(functionName);
    requireResultWhenHA(isHA, hasResult);
    final Function functionObject = findRegisteredFunction(functionName);
    rejectNormalDataPolicy();
    requireRegisteredAttributes(functionName, functionObject, isHA, hasResult, false);
    this.isFnSerializationReqd = false;
    return executeFunction(functionObject);
  }

  /**
   * Executes the function registered under {@code functionName} after
   * verifying its hasResult, isHA and optimizeForWrite attributes.
   *
   * @param functionName id of a function registered with
   *          {@link FunctionService}
   * @param hasResult expected hasResult attribute of the function
   * @param isHA expected isHA attribute of the function
   * @param isOptimizeForWrite expected optimizeForWrite attribute of the
   *          function
   * @return the collector returned by the execution
   * @throws FunctionException if the name is null, {@code isHA} is requested
   *           without {@code hasResult}, no function is registered under the
   *           name, the region's data policy is NORMAL, or the registered
   *           function's attributes differ from the expected ones
   */
  public ResultCollector execute(String functionName, boolean hasResult,
      boolean isHA, boolean isOptimizeForWrite) throws FunctionException {
    requireFunctionName(functionName);
    requireResultWhenHA(isHA, hasResult);
    final Function functionObject = findRegisteredFunction(functionName);
    rejectNormalDataPolicy();
    requireRegisteredAttributes(functionName, functionObject, isHA, hasResult,
        isOptimizeForWrite);
    this.isFnSerializationReqd = false;
    return executeFunction(functionObject);
  }

  /**
   * Executes the given function instance and marks the function as requiring
   * serialization.
   *
   * @param function the function to execute
   * @return the collector returned by the execution
   * @throws FunctionException if the function is null, is HA without
   *           returning a result, has a null id, or the region's data policy
   *           is NORMAL
   */
  @Override
  public ResultCollector execute(final Function function) {
    if (function == null) {
      throw new FunctionException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("function instance"));
    }
    requireResultWhenHA(function.isHA(), function.hasResult());
    rejectNormalDataPolicy();
    if (function.getId() == null) {
      throw new FunctionException(
          LocalizedStrings.ExecuteFunction_THE_FUNCTION_GET_ID_RETURNED_NULL
              .toLocalizedString());
    }
    this.isFnSerializationReqd = true;
    return executeFunction(function);
  }

  /**
   * Delegates the execution to the region.
   *
   * @param function the function to execute
   * @return the region's collector when the function has a result, otherwise
   *         a new {@link NoResult}
   */
  @Override
  protected ResultCollector executeFunction(Function function) {
    if (!function.hasResult()) {
      // Nothing to collect: run without a collector.
      this.region.executeFunction(this, function, args, null, this.filter,
          this.sender);
      return new NoResult();
    }
    // Use the custom result collector when one was supplied, otherwise a
    // fresh default collector.
    ResultCollector collector = this.rc;
    if (collector == null) {
      collector = new DefaultResultCollector();
    }
    return this.region.executeFunction(this, function, args, collector,
        this.filter, this.sender);
  }

  /**
   * Returns a copy of this executor that uses the given filter.
   *
   * @param filter the filter for the new executor
   * @return a new executor; this instance is not modified
   * @throws FunctionException if {@code filter} is null
   */
  public Execution withFilter(Set filter) {
    if (filter == null) {
      throw new FunctionException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("filter"));
    }
    return new DistributedRegionFunctionExecutor(this, filter);
  }

  /**
   * Rejects any non-empty bucket filter for this executor.
   *
   * @param bucketIDs bucket ids; must be null or empty
   * @return this executor, unchanged
   * @throws IllegalArgumentException if {@code bucketIDs} is non-empty
   */
  @Override
  public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
    if (bucketIDs != null && !bucketIDs.isEmpty()) {
      throw new IllegalArgumentException(
          LocalizedStrings.ExecuteRegionFunction_BUCKET_FILTER_ON_NON_PR
              .toLocalizedString(region.getName()));
    }
    return this;
  }

  /** @return the region this executor runs functions on */
  public LocalRegion getRegion() {
    return this.region;
  }

  /** @return the result sender of this executor; may be null */
  public ServerToClientFunctionResultSender getServerResultSender() {
    return this.sender;
  }

  /**
   * Returns a copy of this executor that uses the given arguments.
   *
   * @param args the function arguments for the new executor
   * @return a new executor; this instance is not modified
   * @throws IllegalArgumentException if {@code args} is null
   */
  public Execution withArgs(Object args) {
    if (args == null) {
      throw new IllegalArgumentException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("Args"));
    }
    return new DistributedRegionFunctionExecutor(this, args);
  }

  /**
   * Returns a copy of this executor that uses the given result collector.
   *
   * @param rs the result collector for the new executor
   * @return a new executor; this instance is not modified
   * @throws IllegalArgumentException if {@code rs} is null
   */
  public Execution withCollector(ResultCollector rs) {
    if (rs == null) {
      throw new IllegalArgumentException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("Result Collector"));
    }
    return new DistributedRegionFunctionExecutor(this, rs);
  }

  /**
   * Returns a copy of this executor that uses the given member-mapped
   * argument.
   *
   * @param argument the member-mapped argument for the new executor
   * @return a new executor; this instance is not modified
   * @throws IllegalArgumentException if {@code argument} is null
   */
  public InternalExecution withMemberMappedArgument(
      MemberMappedArgument argument) {
    if (argument == null) {
      throw new IllegalArgumentException(
          LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString("MemberMappedArgument"));
    }
    return new DistributedRegionFunctionExecutor(this, argument);
  }

  /** @return a copy of this executor with the re-execute flag set to true */
  @Override
  public AbstractExecution setIsReExecute() {
    return new DistributedRegionFunctionExecutor(this, true);
  }

  /** @return a short description containing the arguments and the region name */
  @Override
  public String toString() {
    final StringBuilder buf = new StringBuilder();
    buf.append("[ DistributedRegionFunctionExecutor:");
    buf.append("args=");
    buf.append(this.args);
    buf.append(";region=");
    buf.append(this.region.getName());
    buf.append("]");
    return buf.toString();
  }

  /**
   * Validates that the function may run on the given target members.
   * <p>
   * When the cache's transaction manager reports a transaction state, the
   * function must target exactly one member, and that member must equal the
   * transaction's target; if the transaction has no target yet, the function
   * target is recorded as the transaction target. Independently of that,
   * unless low-memory exceptions are disabled, a function that optimizes for
   * write is rejected when the region's threshold check fails.
   *
   * @param function the function about to be executed
   * @param targetMembers the members selected for the execution
   * @throws TransactionException if a transaction is present and more than
   *           one member is targeted
   * @throws TransactionDataNotColocatedException if the single target differs
   *           from the transaction's target
   * @throws LowMemoryException if the region's threshold check fails for a
   *           function that optimizes for write
   * @see com.gemstone.gemfire.internal.cache.execute.AbstractExecution#validateExecution(com.gemstone.gemfire.cache.execute.Function, java.util.Set)
   */
  @Override
  public void validateExecution(Function function, Set targetMembers) {
    GemFireCacheImpl cache = region.getGemFireCache();
    if (cache != null && cache.getTxManager().getTXState() != null) {
      if (targetMembers.size() > 1) {
        throw new TransactionException(
            LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
                .toLocalizedString());
      } else {
        assert targetMembers.size() == 1;
        DistributedMember funcTarget = (DistributedMember)targetMembers.iterator().next();
        DistributedMember target = cache.getTxManager().getTXState().getTarget();
        if (target == null) {
          // First operation of the transaction: pin it to the function target.
          cache.getTxManager().getTXState().setTarget(funcTarget);
        } else if (!target.equals(funcTarget)) {
          throw new TransactionDataNotColocatedException(
              LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1
                  .toLocalizedString(new Object[] {target, funcTarget}));
        }
      }
    }
    if (!MemoryThresholds.isLowMemoryExceptionDisabled() && function.optimizeForWrite()) {
      try {
        region.checkIfAboveThreshold(null);
      } catch (LowMemoryException e) {
        // Re-thrown with a message naming the function and the members that
        // reached the memory threshold.
        Set<DistributedMember> htrm = region.getMemoryThresholdReachedMembers();
        throw new LowMemoryException(
            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
                .toLocalizedString(new Object[] {function.getId(), htrm}), htrm);
      }
    }
  }

  /** Throws a {@link FunctionException} when no function name was supplied. */
  private void requireFunctionName(String functionName) {
    if (functionName == null) {
      throw new FunctionException(
          LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
              .toLocalizedString());
    }
  }

  /** Throws a {@link FunctionException} when HA is requested for a function without a result. */
  private void requireResultWhenHA(boolean isHA, boolean hasResult) {
    if (isHA && !hasResult) {
      throw new FunctionException(
          LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH
              .toLocalizedString());
    }
  }

  /**
   * Looks up the function registered under {@code functionName} and throws a
   * {@link FunctionException} naming it when nothing is registered.
   */
  private Function findRegisteredFunction(String functionName) {
    final Function functionObject = FunctionService.getFunction(functionName);
    if (functionObject == null) {
      // Report the requested name; the looked-up object is null here and
      // would render the message useless.
      throw new FunctionException(
          LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
              .toLocalizedString(functionName));
    }
    return functionObject;
  }

  /** Throws a {@link FunctionException} when the region's data policy is NORMAL. */
  private void rejectNormalDataPolicy() {
    if (region.getAttributes().getDataPolicy().isNormal()) {
      throw new FunctionException(
          LocalizedStrings.ExecuteRegionFunction_CAN_NOT_EXECUTE_ON_NORMAL_REGION
              .toLocalizedString());
    }
  }

  /**
   * Throws a {@link FunctionException} when the state computed from the
   * registered function's attributes differs from the state computed from the
   * expected attributes.
   */
  private void requireRegisteredAttributes(String functionName,
      Function functionObject, boolean isHA, boolean hasResult,
      boolean isOptimizeForWrite) {
    final byte registeredFunctionState = AbstractExecution.getFunctionState(
        functionObject.isHA(), functionObject.hasResult(),
        functionObject.optimizeForWrite());
    final byte functionState = AbstractExecution.getFunctionState(isHA,
        hasResult, isOptimizeForWrite);
    if (registeredFunctionState != functionState) {
      throw new FunctionException(
          LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
              .toLocalizedString(functionName));
    }
  }
}