| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache.execute; |
| |
| import java.util.Set; |
| import java.util.function.Supplier; |
| |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.ServerConnectivityException; |
| import org.apache.geode.cache.client.internal.ExecuteFunctionNoAckOp; |
| import org.apache.geode.cache.client.internal.ExecuteFunctionOp; |
| import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl; |
| import org.apache.geode.cache.client.internal.GetFunctionAttributeOp; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.client.internal.ProxyCache; |
| import org.apache.geode.cache.client.internal.UserAttributes; |
| import org.apache.geode.cache.execute.Execution; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionException; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.cache.execute.ResultCollector; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector; |
| |
| public class ServerFunctionExecutor extends AbstractExecution { |
| |
| private PoolImpl pool; |
| |
| private final boolean allServers; |
| |
| private String[] groups; |
| |
| |
| ServerFunctionExecutor(Pool pool, boolean allServers, String... groups) { |
| this.pool = (PoolImpl) pool; |
| this.allServers = allServers; |
| this.groups = groups; |
| } |
| |
| ServerFunctionExecutor(Pool pool, boolean allServers, ProxyCache proxyCache, String... groups) { |
| this.pool = (PoolImpl) pool; |
| this.allServers = allServers; |
| this.proxyCache = proxyCache; |
| this.groups = groups; |
| } |
| |
| private ServerFunctionExecutor(ServerFunctionExecutor sfe) { |
| super(sfe); |
| if (sfe.pool != null) { |
| pool = sfe.pool; |
| } |
| allServers = sfe.allServers; |
| groups = sfe.groups; |
| } |
| |
| private ServerFunctionExecutor(ServerFunctionExecutor sfe, Object args) { |
| this(sfe); |
| this.args = args; |
| } |
| |
| private ServerFunctionExecutor(ServerFunctionExecutor sfe, ResultCollector collector) { |
| this(sfe); |
| rc = collector != null ? new SynchronizedResultCollector(collector) : null; |
| } |
| |
| private ServerFunctionExecutor(ServerFunctionExecutor sfe, MemberMappedArgument argument) { |
| this(sfe); |
| memberMappedArg = argument; |
| isMemberMappedArgument = true; |
| } |
| |
| protected ResultCollector executeFunction(final String functionId, boolean result, boolean isHA, |
| boolean optimizeForWrite) { |
| try { |
| if (proxyCache != null) { |
| if (proxyCache.isClosed()) { |
| throw proxyCache.getCacheClosedException("Cache is closed for this user."); |
| } |
| UserAttributes.userAttributes.set(proxyCache.getUserAttributes()); |
| } |
| |
| byte hasResult = 0; |
| if (result) { |
| hasResult = 1; |
| if (rc == null) { |
| ResultCollector defaultCollector = new DefaultResultCollector(); |
| return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite); |
| } else { |
| return executeOnServer(functionId, rc, hasResult, isHA, optimizeForWrite); |
| } |
| } else { |
| executeOnServerNoAck(functionId, hasResult, isHA, optimizeForWrite); |
| return new NoResult(); |
| } |
| } finally { |
| UserAttributes.userAttributes.set(null); |
| } |
| } |
| |
| @Override |
| protected ResultCollector executeFunction(final Function function) { |
| byte hasResult = 0; |
| try { |
| if (proxyCache != null) { |
| if (proxyCache.isClosed()) { |
| throw proxyCache.getCacheClosedException("Cache is closed for this user."); |
| } |
| UserAttributes.userAttributes.set(proxyCache.getUserAttributes()); |
| } |
| |
| if (function.hasResult()) { |
| hasResult = 1; |
| if (rc == null) { |
| ResultCollector defaultCollector = new DefaultResultCollector(); |
| return executeOnServer(function, defaultCollector, hasResult); |
| } else { |
| return executeOnServer(function, rc, hasResult); |
| } |
| } else { |
| executeOnServerNoAck(function, hasResult); |
| return new NoResult(); |
| } |
| } finally { |
| UserAttributes.userAttributes.set(null); |
| } |
| |
| } |
| |
| private ResultCollector executeOnServer(Function function, ResultCollector rc, byte hasResult) { |
| FunctionStats stats = FunctionStats.getFunctionStats(function.getId()); |
| try { |
| validateExecution(function, null); |
| long start = stats.startTime(); |
| stats.startFunctionExecution(true); |
| |
| final ExecuteFunctionOpImpl executeFunctionOp = |
| new ExecuteFunctionOpImpl(function, args, memberMappedArg, |
| rc, isFnSerializationReqd, (byte) 0, groups, allServers, isIgnoreDepartedMembers(), |
| getTimeoutMs()); |
| |
| final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier = |
| () -> new ExecuteFunctionOpImpl(function, args, memberMappedArg, |
| rc, isFnSerializationReqd, (byte) 0, |
| null/* onGroups does not use single-hop for now */, |
| false, false, getTimeoutMs()); |
| |
| final Supplier<ExecuteFunctionOpImpl> reExecuteFunctionOpSupplier = |
| () -> new ExecuteFunctionOpImpl(function, this.getArguments(), |
| this.getMemberMappedArgument(), rc, |
| isFnSerializationReqd, (byte) 1, groups, allServers, |
| this.isIgnoreDepartedMembers(), getTimeoutMs()); |
| |
| ExecuteFunctionOp.execute(pool, allServers, |
| rc, function.isHA(), UserAttributes.userAttributes.get(), groups, |
| executeFunctionOp, |
| executeFunctionOpSupplier, |
| reExecuteFunctionOpSupplier); |
| |
| stats.endFunctionExecution(start, true); |
| rc.endResults(); |
| return rc; |
| } catch (FunctionException functionException) { |
| stats.endFunctionExecutionWithException(true); |
| throw functionException; |
| } catch (ServerConnectivityException exception) { |
| throw exception; |
| } catch (Exception exception) { |
| stats.endFunctionExecutionWithException(true); |
| throw new FunctionException(exception); |
| } |
| } |
| |
| private ResultCollector executeOnServer(String functionId, ResultCollector rc, byte hasResult, |
| boolean isHA, boolean optimizeForWrite) { |
| FunctionStats stats = FunctionStats.getFunctionStats(functionId); |
| try { |
| validateExecution(null, null); |
| long start = stats.startTime(); |
| stats.startFunctionExecution(true); |
| |
| final ExecuteFunctionOpImpl executeFunctionOp = |
| new ExecuteFunctionOpImpl(functionId, args, memberMappedArg, hasResult, |
| rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 0, groups, allServers, |
| this.isIgnoreDepartedMembers(), getTimeoutMs()); |
| |
| final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier = |
| () -> new ExecuteFunctionOpImpl(functionId, args, memberMappedArg, |
| hasResult, |
| rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 0, |
| null/* onGroups does not use single-hop for now */, false, false, getTimeoutMs()); |
| |
| final Supplier<ExecuteFunctionOpImpl> reExecuteFunctionOpSupplier = |
| () -> new ExecuteFunctionOpImpl(functionId, args, |
| this.getMemberMappedArgument(), |
| hasResult, rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 1, |
| groups, allServers, this.isIgnoreDepartedMembers(), getTimeoutMs()); |
| |
| ExecuteFunctionOp.execute(pool, allServers, |
| rc, isHA, |
| UserAttributes.userAttributes.get(), groups, |
| executeFunctionOp, |
| executeFunctionOpSupplier, |
| reExecuteFunctionOpSupplier); |
| |
| stats.endFunctionExecution(start, true); |
| rc.endResults(); |
| return rc; |
| } catch (FunctionException functionException) { |
| stats.endFunctionExecutionWithException(true); |
| throw functionException; |
| } catch (ServerConnectivityException exception) { |
| throw exception; |
| } catch (Exception exception) { |
| stats.endFunctionExecutionWithException(true); |
| throw new FunctionException(exception); |
| } |
| } |
| |
| private void executeOnServerNoAck(Function function, byte hasResult) { |
| FunctionStats stats = FunctionStats.getFunctionStats(function.getId()); |
| try { |
| validateExecution(function, null); |
| long start = stats.startTime(); |
| stats.startFunctionExecution(false); |
| ExecuteFunctionNoAckOp.execute(pool, function, args, memberMappedArg, allServers, |
| hasResult, isFnSerializationReqd, groups); |
| stats.endFunctionExecution(start, false); |
| } catch (FunctionException functionException) { |
| stats.endFunctionExecutionWithException(false); |
| throw functionException; |
| } catch (ServerConnectivityException exception) { |
| throw exception; |
| } catch (Exception exception) { |
| stats.endFunctionExecutionWithException(false); |
| throw new FunctionException(exception); |
| } |
| } |
| |
| private void executeOnServerNoAck(String functionId, byte hasResult, boolean isHA, |
| boolean optimizeForWrite) { |
| FunctionStats stats = FunctionStats.getFunctionStats(functionId); |
| try { |
| validateExecution(null, null); |
| long start = stats.startTime(); |
| stats.startFunctionExecution(false); |
| ExecuteFunctionNoAckOp.execute(pool, functionId, args, memberMappedArg, allServers, |
| hasResult, isFnSerializationReqd, isHA, optimizeForWrite, groups); |
| stats.endFunctionExecution(start, false); |
| } catch (FunctionException functionException) { |
| stats.endFunctionExecutionWithException(false); |
| throw functionException; |
| } catch (ServerConnectivityException exception) { |
| throw exception; |
| } catch (Exception exception) { |
| stats.endFunctionExecutionWithException(false); |
| throw new FunctionException(exception); |
| } |
| } |
| |
| public Pool getPool() { |
| return pool; |
| } |
| |
| @Override |
| public Execution withFilter(Set filter) { |
| throw new FunctionException( |
| String.format("Cannot specify %s for data independent functions", |
| "filter")); |
| } |
| |
| @Override |
| public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { |
| throw new FunctionException( |
| String.format("Cannot specify %s for data independent functions", |
| "buckets as filter")); |
| } |
| |
| @Override |
| public Execution setArguments(Object args) { |
| if (args == null) { |
| throw new FunctionException( |
| String.format("The input %s for the execute function request is null", |
| "args")); |
| } |
| return new ServerFunctionExecutor(this, args); |
| } |
| |
| @Override |
| public Execution withArgs(Object args) { |
| return setArguments(args); |
| } |
| |
| @Override |
| public Execution withCollector(ResultCollector rs) { |
| if (rs == null) { |
| throw new FunctionException( |
| String.format("The input %s for the execute function request is null", |
| "Result Collector")); |
| } |
| return new ServerFunctionExecutor(this, rs); |
| } |
| |
| @Override |
| public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) { |
| if (argument == null) { |
| throw new FunctionException( |
| String.format("The input %s for the execute function request is null", |
| "MemberMapped Args")); |
| } |
| return new ServerFunctionExecutor(this, argument); |
| } |
| |
| @Override |
| public void validateExecution(Function function, Set targetMembers) { |
| if (TXManagerImpl.getCurrentTXUniqueId() != TXManagerImpl.NOTX) { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| @Override |
| public ResultCollector execute(final String functionName) { |
| if (functionName == null) { |
| throw new FunctionException( |
| "The input function for the execute function request is null"); |
| } |
| isFnSerializationReqd = false; |
| Function functionObject = FunctionService.getFunction(functionName); |
| if (functionObject == null) { |
| byte[] functionAttributes = getFunctionAttributes(functionName); |
| |
| if (functionAttributes == null) { |
| // Set authentication properties before executing the internal function. |
| try { |
| if (proxyCache != null) { |
| if (proxyCache.isClosed()) { |
| throw proxyCache.getCacheClosedException("Cache is closed for this user."); |
| } |
| UserAttributes.userAttributes.set(proxyCache.getUserAttributes()); |
| } |
| |
| Object obj = GetFunctionAttributeOp.execute(pool, functionName); |
| functionAttributes = (byte[]) obj; |
| addFunctionAttributes(functionName, functionAttributes); |
| } finally { |
| UserAttributes.userAttributes.set(null); |
| } |
| } |
| |
| boolean isHA = functionAttributes[1] == 1; |
| boolean hasResult = functionAttributes[0] == 1; |
| boolean optimizeForWrite = functionAttributes[2] == 1; |
| return executeFunction(functionName, hasResult, isHA, optimizeForWrite); |
| } else { |
| return executeFunction(functionObject); |
| } |
| } |
| } |