| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.partitioned; |
| |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.atLeastOnce; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.logging.log4j.Level; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.core.Appender; |
| import org.apache.logging.log4j.core.LogEvent; |
| import org.apache.logging.log4j.core.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.mockito.ArgumentCaptor; |
| |
| import org.apache.geode.admin.internal.AdminDistributedSystemImpl; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.DiskStore; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.control.RebalanceOperation; |
| import org.apache.geode.cache.control.RebalanceResults; |
| import org.apache.geode.cache.persistence.PartitionOfflineException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.DistributionMessageObserver; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.cache.ColocationLogger; |
| import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionHelper; |
| import org.apache.geode.internal.cache.control.InternalResourceManager; |
| import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.RMIException; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.junit.categories.RegionsTest; |
| |
| @Category({RegionsTest.class}) |
| public class PersistentColocatedPartitionedRegionDUnitTest |
| extends PersistentPartitionedRegionTestBase { |
| |
  // Matches the multi-line warning logged when persistent-region recovery is
  // blocked by an offline colocated child region; (?s) lets '.' span newlines.
  private static final String PATTERN_FOR_MISSING_CHILD_LOG =
      "(?s)Persistent data recovery for region .*is prevented by offline colocated region.*";
  private static final int NUM_BUCKETS = 15;
  private static final int MAX_WAIT = 60 * 1000; // millis
  private static final int DEFAULT_NUM_EXPECTED_LOG_MESSAGES = 1;
  // The statics below are mutated per-test by the Setter/Resetter runnables and
  // read inside the SerializableCallables that execute on remote DUnit VMs.
  private static int numExpectedLogMessages = DEFAULT_NUM_EXPECTED_LOG_MESSAGES;
  private static int numChildPRs = 1;
  private static int numChildPRGenerations = 2;
  // Default region creation delay long enough for the initial cycle of logger warnings
  private static int delayForChildCreation = MAX_WAIT * 2 / 3;
| |
| public PersistentColocatedPartitionedRegionDUnitTest() { |
| super(); |
| } |
| |
  @Override
  public final void preTearDownCacheTestCase() throws Exception {
    // Delete any backup artifacts so subsequent tests start from a clean disk state.
    FileUtils.deleteDirectory(getBackupDir());
  }
| |
  @Test
  public void testColocatedPRAttributes() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(1);

    vm0.invoke(new SerializableRunnable("create") {
      @Override
      public void run() {
        Cache cache = getCache();

        // Create (or reuse) the disk store backing the persistent regions below.
        DiskStore ds = cache.findDiskStore("disk");
        if (ds == null) {
          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
        }

        // Create Persistent region
        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(0);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        cache.createRegion("persistentLeader", af.create());

        // Create a non-persistent leader PR.
        af.setDataPolicy(DataPolicy.PARTITION);
        af.setDiskStoreName(null);
        cache.createRegion("nonPersistentLeader", af.create());


        // Switch back to persistent attributes, colocated with the non-persistent leader.
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        paf.setColocatedWith("nonPersistentLeader");
        af.setPartitionAttributes(paf.create());

        // Try to colocate a persistent PR with the non persistent PR. This should fail.
        IgnoredException exp = IgnoredException.addIgnoredException("IllegalStateException");
        try {
          cache.createRegion("colocated", af.create());
          fail(
              "should not have been able to create a persistent region colocated with a non persistent region");
        } catch (IllegalStateException expected) {
          // expected: a persistent child may not colocate with a non-persistent parent
        } finally {
          exp.remove();
        }

        // Try to colocate a persistent PR with another persistent PR. This should work.
        paf.setColocatedWith("persistentLeader");
        af.setPartitionAttributes(paf.create());
        cache.createRegion("colocated", af.create());

        // We should also be able to colocate a non persistent region with a persistent region.
        af.setDataPolicy(DataPolicy.PARTITION);
        af.setDiskStoreName(null);
        paf.setColocatedWith("persistentLeader");
        af.setPartitionAttributes(paf.create());
        cache.createRegion("colocated2", af.create());
      }
    });
  }
| |
  /**
   * Testing that we can colocate persistent PRs: create a persistent parent, a
   * persistent child ("region2") and a non-persistent grandchild ("region3") on
   * three VMs, bounce the caches, and verify bucket placement and persistent
   * data survive the restart.
   */
  @Test
  public void testColocatedPRs() throws Throwable {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);

    SerializableRunnable createPRs = new SerializableRunnable("region1") {
      @Override
      public void run() {
        Cache cache = getCache();

        // Create (or reuse) the disk store backing the persistent regions.
        DiskStore ds = cache.findDiskStore("disk");
        if (ds == null) {
          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
        }
        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(0);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        cache.createRegion(getPartitionedRegionName(), af.create());

        // region2: persistent, colocated with the parent.
        paf.setColocatedWith(getPartitionedRegionName());
        af.setPartitionAttributes(paf.create());
        cache.createRegion("region2", af.create());
        // region3: NON-persistent, colocated with region2.
        paf.setColocatedWith("region2");
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PARTITION);
        af.setDiskStoreName(null);
        cache.createRegion("region3", af.create());
      }
    };
    vm0.invoke(createPRs);
    vm1.invoke(createPRs);
    vm2.invoke(createPRs);

    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");
    createData(vm0, 0, NUM_BUCKETS, "c", "region3");

    // Colocation requires identical bucket placement on every member.
    Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName());
    assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
    assertEquals(vm0Buckets, getBucketList(vm0, "region3"));
    Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
    assertEquals(vm1Buckets, getBucketList(vm1, "region3"));
    Set<Integer> vm2Buckets = getBucketList(vm2, getPartitionedRegionName());
    assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
    assertEquals(vm2Buckets, getBucketList(vm2, "region3"));

    closeCache(vm0);
    closeCache(vm1);
    closeCache(vm2);

    // Recreate concurrently: persistent recovery requires all members to be present.
    AsyncInvocation async0 = vm0.invokeAsync(createPRs);
    AsyncInvocation async1 = vm1.invokeAsync(createPRs);
    AsyncInvocation async2 = vm2.invokeAsync(createPRs);
    async0.getResult(MAX_WAIT);
    async1.getResult(MAX_WAIT);
    async2.getResult(MAX_WAIT);


    // The secondary buckets can be recovered asynchronously,
    // so wait for them to come back.
    waitForBuckets(vm0, vm0Buckets, getPartitionedRegionName());
    waitForBuckets(vm0, vm0Buckets, "region2");
    waitForBuckets(vm1, vm1Buckets, getPartitionedRegionName());
    waitForBuckets(vm1, vm1Buckets, "region2");

    checkData(vm0, 0, NUM_BUCKETS, "a");
    checkData(vm0, 0, NUM_BUCKETS, "b", "region2");

    // region 3 didn't have persistent data, so nothing should be recovered
    checkData(vm0, 0, NUM_BUCKETS, null, "region3");

    // Make sure can do a put in all of the buckets in region 3
    createData(vm0, 0, NUM_BUCKETS, "c", "region3");
    // Now all of those buckets should exist.
    checkData(vm0, 0, NUM_BUCKETS, "c", "region3");
    // The region 3 buckets should be restored in the appropriate places.
    assertEquals(vm0Buckets, getBucketList(vm0, "region3"));
    assertEquals(vm1Buckets, getBucketList(vm1, "region3"));
    assertEquals(vm2Buckets, getBucketList(vm2, "region3"));

  }
| |
| private void createPR(String regionName, boolean persistent) { |
| createPR(regionName, null, persistent, "disk"); |
| } |
| |
| private void createPR(String regionName, String colocatedWith, boolean persistent) { |
| createPR(regionName, colocatedWith, persistent, "disk"); |
| } |
| |
| private void createPR(String regionName, String colocatedRegionName, boolean persistent, |
| String diskName) { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore(diskName); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskName); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| if (colocatedRegionName != null) { |
| paf.setColocatedWith(colocatedRegionName); |
| } |
| af.setPartitionAttributes(paf.create()); |
| if (persistent) { |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName(diskName); |
| } else { |
| af.setDataPolicy(DataPolicy.PARTITION); |
| af.setDiskStoreName(null); |
| } |
| cache.createRegion(regionName, af.create()); |
| } |
| |
  // Creates the persistent parent PR plus one persistent child ("region2")
  // colocated with it, on whichever VM this runnable executes.
  private SerializableRunnable createPRsColocatedPairThread =
      new SerializableRunnable("create2PRs") {
        @Override
        public void run() {
          createPR(getPartitionedRegionName(), true);
          createPR("region2", getPartitionedRegionName(), true);
        }
      };
| |
  // Creates the persistent parent plus numChildPRs persistent children
  // ("region2".."region{numChildPRs+1}"), each colocated directly with the parent.
  private SerializableRunnable createMultipleColocatedChildPRs =
      new SerializableRunnable("create multiple child PRs") {
        @Override
        public void run() throws Exception {
          createPR(getPartitionedRegionName(), true);
          for (int i = 2; i < numChildPRs + 2; ++i) {
            createPR("region" + i, getPartitionedRegionName(), true);
          }
        }
      };
| |
  // Creates a colocation CHAIN rather than a fan-out: parent <- region2 <- region3
  // <- ... for numChildPRGenerations generations, each colocated with its predecessor.
  private SerializableRunnable createPRColocationHierarchy =
      new SerializableRunnable("create PR colocation hierarchy") {
        @Override
        public void run() throws Exception {
          createPR(getPartitionedRegionName(), true);
          createPR("region2", getPartitionedRegionName(), true);
          for (int i = 3; i < numChildPRGenerations + 2; ++i) {
            createPR("region" + i, "region" + (i - 1), true);
          }
        }
      };
