| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.execute; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryOperation; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.PartitionResolver; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.execute.Execution; |
| import com.gemstone.gemfire.cache.execute.Function; |
| import com.gemstone.gemfire.cache.execute.FunctionException; |
| import com.gemstone.gemfire.cache.execute.FunctionService; |
| import com.gemstone.gemfire.cache.execute.ResultCollector; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.internal.cache.PartitionAttributesImpl; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionDUnitTestCase; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionTestHelper; |
| import com.gemstone.gemfire.internal.cache.functions.TestFunction; |
| |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.VM; |
| |
| public class PRFunctionExecutionWithResultSenderDUnitTest extends |
| PartitionedRegionDUnitTestCase { |
| private static final String TEST_FUNCTION7 = TestFunction.TEST_FUNCTION7; |
| |
| private static final String TEST_FUNCTION2 = TestFunction.TEST_FUNCTION2; |
| |
| private static final long serialVersionUID = 1L; |
| |
| public PRFunctionExecutionWithResultSenderDUnitTest(String name) { |
| super(name); |
| } |
| |
| /** |
| * Test remote execution by a pure accessor which doesn't have the function |
| * factory present. |
| * |
| * @throws Exception |
| */ |
| public void testRemoteSingleKeyExecution_byName() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM accessor = host.getVM(2); |
| final VM datastore = host.getVM(3); |
| final Cache c = getCache(); |
| accessor.invoke(new SerializableCallable("Create PR") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 0); |
| |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, ra); |
| return Boolean.TRUE; |
| } |
| }); |
| |
| datastore |
| .invoke(new SerializableCallable("Create PR with Function Factory") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| raf.setPartitionAttributes(pa); |
| |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, raf.create()); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| return Boolean.TRUE; |
| } |
| }); |
| |
| Object o = accessor.invoke(new SerializableCallable( |
| "Create data, invoke exectuable") { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getCache().getRegion(rName); |
| |
| final String testKey = "execKey"; |
| final Set testKeysSet = new HashSet(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| try { |
| dataSet.withFilter(testKeysSet).withArgs(Boolean.TRUE).execute( |
| function.getId()); |
| } |
| catch (Exception expected) { |
| // No data should cause exec to throw |
| assertTrue(expected.getMessage().contains( |
| "No target node found for KEY = " + testKey)); |
| } |
| pr.put(testKey, new Integer(1)); |
| pr.put(testKey+"3", new Integer(3)); |
| pr.put(testKey+"4", new Integer(4)); |
| testKeysSet.add(testKey + "3"); |
| testKeysSet.add(testKey + "4"); |
| |
| ResultCollector rs1 = dataSet.withFilter(testKeysSet).withArgs( |
| Boolean.TRUE).execute(function.getId()); |
| assertEquals(Boolean.TRUE, ((List)rs1.getResult()).get(0)); |
| ResultCollector rs2 = dataSet.withFilter(testKeysSet).withArgs((Serializable)testKeysSet) |
| .execute(function.getId()); |
| |
| HashMap putData = new HashMap(); |
| putData.put(testKey + "1", new Integer(2)); |
| putData.put(testKey + "2", new Integer(3)); |
| ResultCollector rs3 = dataSet.withFilter(testKeysSet).withArgs(putData) |
| .execute(function.getId()); |
| assertEquals(Boolean.TRUE, ((List)rs3.getResult()).get(0)); |
| |
| assertEquals(new Integer(2), pr.get(testKey + "1")); |
| assertEquals(new Integer(3), pr.get(testKey + "2")); |
| return Boolean.TRUE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| /** |
| * Test remote execution by a pure accessor which doesn't have the function |
| * factory present And the function doesn't send last result. |
| * FunctionException is expected in this case |
| * |
| * @throws Exception |
| */ |
| public void testRemoteExecution_NoLastResult() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM accessor = host.getVM(0); |
| final VM datastore = host.getVM(1); |
| final Cache c = getCache(); |
| accessor.invoke(new SerializableCallable("Create PR") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 0); |
| |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, ra); |
| return Boolean.TRUE; |
| } |
| }); |
| |
| datastore |
| .invoke(new SerializableCallable("Create PR with Function Factory") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| raf.setPartitionAttributes(pa); |
| |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, raf.create()); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NO_LASTRESULT); |
| FunctionService.registerFunction(function); |
| return Boolean.TRUE; |
| } |
| }); |
| |
| Object o = accessor.invoke(new SerializableCallable( |
| "Create data, invoke exectuable") { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getCache().getRegion(rName); |
| |
| final String testKey = "execKey"; |
| final Set testKeysSet = new HashSet(); |
| testKeysSet.add(testKey); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NO_LASTRESULT); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| |
| pr.put(testKey, new Integer(1)); |
| pr.put(testKey+"3", new Integer(3)); |
| pr.put(testKey+"4", new Integer(4)); |
| testKeysSet.add(testKey + "3"); |
| testKeysSet.add(testKey + "4"); |
| |
| ResultCollector rs1 = dataSet.withFilter(testKeysSet).withArgs( |
| Boolean.TRUE).execute(function.getId()); |
| try{ |
| assertEquals(Boolean.TRUE, ((List)rs1.getResult()).get(0)); |
| fail("Expected FunctionException : Function did not send last result"); |
| }catch(FunctionException fe){ |
| assertTrue(fe.getMessage().contains( |
| "did not send last result")); |
| return Boolean.TRUE; |
| } |
| return Boolean.FALSE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| /** |
| * Test multi-key remote execution by a pure accessor which doesn't have the |
| * function factory present. |
| * ResultCollector = DefaultResultCollector |
| * haveResults = true; |
| * |
| * @throws Exception |
| */ |
| public void testRemoteMultiKeyExecution_byName() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM accessor = host.getVM(3); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| |
| final Cache c = getCache(); |
| accessor.invoke(new SerializableCallable("Create PR") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 0); |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, ra); |
| return Boolean.TRUE; |
| } |
| }); |
| |
| SerializableCallable dataStoreCreate = new SerializableCallable( |
| "Create PR with Function Factory") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| raf.setPartitionAttributes(pa); |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, raf.create()); |
| Function function = new TestFunction(true,TestFunction.TEST_FUNCTION9); |
| FunctionService.registerFunction(function); |
| return Boolean.TRUE; |
| } |
| }; |
| datastore0.invoke(dataStoreCreate); |
| datastore1.invoke(dataStoreCreate); |
| datastore2.invoke(dataStoreCreate); |
| |
| Object o = accessor.invoke(new SerializableCallable( |
| "Create data, invoke exectuable") { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getCache().getRegion(rName); |
| |
| final HashSet testKeysSet = new HashSet(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 2); i > 0; i--) { |
| testKeysSet.add("execKey-" + i); |
| } |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true,TestFunction.TEST_FUNCTION9); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| |
| try { |
| dataSet.withFilter(testKeysSet).withArgs(Boolean.TRUE).execute( |
| function.getId()); |
| } |
| catch (Exception expected) { |
| assertTrue(expected.getMessage().contains( |
| "No target node found for KEY = ")); |
| } |
| |
| int j = 0; |
| HashSet origVals = new HashSet(); |
| for (Iterator i = testKeysSet.iterator(); i.hasNext();) { |
| Integer val = new Integer(j++); |
| origVals.add(val); |
| pr.put(i.next(), val); |
| } |
| ResultCollector rs = dataSet.withFilter(testKeysSet).withArgs(Boolean.TRUE).execute(function.getId()); |
| List l = ((List)rs.getResult()); |
| |
| assertEquals(3, l.size()); |
| |
| for (Iterator i = l.iterator(); i.hasNext();) { |
| assertEquals(Boolean.TRUE, i.next()); |
| } |
| |
| ResultCollector rc2 = dataSet.withFilter(testKeysSet).withArgs(testKeysSet) |
| .execute(function.getId()); |
| List l2 = ((List)rc2.getResult()); |
| assertEquals(pr.getTotalNumberOfBuckets() * 2 * 3, l2.size()); |
| |
| return Boolean.TRUE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| |
| } |
| |
| /** |
| * Test ability to execute a multi-key function by a local data store |
| * ResultCollector = DefaultResultCollector |
| * haveResult = true |
| * @throws Exception |
| */ |
| public void testLocalMultiKeyExecution_byName() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| VM localOnly = host.getVM(3); |
| final Cache c = getCache(); |
| Object o = localOnly.invoke(new SerializableCallable( |
| "Create PR, validate local execution)") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| raf.setPartitionAttributes(pa); |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, raf.create()); |
| final String testKey = "execKey"; |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true,TestFunction.TEST_FUNCTION9); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| final HashSet testKeysSet = new HashSet(); |
| testKeysSet.add(testKey); |
| try { |
| dataSet.withFilter(testKeysSet).withArgs(Boolean.TRUE).execute(function.getId()); |
| } |
| catch (Exception expected) { |
| // No data should cause exec to throw |
| assertTrue(expected.getMessage().contains( |
| "No target node found for KEY = " + testKey)); |
| } |
| |
| final HashSet testKeys = new HashSet(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 2); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| |
| int j = 0; |
| HashSet origVals = new HashSet(); |
| for (Iterator i = testKeys.iterator(); i.hasNext();) { |
| Integer val = new Integer(j++); |
| origVals.add(val); |
| pr.put(i.next(), val); |
| } |
| |
| ResultCollector rc1 = dataSet.withFilter(testKeys).withArgs(Boolean.TRUE) |
| .execute(function.getId()); |
| List l = ((List)rc1.getResult()); |
| assertEquals(1, l.size()); |
| for (Iterator i = l.iterator(); i.hasNext();) { |
| assertEquals(Boolean.TRUE, i.next()); |
| } |
| |
| ResultCollector rc2 = dataSet.withFilter(testKeys).withArgs(testKeys) |
| .execute(function.getId()); |
| List l2 = ((List)rc2.getResult()); |
| assertEquals(226, l2.size()); |
| |
| return Boolean.TRUE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| /** |
| * Test local execution on datastore with function that doesn't send last result. |
| * FunctionException is expected in this case |
| * |
| * @throws Exception |
| */ |
| public void testLocalExecution_NoLastResult() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| VM localOnly = host.getVM(0); |
| final Cache c = getCache(); |
| Object o = localOnly.invoke(new SerializableCallable( |
| "Create PR, validate local execution)") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| raf.setPartitionAttributes(pa); |
| PartitionedRegion pr = (PartitionedRegion)getCache().createRegion( |
| rName, raf.create()); |
| final String testKey = "execKey"; |
| DistributedSystem.setThreadsSocketPolicy(false); |
| Function function = new TestFunction(true,TestFunction.TEST_FUNCTION_NO_LASTRESULT); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| |
| final HashSet testKeys = new HashSet(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 2); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| |
| int j = 0; |
| HashSet origVals = new HashSet(); |
| for (Iterator i = testKeys.iterator(); i.hasNext();) { |
| Integer val = new Integer(j++); |
| origVals.add(val); |
| pr.put(i.next(), val); |
| } |
| |
| ResultCollector rc1 = dataSet.withFilter(testKeys).withArgs(Boolean.TRUE) |
| .execute(function.getId()); |
| try{ |
| assertEquals(Boolean.TRUE, ((List)rc1.getResult()).get(0)); |
| fail("Expected FunctionException : Function did not send last result"); |
| }catch(Exception fe){ |
| assertTrue(fe.getMessage().contains( |
| "did not send last result")); |
| return Boolean.TRUE; |
| } |
| return Boolean.FALSE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| /** |
| * |
| * Test execution on all datastores with function that doesn't send last result. |
| * FunctionException is expected in this case |
| * |
| * @throws Exception |
| */ |
| public void testExecutionOnAllNodes_NoLastResult() |
| throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM datastore3 = host.getVM(3); |
| getCache(); |
| SerializableCallable dataStoreCreate = new SerializableCallable( |
| "Create PR with Function Factory") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| pa.setTotalNumBuckets(17); |
| raf.setPartitionAttributes(pa); |
| getCache().createRegion( |
| rName, raf.create()); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NO_LASTRESULT); |
| FunctionService.registerFunction(function); |
| return Boolean.TRUE; |
| } |
| }; |
| datastore0.invoke(dataStoreCreate); |
| datastore1.invoke(dataStoreCreate); |
| datastore2.invoke(dataStoreCreate); |
| datastore3.invoke(dataStoreCreate); |
| |
| Object o = datastore3.invoke(new SerializableCallable( |
| "Create data, invoke exectuable") { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getCache().getRegion(rName); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| final HashSet testKeys = new HashSet(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| int j = 0; |
| for (Iterator i = testKeys.iterator(); i.hasNext();) { |
| Integer val = new Integer(j++); |
| pr.put(i.next(), val); |
| } |
| // Assert there is data in each bucket |
| for (int bid = 0; bid < pr.getTotalNumberOfBuckets(); bid++) { |
| assertTrue(pr.getBucketKeys(bid).size() > 0); |
| } |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_NO_LASTRESULT); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| ResultCollector rc1 = dataSet.withArgs(Boolean.TRUE) |
| .execute(function.getId()); |
| try{ |
| assertEquals(Boolean.TRUE, ((List)rc1.getResult()).get(0)); |
| fail("Expected FunctionException : Function did not send last result"); |
| }catch(Exception fe){ |
| assertTrue(fe.getMessage().contains( |
| "did not send last result")); |
| return Boolean.TRUE; |
| } |
| return Boolean.FALSE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| |
| public void testExecutionOnAllNodes_byName() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore0 = host.getVM(0); |
| final VM datastore1 = host.getVM(1); |
| final VM datastore2 = host.getVM(2); |
| final VM datastore3 = host.getVM(3); |
| getCache(); |
| SerializableCallable dataStoreCreate = new SerializableCallable( |
| "Create PR with Function Factory") { |
| public Object call() throws Exception { |
| RegionAttributes ra = PartitionedRegionTestHelper |
| .createRegionAttrsForPR(0, 10); |
| AttributesFactory raf = new AttributesFactory(ra); |
| PartitionAttributesImpl pa = new PartitionAttributesImpl(); |
| pa.setAll(ra.getPartitionAttributes()); |
| pa.setTotalNumBuckets(17); |
| raf.setPartitionAttributes(pa); |
| getCache().createRegion(rName, raf.create()); |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION9); |
| FunctionService.registerFunction(function); |
| return Boolean.TRUE; |
| } |
| }; |
| datastore0.invoke(dataStoreCreate); |
| datastore1.invoke(dataStoreCreate); |
| datastore2.invoke(dataStoreCreate); |
| datastore3.invoke(dataStoreCreate); |
| |
| Object o = datastore3.invoke(new SerializableCallable( |
| "Create data, invoke exectuable") { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getCache().getRegion(rName); |
| DistributedSystem.setThreadsSocketPolicy(false); |
| final HashSet testKeys = new HashSet(); |
| for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) { |
| testKeys.add("execKey-" + i); |
| } |
| int j = 0; |
| for (Iterator i = testKeys.iterator(); i.hasNext();) { |
| Integer val = new Integer(j++); |
| pr.put(i.next(), val); |
| } |
| // Assert there is data in each bucket |
| for (int bid = 0; bid < pr.getTotalNumberOfBuckets(); bid++) { |
| assertTrue(pr.getBucketKeys(bid).size() > 0); |
| } |
| Function function = new TestFunction(true, TestFunction.TEST_FUNCTION9); |
| FunctionService.registerFunction(function); |
| Execution dataSet = FunctionService.onRegion(pr); |
| ResultCollector rc1 = dataSet.withArgs(Boolean.TRUE).execute( |
| function.getId()); |
| List l = ((List)rc1.getResult()); |
| getLogWriter().info( |
| "PRFunctionExecutionDUnitTest#testExecutionOnAllNodes_byName : Result size :" |
| + l.size() + " Result : " + l); |
| assertEquals(4, l.size()); |
| |
| for (int i = 0; i < 4; i++) { |
| assertEquals(Boolean.TRUE, l.iterator().next()); |
| } |
| return Boolean.TRUE; |
| |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
  /**
   * Ensure that the execution happens on the PR as a whole.
   *
   * @throws Exception
   */
| |
| public static class TestResolver implements PartitionResolver, Serializable { |
| public String getName() { |
| return "ResolverName_" + getClass().getName(); |
| } |
| |
| public Serializable getRoutingObject(EntryOperation opDetails) { |
| return (Serializable)opDetails.getKey(); |
| } |
| |
| public void close() { |
| } |
| |
| public Properties getProperties() { |
| return new Properties(); |
| } |
| } |
| |
| private RegionAttributes createColoRegionAttrs(int red, int mem, |
| String coloRegion) { |
| final TestResolver resolver = new TestResolver(); |
| AttributesFactory attr = new AttributesFactory(); |
| attr.setPartitionAttributes(new PartitionAttributesFactory() |
| .setPartitionResolver(resolver).setRedundantCopies(red) |
| .setLocalMaxMemory(mem).setColocatedWith(coloRegion).create()); |
| return attr.create(); |
| } |
| |
| public void testlonerSystem_Bug41832() throws Exception { |
| final String rName = getUniqueName(); |
| Host host = Host.getHost(0); |
| final VM datastore = host.getVM(3); |
| Object o = datastore.invoke(new SerializableCallable("Create region") { |
| public Object call() throws Exception { |
| createLonerCache(); // creates loner cache |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| Region region = getCache().createRegion(rName, factory.create()); |
| Function function = new TestFunction(true, TEST_FUNCTION2); |
| FunctionService.registerFunction(function); |
| |
| Execution dataSet = FunctionService.onRegion(region); |
| try { |
| ResultCollector rc = dataSet.withArgs(Boolean.TRUE).execute( |
| function.getId()); |
| Object o = rc.getResult(); |
| fail("Expected Function Exception"); |
| } |
| catch (Exception expected) { |
| assertTrue(expected.getMessage().startsWith( |
| "No Replicated Region found for executing function")); |
| return Boolean.TRUE; |
| } |
| return Boolean.FALSE; |
| } |
| }); |
| assertEquals(Boolean.TRUE, o); |
| } |
| } |