/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY;
import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import util.TestException;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.dunit.rules.MemberVM;
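/**
 * Base class for DUnit tests that count how many times a function is executed on the servers
 * when a client runs it, across combinations of HA status, client metadata state, execution
 * target, function identifier type, and pool retry-attempts setting.
 *
 * <p>A subclass would typically drive it along these lines (a sketch; {@code expectedCalls} is a
 * hypothetical value supplied by the subclass):
 *
 * <pre>
 * Function function = testFunctionRetry(HAStatus.HA, ClientMetadataStatus.CLIENT_HAS_METADATA,
 *     ExecutionTarget.REGION, FunctionIdentifierType.OBJECT_REFERENCE,
 *     PoolFactory.DEFAULT_RETRY_ATTEMPTS);
 * assertThat(getNumberOfFunctionCalls(function.getId())).isEqualTo(expectedCalls);
 * </pre>
 */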
public class FunctionRetryTestBase implements Serializable {
@ClassRule
public static final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
private static final int TOTAL_NUM_BUCKETS = 3;
private static final int REDUNDANT_COPIES = 0;
private static final String regionName = "FunctionOnRegionRetryDUnitTest";
@Rule
public DistributedRule distributedRule = new DistributedRule(5);
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
protected enum HAStatus {
NOT_HA, HA
}
protected enum ExecutionTarget {
REGION,
REGION_WITH_FILTER_1_KEY,
REGION_WITH_FILTER_2_KEYS,
SERVER,
SERVERS,
SERVER_REGION_SERVICE,
SERVERS_REGION_SERVICE,
MEMBER,
MEMBERS
}
protected enum FunctionIdentifierType {
STRING, OBJECT_REFERENCE
}
protected enum ClientMetadataStatus {
CLIENT_HAS_METADATA, CLIENT_MISSING_METADATA
}
MemberVM locator;
MemberVM server1;
MemberVM server2;
MemberVM server3;
ClientVM client;
  private static final Logger logger = LogService.getLogger();
@Before
public void setUp() throws Exception {
locator = clusterStartupRule.startLocatorVM(0);
server1 = startServer(1);
server2 = startServer(2);
server3 = startServer(3);
}
@After
public void tearDown() throws Exception {
IgnoredException.removeAllExpectedExceptions();
}
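  /**
   * Starts a client with the requested pool retry-attempts setting, creates the partitioned
   * region on all three servers and on the client, seeds the client metadata state, and then
   * executes {@link TheFunction} from the client against the given target. Returns the function
   * so callers can look up its execution counts via {@link #getNumberOfFunctionCalls(String)}.
   */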
public Function testFunctionRetry(
final HAStatus haStatus,
final ClientMetadataStatus clientMetadataStatus,
final ExecutionTarget executionTarget,
final FunctionIdentifierType functionIdentifierType,
final int retryAttempts) throws Exception {
if (retryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
client = startClient(4);
} else {
client = startClient(4, retryAttempts);
}
TheFunction function = new TheFunction(haStatus);
createServerRegionAndRegisterFunction(functionIdentifierType, function);
createClientRegionAndInitializeMetaDataState();
setClientMetaDataStatus(clientMetadataStatus);
IgnoredException.addIgnoredException(FunctionException.class.getName());
AsyncInvocation clientExecuteAsync = client.invokeAsync(() -> {
assertThat(executionTarget).isNotNull();
final Execution<Integer, Long, List<Long>> execution = getExecutionTarget(executionTarget);
((AbstractExecution) execution).removeFunctionAttributes(function.getId());
ResultCollector<Long, List<Long>> resultCollector = null;
try {
switch (functionIdentifierType) {
case STRING:
resultCollector = execution.execute(function.getId());
break;
case OBJECT_REFERENCE:
resultCollector = execution.execute(function);
break;
default:
throw new TestException("unknown FunctionIdentifierType: " + functionIdentifierType);
}
} catch (final FunctionException e) {
assertThat(e.getCause()).isInstanceOf(ServerConnectivityException.class);
} catch (ServerConnectivityException sce) {
        // a raw ServerConnectivityException is only expected for direct server executions
        assertThat(executionTarget).isIn(ExecutionTarget.SERVER, ExecutionTarget.SERVERS);
}
if (resultCollector != null) {
try {
resultCollector.getResult();
} catch (Exception ex) {
logger.info("Exception while collecting the result: " + ex.getMessage());
}
}
});
    // await() (unlike join()) propagates any assertion failure raised in the client VM
    clientExecuteAsync.await();
return function;
}
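  /**
   * Either primes the client's PR metadata for the region and waits for it to become stable, or
   * marks the metadata as not stable, depending on the requested status.
   */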
private void setClientMetaDataStatus(final ClientMetadataStatus clientMetadataStatus) {
client.invoke(() -> {
ClientMetadataService cms = ((InternalCache) clusterStartupRule.getClientCache())
.getClientMetadataService();
      if (clientMetadataStatus == ClientMetadataStatus.CLIENT_HAS_METADATA) {
final Region<Object, Object> region =
clusterStartupRule.getClientCache().getRegion(regionName);
cms.scheduleGetPRMetaData((InternalRegion) region, true);
GeodeAwaitility.await("Awaiting ClientMetadataService.isMetadataStable()")
.untilAsserted(() -> assertThat(cms.isMetadataStable()).isTrue());
} else {
cms.setMetadataStable(false);
}
});
}
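  /**
   * Creates the client region, marks the client PR metadata as not stable, and puts a few
   * entries so the partitioned region holds data on the servers.
   */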
private void createClientRegionAndInitializeMetaDataState() {
client.invoke(() -> {
createClientRegion();
ClientMetadataService cms =
((InternalCache) clusterStartupRule.getClientCache()).getClientMetadataService();
cms.setMetadataStable(false);
final Region<Object, Object> region =
clusterStartupRule.getClientCache().getRegion(regionName);
for (int i = 0; i < 3 /* numberOfEntries */; i++) {
region.put("k" + i, "v" + i);
}
});
}
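  /**
   * Creates the partitioned region on each server and registers the function there when it will
   * be invoked by its string id.
   */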
private void createServerRegionAndRegisterFunction(
final FunctionIdentifierType functionIdentifierType,
final TheFunction function) {
server1.invoke(() -> {
createServerRegion();
registerFunctionIfNeeded(functionIdentifierType, function);
});
server2.invoke(() -> {
createServerRegion();
registerFunctionIfNeeded(functionIdentifierType, function);
});
server3.invoke(() -> {
createServerRegion();
registerFunctionIfNeeded(functionIdentifierType, function);
});
}
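  /**
   * Builds the {@link Execution} for the requested target. Every execution passes 200 as the
   * function argument, which {@link TheFunction} interprets as its think time in milliseconds.
   */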
  private Execution<Integer, Long, List<Long>> getExecutionTarget(
      final ExecutionTarget executionTarget) {
assertThat(executionTarget).isNotNull();
final Execution<Integer, Long, List<Long>> execution;
switch (executionTarget) {
case REGION:
execution =
FunctionService.onRegion(clusterStartupRule.getClientCache().getRegion(regionName))
.setArguments(200);
break;
case REGION_WITH_FILTER_1_KEY:
final HashSet<String> filter = new HashSet<String>(Arrays.asList("k0"));
execution =
FunctionService.onRegion(clusterStartupRule.getClientCache().getRegion(regionName))
.setArguments(200).withFilter(filter);
break;
case SERVER:
execution = FunctionService.onServer(clusterStartupRule.getClientCache().getDefaultPool())
.setArguments(200);
break;
case SERVER_REGION_SERVICE:
execution = FunctionService
.onServer(clusterStartupRule.getClientCache().getRegion(regionName).getRegionService())
.setArguments(200);
break;
case SERVERS:
execution = FunctionService.onServers(clusterStartupRule.getClientCache().getDefaultPool())
.setArguments(200);
break;
case SERVERS_REGION_SERVICE:
execution = FunctionService
.onServers(clusterStartupRule.getClientCache().getRegion(regionName).getRegionService())
.setArguments(200);
break;
default:
throw new TestException("unknown ExecutionTarget: " + executionTarget);
}
return execution;
}
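  /**
   * Registers the function with the {@link FunctionService} only when it will be invoked by its
   * string id; invoking it by object reference needs no prior registration.
   */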
private void registerFunctionIfNeeded(
final FunctionIdentifierType functionIdentifierType,
final TheFunction function) {
switch (functionIdentifierType) {
case STRING:
FunctionService.registerFunction(function);
break;
case OBJECT_REFERENCE:
// no-op: no need to pre-register the fn if we will invoke it by reference
break;
default:
throw new TestException("unknown FunctionIdentifierType: " + functionIdentifierType);
}
}
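  /**
   * Sums the number of executions of the given function across the three servers.
   */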
int getNumberOfFunctionCalls(final String functionId) {
return getNumberOfFunctionCalls(server1, functionId) +
getNumberOfFunctionCalls(server2, functionId) +
getNumberOfFunctionCalls(server3, functionId);
}
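  /**
   * Waits for any in-flight executions of the function to finish on the given server, then
   * returns how many times the function was called there. Returns 0 if the server never recorded
   * stats for the function.
   */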
private int getNumberOfFunctionCalls(final MemberVM vm, final String functionId) {
return vm.invoke(() -> {
final int numExecutions;
final FunctionStats functionStats = FunctionStats.getFunctionStats(functionId);
if (functionStats == null) {
numExecutions = 0;
} else {
        GeodeAwaitility.await("Awaiting functionStats.getFunctionExecutionsRunning().isZero()")
            .atMost(30 * 4, TimeUnit.SECONDS)
            .untilAsserted(
                () -> assertThat(functionStats.getFunctionExecutionsRunning()).isZero());
numExecutions = functionStats.getFunctionExecutionCalls();
}
return numExecutions;
});
}
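  /**
   * Starts a client that uses the default pool retry-attempts setting and sets the
   * CLIENT_FUNCTION_TIMEOUT system property to 20.
   */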
private ClientVM startClient(final int vmIndex) throws Exception {
return clusterStartupRule.startClientVM(
vmIndex,
cacheRule -> cacheRule
.withCacheSetup(fnTimeOut -> System
.setProperty(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT", "20"))
.withLocatorConnection(locator.getPort()));
}
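  /**
   * Same as {@link #startClient(int)} but also configures the given number of retry attempts on
   * the client pool.
   */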
private ClientVM startClient(final int vmIndex, final int retryAttempts) throws Exception {
return clusterStartupRule.startClientVM(
vmIndex,
cacheRule -> cacheRule
.withCacheSetup(fnTimeOut -> System
.setProperty(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT", "20"))
.withLocatorConnection(locator.getPort())
.withCacheSetup(cf -> cf.setPoolRetryAttempts(retryAttempts)));
}
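  /**
   * Starts a server connected to the locator, allowing this test's classes through the
   * serializable-object filter.
   */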
private MemberVM startServer(final int vmIndex) {
return clusterStartupRule.startServerVM(
vmIndex,
cacheRule -> cacheRule
.withConnectionToLocator(locator.getPort())
.withProperty(SERIALIZABLE_OBJECT_FILTER,
"org.apache.geode.internal.cache.execute.FunctionRetryTestBase*"));
}
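  /**
   * Creates the partitioned region on the current server with the configured bucket count and
   * no redundant copies.
   */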
private void createServerRegion() {
final PartitionAttributesFactory<String, String> paf = new PartitionAttributesFactory<>();
paf.setRedundantCopies(REDUNDANT_COPIES);
paf.setTotalNumBuckets(TOTAL_NUM_BUCKETS);
clusterStartupRule.getCache().createRegionFactory(PARTITION)
.setPartitionAttributes(paf.create())
.create(regionName);
}
private void createClientRegion() {
clusterStartupRule.getClientCache().createClientRegionFactory(CACHING_PROXY).create(regionName);
}
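  /**
   * Test function that sleeps for the number of milliseconds it receives as its argument and
   * then returns that value as its single result. Whether it reports itself as HA is fixed at
   * construction time.
   */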
static class TheFunction implements Function<Integer> {
private final boolean haStatus;
public TheFunction(final HAStatus haStatus) {
      this.haStatus = haStatus == HAStatus.HA;
}
@Override
public void execute(final FunctionContext<Integer> context) {
LogService.getLogger().info("Function Executing on server.");
final int thinkTimeMillis = context.getArguments();
      try {
        Thread.sleep(thinkTimeMillis);
      } catch (final InterruptedException ex) {
        // restore the interrupt status instead of silently swallowing the interruption
        Thread.currentThread().interrupt();
      }
context.getResultSender().lastResult(thinkTimeMillis);
}
@Override
public boolean isHA() {
return haStatus;
}
}
}