blob: 3416c93b5187cb8437973cc2f0fcb97b8ddce19d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.ProxyRegion;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.InternalEntity;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegion;
public class InternalFunctionExecutionServiceImpl
implements FunctionExecutionService, InternalFunctionExecutionService {
/**
* use when the optimization to execute onMember locally is not desired.
*/
public static final boolean RANDOM_onMember =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "randomizeOnMember");
private static final String[] EMPTY_GROUPS = new String[0];
@MakeNotStatic
private static final ConcurrentHashMap<String, Function> idToFunctionMap =
new ConcurrentHashMap<>();
public InternalFunctionExecutionServiceImpl() {
// nothing
}
// FunctionExecutionService API ----------------------------------------------------------------
@Override
public Execution onServer(Pool pool) {
return onServer(pool, EMPTY_GROUPS);
}
@Override
public Execution onServers(Pool pool) {
return onServers(pool, EMPTY_GROUPS);
}
@Override
public Execution onServer(RegionService regionService) {
return onServer(regionService, EMPTY_GROUPS);
}
@Override
public Execution onServers(RegionService regionService) {
return onServers(regionService, EMPTY_GROUPS);
}
@Override
public Execution onMember(DistributedMember distributedMember) {
return onMember(getDistributedSystem(), distributedMember);
}
@Override
public Execution onMembers(String... groups) {
return onMembers(getDistributedSystem(), groups);
}
@Override
public Execution onMembers(Set<DistributedMember> distributedMembers) {
return onMembers(getDistributedSystem(), distributedMembers);
}
@Override
public Execution onMember(String... groups) {
return onMember(getDistributedSystem(), groups);
}
protected Pool findPool(String poolName) {
return PoolManager.find(poolName);
}
@Override
public Execution onRegion(Region region) {
if (region == null) {
throw new FunctionException("Region instance passed is null");
}
ProxyCache proxyCache = null;
String poolName = region.getAttributes().getPoolName();
if (poolName != null) {
Pool pool = findPool(poolName);
if (pool == null) {
throw new IllegalStateException(String.format("Could not find a pool named %s.", poolName));
} else {
if (pool.getMultiuserAuthentication()) {
if (region instanceof ProxyRegion) {
ProxyRegion proxyRegion = (ProxyRegion) region;
region = proxyRegion.getRealRegion();
proxyCache = proxyRegion.getAuthenticatedCache();
} else {
throw new UnsupportedOperationException();
}
}
}
}
if (isClientRegion(region)) {
return new ServerRegionFunctionExecutor(region, proxyCache);
}
if (PartitionRegionHelper.isPartitionedRegion(region)) {
return new PartitionedRegionFunctionExecutor(region);
}
return new DistributedRegionFunctionExecutor(region);
}
@Override
public Function getFunction(String functionId) {
if (functionId == null) {
throw new FunctionException(String.format("%s passed is null",
"functionId instance "));
}
return idToFunctionMap.get(functionId);
}
@Override
public void registerFunction(Function function) {
if (function == null) {
throw new FunctionException(String.format("%s passed is null",
"function instance "));
}
if (function.getId() == null) {
throw new FunctionException(
"function.getId() returned null, implement the Function.getId() method properly");
}
if (function.isHA() && !function.hasResult()) {
throw new FunctionException(
"For Functions with isHA true, hasResult must also be true.");
}
idToFunctionMap.put(function.getId(), function);
}
@Override
public void unregisterFunction(String functionId) {
if (functionId == null) {
throw new FunctionException(String.format("%s passed is null",
"functionId instance "));
}
idToFunctionMap.remove(functionId);
}
@Override
public boolean isRegistered(String functionId) {
if (functionId == null) {
throw new FunctionException(String.format("%s passed is null",
"functionId instance "));
}
return idToFunctionMap.containsKey(functionId);
}
@Override
public Map<String, Function> getRegisteredFunctions() {
// We have to remove the internal functions before returning the map to the users
final Map<String, Function> tempIdToFunctionMap = new HashMap<>();
for (Map.Entry<String, Function> entry : idToFunctionMap.entrySet()) {
if (!(entry.getValue() instanceof InternalEntity)) {
tempIdToFunctionMap.put(entry.getKey(), entry.getValue());
}
}
return tempIdToFunctionMap;
}
// InternalFunctionExecutionService OnServerGroups API -----------------------------------------
@Override
public Execution onServer(Pool pool, String... groups) {
if (pool == null) {
throw new FunctionException(
String.format("%s passed is null", "Pool instance "));
}
if (pool.getMultiuserAuthentication()) {
throw new UnsupportedOperationException();
}
return new ServerFunctionExecutor(pool, false, groups);
}
@Override
public Execution onServers(Pool pool, String... groups) {
if (pool == null) {
throw new FunctionException(
String.format("%s passed is null", "Pool instance "));
}
if (pool.getMultiuserAuthentication()) {
throw new UnsupportedOperationException();
}
return new ServerFunctionExecutor(pool, true, groups);
}
@Override
public Execution onServer(RegionService regionService, String... groups) {
if (regionService == null) {
throw new FunctionException(String.format("%s passed is null",
"RegionService instance "));
}
if (regionService instanceof GemFireCacheImpl) {
InternalClientCache internalCache = (InternalClientCache) regionService;
if (!internalCache.isClient()) {
throw new FunctionException("The cache was not a client cache");
} else if (internalCache.getDefaultPool() != null) {
return onServer(internalCache.getDefaultPool(), groups);
} else {
throw new FunctionException("The client cache does not have a default pool");
}
} else {
ProxyCache proxyCache = (ProxyCache) regionService;
return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), false, proxyCache,
groups);
}
}
@Override
public Execution onServers(RegionService regionService, String... groups) {
if (regionService == null) {
throw new FunctionException(String.format("%s passed is null",
"RegionService instance "));
}
if (regionService instanceof GemFireCacheImpl) {
InternalClientCache internalCache = (InternalClientCache) regionService;
if (!internalCache.isClient()) {
throw new FunctionException("The cache was not a client cache");
} else if (internalCache.getDefaultPool() != null) {
return onServers(internalCache.getDefaultPool(), groups);
} else {
throw new FunctionException("The client cache does not have a default pool");
}
} else {
ProxyCache proxyCache = (ProxyCache) regionService;
return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), true, proxyCache,
groups);
}
}
// InternalFunctionExecutionService InDistributedSystem API ------------------------------------
@Override
public Execution onMember(DistributedSystem system, DistributedMember distributedMember) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
}
if (distributedMember == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedMember instance "));
}
return new MemberFunctionExecutor(system, distributedMember);
}
@Override
public Execution onMembers(DistributedSystem system, String... groups) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
}
if (groups.length == 0) {
return new MemberFunctionExecutor(system);
}
Set<DistributedMember> members = new HashSet<>();
for (String group : groups) {
members.addAll(system.getGroupMembers(group));
}
if (members.isEmpty()) {
throw new FunctionException(String.format("No members found in group(s) %s",
Arrays.toString(groups)));
}
return new MemberFunctionExecutor(system, members);
}
@Override
public Execution onMember(DistributedSystem system, String... groups) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
}
Set<DistributedMember> members = new HashSet<>();
for (String group : groups) {
List<DistributedMember> grpMembers = new ArrayList<>(system.getGroupMembers(group));
if (!grpMembers.isEmpty()) {
if (!RANDOM_onMember && grpMembers.contains(system.getDistributedMember())) {
members.add(system.getDistributedMember());
} else {
Collections.shuffle(grpMembers);
members.add(grpMembers.get(0));
}
}
}
if (members.isEmpty()) {
throw new FunctionException(String.format("No members found in group(s) %s",
Arrays.toString(groups)));
}
return new MemberFunctionExecutor(system, members);
}
@Override
public Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
}
if (distributedMembers == null) {
throw new FunctionException(String.format("%s passed is null",
"distributedMembers set "));
}
return new MemberFunctionExecutor(system, distributedMembers);
}
// InternalFunctionExecutionService OnRegions API ----------------------------------------------
@Override
public Execution onRegions(Set<Region> regions) {
if (regions == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"regions set"));
}
if (regions.contains(null)) {
throw new IllegalArgumentException(
"One or more region references added to the regions set is(are) null");
}
if (regions.isEmpty()) {
throw new IllegalArgumentException(
"Regions set is empty for onRegions function execution");
}
for (Region region : regions) {
if (isClientRegion(region)) {
throw new UnsupportedOperationException(
"FunctionService#onRegions() is not supported for cache clients in client server mode");
}
}
return new MultiRegionFunctionExecutor(regions);
}
// InternalFunctionExecutionService unregisterAllFunctions API ---------------------------------
@Override
public void unregisterAllFunctions() {
// Unregistering all the functions registered with the FunctionService.
for (String functionId : idToFunctionMap.keySet()) {
unregisterFunction(functionId);
}
}
/**
* @return true if the method is called on a region has a {@link Pool}.
* @since GemFire 6.0
*/
private boolean isClientRegion(Region region) {
return ((InternalRegion) region).hasServerProxy();
}
private static DistributedSystem getDistributedSystem() {
DistributedSystem system = InternalDistributedSystem.getConnectedInstance();
if (system == null) {
throw new DistributedSystemDisconnectedException(
"This connection to a distributed system has been disconnected.");
}
return system;
}
}