| |
| private SerializableCallable createPRsMissingParentRegionThread = |
| new SerializableCallable("createPRsMissingParentRegion") { |
| @Override |
| public Object call() throws Exception { |
| String exClass = ""; |
| Exception ex = null; |
| try { |
| // Skip creation of first region - expect region2 creation to fail |
| // createPR(PR_REGION_NAME, true); |
| createPR("region2", getPartitionedRegionName(), true); |
| } catch (Exception e) { |
| ex = e; |
| exClass = e.getClass().toString(); |
| } finally { |
| return ex; |
| } |
| } |
| }; |
| |
| private SerializableCallable delayedCreatePRsMissingParentRegionThread = |
| new SerializableCallable("delayedCreatePRsMissingParentRegion") { |
| @Override |
| public Object call() throws Exception { |
| String exClass = ""; |
| Exception ex = null; |
| // To ensure that the targeted code paths in ColocationHelper.getColocatedRegion is taken, |
| // this |
| // thread delays the attempted creation on the local member of colocated child region when |
| // parent doesn't exist. |
| // The delay is so that both parent and child regions will be created on another member |
| // and the PR root config |
| // will have an entry for the parent region. |
| Thread.sleep(100); |
| try { |
| // Skip creation of first region - expect region2 creation to fail |
| // createPR(PR_REGION_NAME, true); |
| createPR("region2", getPartitionedRegionName(), true); |
| } catch (Exception e) { |
| ex = e; |
| exClass = e.getClass().toString(); |
| } finally { |
| return ex; |
| } |
| } |
| }; |
| |
  // Creates only the parent PR (children are never created here) and waits until
  // the expected number of missing-colocated-region warnings have been captured by
  // the mock appender. Returns the first warning's formatted text.
  private SerializableCallable createPRsMissingChildRegionThread =
      new SerializableCallable("createPRsMissingChildRegion") {

        @Override
        public Object call() throws Exception {

          try (MockAppender mockAppender = new MockAppender(ColocationLogger.class)) {
            createPR(getPartitionedRegionName(), true);
            // Let this thread continue running long enough for the missing region to be logged a
            // couple times.
            // Child regions do not get created by this thread.
            await().untilAsserted(
                () -> assertEquals(numExpectedLogMessages, mockAppender.getLogs().size()));

            return mockAppender.getLogs().get(0).getMessage().getFormattedMessage();
          } finally {
            // Restore the default so later tests see clean static state.
            numExpectedLogMessages = 1;
          }
        }
      };
| |
  // Creates the parent, waits for the expected missing-child warnings, then creates
  // the child and sleeps past another log interval to allow verification that the
  // warnings stop. Returns the first warning's formatted text.
  private SerializableCallable createPRsMissingChildRegionDelayedStartThread =
      new SerializableCallable("createPRsMissingChildRegionDelayedStart") {

        @Override
        public Object call() throws Exception {

          try (MockAppender mockAppender = new MockAppender(ColocationLogger.class)) {
            createPR(getPartitionedRegionName(), true);
            // Delay creation of second (i.e child) region to see missing colocated region log
            // message (logInterval/2 < delay < logInterval)
            await().untilAsserted(
                () -> assertEquals(numExpectedLogMessages, mockAppender.getLogs().size()));
            createPR("region2", getPartitionedRegionName(), true);
            // Another delay before exiting the thread to make sure that missing region logging
            // doesn't continue after missing region is created (delay > logInterval)
            Thread.sleep(ColocationLogger.getLogInterval() + 10000);
            return mockAppender.getLogs().get(0).getMessage().getFormattedMessage();
          } finally {
            // Restore the default so later tests see clean static state.
            numExpectedLogMessages = 1;
          }
        }
      };
| |
  // Builds the colocation chain one generation at a time, pausing before each child
  // until the expected cumulative number of missing-region warnings has been logged,
  // then verifies the logging stops once the full hierarchy exists. Returns the
  // first warning's formatted text.
  private SerializableCallable createPRsSequencedChildrenCreationThread =
      new SerializableCallable("createPRsSequencedChildrenCreation") {

        @Override
        public Object call() throws Exception {

          try (MockAppender mockAppender = new MockAppender(ColocationLogger.class)) {
            createPR(getPartitionedRegionName(), true);
            // Delay creation of child generation regions to see missing colocated region log
            // message
            // parent region is generation 1, child region is generation 2, grandchild is 3, etc.
            for (int generation = 2; generation < (numChildPRGenerations + 2); ++generation) {
              String childPRName = "region" + generation;
              String colocatedWithRegionName =
                  generation == 2 ? getPartitionedRegionName() : "region" + (generation - 1);

              // delay between starting generations of child regions until the expected missing
              // colocation messages are logged.
              // n is the triangular total 1+2+...+(generation-1), kept effectively final so
              // the lambda below may capture it.
              int n = (generation - 1) * generation / 2;
              await().untilAsserted(() -> assertEquals(n, mockAppender.getLogs().size()));

              // Start the child region
              createPR(childPRName, colocatedWithRegionName, true);
            }
            assertEquals(numExpectedLogMessages, mockAppender.getLogs().size());

            // Another delay before exiting the thread to make sure that missing region logging
            // doesn't continue after all regions are created (delay > logInterval)
            verify(mockAppender.getMock(), atLeastOnce()).getName();
            verify(mockAppender.getMock(), atLeastOnce()).isStarted();
            Thread.sleep(ColocationLogger.getLogInterval() + 10000);
            verifyNoMoreInteractions(mockAppender.getMock());
            return mockAppender.getLogs().get(0).getMessage().getFormattedMessage();
          } finally {
            // Restore the default so later tests see clean static state.
            numExpectedLogMessages = 1;
          }
        }
      };
| |
| private SerializableCallable createMultipleColocatedChildPRsWithSequencedStart = |
| new SerializableCallable("createPRsMultipleSequencedChildrenCreation") { |
| |
| @Override |
| public Object call() throws Exception { |
| |
| |
| try (MockAppender mockAppender = new MockAppender(ColocationLogger.class)) { |
| createPR(getPartitionedRegionName(), true); |
| // Delay creation of child generation regions to see missing colocated region log |
| // message |
| for (int regionNum = 2; regionNum < (numChildPRs + 2); ++regionNum) { |
| String childPRName = "region" + regionNum; |
| |
| // delay between starting generations of child regions until the expected missing |
| // colocation messages are logged |
| int n = regionNum - 1; |
| await().untilAsserted(() -> assertEquals(n, mockAppender.getLogs().size())); |
| int numLogEvents = mockAppender.getLogs().size(); |
| assertEquals("Expected warning messages to be logged.", regionNum - 1, numLogEvents); |
| |
| // Start the child region |
| createPR(childPRName, getPartitionedRegionName(), true); |
| } |
| String logMsg = ""; |
| assertEquals(String.format("Expected warning messages to be logged."), |
| numExpectedLogMessages, mockAppender.getLogs().size()); |
| logMsg = mockAppender.getLogs().get(0).getMessage().getFormattedMessage(); |
| |
| // Another delay before exiting the thread to make sure that missing region logging |
| // doesn't continue after all regions are created (delay > logInterval) |
| verify(mockAppender.getMock(), atLeastOnce()).getName(); |
| verify(mockAppender.getMock(), atLeastOnce()).isStarted(); |
| Thread.sleep(ColocationLogger.getLogInterval() * 2); |
| verifyNoMoreInteractions(mockAppender.getMock()); |
| return logMsg; |
| } finally { |
| |
| numExpectedLogMessages = 1; |
| } |
| } |
| }; |
| |
| private class ColocationLoggerIntervalSetter extends SerializableRunnable { |
| private int logInterval; |
| |
| ColocationLoggerIntervalSetter(int newInterval) { |
| this.logInterval = newInterval; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| ColocationLogger.testhookSetLogInterval(logInterval); |
| } |
| } |
| |
| private class ColocationLoggerIntervalResetter extends SerializableRunnable { |
| private int logInterval; |
| |
| @Override |
| public void run() throws Exception { |
| ColocationLogger.testhookResetLogInterval(); |
| } |
| } |
| |
| private class ExpectedNumLogMessageSetter extends SerializableRunnable { |
| private int numMsgs; |
| |
| ExpectedNumLogMessageSetter(int num) { |
| this.numMsgs = num; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| numExpectedLogMessages = numMsgs; |
| } |
| } |
| |
| private class ExpectedNumLogMessageResetter extends SerializableRunnable { |
| private int numMsgs; |
| |
| ExpectedNumLogMessageResetter() { |
| this.numMsgs = DEFAULT_NUM_EXPECTED_LOG_MESSAGES; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| numExpectedLogMessages = numMsgs; |
| } |
| } |
| |
| private class NumChildPRsSetter extends SerializableRunnable { |
| private int numChildren; |
| |
| NumChildPRsSetter(int num) { |
| this.numChildren = num; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| numChildPRs = numChildren; |
| } |
| } |
| |
| private class NumChildPRGenerationsSetter extends SerializableRunnable { |
| private int numGenerations; |
| |
| NumChildPRGenerationsSetter(int num) { |
| this.numGenerations = num; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| numChildPRGenerations = numGenerations; |
| } |
| } |
| |
| private class DelayForChildCreationSetter extends SerializableRunnable { |
| private int delay; |
| |
| DelayForChildCreationSetter(int millis) { |
| this.delay = millis; |
| } |
| |
| @Override |
| public void run() throws Exception { |
| delayForChildCreation = delay; |
| } |
| } |
| |
  /**
   * Testing that missing colocated persistent PRs are logged as warning:
   * after persisting a parent/child pair and closing the caches, recreating
   * ONLY the child must fail immediately with IllegalStateException.
   */
  @Test
  public void testMissingColocatedParentPR() throws Throwable {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    vm0.invoke(createPRsColocatedPairThread);
    vm1.invoke(createPRsColocatedPairThread);

    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");

    // Colocated regions must have identical bucket placement on each member.
    Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName());
    assertFalse(vm0Buckets.isEmpty());
    assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
    Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1Buckets, getBucketList(vm1, "region2"));

    closeCache(vm0);
    closeCache(vm1);

    // The following should fail immediately with ISE on vm0, it's not necessary to also try the
    // operation on vm1.
    Object remoteException = null;
    remoteException = vm0.invoke(createPRsMissingParentRegionThread);

    assertEquals("Expected IllegalState Exception for missing colocated parent region",
        IllegalStateException.class, remoteException.getClass());
    assertTrue("Expected IllegalState Exception for missing colocated parent region",
        remoteException.toString()
            .matches("java.lang.IllegalStateException: Region specified in 'colocated-with'.*"));
  }
