| /* |
| * ========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| * ========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.PartitionAttributes; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.PartitionedRegionStorageException; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache30.CacheSerializableRunnable; |
| import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; |
| import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserver; |
| import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter; |
| |
| import dunit.AsyncInvocation; |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| |
| /** |
| * @author tapshank, Created on Jan 17, 2006 |
| * |
| */ |
| public class PartitionedRegionHADUnitTest extends PartitionedRegionDUnitTestCase |
| { |
| |
| //////constructor ////////// |
| public PartitionedRegionHADUnitTest(String name) { |
| |
| super(name); |
| }//end of constructor |
| |
| public static final String PR_PREFIX = "PR"; |
| |
| Properties props = new Properties(); |
| |
| volatile static int regionCnt = 0; |
| |
| final static int MAX_REGIONS = 1; |
| |
| final int totalNumBuckets = 5; |
| |
| /** |
| * Test to ensure that we have proper bucket failover, with no data loss, in the face |
| * of sequential cache.close() events. |
| * @throws Exception |
| */ |
| public void testBucketFailOverDuringCacheClose() throws Exception { |
| |
| final String regionName = getUniqueName(); |
| final Boolean value = new Boolean(true); |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| VM vm3 = host.getVM(3); |
| CacheSerializableRunnable createPR = new CacheSerializableRunnable( |
| "createRegion") { |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| final CountDownLatch rebalancingFinished = new CountDownLatch(1); |
| InternalResourceManager.setResourceObserver(new ResourceObserverAdapter(){ |
| |
| @Override |
| public void rebalancingOrRecoveryFinished(Region region) { |
| rebalancingFinished.countDown(); |
| } |
| |
| }); |
| try { |
| Region partitionedregion = cache.createRegion(regionName, |
| createRegionAttributesForPR(1, 20)); |
| if(!rebalancingFinished.await(60000, TimeUnit.MILLISECONDS)) { |
| fail("Redundancy recovery did not happen within 60 seconds"); |
| } |
| assertNotNull(partitionedregion); |
| } catch (InterruptedException e) { |
| fail("interrupted",e); |
| } finally { |
| InternalResourceManager.setResourceObserver(null); |
| } |
| } |
| }; |
| vm2.invoke(createPR); |
| vm3.invoke(createPR); |
| |
| vm3.invoke( |
| new CacheSerializableRunnable( |
| "createPRBuckets") { |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName); |
| assertTrue(pr.isEmpty()); |
| Integer k; |
| // Create keys such that all buckets are created, Integer works well |
| // assuming buckets are allocated on the mod of the key hashCode, x 2 just to be safe |
| final int numEntries=pr.getTotalNumberOfBuckets()*2; |
| for(int i=numEntries; i>=0; --i) { |
| k = new Integer(i); |
| pr.put(k, value); |
| } |
| assertEquals(numEntries+1,pr.size()); |
| assertEquals(pr.getRegionAdvisor().getBucketSet().size(), pr.getTotalNumberOfBuckets()); |
| |
| } |
| } |
| ); |
| |
| CacheSerializableRunnable existsEntryCheck = new CacheSerializableRunnable( |
| "PRExistsEntryCheck") { |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName); |
| Integer k; |
| for(int i=pr.getTotalNumberOfBuckets()*2; i>=0; --i) { |
| k=new Integer(i); |
| assertTrue("containsKey for key="+k, pr.containsKey(k)); |
| assertEquals("get for key="+k, value, pr.get(k)); |
| } |
| } |
| }; |
| vm3.invoke(existsEntryCheck); |
| vm2.invoke(existsEntryCheck); |
| |
| CacheSerializableRunnable closeCache = new CacheSerializableRunnable( |
| "PRCloseCache") { |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| cache.close(); |
| } |
| }; |
| |
| // origin VM down! |
| vm2.invoke(closeCache); |
| // origin down, but no data loss |
| vm3.invoke(existsEntryCheck); |
| |
| // get back to the desired redundancy |
| vm0.invoke(createPR); |
| // verify no data loss |
| vm0.invoke(existsEntryCheck); |
| |
| // 2nd oldest VM down! |
| vm3.invoke(closeCache); |
| // 2nd down, but no data loss |
| vm0.invoke(existsEntryCheck); |
| |
| // get back (for 2nd time) to desired redundancy |
| vm1.invoke(createPR); |
| // verify no data loss |
| vm1.invoke(existsEntryCheck); |
| vm0.invoke(existsEntryCheck); |
| } |
| |
| //////////test methods //////////////// |
| public void testGrabBackupBuckets() throws Throwable |
| { |
| |
| Host host = Host.getHost(0); |
| VM dataStore0 = host.getVM(0); |
| // VM dataStore1 = host.getVM(1); |
| VM dataStore2 = host.getVM(2); |
| VM accessor = host.getVM(3); |
| final int redundantCopies = 1; |
| // Create PRs On 2 VMs |
| CacheSerializableRunnable createPRs = new CacheSerializableRunnable( |
| "createPrRegions") { |
| |
| public void run2() throws CacheException |
| { |
| final CountDownLatch recoveryDone = new CountDownLatch(MAX_REGIONS); |
| ResourceObserver waitForRecovery = new ResourceObserverAdapter() { |
| @Override |
| public void rebalancingOrRecoveryFinished(Region region) { |
| recoveryDone.countDown(); |
| } |
| }; |
| InternalResourceManager.setResourceObserver(waitForRecovery); |
| try { |
| Cache cache = getCache(); |
| System.setProperty(PartitionedRegion.RETRY_TIMEOUT_PROPERTY, "20000"); |
| for (int i = 0; i < MAX_REGIONS; i++) { |
| cache.createRegion(PR_PREFIX + i, |
| createRegionAttributesForPR(redundantCopies, 200)); |
| } |
| System.setProperty(PartitionedRegion.RETRY_TIMEOUT_PROPERTY, |
| Integer.toString(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| if(!recoveryDone.await(60, TimeUnit.SECONDS)) { |
| fail("recovery didn't happen in 60 seconds"); |
| } |
| } catch (InterruptedException e) { |
| fail("recovery wait interrupted", e); |
| } finally { |
| InternalResourceManager.setResourceObserver(null); |
| } |
| } |
| }; |
| CacheSerializableRunnable createAccessor = new CacheSerializableRunnable( |
| "createAccessor") { |
| |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| for (int i = 0; i < MAX_REGIONS; i++) { |
| cache.createRegion(PR_PREFIX + i, |
| createRegionAttributesForPR(redundantCopies, 0)); |
| } |
| } |
| }; |
| // Create PRs on only 2 VMs |
| dataStore0.invoke(createPRs); |
| // dataStore1.invoke(createPRs); |
| final String expectedExceptions = PartitionedRegionStorageException.class.getName(); |
| SerializableRunnable addExpectedExceptions = |
| new CacheSerializableRunnable("addExpectedExceptions") { |
| public void run2() throws CacheException { |
| getCache().getLogger().info("<ExpectedException action=add>" + |
| expectedExceptions + "</ExpectedException>"); |
| getLogWriter().info("<ExpectedException action=add>" + |
| expectedExceptions + "</ExpectedException>"); |
| } |
| }; |
| SerializableRunnable removeExpectedExceptions = |
| new CacheSerializableRunnable("removeExpectedExceptions") { |
| public void run2() throws CacheException { |
| getLogWriter().info("<ExpectedException action=remove>" + |
| expectedExceptions + "</ExpectedException>"); |
| getCache().getLogger().info("<ExpectedException action=remove>" + |
| expectedExceptions + "</ExpectedException>"); |
| } |
| }; |
| |
| // Do put operations on these 2 PRs asynchronosly. |
| CacheSerializableRunnable dataStore0Puts = new CacheSerializableRunnable("dataStore0PutOperations") { |
| public void run2() |
| { |
| Cache cache = getCache(); |
| for (int j = 0; j < MAX_REGIONS; j++) { |
| Region pr = cache.getRegion(Region.SEPARATOR + PR_PREFIX + j); |
| assertNotNull(pr); |
| for (int k = 0; k < 10; k++) { |
| pr.put(j + PR_PREFIX + k, PR_PREFIX + k); |
| } |
| getLogWriter().info("VM0 Done put successfully for PR = " + PR_PREFIX |
| + j); |
| } |
| } |
| }; |
| |
| CacheSerializableRunnable dataStore1Puts = new CacheSerializableRunnable("dataStore1PutOperations") { // TODO bug36296 |
| public void run2() |
| { |
| |
| Cache cache = getCache(); |
| for (int j = 0; j < MAX_REGIONS; j++) { |
| Region pr = cache.getRegion(Region.SEPARATOR + PR_PREFIX + (j)); |
| assertNotNull(pr); |
| for (int k = 10; k < 20; k++) { |
| pr.put(j + PR_PREFIX + k, PR_PREFIX + k); |
| } |
| getLogWriter().info("VM1 Done put successfully for PR = " + PR_PREFIX |
| + j); |
| } |
| } |
| }; |
| dataStore0.invoke(addExpectedExceptions); |
| // dataStore1.invoke(addExpectedExceptions); |
| AsyncInvocation async0 = dataStore0.invokeAsync(dataStore0Puts); |
| // AsyncInvocation async1 = dataStore1.invokeAsync(dataStore1Puts); |
| DistributedTestCase.join(async0, 30 * 1000, getLogWriter()); |
| // async1.join(); |
| dataStore0.invoke(removeExpectedExceptions); |
| // dataStore1.invoke(removeExpectedExceptions); |
| |
| // Verify that buckets can not be created if there are not enough Nodes to support |
| // the redundancy Configuration |
| assertFalse(async0.exceptionOccurred()); |
| // assertTrue(async0.getException() instanceof PartitionedRegionStorageException); |
| // assertTrue(async1.exceptionOccurred()); |
| // assertTrue(async1.getException() instanceof PartitionedRegionStorageException); |
| |
| // At this point redundancy criterion is not meet. |
| // now if we create PRs on more VMs, it should create those "supposed to |
| // be redundant" buckets on these nodes, if it can accommodate the data |
| // (localMaxMemory>0). |
| dataStore2.invoke(createPRs); |
| |
| async0 = dataStore0.invokeAsync(dataStore0Puts); |
| // async1 = dataStore1.invokeAsync(dataStore1Puts); |
| DistributedTestCase.join(async0, 30 * 1000, getLogWriter()); |
| // async1.join(); |
| |
| if (async0.exceptionOccurred()) { |
| fail("async0 failed", async0.getException()); |
| } |
| // assertFalse(async1.exceptionOccurred()); |
| |
| accessor.invoke(createAccessor); |
| |
| for (int c = 0; c < MAX_REGIONS; c++) { |
| final Integer ri = new Integer(c); |
| final SerializableCallable validateLocalBucket2RegionMapSize = |
| new SerializableCallable("validateLocalBucket2RegionMapSize") { |
| public Object call() throws Exception { |
| int size = 0; |
| Cache cache = getCache(); |
| PartitionedRegion pr = (PartitionedRegion)cache |
| .getRegion(Region.SEPARATOR + PR_PREFIX + ri.intValue()); |
| if (pr.getDataStore() != null) { |
| size = pr.getDataStore().getBucketsManaged(); |
| } |
| return new Integer(size); |
| } |
| }; |
| final SerializableCallable validateBucketsOnNode = |
| new SerializableCallable("validateBucketOnNode") { |
| public Object call() throws Exception { |
| int containsNode = 0; |
| Cache cache = getCache(); |
| PartitionedRegion pr = (PartitionedRegion)cache |
| .getRegion(Region.SEPARATOR + PR_PREFIX + ri.intValue()); |
| |
| Iterator it = pr.getRegionAdvisor().getBucketSet().iterator(); |
| Set nodeList; |
| try { |
| while (it.hasNext()) { |
| Integer bucketId = (Integer) it.next(); |
| nodeList = pr.getRegionAdvisor().getBucketOwners(bucketId.intValue()); |
| if ((nodeList != null) && (nodeList.contains(pr.getMyId()))) { |
| containsNode++; |
| } |
| else { |
| getCache().getLogger().fine("I don't contain member " + pr.getMyId()); |
| } |
| } |
| } catch (NoSuchElementException done) { |
| } |
| |
| return new Integer(containsNode); |
| } |
| }; |
| |
| // int vm0LBRsize = ((Integer)dataStore0.invoke(validateLocalBucket2RegionMapSize)).intValue(); |
| int vm2LBRsize = ((Integer)dataStore2.invoke(validateLocalBucket2RegionMapSize)).intValue(); |
| int vm3LBRsize = ((Integer)accessor.invoke(validateLocalBucket2RegionMapSize)).intValue(); |
| // This would mean that up coming node didn't pick up any buckets |
| assertFalse(vm2LBRsize == 0); |
| // This accessor should NOT have picked up any buckets. |
| assertFalse(vm3LBRsize != 0); |
| int vm2B2Nsize = ((Integer)dataStore2.invoke(validateBucketsOnNode)).intValue(); |
| getLogWriter().info("vm2B2Nsize = " + vm2B2Nsize); |
| assertEquals(vm2B2Nsize, vm2LBRsize); |
| } |
| } |
| |
| /** |
| * This verifies the Bucket Regions on the basis of |
| * redundantCopies set in RegionAttributes. |
| * @see PartitionedRegionSingleNodeOperationsJUnitTest#testBucketScope() |
| * @throws Exception |
| */ |
| public void testBucketsScope() throws Exception |
| { |
| |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final String PR_ZeroRedundancy = "PR_ZeroRedundancy"; |
| final String PR_SingleRedundancy = "PR_SingleRedundancy"; |
| // Create PRs On 2 VMs |
| CacheSerializableRunnable createPRs = new CacheSerializableRunnable( |
| "createPrRegionWithZeroRed") { |
| public void run2() throws CacheException |
| { |
| Cache cache = getCache(); |
| |
| // RedundantCopies = 0 , Scope = DISTRIBUTED_ACK |
| cache.createRegion(PR_ZeroRedundancy, createRegionAttributesForPR( |
| 0, 200)); |
| // RedundantCopies > 0 , Scope = DISTRIBUTED_ACK |
| cache.createRegion(PR_SingleRedundancy, createRegionAttributesForPR(1, |
| 200)); |
| } |
| }; |
| |
| // Create PRs on only 2 VMs |
| vm0.invoke(createPRs); |
| vm1.invoke(createPRs); |
| // Do put operations on these 2 PRs asynchronosly. |
| |
| vm0.invoke(new CacheSerializableRunnable("doPutOperations") { |
| public void run2() |
| { |
| Cache cache = getCache(); |
| String regionName = PR_ZeroRedundancy; |
| Region pr = cache.getRegion(Region.SEPARATOR + regionName); |
| assertNotNull(pr); |
| for (int k = 0; k < 10; k++) { |
| pr.put(k + "", k + ""); |
| } |
| cache.getLogger().fine( |
| "VM0 Done put successfully for PR = " + regionName); |
| |
| regionName = PR_SingleRedundancy; |
| Region pr1 = cache.getRegion(Region.SEPARATOR + regionName); |
| assertNotNull(pr1); |
| for (int k = 0; k < 10; k++) { |
| pr1.put(k + "", k + ""); |
| } |
| cache.getLogger().fine( |
| "VM0 Done put successfully for PR = " + regionName); |
| } |
| |
| }); |
| |
| CacheSerializableRunnable validateBucketScope = new CacheSerializableRunnable( |
| "validateBucketScope") { |
| public void run2() |
| { |
| Cache cache = getCache(); |
| |
| |
| String regionName = PR_ZeroRedundancy; |
| PartitionedRegion pr = (PartitionedRegion)cache |
| .getRegion(Region.SEPARATOR + regionName); |
| |
| java.util.Iterator buckRegionIterator = pr.getDataStore().localBucket2RegionMap |
| .values().iterator(); |
| while (buckRegionIterator.hasNext()) { |
| BucketRegion bucket = (BucketRegion)buckRegionIterator.next(); |
| assertTrue(bucket.getAttributes().getScope().isDistributedAck()); |
| } |
| |
| regionName = PR_SingleRedundancy; |
| PartitionedRegion pr1 = (PartitionedRegion)cache |
| .getRegion(Region.SEPARATOR + regionName); |
| |
| java.util.Iterator buckRegionIterator1 = pr1.getDataStore().localBucket2RegionMap |
| .values().iterator(); |
| while (buckRegionIterator1.hasNext()) { |
| Region bucket = (Region)buckRegionIterator1.next(); |
| assertEquals(DataPolicy.REPLICATE, bucket.getAttributes().getDataPolicy()); |
| } |
| } |
| |
| }; |
| |
| vm0.invoke(validateBucketScope); |
| vm1.invoke(validateBucketScope); |
| |
| } |
| |
| /** |
| * This private methods sets the passed attributes and returns RegionAttribute |
| * object, which is used in create region |
| * @param redundancy |
| * @param localMaxMem |
| * |
| * @return |
| */ |
| protected RegionAttributes createRegionAttributesForPR(int redundancy, |
| int localMaxMem) |
| { |
| |
| AttributesFactory attr = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| PartitionAttributes prAttr = paf.setRedundantCopies(redundancy) |
| .setLocalMaxMemory(localMaxMem) |
| .setTotalNumBuckets(totalNumBuckets) |
| .create(); |
| attr.setPartitionAttributes(prAttr); |
| return attr.create(); |
| } |
| } |