blob: b308b2085150621c991ff5f62e64fc5d2140feb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.toArray;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.util.TypedFunctionService;
import org.apache.geode.security.templates.DummyAuthenticator;
import org.apache.geode.security.templates.UserPasswordAuthInit;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.ClientCacheRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.FunctionServiceTest;
@Category(FunctionServiceTest.class)
@SuppressWarnings("serial")
public class DistributedRegionFunctionExecutionDUnitTest implements Serializable {
private static final AtomicReference<Region<?, ?>> REGION = new AtomicReference<>();
private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
private final Set<String> filter = new HashSet<>();
private String regionName;
private String poolName;
private VM empty;
private VM normal;
private VM replicate1;
private VM replicate2;
private VM replicate3;
@Rule
public CacheRule cacheRule = new CacheRule();
@Rule
public ClientCacheRule clientCacheRule = new ClientCacheRule();
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@Before
public void setUp() throws Exception {
empty = getController();
replicate1 = getVM(0);
replicate2 = getVM(1);
replicate3 = getVM(2);
normal = getVM(3);
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
// zero is no-op unless test replaces the reference
vm.invoke(() -> LATCH.set(new CountDownLatch(0)));
}
regionName = getClass().getSimpleName() + "_region";
poolName = getClass().getSimpleName() + "_pool";
for (int i = 100; i < 120; i++) {
filter.add("execKey-" + i);
}
}
@After
public void tearDown() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> LATCH.get().countDown());
}
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> {
populateRegion(200);
executeDistributedRegionFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_SendException() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> {
populateRegion(200);
executeResultWithExceptionFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_NoLastResult() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> {
populateRegion(200);
executeNoLastResultFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyNormal() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
normal.invoke(() -> {
Throwable thrown = catchThrowable(() -> executeDistributedRegionFunction());
assertThat(thrown)
.isInstanceOf(FunctionException.class)
.hasMessage("Function execution on region with DataPolicy.NORMAL is not supported");
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> executeDistributedRegionFunction());
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicateNotTimedOut()
throws Exception {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> {
createCache(getDistributedSystemProperties());
LATCH.set(new CountDownLatch(1));
FunctionService.registerFunction(new DistributedRegionFunction());
FunctionService.registerFunction(new LongRunningFunction(LATCH.get()));
});
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
AsyncInvocation executeFunctionInReplicate1 = replicate1.invokeAsync(() -> {
ResultCollector<String, List<String>> resultCollector = TypedFunctionService
.<Void, String, List<String>>onRegion(getRegion())
.withFilter(filter)
.execute(LongRunningFunction.class.getSimpleName(), getTimeout().toMillis(),
MILLISECONDS);
assertThat(resultCollector.getResult().get(0))
.isEqualTo("LongRunningFunction completed");
});
// how long LongRunningFunction runs for is now controlled here:
Thread.sleep(2000);
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> {
LATCH.get().countDown();
});
}
executeFunctionInReplicate1.await();
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicateTimedOut() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> {
createCache(getDistributedSystemProperties());
LATCH.set(new CountDownLatch(1));
FunctionService.registerFunction(new DistributedRegionFunction());
FunctionService.registerFunction(new LongRunningFunction(LATCH.get()));
});
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> {
Throwable thrown = catchThrowable(() -> {
TypedFunctionService
.<Void, String, List<String>>onRegion(getRegion())
.withFilter(filter)
.execute(LongRunningFunction.class.getSimpleName(), 1000, MILLISECONDS);
});
assertThat(thrown)
.hasCauseInstanceOf(FunctionException.class)
.hasMessageContaining("All results not received in time provided");
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_SendException() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> executeResultWithExceptionFunction());
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_NoLastResult() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> executeNoLastResultFunction());
}
@Test
public void testDistributedRegionFunctionExecutionWithFunctionInvocationTargetException() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(true, 5));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> {
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionWithFunctionInvocationTargetException_WithoutHA() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(false, 0));
}
empty.invoke(() -> populateRegion(200));
replicate1.invoke(() -> {
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetExceptionWithoutHA();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionWithFunctionInvocationTargetExceptionForEmptyDataPolicy() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(true, 5));
}
empty.invoke(() -> populateRegion(200));
empty.invoke(() -> {
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionWithFunctionInvocationTargetExceptionForEmptyDataPolicy_WithoutHA() {
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
for (VM vm : toArray(replicate1, replicate2, replicate3)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
for (VM vm : toArray(empty, normal, replicate1, replicate2, replicate3)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(false, 0));
}
empty.invoke(() -> populateRegion(200));
empty.invoke(() -> {
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetExceptionWithoutHA();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionHACacheClosedException() {
for (VM vm : toArray(empty, replicate1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
replicate1.invoke(() -> createRegion(DataPolicy.REPLICATE));
empty.invoke(() -> populateRegion(200));
replicate2.invoke(() -> {
createCache(getDistributedSystemProperties());
createRegion(DataPolicy.REPLICATE);
});
empty.invoke(() -> {
List<Boolean> result = executeDistributedRegionFunction();
assertThat(result)
.hasSize(5001)
.containsOnly(true);
});
}
@Test
public void testDistributedRegionFunctionExecutionHANodeFailure() throws Exception {
for (VM vm : toArray(empty, replicate1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
replicate1.invoke(() -> createRegion(DataPolicy.REPLICATE));
empty.invoke(() -> populateRegion(200));
AsyncInvocation<List<Boolean>> executeFunctionHaInEmptyVM =
empty.invokeAsync(() -> executeDistributedRegionFunction());
replicate2.invoke(() -> {
createCache(getDistributedSystemProperties());
createRegion(DataPolicy.REPLICATE);
});
replicate1.invoke(() -> getCache().close());
List<Boolean> result = executeFunctionHaInEmptyVM.get();
assertThat(result)
.hasSize(5001)
.containsOnly(true);
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_ClientServer() {
VM empty1 = replicate3;
VM empty2 = normal;
VM client = empty;
for (VM vm : toArray(empty2, replicate1, replicate2, empty1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = empty1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = empty2.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
for (VM vm : toArray(replicate1, replicate2)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
executeDistributedRegionFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_ClientServer_SendException() {
VM empty1 = replicate3;
VM empty2 = normal;
VM client = empty;
for (VM vm : toArray(empty2, replicate1, replicate2, empty1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = empty1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = empty2.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
for (VM vm : toArray(replicate1, replicate2)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
executeResultWithExceptionFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_ClientServer_NoLastResult() {
VM empty1 = replicate3;
VM empty2 = normal;
VM client = empty;
for (VM vm : toArray(empty2, replicate1, replicate2, empty1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = empty1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = empty2.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
for (VM vm : toArray(replicate1, replicate2)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
try (IgnoredException ie1 = addIgnoredException("did not send last result", empty1);
IgnoredException ie2 = addIgnoredException("did not send last result", empty2)) {
executeNoLastResultFunction();
}
});
}
/**
* If one server goes down while executing a function, that function should failover to other
* available server.
*/
@Test
public void testServerFailoverWithTwoServerAliveHA() throws InterruptedException {
VM emptyServer1 = replicate1;
VM client = normal;
for (VM vm : toArray(emptyServer1, replicate2, replicate3)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
int port1 = emptyServer1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
replicate3.invoke(() -> createRegion(DataPolicy.REPLICATE));
client.invoke(() -> {
LATCH.set(new CountDownLatch(1));
createClientCache();
createClientRegion(port1, port2);
populateClientRegion(200);
});
replicate2.invoke(() -> stopServerHA());
AsyncInvocation<List<Boolean>> executeFunctionHaInClientVm =
client.invokeAsync(() -> executeDistributedRegionFunction());
replicate2.invoke(() -> {
startServerHA();
client.invoke(() -> LATCH.get().countDown());
});
client.invoke(() -> {
LATCH.get().await(getTimeout().toMillis(), MILLISECONDS);
});
emptyServer1.invoke(() -> closeCacheHA());
List<Boolean> result = executeFunctionHaInClientVm.get();
assertThat(result)
.hasSize(5001)
.containsOnly(true);
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyNormal_ClientServer() {
VM client = empty;
VM normal1 = normal;
VM normal2 = replicate3;
VM empty = replicate2;
for (VM vm : toArray(normal1, replicate1, empty, normal2)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = normal1.invoke(() -> {
createRegion(DataPolicy.NORMAL);
return createCacheServer();
});
int port2 = normal2.invoke(() -> {
createRegion(DataPolicy.NORMAL);
return createCacheServer();
});
for (VM vm : toArray(empty, replicate1)) {
vm.invoke(() -> createRegion(DataPolicy.EMPTY));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
// add expected exception
try (IgnoredException ie = addIgnoredException(FunctionException.class)) {
Throwable thrown = catchThrowable(() -> executeDistributedRegionFunction());
assertThat(thrown)
.isInstanceOf(FunctionException.class)
.hasMessageContaining(
"Function execution on region with DataPolicy.NORMAL is not supported");
}
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_ClientServer() {
VM client = empty;
VM empty = replicate3;
for (VM vm : toArray(normal, replicate1, replicate2, empty)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = replicate1.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
executeUnregisteredFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_ClientServer_WithoutRegister() {
VM client = empty;
VM empty = replicate3;
for (VM vm : toArray(normal, replicate1, replicate2, empty)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = replicate1.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
executeDistributedRegionFunction();
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_ClientServer_FunctionInvocationTargetException() {
VM client = empty;
VM empty = replicate3;
for (VM vm : toArray(normal, replicate1, replicate2, empty)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
int port1 = replicate1.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
client.invoke(() -> createClientCache());
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
for (VM vm : toArray(client, replicate1, replicate2, normal, empty)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(true, 5));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
// add expected exception to avoid suspect strings
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException_ClientServer();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyReplicate_ClientServer_FunctionInvocationTargetException_WithoutHA() {
VM client = empty;
VM empty = replicate3;
for (VM vm : toArray(normal, replicate1, replicate2, empty)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
int port1 = replicate1.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
client.invoke(() -> createClientCache());
normal.invoke(() -> createRegion(DataPolicy.NORMAL));
empty.invoke(() -> createRegion(DataPolicy.EMPTY));
for (VM vm : toArray(client, replicate1, replicate2, normal, empty)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(false, 0));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
// add expected exception to avoid suspect strings
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException_ClientServer_WithoutHA();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_ClientServer_FunctionInvocationTargetException() {
VM client = empty;
VM empty1 = replicate3;
VM empty2 = normal;
for (VM vm : toArray(empty2, replicate1, replicate2, empty1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
int port1 = empty1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = empty2.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
client.invoke(() -> createClientCache());
for (VM vm : toArray(replicate1, replicate2)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
});
for (VM vm : toArray(client, empty1, empty2, replicate1, replicate2)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(true, 5));
}
client.invoke(() -> {
// add expected exception to avoid suspect strings
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException_ClientServer();
}
});
}
@Test
public void testDistributedRegionFunctionExecutionOnDataPolicyEmpty_ClientServer_FunctionInvocationTargetException_WithoutHA() {
VM client = empty;
VM empty1 = replicate3;
VM empty2 = normal;
for (VM vm : toArray(empty2, replicate1, replicate2, empty1)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache());
int port1 = empty1.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
int port2 = empty2.invoke(() -> {
createRegion(DataPolicy.EMPTY);
return createCacheServer();
});
for (VM vm : toArray(replicate1, replicate2)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
});
for (VM vm : toArray(client, empty1, empty2, replicate1, replicate2)) {
vm.invoke(() -> registerThrowsFunctionInvocationTargetExceptionFunction(false, 0));
}
client.invoke(() -> {
// add expected exception to avoid suspect strings
try (IgnoredException ie = addIgnoredException("I have been thrown")) {
executeFunctionFunctionInvocationTargetException_ClientServer_WithoutHA();
}
});
}
@Test
public void inlineFunctionIsUsedOnClientInsteadOfLookingUpFunctionById() {
VM client = empty;
VM empty = replicate3;
for (VM vm : toArray(normal, empty, replicate1, replicate2)) {
vm.invoke(() -> createCache(getDistributedSystemProperties()));
}
client.invoke(() -> createClientCache(getClientProperties()));
int port1 = replicate1.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
for (VM vm : toArray(normal, empty)) {
vm.invoke(() -> createRegion(DataPolicy.REPLICATE));
}
client.invoke(() -> {
createClientRegion(port1, port2);
populateClientRegion(200);
});
for (VM vm : toArray(normal, empty, replicate1, replicate2, client)) {
vm.invoke(() -> FunctionService.registerFunction(inlineFunction("Failure", false)));
}
client.invoke(() -> {
ResultCollector<Boolean, List<Boolean>> resultCollector = TypedFunctionService
.<Boolean, Boolean, List<Boolean>>onRegion(getRegion())
.setArguments(true)
.execute(inlineFunction("Success", true));
assertThat(resultCollector.getResult())
.hasSize(1)
.containsOnly(true);
});
}
/**
* Verify that AuthenticationRequiredException is not thrown when security-* properties are NOT
* provided. We have to grep for this exception in logs for any occurrence.
*/
@Test
public void authenticationRequiredExceptionIsNotThrownWhenSecurityIsNotConfigured() {
VM client = replicate1;
VM server = replicate2;
int port = server.invoke(() -> {
createCacheWithSecurity();
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
client.invoke(() -> {
createClientCacheWithSecurity();
createClientRegion(port);
executeNoResultFunction();
doPut();
});
}
@Test
public void testFunctionWithNoResultThrowsException() {
VM client = empty;
int port1 = replicate1.invoke(() -> {
createCache(getDistributedSystemProperties());
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
int port2 = replicate2.invoke(() -> {
createCache(getDistributedSystemProperties());
createRegion(DataPolicy.REPLICATE);
return createCacheServer();
});
client.invoke(() -> {
createClientCache();
createClientRegion(port1, port2);
populateClientRegion(200);
try (IgnoredException ie = addIgnoredException(RuntimeException.class)) {
executeThrowsRuntimeExceptionFunction();
}
});
}
private Properties getDistributedSystemProperties() {
Properties props = DistributedRule.getDistributedSystemProperties();
props.setProperty(SERIALIZABLE_OBJECT_FILTER,
"org.apache.geode.internal.cache.functions.**;org.apache.geode.internal.cache.execute.**;org.apache.geode.test.dunit.**");
return props;
}
private Properties getClientProperties() {
Properties props = new Properties();
props.setProperty(SERIALIZABLE_OBJECT_FILTER,
"org.apache.geode.internal.cache.functions.**;org.apache.geode.internal.cache.execute.**;org.apache.geode.test.dunit.**");
return props;
}
private void createCache(Properties props) {
cacheRule.createCache(props);
}
private void createClientCache(Properties props) {
clientCacheRule.createClientCache(props);
}
private InternalCache getCache() {
return cacheRule.getCache();
}
private InternalClientCache getClientCache() {
return clientCacheRule.getClientCache();
}
private void createCacheWithSecurity() {
Properties props = getDistributedSystemProperties();
props.setProperty(NAME, "SecurityServer");
props.setProperty(SECURITY_CLIENT_AUTHENTICATOR,
DummyAuthenticator.class.getName() + ".create");
createCache(props);
}
private void createClientCache() {
Properties props = getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
createClientCache(props);
}
private void createClientCacheWithSecurity() {
Properties props = getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
props.setProperty(NAME, "SecurityClient");
props.setProperty(SECURITY_CLIENT_AUTH_INIT, UserPasswordAuthInit.class.getName() + ".create");
props.setProperty("security-username", "reader1");
props.setProperty("security-password", "reader1");
createClientCache(props);
}
private void createClientRegion(int... ports) {
System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
PoolFactory poolFactory = PoolManager.createFactory();
for (int port : ports) {
poolFactory.addServer("localhost", port);
}
Pool pool = poolFactory
.setMaxConnections(10)
.setMinConnections(6)
.setPingInterval(3000)
.setReadTimeout(2000)
.setRetryAttempts(2)
.setSocketBufferSize(1000)
.setSubscriptionEnabled(false)
.setSubscriptionRedundancy(-1)
.create(poolName);
Region<?, ?> region = getClientCache()
.createClientRegionFactory(ClientRegionShortcut.PROXY)
.setPoolName(pool.getName())
.create(regionName);
setRegion(region);
}
private void createRegion(DataPolicy dataPolicy) {
Region<?, ?> region = getCache().createRegionFactory()
.setDataPolicy(dataPolicy)
.setScope(Scope.DISTRIBUTED_ACK)
.create(regionName);
setRegion(region);
}
private int createCacheServer() throws IOException {
CacheServer cacheServer = getCache().addCacheServer();
cacheServer.setPort(0);
cacheServer.start();
return cacheServer.getPort();
}
private void doPut() {
Region<String, String> region = getRegion();
region.put("K1", "B1");
}
private void populateRegion(int count) {
Region<String, Integer> region = getRegion();
for (int i = 1; i <= count; i++) {
region.put("execKey-" + i, i);
}
}
private void populateClientRegion(int count) {
Region<String, Integer> region = getRegion();
for (int i = 1; i <= count; i++) {
region.put("execKey-" + i, i);
}
}
private void startServerHA() throws IOException {
for (CacheServer cacheServer : getCache().getCacheServers()) {
cacheServer.start();
}
}
private void stopServerHA() {
for (CacheServer cacheServer : getCache().getCacheServers()) {
cacheServer.stop();
}
}
private void closeCacheHA() {
for (CacheServer cacheServer : getCache().getCacheServers()) {
cacheServer.stop();
}
getCache().close();
}
private void registerThrowsFunctionInvocationTargetExceptionFunction(boolean isHA,
int retryCount) {
FunctionService.registerFunction(
new ThrowsFunctionInvocationTargetExceptionFunction(isHA, retryCount));
}
private void executeNoResultFunction() {
TypedFunctionService
.onRegion(getRegion())
.execute(new NoResultFunction());
}
private List<Boolean> executeDistributedRegionFunction() {
return TypedFunctionService
.<Boolean, Boolean, List<Boolean>>onRegion(getRegion())
.withFilter(filter)
.setArguments(false)
.execute(new DistributedRegionFunction())
.getResult();
}
private void executeThrowsRuntimeExceptionFunction() {
TypedFunctionService
.<Void, Void, Void>onRegion(getRegion())
.withFilter(filter)
.execute(new ThrowsRuntimeExceptionFunction());
}
private void executeResultWithExceptionFunction() {
Function function = new ResultWithExceptionFunction();
Set<String> filter = new HashSet<>();
for (int i = 0; i <= 19; i++) {
filter.add("execKey-" + 100 + i);
}
ResultCollector<Object, List<Object>> resultCollector = TypedFunctionService
.<Boolean, Object, List<Object>>onRegion(getRegion())
.withFilter(filter)
.setArguments(true)
.execute(function);
List<Object> result = resultCollector.getResult();
result.sort(new NumericComparator());
assertThat(result.get(0))
.as("First element of " + resultCollector.getResult())
.isInstanceOf(CustomRuntimeException.class);
resultCollector = TypedFunctionService
.<Set<String>, Object, List<Object>>onRegion(getRegion())
.withFilter(filter)
.setArguments(filter)
.execute(function);
result = resultCollector.getResult();
result.sort(new NumericComparator());
assertThat(result)
.hasSize(filter.size() + 1)
.containsAll(IntStream.rangeClosed(0, 19).boxed().collect(toList()));
assertThat(result.get(result.size() - 1))
.as("Last element of " + result)
.isInstanceOf(CustomRuntimeException.class);
}
private void executeNoLastResultFunction() {
Throwable thrown = catchThrowable(() -> {
TypedFunctionService
.onRegion(getRegion())
.withFilter(filter)
.execute(new NoLastResultFunction())
.getResult();
});
assertThat(thrown)
.hasMessageContaining("did not send last result");
}
private void executeUnregisteredFunction() {
FunctionService.unregisterFunction(new DistributedRegionFunction().getId());
TypedFunctionService
.<Void, Boolean, List<Boolean>>onRegion(getRegion())
.withFilter(filter)
.execute(new DistributedRegionFunction())
.getResult();
}
private void executeFunctionFunctionInvocationTargetException() {
ResultCollector<Integer, List<Integer>> resultCollector = TypedFunctionService
.<Boolean, Integer, List<Integer>>onRegion(getRegion())
.setArguments(true)
.execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName());
assertThat(resultCollector.getResult())
.containsOnly(5);
}
private void executeFunctionFunctionInvocationTargetExceptionWithoutHA() {
Throwable thrown = catchThrowable(() -> {
TypedFunctionService
.<Boolean, Integer, List<Integer>>onRegion(getRegion())
.setArguments(true)
.execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName())
.getResult();
});
assertThat(thrown)
.isInstanceOf(FunctionException.class);
assertThat(thrown.getCause())
.isInstanceOf(FunctionInvocationTargetException.class);
}
private void executeFunctionFunctionInvocationTargetException_ClientServer() {
ResultCollector<Integer, List<Integer>> resultCollector = TypedFunctionService
.<Boolean, Integer, List<Integer>>onRegion(getRegion())
.setArguments(true)
.execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName());
assertThat(resultCollector.getResult())
.containsOnly(5);
}
private void executeFunctionFunctionInvocationTargetException_ClientServer_WithoutHA() {
Throwable thrown = catchThrowable(() -> {
TypedFunctionService
.<Boolean, Integer, List<Integer>>onRegion(getRegion())
.setArguments(true)
.execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName())
.getResult();
});
assertThat(thrown)
.isInstanceOf(FunctionException.class);
assertThat(thrown.getCause())
.isInstanceOf(FunctionInvocationTargetException.class);
}
private static <K, V> Region<K, V> getRegion() {
return uncheckedCast(REGION.get());
}
private static void setRegion(Region<?, ?> region) {
REGION.set(region);
}
private static Function<Object> inlineFunction(String stringResult, boolean booleanResult) {
return new Function<Object>() {
@Override
public void execute(FunctionContext<Object> context) {
if (context.getArguments() instanceof String) {
context.getResultSender().lastResult(stringResult);
} else if (context.getArguments() instanceof Boolean) {
context.getResultSender().lastResult(booleanResult);
}
}
@Override
public String getId() {
return "Function";
}
};
}
/**
* Return Integers provided as Arguments and a last result of CustomRuntimeException.
*/
private static class ResultWithExceptionFunction implements Function<Object> {
@Override
public void execute(FunctionContext<Object> context) {
if (context.getArguments() instanceof Set) {
Set<Integer> arguments =
uncheckedCast(context.getArguments());
for (int i = 0; i < arguments.size(); i++) {
context.getResultSender().sendResult(i);
}
}
context.getResultSender().sendException(
new CustomRuntimeException("I have been thrown from TestFunction with set"));
}
@Override
public String getId() {
return getClass().getName();
}
@Override
public boolean isHA() {
return false;
}
}
/**
* Throw RuntimeException without sending any results.
*/
private static class ThrowsRuntimeExceptionFunction implements Function<Void> {
@Override
public void execute(FunctionContext<Void> context) {
throw new RuntimeException("failure");
}
@Override
public String getId() {
return getClass().getName();
}
@Override
public boolean hasResult() {
return false;
}
@Override
public boolean isHA() {
return false;
}
}
/**
* Return the Integer 5 or throw FunctionInvocationTargetException depending on object state.
*/
private static class ThrowsFunctionInvocationTargetExceptionFunction implements Function<Void> {
private final AtomicInteger count = new AtomicInteger();
private final int retryCount;
private final boolean isHA;
private ThrowsFunctionInvocationTargetExceptionFunction(boolean isHA, int retryCount) {
this.isHA = isHA;
this.retryCount = retryCount;
}
@Override
public void execute(FunctionContext<Void> context) {
count.incrementAndGet();
if (retryCount != 0 && count.get() >= retryCount) {
context.getResultSender().lastResult(5);
} else {
throw new FunctionInvocationTargetException("I have been thrown from " + getId());
}
}
@Override
public String getId() {
return getClass().getSimpleName();
}
@Override
public boolean isHA() {
return isHA;
}
}
/**
* Validate DataPolicy and Filter state, perform Region put operations, and then return 5000
* boolean true values and a last result of boolean false.
*/
private static class DistributedRegionFunction implements Function<Boolean> {
@Override
public void execute(FunctionContext<Boolean> context) {
RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
Region<Object, Object> region = regionFunctionContext.getDataSet();
InternalDistributedSystem sys = InternalDistributedSystem.getConnectedInstance();
assertThat(region.getAttributes().getDataPolicy().withStorage()).isTrue();
assertThat(region.getAttributes().getDataPolicy()).isNotEqualTo(DataPolicy.NORMAL);
assertThat(regionFunctionContext.getFilter()).hasSize(20);
// argument true indicates that CacheClose has to be done from the body itself
if (context.getArguments() != null && context.getArguments()) {
// do not close cache in retry
if (!regionFunctionContext.isPossibleDuplicate()) {
sys.disconnect();
throw new CacheClosedException(
"Throwing CacheClosedException to simulate failover during function exception");
}
}
// intentionally doing region operation to cause cacheClosedException
region.put("execKey-201", 201);
if (regionFunctionContext.isPossibleDuplicate()) {
// Below operation is done when the function is reexecuted
region.put("execKey-202", 202);
region.put("execKey-203", 203);
}
for (int i = 0; i < 5000; i++) {
context.getResultSender().sendResult(true);
}
context.getResultSender().lastResult(true);
}
@Override
public String getId() {
return getClass().getSimpleName();
}
}
/**
* Execute until the provided CountDownLatch counts down to zero.
*/
private static class LongRunningFunction implements Function<Void> {
private final CountDownLatch latch;
private LongRunningFunction(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void execute(FunctionContext<Void> context) {
try {
latch.await(getTimeout().toMillis(), MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
context.getResultSender().lastResult("LongRunningFunction completed");
}
@Override
public String getId() {
return getClass().getSimpleName();
}
}
/**
* Complete immediately with no results.
*/
private static class NoResultFunction implements Function<Void> {
@Override
public void execute(FunctionContext<Void> context) {
// no result
}
@Override
public String getId() {
return getClass().getSimpleName();
}
@Override
public boolean hasResult() {
return false;
}
@Override
public boolean isHA() {
return false;
}
}
/**
* Return Arguments as results without a last result.
*/
private static class NoLastResultFunction implements Function<Object> {
@Override
public void execute(FunctionContext<Object> context) {
context.getResultSender().sendResult(context.getArguments());
}
@Override
public String getId() {
return getClass().getSimpleName();
}
}
/**
* Custom RuntimeException.
*/
private static class CustomRuntimeException extends RuntimeException {
private CustomRuntimeException(String message) {
super(message);
}
}
/**
* Use natural ordering for Integers and moves optional Exception to end of Collection.
*/
private static class NumericComparator implements Comparator<Object>, Serializable {
@Override
public int compare(Object o1, Object o2) {
if (o1 == o2) {
return 0;
}
if (!(o1 instanceof Integer)) {
return 1;
}
if (!(o2 instanceof Integer)) {
return -1;
}
if ((int) o1 > (int) o2) {
return 1;
}
return -1;
}
}
}