| |
| /** |
| * Testing that parent colocated persistent PRs only missing on local member throws exception |
| */ |
| @Test |
| public void testMissingColocatedParentPRWherePRConfigExists() throws Throwable { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(createPRsColocatedPairThread); |
| vm1.invoke(createPRsColocatedPairThread); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertFalse(vm0Buckets.isEmpty()); |
| assertEquals(vm0Buckets, getBucketList(vm0, "region2")); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertEquals(vm1Buckets, getBucketList(vm1, "region2")); |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| AsyncInvocation async0 = null; |
| AsyncInvocation async1a = null; |
| AsyncInvocation async1b = null; |
| try { |
| async0 = vm0.invokeAsync(createPRsColocatedPairThread); |
| |
| Object logMsg = ""; |
| Object remoteException = null; |
| async1a = vm1.invokeAsync(delayedCreatePRsMissingParentRegionThread); |
| remoteException = async1a.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| |
| assertEquals("Expected IllegalState Exception for missing colocated parent region", |
| IllegalStateException.class, remoteException.getClass()); |
| assertTrue("Expected IllegalState Exception for missing colocated parent region", |
| remoteException.toString() |
| .matches("java.lang.IllegalStateException: Region specified in 'colocated-with'.*")); |
| } finally { |
| // The real test is done now (either passing or failing) but there's some cleanup in this test |
| // that needs to be done. |
| // |
| // The vm0 invokeAsync thread is still alive after the expected exception on vm1. Cleanup by |
| // first re-creating both regions |
| // on vm1, vm0 thread should now complete. Then wait (i.e. join() on the thread) for the new |
| // vm1 thread and the vm0 thread to |
| // verify they terminated without timing out, and close the caches. |
| async1b = vm1.invokeAsync(createPRsColocatedPairThread); |
| async1b.join(MAX_WAIT); |
| async0.join(MAX_WAIT); |
| closeCache(vm1); |
| closeCache(vm0); |
| } |
| } |
| |
  /**
   * Testing that missing colocated child persistent PRs are logged as warning
   * when the child is created late: one warning is expected before the delayed
   * child creation, and none afterwards.
   */
  @Test
  public void testMissingColocatedChildPRDueToDelayedStart() throws Throwable {
    int loggerTestInterval = 4000; // millis
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    vm0.invoke(createPRsColocatedPairThread);
    vm1.invoke(createPRsColocatedPairThread);

    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");

    // Colocated regions must have identical bucket placement on each member.
    Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName());
    assertFalse(vm0Buckets.isEmpty());
    assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
    Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1Buckets, getBucketList(vm1, "region2"));

    closeCache(vm0);
    closeCache(vm1);

    // Shorten the warning interval so the test completes quickly; expect exactly
    // one warning on each VM before the child appears.
    vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval));
    vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval));
    vm0.invoke(new ExpectedNumLogMessageSetter(1));
    vm1.invoke(new ExpectedNumLogMessageSetter(1));

    Object logMsg = "";
    AsyncInvocation async0 = vm0.invokeAsync(createPRsMissingChildRegionDelayedStartThread);
    AsyncInvocation async1 = vm1.invokeAsync(createPRsMissingChildRegionDelayedStartThread);
    logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS);
    async0.get(MAX_WAIT, TimeUnit.MILLISECONDS);
    vm0.invoke(new ExpectedNumLogMessageResetter());
    vm1.invoke(new ExpectedNumLogMessageResetter());
    vm0.invoke(new ColocationLoggerIntervalResetter());
    vm1.invoke(new ColocationLoggerIntervalResetter());

    assertTrue(
        "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"",
        logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG));
  }
| |
  /**
   * Testing that missing colocated child persistent PRs are logged as warning
   * when the child is never recreated: two warnings (two logger intervals) are
   * expected on each VM.
   */
  @Test
  public void testMissingColocatedChildPR() throws Throwable {
    int loggerTestInterval = 4000; // millis
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    vm0.invoke(createPRsColocatedPairThread);
    vm1.invoke(createPRsColocatedPairThread);

    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");

    // Colocated regions must have identical bucket placement on each member.
    Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName());
    assertFalse(vm0Buckets.isEmpty());
    assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
    Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1Buckets, getBucketList(vm1, "region2"));

    closeCache(vm0);
    closeCache(vm1);

    // Shorten the warning interval so the test completes quickly; expect two
    // warnings per VM since the child region is never recreated.
    vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval));
    vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval));
    vm0.invoke(new ExpectedNumLogMessageSetter(2));
    vm1.invoke(new ExpectedNumLogMessageSetter(2));

    Object logMsg = "";
    AsyncInvocation async0 = vm0.invokeAsync(createPRsMissingChildRegionThread);
    AsyncInvocation async1 = vm1.invokeAsync(createPRsMissingChildRegionThread);
    logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS);
    async0.get(MAX_WAIT, TimeUnit.MILLISECONDS);
    vm0.invoke(new ExpectedNumLogMessageResetter());
    vm1.invoke(new ExpectedNumLogMessageResetter());
    vm0.invoke(new ColocationLoggerIntervalResetter());
    vm1.invoke(new ColocationLoggerIntervalResetter());

    assertTrue(
        "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"",
        logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG));
  }
