blob: 847ca632586415ef7c1d58a4b4a38fc24954c8ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import static org.apache.geode.internal.cache.execute.AbstractExecution.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.FunctionStats;
import org.apache.geode.internal.cache.execute.InternalFunctionException;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.execute.MemberMappedArgument;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.ChunkedMessage;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.logging.LogService;
/**
* Executes the function on server (possibly without region/cache).<br>
* Also gets the result back from the server
*
* @since GemFire 5.8
*/
public class ExecuteFunctionOp {
private static final Logger logger = LogService.getLogger();
/** index of allMembers in flags[] */
public static final int ALL_MEMBERS_INDEX = 0;
/** index of ignoreFailedMembers in flags[] */
public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
private ExecuteFunctionOp() {
// no instances allowed
}
public static void execute(final PoolImpl pool,
boolean allServers, ResultCollector rc,
final boolean isHA, UserAttributes attributes, String[] groups,
final ExecuteFunctionOpImpl executeFunctionOp,
final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier,
final Supplier<ExecuteFunctionOpImpl> reExecuteFunctionOpSupplier) {
final AbstractOp op = executeFunctionOp;
if (allServers && groups.length == 0) {
List callableTasks = constructAndGetFunctionTasks(pool,
attributes, executeFunctionOpSupplier);
SingleHopClientExecutor.submitAll(callableTasks);
} else {
boolean reexecute = false;
int maxRetryAttempts = pool.getRetryAttempts();
if (!isHA) {
maxRetryAttempts = 0;
}
do {
try {
if (reexecute) {
pool.execute(reExecuteFunctionOpSupplier.get(), 0);
} else {
pool.execute(op, 0);
}
reexecute = false;
} catch (InternalFunctionInvocationTargetException e) {
if (isHA) {
reexecute = true;
}
rc.clearResults();
} catch (ServerOperationException serverOperationException) {
throw serverOperationException;
} catch (ServerConnectivityException se) {
if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
// If the retryAttempt is set to default(-1). Try it on all servers once.
// Calculating number of servers when function is re-executed as it involves
// messaging locator.
maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
}
if ((maxRetryAttempts--) < 1) {
throw se;
}
reexecute = true;
rc.clearResults();
}
} while (reexecute);
}
}
private static List constructAndGetFunctionTasks(final PoolImpl pool,
UserAttributes attributes,
final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier) {
final List<SingleHopOperationCallable> tasks = new ArrayList<>();
List<ServerLocation> servers = pool.getConnectionSource().getAllServers();
for (ServerLocation server : servers) {
final AbstractOp op = executeFunctionOpSupplier.get();
SingleHopOperationCallable task =
new SingleHopOperationCallable(server, pool, op, attributes);
tasks.add(task);
}
return tasks;
}
static byte[] getByteArrayForFlags(boolean... flags) {
byte[] retVal = null;
if (flags.length > 0) {
retVal = new byte[flags.length];
for (int i = 0; i < flags.length; i++) {
if (flags[i]) {
retVal[i] = 1;
} else {
retVal[i] = 0;
}
}
}
return retVal;
}
public static class ExecuteFunctionOpImpl extends AbstractOpWithTimeout {
private ResultCollector resultCollector;
// To get the instance of the Function Statistics we need the function name or instance
private String functionId;
private Function function;
private Object args;
private MemberMappedArgument memberMappedArg;
private byte hasResult;
private boolean isFnSerializationReqd;
private String[] groups;
/**
* [0] = allMembers [1] = ignoreFailedMembers
*/
private byte[] flags;
/**
* number of parts in the request message
*/
private static final int MSG_PARTS = 6;
/**
* @throws org.apache.geode.SerializationException if serialization fails
*/
public ExecuteFunctionOpImpl(Function function, Object args,
MemberMappedArgument memberMappedArg,
ResultCollector rc, boolean isFnSerializationReqd,
byte isReexecute,
String[] groups, boolean allMembers, boolean ignoreFailedMembers,
final int timeoutMs) {
super(MessageType.EXECUTE_FUNCTION, MSG_PARTS, timeoutMs);
byte fnState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(),
function.optimizeForWrite());
addBytes(isReexecute, fnState);
if (isFnSerializationReqd) {
getMessage().addStringOrObjPart(function);
} else {
getMessage().addStringOrObjPart(function.getId());
}
getMessage().addObjPart(args);
getMessage().addObjPart(memberMappedArg);
getMessage().addObjPart(groups);
flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
getMessage().addBytesPart(flags);
resultCollector = rc;
if (isReexecute == 1) {
resultCollector.clearResults();
}
functionId = function.getId();
this.function = function;
this.args = args;
this.memberMappedArg = memberMappedArg;
this.hasResult = fnState;
this.isFnSerializationReqd = isFnSerializationReqd;
this.groups = groups;
}
public ExecuteFunctionOpImpl(String functionId, Object args2,
MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc,
boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, byte isReexecute,
String[] groups, boolean allMembers, boolean ignoreFailedMembers, final int timeoutMs) {
super(MessageType.EXECUTE_FUNCTION, MSG_PARTS, timeoutMs);
byte fnState = AbstractExecution.getFunctionState(isHA, hasResult == (byte) 1,
optimizeForWrite);
addBytes(isReexecute, fnState);
getMessage().addStringOrObjPart(functionId);
getMessage().addObjPart(args2);
getMessage().addObjPart(memberMappedArg);
getMessage().addObjPart(groups);
flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
getMessage().addBytesPart(flags);
resultCollector = rc;
if (isReexecute == 1) {
resultCollector.clearResults();
}
this.functionId = functionId;
args = args2;
this.memberMappedArg = memberMappedArg;
this.hasResult = fnState;
this.isFnSerializationReqd = isFnSerializationReqd;
this.groups = groups;
}
ExecuteFunctionOpImpl(ExecuteFunctionOpImpl op, byte isReexecute) {
super(MessageType.EXECUTE_FUNCTION, MSG_PARTS, op.getTimeoutMs());
resultCollector = op.resultCollector;
function = op.function;
functionId = op.functionId;
hasResult = op.hasResult;
args = op.args;
memberMappedArg = op.memberMappedArg;
isFnSerializationReqd = op.isFnSerializationReqd;
groups = op.groups;
flags = op.flags;
addBytes(isReexecute, hasResult);
if (isFnSerializationReqd) {
getMessage().addStringOrObjPart(function);
} else {
getMessage().addStringOrObjPart(function.getId());
}
getMessage().addObjPart(args);
getMessage().addObjPart(memberMappedArg);
getMessage().addObjPart(groups);
getMessage().addBytesPart(flags);
if (isReexecute == 1) {
resultCollector.clearResults();
}
}
private void addBytes(byte isReexecute, byte fnStateOrHasResult) {
if (getTimeoutMs() == DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
if (isReexecute == 1) {
getMessage().addBytesPart(
new byte[] {AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult)});
} else {
getMessage().addBytesPart(new byte[] {fnStateOrHasResult});
}
} else {
byte[] bytes = new byte[5];
if (isReexecute == 1) {
bytes[0] = AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult);
} else {
bytes[0] = fnStateOrHasResult;
}
Part.encodeInt(getTimeoutMs(), bytes, 1);
getMessage().addBytesPart(bytes);
}
}
/**
* ignoreFaileMember flag is at index 1
*/
private boolean getIgnoreFailedMembers() {
boolean ignoreFailedMembers = false;
if (flags != null && flags.length > 1) {
if (flags[IGNORE_FAILED_MEMBERS_INDEX] == 1) {
ignoreFailedMembers = true;
}
}
return ignoreFailedMembers;
}
@Override
protected Object processResponse(Message msg) throws Exception {
ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage) msg;
try {
// Read the header which describes the type of message following
executeFunctionResponseMsg.readHeader();
switch (executeFunctionResponseMsg.getMessageType()) {
case MessageType.EXECUTE_FUNCTION_RESULT:
if (logger.isDebugEnabled()) {
logger.debug(
"ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_RESULT.");
}
Exception exception = null;
// Read the chunk
do {
executeFunctionResponseMsg.receiveChunk();
Object resultResponse = executeFunctionResponseMsg.getPart(0).getObject();
Object result;
if (resultResponse instanceof ArrayList) {
result = ((ArrayList) resultResponse).get(0);
} else {
result = resultResponse;
}
if (result instanceof FunctionException) {
FunctionException ex = ((FunctionException) result);
if (ex instanceof InternalFunctionException || getIgnoreFailedMembers()) {
Throwable cause = ex.getCause() == null ? ex : ex.getCause();
DistributedMember memberID =
(DistributedMember) ((ArrayList) resultResponse).get(1);
resultCollector.addResult(memberID, cause);
FunctionStats.getFunctionStats(functionId).incResultsReceived();
} else {
exception = ex;
}
} else if (result instanceof Throwable) {
String s = "While performing a remote " + getOpName();
exception = new ServerOperationException(s, (Throwable) result);
// Get the exception toString part.
// This was added for c++ thin client and not used in java
} else {
DistributedMember memberID =
(DistributedMember) ((ArrayList) resultResponse).get(1);
resultCollector.addResult(memberID, result);
FunctionStats.getFunctionStats(functionId).incResultsReceived();
}
} while (!executeFunctionResponseMsg.isLastChunk());
if (exception != null) {
throw exception;
}
if (logger.isDebugEnabled()) {
logger.debug(
"ExecuteFunctionOpImpl#processResponse: received all the results from server successfully.");
}
return null;
case MessageType.EXCEPTION:
if (logger.isDebugEnabled()) {
logger.debug(
"ExecuteFunctionOpImpl#processResponse: received message of type EXCEPTION");
}
// Read the chunk
executeFunctionResponseMsg.receiveChunk();
Part part0 = executeFunctionResponseMsg.getPart(0);
Object obj = part0.getObject();
if (obj instanceof FunctionException) {
throw ((FunctionException) obj);
} else {
final Throwable t = (Throwable) obj;
throw new ServerOperationException(
": While performing a remote execute Function" + t.getMessage(), t);
}
case MessageType.EXECUTE_FUNCTION_ERROR:
if (logger.isDebugEnabled()) {
logger.debug(
"ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_ERROR");
}
// Read the chunk
executeFunctionResponseMsg.receiveChunk();
String errorMessage = executeFunctionResponseMsg.getPart(0).getString();
throw new ServerOperationException(errorMessage);
default:
throw new InternalGemFireError(String.format("Unknown message type %s",
executeFunctionResponseMsg.getMessageType()));
}
} finally {
executeFunctionResponseMsg.clear();
}
}
@Override
protected boolean isErrorResponse(int msgType) {
return msgType == MessageType.EXECUTE_FUNCTION_ERROR;
}
@Override
protected long startAttempt(ConnectionStats stats) {
return stats.startExecuteFunction();
}
protected String getOpName() {
return "executeFunction";
}
@Override
protected void endSendAttempt(ConnectionStats stats, long start) {
stats.endExecuteFunctionSend(start, hasFailed());
}
@Override
protected void endAttempt(ConnectionStats stats, long start) {
stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
}
@Override
protected Message createResponseMessage() {
return new ChunkedMessage(1, Version.CURRENT);
}
}
}