blob: 5a03f47a7750d1060f4cec626b2d33130785eeb6 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
import com.gemstone.gemfire.internal.cache.tier.MessageType;
import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
import com.gemstone.gemfire.internal.logging.LogService;
/**
* @author skumar
* @since 6.5
*/
public class ExecuteRegionFunctionSingleHopOp {
private static final Logger logger = LogService.getLogger();
private ExecuteRegionFunctionSingleHopOp() {
}
public static void execute(ExecutablePool pool, Region region,
Function function, ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector, byte hasResult,
Map<ServerLocation, ? extends HashSet> serverToFilterMap, int mRetryAttempts, boolean allBuckets) {
boolean reexecute = false;
Set<String> failedNodes = new HashSet<String>();
int maxRetryAttempts = 0;
if (function.isHA()) {
maxRetryAttempts = mRetryAttempts;
}
ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
.getClientMetadataService();
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
}
List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, function,
hasResult, resultCollector, cms, allBuckets);
reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
(LocalRegion)region, function.isHA(), resultCollector, failedNodes);
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is : {}", callableTasks.size());
}
if (reexecute ) {
resultCollector.clearResults();
if(function.isHA()) {
ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(), function,
serverRegionExecutor, resultCollector, hasResult, failedNodes,
maxRetryAttempts - 1);
}
// else {
// ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), function,
// serverRegionExecutor, resultCollector, hasResult,
// maxRetryAttempts - 1);
// }
}
resultCollector.endResults();
}
public static void execute(ExecutablePool pool, Region region,
String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector, byte hasResult,
Map<ServerLocation, ? extends HashSet> serverToFilterMap,
int mRetryAttempts, boolean allBuckets, boolean isHA,
boolean optimizeForWrite) {
boolean reexecute = false;
Set<String> failedNodes = new HashSet<String>();
int maxRetryAttempts = 0;
if (isHA) {
maxRetryAttempts = mRetryAttempts;
}
ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
.getClientMetadataService();
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
}
List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, functionId,
hasResult, resultCollector, cms, allBuckets, isHA,optimizeForWrite);
reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
(LocalRegion)region, isHA, resultCollector, failedNodes);
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is: {}, reexecute={}", callableTasks.size(), reexecute);
}
if (reexecute) {
resultCollector.clearResults();
if (isHA) {
ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(),
functionId, serverRegionExecutor, resultCollector, hasResult,
failedNodes, maxRetryAttempts - 1, isHA, optimizeForWrite);
}
// else {
// ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), functionId,
// serverRegionExecutor, resultCollector, hasResult,
// maxRetryAttempts - 1, isHA, optimizeForWrite);
// }
}
resultCollector.endResults();
}
static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
ServerRegionFunctionExecutor serverRegionExecutor,
final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
final PoolImpl pool, final Function function, byte hasResult,
ResultCollector rc, ClientMetadataService cms, boolean allBucket) {
final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
serverToFilterMap.keySet());
if (logger.isDebugEnabled()) {
logger.debug("Constructing tasks for the servers {}", servers);
}
for (ServerLocation server : servers) {
ServerRegionFunctionExecutor executor = (ServerRegionFunctionExecutor)serverRegionExecutor
.withFilter(serverToFilterMap.get(server));
AbstractOp op = new ExecuteRegionFunctionSingleHopOpImpl(region, function, executor, rc, hasResult, new HashSet<String>(),
allBucket);
SingleHopOperationCallable task = new SingleHopOperationCallable(
new ServerLocation(server.getHostName(), server.getPort()), pool, op, UserAttributes.userAttributes.get());
tasks.add(task);
}
return tasks;
}
static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
ServerRegionFunctionExecutor serverRegionExecutor,
final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
final PoolImpl pool, final String functionId, byte hasResult,
ResultCollector rc, ClientMetadataService cms, boolean allBucket, boolean isHA, boolean optimizeForWrite) {
final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
serverToFilterMap.keySet());
if (logger.isDebugEnabled()) {
logger.debug("Constructing tasks for the servers {}", servers);
}
for (ServerLocation server : servers) {
ServerRegionFunctionExecutor executor = (ServerRegionFunctionExecutor)serverRegionExecutor
.withFilter(serverToFilterMap.get(server));
AbstractOp op = new ExecuteRegionFunctionSingleHopOpImpl(region, functionId, executor, rc, hasResult, new HashSet<String>(),
allBucket, isHA, optimizeForWrite);
SingleHopOperationCallable task = new SingleHopOperationCallable(
new ServerLocation(server.getHostName(), server.getPort()), pool, op, UserAttributes.userAttributes.get());
tasks.add(task);
}
return tasks;
}
static class ExecuteRegionFunctionSingleHopOpImpl extends AbstractOp {
private final ResultCollector resultCollector;
private final String functionId;
private final String regionName;
private final ServerRegionFunctionExecutor executor;
private final byte hasResult;
private Set<String> failedNodes = new HashSet<String>();
private boolean isHA;
private boolean optimizeForWrite;
public ExecuteRegionFunctionSingleHopOpImpl(String region,
Function function, ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector rc, byte hasResult,
Set<String> removedNodes, boolean allBuckets) {
// What is this 8 that is getting added to filter and removednode sizes?
// It should have been used as a constant and documented
super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+ serverRegionExecutor.getFilter().size() + removedNodes.size());
this.isHA = function.isHA();
this.optimizeForWrite = function.optimizeForWrite();
byte functionState = AbstractExecution.getFunctionState(function.isHA(),
function.hasResult() , function.optimizeForWrite());
Set routingObjects = serverRegionExecutor.getFilter();
Object args = serverRegionExecutor.getArguments();
MemberMappedArgument memberMappedArg = serverRegionExecutor
.getMemberMappedArgument();
addBytes(functionState);
getMessage().addStringPart(region);
if (serverRegionExecutor.isFnSerializationReqd()) {
getMessage().addStringOrObjPart(function);
}
else {
getMessage().addStringOrObjPart(function.getId());
}
getMessage().addObjPart(args);
getMessage().addObjPart(memberMappedArg);
getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
getMessage().addIntPart(routingObjects.size());
for (Object key : routingObjects) {
if(allBuckets){
getMessage().addIntPart((Integer)key);
}
else {
getMessage().addStringOrObjPart(key);
}
}
getMessage().addIntPart(removedNodes.size());
for (Object nodes : removedNodes) {
getMessage().addStringOrObjPart(nodes);
}
this.resultCollector = rc;
this.regionName = region;
this.functionId = function.getId();
this.executor = serverRegionExecutor;
this.hasResult = functionState;
this.failedNodes = removedNodes;
}
public ExecuteRegionFunctionSingleHopOpImpl(String region,
String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector rc, byte hasResult,
Set<String> removedNodes, boolean allBuckets, boolean isHA, boolean optimizeForWrite) {
// What is this 8 that is getting added to filter and removednode sizes?
// It should have been used as a constant and documented
super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+ serverRegionExecutor.getFilter().size() + removedNodes.size());
this.isHA = isHA;
this.optimizeForWrite = optimizeForWrite;
Set routingObjects = serverRegionExecutor.getFilter();
Object args = serverRegionExecutor.getArguments();
byte functionState = AbstractExecution.getFunctionState(isHA,
hasResult == (byte)1 ? true : false, optimizeForWrite);
MemberMappedArgument memberMappedArg = serverRegionExecutor
.getMemberMappedArgument();
addBytes(functionState);
getMessage().addStringPart(region);
getMessage().addStringOrObjPart(functionId);
getMessage().addObjPart(args);
getMessage().addObjPart(memberMappedArg);
getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
getMessage().addIntPart(routingObjects.size());
for (Object key : routingObjects) {
if(allBuckets){
getMessage().addIntPart((Integer)key);
}
else {
getMessage().addStringOrObjPart(key);
}
}
getMessage().addIntPart(removedNodes.size());
for (Object nodes : removedNodes) {
getMessage().addStringOrObjPart(nodes);
}
this.resultCollector = rc;
this.regionName = region;
this.functionId = functionId;
this.executor = serverRegionExecutor;
this.hasResult = functionState;
this.failedNodes = removedNodes;
}
private void addBytes(byte functionState) {
if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
getMessage().addBytesPart(new byte[] { functionState });
} else {
byte[] bytes = new byte[5];
bytes[0] = functionState;
Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
getMessage().addBytesPart(bytes);
}
}
@Override
protected Object processResponse(Message msg) throws Exception {
ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
try {
executeFunctionResponseMsg.readHeader();
switch (executeFunctionResponseMsg.getMessageType()) {
case MessageType.EXECUTE_REGION_FUNCTION_RESULT:
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_RESULT.");
}
do {
executeFunctionResponseMsg.receiveChunk();
Object resultResponse = executeFunctionResponseMsg.getPart(0)
.getObject();
Object result;
if (resultResponse instanceof ArrayList) {
result = ((ArrayList)resultResponse).get(0);
}
else {
result = resultResponse;
}
if (result instanceof FunctionException) {
FunctionException ex = ((FunctionException)result);
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received Exception. {}", ex.getCause());
}
if (ex instanceof InternalFunctionException) {
Throwable cause = ex.getCause();
DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
.get(1);
this.resultCollector
.addResult(memberID, cause);
FunctionStats.getFunctionStats(this.functionId,
this.executor.getRegion().getSystem())
.incResultsReceived();
continue;
}
else if (((FunctionException)result).getCause() instanceof InternalFunctionInvocationTargetException) {
InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
.getCause();
this.failedNodes.addAll(ifite.getFailedNodeSet());
}
if (!ex.getMessage().equals("Buckets are null"))
throw ex;
return null;
}
else if (result instanceof Throwable) {
String s = "While performing a remote " + getOpName();
throw new ServerOperationException(s, (Throwable)result);
}
else {
DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
.get(1);
synchronized (this.resultCollector) {
this.resultCollector
.addResult(memberID, result);
}
FunctionStats.getFunctionStats(this.functionId,
this.executor.getRegion().getSystem()).incResultsReceived();
}
} while (!executeFunctionResponseMsg.isLastChunk());
if (isDebugEnabled) {
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received all the results from server successfully.");
}
return null;
case MessageType.EXCEPTION:
if (logger.isDebugEnabled()) {
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXCEPTION");
}
executeFunctionResponseMsg.receiveChunk();
Part part0 = executeFunctionResponseMsg.getPart(0);
Object obj = part0.getObject();
if (obj instanceof FunctionException) {
FunctionException ex = ((FunctionException)obj);
if (((FunctionException)obj).getCause() instanceof InternalFunctionInvocationTargetException) {
InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
.getCause();
this.failedNodes.addAll(ifite.getFailedNodeSet());
}
if (!ex.getMessage().equals("Buckets are null")) {
throw ex;
}
return null;
}
else if (obj instanceof Throwable) {
String s = "While performing a remote " + getOpName();
throw new ServerOperationException(s, (Throwable)obj);
}
break;
case MessageType.EXECUTE_REGION_FUNCTION_ERROR:
if (logger.isDebugEnabled()) {
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_ERROR");
}
executeFunctionResponseMsg.receiveChunk();
String errorMessage = executeFunctionResponseMsg.getPart(0)
.getString();
throw new ServerOperationException(errorMessage);
default:
throw new InternalGemFireError("Unknown message type "
+ executeFunctionResponseMsg.getMessageType());
}
}
finally {
executeFunctionResponseMsg.clear();
}
return null;
}
ResultCollector getResultCollector() {
return this.resultCollector;
}
String getFunctionId() {
return this.functionId;
}
String getRegionName() {
return this.regionName;
}
ServerRegionFunctionExecutor getExecutor() {
return this.executor;
}
byte getHasResult() {
return this.hasResult;
}
boolean isHA() {
return this.isHA;
}
boolean optimizeForWrite() {
return this.optimizeForWrite;
}
@Override
protected boolean isErrorResponse(int msgType) {
return msgType == MessageType.EXECUTE_REGION_FUNCTION_ERROR;
}
@Override
protected long startAttempt(ConnectionStats stats) {
return stats.startExecuteFunction();
}
protected String getOpName() {
return "executeRegionFunctionSingleHop";
}
@Override
protected void endSendAttempt(ConnectionStats stats, long start) {
stats.endExecuteFunctionSend(start, hasFailed());
}
@Override
protected void endAttempt(ConnectionStats stats, long start) {
stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
}
@Override
protected Message createResponseMessage() {
return new ChunkedMessage(1, Version.CURRENT);
}
}
}