| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.execute; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.UseParametersRunnerFactory; |
| |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.execute.Execution; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionAdapter; |
| import org.apache.geode.cache.execute.FunctionContext; |
| import org.apache.geode.cache.execute.FunctionException; |
| import org.apache.geode.cache.execute.FunctionInvocationTargetException; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.cache.execute.ResultCollector; |
| import org.apache.geode.cache.execute.ResultSender; |
| import org.apache.geode.distributed.ConfigurationProperties; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.functions.TestFunction; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| import org.apache.geode.test.junit.categories.FunctionServiceTest; |
| import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; |
| |
| @Category({ClientServerTest.class, FunctionServiceTest.class}) |
| @RunWith(Parameterized.class) |
| @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) |
| public class PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| extends PRClientServerTestBase { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final String TEST_FUNCTION7 = TestFunction.TEST_FUNCTION7; |
| |
| private static final String TEST_FUNCTION2 = TestFunction.TEST_FUNCTION2; |
| |
| private Boolean isByName = null; |
| |
| private static int retryCount = 0; |
| |
| public PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest() { |
| super(); |
| } |
| |
| @Override |
| public Properties getDistributedSystemProperties() { |
| Properties result = super.getDistributedSystemProperties(); |
| result.put(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER, |
| "org.apache.geode.internal.cache.partitioned.PersistentPartitionedRegionDUnitTest$TestFunction"); |
| return result; |
| } |
| |
| /* |
| * Execution of the function on server with |
| */ |
| @Test |
| public void testServerAllKeyExecution_byInstance() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.FALSE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverAllKeyExecution(isByName)); |
| } |
| |
| /* |
| * Execution of the function on server with |
| */ |
| @Test |
| public void testServerGetAllFunction() { |
| createScenario(); |
| client.invoke(PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::getAll); |
| } |
| |
| /* |
| * Execution of the function on server with |
| */ |
| @Test |
| public void testServerPutAllFunction() { |
| createScenario(); |
| client.invoke(PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::putAll); |
| } |
| |
| /* |
| * Execution of the function on server with single key as the routing object and using the name of |
| * the function |
| */ |
| @Test |
| public void testServerSingleKeyExecution_byName() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverSingleKeyExecution(isByName)); |
| } |
| |
| /* |
| * Execution of the function on server with single key as the routing. Function throws the |
| * FunctionInvocationTargetException. As this is the case of HA then system should retry the |
| * function execution. After 5th attempt function will send Boolean as last result. |
| */ |
| @Test |
| public void testserverSingleKeyExecution_FunctionInvocationTargetException() { |
| createScenario(); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::serverSingleKeyExecution_FunctionInvocationTargetException); |
| } |
| |
| @Test |
| public void testServerSingleKeyExecution_SocketTimeOut() { |
| createScenario(); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverSingleKeyExecutionSocketTimeOut(isByName)); |
| } |
| |
| /* |
| * Execution of the function on server with single key as the routing object and using the |
| * instance of the function |
| */ |
| @Test |
| public void testServerSingleKeyExecution_byInstance() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.FALSE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverSingleKeyExecution(isByName)); |
| } |
| |
| /* |
| * Execution of the inline function on server with single key as the routing object |
| */ |
| @Test |
| public void testServerSingleKeyExecution_byInlineFunction() { |
| createScenario(); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::serverSingleKeyExecution_Inline); |
| } |
| |
| /* |
| * Execution of the function on server with set multiple keys as the routing object and using the |
| * name of the function |
| */ |
| @Test |
| public void testserverMultiKeyExecution_byName() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecution(isByName)); |
| server1.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::checkBucketsOnServer); |
| server2.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::checkBucketsOnServer); |
| server3.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::checkBucketsOnServer); |
| } |
| |
| @Test |
| public void testserverMultiKeyExecution_SocektTimeOut() { |
| createScenario(); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecutionSocketTimeOut(isByName)); |
| } |
| |
| /* |
| * Execution of the inline function on server with set multiple keys as the routing object |
| */ |
| @Test |
| public void testserverMultiKeyExecution_byInlineFunction() { |
| createScenario(); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::serverMultiKeyExecution_Inline); |
| } |
| |
| /* |
| * Execution of the inline function on server with set multiple keys as the routing object |
| * Function throws the FunctionInvocationTargetException. As this is the case of HA then system |
| * should retry the function execution. After 5th attempt function will send Boolean as last |
| * result. |
| */ |
| @Test |
| public void testserverMultiKeyExecution_FunctionInvocationTargetException() { |
| createScenario(); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::serverMultiKeyExecution_FunctionInvocationTargetException); |
| } |
| |
| /* |
| * Execution of the function on server with set multiple keys as the routing object and using the |
| * name of the function |
| */ |
| @Test |
| public void testserverMultiKeyExecutionNoResult_byName() { |
| createScenario(); |
| Function function = new TestFunction(false, TEST_FUNCTION7); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecutionNoResult(isByName)); |
| } |
| |
| /* |
| * Execution of the function on server with set multiple keys as the routing object and using the |
| * instance of the function |
| */ |
| @Test |
| public void testserverMultiKeyExecution_byInstance() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.FALSE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecution(isByName)); |
| } |
| |
| /* |
| * Ensure that the execution is limited to a single bucket put another way, that the routing logic |
| * works correctly such that there is not extra execution |
| */ |
| @Test |
| public void testserverMultiKeyExecutionOnASingleBucket_byName() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.TRUE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecutionOnASingleBucket(isByName)); |
| } |
| |
| /* |
| * Ensure that the execution is limited to a single bucket put another way, that the routing logic |
| * works correctly such that there is not extra execution |
| */ |
| @Test |
| public void testserverMultiKeyExecutionOnASingleBucket_byInstance() { |
| createScenario(); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| registerFunctionAtServer(function); |
| isByName = Boolean.FALSE; |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .serverMultiKeyExecutionOnASingleBucket(isByName)); |
| } |
| |
| /* |
| * Ensure that the while executing the function if the servers is down then the execution is |
| * failover to other available server |
| */ |
| @Test |
| public void testServerFailoverWithTwoServerAliveHA() { |
| IgnoredException.addIgnoredException("FunctionInvocationTargetException"); |
| IgnoredException.addIgnoredException("Connection reset"); |
| IgnoredException.addIgnoredException("SocketTimeoutException"); |
| IgnoredException.addIgnoredException("ServerConnectivityException"); |
| IgnoredException.addIgnoredException("Socket Closed"); |
| ArrayList commonAttributes = |
| createCommonServerAttributes("TestPartitionedRegion", null, 1, null); |
| createClientServerScenarion(commonAttributes, 20, 20, 20); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); |
| registerFunctionAtServer(function); |
| server2.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::stopServerHA); |
| server3.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::stopServerHA); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::putOperation); |
| |
| int AsyncInvocationArrSize = 1; |
| AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; |
| async[0] = |
| client.invokeAsync( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::executeFunctionHA); |
| server2.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::startServerHA); |
| server3.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::startServerHA); |
| server1.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::stopServerHA); |
| client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest |
| .verifyDeadAndLiveServers(2)); |
| ThreadUtils.join(async[0], 6 * 60 * 1000); |
| if (async[0].getException() != null) { |
| Assert.fail("UnExpected Exception Occurred : ", async[0].getException()); |
| } |
| List l = (List) async[0].getReturnValue(); |
| |
| assertEquals(2, l.size()); |
| } |
| |
| /* |
| * Ensure that the while executing the function if the servers is down then the execution is |
| * failover to other available server |
| */ |
| @Test |
| public void testServerCacheClosedFailoverWithTwoServerAliveHA() { |
| IgnoredException.addIgnoredException("FunctionInvocationTargetException"); |
| IgnoredException.addIgnoredException("Connection reset"); |
| IgnoredException.addIgnoredException("SocketTimeoutException"); |
| IgnoredException.addIgnoredException("ServerConnectivityException"); |
| IgnoredException.addIgnoredException("Socket Closed"); |
| ArrayList commonAttributes = |
| createCommonServerAttributes("TestPartitionedRegion", null, 1, null); |
| createClientServerScenarion(commonAttributes, 20, 20, 20); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); |
| registerFunctionAtServer(function); |
| server2.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::stopServerHA); |
| server3.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::stopServerHA); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::putOperation); |
| int AsyncInvocationArrSize = 1; |
| AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; |
| async[0] = |
| client.invokeAsync( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::executeFunctionHA); |
| server2.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::startServerHA); |
| server3.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::startServerHA); |
| server1.invoke( |
| PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::closeCacheHA); |
| client.invoke(() -> PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest |
| .verifyDeadAndLiveServers(2)); |
| ThreadUtils.join(async[0], 5 * 60 * 1000); |
| if (async[0].getException() != null) { |
| Assert.fail("UnExpected Exception Occurred : ", async[0].getException()); |
| } |
| List l = (List) async[0].getReturnValue(); |
| assertEquals(2, l.size()); |
| } |
| |
| @Test |
| public void testBug40714() { |
| createScenario(); |
| server1.invoke( |
| (SerializableRunnableIF) PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::registerFunction); |
| server1.invoke( |
| (SerializableRunnableIF) PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::registerFunction); |
| server1.invoke( |
| (SerializableRunnableIF) PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::registerFunction); |
| client.invoke( |
| (SerializableRunnableIF) PRClientServerRegionFunctionExecutionSelectorNoSingleHopDUnitTest::registerFunction); |
| client.invoke( |
| PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714); |
| } |
| |
| public static void registerFunction() { |
| FunctionService.registerFunction(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<Object> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Failure"); |
| } else if (context.getArguments() instanceof Boolean) { |
| resultSender.lastResult(Boolean.FALSE); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return "Function"; |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| } |
| |
| public static void verifyDeadAndLiveServers(final Integer expectedLiveServers) { |
| WaitCriterion wc = new WaitCriterion() { |
| String excuse; |
| |
| @Override |
| public boolean done() { |
| int sz = pool.getConnectedServerCount(); |
| logger.info("Checking for the Live Servers : Expected : " |
| + expectedLiveServers + " Available :" + sz); |
| if (sz == expectedLiveServers) { |
| return true; |
| } |
| excuse = "Expected " + expectedLiveServers + " but found " + sz; |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return excuse; |
| } |
| }; |
| Wait.waitForCriterion(wc, 3 * 60 * 1000, 1000, true); |
| } |
| |
| public static void executeFunction() { |
| Region region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 10); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| ResultCollector rc1 = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); |
| |
| HashMap resultMap = ((HashMap) rc1.getResult()); |
| assertEquals(3, resultMap.size()); |
| |
| for (Object o : resultMap.entrySet()) { |
| Map.Entry entry = (Map.Entry) o; |
| ArrayList resultListForMember = (ArrayList) entry.getValue(); |
| |
| for (Object result : resultListForMember) { |
| assertEquals(Boolean.TRUE, result); |
| } |
| } |
| } catch (Exception e) { |
| logger.info("Got an exception : " + e.getMessage()); |
| assertTrue(e instanceof CacheClosedException); |
| } |
| } |
| |
| private static Object executeFunctionHA() { |
| Region region = cache.getRegion(PartitionedRegionName); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 10); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| ResultCollector rc1 = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); |
| List l = ((List) rc1.getResult()); |
| logger.info("Result size : " + l.size()); |
| return l; |
| } |
| |
| private static void putOperation() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 10); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| int j = 0; |
| for (String s : testKeysSet) { |
| Integer val = j++; |
| region.put(s, val); |
| } |
| } |
| |
| private void createScenario() { |
| ArrayList commonAttributes = |
| createCommonServerAttributes("TestPartitionedRegion", null, 0, null); |
| createClientServerScenarioSelectorNoSingleHop(commonAttributes, 20, 20, 20); |
| } |
| |
| private static void checkBucketsOnServer() { |
| PartitionedRegion region = (PartitionedRegion) cache.getRegion(PartitionedRegionName); |
| HashMap localBucket2RegionMap = (HashMap) region.getDataStore().getSizeLocally(); |
| logger.info( |
| "Size of the " + PartitionedRegionName + " in this VM :- " + localBucket2RegionMap.size()); |
| Set entrySet = localBucket2RegionMap.entrySet(); |
| assertNotNull(entrySet); |
| } |
| |
| private static void serverAllKeyExecution(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets / 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| int j = 0; |
| HashSet<Integer> origVals = new HashSet<>(); |
| for (String item : testKeysSet) { |
| Integer val = j++; |
| origVals.add(val); |
| region.put(item, val); |
| } |
| ResultCollector rc1 = executeOnAll(dataSet, Boolean.TRUE, function, isByName); |
| List resultList = (List) rc1.getResult(); |
| logger.info("Result size : " + resultList.size()); |
| logger.info("Result are SSSS : " + resultList); |
| assertEquals(3, resultList.size()); |
| |
| for (Object result : resultList) { |
| assertEquals(Boolean.TRUE, result); |
| } |
| ResultCollector rc2 = executeOnAll(dataSet, testKeysSet, function, isByName); |
| List l2 = ((List) rc2.getResult()); |
| assertEquals(3, l2.size()); |
| HashSet<Integer> foundVals = new HashSet(); |
| for (Object value : l2) { |
| ArrayList subL = (ArrayList) (value); |
| assertTrue(subL.size() > 0); |
| for (Object o : subL) { |
| assertTrue(foundVals.add((Integer) o)); |
| } |
| } |
| assertEquals(origVals, foundVals); |
| |
| } catch (Exception e) { |
| Assert.fail("Test failed after the put operation", e); |
| |
| } |
| } |
| |
| public static void getAll() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final List<String> testKeysList = new ArrayList<>(); |
| for (int i = (totalNumBuckets * 3); i > 0; i--) { |
| testKeysList.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| try { |
| int j = 0; |
| Map<String, Integer> origVals = new HashMap<>(); |
| for (String key : testKeysList) { |
| Integer val = j++; |
| origVals.put(key, val); |
| region.put(key, val); |
| } |
| Map resultMap = region.getAll(testKeysList); |
| assertEquals(resultMap, origVals); |
| Wait.pause(2000); |
| Map secondResultMap = region.getAll(testKeysList); |
| assertEquals(secondResultMap, origVals); |
| |
| } catch (Exception e) { |
| Assert.fail("Test failed after the put operation", e); |
| |
| } |
| } |
| |
| public static void putAll() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final List<String> testKeysList = new ArrayList<>(); |
| for (int i = (totalNumBuckets * 3); i > 0; i--) { |
| testKeysList.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| try { |
| int j = 0; |
| Map<String, Integer> origVals = new HashMap<>(); |
| for (String key : testKeysList) { |
| Integer val = j++; |
| origVals.put(key, val); |
| region.put(key, val); |
| } |
| Map resultMap = region.getAll(testKeysList); |
| assertEquals(resultMap, origVals); |
| Wait.pause(2000); |
| Map secondResultMap = region.getAll(testKeysList); |
| assertEquals(secondResultMap, origVals); |
| |
| } catch (Exception e) { |
| Assert.fail("Test failed after the put operation", e); |
| |
| } |
| } |
| |
| private static void serverMultiKeyExecutionOnASingleBucket(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| int j = 0; |
| for (String value : testKeysSet) { |
| Integer val = j++; |
| region.put(value, val); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| for (String o : testKeysSet) { |
| try { |
| Set<String> singleKeySet = Collections.singleton(o); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| ResultCollector rc1 = execute(dataSet, singleKeySet, Boolean.TRUE, function, isByName); |
| List l = ((List) rc1.getResult()); |
| assertEquals(1, l.size()); |
| |
| ResultCollector rc2 = |
| execute(dataSet, singleKeySet, new HashSet<>(singleKeySet), function, isByName); |
| List l2 = ((List) rc2.getResult()); |
| |
| assertEquals(1, l2.size()); |
| List subList = (List) l2.iterator().next(); |
| assertEquals(1, subList.size()); |
| assertEquals(region.get(singleKeySet.iterator().next()), subList.iterator().next()); |
| } catch (Exception expected) { |
| logger.info("Exception : " + expected.getMessage()); |
| expected.printStackTrace(); |
| fail("Test failed after the put operation"); |
| } |
| } |
| } |
| |
| private static void serverMultiKeyExecution(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| int j = 0; |
| HashSet<Integer> origVals = new HashSet<>(); |
| for (String element : testKeysSet) { |
| Integer val = j++; |
| origVals.add(val); |
| region.put(element, val); |
| } |
| ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| List l = ((List) rc1.getResult()); |
| logger.info("Result size : " + l.size()); |
| assertEquals(3, l.size()); |
| for (Object item : l) { |
| assertEquals(Boolean.TRUE, item); |
| } |
| |
| ResultCollector rc2 = execute(dataSet, testKeysSet, testKeysSet, function, isByName); |
| List l2 = ((List) rc2.getResult()); |
| assertEquals(3, l2.size()); |
| HashSet<Integer> foundVals = new HashSet<>(); |
| for (Object value : l2) { |
| ArrayList subL = (ArrayList) value; |
| assertTrue(subL.size() > 0); |
| for (Object o : subL) { |
| assertTrue(foundVals.add((Integer) o)); |
| } |
| } |
| assertEquals(origVals, foundVals); |
| |
| } catch (Exception e) { |
| Assert.fail("Test failed after the put operation", e); |
| |
| } |
| } |
| |
| private static void serverMultiKeyExecutionSocketTimeOut(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| int j = 0; |
| for (String value : testKeysSet) { |
| Integer val = j++; |
| region.put(value, val); |
| } |
| ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| List l = ((List) rc1.getResult()); |
| logger.info("Result size : " + l.size()); |
| assertEquals(3, l.size()); |
| for (Object o : l) { |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| } catch (Exception e) { |
| Assert.fail("Test failed after the function execution", e); |
| |
| } |
| } |
| |
| private static void serverSingleKeyExecutionSocketTimeOut(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final String testKey = "execKey"; |
| final Set<String> testKeysSet = new HashSet<>(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| |
| region.put(testKey, 1); |
| try { |
| ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0)); |
| |
| ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); |
| assertEquals(testKey, ((List) rs2.getResult()).get(0)); |
| |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| logger.info("Exception : ", ex); |
| Assert.fail("Test failed after the put operation", ex); |
| } |
| } |
| |
| private static void serverMultiKeyExecution_Inline() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| int j = 0; |
| for (String value : testKeysSet) { |
| Integer val = j++; |
| region.put(value, val); |
| } |
| ResultCollector rc1 = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<Object> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Success"); |
| } else if (context.getArguments() instanceof Boolean) { |
| resultSender.lastResult(Boolean.TRUE); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| List l = ((List) rc1.getResult()); |
| logger.info("Result size : " + l.size()); |
| assertEquals(3, l.size()); |
| for (Object o : l) { |
| assertEquals(Boolean.TRUE, o); |
| } |
| } catch (Exception e) { |
| logger.info("Exception : " + e.getMessage()); |
| e.printStackTrace(); |
| fail("Test failed after the put operation"); |
| |
| } |
| } |
| |
| private static void serverMultiKeyExecution_FunctionInvocationTargetException() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Execution dataSet = FunctionService.onRegion(region); |
| int j = 0; |
| for (String o : testKeysSet) { |
| Integer val = j++; |
| region.put(o, val); |
| } |
| try { |
| ResultCollector rc1 = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| if (context.isPossibleDuplicate()) { |
| context.getResultSender().lastResult(retryCount); |
| return; |
| } |
| if (context.getArguments() instanceof Boolean) { |
| throw new FunctionInvocationTargetException("I have been thrown from TestFunction"); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| |
| List list = (ArrayList) rc1.getResult(); |
| assertEquals(list.get(0), 0); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| Assert.fail("This is not expected Exception", e); |
| } |
| |
| } |
| |
| private static void serverMultiKeyExecutionNoResult(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(false, TEST_FUNCTION7); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| String msg = "<ExpectedException action=add>" + "FunctionException" + "</ExpectedException>"; |
| cache.getLogger().info(msg); |
| int j = 0; |
| for (String o : testKeysSet) { |
| Integer val = j++; |
| region.put(o, val); |
| } |
| ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| rc1.getResult(); |
| Thread.sleep(20000); |
| fail("Test failed after the put operation"); |
| } catch (FunctionException expected) { |
| expected.printStackTrace(); |
| logger.info("Exception : " + expected.getMessage()); |
| assertTrue(expected.getMessage() |
| .startsWith((String.format("Cannot %s result as the Function#hasResult() is false", |
| "return any")))); |
| } catch (Exception notexpected) { |
| Assert.fail("Test failed during execute or sleeping", notexpected); |
| } finally { |
| cache.getLogger() |
| .info("<ExpectedException action=remove>" + "FunctionException" + "</ExpectedException>"); |
| } |
| } |
| |
| private static void serverSingleKeyExecution(Boolean isByName) { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final String testKey = "execKey"; |
| final Set<String> testKeysSet = new HashSet<>(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| } catch (Exception expected) { |
| assertTrue(expected.getMessage().contains("No target node found for KEY = " + testKey) |
| || expected.getMessage().startsWith("Server could not send the reply") |
| || expected.getMessage().startsWith("Unexpected exception during")); |
| } |
| |
| region.put(testKey, 1); |
| try { |
| ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); |
| assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0)); |
| |
| ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); |
| assertEquals(1, ((List) rs2.getResult()).get(0)); |
| |
| HashMap<String, Integer> putData = new HashMap<>(); |
| putData.put(testKey + "1", 2); |
| putData.put(testKey + "2", 3); |
| |
| ResultCollector rs1 = execute(dataSet, testKeysSet, putData, function, isByName); |
| assertEquals(Boolean.TRUE, ((List) rs1.getResult()).get(0)); |
| |
| assertEquals((Integer) 2, region.get(testKey + "1")); |
| assertEquals((Integer) 3, region.get(testKey + "2")); |
| |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| logger.info("Exception : ", ex); |
| Assert.fail("Test failed after the put operation", ex); |
| } |
| } |
| |
| private static void serverSingleKeyExecution_FunctionInvocationTargetException() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final String testKey = "execKey"; |
| final Set<String> testKeysSet = new HashSet<>(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(region); |
| |
| region.put(testKey, 1); |
| try { |
| ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, false); |
| ArrayList list = (ArrayList) rs.getResult(); |
| assertTrue(((Integer) list.get(0)) >= 5); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| Assert.fail("This is not expected Exception", ex); |
| } |
| } |
| |
| private static void serverSingleKeyExecution_Inline() { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final String testKey = "execKey"; |
| final Set<String> testKeysSet = new HashSet<>(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| cache.getLogger() |
| .info("<ExpectedException action=add>" + "No target node found for KEY = " |
| + "|Server could not send the reply" + "|Unexpected exception during" |
| + "</ExpectedException>"); |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<Object> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Success"); |
| } |
| resultSender.lastResult("Failure"); |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| } catch (Exception expected) { |
| logger.debug("Exception occurred : " + expected.getMessage()); |
| assertTrue(expected.getMessage().contains("No target node found for KEY = " + testKey) |
| || expected.getMessage().startsWith("Server could not send the reply") |
| || expected.getMessage().startsWith("Unexpected exception during")); |
| } finally { |
| cache.getLogger() |
| .info("<ExpectedException action=remove>" + "No target node found for KEY = " |
| + "|Server could not send the reply" + "|Unexpected exception during" |
| + "</ExpectedException>"); |
| } |
| |
| region.put(testKey, 1); |
| try { |
| ResultCollector rs = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<Object> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Success"); |
| } else { |
| resultSender.lastResult("Failure"); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| assertEquals("Failure", ((List) rs.getResult()).get(0)); |
| |
| ResultCollector rs2 = |
| dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<Object> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Success"); |
| } else { |
| resultSender.lastResult("Failure"); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }); |
| assertEquals("Success", ((List) rs2.getResult()).get(0)); |
| |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| logger.info("Exception : ", ex); |
| Assert.fail("Test failed after the put operation", ex); |
| } |
| } |
| |
| private static ResultCollector execute(Execution dataSet, Set testKeysSet, Serializable args, |
| Function function, Boolean isByName) { |
| if (isByName) {// by name |
| return dataSet.withFilter(testKeysSet).setArguments(args).execute(function.getId()); |
| } else { // By Instance |
| return dataSet.withFilter(testKeysSet).setArguments(args).execute(function); |
| } |
| } |
| |
| private static ResultCollector executeOnAll(Execution dataSet, Serializable args, |
| Function function, Boolean isByName) { |
| if (isByName) {// by name |
| return dataSet.setArguments(args).execute(function.getId()); |
| } else { // By Instance |
| return dataSet.setArguments(args).execute(function); |
| } |
| } |
| } |