| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.execute; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.runners.Parameterized.UseParametersRunnerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.execute.Execution; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionAdapter; |
| import org.apache.geode.cache.execute.FunctionContext; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.cache.execute.ResultCollector; |
| import org.apache.geode.cache.execute.ResultSender; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.ConfigurationProperties; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.AvailablePort; |
| import org.apache.geode.internal.cache.PartitionAttributesImpl; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionTestHelper; |
| import org.apache.geode.internal.cache.execute.metrics.FunctionServiceStats; |
| import org.apache.geode.internal.cache.execute.metrics.FunctionStats; |
| import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager; |
| import org.apache.geode.internal.cache.functions.TestFunction; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.SerializableCallableIF; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.junit.categories.FunctionServiceTest; |
| import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; |
| |
| /** |
| * This is DUnite Test to test the Function Execution stats under various scenarios like |
| * Client-Server with Region/without Region, P2P with partitioned Region/Distributed Region,member |
| * Execution |
| */ |
| @Category({FunctionServiceTest.class}) |
| @RunWith(Parameterized.class) |
| @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) |
| public class FunctionServiceStatsDUnitTest extends PRClientServerTestBase { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| static InternalDistributedSystem ds = null; |
| |
| private static int noOfExecutionCalls_Aggregate = 0; |
| private static int noOfExecutionsCompleted_Aggregate = 0; |
| private static int resultReceived_Aggregate = 0; |
| |
| private static int noOfExecutionCalls_TESTFUNCTION1 = 0; |
| private static int noOfExecutionsCompleted_TESTFUNCTION1 = 0; |
| private static int resultReceived_TESTFUNCTION1 = 0; |
| |
| private static int noOfExecutionCalls_TESTFUNCTION2 = 0; |
| private static int noOfExecutionsCompleted_TESTFUNCTION2 = 0; |
| private static int resultReceived_TESTFUNCTION2 = 0; |
| |
| private static int noOfExecutionCalls_TESTFUNCTION3 = 0; |
| private static int noOfExecutionsCompleted_TESTFUNCTION3 = 0; |
| private static int resultReceived_TESTFUNCTION3 = 0; |
| |
| private static int noOfExecutionCalls_TESTFUNCTION5 = 0; |
| private static int noOfExecutionsCompleted_TESTFUNCTION5 = 0; |
| private static int resultReceived_TESTFUNCTION5 = 0; |
| |
| private static int noOfExecutionCalls_Inline = 0; |
| private static int noOfExecutionsCompleted_Inline = 0; |
| private static int resultReceived_Inline = 0; |
| |
| @Override |
| protected final void postSetUpPRClientServerTestBase() { |
| // Make sure stats to linger from a previous test |
| disconnectAllFromDS(); |
| } |
| |
| private final transient SerializableRunnableIF initializeStats = () -> { |
| noOfExecutionCalls_Aggregate = 0; |
| noOfExecutionsCompleted_Aggregate = 0; |
| resultReceived_Aggregate = 0; |
| |
| noOfExecutionCalls_TESTFUNCTION1 = 0; |
| noOfExecutionsCompleted_TESTFUNCTION1 = 0; |
| resultReceived_TESTFUNCTION1 = 0; |
| |
| noOfExecutionCalls_TESTFUNCTION2 = 0; |
| noOfExecutionsCompleted_TESTFUNCTION2 = 0; |
| resultReceived_TESTFUNCTION2 = 0; |
| |
| noOfExecutionCalls_TESTFUNCTION3 = 0; |
| noOfExecutionsCompleted_TESTFUNCTION3 = 0; |
| resultReceived_TESTFUNCTION3 = 0; |
| |
| noOfExecutionCalls_TESTFUNCTION5 = 0; |
| noOfExecutionsCompleted_TESTFUNCTION5 = 0; |
| resultReceived_TESTFUNCTION5 = 0; |
| |
| noOfExecutionCalls_Inline = 0; |
| noOfExecutionsCompleted_Inline = 0; |
| resultReceived_Inline = 0; |
| }; |
| |
| private final transient SerializableRunnableIF closeDistributedSystem = () -> { |
| if (getCache() != null && !getCache().isClosed()) { |
| getCache().close(); |
| getCache().getDistributedSystem().disconnect(); |
| } |
| }; |
| |
| /* |
| * This helper method prevents race conditions in local functions. Typically, when calling |
| * ResultCollector.getResult() one might expect the function to have completed. For local |
| * functions this is true, however, at this point the function stats may not have been updated yet |
| * thus any code which checks stats after calling getResult() may get wrong data. |
| */ |
| private void waitNoFunctionsRunning(FunctionServiceStats stats) { |
| int count = 100; |
| while (stats.getFunctionExecutionsRunning() > 0 && count > 0) { |
| count--; |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException ex) { |
| // Ignored |
| } |
| } |
| } |
| |
| /** |
| * 1-client 3-Servers Function : TEST_FUNCTION2 Function : TEST_FUNCTION3 Execution of the |
| * function on serverRegion with set multiple keys as the routing object and using the name of the |
| * function |
| * |
| * On server side, function execution calls should be equal to the no of function executions |
| * completed. |
| */ |
| @Test |
| public void testClientServerPartitonedRegionFunctionExecutionStats() { |
| createScenario(); |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION2)); |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION3)); |
| |
| client.invoke(initializeStats); |
| server1.invoke(initializeStats); |
| server2.invoke(initializeStats); |
| server3.invoke(initializeStats); |
| |
| client.invoke(() -> { |
| Region<String, Integer> region = cache.getRegion(PartitionedRegionName); |
| assertNotNull(region); |
| final HashSet<String> testKeysSet = new HashSet<>(); |
| for (int i = (totalNumBuckets * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2); |
| if (shouldRegisterFunctionsOnClient()) { |
| FunctionService.registerFunction(function); |
| } |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| int j = 0; |
| for (String s : testKeysSet) { |
| Integer val = j++; |
| region.put(s, val); |
| } |
| ResultCollector rc = |
| dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); |
| int resultSize = ((List) rc.getResult()).size(); |
| resultReceived_Aggregate += resultSize; |
| resultReceived_TESTFUNCTION2 += resultSize; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionCalls_TESTFUNCTION2++; |
| noOfExecutionsCompleted_Aggregate++; |
| noOfExecutionsCompleted_TESTFUNCTION2++; |
| |
| rc = dataSet.withFilter(testKeysSet).setArguments(testKeysSet).execute(function.getId()); |
| resultSize = ((List) rc.getResult()).size(); |
| resultReceived_Aggregate += resultSize; |
| resultReceived_TESTFUNCTION2 += resultSize; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionCalls_TESTFUNCTION2++; |
| noOfExecutionsCompleted_Aggregate++; |
| noOfExecutionsCompleted_TESTFUNCTION2++; |
| |
| function = new TestFunction(true, TestFunction.TEST_FUNCTION3); |
| FunctionService.registerFunction(function); |
| rc = dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); |
| resultSize = ((List) rc.getResult()).size(); |
| resultReceived_Aggregate += resultSize; |
| resultReceived_TESTFUNCTION3 += resultSize; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionCalls_TESTFUNCTION3++; |
| noOfExecutionsCompleted_Aggregate++; |
| noOfExecutionsCompleted_TESTFUNCTION3++; |
| |
| } catch (Exception e) { |
| logger.info("Exception : " + e.getMessage()); |
| e.printStackTrace(); |
| fail("Test failed after the put operation"); |
| } |
| }); |
| |
| client.invoke(() -> { |
| // checks for the aggregate stats |
| InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem(); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| assertTrue(functionServiceStats.getResultsReceived() >= resultReceived_Aggregate); |
| |
| logger.info("Calling FunctionStats for TEST_FUNCTION2 :"); |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS); |
| logger.info("Called FunctionStats for TEST_FUNCTION2 :"); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION2, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertTrue(functionStats.getResultsReceived() >= resultReceived_TESTFUNCTION2); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION3, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION3, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertTrue(functionStats.getResultsReceived() >= resultReceived_TESTFUNCTION3); |
| }); |
| |
| SerializableRunnableIF checkStatsOnServer = () -> { |
| // checks for the aggregate stats |
| InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem(); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| // functions are executed 3 times |
| noOfExecutionCalls_Aggregate += 3; |
| assertTrue( |
| functionServiceStats.getFunctionExecutionCalls() >= noOfExecutionCalls_Aggregate); |
| noOfExecutionsCompleted_Aggregate += 3; |
| assertTrue(functionServiceStats |
| .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_Aggregate); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS); |
| // TEST_FUNCTION2 is executed twice |
| noOfExecutionCalls_TESTFUNCTION2 += 2; |
| assertTrue(functionStats.getFunctionExecutionCalls() >= noOfExecutionCalls_TESTFUNCTION2); |
| noOfExecutionsCompleted_TESTFUNCTION2 += 2; |
| assertTrue(functionStats |
| .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_TESTFUNCTION2); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS); |
| // TEST_FUNCTION3 is executed once |
| noOfExecutionCalls_TESTFUNCTION3 += 1; |
| assertTrue(functionStats.getFunctionExecutionCalls() >= noOfExecutionCalls_TESTFUNCTION3); |
| noOfExecutionsCompleted_TESTFUNCTION3 += 1; |
| assertTrue(functionStats |
| .getFunctionExecutionsCompleted() >= noOfExecutionsCompleted_TESTFUNCTION3); |
| }; |
| |
| server1.invoke(checkStatsOnServer); |
| server2.invoke(checkStatsOnServer); |
| server3.invoke(checkStatsOnServer); |
| } |
| |
| /** |
| * 1-client 3-Servers server1 : Replicate server2 : Replicate server3 : Replicate client : Empty |
| * Function : TEST_FUNCTION2 Execution of the function on serverRegion with set multiple keys as |
| * the routing object and using the name of the function |
| * |
| * On server side, function execution calls should be equal to the no of function executions |
| * completed. |
| */ |
| @Test |
| public void testClientServerDistributedRegionFunctionExecutionStats() { |
| |
| final String regionName = "FunctionServiceStatsDUnitTest"; |
| SerializableCallableIF<Integer> createCahenServer = () -> { |
| try { |
| Properties props = new Properties(); |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| cache = CacheFactory.create(ds); |
| logger.info("Created Cache on Server"); |
| assertNotNull(cache); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| assertNotNull(cache); |
| Region<String, Integer> region = cache.createRegion(regionName, factory.create()); |
| logger.info("Region Created :" + region); |
| assertNotNull(region); |
| for (int i = 1; i <= 200; i++) { |
| region.put("execKey-" + i, i); |
| } |
| CacheServer server = cache.addCacheServer(); |
| assertNotNull(server); |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| server.setPort(port); |
| try { |
| server.start(); |
| } catch (IOException e) { |
| Assert.fail("Failed to start the Server", e); |
| } |
| assertTrue(server.isRunning()); |
| return server.getPort(); |
| } catch (Exception e) { |
| Assert.fail("FunctionServiceStatsDUnitTest#createCache() Failed while creating the cache", |
| e); |
| throw e; |
| } |
| }; |
| final Integer port1 = server1.invoke(createCahenServer); |
| final Integer port2 = server2.invoke(createCahenServer); |
| final Integer port3 = server3.invoke(createCahenServer); |
| |
| client.invoke(() -> { |
| try { |
| Properties props = new Properties(); |
| props.put(MCAST_PORT, "0"); |
| props.put(LOCATORS, ""); |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| cache = CacheFactory.create(ds); |
| logger.info("Created Cache on Client"); |
| assertNotNull(cache); |
| |
| |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| Pool p; |
| try { |
| p = PoolManager.createFactory().addServer("localhost", port1) |
| .addServer("localhost", port2).addServer("localhost", port3) |
| .setPingInterval(250).setSubscriptionEnabled(false).setSubscriptionRedundancy(-1) |
| .setReadTimeout(2000).setSocketBufferSize(1000).setMinConnections(6) |
| .setMaxConnections(10).setRetryAttempts(3) |
| .create("FunctionServiceStatsDUnitTest_pool"); |
| } finally { |
| CacheServerTestUtil.enableShufflingOfEndpoints(); |
| } |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setPoolName(p.getName()); |
| assertNotNull(cache); |
| Region<String, Integer> region = cache.createRegion(regionName, factory.create()); |
| logger.info("Client Region Created :" + region); |
| assertNotNull(region); |
| for (int i = 1; i <= 200; i++) { |
| region.put("execKey-" + i, i); |
| } |
| } catch (Exception e) { |
| Assert.fail("FunctionServiceStatsDUnitTest#createCache() Failed while creating the cache", |
| e); |
| throw e; |
| } |
| }); |
| |
| client.invoke(initializeStats); |
| server1.invoke(initializeStats); |
| server2.invoke(initializeStats); |
| server3.invoke(initializeStats); |
| |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION2)); |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION3)); |
| |
| client.invoke(() -> { |
| Function function2 = new TestFunction(true, TestFunction.TEST_FUNCTION2); |
| Function function3 = new TestFunction(true, TestFunction.TEST_FUNCTION3); |
| if (shouldRegisterFunctionsOnClient()) { |
| FunctionService.registerFunction(function2); |
| FunctionService.registerFunction(function3); |
| } |
| Region region = cache.getRegion(regionName); |
| Set<String> filter = new HashSet<>(); |
| for (int i = 100; i < 120; i++) { |
| filter.add("execKey-" + i); |
| } |
| |
| try { |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionCalls_TESTFUNCTION2++; |
| List list = (List) FunctionService.onRegion(region).withFilter(filter) |
| .execute(function2).getResult(); |
| noOfExecutionsCompleted_Aggregate++; |
| noOfExecutionsCompleted_TESTFUNCTION2++; |
| int size = list.size(); |
| resultReceived_Aggregate += size; |
| resultReceived_TESTFUNCTION2 += size; |
| |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionCalls_TESTFUNCTION2++; |
| list = (List) FunctionService.onRegion(region).withFilter(filter).execute(function2) |
| .getResult(); |
| noOfExecutionsCompleted_Aggregate++; |
| noOfExecutionsCompleted_TESTFUNCTION2++; |
| size = list.size(); |
| resultReceived_Aggregate += size; |
| resultReceived_TESTFUNCTION2 += size; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("test failed due to", e); |
| throw e; |
| } |
| |
| }); |
| |
| client.invoke(() -> { |
| // checks for the aggregate stats |
| InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem(); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION2, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_TESTFUNCTION2, functionStats.getResultsReceived()); |
| }); |
| } |
| |
| /** |
| * Execution of the function on server using the name of the function TEST_FUNCTION1 |
| * TEST_FUNCTION5 On client side, the no of result received should equal to the no of function |
| * execution calls. On server side, function execution calls should be equal to the no of function |
| * executions completed. |
| */ |
| @Test |
| public void testClientServerwithoutRegion() { |
| createClientServerScenarionWithoutRegion(); |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION1)); |
| registerFunctionAtServer(new TestFunction(true, TestFunction.TEST_FUNCTION5)); |
| |
| client.invoke(initializeStats); |
| server1.invoke(initializeStats); |
| server2.invoke(initializeStats); |
| server3.invoke(initializeStats); |
| |
| client.invoke(() -> { |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION1); |
| if (shouldRegisterFunctionsOnClient()) { |
| FunctionService.registerFunction(function); |
| } |
| Execution member = FunctionService.onServers(pool); |
| |
| try { |
| ResultCollector rs = member.setArguments(Boolean.TRUE).execute(function.getId()); |
| int size = ((List) rs.getResult()).size(); |
| resultReceived_Aggregate += size; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionsCompleted_Aggregate++; |
| resultReceived_TESTFUNCTION1 += size; |
| noOfExecutionCalls_TESTFUNCTION1++; |
| noOfExecutionsCompleted_TESTFUNCTION1++; |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| logger.info("Exception : ", ex); |
| fail("Test failed after the execute operation nn TRUE"); |
| } |
| function = new TestFunction(true, TestFunction.TEST_FUNCTION5); |
| if (shouldRegisterFunctionsOnClient()) { |
| FunctionService.registerFunction(function); |
| } |
| try { |
| ResultCollector rs = member.setArguments("Success").execute(function.getId()); |
| int size = ((List) rs.getResult()).size(); |
| resultReceived_Aggregate += size; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionsCompleted_Aggregate++; |
| resultReceived_TESTFUNCTION5 += size; |
| noOfExecutionCalls_TESTFUNCTION5++; |
| noOfExecutionsCompleted_TESTFUNCTION5++; |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| logger.info("Exception : ", ex); |
| fail("Test failed after the execute operationssssss"); |
| } |
| }); |
| |
| |
| client.invoke(() -> { |
| // checks for the aggregate stats |
| InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem(); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION1, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION1, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION1, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_TESTFUNCTION1, functionStats.getResultsReceived()); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION5, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION5, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION5, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_TESTFUNCTION5, functionStats.getResultsReceived()); |
| |
| }); |
| |
| SerializableRunnableIF checkStatsOnServer = () -> { |
| // checks for the aggregate stats |
| InternalDistributedSystem iDS = (InternalDistributedSystem) cache.getDistributedSystem(); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| // functions are executed 2 times |
| noOfExecutionCalls_Aggregate += 2; |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| noOfExecutionsCompleted_Aggregate += 2; |
| // this check is time sensitive, so allow it to fail a few times |
| // before giving up |
| for (int i = 0; i < 10; i++) { |
| try { |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| } catch (RuntimeException r) { |
| if (i == 9) { |
| throw r; |
| } |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw r; |
| } |
| } |
| } |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION1, iDS); |
| // TEST_FUNCTION1 is executed once |
| noOfExecutionCalls_TESTFUNCTION1 += 1; |
| assertEquals(noOfExecutionCalls_TESTFUNCTION1, functionStats.getFunctionExecutionCalls()); |
| noOfExecutionsCompleted_TESTFUNCTION1 += 1; |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION1, |
| functionStats.getFunctionExecutionsCompleted()); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION5, iDS); |
| // TEST_FUNCTION5 is executed once |
| noOfExecutionCalls_TESTFUNCTION5 += 1; |
| assertEquals(noOfExecutionCalls_TESTFUNCTION5, functionStats.getFunctionExecutionCalls()); |
| noOfExecutionsCompleted_TESTFUNCTION5 += 1; |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION5, |
| functionStats.getFunctionExecutionsCompleted()); |
| |
| }; |
| |
| server1.invoke(checkStatsOnServer); |
| server2.invoke(checkStatsOnServer); |
| server3.invoke(checkStatsOnServer); |
| } |
| |
| @Test |
| public void testP2PDummyExecutionStats() { |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM accessor = host.getVM(3); |
| accessor.invoke(closeDistributedSystem); |
| datastore0.invoke(closeDistributedSystem); |
| datastore1.invoke(closeDistributedSystem); |
| datastore2.invoke(closeDistributedSystem); |
| } |
| |
| /** |
| * Ensure that the execution is happening all the PR as a whole |
| * |
| * Function Execution will not take place on accessor, accessor will onlu receive the |
| * resultsReceived. On datastore, no of function execution calls should be equal to the no of |
| * function execution calls from the accessor. |
| */ |
| @Test |
| public void testP2PPartitionedRegionsFunctionExecutionStats() { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM accessor = host.getVM(3); |
| |
| datastore0.invoke(initializeStats); |
| datastore1.invoke(initializeStats); |
| datastore2.invoke(initializeStats); |
| accessor.invoke(initializeStats); |
| |
| accessor.invoke(() -> { |
| RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 0); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| pa.setTotalNumBuckets(17); |
| raf.setPartitionAttributes(pa); |
| |
| getCache().createRegion(rName, raf.create()); |
| }); |
| |
| SerializableRunnableIF dataStoreCreate = () -> { |
| RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| pa.setTotalNumBuckets(17); |
| raf.setPartitionAttributes(pa); |
| getCache().createRegion(rName, raf.create()); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| function = new TestFunction(true, TestFunction.TEST_FUNCTION3); |
| FunctionService.registerFunction(function); |
| }; |
| datastore0.invoke(dataStoreCreate); |
| datastore1.invoke(dataStoreCreate); |
| datastore2.invoke(dataStoreCreate); |
| |
| accessor.invoke(() -> { |
| PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(rName); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| final HashSet<String> testKeys = new HashSet<>(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| int j = 0; |
| for (Object testKey : testKeys) { |
| Integer val = j++; |
| pr.put(testKey, val); |
| } |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| ResultCollector rc1 = dataSet.setArguments(Boolean.TRUE).execute(function); |
| int size = ((List) rc1.getResult()).size(); |
| resultReceived_Aggregate += size; |
| resultReceived_TESTFUNCTION2 += size; |
| |
| rc1 = dataSet.setArguments(testKeys).execute(function); |
| size = ((List) rc1.getResult()).size(); |
| resultReceived_Aggregate += size; |
| resultReceived_TESTFUNCTION2 += size; |
| |
| function = new TestFunction(true, TestFunction.TEST_FUNCTION3); |
| FunctionService.registerFunction(function); |
| rc1 = dataSet.setArguments(Boolean.TRUE).execute(function); |
| size = ((List) rc1.getResult()).size(); |
| resultReceived_Aggregate += size; |
| resultReceived_TESTFUNCTION3 += size; |
| }); |
| |
| accessor.invoke(() -> { |
| InternalDistributedSystem iDS = |
| ((InternalDistributedSystem) getCache().getDistributedSystem()); |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION2, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION2, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_TESTFUNCTION2, functionStats.getResultsReceived()); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS); |
| assertEquals(noOfExecutionCalls_TESTFUNCTION3, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION3, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_TESTFUNCTION3, functionStats.getResultsReceived()); |
| }); |
| |
| SerializableRunnableIF checkFunctionExecutionStatsForDataStore = () -> { |
| InternalDistributedSystem iDS = |
| ((InternalDistributedSystem) getCache().getDistributedSystem()); |
| // 3 Function Executions took place |
| FunctionServiceStats functionServiceStats = |
| iDS.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| noOfExecutionCalls_Aggregate += 3; |
| noOfExecutionsCompleted_Aggregate += 3; |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION2, iDS); |
| // TEST_FUNCTION2 is executed twice |
| noOfExecutionCalls_TESTFUNCTION2 += 2; |
| assertEquals(noOfExecutionCalls_TESTFUNCTION2, |
| functionStats.getFunctionExecutionCalls()); |
| noOfExecutionsCompleted_TESTFUNCTION2 += 2; |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION2, |
| functionStats.getFunctionExecutionsCompleted()); |
| |
| functionStats = FunctionStatsManager.getFunctionStats(TestFunction.TEST_FUNCTION3, iDS); |
| // TEST_FUNCTION3 is executed once |
| noOfExecutionCalls_TESTFUNCTION3 += 1; |
| assertEquals(noOfExecutionCalls_TESTFUNCTION3, |
| functionStats.getFunctionExecutionCalls()); |
| noOfExecutionsCompleted_TESTFUNCTION3 += 1; |
| assertEquals(noOfExecutionsCompleted_TESTFUNCTION3, |
| functionStats.getFunctionExecutionsCompleted()); |
| }; |
| datastore0.invoke(checkFunctionExecutionStatsForDataStore); |
| datastore1.invoke(checkFunctionExecutionStatsForDataStore); |
| datastore2.invoke(checkFunctionExecutionStatsForDataStore); |
| |
| accessor.invoke(closeDistributedSystem); |
| datastore0.invoke(closeDistributedSystem); |
| datastore1.invoke(closeDistributedSystem); |
| datastore2.invoke(closeDistributedSystem); |
| } |
| |
| /** |
| * Test the function execution statistics in case of the distributed Region P2P DataStore0 is with |
| * Empty datapolicy |
| */ |
| @Test |
| public void testP2PDistributedRegionFunctionExecutionStats() { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM datastore3 = host.getVM(3); |
| |
| datastore0.invoke(initializeStats); |
| datastore1.invoke(initializeStats); |
| datastore2.invoke(initializeStats); |
| datastore3.invoke(initializeStats); |
| |
| datastore0.invoke(() -> { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| Region<String, Integer> region = getCache().createRegion(rName, factory.create()); |
| logger.info("Region Created :" + region); |
| assertNotNull(region); |
| FunctionService.registerFunction(new TestFunction(true, TestFunction.TEST_FUNCTION2)); |
| for (int i = 1; i <= 200; i++) { |
| region.put("execKey-" + i, i); |
| } |
| }); |
| |
| SerializableRunnableIF createAndPopulateRegionWithReplicate = () -> { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| Region<String, Integer> region = getCache().createRegion(rName, factory.create()); |
| logger.info("Region Created :" + region); |
| assertNotNull(region); |
| FunctionService.registerFunction(new TestFunction(true, TestFunction.TEST_FUNCTION2)); |
| for (int i = 1; i <= 200; i++) { |
| region.put("execKey-" + i, i); |
| } |
| }; |
| |
| datastore1.invoke(createAndPopulateRegionWithReplicate); |
| datastore2.invoke(createAndPopulateRegionWithReplicate); |
| datastore3.invoke(createAndPopulateRegionWithReplicate); |
| |
| datastore0.invoke(() -> { |
| Region region = getCache().getRegion(rName); |
| try { |
| List list = (List) FunctionService.onRegion(region).setArguments(Boolean.TRUE) |
| .execute(TestFunction.TEST_FUNCTION2).getResult(); |
| // this is the Distributed Region with Empty Data policy. |
| // therefore no function execution takes place here. it only receives the results. |
| resultReceived_Aggregate += list.size(); |
| assertEquals(resultReceived_Aggregate, |
| ((InternalDistributedSystem) getCache().getDistributedSystem()) |
| .getFunctionStatsManager().getFunctionServiceStats().getResultsReceived()); |
| |
| resultReceived_TESTFUNCTION2 += list.size(); |
| assertEquals(resultReceived_TESTFUNCTION2, |
| ((InternalDistributedSystem) getCache().getDistributedSystem()) |
| .getFunctionStatsManager().getFunctionServiceStats().getResultsReceived()); |
| |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("test failed due to", e); |
| } |
| }); |
| |
| // there is a replicated region on 3 nodes so we cannot predict on which |
| // node the function execution will take place |
| // so i have avoided that check. |
| |
| datastore0.invoke(closeDistributedSystem); |
| datastore1.invoke(closeDistributedSystem); |
| datastore2.invoke(closeDistributedSystem); |
| datastore3.invoke(closeDistributedSystem); |
| } |
| |
| |
| /** |
| * Test the execution of function on all memebers haveResults = true |
| * |
| * member1 calls for the function executions sp the results received on memeber 1 should be equal |
| * to the no of function execution calls. Function Execution should happen on all other members |
| * too. so the no of function execution calls and no of function executions completed should be |
| * equal tio the no of functions from member 1 |
| */ |
| @Test |
| public void testP2PMembersFunctionExecutionStats() { |
| Host host = Host.getHost(0); |
| VM member1 = host.getVM(0); |
| VM member2 = host.getVM(1); |
| VM member3 = host.getVM(2); |
| VM member4 = host.getVM(3); |
| |
| SerializableRunnableIF connectToDistributedSystem = () -> { |
| Properties props = new Properties(); |
| try { |
| ds = getSystem(props); |
| assertNotNull(ds); |
| } catch (Exception e) { |
| Assert.fail("Failed while creating the Distribued System", e); |
| } |
| }; |
| member1.invoke(connectToDistributedSystem); |
| member2.invoke(connectToDistributedSystem); |
| member3.invoke(connectToDistributedSystem); |
| member4.invoke(connectToDistributedSystem); |
| |
| member1.invoke(initializeStats); |
| member2.invoke(initializeStats); |
| member3.invoke(initializeStats); |
| member4.invoke(initializeStats); |
| |
| final Function inlineFunction = new FunctionAdapter() { |
| @Override |
| public void execute(FunctionContext context) { |
| @SuppressWarnings("unchecked") |
| final ResultSender<String> resultSender = context.getResultSender(); |
| if (context.getArguments() instanceof String) { |
| resultSender.lastResult("Success"); |
| } else { |
| resultSender.lastResult("Failure"); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return getClass().getName(); |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| }; |
| |
| member1.invoke(() -> { |
| |
| assertNotNull(ds); |
| DistributedMember localmember = ds.getDistributedMember(); |
| Execution memberExecution = FunctionService.onMember(localmember); |
| |
| memberExecution.setArguments("Key"); |
| try { |
| ResultCollector rc = memberExecution.execute(inlineFunction); |
| int size = ((List) rc.getResult()).size(); |
| resultReceived_Aggregate += size; |
| noOfExecutionCalls_Aggregate++; |
| noOfExecutionsCompleted_Aggregate++; |
| resultReceived_Inline += size; |
| noOfExecutionCalls_Inline++; |
| noOfExecutionsCompleted_Inline++; |
| |
| } catch (Exception e) { |
| logger.info("Exception Occurred : " + e.getMessage()); |
| e.printStackTrace(); |
| Assert.fail("Test failed", e); |
| } |
| }); |
| |
| member1.invoke(() -> { |
| FunctionServiceStats functionServiceStats = |
| ds.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_Aggregate, functionServiceStats.getResultsReceived()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(inlineFunction.getId(), ds); |
| assertEquals(noOfExecutionCalls_Inline, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Inline, |
| functionStats.getFunctionExecutionsCompleted()); |
| assertEquals(resultReceived_Inline, functionStats.getResultsReceived()); |
| }); |
| |
| SerializableRunnableIF checkFunctionExecutionStatsForOtherMember = () -> { |
| FunctionServiceStats functionServiceStats = |
| ds.getFunctionStatsManager().getFunctionServiceStats(); |
| waitNoFunctionsRunning(functionServiceStats); |
| |
| // One function Execution took place on there members |
| // noOfExecutionCalls_Aggregate++; |
| // noOfExecutionsCompleted_Aggregate++; |
| assertEquals(noOfExecutionCalls_Aggregate, |
| functionServiceStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Aggregate, |
| functionServiceStats.getFunctionExecutionsCompleted()); |
| |
| FunctionStats functionStats = |
| FunctionStatsManager.getFunctionStats(inlineFunction.getId(), ds); |
| // noOfExecutionCalls_Inline++; |
| // noOfExecutionsCompleted_Inline++; |
| assertEquals(noOfExecutionCalls_Inline, functionStats.getFunctionExecutionCalls()); |
| assertEquals(noOfExecutionsCompleted_Inline, |
| functionStats.getFunctionExecutionsCompleted()); |
| }; |
| member2.invoke(checkFunctionExecutionStatsForOtherMember); |
| member3.invoke(checkFunctionExecutionStatsForOtherMember); |
| member4.invoke(checkFunctionExecutionStatsForOtherMember); |
| |
| member1.invoke(closeDistributedSystem); |
| member2.invoke(closeDistributedSystem); |
| member3.invoke(closeDistributedSystem); |
| member4.invoke(closeDistributedSystem); |
| } |
| |
| @Override |
| public Properties getDistributedSystemProperties() { |
| Properties properties = super.getDistributedSystemProperties(); |
| properties.put(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER, |
| "org.apache.geode.internal.cache.execute.MyFunctionExecutionException"); |
| return properties; |
| } |
| |
| /** |
| * Test the exception occurred while invoking the function execution on all members of DS |
| * |
| * Function throws the Exception, The check is added to for the no of function execution execption |
| * in datatostore1 |
| */ |
| @Test |
| public void testFunctionExecutionExceptionStatsOnAllNodesPRegion() { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM datastore3 = host.getVM(3); |
| |
| datastore0.invoke(initializeStats); |
| datastore1.invoke(initializeStats); |
| datastore2.invoke(initializeStats); |
| datastore3.invoke(initializeStats); |
| |
| SerializableRunnableIF dataStoreCreate = () -> { |
| RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| pa.setTotalNumBuckets(17); |
| raf.setPartitionAttributes(pa); |
| getCache().createRegion(rName, raf.create()); |
| Function function = new TestFunction(true, "TestFunctionException"); |
| FunctionService.registerFunction(function); |
| }; |
| datastore0.invoke(dataStoreCreate); |
| datastore1.invoke(dataStoreCreate); |
| datastore2.invoke(dataStoreCreate); |
| datastore3.invoke(dataStoreCreate); |
| |
| datastore3.invoke(() -> { |
| PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(rName); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| final HashSet<String> testKeys = new HashSet<>(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| int j = 0; |
| for (Object testKey : testKeys) { |
| Integer key = j++; |
| pr.put(key, testKey); |
| } |
| try { |
| Function function = new TestFunction(true, "TestFunctionException"); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| ResultCollector rc = dataSet.setArguments(Boolean.TRUE).execute(function.getId()); |
| // Wait Criterion is added to make sure that the function execution |
| // happens on all nodes and all nodes get the FunctionException so that the stats will get |
| // incremented, |
| Wait.pause(2000); |
| rc.getResult(); |
| fail("No exception Occurred"); |
| } catch (Exception ignored) { |
| } |
| }); |
| |
| datastore0.invoke(closeDistributedSystem); |
| datastore1.invoke(closeDistributedSystem); |
| datastore2.invoke(closeDistributedSystem); |
| datastore3.invoke(closeDistributedSystem); |
| } |
| |
| private void createScenario() { |
| ArrayList commonAttributes = |
| createCommonServerAttributes("TestPartitionedRegion", null, 0, null); |
| createClientServerScenarion(commonAttributes, 20, 20, 20); |
| } |
| } |