| |
| /** |
| * Test that when there is more than one missing colocated child persistent PRs for a region all |
| * missing regions are logged in the warning. |
| */ |
| @Test |
| public void testMultipleColocatedChildPRsMissing() throws Throwable { |
| int loggerTestInterval = 4000; // millis |
| int numChildPRs = 2; |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(new NumChildPRsSetter(numChildPRs)); |
| vm1.invoke(new NumChildPRsSetter(numChildPRs)); |
| vm0.invoke(createMultipleColocatedChildPRs); |
| vm1.invoke(createMultipleColocatedChildPRs); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertFalse(vm0Buckets.isEmpty()); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertFalse(vm1Buckets.isEmpty()); |
| for (int i = 2; i < numChildPRs + 2; ++i) { |
| String childName = "region" + i; |
| assertEquals(vm0Buckets, getBucketList(vm0, childName)); |
| assertEquals(vm1Buckets, getBucketList(vm1, childName)); |
| } |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm0.invoke(new ExpectedNumLogMessageSetter(2)); |
| vm1.invoke(new ExpectedNumLogMessageSetter(2)); |
| |
| Object logMsg = ""; |
| AsyncInvocation async0 = vm0.invokeAsync(createPRsMissingChildRegionThread); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRsMissingChildRegionThread); |
| logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| async0.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| vm0.invoke(new ExpectedNumLogMessageResetter()); |
| vm1.invoke(new ExpectedNumLogMessageResetter()); |
| vm0.invoke(new ColocationLoggerIntervalResetter()); |
| vm1.invoke(new ColocationLoggerIntervalResetter()); |
| |
| assertTrue( |
| "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"", |
| logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG)); |
| } |
| |
| /** |
| * Test that when there is more than one missing colocated child persistent PRs for a region all |
| * missing regions are logged in the warning. Verifies that as regions are created they no longer |
| * appear in the warning. |
| */ |
| @Test |
| public void testMultipleColocatedChildPRsMissingWithSequencedStart() throws Throwable { |
| int loggerTestInterval = 4000; // millis |
| int numChildPRs = 2; |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(new NumChildPRsSetter(numChildPRs)); |
| vm1.invoke(new NumChildPRsSetter(numChildPRs)); |
| vm0.invoke(createMultipleColocatedChildPRs); |
| vm1.invoke(createMultipleColocatedChildPRs); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertFalse(vm0Buckets.isEmpty()); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertFalse(vm1Buckets.isEmpty()); |
| for (int i = 2; i < numChildPRs + 2; ++i) { |
| String childName = "region" + i; |
| assertEquals(vm0Buckets, getBucketList(vm0, childName)); |
| assertEquals(vm1Buckets, getBucketList(vm1, childName)); |
| } |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm0.invoke(new ExpectedNumLogMessageSetter(2)); |
| vm1.invoke(new ExpectedNumLogMessageSetter(2)); |
| vm0.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| vm1.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| |
| Object logMsg = ""; |
| AsyncInvocation async0 = vm0.invokeAsync(createMultipleColocatedChildPRsWithSequencedStart); |
| AsyncInvocation async1 = vm1.invokeAsync(createMultipleColocatedChildPRsWithSequencedStart); |
| logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| async0.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| vm0.invoke(new ExpectedNumLogMessageResetter()); |
| vm1.invoke(new ExpectedNumLogMessageResetter()); |
| vm0.invoke(new ColocationLoggerIntervalResetter()); |
| vm1.invoke(new ColocationLoggerIntervalResetter()); |
| |
| assertTrue( |
| "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"", |
| logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG)); |
| } |
| |
| /** |
| * Testing that all missing persistent PRs in a colocation hierarchy are logged as warnings |
| */ |
| @Test |
| public void testHierarchyOfColocatedChildPRsMissing() throws Throwable { |
| int loggerTestInterval = 4000; // millis |
| int numChildGenerations = 2; |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(new NumChildPRGenerationsSetter(numChildGenerations)); |
| vm1.invoke(new NumChildPRGenerationsSetter(numChildGenerations)); |
| vm0.invoke(createPRColocationHierarchy); |
| vm1.invoke(createPRColocationHierarchy); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "region3"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertFalse(vm0Buckets.isEmpty()); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertFalse(vm1Buckets.isEmpty()); |
| for (int i = 2; i < numChildGenerations + 2; ++i) { |
| String childName = "region" + i; |
| assertEquals(vm0Buckets, getBucketList(vm0, childName)); |
| assertEquals(vm1Buckets, getBucketList(vm1, childName)); |
| } |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| // Expected warning logs only on the child region, because without the child there's nothing |
| // known about the remaining hierarchy |
| vm0.invoke(new ExpectedNumLogMessageSetter(numChildGenerations)); |
| vm1.invoke(new ExpectedNumLogMessageSetter(numChildGenerations)); |
| |
| Object logMsg = ""; |
| AsyncInvocation async0 = vm0.invokeAsync(createPRsMissingChildRegionThread); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRsMissingChildRegionThread); |
| logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| async0.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| vm0.invoke(new ExpectedNumLogMessageResetter()); |
| vm1.invoke(new ExpectedNumLogMessageResetter()); |
| vm0.invoke(new ColocationLoggerIntervalResetter()); |
| vm1.invoke(new ColocationLoggerIntervalResetter()); |
| |
| assertTrue( |
| "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"", |
| logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG)); |
| } |
| |
| /** |
| * Testing that all missing persistent PRs in a colocation hierarchy are logged as warnings |
| */ |
| @Test |
| public void testHierarchyOfColocatedChildPRsMissingGrandchild() throws Throwable { |
| int loggerTestInterval = 4000; // millis |
| int numChildGenerations = 3; |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(new NumChildPRGenerationsSetter(numChildGenerations)); |
| vm1.invoke(new NumChildPRGenerationsSetter(numChildGenerations)); |
| vm0.invoke(createPRColocationHierarchy); |
| vm1.invoke(createPRColocationHierarchy); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "region3"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertFalse(vm0Buckets.isEmpty()); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertFalse(vm1Buckets.isEmpty()); |
| for (int i = 2; i < numChildGenerations + 2; ++i) { |
| String childName = "region" + i; |
| assertEquals(vm0Buckets, getBucketList(vm0, childName)); |
| assertEquals(vm1Buckets, getBucketList(vm1, childName)); |
| } |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| // Expected warning logs only on the child region, because without the child there's nothing |
| // known about the remaining hierarchy |
| vm0.invoke( |
| new ExpectedNumLogMessageSetter(numChildGenerations * (numChildGenerations + 1) / 2)); |
| vm1.invoke( |
| new ExpectedNumLogMessageSetter(numChildGenerations * (numChildGenerations + 1) / 2)); |
| vm0.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| vm1.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| |
| Object logMsg = ""; |
| AsyncInvocation async0 = vm0.invokeAsync(createPRsSequencedChildrenCreationThread); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRsSequencedChildrenCreationThread); |
| logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| async0.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| vm0.invoke(new ExpectedNumLogMessageResetter()); |
| vm1.invoke(new ExpectedNumLogMessageResetter()); |
| vm0.invoke(new ColocationLoggerIntervalResetter()); |
| vm1.invoke(new ColocationLoggerIntervalResetter()); |
| |
| assertTrue( |
| "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"", |
| logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG)); |
| } |
| |
| private SerializableRunnable createPRColocationTree = |
| new SerializableRunnable("create PR colocation hierarchy") { |
| @Override |
| public void run() throws Exception { |
| createPR("Parent", true); |
| createPR("Gen1_C1", "Parent", true); |
| createPR("Gen1_C2", "Parent", true); |
| createPR("Gen2_C1_1", "Gen1_C1", true); |
| createPR("Gen2_C1_2", "Gen1_C1", true); |
| createPR("Gen2_C2_1", "Gen1_C2", true); |
| createPR("Gen2_C2_2", "Gen1_C2", true); |
| } |
| }; |
| |
| /** |
| * The colocation tree has the regions started in a specific order so that the logging is |
| * predictable. For each entry in the list, the array values are: |
| * |
| * <pre> |
| * |
| * [0] - the region name |
| * [1] - the name of that region's parent |
| * [2] - the number of warnings that will be logged after the region is created (1 warning for |
| * each region in the tree that exists that still has 1 or more missing children.) |
| * </pre> |
| */ |
| private static final List<Object[]> CHILD_REGION_RESTART_ORDER = new ArrayList<Object[]>(); |
| static { |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen1_C1", "Parent", 2}); |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen2_C1_1", "Gen1_C1", 2}); |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen1_C2", "Parent", 3}); |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen2_C1_2", "Gen1_C1", 2}); |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen2_C2_1", "Gen1_C2", 2}); |
| CHILD_REGION_RESTART_ORDER.add(new Object[] {"Gen2_C2_2", "Gen1_C2", 0}); |
| } |
| |
| /** |
| * This thread starts up multiple colocated child regions in the sequence defined by |
| * {@link #CHILD_REGION_RESTART_ORDER}. The complete startup sequence, which includes timed |
| * periods waiting for log messages, takes at least 28 secs. Tests waiting for this |
| * {@link SerializableCallable} to complete must have sufficient overhead in the wait for runtime |
| * variations that exceed the minimum time to complete. |
| */ |
| private SerializableCallable createPRsSequencedColocationTreeCreationThread = |
| new SerializableCallable("createPRsSequencedColocationTreeCreation") { |
| Appender mockAppender; |
| ArgumentCaptor<LogEvent> loggingEventCaptor; |
| |
| @Override |
| public Object call() throws Exception { |
| // Setup for capturing logger messages |
| mockAppender = mock(Appender.class); |
| when(mockAppender.getName()).thenReturn("MockAppender"); |
| when(mockAppender.isStarted()).thenReturn(true); |
| when(mockAppender.isStopped()).thenReturn(false); |
| Logger logger = (Logger) LogManager.getLogger(ColocationLogger.class); |
| logger.addAppender(mockAppender); |
| logger.setLevel(Level.WARN); |
| loggingEventCaptor = ArgumentCaptor.forClass(LogEvent.class); |
| |
| // Logger interval may have been hooked by the test, so adjust test delays here |
| int logInterval = ColocationLogger.getLogInterval(); |
| List<LogEvent> logEvents = Collections.emptyList(); |
| int nExpectedLogs = 1; |
| |
| createPR("Parent", true); |
| // Delay creation of descendant regions in the hierarchy to see missing colocated region |
| // log messages (logInterval/2 < delay < logInterval) |
| for (Object[] regionInfo : CHILD_REGION_RESTART_ORDER) { |
| loggingEventCaptor = ArgumentCaptor.forClass(LogEvent.class); |
| String childPRName = (String) regionInfo[0]; |
| String colocatedWithRegionName = (String) regionInfo[1]; |
| |
| // delay between starting generations of child regions and verify expected logging |
| await().untilAsserted(() -> { |
| verify(mockAppender, times(nExpectedLogs)).append(loggingEventCaptor.capture()); |
| }); |
| |
| // Finally start the next child region |
| createPR(childPRName, colocatedWithRegionName, true); |
| } |
| String logMsg; |
| logEvents = loggingEventCaptor.getAllValues(); |
| assertEquals(String.format("Expected warning messages to be logged."), nExpectedLogs, |
| logEvents.size()); |
| logMsg = logEvents.get(0).getMessage().getFormattedMessage(); |
| |
| // acknowledge interactions with the mock that have occurred |
| verify(mockAppender, atLeastOnce()).getName(); |
| verify(mockAppender, atLeastOnce()).isStarted(); |
| try { |
| // Another delay before exiting the thread to make sure that missing region logging |
| // doesn't continue after all regions are created (delay > logInterval) |
| verify(mockAppender, atLeastOnce()).append(any(LogEvent.class)); |
| Thread.sleep(logInterval * 2); |
| verifyNoMoreInteractions(mockAppender); |
| } finally { |
| logger.removeAppender(mockAppender); |
| } |
| |
| numExpectedLogMessages = 1; |
| mockAppender = null; |
| return logMsg; |
| } |
| }; |
| |
| /** |
| * Testing that all missing persistent PRs in a colocation tree hierarchy are logged as warnings. |
| * This test is a combines the "multiple children" and "hierarchy of children" tests. This is the |
| * colocation tree for this test |
| * |
| * <pre> |
| * Parent |
| * / \ |
| * / \ |
| * Gen1_C1 Gen1_C2 |
| * / \ / \ |
| * Gen2_C1_1 Gen2_C1_2 Gen2_C2_1 Gen2_C2_2 |
| * </pre> |
| */ |
| @Test |
| public void testFullTreeOfColocatedChildPRsWithMissingRegions() throws Throwable { |
| int loggerTestInterval = 4000; // millis |
| int numChildPRs = 2; |
| int numChildGenerations = 2; |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| vm0.invoke(createPRColocationTree); |
| vm1.invoke(createPRColocationTree); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a", "Parent"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "Gen1_C1"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "Gen1_C2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "Gen2_C1_1"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "Gen2_C1_2"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "Gen2_C2_1"); |
| createData(vm0, 0, NUM_BUCKETS, "c", "Gen2_C2_2"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, "Parent"); |
| assertFalse(vm0Buckets.isEmpty()); |
| Set<Integer> vm1Buckets = getBucketList(vm1, "Parent"); |
| assertFalse(vm1Buckets.isEmpty()); |
| for (String region : new String[] {"Gen1_C1", "Gen1_C2", "Gen2_C1_1", "Gen2_C1_2", "Gen2_C2_1", |
| "Gen2_C2_2"}) { |
| assertEquals(vm0Buckets, getBucketList(vm0, region)); |
| assertEquals(vm1Buckets, getBucketList(vm1, region)); |
| } |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| |
| vm0.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm1.invoke(new ColocationLoggerIntervalSetter(loggerTestInterval)); |
| vm0.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| vm1.invoke(new DelayForChildCreationSetter((int) (loggerTestInterval))); |
| |
| Object logMsg = ""; |
| AsyncInvocation async0 = vm0.invokeAsync(createPRsSequencedColocationTreeCreationThread); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRsSequencedColocationTreeCreationThread); |
| logMsg = async1.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| async0.get(MAX_WAIT, TimeUnit.MILLISECONDS); |
| vm0.invoke(new ColocationLoggerIntervalResetter()); |
| vm1.invoke(new ColocationLoggerIntervalResetter()); |
| |
| // Expected warning logs only on the child region, because without the child there's nothing |
| // known about the remaining hierarchy |
| assertTrue( |
| "Expected missing colocated region warning on remote. Got message \"" + logMsg + "\"", |
| logMsg.toString().matches(PATTERN_FOR_MISSING_CHILD_LOG)); |
| } |
| |
| /** |
| * Testing what happens we we recreate colocated persistent PRs by creating one PR everywhere and |
| * then the other PR everywhere. |
| */ |
| @Test |
| public void testColocatedPRsRecoveryOnePRAtATime() throws Throwable { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } |
| }; |
| SerializableRunnable createChildPR = getCreateChildPRRunnable(); |
| vm0.invoke(createParentPR); |
| vm1.invoke(createParentPR); |
| vm2.invoke(createParentPR); |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| vm2.invoke(createChildPR); |
| |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| |
| Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName()); |
| assertEquals(vm0Buckets, getBucketList(vm0, "region2")); |
| Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName()); |
| assertEquals(vm1Buckets, getBucketList(vm1, "region2")); |
| Set<Integer> vm2Buckets = getBucketList(vm2, getPartitionedRegionName()); |
| assertEquals(vm2Buckets, getBucketList(vm2, "region2")); |
| |
| Set<Integer> vm0PrimaryBuckets = getPrimaryBucketList(vm0, getPartitionedRegionName()); |
| assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2")); |
| Set<Integer> vm1PrimaryBuckets = getPrimaryBucketList(vm1, getPartitionedRegionName()); |
| assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2")); |
| Set<Integer> vm2PrimaryBuckets = getPrimaryBucketList(vm2, getPartitionedRegionName()); |
| assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2")); |
| |
| closeCache(vm0); |
| closeCache(vm1); |
| closeCache(vm2); |
| |
| AsyncInvocation async0 = vm0.invokeAsync(createParentPR); |
| AsyncInvocation async1 = vm1.invokeAsync(createParentPR); |
| AsyncInvocation async2 = vm2.invokeAsync(createParentPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| vm2.invoke(createChildPR); |
| |
| Wait.pause(4000); |
| |
| assertEquals(vm0Buckets, getBucketList(vm0, getPartitionedRegionName())); |
| assertEquals(vm0Buckets, getBucketList(vm0, "region2")); |
| assertEquals(vm1Buckets, getBucketList(vm1, getPartitionedRegionName())); |
| assertEquals(vm1Buckets, getBucketList(vm1, "region2")); |
| assertEquals(vm2Buckets, getBucketList(vm2, getPartitionedRegionName())); |
| assertEquals(vm2Buckets, getBucketList(vm2, "region2")); |
| |
| // primary can differ |
| vm0PrimaryBuckets = getPrimaryBucketList(vm0, getPartitionedRegionName()); |
| assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2")); |
| vm1PrimaryBuckets = getPrimaryBucketList(vm1, getPartitionedRegionName()); |
| assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2")); |
| vm2PrimaryBuckets = getPrimaryBucketList(vm2, getPartitionedRegionName()); |
| assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2")); |
| |
| |
| checkData(vm0, 0, NUM_BUCKETS, "a"); |
| |
| // region 2 didn't have persistent data, so it nothing should be recovered |
| checkData(vm0, 0, NUM_BUCKETS, null, "region2"); |
| |
| // Make sure can do a put in all of the buckets in vm2 |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| // Now all of those buckets should exist |
| checkData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| // Now all the buckets should be restored in the appropriate places. |
| assertEquals(vm0Buckets, getBucketList(vm0, "region2")); |
| assertEquals(vm1Buckets, getBucketList(vm1, "region2")); |
| assertEquals(vm2Buckets, getBucketList(vm2, "region2")); |
| } |
| |
| private SerializableRunnable getCreateChildPRRunnable() { |
| return new SerializableRunnable("createChildPR") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| final CountDownLatch recoveryDone = new CountDownLatch(1); |
| ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() { |
| @Override |
| public void recoveryFinished(Region region) { |
| if (region.getName().equals("region2")) { |
| recoveryDone.countDown(); |
| } |
| } |
| }; |
| InternalResourceManager.setResourceObserver(observer); |
| |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| |
| try { |
| recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| Assert.fail("interrupted", e); |
| } |
| } |
| }; |
| } |
| |
  /**
   * Test colocated-PR recovery when one member (vm2) is restarted later than the others.
   * Verifies that on restart vm2 creates no parent buckets until its child PR is also created,
   * and that the original bucket layout is then restored.
   */
  @Test
  public void testColocatedPRsRecoveryOneMemberLater() throws Throwable {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);

    // Creates the persistent parent PR (redundancy 1) backed by the "disk" disk store
    SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
      @Override
      public void run() {
        Cache cache = getCache();

        DiskStore ds = cache.findDiskStore("disk");
        if (ds == null) {
          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
        }
        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(1);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        cache.createRegion(getPartitionedRegionName(), af.create());
      }
    };
    SerializableRunnable createChildPR = getCreateChildPRRunnable();

    vm0.invoke(createParentPR);
    vm1.invoke(createParentPR);
    vm2.invoke(createParentPR);
    vm0.invoke(createChildPR);
    vm1.invoke(createChildPR);
    vm2.invoke(createChildPR);

    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");

    // Record the bucket layout so we can verify it is recreated identically after restart
    Set<Integer> vm0Buckets = getBucketList(vm0, getPartitionedRegionName());
    assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
    Set<Integer> vm1Buckets = getBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
    Set<Integer> vm2Buckets = getBucketList(vm2, getPartitionedRegionName());
    assertEquals(vm2Buckets, getBucketList(vm2, "region2"));

    // Primaries of the child must be colocated with primaries of the parent
    Set<Integer> vm0PrimaryBuckets = getPrimaryBucketList(vm0, getPartitionedRegionName());
    assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
    Set<Integer> vm1PrimaryBuckets = getPrimaryBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
    Set<Integer> vm2PrimaryBuckets = getPrimaryBucketList(vm2, getPartitionedRegionName());
    assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));

    closeCache(vm2);
    // Make sure the other members notice that vm2 has gone
    // TODO use a callback for this.
    Thread.sleep(4000);

    closeCache(vm0);
    closeCache(vm1);

    // Create the members, but don't initialize
    // VM2 yet
    AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
    AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
    async0.getResult(MAX_WAIT);
    async1.getResult(MAX_WAIT);

    vm0.invoke(createChildPR);
    vm1.invoke(createChildPR);
    waitForBucketRecovery(vm0, vm0Buckets);
    waitForBucketRecovery(vm1, vm1Buckets);

    checkData(vm0, 0, NUM_BUCKETS, "a");

    // region 2 didn't have persistent data, so nothing should be recovered
    checkData(vm0, 0, NUM_BUCKETS, null, "region2");

    // Make sure can do a put in all of the buckets in vm2
    createData(vm0, 0, NUM_BUCKETS, "c", "region2");

    // Now all of those buckets should exist
    checkData(vm0, 0, NUM_BUCKETS, "c", "region2");

    // Now we initialize vm2.
    vm2.invoke(createParentPR);
    // Make sure vm2 hasn't created any buckets in the parent PR yet
    // We don't want any buckets until the child PR is created
    assertEquals(Collections.emptySet(), getBucketList(vm2, getPartitionedRegionName()));
    vm2.invoke(createChildPR);

    // Now vm2 should have created all of the appropriate buckets.
    assertEquals(vm2Buckets, getBucketList(vm2, getPartitionedRegionName()));
    assertEquals(vm2Buckets, getBucketList(vm2, "region2"));

    // Primaries of the child must still be colocated with primaries of the parent
    vm0PrimaryBuckets = getPrimaryBucketList(vm0, getPartitionedRegionName());
    assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
    vm1PrimaryBuckets = getPrimaryBucketList(vm1, getPartitionedRegionName());
    assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
    vm2PrimaryBuckets = getPrimaryBucketList(vm2, getPartitionedRegionName());
    assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));
  }
