blob: 9a5bb534fd0094a15b89734a530061c08033f4ab [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.execute;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.LowMemoryException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.SetUtils;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/**
*
* @author ymahajan
*
*/
public class MultiRegionFunctionExecutor extends AbstractExecution {
private final Set<Region> regions;
private ServerToClientFunctionResultSender sender;
public MultiRegionFunctionExecutor(Set<Region> regions) {
this.regions = regions;
}
private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor drfe) {
super(drfe);
this.regions = drfe.regions;
if (drfe.filter != null) {
this.filter.clear();
this.filter.addAll(drfe.filter);
}
this.sender = drfe.sender;
}
private MultiRegionFunctionExecutor(Set<Region> regions, Set filter2,
Object args, MemberMappedArgument memberMappedArg,
ServerToClientFunctionResultSender resultSender) {
if (args != null) {
this.args = args;
}
else if (memberMappedArg != null) {
this.memberMappedArg = memberMappedArg;
this.isMemberMappedArgument = true;
}
this.sender = resultSender;
if (filter2 != null) {
this.filter.clear();
this.filter.addAll(filter2);
}
this.regions = regions;
this.isClientServerMode = true;
}
private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor,
MemberMappedArgument argument) {
super(executor);
this.regions = executor.getRegions();
this.filter.clear();
this.filter.addAll(executor.filter);
this.sender = executor.getServerResultSender();
this.memberMappedArg = argument;
this.isMemberMappedArgument = true;
}
private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor,
ResultCollector rs) {
super(executor);
this.regions = executor.getRegions();
this.filter.clear();
this.filter.addAll(executor.filter);
this.sender = executor.getServerResultSender();
this.rc = rs;
}
public MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor,
Object args) {
super(executor);
this.regions = executor.getRegions();
this.filter.clear();
this.filter.addAll(executor.filter);
this.sender = executor.getServerResultSender();
this.args = args;
}
public MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor,
boolean isReExecute) {
super(executor);
this.regions = executor.getRegions();
this.filter.clear();
this.filter.addAll(executor.filter);
this.sender = executor.getServerResultSender();
this.isReExecute = isReExecute;
}
public InternalExecution withMemberMappedArgument(
MemberMappedArgument argument) {
if (argument == null) {
throw new IllegalArgumentException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("MemberMapped Arg"));
}
return new MultiRegionFunctionExecutor(this, argument);
}
public Set<Region> getRegions() {
return this.regions;
}
public ServerToClientFunctionResultSender getServerResultSender() {
return this.sender;
}
public Execution withArgs(Object args) {
if (args == null) {
throw new IllegalArgumentException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("args"));
}
return new MultiRegionFunctionExecutor(this, args);
}
public Execution withCollector(ResultCollector<?, ?> rc) {
if (rc == null) {
throw new IllegalArgumentException(
LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
.toLocalizedString("Result Collector"));
}
return new MultiRegionFunctionExecutor(this, rc);
}
public Execution withFilter(Set<?> filter) {
throw new FunctionException(
LocalizedStrings.ExecuteFunction_CANNOT_SPECIFY_0_FOR_ONREGIONS_FUNCTION
.toLocalizedString("filter"));
}
@Override
public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
throw new FunctionException(
LocalizedStrings.ExecuteFunction_CANNOT_SPECIFY_0_FOR_ONREGIONS_FUNCTION
.toLocalizedString("bucket as filter"));
}
@Override
public AbstractExecution withRoutingObjects(Set<Object> routingObjects) {
throw new FunctionException(
LocalizedStrings.ExecuteFunction_CANNOT_SPECIFY_0_FOR_ONREGIONS_FUNCTION
.toLocalizedString("routing objects"));
}
@Override
protected ResultCollector executeFunction(Function function) {
if (function.hasResult()) {
ResultCollector rc = this.rc;
if (rc == null) {
rc = new DefaultResultCollector();
}
return executeFunction(function, rc);
}
else {
executeFunction(function, null);
return new NoResult();
}
}
private ResultCollector executeFunction(final Function function,
ResultCollector resultCollector) {
InternalDistributedSystem ds = InternalDistributedSystem
.getConnectedInstance();
if (ds == null) {
throw new IllegalStateException(
LocalizedStrings.ExecuteFunction_DS_NOT_CREATED_OR_NOT_READY
.toLocalizedString());
}
final DM dm = ds.getDistributionManager();
final Map<InternalDistributedMember, Set<String>> memberToRegionMap = calculateMemberToRegionMap();
final Set<InternalDistributedMember> dest = new HashSet<InternalDistributedMember>(
memberToRegionMap.keySet());
if (dest.isEmpty()) {
throw new FunctionException(
LocalizedStrings.MemberFunctionExecutor_NO_MEMBER_FOUND_FOR_EXECUTING_FUNCTION_0
.toLocalizedString(function.getId()));
}
final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (function.optimizeForWrite() && cache != null
&& cache.getResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<InternalDistributedMember> hcm = cache.getResourceAdvisor()
.adviseCritialMembers();
Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
.toLocalizedString(new Object[] { function.getId(), sm }), sm);
}
setExecutionNodes(dest);
final InternalDistributedMember localVM = cache.getMyId();
final LocalResultCollector<?, ?> localResultCollector = getLocalResultCollector(
function, resultCollector);
boolean remoteOnly = false;
boolean localOnly = false;
if (!dest.contains(localVM)) {
remoteOnly = true;
}
if (dest.size() == 1 && dest.contains(localVM)) {
localOnly = true;
}
validateExecution(function, dest);
final MemberFunctionResultSender resultSender = new MemberFunctionResultSender(
dm, localResultCollector, function, localOnly, remoteOnly, null);
if (dest.contains(localVM)) {
// if member is local VM
dest.remove(localVM);
Set<String> regionPathSet = memberToRegionMap.get(localVM);
Set<Region> regions = new HashSet<Region>();
if (regionPathSet != null) {
Cache cache1 = GemFireCacheImpl.getInstance();
for (String regionPath : regionPathSet) {
regions.add(cache1.getRegion(regionPath));
}
}
final FunctionContextImpl context = new MultiRegionFunctionContextImpl(
function.getId(), getArgumentsForMember(localVM.getId()),
resultSender, regions, this.isReExecute);
boolean isTx = cache.getTxManager().getTXState() == null ? false : true;
executeFunctionOnLocalNode(function, context, resultSender, dm, isTx);
}
if (!dest.isEmpty()) {
HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<InternalDistributedMember, Object>();
for (InternalDistributedMember recip : dest) {
memberArgs.put(recip, getArgumentsForMember(recip.getId()));
}
Assert.assertTrue(memberArgs.size() == dest.size());
MultiRegionFunctionResultWaiter waiter = new MultiRegionFunctionResultWaiter(
ds, localResultCollector, function, dest, memberArgs, resultSender,
memberToRegionMap);
ResultCollector reply = waiter
.getFunctionResultFrom(dest, function, this);
return reply;
}
return localResultCollector;
}
private Map<InternalDistributedMember, Set<String>> calculateMemberToRegionMap() {
Map<InternalDistributedMember, Set<String>> memberToRegions = new HashMap<InternalDistributedMember, Set<String>>();
// nodes is maintained for node pruning logic
Set<InternalDistributedMember> nodes = new HashSet<InternalDistributedMember>();
for (Region region : regions) {
DataPolicy dp = region.getAttributes().getDataPolicy();
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion)region;
Set<InternalDistributedMember> prMembers = pr.getRegionAdvisor()
.advisePrimaryOwners();
if (pr.isDataStore()) {
GemFireCacheImpl cache = (GemFireCacheImpl)region.getCache();
// Add local node
InternalDistributedMember localVm = cache.getMyId();
Set<String> regions = memberToRegions.get(localVm);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(pr.getFullPath());
memberToRegions.put(localVm, regions);
}
if (prMembers != null) {
for (InternalDistributedMember member : prMembers) {
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(pr.getFullPath());
memberToRegions.put(member, regions);
}
nodes.addAll(prMembers);
}
}
else if (region instanceof DistributedRegion) {
if (dp.isEmpty() || dp.isNormal()) {
// Add local members
DistributedRegion dr = (DistributedRegion)region;
Set<InternalDistributedMember> replicates = dr
.getCacheDistributionAdvisor().adviseInitializedReplicates();
// if existing nodes contain one of the nodes from replicates
boolean added = false;
for (InternalDistributedMember member : replicates) {
if (nodes.contains(member)) {
added = true;
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(dr.getFullPath());
memberToRegions.put(member, regions);
break;
}
}
// if existing nodes set is mutually exclusive to replicates
if (replicates.size() != 0 && !added) {
// select a random replicate
InternalDistributedMember member = (InternalDistributedMember)(replicates
.toArray()[new Random().nextInt(replicates.size())]);
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(dr.getFullPath());
memberToRegions.put(member, regions);
}
}
else if (dp.withReplication()) {
GemFireCacheImpl cache = (GemFireCacheImpl)region.getCache();
// Add local node
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(region.getFullPath());
memberToRegions.put(local, regions);
}
}
else if (region instanceof LocalRegion) {
GemFireCacheImpl cache = (GemFireCacheImpl)region.getCache();
// Add local node
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
if (regions == null) {
regions = new HashSet<String>();
}
regions.add(region.getFullPath());
memberToRegions.put(local, regions);
}
}
return memberToRegions;
}
@Override
public AbstractExecution setIsReExecute() {
return new MultiRegionFunctionExecutor(this, true);
}
@Override
public void validateExecution(Function function, Set targetMembers) {
GemFireCacheImpl cache = null;
for (Region r : regions) {
cache = (GemFireCacheImpl)r.getCache();
break;
}
if (cache != null && cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
.toLocalizedString());
} else {
assert targetMembers.size() == 1;
DistributedMember funcTarget = (DistributedMember)targetMembers.iterator().next();
DistributedMember target = cache.getTxManager().getTXState().getTarget();
if (target == null) {
cache.getTxManager().getTXState().setTarget(funcTarget);
} else if (!target.equals(funcTarget)) {
throw new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1
.toLocalizedString(new Object[] {target,funcTarget}));
}
}
}
if (function.optimizeForWrite() && cache.getResourceManager().getHeapMonitor().
containsHeapCriticalMembers(targetMembers) &&
!MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(
new Object[] {function.getId(), sm}), sm);
}
}
}