blob: a252c4a36462bb21ba05eeb8b14e9cd33fa503bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.FunctionContextImpl;
import org.apache.geode.internal.cache.execute.InternalFunctionExecutionService;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.execute.InternalFunctionService;
import org.apache.geode.internal.cache.execute.MemberMappedArgument;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender65;
import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
import org.apache.geode.internal.cache.tier.sockets.ChunkedMessage;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.SecurityService;
/**
* @since GemFire 6.5
*/
public class ExecuteFunction65 extends BaseCommand {
@Immutable
private static final ExecuteFunction65 singleton = new ExecuteFunction65();
public static Command getCommand() {
return singleton;
}
private final InternalFunctionExecutionService internalFunctionExecutionService;
private final ServerToClientFunctionResultSender65Factory serverToClientFunctionResultSender65Factory;
private final FunctionContextImplFactory functionContextImplFactory;
private ExecuteFunction65() {
this(InternalFunctionService.getInternalFunctionExecutionService(),
new DefaultServerToClientFunctionResultSender65Factory(),
new DefaultFunctionContextImplFactory());
}
@VisibleForTesting
ExecuteFunction65(InternalFunctionExecutionService internalFunctionExecutionService,
ServerToClientFunctionResultSender65Factory serverToClientFunctionResultSender65Factory,
FunctionContextImplFactory functionContextImplFactory) {
this.internalFunctionExecutionService = internalFunctionExecutionService;
this.serverToClientFunctionResultSender65Factory = serverToClientFunctionResultSender65Factory;
this.functionContextImplFactory = functionContextImplFactory;
}
@Override
public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
final SecurityService securityService, long start) throws IOException {
Object function = null;
Object args = null;
MemberMappedArgument memberMappedArg = null;
byte hasResult = 0;
byte functionState = 0;
boolean isReexecute = false;
try {
functionState = clientMessage.getPart(0).getSerializedForm()[0];
if (functionState == AbstractExecution.HA_HASRESULT_NO_OPTIMIZEFORWRITE_REEXECUTE) {
functionState = AbstractExecution.HA_HASRESULT_NO_OPTIMIZEFORWRITE;
isReexecute = true;
} else if (functionState == AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE_REEXECUTE) {
functionState = AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE;
isReexecute = true;
}
if (functionState != 1) {
hasResult = (byte) ((functionState & 2) - 1);
} else {
hasResult = functionState;
}
if (hasResult == 1) {
serverConnection.setAsTrue(REQUIRES_RESPONSE);
serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
}
function = clientMessage.getPart(1).getStringOrObject();
args = clientMessage.getPart(2).getObject();
Part part = clientMessage.getPart(3);
if (part != null) {
memberMappedArg = (MemberMappedArgument) part.getObject();
}
} catch (ClassNotFoundException e) {
logger.warn("Exception on server while executing function: {}", function, e);
if (hasResult == 1) {
writeChunkedException(clientMessage, e, serverConnection);
serverConnection.setAsTrue(RESPONDED);
return;
}
}
if (function == null) {
String message = "The input function for the execute function request is null";
logger.warn("{}: {}", serverConnection.getName(), message);
sendError(hasResult, clientMessage, message, serverConnection);
return;
}
// Execute function on the cache
try {
Function<?> functionObject;
if (function instanceof String) {
functionObject = internalFunctionExecutionService.getFunction((String) function);
if (functionObject == null) {
String message = String.format("Function named %s is not registered to FunctionService",
function);
logger.warn("{}: {}", serverConnection.getName(), message);
sendError(hasResult, clientMessage, message, serverConnection);
return;
} else {
byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(),
functionObject.hasResult(), functionObject.optimizeForWrite());
if (logger.isDebugEnabled()) {
logger.debug("Function State on server side: {} on client: {}",
functionStateOnServerSide, functionState);
}
if (functionStateOnServerSide != functionState) {
String message =
"Function attributes at client and server don't match";
logger.warn("{}: {}", serverConnection.getName(), message);
sendError(hasResult, clientMessage, message, serverConnection);
return;
}
}
} else {
functionObject = (Function) function;
}
FunctionStats stats = FunctionStatsManager.getFunctionStats(functionObject.getId());
// check if the caller is authorized to do this operation on server
functionObject.getRequiredPermissions(null, args).forEach(securityService::authorize);
AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
ExecuteFunctionOperationContext executeContext = null;
if (authzRequest != null) {
executeContext = authzRequest.executeFunctionAuthorize(functionObject.getId(), null, null,
args, functionObject.optimizeForWrite());
}
ChunkedMessage chunkedMessage = serverConnection.getFunctionResponseMessage();
chunkedMessage.setTransactionId(clientMessage.getTransactionId());
ResultSender resultSender = serverToClientFunctionResultSender65Factory.create(chunkedMessage,
MessageType.EXECUTE_FUNCTION_RESULT, serverConnection, functionObject, executeContext);
FunctionContext context;
InternalCache cache = serverConnection.getCache();
InternalDistributedMember localVM =
(InternalDistributedMember) cache.getDistributedSystem().getDistributedMember();
if (memberMappedArg != null) {
context = functionContextImplFactory.create(cache, functionObject.getId(),
memberMappedArg.getArgumentsForMember(localVM.getId()), resultSender, isReexecute);
} else {
context = functionContextImplFactory.create(cache, functionObject.getId(), args,
resultSender, isReexecute);
}
ServerSideHandshake handshake = serverConnection.getHandshake();
int earlierClientReadTimeout = handshake.getClientReadTimeout();
handshake.setClientReadTimeout(0);
long startExecution = stats.startFunctionExecution(functionObject.hasResult());
try {
if (logger.isDebugEnabled()) {
logger.debug("Executing Function on Server: {} with context: {}", serverConnection,
context);
}
cache.getInternalResourceManager().getHeapMonitor().createLowMemoryIfNeeded(null,
(DistributedMember) null);
LowMemoryException lowMemoryException = cache.getInternalResourceManager().getHeapMonitor()
.createLowMemoryIfNeeded(functionObject, cache.getMyId());
if (lowMemoryException != null) {
sendException(hasResult, clientMessage, lowMemoryException.getMessage(), serverConnection,
lowMemoryException);
return;
}
functionObject.execute(context);
if (!((ServerToClientFunctionResultSender65) resultSender).isLastResultReceived()
&& functionObject.hasResult()) {
throw new FunctionException(String.format("The function, %s, did not send last result",
functionObject.getId()));
}
stats.endFunctionExecution(startExecution, functionObject.hasResult());
} catch (FunctionException e) {
stats.endFunctionExecutionWithException(startExecution, functionObject.hasResult());
throw e;
} catch (Exception e) {
stats.endFunctionExecutionWithException(startExecution, functionObject.hasResult());
throw new FunctionException(e);
} finally {
handshake.setClientReadTimeout(earlierClientReadTimeout);
}
} catch (IOException e) {
logger.warn("Exception on server while executing function: {}", function, e);
String message = "Server could not send the reply";
sendException(hasResult, clientMessage, message, serverConnection, e);
} catch (InternalFunctionInvocationTargetException e) {
/*
* TRAC #44709: InternalFunctionInvocationTargetException should not be logged
* Fix for #44709: User should not be aware of InternalFunctionInvocationTargetException. No
* instance is giving useful information to user to take any corrective action hence logging
* this at fine level logging. May occur when:
* 1> In case of HA FunctionInvocationTargetException thrown. Since it is HA, function will
* be re-executed on right node
* 2> in case of HA member departed
*/
if (logger.isDebugEnabled()) {
logger.debug("Exception on server while executing function: {}", function, e);
}
sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
} catch (Exception e) {
logger.warn("Exception on server while executing function: {}", function, e);
sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
}
}
private void sendException(byte hasResult, Message msg, String message,
ServerConnection serverConnection, Throwable e) throws IOException {
if (hasResult == 1) {
writeFunctionResponseException(msg, MessageType.EXCEPTION, serverConnection, e);
serverConnection.setAsTrue(RESPONDED);
}
}
private void sendError(byte hasResult, Message msg, String message,
ServerConnection serverConnection) throws IOException {
if (hasResult == 1) {
writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message,
serverConnection);
serverConnection.setAsTrue(RESPONDED);
}
}
interface ServerToClientFunctionResultSender65Factory {
ServerToClientFunctionResultSender65 create(ChunkedMessage msg, int messageType,
ServerConnection sc, Function function, ExecuteFunctionOperationContext authzContext);
}
interface FunctionContextImplFactory {
FunctionContextImpl create(Cache cache, String functionId, Object args,
ResultSender resultSender, boolean isPossibleDuplicate);
}
private static class DefaultServerToClientFunctionResultSender65Factory
implements ServerToClientFunctionResultSender65Factory {
@Override
public ServerToClientFunctionResultSender65 create(ChunkedMessage msg, int messageType,
ServerConnection sc, Function function, ExecuteFunctionOperationContext authzContext) {
return new ServerToClientFunctionResultSender65(msg, messageType, sc, function, authzContext);
}
}
private static class DefaultFunctionContextImplFactory implements FunctionContextImplFactory {
@Override
public FunctionContextImpl create(Cache cache, String functionId, Object args,
ResultSender resultSender, boolean isPossibleDuplicat) {
return new FunctionContextImpl(cache, functionId, args, resultSender, isPossibleDuplicat);
}
}
}