| |
  /**
   * Test replacing an offline member and restarting when the parent PR and its colocated child
   * PR are both persistent and share a single disk store. Delegates the restart scenario to
   * {@link #replaceOfflineMemberAndRestart(SerializableRunnable)}.
   */
  @Test
  public void testReplaceOfflineMemberAndRestart() throws Throwable {
    // Creates the persistent parent PR and the persistent colocated child "region2"
    // (both with redundancy 1 and recovery delay 0), then waits for redundancy recovery
    // of both regions before returning.
    SerializableRunnable createPRs = new SerializableRunnable("region1") {
      @Override
      public void run() {
        Cache cache = getCache();

        DiskStore ds = cache.findDiskStore("disk");
        if (ds == null) {
          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
        }

        // Counted down once per region when its recovery finishes (2 regions)
        final CountDownLatch recoveryDone = new CountDownLatch(2);
        ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
          @Override
          public void recoveryFinished(Region region) {
            recoveryDone.countDown();
          }
        };
        InternalResourceManager.setResourceObserver(observer);

        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(1);
        paf.setRecoveryDelay(0);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        cache.createRegion(getPartitionedRegionName(), af.create());

        // The child reuses the same attributes factory, so it is also persistent
        // and stored in the same "disk" disk store
        paf.setColocatedWith(getPartitionedRegionName());
        af.setPartitionAttributes(paf.create());
        cache.createRegion("region2", af.create());

        try {
          if (!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
            fail("timed out");
          }
        } catch (InterruptedException e) {
          Assert.fail("interrupted", e);
        }
      }
    };

    replaceOfflineMemberAndRestart(createPRs);
  }
