blob: 18e6f84aa01460981c470bfc755d49b3c4543fc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import java.util.Set;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.MemoryThresholdInfo;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.MemoryThresholds;
/**
* Executes Function on Distributed Regions.
*
* For DistributedRegions with DataPolicy.NORMAL, it throws UnsupportedOperationException. <br>
* For DistributedRegions with DataPolicy.EMPTY, execute the function on any random member which has
* DataPolicy.REPLICATE <br>
* For DistributedRegions with DataPolicy.REPLICATE, execute the function locally.
*
*
* @since GemFire 5.8 LA
*
*/
public class DistributedRegionFunctionExecutor extends AbstractExecution {
private final LocalRegion region;
private ServerToClientFunctionResultSender sender;
public DistributedRegionFunctionExecutor(Region r) {
if (r == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"region"));
}
this.region = (LocalRegion) r;
}
private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe) {
super(drfe);
this.region = drfe.region;
if (drfe.filter != null) {
this.filter.clear();
this.filter.addAll(drfe.filter);
}
this.sender = drfe.sender;
}
public DistributedRegionFunctionExecutor(DistributedRegion region, Set filter2, Object args,
MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) {
if (args != null) {
this.args = args;
} else if (memberMappedArg != null) {
this.memberMappedArg = memberMappedArg;
this.isMemberMappedArgument = true;
}
this.sender = resultSender;
if (filter2 != null) {
this.filter.clear();
this.filter.addAll(filter2);
}
this.region = region;
this.isClientServerMode = true;
}
private DistributedRegionFunctionExecutor(
DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
MemberMappedArgument argument) {
super(distributedRegionFunctionExecutor);
this.region = distributedRegionFunctionExecutor.getRegion();
this.filter.clear();
this.filter.addAll(distributedRegionFunctionExecutor.filter);
this.sender = distributedRegionFunctionExecutor.getServerResultSender();
this.memberMappedArg = argument;
this.isMemberMappedArgument = true;
}
private DistributedRegionFunctionExecutor(
DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, ResultCollector rs) {
super(distributedRegionFunctionExecutor);
this.region = distributedRegionFunctionExecutor.getRegion();
this.filter.clear();
this.filter.addAll(distributedRegionFunctionExecutor.filter);
this.sender = distributedRegionFunctionExecutor.getServerResultSender();
this.rc = rs;
}
private DistributedRegionFunctionExecutor(
DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, Object args) {
super(distributedRegionFunctionExecutor);
this.region = distributedRegionFunctionExecutor.getRegion();
this.filter.clear();
this.filter.addAll(distributedRegionFunctionExecutor.filter);
this.sender = distributedRegionFunctionExecutor.getServerResultSender();
this.args = args;
}
private DistributedRegionFunctionExecutor(
DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, Set filter2) {
super(distributedRegionFunctionExecutor);
this.region = distributedRegionFunctionExecutor.getRegion();
this.sender = distributedRegionFunctionExecutor.getServerResultSender();
this.filter.clear();
this.filter.addAll(filter2);
}
private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe,
boolean isReExecute) {
super(drfe);
this.region = drfe.region;
if (drfe.filter != null) {
this.filter.clear();
this.filter.addAll(drfe.filter);
}
this.sender = drfe.sender;
this.isReExecute = isReExecute;
}
@Override
public ResultCollector execute(final String functionName) {
if (functionName == null) {
throw new FunctionException(
"The input function for the execute function request is null");
}
this.isFnSerializationReqd = false;
Function functionObject = FunctionService.getFunction(functionName);
if (functionObject == null) {
throw new FunctionException(
String.format("Function named %s is not registered to FunctionService",
functionName));
}
if (region.getAttributes().getDataPolicy().isNormal()) {
throw new FunctionException(
"Function execution on region with DataPolicy.NORMAL is not supported");
}
return executeFunction(functionObject);
}
@Override
public ResultCollector execute(final Function function) {
if (function == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"function instance"));
}
if (function.isHA() && !function.hasResult()) {
throw new FunctionException(
"For Functions with isHA true, hasResult must also be true.");
}
if (region.getAttributes().getDataPolicy().isNormal()) {
throw new FunctionException(
"Function execution on region with DataPolicy.NORMAL is not supported");
}
String id = function.getId();
if (id == null) {
throw new FunctionException(
"The Function#getID() returned null");
}
this.isFnSerializationReqd = true;
return executeFunction(function);
}
@Override
protected ResultCollector executeFunction(Function function) {
if (function.hasResult()) { // have Results
if (this.rc == null) { // Default Result Collector
ResultCollector defaultCollector = new DefaultResultCollector();
return this.region.executeFunction(this, function, args, defaultCollector, this.filter,
this.sender);
} else { // Custome Result COllector
return this.region.executeFunction(this, function, args, rc, this.filter, this.sender);
}
} else { // No results
this.region.executeFunction(this, function, args, null, this.filter, this.sender);
return new NoResult();
}
}
@Override
public Execution withFilter(Set filter) {
if (filter == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"filter"));
}
return new DistributedRegionFunctionExecutor(this, filter);
}
@Override
public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
if (bucketIDs != null && !bucketIDs.isEmpty()) {
throw new IllegalArgumentException(
String.format("Buckets as filter cannot be applied to a non partitioned region: %s",
region.getName()));
}
return this;
}
public LocalRegion getRegion() {
return this.region;
}
public ServerToClientFunctionResultSender getServerResultSender() {
return this.sender;
}
@Override
public Execution setArguments(Object args) {
if (args == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"Args"));
}
return new DistributedRegionFunctionExecutor(this, args);
}
@Override
public Execution withArgs(Object args) {
return setArguments(args);
}
@Override
public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"Result Collector"));
}
return new DistributedRegionFunctionExecutor(this, rs);
}
@Override
public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
if (argument == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"MemberMappedArgument"));
}
return new DistributedRegionFunctionExecutor(this, argument);
}
@Override
public AbstractExecution setIsReExecute() {
return new DistributedRegionFunctionExecutor(this, true);
}
@Override
public String toString() {
final StringBuffer buf = new StringBuffer();
buf.append("[ DistributedRegionFunctionExecutor:");
buf.append("args=");
buf.append(this.args);
buf.append(";region=");
buf.append(this.region.getName());
buf.append("]");
return buf.toString();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode.
* cache.execute.Function, java.util.Set)
*/
@Override
public void validateExecution(Function function, Set targetMembers) {
InternalCache cache = region.getGemFireCache();
if (cache != null && cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(
"Function inside a transaction cannot execute on more than one node");
} else {
assert targetMembers.size() == 1;
DistributedMember funcTarget = (DistributedMember) targetMembers.iterator().next();
DistributedMember target = cache.getTxManager().getTXState().getTarget();
if (target == null) {
cache.getTxManager().getTXState().setTarget(funcTarget);
} else if (!target.equals(funcTarget)) {
throw new TransactionDataNotColocatedException(
String.format(
"Function execution is not colocated with transaction. The transactional data is hosted on node %s, but you are trying to target node %s",
target, funcTarget));
}
}
}
if (!MemoryThresholds.isLowMemoryExceptionDisabled() && function.optimizeForWrite()) {
MemoryThresholdInfo info = region.getAtomicThresholdInfo();
if (info.isMemoryThresholdReached()) {
InternalResourceManager.getInternalResourceManager(region.getCache()).getHeapMonitor()
.updateStateAndSendEvent();
Set<DistributedMember> criticalMembers = info.getMembersThatReachedThreshold();
throw new LowMemoryException(
String.format(
"Function: %s cannot be executed because the members %s are running low on memory",
function.getId(), criticalMembers),
criticalMembers);
}
}
}
}