blob: cafc821b366f4460b8b891764b03bbee4ff06fbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION1;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION2;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION3;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION7;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION_BUCKET_FILTER;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION_HA;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION_LASTRESULT;
import static org.apache.geode.internal.cache.functions.TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionAdapter;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.LocalDataSet;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.CacheTestCase;
import org.apache.geode.test.junit.categories.FunctionServiceTest;
@Category({FunctionServiceTest.class})
public class PRFunctionExecutionDUnitTest extends CacheTestCase {
private static final String STRING_KEY = "execKey";
private String regionName;
private String regionNameTop;
private String regionNameColocated1;
private String regionNameColocated2;
@Before
public void setUp() {
regionName = getUniqueName();
regionNameTop = regionName + "_top";
regionNameColocated1 = regionName + "_colo1";
regionNameColocated2 = regionName + "_colo2";
}
@After
public void tearDown() {
invokeInEveryVM(() -> DistributedSystem.setThreadsSocketPolicy(false));
DistributedSystem.setThreadsSocketPolicy(false);
}
@Override
public Properties getDistributedSystemProperties() {
Properties config = new Properties();
config.put(SERIALIZABLE_OBJECT_FILTER,
"org.apache.geode.test.junit.rules.**;org.apache.geode.internal.cache.execute.**;org.apache.geode.internal.cache.functions.**;org.apache.geode.test.dunit.**");
return config;
}
/**
* Test to validate that the function execution is successful on PR with Loner Distributed System
*/
@Test
public void testFunctionExecution() throws Exception {
Properties config = getDistributedSystemProperties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");
getCache(config);
Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0);
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION1);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, Collection<Boolean>> execution = FunctionService.onRegion(region);
ResultCollector<Boolean, Collection<Boolean>> resultCollector =
execution.setArguments(true).withFilter(createKeySet(STRING_KEY)).execute(function);
assertThat(resultCollector.getResult()).containsExactly(true);
}
@Test
public void testHAFunctionExecution() throws Exception {
Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0);
Function<Void> function = new TestFunction<>(false, TestFunction.TEST_FUNCTION10);
assertThatThrownBy(() -> FunctionService.registerFunction(function))
.isInstanceOf(FunctionException.class)
.hasMessageContaining("For Functions with isHA true, hasResult must also be true.");
Execution<String, Void, Void> dataSet = FunctionService.onRegion(region);
assertThatThrownBy(() -> dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(STRING_KEY)
.execute(function)).isInstanceOf(FunctionException.class)
.hasMessageContaining("For Functions with isHA true, hasResult must also be true.");
}
/**
* Test remote execution by a pure accessor which doesn't have the function factory present.
*/
@Test
public void testRemoteSingleKeyExecution_byName() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Object, Object, List<Object>> dataSet = FunctionService.onRegion(region);
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId());
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId());
assertThat(resultCollector.getResult()).hasSize(1).containsExactly(true);
resultCollector = dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(STRING_KEY)
.execute(function.getId());
assertThat(resultCollector.getResult()).hasSize(1).containsExactly(1);
Map<String, Integer> putData = createEntryMap(2, STRING_KEY, 1, 2);
resultCollector = dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(putData)
.execute(function.getId());
assertThat(resultCollector.getResult()).hasSize(1).containsExactly(true);
assertThat(region.get(STRING_KEY + "1")).isEqualTo(2);
assertThat(region.get(STRING_KEY + "2")).isEqualTo(3);
});
}
/**
* Test remote execution with timeout by sending function id where timeout does not expire.
*/
@Test
public void testRemoteSingleKeyExecution_byName_NotTimedOut() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(
new TestFunction(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME));
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Boolean> function =
new TestFunction<>(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME);
FunctionService.registerFunction(function);
Execution<Object, Object, List<Object>> dataSet = FunctionService.onRegion(region);
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId());
long timeout = 8000;
TimeUnit unit = TimeUnit.MILLISECONDS;
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId(),
timeout, unit);
List li = (ArrayList) resultCollector.getResult();
assertEquals(li.get(0), "Ran executeFunctionRunningForLongTime for 2000");
});
}
/**
* Test remote execution with timeout by sending function id where timeout expires.
*/
@Test
public void testRemoteSingleKeyExecution_byName_TimedOut() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(
new TestFunction(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME));
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Boolean> function =
new TestFunction<>(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME);
FunctionService.registerFunction(function);
Execution<Object, Object, List<Object>> dataSet = FunctionService.onRegion(region)
.withFilter(createKeySet(STRING_KEY)).setArguments(STRING_KEY);
long timeout = 1000;
TimeUnit unit = TimeUnit.MILLISECONDS;
assertThatThrownBy(() -> {
dataSet.execute(function.getId(), timeout, unit);
}).hasCauseInstanceOf(FunctionException.class)
.hasMessageContaining("All results not received in time provided");
});
}
/**
* Test local execution by a datastore Function throws the FunctionInvocationTargetException. As
* this is the case of HA then system should retry the function execution. After 5th attempt
* function will send Boolean as last result. factory present.
*/
@Test
public void testLocalSingleKeyExecution_byName_FunctionInvocationTargetException()
throws Exception {
Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0);
region.put(STRING_KEY, 1);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_REEXECUTE_EXCEPTION);
FunctionService.registerFunction(function);
Execution<Boolean, Integer, List<Integer>> dataSet = FunctionService.onRegion(region);
ResultCollector<Integer, List<Integer>> resultCollector =
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId());
List<Integer> list = resultCollector.getResult();
assertThat(list.get(0)).isEqualTo(5);
}
/**
* Test remote execution by a pure accessor which doesn't have the function factory
* present.Function throws the FunctionInvocationTargetException. As this is the case of HA then
* system should retry the function execution. After 5th attempt function will send Boolean as
* last result.
*/
@Test
public void testRemoteSingleKeyExecution_byName_FunctionInvocationTargetException()
throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
Function function = new TestFunction(true, TEST_FUNCTION_REEXECUTE_EXCEPTION);
FunctionService.registerFunction(function);
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_REEXECUTE_EXCEPTION);
FunctionService.registerFunction(function);
Execution<Boolean, Integer, List<Integer>> dataSet = FunctionService.onRegion(region);
ResultCollector<Integer, List<Integer>> resultCollector =
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId());
List<Integer> list = resultCollector.getResult();
assertThat(list.get(0)).isEqualTo(5);
});
}
/**
* Test remote execution by a pure accessor which doesn't have the function factory present.
*/
@Test
public void testRemoteSingleKeyExecution_byInstance() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> booleanDataSet = FunctionService.onRegion(region);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanDataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function);
assertThat(booleanResultCollector.getResult().get(0)).isTrue();
Execution<String, Integer, List<Integer>> integerDataSet = FunctionService.onRegion(region);
ResultCollector<Integer, List<Integer>> integerResultCollector = integerDataSet
.withFilter(createKeySet(STRING_KEY)).setArguments(STRING_KEY).execute(function);
assertThat(integerResultCollector.getResult().get(0)).isEqualTo(1);
Map<String, Integer> putData = createEntryMap(2, STRING_KEY, 1, 2);
Execution<Map<String, Integer>, Boolean, List<Boolean>> mapDataSet =
FunctionService.onRegion(region);
ResultCollector<Boolean, List<Boolean>> mapResultCollector =
mapDataSet.withFilter(createKeySet(STRING_KEY)).setArguments(putData).execute(function);
assertThat(mapResultCollector.getResult().get(0)).isTrue();
assertThat(region.get(STRING_KEY + "1")).isEqualTo(2);
assertThat(region.get(STRING_KEY + "2")).isEqualTo(3);
});
}
/**
* Test remote execution with timeout by sending function instance where timeout does not expire.
*/
@Test
public void testRemoteSingleKeyExecution_byInstance_NotTimedOut() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(
new TestFunction(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME));
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Boolean> function =
new TestFunction<>(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME);
FunctionService.registerFunction(function);
Execution<Object, Object, List<Object>> dataSet = FunctionService.onRegion(region);
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function);
long timeout = 8000;
TimeUnit unit = TimeUnit.MILLISECONDS;
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(createKeySet(STRING_KEY)).setArguments(true).execute(function.getId(),
timeout, unit);
List li = (ArrayList) resultCollector.getResult();
assertEquals(li.get(0), "Ran executeFunctionRunningForLongTime for 2000");
});
}
/**
* Test remote execution with timeout by sending function instance where timeout expires.
*/
@Test
public void testRemoteSingleKeyExecution_byInstance_TimedOut() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(
new TestFunction(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME));
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Function<Boolean> function =
new TestFunction<>(true, TestFunction.TEST_FUNCTION_RUNNING_FOR_LONG_TIME);
FunctionService.registerFunction(function);
Execution<Object, Object, List<Object>> dataSet = FunctionService.onRegion(region)
.withFilter(createKeySet(STRING_KEY)).setArguments(STRING_KEY);
long timeout = 1000;
TimeUnit unit = TimeUnit.MILLISECONDS;
assertThatThrownBy(() -> {
dataSet.execute(function, timeout, unit);
}).hasCauseInstanceOf(FunctionException.class)
.hasMessageContaining("All results not received in time provided");
});
}
/**
* Test remote execution of inline function by a pure accessor
*/
@Test
public void testRemoteSingleKeyExecution_byInlineFunction() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
accessor.invoke(() -> {
Region<String, Integer> region = getPartitionedRegion(regionName);
region.put(STRING_KEY, 1);
Execution<Boolean, Boolean, List<Boolean>> dataSet = FunctionService.onRegion(region);
ResultCollector<Boolean, List<Boolean>> rs1 = dataSet.withFilter(createKeySet(STRING_KEY))
.setArguments(true).execute(new BooleanFunction(true));
assertThat(rs1.getResult().get(0)).isTrue();
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present. ResultCollector = DefaultResultCollector haveResults = true;
*/
@Test
public void testRemoteMultiKeyExecution_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Collection<Integer> expectedValues = new HashSet<>();
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
expectedValues.add(value);
pr.put(key, value);
}
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(keySet).setArguments(true).execute(function.getId());
List<Boolean> booleanResultList = booleanResultCollector.getResult();
assertThat(booleanResultList).hasSize(3).containsExactly(true, true, true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> integerExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector =
integerExecution.withFilter(keySet).setArguments(keySet).execute(function.getId());
List<List<Integer>> valuesResults = valuesResultCollector.getResult();
assertThat(valuesResults).hasSize(3).isInstanceOf(List.class);
assertThat(valuesResults.get(0)).isInstanceOf(List.class);
Collection<Integer> actualValues = new HashSet<>();
for (List<Integer> values : valuesResults) {
assertThat(values.size()).isGreaterThan(0);
for (int value : values) {
assertThat(actualValues.add(value)).isTrue();
}
}
assertThat(actualValues).isEqualTo(expectedValues);
});
}
@Test
public void testRemoteMultiKeyExecution_BucketMoved() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(0, 1, 0, 113));
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(40, 1, 0, 113));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_LASTRESULT));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(40, 1, 0, 113));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_LASTRESULT));
});
accessor.invoke(() -> {
Region<Integer, String> region = getPartitionedRegion(regionName);
for (int i = 0; i < 113; i++) {
region.put(i, STRING_KEY + i);
}
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(40, 1, 0, 113));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_LASTRESULT));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION_LASTRESULT);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> dataSet = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> resultCollector =
dataSet.setArguments(true).execute(function.getId());
List<Boolean> resultList = resultCollector.getResult();
assertThat(resultList).hasSize(2).containsExactly(true, true);
});
}
@Test
public void testLocalMultiKeyExecution_BucketMoved() throws Exception {
IgnoredException.addIgnoredException("BucketMovedException");
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(40, 0, 0, 113));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_LASTRESULT));
});
datastore0.invoke(() -> {
Region<Integer, String> region = getPartitionedRegion(regionName);
for (int i = 0; i < 113; i++) {
region.put(i, STRING_KEY + i);
}
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(40, 0, 0, 113));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_LASTRESULT));
});
datastore0.invoke(() -> {
Region<Integer, String> region = getPartitionedRegion(regionName);
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION_LASTRESULT);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> dataSet = FunctionService.onRegion(region);
ResultCollector<Boolean, List<Boolean>> resultCollector =
dataSet.setArguments(true).execute(function.getId());
List<Boolean> resultList = resultCollector.getResult();
assertThat(resultList).hasSize(2).containsExactly(true, true);
});
}
/**
* Test remote execution by a pure accessor which doesn't have the function factory
* present.Function throws the FunctionInvocationTargetException. As this is the case of HA then
* system should retry the function execution. After 5th attempt function will send Boolean as
* last result.
*/
@Test
public void testRemoteMultipleKeyExecution_byName_FunctionInvocationTargetException()
throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_REEXECUTE_EXCEPTION));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_REEXECUTE_EXCEPTION));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_REEXECUTE_EXCEPTION));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_REEXECUTE_EXCEPTION);
FunctionService.registerFunction(function);
Execution<Boolean, Integer, List<Integer>> dataSet = FunctionService.onRegion(pr);
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
ResultCollector<Integer, List<Integer>> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
List<Integer> resultList = resultCollector.getResult();
assertThat(resultList).hasSize(3).containsExactly(5, 5, 5);
});
}
@Test
public void testRemoteMultiKeyExecutionHA_CacheClose() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 1);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
});
AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction());
datastore0.invoke(() -> {
Thread.sleep(3_000);
getCache().close();
});
List<Boolean> resultList = async.get();
assertThat(resultList).hasSize(2).containsExactly(true, true);
}
@Test
public void testRemoteMultiKeyExecutionHA_Disconnect() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 1);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 1);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_HA));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
});
AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction());
datastore0.invoke(() -> {
Thread.sleep(3000);
getCache().getDistributedSystem().disconnect();
});
List<Boolean> resultList = async.get();
assertThat(resultList).hasSize(2).containsExactly(true, true);
}
/**
* Test multi-key remote execution of inline function by a pure accessor ResultCollector =
* DefaultResultCollector haveResults = true;
*/
@Test
public void testRemoteMultiKeyExecution_byInlineFunction() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
Execution<Boolean, Boolean, List<Boolean>> execution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> resultCollector =
execution.withFilter(keySet).setArguments(true).execute(new BooleanFunction(true));
List<Boolean> results = resultCollector.getResult();
assertThat(results).hasSize(3).containsExactly(true, true, true);
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present. ResultCollector = CustomResultCollector haveResults = true;
*/
@Test
public void testRemoteMultiKeyExecutionWithCollector_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> execution =
FunctionService.onRegion(pr).withCollector(new CustomResultCollector());
ResultCollector<Boolean, List<Boolean>> resultCollector =
execution.withFilter(keySet).setArguments(true).execute(function.getId());
List<Boolean> results = resultCollector.getResult();
assertThat(results).hasSize(3).containsExactly(true, true, true);
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present. ResultCollector = DefaultResultCollector haveResults = false;
*/
@Test
public void testRemoteMultiKeyExecutionNoResult_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
Function<Void> function = new TestFunction<>(false, TEST_FUNCTION7);
FunctionService.registerFunction(function);
Execution<Boolean, Void, Void> dataSet = FunctionService.onRegion(pr);
ResultCollector<Void, Void> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class)
.hasMessageStartingWith(
String.format("Cannot %s result as the Function#hasResult() is false",
"return any"));
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present. ResultCollector = DefaultResultCollector haveResults = true; result Timeout = 10
* milliseconds expected result to be 0.(as the execution gets the timeout)
*/
@Test
public void testRemoteMultiKeyExecution_timeout() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<String, Object, List<Object>> dataSet = FunctionService.onRegion(pr);
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(keySet).setArguments("TestingTimeOut").execute(function.getId());
List<Object> resultList = resultCollector.getResult(10, TimeUnit.SECONDS);
assertThat(resultList).hasSize(3).containsExactly(null, null, null);
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present. ResultCollector = CustomResultCollector haveResults = false;
*/
@Test
public void testRemoteMultiKeyExecutionWithCollectorNoResult_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(false, TEST_FUNCTION7));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7);
FunctionService.registerFunction(function);
Execution<Boolean, Object, List<Object>> dataSet =
FunctionService.onRegion(pr).withCollector(new CustomResultCollector());
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class)
.hasMessageStartingWith(
String.format("Cannot %s result as the Function#hasResult() is false",
"return any"));
});
}
/**
* Test multi-key remote execution by a pure accessor which doesn't have the function factory
* present.
*/
@Test
public void testRemoteMultiKeyExecution_byInstance() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0);
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Collection<Integer> expectedValues = new HashSet<>();
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
expectedValues.add(value);
pr.put(key, value);
}
Function<List<Integer>> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(keySet).setArguments(true).execute(function.getId());
List<Boolean> booleanResultList = booleanResultCollector.getResult();
assertThat(booleanResultList).hasSize(3).containsExactly(true, true, true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> keysExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector =
keysExecution.withFilter(keySet).setArguments(keySet).execute(function.getId());
List<List<Integer>> valueListResultList = valuesResultCollector.getResult();
assertThat(valueListResultList).hasSize(3);
Collection<Integer> actualValues = new HashSet<>();
for (List<Integer> values : valueListResultList) {
assertThat(values.size()).isGreaterThan(0);
for (int value : values) {
assertThat(actualValues.add(value)).isTrue();
}
}
assertThat(actualValues).isEqualTo(expectedValues);
});
}
/**
* Test bucketFilter functionality
*/
@Test
public void testBucketFilter_1() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0, new DivisorResolver(10));
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
for (int i = 0; i < 50; ++i) {
pr.put(i, i);
}
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_BUCKET_FILTER);
FunctionService.registerFunction(function);
InternalExecution execution = (InternalExecution) FunctionService.onRegion(pr);
Set<Integer> bucketIdSet = createBucketIdSet(2);
ResultCollector<Integer, List<Integer>> resultCollector =
execution.withBucketFilter(bucketIdSet).execute(function);
List<Integer> results = resultCollector.getResult();
assertThat(results).hasSameElementsAs(bucketIdSet);
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_BUCKET_FILTER);
FunctionService.registerFunction(function);
InternalExecution execution = (InternalExecution) FunctionService.onRegion(pr);
Set<Integer> bucketIdSet = createBucketIdSet(2, 3);
ResultCollector<Integer, List<Integer>> resultCollector =
execution.withBucketFilter(bucketIdSet).execute(function);
List<Integer> results = resultCollector.getResult();
assertThat(results).hasSameElementsAs(bucketIdSet);
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_BUCKET_FILTER);
FunctionService.registerFunction(function);
InternalExecution dataSet = (InternalExecution) FunctionService.onRegion(pr);
Set<Integer> bucketIdSet = createBucketIdSet(1, 2, 3, 0, 4);
ResultCollector<Integer, List<Integer>> resultCollector =
dataSet.withBucketFilter(bucketIdSet).execute(function);
List<Integer> results = resultCollector.getResult();
assertThat(results).hasSameElementsAs(bucketIdSet);
});
}
@Test
public void testBucketFilterOverride() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
accessor.invoke(() -> {
createPartitionedRegion(regionName, 0, 0, new DivisorResolver(10));
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0, new DivisorResolver(10));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION_BUCKET_FILTER));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
for (int i = 0; i < 50; ++i) {
pr.put(i, i);
}
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION_BUCKET_FILTER);
FunctionService.registerFunction(function);
InternalExecution execution = (InternalExecution) FunctionService.onRegion(pr);
Set<Integer> bucketIdSet = createBucketIdSet(2);
Set<Integer> keySet = createKeySet(33, 43);
Set<Integer> expectedBucketIdSet = createBucketIdSet(3, 4);
ResultCollector<Integer, List<Integer>> resultCollector =
execution.withBucketFilter(bucketIdSet).withFilter(keySet).execute(function);
List<Integer> results = resultCollector.getResult();
assertThat(results).hasSameElementsAs(expectedBucketIdSet);
});
}
/**
* Test ability to execute a multi-key function by a local data store ResultCollector =
* DefaultResultCollector haveResult = true
*/
@Test
public void testLocalMultiKeyExecution_byName() throws Exception {
PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Collection<Integer> expectedValues = new HashSet<>();
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
expectedValues.add(value);
pr.put(key, value);
}
Function<Integer> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(keySet).setArguments(true).execute(function.getId());
List<Boolean> booleanResults = booleanResultCollector.getResult();
assertThat(booleanResults).hasSize(1).containsExactly(true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> valuesExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector =
valuesExecution.withFilter(keySet).setArguments(keySet).execute(function.getId());
List<List<Integer>> valuesResults = valuesResultCollector.getResult();
assertThat(valuesResults).hasSize(1);
Collection<Integer> actualValues = new HashSet<>();
for (List<Integer> values : valuesResults) {
assertThat(values.size()).isGreaterThan(0);
for (int value : values) {
assertThat(actualValues.add(value)).isTrue();
}
}
assertThat(actualValues).isEqualTo(expectedValues);
}
/**
* Test ability to execute a multi-key function by a local data store
*/
@Test
public void testLocalMultiKeyExecution_byInstance() throws Exception {
PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Collection<Integer> expectedValues = new HashSet<>();
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
expectedValues.add(value);
pr.put(key, value);
}
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(keySet).setArguments(true).execute(function);
List<Boolean> booleanResults = booleanResultCollector.getResult();
assertThat(booleanResults).hasSize(1).containsExactly(true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> valuesExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector =
valuesExecution.withFilter(keySet).setArguments(keySet).execute(function);
List<List<Integer>> valuesResults = valuesResultCollector.getResult();
assertThat(valuesResults).hasSize(1);
Collection<Integer> actualValues = new HashSet<>();
for (List<Integer> values : valuesResults) {
assertThat(values.size()).isGreaterThan(0);
for (int value : values) {
assertThat(actualValues.add(value)).isTrue();
}
}
assertThat(actualValues).isEqualTo(expectedValues);
}
/**
* Ensure that the execution is limited to a single bucket put another way, that the routing logic
* works correctly such that there is not extra execution
*/
@Test
public void testMultiKeyExecutionOnASingleBucket_byName() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
// Assert there is data each bucket
for (int bucketId = 0; bucketId < pr.getTotalNumberOfBuckets(); bucketId++) {
assertThat(pr.getBucketKeys(bucketId).size()).isGreaterThan(0);
}
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
for (String key : keySet) {
Set<String> singleKeySet = createKeySet(key);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(singleKeySet).setArguments(true).execute(function.getId());
List<Boolean> booleanResults = booleanResultCollector.getResult();
assertThat(booleanResults).hasSize(1).containsExactly(true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> valuesExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector = valuesExecution
.withFilter(singleKeySet).setArguments(singleKeySet).execute(function.getId());
List<List<Integer>> valuesResults = valuesResultCollector.getResult();
assertThat(valuesResults).hasSize(1);
List<Integer> values = valuesResults.get(0);
assertThat(values).hasSize(1);
assertThat(values.get(0)).isEqualTo(pr.get(singleKeySet.iterator().next()));
}
});
}
/**
* Ensure that the execution is limited to a single bucket put another way, that the routing logic
* works correctly such that there is not extra execution
*/
@Test
public void testMultiKeyExecutionOnASingleBucket_byInstance() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 3; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
// Assert there is data each bucket
for (int bucketId = 0; bucketId < pr.getTotalNumberOfBuckets(); bucketId++) {
assertThat(pr.getBucketKeys(bucketId).size()).isGreaterThan(0);
}
Function function = new TestFunction(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
for (String key : keySet) {
Set<String> singleKeySet = createKeySet(key);
Execution<Boolean, Boolean, List<Boolean>> booleanExecution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
booleanExecution.withFilter(singleKeySet).setArguments(true).execute(function);
List<Boolean> booleanResults = booleanResultCollector.getResult();
assertThat(booleanResults).hasSize(1).containsExactly(true);
Execution<Set<String>, List<Integer>, List<List<Integer>>> valuesExecution =
FunctionService.onRegion(pr);
ResultCollector<List<Integer>, List<List<Integer>>> valuesResultCollector =
valuesExecution.withFilter(singleKeySet).setArguments(singleKeySet).execute(function);
List<List<Integer>> valuesResults = valuesResultCollector.getResult();
assertThat(valuesResults).hasSize(1);
List<Integer> values = valuesResults.get(0);
assertThat(values).hasSize(1);
assertThat(values.get(0)).isEqualTo(pr.get(singleKeySet.iterator().next()));
}
});
}
/**
* Ensure that the execution is happening all the PR as a whole
*/
@Test
public void testExecutionOnAllNodes_byName() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore3.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 3; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
// Assert there is data in each bucket
for (int bucketId = 0; bucketId < pr.getTotalNumberOfBuckets(); bucketId++) {
assertThat(pr.getBucketKeys(bucketId).size()).isGreaterThan(0);
}
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> execution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> resultCollector =
execution.setArguments(true).execute(function.getId());
List<Boolean> results = resultCollector.getResult();
assertThat(results).hasSize(4).containsExactly(true, true, true, true);
});
}
/**
* Ensure that the execution is happening all the PR as a whole
*/
@Test
public void testExecutionOnAllNodes_byInstance() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM accessor = getHost(0).getVM(3);
accessor.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(0, 0, 17));
});
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION2));
});
accessor.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 3; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
// Assert there is data in each bucket
for (int bucketId = 0; bucketId < pr.getTotalNumberOfBuckets(); bucketId++) {
assertThat(pr.getBucketKeys(bucketId).size()).isGreaterThan(0);
}
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution<Boolean, Boolean, List<Boolean>> dataSet = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> resultCollector =
dataSet.setArguments(true).execute(function);
List<Boolean> booleanResults = resultCollector.getResult();
assertThat(booleanResults).hasSize(3).containsExactly(true, true, true);
});
}
/**
* Ensure that the execution of inline function is happening all the PR as a whole
*/
@Test
public void testExecutionOnAllNodes_byInlineFunction() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
});
datastore3.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0, 17));
});
datastore3.invoke(() -> {
PartitionedRegion pr = getPartitionedRegion(regionName);
Collection<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 3; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
int valueIndex = 0;
for (String key : keySet) {
int value = valueIndex++;
pr.put(key, value);
}
// Assert there is data in each bucket
for (int bucketId = 0; bucketId < pr.getTotalNumberOfBuckets(); bucketId++) {
assertThat(pr.getBucketKeys(bucketId).size()).isGreaterThan(0);
}
Execution<Boolean, Boolean, List<Boolean>> execution = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> booleanResultCollector =
execution.setArguments(true).execute(new BooleanFunction(true));
List<Boolean> booleanResults = booleanResultCollector.getResult();
assertThat(booleanResults).hasSize(4).containsExactly(true, true, true, true);
});
}
/**
* Ensure that the execution is happening on all the PR as a whole with LocalReadPR as
* LocalDataSet
*/
@Test
public void testExecutionOnAllNodes_LocalReadPR() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
datastore0.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore1.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore3.invoke(() -> {
createPartitionedRegion(regionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore3.invoke(() -> {
Region<CustId, Customer> region = getPartitionedRegion(regionName);
Set<CustId> keySet = new HashSet<>();
for (int i = 1; i <= 10; i++) {
CustId custId = new CustId(i);
Customer customer = new Customer("name" + i, "Address" + i);
region.put(custId, customer);
keySet.add(custId);
}
Function<List<Customer>> function = new TestFunction<>(true, TEST_FUNCTION3);
FunctionService.registerFunction(function);
Execution<Set<CustId>, List<Customer>, List<List<Customer>>> execution =
FunctionService.onRegion(region);
ResultCollector<List<Customer>, List<List<Customer>>> resultCollector =
execution.setArguments(keySet).execute(function.getId());
List<List<Customer>> customerListResults = resultCollector.getResult();
assertThat(customerListResults).hasSize(4);
List<Customer> values = new ArrayList<>();
for (List<Customer> customers : customerListResults) {
values.addAll(customers);
}
assertThat(values).hasSameSizeAs(keySet);
});
}
/**
* Ensure that the execution is happening on all the PR as a whole with LocalReadPR as
* LocalDataSet
*/
@Test
public void testExecutionOnMultiNodes_LocalReadPR() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
VM datastore3 = getHost(0).getVM(3);
String customerRegionName = "CustomerPartitionedRegionName";
String orderRegionName = "OrderPartitionedRegionName";
datastore0.invoke(() -> {
createPartitionedRegion(customerRegionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore1.invoke(() -> {
createPartitionedRegion(customerRegionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore2.invoke(() -> {
createPartitionedRegion(customerRegionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore3.invoke(() -> {
createPartitionedRegion(customerRegionName, createPartitionAttributes(10, 0,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 17));
FunctionService.registerFunction(new TestFunction(true, TEST_FUNCTION3));
});
datastore0.invoke(() -> {
createPartitionedRegion(orderRegionName, createPartitionAttributes(customerRegionName, 10,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 0, 17));
});
datastore1.invoke(() -> {
createPartitionedRegion(orderRegionName, createPartitionAttributes(customerRegionName, 10,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 0, 17));
});
datastore2.invoke(() -> {
createPartitionedRegion(orderRegionName, createPartitionAttributes(customerRegionName, 10,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 0, 17));
});
datastore3.invoke(() -> {
createPartitionedRegion(orderRegionName, createPartitionAttributes(customerRegionName, 10,
new CustomerIDPartitionResolver("CustomerIDPartitionResolver"), 0, 17));
});
datastore3.invoke(() -> {
Region<CustId, Customer> customerRegion = getPartitionedRegion(customerRegionName);
Set<CustId> keySet = new HashSet<>();
for (int i = 1; i <= 10; i++) {
CustId custId = new CustId(i);
Customer customer = new Customer("name" + i, "Address" + i);
customerRegion.put(custId, customer);
keySet.add(custId);
}
Region<OrderId, Order> orderRegion = getPartitionedRegion(orderRegionName);
for (int i = 1; i <= 100; i++) {
CustId custId = new CustId(i);
for (int j = 1; j <= 10; j++) {
int orderIdInt = i * 10 + j;
OrderId orderId = new OrderId(orderIdInt, custId);
Order order = new Order("Order" + orderIdInt);
orderRegion.put(orderId, order);
// TODO: assertTrue(partitionedregion.containsKey(orderId));
// TODO: assertIndexDetailsEquals(order,partitionedregion.get(orderId));
}
}
Function<List<Order>> function = new TestFunction<>(true, TEST_FUNCTION3);
FunctionService.registerFunction(function);
Execution<Set<CustId>, List<Order>, List<List<Order>>> execution =
FunctionService.onRegion(customerRegion);
ResultCollector<List<Order>, List<List<Order>>> resultCollector =
execution.withFilter(keySet).execute(function.getId());
Collection<List<Order>> results = resultCollector.getResult();
assertThat(results.size()).isGreaterThan(0).isLessThanOrEqualTo(4);
List<Order> values = new ArrayList<>();
for (List<Order> orderList : results) {
values.addAll(orderList);
}
assertThat(values).hasSameSizeAs(keySet);
});
}
/**
* Assert the {@link RegionFunctionContext} yields the proper objects.
*/
@Test
public void testLocalDataContext() throws Exception {
VM accessor = getHost(0).getVM(1);
VM datastore1 = getHost(0).getVM(2);
VM datastore2 = getHost(0).getVM(3);
datastore1.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
datastore2.invoke(() -> {
createPartitionedRegion(regionName, 10, 0);
});
int key1 = 1;
int key2 = 2;
accessor.invoke(() -> {
Region<Integer, Integer> region = createPartitionedRegion(regionName, 0, 0);
// Assuming that bucket balancing will create a single bucket (per key)
// in different datastores
region.put(key1, key1);
region.put(key2, key2);
});
accessor.invoke(() -> validateLocalDataContext(key1, key2));
datastore1.invoke(() -> validateLocalDataContext(key1, key2));
datastore2.invoke(() -> validateLocalDataContext(key1, key2));
}
private void validateLocalDataContext(final int key1, final int key2) {
Region region = getPartitionedRegion(regionName);
Function function = new FunctionAdapter() {
@Override
public void execute(FunctionContext context) {
RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
Iterable<Integer> filterSet = (Set<Integer>) regionFunctionContext.getFilter();
assertThat(filterSet).hasSize(1).containsExactly(key1);
Region<Integer, Integer> dataSet = regionFunctionContext.getDataSet();
assertThat(PartitionRegionHelper.isPartitionedRegion(dataSet)).isTrue();
Region localData = PartitionRegionHelper.getLocalDataForContext(regionFunctionContext);
assertThat(PartitionRegionHelper.getColocatedRegions(localData)).isEmpty();
// Assert the data is local only
assertThat(localData.get(key2)).isNull();
assertThat(localData.get(key1)).isEqualTo(key1);
Set<Integer> localDataKeySet = localData.keySet();
assertThat(localDataKeySet).hasSize(1).containsExactly(key1);
Collection<Integer> localDataValuesCollection = localData.values();
assertThat(localDataValuesCollection).hasSize(1).containsExactly(key1);
validateLocalEntrySet(key1, localData.entrySet());
Set<Map.Entry<Integer, Integer>> localDataEntrySet = localData.entrySet();
assertThat(localDataEntrySet).hasSize(1);
Map.Entry<Integer, Integer> localDataEntry = localDataEntrySet.iterator().next();
assertThat(localDataEntry.getKey()).isEqualTo(key1);
assertThat(localDataEntry.getValue()).isEqualTo(key1);
context.getResultSender().lastResult(true);
}
// @Override
@Override
public String getId() {
return getClass().getName();
}
};
List<Boolean> results = (List) FunctionService.onRegion(region)
.withFilter(Collections.singleton(key1)).execute(function).getResult();
assertThat(results).hasSize(1).containsExactly(true);
}
/**
* Assert the {@link RegionFunctionContext} yields the proper objects.
*/
@Test
public void testLocalDataContextWithColocation() throws Exception {
VM accessor = getHost(0).getVM(1);
VM datastore1 = getHost(0).getVM(2);
VM datastore2 = getHost(0).getVM(3);
datastore1.invoke(() -> createColocatedPartitionedRegions(10));
datastore2.invoke(() -> createColocatedPartitionedRegions(10));
accessor.invoke(() -> createColocatedPartitionedRegions(0));
int key1 = 1;
int key2 = 2;
accessor.invoke(() -> {
Region<Integer, Integer> regionTop = getCache().getRegion(regionNameTop);
Region<Integer, Integer> regionColocated1 = getCache().getRegion(regionNameColocated1);
Region<Integer, Integer> regionColocated2 =
getCache().getRegion("root" + SEPARATOR + regionNameColocated2);
// Assuming that bucket balancing will create a single bucket (per key)
// in different datastores
regionTop.put(key1, key1);
regionTop.put(key2, key2);
regionColocated1.put(key1, key1);
regionColocated1.put(key2, key2);
regionColocated2.put(key1, key1);
regionColocated2.put(key2, key2);
});
accessor.invoke(() -> validateRegionFunctionContextForColocatedRegions(key1, key2));
datastore1.invoke(() -> validateRegionFunctionContextForColocatedRegions(key1, key2));
datastore2.invoke(() -> validateRegionFunctionContextForColocatedRegions(key1, key2));
}
private void createColocatedPartitionedRegions(final int localMaxMemory) {
createRootRegion(regionNameTop,
createColocatedPartitionedRegionAttributes(null, localMaxMemory, 0));
RegionAttributes colo =
createColocatedPartitionedRegionAttributes(regionNameTop, localMaxMemory, 0);
createRootRegion(regionNameColocated1, colo);
createRegion(regionNameColocated2, "root", colo);
}
private void validateRegionFunctionContextForColocatedRegions(final int key1, final int key2) {
Region rootRegion = getRootRegion(regionNameTop);
Function function = new Function() {
@Override
public boolean hasResult() {
return true;
}
@Override
public void execute(FunctionContext context) {
RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
Iterable<Integer> filterSet = (Set<Integer>) regionFunctionContext.getFilter();
assertThat(filterSet).hasSize(1).containsExactly(key1);
Region region = regionFunctionContext.getDataSet();
assertThat(region).isInstanceOf(PartitionedRegion.class);
Map<String, ? extends Region> colocatedRegionsMap =
PartitionRegionHelper.getColocatedRegions(region);
assertThat(colocatedRegionsMap).doesNotContainKey(regionNameTop);
Region localData = PartitionRegionHelper.getLocalDataForContext(regionFunctionContext);
Map<String, ? extends Region> colocatedLocalDataMap =
PartitionRegionHelper.getColocatedRegions(localData);
assertThat(colocatedLocalDataMap).doesNotContainKey(regionNameTop);
Region colocatedRegion1 = getRootRegion(regionNameColocated1);
Region colocatedRegion2 = getRootRegion().getSubregion(regionNameColocated2);
{
assertThat(colocatedRegionsMap.get(colocatedRegion1.getFullPath()))
.isSameAs(colocatedRegion1);
Region localDataRegion = PartitionRegionHelper.getLocalData(colocatedRegion1);
assertThat(localDataRegion).isInstanceOf(LocalDataSet.class);
validateLocalKeySet(key1, localDataRegion.keySet());
validateLocalValues(key1, localDataRegion.values());
validateLocalEntrySet(key1, localDataRegion.entrySet());
}
{
assertThat(colocatedRegionsMap.get(colocatedRegion2.getFullPath()))
.isSameAs(colocatedRegion2);
Region localDataRegion = PartitionRegionHelper.getLocalData(colocatedRegion2);
assertThat(localDataRegion).isInstanceOf(LocalDataSet.class);
validateLocalEntrySet(key1, localDataRegion.entrySet());
validateLocalKeySet(key1, localDataRegion.keySet());
validateLocalValues(key1, localDataRegion.values());
}
// Assert context's local colocated data
{
Region localDataRegion = colocatedLocalDataMap.get(colocatedRegion1.getFullPath());
assertThat(localDataRegion.getFullPath()).isEqualTo(colocatedRegion1.getFullPath());
assertThat(localDataRegion).isInstanceOf(LocalDataSet.class);
validateLocalEntrySet(key1, localDataRegion.entrySet());
validateLocalKeySet(key1, localDataRegion.keySet());
validateLocalValues(key1, localDataRegion.values());
}
{
Region localDataRegion = colocatedLocalDataMap.get(colocatedRegion2.getFullPath());
assertThat(localDataRegion.getFullPath()).isEqualTo(colocatedRegion2.getFullPath());
assertThat(localDataRegion).isInstanceOf(LocalDataSet.class);
validateLocalEntrySet(key1, localDataRegion.entrySet());
validateLocalKeySet(key1, localDataRegion.keySet());
validateLocalValues(key1, localDataRegion.values());
}
// Assert both local forms of the "target" region is local only
assertThat(localData.get(key2)).isNull();
assertThat(localData.get(key1)).isEqualTo(key1);
validateLocalEntrySet(key1, localData.entrySet());
validateLocalKeySet(key1, localData.keySet());
validateLocalValues(key1, localData.values());
context.getResultSender().lastResult(true);
}
// @Override
@Override
public String getId() {
return getClass().getName();
}
};
ResultCollector<Boolean, List<Boolean>> resultCollector =
FunctionService.onRegion(rootRegion).withFilter(createKeySet(key1)).execute(function);
assertThat(resultCollector.getResult()).hasSize(1).containsExactly(true);
}
private List<Boolean> executeFunction() {
PartitionedRegion pr = getPartitionedRegion(regionName);
Set<String> keySet = new HashSet<>();
for (int i = pr.getTotalNumberOfBuckets() * 2; i > 0; i--) {
keySet.add(STRING_KEY + i);
}
Function<Boolean> function = new TestFunction<>(true, TEST_FUNCTION_HA);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(pr);
ResultCollector<Boolean, List<Boolean>> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
return resultCollector.getResult();
}
private void validateLocalValues(final int value, final Iterable<Integer> values) {
assertThat(Collections.singleton(value)).isEqualTo(values);
assertThat(values).contains(value);
assertThat(values).hasSize(1);
Iterator iterator = values.iterator();
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isEqualTo(value);
assertThat(iterator.hasNext()).isFalse();
}
private void validateLocalKeySet(final int key, final Iterable<Integer> keys) {
assertThat(Collections.singleton(key)).isEqualTo(keys);
assertThat(keys).hasSize(1);
}
private void validateLocalEntrySet(final int key, final Iterable<Integer> entries) {
assertThat(entries).hasSize(1);
Iterator iterator = entries.iterator();
assertThat(iterator.hasNext()).isTrue();
Entry regionEntry = (Entry) iterator.next();
if (regionEntry instanceof EntrySnapshot) {
assertThat(((EntrySnapshot) regionEntry).wasInitiallyLocal()).isTrue();
} else {
assertThat(regionEntry.isLocal()).isTrue();
}
assertThat(regionEntry.getKey()).isEqualTo(key);
assertThat(regionEntry.getValue()).isEqualTo(key);
assertThat(iterator.hasNext()).isFalse();
}
private RegionAttributes createColocatedPartitionedRegionAttributes(final String colocatedWith,
final int localMaxMemory, final int redundancy) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setColocatedWith(colocatedWith);
paf.setLocalMaxMemory(localMaxMemory);
paf.setPartitionResolver(new SimpleResolver());
paf.setRedundantCopies(redundancy);
AttributesFactory af = new AttributesFactory();
af.setPartitionAttributes(paf.create());
return af.create();
}
private PartitionedRegion createPartitionedRegion(final String regionName,
final int localMaxMemory, final int redundancy) {
PartitionAttributes pa = createPartitionAttributes(localMaxMemory, redundancy);
return createPartitionedRegion(regionName, pa);
}
private PartitionedRegion createPartitionedRegion(final String regionName,
final int localMaxMemory, final int redundancy, final PartitionResolver resolver) {
PartitionAttributes pa = createPartitionAttributes(localMaxMemory, redundancy, resolver);
return createPartitionedRegion(regionName, pa);
}
private PartitionedRegion createPartitionedRegion(final String regionName,
final PartitionAttributes partitionAttributes) {
RegionFactory regionFactory = getCache().createRegionFactory(RegionShortcut.PARTITION);
regionFactory.setPartitionAttributes(partitionAttributes);
return (PartitionedRegion) regionFactory.create(regionName);
}
private PartitionAttributes createPartitionAttributes(final int localMaxMemory,
final int redundancy) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(localMaxMemory);
paf.setRedundantCopies(redundancy);
return paf.create();
}
private PartitionAttributes createPartitionAttributes(final int localMaxMemory,
final int redundancy, final PartitionResolver resolver) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(localMaxMemory);
paf.setPartitionResolver(resolver);
paf.setRedundantCopies(redundancy);
return paf.create();
}
private PartitionAttributes createPartitionAttributes(final int localMaxMemory,
final int redundancy, final PartitionResolver resolver, final int totalNumBuckets) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(localMaxMemory);
paf.setPartitionResolver(resolver);
paf.setRedundantCopies(redundancy);
paf.setTotalNumBuckets(totalNumBuckets);
return paf.create();
}
private PartitionAttributes createPartitionAttributes(final int localMaxMemory,
final int redundancy, final long startupRecoveryDelay, final int totalNumBuckets) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(localMaxMemory);
paf.setRedundantCopies(redundancy);
paf.setStartupRecoveryDelay(startupRecoveryDelay);
paf.setTotalNumBuckets(totalNumBuckets);
return paf.create();
}
private PartitionAttributes createPartitionAttributes(final int localMaxMemory,
final int redundancy, final int totalNumBuckets) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setLocalMaxMemory(localMaxMemory);
paf.setRedundantCopies(redundancy);
paf.setTotalNumBuckets(totalNumBuckets);
return paf.create();
}
private PartitionAttributes createPartitionAttributes(final String colocatedWith,
final int localMaxMemory, final PartitionResolver resolver, final int redundancy,
final int totalNumBuckets) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setColocatedWith(colocatedWith);
paf.setLocalMaxMemory(localMaxMemory);
paf.setPartitionResolver(resolver);
paf.setRedundantCopies(redundancy);
paf.setTotalNumBuckets(totalNumBuckets);
return paf.create();
}
private PartitionedRegion getPartitionedRegion(final String regionName) {
return (PartitionedRegion) getCache().getRegion(regionName);
}
private Set<String> createKeySet(final String... keys) {
Set<String> keySet = new HashSet<>();
for (String key : keys) {
keySet.add(key);
}
return keySet;
}
private Set<Integer> createKeySet(final int... keys) {
Set<Integer> keySet = new HashSet<>();
for (int key : keys) {
keySet.add(key);
}
return keySet;
}
private Map<String, Integer> createEntryMap(final int count, final String keyPrefix,
final int startKey, final int startValue) {
Map<String, Integer> entryMap = new HashMap<>();
int value = startValue;
for (int keyIndex = startKey; keyIndex < startKey + count; keyIndex++) {
entryMap.put(keyPrefix + keyIndex, value++);
}
return entryMap;
}
private Set<Integer> createBucketIdSet(final int... bucketIds) {
Set<Integer> bucketIdSet = new HashSet<>();
for (int bucketId : bucketIds) {
bucketIdSet.add(bucketId);
}
return bucketIdSet;
}
private static class SimpleResolver implements PartitionResolver, Serializable {
@Override
public Serializable getRoutingObject(EntryOperation opDetails) {
return (Serializable) opDetails.getKey();
}
@Override
public String getName() {
return getClass().getName();
}
@Override
public void close() {
// nothing
}
}
private static class DivisorResolver implements PartitionResolver {
private final int divisor;
DivisorResolver(final int divisor) {
this.divisor = divisor;
}
@Override
public Object getRoutingObject(EntryOperation opDetails) {
Object key = opDetails.getKey();
return key.hashCode() / divisor;
}
@Override
public String getName() {
return getClass().getName();
}
@Override
public void close() {
// nothing
}
}
}