| |
| /** |
| * Test that if we replace an offline member, even if colocated regions are in different disk |
| * stores, we still keep our metadata consistent. |
| * |
| */ |
| @Test |
| public void testReplaceOfflineMemberAndRestartTwoDiskStores() throws Throwable { |
| SerializableRunnable createPRs = new SerializableRunnable("region1") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| |
| final CountDownLatch recoveryDone = new CountDownLatch(2); |
| ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() { |
| @Override |
| public void recoveryFinished(Region region) { |
| recoveryDone.countDown(); |
| } |
| }; |
| InternalResourceManager.setResourceObserver(observer); |
| |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| |
| DiskStore ds2 = cache.findDiskStore("disk2"); |
| if (ds2 == null) { |
| ds2 = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk2"); |
| } |
| |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setPartitionAttributes(paf.create()); |
| af.setDiskStoreName("disk2"); |
| cache.createRegion("region2", af.create()); |
| |
| try { |
| if (!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) { |
| fail("timed out"); |
| } |
| } catch (InterruptedException e) { |
| Assert.fail("interrupted", e); |
| } |
| } |
| }; |
| |
| replaceOfflineMemberAndRestart(createPRs); |
| } |
| |
| /** |
| * Test for support issue 7870. 1. Run three members with redundancy 1 and recovery delay 0 2. |
| * Kill one of the members, to trigger replacement of buckets 3. Shutdown all members and restart. |
| * |
| * What was happening is that in the parent PR, we discarded our offline data in one member, but |
| * in the child PR the other members ended up waiting for the child bucket to be created in the |
| * member that discarded it's offline data. |
| * |
| */ |
| public void replaceOfflineMemberAndRestart(SerializableRunnable createPRs) throws Throwable { |
| disconnectAllFromDS(); |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Create the PR on three members |
| vm0.invoke(createPRs); |
| vm1.invoke(createPRs); |
| vm2.invoke(createPRs); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close one of the members to trigger redundancy recovery. |
| closeCache(vm2); |
| |
| // Wait until redundancy is recovered. |
| waitForRedundancyRecovery(vm0, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm0, 1, "region2"); |
| |
| createData(vm0, 0, NUM_BUCKETS, "b"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| IgnoredException expected = IgnoredException.addIgnoredException("PartitionOfflineException"); |
| try { |
| |
| // Close the remaining members. |
| vm0.invoke(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| InternalDistributedSystem ds = |
| (InternalDistributedSystem) getCache().getDistributedSystem(); |
| AdminDistributedSystemImpl.shutDownAllMembers(ds.getDistributionManager(), 600000); |
| return null; |
| } |
| }); |
| |
| // Make sure that vm-1 is completely disconnected |
| // The shutdown all asynchronously finishes the disconnect after |
| // replying to the admin member. |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| basicGetSystem().disconnect(); |
| } |
| }); |
| |
| // Recreate the members. Try to make sure that |
| // the member with the latest copy of the buckets |
| // is the one that decides to throw away it's copy |
| // by starting it last. |
| AsyncInvocation async0 = vm0.invokeAsync(createPRs); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRs); |
| Wait.pause(2000); |
| AsyncInvocation async2 = vm2.invokeAsync(createPRs); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| |
| checkData(vm0, 0, NUM_BUCKETS, "b"); |
| checkData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| |
| waitForRedundancyRecovery(vm0, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm0, 1, "region2"); |
| waitForRedundancyRecovery(vm1, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm1, 1, "region2"); |
| waitForRedundancyRecovery(vm2, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm2, 1, "region2"); |
| |
| // Make sure we don't have any extra buckets after the restart |
| int totalBucketCount = getBucketList(vm0).size(); |
| totalBucketCount += getBucketList(vm1).size(); |
| totalBucketCount += getBucketList(vm2).size(); |
| |
| assertEquals(2 * NUM_BUCKETS, totalBucketCount); |
| |
| totalBucketCount = getBucketList(vm0, "region2").size(); |
| totalBucketCount += getBucketList(vm1, "region2").size(); |
| totalBucketCount += getBucketList(vm2, "region2").size(); |
| |
| assertEquals(2 * NUM_BUCKETS, totalBucketCount); |
| } finally { |
| expected.remove(); |
| } |
| } |
| |
  /**
   * Variant of the offline-member replacement scenario in which the colocated child PR is
   * created separately (late), after the parent PR, rather than by a single runnable.
   */
  @Test
  public void testReplaceOfflineMemberAndRestartCreateColocatedPRLate() throws Throwable {
    // Creates only the persistent parent PR (redundancy 1, recovery delay 0, "disk" store)
    SerializableRunnable createParentPR = new SerializableRunnable() {
      @Override
      public void run() {
        Cache cache = getCache();

        DiskStore ds = cache.findDiskStore("disk");
        if (ds == null) {
          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
        }
        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(1);
        paf.setRecoveryDelay(0);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        cache.createRegion(getPartitionedRegionName(), af.create());
      }
    };

    // Creates the persistent colocated child "region2" and waits for its recovery to finish
    SerializableRunnable createChildPR = new SerializableRunnable() {
      @Override
      public void run() {
        Cache cache = getCache();

        // Released by the resource observer once recovery of region2 completes
        final CountDownLatch recoveryDone = new CountDownLatch(1);
        ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
          @Override
          public void recoveryFinished(Region region) {
            if (region.getName().contains("region2")) {
              recoveryDone.countDown();
            }
          }
        };
        InternalResourceManager.setResourceObserver(observer);

        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(1);
        paf.setRecoveryDelay(0);
        paf.setColocatedWith(getPartitionedRegionName());
        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
        af.setDiskStoreName("disk");
        af.setPartitionAttributes(paf.create());
        cache.createRegion("region2", af.create());

        try {
          if (!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
            fail("timed out");
          }
        } catch (InterruptedException e) {
          Assert.fail("interrupted", e);
        }
      }
    };

    replaceOfflineMemberAndRestartCreateColocatedPRLate(createParentPR, createChildPR);
  }
