| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.execute; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import com.gemstone.gemfire.DataSerializable; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.EntryOperation; |
| import com.gemstone.gemfire.cache.PartitionAttributes; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.PartitionResolver; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.execute.Function; |
| import com.gemstone.gemfire.cache.execute.FunctionContext; |
| import com.gemstone.gemfire.cache.execute.FunctionService; |
| import com.gemstone.gemfire.cache.execute.RegionFunctionContext; |
| import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; |
| import com.gemstone.gemfire.cache30.CacheTestCase; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.functions.LocalDataSetFunction; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| |
| public class LocalDataSetDUnitTest extends CacheTestCase { |
| |
| private static final long serialVersionUID = 1L; |
| |
| protected static VM dataStore1 = null; |
| |
| protected static VM dataStore2 = null; |
| |
| protected static VM dataStore3 = null; |
| |
| protected static VM accessor = null; |
| |
| protected static Region customerPR = null; |
| |
| protected static Region orderPR = null; |
| |
| protected static Region shipmentPR = null; |
| |
| public LocalDataSetDUnitTest(String name) { |
| super(name); |
| } |
| |
| public void setUp() throws Exception { |
| super.setUp(); |
| Host host = Host.getHost(0); |
| dataStore1 = host.getVM(0); |
| dataStore2 = host.getVM(1); |
| dataStore3 = host.getVM(2); |
| accessor = host.getVM(3); |
| } |
| |
| public void testLocalDataSet() { |
| createCacheInAllVms(); |
| createCustomerPR(); |
| createOrderPR(); |
| createShipmentPR(); |
| putInPRs(); |
| registerFunctions(); |
| executeFunctions(); |
| } |
| |
| public void testLocalDataSetIteration() { |
| createCacheInAllVms(); |
| createCustomerPR(); |
| createOrderPR(); |
| createShipmentPR(); |
| putInPRs(); |
| registerIteratorFunctionOnAll(); |
| |
| SerializableCallable installHook = new SerializableCallable() { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)cache.getRegion("CustomerPR"); |
| Runnable r = new ReadHook(); |
| pr.getDataStore().setBucketReadHook(r); |
| return null; |
| } |
| }; |
| invokeInAllDataStores(installHook); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region region = cache.getRegion("CustomerPR"); |
| Set filter = new HashSet(); |
| filter.add("1"); |
| FunctionService.onRegion(region).withFilter(filter).execute(IterateFunction.id, true, false, true).getResult(); |
| return null; |
| } |
| }); |
| |
| SerializableCallable bucketRead = new SerializableCallable() { |
| public Object call() throws Exception { |
| return getHookInvoked(); |
| } |
| }; |
| Integer ds1 = (Integer)dataStore1.invoke(bucketRead); |
| Integer ds2 = (Integer)dataStore2.invoke(bucketRead); |
| Integer ds3 = (Integer)dataStore3.invoke(bucketRead); |
| assertEquals(1, ds1+ds2+ds3); |
| } |
| |
| private void invokeInAllDataStores(SerializableCallable installHook) { |
| dataStore1.invoke(installHook); |
| dataStore2.invoke(installHook); |
| dataStore3.invoke(installHook); |
| } |
| |
| protected static class IterateFunction implements Function { |
| public static final String id = "IteratorFunction"; |
| public void execute(FunctionContext context) { |
| Region localRegion = PartitionRegionHelper.getLocalDataForContext((RegionFunctionContext)context); |
| Iterator it = localRegion.keySet().iterator(); |
| while (it.hasNext()) { |
| getLogWriter().info("LocalKeys:"+it.next()); |
| } |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| public String getId() { |
| return id; |
| } |
| public boolean hasResult() { |
| return true; |
| } |
| public boolean optimizeForWrite() { |
| return true; |
| } |
| public boolean isHA() { |
| return false; |
| } |
| } |
| |
| static volatile boolean invoked = false; |
| public static void setHookInvoked() { |
| invoked = true; |
| } |
| public Integer getHookInvoked() { |
| if (invoked) { |
| return Integer.valueOf(1); |
| } |
| return Integer.valueOf(0); |
| } |
| protected static class ReadHook implements Runnable { |
| |
| public void run() { |
| System.out.println("SWAP:invokedHook"); |
| setHookInvoked(); |
| } |
| } |
| |
| private void executeFunctions() { |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "executeFunction"); |
| |
| } |
| public static void executeFunction() { |
| try { |
| FunctionService.onRegion(customerPR).execute( |
| "LocalDataSetFunction" + true,true,false,true).getResult(); |
| FunctionService.onRegion(customerPR).execute( |
| "LocalDataSetFunction" + false,true,false,false).getResult(); |
| Set<String> filter = new HashSet<String>(); |
| filter.add("YOYO-CUST-KEY-"+0); |
| FunctionService.onRegion(customerPR).withFilter(filter).execute( |
| "LocalDataSetFunction" + true,true,false,true).getResult(); |
| FunctionService.onRegion(customerPR).withFilter(filter).execute( |
| "LocalDataSetFunction" + false,true,false,false).getResult(); |
| filter.clear(); |
| for(int i=0 ; i<6 ; i++){ |
| filter.add("YOYO-CUST-KEY-"+i); |
| } |
| FunctionService.onRegion(customerPR).withFilter(filter).execute( |
| "LocalDataSetFunction" + true,true,false,true).getResult(); |
| FunctionService.onRegion(customerPR).withFilter(filter).execute( |
| "LocalDataSetFunction" + false,true,false,false).getResult(); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to ", e); |
| } |
| } |
| private void registerFunctions() { |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "registerFunction"); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "registerFunction"); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "registerFunction"); |
| } |
| |
| public static void registerFunction() { |
| Function function1 = new LocalDataSetFunction(false); |
| Function function2 = new LocalDataSetFunction(true); |
| FunctionService.registerFunction(function1); |
| FunctionService.registerFunction(function2); |
| } |
| |
| private void registerIteratorFunctionOnAll() { |
| accessor.invoke(LocalDataSetDUnitTest.class, "registerIteratorFunction"); |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "registerIteratorFunction"); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "registerIteratorFunction"); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "registerIteratorFunction"); |
| } |
| |
| public static void registerIteratorFunction() { |
| Function function = new IterateFunction(); |
| FunctionService.registerFunction(function); |
| } |
| |
| public static void createCacheInAllVms() { |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "createCacheInVm"); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "createCacheInVm"); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "createCacheInVm"); |
| accessor.invoke(LocalDataSetDUnitTest.class, "createCacheInVm"); |
| } |
| |
| public static void createCacheInVm() { |
| new LocalDataSetDUnitTest("temp").createCache(); |
| } |
| |
| public void createCache() { |
| try { |
| getCache(); |
| assertNotNull(cache); |
| } |
| catch (Exception e) { |
| fail("Failed while creating the cache", e); |
| } |
| } |
| |
| private static void createCustomerPR() { |
| Object args[] = new Object[] { "CustomerPR", new Integer(1), |
| new Integer(0), new Integer(10), null }; |
| accessor.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| args = new Object[] { "CustomerPR", new Integer(1), new Integer(50), |
| new Integer(10), null }; |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| |
| } |
| |
| private static void createOrderPR() { |
| Object args[] = new Object[] { "OrderPR", new Integer(1), new Integer(0), |
| new Integer(10), "CustomerPR" }; |
| accessor.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| args = new Object[] { "OrderPR", new Integer(1), new Integer(50), |
| new Integer(10), "CustomerPR" }; |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| } |
| |
| private static void createShipmentPR() { |
| Object args[] = new Object[] { "ShipmentPR", new Integer(1), |
| new Integer(0), new Integer(10), "OrderPR" }; |
| accessor.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| args = new Object[] { "ShipmentPR", new Integer(1), new Integer(50), |
| new Integer(10), "OrderPR" }; |
| dataStore1.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore2.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| dataStore3.invoke(LocalDataSetDUnitTest.class, "createPR", args); |
| } |
| |
| public static void createPR(String partitionedRegionName, Integer redundancy, |
| Integer localMaxMemory, Integer totalNumBuckets, String colocatedWith) { |
| |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| PartitionAttributes prAttr = paf.setRedundantCopies(redundancy.intValue()) |
| .setLocalMaxMemory(localMaxMemory.intValue()).setTotalNumBuckets( |
| totalNumBuckets.intValue()).setColocatedWith(colocatedWith) |
| .setPartitionResolver(new LDSPartitionResolver()).create(); |
| AttributesFactory attr = new AttributesFactory(); |
| attr.setPartitionAttributes(prAttr); |
| assertNotNull(cache); |
| |
| if (partitionedRegionName.equals("CustomerPR")) { |
| customerPR = cache.createRegion(partitionedRegionName, attr.create()); |
| assertNotNull(customerPR); |
| getLogWriter().info( |
| "Partitioned Region " + partitionedRegionName |
| + " created Successfully :" + customerPR); |
| |
| } |
| if (partitionedRegionName.equals("OrderPR")) { |
| orderPR = cache.createRegion(partitionedRegionName, attr.create()); |
| assertNotNull(orderPR); |
| getLogWriter().info( |
| "Partitioned Region " + partitionedRegionName |
| + " created Successfully :" + orderPR); |
| |
| } |
| |
| if (partitionedRegionName.equals("ShipmentPR")) { |
| shipmentPR = cache.createRegion(partitionedRegionName, attr.create()); |
| assertNotNull(shipmentPR); |
| getLogWriter().info( |
| "Partitioned Region " + partitionedRegionName |
| + " created Successfully :" + shipmentPR); |
| |
| } |
| } |
| |
| private static void putInPRs() { |
| accessor.invoke(LocalDataSetDUnitTest.class, "put"); |
| } |
| |
| public static void put() { |
| for (int i = 0; i < 120; i++) { |
| customerPR.put("YOYO-CUST-KEY-" + i, "YOYO-CUST-VAL-" + i); |
| orderPR.put("YOYO-ORD-KEY-" + i, "YOYO-ORD-VAL-" + i); |
| shipmentPR.put("YOYO-SHIP-KEY-" + i, "YOYO-SHIP-VAL-" + i); |
| } |
| } |
| |
| } |
| |
| class LDSPartitionResolver implements PartitionResolver { |
| |
| public LDSPartitionResolver() { |
| } |
| |
| public String getName() { |
| return this.getClass().getName(); |
| } |
| |
| public Serializable getRoutingObject(EntryOperation opDetails) { |
| String key = (String)opDetails.getKey(); |
| return new LDSRoutingObject("" + key.charAt(key.length() - 1)); |
| } |
| |
| public void close() { |
| } |
| |
| public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| if (!(o instanceof LDSPartitionResolver)) |
| return false; |
| LDSPartitionResolver otherKeyPartitionResolver = (LDSPartitionResolver)o; |
| return otherKeyPartitionResolver.getName().equals(getName()); |
| } |
| } |
| |
| class LDSRoutingObject implements DataSerializable { |
| public LDSRoutingObject(String value) { |
| this.value = value; |
| } |
| |
| private String value; |
| |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| this.value = DataSerializer.readString(in); |
| } |
| |
| public void toData(DataOutput out) throws IOException { |
| DataSerializer.writeString(this.value, out); |
| } |
| |
| public int hashCode() { |
| return Integer.parseInt(this.value); |
| } |
| } |