blob: 395332bcf582fcb519eb1f4ff92a2c2f348bb4b2 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.execute;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import com.gemstone.gemfire.cache.LowMemoryException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.SetUtils;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
*
* @author ymahajan
*
*/
public class PartitionedRegionFunctionExecutor extends AbstractExecution {
private final PartitionedRegion pr;
private ServerToClientFunctionResultSender sender;
private boolean executeOnBucketSet = false;
private boolean isPRSingleHop = false;
public PartitionedRegionFunctionExecutor(Region r) {
if (r == null) {
throw new IllegalArgumentException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("region"));
}
this.pr = (PartitionedRegion)r;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe) {
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.isReExecute = prfe.isReExecute;
if (prfe.filter != null) {
this.filter.clear();
this.filter.addAll(prfe.filter);
}
if (prfe.sender != null) {
this.sender = prfe.sender;
}
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe,
MemberMappedArgument argument) {
// super copies args, rc and memberMappedArgument
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.filter.clear();
this.filter.addAll(prfe.filter);
this.sender = prfe.sender;
// override member mapped arguments
this.memberMappedArg = argument;
this.isMemberMappedArgument = true;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe,
ResultCollector rs) {
// super copies args, rc and memberMappedArgument
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.filter.clear();
this.filter.addAll(prfe.filter);
this.sender = prfe.sender;
// override ResultCollector
this.rc = rs;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe,
Object arguments) {
// super copies args, rc and memberMappedArgument
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.filter.clear();
this.filter.addAll(prfe.filter);
this.sender = prfe.sender;
// override arguments
this.args = arguments;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe,
Set filter2) {
// super copies args, rc and memberMappedArgument
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.sender = prfe.sender;
this.filter.clear();
this.filter.addAll(filter2);
this.isReExecute = prfe.isReExecute;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe,
Set<Integer> bucketsAsFilter, boolean executeOnBucketSet) {
// super copies args, rc and memberMappedArgument
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
this.sender = prfe.sender;
this.filter.clear();
this.filter.addAll(bucketsAsFilter);
this.isReExecute = prfe.isReExecute;
}
private PartitionedRegionFunctionExecutor(
PartitionedRegionFunctionExecutor prfe, boolean isReExecute) {
super(prfe);
this.pr = prfe.pr;
this.executeOnBucketSet = prfe.executeOnBucketSet;
this.isPRSingleHop = prfe.isPRSingleHop;
if (prfe.filter != null) {
this.filter.clear();
this.filter.addAll(prfe.filter);
}
if (prfe.sender != null) {
this.sender = prfe.sender;
}
this.isReExecute = isReExecute;
this.isClientServerMode = prfe.isClientServerMode;
if (prfe.failedNodes != null) {
this.failedNodes.clear();
this.failedNodes.addAll(prfe.failedNodes);
}
}
public PartitionedRegionFunctionExecutor(PartitionedRegion region, Set filter2, Object args,
MemberMappedArgument memberMappedArg,
ServerToClientFunctionResultSender resultSender, Set failedNodes,
boolean executeOnBucketSet) {
this.pr = region;
this.sender = resultSender;
this.isClientServerMode = true ;
this.executeOnBucketSet = executeOnBucketSet;
if (filter2 != null) {
this.filter.clear();
this.filter.addAll(filter2);
}
if (args != null) {
this.args = args;
}
else if (memberMappedArg != null) {
this.memberMappedArg = memberMappedArg;
this.isMemberMappedArgument = true;
}
if (failedNodes != null) {
this.failedNodes.clear();
this.failedNodes.addAll(failedNodes);
}
}
public PartitionedRegionFunctionExecutor(PartitionedRegion region,
Set filter2, Object args, MemberMappedArgument memberMappedArg,
ServerToClientFunctionResultSender resultSender, Set failedNodes,
boolean executeOnBucketSet, boolean isPRSingleHop) {
this.pr = region;
this.sender = resultSender;
this.isClientServerMode = true ;
this.executeOnBucketSet = executeOnBucketSet;
this.isPRSingleHop = isPRSingleHop;
if (filter2 != null) {
this.filter.clear();
this.filter.addAll(filter2);
}
if (args != null) {
this.args = args;
}
else if (memberMappedArg != null) {
this.memberMappedArg = memberMappedArg;
this.isMemberMappedArgument = true;
}
if (failedNodes != null) {
this.failedNodes.clear();
this.failedNodes.addAll(failedNodes);
}
}
public ResultCollector executeFunction(final Function function) {
if (function.hasResult()) {
if (this.rc == null) {
return this.pr.executeFunction(function, this,
new DefaultResultCollector(), this.executeOnBucketSet);
}
else {
return this.pr.executeFunction(function, this, rc,
this.executeOnBucketSet);
}
}
else { /* NO RESULT:fire-n-forget */
this.pr.executeFunction(function, this, null, this.executeOnBucketSet);
return new NoResult();
}
}
public Execution withFilter(Set filter) {
if (filter == null) {
throw new FunctionException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("filter"));
}
this.executeOnBucketSet = false;
return new PartitionedRegionFunctionExecutor(this, filter);
}
public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
if (bucketIDs == null) {
throw new FunctionException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("buckets as filter"));
}
else if (bucketIDs.isEmpty()) {
throw new FunctionException("Bucket IDs list is empty");
}
Set<Integer> actualBucketSet = pr.getRegionAdvisor().getBucketSet();
bucketIDs.retainAll(actualBucketSet);
Iterator<Integer> it = bucketIDs.iterator();
while (it.hasNext()) {
int bid = it.next();
if (!actualBucketSet.contains(bid)) {
throw new FunctionException("Bucket " + bid + " does not exist.");
}
}
if (bucketIDs.isEmpty()) {
throw new FunctionException("No valid buckets to execute on");
}
return new PartitionedRegionFunctionExecutor(this, bucketIDs, true);
}
public LocalRegion getRegion() {
return this.pr;
}
public ServerToClientFunctionResultSender getServerResultSender() {
return this.sender;
}
public Execution withArgs(Object args) {
if (args == null) {
throw new FunctionException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("args"));
}
return new PartitionedRegionFunctionExecutor(this, args);
}
public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new FunctionException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("Result Collector"));
}
return new PartitionedRegionFunctionExecutor(this, rs);
}
public AbstractExecution setIsReExecute() {
return new PartitionedRegionFunctionExecutor(this, true);
}
public boolean isPrSingleHop(){
return this.isPRSingleHop;
}
public InternalExecution withMemberMappedArgument(
MemberMappedArgument argument) {
if (argument == null) {
throw new FunctionException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("MemberMapped arg"));
}
return new PartitionedRegionFunctionExecutor(this,argument);
}
@Override
public String toString() {
final StringBuffer buf = new StringBuffer();
buf.append("[ PartitionedRegionFunctionExecutor:");
buf.append("args=");
buf.append(this.args);
buf.append(";filter=");
buf.append(this.filter);
buf.append(";region=");
buf.append(this.pr.getName());
buf.append("]");
return buf.toString();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.execute.AbstractExecution#validateExecution(com.gemstone.gemfire.cache.execute.Function, java.util.Set)
*/
@Override
public void validateExecution(Function function, Set targetMembers) {
GemFireCacheImpl cache = pr.getGemFireCache();
if (cache != null && cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
.toLocalizedString());
} else {
assert targetMembers.size() == 1;
DistributedMember funcTarget = (DistributedMember)targetMembers.iterator().next();
DistributedMember target = cache.getTxManager().getTXState().getTarget();
if (target == null) {
cache.getTxManager().getTXState().setTarget(funcTarget);
} else if (!target.equals(funcTarget)) {
throw new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1
.toLocalizedString(new Object[] {target,funcTarget}));
}
}
}
if (function.optimizeForWrite() && cache.getResourceManager().getHeapMonitor().
containsHeapCriticalMembers(targetMembers) &&
!MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(
new Object[] {function.getId(), sm}), sm);
}
}
}