| |
| @Test |
| public void testReplaceOfflineMemberAndRestartCreateColocatedPRLateTwoDiskStores() |
| throws Throwable { |
| SerializableRunnable createParentPR = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| final CountDownLatch recoveryDone = new CountDownLatch(1); |
| ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() { |
| @Override |
| public void recoveryFinished(Region region) { |
| if (region.getName().contains("region2")) { |
| recoveryDone.countDown(); |
| } |
| } |
| }; |
| InternalResourceManager.setResourceObserver(observer); |
| |
| DiskStore ds2 = cache.findDiskStore("disk2"); |
| if (ds2 == null) { |
| ds2 = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk2"); |
| } |
| |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk2"); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| |
| try { |
| if (!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) { |
| fail("timed out"); |
| } |
| } catch (InterruptedException e) { |
| Assert.fail("interrupted", e); |
| } |
| } |
| }; |
| |
| replaceOfflineMemberAndRestartCreateColocatedPRLate(createParentPR, createChildPR); |
| } |
| |
| /** |
| * Test for support issue 7870. 1. Run three members with redundancy 1 and recovery delay 0 2. |
| * Kill one of the members, to trigger replacement of buckets 3. Shutdown all members and restart. |
| * |
| * What was happening is that in the parent PR, we discarded our offline data in one member, but |
| * in the child PR the other members ended up waiting for the child bucket to be created in the |
| * member that discarded it's offline data. |
| * |
| * In this test case, we're creating the child PR later, after the parent buckets have already |
| * been completely created. |
| * |
| */ |
| public void replaceOfflineMemberAndRestartCreateColocatedPRLate( |
| SerializableRunnable createParentPR, SerializableRunnable createChildPR) throws Throwable { |
| IgnoredException.addIgnoredException("PartitionOfflineException"); |
| IgnoredException.addIgnoredException("RegionDestroyedException"); |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| |
| |
| // Create the PRs on three members |
| vm0.invoke(createParentPR); |
| vm1.invoke(createParentPR); |
| vm2.invoke(createParentPR); |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| vm2.invoke(createChildPR); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close one of the members to trigger redundancy recovery. |
| closeCache(vm2); |
| |
| // Wait until redundancy is recovered. |
| waitForRedundancyRecovery(vm0, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm0, 1, "region2"); |
| |
| createData(vm0, 0, NUM_BUCKETS, "b"); |
| createData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| |
| // Close the remaining members. |
| vm0.invoke(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| InternalDistributedSystem ds = |
| (InternalDistributedSystem) getCache().getDistributedSystem(); |
| AdminDistributedSystemImpl.shutDownAllMembers(ds.getDistributionManager(), 0); |
| return null; |
| } |
| }); |
| |
| // Make sure that vm-1 is completely disconnected |
| // The shutdown all asynchronously finishes the disconnect after |
| // replying to the admin member. |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| basicGetSystem().disconnect(); |
| } |
| }); |
| |
| // Recreate the parent region. Try to make sure that |
| // the member with the latest copy of the buckets |
| // is the one that decides to throw away it's copy |
| // by starting it last. |
| AsyncInvocation async2 = vm2.invokeAsync(createParentPR); |
| AsyncInvocation async1 = vm1.invokeAsync(createParentPR); |
| Wait.pause(2000); |
| AsyncInvocation async0 = vm0.invokeAsync(createParentPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| |
| // Wait for async tasks |
| Wait.pause(2000); |
| |
| // Recreate the child region. |
| async2 = vm2.invokeAsync(createChildPR); |
| async1 = vm1.invokeAsync(createChildPR); |
| async0 = vm0.invokeAsync(createChildPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| |
| // Validate the data |
| checkData(vm0, 0, NUM_BUCKETS, "b"); |
| checkData(vm0, 0, NUM_BUCKETS, "b", "region2"); |
| |
| // Make sure we can actually use the buckets in the child region. |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| waitForRedundancyRecovery(vm0, 1, getPartitionedRegionName()); |
| waitForRedundancyRecovery(vm0, 1, "region2"); |
| |
| // Make sure we don't have any extra buckets after the restart |
| int totalBucketCount = getBucketList(vm0).size(); |
| totalBucketCount += getBucketList(vm1).size(); |
| totalBucketCount += getBucketList(vm2).size(); |
| |
| assertEquals(2 * NUM_BUCKETS, totalBucketCount); |
| |
| totalBucketCount = getBucketList(vm0, "region2").size(); |
| totalBucketCount += getBucketList(vm1, "region2").size(); |
| totalBucketCount += getBucketList(vm2, "region2").size(); |
| |
| assertEquals(2 * NUM_BUCKETS, totalBucketCount); |
| } |
| |
| /** |
| * Test what happens when we crash in the middle of satisfying redundancy for a colocated bucket. |
| * |
| */ |
| // This test method is disabled because it is failing |
| // periodically and causing cruise control failures |
| // See bug #46748 |
| @Test |
| public void testCrashDuringRedundancySatisfaction() throws Throwable { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| SerializableRunnable createPRs = new SerializableRunnable("region1") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| // Workaround for 44414 - disable recovery delay so we shutdown |
| // vm1 at a predictable point. |
| paf.setRecoveryDelay(-1); |
| paf.setStartupRecoveryDelay(-1); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| } |
| }; |
| |
| // Create the PR on vm0 |
| vm0.invoke(createPRs); |
| |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| vm1.invoke(createPRs); |
| |
| // We shouldn't have created any buckets in vm1 yet. |
| assertEquals(Collections.emptySet(), getBucketList(vm1)); |
| |
| // Add an observer that will disconnect before allowing the peer to |
| // GII a colocated bucket. This should leave the peer with only the parent |
| // bucket |
| vm0.invoke(new SerializableRunnable() { |
| |
| @Override |
| public void run() { |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| @Override |
| public void beforeProcessMessage(ClusterDistributionManager dm, |
| DistributionMessage message) { |
| if (message instanceof RequestImageMessage) { |
| if (((RequestImageMessage) message).regionPath.contains("region2")) { |
| DistributionMessageObserver.setInstance(null); |
| disconnectFromDS(); |
| } |
| } |
| } |
| }); |
| } |
| }); |
| |
| IgnoredException ex = IgnoredException.addIgnoredException("PartitionOfflineException", vm1); |
| try { |
| |
| // Do a rebalance to create buckets in vm1. THis will cause vm0 to disconnect |
| // as we satisfy redundancy with vm1. |
| try { |
| RebalanceResults rr = rebalance(vm1); |
| } catch (Exception expected) { |
| // We expect to see a partition offline exception because of the |
| // disconnect |
| if (!(expected.getCause() instanceof PartitionOfflineException)) { |
| throw expected; |
| } |
| } |
| |
| |
| // Wait for vm0 to be closed by the callback |
| vm0.invoke(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| GeodeAwaitility.await().untilAsserted(new WaitCriterion() { |
| @Override |
| public boolean done() { |
| InternalDistributedSystem ds = basicGetSystem(); |
| return ds == null || !ds.isConnected(); |
| } |
| |
| @Override |
| public String description() { |
| return "DS did not disconnect"; |
| } |
| }); |
| |
| return null; |
| } |
| }); |
| |
| // close the cache in vm1 |
| SerializableCallable disconnectFromDS = new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| disconnectFromDS(); |
| return null; |
| } |
| }; |
| vm1.invoke(disconnectFromDS); |
| |
| // Make sure vm0 is disconnected. This avoids a race where we |
| // may still in the process of disconnecting even though the our async listener |
| // found the system was disconnected |
| vm0.invoke(disconnectFromDS); |
| } finally { |
| ex.remove(); |
| } |
| |
| // Create the cache and PRs on both members |
| AsyncInvocation async0 = vm0.invokeAsync(createPRs); |
| AsyncInvocation async1 = vm1.invokeAsync(createPRs); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| |
| // Make sure the data was recovered correctly |
| checkData(vm0, 0, NUM_BUCKETS, "a"); |
| // Workaround for bug 46748. |
| checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| } |
| |
| @Test |
| public void testRebalanceWithOfflineChildRegion() throws Throwable { |
| SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| } |
| }; |
| |
| rebalanceWithOfflineChildRegion(createParentPR, createChildPR); |
| |
| } |
| |
| /** |
| * Test that a rebalance will regions are in the middle of recovery doesn't cause issues. |
| * |
| * This is slightly different than {{@link #testRebalanceWithOfflineChildRegion()} because in this |
| * case all of the regions have been created, but they are in the middle of actually recovering |
| * buckets from disk. |
| */ |
| @Test |
| public void testRebalanceDuringRecovery() throws Throwable { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| SerializableRunnable createPRs = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(1); |
| paf.setRecoveryDelay(-1); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| |
| paf.setRedundantCopies(1); |
| paf.setRecoveryDelay(-1); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| } |
| }; |
| |
| |
| // Create the PRs on two members |
| vm0.invoke(createPRs); |
| vm1.invoke(createPRs); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| |
| SerializableRunnable addHook = new SerializableRunnable() { |
| @Override |
| public void run() { |
| PartitionedRegionObserverHolder.setInstance(new PRObserver()); |
| } |
| }; |
| |
| SerializableRunnable waitForHook = new SerializableRunnable() { |
| @Override |
| public void run() { |
| PRObserver observer = (PRObserver) PartitionedRegionObserverHolder.getInstance(); |
| try { |
| observer.waitForCreate(); |
| } catch (InterruptedException e) { |
| Assert.fail("interrupted", e); |
| } |
| } |
| }; |
| |
| SerializableRunnable removeHook = new SerializableRunnable() { |
| @Override |
| public void run() { |
| PRObserver observer = (PRObserver) PartitionedRegionObserverHolder.getInstance(); |
| observer.release(); |
| PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter()); |
| } |
| }; |
| |
| vm1.invoke(addHook); |
| AsyncInvocation async0; |
| AsyncInvocation async1; |
| AsyncInvocation async2; |
| RebalanceResults rebalanceResults; |
| try { |
| async0 = vm0.invokeAsync(createPRs); |
| async1 = vm1.invokeAsync(createPRs); |
| |
| vm1.invoke(waitForHook); |
| |
| // Now create the parent region on vm-2. vm-2 did not |
| // previous host the child region. |
| vm2.invoke(createPRs); |
| |
| // Try to forcibly move some buckets to vm2 (this should not succeed). |
| moveBucket(0, vm1, vm2); |
| moveBucket(1, vm1, vm2); |
| |
| } finally { |
| vm1.invoke(removeHook); |
| } |
| |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| |
| // Validate the data |
| checkData(vm0, 0, NUM_BUCKETS, "a"); |
| checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Make sure we can actually use the buckets in the child region. |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| |
| // Make sure the system is recoverable |
| // by restarting it |
| closeCache(vm2); |
| closeCache(vm1); |
| closeCache(vm0); |
| |
| async0 = vm0.invokeAsync(createPRs); |
| async1 = vm1.invokeAsync(createPRs); |
| async2 = vm2.invokeAsync(createPRs); |
| async0.getResult(); |
| async1.getResult(); |
| async2.getResult(); |
| } |
| |
| @Test |
| public void testRebalanceWithOfflineChildRegionTwoDiskStores() throws Throwable { |
| SerializableRunnable createParentPR = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| |
| DiskStore ds2 = cache.findDiskStore("disk2"); |
| if (ds2 == null) { |
| ds2 = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk2"); |
| } |
| |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk2"); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| } |
| }; |
| |
| rebalanceWithOfflineChildRegion(createParentPR, createChildPR); |
| } |
| |
| /** |
| * Test that a user is not allowed to change the colocation of a PR with persistent data. |
| * |
| */ |
| @Test |
| public void testModifyColocation() throws Throwable { |
| // Create PRs where region3 is colocated with region1. |
| createColocatedPRs("region1"); |
| |
| // Close everything |
| closeCache(); |
| |
| // Restart colocated with "region2" |
| IgnoredException ex = |
| IgnoredException.addIgnoredException("DiskAccessException|IllegalStateException"); |
| try { |
| createColocatedPRs("region2"); |
| fail("Should have received an illegal state exception"); |
| } catch (IllegalStateException expected) { |
| // do nothing |
| } finally { |
| ex.remove(); |
| } |
| |
| // Close everything |
| closeCache(); |
| |
| // Restart colocated with region1. |
| // Make sure we didn't screw anything up. |
| createColocatedPRs("/region1"); |
| |
| // Close everything |
| closeCache(); |
| |
| // Restart uncolocated. We don't allow changing |
| // from uncolocated to colocated. |
| ex = IgnoredException.addIgnoredException("DiskAccessException|IllegalStateException"); |
| try { |
| createColocatedPRs(null); |
| fail("Should have received an illegal state exception"); |
| } catch (IllegalStateException expected) { |
| // do nothing |
| } finally { |
| ex.remove(); |
| } |
| |
| // Close everything |
| closeCache(); |
| } |
| |
| @Test |
| public void testParentRegionGetWithOfflineChildRegion() throws Throwable { |
| |
| SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { |
| @Override |
| public void run() { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| try { |
| Cache cache = getCache(); |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| } |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { |
| @Override |
| public void run() throws InterruptedException { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| try { |
| Cache cache = getCache(); |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| af.setPartitionAttributes(paf.create()); |
| // delay child region creations to cause a delay in persistent recovery |
| Thread.sleep(100); |
| cache.createRegion("region2", af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| } |
| } |
| }; |
| |
| boolean caughtException = false; |
| try { |
| // Expect a get() on the un-recovered (due to offline child) parent region to fail |
| regionGetWithOfflineChild(createParentPR, createChildPR, false); |
| } catch (Exception e) { |
| caughtException = true; |
| assertTrue(e instanceof RMIException); |
| assertTrue(e.getCause() instanceof PartitionOfflineException); |
| } |
| if (!caughtException) { |
| fail("Expected TimeoutException from remote"); |
| } |
| } |
| |
| @Test |
| public void testParentRegionGetWithRecoveryInProgress() throws Throwable { |
| SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { |
| @Override |
| public void run() { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| try { |
| Cache cache = getCache(); |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| System.out.println("oldRetryTimeout = " + oldRetryTimeout); |
| } |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { |
| @Override |
| public void run() throws InterruptedException { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| try { |
| Cache cache = getCache(); |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region2", af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| } |
| } |
| }; |
| |
| boolean caughtException = false; |
| try { |
| // Expect a get() on the un-recovered (due to offline child) parent region to fail |
| regionGetWithOfflineChild(createParentPR, createChildPR, false); |
| } catch (Exception e) { |
| caughtException = true; |
| assertTrue(e instanceof RMIException); |
| assertTrue(e.getCause() instanceof PartitionOfflineException); |
| } |
| if (!caughtException) { |
| fail("Expected TimeoutException from remote"); |
| } |
| } |
| |
| @Test |
| public void testParentRegionPutWithRecoveryInProgress() throws Throwable { |
| SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { |
| @Override |
| public void run() { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| System.out.println("oldRetryTimeout = " + oldRetryTimeout); |
| try { |
| Cache cache = getCache(); |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion(getPartitionedRegionName(), af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| } |
| } |
| }; |
| |
| SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { |
| @Override |
| public void run() throws InterruptedException { |
| String oldRetryTimeout = System.setProperty( |
| DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); |
| try { |
| Cache cache = getCache(); |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| paf.setRecoveryDelay(0); |
| paf.setColocatedWith(getPartitionedRegionName()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| af.setPartitionAttributes(paf.create()); |
| Thread.sleep(1000); |
| cache.createRegion("region2", af.create()); |
| } finally { |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", |
| String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); |
| } |
| } |
| }; |
| |
| boolean caughtException = false; |
| try { |
| // Expect a get() on the un-recovered (due to offline child) parent region to fail |
| regionGetWithOfflineChild(createParentPR, createChildPR, false); |
| } catch (Exception e) { |
| caughtException = true; |
| assertTrue(e instanceof RMIException); |
| assertTrue(e.getCause() instanceof PartitionOfflineException); |
| } |
| if (!caughtException) { |
| fail("Expected TimeoutException from remote"); |
| } |
| } |
| |
| /** |
| * Create three PRs on a VM, named region1, region2, and region3. The colocated with attribute |
| * describes which region region3 should be colocated with. |
| * |
| */ |
| private void createColocatedPRs(final String colocatedWith) { |
| Cache cache = getCache(); |
| |
| DiskStore ds = cache.findDiskStore("disk"); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); |
| } |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(0); |
| af.setPartitionAttributes(paf.create()); |
| af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); |
| af.setDiskStoreName("disk"); |
| cache.createRegion("region1", af.create()); |
| |
| cache.createRegion("region2", af.create()); |
| |
| if (colocatedWith != null) { |
| paf.setColocatedWith(colocatedWith); |
| } |
| af.setPartitionAttributes(paf.create()); |
| cache.createRegion("region3", af.create()); |
| } |
| |
| /** |
| * Test for bug 43570. Rebalance a persistent parent PR before we recover the persistent child PR |
| * from disk. |
| * |
| */ |
| public void rebalanceWithOfflineChildRegion(SerializableRunnable createParentPR, |
| SerializableRunnable createChildPR) throws Throwable { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| |
| |
| // Create the PRs on two members |
| vm0.invoke(createParentPR); |
| vm1.invoke(createParentPR); |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| |
| // Recreate the parent region. Try to make sure that |
| // the member with the latest copy of the buckets |
| // is the one that decides to throw away it's copy |
| // by starting it last. |
| AsyncInvocation async0 = vm0.invokeAsync(createParentPR); |
| AsyncInvocation async1 = vm1.invokeAsync(createParentPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| |
| // Now create the parent region on vm-2. vm-2 did not |
| // previous host the child region. |
| vm2.invoke(createParentPR); |
| |
| // Rebalance the parent region. |
| // This should not move any buckets, because |
| // we haven't recovered the child region |
| RebalanceResults rebalanceResults = rebalance(vm2); |
| assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted()); |
| |
| // Recreate the child region. |
| async1 = vm1.invokeAsync(createChildPR); |
| async0 = vm0.invokeAsync(createChildPR); |
| AsyncInvocation async2 = vm2.invokeAsync(createChildPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| |
| // Validate the data |
| checkData(vm0, 0, NUM_BUCKETS, "a"); |
| checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Make sure we can actually use the buckets in the child region. |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| } |
| |
| /** |
| * Create a colocated pair of persistent regions and populate them with data. Shut down the |
| * servers and then restart them and check the data. |
| * <p> |
| * On the restart, try region operations ({@code get()}) on the parent region before or during |
| * persistent recovery. The {@code concurrentCheckData} argument determines whether the operation |
| * from the parent region occurs before or concurrent with the child region creation and recovery. |
| * |
| * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member |
| * @param createChildPR {@link SerializableRunnable} for creating the child region on one member |
| */ |
| public void regionGetWithOfflineChild(SerializableRunnable createParentPR, |
| SerializableRunnable createChildPR, boolean concurrentCheckData) throws Throwable { |
| Host host = Host.getHost(0); |
| final VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Create the PRs on two members |
| vm0.invoke(createParentPR); |
| vm1.invoke(createParentPR); |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| |
| SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| Region region = cache.getRegion(getPartitionedRegionName()); |
| |
| for (int i = 0; i < NUM_BUCKETS; i++) { |
| assertEquals("For key " + i, "a", region.get(i)); |
| } |
| } |
| }); |
| |
| try { |
| // Recreate the parent region. Try to make sure that |
| // the member with the latest copy of the buckets |
| // is the one that decides to throw away it's copy |
| // by starting it last. |
| AsyncInvocation async0 = vm0.invokeAsync(createParentPR); |
| AsyncInvocation async1 = vm1.invokeAsync(createParentPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| // Now create the parent region on vm-2. vm-2 did not |
| // previously host the child region. |
| vm2.invoke(createParentPR); |
| |
| AsyncInvocation async2 = null; |
| AsyncInvocation asyncCheck = null; |
| if (concurrentCheckData) { |
| // Recreate the child region. |
| async1 = vm1.invokeAsync(createChildPR); |
| async0 = vm0.invokeAsync(createChildPR); |
| async2 = vm2.invokeAsync(new SerializableRunnable("delay") { |
| @Override |
| public void run() throws InterruptedException { |
| Thread.sleep(100); |
| vm2.invoke(createChildPR); |
| } |
| }); |
| |
| asyncCheck = vm0.invokeAsync(checkDataOnParent); |
| } else { |
| vm0.invoke(checkDataOnParent); |
| } |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| asyncCheck.getResult(MAX_WAIT); |
| // Validate the data |
| checkData(vm0, 0, NUM_BUCKETS, "a"); |
| checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| // Make sure we can actually use the buckets in the child region. |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| } finally { |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| closeCache(vm2); |
| } |
| } |
| |
| /** |
| * Create a colocated pair of persistent regions and populate them with data. Shut down the |
| * servers and then restart them. |
| * <p> |
| * On the restart, try region operations ({@code put()}) on the parent region before or during |
| * persistent recovery. The {@code concurrentCreatekData} argument determines whether the |
| * operation from the parent region occurs before or concurrent with the child region creation and |
| * recovery. |
| * |
| * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member |
| * @param createChildPR {@link SerializableRunnable} for creating the child region on one member |
| */ |
| public void regionPutWithOfflineChild(SerializableRunnable createParentPR, |
| SerializableRunnable createChildPR, boolean concurrentCreateData) throws Throwable { |
| Host host = Host.getHost(0); |
| final VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| Region region = cache.getRegion(getPartitionedRegionName()); |
| |
| for (int i = 0; i < NUM_BUCKETS; i++) { |
| assertEquals("For key " + i, "a", region.get(i)); |
| } |
| } |
| }); |
| |
| SerializableRunnable createDataOnParent = new SerializableRunnable("createDataOnParent") { |
| |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| LogWriterUtils.getLogWriter().info("creating data in " + getPartitionedRegionName()); |
| Region region = cache.getRegion(getPartitionedRegionName()); |
| |
| for (int i = 0; i < NUM_BUCKETS; i++) { |
| region.put(i, "c"); |
| assertEquals("For key " + i, "c", region.get(i)); |
| } |
| } |
| }; |
| |
| // Create the PRs on two members |
| vm0.invoke(createParentPR); |
| vm1.invoke(createParentPR); |
| vm0.invoke(createChildPR); |
| vm1.invoke(createChildPR); |
| |
| // Create some buckets. |
| createData(vm0, 0, NUM_BUCKETS, "a"); |
| createData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| |
| try { |
| // Recreate the parent region. Try to make sure that |
| // the member with the latest copy of the buckets |
| // is the one that decides to throw away it's copy |
| // by starting it last. |
| AsyncInvocation async0 = vm0.invokeAsync(createParentPR); |
| AsyncInvocation async1 = vm1.invokeAsync(createParentPR); |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| // Now create the parent region on vm-2. vm-2 did not |
| // previous host the child region. |
| vm2.invoke(createParentPR); |
| |
| AsyncInvocation async2 = null; |
| AsyncInvocation asyncPut = null; |
| if (concurrentCreateData) { |
| // Recreate the child region. |
| async1 = vm1.invokeAsync(createChildPR); |
| async0 = vm0.invokeAsync(createChildPR); |
| async2 = vm2.invokeAsync(createChildPR); |
| |
| Thread.sleep(100); |
| asyncPut = vm0.invokeAsync(createDataOnParent); |
| } else { |
| vm0.invoke(createDataOnParent); |
| } |
| async0.getResult(MAX_WAIT); |
| async1.getResult(MAX_WAIT); |
| async2.getResult(MAX_WAIT); |
| asyncPut.getResult(MAX_WAIT); |
| // Validate the data |
| checkData(vm0, 0, NUM_BUCKETS, "c"); |
| checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); |
| // Make sure we can actually use the buckets in the child region. |
| createData(vm0, 0, NUM_BUCKETS, "c", "region2"); |
| } finally { |
| // Close the members |
| closeCache(vm1); |
| closeCache(vm0); |
| closeCache(vm2); |
| } |
| } |
| |
| private RebalanceResults rebalance(VM vm) { |
| return (RebalanceResults) vm.invoke(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| RebalanceOperation op = getCache().getResourceManager().createRebalanceFactory().start(); |
| return op.getResults(); |
| } |
| }); |
| } |
| |
| private static class PRObserver extends PartitionedRegionObserverAdapter { |
| private CountDownLatch rebalanceDone = new CountDownLatch(1); |
| private CountDownLatch bucketCreateStarted = new CountDownLatch(3); |
| |
| @Override |
| public void beforeBucketCreation(PartitionedRegion region, int bucketId) { |
| if (region.getName().contains("region2")) { |
| bucketCreateStarted.countDown(); |
| waitForRebalance(); |
| } |
| } |
| |
| |
| |
| private void waitForRebalance() { |
| try { |
| if (!rebalanceDone.await(MAX_WAIT, TimeUnit.SECONDS)) { |
| fail("Failed waiting for the rebalance to start"); |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void waitForCreate() throws InterruptedException { |
| if (!bucketCreateStarted.await(MAX_WAIT, TimeUnit.SECONDS)) { |
| fail("Failed waiting for bucket creation to start"); |
| } |
| } |
| |
| public void release() { |
| rebalanceDone.countDown(); |
| } |
| } |
| |
| } |